Implement some auth methods

This commit is contained in:
Lonami Exo 2023-09-01 17:41:47 +02:00
parent 201497b638
commit da011e4b1d
7 changed files with 337 additions and 55 deletions

View File

@ -1,36 +1,143 @@
from __future__ import annotations from __future__ import annotations
from typing import TYPE_CHECKING from typing import TYPE_CHECKING, Optional, Union
from ...mtproto.mtp.types import RpcError
from ...session.message_box.defs import User as SessionUser
from ...tl import abcs, functions, types
from ..types.chat.user import User
from ..types.login_token import LoginToken
from ..types.password_token import PasswordToken
from .net import connect_sender
if TYPE_CHECKING: if TYPE_CHECKING:
from .client import Client from .client import Client
def start(self: Client) -> None: async def is_authorized(self: Client) -> bool:
self try:
await self(functions.updates.get_state())
return True
except RpcError as e:
if e.code == 401:
return False
raise
async def complete_login(self: Client, auth: abcs.auth.Authorization) -> User:
assert isinstance(auth, types.auth.Authorization)
assert isinstance(auth.user, types.User)
user = User(auth.user)
self._config.session.user = SessionUser(
id=user.id,
dc=self._dc_id,
bot=user.bot,
)
packed = user.pack()
assert packed is not None
self._chat_hashes.set_self_user(packed)
try:
state = await self(functions.updates.get_state())
self._message_box.set_state(state)
except Exception:
pass
return user
async def handle_migrate(self: Client, dc_id: Optional[int]) -> None:
assert dc_id is not None
sender = await connect_sender(dc_id, self._config)
async with self._sender_lock:
self._sender = sender
self._dc_id = dc_id
async def bot_sign_in(self: Client, token: str) -> User:
request = functions.auth.import_bot_authorization(
flags=0,
api_id=self._config.api_id,
api_hash=self._config.api_hash,
bot_auth_token=token,
)
try:
result = await self(request)
except RpcError as e:
if e.name == "USER_MIGRATE":
await handle_migrate(self, e.value)
result = await self(request)
else:
raise
return await complete_login(self, result)
async def request_login_code(self: Client, phone: str) -> LoginToken:
request = functions.auth.send_code(
phone_number=phone,
api_id=self._config.api_id,
api_hash=self._config.api_hash,
settings=types.CodeSettings(
allow_flashcall=False,
current_number=False,
allow_app_hash=False,
allow_missed_call=False,
allow_firebase=False,
logout_tokens=None,
token=None,
app_sandbox=None,
),
)
try:
result = await self(request)
except RpcError as e:
if e.name == "USER_MIGRATE":
await handle_migrate(self, e.value)
result = await self(request)
else:
raise
assert isinstance(result, types.auth.SentCode)
return LoginToken._new(result, phone)
async def sign_in(
self: Client, token: LoginToken, code: str
) -> Union[User, PasswordToken]:
try:
result = await self(
functions.auth.sign_in(
phone_number=token._phone,
phone_code_hash=token._code.phone_code_hash,
phone_code=code,
email_verification=None,
)
)
except RpcError as e:
if e.name == "SESSION_PASSWORD_NEEDED":
return await get_password_information(self)
else:
raise
return await complete_login(self, result)
async def get_password_information(self: Client) -> PasswordToken:
result = self(functions.account.get_password())
assert isinstance(result, types.account.Password)
return PasswordToken._new(result)
async def check_password(
self: Client, token: PasswordToken, password: Union[str, bytes]
) -> User:
self, token, password
raise NotImplementedError raise NotImplementedError
async def sign_in(self: Client) -> None: async def sign_out(self: Client) -> None:
self await self(functions.auth.log_out())
raise NotImplementedError
async def sign_up(self: Client) -> None:
self
raise NotImplementedError
async def send_code_request(self: Client) -> None:
self
raise NotImplementedError
async def qr_login(self: Client) -> None:
self
raise NotImplementedError
async def log_out(self: Client) -> None:
self
raise NotImplementedError

