From 097d1669b26f7d4e2b0afe4fabb16ab8bc1847bc Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Tue, 30 May 2017 13:03:14 +0200 Subject: [PATCH] Allow invoking requests on different DCs (#53) --- telethon/telegram_client.py | 112 +++++++++++++++++++++++++++++++++++- 1 file changed, 110 insertions(+), 2 deletions(-) diff --git a/telethon/telegram_client.py b/telethon/telegram_client.py index 21b1ba03..81ee707e 100644 --- a/telethon/telegram_client.py +++ b/telethon/telegram_client.py @@ -90,6 +90,12 @@ class TelegramClient: self._updates_thread_running = Event() self._updates_thread_receiving = Event() + # Cache "exported" senders 'dc_id: MtProtoSender' and + # their corresponding sessions not to recreate them all + # the time since it's a (somewhat expensive) process. + self._cached_senders = {} + self._cached_sessions = {} + # These will be set later self._updates_thread = None self.dc_options = None @@ -156,6 +162,13 @@ class TelegramClient: self.transport.close() self.transport = None + # Also disconnect all the cached senders + for sender in self._cached_senders: + sender.disconnect() + + self._cached_senders.clear() + self._cached_sessions.clear() + def reconnect(self): """Disconnects and connects again (effectively reconnecting)""" self.disconnect() @@ -187,6 +200,73 @@ class TelegramClient: return next(dc for dc in self.dc_options if dc.id == dc_id) + def _get_exported_sender(self, dc_id, init_connection=False): + """Gets a cached exported MtProtoSender for the desired DC. + + If it's the first time retrieving the MtProtoSender, the + current authorization is exported to the new DC so that + it can be used there, and the connection is initialized. + + If after using the sender a ConnectionResetError is rose, + this method should be called again with init_connection=True + in order to perform the reconnection.""" + # Thanks badoualy/kotlogram on /telegram/api/DefaultTelegramClient.kt + # for clearly showing how to export the authorization! ^^ + + sender = self._cached_senders.get(dc_id) + session = self._cached_sessions.get(dc_id) + + if sender and session: + if init_connection: + # TODO reconnect + pass + + return sender + else: + from telethon.tl.functions.auth import \ + ExportAuthorizationRequest, ImportAuthorizationRequest + + dc = self._get_dc(dc_id) + + # Step 1. Export the current authorization to the new DC. + export_auth = self.invoke(ExportAuthorizationRequest(dc_id)) + + # Step 2. Create a transport connected to the new DC. + # We also create a temporary session because + # it's what will contain the required AuthKey + # for MtProtoSender to work. + transport = TcpTransport(dc.ip_address, dc.port, proxy=self.proxy) + session = Session(None) + session.auth_key, session.time_offset = \ + authenticator.do_authentication(transport) + + # Step 3. After authenticating on the new DC, + # we can create the proper MtProtoSender. + sender = MtProtoSender(transport, session) + sender.connect() + + # InvokeWithLayer(InitConnection(ImportAuthorization(...))) + init_connection = InitConnectionRequest( + api_id=self.api_id, + device_model=platform.node(), + system_version=platform.system(), + app_version=self.__version__, + lang_code='en', + query=ImportAuthorizationRequest( + export_auth.id, export_auth.bytes) + ) + query = InvokeWithLayerRequest(layer=layer, query=init_connection) + + sender.send(query) + sender.receive(query) + + # Step 4. We're connected and using the desired layer! + # Don't go through this expensive process every time. + self._cached_senders[dc_id] = sender + self._cached_sessions[dc_id] = session + + return sender + # endregion # region Telegram requests functions @@ -222,8 +302,13 @@ class TelegramClient: if throw_invalid_dc: raise - self._reconnect_to_dc(error.new_dc) - return self.invoke(request, timeout=timeout, throw_invalid_dc=True) + if error.message.startswith('FILE_MIGRATE_'): + return self.invoke_on_dc(request, error.new_dc, + timeout=timeout) + else: + self._reconnect_to_dc(error.new_dc) + return self.invoke(request, + timeout=timeout, throw_invalid_dc=True) except ConnectionResetError: self._logger.info('Server disconnected us. Reconnecting and ' @@ -239,6 +324,29 @@ class TelegramClient: finally: self._lock.release() + def invoke_on_dc(self, request, dc_id, + timeout=timedelta(seconds=5), reconnect=False): + """Invokes the given request on a different DC + by making use of the exported MtProtoSenders. + + If 'reconnect=True', then the a reconnection will be performed + and ConnectionResetError will be rose if it occurs a second time. + """ + try: + sender = self._get_exported_sender( + dc_id, init_connection=reconnect) + + sender.send(request) + sender.receive(request) + return request.result + + except ConnectionResetError: + if reconnect: + raise + else: + return self.invoke_on_dc(request, dc_id, + timeout=timeout, reconnect=True) + # region Authorization requests def is_user_authorized(self):