diff --git a/telethon/client/auth.py b/telethon/client/auth.py index ba7523e2..0f5b8a3f 100644 --- a/telethon/client/auth.py +++ b/telethon/client/auth.py @@ -193,7 +193,7 @@ class AuthMethods(MessageParseMethods, UserMethods): return self def sign_in( - self, phone=None, *, code=None, password=None, + self, phone=None, code=None, *, password=None, bot_token=None, phone_code_hash=None): """ Starts or completes the sign in process with the given phone number diff --git a/telethon/client/messages.py b/telethon/client/messages.py index 989b7d39..2bf7317c 100644 --- a/telethon/client/messages.py +++ b/telethon/client/messages.py @@ -22,7 +22,7 @@ class MessageMethods(UploadMethods, MessageParseMethods): self, entity, limit=None, *, offset_date=None, offset_id=0, max_id=0, min_id=0, add_offset=0, search=None, filter=None, from_user=None, batch_size=100, wait_time=None, ids=None, - _total=None): + reverse=False, _total=None): """ Iterator over the message history for the specified entity. @@ -91,6 +91,15 @@ class MessageMethods(UploadMethods, MessageParseMethods): will appear in its place, so that zipping the list of IDs with the messages can match one-to-one. + reverse (`bool`, optional): + If set to ``True``, the messages will be returned in reverse + order (from oldest to newest, instead of the default newest + to oldest). This also means that the meaning of `offset_id` + and `offset_date` parameters is reversed, although they will + still be exclusive. `min_id` becomes equivalent to `offset_id` + instead of being `max_id` as well since messages are returned + in ascending order. + _total (`list`, optional): A single-item list to pass the total parameter by reference. @@ -112,6 +121,8 @@ class MessageMethods(UploadMethods, MessageParseMethods): if ids: if not utils.is_list_like(ids): ids = (ids,) + if reverse: + ids = list(reversed(ids)) for x in self._iter_ids(entity, ids, total=_total): yield (x) return @@ -121,10 +132,26 @@ class MessageMethods(UploadMethods, MessageParseMethods): # # We can emulate their behaviour locally by setting offset = max_id # and simply stopping once we hit a message with ID <= min_id. - offset_id = max(offset_id, max_id) - if offset_id and min_id: - if offset_id - min_id <= 1: - return + if reverse: + offset_id = max(offset_id, min_id) + if offset_id and max_id: + if max_id - offset_id <= 1: + print('suck lol') + return + + if not max_id: + max_id = float('inf') + else: + offset_id = max(offset_id, max_id) + if offset_id and min_id: + if offset_id - min_id <= 1: + return + + if reverse: + if offset_id: + offset_id += 1 + else: + offset_id = 1 from_id = None limit = float('inf') if limit is None else int(limit) @@ -139,7 +166,7 @@ class MessageMethods(UploadMethods, MessageParseMethods): max_date=offset_date, offset_id=offset_id, add_offset=add_offset, - limit=1, + limit=0, # Search actually returns 0 items if we ask it to max_id=0, min_id=0, hash=0, @@ -182,12 +209,24 @@ class MessageMethods(UploadMethods, MessageParseMethods): wait_time = 1 if limit > 3000 else 0 have = 0 - last_id = float('inf') - batch_size = min(max(batch_size, 1), 100) + last_id = 0 if reverse else float('inf') + + # Telegram has a hard limit of 100. + # We don't need to fetch 100 if the limit is less. + batch_size = min(max(batch_size, 1), min(100, limit)) + + # Use a negative offset to work around reversing the results + if reverse: + request.add_offset -= batch_size + while have < limit: start = time.time() - # Telegram has a hard limit of 100 + request.limit = min(limit - have, batch_size) + if reverse and request.limit != batch_size: + # Last batch needs special care if we're on reverse + request.add_offset += batch_size - request.limit + 1 + r = self(request) if _total: _total[0] = getattr(r, 'count', len(r.messages)) @@ -195,19 +234,23 @@ class MessageMethods(UploadMethods, MessageParseMethods): entities = {utils.get_peer_id(x): x for x in itertools.chain(r.users, r.chats)} - for message in r.messages: - if message.id <= min_id: - return - + messages = reversed(r.messages) if reverse else r.messages + for message in messages: if (isinstance(message, types.MessageEmpty) - or message.id >= last_id - or (from_id and message.from_id != from_id)): + or from_id and message.from_id != from_id): continue + if reverse: + if message.id <= last_id or message.id >= max_id: + return + else: + if message.id >= last_id or message.id <= min_id: + return + # There has been reports that on bad connections this method # was returning duplicated IDs sometimes. Using ``last_id`` # is an attempt to avoid these duplicates, since the message - # IDs are returned in descending order. + # IDs are returned in descending order (or asc if reverse). last_id = message.id yield (custom.Message(self, message, entities, entity)) @@ -216,11 +259,11 @@ class MessageMethods(UploadMethods, MessageParseMethods): if len(r.messages) < request.limit: break - request.offset_id = r.messages[-1].id # Find the first message that's not empty (in some rare cases # it can happen that the last message is :tl:`MessageEmpty`) last_message = None - for m in reversed(r.messages): + messages = r.messages if reverse else reversed(r.messages) + for m in messages: if not isinstance(m, types.MessageEmpty): last_message = m break @@ -234,11 +277,16 @@ class MessageMethods(UploadMethods, MessageParseMethods): # should just give up since there won't be any new Message. break else: + request.offset_id = last_message.id if isinstance(request, functions.messages.GetHistoryRequest): request.offset_date = last_message.date else: request.max_date = last_message.date + if reverse: + # We want to skip the one we already have + request.add_offset -= 1 + time.sleep( max(wait_time - (time.time() - start), 0)) diff --git a/telethon/client/telegrambaseclient.py b/telethon/client/telegrambaseclient.py index 4c64334e..b93e1f37 100644 --- a/telethon/client/telegrambaseclient.py +++ b/telethon/client/telegrambaseclient.py @@ -1,6 +1,7 @@ import abc import logging import platform +import queue import sys import threading import time @@ -91,6 +92,17 @@ class TelegramBaseClient(abc.ABC): Whether reconnection should be retried `connection_retries` times automatically if Telegram disconnects us or not. + sequential_updates (`bool`, optional): + By default every incoming update will create a new task, so + you can handle several updates in parallel. Some scripts need + the order in which updates are processed to be sequential, and + this setting allows them to do so. + + If set to ``True``, incoming updates will be put in a queue + and processed sequentially. This means your event handlers + should *not* perform long-running operations since new + updates are put inside of an unbounded queue. + flood_sleep_threshold (`int` | `float`, optional): The threshold below which the library should automatically sleep on flood wait errors (inclusive). For instance, if a @@ -138,6 +150,7 @@ class TelegramBaseClient(abc.ABC): request_retries=5, connection_retries=5, auto_reconnect=True, + sequential_updates=False, flood_sleep_threshold=60, device_model=None, system_version=None, @@ -226,6 +239,13 @@ class TelegramBaseClient(abc.ABC): self._last_request = time.time() self._channel_pts = {} + if sequential_updates: + self._updates_queue = queue.Queue() + self._dispatching_updates_queue = threading.Event() + else: + self._updates_queue = None + self._dispatching_updates_queue = None + # Start with invalid state (-1) so we can have somewhere to store # the state, but also be able to determine if we are authorized. self._state = types.updates.State(-1, 0, datetime.now(), 0, -1) @@ -279,7 +299,8 @@ class TelegramBaseClient(abc.ABC): """ Returns ``True`` if the user has connected. """ - return self._sender.is_connected() + sender = getattr(self, '_sender', None) + return sender and sender.is_connected() def disconnect(self): """ diff --git a/telethon/client/updates.py b/telethon/client/updates.py index edb14096..0a8e89d1 100644 --- a/telethon/client/updates.py +++ b/telethon/client/updates.py @@ -168,7 +168,13 @@ class UpdateMethods(UserMethods): self._handle_update(update.update) else: update._entities = getattr(update, '_entities', {}) - syncio.create_task(self._dispatch_update, update) + if self._updates_queue is None: + syncio.create_task(self._dispatch_update, update) + else: + self._updates_queue.put_nowait(update) + if not self._dispatching_updates_queue.is_set(): + self._dispatching_updates_queue.set() + syncio.create_task(self._dispatch_queue_updates) need_diff = False if hasattr(update, 'pts'): @@ -217,6 +223,12 @@ class UpdateMethods(UserMethods): self(functions.updates.GetStateRequest()) + def _dispatch_queue_updates(self): + while not self._updates_queue.empty(): + self._dispatch_update(self._updates_queue.get_nowait()) + + self._dispatching_updates_queue.clear() + def _dispatch_update(self, update): if self._events_pending_resolve: if self._event_resolve_lock.locked(): diff --git a/telethon/extensions/markdown.py b/telethon/extensions/markdown.py index cb54c99e..5274dc85 100644 --- a/telethon/extensions/markdown.py +++ b/telethon/extensions/markdown.py @@ -5,15 +5,12 @@ since they seem to count as two characters and it's a bit strange. """ import re +from ..helpers import add_surrogate, del_surrogate from ..tl import TLObject from ..tl.types import ( MessageEntityBold, MessageEntityItalic, MessageEntityCode, MessageEntityPre, MessageEntityTextUrl ) -from ..utils import ( - add_surrogate as _add_surrogate, - del_surrogate as _del_surrogate -) DEFAULT_DELIMITERS = { '**': MessageEntityBold, @@ -57,7 +54,7 @@ def parse(message, delimiters=None, url_re=None): # Work on byte level with the utf-16le encoding to get the offsets right. # The offset will just be half the index we're at. - message = _add_surrogate(message) + message = add_surrogate(message) while i < len(message): if url_re and current is None: # If we're not inside a previous match since Telegram doesn't allow @@ -73,7 +70,7 @@ def parse(message, delimiters=None, url_re=None): result.append(MessageEntityTextUrl( offset=url_match.start(), length=len(url_match.group(1)), - url=_del_surrogate(url_match.group(2)) + url=del_surrogate(url_match.group(2)) )) i += len(url_match.group(1)) # Next loop iteration, don't check delimiters, since @@ -128,7 +125,7 @@ def parse(message, delimiters=None, url_re=None): + message[current.offset:] ) - return _del_surrogate(message), result + return del_surrogate(message), result def unparse(text, entities, delimiters=None, url_fmt=None): @@ -156,7 +153,7 @@ def unparse(text, entities, delimiters=None, url_fmt=None): else: entities = tuple(sorted(entities, key=lambda e: e.offset, reverse=True)) - text = _add_surrogate(text) + text = add_surrogate(text) delimiters = {v: k for k, v in delimiters.items()} for entity in entities: s = entity.offset @@ -167,8 +164,8 @@ def unparse(text, entities, delimiters=None, url_fmt=None): elif isinstance(entity, MessageEntityTextUrl) and url_fmt: text = ( text[:s] + - _add_surrogate(url_fmt.format(text[s:e], entity.url)) + + add_surrogate(url_fmt.format(text[s:e], entity.url)) + text[e:] ) - return _del_surrogate(text) + return del_surrogate(text) diff --git a/telethon/helpers.py b/telethon/helpers.py index de66813f..9c3bb116 100644 --- a/telethon/helpers.py +++ b/telethon/helpers.py @@ -1,5 +1,6 @@ """Various helpers not related to the Telegram API itself""" import os +import struct from hashlib import sha1, sha256 @@ -17,6 +18,20 @@ def ensure_parent_dir_exists(file_path): if parent: os.makedirs(parent, exist_ok=True) + +def add_surrogate(text): + return ''.join( + # SMP -> Surrogate Pairs (Telegram offsets are calculated with these). + # See https://en.wikipedia.org/wiki/Plane_(Unicode)#Overview for more. + ''.join(chr(y) for y in struct.unpack(' Surrogate Pairs (Telegram offsets are calculated with these). - # See https://en.wikipedia.org/wiki/Plane_(Unicode)#Overview for more. - ''.join(chr(y) for y in struct.unpack('