Telethon/telethon/_client/auth.py
Lonami Exo f8264abb5a Clean-up client's __init__ and remove entity cache
Entity cache uses are removed. It was a source of ever-growing memory
usage that has to be reworked. This affects everything that tried to
obtain an input entity, input sender or input chat (such as the
SenderGetter or calls to _get_entity_pair). Input entities need to be
reworked in any case.

Its removal also affects the automatic cache of any raw API request.

Raise last error parameter is removed, and its behaviour made default.

The connection type parameter has been removed, since users really have
no need to change it.

A few more attributes have been made private, since users should not
mess with those.
2022-01-18 12:56:17 +01:00

434 lines
14 KiB
Python

import getpass
import inspect
import os
import sys
import typing
import warnings
import functools
import dataclasses
from .._misc import utils, helpers, password as pwd_mod
from .. import errors, _tl
from ..types import _custom
if typing.TYPE_CHECKING:
from .telegramclient import TelegramClient
class StartingClient:
    """
    Wrapper that makes a deferred client start both awaitable and usable
    as an asynchronous context manager.

    Awaiting an instance, or entering it with ``async with``, awaits
    ``start_fn()`` and then produces ``client``.
    """

    def __init__(self, client, start_fn):
        self.start_fn = start_fn
        self.client = client

    def __await__(self):
        # Awaiting behaves exactly like entering the context manager.
        return self.__aenter__().__await__()

    async def __aenter__(self):
        await self.start_fn()
        return self.client

    async def __aexit__(self, *exc_info):
        # Clean-up is delegated to the wrapped client. Its return value is
        # not propagated, so this method always returns ``None``.
        await self.client.__aexit__(*exc_info)
def start(
        self: 'TelegramClient',
        phone: typing.Callable[[], str] = lambda: input('Please enter your phone (or bot token): '),
        password: typing.Callable[[], str] = lambda: getpass.getpass('Please enter your password: '),
        *,
        bot_token: str = None,
        code_callback: typing.Callable[[], typing.Union[str, int]] = None,
        first_name: str = 'New User',
        last_name: str = '',
        max_attempts: int = 3) -> 'TelegramClient':
    """
    Validate the login arguments and return a deferred start of the client.

    This function itself is synchronous: it only checks the arguments and
    wraps a call to `_start` in a `StartingClient`, which performs the
    actual work once it is awaited or entered with ``async with``.

    When `code_callback` is omitted, the code is read with `input`.

    Raises `ValueError` if `code_callback` is given but not callable, if
    neither `phone` nor `bot_token` is provided, or if both a non-callable
    `phone` and a `bot_token` are provided.
    """
    # The callback is checked first, before the phone and token arguments.
    if code_callback is not None:
        if not callable(code_callback):
            raise ValueError(
                'The code_callback parameter needs to be a callable '
                'function that returns the code you received by Telegram.'
            )
    else:
        def ask_for_code():
            return input('Please enter the code you received: ')

        code_callback = ask_for_code

    if not (phone or bot_token):
        raise ValueError('No phone number or bot token provided.')

    # A callable phone is allowed next to a bot token, since it is only a
    # prompt; a concrete phone next to a token is ambiguous.
    if phone and bot_token and not callable(phone):
        raise ValueError('Both a phone and a bot token provided, '
                         'must only provide one of either')

    deferred_start = functools.partial(
        _start,
        self=self,
        phone=phone,
        password=password,
        bot_token=bot_token,
        code_callback=code_callback,
        first_name=first_name,
        last_name=last_name,
        max_attempts=max_attempts,
    )
    return StartingClient(self, deferred_start)
