Add client.prepare_album

This commit is contained in:
Lonami Exo 2023-11-07 19:34:10 +01:00
parent ae44426a78
commit 6047c689ca
10 changed files with 531 additions and 150 deletions

View File

@ -37,6 +37,7 @@ from ..events import Event
from ..events.filters import Filter
from ..types import (
AdminRight,
AlbumBuilder,
AsyncList,
Chat,
ChatLike,
@ -53,8 +54,8 @@ from ..types import (
PasswordToken,
RecentAction,
User,
buttons,
)
from ..types import buttons as btns
from .auth import (
bot_sign_in,
check_password,
@ -77,10 +78,12 @@ from .dialogs import delete_dialog, edit_draft, get_dialogs, get_drafts
from .files import (
download,
get_file_bytes,
prepare_album,
send_audio,
send_file,
send_photo,
send_video,
upload,
)
from .messages import (
MessageMap,
@ -1085,6 +1088,34 @@ class Client:
"""
return await pin_message(self, chat, message_id)
def prepare_album(self) -> AlbumBuilder:
"""
Prepare an album upload to send.
Albums are a way to send multiple photos or videos as separate messages with the same grouped identifier.
:return: A new album builder instance, with no media added to it yet.
.. rubric:: Example
.. code-block:: python
# Prepare a new album
album = await client.prepare_album()
# Add a bunch of photos
for photo in ('a.jpg', 'b.png'):
await album.add_photo(photo)
# A video in-between
await album.add_video('c.mp4')
# And another photo
await album.add_photo('d.jpeg')
# Album is ready to be sent to as many chats as needed
await album.send(chat)
"""
return prepare_album(self)
async def read_message(
self, chat: ChatLike, message_id: Union[int, Literal["all"]]
) -> None:
@ -1310,6 +1341,7 @@ class Client:
self,
chat: ChatLike,
file: Union[str, Path, InFileLike, File],
mime_type: Optional[str] = None,
*,
size: Optional[int] = None,
name: Optional[str] = None,
@ -1320,6 +1352,8 @@ class Client:
caption: Optional[str] = None,
caption_markdown: Optional[str] = None,
caption_html: Optional[str] = None,
reply_to: Optional[int] = None,
buttons: Optional[Union[List[btns.Button], List[List[btns.Button]]]] = None,
) -> Message:
"""
Send an audio file.
@ -1333,6 +1367,7 @@ class Client:
:param file: See :meth:`send_file`.
:param size: See :meth:`send_file`.
:param name: See :meth:`send_file`.
:param mime_type: See :meth:`send_file`.
:param duration: See :meth:`send_file`.
:param voice: See :meth:`send_file`.
:param title: See :meth:`send_file`.
@ -1351,6 +1386,7 @@ class Client:
self,
chat,
file,
mime_type,
size=size,
name=name,
duration=duration,
@ -1360,6 +1396,8 @@ class Client:
caption=caption,
caption_markdown=caption_markdown,
caption_html=caption_html,
reply_to=reply_to,
buttons=buttons,
)
async def send_file(
@ -1386,6 +1424,8 @@ class Client:
caption: Optional[str] = None,
caption_markdown: Optional[str] = None,
caption_html: Optional[str] = None,
reply_to: Optional[int] = None,
buttons: Optional[Union[List[btns.Button], List[List[btns.Button]]]] = None,
) -> Message:
"""
Send any type of file with any amount of attributes.
@ -1438,8 +1478,10 @@ class Client:
When given a string or path, its :attr:`~pathlib.PurePath.name` will be used by default only if this parameter is omitted.
This parameter **must** be specified when sending a previously-opened or in-memory files.
The library will not attempt to read any ``name`` attributes the object may have.
When given a :term:`file-like object`, if it has a ``.name`` :class:`str` property, it will be used.
This is the case for files opened via :func:`open`.
This parameter **must** be specified when sending any other previously-opened or in-memory files.
:param mime_type:
Override for the default mime-type.
@ -1536,6 +1578,8 @@ class Client:
caption=caption,
caption_markdown=caption_markdown,
caption_html=caption_html,
reply_to=reply_to,
buttons=buttons,
)
async def send_message(
@ -1547,9 +1591,7 @@ class Client:
html: Optional[str] = None,
link_preview: bool = False,
reply_to: Optional[int] = None,
buttons: Optional[
Union[List[buttons.Button], List[List[buttons.Button]]]
] = None,
buttons: Optional[Union[List[btns.Button], List[List[btns.Button]]]] = None,
) -> Message:
"""
Send a message.
@ -1594,12 +1636,15 @@ class Client:
*,
size: Optional[int] = None,
name: Optional[str] = None,
mime_type: Optional[str] = None,
compress: bool = True,
width: Optional[int] = None,
height: Optional[int] = None,
caption: Optional[str] = None,
caption_markdown: Optional[str] = None,
caption_html: Optional[str] = None,
reply_to: Optional[int] = None,
buttons: Optional[Union[List[btns.Button], List[List[btns.Button]]]] = None,
) -> Message:
"""
Send a photo file.
@ -1617,6 +1662,7 @@ class Client:
:param file: See :meth:`send_file`.
:param size: See :meth:`send_file`.
:param name: See :meth:`send_file`.
:param mime_type: See :meth:`send_file`.
:param compress: See :meth:`send_file`.
:param width: See :meth:`send_file`.
:param height: See :meth:`send_file`.
@ -1636,12 +1682,15 @@ class Client:
file,
size=size,
name=name,
mime_type=mime_type,
compress=compress,
width=width,
height=height,
caption=caption,
caption_markdown=caption_markdown,
caption_html=caption_html,
reply_to=reply_to,
buttons=buttons,
)
async def send_video(
@ -1651,14 +1700,18 @@ class Client:
*,
size: Optional[int] = None,
name: Optional[str] = None,
mime_type: Optional[str] = None,
duration: Optional[float] = None,
width: Optional[int] = None,
height: Optional[int] = None,
round: bool = False,
supports_streaming: bool = False,
muted: bool = False,
caption: Optional[str] = None,
caption_markdown: Optional[str] = None,
caption_html: Optional[str] = None,
reply_to: Optional[int] = None,
buttons: Optional[Union[List[btns.Button], List[List[btns.Button]]]] = None,
) -> Message:
"""
Send a video file.
@ -1672,6 +1725,7 @@ class Client:
:param file: See :meth:`send_file`.
:param size: See :meth:`send_file`.
:param name: See :meth:`send_file`.
:param mime_type: See :meth:`send_file`.
:param duration: See :meth:`send_file`.
:param width: See :meth:`send_file`.
:param height: See :meth:`send_file`.
@ -1693,14 +1747,18 @@ class Client:
file,
size=size,
name=name,
mime_type=mime_type,
duration=duration,
width=width,
height=height,
round=round,
supports_streaming=supports_streaming,
muted=muted,
caption=caption,
caption_markdown=caption_markdown,
caption_html=caption_html,
reply_to=reply_to,
buttons=buttons,
)
async def set_chat_default_restrictions(
@ -1961,6 +2019,11 @@ class Client:
def _input_to_peer(self, input: Optional[abcs.InputPeer]) -> Optional[abcs.Peer]:
return input_to_peer(self, input)
async def _upload(
self, fd: Union[str, Path, InFileLike], size: Optional[int], name: Optional[str]
) -> Tuple[abcs.InputFile, str]:
return await upload(self, fd, size, name)
async def __call__(self, request: Request[Return]) -> Return:
if not self._sender:
raise ConnectionError("not connected")

View File

@ -4,8 +4,15 @@ import time
from typing import TYPE_CHECKING, Optional
from ...tl import functions, types
from ..types import AsyncList, ChatLike, Dialog, Draft, build_chat_map, build_msg_map
from .messages import parse_message
from ..types import (
AsyncList,
ChatLike,
Dialog,
Draft,
build_chat_map,
build_msg_map,
parse_message,
)
if TYPE_CHECKING:
from .client import Client
@ -125,7 +132,6 @@ async def edit_draft(
message, entities = parse_message(
text=text, markdown=markdown, html=html, allow_empty=False
)
assert isinstance(message, str)
result = await self(
functions.messages.save_draft(

View File

@ -2,13 +2,13 @@ from __future__ import annotations
import hashlib
import mimetypes
import urllib.parse
from inspect import isawaitable
from pathlib import Path
from typing import TYPE_CHECKING, List, Optional, Union
from typing import TYPE_CHECKING, List, Optional, Tuple, Union
from ...tl import abcs, functions, types
from ..types import (
AlbumBuilder,
AsyncList,
ChatLike,
File,
@ -16,10 +16,14 @@ from ..types import (
Message,
OutFileLike,
OutWrapper,
)
from ..types import buttons as btns
from ..types import (
expand_stripped_size,
generate_random_id,
parse_message,
try_get_url_path,
)
from .messages import parse_message
if TYPE_CHECKING:
from .client import Client
@ -34,6 +38,10 @@ BIG_FILE_SIZE = 10 * 1024 * 1024
math_round = round
def prepare_album(self: Client) -> AlbumBuilder:
return AlbumBuilder._create(client=self)
async def send_photo(
self: Client,
chat: ChatLike,
@ -41,12 +49,15 @@ async def send_photo(
*,
size: Optional[int] = None,
name: Optional[str] = None,
mime_type: Optional[str] = None,
compress: bool = True,
width: Optional[int] = None,
height: Optional[int] = None,
caption: Optional[str] = None,
caption_markdown: Optional[str] = None,
caption_html: Optional[str] = None,
reply_to: Optional[int] = None,
buttons: Optional[Union[List[btns.Button], List[List[btns.Button]]]] = None,
) -> Message:
return await send_file(
self,
@ -54,12 +65,17 @@ async def send_photo(
file,
size=size,
name=name,
mime_type="image/jpeg" # specific mime doesn't matter, only that it's image
if compress
else mime_type,
compress=compress,
width=width,
height=height,
caption=caption,
caption_markdown=caption_markdown,
caption_html=caption_html,
reply_to=reply_to,
buttons=buttons,
)
@ -67,6 +83,7 @@ async def send_audio(
self: Client,
chat: ChatLike,
file: Union[str, Path, InFileLike, File],
mime_type: Optional[str] = None,
*,
size: Optional[int] = None,
name: Optional[str] = None,
@ -77,6 +94,8 @@ async def send_audio(
caption: Optional[str] = None,
caption_markdown: Optional[str] = None,
caption_html: Optional[str] = None,
reply_to: Optional[int] = None,
buttons: Optional[Union[List[btns.Button], List[List[btns.Button]]]] = None,
) -> Message:
return await send_file(
self,
@ -84,6 +103,7 @@ async def send_audio(
file,
size=size,
name=name,
mime_type=mime_type,
duration=duration,
voice=voice,
title=title,
@ -91,6 +111,8 @@ async def send_audio(
caption=caption,
caption_markdown=caption_markdown,
caption_html=caption_html,
reply_to=reply_to,
buttons=buttons,
)
@ -101,41 +123,40 @@ async def send_video(
*,
size: Optional[int] = None,
name: Optional[str] = None,
mime_type: Optional[str] = None,
duration: Optional[float] = None,
width: Optional[int] = None,
height: Optional[int] = None,
round: bool = False,
supports_streaming: bool = False,
muted: bool = False,
caption: Optional[str] = None,
caption_markdown: Optional[str] = None,
caption_html: Optional[str] = None,
reply_to: Optional[int] = None,
buttons: Optional[Union[List[btns.Button], List[List[btns.Button]]]] = None,
) -> Message:
return await send_file(
self,
chat,
file,
size=size,
mime_type=mime_type,
name=name,
duration=duration,
width=width,
height=height,
round=round,
supports_streaming=supports_streaming,
muted=muted,
caption=caption,
caption_markdown=caption_markdown,
caption_html=caption_html,
reply_to=reply_to,
buttons=buttons,
)
def try_get_url_path(maybe_url: Union[str, Path, InFileLike]) -> Optional[str]:
if not isinstance(maybe_url, str):
return None
lowercase = maybe_url.lower()
if lowercase.startswith("http://") or lowercase.startswith("https://"):
return urllib.parse.urlparse(maybe_url).path
return None
async def send_file(
self: Client,
chat: ChatLike,
@ -160,15 +181,18 @@ async def send_file(
caption: Optional[str] = None,
caption_markdown: Optional[str] = None,
caption_html: Optional[str] = None,
reply_to: Optional[int] = None,
buttons: Optional[Union[List[btns.Button], List[List[btns.Button]]]] = None,
) -> Message:
message, entities = parse_message(
text=caption, markdown=caption_markdown, html=caption_html, allow_empty=True
)
assert isinstance(message, str)
# Re-send existing file.
if isinstance(file, File):
return await do_send_file(self, chat, file._input_media, message, entities)
return await do_send_file(
self, chat, file._input_media, message, entities, reply_to, buttons
)
# URLs are handled early as they can't use any other attributes either.
input_media: abcs.InputMedia
@ -190,23 +214,11 @@ async def send_file(
input_media = types.InputMediaDocumentExternal(
spoiler=False, url=file, ttl_seconds=None
)
return await do_send_file(self, chat, input_media, message, entities)
return await do_send_file(
self, chat, input_media, message, entities, reply_to, buttons
)
# Paths are opened and closed by us. Anything else is *only* read, not closed.
if isinstance(file, (str, Path)):
path = Path(file) if isinstance(file, str) else file
if size is None:
size = path.stat().st_size
if name is None:
name = path.name
with path.open("rb") as fd:
input_file = await upload(self, fd, size, name)
else:
if size is None:
raise ValueError("size must be set when sending file-like objects")
if name is None:
raise ValueError("name must be set when sending file-like objects")
input_file = await upload(self, file, size, name)
input_file, name = await upload(self, file, size, name)
# Mime is mandatory for documents, but we also use it to determine whether to send as photo.
if mime_type is None:
@ -249,7 +261,7 @@ async def send_file(
round_message=round,
supports_streaming=supports_streaming,
nosound=muted,
duration=int(math_round(duration)),
duration=duration,
w=width,
h=height,
preload_prefix_size=None,
@ -268,7 +280,9 @@ async def send_file(
ttl_seconds=None,
)
return await do_send_file(self, chat, input_media, message, entities)
return await do_send_file(
self, chat, input_media, message, entities, reply_to, buttons
)
async def do_send_file(
@ -277,6 +291,8 @@ async def do_send_file(
input_media: abcs.InputMedia,
message: str,
entities: Optional[List[abcs.MessageEntity]],
reply_to: Optional[int],
buttons: Optional[Union[List[btns.Button], List[List[btns.Button]]]],
) -> Message:
peer = (await client._resolve_to_packed(chat))._to_input_peer()
random_id = generate_random_id()
@ -289,11 +305,15 @@ async def do_send_file(
noforwards=False,
update_stickersets_order=False,
peer=peer,
reply_to=None,
reply_to=types.InputReplyToMessage(
reply_to_msg_id=reply_to, top_msg_id=None
)
if reply_to
else None,
media=input_media,
message=message,
random_id=random_id,
reply_markup=None,
reply_markup=btns.build_keyboard(buttons),
entities=entities,
schedule_date=None,
send_as=None,
@ -304,6 +324,31 @@ async def do_send_file(
async def upload(
client: Client,
file: Union[str, Path, InFileLike],
size: Optional[int],
name: Optional[str],
) -> Tuple[abcs.InputFile, str]:
# Paths are opened and closed by us. Anything else is *only* read, not closed.
if isinstance(file, (str, Path)):
path = Path(file) if isinstance(file, str) else file
if size is None:
size = path.stat().st_size
if name is None:
name = path.name
with path.open("rb") as fd:
return await do_upload(client, fd, size, name), name
else:
if size is None:
raise ValueError("size must be set when sending file-like objects")
if name is None:
name = getattr(file, "name", None)
if not isinstance(name, str):
raise ValueError("name must be set when sending file-like objects")
return await do_upload(client, file, size, name), name
async def do_upload(
client: Client,
fd: InFileLike,
size: int,

View File

@ -2,85 +2,18 @@ from __future__ import annotations
import datetime
import sys
from typing import TYPE_CHECKING, Dict, List, Literal, Optional, Self, Tuple, Union
from typing import TYPE_CHECKING, Dict, List, Literal, Optional, Self, Union
from ...session import PackedChat
from ...tl import abcs, functions, types
from ..parsers import parse_html_message, parse_markdown_message
from ..types import (
AsyncList,
Chat,
ChatLike,
Message,
build_chat_map,
buttons,
generate_random_id,
peer_id,
)
from ..types import AsyncList, Chat, ChatLike, Message, build_chat_map
from ..types import buttons as btns
from ..types import generate_random_id, parse_message, peer_id
if TYPE_CHECKING:
from .client import Client
def parse_message(
*,
text: Optional[Union[str, Message]],
markdown: Optional[str],
html: Optional[str],
allow_empty: bool,
) -> Tuple[Union[str, Message], Optional[List[abcs.MessageEntity]]]:
cnt = sum((text is not None, markdown is not None, html is not None))
if cnt != 1:
if cnt == 0 and allow_empty:
return "", None
raise ValueError("must specify exactly one of text, markdown or html")
if text is not None:
parsed, entities = text, None
elif markdown is not None:
parsed, entities = parse_markdown_message(markdown)
elif html is not None:
parsed, entities = parse_html_message(html)
else:
raise RuntimeError("unexpected case")
return parsed, entities or None
def build_keyboard(
btns: Optional[Union[List[buttons.Button], List[List[buttons.Button]]]]
) -> Optional[abcs.ReplyMarkup]:
# list[button] -> list[list[button]]
# This does allow for "invalid" inputs (mixing lists and non-lists), but that's acceptable.
buttons_lists_iter = (
button if isinstance(button, list) else [button] for button in (btns or [])
)
# Remove empty rows (also making it easy to check if all-empty).
buttons_lists = [bs for bs in buttons_lists_iter if bs]
if not buttons_lists:
return None
rows: List[abcs.KeyboardButtonRow] = [
types.KeyboardButtonRow(buttons=[btn._raw for btn in btns])
for btns in buttons_lists
]
# Guaranteed to have at least one, first one used to check if it's inline.
# If the user mixed inline with non-inline, Telegram will complain.
if isinstance(buttons_lists[0][0], buttons.InlineButton):
return types.ReplyInlineMarkup(rows=rows)
else:
return types.ReplyKeyboardMarkup(
resize=False,
single_use=False,
selective=False,
persistent=False,
rows=rows,
placeholder=None,
)
async def send_message(
self: Client,
chat: ChatLike,
@ -90,16 +23,39 @@ async def send_message(
html: Optional[str] = None,
link_preview: bool = False,
reply_to: Optional[int] = None,
buttons: Optional[Union[List[buttons.Button], List[List[buttons.Button]]]] = None,
buttons: Optional[Union[List[btns.Button], List[List[btns.Button]]]] = None,
) -> Message:
packed = await self._resolve_to_packed(chat)
peer = packed._to_input_peer()
message, entities = parse_message(
text=text, markdown=markdown, html=html, allow_empty=False
)
random_id = generate_random_id()
result = await self(
functions.messages.send_message(
if isinstance(text, Message):
message = text.text or ""
request = functions.messages.send_message(
no_webpage=not text.link_preview,
silent=text.silent,
background=False,
clear_draft=False,
noforwards=not text.can_forward,
update_stickersets_order=False,
peer=peer,
reply_to=types.InputReplyToMessage(
reply_to_msg_id=text.replied_message_id, top_msg_id=None
)
if text.replied_message_id
else None,
message=message,
random_id=random_id,
reply_markup=getattr(text._raw, "reply_markup", None),
entities=getattr(text._raw, "entities", None) or None,
schedule_date=None,
send_as=None,
)
else:
message, entities = parse_message(
text=text, markdown=markdown, html=html, allow_empty=False
)
request = functions.messages.send_message(
no_webpage=not link_preview,
silent=False,
background=False,
@ -114,33 +70,13 @@ async def send_message(
else None,
message=message,
random_id=random_id,
reply_markup=build_keyboard(buttons),
reply_markup=btns.build_keyboard(buttons),
entities=entities,
schedule_date=None,
send_as=None,
)
if isinstance(message, str)
else functions.messages.send_message(
no_webpage=not message.link_preview,
silent=message.silent,
background=False,
clear_draft=False,
noforwards=not message.can_forward,
update_stickersets_order=False,
peer=peer,
reply_to=types.InputReplyToMessage(
reply_to_msg_id=message.replied_message_id, top_msg_id=None
)
if message.replied_message_id
else None,
message=message.text or "",
random_id=random_id,
reply_markup=getattr(message._raw, "reply_markup", None),
entities=getattr(message._raw, "entities", None) or None,
schedule_date=None,
send_as=None,
)
)
result = await self(request)
if isinstance(result, types.UpdateShortSentMessage):
return Message._from_defaults(
self,
@ -161,7 +97,7 @@ async def send_message(
if reply_to
else None,
date=result.date,
message=message if isinstance(message, str) else (message.text or ""),
message=message,
media=result.media,
entities=result.entities,
ttl_period=result.ttl_period,
@ -184,7 +120,6 @@ async def edit_message(
message, entities = parse_message(
text=text, markdown=markdown, html=html, allow_empty=False
)
assert isinstance(message, str)
return self._build_message_map(
await self(
functions.messages.edit_message(

View File

@ -1,4 +1,5 @@
from .admin_right import AdminRight
from .album_builder import AlbumBuilder
from .async_list import AsyncList
from .callback_answer import CallbackAnswer
from .chat import (
@ -14,10 +15,23 @@ from .chat import (
from .chat_restriction import ChatRestriction
from .dialog import Dialog
from .draft import Draft
from .file import File, InFileLike, OutFileLike, OutWrapper, expand_stripped_size
from .file import (
File,
InFileLike,
OutFileLike,
OutWrapper,
expand_stripped_size,
try_get_url_path,
)
from .inline_result import InlineResult
from .login_token import LoginToken
from .message import Message, adapt_date, build_msg_map, generate_random_id
from .message import (
Message,
adapt_date,
build_msg_map,
generate_random_id,
parse_message,
)
from .meta import NoPublicConstructor
from .participant import Participant
from .password_token import PasswordToken
@ -25,6 +39,7 @@ from .recent_action import RecentAction
__all__ = [
"AdminRight",
"AlbumBuilder",
"AsyncList",
"ChatRestriction",
"CallbackAnswer",
@ -43,12 +58,14 @@ __all__ = [
"OutFileLike",
"OutWrapper",
"expand_stripped_size",
"try_get_url_path",
"InlineResult",
"LoginToken",
"Message",
"adapt_date",
"build_msg_map",
"generate_random_id",
"parse_message",
"NoPublicConstructor",
"Participant",
"PasswordToken",

View File

@ -0,0 +1,239 @@
from __future__ import annotations
import mimetypes
from pathlib import Path
from typing import TYPE_CHECKING, List, Optional, Union
from ...tl import abcs, functions, types
from .chat import ChatLike
from .file import InFileLike, try_get_url_path
from .message import Message, generate_random_id, parse_message
from .meta import NoPublicConstructor
if TYPE_CHECKING:
from ..client.client import Client
class AlbumBuilder(metaclass=NoPublicConstructor):
"""
Album builder to prepare albums with multiple files before sending it all at once.
This class is constructed by calling :meth:`telethon.Client.prepare_album`.
"""
def __init__(self, *, client: Client):
self._client = client
self._medias: List[types.InputSingleMedia] = []
async def add_photo(
self,
file: Union[str, Path, InFileLike],
*,
size: Optional[int] = None,
caption: Optional[str] = None,
caption_markdown: Optional[str] = None,
caption_html: Optional[str] = None,
) -> None:
"""
Add a photo to the album.
:param file:
The photo to attach to the album.
This behaves the same way as the file parameter in :meth:`telethon.Client.send_file`,
*except* that it cannot be previously-sent media.
:param size: See :meth:`telethon.Client.send_file`.
:param caption: See :ref:`formatting`.
:param caption_markdown: See :ref:`formatting`.
:param caption_html: See :ref:`formatting`.
"""
input_media: abcs.InputMedia
if try_get_url_path(file) is not None:
assert isinstance(file, str)
input_media = types.InputMediaPhotoExternal(
spoiler=False, url=file, ttl_seconds=None
)
else:
input_file, _ = await self._client._upload(file, size, "a.jpg")
input_media = types.InputMediaUploadedPhoto(
spoiler=False, file=input_file, stickers=None, ttl_seconds=None
)
media = await self._client(
functions.messages.upload_media(
peer=types.InputPeerSelf(), media=input_media
)
)
assert isinstance(media, types.MessageMediaPhoto)
assert isinstance(media.photo, types.Photo)
input_media = types.InputMediaPhoto(
spoiler=media.spoiler,
id=types.InputPhoto(
id=media.photo.id,
access_hash=media.photo.access_hash,
file_reference=media.photo.file_reference,
),
ttl_seconds=media.ttl_seconds,
)
message, entities = parse_message(
text=caption, markdown=caption_markdown, html=caption_html, allow_empty=True
)
self._medias.append(
types.InputSingleMedia(
media=input_media,
random_id=generate_random_id(),
message=message,
entities=entities,
)
)
async def add_video(
self,
file: Union[str, Path, InFileLike],
*,
size: Optional[int] = None,
name: Optional[str] = None,
mime_type: Optional[str] = None,
duration: Optional[float] = None,
width: Optional[int] = None,
height: Optional[int] = None,
round: bool = False,
supports_streaming: bool = False,
muted: bool = False,
caption: Optional[str] = None,
caption_markdown: Optional[str] = None,
caption_html: Optional[str] = None,
) -> None:
"""
Add a video to the album.
:param file:
The video to attach to the album.
This behaves the same way as the file parameter in :meth:`telethon.Client.send_file`,
*except* that it cannot be previously-sent media.
:param size: See :meth:`telethon.Client.send_file`.
:param name: See :meth:`telethon.Client.send_file`.
:param mime_type: See :meth:`telethon.Client.send_file`.
:param duration: See :meth:`telethon.Client.send_file`.
:param width: See :meth:`telethon.Client.send_file`.
:param height: See :meth:`telethon.Client.send_file`.
:param round: See :meth:`telethon.Client.send_file`.
:param supports_streaming: See :meth:`telethon.Client.send_file`.
:param muted: See :meth:`telethon.Client.send_file`.
:param caption: See :ref:`formatting`.
:param caption_markdown: See :ref:`formatting`.
:param caption_html: See :ref:`formatting`.
"""
input_media: abcs.InputMedia
if try_get_url_path(file) is not None:
assert isinstance(file, str)
input_media = types.InputMediaDocumentExternal(
spoiler=False, url=file, ttl_seconds=None
)
else:
input_file, name = await self._client._upload(file, size, name)
if mime_type is None:
mime_type, _ = mimetypes.guess_type(name, strict=False)
if mime_type is None:
mime_type = "application/octet-stream"
attributes: List[abcs.DocumentAttribute] = []
attributes.append(types.DocumentAttributeFilename(file_name=name))
if duration is not None and width is not None and height is not None:
attributes.append(
types.DocumentAttributeVideo(
round_message=round,
supports_streaming=supports_streaming,
nosound=muted,
duration=duration,
w=width,
h=height,
preload_prefix_size=None,
)
)
input_media = types.InputMediaUploadedDocument(
nosound_video=muted,
force_file=False,
spoiler=False,
file=input_file,
thumb=None,
mime_type=mime_type,
attributes=attributes,
stickers=None,
ttl_seconds=None,
)
media = await self._client(
functions.messages.upload_media(
peer=types.InputPeerEmpty(), media=input_media
)
)
assert isinstance(media, types.MessageMediaDocument)
assert isinstance(media.document, types.Document)
input_media = types.InputMediaDocument(
spoiler=media.spoiler,
id=types.InputDocument(
id=media.document.id,
access_hash=media.document.access_hash,
file_reference=media.document.file_reference,
),
ttl_seconds=media.ttl_seconds,
query=None,
)
message, entities = parse_message(
text=caption, markdown=caption_markdown, html=caption_html, allow_empty=True
)
self._medias.append(
types.InputSingleMedia(
media=input_media,
random_id=generate_random_id(),
message=message,
entities=entities,
)
)
async def send(
self, chat: ChatLike, *, reply_to: Optional[int] = None
) -> List[Message]:
"""
Send the album.
:return: All sent messages that are part of the album.
.. rubric:: Example
.. code-block:: python
album = await client.prepare_album()
for photo in ('a.jpg', 'b.png'):
await album.add_photo(photo)
messages = await album.send(chat)
"""
peer = (await self._client._resolve_to_packed(chat))._to_input_peer()
msg_map = self._client._build_message_map(
await self._client(
functions.messages.send_multi_media(
silent=False,
background=False,
clear_draft=False,
noforwards=False,
update_stickersets_order=False,
peer=peer,
reply_to=types.InputReplyToMessage(
reply_to_msg_id=reply_to, top_msg_id=None
)
if reply_to
else None,
multi_media=self._medias,
schedule_date=None,
send_as=None,
)
),
peer,
)
return [msg_map.with_random_id(media.random_id) for media in self._medias]

View File

@ -1,7 +1,7 @@
from __future__ import annotations
import weakref
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, List, Optional, Union
from ....tl import abcs, types
from .button import Button
@ -23,6 +23,40 @@ def as_concrete_row(row: abcs.KeyboardButtonRow) -> types.KeyboardButtonRow:
return row
def build_keyboard(
btns: Optional[Union[List[Button], List[List[Button]]]]
) -> Optional[abcs.ReplyMarkup]:
# list[button] -> list[list[button]]
# This does allow for "invalid" inputs (mixing lists and non-lists), but that's acceptable.
buttons_lists_iter = (
button if isinstance(button, list) else [button] for button in (btns or [])
)
# Remove empty rows (also making it easy to check if all-empty).
buttons_lists = [bs for bs in buttons_lists_iter if bs]
if not buttons_lists:
return None
rows: List[abcs.KeyboardButtonRow] = [
types.KeyboardButtonRow(buttons=[btn._raw for btn in btns])
for btns in buttons_lists
]
# Guaranteed to have at least one, first one used to check if it's inline.
# If the user mixed inline with non-inline, Telegram will complain.
if isinstance(buttons_lists[0][0], InlineButton):
return types.ReplyInlineMarkup(rows=rows)
else:
return types.ReplyKeyboardMarkup(
resize=False,
single_use=False,
selective=False,
persistent=False,
rows=rows,
placeholder=None,
)
def create_button(message: Message, raw: abcs.KeyboardButton) -> Button:
"""
Create a custom button from a Telegram button.

View File

@ -1,6 +1,7 @@
from __future__ import annotations
import mimetypes
import urllib.parse
from inspect import isawaitable
from io import BufferedWriter
from pathlib import Path
@ -68,6 +69,15 @@ def photo_size_dimensions(
raise RuntimeError("unexpected case")
def try_get_url_path(maybe_url: Union[str, Path, InFileLike]) -> Optional[str]:
if not isinstance(maybe_url, str):
return None
lowercase = maybe_url.lower()
if lowercase.startswith("http://") or lowercase.startswith("https://"):
return urllib.parse.urlparse(maybe_url).path
return None
class InFileLike(Protocol):
"""
A :term:`file-like object` used for input only.

View File

@ -2,10 +2,15 @@ from __future__ import annotations
import datetime
import time
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Self, Union
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Self, Tuple, Union
from ...tl import abcs, types
from ..parsers import generate_html_message, generate_markdown_message
from ..parsers import (
generate_html_message,
generate_markdown_message,
parse_html_message,
parse_markdown_message,
)
from .buttons import Button, as_concrete_row, create_button
from .chat import Chat, ChatLike, expand_peer, peer_id
from .file import File
@ -465,3 +470,28 @@ def build_msg_map(
msg.id: msg
for msg in (Message._from_raw(client, m, chat_map) for m in messages)
}
def parse_message(
*,
text: Optional[str],
markdown: Optional[str],
html: Optional[str],
allow_empty: bool,
) -> Tuple[str, Optional[List[abcs.MessageEntity]]]:
cnt = sum((text is not None, markdown is not None, html is not None))
if cnt != 1:
if cnt == 0 and allow_empty:
return "", None
raise ValueError("must specify exactly one of text, markdown or html")
if text is not None:
parsed, entities = text, None
elif markdown is not None:
parsed, entities = parse_markdown_message(markdown)
elif html is not None:
parsed, entities = parse_html_message(html)
else:
raise RuntimeError("unexpected case")
return parsed, entities or None

View File

@ -3,6 +3,7 @@ Classes for the various objects the library returns.
"""
from .._impl.client.types import (
AdminRight,
AlbumBuilder,
AsyncList,
CallbackAnswer,
Channel,
@ -25,6 +26,7 @@ from .._impl.session import PackedChat, PackedType
__all__ = [
"AdminRight",
"AlbumBuilder",
"AsyncList",
"CallbackAnswer",
"Channel",