Continue work on Message sending overhaul

This commit is contained in:
Lonami Exo 2021-10-12 17:59:30 +02:00
parent 3853f98e5f
commit 72fc8f6808
7 changed files with 435 additions and 470 deletions

View File

@ -650,3 +650,18 @@ CdnDecrypter has been removed
-----------------------------
It was not really working and was more intended to be an implementation detail than anything else.
---
you can no longer pass an attributes list because the constructor is now nice.
use raw api if you really need it.
goal is to hide raw api from high level api. sorry.
no parsemode. use the correct parameter. it's more convenient than setting two.
formatting_entities stays because otherwise it's the only feasible way to manually specify it.
todo update send_message and send_file docs (well review all functions)
album overhaul. use a list of Message instead.

View File

@ -1,10 +1,12 @@
import inspect
import itertools
import time
import typing
import warnings
from .._misc import helpers, utils, requestiter, hints
from ..types import _custom
from ..types._custom.inputmessage import InputMessage
from .. import errors, _tl
_MAX_CHUNK_SIZE = 100
@ -395,82 +397,95 @@ async def send_message(
entity: 'hints.EntityLike',
message: 'hints.MessageLike' = '',
*,
reply_to: 'typing.Union[int, _tl.Message]' = None,
attributes: 'typing.Sequence[_tl.TypeDocumentAttribute]' = None,
parse_mode: typing.Optional[str] = (),
formatting_entities: typing.Optional[typing.List[_tl.TypeMessageEntity]] = None,
link_preview: bool = True,
file: 'typing.Union[hints.FileLike, typing.Sequence[hints.FileLike]]' = None,
thumb: 'hints.FileLike' = None,
force_document: bool = False,
clear_draft: bool = False,
buttons: 'hints.MarkupLike' = None,
silent: bool = None,
background: bool = None,
# - Message contents
# Formatting
markdown: str = None,
html: str = None,
formatting_entities: list = None,
link_preview: bool = (),
# Media
file: typing.Optional[hints.FileLike] = None,
file_name: str = None,
mime_type: str = None,
thumb: str = False,
force_file: bool = False,
file_size: int = None,
# Media attributes
duration: int = None,
width: int = None,
height: int = None,
title: str = None,
performer: str = None,
supports_streaming: bool = False,
video_note: bool = False,
voice_note: bool = False,
waveform: bytes = None,
# Additional parametrization
silent: bool = False,
buttons: list = None,
ttl: int = None,
# - Send options
reply_to: 'typing.Union[int, _tl.Message]' = None,
clear_draft: bool = False,
background: bool = None,
schedule: 'hints.DateLike' = None,
comment_to: 'typing.Union[int, _tl.Message]' = None
comment_to: 'typing.Union[int, _tl.Message]' = None,
) -> '_tl.Message':
if file is not None:
return await self.send_file(
entity, file, caption=message, reply_to=reply_to,
attributes=attributes, parse_mode=parse_mode,
force_document=force_document, thumb=thumb,
buttons=buttons, clear_draft=clear_draft, silent=silent,
schedule=schedule, supports_streaming=supports_streaming,
if isinstance(message, str):
message = InputMessage(
text=message,
markdown=markdown,
html=html,
formatting_entities=formatting_entities,
comment_to=comment_to, background=background
link_preview=link_preview,
file=file,
file_name=file_name,
mime_type=mime_type,
thumb=thumb,
force_file=force_file,
file_size=file_size,
duration=duration,
width=width,
height=height,
title=title,
performer=performer,
supports_streaming=supports_streaming,
video_note=video_note,
voice_note=voice_note,
waveform=waveform,
silent=silent,
buttons=buttons,
ttl=ttl,
)
elif isinstance(message, _custom.Message):
message = message._as_input()
elif not isinstance(message, InputMessage):
raise TypeError(f'message must be either str, Message or InputMessage, but got: {message!r}')
entity = await self.get_input_entity(entity)
if comment_to is not None:
entity, reply_to = await _get_comment_data(self, entity, comment_to)
elif reply_to:
reply_to = utils.get_message_id(reply_to)
if isinstance(message, _tl.Message):
if buttons is None:
markup = message.reply_markup
else:
markup = _custom.button.build_reply_markup(buttons)
if message._file:
# TODO Properly implement allow_cache to reuse the sha256 of the file
# i.e. `None` was used
if silent is None:
silent = message.silent
# TODO album
if message._file._should_upload_thumb():
message._file._set_uploaded_thumb(await self.upload_file(message._file._thumb))
if (message.media and not isinstance(
message.media, _tl.MessageMediaWebPage)):
return await self.send_file(
entity,
message.media,
caption=message.message,
silent=silent,
background=background,
reply_to=reply_to,
buttons=markup,
formatting_entities=message.entities,
schedule=schedule
)
if message._file._should_upload_file():
message._file._set_uploaded_file(await self.upload_file(message._file._file))
request = _tl.fn.messages.SendMessage(
peer=entity,
message=message.message or '',
silent=silent,
background=background,
reply_to_msg_id=utils.get_message_id(reply_to),
reply_markup=markup,
entities=message.entities,
clear_draft=clear_draft,
no_webpage=not isinstance(
message.media, _tl.MessageMediaWebPage),
schedule_date=schedule
request = _tl.fn.messages.SendMedia(
entity, message._file._media, reply_to_msg_id=reply_to, message=message._text,
entities=message._fmt_entities, reply_markup=message._reply_markup, silent=message._silent,
schedule_date=schedule, clear_draft=clear_draft,
background=background
)
message = message.message
else:
if formatting_entities is None:
message, formatting_entities = await self._parse_message_text(message, parse_mode)
if not message:
raise ValueError(
'The message cannot be empty unless a file is provided'
)
request = _tl.fn.messages.SendMessage(
peer=entity,
message=message,
@ -489,7 +504,7 @@ async def send_message(
return _custom.Message._new(self, _tl.Message(
id=result.id,
peer_id=await _get_peer(self, entity),
message=message,
message=message._text,
date=result.date,
out=result.out,
media=result.media,

View File

@ -2075,31 +2075,46 @@ class TelegramClient:
entity: 'hints.EntityLike',
message: 'hints.MessageLike' = '',
*,
reply_to: 'typing.Union[int, _tl.Message]' = None,
attributes: 'typing.Sequence[_tl.TypeDocumentAttribute]' = None,
parse_mode: typing.Optional[str] = (),
formatting_entities: typing.Optional[typing.List[_tl.TypeMessageEntity]] = None,
link_preview: bool = True,
file: 'typing.Union[hints.FileLike, typing.Sequence[hints.FileLike]]' = None,
thumb: 'hints.FileLike' = None,
force_document: bool = False,
clear_draft: bool = False,
buttons: 'hints.MarkupLike' = None,
silent: bool = None,
background: bool = None,
# - Message contents
# Formatting
markdown: str = None,
html: str = None,
formatting_entities: list = None,
link_preview: bool = (),
# Media
file: 'typing.Optional[hints.FileLike]' = None,
file_name: str = None,
mime_type: str = None,
thumb: str = False,
force_file: bool = False,
file_size: int = None,
# Media attributes
duration: int = None,
width: int = None,
height: int = None,
title: str = None,
performer: str = None,
supports_streaming: bool = False,
video_note: bool = False,
voice_note: bool = False,
waveform: bytes = None,
# Additional parametrization
silent: bool = False,
buttons: list = None,
ttl: int = None,
# - Send options
reply_to: 'typing.Union[int, _tl.Message]' = None,
clear_draft: bool = False,
background: bool = None,
schedule: 'hints.DateLike' = None,
comment_to: 'typing.Union[int, _tl.Message]' = None
comment_to: 'typing.Union[int, _tl.Message]' = None,
) -> '_tl.Message':
"""
Sends a message to the specified user, chat or channel.
Sends a Message to the specified user, chat or channel.
The default parse mode is the same as the official applications
(a _custom flavour of markdown). ``**bold**, `code` or __italic__``
are available. In addition you can send ``[links](https://example.com)``
and ``[mentions](@username)`` (or using IDs like in the Bot API:
``[mention](tg://user?id=123456789)``) and ``pre`` blocks with three
backticks.
The message can be either a string or a previous Message instance.
If it's a previous Message instance, the rest of parameters will be ignored.
If it's not, a Message instance will be constructed, and send_to used.
Sending a ``/start`` command with a parameter (like ``?start=data``)
is also done through this method. Simply send ``'/start data'`` to
@ -3517,15 +3532,6 @@ class TelegramClient:
async def _parse_message_text(self: 'TelegramClient', message, parse_mode):
pass
@forward_call(uploads._file_to_media)
async def _file_to_media(
self, file, force_document=False, file_size=None,
progress_callback=None, attributes=None, thumb=None,
allow_cache=True, voice_note=False, video_note=False,
supports_streaming=False, mime_type=None, as_image=None,
ttl=None):
pass
@forward_call(messageparse._get_response_message)
def _get_response_message(self: 'TelegramClient', request, result, input_chat):
pass

View File

@ -92,105 +92,74 @@ def _resize_photo_if_needed(
async def send_file(
self: 'TelegramClient',
entity: 'hints.EntityLike',
file: 'typing.Union[hints.FileLike, typing.Sequence[hints.FileLike]]',
file: typing.Optional[hints.FileLike] = None,
*,
caption: typing.Union[str, typing.Sequence[str]] = None,
force_document: bool = False,
# - Message contents
# Formatting
caption: 'hints.MessageLike' = '',
markdown: str = None,
html: str = None,
formatting_entities: list = None,
link_preview: bool = (),
# Media
file_name: str = None,
mime_type: str = None,
thumb: str = False,
force_file: bool = False,
file_size: int = None,
clear_draft: bool = False,
progress_callback: 'hints.ProgressCallback' = None,
reply_to: 'hints.MessageIDLike' = None,
attributes: 'typing.Sequence[_tl.TypeDocumentAttribute]' = None,
thumb: 'hints.FileLike' = None,
allow_cache: bool = True,
parse_mode: str = (),
formatting_entities: typing.Optional[typing.List[_tl.TypeMessageEntity]] = None,
voice_note: bool = False,
video_note: bool = False,
buttons: 'hints.MarkupLike' = None,
silent: bool = None,
background: bool = None,
# Media attributes
duration: int = None,
width: int = None,
height: int = None,
title: str = None,
performer: str = None,
supports_streaming: bool = False,
video_note: bool = False,
voice_note: bool = False,
waveform: bytes = None,
# Additional parametrization
silent: bool = False,
buttons: list = None,
ttl: int = None,
# - Send options
reply_to: 'typing.Union[int, _tl.Message]' = None,
clear_draft: bool = False,
background: bool = None,
schedule: 'hints.DateLike' = None,
comment_to: 'typing.Union[int, _tl.Message]' = None,
ttl: int = None,
**kwargs) -> '_tl.Message':
# TODO Properly implement allow_cache to reuse the sha256 of the file
# i.e. `None` was used
if not file:
raise TypeError('Cannot use {!r} as file'.format(file))
if not caption:
caption = ''
entity = await self.get_input_entity(entity)
if comment_to is not None:
entity, reply_to = await _get_comment_data(self, entity, comment_to)
else:
reply_to = utils.get_message_id(reply_to)
# First check if the user passed an iterable, in which case
# we may want to send grouped.
if utils.is_list_like(file):
if utils.is_list_like(caption):
captions = caption
else:
captions = [caption]
result = []
while file:
result += await _send_album(
self, entity, file[:10], caption=captions[:10],
progress_callback=progress_callback, reply_to=reply_to,
parse_mode=parse_mode, silent=silent, schedule=schedule,
supports_streaming=supports_streaming, clear_draft=clear_draft,
force_document=force_document, background=background,
)
file = file[10:]
captions = captions[10:]
for doc, cap in zip(file, captions):
result.append(await self.send_file(
entity, doc, allow_cache=allow_cache,
caption=cap, force_document=force_document,
progress_callback=progress_callback, reply_to=reply_to,
attributes=attributes, thumb=thumb, voice_note=voice_note,
video_note=video_note, buttons=buttons, silent=silent,
supports_streaming=supports_streaming, schedule=schedule,
clear_draft=clear_draft, background=background,
**kwargs
))
return result
if formatting_entities is not None:
msg_entities = formatting_entities
else:
caption, msg_entities =\
await self._parse_message_text(caption, parse_mode)
file_handle, media, image = await _file_to_media(
self, file, force_document=force_document,
) -> '_tl.Message':
self.send_message(
entity=entity,
message=caption,
markdown=markdown,
html=html,
formatting_entities=formatting_entities,
link_preview=link_preview,
file=file,
file_name=file_name,
mime_type=mime_type,
thumb=thumb,
force_file=force_file,
file_size=file_size,
progress_callback=progress_callback,
attributes=attributes, allow_cache=allow_cache, thumb=thumb,
voice_note=voice_note, video_note=video_note,
supports_streaming=supports_streaming, ttl=ttl
duration=duration,
width=width,
height=height,
title=title,
performer=performer,
supports_streaming=supports_streaming,
video_note=video_note,
voice_note=voice_note,
waveform=waveform,
silent=silent,
buttons=buttons,
ttl=ttl,
reply_to=reply_to,
clear_draft=clear_draft,
background=background,
schedule=schedule,
comment_to=comment_to,
)
# e.g. invalid cast from :tl:`MessageMediaWebPage`
if not media:
raise TypeError('Cannot use {!r} as file'.format(file))
markup = _custom.button.build_reply_markup(buttons)
request = _tl.fn.messages.SendMedia(
entity, media, reply_to_msg_id=reply_to, message=caption,
entities=msg_entities, reply_markup=markup, silent=silent,
schedule_date=schedule, clear_draft=clear_draft,
background=background
)
return self._get_response_message(request, await self(request), entity)
async def _send_album(self: 'TelegramClient', entity, files, caption='',
progress_callback=None, reply_to=None,
parse_mode=(), silent=None, schedule=None,
@ -368,98 +337,3 @@ async def upload_file(
)
async def _file_to_media(
self, file, force_document=False, file_size=None,
progress_callback=None, attributes=None, thumb=None,
allow_cache=True, voice_note=False, video_note=False,
supports_streaming=False, mime_type=None, as_image=None,
ttl=None):
if not file:
return None, None, None
if isinstance(file, pathlib.Path):
file = str(file.absolute())
is_image = utils.is_image(file)
if as_image is None:
as_image = is_image and not force_document
# `aiofiles` do not base `io.IOBase` but do have `read`, so we
# just check for the read attribute to see if it's file-like.
if not isinstance(file, (str, bytes, _tl.InputFile, _tl.InputFileBig))\
and not hasattr(file, 'read'):
# The user may pass a Message containing media (or the media,
# or anything similar) that should be treated as a file. Try
# getting the input media for whatever they passed and send it.
#
# We pass all attributes since these will be used if the user
# passed :tl:`InputFile`, and all information may be relevant.
try:
return (None, utils.get_input_media(
file,
is_photo=as_image,
attributes=attributes,
force_document=force_document,
voice_note=voice_note,
video_note=video_note,
supports_streaming=supports_streaming,
ttl=ttl
), as_image)
except TypeError:
# Can't turn whatever was given into media
return None, None, as_image
media = None
file_handle = None
if isinstance(file, (_tl.InputFile, _tl.InputFileBig)):
file_handle = file
elif not isinstance(file, str) or os.path.isfile(file):
file_handle = await self.upload_file(
_resize_photo_if_needed(file, as_image),
file_size=file_size,
progress_callback=progress_callback
)
elif re.match('https?://', file):
if as_image:
media = _tl.InputMediaPhotoExternal(file, ttl_seconds=ttl)
else:
media = _tl.InputMediaDocumentExternal(file, ttl_seconds=ttl)
if media:
pass # Already have media, don't check the rest
elif not file_handle:
raise ValueError(
'Failed to convert {} to media. Not an existing file or '
'HTTP URL'.format(file)
)
elif as_image:
media = _tl.InputMediaUploadedPhoto(file_handle, ttl_seconds=ttl)
else:
attributes, mime_type = utils.get_attributes(
file,
mime_type=mime_type,
attributes=attributes,
force_document=force_document and not is_image,
voice_note=voice_note,
video_note=video_note,
supports_streaming=supports_streaming,
thumb=thumb
)
if not thumb:
thumb = None
else:
if isinstance(thumb, pathlib.Path):
thumb = str(thumb.absolute())
thumb = await self.upload_file(thumb, file_size=file_size)
media = _tl.InputMediaUploadedDocument(
file=file_handle,
mime_type=mime_type,
attributes=attributes,
thumb=thumb,
force_file=force_document and not is_image,
ttl_seconds=ttl
)
return file_handle, media, as_image

View File

@ -1,11 +1,36 @@
import mimetypes
import os
import pathlib
import re
import time
from pathlib import Path
from ... import _tl
from ..._misc import utils
class InputFile:
# Expected Time-To-Live for _uploaded_*.
# After this period they should be reuploaded.
# Telegram's limit are unknown, so this value is conservative.
UPLOAD_TTL = 8 * 60 * 60
__slots__ = (
# main media
'_file', # can reupload
'_media', # can only use as-is
'_uploaded_file', # (input file, timestamp)
# thumbnail
'_thumb', # can reupload
'_uploaded_thumb', # (input file, timestamp)
# document parameters
'_mime_type',
'_attributes',
'_video_note',
'_force_file',
'_ttl',
)
def __init__(
self,
file = None,
@ -26,145 +51,125 @@ class InputFile:
waveform: bytes = None,
ttl: int = None,
):
if isinstance(file, pathlib.Path):
if not file_name:
file_name = file.name
file = str(file.absolute())
elif not file_name:
if isinstance(file, str):
file_name = os.path.basename(file)
# main media
self._file = None
self._media = None
self._uploaded_file = None
if isinstance(file, str) and re.match('https?://', file, flags=re.IGNORECASE):
if not force_file and mime_type.startswith('image'):
self._media = _tl.InputMediaPhotoExternal(file, ttl_seconds=ttl)
else:
file_name = getattr(file, 'name', 'unnamed')
self._media = _tl.InputMediaDocumentExternal(file, ttl_seconds=ttl)
if not mime_type:
mime_type = mimetypes.guess_type(file_name)[0] or 'application/octet-stream'
elif isinstance(file, (str, bytes, Path)) or callable(getattr(file, 'read', None)):
self._file = file
mime_type = mime_type.lower()
elif isinstance(file, (_tl.InputFile, _tl.InputFileBig)):
self._uploaded_file = (file, time.time())
attributes = [_tl.DocumentAttributeFilename(file_name)]
# TODO hachoir or tinytag or ffmpeg
if mime_type.startswith('image'):
if width is not None and height is not None:
attributes.append(_tl.DocumentAttributeImageSize(
w=width,
h=height,
))
elif mime_type.startswith('audio'):
attributes.append(_tl.DocumentAttributeAudio(
duration=duration,
voice=voice_note,
title=title,
performer=performer,
waveform=waveform,
))
elif mime_type.startswith('video'):
attributes.append(_tl.DocumentAttributeVideo(
duration=duration,
w=width,
h=height,
round_message=video_note,
supports_streaming=supports_streaming,
))
# mime_type: str = None,
# thumb: str = False,
# force_file: bool = False,
# file_size: int = None,
# ttl: int = None,
self._file = file
self._attributes = attributes
# TODO rest
is_image = utils.is_image(file)
if as_image is None:
as_image = is_image and not force_document
# `aiofiles` do not base `io.IOBase` but do have `read`, so we
# just check for the read attribute to see if it's file-like.
if not isinstance(file, (str, bytes, _tl.InputFile, _tl.InputFileBig))\
and not hasattr(file, 'read'):
# The user may pass a Message containing media (or the media,
# or anything similar) that should be treated as a file. Try
# getting the input media for whatever they passed and send it.
#
# We pass all attributes since these will be used if the user
# passed :tl:`InputFile`, and all information may be relevant.
try:
return (None, utils.get_input_media(
file,
is_photo=as_image,
attributes=attributes,
force_document=force_document,
voice_note=voice_note,
video_note=video_note,
supports_streaming=supports_streaming,
ttl=ttl
), as_image)
except TypeError:
# Can't turn whatever was given into media
return None, None, as_image
media = None
file_handle = None
if isinstance(file, (_tl.InputFile, _tl.InputFileBig)):
file_handle = file
elif not isinstance(file, str) or os.path.isfile(file):
file_handle = await self.upload_file(
_resize_photo_if_needed(file, as_image),
file_size=file_size,
progress_callback=progress_callback
)
elif re.match('https?://', file):
if as_image:
media = _tl.InputMediaPhotoExternal(file, ttl_seconds=ttl)
else:
media = _tl.InputMediaDocumentExternal(file, ttl_seconds=ttl)
if media:
pass # Already have media, don't check the rest
elif not file_handle:
raise ValueError(
'Failed to convert {} to media. Not an existing file or '
'HTTP URL'.format(file)
)
elif as_image:
media = _tl.InputMediaUploadedPhoto(file_handle, ttl_seconds=ttl)
else:
attributes, mime_type = utils.get_attributes(
self._media = utils.get_input_media(
file,
mime_type=mime_type,
attributes=attributes,
force_document=force_document and not is_image,
is_photo=not force_file and mime_type.startswith('image'),
attributes=[],
force_document=force_file,
voice_note=voice_note,
video_note=video_note,
supports_streaming=supports_streaming,
thumb=thumb
ttl=ttl
)
if not thumb:
thumb = None
else:
if isinstance(thumb, pathlib.Path):
thumb = str(thumb.absolute())
thumb = await self.upload_file(thumb, file_size=file_size)
# thumbnail
self._thumb = None
self._uploaded_thumb = None
media = _tl.InputMediaUploadedDocument(
file=file_handle,
mime_type=mime_type,
attributes=attributes,
thumb=thumb,
force_file=force_document and not is_image,
ttl_seconds=ttl
if isinstance(thumb, (str, bytes, Path)) or callable(getattr(thumb, 'read', None)):
self._thumb = thumb
elif isinstance(thumb, (_tl.InputFile, _tl.InputFileBig)):
self._uploaded_thumb = (thumb, time.time())
else:
raise TypeError(f'thumb must be a file to upload, but got: {thumb!r}')
# document parameters (only if it's our file, i.e. there's no media ready yet)
if self._media:
self._mime_type = None
self._attributes = None
self._video_note = None
self._force_file = None
self._ttl = None
else:
if isinstance(file, Path):
if not file_name:
file_name = file.name
file = str(file.absolute())
elif not file_name:
if isinstance(file, str):
file_name = os.path.basename(file)
else:
file_name = getattr(file, 'name', 'unnamed')
if not mime_type:
mime_type = mimetypes.guess_type(file_name)[0] or 'application/octet-stream'
mime_type = mime_type.lower()
attributes = [_tl.DocumentAttributeFilename(file_name)]
# TODO hachoir or tinytag or ffmpeg
if mime_type.startswith('image'):
if width is not None and height is not None:
attributes.append(_tl.DocumentAttributeImageSize(
w=width,
h=height,
))
elif mime_type.startswith('audio'):
attributes.append(_tl.DocumentAttributeAudio(
duration=duration,
voice=voice_note,
title=title,
performer=performer,
waveform=waveform,
))
elif mime_type.startswith('video'):
attributes.append(_tl.DocumentAttributeVideo(
duration=duration,
w=width,
h=height,
round_message=video_note,
supports_streaming=supports_streaming,
))
self._mime_type = mime_type
self._attributes = attributes
self._video_note = video_note
self._force_file = force_file
self._ttl = ttl
def _should_upload_thumb(self):
return self._thumb and (
not self._uploaded_thumb
or time.time() > self._uploaded_thumb[1] + InputFile.UPLOAD_TTL)
def _should_upload_file(self):
return self._file and (
not self._uploaded_file
or time.time() > self._uploaded_file[1] + InputFile.UPLOAD_TTL)
def _set_uploaded_thumb(self, input_file):
self._uploaded_thumb = (input_file, time.time())
def _set_uploaded_file(self, input_file):
if not self._force_file and self._mime_type.startswith('image'):
self._media = _tl.InputMediaUploadedPhoto(input_file, ttl_seconds=self._ttl)
else:
self._media = _tl.InputMediaUploadedDocument(
file=input_file,
mime_type=self._mime_type,
attributes=self._attributes,
thumb=self._uploaded_thumb[0] if self._uploaded_thumb else None,
force_file=self._force_file,
ttl_seconds=self._ttl,
)
return file_handle, media, as_image

View File

@ -1,3 +1,8 @@
from typing import Optional
from .inputfile import InputFile
from ... import _misc
from .button import build_reply_markup
class InputMessage:
__slots__ = (
@ -9,21 +14,83 @@ class InputMessage:
'_file',
)
_default_parse_mode = (lambda t: (t, []), lambda t, e: t)
_default_link_preview = True
def __init__(
self,
text,
text: str = None,
*,
link_preview,
silent,
reply_markup,
fmt_entities,
file,
markdown: str = None,
html: str = None,
formatting_entities: list = None,
link_preview: bool = (),
file=None,
file_name: str = None,
mime_type: str = None,
thumb: str = False,
force_file: bool = False,
file_size: int = None,
duration: int = None,
width: int = None,
height: int = None,
title: str = None,
performer: str = None,
supports_streaming: bool = False,
video_note: bool = False,
voice_note: bool = False,
waveform: bytes = None,
silent: bool = False,
buttons: list = None,
ttl: int = None,
parse_fn = None,
):
if (text and markdown) or (text and html) or (markdown and html):
raise ValueError('can only set one of: text, markdown, html')
if formatting_entities:
text = text or markdown or html
elif text:
text, formatting_entities = self._default_parse_mode[0](text)
elif markdown:
text, formatting_entities = _misc.markdown.parse(markdown)
elif html:
text, formatting_entities = _misc.html.parse(html)
reply_markup = build_reply_markup(buttons) if buttons else None
if not text:
text = ''
if not formatting_entities:
formatting_entities = None
if link_preview == ():
link_preview = self._default_link_preview
if file and not isinstance(file, InputFile):
file = InputFile(
file=file,
file_name=file_name,
mime_type=mime_type,
thumb=thumb,
force_file=force_file,
file_size=file_size,
duration=duration,
width=width,
height=height,
title=title,
performer=performer,
supports_streaming=supports_streaming,
video_note=video_note,
voice_note=voice_note,
waveform=waveform,
)
self._text = text
self._link_preview = link_preview
self._silent = silent
self._reply_markup = reply_markup
self._fmt_entities = fmt_entities
self._fmt_entities = formatting_entities
self._file = file
# oh! when this message is used, the file can be cached in here! if not inputfile upload and set inputfile

View File

@ -228,9 +228,6 @@ class Message(ChatGetter, SenderGetter):
# region Initialization
_default_parse_mode = None
_default_link_preview = True
def __init__(
self,
text: str = None,
@ -241,7 +238,7 @@ class Message(ChatGetter, SenderGetter):
formatting_entities: list = None,
link_preview: bool = (),
# Media
file: Optional[hints.FileLike] = None,
file: 'Optional[hints.FileLike]' = None,
file_name: str = None,
mime_type: str = None,
thumb: str = False,
@ -379,54 +376,30 @@ class Message(ChatGetter, SenderGetter):
Not all types of media can be used with this parameter, such as text documents, which
will fail with ``TtlMediaInvalidError``.
"""
if (text and markdown) or (text and html) or (markdown and html):
raise ValueError('can only set one of: text, markdown, html')
if formatting_entities:
text = text or markdown or html
elif text:
text, formatting_entities = self._default_parse_mode[0](text)
elif markdown:
text, formatting_entities = _misc.markdown.parse(markdown)
elif html:
text, formatting_entities = _misc.html.parse(html)
reply_markup = build_reply_markup(buttons) if buttons else None
if not text:
text = ''
if not formatting_entities:
formatting_entities = None
if link_preview == ():
link_preview = self._default_link_preview
if file:
file = InputFile(
file=file,
file_name=file_name,
mime_type=mime_type,
thumb=thumb,
force_file=force_file,
file_size=file_size,
duration=duration,
width=width,
height=height,
title=title,
performer=performer,
supports_streaming=supports_streaming,
video_note=video_note,
voice_note=voice_note,
waveform=waveform,
)
self._message = InputMessage(
text=text,
markdown=markdown,
html=html,
formatting_entities=formatting_entities,
link_preview=link_preview,
file =file,
file_name=file_name,
mime_type=mime_type,
thumb=thumb,
force_file=force_file,
file_size=file_size,
duration=duration,
width=width,
height=height,
title=title,
performer=performer,
supports_streaming=supports_streaming,
video_note=video_note,
voice_note=voice_note,
waveform=waveform,
silent=silent,
reply_markup=reply_markup,
fmt_entities=formatting_entities,
file=file,
buttons=buttons,
ttl=ttl,
)
@classmethod
@ -446,7 +419,7 @@ class Message(ChatGetter, SenderGetter):
sender_id = utils.get_peer_id(message.peer_id)
# Note that these calls would reset the client
ChatGetter.__init__(self, self.peer_id, broadcast=self.post)
ChatGetter.__init__(self, message.peer_id, broadcast=message.post)
SenderGetter.__init__(self, sender_id)
self._client = client
self._message = message
@ -511,8 +484,8 @@ class Message(ChatGetter, SenderGetter):
return self
@classmethod
def set_default_parse_mode(cls, mode):
@staticmethod
def set_default_parse_mode(mode):
"""
Change the default parse mode when creating messages. The ``mode`` can be:
@ -531,27 +504,29 @@ class Message(ChatGetter, SenderGetter):
if isinstance(mode, str):
mode = mode.lower()
if mode in ('md', 'markdown'):
cls._default_parse_mode = (_misc.markdown.parse, _misc.markdown.unparse)
mode = (_misc.markdown.parse, _misc.markdown.unparse)
elif mode in ('htm', 'html'):
cls._default_parse_mode = (_misc.html.parse, _misc.html.unparse)
mode = (_misc.html.parse, _misc.html.unparse)
else:
raise ValueError(f'mode must be one of md, markdown, htm or html, but was {mode!r}')
elif callable(mode):
cls._default_parse_mode = (mode, lambda t, e: t)
mode = (mode, lambda t, e: t)
elif isinstance(mode, tuple):
if len(mode) == 2 and callable(mode[0]) and callable(mode[1]):
cls._default_parse_mode = mode
mode = mode
else:
raise ValueError(f'mode must be a tuple of exactly two callables')
else:
raise TypeError(f'mode must be either a str, callable or tuple, but was {mode!r}')
InputMessage._default_parse_mode = mode
@classmethod
def set_default_link_preview(cls, enabled):
"""
Change the default value for link preview (either ``True`` or ``False``).
"""
cls._default_link_preview = enabled
InputMessage._default_link_preview = enabled
# endregion Initialization
@ -1297,6 +1272,14 @@ class Message(ChatGetter, SenderGetter):
# region Private Methods
def _as_input(self):
if isinstance(self._message, InputMessage):
return self._message
return InputMessage(
)
async def _reload_message(self):
"""
Re-fetches this message to reload the sender and chat entities,