import abc import itertools from .. import utils from ..errors import RPCError from ..extensions import markdown from ..tl import types, functions class _EventBuilder(abc.ABC): @abc.abstractmethod def build(self, update): """Builds an event for the given update if possible, or returns None""" @abc.abstractmethod def resolve(self, client): """Helper method to allow event builders to be resolved before usage""" class _EventCommon(abc.ABC): """Intermediate class with common things to all events""" def __init__(self, chat_peer=None, msg_id=None, broadcast=False): self._client = None self._chat_peer = chat_peer self._message_id = msg_id self._input_chat = None self._chat = None self.is_private = isinstance(chat_peer, types.PeerUser) self.is_group = ( isinstance(chat_peer, (types.PeerChat, types.PeerChannel)) and not broadcast ) self.is_channel = isinstance(chat_peer, types.PeerChannel) def _get_input_entity(self, msg_id, entity_id, chat=None): """ Helper function to call GetMessages on the give msg_id and return the input entity whose ID is the given entity ID. If ``chat`` is present it must be an InputPeer. """ try: if isinstance(chat, types.InputPeerChannel): result = self._client( functions.channels.GetMessagesRequest(chat, [msg_id]) ) else: result = self._client( functions.messages.GetMessagesRequest([msg_id]) ) except RPCError: return entity = { utils.get_peer_id(x): x for x in itertools.chain( getattr(result, 'chats', []), getattr(result, 'users', [])) }.get(entity_id) if entity: return utils.get_input_peer(entity) @property def input_chat(self): """ The (:obj:`InputPeer`) (group, megagroup or channel) on which the event occurred. This doesn't have the title or anything, but is useful if you don't need those to avoid further requests. Note that this might be ``None`` if the library can't find it. """ if self._input_chat is None and self._chat_peer is not None: try: self._input_chat = self._client.get_input_entity( self._chat_peer ) except (ValueError, TypeError): # The library hasn't seen this chat, get the message if not isinstance(self._chat_peer, types.PeerChannel): # TODO For channels, getDifference? Maybe looking # in the dialogs (which is already done) is enough. if self._message_id is not None: self._input_chat = self._get_input_entity( self._message_id, utils.get_peer_id(self._chat_peer) ) return self._input_chat @property def chat(self): """ The (:obj:`User` | :obj:`Chat` | :obj:`Channel`, optional) on which the event occurred. This property will make an API call the first time to get the most up to date version of the chat, so use with care as there is no caching besides local caching yet. """ if self._chat is None and self.input_chat: self._chat = self._client.get_entity(self._input_chat) return self._chat # Classes defined here are actually Event builders # for their inner Event classes. Inner ._client is # set later by the creator TelegramClient. class NewMessage(_EventBuilder): """ Represents a new message event builder. Args: incoming (:obj:`bool`, optional): If set to ``True``, only **incoming** messages will be handled. Mutually exclusive with ``outgoing`` (can only set one of either). outgoing (:obj:`bool`, optional): If set to ``True``, only **outgoing** messages will be handled. Mutually exclusive with ``incoming`` (can only set one of either). chats (:obj:`entity`, optional): May be one or more entities (username/peer/etc.). By default, only matching chats will be handled. blacklist_chats (:obj:`bool`, optional): Whether to treat the the list of chats as a blacklist (if it matches it will NOT be handled) or a whitelist (default). """ def __init__(self, incoming=None, outgoing=None, chats=None, blacklist_chats=False): if incoming and outgoing: raise ValueError('Can only set either incoming or outgoing') self.incoming = incoming self.outgoing = outgoing self.chats = chats self.blacklist_chats = blacklist_chats def resolve(self, client): if hasattr(self.chats, '__iter__') and not isinstance(self.chats, str): self.chats = set(utils.get_peer_id(x) for x in client.get_input_entity(self.chats)) elif self.chats is not None: self.chats = {utils.get_peer_id( client.get_input_entity(self.chats))} def build(self, update): if isinstance(update, (types.UpdateNewMessage, types.UpdateNewChannelMessage)): if not isinstance(update.message, types.Message): return # We don't care about MessageService's here event = NewMessage.Event(update.message) elif isinstance(update, types.UpdateShortMessage): event = NewMessage.Event(types.Message( out=update.out, mentioned=update.mentioned, media_unread=update.media_unread, silent=update.silent, id=update.id, to_id=types.PeerUser(update.user_id), message=update.message, date=update.date, fwd_from=update.fwd_from, via_bot_id=update.via_bot_id, reply_to_msg_id=update.reply_to_msg_id, entities=update.entities )) else: return # Short-circuit if we let pass all events if all(x is None for x in (self.incoming, self.outgoing, self.chats)): return event if self.incoming and event.message.out: return if self.outgoing and not event.message.out: return if self.chats is not None: inside = utils.get_peer_id(event.message.to_id) in self.chats if inside == self.blacklist_chats: # If this chat matches but it's a blacklist ignore. # If it doesn't match but it's a whitelist ignore. return # Tests passed so return the event return event class Event(_EventCommon): """ Represents the event of a new message. Members: message (:obj:`Message`): This is the original ``Message`` object. input_chat (:obj:`InputPeer`): This is the input chat (private, group, megagroup or channel) to which the message was sent. This doesn't have the title or anything, but is useful if you don't need those to avoid further requests. Note that this might not be available if the library can't find the input chat. chat (:obj:`User` | :obj:`Chat` | :obj:`Channel`, optional): This property will make an API call the first time to get the most up to date version of the chat, so use with care as there is no caching besides local caching yet. ``input_chat`` needs to be available (often the case). is_private (:obj:`bool`): True if the message was sent as a private message. is_group (:obj:`bool`): True if the message was sent on a group or megagroup. is_channel (:obj:`bool`): True if the message was sent on a megagroup or channel. input_sender (:obj:`InputPeer`): This is the input version of the user who sent the message. Similarly to ``input_chat``, this doesn't have things like username or similar, but still useful in some cases. Note that this might not be available if the library can't find the input chat. sender (:obj:`User`): This property will make an API call the first time to get the most up to date version of the sender, so use with care as there is no caching besides local caching yet. ``input_sender`` needs to be available (often the case). text (:obj:`str`): The message text, markdown-formatted. raw_text (:obj:`str`): The raw message text, ignoring any formatting. is_reply (:obj:`str`): Whether the message is a reply to some other or not. reply_message (:obj:`Message`, optional): This property will make an API call the first time to get the full ``Message`` object that one was replying to, so use with care as there is no caching besides local caching yet. forward (:obj:`MessageFwdHeader`, optional): The unmodified ``MessageFwdHeader``, if present. out (:obj:`bool`): Whether the message is outgoing (i.e. you sent it from another session) or incoming (i.e. someone else sent it). """ def __init__(self, message): super().__init__(chat_peer=message.to_id, msg_id=message.id, broadcast=bool(message.post)) self.message = message self._text = None self._input_chat = None self._chat = None self._input_sender = None self._sender = None self.is_reply = bool(message.reply_to_msg_id) self._reply_message = None def respond(self, *args, **kwargs): """ Responds to the message (not as a reply). This is a shorthand for ``client.send_message(event.chat, ...)``. """ return self._client.send_message(self.input_chat, *args, **kwargs) def reply(self, *args, **kwargs): """ Replies to the message (as a reply). This is a shorthand for ``client.send_message(event.chat, ..., reply_to=event.message.id)``. """ return self._client.send_message(self.input_chat, reply_to=self.message.id, *args, **kwargs) @property def input_sender(self): if self._input_sender is None: try: self._input_sender = self._client.get_input_entity( self.message.from_id ) except (ValueError, TypeError): if isinstance(self.message.to_id, types.PeerChannel): # We can rely on self.input_chat for this self._input_sender = self._get_input_entity( self.message.id, self.message.from_id, chat=self.input_chat ) return self._client.get_input_entity(self.message.from_id) @property def sender(self): if self._sender is None and self.input_sender: self._sender = self._client.get_entity(self._input_sender) return self._sender @property def text(self): if self._text is None: if not self.message.entities: return self.message.message self._text = markdown.unparse(self.message.message, self.message.entities or []) return self._text @property def raw_text(self): return self.message.message @property def reply_message(self): if not self.message.reply_to_msg_id: return None if self._reply_message is None: if isinstance(self.input_chat, types.InputPeerChannel): r = self._client(functions.channels.GetMessagesRequest( self.input_chat, [self.message.reply_to_msg_id] )) else: r = self._client(functions.messages.GetMessagesRequest( [self.message.reply_to_msg_id] )) if not isinstance(r, types.messages.MessagesNotModified): self._reply_message = r.messages[0] return self._reply_message @property def forward(self): return self.message.fwd_from @property def media(self): return self.message.media @property def photo(self): if isinstance(self.message.media, types.MessageMediaPhoto): photo = self.message.media.photo if isinstance(photo, types.Photo): return photo @property def document(self): if isinstance(self.message.media, types.MessageMediaDocument): doc = self.message.media.document if isinstance(doc, types.Document): return doc @property def out(self): return self.message.out class ChatAction(_EventBuilder): """ Represents an action in a chat (such as user joined, left, or new pin). Args: chats (:obj:`entity`, optional): May be one or more entities (username/peer/etc.). By default, only matching chats will be handled. blacklist_chats (:obj:`bool`, optional): Whether to treat the the list of chats as a blacklist (if it matches it will NOT be handled) or a whitelist (default). """ def __init__(self, chats=None, blacklist_chats=False): # TODO This can probably be reused in all builders self.chats = chats self.blacklist_chats = blacklist_chats def resolve(self, client): if hasattr(self.chats, '__iter__') and not isinstance(self.chats, str): self.chats = set(utils.get_peer_id(x) for x in client.get_input_entity(self.chats)) elif self.chats is not None: self.chats = {utils.get_peer_id( client.get_input_entity(self.chats))} def build(self, update): if isinstance(update, types.UpdateChannelPinnedMessage): # Telegram sends UpdateChannelPinnedMessage and then # UpdateNewChannelMessage with MessageActionPinMessage. event = ChatAction.Event(types.PeerChannel(update.channel_id), new_pin=update.id) elif isinstance(update, types.UpdateChatParticipantAdd): event = ChatAction.Event(types.PeerChat(update.chat_id), added_by=update.inviter_id or True, users=update.user_id) elif isinstance(update, types.UpdateChatParticipantDelete): event = ChatAction.Event(types.PeerChat(update.chat_id), kicked_by=True, users=update.user_id) elif (isinstance(update, ( types.UpdateNewMessage, types.UpdateNewChannelMessage)) and isinstance(update.message, types.MessageService)): msg = update.message action = update.message.action if isinstance(action, types.MessageActionChatJoinedByLink): event = ChatAction.Event(msg.to_id, added_by=True, users=msg.from_id) elif isinstance(action, types.MessageActionChatAddUser): event = ChatAction.Event(msg.to_id, added_by=msg.from_id or True, users=action.users) elif isinstance(action, types.MessageActionChatDeleteUser): event = ChatAction.Event(msg.to_id, kicked_by=msg.from_id or True, users=action.user_id) elif isinstance(action, types.MessageActionChatCreate): event = ChatAction.Event(msg.to_id, users=action.users, created=True, new_title=action.title) elif isinstance(action, types.MessageActionChannelCreate): event = ChatAction.Event(msg.to_id, created=True, new_title=action.title) elif isinstance(action, types.MessageActionChatEditTitle): event = ChatAction.Event(msg.to_id, new_title=action.title) elif isinstance(action, types.MessageActionChatEditPhoto): event = ChatAction.Event(msg.to_id, new_photo=action.photo) elif isinstance(action, types.MessageActionChatDeletePhoto): event = ChatAction.Event(msg.to_id, new_photo=True) else: return else: return if self.chats is None: return event else: inside = utils.get_peer_id(event._chat_peer) in self.chats if inside == self.blacklist_chats: # If this chat matches but it's a blacklist ignore. # If it doesn't match but it's a whitelist ignore. return return event class Event(_EventCommon): """ Represents the event of a new chat action. Members: new_pin (:obj:`bool`): ``True`` if the pin has changed (new pin or removed). new_photo (:obj:`bool`): ``True`` if there's a new chat photo (or it was removed). photo (:obj:`Photo`, optional): The new photo (or ``None`` if it was removed). user_added (:obj:`bool`): ``True`` if the user was added by some other. user_joined (:obj:`bool`): ``True`` if the user joined on their own. user_left (:obj:`bool`): ``True`` if the user left on their own. user_kicked (:obj:`bool`): ``True`` if the user was kicked by some other. created (:obj:`bool`, optional): ``True`` if this chat was just created. new_title (:obj:`bool`, optional): The new title string for the chat, if applicable. """ def __init__(self, chat_peer, new_pin=None, new_photo=None, added_by=None, kicked_by=None, created=None, users=None, new_title=None): super().__init__(chat_peer=chat_peer, msg_id=new_pin) self.new_pin = isinstance(new_pin, int) self._pinned_message = new_pin self.new_photo = new_photo is not None self.photo = \ new_photo if isinstance(new_photo, types.Photo) else None self._added_by = None self._kicked_by = None self.user_added, self.user_joined, self.user_left,\ self.user_kicked = (False, False, False, False) if added_by is True: self.user_joined = True elif added_by: self.user_added = True self._added_by = added_by if kicked_by is True: self.user_left = True elif kicked_by: self.user_kicked = True self._kicked_by = kicked_by self.created = bool(created) self._user_peers = users if isinstance(users, list) else [users] self._users = None self.new_title = new_title @property def pinned_message(self): """ If ``new_pin`` is ``True``, this returns the (:obj:`Message`) object that was pinned. """ if self._pinned_message == 0: return None if isinstance(self._pinned_message, int) and self.input_chat: r = self._client(functions.channels.GetMessagesRequest( self._input_chat, [self._pinned_message] )) try: self._pinned_message = next( x for x in r.messages if isinstance(x, types.Message) and x.id == self._pinned_message ) except StopIteration: pass if isinstance(self._pinned_message, types.Message): return self._pinned_message @property def added_by(self): """ The user who added ``users``, if applicable (``None`` otherwise). """ if self._added_by and not isinstance(self._added_by, types.User): self._added_by = self._client.get_entity(self._added_by) return self._added_by @property def kicked_by(self): """ The user who kicked ``users``, if applicable (``None`` otherwise). """ if self._kicked_by and not isinstance(self._kicked_by, types.User): self._kicked_by = self._client.get_entity(self._kicked_by) return self._kicked_by @property def user(self): """ The single user that takes part in this action (e.g. joined). Might be ``None`` if the information can't be retrieved or there is no user taking part. """ try: return next(self.users) except (StopIteration, TypeError): return None @property def users(self): """ A list of users that take part in this action (e.g. joined). Might be empty if the information can't be retrieved or there are no users taking part. """ if self._users is None and self._user_peers: try: self._users = self._client.get_entity(self._user_peers) except (TypeError, ValueError): self._users = [] return self._users