mirror of
https://github.com/LonamiWebs/Telethon.git
synced 2025-02-17 03:51:05 +03:00
Handle receiving errors
This commit is contained in:
parent
cf7e5d5592
commit
bc1fd9039d
|
@ -34,6 +34,9 @@ class MTProtoLayer:
|
||||||
"""
|
"""
|
||||||
self._connection.disconnect()
|
self._connection.disconnect()
|
||||||
|
|
||||||
|
def reset_state(self):
|
||||||
|
self._state = MTProtoState(self._state.auth_key)
|
||||||
|
|
||||||
async def send(self, state_list):
|
async def send(self, state_list):
|
||||||
"""
|
"""
|
||||||
The list of `RequestState` that will be sent. They will
|
The list of `RequestState` that will be sent. They will
|
||||||
|
|
|
@ -8,7 +8,8 @@ from .mtprotoplainsender import MTProtoPlainSender
|
||||||
from .requeststate import RequestState
|
from .requeststate import RequestState
|
||||||
from .. import utils
|
from .. import utils
|
||||||
from ..errors import (
|
from ..errors import (
|
||||||
BadMessageError, SecurityError, TypeNotFoundError, rpc_message_to_error
|
BadMessageError, BrokenAuthKeyError, SecurityError, TypeNotFoundError,
|
||||||
|
rpc_message_to_error
|
||||||
)
|
)
|
||||||
from ..extensions import BinaryReader
|
from ..extensions import BinaryReader
|
||||||
from ..helpers import _ReadyQueue
|
from ..helpers import _ReadyQueue
|
||||||
|
@ -279,6 +280,9 @@ class MTProtoSender:
|
||||||
"""
|
"""
|
||||||
self._reconnecting = True
|
self._reconnecting = True
|
||||||
|
|
||||||
|
__log__.debug('Closing current connection...')
|
||||||
|
self._connection.disconnect()
|
||||||
|
|
||||||
__log__.debug('Awaiting for the send loop before reconnecting...')
|
__log__.debug('Awaiting for the send loop before reconnecting...')
|
||||||
await self._send_loop_handle
|
await self._send_loop_handle
|
||||||
|
|
||||||
|
@ -290,22 +294,24 @@ class MTProtoSender:
|
||||||
|
|
||||||
self._reconnecting = False
|
self._reconnecting = False
|
||||||
|
|
||||||
|
# Start with a clean state (and thus session ID) to avoid old msgs
|
||||||
|
self._connection.reset_state()
|
||||||
|
|
||||||
retries = self._retries if self._auto_reconnect else 0
|
retries = self._retries if self._auto_reconnect else 0
|
||||||
for retry in range(1, retries + 1):
|
for retry in range(1, retries + 1):
|
||||||
try:
|
try:
|
||||||
await self._connect()
|
await self._connect()
|
||||||
# TODO Keep this?
|
except ConnectionError:
|
||||||
"""
|
__log__.info('Failed reconnection retry %d/%d', retry, retries)
|
||||||
for m in self._pending_messages.values():
|
else:
|
||||||
self._send_queue.put_nowait(m)
|
self._send_queue.extend(self._pending_state.values())
|
||||||
"""
|
self._pending_state.clear()
|
||||||
|
|
||||||
|
# TODO Where is this needed?
|
||||||
if self._auto_reconnect_callback:
|
if self._auto_reconnect_callback:
|
||||||
self._loop.create_task(self._auto_reconnect_callback())
|
self._loop.create_task(self._auto_reconnect_callback())
|
||||||
|
|
||||||
break
|
break
|
||||||
except ConnectionError:
|
|
||||||
__log__.info('Failed reconnection retry %d/%d', retry, retries)
|
|
||||||
else:
|
else:
|
||||||
__log__.error('Failed to reconnect automatically.')
|
__log__.error('Failed to reconnect automatically.')
|
||||||
await self._disconnect(error=ConnectionError())
|
await self._disconnect(error=ConnectionError())
|
||||||
|
@ -356,11 +362,36 @@ class MTProtoSender:
|
||||||
Besides `connect`, only this method ever receives data.
|
Besides `connect`, only this method ever receives data.
|
||||||
"""
|
"""
|
||||||
while self._user_connected and not self._reconnecting:
|
while self._user_connected and not self._reconnecting:
|
||||||
# TODO Handle timeout, cancelled, arbitrary, broken auth, buffer,
|
# TODO handle incomplete read?
|
||||||
# security and type not found.
|
|
||||||
__log__.debug('Receiving items from the network...')
|
__log__.debug('Receiving items from the network...')
|
||||||
message = await self._connection.recv()
|
try:
|
||||||
await self._process_message(message)
|
message = await self._connection.recv()
|
||||||
|
except TypeNotFoundError as e:
|
||||||
|
__log__.info('Type %08x not found, remaining data %r',
|
||||||
|
e.invalid_constructor_id, e.remaining)
|
||||||
|
continue
|
||||||
|
except SecurityError as e:
|
||||||
|
# A step while decoding had the incorrect data. This message
|
||||||
|
# should not be considered safe and it should be ignored.
|
||||||
|
__log__.warning('Security error while unpacking a '
|
||||||
|
'received message: %s', e)
|
||||||
|
continue
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
return
|
||||||
|
except (BrokenAuthKeyError, BufferError):
|
||||||
|
__log__.info('Broken authorization key; resetting')
|
||||||
|
self._connection._state.auth_key = None
|
||||||
|
self._start_reconnect()
|
||||||
|
return
|
||||||
|
except Exception:
|
||||||
|
__log__.exception('Unhandled error while receiving data')
|
||||||
|
self._start_reconnect()
|
||||||
|
return
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
await self._process_message(message)
|
||||||
|
except Exception:
|
||||||
|
__log__.exception('Unhandled error while processing msgs')
|
||||||
|
|
||||||
# Response Handlers
|
# Response Handlers
|
||||||
|
|
||||||
|
|
|
@ -120,6 +120,7 @@ class MTProtoState:
|
||||||
else:
|
else:
|
||||||
raise BufferError("Can't decode packet ({})".format(body))
|
raise BufferError("Can't decode packet ({})".format(body))
|
||||||
|
|
||||||
|
# TODO Check salt, session_id and sequence_number
|
||||||
key_id = struct.unpack('<Q', body[:8])[0]
|
key_id = struct.unpack('<Q', body[:8])[0]
|
||||||
if key_id != self.auth_key.key_id:
|
if key_id != self.auth_key.key_id:
|
||||||
raise SecurityError('Server replied with an invalid auth key')
|
raise SecurityError('Server replied with an invalid auth key')
|
||||||
|
|
Loading…
Reference in New Issue
Block a user