Completely refactored unit tests, removed unused code

This commit is contained in:
Lonami 2016-09-08 16:11:37 +02:00
parent a4f68dd29a
commit b2425eeea9
21 changed files with 398 additions and 460 deletions

View File

@ -35,7 +35,7 @@ class AES:
@staticmethod
def encrypt_ige(plain_text, key, iv):
"""Encrypts the given text in 16-bytes blocks by using the given key and 32-bytes initialization vector"""
# TODO: Random padding
# TODO: Random padding?
if len(plain_text) % 16 != 0: # Add padding if and only if it's not evenly divisible by 16 already
padding = bytes(16 - len(plain_text) % 16)
plain_text += padding

View File

@ -1,18 +1,13 @@
# This file is based on TLSharp
# https://github.com/sochix/TLSharp/blob/master/TLSharp.Core/MTProto/Crypto/AuthKey.cs
import utils
from errors import *
from utils import BinaryWriter, BinaryReader
import utils
class AuthKey:
def __init__(self, gab=None, data=None):
if gab:
self.key = utils.get_byte_array(gab, signed=False)
elif data:
def __init__(self, data):
self.key = data
else:
raise InvalidParameterError('Either a gab integer or data bytes array must be provided')
with BinaryReader(utils.sha1(self.key)) as reader:
self.aux_hash = reader.read_long(signed=False)
@ -26,5 +21,5 @@ class AuthKey:
writer.write_byte(number)
writer.write_long(self.aux_hash, signed=False)
new_nonce_hash = utils.sha1(writer.get_bytes())[4:20]
new_nonce_hash = utils.calc_msg_key(writer.get_bytes())
return new_nonce_hash

View File

@ -26,16 +26,13 @@ class RSAServerKey:
if length < 235:
writer.write(utils.generate_random_bytes(235 - length))
cipher_text = utils.get_byte_array(
pow(int.from_bytes(writer.get_bytes(), byteorder='big'), self.e, self.m),
signed=False)
result = int.from_bytes(writer.get_bytes(), byteorder='big')
result = pow(result, self.e, self.m)
if len(cipher_text) == 256:
return cipher_text
else:
padding = bytes(256 - len(cipher_text))
return padding + cipher_text
# If the result byte count is less than 256, since the byte order is big,
# the non-used bytes on the left will be 0 and act as padding,
# without need of any additional checks
return int.to_bytes(result, length=256, byteorder='big', signed=False)
class RSA:

View File

@ -1,6 +1,5 @@
from .mtproto_plain_sender import MtProtoPlainSender
from .tcp_client import TcpClient
from .tcp_message import TcpMessage
from .authenticator import do_authentication
from .mtproto_sender import MtProtoSender
from .tcp_transport import TcpTransport

View File

@ -37,9 +37,7 @@ def do_authentication(transport):
server_nonce = reader.read(16)
pq_bytes = reader.tgread_bytes()
# "string pq is a representation of a natural number (in binary big endian format)"
# See https://core.telegram.org/mtproto/auth_key#dh-exchange-initiation
pq = int.from_bytes(pq_bytes, byteorder='big')
pq = get_int(pq_bytes)
vector_id = reader.read_int()
if vector_id != 0x1cb5c415:
@ -55,9 +53,9 @@ def do_authentication(transport):
p, q = Factorizator.factorize(pq)
with BinaryWriter() as pq_inner_data_writer:
pq_inner_data_writer.write_int(0x83c95aec, signed=False) # PQ Inner Data
pq_inner_data_writer.tgwrite_bytes(utils.get_byte_array(pq, signed=False))
pq_inner_data_writer.tgwrite_bytes(utils.get_byte_array(min(p, q), signed=False))
pq_inner_data_writer.tgwrite_bytes(utils.get_byte_array(max(p, q), signed=False))
pq_inner_data_writer.tgwrite_bytes(get_byte_array(pq, signed=False))
pq_inner_data_writer.tgwrite_bytes(get_byte_array(min(p, q), signed=False))
pq_inner_data_writer.tgwrite_bytes(get_byte_array(max(p, q), signed=False))
pq_inner_data_writer.write(nonce)
pq_inner_data_writer.write(server_nonce)
pq_inner_data_writer.write(new_nonce)
@ -78,8 +76,8 @@ def do_authentication(transport):
req_dh_params_writer.write_int(0xd712e4be, signed=False) # Req DH Params
req_dh_params_writer.write(nonce)
req_dh_params_writer.write(server_nonce)
req_dh_params_writer.tgwrite_bytes(utils.get_byte_array(min(p, q), signed=False))
req_dh_params_writer.tgwrite_bytes(utils.get_byte_array(max(p, q), signed=False))
req_dh_params_writer.tgwrite_bytes(get_byte_array(min(p, q), signed=False))
req_dh_params_writer.tgwrite_bytes(get_byte_array(max(p, q), signed=False))
req_dh_params_writer.write(target_fingerprint)
req_dh_params_writer.tgwrite_bytes(cipher_text)
@ -127,15 +125,13 @@ def do_authentication(transport):
raise AssertionError('Invalid server nonce in encrypted answer')
g = dh_inner_data_reader.read_int()
# "current value of dh_prime equals (in big-endian byte order)"
# See https://core.telegram.org/mtproto/auth_key#presenting-proof-of-work-server-authentication
dh_prime = int.from_bytes(dh_inner_data_reader.tgread_bytes(), byteorder='big', signed=False)
ga = int.from_bytes(dh_inner_data_reader.tgread_bytes(), byteorder='big', signed=False)
dh_prime = get_int(dh_inner_data_reader.tgread_bytes(), signed=False)
ga = get_int(dh_inner_data_reader.tgread_bytes(), signed=False)
server_time = dh_inner_data_reader.read_int()
time_offset = server_time - int(time.time())
b = int.from_bytes(utils.generate_random_bytes(2048), byteorder='big', signed=False)
b = get_int(utils.generate_random_bytes(2048), signed=False)
gb = pow(g, b, dh_prime)
gab = pow(ga, b, dh_prime)
@ -145,7 +141,7 @@ def do_authentication(transport):
client_dh_inner_data_writer.write(nonce)
client_dh_inner_data_writer.write(server_nonce)
client_dh_inner_data_writer.write_long(0) # TODO retry_id
client_dh_inner_data_writer.tgwrite_bytes(utils.get_byte_array(gb, signed=False))
client_dh_inner_data_writer.tgwrite_bytes(get_byte_array(gb, signed=False))
with BinaryWriter() as client_dh_inner_data_with_hash_writer:
client_dh_inner_data_with_hash_writer.write(utils.sha1(client_dh_inner_data_writer.get_bytes()))
@ -178,7 +174,7 @@ def do_authentication(transport):
raise NotImplementedError('Invalid server nonce from server')
new_nonce_hash1 = reader.read(16)
auth_key = AuthKey(gab)
auth_key = AuthKey(get_byte_array(gab, signed=False))
new_nonce_hash_calculated = auth_key.calc_new_nonce_hash(new_nonce, 1)
if new_nonce_hash1 != new_nonce_hash_calculated:
@ -199,3 +195,20 @@ def do_authentication(transport):
def get_fingerprint_text(fingerprint):
"""Gets a fingerprint text in 01-23-45-67-89-AB-CD-EF format (no hyphens)"""
return ''.join(hex(b)[2:].rjust(2, '0').upper() for b in fingerprint)
# The following methods operate in big endian (unlike most of Telegram API) because:
# > "...pq is a representation of a natural number (in binary *big endian* format)..."
# > "...current value of dh_prime equals (in *big-endian* byte order)..."
# Reference: https://core.telegram.org/mtproto/auth_key
def get_byte_array(integer, signed):
"""Gets the arbitrary-length byte array corresponding to the given integer"""
bits = integer.bit_length()
byte_length = (bits + 8 - 1) // 8 # 8 bits per byte
return int.to_bytes(integer, length=byte_length, byteorder='big', signed=signed)
def get_int(byte_array, signed=True):
"""Gets the specified integer from its byte array. This should be used by the authenticator,
who requires the data to be in big endian"""
return int.from_bytes(byte_array, byteorder='big', signed=signed)

