Rework class hierarchy, try to DRY more

This commit is contained in:
Сергей Прохоров 2019-03-12 01:12:55 +01:00
parent b873aa67cc
commit 4696dfc25e
No known key found for this signature in database
GPG Key ID: 1C570244E4EF3337
7 changed files with 248 additions and 218 deletions

View File

@ -18,6 +18,10 @@ class Connection(abc.ABC):
``ConnectionError``, which will raise when attempting to send if
the client is disconnected (includes remote disconnections).
"""
# this static attribute should be redefined by `Connection` subclasses and
# should be one of `PacketCodec` implementations
packet_codec = None
def __init__(self, ip, port, dc_id, *, loop, loggers, proxy=None):
self._ip = ip
self._port = port
@ -78,6 +82,7 @@ class Connection(abc.ABC):
await asyncio.open_connection(sock=s, loop=self._loop)
self._connected = True
self._codec = self.packet_codec(self)
self._init_conn()
await self._writer.drain()
@ -184,27 +189,71 @@ class Connection(abc.ABC):
data to Telegram to indicate which connection mode will
be used.
"""
if self._codec.tag:
self._writer.write(self._codec.tag)
@abc.abstractmethod
def _send(self, data):
"""
This method should be implemented differently under each
connection mode and serialize the data into the packet
the way it should be sent through `self._writer`.
"""
raise NotImplementedError
self._writer.write(self._codec.encode_packet(data))
@abc.abstractmethod
async def _recv(self):
"""
This method should be implemented differently under each
connection mode and deserialize the data from the packet
the way it should be read from `self._reader`.
"""
raise NotImplementedError
return await self._codec.read_packet(self._reader)
def __str__(self):
return '{}:{}/{}'.format(
self._ip, self._port,
self.__class__.__name__.replace('Connection', '')
)
class ObfuscatedConnection(Connection):
"""
Base class for "obfuscated" connections ("obfuscated2", "mtproto proxy")
"""
"""
This attribute should be redefined by subclasses
"""
obfuscated_io = None
def _init_conn(self):
self._obfuscation = self.obfuscated_io(self)
self._writer.write(self._obfuscation.header)
def _send(self, data):
self._obfuscation.write(self._codec.encode_packet(data))
async def _recv(self):
return await self._codec.read_packet(self._obfuscation)
class PacketCodec(abc.ABC):
"""
Base class for packet codecs
"""
"""
This attribute should be re-defined by subclass to define if some
"magic bytes" should be sent to server right after conection is made to
signal which protocol will be used
"""
tag = None
def __init__(self, connection):
"""
Codec is created when connection is just made.
"""
pass
@abc.abstractmethod
def encode_packet(self, data):
"""
Encodes single packet and returns encoded bytes.
"""
raise NotImplementedError
@abc.abstractmethod
async def read_packet(self, reader):
"""
Reads single packet from `reader` object that should have
`readexactly(n)` method.
"""
raise NotImplementedError

View File

@ -1,34 +1,43 @@
import asyncio
from .connection import Connection
from .connection import Connection, PacketCodec
SSL_PORT = 443
class ConnectionHttp(Connection):
async def connect(self, timeout=None, ssl=None):
await super().connect(timeout=timeout, ssl=self._port == SSL_PORT)
class HttpPacketCodec(PacketCodec):
tag = None
obfuscate_tag = None
def _send(self, message):
self._writer.write(
'POST /api HTTP/1.1\r\n'
'Host: {}:{}\r\n'
'Content-Type: application/x-www-form-urlencoded\r\n'
'Connection: keep-alive\r\n'
'Keep-Alive: timeout=100000, max=10000000\r\n'
'Content-Length: {}\r\n\r\n'
.format(self._ip, self._port, len(message))
.encode('ascii') + message
)
def __init__(self, connection):
self._ip = connection._ip
self._port = connection._port
async def _recv(self):
def encode_packet(self, data):
return ('POST /api HTTP/1.1\r\n'
'Host: {}:{}\r\n'
'Content-Type: application/x-www-form-urlencoded\r\n'
'Connection: keep-alive\r\n'
'Keep-Alive: timeout=100000, max=10000000\r\n'
'Content-Length: {}\r\n\r\n'
.format(self._ip, self._port, len(data))
.encode('ascii') + data)
async def read_packet(self, reader):
while True:
line = await self._reader.readline()
line = await reader.readline()
if not line or line[-1] != b'\n':
raise asyncio.IncompleteReadError(line, None)
if line.lower().startswith(b'content-length: '):
await self._reader.readexactly(2)
await reader.readexactly(2)
length = int(line[16:-2])
return await self._reader.readexactly(length)
return await reader.readexactly(length)
class ConnectionHttp(Connection):
packet_codec = HttpPacketCodec
async def connect(self, timeout=None, ssl=None):
await super().connect(timeout=timeout, ssl=self._port == SSL_PORT)

