diff --git a/telethon/network/mtproto_sender.py b/telethon/network/mtproto_sender.py index 6202c263..9cf0e7ff 100755 --- a/telethon/network/mtproto_sender.py +++ b/telethon/network/mtproto_sender.py @@ -1,6 +1,6 @@ import gzip from datetime import timedelta -from threading import RLock, Thread +from threading import Event, RLock, Thread from time import sleep, time import telethon.helpers as utils @@ -29,7 +29,7 @@ class MtProtoSender: # Flag used to determine whether we've received a sent request yet or not # We need this to avoid using the updates thread if we're waiting to read - self.waiting_receive = False + self.waiting_receive = Event() # Used when logging out, the only request that seems to use 'ack' requests # TODO There might be a better way to handle msgs_ack requests @@ -38,9 +38,9 @@ class MtProtoSender: self.ping_interval = 60 self.ping_time_last = time() - # Variables used to determine the status of the updates thread. - self.updates_thread_running = False - self.updates_thread_receiving = False + # Flags used to determine the status of the updates thread. + self.updates_thread_running = Event() + self.updates_thread_receiving = Event() # Sleep amount on "must sleep" error for the updates thread to sleep too self.updates_thread_sleep = None @@ -103,7 +103,7 @@ class MtProtoSender: # Only cancel the receive *if* it was the # updates thread who was receiving. We do # not want to cancel other pending requests! - if self.updates_thread_receiving: + if self.updates_thread_receiving.is_set(): Log.i('Cancelling updates receive from send()...') self.transport.cancel_receive() @@ -111,7 +111,7 @@ class MtProtoSender: with self.lock: Log.d('send() acquired the lock') # Set the flag to true so the updates thread stops trying to receive - self.waiting_receive = True + self.waiting_receive.set() # If any message needs confirmation send an AckRequest first if self.need_confirmation: @@ -153,7 +153,7 @@ class MtProtoSender: Log.i('Request result received') # We can now set the flag to False thus resuming the updates thread - self.waiting_receive = False + self.waiting_receive.clear() Log.d('receive() released the lock') # endregion @@ -388,17 +388,18 @@ class MtProtoSender: def set_updates_thread(self, running): """Sets the updates thread status (running or not)""" - if running == self.updates_thread_running: + if running == self.updates_thread_running.is_set(): return # Different state, update the saved value and behave as required Log.i('Changing updates thread running status to %s', running) - self.updates_thread_running = running if running: + self.updates_thread_running.set() self.updates_thread.start() - - elif self.updates_thread_receiving: - self.transport.cancel_receive() + else: + self.updates_thread_running.clear() + if self.updates_thread_receiving.is_set(): + self.transport.cancel_receive() def updates_thread_method(self): """This method will run until specified and listen for incoming updates""" @@ -406,7 +407,7 @@ class MtProtoSender: # Set a reasonable timeout when checking for updates timeout = timedelta(minutes=1) - while self.updates_thread_running: + while self.updates_thread_running.is_set(): # Always sleep a bit before each iteration to relax the CPU, # since it's possible to early 'continue' the loop to reach # the next iteration, but we still should to sleep. @@ -418,7 +419,7 @@ class MtProtoSender: sleep(0.1 if self.on_update_handlers else 1) # Only try to receive updates if we're not waiting to receive a request - if not self.waiting_receive: + if not self.waiting_receive.is_set(): with self.lock: Log.d('Updates thread acquired the lock') try: @@ -434,7 +435,7 @@ class MtProtoSender: Log.d('No updates handlers found, continuing') continue - self.updates_thread_receiving = True + self.updates_thread_receiving.set() Log.d('Trying to receive updates from the updates thread') seq, body = self.transport.receive(timeout) message, remote_msg_id, remote_sequence = self.decode_msg( @@ -473,4 +474,4 @@ class MtProtoSender: raise Log.d('Updates thread released the lock') - self.updates_thread_receiving = False + self.updates_thread_receiving.clear()