mirror of
https://github.com/LonamiWebs/Telethon.git
synced 2025-01-27 01:34:29 +03:00
Fix deadlock on unexpected disconnections
This commit is contained in:
parent
f6bc80bc6b
commit
f046d1f0a3
|
@ -86,9 +86,6 @@ class Connection(abc.ABC):
|
||||||
Disconnects from the server, and clears
|
Disconnects from the server, and clears
|
||||||
pending outgoing and incoming messages.
|
pending outgoing and incoming messages.
|
||||||
"""
|
"""
|
||||||
self._disconnect(error=None)
|
|
||||||
|
|
||||||
def _disconnect(self, error):
|
|
||||||
self._connected = False
|
self._connected = False
|
||||||
|
|
||||||
while not self._send_queue.empty():
|
while not self._send_queue.empty():
|
||||||
|
@ -154,33 +151,43 @@ class Connection(abc.ABC):
|
||||||
except Exception:
|
except Exception:
|
||||||
msg = 'Unexpected exception in the send loop'
|
msg = 'Unexpected exception in the send loop'
|
||||||
__log__.exception(msg)
|
__log__.exception(msg)
|
||||||
self._disconnect(ConnectionError(msg))
|
self.disconnect()
|
||||||
|
|
||||||
async def _recv_loop(self):
|
async def _recv_loop(self):
|
||||||
"""
|
"""
|
||||||
This loop is constantly putting items on the queue as they're read.
|
This loop is constantly putting items on the queue as they're read.
|
||||||
"""
|
"""
|
||||||
try:
|
while self._connected:
|
||||||
while self._connected:
|
try:
|
||||||
data = await self._recv()
|
data = await self._recv()
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
break
|
||||||
|
except Exception as e:
|
||||||
|
if isinstance(e, (ConnectionError, asyncio.IncompleteReadError)):
|
||||||
|
msg = 'The server closed the connection'
|
||||||
|
__log__.info(msg)
|
||||||
|
elif isinstance(e, InvalidChecksumError):
|
||||||
|
msg = 'The server response had an invalid checksum'
|
||||||
|
__log__.info(msg)
|
||||||
|
else:
|
||||||
|
msg = 'Unexpected exception in the receive loop'
|
||||||
|
__log__.exception(msg)
|
||||||
|
|
||||||
|
if self._waiting_recv:
|
||||||
|
# Await to lend back control to recv() to avoid deadlock.
|
||||||
|
#
|
||||||
|
# This is important because things will break if we don't
|
||||||
|
# get back to asyncio's loop and just disconnect clearing
|
||||||
|
# the queues in the process.
|
||||||
|
await self._recv_queue.put(None)
|
||||||
|
|
||||||
|
self.disconnect()
|
||||||
|
break
|
||||||
|
|
||||||
|
try:
|
||||||
await self._recv_queue.put(data)
|
await self._recv_queue.put(data)
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
pass
|
break
|
||||||
except Exception as e:
|
|
||||||
if isinstance(e, (ConnectionError, asyncio.IncompleteReadError)):
|
|
||||||
msg = 'The server closed the connection'
|
|
||||||
__log__.info(msg)
|
|
||||||
elif isinstance(e, InvalidChecksumError):
|
|
||||||
msg = 'The server response had an invalid checksum'
|
|
||||||
__log__.info(msg)
|
|
||||||
else:
|
|
||||||
msg = 'Unexpected exception in the receive loop'
|
|
||||||
__log__.exception(msg)
|
|
||||||
|
|
||||||
if self._waiting_recv and not self._recv_queue.empty():
|
|
||||||
await self._recv_queue.put_nowait(None)
|
|
||||||
|
|
||||||
self._disconnect(ConnectionError(msg))
|
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def _send(self, data):
|
def _send(self, data):
|
||||||
|
|
Loading…
Reference in New Issue
Block a user