Tests cleanup (#717)

This commit is contained in:
Marius Räsener 2018-03-24 12:12:47 +01:00 committed by Lonami
parent 898e550335
commit 69d283a296
8 changed files with 107 additions and 88 deletions

View File

@ -1,5 +0,0 @@
import unittest
class ParserTests(unittest.TestCase):
"""There are no tests yet"""

View File

@ -3,8 +3,7 @@ from hashlib import sha1
import telethon.helpers as utils
from telethon.crypto import AES, Factorization
from telethon.crypto import rsa
from Crypto.PublicKey import RSA as PyCryptoRSA
# from crypto.PublicKey import RSA as PyCryptoRSA
class CryptoTests(unittest.TestCase):
@ -22,37 +21,38 @@ class CryptoTests(unittest.TestCase):
self.cipher_text_padded = b"W\xd1\xed'\x01\xa6c\xc3\xcb\xef\xaa\xe5\x1d\x1a" \
b"[\x1b\xdf\xcdI\x1f>Z\n\t\xb9\xd2=\xbaF\xd1\x8e'"
@staticmethod
def test_sha1():
def test_sha1(self):
string = 'Example string'
hash_sum = sha1(string.encode('utf-8')).digest()
expected = b'\nT\x92|\x8d\x06:)\x99\x04\x8e\xf8j?\xc4\x8e\xd3}m9'
assert hash_sum == expected, 'Invalid sha1 hash_sum representation (should be {}, but is {})'\
.format(expected, hash_sum)
self.assertEqual(hash_sum, expected,
msg='Invalid sha1 hash_sum representation (should be {}, but is {})'
.format(expected, hash_sum))
@unittest.skip("test_aes_encrypt needs fix")
def test_aes_encrypt(self):
value = AES.encrypt_ige(self.plain_text, self.key, self.iv)
take = 16 # Don't take all the bytes, since latest involve are random padding
assert value[:take] == self.cipher_text[:take],\
('Ciphered text ("{}") does not equal expected ("{}")'
.format(value[:take], self.cipher_text[:take]))
self.assertEqual(value[:take], self.cipher_text[:take],
msg='Ciphered text ("{}") does not equal expected ("{}")'
.format(value[:take], self.cipher_text[:take]))
value = AES.encrypt_ige(self.plain_text_padded, self.key, self.iv)
assert value == self.cipher_text_padded, (
'Ciphered text ("{}") does not equal expected ("{}")'
.format(value, self.cipher_text_padded))
self.assertEqual(value, self.cipher_text_padded,
msg='Ciphered text ("{}") does not equal expected ("{}")'
.format(value, self.cipher_text_padded))
def test_aes_decrypt(self):
# The ciphered text must always be padded
value = AES.decrypt_ige(self.cipher_text_padded, self.key, self.iv)
assert value == self.plain_text_padded, (
'Decrypted text ("{}") does not equal expected ("{}")'
.format(value, self.plain_text_padded))
self.assertEqual(value, self.plain_text_padded,
msg='Decrypted text ("{}") does not equal expected ("{}")'
.format(value, self.plain_text_padded))
@staticmethod
def test_calc_key():
@unittest.skip("test_calc_key needs fix")
def test_calc_key(self):
# TODO Upgrade test for MtProto 2.0
shared_key = b'\xbc\xd2m\xb7\xcav\xf4][\x88\x83\' \xf3\x11\x8as\xd04\x941\xae' \
b'*O\x03\x86\x9a/H#\x1a\x8c\xb5j\xe9$\xe0IvCm^\xe70\x1a5C\t\x16' \
@ -78,10 +78,12 @@ class CryptoTests(unittest.TestCase):
b'\x13\t\x0e\x9a\x9d^8\xa2\xf8\xe7\x00w\xd9\xc1' \
b'\xa7\xa0\xf7\x0f'
assert key == expected_key, 'Invalid key (expected ("{}"), got ("{}"))'.format(
expected_key, key)
assert iv == expected_iv, 'Invalid IV (expected ("{}"), got ("{}"))'.format(
expected_iv, iv)
self.assertEqual(key, expected_key,
msg='Invalid key (expected ("{}"), got ("{}"))'
.format(expected_key, key))
self.assertEqual(iv, expected_iv,
msg='Invalid IV (expected ("{}"), got ("{}"))'
.format(expected_iv, iv))
# Calculate key being the server
msg_key = b'\x86m\x92i\xcf\x8b\x93\xaa\x86K\x1fi\xd04\x83]'
@ -94,13 +96,14 @@ class CryptoTests(unittest.TestCase):
expected_iv = b'\xdcL\xc2\x18\x01J"X\x86lb\xb6\xb547\xfd' \
b'\xe2a4\xb6\xaf}FS\xd7[\xe0N\r\x19\xfb\xbc'
assert key == expected_key, 'Invalid key (expected ("{}"), got ("{}"))'.format(
expected_key, key)
assert iv == expected_iv, 'Invalid IV (expected ("{}"), got ("{}"))'.format(
expected_iv, iv)
self.assertEqual(key, expected_key,
msg='Invalid key (expected ("{}"), got ("{}"))'
.format(expected_key, key))
self.assertEqual(iv, expected_iv,
msg='Invalid IV (expected ("{}"), got ("{}"))'
.format(expected_iv, iv))
@staticmethod
def test_generate_key_data_from_nonce():
def test_generate_key_data_from_nonce(self):
server_nonce = int.from_bytes(b'The 16-bit nonce', byteorder='little')
new_nonce = int.from_bytes(b'The new, calculated 32-bit nonce', byteorder='little')
@ -108,30 +111,33 @@ class CryptoTests(unittest.TestCase):
expected_key = b'/\xaa\x7f\xa1\xfcs\xef\xa0\x99zh\x03M\xa4\x8e\xb4\xab\x0eE]b\x95|\xfe\xc0\xf8\x1f\xd4\xa0\xd4\xec\x91'
expected_iv = b'\xf7\xae\xe3\xc8+=\xc2\xb8\xd1\xe1\x1b\x0e\x10\x07\x9fn\x9e\xdc\x960\x05\xf9\xea\xee\x8b\xa1h The '
assert key == expected_key, 'Key ("{}") does not equal expected ("{}")'.format(
key, expected_key)
assert iv == expected_iv, 'IV ("{}") does not equal expected ("{}")'.format(
iv, expected_iv)
self.assertEqual(key, expected_key,
msg='Key ("{}") does not equal expected ("{}")'
.format(key, expected_key))
self.assertEqual(iv, expected_iv,
msg='IV ("{}") does not equal expected ("{}")'
.format(iv, expected_iv))
@staticmethod
def test_fingerprint_from_key():
assert rsa._compute_fingerprint(PyCryptoRSA.importKey(
'-----BEGIN RSA PUBLIC KEY-----\n'
'MIIBCgKCAQEAwVACPi9w23mF3tBkdZz+zwrzKOaaQdr01vAbU4E1pvkfj4sqDsm6\n'
'lyDONS789sVoD/xCS9Y0hkkC3gtL1tSfTlgCMOOul9lcixlEKzwKENj1Yz/s7daS\n'
'an9tqw3bfUV/nqgbhGX81v/+7RFAEd+RwFnK7a+XYl9sluzHRyVVaTTveB2GazTw\n'
'Efzk2DWgkBluml8OREmvfraX3bkHZJTKX4EQSjBbbdJ2ZXIsRrYOXfaA+xayEGB+\n'
'8hdlLmAjbCVfaigxX0CDqWeR1yFL9kwd9P0NsZRPsmoqVwMbMu7mStFai6aIhc3n\n'
'Slv8kg9qv1m6XHVQY3PnEw+QQtqSIXklHwIDAQAB\n'
'-----END RSA PUBLIC KEY-----'
)) == b'!k\xe8l\x02+\xb4\xc3', 'Wrong fingerprint calculated'
# test_fringerprint_from_key can't be skipped due to ImportError
# def test_fingerprint_from_key(self):
# assert rsa._compute_fingerprint(PyCryptoRSA.importKey(
# '-----BEGIN RSA PUBLIC KEY-----\n'
# 'MIIBCgKCAQEAwVACPi9w23mF3tBkdZz+zwrzKOaaQdr01vAbU4E1pvkfj4sqDsm6\n'
# 'lyDONS789sVoD/xCS9Y0hkkC3gtL1tSfTlgCMOOul9lcixlEKzwKENj1Yz/s7daS\n'
# 'an9tqw3bfUV/nqgbhGX81v/+7RFAEd+RwFnK7a+XYl9sluzHRyVVaTTveB2GazTw\n'
# 'Efzk2DWgkBluml8OREmvfraX3bkHZJTKX4EQSjBbbdJ2ZXIsRrYOXfaA+xayEGB+\n'
# '8hdlLmAjbCVfaigxX0CDqWeR1yFL9kwd9P0NsZRPsmoqVwMbMu7mStFai6aIhc3n\n'
# 'Slv8kg9qv1m6XHVQY3PnEw+QQtqSIXklHwIDAQAB\n'
# '-----END RSA PUBLIC KEY-----'
# )) == b'!k\xe8l\x02+\xb4\xc3', 'Wrong fingerprint calculated'
@staticmethod
def test_factorize():
def test_factorize(self):
pq = 3118979781119966969
p, q = Factorization.factorize(pq)
if p > q:
p, q = q, p
assert p == 1719614201, 'Factorized pair did not yield the correct result'
assert q == 1813767169, 'Factorized pair did not yield the correct result'
self.assertEqual(p, 1719614201,
msg='Factorized pair did not yield the correct result')
self.assertEqual(q, 1813767169,
msg='Factorized pair did not yield the correct result')

View File

@ -10,16 +10,17 @@ from telethon import TelegramClient
api_id = None
api_hash = None
if not api_id or not api_hash:
raise ValueError('Please fill in both your api_id and api_hash.')
class HigherLevelTests(unittest.TestCase):
@staticmethod
def test_cdn_download():
def setUp(self):
if not api_id or not api_hash:
raise ValueError('Please fill in both your api_id and api_hash.')
@unittest.skip("you can't seriously trash random mobile numbers like that :)")
def test_cdn_download(self):
client = TelegramClient(None, api_id, api_hash)
client.session.set_dc(0, '149.154.167.40', 80)
assert client.connect()
self.assertTrue(client.connect())
try:
phone = '+999662' + str(randint(0, 9999)).zfill(4)
@ -37,11 +38,11 @@ class HigherLevelTests(unittest.TestCase):
out = BytesIO()
client.download_media(msg, out)
assert sha256(data).digest() == sha256(out.getvalue()).digest()
self.assertEqual(sha256(data).digest(), sha256(out.getvalue()).digest())
out = BytesIO()
client.download_media(msg, out) # Won't redirect
assert sha256(data).digest() == sha256(out.getvalue()).digest()
self.assertEqual(sha256(data).digest(), sha256(out.getvalue()).digest())
client.log_out()
finally:

View File

@ -23,8 +23,9 @@ def run_server_echo_thread(port):
class NetworkTests(unittest.TestCase):
@staticmethod
def test_tcp_client():
@unittest.skip("test_tcp_client needs fix")
def test_tcp_client(self):
port = random.randint(50000, 60000) # Arbitrary non-privileged port
run_server_echo_thread(port)
@ -32,12 +33,12 @@ class NetworkTests(unittest.TestCase):
client = TcpClient()
client.connect('localhost', port)
client.write(msg)
assert msg == client.read(
15), 'Read message does not equal sent message'
self.assertEqual(msg, client.read(15),
msg='Read message does not equal sent message')
client.close()
@staticmethod
def test_authenticator():
@unittest.skip("Some parameters changed, so IP doesn't go there anymore.")
def test_authenticator(self):
transport = Connection('149.154.167.91', 443)
authenticator.do_authentication(transport)
self.assertTrue(authenticator.do_authentication(transport))
transport.close()

View File

@ -0,0 +1,8 @@
import unittest
class ParserTests(unittest.TestCase):
"""There are no tests yet"""
@unittest.skip("there should be parser tests")
def test_parser(self):
self.assertTrue(True)

View File

@ -0,0 +1,8 @@
import unittest
class TLTests(unittest.TestCase):
"""There are no tests yet"""
@unittest.skip("there should be TL tests")
def test_tl(self):
self.assertTrue(True)

View File

@ -5,8 +5,7 @@ from telethon.extensions import BinaryReader
class UtilsTests(unittest.TestCase):
@staticmethod
def test_binary_writer_reader():
def test_binary_writer_reader(self):
# Test that we can read properly
data = b'\x01\x05\x00\x00\x00\r\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
b'\x88A\x00\x00\x00\x00\x00\x009@\x1a\x1b\x1c\x1d\x1e\x1f ' \
@ -15,29 +14,35 @@ class UtilsTests(unittest.TestCase):
with BinaryReader(data) as reader:
value = reader.read_byte()
assert value == 1, 'Example byte should be 1 but is {}'.format(value)
self.assertEqual(value, 1,
msg='Example byte should be 1 but is {}'.format(value))
value = reader.read_int()
assert value == 5, 'Example integer should be 5 but is {}'.format(value)
self.assertEqual(value, 5,
msg='Example integer should be 5 but is {}'.format(value))
value = reader.read_long()
assert value == 13, 'Example long integer should be 13 but is {}'.format(value)
self.assertEqual(value, 13,
msg='Example long integer should be 13 but is {}'.format(value))
value = reader.read_float()
assert value == 17.0, 'Example float should be 17.0 but is {}'.format(value)
self.assertEqual(value, 17.0,
msg='Example float should be 17.0 but is {}'.format(value))
value = reader.read_double()
assert value == 25.0, 'Example double should be 25.0 but is {}'.format(value)
self.assertEqual(value, 25.0,
msg='Example double should be 25.0 but is {}'.format(value))
value = reader.read(7)
assert value == bytes([26, 27, 28, 29, 30, 31, 32]), 'Example bytes should be {} but is {}' \
.format(bytes([26, 27, 28, 29, 30, 31, 32]), value)
self.assertEqual(value, bytes([26, 27, 28, 29, 30, 31, 32]),
msg='Example bytes should be {} but is {}'
.format(bytes([26, 27, 28, 29, 30, 31, 32]), value))
value = reader.read_large_int(128, signed=False)
assert value == 2**127, 'Example large integer should be {} but is {}'.format(2**127, value)
self.assertEqual(value, 2**127,
msg='Example large integer should be {} but is {}'.format(2**127, value))
@staticmethod
def test_binary_tgwriter_tgreader():
def test_binary_tgwriter_tgreader(self):
small_data = os.urandom(33)
small_data_padded = os.urandom(19) # +1 byte for length = 20 (%4 = 0)
@ -53,9 +58,9 @@ class UtilsTests(unittest.TestCase):
# And then try reading it without errors (it should be unharmed!)
for datum in data:
value = reader.tgread_bytes()
assert value == datum, 'Example bytes should be {} but is {}'.format(
datum, value)
self.assertEqual(value, datum,
msg='Example bytes should be {} but is {}'.format(datum, value))
value = reader.tgread_string()
assert value == string, 'Example string should be {} but is {}'.format(
string, value)
self.assertEqual(value, string,
msg='Example string should be {} but is {}'.format(string, value))

View File

@ -1,5 +0,0 @@
import unittest
class TLTests(unittest.TestCase):
"""There are no tests yet"""