From 0a693c705a13c0e5146296768a7fd6c3d876a0b1 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Fri, 29 Sep 2017 19:55:14 +0200 Subject: [PATCH] Create a new connection when called from a different thread This allows to invoke several requests in parallel while not waiting for other requests to be written to the network. --- telethon/telegram_bare_client.py | 17 ++++++++++----- telethon/telegram_client.py | 36 ++++++++++++++++++++++++++++---- 2 files changed, 44 insertions(+), 9 deletions(-) diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index f4d3d8da..a6d78c14 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -290,7 +290,7 @@ class TelegramBareClient: # region Invoking Telegram requests - def invoke(self, *requests, call_receive=True, retries=5): + def invoke(self, *requests, call_receive=True, retries=5, sender=None): """Invokes (sends) a MTProtoRequest and returns (receives) its result. If 'updates' is not None, all read update object will be put @@ -307,13 +307,16 @@ class TelegramBareClient: if retries <= 0: raise ValueError('Number of retries reached 0.') + if sender is None: + sender = self._sender + try: # Ensure that we start with no previous errors (i.e. resending) for x in requests: x.confirm_received.clear() x.rpc_error = None - self._sender.send(*requests) + sender.send(*requests) if not call_receive: # TODO This will be slightly troublesome if we allow # switching between constant read or not on the fly. @@ -321,11 +324,11 @@ class TelegramBareClient: # in which case a Lock would be required for .receive(). for x in requests: x.confirm_received.wait( - self._sender.connection.get_timeout() + sender.connection.get_timeout() ) else: while not all(x.confirm_received.is_set() for x in requests): - self._sender.receive(update_state=self.updates) + sender.receive(update_state=self.updates) except TimeoutError: pass # We will just retry @@ -333,9 +336,13 @@ class TelegramBareClient: except ConnectionResetError: self._logger.debug('Server disconnected us. Reconnecting and ' 'resending request...') - self._reconnect() + if sender != self._sender: + sender.connect() + else: + self._reconnect() except FloodWaitError: + sender.disconnect() self.disconnect() raise diff --git a/telethon/telegram_client.py b/telethon/telegram_client.py index 6092c70c..4c686c83 100644 --- a/telethon/telegram_client.py +++ b/telethon/telegram_client.py @@ -18,7 +18,7 @@ from .errors import ( PhoneMigrateError, NetworkMigrateError, UserMigrateError, PhoneCodeExpiredError, PhoneCodeHashEmptyError, PhoneCodeInvalidError ) -from .network import ConnectionMode +from .network import Connection, ConnectionMode, MtProtoSender from .tl import Session, TLObject from .tl.functions import PingRequest from .tl.functions.account import ( @@ -146,6 +146,11 @@ class TelegramClient(TelegramBareClient): # Constantly read for results and updates from within the main client self._recv_thread = None + # Identifier of the main thread (the one that called .connect()). + # This will be used to create new connections from any other thread, + # so that requests can be sent in parallel. + self._main_thread_ident = None + # Default PingRequest delay self._last_ping = datetime.now() self._ping_delay = timedelta(minutes=1) @@ -162,6 +167,8 @@ class TelegramClient(TelegramBareClient): exported_auth is meant for internal purposes and can be ignored. """ + self._main_thread_ident = threading.get_ident() + if socks and self._recv_thread: # Treat proxy errors specially since they're not related to # Telegram itself, but rather to the proxy. If any happens on @@ -246,23 +253,40 @@ class TelegramClient(TelegramBareClient): """ # This is only valid when the read thread is reconnecting, # that is, the connection lock is locked. - if self._on_read_thread() and not self._connect_lock.locked(): + on_read_thread = self._on_read_thread() + if on_read_thread and not self._connect_lock.locked(): return # Just ignore, we would be raising and crashing the thread self.updates.check_error() + # Determine the sender to be used (main or a new connection) + # TODO Polish this so it's nicer + on_main_thread = threading.get_ident() == self._main_thread_ident + if on_main_thread or on_read_thread: + sender = self._sender + else: + conn = Connection( + self.session.server_address, self.session.port, + mode=self._sender.connection._mode, + proxy=self._sender.connection.conn.proxy, + timeout=self._sender.connection.get_timeout() + ) + sender = MtProtoSender(self.session, conn) + sender.connect() + try: # We should call receive from this thread if there's no background # thread reading or if the server disconnected us and we're trying # to reconnect. This is because the read thread may either be # locked also trying to reconnect or we may be said thread already. - call_receive = \ + call_receive = not on_main_thread or \ self._recv_thread is None or self._connect_lock.locked() return super().invoke( *requests, call_receive=call_receive, - retries=kwargs.get('retries', 5) + retries=kwargs.get('retries', 5), + sender=sender ) except (PhoneMigrateError, NetworkMigrateError, UserMigrateError) as e: @@ -284,6 +308,10 @@ class TelegramClient(TelegramBareClient): while self._user_connected and not self._reconnect(): sleep(0.1) # Retry forever until we can send the request + finally: + if sender != self._sender: + sender.disconnect() + # Let people use client(SomeRequest()) instead client.invoke(...) __call__ = invoke