diff --git a/telethon/client/telegrambaseclient.py b/telethon/client/telegrambaseclient.py index 43df209f..c79afdc0 100644 --- a/telethon/client/telegrambaseclient.py +++ b/telethon/client/telegrambaseclient.py @@ -308,7 +308,9 @@ class TelegramBaseClient(abc.ABC): self._updates_queue = asyncio.Queue(loop=self._loop) self._dispatching_updates_queue = asyncio.Event(loop=self._loop) else: - self._updates_queue = None + # Use a set of pending instead of a queue so we can properly + # terminate all pending updates on disconnect. + self._updates_queue = set() self._dispatching_updates_queue = None self._authorized = None # None = unknown, False = no, True = yes @@ -394,11 +396,28 @@ class TelegramBaseClient(abc.ABC): if self._loop.is_running(): return self._disconnect_coro() else: - self._loop.run_until_complete(self._disconnect_coro()) + try: + self._loop.run_until_complete(self._disconnect_coro()) + except RuntimeError: + # Python 3.5.x complains when called from + # `__aexit__` and there were pending updates with: + # "Event loop stopped before Future completed." + # + # However, it doesn't really make a lot of sense. + pass async def _disconnect_coro(self: 'TelegramClient'): await self._disconnect() + # trio's nurseries would handle this for us, but this is asyncio. + # All tasks spawned in the background should properly be terminated. + if self._dispatching_updates_queue is None and self._updates_queue: + for task in self._updates_queue: + task.cancel() + + await asyncio.wait(self._updates_queue, loop=self._loop) + self._updates_queue.clear() + pts, date = self._state_cache[None] self.session.set_update_state(0, types.updates.State( pts=pts, diff --git a/telethon/client/updates.py b/telethon/client/updates.py index a9d5c9aa..66eac4be 100644 --- a/telethon/client/updates.py +++ b/telethon/client/updates.py @@ -229,8 +229,10 @@ class UpdateMethods(UserMethods): # arguments which is faster. channel_id = self._state_cache.get_channel_id(update) args = (update, channel_id, self._state_cache[channel_id]) - if self._updates_queue is None: - self._loop.create_task(self._dispatch_update(*args)) + if self._dispatching_updates_queue is None: + task = self._loop.create_task(self._dispatch_update(*args)) + self._updates_queue.add(task) + task.add_done_callback(lambda _: self._updates_queue.discard(task)) else: self._updates_queue.put_nowait(args) if not self._dispatching_updates_queue.is_set(): @@ -343,10 +345,11 @@ class UpdateMethods(UserMethods): 'for event %s.', name, type(event).__name__ ) break - except Exception: - name = getattr(callback, '__name__', repr(callback)) - self._log[__name__].exception('Unhandled exception on %s', - name) + except Exception as e: + if not isinstance(e, asyncio.CancelledError) or self.is_connected(): + name = getattr(callback, '__name__', repr(callback)) + self._log[__name__].exception('Unhandled exception on %s', + name) async def _get_difference(self: 'TelegramClient', update, channel_id, pts_date): """