Support connecting and downloading encrypted files from CDNs (#208)

This commit is contained in:
Lonami Exo 2017-08-24 13:02:48 +02:00
parent ba32d6f270
commit 09272ef6fc

View File

@ -1,23 +1,26 @@
import logging
import pyaes
from datetime import timedelta
from hashlib import md5
from os import path
from io import BytesIO
# Import some externalized utilities to work with the Telegram types and more
from telethon.tl.functions import PingRequest
from . import helpers as utils
from .errors import (
RPCError, FloodWaitError, FileMigrateError, TypeNotFoundError
)
from .network import authenticator, MtProtoSender, TcpTransport
from .utils import get_appropriated_part_size
from .crypto import AES
from .crypto import rsa
# For sending and receiving requests
from .tl import TLObject, JsonSession
from .tl.all_tlobjects import layer
from .tl.functions import (InitConnectionRequest, InvokeWithLayerRequest)
from .tl.functions import (
InitConnectionRequest, InvokeWithLayerRequest, PingRequest
)
# Initial request
from .tl.functions.help import GetConfigRequest
@ -27,11 +30,15 @@ from .tl.functions.auth import (
# Easier access for working with media
from .tl.functions.upload import (
GetFileRequest, SaveBigFilePartRequest, SaveFilePartRequest
GetFileRequest, GetCdnFileRequest, ReuploadCdnFileRequest,
SaveBigFilePartRequest, SaveFilePartRequest
)
from .tl.functions.help import GetCdnConfigRequest
# All the types we need to work with
from .tl.types import InputFile, InputFileBig
from .tl.types.upload import FileCdnRedirect, CdnFileReuploadNeeded
class TelegramBareClient:
@ -85,7 +92,7 @@ class TelegramBareClient:
# region Connecting
def connect(self, exported_auth=None):
def connect(self, exported_auth=None, initial_query=None):
"""Connects to the Telegram servers, executing authentication if
required. Note that authenticating to the Telegram servers is
not the same as authenticating the desired user itself, which
@ -93,13 +100,20 @@ class TelegramBareClient:
If 'exported_auth' is not None, it will be used instead to
determine the authorization key for the current session.
If 'initial_query' is not None, it will override the default
'GetConfigRequest()', and its result will be returned ONLY
if the client wasn't connected already.
"""
if self._sender and self._sender.is_connected():
# Try sending a ping to make sure we're connected already
# TODO Maybe there's a better way to check this
try:
self(PingRequest(utils.generate_random_long()))
return True
if initial_query is None:
self(PingRequest(utils.generate_random_long()))
return True
else:
return self(initial_query)
except:
# If ping failed, ensure we're disconnected first
self.disconnect()
@ -122,7 +136,7 @@ class TelegramBareClient:
# Now it's time to send an InitConnectionRequest
# This must always be invoked with the layer we'll be using
if exported_auth is None:
query = GetConfigRequest()
query = initial_query if initial_query else GetConfigRequest()
else:
query = ImportAuthorizationRequest(
exported_auth.id, exported_auth.bytes)
@ -141,26 +155,30 @@ class TelegramBareClient:
layer=layer, query=request
))
if exported_auth is not None:
result = self(GetConfigRequest())
if initial_query is None:
if exported_auth is not None:
result = self(GetConfigRequest())
# We're only interested in the DC options,
# although many other options are available!
self.dc_options = result.dc_options
return True
# We're only interested in the DC options,
# although many other options are available!
self.dc_options = result.dc_options
return True
else:
return result
except TypeNotFoundError as e:
# This is fine, probably layer migration
self._logger.debug('Found invalid item, probably migrating', e)
self.disconnect()
return self.connect(exported_auth=exported_auth)
return self.connect(exported_auth=exported_auth,
initial_query=initial_query)
except (RPCError, ConnectionError) as error:
# Probably errors from the previous session, ignore them
self.disconnect()
self._logger.debug('Could not stabilise initial connection: {}'
.format(error))
return False
return None if initial_query else False
def disconnect(self):
"""Disconnects from the Telegram server"""
@ -213,14 +231,25 @@ class TelegramBareClient:
# region Working with different Data Centers
def _get_dc(self, dc_id):
def _get_dc(self, dc_id, cdn=False):
"""Gets the Data Center (DC) associated to 'dc_id'"""
if not self.dc_options:
raise ConnectionError(
'Cannot determine the required data center IP address. '
'Stabilise a successful initial connection first.')
return next(dc for dc in self.dc_options if dc.id == dc_id)
try:
return next(dc for dc in self.dc_options
if dc.id == dc_id and bool(dc.cdn) == cdn)
except StopIteration:
if not cdn:
raise
for pk in self(GetCdnConfigRequest()).public_keys:
rsa.add_key(pk.public_key)
self.dc_options = self(GetConfigRequest()).dc_options
return self._get_dc(dc_id, cdn=cdn)
def _get_exported_client(self, dc_id,
init_connection=False,
@ -270,6 +299,21 @@ class TelegramBareClient:
self._cached_clients[dc_id] = client
return client
def _get_cdn_client(self, dc_id, query):
"""_get_exported_client counterpart for CDNs.
Returns a tuple of (client, query result)
"""
dc = self._get_dc(dc_id, cdn=True)
session = JsonSession(self.session)
session.server_address = dc.ip_address
session.port = dc.port
client = TelegramBareClient(
session, self.api_id, self.api_hash,
timeout=self._timeout
)
# This will make use of the new RSA keys for this specific CDN
return client, client.connect(initial_query=query)
# endregion
# region Invoking Telegram requests
@ -293,7 +337,7 @@ class TelegramBareClient:
except ConnectionResetError:
self._logger.debug('Server disconnected us. Reconnecting and '
'resending request...')
'resending request...')
self.reconnect()
return self.invoke(request)
@ -414,6 +458,7 @@ class TelegramBareClient:
takes two parameters, (bytes_downloaded, total_bytes). Note that
'total_bytes' simply equals 'file_size', and may be None.
"""
# TODO Clean up this CDN mess
if not part_size_kb:
if not file_size:
part_size_kb = 64 # Reasonable default
@ -436,12 +481,40 @@ class TelegramBareClient:
try:
offset_index = 0
cdn_redirect = None
while True:
offset = offset_index * part_size
try:
result = client(
GetFileRequest(input_location, offset, part_size))
if cdn_redirect:
result = client(GetCdnFileRequest(
cdn_redirect.file_token, offset, part_size
))
else:
result = client(GetFileRequest(
input_location, offset, part_size
))
if isinstance(result, FileCdnRedirect):
# https://core.telegram.org/cdn
cdn_redirect = result
client, cdn_file = self._get_cdn_client(
result.dc_id,
GetCdnFileRequest(
cdn_redirect.file_token, offset, part_size
)
)
if isinstance(cdn_file, CdnFileReuploadNeeded):
# We need to use the original client here
self(ReuploadCdnFileRequest(
file_token=cdn_redirect.file_token,
request_token=cdn_file.request_token
))
# TODO else: we have the first file bytes already,
# avoid a redundant call
continue
except FileMigrateError as e:
client = self._get_exported_client(e.new_dc)
continue
@ -451,7 +524,13 @@ class TelegramBareClient:
# If we have received no data (0 bytes), the file is over
# So there is nothing left to download and write
if not result.bytes:
return result.type # Return some extra information
# Return some extra information, unless it's a cdn file
return getattr(result, 'type', '')
if cdn_redirect:
# We first need to decrypt the result
# TODO Decrypt the file, and use libssl if available
pass
f.write(result.bytes)
if progress_callback: