diff --git a/telethon/client/telegrambaseclient.py b/telethon/client/telegrambaseclient.py index bd05bd12..53d8a4e3 100644 --- a/telethon/client/telegrambaseclient.py +++ b/telethon/client/telegrambaseclient.py @@ -94,6 +94,17 @@ class TelegramBaseClient(abc.ABC): Whether reconnection should be retried `connection_retries` times automatically if Telegram disconnects us or not. + sequential_updates (`bool`, optional): + By default every incoming update will create a new task, so + you can handle several updates in parallel. Some scripts need + the order in which updates are processed to be sequential, and + this setting allows them to do so. + + If set to ``True``, incoming updates will be put in a queue + and processed sequentially. This means your event handlers + should *not* perform long-running operations since new + updates are put inside of an unbounded queue. + flood_sleep_threshold (`int` | `float`, optional): The threshold below which the library should automatically sleep on flood wait errors (inclusive). For instance, if a @@ -141,6 +152,7 @@ class TelegramBaseClient(abc.ABC): request_retries=5, connection_retries=5, auto_reconnect=True, + sequential_updates=False, flood_sleep_threshold=60, device_model=None, system_version=None, @@ -230,6 +242,13 @@ class TelegramBaseClient(abc.ABC): self._last_request = time.time() self._channel_pts = {} + if sequential_updates: + self._updates_queue = asyncio.Queue() + self._dispatching_updates_queue = asyncio.Event() + else: + self._updates_queue = None + self._dispatching_updates_queue = None + # Start with invalid state (-1) so we can have somewhere to store # the state, but also be able to determine if we are authorized. self._state = types.updates.State(-1, 0, datetime.now(), 0, -1) diff --git a/telethon/client/updates.py b/telethon/client/updates.py index fe47e87d..220a08ae 100644 --- a/telethon/client/updates.py +++ b/telethon/client/updates.py @@ -179,7 +179,13 @@ class UpdateMethods(UserMethods): self._handle_update(update.update) else: update._entities = getattr(update, '_entities', {}) - self._loop.create_task(self._dispatch_update(update)) + if self._updates_queue is None: + self._loop.create_task(self._dispatch_update(update)) + else: + self._updates_queue.put_nowait(update) + if not self._dispatching_updates_queue.is_set(): + self._dispatching_updates_queue.set() + self._loop.create_task(self._dispatch_queue_updates()) need_diff = False if hasattr(update, 'pts'): @@ -230,6 +236,12 @@ class UpdateMethods(UserMethods): await self(functions.updates.GetStateRequest()) + async def _dispatch_queue_updates(self): + while not self._updates_queue.empty(): + await self._dispatch_update(self._updates_queue.get_nowait()) + + self._dispatching_updates_queue.clear() + async def _dispatch_update(self, update): if self._events_pending_resolve: if self._event_resolve_lock.locked():