MtProtoSender: Use threading.Event instead of boolean flags

This commit is contained in:
Dmitry D. Chernov 2017-05-10 03:41:16 +10:00 committed by Lonami
parent 7ffe2b3130
commit 048bc81b74

View File

@ -1,6 +1,6 @@
import gzip import gzip
from datetime import timedelta from datetime import timedelta
from threading import RLock, Thread from threading import Event, RLock, Thread
from time import sleep, time from time import sleep, time
import telethon.helpers as utils import telethon.helpers as utils
@ -29,7 +29,7 @@ class MtProtoSender:
# Flag used to determine whether we've received a sent request yet or not # Flag used to determine whether we've received a sent request yet or not
# We need this to avoid using the updates thread if we're waiting to read # We need this to avoid using the updates thread if we're waiting to read
self.waiting_receive = False self.waiting_receive = Event()
# Used when logging out, the only request that seems to use 'ack' requests # Used when logging out, the only request that seems to use 'ack' requests
# TODO There might be a better way to handle msgs_ack requests # TODO There might be a better way to handle msgs_ack requests
@ -38,9 +38,9 @@ class MtProtoSender:
self.ping_interval = 60 self.ping_interval = 60
self.ping_time_last = time() self.ping_time_last = time()
# Variables used to determine the status of the updates thread. # Flags used to determine the status of the updates thread.
self.updates_thread_running = False self.updates_thread_running = Event()
self.updates_thread_receiving = False self.updates_thread_receiving = Event()
# Sleep amount on "must sleep" error for the updates thread to sleep too # Sleep amount on "must sleep" error for the updates thread to sleep too
self.updates_thread_sleep = None self.updates_thread_sleep = None
@ -103,7 +103,7 @@ class MtProtoSender:
# Only cancel the receive *if* it was the # Only cancel the receive *if* it was the
# updates thread who was receiving. We do # updates thread who was receiving. We do
# not want to cancel other pending requests! # not want to cancel other pending requests!
if self.updates_thread_receiving: if self.updates_thread_receiving.is_set():
Log.i('Cancelling updates receive from send()...') Log.i('Cancelling updates receive from send()...')
self.transport.cancel_receive() self.transport.cancel_receive()
@ -111,7 +111,7 @@ class MtProtoSender:
with self.lock: with self.lock:
Log.d('send() acquired the lock') Log.d('send() acquired the lock')
# Set the flag to true so the updates thread stops trying to receive # Set the flag to true so the updates thread stops trying to receive
self.waiting_receive = True self.waiting_receive.set()
# If any message needs confirmation send an AckRequest first # If any message needs confirmation send an AckRequest first
if self.need_confirmation: if self.need_confirmation:
@ -153,7 +153,7 @@ class MtProtoSender:
Log.i('Request result received') Log.i('Request result received')
# We can now set the flag to False thus resuming the updates thread # We can now set the flag to False thus resuming the updates thread
self.waiting_receive = False self.waiting_receive.clear()
Log.d('receive() released the lock') Log.d('receive() released the lock')
# endregion # endregion
@ -388,16 +388,17 @@ class MtProtoSender:
def set_updates_thread(self, running): def set_updates_thread(self, running):
"""Sets the updates thread status (running or not)""" """Sets the updates thread status (running or not)"""
if running == self.updates_thread_running: if running == self.updates_thread_running.is_set():
return return
# Different state, update the saved value and behave as required # Different state, update the saved value and behave as required
Log.i('Changing updates thread running status to %s', running) Log.i('Changing updates thread running status to %s', running)
self.updates_thread_running = running
if running: if running:
self.updates_thread_running.set()
self.updates_thread.start() self.updates_thread.start()
else:
elif self.updates_thread_receiving: self.updates_thread_running.clear()
if self.updates_thread_receiving.is_set():
self.transport.cancel_receive() self.transport.cancel_receive()
def updates_thread_method(self): def updates_thread_method(self):
@ -406,7 +407,7 @@ class MtProtoSender:
# Set a reasonable timeout when checking for updates # Set a reasonable timeout when checking for updates
timeout = timedelta(minutes=1) timeout = timedelta(minutes=1)
while self.updates_thread_running: while self.updates_thread_running.is_set():
# Always sleep a bit before each iteration to relax the CPU, # Always sleep a bit before each iteration to relax the CPU,
# since it's possible to early 'continue' the loop to reach # since it's possible to early 'continue' the loop to reach
# the next iteration, but we still should to sleep. # the next iteration, but we still should to sleep.
@ -418,7 +419,7 @@ class MtProtoSender:
sleep(0.1 if self.on_update_handlers else 1) sleep(0.1 if self.on_update_handlers else 1)
# Only try to receive updates if we're not waiting to receive a request # Only try to receive updates if we're not waiting to receive a request
if not self.waiting_receive: if not self.waiting_receive.is_set():
with self.lock: with self.lock:
Log.d('Updates thread acquired the lock') Log.d('Updates thread acquired the lock')
try: try:
@ -434,7 +435,7 @@ class MtProtoSender:
Log.d('No updates handlers found, continuing') Log.d('No updates handlers found, continuing')
continue continue
self.updates_thread_receiving = True self.updates_thread_receiving.set()
Log.d('Trying to receive updates from the updates thread') Log.d('Trying to receive updates from the updates thread')
seq, body = self.transport.receive(timeout) seq, body = self.transport.receive(timeout)
message, remote_msg_id, remote_sequence = self.decode_msg( message, remote_msg_id, remote_sequence = self.decode_msg(
@ -473,4 +474,4 @@ class MtProtoSender:
raise raise
Log.d('Updates thread released the lock') Log.d('Updates thread released the lock')
self.updates_thread_receiving = False self.updates_thread_receiving.clear()