mirror of
https://github.com/LonamiWebs/Telethon.git
synced 2024-11-25 19:03:46 +03:00
Move auth_key generation and InitConnection logic to .invoke()
The reasoning behind this is that .connect() should not call any request at all, it should only connect to the servers although it currently still calls GetStateRequest. There were some issues (#291, #360) where the auth_key was None (possibly due to .connect() returning False), so this may fix some of the cases where it returned False. This way we also ensure that we always have an auth_key, or even if it "breaks" (it's not the right key for the server anymore). A few additional changes have been introduced to accommodate this, such as moving InitConnection logic too or importing auths.
This commit is contained in:
parent
b3ca68b7d9
commit
ceb37cd4c5
|
@ -33,6 +33,7 @@ from .tl.functions.upload import (
|
|||
GetFileRequest, SaveBigFilePartRequest, SaveFilePartRequest
|
||||
)
|
||||
from .tl.types import InputFile, InputFileBig
|
||||
from .tl.types.auth import ExportedAuthorization
|
||||
from .tl.types.upload import FileCdnRedirect
|
||||
from .update_state import UpdateState
|
||||
from .utils import get_appropriated_part_size
|
||||
|
@ -62,7 +63,7 @@ class TelegramBareClient:
|
|||
__version__ = '0.15.3'
|
||||
|
||||
# TODO Make this thread-safe, all connections share the same DC
|
||||
_dc_options = None
|
||||
_config = None # Server configuration (with .dc_options)
|
||||
|
||||
# region Initialization
|
||||
|
||||
|
@ -161,7 +162,7 @@ class TelegramBareClient:
|
|||
|
||||
# region Connecting
|
||||
|
||||
def connect(self, _exported_auth=None, _sync_updates=True, _cdn=False):
|
||||
def connect(self, _sync_updates=True):
|
||||
"""Connects to the Telegram servers, executing authentication if
|
||||
required. Note that authenticating to the Telegram servers is
|
||||
not the same as authenticating the desired user itself, which
|
||||
|
@ -169,62 +170,24 @@ class TelegramBareClient:
|
|||
|
||||
Note that the optional parameters are meant for internal use.
|
||||
|
||||
If '_exported_auth' is not None, it will be used instead to
|
||||
determine the authorization key for the current session.
|
||||
|
||||
If '_sync_updates', sync_updates() will be called and a
|
||||
second thread will be started if necessary. Note that this
|
||||
will FAIL if the client is not connected to the user's
|
||||
native data center, raising a "UserMigrateError", and
|
||||
calling .disconnect() in the process.
|
||||
|
||||
If '_cdn' is False, methods that are not allowed on such data
|
||||
centers won't be invoked.
|
||||
"""
|
||||
self._main_thread_ident = threading.get_ident()
|
||||
self._background_error = None # Clear previous errors
|
||||
|
||||
try:
|
||||
self._sender.connect()
|
||||
if not self.session.auth_key:
|
||||
# New key, we need to tell the server we're going to use
|
||||
# the latest layer
|
||||
try:
|
||||
self.session.auth_key, self.session.time_offset = \
|
||||
authenticator.do_authentication(self._sender.connection)
|
||||
except BrokenAuthKeyError:
|
||||
return False
|
||||
|
||||
self.session.layer = LAYER
|
||||
self.session.save()
|
||||
init_connection = True
|
||||
else:
|
||||
init_connection = self.session.layer != LAYER
|
||||
|
||||
if init_connection:
|
||||
if _exported_auth is not None:
|
||||
self._init_connection(ImportAuthorizationRequest(
|
||||
_exported_auth.id, _exported_auth.bytes
|
||||
))
|
||||
elif not _cdn:
|
||||
TelegramBareClient._dc_options = \
|
||||
self._init_connection(GetConfigRequest()).dc_options
|
||||
|
||||
elif _exported_auth is not None:
|
||||
self(ImportAuthorizationRequest(
|
||||
_exported_auth.id, _exported_auth.bytes
|
||||
))
|
||||
|
||||
if TelegramBareClient._dc_options is None and not _cdn:
|
||||
TelegramBareClient._dc_options = \
|
||||
self(GetConfigRequest()).dc_options
|
||||
|
||||
# Connection was successful! Try syncing the update state
|
||||
# UNLESS '_sync_updates' is False (we probably are in
|
||||
# another data center and this would raise UserMigrateError)
|
||||
# to also assert whether the user is logged in or not.
|
||||
self._user_connected = True
|
||||
if self._authorized is None and _sync_updates and not _cdn:
|
||||
if self._authorized is None and _sync_updates:
|
||||
try:
|
||||
self.sync_updates()
|
||||
self._set_connected_and_authorized()
|
||||
|
@ -239,11 +202,7 @@ class TelegramBareClient:
|
|||
# This is fine, probably layer migration
|
||||
self._logger.debug('Found invalid item, probably migrating', e)
|
||||
self.disconnect()
|
||||
return self.connect(
|
||||
_exported_auth=_exported_auth,
|
||||
_sync_updates=_sync_updates,
|
||||
_cdn=_cdn
|
||||
)
|
||||
return self.connect(_sync_updates=_sync_updates)
|
||||
|
||||
except (RPCError, ConnectionError) as error:
|
||||
# Probably errors from the previous session, ignore them
|
||||
|
@ -256,8 +215,9 @@ class TelegramBareClient:
|
|||
def is_connected(self):
|
||||
return self._sender.is_connected()
|
||||
|
||||
def _init_connection(self, query=None):
|
||||
result = self(InvokeWithLayerRequest(LAYER, InitConnectionRequest(
|
||||
def _wrap_init_connection(self, query):
|
||||
"""Wraps query around InvokeWithLayerRequest(InitConnectionRequest())"""
|
||||
return InvokeWithLayerRequest(LAYER, InitConnectionRequest(
|
||||
api_id=self.api_id,
|
||||
device_model=self.session.device_model,
|
||||
system_version=self.session.system_version,
|
||||
|
@ -266,10 +226,7 @@ class TelegramBareClient:
|
|||
system_lang_code=self.session.system_lang_code,
|
||||
lang_pack='', # "langPacks are for official apps only"
|
||||
query=query
|
||||
)))
|
||||
self.session.layer = LAYER
|
||||
self.session.save()
|
||||
return result
|
||||
))
|
||||
|
||||
def disconnect(self):
|
||||
"""Disconnects from the Telegram server
|
||||
|
@ -308,13 +265,18 @@ class TelegramBareClient:
|
|||
except ConnectionResetError:
|
||||
return False
|
||||
else:
|
||||
self.disconnect()
|
||||
self.session.auth_key = None # Force creating new auth_key
|
||||
# Since we're reconnecting possibly due to a UserMigrateError,
|
||||
# we need to first know the Data Centers we can connect to. Do
|
||||
# that before disconnecting.
|
||||
dc = self._get_dc(new_dc)
|
||||
ip = dc.ip_address
|
||||
self.session.server_address = ip
|
||||
|
||||
self.session.server_address = dc.ip_address
|
||||
self.session.port = dc.port
|
||||
# auth_key's are associated with a server, which has now changed
|
||||
# so it's not valid anymore. Set to None to force recreating it.
|
||||
self.session.auth_key = None
|
||||
self.session.save()
|
||||
self.disconnect()
|
||||
return self.connect()
|
||||
|
||||
# endregion
|
||||
|
@ -327,10 +289,8 @@ class TelegramBareClient:
|
|||
|
||||
def _get_dc(self, dc_id, ipv6=False, cdn=False):
|
||||
"""Gets the Data Center (DC) associated to 'dc_id'"""
|
||||
if TelegramBareClient._dc_options is None:
|
||||
raise ConnectionError(
|
||||
'Cannot determine the required data center IP address. '
|
||||
'Stabilise a successful initial connection first.')
|
||||
if not TelegramBareClient._config:
|
||||
TelegramBareClient._config = self(GetConfigRequest())
|
||||
|
||||
try:
|
||||
if cdn:
|
||||
|
@ -339,15 +299,15 @@ class TelegramBareClient:
|
|||
rsa.add_key(pk.public_key)
|
||||
|
||||
return next(
|
||||
dc for dc in TelegramBareClient._dc_options if dc.id == dc_id
|
||||
and bool(dc.ipv6) == ipv6 and bool(dc.cdn) == cdn
|
||||
dc for dc in TelegramBareClient._config.dc_options
|
||||
if dc.id == dc_id and bool(dc.ipv6) == ipv6 and bool(dc.cdn) == cdn
|
||||
)
|
||||
except StopIteration:
|
||||
if not cdn:
|
||||
raise
|
||||
|
||||
# New configuration, perhaps a new CDN was added?
|
||||
TelegramBareClient._dc_options = self(GetConfigRequest()).dc_options
|
||||
TelegramBareClient._config = self(GetConfigRequest())
|
||||
return self._get_dc(dc_id, ipv6=ipv6, cdn=cdn)
|
||||
|
||||
def _get_exported_client(self, dc_id):
|
||||
|
@ -387,7 +347,14 @@ class TelegramBareClient:
|
|||
proxy=self._sender.connection.conn.proxy,
|
||||
timeout=self._sender.connection.get_timeout()
|
||||
)
|
||||
client.connect(_exported_auth=export_auth, _sync_updates=False)
|
||||
client.connect(_sync_updates=False)
|
||||
if isinstance(export_auth, ExportedAuthorization):
|
||||
client(ImportAuthorizationRequest(
|
||||
id=export_auth.id, bytes=export_auth.bytes
|
||||
))
|
||||
elif export_auth is not None:
|
||||
self._logger.warning('Unknown return export_auth type', export_auth)
|
||||
|
||||
client._authorized = True # We exported the auth, so we got auth
|
||||
return client
|
||||
|
||||
|
@ -409,9 +376,10 @@ class TelegramBareClient:
|
|||
|
||||
# This will make use of the new RSA keys for this specific CDN.
|
||||
#
|
||||
# This relies on the fact that TelegramBareClient._dc_options is
|
||||
# static and it won't be called from this DC (it would fail).
|
||||
client.connect(_cdn=True) # Avoid invoking non-CDN specific methods
|
||||
# We won't be calling GetConfigRequest because it's only called
|
||||
# when needed by ._get_dc, and also it's static so it's likely
|
||||
# set already. Avoid invoking non-CDN methods by not syncing updates.
|
||||
client.connect(_sync_updates=False)
|
||||
client._authorized = self._authorized
|
||||
return client
|
||||
|
||||
|
@ -472,12 +440,34 @@ class TelegramBareClient:
|
|||
invoke = __call__
|
||||
|
||||
def _invoke(self, sender, call_receive, update_state, *requests):
|
||||
# We need to specify the new layer (by initializing a new
|
||||
# connection) if it has changed from the latest known one.
|
||||
init_connection = self.session.layer != LAYER
|
||||
|
||||
try:
|
||||
# Ensure that we start with no previous errors (i.e. resending)
|
||||
for x in requests:
|
||||
x.confirm_received.clear()
|
||||
x.rpc_error = None
|
||||
|
||||
if not self.session.auth_key:
|
||||
# New key, we need to tell the server we're going to use
|
||||
# the latest layer and initialize the connection doing so.
|
||||
self.session.auth_key, self.session.time_offset = \
|
||||
authenticator.do_authentication(self._sender.connection)
|
||||
init_connection = True
|
||||
|
||||
if init_connection:
|
||||
if len(requests) == 1:
|
||||
requests = [self._wrap_init_connection(requests[0])]
|
||||
else:
|
||||
# We need a SINGLE request (like GetConfig) to init conn.
|
||||
# Once that's done, the N original requests will be
|
||||
# invoked.
|
||||
TelegramBareClient._config = self(
|
||||
self._wrap_init_connection(GetConfigRequest())
|
||||
)
|
||||
|
||||
sender.send(*requests)
|
||||
|
||||
if not call_receive:
|
||||
|
@ -493,6 +483,10 @@ class TelegramBareClient:
|
|||
while not all(x.confirm_received.is_set() for x in requests):
|
||||
sender.receive(update_state=update_state)
|
||||
|
||||
except BrokenAuthKeyError:
|
||||
self._logger.error('Broken auth key, a new one will be generated')
|
||||
self.session.auth_key = None
|
||||
|
||||
except TimeoutError:
|
||||
pass # We will just retry
|
||||
|
||||
|
@ -513,6 +507,12 @@ class TelegramBareClient:
|
|||
sleep(0.1) # Retry forever until we can send the request
|
||||
return None
|
||||
|
||||
if init_connection:
|
||||
# We initialized the connection successfully, even if
|
||||
# a request had an RPC error we have invoked it fine.
|
||||
self.session.layer = LAYER
|
||||
self.session.save()
|
||||
|
||||
try:
|
||||
raise next(x.rpc_error for x in requests if x.rpc_error)
|
||||
except StopIteration:
|
||||
|
|
Loading…
Reference in New Issue
Block a user