Use modern typehint syntax

This commit is contained in:
Lonami Exo 2024-03-17 13:06:03 +01:00
parent 9afe2f853d
commit 8267f5d590
72 changed files with 461 additions and 520 deletions

View File

@ -2,7 +2,8 @@ import datetime
import io
import struct
import timeit
from typing import Any, Iterator
from collections.abc import Iterator
from typing import Any
from .data_codegen import DATA, Obj

View File

@ -1,6 +1,6 @@
import sys
from pathlib import Path
from typing import Any, Dict, Optional
from typing import Any, Optional
from setuptools import build_meta as _orig
from setuptools.build_meta import * # noqa: F403 # pyright: ignore [reportWildcardImportFromLibrary]
@ -39,7 +39,7 @@ def gen_types_if_needed() -> None:
def build_wheel( # type: ignore [no-redef]
wheel_directory: str,
config_settings: Optional[Dict[Any, Any]] = None,
config_settings: Optional[dict[Any, Any]] = None,
metadata_directory: Optional[str] = None,
) -> str:
gen_types_if_needed()
@ -47,7 +47,7 @@ def build_wheel( # type: ignore [no-redef]
def build_sdist( # type: ignore [no-redef]
sdist_directory: str, config_settings: Optional[Dict[Any, Any]] = None
sdist_directory: str, config_settings: Optional[dict[Any, Any]] = None
) -> str:
gen_types_if_needed()
return _orig.build_sdist(sdist_directory, config_settings)
@ -55,7 +55,7 @@ def build_sdist( # type: ignore [no-redef]
def build_editable( # type: ignore [no-redef]
wheel_directory: str,
config_settings: Optional[Dict[Any, Any]] = None,
config_settings: Optional[dict[Any, Any]] = None,
metadata_directory: Optional[str] = None,
) -> str:
gen_types_if_needed()

View File

@ -2,7 +2,7 @@ from __future__ import annotations
import getpass
import re
from typing import TYPE_CHECKING, Optional, Union
from typing import TYPE_CHECKING, Optional
from ...crypto import two_factor_auth
from ...mtproto import RpcError
@ -115,9 +115,7 @@ async def request_login_code(self: Client, phone: str) -> LoginToken:
return LoginToken._new(result, phone)
async def sign_in(
self: Client, token: LoginToken, code: str
) -> Union[User, PasswordToken]:
async def sign_in(self: Client, token: LoginToken, code: str) -> User | PasswordToken:
try:
result = await self(
functions.auth.sign_in(
@ -213,7 +211,7 @@ async def get_password_information(client: Client) -> PasswordToken:
async def check_password(
self: Client, token: PasswordToken, password: Union[str, bytes]
self: Client, token: PasswordToken, password: str | bytes
) -> User:
algo = token._password.current_algo
if not isinstance(

View File

@ -1,6 +1,7 @@
from __future__ import annotations
from typing import TYPE_CHECKING, AsyncIterator, List, Optional, Self
from collections.abc import AsyncIterator
from typing import TYPE_CHECKING, Optional, Self
from ...tl import abcs, functions, types
from ..types import ChatLike, InlineResult, NoPublicConstructor
@ -22,7 +23,7 @@ class InlineResults(metaclass=NoPublicConstructor):
self._query = query
self._peer = chat or types.InputPeerEmpty()
self._offset: Optional[str] = ""
self._buffer: List[InlineResult] = []
self._buffer: list[InlineResult] = []
self._done = False
def __aiter__(self) -> Self:

View File

@ -1,7 +1,7 @@
from __future__ import annotations
import datetime
from typing import TYPE_CHECKING, Optional, Sequence, Set
from typing import TYPE_CHECKING, Optional, Sequence
from ...session import PackedChat
from ...tl import abcs, functions, types
@ -32,7 +32,7 @@ class ParticipantList(AsyncList[Participant]):
self._chat = chat
self._packed: Optional[PackedChat] = None
self._offset = 0
self._seen: Set[int] = set()
self._seen: set[int] = set()
async def _fetch_next(self) -> None:
if self._packed is None:

View File

@ -1,24 +1,10 @@
import asyncio
import datetime
import logging
from collections.abc import AsyncIterator, Awaitable, Callable
from pathlib import Path
from types import TracebackType
from typing import (
Any,
AsyncIterator,
Awaitable,
Callable,
Dict,
List,
Literal,
Optional,
Self,
Sequence,
Tuple,
Type,
TypeVar,
Union,
)
from typing import Any, Literal, Optional, Self, Sequence, Type, TypeVar
from ....version import __version__ as default_version
from ...mtsender import Connector, Sender
@ -215,7 +201,7 @@ class Client:
def __init__(
self,
session: Optional[Union[str, Path, Storage]],
session: Optional[str | Path | Storage],
api_id: int,
api_hash: Optional[str] = None,
*,
@ -253,9 +239,9 @@ class Client:
lang_code=lang_code or "en",
catch_up=catch_up or False,
datacenter=datacenter,
flood_sleep_threshold=60
if flood_sleep_threshold is None
else flood_sleep_threshold,
flood_sleep_threshold=(
60 if flood_sleep_threshold is None else flood_sleep_threshold
),
update_queue_limit=update_queue_limit,
base_logger=base_logger,
connector=connector or (lambda ip, port: asyncio.open_connection(ip, port)),
@ -267,11 +253,11 @@ class Client:
self._chat_hashes = ChatHashCache(None)
self._last_update_limit_warn: Optional[float] = None
self._updates: asyncio.Queue[
Tuple[abcs.Update, Dict[int, Chat]]
tuple[abcs.Update, dict[int, Chat]]
] = asyncio.Queue(maxsize=self._config.update_queue_limit or 0)
self._dispatcher: Optional[asyncio.Task[None]] = None
self._handlers: Dict[
Type[Event], List[Tuple[Callable[[Any], Awaitable[Any]], Optional[Filter]]]
self._handlers: dict[
Type[Event], list[tuple[Callable[[Any], Awaitable[Any]], Optional[Filter]]]
] = {}
self._check_all_handlers = check_all_handlers
@ -356,9 +342,7 @@ class Client:
"""
return await bot_sign_in(self, token)
async def check_password(
self, token: PasswordToken, password: Union[str, bytes]
) -> User:
async def check_password(self, token: PasswordToken, password: str | bytes) -> User:
"""
Check the two-factor-authentication (2FA) password.
If it is correct, completes the login.
@ -428,7 +412,7 @@ class Client:
await delete_dialog(self, chat)
async def delete_messages(
self, chat: ChatLike, message_ids: List[int], *, revoke: bool = True
self, chat: ChatLike, message_ids: list[int], *, revoke: bool = True
) -> int:
"""
Delete messages.
@ -484,7 +468,7 @@ class Client:
"""
await disconnect(self)
async def download(self, media: File, file: Union[str, Path, OutFileLike]) -> None:
async def download(self, media: File, file: str | Path | OutFileLike) -> None:
"""
Download a file.
@ -585,7 +569,7 @@ class Client:
markdown: Optional[str] = None,
html: Optional[str] = None,
link_preview: bool = False,
buttons: Optional[Union[List[btns.Button], List[List[btns.Button]]]] = None,
buttons: Optional[list[btns.Button] | list[list[btns.Button]]] = None,
) -> Message:
"""
Edit a message.
@ -633,8 +617,8 @@ class Client:
)
async def forward_messages(
self, target: ChatLike, message_ids: List[int], source: ChatLike
) -> List[Message]:
self, target: ChatLike, message_ids: list[int], source: ChatLike
) -> list[Message]:
"""
Forward messages from one :term:`chat` to another.
@ -691,8 +675,8 @@ class Client:
return get_admin_log(self, chat)
async def get_chats(
self, chats: Union[List[ChatLike], Tuple[ChatLike, ...]]
) -> List[Chat]:
self, chats: list[ChatLike] | tuple[ChatLike, ...]
) -> list[Chat]:
"""
Get the latest basic information about the given chats.
@ -907,7 +891,7 @@ class Client:
)
def get_messages_with_ids(
self, chat: ChatLike, message_ids: List[int]
self, chat: ChatLike, message_ids: list[int]
) -> AsyncList[Message]:
"""
Get the full message objects from the corresponding message identifiers.
@ -1160,7 +1144,7 @@ class Client:
return prepare_album(self)
async def read_message(
self, chat: ChatLike, message_id: Union[int, Literal["all"]]
self, chat: ChatLike, message_id: int | Literal["all"]
) -> None:
"""
Mark messages as read.
@ -1361,7 +1345,7 @@ class Client:
async def send_audio(
self,
chat: ChatLike,
file: Union[str, Path, InFileLike, File],
file: str | Path | InFileLike | File,
mime_type: Optional[str] = None,
*,
size: Optional[int] = None,
@ -1374,7 +1358,7 @@ class Client:
caption_markdown: Optional[str] = None,
caption_html: Optional[str] = None,
reply_to: Optional[int] = None,
buttons: Optional[Union[List[btns.Button], List[List[btns.Button]]]] = None,
buttons: Optional[list[btns.Button] | list[list[btns.Button]]] = None,
) -> Message:
"""
Send an audio file.
@ -1424,7 +1408,7 @@ class Client:
async def send_file(
self,
chat: ChatLike,
file: Union[str, Path, InFileLike, File],
file: str | Path | InFileLike | File,
*,
size: Optional[int] = None,
name: Optional[str] = None,
@ -1446,7 +1430,7 @@ class Client:
caption_markdown: Optional[str] = None,
caption_html: Optional[str] = None,
reply_to: Optional[int] = None,
buttons: Optional[Union[List[btns.Button], List[List[btns.Button]]]] = None,
buttons: Optional[list[btns.Button] | list[list[btns.Button]]] = None,
) -> Message:
"""
Send any type of file with any amount of attributes.
@ -1606,13 +1590,13 @@ class Client:
async def send_message(
self,
chat: ChatLike,
text: Optional[Union[str, Message]] = None,
text: Optional[str | Message] = None,
*,
markdown: Optional[str] = None,
html: Optional[str] = None,
link_preview: bool = False,
reply_to: Optional[int] = None,
buttons: Optional[Union[List[btns.Button], List[List[btns.Button]]]] = None,
buttons: Optional[list[btns.Button] | list[list[btns.Button]]] = None,
) -> Message:
"""
Send a message.
@ -1653,7 +1637,7 @@ class Client:
async def send_photo(
self,
chat: ChatLike,
file: Union[str, Path, InFileLike, File],
file: str | Path | InFileLike | File,
*,
size: Optional[int] = None,
name: Optional[str] = None,
@ -1665,7 +1649,7 @@ class Client:
caption_markdown: Optional[str] = None,
caption_html: Optional[str] = None,
reply_to: Optional[int] = None,
buttons: Optional[Union[List[btns.Button], List[List[btns.Button]]]] = None,
buttons: Optional[list[btns.Button] | list[list[btns.Button]]] = None,
) -> Message:
"""
Send a photo file.
@ -1717,7 +1701,7 @@ class Client:
async def send_video(
self,
chat: ChatLike,
file: Union[str, Path, InFileLike, File],
file: str | Path | InFileLike | File,
*,
size: Optional[int] = None,
name: Optional[str] = None,
@ -1732,7 +1716,7 @@ class Client:
caption_markdown: Optional[str] = None,
caption_html: Optional[str] = None,
reply_to: Optional[int] = None,
buttons: Optional[Union[List[btns.Button], List[List[btns.Button]]]] = None,
buttons: Optional[list[btns.Button] | list[list[btns.Button]]] = None,
) -> Message:
"""
Send a video file.
@ -1947,7 +1931,7 @@ class Client:
"""
await set_participant_restrictions(self, chat, user, restrictions, until=until)
async def sign_in(self, token: LoginToken, code: str) -> Union[User, PasswordToken]:
async def sign_in(self, token: LoginToken, code: str) -> User | PasswordToken:
"""
Sign in to a user account.
@ -1993,7 +1977,7 @@ class Client:
await sign_out(self)
async def unpin_message(
self, chat: ChatLike, message_id: Union[int, Literal["all"]]
self, chat: ChatLike, message_id: int | Literal["all"]
) -> None:
"""
Unpin one or all messages from the top.
@ -2041,8 +2025,8 @@ class Client:
return input_to_peer(self, input)
async def _upload(
self, fd: Union[str, Path, InFileLike], size: Optional[int], name: Optional[str]
) -> Tuple[abcs.InputFile, str]:
self, fd: str | Path | InFileLike, size: Optional[int], name: Optional[str]
) -> tuple[abcs.InputFile, str]:
return await upload(self, fd, size, name)
async def __call__(self, request: Request[Return]) -> Return:

View File

@ -4,7 +4,7 @@ import hashlib
import mimetypes
from inspect import isawaitable
from pathlib import Path
from typing import TYPE_CHECKING, List, Optional, Tuple, Union
from typing import TYPE_CHECKING, Optional
from ...tl import abcs, functions, types
from ..types import (
@ -45,7 +45,7 @@ def prepare_album(self: Client) -> AlbumBuilder:
async def send_photo(
self: Client,
chat: ChatLike,
file: Union[str, Path, InFileLike, File],
file: str | Path | InFileLike | File,
*,
size: Optional[int] = None,
name: Optional[str] = None,
@ -57,7 +57,7 @@ async def send_photo(
caption_markdown: Optional[str] = None,
caption_html: Optional[str] = None,
reply_to: Optional[int] = None,
buttons: Optional[Union[List[btns.Button], List[List[btns.Button]]]] = None,
buttons: Optional[list[btns.Button] | list[list[btns.Button]]] = None,
) -> Message:
return await send_file(
self,
@ -65,9 +65,11 @@ async def send_photo(
file,
size=size,
name=name,
mime_type="image/jpeg" # specific mime doesn't matter, only that it's image
if compress
else mime_type,
mime_type=(
"image/jpeg" # specific mime doesn't matter, only that it's image
if compress
else mime_type
),
compress=compress,
width=width,
height=height,
@ -82,7 +84,7 @@ async def send_photo(
async def send_audio(
self: Client,
chat: ChatLike,
file: Union[str, Path, InFileLike, File],
file: str | Path | InFileLike | File,
mime_type: Optional[str] = None,
*,
size: Optional[int] = None,
@ -95,7 +97,7 @@ async def send_audio(
caption_markdown: Optional[str] = None,
caption_html: Optional[str] = None,
reply_to: Optional[int] = None,
buttons: Optional[Union[List[btns.Button], List[List[btns.Button]]]] = None,
buttons: Optional[list[btns.Button] | list[list[btns.Button]]] = None,
) -> Message:
return await send_file(
self,
@ -119,7 +121,7 @@ async def send_audio(
async def send_video(
self: Client,
chat: ChatLike,
file: Union[str, Path, InFileLike, File],
file: str | Path | InFileLike | File,
*,
size: Optional[int] = None,
name: Optional[str] = None,
@ -134,7 +136,7 @@ async def send_video(
caption_markdown: Optional[str] = None,
caption_html: Optional[str] = None,
reply_to: Optional[int] = None,
buttons: Optional[Union[List[btns.Button], List[List[btns.Button]]]] = None,
buttons: Optional[list[btns.Button] | list[list[btns.Button]]],
) -> Message:
return await send_file(
self,
@ -160,7 +162,7 @@ async def send_video(
async def send_file(
self: Client,
chat: ChatLike,
file: Union[str, Path, InFileLike, File],
file: str | Path | InFileLike | File,
*,
size: Optional[int] = None,
name: Optional[str] = None,
@ -182,7 +184,7 @@ async def send_file(
caption_markdown: Optional[str] = None,
caption_html: Optional[str] = None,
reply_to: Optional[int] = None,
buttons: Optional[Union[List[btns.Button], List[List[btns.Button]]]] = None,
buttons: Optional[list[btns.Button] | list[list[btns.Button]]],
) -> Message:
message, entities = parse_message(
text=caption, markdown=caption_markdown, html=caption_html, allow_empty=True
@ -237,7 +239,7 @@ async def send_file(
# Only bother to calculate attributes when sending documents.
else:
attributes: List[abcs.DocumentAttribute] = []
attributes: list[abcs.DocumentAttribute] = []
attributes.append(types.DocumentAttributeFilename(file_name=name))
if mime_type.startswith("image/"):
@ -290,9 +292,9 @@ async def do_send_file(
chat: ChatLike,
input_media: abcs.InputMedia,
message: str,
entities: Optional[List[abcs.MessageEntity]],
entities: Optional[list[abcs.MessageEntity]],
reply_to: Optional[int],
buttons: Optional[Union[List[btns.Button], List[List[btns.Button]]]],
buttons: Optional[list[btns.Button] | list[list[btns.Button]]],
) -> Message:
peer = (await client._resolve_to_packed(chat))._to_input_peer()
random_id = generate_random_id()
@ -305,11 +307,11 @@ async def do_send_file(
noforwards=False,
update_stickersets_order=False,
peer=peer,
reply_to=types.InputReplyToMessage(
reply_to_msg_id=reply_to, top_msg_id=None
)
if reply_to
else None,
reply_to=(
types.InputReplyToMessage(reply_to_msg_id=reply_to, top_msg_id=None)
if reply_to
else None
),
media=input_media,
message=message,
random_id=random_id,
@ -325,10 +327,10 @@ async def do_send_file(
async def upload(
client: Client,
file: Union[str, Path, InFileLike],
file: str | Path | InFileLike,
size: Optional[int],
name: Optional[str],
) -> Tuple[abcs.InputFile, str]:
) -> tuple[abcs.InputFile, str]:
# Paths are opened and closed by us. Anything else is *only* read, not closed.
if isinstance(file, (str, Path)):
path = Path(file) if isinstance(file, str) else file
@ -360,7 +362,7 @@ async def do_upload(
part = 0
total_parts = (size + MAX_CHUNK_SIZE - 1) // MAX_CHUNK_SIZE
buffer = bytearray()
to_store: Union[bytearray, bytes] = b""
to_store: bytearray | bytes = b""
hash_md5 = hashlib.md5()
is_big = size > BIG_FILE_SIZE
@ -454,9 +456,7 @@ def get_file_bytes(self: Client, media: File) -> AsyncList[bytes]:
return FileBytesList(self, media)
async def download(
self: Client, media: File, file: Union[str, Path, OutFileLike]
) -> None:
async def download(self: Client, media: File, file: str | Path | OutFileLike) -> None:
fd = OutWrapper(file)
try:
async for chunk in get_file_bytes(self, media):

View File

@ -2,7 +2,7 @@ from __future__ import annotations
import datetime
import sys
from typing import TYPE_CHECKING, Dict, List, Literal, Optional, Self, Union
from typing import TYPE_CHECKING, Literal, Optional, Self
from ...session import PackedChat
from ...tl import abcs, functions, types
@ -17,13 +17,13 @@ if TYPE_CHECKING:
async def send_message(
self: Client,
chat: ChatLike,
text: Optional[Union[str, Message]] = None,
text: Optional[str | Message] = None,
*,
markdown: Optional[str] = None,
html: Optional[str] = None,
link_preview: bool = False,
reply_to: Optional[int] = None,
buttons: Optional[Union[List[btns.Button], List[List[btns.Button]]]] = None,
buttons: Optional[list[btns.Button] | list[list[btns.Button]]] = None,
) -> Message:
packed = await self._resolve_to_packed(chat)
peer = packed._to_input_peer()
@ -121,7 +121,7 @@ async def edit_message(
markdown: Optional[str] = None,
html: Optional[str] = None,
link_preview: bool = False,
buttons: Optional[Union[List[btns.Button], List[List[btns.Button]]]] = None,
buttons: Optional[list[btns.Button] | list[list[btns.Button]]] = None,
) -> Message:
peer = (await self._resolve_to_packed(chat))._to_input_peer()
message, entities = parse_message(
@ -145,7 +145,7 @@ async def edit_message(
async def delete_messages(
self: Client, chat: ChatLike, message_ids: List[int], *, revoke: bool = True
self: Client, chat: ChatLike, message_ids: list[int], *, revoke: bool = True
) -> int:
packed_chat = await self._resolve_to_packed(chat)
if packed_chat.is_channel():
@ -163,8 +163,8 @@ async def delete_messages(
async def forward_messages(
self: Client, target: ChatLike, message_ids: List[int], source: ChatLike
) -> List[Message]:
self: Client, target: ChatLike, message_ids: list[int], source: ChatLike
) -> list[Message]:
to_peer = (await self._resolve_to_packed(target))._to_input_peer()
from_peer = (await self._resolve_to_packed(source))._to_input_peer()
random_ids = [generate_random_id() for _ in message_ids]
@ -198,7 +198,7 @@ class MessageList(AsyncList[Message]):
def _extend_buffer(
self, client: Client, messages: abcs.messages.Messages
) -> Dict[int, Chat]:
) -> dict[int, Chat]:
if isinstance(messages, types.messages.MessagesNotModified):
self._total = messages.count
return {}
@ -224,7 +224,7 @@ class MessageList(AsyncList[Message]):
def _last_non_empty_message(
self,
) -> Union[types.Message, types.MessageService, types.MessageEmpty]:
) -> types.Message | types.MessageService | types.MessageEmpty:
return next(
(
m._raw
@ -318,13 +318,13 @@ class CherryPickedList(MessageList):
self,
client: Client,
chat: ChatLike,
ids: List[int],
ids: list[int],
):
super().__init__()
self._client = client
self._chat = chat
self._packed: Optional[PackedChat] = None
self._ids: List[abcs.InputMessage] = [types.InputMessageId(id=id) for id in ids]
self._ids: list[abcs.InputMessage] = [types.InputMessageId(id=id) for id in ids]
async def _fetch_next(self) -> None:
if not self._ids:
@ -350,7 +350,7 @@ class CherryPickedList(MessageList):
def get_messages_with_ids(
self: Client,
chat: ChatLike,
message_ids: List[int],
message_ids: list[int],
) -> AsyncList[Message]:
return CherryPickedList(self, chat, message_ids)
@ -509,7 +509,7 @@ async def pin_message(self: Client, chat: ChatLike, message_id: int) -> Message:
async def unpin_message(
self: Client, chat: ChatLike, message_id: Union[int, Literal["all"]]
self: Client, chat: ChatLike, message_id: int | Literal["all"]
) -> None:
peer = (await self._resolve_to_packed(chat))._to_input_peer()
if message_id == "all":
@ -528,7 +528,7 @@ async def unpin_message(
async def read_message(
self: Client, chat: ChatLike, message_id: Union[int, Literal["all"]]
self: Client, chat: ChatLike, message_id: int | Literal["all"]
) -> None:
packed = await self._resolve_to_packed(chat)
if message_id == "all":
@ -555,8 +555,8 @@ class MessageMap:
self,
client: Client,
peer: Optional[abcs.InputPeer],
random_id_to_id: Dict[int, int],
id_to_message: Dict[int, Message],
random_id_to_id: dict[int, int],
id_to_message: dict[int, Message],
) -> None:
self._client = client
self._peer = peer
@ -599,8 +599,8 @@ def build_message_map(
else:
return MessageMap(client, peer, {}, {})
random_id_to_id: Dict[int, int] = {}
id_to_message: Dict[int, Message] = {}
random_id_to_id: dict[int, int] = {}
id_to_message: dict[int, Message] = {}
for update in updates:
if isinstance(update, types.UpdateMessageId):
random_id_to_id[update.random_id] = update.id

View File

@ -6,7 +6,7 @@ import logging
import platform
import re
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, TypeVar
from typing import TYPE_CHECKING, Optional, TypeVar
from ....version import __version__
from ...mtproto import BadStatus, Full, RpcError
@ -75,10 +75,10 @@ def as_concrete_dc_option(opt: abcs.DcOption) -> types.DcOption:
async def connect_sender(
config: Config,
known_dcs: List[DataCenter],
known_dcs: list[DataCenter],
dc: DataCenter,
force_auth_gen: bool = False,
) -> Tuple[Sender, List[DataCenter]]:
) -> tuple[Sender, list[DataCenter]]:
# Only the ID of the input DC may be known.
# Find the corresponding address and authentication key if needed.
addr = dc.ipv4_addr or next(
@ -143,7 +143,7 @@ async def connect_sender(
]
dc_options.sort(key=lambda opt: opt.static, reverse=True)
latest_dcs: Dict[int, DataCenter] = {}
latest_dcs: dict[int, DataCenter] = {}
for opt in dc_options:
dc = latest_dcs.setdefault(opt.id, DataCenter(id=opt.id))
if opt.ipv6:

View File

@ -1,17 +1,8 @@
from __future__ import annotations
import asyncio
from typing import (
TYPE_CHECKING,
Any,
Awaitable,
Callable,
List,
Optional,
Sequence,
Type,
TypeVar,
)
from collections.abc import Awaitable, Callable
from typing import TYPE_CHECKING, Any, Optional, Sequence, Type, TypeVar
from ...session import Gap
from ...tl import abcs
@ -81,7 +72,7 @@ def set_handler_filter(
handlers[i] = (h, filter)
def process_socket_updates(client: Client, all_updates: List[abcs.Updates]) -> None:
def process_socket_updates(client: Client, all_updates: list[abcs.Updates]) -> None:
if not all_updates:
return
@ -103,7 +94,7 @@ def process_socket_updates(client: Client, all_updates: List[abcs.Updates]) -> N
def extend_update_queue(
client: Client,
updates: List[abcs.Update],
updates: list[abcs.Update],
users: Sequence[abcs.User],
chats: Sequence[abcs.Chat],
) -> None:

View File

@ -1,6 +1,6 @@
from __future__ import annotations
from typing import TYPE_CHECKING, List, Optional, Tuple, Union
from typing import TYPE_CHECKING, Optional
from ...mtproto import RpcError
from ...session import PackedChat, PackedType
@ -75,12 +75,12 @@ async def resolve_username(self: Client, username: str) -> Chat:
async def get_chats(
self: Client, chats: Union[List[ChatLike], Tuple[ChatLike, ...]]
) -> List[Chat]:
packed_chats: List[PackedChat] = []
input_users: List[types.InputUser] = []
input_chats: List[int] = []
input_channels: List[types.InputChannel] = []
self: Client, chats: list[ChatLike] | tuple[ChatLike, ...]
) -> list[Chat]:
packed_chats: list[PackedChat] = []
input_users: list[types.InputUser] = []
input_chats: list[int] = []
input_channels: list[types.InputChannel] = []
for chat in chats:
packed = await resolve_to_packed(self, chat)
@ -121,7 +121,7 @@ async def get_chats(
async def resolve_to_packed(
client: Client, chat: Union[ChatLike, abcs.InputPeer, abcs.Peer]
client: Client, chat: ChatLike | abcs.InputPeer | abcs.Peer
) -> PackedChat:
if isinstance(chat, PackedChat):
return chat

View File

@ -1,5 +1,5 @@
import re
from typing import Dict, Tuple, Type
from typing import Type
from ..mtproto import RpcError
@ -20,14 +20,14 @@ def pretty_name(name: str) -> str:
return "".join(map(str.title, name.split("_")))
def from_code(code: int, *, _cache: Dict[int, Type[RpcError]] = {}) -> Type[RpcError]:
def from_code(code: int, *, _cache: dict[int, Type[RpcError]] = {}) -> Type[RpcError]:
code = canonicalize_code(code)
if code not in _cache:
_cache[code] = type(f"Code{code}", (RpcError,), {})
return _cache[code]
def from_name(name: str, *, _cache: Dict[str, Type[RpcError]] = {}) -> Type[RpcError]:
def from_name(name: str, *, _cache: dict[str, Type[RpcError]] = {}) -> Type[RpcError]:
name = canonicalize_name(name)
if name not in _cache:
_cache[name] = type(pretty_name(name), (RpcError,), {})
@ -35,7 +35,7 @@ def from_name(name: str, *, _cache: Dict[str, Type[RpcError]] = {}) -> Type[RpcE
def adapt_rpc(
error: RpcError, *, _cache: Dict[Tuple[int, str], Type[RpcError]] = {}
error: RpcError, *, _cache: dict[tuple[int, str], Type[RpcError]] = {}
) -> RpcError:
code = canonicalize_code(error.code)
name = canonicalize_name(error.name)

View File

@ -1,7 +1,7 @@
from __future__ import annotations
import abc
from typing import TYPE_CHECKING, Dict, Optional, Self
from typing import TYPE_CHECKING, Optional, Self
from ...tl import abcs
from ..types import Chat, NoPublicConstructor
@ -25,7 +25,7 @@ class Event(metaclass=NoPublicConstructor):
@classmethod
@abc.abstractmethod
def _try_from_update(
cls, client: Client, update: abcs.Update, chat_map: Dict[int, Chat]
cls, client: Client, update: abcs.Update, chat_map: dict[int, Chat]
) -> Optional[Self]:
pass

View File

@ -1,6 +1,7 @@
import abc
import typing
from typing import Callable, Tuple, TypeAlias
from collections.abc import Callable
from typing import TypeAlias
from ..event import Event
@ -74,7 +75,7 @@ class Any(Combinable):
self._filters = (filter1, filter2, *filters)
@property
def filters(self) -> Tuple[Filter, ...]:
def filters(self) -> tuple[Filter, ...]:
"""
The filters being checked, in order.
"""
@ -114,7 +115,7 @@ class All(Combinable):
self._filters = (filter1, filter2, *filters)
@property
def filters(self) -> Tuple[Filter, ...]:
def filters(self) -> tuple[Filter, ...]:
"""
The filters being checked, in order.
"""

View File

@ -1,4 +1,4 @@
from typing import Sequence, Set, Type, Union
from typing import Sequence, Type
from ...types import Channel, Group, User
from ..event import Event
@ -18,7 +18,7 @@ class Chats(Combinable):
self._chats = set(chat_ids)
@property
def chat_ids(self) -> Set[int]:
def chat_ids(self) -> set[int]:
"""
A copy of the set of chat identifiers this filter is filtering on.
"""
@ -43,7 +43,7 @@ class Senders(Combinable):
self._senders = set(sender_ids)
@property
def sender_ids(self) -> Set[int]:
def sender_ids(self) -> set[int]:
"""
A copy of the set of sender identifiers this filter is filtering on.
"""
@ -79,12 +79,12 @@ class ChatType(Combinable):
def __init__(
self,
type: Type[Union[User, Group, Channel]],
type: Type[User | Group | Channel],
) -> None:
self._type = type
@property
def type(self) -> Type[Union[User, Group, Channel]]:
def type(self) -> Type[User | Group | Channel]:
"""
The chat type this filter is filtering on.
"""

View File

@ -1,7 +1,7 @@
from __future__ import annotations
import re
from typing import TYPE_CHECKING, Literal, Optional, Tuple, Union
from typing import TYPE_CHECKING, Literal, Optional
from ..event import Event
from .combinators import Combinable
@ -28,7 +28,7 @@ class Text(Combinable):
__slots__ = ("_pattern",)
def __init__(self, regexp: Union[str, re.Pattern[str]]) -> None:
def __init__(self, regexp: str | re.Pattern[str]) -> None:
self._pattern = re.compile(regexp) if isinstance(regexp, str) else regexp
def __call__(self, event: Event) -> bool:
@ -164,7 +164,7 @@ class Media(Combinable):
@property
def types(
self,
) -> Tuple[Literal["photo", "audio", "video"], ...]:
) -> tuple[Literal["photo", "audio", "video"], ...]:
"""
The media types being checked.
"""

View File

@ -1,6 +1,6 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Dict, Optional, Self, Sequence, Union
from typing import TYPE_CHECKING, Optional, Self, Sequence
from ...tl import abcs, types
from ..types import Chat, Message, expand_peer, peer_id
@ -25,7 +25,7 @@ class NewMessage(Event, Message):
@classmethod
def _try_from_update(
cls, client: Client, update: abcs.Update, chat_map: Dict[int, Chat]
cls, client: Client, update: abcs.Update, chat_map: dict[int, Chat]
) -> Optional[Self]:
if isinstance(update, (types.UpdateNewMessage, types.UpdateNewChannelMessage)):
if isinstance(update.message, types.Message):
@ -47,7 +47,7 @@ class MessageEdited(Event, Message):
@classmethod
def _try_from_update(
cls, client: Client, update: abcs.Update, chat_map: Dict[int, Chat]
cls, client: Client, update: abcs.Update, chat_map: dict[int, Chat]
) -> Optional[Self]:
if isinstance(
update, (types.UpdateEditMessage, types.UpdateEditChannelMessage)
@ -75,7 +75,7 @@ class MessageDeleted(Event):
@classmethod
def _try_from_update(
cls, client: Client, update: abcs.Update, chat_map: Dict[int, Chat]
cls, client: Client, update: abcs.Update, chat_map: dict[int, Chat]
) -> Optional[Self]:
if isinstance(update, types.UpdateDeleteMessages):
return cls._create(update.messages, None)
@ -109,13 +109,13 @@ class MessageRead(Event):
def __init__(
self,
client: Client,
update: Union[
types.UpdateReadHistoryInbox,
types.UpdateReadHistoryOutbox,
types.UpdateReadChannelInbox,
types.UpdateReadChannelOutbox,
],
chat_map: Dict[int, Chat],
update: (
types.UpdateReadHistoryInbox
| types.UpdateReadHistoryOutbox
| types.UpdateReadChannelInbox
| types.UpdateReadChannelOutbox
),
chat_map: dict[int, Chat],
) -> None:
self._client = client
self._raw = update
@ -123,7 +123,7 @@ class MessageRead(Event):
@classmethod
def _try_from_update(
cls, client: Client, update: abcs.Update, chat_map: Dict[int, Chat]
cls, client: Client, update: abcs.Update, chat_map: dict[int, Chat]
) -> Optional[Self]:
if isinstance(
update,

View File

@ -1,6 +1,6 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Dict, Optional, Self
from typing import TYPE_CHECKING, Optional, Self
from ...tl import abcs, functions, types
from ..client.messages import CherryPickedList
@ -23,7 +23,7 @@ class ButtonCallback(Event):
self,
client: Client,
update: types.UpdateBotCallbackQuery,
chat_map: Dict[int, Chat],
chat_map: dict[int, Chat],
):
self._client = client
self._raw = update
@ -31,7 +31,7 @@ class ButtonCallback(Event):
@classmethod
def _try_from_update(
cls, client: Client, update: abcs.Update, chat_map: Dict[int, Chat]
cls, client: Client, update: abcs.Update, chat_map: dict[int, Chat]
) -> Optional[Self]:
if isinstance(update, types.UpdateBotCallbackQuery) and update.data is not None:
return cls._create(client, update, chat_map)
@ -105,7 +105,7 @@ class InlineQuery(Event):
@classmethod
def _try_from_update(
cls, client: Client, update: abcs.Update, chat_map: Dict[int, Chat]
cls, client: Client, update: abcs.Update, chat_map: dict[int, Chat]
) -> Optional[Self]:
if isinstance(update, types.UpdateBotInlineQuery):
return cls._create(update)

View File

@ -1,19 +1,8 @@
from collections import deque
from collections.abc import Callable, Iterable
from html import escape
from html.parser import HTMLParser
from typing import (
Any,
Callable,
Deque,
Dict,
Iterable,
List,
Optional,
Tuple,
Type,
Union,
cast,
)
from typing import Any, Optional, Type, cast
from ...tl.abcs import MessageEntity
from ...tl.types import (
@ -37,12 +26,12 @@ class HTMLToTelegramParser(HTMLParser):
def __init__(self) -> None:
super().__init__()
self.text = ""
self.entities: List[MessageEntity] = []
self._building_entities: Dict[str, MessageEntity] = {}
self._open_tags: Deque[str] = deque()
self._open_tags_meta: Deque[Optional[str]] = deque()
self.entities: list[MessageEntity] = []
self._building_entities: dict[str, MessageEntity] = {}
self._open_tags: deque[str] = deque()
self._open_tags_meta: deque[Optional[str]] = deque()
def handle_starttag(self, tag: str, attrs: List[Tuple[str, Optional[str]]]) -> None:
def handle_starttag(self, tag: str, attrs: list[tuple[str, Optional[str]]]) -> None:
self._open_tags.appendleft(tag)
self._open_tags_meta.appendleft(None)
@ -128,7 +117,7 @@ class HTMLToTelegramParser(HTMLParser):
self.entities.append(entity)
def parse(html: str) -> Tuple[str, List[MessageEntity]]:
def parse(html: str) -> tuple[str, list[MessageEntity]]:
"""
Parses the given HTML message and returns its stripped representation
plus a list of the MessageEntity's that were found.
@ -144,8 +133,8 @@ def parse(html: str) -> Tuple[str, List[MessageEntity]]:
return del_surrogate(parser.text), parser.entities
ENTITY_TO_FORMATTER: Dict[
Type[MessageEntity], Union[Tuple[str, str], Callable[[Any, str], Tuple[str, str]]]
ENTITY_TO_FORMATTER: dict[
Type[MessageEntity], tuple[str, str] | Callable[[Any, str], tuple[str, str]]
] = {
MessageEntityBold: ("<strong>", "</strong>"),
MessageEntityItalic: ("<em>", "</em>"),
@ -183,7 +172,7 @@ def unparse(text: str, entities: Iterable[MessageEntity]) -> str:
return escape(text)
text = add_surrogate(text)
insert_at: List[Tuple[int, str]] = []
insert_at: list[tuple[int, str]] = []
for e in entities:
offset, length = getattr(e, "offset", None), getattr(e, "length", None)
assert isinstance(offset, int) and isinstance(length, int)

View File

@ -1,5 +1,6 @@
import re
from typing import Any, Dict, Iterator, List, Tuple, Type
from collections.abc import Iterator
from typing import Any, Type
import markdown_it
import markdown_it.token
@ -19,7 +20,7 @@ from ...tl.types import (
from .strings import add_surrogate, del_surrogate, within_surrogate
MARKDOWN = markdown_it.MarkdownIt().enable("strikethrough")
DELIMITERS: Dict[Type[MessageEntity], Tuple[str, str]] = {
DELIMITERS: dict[Type[MessageEntity], tuple[str, str]] = {
MessageEntityBlockquote: ("> ", ""),
MessageEntityBold: ("**", "**"),
MessageEntityCode: ("`", "`"),
@ -44,7 +45,7 @@ HTML_TO_TYPE = {
def expand_inline_and_html(
tokens: List[markdown_it.token.Token],
tokens: list[markdown_it.token.Token],
) -> Iterator[markdown_it.token.Token]:
for token in tokens:
if token.type == "inline":
@ -63,7 +64,7 @@ def expand_inline_and_html(
yield token
def parse(message: str) -> Tuple[str, List[MessageEntity]]:
def parse(message: str) -> tuple[str, list[MessageEntity]]:
"""
Parses the given markdown message and returns its stripped representation
plus a list of the MessageEntity's that were found.
@ -71,7 +72,7 @@ def parse(message: str) -> Tuple[str, List[MessageEntity]]:
if not message:
return message, []
entities: List[MessageEntity]
entities: list[MessageEntity]
token: markdown_it.token.Token
def push(ty: Any, **extra: object) -> None:
@ -145,7 +146,7 @@ def parse(message: str) -> Tuple[str, List[MessageEntity]]:
return del_surrogate(message), entities
def unparse(text: str, entities: List[MessageEntity]) -> str:
def unparse(text: str, entities: list[MessageEntity]) -> str:
"""
Performs the reverse operation to .parse(), effectively returning
markdown-like syntax given a normal text and its MessageEntity's.
@ -157,7 +158,7 @@ def unparse(text: str, entities: List[MessageEntity]) -> str:
return text
text = add_surrogate(text)
insert_at: List[Tuple[int, str]] = []
insert_at: list[tuple[int, str]] = []
for e in entities:
offset, length = getattr(e, "offset", None), getattr(e, "length", None)
assert isinstance(offset, int) and isinstance(length, int)

View File

@ -1,7 +1,6 @@
from __future__ import annotations
from enum import Enum
from typing import Set
from ...tl import abcs, types
@ -62,7 +61,7 @@ class AdminRight(Enum):
"""Allows deleting stories in a channel."""
@classmethod
def _from_raw(cls, rights: abcs.ChatAdminRights) -> Set[AdminRight]:
def _from_raw(cls, rights: abcs.ChatAdminRights) -> set[AdminRight]:
assert isinstance(rights, types.ChatAdminRights)
all_rights = (
cls.CHANGE_INFO if rights.change_info else None,
@ -84,7 +83,7 @@ class AdminRight(Enum):
return set(filter(None, iter(all_rights)))
@classmethod
def _chat_rights(cls) -> Set[AdminRight]:
def _chat_rights(cls) -> set[AdminRight]:
return {
cls.CHANGE_INFO,
cls.POST_MESSAGES,
@ -104,7 +103,7 @@ class AdminRight(Enum):
}
@classmethod
def _set_to_raw(cls, all_rights: Set[AdminRight]) -> types.ChatAdminRights:
def _set_to_raw(cls, all_rights: set[AdminRight]) -> types.ChatAdminRights:
return types.ChatAdminRights(
change_info=cls.CHANGE_INFO in all_rights,
post_messages=cls.POST_MESSAGES in all_rights,

View File

@ -2,7 +2,7 @@ from __future__ import annotations
import mimetypes
from pathlib import Path
from typing import TYPE_CHECKING, List, Optional, Union
from typing import TYPE_CHECKING, Optional
from ...tl import abcs, functions, types
from .chat import ChatLike
@ -23,11 +23,11 @@ class AlbumBuilder(metaclass=NoPublicConstructor):
def __init__(self, *, client: Client):
self._client = client
self._medias: List[types.InputSingleMedia] = []
self._medias: list[types.InputSingleMedia] = []
async def add_photo(
self,
file: Union[str, Path, InFileLike],
file: str | Path | InFileLike,
*,
size: Optional[int] = None,
caption: Optional[str] = None,
@ -90,7 +90,7 @@ class AlbumBuilder(metaclass=NoPublicConstructor):
async def add_video(
self,
file: Union[str, Path, InFileLike],
file: str | Path | InFileLike,
*,
size: Optional[int] = None,
name: Optional[str] = None,
@ -141,7 +141,7 @@ class AlbumBuilder(metaclass=NoPublicConstructor):
if mime_type is None:
mime_type = "application/octet-stream"
attributes: List[abcs.DocumentAttribute] = []
attributes: list[abcs.DocumentAttribute] = []
attributes.append(types.DocumentAttributeFilename(file_name=name))
if duration is not None and width is not None and height is not None:
attributes.append(
@ -198,7 +198,7 @@ class AlbumBuilder(metaclass=NoPublicConstructor):
async def send(
self, chat: ChatLike, *, reply_to: Optional[int] = None
) -> List[Message]:
) -> list[Message]:
"""
Send the album.
@ -224,11 +224,13 @@ class AlbumBuilder(metaclass=NoPublicConstructor):
noforwards=False,
update_stickersets_order=False,
peer=peer,
reply_to=types.InputReplyToMessage(
reply_to_msg_id=reply_to, top_msg_id=None
)
if reply_to
else None,
reply_to=(
types.InputReplyToMessage(
reply_to_msg_id=reply_to, top_msg_id=None
)
if reply_to
else None
),
multi_media=self._medias,
schedule_date=None,
send_as=None,

View File

@ -1,6 +1,7 @@
import abc
from collections import deque
from typing import Any, Deque, Generator, Generic, List, Self, TypeVar
from collections.abc import Generator
from typing import Any, Generic, Self, TypeVar
T = TypeVar("T")
@ -40,7 +41,7 @@ class AsyncList(abc.ABC, Generic[T]):
"""
def __init__(self) -> None:
self._buffer: Deque[T] = deque()
self._buffer: deque[T] = deque()
self._total: int = 0
self._done = False
@ -54,14 +55,14 @@ class AsyncList(abc.ABC, Generic[T]):
The `_done` flag should be set if it is known that the end was reached
"""
async def _collect(self) -> List[T]:
async def _collect(self) -> list[T]:
prev = -1
while not self._done and prev != len(self._buffer):
prev = len(self._buffer)
await self._fetch_next()
return list(self._buffer)
def __await__(self) -> Generator[Any, None, List[T]]:
def __await__(self) -> Generator[Any, None, list[T]]:
return self._collect().__await__()
def __aiter__(self) -> Self:

View File

@ -1,7 +1,7 @@
from __future__ import annotations
import weakref
from typing import TYPE_CHECKING, List, Optional, Union
from typing import TYPE_CHECKING, Optional
from ....tl import abcs, types
from .button import Button
@ -24,7 +24,7 @@ def as_concrete_row(row: abcs.KeyboardButtonRow) -> types.KeyboardButtonRow:
def build_keyboard(
btns: Optional[Union[List[Button], List[List[Button]]]]
btns: Optional[list[Button] | list[list[Button]]],
) -> Optional[abcs.ReplyMarkup]:
# list[button] -> list[list[button]]
# This does allow for "invalid" inputs (mixing lists and non-lists), but that's acceptable.
@ -37,7 +37,7 @@ def build_keyboard(
if not buttons_lists:
return None
rows: List[abcs.KeyboardButtonRow] = [
rows: list[abcs.KeyboardButtonRow] = [
types.KeyboardButtonRow(buttons=[btn._raw for btn in btns])
for btns in buttons_lists
]

View File

@ -1,7 +1,7 @@
from __future__ import annotations
import weakref
from typing import TYPE_CHECKING, Optional, Union
from typing import TYPE_CHECKING, Optional
from ....tl import types
@ -9,24 +9,24 @@ if TYPE_CHECKING:
from ..message import Message
ButtonTypes = Union[
types.KeyboardButton,
types.KeyboardButtonUrl,
types.KeyboardButtonCallback,
types.KeyboardButtonRequestPhone,
types.KeyboardButtonRequestGeoLocation,
types.KeyboardButtonSwitchInline,
types.KeyboardButtonGame,
types.KeyboardButtonBuy,
types.KeyboardButtonUrlAuth,
types.InputKeyboardButtonUrlAuth,
types.KeyboardButtonRequestPoll,
types.InputKeyboardButtonUserProfile,
types.KeyboardButtonUserProfile,
types.KeyboardButtonWebView,
types.KeyboardButtonSimpleWebView,
types.KeyboardButtonRequestPeer,
]
ButtonTypes = (
types.KeyboardButton
| types.KeyboardButtonUrl
| types.KeyboardButtonCallback
| types.KeyboardButtonRequestPhone
| types.KeyboardButtonRequestGeoLocation
| types.KeyboardButtonSwitchInline
| types.KeyboardButtonGame
| types.KeyboardButtonBuy
| types.KeyboardButtonUrlAuth
| types.InputKeyboardButtonUrlAuth
| types.KeyboardButtonRequestPoll
| types.InputKeyboardButtonUserProfile
| types.KeyboardButtonUserProfile
| types.KeyboardButtonWebView
| types.KeyboardButtonSimpleWebView
| types.KeyboardButtonRequestPeer
)
class Button:

View File

@ -3,7 +3,7 @@ from __future__ import annotations
import itertools
import sys
from collections import defaultdict
from typing import TYPE_CHECKING, DefaultDict, Dict, List, Optional, Sequence, Union
from typing import TYPE_CHECKING, Optional, Sequence
from ....session import PackedChat
from ....tl import abcs, types
@ -15,12 +15,12 @@ from .user import User
if TYPE_CHECKING:
from ...client.client import Client
ChatLike = Union[Chat, PackedChat, int, str]
ChatLike = Chat | PackedChat | int | str
def build_chat_map(
client: Client, users: Sequence[abcs.User], chats: Sequence[abcs.Chat]
) -> Dict[int, Chat]:
) -> dict[int, Chat]:
users_iter = (User._from_raw(u) for u in users)
chats_iter = (
(
@ -31,11 +31,11 @@ def build_chat_map(
for c in chats
)
result: Dict[int, Chat] = {c.id: c for c in itertools.chain(users_iter, chats_iter)}
result: dict[int, Chat] = {c.id: c for c in itertools.chain(users_iter, chats_iter)}
if len(result) != len(users) + len(chats):
# The fabled ID collision between different chat types.
counter: DefaultDict[int, List[Union[abcs.User, abcs.Chat]]] = defaultdict(list)
counter: defaultdict[int, list[abcs.User | abcs.Chat]] = defaultdict(list)
for user in users:
if (id := getattr(user, "id", None)) is not None:
counter[id].append(user)

View File

@ -1,4 +1,4 @@
from typing import Optional, Self, Union
from typing import Optional, Self
from ....session import PackedChat, PackedType
from ....tl import abcs, types
@ -16,7 +16,7 @@ class Channel(Chat, metaclass=NoPublicConstructor):
def __init__(
self,
raw: Union[types.Channel, types.ChannelForbidden],
raw: types.Channel | types.ChannelForbidden,
) -> None:
self._raw = raw
@ -55,9 +55,11 @@ class Channel(Chat, metaclass=NoPublicConstructor):
return None
else:
return PackedChat(
ty=PackedType.GIGAGROUP
if getattr(self._raw, "gigagroup", False)
else PackedType.BROADCAST,
ty=(
PackedType.GIGAGROUP
if getattr(self._raw, "gigagroup", False)
else PackedType.BROADCAST
),
id=self._raw.id,
access_hash=self._raw.access_hash,
)

View File

@ -1,7 +1,7 @@
from __future__ import annotations
import datetime
from typing import TYPE_CHECKING, Optional, Self, Sequence, Union
from typing import TYPE_CHECKING, Optional, Self, Sequence
from ....session import PackedChat, PackedType
from ....tl import abcs, types
@ -24,13 +24,13 @@ class Group(Chat, metaclass=NoPublicConstructor):
def __init__(
self,
client: Client,
chat: Union[
types.ChatEmpty,
types.Chat,
types.ChatForbidden,
types.Channel,
types.ChannelForbidden,
],
chat: (
types.ChatEmpty
| types.Chat
| types.ChatForbidden
| types.Channel
| types.ChannelForbidden
),
) -> None:
self._client = client
self._raw = chat

View File

@ -1,7 +1,6 @@
from __future__ import annotations
from enum import Enum
from typing import Set
from ...tl import abcs, types
@ -85,7 +84,7 @@ class ChatRestriction(Enum):
"""Prevents sending plain text messages with no media to the chat."""
@classmethod
def _from_raw(cls, rights: abcs.ChatBannedRights) -> Set[ChatRestriction]:
def _from_raw(cls, rights: abcs.ChatBannedRights) -> set[ChatRestriction]:
assert isinstance(rights, types.ChatBannedRights)
restrictions = (
cls.VIEW_MESSAGES if rights.view_messages else None,
@ -113,7 +112,7 @@ class ChatRestriction(Enum):
@classmethod
def _set_to_raw(
cls, restrictions: Set[ChatRestriction], until_date: int
cls, restrictions: set[ChatRestriction], until_date: int
) -> types.ChatBannedRights:
return types.ChatBannedRights(
view_messages=cls.VIEW_MESSAGES in restrictions,

View File

@ -1,6 +1,6 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Dict, Optional, Self, Union
from typing import TYPE_CHECKING, Optional, Self
from ...tl import abcs, types
from .chat import Chat, peer_id
@ -26,9 +26,9 @@ class Dialog(metaclass=NoPublicConstructor):
def __init__(
self,
client: Client,
raw: Union[types.Dialog, types.DialogFolder],
chat_map: Dict[int, Chat],
msg_map: Dict[int, Message],
raw: types.Dialog | types.DialogFolder,
chat_map: dict[int, Chat],
msg_map: dict[int, Message],
) -> None:
self._client = client
self._raw = raw
@ -40,8 +40,8 @@ class Dialog(metaclass=NoPublicConstructor):
cls,
client: Client,
dialog: abcs.Dialog,
chat_map: Dict[int, Chat],
msg_map: Dict[int, Message],
chat_map: dict[int, Chat],
msg_map: dict[int, Message],
) -> Self:
assert isinstance(dialog, (types.Dialog, types.DialogFolder))
return cls._create(client, dialog, chat_map, msg_map)

View File

@ -1,7 +1,7 @@
from __future__ import annotations
import datetime
from typing import TYPE_CHECKING, Dict, Optional, Self
from typing import TYPE_CHECKING, Optional, Self
from ...session import PackedChat
from ...tl import abcs, functions, types
@ -27,7 +27,7 @@ class Draft(metaclass=NoPublicConstructor):
peer: abcs.Peer,
top_msg_id: Optional[int],
raw: abcs.DraftMessage,
chat_map: Dict[int, Chat],
chat_map: dict[int, Chat],
) -> None:
assert isinstance(raw, (types.DraftMessage, types.DraftMessageEmpty))
self._client = client
@ -38,7 +38,7 @@ class Draft(metaclass=NoPublicConstructor):
@classmethod
def _from_raw_update(
cls, client: Client, draft: types.UpdateDraftMessage, chat_map: Dict[int, Chat]
cls, client: Client, draft: types.UpdateDraftMessage, chat_map: dict[int, Chat]
) -> Self:
return cls._create(client, draft.peer, draft.top_msg_id, draft.draft, chat_map)
@ -49,7 +49,7 @@ class Draft(metaclass=NoPublicConstructor):
peer: abcs.Peer,
top_msg_id: int,
draft: abcs.DraftMessage,
chat_map: Dict[int, Chat],
chat_map: dict[int, Chat],
) -> Self:
return cls._create(client, peer, top_msg_id, draft, chat_map)
@ -197,11 +197,11 @@ class Draft(metaclass=NoPublicConstructor):
noforwards=False,
update_stickersets_order=False,
peer=peer,
reply_to=types.InputReplyToMessage(
reply_to_msg_id=reply_to, top_msg_id=None
)
if reply_to
else None,
reply_to=(
types.InputReplyToMessage(reply_to_msg_id=reply_to, top_msg_id=None)
if reply_to
else None
),
message=message,
random_id=random_id,
reply_markup=None,
@ -216,19 +216,23 @@ class Draft(metaclass=NoPublicConstructor):
{},
out=result.out,
id=result.id,
from_id=types.PeerUser(user_id=self._client._session.user.id)
if self._client._session.user
else None,
from_id=(
types.PeerUser(user_id=self._client._session.user.id)
if self._client._session.user
else None
),
peer_id=packed._to_peer(),
reply_to=types.MessageReplyHeader(
reply_to_scheduled=False,
forum_topic=False,
reply_to_msg_id=reply_to,
reply_to_peer_id=None,
reply_to_top_id=None,
)
if reply_to
else None,
reply_to=(
types.MessageReplyHeader(
reply_to_scheduled=False,
forum_topic=False,
reply_to_msg_id=reply_to,
reply_to_peer_id=None,
reply_to_top_id=None,
)
if reply_to
else None
),
date=result.date,
message=message,
media=result.media,

View File

@ -2,20 +2,11 @@ from __future__ import annotations
import mimetypes
import urllib.parse
from collections.abc import Coroutine
from inspect import isawaitable
from io import BufferedWriter
from pathlib import Path
from typing import (
TYPE_CHECKING,
Any,
Coroutine,
List,
Optional,
Protocol,
Self,
Sequence,
Union,
)
from typing import TYPE_CHECKING, Any, Optional, Protocol, Self, Sequence
from ...tl import abcs, types
from .meta import NoPublicConstructor
@ -79,7 +70,7 @@ def photo_size_dimensions(
raise RuntimeError("unexpected case")
def try_get_url_path(maybe_url: Union[str, Path, InFileLike]) -> Optional[str]:
def try_get_url_path(maybe_url: str | Path | InFileLike) -> Optional[str]:
if not isinstance(maybe_url, str):
return None
lowercase = maybe_url.lower()
@ -97,7 +88,7 @@ class InFileLike(Protocol):
It's only used in function parameters.
"""
def read(self, n: int, /) -> Union[bytes, Coroutine[Any, Any, bytes]]:
def read(self, n: int, /) -> bytes | Coroutine[Any, Any, bytes]:
"""
Read from the file or buffer.
@ -116,7 +107,7 @@ class OutFileLike(Protocol):
It's only used in function parameters.
"""
def write(self, data: bytes) -> Union[Any, Coroutine[Any, Any, Any]]:
def write(self, data: bytes) -> Any | Coroutine[Any, Any, Any]:
"""
Write all the data into the file or buffer.
@ -128,10 +119,10 @@ class OutFileLike(Protocol):
class OutWrapper:
__slots__ = ("_fd", "_owned_fd")
_fd: Union[OutFileLike, BufferedWriter]
_fd: OutFileLike | BufferedWriter
_owned_fd: Optional[BufferedWriter]
def __init__(self, file: Union[str, Path, OutFileLike]):
def __init__(self, file: str | Path | OutFileLike):
if isinstance(file, str):
file = Path(file)
@ -173,7 +164,7 @@ class File(metaclass=NoPublicConstructor):
input_media: abcs.InputMedia,
thumb: Optional[abcs.PhotoSize],
thumbs: Optional[Sequence[abcs.PhotoSize]],
raw: Optional[Union[abcs.MessageMedia, abcs.Photo, abcs.Document]],
raw: Optional[abcs.MessageMedia | abcs.Photo | abcs.Document],
client: Optional[Client],
):
self._attributes = attributes
@ -336,7 +327,7 @@ class File(metaclass=NoPublicConstructor):
return mimetypes.guess_extension(self._mime) or ""
@property
def thumbnails(self) -> List[File]:
def thumbnails(self) -> list[File]:
"""
The file thumbnails.
@ -393,7 +384,7 @@ class File(metaclass=NoPublicConstructor):
return None
async def download(self, file: Union[str, Path, OutFileLike]) -> None:
async def download(self, file: str | Path | OutFileLike) -> None:
"""
Alias for :meth:`telethon.Client.download`.

View File

@ -1,6 +1,6 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Optional, Union
from typing import TYPE_CHECKING, Optional
from ...tl import abcs, functions, types
from .chat import ChatLike
@ -22,7 +22,7 @@ class InlineResult(metaclass=NoPublicConstructor):
self,
client: Client,
results: types.messages.BotResults,
result: Union[types.BotInlineMediaResult, types.BotInlineResult],
result: types.BotInlineMediaResult | types.BotInlineResult,
default_peer: abcs.InputPeer,
):
self._client = client

View File

@ -2,17 +2,7 @@ from __future__ import annotations
import datetime
import time
from typing import (
TYPE_CHECKING,
Any,
Dict,
List,
Optional,
Self,
Sequence,
Tuple,
Union,
)
from typing import TYPE_CHECKING, Any, Optional, Self, Sequence
from ...tl import abcs, types
from ..parsers import (
@ -68,7 +58,7 @@ class Message(metaclass=NoPublicConstructor):
"""
def __init__(
self, client: Client, message: abcs.Message, chat_map: Dict[int, Chat]
self, client: Client, message: abcs.Message, chat_map: dict[int, Chat]
) -> None:
assert isinstance(
message, (types.Message, types.MessageService, types.MessageEmpty)
@ -79,7 +69,7 @@ class Message(metaclass=NoPublicConstructor):
@classmethod
def _from_raw(
cls, client: Client, message: abcs.Message, chat_map: Dict[int, Chat]
cls, client: Client, message: abcs.Message, chat_map: dict[int, Chat]
) -> Self:
return cls._create(client, message, chat_map)
@ -87,14 +77,14 @@ class Message(metaclass=NoPublicConstructor):
def _from_defaults(
cls,
client: Client,
chat_map: Dict[int, Chat],
chat_map: dict[int, Chat],
id: int,
peer_id: abcs.Peer,
date: int,
message: str,
**kwargs: Any,
) -> Self:
default_kwargs: Dict[str, Any] = {
default_kwargs: dict[str, Any] = {
"out": False,
"mentioned": False,
"media_unread": False,
@ -337,12 +327,12 @@ class Message(metaclass=NoPublicConstructor):
async def respond(
self,
text: Optional[Union[str, Message]] = None,
text: Optional[str | Message] = None,
*,
markdown: Optional[str] = None,
html: Optional[str] = None,
link_preview: bool = False,
buttons: Optional[Union[List[Button], List[List[Button]]]] = None,
buttons: Optional[list[Button] | list[list[Button]]] = None,
) -> Message:
"""
Alias for :meth:`telethon.Client.send_message`.
@ -364,12 +354,12 @@ class Message(metaclass=NoPublicConstructor):
async def reply(
self,
text: Optional[Union[str, Message]] = None,
text: Optional[str | Message] = None,
*,
markdown: Optional[str] = None,
html: Optional[str] = None,
link_preview: bool = False,
buttons: Optional[Union[List[Button], List[List[Button]]]] = None,
buttons: Optional[list[Button] | list[list[Button]]] = None,
) -> Message:
"""
Alias for :meth:`telethon.Client.send_message` with the ``reply_to`` parameter set to this message.
@ -404,7 +394,7 @@ class Message(metaclass=NoPublicConstructor):
markdown: Optional[str] = None,
html: Optional[str] = None,
link_preview: bool = False,
buttons: Optional[Union[List[Button], List[List[Button]]]] = None,
buttons: Optional[list[Button] | list[list[Button]]] = None,
) -> Message:
"""
Alias for :meth:`telethon.Client.edit_message`.
@ -458,7 +448,7 @@ class Message(metaclass=NoPublicConstructor):
pass
@property
def buttons(self) -> Optional[List[List[Button]]]:
def buttons(self) -> Optional[list[list[Button]]]:
"""
The buttons attached to the message.
@ -512,8 +502,8 @@ class Message(metaclass=NoPublicConstructor):
def build_msg_map(
client: Client, messages: Sequence[abcs.Message], chat_map: Dict[int, Chat]
) -> Dict[int, Message]:
client: Client, messages: Sequence[abcs.Message], chat_map: dict[int, Chat]
) -> dict[int, Message]:
return {
msg.id: msg
for msg in (Message._from_raw(client, m, chat_map) for m in messages)
@ -526,7 +516,7 @@ def parse_message(
markdown: Optional[str],
html: Optional[str],
allow_empty: bool,
) -> Tuple[str, Optional[List[abcs.MessageEntity]]]:
) -> tuple[str, Optional[list[abcs.MessageEntity]]]:
cnt = sum((text is not None, markdown is not None, html is not None))
if cnt != 1:
if cnt == 0 and allow_empty:

View File

@ -1,7 +1,7 @@
from __future__ import annotations
import datetime
from typing import TYPE_CHECKING, Dict, Optional, Self, Sequence, Set, Union
from typing import TYPE_CHECKING, Optional, Self, Sequence
from ...session import PackedChat
from ...tl import abcs, types
@ -25,18 +25,18 @@ class Participant(metaclass=NoPublicConstructor):
self,
client: Client,
chat: PackedChat,
participant: Union[
types.ChannelParticipant,
types.ChannelParticipantSelf,
types.ChannelParticipantCreator,
types.ChannelParticipantAdmin,
types.ChannelParticipantBanned,
types.ChannelParticipantLeft,
types.ChatParticipant,
types.ChatParticipantCreator,
types.ChatParticipantAdmin,
],
chat_map: Dict[int, Chat],
participant: (
types.ChannelParticipant
| types.ChannelParticipantSelf
| types.ChannelParticipantCreator
| types.ChannelParticipantAdmin
| types.ChannelParticipantBanned
| types.ChannelParticipantLeft
| types.ChatParticipant
| types.ChatParticipantCreator
| types.ChatParticipantAdmin
),
chat_map: dict[int, Chat],
) -> None:
self._client = client
self._chat = chat
@ -49,7 +49,7 @@ class Participant(metaclass=NoPublicConstructor):
client: Client,
chat: PackedChat,
participant: abcs.ChannelParticipant,
chat_map: Dict[int, Chat],
chat_map: dict[int, Chat],
) -> Self:
if isinstance(
participant,
@ -72,7 +72,7 @@ class Participant(metaclass=NoPublicConstructor):
client: Client,
chat: PackedChat,
participant: abcs.ChatParticipant,
chat_map: Dict[int, Chat],
chat_map: dict[int, Chat],
) -> Self:
if isinstance(
participant,
@ -162,7 +162,7 @@ class Participant(metaclass=NoPublicConstructor):
)
@property
def admin_rights(self) -> Optional[Set[AdminRight]]:
def admin_rights(self) -> Optional[set[AdminRight]]:
"""
The set of administrator rights this participant has been granted, if they are an administrator.
"""
@ -178,7 +178,7 @@ class Participant(metaclass=NoPublicConstructor):
return None
@property
def restrictions(self) -> Optional[Set[ChatRestriction]]:
def restrictions(self) -> Optional[set[ChatRestriction]]:
"""
The set of restrictions applied to this participant, if they are banned.
"""

View File

@ -1,5 +1,3 @@
from typing import Dict
from ...tl import abcs, types
from .chat import Chat
from .meta import NoPublicConstructor
@ -17,7 +15,7 @@ class RecentAction(metaclass=NoPublicConstructor):
def __init__(
self,
event: abcs.ChannelAdminLogEvent,
chat_map: Dict[int, Chat],
chat_map: dict[int, Chat],
) -> None:
assert isinstance(event, types.ChannelAdminLogEvent)
self._raw = event

View File

@ -1,9 +1,8 @@
from math import gcd
from random import randrange
from typing import Tuple
def factorize(pq: int) -> Tuple[int, int]:
def factorize(pq: int) -> tuple[int, int]:
"""
Factorize the given number into its two prime factors.

View File

@ -3,7 +3,6 @@ import struct
import time
from dataclasses import dataclass
from hashlib import sha1
from typing import Tuple
from ..crypto import (
RSA_KEYS,
@ -67,17 +66,17 @@ class DhGenData:
nonce_number: int
def _do_step1(random_bytes: bytes) -> Tuple[bytes, Step1]:
def _do_step1(random_bytes: bytes) -> tuple[bytes, Step1]:
assert len(random_bytes) == 16
nonce = int.from_bytes(random_bytes)
return req_pq_multi(nonce=nonce), Step1(nonce=nonce)
def step1() -> Tuple[bytes, Step1]:
def step1() -> tuple[bytes, Step1]:
return _do_step1(os.urandom(16))
def _do_step2(data: Step1, response: bytes, random_bytes: bytes) -> Tuple[bytes, Step2]:
def _do_step2(data: Step1, response: bytes, random_bytes: bytes) -> tuple[bytes, Step2]:
assert len(random_bytes) == 288
nonce = data.nonce
res_pq = ResPq.from_bytes(response)
@ -130,13 +129,13 @@ def _do_step2(data: Step1, response: bytes, random_bytes: bytes) -> Tuple[bytes,
), Step2(nonce=nonce, server_nonce=res_pq.server_nonce, new_nonce=new_nonce)
def step2(data: Step1, response: bytes) -> Tuple[bytes, Step2]:
def step2(data: Step1, response: bytes) -> tuple[bytes, Step2]:
return _do_step2(data, response, os.urandom(288))
def _do_step3(
data: Step2, response: bytes, random_bytes: bytes, now: int
) -> Tuple[bytes, Step3]:
) -> tuple[bytes, Step3]:
assert len(random_bytes) == 272
nonce = data.nonce
@ -231,7 +230,7 @@ def _do_step3(
)
def step3(data: Step2, response: bytes) -> Tuple[bytes, Step3]:
def step3(data: Step2, response: bytes) -> tuple[bytes, Step3]:
return _do_step3(data, response, os.urandom(272), int(time.time()))

View File

@ -2,7 +2,7 @@ import logging
import os
import struct
import time
from typing import List, Optional, Tuple
from typing import Optional
from ...crypto import AuthKey, decrypt_data_v2, encrypt_data_v2
from ...tl.core import Reader
@ -107,12 +107,12 @@ class Encrypted(Mtp):
) -> None:
self._auth_key = auth_key
self._time_offset: int = time_offset or 0
self._salts: List[FutureSalt] = [
self._salts: list[FutureSalt] = [
FutureSalt(valid_since=0, valid_until=0x7FFFFFFF, salt=first_salt or 0)
]
self._start_salt_time: Optional[Tuple[int, float]] = None
self._start_salt_time: Optional[tuple[int, float]] = None
self._compression_threshold = compression_threshold
self._deserialization: List[Deserialization] = []
self._deserialization: list[Deserialization] = []
self._buffer = bytearray()
self._salt_request_msg_id: Optional[int] = None
@ -141,7 +141,7 @@ class Encrypted(Mtp):
self._client_id: int
self._sequence: int
self._last_msg_id: int
self._in_pending_ack: List[int] = []
self._in_pending_ack: list[int] = []
self._msg_count: int
self._reset_session()
@ -196,7 +196,7 @@ class Encrypted(Mtp):
def _get_current_salt(self) -> int:
return self._salts[-1].salt if self._salts else 0
def _finalize_plain(self) -> Optional[Tuple[MsgId, bytes]]:
def _finalize_plain(self) -> Optional[tuple[MsgId, bytes]]:
if not self._msg_count:
return None
@ -431,7 +431,7 @@ class Encrypted(Mtp):
return self._serialize_msg(body, True)
def finalize(self) -> Optional[Tuple[MsgId, bytes]]:
def finalize(self) -> Optional[tuple[MsgId, bytes]]:
result = self._finalize_plain()
if not result:
return None
@ -441,7 +441,7 @@ class Encrypted(Mtp):
def deserialize(
self, payload: bytes | bytearray | memoryview
) -> List[Deserialization]:
) -> list[Deserialization]:
check_message_buffer(payload)
plaintext = decrypt_data_v2(payload, self._auth_key)

View File

@ -1,5 +1,5 @@
import struct
from typing import List, Optional, Tuple
from typing import Optional
from ..utils import check_message_buffer
from .types import Deserialization, MsgId, Mtp, RpcResult
@ -23,7 +23,7 @@ class Plain(Mtp):
self._buffer += request # message_data
return msg_id
def finalize(self) -> Optional[Tuple[MsgId, bytes]]:
def finalize(self) -> Optional[tuple[MsgId, bytes]]:
if not self._buffer:
return None
@ -33,7 +33,7 @@ class Plain(Mtp):
def deserialize(
self, payload: bytes | bytearray | memoryview
) -> List[Deserialization]:
) -> list[Deserialization]:
check_message_buffer(payload)
auth_key_id, msg_id, length = struct.unpack_from("<qqi", payload)

View File

@ -1,7 +1,7 @@
import logging
import re
from abc import ABC, abstractmethod
from typing import List, NewType, Optional, Self, Tuple
from typing import NewType, Optional, Self
from ...tl.mtproto.types import RpcError as GeneratedRpcError
@ -192,7 +192,7 @@ class Mtp(ABC):
"""
@abstractmethod
def finalize(self) -> Optional[Tuple[MsgId, bytes]]:
def finalize(self) -> Optional[tuple[MsgId, bytes]]:
"""
Finalize the buffer of serialized requests.
@ -203,7 +203,7 @@ class Mtp(ABC):
@abstractmethod
def deserialize(
self, payload: bytes | bytearray | memoryview
) -> List[Deserialization]:
) -> list[Deserialization]:
"""
Deserialize incoming buffer payload.
"""

View File

@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from typing import Callable
from collections.abc import Callable
OutFn = Callable[[bytes | bytearray | memoryview], None]

View File

@ -4,18 +4,9 @@ import struct
import time
from abc import ABC
from asyncio import FIRST_COMPLETED, Event, Future
from collections.abc import Iterator
from dataclasses import dataclass
from typing import (
Generic,
Iterator,
List,
Optional,
Protocol,
Self,
Tuple,
Type,
TypeVar,
)
from typing import Generic, Optional, Protocol, Self, Type, TypeVar
from ..crypto import AuthKey
from ..mtproto import (
@ -127,7 +118,7 @@ class Connector(Protocol):
The :doc:`/concepts/datacenters` concept has examples on how to combine proxy libraries with Telethon.
"""
async def __call__(self, ip: str, port: int) -> Tuple[AsyncReader, AsyncWriter]:
async def __call__(self, ip: str, port: int) -> tuple[AsyncReader, AsyncWriter]:
raise NotImplementedError
@ -175,7 +166,7 @@ class Sender:
_transport: Transport
_mtp: Mtp
_mtp_buffer: bytearray
_requests: List[Request[object]]
_requests: list[Request[object]]
_request_event: Event
_next_ping: float
_read_buffer: bytearray
@ -239,7 +230,7 @@ class Sender:
if rx.done():
return rx.result()
async def step(self) -> List[Updates]:
async def step(self) -> list[Updates]:
self._try_fill_write()
recv_req = asyncio.create_task(self._request_event.wait())
@ -296,13 +287,13 @@ class Sender:
self._transport.pack(mtp_buffer, self._writer.write)
self._write_drain_pending = True
def _on_net_read(self, read_buffer: bytes) -> List[Updates]:
def _on_net_read(self, read_buffer: bytes) -> list[Updates]:
if not read_buffer:
raise ConnectionResetError("read 0 bytes")
self._read_buffer += read_buffer
updates: List[Updates] = []
updates: list[Updates] = []
while self._read_buffer:
self._mtp_buffer.clear()
try:
@ -331,7 +322,7 @@ class Sender:
)
self._next_ping = asyncio.get_running_loop().time() + PING_DELAY
def _process_mtp_buffer(self, updates: List[Updates]) -> None:
def _process_mtp_buffer(self, updates: list[Updates]) -> None:
results = self._mtp.deserialize(self._mtp_buffer)
for result in results:
@ -345,13 +336,13 @@ class Sender:
self._process_bad_message(result)
def _process_update(
self, updates: List[Updates], update: bytes | bytearray | memoryview
self, updates: list[Updates], update: bytes | bytearray | memoryview
) -> None:
try:
updates.append(Updates.from_bytes(update))
except ValueError:
cid = struct.unpack_from("I", update)[0]
alt_classes: Tuple[Type[Serializable], ...] = (
alt_classes: tuple[Type[Serializable], ...] = (
AffectedFoundMessages,
AffectedHistory,
AffectedMessages,

View File

@ -1,4 +1,4 @@
from typing import Any, Dict, Optional, Sequence, Tuple
from typing import Any, Optional, Sequence
from ...tl import abcs, types
from .packed import PackedChat, PackedType
@ -7,8 +7,8 @@ from .packed import PackedChat, PackedType
class ChatHashCache:
__slots__ = ("_hash_map", "_self_id", "_self_bot")
def __init__(self, self_user: Optional[Tuple[int, bool]]):
self._hash_map: Dict[int, Tuple[int, PackedType]] = {}
def __init__(self, self_user: Optional[tuple[int, bool]]):
self._hash_map: dict[int, tuple[int, PackedType]] = {}
self._self_id = self_user[0] if self_user else None
self._self_bot = self_user[1] if self_user else False

View File

@ -1,6 +1,6 @@
import logging
from enum import Enum
from typing import List, Literal, Union
from typing import Literal
from ...tl import abcs
@ -20,7 +20,7 @@ NO_UPDATES_TIMEOUT = 15 * 60
ENTRY_ACCOUNT: Literal["ACCOUNT"] = "ACCOUNT"
ENTRY_SECRET: Literal["SECRET"] = "SECRET"
Entry = Union[Literal["ACCOUNT", "SECRET"], int]
Entry = Literal["ACCOUNT", "SECRET"] | int
# Python's logging doesn't define a TRACE level. Pick halfway between DEBUG and NOTSET.
# We don't define a name for this as libraries shouldn't do that though.
@ -64,7 +64,7 @@ class PossibleGap:
def __init__(
self,
deadline: float,
updates: List[abcs.Update],
updates: list[abcs.Update],
) -> None:
self.deadline = deadline
self.updates = updates

View File

@ -2,7 +2,7 @@ import asyncio
import datetime
import logging
import time
from typing import Dict, List, Optional, Sequence, Set, Tuple
from typing import Optional, Sequence
from ...tl import Request, abcs, functions, types
from ..chat import ChatHashCache
@ -53,11 +53,11 @@ class MessageBox:
base_logger: logging.Logger,
) -> None:
self._logger = base_logger.getChild("messagebox")
self.map: Dict[Entry, State] = {}
self.map: dict[Entry, State] = {}
self.date = epoch()
self.seq = NO_SEQ
self.possible_gaps: Dict[Entry, PossibleGap] = {}
self.getting_diff_for: Set[Entry] = set()
self.possible_gaps: dict[Entry, PossibleGap] = {}
self.getting_diff_for: set[Entry] = set()
self.next_deadline: Optional[Entry] = None
if __debug__:
@ -164,7 +164,7 @@ class MessageBox:
return deadline
def reset_deadlines(self, entries: Set[Entry], deadline: float) -> None:
def reset_deadlines(self, entries: set[Entry], deadline: float) -> None:
if not entries:
return
@ -259,8 +259,8 @@ class MessageBox:
self,
updates: abcs.Updates,
chat_hashes: ChatHashCache,
) -> Tuple[List[abcs.Update], Sequence[abcs.User], Sequence[abcs.Chat]]:
result: List[abcs.Update] = []
) -> tuple[list[abcs.Update], Sequence[abcs.User], Sequence[abcs.Chat]]:
result: list[abcs.Update] = []
combined = adapt(updates, chat_hashes)
if __debug__:
@ -290,7 +290,7 @@ class MessageBox:
sorted_updates = list(sorted(combined.updates, key=update_sort_key))
any_pts_applied = False
reset_deadlines_for: Set[Entry] = set()
reset_deadlines_for: set[Entry] = set()
for update in sorted_updates:
entry, applied = self.apply_pts_info(update)
if entry is not None:
@ -341,7 +341,7 @@ class MessageBox:
def apply_pts_info(
self,
update: abcs.Update,
) -> Tuple[Optional[Entry], Optional[abcs.Update]]:
) -> tuple[Optional[Entry], Optional[abcs.Update]]:
if isinstance(update, types.UpdateChannelTooLong):
self.try_begin_get_diff(update.channel_id, "received updateChannelTooLong")
return None, None
@ -438,12 +438,12 @@ class MessageBox:
self,
diff: abcs.updates.Difference,
chat_hashes: ChatHashCache,
) -> Tuple[List[abcs.Update], Sequence[abcs.User], Sequence[abcs.Chat]]:
) -> tuple[list[abcs.Update], Sequence[abcs.User], Sequence[abcs.Chat]]:
if __debug__:
self._trace("applying account difference: %s", diff)
finish: bool
result: Tuple[List[abcs.Update], Sequence[abcs.User], Sequence[abcs.Chat]]
result: tuple[list[abcs.Update], Sequence[abcs.User], Sequence[abcs.Chat]]
if isinstance(diff, types.updates.DifferenceEmpty):
finish = True
self.date = datetime.datetime.fromtimestamp(
@ -496,7 +496,7 @@ class MessageBox:
self,
diff: types.updates.Difference,
chat_hashes: ChatHashCache,
) -> Tuple[List[abcs.Update], Sequence[abcs.User], Sequence[abcs.Chat]]:
) -> tuple[list[abcs.Update], Sequence[abcs.User], Sequence[abcs.Chat]]:
state = diff.state
assert isinstance(state, types.updates.State)
self.map[ENTRY_ACCOUNT].pts = state.pts
@ -582,7 +582,7 @@ class MessageBox:
channel_id: int,
diff: abcs.updates.ChannelDifference,
chat_hashes: ChatHashCache,
) -> Tuple[List[abcs.Update], Sequence[abcs.User], Sequence[abcs.Chat]]:
) -> tuple[list[abcs.Update], Sequence[abcs.User], Sequence[abcs.Chat]]:
entry: Entry = channel_id
if __debug__:
self._trace("applying channel=%r difference: %s", entry, diff)

View File

@ -1,4 +1,4 @@
from typing import List, Optional, Tuple
from typing import Optional
from ..tl.core.serializable import obj_repr
@ -13,7 +13,7 @@ class DataCenter:
:param auth: See below.
"""
__slots__: Tuple[str, ...] = ("id", "ipv4_addr", "ipv6_addr", "auth")
__slots__: tuple[str, ...] = ("id", "ipv4_addr", "ipv6_addr", "auth")
def __init__(
self,
@ -116,7 +116,7 @@ class UpdateState:
qts: int,
date: int,
seq: int,
channels: List[ChannelState],
channels: list[ChannelState],
) -> None:
self.pts = pts
"The primary partial sequence number."
@ -165,7 +165,7 @@ class Session:
def __init__(
self,
*,
dcs: Optional[List[DataCenter]] = None,
dcs: Optional[list[DataCenter]] = None,
user: Optional[User] = None,
state: Optional[UpdateState] = None,
):

View File

@ -1,6 +1,6 @@
import sqlite3
from pathlib import Path
from typing import Optional, Union
from typing import Optional
from ..session import ChannelState, DataCenter, Session, UpdateState, User
from .storage import Storage
@ -20,7 +20,7 @@ class SqliteSession(Storage):
an VCS by accident (adding ``*.session`` to ``.gitignore`` will catch them).
"""
def __init__(self, file: Union[str, Path]):
def __init__(self, file: str | Path):
path = Path(file)
if not path.suffix:
path = path.with_suffix(EXTENSION)
@ -105,18 +105,22 @@ class SqliteSession(Storage):
DataCenter(id=id, ipv4_addr=ipv4_addr, ipv6_addr=ipv6_addr, auth=auth)
for (id, ipv4_addr, ipv6_addr, auth) in datacenter
],
user=User(id=user[0], dc=user[1], bot=bool(user[2]), username=user[3])
if user
else None,
state=UpdateState(
pts=state[0],
qts=state[1],
date=state[2],
seq=state[3],
channels=[ChannelState(id=id, pts=pts) for id, pts in channelstate],
)
if state
else None,
user=(
User(id=user[0], dc=user[1], bot=bool(user[2]), username=user[3])
if user
else None
),
state=(
UpdateState(
pts=state[0],
qts=state[1],
date=state[2],
seq=state[3],
channels=[ChannelState(id=id, pts=pts) for id, pts in channelstate],
)
if state
else None
),
)
@staticmethod

View File

@ -1,6 +1,7 @@
import functools
import struct
from typing import TYPE_CHECKING, Any, Callable, List, Optional, Type, TypeVar
from collections.abc import Callable
from typing import TYPE_CHECKING, Any, Optional, Type, TypeVar
if TYPE_CHECKING:
from .serializable import Serializable
@ -96,8 +97,8 @@ def single_deserializer(cls: Type[T]) -> Callable[[bytes], T]:
@functools.cache
def list_deserializer(cls: Type[T]) -> Callable[[bytes], List[T]]:
def deserializer(body: bytes) -> List[T]:
def list_deserializer(cls: Type[T]) -> Callable[[bytes], list[T]]:
def deserializer(body: bytes) -> list[T]:
reader = Reader(body)
vec_id, length = reader.read_fmt("<ii", 8)
assert vec_id == 0x1CB5C415 and length >= 0
@ -106,14 +107,14 @@ def list_deserializer(cls: Type[T]) -> Callable[[bytes], List[T]]:
return deserializer
def deserialize_i64_list(body: bytes) -> List[int]:
def deserialize_i64_list(body: bytes) -> list[int]:
reader = Reader(body)
vec_id, length = reader.read_fmt("<ii", 8)
assert vec_id == 0x1CB5C415 and length >= 0
return [*reader.read_fmt(f"<{length}q", length * 8)]
def deserialize_i32_list(body: bytes) -> List[int]:
def deserialize_i32_list(body: bytes) -> list[int]:
reader = Reader(body)
vec_id, length = reader.read_fmt("<ii", 8)
assert vec_id == 0x1CB5C415 and length >= 0

View File

@ -1,5 +1,6 @@
import struct
from typing import Any, Callable, Generic, Optional, TypeVar
from collections.abc import Callable
from typing import Any, Generic, Optional, TypeVar
Return = TypeVar("Return")

View File

@ -1,12 +1,12 @@
import abc
import struct
from typing import Protocol, Self, Tuple
from typing import Protocol, Self
from .reader import Reader
class HasSlots(Protocol):
__slots__: Tuple[str, ...]
__slots__: tuple[str, ...]
def obj_repr(self: HasSlots) -> str:
@ -16,7 +16,7 @@ def obj_repr(self: HasSlots) -> str:
class Serializable(abc.ABC):
__slots__: Tuple[str, ...] = ()
__slots__: tuple[str, ...] = ()
@classmethod
@abc.abstractmethod

View File

@ -1,5 +1,5 @@
import struct
from typing import Optional, Tuple
from typing import Optional
from pytest import raises
from telethon._impl.crypto import AuthKey
@ -49,7 +49,7 @@ def test_rpc_error_parsing() -> None:
PLAIN_REQUEST = b"Hey!"
def unwrap_finalize(finalized: Optional[Tuple[MsgId, bytes]]) -> bytes:
def unwrap_finalize(finalized: Optional[tuple[MsgId, bytes]]) -> bytes:
assert finalized is not None
_, buffer = finalized
return buffer

View File

@ -1,5 +1,3 @@
from typing import Tuple
from pytest import raises
from telethon._impl.mtproto import Abridged
@ -11,7 +9,7 @@ class Output(bytearray):
self += data
def setup_pack(n: int) -> Tuple[Abridged, bytes, Output]:
def setup_pack(n: int) -> tuple[Abridged, bytes, Output]:
input = bytes(x & 0xFF for x in range(n))
return Abridged(), input, Output()

View File

@ -1,5 +1,3 @@
from typing import Tuple
from pytest import raises
from telethon._impl.mtproto import Full
@ -11,12 +9,12 @@ class Output(bytearray):
self += data
def setup_pack(n: int) -> Tuple[Full, bytes, Output]:
def setup_pack(n: int) -> tuple[Full, bytes, Output]:
input = bytes(x & 0xFF for x in range(n))
return Full(), input, Output()
def setup_unpack(n: int) -> Tuple[bytes, Full, bytes, bytearray]:
def setup_unpack(n: int) -> tuple[bytes, Full, bytes, bytearray]:
transport, expected_output, input = setup_pack(n)
transport.pack(expected_output, input)

View File

@ -1,5 +1,3 @@
from typing import Tuple
from pytest import raises
from telethon._impl.mtproto import Intermediate
@ -11,7 +9,7 @@ class Output(bytearray):
self += data
def setup_pack(n: int) -> Tuple[Intermediate, bytes, Output]:
def setup_pack(n: int) -> tuple[Intermediate, bytes, Output]:
input = bytes(x & 0xFF for x in range(n))
return Intermediate(), input, Output()

View File

@ -1,11 +1,10 @@
import weakref
from pathlib import Path
from typing import Dict
class FakeFs:
def __init__(self) -> None:
self._files: Dict[Path, bytearray] = {}
self._files: dict[Path, bytearray] = {}
def open(self, path: Path) -> "SourceWriter":
return SourceWriter(self, path)

View File

@ -1,5 +1,4 @@
from pathlib import Path
from typing import Set
from ..tl_parser import NormalParameter, ParsedTl
from .fakefs import FakeFs, SourceWriter
@ -19,7 +18,7 @@ from .serde.serialization import generate_function, generate_write
def generate_init(
writer: SourceWriter, namespaces: Set[str], classes: Set[str]
writer: SourceWriter, namespaces: set[str], classes: set[str]
) -> None:
sorted_cls = list(sorted(classes))
sorted_ns = list(sorted(namespaces))
@ -46,14 +45,14 @@ def generate(fs: FakeFs, tl: ParsedTl) -> None:
ignored_types = {"true", "boolTrue", "boolFalse"} # also "compiler built-ins"
abc_namespaces: Set[str] = set()
type_namespaces: Set[str] = set()
function_namespaces: Set[str] = set()
abc_namespaces: set[str] = set()
type_namespaces: set[str] = set()
function_namespaces: set[str] = set()
abc_class_names: Set[str] = set()
type_class_names: Set[str] = set()
function_def_names: Set[str] = set()
generated_type_names: Set[str] = set()
abc_class_names: set[str] = set()
type_class_names: set[str] = set()
function_def_names: set[str] = set()
generated_type_names: set[str] = set()
for typedef in tl.typedefs:
if typedef.ty.full_name not in generated_types:
@ -193,10 +192,10 @@ def generate(fs: FakeFs, tl: ParsedTl) -> None:
writer.write(
"from .core import Serializable, Reader, deserialize_bool, deserialize_i32_list, deserialize_i64_list, deserialize_identity, single_deserializer, list_deserializer"
)
writer.write("from typing import cast, Tuple, Type")
writer.write("from typing import cast, Type")
writer.write(f"LAYER = {tl.layer!r}")
writer.write(
"TYPE_MAPPING = {t.constructor_id(): t for t in cast(Tuple[Type[Serializable]], ("
"TYPE_MAPPING = {t.constructor_id(): t for t in cast(tuple[Type[Serializable]], ("
)
for name in sorted(generated_type_names):
writer.write(f" types.{name},")

View File

@ -1,10 +1,10 @@
import re
from typing import Iterator, List
from collections.abc import Iterator
from ....tl_parser import BaseParameter, FlagsParameter, NormalParameter, Type
def split_words(name: str) -> List[str]:
def split_words(name: str) -> list[str]:
return re.findall(
r"""
^$

View File

@ -1,6 +1,6 @@
import struct
from itertools import groupby
from typing import Optional, Tuple
from typing import Optional
from ....tl_parser import Definition, NormalParameter, Parameter, Type
from ..fakefs import SourceWriter
@ -14,7 +14,7 @@ SPECIAL_CASED_OBJECT_READS = {
}
def reader_read_fmt(ty: Type, constructor_id: int) -> Tuple[str, Optional[str]]:
def reader_read_fmt(ty: Type, constructor_id: int) -> tuple[str, Optional[str]]:
if is_trivial(NormalParameter(ty=ty, flag=None)):
fmt = trivial_struct_fmt(NormalParameter(ty=ty, flag=None))
size = struct.calcsize(f"<{fmt}")

View File

@ -1,6 +1,6 @@
import struct
from collections.abc import Iterator
from itertools import groupby
from typing import Iterator
from ....tl_parser import Definition, FlagsParameter, NormalParameter, Parameter, Type
from ..fakefs import SourceWriter
@ -104,9 +104,11 @@ def generate_write(writer: SourceWriter, defn: Definition) -> None:
for param in group:
if isinstance(param.ty, FlagsParameter):
flags = " | ".join(
f"({1 << p.ty.flag.index} if self.{p.name} else 0)"
if p.ty.ty.name == "true"
else f"(0 if self.{p.name} is None else {1 << p.ty.flag.index})"
(
f"({1 << p.ty.flag.index} if self.{p.name} else 0)"
if p.ty.ty.name == "true"
else f"(0 if self.{p.name} is None else {1 << p.ty.flag.index})"
)
for p in defn.params
if isinstance(p.ty, NormalParameter)
and p.ty.flag
@ -140,9 +142,11 @@ def generate_function(writer: SourceWriter, defn: Definition) -> None:
for param in group:
if isinstance(param.ty, FlagsParameter):
flags = " | ".join(
f"({1 << p.ty.flag.index} if {p.name} else 0)"
if p.ty.ty.name == "true"
else f"(0 if {p.name} is None else {1 << p.ty.flag.index})"
(
f"({1 << p.ty.flag.index} if {p.name} else 0)"
if p.ty.ty.name == "true"
else f"(0 if {p.name} is None else {1 << p.ty.flag.index})"
)
for p in defn.params
if isinstance(p.ty, NormalParameter)
and p.ty.flag

View File

@ -1,7 +1,7 @@
import re
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional, Union
from typing import Optional
from .tl import Definition
from .tl_iterator import FunctionDef, TypeDef, iterate
@ -10,13 +10,13 @@ from .tl_iterator import FunctionDef, TypeDef, iterate
@dataclass
class ParsedTl:
layer: Optional[int]
typedefs: List[Definition]
functiondefs: List[Definition]
typedefs: list[Definition]
functiondefs: list[Definition]
def load_tl_file(path: Union[str, Path]) -> ParsedTl:
typedefs: List[TypeDef] = []
functiondefs: List[FunctionDef] = []
def load_tl_file(path: str | Path) -> ParsedTl:
typedefs: list[TypeDef] = []
functiondefs: list[FunctionDef] = []
with open(path, "r", encoding="utf-8") as fd:
contents = fd.read()

View File

@ -1,5 +1,5 @@
from dataclasses import dataclass
from typing import List, Self, Set
from typing import Self
from ..utils import infer_id
from .parameter import Parameter, TypeDefNotImplemented
@ -9,10 +9,10 @@ from .ty import Type
@dataclass
class Definition:
namespace: List[str]
namespace: list[str]
name: str
id: int
params: List[Parameter]
params: list[Parameter]
ty: Type
@classmethod
@ -58,9 +58,9 @@ class Definition:
except ValueError:
raise ValueError("invalid id")
type_defs: List[str] = []
flag_defs: List[str] = []
params: List[Parameter] = []
type_defs: list[str] = []
flag_defs: list[str] = []
params: list[Parameter] = []
for param_str in middle.split():
try:
@ -102,7 +102,7 @@ class Definition:
res += f"{ns}."
res += f"{self.name}#{self.id:x}"
def_set: Set[str] = set()
def_set: set[str] = set()
for param in self.params:
if isinstance(param.ty, NormalParameter):
def_set.update(param.ty.ty.find_generic_refs())

View File

@ -1,6 +1,8 @@
from __future__ import annotations
from abc import ABC
from dataclasses import dataclass
from typing import Optional, Union
from typing import Optional
from .flag import Flag
from .ty import Type
@ -8,7 +10,7 @@ from .ty import Type
class BaseParameter(ABC):
@staticmethod
def from_str(ty: str) -> Union["FlagsParameter", "NormalParameter"]:
def from_str(ty: str) -> FlagsParameter | NormalParameter:
if not ty:
raise ValueError("empty")
if ty == "#":

View File

@ -1,10 +1,11 @@
from collections.abc import Iterator
from dataclasses import dataclass
from typing import Iterator, List, Optional, Self
from typing import Optional, Self
@dataclass
class Type:
namespace: List[str]
namespace: list[str]
name: str
bare: bool
generic_ref: bool

View File

@ -1,4 +1,5 @@
from typing import Iterator, Type
from collections.abc import Iterator
from typing import Type
from .tl.definition import Definition
from .utils import remove_tl_comments

View File

@ -1,5 +1,3 @@
from typing import List
from pytest import mark
from telethon_generator._impl.codegen.serde.common import (
split_words,
@ -19,7 +17,7 @@ from telethon_generator._impl.codegen.serde.common import (
("fileMp4", ["file", "Mp4"]),
],
)
def test_split_name_words(name: str, expected: List[str]) -> None:
def test_split_name_words(name: str, expected: list[str]) -> None:
assert split_words(name) == expected

View File

@ -1,17 +1,17 @@
from typing import List, Optional
from typing import Optional
from telethon_generator.codegen import FakeFs, generate
from telethon_generator.tl_parser import Definition, ParsedTl, parse_tl_file
def get_definitions(contents: str) -> List[Definition]:
def get_definitions(contents: str) -> list[Definition]:
return [defn for defn in parse_tl_file(contents) if not isinstance(defn, Exception)]
def gen_py_code(
*,
typedefs: Optional[List[Definition]] = None,
functiondefs: Optional[List[Definition]] = None,
typedefs: Optional[list[Definition]] = None,
functiondefs: Optional[list[Definition]] = None,
) -> str:
fs = FakeFs()
generate(

View File

@ -15,12 +15,11 @@ import ast
import subprocess
import sys
from pathlib import Path
from typing import Dict, List, Union
class FunctionMethodsVisitor(ast.NodeVisitor):
def __init__(self) -> None:
self.methods: List[Union[ast.FunctionDef, ast.AsyncFunctionDef]] = []
self.methods: list[ast.FunctionDef | ast.AsyncFunctionDef] = []
def visit_FunctionDef(self, node: ast.FunctionDef) -> None:
self._try_add_def(node)
@ -28,7 +27,7 @@ class FunctionMethodsVisitor(ast.NodeVisitor):
def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef) -> None:
self._try_add_def(node)
def _try_add_def(self, node: Union[ast.FunctionDef, ast.AsyncFunctionDef]) -> None:
def _try_add_def(self, node: ast.FunctionDef | ast.AsyncFunctionDef) -> None:
match node.args.args:
case [ast.arg(arg="self", annotation=ast.Name(id="Client")), *_]:
self.methods.append(node)
@ -39,7 +38,7 @@ class FunctionMethodsVisitor(ast.NodeVisitor):
class MethodVisitor(ast.NodeVisitor):
def __init__(self) -> None:
self._in_client = False
self.method_docs: Dict[str, str] = {}
self.method_docs: dict[str, str] = {}
def visit_ClassDef(self, node: ast.ClassDef) -> None:
if node.name == "Client":
@ -55,7 +54,7 @@ class MethodVisitor(ast.NodeVisitor):
def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef) -> None:
self._try_add_doc(node)
def _try_add_doc(self, node: Union[ast.FunctionDef, ast.AsyncFunctionDef]) -> None:
def _try_add_doc(self, node: ast.FunctionDef | ast.AsyncFunctionDef) -> None:
if not self._in_client:
return
@ -86,10 +85,10 @@ def main() -> None:
m_visitor.visit(ast.parse(contents))
class_body: List[ast.stmt] = []
class_body: list[ast.stmt] = []
for function in sorted(fm_visitor.methods, key=lambda f: f.name):
function_body: List[ast.stmt] = []
function_body: list[ast.stmt] = []
if doc := m_visitor.method_docs.get(function.name):
function.body.append(ast.Expr(value=ast.Constant(value=doc)))

View File

@ -7,12 +7,11 @@ import ast
import os
import re
from pathlib import Path
from typing import List
class ImportVisitor(ast.NodeVisitor):
def __init__(self) -> None:
self.imported_names: List[str] = []
self.imported_names: list[str] = []
def visit_ImportFrom(self, node: ast.ImportFrom) -> None:
if node.level == 1:
@ -26,7 +25,7 @@ def main() -> None:
rf"(tl|mtproto){re.escape(os.path.sep)}(abcs|functions|types)"
)
files: List[str] = []
files: list[str] = []
for file in impl_root.rglob("__init__.py"):
file_str = str(file)
if autogenerated_re.search(file_str):

View File

@ -1,6 +1,4 @@
from typing import List
class AES:
def __init__(self, key: bytes) -> None: ...
def encrypt(self, plaintext: List[int]) -> List[int]: ...
def decrypt(self, ciphertext: List[int]) -> List[int]: ...
def encrypt(self, plaintext: list[int]) -> list[int]: ...
def decrypt(self, ciphertext: list[int]) -> list[int]: ...

View File

@ -1,15 +1,15 @@
from typing import Any, Dict, Optional
from typing import Any, Optional
def build_wheel(
wheel_directory: str,
config_settings: Optional[Dict[Any, Any]] = None,
config_settings: Optional[dict[Any, Any]] = None,
metadata_directory: Optional[str] = None,
) -> str: ...
def build_sdist(
sdist_directory: str, config_settings: Optional[Dict[Any, Any]] = None
sdist_directory: str, config_settings: Optional[dict[Any, Any]] = None
) -> str: ...
def build_editable(
wheel_directory: str,
config_settings: Optional[Dict[Any, Any]] = None,
config_settings: Optional[dict[Any, Any]] = None,
metadata_directory: Optional[str] = None,
) -> str: ...

View File

@ -1,19 +1,19 @@
from typing import Any, Dict, Optional
from typing import Any, Optional
class build_meta:
@staticmethod
def build_wheel(
wheel_directory: str,
config_settings: Optional[Dict[Any, Any]] = None,
config_settings: Optional[dict[Any, Any]] = None,
metadata_directory: Optional[str] = None,
) -> str: ...
@staticmethod
def build_sdist(
sdist_directory: str, config_settings: Optional[Dict[Any, Any]] = None
sdist_directory: str, config_settings: Optional[dict[Any, Any]] = None
) -> str: ...
@staticmethod
def build_editable(
wheel_directory: str,
config_settings: Optional[Dict[Any, Any]] = None,
config_settings: Optional[dict[Any, Any]] = None,
metadata_directory: Optional[str] = None,
) -> str: ...