diff --git a/telethon/_client/telegrambaseclient.py b/telethon/_client/telegrambaseclient.py index 85512cfb..e4906fbf 100644 --- a/telethon/_client/telegrambaseclient.py +++ b/telethon/_client/telegrambaseclient.py @@ -11,7 +11,7 @@ import ipaddress from .. import version, __name__ as __base_name__, _tl from .._crypto import rsa from .._misc import markdown, entitycache, statecache, enums, helpers -from .._network import MTProtoSender, Connection, ConnectionTcpFull, connection as conns +from .._network import MTProtoSender, Connection, transports from .._sessions import Session, SQLiteSession, MemorySession from .._sessions.types import DataCenter, SessionState @@ -70,7 +70,7 @@ def init( api_id: int, api_hash: str, *, - connection: 'typing.Type[Connection]' = ConnectionTcpFull, + connection: 'typing.Type[Connection]' = (), use_ipv6: bool = False, proxy: typing.Union[tuple, dict] = None, local_addr: typing.Union[str, tuple] = None, @@ -194,15 +194,12 @@ def init( # For now the current default remains TCP Full; may change to be "smart" if proxies are specified connection = enums.ConnectionMode.FULL - self._connection = { - enums.ConnectionMode.FULL: conns.ConnectionTcpFull, - enums.ConnectionMode.INTERMEDIATE: conns.ConnectionTcpIntermediate, - enums.ConnectionMode.ABRIDGED: conns.ConnectionTcpAbridged, - enums.ConnectionMode.OBFUSCATED: conns.ConnectionTcpObfuscated, - enums.ConnectionMode.HTTP: conns.ConnectionHttp, + self._transport = { + enums.ConnectionMode.FULL: transports.Full(), + enums.ConnectionMode.INTERMEDIATE: transports.Intermediate(), + enums.ConnectionMode.ABRIDGED: transports.Abridged(), }[enums.parse_conn_mode(connection)] - init_proxy = None if not issubclass(self._connection, conns.TcpMTProxy) else \ - _tl.InputClientProxy(*self._connection.address_info(proxy)) + init_proxy = None # Used on connection. Capture the variables in a lambda since # exporting clients need to create this InvokeWithLayer. @@ -334,13 +331,12 @@ async def connect(self: 'TelegramClient') -> None: # Use known key, if any self._sender.auth_key.key = dc.auth - if not await self._sender.connect(self._connection( - str(ipaddress.ip_address((self._use_ipv6 and dc.ipv6) or dc.ipv4)), - dc.port, - dc.id, + if not await self._sender.connect(Connection( + ip=str(ipaddress.ip_address((self._use_ipv6 and dc.ipv6) or dc.ipv4)), + port=dc.port, + transport=self._transport.recreate_fresh(), loggers=self._log, - proxy=self._proxy, - local_addr=self._local_addr + local_addr=self._local_addr, )): # We don't want to init or modify anything if we were already connected return @@ -396,8 +392,7 @@ async def disconnect(self: 'TelegramClient'): return await _disconnect_coro(self) def set_proxy(self: 'TelegramClient', proxy: typing.Union[tuple, dict]): - init_proxy = None if not issubclass(self._connection, conns.TcpMTProxy) else \ - _tl.InputClientProxy(*self._connection.address_info(proxy)) + init_proxy = None self._init_request.proxy = init_proxy self._proxy = proxy @@ -481,13 +476,12 @@ async def _create_exported_sender(self: 'TelegramClient', dc_id): # If one were to do that, Telegram would reset the connection # with no further clues. sender = MTProtoSender(loggers=self._log) - await sender.connect(self._connection( - str(ipaddress.ip_address((self._use_ipv6 and dc.ipv6) or dc.ipv4)), - dc.port, - dc.id, + await self._sender.connect(Connection( + ip=str(ipaddress.ip_address((self._use_ipv6 and dc.ipv6) or dc.ipv4)), + port=dc.port, + transport=self._transport.recreate_fresh(), loggers=self._log, - proxy=self._proxy, - local_addr=self._local_addr + local_addr=self._local_addr, )) self._log[__name__].info('Exporting auth for new borrowed sender in %s', dc) auth = await self(_tl.fn.auth.ExportAuthorization(dc_id)) @@ -516,13 +510,13 @@ async def _borrow_exported_sender(self: 'TelegramClient', dc_id): elif state.need_connect(): dc = self._all_dcs[dc_id] - await sender.connect(self._connection( - str(ipaddress.ip_address((self._use_ipv6 and dc.ipv6) or dc.ipv4)), - dc.port, - dc.id, + + await self._sender.connect(Connection( + ip=str(ipaddress.ip_address((self._use_ipv6 and dc.ipv6) or dc.ipv4)), + port=dc.port, + transport=self._transport.recreate_fresh(), loggers=self._log, - proxy=self._proxy, - local_addr=self._local_addr + local_addr=self._local_addr, )) state.add_borrow() diff --git a/telethon/_client/telegramclient.py b/telethon/_client/telegramclient.py index 9755d437..bb21a416 100644 --- a/telethon/_client/telegramclient.py +++ b/telethon/_client/telegramclient.py @@ -10,7 +10,6 @@ from . import ( ) from .. import version, _tl from ..types import _custom -from .._network import ConnectionTcpFull from .._events.common import EventBuilder, EventCommon from .._misc import enums diff --git a/telethon/_misc/enums.py b/telethon/_misc/enums.py index 2d5742aa..283030f3 100644 --- a/telethon/_misc/enums.py +++ b/telethon/_misc/enums.py @@ -15,8 +15,6 @@ class ConnectionMode(Enum): FULL = 'full' INTERMEDIATE = 'intermediate' ABRIDGED = 'abridged' - OBFUSCATED = 'obfuscated' - HTTP = 'http' class Participant(Enum): diff --git a/telethon/_network/__init__.py b/telethon/_network/__init__.py index 0b985d58..164acc4e 100644 --- a/telethon/_network/__init__.py +++ b/telethon/_network/__init__.py @@ -5,10 +5,5 @@ with Telegram's servers and the protocol used (TCP full, abridged, etc.). from .mtprotoplainsender import MTProtoPlainSender from .authenticator import do_authentication from .mtprotosender import MTProtoSender -from .connection import ( - Connection, - ConnectionTcpFull, ConnectionTcpIntermediate, ConnectionTcpAbridged, - ConnectionTcpObfuscated, ConnectionTcpMTProxyAbridged, - ConnectionTcpMTProxyIntermediate, - ConnectionTcpMTProxyRandomizedIntermediate, ConnectionHttp, TcpMTProxy -) +from .connection import Connection +from . import transports diff --git a/telethon/_network/connection.py b/telethon/_network/connection.py new file mode 100644 index 00000000..26674aa2 --- /dev/null +++ b/telethon/_network/connection.py @@ -0,0 +1,61 @@ +import asyncio +import socket + +from .transports.transport import Transport + + +CHUNK_SIZE = 32 * 1024 + + +# TODO ideally the mtproto impl would also be sans-io, but that's less pressing +class Connection: + def __init__(self, ip, port, *, transport: Transport, loggers, local_addr=None): + self._ip = ip + self._port = port + self._log = loggers[__name__] + self._local_addr = local_addr + + self._sock = None + self._in_buffer = bytearray() + self._transport = transport + + async def connect(self, timeout=None, ssl=None): + """ + Establishes a connection with the server. + """ + loop = asyncio.get_event_loop() + + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setblocking(False) + if self._local_addr: + sock.bind(self._local_addr) + + await asyncio.wait_for(loop.sock_connect(sock, (self._ip, self._port)), timeout) + self._sock = sock + + async def disconnect(self): + self._sock.close() + self._sock = None + + async def send(self, data): + if not self._sock: + raise ConnectionError('not connected') + + loop = asyncio.get_event_loop() + await loop.sock_sendall(self._sock, self._transport.pack(data)) + + async def recv(self): + if not self._sock: + raise ConnectionError('not connected') + + loop = asyncio.get_event_loop() + while True: + try: + length, body = self._transport.unpack(self._in_buffer) + del self._in_buffer[:length] + return body + except EOFError: + self._in_buffer += await loop.sock_recv(self._sock, CHUNK_SIZE) + + def __str__(self): + return f'{self._ip}:{self._port}/{self._transport.__class__.__name__}' diff --git a/telethon/_network/connection/__init__.py b/telethon/_network/connection/__init__.py deleted file mode 100644 index 88771866..00000000 --- a/telethon/_network/connection/__init__.py +++ /dev/null @@ -1,12 +0,0 @@ -from .connection import Connection -from .tcpfull import ConnectionTcpFull -from .tcpintermediate import ConnectionTcpIntermediate -from .tcpabridged import ConnectionTcpAbridged -from .tcpobfuscated import ConnectionTcpObfuscated -from .tcpmtproxy import ( - TcpMTProxy, - ConnectionTcpMTProxyAbridged, - ConnectionTcpMTProxyIntermediate, - ConnectionTcpMTProxyRandomizedIntermediate -) -from .http import ConnectionHttp diff --git a/telethon/_network/connection/connection.py b/telethon/_network/connection/connection.py deleted file mode 100644 index abf398ee..00000000 --- a/telethon/_network/connection/connection.py +++ /dev/null @@ -1,426 +0,0 @@ -import abc -import asyncio -import socket -import sys - -try: - import ssl as ssl_mod -except ImportError: - ssl_mod = None - -try: - import python_socks -except ImportError: - python_socks = None - -from ...errors._custom import InvalidChecksumError -from ..._misc import helpers - - -class Connection(abc.ABC): - """ - The `Connection` class is a wrapper around ``asyncio.open_connection``. - - Subclasses will implement different transport modes as atomic operations, - which this class eases doing since the exposed interface simply puts and - gets complete data payloads to and from queues. - - The only error that will raise from send and receive methods is - ``ConnectionError``, which will raise when attempting to send if - the client is disconnected (includes remote disconnections). - """ - # this static attribute should be redefined by `Connection` subclasses and - # should be one of `PacketCodec` implementations - packet_codec = None - - def __init__(self, ip, port, dc_id, *, loggers, proxy=None, local_addr=None): - self._ip = ip - self._port = port - self._dc_id = dc_id # only for MTProxy, it's an abstraction leak - self._log = loggers[__name__] - self._proxy = proxy - self._local_addr = local_addr - self._reader = None - self._writer = None - self._connected = False - self._send_task = None - self._recv_task = None - self._codec = None - self._obfuscation = None # TcpObfuscated and MTProxy - self._send_queue = asyncio.Queue(1) - self._recv_queue = asyncio.Queue(1) - - @staticmethod - def _wrap_socket_ssl(sock): - if ssl_mod is None: - raise RuntimeError( - 'Cannot use proxy that requires SSL ' - 'without the SSL module being available' - ) - - return ssl_mod.wrap_socket( - sock, - do_handshake_on_connect=True, - ssl_version=ssl_mod.PROTOCOL_SSLv23, - ciphers='ADH-AES256-SHA') - - @staticmethod - def _parse_proxy(proxy_type, addr, port, rdns=True, username=None, password=None): - if isinstance(proxy_type, str): - proxy_type = proxy_type.lower() - - # Always prefer `python_socks` when available - if python_socks: - from python_socks import ProxyType - - # We do the check for numerical values here - # to be backwards compatible with PySocks proxy format, - # (since socks.SOCKS5 == 2, socks.SOCKS4 == 1, socks.HTTP == 3) - if proxy_type == ProxyType.SOCKS5 or proxy_type == 2 or proxy_type == "socks5": - protocol = ProxyType.SOCKS5 - elif proxy_type == ProxyType.SOCKS4 or proxy_type == 1 or proxy_type == "socks4": - protocol = ProxyType.SOCKS4 - elif proxy_type == ProxyType.HTTP or proxy_type == 3 or proxy_type == "http": - protocol = ProxyType.HTTP - else: - raise ValueError("Unknown proxy protocol type: {}".format(proxy_type)) - - # This tuple must be compatible with `python_socks`' `Proxy.create()` signature - return protocol, addr, port, username, password, rdns - - else: - from socks import SOCKS5, SOCKS4, HTTP - - if proxy_type == 2 or proxy_type == "socks5": - protocol = SOCKS5 - elif proxy_type == 1 or proxy_type == "socks4": - protocol = SOCKS4 - elif proxy_type == 3 or proxy_type == "http": - protocol = HTTP - else: - raise ValueError("Unknown proxy protocol type: {}".format(proxy_type)) - - # This tuple must be compatible with `PySocks`' `socksocket.set_proxy()` signature - return protocol, addr, port, rdns, username, password - - async def _proxy_connect(self, timeout=None, local_addr=None): - if isinstance(self._proxy, (tuple, list)): - parsed = self._parse_proxy(*self._proxy) - elif isinstance(self._proxy, dict): - parsed = self._parse_proxy(**self._proxy) - else: - raise TypeError("Proxy of unknown format: {}".format(type(self._proxy))) - - # Always prefer `python_socks` when available - if python_socks: - # python_socks internal errors are not inherited from - # builtin IOError (just from Exception). Instead of adding those - # in exceptions clauses everywhere through the code, we - # rather monkey-patch them in place. - - python_socks._errors.ProxyError = ConnectionError - python_socks._errors.ProxyConnectionError = ConnectionError - python_socks._errors.ProxyTimeoutError = ConnectionError - - from python_socks.async_.asyncio import Proxy - - proxy = Proxy.create(*parsed) - - # WARNING: If `local_addr` is set we use manual socket creation, because, - # unfortunately, `Proxy.connect()` does not expose `local_addr` - # argument, so if we want to bind socket locally, we need to manually - # create, bind and connect socket, and then pass to `Proxy.connect()` method. - - if local_addr is None: - sock = await proxy.connect( - dest_host=self._ip, - dest_port=self._port, - timeout=timeout - ) - else: - # Here we start manual setup of the socket. - # The `address` represents the proxy ip and proxy port, - # not the destination one (!), because the socket - # connects to the proxy server, not destination server. - # IPv family is also checked on proxy address. - if ':' in proxy.proxy_host: - mode, address = socket.AF_INET6, (proxy.proxy_host, proxy.proxy_port, 0, 0) - else: - mode, address = socket.AF_INET, (proxy.proxy_host, proxy.proxy_port) - - # Create a non-blocking socket and bind it (if local address is specified). - sock = socket.socket(mode, socket.SOCK_STREAM) - sock.setblocking(False) - sock.bind(local_addr) - - # Actual TCP connection is performed here. - await asyncio.wait_for( - asyncio.get_event_loop().sock_connect(sock=sock, address=address), - timeout=timeout - ) - - # As our socket is already created and connected, - # this call sets the destination host/port and - # starts protocol negotiations with the proxy server. - sock = await proxy.connect( - dest_host=self._ip, - dest_port=self._port, - timeout=timeout, - _socket=sock - ) - - else: - import socks - - # Here `address` represents destination address (not proxy), because of - # the `PySocks` implementation of the connection routine. - # IPv family is checked on proxy address, not destination address. - if ':' in parsed[1]: - mode, address = socket.AF_INET6, (self._ip, self._port, 0, 0) - else: - mode, address = socket.AF_INET, (self._ip, self._port) - - # Setup socket, proxy, timeout and bind it (if necessary). - sock = socks.socksocket(mode, socket.SOCK_STREAM) - sock.set_proxy(*parsed) - sock.settimeout(timeout) - - if local_addr is not None: - sock.bind(local_addr) - - # Actual TCP connection and negotiation performed here. - await asyncio.wait_for( - asyncio.get_event_loop().sock_connect(sock=sock, address=address), - timeout=timeout - ) - - sock.setblocking(False) - - return sock - - async def _connect(self, timeout=None, ssl=None): - if self._local_addr is not None: - # NOTE: If port is not specified, we use 0 port - # to notify the OS that port should be chosen randomly - # from the available ones. - if isinstance(self._local_addr, tuple) and len(self._local_addr) == 2: - local_addr = self._local_addr - elif isinstance(self._local_addr, str): - local_addr = (self._local_addr, 0) - else: - raise ValueError("Unknown local address format: {}".format(self._local_addr)) - else: - local_addr = None - - if not self._proxy: - self._reader, self._writer = await asyncio.wait_for( - asyncio.open_connection( - host=self._ip, - port=self._port, - ssl=ssl, - local_addr=local_addr - ), timeout=timeout) - else: - # Proxy setup, connection and negotiation is performed here. - sock = await self._proxy_connect( - timeout=timeout, - local_addr=local_addr - ) - - # Wrap socket in SSL context (if provided) - if ssl: - sock = self._wrap_socket_ssl(sock) - - self._reader, self._writer = await asyncio.open_connection(sock=sock) - - self._codec = self.packet_codec(self) - self._init_conn() - await self._writer.drain() - - async def connect(self, timeout=None, ssl=None): - """ - Establishes a connection with the server. - """ - await self._connect(timeout=timeout, ssl=ssl) - self._connected = True - - loop = asyncio.get_event_loop() - self._send_task = loop.create_task(self._send_loop()) - self._recv_task = loop.create_task(self._recv_loop()) - - async def disconnect(self): - """ - Disconnects from the server, and clears - pending outgoing and incoming messages. - """ - self._connected = False - - await helpers._cancel( - self._log, - send_task=self._send_task, - recv_task=self._recv_task - ) - - if self._writer: - self._writer.close() - try: - await self._writer.wait_closed() - except Exception as e: - # Disconnecting should never raise. Seen: - # * OSError: No route to host and - # * OSError: [Errno 32] Broken pipe - # * ConnectionResetError - self._log.info('%s during disconnect: %s', type(e), e) - - def send(self, data): - """ - Sends a packet of data through this connection mode. - - This method returns a coroutine. - """ - if not self._connected: - raise ConnectionError('Not connected') - - return self._send_queue.put(data) - - async def recv(self): - """ - Receives a packet of data through this connection mode. - - This method returns a coroutine. - """ - while self._connected: - result = await self._recv_queue.get() - if result: # None = sentinel value = keep trying - return result - - raise ConnectionError('Not connected') - - async def _send_loop(self): - """ - This loop is constantly popping items off the queue to send them. - """ - try: - while self._connected: - self._send(await self._send_queue.get()) - await self._writer.drain() - except asyncio.CancelledError: - pass - except Exception as e: - if isinstance(e, IOError): - self._log.info('The server closed the connection while sending') - else: - self._log.exception('Unexpected exception in the send loop') - - await self.disconnect() - - async def _recv_loop(self): - """ - This loop is constantly putting items on the queue as they're read. - """ - while self._connected: - try: - data = await self._recv() - except asyncio.CancelledError: - break - except Exception as e: - if isinstance(e, (IOError, asyncio.IncompleteReadError)): - msg = 'The server closed the connection' - self._log.info(msg) - elif isinstance(e, InvalidChecksumError): - msg = 'The server response had an invalid checksum' - self._log.info(msg) - else: - msg = 'Unexpected exception in the receive loop' - self._log.exception(msg) - - await self.disconnect() - - # Add a sentinel value to unstuck recv - if self._recv_queue.empty(): - self._recv_queue.put_nowait(None) - - break - - try: - await self._recv_queue.put(data) - except asyncio.CancelledError: - break - - def _init_conn(self): - """ - This method will be called after `connect` is called. - After this method finishes, the writer will be drained. - - Subclasses should make use of this if they need to send - data to Telegram to indicate which connection mode will - be used. - """ - if self._codec.tag: - self._writer.write(self._codec.tag) - - def _send(self, data): - self._writer.write(self._codec.encode_packet(data)) - - async def _recv(self): - return await self._codec.read_packet(self._reader) - - def __str__(self): - return '{}:{}/{}'.format( - self._ip, self._port, - self.__class__.__name__.replace('Connection', '') - ) - - -class ObfuscatedConnection(Connection): - """ - Base class for "obfuscated" connections ("obfuscated2", "mtproto proxy") - """ - """ - This attribute should be redefined by subclasses - """ - obfuscated_io = None - - def _init_conn(self): - self._obfuscation = self.obfuscated_io(self) - self._writer.write(self._obfuscation.header) - - def _send(self, data): - self._obfuscation.write(self._codec.encode_packet(data)) - - async def _recv(self): - return await self._codec.read_packet(self._obfuscation) - - -class PacketCodec(abc.ABC): - """ - Base class for packet codecs - """ - - """ - This attribute should be re-defined by subclass to define if some - "magic bytes" should be sent to server right after connection is made to - signal which protocol will be used - """ - tag = None - - def __init__(self, connection): - """ - Codec is created when connection is just made. - """ - self._conn = connection - - @abc.abstractmethod - def encode_packet(self, data): - """ - Encodes single packet and returns encoded bytes. - """ - raise NotImplementedError - - @abc.abstractmethod - async def read_packet(self, reader): - """ - Reads single packet from `reader` object that should have - `readexactly(n)` method. - """ - raise NotImplementedError diff --git a/telethon/_network/connection/http.py b/telethon/_network/connection/http.py deleted file mode 100644 index e2d976f7..00000000 --- a/telethon/_network/connection/http.py +++ /dev/null @@ -1,39 +0,0 @@ -import asyncio - -from .connection import Connection, PacketCodec - - -SSL_PORT = 443 - - -class HttpPacketCodec(PacketCodec): - tag = None - obfuscate_tag = None - - def encode_packet(self, data): - return ('POST /api HTTP/1.1\r\n' - 'Host: {}:{}\r\n' - 'Content-Type: application/x-www-form-urlencoded\r\n' - 'Connection: keep-alive\r\n' - 'Keep-Alive: timeout=100000, max=10000000\r\n' - 'Content-Length: {}\r\n\r\n' - .format(self._conn._ip, self._conn._port, len(data)) - .encode('ascii') + data) - - async def read_packet(self, reader): - while True: - line = await reader.readline() - if not line or line[-1] != b'\n': - raise asyncio.IncompleteReadError(line, None) - - if line.lower().startswith(b'content-length: '): - await reader.readexactly(2) - length = int(line[16:-2]) - return await reader.readexactly(length) - - -class ConnectionHttp(Connection): - packet_codec = HttpPacketCodec - - async def connect(self, timeout=None, ssl=None): - await super().connect(timeout=timeout, ssl=self._port == SSL_PORT) diff --git a/telethon/_network/connection/tcpabridged.py b/telethon/_network/connection/tcpabridged.py deleted file mode 100644 index 171b1d8c..00000000 --- a/telethon/_network/connection/tcpabridged.py +++ /dev/null @@ -1,33 +0,0 @@ -import struct - -from .connection import Connection, PacketCodec - - -class AbridgedPacketCodec(PacketCodec): - tag = b'\xef' - obfuscate_tag = b'\xef\xef\xef\xef' - - def encode_packet(self, data): - length = len(data) >> 2 - if length < 127: - length = struct.pack('B', length) - else: - length = b'\x7f' + int.to_bytes(length, 3, 'little') - return length + data - - async def read_packet(self, reader): - length = struct.unpack('= 127: - length = struct.unpack( - ' 0: - return packet_with_padding[:-pad_size] - return packet_with_padding - - -class ConnectionTcpIntermediate(Connection): - """ - Intermediate mode between `ConnectionTcpFull` and `ConnectionTcpAbridged`. - Always sends 4 extra bytes for the packet length. - """ - packet_codec = IntermediatePacketCodec diff --git a/telethon/_network/connection/tcpmtproxy.py b/telethon/_network/connection/tcpmtproxy.py deleted file mode 100644 index db18a61c..00000000 --- a/telethon/_network/connection/tcpmtproxy.py +++ /dev/null @@ -1,152 +0,0 @@ -import asyncio -import hashlib -import os - -from .connection import ObfuscatedConnection -from .tcpabridged import AbridgedPacketCodec -from .tcpintermediate import ( - IntermediatePacketCodec, - RandomizedIntermediatePacketCodec -) - -from ..._crypto import AESModeCTR - - -class MTProxyIO: - """ - It's very similar to tcpobfuscated.ObfuscatedIO, but the way - encryption keys, protocol tag and dc_id are encoded is different. - """ - header = None - - def __init__(self, connection): - self._reader = connection._reader - self._writer = connection._writer - - (self.header, - self._encrypt, - self._decrypt) = self.init_header( - connection._secret, connection._dc_id, connection.packet_codec) - - @staticmethod - def init_header(secret, dc_id, packet_codec): - # Validate - is_dd = (len(secret) == 17) and (secret[0] == 0xDD) - is_rand_codec = issubclass( - packet_codec, RandomizedIntermediatePacketCodec) - if is_dd and not is_rand_codec: - raise ValueError( - "Only RandomizedIntermediate can be used with dd-secrets") - secret = secret[1:] if is_dd else secret - if len(secret) != 16: - raise ValueError( - "MTProxy secret must be a hex-string representing 16 bytes") - - # Obfuscated messages secrets cannot start with any of these - keywords = (b'PVrG', b'GET ', b'POST', b'\xee\xee\xee\xee') - while True: - random = os.urandom(64) - if (random[0] != 0xef and - random[:4] not in keywords and - random[4:4] != b'\0\0\0\0'): - break - - random = bytearray(random) - random_reversed = random[55:7:-1] # Reversed (8, len=48) - - # Encryption has "continuous buffer" enabled - encrypt_key = hashlib.sha256( - bytes(random[8:40]) + secret).digest() - encrypt_iv = bytes(random[40:56]) - decrypt_key = hashlib.sha256( - bytes(random_reversed[:32]) + secret).digest() - decrypt_iv = bytes(random_reversed[32:48]) - - encryptor = AESModeCTR(encrypt_key, encrypt_iv) - decryptor = AESModeCTR(decrypt_key, decrypt_iv) - - random[56:60] = packet_codec.obfuscate_tag - - dc_id_bytes = dc_id.to_bytes(2, "little", signed=True) - random = random[:60] + dc_id_bytes + random[62:] - random[56:64] = encryptor.encrypt(bytes(random))[56:64] - return (random, encryptor, decryptor) - - async def readexactly(self, n): - return self._decrypt.encrypt(await self._reader.readexactly(n)) - - def write(self, data): - self._writer.write(self._encrypt.encrypt(data)) - - -class TcpMTProxy(ObfuscatedConnection): - """ - Connector which allows user to connect to the Telegram via proxy servers - commonly known as MTProxy. - Implemented very ugly due to the leaky abstractions in Telethon networking - classes that should be refactored later (TODO). - - .. warning:: - - The support for TcpMTProxy classes is **EXPERIMENTAL** and prone to - be changed. You shouldn't be using this class yet. - """ - packet_codec = None - obfuscated_io = MTProxyIO - - # noinspection PyUnusedLocal - def __init__(self, ip, port, dc_id, *, loggers, proxy=None, local_addr=None): - # connect to proxy's host and port instead of telegram's ones - proxy_host, proxy_port = self.address_info(proxy) - self._secret = bytes.fromhex(proxy[2]) - super().__init__( - proxy_host, proxy_port, dc_id, loggers=loggers) - - async def _connect(self, timeout=None, ssl=None): - await super()._connect(timeout=timeout, ssl=ssl) - - # Wait for EOF for 2 seconds (or if _wait_for_data's definition - # is missing or different, just sleep for 2 seconds). This way - # we give the proxy a chance to close the connection if the current - # codec (which the proxy detects with the data we sent) cannot - # be used for this proxy. This is a work around for #1134. - # TODO Sleeping for N seconds may not be the best solution - # TODO This fix could be welcome for HTTP proxies as well - try: - await asyncio.wait_for(self._reader._wait_for_data('proxy'), 2) - except asyncio.TimeoutError: - pass - except Exception: - await asyncio.sleep(2) - - if self._reader.at_eof(): - await self.disconnect() - raise ConnectionError( - 'Proxy closed the connection after sending initial payload') - - @staticmethod - def address_info(proxy_info): - if proxy_info is None: - raise ValueError("No proxy info specified for MTProxy connection") - return proxy_info[:2] - - -class ConnectionTcpMTProxyAbridged(TcpMTProxy): - """ - Connect to proxy using abridged protocol - """ - packet_codec = AbridgedPacketCodec - - -class ConnectionTcpMTProxyIntermediate(TcpMTProxy): - """ - Connect to proxy using intermediate protocol - """ - packet_codec = IntermediatePacketCodec - - -class ConnectionTcpMTProxyRandomizedIntermediate(TcpMTProxy): - """ - Connect to proxy using randomized intermediate protocol (dd-secrets) - """ - packet_codec = RandomizedIntermediatePacketCodec diff --git a/telethon/_network/connection/tcpobfuscated.py b/telethon/_network/connection/tcpobfuscated.py deleted file mode 100644 index 2aeeeac1..00000000 --- a/telethon/_network/connection/tcpobfuscated.py +++ /dev/null @@ -1,62 +0,0 @@ -import os - -from .tcpabridged import AbridgedPacketCodec -from .connection import ObfuscatedConnection - -from ..._crypto import AESModeCTR - - -class ObfuscatedIO: - header = None - - def __init__(self, connection): - self._reader = connection._reader - self._writer = connection._writer - - (self.header, - self._encrypt, - self._decrypt) = self.init_header(connection.packet_codec) - - @staticmethod - def init_header(packet_codec): - # Obfuscated messages secrets cannot start with any of these - keywords = (b'PVrG', b'GET ', b'POST', b'\xee\xee\xee\xee') - while True: - random = os.urandom(64) - if (random[0] != 0xef and - random[:4] not in keywords and - random[4:8] != b'\0\0\0\0'): - break - - random = bytearray(random) - random_reversed = random[55:7:-1] # Reversed (8, len=48) - - # Encryption has "continuous buffer" enabled - encrypt_key = bytes(random[8:40]) - encrypt_iv = bytes(random[40:56]) - decrypt_key = bytes(random_reversed[:32]) - decrypt_iv = bytes(random_reversed[32:48]) - - encryptor = AESModeCTR(encrypt_key, encrypt_iv) - decryptor = AESModeCTR(decrypt_key, decrypt_iv) - - random[56:60] = packet_codec.obfuscate_tag - random[56:64] = encryptor.encrypt(bytes(random))[56:64] - return (random, encryptor, decryptor) - - async def readexactly(self, n): - return self._decrypt.encrypt(await self._reader.readexactly(n)) - - def write(self, data): - self._writer.write(self._encrypt.encrypt(data)) - - -class ConnectionTcpObfuscated(ObfuscatedConnection): - """ - Mode that Telegram defines as "obfuscated2". Encodes the packet - just like `ConnectionTcpAbridged`, but encrypts every message with - a randomly generated key using the AES-CTR mode so the packets are - harder to discern. - """ - obfuscated_io = ObfuscatedIO - packet_codec = AbridgedPacketCodec diff --git a/telethon/_network/transports/__init__.py b/telethon/_network/transports/__init__.py new file mode 100644 index 00000000..36dfc149 --- /dev/null +++ b/telethon/_network/transports/__init__.py @@ -0,0 +1,4 @@ +from .transport import Transport +from .abridged import Abridged +from .full import Full +from .intermediate import Intermediate diff --git a/telethon/_network/transports/abridged.py b/telethon/_network/transports/abridged.py new file mode 100644 index 00000000..c847c249 --- /dev/null +++ b/telethon/_network/transports/abridged.py @@ -0,0 +1,43 @@ +from .transport import Transport +import struct + + +class Abridged(Transport): + def __init__(self): + self._init = False + + def recreate_fresh(self): + return type(self)() + + def pack(self, input: bytes) -> bytes: + if self._init: + header = b'' + else: + header = b'\xef' + self._init = True + + length = len(data) >> 2 + if length < 127: + length = struct.pack('B', length) + else: + length = b'\x7f' + int.to_bytes(length, 3, 'little') + + return header + length + data + + def unpack(self, input: bytes) -> (int, bytes): + if len(input) < 4: + raise EOFError() + + length = input[0] + if length < 127: + offset = 1 + else: + offset = 4 + length = struct.unpack(' bytes: + # https://core.telegram.org/mtproto#tcp-transport + length = len(input) + 12 + data = struct.pack(' (int, bytes): + if len(input) < 12: + raise EOFError() + + length, seq = struct.unpack(' bytes: + if self._init: + header = b'' + else: + header = b'\xee\xee\xee\xee' + self._init = True + + return header + struct.pack(' (int, bytes): + if len(input) < 4: + raise EOFError() + + length = struct.unpack(' bytes: + pass + + # Should raise EOFError if it does not have enough bytes + @abc.abstractmethod + def unpack(self, input: bytes) -> (int, bytes): + pass