Telethon/telethon/extensions/tcp_client.py
Lonami Exo cc280a129d Remove TcpClient.write/read shared locks
Since they were shared between write and read, and now the read
is done constantly on a separate thread, the read lock would
cause the write method to be locked and not functional at all
2017-09-02 18:49:29 +02:00

143 lines
5.1 KiB
Python

# Python rough implementation of a C# TCP client
import socket
import time
from datetime import datetime, timedelta
from io import BytesIO, BufferedWriter
from threading import Event, Lock
import errno
from ..errors import ReadCancelledError
class TcpClient:
def __init__(self, proxy=None):
self._proxy = proxy
self._socket = None
# Support for multi-threading advantages and safety
self.cancelled = Event() # Has the read operation been cancelled?
self.delay = 0.1 # Read delay when there was no data available
self._lock = Lock()
def _recreate_socket(self, mode):
if self._proxy is None:
self._socket = socket.socket(mode, socket.SOCK_STREAM)
else:
import socks
self._socket = socks.socksocket(mode, socket.SOCK_STREAM)
if type(self._proxy) is dict:
self._socket.set_proxy(**self._proxy)
else: # tuple, list, etc.
self._socket.set_proxy(*self._proxy)
def connect(self, ip, port, timeout):
"""Connects to the specified IP and port number.
'timeout' must be given in seconds
"""
if not self.connected:
if ':' in ip: # IPv6
self._recreate_socket(socket.AF_INET6)
self._socket.settimeout(timeout)
self._socket.connect((ip, port, 0, 0))
else:
self._recreate_socket(socket.AF_INET)
self._socket.settimeout(timeout)
self._socket.connect((ip, port))
self._socket.setblocking(False)
def _get_connected(self):
return self._socket is not None
connected = property(fget=_get_connected)
def close(self):
"""Closes the connection"""
try:
if self.connected:
self._socket.shutdown(socket.SHUT_RDWR)
self._socket.close()
except OSError as e:
if e.errno != errno.ENOTCONN:
raise
finally:
self._socket = None
def write(self, data):
"""Writes (sends) the specified bytes to the connected peer"""
# TODO Check whether the code using this has multiple threads calling
# .write() on the very same socket. If so, have two locks, one for
# .write() and another for .read().
try:
view = memoryview(data)
total_sent, total = 0, len(data)
while total_sent < total:
try:
sent = self._socket.send(view[total_sent:])
if sent == 0:
self.close()
raise ConnectionResetError(
'The server has closed the connection.')
total_sent += sent
except BlockingIOError:
time.sleep(self.delay)
except BrokenPipeError:
self.close()
raise
def read(self, size, timeout=timedelta(seconds=5)):
"""Reads (receives) a whole block of 'size bytes
from the connected peer.
A timeout can be specified, which will cancel the operation if
no data has been read in the specified time. If data was read
and it's waiting for more, the timeout will NOT cancel the
operation. Set to None for no timeout
"""
# Ensure it is not cancelled at first, so we can enter the loop
self.cancelled.clear()
# Set the starting time so we can
# calculate whether the timeout should fire
start_time = datetime.now() if timeout is not None else None
with BufferedWriter(BytesIO(), buffer_size=size) as buffer:
bytes_left = size
while bytes_left != 0:
# Only do cancel if no data was read yet
# Otherwise, carry on reading and finish
if self.cancelled.is_set() and bytes_left == size:
raise ReadCancelledError()
try:
partial = self._socket.recv(bytes_left)
if len(partial) == 0:
self.close()
raise ConnectionResetError(
'The server has closed the connection.')
buffer.write(partial)
bytes_left -= len(partial)
except BlockingIOError as error:
# No data available yet, sleep a bit
time.sleep(self.delay)
# Check if the timeout finished
if timeout is not None:
time_passed = datetime.now() - start_time
if time_passed > timeout:
raise TimeoutError(
'The read operation exceeded the timeout.') from error
# If everything went fine, return the read bytes
buffer.flush()
return buffer.raw.getvalue()
def cancel_read(self):
"""Cancels the read operation IF it hasn't yet
started, raising a ReadCancelledError"""
self.cancelled.set()