Change code to recv and handle disconnections

This commit is contained in:
Lonami Exo 2018-11-24 20:51:32 +01:00
parent 09b16f96fc
commit d2e995ef95

View File

@ -33,7 +33,6 @@ class Connection(abc.ABC):
self._recv_task = None
self._send_queue = asyncio.Queue(1)
self._recv_queue = asyncio.Queue(1)
self._waiting_recv = False
async def connect(self, timeout=None, ssl=None):
"""
@ -88,15 +87,9 @@ class Connection(abc.ABC):
"""
self._connected = False
while not self._send_queue.empty():
self._send_queue.get_nowait()
if self._send_task:
self._send_task.cancel()
while not self._recv_queue.empty():
self._recv_queue.get_nowait()
if self._recv_task:
self._recv_task.cancel()
@ -126,17 +119,12 @@ class Connection(abc.ABC):
This method returns a coroutine.
"""
if not self._connected:
raise ConnectionError('Not connected')
self._waiting_recv = True
while self._connected:
result = await self._recv_queue.get()
self._waiting_recv = False
if result:
if result: # None = sentinel value = keep trying
return result
else:
raise ConnectionError('The server closed the connection')
raise ConnectionError('Not connected')
async def _send_loop(self):
"""
@ -173,15 +161,12 @@ class Connection(abc.ABC):
msg = 'Unexpected exception in the receive loop'
__log__.exception(msg)
if self._waiting_recv:
# Await to lend back control to recv() to avoid deadlock.
#
# This is important because things will break if we don't
# get back to asyncio's loop and just disconnect clearing
# the queues in the process.
await self._recv_queue.put(None)
self.disconnect()
# Add a sentinel value to unstuck recv
if self._recv_queue.empty():
self._recv_queue.put_nowait(None)
break
try: