Rename process_updates/polling to workers

This commit is contained in:
Lonami Exo 2017-09-30 11:17:31 +02:00
parent 72b7e99222
commit 7cef5885fa
3 changed files with 32 additions and 47 deletions

View File

@ -67,7 +67,7 @@ class TelegramBareClient:
def __init__(self, session, api_id, api_hash,
connection_mode=ConnectionMode.TCP_FULL,
proxy=None,
process_updates=False,
update_workers=None,
timeout=timedelta(seconds=5),
**kwargs):
"""Refer to TelegramClient.__init__ for docs on this method"""
@ -108,7 +108,7 @@ class TelegramBareClient:
# This member will process updates if enabled.
# One may change self.updates.enabled at any later point.
self.updates = UpdateState(process_updates)
self.updates = UpdateState(workers=update_workers)
# Used on connection - the user may modify these and reconnect
kwargs['app_version'] = kwargs.get('app_version', self.__version__)

View File

@ -57,7 +57,7 @@ class TelegramClient(TelegramBareClient):
def __init__(self, session, api_id, api_hash,
connection_mode=ConnectionMode.TCP_FULL,
proxy=None,
process_updates=False,
update_workers=None,
timeout=timedelta(seconds=5),
**kwargs):
"""Initializes the Telegram client with the specified API ID and Hash.
@ -71,11 +71,11 @@ class TelegramClient(TelegramBareClient):
This will only affect how messages are sent over the network
and how much processing is required before sending them.
If 'process_updates' is set to True, incoming updates will be
processed and you must manually call 'self.updates.poll()' from
another thread to retrieve the saved update objects, or your
memory will fill with these. You may modify the value of
'self.updates.polling' at any later point.
The integer 'update_workers' represents depending on its value:
is None: Updates will *not* be stored in memory.
= 0: Another thread is responsible for calling self.updates.poll()
> 0: 'update_workers' background threads will be spawned, any
any of them will invoke all the self.updates.handlers.
Despite the value of 'process_updates', if you later call
'.add_update_handler(...)', updates will also be processed
@ -94,7 +94,7 @@ class TelegramClient(TelegramBareClient):
session, api_id, api_hash,
connection_mode=connection_mode,
proxy=proxy,
process_updates=process_updates,
update_workers=update_workers,
timeout=timeout
)

View File

@ -1,5 +1,4 @@
import logging
import threading
from collections import deque
from datetime import datetime
from threading import RLock, Event, Thread
@ -11,9 +10,15 @@ class UpdateState:
"""Used to hold the current state of processed updates.
To retrieve an update, .poll() should be called.
"""
def __init__(self, polling):
self._polling = polling
self._workers = 4
def __init__(self, workers=None):
"""
:param workers: This integer parameter has three possible cases:
workers is None: Updates will *not* be stored on self.
workers = 0: Another thread is responsible for calling self.poll()
workers > 0: 'workers' background threads will be spawned, any
any of them will invoke all the self.handlers.
"""
self._workers = workers
self._worker_threads = []
self.handlers = []
@ -25,11 +30,7 @@ class UpdateState:
# https://core.telegram.org/api/updates
self._state = tl.updates.State(0, 0, datetime.now(), 0, 0)
# TODO Rename "polling" to some other variable
# that signifies "running background threads".
if polling:
self._setup_workers()
self._setup_workers()
def can_poll(self):
"""Returns True if a call to .poll() won't lock"""
@ -37,9 +38,6 @@ class UpdateState:
def poll(self):
"""Polls an update or blocks until an update object is available"""
if not self._polling:
raise ValueError('Updates are not being polled hence not saved.')
self._updates_available.wait()
with self._updates_lock:
if not self._updates_available.is_set():
@ -54,28 +52,19 @@ class UpdateState:
return update
# TODO How should this be handled with background worker threads?
def get_polling(self):
return self._polling
def set_polling(self, polling):
self._polling = polling
if polling:
self._setup_workers()
else:
with self._updates_lock:
self._updates.clear()
self._stop_workers()
polling = property(fget=get_polling, fset=set_polling)
def get_workers(self):
return self._workers
def set_workers(self, n):
"""Changes the number of workers running.
If 'n is None', clears all pending updates from memory.
"""
self._stop_workers()
self._workers = n
self._setup_workers()
if n is None:
self._updates.clear()
else:
self._setup_workers()
workers = property(fget=get_workers, fset=set_workers)
@ -83,9 +72,6 @@ class UpdateState:
"""Raises "StopIterationException" on the worker threads to stop them,
and also clears all of them off the list
"""
if self._worker_threads:
pass
self.set_error(StopIteration())
for t in self._worker_threads:
t.join()
@ -93,8 +79,8 @@ class UpdateState:
self._worker_threads.clear()
def _setup_workers(self):
if self._worker_threads:
# There already are workers
if self._worker_threads or not self._workers:
# There already are workers, or workers is None or 0. Do nothing.
return
for i in range(self._workers):
@ -141,8 +127,8 @@ class UpdateState:
"""Processes an update object. This method is normally called by
the library itself.
"""
if not self._polling and not self.handlers:
return
if self._workers is None:
return # No processing needs to be done if nobody's working
with self._updates_lock:
if isinstance(update, tl.updates.State):
@ -154,6 +140,5 @@ class UpdateState:
return # We already handled this update
self._state.pts = pts
if self._polling:
self._updates.append(update)
self._updates_available.set()
self._updates.append(update)
self._updates_available.set()