mirror of
https://github.com/LonamiWebs/Telethon.git
synced 2025-02-17 03:51:05 +03:00
Delete all (broken) tests
This commit is contained in:
parent
d392939018
commit
730cf31921
24
run_tests.py
24
run_tests.py
|
@ -1,24 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
import unittest
|
||||
|
||||
if __name__ == '__main__':
|
||||
from telethon_tests import \
|
||||
CryptoTests, ParserTests, TLTests, UtilsTests, NetworkTests
|
||||
|
||||
test_classes = [CryptoTests, ParserTests, TLTests, UtilsTests]
|
||||
|
||||
network = input('Run network tests (y/n)?: ').lower() == 'y'
|
||||
if network:
|
||||
test_classes.append(NetworkTests)
|
||||
|
||||
loader = unittest.TestLoader()
|
||||
|
||||
suites_list = []
|
||||
for test_class in test_classes:
|
||||
suite = loader.loadTestsFromTestCase(test_class)
|
||||
suites_list.append(suite)
|
||||
|
||||
big_suite = unittest.TestSuite(suites_list)
|
||||
|
||||
runner = unittest.TextTestRunner()
|
||||
results = runner.run(big_suite)
|
|
@ -1,5 +0,0 @@
|
|||
from .crypto_test import CryptoTests
|
||||
from .network_test import NetworkTests
|
||||
from .parser_test import ParserTests
|
||||
from .tl_test import TLTests
|
||||
from .utils_test import UtilsTests
|
|
@ -1,143 +0,0 @@
|
|||
import unittest
|
||||
from hashlib import sha1
|
||||
|
||||
import telethon.helpers as utils
|
||||
from telethon.crypto import AES, Factorization
|
||||
# from crypto.PublicKey import RSA as PyCryptoRSA
|
||||
|
||||
|
||||
class CryptoTests(unittest.TestCase):
|
||||
def setUp(self):
|
||||
# Test known values
|
||||
self.key = b'\xd1\xf4MXy\x0c\xf8/z,\xe9\xf9\xa4\x17\x04\xd9C\xc9\xaba\x81\xf3\xf8\xdd\xcb\x0c6\x92\x01\x1f\xc2y'
|
||||
self.iv = b':\x02\x91x\x90Dj\xa6\x03\x90C\x08\x9e@X\xb5E\xffwy\xf3\x1c\xde\xde\xfbo\x8dm\xd6e.Z'
|
||||
|
||||
self.plain_text = b'Non encrypted text :D'
|
||||
self.plain_text_padded = b'My len is more uniform, promise!'
|
||||
|
||||
self.cipher_text = b'\xb6\xa7\xec.\xb9\x9bG\xcb\xe9{\x91[\x12\xfc\x84D\x1c' \
|
||||
b'\x93\xd9\x17\x03\xcd\xd6\xb1D?\x98\xd2\xb5\xa5U\xfd'
|
||||
|
||||
self.cipher_text_padded = b"W\xd1\xed'\x01\xa6c\xc3\xcb\xef\xaa\xe5\x1d\x1a" \
|
||||
b"[\x1b\xdf\xcdI\x1f>Z\n\t\xb9\xd2=\xbaF\xd1\x8e'"
|
||||
|
||||
def test_sha1(self):
|
||||
string = 'Example string'
|
||||
|
||||
hash_sum = sha1(string.encode('utf-8')).digest()
|
||||
expected = b'\nT\x92|\x8d\x06:)\x99\x04\x8e\xf8j?\xc4\x8e\xd3}m9'
|
||||
|
||||
self.assertEqual(hash_sum, expected,
|
||||
msg='Invalid sha1 hash_sum representation (should be {}, but is {})'
|
||||
.format(expected, hash_sum))
|
||||
|
||||
@unittest.skip("test_aes_encrypt needs fix")
|
||||
def test_aes_encrypt(self):
|
||||
value = AES.encrypt_ige(self.plain_text, self.key, self.iv)
|
||||
take = 16 # Don't take all the bytes, since latest involve are random padding
|
||||
self.assertEqual(value[:take], self.cipher_text[:take],
|
||||
msg='Ciphered text ("{}") does not equal expected ("{}")'
|
||||
.format(value[:take], self.cipher_text[:take]))
|
||||
|
||||
value = AES.encrypt_ige(self.plain_text_padded, self.key, self.iv)
|
||||
self.assertEqual(value, self.cipher_text_padded,
|
||||
msg='Ciphered text ("{}") does not equal expected ("{}")'
|
||||
.format(value, self.cipher_text_padded))
|
||||
|
||||
def test_aes_decrypt(self):
|
||||
# The ciphered text must always be padded
|
||||
value = AES.decrypt_ige(self.cipher_text_padded, self.key, self.iv)
|
||||
self.assertEqual(value, self.plain_text_padded,
|
||||
msg='Decrypted text ("{}") does not equal expected ("{}")'
|
||||
.format(value, self.plain_text_padded))
|
||||
|
||||
@unittest.skip("test_calc_key needs fix")
|
||||
def test_calc_key(self):
|
||||
# TODO Upgrade test for MtProto 2.0
|
||||
shared_key = b'\xbc\xd2m\xb7\xcav\xf4][\x88\x83\' \xf3\x11\x8as\xd04\x941\xae' \
|
||||
b'*O\x03\x86\x9a/H#\x1a\x8c\xb5j\xe9$\xe0IvCm^\xe70\x1a5C\t\x16' \
|
||||
b'\x03\xd2\x9d\xa9\x89\xd6\xce\x08P\x0fdr\xa0\xb3\xeb\xfecv\x1a' \
|
||||
b'\xdfJ\x14\x96\x98\x16\xa3G\xab\x04\x14!\\\xeb\n\xbcn\xdf\xc4%' \
|
||||
b'\xc6\t\xb7\x16\x14\x9c\'\x81\x15=\xb0\xaf\x0e\x0bR\xaa\x0466s' \
|
||||
b'\xf0\xcf\xb7\xb8>,D\x94x\xd7\xf8\xe0\x84\xcb%\xd3\x05\xb2\xe8' \
|
||||
b'\x95Mr?\xa2\xe8In\xf9\x0b[E\x9b\xaa\x0cX\x7f\x0ei\xde\xeed\x1d' \
|
||||
b'x/J\xce\xea^}0;\xa83B\xbbR\xa1\xbfe\x04\xb9\x1e\xa1"f=\xa5M@' \
|
||||
b'\x9e\xdd\x81\x80\xc9\xa5\xfb\xfcg\xdd\x15\x03p!\x0ffD\x16\x892' \
|
||||
b'\xea\xca\xb1A\x99O\xa94P\xa9\xa2\xc6;\xb2C9\x1dC5\xd2\r\xecL' \
|
||||
b'\xd9\xabw-\x03\ry\xc2v\x17]\x02\x15\x0cBa\x97\xce\xa5\xb1\xe4]' \
|
||||
b'\x8e\xe0,\xcfC{o\xfa\x99f\xa4pM\x00'
|
||||
|
||||
# Calculate key being the client
|
||||
msg_key = b'\xba\x1a\xcf\xda\xa8^Cbl\xfa\xb6\x0c:\x9b\xb0\xfc'
|
||||
|
||||
key, iv = utils.calc_key(shared_key, msg_key, client=True)
|
||||
expected_key = b"\xaf\xe3\x84Qm\xe0!\x0c\xd91\xe4\x9a\xa0v_gc" \
|
||||
b"x\xa1\xb0\xc9\xbc\x16'v\xcf,\x9dM\xae\xc6\xa5"
|
||||
|
||||
expected_iv = b'\xb8Q\xf3\xc5\xa3]\xc6\xdf\x9e\xe0Q\xbd"\x8d' \
|
||||
b'\x13\t\x0e\x9a\x9d^8\xa2\xf8\xe7\x00w\xd9\xc1' \
|
||||
b'\xa7\xa0\xf7\x0f'
|
||||
|
||||
self.assertEqual(key, expected_key,
|
||||
msg='Invalid key (expected ("{}"), got ("{}"))'
|
||||
.format(expected_key, key))
|
||||
self.assertEqual(iv, expected_iv,
|
||||
msg='Invalid IV (expected ("{}"), got ("{}"))'
|
||||
.format(expected_iv, iv))
|
||||
|
||||
# Calculate key being the server
|
||||
msg_key = b'\x86m\x92i\xcf\x8b\x93\xaa\x86K\x1fi\xd04\x83]'
|
||||
|
||||
key, iv = utils.calc_key(shared_key, msg_key, client=False)
|
||||
expected_key = b'\xdd0X\xb6\x93\x8e\xc9y\xef\x83\xf8\x8cj' \
|
||||
b'\xa7h\x03\xe2\xc6\xb16\xc5\xbb\xfc\xe7' \
|
||||
b'\xdf\xd6\xb1g\xf7u\xcfk'
|
||||
|
||||
expected_iv = b'\xdcL\xc2\x18\x01J"X\x86lb\xb6\xb547\xfd' \
|
||||
b'\xe2a4\xb6\xaf}FS\xd7[\xe0N\r\x19\xfb\xbc'
|
||||
|
||||
self.assertEqual(key, expected_key,
|
||||
msg='Invalid key (expected ("{}"), got ("{}"))'
|
||||
.format(expected_key, key))
|
||||
self.assertEqual(iv, expected_iv,
|
||||
msg='Invalid IV (expected ("{}"), got ("{}"))'
|
||||
.format(expected_iv, iv))
|
||||
|
||||
def test_generate_key_data_from_nonce(self):
|
||||
server_nonce = int.from_bytes(b'The 16-bit nonce', byteorder='little')
|
||||
new_nonce = int.from_bytes(b'The new, calculated 32-bit nonce', byteorder='little')
|
||||
|
||||
key, iv = utils.generate_key_data_from_nonce(server_nonce, new_nonce)
|
||||
expected_key = b'/\xaa\x7f\xa1\xfcs\xef\xa0\x99zh\x03M\xa4\x8e\xb4\xab\x0eE]b\x95|\xfe\xc0\xf8\x1f\xd4\xa0\xd4\xec\x91'
|
||||
expected_iv = b'\xf7\xae\xe3\xc8+=\xc2\xb8\xd1\xe1\x1b\x0e\x10\x07\x9fn\x9e\xdc\x960\x05\xf9\xea\xee\x8b\xa1h The '
|
||||
|
||||
self.assertEqual(key, expected_key,
|
||||
msg='Key ("{}") does not equal expected ("{}")'
|
||||
.format(key, expected_key))
|
||||
self.assertEqual(iv, expected_iv,
|
||||
msg='IV ("{}") does not equal expected ("{}")'
|
||||
.format(iv, expected_iv))
|
||||
|
||||
# test_fringerprint_from_key can't be skipped due to ImportError
|
||||
# def test_fingerprint_from_key(self):
|
||||
# assert rsa._compute_fingerprint(PyCryptoRSA.importKey(
|
||||
# '-----BEGIN RSA PUBLIC KEY-----\n'
|
||||
# 'MIIBCgKCAQEAwVACPi9w23mF3tBkdZz+zwrzKOaaQdr01vAbU4E1pvkfj4sqDsm6\n'
|
||||
# 'lyDONS789sVoD/xCS9Y0hkkC3gtL1tSfTlgCMOOul9lcixlEKzwKENj1Yz/s7daS\n'
|
||||
# 'an9tqw3bfUV/nqgbhGX81v/+7RFAEd+RwFnK7a+XYl9sluzHRyVVaTTveB2GazTw\n'
|
||||
# 'Efzk2DWgkBluml8OREmvfraX3bkHZJTKX4EQSjBbbdJ2ZXIsRrYOXfaA+xayEGB+\n'
|
||||
# '8hdlLmAjbCVfaigxX0CDqWeR1yFL9kwd9P0NsZRPsmoqVwMbMu7mStFai6aIhc3n\n'
|
||||
# 'Slv8kg9qv1m6XHVQY3PnEw+QQtqSIXklHwIDAQAB\n'
|
||||
# '-----END RSA PUBLIC KEY-----'
|
||||
# )) == b'!k\xe8l\x02+\xb4\xc3', 'Wrong fingerprint calculated'
|
||||
|
||||
def test_factorize(self):
|
||||
pq = 3118979781119966969
|
||||
p, q = Factorization.factorize(pq)
|
||||
if p > q:
|
||||
p, q = q, p
|
||||
|
||||
self.assertEqual(p, 1719614201,
|
||||
msg='Factorized pair did not yield the correct result')
|
||||
self.assertEqual(q, 1813767169,
|
||||
msg='Factorized pair did not yield the correct result')
|
|
@ -1,49 +0,0 @@
|
|||
import unittest
|
||||
import os
|
||||
from io import BytesIO
|
||||
from random import randint
|
||||
from hashlib import sha256
|
||||
from telethon import TelegramClient
|
||||
|
||||
# Fill in your api_id and api_hash when running the tests
|
||||
# and REMOVE THEM once you've finished testing them.
|
||||
api_id = None
|
||||
api_hash = None
|
||||
|
||||
|
||||
class HigherLevelTests(unittest.TestCase):
|
||||
def setUp(self):
|
||||
if not api_id or not api_hash:
|
||||
raise ValueError('Please fill in both your api_id and api_hash.')
|
||||
|
||||
@unittest.skip("you can't seriously trash random mobile numbers like that :)")
|
||||
def test_cdn_download(self):
|
||||
client = TelegramClient(None, api_id, api_hash)
|
||||
client.session.set_dc(0, '149.154.167.40', 80)
|
||||
self.assertTrue(client.connect())
|
||||
|
||||
try:
|
||||
phone = '+999662' + str(randint(0, 9999)).zfill(4)
|
||||
client.send_code_request(phone)
|
||||
client.sign_up('22222', 'Test', 'DC')
|
||||
|
||||
me = client.get_me()
|
||||
data = os.urandom(2 ** 17)
|
||||
client.send_file(
|
||||
me, data,
|
||||
progress_callback=lambda c, t:
|
||||
print('test_cdn_download:uploading {:.2%}...'.format(c/t))
|
||||
)
|
||||
msg = client.get_messages(me)[1][0]
|
||||
|
||||
out = BytesIO()
|
||||
client.download_media(msg, out)
|
||||
self.assertEqual(sha256(data).digest(), sha256(out.getvalue()).digest())
|
||||
|
||||
out = BytesIO()
|
||||
client.download_media(msg, out) # Won't redirect
|
||||
self.assertEqual(sha256(data).digest(), sha256(out.getvalue()).digest())
|
||||
|
||||
client.log_out()
|
||||
finally:
|
||||
client.disconnect()
|
|
@ -1,44 +0,0 @@
|
|||
import random
|
||||
import socket
|
||||
import threading
|
||||
import unittest
|
||||
|
||||
import telethon.network.authenticator as authenticator
|
||||
from telethon.extensions import TcpClient
|
||||
from telethon.network import Connection
|
||||
|
||||
|
||||
def run_server_echo_thread(port):
|
||||
def server_thread():
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||||
s.bind(('', port))
|
||||
s.listen(1)
|
||||
connection, address = s.accept()
|
||||
with connection:
|
||||
data = connection.recv(16)
|
||||
connection.send(data)
|
||||
|
||||
server = threading.Thread(target=server_thread)
|
||||
server.start()
|
||||
|
||||
|
||||
class NetworkTests(unittest.TestCase):
|
||||
|
||||
@unittest.skip("test_tcp_client needs fix")
|
||||
def test_tcp_client(self):
|
||||
port = random.randint(50000, 60000) # Arbitrary non-privileged port
|
||||
run_server_echo_thread(port)
|
||||
|
||||
msg = b'Unit testing...'
|
||||
client = TcpClient()
|
||||
client.connect('localhost', port)
|
||||
client.write(msg)
|
||||
self.assertEqual(msg, client.read(15),
|
||||
msg='Read message does not equal sent message')
|
||||
client.close()
|
||||
|
||||
@unittest.skip("Some parameters changed, so IP doesn't go there anymore.")
|
||||
def test_authenticator(self):
|
||||
transport = Connection('149.154.167.91', 443)
|
||||
self.assertTrue(authenticator.do_authentication(transport))
|
||||
transport.close()
|
|
@ -1,8 +0,0 @@
|
|||
import unittest
|
||||
|
||||
|
||||
class ParserTests(unittest.TestCase):
|
||||
"""There are no tests yet"""
|
||||
@unittest.skip("there should be parser tests")
|
||||
def test_parser(self):
|
||||
self.assertTrue(True)
|
|
@ -1,8 +0,0 @@
|
|||
import unittest
|
||||
|
||||
|
||||
class TLTests(unittest.TestCase):
|
||||
"""There are no tests yet"""
|
||||
@unittest.skip("there should be TL tests")
|
||||
def test_tl(self):
|
||||
self.assertTrue(True)
|
|
@ -1,66 +0,0 @@
|
|||
import os
|
||||
import unittest
|
||||
from telethon.tl import TLObject
|
||||
from telethon.extensions import BinaryReader
|
||||
|
||||
|
||||
class UtilsTests(unittest.TestCase):
|
||||
def test_binary_writer_reader(self):
|
||||
# Test that we can read properly
|
||||
data = b'\x01\x05\x00\x00\x00\r\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x88A\x00\x00\x00\x00\x00\x009@\x1a\x1b\x1c\x1d\x1e\x1f ' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x80'
|
||||
|
||||
with BinaryReader(data) as reader:
|
||||
value = reader.read_byte()
|
||||
self.assertEqual(value, 1,
|
||||
msg='Example byte should be 1 but is {}'.format(value))
|
||||
|
||||
value = reader.read_int()
|
||||
self.assertEqual(value, 5,
|
||||
msg='Example integer should be 5 but is {}'.format(value))
|
||||
|
||||
value = reader.read_long()
|
||||
self.assertEqual(value, 13,
|
||||
msg='Example long integer should be 13 but is {}'.format(value))
|
||||
|
||||
value = reader.read_float()
|
||||
self.assertEqual(value, 17.0,
|
||||
msg='Example float should be 17.0 but is {}'.format(value))
|
||||
|
||||
value = reader.read_double()
|
||||
self.assertEqual(value, 25.0,
|
||||
msg='Example double should be 25.0 but is {}'.format(value))
|
||||
|
||||
value = reader.read(7)
|
||||
self.assertEqual(value, bytes([26, 27, 28, 29, 30, 31, 32]),
|
||||
msg='Example bytes should be {} but is {}'
|
||||
.format(bytes([26, 27, 28, 29, 30, 31, 32]), value))
|
||||
|
||||
value = reader.read_large_int(128, signed=False)
|
||||
self.assertEqual(value, 2**127,
|
||||
msg='Example large integer should be {} but is {}'.format(2**127, value))
|
||||
|
||||
def test_binary_tgwriter_tgreader(self):
|
||||
small_data = os.urandom(33)
|
||||
small_data_padded = os.urandom(19) # +1 byte for length = 20 (%4 = 0)
|
||||
|
||||
large_data = os.urandom(999)
|
||||
large_data_padded = os.urandom(1024)
|
||||
|
||||
data = (small_data, small_data_padded, large_data, large_data_padded)
|
||||
string = 'Testing Telegram strings, this should work properly!'
|
||||
serialized = b''.join(TLObject.serialize_bytes(d) for d in data) + \
|
||||
TLObject.serialize_bytes(string)
|
||||
|
||||
with BinaryReader(serialized) as reader:
|
||||
# And then try reading it without errors (it should be unharmed!)
|
||||
for datum in data:
|
||||
value = reader.tgread_bytes()
|
||||
self.assertEqual(value, datum,
|
||||
msg='Example bytes should be {} but is {}'.format(datum, value))
|
||||
|
||||
value = reader.tgread_string()
|
||||
self.assertEqual(value, string,
|
||||
msg='Example string should be {} but is {}'.format(string, value))
|
Loading…
Reference in New Issue
Block a user