Telethon/telethon/extensions/threaded_tcp_client.py

95 lines
3.4 KiB
Python
Raw Normal View History

import socket
import time
from datetime import datetime, timedelta
from io import BytesIO, BufferedWriter
from threading import Event, Lock, Thread, Condition
from ..errors import ReadCancelledError
class ThreadedTcpClient:
"""The main difference with the TcpClient class is that this one
will spawn a secondary thread that will be constantly reading
from the network and putting everything on another buffer.
"""
def __init__(self, proxy=None):
self.connected = False
self._proxy = proxy
self._recreate_socket()
# Support for multi-threading advantages and safety
self.cancelled = Event() # Has the read operation been cancelled?
self.delay = 0.1 # Read delay when there was no data available
self._lock = Lock()
self._buffer = []
self._read_thread = Thread(target=self._reading_thread, daemon=True)
self._cv = Condition() # Condition Variable
def _recreate_socket(self):
if self._proxy is None:
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
else:
import socks
self._socket = socks.socksocket(socket.AF_INET, socket.SOCK_STREAM)
if type(self._proxy) is dict:
self._socket.set_proxy(**self._proxy)
else: # tuple, list, etc.
self._socket.set_proxy(*self._proxy)
def connect(self, ip, port, timeout):
"""Connects to the specified IP and port number.
'timeout' must be given in seconds
"""
if not self.connected:
self._socket.settimeout(timeout)
self._socket.connect((ip, port))
self._socket.setblocking(False)
self.connected = True
def close(self):
"""Closes the connection"""
if self.connected:
self._socket.shutdown(socket.SHUT_RDWR)
self._socket.close()
self.connected = False
self._recreate_socket()
def write(self, data):
"""Writes (sends) the specified bytes to the connected peer"""
self._socket.sendall(data)
def read(self, size, timeout=timedelta(seconds=5)):
"""Reads (receives) a whole block of 'size bytes
from the connected peer.
A timeout can be specified, which will cancel the operation if
no data has been read in the specified time. If data was read
and it's waiting for more, the timeout will NOT cancel the
operation. Set to None for no timeout
"""
with self._cv:
print('wait for...')
self._cv.wait_for(lambda: len(self._buffer) >= size, timeout=timeout.seconds)
print('got', size)
result, self._buffer = self._buffer[:size], self._buffer[size:]
return result
def _reading_thread(self):
while True:
partial = self._socket.recv(4096)
if len(partial) == 0:
self.connected = False
raise ConnectionResetError(
'The server has closed the connection.')
with self._cv:
print('extended', len(partial))
self._buffer.extend(partial)
self._cv.notify()
def cancel_read(self):
"""Cancels the read operation IF it hasn't yet
started, raising a ReadCancelledError"""
self.cancelled.set()