Add type hints to all public methods in the client

This commit is contained in:
Lonami Exo 2019-05-03 21:37:27 +02:00
parent c0e506e568
commit cd4b915522
16 changed files with 393 additions and 159 deletions

View File

@ -1,10 +1,14 @@
import functools
import inspect
import typing
from .users import UserMethods, _NOT_A_REQUEST
from .. import helpers, utils
from ..tl import functions, TLRequest
if typing.TYPE_CHECKING:
from .telegramclient import TelegramClient
# TODO Make use of :tl:`InvokeWithMessagesRange` somehow
# For that, we need to use :tl:`GetSplitRanges` first.
@ -105,8 +109,16 @@ class _TakeoutClient:
class AccountMethods(UserMethods):
def takeout(
self, finalize=True, *, contacts=None, users=None, chats=None,
megagroups=None, channels=None, files=None, max_file_size=None):
self: 'TelegramClient',
finalize: bool = True,
*,
contacts: bool = None,
users: bool = None,
chats: bool = None,
megagroups: bool = None,
channels: bool = None,
files: bool = None,
max_file_size: bool = None) -> 'TelegramClient':
"""
Creates a proxy object over the current :ref:`TelegramClient` through
which making requests will use :tl:`InvokeWithTakeoutRequest` to wrap
@ -146,6 +158,10 @@ class AccountMethods(UserMethods):
<telethon.sessions.abstract.Session.takeout_id>`.
Args:
finalize (`bool`):
Whether the takeout session should be finalized upon
exit or not.
contacts (`bool`):
Set to ``True`` if you plan on downloading contacts.
@ -192,7 +208,7 @@ class AccountMethods(UserMethods):
return _TakeoutClient(finalize, self, request)
async def end_takeout(self, success):
async def end_takeout(self: 'TelegramClient', success: bool) -> bool:
"""
Finishes a takeout, with specified result sent back to Telegram.

View File

@ -1,27 +1,33 @@
import datetime
import getpass
import hashlib
import inspect
import os
import sys
import typing
from .messageparse import MessageParseMethods
from .users import UserMethods
from .. import utils, helpers, errors, password as pwd_mod
from ..tl import types, functions
if typing.TYPE_CHECKING:
from .telegramclient import TelegramClient
class AuthMethods(MessageParseMethods, UserMethods):
# region Public methods
def start(
self,
phone=lambda: input('Please enter your phone (or bot token): '),
password=lambda: getpass.getpass('Please enter your password: '),
self: 'TelegramClient',
phone: typing.Callable[[], str] = lambda: input('Please enter your phone (or bot token): '),
password: typing.Callable[[], str] = lambda: getpass.getpass('Please enter your password: '),
*,
bot_token=None, force_sms=False, code_callback=None,
first_name='New User', last_name='', max_attempts=3):
bot_token: str = None,
force_sms: bool = False,
code_callback: typing.Callable[[], typing.Union[str, int]] = None,
first_name: str = 'New User',
last_name: str = '',
max_attempts: int = 3) -> 'TelegramClient':
"""
Convenience method to interactively connect and sign in if required,
also taking into consideration that 2FA may be enabled in the account.
@ -238,8 +244,13 @@ class AuthMethods(MessageParseMethods, UserMethods):
return phone, phone_hash
async def sign_in(
self, phone=None, code=None, *, password=None,
bot_token=None, phone_code_hash=None):
self: 'TelegramClient',
phone: str = None,
code: typing.Union[str, int] = None,
*,
password: str = None,
bot_token: str = None,
phone_code_hash: str = None) -> types.User:
"""
Starts or completes the sign in process with the given phone number
or code that Telegram sent.
@ -304,8 +315,14 @@ class AuthMethods(MessageParseMethods, UserMethods):
return self._on_login(result.user)
async def sign_up(self, code, first_name, last_name='',
*, phone=None, phone_code_hash=None):
async def sign_up(
self: 'TelegramClient',
code: typing.Union[str, int],
first_name: str,
last_name: str = '',
*,
phone: str = None,
phone_code_hash: str = None) -> types.User:
"""
Signs up to Telegram if you don't have an account yet.
You must call .send_code_request(phone) first.
@ -377,7 +394,11 @@ class AuthMethods(MessageParseMethods, UserMethods):
return user
async def send_code_request(self, phone, *, force_sms=False):
async def send_code_request(
self: 'TelegramClient',
phone: str,
*,
force_sms: bool = False) -> types.auth.SentCode:
"""
Sends a code request to the specified phone number.
@ -417,7 +438,7 @@ class AuthMethods(MessageParseMethods, UserMethods):
return result
async def log_out(self):
async def log_out(self: 'TelegramClient') -> bool:
"""
Logs out Telegram and deletes the current ``*.session`` file.
@ -439,8 +460,13 @@ class AuthMethods(MessageParseMethods, UserMethods):
return True
async def edit_2fa(
self, current_password=None, new_password=None,
*, hint='', email=None, email_code_callback=None):
self: 'TelegramClient',
current_password: str = None,
new_password: str = None,
*,
hint: str = '',
email: str = None,
email_code_callback: typing.Callable[[int], str] = None) -> bool:
"""
Changes the 2FA settings of the logged in user, according to the
passed parameters. Take note of the parameter explanations.

View File

