From d81dd055e6d54c23f093a52151110b49f7552e64 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Sat, 6 Jan 2018 23:43:40 +0100 Subject: [PATCH] Remove temporary connections and use a lock again These seem to be the reason for missing some updates (#237) --- telethon/network/mtproto_sender.py | 11 ++-- telethon/telegram_bare_client.py | 88 +++++++++--------------------- 2 files changed, 31 insertions(+), 68 deletions(-) diff --git a/telethon/network/mtproto_sender.py b/telethon/network/mtproto_sender.py index 82a378ba..0e960181 100644 --- a/telethon/network/mtproto_sender.py +++ b/telethon/network/mtproto_sender.py @@ -5,6 +5,7 @@ encrypting every packet, and relies on a valid AuthKey in the used Session. import gzip import logging import struct +from threading import Lock from .. import helpers as utils from ..crypto import AES @@ -53,6 +54,9 @@ class MtProtoSender: # Requests (as msg_id: Message) sent waiting to be received self._pending_receive = {} + # Multithreading + self._send_lock = Lock() + def connect(self): """Connects to the server.""" self.connection.connect(self.session.server_address, self.session.port) @@ -71,10 +75,6 @@ class MtProtoSender: self._need_confirmation.clear() self._clear_all_pending() - def clone(self): - """Creates a copy of this MtProtoSender as a new connection.""" - return MtProtoSender(self.session, self.connection.clone()) - # region Send and receive def send(self, *requests): @@ -156,7 +156,8 @@ class MtProtoSender: :param message: the TLMessage to be sent. """ - self.connection.send(utils.pack_message(self.session, message)) + with self._send_lock: + self.connection.send(utils.pack_message(self.session, message)) def _decode_msg(self, body): """ diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index ab6d3bbb..429a4306 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -163,11 +163,6 @@ class TelegramBareClient: self._spawn_read_thread = spawn_read_thread self._recv_thread = None - # Identifier of the main thread (the one that called .connect()). - # This will be used to create new connections from any other thread, - # so that requests can be sent in parallel. - self._main_thread_ident = None - # Default PingRequest delay self._last_ping = datetime.now() self._ping_delay = timedelta(minutes=1) @@ -198,7 +193,6 @@ class TelegramBareClient: __log__.info('Connecting to %s:%d...', self.session.server_address, self.session.port) - self._main_thread_ident = threading.get_ident() self._background_error = None # Clear previous errors try: @@ -431,6 +425,9 @@ class TelegramBareClient: x.content_related for x in requests): raise TypeError('You can only invoke requests, not types!') + if self._background_error: + raise self._background_error + # For logging purposes if len(requests) == 1: which = type(requests[0]).__name__ @@ -439,66 +436,31 @@ class TelegramBareClient: len(requests), [type(x).__name__ for x in requests]) # Determine the sender to be used (main or a new connection) - on_main_thread = threading.get_ident() == self._main_thread_ident - if on_main_thread or self._on_read_thread(): - __log__.debug('Invoking %s from main thread', which) - sender = self._sender - update_state = self.updates - else: - __log__.debug('Invoking %s from background thread. ' - 'Creating temporary connection', which) + __log__.debug('Invoking %s', which) - sender = self._sender.clone() - sender.connect() - # We're on another connection, Telegram will resend all the - # updates that we haven't acknowledged (potentially entering - # an infinite loop if we're calling this in response to an - # update event, as it would be received again and again). So - # to avoid this we will simply not process updates on these - # new temporary connections, as they will be sent and later - # acknowledged over the main connection. - update_state = None + call_receive = self._recv_thread is None or self._reconnect_lock.locked() + for retry in range(retries): + result = self._invoke(call_receive, *requests) + if result is not None: + return result - # We should call receive from this thread if there's no background - # thread reading or if the server disconnected us and we're trying - # to reconnect. This is because the read thread may either be - # locked also trying to reconnect or we may be said thread already. - call_receive = not on_main_thread or self._recv_thread is None \ - or self._reconnect_lock.locked() - try: - for attempt in range(retries): - if self._background_error and on_main_thread: - raise self._background_error + __log__.warning('Invoking %s failed %d times, ' + 'reconnecting and retrying', + [str(x) for x in requests], retry + 1) + sleep(1) + # The ReadThread has priority when attempting reconnection, + # since this thread is constantly running while __call__ is + # only done sometimes. Here try connecting only once/retry. + if not self._reconnect_lock.locked(): + with self._reconnect_lock: + self._reconnect() - result = self._invoke( - sender, call_receive, update_state, *requests - ) - if result is not None: - return result - - __log__.warning('Invoking %s failed %d times, ' - 'reconnecting and retrying', - [str(x) for x in requests], attempt + 1) - sleep(1) - # The ReadThread has priority when attempting reconnection, - # since this thread is constantly running while __call__ is - # only done sometimes. Here try connecting only once/retry. - if sender == self._sender: - if not self._reconnect_lock.locked(): - with self._reconnect_lock: - self._reconnect() - else: - sender.connect() - - raise RuntimeError('Number of retries reached 0.') - finally: - if sender != self._sender: - sender.disconnect() # Close temporary connections + raise RuntimeError('Number of retries reached 0.') # Let people use client.invoke(SomeRequest()) instead client(...) invoke = __call__ - def _invoke(self, sender, call_receive, update_state, *requests): + def _invoke(self, call_receive, *requests): try: # Ensure that we start with no previous errors (i.e. resending) for x in requests: @@ -523,7 +485,7 @@ class TelegramBareClient: self._wrap_init_connection(GetConfigRequest()) ) - sender.send(*requests) + self._sender.send(*requests) if not call_receive: # TODO This will be slightly troublesome if we allow @@ -532,11 +494,11 @@ class TelegramBareClient: # in which case a Lock would be required for .receive(). for x in requests: x.confirm_received.wait( - sender.connection.get_timeout() + self._sender.connection.get_timeout() ) else: while not all(x.confirm_received.is_set() for x in requests): - sender.receive(update_state=update_state) + self._sender.receive(update_state=self.updates) except BrokenAuthKeyError: __log__.error('Authorization key seems broken and was invalid!') @@ -578,7 +540,7 @@ class TelegramBareClient: # be on the very first connection (not authorized, not running), # but may be an issue for people who actually travel? self._reconnect(new_dc=e.new_dc) - return self._invoke(sender, call_receive, update_state, *requests) + return self._invoke(call_receive, *requests) except ServerError as e: # Telegram is having some issues, just retry