mirror of
https://github.com/LonamiWebs/Telethon.git
synced 2025-01-24 08:14:14 +03:00
Make confirm_received a flag, avoid race conditions, fix bg RPCError
There was a race condition between TelegramBareClient.invoke receiving part and MtProtoSender._handle_rpc_result actually reading the result after setting request.confirmed = True. The .confirmed is now a threading.Event to avoid the sleep(0.1). RPC errors have been moved inside the request so they're not raised on a background thread anymore.
This commit is contained in:
parent
b908296efa
commit
863d2e8368
|
@ -205,7 +205,7 @@ class MtProtoSender:
|
|||
|
||||
if self.logging_out:
|
||||
self._logger.debug('Message ack confirmed a request')
|
||||
r.confirm_received = True
|
||||
r.confirm_received.set()
|
||||
|
||||
return True
|
||||
|
||||
|
@ -239,7 +239,7 @@ class MtProtoSender:
|
|||
if r.request_msg_id == received_msg_id)
|
||||
|
||||
self._logger.debug('Pong confirmed a request')
|
||||
request.confirm_received = True
|
||||
request.confirm_received.set()
|
||||
except StopIteration: pass
|
||||
|
||||
return True
|
||||
|
@ -313,8 +313,6 @@ class MtProtoSender:
|
|||
try:
|
||||
request = next(r for r in self._pending_receive
|
||||
if r.request_msg_id == request_id)
|
||||
|
||||
request.confirm_received = True
|
||||
except StopIteration:
|
||||
request = None
|
||||
|
||||
|
@ -333,13 +331,12 @@ class MtProtoSender:
|
|||
self._need_confirmation.append(request_id)
|
||||
self._send_acknowledges()
|
||||
|
||||
if request:
|
||||
request.error = error
|
||||
request.confirm_received.set()
|
||||
# else TODO Where should this error be reported?
|
||||
# Read may be async. Can an error not-belong to a request?
|
||||
self._logger.debug('Read RPC error: %s', str(error))
|
||||
if isinstance(error, InvalidDCError):
|
||||
# Must resend this request, if any
|
||||
if request:
|
||||
request.confirm_received = False
|
||||
|
||||
raise error
|
||||
else:
|
||||
if request:
|
||||
self._logger.debug('Reading request response')
|
||||
|
@ -351,6 +348,7 @@ class MtProtoSender:
|
|||
reader.seek(-4)
|
||||
request.on_response(reader)
|
||||
|
||||
request.confirm_received.set()
|
||||
return True
|
||||
else:
|
||||
# If it's really a result for RPC from previous connection
|
||||
|
|
|
@ -294,8 +294,9 @@ class TelegramBareClient:
|
|||
|
||||
try:
|
||||
self._sender.send(request)
|
||||
while not request.confirm_received:
|
||||
sleep(0.1) # TODO Use a proper lock
|
||||
request.confirm_received.wait() # TODO Optional timeout here?
|
||||
if request.rpc_error:
|
||||
raise request.rpc_error
|
||||
return request.result
|
||||
|
||||
except ConnectionResetError:
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
from datetime import datetime, timedelta
|
||||
from threading import Event
|
||||
|
||||
|
||||
class TLObject:
|
||||
|
@ -10,7 +11,8 @@ class TLObject:
|
|||
|
||||
self.dirty = False
|
||||
self.send_time = None
|
||||
self.confirm_received = False
|
||||
self.confirm_received = Event()
|
||||
self.rpc_error = None
|
||||
|
||||
# These should be overrode
|
||||
self.constructor_id = 0
|
||||
|
@ -23,11 +25,11 @@ class TLObject:
|
|||
self.sent = True
|
||||
|
||||
def on_confirm(self):
|
||||
self.confirm_received = True
|
||||
self.confirm_received.set()
|
||||
|
||||
def need_resend(self):
|
||||
return self.dirty or (
|
||||
self.content_related and not self.confirm_received and
|
||||
self.content_related and not self.confirm_received.is_set() and
|
||||
datetime.now() - self.send_time > timedelta(seconds=3))
|
||||
|
||||
@staticmethod
|
||||
|
|
Loading…
Reference in New Issue
Block a user