mirror of
https://github.com/LonamiWebs/Telethon.git
synced 2024-11-22 17:36:34 +03:00
Use pycrypto's RSA instead the old weird RSA implementation
This commit is contained in:
parent
0cae62f091
commit
66876b6722
2
setup.py
2
setup.py
|
@ -94,5 +94,5 @@ if __name__ == '__main__':
|
||||||
'telethon_generator', 'telethon_tests', 'run_tests.py',
|
'telethon_generator', 'telethon_tests', 'run_tests.py',
|
||||||
'try_telethon.py'
|
'try_telethon.py'
|
||||||
]),
|
]),
|
||||||
install_requires=['pyaes']
|
install_requires=['pyaes', 'pycrypto']
|
||||||
)
|
)
|
||||||
|
|
|
@ -1,4 +1,3 @@
|
||||||
from .aes import AES
|
from .aes import AES
|
||||||
from .rsa import RSA, RSAServerKey
|
|
||||||
from .auth_key import AuthKey
|
from .auth_key import AuthKey
|
||||||
from .factorization import Factorization
|
from .factorization import Factorization
|
||||||
|
|
|
@ -1,60 +1,74 @@
|
||||||
import os
|
import os
|
||||||
from hashlib import sha1
|
from hashlib import sha1
|
||||||
|
try:
|
||||||
|
from Crypto.PublicKey import RSA
|
||||||
|
except ImportError:
|
||||||
|
raise ImportError('Missing module "pycrypto", please install via pip.')
|
||||||
|
|
||||||
from ..extensions import BinaryWriter
|
from ..extensions import BinaryWriter
|
||||||
|
|
||||||
|
|
||||||
class RSAServerKey:
|
# {fingerprint: Crypto.PublicKey.RSA._RSAobj} dictionary
|
||||||
def __init__(self, fingerprint, m, e):
|
_server_keys = { }
|
||||||
self.fingerprint = fingerprint
|
|
||||||
self.m = m
|
|
||||||
self.e = e
|
|
||||||
|
|
||||||
def encrypt(self, data, offset=None, length=None):
|
|
||||||
"""Encrypts the given data with the current key"""
|
|
||||||
if offset is None:
|
|
||||||
offset = 0
|
|
||||||
if length is None:
|
|
||||||
length = len(data)
|
|
||||||
|
|
||||||
with BinaryWriter() as writer:
|
|
||||||
# Write SHA
|
|
||||||
writer.write(sha1(data[offset:offset + length]).digest())
|
|
||||||
# Write data
|
|
||||||
writer.write(data[offset:offset + length])
|
|
||||||
# Add padding if required
|
|
||||||
if length < 235:
|
|
||||||
writer.write(os.urandom(235 - length))
|
|
||||||
|
|
||||||
result = int.from_bytes(writer.get_bytes(), byteorder='big')
|
|
||||||
result = pow(result, self.e, self.m)
|
|
||||||
|
|
||||||
# If the result byte count is less than 256, since the byte order is big,
|
|
||||||
# the non-used bytes on the left will be 0 and act as padding,
|
|
||||||
# without need of any additional checks
|
|
||||||
return int.to_bytes(
|
|
||||||
result, length=256, byteorder='big', signed=False)
|
|
||||||
|
|
||||||
|
|
||||||
class RSA:
|
def get_byte_array(integer):
|
||||||
_server_keys = {
|
"""Return the variable length bytes corresponding to the given int"""
|
||||||
'216be86c022bb4c3': RSAServerKey('216be86c022bb4c3', int(
|
# Operate in big endian (unlike most of Telegram API) since:
|
||||||
'C150023E2F70DB7985DED064759CFECF0AF328E69A41DAF4D6F01B538135A6F9'
|
# > "...pq is a representation of a natural number
|
||||||
'1F8F8B2A0EC9BA9720CE352EFCF6C5680FFC424BD634864902DE0B4BD6D49F4E'
|
# (in binary *big endian* format)..."
|
||||||
'580230E3AE97D95C8B19442B3C0A10D8F5633FECEDD6926A7F6DAB0DDB7D457F'
|
# > "...current value of dh_prime equals
|
||||||
'9EA81B8465FCD6FFFEED114011DF91C059CAEDAF97625F6C96ECC74725556934'
|
# (in *big-endian* byte order)..."
|
||||||
'EF781D866B34F011FCE4D835A090196E9A5F0E4449AF7EB697DDB9076494CA5F'
|
# Reference: https://core.telegram.org/mtproto/auth_key
|
||||||
'81104A305B6DD27665722C46B60E5DF680FB16B210607EF217652E60236C255F'
|
return int.to_bytes(
|
||||||
'6A28315F4083A96791D7214BF64C1DF4FD0DB1944FB26A2A57031B32EEE64AD1'
|
integer,
|
||||||
'5A8BA68885CDE74A5BFC920F6ABF59BA5C75506373E7130F9042DA922179251F',
|
length=(integer.bit_length() + 8 - 1) // 8, # 8 bits per byte,
|
||||||
16), int('010001', 16))
|
byteorder='big',
|
||||||
}
|
signed=False
|
||||||
|
)
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def encrypt(fingerprint, data, offset=None, length=None):
|
|
||||||
"""Encrypts the given data given a fingerprint"""
|
|
||||||
if fingerprint.lower() not in RSA._server_keys:
|
|
||||||
return None
|
|
||||||
|
|
||||||
key = RSA._server_keys[fingerprint.lower()]
|
def _compute_fingerprint(key):
|
||||||
return key.encrypt(data, offset, length)
|
"""For a given Crypto.RSA key, computes its 8-bytes-long fingerprint
|
||||||
|
in the same way that Telegram does.
|
||||||
|
"""
|
||||||
|
with BinaryWriter() as writer:
|
||||||
|
writer.tgwrite_bytes(get_byte_array(key.n))
|
||||||
|
writer.tgwrite_bytes(get_byte_array(key.e))
|
||||||
|
# Telegram uses the last 8 bytes as the fingerprint
|
||||||
|
return sha1(writer.get_bytes()).digest()[-8:]
|
||||||
|
|
||||||
|
|
||||||
|
def add_key(pub):
|
||||||
|
"""Adds a new public key to be used when encrypting new data is needed"""
|
||||||
|
global _server_keys
|
||||||
|
key = RSA.importKey(pub)
|
||||||
|
_server_keys[_compute_fingerprint(key)] = key
|
||||||
|
|
||||||
|
|
||||||
|
def encrypt(fingerprint, data):
|
||||||
|
"""Given the fingerprint of a previously added RSA key, encrypt its data
|
||||||
|
in the way Telegram requires us to do so (sha1(data) + data + padding)
|
||||||
|
"""
|
||||||
|
global _server_keys
|
||||||
|
key = _server_keys.get(fingerprint, None)
|
||||||
|
if not key:
|
||||||
|
return None
|
||||||
|
|
||||||
|
# len(sha1.digest) is always 20, so we're left with 255 - 20 - x padding
|
||||||
|
to_encrypt = sha1(data).digest() + data + os.urandom(235 - len(data))
|
||||||
|
return key.encrypt(to_encrypt, 0)[0]
|
||||||
|
|
||||||
|
|
||||||
|
# Add default keys
|
||||||
|
for pub in (
|
||||||
|
'''-----BEGIN RSA PUBLIC KEY-----
|
||||||
|
MIIBCgKCAQEAwVACPi9w23mF3tBkdZz+zwrzKOaaQdr01vAbU4E1pvkfj4sqDsm6
|
||||||
|
lyDONS789sVoD/xCS9Y0hkkC3gtL1tSfTlgCMOOul9lcixlEKzwKENj1Yz/s7daS
|
||||||
|
an9tqw3bfUV/nqgbhGX81v/+7RFAEd+RwFnK7a+XYl9sluzHRyVVaTTveB2GazTw
|
||||||
|
Efzk2DWgkBluml8OREmvfraX3bkHZJTKX4EQSjBbbdJ2ZXIsRrYOXfaA+xayEGB+
|
||||||
|
8hdlLmAjbCVfaigxX0CDqWeR1yFL9kwd9P0NsZRPsmoqVwMbMu7mStFai6aIhc3n
|
||||||
|
Slv8kg9qv1m6XHVQY3PnEw+QQtqSIXklHwIDAQAB
|
||||||
|
-----END RSA PUBLIC KEY-----''',
|
||||||
|
):
|
||||||
|
add_key(pub)
|
||||||
|
|
|
@ -3,7 +3,8 @@ import time
|
||||||
from hashlib import sha1
|
from hashlib import sha1
|
||||||
|
|
||||||
from .. import helpers as utils
|
from .. import helpers as utils
|
||||||
from ..crypto import AES, RSA, AuthKey, Factorization
|
from ..crypto import AES, AuthKey, Factorization
|
||||||
|
from ..crypto import rsa
|
||||||
from ..network import MtProtoPlainSender
|
from ..network import MtProtoPlainSender
|
||||||
from ..extensions import BinaryReader, BinaryWriter
|
from ..extensions import BinaryReader, BinaryWriter
|
||||||
|
|
||||||
|
@ -56,22 +57,20 @@ def do_authentication(transport):
|
||||||
with BinaryWriter() as pq_inner_data_writer:
|
with BinaryWriter() as pq_inner_data_writer:
|
||||||
pq_inner_data_writer.write_int(
|
pq_inner_data_writer.write_int(
|
||||||
0x83c95aec, signed=False) # PQ Inner Data
|
0x83c95aec, signed=False) # PQ Inner Data
|
||||||
pq_inner_data_writer.tgwrite_bytes(get_byte_array(pq, signed=False))
|
pq_inner_data_writer.tgwrite_bytes(rsa.get_byte_array(pq))
|
||||||
pq_inner_data_writer.tgwrite_bytes(
|
pq_inner_data_writer.tgwrite_bytes(rsa.get_byte_array(min(p, q)))
|
||||||
get_byte_array(
|
pq_inner_data_writer.tgwrite_bytes(rsa.get_byte_array(max(p, q)))
|
||||||
min(p, q), signed=False))
|
|
||||||
pq_inner_data_writer.tgwrite_bytes(
|
|
||||||
get_byte_array(
|
|
||||||
max(p, q), signed=False))
|
|
||||||
pq_inner_data_writer.write(nonce)
|
pq_inner_data_writer.write(nonce)
|
||||||
pq_inner_data_writer.write(server_nonce)
|
pq_inner_data_writer.write(server_nonce)
|
||||||
pq_inner_data_writer.write(new_nonce)
|
pq_inner_data_writer.write(new_nonce)
|
||||||
|
|
||||||
|
# sha_digest + data + random_bytes
|
||||||
cipher_text, target_fingerprint = None, None
|
cipher_text, target_fingerprint = None, None
|
||||||
for fingerprint in fingerprints:
|
for fingerprint in fingerprints:
|
||||||
cipher_text = RSA.encrypt(
|
cipher_text = rsa.encrypt(
|
||||||
get_fingerprint_text(fingerprint),
|
fingerprint,
|
||||||
pq_inner_data_writer.get_bytes())
|
pq_inner_data_writer.get_bytes()
|
||||||
|
)
|
||||||
|
|
||||||
if cipher_text is not None:
|
if cipher_text is not None:
|
||||||
target_fingerprint = fingerprint
|
target_fingerprint = fingerprint
|
||||||
|
@ -80,20 +79,16 @@ def do_authentication(transport):
|
||||||
if cipher_text is None:
|
if cipher_text is None:
|
||||||
raise AssertionError(
|
raise AssertionError(
|
||||||
'Could not find a valid key for fingerprints: {}'
|
'Could not find a valid key for fingerprints: {}'
|
||||||
.format(', '.join([get_fingerprint_text(f)
|
.format(', '.join([repr(f) for f in fingerprints]))
|
||||||
for f in fingerprints])))
|
)
|
||||||
|
|
||||||
with BinaryWriter() as req_dh_params_writer:
|
with BinaryWriter() as req_dh_params_writer:
|
||||||
req_dh_params_writer.write_int(
|
req_dh_params_writer.write_int(
|
||||||
0xd712e4be, signed=False) # Req DH Params
|
0xd712e4be, signed=False) # Req DH Params
|
||||||
req_dh_params_writer.write(nonce)
|
req_dh_params_writer.write(nonce)
|
||||||
req_dh_params_writer.write(server_nonce)
|
req_dh_params_writer.write(server_nonce)
|
||||||
req_dh_params_writer.tgwrite_bytes(
|
req_dh_params_writer.tgwrite_bytes(rsa.get_byte_array(min(p, q)))
|
||||||
get_byte_array(
|
req_dh_params_writer.tgwrite_bytes(rsa.get_byte_array(max(p, q)))
|
||||||
min(p, q), signed=False))
|
|
||||||
req_dh_params_writer.tgwrite_bytes(
|
|
||||||
get_byte_array(
|
|
||||||
max(p, q), signed=False))
|
|
||||||
req_dh_params_writer.write(target_fingerprint)
|
req_dh_params_writer.write(target_fingerprint)
|
||||||
req_dh_params_writer.tgwrite_bytes(cipher_text)
|
req_dh_params_writer.tgwrite_bytes(cipher_text)
|
||||||
|
|
||||||
|
@ -159,9 +154,7 @@ def do_authentication(transport):
|
||||||
client_dh_inner_data_writer.write(nonce)
|
client_dh_inner_data_writer.write(nonce)
|
||||||
client_dh_inner_data_writer.write(server_nonce)
|
client_dh_inner_data_writer.write(server_nonce)
|
||||||
client_dh_inner_data_writer.write_long(0) # TODO retry_id
|
client_dh_inner_data_writer.write_long(0) # TODO retry_id
|
||||||
client_dh_inner_data_writer.tgwrite_bytes(
|
client_dh_inner_data_writer.tgwrite_bytes(rsa.get_byte_array(gb))
|
||||||
get_byte_array(
|
|
||||||
gb, signed=False))
|
|
||||||
|
|
||||||
with BinaryWriter() as client_dh_inner_data_with_hash_writer:
|
with BinaryWriter() as client_dh_inner_data_with_hash_writer:
|
||||||
client_dh_inner_data_with_hash_writer.write(
|
client_dh_inner_data_with_hash_writer.write(
|
||||||
|
@ -204,7 +197,7 @@ def do_authentication(transport):
|
||||||
raise NotImplementedError('Invalid server nonce from server')
|
raise NotImplementedError('Invalid server nonce from server')
|
||||||
|
|
||||||
new_nonce_hash1 = reader.read(16)
|
new_nonce_hash1 = reader.read(16)
|
||||||
auth_key = AuthKey(get_byte_array(gab, signed=False))
|
auth_key = AuthKey(rsa.get_byte_array(gab))
|
||||||
|
|
||||||
new_nonce_hash_calculated = auth_key.calc_new_nonce_hash(new_nonce,
|
new_nonce_hash_calculated = auth_key.calc_new_nonce_hash(new_nonce,
|
||||||
1)
|
1)
|
||||||
|
@ -223,23 +216,6 @@ def do_authentication(transport):
|
||||||
raise AssertionError('DH Gen unknown: {}'.format(hex(code)))
|
raise AssertionError('DH Gen unknown: {}'.format(hex(code)))
|
||||||
|
|
||||||
|
|
||||||
def get_fingerprint_text(fingerprint):
|
|
||||||
"""Gets a fingerprint text in 01-23-45-67-89-AB-CD-EF format (no hyphens)"""
|
|
||||||
return ''.join(hex(b)[2:].rjust(2, '0').upper() for b in fingerprint)
|
|
||||||
|
|
||||||
|
|
||||||
# The following methods operate in big endian (unlike most of Telegram API) because:
|
|
||||||
# > "...pq is a representation of a natural number (in binary *big endian* format)..."
|
|
||||||
# > "...current value of dh_prime equals (in *big-endian* byte order)..."
|
|
||||||
# Reference: https://core.telegram.org/mtproto/auth_key
|
|
||||||
def get_byte_array(integer, signed):
|
|
||||||
"""Gets the arbitrary-length byte array corresponding to the given integer"""
|
|
||||||
bits = integer.bit_length()
|
|
||||||
byte_length = (bits + 8 - 1) // 8 # 8 bits per byte
|
|
||||||
return int.to_bytes(
|
|
||||||
integer, length=byte_length, byteorder='big', signed=signed)
|
|
||||||
|
|
||||||
|
|
||||||
def get_int(byte_array, signed=True):
|
def get_int(byte_array, signed=True):
|
||||||
"""Gets the specified integer from its byte array. This should be used by the authenticator,
|
"""Gets the specified integer from its byte array. This should be used by the authenticator,
|
||||||
who requires the data to be in big endian"""
|
who requires the data to be in big endian"""
|
||||||
|
|
Loading…
Reference in New Issue
Block a user