Telethon/telethon/events/newmessage.py

419 lines
15 KiB
Python

import abc
import datetime
import itertools
import re
import warnings
from .. import utils
from ..errors import RPCError
from ..extensions import markdown
from ..tl import TLObject, types, functions
from .common import EventBuilder, EventCommon, name_inner_event
@name_inner_event
class NewMessage(EventBuilder):
"""
Represents a new message event builder.
Args:
incoming (`bool`, optional):
If set to ``True``, only **incoming** messages will be handled.
Mutually exclusive with ``outgoing`` (can only set one of either).
outgoing (`bool`, optional):
If set to ``True``, only **outgoing** messages will be handled.
Mutually exclusive with ``incoming`` (can only set one of either).
pattern (`str`, `callable`, `Pattern`, optional):
If set, only messages matching this pattern will be handled.
You can specify a regex-like string which will be matched
against the message, a callable function that returns ``True``
if a message is acceptable, or a compiled regex pattern.
"""
def __init__(self, incoming=None, outgoing=None,
             chats=None, blacklist_chats=False, pattern=None):
    """
    Stores the direction filters and normalises ``pattern`` into a
    single callable (or leaves it falsy when no pattern is wanted).

    ``chats`` and ``blacklist_chats`` are forwarded untouched to the
    parent builder.

    Raises:
        ValueError: if both ``incoming`` and ``outgoing`` are truthy.
        TypeError: if ``pattern`` is not a string, a callable, nor an
            object exposing a callable ``match`` attribute.
    """
    if incoming and outgoing:
        raise ValueError('Can only set either incoming or outgoing')

    super().__init__(chats=chats, blacklist_chats=blacklist_chats)
    self.incoming = incoming
    self.outgoing = outgoing

    if isinstance(pattern, str):
        # Plain strings are compiled and matched from the start.
        matcher = re.compile(pattern).match
    elif not pattern or callable(pattern):
        # Nothing to match against, or the user supplied the predicate.
        matcher = pattern
    elif callable(getattr(pattern, 'match', None)):
        # Compiled patterns (or anything that quacks like one).
        matcher = pattern.match
    else:
        raise TypeError('Invalid pattern type given')

    self.pattern = matcher
def build(self, update):
    """
    Turns ``update`` into a ``NewMessage.Event`` and runs it through
    the message filters.

    Full message updates are used as-is (anything that is not a
    :tl:`Message`, such as a service message, is ignored). The two
    "short" update kinds are expanded into a :tl:`Message` first.
    Any other update yields ``None``.
    """
    if isinstance(update, (types.UpdateNewMessage,
                           types.UpdateNewChannelMessage)):
        message = update.message
        if not isinstance(message, types.Message):
            return None  # We don't care about MessageService's here
    else:
        # Only the peers differ between the two short update kinds.
        if isinstance(update, types.UpdateShortMessage):
            peers = dict(
                to_id=types.PeerUser(update.user_id),
                from_id=self._self_id if update.out else update.user_id
            )
        elif isinstance(update, types.UpdateShortChatMessage):
            peers = dict(
                from_id=update.from_id,
                to_id=types.PeerChat(update.chat_id)
            )
        else:
            return None

        message = types.Message(
            out=update.out,
            mentioned=update.mentioned,
            media_unread=update.media_unread,
            silent=update.silent,
            id=update.id,
            message=update.message,
            date=update.date,
            fwd_from=update.fwd_from,
            via_bot_id=update.via_bot_id,
            reply_to_msg_id=update.reply_to_msg_id,
            entities=update.entities,
            **peers
        )

    event = NewMessage.Event(message)
    event._entities = update._entities
    return self._message_filter_event(event)
def _message_filter_event(self, event):
# Short-circuit if we let pass all events
if all(x is None for x in (self.incoming, self.outgoing, self.chats,
self.pattern)):
return event
if self.incoming and event.message.out:
return
if self.outgoing and not event.message.out:
return
if self.pattern:
match = self.pattern(event.message.message or '')
if not match:
return
event.pattern_match = match
return self._filter_event(event)
class Event(EventCommon):
"""
Represents the event of a new message.
Members:
message (:tl:`Message`):
This is the original :tl:`Message` object.
is_private (`bool`):
True if the message was sent as a private message.
is_group (`bool`):
True if the message was sent on a group or megagroup.
is_channel (`bool`):
True if the message was sent on a megagroup or channel.
is_reply (`bool`):
Whether the message is a reply to some other or not.
"""
def __init__(self, message):
    """
    Wraps ``message`` and works out which peer counts as "the chat".

    For a message that is not outgoing and is addressed to a user,
    the chat is the sender (``from_id``); in every other case it is
    ``to_id``.
    """
    incoming_to_user = (
        not message.out and isinstance(message.to_id, types.PeerUser)
    )
    # Incoming message (e.g. from a bot) has to_id=us, and
    # from_id=bot (the actual "chat" from an user's perspective).
    chat_peer = (
        types.PeerUser(message.from_id) if incoming_to_user
        else message.to_id
    )
    super().__init__(
        chat_peer=chat_peer,
        msg_id=message.id,
        broadcast=bool(message.post)
    )

    self.message = message
    self.is_reply = bool(message.reply_to_msg_id)

    # Lazily filled caches used by the properties below.
    self._text = None
    self._input_sender = None
    self._sender = None
    self._reply_message = None
def respond(self, *args, **kwargs):
    """
    Sends a message to the chat this event came from, without
    marking it as a reply. Equivalent to calling
    ``client.send_message(event.input_chat, ...)`` with the
    given arguments.
    """
    send = self._client.send_message
    return send(self.input_chat, *args, **kwargs)
def reply(self, *args, **kwargs):
    """
    Sends a message to the chat this event came from, as a reply to
    the event's message. Equivalent to
    ``client.send_message(event.input_chat, ..., reply_to=event.message.id)``.

    Any ``reply_to`` given by the caller is overridden.
    """
    kwargs.update(reply_to=self.message.id)
    send = self._client.send_message
    return send(self.input_chat, *args, **kwargs)
def forward_to(self, *args, **kwargs):
    """
    Forwards the event's message. The ``messages`` and ``from_peer``
    keyword arguments are always filled in from this event (the
    message ID and the input chat, respectively); everything else is
    passed through to ``client.forward_messages``.
    """
    kwargs.update(
        messages=self.message.id,
        from_peer=self.input_chat
    )
    return self._client.forward_messages(*args, **kwargs)
def edit(self, *args, **kwargs):
    """
    Edits the event's message through ``client.edit_message``,
    passing along any extra arguments.

    Returns ``None`` without contacting the server for the edit when:

    * the message is a forward, or
    * the message is not outgoing, unless it is addressed to a user
      whose ID equals the one returned by ``client.get_me``.

    Otherwise returns whatever ``client.edit_message`` returns.
    """
    message = self.message
    if message.fwd_from:
        return None

    if not message.out:
        recipient = message.to_id
        if not isinstance(recipient, types.PeerUser):
            return None

        # Only messages addressed to ourselves remain editable.
        me = self._client.get_me(input_peer=True)
        if recipient.user_id != me.user_id:
            return None

    return self._client.edit_message(
        self.input_chat, message, *args, **kwargs
    )
def delete(self, *args, **kwargs):
    """
    Deletes the event's message by calling
    ``client.delete_messages(event.input_chat, [event.message], ...)``.

    No permission check is made here: the caller must either know
    the deletion is allowed or be ready to handle the resulting error.
    """
    targets = [self.message]
    return self._client.delete_messages(
        self.input_chat, targets, *args, **kwargs
    )
@property
def input_sender(self):
    """
    The input version (:tl:`InputPeer`) of whoever sent the message.
    Like ``input_chat``, it carries no username or similar details,
    but is still useful in some cases.

    Returns ``None`` if the message is a broadcast on a channel
    (a channel that is not a group). The value is cached after the
    first successful lookup.
    """
    if self._input_sender is not None:
        return self._input_sender

    if self.is_channel and not self.is_group:
        return None

    try:
        self._input_sender = self._client.get_input_entity(
            self.message.from_id
        )
    except (ValueError, TypeError):
        # The direct lookup failed, so fall back to resolving the
        # sender through the chat (we can rely on self.input_chat).
        self._sender, self._input_sender = self._get_entity(
            self.message.id,
            self.message.from_id,
            chat=self.input_chat
        )

    return self._input_sender
@property
def sender(self):
    """
    The entity that sent the message, or ``None`` when
    ``input_sender`` is not available.

    The cached value is returned if there is one. Otherwise the
    entities attached to the event are checked first, and only if the
    sender is not among them is ``client.get_entity`` called.
    """
    if not self.input_sender:
        return None

    if self._sender is not None:
        return self._sender

    peer_id = utils.get_peer_id(self._input_sender)
    found = self._entities.get(peer_id)
    if found is None:
        # Not among the event's own entities, ask the client.
        found = self._client.get_entity(self._input_sender)

    self._sender = found
    return self._sender
@property
def sender_id(self):
    """
    The ``from_id`` of the underlying message (may be ``None``).
    """
    message = self.message
    return message.from_id
@property
def text(self):
    """
    The message text with its entities rendered back as markdown.

    Messages without entities return their raw text directly (and
    nothing is cached); otherwise the unparsed text is cached.
    """
    if self._text is not None:
        return self._text

    message = self.message
    entities = message.entities
    if not entities:
        return message.message

    self._text = markdown.unparse(message.message, entities)
    return self._text
@property
def raw_text(self):
    """
    The text of the message exactly as stored, with no formatting
    applied.
    """
    message = self.message
    return message.message
@property
def reply_message(self):
    """
    The :tl:`Message` this one replies to, or ``None`` if this
    message is not a reply.

    The first access performs a request to fetch it (through the
    channel-specific request when the chat is a channel); the result
    is then cached on the event. If the server answers with
    :tl:`MessagesNotModified`, nothing is cached.
    """
    replied_id = self.message.reply_to_msg_id
    if not replied_id:
        return None

    if self._reply_message is not None:
        return self._reply_message

    chat = self.input_chat
    wanted = [types.InputMessageID(replied_id)]
    if isinstance(chat, types.InputPeerChannel):
        request = functions.channels.GetMessagesRequest(chat, wanted)
    else:
        request = functions.messages.GetMessagesRequest(wanted)

    r = self._client(request)
    if not isinstance(r, types.messages.MessagesNotModified):
        self._reply_message = r.messages[0]

    return self._reply_message
@property
def forward(self):
    """
    The unmodified :tl:`MessageFwdHeader`, if present.
    """
    message = self.message
    return message.fwd_from
@property
def media(self):
    """
    The :tl:`MessageMedia` of the message exactly as received,
    if there is any.
    """
    message = self.message
    return message.media
@property
def photo(self):
    """
    The :tl:`Photo` carried by the message when its media is a
    :tl:`MessageMediaPhoto` holding an actual :tl:`Photo`;
    ``None`` in any other case.
    """
    media = self.message.media
    if not isinstance(media, types.MessageMediaPhoto):
        return None

    candidate = media.photo
    return candidate if isinstance(candidate, types.Photo) else None
@property
def document(self):
    """
    The :tl:`Document` carried by the message when its media is a
    :tl:`MessageMediaDocument` holding an actual :tl:`Document`;
    ``None`` in any other case.
    """
    media = self.message.media
    if not isinstance(media, types.MessageMediaDocument):
        return None

    candidate = media.document
    return candidate if isinstance(candidate, types.Document) else None
def _document_by_attribute(self, kind, condition=None):
"""
Helper method to return the document only if it has an attribute
that's an instance of the given kind, and passes the condition.
"""
doc = self.document
if doc:
for attr in doc.attributes:
if isinstance(attr, kind):
if not condition or condition(doc):
return doc
@property
def audio(self):
    """
    The :tl:`Document` of the message when looking it up by a
    :tl:`DocumentAttributeAudio` attribute, using a predicate that
    rejects attributes flagged as ``voice``.
    """
    def is_not_voice(attr):
        return not attr.voice

    return self._document_by_attribute(
        types.DocumentAttributeAudio, is_not_voice
    )
@property
def voice(self):
    """
    The :tl:`Document` of the message when looking it up by a
    :tl:`DocumentAttributeAudio` attribute, using a predicate that
    requires the ``voice`` flag.
    """
    def is_voice(attr):
        return attr.voice

    return self._document_by_attribute(
        types.DocumentAttributeAudio, is_voice
    )
@property
def video(self):
    """
    The :tl:`Document` of the message when it has a
    :tl:`DocumentAttributeVideo` attribute (no further condition
    is applied).
    """
    wanted = types.DocumentAttributeVideo
    return self._document_by_attribute(wanted)
@property
def video_note(self):
    """
    The :tl:`Document` of the message when looking it up by a
    :tl:`DocumentAttributeVideo` attribute, using a predicate that
    requires the ``round_message`` flag.
    """
    def is_round(attr):
        return attr.round_message

    return self._document_by_attribute(
        types.DocumentAttributeVideo, is_round
    )
@property
def gif(self):
    """
    The :tl:`Document` of the message when it has a
    :tl:`DocumentAttributeAnimated` attribute.
    """
    wanted = types.DocumentAttributeAnimated
    return self._document_by_attribute(wanted)
@property
def sticker(self):
    """
    The :tl:`Document` of the message when it has a
    :tl:`DocumentAttributeSticker` attribute.
    """
    wanted = types.DocumentAttributeSticker
    return self._document_by_attribute(wanted)
@property
def out(self):
    """
    The ``out`` flag of the underlying message: truthy when the
    message is outgoing, falsy when it is incoming.
    """
    message = self.message
    return message.out