Wrote unit tests and found and fixed bugs

This commit is contained in:
Lonami 2016-08-30 13:11:19 +02:00
parent 557ec70237
commit e712a9bf61
6 changed files with 159 additions and 32 deletions

View File

@ -7,3 +7,4 @@ if __name__ == '__main__':
print('Done.') print('Done.')
pass pass

View File

@ -21,4 +21,4 @@ class TcpClient:
def read(self, buffer_size): def read(self, buffer_size):
"""Reads (receives) the specified bytes from the connected peer""" """Reads (receives) the specified bytes from the connected peer"""
self.socket.recv(buffer_size) return self.socket.recv(buffer_size)

123
unit_test.py Normal file
View File

@ -0,0 +1,123 @@
import unittest
import socket
import threading
import random
from time import sleep
import utils.helpers as utils
from network.tcp_client import TcpClient
from utils.binary_reader import BinaryReader
from utils.binary_writer import BinaryWriter
host = 'localhost'
port = random.randint(50000, 60000) # Arbitrary non-privileged port
def run_server_echo_thread():
def server_thread():
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(('', port))
s.listen(1)
conn, addr = s.accept()
with conn:
data = conn.recv(16)
conn.send(data)
server = threading.Thread(target=server_thread)
server.start()
class UnitTest(unittest.TestCase):
@staticmethod
def test_tcp_client():
client = TcpClient()
run_server_echo_thread()
try:
client.connect(host, port)
except:
raise AssertionError('Could connect to the server')
try:
client.write('Unit testing...'.encode('ascii'))
except:
raise AssertionError('Could not send a message to the server')
try:
client.read(16)
except:
raise AssertionError('Could not read a message to the server')
try:
client.close()
except:
raise AssertionError('Could not close the client')
@staticmethod
def test_binary_writer_reader():
with BinaryWriter() as writer:
writer.write_byte(1)
writer.write_int(5)
writer.write_long(13)
writer.write_float(17.0)
writer.write_double(25.0)
writer.write(bytes([26, 27, 28, 29, 30, 31, 32]))
writer.write_large_int(2**127, 128, signed=False)
data = writer.get_bytes()
assert data is not None, 'Example Data should not be None'
assert len(data) == 48, 'Example data length should be 48, but is {}'.format(len(data))
with BinaryReader(data) as reader:
value = reader.read_byte()
assert value == 1, 'Example byte should be 1 but is {}'.format(value)
value = reader.read_int()
assert value == 5, 'Example integer should be 5 but is {}'.format(value)
value = reader.read_long()
assert value == 13, 'Example long integer should be 13 but is {}'.format(value)
value = reader.read_float()
assert value == 17.0, 'Example float should be 17.0 but is {}'.format(value)
value = reader.read_double()
assert value == 25.0, 'Example double should be 25.0 but is {}'.format(value)
value = reader.read(7)
assert value == bytes([26, 27, 28, 29, 30, 31, 32]), 'Example bytes should be {} but is {}'\
.format(bytes([26, 27, 28, 29, 30, 31, 32]), value)
value = reader.read_large_int(128, signed=False)
assert value == 2**127, 'Example large integer should be {} but is {}'.format(2**127, value)
@staticmethod
def test_binary_tgwriter_tgreader():
string = 'Testing Telegram strings, this should work properly!'
small_data = utils.generate_random_bytes(20)
large_data = utils.generate_random_bytes(1024)
with BinaryWriter() as writer:
writer.tgwrite_string(string)
writer.tgwrite_bytes(small_data)
writer.tgwrite_bytes(large_data)
data = writer.get_bytes()
assert data is not None, 'Example Data should not be None'
with BinaryReader(data) as reader:
value = reader.tgread_string()
assert value == string, 'Example string should be {} but is {}'.format(string, value)
value = reader.tgread_bytes()
assert value == small_data, 'Example bytes should be {} but is {}'.format(small_data, value)
value = reader.tgread_bytes()
assert value == large_data, 'Example bytes should be {} but is {}'.format(large_data, value)
if __name__ == '__main__':
unittest.main()

View File

