mirror of
https://github.com/LonamiWebs/Telethon.git
synced 2025-03-03 19:00:21 +03:00
Cache exported TelegramBareClients instead MtProtoSenders
This commit is contained in:
parent
cf65e5b1cf
commit
639a17aa5a
|
@ -1,22 +1,22 @@
|
|||
import logging
|
||||
import platform
|
||||
from datetime import timedelta
|
||||
from hashlib import md5
|
||||
from os import path
|
||||
|
||||
# Import some externalized utilities to work with the Telegram types and more
|
||||
from . import helpers as utils
|
||||
from .errors import RPCError, InvalidDCError, FloodWaitError
|
||||
from .errors import RPCError, FloodWaitError
|
||||
from .network import authenticator, MtProtoSender, TcpTransport
|
||||
from .utils import get_appropriated_part_size
|
||||
|
||||
# For sending and receiving requests
|
||||
from .tl import MTProtoRequest, Session, JsonSession
|
||||
from .tl import MTProtoRequest
|
||||
from .tl.all_tlobjects import layer
|
||||
from .tl.functions import (InitConnectionRequest, InvokeWithLayerRequest)
|
||||
|
||||
# Initial request
|
||||
from .tl.functions.help import GetConfigRequest
|
||||
from .tl.functions.auth import ImportAuthorizationRequest
|
||||
|
||||
# Easier access for working with media
|
||||
from .tl.functions.upload import (
|
||||
|
@ -56,7 +56,6 @@ class TelegramBareClient:
|
|||
Session must always be a Session instance, and an optional proxy
|
||||
can also be specified to be used on the connection.
|
||||
"""
|
||||
|
||||
self.session = session
|
||||
self.api_id = api_id
|
||||
self.api_hash = api_hash
|
||||
|
@ -71,11 +70,15 @@ class TelegramBareClient:
|
|||
|
||||
# region Connecting
|
||||
|
||||
def connect(self, device_model, system_version, app_version, lang_code):
|
||||
def connect(self, device_model, system_version, app_version, lang_code,
|
||||
exported_auth=None):
|
||||
"""Connects to the Telegram servers, executing authentication if
|
||||
required. Note that authenticating to the Telegram servers is
|
||||
not the same as authenticating the desired user itself, which
|
||||
may require a call (or several) to 'sign_in' for the first time.
|
||||
|
||||
If 'exported_auth' is not None, it will be used instead to
|
||||
determine the authorization key for the current session.
|
||||
"""
|
||||
transport = TcpTransport(self.session.server_address,
|
||||
self.session.port, proxy=self.proxy)
|
||||
|
@ -92,17 +95,28 @@ class TelegramBareClient:
|
|||
|
||||
# Now it's time to send an InitConnectionRequest
|
||||
# This must always be invoked with the layer we'll be using
|
||||
query = InitConnectionRequest(
|
||||
if exported_auth is None:
|
||||
query = GetConfigRequest()
|
||||
else:
|
||||
query = ImportAuthorizationRequest(
|
||||
exported_auth.id, exported_auth.bytes)
|
||||
|
||||
request = InitConnectionRequest(
|
||||
api_id=self.api_id,
|
||||
device_model=device_model,
|
||||
system_version=system_version,
|
||||
app_version=app_version,
|
||||
lang_code=lang_code,
|
||||
query=GetConfigRequest())
|
||||
query=query)
|
||||
|
||||
result = self.invoke(
|
||||
InvokeWithLayerRequest(
|
||||
layer=layer, query=query))
|
||||
layer=layer, query=request))
|
||||
|
||||
if exported_auth is not None:
|
||||
# TODO Don't actually need this for exported authorizations,
|
||||
# they're only valid on such data center.
|
||||
result = self.invoke(GetConfigRequest())
|
||||
|
||||
# We're only interested in the DC options,
|
||||
# although many other options are available!
|
||||
|
@ -180,7 +194,10 @@ class TelegramBareClient:
|
|||
except ConnectionResetError:
|
||||
self._logger.info('Server disconnected us. Reconnecting and '
|
||||
'resending request...')
|
||||
self.reconnect()
|
||||
|
||||
# TODO Don't actually use these values
|
||||
import platform
|
||||
self.reconnect(platform.node(), platform.system(), self.__version__, 'en')
|
||||
return self.invoke(request, timeout=timeout)
|
||||
|
||||
except FloodWaitError:
|
||||
|
@ -274,8 +291,8 @@ class TelegramBareClient:
|
|||
|
||||
If 'progress_callback' is not None, it should be a function that
|
||||
takes two parameters, (bytes_downloaded, total_bytes). Note that
|
||||
'total_bytes' simply equals 'file_size', and may be None."""
|
||||
|
||||
'total_bytes' simply equals 'file_size', and may be None.
|
||||
"""
|
||||
if not part_size_kb:
|
||||
if not file_size:
|
||||
part_size_kb = 64 # Reasonable default
|
||||
|
|
|
@ -116,8 +116,7 @@ class TelegramClient(TelegramBareClient):
|
|||
# Cache "exported" senders 'dc_id: MtProtoSender' and
|
||||
# their corresponding sessions not to recreate them all
|
||||
# the time since it's a (somewhat expensive) process.
|
||||
self._cached_senders = {}
|
||||
self._cached_sessions = {}
|
||||
self._cached_clients = {}
|
||||
self._updates_thread = None
|
||||
self._phone_code_hashes = {}
|
||||
|
||||
|
@ -147,11 +146,10 @@ class TelegramClient(TelegramBareClient):
|
|||
super(TelegramClient, self).disconnect()
|
||||
|
||||
# Also disconnect all the cached senders
|
||||
for sender in self._cached_senders.values():
|
||||
for sender in self._cached_clients.values():
|
||||
sender.disconnect()
|
||||
|
||||
self._cached_senders.clear()
|
||||
self._cached_sessions.clear()
|
||||
self._cached_clients.clear()
|
||||
|
||||
def reconnect(self, new_dc=None, *args):
|
||||
"""Disconnects and connects again (effectively reconnecting).
|
||||
|
@ -173,10 +171,10 @@ class TelegramClient(TelegramBareClient):
|
|||
|
||||
# region Working with different Data Centers
|
||||
|
||||
def _get_exported_sender(self, dc_id, init_connection=False):
|
||||
"""Gets a cached exported MtProtoSender for the desired DC.
|
||||
def _get_exported_client(self, dc_id, init_connection=False):
|
||||
"""Gets a cached exported TelegramBareClient for the desired DC.
|
||||
|
||||
If it's the first time retrieving the MtProtoSender, the
|
||||
If it's the first time retrieving the TelegramBareClient, the
|
||||
current authorization is exported to the new DC so that
|
||||
it can be used there, and the connection is initialized.
|
||||
|
||||
|
@ -186,56 +184,36 @@ class TelegramClient(TelegramBareClient):
|
|||
# Thanks badoualy/kotlogram on /telegram/api/DefaultTelegramClient.kt
|
||||
# for clearly showing how to export the authorization! ^^
|
||||
|
||||
sender = self._cached_senders.get(dc_id)
|
||||
session = self._cached_sessions.get(dc_id)
|
||||
|
||||
if sender and session:
|
||||
client = self._cached_clients.get(dc_id)
|
||||
if client:
|
||||
if init_connection:
|
||||
sender.disconnect()
|
||||
sender.connect()
|
||||
client.reconnect(
|
||||
device_model=self.device_model,
|
||||
system_version=self.system_version,
|
||||
app_version=self.app_version,
|
||||
lang_code=self.lang_code
|
||||
)
|
||||
|
||||
return sender
|
||||
return client
|
||||
else:
|
||||
dc = self._get_dc(dc_id)
|
||||
|
||||
# Step 1. Export the current authorization to the new DC.
|
||||
# Export the current authorization to the new DC.
|
||||
export_auth = self.invoke(ExportAuthorizationRequest(dc_id))
|
||||
|
||||
# Step 2. Create a transport connected to the new DC.
|
||||
# We also create a temporary session because
|
||||
# it's what will contain the required AuthKey
|
||||
# for MtProtoSender to work.
|
||||
transport = TcpTransport(dc.ip_address, dc.port, proxy=self.proxy)
|
||||
session = Session(None)
|
||||
session.auth_key, session.time_offset = \
|
||||
authenticator.do_authentication(transport)
|
||||
# Create a temporary session for this IP address, which needs
|
||||
# to be different because each auth_key is unique per DC.
|
||||
session = JsonSession(None)
|
||||
session.server_address = dc.ip_address
|
||||
session.port = dc.port
|
||||
client = TelegramBareClient(session, self.api_id, self.api_hash)
|
||||
client.connect(self.device_model, self.system_version,
|
||||
self.app_version, self.lang_code,
|
||||
exported_auth=export_auth)
|
||||
|
||||
# Step 3. After authenticating on the new DC,
|
||||
# we can create the proper MtProtoSender.
|
||||
sender = MtProtoSender(transport, session)
|
||||
sender.connect()
|
||||
|
||||
# InvokeWithLayer(InitConnection(ImportAuthorization(...)))
|
||||
init_connection = InitConnectionRequest(
|
||||
api_id=self.api_id,
|
||||
device_model=platform.node(),
|
||||
system_version=platform.system(),
|
||||
app_version=self.__version__,
|
||||
lang_code='en',
|
||||
query=ImportAuthorizationRequest(
|
||||
export_auth.id, export_auth.bytes)
|
||||
)
|
||||
query = InvokeWithLayerRequest(layer=layer, query=init_connection)
|
||||
|
||||
sender.send(query)
|
||||
sender.receive(query)
|
||||
|
||||
# Step 4. We're connected and using the desired layer!
|
||||
# Don't go through this expensive process every time.
|
||||
self._cached_senders[dc_id] = sender
|
||||
self._cached_sessions[dc_id] = session
|
||||
|
||||
return sender
|
||||
self._cached_clients[dc_id] = client
|
||||
return client
|
||||
|
||||
# endregion
|
||||
|
||||
|
@ -302,12 +280,10 @@ class TelegramClient(TelegramBareClient):
|
|||
ConnectionResetError will be raised if it occurs a second time.
|
||||
"""
|
||||
try:
|
||||
sender = self._get_exported_sender(
|
||||
client = self._get_exported_client(
|
||||
dc_id, init_connection=reconnect)
|
||||
|
||||
sender.send(request)
|
||||
sender.receive(request)
|
||||
return request.result
|
||||
return client.invoke(request)
|
||||
|
||||
except ConnectionResetError:
|
||||
if reconnect:
|
||||
|
|
Loading…
Reference in New Issue
Block a user