diff --git a/telethon/network/connection/connection.py b/telethon/network/connection/connection.py index d6a916b2..ac25fe5d 100644 --- a/telethon/network/connection/connection.py +++ b/telethon/network/connection/connection.py @@ -296,8 +296,10 @@ class Connection(abc.ABC): This method returns a coroutine. """ while self._connected: - result = await self._recv_queue.get() - if result: # None = sentinel value = keep trying + result, err = await self._recv_queue.get() + if err: + raise err + if result: return result raise ConnectionError('Not connected') @@ -324,37 +326,29 @@ class Connection(abc.ABC): """ This loop is constantly putting items on the queue as they're read. """ - while self._connected: - try: - data = await self._recv() - except asyncio.CancelledError: - break - except Exception as e: - if isinstance(e, (IOError, asyncio.IncompleteReadError)): - msg = 'The server closed the connection' - self._log.info(msg) - elif isinstance(e, InvalidChecksumError): - msg = 'The server response had an invalid checksum' - self._log.info(msg) - elif isinstance(e, InvalidBufferError): - msg = 'The server response had an invalid buffer' - self._log.error(msg) + try: + while self._connected: + try: + data = await self._recv() + except (IOError, asyncio.IncompleteReadError) as e: + self._log.warning('Server closed the connection: %s', e) + await self._recv_queue.put((None, e)) + except InvalidChecksumError as e: + self._log.warning('Server response had invalid checksum: %s', e) + await self._recv_queue.put((None, e)) + except InvalidBufferError as e: + self._log.warning('Server response had invalid buffer: %s', e) + await self._recv_queue.put((None, e)) + except Exception: + self._log.exception('Unexpected exception in the receive loop') + await self._recv_queue.put((None, e)) else: - msg = 'Unexpected exception in the receive loop' - self._log.exception(msg) + await self._recv_queue.put((data, None)) + except asyncio.CancelledError: + pass + finally: + await self.disconnect() - await self.disconnect() - - # Add a sentinel value to unstuck recv - if self._recv_queue.empty(): - self._recv_queue.put_nowait(None) - - break - - try: - await self._recv_queue.put(data) - except asyncio.CancelledError: - break def _init_conn(self): """