diff --git a/telethon/client/downloads.py b/telethon/client/downloads.py index b52e2fa5..22a7997d 100644 --- a/telethon/client/downloads.py +++ b/telethon/client/downloads.py @@ -202,6 +202,7 @@ class DownloadMethods(UserMethods): # The used sender will change if ``FileMigrateError`` occurs sender = self._sender + exported = False input_location = utils.get_input_location(input_location) __log__.info('Downloading file in chunks of %d bytes', part_size) @@ -217,7 +218,8 @@ class DownloadMethods(UserMethods): raise NotImplementedError except errors.FileMigrateError as e: __log__.info('File lives in another DC') - sender = self._get_exported_sender(e.new_dc) + sender = self._borrow_exported_sender(e.new_dc) + exported = True continue offset += part_size @@ -233,7 +235,9 @@ class DownloadMethods(UserMethods): if progress_callback: progress_callback(f.tell(), file_size) finally: - if sender != self._sender: + if exported: + self._return_exported_sender(sender) + elif sender != self._sender: sender.disconnect() if isinstance(file, str) or in_memory: f.close() diff --git a/telethon/client/messages.py b/telethon/client/messages.py index 1d552277..f47cd25e 100644 --- a/telethon/client/messages.py +++ b/telethon/client/messages.py @@ -1,7 +1,7 @@ import asyncio import itertools import logging -import warnings +import time from collections import UserList from async_generator import async_generator, yield_ @@ -188,7 +188,7 @@ class MessageMethods(UploadMethods, MessageParseMethods): last_id = float('inf') batch_size = min(max(batch_size, 1), 100) while have < limit: - start = asyncio.get_event_loop().time() + start = time.time() # Telegram has a hard limit of 100 request.limit = min(limit - have, batch_size) r = self(request) @@ -242,9 +242,8 @@ class MessageMethods(UploadMethods, MessageParseMethods): else: request.max_date = last_message.date - now = asyncio.get_event_loop().time() asyncio.sleep( - max(wait_time - (now - start), 0), loop=self._loop) + max(wait_time - 
(time.time() - start), 0), loop=self._loop) def get_messages(self, *args, **kwargs): """ diff --git a/telethon/client/telegrambaseclient.py b/telethon/client/telegrambaseclient.py index bb5fa3f7..4c64334e 100644 --- a/telethon/client/telegrambaseclient.py +++ b/telethon/client/telegrambaseclient.py @@ -67,9 +67,7 @@ class TelegramBaseClient(abc.ABC): timeout (`int` | `float` | `timedelta`, optional): The timeout to be used when connecting, sending and receiving - responses from the network. This is **not** the timeout to - be used when ``await``'ing for invoked requests, and you - should use ``asyncio.wait`` or ``asyncio.wait_for`` for that. + responses from the network. request_retries (`int`, optional): How many times a request should be retried. Request are retried @@ -211,9 +209,11 @@ class TelegramBaseClient(abc.ABC): auto_reconnect_callback=self._handle_auto_reconnect ) - # Cache :tl:`ExportedAuthorization` as ``dc_id: MTProtoState`` - # to easily import them when getting an exported sender. - self._exported_auths = {} + # Cache ``{dc_id: (n, MTProtoSender)}`` for all borrowed senders, + # being ``n`` the amount of borrows a given sender has; once ``n`` + # reaches ``0`` it should be disconnected and removed. + self._borrowed_senders = {} + self._borrow_sender_lock = threading.Lock() # Save whether the user is authorized here (a.k.a. logged in) self._authorized = None # None = We don't know yet @@ -355,36 +355,65 @@ class TelegramBaseClient(abc.ABC): and bool(dc.ipv6) == self._use_ipv6 and bool(dc.cdn) == cdn ) - def _get_exported_sender(self, dc_id): + def _create_exported_sender(self, dc_id): """ - Returns a cached `MTProtoSender` for the given `dc_id`, or creates - a new one if it doesn't exist yet, and imports a freshly exported - authorization key for it to be usable. + Creates a new exported `MTProtoSender` for the given `dc_id` and + returns it. This method should be used by `_borrow_exported_sender`. 
""" # Thanks badoualy/kotlogram on /telegram/api/DefaultTelegramClient.kt # for clearly showing how to export the authorization - auth = self._exported_auths.get(dc_id) dc = self._get_dc(dc_id) - state = MTProtoState(auth) + state = MTProtoState(None) # Can't reuse self._sender._connection as it has its own seqno. # # If one were to do that, Telegram would reset the connection # with no further clues. sender = MTProtoSender(state, self._connection.clone()) sender.connect(dc.ip_address, dc.port) - if not auth: - __log__.info('Exporting authorization for data center %s', dc) - auth = self(functions.auth.ExportAuthorizationRequest(dc_id)) - req = self._init_with(functions.auth.ImportAuthorizationRequest( - id=auth.id, bytes=auth.bytes - )) - sender.send(req) - self._exported_auths[dc_id] = sender.state.auth_key + __log__.info('Exporting authorization for data center %s', dc) + auth = self(functions.auth.ExportAuthorizationRequest(dc_id)) + req = self._init_with(functions.auth.ImportAuthorizationRequest( + id=auth.id, bytes=auth.bytes + )) + sender.send(req) + return sender + + def _borrow_exported_sender(self, dc_id): + """ + Borrows a connected `MTProtoSender` for the given `dc_id`. + If it's not cached, creates a new one if it doesn't exist yet, + and imports a freshly exported authorization key for it to be usable. + + Once its job is over it should be `_return_exported_sender`. + """ + with self._borrow_sender_lock: + n, sender = self._borrowed_senders.get(dc_id, (0, None)) + if not sender: + sender = self._create_exported_sender(dc_id) + sender.dc_id = dc_id + + self._borrowed_senders[dc_id] = (n + 1, sender) return sender + def _return_exported_sender(self, sender): + """ + Returns a borrowed exported sender. If all borrows have + been returned, the sender is cleanly disconnected. 
+ """ with self._borrow_sender_lock: + dc_id = sender.dc_id + n, _ = self._borrowed_senders[dc_id] + n -= 1 + if n > 0: + self._borrowed_senders[dc_id] = (n, sender) + else: + __log__.info('Disconnecting borrowed sender for DC %d', dc_id) + sender.disconnect() + del self._borrowed_senders[dc_id] + def _get_cdn_client(self, cdn_redirect): - """Similar to ._get_exported_client, but for CDNs""" + """Similar to ._borrow_exported_sender, but for CDNs""" # TODO Implement raise NotImplementedError session = self._exported_sessions.get(cdn_redirect.dc_id) diff --git a/telethon/network/mtprotoplainsender.py b/telethon/network/mtprotoplainsender.py index 71ea99eb..3a71aaa5 100644 --- a/telethon/network/mtprotoplainsender.py +++ b/telethon/network/mtprotoplainsender.py @@ -39,10 +39,15 @@ class MTProtoPlainSender: raise BrokenAuthKeyError() with BinaryReader(body) as reader: - assert reader.read_long() == 0 # auth_key_id - assert reader.read_long() > msg_id # msg_id - assert reader.read_int() # length + assert reader.read_long() == 0, 'Bad auth_key_id' # auth_key_id + assert reader.read_long() != 0, 'Bad msg_id' # msg_id + # ^ We should make sure that the read ``msg_id`` is greater + # than our own ``msg_id``. However, under some circumstances + # (bad system clock/working behind proxies) this seems to not + # be the case, which would cause endless assertion errors. + + assert reader.read_int() > 0, 'Bad length' # length # We could read length bytes and use those in a new reader to read # the next TLObject without including the padding, but since the # reader isn't used for anything else after this, it's unnecessary.