View File

@ -1,31 +1,11 @@
import struct
from .connection import Connection
from .connection import Connection, PacketCodec
class ConnectionTcpAbridged(Connection):
"""
This is the mode with the lowest overhead, as it will
only require 1 byte if the packet length is less than
508 bytes (127 << 2, which is very common).
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._codec = AbridgedPacket()
def _init_conn(self):
self._writer.write(self._codec.tag)
def _send(self, data):
self._writer.write(self._codec.encode_packet(data))
async def _recv(self):
return await self._codec.read_packet(self._reader)
class AbridgedPacket:
class AbridgedPacketCodec(PacketCodec):
tag = b'\xef'
mtproto_proxy_tag = b'\xef\xef\xef\xef'
obfuscate_tag = b'\xef\xef\xef\xef'
def encode_packet(self, data):
length = len(data) >> 2
@ -42,3 +22,12 @@ class AbridgedPacket:
'<i', await reader.readexactly(3) + b'\0')[0]
return await reader.readexactly(length << 2)
class ConnectionTcpAbridged(Connection):
"""
This is the mode with the lowest overhead, as it will
only require 1 byte if the packet length is less than
508 bytes (127 << 2, which is very common).
"""
packet_codec = AbridgedPacketCodec

View File

@ -1,36 +1,29 @@
import struct
from zlib import crc32
from .connection import Connection
from .connection import Connection, PacketCodec
from ...errors import InvalidChecksumError
class ConnectionTcpFull(Connection):
"""
Default Telegram mode. Sends 12 additional bytes and
needs to calculate the CRC value of the packet itself.
"""
def __init__(self, ip, port, dc_id, *, loop, loggers, proxy=None):
super().__init__(
ip, port, dc_id, loop=loop, loggers=loggers, proxy=proxy)
self._send_counter = 0
class FullPacketCodec(PacketCodec):
tag = None
def _init_conn(self):
def __init__(self, _conn):
self._send_counter = 0 # Important or Telegram won't reply
def _send(self, data):
def encode_packet(self, data):
# https://core.telegram.org/mtproto#tcp-transport
# total length, sequence number, packet and checksum (CRC32)
length = len(data) + 12
data = struct.pack('<ii', length, self._send_counter) + data
crc = struct.pack('<I', crc32(data))
self._send_counter += 1
self._writer.write(data + crc)
return data + crc
async def _recv(self):
packet_len_seq = await self._reader.readexactly(8) # 4 and 4
async def read_packet(self, reader):
packet_len_seq = await reader.readexactly(8) # 4 and 4
packet_len, seq = struct.unpack('<ii', packet_len_seq)
body = await self._reader.readexactly(packet_len - 8)
body = await reader.readexactly(packet_len - 8)
checksum = struct.unpack('<I', body[-4:])[0]
body = body[:-4]
@ -39,3 +32,11 @@ class ConnectionTcpFull(Connection):
raise InvalidChecksumError(checksum, valid_checksum)
return body
class ConnectionTcpFull(Connection):
"""
Default Telegram mode. Sends 12 additional bytes and
needs to calculate the CRC value of the packet itself.
"""
packet_codec = FullPacketCodec

View File

@ -2,31 +2,12 @@ import struct
import random
import os
from .connection import Connection
from .connection import Connection, PacketCodec
class ConnectionTcpIntermediate(Connection):
"""
Intermediate mode between `ConnectionTcpFull` and `ConnectionTcpAbridged`.
Always sends 4 extra bytes for the packet length.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._codec = IntermediatePacket()
def _init_conn(self):
self._writer.write(self._codec.tag)
def _send(self, data):
self._writer.write(self._codec.encode_packet(data))
async def _recv(self):
return await self.codec.read_packet(self._reader)
class IntermediatePacket:
class IntermediatePacketCodec(PacketCodec):
tag = b'\xee\xee\xee\xee'
mtproto_proxy_tag = tag
obfuscate_tag = tag
def encode_packet(self, data):
return struct.pack('<i', len(data)) + data
@ -36,12 +17,13 @@ class IntermediatePacket:
return await reader.readexactly(length)
class RandomizedIntermediatePacket(IntermediatePacket):
class RandomizedIntermediatePacketCodec(IntermediatePacketCodec):
"""
Data packets are aligned to 4bytes. This codec adds random bytes of size
from 0 to 3 bytes, which are ignored by decoder.
"""
mtproto_proxy_tag = b'\xdd\xdd\xdd\xdd'
tag = None
obfuscate_tag = b'\xdd\xdd\xdd\xdd'
def encode_packet(self, data):
pad_size = random.randint(0, 3)
@ -54,3 +36,11 @@ class RandomizedIntermediatePacket(IntermediatePacket):
if pad_size > 0:
return packet_with_padding[:-pad_size]
return packet_with_padding
class ConnectionTcpIntermediate(Connection):
"""
Intermediate mode between `ConnectionTcpFull` and `ConnectionTcpAbridged`.
Always sends 4 extra bytes for the packet length.
"""
packet_codec = IntermediatePacketCodec

