Remove unused TcpClien.read(timeout=...) param, change other accessors

This commit is contained in:
Lonami Exo 2017-09-02 19:33:42 +02:00
parent 36f51e1e3f
commit b908296efa
5 changed files with 25 additions and 61 deletions

View File

@ -10,9 +10,15 @@ from ..errors import ReadCancelledError
class TcpClient: class TcpClient:
def __init__(self, proxy=None): def __init__(self, proxy=None, timeout=timedelta(seconds=5)):
self._proxy = proxy self._proxy = proxy
self._socket = None self._socket = None
if isinstance(timeout, timedelta):
self._timeout = timeout.seconds
elif isinstance(timeout, int) or isinstance(timeout, float):
self._timeout = float(timeout)
else:
raise ValueError('Invalid timeout type', type(timeout))
def _recreate_socket(self, mode): def _recreate_socket(self, mode):
if self._proxy is None: if self._proxy is None:
@ -25,7 +31,7 @@ class TcpClient:
else: # tuple, list, etc. else: # tuple, list, etc.
self._socket.set_proxy(*self._proxy) self._socket.set_proxy(*self._proxy)
def connect(self, ip, port, timeout): def connect(self, ip, port):
"""Connects to the specified IP and port number. """Connects to the specified IP and port number.
'timeout' must be given in seconds 'timeout' must be given in seconds
""" """
@ -36,7 +42,7 @@ class TcpClient:
mode, address = socket.AF_INET, (ip, port) mode, address = socket.AF_INET, (ip, port)
self._recreate_socket(mode) self._recreate_socket(mode)
self._socket.settimeout(timeout) self._socket.settimeout(self._timeout)
self._socket.connect(address) self._socket.connect(address)
def _get_connected(self): def _get_connected(self):
@ -59,10 +65,6 @@ class TcpClient:
def write(self, data): def write(self, data):
"""Writes (sends) the specified bytes to the connected peer""" """Writes (sends) the specified bytes to the connected peer"""
# TODO Check whether the code using this has multiple threads calling
# .write() on the very same socket. If so, have two locks, one for
# .write() and another for .read().
#
# TODO Timeout may be an issue when sending the data, Changed in v3.5: # TODO Timeout may be an issue when sending the data, Changed in v3.5:
# The socket timeout is now the maximum total duration to send all data. # The socket timeout is now the maximum total duration to send all data.
try: try:
@ -71,7 +73,7 @@ class TcpClient:
self.close() self.close()
raise raise
def read(self, size, timeout=timedelta(seconds=5)): def read(self, size):
"""Reads (receives) a whole block of 'size bytes """Reads (receives) a whole block of 'size bytes
from the connected peer. from the connected peer.

View File

@ -22,13 +22,12 @@ class Connection:
self.ip = ip self.ip = ip
self.port = port self.port = port
self._mode = mode self._mode = mode
self.timeout = timeout
self._send_counter = 0 self._send_counter = 0
self._aes_encrypt, self._aes_decrypt = None, None self._aes_encrypt, self._aes_decrypt = None, None
# TODO Rename "TcpClient" as some sort of generic socket? # TODO Rename "TcpClient" as some sort of generic socket?
self.conn = TcpClient(proxy=proxy) self.conn = TcpClient(proxy=proxy, timeout=timeout)
# Sending messages # Sending messages
if mode == 'tcp_full': if mode == 'tcp_full':
@ -53,8 +52,7 @@ class Connection:
def connect(self): def connect(self):
self._send_counter = 0 self._send_counter = 0
self.conn.connect(self.ip, self.port, self.conn.connect(self.ip, self.port)
timeout=round(self.timeout.seconds))
if self._mode == 'tcp_abridged': if self._mode == 'tcp_abridged':
self.conn.write(b'\xef') self.conn.write(b'\xef')
@ -102,13 +100,12 @@ class Connection:
# region Receive message implementations # region Receive message implementations
def recv(self, **kwargs): def recv(self):
"""Receives and unpacks a message""" """Receives and unpacks a message"""
# TODO Don't ignore kwargs['timeout']?
# Default implementation is just an error # Default implementation is just an error
raise ValueError('Invalid connection mode specified: ' + self._mode) raise ValueError('Invalid connection mode specified: ' + self._mode)
def _recv_tcp_full(self, **kwargs): def _recv_tcp_full(self):
packet_length_bytes = self.read(4) packet_length_bytes = self.read(4)
packet_length = int.from_bytes(packet_length_bytes, 'little') packet_length = int.from_bytes(packet_length_bytes, 'little')
@ -124,10 +121,10 @@ class Connection:
return body return body
def _recv_intermediate(self, **kwargs): def _recv_intermediate(self):
return self.read(int.from_bytes(self.read(4), 'little')) return self.read(int.from_bytes(self.read(4), 'little'))
def _recv_abridged(self, **kwargs): def _recv_abridged(self):
length = int.from_bytes(self.read(1), 'little') length = int.from_bytes(self.read(1), 'little')
if length >= 127: if length >= 127:
length = int.from_bytes(self.read(3) + b'\0', 'little') length = int.from_bytes(self.read(3) + b'\0', 'little')
@ -180,11 +177,11 @@ class Connection:
raise ValueError('Invalid connection mode specified: ' + self._mode) raise ValueError('Invalid connection mode specified: ' + self._mode)
def _read_plain(self, length): def _read_plain(self, length):
return self.conn.read(length, timeout=self.timeout) return self.conn.read(length)
def _read_obfuscated(self, length): def _read_obfuscated(self, length):
return self._aes_decrypt.encrypt( return self._aes_decrypt.encrypt(
self.conn.read(length, timeout=self.timeout) self.conn.read(length)
) )
# endregion # endregion

