diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index d027a358..f7a3933d 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -106,10 +106,10 @@ class TelegramBareClient: # we only want one to actually perform the reconnection. self._connect_lock = Lock() - # Cache "exported" senders 'dc_id: TelegramBareClient' and - # their corresponding sessions not to recreate them all - # the time since it's a (somewhat expensive) process. - self._cached_clients = {} + # Cache "exported" sessions as 'dc_id: Session' not to recreate + # them all the time since generating a new key is a relatively + # expensive operation. + self._exported_sessions = {} # This member will process updates if enabled. # One may change self.updates.enabled at any later point. @@ -154,14 +154,22 @@ class TelegramBareClient: # region Connecting - def connect(self, exported_auth=None): + def connect(self, _exported_auth=None, _sync_updates=True): """Connects to the Telegram servers, executing authentication if required. Note that authenticating to the Telegram servers is not the same as authenticating the desired user itself, which may require a call (or several) to 'sign_in' for the first time. - If 'exported_auth' is not None, it will be used instead to + Note that the optional parameters are meant for internal use. + + If '_exported_auth' is not None, it will be used instead to determine the authorization key for the current session. + + If '_sync_updates', sync_updates() will be called and a + second thread will be started if necessary. Note that this + will FAIL if the client is not connected to the user's + native data center, raising a "UserMigrateError", and + calling .disconnect() in the process. """ self._main_thread_ident = threading.get_ident() @@ -183,17 +191,17 @@ class TelegramBareClient: init_connection = self.session.layer != LAYER if init_connection: - if exported_auth is not None: + if _exported_auth is not None: self._init_connection(ImportAuthorizationRequest( - exported_auth.id, exported_auth.bytes + _exported_auth.id, _exported_auth.bytes )) else: TelegramBareClient._dc_options = \ self._init_connection(GetConfigRequest()).dc_options - elif exported_auth is not None: + elif _exported_auth is not None: self(ImportAuthorizationRequest( - exported_auth.id, exported_auth.bytes + _exported_auth.id, _exported_auth.bytes )) if TelegramBareClient._dc_options is None: @@ -201,11 +209,11 @@ class TelegramBareClient: self(GetConfigRequest()).dc_options # Connection was successful! Try syncing the update state - # IF we don't have an exported authorization (hence we're - # not in our NATIVE data center or we'd get UserMigrateError) + # UNLESS '_sync_updates' is False (we probably are in + # another data center and this would raise UserMigrateError) # to also assert whether the user is logged in or not. self._user_connected = True - if not exported_auth: + if _sync_updates: try: self.sync_updates() self._set_connected_and_authorized() @@ -218,7 +226,10 @@ class TelegramBareClient: # This is fine, probably layer migration self._logger.debug('Found invalid item, probably migrating', e) self.disconnect() - return self.connect(exported_auth=exported_auth) + return self.connect( + _exported_auth=_exported_auth, + _sync_updates=_sync_updates + ) except (RPCError, ConnectionError) as error: # Probably errors from the previous session, ignore them @@ -258,11 +269,8 @@ class TelegramBareClient: # ._recv_thread = None, it knows it doesn't have to. self._sender.disconnect() - # Also disconnect all the cached senders - for sender in self._cached_clients.values(): - sender.disconnect() - - self._cached_clients.clear() + # TODO Shall we clear the _exported_sessions, or may be reused? + pass def _reconnect(self, new_dc=None): """If 'new_dc' is not set, only a call to .connect() will be made @@ -324,30 +332,22 @@ class TelegramBareClient: TelegramBareClient._dc_options = self(GetConfigRequest()).dc_options return self._get_dc(dc_id, ipv6=ipv6, cdn=cdn) - def _get_exported_client(self, dc_id, - init_connection=False, - bypass_cache=False): - """Gets a cached exported TelegramBareClient for the desired DC. + def _get_exported_client(self, dc_id): + """Creates and connects a new TelegramBareClient for the desired DC. - If it's the first time retrieving the TelegramBareClient, the - current authorization is exported to the new DC so that - it can be used there, and the connection is initialized. - - If after using the sender a ConnectionResetError is raised, - this method should be called again with init_connection=True - in order to perform the reconnection. - - If bypass_cache is True, a new client will be exported and - it will not be cached. + If it's the first time calling the method with a given dc_id, + a new session will be first created, and its auth key generated. + Exporting/Importing the authorization will also be done so that + the auth is bound with the key. """ # Thanks badoualy/kotlogram on /telegram/api/DefaultTelegramClient.kt # for clearly showing how to export the authorization! ^^ - client = self._cached_clients.get(dc_id) - if client and not bypass_cache: - if init_connection: - client.reconnect() - return client + session = self._exported_sessions.get(dc_id) + if session: + export_auth = None # Already bound with the auth key else: + # TODO Add a lock, don't allow two threads to create an auth key + # for the same data center. dc = self._get_dc(dc_id) # Export the current authorization to the new DC. @@ -361,17 +361,15 @@ class TelegramBareClient: session = Session(self.session) session.server_address = dc.ip_address session.port = dc.port - client = TelegramBareClient( - session, self.api_id, self.api_hash, - proxy=self._sender.connection.conn.proxy, - timeout=self._sender.connection.get_timeout() - ) - client.connect(exported_auth=export_auth) + self._exported_sessions[dc_id] = session - if not bypass_cache: - # Don't go through this expensive process every time. - self._cached_clients[dc_id] = client - return client + client = TelegramBareClient( + session, self.api_id, self.api_hash, + proxy=self._sender.connection.conn.proxy, + timeout=self._sender.connection.get_timeout() + ) + client.connect(_exported_auth=export_auth, _sync_updates=False) + return client # endregion @@ -672,6 +670,9 @@ class TelegramBareClient: if progress_callback: progress_callback(f.tell(), file_size) finally: + if client != self: + client.disconnect() + if cdn_decrypter: try: cdn_decrypter.client.disconnect()