mirror of
https://github.com/LonamiWebs/Telethon.git
synced 2024-11-13 04:56:35 +03:00
Slightly improve the updates thread (also easier to understand)
This commit is contained in:
parent
ceca636bb1
commit
426b09aec0
|
@ -29,53 +29,41 @@ class MtProtoSender:
|
||||||
# We need this to avoid using the updates thread if we're waiting to read
|
# We need this to avoid using the updates thread if we're waiting to read
|
||||||
self.waiting_receive = False
|
self.waiting_receive = False
|
||||||
|
|
||||||
self.updates_thread = Thread(
|
|
||||||
target=self.updates_thread_method, name='Updates thread')
|
|
||||||
|
|
||||||
# Signal for correct stopping update thread
|
|
||||||
self.updates_thread_stopping = False
|
|
||||||
|
|
||||||
self.ping_interval = 60
|
|
||||||
self.ping_time_last = time()
|
|
||||||
|
|
||||||
self.updates_thread_receiving = False
|
|
||||||
|
|
||||||
# Determine whether the received acknowledge request confirm
|
# Determine whether the received acknowledge request confirm
|
||||||
# our requests or not. This is not desired until we initialize
|
# our requests or not. This is not desired until we initialize
|
||||||
# our connection, because it breaks things when we call InvokeWithLayer
|
# our connection, because it breaks things when we call InvokeWithLayer
|
||||||
# TODO There might be a better way to handle msgs_ack requests
|
# TODO There might be a better way to handle msgs_ack requests
|
||||||
self.ack_requests_confirm = False
|
self.ack_requests_confirm = False
|
||||||
|
|
||||||
# Always running thread for periodical ping requests
|
self.ping_interval = 60
|
||||||
# after all initialization is complete
|
self.ping_time_last = time()
|
||||||
self.updates_thread.start()
|
|
||||||
|
# Variables used to determine the status of the updates thread.
|
||||||
|
self.updates_thread_running = False
|
||||||
|
self.updates_thread_receiving = False
|
||||||
|
|
||||||
|
self.updates_thread = Thread(
|
||||||
|
target=self.updates_thread_method, name='Updates thread')
|
||||||
|
|
||||||
|
# The "updates" thread must also be running to make periodic ping requests.
|
||||||
|
self.set_updates_thread(running=True)
|
||||||
|
|
||||||
def disconnect(self):
|
def disconnect(self):
|
||||||
"""Disconnects and **stops all the running threads** if any"""
|
"""Disconnects and **stops all the running threads** if any"""
|
||||||
self.set_listen_for_updates(enabled=False)
|
self.set_updates_thread(running=False)
|
||||||
# Stop thread on next loop cycle
|
|
||||||
self.updates_thread_stopping = True
|
|
||||||
self.transport.close()
|
self.transport.close()
|
||||||
|
|
||||||
def add_update_handler(self, handler):
|
def add_update_handler(self, handler):
|
||||||
"""Adds an update handler (a method with one argument, the received
|
"""Adds an update handler (a method with one argument, the received
|
||||||
TLObject) that is fired when there are updates available"""
|
TLObject) that is fired when there are updates available"""
|
||||||
|
|
||||||
first_handler = not self.on_update_handlers
|
# The updates thread is already running for periodic ping requests,
|
||||||
|
# so there is no need to start it when adding update handlers.
|
||||||
self.on_update_handlers.append(handler)
|
self.on_update_handlers.append(handler)
|
||||||
|
|
||||||
# If this is the first added handler,
|
|
||||||
# we must start the thread to receive updates
|
|
||||||
if first_handler:
|
|
||||||
self.set_listen_for_updates(enabled=True)
|
|
||||||
|
|
||||||
def remove_update_handler(self, handler):
|
def remove_update_handler(self, handler):
|
||||||
self.on_update_handlers.remove(handler)
|
self.on_update_handlers.remove(handler)
|
||||||
|
|
||||||
# If there are no more update handlers, stop the thread
|
|
||||||
if not self.on_update_handlers:
|
|
||||||
self.set_listen_for_updates(False)
|
|
||||||
|
|
||||||
def generate_sequence(self, confirmed):
|
def generate_sequence(self, confirmed):
|
||||||
"""Generates the next sequence number, based on whether it
|
"""Generates the next sequence number, based on whether it
|
||||||
was confirmed yet or not"""
|
was confirmed yet or not"""
|
||||||
|
@ -357,12 +345,18 @@ class MtProtoSender:
|
||||||
|
|
||||||
# endregion
|
# endregion
|
||||||
|
|
||||||
def set_listen_for_updates(self, enabled):
|
def set_updates_thread(self, running):
|
||||||
if enabled:
|
"""Sets the updates thread status (running or not)"""
|
||||||
self.updates_thread_receiving = False
|
if running == self.updates_thread_running:
|
||||||
else:
|
return
|
||||||
if self.updates_thread_receiving:
|
|
||||||
self.transport.cancel_receive()
|
# Different state, update the saved value and behave as required
|
||||||
|
self.updates_thread_running = running
|
||||||
|
if running:
|
||||||
|
self.updates_thread.start()
|
||||||
|
|
||||||
|
elif self.updates_thread_receiving:
|
||||||
|
self.transport.cancel_receive()
|
||||||
|
|
||||||
def updates_thread_method(self):
|
def updates_thread_method(self):
|
||||||
"""This method will run until specified and listen for incoming updates"""
|
"""This method will run until specified and listen for incoming updates"""
|
||||||
|
@ -370,18 +364,19 @@ class MtProtoSender:
|
||||||
# Set a reasonable timeout when checking for updates
|
# Set a reasonable timeout when checking for updates
|
||||||
timeout = timedelta(minutes=1)
|
timeout = timedelta(minutes=1)
|
||||||
|
|
||||||
while not self.updates_thread_stopping:
|
while self.updates_thread_running:
|
||||||
# Only try to receive updates if we're not waiting to receive a request
|
# Only try to receive updates if we're not waiting to receive a request
|
||||||
if not self.waiting_receive:
|
if not self.waiting_receive:
|
||||||
with self.lock:
|
with self.lock:
|
||||||
try:
|
try:
|
||||||
now = time()
|
now = time()
|
||||||
# if ping_interval seconds passed since last ping
|
# If ping_interval seconds passed since last ping, send a new one
|
||||||
# (or startup) - send new one
|
|
||||||
if now >= self.ping_time_last + self.ping_interval:
|
if now >= self.ping_time_last + self.ping_interval:
|
||||||
self.ping_time_last = now
|
self.ping_time_last = now
|
||||||
self.send_ping()
|
self.send_ping()
|
||||||
# if sending ping - we doesn't processing any other updates
|
|
||||||
|
# Exit the loop if we're not expecting to receive any updates
|
||||||
|
if not self.on_update_handlers:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
self.updates_thread_receiving = True
|
self.updates_thread_receiving = True
|
||||||
|
@ -401,5 +396,6 @@ class MtProtoSender:
|
||||||
# If we are here, it is because the read was cancelled
|
# If we are here, it is because the read was cancelled
|
||||||
# Sleep a bit just to give enough time for the other thread
|
# Sleep a bit just to give enough time for the other thread
|
||||||
# to acquire the lock. No need to sleep if we're not running anymore
|
# to acquire the lock. No need to sleep if we're not running anymore
|
||||||
if not self.updates_thread_stopping:
|
if self.updates_thread_running:
|
||||||
sleep(0.1)
|
# Longer sleep if we're not expecting any update (only pings)
|
||||||
|
sleep(0.1 if self.on_update_handlers else 1)
|
||||||
|
|
Loading…
Reference in New Issue
Block a user