From 7399bfacd17e2fd5010e16c92a469bfbec133f7f Mon Sep 17 00:00:00 2001 From: Lonami Date: Mon, 3 Oct 2016 09:53:41 +0200 Subject: [PATCH] Implemented receive timeout (#6) and fixed error string --- telethon/errors.py | 2 +- telethon/network/mtproto_sender.py | 17 ++++++++++++----- telethon/network/tcp_client.py | 18 ++++++++++++++++-- telethon/network/tcp_transport.py | 18 ++++++++++++------ telethon/telegram_client.py | 10 ++++++---- 5 files changed, 47 insertions(+), 18 deletions(-) diff --git a/telethon/errors.py b/telethon/errors.py index 0b51e0dd..803b4620 100644 --- a/telethon/errors.py +++ b/telethon/errors.py @@ -4,7 +4,7 @@ import re class ReadCancelledError(Exception): """Occurs when a read operation was cancelled""" def __init__(self): - super().__init__(self, 'You must run `python3 tl_generator.py` first. #ReadTheDocs!') + super().__init__(self, 'The read operation was cancelled.') class InvalidParameterError(Exception): diff --git a/telethon/network/mtproto_sender.py b/telethon/network/mtproto_sender.py index 91fdd9e1..309fe5bf 100755 --- a/telethon/network/mtproto_sender.py +++ b/telethon/network/mtproto_sender.py @@ -1,6 +1,7 @@ import gzip from telethon.errors import * from time import sleep +from datetime import timedelta from threading import Thread, RLock import telethon.helpers as utils @@ -104,14 +105,16 @@ class MtProtoSender: # And update the saved session self.session.save() - def receive(self, request): + def receive(self, request, timeout=timedelta(seconds=5)): """Receives the specified MTProtoRequest ("fills in it" - the received data). This also restores the updates thread""" + the received data). This also restores the updates thread. + An optional timeout can be specified to cancel the operation + if no data has been read after its time delta""" with self.lock: # Don't stop trying to receive until we get the request we wanted while not request.confirm_received: - seq, body = self.transport.receive() + seq, body = self.transport.receive(timeout) message, remote_msg_id, remote_sequence = self.decode_msg(body) with BinaryReader(message) as reader: @@ -326,19 +329,23 @@ class MtProtoSender: def updates_thread_method(self): """This method will run until specified and listen for incoming updates""" + + # Set a reasonable timeout when checking for updates + timeout = timedelta(minutes=1) + while self.updates_thread_running: # Only try to receive updates if we're not waiting to receive a request if not self.waiting_receive: with self.lock: try: self.updates_thread_receiving = True - seq, body = self.transport.receive() + seq, body = self.transport.receive(timeout) message, remote_msg_id, remote_sequence = self.decode_msg(body) with BinaryReader(message) as reader: self.process_msg(remote_msg_id, remote_sequence, reader) - except ReadCancelledError: + except (ReadCancelledError, TimeoutError): pass self.updates_thread_receiving = False diff --git a/telethon/network/tcp_client.py b/telethon/network/tcp_client.py index 2a7a0491..6343c840 100755 --- a/telethon/network/tcp_client.py +++ b/telethon/network/tcp_client.py @@ -1,6 +1,7 @@ # Python rough implementation of a C# TCP client import socket import time +from datetime import datetime, timedelta from threading import Lock from telethon.errors import ReadCancelledError @@ -37,8 +38,11 @@ class TcpClient: self.socket.setblocking(True) self.socket.sendall(data) - def read(self, buffer_size): - """Reads (receives) the specified bytes from the connected peer""" + def read(self, buffer_size, timeout=timedelta(seconds=5)): + """Reads (receives) the specified bytes from the connected peer. + A timeout can be specified, which will cancel the operation if no data + has been read in the specified time. If data was read and it's waiting + for more, the timeout will NOT cancel the operation. Set to None for no timeout""" # Ensure that only one thread can receive data at once with self.lock: @@ -48,6 +52,10 @@ class TcpClient: # Set non-blocking so it can be cancelled self.socket.setblocking(False) + # Set the starting time so we can calculate whether the timeout should fire + if timeout: + start_time = datetime.now() + with BinaryWriter() as writer: while writer.written_count < buffer_size: # Only do cancel if no data was read yet @@ -66,6 +74,12 @@ class TcpClient: # There was no data available for us to read. Sleep a bit time.sleep(self.delay) + # Check if the timeout finished + if timeout: + time_passed = datetime.now() - start_time + if time_passed > timeout: + raise TimeoutError('The read operation exceeded the timeout.') + # If everything went fine, return the read bytes return writer.get_bytes() diff --git a/telethon/network/tcp_transport.py b/telethon/network/tcp_transport.py index 894a08a6..4a49ffa9 100755 --- a/telethon/network/tcp_transport.py +++ b/telethon/network/tcp_transport.py @@ -1,4 +1,6 @@ from binascii import crc32 +from datetime import timedelta + from telethon.network import TcpClient from telethon.errors import * from telethon.utils import BinaryWriter @@ -29,19 +31,23 @@ class TcpTransport: self.tcp_client.write(writer.get_bytes()) self.send_counter += 1 - def receive(self): - """Receives a TCP message (tuple(sequence number, body)) from the connected peer""" + def receive(self, timeout=timedelta(seconds=5)): + """Receives a TCP message (tuple(sequence number, body)) from the connected peer. + There is a default timeout of 5 seconds before the operation is cancelled. + Timeout can be set to None for no timeout""" # First read everything we need - packet_length_bytes = self.tcp_client.read(4) + packet_length_bytes = self.tcp_client.read(4, timeout) packet_length = int.from_bytes(packet_length_bytes, byteorder='little') - seq_bytes = self.tcp_client.read(4) + seq_bytes = self.tcp_client.read(4, timeout) seq = int.from_bytes(seq_bytes, byteorder='little') - body = self.tcp_client.read(packet_length - 12) + body = self.tcp_client.read(packet_length - 12, timeout) - checksum = int.from_bytes(self.tcp_client.read(4), byteorder='little', signed=False) + checksum = int.from_bytes(self.tcp_client.read(4, timeout), + byteorder='little', + signed=False) # Then perform the checks rv = packet_length_bytes + seq_bytes + body diff --git a/telethon/telegram_client.py b/telethon/telegram_client.py index fbb5199f..fe21302c 100644 --- a/telethon/telegram_client.py +++ b/telethon/telegram_client.py @@ -1,5 +1,5 @@ import platform -from datetime import datetime +from datetime import datetime, timedelta from hashlib import md5 from os import path, listdir from mimetypes import guess_extension, guess_type @@ -126,13 +126,15 @@ class TelegramClient: # region Telegram requests functions - def invoke(self, request): - """Invokes a MTProtoRequest (sends and receives it) and returns its result""" + def invoke(self, request, timeout=timedelta(seconds=5)): + """Invokes a MTProtoRequest (sends and receives it) and returns its result. + An optional timeout can be given to cancel the operation after the time delta. + Timeout can be set to None for no timeout""" if not issubclass(type(request), MTProtoRequest): raise ValueError('You can only invoke MtProtoRequests') self.sender.send(request) - self.sender.receive(request) + self.sender.receive(request, timeout) return request.result