async def _start(
        self: 'TelegramClient', phone, password, bot_token,
        code_callback, first_name, last_name, max_attempts):
    """
    Interactive login flow that backs `start`.

    Connects if needed. If the session already has an authorized user it
    returns right away, warning on a best-effort basis when that user does
    not look like the one requested. Otherwise it signs in with a bot
    token if one is available, or requests a login code and asks
    `code_callback` for it up to `max_attempts` times, switching to
    sign-up when the number is unoccupied and to `password` when two-step
    verification is enabled.

    `phone` and `password` may be plain values or callables; any callable
    (including `code_callback`) may return an awaitable.

    Returns `self`.

    Raises `RuntimeError` when all code attempts fail, `ValueError` when a
    password is required but none was provided, and
    `errors.PasswordHashInvalidError` when all password attempts fail.
    """
    if not self.is_connected():
        await self.connect()

    # Rather than using `is_user_authorized`, use `get_me`. While this is
    # more expensive and needs to retrieve more data from the server, it
    # enables the library to warn users trying to login to a different
    # account. See #1172.
    me = await self.get_me()
    if me is not None:
        # The warnings here are on a best-effort and may fail.
        if bot_token:
            # bot_token's first part has the bot ID, but it may be invalid
            # so don't try to parse as int (instead cast our ID to string).
            #
            # `partition` is used instead of slicing up to `find(':')`:
            # `find` returns -1 when there is no colon, which would drop
            # the token's last character before comparing.
            if bot_token.partition(':')[0] != str(me.id):
                warnings.warn(
                    'the session already had an authorized user so it did '
                    'not login to the bot account using the provided '
                    'bot_token (it may not be using the user you expect)'
                )
        elif phone and not callable(phone) and utils.parse_phone(phone) != me.phone:
            warnings.warn(
                'the session already had an authorized user so it did '
                'not login to the user account using the provided '
                'phone (it may not be using the user you expect)'
            )

        return self

    if not bot_token:
        # Turn the callable into a valid phone number (or bot token)
        while callable(phone):
            value = phone()
            if inspect.isawaitable(value):
                value = await value

            if ':' in value:
                # Bot tokens have 'user_id:access_hash' format
                bot_token = value
                break

            # Keep asking (phone stays callable) until the value parses.
            phone = utils.parse_phone(value) or phone

    if bot_token:
        await self.sign_in(bot_token=bot_token)
        return self

    me = None
    attempts = 0
    two_step_detected = False

    await self.send_code_request(phone)
    sign_up = False  # assume login
    while attempts < max_attempts:
        try:
            value = code_callback()
            if inspect.isawaitable(value):
                value = await value

            # Since sign-in with no code works (it sends the code)
            # we must double-check that here. Else we'll assume we
            # logged in, and it will return None as the User.
            if not value:
                raise errors.PhoneCodeEmptyError(request=None)

            if sign_up:
                me = await self.sign_up(value, first_name, last_name)
            else:
                # Raises SessionPasswordNeededError if 2FA enabled
                me = await self.sign_in(phone, code=value)
            break
        except errors.SessionPasswordNeededError:
            two_step_detected = True
            break
        except errors.PhoneNumberOccupiedError:
            sign_up = False
        except errors.PhoneNumberUnoccupiedError:
            sign_up = True
        except (errors.PhoneCodeEmptyError,
                errors.PhoneCodeExpiredError,
                errors.PhoneCodeHashEmptyError,
                errors.PhoneCodeInvalidError):
            print('Invalid code. Please try again.', file=sys.stderr)

        attempts += 1
    else:
        # Only reached when the loop ran out of attempts without `break`.
        raise RuntimeError(
            '{} consecutive sign-in attempts failed. Aborting'
            .format(max_attempts)
        )

    if two_step_detected:
        if not password:
            raise ValueError(
                "Two-step verification is enabled for this account. "
                "Please provide the 'password' argument to 'start()'."
            )

        if callable(password):
            for _ in range(max_attempts):
                try:
                    value = password()
                    if inspect.isawaitable(value):
                        value = await value

                    me = await self.sign_in(phone=phone, password=value)
                    break
                except errors.PasswordHashInvalidError:
                    print('Invalid password. Please try again',
                          file=sys.stderr)
            else:
                raise errors.PasswordHashInvalidError(request=None)
        else:
            me = await self.sign_in(phone=phone, password=password)

    # We won't reach here if any step failed (exit by exception)
    signed, name = 'Signed in successfully as', utils.get_display_name(me)
    try:
        print(signed, name)
    except UnicodeEncodeError:
        # Some terminals don't support certain characters
        print(signed, name.encode('utf-8', errors='ignore')
                          .decode('ascii', errors='ignore'))

    return self
