Properly await all spawned background tasks

This commit is contained in:
Lonami Exo 2019-05-04 21:02:07 +02:00
parent 532bd1c916
commit 945d438696
2 changed files with 30 additions and 8 deletions

View File

@ -308,7 +308,9 @@ class TelegramBaseClient(abc.ABC):
self._updates_queue = asyncio.Queue(loop=self._loop)
self._dispatching_updates_queue = asyncio.Event(loop=self._loop)
else:
self._updates_queue = None
# Use a set of pending instead of a queue so we can properly
# terminate all pending updates on disconnect.
self._updates_queue = set()
self._dispatching_updates_queue = None
self._authorized = None # None = unknown, False = no, True = yes
@ -394,11 +396,28 @@ class TelegramBaseClient(abc.ABC):
if self._loop.is_running():
return self._disconnect_coro()
else:
self._loop.run_until_complete(self._disconnect_coro())
try:
self._loop.run_until_complete(self._disconnect_coro())
except RuntimeError:
# Python 3.5.x complains when called from
# `__aexit__` and there were pending updates with:
# "Event loop stopped before Future completed."
#
# However, it doesn't really make a lot of sense.
pass
async def _disconnect_coro(self: 'TelegramClient'):
await self._disconnect()
# trio's nurseries would handle this for us, but this is asyncio.
# All tasks spawned in the background should properly be terminated.
if self._dispatching_updates_queue is None and self._updates_queue:
for task in self._updates_queue:
task.cancel()
await asyncio.wait(self._updates_queue, loop=self._loop)
self._updates_queue.clear()
pts, date = self._state_cache[None]
self.session.set_update_state(0, types.updates.State(
pts=pts,

View File

@ -229,8 +229,10 @@ class UpdateMethods(UserMethods):
# arguments which is faster.
channel_id = self._state_cache.get_channel_id(update)
args = (update, channel_id, self._state_cache[channel_id])
if self._updates_queue is None:
self._loop.create_task(self._dispatch_update(*args))
if self._dispatching_updates_queue is None:
task = self._loop.create_task(self._dispatch_update(*args))
self._updates_queue.add(task)
task.add_done_callback(lambda _: self._updates_queue.discard(task))
else:
self._updates_queue.put_nowait(args)
if not self._dispatching_updates_queue.is_set():
@ -343,10 +345,11 @@ class UpdateMethods(UserMethods):
'for event %s.', name, type(event).__name__
)
break
except Exception:
name = getattr(callback, '__name__', repr(callback))
self._log[__name__].exception('Unhandled exception on %s',
name)
except Exception as e:
if not isinstance(e, asyncio.CancelledError) or self.is_connected():
name = getattr(callback, '__name__', repr(callback))
self._log[__name__].exception('Unhandled exception on %s',
name)
async def _get_difference(self: 'TelegramClient', update, channel_id, pts_date):
"""