@ -1,9 +1,21 @@
import typing
from .users import UserMethods
from .. import hints
from ..tl import types, functions, custom
if typing.TYPE_CHECKING:
from .telegramclient import TelegramClient
class BotMethods(UserMethods):
async def inline_query(self, bot, query, *, offset=None, geo_point=None):
async def inline_query(
self: 'TelegramClient',
bot: hints.EntityLike,
query: str,
*,
offset: str = None,
geo_point: types.GeoPoint = None) -> custom.InlineResults:
"""
Makes the given inline query to the specified bot
i.e. ``@vote My New Poll`` would be as follows:

View File

@ -1,11 +1,18 @@
import typing
from .updates import UpdateMethods
from .. import utils, hints
from ..tl import types, custom
from .. import utils, events
if typing.TYPE_CHECKING:
from .telegramclient import TelegramClient
class ButtonMethods(UpdateMethods):
@staticmethod
def build_reply_markup(buttons, inline_only=False):
def build_reply_markup(
self: 'TelegramClient',
buttons: typing.Optional[hints.MarkupLike],
inline_only: bool = False) -> typing.Optional[types.TypeReplyMarkup]:
"""
Builds a :tl`ReplyInlineMarkup` or :tl:`ReplyKeyboardMarkup` for
the given buttons, or does nothing if either no buttons are

View File

@ -3,13 +3,17 @@ import itertools
import string
from .users import UserMethods
from .. import helpers, utils
from .. import helpers, utils, hints
from ..requestiter import RequestIter
from ..tl import types, functions, custom
_MAX_PARTICIPANTS_CHUNK_SIZE = 200
_MAX_ADMIN_LOG_CHUNK_SIZE = 100
import typing
if typing.TYPE_CHECKING:
from .telegramclient import TelegramClient
class _ChatAction:
_str_mapping = {
@ -275,9 +279,13 @@ class ChatMethods(UserMethods):
# region Public methods
def iter_participants(
self, entity, limit=None, *, search='',
filter=None, aggressive=False
):
self: 'TelegramClient',
entity: hints.EntityLike,
limit: float = None,
*,
search: str = '',
filter: types.TypeChannelParticipantsFilter = None,
aggressive: bool = False) -> _ParticipantsIter:
"""
Iterator over the participants belonging to the specified chat.
@ -330,7 +338,10 @@ class ChatMethods(UserMethods):
aggressive=aggressive
)
async def get_participants(self, *args, **kwargs):
async def get_participants(
self: 'TelegramClient',
*args,
**kwargs) -> hints.TotalList:
"""
Same as `iter_participants`, but returns a
`TotalList <telethon.helpers.TotalList>` instead.
@ -338,10 +349,28 @@ class ChatMethods(UserMethods):
return await self.iter_participants(*args, **kwargs).collect()
def iter_admin_log(
self, entity, limit=None, *, max_id=0, min_id=0, search=None,
admins=None, join=None, leave=None, invite=None, restrict=None,
unrestrict=None, ban=None, unban=None, promote=None, demote=None,
info=None, settings=None, pinned=None, edit=None, delete=None):
self: 'TelegramClient',
entity: hints.EntityLike,
limit: float = None,
*,
max_id: int = 0,
min_id: int = 0,
search: str = None,
admins: hints.EntitiesLike = None,
join: bool = None,
leave: bool = None,
invite: bool = None,
restrict: bool = None,
unrestrict: bool = None,
ban: bool = None,
unban: bool = None,
promote: bool = None,
demote: bool = None,
info: bool = None,
settings: bool = None,
pinned: bool = None,
edit: bool = None,
delete: bool = None) -> _AdminLogIter:
"""
Iterator over the admin log for the specified channel.
@ -453,13 +482,22 @@ class ChatMethods(UserMethods):
delete=delete
)
async def get_admin_log(self, *args, **kwargs):
async def get_admin_log(
self: 'TelegramClient',
*args,
**kwargs) -> hints.TotalList:
"""
Same as `iter_admin_log`, but returns a ``list`` instead.
"""
return await self.iter_admin_log(*args, **kwargs).collect()
def action(self, entity, action, *, delay=4, auto_cancel=True):
def action(
self: 'TelegramClient',
entity: hints.EntityLike,
action: typing.Union[str, types.TypeSendMessageAction],
*,
delay: float = 4,
auto_cancel: bool = True) -> typing.Union[_ChatAction, typing.Coroutine]:
"""
Returns a context-manager object to represent a "chat action".

View File