def _parse_phone_and_hash(self, phone, phone_hash):
    """
    Resolve the phone number and phone code hash to use for a request.

    The phone is parsed with `utils.parse_phone`, falling back to
    ``self._phone`` when that yields nothing. The hash falls back to the
    entry stored for the resolved phone in ``self._phone_code_hash``.

    Returns a ``(phone, phone_hash)`` tuple.

    Raises `ValueError` when either value cannot be resolved.
    """
    resolved_phone = utils.parse_phone(phone)
    if not resolved_phone:
        resolved_phone = self._phone

    if not resolved_phone:
        raise ValueError(
            'Please make sure to call send_code_request first.'
        )

    # Only look the hash up when the caller did not provide one.
    if phone_hash:
        resolved_hash = phone_hash
    else:
        resolved_hash = self._phone_code_hash.get(resolved_phone, None)

    if not resolved_hash:
        raise ValueError('You also need to provide a phone_code_hash.')

    return resolved_phone, resolved_hash
async def sign_in(
        self: 'TelegramClient',
        phone: str = None,
        code: typing.Union[str, int] = None,
        *,
        password: str = None,
        bot_token: str = None,
        phone_code_hash: str = None) -> 'typing.Union[_tl.User, _tl.auth.SentCode]':
    """
    Log in with a phone and code, with a password, or with a bot token.

    If `get_me` already returns a user, that user is returned and nothing
    else is done. Otherwise exactly one request is built, checked in this
    order: phone and code, then `password`, then `bot_token`.

    Returns the user after the session state has been updated.

    Raises `ValueError` when no usable combination of arguments is given,
    and `errors.PhoneNumberUnoccupiedError` when the server answers that a
    sign-up is required (the terms of service are kept in ``self._tos``).
    """
    already_logged_in = await self.get_me()
    if already_logged_in:
        return already_logged_in

    use_code = bool(phone and code)
    if not (use_code or password or bot_token):
        raise ValueError('You must provide either phone and code, password, or bot_token.')

    if use_code:
        phone, phone_code_hash = _parse_phone_and_hash(
            self, phone, phone_code_hash)

        # May raise PhoneCodeEmptyError, PhoneCodeExpiredError,
        # PhoneCodeHashEmptyError or PhoneCodeInvalidError.
        request = _tl.fn.auth.SignIn(phone, phone_code_hash, str(code))
    elif password:
        account_password = await self(_tl.fn.account.GetPassword())
        check = pwd_mod.compute_check(account_password, password)
        request = _tl.fn.auth.CheckPassword(check)
    else:
        request = _tl.fn.auth.ImportBotAuthorization(
            flags=0,
            bot_auth_token=bot_token,
            api_id=self._api_id,
            api_hash=self._api_hash,
        )

    result = await self(request)
    if not isinstance(result, _tl.auth.AuthorizationSignUpRequired):
        return await _update_session_state(self, result.user)

    # Emulate pre-layer 104 behaviour
    self._tos = result.terms_of_service
    raise errors.PhoneNumberUnoccupiedError(request=request)
