Let TelegramBareClient handle FileMigrateErrors instead (closes #148)

This commit is contained in:
Lonami Exo 2017-07-04 10:21:15 +02:00
parent 15673d9f77
commit 1f3aec589b
2 changed files with 75 additions and 105 deletions

View File

@ -5,22 +5,25 @@ from os import path
# Import some externalized utilities to work with the Telegram types and more # Import some externalized utilities to work with the Telegram types and more
from . import helpers as utils from . import helpers as utils
from .errors import RPCError, FloodWaitError from .errors import RPCError, FloodWaitError, FileMigrateError
from .network import authenticator, MtProtoSender, TcpTransport from .network import authenticator, MtProtoSender, TcpTransport
from .utils import get_appropriated_part_size from .utils import get_appropriated_part_size
# For sending and receiving requests # For sending and receiving requests
from .tl import MTProtoRequest from .tl import MTProtoRequest, JsonSession
from .tl.all_tlobjects import layer from .tl.all_tlobjects import layer
from .tl.functions import (InitConnectionRequest, InvokeWithLayerRequest) from .tl.functions import (InitConnectionRequest, InvokeWithLayerRequest)
# Initial request # Initial request
from .tl.functions.help import GetConfigRequest from .tl.functions.help import GetConfigRequest
from .tl.functions.auth import ImportAuthorizationRequest from .tl.functions.auth import (
ImportAuthorizationRequest, ExportAuthorizationRequest
)
# Easier access for working with media # Easier access for working with media
from .tl.functions.upload import ( from .tl.functions.upload import (
GetFileRequest, SaveBigFilePartRequest, SaveFilePartRequest) GetFileRequest, SaveBigFilePartRequest, SaveFilePartRequest
)
# All the types we need to work with # All the types we need to work with
from .tl.types import InputFile, InputFileBig from .tl.types import InputFile, InputFileBig
@ -64,6 +67,11 @@ class TelegramBareClient:
self._timeout = timeout self._timeout = timeout
self._logger = logging.getLogger(__name__) self._logger = logging.getLogger(__name__)
# Cache "exported" senders 'dc_id: TelegramBareClient' and
# their corresponding sessions not to recreate them all
# the time since it's a (somewhat expensive) process.
self._cached_clients = {}
# These will be set later # These will be set later
self.dc_options = None self.dc_options = None
self._sender = None self._sender = None
@ -125,8 +133,6 @@ class TelegramBareClient:
)) ))
if exported_auth is not None: if exported_auth is not None:
# TODO Don't actually need this for exported authorizations,
# they're only valid on such data center.
result = self(GetConfigRequest()) result = self(GetConfigRequest())
# We're only interested in the DC options, # We're only interested in the DC options,
@ -201,6 +207,54 @@ class TelegramBareClient:
return next(dc for dc in self.dc_options if dc.id == dc_id) return next(dc for dc in self.dc_options if dc.id == dc_id)
def _get_exported_client(self, dc_id,
init_connection=False,
bypass_cache=False):
"""Gets a cached exported TelegramBareClient for the desired DC.
If it's the first time retrieving the TelegramBareClient, the
current authorization is exported to the new DC so that
it can be used there, and the connection is initialized.
If after using the sender a ConnectionResetError is raised,
this method should be called again with init_connection=True
in order to perform the reconnection.
If bypass_cache is True, a new client will be exported and
it will not be cached.
"""
# Thanks badoualy/kotlogram on /telegram/api/DefaultTelegramClient.kt
# for clearly showing how to export the authorization! ^^
client = self._cached_clients.get(dc_id)
if client and not bypass_cache:
if init_connection:
client.reconnect()
return client
else:
dc = self._get_dc(dc_id)
# Export the current authorization to the new DC.
export_auth = self(ExportAuthorizationRequest(dc_id))
# Create a temporary session for this IP address, which needs
# to be different because each auth_key is unique per DC.
#
# Construct this session with the connection parameters
# (system version, device model...) from the current one.
session = JsonSession(self.session)
session.server_address = dc.ip_address
session.port = dc.port
client = TelegramBareClient(
session, self.api_id, self.api_hash,
timeout=self._timeout
)
client.connect(exported_auth=export_auth)
if not bypass_cache:
# Don't go through this expensive process every time.
self._cached_clients[dc_id] = client
return client
# endregion # endregion
# region Invoking Telegram requests # region Invoking Telegram requests
@ -341,12 +395,21 @@ class TelegramBareClient:
else: else:
f = file f = file
# The used client will change if FileMigrateError occurs
client = self
try: try:
offset_index = 0 offset_index = 0
while True: while True:
offset = offset_index * part_size offset = offset_index * part_size
result = self(
GetFileRequest(input_location, offset, part_size)) try:
result = client(
GetFileRequest(input_location, offset, part_size))
except FileMigrateError as e:
client = self._get_exported_client(e.new_dc)
continue
offset_index += 1 offset_index += 1
# If we have received no data (0 bytes), the file is over # If we have received no data (0 bytes), the file is over

