From e712a9bf61ad5ddb1c343bed31a6f27788756834 Mon Sep 17 00:00:00 2001 From: Lonami Date: Tue, 30 Aug 2016 13:11:19 +0200 Subject: [PATCH] Wrote unit tests and found and fixed bugs --- main.py | 1 + network/tcp_client.py | 2 +- unit_test.py | 123 +++++++++++++++++++++++++++++++++++++++++ utils/binary_reader.py | 28 +++++++--- utils/binary_writer.py | 23 ++++---- utils/helpers.py | 14 +---- 6 files changed, 159 insertions(+), 32 deletions(-) create mode 100644 unit_test.py diff --git a/main.py b/main.py index b4a3863b..3cb9366f 100644 --- a/main.py +++ b/main.py @@ -7,3 +7,4 @@ if __name__ == '__main__': print('Done.') pass + diff --git a/network/tcp_client.py b/network/tcp_client.py index 66d18a5a..cdfb585f 100644 --- a/network/tcp_client.py +++ b/network/tcp_client.py @@ -21,4 +21,4 @@ class TcpClient: def read(self, buffer_size): """Reads (receives) the specified bytes from the connected peer""" - self.socket.recv(buffer_size) + return self.socket.recv(buffer_size) diff --git a/unit_test.py b/unit_test.py new file mode 100644 index 00000000..608de412 --- /dev/null +++ b/unit_test.py @@ -0,0 +1,123 @@ +import unittest +import socket +import threading +import random +from time import sleep +import utils.helpers as utils + +from network.tcp_client import TcpClient + +from utils.binary_reader import BinaryReader +from utils.binary_writer import BinaryWriter + + +host = 'localhost' +port = random.randint(50000, 60000) # Arbitrary non-privileged port + + +def run_server_echo_thread(): + def server_thread(): + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(('', port)) + s.listen(1) + conn, addr = s.accept() + with conn: + data = conn.recv(16) + conn.send(data) + + server = threading.Thread(target=server_thread) + server.start() + + +class UnitTest(unittest.TestCase): + @staticmethod + def test_tcp_client(): + client = TcpClient() + run_server_echo_thread() + + try: + client.connect(host, port) + except: + raise AssertionError('Could connect to the server') + + try: + client.write('Unit testing...'.encode('ascii')) + except: + raise AssertionError('Could not send a message to the server') + + try: + client.read(16) + except: + raise AssertionError('Could not read a message to the server') + + try: + client.close() + except: + raise AssertionError('Could not close the client') + + @staticmethod + def test_binary_writer_reader(): + with BinaryWriter() as writer: + writer.write_byte(1) + writer.write_int(5) + writer.write_long(13) + writer.write_float(17.0) + writer.write_double(25.0) + writer.write(bytes([26, 27, 28, 29, 30, 31, 32])) + writer.write_large_int(2**127, 128, signed=False) + + data = writer.get_bytes() + assert data is not None, 'Example Data should not be None' + assert len(data) == 48, 'Example data length should be 48, but is {}'.format(len(data)) + + with BinaryReader(data) as reader: + value = reader.read_byte() + assert value == 1, 'Example byte should be 1 but is {}'.format(value) + + value = reader.read_int() + assert value == 5, 'Example integer should be 5 but is {}'.format(value) + + value = reader.read_long() + assert value == 13, 'Example long integer should be 13 but is {}'.format(value) + + value = reader.read_float() + assert value == 17.0, 'Example float should be 17.0 but is {}'.format(value) + + value = reader.read_double() + assert value == 25.0, 'Example double should be 25.0 but is {}'.format(value) + + value = reader.read(7) + assert value == bytes([26, 27, 28, 29, 30, 31, 32]), 'Example bytes should be {} but is {}'\ + .format(bytes([26, 27, 28, 29, 30, 31, 32]), value) + + value = reader.read_large_int(128, signed=False) + assert value == 2**127, 'Example large integer should be {} but is {}'.format(2**127, value) + + @staticmethod + def test_binary_tgwriter_tgreader(): + string = 'Testing Telegram strings, this should work properly!' + small_data = utils.generate_random_bytes(20) + large_data = utils.generate_random_bytes(1024) + + with BinaryWriter() as writer: + writer.tgwrite_string(string) + writer.tgwrite_bytes(small_data) + writer.tgwrite_bytes(large_data) + + data = writer.get_bytes() + assert data is not None, 'Example Data should not be None' + + with BinaryReader(data) as reader: + value = reader.tgread_string() + assert value == string, 'Example string should be {} but is {}'.format(string, value) + + value = reader.tgread_bytes() + assert value == small_data, 'Example bytes should be {} but is {}'.format(small_data, value) + + value = reader.tgread_bytes() + assert value == large_data, 'Example bytes should be {} but is {}'.format(large_data, value) + + + +if __name__ == '__main__': + unittest.main() diff --git a/utils/binary_reader.py b/utils/binary_reader.py index 2fe3e738..54081771 100644 --- a/utils/binary_reader.py +++ b/utils/binary_reader.py @@ -1,5 +1,6 @@ from io import BytesIO, BufferedReader from tl.all_tlobjects import tlobjects +from struct import unpack import os @@ -20,17 +21,30 @@ class BinaryReader: # region Reading + def read_byte(self): + """Reads a single byte value""" + return self.reader.read(1)[0] + def read_int(self, signed=True): """Reads an integer (4 bytes) value""" - return int.from_bytes(self.reader.read(4), signed=signed, byteorder='big') + return int.from_bytes(self.reader.read(4), byteorder='big', signed=signed) def read_long(self, signed=True): """Reads a long integer (8 bytes) value""" - return int.from_bytes(self.reader.read(8), signed=signed, byteorder='big') + return int.from_bytes(self.reader.read(8), byteorder='big', signed=signed) - def read_large_int(self, bits): + # Network is always big-endian, this is, '>' + def read_float(self): + """Reads a real floating point (4 bytes) value""" + return unpack('>f', self.reader.read(4))[0] + + def read_double(self): + """Reads a real floating point (8 bytes) value""" + return unpack('>d', self.reader.read(8))[0] + + def read_large_int(self, bits, signed=True): """Reads a n-bits long integer value""" - return int.from_bytes(self.reader.read(bits // 8), byteorder='big') + return int.from_bytes(self.reader.read(bits // 8), byteorder='big', signed=signed) def read(self, length): """Read the given amount of bytes""" @@ -38,7 +52,7 @@ class BinaryReader: def get_bytes(self): """Gets the byte array representing the current buffer as a whole""" - return self.stream.getbuffer() + return self.stream.getvalue() # endregion @@ -46,9 +60,9 @@ class BinaryReader: def tgread_bytes(self): """Reads a Telegram-encoded byte array, without the need of specifying its length""" - first_byte = self.read(1) + first_byte = self.read_byte() if first_byte == 254: - length = self.read(1) | (self.read(1) << 8) | (self.read(1) << 16) + length = self.read_byte() | (self.read_byte() << 8) | (self.read_byte() << 16) padding = length % 4 else: length = first_byte diff --git a/utils/binary_writer.py b/utils/binary_writer.py index f1c36e25..fa10d81b 100644 --- a/utils/binary_writer.py +++ b/utils/binary_writer.py @@ -23,31 +23,28 @@ class BinaryWriter: def write_int(self, value, signed=True): """Writes an integer value (4 bytes), which can or cannot be signed""" - if signed: - self.writer.write(pack('i', value)) - else: + if not signed: value &= 0xFFFFFFFF # Ensure it's unsigned (see http://stackoverflow.com/a/30092291/4759433) - self.writer.write(pack('I', value)) + self.writer.write(int.to_bytes(value, length=4, byteorder='big', signed=signed)) def write_long(self, value, signed=True): """Writes a long integer value (8 bytes), which can or cannot be signed""" - if signed: - self.writer.write(pack('q', value)) - else: + if not signed: value &= 0xFFFFFFFFFFFFFFFF - self.writer.write(pack('Q', value)) + self.writer.write(int.to_bytes(value, length=8, byteorder='big', signed=signed)) + # Network is always big-endian, this is, '>' when packing def write_float(self, value): """Writes a floating point value (4 bytes)""" - self.writer.write(pack('f', value)) + self.writer.write(pack('>f', value)) def write_double(self, value): """Writes a floating point value (8 bytes)""" - self.writer.write(pack('d', value)) + self.writer.write(pack('>d', value)) - def write_large_int(self, value, bits): + def write_large_int(self, value, bits, signed=True): """Writes a n-bits long integer value""" - self.writer.write(pack('{}B'.format(bits // 8), value)) + self.writer.write(int.to_bytes(value, length=bits // 8, byteorder='big', signed=signed)) def write(self, data): """Writes the given bytes array""" @@ -111,7 +108,7 @@ class BinaryWriter: """Get the current bytes array content from the buffer, optionally flushing first""" if flush: self.writer.flush() - self.stream.getbuffer() + return self.stream.getvalue() # with block def __enter__(self): diff --git a/utils/helpers.py b/utils/helpers.py index 68e6c910..d5a63c2b 100644 --- a/utils/helpers.py +++ b/utils/helpers.py @@ -1,24 +1,16 @@ -import random +import os from utils.binary_writer import BinaryWriter from hashlib import sha1 def generate_random_long(signed=True): """Generates a random long integer (8 bytes), which is optionally signed""" - result = random.getrandbits(64) - if not signed: - result &= 0xFFFFFFFFFFFFFFFF # Ensure it's unsigned - - return result + return int.from_bytes(os.urandom(8), signed=signed) def generate_random_bytes(count): """Generates a random bytes array""" - with BinaryWriter() as writer: - for _ in range(count): - writer.write(random.getrandbits(8)) - - return writer.get_bytes() + return os.urandom(count) def calc_key(shared_key, msg_key, client):