diff --git a/telethon/update_state.py b/telethon/update_state.py index 1ac2e00d..98345cdf 100644 --- a/telethon/update_state.py +++ b/telethon/update_state.py @@ -39,19 +39,15 @@ class UpdateState: return not self._updates.empty() def poll(self, timeout=None): - """Polls an update or blocks until an update object is available. - If 'timeout is not None', it should be a floating point value, - and the method will 'return None' if waiting times out. + """ + Polls an update or blocks until an update object is available. + If 'timeout is not None', it should be a floating point value, + and the method will 'return None' if waiting times out. """ try: - update = self._updates.get(timeout=timeout) + return self._updates.get(timeout=timeout) except Empty: - return - - if isinstance(update, Exception): - raise update # Some error was set through (surely StopIteration) - - return update + return None def get_workers(self): return self._workers @@ -60,27 +56,24 @@ class UpdateState: """Changes the number of workers running. If 'n is None', clears all pending updates from memory. """ - self.stop_workers() - self._workers = n - if n is not None: + if n is None: + self.stop_workers() + else: + self._workers = n self.setup_workers() workers = property(fget=get_workers, fset=set_workers) def stop_workers(self): """ - Raises "StopIterationException" on the worker threads to stop - them, and also clears all the workers/updates from the lists. + Waits for all the worker threads to stop. """ - if self._workers: - with self._updates_lock: - # Insert at the beginning so the very next poll causes an error - # on all the worker threads - # TODO Should this reset the pts and such? - while self._updates: - self._updates.get() - for _ in range(self._workers): - self._updates.put(StopIteration()) + # Put dummy ``None`` objects so that they don't need to timeout. + n = self._workers + self._workers = None + with self._updates_lock: + for _ in range(n): + self._updates.put(None) for t in self._worker_threads: t.join() @@ -103,7 +96,7 @@ class UpdateState: thread.start() def _worker_loop(self, wid): - while True: + while self._workers is not None: try: update = self.poll(timeout=UpdateState.WORKER_POLL_TIMEOUT) if update and self.handler: