Fix parallel downloads when using exported senders

This commit is contained in:
Lonami Exo 2018-06-28 14:10:36 +02:00
parent 90ea4ba8db
commit 491302bb32
2 changed files with 55 additions and 20 deletions

View File

@ -202,6 +202,7 @@ class DownloadMethods(UserMethods):
# The used sender will change if ``FileMigrateError`` occurs # The used sender will change if ``FileMigrateError`` occurs
sender = self._sender sender = self._sender
exported = False
input_location = utils.get_input_location(input_location) input_location = utils.get_input_location(input_location)
__log__.info('Downloading file in chunks of %d bytes', part_size) __log__.info('Downloading file in chunks of %d bytes', part_size)
@ -217,7 +218,8 @@ class DownloadMethods(UserMethods):
raise NotImplementedError raise NotImplementedError
except errors.FileMigrateError as e: except errors.FileMigrateError as e:
__log__.info('File lives in another DC') __log__.info('File lives in another DC')
sender = await self._get_exported_sender(e.new_dc) sender = await self._borrow_exported_sender(e.new_dc)
exported = True
continue continue
offset += part_size offset += part_size
@ -233,7 +235,9 @@ class DownloadMethods(UserMethods):
if progress_callback: if progress_callback:
progress_callback(f.tell(), file_size) progress_callback(f.tell(), file_size)
finally: finally:
if sender != self._sender: if exported:
await self._return_exported_sender(sender)
elif sender != self._sender:
await sender.disconnect() await sender.disconnect()
if isinstance(file, str) or in_memory: if isinstance(file, str) or in_memory:
f.close() f.close()

View File

@ -213,9 +213,11 @@ class TelegramBaseClient(abc.ABC):
auto_reconnect_callback=self._handle_auto_reconnect auto_reconnect_callback=self._handle_auto_reconnect
) )
# Cache :tl:`ExportedAuthorization` as ``dc_id: MTProtoState`` # Cache ``{dc_id: (n, MTProtoSender)}`` for all borrowed senders,
# to easily import them when getting an exported sender. # being ``n`` the amount of borrows a given sender has; once ``n``
self._exported_auths = {} # reaches ``0`` it should be disconnected and removed.
self._borrowed_senders = {}
self._borrow_sender_lock = asyncio.Lock()
# Save whether the user is authorized here (a.k.a. logged in) # Save whether the user is authorized here (a.k.a. logged in)
self._authorized = None # None = We don't know yet self._authorized = None # None = We don't know yet
@ -369,36 +371,65 @@ class TelegramBaseClient(abc.ABC):
and bool(dc.ipv6) == self._use_ipv6 and bool(dc.cdn) == cdn and bool(dc.ipv6) == self._use_ipv6 and bool(dc.cdn) == cdn
) )
async def _get_exported_sender(self, dc_id): async def _create_exported_sender(self, dc_id):
""" """
Returns a cached `MTProtoSender` for the given `dc_id`, or creates Creates a new exported `MTProtoSender` for the given `dc_id` and
a new one if it doesn't exist yet, and imports a freshly exported returns it. This method should be used by `_borrow_exported_sender`.
authorization key for it to be usable.
""" """
# Thanks badoualy/kotlogram on /telegram/api/DefaultTelegramClient.kt # Thanks badoualy/kotlogram on /telegram/api/DefaultTelegramClient.kt
# for clearly showing how to export the authorization # for clearly showing how to export the authorization
auth = self._exported_auths.get(dc_id)
dc = await self._get_dc(dc_id) dc = await self._get_dc(dc_id)
state = MTProtoState(auth) state = MTProtoState(None)
# Can't reuse self._sender._connection as it has its own seqno. # Can't reuse self._sender._connection as it has its own seqno.
# #
# If one were to do that, Telegram would reset the connection # If one were to do that, Telegram would reset the connection
# with no further clues. # with no further clues.
sender = MTProtoSender(state, self._connection.clone(), self._loop) sender = MTProtoSender(state, self._connection.clone(), self._loop)
await sender.connect(dc.ip_address, dc.port) await sender.connect(dc.ip_address, dc.port)
if not auth:
__log__.info('Exporting authorization for data center %s', dc) __log__.info('Exporting authorization for data center %s', dc)
auth = await self(functions.auth.ExportAuthorizationRequest(dc_id)) auth = await self(functions.auth.ExportAuthorizationRequest(dc_id))
req = self._init_with(functions.auth.ImportAuthorizationRequest( req = self._init_with(functions.auth.ImportAuthorizationRequest(
id=auth.id, bytes=auth.bytes id=auth.id, bytes=auth.bytes
)) ))
await sender.send(req) await sender.send(req)
self._exported_auths[dc_id] = sender.state.auth_key return sender
async def _borrow_exported_sender(self, dc_id):
"""
Borrows a connected `MTProtoSender` for the given `dc_id`.
If it's not cached, creates a new one if it doesn't exist yet,
and imports a freshly exported authorization key for it to be usable.
Once its job is over it should be `_return_exported_sender`.
"""
async with self._borrow_sender_lock:
n, sender = self._borrowed_senders.get(dc_id, (0, None))
if not sender:
sender = await self._create_exported_sender(dc_id)
sender.dc_id = dc_id
self._borrowed_senders[dc_id] = (n + 1, sender)
return sender return sender
async def _return_exported_sender(self, sender):
"""
Returns a borrowed exported sender. If all borrows have
been returned, the sender is cleanly disconnected.
"""
async with self._borrow_sender_lock:
dc_id = sender.dc_id
n, _ = self._borrowed_senders[dc_id]
n -= 1
if n > 0:
self._borrowed_senders[dc_id] = (n, sender)
else:
__log__.info('Disconnecting borrowed sender for DC %d', dc_id)
await sender.disconnect()
del self._borrowed_senders[dc_id]
async def _get_cdn_client(self, cdn_redirect): async def _get_cdn_client(self, cdn_redirect):
"""Similar to ._get_exported_client, but for CDNs""" """Similar to ._borrow_exported_client, but for CDNs"""
# TODO Implement # TODO Implement
raise NotImplementedError raise NotImplementedError
session = self._exported_sessions.get(cdn_redirect.dc_id) session = self._exported_sessions.get(cdn_redirect.dc_id)