diff --git a/telethon/crypto/cdn_decrypter.py b/telethon/crypto/cdn_decrypter.py index d6628d58..f4b0ef3e 100644 --- a/telethon/crypto/cdn_decrypter.py +++ b/telethon/crypto/cdn_decrypter.py @@ -10,7 +10,7 @@ from ..errors import CdnFileTamperedError class CdnDecrypter: """Used when downloading a file results in a 'FileCdnRedirect' to both prepare the redirect, decrypt the file as it downloads, and - ensure the file hasn't been tampered. + ensure the file hasn't been tampered. https://core.telegram.org/cdn """ def __init__(self, cdn_client, file_token, cdn_aes, cdn_file_hashes): self.client = cdn_client @@ -19,46 +19,26 @@ class CdnDecrypter: self.cdn_file_hashes = cdn_file_hashes @staticmethod - def prepare_decrypter(client, client_cls, cdn_redirect): + def prepare_decrypter(client, cdn_client, cdn_redirect): """Prepares a CDN decrypter, returning (decrypter, file data). - 'client' should be the original TelegramBareClient that - tried to download the file. - - 'client_cls' should be the class of the TelegramBareClient. + 'client' should be an existing client not connected to a CDN. + 'cdn_client' should be an already-connected TelegramBareClient + with the auth key already created. """ - # TODO Avoid the need for 'client_cls=TelegramBareClient' - # https://core.telegram.org/cdn cdn_aes = AESModeCTR( key=cdn_redirect.encryption_key, # 12 first bytes of the IV..4 bytes of the offset (0, big endian) iv=cdn_redirect.encryption_iv[:12] + bytes(4) ) - # Create a new client on said CDN - dc = client._get_dc(cdn_redirect.dc_id, cdn=True) - session = Session(client.session) - session.server_address = dc.ip_address - session.port = dc.port - cdn_client = client_cls( # Avoid importing TelegramBareClient - session, client.api_id, client.api_hash, - timeout=client._sender.connection.get_timeout() - ) - # This will make use of the new RSA keys for this specific CDN. 
- # # We assume that cdn_redirect.cdn_file_hashes are ordered by offset, # and that there will be enough of these to retrieve the whole file. - # - # This relies on the fact that TelegramBareClient._dc_options is - # static and it won't be called from this DC (it would fail). - cdn_client.connect() - - # CDN client is ready, create the resulting CdnDecrypter decrypter = CdnDecrypter( cdn_client, cdn_redirect.file_token, cdn_aes, cdn_redirect.cdn_file_hashes ) - cdn_file = client(GetCdnFileRequest( + cdn_file = cdn_client(GetCdnFileRequest( file_token=cdn_redirect.file_token, offset=cdn_redirect.cdn_file_hashes[0].offset, limit=cdn_redirect.cdn_file_hashes[0].limit diff --git a/telethon/network/connection.py b/telethon/network/connection.py index d333b8ff..28c548eb 100644 --- a/telethon/network/connection.py +++ b/telethon/network/connection.py @@ -130,6 +130,13 @@ class Connection: def close(self): self.conn.close() + def clone(self): + """Creates a copy of this Connection""" + return Connection(self.ip, self.port, + mode=self._mode, + proxy=self.conn.proxy, + timeout=self.conn.timeout) + # region Receive message implementations def recv(self): diff --git a/telethon/network/mtproto_sender.py b/telethon/network/mtproto_sender.py index 83ac4a43..6558a20c 100644 --- a/telethon/network/mtproto_sender.py +++ b/telethon/network/mtproto_sender.py @@ -1,7 +1,6 @@ import gzip import logging import struct -from threading import RLock from .. import helpers as utils from ..crypto import AES @@ -20,7 +19,12 @@ logging.getLogger(__name__).addHandler(logging.NullHandler()) class MtProtoSender: """MTProto Mobile Protocol sender - (https://core.telegram.org/mtproto/description) + (https://core.telegram.org/mtproto/description). + + Note that this class is not thread-safe, and calling send/receive + from two or more threads at the same time is undefined behaviour. 
+ Rationale: a new connection should be spawned to send/receive requests + in parallel, so thread-safety (hence locking) isn't needed. """ def __init__(self, session, connection): @@ -37,11 +41,6 @@ class MtProtoSender: # Requests (as msg_id: Message) sent waiting to be received self._pending_receive = {} - # Sending and receiving are independent, but two threads cannot - # send or receive at the same time no matter what. - self._send_lock = RLock() - self._recv_lock = RLock() - def connect(self): """Connects to the server""" self.connection.connect() @@ -55,6 +54,10 @@ class MtProtoSender: self._need_confirmation.clear() self._clear_all_pending() + def clone(self): + """Creates a copy of this MtProtoSender as a new connection""" + return MtProtoSender(self.session, self.connection.clone()) + # region Send and receive def send(self, *requests): @@ -93,19 +96,18 @@ class MtProtoSender: Any unhandled object (likely updates) will be passed to update_state.process(TLObject). """ - with self._recv_lock: - try: - body = self.connection.recv() - except (BufferError, InvalidChecksumError): - # TODO BufferError, we should spot the cause... - # "No more bytes left"; something wrong happened, clear - # everything to be on the safe side, or: - # - # "This packet should be skipped"; since this may have - # been a result for a request, invalidate every request - # and just re-invoke them to avoid problems - self._clear_all_pending() - return + try: + body = self.connection.recv() + except (BufferError, InvalidChecksumError): + # TODO BufferError, we should spot the cause... 
+ # "No more bytes left"; something wrong happened, clear + # everything to be on the safe side, or: + # + # "This packet should be skipped"; since this may have + # been a result for a request, invalidate every request + # and just re-invoke them to avoid problems + self._clear_all_pending() + return message, remote_msg_id, remote_seq = self._decode_msg(body) with BinaryReader(message) as reader: @@ -128,8 +130,7 @@ class MtProtoSender: cipher_text = AES.encrypt_ige(plain_text, key, iv) result = key_id + msg_key + cipher_text - with self._send_lock: - self.connection.send(result) + self.connection.send(result) def _decode_msg(self, body): """Decodes an received encrypted message body bytes""" diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index f4d3d8da..54ebe095 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -1,21 +1,24 @@ import logging -from datetime import timedelta +import os +import threading +from datetime import timedelta, datetime from hashlib import md5 from io import BytesIO -from os import path from threading import Lock +from time import sleep from . 
import helpers as utils from .crypto import rsa, CdnDecrypter from .errors import ( RPCError, BrokenAuthKeyError, - FloodWaitError, FileMigrateError, TypeNotFoundError + FloodWaitError, FileMigrateError, TypeNotFoundError, + UnauthorizedError, PhoneMigrateError, NetworkMigrateError, UserMigrateError ) from .network import authenticator, MtProtoSender, Connection, ConnectionMode from .tl import TLObject, Session from .tl.all_tlobjects import LAYER from .tl.functions import ( - InitConnectionRequest, InvokeWithLayerRequest + InitConnectionRequest, InvokeWithLayerRequest, PingRequest ) from .tl.functions.auth import ( ImportAuthorizationRequest, ExportAuthorizationRequest @@ -23,6 +26,7 @@ from .tl.functions.auth import ( from .tl.functions.help import ( GetCdnConfigRequest, GetConfigRequest ) +from .tl.functions.updates import GetStateRequest from .tl.functions.upload import ( GetFileRequest, SaveBigFilePartRequest, SaveFilePartRequest ) @@ -63,18 +67,34 @@ class TelegramBareClient: def __init__(self, session, api_id, api_hash, connection_mode=ConnectionMode.TCP_FULL, proxy=None, - process_updates=False, - timeout=timedelta(seconds=5)): - """Initializes the Telegram client with the specified API ID and Hash. - Session must always be a Session instance, and an optional proxy - can also be specified to be used on the connection. - """ + update_workers=None, + spawn_read_thread=False, + timeout=timedelta(seconds=5), + **kwargs): + """Refer to TelegramClient.__init__ for docs on this method""" + if not api_id or not api_hash: + raise PermissionError( + "Your API ID or Hash cannot be empty or None. " + "Refer to Telethon's README.rst for more information.") + + # Determine what session object we have + if isinstance(session, str) or session is None: + session = Session.try_load_or_create_new(session) + elif not isinstance(session, Session): + raise ValueError( + 'The given session must be a str or a Session instance.' 
+ ) + self.session = session self.api_id = int(api_id) self.api_hash = api_hash if self.api_id < 20: # official apps must use obfuscated connection_mode = ConnectionMode.TCP_OBFUSCATED + # This is the main sender, which will be used from the thread + # that calls .connect(). Every other thread will spawn a new + # temporary connection. The connection on this one is always + # kept open so Telegram can send us updates. self._sender = MtProtoSender(self.session, Connection( self.session.server_address, self.session.port, mode=connection_mode, proxy=proxy, timeout=timeout @@ -86,28 +106,76 @@ class TelegramBareClient: # we only want one to actually perform the reconnection. self._connect_lock = Lock() - # Cache "exported" senders 'dc_id: TelegramBareClient' and - # their corresponding sessions not to recreate them all - # the time since it's a (somewhat expensive) process. - self._cached_clients = {} + # Cache "exported" sessions as 'dc_id: Session' not to recreate + # them all the time since generating a new key is a relatively + # expensive operation. + self._exported_sessions = {} # This member will process updates if enabled. # One may change self.updates.enabled at any later point. - self.updates = UpdateState(process_updates) + self.updates = UpdateState(workers=update_workers) + + # Used on connection - the user may modify these and reconnect + kwargs['app_version'] = kwargs.get('app_version', self.__version__) + for name, value in kwargs.items(): + if hasattr(self.session, name): + setattr(self.session, name, value) + + # Despite the state of the real connection, keep track of whether + # the user has explicitly called .connect() or .disconnect() here. + # This information is required by the read thread, who will be the + # one attempting to reconnect on the background *while* the user + # doesn't explicitly call .disconnect(), thus telling it to stop + # retrying. 
The main thread, knowing there is a background thread + # attempting reconnection as soon as it happens, will just sleep. + self._user_connected = False + + # Save whether the user is authorized here (a.k.a. logged in) + self._authorized = False + + # Uploaded files cache so subsequent calls are instant + self._upload_cache = {} + + # Constantly read for results and updates from within the main client, + # if the user has left enabled such option. + self._spawn_read_thread = spawn_read_thread + self._recv_thread = None + + # Identifier of the main thread (the one that called .connect()). + # This will be used to create new connections from any other thread, + # so that requests can be sent in parallel. + self._main_thread_ident = None + + # Default PingRequest delay + self._last_ping = datetime.now() + self._ping_delay = timedelta(minutes=1) # endregion # region Connecting - def connect(self, exported_auth=None): + def connect(self, _exported_auth=None, _sync_updates=True, _cdn=False): """Connects to the Telegram servers, executing authentication if required. Note that authenticating to the Telegram servers is not the same as authenticating the desired user itself, which may require a call (or several) to 'sign_in' for the first time. - If 'exported_auth' is not None, it will be used instead to + Note that the optional parameters are meant for internal use. + + If '_exported_auth' is not None, it will be used instead to determine the authorization key for the current session. + + If '_sync_updates', sync_updates() will be called and a + second thread will be started if necessary. Note that this + will FAIL if the client is not connected to the user's + native data center, raising a "UserMigrateError", and + calling .disconnect() in the process. + + If '_cdn' is False, methods that are not allowed on such data + centers won't be invoked. 
""" + self._main_thread_ident = threading.get_ident() + try: self._sender.connect() if not self.session.auth_key: @@ -126,30 +194,46 @@ class TelegramBareClient: init_connection = self.session.layer != LAYER if init_connection: - if exported_auth is not None: + if _exported_auth is not None: self._init_connection(ImportAuthorizationRequest( - exported_auth.id, exported_auth.bytes + _exported_auth.id, _exported_auth.bytes )) - else: + elif not _cdn: TelegramBareClient._dc_options = \ self._init_connection(GetConfigRequest()).dc_options - elif exported_auth is not None: + elif _exported_auth is not None: self(ImportAuthorizationRequest( - exported_auth.id, exported_auth.bytes + _exported_auth.id, _exported_auth.bytes )) - if TelegramBareClient._dc_options is None: + if TelegramBareClient._dc_options is None and not _cdn: TelegramBareClient._dc_options = \ self(GetConfigRequest()).dc_options + # Connection was successful! Try syncing the update state + # UNLESS '_sync_updates' is False (we probably are in + # another data center and this would raise UserMigrateError) + # to also assert whether the user is logged in or not. 
+ self._user_connected = True + if _sync_updates and not _cdn: + try: + self.sync_updates() + self._set_connected_and_authorized() + except UnauthorizedError: + self._authorized = False + return True except TypeNotFoundError as e: # This is fine, probably layer migration self._logger.debug('Found invalid item, probably migrating', e) self.disconnect() - return self.connect(exported_auth=exported_auth) + return self.connect( + _exported_auth=_exported_auth, + _sync_updates=_sync_updates, + _cdn=_cdn + ) except (RPCError, ConnectionError) as error: # Probably errors from the previous session, ignore them @@ -178,9 +262,20 @@ class TelegramBareClient: return result def disconnect(self): - """Disconnects from the Telegram server""" + """Disconnects from the Telegram server + and stops all the spawned threads""" + self._user_connected = False + self._recv_thread = None + + # This will trigger a "ConnectionResetError", for subsequent calls + # to read or send (from another thread) and usually, the background + # thread would try restarting the connection but since the + # ._recv_thread = None, it knows it doesn't have to. self._sender.disconnect() + # TODO Shall we clear the _exported_sessions, or may be reused? 
+ pass + def _reconnect(self, new_dc=None): """If 'new_dc' is not set, only a call to .connect() will be made since it's assumed that the connection has been lost and the @@ -210,7 +305,11 @@ class TelegramBareClient: # endregion - # region Working with different Data Centers + # region Working with different connections/Data Centers + + def _on_read_thread(self): + return self._recv_thread is not None and \ + threading.get_ident() == self._recv_thread.ident def _get_dc(self, dc_id, ipv6=False, cdn=False): """Gets the Data Center (DC) associated to 'dc_id'""" @@ -237,30 +336,23 @@ class TelegramBareClient: TelegramBareClient._dc_options = self(GetConfigRequest()).dc_options return self._get_dc(dc_id, ipv6=ipv6, cdn=cdn) - def _get_exported_client(self, dc_id, - init_connection=False, - bypass_cache=False): - """Gets a cached exported TelegramBareClient for the desired DC. + def _get_exported_client(self, dc_id): + """Creates and connects a new TelegramBareClient for the desired DC. - If it's the first time retrieving the TelegramBareClient, the - current authorization is exported to the new DC so that - it can be used there, and the connection is initialized. - - If after using the sender a ConnectionResetError is raised, - this method should be called again with init_connection=True - in order to perform the reconnection. - - If bypass_cache is True, a new client will be exported and - it will not be cached. + If it's the first time calling the method with a given dc_id, + a new session will be first created, and its auth key generated. + Exporting/Importing the authorization will also be done so that + the auth is bound with the key. """ # Thanks badoualy/kotlogram on /telegram/api/DefaultTelegramClient.kt # for clearly showing how to export the authorization! 
^^ - client = self._cached_clients.get(dc_id) - if client and not bypass_cache: - if init_connection: - client.reconnect() - return client + session = self._exported_sessions.get(dc_id) + if session: + export_auth = None # Already bound with the auth key else: + # TODO Add a lock, don't allow two threads to create an auth key + # (when calling .connect() if there wasn't a previous session). + # for the same data center. dc = self._get_dc(dc_id) # Export the current authorization to the new DC. @@ -274,46 +366,90 @@ class TelegramBareClient: session = Session(self.session) session.server_address = dc.ip_address session.port = dc.port - client = TelegramBareClient( - session, self.api_id, self.api_hash, - proxy=self._sender.connection.conn.proxy, - timeout=self._sender.connection.get_timeout() - ) - client.connect(exported_auth=export_auth) + self._exported_sessions[dc_id] = session - if not bypass_cache: - # Don't go through this expensive process every time. - self._cached_clients[dc_id] = client - return client + client = TelegramBareClient( + session, self.api_id, self.api_hash, + proxy=self._sender.connection.conn.proxy, + timeout=self._sender.connection.get_timeout() + ) + client.connect(_exported_auth=export_auth, _sync_updates=False) + return client + + def _get_cdn_client(self, cdn_redirect): + """Similar to ._get_exported_client, but for CDNs""" + session = self._exported_sessions.get(cdn_redirect.dc_id) + if not session: + dc = self._get_dc(cdn_redirect.dc_id, cdn=True) + session = Session(self.session) + session.server_address = dc.ip_address + session.port = dc.port + self._exported_sessions[cdn_redirect.dc_id] = session + + client = TelegramBareClient( + session, self.api_id, self.api_hash, + proxy=self._sender.connection.conn.proxy, + timeout=self._sender.connection.get_timeout() + ) + + # This will make use of the new RSA keys for this specific CDN. 
+ # + # This relies on the fact that TelegramBareClient._dc_options is + # static and it won't be called from this DC (it would fail). + client.connect(cdn=True) # Avoid invoking non-CDN specific methods + return client # endregion # region Invoking Telegram requests - def invoke(self, *requests, call_receive=True, retries=5): + def invoke(self, *requests, retries=5): """Invokes (sends) a MTProtoRequest and returns (receives) its result. - If 'updates' is not None, all read update object will be put - in such list. Otherwise, update objects will be ignored. - - If 'call_receive' is set to False, then there should be another - thread calling to 'self._sender.receive()' running or this method - will lock forever. + The invoke will be retried up to 'retries' times before raising + ValueError(). """ + # Any error from a background thread will be "posted" and checked here + self.updates.check_error() + if not all(isinstance(x, TLObject) and x.content_related for x in requests): raise ValueError('You can only invoke requests, not types!') - if retries <= 0: - raise ValueError('Number of retries reached 0.') + # Determine the sender to be used (main or a new connection) + on_main_thread = threading.get_ident() == self._main_thread_ident + if on_main_thread or self._on_read_thread(): + sender = self._sender + else: + sender = self._sender.clone() + sender.connect() + # We should call receive from this thread if there's no background + # thread reading or if the server disconnected us and we're trying + # to reconnect. This is because the read thread may either be + # locked also trying to reconnect or we may be said thread already. 
+ call_receive = not on_main_thread or \ + self._recv_thread is None or self._connect_lock.locked() + try: + for _ in range(retries): + result = self._invoke(sender, call_receive, *requests) + if result: + return result + + raise ValueError('Number of retries reached 0.') + finally: + if sender != self._sender: + sender.disconnect() # Close temporary connections + + def _invoke(self, sender, call_receive, *requests): try: # Ensure that we start with no previous errors (i.e. resending) for x in requests: x.confirm_received.clear() x.rpc_error = None - self._sender.send(*requests) + sender.send(*requests) + if not call_receive: # TODO This will be slightly troublesome if we allow # switching between constant read or not on the fly. @@ -321,33 +457,60 @@ class TelegramBareClient: # in which case a Lock would be required for .receive(). for x in requests: x.confirm_received.wait( - self._sender.connection.get_timeout() + sender.connection.get_timeout() ) else: while not all(x.confirm_received.is_set() for x in requests): - self._sender.receive(update_state=self.updates) + sender.receive(update_state=self.updates) + + except (PhoneMigrateError, NetworkMigrateError, + UserMigrateError) as e: + self._logger.debug( + 'DC error when invoking request, ' + 'attempting to reconnect at DC {}'.format(e.new_dc) + ) + + # TODO What happens with the background thread here? + # For normal use cases, this won't happen, because this will only + # be on the very first connection (not authorized, not running), + # but may be an issue for people who actually travel? + self._reconnect(new_dc=e.new_dc) + return self._invoke(sender, call_receive, *requests) except TimeoutError: pass # We will just retry except ConnectionResetError: + if self._connect_lock.locked(): + # We are connecting and we don't want to reconnect there... + raise + self._logger.debug('Server disconnected us. 
Reconnecting and ' 'resending request...') - self._reconnect() + + if sender != self._sender: + # TODO Try reconnecting forever too? + sender.connect() + else: + while self._user_connected and not self._reconnect(): + sleep(0.1) # Retry forever until we can send the request except FloodWaitError: + sender.disconnect() self.disconnect() raise + finally: + if sender != self._sender: + sender.disconnect() + try: raise next(x.rpc_error for x in requests if x.rpc_error) except StopIteration: if any(x.result is None for x in requests): # "A container may only be accepted or - # rejected by the other party as a whole." - return self.invoke( - *requests, call_receive=call_receive, retries=(retries - 1) - ) + # rejected by the other party as a whole." + return None elif len(requests) == 1: return requests[0].result else: @@ -356,6 +519,13 @@ class TelegramBareClient: # Let people use client(SomeRequest()) instead client.invoke(...) __call__ = invoke + # Some really basic functionality + + def is_user_authorized(self): + """Has the user been authorized yet + (code request sent and confirmed)?""" + return self._authorized + # endregion # region Uploading media @@ -381,10 +551,10 @@ class TelegramBareClient: Default values for the optional parameters if left as None are: part_size_kb = get_appropriated_part_size(file_size) - file_name = path.basename(file_path) + file_name = os.path.basename(file_path) """ if isinstance(file, str): - file_size = path.getsize(file) + file_size = os.path.getsize(file) elif isinstance(file, bytes): file_size = len(file) else: @@ -440,7 +610,7 @@ class TelegramBareClient: # Set a default file name if None was specified if not file_name: if isinstance(file, str): - file_name = path.basename(file) + file_name = os.path.basename(file) else: file_name = str(file_id) @@ -509,7 +679,7 @@ class TelegramBareClient: if isinstance(result, FileCdnRedirect): cdn_decrypter, result = \ CdnDecrypter.prepare_decrypter( - client, TelegramBareClient, result + 
client, self._get_cdn_client(result), result ) except FileMigrateError as e: @@ -528,6 +698,9 @@ class TelegramBareClient: if progress_callback: progress_callback(f.tell(), file_size) finally: + if client != self: + client.disconnect() + if cdn_decrypter: try: cdn_decrypter.client.disconnect() @@ -537,3 +710,73 @@ class TelegramBareClient: f.close() # endregion + + # region Updates handling + + def sync_updates(self): + """Synchronizes self.updates to their initial state. Will be + called automatically on connection if self.updates.enabled = True, + otherwise it should be called manually after enabling updates. + """ + self.updates.process(self(GetStateRequest())) + + def add_update_handler(self, handler): + """Adds an update handler (a function which takes a TLObject, + an update, as its parameter) and listens for updates""" + sync = not self.updates.handlers + self.updates.handlers.append(handler) + if sync: + self.sync_updates() + + def remove_update_handler(self, handler): + self.updates.handlers.remove(handler) + + def list_update_handlers(self): + return self.updates.handlers[:] + + # endregion + + # Constant read + + def _set_connected_and_authorized(self): + self._authorized = True + if self._spawn_read_thread and self._recv_thread is None: + self._recv_thread = threading.Thread( + name='ReadThread', daemon=True, + target=self._recv_thread_impl + ) + self._recv_thread.start() + + # By using this approach, another thread will be + # created and started upon connection to constantly read + # from the other end. Otherwise, manual calls to .receive() + # must be performed. The MtProtoSender cannot be connected, + # or an error will be thrown. + # + # This way, sending and receiving will be completely independent. 
+ def _recv_thread_impl(self): + while self._user_connected: + try: + if datetime.now() > self._last_ping + self._ping_delay: + self._sender.send(PingRequest( + int.from_bytes(os.urandom(8), 'big', signed=True) + )) + self._last_ping = datetime.now() + + self._sender.receive(update_state=self.updates) + except TimeoutError: + # No problem. + pass + except ConnectionResetError: + self._logger.debug('Server disconnected us. Reconnecting...') + while self._user_connected and not self._reconnect(): + sleep(0.1) # Retry forever, this is instant messaging + + except Exception as e: + # Unknown exception, pass it to the main thread + self.updates.set_error(e) + break + + self._recv_thread = None + + # endregion diff --git a/telethon/telegram_client.py b/telethon/telegram_client.py index 6092c70c..a053ed95 100644 --- a/telethon/telegram_client.py +++ b/telethon/telegram_client.py @@ -1,10 +1,7 @@ import os -import threading from datetime import datetime, timedelta from functools import lru_cache from mimetypes import guess_type -from threading import Thread -from time import sleep try: import socks @@ -15,12 +12,10 @@ from . import TelegramBareClient from . 
import helpers as utils from .errors import ( RPCError, UnauthorizedError, InvalidParameterError, PhoneCodeEmptyError, - PhoneMigrateError, NetworkMigrateError, UserMigrateError, PhoneCodeExpiredError, PhoneCodeHashEmptyError, PhoneCodeInvalidError ) from .network import ConnectionMode -from .tl import Session, TLObject -from .tl.functions import PingRequest +from .tl import TLObject from .tl.functions.account import ( GetPasswordRequest ) @@ -35,9 +30,6 @@ from .tl.functions.messages import ( GetDialogsRequest, GetHistoryRequest, ReadHistoryRequest, SendMediaRequest, SendMessageRequest ) -from .tl.functions.updates import ( - GetStateRequest -) from .tl.functions.users import ( GetUsersRequest ) @@ -65,8 +57,9 @@ class TelegramClient(TelegramBareClient): def __init__(self, session, api_id, api_hash, connection_mode=ConnectionMode.TCP_FULL, proxy=None, - process_updates=False, + update_workers=None, timeout=timedelta(seconds=5), + spawn_read_thread=True, **kwargs): """Initializes the Telegram client with the specified API ID and Hash. @@ -79,15 +72,21 @@ class TelegramClient(TelegramBareClient): This will only affect how messages are sent over the network and how much processing is required before sending them. - If 'process_updates' is set to True, incoming updates will be - processed and you must manually call 'self.updates.poll()' from - another thread to retrieve the saved update objects, or your - memory will fill with these. You may modify the value of - 'self.updates.polling' at any later point. + The integer 'update_workers' represents depending on its value: + is None: Updates will *not* be stored in memory. + = 0: Another thread is responsible for calling self.updates.poll() + > 0: 'update_workers' background threads will be spawned, any + any of them will invoke all the self.updates.handlers. 
- Despite the value of 'process_updates', if you later call - '.add_update_handler(...)', updates will also be processed - and the update objects will be passed to the handlers you added. + If 'spawn_read_thread', a background thread will be started once + an authorized user has been logged in to Telegram to read items + (such as updates and responses) from the network as soon as they + occur, which will speed things up. + + If you don't want to spawn any additional threads, pending updates + will be read and processed accordingly after invoking a request + and not immediately. This is useful if you don't care about updates + at all and have set 'update_workers=None'. If more named arguments are provided as **kwargs, they will be used to update the Session instance. Most common settings are: @@ -98,221 +97,25 @@ class TelegramClient(TelegramBareClient): system_lang_code = lang_code report_errors = True """ - if not api_id or not api_hash: - raise PermissionError( - "Your API ID or Hash cannot be empty or None. 
" - "Refer to Telethon's README.rst for more information.") - - # Determine what session object we have - if isinstance(session, str) or session is None: - session = Session.try_load_or_create_new(session) - elif not isinstance(session, Session): - raise ValueError( - 'The given session must be a str or a Session instance.') - super().__init__( session, api_id, api_hash, connection_mode=connection_mode, proxy=proxy, - process_updates=process_updates, + update_workers=update_workers, + spawn_read_thread=spawn_read_thread, timeout=timeout ) - # Used on connection - the user may modify these and reconnect - kwargs['app_version'] = kwargs.get('app_version', self.__version__) - for name, value in kwargs.items(): - if hasattr(self.session, name): - setattr(self.session, name, value) - - self._updates_thread = None + # Some fields to easy signing in self._phone_code_hash = None self._phone = None - # Despite the state of the real connection, keep track of whether - # the user has explicitly called .connect() or .disconnect() here. - # This information is required by the read thread, who will be the - # one attempting to reconnect on the background *while* the user - # doesn't explicitly call .disconnect(), thus telling it to stop - # retrying. The main thread, knowing there is a background thread - # attempting reconnection as soon as it happens, will just sleep. - self._user_connected = False - - # Save whether the user is authorized here (a.k.a. logged in) - self._authorized = False - - # Uploaded files cache so subsequent calls are instant - self._upload_cache = {} - - # Constantly read for results and updates from within the main client - self._recv_thread = None - - # Default PingRequest delay - self._last_ping = datetime.now() - self._ping_delay = timedelta(minutes=1) - - # endregion - - # region Connecting - - def connect(self, exported_auth=None): - """Connects to the Telegram servers, executing authentication if - required. 
Note that authenticating to the Telegram servers is - not the same as authenticating the desired user itself, which - may require a call (or several) to 'sign_in' for the first time. - - exported_auth is meant for internal purposes and can be ignored. - """ - if socks and self._recv_thread: - # Treat proxy errors specially since they're not related to - # Telegram itself, but rather to the proxy. If any happens on - # the read thread forward it to the main thread. - try: - ok = super().connect(exported_auth=exported_auth) - except socks.ProxyConnectionError as e: - ok = False - # Report the exception to the main thread - self.updates.set_error(e) - else: - ok = super().connect(exported_auth=exported_auth) - - if not ok: - return False - - self._user_connected = True - try: - self.sync_updates() - self._set_connected_and_authorized() - except UnauthorizedError: - self._authorized = False - - return True - - def disconnect(self): - """Disconnects from the Telegram server - and stops all the spawned threads""" - self._user_connected = False - self._recv_thread = None - - # This will trigger a "ConnectionResetError", usually, the background - # thread would try restarting the connection but since the - # ._recv_thread = None, it knows it doesn't have to. - super().disconnect() - - # Also disconnect all the cached senders - for sender in self._cached_clients.values(): - sender.disconnect() - - self._cached_clients.clear() - - # endregion - - # region Working with different connections - - def _on_read_thread(self): - return self._recv_thread is not None and \ - threading.get_ident() == self._recv_thread.ident - - def create_new_connection(self, on_dc=None, timeout=timedelta(seconds=5)): - """Creates a new connection which can be used in parallel - with the original TelegramClient. A TelegramBareClient - will be returned already connected, and the caller is - responsible to disconnect it. 
- - If 'on_dc' is None, the new client will run on the same - data center as the current client (most common case). - - If the client is meant to be used on a different data - center, the data center ID should be specified instead. - """ - if on_dc is None: - client = TelegramBareClient( - self.session, self.api_id, self.api_hash, - proxy=self._sender.connection.conn.proxy, timeout=timeout - ) - client.connect() - else: - client = self._get_exported_client(on_dc, bypass_cache=True) - - return client - # endregion # region Telegram requests functions - def invoke(self, *requests, **kwargs): - """Invokes (sends) one or several MTProtoRequest and returns - (receives) their result. An optional named 'retries' parameter - can be used, indicating how many times it should retry. - """ - # This is only valid when the read thread is reconnecting, - # that is, the connection lock is locked. - if self._on_read_thread() and not self._connect_lock.locked(): - return # Just ignore, we would be raising and crashing the thread - - self.updates.check_error() - - try: - # We should call receive from this thread if there's no background - # thread reading or if the server disconnected us and we're trying - # to reconnect. This is because the read thread may either be - # locked also trying to reconnect or we may be said thread already. - call_receive = \ - self._recv_thread is None or self._connect_lock.locked() - - return super().invoke( - *requests, - call_receive=call_receive, - retries=kwargs.get('retries', 5) - ) - - except (PhoneMigrateError, NetworkMigrateError, UserMigrateError) as e: - self._logger.debug('DC error when invoking request, ' - 'attempting to reconnect at DC {}' - .format(e.new_dc)) - - # TODO What happens with the background thread here? - # For normal use cases, this won't happen, because this will only - # be on the very first connection (not authorized, not running), - # but may be an issue for people who actually travel? 
- self._reconnect(new_dc=e.new_dc) - return self.invoke(*requests) - - except ConnectionResetError as e: - if self._connect_lock.locked(): - # We are connecting and we don't want to reconnect there... - raise - while self._user_connected and not self._reconnect(): - sleep(0.1) # Retry forever until we can send the request - - # Let people use client(SomeRequest()) instead client.invoke(...) - __call__ = invoke - - def invoke_on_dc(self, request, dc_id, reconnect=False): - """Invokes the given request on a different DC - by making use of the exported MtProtoSenders. - - If 'reconnect=True', then the a reconnection will be performed and - ConnectionResetError will be raised if it occurs a second time. - """ - try: - client = self._get_exported_client( - dc_id, init_connection=reconnect) - - return client.invoke(request) - - except ConnectionResetError: - if reconnect: - raise - else: - return self.invoke_on_dc(request, dc_id, reconnect=True) - # region Authorization requests - def is_user_authorized(self): - """Has the user been authorized yet - (code request sent and confirmed)?""" - return self._authorized - def send_code_request(self, phone): """Sends a code request to the specified phone number""" if isinstance(phone, int): @@ -1006,73 +809,3 @@ class TelegramClient(TelegramBareClient): ) # endregion - - # region Updates handling - - def sync_updates(self): - """Synchronizes self.updates to their initial state. Will be - called automatically on connection if self.updates.enabled = True, - otherwise it should be called manually after enabling updates. 
- """ - self.updates.process(self(GetStateRequest())) - - def add_update_handler(self, handler): - """Adds an update handler (a function which takes a TLObject, - an update, as its parameter) and listens for updates""" - sync = not self.updates.handlers - self.updates.handlers.append(handler) - if sync: - self.sync_updates() - - def remove_update_handler(self, handler): - self.updates.handlers.remove(handler) - - def list_update_handlers(self): - return self.updates.handlers[:] - - # endregion - - # Constant read - - def _set_connected_and_authorized(self): - self._authorized = True - if self._recv_thread is None: - self._recv_thread = Thread( - name='ReadThread', daemon=True, - target=self._recv_thread_impl - ) - self._recv_thread.start() - - # By using this approach, another thread will be - # created and started upon connection to constantly read - # from the other end. Otherwise, manual calls to .receive() - # must be performed. The MtProtoSender cannot be connected, - # or an error will be thrown. - # - # This way, sending and receiving will be completely independent. - def _recv_thread_impl(self): - while self._user_connected: - try: - if datetime.now() > self._last_ping + self._ping_delay: - self._sender.send(PingRequest( - int.from_bytes(os.urandom(8), 'big', signed=True) - )) - self._last_ping = datetime.now() - - self._sender.receive(update_state=self.updates) - except TimeoutError: - # No problem. - pass - except ConnectionResetError: - self._logger.debug('Server disconnected us. 
Reconnecting...') - while self._user_connected and not self._reconnect(): - sleep(0.1) # Retry forever, this is instant messaging - - except Exception as e: - # Unknown exception, pass it to the main thread - self.updates.set_error(e) - break - - self._recv_thread = None - - # endregion diff --git a/telethon/update_state.py b/telethon/update_state.py index 4f3ebce7..4402fb14 100644 --- a/telethon/update_state.py +++ b/telethon/update_state.py @@ -1,6 +1,7 @@ +import logging from collections import deque from datetime import datetime -from threading import RLock, Event +from threading import RLock, Event, Thread from .tl import types as tl @@ -9,27 +10,46 @@ class UpdateState: """Used to hold the current state of processed updates. To retrieve an update, .poll() should be called. """ - def __init__(self, polling): - self._polling = polling + WORKER_POLL_TIMEOUT = 5.0 # Avoid waiting forever on the workers + + def __init__(self, workers=None): + """ + :param workers: This integer parameter has three possible cases: + workers is None: Updates will *not* be stored on self. + workers = 0: Another thread is responsible for calling self.poll() + workers > 0: 'workers' background threads will be spawned, and + any of them will invoke all the self.handlers. + """ + self._workers = workers + self._worker_threads = [] + self.handlers = [] self._updates_lock = RLock() self._updates_available = Event() self._updates = deque() + self._logger = logging.getLogger(__name__) + # https://core.telegram.org/api/updates self._state = tl.updates.State(0, 0, datetime.now(), 0, 0) + self._setup_workers() def can_poll(self): """Returns True if a call to .poll() won't lock""" return self._updates_available.is_set() - def poll(self): - """Polls an update or blocks until an update object is available""" - if not self._polling: - raise ValueError('Updates are not being polled hence not saved.') + def poll(self, timeout=None): + """Polls an update or blocks until an update object is available.
+ If 'timeout is not None', it should be a floating point value, + and the method will 'return None' if waiting times out. + """ + if not self._updates_available.wait(timeout=timeout): + return - self._updates_available.wait() with self._updates_lock: + if not self._updates_available.is_set(): + return + update = self._updates.popleft() if not self._updates: self._updates_available.clear() @@ -39,16 +59,62 @@ class UpdateState: return update - def get_polling(self): - return self._polling + def get_workers(self): + return self._workers - def set_polling(self, polling): - self._polling = polling - if not polling: - with self._updates_lock: - self._updates.clear() + def set_workers(self, n): + """Changes the number of workers running. + If 'n is None', clears all pending updates from memory. + """ + self._stop_workers() + self._workers = n + if n is None: + self._updates.clear() + else: + self._setup_workers() - polling = property(fget=get_polling, fset=set_polling) + workers = property(fget=get_workers, fset=set_workers) + + def _stop_workers(self): + """Raises "StopIteration" on the worker threads to stop them, + and also clears all of them off the list + """ + self.set_error(StopIteration()) + for t in self._worker_threads: + t.join() + + self._worker_threads.clear() + + def _setup_workers(self): + if self._worker_threads or not self._workers: + # There already are workers, or workers is None or 0. Do nothing.
+ return + + for i in range(self._workers): + thread = Thread( + target=UpdateState._worker_loop, + name='UpdateWorker{}'.format(i), + daemon=True, + args=(self, i) + ) + self._worker_threads.append(thread) + thread.start() + + def _worker_loop(self, wid): + while True: + try: + update = self.poll(timeout=UpdateState.WORKER_POLL_TIMEOUT) + # TODO Maybe people can add different handlers per update type + if update: + for handler in self.handlers: + handler(update) + except StopIteration: + break + except Exception as e: + # We don't want to crash a worker thread due to any reason + self._logger.debug( + '[ERROR] Unhandled exception on worker {}'.format(wid), e + ) def set_error(self, error): """Sets an error, so that the next call to .poll() will raise it. @@ -69,8 +135,8 @@ class UpdateState: """Processes an update object. This method is normally called by the library itself. """ - if not self._polling and not self.handlers: - return + if self._workers is None: + return # No processing needs to be done if nobody's working with self._updates_lock: if isinstance(update, tl.updates.State): @@ -82,9 +148,5 @@ class UpdateState: return # We already handled this update self._state.pts = pts - if self._polling: - self._updates.append(update) - self._updates_available.set() - - for handler in self.handlers: - handler(update) + self._updates.append(update) + self._updates_available.set()