From 9bb5cfd871c493fe94a8b0b2db125c63f19029ed Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Thu, 28 Jun 2018 09:40:38 +0200 Subject: [PATCH 1/5] Mention telethon-sync --- README.rst | 5 ++++- readthedocs/extra/basic/asyncio-magic.rst | 7 ++++--- readthedocs/extra/changelog.rst | 8 +++----- 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/README.rst b/README.rst index 60fb0e74..0d6c28a3 100755 --- a/README.rst +++ b/README.rst @@ -10,7 +10,10 @@ to interact with Telegram's API. **If you're upgrading from Telethon pre-1.0 to 1.0, please make sure to read** `this section of the documentation -`_. +`_, +or ``pip install telethon-sync`` which is compatible with `synchronous code +`_. Don't forget to remove +the asynchronous version (``pip uninstall telethon``) if you do install sync. What is this? ------------- diff --git a/readthedocs/extra/basic/asyncio-magic.rst b/readthedocs/extra/basic/asyncio-magic.rst index c4129b9e..4cba2c28 100644 --- a/readthedocs/extra/basic/asyncio-magic.rst +++ b/readthedocs/extra/basic/asyncio-magic.rst @@ -13,9 +13,10 @@ Magic with asyncio import telethon.sync - At the beginning of your main script and you will be good. If you do use - updates or events, keep reading, or install the latest version using - threads and Python 3.4 support with ``pip install telethon==0.19.1.6``. + At the beginning of your main script and you will be good. If you **do** + use updates or events, keep reading, or ``pip install telethon-sync``, a + branch that mimics the ``asyncio`` code with threads and should work + under Python 3.4. You might also want to check the :ref:`changelog`. diff --git a/readthedocs/extra/changelog.rst b/readthedocs/extra/changelog.rst index 228fc8ab..d691bc88 100644 --- a/readthedocs/extra/changelog.rst +++ b/readthedocs/extra/changelog.rst @@ -46,11 +46,9 @@ Synchronous magic (v1.0) If you come from Telethon pre-1.0 you **really** want to read :ref:`asyncio-magic` to port your scripts to the new version. 
- If you're not ready for this, you can ``pip install telethon==0.19.1.6``. - It's the latest version of the library using threads for Python 3.4+. - - If you're interested in maintaining a Telethon version that supports - Python 3.4 and uses threads, please open an issue and let me know. + If you're not ready for this, you can ``pip install telethon-sync``. + It's a synchronous branch that mimics the ``asyncio`` version with + threads and should work under Python 3.4. The library has been around for well over a year. A lot of improvements have been made, a lot of user complaints have been fixed, and a lot of user desires From fb40e7b50837d67fe5e8df27995a2c80bdc26296 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Thu, 28 Jun 2018 09:48:03 +0200 Subject: [PATCH 2/5] Update mtprotoplainsender.py asserts --- telethon/network/mtprotoplainsender.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/telethon/network/mtprotoplainsender.py b/telethon/network/mtprotoplainsender.py index e04e2934..3f789f76 100644 --- a/telethon/network/mtprotoplainsender.py +++ b/telethon/network/mtprotoplainsender.py @@ -39,10 +39,15 @@ class MTProtoPlainSender: raise BrokenAuthKeyError() with BinaryReader(body) as reader: - assert reader.read_long() == 0 # auth_key_id - assert reader.read_long() > msg_id # msg_id - assert reader.read_int() # length + assert reader.read_long() == 0, 'Bad auth_key_id' # auth_key_id + assert reader.read_long() != 0, 'Bad msg_id' # msg_id + # ^ We should make sure that the read ``msg_id`` is greater + # than our own ``msg_id``. However, under some circumstances + # (bad system clock/working behind proxies) this seems to not + # be the case, which would cause endless assertion errors.
+ + assert reader.read_int() > 0, 'Bad length' # length # We could read length bytes and use those in a new reader to read # the next TLObject without including the padding, but since the # reader isn't used for anything else after this, it's unnecessary. From 90ea4ba8db865bd03dfe4dca1d88a17ef78308ba Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Thu, 28 Jun 2018 14:05:18 +0200 Subject: [PATCH 3/5] Add client.idle -> client.run_until_disconnected to changelog --- readthedocs/extra/changelog.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/readthedocs/extra/changelog.rst b/readthedocs/extra/changelog.rst index d691bc88..ddb6510e 100644 --- a/readthedocs/extra/changelog.rst +++ b/readthedocs/extra/changelog.rst @@ -87,6 +87,8 @@ Breaking Changes - ``message.get_fwd_sender`` is now in `message.forward `. +- ``client.idle`` is now `client.run_until_disconnected() + ` - ``client.add_update_handler`` is now `client.add_event_handler ` - ``client.remove_update_handler`` is now `client.remove_event_handler From 491302bb32734e642fa704dada887b6d97fe0d34 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Thu, 28 Jun 2018 14:10:36 +0200 Subject: [PATCH 4/5] Fix parallel downloads when using exported senders --- telethon/client/downloads.py | 8 +++- telethon/client/telegrambaseclient.py | 67 ++++++++++++++++++++------- 2 files changed, 55 insertions(+), 20 deletions(-) diff --git a/telethon/client/downloads.py b/telethon/client/downloads.py index 318692b9..ad2611b3 100644 --- a/telethon/client/downloads.py +++ b/telethon/client/downloads.py @@ -202,6 +202,7 @@ class DownloadMethods(UserMethods): # The used sender will change if ``FileMigrateError`` occurs sender = self._sender + exported = False input_location = utils.get_input_location(input_location) __log__.info('Downloading file in chunks of %d bytes', part_size) @@ -217,7 +218,8 @@ class DownloadMethods(UserMethods): raise NotImplementedError except errors.FileMigrateError as e: __log__.info('File lives in another 
DC') - sender = await self._get_exported_sender(e.new_dc) + sender = await self._borrow_exported_sender(e.new_dc) + exported = True continue offset += part_size @@ -233,7 +235,9 @@ class DownloadMethods(UserMethods): if progress_callback: progress_callback(f.tell(), file_size) finally: - if sender != self._sender: + if exported: + await self._return_exported_sender(sender) + elif sender != self._sender: await sender.disconnect() if isinstance(file, str) or in_memory: f.close() diff --git a/telethon/client/telegrambaseclient.py b/telethon/client/telegrambaseclient.py index d0608814..bcc794df 100644 --- a/telethon/client/telegrambaseclient.py +++ b/telethon/client/telegrambaseclient.py @@ -213,9 +213,11 @@ class TelegramBaseClient(abc.ABC): auto_reconnect_callback=self._handle_auto_reconnect ) - # Cache :tl:`ExportedAuthorization` as ``dc_id: MTProtoState`` - # to easily import them when getting an exported sender. - self._exported_auths = {} + # Cache ``{dc_id: (n, MTProtoSender)}`` for all borrowed senders, + # being ``n`` the amount of borrows a given sender has; once ``n`` + # reaches ``0`` it should be disconnected and removed. + self._borrowed_senders = {} + self._borrow_sender_lock = asyncio.Lock() # Save whether the user is authorized here (a.k.a. logged in) self._authorized = None # None = We don't know yet @@ -369,36 +371,65 @@ class TelegramBaseClient(abc.ABC): and bool(dc.ipv6) == self._use_ipv6 and bool(dc.cdn) == cdn ) - async def _get_exported_sender(self, dc_id): + async def _create_exported_sender(self, dc_id): """ - Returns a cached `MTProtoSender` for the given `dc_id`, or creates - a new one if it doesn't exist yet, and imports a freshly exported - authorization key for it to be usable. + Creates a new exported `MTProtoSender` for the given `dc_id` and + returns it. This method should be used by `_borrow_exported_sender`. 
""" # Thanks badoualy/kotlogram on /telegram/api/DefaultTelegramClient.kt # for clearly showing how to export the authorization - auth = self._exported_auths.get(dc_id) dc = await self._get_dc(dc_id) - state = MTProtoState(auth) + state = MTProtoState(None) # Can't reuse self._sender._connection as it has its own seqno. # # If one were to do that, Telegram would reset the connection # with no further clues. sender = MTProtoSender(state, self._connection.clone(), self._loop) await sender.connect(dc.ip_address, dc.port) - if not auth: - __log__.info('Exporting authorization for data center %s', dc) - auth = await self(functions.auth.ExportAuthorizationRequest(dc_id)) - req = self._init_with(functions.auth.ImportAuthorizationRequest( - id=auth.id, bytes=auth.bytes - )) - await sender.send(req) - self._exported_auths[dc_id] = sender.state.auth_key + __log__.info('Exporting authorization for data center %s', dc) + auth = await self(functions.auth.ExportAuthorizationRequest(dc_id)) + req = self._init_with(functions.auth.ImportAuthorizationRequest( + id=auth.id, bytes=auth.bytes + )) + await sender.send(req) + return sender + + async def _borrow_exported_sender(self, dc_id): + """ + Borrows a connected `MTProtoSender` for the given `dc_id`. + If it's not cached, creates a new one if it doesn't exist yet, + and imports a freshly exported authorization key for it to be usable. + + Once its job is over it should be `_return_exported_sender`. + """ + async with self._borrow_sender_lock: + n, sender = self._borrowed_senders.get(dc_id, (0, None)) + if not sender: + sender = await self._create_exported_sender(dc_id) + sender.dc_id = dc_id + + self._borrowed_senders[dc_id] = (n + 1, sender) return sender + async def _return_exported_sender(self, sender): + """ + Returns a borrowed exported sender. If all borrows have + been returned, the sender is cleanly disconnected. 
+ """ + async with self._borrow_sender_lock: + dc_id = sender.dc_id + n, _ = self._borrowed_senders[dc_id] + n -= 1 + if n > 0: + self._borrowed_senders[dc_id] = (n, sender) + else: + __log__.info('Disconnecting borrowed sender for DC %d', dc_id) + await sender.disconnect() + del self._borrowed_senders[dc_id] + async def _get_cdn_client(self, cdn_redirect): - """Similar to ._get_exported_client, but for CDNs""" + """Similar to ._borrow_exported_sender, but for CDNs""" # TODO Implement raise NotImplementedError session = self._exported_sessions.get(cdn_redirect.dc_id) From ac2b10f2a5057da3fe94f8dd4030da096c363c4a Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Thu, 28 Jun 2018 15:12:18 +0200 Subject: [PATCH 5/5] Stop using loop's time() function --- telethon/client/messages.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/telethon/client/messages.py b/telethon/client/messages.py index 1c84be36..7d5110b3 100644 --- a/telethon/client/messages.py +++ b/telethon/client/messages.py @@ -1,7 +1,7 @@ import asyncio import itertools import logging -import warnings +import time from collections import UserList from async_generator import async_generator, yield_ @@ -188,7 +188,7 @@ class MessageMethods(UploadMethods, MessageParseMethods): last_id = float('inf') batch_size = min(max(batch_size, 1), 100) while have < limit: - start = asyncio.get_event_loop().time() + start = time.time() # Telegram has a hard limit of 100 request.limit = min(limit - have, batch_size) r = await self(request) @@ -242,9 +242,8 @@ class MessageMethods(UploadMethods, MessageParseMethods): else: request.max_date = last_message.date - now = asyncio.get_event_loop().time() await asyncio.sleep( - max(wait_time - (now - start), 0), loop=self._loop) + max(wait_time - (time.time() - start), 0), loop=self._loop) async def get_messages(self, *args, **kwargs): """