Move update handling to the MtProtoSender, being functional again

This commit is contained in:
Lonami Exo 2017-09-02 21:45:27 +02:00
parent 21eaf8bd72
commit 9bc1f64efe
3 changed files with 29 additions and 129 deletions

View File

@ -47,6 +47,16 @@ class MtProtoSender:
self._constant_read = constant_read
self._recv_thread = None
# Every unhandled result gets passed to these callbacks, which
# should be functions accepting a single parameter: a TLObject.
# This should only be Update(s), although it can actually be any type.
#
# The thread from which these callbacks are called can be any.
#
# The creator of the MtProtoSender is responsible for setting this
# to point to the list wherever their callbacks reside.
self.unhandled_callbacks = None
def connect(self):
"""Connects to the server"""
if not self.is_connected():
@ -239,16 +249,15 @@ class MtProtoSender:
return True
# If the code is not parsed manually, then it was parsed by the code generator!
# In this case, we will simply treat the incoming TLObject as an Update,
# if we can first find a matching TLObject
# If the code is not parsed manually then it should be a TLObject.
if code in tlobjects:
result = reader.tgread_object()
if updates is None:
self._logger.debug('Ignored update for %s', repr(result))
if self.unhandled_callbacks:
self._logger.debug('Passing TLObject to callbacks %s', repr(result))
for callback in self.unhandled_callbacks:
callback(result)
else:
self._logger.debug('Read update for %s', repr(result))
updates.append(result)
self._logger.debug('Ignoring unhandled TLObject %s', repr(result))
return True

View File

@ -83,6 +83,12 @@ class TelegramBareClient:
# the time since it's a (somewhat expensive) process.
self._cached_clients = {}
# Update callbacks (functions accepting a single TLObject) go here
#
# Note that changing the list to which this variable points to
# will not reflect the changes on the existing senders.
self._update_callbacks = []
# These will be set later
self.dc_options = None
self._sender = None
@ -136,6 +142,7 @@ class TelegramBareClient:
self._sender = MtProtoSender(
connection, self.session, constant_read=constant_read
)
self._sender.unhandled_callbacks = self._update_callbacks
self._sender.connect()
# Now it's time to send an InitConnectionRequest

View File

@ -98,14 +98,6 @@ class TelegramClient(TelegramBareClient):
# Safety across multiple threads (for the updates thread)
self._lock = RLock()
# Updates-related members
self._update_handlers = []
self._updates_thread_running = Event()
self._updates_thread_receiving = Event()
self._next_ping_at = 0
self.ping_interval = 60 # Seconds
# Used on connection - the user may modify these and reconnect
kwargs['app_version'] = kwargs.get('app_version', self.__version__)
for name, value in kwargs.items():
@ -145,7 +137,6 @@ class TelegramClient(TelegramBareClient):
def disconnect(self):
"""Disconnects from the Telegram server
and stops all the spawned threads"""
self._set_updates_thread(running=False)
super().disconnect()
# Also disconnect all the cached senders
@ -193,18 +184,8 @@ class TelegramClient(TelegramBareClient):
try:
self._lock.acquire()
updates = [] if self._update_handlers else None
result = super().invoke(
request, updates=updates
)
if updates:
for update in updates:
for handler in self._update_handlers:
handler(update)
# TODO Retry if 'result' is None?
return result
return super().invoke(request)
except (PhoneMigrateError, NetworkMigrateError, UserMigrateError) as e:
self._logger.debug('DC error when invoking request, '
@ -394,8 +375,8 @@ class TelegramClient(TelegramBareClient):
no_webpage=not link_preview
)
result = self(request)
for handler in self._update_handlers:
handler(result)
for callback in self._update_callbacks:
callback(result)
return request.random_id
def get_message_history(self,
@ -886,109 +867,12 @@ class TelegramClient(TelegramBareClient):
def add_update_handler(self, handler):
"""Adds an update handler (a function which takes a TLObject,
an update, as its parameter) and listens for updates"""
if not self._sender:
raise RuntimeError("You can't add update handlers until you've "
"successfully connected to the server.")
first_handler = not self._update_handlers
self._update_handlers.append(handler)
if first_handler:
self._set_updates_thread(running=True)
self._update_callbacks.append(handler)
def remove_update_handler(self, handler):
self._update_handlers.remove(handler)
if not self._update_handlers:
self._set_updates_thread(running=False)
self._update_callbacks.remove(handler)
def list_update_handlers(self):
return self._update_handlers[:]
def _set_updates_thread(self, running):
"""Sets the updates thread status (running or not)"""
if running == self._updates_thread_running.is_set():
return
# Different state, update the saved value and behave as required
self._logger.debug('Changing updates thread running status to %s', running)
if running:
self._updates_thread_running.set()
if not self._updates_thread:
self._updates_thread = Thread(
name='UpdatesThread', daemon=True,
target=self._updates_thread_method)
self._updates_thread.start()
else:
self._updates_thread_running.clear()
if self._updates_thread_receiving.is_set():
# self._sender.cancel_receive()
pass
def _updates_thread_method(self):
"""This method will run until specified and listen for incoming updates"""
while self._updates_thread_running.is_set():
# Always sleep a bit before each iteration to relax the CPU,
# since it's possible to early 'continue' the loop to reach
# the next iteration, but we still should to sleep.
sleep(0.1)
with self._lock:
self._logger.debug('Updates thread acquired the lock')
try:
self._updates_thread_receiving.set()
self._logger.debug(
'Trying to receive updates from the updates thread'
)
if time() > self._next_ping_at:
self._next_ping_at = time() + self.ping_interval
self(PingRequest(utils.generate_random_long()))
#updates = self._sender.receive_updates(timeout=timeout)
updates = []
self._updates_thread_receiving.clear()
self._logger.debug(
'Received {} update(s) from the updates thread'
.format(len(updates))
)
for update in updates:
for handler in self._update_handlers:
handler(update)
except ConnectionResetError:
self._logger.debug('Server disconnected us. Reconnecting...')
self.reconnect()
except TimeoutError:
self._logger.debug('Receiving updates timed out')
except ReadCancelledError:
self._logger.debug('Receiving updates cancelled')
except BrokenPipeError:
self._logger.debug('Tcp session is broken. Reconnecting...')
self.reconnect()
except InvalidChecksumError:
self._logger.debug('MTProto session is broken. Reconnecting...')
self.reconnect()
except OSError:
self._logger.debug('OSError on updates thread, %s logging out',
'was' if self._sender.logging_out else 'was not')
if self._sender.logging_out:
# This error is okay when logging out, means we got disconnected
# TODO Not sure why this happens because we call disconnect()...
self._set_updates_thread(running=False)
else:
raise
self._logger.debug('Updates thread released the lock')
# Thread is over, so clean unset its variable
self._updates_thread = None
return self._update_callbacks[:]
# endregion