mirror of
synced 2025-02-14 02:20:59 +03:00
The session_state cache can be used instead. This does put get_me with input_peer at a disadvantage, but I expect this is not used all that often, since 'me' does just fine.
437 lines
14 KiB
437 lines
14 KiB
import getpass
import inspect
import os
import sys
import typing
import warnings
import functools
from .._misc import utils, helpers, password as pwd_mod
from .. import errors, _tl
from ..types import _custom
if typing.TYPE_CHECKING:
from .telegramclient import TelegramClient
class StartingClient:
def __init__(self, client, start_fn):
self.client = client
self.start_fn = start_fn
async def __aenter__(self):
await self.start_fn()
return self.client
async def __aexit__(self, *args):
await self.client.__aexit__(*args)
def __await__(self):
return self.__aenter__().__await__()
def start(
self: 'TelegramClient',
phone: typing.Callable[[], str] = lambda: input('Please enter your phone (or bot token): '),
password: typing.Callable[[], str] = lambda: getpass.getpass('Please enter your password: '),
bot_token: str = None,
force_sms: bool = False,
code_callback: typing.Callable[[], typing.Union[str, int]] = None,
first_name: str = 'New User',
last_name: str = '',
max_attempts: int = 3) -> 'TelegramClient':
if code_callback is None:
def code_callback():
return input('Please enter the code you received: ')
elif not callable(code_callback):
raise ValueError(
'The code_callback parameter needs to be a callable '
'function that returns the code you received by Telegram.'
if not phone and not bot_token:
raise ValueError('No phone number or bot token provided.')
if phone and bot_token and not callable(phone):
raise ValueError('Both a phone and a bot token provided, '
'must only provide one of either')
return StartingClient(self, functools.partial(_start,
async def _start(
self: 'TelegramClient', phone, password, bot_token, force_sms,
code_callback, first_name, last_name, max_attempts):
if not self.is_connected():
await self.connect()
# Rather than using `is_user_authorized`, use `get_me`. While this is
# more expensive and needs to retrieve more data from the server, it
# enables the library to warn users trying to login to a different
# account. See #1172.
me = await self.get_me()
if me is not None:
# The warnings here are on a best-effort and may fail.
if bot_token:
# bot_token's first part has the bot ID, but it may be invalid
# so don't try to parse as int (instead cast our ID to string).
if bot_token[:bot_token.find(':')] != str(me.id):
'the session already had an authorized user so it did '
'not login to the bot account using the provided '
'bot_token (it may not be using the user you expect)'
elif phone and not callable(phone) and utils.parse_phone(phone) != me.phone:
'the session already had an authorized user so it did '
'not login to the user account using the provided '
'phone (it may not be using the user you expect)'
return self
if not bot_token:
# Turn the callable into a valid phone number (or bot token)
while callable(phone):
value = phone()
if inspect.isawaitable(value):
value = await value
if ':' in value:
# Bot tokens have 'user_id:access_hash' format
bot_token = value
phone = utils.parse_phone(value) or phone
if bot_token:
await self.sign_in(bot_token=bot_token)
return self
me = None
attempts = 0
two_step_detected = False
await self.send_code_request(phone, force_sms=force_sms)
sign_up = False # assume login
while attempts < max_attempts:
value = code_callback()
if inspect.isawaitable(value):
value = await value
# Since sign-in with no code works (it sends the code)
# we must double-check that here. Else we'll assume we
# logged in, and it will return None as the User.
if not value:
raise errors.PhoneCodeEmptyError(request=None)
if sign_up:
me = await self.sign_up(value, first_name, last_name)
# Raises SessionPasswordNeededError if 2FA enabled
me = await self.sign_in(phone, code=value)
except errors.SessionPasswordNeededError:
two_step_detected = True
except errors.PhoneNumberOccupiedError:
sign_up = False
except errors.PhoneNumberUnoccupiedError:
sign_up = True
except (errors.PhoneCodeEmptyError,
print('Invalid code. Please try again.', file=sys.stderr)
attempts += 1
raise RuntimeError(
'{} consecutive sign-in attempts failed. Aborting'
if two_step_detected:
if not password:
raise ValueError(
"Two-step verification is enabled for this account. "
"Please provide the 'password' argument to 'start()'."
if callable(password):
for _ in range(max_attempts):
value = password()
if inspect.isawaitable(value):
value = await value
me = await self.sign_in(phone=phone, password=value)
except errors.PasswordHashInvalidError:
print('Invalid password. Please try again',
raise errors.PasswordHashInvalidError(request=None)
me = await self.sign_in(phone=phone, password=password)
# We won't reach here if any step failed (exit by exception)
signed, name = 'Signed in successfully as', utils.get_display_name(me)
print(signed, name)
except UnicodeEncodeError:
# Some terminals don't support certain characters
print(signed, name.encode('utf-8', errors='ignore')
.decode('ascii', errors='ignore'))
return self
def _parse_phone_and_hash(self, phone, phone_hash):
Helper method to both parse and validate phone and its hash.
phone = utils.parse_phone(phone) or self._phone
if not phone:
raise ValueError(
'Please make sure to call send_code_request first.'
phone_hash = phone_hash or self._phone_code_hash.get(phone, None)
if not phone_hash:
raise ValueError('You also need to provide a phone_code_hash.')
return phone, phone_hash
async def sign_in(
self: 'TelegramClient',
phone: str = None,
code: typing.Union[str, int] = None,
password: str = None,
bot_token: str = None,
phone_code_hash: str = None) -> 'typing.Union[_tl.User, _tl.auth.SentCode]':
me = await self.get_me()
if me:
return me
if phone and code:
phone, phone_code_hash = \
_parse_phone_and_hash(self, phone, phone_code_hash)
# May raise PhoneCodeEmptyError, PhoneCodeExpiredError,
# PhoneCodeHashEmptyError or PhoneCodeInvalidError.
request = _tl.fn.auth.SignIn(
phone, phone_code_hash, str(code)
elif password:
pwd = await self(_tl.fn.account.GetPassword())
request = _tl.fn.auth.CheckPassword(
pwd_mod.compute_check(pwd, password)
elif bot_token:
request = _tl.fn.auth.ImportBotAuthorization(
flags=0, bot_auth_token=bot_token,
api_id=self.api_id, api_hash=self.api_hash
raise ValueError('You must provide either phone and code, password, or bot_token.')
result = await self(request)
if isinstance(result, _tl.auth.AuthorizationSignUpRequired):
# Emulate pre-layer 104 behaviour
self._tos = result.terms_of_service
raise errors.PhoneNumberUnoccupiedError(request=request)
return await _update_session_state(self, result.user)
async def sign_up(
self: 'TelegramClient',
code: typing.Union[str, int],
first_name: str,
last_name: str = '',
phone: str = None,
phone_code_hash: str = None) -> '_tl.User':
me = await self.get_me()
if me:
return me
# To prevent abuse, one has to try to sign in before signing up. This
# is the current way in which Telegram validates the code to sign up.
# `sign_in` will set `_tos`, so if it's set we don't need to call it
# because the user already tried to sign in.
# We're emulating pre-layer 104 behaviour so except the right error:
if not self._tos:
return await self.sign_in(
except errors.PhoneNumberUnoccupiedError:
pass # code is correct and was used, now need to sign in
if self._tos and self._tos.text:
if self.parse_mode:
t = self.parse_mode.unparse(self._tos.text, self._tos.entities)
t = self._tos.text
phone, phone_code_hash = \
_parse_phone_and_hash(self, phone, phone_code_hash)
result = await self(_tl.fn.auth.SignUp(
if self._tos:
await self(
return await _update_session_state(self, result.user)
async def _update_session_state(self, user, save=True):
Callback called whenever the login or sign up process completes.
Returns the input user parameter.
self._authorized = True
self._session_state.user_id = user.id
self._session_state.bot = user.bot
state = await self(_tl.fn.updates.GetState())
self._session_state.pts = state.pts
self._session_state.qts = state.qts
self._session_state.date = int(state.date.timestamp())
self._session_state.seq = state.seq
await self.session.set_state(self._session_state)
if save:
await self.session.save()
return user
async def send_code_request(
self: 'TelegramClient',
phone: str,
force_sms: bool = False) -> '_tl.auth.SentCode':
result = None
phone = utils.parse_phone(phone) or self._phone
phone_hash = self._phone_code_hash.get(phone)
if not phone_hash:
result = await self(_tl.fn.auth.SendCode(
phone, self.api_id, self.api_hash, _tl.CodeSettings()))
except errors.AuthRestartError:
return await self.send_code_request(phone, force_sms=force_sms)
# If we already sent a SMS, do not resend the code (hash may be empty)
if isinstance(result.type, _tl.auth.SentCodeTypeSms):
force_sms = False
# phone_code_hash may be empty, if it is, do not save it (#1283)
if result.phone_code_hash:
self._phone_code_hash[phone] = phone_hash = result.phone_code_hash
force_sms = True
self._phone = phone
if force_sms:
result = await self(
_tl.fn.auth.ResendCode(phone, phone_hash))
self._phone_code_hash[phone] = result.phone_code_hash
return result
async def qr_login(self: 'TelegramClient', ignored_ids: typing.List[int] = None) -> _custom.QRLogin:
qr_login = _custom.QRLogin(self, ignored_ids or [])
await qr_login.recreate()
return qr_login
async def log_out(self: 'TelegramClient') -> bool:
await self(_tl.fn.auth.LogOut())
except errors.RPCError:
return False
self._authorized = False
await self.disconnect()
return True
async def edit_2fa(
self: 'TelegramClient',
current_password: str = None,
new_password: str = None,
hint: str = '',
email: str = None,
email_code_callback: typing.Callable[[int], str] = None) -> bool:
if new_password is None and current_password is None:
return False
if email and not callable(email_code_callback):
raise ValueError('email present without email_code_callback')
pwd = await self(_tl.fn.account.GetPassword())
pwd.new_algo.salt1 += os.urandom(32)
assert isinstance(pwd, _tl.account.Password)
if not pwd.has_password and current_password:
current_password = None
if current_password:
password = pwd_mod.compute_check(pwd, current_password)
password = _tl.InputCheckPasswordEmpty()
if new_password:
new_password_hash = pwd_mod.compute_digest(
pwd.new_algo, new_password)
new_password_hash = b''
await self(_tl.fn.account.UpdatePasswordSettings(
except errors.EmailUnconfirmedError as e:
code = email_code_callback(e.code_length)
if inspect.isawaitable(code):
code = await code
code = str(code)
await self(_tl.fn.account.ConfirmPasswordEmail(code))
return True