From 922f17956bed00580d1934b084c8bd6f110f1d0b Mon Sep 17 00:00:00 2001 From: Lonami Date: Fri, 16 Sep 2016 14:04:46 +0200 Subject: [PATCH] Pythonized some parts --- crypto/aes.py | 22 ++++++++++++---------- crypto/rsa.py | 3 ++- network/authenticator.py | 7 ++++--- network/tcp_client.py | 1 - unittests/utils_tests.py | 11 +++++------ utils/helpers.py | 5 ----- 6 files changed, 23 insertions(+), 26 deletions(-) diff --git a/crypto/aes.py b/crypto/aes.py index dc0eda39..4f9054ad 100644 --- a/crypto/aes.py +++ b/crypto/aes.py @@ -1,3 +1,4 @@ +import os import pyaes @@ -10,7 +11,7 @@ class AES: aes = pyaes.AES(key) - plain_text = [0] * len(cipher_text) + plain_text = [] blocks_count = len(cipher_text) // 16 cipher_text_block = [0] * 16 @@ -24,27 +25,28 @@ class AES: plain_text_block[i] ^= iv1[i] iv1 = cipher_text[block_index * 16:block_index * 16 + 16] - iv2 = plain_text_block[0:16] + iv2 = plain_text_block[:] - plain_text[block_index * 16:block_index * 16 + 16] = plain_text_block[:16] + plain_text.extend(plain_text_block[:]) return bytes(plain_text) @staticmethod def encrypt_ige(plain_text, key, iv): """Encrypts the given text in 16-bytes blocks by using the given key and 32-bytes initialization vector""" - # TODO: Random padding? - if len(plain_text) % 16 != 0: # Add padding if and only if it's not evenly divisible by 16 already - padding = bytes(16 - len(plain_text) % 16) - plain_text += padding + + # Add random padding if and only if it's not evenly divisible by 16 already + if len(plain_text) % 16 != 0: + padding_count = 16 - len(plain_text) % 16 + plain_text += os.urandom(padding_count) iv1 = iv[:len(iv)//2] iv2 = iv[len(iv)//2:] aes = pyaes.AES(key) + cipher_text = [] blocks_count = len(plain_text) // 16 - cipher_text = [0] * len(plain_text) for block_index in range(blocks_count): plain_text_block = list(plain_text[block_index * 16:block_index * 16 + 16]) @@ -56,9 +58,9 @@ class AES: for i in range(16): cipher_text_block[i] ^= iv2[i] - iv1 = cipher_text_block[0:16] + iv1 = cipher_text_block[:] iv2 = plain_text[block_index * 16:block_index * 16 + 16] - cipher_text[block_index * 16:block_index * 16 + 16] = cipher_text_block[:16] + cipher_text.extend(cipher_text_block[:]) return bytes(cipher_text) diff --git a/crypto/rsa.py b/crypto/rsa.py index 44f858c0..5faa764a 100755 --- a/crypto/rsa.py +++ b/crypto/rsa.py @@ -1,5 +1,6 @@ import utils from utils import BinaryWriter +import os class RSAServerKey: @@ -22,7 +23,7 @@ class RSAServerKey: writer.write(data[offset:offset+length]) # Add padding if required if length < 235: - writer.write(utils.generate_random_bytes(235 - length)) + writer.write(os.urandom(235 - length)) result = int.from_bytes(writer.get_bytes(), byteorder='big') result = pow(result, self.e, self.m) diff --git a/network/authenticator.py b/network/authenticator.py index ff7a6eba..fdd0be65 100755 --- a/network/authenticator.py +++ b/network/authenticator.py @@ -1,3 +1,4 @@ +import os import time import utils from utils import BinaryWriter, BinaryReader @@ -11,7 +12,7 @@ def do_authentication(transport): sender = MtProtoPlainSender(transport) # Step 1 sending: PQ Request - nonce = utils.generate_random_bytes(16) + nonce = os.urandom(16) with BinaryWriter() as writer: writer.write_int(0x60469778, signed=False) # Constructor number writer.write(nonce) @@ -43,7 +44,7 @@ def do_authentication(transport): fingerprints.append(reader.read(8)) # Step 2 sending: DH Exchange - new_nonce = utils.generate_random_bytes(32) + new_nonce = os.urandom(32) p, q = Factorizator.factorize(pq) with BinaryWriter() as pq_inner_data_writer: pq_inner_data_writer.write_int(0x83c95aec, signed=False) # PQ Inner Data @@ -125,7 +126,7 @@ def do_authentication(transport): server_time = dh_inner_data_reader.read_int() time_offset = server_time - int(time.time()) - b = get_int(utils.generate_random_bytes(2048), signed=False) + b = get_int(os.urandom(2048), signed=False) gb = pow(g, b, dh_prime) gab = pow(ga, b, dh_prime) diff --git a/network/tcp_client.py b/network/tcp_client.py index 14b3043e..d5d8d61f 100755 --- a/network/tcp_client.py +++ b/network/tcp_client.py @@ -76,7 +76,6 @@ class TcpClient: # If everything went fine, return the read bytes return writer.get_bytes() - def cancel_read(self): """Cancels the read operation IF it hasn't yet started, raising a ReadCancelledError""" diff --git a/unittests/utils_tests.py b/unittests/utils_tests.py index edd0f668..3bb0c178 100644 --- a/unittests/utils_tests.py +++ b/unittests/utils_tests.py @@ -1,6 +1,5 @@ +import os import unittest - -import utils from utils import BinaryReader, BinaryWriter @@ -56,11 +55,11 @@ class UtilsTests(unittest.TestCase): @staticmethod def test_binary_tgwriter_tgreader(): - small_data = utils.generate_random_bytes(33) - small_data_padded = utils.generate_random_bytes(19) # +1 byte for length = 20 (evenly divisible by 4) + small_data = os.urandom(33) + small_data_padded = os.urandom(19) # +1 byte for length = 20 (evenly divisible by 4) - large_data = utils.generate_random_bytes(999) - large_data_padded = utils.generate_random_bytes(1024) + large_data = os.urandom(999) + large_data_padded = os.urandom(1024) data = (small_data, small_data_padded, large_data, large_data_padded) string = 'Testing Telegram strings, this should work properly!' diff --git a/utils/helpers.py b/utils/helpers.py index 1b49519d..97ee8c0d 100755 --- a/utils/helpers.py +++ b/utils/helpers.py @@ -11,11 +11,6 @@ def generate_random_long(signed=True): return int.from_bytes(os.urandom(8), signed=signed, byteorder='little') -def generate_random_bytes(count): - """Generates a random bytes array""" - return os.urandom(count) - - def load_settings(path='api/settings'): """Loads the user settings located under `api/`""" settings = {}