Propagate errors at the connection level

This commit is contained in:
Lonami Exo 2023-03-12 17:43:19 +01:00
parent f3414d134a
commit acd3407418

View File

@ -296,8 +296,10 @@ class Connection(abc.ABC):
This method returns a coroutine. This method returns a coroutine.
""" """
while self._connected: while self._connected:
result = await self._recv_queue.get() result, err = await self._recv_queue.get()
if result: # None = sentinel value = keep trying if err:
raise err
if result:
return result return result
raise ConnectionError('Not connected') raise ConnectionError('Not connected')
@ -324,37 +326,29 @@ class Connection(abc.ABC):
""" """
This loop is constantly putting items on the queue as they're read. This loop is constantly putting items on the queue as they're read.
""" """
try:
while self._connected: while self._connected:
try: try:
data = await self._recv() data = await self._recv()
except asyncio.CancelledError: except (IOError, asyncio.IncompleteReadError) as e:
break self._log.warning('Server closed the connection: %s', e)
except Exception as e: await self._recv_queue.put((None, e))
if isinstance(e, (IOError, asyncio.IncompleteReadError)): except InvalidChecksumError as e:
msg = 'The server closed the connection' self._log.warning('Server response had invalid checksum: %s', e)
self._log.info(msg) await self._recv_queue.put((None, e))
elif isinstance(e, InvalidChecksumError): except InvalidBufferError as e:
msg = 'The server response had an invalid checksum' self._log.warning('Server response had invalid buffer: %s', e)
self._log.info(msg) await self._recv_queue.put((None, e))
elif isinstance(e, InvalidBufferError): except Exception:
msg = 'The server response had an invalid buffer' self._log.exception('Unexpected exception in the receive loop')
self._log.error(msg) await self._recv_queue.put((None, e))
else: else:
msg = 'Unexpected exception in the receive loop' await self._recv_queue.put((data, None))
self._log.exception(msg) except asyncio.CancelledError:
pass
finally:
await self.disconnect() await self.disconnect()
# Add a sentinel value to unstuck recv
if self._recv_queue.empty():
self._recv_queue.put_nowait(None)
break
try:
await self._recv_queue.put(data)
except asyncio.CancelledError:
break
def _init_conn(self): def _init_conn(self):
""" """