diff --git a/telethon/network/mtprotolayer.py b/telethon/network/mtprotolayer.py index 2960b28e..d3cd7ae4 100644 --- a/telethon/network/mtprotolayer.py +++ b/telethon/network/mtprotolayer.py @@ -34,6 +34,9 @@ class MTProtoLayer: """ self._connection.disconnect() + def reset_state(self): + self._state = MTProtoState(self._state.auth_key) + async def send(self, state_list): """ The list of `RequestState` that will be sent. They will diff --git a/telethon/network/mtprotosender.py b/telethon/network/mtprotosender.py index 8196932c..5dc5a530 100644 --- a/telethon/network/mtprotosender.py +++ b/telethon/network/mtprotosender.py @@ -8,7 +8,8 @@ from .mtprotoplainsender import MTProtoPlainSender from .requeststate import RequestState from .. import utils from ..errors import ( - BadMessageError, SecurityError, TypeNotFoundError, rpc_message_to_error + BadMessageError, BrokenAuthKeyError, SecurityError, TypeNotFoundError, + rpc_message_to_error ) from ..extensions import BinaryReader from ..helpers import _ReadyQueue @@ -279,6 +280,9 @@ class MTProtoSender: """ self._reconnecting = True + __log__.debug('Closing current connection...') + self._connection.disconnect() + __log__.debug('Awaiting for the send loop before reconnecting...') await self._send_loop_handle @@ -290,22 +294,24 @@ class MTProtoSender: self._reconnecting = False + # Start with a clean state (and thus session ID) to avoid old msgs + self._connection.reset_state() + retries = self._retries if self._auto_reconnect else 0 for retry in range(1, retries + 1): try: await self._connect() - # TODO Keep this? - """ - for m in self._pending_messages.values(): - self._send_queue.put_nowait(m) - """ + except ConnectionError: + __log__.info('Failed reconnection retry %d/%d', retry, retries) + else: + self._send_queue.extend(self._pending_state.values()) + self._pending_state.clear() + # TODO Where is this needed? if self._auto_reconnect_callback: self._loop.create_task(self._auto_reconnect_callback()) break - except ConnectionError: - __log__.info('Failed reconnection retry %d/%d', retry, retries) else: __log__.error('Failed to reconnect automatically.') await self._disconnect(error=ConnectionError()) @@ -356,11 +362,36 @@ class MTProtoSender: Besides `connect`, only this method ever receives data. """ while self._user_connected and not self._reconnecting: - # TODO Handle timeout, cancelled, arbitrary, broken auth, buffer, - # security and type not found. + # TODO handle incomplete read? __log__.debug('Receiving items from the network...') - message = await self._connection.recv() - await self._process_message(message) + try: + message = await self._connection.recv() + except TypeNotFoundError as e: + __log__.info('Type %08x not found, remaining data %r', + e.invalid_constructor_id, e.remaining) + continue + except SecurityError as e: + # A step while decoding had the incorrect data. This message + # should not be considered safe and it should be ignored. + __log__.warning('Security error while unpacking a ' + 'received message: %s', e) + continue + except asyncio.CancelledError: + return + except (BrokenAuthKeyError, BufferError): + __log__.info('Broken authorization key; resetting') + self._connection._state.auth_key = None + self._start_reconnect() + return + except Exception: + __log__.exception('Unhandled error while receiving data') + self._start_reconnect() + return + else: + try: + await self._process_message(message) + except Exception: + __log__.exception('Unhandled error while processing msgs') # Response Handlers diff --git a/telethon/network/mtprotostate.py b/telethon/network/mtprotostate.py index e38dad60..4b3dbefa 100644 --- a/telethon/network/mtprotostate.py +++ b/telethon/network/mtprotostate.py @@ -120,6 +120,7 @@ class MTProtoState: else: raise BufferError("Can't decode packet ({})".format(body)) + # TODO Check salt, session_id and sequence_number key_id = struct.unpack('