From ac6dbb8a5ce8a30fb20d2893d90898171b990d71 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Thu, 28 Jun 2018 16:04:12 +0200 Subject: [PATCH 1/5] Fix is_connected accessing unexisting property --- telethon/client/telegrambaseclient.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/telethon/client/telegrambaseclient.py b/telethon/client/telegrambaseclient.py index bcc794df..bd05bd12 100644 --- a/telethon/client/telegrambaseclient.py +++ b/telethon/client/telegrambaseclient.py @@ -287,7 +287,8 @@ class TelegramBaseClient(abc.ABC): """ Returns ``True`` if the user has connected. """ - return self._sender.is_connected() + sender = getattr(self, '_sender', None) + return sender and sender.is_connected() async def disconnect(self): """ From dbca38c6f5fc334ea22cb339b2555e3ddfc30895 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Thu, 28 Jun 2018 21:15:29 +0200 Subject: [PATCH 2/5] Allow iterating over messages in reverse --- telethon/client/messages.py | 84 +++++++++++++++++++++++++++++-------- 1 file changed, 66 insertions(+), 18 deletions(-) diff --git a/telethon/client/messages.py b/telethon/client/messages.py index 7d5110b3..79d0db92 100644 --- a/telethon/client/messages.py +++ b/telethon/client/messages.py @@ -25,7 +25,7 @@ class MessageMethods(UploadMethods, MessageParseMethods): self, entity, limit=None, *, offset_date=None, offset_id=0, max_id=0, min_id=0, add_offset=0, search=None, filter=None, from_user=None, batch_size=100, wait_time=None, ids=None, - _total=None): + reverse=False, _total=None): """ Iterator over the message history for the specified entity. @@ -94,6 +94,15 @@ class MessageMethods(UploadMethods, MessageParseMethods): will appear in its place, so that zipping the list of IDs with the messages can match one-to-one. + reverse (`bool`, optional): + If set to ``True``, the messages will be returned in reverse + order (from oldest to newest, instead of the default newest + to oldest). This also means that the meaning of `offset_id` + and `offset_date` parameters is reversed, although they will + still be exclusive. `min_id` becomes equivalent to `offset_id` + instead of being `max_id` as well since messages are returned + in ascending order. + _total (`list`, optional): A single-item list to pass the total parameter by reference. @@ -115,6 +124,8 @@ class MessageMethods(UploadMethods, MessageParseMethods): if ids: if not utils.is_list_like(ids): ids = (ids,) + if reverse: + ids = list(reversed(ids)) async for x in self._iter_ids(entity, ids, total=_total): await yield_(x) return @@ -124,10 +135,26 @@ class MessageMethods(UploadMethods, MessageParseMethods): # # We can emulate their behaviour locally by setting offset = max_id # and simply stopping once we hit a message with ID <= min_id. - offset_id = max(offset_id, max_id) - if offset_id and min_id: - if offset_id - min_id <= 1: - return + if reverse: + offset_id = max(offset_id, min_id) + if offset_id and max_id: + if max_id - offset_id <= 1: + print('suck lol') + return + + if not max_id: + max_id = float('inf') + else: + offset_id = max(offset_id, max_id) + if offset_id and min_id: + if offset_id - min_id <= 1: + return + + if reverse: + if offset_id: + offset_id += 1 + else: + offset_id = 1 from_id = None limit = float('inf') if limit is None else int(limit) @@ -142,7 +169,7 @@ class MessageMethods(UploadMethods, MessageParseMethods): max_date=offset_date, offset_id=offset_id, add_offset=add_offset, - limit=1, + limit=0, # Search actually returns 0 items if we ask it to max_id=0, min_id=0, hash=0, @@ -185,12 +212,24 @@ class MessageMethods(UploadMethods, MessageParseMethods): wait_time = 1 if limit > 3000 else 0 have = 0 - last_id = float('inf') - batch_size = min(max(batch_size, 1), 100) + last_id = 0 if reverse else float('inf') + + # Telegram has a hard limit of 100. + # We don't need to fetch 100 if the limit is less. + batch_size = min(max(batch_size, 1), min(100, limit)) + + # Use a negative offset to work around reversing the results + if reverse: + request.add_offset -= batch_size + while have < limit: start = time.time() - # Telegram has a hard limit of 100 + request.limit = min(limit - have, batch_size) + if reverse and request.limit != batch_size: + # Last batch needs special care if we're on reverse + request.add_offset += batch_size - request.limit + 1 + r = await self(request) if _total: _total[0] = getattr(r, 'count', len(r.messages)) @@ -198,19 +237,23 @@ class MessageMethods(UploadMethods, MessageParseMethods): entities = {utils.get_peer_id(x): x for x in itertools.chain(r.users, r.chats)} - for message in r.messages: - if message.id <= min_id: - return - + messages = reversed(r.messages) if reverse else r.messages + for message in messages: if (isinstance(message, types.MessageEmpty) - or message.id >= last_id - or (from_id and message.from_id != from_id)): + or from_id and message.from_id != from_id): continue + if reverse: + if message.id <= last_id or message.id >= max_id: + return + else: + if message.id >= last_id or message.id <= min_id: + return + # There has been reports that on bad connections this method # was returning duplicated IDs sometimes. Using ``last_id`` # is an attempt to avoid these duplicates, since the message - # IDs are returned in descending order. + # IDs are returned in descending order (or asc if reverse). last_id = message.id await yield_(custom.Message(self, message, entities, entity)) @@ -219,11 +262,11 @@ class MessageMethods(UploadMethods, MessageParseMethods): if len(r.messages) < request.limit: break - request.offset_id = r.messages[-1].id # Find the first message that's not empty (in some rare cases # it can happen that the last message is :tl:`MessageEmpty`) last_message = None - for m in reversed(r.messages): + messages = r.messages if reverse else reversed(r.messages) + for m in messages: if not isinstance(m, types.MessageEmpty): last_message = m break @@ -237,11 +280,16 @@ class MessageMethods(UploadMethods, MessageParseMethods): # should just give up since there won't be any new Message. break else: + request.offset_id = last_message.id if isinstance(request, functions.messages.GetHistoryRequest): request.offset_date = last_message.date else: request.max_date = last_message.date + if reverse: + # We want to skip the one we already have + request.add_offset -= 1 + await asyncio.sleep( max(wait_time - (time.time() - start), 0), loop=self._loop) From 0f737a86af81875344bd5b79621ef8558efbfc4e Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Fri, 29 Jun 2018 09:57:57 +0200 Subject: [PATCH 3/5] Revert sign_in needing named code argument --- telethon/client/auth.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/telethon/client/auth.py b/telethon/client/auth.py index 6f86a733..25c0444c 100644 --- a/telethon/client/auth.py +++ b/telethon/client/auth.py @@ -194,7 +194,7 @@ class AuthMethods(MessageParseMethods, UserMethods): return self async def sign_in( - self, phone=None, *, code=None, password=None, + self, phone=None, code=None, *, password=None, bot_token=None, phone_code_hash=None): """ Starts or completes the sign in process with the given phone number From 3c2ff45b0bfe90b25972622ecf69cf9c8d5b4bf0 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Fri, 29 Jun 2018 10:45:04 +0200 Subject: [PATCH 4/5] Support dispatching updates in a sequential order --- telethon/client/telegrambaseclient.py | 19 +++++++++++++++++++ telethon/client/updates.py | 14 +++++++++++++- 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/telethon/client/telegrambaseclient.py b/telethon/client/telegrambaseclient.py index bd05bd12..53d8a4e3 100644 --- a/telethon/client/telegrambaseclient.py +++ b/telethon/client/telegrambaseclient.py @@ -94,6 +94,17 @@ class TelegramBaseClient(abc.ABC): Whether reconnection should be retried `connection_retries` times automatically if Telegram disconnects us or not. + sequential_updates (`bool`, optional): + By default every incoming update will create a new task, so + you can handle several updates in parallel. Some scripts need + the order in which updates are processed to be sequential, and + this setting allows them to do so. + + If set to ``True``, incoming updates will be put in a queue + and processed sequentially. This means your event handlers + should *not* perform long-running operations since new + updates are put inside of an unbounded queue. + flood_sleep_threshold (`int` | `float`, optional): The threshold below which the library should automatically sleep on flood wait errors (inclusive). For instance, if a @@ -141,6 +152,7 @@ class TelegramBaseClient(abc.ABC): request_retries=5, connection_retries=5, auto_reconnect=True, + sequential_updates=False, flood_sleep_threshold=60, device_model=None, system_version=None, @@ -230,6 +242,13 @@ class TelegramBaseClient(abc.ABC): self._last_request = time.time() self._channel_pts = {} + if sequential_updates: + self._updates_queue = asyncio.Queue() + self._dispatching_updates_queue = asyncio.Event() + else: + self._updates_queue = None + self._dispatching_updates_queue = None + # Start with invalid state (-1) so we can have somewhere to store # the state, but also be able to determine if we are authorized. self._state = types.updates.State(-1, 0, datetime.now(), 0, -1) diff --git a/telethon/client/updates.py b/telethon/client/updates.py index fe47e87d..220a08ae 100644 --- a/telethon/client/updates.py +++ b/telethon/client/updates.py @@ -179,7 +179,13 @@ class UpdateMethods(UserMethods): self._handle_update(update.update) else: update._entities = getattr(update, '_entities', {}) - self._loop.create_task(self._dispatch_update(update)) + if self._updates_queue is None: + self._loop.create_task(self._dispatch_update(update)) + else: + self._updates_queue.put_nowait(update) + if not self._dispatching_updates_queue.is_set(): + self._dispatching_updates_queue.set() + self._loop.create_task(self._dispatch_queue_updates()) need_diff = False if hasattr(update, 'pts'): @@ -230,6 +236,12 @@ class UpdateMethods(UserMethods): await self(functions.updates.GetStateRequest()) + async def _dispatch_queue_updates(self): + while not self._updates_queue.empty(): + await self._dispatch_update(self._updates_queue.get_nowait()) + + self._dispatching_updates_queue.clear() + async def _dispatch_update(self, update): if self._events_pending_resolve: if self._event_resolve_lock.locked(): From d64eb7ea2b01bf5f1dea3db25ffb301d23dfa1b1 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Fri, 29 Jun 2018 11:04:42 +0200 Subject: [PATCH 5/5] Avoid cyclic imports on older Python versions --- telethon/extensions/markdown.py | 17 +++++++---------- telethon/helpers.py | 15 +++++++++++++++ telethon/utils.py | 16 +--------------- 3 files changed, 23 insertions(+), 25 deletions(-) diff --git a/telethon/extensions/markdown.py b/telethon/extensions/markdown.py index cb54c99e..5274dc85 100644 --- a/telethon/extensions/markdown.py +++ b/telethon/extensions/markdown.py @@ -5,15 +5,12 @@ since they seem to count as two characters and it's a bit strange. """ import re +from ..helpers import add_surrogate, del_surrogate from ..tl import TLObject from ..tl.types import ( MessageEntityBold, MessageEntityItalic, MessageEntityCode, MessageEntityPre, MessageEntityTextUrl ) -from ..utils import ( - add_surrogate as _add_surrogate, - del_surrogate as _del_surrogate -) DEFAULT_DELIMITERS = { '**': MessageEntityBold, @@ -57,7 +54,7 @@ def parse(message, delimiters=None, url_re=None): # Work on byte level with the utf-16le encoding to get the offsets right. # The offset will just be half the index we're at. - message = _add_surrogate(message) + message = add_surrogate(message) while i < len(message): if url_re and current is None: # If we're not inside a previous match since Telegram doesn't allow @@ -73,7 +70,7 @@ def parse(message, delimiters=None, url_re=None): result.append(MessageEntityTextUrl( offset=url_match.start(), length=len(url_match.group(1)), - url=_del_surrogate(url_match.group(2)) + url=del_surrogate(url_match.group(2)) )) i += len(url_match.group(1)) # Next loop iteration, don't check delimiters, since @@ -128,7 +125,7 @@ def parse(message, delimiters=None, url_re=None): + message[current.offset:] ) - return _del_surrogate(message), result + return del_surrogate(message), result def unparse(text, entities, delimiters=None, url_fmt=None): @@ -156,7 +153,7 @@ def unparse(text, entities, delimiters=None, url_fmt=None): else: entities = tuple(sorted(entities, key=lambda e: e.offset, reverse=True)) - text = _add_surrogate(text) + text = add_surrogate(text) delimiters = {v: k for k, v in delimiters.items()} for entity in entities: s = entity.offset @@ -167,8 +164,8 @@ def unparse(text, entities, delimiters=None, url_fmt=None): elif isinstance(entity, MessageEntityTextUrl) and url_fmt: text = ( text[:s] + - _add_surrogate(url_fmt.format(text[s:e], entity.url)) + + add_surrogate(url_fmt.format(text[s:e], entity.url)) + text[e:] ) - return _del_surrogate(text) + return del_surrogate(text) diff --git a/telethon/helpers.py b/telethon/helpers.py index de66813f..9c3bb116 100644 --- a/telethon/helpers.py +++ b/telethon/helpers.py @@ -1,5 +1,6 @@ """Various helpers not related to the Telegram API itself""" import os +import struct from hashlib import sha1, sha256 @@ -17,6 +18,20 @@ def ensure_parent_dir_exists(file_path): if parent: os.makedirs(parent, exist_ok=True) + +def add_surrogate(text): + return ''.join( + # SMP -> Surrogate Pairs (Telegram offsets are calculated with these). + # See https://en.wikipedia.org/wiki/Plane_(Unicode)#Overview for more. + ''.join(chr(y) for y in struct.unpack(' Surrogate Pairs (Telegram offsets are calculated with these). - # See https://en.wikipedia.org/wiki/Plane_(Unicode)#Overview for more. - ''.join(chr(y) for y in struct.unpack('