diff --git a/telethon/client/telegrambaseclient.py b/telethon/client/telegrambaseclient.py index 9fce65fe..9f689427 100644 --- a/telethon/client/telegrambaseclient.py +++ b/telethon/client/telegrambaseclient.py @@ -379,6 +379,15 @@ class TelegramBaseClient(abc.ABC): # {chat_id: {Conversation}} self._conversations = collections.defaultdict(set) + # Hack to workaround the fact Telegram may send album updates as + # different Updates when being sent from a different data center. + # {grouped_id: AlbumHack} + # + # FIXME: We don't bother cleaning this up because it's not really + # worth it, albums are pretty rare and this only holds them + # for a second at most. + self._albums = {} + # Default parse mode self._parse_mode = markdown diff --git a/telethon/client/updates.py b/telethon/client/updates.py index 70a3d9a1..265ae850 100644 --- a/telethon/client/updates.py +++ b/telethon/client/updates.py @@ -459,6 +459,46 @@ class UpdateMethods: self._log[__name__].exception('Unhandled exception on %s', name) + async def _dispatch_event(self: 'TelegramClient', event): + """ + Dispatches a single, out-of-order event. Used by `AlbumHack`. + """ + # We're duplicating a most logic from `_dispatch_update`, but all in + # the name of speed; we don't want to make it worse for all updates + # just because albums may need it. + for builder, callback in self._event_builders: + if not isinstance(event, builder.Event): + continue + + if not builder.resolved: + await builder.resolve(self) + + filter = builder.filter(event) + if inspect.isawaitable(filter): + filter = await filter + if not filter: + continue + + try: + await callback(event) + except errors.AlreadyInConversationError: + name = getattr(callback, '__name__', repr(callback)) + self._log[__name__].debug( + 'Event handler "%s" already has an open conversation, ' + 'ignoring new one', name) + except events.StopPropagation: + name = getattr(callback, '__name__', repr(callback)) + self._log[__name__].debug( + 'Event handler "%s" stopped chain of propagation ' + 'for event %s.', name, type(event).__name__ + ) + break + except Exception as e: + if not isinstance(e, asyncio.CancelledError) or self.is_connected(): + name = getattr(callback, '__name__', repr(callback)) + self._log[__name__].exception('Unhandled exception on %s', + name) + async def _get_difference(self: 'TelegramClient', update, channel_id, pts_date): """ Get the difference for this `channel_id` if any, then load entities. diff --git a/telethon/events/album.py b/telethon/events/album.py index f32dfc83..473542bd 100644 --- a/telethon/events/album.py +++ b/telethon/events/album.py @@ -1,4 +1,6 @@ +import asyncio import time +import weakref from .common import EventBuilder, EventCommon, name_inner_event from .. import utils @@ -14,6 +16,54 @@ _IGNORE_MAX_AGE = 5 # seconds _IGNORE_DICT = {} +_HACK_DELAY = 0.5 + + +class AlbumHack: + """ + When receiving an album from a different data-center, they will come in + separate `Updates`, so we need to temporarily remember them for a while + and only after produce the event. + + Of course events are not designed for this kind of wizardy, so this is + a dirty hack that gets the job done. + + When cleaning up the code base we may want to figure out a better way + to do this, or just leave the album problem to the users; the update + handling code is bad enough as it is. + """ + def __init__(self, client, event): + # It's probably silly to use a weakref here because this object is + # very short-lived but might as well try to do "the right thing". + self._client = weakref.ref(client) + self._event = event # parent event + self._due = client.loop.time() + _HACK_DELAY + + client.loop.create_task(self.deliver_event()) + + def extend(self, messages): + client = self._client() + if client: # weakref may be dead + self._event.messages.extend(messages) + self._due = client.loop.time() + _HACK_DELAY + + async def deliver_event(self): + while True: + client = self._client() + if client is None: + return # weakref is dead, nothing to deliver + + diff = self._due - client.loop.time() + if diff <= 0: + # We've hit our due time, deliver event. It won't respect + # sequential updates but fixing that would just worsen this. + await client._dispatch_event(self._event) + return + + del client # Clear ref and sleep until our due time + await asyncio.sleep(diff) + + @name_inner_event class Album(EventBuilder): """ @@ -66,6 +116,7 @@ class Album(EventBuilder): return # Check if the ignore list is too big, and if it is clean it + # TODO time could technically go backwards; time is not monotonic now = time.time() if len(_IGNORE_DICT) > _IGNORE_MAX_SIZE: for i in [i for i, t in _IGNORE_DICT.items() if now - t > _IGNORE_MAX_AGE]: @@ -84,6 +135,11 @@ class Album(EventBuilder): and u.message.grouped_id == group) ]) + def filter(self, event): + # Albums with less than two messages require a few hacks to work. + if len(event.messages) > 1: + return super().filter(event) + class Event(EventCommon, SenderGetter): """ Represents the event of a new album. @@ -115,6 +171,14 @@ class Album(EventBuilder): for msg in self.messages: msg._finish_init(client, self._entities, None) + if len(self.messages) == 1: + # This will require hacks to be a proper album event + hack = client._albums.get(self.grouped_id) + if hack is None: + client._albums[self.grouped_id] = AlbumHack(client, self) + else: + hack.extend(self.messages) + @property def grouped_id(self): """