diff --git a/telethon/network/mtproto_sender.py b/telethon/network/mtproto_sender.py index 9bc1fb1a..eddaa8fe 100755 --- a/telethon/network/mtproto_sender.py +++ b/telethon/network/mtproto_sender.py @@ -1,13 +1,14 @@ import gzip from datetime import timedelta from threading import RLock, Thread -from time import sleep +from time import sleep, time import telethon.helpers as utils from telethon.crypto import AES from telethon.errors import * from telethon.tl.all_tlobjects import tlobjects from telethon.tl.types import MsgsAck +from telethon.tl.functions import PingRequest from telethon.utils import BinaryReader, BinaryWriter @@ -30,7 +31,13 @@ class MtProtoSender: self.updates_thread = Thread( target=self.updates_thread_method, name='Updates thread') - self.updates_thread_running = False + + # Signal for correct stopping update thread + self.updates_thread_stopping = False + + self.ping_interval = 60 + self.ping_time_last = time() + self.updates_thread_receiving = False # Determine whether the received acknowledge request confirm @@ -39,9 +46,15 @@ class MtProtoSender: # TODO There might be a better way to handle msgs_ack requests self.ack_requests_confirm = False + # Always running thread for periodical ping requests + # after all initialization is complete + self.updates_thread.start() + def disconnect(self): """Disconnects and **stops all the running threads** if any""" self.set_listen_for_updates(enabled=False) + # Stop thread on next loop cycle + self.updates_thread_stopping = True self.transport.close() def add_update_handler(self, handler): @@ -75,6 +88,12 @@ class MtProtoSender: # region Send and receive + def send_ping(self): + """Sends PingRequest""" + request = PingRequest(0) + self.send(request) + self.receive(request) + def send(self, request): """Sends the specified MTProtoRequest, previously sending any message which needed confirmation. This also pauses the updates thread""" @@ -340,13 +359,8 @@ class MtProtoSender: def set_listen_for_updates(self, enabled): if enabled: - if not self.updates_thread_running: - self.updates_thread_running = True - self.updates_thread_receiving = False - - self.updates_thread.start() + self.updates_thread_receiving = False else: - self.updates_thread_running = False if self.updates_thread_receiving: self.transport.cancel_receive() @@ -356,11 +370,20 @@ class MtProtoSender: # Set a reasonable timeout when checking for updates timeout = timedelta(minutes=1) - while self.updates_thread_running: + while not self.updates_thread_stopping: # Only try to receive updates if we're not waiting to receive a request if not self.waiting_receive: with self.lock: try: + now = time() + # if ping_interval seconds passed since last ping + # (or startup) - send new one + if now >= self.ping_time_last + self.ping_interval: + self.ping_time_last = now + self.send_ping() + # if sending ping - we doesn't processing any other updates + continue + self.updates_thread_receiving = True seq, body = self.transport.receive(timeout) message, remote_msg_id, remote_sequence = self.decode_msg( @@ -378,5 +401,5 @@ class MtProtoSender: # If we are here, it is because the read was cancelled # Sleep a bit just to give enough time for the other thread # to acquire the lock. No need to sleep if we're not running anymore - if self.updates_thread_running: + if not self.updates_thread_stopping: sleep(0.1)