View File

@ -26,8 +26,8 @@ class MtProtoPlainSender:
def receive(self):
"""Receives a plain packet, returning the body of the response"""
result = self._transport.receive()
with BinaryReader(result.body) as reader:
seq, body = self._transport.receive()
with BinaryReader(body) as reader:
auth_key_id = reader.read_long()
msg_id = reader.read_long()
message_length = reader.read_int()

View File

@ -13,10 +13,9 @@ from tl.all_tlobjects import tlobjects
class MtProtoSender:
"""MTProto Mobile Protocol sender (https://core.telegram.org/mtproto/description)"""
def __init__(self, transport, session, swallow_errors=True):
def __init__(self, transport, session):
self.transport = transport
self.session = session
self.swallow_errors = swallow_errors
self.need_confirmation = [] # Message IDs that need confirmation
self.on_update_handlers = []
@ -59,19 +58,12 @@ class MtProtoSender:
def receive(self, request):
"""Receives the specified MTProtoRequest ("fills in it" the received data)"""
while not request.confirm_received:
try:
message, remote_msg_id, remote_sequence = self.decode_msg(self.transport.receive().body)
seq, body = self.transport.receive()
message, remote_msg_id, remote_sequence = self.decode_msg(body)
with BinaryReader(message) as reader:
self.process_msg(remote_msg_id, remote_sequence, reader, request)
except RPCError as error:
if self.swallow_errors:
print('A RPC error occurred when decoding a message: {}'.format(error))
else:
raise error
# endregion
# region Low level processing

View File

@ -1,62 +0,0 @@
# This file is based on TLSharp
# https://github.com/sochix/TLSharp/blob/master/TLSharp.Core/Network/TcpMessage.cs
from utils import BinaryWriter, BinaryReader
from binascii import crc32
from errors import *
class TcpMessage:
def __init__(self, seq_number, body):
"""
:param seq_number: Sequence number
:param body: Message body byte array
"""
if body is None:
raise InvalidParameterError('body cannot be None')
self.sequence_number = seq_number
self.body = body
def encode(self):
"""Returns the bytes of the this message encoded, following Telegram's guidelines"""
with BinaryWriter() as writer:
''' https://core.telegram.org/mtproto#tcp-transport
4 length bytes are added at the front
(to include the length, the sequence number, and CRC32; always divisible by 4)
and 4 bytes with the packet sequence number within this TCP connection
(the first packet sent is numbered 0, the next one 1, etc.),
and 4 CRC32 bytes at the end (length, sequence number, and payload together).
'''
writer.write_int(len(self.body) + 12)
writer.write_int(self.sequence_number)
writer.write(self.body)
crc = crc32(writer.get_bytes())
writer.write_int(crc, signed=False)
return writer.get_bytes()
@staticmethod
def decode(body):
"""Returns a TcpMessage from the given encoded bytes, decoding them previously"""
if body is None:
raise InvalidParameterError('body cannot be None')
if len(body) < 12:
raise InvalidParameterError('Wrong size of input packet')
with BinaryReader(body) as reader:
packet_len = int.from_bytes(reader.read(4), byteorder='little')
if packet_len < 12:
raise InvalidParameterError('Invalid packet length in body: {}'.format(packet_len))
seq = reader.read_int()
packet = reader.read(packet_len - 12)
checksum = reader.read_int(signed=False)
valid_checksum = crc32(body[:packet_len - 4])
if checksum != valid_checksum:
raise InvalidChecksumError(checksum, valid_checksum)
return TcpMessage(seq, packet)

View File

@ -1,41 +1,49 @@
# This file is based on TLSharp
# https://github.com/sochix/TLSharp/blob/master/TLSharp.Core/Network/TcpTransport.cs
from network import TcpMessage, TcpClient
from network import TcpClient
from binascii import crc32
from errors import *
from utils import BinaryWriter
class TcpTransport:
def __init__(self, ip_address, port):
self._tcp_client = TcpClient()
self._send_counter = 0
self.tcp_client = TcpClient()
self.send_counter = 0
self._tcp_client.connect(ip_address, port)
self.tcp_client.connect(ip_address, port)
# Original reference: https://core.telegram.org/mtproto#tcp-transport
# The packets are encoded as: total length, sequence number, packet and checksum (CRC32)
def send(self, packet):
"""Sends the given packet (bytes array) to the connected peer"""
if not self._tcp_client.connected:
if not self.tcp_client.connected:
raise ConnectionError('Client not connected to server.')
# Get a TcpMessage which contains the given packet
tcp_message = TcpMessage(self._send_counter, packet)
with BinaryWriter() as writer:
writer.write_int(len(packet) + 12) # 12 = size_of (integer) * 3
writer.write_int(self.send_counter)
writer.write(packet)
self._tcp_client.write(tcp_message.encode())
self._send_counter += 1
crc = crc32(writer.get_bytes())
writer.write_int(crc, signed=False)
self.tcp_client.write(writer.get_bytes())
self.send_counter += 1
def receive(self):
"""Receives a TcpMessage from the connected peer"""
"""Receives a TCP message (tuple(sequence number, body)) from the connected peer"""
# First read everything
packet_length_bytes = self._tcp_client.read(4)
# First read everything we need
packet_length_bytes = self.tcp_client.read(4)
packet_length = int.from_bytes(packet_length_bytes, byteorder='little')
seq_bytes = self._tcp_client.read(4)
seq_bytes = self.tcp_client.read(4)
seq = int.from_bytes(seq_bytes, byteorder='little')
body = self._tcp_client.read(packet_length - 12)
body = self.tcp_client.read(packet_length - 12)
checksum = int.from_bytes(self._tcp_client.read(4), byteorder='little', signed=False)
checksum = int.from_bytes(self.tcp_client.read(4), byteorder='little', signed=False)
# Then perform the checks
rv = packet_length_bytes + seq_bytes + body
@ -44,9 +52,9 @@ class TcpTransport:
if checksum != valid_checksum:
raise InvalidChecksumError(checksum, valid_checksum)
# If we passed the tests, we can then return a valid TcpMessage
return TcpMessage(seq, body)
# If we passed the tests, we can then return a valid TCP message
return seq, body
def close(self):
if self._tcp_client.connected:
self._tcp_client.close()
if self.tcp_client.connected:
self.tcp_client.close()

