From f9cd220ddd9a0e37afbca4308f62434842d5e568 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Mon, 11 Jun 2018 20:05:10 +0200 Subject: [PATCH] Implement _get_exported_sender --- telethon/client/downloads.py | 40 ++++------- telethon/client/telegrambaseclient.py | 96 +++++++++++---------------- 2 files changed, 49 insertions(+), 87 deletions(-) diff --git a/telethon/client/downloads.py b/telethon/client/downloads.py index 80b5b1e6..9fe65ec0 100644 --- a/telethon/client/downloads.py +++ b/telethon/client/downloads.py @@ -199,9 +199,8 @@ class DownloadMethods(UserMethods): else: f = file - # The used client will change if FileMigrateError occurs - client = self - cdn_decrypter = None + # The used sender will change if ``FileMigrateError`` occurs + sender = self._sender input_location = utils.get_input_location(input_location) __log__.info('Downloading file in chunks of %d bytes', part_size) @@ -209,47 +208,32 @@ class DownloadMethods(UserMethods): offset = 0 while True: try: - if cdn_decrypter: - result = cdn_decrypter.get_file() - else: - result = client(functions.upload.GetFileRequest( - input_location, offset, part_size - )) - - if isinstance(result, types.upload.FileCdnRedirect): - __log__.info('File lives in a CDN') - cdn_decrypter, result = \ - await CdnDecrypter.prepare_decrypter( - client, await self._get_cdn_client(result), - result - ) - + result = await sender.send(functions.upload.GetFileRequest( + input_location, offset, part_size + )) + if isinstance(result, types.upload.FileCdnRedirect): + # TODO Implement + raise NotImplementedError except errors.FileMigrateError as e: __log__.info('File lives in another DC') - client = await self._get_exported_client(e.new_dc) + sender = await self._get_exported_sender(e.new_dc) continue offset += part_size - - # If we have received no data (0 bytes), the file is over - # So there is nothing left to download and write if not result.bytes: - # Return some extra information, unless it's a CDN file if in_memory: f.flush() return f.getvalue() else: return getattr(result, 'type', '') + __log__.debug('Saving %d more bytes', len(result.bytes)) f.write(result.bytes) - __log__.debug('Saved %d more bytes', len(result.bytes)) if progress_callback: progress_callback(f.tell(), file_size) finally: - if client != self: - await client.disconnect() - if cdn_decrypter: - await cdn_decrypter.client.disconnect() + if sender != self._sender: + await sender.disconnect() if isinstance(file, str) or in_memory: f.close() diff --git a/telethon/client/telegrambaseclient.py b/telethon/client/telegrambaseclient.py index f57afd8b..36ba255e 100644 --- a/telethon/client/telegrambaseclient.py +++ b/telethon/client/telegrambaseclient.py @@ -151,10 +151,10 @@ class TelegramBaseClient(abc.ABC): if isinstance(connection, type): connection = connection(proxy=proxy, timeout=timeout) - # Used on connection - the user may modify these and reconnect + # Used on connection. Capture the variables in a lambda since + # exporting clients need to create this InvokeWithLayerRequest. system = platform.uname() - state = MTProtoState(self.session.auth_key) - first = functions.InvokeWithLayerRequest( + self._init_with = lambda x: functions.InvokeWithLayerRequest( LAYER, functions.InitConnectionRequest( api_id=self.api_id, device_model=device_model or system.system or 'Unknown', @@ -163,15 +163,20 @@ class TelegramBaseClient(abc.ABC): lang_code=lang_code, system_lang_code=system_lang_code, lang_pack='', # "langPacks are for official apps only" - query=functions.help.GetConfigRequest() + query=x ) ) - self._sender = MTProtoSender(state, connection, first_query=first) - # Cache "exported" sessions as 'dc_id: Session' not to recreate - # them all the time since generating a new key is a relatively - # expensive operation. - self._exported_sessions = {} + state = MTProtoState(self.session.auth_key) + self._connection = connection + self._sender = MTProtoSender( + state, connection, + first_query=self._init_with(functions.help.GetConfigRequest()) + ) + + # Cache :tl:`ExportedAuthorization` as ``dc_id: MTProtoState`` + # to easily import them when getting an exported sender. + self._exported_auths = {} # This member will process updates if enabled. # One may change self.updates.enabled at any later point. @@ -180,10 +185,6 @@ class TelegramBaseClient(abc.ABC): # Save whether the user is authorized here (a.k.a. logged in) self._authorized = None # None = We don't know yet - # The first request must be in invokeWithLayer(initConnection(X)). - # See https://core.telegram.org/api/invoking#saving-client-info. - self._first_request = True - # Default PingRequest delay self._last_ping = datetime.now() self._ping_delay = timedelta(minutes=1) @@ -279,57 +280,34 @@ class TelegramBaseClient(abc.ABC): and bool(dc.ipv6) == self._use_ipv6 and bool(dc.cdn) == cdn ) - async def _get_exported_client(self, dc_id): - """Creates and connects a new TelegramBareClient for the desired DC. - - If it's the first time calling the method with a given dc_id, - a new session will be first created, and its auth key generated. - Exporting/Importing the authorization will also be done so that - the auth is bound with the key. + async def _get_exported_sender(self, dc_id): + """ + Returns a cached `MTProtoSender` for the given `dc_id`, or creates + a new one if it doesn't exist yet, and imports a freshly exported + authorization key for it to be usable. """ - # TODO Implement - raise NotImplementedError # Thanks badoualy/kotlogram on /telegram/api/DefaultTelegramClient.kt - # for clearly showing how to export the authorization! ^^ - session = self._exported_sessions.get(dc_id) - if session: - export_auth = None # Already bound with the auth key - else: - # TODO Add a lock, don't allow two threads to create an auth key - # (when calling .connect() if there wasn't a previous session). - # for the same data center. - dc = self._get_dc(dc_id) - - # Export the current authorization to the new DC. + # for clearly showing how to export the authorization + auth = self._exported_auths.get(dc_id) + dc = await self._get_dc(dc_id) + state = MTProtoState(auth) + # TODO Don't hardcode ConnectionTcpFull() + # Can't reuse self._sender._connection as it has its own seqno. + # + # If one were to do that, Telegram would reset the connection + # with no further clues. + sender = MTProtoSender(state, ConnectionTcpFull()) + await sender.connect(dc.ip_address, dc.port) + if not auth: __log__.info('Exporting authorization for data center %s', dc) - export_auth =\ - await self(functions.auth.ExportAuthorizationRequest(dc_id)) - - # Create a temporary session for this IP address, which needs - # to be different because each auth_key is unique per DC. - # - # Construct this session with the connection parameters - # (system version, device model...) from the current one. - session = self.session.clone() - session.set_dc(dc.id, dc.ip_address, dc.port) - self._exported_sessions[dc_id] = session - - __log__.info('Creating exported new client') - client = TelegramBareClient( - session, self.api_id, self.api_hash, - proxy=self._sender.connection.conn.proxy, - timeout=self._sender.connection.get_timeout() - ) - client.connect(_sync_updates=False) - if isinstance(export_auth, ExportedAuthorization): - client(ImportAuthorizationRequest( - id=export_auth.id, bytes=export_auth.bytes + auth = await self(functions.auth.ExportAuthorizationRequest(dc_id)) + req = self._init_with(functions.auth.ImportAuthorizationRequest( + id=auth.id, bytes=auth.bytes )) - elif export_auth is not None: - __log__.warning('Unknown export auth type %s', export_auth) + await sender.send(req) + self._exported_auths[dc_id] = sender.state.auth_key - client._authorized = True # We exported the auth, so we got auth - return client + return sender async def _get_cdn_client(self, cdn_redirect): """Similar to ._get_exported_client, but for CDNs"""