From c6645a555d405aa18987da4a35324cc102e1412c Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Fri, 9 Jun 2017 11:42:39 +0200 Subject: [PATCH] Prefer BufferedWriter over BinaryWriter for socket operations --- telethon/network/tcp_client.py | 41 ++++++++++++++++++--------------- telethon/utils/binary_writer.py | 5 ++-- 2 files changed, 25 insertions(+), 21 deletions(-) diff --git a/telethon/network/tcp_client.py b/telethon/network/tcp_client.py index e25c7ae2..a20ac948 100644 --- a/telethon/network/tcp_client.py +++ b/telethon/network/tcp_client.py @@ -2,10 +2,10 @@ import socket import time from datetime import datetime, timedelta +from io import BytesIO, BufferedWriter from threading import Event, Lock from ..errors import ReadCancelledError -from ..utils import BinaryWriter class TcpClient: @@ -58,40 +58,44 @@ class TcpClient: 'The server has closed the connection.') total_sent += sent - def read(self, buffer_size, timeout=timedelta(seconds=5)): - """Reads (receives) the specified bytes from the connected peer. - A timeout can be specified, which will cancel the operation if no data - has been read in the specified time. If data was read and it's waiting - for more, the timeout will NOT cancel the operation. Set to None for no timeout""" + def read(self, size, timeout=timedelta(seconds=5)): + """Reads (receives) a whole block of 'size bytes + from the connected peer. + + A timeout can be specified, which will cancel the operation if + no data has been read in the specified time. If data was read + and it's waiting for more, the timeout will NOT cancel the + operation. Set to None for no timeout + """ # Ensure that only one thread can receive data at once with self._lock: # Ensure it is not cancelled at first, so we can enter the loop self.cancelled.clear() - # Set the starting time so we can calculate whether the timeout should fire - if timeout: - start_time = datetime.now() + # Set the starting time so we can + # calculate whether the timeout should fire + start_time = datetime.now() if timeout else None - with BinaryWriter() as buffer: - while buffer.written_count < buffer_size: + with BufferedWriter(BytesIO(), buffer_size=size) as buffer: + bytes_left = size + while bytes_left != 0: # Only do cancel if no data was read yet # Otherwise, carry on reading and finish - if self.cancelled.is_set() and buffer.written_count == 0: + if self.cancelled.is_set() and bytes_left == size: raise ReadCancelledError() try: - # When receiving from the socket, we may not receive all the data at once - # This is why we need to keep checking to make sure that we receive it all - left_count = buffer_size - buffer.written_count - partial = self._socket.recv(left_count) + partial = self._socket.recv(bytes_left) if len(partial) == 0: raise ConnectionResetError( 'The server has closed the connection (recv() returned 0 bytes).') + buffer.write(partial) + bytes_left -= len(partial) except BlockingIOError as error: - # There was no data available for us to read. Sleep a bit + # No data available yet, sleep a bit time.sleep(self.delay) # Check if the timeout finished @@ -102,7 +106,8 @@ class TcpClient: 'The read operation exceeded the timeout.') from error # If everything went fine, return the read bytes - return buffer.get_bytes() + buffer.flush() + return buffer.raw.getvalue() def cancel_read(self): """Cancels the read operation IF it hasn't yet diff --git a/telethon/utils/binary_writer.py b/telethon/utils/binary_writer.py index 00f37e42..5b363297 100644 --- a/telethon/utils/binary_writer.py +++ b/telethon/utils/binary_writer.py @@ -12,8 +12,7 @@ class BinaryWriter: if not stream: stream = BytesIO() - self.stream = stream - self.writer = BufferedWriter(self.stream) + self.writer = BufferedWriter(stream) self.written_count = 0 # region Writing @@ -126,7 +125,7 @@ class BinaryWriter: """Get the current bytes array content from the buffer, optionally flushing first""" if flush: self.writer.flush() - return self.stream.getvalue() + return self.writer.raw.getvalue() def get_written_bytes_count(self): """Gets the count of bytes written in the buffer.