From 92b606a3e8dfc29b40ad273f070a0f54ca3cfcf6 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Fri, 8 Jun 2018 20:41:48 +0200 Subject: [PATCH] Automatically reconnect on connection reset --- telethon/network/mtprotosender.py | 54 +++++++++++++++++++++++++------ 1 file changed, 44 insertions(+), 10 deletions(-) diff --git a/telethon/network/mtprotosender.py b/telethon/network/mtprotosender.py index 9b93c053..e7cbb3e9 100644 --- a/telethon/network/mtprotosender.py +++ b/telethon/network/mtprotosender.py @@ -42,6 +42,8 @@ class MTProtoSender: def __init__(self, session): self.session = session self._connection = ConnectionTcpFull() + self._ip = None + self._port = None # Whether the user has explicitly connected or disconnected. # @@ -50,6 +52,7 @@ class MTProtoSender: # be cleared but on explicit user disconnection all the # pending futures should be cancelled. self._user_connected = False + self._reconnecting = False # Send and receive calls must be atomic self._send_lock = asyncio.Lock() @@ -100,10 +103,14 @@ class MTProtoSender: if self._user_connected: return - # TODO Generate auth_key if needed - async with self._send_lock: - await self._connection.connect(ip, port) + self._ip = ip + self._port = port self._user_connected = True + await self._connect() + + async def _connect(self): + async with self._send_lock: + await self._connection.connect(self._ip, self._port) # TODO Handle SecurityError, AssertionError if self.session.auth_key is None: @@ -114,6 +121,19 @@ class MTProtoSender: self._send_loop_handle = asyncio.ensure_future(self._send_loop()) self._recv_loop_handle = asyncio.ensure_future(self._recv_loop()) + async def _reconnect(self): + """ + Cleanly disconnects and then reconnects. + """ + self._reconnecting = True + await self._send_loop_handle + await self._recv_loop_handle + async with self._send_lock: + await self._connection.close() + + self._reconnecting = False + await self._connect() + async def disconnect(self): """ Cleanly disconnects the instance from the network, cancels @@ -194,7 +214,7 @@ class MTProtoSender: Besides `connect`, only this method ever sends data. """ - while self._user_connected: + while self._user_connected and not self._reconnecting: if self._pending_ack: await self._send_queue.put(TLMessage( self.session, MsgsAck(list(self._pending_ack)))) @@ -235,7 +255,7 @@ class MTProtoSender: Besides `connect`, only this method ever receives data. """ - while self._user_connected: + while self._user_connected and not self._reconnecting: # TODO Are there more exceptions besides timeout? # Disconnecting or switching off WiFi only resulted in # timeouts, and once the network was back it continued @@ -245,18 +265,32 @@ class MTProtoSender: body = await self._connection.recv() except asyncio.TimeoutError: continue + except ConnectionError: + asyncio.ensure_future(self._reconnect()) + break # TODO Check salt, session_id and sequence_number try: message, remote_msg_id, remote_seq =\ helpers.unpack_message(self.session, body) except (BrokenAuthKeyError, BufferError): - # TODO Are these temporary or do we need a new key? - pass + # The authorization key may be broken if a message was + # sent malformed, or if the authkey truly is corrupted. + # + # There may be a buffer error if Telegram's response was too + # short and hence not understood. Reset the authorization key + # and try again in either case. + # + # TODO Is it possible to detect malformed messages vs + # an actually broken authkey? + self.session.auth_key = None + asyncio.ensure_future(self._reconnect()) + break except SecurityError: - # TODO Can we safely ignore these? Has the message - # been decoded correctly? - pass + # A step while decoding had the incorrect data. This message + # should not be considered safe and it should be ignored. + # TODO Maybe we should check if the message was decoded OK + continue else: with BinaryReader(message) as reader: await self._process_message(