Automatically reconnect on connection reset

This commit is contained in:
Lonami Exo 2018-06-08 20:41:48 +02:00
parent 5c917fb425
commit 92b606a3e8

View File

@ -42,6 +42,8 @@ class MTProtoSender:
def __init__(self, session): def __init__(self, session):
self.session = session self.session = session
self._connection = ConnectionTcpFull() self._connection = ConnectionTcpFull()
self._ip = None
self._port = None
# Whether the user has explicitly connected or disconnected. # Whether the user has explicitly connected or disconnected.
# #
@ -50,6 +52,7 @@ class MTProtoSender:
# be cleared but on explicit user disconnection all the # be cleared but on explicit user disconnection all the
# pending futures should be cancelled. # pending futures should be cancelled.
self._user_connected = False self._user_connected = False
self._reconnecting = False
# Send and receive calls must be atomic # Send and receive calls must be atomic
self._send_lock = asyncio.Lock() self._send_lock = asyncio.Lock()
@ -100,10 +103,14 @@ class MTProtoSender:
if self._user_connected: if self._user_connected:
return return
# TODO Generate auth_key if needed self._ip = ip
async with self._send_lock: self._port = port
await self._connection.connect(ip, port)
self._user_connected = True self._user_connected = True
await self._connect()
async def _connect(self):
async with self._send_lock:
await self._connection.connect(self._ip, self._port)
# TODO Handle SecurityError, AssertionError # TODO Handle SecurityError, AssertionError
if self.session.auth_key is None: if self.session.auth_key is None:
@ -114,6 +121,19 @@ class MTProtoSender:
self._send_loop_handle = asyncio.ensure_future(self._send_loop()) self._send_loop_handle = asyncio.ensure_future(self._send_loop())
self._recv_loop_handle = asyncio.ensure_future(self._recv_loop()) self._recv_loop_handle = asyncio.ensure_future(self._recv_loop())
async def _reconnect(self):
"""
Cleanly disconnects and then reconnects.
"""
self._reconnecting = True
await self._send_loop_handle
await self._recv_loop_handle
async with self._send_lock:
await self._connection.close()
self._reconnecting = False
await self._connect()
async def disconnect(self): async def disconnect(self):
""" """
Cleanly disconnects the instance from the network, cancels Cleanly disconnects the instance from the network, cancels
@ -194,7 +214,7 @@ class MTProtoSender:
Besides `connect`, only this method ever sends data. Besides `connect`, only this method ever sends data.
""" """
while self._user_connected: while self._user_connected and not self._reconnecting:
if self._pending_ack: if self._pending_ack:
await self._send_queue.put(TLMessage( await self._send_queue.put(TLMessage(
self.session, MsgsAck(list(self._pending_ack)))) self.session, MsgsAck(list(self._pending_ack))))
@ -235,7 +255,7 @@ class MTProtoSender:
Besides `connect`, only this method ever receives data. Besides `connect`, only this method ever receives data.
""" """
while self._user_connected: while self._user_connected and not self._reconnecting:
# TODO Are there more exceptions besides timeout? # TODO Are there more exceptions besides timeout?
# Disconnecting or switching off WiFi only resulted in # Disconnecting or switching off WiFi only resulted in
# timeouts, and once the network was back it continued # timeouts, and once the network was back it continued
@ -245,18 +265,32 @@ class MTProtoSender:
body = await self._connection.recv() body = await self._connection.recv()
except asyncio.TimeoutError: except asyncio.TimeoutError:
continue continue
except ConnectionError:
asyncio.ensure_future(self._reconnect())
break
# TODO Check salt, session_id and sequence_number # TODO Check salt, session_id and sequence_number
try: try:
message, remote_msg_id, remote_seq =\ message, remote_msg_id, remote_seq =\
helpers.unpack_message(self.session, body) helpers.unpack_message(self.session, body)
except (BrokenAuthKeyError, BufferError): except (BrokenAuthKeyError, BufferError):
# TODO Are these temporary or do we need a new key? # The authorization key may be broken if a message was
pass # sent malformed, or if the authkey truly is corrupted.
#
# There may be a buffer error if Telegram's response was too
# short and hence not understood. Reset the authorization key
# and try again in either case.
#
# TODO Is it possible to detect malformed messages vs
# an actually broken authkey?
self.session.auth_key = None
asyncio.ensure_future(self._reconnect())
break
except SecurityError: except SecurityError:
# TODO Can we safely ignore these? Has the message # A step while decoding had the incorrect data. This message
# been decoded correctly? # should not be considered safe and it should be ignored.
pass # TODO Maybe we should check if the message was decoded OK
continue
else: else:
with BinaryReader(message) as reader: with BinaryReader(message) as reader:
await self._process_message( await self._process_message(