mirror of
https://github.com/LonamiWebs/Telethon.git
synced 2024-11-26 03:13:45 +03:00
Completely overhaul connections and transports
Reduce abstraction leaks. Now the transport can hold any state, rather than just the tag. It's also responsible to initialize on the first connection, and they can be cleanly reset. asyncio connections are no longer used, in favour of raw sockets, which should avoid some annoyances. For the time being, more obscure transport modes have been removed, as well as proxy support, until further cleaning is done.
This commit is contained in:
parent
02703e3753
commit
f5f0c84553
|
@ -11,7 +11,7 @@ import ipaddress
|
|||
from .. import version, __name__ as __base_name__, _tl
|
||||
from .._crypto import rsa
|
||||
from .._misc import markdown, entitycache, statecache, enums, helpers
|
||||
from .._network import MTProtoSender, Connection, ConnectionTcpFull, connection as conns
|
||||
from .._network import MTProtoSender, Connection, transports
|
||||
from .._sessions import Session, SQLiteSession, MemorySession
|
||||
from .._sessions.types import DataCenter, SessionState
|
||||
|
||||
|
@ -70,7 +70,7 @@ def init(
|
|||
api_id: int,
|
||||
api_hash: str,
|
||||
*,
|
||||
connection: 'typing.Type[Connection]' = ConnectionTcpFull,
|
||||
connection: 'typing.Type[Connection]' = (),
|
||||
use_ipv6: bool = False,
|
||||
proxy: typing.Union[tuple, dict] = None,
|
||||
local_addr: typing.Union[str, tuple] = None,
|
||||
|
@ -194,15 +194,12 @@ def init(
|
|||
# For now the current default remains TCP Full; may change to be "smart" if proxies are specified
|
||||
connection = enums.ConnectionMode.FULL
|
||||
|
||||
self._connection = {
|
||||
enums.ConnectionMode.FULL: conns.ConnectionTcpFull,
|
||||
enums.ConnectionMode.INTERMEDIATE: conns.ConnectionTcpIntermediate,
|
||||
enums.ConnectionMode.ABRIDGED: conns.ConnectionTcpAbridged,
|
||||
enums.ConnectionMode.OBFUSCATED: conns.ConnectionTcpObfuscated,
|
||||
enums.ConnectionMode.HTTP: conns.ConnectionHttp,
|
||||
self._transport = {
|
||||
enums.ConnectionMode.FULL: transports.Full(),
|
||||
enums.ConnectionMode.INTERMEDIATE: transports.Intermediate(),
|
||||
enums.ConnectionMode.ABRIDGED: transports.Abridged(),
|
||||
}[enums.parse_conn_mode(connection)]
|
||||
init_proxy = None if not issubclass(self._connection, conns.TcpMTProxy) else \
|
||||
_tl.InputClientProxy(*self._connection.address_info(proxy))
|
||||
init_proxy = None
|
||||
|
||||
# Used on connection. Capture the variables in a lambda since
|
||||
# exporting clients need to create this InvokeWithLayer.
|
||||
|
@ -334,13 +331,12 @@ async def connect(self: 'TelegramClient') -> None:
|
|||
# Use known key, if any
|
||||
self._sender.auth_key.key = dc.auth
|
||||
|
||||
if not await self._sender.connect(self._connection(
|
||||
str(ipaddress.ip_address((self._use_ipv6 and dc.ipv6) or dc.ipv4)),
|
||||
dc.port,
|
||||
dc.id,
|
||||
if not await self._sender.connect(Connection(
|
||||
ip=str(ipaddress.ip_address((self._use_ipv6 and dc.ipv6) or dc.ipv4)),
|
||||
port=dc.port,
|
||||
transport=self._transport.recreate_fresh(),
|
||||
loggers=self._log,
|
||||
proxy=self._proxy,
|
||||
local_addr=self._local_addr
|
||||
local_addr=self._local_addr,
|
||||
)):
|
||||
# We don't want to init or modify anything if we were already connected
|
||||
return
|
||||
|
@ -396,8 +392,7 @@ async def disconnect(self: 'TelegramClient'):
|
|||
return await _disconnect_coro(self)
|
||||
|
||||
def set_proxy(self: 'TelegramClient', proxy: typing.Union[tuple, dict]):
|
||||
init_proxy = None if not issubclass(self._connection, conns.TcpMTProxy) else \
|
||||
_tl.InputClientProxy(*self._connection.address_info(proxy))
|
||||
init_proxy = None
|
||||
|
||||
self._init_request.proxy = init_proxy
|
||||
self._proxy = proxy
|
||||
|
@ -481,13 +476,12 @@ async def _create_exported_sender(self: 'TelegramClient', dc_id):
|
|||
# If one were to do that, Telegram would reset the connection
|
||||
# with no further clues.
|
||||
sender = MTProtoSender(loggers=self._log)
|
||||
await sender.connect(self._connection(
|
||||
str(ipaddress.ip_address((self._use_ipv6 and dc.ipv6) or dc.ipv4)),
|
||||
dc.port,
|
||||
dc.id,
|
||||
await self._sender.connect(Connection(
|
||||
ip=str(ipaddress.ip_address((self._use_ipv6 and dc.ipv6) or dc.ipv4)),
|
||||
port=dc.port,
|
||||
transport=self._transport.recreate_fresh(),
|
||||
loggers=self._log,
|
||||
proxy=self._proxy,
|
||||
local_addr=self._local_addr
|
||||
local_addr=self._local_addr,
|
||||
))
|
||||
self._log[__name__].info('Exporting auth for new borrowed sender in %s', dc)
|
||||
auth = await self(_tl.fn.auth.ExportAuthorization(dc_id))
|
||||
|
@ -516,13 +510,13 @@ async def _borrow_exported_sender(self: 'TelegramClient', dc_id):
|
|||
|
||||
elif state.need_connect():
|
||||
dc = self._all_dcs[dc_id]
|
||||
await sender.connect(self._connection(
|
||||
str(ipaddress.ip_address((self._use_ipv6 and dc.ipv6) or dc.ipv4)),
|
||||
dc.port,
|
||||
dc.id,
|
||||
|
||||
await self._sender.connect(Connection(
|
||||
ip=str(ipaddress.ip_address((self._use_ipv6 and dc.ipv6) or dc.ipv4)),
|
||||
port=dc.port,
|
||||
transport=self._transport.recreate_fresh(),
|
||||
loggers=self._log,
|
||||
proxy=self._proxy,
|
||||
local_addr=self._local_addr
|
||||
local_addr=self._local_addr,
|
||||
))
|
||||
|
||||
state.add_borrow()
|
||||
|
|
|
@ -10,7 +10,6 @@ from . import (
|
|||
)
|
||||
from .. import version, _tl
|
||||
from ..types import _custom
|
||||
from .._network import ConnectionTcpFull
|
||||
from .._events.common import EventBuilder, EventCommon
|
||||
from .._misc import enums
|
||||
|
||||
|
|
|
@ -15,8 +15,6 @@ class ConnectionMode(Enum):
|
|||
FULL = 'full'
|
||||
INTERMEDIATE = 'intermediate'
|
||||
ABRIDGED = 'abridged'
|
||||
OBFUSCATED = 'obfuscated'
|
||||
HTTP = 'http'
|
||||
|
||||
|
||||
class Participant(Enum):
|
||||
|
|
|
@ -5,10 +5,5 @@ with Telegram's servers and the protocol used (TCP full, abridged, etc.).
|
|||
from .mtprotoplainsender import MTProtoPlainSender
|
||||
from .authenticator import do_authentication
|
||||
from .mtprotosender import MTProtoSender
|
||||
from .connection import (
|
||||
Connection,
|
||||
ConnectionTcpFull, ConnectionTcpIntermediate, ConnectionTcpAbridged,
|
||||
ConnectionTcpObfuscated, ConnectionTcpMTProxyAbridged,
|
||||
ConnectionTcpMTProxyIntermediate,
|
||||
ConnectionTcpMTProxyRandomizedIntermediate, ConnectionHttp, TcpMTProxy
|
||||
)
|
||||
from .connection import Connection
|
||||
from . import transports
|
||||
|
|
61
telethon/_network/connection.py
Normal file
61
telethon/_network/connection.py
Normal file
|
@ -0,0 +1,61 @@
|
|||
import asyncio
|
||||
import socket
|
||||
|
||||
from .transports.transport import Transport
|
||||
|
||||
|
||||
CHUNK_SIZE = 32 * 1024
|
||||
|
||||
|
||||
# TODO ideally the mtproto impl would also be sans-io, but that's less pressing
|
||||
class Connection:
|
||||
def __init__(self, ip, port, *, transport: Transport, loggers, local_addr=None):
|
||||
self._ip = ip
|
||||
self._port = port
|
||||
self._log = loggers[__name__]
|
||||
self._local_addr = local_addr
|
||||
|
||||
self._sock = None
|
||||
self._in_buffer = bytearray()
|
||||
self._transport = transport
|
||||
|
||||
async def connect(self, timeout=None, ssl=None):
|
||||
"""
|
||||
Establishes a connection with the server.
|
||||
"""
|
||||
loop = asyncio.get_event_loop()
|
||||
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.setblocking(False)
|
||||
if self._local_addr:
|
||||
sock.bind(self._local_addr)
|
||||
|
||||
await asyncio.wait_for(loop.sock_connect(sock, (self._ip, self._port)), timeout)
|
||||
self._sock = sock
|
||||
|
||||
async def disconnect(self):
|
||||
self._sock.close()
|
||||
self._sock = None
|
||||
|
||||
async def send(self, data):
|
||||
if not self._sock:
|
||||
raise ConnectionError('not connected')
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
await loop.sock_sendall(self._sock, self._transport.pack(data))
|
||||
|
||||
async def recv(self):
|
||||
if not self._sock:
|
||||
raise ConnectionError('not connected')
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
while True:
|
||||
try:
|
||||
length, body = self._transport.unpack(self._in_buffer)
|
||||
del self._in_buffer[:length]
|
||||
return body
|
||||
except EOFError:
|
||||
self._in_buffer += await loop.sock_recv(self._sock, CHUNK_SIZE)
|
||||
|
||||
def __str__(self):
|
||||
return f'{self._ip}:{self._port}/{self._transport.__class__.__name__}'
|
|
@ -1,12 +0,0 @@
|
|||
from .connection import Connection
|
||||
from .tcpfull import ConnectionTcpFull
|
||||
from .tcpintermediate import ConnectionTcpIntermediate
|
||||
from .tcpabridged import ConnectionTcpAbridged
|
||||
from .tcpobfuscated import ConnectionTcpObfuscated
|
||||
from .tcpmtproxy import (
|
||||
TcpMTProxy,
|
||||
ConnectionTcpMTProxyAbridged,
|
||||
ConnectionTcpMTProxyIntermediate,
|
||||
ConnectionTcpMTProxyRandomizedIntermediate
|
||||
)
|
||||
from .http import ConnectionHttp
|
|
@ -1,426 +0,0 @@
|
|||
import abc
|
||||
import asyncio
|
||||
import socket
|
||||
import sys
|
||||
|
||||
try:
|
||||
import ssl as ssl_mod
|
||||
except ImportError:
|
||||
ssl_mod = None
|
||||
|
||||
try:
|
||||
import python_socks
|
||||
except ImportError:
|
||||
python_socks = None
|
||||
|
||||
from ...errors._custom import InvalidChecksumError
|
||||
from ..._misc import helpers
|
||||
|
||||
|
||||
class Connection(abc.ABC):
|
||||
"""
|
||||
The `Connection` class is a wrapper around ``asyncio.open_connection``.
|
||||
|
||||
Subclasses will implement different transport modes as atomic operations,
|
||||
which this class eases doing since the exposed interface simply puts and
|
||||
gets complete data payloads to and from queues.
|
||||
|
||||
The only error that will raise from send and receive methods is
|
||||
``ConnectionError``, which will raise when attempting to send if
|
||||
the client is disconnected (includes remote disconnections).
|
||||
"""
|
||||
# this static attribute should be redefined by `Connection` subclasses and
|
||||
# should be one of `PacketCodec` implementations
|
||||
packet_codec = None
|
||||
|
||||
def __init__(self, ip, port, dc_id, *, loggers, proxy=None, local_addr=None):
|
||||
self._ip = ip
|
||||
self._port = port
|
||||
self._dc_id = dc_id # only for MTProxy, it's an abstraction leak
|
||||
self._log = loggers[__name__]
|
||||
self._proxy = proxy
|
||||
self._local_addr = local_addr
|
||||
self._reader = None
|
||||
self._writer = None
|
||||
self._connected = False
|
||||
self._send_task = None
|
||||
self._recv_task = None
|
||||
self._codec = None
|
||||
self._obfuscation = None # TcpObfuscated and MTProxy
|
||||
self._send_queue = asyncio.Queue(1)
|
||||
self._recv_queue = asyncio.Queue(1)
|
||||
|
||||
@staticmethod
|
||||
def _wrap_socket_ssl(sock):
|
||||
if ssl_mod is None:
|
||||
raise RuntimeError(
|
||||
'Cannot use proxy that requires SSL '
|
||||
'without the SSL module being available'
|
||||
)
|
||||
|
||||
return ssl_mod.wrap_socket(
|
||||
sock,
|
||||
do_handshake_on_connect=True,
|
||||
ssl_version=ssl_mod.PROTOCOL_SSLv23,
|
||||
ciphers='ADH-AES256-SHA')
|
||||
|
||||
@staticmethod
|
||||
def _parse_proxy(proxy_type, addr, port, rdns=True, username=None, password=None):
|
||||
if isinstance(proxy_type, str):
|
||||
proxy_type = proxy_type.lower()
|
||||
|
||||
# Always prefer `python_socks` when available
|
||||
if python_socks:
|
||||
from python_socks import ProxyType
|
||||
|
||||
# We do the check for numerical values here
|
||||
# to be backwards compatible with PySocks proxy format,
|
||||
# (since socks.SOCKS5 == 2, socks.SOCKS4 == 1, socks.HTTP == 3)
|
||||
if proxy_type == ProxyType.SOCKS5 or proxy_type == 2 or proxy_type == "socks5":
|
||||
protocol = ProxyType.SOCKS5
|
||||
elif proxy_type == ProxyType.SOCKS4 or proxy_type == 1 or proxy_type == "socks4":
|
||||
protocol = ProxyType.SOCKS4
|
||||
elif proxy_type == ProxyType.HTTP or proxy_type == 3 or proxy_type == "http":
|
||||
protocol = ProxyType.HTTP
|
||||
else:
|
||||
raise ValueError("Unknown proxy protocol type: {}".format(proxy_type))
|
||||
|
||||
# This tuple must be compatible with `python_socks`' `Proxy.create()` signature
|
||||
return protocol, addr, port, username, password, rdns
|
||||
|
||||
else:
|
||||
from socks import SOCKS5, SOCKS4, HTTP
|
||||
|
||||
if proxy_type == 2 or proxy_type == "socks5":
|
||||
protocol = SOCKS5
|
||||
elif proxy_type == 1 or proxy_type == "socks4":
|
||||
protocol = SOCKS4
|
||||
elif proxy_type == 3 or proxy_type == "http":
|
||||
protocol = HTTP
|
||||
else:
|
||||
raise ValueError("Unknown proxy protocol type: {}".format(proxy_type))
|
||||
|
||||
# This tuple must be compatible with `PySocks`' `socksocket.set_proxy()` signature
|
||||
return protocol, addr, port, rdns, username, password
|
||||
|
||||
async def _proxy_connect(self, timeout=None, local_addr=None):
|
||||
if isinstance(self._proxy, (tuple, list)):
|
||||
parsed = self._parse_proxy(*self._proxy)
|
||||
elif isinstance(self._proxy, dict):
|
||||
parsed = self._parse_proxy(**self._proxy)
|
||||
else:
|
||||
raise TypeError("Proxy of unknown format: {}".format(type(self._proxy)))
|
||||
|
||||
# Always prefer `python_socks` when available
|
||||
if python_socks:
|
||||
# python_socks internal errors are not inherited from
|
||||
# builtin IOError (just from Exception). Instead of adding those
|
||||
# in exceptions clauses everywhere through the code, we
|
||||
# rather monkey-patch them in place.
|
||||
|
||||
python_socks._errors.ProxyError = ConnectionError
|
||||
python_socks._errors.ProxyConnectionError = ConnectionError
|
||||
python_socks._errors.ProxyTimeoutError = ConnectionError
|
||||
|
||||
from python_socks.async_.asyncio import Proxy
|
||||
|
||||
proxy = Proxy.create(*parsed)
|
||||
|
||||
# WARNING: If `local_addr` is set we use manual socket creation, because,
|
||||
# unfortunately, `Proxy.connect()` does not expose `local_addr`
|
||||
# argument, so if we want to bind socket locally, we need to manually
|
||||
# create, bind and connect socket, and then pass to `Proxy.connect()` method.
|
||||
|
||||
if local_addr is None:
|
||||
sock = await proxy.connect(
|
||||
dest_host=self._ip,
|
||||
dest_port=self._port,
|
||||
timeout=timeout
|
||||
)
|
||||
else:
|
||||
# Here we start manual setup of the socket.
|
||||
# The `address` represents the proxy ip and proxy port,
|
||||
# not the destination one (!), because the socket
|
||||
# connects to the proxy server, not destination server.
|
||||
# IPv family is also checked on proxy address.
|
||||
if ':' in proxy.proxy_host:
|
||||
mode, address = socket.AF_INET6, (proxy.proxy_host, proxy.proxy_port, 0, 0)
|
||||
else:
|
||||
mode, address = socket.AF_INET, (proxy.proxy_host, proxy.proxy_port)
|
||||
|
||||
# Create a non-blocking socket and bind it (if local address is specified).
|
||||
sock = socket.socket(mode, socket.SOCK_STREAM)
|
||||
sock.setblocking(False)
|
||||
sock.bind(local_addr)
|
||||
|
||||
# Actual TCP connection is performed here.
|
||||
await asyncio.wait_for(
|
||||
asyncio.get_event_loop().sock_connect(sock=sock, address=address),
|
||||
timeout=timeout
|
||||
)
|
||||
|
||||
# As our socket is already created and connected,
|
||||
# this call sets the destination host/port and
|
||||
# starts protocol negotiations with the proxy server.
|
||||
sock = await proxy.connect(
|
||||
dest_host=self._ip,
|
||||
dest_port=self._port,
|
||||
timeout=timeout,
|
||||
_socket=sock
|
||||
)
|
||||
|
||||
else:
|
||||
import socks
|
||||
|
||||
# Here `address` represents destination address (not proxy), because of
|
||||
# the `PySocks` implementation of the connection routine.
|
||||
# IPv family is checked on proxy address, not destination address.
|
||||
if ':' in parsed[1]:
|
||||
mode, address = socket.AF_INET6, (self._ip, self._port, 0, 0)
|
||||
else:
|
||||
mode, address = socket.AF_INET, (self._ip, self._port)
|
||||
|
||||
# Setup socket, proxy, timeout and bind it (if necessary).
|
||||
sock = socks.socksocket(mode, socket.SOCK_STREAM)
|
||||
sock.set_proxy(*parsed)
|
||||
sock.settimeout(timeout)
|
||||
|
||||
if local_addr is not None:
|
||||
sock.bind(local_addr)
|
||||
|
||||
# Actual TCP connection and negotiation performed here.
|
||||
await asyncio.wait_for(
|
||||
asyncio.get_event_loop().sock_connect(sock=sock, address=address),
|
||||
timeout=timeout
|
||||
)
|
||||
|
||||
sock.setblocking(False)
|
||||
|
||||
return sock
|
||||
|
||||
async def _connect(self, timeout=None, ssl=None):
|
||||
if self._local_addr is not None:
|
||||
# NOTE: If port is not specified, we use 0 port
|
||||
# to notify the OS that port should be chosen randomly
|
||||
# from the available ones.
|
||||
if isinstance(self._local_addr, tuple) and len(self._local_addr) == 2:
|
||||
local_addr = self._local_addr
|
||||
elif isinstance(self._local_addr, str):
|
||||
local_addr = (self._local_addr, 0)
|
||||
else:
|
||||
raise ValueError("Unknown local address format: {}".format(self._local_addr))
|
||||
else:
|
||||
local_addr = None
|
||||
|
||||
if not self._proxy:
|
||||
self._reader, self._writer = await asyncio.wait_for(
|
||||
asyncio.open_connection(
|
||||
host=self._ip,
|
||||
port=self._port,
|
||||
ssl=ssl,
|
||||
local_addr=local_addr
|
||||
), timeout=timeout)
|
||||
else:
|
||||
# Proxy setup, connection and negotiation is performed here.
|
||||
sock = await self._proxy_connect(
|
||||
timeout=timeout,
|
||||
local_addr=local_addr
|
||||
)
|
||||
|
||||
# Wrap socket in SSL context (if provided)
|
||||
if ssl:
|
||||
sock = self._wrap_socket_ssl(sock)
|
||||
|
||||
self._reader, self._writer = await asyncio.open_connection(sock=sock)
|
||||
|
||||
self._codec = self.packet_codec(self)
|
||||
self._init_conn()
|
||||
await self._writer.drain()
|
||||
|
||||
async def connect(self, timeout=None, ssl=None):
|
||||
"""
|
||||
Establishes a connection with the server.
|
||||
"""
|
||||
await self._connect(timeout=timeout, ssl=ssl)
|
||||
self._connected = True
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
self._send_task = loop.create_task(self._send_loop())
|
||||
self._recv_task = loop.create_task(self._recv_loop())
|
||||
|
||||
async def disconnect(self):
|
||||
"""
|
||||
Disconnects from the server, and clears
|
||||
pending outgoing and incoming messages.
|
||||
"""
|
||||
self._connected = False
|
||||
|
||||
await helpers._cancel(
|
||||
self._log,
|
||||
send_task=self._send_task,
|
||||
recv_task=self._recv_task
|
||||
)
|
||||
|
||||
if self._writer:
|
||||
self._writer.close()
|
||||
try:
|
||||
await self._writer.wait_closed()
|
||||
except Exception as e:
|
||||
# Disconnecting should never raise. Seen:
|
||||
# * OSError: No route to host and
|
||||
# * OSError: [Errno 32] Broken pipe
|
||||
# * ConnectionResetError
|
||||
self._log.info('%s during disconnect: %s', type(e), e)
|
||||
|
||||
def send(self, data):
|
||||
"""
|
||||
Sends a packet of data through this connection mode.
|
||||
|
||||
This method returns a coroutine.
|
||||
"""
|
||||
if not self._connected:
|
||||
raise ConnectionError('Not connected')
|
||||
|
||||
return self._send_queue.put(data)
|
||||
|
||||
async def recv(self):
|
||||
"""
|
||||
Receives a packet of data through this connection mode.
|
||||
|
||||
This method returns a coroutine.
|
||||
"""
|
||||
while self._connected:
|
||||
result = await self._recv_queue.get()
|
||||
if result: # None = sentinel value = keep trying
|
||||
return result
|
||||
|
||||
raise ConnectionError('Not connected')
|
||||
|
||||
async def _send_loop(self):
|
||||
"""
|
||||
This loop is constantly popping items off the queue to send them.
|
||||
"""
|
||||
try:
|
||||
while self._connected:
|
||||
self._send(await self._send_queue.get())
|
||||
await self._writer.drain()
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
except Exception as e:
|
||||
if isinstance(e, IOError):
|
||||
self._log.info('The server closed the connection while sending')
|
||||
else:
|
||||
self._log.exception('Unexpected exception in the send loop')
|
||||
|
||||
await self.disconnect()
|
||||
|
||||
async def _recv_loop(self):
|
||||
"""
|
||||
This loop is constantly putting items on the queue as they're read.
|
||||
"""
|
||||
while self._connected:
|
||||
try:
|
||||
data = await self._recv()
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except Exception as e:
|
||||
if isinstance(e, (IOError, asyncio.IncompleteReadError)):
|
||||
msg = 'The server closed the connection'
|
||||
self._log.info(msg)
|
||||
elif isinstance(e, InvalidChecksumError):
|
||||
msg = 'The server response had an invalid checksum'
|
||||
self._log.info(msg)
|
||||
else:
|
||||
msg = 'Unexpected exception in the receive loop'
|
||||
self._log.exception(msg)
|
||||
|
||||
await self.disconnect()
|
||||
|
||||
# Add a sentinel value to unstuck recv
|
||||
if self._recv_queue.empty():
|
||||
self._recv_queue.put_nowait(None)
|
||||
|
||||
break
|
||||
|
||||
try:
|
||||
await self._recv_queue.put(data)
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
|
||||
def _init_conn(self):
|
||||
"""
|
||||
This method will be called after `connect` is called.
|
||||
After this method finishes, the writer will be drained.
|
||||
|
||||
Subclasses should make use of this if they need to send
|
||||
data to Telegram to indicate which connection mode will
|
||||
be used.
|
||||
"""
|
||||
if self._codec.tag:
|
||||
self._writer.write(self._codec.tag)
|
||||
|
||||
def _send(self, data):
|
||||
self._writer.write(self._codec.encode_packet(data))
|
||||
|
||||
async def _recv(self):
|
||||
return await self._codec.read_packet(self._reader)
|
||||
|
||||
def __str__(self):
|
||||
return '{}:{}/{}'.format(
|
||||
self._ip, self._port,
|
||||
self.__class__.__name__.replace('Connection', '')
|
||||
)
|
||||
|
||||
|
||||
class ObfuscatedConnection(Connection):
|
||||
"""
|
||||
Base class for "obfuscated" connections ("obfuscated2", "mtproto proxy")
|
||||
"""
|
||||
"""
|
||||
This attribute should be redefined by subclasses
|
||||
"""
|
||||
obfuscated_io = None
|
||||
|
||||
def _init_conn(self):
|
||||
self._obfuscation = self.obfuscated_io(self)
|
||||
self._writer.write(self._obfuscation.header)
|
||||
|
||||
def _send(self, data):
|
||||
self._obfuscation.write(self._codec.encode_packet(data))
|
||||
|
||||
async def _recv(self):
|
||||
return await self._codec.read_packet(self._obfuscation)
|
||||
|
||||
|
||||
class PacketCodec(abc.ABC):
|
||||
"""
|
||||
Base class for packet codecs
|
||||
"""
|
||||
|
||||
"""
|
||||
This attribute should be re-defined by subclass to define if some
|
||||
"magic bytes" should be sent to server right after connection is made to
|
||||
signal which protocol will be used
|
||||
"""
|
||||
tag = None
|
||||
|
||||
def __init__(self, connection):
|
||||
"""
|
||||
Codec is created when connection is just made.
|
||||
"""
|
||||
self._conn = connection
|
||||
|
||||
@abc.abstractmethod
|
||||
def encode_packet(self, data):
|
||||
"""
|
||||
Encodes single packet and returns encoded bytes.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
async def read_packet(self, reader):
|
||||
"""
|
||||
Reads single packet from `reader` object that should have
|
||||
`readexactly(n)` method.
|
||||
"""
|
||||
raise NotImplementedError
|
|
@ -1,39 +0,0 @@
|
|||
import asyncio
|
||||
|
||||
from .connection import Connection, PacketCodec
|
||||
|
||||
|
||||
SSL_PORT = 443
|
||||
|
||||
|
||||
class HttpPacketCodec(PacketCodec):
|
||||
tag = None
|
||||
obfuscate_tag = None
|
||||
|
||||
def encode_packet(self, data):
|
||||
return ('POST /api HTTP/1.1\r\n'
|
||||
'Host: {}:{}\r\n'
|
||||
'Content-Type: application/x-www-form-urlencoded\r\n'
|
||||
'Connection: keep-alive\r\n'
|
||||
'Keep-Alive: timeout=100000, max=10000000\r\n'
|
||||
'Content-Length: {}\r\n\r\n'
|
||||
.format(self._conn._ip, self._conn._port, len(data))
|
||||
.encode('ascii') + data)
|
||||
|
||||
async def read_packet(self, reader):
|
||||
while True:
|
||||
line = await reader.readline()
|
||||
if not line or line[-1] != b'\n':
|
||||
raise asyncio.IncompleteReadError(line, None)
|
||||
|
||||
if line.lower().startswith(b'content-length: '):
|
||||
await reader.readexactly(2)
|
||||
length = int(line[16:-2])
|
||||
return await reader.readexactly(length)
|
||||
|
||||
|
||||
class ConnectionHttp(Connection):
|
||||
packet_codec = HttpPacketCodec
|
||||
|
||||
async def connect(self, timeout=None, ssl=None):
|
||||
await super().connect(timeout=timeout, ssl=self._port == SSL_PORT)
|
|
@ -1,33 +0,0 @@
|
|||
import struct
|
||||
|
||||
from .connection import Connection, PacketCodec
|
||||
|
||||
|
||||
class AbridgedPacketCodec(PacketCodec):
|
||||
tag = b'\xef'
|
||||
obfuscate_tag = b'\xef\xef\xef\xef'
|
||||
|
||||
def encode_packet(self, data):
|
||||
length = len(data) >> 2
|
||||
if length < 127:
|
||||
length = struct.pack('B', length)
|
||||
else:
|
||||
length = b'\x7f' + int.to_bytes(length, 3, 'little')
|
||||
return length + data
|
||||
|
||||
async def read_packet(self, reader):
|
||||
length = struct.unpack('<B', await reader.readexactly(1))[0]
|
||||
if length >= 127:
|
||||
length = struct.unpack(
|
||||
'<i', await reader.readexactly(3) + b'\0')[0]
|
||||
|
||||
return await reader.readexactly(length << 2)
|
||||
|
||||
|
||||
class ConnectionTcpAbridged(Connection):
|
||||
"""
|
||||
This is the mode with the lowest overhead, as it will
|
||||
only require 1 byte if the packet length is less than
|
||||
508 bytes (127 << 2, which is very common).
|
||||
"""
|
||||
packet_codec = AbridgedPacketCodec
|
|
@ -1,43 +0,0 @@
|
|||
import struct
|
||||
from zlib import crc32
|
||||
|
||||
from .connection import Connection, PacketCodec
|
||||
from ...errors._custom import InvalidChecksumError
|
||||
|
||||
|
||||
class FullPacketCodec(PacketCodec):
|
||||
tag = None
|
||||
|
||||
def __init__(self, connection):
|
||||
super().__init__(connection)
|
||||
self._send_counter = 0 # Important or Telegram won't reply
|
||||
|
||||
def encode_packet(self, data):
|
||||
# https://core.telegram.org/mtproto#tcp-transport
|
||||
# total length, sequence number, packet and checksum (CRC32)
|
||||
length = len(data) + 12
|
||||
data = struct.pack('<ii', length, self._send_counter) + data
|
||||
crc = struct.pack('<I', crc32(data))
|
||||
self._send_counter += 1
|
||||
return data + crc
|
||||
|
||||
async def read_packet(self, reader):
|
||||
packet_len_seq = await reader.readexactly(8) # 4 and 4
|
||||
packet_len, seq = struct.unpack('<ii', packet_len_seq)
|
||||
body = await reader.readexactly(packet_len - 8)
|
||||
checksum = struct.unpack('<I', body[-4:])[0]
|
||||
body = body[:-4]
|
||||
|
||||
valid_checksum = crc32(packet_len_seq + body)
|
||||
if checksum != valid_checksum:
|
||||
raise InvalidChecksumError(checksum, valid_checksum)
|
||||
|
||||
return body
|
||||
|
||||
|
||||
class ConnectionTcpFull(Connection):
|
||||
"""
|
||||
Default Telegram mode. Sends 12 additional bytes and
|
||||
needs to calculate the CRC value of the packet itself.
|
||||
"""
|
||||
packet_codec = FullPacketCodec
|
|
@ -1,46 +0,0 @@
|
|||
import struct
|
||||
import random
|
||||
import os
|
||||
|
||||
from .connection import Connection, PacketCodec
|
||||
|
||||
|
||||
class IntermediatePacketCodec(PacketCodec):
|
||||
tag = b'\xee\xee\xee\xee'
|
||||
obfuscate_tag = tag
|
||||
|
||||
def encode_packet(self, data):
|
||||
return struct.pack('<i', len(data)) + data
|
||||
|
||||
async def read_packet(self, reader):
|
||||
length = struct.unpack('<i', await reader.readexactly(4))[0]
|
||||
return await reader.readexactly(length)
|
||||
|
||||
|
||||
class RandomizedIntermediatePacketCodec(IntermediatePacketCodec):
|
||||
"""
|
||||
Data packets are aligned to 4bytes. This codec adds random bytes of size
|
||||
from 0 to 3 bytes, which are ignored by decoder.
|
||||
"""
|
||||
tag = None
|
||||
obfuscate_tag = b'\xdd\xdd\xdd\xdd'
|
||||
|
||||
def encode_packet(self, data):
|
||||
pad_size = random.randint(0, 3)
|
||||
padding = os.urandom(pad_size)
|
||||
return super().encode_packet(data + padding)
|
||||
|
||||
async def read_packet(self, reader):
|
||||
packet_with_padding = await super().read_packet(reader)
|
||||
pad_size = len(packet_with_padding) % 4
|
||||
if pad_size > 0:
|
||||
return packet_with_padding[:-pad_size]
|
||||
return packet_with_padding
|
||||
|
||||
|
||||
class ConnectionTcpIntermediate(Connection):
|
||||
"""
|
||||
Intermediate mode between `ConnectionTcpFull` and `ConnectionTcpAbridged`.
|
||||
Always sends 4 extra bytes for the packet length.
|
||||
"""
|
||||
packet_codec = IntermediatePacketCodec
|
|
@ -1,152 +0,0 @@
|
|||
import asyncio
|
||||
import hashlib
|
||||
import os
|
||||
|
||||
from .connection import ObfuscatedConnection
|
||||
from .tcpabridged import AbridgedPacketCodec
|
||||
from .tcpintermediate import (
|
||||
IntermediatePacketCodec,
|
||||
RandomizedIntermediatePacketCodec
|
||||
)
|
||||
|
||||
from ..._crypto import AESModeCTR
|
||||
|
||||
|
||||
class MTProxyIO:
|
||||
"""
|
||||
It's very similar to tcpobfuscated.ObfuscatedIO, but the way
|
||||
encryption keys, protocol tag and dc_id are encoded is different.
|
||||
"""
|
||||
header = None
|
||||
|
||||
def __init__(self, connection):
|
||||
self._reader = connection._reader
|
||||
self._writer = connection._writer
|
||||
|
||||
(self.header,
|
||||
self._encrypt,
|
||||
self._decrypt) = self.init_header(
|
||||
connection._secret, connection._dc_id, connection.packet_codec)
|
||||
|
||||
@staticmethod
|
||||
def init_header(secret, dc_id, packet_codec):
|
||||
# Validate
|
||||
is_dd = (len(secret) == 17) and (secret[0] == 0xDD)
|
||||
is_rand_codec = issubclass(
|
||||
packet_codec, RandomizedIntermediatePacketCodec)
|
||||
if is_dd and not is_rand_codec:
|
||||
raise ValueError(
|
||||
"Only RandomizedIntermediate can be used with dd-secrets")
|
||||
secret = secret[1:] if is_dd else secret
|
||||
if len(secret) != 16:
|
||||
raise ValueError(
|
||||
"MTProxy secret must be a hex-string representing 16 bytes")
|
||||
|
||||
# Obfuscated messages secrets cannot start with any of these
|
||||
keywords = (b'PVrG', b'GET ', b'POST', b'\xee\xee\xee\xee')
|
||||
while True:
|
||||
random = os.urandom(64)
|
||||
if (random[0] != 0xef and
|
||||
random[:4] not in keywords and
|
||||
random[4:4] != b'\0\0\0\0'):
|
||||
break
|
||||
|
||||
random = bytearray(random)
|
||||
random_reversed = random[55:7:-1] # Reversed (8, len=48)
|
||||
|
||||
# Encryption has "continuous buffer" enabled
|
||||
encrypt_key = hashlib.sha256(
|
||||
bytes(random[8:40]) + secret).digest()
|
||||
encrypt_iv = bytes(random[40:56])
|
||||
decrypt_key = hashlib.sha256(
|
||||
bytes(random_reversed[:32]) + secret).digest()
|
||||
decrypt_iv = bytes(random_reversed[32:48])
|
||||
|
||||
encryptor = AESModeCTR(encrypt_key, encrypt_iv)
|
||||
decryptor = AESModeCTR(decrypt_key, decrypt_iv)
|
||||
|
||||
random[56:60] = packet_codec.obfuscate_tag
|
||||
|
||||
dc_id_bytes = dc_id.to_bytes(2, "little", signed=True)
|
||||
random = random[:60] + dc_id_bytes + random[62:]
|
||||
random[56:64] = encryptor.encrypt(bytes(random))[56:64]
|
||||
return (random, encryptor, decryptor)
|
||||
|
||||
async def readexactly(self, n):
|
||||
return self._decrypt.encrypt(await self._reader.readexactly(n))
|
||||
|
||||
def write(self, data):
|
||||
self._writer.write(self._encrypt.encrypt(data))
|
||||
|
||||
|
||||
class TcpMTProxy(ObfuscatedConnection):
|
||||
"""
|
||||
Connector which allows user to connect to the Telegram via proxy servers
|
||||
commonly known as MTProxy.
|
||||
Implemented very ugly due to the leaky abstractions in Telethon networking
|
||||
classes that should be refactored later (TODO).
|
||||
|
||||
.. warning::
|
||||
|
||||
The support for TcpMTProxy classes is **EXPERIMENTAL** and prone to
|
||||
be changed. You shouldn't be using this class yet.
|
||||
"""
|
||||
packet_codec = None
|
||||
obfuscated_io = MTProxyIO
|
||||
|
||||
# noinspection PyUnusedLocal
|
||||
def __init__(self, ip, port, dc_id, *, loggers, proxy=None, local_addr=None):
|
||||
# connect to proxy's host and port instead of telegram's ones
|
||||
proxy_host, proxy_port = self.address_info(proxy)
|
||||
self._secret = bytes.fromhex(proxy[2])
|
||||
super().__init__(
|
||||
proxy_host, proxy_port, dc_id, loggers=loggers)
|
||||
|
||||
async def _connect(self, timeout=None, ssl=None):
|
||||
await super()._connect(timeout=timeout, ssl=ssl)
|
||||
|
||||
# Wait for EOF for 2 seconds (or if _wait_for_data's definition
|
||||
# is missing or different, just sleep for 2 seconds). This way
|
||||
# we give the proxy a chance to close the connection if the current
|
||||
# codec (which the proxy detects with the data we sent) cannot
|
||||
# be used for this proxy. This is a work around for #1134.
|
||||
# TODO Sleeping for N seconds may not be the best solution
|
||||
# TODO This fix could be welcome for HTTP proxies as well
|
||||
try:
|
||||
await asyncio.wait_for(self._reader._wait_for_data('proxy'), 2)
|
||||
except asyncio.TimeoutError:
|
||||
pass
|
||||
except Exception:
|
||||
await asyncio.sleep(2)
|
||||
|
||||
if self._reader.at_eof():
|
||||
await self.disconnect()
|
||||
raise ConnectionError(
|
||||
'Proxy closed the connection after sending initial payload')
|
||||
|
||||
@staticmethod
|
||||
def address_info(proxy_info):
|
||||
if proxy_info is None:
|
||||
raise ValueError("No proxy info specified for MTProxy connection")
|
||||
return proxy_info[:2]
|
||||
|
||||
|
||||
class ConnectionTcpMTProxyAbridged(TcpMTProxy):
|
||||
"""
|
||||
Connect to proxy using abridged protocol
|
||||
"""
|
||||
packet_codec = AbridgedPacketCodec
|
||||
|
||||
|
||||
class ConnectionTcpMTProxyIntermediate(TcpMTProxy):
|
||||
"""
|
||||
Connect to proxy using intermediate protocol
|
||||
"""
|
||||
packet_codec = IntermediatePacketCodec
|
||||
|
||||
|
||||
class ConnectionTcpMTProxyRandomizedIntermediate(TcpMTProxy):
|
||||
"""
|
||||
Connect to proxy using randomized intermediate protocol (dd-secrets)
|
||||
"""
|
||||
packet_codec = RandomizedIntermediatePacketCodec
|
|
@ -1,62 +0,0 @@
|
|||
import os
|
||||
|
||||
from .tcpabridged import AbridgedPacketCodec
|
||||
from .connection import ObfuscatedConnection
|
||||
|
||||
from ..._crypto import AESModeCTR
|
||||
|
||||
|
||||
class ObfuscatedIO:
|
||||
header = None
|
||||
|
||||
def __init__(self, connection):
|
||||
self._reader = connection._reader
|
||||
self._writer = connection._writer
|
||||
|
||||
(self.header,
|
||||
self._encrypt,
|
||||
self._decrypt) = self.init_header(connection.packet_codec)
|
||||
|
||||
@staticmethod
|
||||
def init_header(packet_codec):
|
||||
# Obfuscated messages secrets cannot start with any of these
|
||||
keywords = (b'PVrG', b'GET ', b'POST', b'\xee\xee\xee\xee')
|
||||
while True:
|
||||
random = os.urandom(64)
|
||||
if (random[0] != 0xef and
|
||||
random[:4] not in keywords and
|
||||
random[4:8] != b'\0\0\0\0'):
|
||||
break
|
||||
|
||||
random = bytearray(random)
|
||||
random_reversed = random[55:7:-1] # Reversed (8, len=48)
|
||||
|
||||
# Encryption has "continuous buffer" enabled
|
||||
encrypt_key = bytes(random[8:40])
|
||||
encrypt_iv = bytes(random[40:56])
|
||||
decrypt_key = bytes(random_reversed[:32])
|
||||
decrypt_iv = bytes(random_reversed[32:48])
|
||||
|
||||
encryptor = AESModeCTR(encrypt_key, encrypt_iv)
|
||||
decryptor = AESModeCTR(decrypt_key, decrypt_iv)
|
||||
|
||||
random[56:60] = packet_codec.obfuscate_tag
|
||||
random[56:64] = encryptor.encrypt(bytes(random))[56:64]
|
||||
return (random, encryptor, decryptor)
|
||||
|
||||
async def readexactly(self, n):
|
||||
return self._decrypt.encrypt(await self._reader.readexactly(n))
|
||||
|
||||
def write(self, data):
|
||||
self._writer.write(self._encrypt.encrypt(data))
|
||||
|
||||
|
||||
class ConnectionTcpObfuscated(ObfuscatedConnection):
|
||||
"""
|
||||
Mode that Telegram defines as "obfuscated2". Encodes the packet
|
||||
just like `ConnectionTcpAbridged`, but encrypts every message with
|
||||
a randomly generated key using the AES-CTR mode so the packets are
|
||||
harder to discern.
|
||||
"""
|
||||
obfuscated_io = ObfuscatedIO
|
||||
packet_codec = AbridgedPacketCodec
|
4
telethon/_network/transports/__init__.py
Normal file
4
telethon/_network/transports/__init__.py
Normal file
|
@ -0,0 +1,4 @@
|
|||
from .transport import Transport
|
||||
from .abridged import Abridged
|
||||
from .full import Full
|
||||
from .intermediate import Intermediate
|
43
telethon/_network/transports/abridged.py
Normal file
43
telethon/_network/transports/abridged.py
Normal file
|
@ -0,0 +1,43 @@
|
|||
from .transport import Transport
|
||||
import struct
|
||||
|
||||
|
||||
class Abridged(Transport):
|
||||
def __init__(self):
|
||||
self._init = False
|
||||
|
||||
def recreate_fresh(self):
|
||||
return type(self)()
|
||||
|
||||
def pack(self, input: bytes) -> bytes:
|
||||
if self._init:
|
||||
header = b''
|
||||
else:
|
||||
header = b'\xef'
|
||||
self._init = True
|
||||
|
||||
length = len(data) >> 2
|
||||
if length < 127:
|
||||
length = struct.pack('B', length)
|
||||
else:
|
||||
length = b'\x7f' + int.to_bytes(length, 3, 'little')
|
||||
|
||||
return header + length + data
|
||||
|
||||
def unpack(self, input: bytes) -> (int, bytes):
|
||||
if len(input) < 4:
|
||||
raise EOFError()
|
||||
|
||||
length = input[0]
|
||||
if length < 127:
|
||||
offset = 1
|
||||
else:
|
||||
offset = 4
|
||||
length = struct.unpack('<i', input[1:4] + b'\0')[0]
|
||||
|
||||
length = (length << 2) + offset
|
||||
|
||||
if len(input) < length:
|
||||
raise EOFError()
|
||||
|
||||
return length, input[offset:length]
|
41
telethon/_network/transports/full.py
Normal file
41
telethon/_network/transports/full.py
Normal file
|
@ -0,0 +1,41 @@
|
|||
from .transport import Transport
|
||||
import struct
|
||||
from zlib import crc32
|
||||
|
||||
|
||||
class Full(Transport):
|
||||
def __init__(self):
|
||||
self._send_counter = 0
|
||||
self._recv_counter = 0
|
||||
|
||||
def recreate_fresh(self):
|
||||
return type(self)()
|
||||
|
||||
def pack(self, input: bytes) -> bytes:
|
||||
# https://core.telegram.org/mtproto#tcp-transport
|
||||
length = len(input) + 12
|
||||
data = struct.pack('<ii', length, self._send_counter) + input
|
||||
crc = struct.pack('<I', crc32(data))
|
||||
self._send_counter += 1
|
||||
return data + crc
|
||||
|
||||
def unpack(self, input: bytes) -> (int, bytes):
|
||||
if len(input) < 12:
|
||||
raise EOFError()
|
||||
|
||||
length, seq = struct.unpack('<ii', input[:8])
|
||||
if len(input) < length:
|
||||
raise EOFError()
|
||||
|
||||
if seq != self._recv_counter:
|
||||
raise ValueError(f'expected sequence value {self._recv_counter!r}, got {seq!r}')
|
||||
|
||||
body = input[8:length - 4]
|
||||
checksum = struct.unpack('<I', input[length - 4:length])[0]
|
||||
|
||||
valid_checksum = crc32(input[:length - 4])
|
||||
if checksum != valid_checksum:
|
||||
raise InvalidChecksumError(checksum, valid_checksum)
|
||||
|
||||
self._recv_counter += 1
|
||||
return length, body
|
29
telethon/_network/transports/intermediate.py
Normal file
29
telethon/_network/transports/intermediate.py
Normal file
|
@ -0,0 +1,29 @@
|
|||
from .transport import Transport
|
||||
import struct
|
||||
|
||||
|
||||
class Intermediate(Transport):
|
||||
def __init__(self):
|
||||
self._init = False
|
||||
|
||||
def recreate_fresh(self):
|
||||
return type(self)()
|
||||
|
||||
def pack(self, input: bytes) -> bytes:
|
||||
if self._init:
|
||||
header = b''
|
||||
else:
|
||||
header = b'\xee\xee\xee\xee'
|
||||
self._init = True
|
||||
|
||||
return header + struct.pack('<i', len(data)) + data
|
||||
|
||||
def unpack(self, input: bytes) -> (int, bytes):
|
||||
if len(input) < 4:
|
||||
raise EOFError()
|
||||
|
||||
length = struct.unpack('<i', input[:4])[0] + 4
|
||||
if len(input) < length:
|
||||
raise EOFError()
|
||||
|
||||
return length, input[4:length]
|
17
telethon/_network/transports/transport.py
Normal file
17
telethon/_network/transports/transport.py
Normal file
|
@ -0,0 +1,17 @@
|
|||
import abc
|
||||
|
||||
|
||||
class Transport(abc.ABC):
|
||||
# Should return a newly-created instance of itself
|
||||
@abc.abstractmethod
|
||||
def recreate_fresh(self):
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def pack(self, input: bytes) -> bytes:
|
||||
pass
|
||||
|
||||
# Should raise EOFError if it does not have enough bytes
|
||||
@abc.abstractmethod
|
||||
def unpack(self, input: bytes) -> (int, bytes):
|
||||
pass
|
Loading…
Reference in New Issue
Block a user