Add hacks to properly handle events.Album from other DCs

Fixes #1479.
This commit is contained in:
Lonami Exo 2020-06-06 21:01:02 +02:00
parent faf7263d8f
commit 4b933069f1
3 changed files with 113 additions and 0 deletions

View File

@ -379,6 +379,15 @@ class TelegramBaseClient(abc.ABC):
# {chat_id: {Conversation}} # {chat_id: {Conversation}}
self._conversations = collections.defaultdict(set) self._conversations = collections.defaultdict(set)
# Hack to workaround the fact Telegram may send album updates as
# different Updates when being sent from a different data center.
# {grouped_id: AlbumHack}
#
# FIXME: We don't bother cleaning this up because it's not really
# worth it, albums are pretty rare and this only holds them
# for a second at most.
self._albums = {}
# Default parse mode # Default parse mode
self._parse_mode = markdown self._parse_mode = markdown

View File

@ -459,6 +459,46 @@ class UpdateMethods:
self._log[__name__].exception('Unhandled exception on %s', self._log[__name__].exception('Unhandled exception on %s',
name) name)
async def _dispatch_event(self: 'TelegramClient', event):
"""
Dispatches a single, out-of-order event. Used by `AlbumHack`.
"""
# We're duplicating a most logic from `_dispatch_update`, but all in
# the name of speed; we don't want to make it worse for all updates
# just because albums may need it.
for builder, callback in self._event_builders:
if not isinstance(event, builder.Event):
continue
if not builder.resolved:
await builder.resolve(self)
filter = builder.filter(event)
if inspect.isawaitable(filter):
filter = await filter
if not filter:
continue
try:
await callback(event)
except errors.AlreadyInConversationError:
name = getattr(callback, '__name__', repr(callback))
self._log[__name__].debug(
'Event handler "%s" already has an open conversation, '
'ignoring new one', name)
except events.StopPropagation:
name = getattr(callback, '__name__', repr(callback))
self._log[__name__].debug(
'Event handler "%s" stopped chain of propagation '
'for event %s.', name, type(event).__name__
)
break
except Exception as e:
if not isinstance(e, asyncio.CancelledError) or self.is_connected():
name = getattr(callback, '__name__', repr(callback))
self._log[__name__].exception('Unhandled exception on %s',
name)
async def _get_difference(self: 'TelegramClient', update, channel_id, pts_date): async def _get_difference(self: 'TelegramClient', update, channel_id, pts_date):
""" """
Get the difference for this `channel_id` if any, then load entities. Get the difference for this `channel_id` if any, then load entities.

View File

@ -1,4 +1,6 @@
import asyncio
import time import time
import weakref
from .common import EventBuilder, EventCommon, name_inner_event from .common import EventBuilder, EventCommon, name_inner_event
from .. import utils from .. import utils
@ -14,6 +16,54 @@ _IGNORE_MAX_AGE = 5 # seconds
_IGNORE_DICT = {} _IGNORE_DICT = {}
_HACK_DELAY = 0.5
class AlbumHack:
"""
When receiving an album from a different data-center, they will come in
separate `Updates`, so we need to temporarily remember them for a while
and only after produce the event.
Of course events are not designed for this kind of wizardy, so this is
a dirty hack that gets the job done.
When cleaning up the code base we may want to figure out a better way
to do this, or just leave the album problem to the users; the update
handling code is bad enough as it is.
"""
def __init__(self, client, event):
# It's probably silly to use a weakref here because this object is
# very short-lived but might as well try to do "the right thing".
self._client = weakref.ref(client)
self._event = event # parent event
self._due = client.loop.time() + _HACK_DELAY
client.loop.create_task(self.deliver_event())
def extend(self, messages):
client = self._client()
if client: # weakref may be dead
self._event.messages.extend(messages)
self._due = client.loop.time() + _HACK_DELAY
async def deliver_event(self):
while True:
client = self._client()
if client is None:
return # weakref is dead, nothing to deliver
diff = self._due - client.loop.time()
if diff <= 0:
# We've hit our due time, deliver event. It won't respect
# sequential updates but fixing that would just worsen this.
await client._dispatch_event(self._event)
return
del client # Clear ref and sleep until our due time
await asyncio.sleep(diff)
@name_inner_event @name_inner_event
class Album(EventBuilder): class Album(EventBuilder):
""" """
@ -66,6 +116,7 @@ class Album(EventBuilder):
return return
# Check if the ignore list is too big, and if it is clean it # Check if the ignore list is too big, and if it is clean it
# TODO time could technically go backwards; time is not monotonic
now = time.time() now = time.time()
if len(_IGNORE_DICT) > _IGNORE_MAX_SIZE: if len(_IGNORE_DICT) > _IGNORE_MAX_SIZE:
for i in [i for i, t in _IGNORE_DICT.items() if now - t > _IGNORE_MAX_AGE]: for i in [i for i, t in _IGNORE_DICT.items() if now - t > _IGNORE_MAX_AGE]:
@ -84,6 +135,11 @@ class Album(EventBuilder):
and u.message.grouped_id == group) and u.message.grouped_id == group)
]) ])
def filter(self, event):
# Albums with less than two messages require a few hacks to work.
if len(event.messages) > 1:
return super().filter(event)
class Event(EventCommon, SenderGetter): class Event(EventCommon, SenderGetter):
""" """
Represents the event of a new album. Represents the event of a new album.
@ -115,6 +171,14 @@ class Album(EventBuilder):
for msg in self.messages: for msg in self.messages:
msg._finish_init(client, self._entities, None) msg._finish_init(client, self._entities, None)
if len(self.messages) == 1:
# This will require hacks to be a proper album event
hack = client._albums.get(self.grouped_id)
if hack is None:
client._albums[self.grouped_id] = AlbumHack(client, self)
else:
hack.extend(self.messages)
@property @property
def grouped_id(self): def grouped_id(self):
""" """