View File

@ -1,15 +1,25 @@
import asyncio import asyncio
from collections import deque from collections import deque
from types import TracebackType from types import TracebackType
from typing import Deque, Optional, Self, Type, TypeVar from typing import Deque, Optional, Self, Type, TypeVar, Union
from ...mtsender.sender import Sender from ...mtsender.sender import Sender
from ...session.chat.hash_cache import ChatHashCache from ...session.chat.hash_cache import ChatHashCache
from ...session.message_box.messagebox import MessageBox from ...session.message_box.messagebox import MessageBox
from ...tl import abcs from ...tl import abcs
from ...tl.core.request import Request from ...tl.core.request import Request
from ..types.chat.user import User
from ..types.login_token import LoginToken
from ..types.password_token import PasswordToken
from .account import edit_2fa, end_takeout, takeout from .account import edit_2fa, end_takeout, takeout
from .auth import log_out, qr_login, send_code_request, sign_in, sign_up, start from .auth import (
bot_sign_in,
check_password,
is_authorized,
request_login_code,
sign_in,
sign_out,
)
from .bots import inline_query from .bots import inline_query
from .buttons import build_reply_markup from .buttons import build_reply_markup
from .chats import ( from .chats import (
@ -54,14 +64,7 @@ from .updates import (
set_receive_updates, set_receive_updates,
) )
from .uploads import send_file, upload_file from .uploads import send_file, upload_file
from .users import ( from .users import get_entity, get_input_entity, get_me, get_peer_id
get_entity,
get_input_entity,
get_me,
get_peer_id,
is_bot,
is_user_authorized,
)
Return = TypeVar("Return") Return = TypeVar("Return")
@ -92,23 +95,25 @@ class Client:
async def edit_2fa(self) -> None: async def edit_2fa(self) -> None:
await edit_2fa(self) await edit_2fa(self)
def start(self) -> None: async def is_authorized(self) -> bool:
start(self) return await is_authorized(self)
async def sign_in(self) -> None: async def bot_sign_in(self, token: str) -> User:
await sign_in(self) return await bot_sign_in(self, token)
async def sign_up(self) -> None: async def request_login_code(self, phone: str) -> LoginToken:
await sign_up(self) return await request_login_code(self, phone)
async def send_code_request(self) -> None: async def sign_in(self, token: LoginToken, code: str) -> Union[User, PasswordToken]:
await send_code_request(self) return await sign_in(self, token, code)
async def qr_login(self) -> None: async def check_password(
await qr_login(self) self, token: PasswordToken, password: Union[str, bytes]
) -> User:
return await check_password(self, token, password)
async def log_out(self) -> None: async def sign_out(self) -> None:
await log_out(self) await sign_out(self)
async def inline_query(self) -> None: async def inline_query(self) -> None:
await inline_query(self) await inline_query(self)
@ -218,12 +223,6 @@ class Client:
async def get_me(self) -> None: async def get_me(self) -> None:
await get_me(self) await get_me(self)
async def is_bot(self) -> None:
await is_bot(self)
async def is_user_authorized(self) -> None:
await is_user_authorized(self)
async def get_entity(self) -> None: async def get_entity(self) -> None:
await get_entity(self) await get_entity(self)

View File

@ -0,0 +1,125 @@
from typing import List, Optional, Self
from ....session.chat.packed import PackedChat, PackedType
from ....tl import abcs, types
from ..meta import NoPublicConstructor
class RestrictionReason(metaclass=NoPublicConstructor):
__slots__ = ("_raw",)
def __init__(self, raw: types.RestrictionReason) -> None:
self._raw = raw
@classmethod
def _from_raw(cls, reason: abcs.RestrictionReason) -> Self:
assert isinstance(reason, types.RestrictionReason)
return cls._create(reason)
@property
def platforms(self) -> List[str]:
return self._raw.platform.split("-")
@property
def reason(self) -> str:
return self._raw.reason
@property
def text(self) -> str:
return self._raw.text
class User(metaclass=NoPublicConstructor):
__slots__ = ("_raw",)
def __init__(self, raw: types.User) -> None:
self._raw = raw
@classmethod
def _from_raw(cls, user: abcs.User) -> Self:
if isinstance(user, types.UserEmpty):
return cls._create(
types.User(
self=False,
contact=False,
mutual_contact=False,
deleted=False,
bot=False,
bot_chat_history=False,
bot_nochats=False,
verified=False,
restricted=False,
min=False,
bot_inline_geo=False,
support=False,
scam=False,
apply_min_photo=False,
fake=False,
bot_attach_menu=False,
premium=False,
attach_menu_enabled=False,
bot_can_edit=False,
id=user.id,
access_hash=None,
first_name=None,
last_name=None,
username=None,
phone=None,
photo=None,
status=None,
bot_info_version=None,
restriction_reason=None,
bot_inline_placeholder=None,
lang_code=None,
emoji_status=None,
usernames=None,
)
)
elif isinstance(user, types.User):
return cls._create(user)
else:
raise RuntimeError("unexpected case")
@property
def id(self) -> int:
return self._raw.id
def pack(self) -> Optional[PackedChat]:
if self._raw.access_hash is not None:
return PackedChat(
ty=PackedType.BOT if self._raw.bot else PackedType.USER,
id=self._raw.id,
access_hash=self._raw.access_hash,
)
else:
return None
@property
def first_name(self) -> str:
return self._raw.first_name or ""
@property
def last_name(self) -> str:
return self._raw.last_name or ""
@property
def full_name(self) -> str:
return f"{self.first_name} {self.last_name}".strip()
@property
def username(self) -> Optional[str]:
return self._raw.username
@property
def phone(self) -> Optional[str]:
return self._raw.phone
@property
def bot(self) -> bool:
return self._raw.bot
@property
def restriction_reasons(self) -> List[RestrictionReason]:
return [
RestrictionReason._from_raw(r) for r in (self._raw.restriction_reason or [])
]

View File

@ -0,0 +1,17 @@
from typing import Self
from telethon._impl.tl import types
from .meta import NoPublicConstructor
class LoginToken(metaclass=NoPublicConstructor):
__slots__ = ("_code", "_phone")
def __init__(self, code: types.auth.SentCode, phone: str) -> None:
self._code = code
self._phone = phone
@classmethod
def _new(cls, code: types.auth.SentCode, phone: str) -> Self:
return cls._create(code, phone)

View File

@ -0,0 +1,14 @@
from typing import Type, TypeVar
T = TypeVar("T")
class NoPublicConstructor(type):
def __call__(cls, *args: object, **kwargs: object) -> None:
raise TypeError(
f"{cls.__module__}.{cls.__qualname__} has no public constructor"
)
@property
def _create(cls: Type[T]) -> Type[T]:
return super().__call__ # type: ignore

View File

@ -0,0 +1,20 @@
from typing import Self
from telethon._impl.tl import types
from .meta import NoPublicConstructor
class PasswordToken(metaclass=NoPublicConstructor):
__slots__ = ("_password",)
def __init__(self, password: types.account.Password) -> None:
self._password = password
@classmethod
def _new(cls, password: types.account.Password) -> Self:
return cls._create(password)
@property
def hint(self) -> str:
return self._password.hint or ""

View File

@ -1,7 +1,7 @@
import os import os
import random import random
from pytest import mark
from pytest import mark
from telethon._impl.client.client.client import Client from telethon._impl.client.client.client import Client
from telethon._impl.client.client.net import Config from telethon._impl.client.client.net import Config
from telethon._impl.session.message_box.defs import Session from telethon._impl.session.message_box.defs import Session