diff --git a/telethon/network/connection/connection.py b/telethon/network/connection/connection.py index 0b0fdb05..1c43c2fc 100644 --- a/telethon/network/connection/connection.py +++ b/telethon/network/connection/connection.py @@ -33,7 +33,6 @@ class Connection(abc.ABC): self._recv_task = None self._send_queue = asyncio.Queue(1) self._recv_queue = asyncio.Queue(1) - self._waiting_recv = False async def connect(self, timeout=None, ssl=None): """ @@ -88,15 +87,9 @@ class Connection(abc.ABC): """ self._connected = False - while not self._send_queue.empty(): - self._send_queue.get_nowait() - if self._send_task: self._send_task.cancel() - while not self._recv_queue.empty(): - self._recv_queue.get_nowait() - if self._recv_task: self._recv_task.cancel() @@ -126,17 +119,12 @@ class Connection(abc.ABC): This method returns a coroutine. """ - if not self._connected: - raise ConnectionError('Not connected') + while self._connected: + result = await self._recv_queue.get() + if result: # None = sentinel value = keep trying + return result - self._waiting_recv = True - result = await self._recv_queue.get() - self._waiting_recv = False - - if result: - return result - else: - raise ConnectionError('The server closed the connection') + raise ConnectionError('Not connected') async def _send_loop(self): """ @@ -173,15 +161,12 @@ class Connection(abc.ABC): msg = 'Unexpected exception in the receive loop' __log__.exception(msg) - if self._waiting_recv: - # Await to lend back control to recv() to avoid deadlock. - # - # This is important because things will break if we don't - # get back to asyncio's loop and just disconnect clearing - # the queues in the process. - await self._recv_queue.put(None) - self.disconnect() + + # Add a sentinel value to unstuck recv + if self._recv_queue.empty(): + self._recv_queue.put_nowait(None) + break try: