Merge pull request #25 from strayge/master

Add periodic pings (fixes #24)
This commit is contained in:
Lonami 2017-02-19 15:34:23 +01:00 committed by GitHub
commit 0e60159460
2 changed files with 34 additions and 11 deletions

View File

@ -1,13 +1,14 @@
import gzip import gzip
from datetime import timedelta from datetime import timedelta
from threading import RLock, Thread from threading import RLock, Thread
from time import sleep from time import sleep, time
import telethon.helpers as utils import telethon.helpers as utils
from telethon.crypto import AES from telethon.crypto import AES
from telethon.errors import * from telethon.errors import *
from telethon.tl.all_tlobjects import tlobjects from telethon.tl.all_tlobjects import tlobjects
from telethon.tl.types import MsgsAck from telethon.tl.types import MsgsAck
from telethon.tl.functions import PingRequest
from telethon.utils import BinaryReader, BinaryWriter from telethon.utils import BinaryReader, BinaryWriter
@ -30,7 +31,13 @@ class MtProtoSender:
self.updates_thread = Thread( self.updates_thread = Thread(
target=self.updates_thread_method, name='Updates thread') target=self.updates_thread_method, name='Updates thread')
self.updates_thread_running = False
# Signal for correct stopping update thread
self.updates_thread_stopping = False
self.ping_interval = 60
self.ping_time_last = time()
self.updates_thread_receiving = False self.updates_thread_receiving = False
# Determine whether the received acknowledge request confirm # Determine whether the received acknowledge request confirm
@ -39,9 +46,15 @@ class MtProtoSender:
# TODO There might be a better way to handle msgs_ack requests # TODO There might be a better way to handle msgs_ack requests
self.ack_requests_confirm = False self.ack_requests_confirm = False
# Always running thread for periodical ping requests
# after all initialization is complete
self.updates_thread.start()
def disconnect(self): def disconnect(self):
"""Disconnects and **stops all the running threads** if any""" """Disconnects and **stops all the running threads** if any"""
self.set_listen_for_updates(enabled=False) self.set_listen_for_updates(enabled=False)
# Stop thread on next loop cycle
self.updates_thread_stopping = True
self.transport.close() self.transport.close()
def add_update_handler(self, handler): def add_update_handler(self, handler):
@ -75,6 +88,12 @@ class MtProtoSender:
# region Send and receive # region Send and receive
def send_ping(self):
"""Sends PingRequest"""
request = PingRequest(0)
self.send(request)
self.receive(request)
def send(self, request): def send(self, request):
"""Sends the specified MTProtoRequest, previously sending any message """Sends the specified MTProtoRequest, previously sending any message
which needed confirmation. This also pauses the updates thread""" which needed confirmation. This also pauses the updates thread"""
@ -340,13 +359,8 @@ class MtProtoSender:
def set_listen_for_updates(self, enabled): def set_listen_for_updates(self, enabled):
if enabled: if enabled:
if not self.updates_thread_running:
self.updates_thread_running = True
self.updates_thread_receiving = False self.updates_thread_receiving = False
self.updates_thread.start()
else: else:
self.updates_thread_running = False
if self.updates_thread_receiving: if self.updates_thread_receiving:
self.transport.cancel_receive() self.transport.cancel_receive()
@ -356,11 +370,20 @@ class MtProtoSender:
# Set a reasonable timeout when checking for updates # Set a reasonable timeout when checking for updates
timeout = timedelta(minutes=1) timeout = timedelta(minutes=1)
while self.updates_thread_running: while not self.updates_thread_stopping:
# Only try to receive updates if we're not waiting to receive a request # Only try to receive updates if we're not waiting to receive a request
if not self.waiting_receive: if not self.waiting_receive:
with self.lock: with self.lock:
try: try:
now = time()
# if ping_interval seconds passed since last ping
# (or startup) - send new one
if now >= self.ping_time_last + self.ping_interval:
self.ping_time_last = now
self.send_ping()
# if sending ping - we doesn't processing any other updates
continue
self.updates_thread_receiving = True self.updates_thread_receiving = True
seq, body = self.transport.receive(timeout) seq, body = self.transport.receive(timeout)
message, remote_msg_id, remote_sequence = self.decode_msg( message, remote_msg_id, remote_sequence = self.decode_msg(
@ -378,5 +401,5 @@ class MtProtoSender:
# If we are here, it is because the read was cancelled # If we are here, it is because the read was cancelled
# Sleep a bit just to give enough time for the other thread # Sleep a bit just to give enough time for the other thread
# to acquire the lock. No need to sleep if we're not running anymore # to acquire the lock. No need to sleep if we're not running anymore
if self.updates_thread_running: if not self.updates_thread_stopping:
sleep(0.1) sleep(0.1)

View File

@ -32,7 +32,7 @@ class NetworkTests(unittest.TestCase):
client.connect('localhost', port) client.connect('localhost', port)
client.write(msg) client.write(msg)
assert msg == client.read( assert msg == client.read(
16), 'Read message does not equal sent message' 15), 'Read message does not equal sent message'
client.close() client.close()
@staticmethod @staticmethod