mirror of
https://github.com/LonamiWebs/Telethon.git
synced 2024-11-26 11:23:46 +03:00
34a8140ff0
This was causing the checks against chats to fail. In addition to that, before setting the attribute, it is now casted to bool to prevent more issues like this in the future (or if users use non-boolean values).
203 lines
6.5 KiB
Python
203 lines
6.5 KiB
Python
import abc
|
|
import asyncio
|
|
import warnings
|
|
|
|
from .. import utils
|
|
from ..tl import TLObject, types
|
|
from ..tl.custom.chatgetter import ChatGetter
|
|
|
|
|
|
async def _into_id_set(client, chats):
|
|
"""Helper util to turn the input chat or chats into a set of IDs."""
|
|
if chats is None:
|
|
return None
|
|
|
|
if not utils.is_list_like(chats):
|
|
chats = (chats,)
|
|
|
|
result = set()
|
|
for chat in chats:
|
|
if isinstance(chat, int):
|
|
if chat < 0:
|
|
result.add(chat) # Explicitly marked IDs are negative
|
|
else:
|
|
result.update({ # Support all valid types of peers
|
|
utils.get_peer_id(types.PeerUser(chat)),
|
|
utils.get_peer_id(types.PeerChat(chat)),
|
|
utils.get_peer_id(types.PeerChannel(chat)),
|
|
})
|
|
elif isinstance(chat, TLObject) and chat.SUBCLASS_OF_ID == 0x2d45687:
|
|
# 0x2d45687 == crc32(b'Peer')
|
|
result.add(utils.get_peer_id(chat))
|
|
else:
|
|
chat = await client.get_input_entity(chat)
|
|
if isinstance(chat, types.InputPeerSelf):
|
|
chat = await client.get_me(input_peer=True)
|
|
result.add(utils.get_peer_id(chat))
|
|
|
|
return result
|
|
|
|
|
|
class EventBuilder(abc.ABC):
|
|
"""
|
|
The common event builder, with builtin support to filter per chat.
|
|
|
|
Args:
|
|
chats (`entity`, optional):
|
|
May be one or more entities (username/peer/etc.), preferably IDs.
|
|
By default, only matching chats will be handled.
|
|
|
|
blacklist_chats (`bool`, optional):
|
|
Whether to treat the chats as a blacklist instead of
|
|
as a whitelist (default). This means that every chat
|
|
will be handled *except* those specified in ``chats``
|
|
which will be ignored if ``blacklist_chats=True``.
|
|
|
|
func (`callable`, optional):
|
|
A callable function that should accept the event as input
|
|
parameter, and return a value indicating whether the event
|
|
should be dispatched or not (any truthy value will do, it
|
|
does not need to be a `bool`). It works like a custom filter:
|
|
|
|
.. code-block:: python
|
|
|
|
@client.on(events.NewMessage(func=lambda e: e.is_private))
|
|
async def handler(event):
|
|
pass # code here
|
|
"""
|
|
self_id = None
|
|
|
|
def __init__(self, chats=None, *, blacklist_chats=False, func=None):
|
|
self.chats = chats
|
|
self.blacklist_chats = bool(blacklist_chats)
|
|
self.resolved = False
|
|
self.func = func
|
|
self._resolve_lock = None
|
|
|
|
@classmethod
|
|
@abc.abstractmethod
|
|
def build(cls, update):
|
|
"""Builds an event for the given update if possible, or returns None"""
|
|
|
|
async def resolve(self, client):
|
|
"""Helper method to allow event builders to be resolved before usage"""
|
|
if self.resolved:
|
|
return
|
|
|
|
if not self._resolve_lock:
|
|
self._resolve_lock = asyncio.Lock(loop=client.loop)
|
|
|
|
async with self._resolve_lock:
|
|
if not self.resolved:
|
|
await self._resolve(client)
|
|
self.resolved = True
|
|
|
|
async def _resolve(self, client):
|
|
self.chats = await _into_id_set(client, self.chats)
|
|
if not EventBuilder.self_id:
|
|
EventBuilder.self_id = await client.get_peer_id('me')
|
|
|
|
def filter(self, event):
|
|
"""
|
|
If the ID of ``event._chat_peer`` isn't in the chats set (or it is
|
|
but the set is a blacklist) returns ``None``, otherwise the event.
|
|
|
|
The events must have been resolved before this can be called.
|
|
"""
|
|
if not self.resolved:
|
|
return None
|
|
|
|
if self.chats is not None:
|
|
# Note: the `event.chat_id` property checks if it's `None` for us
|
|
inside = event.chat_id in self.chats
|
|
if inside == self.blacklist_chats:
|
|
# If this chat matches but it's a blacklist ignore.
|
|
# If it doesn't match but it's a whitelist ignore.
|
|
return None
|
|
|
|
if not self.func or self.func(event):
|
|
return event
|
|
|
|
|
|
class EventCommon(ChatGetter, abc.ABC):
|
|
"""
|
|
Intermediate class with common things to all events.
|
|
|
|
Remember that this class implements `ChatGetter
|
|
<telethon.tl.custom.chatgetter.ChatGetter>` which
|
|
means you have access to all chat properties and methods.
|
|
|
|
In addition, you can access the `original_update`
|
|
field which contains the original :tl:`Update`.
|
|
"""
|
|
_event_name = 'Event'
|
|
|
|
def __init__(self, chat_peer=None, msg_id=None, broadcast=False):
|
|
self._entities = {}
|
|
self._client = None
|
|
self._chat_peer = chat_peer
|
|
self._message_id = msg_id
|
|
self._input_chat = None
|
|
self._chat = None
|
|
self._broadcast = broadcast
|
|
self.original_update = None
|
|
|
|
def _set_client(self, client):
|
|
"""
|
|
Setter so subclasses can act accordingly when the client is set.
|
|
"""
|
|
self._client = client
|
|
|
|
def _get_entity_pair(self, entity_id):
|
|
"""
|
|
Returns ``(entity, input_entity)`` for the given entity ID.
|
|
"""
|
|
entity = self._entities.get(entity_id)
|
|
try:
|
|
input_entity = utils.get_input_peer(entity)
|
|
except TypeError:
|
|
try:
|
|
input_entity = self._client._entity_cache[entity_id]
|
|
except KeyError:
|
|
input_entity = None
|
|
|
|
return entity, input_entity
|
|
|
|
def _load_entities(self):
|
|
"""
|
|
Must load all the entities it needs from cache, and
|
|
return ``False`` if it could not find all of them.
|
|
"""
|
|
if not self._chat_peer:
|
|
return True # Nothing to load (e.g. MessageDeleted)
|
|
|
|
self._chat, self._input_chat = self._get_entity_pair(self.chat_id)
|
|
return self._input_chat is not None
|
|
|
|
@property
|
|
def client(self):
|
|
"""
|
|
The `telethon.TelegramClient` that created this event.
|
|
"""
|
|
return self._client
|
|
|
|
def __str__(self):
|
|
return TLObject.pretty_format(self.to_dict())
|
|
|
|
def stringify(self):
|
|
return TLObject.pretty_format(self.to_dict(), indent=0)
|
|
|
|
def to_dict(self):
|
|
d = {k: v for k, v in self.__dict__.items() if k[0] != '_'}
|
|
d['_'] = self._event_name
|
|
return d
|
|
|
|
|
|
def name_inner_event(cls):
|
|
"""Decorator to rename cls.Event 'Event' as 'cls.Event'"""
|
|
if hasattr(cls, 'Event'):
|
|
cls.Event._event_name = '{}.Event'.format(cls.__name__)
|
|
else:
|
|
warnings.warn('Class {} does not have a inner Event'.format(cls))
|
|
return cls
|