From 21eaf8bd72320179ab62fb5c3af1251822bc3f0a Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Sat, 2 Sep 2017 21:27:11 +0200 Subject: [PATCH] Allow setting whether the MtProtoSender should use constant_read --- telethon/network/mtproto_sender.py | 56 +++++++++++++++++++++++------- telethon/telegram_bare_client.py | 21 +++++++++-- telethon/telegram_client.py | 11 +++++- 3 files changed, 71 insertions(+), 17 deletions(-) diff --git a/telethon/network/mtproto_sender.py b/telethon/network/mtproto_sender.py index 8fdfdc4f..07dc5597 100644 --- a/telethon/network/mtproto_sender.py +++ b/telethon/network/mtproto_sender.py @@ -13,9 +13,22 @@ logging.getLogger(__name__).addHandler(logging.NullHandler()) class MtProtoSender: - """MTProto Mobile Protocol sender (https://core.telegram.org/mtproto/description)""" + """MTProto Mobile Protocol sender + (https://core.telegram.org/mtproto/description) + """ - def __init__(self, connection, session): + def __init__(self, connection, session, constant_read): + """Creates a new MtProtoSender configured to send messages through + 'connection' and using the parameters from 'session'. + + If 'constant_read' is set to True, another thread will be + created and started upon connection to constantly read + from the other end. Otherwise, manual calls to .receive() + must be performed. The MtProtoSender cannot be connected, + or an error will be thrown. + + This way, sending and receiving will be completely independent. + """ self.connection = connection self.session = session self._logger = logging.getLogger(__name__) @@ -30,23 +43,35 @@ class MtProtoSender: # TODO There might be a better way to handle msgs_ack requests self.logging_out = False - # Reading and writing shouldn't be related. Call .recv() forever here. - # TODO Maybe this could be disabled with some "constant_read=bool". - self._recv_thread = Thread( - name='ReadThread', daemon=True, target=self._recv_thread_impl - ) + # Will create a new _recv_thread when connecting if set + self._constant_read = constant_read + self._recv_thread = None def connect(self): """Connects to the server""" - self.connection.connect() - self._recv_thread.start() + if not self.is_connected(): + self.connection.connect() + if self._constant_read: + self._recv_thread = Thread( + name='ReadThread', daemon=True, + target=self._recv_thread_impl + ) + self._recv_thread.start() def is_connected(self): return self.connection.is_connected() def disconnect(self): """Disconnects from the server""" - self.connection.close() + if self.is_connected(): + self.connection.close() + if self._constant_read: + # The existing thread will close eventually, since it's + # only running while the MtProtoSender.is_connected() + self._recv_thread = None + + def is_constant_read(self): + return self._constant_read # region Send and receive @@ -85,13 +110,18 @@ class MtProtoSender: def _recv_thread_impl(self): while self.is_connected(): try: - self._receive_message() + self.receive() except TimeoutError: # No problem. pass - def _receive_message(self): - """Receives a single message from the connected endpoint.""" + def receive(self): + """Receives a single message from the connected endpoint. + + This method returns nothing, and will only affect other parts + of the MtProtoSender such as the updates callback being fired + or a pending request being confirmed. + """ # TODO Don't ignore updates self._logger.debug('Receiving a message...') body = self.connection.recv() diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index 418b8c09..9a9fbf69 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -91,7 +91,8 @@ class TelegramBareClient: # region Connecting - def connect(self, exported_auth=None, initial_query=None): + def connect(self, exported_auth=None, initial_query=None, + constant_read=False): """Connects to the Telegram servers, executing authentication if required. Note that authenticating to the Telegram servers is not the same as authenticating the desired user itself, which @@ -103,6 +104,9 @@ class TelegramBareClient: If 'initial_query' is not None, it will override the default 'GetConfigRequest()', and its result will be returned ONLY if the client wasn't connected already. + + The 'constant_read' parameter will be used when creating + the MtProtoSender. Refer to it for more information. """ if self._sender and self._sender.is_connected(): # Try sending a ping to make sure we're connected already @@ -129,7 +133,9 @@ class TelegramBareClient: self.session.save() - self._sender = MtProtoSender(connection, self.session) + self._sender = MtProtoSender( + connection, self.session, constant_read=constant_read + ) self._sender.connect() # Now it's time to send an InitConnectionRequest @@ -294,7 +300,16 @@ class TelegramBareClient: try: self._sender.send(request) - request.confirm_received.wait() # TODO Optional timeout here? + if self._sender.is_constant_read(): + # TODO This will be slightly troublesome if we allow + # switching between constant read or not on the fly. + # Must also watch out for calling .read() from two places, + # in which case a Lock would be required for .receive(). + request.confirm_received.wait() # TODO Optional timeout here? + else: + while not request.confirm_received.is_set(): + self._sender.receive() + if request.rpc_error: raise request.rpc_error return request.result diff --git a/telethon/telegram_client.py b/telethon/telegram_client.py index 7a3f6af0..8c34ebc6 100644 --- a/telethon/telegram_client.py +++ b/telethon/telegram_client.py @@ -131,7 +131,16 @@ class TelegramClient(TelegramBareClient): *args will be ignored. """ - return super().connect() + # The main TelegramClient is the only one that will have + # constant_read, since it's also the only one who receives + # updates and need to be processed as soon as they occur. + # + # TODO Allow to disable this to avoid the creation of a new thread + # if the user is not going to work with updates at all? Whether to + # read constantly or not for updates needs to be known before hand, + # and further updates won't be able to be added unless allowing to + # switch the mode on the fly. + return super().connect(constant_read=True) def disconnect(self): """Disconnects from the Telegram server