diff --git a/telethon/extensions/threaded_tcp_client.py b/telethon/extensions/threaded_tcp_client.py deleted file mode 100644 index d97ce6ee..00000000 --- a/telethon/extensions/threaded_tcp_client.py +++ /dev/null @@ -1,94 +0,0 @@ -import socket -import time -from datetime import datetime, timedelta -from io import BytesIO, BufferedWriter -from threading import Event, Lock, Thread, Condition - -from ..errors import ReadCancelledError - - -class ThreadedTcpClient: - """The main difference with the TcpClient class is that this one - will spawn a secondary thread that will be constantly reading - from the network and putting everything on another buffer. - """ - def __init__(self, proxy=None): - self.connected = False - self._proxy = proxy - self._recreate_socket() - - # Support for multi-threading advantages and safety - self.cancelled = Event() # Has the read operation been cancelled? - self.delay = 0.1 # Read delay when there was no data available - self._lock = Lock() - - self._buffer = [] - self._read_thread = Thread(target=self._reading_thread, daemon=True) - self._cv = Condition() # Condition Variable - - def _recreate_socket(self): - if self._proxy is None: - self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - else: - import socks - self._socket = socks.socksocket(socket.AF_INET, socket.SOCK_STREAM) - if type(self._proxy) is dict: - self._socket.set_proxy(**self._proxy) - else: # tuple, list, etc. - self._socket.set_proxy(*self._proxy) - - def connect(self, ip, port, timeout): - """Connects to the specified IP and port number. - 'timeout' must be given in seconds - """ - if not self.connected: - self._socket.settimeout(timeout) - self._socket.connect((ip, port)) - self._socket.setblocking(False) - self.connected = True - - def close(self): - """Closes the connection""" - if self.connected: - self._socket.shutdown(socket.SHUT_RDWR) - self._socket.close() - self.connected = False - self._recreate_socket() - - def write(self, data): - """Writes (sends) the specified bytes to the connected peer""" - self._socket.sendall(data) - - def read(self, size, timeout=timedelta(seconds=5)): - """Reads (receives) a whole block of 'size bytes - from the connected peer. - - A timeout can be specified, which will cancel the operation if - no data has been read in the specified time. If data was read - and it's waiting for more, the timeout will NOT cancel the - operation. Set to None for no timeout - """ - with self._cv: - print('wait for...') - self._cv.wait_for(lambda: len(self._buffer) >= size, timeout=timeout.seconds) - print('got', size) - result, self._buffer = self._buffer[:size], self._buffer[size:] - return result - - def _reading_thread(self): - while True: - partial = self._socket.recv(4096) - if len(partial) == 0: - self.connected = False - raise ConnectionResetError( - 'The server has closed the connection.') - - with self._cv: - print('extended', len(partial)) - self._buffer.extend(partial) - self._cv.notify() - - def cancel_read(self): - """Cancels the read operation IF it hasn't yet - started, raising a ReadCancelledError""" - self.cancelled.set()