From 0a693c705a13c0e5146296768a7fd6c3d876a0b1 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Fri, 29 Sep 2017 19:55:14 +0200 Subject: [PATCH 01/18] Create a new connection when called from a different thread This allows to invoke several requests in parallel while not waiting for other requests to be written to the network. --- telethon/telegram_bare_client.py | 17 ++++++++++----- telethon/telegram_client.py | 36 ++++++++++++++++++++++++++++---- 2 files changed, 44 insertions(+), 9 deletions(-) diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index f4d3d8da..a6d78c14 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -290,7 +290,7 @@ class TelegramBareClient: # region Invoking Telegram requests - def invoke(self, *requests, call_receive=True, retries=5): + def invoke(self, *requests, call_receive=True, retries=5, sender=None): """Invokes (sends) a MTProtoRequest and returns (receives) its result. If 'updates' is not None, all read update object will be put @@ -307,13 +307,16 @@ class TelegramBareClient: if retries <= 0: raise ValueError('Number of retries reached 0.') + if sender is None: + sender = self._sender + try: # Ensure that we start with no previous errors (i.e. resending) for x in requests: x.confirm_received.clear() x.rpc_error = None - self._sender.send(*requests) + sender.send(*requests) if not call_receive: # TODO This will be slightly troublesome if we allow # switching between constant read or not on the fly. @@ -321,11 +324,11 @@ class TelegramBareClient: # in which case a Lock would be required for .receive(). 
for x in requests: x.confirm_received.wait( - self._sender.connection.get_timeout() + sender.connection.get_timeout() ) else: while not all(x.confirm_received.is_set() for x in requests): - self._sender.receive(update_state=self.updates) + sender.receive(update_state=self.updates) except TimeoutError: pass # We will just retry @@ -333,9 +336,13 @@ class TelegramBareClient: except ConnectionResetError: self._logger.debug('Server disconnected us. Reconnecting and ' 'resending request...') - self._reconnect() + if sender != self._sender: + sender.connect() + else: + self._reconnect() except FloodWaitError: + sender.disconnect() self.disconnect() raise diff --git a/telethon/telegram_client.py b/telethon/telegram_client.py index 6092c70c..4c686c83 100644 --- a/telethon/telegram_client.py +++ b/telethon/telegram_client.py @@ -18,7 +18,7 @@ from .errors import ( PhoneMigrateError, NetworkMigrateError, UserMigrateError, PhoneCodeExpiredError, PhoneCodeHashEmptyError, PhoneCodeInvalidError ) -from .network import ConnectionMode +from .network import Connection, ConnectionMode, MtProtoSender from .tl import Session, TLObject from .tl.functions import PingRequest from .tl.functions.account import ( @@ -146,6 +146,11 @@ class TelegramClient(TelegramBareClient): # Constantly read for results and updates from within the main client self._recv_thread = None + # Identifier of the main thread (the one that called .connect()). + # This will be used to create new connections from any other thread, + # so that requests can be sent in parallel. + self._main_thread_ident = None + # Default PingRequest delay self._last_ping = datetime.now() self._ping_delay = timedelta(minutes=1) @@ -162,6 +167,8 @@ class TelegramClient(TelegramBareClient): exported_auth is meant for internal purposes and can be ignored. 
""" + self._main_thread_ident = threading.get_ident() + if socks and self._recv_thread: # Treat proxy errors specially since they're not related to # Telegram itself, but rather to the proxy. If any happens on @@ -246,23 +253,40 @@ class TelegramClient(TelegramBareClient): """ # This is only valid when the read thread is reconnecting, # that is, the connection lock is locked. - if self._on_read_thread() and not self._connect_lock.locked(): + on_read_thread = self._on_read_thread() + if on_read_thread and not self._connect_lock.locked(): return # Just ignore, we would be raising and crashing the thread self.updates.check_error() + # Determine the sender to be used (main or a new connection) + # TODO Polish this so it's nicer + on_main_thread = threading.get_ident() == self._main_thread_ident + if on_main_thread or on_read_thread: + sender = self._sender + else: + conn = Connection( + self.session.server_address, self.session.port, + mode=self._sender.connection._mode, + proxy=self._sender.connection.conn.proxy, + timeout=self._sender.connection.get_timeout() + ) + sender = MtProtoSender(self.session, conn) + sender.connect() + try: # We should call receive from this thread if there's no background # thread reading or if the server disconnected us and we're trying # to reconnect. This is because the read thread may either be # locked also trying to reconnect or we may be said thread already. 
- call_receive = \ + call_receive = not on_main_thread or \ self._recv_thread is None or self._connect_lock.locked() return super().invoke( *requests, call_receive=call_receive, - retries=kwargs.get('retries', 5) + retries=kwargs.get('retries', 5), + sender=sender ) except (PhoneMigrateError, NetworkMigrateError, UserMigrateError) as e: @@ -284,6 +308,10 @@ class TelegramClient(TelegramBareClient): while self._user_connected and not self._reconnect(): sleep(0.1) # Retry forever until we can send the request + finally: + if sender != self._sender: + sender.disconnect() + # Let people use client(SomeRequest()) instead client.invoke(...) __call__ = invoke From b61deb5cfb8cbe8e6b5d9436d9d2ab18f6b83f01 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Fri, 29 Sep 2017 20:10:16 +0200 Subject: [PATCH 02/18] Delete methods to create_new_connection and invoke_on_dc --- telethon/telegram_client.py | 42 ------------------------------------- 1 file changed, 42 deletions(-) diff --git a/telethon/telegram_client.py b/telethon/telegram_client.py index 4c686c83..9db87684 100644 --- a/telethon/telegram_client.py +++ b/telethon/telegram_client.py @@ -219,29 +219,6 @@ class TelegramClient(TelegramBareClient): return self._recv_thread is not None and \ threading.get_ident() == self._recv_thread.ident - def create_new_connection(self, on_dc=None, timeout=timedelta(seconds=5)): - """Creates a new connection which can be used in parallel - with the original TelegramClient. A TelegramBareClient - will be returned already connected, and the caller is - responsible to disconnect it. - - If 'on_dc' is None, the new client will run on the same - data center as the current client (most common case). - - If the client is meant to be used on a different data - center, the data center ID should be specified instead. 
- """ - if on_dc is None: - client = TelegramBareClient( - self.session, self.api_id, self.api_hash, - proxy=self._sender.connection.conn.proxy, timeout=timeout - ) - client.connect() - else: - client = self._get_exported_client(on_dc, bypass_cache=True) - - return client - # endregion # region Telegram requests functions @@ -315,25 +292,6 @@ class TelegramClient(TelegramBareClient): # Let people use client(SomeRequest()) instead client.invoke(...) __call__ = invoke - def invoke_on_dc(self, request, dc_id, reconnect=False): - """Invokes the given request on a different DC - by making use of the exported MtProtoSenders. - - If 'reconnect=True', then the a reconnection will be performed and - ConnectionResetError will be raised if it occurs a second time. - """ - try: - client = self._get_exported_client( - dc_id, init_connection=reconnect) - - return client.invoke(request) - - except ConnectionResetError: - if reconnect: - raise - else: - return self.invoke_on_dc(request, dc_id, reconnect=True) - # region Authorization requests def is_user_authorized(self): From 479afddf507efac009ae28f8b6569e2549a06637 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Fri, 29 Sep 2017 20:50:27 +0200 Subject: [PATCH 03/18] Move core functionality to the TelegramBareClient Rationale: the intended behaviour of the TelegramClient will now be to focus on abstracting the users from manually importing requests and types to work with Telegram's API. Thus, all the core functionality has been moved to the TelegramBareClient, which will now be responsible for spawning new threads or connections and even handling updates. This way there is a clear distinction between the two clients: TelegramClient is the one meant to be exposed to the end user, since it provides all the mentioned abstractions, while the TelegramBareClient is the "basic" client needed to work with the API in a comfortable way.
There is still a need for an MtProtoSender, which is still even lower level, and knows as little as possible of what requests are. This handles parsing the messages received from the server so that their result can be understood. --- telethon/telegram_bare_client.py | 252 ++++++++++++++++++++++++++--- telethon/telegram_client.py | 267 +------------------------------ 2 files changed, 232 insertions(+), 287 deletions(-) diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index a6d78c14..38fd1555 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -1,21 +1,24 @@ import logging -from datetime import timedelta +import os +import threading +from datetime import timedelta, datetime from hashlib import md5 from io import BytesIO -from os import path from threading import Lock +from time import sleep from . import helpers as utils from .crypto import rsa, CdnDecrypter from .errors import ( RPCError, BrokenAuthKeyError, - FloodWaitError, FileMigrateError, TypeNotFoundError + FloodWaitError, FileMigrateError, TypeNotFoundError, + UnauthorizedError, PhoneMigrateError, NetworkMigrateError, UserMigrateError ) from .network import authenticator, MtProtoSender, Connection, ConnectionMode from .tl import TLObject, Session from .tl.all_tlobjects import LAYER from .tl.functions import ( - InitConnectionRequest, InvokeWithLayerRequest + InitConnectionRequest, InvokeWithLayerRequest, PingRequest ) from .tl.functions.auth import ( ImportAuthorizationRequest, ExportAuthorizationRequest @@ -23,6 +26,7 @@ from .tl.functions.auth import ( from .tl.functions.help import ( GetCdnConfigRequest, GetConfigRequest ) +from .tl.functions.updates import GetStateRequest from .tl.functions.upload import ( GetFileRequest, SaveBigFilePartRequest, SaveFilePartRequest ) @@ -64,11 +68,22 @@ class TelegramBareClient: connection_mode=ConnectionMode.TCP_FULL, proxy=None, process_updates=False, - timeout=timedelta(seconds=5)): - """Initializes the
Telegram client with the specified API ID and Hash. - Session must always be a Session instance, and an optional proxy - can also be specified to be used on the connection. - """ + timeout=timedelta(seconds=5), + **kwargs): + """Refer to TelegramClient.__init__ for docs on this method""" + if not api_id or not api_hash: + raise PermissionError( + "Your API ID or Hash cannot be empty or None. " + "Refer to Telethon's README.rst for more information.") + + # Determine what session object we have + if isinstance(session, str) or session is None: + session = Session.try_load_or_create_new(session) + elif not isinstance(session, Session): + raise ValueError( + 'The given session must be a str or a Session instance.' + ) + self.session = session self.api_id = int(api_id) self.api_hash = api_hash @@ -95,6 +110,39 @@ class TelegramBareClient: # One may change self.updates.enabled at any later point. self.updates = UpdateState(process_updates) + # Used on connection - the user may modify these and reconnect + kwargs['app_version'] = kwargs.get('app_version', self.__version__) + for name, value in kwargs.items(): + if hasattr(self.session, name): + setattr(self.session, name, value) + + # Despite the state of the real connection, keep track of whether + # the user has explicitly called .connect() or .disconnect() here. + # This information is required by the read thread, who will be the + # one attempting to reconnect on the background *while* the user + # doesn't explicitly call .disconnect(), thus telling it to stop + # retrying. The main thread, knowing there is a background thread + # attempting reconnection as soon as it happens, will just sleep. + self._user_connected = False + + # Save whether the user is authorized here (a.k.a. 
logged in) + self._authorized = False + + # Uploaded files cache so subsequent calls are instant + self._upload_cache = {} + + # Constantly read for results and updates from within the main client + self._recv_thread = None + + # Identifier of the main thread (the one that called .connect()). + # This will be used to create new connections from any other thread, + # so that requests can be sent in parallel. + self._main_thread_ident = None + + # Default PingRequest delay + self._last_ping = datetime.now() + self._ping_delay = timedelta(minutes=1) + # endregion # region Connecting @@ -108,6 +156,8 @@ class TelegramBareClient: If 'exported_auth' is not None, it will be used instead to determine the authorization key for the current session. """ + self._main_thread_ident = threading.get_ident() + try: self._sender.connect() if not self.session.auth_key: @@ -143,6 +193,15 @@ class TelegramBareClient: TelegramBareClient._dc_options = \ self(GetConfigRequest()).dc_options + # Connection was successful! Try syncing the update state + # to also assert whether the user is logged in or not. + self._user_connected = True + try: + self.sync_updates() + self._set_connected_and_authorized() + except UnauthorizedError: + self._authorized = False + return True except TypeNotFoundError as e: @@ -178,9 +237,23 @@ class TelegramBareClient: return result def disconnect(self): - """Disconnects from the Telegram server""" + """Disconnects from the Telegram server + and stops all the spawned threads""" + self._user_connected = False + self._recv_thread = None + + # This will trigger a "ConnectionResetError", for subsequent calls + # to read or send (from another thread) and usually, the background + # thread would try restarting the connection but since the + # ._recv_thread = None, it knows it doesn't have to. 
self._sender.disconnect() + # Also disconnect all the cached senders + for sender in self._cached_clients.values(): + sender.disconnect() + + self._cached_clients.clear() + def _reconnect(self, new_dc=None): """If 'new_dc' is not set, only a call to .connect() will be made since it's assumed that the connection has been lost and the @@ -210,7 +283,11 @@ class TelegramBareClient: # endregion - # region Working with different Data Centers + # region Working with different connections/Data Centers + + def _on_read_thread(self): + return self._recv_thread is not None and \ + threading.get_ident() == self._recv_thread.ident def _get_dc(self, dc_id, ipv6=False, cdn=False): """Gets the Data Center (DC) associated to 'dc_id'""" @@ -290,16 +367,21 @@ class TelegramBareClient: # region Invoking Telegram requests - def invoke(self, *requests, call_receive=True, retries=5, sender=None): + def invoke(self, *requests, call_receive=True, retries=5): """Invokes (sends) a MTProtoRequest and returns (receives) its result. - If 'updates' is not None, all read update object will be put - in such list. Otherwise, update objects will be ignored. - - If 'call_receive' is set to False, then there should be another - thread calling to 'self._sender.receive()' running or this method - will lock forever. + The invoke will be retried up to 'retries' times before raising + ValueError(). """ + # This is only valid when the read thread is reconnecting, + # that is, the connection lock is locked. 
+ on_read_thread = self._on_read_thread() + if on_read_thread and not self._connect_lock.locked(): + return # Just ignore, we would be raising and crashing the thread + + # Any error from a background thread will be "posted" and checked here + self.updates.check_error() + if not all(isinstance(x, TLObject) and x.content_related for x in requests): raise ValueError('You can only invoke requests, not types!') @@ -307,8 +389,20 @@ class TelegramBareClient: if retries <= 0: raise ValueError('Number of retries reached 0.') - if sender is None: + # Determine the sender to be used (main or a new connection) + # TODO Polish this so it's nicer + on_main_thread = threading.get_ident() == self._main_thread_ident + if on_main_thread or on_read_thread: sender = self._sender + else: + conn = Connection( + self.session.server_address, self.session.port, + mode=self._sender.connection._mode, + proxy=self._sender.connection.conn.proxy, + timeout=self._sender.connection.get_timeout() + ) + sender = MtProtoSender(self.session, conn) + sender.connect() try: # Ensure that we start with no previous errors (i.e. resending) @@ -317,6 +411,14 @@ class TelegramBareClient: x.rpc_error = None sender.send(*requests) + + # We should call receive from this thread if there's no background + # thread reading or if the server disconnected us and we're trying + # to reconnect. This is because the read thread may either be + # locked also trying to reconnect or we may be said thread already. + call_receive = not on_main_thread or \ + self._recv_thread is None or self._connect_lock.locked() + if not call_receive: # TODO This will be slightly troublesome if we allow # switching between constant read or not on the fly. 
@@ -330,22 +432,49 @@ class TelegramBareClient: while not all(x.confirm_received.is_set() for x in requests): sender.receive(update_state=self.updates) + except (PhoneMigrateError, NetworkMigrateError, + UserMigrateError) as e: + self._logger.debug( + 'DC error when invoking request, ' + 'attempting to reconnect at DC {}'.format(e.new_dc) + ) + + # TODO What happens with the background thread here? + # For normal use cases, this won't happen, because this will only + # be on the very first connection (not authorized, not running), + # but may be an issue for people who actually travel? + self._reconnect(new_dc=e.new_dc) + return self.invoke( + *requests, call_receive=call_receive, retries=(retries - 1) + ) + except TimeoutError: pass # We will just retry except ConnectionResetError: + if self._connect_lock.locked(): + # We are connecting and we don't want to reconnect there... + raise + self._logger.debug('Server disconnected us. Reconnecting and ' 'resending request...') + if sender != self._sender: + # TODO Try reconnecting forever too? sender.connect() else: - self._reconnect() + while self._user_connected and not self._reconnect(): + sleep(0.1) # Retry forever until we can send the request except FloodWaitError: sender.disconnect() self.disconnect() raise + finally: + if sender != self._sender: + sender.disconnect() + try: raise next(x.rpc_error for x in requests if x.rpc_error) except StopIteration: @@ -363,6 +492,13 @@ class TelegramBareClient: # Let people use client(SomeRequest()) instead client.invoke(...) 
__call__ = invoke + # Some really basic functionality + + def is_user_authorized(self): + """Has the user been authorized yet + (code request sent and confirmed)?""" + return self._authorized + # endregion # region Uploading media @@ -388,10 +524,10 @@ class TelegramBareClient: Default values for the optional parameters if left as None are: part_size_kb = get_appropriated_part_size(file_size) - file_name = path.basename(file_path) + file_name = os.path.basename(file_path) """ if isinstance(file, str): - file_size = path.getsize(file) + file_size = os.path.getsize(file) elif isinstance(file, bytes): file_size = len(file) else: @@ -447,7 +583,7 @@ class TelegramBareClient: # Set a default file name if None was specified if not file_name: if isinstance(file, str): - file_name = path.basename(file) + file_name = os.path.basename(file) else: file_name = str(file_id) @@ -544,3 +680,73 @@ class TelegramBareClient: f.close() # endregion + + # region Updates handling + + def sync_updates(self): + """Synchronizes self.updates to their initial state. Will be + called automatically on connection if self.updates.enabled = True, + otherwise it should be called manually after enabling updates. 
+ """ + self.updates.process(self(GetStateRequest())) + + def add_update_handler(self, handler): + """Adds an update handler (a function which takes a TLObject, + an update, as its parameter) and listens for updates""" + sync = not self.updates.handlers + self.updates.handlers.append(handler) + if sync: + self.sync_updates() + + def remove_update_handler(self, handler): + self.updates.handlers.remove(handler) + + def list_update_handlers(self): + return self.updates.handlers[:] + + # endregion + + # Constant read + + def _set_connected_and_authorized(self): + self._authorized = True + if self._recv_thread is None: + self._recv_thread = threading.Thread( + name='ReadThread', daemon=True, + target=self._recv_thread_impl + ) + self._recv_thread.start() + + # By using this approach, another thread will be + # created and started upon connection to constantly read + # from the other end. Otherwise, manual calls to .receive() + # must be performed. The MtProtoSender cannot be connected, + # or an error will be thrown. + # + # This way, sending and receiving will be completely independent. + def _recv_thread_impl(self): + while self._user_connected: + try: + if datetime.now() > self._last_ping + self._ping_delay: + self._sender.send(PingRequest( + int.from_bytes(os.urandom(8), 'big', signed=True) + )) + self._last_ping = datetime.now() + + self._sender.receive(update_state=self.updates) + except TimeoutError: + # No problem. + pass + except ConnectionResetError: + self._logger.debug('Server disconnected us. 
Reconnecting...') + while self._user_connected and not self._reconnect(): + sleep(0.1) # Retry forever, this is instant messaging + + except Exception as e: + # Unknown exception, pass it to the main thread + self.updates.set_error(e) + break + + self._recv_thread = None + + # endregion diff --git a/telethon/telegram_client.py b/telethon/telegram_client.py index 9db87684..a28d2c62 100644 --- a/telethon/telegram_client.py +++ b/telethon/telegram_client.py @@ -1,10 +1,7 @@ import os -import threading from datetime import datetime, timedelta from functools import lru_cache from mimetypes import guess_type -from threading import Thread -from time import sleep try: import socks @@ -15,12 +12,10 @@ from . import TelegramBareClient from . import helpers as utils from .errors import ( RPCError, UnauthorizedError, InvalidParameterError, PhoneCodeEmptyError, - PhoneMigrateError, NetworkMigrateError, UserMigrateError, PhoneCodeExpiredError, PhoneCodeHashEmptyError, PhoneCodeInvalidError ) -from .network import Connection, ConnectionMode, MtProtoSender -from .tl import Session, TLObject -from .tl.functions import PingRequest +from .network import ConnectionMode +from .tl import TLObject from .tl.functions.account import ( GetPasswordRequest ) @@ -35,9 +30,6 @@ from .tl.functions.messages import ( GetDialogsRequest, GetHistoryRequest, ReadHistoryRequest, SendMediaRequest, SendMessageRequest ) -from .tl.functions.updates import ( - GetStateRequest -) from .tl.functions.users import ( GetUsersRequest ) @@ -98,18 +90,6 @@ class TelegramClient(TelegramBareClient): system_lang_code = lang_code report_errors = True """ - if not api_id or not api_hash: - raise PermissionError( - "Your API ID or Hash cannot be empty or None. 
" - "Refer to Telethon's README.rst for more information.") - - # Determine what session object we have - if isinstance(session, str) or session is None: - session = Session.try_load_or_create_new(session) - elif not isinstance(session, Session): - raise ValueError( - 'The given session must be a str or a Session instance.') - super().__init__( session, api_id, api_hash, connection_mode=connection_mode, @@ -118,187 +98,16 @@ class TelegramClient(TelegramBareClient): timeout=timeout ) - # Used on connection - the user may modify these and reconnect - kwargs['app_version'] = kwargs.get('app_version', self.__version__) - for name, value in kwargs.items(): - if hasattr(self.session, name): - setattr(self.session, name, value) - - self._updates_thread = None + # Some fields to easy signing in self._phone_code_hash = None self._phone = None - # Despite the state of the real connection, keep track of whether - # the user has explicitly called .connect() or .disconnect() here. - # This information is required by the read thread, who will be the - # one attempting to reconnect on the background *while* the user - # doesn't explicitly call .disconnect(), thus telling it to stop - # retrying. The main thread, knowing there is a background thread - # attempting reconnection as soon as it happens, will just sleep. - self._user_connected = False - - # Save whether the user is authorized here (a.k.a. logged in) - self._authorized = False - - # Uploaded files cache so subsequent calls are instant - self._upload_cache = {} - - # Constantly read for results and updates from within the main client - self._recv_thread = None - - # Identifier of the main thread (the one that called .connect()). - # This will be used to create new connections from any other thread, - # so that requests can be sent in parallel. 
- self._main_thread_ident = None - - # Default PingRequest delay - self._last_ping = datetime.now() - self._ping_delay = timedelta(minutes=1) - - # endregion - - # region Connecting - - def connect(self, exported_auth=None): - """Connects to the Telegram servers, executing authentication if - required. Note that authenticating to the Telegram servers is - not the same as authenticating the desired user itself, which - may require a call (or several) to 'sign_in' for the first time. - - exported_auth is meant for internal purposes and can be ignored. - """ - self._main_thread_ident = threading.get_ident() - - if socks and self._recv_thread: - # Treat proxy errors specially since they're not related to - # Telegram itself, but rather to the proxy. If any happens on - # the read thread forward it to the main thread. - try: - ok = super().connect(exported_auth=exported_auth) - except socks.ProxyConnectionError as e: - ok = False - # Report the exception to the main thread - self.updates.set_error(e) - else: - ok = super().connect(exported_auth=exported_auth) - - if not ok: - return False - - self._user_connected = True - try: - self.sync_updates() - self._set_connected_and_authorized() - except UnauthorizedError: - self._authorized = False - - return True - - def disconnect(self): - """Disconnects from the Telegram server - and stops all the spawned threads""" - self._user_connected = False - self._recv_thread = None - - # This will trigger a "ConnectionResetError", usually, the background - # thread would try restarting the connection but since the - # ._recv_thread = None, it knows it doesn't have to. 
- super().disconnect() - - # Also disconnect all the cached senders - for sender in self._cached_clients.values(): - sender.disconnect() - - self._cached_clients.clear() - - # endregion - - # region Working with different connections - - def _on_read_thread(self): - return self._recv_thread is not None and \ - threading.get_ident() == self._recv_thread.ident - # endregion # region Telegram requests functions - def invoke(self, *requests, **kwargs): - """Invokes (sends) one or several MTProtoRequest and returns - (receives) their result. An optional named 'retries' parameter - can be used, indicating how many times it should retry. - """ - # This is only valid when the read thread is reconnecting, - # that is, the connection lock is locked. - on_read_thread = self._on_read_thread() - if on_read_thread and not self._connect_lock.locked(): - return # Just ignore, we would be raising and crashing the thread - - self.updates.check_error() - - # Determine the sender to be used (main or a new connection) - # TODO Polish this so it's nicer - on_main_thread = threading.get_ident() == self._main_thread_ident - if on_main_thread or on_read_thread: - sender = self._sender - else: - conn = Connection( - self.session.server_address, self.session.port, - mode=self._sender.connection._mode, - proxy=self._sender.connection.conn.proxy, - timeout=self._sender.connection.get_timeout() - ) - sender = MtProtoSender(self.session, conn) - sender.connect() - - try: - # We should call receive from this thread if there's no background - # thread reading or if the server disconnected us and we're trying - # to reconnect. This is because the read thread may either be - # locked also trying to reconnect or we may be said thread already. 
- call_receive = not on_main_thread or \ - self._recv_thread is None or self._connect_lock.locked() - - return super().invoke( - *requests, - call_receive=call_receive, - retries=kwargs.get('retries', 5), - sender=sender - ) - - except (PhoneMigrateError, NetworkMigrateError, UserMigrateError) as e: - self._logger.debug('DC error when invoking request, ' - 'attempting to reconnect at DC {}' - .format(e.new_dc)) - - # TODO What happens with the background thread here? - # For normal use cases, this won't happen, because this will only - # be on the very first connection (not authorized, not running), - # but may be an issue for people who actually travel? - self._reconnect(new_dc=e.new_dc) - return self.invoke(*requests) - - except ConnectionResetError as e: - if self._connect_lock.locked(): - # We are connecting and we don't want to reconnect there... - raise - while self._user_connected and not self._reconnect(): - sleep(0.1) # Retry forever until we can send the request - - finally: - if sender != self._sender: - sender.disconnect() - - # Let people use client(SomeRequest()) instead client.invoke(...) - __call__ = invoke - # region Authorization requests - def is_user_authorized(self): - """Has the user been authorized yet - (code request sent and confirmed)?""" - return self._authorized - def send_code_request(self, phone): """Sends a code request to the specified phone number""" if isinstance(phone, int): @@ -992,73 +801,3 @@ class TelegramClient(TelegramBareClient): ) # endregion - - # region Updates handling - - def sync_updates(self): - """Synchronizes self.updates to their initial state. Will be - called automatically on connection if self.updates.enabled = True, - otherwise it should be called manually after enabling updates. 
- """ - self.updates.process(self(GetStateRequest())) - - def add_update_handler(self, handler): - """Adds an update handler (a function which takes a TLObject, - an update, as its parameter) and listens for updates""" - sync = not self.updates.handlers - self.updates.handlers.append(handler) - if sync: - self.sync_updates() - - def remove_update_handler(self, handler): - self.updates.handlers.remove(handler) - - def list_update_handlers(self): - return self.updates.handlers[:] - - # endregion - - # Constant read - - def _set_connected_and_authorized(self): - self._authorized = True - if self._recv_thread is None: - self._recv_thread = Thread( - name='ReadThread', daemon=True, - target=self._recv_thread_impl - ) - self._recv_thread.start() - - # By using this approach, another thread will be - # created and started upon connection to constantly read - # from the other end. Otherwise, manual calls to .receive() - # must be performed. The MtProtoSender cannot be connected, - # or an error will be thrown. - # - # This way, sending and receiving will be completely independent. - def _recv_thread_impl(self): - while self._user_connected: - try: - if datetime.now() > self._last_ping + self._ping_delay: - self._sender.send(PingRequest( - int.from_bytes(os.urandom(8), 'big', signed=True) - )) - self._last_ping = datetime.now() - - self._sender.receive(update_state=self.updates) - except TimeoutError: - # No problem. - pass - except ConnectionResetError: - self._logger.debug('Server disconnected us. 
Reconnecting...') - while self._user_connected and not self._reconnect(): - sleep(0.1) # Retry forever, this is instant messaging - - except Exception as e: - # Unknown exception, pass it to the main thread - self.updates.set_error(e) - break - - self._recv_thread = None - - # endregion From b87a798dd5678b5e2b71f7b572f5ff2b59f3edde Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Sat, 30 Sep 2017 10:12:01 +0200 Subject: [PATCH 04/18] Spawn new worker threads to handle updates instead using ReadThread --- telethon/update_state.py | 75 +++++++++++++++++++++++++++++++++++++--- 1 file changed, 70 insertions(+), 5 deletions(-) diff --git a/telethon/update_state.py b/telethon/update_state.py index 4f3ebce7..314511d6 100644 --- a/telethon/update_state.py +++ b/telethon/update_state.py @@ -1,6 +1,7 @@ +import logging from collections import deque from datetime import datetime -from threading import RLock, Event +from threading import RLock, Event, Thread from .tl import types as tl @@ -11,14 +12,24 @@ class UpdateState: """ def __init__(self, polling): self._polling = polling + self._workers = 4 + self._worker_threads = [] + self.handlers = [] self._updates_lock = RLock() self._updates_available = Event() self._updates = deque() + self._logger = logging.getLogger(__name__) + # https://core.telegram.org/api/updates self._state = tl.updates.State(0, 0, datetime.now(), 0, 0) + # TODO Rename "polling" to some other variable + # that signifies "running background threads". + if polling: + self._setup_workers() + def can_poll(self): """Returns True if a call to .poll() won't lock""" return self._updates_available.is_set() @@ -39,17 +50,74 @@ class UpdateState: return update + # TODO How should this be handled with background worker threads? 
def get_polling(self): return self._polling def set_polling(self, polling): self._polling = polling - if not polling: + if polling: + self._setup_workers() + else: with self._updates_lock: self._updates.clear() + self._stop_workers() polling = property(fget=get_polling, fset=set_polling) + def get_workers(self): + return self._workers + + def set_workers(self, n): + self._stop_workers() + self._workers = n + self._setup_workers() + + workers = property(fget=get_workers, fset=set_workers) + + def _stop_workers(self): + """Raises "StopIterationException" on the worker threads to stop them, + and also clears all of them off the list + """ + if self._worker_threads: + pass + + self.set_error(StopIteration()) + for t in self._worker_threads: + t.join() + + self._worker_threads.clear() + + def _setup_workers(self): + if self._worker_threads: + # There already are workers + return + + for i in range(self._workers): + thread = Thread( + target=UpdateState._worker_loop, + name='UpdateWorker{}'.format(i), + daemon=True, + args=(self, i) + ) + self._worker_threads.append(thread) + thread.start() + + def _worker_loop(self, wid): + while True: + try: + update = self.poll() + # TODO Maybe people can add different handlers per update type + for handler in self.handlers: + handler(update) + except StopIteration: + break + except Exception as e: + # We don't want to crash a worker thread due to any reason + self._logger.debug( + '[ERROR] Unhandled exception on worker {}'.format(wid), e + ) + def set_error(self, error): """Sets an error, so that the next call to .poll() will raise it. Can be (and is) used to pass exceptions between threads. 
@@ -85,6 +153,3 @@ class UpdateState: if self._polling: self._updates.append(update) self._updates_available.set() - - for handler in self.handlers: - handler(update) From 9560bcc3240b7f94006f9476d79e49f2dad71f05 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Sat, 30 Sep 2017 10:24:54 +0200 Subject: [PATCH 05/18] Remove "if background thread" check as it isn't exposed anymore --- telethon/telegram_bare_client.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index 38fd1555..e9eae399 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -373,12 +373,6 @@ class TelegramBareClient: The invoke will be retried up to 'retries' times before raising ValueError(). """ - # This is only valid when the read thread is reconnecting, - # that is, the connection lock is locked. - on_read_thread = self._on_read_thread() - if on_read_thread and not self._connect_lock.locked(): - return # Just ignore, we would be raising and crashing the thread - # Any error from a background thread will be "posted" and checked here self.updates.check_error() @@ -392,7 +386,7 @@ class TelegramBareClient: # Determine the sender to be used (main or a new connection) # TODO Polish this so it's nicer on_main_thread = threading.get_ident() == self._main_thread_ident - if on_main_thread or on_read_thread: + if on_main_thread or self._on_read_thread(): sender = self._sender else: conn = Connection( From 72b7e99222053e3a252a517dffa84f19751983e3 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Sat, 30 Sep 2017 10:59:33 +0200 Subject: [PATCH 06/18] Ensure the worker threads have updates once they acquire the lock --- telethon/update_state.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/telethon/update_state.py b/telethon/update_state.py index 314511d6..9239bd03 100644 --- a/telethon/update_state.py +++ b/telethon/update_state.py @@ -1,4 +1,5 @@ import logging +import 
threading from collections import deque from datetime import datetime from threading import RLock, Event, Thread @@ -41,6 +42,9 @@ class UpdateState: self._updates_available.wait() with self._updates_lock: + if not self._updates_available.is_set(): + return + update = self._updates.popleft() if not self._updates: self._updates_available.clear() From 7cef5885fa35d92a8b4c276e80a7d56a223e5ee2 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Sat, 30 Sep 2017 11:17:31 +0200 Subject: [PATCH 07/18] Rename process_updates/polling to workers --- telethon/telegram_bare_client.py | 4 +-- telethon/telegram_client.py | 14 ++++---- telethon/update_state.py | 61 ++++++++++++-------------------- 3 files changed, 32 insertions(+), 47 deletions(-) diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index e9eae399..1a6cfebd 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -67,7 +67,7 @@ class TelegramBareClient: def __init__(self, session, api_id, api_hash, connection_mode=ConnectionMode.TCP_FULL, proxy=None, - process_updates=False, + update_workers=None, timeout=timedelta(seconds=5), **kwargs): """Refer to TelegramClient.__init__ for docs on this method""" @@ -108,7 +108,7 @@ class TelegramBareClient: # This member will process updates if enabled. # One may change self.updates.enabled at any later point. 
- self.updates = UpdateState(process_updates) + self.updates = UpdateState(workers=update_workers) # Used on connection - the user may modify these and reconnect kwargs['app_version'] = kwargs.get('app_version', self.__version__) diff --git a/telethon/telegram_client.py b/telethon/telegram_client.py index a28d2c62..b78fc68d 100644 --- a/telethon/telegram_client.py +++ b/telethon/telegram_client.py @@ -57,7 +57,7 @@ class TelegramClient(TelegramBareClient): def __init__(self, session, api_id, api_hash, connection_mode=ConnectionMode.TCP_FULL, proxy=None, - process_updates=False, + update_workers=None, timeout=timedelta(seconds=5), **kwargs): """Initializes the Telegram client with the specified API ID and Hash. @@ -71,11 +71,11 @@ class TelegramClient(TelegramBareClient): This will only affect how messages are sent over the network and how much processing is required before sending them. - If 'process_updates' is set to True, incoming updates will be - processed and you must manually call 'self.updates.poll()' from - another thread to retrieve the saved update objects, or your - memory will fill with these. You may modify the value of - 'self.updates.polling' at any later point. + The integer 'update_workers' represents depending on its value: + is None: Updates will *not* be stored in memory. + = 0: Another thread is responsible for calling self.updates.poll() + > 0: 'update_workers' background threads will be spawned, any + any of them will invoke all the self.updates.handlers. 
Despite the value of 'process_updates', if you later call '.add_update_handler(...)', updates will also be processed @@ -94,7 +94,7 @@ class TelegramClient(TelegramBareClient): session, api_id, api_hash, connection_mode=connection_mode, proxy=proxy, - process_updates=process_updates, + update_workers=update_workers, timeout=timeout ) diff --git a/telethon/update_state.py b/telethon/update_state.py index 9239bd03..cced0e1c 100644 --- a/telethon/update_state.py +++ b/telethon/update_state.py @@ -1,5 +1,4 @@ import logging -import threading from collections import deque from datetime import datetime from threading import RLock, Event, Thread @@ -11,9 +10,15 @@ class UpdateState: """Used to hold the current state of processed updates. To retrieve an update, .poll() should be called. """ - def __init__(self, polling): - self._polling = polling - self._workers = 4 + def __init__(self, workers=None): + """ + :param workers: This integer parameter has three possible cases: + workers is None: Updates will *not* be stored on self. + workers = 0: Another thread is responsible for calling self.poll() + workers > 0: 'workers' background threads will be spawned, any + any of them will invoke all the self.handlers. + """ + self._workers = workers self._worker_threads = [] self.handlers = [] @@ -25,11 +30,7 @@ class UpdateState: # https://core.telegram.org/api/updates self._state = tl.updates.State(0, 0, datetime.now(), 0, 0) - - # TODO Rename "polling" to some other variable - # that signifies "running background threads". 
- if polling: - self._setup_workers() + self._setup_workers() def can_poll(self): """Returns True if a call to .poll() won't lock""" @@ -37,9 +38,6 @@ class UpdateState: def poll(self): """Polls an update or blocks until an update object is available""" - if not self._polling: - raise ValueError('Updates are not being polled hence not saved.') - self._updates_available.wait() with self._updates_lock: if not self._updates_available.is_set(): @@ -54,28 +52,19 @@ class UpdateState: return update - # TODO How should this be handled with background worker threads? - def get_polling(self): - return self._polling - - def set_polling(self, polling): - self._polling = polling - if polling: - self._setup_workers() - else: - with self._updates_lock: - self._updates.clear() - self._stop_workers() - - polling = property(fget=get_polling, fset=set_polling) - def get_workers(self): return self._workers def set_workers(self, n): + """Changes the number of workers running. + If 'n is None', clears all pending updates from memory. + """ self._stop_workers() self._workers = n - self._setup_workers() + if n is None: + self._updates.clear() + else: + self._setup_workers() workers = property(fget=get_workers, fset=set_workers) @@ -83,9 +72,6 @@ class UpdateState: """Raises "StopIterationException" on the worker threads to stop them, and also clears all of them off the list """ - if self._worker_threads: - pass - self.set_error(StopIteration()) for t in self._worker_threads: t.join() @@ -93,8 +79,8 @@ class UpdateState: self._worker_threads.clear() def _setup_workers(self): - if self._worker_threads: - # There already are workers + if self._worker_threads or not self._workers: + # There already are workers, or workers is None or 0. Do nothing. return for i in range(self._workers): @@ -141,8 +127,8 @@ class UpdateState: """Processes an update object. This method is normally called by the library itself. 
""" - if not self._polling and not self.handlers: - return + if self._workers is None: + return # No processing needs to be done if nobody's working with self._updates_lock: if isinstance(update, tl.updates.State): @@ -154,6 +140,5 @@ class UpdateState: return # We already handled this update self._state.pts = pts - if self._polling: - self._updates.append(update) - self._updates_available.set() + self._updates.append(update) + self._updates_available.set() From a3ae56ca9e670d606785f0a93dbaf052ed32d5f2 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Sat, 30 Sep 2017 11:21:07 +0200 Subject: [PATCH 08/18] Use a timeout when worker threads are polling --- telethon/update_state.py | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/telethon/update_state.py b/telethon/update_state.py index cced0e1c..4402fb14 100644 --- a/telethon/update_state.py +++ b/telethon/update_state.py @@ -10,6 +10,8 @@ class UpdateState: """Used to hold the current state of processed updates. To retrieve an update, .poll() should be called. """ + WORKER_POLL_TIMEOUT = 5.0 # Avoid waiting forever on the workers + def __init__(self, workers=None): """ :param workers: This integer parameter has three possible cases: @@ -36,9 +38,14 @@ class UpdateState: """Returns True if a call to .poll() won't lock""" return self._updates_available.is_set() - def poll(self): - """Polls an update or blocks until an update object is available""" - self._updates_available.wait() + def poll(self, timeout=None): + """Polls an update or blocks until an update object is available. + If 'timeout is not None', it should be a floating point value, + and the method will 'return None' if waiting times out. 
+ """ + if not self._updates_available.wait(timeout=timeout): + return + with self._updates_lock: if not self._updates_available.is_set(): return @@ -96,10 +103,11 @@ class UpdateState: def _worker_loop(self, wid): while True: try: - update = self.poll() + update = self.poll(timeout=UpdateState.WORKER_POLL_TIMEOUT) # TODO Maybe people can add different handlers per update type - for handler in self.handlers: - handler(update) + if update: + for handler in self.handlers: + handler(update) except StopIteration: break except Exception as e: From 61033b2f564171302868baa5b021f96be3b54c68 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Sat, 30 Sep 2017 11:28:15 +0200 Subject: [PATCH 09/18] Allow disabling spawning a second thread --- telethon/telegram_bare_client.py | 7 +++++-- telethon/telegram_client.py | 14 +++++++++++--- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index 1a6cfebd..251c927e 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -68,6 +68,7 @@ class TelegramBareClient: connection_mode=ConnectionMode.TCP_FULL, proxy=None, update_workers=None, + spawn_read_thread=True, timeout=timedelta(seconds=5), **kwargs): """Refer to TelegramClient.__init__ for docs on this method""" @@ -131,7 +132,9 @@ class TelegramBareClient: # Uploaded files cache so subsequent calls are instant self._upload_cache = {} - # Constantly read for results and updates from within the main client + # Constantly read for results and updates from within the main client, + # if the user has left enabled such option. + self._spawn_read_thread = spawn_read_thread self._recv_thread = None # Identifier of the main thread (the one that called .connect()). 
@@ -704,7 +707,7 @@ class TelegramBareClient: def _set_connected_and_authorized(self): self._authorized = True - if self._recv_thread is None: + if self._spawn_read_thread and self._recv_thread is None: self._recv_thread = threading.Thread( name='ReadThread', daemon=True, target=self._recv_thread_impl diff --git a/telethon/telegram_client.py b/telethon/telegram_client.py index b78fc68d..a053ed95 100644 --- a/telethon/telegram_client.py +++ b/telethon/telegram_client.py @@ -59,6 +59,7 @@ class TelegramClient(TelegramBareClient): proxy=None, update_workers=None, timeout=timedelta(seconds=5), + spawn_read_thread=True, **kwargs): """Initializes the Telegram client with the specified API ID and Hash. @@ -77,9 +78,15 @@ class TelegramClient(TelegramBareClient): > 0: 'update_workers' background threads will be spawned, any any of them will invoke all the self.updates.handlers. - Despite the value of 'process_updates', if you later call - '.add_update_handler(...)', updates will also be processed - and the update objects will be passed to the handlers you added. + If 'spawn_read_thread', a background thread will be started once + an authorized user has been logged in to Telegram to read items + (such as updates and responses) from the network as soon as they + occur, which will speed things up. + + If you don't want to spawn any additional threads, pending updates + will be read and processed accordingly after invoking a request + and not immediately. This is useful if you don't care about updates + at all and have set 'update_workers=None'. If more named arguments are provided as **kwargs, they will be used to update the Session instance. 
Most common settings are: @@ -95,6 +102,7 @@ class TelegramClient(TelegramBareClient): connection_mode=connection_mode, proxy=proxy, update_workers=update_workers, + spawn_read_thread=spawn_read_thread, timeout=timeout ) From 003e23123960b8877960cd814ebd14b50dbba88b Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Sat, 30 Sep 2017 11:45:35 +0200 Subject: [PATCH 10/18] Attempt at cleaning up TelegramBareClient.invoke() --- telethon/telegram_bare_client.py | 45 +++++++++++++++++++------------- 1 file changed, 27 insertions(+), 18 deletions(-) diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index 251c927e..010ae086 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -91,6 +91,10 @@ class TelegramBareClient: if self.api_id < 20: # official apps must use obfuscated connection_mode = ConnectionMode.TCP_OBFUSCATED + # This is the main sender, which will be used from the thread + # that calls .connect(). Every other thread will spawn a new + # temporary connection. The connection on this one is always + # kept open so Telegram can send us updates. self._sender = MtProtoSender(self.session, Connection( self.session.server_address, self.session.port, mode=connection_mode, proxy=proxy, timeout=timeout @@ -370,7 +374,7 @@ class TelegramBareClient: # region Invoking Telegram requests - def invoke(self, *requests, call_receive=True, retries=5): + def invoke(self, *requests, retries=5): """Invokes (sends) a MTProtoRequest and returns (receives) its result. 
The invoke will be retried up to 'retries' times before raising @@ -383,9 +387,6 @@ class TelegramBareClient: x.content_related for x in requests): raise ValueError('You can only invoke requests, not types!') - if retries <= 0: - raise ValueError('Number of retries reached 0.') - # Determine the sender to be used (main or a new connection) # TODO Polish this so it's nicer on_main_thread = threading.get_ident() == self._main_thread_ident @@ -401,6 +402,25 @@ class TelegramBareClient: sender = MtProtoSender(self.session, conn) sender.connect() + # We should call receive from this thread if there's no background + # thread reading or if the server disconnected us and we're trying + # to reconnect. This is because the read thread may either be + # locked also trying to reconnect or we may be said thread already. + call_receive = not on_main_thread or \ + self._recv_thread is None or self._connect_lock.locked() + try: + for _ in range(retries): + result = self._invoke(sender, call_receive, *requests) + if result: + return result + + if retries <= 0: + raise ValueError('Number of retries reached 0.') + finally: + if sender != self._sender: + sender.disconnect() # Close temporary connections + + def _invoke(self, sender, call_receive, *requests): try: # Ensure that we start with no previous errors (i.e. resending) for x in requests: @@ -409,13 +429,6 @@ class TelegramBareClient: sender.send(*requests) - # We should call receive from this thread if there's no background - # thread reading or if the server disconnected us and we're trying - # to reconnect. This is because the read thread may either be - # locked also trying to reconnect or we may be said thread already. - call_receive = not on_main_thread or \ - self._recv_thread is None or self._connect_lock.locked() - if not call_receive: # TODO This will be slightly troublesome if we allow # switching between constant read or not on the fly. 
@@ -441,9 +454,7 @@ class TelegramBareClient: # be on the very first connection (not authorized, not running), # but may be an issue for people who actually travel? self._reconnect(new_dc=e.new_dc) - return self.invoke( - *requests, call_receive=call_receive, retries=(retries - 1) - ) + return self._invoke(sender, call_receive, *requests) except TimeoutError: pass # We will just retry @@ -477,10 +488,8 @@ class TelegramBareClient: except StopIteration: if any(x.result is None for x in requests): # "A container may only be accepted or - # rejected by the other party as a whole." - return self.invoke( - *requests, call_receive=call_receive, retries=(retries - 1) - ) + # rejected by the other party as a whole." + return None elif len(requests) == 1: return requests[0].result else: From 5da300ca8412686c0d6cfff2755eb1bc15cab7e7 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Sat, 30 Sep 2017 11:49:38 +0200 Subject: [PATCH 11/18] Make MtProtoSender not thread-safe Rationale: a new connection should be spawned if one desires to send and receive requests in parallel, which would otherwise cause one of either threads to lock. --- telethon/network/mtproto_sender.py | 41 ++++++++++++++---------------- 1 file changed, 19 insertions(+), 22 deletions(-) diff --git a/telethon/network/mtproto_sender.py b/telethon/network/mtproto_sender.py index 83ac4a43..772aa213 100644 --- a/telethon/network/mtproto_sender.py +++ b/telethon/network/mtproto_sender.py @@ -1,7 +1,6 @@ import gzip import logging import struct -from threading import RLock from .. import helpers as utils from ..crypto import AES @@ -20,7 +19,12 @@ logging.getLogger(__name__).addHandler(logging.NullHandler()) class MtProtoSender: """MTProto Mobile Protocol sender - (https://core.telegram.org/mtproto/description) + (https://core.telegram.org/mtproto/description). + + Note that this class is not thread-safe, and calling send/receive + from two or more threads at the same time is undefined behaviour. 
+ Rationale: a new connection should be spawned to send/receive requests + in parallel, so thread-safety (hence locking) isn't needed. """ def __init__(self, session, connection): @@ -37,11 +41,6 @@ class MtProtoSender: # Requests (as msg_id: Message) sent waiting to be received self._pending_receive = {} - # Sending and receiving are independent, but two threads cannot - # send or receive at the same time no matter what. - self._send_lock = RLock() - self._recv_lock = RLock() - def connect(self): """Connects to the server""" self.connection.connect() @@ -93,19 +92,18 @@ class MtProtoSender: Any unhandled object (likely updates) will be passed to update_state.process(TLObject). """ - with self._recv_lock: - try: - body = self.connection.recv() - except (BufferError, InvalidChecksumError): - # TODO BufferError, we should spot the cause... - # "No more bytes left"; something wrong happened, clear - # everything to be on the safe side, or: - # - # "This packet should be skipped"; since this may have - # been a result for a request, invalidate every request - # and just re-invoke them to avoid problems - self._clear_all_pending() - return + try: + body = self.connection.recv() + except (BufferError, InvalidChecksumError): + # TODO BufferError, we should spot the cause... 
+ # "No more bytes left"; something wrong happened, clear + # everything to be on the safe side, or: + # + # "This packet should be skipped"; since this may have + # been a result for a request, invalidate every request + # and just re-invoke them to avoid problems + self._clear_all_pending() + return message, remote_msg_id, remote_seq = self._decode_msg(body) with BinaryReader(message) as reader: @@ -128,8 +126,7 @@ class MtProtoSender: cipher_text = AES.encrypt_ige(plain_text, key, iv) result = key_id + msg_key + cipher_text - with self._send_lock: - self.connection.send(result) + self.connection.send(result) def _decode_msg(self, body): """Decodes an received encrypted message body bytes""" From 0a567fcd7c39dfb3d8e590d37c38ac0c248bdd1d Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Sat, 30 Sep 2017 12:08:06 +0200 Subject: [PATCH 12/18] Make creating a new sender cleaner --- telethon/network/connection.py | 7 +++++++ telethon/network/mtproto_sender.py | 4 ++++ telethon/telegram_bare_client.py | 9 +-------- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/telethon/network/connection.py b/telethon/network/connection.py index d333b8ff..28c548eb 100644 --- a/telethon/network/connection.py +++ b/telethon/network/connection.py @@ -130,6 +130,13 @@ class Connection: def close(self): self.conn.close() + def clone(self): + """Creates a copy of this Connection""" + return Connection(self.ip, self.port, + mode=self._mode, + proxy=self.conn.proxy, + timeout=self.conn.timeout) + # region Receive message implementations def recv(self): diff --git a/telethon/network/mtproto_sender.py b/telethon/network/mtproto_sender.py index 772aa213..6558a20c 100644 --- a/telethon/network/mtproto_sender.py +++ b/telethon/network/mtproto_sender.py @@ -54,6 +54,10 @@ class MtProtoSender: self._need_confirmation.clear() self._clear_all_pending() + def clone(self): + """Creates a copy of this MtProtoSender as a new connection""" + return MtProtoSender(self.session, 
self.connection.clone()) + # region Send and receive def send(self, *requests): diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index 010ae086..04398826 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -388,18 +388,11 @@ class TelegramBareClient: raise ValueError('You can only invoke requests, not types!') # Determine the sender to be used (main or a new connection) - # TODO Polish this so it's nicer on_main_thread = threading.get_ident() == self._main_thread_ident if on_main_thread or self._on_read_thread(): sender = self._sender else: - conn = Connection( - self.session.server_address, self.session.port, - mode=self._sender.connection._mode, - proxy=self._sender.connection.conn.proxy, - timeout=self._sender.connection.get_timeout() - ) - sender = MtProtoSender(self.session, conn) + sender = self._sender.clone() sender.connect() # We should call receive from this thread if there's no background From 18e485ded2cae368ed86bb173df5c9c2a04abada Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Sat, 30 Sep 2017 15:53:47 +0200 Subject: [PATCH 13/18] Set default TelegramBareClient behaviour to not spawn ReadThread --- telethon/telegram_bare_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index 04398826..4355be3c 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -68,7 +68,7 @@ class TelegramBareClient: connection_mode=ConnectionMode.TCP_FULL, proxy=None, update_workers=None, - spawn_read_thread=True, + spawn_read_thread=False, timeout=timedelta(seconds=5), **kwargs): """Refer to TelegramClient.__init__ for docs on this method""" From 8ecd2c2e060db034900a18e434fa89a3e3334821 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Sat, 30 Sep 2017 16:11:16 +0200 Subject: [PATCH 14/18] Call .sync_updates on .connect iff exported_auth is None Calling this method on exported clients 
would trigger a UserMigrateError because it was being used on a non-native data center. For .connects like this, don't attempt syncing updates. --- telethon/telegram_bare_client.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index 4355be3c..cf26c465 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -201,13 +201,16 @@ class TelegramBareClient: self(GetConfigRequest()).dc_options # Connection was successful! Try syncing the update state + # IF we don't have an exported authorization (hence we're + # not in our NATIVE data center or we'd get UserMigrateError) # to also assert whether the user is logged in or not. self._user_connected = True - try: - self.sync_updates() - self._set_connected_and_authorized() - except UnauthorizedError: - self._authorized = False + if not exported_auth: + try: + self.sync_updates() + self._set_connected_and_authorized() + except UnauthorizedError: + self._authorized = False return True From c1c6df9fd0bfc9a20c7eb6864c697d9f38ce81b6 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Sat, 30 Sep 2017 16:18:16 +0200 Subject: [PATCH 15/18] Fix invoke not raising ValueError when retries reach 0 --- telethon/telegram_bare_client.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index cf26c465..d027a358 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -410,8 +410,7 @@ class TelegramBareClient: if result: return result - if retries <= 0: - raise ValueError('Number of retries reached 0.') + raise ValueError('Number of retries reached 0.') finally: if sender != self._sender: sender.disconnect() # Close temporary connections From a35c4b15dba34415617ee798d4d38c993f6ab97a Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Sat, 30 Sep 2017 16:32:10 +0200 Subject: [PATCH 16/18] Cache exported 
Sessions instead whole clients --- telethon/telegram_bare_client.py | 97 ++++++++++++++++---------------- 1 file changed, 49 insertions(+), 48 deletions(-) diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index d027a358..f7a3933d 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -106,10 +106,10 @@ class TelegramBareClient: # we only want one to actually perform the reconnection. self._connect_lock = Lock() - # Cache "exported" senders 'dc_id: TelegramBareClient' and - # their corresponding sessions not to recreate them all - # the time since it's a (somewhat expensive) process. - self._cached_clients = {} + # Cache "exported" sessions as 'dc_id: Session' not to recreate + # them all the time since generating a new key is a relatively + # expensive operation. + self._exported_sessions = {} # This member will process updates if enabled. # One may change self.updates.enabled at any later point. @@ -154,14 +154,22 @@ class TelegramBareClient: # region Connecting - def connect(self, exported_auth=None): + def connect(self, _exported_auth=None, _sync_updates=True): """Connects to the Telegram servers, executing authentication if required. Note that authenticating to the Telegram servers is not the same as authenticating the desired user itself, which may require a call (or several) to 'sign_in' for the first time. - If 'exported_auth' is not None, it will be used instead to + Note that the optional parameters are meant for internal use. + + If '_exported_auth' is not None, it will be used instead to determine the authorization key for the current session. + + If '_sync_updates', sync_updates() will be called and a + second thread will be started if necessary. Note that this + will FAIL if the client is not connected to the user's + native data center, raising a "UserMigrateError", and + calling .disconnect() in the process. 
""" self._main_thread_ident = threading.get_ident() @@ -183,17 +191,17 @@ class TelegramBareClient: init_connection = self.session.layer != LAYER if init_connection: - if exported_auth is not None: + if _exported_auth is not None: self._init_connection(ImportAuthorizationRequest( - exported_auth.id, exported_auth.bytes + _exported_auth.id, _exported_auth.bytes )) else: TelegramBareClient._dc_options = \ self._init_connection(GetConfigRequest()).dc_options - elif exported_auth is not None: + elif _exported_auth is not None: self(ImportAuthorizationRequest( - exported_auth.id, exported_auth.bytes + _exported_auth.id, _exported_auth.bytes )) if TelegramBareClient._dc_options is None: @@ -201,11 +209,11 @@ class TelegramBareClient: self(GetConfigRequest()).dc_options # Connection was successful! Try syncing the update state - # IF we don't have an exported authorization (hence we're - # not in our NATIVE data center or we'd get UserMigrateError) + # UNLESS '_sync_updates' is False (we probably are in + # another data center and this would raise UserMigrateError) # to also assert whether the user is logged in or not. self._user_connected = True - if not exported_auth: + if _sync_updates: try: self.sync_updates() self._set_connected_and_authorized() @@ -218,7 +226,10 @@ class TelegramBareClient: # This is fine, probably layer migration self._logger.debug('Found invalid item, probably migrating', e) self.disconnect() - return self.connect(exported_auth=exported_auth) + return self.connect( + _exported_auth=_exported_auth, + _sync_updates=_sync_updates + ) except (RPCError, ConnectionError) as error: # Probably errors from the previous session, ignore them @@ -258,11 +269,8 @@ class TelegramBareClient: # ._recv_thread = None, it knows it doesn't have to. 
self._sender.disconnect() - # Also disconnect all the cached senders - for sender in self._cached_clients.values(): - sender.disconnect() - - self._cached_clients.clear() + # TODO Shall we clear the _exported_sessions, or may be reused? + pass def _reconnect(self, new_dc=None): """If 'new_dc' is not set, only a call to .connect() will be made @@ -324,30 +332,22 @@ class TelegramBareClient: TelegramBareClient._dc_options = self(GetConfigRequest()).dc_options return self._get_dc(dc_id, ipv6=ipv6, cdn=cdn) - def _get_exported_client(self, dc_id, - init_connection=False, - bypass_cache=False): - """Gets a cached exported TelegramBareClient for the desired DC. + def _get_exported_client(self, dc_id): + """Creates and connects a new TelegramBareClient for the desired DC. - If it's the first time retrieving the TelegramBareClient, the - current authorization is exported to the new DC so that - it can be used there, and the connection is initialized. - - If after using the sender a ConnectionResetError is raised, - this method should be called again with init_connection=True - in order to perform the reconnection. - - If bypass_cache is True, a new client will be exported and - it will not be cached. + If it's the first time calling the method with a given dc_id, + a new session will be first created, and its auth key generated. + Exporting/Importing the authorization will also be done so that + the auth is bound with the key. """ # Thanks badoualy/kotlogram on /telegram/api/DefaultTelegramClient.kt # for clearly showing how to export the authorization! ^^ - client = self._cached_clients.get(dc_id) - if client and not bypass_cache: - if init_connection: - client.reconnect() - return client + session = self._exported_sessions.get(dc_id) + if session: + export_auth = None # Already bound with the auth key else: + # TODO Add a lock, don't allow two threads to create an auth key + # for the same data center. 
dc = self._get_dc(dc_id) # Export the current authorization to the new DC. @@ -361,17 +361,15 @@ class TelegramBareClient: session = Session(self.session) session.server_address = dc.ip_address session.port = dc.port - client = TelegramBareClient( - session, self.api_id, self.api_hash, - proxy=self._sender.connection.conn.proxy, - timeout=self._sender.connection.get_timeout() - ) - client.connect(exported_auth=export_auth) + self._exported_sessions[dc_id] = session - if not bypass_cache: - # Don't go through this expensive process every time. - self._cached_clients[dc_id] = client - return client + client = TelegramBareClient( + session, self.api_id, self.api_hash, + proxy=self._sender.connection.conn.proxy, + timeout=self._sender.connection.get_timeout() + ) + client.connect(_exported_auth=export_auth, _sync_updates=False) + return client # endregion @@ -672,6 +670,9 @@ class TelegramBareClient: if progress_callback: progress_callback(f.tell(), file_size) finally: + if client != self: + client.disconnect() + if cdn_decrypter: try: cdn_decrypter.client.disconnect() From d28f370ab6bc2b7e409c6d87882cedd08cdb0bb3 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Sat, 30 Sep 2017 17:51:07 +0200 Subject: [PATCH 17/18] Add ._get_cdn_client as alternative ._get_exported_client version --- telethon/crypto/cdn_decrypter.py | 32 ++++++--------------------- telethon/telegram_bare_client.py | 37 +++++++++++++++++++++++++++----- 2 files changed, 38 insertions(+), 31 deletions(-) diff --git a/telethon/crypto/cdn_decrypter.py b/telethon/crypto/cdn_decrypter.py index d6628d58..f4b0ef3e 100644 --- a/telethon/crypto/cdn_decrypter.py +++ b/telethon/crypto/cdn_decrypter.py @@ -10,7 +10,7 @@ from ..errors import CdnFileTamperedError class CdnDecrypter: """Used when downloading a file results in a 'FileCdnRedirect' to both prepare the redirect, decrypt the file as it downloads, and - ensure the file hasn't been tampered. + ensure the file hasn't been tampered. 
https://core.telegram.org/cdn """ def __init__(self, cdn_client, file_token, cdn_aes, cdn_file_hashes): self.client = cdn_client @@ -19,46 +19,26 @@ class CdnDecrypter: self.cdn_file_hashes = cdn_file_hashes @staticmethod - def prepare_decrypter(client, client_cls, cdn_redirect): + def prepare_decrypter(client, cdn_client, cdn_redirect): """Prepares a CDN decrypter, returning (decrypter, file data). - 'client' should be the original TelegramBareClient that - tried to download the file. - - 'client_cls' should be the class of the TelegramBareClient. + 'client' should be an existing client not connected to a CDN. + 'cdn_client' should be an already-connected TelegramBareClient + with the auth key already created. """ - # TODO Avoid the need for 'client_cls=TelegramBareClient' - # https://core.telegram.org/cdn cdn_aes = AESModeCTR( key=cdn_redirect.encryption_key, # 12 first bytes of the IV..4 bytes of the offset (0, big endian) iv=cdn_redirect.encryption_iv[:12] + bytes(4) ) - # Create a new client on said CDN - dc = client._get_dc(cdn_redirect.dc_id, cdn=True) - session = Session(client.session) - session.server_address = dc.ip_address - session.port = dc.port - cdn_client = client_cls( # Avoid importing TelegramBareClient - session, client.api_id, client.api_hash, - timeout=client._sender.connection.get_timeout() - ) - # This will make use of the new RSA keys for this specific CDN. - # # We assume that cdn_redirect.cdn_file_hashes are ordered by offset, # and that there will be enough of these to retrieve the whole file. - # - # This relies on the fact that TelegramBareClient._dc_options is - # static and it won't be called from this DC (it would fail). 
- cdn_client.connect() - - # CDN client is ready, create the resulting CdnDecrypter decrypter = CdnDecrypter( cdn_client, cdn_redirect.file_token, cdn_aes, cdn_redirect.cdn_file_hashes ) - cdn_file = client(GetCdnFileRequest( + cdn_file = cdn_client(GetCdnFileRequest( file_token=cdn_redirect.file_token, offset=cdn_redirect.cdn_file_hashes[0].offset, limit=cdn_redirect.cdn_file_hashes[0].limit diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index f7a3933d..dd075c9b 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -154,7 +154,7 @@ class TelegramBareClient: # region Connecting - def connect(self, _exported_auth=None, _sync_updates=True): + def connect(self, _exported_auth=None, _sync_updates=True, cdn=False): """Connects to the Telegram servers, executing authentication if required. Note that authenticating to the Telegram servers is not the same as authenticating the desired user itself, which @@ -170,6 +170,9 @@ class TelegramBareClient: will FAIL if the client is not connected to the user's native data center, raising a "UserMigrateError", and calling .disconnect() in the process. + + If 'cdn' is False, methods that are not allowed on such data + centers won't be invoked. """ self._main_thread_ident = threading.get_ident() @@ -195,7 +198,7 @@ class TelegramBareClient: self._init_connection(ImportAuthorizationRequest( _exported_auth.id, _exported_auth.bytes )) - else: + elif not cdn: TelegramBareClient._dc_options = \ self._init_connection(GetConfigRequest()).dc_options @@ -204,7 +207,7 @@ class TelegramBareClient: _exported_auth.id, _exported_auth.bytes )) - if TelegramBareClient._dc_options is None: + if TelegramBareClient._dc_options is None and not cdn: TelegramBareClient._dc_options = \ self(GetConfigRequest()).dc_options @@ -213,7 +216,7 @@ class TelegramBareClient: # another data center and this would raise UserMigrateError) # to also assert whether the user is logged in or not. 
self._user_connected = True - if _sync_updates: + if _sync_updates and not cdn: try: self.sync_updates() self._set_connected_and_authorized() @@ -347,6 +350,7 @@ class TelegramBareClient: export_auth = None # Already bound with the auth key else: # TODO Add a lock, don't allow two threads to create an auth key + # (when calling .connect() if there wasn't a previous session). # for the same data center. dc = self._get_dc(dc_id) @@ -371,6 +375,29 @@ class TelegramBareClient: client.connect(_exported_auth=export_auth, _sync_updates=False) return client + def _get_cdn_client(self, cdn_redirect): + """Similar to ._get_exported_client, but for CDNs""" + session = self._exported_sessions.get(cdn_redirect.dc_id) + if not session: + dc = self._get_dc(cdn_redirect.dc_id, cdn=True) + session = Session(self.session) + session.server_address = dc.ip_address + session.port = dc.port + self._exported_sessions[cdn_redirect.dc_id] = session + + client = TelegramBareClient( + session, self.api_id, self.api_hash, + proxy=self._sender.connection.conn.proxy, + timeout=self._sender.connection.get_timeout() + ) + + # This will make use of the new RSA keys for this specific CDN. + # + # This relies on the fact that TelegramBareClient._dc_options is + # static and it won't be called from this DC (it would fail). 
+ client.connect(cdn=True) # Avoid invoking non-CDN specific methods + return client + # endregion # region Invoking Telegram requests @@ -651,7 +678,7 @@ class TelegramBareClient: if isinstance(result, FileCdnRedirect): cdn_decrypter, result = \ CdnDecrypter.prepare_decrypter( - client, TelegramBareClient, result + client, self._get_cdn_client(result), result ) except FileMigrateError as e: From 4cd7e1711e01c201bd268a7860d557fdedf481ef Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Sat, 30 Sep 2017 17:56:42 +0200 Subject: [PATCH 18/18] Rename cdn parameter to _cdn --- telethon/telegram_bare_client.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index dd075c9b..54ebe095 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -154,7 +154,7 @@ class TelegramBareClient: # region Connecting - def connect(self, _exported_auth=None, _sync_updates=True, cdn=False): + def connect(self, _exported_auth=None, _sync_updates=True, _cdn=False): """Connects to the Telegram servers, executing authentication if required. Note that authenticating to the Telegram servers is not the same as authenticating the desired user itself, which @@ -171,7 +171,7 @@ class TelegramBareClient: native data center, raising a "UserMigrateError", and calling .disconnect() in the process. - If 'cdn' is False, methods that are not allowed on such data + If '_cdn' is False, methods that are not allowed on such data centers won't be invoked. 
""" self._main_thread_ident = threading.get_ident() @@ -198,7 +198,7 @@ class TelegramBareClient: self._init_connection(ImportAuthorizationRequest( _exported_auth.id, _exported_auth.bytes )) - elif not cdn: + elif not _cdn: TelegramBareClient._dc_options = \ self._init_connection(GetConfigRequest()).dc_options @@ -207,7 +207,7 @@ class TelegramBareClient: _exported_auth.id, _exported_auth.bytes )) - if TelegramBareClient._dc_options is None and not cdn: + if TelegramBareClient._dc_options is None and not _cdn: TelegramBareClient._dc_options = \ self(GetConfigRequest()).dc_options @@ -216,7 +216,7 @@ class TelegramBareClient: # another data center and this would raise UserMigrateError) # to also assert whether the user is logged in or not. self._user_connected = True - if _sync_updates and not cdn: + if _sync_updates and not _cdn: try: self.sync_updates() self._set_connected_and_authorized() @@ -231,7 +231,8 @@ class TelegramBareClient: self.disconnect() return self.connect( _exported_auth=_exported_auth, - _sync_updates=_sync_updates + _sync_updates=_sync_updates, + _cdn=_cdn ) except (RPCError, ConnectionError) as error: