From 36f51e1e3f0ae2d0cf51e6f1cc41c5d9cce1afbc Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Sat, 2 Sep 2017 19:14:11 +0200 Subject: [PATCH] Remove ability to TcpClient.cancel_read() This simplifies the process of sending and receiving data, and makes use of Python's socket.settimeout instead a hand-crafted version with a sort-of arbitrary self.delay = 0.1 (seconds), which should improve the speed of the method --- telethon/extensions/tcp_client.py | 80 +++++++------------------------ telethon/network/connection.py | 5 -- 2 files changed, 18 insertions(+), 67 deletions(-) diff --git a/telethon/extensions/tcp_client.py b/telethon/extensions/tcp_client.py index 795b714d..24916b6c 100644 --- a/telethon/extensions/tcp_client.py +++ b/telethon/extensions/tcp_client.py @@ -14,11 +14,6 @@ class TcpClient: self._proxy = proxy self._socket = None - # Support for multi-threading advantages and safety - self.cancelled = Event() # Has the read operation been cancelled? - self.delay = 0.1 # Read delay when there was no data available - self._lock = Lock() - def _recreate_socket(self, mode): if self._proxy is None: self._socket = socket.socket(mode, socket.SOCK_STREAM) @@ -36,14 +31,13 @@ class TcpClient: """ if not self.connected: if ':' in ip: # IPv6 - self._recreate_socket(socket.AF_INET6) - self._socket.settimeout(timeout) - self._socket.connect((ip, port, 0, 0)) + mode, address = socket.AF_INET6, (ip, port, 0, 0) else: - self._recreate_socket(socket.AF_INET) - self._socket.settimeout(timeout) - self._socket.connect((ip, port)) - self._socket.setblocking(False) + mode, address = socket.AF_INET, (ip, port) + + self._recreate_socket(mode) + self._socket.settimeout(timeout) + self._socket.connect(address) def _get_connected(self): return self._socket is not None @@ -68,20 +62,11 @@ class TcpClient: # TODO Check whether the code using this has multiple threads calling # .write() on the very same socket. If so, have two locks, one for # .write() and another for .read(). + # + # TODO Timeout may be an issue when sending the data, Changed in v3.5: + # The socket timeout is now the maximum total duration to send all data. try: - view = memoryview(data) - total_sent, total = 0, len(data) - while total_sent < total: - try: - sent = self._socket.send(view[total_sent:]) - if sent == 0: - self.close() - raise ConnectionResetError( - 'The server has closed the connection.') - total_sent += sent - - except BlockingIOError: - time.sleep(self.delay) + self._socket.sendall(data) except BrokenPipeError: self.close() raise @@ -95,48 +80,19 @@ class TcpClient: and it's waiting for more, the timeout will NOT cancel the operation. Set to None for no timeout """ - - # Ensure it is not cancelled at first, so we can enter the loop - self.cancelled.clear() - - # Set the starting time so we can - # calculate whether the timeout should fire - start_time = datetime.now() if timeout is not None else None - + # TODO Remove the timeout from this method, always use previous one with BufferedWriter(BytesIO(), buffer_size=size) as buffer: bytes_left = size while bytes_left != 0: - # Only do cancel if no data was read yet - # Otherwise, carry on reading and finish - if self.cancelled.is_set() and bytes_left == size: - raise ReadCancelledError() + partial = self._socket.recv(bytes_left) + if len(partial) == 0: + self.close() + raise ConnectionResetError( + 'The server has closed the connection.') - try: - partial = self._socket.recv(bytes_left) - if len(partial) == 0: - self.close() - raise ConnectionResetError( - 'The server has closed the connection.') - - buffer.write(partial) - bytes_left -= len(partial) - - except BlockingIOError as error: - # No data available yet, sleep a bit - time.sleep(self.delay) - - # Check if the timeout finished - if timeout is not None: - time_passed = datetime.now() - start_time - if time_passed > timeout: - raise TimeoutError( - 'The read operation exceeded the timeout.') from error + buffer.write(partial) + bytes_left -= len(partial) # If everything went fine, return the read bytes buffer.flush() return buffer.raw.getvalue() - - def cancel_read(self): - """Cancels the read operation IF it hasn't yet - started, raising a ReadCancelledError""" - self.cancelled.set() diff --git a/telethon/network/connection.py b/telethon/network/connection.py index af3ab817..19dad787 100644 --- a/telethon/network/connection.py +++ b/telethon/network/connection.py @@ -96,11 +96,6 @@ class Connection: def close(self): self.conn.close() - def cancel_receive(self): - """Cancels (stops) trying to receive from the - remote peer and raises a ReadCancelledError""" - self.conn.cancel_read() - def get_client_delay(self): """Gets the client read delay""" return self.conn.delay