diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index 3a6a44f2..a589fa9e 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -33,6 +33,7 @@ from .tl.functions.upload import ( GetFileRequest, SaveBigFilePartRequest, SaveFilePartRequest ) from .tl.types import InputFile, InputFileBig +from .tl.types.auth import ExportedAuthorization from .tl.types.upload import FileCdnRedirect from .update_state import UpdateState from .utils import get_appropriated_part_size @@ -62,7 +63,7 @@ class TelegramBareClient: __version__ = '0.15.3' # TODO Make this thread-safe, all connections share the same DC - _dc_options = None + _config = None # Server configuration (with .dc_options) # region Initialization @@ -161,7 +162,7 @@ class TelegramBareClient: # region Connecting - def connect(self, _exported_auth=None, _sync_updates=True, _cdn=False): + def connect(self, _sync_updates=True): """Connects to the Telegram servers, executing authentication if required. Note that authenticating to the Telegram servers is not the same as authenticating the desired user itself, which @@ -169,62 +170,24 @@ class TelegramBareClient: Note that the optional parameters are meant for internal use. - If '_exported_auth' is not None, it will be used instead to - determine the authorization key for the current session. - If '_sync_updates', sync_updates() will be called and a second thread will be started if necessary. Note that this will FAIL if the client is not connected to the user's native data center, raising a "UserMigrateError", and calling .disconnect() in the process. - - If '_cdn' is False, methods that are not allowed on such data - centers won't be invoked. """ self._main_thread_ident = threading.get_ident() self._background_error = None # Clear previous errors try: self._sender.connect() - if not self.session.auth_key: - # New key, we need to tell the server we're going to use - # the latest layer - try: - self.session.auth_key, self.session.time_offset = \ - authenticator.do_authentication(self._sender.connection) - except BrokenAuthKeyError: - return False - - self.session.layer = LAYER - self.session.save() - init_connection = True - else: - init_connection = self.session.layer != LAYER - - if init_connection: - if _exported_auth is not None: - self._init_connection(ImportAuthorizationRequest( - _exported_auth.id, _exported_auth.bytes - )) - elif not _cdn: - TelegramBareClient._dc_options = \ - self._init_connection(GetConfigRequest()).dc_options - - elif _exported_auth is not None: - self(ImportAuthorizationRequest( - _exported_auth.id, _exported_auth.bytes - )) - - if TelegramBareClient._dc_options is None and not _cdn: - TelegramBareClient._dc_options = \ - self(GetConfigRequest()).dc_options # Connection was successful! Try syncing the update state # UNLESS '_sync_updates' is False (we probably are in # another data center and this would raise UserMigrateError) # to also assert whether the user is logged in or not. self._user_connected = True - if self._authorized is None and _sync_updates and not _cdn: + if self._authorized is None and _sync_updates: try: self.sync_updates() self._set_connected_and_authorized() @@ -239,11 +202,7 @@ class TelegramBareClient: # This is fine, probably layer migration self._logger.debug('Found invalid item, probably migrating', e) self.disconnect() - return self.connect( - _exported_auth=_exported_auth, - _sync_updates=_sync_updates, - _cdn=_cdn - ) + return self.connect(_sync_updates=_sync_updates) except (RPCError, ConnectionError) as error: # Probably errors from the previous session, ignore them @@ -256,8 +215,9 @@ class TelegramBareClient: def is_connected(self): return self._sender.is_connected() - def _init_connection(self, query=None): - result = self(InvokeWithLayerRequest(LAYER, InitConnectionRequest( + def _wrap_init_connection(self, query): + """Wraps query around InvokeWithLayerRequest(InitConnectionRequest())""" + return InvokeWithLayerRequest(LAYER, InitConnectionRequest( api_id=self.api_id, device_model=self.session.device_model, system_version=self.session.system_version, @@ -266,10 +226,7 @@ class TelegramBareClient: system_lang_code=self.session.system_lang_code, lang_pack='', # "langPacks are for official apps only" query=query - ))) - self.session.layer = LAYER - self.session.save() - return result + )) def disconnect(self): """Disconnects from the Telegram server @@ -308,13 +265,18 @@ class TelegramBareClient: except ConnectionResetError: return False else: - self.disconnect() - self.session.auth_key = None # Force creating new auth_key + # Since we're reconnecting possibly due to a UserMigrateError, + # we need to first know the Data Centers we can connect to. Do + # that before disconnecting. dc = self._get_dc(new_dc) - ip = dc.ip_address - self.session.server_address = ip + + self.session.server_address = dc.ip_address self.session.port = dc.port + # auth_key's are associated with a server, which has now changed + # so it's not valid anymore. Set to None to force recreating it. + self.session.auth_key = None self.session.save() + self.disconnect() return self.connect() # endregion @@ -327,10 +289,8 @@ class TelegramBareClient: def _get_dc(self, dc_id, ipv6=False, cdn=False): """Gets the Data Center (DC) associated to 'dc_id'""" - if TelegramBareClient._dc_options is None: - raise ConnectionError( - 'Cannot determine the required data center IP address. ' - 'Stabilise a successful initial connection first.') + if not TelegramBareClient._config: + TelegramBareClient._config = self(GetConfigRequest()) try: if cdn: @@ -339,15 +299,15 @@ class TelegramBareClient: rsa.add_key(pk.public_key) return next( - dc for dc in TelegramBareClient._dc_options if dc.id == dc_id - and bool(dc.ipv6) == ipv6 and bool(dc.cdn) == cdn + dc for dc in TelegramBareClient._config.dc_options + if dc.id == dc_id and bool(dc.ipv6) == ipv6 and bool(dc.cdn) == cdn ) except StopIteration: if not cdn: raise # New configuration, perhaps a new CDN was added? - TelegramBareClient._dc_options = self(GetConfigRequest()).dc_options + TelegramBareClient._config = self(GetConfigRequest()) return self._get_dc(dc_id, ipv6=ipv6, cdn=cdn) def _get_exported_client(self, dc_id): @@ -387,7 +347,14 @@ class TelegramBareClient: proxy=self._sender.connection.conn.proxy, timeout=self._sender.connection.get_timeout() ) - client.connect(_exported_auth=export_auth, _sync_updates=False) + client.connect(_sync_updates=False) + if isinstance(export_auth, ExportedAuthorization): + client(ImportAuthorizationRequest( + id=export_auth.id, bytes=export_auth.bytes + )) + elif export_auth is not None: + self._logger.warning('Unknown return export_auth type', export_auth) + client._authorized = True # We exported the auth, so we got auth return client @@ -409,9 +376,10 @@ class TelegramBareClient: # This will make use of the new RSA keys for this specific CDN. # - # This relies on the fact that TelegramBareClient._dc_options is - # static and it won't be called from this DC (it would fail). - client.connect(_cdn=True) # Avoid invoking non-CDN specific methods + # We won't be calling GetConfigRequest because it's only called + # when needed by ._get_dc, and also it's static so it's likely + # set already. Avoid invoking non-CDN methods by not syncing updates. + client.connect(_sync_updates=False) client._authorized = self._authorized return client @@ -472,12 +440,34 @@ class TelegramBareClient: invoke = __call__ def _invoke(self, sender, call_receive, update_state, *requests): + # We need to specify the new layer (by initializing a new + # connection) if it has changed from the latest known one. + init_connection = self.session.layer != LAYER + try: # Ensure that we start with no previous errors (i.e. resending) for x in requests: x.confirm_received.clear() x.rpc_error = None + if not self.session.auth_key: + # New key, we need to tell the server we're going to use + # the latest layer and initialize the connection doing so. + self.session.auth_key, self.session.time_offset = \ + authenticator.do_authentication(self._sender.connection) + init_connection = True + + if init_connection: + if len(requests) == 1: + requests = [self._wrap_init_connection(requests[0])] + else: + # We need a SINGLE request (like GetConfig) to init conn. + # Once that's done, the N original requests will be + # invoked. + TelegramBareClient._config = self( + self._wrap_init_connection(GetConfigRequest()) + ) + sender.send(*requests) if not call_receive: @@ -493,6 +483,10 @@ class TelegramBareClient: while not all(x.confirm_received.is_set() for x in requests): sender.receive(update_state=update_state) + except BrokenAuthKeyError: + self._logger.error('Broken auth key, a new one will be generated') + self.session.auth_key = None + except TimeoutError: pass # We will just retry @@ -513,6 +507,12 @@ class TelegramBareClient: sleep(0.1) # Retry forever until we can send the request return None + if init_connection: + # We initialized the connection successfully, even if + # a request had an RPC error we have invoked it fine. + self.session.layer = LAYER + self.session.save() + try: raise next(x.rpc_error for x in requests if x.rpc_error) except StopIteration: