Cache exported Sessions instead whole clients

This commit is contained in:
Lonami Exo 2017-09-30 16:32:10 +02:00
parent c1c6df9fd0
commit a35c4b15db

View File

@ -106,10 +106,10 @@ class TelegramBareClient:
# we only want one to actually perform the reconnection.
self._connect_lock = Lock()
# Cache "exported" senders 'dc_id: TelegramBareClient' and
# their corresponding sessions not to recreate them all
# the time since it's a (somewhat expensive) process.
self._cached_clients = {}
# Cache "exported" sessions as 'dc_id: Session' not to recreate
# them all the time since generating a new key is a relatively
# expensive operation.
self._exported_sessions = {}
# This member will process updates if enabled.
# One may change self.updates.enabled at any later point.
@ -154,14 +154,22 @@ class TelegramBareClient:
# region Connecting
def connect(self, exported_auth=None):
def connect(self, _exported_auth=None, _sync_updates=True):
"""Connects to the Telegram servers, executing authentication if
required. Note that authenticating to the Telegram servers is
not the same as authenticating the desired user itself, which
may require a call (or several) to 'sign_in' for the first time.
If 'exported_auth' is not None, it will be used instead to
Note that the optional parameters are meant for internal use.
If '_exported_auth' is not None, it will be used instead to
determine the authorization key for the current session.
If '_sync_updates', sync_updates() will be called and a
second thread will be started if necessary. Note that this
will FAIL if the client is not connected to the user's
native data center, raising a "UserMigrateError", and
calling .disconnect() in the process.
"""
self._main_thread_ident = threading.get_ident()
@ -183,17 +191,17 @@ class TelegramBareClient:
init_connection = self.session.layer != LAYER
if init_connection:
if exported_auth is not None:
if _exported_auth is not None:
self._init_connection(ImportAuthorizationRequest(
exported_auth.id, exported_auth.bytes
_exported_auth.id, _exported_auth.bytes
))
else:
TelegramBareClient._dc_options = \
self._init_connection(GetConfigRequest()).dc_options
elif exported_auth is not None:
elif _exported_auth is not None:
self(ImportAuthorizationRequest(
exported_auth.id, exported_auth.bytes
_exported_auth.id, _exported_auth.bytes
))
if TelegramBareClient._dc_options is None:
@ -201,11 +209,11 @@ class TelegramBareClient:
self(GetConfigRequest()).dc_options
# Connection was successful! Try syncing the update state
# IF we don't have an exported authorization (hence we're
# not in our NATIVE data center or we'd get UserMigrateError)
# UNLESS '_sync_updates' is False (we probably are in
# another data center and this would raise UserMigrateError)
# to also assert whether the user is logged in or not.
self._user_connected = True
if not exported_auth:
if _sync_updates:
try:
self.sync_updates()
self._set_connected_and_authorized()
@ -218,7 +226,10 @@ class TelegramBareClient:
# This is fine, probably layer migration
self._logger.debug('Found invalid item, probably migrating', e)
self.disconnect()
return self.connect(exported_auth=exported_auth)
return self.connect(
_exported_auth=_exported_auth,
_sync_updates=_sync_updates
)
except (RPCError, ConnectionError) as error:
# Probably errors from the previous session, ignore them
@ -258,11 +269,8 @@ class TelegramBareClient:
# ._recv_thread = None, it knows it doesn't have to.
self._sender.disconnect()
# Also disconnect all the cached senders
for sender in self._cached_clients.values():
sender.disconnect()
self._cached_clients.clear()
# TODO Shall we clear the _exported_sessions, or may be reused?
pass
def _reconnect(self, new_dc=None):
"""If 'new_dc' is not set, only a call to .connect() will be made
@ -324,30 +332,22 @@ class TelegramBareClient:
TelegramBareClient._dc_options = self(GetConfigRequest()).dc_options
return self._get_dc(dc_id, ipv6=ipv6, cdn=cdn)
def _get_exported_client(self, dc_id,
init_connection=False,
bypass_cache=False):
"""Gets a cached exported TelegramBareClient for the desired DC.
def _get_exported_client(self, dc_id):
"""Creates and connects a new TelegramBareClient for the desired DC.
If it's the first time retrieving the TelegramBareClient, the
current authorization is exported to the new DC so that
it can be used there, and the connection is initialized.
If after using the sender a ConnectionResetError is raised,
this method should be called again with init_connection=True
in order to perform the reconnection.
If bypass_cache is True, a new client will be exported and
it will not be cached.
If it's the first time calling the method with a given dc_id,
a new session will be first created, and its auth key generated.
Exporting/Importing the authorization will also be done so that
the auth is bound with the key.
"""
# Thanks badoualy/kotlogram on /telegram/api/DefaultTelegramClient.kt
# for clearly showing how to export the authorization! ^^
client = self._cached_clients.get(dc_id)
if client and not bypass_cache:
if init_connection:
client.reconnect()
return client
session = self._exported_sessions.get(dc_id)
if session:
export_auth = None # Already bound with the auth key
else:
# TODO Add a lock, don't allow two threads to create an auth key
# for the same data center.
dc = self._get_dc(dc_id)
# Export the current authorization to the new DC.
@ -361,17 +361,15 @@ class TelegramBareClient:
session = Session(self.session)
session.server_address = dc.ip_address
session.port = dc.port
client = TelegramBareClient(
session, self.api_id, self.api_hash,
proxy=self._sender.connection.conn.proxy,
timeout=self._sender.connection.get_timeout()
)
client.connect(exported_auth=export_auth)
self._exported_sessions[dc_id] = session
if not bypass_cache:
# Don't go through this expensive process every time.
self._cached_clients[dc_id] = client
return client
client = TelegramBareClient(
session, self.api_id, self.api_hash,
proxy=self._sender.connection.conn.proxy,
timeout=self._sender.connection.get_timeout()
)
client.connect(_exported_auth=export_auth, _sync_updates=False)
return client
# endregion
@ -672,6 +670,9 @@ class TelegramBareClient:
if progress_callback:
progress_callback(f.tell(), file_size)
finally:
if client != self:
client.disconnect()
if cdn_decrypter:
try:
cdn_decrypter.client.disconnect()