From 9716d1d5436c6a0f9568c977602afcdc11e043be Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Fri, 6 Oct 2017 19:30:14 +0200 Subject: [PATCH] Remove all Thread's except from UpdateState --- telethon/extensions/tcp_client.py | 23 ++--- telethon/telegram_bare_client.py | 161 +++--------------------------- telethon/telegram_client.py | 27 +---- telethon/tl/entity_database.py | 15 +-- telethon/tl/session.py | 55 +++++----- 5 files changed, 51 insertions(+), 230 deletions(-) diff --git a/telethon/extensions/tcp_client.py b/telethon/extensions/tcp_client.py index 6feb9841..134c36ae 100644 --- a/telethon/extensions/tcp_client.py +++ b/telethon/extensions/tcp_client.py @@ -3,14 +3,12 @@ import errno import socket from datetime import timedelta from io import BytesIO, BufferedWriter -from threading import Lock class TcpClient: def __init__(self, proxy=None, timeout=timedelta(seconds=5)): self.proxy = proxy self._socket = None - self._closing_lock = Lock() if isinstance(timeout, timedelta): self.timeout = timeout.seconds @@ -65,19 +63,14 @@ class TcpClient: def close(self): """Closes the connection""" - if self._closing_lock.locked(): - # Already closing, no need to close again (avoid None.close()) - return - - with self._closing_lock: - try: - if self._socket is not None: - self._socket.shutdown(socket.SHUT_RDWR) - self._socket.close() - except OSError: - pass # Ignore ENOTCONN, EBADF, and any other error when closing - finally: - self._socket = None + try: + if self._socket is not None: + self._socket.shutdown(socket.SHUT_RDWR) + self._socket.close() + except OSError: + pass # Ignore ENOTCONN, EBADF, and any other error when closing + finally: + self._socket = None def write(self, data): """Writes (sends) the specified bytes to the connected peer""" diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index 17e8a364..3dc1b180 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -1,10 +1,8 @@ import logging import os -import threading from datetime import timedelta, datetime from hashlib import md5 from io import BytesIO -from threading import Lock from time import sleep from . import helpers as utils @@ -18,7 +16,7 @@ from .network import authenticator, MtProtoSender, Connection, ConnectionMode from .tl import TLObject, Session from .tl.all_tlobjects import LAYER from .tl.functions import ( - InitConnectionRequest, InvokeWithLayerRequest, PingRequest + InitConnectionRequest, InvokeWithLayerRequest ) from .tl.functions.auth import ( ImportAuthorizationRequest, ExportAuthorizationRequest @@ -67,8 +65,6 @@ class TelegramBareClient: def __init__(self, session, api_id, api_hash, connection_mode=ConnectionMode.TCP_FULL, proxy=None, - update_workers=None, - spawn_read_thread=False, timeout=timedelta(seconds=5), **kwargs): """Refer to TelegramClient.__init__ for docs on this method""" @@ -101,10 +97,6 @@ class TelegramBareClient: self._logger = logging.getLogger(__name__) - # Two threads may be calling reconnect() when the connection is lost, - # we only want one to actually perform the reconnection. - self._reconnect_lock = Lock() - # Cache "exported" sessions as 'dc_id: Session' not to recreate # them all the time since generating a new key is a relatively # expensive operation. @@ -112,7 +104,7 @@ class TelegramBareClient: # This member will process updates if enabled. # One may change self.updates.enabled at any later point. - self.updates = UpdateState(workers=update_workers) + self.updates = UpdateState(workers=None) # Used on connection - the user may modify these and reconnect kwargs['app_version'] = kwargs.get('app_version', self.__version__) @@ -136,24 +128,10 @@ class TelegramBareClient: # Uploaded files cache so subsequent calls are instant self._upload_cache = {} - # Constantly read for results and updates from within the main client, - # if the user has left enabled such option. - self._spawn_read_thread = spawn_read_thread - self._recv_thread = None - - # Identifier of the main thread (the one that called .connect()). - # This will be used to create new connections from any other thread, - # so that requests can be sent in parallel. - self._main_thread_ident = None - # Default PingRequest delay self._last_ping = datetime.now() self._ping_delay = timedelta(minutes=1) - # Some errors are known but there's nothing we can do from the - # background thread. If any of these happens, call .disconnect(), - # and raise them next time .invoke() is tried to be called. - self._background_error = None # endregion @@ -179,9 +157,6 @@ class TelegramBareClient: If '_cdn' is False, methods that are not allowed on such data centers won't be invoked. """ - self._main_thread_ident = threading.get_ident() - self._background_error = None # Clear previous errors - try: self._sender.connect() if not self.session.auth_key: @@ -268,20 +243,9 @@ class TelegramBareClient: return result def disconnect(self): - """Disconnects from the Telegram server - and stops all the spawned threads""" + """Disconnects from the Telegram server""" self._user_connected = False - self._recv_thread = None - - # Stop the workers from the background thread - self.updates.stop_workers() - - # This will trigger a "ConnectionResetError", for subsequent calls - # to read or send (from another thread) and usually, the background - # thread would try restarting the connection but since the - # ._recv_thread = None, it knows it doesn't have to. self._sender.disconnect() - # TODO Shall we clear the _exported_sessions, or may be reused? pass @@ -296,12 +260,7 @@ class TelegramBareClient: """ if new_dc is None: # Assume we are disconnected due to some error, so connect again - with self._reconnect_lock: - # Another thread may have connected again, so check that first - if not self.is_connected(): - return self.connect() - else: - return True + return self.connect() else: self.disconnect() self.session.auth_key = None # Force creating new auth_key @@ -316,10 +275,6 @@ class TelegramBareClient: # region Working with different connections/Data Centers - def _on_read_thread(self): - return self._recv_thread is not None and \ - threading.get_ident() == self._recv_thread.ident - def _get_dc(self, dc_id, ipv6=False, cdn=False): """Gets the Data Center (DC) associated to 'dc_id'""" if TelegramBareClient._dc_options is None: @@ -424,26 +379,12 @@ class TelegramBareClient: x.content_related for x in requests): raise ValueError('You can only invoke requests, not types!') - # Determine the sender to be used (main or a new connection) - on_main_thread = threading.get_ident() == self._main_thread_ident - if on_main_thread or self._on_read_thread(): - sender = self._sender - else: - sender = self._sender.clone() - sender.connect() + # TODO Determine the sender to be used (main or a new connection) + sender = self._sender # .clone(), .connect() - # We should call receive from this thread if there's no background - # thread reading or if the server disconnected us and we're trying - # to reconnect. This is because the read thread may either be - # locked also trying to reconnect or we may be said thread already. - call_receive = not on_main_thread or self._recv_thread is None \ - or self._reconnect_lock.locked() try: for _ in range(retries): - if self._background_error and on_main_thread: - raise self._background_error - - result = self._invoke(sender, call_receive, *requests) + result = self._invoke(sender, *requests) if result: return result @@ -455,7 +396,7 @@ class TelegramBareClient: # Let people use client.invoke(SomeRequest()) instead client(...) invoke = __call__ - def _invoke(self, sender, call_receive, *requests): + def _invoke(self, sender, *requests): try: # Ensure that we start with no previous errors (i.e. resending) for x in requests: @@ -463,27 +404,15 @@ class TelegramBareClient: x.rpc_error = None sender.send(*requests) - - if not call_receive: - # TODO This will be slightly troublesome if we allow - # switching between constant read or not on the fly. - # Must also watch out for calling .read() from two places, - # in which case a Lock would be required for .receive(). - for x in requests: - x.confirm_received.wait( - sender.connection.get_timeout() - ) - else: - while not all(x.confirm_received.is_set() for x in requests): - sender.receive(update_state=self.updates) + while not all(x.confirm_received.is_set() for x in requests): + sender.receive(update_state=self.updates) except TimeoutError: pass # We will just retry except ConnectionResetError: - if not self._authorized or self._reconnect_lock.locked(): - # Only attempt reconnecting if we're authorized and not - # reconnecting already. + if not self._authorized: + # Only attempt reconnecting if we're authorized raise self._logger.debug('Server disconnected us. Reconnecting and ' @@ -520,12 +449,8 @@ class TelegramBareClient: 'attempting to reconnect at DC {}'.format(e.new_dc) ) - # TODO What happens with the background thread here? - # For normal use cases, this won't happen, because this will only - # be on the very first connection (not authorized, not running), - # but may be an issue for people who actually travel? self._reconnect(new_dc=e.new_dc) - return self._invoke(sender, call_receive, *requests) + return self._invoke(sender, *requests) except ServerError as e: # Telegram is having some issues, just retry @@ -759,64 +684,6 @@ class TelegramBareClient: def _set_connected_and_authorized(self): self._authorized = True - self.updates.setup_workers() - if self._spawn_read_thread and self._recv_thread is None: - self._recv_thread = threading.Thread( - name='ReadThread', daemon=True, - target=self._recv_thread_impl - ) - self._recv_thread.start() - - # By using this approach, another thread will be - # created and started upon connection to constantly read - # from the other end. Otherwise, manual calls to .receive() - # must be performed. The MtProtoSender cannot be connected, - # or an error will be thrown. - # - # This way, sending and receiving will be completely independent. - def _recv_thread_impl(self): - while self._user_connected: - try: - if datetime.now() > self._last_ping + self._ping_delay: - self._sender.send(PingRequest( - int.from_bytes(os.urandom(8), 'big', signed=True) - )) - self._last_ping = datetime.now() - - self._sender.receive(update_state=self.updates) - except TimeoutError: - # No problem. - pass - except ConnectionResetError: - self._logger.debug('Server disconnected us. Reconnecting...') - while self._user_connected and not self._reconnect(): - sleep(0.1) # Retry forever, this is instant messaging - - except Exception as error: - # Unknown exception, pass it to the main thread - self._logger.debug( - '[ERROR] Unknown error on the read thread, please report', - error - ) - - try: - import socks - if isinstance(error, socks.GeneralProxyError): - # This is a known error, and it's not related to - # Telegram but rather to the proxy. Disconnect and - # hand it over to the main thread. - self._background_error = error - self.disconnect() - break - except ImportError: - "Not using PySocks, so it can't be a socket error" - - # If something strange happens we don't want to enter an - # infinite loop where all we do is raise an exception, so - # add a little sleep to avoid the CPU usage going mad. - sleep(0.1) - break - - self._recv_thread = None + # TODO self.updates.setup_workers() # endregion diff --git a/telethon/telegram_client.py b/telethon/telegram_client.py index bdaa41d8..80cfea50 100644 --- a/telethon/telegram_client.py +++ b/telethon/telegram_client.py @@ -52,21 +52,14 @@ from .tl.types.messages import DialogsSlice class TelegramClient(TelegramBareClient): - """Full featured TelegramClient meant to extend the basic functionality - - - As opposed to the TelegramBareClient, this one features downloading - media from different data centers, starting a second thread to - handle updates, and some very common functionality. - """ + """Full featured TelegramClient meant to extend the basic functionality""" # region Initialization def __init__(self, session, api_id, api_hash, connection_mode=ConnectionMode.TCP_FULL, proxy=None, - update_workers=None, timeout=timedelta(seconds=5), - spawn_read_thread=True, **kwargs): """Initializes the Telegram client with the specified API ID and Hash. @@ -79,22 +72,6 @@ class TelegramClient(TelegramBareClient): This will only affect how messages are sent over the network and how much processing is required before sending them. - The integer 'update_workers' represents depending on its value: - is None: Updates will *not* be stored in memory. - = 0: Another thread is responsible for calling self.updates.poll() - > 0: 'update_workers' background threads will be spawned, any - any of them will invoke all the self.updates.handlers. - - If 'spawn_read_thread', a background thread will be started once - an authorized user has been logged in to Telegram to read items - (such as updates and responses) from the network as soon as they - occur, which will speed things up. - - If you don't want to spawn any additional threads, pending updates - will be read and processed accordingly after invoking a request - and not immediately. This is useful if you don't care about updates - at all and have set 'update_workers=None'. - If more named arguments are provided as **kwargs, they will be used to update the Session instance. Most common settings are: device_model = platform.node() @@ -108,8 +85,6 @@ class TelegramClient(TelegramBareClient): session, api_id, api_hash, connection_mode=connection_mode, proxy=proxy, - update_workers=update_workers, - spawn_read_thread=spawn_read_thread, timeout=timeout, **kwargs ) diff --git a/telethon/tl/entity_database.py b/telethon/tl/entity_database.py index 61c07efc..da105616 100644 --- a/telethon/tl/entity_database.py +++ b/telethon/tl/entity_database.py @@ -1,5 +1,3 @@ -from threading import Lock - import re from .. import utils @@ -20,8 +18,6 @@ class EntityDatabase: """ self.enabled = enabled self.enabled_full = enabled_full - - self._lock = Lock() self._entities = {} # marked_id: user|chat|channel if input_list: @@ -81,12 +77,11 @@ class EntityDatabase: except ValueError: pass - with self._lock: - before = len(self._input_entities) - self._input_entities.update(new_input) - for e in new: - self._add_full_entity(e) - return len(self._input_entities) != before + before = len(self._input_entities) + self._input_entities.update(new_input) + for e in new: + self._add_full_entity(e) + return len(self._input_entities) != before def _add_full_entity(self, entity): """Adds a "full" entity (User, Chat or Channel, not "Input*"), diff --git a/telethon/tl/session.py b/telethon/tl/session.py index 98ffda16..b722144e 100644 --- a/telethon/tl/session.py +++ b/telethon/tl/session.py @@ -4,7 +4,6 @@ import platform import time from base64 import b64encode, b64decode from os.path import isfile as file_exists -from threading import Lock from .entity_database import EntityDatabase from .. import helpers @@ -51,11 +50,6 @@ class Session: self.report_errors = True self.save_entities = True - # Cross-thread safety - self._seq_no_lock = Lock() - self._msg_id_lock = Lock() - self._save_lock = Lock() - self.id = helpers.generate_random_long(signed=False) self._sequence = 0 self.time_offset = 0 @@ -71,24 +65,23 @@ class Session: def save(self): """Saves the current session object as session_user_id.session""" - if not self.session_user_id or self._save_lock.locked(): + if not self.session_user_id: return - with self._save_lock: - with open('{}.session'.format(self.session_user_id), 'w') as file: - out_dict = { - 'port': self.port, - 'salt': self.salt, - 'layer': self.layer, - 'server_address': self.server_address, - 'auth_key_data': - b64encode(self.auth_key.key).decode('ascii') - if self.auth_key else None - } - if self.save_entities: - out_dict['entities'] = self.entities.get_input_list() + with open('{}.session'.format(self.session_user_id), 'w') as file: + out_dict = { + 'port': self.port, + 'salt': self.salt, + 'layer': self.layer, + 'server_address': self.server_address, + 'auth_key_data': + b64encode(self.auth_key.key).decode('ascii') + if self.auth_key else None + } + if self.save_entities: + out_dict['entities'] = self.entities.get_input_list() - json.dump(out_dict, file) + json.dump(out_dict, file) def delete(self): """Deletes the current session file""" @@ -149,13 +142,12 @@ class Session: Note that if confirmed=True, the sequence number will be increased by one too """ - with self._seq_no_lock: - if content_related: - result = self._sequence * 2 + 1 - self._sequence += 1 - return result - else: - return self._sequence * 2 + if content_related: + result = self._sequence * 2 + 1 + self._sequence += 1 + return result + else: + return self._sequence * 2 def get_new_msg_id(self): """Generates a new unique message ID based on the current @@ -166,11 +158,10 @@ class Session: # "message identifiers are divisible by 4" new_msg_id = (int(now) << 32) | (nanoseconds << 2) - with self._msg_id_lock: - if self._last_msg_id >= new_msg_id: - new_msg_id = self._last_msg_id + 4 + if self._last_msg_id >= new_msg_id: + new_msg_id = self._last_msg_id + 4 - self._last_msg_id = new_msg_id + self._last_msg_id = new_msg_id return new_msg_id