async def sign_up(
        self: 'TelegramClient',
        code: typing.Union[str, int],
        first_name: str,
        last_name: str = '',
        *,
        phone: str = None,
        phone_code_hash: str = None) -> '_tl.User':
    """
    Register a new account with the given name, using a received code.

    If `get_me` already returns a user, that user is returned and nothing
    else is done. If no terms of service are stored in ``self._tos`` yet,
    a sign-in is attempted first; should it succeed, its user is returned.
    Any stored terms of service text is written to standard error, and the
    terms are accepted after the sign-up request.

    `phone` and `phone_code_hash` are optional; when omitted, the values
    remembered by this client are used.

    Returns the new user after the session state has been updated.

    Raises `ValueError` when the phone or its code hash cannot be
    resolved.
    """
    me = await self.get_me()
    if me:
        return me

    # To prevent abuse, one has to try to sign in before signing up. This
    # is the current way in which Telegram validates the code to sign up.
    #
    # `sign_in` will set `_tos`, so if it's set we don't need to call it
    # because the user already tried to sign in.
    #
    # We're emulating pre-layer 104 behaviour so except the right error:
    if not self._tos:
        try:
            return await self.sign_in(
                # `sign_in` only takes the code path when it receives a
                # phone, so fall back to the remembered one here. Without
                # this, omitting `phone` would always fail with ValueError.
                phone=phone or self._phone,
                code=code,
                phone_code_hash=phone_code_hash,
            )
        except errors.PhoneNumberUnoccupiedError:
            pass  # code is correct and was used, now need to sign up

    if self._tos and self._tos.text:
        if self.parse_mode:
            t = self.parse_mode.unparse(self._tos.text, self._tos.entities)
        else:
            t = self._tos.text
        sys.stderr.write("{}\n".format(t))
        sys.stderr.flush()

    phone, phone_code_hash = \
        _parse_phone_and_hash(self, phone, phone_code_hash)

    result = await self(_tl.fn.auth.SignUp(
        phone_number=phone,
        phone_code_hash=phone_code_hash,
        first_name=first_name,
        last_name=last_name
    ))

    if self._tos:
        await self(
            _tl.fn.help.AcceptTermsOfService(self._tos.id))

    return await _update_session_state(self, result.user)
async def _update_session_state(self, user, save=True):
    """
    Callback called whenever the login or sign up process completes.

    Fetches the current updates state and stores it in the session state,
    together with the ID of `user` and whether it is a bot. `save` is
    forwarded to `_replace_session_state`.

    Returns the input user parameter.
    """
    updates_state = await self(_tl.fn.updates.GetState())

    changes = dict(
        user_id=user.id,
        bot=user.bot,
        pts=updates_state.pts,
        qts=updates_state.qts,
        # The session state stores the date as an integer timestamp.
        date=int(updates_state.date.timestamp()),
        seq=updates_state.seq,
    )
    await _replace_session_state(self, save=save, **changes)

    return user
async def _replace_session_state(self, *, save=True, **changes):
    """
    Replace fields of the session state and hand the result to the session.

    A copy of ``self._session_state`` with `changes` applied is passed to
    ``self._session.set_state`` and then becomes the new
    ``self._session_state``. The session is saved afterwards unless `save`
    is false.
    """
    updated_state = dataclasses.replace(self._session_state, **changes)

    await self._session.set_state(updated_state)
    # Only swap the in-memory state once the session accepted the new one.
    self._session_state = updated_state

    if not save:
        return

    await self._session.save()