@ -1,5 +1,6 @@
from io import BytesIO, BufferedReader from io import BytesIO, BufferedReader
from tl.all_tlobjects import tlobjects from tl.all_tlobjects import tlobjects
from struct import unpack
import os import os
@ -20,17 +21,30 @@ class BinaryReader:
# region Reading # region Reading
def read_byte(self):
"""Reads a single byte value"""
return self.reader.read(1)[0]
def read_int(self, signed=True): def read_int(self, signed=True):
"""Reads an integer (4 bytes) value""" """Reads an integer (4 bytes) value"""
return int.from_bytes(self.reader.read(4), signed=signed, byteorder='big') return int.from_bytes(self.reader.read(4), byteorder='big', signed=signed)
def read_long(self, signed=True): def read_long(self, signed=True):
"""Reads a long integer (8 bytes) value""" """Reads a long integer (8 bytes) value"""
return int.from_bytes(self.reader.read(8), signed=signed, byteorder='big') return int.from_bytes(self.reader.read(8), byteorder='big', signed=signed)
def read_large_int(self, bits): # Network is always big-endian, this is, '>'
def read_float(self):
"""Reads a real floating point (4 bytes) value"""
return unpack('>f', self.reader.read(4))[0]
def read_double(self):
"""Reads a real floating point (8 bytes) value"""
return unpack('>d', self.reader.read(8))[0]
def read_large_int(self, bits, signed=True):
"""Reads a n-bits long integer value""" """Reads a n-bits long integer value"""
return int.from_bytes(self.reader.read(bits // 8), byteorder='big') return int.from_bytes(self.reader.read(bits // 8), byteorder='big', signed=signed)
def read(self, length): def read(self, length):
"""Read the given amount of bytes""" """Read the given amount of bytes"""
@ -38,7 +52,7 @@ class BinaryReader:
def get_bytes(self): def get_bytes(self):
"""Gets the byte array representing the current buffer as a whole""" """Gets the byte array representing the current buffer as a whole"""
return self.stream.getbuffer() return self.stream.getvalue()
# endregion # endregion
@ -46,9 +60,9 @@ class BinaryReader:
def tgread_bytes(self): def tgread_bytes(self):
"""Reads a Telegram-encoded byte array, without the need of specifying its length""" """Reads a Telegram-encoded byte array, without the need of specifying its length"""
first_byte = self.read(1) first_byte = self.read_byte()
if first_byte == 254: if first_byte == 254:
length = self.read(1) | (self.read(1) << 8) | (self.read(1) << 16) length = self.read_byte() | (self.read_byte() << 8) | (self.read_byte() << 16)
padding = length % 4 padding = length % 4
else: else:
length = first_byte length = first_byte

View File

@ -23,31 +23,28 @@ class BinaryWriter:
def write_int(self, value, signed=True): def write_int(self, value, signed=True):
"""Writes an integer value (4 bytes), which can or cannot be signed""" """Writes an integer value (4 bytes), which can or cannot be signed"""
if signed: if not signed:
self.writer.write(pack('i', value))
else:
value &= 0xFFFFFFFF # Ensure it's unsigned (see http://stackoverflow.com/a/30092291/4759433) value &= 0xFFFFFFFF # Ensure it's unsigned (see http://stackoverflow.com/a/30092291/4759433)
self.writer.write(pack('I', value)) self.writer.write(int.to_bytes(value, length=4, byteorder='big', signed=signed))
def write_long(self, value, signed=True): def write_long(self, value, signed=True):
"""Writes a long integer value (8 bytes), which can or cannot be signed""" """Writes a long integer value (8 bytes), which can or cannot be signed"""
if signed: if not signed:
self.writer.write(pack('q', value))
else:
value &= 0xFFFFFFFFFFFFFFFF value &= 0xFFFFFFFFFFFFFFFF
self.writer.write(pack('Q', value)) self.writer.write(int.to_bytes(value, length=8, byteorder='big', signed=signed))
# Network is always big-endian, this is, '>' when packing
def write_float(self, value): def write_float(self, value):
"""Writes a floating point value (4 bytes)""" """Writes a floating point value (4 bytes)"""
self.writer.write(pack('f', value)) self.writer.write(pack('>f', value))
def write_double(self, value): def write_double(self, value):
"""Writes a floating point value (8 bytes)""" """Writes a floating point value (8 bytes)"""
self.writer.write(pack('d', value)) self.writer.write(pack('>d', value))
def write_large_int(self, value, bits): def write_large_int(self, value, bits, signed=True):
"""Writes a n-bits long integer value""" """Writes a n-bits long integer value"""
self.writer.write(pack('{}B'.format(bits // 8), value)) self.writer.write(int.to_bytes(value, length=bits // 8, byteorder='big', signed=signed))
def write(self, data): def write(self, data):
"""Writes the given bytes array""" """Writes the given bytes array"""
@ -111,7 +108,7 @@ class BinaryWriter:
"""Get the current bytes array content from the buffer, optionally flushing first""" """Get the current bytes array content from the buffer, optionally flushing first"""
if flush: if flush:
self.writer.flush() self.writer.flush()
self.stream.getbuffer() return self.stream.getvalue()
# with block # with block
def __enter__(self): def __enter__(self):

View File

@ -1,24 +1,16 @@
import random import os
from utils.binary_writer import BinaryWriter from utils.binary_writer import BinaryWriter
from hashlib import sha1 from hashlib import sha1
def generate_random_long(signed=True): def generate_random_long(signed=True):
"""Generates a random long integer (8 bytes), which is optionally signed""" """Generates a random long integer (8 bytes), which is optionally signed"""
result = random.getrandbits(64) return int.from_bytes(os.urandom(8), signed=signed)
if not signed:
result &= 0xFFFFFFFFFFFFFFFF # Ensure it's unsigned
return result
def generate_random_bytes(count): def generate_random_bytes(count):
"""Generates a random bytes array""" """Generates a random bytes array"""
with BinaryWriter() as writer: return os.urandom(count)
for _ in range(count):
writer.write(random.getrandbits(8))
return writer.get_bytes()
def calc_key(shared_key, msg_key, client): def calc_key(shared_key, msg_key, client):