Make the Connection a proper ABC (#509)

This commit is contained in:
Lonami Exo 2018-05-10 14:22:19 +02:00
parent dd954b8fbd
commit ba4b7ce881
11 changed files with 254 additions and 331 deletions

View File

@ -1,7 +1,7 @@
import logging
from .telegram_bare_client import TelegramBareClient
from .telegram_client import TelegramClient
from .network import ConnectionMode
from .network import connection
from . import tl, version

View File

@ -5,4 +5,7 @@ with Telegram's servers and the protocol used (TCP full, abridged, etc.).
from .mtproto_plain_sender import MtProtoPlainSender
from .authenticator import do_authentication
from .mtproto_sender import MtProtoSender
from .connection import Connection, ConnectionMode
from .connection import (
ConnectionTcpFull, ConnectionTcpAbridged, ConnectionTcpObfuscated,
ConnectionTcpIntermediate
)

View File

@ -1,316 +0,0 @@
"""
This module holds both the Connection class and the ConnectionMode enum,
which specifies the protocol to be used by the Connection.
"""
import logging
import os
import struct
from datetime import timedelta
from zlib import crc32
from enum import Enum
import errno
from ..crypto import AESModeCTR
from ..extensions import TcpClient
from ..errors import InvalidChecksumError
__log__ = logging.getLogger(__name__)
class ConnectionMode(Enum):
"""Represents which mode should be used to stabilise a connection.
TCP_FULL: Default Telegram mode. Sends 12 additional bytes and
needs to calculate the CRC value of the packet itself.
TCP_INTERMEDIATE: Intermediate mode between TCP_FULL and TCP_ABRIDGED.
Always sends 4 extra bytes for the packet length.
TCP_ABRIDGED: This is the mode with the lowest overhead, as it will
only require 1 byte if the packet length is less than
508 bytes (127 << 2, which is very common).
TCP_OBFUSCATED: Encodes the packet just like TCP_ABRIDGED, but encrypts
every message with a randomly generated key using the
AES-CTR mode so the packets are harder to discern.
"""
TCP_FULL = 1
TCP_INTERMEDIATE = 2
TCP_ABRIDGED = 3
TCP_OBFUSCATED = 4
class Connection:
"""
Represents an abstract connection (TCP, TCP abridged...).
'mode' must be any of the ConnectionMode enumeration.
Note that '.send()' and '.recv()' refer to messages, which
will be packed accordingly, whereas '.write()' and '.read()'
work on plain bytes, with no further additions.
"""
def __init__(self, mode=ConnectionMode.TCP_FULL,
proxy=None, timeout=timedelta(seconds=5)):
"""
Initializes a new connection.
:param mode: the ConnectionMode to be used.
:param proxy: whether to use a proxy or not.
:param timeout: timeout to be used for all operations.
"""
self._mode = mode
self._send_counter = 0
self._aes_encrypt, self._aes_decrypt = None, None
# TODO Rename "TcpClient" as some sort of generic socket?
self.conn = TcpClient(proxy=proxy, timeout=timeout)
# Sending messages
if mode == ConnectionMode.TCP_FULL:
setattr(self, 'send', self._send_tcp_full)
setattr(self, 'recv', self._recv_tcp_full)
elif mode == ConnectionMode.TCP_INTERMEDIATE:
setattr(self, 'send', self._send_intermediate)
setattr(self, 'recv', self._recv_intermediate)
elif mode in (ConnectionMode.TCP_ABRIDGED,
ConnectionMode.TCP_OBFUSCATED):
setattr(self, 'send', self._send_abridged)
setattr(self, 'recv', self._recv_abridged)
# Writing and reading from the socket
if mode == ConnectionMode.TCP_OBFUSCATED:
setattr(self, 'write', self._write_obfuscated)
setattr(self, 'read', self._read_obfuscated)
else:
setattr(self, 'write', self._write_plain)
setattr(self, 'read', self._read_plain)
def connect(self, ip, port):
"""
Estabilishes a connection to IP:port.
:param ip: the IP to connect to.
:param port: the port to connect to.
"""
try:
self.conn.connect(ip, port)
except OSError as e:
if e.errno == errno.EISCONN:
return # Already connected, no need to re-set everything up
else:
raise
self._send_counter = 0
if self._mode == ConnectionMode.TCP_ABRIDGED:
self.conn.write(b'\xef')
elif self._mode == ConnectionMode.TCP_INTERMEDIATE:
self.conn.write(b'\xee\xee\xee\xee')
elif self._mode == ConnectionMode.TCP_OBFUSCATED:
self._setup_obfuscation()
def get_timeout(self):
"""Returns the timeout used by the connection."""
return self.conn.timeout
def _setup_obfuscation(self):
"""
Sets up the obfuscated protocol.
"""
# Obfuscated messages secrets cannot start with any of these
keywords = (b'PVrG', b'GET ', b'POST', b'\xee' * 4)
while True:
random = os.urandom(64)
if (random[0] != b'\xef' and
random[:4] not in keywords and
random[4:4] != b'\0\0\0\0'):
# Invalid random generated
break
random = list(random)
random[56] = random[57] = random[58] = random[59] = 0xef
random_reversed = random[55:7:-1] # Reversed (8, len=48)
# encryption has "continuous buffer" enabled
encrypt_key = bytes(random[8:40])
encrypt_iv = bytes(random[40:56])
decrypt_key = bytes(random_reversed[:32])
decrypt_iv = bytes(random_reversed[32:48])
self._aes_encrypt = AESModeCTR(encrypt_key, encrypt_iv)
self._aes_decrypt = AESModeCTR(decrypt_key, decrypt_iv)
random[56:64] = self._aes_encrypt.encrypt(bytes(random))[56:64]
self.conn.write(bytes(random))
def is_connected(self):
"""
Determines whether the connection is alive or not.
:return: true if it's connected.
"""
return self.conn.connected
def close(self):
"""Closes the connection."""
self.conn.close()
def clone(self):
"""Creates a copy of this Connection."""
return Connection(
mode=self._mode, proxy=self.conn.proxy, timeout=self.conn.timeout
)
# region Receive message implementations
def recv(self):
"""Receives and unpacks a message"""
# Default implementation is just an error
raise ValueError('Invalid connection mode specified: ' + str(self._mode))
def _recv_tcp_full(self):
"""
Receives a message from the network,
internally encoded using the TCP full protocol.
May raise InvalidChecksumError if the received data doesn't
match its valid checksum.
:return: the read message payload.
"""
packet_len_seq = self.read(8) # 4 and 4
packet_len, seq = struct.unpack('<ii', packet_len_seq)
body = self.read(packet_len - 12)
checksum = struct.unpack('<I', self.read(4))[0]
valid_checksum = crc32(packet_len_seq + body)
if checksum != valid_checksum:
raise InvalidChecksumError(checksum, valid_checksum)
return body
def _recv_intermediate(self):
"""
Receives a message from the network,
internally encoded using the TCP intermediate protocol.
:return: the read message payload.
"""
return self.read(struct.unpack('<i', self.read(4))[0])
def _recv_abridged(self):
"""
Receives a message from the network,
internally encoded using the TCP abridged protocol.
:return: the read message payload.
"""
length = struct.unpack('<B', self.read(1))[0]
if length >= 127:
length = struct.unpack('<i', self.read(3) + b'\0')[0]
return self.read(length << 2)
# endregion
# region Send message implementations
def send(self, message):
"""Encapsulates and sends the given message"""
# Default implementation is just an error
raise ValueError('Invalid connection mode specified: ' + str(self._mode))
def _send_tcp_full(self, message):
"""
Encapsulates and sends the given message payload
using the TCP full mode (length, sequence, message, crc32).
:param message: the message to be sent.
"""
# https://core.telegram.org/mtproto#tcp-transport
# total length, sequence number, packet and checksum (CRC32)
length = len(message) + 12
data = struct.pack('<ii', length, self._send_counter) + message
crc = struct.pack('<I', crc32(data))
self._send_counter += 1
self.write(data + crc)
def _send_intermediate(self, message):
"""
Encapsulates and sends the given message payload
using the TCP intermediate mode (length, message).
:param message: the message to be sent.
"""
self.write(struct.pack('<i', len(message)) + message)
def _send_abridged(self, message):
"""
Encapsulates and sends the given message payload
using the TCP abridged mode (short length, message).
:param message: the message to be sent.
"""
length = len(message) >> 2
if length < 127:
length = struct.pack('B', length)
else:
length = b'\x7f' + int.to_bytes(length, 3, 'little')
self.write(length + message)
# endregion
# region Read implementations
def read(self, length):
raise ValueError('Invalid connection mode specified: ' + str(self._mode))
def _read_plain(self, length):
"""
Reads data from the socket connection.
:param length: how many bytes should be read.
:return: a byte sequence with len(data) == length
"""
return self.conn.read(length)
def _read_obfuscated(self, length):
"""
Reads data and decrypts from the socket connection.
:param length: how many bytes should be read.
:return: the decrypted byte sequence with len(data) == length
"""
return self._aes_decrypt.encrypt(
self.conn.read(length)
)
# endregion
# region Write implementations
def write(self, data):
raise ValueError('Invalid connection mode specified: ' + str(self._mode))
def _write_plain(self, data):
"""
Writes the given data through the socket connection.
:param data: the data in bytes to be written.
"""
self.conn.write(data)
def _write_obfuscated(self, data):
"""
Writes the given data through the socket connection,
using the obfuscated mode (AES encryption is applied on top).
:param data: the data in bytes to be written.
"""
self.conn.write(self._aes_encrypt.encrypt(data))
# endregion

View File

@ -0,0 +1,4 @@
from .tcpfull import ConnectionTcpFull
from .tcpabridged import ConnectionTcpAbridged
from .tcpobfuscated import ConnectionTcpObfuscated
from .tcpintermediate import ConnectionTcpIntermediate

View File

@ -0,0 +1,57 @@
"""
This module holds the abstract `Connection` class.
"""
import abc
from datetime import timedelta
class Connection(abc.ABC):
"""
Represents an abstract connection for Telegram.
Subclasses should implement the actual protocol
being used when encoding/decoding messages.
"""
def __init__(self, proxy=None, timeout=timedelta(seconds=5)):
"""
Initializes a new connection.
:param proxy: whether to use a proxy or not.
:param timeout: timeout to be used for all operations.
"""
self._proxy = proxy
self._timeout = timeout
@abc.abstractmethod
def get_timeout(self):
"""Returns the timeout used by the connection."""
raise NotImplementedError
@abc.abstractmethod
def is_connected(self):
"""
Determines whether the connection is alive or not.
:return: true if it's connected.
"""
raise NotImplementedError
@abc.abstractmethod
def close(self):
"""Closes the connection."""
raise NotImplementedError
@abc.abstractmethod
def clone(self):
"""Creates a copy of this Connection."""
raise NotImplementedError
@abc.abstractmethod
def recv(self):
"""Receives and unpacks a message"""
raise NotImplementedError
@abc.abstractmethod
def send(self, message):
"""Encapsulates and sends the given message"""
raise NotImplementedError

View File

@ -0,0 +1,34 @@
import struct
from .tcpfull import ConnectionTcpFull
class ConnectionTcpAbridged(ConnectionTcpFull):
"""
This is the mode with the lowest overhead, as it will
only require 1 byte if the packet length is less than
508 bytes (127 << 2, which is very common).
"""
def connect(self, ip, port):
result = super().connect(ip, port)
self.conn.write(b'\xef')
return result
def clone(self):
return ConnectionTcpAbridged(self._proxy, self._timeout)
def recv(self):
length = struct.unpack('<B', self.read(1))[0]
if length >= 127:
length = struct.unpack('<i', self.read(3) + b'\0')[0]
return self.read(length << 2)
def send(self, message):
length = len(message) >> 2
if length < 127:
length = struct.pack('B', length)
else:
length = b'\x7f' + int.to_bytes(length, 3, 'little')
self.write(length + message)

View File

@ -0,0 +1,65 @@
import errno
import struct
from datetime import timedelta
from zlib import crc32
from .common import Connection
from ...errors import InvalidChecksumError
from ...extensions import TcpClient
class ConnectionTcpFull(Connection):
"""
Default Telegram mode. Sends 12 additional bytes and
needs to calculate the CRC value of the packet itself.
"""
def __init__(self, proxy=None, timeout=timedelta(seconds=5)):
super().__init__(proxy, timeout)
self._send_counter = 0
self.conn = TcpClient(proxy=self._proxy, timeout=self._timeout)
self.read = self.conn.read
self.write = self.conn.write
def connect(self, ip, port):
try:
self.conn.connect(ip, port)
except OSError as e:
if e.errno == errno.EISCONN:
return # Already connected, no need to re-set everything up
else:
raise
self._send_counter = 0
def get_timeout(self):
return self.conn.timeout
def is_connected(self):
return self.conn.connected
def close(self):
self.conn.close()
def clone(self):
return ConnectionTcpFull(self._proxy, self._timeout)
def recv(self):
packet_len_seq = self.read(8) # 4 and 4
packet_len, seq = struct.unpack('<ii', packet_len_seq)
body = self.read(packet_len - 12)
checksum = struct.unpack('<I', self.read(4))[0]
valid_checksum = crc32(packet_len_seq + body)
if checksum != valid_checksum:
raise InvalidChecksumError(checksum, valid_checksum)
return body
def send(self, message):
# https://core.telegram.org/mtproto#tcp-transport
# total length, sequence number, packet and checksum (CRC32)
length = len(message) + 12
data = struct.pack('<ii', length, self._send_counter) + message
crc = struct.pack('<I', crc32(data))
self._send_counter += 1
self.write(data + crc)

View File

@ -0,0 +1,23 @@
import struct
from .tcpfull import ConnectionTcpFull
class ConnectionTcpIntermediate(ConnectionTcpFull):
"""
Intermediate mode between `ConnectionTcpFull` and `ConnectionTcpAbridged`.
Always sends 4 extra bytes for the packet length.
"""
def connect(self, ip, port):
result = super().connect(ip, port)
self.conn.write(b'\xee\xee\xee\xee')
return result
def clone(self):
return ConnectionTcpIntermediate(self._proxy, self._timeout)
def recv(self):
return self.read(struct.unpack('<i', self.read(4))[0])
def send(self, message):
self.write(struct.pack('<i', len(message)) + message)

View File

@ -0,0 +1,50 @@
import os
from datetime import timedelta
from .tcpfull import ConnectionTcpFull
from .tcpabridged import ConnectionTcpAbridged
from ...crypto import AESModeCTR
class ConnectionTcpObfuscated(ConnectionTcpAbridged):
"""
Encodes the packet just like `ConnectionTcpAbridged`, but encrypts
every message with a randomly generated key using the
AES-CTR mode so the packets are harder to discern.
"""
def __init__(self, proxy=None, timeout=timedelta(seconds=5)):
super().__init__(proxy, timeout)
self._aes_encrypt, self._aes_decrypt = None, None
self.read = lambda s: self._aes_decrypt.encrypt(self.conn.read(s))
self.write = lambda d: self.conn.write(self._aes_encrypt.encrypt(d))
def connect(self, ip, port):
result = ConnectionTcpFull.connect(self, ip, port)
# Obfuscated messages secrets cannot start with any of these
keywords = (b'PVrG', b'GET ', b'POST', b'\xee' * 4)
while True:
random = os.urandom(64)
if (random[0] != b'\xef' and
random[:4] not in keywords and
random[4:4] != b'\0\0\0\0'):
break
random = list(random)
random[56] = random[57] = random[58] = random[59] = 0xef
random_reversed = random[55:7:-1] # Reversed (8, len=48)
# encryption has "continuous buffer" enabled
encrypt_key = bytes(random[8:40])
encrypt_iv = bytes(random[40:56])
decrypt_key = bytes(random_reversed[:32])
decrypt_iv = bytes(random_reversed[32:48])
self._aes_encrypt = AESModeCTR(encrypt_key, encrypt_iv)
self._aes_decrypt = AESModeCTR(decrypt_key, decrypt_iv)
random[56:64] = self._aes_encrypt.encrypt(bytes(random))[56:64]
self.conn.write(bytes(random))
return result
def clone(self):
return ConnectionTcpObfuscated(self._proxy, self._timeout)

View File

@ -14,7 +14,7 @@ from .errors import (
PhoneMigrateError, NetworkMigrateError, UserMigrateError, AuthKeyError,
RpcCallFailError
)
from .network import authenticator, MtProtoSender, Connection, ConnectionMode
from .network import authenticator, MtProtoSender, ConnectionTcpFull
from .sessions import Session, SQLiteSession
from .tl import TLObject
from .tl.all_tlobjects import LAYER
@ -68,7 +68,8 @@ class TelegramBareClient:
# region Initialization
def __init__(self, session, api_id, api_hash,
connection_mode=ConnectionMode.TCP_FULL,
*,
connection=ConnectionTcpFull,
use_ipv6=False,
proxy=None,
update_workers=None,
@ -114,9 +115,10 @@ class TelegramBareClient:
# that calls .connect(). Every other thread will spawn a new
# temporary connection. The connection on this one is always
# kept open so Telegram can send us updates.
self._sender = MtProtoSender(self.session, Connection(
mode=connection_mode, proxy=proxy, timeout=timeout
))
if isinstance(connection, type):
connection = connection(proxy=proxy, timeout=timeout)
self._sender = MtProtoSender(self.session, connection)
# Two threads may be calling reconnect() when the connection is lost,
# we only want one to actually perform the reconnection.

View File

@ -45,7 +45,7 @@ from .errors import (
SessionPasswordNeededError, FileMigrateError, PhoneNumberUnoccupiedError,
PhoneNumberOccupiedError, UsernameNotOccupiedError
)
from .network import ConnectionMode
from .network import ConnectionTcpFull
from .tl.custom import Draft, Dialog
from .tl.functions.account import (
GetPasswordRequest, UpdatePasswordSettingsRequest
@ -122,11 +122,11 @@ class TelegramClient(TelegramBareClient):
api_hash (`str`):
The API ID you obtained from https://my.telegram.org.
connection_mode (`ConnectionMode`, optional):
The connection mode to be used when creating a new connection
to the servers. Defaults to the ``TCP_FULL`` mode.
This will only affect how messages are sent over the network
and how much processing is required before sending them.
connection (`telethon.network.connection.common.Connection`, optional):
The connection instance to be used when creating a new connection
to the servers. If it's a type, the `proxy` argument will be used.
Defaults to `telethon.network.connection.tcpfull.ConnectionTcpFull`.
use_ipv6 (`bool`, optional):
Whether to connect to the servers through IPv6 or not.
@ -174,7 +174,8 @@ class TelegramClient(TelegramBareClient):
# region Initialization
def __init__(self, session, api_id, api_hash,
connection_mode=ConnectionMode.TCP_FULL,
*,
connection=ConnectionTcpFull,
use_ipv6=False,
proxy=None,
update_workers=None,
@ -184,7 +185,7 @@ class TelegramClient(TelegramBareClient):
**kwargs):
super().__init__(
session, api_id, api_hash,
connection_mode=connection_mode,
connection=connection,
use_ipv6=use_ipv6,
proxy=proxy,
update_workers=update_workers,