Spawn new worker threads to handle updates instead using ReadThread

This commit is contained in:
Lonami Exo 2017-09-30 10:12:01 +02:00
parent 479afddf50
commit b87a798dd5

View File

@ -1,6 +1,7 @@
import logging
from collections import deque from collections import deque
from datetime import datetime from datetime import datetime
from threading import RLock, Event from threading import RLock, Event, Thread
from .tl import types as tl from .tl import types as tl
@ -11,14 +12,24 @@ class UpdateState:
""" """
def __init__(self, polling): def __init__(self, polling):
self._polling = polling self._polling = polling
self._workers = 4
self._worker_threads = []
self.handlers = [] self.handlers = []
self._updates_lock = RLock() self._updates_lock = RLock()
self._updates_available = Event() self._updates_available = Event()
self._updates = deque() self._updates = deque()
self._logger = logging.getLogger(__name__)
# https://core.telegram.org/api/updates # https://core.telegram.org/api/updates
self._state = tl.updates.State(0, 0, datetime.now(), 0, 0) self._state = tl.updates.State(0, 0, datetime.now(), 0, 0)
# TODO Rename "polling" to some other variable
# that signifies "running background threads".
if polling:
self._setup_workers()
def can_poll(self): def can_poll(self):
"""Returns True if a call to .poll() won't lock""" """Returns True if a call to .poll() won't lock"""
return self._updates_available.is_set() return self._updates_available.is_set()
@ -39,17 +50,74 @@ class UpdateState:
return update return update
# TODO How should this be handled with background worker threads?
def get_polling(self): def get_polling(self):
return self._polling return self._polling
def set_polling(self, polling): def set_polling(self, polling):
self._polling = polling self._polling = polling
if not polling: if polling:
self._setup_workers()
else:
with self._updates_lock: with self._updates_lock:
self._updates.clear() self._updates.clear()
self._stop_workers()
polling = property(fget=get_polling, fset=set_polling) polling = property(fget=get_polling, fset=set_polling)
def get_workers(self):
return self._workers
def set_workers(self, n):
self._stop_workers()
self._workers = n
self._setup_workers()
workers = property(fget=get_workers, fset=set_workers)
def _stop_workers(self):
"""Raises "StopIterationException" on the worker threads to stop them,
and also clears all of them off the list
"""
if self._worker_threads:
pass
self.set_error(StopIteration())
for t in self._worker_threads:
t.join()
self._worker_threads.clear()
def _setup_workers(self):
if self._worker_threads:
# There already are workers
return
for i in range(self._workers):
thread = Thread(
target=UpdateState._worker_loop,
name='UpdateWorker{}'.format(i),
daemon=True,
args=(self, i)
)
self._worker_threads.append(thread)
thread.start()
def _worker_loop(self, wid):
while True:
try:
update = self.poll()
# TODO Maybe people can add different handlers per update type
for handler in self.handlers:
handler(update)
except StopIteration:
break
except Exception as e:
# We don't want to crash a worker thread due to any reason
self._logger.debug(
'[ERROR] Unhandled exception on worker {}'.format(wid), e
)
def set_error(self, error): def set_error(self, error):
"""Sets an error, so that the next call to .poll() will raise it. """Sets an error, so that the next call to .poll() will raise it.
Can be (and is) used to pass exceptions between threads. Can be (and is) used to pass exceptions between threads.
@ -85,6 +153,3 @@ class UpdateState:
if self._polling: if self._polling:
self._updates.append(update) self._updates.append(update)
self._updates_available.set() self._updates_available.set()
for handler in self.handlers:
handler(update)