Allow invoking requests on different DCs (#53)

This commit is contained in:
Lonami Exo 2017-05-30 13:03:14 +02:00
parent a427465231
commit 097d1669b2

View File

@ -90,6 +90,12 @@ class TelegramClient:
self._updates_thread_running = Event()
self._updates_thread_receiving = Event()
# Cache "exported" senders 'dc_id: MtProtoSender' and
# their corresponding sessions not to recreate them all
# the time since it's a (somewhat expensive) process.
self._cached_senders = {}
self._cached_sessions = {}
# These will be set later
self._updates_thread = None
self.dc_options = None
@ -156,6 +162,13 @@ class TelegramClient:
self.transport.close()
self.transport = None
# Also disconnect all the cached senders
for sender in self._cached_senders:
sender.disconnect()
self._cached_senders.clear()
self._cached_sessions.clear()
def reconnect(self):
"""Disconnects and connects again (effectively reconnecting)"""
self.disconnect()
@ -187,6 +200,73 @@ class TelegramClient:
return next(dc for dc in self.dc_options if dc.id == dc_id)
def _get_exported_sender(self, dc_id, init_connection=False):
"""Gets a cached exported MtProtoSender for the desired DC.
If it's the first time retrieving the MtProtoSender, the
current authorization is exported to the new DC so that
it can be used there, and the connection is initialized.
If after using the sender a ConnectionResetError is rose,
this method should be called again with init_connection=True
in order to perform the reconnection."""
# Thanks badoualy/kotlogram on /telegram/api/DefaultTelegramClient.kt
# for clearly showing how to export the authorization! ^^
sender = self._cached_senders.get(dc_id)
session = self._cached_sessions.get(dc_id)
if sender and session:
if init_connection:
# TODO reconnect
pass
return sender
else:
from telethon.tl.functions.auth import \
ExportAuthorizationRequest, ImportAuthorizationRequest
dc = self._get_dc(dc_id)
# Step 1. Export the current authorization to the new DC.
export_auth = self.invoke(ExportAuthorizationRequest(dc_id))
# Step 2. Create a transport connected to the new DC.
# We also create a temporary session because
# it's what will contain the required AuthKey
# for MtProtoSender to work.
transport = TcpTransport(dc.ip_address, dc.port, proxy=self.proxy)
session = Session(None)
session.auth_key, session.time_offset = \
authenticator.do_authentication(transport)
# Step 3. After authenticating on the new DC,
# we can create the proper MtProtoSender.
sender = MtProtoSender(transport, session)
sender.connect()
# InvokeWithLayer(InitConnection(ImportAuthorization(...)))
init_connection = InitConnectionRequest(
api_id=self.api_id,
device_model=platform.node(),
system_version=platform.system(),
app_version=self.__version__,
lang_code='en',
query=ImportAuthorizationRequest(
export_auth.id, export_auth.bytes)
)
query = InvokeWithLayerRequest(layer=layer, query=init_connection)
sender.send(query)
sender.receive(query)
# Step 4. We're connected and using the desired layer!
# Don't go through this expensive process every time.
self._cached_senders[dc_id] = sender
self._cached_sessions[dc_id] = session
return sender
# endregion
# region Telegram requests functions
@ -222,8 +302,13 @@ class TelegramClient:
if throw_invalid_dc:
raise
if error.message.startswith('FILE_MIGRATE_'):
return self.invoke_on_dc(request, error.new_dc,
timeout=timeout)
else:
self._reconnect_to_dc(error.new_dc)
return self.invoke(request, timeout=timeout, throw_invalid_dc=True)
return self.invoke(request,
timeout=timeout, throw_invalid_dc=True)
except ConnectionResetError:
self._logger.info('Server disconnected us. Reconnecting and '
@ -239,6 +324,29 @@ class TelegramClient:
finally:
self._lock.release()
def invoke_on_dc(self, request, dc_id,
timeout=timedelta(seconds=5), reconnect=False):
"""Invokes the given request on a different DC
by making use of the exported MtProtoSenders.
If 'reconnect=True', then the a reconnection will be performed
and ConnectionResetError will be rose if it occurs a second time.
"""
try:
sender = self._get_exported_sender(
dc_id, init_connection=reconnect)
sender.send(request)
sender.receive(request)
return request.result
except ConnectionResetError:
if reconnect:
raise
else:
return self.invoke_on_dc(request, dc_id,
timeout=timeout, reconnect=True)
# region Authorization requests
def is_user_authorized(self):