diff --git a/crypto/aes.py b/crypto/aes.py index 691eb8a6..cf442617 100644 --- a/crypto/aes.py +++ b/crypto/aes.py @@ -35,7 +35,7 @@ class AES: @staticmethod def encrypt_ige(plain_text, key, iv): """Encrypts the given text in 16-bytes blocks by using the given key and 32-bytes initialization vector""" - # TODO: Random padding + # TODO: Random padding? if len(plain_text) % 16 != 0: # Add padding if and only if it's not evenly divisible by 16 already padding = bytes(16 - len(plain_text) % 16) plain_text += padding diff --git a/crypto/auth_key.py b/crypto/auth_key.py index be784f2d..da4827c6 100755 --- a/crypto/auth_key.py +++ b/crypto/auth_key.py @@ -1,18 +1,13 @@ # This file is based on TLSharp # https://github.com/sochix/TLSharp/blob/master/TLSharp.Core/MTProto/Crypto/AuthKey.cs -import utils from errors import * from utils import BinaryWriter, BinaryReader +import utils class AuthKey: - def __init__(self, gab=None, data=None): - if gab: - self.key = utils.get_byte_array(gab, signed=False) - elif data: - self.key = data - else: - raise InvalidParameterError('Either a gab integer or data bytes array must be provided') + def __init__(self, data): + self.key = data with BinaryReader(utils.sha1(self.key)) as reader: self.aux_hash = reader.read_long(signed=False) @@ -26,5 +21,5 @@ class AuthKey: writer.write_byte(number) writer.write_long(self.aux_hash, signed=False) - new_nonce_hash = utils.sha1(writer.get_bytes())[4:20] + new_nonce_hash = utils.calc_msg_key(writer.get_bytes()) return new_nonce_hash diff --git a/crypto/rsa.py b/crypto/rsa.py index 1578764c..4c6cc8ec 100755 --- a/crypto/rsa.py +++ b/crypto/rsa.py @@ -26,16 +26,13 @@ class RSAServerKey: if length < 235: writer.write(utils.generate_random_bytes(235 - length)) - cipher_text = utils.get_byte_array( - pow(int.from_bytes(writer.get_bytes(), byteorder='big'), self.e, self.m), - signed=False) + result = int.from_bytes(writer.get_bytes(), byteorder='big') + result = pow(result, self.e, self.m) - if len(cipher_text) == 256: - return cipher_text - - else: - padding = bytes(256 - len(cipher_text)) - return padding + cipher_text + # If the result byte count is less than 256, since the byte order is big, + # the non-used bytes on the left will be 0 and act as padding, + # without need of any additional checks + return int.to_bytes(result, length=256, byteorder='big', signed=False) class RSA: diff --git a/network/__init__.py b/network/__init__.py index ea02bd9a..c9c23c26 100755 --- a/network/__init__.py +++ b/network/__init__.py @@ -1,6 +1,5 @@ from .mtproto_plain_sender import MtProtoPlainSender from .tcp_client import TcpClient -from .tcp_message import TcpMessage from .authenticator import do_authentication from .mtproto_sender import MtProtoSender from .tcp_transport import TcpTransport diff --git a/network/authenticator.py b/network/authenticator.py index 608f006d..67cf25e3 100755 --- a/network/authenticator.py +++ b/network/authenticator.py @@ -37,9 +37,7 @@ def do_authentication(transport): server_nonce = reader.read(16) pq_bytes = reader.tgread_bytes() - # "string pq is a representation of a natural number (in binary big endian format)" - # See https://core.telegram.org/mtproto/auth_key#dh-exchange-initiation - pq = int.from_bytes(pq_bytes, byteorder='big') + pq = get_int(pq_bytes) vector_id = reader.read_int() if vector_id != 0x1cb5c415: @@ -55,9 +53,9 @@ def do_authentication(transport): p, q = Factorizator.factorize(pq) with BinaryWriter() as pq_inner_data_writer: pq_inner_data_writer.write_int(0x83c95aec, signed=False) # PQ Inner Data - pq_inner_data_writer.tgwrite_bytes(utils.get_byte_array(pq, signed=False)) - pq_inner_data_writer.tgwrite_bytes(utils.get_byte_array(min(p, q), signed=False)) - pq_inner_data_writer.tgwrite_bytes(utils.get_byte_array(max(p, q), signed=False)) + pq_inner_data_writer.tgwrite_bytes(get_byte_array(pq, signed=False)) + pq_inner_data_writer.tgwrite_bytes(get_byte_array(min(p, q), signed=False)) + pq_inner_data_writer.tgwrite_bytes(get_byte_array(max(p, q), signed=False)) pq_inner_data_writer.write(nonce) pq_inner_data_writer.write(server_nonce) pq_inner_data_writer.write(new_nonce) @@ -78,8 +76,8 @@ def do_authentication(transport): req_dh_params_writer.write_int(0xd712e4be, signed=False) # Req DH Params req_dh_params_writer.write(nonce) req_dh_params_writer.write(server_nonce) - req_dh_params_writer.tgwrite_bytes(utils.get_byte_array(min(p, q), signed=False)) - req_dh_params_writer.tgwrite_bytes(utils.get_byte_array(max(p, q), signed=False)) + req_dh_params_writer.tgwrite_bytes(get_byte_array(min(p, q), signed=False)) + req_dh_params_writer.tgwrite_bytes(get_byte_array(max(p, q), signed=False)) req_dh_params_writer.write(target_fingerprint) req_dh_params_writer.tgwrite_bytes(cipher_text) @@ -127,15 +125,13 @@ def do_authentication(transport): raise AssertionError('Invalid server nonce in encrypted answer') g = dh_inner_data_reader.read_int() - # "current value of dh_prime equals (in big-endian byte order)" - # See https://core.telegram.org/mtproto/auth_key#presenting-proof-of-work-server-authentication - dh_prime = int.from_bytes(dh_inner_data_reader.tgread_bytes(), byteorder='big', signed=False) - ga = int.from_bytes(dh_inner_data_reader.tgread_bytes(), byteorder='big', signed=False) + dh_prime = get_int(dh_inner_data_reader.tgread_bytes(), signed=False) + ga = get_int(dh_inner_data_reader.tgread_bytes(), signed=False) server_time = dh_inner_data_reader.read_int() time_offset = server_time - int(time.time()) - b = int.from_bytes(utils.generate_random_bytes(2048), byteorder='big', signed=False) + b = get_int(utils.generate_random_bytes(2048), signed=False) gb = pow(g, b, dh_prime) gab = pow(ga, b, dh_prime) @@ -145,7 +141,7 @@ def do_authentication(transport): client_dh_inner_data_writer.write(nonce) client_dh_inner_data_writer.write(server_nonce) client_dh_inner_data_writer.write_long(0) # TODO retry_id - client_dh_inner_data_writer.tgwrite_bytes(utils.get_byte_array(gb, signed=False)) + client_dh_inner_data_writer.tgwrite_bytes(get_byte_array(gb, signed=False)) with BinaryWriter() as client_dh_inner_data_with_hash_writer: client_dh_inner_data_with_hash_writer.write(utils.sha1(client_dh_inner_data_writer.get_bytes())) @@ -178,7 +174,7 @@ def do_authentication(transport): raise NotImplementedError('Invalid server nonce from server') new_nonce_hash1 = reader.read(16) - auth_key = AuthKey(gab) + auth_key = AuthKey(get_byte_array(gab, signed=False)) new_nonce_hash_calculated = auth_key.calc_new_nonce_hash(new_nonce, 1) if new_nonce_hash1 != new_nonce_hash_calculated: @@ -199,3 +195,20 @@ def do_authentication(transport): def get_fingerprint_text(fingerprint): """Gets a fingerprint text in 01-23-45-67-89-AB-CD-EF format (no hyphens)""" return ''.join(hex(b)[2:].rjust(2, '0').upper() for b in fingerprint) + + +# The following methods operate in big endian (unlike most of Telegram API) because: +# > "...pq is a representation of a natural number (in binary *big endian* format)..." +# > "...current value of dh_prime equals (in *big-endian* byte order)..." +# Reference: https://core.telegram.org/mtproto/auth_key +def get_byte_array(integer, signed): + """Gets the arbitrary-length byte array corresponding to the given integer""" + bits = integer.bit_length() + byte_length = (bits + 8 - 1) // 8 # 8 bits per byte + return int.to_bytes(integer, length=byte_length, byteorder='big', signed=signed) + + +def get_int(byte_array, signed=True): + """Gets the specified integer from its byte array. This should be used by the authenticator, + who requires the data to be in big endian""" + return int.from_bytes(byte_array, byteorder='big', signed=signed) diff --git a/network/mtproto_plain_sender.py b/network/mtproto_plain_sender.py index 4f93f29e..08bf8541 100755 --- a/network/mtproto_plain_sender.py +++ b/network/mtproto_plain_sender.py @@ -26,8 +26,8 @@ class MtProtoPlainSender: def receive(self): """Receives a plain packet, returning the body of the response""" - result = self._transport.receive() - with BinaryReader(result.body) as reader: + seq, body = self._transport.receive() + with BinaryReader(body) as reader: auth_key_id = reader.read_long() msg_id = reader.read_long() message_length = reader.read_int() diff --git a/network/mtproto_sender.py b/network/mtproto_sender.py index 4eba1481..d8a7848c 100755 --- a/network/mtproto_sender.py +++ b/network/mtproto_sender.py @@ -13,10 +13,9 @@ from tl.all_tlobjects import tlobjects class MtProtoSender: """MTProto Mobile Protocol sender (https://core.telegram.org/mtproto/description)""" - def __init__(self, transport, session, swallow_errors=True): + def __init__(self, transport, session): self.transport = transport self.session = session - self.swallow_errors = swallow_errors self.need_confirmation = [] # Message IDs that need confirmation self.on_update_handlers = [] @@ -59,18 +58,11 @@ class MtProtoSender: def receive(self, request): """Receives the specified MTProtoRequest ("fills in it" the received data)""" while not request.confirm_received: - try: - message, remote_msg_id, remote_sequence = self.decode_msg(self.transport.receive().body) - - with BinaryReader(message) as reader: - self.process_msg(remote_msg_id, remote_sequence, reader, request) - - except RPCError as error: - if self.swallow_errors: - print('A RPC error occurred when decoding a message: {}'.format(error)) - else: - raise error + seq, body = self.transport.receive() + message, remote_msg_id, remote_sequence = self.decode_msg(body) + with BinaryReader(message) as reader: + self.process_msg(remote_msg_id, remote_sequence, reader, request) # endregion diff --git a/network/tcp_message.py b/network/tcp_message.py deleted file mode 100755 index a28c839e..00000000 --- a/network/tcp_message.py +++ /dev/null @@ -1,62 +0,0 @@ -# This file is based on TLSharp -# https://github.com/sochix/TLSharp/blob/master/TLSharp.Core/Network/TcpMessage.cs -from utils import BinaryWriter, BinaryReader -from binascii import crc32 -from errors import * - - -class TcpMessage: - def __init__(self, seq_number, body): - """ - :param seq_number: Sequence number - :param body: Message body byte array - """ - if body is None: - raise InvalidParameterError('body cannot be None') - - self.sequence_number = seq_number - self.body = body - - def encode(self): - """Returns the bytes of the this message encoded, following Telegram's guidelines""" - with BinaryWriter() as writer: - ''' https://core.telegram.org/mtproto#tcp-transport - - 4 length bytes are added at the front - (to include the length, the sequence number, and CRC32; always divisible by 4) - and 4 bytes with the packet sequence number within this TCP connection - (the first packet sent is numbered 0, the next one 1, etc.), - and 4 CRC32 bytes at the end (length, sequence number, and payload together). - ''' - writer.write_int(len(self.body) + 12) - writer.write_int(self.sequence_number) - writer.write(self.body) - - crc = crc32(writer.get_bytes()) - writer.write_int(crc, signed=False) - - return writer.get_bytes() - - @staticmethod - def decode(body): - """Returns a TcpMessage from the given encoded bytes, decoding them previously""" - if body is None: - raise InvalidParameterError('body cannot be None') - - if len(body) < 12: - raise InvalidParameterError('Wrong size of input packet') - - with BinaryReader(body) as reader: - packet_len = int.from_bytes(reader.read(4), byteorder='little') - if packet_len < 12: - raise InvalidParameterError('Invalid packet length in body: {}'.format(packet_len)) - - seq = reader.read_int() - packet = reader.read(packet_len - 12) - checksum = reader.read_int(signed=False) - - valid_checksum = crc32(body[:packet_len - 4]) - if checksum != valid_checksum: - raise InvalidChecksumError(checksum, valid_checksum) - - return TcpMessage(seq, packet) diff --git a/network/tcp_transport.py b/network/tcp_transport.py index 8f85bd6d..82719684 100755 --- a/network/tcp_transport.py +++ b/network/tcp_transport.py @@ -1,41 +1,49 @@ # This file is based on TLSharp # https://github.com/sochix/TLSharp/blob/master/TLSharp.Core/Network/TcpTransport.cs -from network import TcpMessage, TcpClient +from network import TcpClient from binascii import crc32 from errors import * +from utils import BinaryWriter class TcpTransport: def __init__(self, ip_address, port): - self._tcp_client = TcpClient() - self._send_counter = 0 + self.tcp_client = TcpClient() + self.send_counter = 0 - self._tcp_client.connect(ip_address, port) + self.tcp_client.connect(ip_address, port) + # Original reference: https://core.telegram.org/mtproto#tcp-transport + # The packets are encoded as: total length, sequence number, packet and checksum (CRC32) def send(self, packet): """Sends the given packet (bytes array) to the connected peer""" - if not self._tcp_client.connected: + if not self.tcp_client.connected: raise ConnectionError('Client not connected to server.') - # Get a TcpMessage which contains the given packet - tcp_message = TcpMessage(self._send_counter, packet) + with BinaryWriter() as writer: + writer.write_int(len(packet) + 12) # 12 = size_of (integer) * 3 + writer.write_int(self.send_counter) + writer.write(packet) - self._tcp_client.write(tcp_message.encode()) - self._send_counter += 1 + crc = crc32(writer.get_bytes()) + writer.write_int(crc, signed=False) + + self.tcp_client.write(writer.get_bytes()) + self.send_counter += 1 def receive(self): - """Receives a TcpMessage from the connected peer""" + """Receives a TCP message (tuple(sequence number, body)) from the connected peer""" - # First read everything - packet_length_bytes = self._tcp_client.read(4) + # First read everything we need + packet_length_bytes = self.tcp_client.read(4) packet_length = int.from_bytes(packet_length_bytes, byteorder='little') - seq_bytes = self._tcp_client.read(4) + seq_bytes = self.tcp_client.read(4) seq = int.from_bytes(seq_bytes, byteorder='little') - body = self._tcp_client.read(packet_length - 12) + body = self.tcp_client.read(packet_length - 12) - checksum = int.from_bytes(self._tcp_client.read(4), byteorder='little', signed=False) + checksum = int.from_bytes(self.tcp_client.read(4), byteorder='little', signed=False) # Then perform the checks rv = packet_length_bytes + seq_bytes + body @@ -44,9 +52,9 @@ class TcpTransport: if checksum != valid_checksum: raise InvalidChecksumError(checksum, valid_checksum) - # If we passed the tests, we can then return a valid TcpMessage - return TcpMessage(seq, body) + # If we passed the tests, we can then return a valid TCP message + return seq, body def close(self): - if self._tcp_client.connected: - self._tcp_client.close() + if self.tcp_client.connected: + self.tcp_client.close() diff --git a/tl/session.py b/tl/session.py index 1c3ab0ec..b5e8ef64 100755 --- a/tl/session.py +++ b/tl/session.py @@ -22,19 +22,25 @@ class Session: def save(self): """Saves the current session object as session_user_id.session""" - with open('{}.session'.format(self.session_user_id), 'wb') as file: - pickle.dump(self, file) + if self.session_user_id: + with open('{}.session'.format(self.session_user_id), 'wb') as file: + pickle.dump(self, file) @staticmethod def try_load_or_create_new(session_user_id): - """Loads a saved session_user_id session, or creates a new one if none existed before""" - filepath = '{}.session'.format(session_user_id) + """Loads a saved session_user_id session, or creates a new one if none existed before. + If the given session_user_id is None, we assume that it is for testing purposes""" + if session_user_id is None: + return Session(None) - if file_exists(filepath): - with open(filepath, 'rb') as file: - return pickle.load(file) else: - return Session(session_user_id) + filepath = '{}.session'.format(session_user_id) + + if file_exists(filepath): + with open(filepath, 'rb') as file: + return pickle.load(file) + else: + return Session(session_user_id) def get_new_msg_id(self): """Generates a new message ID based on the current time (in ms) since epoch""" diff --git a/tl/telegram_client.py b/tl/telegram_client.py index fe811002..e5a721eb 100644 --- a/tl/telegram_client.py +++ b/tl/telegram_client.py @@ -45,30 +45,34 @@ class TelegramClient: """Connects to the Telegram servers, executing authentication if required. Note that authenticating to the Telegram servers is not the same as authenticating the app, which requires to send a code first.""" + try: + if not self.session.auth_key or reconnect: + self.session.auth_key, self.session.time_offset = \ + network.authenticator.do_authentication(self.transport) - if not self.session.auth_key or reconnect: - self.session.auth_key, self.session.time_offset = network.authenticator.do_authentication(self.transport) - self.session.save() + self.session.save() - self.sender = MtProtoSender(self.transport, self.session) - self.sender.add_update_handler(self.on_update) + self.sender = MtProtoSender(self.transport, self.session) + self.sender.add_update_handler(self.on_update) - # Always init connection by using the latest layer, not only when not reconnecting (as in original TLSharp's) - # Otherwise, the server thinks that were using the oldest layer! - # (Note that this is mainly untested, but it seems like it since some errors point in that direction) - request = InvokeWithLayerRequest(layer=self.layer, - query=InitConnectionRequest(api_id=self.api_id, - device_model=platform.node(), - system_version=platform.system(), - app_version='0.2', - lang_code='en', - query=GetConfigRequest())) + # Now it's time to send an InitConnectionRequest + # This must always be invoked with the layer we'll be using + request = InvokeWithLayerRequest(layer=self.layer, + query=InitConnectionRequest(api_id=self.api_id, + device_model=platform.node(), + system_version=platform.system(), + app_version='0.2', + lang_code='en', + query=GetConfigRequest())) - self.sender.send(request) - self.sender.receive(request) + self.sender.send(request) + self.sender.receive(request) - self.dc_options = request.result.dc_options - return True + self.dc_options = request.result.dc_options + return True + except RPCError as error: + print('Could not stabilise initial connection: {}'.format(error)) + return False def reconnect_to_dc(self, dc_id): """Reconnects to the specified DC ID. This is automatically called after an InvalidDCError is raised""" diff --git a/unit_test.py b/unit_test.py deleted file mode 100755 index fe2c67da..00000000 --- a/unit_test.py +++ /dev/null @@ -1,273 +0,0 @@ -import random -import socket -import threading -import unittest -import os -import platform - -from tl import Session -from tl.functions import InvokeWithLayerRequest, InitConnectionRequest -from tl.functions.help import GetConfigRequest - -from crypto import AES, Factorizator -from network import TcpTransport, TcpClient, MtProtoSender -from utils import BinaryWriter, BinaryReader - -import utils -import network.authenticator - -host = 'localhost' -port = random.randint(50000, 60000) # Arbitrary non-privileged port - - -def run_server_echo_thread(): - def server_thread(): - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.bind(('', port)) - s.listen(1) - conn, addr = s.accept() - with conn: - data = conn.recv(16) - conn.send(data) - - server = threading.Thread(target=server_thread) - server.start() - - -def get_representation(bytez): - return '-'.join(hex(b)[2:].rjust(2, '0').upper() for b in bytez) - - -def get_bytes(representation): - return bytes([int(b, 16) for b in representation.split('-')]) - - -class UnitTest(unittest.TestCase): - @staticmethod - def test_tcp_client(): - client = TcpClient() - run_server_echo_thread() - - try: - client.connect(host, port) - except: - raise AssertionError('Could connect to the server') - - try: - client.write('Unit testing...'.encode('ascii')) - except: - raise AssertionError('Could not send a message to the server') - - try: - client.read(16) - except: - raise AssertionError('Could not read a message to the server') - - try: - client.close() - except: - raise AssertionError('Could not close the client') - - @staticmethod - def test_binary_writer_reader(): - # Test that we can write and read properly - with BinaryWriter() as writer: - writer.write_byte(1) - writer.write_int(5) - writer.write_long(13) - writer.write_float(17.0) - writer.write_double(25.0) - writer.write(bytes([26, 27, 28, 29, 30, 31, 32])) - writer.write_large_int(2**127, 128, signed=False) - - data = writer.get_bytes() - assert data is not None, 'Example Data should not be None' - assert len(data) == 48, 'Example data length should be 48, but is {}'.format(len(data)) - - with BinaryReader(data) as reader: - value = reader.read_byte() - assert value == 1, 'Example byte should be 1 but is {}'.format(value) - - value = reader.read_int() - assert value == 5, 'Example integer should be 5 but is {}'.format(value) - - value = reader.read_long() - assert value == 13, 'Example long integer should be 13 but is {}'.format(value) - - value = reader.read_float() - assert value == 17.0, 'Example float should be 17.0 but is {}'.format(value) - - value = reader.read_double() - assert value == 25.0, 'Example double should be 25.0 but is {}'.format(value) - - value = reader.read(7) - assert value == bytes([26, 27, 28, 29, 30, 31, 32]), 'Example bytes should be {} but is {}' \ - .format(bytes([26, 27, 28, 29, 30, 31, 32]), value) - - value = reader.read_large_int(128, signed=False) - assert value == 2 ** 127, 'Example large integer should be {} but is {}'.format(2 ** 127, value) - - # Test Telegram that types are written right - with BinaryWriter() as writer: - writer.write_int(0x60469778) - buffer = writer.get_bytes() - valid = b'\x78\x97\x46\x60' # Tested written bytes using TLSharp and C#'s MemoryStream - - assert buffer == valid, "Written type should be {} but is {}".format(list(valid), list(buffer)) - - @staticmethod - def test_binary_tgwriter_tgreader(): - string = 'Testing Telegram strings, this should work properly!' - small_data = utils.generate_random_bytes(20) - large_data = utils.generate_random_bytes(1024) - - with BinaryWriter() as writer: - writer.tgwrite_string(string) - writer.tgwrite_bytes(small_data) - writer.tgwrite_bytes(large_data) - - data = writer.get_bytes() - assert data is not None, 'Example Data should not be None' - - with BinaryReader(data) as reader: - value = reader.tgread_string() - assert value == string, 'Example string should be {} but is {}'.format(string, value) - - value = reader.tgread_bytes() - assert value == small_data, 'Example bytes should be {} but is {}'.format(small_data, value) - - value = reader.tgread_bytes() - assert value == large_data, 'Example bytes should be {} but is {}'.format(large_data, value) - - @staticmethod - def test_factorizator(): - pq = 3118979781119966969 - p, q = Factorizator.factorize(pq) - - assert p == 1719614201, 'Factorized pair did not yield the correct result' - assert q == 1813767169, 'Factorized pair did not yield the correct result' - - @staticmethod - def test_to_byte_array(): - for value, real in zip( - [3118979781119966969, # PQ - 1667024975687354561, # PQ - 1148985737, 1450866553], # Min, Max - - ['2B-48-D7-95-FB-47-FE-F9', # PQ - '17-22-76-62-13-8C-88-C1', # PQ - '44-7C-21-89', '56-7A-77-79'] # Min, Max - ): - current = get_representation(utils.get_byte_array(value, signed=True)) - assert real == current, 'Invalid byte array representation (expected {}, got {})'.format(current, real) - - @staticmethod - def test_sha1(): - string = 'Example string' - data = get_representation(string.encode('utf-8')) - real = '45-78-61-6D-70-6C-65-20-73-74-72-69-6E-67' - assert data == real, 'Invalid string representation (should be {}, but is {})'.format(real, data) - - hashsum = get_representation(utils.sha1(get_bytes(data))) - real = '0A-54-92-7C-8D-06-3A-29-99-04-8E-F8-6A-3F-C4-8E-D3-7D-6D-39' - assert hashsum == real, 'Invalid sha1 hashsum representation (should be {}, but is {})'.format(real, data) - - @staticmethod - def test_bytes_to_int(): - bytez = b'\x01\x23\x45\x67\x89\xab\xcd\xef' - - reprs = get_representation(bytez) - real = '01-23-45-67-89-AB-CD-EF' - assert reprs == real, 'Invalid bytes representation (should be {} but is {})'.format(real, reprs) - assert bytez == get_bytes(reprs), 'Invalid representation to bytes conversion' - - value = int.from_bytes(bytez, byteorder='big', signed=True) - real = 81985529216486895 - assert value == real, 'Invalid bytes to int conversion (should be {} but is {})'.format(real, value) - - # Now test more cases - for repr, real in zip( - ['24-9D-FE-49-20-45-DF-C3', '60-44-F3-33', '61-5F-61-31'], - [2638544546736496579, 1615131443, 1633640753] - ): - bytez = get_bytes(repr) - if len(bytez) > 8: - value = int.from_bytes(bytez, byteorder='little', signed=True) - else: - value = int.from_bytes(bytez, byteorder='big', signed=True) - assert value == real, 'Invalid bytes to int conversion (should be {} but is {})'.format(real, value) - - @staticmethod - def test_aes_decrypt(): - cipher_text = get_bytes('EF-5F-5D-57-7B-A5-95-B0-1D-B7-BF-E5-09-AC-64-5E-F9-ED-E3-28-FB-D7-59-78-16-F8-74-4A-51-A6-10-16-F5-EF-99-0B-E6-CA-A7-9D-FA-DD-7B-CD-39-BF-FB-F0-D4-B6-09-50-76-78-9D-6F-87-DD-57-33-B7-F4-48-4F-C1-78-73-66-22-E1-65-74-E6-7B-4F-EB-99-7B-2D-58-94-62-10-2A-A2-C8-95-B4-D4-EE-2E-C9-44-DA-54-EE-7A-86-72-34-14-54-4F-E3-F9-C5-3A-3F-7E-C3-28-29-2F-19-D4-40-DA-D4-E6-11-A4-6D-A7-8A-20-0A-C4-2A-55-DD-4A-1A-D1-57-18-86-4C-0F-BC-9C-F6-2C-D5-E1-FA-D4-08-48-F9-B2-49-61-BA-30-6A-9F-E0-3E-55-66-E0-57-55-8A-02-57-28-E1-C8-BD-4A-F7-26-6A-6E-2F-93-76-57-C4-E5-F3-96-F3-79-17-B8-16-15-C1-A4-21-11-0F-9A-4D-92-BB-DF-2F-8E-4C-3E-88-D6-41-CC-91-D4-BA-FF-5A-F1-D9-2C-9C-FA-F3-DD-DB-03-B0-1B-1C-32-8C-9C-DE-96-FA-AE-40-9C-F5-AC-15-BF-11-17-D3-0F-E4-F4-C5-46-E7-27-3C-47-91-FD-02-AC-FC-7F-84-0D-4A-BC-43-2A-7E-0E-B9-BD-93-8E-0F-C9-1B-CF-C3-61-EA-2A-73-D8-00-3C-6E-BA-33-63-A6-24-16-AB-AB-11-93-3D-7B-20-29-C6-37-43-CF-78-C7-25-CE-03-02-0C-58-B3-34-24-61-06-FE-DD-00-65-BC-51-99-6E-08-51-8B-8D-83-CA-F1-36-ED-94-F6-FF-19-30-1C-6D-4E-6F-8C-59-08-2D-60-E6-3D-A8-39-C2-FE-96-2D-CF-AD-15-F5-68-B1-5B-2A-2F-6D-86-92-D9-F8-45-4F-09-80-01-3C-D1-8B-37-88-AA-52-02-58-18-7B-B5-86-60-BE-2C-E6-EE-3D-85-02-F9-CC-09-4A-44-55-73-24-A1-9B-01-ED-B1-FD-FF-C8-E1-F4-78-A9-DB-DD-F0-50-2F-A7-AB-EF-3C-0F-FC-82-5B-D5-35-E0-32-60-2F-F1-A3-DF-BE-70-68-F5-1A-AB-21-55-34-A4-45-86-B2-75-36-D1-50-36-DC-4B-78-5F-7D-44-F9-83-A4-A3-68-E9-4B-08-FA-7F-55-85-55-31-35-E5-C1-14-A4-E3-E2-AC-8A-2D-64-36-6F-46-4E-B1-AF-FE-66-BF-38-DD-1D-40-BC-DA-3F-0A-48-91-BD-09-84-B6-42-35-2A-E9-3A-57-63-92-BB-94-44-91-C7-C4-46-79-C4-60-7F-95-88-53-24-CF-CE-0A-4C-28-9B-B9-4F-10-68-CC-E3-A5-3F-F1-A1-43-4A-C8-FF-98-AC-F5-BF-36-7F-08-01-05-02-2F-C4-3D-EA-F9-0B-3E-99-FD-3A-C2-03-E7-D3-CE-79-2D-EE-F4-01-C8-6B-4B-AA-81-BA-49-D1-87-84-72-32-7F-6B-7F') - key = get_bytes('23-D7-11-75-77-78-5D-74-6F-74-C4-23-D3-99-66-E7-77-8B-5A-92-94-A5-88-C2-C3-D4-46-B7-F1-0C-D7-FA') - iv = get_bytes('F8-DC-50-26-71-C3-56-72-D7-07-78-17-57-D3-B6-5D-A6-EB-02-DB-7D-73-0A-0B-1C-29-15-40-CC-6C-03-7E') - - plain_text = AES.decrypt_ige(cipher_text, key, iv) - real = get_bytes('FB-17-EB-54-86-B6-49-1C-DF-D8-24-E6-D9-82-37-44-66-18-84-9F-BA-0D-89-B5-81-0D-47-B2-1B-CB-56-3F-7F-69-3A-22-0A-30-39-71-EE-1F-40-B2-A7-1A-4B-BD-76-7E-7A-FD-20-68-58-5F-03-00-00-00-FE-00-01-00-C7-1C-AE-B9-C6-B1-C9-04-8E-6C-52-2F-70-F1-3F-73-98-0D-40-23-8E-3E-21-C1-49-34-D0-37-56-3D-93-0F-48-19-8A-0A-A7-C1-40-58-22-94-93-D2-25-30-F4-DB-FA-33-6F-6E-0A-C9-25-13-95-43-AE-D4-4C-CE-7C-37-20-FD-51-F6-94-58-70-5A-C6-8C-D4-FE-6B-6B-13-AB-DC-97-46-51-29-69-32-84-54-F1-8F-AF-8C-59-5F-64-24-77-FE-96-BB-2A-94-1D-5B-CD-1D-4A-C8-CC-49-88-07-08-FA-9B-37-8E-3C-4F-3A-90-60-BE-E6-7C-F9-A4-A4-A6-95-81-10-51-90-7E-16-27-53-B5-6B-0F-6B-41-0D-BA-74-D8-A8-4B-2A-14-B3-14-4E-0E-F1-28-47-54-FD-17-ED-95-0D-59-65-B4-B9-DD-46-58-2D-B1-17-8D-16-9C-6B-C4-65-B0-D6-FF-9C-A3-92-8F-EF-5B-9A-E4-E4-18-FC-15-E8-3E-BE-A0-F8-7F-A9-FF-5E-ED-70-05-0D-ED-28-49-F4-7B-F9-59-D9-56-85-0C-E9-29-85-1F-0D-81-15-F6-35-B1-05-EE-2E-4E-15-D0-4B-24-54-BF-6F-4F-AD-F0-34-B1-04-03-11-9C-D8-E3-B9-2F-CC-5B-FE-00-01-00-22-8A-8F-62-58-9E-D1-9F-4B-53-EC-FB-22-E0-52-6B-8E-09-2E-B6-4B-90-53-30-A7-1F-52-1F-5B-3C-8F-AC-12-5B-D3-35-22-2A-1E-3E-9F-BD-33-73-B3-5C-1A-A6-8E-01-35-B4-8C-92-AE-D9-A0-86-6C-EF-CA-C9-09-4E-3B-B8-E5-F6-76-EE-F9-E7-CE-F1-DF-9E-F1-2E-92-55-DF-CF-80-68-70-AD-D1-AB-B2-34-54-E0-BF-38-A6-F4-C4-5B-64-96-8F-C7-14-18-84-7A-0A-44-38-56-1A-E4-9E-16-81-9D-AF-CC-A5-0E-17-D1-9E-DB-DE-DD-14-7D-04-71-99-06-32-3E-92-CE-D4-6E-76-10-DF-17-D9-2E-97-6A-F0-81-CC-ED-D5-20-10-60-AD-D8-7B-C2-FB-7D-87-CD-1E-B7-0E-28-1A-78-61-9C-8A-CA-81-A0-03-B2-40-7F-AE-BB-E3-21-96-74-01-A6-E2-C0-D8-17-C9-19-78-AD-1C-51-12-DC-29-8F-50-CA-8A-2F-56-89-A4-AC-E0-0F-C1-71-E4-C7-33-71-AE-83-30-59-D1-33-C2-D6-A6-F1-E8-FC-1F-77-3D-ED-E9-BF-B5-1C-6F-C0-9E-81-4B-B2-5A-51-C3-94-6B-BD-AD-5C-FF-DD-4B-0C-DB-E8-DA-0D-CB-57-B5-AC-D0-95-9E-FC-8F-F8') - assert plain_text == real, 'Decrypted text does not equal the real value (expected "{}", got "{}")'\ - .format(get_representation(real), get_representation(plain_text)) - - @staticmethod - def test_aes_encrypt(): - original_text = get_bytes('CD-B7-90-A9-61-41-F7-AD-A2-F9-FA-68-A4-D9-F2-5D-D6-6A-2E-40-54-B6-43-66-81-0D-47-B2-1B-CB-56-3F-7F-69-3A-22-0A-30-39-71-EE-1F-40-B2-A7-1A-4B-BD-76-7E-7A-FD-20-68-58-5F-00-00-00-00-00-00-00-00-FE-00-01-00-53-5B-3B-61-71-BC-C5-2D-2C-E3-D3-DB-4E-BF-BC-C0-3A-D0-90-89-C5-2E-EB-86-10-B9-B4-4B-D9-CF-00-DD-DA-D9-12-5E-27-DD-00-D2-61-E2-8A-93-97-30-38-0E-AC-49-C5-A2-7A-C8-67-6B-2C-B0-3C-95-BA-E2-C3-AC-6D-F7-87-7E-0F-9F-F1-A1-FD-87-81-04-C5-F0-14-35-E6-67-5A-E4-4E-57-A8-8D-C2-66-AF-F9-0B-82-13-CD-BC-FF-26-68-90-81-FA-26-34-80-B5-A7-C3-69-15-B1-31-BE-46-C2-B7-14-6B-75-B8-F2-71-6A-27-5B-3B-30-CF-A9-97-65-C6-E5-7D-46-EC-12-D3-6D-8F-64-58-03-1E-66-24-D5-87-9A-8B-CE-E3-D1-71-9B-F2-A9-49-19-69-B4-0A-D0-97-0E-91-5E-B0-F3-C2-FB-FF-AC-1F-CD-30-7E-C0-79-9F-DE-E0-85-17-16-13-10-FF-E8-24-B3-71-B5-C2-BC-72-B3-39-DB-53-D3-52-CB-6C-48-19-6D-CA-98-FB-C2-D4-24-6F-FD-8C-68-31-2F-C9-F4-6F-9B-55-9B-A5-6D-B9-54-D0-53-BC-8B-EC-4B-3F-D2-3C-E9-5E-34-79-80-9A-D2-8C-B9-5F-95-7A-46-72-11-4E-E6') - key = get_bytes('23-D7-11-75-77-78-5D-74-6F-74-C4-23-D3-99-66-E7-77-8B-5A-92-94-A5-88-C2-C3-D4-46-B7-F1-0C-D7-FA') - iv = get_bytes('F8-DC-50-26-71-C3-56-72-D7-07-78-17-57-D3-B6-5D-A6-EB-02-DB-7D-73-0A-0B-1C-29-15-40-CC-6C-03-7E') - - cipher_text = AES.encrypt_ige(original_text, key, iv) - real = get_bytes('57-A9-1C-BB-4A-B5-C7-B1-51-9E-A9-24-15-94-4B-63-CB-2F-50-93-B7-54-11-D8-F1-77-13-AE-DE-58-12-AF-C2-E1-10-1D-2C-38-7D-DD-A2-BD-6B-43-84-7E-B1-E5-51-69-62-36-A1-86-8D-02-25-B9-AA-0B-E2-32-13-2A-0F-D1-58-67-47-07-C9-FE-E0-0F-EE-EC-92-B8-65-BD-C4-69-31-B6-10-4E-8F-20-B6-8F-72-79-A0-A3-8F-63-37-4B-95-5F-A4-B0-E6-EE-49-BE-76-47-3E-F9-FF-AA-F6-B7-16-CD-24-09-B1-63-26-02-13-B8-99-BD-19-7F-E2-A5-91-BE-52-86-FF-EA-C9-05-3C-D6-19-AE-E6-D1-25-7F-38-AF-66-CF-F8-B7-E6-53-E3-F6-98-0D-EF-A8-BA-97-6F-20-09-69-29-73-12-5E-0F-77-10-DC-22-BB-23-25-1D-53-A6-10-26-EB-EB-5E-C4-86-04-F5-30-5B-BA-53-AE-EA-84-28-34-91-75-B8-F2-0B-74-D1-00-3A-7E-FE-F0-B5-BE-0E-86-21-61-5F-81-75-23-49-45-CB-07-57-78-AB-9B-80-A2-0A-46-DB-35-49-6A-08-5B-8B-55-4E-6E-1B-E0-E0-3E-E7-2A-06-A0-5D-4F-EA-71-1A-24-F4-F4-AF-95-4B-F2-9A-C5-FD-7F-6A-DD-61-2D-B3-29-DB-5B-D9-A8-CF-60-8F-36-85-04-B5-85-3F-78-EB-09-0C-B4-F4-2D-B8-67-71-2A-4B-1B-08-0C-18-A2-9E-30-13-0C-23-18-7A-43-73-D3-DC-38-F5-9A-4A-08-28-BD-A2-DC-A8-70-33-63-9E-A9-72-DF-72-A5-43-AB-A2') - assert cipher_text == real, 'Decrypted text does not equal the real value (expected "{}", got "{}")'\ - .format(get_representation(real), get_representation(cipher_text)) - - @staticmethod - def test_calc_key(): - shared_key = get_bytes('BC-D2-6D-B7-CA-76-F4-5D-5B-88-83-27-20-F3-11-8A-73-D0-34-94-31-AE-2A-4F-03-86-9A-2F-48-23-1A-8C-B5-6A-E9-24-E0-49-76-43-6D-5E-E7-30-1A-35-43-09-16-03-D2-9D-A9-89-D6-CE-08-50-0F-64-72-A0-B3-EB-FE-63-76-1A-DF-4A-14-96-98-16-A3-47-AB-04-14-21-5C-EB-0A-BC-6E-DF-C4-25-C6-09-B7-16-14-9C-27-81-15-3D-B0-AF-0E-0B-52-AA-04-36-36-73-F0-CF-B7-B8-3E-2C-44-94-78-D7-F8-E0-84-CB-25-D3-05-B2-E8-95-4D-72-3F-A2-E8-49-6E-F9-0B-5B-45-9B-AA-0C-58-7F-0E-69-DE-EE-64-1D-78-2F-4A-CE-EA-5E-7D-30-3B-A8-33-42-BB-52-A1-BF-65-04-B9-1E-A1-22-66-3D-A5-4D-40-9E-DD-81-80-C9-A5-FB-FC-67-DD-15-03-70-21-0F-66-44-16-89-32-EA-CA-B1-41-99-4F-A9-34-50-A9-A2-C6-3B-B2-43-39-1D-43-35-D2-0D-EC-4C-D9-AB-77-2D-03-0D-79-C2-76-17-5D-02-15-0C-42-61-97-CE-A5-B1-E4-5D-8E-E0-2C-CF-43-7B-6F-FA-99-66-A4-70-4D-00') - msg_keys = [ - 'BA-1A-CF-DA-A8-5E-43-62-6C-FA-B6-0C-3A-9B-B0-FC', - '86-6D-92-69-CF-8B-93-AA-86-4B-1F-69-D0-34-83-5D', - 'A1-2F-C0-61-6F-60-1A-B0-33-8F-7D-27-08-C8-EA-15', - '3A-56-52-0F-B7-89-D7-80-F6-18-72-CD-09-B5-A8-8A', - '06-F7-84-F3-91-CC-8D-DC-7D-92-41-7A-7E-84-25-E4', - '78-4D-FC-AE-F4-C4-55-81-6D-DD-99-A7-DB-B8-A3-88' - ] - are_client = [ - True, - False, - True, - True, - False, - False - ] - - valid_keys = [ - 'AF-E3-84-51-6D-E0-21-0C-D9-31-E4-9A-A0-76-5F-67-63-78-A1-B0-C9-BC-16-27-76-CF-2C-9D-4D-AE-C6-A5', - 'DD-30-58-B6-93-8E-C9-79-EF-83-F8-8C-6A-A7-68-03-E2-C6-B1-36-C5-BB-FC-E7-DF-D6-B1-67-F7-75-CF-6B', - '70-3F-66-AF-FE-0A-F3-1E-95-C3-25-48-8D-0F-A7-95-59-53-BF-DD-35-97-6E-7A-C0-5E-79-9C-9D-09-3B-7B', - '20-9F-82-E3-95-3F-9D-1E-EE-3E-F3-82-B0-8D-6E-76-26-5B-94-27-DD-7D-61-C3-AC-EB-FA-71-FF-0D-8F-08', - 'AE-06-BF-F1-89-88-22-66-98-48-76-E6-BD-D9-39-36-2D-36-4E-CF-FD-47-D2-87-D7-49-F8-93-22-2E-66-02', - 'D9-6B-43-D0-89-F7-C5-75-A2-4A-F2-2F-F0-17-5C-95-AE-FA-46-A5-95-AA-C3-B9-76-B0-3A-A8-0E-7B-EA-5D' - ] - - valid_ivs = [ - 'B8-51-F3-C5-A3-5D-C6-DF-9E-E0-51-BD-22-8D-13-09-0E-9A-9D-5E-38-A2-F8-E7-00-77-D9-C1-A7-A0-F7-0F', - 'DC-4C-C2-18-01-4A-22-58-86-6C-62-B6-B5-34-37-FD-E2-61-34-B6-AF-7D-46-53-D7-5B-E0-4E-0D-19-FB-BC', - '68-BB-BA-7F-55-B9-EF-86-EE-20-5A-1A-45-4E-70-C3-48-56-A2-E9-2F-91-AD-74-23-FE-54-06-E5-68-04-E9', - '79-31-8D-F0-DC-31-60-D7-09-BC-66-F1-AB-0D-7C-CB-2A-AF-74-32-64-C5-B3-18-C4-ED-55-D9-F6-39-DD-F3', - '1F-51-66-06-05-54-B9-E0-52-6A-88-6A-70-0C-DA-8D-2B-CD-BF-8A-67-5E-1A-A7-DD-EA-1C-CE-4C-D4-34-3D', - '8E-00-97-5E-B7-A1-F7-3D-1C-16-03-CA-B3-ED-EA-80-64-8F-77-A6-C4-34-5B-B5-DC-5D-C9-EC-B7-F8-F4-76' - ] - - for msg_key, is_client, valid_key, valid_iv in zip(msg_keys, are_client, valid_keys, valid_ivs): - msg_key = get_bytes(msg_key) - key, iv = utils.calc_key(shared_key, msg_key, is_client) - assert get_representation(key) == valid_key - assert get_representation(iv) == valid_iv - - @staticmethod - def test_authenticator(): - transport = TcpTransport('149.154.167.91', 443) - network.authenticator.do_authentication(transport) - transport.close() - -if __name__ == '__main__': - unittest.main() diff --git a/unit_tests.py b/unit_tests.py new file mode 100644 index 00000000..17937052 --- /dev/null +++ b/unit_tests.py @@ -0,0 +1,23 @@ +import unittest + + +if __name__ == '__main__': + + from unittests import CryptoTests, ParserTests, TLTests, UtilsTests, NetworkTests + test_classes = [CryptoTests, ParserTests, TLTests, UtilsTests] + + network = input('Run network tests (y/n)?: ').lower() == 'y' + if network: + test_classes.append(NetworkTests) + + loader = unittest.TestLoader() + + suites_list = [] + for test_class in test_classes: + suite = loader.loadTestsFromTestCase(test_class) + suites_list.append(suite) + + big_suite = unittest.TestSuite(suites_list) + + runner = unittest.TextTestRunner() + results = runner.run(big_suite) diff --git a/unittests/__init__.py b/unittests/__init__.py new file mode 100644 index 00000000..01f6aafe --- /dev/null +++ b/unittests/__init__.py @@ -0,0 +1,5 @@ +from .crypto_tests import CryptoTests +from .network_tests import NetworkTests +from .parser_tests import ParserTests +from .tl_tests import TLTests +from .utils_tests import UtilsTests diff --git a/unittests/crypto_tests.py b/unittests/crypto_tests.py new file mode 100644 index 00000000..8980ae00 --- /dev/null +++ b/unittests/crypto_tests.py @@ -0,0 +1,107 @@ +import unittest + +from crypto import AES +import utils.helpers as utils +from crypto import Factorizator + + +class CryptoTests(unittest.TestCase): + def setUp(self): + # Test known values + self.key = b'\xd1\xf4MXy\x0c\xf8/z,\xe9\xf9\xa4\x17\x04\xd9C\xc9\xaba\x81\xf3\xf8\xdd\xcb\x0c6\x92\x01\x1f\xc2y' + self.iv = b':\x02\x91x\x90Dj\xa6\x03\x90C\x08\x9e@X\xb5E\xffwy\xf3\x1c\xde\xde\xfbo\x8dm\xd6e.Z' + + self.plain_text = b'Non encrypted text :D' + self.plain_text_padded = b'My len is more uniform, promise!' + + self.cipher_text = b'\xb6\xa7\xec.\xb9\x9bG\xcb\xe9{\x91[\x12\xfc\x84D\x1c' \ + b'\x93\xd9\x17\x03\xcd\xd6\xb1D?\x98\xd2\xb5\xa5U\xfd' + + self.cipher_text_padded = b"W\xd1\xed'\x01\xa6c\xc3\xcb\xef\xaa\xe5\x1d\x1a" \ + b"[\x1b\xdf\xcdI\x1f>Z\n\t\xb9\xd2=\xbaF\xd1\x8e'" + + @staticmethod + def test_sha1(): + string = 'Example string' + + hashsum = utils.sha1(string.encode('utf-8')) + expected = b'\nT\x92|\x8d\x06:)\x99\x04\x8e\xf8j?\xc4\x8e\xd3}m9' + + assert hashsum == expected, 'Invalid sha1 hashsum representation (should be {}, but is {})'\ + .format(expected, hashsum) + + def test_aes_encrypt(self): + value = AES.encrypt_ige(self.plain_text, self.key, self.iv) + assert value == self.cipher_text, ('Ciphered text ("{}") does not equal expected ("{}")' + .format(value, self.cipher_text)) + + value = AES.encrypt_ige(self.plain_text_padded, self.key, self.iv) + assert value == self.cipher_text_padded, ('Ciphered text ("{}") does not equal expected ("{}")' + .format(value, self.cipher_text_padded)) + + def test_aes_decrypt(self): + # The ciphered text must always be padded + value = AES.decrypt_ige(self.cipher_text_padded, self.key, self.iv) + assert value == self.plain_text_padded, ('Decrypted text ("{}") does not equal expected ("{}")' + .format(value, self.plain_text_padded)) + + @staticmethod + def test_calc_key(): + shared_key = b'\xbc\xd2m\xb7\xcav\xf4][\x88\x83\' \xf3\x11\x8as\xd04\x941\xae' \ + b'*O\x03\x86\x9a/H#\x1a\x8c\xb5j\xe9$\xe0IvCm^\xe70\x1a5C\t\x16' \ + b'\x03\xd2\x9d\xa9\x89\xd6\xce\x08P\x0fdr\xa0\xb3\xeb\xfecv\x1a' \ + b'\xdfJ\x14\x96\x98\x16\xa3G\xab\x04\x14!\\\xeb\n\xbcn\xdf\xc4%' \ + b'\xc6\t\xb7\x16\x14\x9c\'\x81\x15=\xb0\xaf\x0e\x0bR\xaa\x0466s' \ + b'\xf0\xcf\xb7\xb8>,D\x94x\xd7\xf8\xe0\x84\xcb%\xd3\x05\xb2\xe8' \ + b'\x95Mr?\xa2\xe8In\xf9\x0b[E\x9b\xaa\x0cX\x7f\x0ei\xde\xeed\x1d' \ + b'x/J\xce\xea^}0;\xa83B\xbbR\xa1\xbfe\x04\xb9\x1e\xa1"f=\xa5M@' \ + b'\x9e\xdd\x81\x80\xc9\xa5\xfb\xfcg\xdd\x15\x03p!\x0ffD\x16\x892' \ + b'\xea\xca\xb1A\x99O\xa94P\xa9\xa2\xc6;\xb2C9\x1dC5\xd2\r\xecL' \ + b'\xd9\xabw-\x03\ry\xc2v\x17]\x02\x15\x0cBa\x97\xce\xa5\xb1\xe4]' \ + b'\x8e\xe0,\xcfC{o\xfa\x99f\xa4pM\x00' + + # Calculate key being the client + msg_key = b'\xba\x1a\xcf\xda\xa8^Cbl\xfa\xb6\x0c:\x9b\xb0\xfc' + + key, iv = utils.calc_key(shared_key, msg_key, client=True) + expected_key = b"\xaf\xe3\x84Qm\xe0!\x0c\xd91\xe4\x9a\xa0v_gcx\xa1\xb0\xc9\xbc\x16'v\xcf,\x9dM\xae\xc6\xa5" + expected_iv = b'\xb8Q\xf3\xc5\xa3]\xc6\xdf\x9e\xe0Q\xbd"\x8d\x13\t\x0e\x9a\x9d^8\xa2\xf8\xe7\x00w\xd9\xc1\xa7\xa0\xf7\x0f' + + assert key == expected_key, 'Invalid key (expected ("{}"), got ("{}"))'.format(expected_key, key) + assert iv == expected_iv, 'Invalid IV (expected ("{}"), got ("{}"))'.format(expected_iv, iv) + + # Calculate key being the server + msg_key = b'\x86m\x92i\xcf\x8b\x93\xaa\x86K\x1fi\xd04\x83]' + + key, iv = utils.calc_key(shared_key, msg_key, client=False) + expected_key = b'\xdd0X\xb6\x93\x8e\xc9y\xef\x83\xf8\x8cj\xa7h\x03\xe2\xc6\xb16\xc5\xbb\xfc\xe7\xdf\xd6\xb1g\xf7u\xcfk' + expected_iv = b'\xdcL\xc2\x18\x01J"X\x86lb\xb6\xb547\xfd\xe2a4\xb6\xaf}FS\xd7[\xe0N\r\x19\xfb\xbc' + + assert key == expected_key, 'Invalid key (expected ("{}"), got ("{}"))'.format(expected_key, key) + assert iv == expected_iv, 'Invalid IV (expected ("{}"), got ("{}"))'.format(expected_iv, iv) + + @staticmethod + def test_calc_msg_key(): + value = utils.calc_msg_key(b'Some random message') + expected = b'\xdfAa\xfc\x10\xab\x89\xd2\xfe\x19C\xf1\xdd~\xbf\x81' + assert value == expected, 'Value ("{}") does not equal expected ("{}")'.format(value, expected) + + @staticmethod + def test_generate_key_data_from_nonces(): + server_nonce = b'I am the server nonce.' + new_nonce = b'I am a new calculated nonce.' + + key, iv = utils.generate_key_data_from_nonces(server_nonce, new_nonce) + expected_key = b'?\xc4\xbd\xdf\rWU\x8a\xf5\x0f+V\xdc\x96up\x1d\xeeG\x00\x81|\x1eg\x8a\x8f{\xf0y\x80\xda\xde' + expected_iv = b'Q\x9dpZ\xb7\xdd\xcb\x82_\xfa\xf4\x90\xecn\x10\x9cD\xd2\x01\x8d\x83\xa0\xa4^\xb8\x91,\x7fI am' + + assert key == expected_key, 'Key ("{}") does not equal expected ("{}")'.format(key, expected_key) + assert iv == expected_iv, 'Key ("{}") does not equal expected ("{}")'.format(key, expected_iv) + + @staticmethod + def test_factorizator(): + pq = 3118979781119966969 + p, q = Factorizator.factorize(pq) + + assert p == 1719614201, 'Factorized pair did not yield the correct result' + assert q == 1813767169, 'Factorized pair did not yield the correct result' diff --git a/unittests/network_tests.py b/unittests/network_tests.py new file mode 100644 index 00000000..5947ba22 --- /dev/null +++ b/unittests/network_tests.py @@ -0,0 +1,41 @@ +import random +import socket +import threading +import unittest + +from network import TcpTransport, TcpClient +import network.authenticator + + +def run_server_echo_thread(port): + def server_thread(): + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(('', port)) + s.listen(1) + conn, addr = s.accept() + with conn: + data = conn.recv(16) + conn.send(data) + + server = threading.Thread(target=server_thread) + server.start() + + +class NetworkTests(unittest.TestCase): + @staticmethod + def test_tcp_client(): + port = random.randint(50000, 60000) # Arbitrary non-privileged port + run_server_echo_thread(port) + + msg = b'Unit testing...' + client = TcpClient() + client.connect('localhost', port) + client.write(msg) + assert msg == client.read(16), 'Read message does not equal sent message' + client.close() + + @staticmethod + def test_authenticator(): + transport = TcpTransport('149.154.167.91', 443) + network.authenticator.do_authentication(transport) + transport.close() diff --git a/unittests/parser_tests.py b/unittests/parser_tests.py new file mode 100644 index 00000000..fc366b45 --- /dev/null +++ b/unittests/parser_tests.py @@ -0,0 +1,5 @@ +import unittest + + +class ParserTests(unittest.TestCase): + """There are no tests yet""" diff --git a/unittests/tl_tests.py b/unittests/tl_tests.py new file mode 100644 index 00000000..37f0bbe5 --- /dev/null +++ b/unittests/tl_tests.py @@ -0,0 +1,5 @@ +import unittest + + +class TLTests(unittest.TestCase): + """There are no tests yet""" diff --git a/unittests/utils_tests.py b/unittests/utils_tests.py new file mode 100644 index 00000000..cff23965 --- /dev/null +++ b/unittests/utils_tests.py @@ -0,0 +1,81 @@ +import unittest + +import utils +from utils import BinaryReader, BinaryWriter + + +class UtilsTests(unittest.TestCase): + @staticmethod + def test_binary_writer_reader(): + # Test that we can write and read properly + with BinaryWriter() as writer: + writer.write_byte(1) + writer.write_int(5) + writer.write_long(13) + writer.write_float(17.0) + writer.write_double(25.0) + writer.write(bytes([26, 27, 28, 29, 30, 31, 32])) + writer.write_large_int(2 ** 127, 128, signed=False) + + data = writer.get_bytes() + expected = b'\x01\x05\x00\x00\x00\r\x00\x00\x00\x00\x00\x00\x00\x00\x00\x88A\x00\x00\x00\x00\x00\x00' \ + b'9@\x1a\x1b\x1c\x1d\x1e\x1f \x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80' + + assert data == expected, 'Retrieved data does not match the expected value' + + with BinaryReader(data) as reader: + value = reader.read_byte() + assert value == 1, 'Example byte should be 1 but is {}'.format(value) + + value = reader.read_int() + assert value == 5, 'Example integer should be 5 but is {}'.format(value) + + value = reader.read_long() + assert value == 13, 'Example long integer should be 13 but is {}'.format(value) + + value = reader.read_float() + assert value == 17.0, 'Example float should be 17.0 but is {}'.format(value) + + value = reader.read_double() + assert value == 25.0, 'Example double should be 25.0 but is {}'.format(value) + + value = reader.read(7) + assert value == bytes([26, 27, 28, 29, 30, 31, 32]), 'Example bytes should be {} but is {}' \ + .format(bytes([26, 27, 28, 29, 30, 31, 32]), value) + + value = reader.read_large_int(128, signed=False) + assert value == 2 ** 127, 'Example large integer should be {} but is {}'.format(2 ** 127, value) + + # Test Telegram that types are written right + with BinaryWriter() as writer: + writer.write_int(0x60469778) + buffer = writer.get_bytes() + valid = b'\x78\x97\x46\x60' # Tested written bytes using TLSharp and C#'s MemoryStream + + assert buffer == valid, "Written type should be {} but is {}".format(list(valid), list(buffer)) + + @staticmethod + def test_binary_tgwriter_tgreader(): + small_data = utils.generate_random_bytes(33) + small_data_padded = utils.generate_random_bytes(19) # +1 byte for length = 20 (evenly divisible by 4) + + large_data = utils.generate_random_bytes(999) + large_data_padded = utils.generate_random_bytes(1024) + + data = (small_data, small_data_padded, large_data, large_data_padded) + string = 'Testing Telegram strings, this should work properly!' + + with BinaryWriter() as writer: + # First write the data + for datum in data: + writer.tgwrite_bytes(datum) + writer.tgwrite_string(string) + + with BinaryReader(writer.get_bytes()) as reader: + # And then try reading it without errors (it should be unharmed!) + for datum in data: + value = reader.tgread_bytes() + assert value == datum, 'Example bytes should be {} but is {}'.format(datum, value) + + value = reader.tgread_string() + assert value == string, 'Example string should be {} but is {}'.format(string, value) diff --git a/utils/binary_reader.py b/utils/binary_reader.py index ea4f57d5..b267dc13 100755 --- a/utils/binary_reader.py +++ b/utils/binary_reader.py @@ -1,5 +1,5 @@ from io import BytesIO, BufferedReader -from tl import tlobjects +from tl.all_tlobjects import tlobjects from struct import unpack from errors import * import inspect diff --git a/utils/helpers.py b/utils/helpers.py index bdc331bb..badccb91 100755 --- a/utils/helpers.py +++ b/utils/helpers.py @@ -2,8 +2,7 @@ import os from utils import BinaryWriter import hashlib - -bits_per_byte = 8 +# region Multiple utilities def generate_random_long(signed=True): @@ -16,12 +15,24 @@ def generate_random_bytes(count): return os.urandom(count) -def get_byte_array(integer, signed): - """Gets the arbitrary-length byte array corresponding to the given integer""" - bits = integer.bit_length() - byte_length = (bits + bits_per_byte - 1) // bits_per_byte - # For some strange reason, this has to be big! - return int.to_bytes(integer, length=byte_length, byteorder='big', signed=signed) +def load_settings(path='api/settings'): + """Loads the user settings located under `api/`""" + settings = {} + with open(path, 'r', encoding='utf-8') as file: + for line in file: + value_pair = line.split('=') + left = value_pair[0].strip() + right = value_pair[1].strip() + if right.isnumeric(): + settings[left] = int(right) + else: + settings[left] = right + + return settings + +# endregion + +# region Cryptographic related utils def calc_key(shared_key, msg_key, client): @@ -44,11 +55,6 @@ def calc_msg_key(data): return sha1(data)[4:20] -def calc_msg_key_offset(data, offset, limit): - """Calculates the message key from offset given data, with an optional offset and limit""" - return sha1(data[offset:offset + limit])[4:20] - - def generate_key_data_from_nonces(server_nonce, new_nonce): """Generates the key data corresponding to the given nonces""" hash1 = sha1(bytes(new_nonce + server_nonce)) @@ -73,18 +79,4 @@ def sha1(data): sha.update(data) return sha.digest() - -def load_settings(): - """Loads the user settings located under `api/`""" - settings = {} - with open('api/settings', 'r', encoding='utf-8') as file: - for line in file: - value_pair = line.split('=') - left = value_pair[0].strip() - right = value_pair[1].strip() - if right.isnumeric(): - settings[left] = int(right) - else: - settings[left] = right - - return settings +# endregion