From cc5753137ce0431322651e148366634ec0e7c8b8 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Fri, 8 Jun 2018 21:52:59 +0200 Subject: [PATCH] Clean-up TelegramBareClient - unnecessary? --- telethon/telegram_bare_client.py | 622 ++++++++----------------------- 1 file changed, 161 insertions(+), 461 deletions(-) diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index 070ed49e..7acf5891 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -1,25 +1,16 @@ import logging -import os import platform -import threading from datetime import timedelta, datetime -from signal import signal, SIGINT, SIGTERM, SIGABRT -from threading import Lock -from time import sleep + from . import version, utils from .crypto import rsa -from .errors import ( - RPCError, BrokenAuthKeyError, ServerError, FloodWaitError, - FloodTestPhoneWaitError, TypeNotFoundError, UnauthorizedError, - PhoneMigrateError, NetworkMigrateError, UserMigrateError, AuthKeyError, - RpcCallFailError -) -from .network import authenticator, MTProtoSender, ConnectionTcpFull +from .extensions import markdown +from .network import MTProtoSender, ConnectionTcpFull from .sessions import Session, SQLiteSession from .tl import TLObject from .tl.all_tlobjects import LAYER from .tl.functions import ( - InitConnectionRequest, InvokeWithLayerRequest, PingRequest + InitConnectionRequest, InvokeWithLayerRequest ) from .tl.functions.auth import ( ImportAuthorizationRequest, ExportAuthorizationRequest @@ -27,7 +18,6 @@ from .tl.functions.auth import ( from .tl.functions.help import ( GetCdnConfigRequest, GetConfigRequest ) -from .tl.functions.updates import GetStateRequest from .tl.types.auth import ExportedAuthorization from .update_state import UpdateState @@ -39,31 +29,95 @@ DEFAULT_PORT = 443 __log__ = logging.getLogger(__name__) +# TODO Do we need this class? class TelegramBareClient: - """Bare Telegram Client with just the minimum - + """ + A bare Telegram client that somewhat eases the usage of the + ``MTProtoSender``. - The reason to distinguish between a MtProtoSender and a - TelegramClient itself is because the sender is just that, - a sender, which should know nothing about Telegram but - rather how to handle this specific connection. + Args: + session (`str` | `telethon.sessions.abstract.Session`, `None`): + The file name of the session file to be used if a string is + given (it may be a full path), or the Session instance to be + used otherwise. If it's ``None``, the session will not be saved, + and you should call :meth:`.log_out()` when you're done. - The TelegramClient itself should know how to initialize - a proper connection to the servers, as well as other basic - methods such as disconnection and reconnection. + Note that if you pass a string it will be a file in the current + working directory, although you can also pass absolute paths. - This distinction between a bare client and a full client - makes it possible to create clones of the bare version - (by using the same session, IP address and port) to be - able to execute queries on either, without the additional - cost that would involve having the methods for signing in, - logging out, and such. + The session file contains enough information for you to login + without re-sending the code, so if you have to enter the code + more than once, maybe you're changing the working directory, + renaming or removing the file, or using random names. + + api_id (`int` | `str`): + The API ID you obtained from https://my.telegram.org. + + api_hash (`str`): + The API ID you obtained from https://my.telegram.org. + + connection (`telethon.network.connection.common.Connection`, optional): + The connection instance to be used when creating a new connection + to the servers. If it's a type, the `proxy` argument will be used. + + Defaults to `telethon.network.connection.tcpfull.ConnectionTcpFull`. + + use_ipv6 (`bool`, optional): + Whether to connect to the servers through IPv6 or not. + By default this is ``False`` as IPv6 support is not + too widespread yet. + + proxy (`tuple` | `dict`, optional): + A tuple consisting of ``(socks.SOCKS5, 'host', port)``. + See https://github.com/Anorov/PySocks#usage-1 for more. + + update_workers (`int`, optional): + If specified, represents how many extra threads should + be spawned to handle incoming updates, and updates will + be kept in memory until they are processed. Note that + you must set this to at least ``0`` if you want to be + able to process updates through :meth:`updates.poll()`. + + timeout (`int` | `float` | `timedelta`, optional): + The timeout to be used when receiving responses from + the network. Defaults to 5 seconds. + + spawn_read_thread (`bool`, optional): + Whether to use an extra background thread or not. Defaults + to ``True`` so receiving items from the network happens + instantly, as soon as they arrive. Can still be disabled + if you want to run the library without any additional thread. + + report_errors (`bool`, optional): + Whether to report RPC errors or not. Defaults to ``True``, + see :ref:`api-status` for more information. + + device_model (`str`, optional): + "Device model" to be sent when creating the initial connection. + Defaults to ``platform.node()``. + + system_version (`str`, optional): + "System version" to be sent when creating the initial connection. + Defaults to ``platform.system()``. + + app_version (`str`, optional): + "App version" to be sent when creating the initial connection. + Defaults to `telethon.version.__version__`. + + lang_code (`str`, optional): + "Language code" to be sent when creating the initial connection. + Defaults to ``'en'``. + + system_lang_code (`str`, optional): + "System lang code" to be sent when creating the initial connection. + Defaults to `lang_code`. """ # Current TelegramClient version __version__ = version.__version__ - # TODO Make this thread-safe, all connections share the same DC - _config = None # Server configuration (with .dc_options) + # Server configuration (with .dc_options) + _config = None # region Initialization @@ -72,8 +126,6 @@ class TelegramBareClient: connection=ConnectionTcpFull, use_ipv6=False, proxy=None, - update_workers=None, - spawn_read_thread=False, timeout=timedelta(seconds=5), report_errors=True, device_model=None, @@ -118,11 +170,7 @@ class TelegramBareClient: if isinstance(connection, type): connection = connection(proxy=proxy, timeout=timeout) - self._sender = MtProtoSender(self.session, connection) - - # Two threads may be calling reconnect() when the connection is lost, - # we only want one to actually perform the reconnection. - self._reconnect_lock = Lock() + self._sender = MTProtoSender(self.session, connection) # Cache "exported" sessions as 'dc_id: Session' not to recreate # them all the time since generating a new key is a relatively @@ -131,7 +179,8 @@ class TelegramBareClient: # This member will process updates if enabled. # One may change self.updates.enabled at any later point. - self.updates = UpdateState(workers=update_workers) + # TODO Stop using that 1 + self.updates = UpdateState(1) # Used on connection - the user may modify these and reconnect system = platform.uname() @@ -141,15 +190,6 @@ class TelegramBareClient: self.lang_code = lang_code self.system_lang_code = system_lang_code - # Despite the state of the real connection, keep track of whether - # the user has explicitly called .connect() or .disconnect() here. - # This information is required by the read thread, who will be the - # one attempting to reconnect on the background *while* the user - # doesn't explicitly call .disconnect(), thus telling it to stop - # retrying. The main thread, knowing there is a background thread - # attempting reconnection as soon as it happens, will just sleep. - self._user_connected = False - # Save whether the user is authorized here (a.k.a. logged in) self._authorized = None # None = We don't know yet @@ -157,12 +197,6 @@ class TelegramBareClient: # See https://core.telegram.org/api/invoking#saving-client-info. self._first_request = True - # Constantly read for results and updates from within the main client, - # if the user has left enabled such option. - self._spawn_read_thread = spawn_read_thread - self._recv_thread = None - self._idling = threading.Event() - # Default PingRequest delay self._last_ping = datetime.now() self._ping_delay = timedelta(minutes=1) @@ -175,81 +209,48 @@ class TelegramBareClient: self._last_state = datetime.now() self._state_delay = timedelta(hours=1) - # Some errors are known but there's nothing we can do from the - # background thread. If any of these happens, call .disconnect(), - # and raise them next time .invoke() is tried to be called. - self._background_error = None + # Some further state for subclasses + self._event_builders = [] + self._events_pending_resolve = [] + + # Default parse mode + self._parse_mode = markdown + + # Some fields to easy signing in. Let {phone: hash} be + # a dictionary because the user may change their mind. + self._phone_code_hash = {} + self._phone = None + self._tos = None + + # Sometimes we need to know who we are, cache the self peer + self._self_input_peer = None # endregion # region Connecting - def connect(self, _sync_updates=True): - """Connects to the Telegram servers, executing authentication if - required. Note that authenticating to the Telegram servers is - not the same as authenticating the desired user itself, which - may require a call (or several) to 'sign_in' for the first time. - - Note that the optional parameters are meant for internal use. - - If '_sync_updates', sync_updates() will be called and a - second thread will be started if necessary. Note that this - will FAIL if the client is not connected to the user's - native data center, raising a "UserMigrateError", and - calling .disconnect() in the process. + async def connect(self, _sync_updates=True): """ - __log__.info('Connecting to %s:%d...', - self.session.server_address, self.session.port) - - self._background_error = None # Clear previous errors - - try: - self._sender.connect() - __log__.info('Connection success!') - - # Connection was successful! Try syncing the update state - # UNLESS '_sync_updates' is False (we probably are in - # another data center and this would raise UserMigrateError) - # to also assert whether the user is logged in or not. - self._user_connected = True - if self._authorized is None and _sync_updates: - try: - self.sync_updates() - self._set_connected_and_authorized() - except UnauthorizedError: - self._authorized = False - elif self._authorized: - self._set_connected_and_authorized() - - return True - - except TypeNotFoundError as e: - # This is fine, probably layer migration - __log__.warning('Connection failed, got unexpected type with ID ' - '%s. Migrating?', hex(e.invalid_constructor_id)) - self.disconnect() - return self.connect(_sync_updates=_sync_updates) - - except AuthKeyError as e: - # As of late March 2018 there were two AUTH_KEY_DUPLICATED - # reports. Retrying with a clean auth_key should fix this. - __log__.warning('Auth key error %s. Clearing it and retrying.', e) - self.disconnect() - self.session.auth_key = None - self.session.save() - return self.connect(_sync_updates=_sync_updates) - - except (RPCError, ConnectionError) as e: - # Probably errors from the previous session, ignore them - __log__.error('Connection failed due to %s', e) - self.disconnect() - return False + Connects to Telegram. + """ + # TODO Maybe we should rethink what the session does if the sender + # needs a session but it might connect to arbitrary IPs? + # + # TODO sync updates/connected and authorized if no UnauthorizedError? + await self._sender.connect( + self.session.server_address, self.session.port) def is_connected(self): + """ + Returns ``True`` if the user has connected. + """ return self._sender.is_connected() def _wrap_init_connection(self, query): - """Wraps query around InvokeWithLayerRequest(InitConnectionRequest())""" + """ + Wraps `query` around + ``InvokeWithLayerRequest(InitConnectionRequest(...))``. + """ return InvokeWithLayerRequest(LAYER, InitConnectionRequest( api_id=self.api_id, device_model=self.device_model, @@ -261,75 +262,46 @@ class TelegramBareClient: query=query )) - def disconnect(self): - """Disconnects from the Telegram server - and stops all the spawned threads""" - __log__.info('Disconnecting...') - self._user_connected = False # This will stop recv_thread's loop - - __log__.debug('Stopping all workers...') - self.updates.stop_workers() - - # This will trigger a "ConnectionResetError" on the recv_thread, - # which won't attempt reconnecting as ._user_connected is False. - __log__.debug('Disconnecting the socket...') - self._sender.disconnect() - - # TODO Shall we clear the _exported_sessions, or may be reused? - self._first_request = True # On reconnect it will be first again - self.session.set_update_state(0, self.updates.get_update_state(0)) + async def disconnect(self): + """ + Disconnects from Telegram. + """ + await self._sender.disconnect() + # TODO What to do with the update state? Does it belong here? + # self.session.set_update_state(0, self.updates.get_update_state(0)) self.session.close() - def _reconnect(self, new_dc=None): - """If 'new_dc' is not set, only a call to .connect() will be made - since it's assumed that the connection has been lost and the - library is reconnecting. - - If 'new_dc' is set, the client is first disconnected from the - current data center, clears the auth key for the old DC, and - connects to the new data center. + def _switch_dc(self, new_dc): """ - if new_dc is None: - if self.is_connected(): - __log__.info('Reconnection aborted: already connected') - return True + Switches the current connection to the new data center. + """ + # TODO Implement + raise NotImplementedError + dc = self._get_dc(new_dc) + __log__.info('Reconnecting to new data center %s', dc) - try: - __log__.info('Attempting reconnection...') - return self.connect() - except ConnectionResetError as e: - __log__.warning('Reconnection failed due to %s', e) - return False - else: - # Since we're reconnecting possibly due to a UserMigrateError, - # we need to first know the Data Centers we can connect to. Do - # that before disconnecting. - dc = self._get_dc(new_dc) - __log__.info('Reconnecting to new data center %s', dc) - - self.session.set_dc(dc.id, dc.ip_address, dc.port) - # auth_key's are associated with a server, which has now changed - # so it's not valid anymore. Set to None to force recreating it. - self.session.auth_key = None - self.session.save() - self.disconnect() - return self.connect() + self.session.set_dc(dc.id, dc.ip_address, dc.port) + # auth_key's are associated with a server, which has now changed + # so it's not valid anymore. Set to None to force recreating it. + self.session.auth_key = None + self.session.save() + self.disconnect() + return self.connect() def set_proxy(self, proxy): """Change the proxy used by the connections. """ if self.is_connected(): raise RuntimeError("You can't change the proxy while connected.") - self._sender.connection.conn.proxy = proxy + + # TODO Should we tell the user to create a new client? + # Can this be done more cleanly? Similar to `switch_dc` + self._sender._connection.conn.proxy = proxy # endregion # region Working with different connections/Data Centers - def _on_read_thread(self): - return self._recv_thread is not None and \ - threading.get_ident() == self._recv_thread.ident - def _get_dc(self, dc_id, cdn=False): """Gets the Data Center (DC) associated to 'dc_id'""" if not TelegramBareClient._config: @@ -431,7 +403,7 @@ class TelegramBareClient: # region Invoking Telegram requests - def __call__(self, request, retries=5, ordered=False): + async def __call__(self, request, ordered=False): """ Invokes (sends) one or more MTProtoRequests and returns (receives) their result. @@ -456,302 +428,30 @@ class TelegramBareClient: The result of the request (often a `TLObject`) or a list of results if more than one request was given. """ - single = not utils.is_list_like(request) - if single: - request = (request,) - + requests = (request,) if not utils.is_list_like(request) else request if not all(isinstance(x, TLObject) and - x.content_related for x in request): + x.content_related for x in requests): raise TypeError('You can only invoke requests, not types!') - if self._background_error: - raise self._background_error + # TODO Resolve requests, should be done by TelegramClient + # for r in requests: + # await r.resolve(self, utils) - for r in request: - r.resolve(self, utils) - - # For logging purposes - if single: - which = type(request[0]).__name__ + # TODO InvokeWithLayer if no authkey, maybe done in MTProtoSender? + # TODO Handle PhoneMigrateError, NetworkMigrateError, UserMigrateError + # ^ by switching DC + # TODO Retry on ServerError, RpcCallFailError + # TODO Auto-sleep on some FloodWaitError, FloodTestPhoneWaitError + future = await self._sender.send(request, ordered=ordered) + if isinstance(future, list): + results = [] + for f in future: + results.append(await future) + return results else: - which = '{} requests ({})'.format( - len(request), [type(x).__name__ for x in request]) - - # Determine the sender to be used (main or a new connection) - __log__.debug('Invoking %s', which) - call_receive = \ - not self._idling.is_set() or self._reconnect_lock.locked() - - for retry in range(retries): - result = self._invoke(call_receive, request, ordered=ordered) - if result is not None: - return result[0] if single else result - - log = __log__.info if retry == 0 else __log__.warning - log('Invoking %s failed %d times, connecting again and retrying', - which, retry + 1) - - sleep(1) - # The ReadThread has priority when attempting reconnection, - # since this thread is constantly running while __call__ is - # only done sometimes. Here try connecting only once/retry. - if not self._reconnect_lock.locked(): - with self._reconnect_lock: - self._reconnect() - - raise RuntimeError('Number of retries reached 0 for {}.'.format( - which - )) + return await future # Let people use client.invoke(SomeRequest()) instead client(...) invoke = __call__ - def _invoke(self, call_receive, requests, ordered=False): - try: - # Ensure that we start with no previous errors (i.e. resending) - for x in requests: - x.confirm_received.clear() - x.rpc_error = None - - if not self.session.auth_key: - __log__.info('Need to generate new auth key before invoking') - self._first_request = True - self.session.auth_key, self.session.time_offset = \ - authenticator.do_authentication(self._sender.connection) - - if self._first_request: - __log__.info('Initializing a new connection while invoking') - if len(requests) == 1: - requests = [self._wrap_init_connection(requests[0])] - else: - # We need a SINGLE request (like GetConfig) to init conn. - # Once that's done, the N original requests will be - # invoked. - TelegramBareClient._config = self( - self._wrap_init_connection(GetConfigRequest()) - ) - - self._sender.send(requests, ordered=ordered) - - if not call_receive: - # TODO This will be slightly troublesome if we allow - # switching between constant read or not on the fly. - # Must also watch out for calling .read() from two places, - # in which case a Lock would be required for .receive(). - for x in requests: - x.confirm_received.wait( - self._sender.connection.get_timeout() - ) - else: - while not all(x.confirm_received.is_set() for x in requests): - self._sender.receive(update_state=self.updates) - - except BrokenAuthKeyError: - __log__.error('Authorization key seems broken and was invalid!') - self.session.auth_key = None - - except TypeNotFoundError as e: - # Only occurs when we call receive. May happen when - # we need to reconnect to another DC on login and - # Telegram somehow sends old objects (like configOld) - self._first_request = True - __log__.warning('Read unknown TLObject code ({}). ' - 'Setting again first_request flag.' - .format(hex(e.invalid_constructor_id))) - - except TimeoutError: - __log__.warning('Invoking timed out') # We will just retry - - except ConnectionResetError as e: - __log__.warning('Connection was reset while invoking') - if self._user_connected: - # Server disconnected us, __call__ will try reconnecting. - try: - self._sender.disconnect() - except: - pass - - return None - else: - # User never called .connect(), so raise this error. - raise RuntimeError('Tried to invoke without .connect()') from e - - # Clear the flag if we got this far - self._first_request = False - - try: - raise next(x.rpc_error for x in requests if x.rpc_error) - except StopIteration: - if any(x.result is None for x in requests): - # "A container may only be accepted or - # rejected by the other party as a whole." - return None - - return [x.result for x in requests] - - except (PhoneMigrateError, NetworkMigrateError, - UserMigrateError) as e: - - # TODO What happens with the background thread here? - # For normal use cases, this won't happen, because this will only - # be on the very first connection (not authorized, not running), - # but may be an issue for people who actually travel? - self._reconnect(new_dc=e.new_dc) - return self._invoke(call_receive, requests) - - except (ServerError, RpcCallFailError) as e: - # Telegram is having some issues, just retry - __log__.warning('Telegram is having internal issues: %s', e) - - except (FloodWaitError, FloodTestPhoneWaitError) as e: - __log__.warning('Request invoked too often, wait %ds', e.seconds) - if e.seconds > self.session.flood_sleep_threshold | 0: - raise - - sleep(e.seconds) - - # Some really basic functionality - - def is_user_authorized(self): - """Has the user been authorized yet - (code request sent and confirmed)?""" - return self._authorized - - def get_input_entity(self, peer): - """ - Stub method, no functionality so that calling - ``.get_input_entity()`` from ``.resolve()`` doesn't fail. - """ - return peer - - # endregion - - # region Updates handling - - def sync_updates(self): - """Synchronizes self.updates to their initial state. Will be - called automatically on connection if self.updates.enabled = True, - otherwise it should be called manually after enabling updates. - """ - self.updates.process(self(GetStateRequest())) - self._last_state = datetime.now() - - # endregion - - # region Constant read - - def _set_connected_and_authorized(self): - self._authorized = True - self.updates.setup_workers() - if self._spawn_read_thread and self._recv_thread is None: - self._recv_thread = threading.Thread( - name='ReadThread', daemon=True, - target=self._recv_thread_impl - ) - self._recv_thread.start() - - def _signal_handler(self, signum, frame): - if self._user_connected: - self.disconnect() - else: - os._exit(1) - - def idle(self, stop_signals=(SIGINT, SIGTERM, SIGABRT)): - """ - Idles the program by looping forever and listening for updates - until one of the signals are received, which breaks the loop. - - :param stop_signals: - Iterable containing signals from the signal module that will - be subscribed to TelegramClient.disconnect() (effectively - stopping the idle loop), which will be called on receiving one - of those signals. - :return: - """ - if self._spawn_read_thread and not self._on_read_thread(): - raise RuntimeError('Can only idle if spawn_read_thread=False') - - self._idling.set() - for sig in stop_signals: - signal(sig, self._signal_handler) - - if self._on_read_thread(): - __log__.info('Starting to wait for items from the network') - else: - __log__.info('Idling to receive items from the network') - - while self._user_connected: - try: - if datetime.now() > self._last_ping + self._ping_delay: - self._sender.send(PingRequest( - int.from_bytes(os.urandom(8), 'big', signed=True) - )) - self._last_ping = datetime.now() - - if datetime.now() > self._last_state + self._state_delay: - self._sender.send(GetStateRequest()) - self._last_state = datetime.now() - - __log__.debug('Receiving items from the network...') - self._sender.receive(update_state=self.updates) - except TimeoutError: - # No problem - __log__.debug('Receiving items from the network timed out') - except ConnectionResetError: - if self._user_connected: - __log__.error('Connection was reset while receiving ' - 'items. Reconnecting') - with self._reconnect_lock: - while self._user_connected and not self._reconnect(): - sleep(0.1) # Retry forever, this is instant messaging - - if self.is_connected(): - # Telegram seems to kick us every 1024 items received - # from the network not considering things like bad salt. - # We must execute some *high level* request (that's not - # a ping) if we want to receive updates again. - # TODO Test if getDifference works too (better alternative) - self._sender.send(GetStateRequest()) - except: - self._idling.clear() - raise - - self._idling.clear() - __log__.info('Connection closed by the user, not reading anymore') - - # By using this approach, another thread will be - # created and started upon connection to constantly read - # from the other end. Otherwise, manual calls to .receive() - # must be performed. The MtProtoSender cannot be connected, - # or an error will be thrown. - # - # This way, sending and receiving will be completely independent. - def _recv_thread_impl(self): - # This thread is "idle" (only listening for updates), but also - # excepts everything unlike the manual idle because it should - # not crash. - while self._user_connected: - try: - self.idle(stop_signals=tuple()) - except Exception as error: - __log__.exception('Unknown exception in the read thread! ' - 'Disconnecting and leaving it to main thread') - # Unknown exception, pass it to the main thread - - try: - import socks - if isinstance(error, ( - socks.GeneralProxyError, socks.ProxyConnectionError - )): - # This is a known error, and it's not related to - # Telegram but rather to the proxy. Disconnect and - # hand it over to the main thread. - self._background_error = error - self.disconnect() - break - except ImportError: - "Not using PySocks, so it can't be a proxy error" - - self._recv_thread = None - # endregion