mirror of
https://github.com/LonamiWebs/Telethon.git
synced 2025-07-13 01:22:21 +03:00
Implement dispatching events
This commit is contained in:
parent
a16c397de4
commit
a1fe80557a
|
@ -149,6 +149,7 @@ def init(
|
||||||
self._updates_queue = asyncio.Queue(maxsize=max_queued_updates)
|
self._updates_queue = asyncio.Queue(maxsize=max_queued_updates)
|
||||||
self._updates_handle = None
|
self._updates_handle = None
|
||||||
self._update_handlers = [] # sorted list
|
self._update_handlers = [] # sorted list
|
||||||
|
self._dispatching_update_handlers = False # while dispatching, if add/remove are called, we need to make a copy
|
||||||
self._message_box = MessageBox()
|
self._message_box = MessageBox()
|
||||||
self._entity_cache = EntityCache() # required for proper update handling (to know when to getDifference)
|
self._entity_cache = EntityCache() # required for proper update handling (to know when to getDifference)
|
||||||
|
|
||||||
|
|
|
@ -56,15 +56,42 @@ def add_event_handler(
|
||||||
|
|
||||||
if event is None:
|
if event is None:
|
||||||
for param in inspect.signature(callback).parameters.values():
|
for param in inspect.signature(callback).parameters.values():
|
||||||
if not issubclass(param.annotation, EventBuilder):
|
|
||||||
raise TypeError(f'unrecognized event handler type: {param.annotation!r}')
|
|
||||||
event = param.annotation
|
event = param.annotation
|
||||||
break # only check the first parameter
|
break # only check the first parameter
|
||||||
|
|
||||||
if event is None:
|
if event is None:
|
||||||
event = Raw
|
event = Raw
|
||||||
|
|
||||||
|
if not inspect.iscoroutinefunction(callback):
|
||||||
|
raise TypeError(f'callback was not an async def function: {callback!r}')
|
||||||
|
|
||||||
|
if not isinstance(event, type):
|
||||||
|
raise TypeError(f'event type was not a type (an instance of something was probably used): {event!r}')
|
||||||
|
|
||||||
|
if not isinstance(priority, int):
|
||||||
|
raise TypeError(f'priority was not an integer: {priority!r}')
|
||||||
|
|
||||||
|
if not issubclass(event, EventBuilder):
|
||||||
|
try:
|
||||||
|
if event.SUBCLASS_OF_ID != 0x9f89304e:
|
||||||
|
raise TypeError(f'invalid raw update type for the event handler: {event!r}')
|
||||||
|
|
||||||
|
if 'types' in filters:
|
||||||
|
warnings.warn('"types" filter is ignored when the event type already is a raw update')
|
||||||
|
|
||||||
|
filters['types'] = event
|
||||||
|
event = Raw
|
||||||
|
except AttributeError:
|
||||||
|
raise TypeError(f'unrecognized event handler type: {param.annotation!r}')
|
||||||
|
|
||||||
handler = EventHandler(event, callback, priority, make_filter(**filters))
|
handler = EventHandler(event, callback, priority, make_filter(**filters))
|
||||||
|
|
||||||
|
if self._dispatching_update_handlers:
|
||||||
|
# Now that there's a copy, we're no longer dispatching from the old update_handlers,
|
||||||
|
# so we can modify it. This is why we can turn the flag off.
|
||||||
|
self._update_handlers = self._update_handlers[:]
|
||||||
|
self._dispatching_update_handlers = False
|
||||||
|
|
||||||
bisect.insort(self._update_handlers, handler)
|
bisect.insort(self._update_handlers, handler)
|
||||||
return handler
|
return handler
|
||||||
|
|
||||||
|
@ -80,6 +107,11 @@ def remove_event_handler(
|
||||||
if not self._update_handlers:
|
if not self._update_handlers:
|
||||||
return [] # won't be removing anything (some code paths rely on non-empty lists)
|
return [] # won't be removing anything (some code paths rely on non-empty lists)
|
||||||
|
|
||||||
|
if self._dispatching_update_handlers:
|
||||||
|
# May be an unnecessary copy if nothing was removed, but that's not a big deal.
|
||||||
|
self._update_handlers = self._update_handlers[:]
|
||||||
|
self._dispatching_update_handlers = False
|
||||||
|
|
||||||
if isinstance(callback, EventHandler):
|
if isinstance(callback, EventHandler):
|
||||||
if event is not None or priority is not None:
|
if event is not None or priority is not None:
|
||||||
warnings.warn('event and priority are ignored when removing EventHandler instances')
|
warnings.warn('event and priority are ignored when removing EventHandler instances')
|
||||||
|
@ -138,8 +170,7 @@ async def _update_loop(self: 'TelegramClient'):
|
||||||
updates_to_dispatch = deque()
|
updates_to_dispatch = deque()
|
||||||
while self.is_connected():
|
while self.is_connected():
|
||||||
if updates_to_dispatch:
|
if updates_to_dispatch:
|
||||||
# TODO dispatch
|
await _dispatch(self, updates_to_dispatch.popleft())
|
||||||
updates_to_dispatch.popleft()
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
get_diff = self._message_box.get_difference()
|
get_diff = self._message_box.get_difference()
|
||||||
|
@ -176,3 +207,33 @@ async def _update_loop(self: 'TelegramClient'):
|
||||||
updates_to_dispatch.extend(processed)
|
updates_to_dispatch.extend(processed)
|
||||||
except Exception:
|
except Exception:
|
||||||
self._log[__name__].exception('Fatal error handling updates (this is a bug in Telethon, please report it)')
|
self._log[__name__].exception('Fatal error handling updates (this is a bug in Telethon, please report it)')
|
||||||
|
|
||||||
|
|
||||||
|
async def _dispatch(self, update):
|
||||||
|
self._dispatching_update_handlers = True
|
||||||
|
|
||||||
|
event_cache = {}
|
||||||
|
for handler in self._update_handlers:
|
||||||
|
event = event_cache.get(handler._event)
|
||||||
|
if not event:
|
||||||
|
event_cache[handler._event] = event = handler._event._build(update)
|
||||||
|
|
||||||
|
while True:
|
||||||
|
# filters can be modified at any time, and there can be any amount of them which are not yet resolved
|
||||||
|
try:
|
||||||
|
if handler._filter(event):
|
||||||
|
try:
|
||||||
|
await handler._callback(event)
|
||||||
|
except Exception:
|
||||||
|
name = getattr(handler.callback, '__name__', repr(handler.callback))
|
||||||
|
self._log[__name__].exception('Unhandled exception on %s (this is likely a bug in your code)', name)
|
||||||
|
break
|
||||||
|
except NotResolved as e:
|
||||||
|
try:
|
||||||
|
await unresolved.resolve()
|
||||||
|
except Exception:
|
||||||
|
# we cannot really do much about this; it might be a temporary network issue
|
||||||
|
warnings.warn(f'failed to resolve filter, handler will be skipped: {e.unresolved!r}')
|
||||||
|
break
|
||||||
|
|
||||||
|
self._dispatching_update_handlers = False
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
import abc
|
import abc
|
||||||
import functools
|
import functools
|
||||||
|
|
||||||
|
from .filters import Filter
|
||||||
|
|
||||||
|
|
||||||
class StopPropagation(Exception):
|
class StopPropagation(Exception):
|
||||||
"""
|
"""
|
||||||
|
@ -48,7 +50,7 @@ class EventBuilder(abc.ABC):
|
||||||
class EventHandler:
|
class EventHandler:
|
||||||
__slots__ = ('_event', '_callback', '_priority', '_filter')
|
__slots__ = ('_event', '_callback', '_priority', '_filter')
|
||||||
|
|
||||||
def __init__(self, event, callback, priority, filter):
|
def __init__(self, event: EventBuilder, callback: callable, priority: int, filter: Filter):
|
||||||
self._event = event
|
self._event = event
|
||||||
self._callback = callback
|
self._callback = callback
|
||||||
self._priority = priority
|
self._priority = priority
|
||||||
|
|
Loading…
Reference in New Issue
Block a user