async def send_code_request(
        self: 'TelegramClient',
        phone: str) -> '_tl.auth.SentCode':
    """
    Ask the server to send (or resend) the login code for `phone`.

    The phone is parsed with `utils.parse_phone`, falling back to
    ``self._phone``. If a code hash is already stored for that phone the
    code is resent with it; otherwise a new code is requested, retrying
    when the server answers with `errors.AuthRestartError`.

    The returned hash is stored in ``self._phone_code_hash`` only when it
    is not empty, and the phone is remembered in ``self._phone``.

    Returns the result of the request.
    """
    phone = utils.parse_phone(phone) or self._phone
    phone_hash = self._phone_code_hash.get(phone)

    if phone_hash:
        result = await self(
            _tl.fn.auth.ResendCode(phone, phone_hash))
    else:
        try:
            result = await self(_tl.fn.auth.SendCode(
                phone, self._api_id, self._api_hash, _tl.CodeSettings()))
        except errors.AuthRestartError:
            return await self.send_code_request(phone)

    # phone_code_hash may be empty, if it is, do not save it (#1283).
    # This applies to both branches, so a resend that returns an empty
    # hash does not overwrite the one already stored.
    if result.phone_code_hash:
        self._phone_code_hash[phone] = result.phone_code_hash

    self._phone = phone

    return result
async def qr_login(self: 'TelegramClient', ignored_ids: typing.List[int] = None) -> _custom.QRLogin:
    """
    Create a `_custom.QRLogin` for this client and return it.

    `ignored_ids` is passed on to the `QRLogin`; an empty list is used
    when it is omitted or empty. `recreate` is awaited on the new object
    before it is returned.
    """
    ids_to_ignore = ignored_ids or []
    login = _custom.QRLogin(self, ids_to_ignore)
    await login.recreate()
    return login
async def log_out(self: 'TelegramClient') -> bool:
    """
    Log out of the current account and disconnect.

    Returns `False` if the log-out request fails with `errors.RPCError`,
    in which case nothing else is done. Otherwise ``self._state_cache`` is
    reset, the client is disconnected, and `True` is returned.
    """
    try:
        await self(_tl.fn.auth.LogOut())
    except errors.RPCError:
        return False
    else:
        # Only clean up local state once the server accepted the log-out.
        self._state_cache.reset()
        await self.disconnect()
        return True
async def edit_2fa(
        self: 'TelegramClient',
        current_password: str = None,
        new_password: str = None,
        *,
        hint: str = '',
        email: str = None,
        email_code_callback: typing.Callable[[int], str] = None) -> bool:
    """
    Change the two-step verification password settings of the account.

    `current_password` is ignored when the account reports it has no
    password. An empty `new_password` sends an empty password hash.

    If the server answers with `errors.EmailUnconfirmedError`,
    `email_code_callback` is called with the expected code length (it may
    return an awaitable) and the resulting code is used to confirm the
    e-mail.

    Returns `False` without contacting the server when both passwords are
    `None`, and `True` otherwise.

    Raises `ValueError` when `email` is given without a callable
    `email_code_callback`.
    """
    if new_password is None and current_password is None:
        return False

    if email and not callable(email_code_callback):
        raise ValueError('email present without email_code_callback')

    pwd = await self(_tl.fn.account.GetPassword())
    # Check the type before using the object, not after mutating it.
    assert isinstance(pwd, _tl.account.Password)
    pwd.new_algo.salt1 += os.urandom(32)

    if not pwd.has_password and current_password:
        current_password = None

    if current_password:
        password = pwd_mod.compute_check(pwd, current_password)
    else:
        password = _tl.InputCheckPasswordEmpty()

    if new_password:
        new_password_hash = pwd_mod.compute_digest(
            pwd.new_algo, new_password)
    else:
        new_password_hash = b''

    try:
        await self(_tl.fn.account.UpdatePasswordSettings(
            password=password,
            new_settings=_tl.account.PasswordInputSettings(
                new_algo=pwd.new_algo,
                new_password_hash=new_password_hash,
                hint=hint,
                email=email,
                new_secure_settings=None
            )
        ))
    except errors.EmailUnconfirmedError as e:
        # Without a callback there is no way to obtain the code, so let
        # the original error through instead of failing on a None call.
        if not callable(email_code_callback):
            raise

        code = email_code_callback(e.code_length)
        if inspect.isawaitable(code):
            code = await code

        code = str(code)
        await self(_tl.fn.account.ConfirmPasswordEmail(code))

    return True