View File

@ -8,8 +8,8 @@ from . import TelegramBareClient
# Import some externalized utilities to work with the Telegram types and more # Import some externalized utilities to work with the Telegram types and more
from . import helpers as utils from . import helpers as utils
from .errors import (RPCError, UnauthorizedError, InvalidParameterError, from .errors import (RPCError, UnauthorizedError, InvalidParameterError,
ReadCancelledError, FileMigrateError, PhoneMigrateError, ReadCancelledError, PhoneCodeEmptyError,
NetworkMigrateError, UserMigrateError, PhoneCodeEmptyError, PhoneMigrateError, NetworkMigrateError, UserMigrateError,
PhoneCodeExpiredError, PhoneCodeHashEmptyError, PhoneCodeExpiredError, PhoneCodeHashEmptyError,
PhoneCodeInvalidError, InvalidChecksumError) PhoneCodeInvalidError, InvalidChecksumError)
@ -123,10 +123,6 @@ class TelegramClient(TelegramBareClient):
self.session.system_lang_code = \ self.session.system_lang_code = \
system_lang_code if system_lang_code else self.session.lang_code system_lang_code if system_lang_code else self.session.lang_code
# Cache "exported" senders 'dc_id: MtProtoSender' and
# their corresponding sessions not to recreate them all
# the time since it's a (somewhat expensive) process.
self._cached_clients = {}
self._updates_thread = None self._updates_thread = None
self._phone_code_hashes = {} self._phone_code_hashes = {}
@ -162,55 +158,6 @@ class TelegramClient(TelegramBareClient):
# region Working with different connections # region Working with different connections
def _get_exported_client(self, dc_id,
init_connection=False,
bypass_cache=False):
"""Gets a cached exported TelegramBareClient for the desired DC.
If it's the first time retrieving the TelegramBareClient, the
current authorization is exported to the new DC so that
it can be used there, and the connection is initialized.
If after using the sender a ConnectionResetError is raised,
this method should be called again with init_connection=True
in order to perform the reconnection.
If bypass_cache is True, a new client will be exported and
it will not be cached.
"""
# Thanks badoualy/kotlogram on /telegram/api/DefaultTelegramClient.kt
# for clearly showing how to export the authorization! ^^
client = self._cached_clients.get(dc_id)
if client and not bypass_cache:
if init_connection:
client.reconnect()
return client
else:
dc = self._get_dc(dc_id)
# Export the current authorization to the new DC.
export_auth = self(ExportAuthorizationRequest(dc_id))
# Create a temporary session for this IP address, which needs
# to be different because each auth_key is unique per DC.
#
# Construct this session with the connection parameters
# (system version, device model...) from the current one.
session = JsonSession(self.session)
session.server_address = dc.ip_address
session.port = dc.port
client = TelegramBareClient(
session, self.api_id, self.api_hash,
timeout=self._timeout
)
client.connect(exported_auth=export_auth)
if not bypass_cache:
# Don't go through this expensive process every time.
self._cached_clients[dc_id] = client
return client
def create_new_connection(self, on_dc=None): def create_new_connection(self, on_dc=None):
"""Creates a new connection which can be used in parallel """Creates a new connection which can be used in parallel
with the original TelegramClient. A TelegramBareClient with the original TelegramClient. A TelegramBareClient
@ -222,15 +169,10 @@ class TelegramClient(TelegramBareClient):
If the client is meant to be used on a different data If the client is meant to be used on a different data
center, the data center ID should be specified instead. center, the data center ID should be specified instead.
Note that TelegramBareClients will not handle automatic
reconnection (i.e. switching to another data center to
download media), and InvalidDCError will be raised in
such case.
""" """
if on_dc is None: if on_dc is None:
client = TelegramBareClient(self.session, self.api_id, self.api_hash, client = TelegramBareClient(
proxy=self.proxy) self.session, self.api_id, self.api_hash, proxy=self.proxy)
client.connect() client.connect()
else: else:
client = self._get_exported_client(on_dc, bypass_cache=True) client = self._get_exported_client(on_dc, bypass_cache=True)
@ -724,41 +666,6 @@ class TelegramClient(TelegramBareClient):
return file_path return file_path
def download_file(self,
input_location,
file,
part_size_kb=None,
file_size=None,
progress_callback=None,
on_dc=None):
"""Downloads the given InputFileLocation to file (a stream or str).
If 'progress_callback' is not None, it should be a function that
takes two parameters, (bytes_downloaded, total_bytes). Note that
'total_bytes' simply equals 'file_size', and may be None.
"""
if on_dc is None:
try:
super().download_file(
input_location,
file,
part_size_kb=part_size_kb,
file_size=file_size,
progress_callback=progress_callback
)
except FileMigrateError as e:
on_dc = e.new_dc
if on_dc is not None:
client = self._get_exported_client(on_dc)
client.download_file(
input_location,
file,
part_size_kb=part_size_kb,
file_size=file_size,
progress_callback=progress_callback
)
# endregion # endregion
# endregion # endregion