From 55c09cde97f5d4053c963ed46ccf3f1d3f0dc37b Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Sun, 6 May 2018 18:16:48 +0200 Subject: [PATCH 01/12] Fix online documentation showing duplicated errors --- telethon_generator/generators/docs.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/telethon_generator/generators/docs.py b/telethon_generator/generators/docs.py index 646fdca8..c4a16474 100755 --- a/telethon_generator/generators/docs.py +++ b/telethon_generator/generators/docs.py @@ -230,6 +230,11 @@ def _write_html_pages(tlobjects, errors, layer, input_res, output_dir): for t, cs in type_to_constructors.items(): type_to_constructors[t] = list(sorted(cs, key=lambda c: c.name)) + # Telegram may send errors with the same str_code but different int_code. + # They are all imported on telethon.errors anyway so makes no difference. + errors = list(sorted({e.str_code: e for e in errors}.values(), + key=lambda e: e.name)) + method_causes_errors = defaultdict(list) for error in errors: for method in error.caused_by: From 6cd96389c017d9f0bdb20e346cd756bebada11f7 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Sun, 6 May 2018 18:59:53 +0200 Subject: [PATCH 02/12] Call disconnect on ConnectionResetError hoping a reconnection Maybe self._reconnect() had no effect unless a clean disconnect was done, and so retrying would be mostly useless. Just a guess. --- telethon/telegram_bare_client.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index 826cc155..e753dc06 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -541,6 +541,11 @@ class TelegramBareClient: __log__.warning('Connection was reset while invoking') if self._user_connected: # Server disconnected us, __call__ will try reconnecting. + try: + self._sender.disconnect() + except: + pass + return None else: # User never called .connect(), so raise this error. From 2922e8df112ff043d09e30775459223def0c171d Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Mon, 7 May 2018 16:46:58 +0200 Subject: [PATCH 03/12] Fix still broken log for broken packets --- telethon/network/connection.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/telethon/network/connection.py b/telethon/network/connection.py index a8c70876..a877ac0f 100644 --- a/telethon/network/connection.py +++ b/telethon/network/connection.py @@ -190,14 +190,15 @@ class Connection: if packet_len <= 12: __log__.error('Read invalid packet length %d, ' 'reading data left:', packet_len) - while True: - data = b'' - try: + data = b'' + try: + while True: data += self.read(1) - except TimeoutError: - break - finally: - __log__.error(repr(data)) + except TimeoutError: + pass + finally: + __log__.error(repr(data)) + # Connection reset and hope it's fixed after self.conn.close() raise ConnectionResetError() From f06b9b68d510affb151c3137d0d44f2f172f42a4 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Mon, 7 May 2018 17:01:04 +0200 Subject: [PATCH 04/12] Fix race condition causing broken responses --- telethon/network/mtproto_sender.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/telethon/network/mtproto_sender.py b/telethon/network/mtproto_sender.py index 9b0be18a..c3209de9 100644 --- a/telethon/network/mtproto_sender.py +++ b/telethon/network/mtproto_sender.py @@ -57,6 +57,12 @@ class MtProtoSender: # Multithreading self._send_lock = Lock() + # If we're invoking something from an update thread but we're also + # receiving other request from the main thread (e.g. an update arrives + # and we need to process it) we must ensure that only one is calling + # receive at a given moment, since the receive step is fragile. + self._recv_lock = Lock() + def connect(self): """Connects to the server.""" self.connection.connect(self.session.server_address, self.session.port) @@ -132,8 +138,12 @@ class MtProtoSender: the UpdateState that will process all the received Update and Updates objects. """ + if self._recv_lock.locked(): + return + try: - body = self.connection.recv() + with self._recv_lock: + body = self.connection.recv() except (BufferError, InvalidChecksumError): # TODO BufferError, we should spot the cause... # "No more bytes left"; something wrong happened, clear From 6652fe276c0ef8082e4280f6fd3da268620a3170 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Mon, 7 May 2018 17:05:27 +0200 Subject: [PATCH 05/12] Remove broken packet length check --- telethon/network/connection.py | 20 -------------------- 1 file changed, 20 deletions(-) diff --git a/telethon/network/connection.py b/telethon/network/connection.py index a877ac0f..45afaefc 100644 --- a/telethon/network/connection.py +++ b/telethon/network/connection.py @@ -183,26 +183,6 @@ class Connection: """ packet_len_seq = self.read(8) # 4 and 4 packet_len, seq = struct.unpack(' Date: Mon, 7 May 2018 18:02:15 +0200 Subject: [PATCH 06/12] First attempt at updates catch_up for private chats/groups --- telethon/telegram_client.py | 40 +++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/telethon/telegram_client.py b/telethon/telegram_client.py index 4a131f9a..36f78a47 100644 --- a/telethon/telegram_client.py +++ b/telethon/telegram_client.py @@ -16,9 +16,13 @@ from mimetypes import guess_type from .crypto import CdnDecrypter from .tl import TLObject from .tl.custom import InputSizedFile +from .tl.functions.updates import GetDifferenceRequest from .tl.functions.upload import ( SaveBigFilePartRequest, SaveFilePartRequest, GetFileRequest ) +from .tl.types.updates import ( + DifferenceSlice, DifferenceEmpty, Difference, DifferenceTooLong +) from .tl.types.upload import FileCdnRedirect try: @@ -2400,6 +2404,42 @@ class TelegramClient(TelegramBareClient): def list_update_handlers(self): return [callback for callback, _ in self.list_event_handlers()] + def catch_up(self): + state = self.session.get_update_state(0) + self.session.catching_up = True + try: + while True: + d = self(GetDifferenceRequest(state.pts, state.date, state.qts)) + if isinstance(d, DifferenceEmpty): + state.date = d.date + state.seq = d.seq + break + elif isinstance(d, (DifferenceSlice, Difference)): + if isinstance(d, Difference): + state = d.state + elif d.intermediate_state.pts > state.pts: + state = d.intermediate_state + else: + # TODO Figure out why other applications can rely on + # using always the intermediate_state to eventually + # reach a DifferenceEmpty, but that leads to an + # infinite loop here (so check against old pts to stop) + break + + self.updates.process(Updates( + users=d.users, + chats=d.chats, + date=state.date, + seq=state.seq, + updates=d.other_updates + [UpdateNewMessage(m, 0, 0) + for m in d.new_messages] + )) + elif isinstance(d, DifferenceTooLong): + break + finally: + self.session.set_update_state(0, state) + self.session.catching_up = False + # endregion # region Small utilities to make users' life easier From 392508c78d766c2eee1f819a4f445bd9e8670d69 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Mon, 7 May 2018 19:53:32 +0200 Subject: [PATCH 07/12] Add voice/video note parameters to send_file --- telethon/telegram_client.py | 34 +++++++++++++++++++++++++++------- 1 file changed, 27 insertions(+), 7 deletions(-) diff --git a/telethon/telegram_client.py b/telethon/telegram_client.py index 36f78a47..fb2a75d8 100644 --- a/telethon/telegram_client.py +++ b/telethon/telegram_client.py @@ -1190,6 +1190,7 @@ class TelegramClient(TelegramBareClient): return msgs def get_message_history(self, *args, **kwargs): + """Deprecated, see :meth:`get_messages`.""" warnings.warn( 'get_message_history is deprecated, use get_messages instead' ) @@ -1444,6 +1445,8 @@ class TelegramClient(TelegramBareClient): thumb=None, allow_cache=True, parse_mode='md', + voice_note=False, + video_note=False, **kwargs): """ Sends a file to the specified entity. @@ -1500,9 +1503,18 @@ class TelegramClient(TelegramBareClient): parse_mode (`str`, optional): The parse mode for the caption message. - Kwargs: - If "is_voice_note" in kwargs, despite its value, and the file is - sent as a document, it will be sent as a voice note. + voice_note (`bool`, optional): + If ``True`` the audio will be sent as a voice note. + + Set `allow_cache` to ``False`` if you sent the same file + without this setting before for it to work. + + video_note (`bool`, optional): + If ``True`` the video will be sent as a video note, + also known as a round video message. + + Set `allow_cache` to ``False`` if you sent the same file + without this setting before for it to work. Notes: If the ``hachoir3`` package (``hachoir`` module) is installed, @@ -1541,7 +1553,8 @@ class TelegramClient(TelegramBareClient): entity, x, allow_cache=allow_cache, caption=caption, force_document=force_document, progress_callback=progress_callback, reply_to=reply_to, - attributes=attributes, thumb=thumb, **kwargs + attributes=attributes, thumb=thumb, voice_note=voice_note, + video_note=video_note, **kwargs ) for x in documents ) return result @@ -1602,6 +1615,7 @@ class TelegramClient(TelegramBareClient): hachoir.parser.createParser(file) ) attr_dict[DocumentAttributeAudio] = DocumentAttributeAudio( + voice=voice_note, title=m.get('title') if m.has('title') else None, performer=m.get('author') if m.has('author') else None, duration=int(m.get('duration').seconds @@ -1614,13 +1628,16 @@ class TelegramClient(TelegramBareClient): hachoir.parser.createParser(file) ) doc = DocumentAttributeVideo( + round_message=video_note, w=m.get('width') if m.has('width') else 0, h=m.get('height') if m.has('height') else 0, duration=int(m.get('duration').seconds if m.has('duration') else 0) ) else: - doc = DocumentAttributeVideo(0, 0, 0) + doc = DocumentAttributeVideo(0, 0, 0, + round_message=video_note) + attr_dict[DocumentAttributeVideo] = doc else: attr_dict = { @@ -1629,7 +1646,7 @@ class TelegramClient(TelegramBareClient): getattr(file, 'name', None) or 'unnamed')) } - if 'is_voice_note' in kwargs: + if voice_note: if DocumentAttributeAudio in attr_dict: attr_dict[DocumentAttributeAudio].voice = True else: @@ -1678,7 +1695,9 @@ class TelegramClient(TelegramBareClient): return msg def send_voice_note(self, *args, **kwargs): - """Wrapper method around :meth:`send_file` with is_voice_note=True.""" + """Deprecated, see :meth:`send_file`.""" + warnings.warn('send_voice_note is deprecated, use ' + 'send_file(..., voice_note=True) instead') kwargs['is_voice_note'] = True return self.send_file(*args, **kwargs) @@ -2390,6 +2409,7 @@ class TelegramClient(TelegramBareClient): return [(callback, event) for event, callback in self._event_builders] def add_update_handler(self, handler): + """Deprecated, see :meth:`add_event_handler`.""" warnings.warn( 'add_update_handler is deprecated, use the @client.on syntax ' 'or add_event_handler(callback, events.Raw) instead (see ' From bda7eb0ef13eb445d84c89b6f75b089b8260bf60 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Mon, 7 May 2018 21:26:53 +0200 Subject: [PATCH 08/12] Update to v0.19 --- readthedocs/extra/changelog.rst | 57 +++++++++++++++++++++++++++++++++ telethon/version.py | 2 +- 2 files changed, 58 insertions(+), 1 deletion(-) diff --git a/readthedocs/extra/changelog.rst b/readthedocs/extra/changelog.rst index a5e8b67d..9bf37f9c 100644 --- a/readthedocs/extra/changelog.rst +++ b/readthedocs/extra/changelog.rst @@ -14,6 +14,63 @@ it can take advantage of new goodies! .. contents:: List of All Versions +Catching up on Updates +====================== + +*Published at 2018/05/07* + ++-----------------------+ +| Scheme layer used: 76 | ++-----------------------+ + +This update prepares the library for catching up with updates with the new +`telethon.telegram_client.TelegramClient.catch_up` method. This feature needs +more testing, but for now it will let you "catch up" on some old updates that +occurred while the library was offline, and brings some new features and bug +fixes. + + +Additions +~~~~~~~~~ + +- Add ``search``, ``filter`` and ``from_user`` parameters to + `telethon.telegram_client.TelegramClient.iter_messages`. +- `telethon.telegram_client.TelegramClient.download_file` now + supports a ``None`` path to return the file in memory and + return its ``bytes``. +- Events now have a ``.original_update`` field. + +Bug fixes +~~~~~~~~~ + +- Fixed a race condition when receiving items from the network. +- A disconnection is made when "retries reached 0". This hasn't been + tested but it might fix the bug. +- ``reply_to`` would not override :tl:`Message` object's reply value. +- Add missing caption when sending :tl:`Message` with media. + +Enhancements +~~~~~~~~~~~~ + +- Retry automatically on ``RpcCallFailError``. This error happened a lot + when iterating over many messages, and retrying often fixes it. +- Faster `telethon.telegram_client.TelegramClient.iter_messages` by + sleeping only as much as needed. +- `telethon.telegram_client.TelegramClient.edit_message` now supports + omitting the entity if you pass a :tl:`Message`. +- `telethon.events.raw.Raw` can now be filtered by type. + +Internal changes +~~~~~~~~~~~~~~~~ + +- The library now distinguishes between MTProto and API schemas. +- :tl:`State` is now persisted to the session file. +- Connection won't retry forever. +- Fixed some errors and cleaned up the generation of code. +- Fixed typos and enhanced some documentation in general. +- Add auto-cast for :tl:`InputMessage` and :tl:`InputLocation`. + + Pickle-able objects (v0.18.3) ============================= diff --git a/telethon/version.py b/telethon/version.py index 441e2f14..20bd4c54 100644 --- a/telethon/version.py +++ b/telethon/version.py @@ -1,3 +1,3 @@ # Versions should comply with PEP440. # This line is parsed in setup.py: -__version__ = '0.18.3' +__version__ = '0.19' From e200acbca8a10a8a5851b502788ceced6dd2b2c1 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Wed, 9 May 2018 09:46:07 +0200 Subject: [PATCH 09/12] Allow sending ordered MessageContainer --- telethon/network/mtproto_sender.py | 19 +++++++++++++++---- telethon/telegram_bare_client.py | 12 ++++++++---- 2 files changed, 23 insertions(+), 8 deletions(-) diff --git a/telethon/network/mtproto_sender.py b/telethon/network/mtproto_sender.py index c3209de9..b146a4e0 100644 --- a/telethon/network/mtproto_sender.py +++ b/telethon/network/mtproto_sender.py @@ -2,7 +2,6 @@ This module contains the class used to communicate with Telegram's servers encrypting every packet, and relies on a valid AuthKey in the used Session. """ -import gzip import logging from threading import Lock @@ -14,6 +13,7 @@ from ..errors import ( from ..extensions import BinaryReader from ..tl import TLMessage, MessageContainer, GzipPacked from ..tl.all_tlobjects import tlobjects +from ..tl.functions import InvokeAfterMsgRequest from ..tl.functions.auth import LogOutRequest from ..tl.types import ( MsgsAck, Pong, BadServerSalt, BadMsgNotification, FutureSalts, @@ -84,15 +84,26 @@ class MtProtoSender: # region Send and receive - def send(self, *requests): + def send(self, *requests, ordered=False): """ Sends the specified TLObject(s) (which must be requests), and acknowledging any message which needed confirmation. :param requests: the requests to be sent. + :param ordered: whether the requests should be invoked in the + order in which they appear or they can be executed + in arbitrary order in the server. """ - # Finally send our packed request(s) - messages = [TLMessage(self.session, r) for r in requests] + if ordered: + requests = iter(requests) + messages = [TLMessage(self.session, next(requests))] + for r in requests: + messages.append(TLMessage( + self.session, InvokeAfterMsgRequest(messages[-1].msg_id, r) + )) + else: + messages = [TLMessage(self.session, r) for r in requests] + self._pending_receive.update({m.msg_id: m for m in messages}) __log__.debug('Sending requests with IDs: %s', ', '.join( diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index e753dc06..c343dbe4 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -429,11 +429,15 @@ class TelegramBareClient: # region Invoking Telegram requests - def __call__(self, *requests, retries=5): + def __call__(self, *requests, retries=5, ordered=False): """Invokes (sends) a MTProtoRequest and returns (receives) its result. The invoke will be retried up to 'retries' times before raising RuntimeError(). + + If more than one request is given and ordered is True, then the + requests will be invoked sequentially in the server (useful for + bursts of requests that need to be ordered). """ if not all(isinstance(x, TLObject) and x.content_related for x in requests): @@ -458,7 +462,7 @@ class TelegramBareClient: not self._idling.is_set() or self._reconnect_lock.locked() for retry in range(retries): - result = self._invoke(call_receive, *requests) + result = self._invoke(call_receive, *requests, ordered=ordered) if result is not None: return result @@ -481,7 +485,7 @@ class TelegramBareClient: # Let people use client.invoke(SomeRequest()) instead client(...) invoke = __call__ - def _invoke(self, call_receive, *requests): + def _invoke(self, call_receive, *requests, ordered=False): try: # Ensure that we start with no previous errors (i.e. resending) for x in requests: @@ -506,7 +510,7 @@ class TelegramBareClient: self._wrap_init_connection(GetConfigRequest()) ) - self._sender.send(*requests) + self._sender.send(*requests, ordered=ordered) if not call_receive: # TODO This will be slightly troublesome if we allow From e2e7e631b524e7e438a9e5331acfaa112fb1c727 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Wed, 9 May 2018 10:19:45 +0200 Subject: [PATCH 10/12] Stop using *args when invoking many requests at once --- telethon/network/mtproto_sender.py | 11 +++-- telethon/telegram_bare_client.py | 66 +++++++++++++++++++----------- telethon/telegram_client.py | 5 +-- 3 files changed, 49 insertions(+), 33 deletions(-) diff --git a/telethon/network/mtproto_sender.py b/telethon/network/mtproto_sender.py index b146a4e0..9616e5cc 100644 --- a/telethon/network/mtproto_sender.py +++ b/telethon/network/mtproto_sender.py @@ -5,7 +5,7 @@ encrypting every packet, and relies on a valid AuthKey in the used Session. import logging from threading import Lock -from .. import helpers as utils +from .. import helpers, utils from ..errors import ( BadMessageError, InvalidChecksumError, BrokenAuthKeyError, rpc_message_to_error @@ -84,7 +84,7 @@ class MtProtoSender: # region Send and receive - def send(self, *requests, ordered=False): + def send(self, requests, ordered=False): """ Sends the specified TLObject(s) (which must be requests), and acknowledging any message which needed confirmation. @@ -94,6 +94,9 @@ class MtProtoSender: order in which they appear or they can be executed in arbitrary order in the server. """ + if not utils.is_list_like(requests): + requests = (requests,) + if ordered: requests = iter(requests) messages = [TLMessage(self.session, next(requests))] @@ -184,7 +187,7 @@ class MtProtoSender: :param message: the TLMessage to be sent. """ with self._send_lock: - self.connection.send(utils.pack_message(self.session, message)) + self.connection.send(helpers.pack_message(self.session, message)) def _decode_msg(self, body): """ @@ -200,7 +203,7 @@ class MtProtoSender: raise BufferError("Can't decode packet ({})".format(body)) with BinaryReader(body) as reader: - return utils.unpack_message(self.session, reader) + return helpers.unpack_message(self.session, reader) def _process_msg(self, msg_id, sequence, reader, state): """ diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index c343dbe4..0c27d050 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -429,32 +429,51 @@ class TelegramBareClient: # region Invoking Telegram requests - def __call__(self, *requests, retries=5, ordered=False): - """Invokes (sends) a MTProtoRequest and returns (receives) its result. - - The invoke will be retried up to 'retries' times before raising - RuntimeError(). - - If more than one request is given and ordered is True, then the - requests will be invoked sequentially in the server (useful for - bursts of requests that need to be ordered). + def __call__(self, request, retries=5, ordered=False): """ + Invokes (sends) one or more MTProtoRequests and returns (receives) + their result. + + Args: + request (`TLObject` | `list`): + The request or requests to be invoked. + + retries (`bool`, optional): + How many times the request should be retried automatically + in case it fails with a non-RPC error. + + The invoke will be retried up to 'retries' times before raising + ``RuntimeError``. + + ordered (`bool`, optional): + Whether the requests (if more than one was given) should be + executed sequentially on the server. They run in arbitrary + order by default. + + Returns: + The result of the request (often a `TLObject`) or a list of + results if more than one request was given. + """ + single = not utils.is_list_like(request) + if single: + request = (request,) + if not all(isinstance(x, TLObject) and - x.content_related for x in requests): + x.content_related for x in request): raise TypeError('You can only invoke requests, not types!') if self._background_error: raise self._background_error - for request in requests: - request.resolve(self, utils) + for r in request: + r.resolve(self, utils) # For logging purposes - if len(requests) == 1: - which = type(requests[0]).__name__ + if single: + which = type(request[0]).__name__ else: which = '{} requests ({})'.format( - len(requests), [type(x).__name__ for x in requests]) + len(request), [type(x).__name__ for x in request]) # Determine the sender to be used (main or a new connection) __log__.debug('Invoking %s', which) @@ -462,13 +481,13 @@ class TelegramBareClient: not self._idling.is_set() or self._reconnect_lock.locked() for retry in range(retries): - result = self._invoke(call_receive, *requests, ordered=ordered) + result = self._invoke(call_receive, request, ordered=ordered) if result is not None: - return result + return result[0] if single else result log = __log__.info if retry == 0 else __log__.warning log('Invoking %s failed %d times, connecting again and retrying', - [str(x) for x in requests], retry + 1) + which, retry + 1) sleep(1) # The ReadThread has priority when attempting reconnection, @@ -479,13 +498,13 @@ class TelegramBareClient: self._reconnect() raise RuntimeError('Number of retries reached 0 for {}.'.format( - [type(x).__name__ for x in requests] + which )) # Let people use client.invoke(SomeRequest()) instead client(...) invoke = __call__ - def _invoke(self, call_receive, *requests, ordered=False): + def _invoke(self, call_receive, requests, ordered=False): try: # Ensure that we start with no previous errors (i.e. resending) for x in requests: @@ -510,7 +529,7 @@ class TelegramBareClient: self._wrap_init_connection(GetConfigRequest()) ) - self._sender.send(*requests, ordered=ordered) + self._sender.send(requests, ordered=ordered) if not call_receive: # TODO This will be slightly troublesome if we allow @@ -566,10 +585,7 @@ class TelegramBareClient: # rejected by the other party as a whole." return None - if len(requests) == 1: - return requests[0].result - else: - return [x.result for x in requests] + return [x.result for x in requests] except (PhoneMigrateError, NetworkMigrateError, UserMigrateError) as e: diff --git a/telethon/telegram_client.py b/telethon/telegram_client.py index fb2a75d8..841b6fb7 100644 --- a/telethon/telegram_client.py +++ b/telethon/telegram_client.py @@ -1367,10 +1367,7 @@ class TelegramClient(TelegramBareClient): if requests[0].offset > limit: break - if len(requests) == 1: - results = (self(requests[0]),) - else: - results = self(*requests) + results = self(requests) for i in reversed(range(len(requests))): participants = results[i] if not participants.users: From ef509d13c743d7b00289d731521cf913b1f18f95 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Wed, 9 May 2018 16:18:42 +0200 Subject: [PATCH 11/12] Move InvokeAfterMsg to TLMessage to cleanly confirm results --- telethon/network/mtproto_sender.py | 5 ++--- telethon/tl/tl_message.py | 16 ++++++++++++++-- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/telethon/network/mtproto_sender.py b/telethon/network/mtproto_sender.py index 9616e5cc..04a2020e 100644 --- a/telethon/network/mtproto_sender.py +++ b/telethon/network/mtproto_sender.py @@ -101,9 +101,8 @@ class MtProtoSender: requests = iter(requests) messages = [TLMessage(self.session, next(requests))] for r in requests: - messages.append(TLMessage( - self.session, InvokeAfterMsgRequest(messages[-1].msg_id, r) - )) + messages.append(TLMessage(self.session, r, + after_id=messages[-1].msg_id)) else: messages = [TLMessage(self.session, r) for r in requests] diff --git a/telethon/tl/tl_message.py b/telethon/tl/tl_message.py index bcb48279..f6246de2 100644 --- a/telethon/tl/tl_message.py +++ b/telethon/tl/tl_message.py @@ -1,11 +1,12 @@ import struct from . import TLObject, GzipPacked +from ..tl.functions import InvokeAfterMsgRequest class TLMessage(TLObject): """https://core.telegram.org/mtproto/service_messages#simple-container""" - def __init__(self, session, request): + def __init__(self, session, request, after_id=None): super().__init__() del self.content_related self.msg_id = session.get_new_msg_id() @@ -13,16 +14,27 @@ class TLMessage(TLObject): self.request = request self.container_msg_id = None + # After which message ID this one should run. We do this so + # InvokeAfterMsgRequest is transparent to the user and we can + # easily invoke after while confirming the original request. + self.after_id = after_id + def to_dict(self, recursive=True): return { 'msg_id': self.msg_id, 'seq_no': self.seq_no, 'request': self.request, 'container_msg_id': self.container_msg_id, + 'after_id': self.after_id } def __bytes__(self): - body = GzipPacked.gzip_if_smaller(self.request) + if self.after_id is None: + body = GzipPacked.gzip_if_smaller(self.request) + else: + body = GzipPacked.gzip_if_smaller( + InvokeAfterMsgRequest(self.after_id, self.request)) + return struct.pack(' Date: Thu, 10 May 2018 09:44:25 +0200 Subject: [PATCH 12/12] Avoid receive busy wait when two threads receive items --- telethon/network/mtproto_sender.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/telethon/network/mtproto_sender.py b/telethon/network/mtproto_sender.py index 04a2020e..3c0a4ce8 100644 --- a/telethon/network/mtproto_sender.py +++ b/telethon/network/mtproto_sender.py @@ -152,7 +152,12 @@ class MtProtoSender: Update and Updates objects. """ if self._recv_lock.locked(): - return + with self._recv_lock: + # Don't busy wait, acquire it but return because there's + # already a receive running and we don't want another one. + # It would lock until Telegram sent another update even if + # the current receive already received the expected response. + return try: with self._recv_lock: