From e035939aa20d69f3b8414dd7810498ebc022fd3f Mon Sep 17 00:00:00 2001 From: Lonami Date: Sun, 2 Oct 2016 13:30:14 +0200 Subject: [PATCH] Attempt at fixing #5 (RPCError) and updated README Now RPC results can be received from the updates thread, as long as they are errors. This, however, should not happen! A recursive lock is now used (and released on every method, rather than only on the `.receive()` one) --- README.md | 1 - telethon/network/mtproto_sender.py | 92 ++++++++++++++++-------------- 2 files changed, 49 insertions(+), 44 deletions(-) diff --git a/README.md b/README.md index 184490db..bb6a5575 100755 --- a/README.md +++ b/README.md @@ -14,7 +14,6 @@ The project's **core only** is based on TLSharp, a C# Telegram client implementa - [Tips for porting Telethon](#tips-for-porting-telethon) - [Code generator limitations](#code-generator-limitations) - [Updating the `scheme.tl`](#updating-the-schemetl) -- [Plans for the future](#plans-for-the-future) ## Why Telethon? > Why should I bother with Telethon? There are more mature projects already, such as diff --git a/telethon/network/mtproto_sender.py b/telethon/network/mtproto_sender.py index 7c1a5edd..91fdd9e1 100755 --- a/telethon/network/mtproto_sender.py +++ b/telethon/network/mtproto_sender.py @@ -1,7 +1,7 @@ import gzip from telethon.errors import * from time import sleep -from threading import Thread, Lock +from threading import Thread, RLock import telethon.helpers as utils from telethon.crypto import AES @@ -19,8 +19,12 @@ class MtProtoSender: self.need_confirmation = [] # Message IDs that need confirmation self.on_update_handlers = [] - # Store a Lock instance to make this class safely multi-threaded - self.lock = Lock() + # Store an RLock instance to make this class safely multi-threaded + self.lock = RLock() + + # Flag used to determine whether we've received a sent request yet or not + # We need this to avoid using the updates thread if we're waiting to read + self.waiting_receive = False self.updates_thread = Thread(target=self.updates_thread_method, name='Updates thread') self.updates_thread_running = False @@ -68,7 +72,7 @@ class MtProtoSender: # region Send and receive - def send(self, request, resend=False): + def send(self, request): """Sends the specified MTProtoRequest, previously sending any message which needed confirmation. This also pauses the updates thread""" @@ -78,34 +82,33 @@ class MtProtoSender: if self.updates_thread_receiving: self.transport.cancel_receive() - # Now only us can be using this method if we're not resending - if not resend: - self.lock.acquire() + # Now only us can be using this method + with self.lock: + # Set the flag to true so the updates thread stops trying to receive + self.waiting_receive = True - # If any message needs confirmation send an AckRequest first - if self.need_confirmation: - msgs_ack = MsgsAck(self.need_confirmation) + # If any message needs confirmation send an AckRequest first + if self.need_confirmation: + msgs_ack = MsgsAck(self.need_confirmation) + with BinaryWriter() as writer: + msgs_ack.on_send(writer) + self.send_packet(writer.get_bytes(), msgs_ack) + + del self.need_confirmation[:] + + # Finally send our packed request with BinaryWriter() as writer: - msgs_ack.on_send(writer) - self.send_packet(writer.get_bytes(), msgs_ack) + request.on_send(writer) + self.send_packet(writer.get_bytes(), request) - del self.need_confirmation[:] - - # Finally send our packed request - with BinaryWriter() as writer: - request.on_send(writer) - self.send_packet(writer.get_bytes(), request) - - # And update the saved session - self.session.save() - # Don't resume the updates thread yet, - # since every send() is preceded by a receive() + # And update the saved session + self.session.save() def receive(self, request): """Receives the specified MTProtoRequest ("fills in it" the received data). This also restores the updates thread""" - try: + with self.lock: # Don't stop trying to receive until we get the request we wanted while not request.confirm_received: seq, body = self.transport.receive() @@ -114,10 +117,8 @@ class MtProtoSender: with BinaryReader(message) as reader: self.process_msg(remote_msg_id, remote_sequence, reader, request) - finally: - # Once we are done trying to get our request, - # restore the updates thread and release the lock - self.lock.release() + # We can now set the flag to False thus resuming the updates thread + self.waiting_receive = False # endregion @@ -252,7 +253,7 @@ class MtProtoSender: raise ValueError('Tried to handle a bad server salt with no request specified') # Resend - self.send(request, resend=True) + self.send(request) return True @@ -265,19 +266,19 @@ class MtProtoSender: raise BadMessageError(error_code) def handle_rpc_result(self, msg_id, sequence, reader, request): - if not request: - raise ValueError('RPC results should only happen after a request was sent') - code = reader.read_int(signed=False) request_id = reader.read_long(signed=False) inner_code = reader.read_int(signed=False) - if request_id == request.msg_id: + if request and request_id == request.msg_id: request.confirm_received = True if inner_code == 0x2144ca19: # RPC Error error = RPCError(code=reader.read_int(), message=reader.tgread_string()) if error.must_resend: + if not request: + raise ValueError('The previously sent request must be resent. ' + 'However, no request was previously sent (called from updates thread).') request.confirm_received = False if error.message.startswith('FLOOD_WAIT_'): @@ -290,6 +291,9 @@ class MtProtoSender: else: raise error else: + if not request: + raise ValueError('Cannot receive a request from inside an RPC result from the updates thread.') + if inner_code == 0x3072cfa1: # GZip packed unpacked_data = gzip.decompress(reader.tgread_bytes()) with BinaryReader(unpacked_data) as compressed_reader: @@ -323,19 +327,21 @@ class MtProtoSender: def updates_thread_method(self): """This method will run until specified and listen for incoming updates""" while self.updates_thread_running: - with self.lock: - try: - self.updates_thread_receiving = True - seq, body = self.transport.receive() - message, remote_msg_id, remote_sequence = self.decode_msg(body) + # Only try to receive updates if we're not waiting to receive a request + if not self.waiting_receive: + with self.lock: + try: + self.updates_thread_receiving = True + seq, body = self.transport.receive() + message, remote_msg_id, remote_sequence = self.decode_msg(body) - with BinaryReader(message) as reader: - self.process_msg(remote_msg_id, remote_sequence, reader) + with BinaryReader(message) as reader: + self.process_msg(remote_msg_id, remote_sequence, reader) - except ReadCancelledError: - pass + except ReadCancelledError: + pass - self.updates_thread_receiving = False + self.updates_thread_receiving = False # If we are here, it is because the read was cancelled # Sleep a bit just to give enough time for the other thread