From 863d2e8368b0b6886896e19fbad0f61a4df8d191 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Sat, 2 Sep 2017 20:41:00 +0200 Subject: [PATCH] Make confirm_received a flag, avoid race conditions, fix bg RPCError There was a race condition between TelegramBareClient.invoke receiving part and MtProtoSender._handle_rpc_result actually reading the result after setting request.confirmed = True. The .confirmed is now a threading.Event to avoid the sleep(0.1). RPC errors have been moved inside the request so they're not raised on a background thread anymore. --- telethon/network/mtproto_sender.py | 18 ++++++++---------- telethon/telegram_bare_client.py | 5 +++-- telethon/tl/tlobject.py | 8 +++++--- 3 files changed, 16 insertions(+), 15 deletions(-) diff --git a/telethon/network/mtproto_sender.py b/telethon/network/mtproto_sender.py index 62e473f0..8fdfdc4f 100644 --- a/telethon/network/mtproto_sender.py +++ b/telethon/network/mtproto_sender.py @@ -205,7 +205,7 @@ class MtProtoSender: if self.logging_out: self._logger.debug('Message ack confirmed a request') - r.confirm_received = True + r.confirm_received.set() return True @@ -239,7 +239,7 @@ class MtProtoSender: if r.request_msg_id == received_msg_id) self._logger.debug('Pong confirmed a request') - request.confirm_received = True + request.confirm_received.set() except StopIteration: pass return True @@ -313,8 +313,6 @@ class MtProtoSender: try: request = next(r for r in self._pending_receive if r.request_msg_id == request_id) - - request.confirm_received = True except StopIteration: request = None @@ -333,13 +331,12 @@ class MtProtoSender: self._need_confirmation.append(request_id) self._send_acknowledges() + if request: + request.error = error + request.confirm_received.set() + # else TODO Where should this error be reported? + # Read may be async. Can an error not-belong to a request? self._logger.debug('Read RPC error: %s', str(error)) - if isinstance(error, InvalidDCError): - # Must resend this request, if any - if request: - request.confirm_received = False - - raise error else: if request: self._logger.debug('Reading request response') @@ -351,6 +348,7 @@ class MtProtoSender: reader.seek(-4) request.on_response(reader) + request.confirm_received.set() return True else: # If it's really a result for RPC from previous connection diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index 7584e34c..418b8c09 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -294,8 +294,9 @@ class TelegramBareClient: try: self._sender.send(request) - while not request.confirm_received: - sleep(0.1) # TODO Use a proper lock + request.confirm_received.wait() # TODO Optional timeout here? + if request.rpc_error: + raise request.rpc_error return request.result except ConnectionResetError: diff --git a/telethon/tl/tlobject.py b/telethon/tl/tlobject.py index 4c201125..66ed825f 100644 --- a/telethon/tl/tlobject.py +++ b/telethon/tl/tlobject.py @@ -1,4 +1,5 @@ from datetime import datetime, timedelta +from threading import Event class TLObject: @@ -10,7 +11,8 @@ class TLObject: self.dirty = False self.send_time = None - self.confirm_received = False + self.confirm_received = Event() + self.rpc_error = None # These should be overrode self.constructor_id = 0 @@ -23,11 +25,11 @@ class TLObject: self.sent = True def on_confirm(self): - self.confirm_received = True + self.confirm_received.set() def need_resend(self): return self.dirty or ( - self.content_related and not self.confirm_received and + self.content_related and not self.confirm_received.is_set() and datetime.now() - self.send_time > timedelta(seconds=3)) @staticmethod