Use a synchronized queue instead event/deque pair

This commit is contained in:
Lonami Exo 2017-11-30 21:09:34 +01:00
parent 7d7b2cb1fa
commit 21a93d58ec

View File

@ -1,8 +1,9 @@
import logging import logging
import pickle import pickle
from collections import deque from collections import deque
from queue import Queue, Empty
from datetime import datetime from datetime import datetime
from threading import RLock, Event, Thread from threading import RLock, Thread
from .tl import types as tl from .tl import types as tl
@ -26,8 +27,7 @@ class UpdateState:
self.handlers = [] self.handlers = []
self._updates_lock = RLock() self._updates_lock = RLock()
self._updates_available = Event() self._updates = Queue()
self._updates = deque()
self._latest_updates = deque(maxlen=10) self._latest_updates = deque(maxlen=10)
self._logger = logging.getLogger(__name__) self._logger = logging.getLogger(__name__)
@ -37,24 +37,18 @@ class UpdateState:
def can_poll(self): def can_poll(self):
"""Returns True if a call to .poll() won't lock""" """Returns True if a call to .poll() won't lock"""
return self._updates_available.is_set() return not self._updates.empty()
def poll(self, timeout=None): def poll(self, timeout=None):
"""Polls an update or blocks until an update object is available. """Polls an update or blocks until an update object is available.
If 'timeout is not None', it should be a floating point value, If 'timeout is not None', it should be a floating point value,
and the method will 'return None' if waiting times out. and the method will 'return None' if waiting times out.
""" """
if not self._updates_available.wait(timeout=timeout): try:
update = self._updates.get(timeout=timeout)
except Empty:
return return
with self._updates_lock:
if not self._updates_available.is_set():
return
update = self._updates.popleft()
if not self._updates:
self._updates_available.clear()
if isinstance(update, Exception): if isinstance(update, Exception):
raise update # Some error was set through (surely StopIteration) raise update # Some error was set through (surely StopIteration)
@ -70,7 +64,8 @@ class UpdateState:
self.stop_workers() self.stop_workers()
self._workers = n self._workers = n
if n is None: if n is None:
self._updates.clear() while self._updates:
self._updates.get()
else: else:
self.setup_workers() self.setup_workers()
@ -86,8 +81,7 @@ class UpdateState:
# on all the worker threads # on all the worker threads
# TODO Should this reset the pts and such? # TODO Should this reset the pts and such?
for _ in range(self._workers): for _ in range(self._workers):
self._updates.appendleft(StopIteration()) self._updates.put(StopIteration())
self._updates_available.set()
for t in self._worker_threads: for t in self._worker_threads:
t.join() t.join()
@ -164,21 +158,15 @@ class UpdateState:
self._latest_updates.append(data) self._latest_updates.append(data)
if isinstance(update, tl.UpdateShort): if isinstance(update, tl.UpdateShort):
self._updates.append(update.update) self._updates.put(update.update)
self._updates_available.set()
# Expand "Updates" into "Update", and pass these to callbacks. # Expand "Updates" into "Update", and pass these to callbacks.
# Since .users and .chats have already been processed, we # Since .users and .chats have already been processed, we
# don't need to care about those either. # don't need to care about those either.
elif isinstance(update, (tl.Updates, tl.UpdatesCombined)): elif isinstance(update, (tl.Updates, tl.UpdatesCombined)):
self._updates.extend(update.updates) for u in update.updates:
self._updates_available.set() self._updates.put(u)
elif not isinstance(update, tl.UpdatesTooLong): elif not isinstance(update, tl.UpdatesTooLong):
# TODO Handle "Updates too long" # TODO Handle "Updates too long"
self._updates.append(update) self._updates.put(update)
self._updates_available.set()
else: else:
self._updates.append(update) self._updates.put(update)
self._updates_available.set()