Implement message-related methods

This commit is contained in:
Lonami Exo 2023-09-02 00:49:16 +02:00
parent c46387f7bf
commit 5e43efc55d
6 changed files with 702 additions and 70 deletions

View File

@ -100,7 +100,7 @@ class InlineResult(metaclass=NoPublicConstructor):
peer = self._default_peer peer = self._default_peer
random_id = generate_random_id() random_id = generate_random_id()
return self._client._find_updates_message( return self._client._build_message_map(
await self._client( await self._client(
functions.messages.send_inline_bot_result( functions.messages.send_inline_bot_result(
silent=False, silent=False,
@ -117,9 +117,8 @@ class InlineResult(metaclass=NoPublicConstructor):
send_as=None, send_as=None,
) )
), ),
random_id,
peer, peer,
) ).with_random_id(random_id)
async def inline_query( async def inline_query(

View File

@ -1,7 +1,8 @@
import asyncio import asyncio
import datetime
from collections import deque from collections import deque
from types import TracebackType from types import TracebackType
from typing import Deque, Optional, Self, Type, TypeVar, Union from typing import Deque, List, Literal, Optional, Self, Type, TypeVar, Union
from ...mtsender.sender import Sender from ...mtsender.sender import Sender
from ...session.chat.hash_cache import ChatHashCache from ...session.chat.hash_cache import ChatHashCache
@ -10,6 +11,7 @@ from ...session.message_box.defs import Session
from ...session.message_box.messagebox import MessageBox from ...session.message_box.messagebox import MessageBox
from ...tl import abcs from ...tl import abcs
from ...tl.core.request import Request from ...tl.core.request import Request
from ..types.async_list import AsyncList
from ..types.chat import ChatLike from ..types.chat import ChatLike
from ..types.chat.user import User from ..types.chat.user import User
from ..types.login_token import LoginToken from ..types.login_token import LoginToken
@ -41,14 +43,17 @@ from .chats import (
from .dialogs import conversation, delete_dialog, edit_folder, iter_dialogs, iter_drafts from .dialogs import conversation, delete_dialog, edit_folder, iter_dialogs, iter_drafts
from .downloads import download_media, download_profile_photo, iter_download from .downloads import download_media, download_profile_photo, iter_download
from .messages import ( from .messages import (
MessageMap,
build_message_map,
delete_messages, delete_messages,
edit_message, edit_message,
find_updates_message,
forward_messages, forward_messages,
iter_messages, get_messages,
get_messages_with_ids,
pin_message, pin_message,
search_all_messages,
search_messages,
send_message, send_message,
send_read_acknowledge,
unpin_message, unpin_message,
) )
from .net import ( from .net import (
@ -196,37 +201,112 @@ class Client:
def iter_download(self) -> None: def iter_download(self) -> None:
iter_download(self) iter_download(self)
def iter_messages(self) -> None: async def send_message(
iter_messages(self) self,
chat: ChatLike,
*,
text: Optional[str] = None,
markdown: Optional[str] = None,
html: Optional[str] = None,
link_preview: Optional[bool] = None,
) -> Message:
return await send_message(
self,
chat,
text=text,
markdown=markdown,
html=html,
link_preview=link_preview,
)
async def send_message(self) -> None: async def edit_message(
await send_message(self) self,
chat: ChatLike,
message_id: int,
*,
text: Optional[str] = None,
markdown: Optional[str] = None,
html: Optional[str] = None,
link_preview: Optional[bool] = None,
) -> Message:
return await edit_message(
self,
chat,
message_id,
text=text,
markdown=markdown,
html=html,
link_preview=link_preview,
)
async def forward_messages(self) -> None: async def delete_messages(
await forward_messages(self) self, chat: ChatLike, message_ids: List[int], *, revoke: bool = True
) -> int:
return await delete_messages(self, chat, message_ids, revoke=revoke)
async def edit_message(self) -> None: async def forward_messages(
await edit_message(self) self, target: ChatLike, message_ids: List[int], source: ChatLike
) -> List[Message]:
return await forward_messages(self, target, message_ids, source)
async def delete_messages(self) -> None: def get_messages(
await delete_messages(self) self,
chat: ChatLike,
limit: Optional[int] = None,
*,
offset_id: Optional[int],
offset_date: Optional[datetime.datetime],
) -> AsyncList[Message]:
return get_messages(
self, chat, limit, offset_id=offset_id, offset_date=offset_date
)
async def send_read_acknowledge(self) -> None: def get_messages_with_ids(
await send_read_acknowledge(self) self,
chat: ChatLike,
message_ids: List[int],
) -> AsyncList[Message]:
return get_messages_with_ids(self, chat, message_ids)
async def pin_message(self) -> None: def search_messages(
await pin_message(self) self,
chat: ChatLike,
limit: Optional[int] = None,
*,
query: Optional[str] = None,
offset_id: int,
offset_date: datetime.datetime,
) -> AsyncList[Message]:
return search_messages(
self, chat, limit, query=query, offset_id=offset_id, offset_date=offset_date
)
async def unpin_message(self) -> None: def search_all_messages(
await unpin_message(self) self,
limit: Optional[int] = None,
*,
query: Optional[str] = None,
offset_id: int,
offset_date: datetime.datetime,
) -> AsyncList[Message]:
return search_all_messages(
self, limit, query=query, offset_id=offset_id, offset_date=offset_date
)
def _find_updates_message( async def pin_message(self, chat: ChatLike, message_id: int) -> Message:
return await pin_message(self, chat, message_id)
async def unpin_message(
self, chat: ChatLike, message_id: Union[int, Literal["all"]]
) -> None:
return await unpin_message(self, chat, message_id)
def _build_message_map(
self, self,
result: abcs.Updates, result: abcs.Updates,
random_id: int, peer: Optional[abcs.InputPeer],
chat: Optional[abcs.InputPeer], ) -> MessageMap:
) -> Message: return build_message_map(self, result, peer)
return find_updates_message(self, result, random_id, chat)
async def set_receive_updates(self) -> None: async def set_receive_updates(self) -> None:
await set_receive_updates(self) await set_receive_updates(self)

View File

@ -1,60 +1,511 @@
from __future__ import annotations from __future__ import annotations
from typing import TYPE_CHECKING, Dict, List, Optional, Union import datetime
import sys
from typing import (
TYPE_CHECKING,
Any,
Coroutine,
Dict,
List,
Literal,
Optional,
Tuple,
Union,
)
from ...tl import abcs, types from telethon._impl.client.types.async_list import AsyncList
from telethon._impl.session.chat.packed import PackedChat
from ...tl import abcs, functions, types
from ..parsers import parse_html_message, parse_markdown_message
from ..types.chat import ChatLike
from ..types.message import Message from ..types.message import Message
from ..utils import generate_random_id
if TYPE_CHECKING: if TYPE_CHECKING:
from .client import Client from .client import Client
def iter_messages(self: Client) -> None: def parse_message(
self *,
raise NotImplementedError text: Optional[str] = None,
markdown: Optional[str] = None,
html: Optional[str] = None,
) -> Tuple[str, Optional[List[abcs.MessageEntity]]]:
if sum((text is not None, markdown is not None, html is not None)) != 1:
raise ValueError("must specify exactly one of text, markdown or html")
if text is not None:
parsed, entities = text, None
elif markdown is not None:
parsed, entities = parse_markdown_message(markdown)
elif html is not None:
parsed, entities = parse_html_message(html)
else:
raise RuntimeError("unexpected case")
return parsed, entities or None
async def send_message(self: Client) -> None: async def send_message(
self self: Client,
raise NotImplementedError chat: ChatLike,
*,
text: Optional[str] = None,
markdown: Optional[str] = None,
html: Optional[str] = None,
link_preview: Optional[bool] = None,
) -> Message:
peer = (await self._resolve_to_packed(chat))._to_input_peer()
message, entities = parse_message(text=text, markdown=markdown, html=html)
random_id = generate_random_id()
return self._build_message_map(
await self(
functions.messages.send_message(
no_webpage=not link_preview,
silent=False,
background=False,
clear_draft=False,
noforwards=False,
update_stickersets_order=False,
peer=peer,
reply_to_msg_id=None,
top_msg_id=None,
message=message,
random_id=random_id,
reply_markup=None,
entities=entities,
schedule_date=None,
send_as=None,
)
),
peer,
).with_random_id(random_id)
async def forward_messages(self: Client) -> None: async def edit_message(
self self: Client,
raise NotImplementedError chat: ChatLike,
message_id: int,
*,
text: Optional[str] = None,
markdown: Optional[str] = None,
html: Optional[str] = None,
link_preview: Optional[bool] = None,
) -> Message:
peer = (await self._resolve_to_packed(chat))._to_input_peer()
message, entities = parse_message(text=text, markdown=markdown, html=html)
return self._build_message_map(
await self(
functions.messages.edit_message(
no_webpage=not link_preview,
peer=peer,
id=message_id,
message=message,
media=None,
reply_markup=None,
entities=entities,
schedule_date=None,
)
),
peer,
).with_id(message_id)
async def edit_message(self: Client) -> None: async def delete_messages(
self self: Client, chat: ChatLike, message_ids: List[int], *, revoke: bool = True
raise NotImplementedError ) -> int:
packed_chat = await self._resolve_to_packed(chat)
if packed_chat.is_channel():
affected = await self(
functions.channels.delete_messages(
channel=packed_chat._to_input_channel(), id=message_ids
)
)
else:
affected = await self(
functions.messages.delete_messages(revoke=revoke, id=message_ids)
)
assert isinstance(affected, types.messages.AffectedMessages)
return affected.pts_count
async def delete_messages(self: Client) -> None: async def forward_messages(
self self: Client, target: ChatLike, message_ids: List[int], source: ChatLike
raise NotImplementedError ) -> List[Message]:
to_peer = (await self._resolve_to_packed(target))._to_input_peer()
from_peer = (await self._resolve_to_packed(source))._to_input_peer()
random_ids = [generate_random_id() for _ in message_ids]
map = self._build_message_map(
await self(
functions.messages.forward_messages(
silent=False,
background=False,
with_my_score=False,
drop_author=False,
drop_media_captions=False,
noforwards=False,
from_peer=from_peer,
id=message_ids,
random_id=random_ids,
to_peer=to_peer,
top_msg_id=None,
schedule_date=None,
send_as=None,
)
),
to_peer,
)
return [map.with_random_id(id) for id in random_ids]
async def send_read_acknowledge(self: Client) -> None: class MessageList(AsyncList[Message]):
self def _extend_buffer(self, client: Client, messages: abcs.messages.Messages) -> None:
raise NotImplementedError if isinstance(messages, types.messages.Messages):
self._buffer.extend(Message._from_raw(m) for m in messages.messages)
self._total = len(messages.messages)
self._done = True
elif isinstance(messages, types.messages.MessagesSlice):
self._buffer.extend(Message._from_raw(m) for m in messages.messages)
self._total = messages.count
elif isinstance(messages, types.messages.ChannelMessages):
self._buffer.extend(Message._from_raw(m) for m in messages.messages)
self._total = messages.count
elif isinstance(messages, types.messages.MessagesNotModified):
self._total = messages.count
else:
raise RuntimeError("unexpected case")
def _last_non_empty_message(self) -> Message:
return next(
(
m
for m in reversed(self._buffer)
if not isinstance(m._raw, types.MessageEmpty)
),
Message._from_raw(types.MessageEmpty(id=0, peer_id=None)),
)
async def pin_message(self: Client) -> None: class HistoryList(MessageList):
self def __init__(
raise NotImplementedError self,
client: Client,
chat: ChatLike,
limit: int,
*,
offset_id: int,
offset_date: int,
):
super().__init__()
self._client = client
self._chat = chat
self._peer: Optional[abcs.InputPeer] = None
self._limit = limit
self._offset_id = offset_id
self._offset_date = offset_date
async def _fetch_next(self) -> None:
if self._peer is None:
self._peer = (
await self._client._resolve_to_packed(self._chat)
)._to_input_peer()
result = await self._client(
functions.messages.get_history(
peer=self._peer,
offset_id=self._offset_id,
offset_date=self._offset_date,
add_offset=0,
limit=min(max(self._limit, 1), 100),
max_id=0,
min_id=0,
hash=0,
)
)
self._extend_buffer(self._client, result)
self._limit -= len(self._buffer)
if self._buffer:
last = self._last_non_empty_message()
self._offset_id = self._buffer[-1].id
if (date := getattr(last._raw, "date", None)) is not None:
self._offset_date = date
async def unpin_message(self: Client) -> None: def get_messages(
self self: Client,
raise NotImplementedError chat: ChatLike,
limit: Optional[int] = None,
*,
offset_id: Optional[int],
offset_date: Optional[datetime.datetime],
) -> AsyncList[Message]:
return HistoryList(
self,
chat,
sys.maxsize if limit is None else limit,
offset_id=offset_id or 0,
offset_date=int(offset_date.timestamp()) if offset_date is not None else 0,
)
def find_updates_message( class CherryPickedList(MessageList):
def __init__(
self,
client: Client,
chat: ChatLike,
ids: List[int],
):
super().__init__()
self._client = client
self._chat = chat
self._packed: Optional[PackedChat] = None
self._ids = ids
async def _fetch_next(self) -> None:
if not self._ids:
return
if self._packed is None:
self._packed = await self._client._resolve_to_packed(self._chat)
if self._packed.is_channel():
result = await self._client(
functions.channels.get_messages(
channel=self._packed._to_input_channel(),
id=[types.InputMessageId(id=id) for id in self._ids[:100]],
)
)
else:
result = await self._client(
functions.messages.get_messages(
id=[types.InputMessageId(id=id) for id in self._ids[:100]]
)
)
self._extend_buffer(self._client, result)
self._ids = self._ids[100:]
def get_messages_with_ids(
self: Client,
chat: ChatLike,
message_ids: List[int],
) -> AsyncList[Message]:
return CherryPickedList(self, chat, message_ids)
class SearchList(MessageList):
def __init__(
self,
client: Client,
chat: ChatLike,
limit: int,
*,
query: str,
offset_id: int,
offset_date: int,
):
super().__init__()
self._client = client
self._chat = chat
self._peer: Optional[abcs.InputPeer] = None
self._limit = limit
self._query = query
self._offset_id = offset_id
self._offset_date = offset_date
async def _fetch_next(self) -> None:
if self._peer is None:
self._peer = (
await self._client._resolve_to_packed(self._chat)
)._to_input_peer()
result = await self._client(
functions.messages.search(
peer=self._peer,
q=self._query,
from_id=None,
top_msg_id=None,
filter=types.InputMessagesFilterEmpty(),
min_date=0,
max_date=self._offset_date,
offset_id=self._offset_id,
add_offset=0,
limit=min(max(self._limit, 1), 100),
max_id=0,
min_id=0,
hash=0,
)
)
self._extend_buffer(self._client, result)
self._limit -= len(self._buffer)
if self._buffer:
last = self._last_non_empty_message()
self._offset_id = self._buffer[-1].id
if (date := getattr(last._raw, "date", None)) is not None:
self._offset_date = date
def search_messages(
self: Client,
chat: ChatLike,
limit: Optional[int] = None,
*,
query: Optional[str] = None,
offset_id: int,
offset_date: datetime.datetime,
) -> AsyncList[Message]:
return SearchList(
self,
chat,
sys.maxsize if limit is None else limit,
query=query or "",
offset_id=offset_id or 0,
offset_date=int(offset_date.timestamp()) if offset_date is not None else 0,
)
class GlobalSearchList(MessageList):
def __init__(
self,
client: Client,
limit: int,
*,
query: str,
offset_id: int,
offset_date: int,
):
super().__init__()
self._client = client
self._limit = limit
self._query = query
self._offset_id = offset_id
self._offset_date = offset_date
self._offset_rate = 0
self._offset_peer: abcs.InputPeer = types.InputPeerEmpty()
async def _fetch_next(self) -> None:
result = await self._client(
functions.messages.search_global(
folder_id=None,
q=self._query,
filter=types.InputMessagesFilterEmpty(),
min_date=0,
max_date=self._offset_date,
offset_rate=self._offset_rate,
offset_peer=self._offset_peer,
offset_id=self._offset_id,
limit=min(max(self._limit, 1), 100),
)
)
self._extend_buffer(self._client, result)
self._limit -= len(self._buffer)
if self._buffer:
last = self._last_non_empty_message()
last_packed = last.chat.pack()
self._offset_id = self._buffer[-1].id
if (date := getattr(last._raw, "date", None)) is not None:
self._offset_date = date
if isinstance(result, types.messages.MessagesSlice):
self._offset_rate = result.next_rate or 0
self._offset_peer = (
last_packed._to_input_peer() if last_packed else types.InputPeerEmpty()
)
def search_all_messages(
self: Client,
limit: Optional[int] = None,
*,
query: Optional[str] = None,
offset_id: int,
offset_date: datetime.datetime,
) -> AsyncList[Message]:
return GlobalSearchList(
self,
sys.maxsize if limit is None else limit,
query=query or "",
offset_id=offset_id or 0,
offset_date=int(offset_date.timestamp()) if offset_date is not None else 0,
)
async def pin_message(self: Client, chat: ChatLike, message_id: int) -> Message:
peer = (await self._resolve_to_packed(chat))._to_input_peer()
return self._build_message_map(
await self(
functions.messages.update_pinned_message(
silent=True, unpin=False, pm_oneside=False, peer=peer, id=message_id
)
),
peer,
).get_single()
async def unpin_message(
self: Client, chat: ChatLike, message_id: Union[int, Literal["all"]]
) -> None:
peer = (await self._resolve_to_packed(chat))._to_input_peer()
if message_id == "all":
await self(
functions.messages.unpin_all_messages(
peer=peer,
top_msg_id=None,
)
)
else:
await self(
functions.messages.update_pinned_message(
silent=True, unpin=True, pm_oneside=False, peer=peer, id=message_id
)
)
class MessageMap:
__slots__ = ("_client", "_peer", "_random_id_to_id", "_id_to_message")
def __init__(
self,
client: Client,
peer: Optional[abcs.InputPeer],
random_id_to_id: Dict[int, int],
id_to_message: Dict[int, Message],
) -> None:
self._client = client
self._peer = peer
self._random_id_to_id = random_id_to_id
self._id_to_message = id_to_message
def with_random_id(self, random_id: int) -> Message:
id = self._random_id_to_id.get(random_id)
return self.with_id(id) if id is not None else self._empty()
def with_id(self, id: int) -> Message:
message = self._id_to_message.get(id)
return message if message is not None else self._empty(id)
def get_single(self) -> Message:
if len(self._id_to_message) == 1:
for message in self._id_to_message.values():
return message
return self._empty()
def _empty(self, id: int = 0) -> Message:
return Message._from_raw(
types.MessageEmpty(id=id, peer_id=self._client._input_as_peer(self._peer))
)
def build_message_map(
self: Client, self: Client,
result: abcs.Updates, result: abcs.Updates,
random_id: int, peer: Optional[abcs.InputPeer],
chat: Optional[abcs.InputPeer], ) -> MessageMap:
) -> Message:
if isinstance(result, types.UpdateShort): if isinstance(result, types.UpdateShort):
updates = [result.update] updates = [result.update]
entities: Dict[int, object] = {} entities: Dict[int, object] = {}
@ -63,15 +514,13 @@ def find_updates_message(
entities = {} entities = {}
raise NotImplementedError() raise NotImplementedError()
else: else:
return Message._from_raw( return MessageMap(self, peer, {}, {})
types.MessageEmpty(id=0, peer_id=self._input_as_peer(chat))
)
random_to_id = {} random_id_to_id = {}
id_to_message = {} id_to_message = {}
for update in updates: for update in updates:
if isinstance(update, types.UpdateMessageId): if isinstance(update, types.UpdateMessageId):
random_to_id[update.random_id] = update.id random_id_to_id[update.random_id] = update.id
elif isinstance( elif isinstance(
update, update,
@ -87,9 +536,14 @@ def find_updates_message(
update.message, update.message,
(types.Message, types.MessageService, types.MessageEmpty), (types.Message, types.MessageService, types.MessageEmpty),
) )
id_to_message[update.message.id] = update.message id_to_message[update.message.id] = Message._from_raw(update.message)
elif isinstance(update, types.UpdateMessagePoll): elif isinstance(update, types.UpdateMessagePoll):
raise NotImplementedError() raise NotImplementedError()
return Message._from_raw(id_to_message[random_to_id[random_id]]) return MessageMap(
self,
peer,
random_id_to_id,
id_to_message,
)

View File

@ -0,0 +1,67 @@
import abc
from collections import deque
from typing import Any, Deque, Generator, Generic, List, Self, TypeVar
T = TypeVar("T")
class AsyncList(abc.ABC, Generic[T]):
"""
An asynchronous list.
It can be awaited to get all the items as a normal `list`,
or iterated over via `async for`.
Both approaches will perform as many requests as needed to retrieve the
items, but awaiting will need to do it all at once, which can be slow.
Using asynchronous iteration will perform the requests lazily as needed,
and lets you break out of the loop at any time to stop fetching items.
The `len()` of the asynchronous list will be the "total count" reported
by the server. It does not necessarily reflect how many items will
actually be returned. This count can change as more items are fetched.
"""
def __init__(self) -> None:
self._buffer: Deque[T] = deque()
self._total: int = 0
self._done = False
@abc.abstractmethod
async def _fetch_next(self) -> None:
"""
Fetch the next chunk of items.
The `_buffer` should be extended from the end, not the front.
The `_total` should be updated with the count reported by the server.
The `_done` flag should be set if it is known that the end was reached
"""
async def _collect(self) -> List[T]:
prev = -1
while prev != len(self._buffer):
prev = len(self._buffer)
await self._fetch_next()
return list(self._buffer)
def __await__(self) -> Generator[Any, None, List[T]]:
return self._collect().__await__()
def __aiter__(self) -> Self:
return self
async def __anext__(self) -> T:
if not self._buffer:
if self._done:
raise StopAsyncIteration
await self._fetch_next()
if not self._buffer:
self._done = True
raise StopAsyncIteration
return self._buffer.popleft()
def __len__(self) -> int:
return self._total

View File

@ -1,16 +1,37 @@
from typing import Self import datetime
from typing import Optional, Self
from telethon._impl.tl import abcs
from ...client.types.chat import Chat
from ...tl import abcs, types
from .meta import NoPublicConstructor from .meta import NoPublicConstructor
class Message(metaclass=NoPublicConstructor): class Message(metaclass=NoPublicConstructor):
__slots__ = ("_message",) __slots__ = ("_raw",)
def __init__(self, message: abcs.Message) -> None: def __init__(self, message: abcs.Message) -> None:
self._message = message assert isinstance(
message, (types.Message, types.MessageService, types.MessageEmpty)
)
self._raw = message
@classmethod @classmethod
def _from_raw(cls, message: abcs.Message) -> Self: def _from_raw(cls, message: abcs.Message) -> Self:
return cls._create(message) return cls._create(message)
@property
def id(self) -> int:
return self._raw.id
@property
def date(self) -> Optional[datetime.datetime]:
date = getattr(self._raw, "date", None)
return (
datetime.datetime.fromtimestamp(date, tz=datetime.timezone.utc)
if date is not None
else None
)
@property
def chat(self) -> Chat:
raise NotImplementedError

View File

@ -0,0 +1,11 @@
import time
_last_id = 0
def generate_random_id() -> int:
global _last_id
if _last_id == 0:
_last_id = int(time.time() * 1e9)
_last_id += 1
return _last_id