From 21a93d58ec1949a55c31364658f390d685c2ddb9 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Thu, 30 Nov 2017 21:09:34 +0100 Subject: [PATCH] Use a synchronized queue instead event/deque pair --- telethon/update_state.py | 42 ++++++++++++++-------------------------- 1 file changed, 15 insertions(+), 27 deletions(-) diff --git a/telethon/update_state.py b/telethon/update_state.py index 302d4ab8..b7c43ba3 100644 --- a/telethon/update_state.py +++ b/telethon/update_state.py @@ -1,8 +1,9 @@ import logging import pickle from collections import deque +from queue import Queue, Empty from datetime import datetime -from threading import RLock, Event, Thread +from threading import RLock, Thread from .tl import types as tl @@ -26,8 +27,7 @@ class UpdateState: self.handlers = [] self._updates_lock = RLock() - self._updates_available = Event() - self._updates = deque() + self._updates = Queue() self._latest_updates = deque(maxlen=10) self._logger = logging.getLogger(__name__) @@ -37,24 +37,18 @@ class UpdateState: def can_poll(self): """Returns True if a call to .poll() won't lock""" - return self._updates_available.is_set() + return not self._updates.empty() def poll(self, timeout=None): """Polls an update or blocks until an update object is available. If 'timeout is not None', it should be a floating point value, and the method will 'return None' if waiting times out. """ - if not self._updates_available.wait(timeout=timeout): + try: + update = self._updates.get(timeout=timeout) + except Empty: return - with self._updates_lock: - if not self._updates_available.is_set(): - return - - update = self._updates.popleft() - if not self._updates: - self._updates_available.clear() - if isinstance(update, Exception): raise update # Some error was set through (surely StopIteration) @@ -70,7 +64,8 @@ class UpdateState: self.stop_workers() self._workers = n if n is None: - self._updates.clear() + while self._updates: + self._updates.get() else: self.setup_workers() @@ -86,8 +81,7 @@ class UpdateState: # on all the worker threads # TODO Should this reset the pts and such? for _ in range(self._workers): - self._updates.appendleft(StopIteration()) - self._updates_available.set() + self._updates.put(StopIteration()) for t in self._worker_threads: t.join() @@ -164,21 +158,15 @@ class UpdateState: self._latest_updates.append(data) if isinstance(update, tl.UpdateShort): - self._updates.append(update.update) - self._updates_available.set() - + self._updates.put(update.update) # Expand "Updates" into "Update", and pass these to callbacks. # Since .users and .chats have already been processed, we # don't need to care about those either. elif isinstance(update, (tl.Updates, tl.UpdatesCombined)): - self._updates.extend(update.updates) - self._updates_available.set() - + for u in update.updates: + self._updates.put(u) elif not isinstance(update, tl.UpdatesTooLong): # TODO Handle "Updates too long" - self._updates.append(update) - self._updates_available.set() - + self._updates.put(update) else: - self._updates.append(update) - self._updates_available.set() + self._updates.put(update)