View File

@ -1,84 +1,16 @@
import hashlib
import os
from .connection import Connection
from .tcpabridged import AbridgedPacket
from .tcpintermediate import IntermediatePacket, RandomizedIntermediatePacket
from .connection import ObfuscatedConnection
from .tcpabridged import AbridgedPacketCodec
from .tcpintermediate import (
IntermediatePacketCodec,
RandomizedIntermediatePacketCodec
)
from ...crypto import AESModeCTR
class TcpMTProxy(Connection):
"""
Connector which allows user to connect to the Telegram via proxy servers
commonly known as MTProxy.
Implemented very ugly due to the leaky abstractions in Telethon networking
classes that should be refactored later (TODO).
.. warning::
The support for MTProtoProxies class is **EXPERIMENTAL** and prone to
be changed. You shouldn't be using this class yet.
"""
packet_codec = None
@staticmethod
def address_info(proxy_info):
if proxy_info is None:
raise ValueError("No proxy info specified for MTProxy connection")
return proxy_info[:2]
def __init__(self, ip, port, dc_id, *, loop, loggers, proxy=None):
proxy_host, proxy_port = self.address_info(proxy)
super().__init__(
proxy_host, proxy_port, dc_id, loop=loop, loggers=loggers)
self._codec = self.packet_codec()
secret = bytes.fromhex(proxy[2])
is_dd = (len(secret) == 17) and (secret[0] == 0xDD)
if is_dd and (self.packet_codec != RandomizedIntermediatePacket):
raise ValueError(
"Only RandomizedIntermediate can be used with dd-secrets")
secret = secret[:-1] if is_dd else secret
if len(secret) != 16:
raise ValueError(
"MTProxy secret must be a hex-string representing 16 bytes")
self._dc_id = dc_id
self._secret = secret
def _init_conn(self):
self._obfuscation = MTProxyIO(self._reader, self._writer,
self._codec.mtproto_proxy_tag,
self._secret, self._dc_id)
self._writer.write(self._obfuscation.header)
def _send(self, data):
self._obfuscation.write(self._codec.encode_packet(data))
async def _recv(self):
return await self._codec.read_packet(self._obfuscation)
class ConnectionTcpMTProxyAbridged(TcpMTProxy):
"""
Connect to proxy using abridged protocol
"""
packet_codec = AbridgedPacket
class ConnectionTcpMTProxyIntermediate(TcpMTProxy):
"""
Connect to proxy using intermediate protocol
"""
packet_codec = IntermediatePacket
class ConnectionTcpMTProxyRandomizedIntermediate(TcpMTProxy):
"""
Connect to proxy using randomized intermediate protocol (dd-secrets)
"""
packet_codec = RandomizedIntermediatePacket
class MTProxyIO:
"""
It's very similar to tcpobfuscated.ObfuscatedIO, but the way
@ -86,9 +18,28 @@ class MTProxyIO:
"""
header = None
def __init__(self, reader, writer, protocol_tag, secret, dc_id):
self._reader = reader
self._writer = writer
def __init__(self, connection):
self._reader = connection._reader
self._writer = connection._writer
(self.header,
self._encrypt,
self._decrypt) = self.init_header(
connection._secret, connection._dc_id, connection.packet_codec)
def init_header(self, secret, dc_id, packet_codec):
# Validate
is_dd = (len(secret) == 17) and (secret[0] == 0xDD)
is_rand_codec = (
packet_codec == RandomizedIntermediatePacketCodec)
if is_dd and not is_rand_codec:
raise ValueError(
"Only RandomizedIntermediate can be used with dd-secrets")
secret = secret[:-1] if is_dd else secret
if len(secret) != 16:
raise ValueError(
"MTProxy secret must be a hex-string representing 16 bytes")
# Obfuscated messages secrets cannot start with any of these
keywords = (b'PVrG', b'GET ', b'POST', b'\xee\xee\xee\xee')
while True:
@ -109,19 +60,68 @@ class MTProxyIO:
bytes(random_reversed[:32]) + secret).digest()
decrypt_iv = bytes(random_reversed[32:48])
self._aes_encrypt = AESModeCTR(encrypt_key, encrypt_iv)
self._aes_decrypt = AESModeCTR(decrypt_key, decrypt_iv)
encryptor = AESModeCTR(encrypt_key, encrypt_iv)
decryptor = AESModeCTR(decrypt_key, decrypt_iv)
random[56:60] = protocol_tag
random[56:60] = packet_codec.obfuscate_tag
dc_id_bytes = dc_id.to_bytes(2, "little", signed=True)
random = random[:60] + dc_id_bytes + random[62:]
random[56:64] = self._aes_encrypt.encrypt(bytes(random))[56:64]
self.header = random
random[56:64] = encryptor.encrypt(bytes(random))[56:64]
return (random, encryptor, decryptor)
async def readexactly(self, n):
return self._aes_decrypt.encrypt(await self._reader.readexactly(n))
return self._decrypt.encrypt(await self._reader.readexactly(n))
def write(self, data):
self._writer.write(self._aes_encrypt.encrypt(data))
self._writer.write(self._encrypt.encrypt(data))
class TcpMTProxy(ObfuscatedConnection):
"""
Connector which allows user to connect to the Telegram via proxy servers
commonly known as MTProxy.
Implemented very ugly due to the leaky abstractions in Telethon networking
classes that should be refactored later (TODO).
.. warning::
The support for TcpMTProxy classes is **EXPERIMENTAL** and prone to
be changed. You shouldn't be using this class yet.
"""
packet_codec = None
obfuscated_io = MTProxyIO
@staticmethod
def address_info(proxy_info):
if proxy_info is None:
raise ValueError("No proxy info specified for MTProxy connection")
return proxy_info[:2]
def __init__(self, ip, port, dc_id, *, loop, loggers, proxy=None):
# connect to proxy's host and port instead of telegram's ones
proxy_host, proxy_port = self.address_info(proxy)
self._secret = bytes.fromhex(proxy[2])
super().__init__(
proxy_host, proxy_port, dc_id, loop=loop, loggers=loggers)
class ConnectionTcpMTProxyAbridged(TcpMTProxy):
"""
Connect to proxy using abridged protocol
"""
packet_codec = AbridgedPacketCodec
class ConnectionTcpMTProxyIntermediate(TcpMTProxy):
"""
Connect to proxy using intermediate protocol
"""
packet_codec = IntermediatePacketCodec
class ConnectionTcpMTProxyRandomizedIntermediate(TcpMTProxy):
"""
Connect to proxy using randomized intermediate protocol (dd-secrets)
"""
packet_codec = RandomizedIntermediatePacketCodec

