From 09272ef6fcd6c0b4915875d250d9f66946e8190a Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Thu, 24 Aug 2017 13:02:48 +0200 Subject: [PATCH] Support connecting and downloading encrypted files from CDNs (#208) --- telethon/telegram_bare_client.py | 123 +++++++++++++++++++++++++------ 1 file changed, 101 insertions(+), 22 deletions(-) diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index c10dfe79..e6a6f44b 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -1,23 +1,26 @@ import logging +import pyaes from datetime import timedelta from hashlib import md5 from os import path from io import BytesIO # Import some externalized utilities to work with the Telegram types and more -from telethon.tl.functions import PingRequest - from . import helpers as utils from .errors import ( RPCError, FloodWaitError, FileMigrateError, TypeNotFoundError ) from .network import authenticator, MtProtoSender, TcpTransport from .utils import get_appropriated_part_size +from .crypto import AES +from .crypto import rsa # For sending and receiving requests from .tl import TLObject, JsonSession from .tl.all_tlobjects import layer -from .tl.functions import (InitConnectionRequest, InvokeWithLayerRequest) +from .tl.functions import ( + InitConnectionRequest, InvokeWithLayerRequest, PingRequest +) # Initial request from .tl.functions.help import GetConfigRequest @@ -27,11 +30,15 @@ from .tl.functions.auth import ( # Easier access for working with media from .tl.functions.upload import ( - GetFileRequest, SaveBigFilePartRequest, SaveFilePartRequest + GetFileRequest, GetCdnFileRequest, ReuploadCdnFileRequest, + SaveBigFilePartRequest, SaveFilePartRequest ) +from .tl.functions.help import GetCdnConfigRequest + # All the types we need to work with from .tl.types import InputFile, InputFileBig +from .tl.types.upload import FileCdnRedirect, CdnFileReuploadNeeded class TelegramBareClient: @@ -85,7 +92,7 @@ class TelegramBareClient: # region Connecting - def connect(self, exported_auth=None): + def connect(self, exported_auth=None, initial_query=None): """Connects to the Telegram servers, executing authentication if required. Note that authenticating to the Telegram servers is not the same as authenticating the desired user itself, which @@ -93,13 +100,20 @@ class TelegramBareClient: If 'exported_auth' is not None, it will be used instead to determine the authorization key for the current session. + + If 'initial_query' is not None, it will override the default + 'GetConfigRequest()', and its result will be returned ONLY + if the client wasn't connected already. """ if self._sender and self._sender.is_connected(): # Try sending a ping to make sure we're connected already # TODO Maybe there's a better way to check this try: - self(PingRequest(utils.generate_random_long())) - return True + if initial_query is None: + self(PingRequest(utils.generate_random_long())) + return True + else: + return self(initial_query) except: # If ping failed, ensure we're disconnected first self.disconnect() @@ -122,7 +136,7 @@ class TelegramBareClient: # Now it's time to send an InitConnectionRequest # This must always be invoked with the layer we'll be using if exported_auth is None: - query = GetConfigRequest() + query = initial_query if initial_query else GetConfigRequest() else: query = ImportAuthorizationRequest( exported_auth.id, exported_auth.bytes) @@ -141,26 +155,30 @@ class TelegramBareClient: layer=layer, query=request )) - if exported_auth is not None: - result = self(GetConfigRequest()) + if initial_query is None: + if exported_auth is not None: + result = self(GetConfigRequest()) - # We're only interested in the DC options, - # although many other options are available! - self.dc_options = result.dc_options - return True + # We're only interested in the DC options, + # although many other options are available! + self.dc_options = result.dc_options + return True + else: + return result except TypeNotFoundError as e: # This is fine, probably layer migration self._logger.debug('Found invalid item, probably migrating', e) self.disconnect() - return self.connect(exported_auth=exported_auth) + return self.connect(exported_auth=exported_auth, + initial_query=initial_query) except (RPCError, ConnectionError) as error: # Probably errors from the previous session, ignore them self.disconnect() self._logger.debug('Could not stabilise initial connection: {}' .format(error)) - return False + return None if initial_query else False def disconnect(self): """Disconnects from the Telegram server""" @@ -213,14 +231,25 @@ class TelegramBareClient: # region Working with different Data Centers - def _get_dc(self, dc_id): + def _get_dc(self, dc_id, cdn=False): """Gets the Data Center (DC) associated to 'dc_id'""" if not self.dc_options: raise ConnectionError( 'Cannot determine the required data center IP address. ' 'Stabilise a successful initial connection first.') - return next(dc for dc in self.dc_options if dc.id == dc_id) + try: + return next(dc for dc in self.dc_options + if dc.id == dc_id and bool(dc.cdn) == cdn) + except StopIteration: + if not cdn: + raise + + for pk in self(GetCdnConfigRequest()).public_keys: + rsa.add_key(pk.public_key) + + self.dc_options = self(GetConfigRequest()).dc_options + return self._get_dc(dc_id, cdn=cdn) def _get_exported_client(self, dc_id, init_connection=False, @@ -270,6 +299,21 @@ class TelegramBareClient: self._cached_clients[dc_id] = client return client + def _get_cdn_client(self, dc_id, query): + """_get_exported_client counterpart for CDNs. + Returns a tuple of (client, query result) + """ + dc = self._get_dc(dc_id, cdn=True) + session = JsonSession(self.session) + session.server_address = dc.ip_address + session.port = dc.port + client = TelegramBareClient( + session, self.api_id, self.api_hash, + timeout=self._timeout + ) + # This will make use of the new RSA keys for this specific CDN + return client, client.connect(initial_query=query) + # endregion # region Invoking Telegram requests @@ -293,7 +337,7 @@ class TelegramBareClient: except ConnectionResetError: self._logger.debug('Server disconnected us. Reconnecting and ' - 'resending request...') + 'resending request...') self.reconnect() return self.invoke(request) @@ -414,6 +458,7 @@ class TelegramBareClient: takes two parameters, (bytes_downloaded, total_bytes). Note that 'total_bytes' simply equals 'file_size', and may be None. """ + # TODO Clean up this CDN mess if not part_size_kb: if not file_size: part_size_kb = 64 # Reasonable default @@ -436,12 +481,40 @@ class TelegramBareClient: try: offset_index = 0 + cdn_redirect = None while True: offset = offset_index * part_size try: - result = client( - GetFileRequest(input_location, offset, part_size)) + if cdn_redirect: + result = client(GetCdnFileRequest( + cdn_redirect.file_token, offset, part_size + )) + else: + result = client(GetFileRequest( + input_location, offset, part_size + )) + + if isinstance(result, FileCdnRedirect): + # https://core.telegram.org/cdn + cdn_redirect = result + client, cdn_file = self._get_cdn_client( + result.dc_id, + GetCdnFileRequest( + cdn_redirect.file_token, offset, part_size + ) + ) + + if isinstance(cdn_file, CdnFileReuploadNeeded): + # We need to use the original client here + self(ReuploadCdnFileRequest( + file_token=cdn_redirect.file_token, + request_token=cdn_file.request_token + )) + # TODO else: we have the first file bytes already, + # avoid a redundant call + continue + except FileMigrateError as e: client = self._get_exported_client(e.new_dc) continue @@ -451,7 +524,13 @@ class TelegramBareClient: # If we have received no data (0 bytes), the file is over # So there is nothing left to download and write if not result.bytes: - return result.type # Return some extra information + # Return some extra information, unless it's a cdn file + return getattr(result, 'type', '') + + if cdn_redirect: + # We first need to decrypt the result + # TODO Decrypt the file, and use libssl if available + pass f.write(result.bytes) if progress_callback: