mirror of
https://github.com/LonamiWebs/Telethon.git
synced 2024-11-22 17:36:34 +03:00
Move update handling to the MtProtoSender, being functional again
This commit is contained in:
parent
21eaf8bd72
commit
9bc1f64efe
|
@ -47,6 +47,16 @@ class MtProtoSender:
|
|||
self._constant_read = constant_read
|
||||
self._recv_thread = None
|
||||
|
||||
# Every unhandled result gets passed to these callbacks, which
|
||||
# should be functions accepting a single parameter: a TLObject.
|
||||
# This should only be Update(s), although it can actually be any type.
|
||||
#
|
||||
# The thread from which these callbacks are called can be any.
|
||||
#
|
||||
# The creator of the MtProtoSender is responsible for setting this
|
||||
# to point to the list wherever their callbacks reside.
|
||||
self.unhandled_callbacks = None
|
||||
|
||||
def connect(self):
|
||||
"""Connects to the server"""
|
||||
if not self.is_connected():
|
||||
|
@ -239,16 +249,15 @@ class MtProtoSender:
|
|||
|
||||
return True
|
||||
|
||||
# If the code is not parsed manually, then it was parsed by the code generator!
|
||||
# In this case, we will simply treat the incoming TLObject as an Update,
|
||||
# if we can first find a matching TLObject
|
||||
# If the code is not parsed manually then it should be a TLObject.
|
||||
if code in tlobjects:
|
||||
result = reader.tgread_object()
|
||||
if updates is None:
|
||||
self._logger.debug('Ignored update for %s', repr(result))
|
||||
if self.unhandled_callbacks:
|
||||
self._logger.debug('Passing TLObject to callbacks %s', repr(result))
|
||||
for callback in self.unhandled_callbacks:
|
||||
callback(result)
|
||||
else:
|
||||
self._logger.debug('Read update for %s', repr(result))
|
||||
updates.append(result)
|
||||
self._logger.debug('Ignoring unhandled TLObject %s', repr(result))
|
||||
|
||||
return True
|
||||
|
||||
|
|
|
@ -83,6 +83,12 @@ class TelegramBareClient:
|
|||
# the time since it's a (somewhat expensive) process.
|
||||
self._cached_clients = {}
|
||||
|
||||
# Update callbacks (functions accepting a single TLObject) go here
|
||||
#
|
||||
# Note that changing the list to which this variable points to
|
||||
# will not reflect the changes on the existing senders.
|
||||
self._update_callbacks = []
|
||||
|
||||
# These will be set later
|
||||
self.dc_options = None
|
||||
self._sender = None
|
||||
|
@ -136,6 +142,7 @@ class TelegramBareClient:
|
|||
self._sender = MtProtoSender(
|
||||
connection, self.session, constant_read=constant_read
|
||||
)
|
||||
self._sender.unhandled_callbacks = self._update_callbacks
|
||||
self._sender.connect()
|
||||
|
||||
# Now it's time to send an InitConnectionRequest
|
||||
|
|
|
@ -98,14 +98,6 @@ class TelegramClient(TelegramBareClient):
|
|||
# Safety across multiple threads (for the updates thread)
|
||||
self._lock = RLock()
|
||||
|
||||
# Updates-related members
|
||||
self._update_handlers = []
|
||||
self._updates_thread_running = Event()
|
||||
self._updates_thread_receiving = Event()
|
||||
|
||||
self._next_ping_at = 0
|
||||
self.ping_interval = 60 # Seconds
|
||||
|
||||
# Used on connection - the user may modify these and reconnect
|
||||
kwargs['app_version'] = kwargs.get('app_version', self.__version__)
|
||||
for name, value in kwargs.items():
|
||||
|
@ -145,7 +137,6 @@ class TelegramClient(TelegramBareClient):
|
|||
def disconnect(self):
|
||||
"""Disconnects from the Telegram server
|
||||
and stops all the spawned threads"""
|
||||
self._set_updates_thread(running=False)
|
||||
super().disconnect()
|
||||
|
||||
# Also disconnect all the cached senders
|
||||
|
@ -193,18 +184,8 @@ class TelegramClient(TelegramBareClient):
|
|||
try:
|
||||
self._lock.acquire()
|
||||
|
||||
updates = [] if self._update_handlers else None
|
||||
result = super().invoke(
|
||||
request, updates=updates
|
||||
)
|
||||
|
||||
if updates:
|
||||
for update in updates:
|
||||
for handler in self._update_handlers:
|
||||
handler(update)
|
||||
|
||||
# TODO Retry if 'result' is None?
|
||||
return result
|
||||
return super().invoke(request)
|
||||
|
||||
except (PhoneMigrateError, NetworkMigrateError, UserMigrateError) as e:
|
||||
self._logger.debug('DC error when invoking request, '
|
||||
|
@ -394,8 +375,8 @@ class TelegramClient(TelegramBareClient):
|
|||
no_webpage=not link_preview
|
||||
)
|
||||
result = self(request)
|
||||
for handler in self._update_handlers:
|
||||
handler(result)
|
||||
for callback in self._update_callbacks:
|
||||
callback(result)
|
||||
return request.random_id
|
||||
|
||||
def get_message_history(self,
|
||||
|
@ -886,109 +867,12 @@ class TelegramClient(TelegramBareClient):
|
|||
def add_update_handler(self, handler):
|
||||
"""Adds an update handler (a function which takes a TLObject,
|
||||
an update, as its parameter) and listens for updates"""
|
||||
if not self._sender:
|
||||
raise RuntimeError("You can't add update handlers until you've "
|
||||
"successfully connected to the server.")
|
||||
|
||||
first_handler = not self._update_handlers
|
||||
self._update_handlers.append(handler)
|
||||
if first_handler:
|
||||
self._set_updates_thread(running=True)
|
||||
self._update_callbacks.append(handler)
|
||||
|
||||
def remove_update_handler(self, handler):
|
||||
self._update_handlers.remove(handler)
|
||||
if not self._update_handlers:
|
||||
self._set_updates_thread(running=False)
|
||||
self._update_callbacks.remove(handler)
|
||||
|
||||
def list_update_handlers(self):
|
||||
return self._update_handlers[:]
|
||||
|
||||
def _set_updates_thread(self, running):
|
||||
"""Sets the updates thread status (running or not)"""
|
||||
if running == self._updates_thread_running.is_set():
|
||||
return
|
||||
|
||||
# Different state, update the saved value and behave as required
|
||||
self._logger.debug('Changing updates thread running status to %s', running)
|
||||
if running:
|
||||
self._updates_thread_running.set()
|
||||
if not self._updates_thread:
|
||||
self._updates_thread = Thread(
|
||||
name='UpdatesThread', daemon=True,
|
||||
target=self._updates_thread_method)
|
||||
|
||||
self._updates_thread.start()
|
||||
else:
|
||||
self._updates_thread_running.clear()
|
||||
if self._updates_thread_receiving.is_set():
|
||||
# self._sender.cancel_receive()
|
||||
pass
|
||||
|
||||
def _updates_thread_method(self):
|
||||
"""This method will run until specified and listen for incoming updates"""
|
||||
|
||||
while self._updates_thread_running.is_set():
|
||||
# Always sleep a bit before each iteration to relax the CPU,
|
||||
# since it's possible to early 'continue' the loop to reach
|
||||
# the next iteration, but we still should to sleep.
|
||||
sleep(0.1)
|
||||
|
||||
with self._lock:
|
||||
self._logger.debug('Updates thread acquired the lock')
|
||||
try:
|
||||
self._updates_thread_receiving.set()
|
||||
self._logger.debug(
|
||||
'Trying to receive updates from the updates thread'
|
||||
)
|
||||
|
||||
if time() > self._next_ping_at:
|
||||
self._next_ping_at = time() + self.ping_interval
|
||||
self(PingRequest(utils.generate_random_long()))
|
||||
|
||||
#updates = self._sender.receive_updates(timeout=timeout)
|
||||
updates = []
|
||||
|
||||
self._updates_thread_receiving.clear()
|
||||
self._logger.debug(
|
||||
'Received {} update(s) from the updates thread'
|
||||
.format(len(updates))
|
||||
)
|
||||
for update in updates:
|
||||
for handler in self._update_handlers:
|
||||
handler(update)
|
||||
|
||||
except ConnectionResetError:
|
||||
self._logger.debug('Server disconnected us. Reconnecting...')
|
||||
self.reconnect()
|
||||
|
||||
except TimeoutError:
|
||||
self._logger.debug('Receiving updates timed out')
|
||||
|
||||
except ReadCancelledError:
|
||||
self._logger.debug('Receiving updates cancelled')
|
||||
|
||||
except BrokenPipeError:
|
||||
self._logger.debug('Tcp session is broken. Reconnecting...')
|
||||
self.reconnect()
|
||||
|
||||
except InvalidChecksumError:
|
||||
self._logger.debug('MTProto session is broken. Reconnecting...')
|
||||
self.reconnect()
|
||||
|
||||
except OSError:
|
||||
self._logger.debug('OSError on updates thread, %s logging out',
|
||||
'was' if self._sender.logging_out else 'was not')
|
||||
|
||||
if self._sender.logging_out:
|
||||
# This error is okay when logging out, means we got disconnected
|
||||
# TODO Not sure why this happens because we call disconnect()...
|
||||
self._set_updates_thread(running=False)
|
||||
else:
|
||||
raise
|
||||
|
||||
self._logger.debug('Updates thread released the lock')
|
||||
|
||||
# Thread is over, so clean unset its variable
|
||||
self._updates_thread = None
|
||||
return self._update_callbacks[:]
|
||||
|
||||
# endregion
|
||||
|
|
Loading…
Reference in New Issue
Block a user