mirror of
https://github.com/LonamiWebs/Telethon.git
synced 2025-03-28 13:54:13 +03:00
Document the crypto/ module
This commit is contained in:
parent
74ec6391d9
commit
a932fb6470
|
@ -1,3 +1,8 @@
|
||||||
|
"""
|
||||||
|
This module contains several utilities regarding cryptographic purposes,
|
||||||
|
such as the AES IGE mode used by Telegram, the authorization key bound with
|
||||||
|
their data centers, and so on.
|
||||||
|
"""
|
||||||
from .aes import AES
|
from .aes import AES
|
||||||
from .aes_ctr import AESModeCTR
|
from .aes_ctr import AESModeCTR
|
||||||
from .auth_key import AuthKey
|
from .auth_key import AuthKey
|
||||||
|
|
|
@ -1,3 +1,6 @@
|
||||||
|
"""
|
||||||
|
AES IGE implementation in Python. This module may use libssl if available.
|
||||||
|
"""
|
||||||
import os
|
import os
|
||||||
import pyaes
|
import pyaes
|
||||||
from . import libssl
|
from . import libssl
|
||||||
|
@ -9,10 +12,15 @@ if libssl.AES is not None:
|
||||||
else:
|
else:
|
||||||
# Fallback to a pure Python implementation
|
# Fallback to a pure Python implementation
|
||||||
class AES:
|
class AES:
|
||||||
|
"""
|
||||||
|
Class that servers as an interface to encrypt and decrypt
|
||||||
|
text through the AES IGE mode.
|
||||||
|
"""
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def decrypt_ige(cipher_text, key, iv):
|
def decrypt_ige(cipher_text, key, iv):
|
||||||
"""Decrypts the given text in 16-bytes blocks by using the
|
"""
|
||||||
given key and 32-bytes initialization vector
|
Decrypts the given text in 16-bytes blocks by using the
|
||||||
|
given key and 32-bytes initialization vector.
|
||||||
"""
|
"""
|
||||||
iv1 = iv[:len(iv) // 2]
|
iv1 = iv[:len(iv) // 2]
|
||||||
iv2 = iv[len(iv) // 2:]
|
iv2 = iv[len(iv) // 2:]
|
||||||
|
@ -42,8 +50,9 @@ else:
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def encrypt_ige(plain_text, key, iv):
|
def encrypt_ige(plain_text, key, iv):
|
||||||
"""Encrypts the given text in 16-bytes blocks by using the
|
"""
|
||||||
given key and 32-bytes initialization vector
|
Encrypts the given text in 16-bytes blocks by using the
|
||||||
|
given key and 32-bytes initialization vector.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# Add random padding iff it's not evenly divisible by 16 already
|
# Add random padding iff it's not evenly divisible by 16 already
|
||||||
|
|
|
@ -1,3 +1,6 @@
|
||||||
|
"""
|
||||||
|
This module holds the AESModeCTR wrapper class.
|
||||||
|
"""
|
||||||
import pyaes
|
import pyaes
|
||||||
|
|
||||||
|
|
||||||
|
@ -6,6 +9,12 @@ class AESModeCTR:
|
||||||
# TODO Maybe make a pull request to pyaes to support iv on CTR
|
# TODO Maybe make a pull request to pyaes to support iv on CTR
|
||||||
|
|
||||||
def __init__(self, key, iv):
|
def __init__(self, key, iv):
|
||||||
|
"""
|
||||||
|
Initializes the AES CTR mode with the given key/iv pair.
|
||||||
|
|
||||||
|
:param key: the key to be used as bytes.
|
||||||
|
:param iv: the bytes initialization vector. Must have a length of 16.
|
||||||
|
"""
|
||||||
# TODO Use libssl if available
|
# TODO Use libssl if available
|
||||||
assert isinstance(key, bytes)
|
assert isinstance(key, bytes)
|
||||||
self._aes = pyaes.AESModeOfOperationCTR(key)
|
self._aes = pyaes.AESModeOfOperationCTR(key)
|
||||||
|
@ -15,7 +24,19 @@ class AESModeCTR:
|
||||||
self._aes._counter._counter = list(iv)
|
self._aes._counter._counter = list(iv)
|
||||||
|
|
||||||
def encrypt(self, data):
|
def encrypt(self, data):
|
||||||
|
"""
|
||||||
|
Encrypts the given plain text through AES CTR.
|
||||||
|
|
||||||
|
:param data: the plain text to be encrypted.
|
||||||
|
:return: the encrypted cipher text.
|
||||||
|
"""
|
||||||
return self._aes.encrypt(data)
|
return self._aes.encrypt(data)
|
||||||
|
|
||||||
def decrypt(self, data):
|
def decrypt(self, data):
|
||||||
|
"""
|
||||||
|
Decrypts the given cipher text through AES CTR
|
||||||
|
|
||||||
|
:param data: the cipher text to be decrypted.
|
||||||
|
:return: the decrypted plain text.
|
||||||
|
"""
|
||||||
return self._aes.decrypt(data)
|
return self._aes.decrypt(data)
|
||||||
|
|
|
@ -1,3 +1,6 @@
|
||||||
|
"""
|
||||||
|
This module holds the AuthKey class.
|
||||||
|
"""
|
||||||
import struct
|
import struct
|
||||||
from hashlib import sha1
|
from hashlib import sha1
|
||||||
|
|
||||||
|
@ -6,7 +9,16 @@ from ..extensions import BinaryReader
|
||||||
|
|
||||||
|
|
||||||
class AuthKey:
|
class AuthKey:
|
||||||
|
"""
|
||||||
|
Represents an authorization key, used to encrypt and decrypt
|
||||||
|
messages sent to Telegram's data centers.
|
||||||
|
"""
|
||||||
def __init__(self, data):
|
def __init__(self, data):
|
||||||
|
"""
|
||||||
|
Initializes a new authorization key.
|
||||||
|
|
||||||
|
:param data: the data in bytes that represent this auth key.
|
||||||
|
"""
|
||||||
self.key = data
|
self.key = data
|
||||||
|
|
||||||
with BinaryReader(sha1(self.key).digest()) as reader:
|
with BinaryReader(sha1(self.key).digest()) as reader:
|
||||||
|
@ -15,8 +27,12 @@ class AuthKey:
|
||||||
self.key_id = reader.read_long(signed=False)
|
self.key_id = reader.read_long(signed=False)
|
||||||
|
|
||||||
def calc_new_nonce_hash(self, new_nonce, number):
|
def calc_new_nonce_hash(self, new_nonce, number):
|
||||||
"""Calculates the new nonce hash based on
|
"""
|
||||||
the current class fields' values
|
Calculates the new nonce hash based on the current attributes.
|
||||||
|
|
||||||
|
:param new_nonce: the new nonce to be hashed.
|
||||||
|
:param number: number to prepend before the hash.
|
||||||
|
:return: the hash for the given new nonce.
|
||||||
"""
|
"""
|
||||||
new_nonce = new_nonce.to_bytes(32, 'little', signed=True)
|
new_nonce = new_nonce.to_bytes(32, 'little', signed=True)
|
||||||
data = new_nonce + struct.pack('<BQ', number, self.aux_hash)
|
data = new_nonce + struct.pack('<BQ', number, self.aux_hash)
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
|
"""
|
||||||
|
This module holds the CdnDecrypter utility class.
|
||||||
|
"""
|
||||||
from hashlib import sha256
|
from hashlib import sha256
|
||||||
|
|
||||||
from ..tl import Session
|
|
||||||
from ..tl.functions.upload import GetCdnFileRequest, ReuploadCdnFileRequest
|
from ..tl.functions.upload import GetCdnFileRequest, ReuploadCdnFileRequest
|
||||||
from ..tl.types.upload import CdnFileReuploadNeeded, CdnFile
|
from ..tl.types.upload import CdnFileReuploadNeeded, CdnFile
|
||||||
from ..crypto import AESModeCTR
|
from ..crypto import AESModeCTR
|
||||||
|
@ -8,11 +10,20 @@ from ..errors import CdnFileTamperedError
|
||||||
|
|
||||||
|
|
||||||
class CdnDecrypter:
|
class CdnDecrypter:
|
||||||
"""Used when downloading a file results in a 'FileCdnRedirect' to
|
"""
|
||||||
both prepare the redirect, decrypt the file as it downloads, and
|
Used when downloading a file results in a 'FileCdnRedirect' to
|
||||||
ensure the file hasn't been tampered. https://core.telegram.org/cdn
|
both prepare the redirect, decrypt the file as it downloads, and
|
||||||
|
ensure the file hasn't been tampered. https://core.telegram.org/cdn
|
||||||
"""
|
"""
|
||||||
def __init__(self, cdn_client, file_token, cdn_aes, cdn_file_hashes):
|
def __init__(self, cdn_client, file_token, cdn_aes, cdn_file_hashes):
|
||||||
|
"""
|
||||||
|
Initializes the CDN decrypter.
|
||||||
|
|
||||||
|
:param cdn_client: a client connected to a CDN.
|
||||||
|
:param file_token: the token of the file to be used.
|
||||||
|
:param cdn_aes: the AES CTR used to decrypt the file.
|
||||||
|
:param cdn_file_hashes: the hashes the decrypted file must match.
|
||||||
|
"""
|
||||||
self.client = cdn_client
|
self.client = cdn_client
|
||||||
self.file_token = file_token
|
self.file_token = file_token
|
||||||
self.cdn_aes = cdn_aes
|
self.cdn_aes = cdn_aes
|
||||||
|
@ -20,10 +31,13 @@ class CdnDecrypter:
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def prepare_decrypter(client, cdn_client, cdn_redirect):
|
def prepare_decrypter(client, cdn_client, cdn_redirect):
|
||||||
"""Prepares a CDN decrypter, returning (decrypter, file data).
|
"""
|
||||||
'client' should be an existing client not connected to a CDN.
|
Prepares a new CDN decrypter.
|
||||||
'cdn_client' should be an already-connected TelegramBareClient
|
|
||||||
with the auth key already created.
|
:param client: a TelegramClient connected to the main servers.
|
||||||
|
:param cdn_client: a new client connected to the CDN.
|
||||||
|
:param cdn_redirect: the redirect file object that caused this call.
|
||||||
|
:return: (CdnDecrypter, first chunk file data)
|
||||||
"""
|
"""
|
||||||
cdn_aes = AESModeCTR(
|
cdn_aes = AESModeCTR(
|
||||||
key=cdn_redirect.encryption_key,
|
key=cdn_redirect.encryption_key,
|
||||||
|
@ -60,8 +74,11 @@ class CdnDecrypter:
|
||||||
return decrypter, cdn_file
|
return decrypter, cdn_file
|
||||||
|
|
||||||
def get_file(self):
|
def get_file(self):
|
||||||
"""Calls GetCdnFileRequest and decrypts its bytes.
|
"""
|
||||||
Also ensures that the file hasn't been tampered.
|
Calls GetCdnFileRequest and decrypts its bytes.
|
||||||
|
Also ensures that the file hasn't been tampered.
|
||||||
|
|
||||||
|
:return: the CdnFile result.
|
||||||
"""
|
"""
|
||||||
if self.cdn_file_hashes:
|
if self.cdn_file_hashes:
|
||||||
cdn_hash = self.cdn_file_hashes.pop(0)
|
cdn_hash = self.cdn_file_hashes.pop(0)
|
||||||
|
@ -77,6 +94,12 @@ class CdnDecrypter:
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def check(data, cdn_hash):
|
def check(data, cdn_hash):
|
||||||
"""Checks the integrity of the given data"""
|
"""
|
||||||
|
Checks the integrity of the given data.
|
||||||
|
Raises CdnFileTamperedError if the integrity check fails.
|
||||||
|
|
||||||
|
:param data: the data to be hashed.
|
||||||
|
:param cdn_hash: the expected hash.
|
||||||
|
"""
|
||||||
if sha256(data).digest() != cdn_hash.hash:
|
if sha256(data).digest() != cdn_hash.hash:
|
||||||
raise CdnFileTamperedError()
|
raise CdnFileTamperedError()
|
||||||
|
|
|
@ -1,9 +1,21 @@
|
||||||
|
"""
|
||||||
|
This module holds a fast Factorization class.
|
||||||
|
"""
|
||||||
from random import randint
|
from random import randint
|
||||||
|
|
||||||
|
|
||||||
class Factorization:
|
class Factorization:
|
||||||
|
"""
|
||||||
|
Simple module to factorize large numbers really quickly.
|
||||||
|
"""
|
||||||
@classmethod
|
@classmethod
|
||||||
def factorize(cls, pq):
|
def factorize(cls, pq):
|
||||||
|
"""
|
||||||
|
Factorizes the given large integer.
|
||||||
|
|
||||||
|
:param pq: the prime pair pq.
|
||||||
|
:return: a tuple containing the two factors p and q.
|
||||||
|
"""
|
||||||
if pq % 2 == 0:
|
if pq % 2 == 0:
|
||||||
return 2, pq // 2
|
return 2, pq // 2
|
||||||
|
|
||||||
|
@ -39,6 +51,13 @@ class Factorization:
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def gcd(a, b):
|
def gcd(a, b):
|
||||||
|
"""
|
||||||
|
Calculates the Greatest Common Divisor.
|
||||||
|
|
||||||
|
:param a: the first number.
|
||||||
|
:param b: the second number.
|
||||||
|
:return: GCD(a, b)
|
||||||
|
"""
|
||||||
while b:
|
while b:
|
||||||
a, b = b, a % b
|
a, b = b, a % b
|
||||||
|
|
||||||
|
|
|
@ -1,3 +1,6 @@
|
||||||
|
"""
|
||||||
|
This module holds an AES IGE class, if libssl is available on the system.
|
||||||
|
"""
|
||||||
import os
|
import os
|
||||||
import ctypes
|
import ctypes
|
||||||
from ctypes.util import find_library
|
from ctypes.util import find_library
|
||||||
|
@ -35,14 +38,23 @@ else:
|
||||||
AES_DECRYPT = ctypes.c_int(0)
|
AES_DECRYPT = ctypes.c_int(0)
|
||||||
|
|
||||||
class AES_KEY(ctypes.Structure):
|
class AES_KEY(ctypes.Structure):
|
||||||
|
"""Helper class representing an AES key"""
|
||||||
_fields_ = [
|
_fields_ = [
|
||||||
('rd_key', ctypes.c_uint32 * (4*(AES_MAXNR + 1))),
|
('rd_key', ctypes.c_uint32 * (4*(AES_MAXNR + 1))),
|
||||||
('rounds', ctypes.c_uint),
|
('rounds', ctypes.c_uint),
|
||||||
]
|
]
|
||||||
|
|
||||||
class AES:
|
class AES:
|
||||||
|
"""
|
||||||
|
Class that servers as an interface to encrypt and decrypt
|
||||||
|
text through the AES IGE mode, using the system's libssl.
|
||||||
|
"""
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def decrypt_ige(cipher_text, key, iv):
|
def decrypt_ige(cipher_text, key, iv):
|
||||||
|
"""
|
||||||
|
Decrypts the given text in 16-bytes blocks by using the
|
||||||
|
given key and 32-bytes initialization vector.
|
||||||
|
"""
|
||||||
aeskey = AES_KEY()
|
aeskey = AES_KEY()
|
||||||
ckey = (ctypes.c_ubyte * len(key))(*key)
|
ckey = (ctypes.c_ubyte * len(key))(*key)
|
||||||
cklen = ctypes.c_int(len(key)*8)
|
cklen = ctypes.c_int(len(key)*8)
|
||||||
|
@ -65,6 +77,10 @@ else:
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def encrypt_ige(plain_text, key, iv):
|
def encrypt_ige(plain_text, key, iv):
|
||||||
|
"""
|
||||||
|
Encrypts the given text in 16-bytes blocks by using the
|
||||||
|
given key and 32-bytes initialization vector.
|
||||||
|
"""
|
||||||
# Add random padding iff it's not evenly divisible by 16 already
|
# Add random padding iff it's not evenly divisible by 16 already
|
||||||
if len(plain_text) % 16 != 0:
|
if len(plain_text) % 16 != 0:
|
||||||
padding_count = 16 - len(plain_text) % 16
|
padding_count = 16 - len(plain_text) % 16
|
||||||
|
|
|
@ -1,3 +1,6 @@
|
||||||
|
"""
|
||||||
|
This module holds several utilities regarding RSA and server fingerprints.
|
||||||
|
"""
|
||||||
import os
|
import os
|
||||||
import struct
|
import struct
|
||||||
from hashlib import sha1
|
from hashlib import sha1
|
||||||
|
@ -32,8 +35,11 @@ def get_byte_array(integer):
|
||||||
|
|
||||||
|
|
||||||
def _compute_fingerprint(key):
|
def _compute_fingerprint(key):
|
||||||
"""For a given Crypto.RSA key, computes its 8-bytes-long fingerprint
|
"""
|
||||||
in the same way that Telegram does.
|
Given a RSA key, computes its fingerprint like Telegram does.
|
||||||
|
|
||||||
|
:param key: the Crypto.RSA key.
|
||||||
|
:return: its 8-bytes-long fingerprint.
|
||||||
"""
|
"""
|
||||||
n = TLObject.serialize_bytes(get_byte_array(key.n))
|
n = TLObject.serialize_bytes(get_byte_array(key.n))
|
||||||
e = TLObject.serialize_bytes(get_byte_array(key.e))
|
e = TLObject.serialize_bytes(get_byte_array(key.e))
|
||||||
|
@ -49,8 +55,14 @@ def add_key(pub):
|
||||||
|
|
||||||
|
|
||||||
def encrypt(fingerprint, data):
|
def encrypt(fingerprint, data):
|
||||||
"""Given the fingerprint of a previously added RSA key, encrypt its data
|
"""
|
||||||
in the way Telegram requires us to do so (sha1(data) + data + padding)
|
Encrypts the given data known the fingerprint to be used
|
||||||
|
in the way Telegram requires us to do so (sha1(data) + data + padding)
|
||||||
|
|
||||||
|
:param fingerprint: the fingerprint of the RSA key.
|
||||||
|
:param data: the data to be encrypted.
|
||||||
|
:return:
|
||||||
|
the cipher text, or None if no key matching this fingerprint is found.
|
||||||
"""
|
"""
|
||||||
global _server_keys
|
global _server_keys
|
||||||
key = _server_keys.get(fingerprint, None)
|
key = _server_keys.get(fingerprint, None)
|
||||||
|
|
Loading…
Reference in New Issue
Block a user