Support dispatching updates in a sequential order

This commit is contained in:
Lonami Exo 2018-06-29 10:45:04 +02:00
parent 0f737a86af
commit 3c2ff45b0b
2 changed files with 32 additions and 1 deletions

View File

@ -94,6 +94,17 @@ class TelegramBaseClient(abc.ABC):
Whether reconnection should be retried `connection_retries` Whether reconnection should be retried `connection_retries`
times automatically if Telegram disconnects us or not. times automatically if Telegram disconnects us or not.
sequential_updates (`bool`, optional):
By default every incoming update will create a new task, so
you can handle several updates in parallel. Some scripts need
the order in which updates are processed to be sequential, and
this setting allows them to do so.
If set to ``True``, incoming updates will be put in a queue
and processed sequentially. This means your event handlers
should *not* perform long-running operations since new
updates are put inside of an unbounded queue.
flood_sleep_threshold (`int` | `float`, optional): flood_sleep_threshold (`int` | `float`, optional):
The threshold below which the library should automatically The threshold below which the library should automatically
sleep on flood wait errors (inclusive). For instance, if a sleep on flood wait errors (inclusive). For instance, if a
@ -141,6 +152,7 @@ class TelegramBaseClient(abc.ABC):
request_retries=5, request_retries=5,
connection_retries=5, connection_retries=5,
auto_reconnect=True, auto_reconnect=True,
sequential_updates=False,
flood_sleep_threshold=60, flood_sleep_threshold=60,
device_model=None, device_model=None,
system_version=None, system_version=None,
@ -230,6 +242,13 @@ class TelegramBaseClient(abc.ABC):
self._last_request = time.time() self._last_request = time.time()
self._channel_pts = {} self._channel_pts = {}
if sequential_updates:
self._updates_queue = asyncio.Queue()
self._dispatching_updates_queue = asyncio.Event()
else:
self._updates_queue = None
self._dispatching_updates_queue = None
# Start with invalid state (-1) so we can have somewhere to store # Start with invalid state (-1) so we can have somewhere to store
# the state, but also be able to determine if we are authorized. # the state, but also be able to determine if we are authorized.
self._state = types.updates.State(-1, 0, datetime.now(), 0, -1) self._state = types.updates.State(-1, 0, datetime.now(), 0, -1)

View File

@ -179,7 +179,13 @@ class UpdateMethods(UserMethods):
self._handle_update(update.update) self._handle_update(update.update)
else: else:
update._entities = getattr(update, '_entities', {}) update._entities = getattr(update, '_entities', {})
self._loop.create_task(self._dispatch_update(update)) if self._updates_queue is None:
self._loop.create_task(self._dispatch_update(update))
else:
self._updates_queue.put_nowait(update)
if not self._dispatching_updates_queue.is_set():
self._dispatching_updates_queue.set()
self._loop.create_task(self._dispatch_queue_updates())
need_diff = False need_diff = False
if hasattr(update, 'pts'): if hasattr(update, 'pts'):
@ -230,6 +236,12 @@ class UpdateMethods(UserMethods):
await self(functions.updates.GetStateRequest()) await self(functions.updates.GetStateRequest())
async def _dispatch_queue_updates(self):
while not self._updates_queue.empty():
await self._dispatch_update(self._updates_queue.get_nowait())
self._dispatching_updates_queue.clear()
async def _dispatch_update(self, update): async def _dispatch_update(self, update):
if self._events_pending_resolve: if self._events_pending_resolve:
if self._event_resolve_lock.locked(): if self._event_resolve_lock.locked():