From 483e2aadf13461f12fd9b14d8521d00430b5e400 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Tue, 15 Feb 2022 11:57:55 +0100 Subject: [PATCH] Begin updating the way updates are built --- telethon/_client/updates.py | 104 ++++++++------ telethon/_events/album.py | 8 +- telethon/_events/base.py | 9 +- telethon/_events/callbackquery.py | 37 ++--- telethon/_events/chataction.py | 186 +++++++++++++------------ telethon/_events/inlinequery.py | 24 ++-- telethon/_events/messagedeleted.py | 26 ++-- telethon/_events/messageedited.py | 2 +- telethon/_events/messageread.py | 39 ++++-- telethon/_events/newmessage.py | 10 +- telethon/_events/raw.py | 2 +- telethon/_events/userupdate.py | 50 +++---- telethon/_misc/utils.py | 36 +---- telethon/types/_custom/chat.py | 46 ++++-- telethon/types/_custom/chatgetter.py | 8 +- telethon/types/_custom/message.py | 6 +- telethon/types/_custom/sendergetter.py | 8 +- telethon/types/_custom/user.py | 20 ++- 18 files changed, 337 insertions(+), 284 deletions(-) diff --git a/telethon/_client/updates.py b/telethon/_client/updates.py index 16c20266..32375dd6 100644 --- a/telethon/_client/updates.py +++ b/telethon/_client/updates.py @@ -170,7 +170,7 @@ async def _update_loop(self: 'TelegramClient'): updates_to_dispatch = deque() while self.is_connected(): if updates_to_dispatch: - await _dispatch(self, updates_to_dispatch.popleft()) + await _dispatch(self, *updates_to_dispatch.popleft()) continue get_diff = self._message_box.get_difference() @@ -178,8 +178,7 @@ async def _update_loop(self: 'TelegramClient'): self._log[__name__].info('Getting difference for account updates') diff = await self(get_diff) updates, users, chats = self._message_box.apply_difference(diff, self._entity_cache) - self._entity_cache.extend(users, chats) - updates_to_dispatch.extend(updates) + updates_to_dispatch.extend(_preprocess_updates(self, updates, users, chats)) continue get_diff = self._message_box.get_channel_difference(self._entity_cache) @@ -187,8 +186,7 @@ async def _update_loop(self: 'TelegramClient'): self._log[__name__].info('Getting difference for channel updates') diff = await self(get_diff) updates, users, chats = self._message_box.apply_channel_difference(get_diff, diff, self._entity_cache) - self._entity_cache.extend(users, chats) - updates_to_dispatch.extend(updates) + updates_to_dispatch.extend(_preprocess_updates(self, updates, users, chats)) continue deadline = self._message_box.check_deadlines() @@ -203,46 +201,74 @@ async def _update_loop(self: 'TelegramClient'): processed = [] users, chats = self._message_box.process_updates(updates, self._entity_cache, processed) - self._entity_cache.extend(users, chats) - updates_to_dispatch.extend(processed) + updates_to_dispatch.extend(_preprocess_updates(self, processed, users, chats)) except Exception: self._log[__name__].exception('Fatal error handling updates (this is a bug in Telethon, please report it)') -async def _dispatch(self, update): +def _preprocess_updates(self, updates, users, chats): + self._entity_cache.extend(users, chats) + entities = Entities(self, users, chats) + return ((u, entities) for u in updates) + + +class Entities: + def __init__(self, client, users, chats): + self.self_id = client._session_state.user_id + self._entities = {e.id: e for e in itertools.chain( + (User(client, u) for u in users), + (Chat(client, c) for u in chats), + )} + + def get(self, client, peer): + if not peer: + return None + + id = utils.get_peer_id(peer) + try: + return self._entities[id] + except KeyError: + entity = client._entity_cache.get(query.user_id) + if not entity: + raise RuntimeError('Update is missing a hash but did not trigger a gap') + + self._entities[entity.id] = User(client, entity) if entity.is_user else Chat(client, entity) + return self._entities[entity.id] + + +async def _dispatch(self, update, entities): self._dispatching_update_handlers = True + try: + event_cache = {} + for handler in self._update_handlers: + event, entities = event_cache.get(handler._event) + if not event: + # build can fail if we're missing an access hash; we want this to crash + event_cache[handler._event] = event = handler._event._build(self, update, entities) - event_cache = {} - for handler in self._update_handlers: - event = event_cache.get(handler._event) - if not event: - event_cache[handler._event] = event = handler._event._build( - update, [], self._session_state.user_id, {}, self) - - while True: - # filters can be modified at any time, and there can be any amount of them which are not yet resolved - try: - if handler._filter(event): - try: - await handler._callback(event) - except StopPropagation: - self._dispatching_update_handlers = False - return - except Exception: - name = getattr(handler._callback, '__name__', repr(handler._callback)) - self._log[__name__].exception('Unhandled exception on %s (this is likely a bug in your code)', name) - except NotResolved as nr: + while True: + # filters can be modified at any time, and there can be any amount of them which are not yet resolved try: - await nr.unresolved.resolve() - continue + if handler._filter(event): + try: + await handler._callback(event) + except StopPropagation: + return + except Exception: + name = getattr(handler._callback, '__name__', repr(handler._callback)) + self._log[__name__].exception('Unhandled exception on %s (this is likely a bug in your code)', name) + except NotResolved as nr: + try: + await nr.unresolved.resolve() + continue + except Exception as e: + # we cannot really do much about this; it might be a temporary network issue + warnings.warn(f'failed to resolve filter, handler will be skipped: {e}: {nr.unresolved!r}') except Exception as e: - # we cannot really do much about this; it might be a temporary network issue - warnings.warn(f'failed to resolve filter, handler will be skipped: {e}: {nr.unresolved!r}') - except Exception as e: - # invalid filter (e.g. types when types were not used as input) - warnings.warn(f'invalid filter applied, handler will be skipped: {e}: {e.filter!r}') + # invalid filter (e.g. types when types were not used as input) + warnings.warn(f'invalid filter applied, handler will be skipped: {e}: {e.filter!r}') - # we only want to continue on unresolved filter (to check if there are more unresolved) - break - - self._dispatching_update_handlers = False + # we only want to continue on unresolved filter (to check if there are more unresolved) + break + finally: + self._dispatching_update_handlers = False diff --git a/telethon/_events/album.py b/telethon/_events/album.py index 41646acf..d57968f0 100644 --- a/telethon/_events/album.py +++ b/telethon/_events/album.py @@ -107,7 +107,7 @@ class Album(EventBuilder, _custom.chatgetter.ChatGetter, _custom.sendergetter.Se _custom.sendergetter.SenderGetter.__init__(self, message.sender_id) self.messages = messages - def _build(cls, update, others=None, self_id=None, *todo, **todo2): + def _build(cls, client, update, entities): if not others: return # We only care about albums which come inside the same Updates @@ -146,6 +146,12 @@ class Album(EventBuilder, _custom.chatgetter.ChatGetter, _custom.sendergetter.Se and u.message.grouped_id == group) ]) + self = cls.__new__(cls) + self._client = client + self._sender = entities.get(_tl.PeerUser(update.user_id)) + self._chat = entities.get(_tl.PeerUser(update.user_id)) + return self + def _set_client(self, client): super()._set_client(client) self._sender, self._input_sender = utils._get_entity_pair(self.sender_id, self._entities) diff --git a/telethon/_events/base.py b/telethon/_events/base.py index f0af195f..404ff7b6 100644 --- a/telethon/_events/base.py +++ b/telethon/_events/base.py @@ -34,15 +34,12 @@ class StopPropagation(Exception): class EventBuilder(abc.ABC): @classmethod @abc.abstractmethod - def _build(cls, update, others, self_id, entities, client): + def _build(cls, client, update, entities): """ Builds an event for the given update if possible, or returns None. - `others` are the rest of updates that came in the same container - as the current `update`. - - `self_id` should be the current user's ID, since it is required - for some events which lack this information but still need it. + `entities` must have `get(Peer) -> User|Chat` and `self_id`, + which must be the current user's ID. """ diff --git a/telethon/_events/callbackquery.py b/telethon/_events/callbackquery.py index 585fba51..5d36856b 100644 --- a/telethon/_events/callbackquery.py +++ b/telethon/_events/callbackquery.py @@ -69,29 +69,32 @@ class CallbackQuery(EventBuilder, _custom.chatgetter.ChatGetter, _custom.senderg Button.inline('Nope', b'no') ]) """ - def __init__(self, query, peer, msg_id): - _custom.chatgetter.ChatGetter.__init__(self, peer) - _custom.sendergetter.SenderGetter.__init__(self, query.user_id) + @classmethod + def _build(cls, client, update, entities): + query = update + if isinstance(update, _tl.UpdateBotCallbackQuery): + peer = update.peer + msg_id = update.msg_id + elif isinstance(update, _tl.UpdateInlineBotCallbackQuery): + # See https://github.com/LonamiWebs/Telethon/pull/1005 + # The long message ID is actually just msg_id + peer_id + msg_id, pid = struct.unpack('