diff --git a/telethon/network/connection/connection.py b/telethon/network/connection/connection.py index 16764320..0b0fdb05 100644 --- a/telethon/network/connection/connection.py +++ b/telethon/network/connection/connection.py @@ -86,9 +86,6 @@ class Connection(abc.ABC): Disconnects from the server, and clears pending outgoing and incoming messages. """ - self._disconnect(error=None) - - def _disconnect(self, error): self._connected = False while not self._send_queue.empty(): @@ -154,33 +151,43 @@ class Connection(abc.ABC): except Exception: msg = 'Unexpected exception in the send loop' __log__.exception(msg) - self._disconnect(ConnectionError(msg)) + self.disconnect() async def _recv_loop(self): """ This loop is constantly putting items on the queue as they're read. """ - try: - while self._connected: + while self._connected: + try: data = await self._recv() + except asyncio.CancelledError: + break + except Exception as e: + if isinstance(e, (ConnectionError, asyncio.IncompleteReadError)): + msg = 'The server closed the connection' + __log__.info(msg) + elif isinstance(e, InvalidChecksumError): + msg = 'The server response had an invalid checksum' + __log__.info(msg) + else: + msg = 'Unexpected exception in the receive loop' + __log__.exception(msg) + + if self._waiting_recv: + # Await to lend back control to recv() to avoid deadlock. + # + # This is important because things will break if we don't + # get back to asyncio's loop and just disconnect clearing + # the queues in the process. + await self._recv_queue.put(None) + + self.disconnect() + break + + try: await self._recv_queue.put(data) - except asyncio.CancelledError: - pass - except Exception as e: - if isinstance(e, (ConnectionError, asyncio.IncompleteReadError)): - msg = 'The server closed the connection' - __log__.info(msg) - elif isinstance(e, InvalidChecksumError): - msg = 'The server response had an invalid checksum' - __log__.info(msg) - else: - msg = 'Unexpected exception in the receive loop' - __log__.exception(msg) - - if self._waiting_recv and not self._recv_queue.empty(): - await self._recv_queue.put_nowait(None) - - self._disconnect(ConnectionError(msg)) + except asyncio.CancelledError: + break @abc.abstractmethod def _send(self, data):