View File

@ -22,12 +22,18 @@ class Session:
def save(self):
"""Saves the current session object as session_user_id.session"""
if self.session_user_id:
with open('{}.session'.format(self.session_user_id), 'wb') as file:
pickle.dump(self, file)
@staticmethod
def try_load_or_create_new(session_user_id):
"""Loads a saved session_user_id session, or creates a new one if none existed before"""
"""Loads a saved session_user_id session, or creates a new one if none existed before.
If the given session_user_id is None, we assume that it is for testing purposes"""
if session_user_id is None:
return Session(None)
else:
filepath = '{}.session'.format(session_user_id)
if file_exists(filepath):

View File

@ -45,17 +45,18 @@ class TelegramClient:
"""Connects to the Telegram servers, executing authentication if required.
Note that authenticating to the Telegram servers is not the same as authenticating
the app, which requires to send a code first."""
try:
if not self.session.auth_key or reconnect:
self.session.auth_key, self.session.time_offset = network.authenticator.do_authentication(self.transport)
self.session.auth_key, self.session.time_offset = \
network.authenticator.do_authentication(self.transport)
self.session.save()
self.sender = MtProtoSender(self.transport, self.session)
self.sender.add_update_handler(self.on_update)
# Always init connection by using the latest layer, not only when not reconnecting (as in original TLSharp's)
# Otherwise, the server thinks that were using the oldest layer!
# (Note that this is mainly untested, but it seems like it since some errors point in that direction)
# Now it's time to send an InitConnectionRequest
# This must always be invoked with the layer we'll be using
request = InvokeWithLayerRequest(layer=self.layer,
query=InitConnectionRequest(api_id=self.api_id,
device_model=platform.node(),
@ -69,6 +70,9 @@ class TelegramClient:
self.dc_options = request.result.dc_options
return True
except RPCError as error:
print('Could not stabilise initial connection: {}'.format(error))
return False
def reconnect_to_dc(self, dc_id):
"""Reconnects to the specified DC ID. This is automatically called after an InvalidDCError is raised"""

View File

@ -1,273 +0,0 @@
import random
import socket
import threading
import unittest
import os
import platform
from tl import Session
from tl.functions import InvokeWithLayerRequest, InitConnectionRequest
from tl.functions.help import GetConfigRequest
from crypto import AES, Factorizator
from network import TcpTransport, TcpClient, MtProtoSender
from utils import BinaryWriter, BinaryReader
import utils
import network.authenticator
host = 'localhost'
port = random.randint(50000, 60000) # Arbitrary non-privileged port
def run_server_echo_thread():
def server_thread():
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(('', port))
s.listen(1)
conn, addr = s.accept()
with conn:
data = conn.recv(16)
conn.send(data)
server = threading.Thread(target=server_thread)
server.start()
def get_representation(bytez):
return '-'.join(hex(b)[2:].rjust(2, '0').upper() for b in bytez)
def get_bytes(representation):
return bytes([int(b, 16) for b in representation.split('-')])
class UnitTest(unittest.TestCase):
@staticmethod
def test_tcp_client():
client = TcpClient()
run_server_echo_thread()
try:
client.connect(host, port)
except:
raise AssertionError('Could connect to the server')
try:
client.write('Unit testing...'.encode('ascii'))
except:
raise AssertionError('Could not send a message to the server')
try:
client.read(16)
except:
raise AssertionError('Could not read a message to the server')
try:
client.close()
except:
raise AssertionError('Could not close the client')
@staticmethod
def test_binary_writer_reader():
# Test that we can write and read properly
with BinaryWriter() as writer:
writer.write_byte(1)
writer.write_int(5)
writer.write_long(13)
writer.write_float(17.0)
writer.write_double(25.0)
writer.write(bytes([26, 27, 28, 29, 30, 31, 32]))
writer.write_large_int(2**127, 128, signed=False)
data = writer.get_bytes()
assert data is not None, 'Example Data should not be None'
assert len(data) == 48, 'Example data length should be 48, but is {}'.format(len(data))
with BinaryReader(data) as reader:
value = reader.read_byte()
assert value == 1, 'Example byte should be 1 but is {}'.format(value)
value = reader.read_int()
assert value == 5, 'Example integer should be 5 but is {}'.format(value)
value = reader.read_long()
assert value == 13, 'Example long integer should be 13 but is {}'.format(value)
value = reader.read_float()
assert value == 17.0, 'Example float should be 17.0 but is {}'.format(value)
value = reader.read_double()
assert value == 25.0, 'Example double should be 25.0 but is {}'.format(value)
value = reader.read(7)
assert value == bytes([26, 27, 28, 29, 30, 31, 32]), 'Example bytes should be {} but is {}' \
.format(bytes([26, 27, 28, 29, 30, 31, 32]), value)
value = reader.read_large_int(128, signed=False)
assert value == 2 ** 127, 'Example large integer should be {} but is {}'.format(2 ** 127, value)
# Test Telegram that types are written right
with BinaryWriter() as writer:
writer.write_int(0x60469778)
buffer = writer.get_bytes()
valid = b'\x78\x97\x46\x60' # Tested written bytes using TLSharp and C#'s MemoryStream
assert buffer == valid, "Written type should be {} but is {}".format(list(valid), list(buffer))
@staticmethod
def test_binary_tgwriter_tgreader():
string = 'Testing Telegram strings, this should work properly!'
small_data = utils.generate_random_bytes(20)
large_data = utils.generate_random_bytes(1024)
with BinaryWriter() as writer:
writer.tgwrite_string(string)
writer.tgwrite_bytes(small_data)
writer.tgwrite_bytes(large_data)
data = writer.get_bytes()
assert data is not None, 'Example Data should not be None'
with BinaryReader(data) as reader:
value = reader.tgread_string()
assert value == string, 'Example string should be {} but is {}'.format(string, value)
value = reader.tgread_bytes()
assert value == small_data, 'Example bytes should be {} but is {}'.format(small_data, value)
value = reader.tgread_bytes()
assert value == large_data, 'Example bytes should be {} but is {}'.format(large_data, value)
@staticmethod
def test_factorizator():
pq = 3118979781119966969
p, q = Factorizator.factorize(pq)
assert p == 1719614201, 'Factorized pair did not yield the correct result'
assert q == 1813767169, 'Factorized pair did not yield the correct result'
@staticmethod
def test_to_byte_array():
for value, real in zip(
[3118979781119966969, # PQ
1667024975687354561, # PQ
1148985737, 1450866553], # Min, Max
['2B-48-D7-95-FB-47-FE-F9', # PQ
'17-22-76-62-13-8C-88-C1', # PQ
'44-7C-21-89', '56-7A-77-79'] # Min, Max
):
current = get_representation(utils.get_byte_array(value, signed=True))
assert real == current, 'Invalid byte array representation (expected {}, got {})'.format(current, real)
@staticmethod
def test_sha1():
string = 'Example string'
data = get_representation(string.encode('utf-8'))
real = '45-78-61-6D-70-6C-65-20-73-74-72-69-6E-67'
assert data == real, 'Invalid string representation (should be {}, but is {})'.format(real, data)
hashsum = get_representation(utils.sha1(get_bytes(data)))
real = '0A-54-92-7C-8D-06-3A-29-99-04-8E-F8-6A-3F-C4-8E-D3-7D-6D-39'
assert hashsum == real, 'Invalid sha1 hashsum representation (should be {}, but is {})'.format(real, data)
@staticmethod
def test_bytes_to_int():
bytez = b'\x01\x23\x45\x67\x89\xab\xcd\xef'
reprs = get_representation(bytez)
real = '01-23-45-67-89-AB-CD-EF'
assert reprs == real, 'Invalid bytes representation (should be {} but is {})'.format(real, reprs)
assert bytez == get_bytes(reprs), 'Invalid representation to bytes conversion'
value = int.from_bytes(bytez, byteorder='big', signed=True)
real = 81985529216486895
assert value == real, 'Invalid bytes to int conversion (should be {} but is {})'.format(real, value)
# Now test more cases
for repr, real in zip(
['24-9D-FE-49-20-45-DF-C3', '60-44-F3-33', '61-5F-61-31'],
[2638544546736496579, 1615131443, 1633640753]
):
bytez = get_bytes(repr)
if len(bytez) > 8:
value = int.from_bytes(bytez, byteorder='little', signed=True)
else:
value = int.from_bytes(bytez, byteorder='big', signed=True)
assert value == real, 'Invalid bytes to int conversion (should be {} but is {})'.format(real, value)
@staticmethod
def test_aes_decrypt():
cipher_text = get_bytes('EF-5F-5D-57-7B-A5-95-B0-1D-B7-BF-E5-09-AC-64-5E-F9-ED-E3-28-FB-D7-59-78-16-F8-74-4A-51-A6-10-16-F5-EF-99-0B-E6-CA-A7-9D-FA-DD-7B-CD-39-BF-FB-F0-D4-B6-09-50-76-78-9D-6F-87-DD-57-33-B7-F4-48-4F-C1-78-73-66-22-E1-65-74-E6-7B-4F-EB-99-7B-2D-58-94-62-10-2A-A2-C8-95-B4-D4-EE-2E-C9-44-DA-54-EE-7A-86-72-34-14-54-4F-E3-F9-C5-3A-3F-7E-C3-28-29-2F-19-D4-40-DA-D4-E6-11-A4-6D-A7-8A-20-0A-C4-2A-55-DD-4A-1A-D1-57-18-86-4C-0F-BC-9C-F6-2C-D5-E1-FA-D4-08-48-F9-B2-49-61-BA-30-6A-9F-E0-3E-55-66-E0-57-55-8A-02-57-28-E1-C8-BD-4A-F7-26-6A-6E-2F-93-76-57-C4-E5-F3-96-F3-79-17-B8-16-15-C1-A4-21-11-0F-9A-4D-92-BB-DF-2F-8E-4C-3E-88-D6-41-CC-91-D4-BA-FF-5A-F1-D9-2C-9C-FA-F3-DD-DB-03-B0-1B-1C-32-8C-9C-DE-96-FA-AE-40-9C-F5-AC-15-BF-11-17-D3-0F-E4-F4-C5-46-E7-27-3C-47-91-FD-02-AC-FC-7F-84-0D-4A-BC-43-2A-7E-0E-B9-BD-93-8E-0F-C9-1B-CF-C3-61-EA-2A-73-D8-00-3C-6E-BA-33-63-A6-24-16-AB-AB-11-93-3D-7B-20-29-C6-37-43-CF-78-C7-25-CE-03-02-0C-58-B3-34-24-61-06-FE-DD-00-65-BC-51-99-6E-08-51-8B-8D-83-CA-F1-36-ED-94-F6-FF-19-30-1C-6D-4E-6F-8C-59-08-2D-60-E6-3D-A8-39-C2-FE-96-2D-CF-AD-15-F5-68-B1-5B-2A-2F-6D-86-92-D9-F8-45-4F-09-80-01-3C-D1-8B-37-88-AA-52-02-58-18-7B-B5-86-60-BE-2C-E6-EE-3D-85-02-F9-CC-09-4A-44-55-73-24-A1-9B-01-ED-B1-FD-FF-C8-E1-F4-78-A9-DB-DD-F0-50-2F-A7-AB-EF-3C-0F-FC-82-5B-D5-35-E0-32-60-2F-F1-A3-DF-BE-70-68-F5-1A-AB-21-55-34-A4-45-86-B2-75-36-D1-50-36-DC-4B-78-5F-7D-44-F9-83-A4-A3-68-E9-4B-08-FA-7F-55-85-55-31-35-E5-C1-14-A4-E3-E2-AC-8A-2D-64-36-6F-46-4E-B1-AF-FE-66-BF-38-DD-1D-40-BC-DA-3F-0A-48-91-BD-09-84-B6-42-35-2A-E9-3A-57-63-92-BB-94-44-91-C7-C4-46-79-C4-60-7F-95-88-53-24-CF-CE-0A-4C-28-9B-B9-4F-10-68-CC-E3-A5-3F-F1-A1-43-4A-C8-FF-98-AC-F5-BF-36-7F-08-01-05-02-2F-C4-3D-EA-F9-0B-3E-99-FD-3A-C2-03-E7-D3-CE-79-2D-EE-F4-01-C8-6B-4B-AA-81-BA-49-D1-87-84-72-32-7F-6B-7F')
key = get_bytes('23-D7-11-75-77-78-5D-74-6F-74-C4-23-D3-99-66-E7-77-8B-5A-92-94-A5-88-C2-C3-D4-46-B7-F1-0C-D7-FA')
iv = get_bytes('F8-DC-50-26-71-C3-56-72-D7-07-78-17-57-D3-B6-5D-A6-EB-02-DB-7D-73-0A-0B-1C-29-15-40-CC-6C-03-7E')
plain_text = AES.decrypt_ige(cipher_text, key, iv)
real = get_bytes('FB-17-EB-54-86-B6-49-1C-DF-D8-24-E6-D9-82-37-44-66-18-84-9F-BA-0D-89-B5-81-0D-47-B2-1B-CB-56-3F-7F-69-3A-22-0A-30-39-71-EE-1F-40-B2-A7-1A-4B-BD-76-7E-7A-FD-20-68-58-5F-03-00-00-00-FE-00-01-00-C7-1C-AE-B9-C6-B1-C9-04-8E-6C-52-2F-70-F1-3F-73-98-0D-40-23-8E-3E-21-C1-49-34-D0-37-56-3D-93-0F-48-19-8A-0A-A7-C1-40-58-22-94-93-D2-25-30-F4-DB-FA-33-6F-6E-0A-C9-25-13-95-43-AE-D4-4C-CE-7C-37-20-FD-51-F6-94-58-70-5A-C6-8C-D4-FE-6B-6B-13-AB-DC-97-46-51-29-69-32-84-54-F1-8F-AF-8C-59-5F-64-24-77-FE-96-BB-2A-94-1D-5B-CD-1D-4A-C8-CC-49-88-07-08-FA-9B-37-8E-3C-4F-3A-90-60-BE-E6-7C-F9-A4-A4-A6-95-81-10-51-90-7E-16-27-53-B5-6B-0F-6B-41-0D-BA-74-D8-A8-4B-2A-14-B3-14-4E-0E-F1-28-47-54-FD-17-ED-95-0D-59-65-B4-B9-DD-46-58-2D-B1-17-8D-16-9C-6B-C4-65-B0-D6-FF-9C-A3-92-8F-EF-5B-9A-E4-E4-18-FC-15-E8-3E-BE-A0-F8-7F-A9-FF-5E-ED-70-05-0D-ED-28-49-F4-7B-F9-59-D9-56-85-0C-E9-29-85-1F-0D-81-15-F6-35-B1-05-EE-2E-4E-15-D0-4B-24-54-BF-6F-4F-AD-F0-34-B1-04-03-11-9C-D8-E3-B9-2F-CC-5B-FE-00-01-00-22-8A-8F-62-58-9E-D1-9F-4B-53-EC-FB-22-E0-52-6B-8E-09-2E-B6-4B-90-53-30-A7-1F-52-1F-5B-3C-8F-AC-12-5B-D3-35-22-2A-1E-3E-9F-BD-33-73-B3-5C-1A-A6-8E-01-35-B4-8C-92-AE-D9-A0-86-6C-EF-CA-C9-09-4E-3B-B8-E5-F6-76-EE-F9-E7-CE-F1-DF-9E-F1-2E-92-55-DF-CF-80-68-70-AD-D1-AB-B2-34-54-E0-BF-38-A6-F4-C4-5B-64-96-8F-C7-14-18-84-7A-0A-44-38-56-1A-E4-9E-16-81-9D-AF-CC-A5-0E-17-D1-9E-DB-DE-DD-14-7D-04-71-99-06-32-3E-92-CE-D4-6E-76-10-DF-17-D9-2E-97-6A-F0-81-CC-ED-D5-20-10-60-AD-D8-7B-C2-FB-7D-87-CD-1E-B7-0E-28-1A-78-61-9C-8A-CA-81-A0-03-B2-40-7F-AE-BB-E3-21-96-74-01-A6-E2-C0-D8-17-C9-19-78-AD-1C-51-12-DC-29-8F-50-CA-8A-2F-56-89-A4-AC-E0-0F-C1-71-E4-C7-33-71-AE-83-30-59-D1-33-C2-D6-A6-F1-E8-FC-1F-77-3D-ED-E9-BF-B5-1C-6F-C0-9E-81-4B-B2-5A-51-C3-94-6B-BD-AD-5C-FF-DD-4B-0C-DB-E8-DA-0D-CB-57-B5-AC-D0-95-9E-FC-8F-F8')
assert plain_text == real, 'Decrypted text does not equal the real value (expected "{}", got "{}")'\
.format(get_representation(real), get_representation(plain_text))
@staticmethod
def test_aes_encrypt():
original_text = get_bytes('CD-B7-90-A9-61-41-F7-AD-A2-F9-FA-68-A4-D9-F2-5D-D6-6A-2E-40-54-B6-43-66-81-0D-47-B2-1B-CB-56-3F-7F-69-3A-22-0A-30-39-71-EE-1F-40-B2-A7-1A-4B-BD-76-7E-7A-FD-20-68-58-5F-00-00-00-00-00-00-00-00-FE-00-01-00-53-5B-3B-61-71-BC-C5-2D-2C-E3-D3-DB-4E-BF-BC-C0-3A-D0-90-89-C5-2E-EB-86-10-B9-B4-4B-D9-CF-00-DD-DA-D9-12-5E-27-DD-00-D2-61-E2-8A-93-97-30-38-0E-AC-49-C5-A2-7A-C8-67-6B-2C-B0-3C-95-BA-E2-C3-AC-6D-F7-87-7E-0F-9F-F1-A1-FD-87-81-04-C5-F0-14-35-E6-67-5A-E4-4E-57-A8-8D-C2-66-AF-F9-0B-82-13-CD-BC-FF-26-68-90-81-FA-26-34-80-B5-A7-C3-69-15-B1-31-BE-46-C2-B7-14-6B-75-B8-F2-71-6A-27-5B-3B-30-CF-A9-97-65-C6-E5-7D-46-EC-12-D3-6D-8F-64-58-03-1E-66-24-D5-87-9A-8B-CE-E3-D1-71-9B-F2-A9-49-19-69-B4-0A-D0-97-0E-91-5E-B0-F3-C2-FB-FF-AC-1F-CD-30-7E-C0-79-9F-DE-E0-85-17-16-13-10-FF-E8-24-B3-71-B5-C2-BC-72-B3-39-DB-53-D3-52-CB-6C-48-19-6D-CA-98-FB-C2-D4-24-6F-FD-8C-68-31-2F-C9-F4-6F-9B-55-9B-A5-6D-B9-54-D0-53-BC-8B-EC-4B-3F-D2-3C-E9-5E-34-79-80-9A-D2-8C-B9-5F-95-7A-46-72-11-4E-E6')
key = get_bytes('23-D7-11-75-77-78-5D-74-6F-74-C4-23-D3-99-66-E7-77-8B-5A-92-94-A5-88-C2-C3-D4-46-B7-F1-0C-D7-FA')
iv = get_bytes('F8-DC-50-26-71-C3-56-72-D7-07-78-17-57-D3-B6-5D-A6-EB-02-DB-7D-73-0A-0B-1C-29-15-40-CC-6C-03-7E')
cipher_text = AES.encrypt_ige(original_text, key, iv)
real = get_bytes('57-A9-1C-BB-4A-B5-C7-B1-51-9E-A9-24-15-94-4B-63-CB-2F-50-93-B7-54-11-D8-F1-77-13-AE-DE-58-12-AF-C2-E1-10-1D-2C-38-7D-DD-A2-BD-6B-43-84-7E-B1-E5-51-69-62-36-A1-86-8D-02-25-B9-AA-0B-E2-32-13-2A-0F-D1-58-67-47-07-C9-FE-E0-0F-EE-EC-92-B8-65-BD-C4-69-31-B6-10-4E-8F-20-B6-8F-72-79-A0-A3-8F-63-37-4B-95-5F-A4-B0-E6-EE-49-BE-76-47-3E-F9-FF-AA-F6-B7-16-CD-24-09-B1-63-26-02-13-B8-99-BD-19-7F-E2-A5-91-BE-52-86-FF-EA-C9-05-3C-D6-19-AE-E6-D1-25-7F-38-AF-66-CF-F8-B7-E6-53-E3-F6-98-0D-EF-A8-BA-97-6F-20-09-69-29-73-12-5E-0F-77-10-DC-22-BB-23-25-1D-53-A6-10-26-EB-EB-5E-C4-86-04-F5-30-5B-BA-53-AE-EA-84-28-34-91-75-B8-F2-0B-74-D1-00-3A-7E-FE-F0-B5-BE-0E-86-21-61-5F-81-75-23-49-45-CB-07-57-78-AB-9B-80-A2-0A-46-DB-35-49-6A-08-5B-8B-55-4E-6E-1B-E0-E0-3E-E7-2A-06-A0-5D-4F-EA-71-1A-24-F4-F4-AF-95-4B-F2-9A-C5-FD-7F-6A-DD-61-2D-B3-29-DB-5B-D9-A8-CF-60-8F-36-85-04-B5-85-3F-78-EB-09-0C-B4-F4-2D-B8-67-71-2A-4B-1B-08-0C-18-A2-9E-30-13-0C-23-18-7A-43-73-D3-DC-38-F5-9A-4A-08-28-BD-A2-DC-A8-70-33-63-9E-A9-72-DF-72-A5-43-AB-A2')
assert cipher_text == real, 'Decrypted text does not equal the real value (expected "{}", got "{}")'\
.format(get_representation(real), get_representation(cipher_text))
@staticmethod
def test_calc_key():
shared_key = get_bytes('BC-D2-6D-B7-CA-76-F4-5D-5B-88-83-27-20-F3-11-8A-73-D0-34-94-31-AE-2A-4F-03-86-9A-2F-48-23-1A-8C-B5-6A-E9-24-E0-49-76-43-6D-5E-E7-30-1A-35-43-09-16-03-D2-9D-A9-89-D6-CE-08-50-0F-64-72-A0-B3-EB-FE-63-76-1A-DF-4A-14-96-98-16-A3-47-AB-04-14-21-5C-EB-0A-BC-6E-DF-C4-25-C6-09-B7-16-14-9C-27-81-15-3D-B0-AF-0E-0B-52-AA-04-36-36-73-F0-CF-B7-B8-3E-2C-44-94-78-D7-F8-E0-84-CB-25-D3-05-B2-E8-95-4D-72-3F-A2-E8-49-6E-F9-0B-5B-45-9B-AA-0C-58-7F-0E-69-DE-EE-64-1D-78-2F-4A-CE-EA-5E-7D-30-3B-A8-33-42-BB-52-A1-BF-65-04-B9-1E-A1-22-66-3D-A5-4D-40-9E-DD-81-80-C9-A5-FB-FC-67-DD-15-03-70-21-0F-66-44-16-89-32-EA-CA-B1-41-99-4F-A9-34-50-A9-A2-C6-3B-B2-43-39-1D-43-35-D2-0D-EC-4C-D9-AB-77-2D-03-0D-79-C2-76-17-5D-02-15-0C-42-61-97-CE-A5-B1-E4-5D-8E-E0-2C-CF-43-7B-6F-FA-99-66-A4-70-4D-00')
msg_keys = [
'BA-1A-CF-DA-A8-5E-43-62-6C-FA-B6-0C-3A-9B-B0-FC',
'86-6D-92-69-CF-8B-93-AA-86-4B-1F-69-D0-34-83-5D',
'A1-2F-C0-61-6F-60-1A-B0-33-8F-7D-27-08-C8-EA-15',
'3A-56-52-0F-B7-89-D7-80-F6-18-72-CD-09-B5-A8-8A',
'06-F7-84-F3-91-CC-8D-DC-7D-92-41-7A-7E-84-25-E4',
'78-4D-FC-AE-F4-C4-55-81-6D-DD-99-A7-DB-B8-A3-88'
]
are_client = [
True,
False,
True,
True,
False,
False
]
valid_keys = [
'AF-E3-84-51-6D-E0-21-0C-D9-31-E4-9A-A0-76-5F-67-63-78-A1-B0-C9-BC-16-27-76-CF-2C-9D-4D-AE-C6-A5',
'DD-30-58-B6-93-8E-C9-79-EF-83-F8-8C-6A-A7-68-03-E2-C6-B1-36-C5-BB-FC-E7-DF-D6-B1-67-F7-75-CF-6B',
'70-3F-66-AF-FE-0A-F3-1E-95-C3-25-48-8D-0F-A7-95-59-53-BF-DD-35-97-6E-7A-C0-5E-79-9C-9D-09-3B-7B',
'20-9F-82-E3-95-3F-9D-1E-EE-3E-F3-82-B0-8D-6E-76-26-5B-94-27-DD-7D-61-C3-AC-EB-FA-71-FF-0D-8F-08',
'AE-06-BF-F1-89-88-22-66-98-48-76-E6-BD-D9-39-36-2D-36-4E-CF-FD-47-D2-87-D7-49-F8-93-22-2E-66-02',
'D9-6B-43-D0-89-F7-C5-75-A2-4A-F2-2F-F0-17-5C-95-AE-FA-46-A5-95-AA-C3-B9-76-B0-3A-A8-0E-7B-EA-5D'
]
valid_ivs = [
'B8-51-F3-C5-A3-5D-C6-DF-9E-E0-51-BD-22-8D-13-09-0E-9A-9D-5E-38-A2-F8-E7-00-77-D9-C1-A7-A0-F7-0F',
'DC-4C-C2-18-01-4A-22-58-86-6C-62-B6-B5-34-37-FD-E2-61-34-B6-AF-7D-46-53-D7-5B-E0-4E-0D-19-FB-BC',
'68-BB-BA-7F-55-B9-EF-86-EE-20-5A-1A-45-4E-70-C3-48-56-A2-E9-2F-91-AD-74-23-FE-54-06-E5-68-04-E9',
'79-31-8D-F0-DC-31-60-D7-09-BC-66-F1-AB-0D-7C-CB-2A-AF-74-32-64-C5-B3-18-C4-ED-55-D9-F6-39-DD-F3',
'1F-51-66-06-05-54-B9-E0-52-6A-88-6A-70-0C-DA-8D-2B-CD-BF-8A-67-5E-1A-A7-DD-EA-1C-CE-4C-D4-34-3D',
'8E-00-97-5E-B7-A1-F7-3D-1C-16-03-CA-B3-ED-EA-80-64-8F-77-A6-C4-34-5B-B5-DC-5D-C9-EC-B7-F8-F4-76'
]
for msg_key, is_client, valid_key, valid_iv in zip(msg_keys, are_client, valid_keys, valid_ivs):
msg_key = get_bytes(msg_key)
key, iv = utils.calc_key(shared_key, msg_key, is_client)
assert get_representation(key) == valid_key
assert get_representation(iv) == valid_iv
@staticmethod
def test_authenticator():
transport = TcpTransport('149.154.167.91', 443)
network.authenticator.do_authentication(transport)
transport.close()
if __name__ == '__main__':
unittest.main()

23
unit_tests.py Normal file
View File

@ -0,0 +1,23 @@
import unittest
if __name__ == '__main__':
from unittests import CryptoTests, ParserTests, TLTests, UtilsTests, NetworkTests
test_classes = [CryptoTests, ParserTests, TLTests, UtilsTests]
network = input('Run network tests (y/n)?: ').lower() == 'y'
if network:
test_classes.append(NetworkTests)
loader = unittest.TestLoader()
suites_list = []
for test_class in test_classes:
suite = loader.loadTestsFromTestCase(test_class)
suites_list.append(suite)
big_suite = unittest.TestSuite(suites_list)
runner = unittest.TextTestRunner()
results = runner.run(big_suite)

5
unittests/__init__.py Normal file
View File

@ -0,0 +1,5 @@
from .crypto_tests import CryptoTests
from .network_tests import NetworkTests
from .parser_tests import ParserTests
from .tl_tests import TLTests
from .utils_tests import UtilsTests

107
unittests/crypto_tests.py Normal file
View File

@ -0,0 +1,107 @@
import unittest
from crypto import AES
import utils.helpers as utils
from crypto import Factorizator
class CryptoTests(unittest.TestCase):
def setUp(self):
# Test known values
self.key = b'\xd1\xf4MXy\x0c\xf8/z,\xe9\xf9\xa4\x17\x04\xd9C\xc9\xaba\x81\xf3\xf8\xdd\xcb\x0c6\x92\x01\x1f\xc2y'
self.iv = b':\x02\x91x\x90Dj\xa6\x03\x90C\x08\x9e@X\xb5E\xffwy\xf3\x1c\xde\xde\xfbo\x8dm\xd6e.Z'
self.plain_text = b'Non encrypted text :D'
self.plain_text_padded = b'My len is more uniform, promise!'
self.cipher_text = b'\xb6\xa7\xec.\xb9\x9bG\xcb\xe9{\x91[\x12\xfc\x84D\x1c' \
b'\x93\xd9\x17\x03\xcd\xd6\xb1D?\x98\xd2\xb5\xa5U\xfd'
self.cipher_text_padded = b"W\xd1\xed'\x01\xa6c\xc3\xcb\xef\xaa\xe5\x1d\x1a" \
b"[\x1b\xdf\xcdI\x1f>Z\n\t\xb9\xd2=\xbaF\xd1\x8e'"
@staticmethod
def test_sha1():
string = 'Example string'
hashsum = utils.sha1(string.encode('utf-8'))
expected = b'\nT\x92|\x8d\x06:)\x99\x04\x8e\xf8j?\xc4\x8e\xd3}m9'
assert hashsum == expected, 'Invalid sha1 hashsum representation (should be {}, but is {})'\
.format(expected, hashsum)
def test_aes_encrypt(self):
value = AES.encrypt_ige(self.plain_text, self.key, self.iv)
assert value == self.cipher_text, ('Ciphered text ("{}") does not equal expected ("{}")'
.format(value, self.cipher_text))
value = AES.encrypt_ige(self.plain_text_padded, self.key, self.iv)
assert value == self.cipher_text_padded, ('Ciphered text ("{}") does not equal expected ("{}")'
.format(value, self.cipher_text_padded))
def test_aes_decrypt(self):
# The ciphered text must always be padded
value = AES.decrypt_ige(self.cipher_text_padded, self.key, self.iv)
assert value == self.plain_text_padded, ('Decrypted text ("{}") does not equal expected ("{}")'
.format(value, self.plain_text_padded))
@staticmethod
def test_calc_key():
shared_key = b'\xbc\xd2m\xb7\xcav\xf4][\x88\x83\' \xf3\x11\x8as\xd04\x941\xae' \
b'*O\x03\x86\x9a/H#\x1a\x8c\xb5j\xe9$\xe0IvCm^\xe70\x1a5C\t\x16' \
b'\x03\xd2\x9d\xa9\x89\xd6\xce\x08P\x0fdr\xa0\xb3\xeb\xfecv\x1a' \
b'\xdfJ\x14\x96\x98\x16\xa3G\xab\x04\x14!\\\xeb\n\xbcn\xdf\xc4%' \
b'\xc6\t\xb7\x16\x14\x9c\'\x81\x15=\xb0\xaf\x0e\x0bR\xaa\x0466s' \
b'\xf0\xcf\xb7\xb8>,D\x94x\xd7\xf8\xe0\x84\xcb%\xd3\x05\xb2\xe8' \
b'\x95Mr?\xa2\xe8In\xf9\x0b[E\x9b\xaa\x0cX\x7f\x0ei\xde\xeed\x1d' \
b'x/J\xce\xea^}0;\xa83B\xbbR\xa1\xbfe\x04\xb9\x1e\xa1"f=\xa5M@' \
b'\x9e\xdd\x81\x80\xc9\xa5\xfb\xfcg\xdd\x15\x03p!\x0ffD\x16\x892' \
b'\xea\xca\xb1A\x99O\xa94P\xa9\xa2\xc6;\xb2C9\x1dC5\xd2\r\xecL' \
b'\xd9\xabw-\x03\ry\xc2v\x17]\x02\x15\x0cBa\x97\xce\xa5\xb1\xe4]' \
b'\x8e\xe0,\xcfC{o\xfa\x99f\xa4pM\x00'
# Calculate key being the client
msg_key = b'\xba\x1a\xcf\xda\xa8^Cbl\xfa\xb6\x0c:\x9b\xb0\xfc'
key, iv = utils.calc_key(shared_key, msg_key, client=True)
expected_key = b"\xaf\xe3\x84Qm\xe0!\x0c\xd91\xe4\x9a\xa0v_gcx\xa1\xb0\xc9\xbc\x16'v\xcf,\x9dM\xae\xc6\xa5"
expected_iv = b'\xb8Q\xf3\xc5\xa3]\xc6\xdf\x9e\xe0Q\xbd"\x8d\x13\t\x0e\x9a\x9d^8\xa2\xf8\xe7\x00w\xd9\xc1\xa7\xa0\xf7\x0f'
assert key == expected_key, 'Invalid key (expected ("{}"), got ("{}"))'.format(expected_key, key)
assert iv == expected_iv, 'Invalid IV (expected ("{}"), got ("{}"))'.format(expected_iv, iv)
# Calculate key being the server
msg_key = b'\x86m\x92i\xcf\x8b\x93\xaa\x86K\x1fi\xd04\x83]'
key, iv = utils.calc_key(shared_key, msg_key, client=False)
expected_key = b'\xdd0X\xb6\x93\x8e\xc9y\xef\x83\xf8\x8cj\xa7h\x03\xe2\xc6\xb16\xc5\xbb\xfc\xe7\xdf\xd6\xb1g\xf7u\xcfk'
expected_iv = b'\xdcL\xc2\x18\x01J"X\x86lb\xb6\xb547\xfd\xe2a4\xb6\xaf}FS\xd7[\xe0N\r\x19\xfb\xbc'
assert key == expected_key, 'Invalid key (expected ("{}"), got ("{}"))'.format(expected_key, key)
assert iv == expected_iv, 'Invalid IV (expected ("{}"), got ("{}"))'.format(expected_iv, iv)
@staticmethod
def test_calc_msg_key():
value = utils.calc_msg_key(b'Some random message')
expected = b'\xdfAa\xfc\x10\xab\x89\xd2\xfe\x19C\xf1\xdd~\xbf\x81'
assert value == expected, 'Value ("{}") does not equal expected ("{}")'.format(value, expected)
@staticmethod
def test_generate_key_data_from_nonces():
server_nonce = b'I am the server nonce.'
new_nonce = b'I am a new calculated nonce.'
key, iv = utils.generate_key_data_from_nonces(server_nonce, new_nonce)
expected_key = b'?\xc4\xbd\xdf\rWU\x8a\xf5\x0f+V\xdc\x96up\x1d\xeeG\x00\x81|\x1eg\x8a\x8f{\xf0y\x80\xda\xde'
expected_iv = b'Q\x9dpZ\xb7\xdd\xcb\x82_\xfa\xf4\x90\xecn\x10\x9cD\xd2\x01\x8d\x83\xa0\xa4^\xb8\x91,\x7fI am'
assert key == expected_key, 'Key ("{}") does not equal expected ("{}")'.format(key, expected_key)
assert iv == expected_iv, 'Key ("{}") does not equal expected ("{}")'.format(key, expected_iv)
@staticmethod
def test_factorizator():
pq = 3118979781119966969
p, q = Factorizator.factorize(pq)
assert p == 1719614201, 'Factorized pair did not yield the correct result'
assert q == 1813767169, 'Factorized pair did not yield the correct result'

View File

@ -0,0 +1,41 @@
import random
import socket
import threading
import unittest
from network import TcpTransport, TcpClient
import network.authenticator
def run_server_echo_thread(port):
def server_thread():
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(('', port))
s.listen(1)
conn, addr = s.accept()
with conn:
data = conn.recv(16)
conn.send(data)
server = threading.Thread(target=server_thread)
server.start()
class NetworkTests(unittest.TestCase):
@staticmethod
def test_tcp_client():
port = random.randint(50000, 60000) # Arbitrary non-privileged port
run_server_echo_thread(port)
msg = b'Unit testing...'
client = TcpClient()
client.connect('localhost', port)
client.write(msg)
assert msg == client.read(16), 'Read message does not equal sent message'
client.close()
@staticmethod
def test_authenticator():
transport = TcpTransport('149.154.167.91', 443)
network.authenticator.do_authentication(transport)
transport.close()

View File

@ -0,0 +1,5 @@
import unittest
class ParserTests(unittest.TestCase):
"""There are no tests yet"""

5
unittests/tl_tests.py Normal file
View File

@ -0,0 +1,5 @@
import unittest
class TLTests(unittest.TestCase):
"""There are no tests yet"""

81
unittests/utils_tests.py Normal file
View File

@ -0,0 +1,81 @@
import unittest
import utils
from utils import BinaryReader, BinaryWriter
class UtilsTests(unittest.TestCase):
@staticmethod
def test_binary_writer_reader():
# Test that we can write and read properly
with BinaryWriter() as writer:
writer.write_byte(1)
writer.write_int(5)
writer.write_long(13)
writer.write_float(17.0)
writer.write_double(25.0)
writer.write(bytes([26, 27, 28, 29, 30, 31, 32]))
writer.write_large_int(2 ** 127, 128, signed=False)
data = writer.get_bytes()
expected = b'\x01\x05\x00\x00\x00\r\x00\x00\x00\x00\x00\x00\x00\x00\x00\x88A\x00\x00\x00\x00\x00\x00' \
b'9@\x1a\x1b\x1c\x1d\x1e\x1f \x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80'
assert data == expected, 'Retrieved data does not match the expected value'
with BinaryReader(data) as reader:
value = reader.read_byte()
assert value == 1, 'Example byte should be 1 but is {}'.format(value)
value = reader.read_int()
assert value == 5, 'Example integer should be 5 but is {}'.format(value)
value = reader.read_long()
assert value == 13, 'Example long integer should be 13 but is {}'.format(value)
value = reader.read_float()
assert value == 17.0, 'Example float should be 17.0 but is {}'.format(value)
value = reader.read_double()
assert value == 25.0, 'Example double should be 25.0 but is {}'.format(value)
value = reader.read(7)
assert value == bytes([26, 27, 28, 29, 30, 31, 32]), 'Example bytes should be {} but is {}' \
.format(bytes([26, 27, 28, 29, 30, 31, 32]), value)
value = reader.read_large_int(128, signed=False)
assert value == 2 ** 127, 'Example large integer should be {} but is {}'.format(2 ** 127, value)
# Test Telegram that types are written right
with BinaryWriter() as writer:
writer.write_int(0x60469778)
buffer = writer.get_bytes()
valid = b'\x78\x97\x46\x60' # Tested written bytes using TLSharp and C#'s MemoryStream
assert buffer == valid, "Written type should be {} but is {}".format(list(valid), list(buffer))
@staticmethod
def test_binary_tgwriter_tgreader():
small_data = utils.generate_random_bytes(33)
small_data_padded = utils.generate_random_bytes(19) # +1 byte for length = 20 (evenly divisible by 4)
large_data = utils.generate_random_bytes(999)
large_data_padded = utils.generate_random_bytes(1024)
data = (small_data, small_data_padded, large_data, large_data_padded)
string = 'Testing Telegram strings, this should work properly!'
with BinaryWriter() as writer:
# First write the data
for datum in data:
writer.tgwrite_bytes(datum)
writer.tgwrite_string(string)
with BinaryReader(writer.get_bytes()) as reader:
# And then try reading it without errors (it should be unharmed!)
for datum in data:
value = reader.tgread_bytes()
assert value == datum, 'Example bytes should be {} but is {}'.format(datum, value)
value = reader.tgread_string()
assert value == string, 'Example string should be {} but is {}'.format(string, value)

View File

@ -1,5 +1,5 @@
from io import BytesIO, BufferedReader
from tl import tlobjects
from tl.all_tlobjects import tlobjects
from struct import unpack
from errors import *
import inspect

View File

@ -2,8 +2,7 @@ import os
from utils import BinaryWriter
import hashlib
bits_per_byte = 8
# region Multiple utilities
def generate_random_long(signed=True):
@ -16,12 +15,24 @@ def generate_random_bytes(count):
return os.urandom(count)
def get_byte_array(integer, signed):
"""Gets the arbitrary-length byte array corresponding to the given integer"""
bits = integer.bit_length()
byte_length = (bits + bits_per_byte - 1) // bits_per_byte
# For some strange reason, this has to be big!
return int.to_bytes(integer, length=byte_length, byteorder='big', signed=signed)
def load_settings(path='api/settings'):
"""Loads the user settings located under `api/`"""
settings = {}
with open(path, 'r', encoding='utf-8') as file:
for line in file:
value_pair = line.split('=')
left = value_pair[0].strip()
right = value_pair[1].strip()
if right.isnumeric():
settings[left] = int(right)
else:
settings[left] = right
return settings
# endregion
# region Cryptographic related utils
def calc_key(shared_key, msg_key, client):
@ -44,11 +55,6 @@ def calc_msg_key(data):
return sha1(data)[4:20]
def calc_msg_key_offset(data, offset, limit):
"""Calculates the message key from offset given data, with an optional offset and limit"""
return sha1(data[offset:offset + limit])[4:20]
def generate_key_data_from_nonces(server_nonce, new_nonce):
"""Generates the key data corresponding to the given nonces"""
hash1 = sha1(bytes(new_nonce + server_nonce))
@ -73,18 +79,4 @@ def sha1(data):
sha.update(data)
return sha.digest()
def load_settings():
"""Loads the user settings located under `api/`"""
settings = {}
with open('api/settings', 'r', encoding='utf-8') as file:
for line in file:
value_pair = line.split('=')
left = value_pair[0].strip()
right = value_pair[1].strip()
if right.isnumeric():
settings[left] = int(right)
else:
settings[left] = right
return settings
# endregion