From 4de4026bb31cf7869e7fc273b3dffdbc19689bb4 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Sun, 3 Sep 2017 09:56:10 +0200 Subject: [PATCH] Move the "constant read" thread to the main TelegramClient --- telethon/network/mtproto_sender.py | 41 ++----------------------- telethon/telegram_bare_client.py | 20 ++++++------- telethon/telegram_client.py | 48 ++++++++++++++++++++++++++++-- 3 files changed, 58 insertions(+), 51 deletions(-) diff --git a/telethon/network/mtproto_sender.py b/telethon/network/mtproto_sender.py index 81b448e1..327f98ed 100644 --- a/telethon/network/mtproto_sender.py +++ b/telethon/network/mtproto_sender.py @@ -17,17 +17,9 @@ class MtProtoSender: (https://core.telegram.org/mtproto/description) """ - def __init__(self, connection, session, constant_read): + def __init__(self, connection, session): """Creates a new MtProtoSender configured to send messages through 'connection' and using the parameters from 'session'. - - If 'constant_read' is set to True, another thread will be - created and started upon connection to constantly read - from the other end. Otherwise, manual calls to .receive() - must be performed. The MtProtoSender cannot be connected, - or an error will be thrown. - - This way, sending and receiving will be completely independent. """ self.connection = connection self.session = session @@ -43,10 +35,6 @@ class MtProtoSender: # TODO There might be a better way to handle msgs_ack requests self.logging_out = False - # Will create a new _recv_thread when connecting if set - self._constant_read = constant_read - self._recv_thread = None - # Every unhandled result gets passed to these callbacks, which # should be functions accepting a single parameter: a TLObject. # This should only be Update(s), although it can actually be any type. @@ -59,29 +47,14 @@ class MtProtoSender: def connect(self): """Connects to the server""" - if not self.is_connected(): - self.connection.connect() - if self._constant_read: - self._recv_thread = Thread( - name='ReadThread', daemon=True, - target=self._recv_thread_impl - ) - self._recv_thread.start() + self.connection.connect() def is_connected(self): return self.connection.is_connected() def disconnect(self): """Disconnects from the server""" - if self.is_connected(): - self.connection.close() - if self._constant_read: - # The existing thread will close eventually, since it's - # only running while the MtProtoSender.is_connected() - self._recv_thread = None - - def is_constant_read(self): - return self._constant_read + self.connection.close() # region Send and receive @@ -117,14 +90,6 @@ class MtProtoSender: del self._need_confirmation[:] - def _recv_thread_impl(self): - while self.is_connected(): - try: - self.receive() - except TimeoutError: - # No problem. - pass - def receive(self): """Receives a single message from the connected endpoint. diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index 20b5efec..21682bbe 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -97,8 +97,7 @@ class TelegramBareClient: # region Connecting - def connect(self, exported_auth=None, initial_query=None, - constant_read=False): + def connect(self, exported_auth=None, initial_query=None): """Connects to the Telegram servers, executing authentication if required. Note that authenticating to the Telegram servers is not the same as authenticating the desired user itself, which @@ -110,9 +109,6 @@ class TelegramBareClient: If 'initial_query' is not None, it will override the default 'GetConfigRequest()', and its result will be returned ONLY if the client wasn't connected already. - - The 'constant_read' parameter will be used when creating - the MtProtoSender. Refer to it for more information. """ if self._sender and self._sender.is_connected(): # Try sending a ping to make sure we're connected already @@ -139,9 +135,7 @@ class TelegramBareClient: self.session.save() - self._sender = MtProtoSender( - connection, self.session, constant_read=constant_read - ) + self._sender = MtProtoSender(connection, self.session) self._sender.unhandled_callbacks = self._update_callbacks self._sender.connect() @@ -293,11 +287,15 @@ class TelegramBareClient: # region Invoking Telegram requests - def invoke(self, request, updates=None): + def invoke(self, request, updates=None, call_receive=True): """Invokes (sends) a MTProtoRequest and returns (receives) its result. If 'updates' is not None, all read update object will be put in such list. Otherwise, update objects will be ignored. + + If 'call_receive' is set to False, then there should be another + thread calling to 'self._sender.receive()' running or this method + will lock forever. """ if not isinstance(request, TLObject) and not request.content_related: raise ValueError('You can only invoke requests, not types!') @@ -307,12 +305,12 @@ class TelegramBareClient: try: self._sender.send(request) - if self._sender.is_constant_read(): + if not call_receive: # TODO This will be slightly troublesome if we allow # switching between constant read or not on the fly. # Must also watch out for calling .read() from two places, # in which case a Lock would be required for .receive(). - request.confirm_received.wait() # TODO Optional timeout here? + request.confirm_received.wait() # TODO Socket's timeout here? else: while not request.confirm_received.is_set(): self._sender.receive() diff --git a/telethon/telegram_client.py b/telethon/telegram_client.py index c85fd0f7..ccef6d34 100644 --- a/telethon/telegram_client.py +++ b/telethon/telegram_client.py @@ -111,6 +111,10 @@ class TelegramClient(TelegramBareClient): # Uploaded files cache so subsequent calls are instant self._upload_cache = {} + # Constantly read for results and updates from within the main client + self._recv_thread = None + + # endregion # region Connecting @@ -123,6 +127,10 @@ class TelegramClient(TelegramBareClient): *args will be ignored. """ + if self._sender.is_connected(): + return + + ok = super().connect() # The main TelegramClient is the only one that will have # constant_read, since it's also the only one who receives # updates and need to be processed as soon as they occur. @@ -132,13 +140,27 @@ class TelegramClient(TelegramBareClient): # read constantly or not for updates needs to be known before hand, # and further updates won't be able to be added unless allowing to # switch the mode on the fly. - return super().connect(constant_read=True) + if ok: + self._recv_thread = Thread( + name='ReadThread', daemon=True, + target=self._recv_thread_impl + ) + self._recv_thread.start() + + return ok def disconnect(self): """Disconnects from the Telegram server and stops all the spawned threads""" + if not self._sender.is_connected(): + return + super().disconnect() + # The existing thread will close eventually, since it's + # only running while the MtProtoSender.is_connected() + self._recv_thread = None + # Also disconnect all the cached senders for sender in self._cached_clients.values(): sender.disconnect() @@ -185,7 +207,9 @@ class TelegramClient(TelegramBareClient): self._lock.acquire() # TODO Retry if 'result' is None? - return super().invoke(request) + return super().invoke( + request, call_receive=self._recv_thread is None + ) except (PhoneMigrateError, NetworkMigrateError, UserMigrateError) as e: self._logger.debug('DC error when invoking request, ' @@ -876,3 +900,23 @@ class TelegramClient(TelegramBareClient): return self._update_callbacks[:] # endregion + + # Constant read + + # By using this approach, another thread will be + # created and started upon connection to constantly read + # from the other end. Otherwise, manual calls to .receive() + # must be performed. The MtProtoSender cannot be connected, + # or an error will be thrown. + # + # This way, sending and receiving will be completely independent. + def _recv_thread_impl(self): + while self._sender.is_connected(): + try: + self._sender.receive() + print('got one') + except TimeoutError: + # No problem. + pass + + # endregion