View File

@ -1,5 +1,4 @@
import gzip import gzip
from datetime import timedelta
from threading import RLock, Thread from threading import RLock, Thread
from .. import helpers as utils from .. import helpers as utils
@ -91,15 +90,11 @@ class MtProtoSender:
# No problem. # No problem.
pass pass
def _receive_message(self, **kwargs): def _receive_message(self):
"""Receives a single message from the connected endpoint. """Receives a single message from the connected endpoint."""
An optional named parameter 'timeout' can be specified if
one desires to override 'self.connection.timeout'.
"""
# TODO Don't ignore updates # TODO Don't ignore updates
self._logger.debug('Receiving a message...') self._logger.debug('Receiving a message...')
body = self.connection.recv(**kwargs) body = self.connection.recv()
message, remote_msg_id, remote_seq = self._decode_msg(body) message, remote_msg_id, remote_seq = self._decode_msg(body)
with BinaryReader(message) as reader: with BinaryReader(message) as reader:

View File

@ -204,30 +204,6 @@ class TelegramBareClient:
# endregion # endregion
# region Properties
def set_timeout(self, timeout):
if timeout is None:
self._timeout = None
elif isinstance(timeout, int) or isinstance(timeout, float):
self._timeout = timedelta(seconds=timeout)
elif isinstance(timeout, timedelta):
self._timeout = timeout
else:
raise ValueError(
'{} is not a valid type for a timeout'.format(type(timeout))
)
if self._sender:
self._sender.transport.timeout = self._timeout
def get_timeout(self):
return self._timeout
timeout = property(get_timeout, set_timeout)
# endregion
# region Working with different Data Centers # region Working with different Data Centers
def _get_dc(self, dc_id, ipv6=False, cdn=False): def _get_dc(self, dc_id, ipv6=False, cdn=False):

View File

@ -129,8 +129,6 @@ class TelegramClient(TelegramBareClient):
not the same as authenticating the desired user itself, which not the same as authenticating the desired user itself, which
may require a call (or several) to 'sign_in' for the first time. may require a call (or several) to 'sign_in' for the first time.
The specified timeout will be used on internal .invoke()'s.
*args will be ignored. *args will be ignored.
""" """
return super().connect() return super().connect()
@ -151,7 +149,7 @@ class TelegramClient(TelegramBareClient):
# region Working with different connections # region Working with different connections
def create_new_connection(self, on_dc=None): def create_new_connection(self, on_dc=None, timeout=timedelta(seconds=5)):
"""Creates a new connection which can be used in parallel """Creates a new connection which can be used in parallel
with the original TelegramClient. A TelegramBareClient with the original TelegramClient. A TelegramBareClient
will be returned already connected, and the caller is will be returned already connected, and the caller is
@ -165,7 +163,9 @@ class TelegramClient(TelegramBareClient):
""" """
if on_dc is None: if on_dc is None:
client = TelegramBareClient( client = TelegramBareClient(
self.session, self.api_id, self.api_hash, proxy=self.proxy) self.session, self.api_id, self.api_hash,
proxy=self.proxy, timeout=timeout
)
client.connect() client.connect()
else: else:
client = self._get_exported_client(on_dc, bypass_cache=True) client = self._get_exported_client(on_dc, bypass_cache=True)
@ -179,9 +179,6 @@ class TelegramClient(TelegramBareClient):
def invoke(self, request, *args): def invoke(self, request, *args):
"""Invokes (sends) a MTProtoRequest and returns (receives) its result. """Invokes (sends) a MTProtoRequest and returns (receives) its result.
An optional timeout can be specified to cancel the operation if no
result is received within such time, or None to disable any timeout.
*args will be ignored. *args will be ignored.
""" """
try: try:
@ -921,9 +918,6 @@ class TelegramClient(TelegramBareClient):
def _updates_thread_method(self): def _updates_thread_method(self):
"""This method will run until specified and listen for incoming updates""" """This method will run until specified and listen for incoming updates"""
# Set a reasonable timeout when checking for updates
timeout = timedelta(minutes=1)
while self._updates_thread_running.is_set(): while self._updates_thread_running.is_set():
# Always sleep a bit before each iteration to relax the CPU, # Always sleep a bit before each iteration to relax the CPU,
# since it's possible to early 'continue' the loop to reach # since it's possible to early 'continue' the loop to reach