@ -1,12 +1,16 @@
import itertools
import typing
from .users import UserMethods
from .. import utils
from .. import utils, hints
from ..requestiter import RequestIter
from ..tl import types, functions, custom
_MAX_CHUNK_SIZE = 100
if typing.TYPE_CHECKING:
from .telegramclient import TelegramClient
class _DialogsIter(RequestIter):
async def _init(
@ -98,9 +102,14 @@ class DialogMethods(UserMethods):
# region Public methods
def iter_dialogs(
self, limit=None, *, offset_date=None, offset_id=0,
offset_peer=types.InputPeerEmpty(), ignore_migrated=False
):
self: 'TelegramClient',
limit: float = None,
*,
offset_date: hints.DateLike = None,
offset_id: int = 0,
offset_peer: hints.EntityLike = types.InputPeerEmpty(),
ignore_migrated: bool = False
) -> _DialogsIter:
"""
Returns an iterator over the dialogs, yielding 'limit' at most.
Dialogs are the open "chats" or conversations with other people,
@ -141,14 +150,14 @@ class DialogMethods(UserMethods):
ignore_migrated=ignore_migrated
)
async def get_dialogs(self, *args, **kwargs):
async def get_dialogs(self: 'TelegramClient', *args, **kwargs) -> hints.TotalList:
"""
Same as `iter_dialogs`, but returns a
`TotalList <telethon.helpers.TotalList>` instead.
"""
return await self.iter_dialogs(*args, **kwargs).collect()
def iter_drafts(self):
def iter_drafts(self: 'TelegramClient') -> _DraftsIter:
"""
Iterator over all open draft messages.
@ -160,16 +169,21 @@ class DialogMethods(UserMethods):
# TODO Passing a limit here makes no sense
return _DraftsIter(self, None)
async def get_drafts(self):
async def get_drafts(self: 'TelegramClient') -> hints.TotalList:
"""
Same as :meth:`iter_drafts`, but returns a list instead.
"""
return await self.iter_drafts().collect()
def conversation(
self, entity,
*, timeout=60, total_timeout=None, max_messages=100,
exclusive=True, replies_are_responses=True):
self: 'TelegramClient',
entity: hints.EntityLike,
*,
timeout: float = 60,
total_timeout: float = None,
max_messages: int = 100,
exclusive: bool = True,
replies_are_responses: bool = True) -> custom.Conversation:
"""
Creates a `Conversation <telethon.tl.custom.conversation.Conversation>`
with the given entity so you can easily send messages and await for

View File

@ -2,9 +2,10 @@ import datetime
import io
import os
import pathlib
import typing
from .users import UserMethods
from .. import utils, helpers, errors
from .. import utils, helpers, errors, hints
from ..tl import TLObject, types, functions
try:
@ -12,13 +13,20 @@ try:
except ImportError:
aiohttp = None
if typing.TYPE_CHECKING:
from .telegramclient import TelegramClient
class DownloadMethods(UserMethods):
# region Public methods
async def download_profile_photo(
self, entity, file=None, *, download_big=True):
self: 'TelegramClient',
entity: hints.EntityLike,
file: hints.FileLike = None,
*,
download_big: bool = True) -> typing.Optional[str]:
"""
Downloads the profile photo of the given entity (user/chat/channel).
@ -118,8 +126,13 @@ class DownloadMethods(UserMethods):
# Until there's a report for chats, no need to.
return None
async def download_media(self, message, file=None,
*, thumb=None, progress_callback=None):
async def download_media(
self: 'TelegramClient',
message: hints.MessageLike,
file: hints.FileLike = None,
*,
thumb: hints.FileLike = None,
progress_callback: hints.ProgressCallback = None) -> typing.Optional[str]:
"""
Downloads the given media, or the media from a specified Message.
@ -194,8 +207,14 @@ class DownloadMethods(UserMethods):
)
async def download_file(
self, input_location, file=None, *, part_size_kb=None,
file_size=None, progress_callback=None, dc_id=None):
self: 'TelegramClient',
input_location: hints.FileLike,
file: hints.OutFileLike = None,
*,
part_size_kb: float = None,
file_size: int = None,
progress_callback: hints.ProgressCallback = None,
dc_id: int = None) -> None:
"""
Downloads the given input location to a file.
@ -337,8 +356,7 @@ class DownloadMethods(UserMethods):
else:
return None
@staticmethod
def _download_cached_photo_size(size, file):
def _download_cached_photo_size(self: 'TelegramClient', size, file):
# No need to download anything, simply write the bytes
if file is bytes:
return size.bytes
@ -357,7 +375,7 @@ class DownloadMethods(UserMethods):
f.close()
return file
async def _download_photo(self, photo, file, date, thumb, progress_callback):
async def _download_photo(self: 'TelegramClient', photo, file, date, thumb, progress_callback):
"""Specialized version of .download_media() for photos"""
# Determine the photo and its largest size
if isinstance(photo, types.MessageMediaPhoto):

View File

