diff --git a/telethon/_client/updates.py b/telethon/_client/updates.py index 2460c561..b0d632b5 100644 --- a/telethon/_client/updates.py +++ b/telethon/_client/updates.py @@ -7,6 +7,7 @@ import time import traceback import typing import logging +from collections import deque from ..errors._rpcbase import RpcError from .._events.common import EventBuilder, EventCommon @@ -82,6 +83,42 @@ async def catch_up(self: 'TelegramClient'): pass async def _update_loop(self: 'TelegramClient'): - while self.is_connected(): - updates = await self._updates_queue.get() - updates, users, chats = self._message_box.process_updates(updates, self._entity_cache) + try: + updates_to_dispatch = deque() + while self.is_connected(): + if updates_to_dispatch: + # TODO dispatch + updates_to_dispatch.popleft() + continue + + get_diff = self._message_box.get_difference() + if get_diff: + self._log[__name__].info('Getting difference for account updates') + diff = await self(get_diff) + updates, users, chats = self._message_box.apply_difference(diff, self._entity_cache) + updates_to_dispatch.extend(updates) + continue + + get_diff = self._message_box.get_channel_difference(self._entity_cache) + if get_diff: + self._log[__name__].info('Getting difference for channel updates') + diff = await self(get_diff) + updates, users, chats = self._message_box.apply_channel_difference(diff, self._entity_cache) + updates_to_dispatch.extend(updates) + continue + + deadline = self._message_box.check_deadlines() + try: + updates = await asyncio.wait_for( + self._updates_queue.get(), + deadline - asyncio.get_running_loop().time() + ) + except asyncio.TimeoutError: + self._log[__name__].info('Timeout waiting for updates expired') + continue + + processed = [] + users, chats = self._message_box.process_updates(updates, self._entity_cache, processed) + updates_to_dispatch.extend(processed) + except Exception: + self._log[__name__].exception('Fatal error handling updates (this is a bug in Telethon, please report it)')