mirror of
https://github.com/LonamiWebs/Telethon.git
synced 2024-11-22 17:36:34 +03:00
Implement _get_exported_sender
This commit is contained in:
parent
64dd957189
commit
f9cd220ddd
|
@ -199,9 +199,8 @@ class DownloadMethods(UserMethods):
|
|||
else:
|
||||
f = file
|
||||
|
||||
# The used client will change if FileMigrateError occurs
|
||||
client = self
|
||||
cdn_decrypter = None
|
||||
# The used sender will change if ``FileMigrateError`` occurs
|
||||
sender = self._sender
|
||||
input_location = utils.get_input_location(input_location)
|
||||
|
||||
__log__.info('Downloading file in chunks of %d bytes', part_size)
|
||||
|
@ -209,47 +208,32 @@ class DownloadMethods(UserMethods):
|
|||
offset = 0
|
||||
while True:
|
||||
try:
|
||||
if cdn_decrypter:
|
||||
result = cdn_decrypter.get_file()
|
||||
else:
|
||||
result = client(functions.upload.GetFileRequest(
|
||||
input_location, offset, part_size
|
||||
))
|
||||
|
||||
if isinstance(result, types.upload.FileCdnRedirect):
|
||||
__log__.info('File lives in a CDN')
|
||||
cdn_decrypter, result = \
|
||||
await CdnDecrypter.prepare_decrypter(
|
||||
client, await self._get_cdn_client(result),
|
||||
result
|
||||
)
|
||||
|
||||
result = await sender.send(functions.upload.GetFileRequest(
|
||||
input_location, offset, part_size
|
||||
))
|
||||
if isinstance(result, types.upload.FileCdnRedirect):
|
||||
# TODO Implement
|
||||
raise NotImplementedError
|
||||
except errors.FileMigrateError as e:
|
||||
__log__.info('File lives in another DC')
|
||||
client = await self._get_exported_client(e.new_dc)
|
||||
sender = await self._get_exported_sender(e.new_dc)
|
||||
continue
|
||||
|
||||
offset += part_size
|
||||
|
||||
# If we have received no data (0 bytes), the file is over
|
||||
# So there is nothing left to download and write
|
||||
if not result.bytes:
|
||||
# Return some extra information, unless it's a CDN file
|
||||
if in_memory:
|
||||
f.flush()
|
||||
return f.getvalue()
|
||||
else:
|
||||
return getattr(result, 'type', '')
|
||||
|
||||
__log__.debug('Saving %d more bytes', len(result.bytes))
|
||||
f.write(result.bytes)
|
||||
__log__.debug('Saved %d more bytes', len(result.bytes))
|
||||
if progress_callback:
|
||||
progress_callback(f.tell(), file_size)
|
||||
finally:
|
||||
if client != self:
|
||||
await client.disconnect()
|
||||
if cdn_decrypter:
|
||||
await cdn_decrypter.client.disconnect()
|
||||
if sender != self._sender:
|
||||
await sender.disconnect()
|
||||
if isinstance(file, str) or in_memory:
|
||||
f.close()
|
||||
|
||||
|
|
|
@ -151,10 +151,10 @@ class TelegramBaseClient(abc.ABC):
|
|||
if isinstance(connection, type):
|
||||
connection = connection(proxy=proxy, timeout=timeout)
|
||||
|
||||
# Used on connection - the user may modify these and reconnect
|
||||
# Used on connection. Capture the variables in a lambda since
|
||||
# exporting clients need to create this InvokeWithLayerRequest.
|
||||
system = platform.uname()
|
||||
state = MTProtoState(self.session.auth_key)
|
||||
first = functions.InvokeWithLayerRequest(
|
||||
self._init_with = lambda x: functions.InvokeWithLayerRequest(
|
||||
LAYER, functions.InitConnectionRequest(
|
||||
api_id=self.api_id,
|
||||
device_model=device_model or system.system or 'Unknown',
|
||||
|
@ -163,15 +163,20 @@ class TelegramBaseClient(abc.ABC):
|
|||
lang_code=lang_code,
|
||||
system_lang_code=system_lang_code,
|
||||
lang_pack='', # "langPacks are for official apps only"
|
||||
query=functions.help.GetConfigRequest()
|
||||
query=x
|
||||
)
|
||||
)
|
||||
self._sender = MTProtoSender(state, connection, first_query=first)
|
||||
|
||||
# Cache "exported" sessions as 'dc_id: Session' not to recreate
|
||||
# them all the time since generating a new key is a relatively
|
||||
# expensive operation.
|
||||
self._exported_sessions = {}
|
||||
state = MTProtoState(self.session.auth_key)
|
||||
self._connection = connection
|
||||
self._sender = MTProtoSender(
|
||||
state, connection,
|
||||
first_query=self._init_with(functions.help.GetConfigRequest())
|
||||
)
|
||||
|
||||
# Cache :tl:`ExportedAuthorization` as ``dc_id: MTProtoState``
|
||||
# to easily import them when getting an exported sender.
|
||||
self._exported_auths = {}
|
||||
|
||||
# This member will process updates if enabled.
|
||||
# One may change self.updates.enabled at any later point.
|
||||
|
@ -180,10 +185,6 @@ class TelegramBaseClient(abc.ABC):
|
|||
# Save whether the user is authorized here (a.k.a. logged in)
|
||||
self._authorized = None # None = We don't know yet
|
||||
|
||||
# The first request must be in invokeWithLayer(initConnection(X)).
|
||||
# See https://core.telegram.org/api/invoking#saving-client-info.
|
||||
self._first_request = True
|
||||
|
||||
# Default PingRequest delay
|
||||
self._last_ping = datetime.now()
|
||||
self._ping_delay = timedelta(minutes=1)
|
||||
|
@ -279,57 +280,34 @@ class TelegramBaseClient(abc.ABC):
|
|||
and bool(dc.ipv6) == self._use_ipv6 and bool(dc.cdn) == cdn
|
||||
)
|
||||
|
||||
async def _get_exported_client(self, dc_id):
|
||||
"""Creates and connects a new TelegramBareClient for the desired DC.
|
||||
|
||||
If it's the first time calling the method with a given dc_id,
|
||||
a new session will be first created, and its auth key generated.
|
||||
Exporting/Importing the authorization will also be done so that
|
||||
the auth is bound with the key.
|
||||
async def _get_exported_sender(self, dc_id):
|
||||
"""
|
||||
Returns a cached `MTProtoSender` for the given `dc_id`, or creates
|
||||
a new one if it doesn't exist yet, and imports a freshly exported
|
||||
authorization key for it to be usable.
|
||||
"""
|
||||
# TODO Implement
|
||||
raise NotImplementedError
|
||||
# Thanks badoualy/kotlogram on /telegram/api/DefaultTelegramClient.kt
|
||||
# for clearly showing how to export the authorization! ^^
|
||||
session = self._exported_sessions.get(dc_id)
|
||||
if session:
|
||||
export_auth = None # Already bound with the auth key
|
||||
else:
|
||||
# TODO Add a lock, don't allow two threads to create an auth key
|
||||
# (when calling .connect() if there wasn't a previous session).
|
||||
# for the same data center.
|
||||
dc = self._get_dc(dc_id)
|
||||
|
||||
# Export the current authorization to the new DC.
|
||||
# for clearly showing how to export the authorization
|
||||
auth = self._exported_auths.get(dc_id)
|
||||
dc = await self._get_dc(dc_id)
|
||||
state = MTProtoState(auth)
|
||||
# TODO Don't hardcode ConnectionTcpFull()
|
||||
# Can't reuse self._sender._connection as it has its own seqno.
|
||||
#
|
||||
# If one were to do that, Telegram would reset the connection
|
||||
# with no further clues.
|
||||
sender = MTProtoSender(state, ConnectionTcpFull())
|
||||
await sender.connect(dc.ip_address, dc.port)
|
||||
if not auth:
|
||||
__log__.info('Exporting authorization for data center %s', dc)
|
||||
export_auth =\
|
||||
await self(functions.auth.ExportAuthorizationRequest(dc_id))
|
||||
|
||||
# Create a temporary session for this IP address, which needs
|
||||
# to be different because each auth_key is unique per DC.
|
||||
#
|
||||
# Construct this session with the connection parameters
|
||||
# (system version, device model...) from the current one.
|
||||
session = self.session.clone()
|
||||
session.set_dc(dc.id, dc.ip_address, dc.port)
|
||||
self._exported_sessions[dc_id] = session
|
||||
|
||||
__log__.info('Creating exported new client')
|
||||
client = TelegramBareClient(
|
||||
session, self.api_id, self.api_hash,
|
||||
proxy=self._sender.connection.conn.proxy,
|
||||
timeout=self._sender.connection.get_timeout()
|
||||
)
|
||||
client.connect(_sync_updates=False)
|
||||
if isinstance(export_auth, ExportedAuthorization):
|
||||
client(ImportAuthorizationRequest(
|
||||
id=export_auth.id, bytes=export_auth.bytes
|
||||
auth = await self(functions.auth.ExportAuthorizationRequest(dc_id))
|
||||
req = self._init_with(functions.auth.ImportAuthorizationRequest(
|
||||
id=auth.id, bytes=auth.bytes
|
||||
))
|
||||
elif export_auth is not None:
|
||||
__log__.warning('Unknown export auth type %s', export_auth)
|
||||
await sender.send(req)
|
||||
self._exported_auths[dc_id] = sender.state.auth_key
|
||||
|
||||
client._authorized = True # We exported the auth, so we got auth
|
||||
return client
|
||||
return sender
|
||||
|
||||
async def _get_cdn_client(self, cdn_redirect):
|
||||
"""Similar to ._get_exported_client, but for CDNs"""
|
||||
|
|
Loading…
Reference in New Issue
Block a user