@ -1,17 +1,21 @@
import itertools
import re
import typing
from .users import UserMethods
from .. import utils
from ..tl import types
if typing.TYPE_CHECKING:
from .telegramclient import TelegramClient
class MessageParseMethods(UserMethods):
# region Public properties
@property
def parse_mode(self):
def parse_mode(self: 'TelegramClient'):
"""
This property is the default parse mode used when sending messages.
Defaults to `telethon.extensions.markdown`. It will always
@ -38,14 +42,14 @@ class MessageParseMethods(UserMethods):
return self._parse_mode
@parse_mode.setter
def parse_mode(self, mode):
def parse_mode(self: 'TelegramClient', mode: str):
self._parse_mode = utils.sanitize_parse_mode(mode)
# endregion
# region Private methods
async def _replace_with_mention(self, entities, i, user):
async def _replace_with_mention(self: 'TelegramClient', entities, i, user):
"""
Helper method to replace ``entities[i]`` to mention ``user``,
or do nothing if it can't be found.
@ -59,7 +63,7 @@ class MessageParseMethods(UserMethods):
except (ValueError, TypeError):
return False
async def _parse_message_text(self, message, parse_mode):
async def _parse_message_text(self: 'TelegramClient', message, parse_mode):
"""
Returns a (parsed message, entities) tuple depending on ``parse_mode``.
"""
@ -89,7 +93,7 @@ class MessageParseMethods(UserMethods):
return message, msg_entities
def _get_response_message(self, request, result, input_chat):
def _get_response_message(self: 'TelegramClient', request, result, input_chat):
"""
Extracts the response message known a request and Update result.
The request may also be the ID of the message to match.

View File

@ -1,14 +1,18 @@
import itertools
import typing
from .buttons import ButtonMethods
from .messageparse import MessageParseMethods
from .uploads import UploadMethods
from .buttons import ButtonMethods
from .. import utils, errors
from ..tl import types, functions
from .. import utils, errors, hints
from ..requestiter import RequestIter
from ..tl import types, functions
_MAX_CHUNK_SIZE = 100
if typing.TYPE_CHECKING:
from .telegramclient import TelegramClient
class _MessagesIter(RequestIter):
"""
@ -293,10 +297,22 @@ class MessageMethods(UploadMethods, ButtonMethods, MessageParseMethods):
# region Message retrieval
def iter_messages(
self, entity, limit=None, *, offset_date=None, offset_id=0,
max_id=0, min_id=0, add_offset=0, search=None, filter=None,
from_user=None, wait_time=None, ids=None, reverse=False
):
self: 'TelegramClient',
entity: hints.EntityLike,
limit: float = None,
*,
offset_date: hints.DateLike = None,
offset_id: int = 0,
max_id: int = 0,
min_id: int = 0,
add_offset: int = 0,
search: str = None,
filter: typing.Union[types.TypeMessagesFilter, typing.Type[types.TypeMessagesFilter]] = None,
from_user: hints.EntityLike = None,
wait_time: float = None,
ids: typing.Union[int, typing.Sequence[int]] = None,
reverse: bool = False
) -> typing.Union[_MessagesIter, _IDsIter]:
"""
Iterator over the message history for the specified entity.
If either `search`, `filter` or `from_user` are provided,
@ -417,7 +433,7 @@ class MessageMethods(UploadMethods, ButtonMethods, MessageParseMethods):
search=search
)
async def get_messages(self, *args, **kwargs):
async def get_messages(self: 'TelegramClient', *args, **kwargs) -> hints.TotalList:
"""
Same as `iter_messages`, but returns a
`TotalList <telethon.helpers.TotalList>` instead.
@ -457,10 +473,18 @@ class MessageMethods(UploadMethods, ButtonMethods, MessageParseMethods):
# region Message sending/editing/deleting
async def send_message(
self, entity, message='', *, reply_to=None,
parse_mode=(), link_preview=True, file=None,
force_document=False, clear_draft=False, buttons=None,
silent=None):
self: 'TelegramClient',
entity: hints.EntityLike,
message: hints.MessageLike = '',
*,
reply_to: typing.Union[int, types.Message] = None,
parse_mode: typing.Optional[str] = (),
link_preview: bool = True,
file: hints.FileLike = None,
force_document: bool = False,
clear_draft: bool = False,
buttons: hints.MarkupLike = None,
silent: bool = None) -> types.Message:
"""
Sends the given message to the specified entity (user/chat/channel).
@ -609,8 +633,14 @@ class MessageMethods(UploadMethods, ButtonMethods, MessageParseMethods):
return self._get_response_message(request, result, entity)
async def forward_messages(self, entity, messages, from_peer=None,
*, silent=None, as_album=None):
async def forward_messages(
self: 'TelegramClient',
entity: hints.EntityLike,
messages: typing.Union[hints.MessageIDLike, typing.Sequence[hints.MessageIDLike]],
from_peer: hints.EntityLike = None,
*,
silent: bool = None,
as_album: bool = None) -> typing.Sequence[types.Message]:
"""
Forwards the given message(s) to the specified entity.
@ -719,9 +749,15 @@ class MessageMethods(UploadMethods, ButtonMethods, MessageParseMethods):
return sent[0] if single else sent
async def edit_message(
self, entity, message=None, text=None,
*, parse_mode=(), link_preview=True, file=None,
buttons=None):
self: 'TelegramClient',
entity: typing.Union[hints.EntityLike, types.Message],
message: hints.MessageLike = None,
text: str = None,
*,
parse_mode: str = (),
link_preview: bool = True,
file: hints.FileLike = None,
buttons: hints.MarkupLike = None) -> types.Message:
"""
Edits the given message ID (to change its contents or disable preview).
@ -824,7 +860,12 @@ class MessageMethods(UploadMethods, ButtonMethods, MessageParseMethods):
await self._cache_media(msg, file, file_handle, image=image)
return msg
async def delete_messages(self, entity, message_ids, *, revoke=True):
async def delete_messages(
self: 'TelegramClient',
entity: hints.EntityLike,
message_ids: typing.Union[hints.MessageIDLike, typing.Sequence[hints.MessageIDLike]],
*,
revoke: bool = True) -> typing.Sequence[types.messages.AffectedMessages]:
"""
Deletes a message from a chat, optionally "for everyone".
@ -877,7 +918,12 @@ class MessageMethods(UploadMethods, ButtonMethods, MessageParseMethods):
# region Miscellaneous
async def send_read_acknowledge(
self, entity, message=None, *, max_id=None, clear_mentions=False):
self: 'TelegramClient',
entity: hints.EntityLike,
message: typing.Union[hints.MessageIDLike, typing.Sequence[hints.MessageIDLike]] = None,
*,
max_id: int = None,
clear_mentions: bool = False) -> bool:
"""
Sends a "read acknowledge" (i.e., notifying the given peer that we've
read their messages, also known as the "double check").
@ -924,7 +970,7 @@ class MessageMethods(UploadMethods, ButtonMethods, MessageParseMethods):
if max_id is not None:
if isinstance(entity, types.InputPeerChannel):
return await self(functions.channels.ReadHistoryRequest(
entity, max_id=max_id))
utils.get_input_channel(entity), max_id=max_id))
else:
return await self(functions.messages.ReadHistoryRequest(
entity, max_id=max_id))

View File

@ -3,23 +3,26 @@ import asyncio
import logging
import platform
import time
from datetime import datetime, timezone
import typing
from .. import version, helpers, __name__ as __base_name__
from ..crypto import rsa
from ..entitycache import EntityCache
from ..extensions import markdown
from ..network import MTProtoSender, ConnectionTcpFull, TcpMTProxy
from ..network import MTProtoSender, Connection, ConnectionTcpFull, TcpMTProxy
from ..sessions import Session, SQLiteSession, MemorySession
from ..statecache import StateCache
from ..tl import TLObject, functions, types
from ..tl.alltlobjects import LAYER
from ..entitycache import EntityCache
from ..statecache import StateCache
DEFAULT_DC_ID = 4
DEFAULT_IPV4_IP = '149.154.167.51'
DEFAULT_IPV6_IP = '[2001:67c:4e8:f002::a]'
DEFAULT_PORT = 443
if typing.TYPE_CHECKING:
from .telegramclient import TelegramClient
__default_log__ = logging.getLogger(__base_name__)
__default_log__.addHandler(logging.NullHandler())
@ -159,25 +162,29 @@ class TelegramBaseClient(abc.ABC):
# region Initialization
def __init__(self, session, api_id, api_hash,
def __init__(
self: 'TelegramClient',
session: typing.Union[str, Session],
api_id: int,
api_hash: str,
*,
connection=ConnectionTcpFull,
use_ipv6=False,
proxy=None,
timeout=10,
request_retries=5,
connection_retries=5,
retry_delay=1,
auto_reconnect=True,
sequential_updates=False,
flood_sleep_threshold=60,
device_model=None,
system_version=None,
app_version=None,
lang_code='en',
system_lang_code='en',
loop=None,
base_logger=None):
connection: typing.Type[Connection] = ConnectionTcpFull,
use_ipv6: bool = False,
proxy: typing.Union[tuple, dict] = None,
timeout: int = 10,
request_retries: int = 5,
connection_retries: int =5,
retry_delay: int =1,
auto_reconnect: bool = True,
sequential_updates: bool = False,
flood_sleep_threshold: int = 60,
device_model: str = None,
system_version: str = None,
app_version: str = None,
lang_code: str = 'en',
system_lang_code: str = 'en',
loop: asyncio.AbstractEventLoop = None,
base_logger: typing.Union[str, logging.Logger] = None):
if not api_id or not api_hash:
raise ValueError(
"Your API ID or Hash cannot be empty or None. "
@ -334,11 +341,11 @@ class TelegramBaseClient(abc.ABC):
# region Properties
@property
def loop(self):
def loop(self: 'TelegramClient') -> asyncio.AbstractEventLoop:
return self._loop
@property
def disconnected(self):
def disconnected(self: 'TelegramClient') -> asyncio.Future:
"""
Future that resolves when the connection to Telegram
ends, either by user action or in the background.
@ -349,7 +356,7 @@ class TelegramBaseClient(abc.ABC):
# region Connecting
async def connect(self):
async def connect(self: 'TelegramClient') -> None:
"""
Connects to Telegram.
"""
@ -369,14 +376,14 @@ class TelegramBaseClient(abc.ABC):
self._updates_handle = self._loop.create_task(self._update_loop())
def is_connected(self):
def is_connected(self: 'TelegramClient') -> bool:
"""
Returns ``True`` if the user has connected.
"""
sender = getattr(self, '_sender', None)
return sender and sender.is_connected()
def disconnect(self):
def disconnect(self: 'TelegramClient'):
"""
Disconnects from Telegram.
@ -389,7 +396,7 @@ class TelegramBaseClient(abc.ABC):
else:
self._loop.run_until_complete(self._disconnect_coro())
async def _disconnect_coro(self):
async def _disconnect_coro(self: 'TelegramClient'):
await self._disconnect()
pts, date = self._state_cache[None]
@ -403,7 +410,7 @@ class TelegramBaseClient(abc.ABC):
self.session.close()
async def _disconnect(self):
async def _disconnect(self: 'TelegramClient'):
"""
Disconnect only, without closing the session. Used in reconnections
to different data centers, where we don't want to close the session
@ -414,7 +421,7 @@ class TelegramBaseClient(abc.ABC):
await helpers._cancel(self._log[__name__],
updates_handle=self._updates_handle)
async def _switch_dc(self, new_dc):
async def _switch_dc(self: 'TelegramClient', new_dc):
"""
Permanently switches the current connection to the new data center.
"""
@ -430,7 +437,7 @@ class TelegramBaseClient(abc.ABC):
await self._disconnect()
return await self.connect()
def _auth_key_callback(self, auth_key):
def _auth_key_callback(self: 'TelegramClient', auth_key):
"""
Callback from the sender whenever it needed to generate a
new authorization key. This means we are not authorized.
@ -442,7 +449,7 @@ class TelegramBaseClient(abc.ABC):
# region Working with different connections/Data Centers
async def _get_dc(self, dc_id, cdn=False):
async def _get_dc(self: 'TelegramClient', dc_id, cdn=False):
"""Gets the Data Center (DC) associated to 'dc_id'"""
cls = self.__class__
if not cls._config:
@ -459,7 +466,7 @@ class TelegramBaseClient(abc.ABC):
and bool(dc.ipv6) == self._use_ipv6 and bool(dc.cdn) == cdn
)
async def _create_exported_sender(self, dc_id):
async def _create_exported_sender(self: 'TelegramClient', dc_id):
"""
Creates a new exported `MTProtoSender` for the given `dc_id` and
returns it. This method should be used by `_borrow_exported_sender`.
@ -489,7 +496,7 @@ class TelegramBaseClient(abc.ABC):
await sender.send(req)
return sender
async def _borrow_exported_sender(self, dc_id):
async def _borrow_exported_sender(self: 'TelegramClient', dc_id):
"""
Borrows a connected `MTProtoSender` for the given `dc_id`.
If it's not cached, creates a new one if it doesn't exist yet,
@ -517,7 +524,7 @@ class TelegramBaseClient(abc.ABC):
return sender
async def _return_exported_sender(self, sender):
async def _return_exported_sender(self: 'TelegramClient', sender):
"""
Returns a borrowed exported sender. If all borrows have
been returned, the sender is cleanly disconnected.
@ -532,7 +539,7 @@ class TelegramBaseClient(abc.ABC):
'Disconnecting borrowed sender for DC %d', dc_id)
await sender.disconnect()
async def _get_cdn_client(self, cdn_redirect):
async def _get_cdn_client(self: 'TelegramClient', cdn_redirect):
"""Similar to ._borrow_exported_client, but for CDNs"""
# TODO Implement
raise NotImplementedError
@ -563,7 +570,7 @@ class TelegramBaseClient(abc.ABC):
# region Invoking Telegram requests
@abc.abstractmethod
def __call__(self, request, ordered=False):
def __call__(self: 'TelegramClient', request, ordered=False):
"""
Invokes (sends) one or more MTProtoRequests and returns (receives)
their result.
@ -584,15 +591,15 @@ class TelegramBaseClient(abc.ABC):
raise NotImplementedError
@abc.abstractmethod
def _handle_update(self, update):
def _handle_update(self: 'TelegramClient', update):
raise NotImplementedError
@abc.abstractmethod
def _update_loop(self):
def _update_loop(self: 'TelegramClient'):
raise NotImplementedError
@abc.abstractmethod
async def _handle_auto_reconnect(self):
async def _handle_auto_reconnect(self: 'TelegramClient'):
raise NotImplementedError
# endregion

View File

@ -2,20 +2,22 @@ import asyncio
import itertools
import random
import time
import datetime
import typing
from .users import UserMethods
from .. import events, utils, errors
from ..events.common import EventBuilder, EventCommon
from ..tl import types, functions
from ..events.common import EventCommon
from ..statecache import StateCache
if typing.TYPE_CHECKING:
from .telegramclient import TelegramClient
class UpdateMethods(UserMethods):
# region Public methods
async def _run_until_disconnected(self):
async def _run_until_disconnected(self: 'TelegramClient'):
try:
await self.disconnected
except KeyboardInterrupt:
@ -23,7 +25,7 @@ class UpdateMethods(UserMethods):
finally:
await self.disconnect()
def run_until_disconnected(self):
def run_until_disconnected(self: 'TelegramClient'):
"""
Runs the event loop until `disconnect` is called or if an error
while connecting/sending/receiving occurs in the background. In
@ -43,7 +45,7 @@ class UpdateMethods(UserMethods):
# No loop.run_until_complete; it's already syncified
self.disconnect()
def on(self, event):
def on(self: 'TelegramClient', event: EventBuilder):
"""
Decorator helper method around `add_event_handler`. Example:
@ -67,7 +69,10 @@ class UpdateMethods(UserMethods):
return decorator
def add_event_handler(self, callback, event=None):
def add_event_handler(
self: 'TelegramClient',
callback: callable,
event: EventBuilder = None):
"""
Registers the given callback to be called on the specified event.
@ -100,7 +105,10 @@ class UpdateMethods(UserMethods):
self._event_builders.append((event, callback))
def remove_event_handler(self, callback, event=None):
def remove_event_handler(
self: 'TelegramClient',
callback: callable,
event: EventBuilder = None) -> int:
"""
Inverse operation of :meth:`add_event_handler`.
@ -121,14 +129,15 @@ class UpdateMethods(UserMethods):
return found
def list_event_handlers(self):
def list_event_handlers(self: 'TelegramClient')\
-> typing.Sequence[typing.Tuple[callable, EventBuilder]]:
"""
Lists all added event handlers, returning a list of pairs
consisting of (callback, event).
"""
return [(callback, event) for event, callback in self._event_builders]
async def catch_up(self):
async def catch_up(self: 'TelegramClient'):
"""
"Catches up" on the missed updates while the client was offline.
You should call this method after registering the event handlers
@ -196,7 +205,7 @@ class UpdateMethods(UserMethods):
# It is important to not make _handle_update async because we rely on
# the order that the updates arrive in to update the pts and date to
# be always-increasing. There is also no need to make this async.
def _handle_update(self, update):
def _handle_update(self: 'TelegramClient', update):
self.session.process_entities(update)
self._entity_cache.add(update)
@ -212,7 +221,7 @@ class UpdateMethods(UserMethods):
self._state_cache.update(update)
def _process_update(self, update, entities=None):
def _process_update(self: 'TelegramClient', update, entities=None):
update._entities = entities or {}
# This part is somewhat hot so we don't bother patching
@ -230,7 +239,7 @@ class UpdateMethods(UserMethods):
self._state_cache.update(update)
async def _update_loop(self):
async def _update_loop(self: 'TelegramClient'):
# Pings' ID don't really need to be secure, just "random"
rnd = lambda: random.randrange(-2**63, 2**63)
while self.is_connected():
@ -275,13 +284,13 @@ class UpdateMethods(UserMethods):
except (ConnectionError, asyncio.CancelledError):
return
async def _dispatch_queue_updates(self):
async def _dispatch_queue_updates(self: 'TelegramClient'):
while not self._updates_queue.empty():
await self._dispatch_update(*self._updates_queue.get_nowait())
self._dispatching_updates_queue.clear()
async def _dispatch_update(self, update, channel_id, pts_date):
async def _dispatch_update(self: 'TelegramClient', update, channel_id, pts_date):
if not self._entity_cache.ensure_cached(update):
await self._get_difference(update, channel_id, pts_date)
@ -333,7 +342,7 @@ class UpdateMethods(UserMethods):
self._log[__name__].exception('Unhandled exception on %s',
name)
async def _get_difference(self, update, channel_id, pts_date):
async def _get_difference(self: 'TelegramClient', update, channel_id, pts_date):
"""
Get the difference for this `channel_id` if any, then load entities.
@ -373,7 +382,7 @@ class UpdateMethods(UserMethods):
itertools.chain(result.users, result.chats)
})
async def _handle_auto_reconnect(self):
async def _handle_auto_reconnect(self: 'TelegramClient'):
# TODO Catch-up
return
try:
@ -415,7 +424,7 @@ class EventBuilderDict:
"""
Helper "dictionary" to return events from types and cache them.
"""
def __init__(self, client, update):
def __init__(self, client: 'TelegramClient', update):
self.client = client
self.update = update

View File

@ -3,12 +3,13 @@ import io
import os
import pathlib
import re
import typing
from io import BytesIO
from .buttons import ButtonMethods
from .messageparse import MessageParseMethods
from .users import UserMethods
from .. import utils, helpers
from .. import utils, helpers, hints
from ..tl import types, functions, custom
try:
@ -18,6 +19,10 @@ except ImportError:
PIL = None
if typing.TYPE_CHECKING:
from .telegramclient import TelegramClient
class _CacheType:
"""Like functools.partial but pretends to be the wrapped class."""
def __init__(self, cls):
@ -83,11 +88,24 @@ class UploadMethods(ButtonMethods, MessageParseMethods, UserMethods):
# region Public methods
async def send_file(
self, entity, file, *, caption=None, force_document=False,
progress_callback=None, reply_to=None, attributes=None,
thumb=None, allow_cache=True, parse_mode=(),
voice_note=False, video_note=False, buttons=None, silent=None,
supports_streaming=False, **kwargs):
self: 'TelegramClient',
entity: hints.EntityLike,
file: hints.FileLike,
*,
caption: str = None,
force_document: bool = False,
progress_callback: hints.ProgressCallback = None,
reply_to: hints.MessageIDLike = None,
attributes: typing.Sequence[types.TypeDocumentAttribute] = None,
thumb: hints.FileLike = None,
allow_cache: bool = True,
parse_mode: str = (),
voice_note: bool = False,
video_note: bool = False,
buttons: hints.MarkupLike = None,
silent: bool = None,
supports_streaming: bool = False,
**kwargs) -> types.Message:
"""
Sends a file to the specified entity.
@ -292,7 +310,7 @@ class UploadMethods(ButtonMethods, MessageParseMethods, UserMethods):
return msg
async def _send_album(self, entity, files, caption='',
async def _send_album(self: 'TelegramClient', entity, files, caption='',
progress_callback=None, reply_to=None,
parse_mode=(), silent=None):
"""Specialized version of .send_file for albums"""
@ -360,8 +378,13 @@ class UploadMethods(ButtonMethods, MessageParseMethods, UserMethods):
return [messages[m.media.id.id] for m in media]
async def upload_file(
self, file, *, part_size_kb=None, file_name=None, use_cache=None,
progress_callback=None):
self: 'TelegramClient',
file: hints.FileLike,
*,
part_size_kb: float = None,
file_name: str = None,
use_cache: type = None,
progress_callback: hints.ProgressCallback = None) -> types.TypeInputFile:
"""
Uploads the specified file and returns a handle (an instance of
:tl:`InputFile` or :tl:`InputFileBig`, as required) which can be
@ -607,7 +630,7 @@ class UploadMethods(ButtonMethods, MessageParseMethods, UserMethods):
)
return file_handle, media, as_image
async def _cache_media(self, msg, file, file_handle, image):
async def _cache_media(self: 'TelegramClient', msg, file, file_handle, image):
if file and msg and isinstance(file_handle,
custom.InputSizedFile):
# There was a response message and we didn't use cached

View File

@ -1,18 +1,22 @@
import asyncio
import itertools
import time
import typing
from .telegrambaseclient import TelegramBaseClient
from .. import errors, utils
from .. import errors, utils, hints
from ..errors import MultiError, RPCError
from ..tl import TLObject, TLRequest, types, functions
from ..helpers import retry_range
from ..tl import TLRequest, types, functions
_NOT_A_REQUEST = lambda: TypeError('You can only invoke requests, not types!')
if typing.TYPE_CHECKING:
from .telegramclient import TelegramClient
class UserMethods(TelegramBaseClient):
async def __call__(self, request, ordered=False):
async def __call__(self: 'TelegramClient', request, ordered=False):
requests = (request if utils.is_list_like(request) else (request,))
for r in requests:
if not isinstance(r, TLRequest):
@ -97,7 +101,8 @@ class UserMethods(TelegramBaseClient):
# region Public methods
async def get_me(self, input_peer=False):
async def get_me(self: 'TelegramClient', input_peer: bool = False) \
-> typing.Union[types.User, types.InputPeerUser]:
"""
Gets "me" (the self user) which is currently authenticated,
or None if the request fails (hence, not authenticated).
@ -128,7 +133,7 @@ class UserMethods(TelegramBaseClient):
except errors.UnauthorizedError:
return None
async def is_bot(self):
async def is_bot(self: 'TelegramClient') -> bool:
"""
Return ``True`` if the signed-in user is a bot, ``False`` otherwise.
"""
@ -137,7 +142,7 @@ class UserMethods(TelegramBaseClient):
return self._bot
async def is_user_authorized(self):
async def is_user_authorized(self: 'TelegramClient') -> bool:
"""
Returns ``True`` if the user is authorized.
"""
@ -151,7 +156,9 @@ class UserMethods(TelegramBaseClient):
return self._authorized
async def get_entity(self, entity):
async def get_entity(
self: 'TelegramClient',
entity: hints.EntitiesLike) -> hints.Entity:
"""
Turns the given entity into a valid Telegram :tl:`User`, :tl:`Chat`
or :tl:`Channel`. You can also pass a list or iterable of entities,
@ -243,7 +250,9 @@ class UserMethods(TelegramBaseClient):
return result[0] if single else result
async def get_input_entity(self, peer):
async def get_input_entity(
self: 'TelegramClient',
peer: hints.EntityLike) -> types.TypeInputPeer:
"""
Turns the given peer into its input entity version. Most requests
use this kind of :tl:`InputPeer`, so this is the most suitable call
@ -366,7 +375,10 @@ class UserMethods(TelegramBaseClient):
.format(peer)
)
async def get_peer_id(self, peer, add_mark=True):
async def get_peer_id(
self: 'TelegramClient',
peer: hints.EntityLike,
add_mark: bool = True) -> int:
"""
Gets the ID for the given peer, which may be anything entity-like.
@ -395,7 +407,7 @@ class UserMethods(TelegramBaseClient):
# region Private methods
async def _get_entity_from_string(self, string):
async def _get_entity_from_string(self: 'TelegramClient', string):
"""
Gets a full entity from the given string, which may be a phone or
a username, and processes all the found entities on the session.
@ -459,7 +471,7 @@ class UserMethods(TelegramBaseClient):
'Cannot find any entity corresponding to "{}"'.format(string)
)
async def _get_input_dialog(self, dialog):
async def _get_input_dialog(self: 'TelegramClient', dialog):
"""
Returns a :tl:`InputDialogPeer`. This is a bit tricky because
it may or not need access to the client to convert what's given
@ -476,7 +488,7 @@ class UserMethods(TelegramBaseClient):
return types.InputDialogPeer(await self.get_input_entity(dialog))
async def _get_input_notify(self, notify):
async def _get_input_notify(self: 'TelegramClient', notify):
"""
Returns a :tl:`InputNotifyPeer`. This is a bit tricky because
it may or not need access to the client to convert what's given

View File

@ -2,7 +2,7 @@
import asyncio
import os
import struct
from hashlib import sha1, sha256
from hashlib import sha1
# region Multiple utilities

View File

@ -6,6 +6,7 @@ from .mtprotoplainsender import MTProtoPlainSender
from .authenticator import do_authentication
from .mtprotosender import MTProtoSender
from .connection import (
Connection,
ConnectionTcpFull, ConnectionTcpIntermediate, ConnectionTcpAbridged,
ConnectionTcpObfuscated, ConnectionTcpMTProxyAbridged,
ConnectionTcpMTProxyIntermediate,

View File

@ -1,3 +1,4 @@
from .connection import Connection
from .tcpfull import ConnectionTcpFull
from .tcpintermediate import ConnectionTcpIntermediate
from .tcpabridged import ConnectionTcpAbridged