From 9636ef35c1db0475ad09357a82e535ce8ddd288b Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Sat, 8 Jul 2023 13:04:20 +0200 Subject: [PATCH] Port crypto from grammers --- client/src/telethon/_impl/crypto/__init__.py | 110 ++++++++++++++ client/src/telethon/_impl/crypto/aes.py | 53 +++++++ client/src/telethon/_impl/crypto/auth_key.py | 23 +++ client/src/telethon/_impl/crypto/factorize.py | 49 ++++++ client/src/telethon/_impl/crypto/rsa.py | 48 ++++++ .../telethon/_impl/crypto/two_factor_auth.py | 140 ++++++++++++++++++ client/tests/auth_key_test.py | 50 +++++++ client/tests/crypto_test.py | 92 ++++++++++++ client/tests/factorize_test.py | 16 ++ client/tests/rsa_test.py | 15 ++ client/tests/two_factor_auth_test.py | 82 ++++++++++ stubs/pyaes.pyi | 6 + 12 files changed, 684 insertions(+) create mode 100644 client/src/telethon/_impl/crypto/__init__.py create mode 100644 client/src/telethon/_impl/crypto/aes.py create mode 100644 client/src/telethon/_impl/crypto/auth_key.py create mode 100644 client/src/telethon/_impl/crypto/factorize.py create mode 100644 client/src/telethon/_impl/crypto/rsa.py create mode 100644 client/src/telethon/_impl/crypto/two_factor_auth.py create mode 100644 client/tests/auth_key_test.py create mode 100644 client/tests/crypto_test.py create mode 100644 client/tests/factorize_test.py create mode 100644 client/tests/rsa_test.py create mode 100644 client/tests/two_factor_auth_test.py create mode 100644 stubs/pyaes.pyi diff --git a/client/src/telethon/_impl/crypto/__init__.py b/client/src/telethon/_impl/crypto/__init__.py new file mode 100644 index 00000000..dcd0c1f3 --- /dev/null +++ b/client/src/telethon/_impl/crypto/__init__.py @@ -0,0 +1,110 @@ +import os +from collections import namedtuple +from enum import IntEnum +from hashlib import sha1, sha256 + +from .aes import ige_decrypt, ige_encrypt +from .auth_key import AuthKey + + +# "where x = 0 for messages from client to server and x = 8 for those from server to client" +class Side(IntEnum): + CLIENT = 0 + SERVER = 8 + + +CalcKey = namedtuple("CalcKey", ("key", "iv")) + + +# https://core.telegram.org/mtproto/description#defining-aes-key-and-initialization-vector +def calc_key(auth_key: AuthKey, msg_key: bytes, side: Side) -> CalcKey: + x = int(side) + + # sha256_a = SHA256 (msg_key + substr (auth_key, x, 36)) + sha256_a = sha256(msg_key + auth_key.data[x : x + 36]).digest() + + # sha256_b = SHA256 (substr (auth_key, 40+x, 36) + msg_key) + sha256_b = sha256(auth_key.data[x + 40 : x + 76] + msg_key).digest() + + # aes_key = substr (sha256_a, 0, 8) + substr (sha256_b, 8, 16) + substr (sha256_a, 24, 8) + aes_key = sha256_a[:8] + sha256_b[8:24] + sha256_a[24:32] + + # aes_iv = substr (sha256_b, 0, 8) + substr (sha256_a, 8, 16) + substr (sha256_b, 24, 8) + aes_iv = sha256_b[:8] + sha256_a[8:24] + sha256_b[24:32] + + return CalcKey(aes_key, aes_iv) + + +def determine_padding_v2_length(length: int) -> int: + return 16 + (16 - (length % 16)) + + +def _do_encrypt_data_v2( + plaintext: bytes, auth_key: AuthKey, random_padding: bytes +) -> bytes: + padded_plaintext = ( + plaintext + random_padding[: determine_padding_v2_length(len(plaintext))] + ) + + side = Side.CLIENT + x = int(side) + + # msg_key_large = SHA256 (substr (auth_key, 88+x, 32) + plaintext + random_padding) + msg_key_large = sha256(auth_key.data[x + 88 : x + 120] + padded_plaintext).digest() + + # msg_key = substr (msg_key_large, 8, 16) + msg_key = msg_key_large[8:24] + + key, iv = calc_key(auth_key, msg_key, side) + ciphertext = ige_encrypt(padded_plaintext, key, iv) + + return auth_key.key_id + msg_key + ciphertext + + +def encrypt_data_v2(plaintext: bytes, auth_key: AuthKey) -> bytes: + random_padding = os.urandom(32) + return _do_encrypt_data_v2(plaintext, auth_key, random_padding) + + +def decrypt_data_v2(ciphertext: bytes, auth_key: AuthKey) -> bytes: + side = Side.SERVER + x = int(side) + + if len(ciphertext) < 24 or (len(ciphertext) - 24) % 16 != 0: + raise ValueError("invalid ciphertext buffer length") + + # TODO Check salt, session_id and sequence_number + key_id = ciphertext[:8] + if auth_key.key_id != key_id: + raise ValueError("server authkey mismatches with ours") + + msg_key = ciphertext[8:24] + key, iv = calc_key(auth_key, msg_key, side) + plaintext = ige_decrypt(ciphertext[24:], key, iv) + + # https://core.telegram.org/mtproto/security_guidelines#mtproto-encrypted-messages + our_key = sha256(auth_key.data[x + 88 : x + 120] + plaintext).digest() + if msg_key != our_key[8:24]: + raise ValueError("server msgkey mismatches with ours") + + return plaintext + + +def generate_key_data_from_nonce(server_nonce: bytes, new_nonce: bytes) -> CalcKey: + hash1 = sha1(new_nonce + server_nonce).digest() + hash2 = sha1(server_nonce + new_nonce).digest() + hash3 = sha1(new_nonce + new_nonce).digest() + + key = hash1 + hash2[:12] + iv = hash2[12:20] + hash3 + new_nonce[:4] + return CalcKey(key, iv) + + +def encrypt_ige(plaintext: bytes, key: bytes, iv: bytes) -> bytes: + if len(plaintext) % 16 != 0: + plaintext += os.urandom((16 - (len(plaintext) % 16)) % 16) + return ige_encrypt(plaintext, key, iv) + + +def decrypt_ige(padded_ciphertext: bytes, key: bytes, iv: bytes) -> bytes: + return ige_decrypt(padded_ciphertext, key, iv) diff --git a/client/src/telethon/_impl/crypto/aes.py b/client/src/telethon/_impl/crypto/aes.py new file mode 100644 index 00000000..cfca359d --- /dev/null +++ b/client/src/telethon/_impl/crypto/aes.py @@ -0,0 +1,53 @@ +import pyaes + + +def ige_encrypt(plaintext: bytes, key: bytes, iv: bytes) -> bytes: + assert len(plaintext) % 16 == 0 + assert len(iv) == 32 + + aes = pyaes.AES(key) + iv1 = iv[:16] + iv2 = iv[16:] + + ciphertext = bytearray() + + for block_offset in range(0, len(plaintext), 16): + plaintext_block = plaintext[block_offset : block_offset + 16] + ciphertext_block = bytes( + a ^ b + for a, b in zip( + aes.encrypt([a ^ b for a, b in zip(plaintext_block, iv1)]), iv2 + ) + ) + iv1 = ciphertext_block + iv2 = plaintext_block + + ciphertext += ciphertext_block + + return bytes(ciphertext) + + +def ige_decrypt(ciphertext: bytes, key: bytes, iv: bytes) -> bytes: + assert len(ciphertext) % 16 == 0 + assert len(iv) == 32 + + aes = pyaes.AES(key) + iv1 = iv[:16] + iv2 = iv[16:] + + plaintext = bytearray() + + for block_offset in range(0, len(ciphertext), 16): + ciphertext_block = ciphertext[block_offset : block_offset + 16] + plaintext_block = bytes( + a ^ b + for a, b in zip( + aes.decrypt([a ^ b for a, b in zip(ciphertext_block, iv2)]), iv1 + ) + ) + iv1 = ciphertext_block + iv2 = plaintext_block + + plaintext += plaintext_block + + return bytes(plaintext) diff --git a/client/src/telethon/_impl/crypto/auth_key.py b/client/src/telethon/_impl/crypto/auth_key.py new file mode 100644 index 00000000..ab345770 --- /dev/null +++ b/client/src/telethon/_impl/crypto/auth_key.py @@ -0,0 +1,23 @@ +from dataclasses import dataclass +from hashlib import sha1 +from typing import Self + + +@dataclass +class AuthKey: + data: bytes + aux_hash: bytes + key_id: bytes + + @classmethod + def from_bytes(cls, data: bytes) -> Self: + sha = sha1(data).digest() + aux_hash = sha[:8] + key_id = sha[12:] + return cls(data=data, aux_hash=aux_hash, key_id=key_id) + + def __bytes__(self) -> bytes: + return self.data + + def calc_new_nonce_hash(self, new_nonce: bytes, number: int) -> bytes: + return sha1(new_nonce + bytes((number,)) + self.aux_hash).digest()[4:] diff --git a/client/src/telethon/_impl/crypto/factorize.py b/client/src/telethon/_impl/crypto/factorize.py new file mode 100644 index 00000000..49d2c509 --- /dev/null +++ b/client/src/telethon/_impl/crypto/factorize.py @@ -0,0 +1,49 @@ +from math import gcd +from random import randrange +from typing import Tuple + + +def factorize(pq: int) -> Tuple[int, int]: + """ + Factorize the given number into its two prime factors. + + The algorithm here is a faster variant of [Pollard's rho algorithm], + published by [Richard Brent], based on + . + + [Pollard's rho algorithm]: + [Richard Brent]: + """ + if pq % 2 == 0: + return 2, pq // 2 + + y, c, m = randrange(1, pq), randrange(1, pq), randrange(1, pq) + g = r = q = 1 + x = ys = 0 + + while g == 1: + x = y + for _ in range(r): + y = (pow(y, 2, pq) + c) % pq + + k = 0 + while k < r and g == 1: + ys = y + for _ in range(min(m, r - k)): + y = (pow(y, 2, pq) + c) % pq + q = q * (abs(x - y)) % pq + + g = gcd(q, pq) + k += m + + r *= 2 + + if g == pq: + while True: + ys = (pow(ys, 2, pq) + c) % pq + g = gcd(abs(x - ys), pq) + if g > 1: + break + + p, q = g, pq // g + return (p, q) if p < q else (q, p) diff --git a/client/src/telethon/_impl/crypto/rsa.py b/client/src/telethon/_impl/crypto/rsa.py new file mode 100644 index 00000000..6ca86c7c --- /dev/null +++ b/client/src/telethon/_impl/crypto/rsa.py @@ -0,0 +1,48 @@ +import struct +from hashlib import sha1 + +from rsa import PublicKey, encrypt + +from ..tl.core import serialize_bytes_to + + +def compute_fingerprint(key: PublicKey) -> int: + buffer = bytearray() + serialize_bytes_to(buffer, int.to_bytes(key.n, (key.n.bit_length() + 7) // 8)) + serialize_bytes_to(buffer, int.to_bytes(key.e, (key.e.bit_length() + 7) // 8)) + fingerprint = struct.unpack(" bytes: + return encrypt(sha1(data).digest() + data, key) + + +# From my.telegram.org. +PRODUCTION_RSA_KEY = PublicKey.load_pkcs1( + b"""-----BEGIN RSA PUBLIC KEY----- +MIIBCgKCAQEA6LszBcC1LGzyr992NzE0ieY+BSaOW622Aa9Bd4ZHLl+TuFQ4lo4g +5nKaMBwK/BIb9xUfg0Q29/2mgIR6Zr9krM7HjuIcCzFvDtr+L0GQjae9H0pRB2OO +62cECs5HKhT5DZ98K33vmWiLowc621dQuwKWSQKjWf50XYFw42h21P2KXUGyp2y/ ++aEyZ+uVgLLQbRA1dEjSDZ2iGRy12Mk5gpYc397aYp438fsJoHIgJ2lgMv5h7WY9 +t6N/byY9Nw9p21Og3AoXSL2q/2IJ1WRUhebgAdGVMlV1fkuOQoEzR7EdpqtQD9Cs +5+bfo3Nhmcyvk5ftB0WkJ9z6bNZ7yxrP8wIDAQAB +-----END RSA PUBLIC KEY-----""" +) + +TESTMODE_RSA_KEY = PublicKey.load_pkcs1( + b"""-----BEGIN RSA PUBLIC KEY----- +MIIBCgKCAQEAyMEdY1aR+sCR3ZSJrtztKTKqigvO/vBfqACJLZtS7QMgCGXJ6XIR +yy7mx66W0/sOFa7/1mAZtEoIokDP3ShoqF4fVNb6XeqgQfaUHd8wJpDWHcR2OFwv +plUUI1PLTktZ9uW2WE23b+ixNwJjJGwBDJPQEQFBE+vfmH0JP503wr5INS1poWg/ +j25sIWeYPHYeOrFp/eXaqhISP6G+q2IeTaWTXpwZj4LzXq5YOpk4bYEQ6mvRq7D1 +aHWfYmlEGepfaYR8Q0YqvvhYtMte3ITnuSJs171+GDqpdKcSwHnd6FudwGO4pcCO +j4WcDuXc2CTHgH8gFTNhp/Y8/SpDOhvn9QIDAQAB +-----END RSA PUBLIC KEY-----""" +) + + +RSA_KEYS = { + compute_fingerprint(key): key for key in (PRODUCTION_RSA_KEY, TESTMODE_RSA_KEY) +} diff --git a/client/src/telethon/_impl/crypto/two_factor_auth.py b/client/src/telethon/_impl/crypto/two_factor_auth.py new file mode 100644 index 00000000..9bce8837 --- /dev/null +++ b/client/src/telethon/_impl/crypto/two_factor_auth.py @@ -0,0 +1,140 @@ +# Ported from https://github.com/Lonami/grammers/blob/d91dc82/lib/grammers-crypto/src/two_factor_auth.rs +from collections import namedtuple +from hashlib import pbkdf2_hmac, sha256 + +from .factorize import factorize + +TwoFactorAuth = namedtuple("TwoFactorAuth", ("m1", "g_a")) + + +def pad_to_256(data: bytes) -> bytes: + return bytes(256 - len(data)) + data + + +# H(data) := sha256(data) +def h(*data: bytes) -> bytes: + return sha256(b"".join(data)).digest() + + +# SH(data, salt) := H(salt | data | salt) +def sh(data: bytes, salt: bytes) -> bytes: + return h(salt, data, salt) + + +# PH1(password, salt1, salt2) := SH(SH(password, salt1), salt2) +def ph1(password: bytes, salt1: bytes, salt2: bytes) -> bytes: + return sh(sh(password, salt1), salt2) + + +# PH2(password, salt1, salt2) := SH(pbkdf2(sha512, PH1(password, salt1, salt2), salt1, 100000), salt2) +def ph2(password: bytes, salt1: bytes, salt2: bytes) -> bytes: + return sh(pbkdf2_hmac("sha512", ph1(password, salt1, salt2), salt1, 100000), salt2) + + +# https://core.telegram.org/api/srp +def calculate_2fa( + *, + salt1: bytes, + salt2: bytes, + g: int, + p: bytes, + g_b: bytes, + a: bytes, + password: bytes, +) -> TwoFactorAuth: + big_p = int.from_bytes(p) + + g_b = pad_to_256(g_b) + a = pad_to_256(a) + + g_for_hash = g.to_bytes(256) + + big_g_b = int.from_bytes(g_b) + + big_g = g + big_a = int.from_bytes(a) + + # k := H(p | g) + k = h(p, g_for_hash) + big_k = int.from_bytes(k) + + # g_a := pow(g, a) mod p + g_a = pow(big_g, big_a, big_p).to_bytes(256) + + # u := H(g_a | g_b) + u = int.from_bytes(h(g_a, g_b)) + + # x := PH2(password, salt1, salt2) + x = int.from_bytes(ph2(password, salt1, salt2)) + + # v := pow(g, x) mod p + big_v = pow(big_g, x, big_p) + + # k_v := (k * v) mod p + k_v = (big_k * big_v) % big_p + + # t := (g_b - k_v) mod p (positive modulo, if the result is negative increment by p) + if big_g_b > k_v: + sub = big_g_b - k_v + else: + sub = k_v - big_g_b + + big_t = sub % big_p + + # s_a := pow(t, a + u * x) mod p + first = u * x + second = big_a + first + big_s_a = pow(big_t, second, big_p) + + # k_a := H(s_a) + k_a = h(big_s_a.to_bytes(256)) + + # M1 := H(H(p) xor H(g) | H(salt1) | H(salt2) | g_a | g_b | k_a) + h_p = h(p) + h_g = h(g_for_hash) + + p_xor_g = bytes(hpi ^ hgi for hpi, hgi in zip(h_p, h_g)) + + m1 = h(p_xor_g, h(salt1), h(salt2), g_a, g_b, k_a) + + return TwoFactorAuth(m1, g_a) + + +def check_p_len(p: bytes) -> bool: + return len(p) == 256 + + +def check_known_prime(p: bytes, g: int) -> bool: + good_prime = b"\xc7\x1c\xae\xb9\xc6\xb1\xc9\x04\x8elR/p\xf1?s\x98\r@#\x8e>!\xc1I4\xd07V=\x93\x0fH\x19\x8a\n\xa7\xc1@X\"\x94\x93\xd2%0\xf4\xdb\xfa3on\n\xc9%\x13\x95C\xae\xd4L\xce|7 \xfdQ\xf6\x94XpZ\xc6\x8c\xd4\xfekk\x13\xab\xdc\x97FQ)i2\x84T\xf1\x8f\xaf\x8cY_d$w\xfe\x96\xbb*\x94\x1d[\xcd\x1dJ\xc8\xccI\x88\x07\x08\xfa\x9b7\x8e\xbe\xa0\xf8\x7f\xa9\xff^\xedp\x05\r\xed(I\xf4{\xf9Y\xd9V\x85\x0c\xe9)\x85\x1f\r\x81\x15\xf65\xb1\x05\xee.N\x15\xd0K$T\xbfoO\xad\xf04\xb1\x04\x03\x11\x9c\xd8\xe3\xb9/\xcc[" + return p == good_prime and g in (3, 4, 5, 7) + + +def check_p_prime_and_subgroup(p: bytes, g: int) -> bool: + if check_known_prime(p, g): + return True + + big_p = int.from_bytes(p) + + if g == 2: + candidate = big_p % 8 == 7 + elif g == 3: + candidate = big_p % 3 == 2 + elif g == 4: + candidate = True + elif g == 5: + candidate = (big_p % 5) in (1, 4) + elif g == 6: + candidate = (big_p % 24) in (19, 23) + elif g == 7: + candidate = (big_p % 7) in (3, 5, 6) + else: + raise ValueError(f"bad g: {g}") + + return candidate and factorize((big_p - 1) // 2)[0] == 1 + + +def check_p_and_g(p: bytes, g: int) -> bool: + if not check_p_len(p): + return False + + return check_p_prime_and_subgroup(p, g) diff --git a/client/tests/auth_key_test.py b/client/tests/auth_key_test.py new file mode 100644 index 00000000..70748d8a --- /dev/null +++ b/client/tests/auth_key_test.py @@ -0,0 +1,50 @@ +from telethon._impl.crypto.auth_key import AuthKey + + +def get_auth_key() -> AuthKey: + return AuthKey.from_bytes(bytes(range(256))) + + +def get_new_nonce() -> bytes: + return bytes(range(32)) + + +def test_auth_key_aux_hash() -> None: + auth_key = get_auth_key() + expected = b"I\x16\xd6\xbd\xb7\xf7\x8eh" + + assert auth_key.aux_hash == expected + + +def test_auth_key_id() -> None: + auth_key = get_auth_key() + expected = b"2\xd1Xn\xa4W\xdf\xc8" + + assert auth_key.key_id == expected + + +def test_calc_new_nonce_hash1() -> None: + auth_key = get_auth_key() + new_nonce = get_new_nonce() + assert ( + auth_key.calc_new_nonce_hash(new_nonce, 1) + == b"\xc2\xce\xd2\xb3>Y:U\xd2\x7fJ]\xab\xee|g" + ) + + +def test_calc_new_nonce_hash2() -> None: + auth_key = get_auth_key() + new_nonce = get_new_nonce() + assert ( + auth_key.calc_new_nonce_hash(new_nonce, 2) + == b"\xf41\x8e\x85\xbd/\xf3\xbe\x84\xd9\xfe\xfc\xe3\xdc\xe3\x9f" + ) + + +def test_calc_new_nonce_hash3() -> None: + auth_key = get_auth_key() + new_nonce = get_new_nonce() + assert ( + auth_key.calc_new_nonce_hash(new_nonce, 3) + == b"K\xf9\xd7\xb3}\xb4\x13\xeeC\x1d(Qv1\xcb=" + ) diff --git a/client/tests/crypto_test.py b/client/tests/crypto_test.py new file mode 100644 index 00000000..933f3231 --- /dev/null +++ b/client/tests/crypto_test.py @@ -0,0 +1,92 @@ +from telethon._impl.crypto import ( + Side, + _do_encrypt_data_v2, + calc_key, + decrypt_data_v2, + decrypt_ige, + encrypt_ige, + generate_key_data_from_nonce, +) +from telethon._impl.crypto.auth_key import AuthKey + + +def get_test_auth_key() -> AuthKey: + return AuthKey.from_bytes(bytes(range(256))) + + +def get_test_msg_key() -> bytes: + return bytes(range(16)) + + +def get_test_aes_key_or_iv() -> bytes: + return bytes(range(32)) + + +def test_calc_client_key() -> None: + auth_key = get_test_auth_key() + msg_key = get_test_msg_key() + expected = ( + b"pN\xd0\x9c\x8bAf\x8a\xe8\xf9\x9d$G8\xf7\x1d\xbd\xdcDF\x9bk\xbdJ\xa8W=\xd0B\xbd\x05\x9e", + b'M&`\x00\xa5P\xed\xab\xbfL|\xe4\x0f\xd0\x04<\xc9"0\x18L\xd3\x17\xa5\xcc\x9c$\x82\xfd;\x93\x18', + ) + assert calc_key(auth_key, msg_key, Side.CLIENT) == expected + + +def test_calc_server_key() -> None: + auth_key = get_test_auth_key() + msg_key = get_test_msg_key() + expected = ( + b"!w%y\x9b$X\x06E\x81t\xa1\xfc\xfb\xc8\x83\x90h\x07\xb1P3\xfd\xd0\xea+Mi\xcf\x9c6N", + b"f\x9ae8\x91zO\xa5l\xa3#`\xa41\xc9\x16\x0b\xe4\xad\x88q@\x98\r\xab\x91\xce{\xdcG\xff\xbc", + ) + assert calc_key(auth_key, msg_key, Side.SERVER) == expected + + +def test_encrypt_client_data_v2() -> None: + plaintext = b"Hello, world! This data should remain secure!" + auth_key = get_test_auth_key() + random_padding = bytes(32) + expected = b"2\xd1Xn\xa4W\xdf\xc8\xa8\x17)\xd4m\xb5@\x19\xa2\xbf\xd7\xf7D\xf9\xb9lOql\xfd\xc4G}\xb2\xa2\xc1_m\xdb\x85#_\xb9U/\x1d\x84\x07\xc6\xaa\xea\x00\xcc\x84LZ\x1b\xf6\xacD\xb7\x9b^\xdc*#\x86\x8b=`s\xa5\x90\x99,\x0f)u$=V>\xa1\x80\xd2\x18\xeeu|\x9a" + + assert _do_encrypt_data_v2(plaintext, auth_key, random_padding) == expected + + +def test_decrypt_server_data_v2() -> None: + ciphertext = b'zq\x83\xc2\xc1\x0eOM\xf9E\xfa\x9a\x9a\xbd5\xe7\xc3\x84\x0ba\xf0E0O9gL\x19\xc0\xe2\txOP\xf6"j\x075)\xd6u\xc9,\xbf\x0b\xfa\x8c\x99\xa7\x9b?9\xc7*]\x9a\x02mC\x1a\xb7@|\xa0N\xccU\x18}lE\xf1xqRN\xdd\x90\xce\xa0.\xd7(\xe1M|\xb1\x8a\xea*caX\xf0\x94Y\xa9Cw\x10\xd8\x94\xc7\x9f6\x8cN\x81d\xb7d~\xa9\x86\x12\xae\xfe\x94,]\x92\x12\x1a\xcb\x8d\xb0-\xcc\xce\xb6m\x0f\x87 \xac\x12\xa0m\xb0X+\xfd\x95[\xe3O6Q\x18\xe3\xba\xb8\xcd\x08\x0c\xe6\xb4[(\xea\xc5m\xcd*)7N' + auth_key = AuthKey.from_bytes( + b"].}e\xf4\x9e\xc2\x8b\xd0)\xa8\x87a\xea'\xb8\xa4\xc7\x9f\x12\"e%D>}|Yn\xf3050\xdb!\x07\xe8\x9a\xa9\x97\xc7\xa0\x16J\xb6\x94\x18z\xde\xff\x15k\xd6\xefq\x18\xa1\x96#Gu<\x0e~\x89\xa05K\x8e\xc3d\xf9\x99~q\xbci#\xfb\x86\xe8\xe44\x91\xe0\x10`jl\xe8E\xe2\xfa\x01\x94\tw\xef\n\xa3*\xdfZ\x97\xdb\xf6\xd4(\xec\x044\xd7\x17\xa2\xd3\xad\x19b,\xc0X\x87d!\x13\xc7\x96_\xfb\x86*><\xcb\n\xb9Z\xdd\xdaW\xf8\x92E\xdb\xd7kI#H\xf8\xe9K\xd5\xa7\xc0\xe0\xb8H\x08R<\xfd\x1e\xa8\x0b2\xfe\x9a\xd1\x98\xbc.\x10?\xce\xb7\xd5$\x92\xec\xc0':(gK\xc9#\xee\xe5\x92e\xab\x17\xa0\x02\xdf\x1fJ\xa2\xc5\x9b\x81\x9a^^\x1d\x10^\xc1\x173o\\v\xc6\xb1\x87\x03}KBp\xce\xe9\xcc!\x07\x1d\x97\xe9\xbc\xa2 \xc6\xd7\xb0\x1b\x99\x8c\xf2\xe5\xcd\xb9\xa5\x0e\xcd\xa1\x85*6\xe65i\x0c\x8e" + ) + expected = b"\xfc\x82j\x02$\x8b(\xfd`\xf2\xc4\x82$C\xadh\x01\xf0\xc1\xc2\x91\x8b0^\x02\x00\x00\x00X\x00\x00\x00\xdc\xf8\xf1s\x02\x00\x00\x00\x01\xa8\xc1\xc2\x91\x8b0^\x01\x00\x00\x00\x1c\x00\x00\x00\x08\t\xc2\x9e\xc4\xfd3\xad\x91\x8b0^\x18\xa8\x8e\xa6\x07\xeeX\x16\xfc\x82j\x02$\x8b(\xfd\x01\xcc\xc1\xc2\x91\x8b0^\x02\x00\x00\x00\x14\x00\x00\x00\xc5sw4\xc4\xfd3\xad\x91\x8b0^d\x080\x00\x00\x00\x00\x00\xfc\xe6g\x04\xa3\xcd\x8e\xe9\xd0\xaeo\xabg,`\xc0J?\x1f\xd4I\x0eQ\xf6" + + assert decrypt_data_v2(ciphertext, auth_key) == expected + + +def test_key_from_nonce() -> None: + server_nonce = bytes(range(16)) + new_nonce = bytes(range(32)) + + (key, iv) = generate_key_data_from_nonce(server_nonce, new_nonce) + assert ( + key + == b'\x07X\xf1S;a]$\xf6\xe8\xa9Jo\xcb\xee\nU\xea\xab"\x17\xd7)\\\xa9!=\x1a-}\x16\xa6' + ) + assert ( + iv + == b"Z\x84\x10\x8e\x98\x05el\xe8d\x07\x0e\x16nb\x18\xf6x>\x85\x11G\x1aZ\xb7\x80,\xf2\x00\x01\x02\x03" + ) + + +def test_verify_ige_encryption() -> None: + plaintext = get_test_aes_key_or_iv() + key = get_test_aes_key_or_iv() + iv = get_test_aes_key_or_iv() + expected = b"\xe2\x81\x12\xa5>\\\x89\xc7\xb1\xea\x80q\xc13i\x9f\xd4\xe8k&\xc4\xba\xc9\xfcZ\xf1\xab\x8c\xe2zD\xa4" + assert encrypt_ige(plaintext, key, iv) == expected + + +def test_verify_ige_decryption() -> None: + ciphertext = get_test_aes_key_or_iv() + key = get_test_aes_key_or_iv() + iv = get_test_aes_key_or_iv() + expected = b"\xe5wz\xfa\xcd{,\x16\xf7\xac@\xca\xe6\x1e\xf6\x03\xfe\xe6\t\x8f\xb8\xa8\x86\n\xb9\xeeg,\xd7\xe5\xba\xcc" + assert decrypt_ige(ciphertext, key, iv) == expected diff --git a/client/tests/factorize_test.py b/client/tests/factorize_test.py new file mode 100644 index 00000000..025bec2a --- /dev/null +++ b/client/tests/factorize_test.py @@ -0,0 +1,16 @@ +from telethon._impl.crypto.factorize import factorize + + +def test_factorization_1() -> None: + pq = factorize(1470626929934143021) + assert pq == (1206429347, 1218991343) + + +def test_factorization_2() -> None: + pq = factorize(2363612107535801713) + assert pq == (1518968219, 1556064227) + + +def test_factorization_3() -> None: + pq = factorize(2000000000000000006) + assert pq == (2, 1000000000000000003) diff --git a/client/tests/rsa_test.py b/client/tests/rsa_test.py new file mode 100644 index 00000000..ad03392a --- /dev/null +++ b/client/tests/rsa_test.py @@ -0,0 +1,15 @@ +from telethon._impl.crypto.rsa import ( + PRODUCTION_RSA_KEY, + TESTMODE_RSA_KEY, + compute_fingerprint, +) + + +def test_fingerprint_1() -> None: + fp = compute_fingerprint(PRODUCTION_RSA_KEY) + assert fp == -3414540481677951611 + + +def test_fingerprint_2() -> None: + fp = compute_fingerprint(TESTMODE_RSA_KEY) + assert fp == -5595554452916591101 diff --git a/client/tests/two_factor_auth_test.py b/client/tests/two_factor_auth_test.py new file mode 100644 index 00000000..40a60b82 --- /dev/null +++ b/client/tests/two_factor_auth_test.py @@ -0,0 +1,82 @@ +from pytest import mark, raises +from telethon._impl.crypto.two_factor_auth import ( + calculate_2fa, + check_p_prime_and_subgroup, + pad_to_256, +) + + +def test_calculations_1() -> None: + m1, g_a = calculate_2fa( + salt1=bytes((1,)), + salt2=bytes((2,)), + g=3, + p=pad_to_256(bytes((47,))), + g_b=bytes((5,)), + a=bytes((6,)), + password=bytes((7,)), + ) + + expected_m1 = b"\x9d\x83\xc4g\x00\xb8t\xe8\x07\xc4U\xe7\x11$\x1e\xde\x9e\xeabX;8G\xd7\xb7{z2\x13 6\xce" + expected_g_a = bytes(255) + b"\x18" + + assert expected_m1 == m1 + assert expected_g_a == g_a + + +def test_calculations_2() -> None: + (m1, g_a) = calculate_2fa( + salt1=b"_H<8\xbd\t\x86\xe7\xcd\xc9Z\xe18\xefOI\xb9Q\xc1\xf8\x1cq?\xec\xde\xf3\xafi,\xecKG\x16\xac\x9bw\n\x19^\xbe", + salt2=b"\xb6\x16\xfck\xbe\xdfQ\x11\x19\xc5\xed4b\x95'\xf1", + g=3, + p=b"\xc7\x1c\xae\xb9\xc6\xb1\xc9\x04\x8elR/p\xf1?s\x98\r@#\x8e>!\xc1I4\xd07V=\x93\x0fH\x19\x8a\n\xa7\xc1@X\"\x94\x93\xd2%0\xf4\xdb\xfa3on\n\xc9%\x13\x95C\xae\xd4L\xce|7 \xfdQ\xf6\x94XpZ\xc6\x8c\xd4\xfekk\x13\xab\xdc\x97FQ)i2\x84T\xf1\x8f\xaf\x8cY_d$w\xfe\x96\xbb*\x94\x1d[\xcd\x1dJ\xc8\xccI\x88\x07\x08\xfa\x9b7\x8e\xbe\xa0\xf8\x7f\xa9\xff^\xedp\x05\r\xed(I\xf4{\xf9Y\xd9V\x85\x0c\xe9)\x85\x1f\r\x81\x15\xf65\xb1\x05\xee.N\x15\xd0K$T\xbfoO\xad\xf04\xb1\x04\x03\x11\x9c\xd8\xe3\xb9/\xcc[", + g_b=b"\x93\xf7\x0e\xbdP\xf6Bj\xca%hv\x95\x99\xf9\x1f$\xd0\x12\x84\xccQ\xa4I\xe6-\xcc\x15'\xdf\xe5\x01&\xb2\xaaD#\x8eO\xb23\x14\x19\xedJ\xeb\xf1\xa0\xae\x15\xe0:\xbd\x18\xbf\xc5,\xa6\xba\xecVL\x13\xb5\xa1\xd2\xe3Wy\x98\x97u{\xb76\xfd\xc2\xce\xb1\xb5j\xac\xf1\x9a\xb3T\x8dm\x92*R/\x0bQ\xf4\x01$\xc3\xbc\x996\xaf\xf3\xe1\xdc\xfb\xea9\xac\x9a\xd2\xad\xdcj\xf0\xad0x2x\xbb\xb8L\xab\x0e\xd8FK\x0f\xfe\xb2\xb0\xc9:9\xa5\xd9}\xba\x01\x05g,\xa5GSs\xd8\xd2>T\xa6\xac\x9b\xed\x95\x19\xe8\xbe\xf4\xf0\x07\x19\xf5\xadV\x15\x1b\xe5SvH\xdf/\x8e>re\xcbW\xfb\x94\xa0T\xce*\x82\xb8\xccfJ\xd0`\xe0\xd6\xc6\xdf\x18y4T@\xeb\x97\x7f\xa0\xf2\xd3o1\xa2S\xd8\x91w2\xf13\xd4\x003\xa3KaR\x96\x9b`\rY\xcd\xab\xfe\xa2\xab#\x93\xade\x9eV\xd6n\x13`[\x1fa\xe4\x8e<\xd6\\\x0fX\xac", + a=b"\xbf1WO4\xfc\xe1\xe58\x91\xc5\x9b\x7fbF\x8a\x0c\xa6\x82\xda\x85\x85\xdf\x8d\xe0\xa1\x88s5\x97U\xfb\xb1\x81\x88x\xa9\xee\x91\x9b\xb1\xe9M \xc5\xf0`~\x02\xa31v\x19\x9b\xf3\"\x02V\xc9\xea\x1ai\xf3\x95\xa5\x15\xd2\x059\xd8\x8c\xdau\nRR\xfb\x86OW?+\x03/;F}\x08\xb3O\xd9\xc8\x9d\x1c]\x06'\x8e\x11>Q\xd4\xe8\x93\xc1\xc0'EZ\xf4e?\tf\x07\xa4\x15m\x94\xfb\x8e\x1d\xc7\xce\xe5\xbf\xe3(P\x98/\x94\x1a\xe4AN\x83\xbf\"\xdfV'\x0bC\xb7\xcc\xc4L&\xd4\x08\x86FM\xa8\xe3D\xa8T\x07\x95\xb8\xf6\x9b\x8fP\x85R\xa7#\xcdi1\xe1\xd6\x92\x04\xe8\xe9\xdc\x05o\n*\x10\xa0\xd7\x95\x1e5_>\xde\xf5\xa5\xe1\x8a\x90\x91)ZQ\xeb\x9d\xb1\x0b\x8b\r0H\x9c\x8d)\xbc\x0c\xd8n\x97x\x1f^0\xc5\xb6\xbf\xe7\xca\xf4\xaa\xe8\x1b(.e:\xc4\x8a\xa1\xa8\xfd\xe7\x89r+\xc0OC \xcd\x9f\x86\x84\x9f\xe0\\\xa4", + password=b"234567", + ) + + expected_m1 = b"Mz\xf4\x12\xc5\xa2\xe7\xb1Tg7k\xd1\x18\xb8S`Nh{1\xf5\x1cI\x80\xc4\xd7\xc1\x87f\x13\xe3" + expected_g_a = b"\x0f\xa1+\xc6U\xb1\x18z()ji\xaeV]h'\x82\xe0\xce\xb0Z\x08\x9c\x0cA\xc1\xdc\xe9\x83\xdc\x7fJSCN\xa7\x8e\x06]\x9e\x1c\xb6\x0eB{Dh\xa4x\x06\t\xfe\xba/UeN\xe2\xef\xe0\xae\xb7.\xda\xfd\xe2e.&\xed[MK\xaa\xd9\xd2\xa3\x81\x80j\xf64\x16\xbfbc\xdfE\xa4=\x85\xbeT\x01\xbc\">\xbf\xac\tB\x1c\xad\xdd~&\x0b\xd6\xb8eB\x13<\x04\x8dl\xd5K8\xd8\xe2\xcc\xdfkU\x0e\x87[\x13S\xa4\xac\xfe2\x92\xff\xb5j\x0fX\xb2\xa3\x90'\xc9\xbf\xdd\x91\xfdLS\x1d#\xc7}n\x8f}X>\xae\xea1m\xed\xde\xd6\x995)l\xe7\xea4\xe9\xben\xf2\xfb\xd8)\xea\xc4\xc9\xbddm\xc1V>G\xf7{\x91C\x1c\xa0\x02\xcfy\xfc\x14\x9d\x96\x82r\x83\\\x15\xca\x1ck,^\x03q*.\x1b]R\xf5\xe4\xfa\xa1\xc1l\xb1w\xfa\xfd\x96\xa6\xaa[K?Ld\x99\x15dKc\x85\\\xfb8V\x1f\xf1\x7f\xed\xfb\x8a" + + assert expected_m1 == m1 + assert expected_g_a == g_a + + +@mark.parametrize(("p", "g"), [(4, 0), (13, 0)]) +def test_bad_g(p: int, g: int) -> None: + with raises(ValueError) as e: + check_p_prime_and_subgroup(p.to_bytes((p.bit_length() + 7) // 8), g) + + assert e.match("bad g") + + +@mark.parametrize( + ("p", "g"), + [ + (11, 2), + (13, 3), + (13, 5), + (13, 6), + (13, 7), + ], +) +def test_incorrect_pg(p: int, g: int) -> None: + assert not check_p_prime_and_subgroup(p.to_bytes((p.bit_length() + 7) // 8), g) + + +@mark.parametrize( + ("p", "g"), + [ + (23, 2), + (47, 3), + (11, 4), + (11, 5), + (179, 5), + (383, 6), + (479, 7), + (383, 7), + (503, 7), + ], +) +def test_correct_pg(p: int, g: int) -> None: + assert check_p_prime_and_subgroup(p.to_bytes((p.bit_length() + 7) // 8), g) diff --git a/stubs/pyaes.pyi b/stubs/pyaes.pyi new file mode 100644 index 00000000..04ed6293 --- /dev/null +++ b/stubs/pyaes.pyi @@ -0,0 +1,6 @@ +from typing import List + +class AES: + def __init__(self, key: bytes) -> None: ... + def encrypt(self, plaintext: List[int]) -> List[int]: ... + def decrypt(self, ciphertext: List[int]) -> List[int]: ...