View File

@ -1,41 +1,23 @@
import os
from .tcpabridged import AbridgedPacket
from .connection import Connection
from .tcpabridged import AbridgedPacketCodec
from .connection import ObfuscatedConnection
from ...crypto import AESModeCTR
class ConnectionTcpObfuscated(Connection):
"""
Mode that Telegram defines as "obfuscated2". Encodes the packet
just like `ConnectionTcpAbridged`, but encrypts every message with
a randomly generated key using the AES-CTR mode so the packets are
harder to discern.
"""
def __init__(self, ip, port, dc_id, *, loop, loggers, proxy=None):
super().__init__(
ip, port, dc_id, loop=loop, loggers=loggers, proxy=proxy)
self._codec = AbridgedPacket()
def _init_conn(self):
self._obfuscation = ObfuscatedIO(
self._reader, self._writer, self._codec.mtproto_proxy_tag)
self._writer.write(self._obfuscation.header)
def _send(self, data):
self._obfuscation.write(self._codec.encode_packet(data))
async def _recv(self):
return await self._codec.read_packet(self._obfuscation)
class ObfuscatedIO:
header = None
def __init__(self, reader, writer, protocol_tag):
self._reader = reader
self._writer = writer
def __init__(self, connection):
self._reader = connection._reader
self._writer = connection._writer
(self.header,
self._encrypt,
self._decrypt) = self.init_header(connection.packet_codec)
def init_header(self, packet_codec):
# Obfuscated messages secrets cannot start with any of these
keywords = (b'PVrG', b'GET ', b'POST', b'\xee\xee\xee\xee')
while True:
@ -54,16 +36,26 @@ class ObfuscatedIO:
decrypt_key = bytes(random_reversed[:32])
decrypt_iv = bytes(random_reversed[32:48])
self._aes_encrypt = AESModeCTR(encrypt_key, encrypt_iv)
self._aes_decrypt = AESModeCTR(decrypt_key, decrypt_iv)
encryptor = AESModeCTR(encrypt_key, encrypt_iv)
decryptor = AESModeCTR(decrypt_key, decrypt_iv)
random[56:60] = protocol_tag
random[56:64] = self._aes_encrypt.encrypt(bytes(random))[56:64]
self.header = random
random[56:60] = packet_codec.obfuscate_tag
random[56:64] = encryptor.encrypt(bytes(random))[56:64]
return (random, encryptor, decryptor)
async def readexactly(self, n):
return self._aes_decrypt.encrypt(await self._reader.readexactly(n))
return self._decrypt.encrypt(await self._reader.readexactly(n))
def write(self, data):
self._writer.write(self._aes_encrypt.encrypt(data))
self._writer.write(self._encrypt.encrypt(data))
class ConnectionTcpObfuscated(ObfuscatedConnection):
"""
Mode that Telegram defines as "obfuscated2". Encodes the packet
just like `ConnectionTcpAbridged`, but encrypts every message with
a randomly generated key using the AES-CTR mode so the packets are
harder to discern.
"""
obfuscated_io = ObfuscatedIO
packet_codec = AbridgedPacketCodec