From bc15b451b5db541b5132449aafc40cb47a2854bb Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Fri, 22 Sep 2017 12:20:38 +0200 Subject: [PATCH] Use a safer reconnect behaviour (respect multithread too) --- telethon/telegram_bare_client.py | 28 ++++++++++++++------ telethon/telegram_client.py | 44 ++++++++++++++++++++------------ 2 files changed, 47 insertions(+), 25 deletions(-) diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index 2f2f0df9..a403c36d 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -3,6 +3,7 @@ from datetime import timedelta from hashlib import md5 from io import BytesIO from os import path +from threading import RLock from . import helpers as utils from .crypto import rsa, CdnDecrypter @@ -81,6 +82,10 @@ class TelegramBareClient: self._logger = logging.getLogger(__name__) + # Two threads may be calling reconnect() when the connection is lost, + # we only want one to actually perform the reconnection. + self._connect_lock = RLock() + # Cache "exported" senders 'dc_id: TelegramBareClient' and # their corresponding sessions not to recreate them all # the time since it's a (somewhat expensive) process. @@ -177,22 +182,29 @@ class TelegramBareClient: self._sender.disconnect() def reconnect(self, new_dc=None): - """Disconnects and connects again (effectively reconnecting). + """If 'new_dc' is not set, only a call to .connect() will be made + since it's assumed that the connection has been lost and the + library is reconnecting. - If 'new_dc' is not None, the current authorization key is - removed, the DC used is switched, and a new connection is made. + If 'new_dc' is set, the client is first disconnected from the + current data center, clears the auth key for the old DC, and + connects to the new data center. """ - self.disconnect() - - if new_dc is not None: + if new_dc is None: + # Assume we are disconnected due to some error, so connect again + with self._connect_lock: + # Another thread may have connected again, so check that first + if not self.is_connected(): + self.connect() + else: + self.disconnect() self.session.auth_key = None # Force creating new auth_key dc = self._get_dc(new_dc) ip = dc.ip_address self._sender.connection.ip = self.session.server_address = ip self._sender.connection.port = self.session.port = dc.port self.session.save() - - self.connect() + self.connect() # endregion diff --git a/telethon/telegram_client.py b/telethon/telegram_client.py index 7baac88e..61dbff26 100644 --- a/telethon/telegram_client.py +++ b/telethon/telegram_client.py @@ -126,6 +126,15 @@ class TelegramClient(TelegramBareClient): self._phone_code_hash = None self._phone = None + # Despite the state of the real connection, keep track of whether + # the user has explicitly called .connect() or .disconnect() here. + # This information is required by the read thread, who will be the + # one attempting to reconnect on the background *while* the user + # doesn't explicitly call .disconnect(), thus telling it to stop + # retrying. The main thread, knowing there is a background thread + # attempting reconnection as soon as it happens, will just sleep. + self._user_connected = False + # Save whether the user is authorized here (a.k.a. logged in) self._authorized = False @@ -167,6 +176,7 @@ class TelegramClient(TelegramBareClient): if not ok: return False + self._user_connected = True try: self.sync_updates() self._set_connected_and_authorized() @@ -178,8 +188,7 @@ class TelegramClient(TelegramBareClient): def disconnect(self): """Disconnects from the Telegram server and stops all the spawned threads""" - # The existing thread will close eventually, since it's - # only running while the MtProtoSender.is_connected() + self._user_connected = False self._recv_thread = None # This will trigger a "ConnectionResetError", usually, the background @@ -255,9 +264,20 @@ class TelegramClient(TelegramBareClient): 'attempting to reconnect at DC {}' .format(e.new_dc)) + # TODO What happens with the background thread here? + # For normal use cases, this won't happen, because this will only + # be on the very first connection (not authorized, not running), + # but may be an issue for people who actually travel? self.reconnect(new_dc=e.new_dc) return self.invoke(request) + except ConnectionResetError: + if self._connect_lock.locked(): + # We are connecting and we don't want to reconnect there... + raise + while self._user_connected and not self.reconnect(): + pass # Retry forever until we finally can send the request + # Let people use client(SomeRequest()) instead client.invoke(...) __call__ = invoke @@ -1031,7 +1051,7 @@ class TelegramClient(TelegramBareClient): # # This way, sending and receiving will be completely independent. def _recv_thread_impl(self): - while self._sender.is_connected(): + while self._user_connected: try: if datetime.now() > self._last_ping + self._ping_delay: self._sender.send(PingRequest( @@ -1040,24 +1060,14 @@ class TelegramClient(TelegramBareClient): self._last_ping = datetime.now() self._sender.receive(update_state=self.updates) - except AttributeError: - # 'NoneType' object has no attribute 'receive'. - # The only moment when this can happen is reconnection - # was triggered from another thread and the ._sender - # was set to None, so close this thread and exit by return. - self._recv_thread = None - return except TimeoutError: # No problem. pass except ConnectionResetError: - if self._recv_thread is not None: - # Do NOT attempt reconnecting unless the connection was - # finished by the user -> ._recv_thread is None - self._logger.debug('Server disconnected us. Reconnecting...') - self._recv_thread = None # Not running anymore - self.reconnect() - return + self._logger.debug('Server disconnected us. Reconnecting...') + while self._user_connected and not self.reconnect(): + pass # Retry forever, this is instant messaging + except Exception as e: # Unknown exception, pass it to the main thread self.updates.set_error(e)