From f5f0c8455357a833db73fd7746cf122f3c1eb6dc Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Sat, 15 Jan 2022 13:33:50 +0100 Subject: [PATCH] Completely overhaul connections and transports Reduce abstraction leaks. Now the transport can hold any state, rather than just the tag. It's also responsible to initialize on the first connection, and they can be cleanly reset. asyncio connections are no longer used, in favour of raw sockets, which should avoid some annoyances. For the time being, more obscure transport modes have been removed, as well as proxy support, until further cleaning is done. --- telethon/_client/telegrambaseclient.py | 54 +-- telethon/_client/telegramclient.py | 1 - telethon/_misc/enums.py | 2 - telethon/_network/__init__.py | 9 +- telethon/_network/connection.py | 61 +++ telethon/_network/connection/__init__.py | 12 - telethon/_network/connection/connection.py | 426 ------------------ telethon/_network/connection/http.py | 39 -- telethon/_network/connection/tcpabridged.py | 33 -- telethon/_network/connection/tcpfull.py | 43 -- .../_network/connection/tcpintermediate.py | 46 -- telethon/_network/connection/tcpmtproxy.py | 152 ------- telethon/_network/connection/tcpobfuscated.py | 62 --- telethon/_network/transports/__init__.py | 4 + telethon/_network/transports/abridged.py | 43 ++ telethon/_network/transports/full.py | 41 ++ telethon/_network/transports/intermediate.py | 29 ++ telethon/_network/transports/transport.py | 17 + 18 files changed, 221 insertions(+), 853 deletions(-) create mode 100644 telethon/_network/connection.py delete mode 100644 telethon/_network/connection/__init__.py delete mode 100644 telethon/_network/connection/connection.py delete mode 100644 telethon/_network/connection/http.py delete mode 100644 telethon/_network/connection/tcpabridged.py delete mode 100644 telethon/_network/connection/tcpfull.py delete mode 100644 telethon/_network/connection/tcpintermediate.py delete mode 100644 telethon/_network/connection/tcpmtproxy.py delete mode 100644 telethon/_network/connection/tcpobfuscated.py create mode 100644 telethon/_network/transports/__init__.py create mode 100644 telethon/_network/transports/abridged.py create mode 100644 telethon/_network/transports/full.py create mode 100644 telethon/_network/transports/intermediate.py create mode 100644 telethon/_network/transports/transport.py diff --git a/telethon/_client/telegrambaseclient.py b/telethon/_client/telegrambaseclient.py index 85512cfb..e4906fbf 100644 --- a/telethon/_client/telegrambaseclient.py +++ b/telethon/_client/telegrambaseclient.py @@ -11,7 +11,7 @@ import ipaddress from .. import version, __name__ as __base_name__, _tl from .._crypto import rsa from .._misc import markdown, entitycache, statecache, enums, helpers -from .._network import MTProtoSender, Connection, ConnectionTcpFull, connection as conns +from .._network import MTProtoSender, Connection, transports from .._sessions import Session, SQLiteSession, MemorySession from .._sessions.types import DataCenter, SessionState @@ -70,7 +70,7 @@ def init( api_id: int, api_hash: str, *, - connection: 'typing.Type[Connection]' = ConnectionTcpFull, + connection: 'typing.Type[Connection]' = (), use_ipv6: bool = False, proxy: typing.Union[tuple, dict] = None, local_addr: typing.Union[str, tuple] = None, @@ -194,15 +194,12 @@ def init( # For now the current default remains TCP Full; may change to be "smart" if proxies are specified connection = enums.ConnectionMode.FULL - self._connection = { - enums.ConnectionMode.FULL: conns.ConnectionTcpFull, - enums.ConnectionMode.INTERMEDIATE: conns.ConnectionTcpIntermediate, - enums.ConnectionMode.ABRIDGED: conns.ConnectionTcpAbridged, - enums.ConnectionMode.OBFUSCATED: conns.ConnectionTcpObfuscated, - enums.ConnectionMode.HTTP: conns.ConnectionHttp, + self._transport = { + enums.ConnectionMode.FULL: transports.Full(), + enums.ConnectionMode.INTERMEDIATE: transports.Intermediate(), + enums.ConnectionMode.ABRIDGED: transports.Abridged(), }[enums.parse_conn_mode(connection)] - init_proxy = None if not issubclass(self._connection, conns.TcpMTProxy) else \ - _tl.InputClientProxy(*self._connection.address_info(proxy)) + init_proxy = None # Used on connection. Capture the variables in a lambda since # exporting clients need to create this InvokeWithLayer. @@ -334,13 +331,12 @@ async def connect(self: 'TelegramClient') -> None: # Use known key, if any self._sender.auth_key.key = dc.auth - if not await self._sender.connect(self._connection( - str(ipaddress.ip_address((self._use_ipv6 and dc.ipv6) or dc.ipv4)), - dc.port, - dc.id, + if not await self._sender.connect(Connection( + ip=str(ipaddress.ip_address((self._use_ipv6 and dc.ipv6) or dc.ipv4)), + port=dc.port, + transport=self._transport.recreate_fresh(), loggers=self._log, - proxy=self._proxy, - local_addr=self._local_addr + local_addr=self._local_addr, )): # We don't want to init or modify anything if we were already connected return @@ -396,8 +392,7 @@ async def disconnect(self: 'TelegramClient'): return await _disconnect_coro(self) def set_proxy(self: 'TelegramClient', proxy: typing.Union[tuple, dict]): - init_proxy = None if not issubclass(self._connection, conns.TcpMTProxy) else \ - _tl.InputClientProxy(*self._connection.address_info(proxy)) + init_proxy = None self._init_request.proxy = init_proxy self._proxy = proxy @@ -481,13 +476,12 @@ async def _create_exported_sender(self: 'TelegramClient', dc_id): # If one were to do that, Telegram would reset the connection # with no further clues. sender = MTProtoSender(loggers=self._log) - await sender.connect(self._connection( - str(ipaddress.ip_address((self._use_ipv6 and dc.ipv6) or dc.ipv4)), - dc.port, - dc.id, + await self._sender.connect(Connection( + ip=str(ipaddress.ip_address((self._use_ipv6 and dc.ipv6) or dc.ipv4)), + port=dc.port, + transport=self._transport.recreate_fresh(), loggers=self._log, - proxy=self._proxy, - local_addr=self._local_addr + local_addr=self._local_addr, )) self._log[__name__].info('Exporting auth for new borrowed sender in %s', dc) auth = await self(_tl.fn.auth.ExportAuthorization(dc_id)) @@ -516,13 +510,13 @@ async def _borrow_exported_sender(self: 'TelegramClient', dc_id): elif state.need_connect(): dc = self._all_dcs[dc_id] - await sender.connect(self._connection( - str(ipaddress.ip_address((self._use_ipv6 and dc.ipv6) or dc.ipv4)), - dc.port, - dc.id, + + await self._sender.connect(Connection( + ip=str(ipaddress.ip_address((self._use_ipv6 and dc.ipv6) or dc.ipv4)), + port=dc.port, + transport=self._transport.recreate_fresh(), loggers=self._log, - proxy=self._proxy, - local_addr=self._local_addr + local_addr=self._local_addr, )) state.add_borrow() diff --git a/telethon/_client/telegramclient.py b/telethon/_client/telegramclient.py index 9755d437..bb21a416 100644 --- a/telethon/_client/telegramclient.py +++ b/telethon/_client/telegramclient.py @@ -10,7 +10,6 @@ from . import ( ) from .. import version, _tl from ..types import _custom -from .._network import ConnectionTcpFull from .._events.common import EventBuilder, EventCommon from .._misc import enums diff --git a/telethon/_misc/enums.py b/telethon/_misc/enums.py index 2d5742aa..283030f3 100644 --- a/telethon/_misc/enums.py +++ b/telethon/_misc/enums.py @@ -15,8 +15,6 @@ class ConnectionMode(Enum): FULL = 'full' INTERMEDIATE = 'intermediate' ABRIDGED = 'abridged' - OBFUSCATED = 'obfuscated' - HTTP = 'http' class Participant(Enum): diff --git a/telethon/_network/__init__.py b/telethon/_network/__init__.py index 0b985d58..164acc4e 100644 --- a/telethon/_network/__init__.py +++ b/telethon/_network/__init__.py @@ -5,10 +5,5 @@ with Telegram's servers and the protocol used (TCP full, abridged, etc.). from .mtprotoplainsender import MTProtoPlainSender from .authenticator import do_authentication from .mtprotosender import MTProtoSender -from .connection import ( - Connection, - ConnectionTcpFull, ConnectionTcpIntermediate, ConnectionTcpAbridged, - ConnectionTcpObfuscated, ConnectionTcpMTProxyAbridged, - ConnectionTcpMTProxyIntermediate, - ConnectionTcpMTProxyRandomizedIntermediate, ConnectionHttp, TcpMTProxy -) +from .connection import Connection +from . import transports diff --git a/telethon/_network/connection.py b/telethon/_network/connection.py new file mode 100644 index 00000000..26674aa2 --- /dev/null +++ b/telethon/_network/connection.py @@ -0,0 +1,61 @@ +import asyncio +import socket + +from .transports.transport import Transport + + +CHUNK_SIZE = 32 * 1024 + + +# TODO ideally the mtproto impl would also be sans-io, but that's less pressing +class Connection: + def __init__(self, ip, port, *, transport: Transport, loggers, local_addr=None): + self._ip = ip + self._port = port + self._log = loggers[__name__] + self._local_addr = local_addr + + self._sock = None + self._in_buffer = bytearray() + self._transport = transport + + async def connect(self, timeout=None, ssl=None): + """ + Establishes a connection with the server. + """ + loop = asyncio.get_event_loop() + + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setblocking(False) + if self._local_addr: + sock.bind(self._local_addr) + + await asyncio.wait_for(loop.sock_connect(sock, (self._ip, self._port)), timeout) + self._sock = sock + + async def disconnect(self): + self._sock.close() + self._sock = None + + async def send(self, data): + if not self._sock: + raise ConnectionError('not connected') + + loop = asyncio.get_event_loop() + await loop.sock_sendall(self._sock, self._transport.pack(data)) + + async def recv(self): + if not self._sock: + raise ConnectionError('not connected') + + loop = asyncio.get_event_loop() + while True: + try: + length, body = self._transport.unpack(self._in_buffer) + del self._in_buffer[:length] + return body + except EOFError: + self._in_buffer += await loop.sock_recv(self._sock, CHUNK_SIZE) + + def __str__(self): + return f'{self._ip}:{self._port}/{self._transport.__class__.__name__}' diff --git a/telethon/_network/connection/__init__.py b/telethon/_network/connection/__init__.py deleted file mode 100644 index 88771866..00000000 --- a/telethon/_network/connection/__init__.py +++ /dev/null @@ -1,12 +0,0 @@ -from .connection import Connection -from .tcpfull import ConnectionTcpFull -from .tcpintermediate import ConnectionTcpIntermediate -from .tcpabridged import ConnectionTcpAbridged -from .tcpobfuscated import ConnectionTcpObfuscated -from .tcpmtproxy import ( - TcpMTProxy, - ConnectionTcpMTProxyAbridged, - ConnectionTcpMTProxyIntermediate, - ConnectionTcpMTProxyRandomizedIntermediate -) -from .http import ConnectionHttp diff --git a/telethon/_network/connection/connection.py b/telethon/_network/connection/connection.py deleted file mode 100644 index abf398ee..00000000 --- a/telethon/_network/connection/connection.py +++ /dev/null @@ -1,426 +0,0 @@ -import abc -import asyncio -import socket -import sys - -try: - import ssl as ssl_mod -except ImportError: - ssl_mod = None - -try: - import python_socks -except ImportError: - python_socks = None - -from ...errors._custom import InvalidChecksumError -from ..._misc import helpers - - -class Connection(abc.ABC): - """ - The `Connection` class is a wrapper around ``asyncio.open_connection``. - - Subclasses will implement different transport modes as atomic operations, - which this class eases doing since the exposed interface simply puts and - gets complete data payloads to and from queues. - - The only error that will raise from send and receive methods is - ``ConnectionError``, which will raise when attempting to send if - the client is disconnected (includes remote disconnections). - """ - # this static attribute should be redefined by `Connection` subclasses and - # should be one of `PacketCodec` implementations - packet_codec = None - - def __init__(self, ip, port, dc_id, *, loggers, proxy=None, local_addr=None): - self._ip = ip - self._port = port - self._dc_id = dc_id # only for MTProxy, it's an abstraction leak - self._log = loggers[__name__] - self._proxy = proxy - self._local_addr = local_addr - self._reader = None - self._writer = None - self._connected = False - self._send_task = None - self._recv_task = None - self._codec = None - self._obfuscation = None # TcpObfuscated and MTProxy - self._send_queue = asyncio.Queue(1) - self._recv_queue = asyncio.Queue(1) - - @staticmethod - def _wrap_socket_ssl(sock): - if ssl_mod is None: - raise RuntimeError( - 'Cannot use proxy that requires SSL ' - 'without the SSL module being available' - ) - - return ssl_mod.wrap_socket( - sock, - do_handshake_on_connect=True, - ssl_version=ssl_mod.PROTOCOL_SSLv23, - ciphers='ADH-AES256-SHA') - - @staticmethod - def _parse_proxy(proxy_type, addr, port, rdns=True, username=None, password=None): - if isinstance(proxy_type, str): - proxy_type = proxy_type.lower() - - # Always prefer `python_socks` when available - if python_socks: - from python_socks import ProxyType - - # We do the check for numerical values here - # to be backwards compatible with PySocks proxy format, - # (since socks.SOCKS5 == 2, socks.SOCKS4 == 1, socks.HTTP == 3) - if proxy_type == ProxyType.SOCKS5 or proxy_type == 2 or proxy_type == "socks5": - protocol = ProxyType.SOCKS5 - elif proxy_type == ProxyType.SOCKS4 or proxy_type == 1 or proxy_type == "socks4": - protocol = ProxyType.SOCKS4 - elif proxy_type == ProxyType.HTTP or proxy_type == 3 or proxy_type == "http": - protocol = ProxyType.HTTP - else: - raise ValueError("Unknown proxy protocol type: {}".format(proxy_type)) - - # This tuple must be compatible with `python_socks`' `Proxy.create()` signature - return protocol, addr, port, username, password, rdns - - else: - from socks import SOCKS5, SOCKS4, HTTP - - if proxy_type == 2 or proxy_type == "socks5": - protocol = SOCKS5 - elif proxy_type == 1 or proxy_type == "socks4": - protocol = SOCKS4 - elif proxy_type == 3 or proxy_type == "http": - protocol = HTTP - else: - raise ValueError("Unknown proxy protocol type: {}".format(proxy_type)) - - # This tuple must be compatible with `PySocks`' `socksocket.set_proxy()` signature - return protocol, addr, port, rdns, username, password - - async def _proxy_connect(self, timeout=None, local_addr=None): - if isinstance(self._proxy, (tuple, list)): - parsed = self._parse_proxy(*self._proxy) - elif isinstance(self._proxy, dict): - parsed = self._parse_proxy(**self._proxy) - else: - raise TypeError("Proxy of unknown format: {}".format(type(self._proxy))) - - # Always prefer `python_socks` when available - if python_socks: - # python_socks internal errors are not inherited from - # builtin IOError (just from Exception). Instead of adding those - # in exceptions clauses everywhere through the code, we - # rather monkey-patch them in place. - - python_socks._errors.ProxyError = ConnectionError - python_socks._errors.ProxyConnectionError = ConnectionError - python_socks._errors.ProxyTimeoutError = ConnectionError - - from python_socks.async_.asyncio import Proxy - - proxy = Proxy.create(*parsed) - - # WARNING: If `local_addr` is set we use manual socket creation, because, - # unfortunately, `Proxy.connect()` does not expose `local_addr` - # argument, so if we want to bind socket locally, we need to manually - # create, bind and connect socket, and then pass to `Proxy.connect()` method. - - if local_addr is None: - sock = await proxy.connect( - dest_host=self._ip, - dest_port=self._port, - timeout=timeout - ) - else: - # Here we start manual setup of the socket. - # The `address` represents the proxy ip and proxy port, - # not the destination one (!), because the socket - # connects to the proxy server, not destination server. - # IPv family is also checked on proxy address. - if ':' in proxy.proxy_host: - mode, address = socket.AF_INET6, (proxy.proxy_host, proxy.proxy_port, 0, 0) - else: - mode, address = socket.AF_INET, (proxy.proxy_host, proxy.proxy_port) - - # Create a non-blocking socket and bind it (if local address is specified). - sock = socket.socket(mode, socket.SOCK_STREAM) - sock.setblocking(False) - sock.bind(local_addr) - - # Actual TCP connection is performed here. - await asyncio.wait_for( - asyncio.get_event_loop().sock_connect(sock=sock, address=address), - timeout=timeout - ) - - # As our socket is already created and connected, - # this call sets the destination host/port and - # starts protocol negotiations with the proxy server. - sock = await proxy.connect( - dest_host=self._ip, - dest_port=self._port, - timeout=timeout, - _socket=sock - ) - - else: - import socks - - # Here `address` represents destination address (not proxy), because of - # the `PySocks` implementation of the connection routine. - # IPv family is checked on proxy address, not destination address. - if ':' in parsed[1]: - mode, address = socket.AF_INET6, (self._ip, self._port, 0, 0) - else: - mode, address = socket.AF_INET, (self._ip, self._port) - - # Setup socket, proxy, timeout and bind it (if necessary). - sock = socks.socksocket(mode, socket.SOCK_STREAM) - sock.set_proxy(*parsed) - sock.settimeout(timeout) - - if local_addr is not None: - sock.bind(local_addr) - - # Actual TCP connection and negotiation performed here. - await asyncio.wait_for( - asyncio.get_event_loop().sock_connect(sock=sock, address=address), - timeout=timeout - ) - - sock.setblocking(False) - - return sock - - async def _connect(self, timeout=None, ssl=None): - if self._local_addr is not None: - # NOTE: If port is not specified, we use 0 port - # to notify the OS that port should be chosen randomly - # from the available ones. - if isinstance(self._local_addr, tuple) and len(self._local_addr) == 2: - local_addr = self._local_addr - elif isinstance(self._local_addr, str): - local_addr = (self._local_addr, 0) - else: - raise ValueError("Unknown local address format: {}".format(self._local_addr)) - else: - local_addr = None - - if not self._proxy: - self._reader, self._writer = await asyncio.wait_for( - asyncio.open_connection( - host=self._ip, - port=self._port, - ssl=ssl, - local_addr=local_addr - ), timeout=timeout) - else: - # Proxy setup, connection and negotiation is performed here. - sock = await self._proxy_connect( - timeout=timeout, - local_addr=local_addr - ) - - # Wrap socket in SSL context (if provided) - if ssl: - sock = self._wrap_socket_ssl(sock) - - self._reader, self._writer = await asyncio.open_connection(sock=sock) - - self._codec = self.packet_codec(self) - self._init_conn() - await self._writer.drain() - - async def connect(self, timeout=None, ssl=None): - """ - Establishes a connection with the server. - """ - await self._connect(timeout=timeout, ssl=ssl) - self._connected = True - - loop = asyncio.get_event_loop() - self._send_task = loop.create_task(self._send_loop()) - self._recv_task = loop.create_task(self._recv_loop()) - - async def disconnect(self): - """ - Disconnects from the server, and clears - pending outgoing and incoming messages. - """ - self._connected = False - - await helpers._cancel( - self._log, - send_task=self._send_task, - recv_task=self._recv_task - ) - - if self._writer: - self._writer.close() - try: - await self._writer.wait_closed() - except Exception as e: - # Disconnecting should never raise. Seen: - # * OSError: No route to host and - # * OSError: [Errno 32] Broken pipe - # * ConnectionResetError - self._log.info('%s during disconnect: %s', type(e), e) - - def send(self, data): - """ - Sends a packet of data through this connection mode. - - This method returns a coroutine. - """ - if not self._connected: - raise ConnectionError('Not connected') - - return self._send_queue.put(data) - - async def recv(self): - """ - Receives a packet of data through this connection mode. - - This method returns a coroutine. - """ - while self._connected: - result = await self._recv_queue.get() - if result: # None = sentinel value = keep trying - return result - - raise ConnectionError('Not connected') - - async def _send_loop(self): - """ - This loop is constantly popping items off the queue to send them. - """ - try: - while self._connected: - self._send(await self._send_queue.get()) - await self._writer.drain() - except asyncio.CancelledError: - pass - except Exception as e: - if isinstance(e, IOError): - self._log.info('The server closed the connection while sending') - else: - self._log.exception('Unexpected exception in the send loop') - - await self.disconnect() - - async def _recv_loop(self): - """ - This loop is constantly putting items on the queue as they're read. - """ - while self._connected: - try: - data = await self._recv() - except asyncio.CancelledError: - break - except Exception as e: - if isinstance(e, (IOError, asyncio.IncompleteReadError)): - msg = 'The server closed the connection' - self._log.info(msg) - elif isinstance(e, InvalidChecksumError): - msg = 'The server response had an invalid checksum' - self._log.info(msg) - else: - msg = 'Unexpected exception in the receive loop' - self._log.exception(msg) - - await self.disconnect() - - # Add a sentinel value to unstuck recv - if self._recv_queue.empty(): - self._recv_queue.put_nowait(None) - - break - - try: - await self._recv_queue.put(data) - except asyncio.CancelledError: - break - - def _init_conn(self): - """ - This method will be called after `connect` is called. - After this method finishes, the writer will be drained. - - Subclasses should make use of this if they need to send - data to Telegram to indicate which connection mode will - be used. - """ - if self._codec.tag: - self._writer.write(self._codec.tag) - - def _send(self, data): - self._writer.write(self._codec.encode_packet(data)) - - async def _recv(self): - return await self._codec.read_packet(self._reader) - - def __str__(self): - return '{}:{}/{}'.format( - self._ip, self._port, - self.__class__.__name__.replace('Connection', '') - ) - - -class ObfuscatedConnection(Connection): - """ - Base class for "obfuscated" connections ("obfuscated2", "mtproto proxy") - """ - """ - This attribute should be redefined by subclasses - """ - obfuscated_io = None - - def _init_conn(self): - self._obfuscation = self.obfuscated_io(self) - self._writer.write(self._obfuscation.header) - - def _send(self, data): - self._obfuscation.write(self._codec.encode_packet(data)) - - async def _recv(self): - return await self._codec.read_packet(self._obfuscation) - - -class PacketCodec(abc.ABC): - """ - Base class for packet codecs - """ - - """ - This attribute should be re-defined by subclass to define if some - "magic bytes" should be sent to server right after connection is made to - signal which protocol will be used - """ - tag = None - - def __init__(self, connection): - """ - Codec is created when connection is just made. - """ - self._conn = connection - - @abc.abstractmethod - def encode_packet(self, data): - """ - Encodes single packet and returns encoded bytes. - """ - raise NotImplementedError - - @abc.abstractmethod - async def read_packet(self, reader): - """ - Reads single packet from `reader` object that should have - `readexactly(n)` method. - """ - raise NotImplementedError diff --git a/telethon/_network/connection/http.py b/telethon/_network/connection/http.py deleted file mode 100644 index e2d976f7..00000000 --- a/telethon/_network/connection/http.py +++ /dev/null @@ -1,39 +0,0 @@ -import asyncio - -from .connection import Connection, PacketCodec - - -SSL_PORT = 443 - - -class HttpPacketCodec(PacketCodec): - tag = None - obfuscate_tag = None - - def encode_packet(self, data): - return ('POST /api HTTP/1.1\r\n' - 'Host: {}:{}\r\n' - 'Content-Type: application/x-www-form-urlencoded\r\n' - 'Connection: keep-alive\r\n' - 'Keep-Alive: timeout=100000, max=10000000\r\n' - 'Content-Length: {}\r\n\r\n' - .format(self._conn._ip, self._conn._port, len(data)) - .encode('ascii') + data) - - async def read_packet(self, reader): - while True: - line = await reader.readline() - if not line or line[-1] != b'\n': - raise asyncio.IncompleteReadError(line, None) - - if line.lower().startswith(b'content-length: '): - await reader.readexactly(2) - length = int(line[16:-2]) - return await reader.readexactly(length) - - -class ConnectionHttp(Connection): - packet_codec = HttpPacketCodec - - async def connect(self, timeout=None, ssl=None): - await super().connect(timeout=timeout, ssl=self._port == SSL_PORT) diff --git a/telethon/_network/connection/tcpabridged.py b/telethon/_network/connection/tcpabridged.py deleted file mode 100644 index 171b1d8c..00000000 --- a/telethon/_network/connection/tcpabridged.py +++ /dev/null @@ -1,33 +0,0 @@ -import struct - -from .connection import Connection, PacketCodec - - -class AbridgedPacketCodec(PacketCodec): - tag = b'\xef' - obfuscate_tag = b'\xef\xef\xef\xef' - - def encode_packet(self, data): - length = len(data) >> 2 - if length < 127: - length = struct.pack('B', length) - else: - length = b'\x7f' + int.to_bytes(length, 3, 'little') - return length + data - - async def read_packet(self, reader): - length = struct.unpack('= 127: - length = struct.unpack( - ' 0: - return packet_with_padding[:-pad_size] - return packet_with_padding - - -class ConnectionTcpIntermediate(Connection): - """ - Intermediate mode between `ConnectionTcpFull` and `ConnectionTcpAbridged`. - Always sends 4 extra bytes for the packet length. - """ - packet_codec = IntermediatePacketCodec diff --git a/telethon/_network/connection/tcpmtproxy.py b/telethon/_network/connection/tcpmtproxy.py deleted file mode 100644 index db18a61c..00000000 --- a/telethon/_network/connection/tcpmtproxy.py +++ /dev/null @@ -1,152 +0,0 @@ -import asyncio -import hashlib -import os - -from .connection import ObfuscatedConnection -from .tcpabridged import AbridgedPacketCodec -from .tcpintermediate import ( - IntermediatePacketCodec, - RandomizedIntermediatePacketCodec -) - -from ..._crypto import AESModeCTR - - -class MTProxyIO: - """ - It's very similar to tcpobfuscated.ObfuscatedIO, but the way - encryption keys, protocol tag and dc_id are encoded is different. - """ - header = None - - def __init__(self, connection): - self._reader = connection._reader - self._writer = connection._writer - - (self.header, - self._encrypt, - self._decrypt) = self.init_header( - connection._secret, connection._dc_id, connection.packet_codec) - - @staticmethod - def init_header(secret, dc_id, packet_codec): - # Validate - is_dd = (len(secret) == 17) and (secret[0] == 0xDD) - is_rand_codec = issubclass( - packet_codec, RandomizedIntermediatePacketCodec) - if is_dd and not is_rand_codec: - raise ValueError( - "Only RandomizedIntermediate can be used with dd-secrets") - secret = secret[1:] if is_dd else secret - if len(secret) != 16: - raise ValueError( - "MTProxy secret must be a hex-string representing 16 bytes") - - # Obfuscated messages secrets cannot start with any of these - keywords = (b'PVrG', b'GET ', b'POST', b'\xee\xee\xee\xee') - while True: - random = os.urandom(64) - if (random[0] != 0xef and - random[:4] not in keywords and - random[4:4] != b'\0\0\0\0'): - break - - random = bytearray(random) - random_reversed = random[55:7:-1] # Reversed (8, len=48) - - # Encryption has "continuous buffer" enabled - encrypt_key = hashlib.sha256( - bytes(random[8:40]) + secret).digest() - encrypt_iv = bytes(random[40:56]) - decrypt_key = hashlib.sha256( - bytes(random_reversed[:32]) + secret).digest() - decrypt_iv = bytes(random_reversed[32:48]) - - encryptor = AESModeCTR(encrypt_key, encrypt_iv) - decryptor = AESModeCTR(decrypt_key, decrypt_iv) - - random[56:60] = packet_codec.obfuscate_tag - - dc_id_bytes = dc_id.to_bytes(2, "little", signed=True) - random = random[:60] + dc_id_bytes + random[62:] - random[56:64] = encryptor.encrypt(bytes(random))[56:64] - return (random, encryptor, decryptor) - - async def readexactly(self, n): - return self._decrypt.encrypt(await self._reader.readexactly(n)) - - def write(self, data): - self._writer.write(self._encrypt.encrypt(data)) - - -class TcpMTProxy(ObfuscatedConnection): - """ - Connector which allows user to connect to the Telegram via proxy servers - commonly known as MTProxy. - Implemented very ugly due to the leaky abstractions in Telethon networking - classes that should be refactored later (TODO). - - .. warning:: - - The support for TcpMTProxy classes is **EXPERIMENTAL** and prone to - be changed. You shouldn't be using this class yet. - """ - packet_codec = None - obfuscated_io = MTProxyIO - - # noinspection PyUnusedLocal - def __init__(self, ip, port, dc_id, *, loggers, proxy=None, local_addr=None): - # connect to proxy's host and port instead of telegram's ones - proxy_host, proxy_port = self.address_info(proxy) - self._secret = bytes.fromhex(proxy[2]) - super().__init__( - proxy_host, proxy_port, dc_id, loggers=loggers) - - async def _connect(self, timeout=None, ssl=None): - await super()._connect(timeout=timeout, ssl=ssl) - - # Wait for EOF for 2 seconds (or if _wait_for_data's definition - # is missing or different, just sleep for 2 seconds). This way - # we give the proxy a chance to close the connection if the current - # codec (which the proxy detects with the data we sent) cannot - # be used for this proxy. This is a work around for #1134. - # TODO Sleeping for N seconds may not be the best solution - # TODO This fix could be welcome for HTTP proxies as well - try: - await asyncio.wait_for(self._reader._wait_for_data('proxy'), 2) - except asyncio.TimeoutError: - pass - except Exception: - await asyncio.sleep(2) - - if self._reader.at_eof(): - await self.disconnect() - raise ConnectionError( - 'Proxy closed the connection after sending initial payload') - - @staticmethod - def address_info(proxy_info): - if proxy_info is None: - raise ValueError("No proxy info specified for MTProxy connection") - return proxy_info[:2] - - -class ConnectionTcpMTProxyAbridged(TcpMTProxy): - """ - Connect to proxy using abridged protocol - """ - packet_codec = AbridgedPacketCodec - - -class ConnectionTcpMTProxyIntermediate(TcpMTProxy): - """ - Connect to proxy using intermediate protocol - """ - packet_codec = IntermediatePacketCodec - - -class ConnectionTcpMTProxyRandomizedIntermediate(TcpMTProxy): - """ - Connect to proxy using randomized intermediate protocol (dd-secrets) - """ - packet_codec = RandomizedIntermediatePacketCodec diff --git a/telethon/_network/connection/tcpobfuscated.py b/telethon/_network/connection/tcpobfuscated.py deleted file mode 100644 index 2aeeeac1..00000000 --- a/telethon/_network/connection/tcpobfuscated.py +++ /dev/null @@ -1,62 +0,0 @@ -import os - -from .tcpabridged import AbridgedPacketCodec -from .connection import ObfuscatedConnection - -from ..._crypto import AESModeCTR - - -class ObfuscatedIO: - header = None - - def __init__(self, connection): - self._reader = connection._reader - self._writer = connection._writer - - (self.header, - self._encrypt, - self._decrypt) = self.init_header(connection.packet_codec) - - @staticmethod - def init_header(packet_codec): - # Obfuscated messages secrets cannot start with any of these - keywords = (b'PVrG', b'GET ', b'POST', b'\xee\xee\xee\xee') - while True: - random = os.urandom(64) - if (random[0] != 0xef and - random[:4] not in keywords and - random[4:8] != b'\0\0\0\0'): - break - - random = bytearray(random) - random_reversed = random[55:7:-1] # Reversed (8, len=48) - - # Encryption has "continuous buffer" enabled - encrypt_key = bytes(random[8:40]) - encrypt_iv = bytes(random[40:56]) - decrypt_key = bytes(random_reversed[:32]) - decrypt_iv = bytes(random_reversed[32:48]) - - encryptor = AESModeCTR(encrypt_key, encrypt_iv) - decryptor = AESModeCTR(decrypt_key, decrypt_iv) - - random[56:60] = packet_codec.obfuscate_tag - random[56:64] = encryptor.encrypt(bytes(random))[56:64] - return (random, encryptor, decryptor) - - async def readexactly(self, n): - return self._decrypt.encrypt(await self._reader.readexactly(n)) - - def write(self, data): - self._writer.write(self._encrypt.encrypt(data)) - - -class ConnectionTcpObfuscated(ObfuscatedConnection): - """ - Mode that Telegram defines as "obfuscated2". Encodes the packet - just like `ConnectionTcpAbridged`, but encrypts every message with - a randomly generated key using the AES-CTR mode so the packets are - harder to discern. - """ - obfuscated_io = ObfuscatedIO - packet_codec = AbridgedPacketCodec diff --git a/telethon/_network/transports/__init__.py b/telethon/_network/transports/__init__.py new file mode 100644 index 00000000..36dfc149 --- /dev/null +++ b/telethon/_network/transports/__init__.py @@ -0,0 +1,4 @@ +from .transport import Transport +from .abridged import Abridged +from .full import Full +from .intermediate import Intermediate diff --git a/telethon/_network/transports/abridged.py b/telethon/_network/transports/abridged.py new file mode 100644 index 00000000..c847c249 --- /dev/null +++ b/telethon/_network/transports/abridged.py @@ -0,0 +1,43 @@ +from .transport import Transport +import struct + + +class Abridged(Transport): + def __init__(self): + self._init = False + + def recreate_fresh(self): + return type(self)() + + def pack(self, input: bytes) -> bytes: + if self._init: + header = b'' + else: + header = b'\xef' + self._init = True + + length = len(data) >> 2 + if length < 127: + length = struct.pack('B', length) + else: + length = b'\x7f' + int.to_bytes(length, 3, 'little') + + return header + length + data + + def unpack(self, input: bytes) -> (int, bytes): + if len(input) < 4: + raise EOFError() + + length = input[0] + if length < 127: + offset = 1 + else: + offset = 4 + length = struct.unpack(' bytes: + # https://core.telegram.org/mtproto#tcp-transport + length = len(input) + 12 + data = struct.pack(' (int, bytes): + if len(input) < 12: + raise EOFError() + + length, seq = struct.unpack(' bytes: + if self._init: + header = b'' + else: + header = b'\xee\xee\xee\xee' + self._init = True + + return header + struct.pack(' (int, bytes): + if len(input) < 4: + raise EOFError() + + length = struct.unpack(' bytes: + pass + + # Should raise EOFError if it does not have enough bytes + @abc.abstractmethod + def unpack(self, input: bytes) -> (int, bytes): + pass