Handle IncompleteReadError and InvalidChecksumError

This commit is contained in:
Lonami Exo 2018-10-03 14:46:10 +02:00
parent 3fd7c33127
commit e319fa3aa9
2 changed files with 38 additions and 14 deletions

View File

@ -1,5 +1,9 @@
import abc import abc
import asyncio import asyncio
import logging
__log__ = logging.getLogger(__name__)
class Connection(abc.ABC): class Connection(abc.ABC):
@ -74,36 +78,44 @@ class Connection(abc.ABC):
""" """
return self._send_queue.put(data) return self._send_queue.put(data)
def recv(self): async def recv(self):
""" """
Receives a packet of data through this connection mode. Receives a packet of data through this connection mode.
This method returns a coroutine. This method returns a coroutine.
""" """
return self._recv_queue.get() ok, result = await self._recv_queue.get()
if ok:
return result
else:
raise result from None
# TODO Get/put to the queue with cancellation # TODO Get/put to the queue with cancellation
async def _send_loop(self): async def _send_loop(self):
""" """
This loop is constantly popping items off the queue to send them. This loop is constantly popping items off the queue to send them.
""" """
try:
while not self._disconnected.is_set(): while not self._disconnected.is_set():
self._send(await self._send_queue.get()) self._send(await self._send_queue.get())
await self._writer.drain() await self._writer.drain()
except asyncio.CancelledError:
pass
except Exception:
logging.exception('Unhandled exception in the sending loop')
self.disconnect()
# TODO Handle IncompleteReadError and InvalidChecksumError
async def _recv_loop(self): async def _recv_loop(self):
""" """
This loop is constantly putting items on the queue as they're read. This loop is constantly putting items on the queue as they're read.
""" """
while not self._disconnected.is_set():
try: try:
while not self._disconnected.is_set():
data = await self._recv() data = await self._recv()
except asyncio.IncompleteReadError: await self._recv_queue.put((True, data))
if not self._disconnected.is_set(): except Exception as e:
raise await self._recv_queue.put((False, e))
else: self.disconnect()
await self._recv_queue.put(data)
@abc.abstractmethod @abc.abstractmethod
def _send(self, data): def _send(self, data):

View File

@ -9,7 +9,7 @@ from .requeststate import RequestState
from .. import utils from .. import utils
from ..errors import ( from ..errors import (
BadMessageError, BrokenAuthKeyError, SecurityError, TypeNotFoundError, BadMessageError, BrokenAuthKeyError, SecurityError, TypeNotFoundError,
rpc_message_to_error InvalidChecksumError, rpc_message_to_error
) )
from ..extensions import BinaryReader from ..extensions import BinaryReader
from ..helpers import _ReadyQueue from ..helpers import _ReadyQueue
@ -376,6 +376,11 @@ class MTProtoSender:
__log__.warning('Security error while unpacking a ' __log__.warning('Security error while unpacking a '
'received message: %s', e) 'received message: %s', e)
continue continue
except InvalidChecksumError as e:
__log__.warning(
'Invalid checksum on the read packet (was %s expected %s)',
e.checksum, e.valid_checksum
)
except asyncio.CancelledError: except asyncio.CancelledError:
return return
except (BrokenAuthKeyError, BufferError): except (BrokenAuthKeyError, BufferError):
@ -383,6 +388,13 @@ class MTProtoSender:
self._connection._state.auth_key = None self._connection._state.auth_key = None
self._start_reconnect() self._start_reconnect()
return return
except asyncio.IncompleteReadError:
# TODO Handle packets that are too big and trigger this
# If it's not a packet that triggered this, just reconnect
__log__.info('Telegram closed the connection')
self._pending_state.clear()
self._start_reconnect()
return
except Exception: except Exception:
__log__.exception('Unhandled error while receiving data') __log__.exception('Unhandled error while receiving data')
self._start_reconnect() self._start_reconnect()