Several fixes to authenticator, added more unit tests

Some fixes include, in more detail:
- Using little over big endianess in some parts
- Flagging all the constructor numbers as unsigned
- Fixed bugs with factorizer
- Implemented TLSharp's RSA
This commit is contained in:
Lonami 2016-09-03 10:54:58 +02:00
parent 12cb66ab2c
commit 75a648f438
28 changed files with 226 additions and 53 deletions

0
.gitignore vendored Normal file → Executable file
View File

0
LICENSE Normal file → Executable file
View File

3
README.md Normal file → Executable file
View File

@ -7,10 +7,9 @@ The files without the previously mentioned notice are no longer part of TLSharp
to make them entirely different. to make them entirely different.
### Requirements ### Requirements
This project requires the following Python modules, which can be installed by issuing `sudo -H pip install <module>` on a This project requires the following Python modules, which can be installed by issuing `sudo -H pip3 install <module>` on a
Linux terminal: Linux terminal:
- `pyaes` ([GitHub](https://github.com/ricmoo/pyaes), [package index](https://pypi.python.org/pypi/pyaes)) - `pyaes` ([GitHub](https://github.com/ricmoo/pyaes), [package index](https://pypi.python.org/pypi/pyaes))
- `rsa` ([GitHub](https://github.com/sybrenstuvel/python-rsa), [package index](https://pypi.python.org/pypi/rsa))
### We need your help! ### We need your help!
As of now, the project is fully **untested** and with many pending things to do. If you know both Python and C#, please don't As of now, the project is fully **untested** and with many pending things to do. If you know both Python and C#, please don't

9
main.py Normal file → Executable file
View File

@ -1,10 +1,15 @@
import parser.tl_generator import parser.tl_generator
from network.tcp_transport import TcpTransport
from network.authenticator import do_authentication
if __name__ == '__main__': if __name__ == '__main__':
if not parser.tl_generator.tlobjects_exist(): if not parser.tl_generator.tlobjects_exist():
print('First run. Generating TLObjects...') print('First run. Generating TLObjects...')
parser.tl_generator.generate_tlobjects('scheme.tl') parser.tl_generator.generate_tlobjects('scheme.tl')
print('Done.') print('Done.')
pass transport = TcpTransport('149.154.167.91', 443)
auth_key, time_offset = do_authentication(transport)
print(auth_key)
print(time_offset)

0
network/__init__.py Normal file → Executable file
View File

33
network/authenticator.py Normal file → Executable file
View File

@ -12,7 +12,7 @@ from utils.auth_key import AuthKey
import utils.helpers as utils import utils.helpers as utils
import time import time
import pyaes import pyaes
import rsa from utils.rsa import RSA
def do_authentication(transport): def do_authentication(transport):
@ -21,7 +21,7 @@ def do_authentication(transport):
# Step 1 sending: PQ Request # Step 1 sending: PQ Request
nonce = utils.generate_random_bytes(16) nonce = utils.generate_random_bytes(16)
with BinaryWriter() as writer: with BinaryWriter() as writer:
writer.write_int(0x60469778) # Constructor number writer.write_int(0x60469778, signed=False) # Constructor number
writer.write(nonce) writer.write(nonce)
sender.send(writer.get_bytes()) sender.send(writer.get_bytes())
@ -39,7 +39,7 @@ def do_authentication(transport):
server_nonce = reader.read(16) server_nonce = reader.read(16)
pq_bytes = reader.tgread_bytes() pq_bytes = reader.tgread_bytes()
pq = int.from_bytes(pq_bytes, byteorder='big') pq = int.from_bytes(pq_bytes, byteorder='little')
vector_id = reader.read_int() vector_id = reader.read_int()
if vector_id != 0x1cb5c415: if vector_id != 0x1cb5c415:
@ -54,7 +54,7 @@ def do_authentication(transport):
new_nonce = utils.generate_random_bytes(32) new_nonce = utils.generate_random_bytes(32)
p, q = Factorizator.factorize(pq) p, q = Factorizator.factorize(pq)
with BinaryWriter() as pq_inner_data_writer: with BinaryWriter() as pq_inner_data_writer:
pq_inner_data_writer.write_int(0x83c95aec) # PQ Inner Data pq_inner_data_writer.write_int(0x83c95aec, signed=False) # PQ Inner Data
pq_inner_data_writer.tgwrite_bytes(utils.get_byte_array(pq, signed=False)) pq_inner_data_writer.tgwrite_bytes(utils.get_byte_array(pq, signed=False))
pq_inner_data_writer.tgwrite_bytes(utils.get_byte_array(min(p, q), signed=False)) pq_inner_data_writer.tgwrite_bytes(utils.get_byte_array(min(p, q), signed=False))
pq_inner_data_writer.tgwrite_bytes(utils.get_byte_array(max(p, q), signed=False)) pq_inner_data_writer.tgwrite_bytes(utils.get_byte_array(max(p, q), signed=False))
@ -64,8 +64,7 @@ def do_authentication(transport):
cipher_text, target_fingerprint = None, None cipher_text, target_fingerprint = None, None
for fingerprint in fingerprints: for fingerprint in fingerprints:
cipher_text = rsa.encrypt(str(fingerprint, encoding='utf-8').replace('-', ''), cipher_text = RSA.encrypt(get_fingerprint_text(fingerprint), pq_inner_data_writer.get_bytes())
pq_inner_data_writer.get_bytes())
if cipher_text is not None: if cipher_text is not None:
target_fingerprint = fingerprint target_fingerprint = fingerprint
@ -73,15 +72,15 @@ def do_authentication(transport):
if cipher_text is None: if cipher_text is None:
raise AssertionError('Could not find a valid key for fingerprints: {}' raise AssertionError('Could not find a valid key for fingerprints: {}'
.format(', '.join([str(f, encoding='utf-8') for f in fingerprints]))) .format(', '.join([get_fingerprint_text(f) for f in fingerprints])))
with BinaryWriter() as req_dh_params_writer: with BinaryWriter() as req_dh_params_writer:
req_dh_params_writer.write_int(0xd712e4be) # Req DH Params req_dh_params_writer.write_int(0xd712e4be, signed=False) # Req DH Params
req_dh_params_writer.write(nonce) req_dh_params_writer.write(nonce)
req_dh_params_writer.write(server_nonce) req_dh_params_writer.write(server_nonce)
req_dh_params_writer.tgwrite_bytes(utils.get_byte_array(min(p, q), signed=False)) req_dh_params_writer.tgwrite_bytes(utils.get_byte_array(min(p, q), signed=False))
req_dh_params_writer.tgwrite_bytes(utils.get_byte_array(max(p, q), signed=False)) req_dh_params_writer.tgwrite_bytes(utils.get_byte_array(max(p, q), signed=False))
req_dh_params_writer.Write(target_fingerprint) req_dh_params_writer.write(target_fingerprint)
req_dh_params_writer.tgwrite_bytes(cipher_text) req_dh_params_writer.tgwrite_bytes(cipher_text)
req_dh_params_bytes = req_dh_params_writer.get_bytes() req_dh_params_bytes = req_dh_params_writer.get_bytes()
@ -89,6 +88,7 @@ def do_authentication(transport):
# Step 2 response: DH Exchange # Step 2 response: DH Exchange
encrypted_answer = None encrypted_answer = None
# TODO, there is no data to read? What's going on?
with BinaryReader(sender.receive()) as reader: with BinaryReader(sender.receive()) as reader:
response_code = reader.read_int(signed=False) response_code = reader.read_int(signed=False)
@ -129,19 +129,19 @@ def do_authentication(transport):
raise AssertionError('Invalid server nonce in encrypted answer') raise AssertionError('Invalid server nonce in encrypted answer')
g = dh_inner_data_reader.read_int() g = dh_inner_data_reader.read_int()
dh_prime = int.from_bytes(dh_inner_data_reader.tgread_bytes(), byteorder='big', signed=True) dh_prime = int.from_bytes(dh_inner_data_reader.tgread_bytes(), byteorder='little', signed=True)
ga = int.from_bytes(dh_inner_data_reader.tgread_bytes(), byteorder='big', signed=True) ga = int.from_bytes(dh_inner_data_reader.tgread_bytes(), byteorder='little', signed=True)
server_time = dh_inner_data_reader.read_int() server_time = dh_inner_data_reader.read_int()
time_offset = server_time - int(time.time() * 1000) # Multiply by 1000 to get milliseconds time_offset = server_time - int(time.time() * 1000) # Multiply by 1000 to get milliseconds
b = int.from_bytes(utils.generate_random_bytes(2048), byteorder='big') b = int.from_bytes(utils.generate_random_bytes(2048), byteorder='little')
gb = pow(g, b, dh_prime) gb = pow(g, b, dh_prime)
gab = pow(ga, b, dh_prime) gab = pow(ga, b, dh_prime)
# Prepare client DH Inner Data # Prepare client DH Inner Data
with BinaryWriter() as client_dh_inner_data_writer: with BinaryWriter() as client_dh_inner_data_writer:
client_dh_inner_data_writer.write_int(0x6643b654) # Client DH Inner Data client_dh_inner_data_writer.write_int(0x6643b654, signed=False) # Client DH Inner Data
client_dh_inner_data_writer.write(nonce) client_dh_inner_data_writer.write(nonce)
client_dh_inner_data_writer.write(server_nonce) client_dh_inner_data_writer.write(server_nonce)
client_dh_inner_data_writer.write_long(0) # TODO retry_id client_dh_inner_data_writer.write_long(0) # TODO retry_id
@ -157,7 +157,7 @@ def do_authentication(transport):
# Prepare Set client DH params # Prepare Set client DH params
with BinaryWriter() as set_client_dh_params_writer: with BinaryWriter() as set_client_dh_params_writer:
set_client_dh_params_writer.write_int(0xf5045f1f) set_client_dh_params_writer.write_int(0xf5045f1f, signed=False)
set_client_dh_params_writer.write(nonce) set_client_dh_params_writer.write(nonce)
set_client_dh_params_writer.write(server_nonce) set_client_dh_params_writer.write(server_nonce)
set_client_dh_params_writer.tgwrite_bytes(client_dh_inner_data_encrypted_bytes) set_client_dh_params_writer.tgwrite_bytes(client_dh_inner_data_encrypted_bytes)
@ -194,3 +194,8 @@ def do_authentication(transport):
else: else:
raise AssertionError('DH Gen unknown: {}'.format(hex(code))) raise AssertionError('DH Gen unknown: {}'.format(hex(code)))
def get_fingerprint_text(fingerprint):
"""Gets a fingerprint text in 01-23-45-67-89-AB-CD-EF format (no hyphens)"""
return ''.join(hex(b)[2:].rjust(2, '0').upper() for b in fingerprint)

2
network/mtproto_plain_sender.py Normal file → Executable file
View File

@ -17,7 +17,7 @@ class MtProtoPlainSender:
"""Sends a plain packet (auth_key_id = 0) containing the given message body (data)""" """Sends a plain packet (auth_key_id = 0) containing the given message body (data)"""
with BinaryWriter() as writer: with BinaryWriter() as writer:
writer.write_long(0) writer.write_long(0)
writer.write_int(self.get_new_msg_id()) writer.write_long(self.get_new_msg_id())
writer.write_int(len(data)) writer.write_int(len(data))
writer.write(data) writer.write(data)

0
network/mtproto_sender.py Normal file → Executable file
View File

2
network/tcp_client.py Normal file → Executable file
View File

@ -10,10 +10,12 @@ class TcpClient:
def connect(self, ip, port): def connect(self, ip, port):
"""Connects to the specified IP and port number""" """Connects to the specified IP and port number"""
self.socket.connect((ip, port)) self.socket.connect((ip, port))
self.connected = True
def close(self): def close(self):
"""Closes the connection""" """Closes the connection"""
self.socket.close() self.socket.close()
self.connected = False
def write(self, data): def write(self, data):
"""Writes (sends) the specified bytes to the connected peer""" """Writes (sends) the specified bytes to the connected peer"""

8
network/tcp_message.py Normal file → Executable file
View File

@ -34,7 +34,8 @@ class TcpMessage:
writer.write(self.body) writer.write(self.body)
writer.flush() # Flush so we can get the buffer in the CRC writer.flush() # Flush so we can get the buffer in the CRC
crc = crc32(writer.get_bytes()[0:8 + len(self.body)]) # Ensure it's unsigned (see http://stackoverflow.com/a/30092291/4759433)
crc = crc32(writer.get_bytes()[0:8 + len(self.body)]) & 0xFFFFFFFF
writer.write_int(crc, signed=False) writer.write_int(crc, signed=False)
return writer.get_bytes() return writer.get_bytes()
@ -49,7 +50,7 @@ class TcpMessage:
raise ValueError('Wrong size of input packet') raise ValueError('Wrong size of input packet')
with BinaryReader(body) as reader: with BinaryReader(body) as reader:
packet_len = int.from_bytes(reader.read(4), byteorder='big') packet_len = int.from_bytes(reader.read(4), byteorder='little')
if packet_len < 12: if packet_len < 12:
raise ValueError('Invalid packet length: {}'.format(packet_len)) raise ValueError('Invalid packet length: {}'.format(packet_len))
@ -57,7 +58,8 @@ class TcpMessage:
packet = reader.read(packet_len - 12) packet = reader.read(packet_len - 12)
checksum = reader.read_int() checksum = reader.read_int()
valid_checksum = crc32(body[:packet_len - 4]) # Ensure it's unsigned (see http://stackoverflow.com/a/30092291/4759433)
valid_checksum = crc32(body[:packet_len - 4]) & 0xFFFFFFFF
if checksum != valid_checksum: if checksum != valid_checksum:
raise ValueError('Invalid checksum, skip') raise ValueError('Invalid checksum, skip')

9
network/tcp_transport.py Normal file → Executable file
View File

@ -29,18 +29,19 @@ class TcpTransport:
# First read everything # First read everything
packet_length_bytes = self._tcp_client.read(4) packet_length_bytes = self._tcp_client.read(4)
packet_length = int.from_bytes(packet_length_bytes, byteorder='big') packet_length = int.from_bytes(packet_length_bytes, byteorder='little')
seq_bytes = self._tcp_client.read(4) seq_bytes = self._tcp_client.read(4)
seq = int.from_bytes(seq_bytes, byteorder='big') seq = int.from_bytes(seq_bytes, byteorder='little')
body = self._tcp_client.read(packet_length - 12) body = self._tcp_client.read(packet_length - 12)
checksum = int.from_bytes(self._tcp_client.read(4), byteorder='big') checksum = int.from_bytes(self._tcp_client.read(4), byteorder='little')
# Then perform the checks # Then perform the checks
rv = packet_length_bytes + seq_bytes + body rv = packet_length_bytes + seq_bytes + body
valid_checksum = crc32(rv) & 0xFFFFFFFF # Ensure it's unsigned (http://stackoverflow.com/a/30092291/4759433) # Ensure it's unsigned (http://stackoverflow.com/a/30092291/4759433)
valid_checksum = crc32(rv) & 0xFFFFFFFF
if checksum != valid_checksum: if checksum != valid_checksum:
raise ValueError('Invalid checksum, skip') raise ValueError('Invalid checksum, skip')

0
parser/__init__.py Normal file → Executable file
View File

0
parser/source_builder.py Normal file → Executable file
View File

4
parser/tl_generator.py Normal file → Executable file
View File

@ -113,7 +113,7 @@ def generate_tlobjects(scheme_file):
# Write the on_send(self, writer) function # Write the on_send(self, writer) function
builder.writeln('def on_send(self, writer):') builder.writeln('def on_send(self, writer):')
builder.writeln("writer.write_int({}) # {}'s constructor ID" builder.writeln("writer.write_int({}, signed=False) # {}'s constructor ID"
.format(hex(tlobject.id), tlobject.name)) .format(hex(tlobject.id), tlobject.name))
for arg in tlobject.args: for arg in tlobject.args:
@ -214,7 +214,7 @@ def write_onsend_code(builder, arg, args, name=None):
builder.writeln('if {} is not None:'.format(name)) builder.writeln('if {} is not None:'.format(name))
if arg.is_vector: if arg.is_vector:
builder.writeln("writer.write_int(0x1cb5c415) # Vector's constructor ID") builder.writeln("writer.write_int(0x1cb5c415, signed=False) # Vector's constructor ID")
builder.writeln('writer.write_int(len({}))'.format(name)) builder.writeln('writer.write_int(len({}))'.format(name))
builder.writeln('for {}_item in {}:'.format(arg.name, name)) builder.writeln('for {}_item in {}:'.format(arg.name, name))
# Temporary disable .is_vector, not to enter this if again # Temporary disable .is_vector, not to enter this if again

0
parser/tl_parser.py Normal file → Executable file
View File

0
scheme.tl Normal file → Executable file
View File

0
tl/__init__.py Normal file → Executable file
View File

0
tl/mtproto_request.py Normal file → Executable file
View File

0
tl/session.py Normal file → Executable file
View File

0
tl/tlobject.py Normal file → Executable file
View File

85
unit_test.py Normal file → Executable file
View File

@ -2,13 +2,13 @@ import unittest
import socket import socket
import threading import threading
import random import random
from time import sleep
import utils.helpers as utils import utils.helpers as utils
from network.tcp_client import TcpClient from network.tcp_client import TcpClient
from utils.binary_reader import BinaryReader from utils.binary_reader import BinaryReader
from utils.binary_writer import BinaryWriter from utils.binary_writer import BinaryWriter
from utils.factorizator import Factorizator
from utils.rsa import RSA
host = 'localhost' host = 'localhost'
@ -29,6 +29,14 @@ def run_server_echo_thread():
server.start() server.start()
def get_representation(bytez):
return '-'.join(hex(b)[2:].rjust(2, '0').upper() for b in bytez)
def get_bytes(representation):
return bytes([int(b, 16) for b in representation.split('-')])
class UnitTest(unittest.TestCase): class UnitTest(unittest.TestCase):
@staticmethod @staticmethod
def test_tcp_client(): def test_tcp_client():
@ -57,6 +65,7 @@ class UnitTest(unittest.TestCase):
@staticmethod @staticmethod
def test_binary_writer_reader(): def test_binary_writer_reader():
# Test that we can write and read properly
with BinaryWriter() as writer: with BinaryWriter() as writer:
writer.write_byte(1) writer.write_byte(1)
writer.write_int(5) writer.write_int(5)
@ -93,6 +102,15 @@ class UnitTest(unittest.TestCase):
value = reader.read_large_int(128, signed=False) value = reader.read_large_int(128, signed=False)
assert value == 2**127, 'Example large integer should be {} but is {}'.format(2**127, value) assert value == 2**127, 'Example large integer should be {} but is {}'.format(2**127, value)
# Test Telegram that types are written right
with BinaryWriter() as writer:
writer.write_int(0x60469778)
buffer = writer.get_bytes()
valid = b'\x78\x97\x46\x60' # Tested written bytes using TLSharp and C#'s MemoryStream
assert buffer == valid, "Written type should be {} but is {}".format(list(valid), list(buffer))
@staticmethod @staticmethod
def test_binary_tgwriter_tgreader(): def test_binary_tgwriter_tgreader():
string = 'Testing Telegram strings, this should work properly!' string = 'Testing Telegram strings, this should work properly!'
@ -117,6 +135,69 @@ class UnitTest(unittest.TestCase):
value = reader.tgread_bytes() value = reader.tgread_bytes()
assert value == large_data, 'Example bytes should be {} but is {}'.format(large_data, value) assert value == large_data, 'Example bytes should be {} but is {}'.format(large_data, value)
@staticmethod
def test_factorizator():
pq = 3118979781119966969
p, q = Factorizator.factorize(pq)
assert p == 1719614201, 'Factorized pair did not yield the correct result'
assert q == 1813767169, 'Factorized pair did not yield the correct result'
@staticmethod
def test_to_byte_array():
for value, real in zip(
[3118979781119966969, # PQ
1667024975687354561, # PQ
1148985737, 1450866553], # Min, Max
['2B-48-D7-95-FB-47-FE-F9', # PQ
'17-22-76-62-13-8C-88-C1', # PQ
'44-7C-21-89', '56-7A-77-79'] # Min, Max
):
current = get_representation(utils.get_byte_array(value, signed=True))
assert real == current, 'Invalid byte array representation (expected {}, got {})'.format(current, real)
@staticmethod
def test_sha1():
string = 'Example string'
data = get_representation(string.encode('utf-8'))
real = '45-78-61-6D-70-6C-65-20-73-74-72-69-6E-67'
assert data == real, 'Invalid string representation (should be {}, but is {})'.format(real, data)
hashsum = get_representation(utils.sha1(get_bytes(data)))
real = '0A-54-92-7C-8D-06-3A-29-99-04-8E-F8-6A-3F-C4-8E-D3-7D-6D-39'
assert hashsum == real, 'Invalid sha1 hashsum representation (should be {}, but is {})'.format(real, data)
@staticmethod
def test_rsa():
fingerprint = '216BE86C022BB4C3'
data = get_bytes('EC-5A-C9-83-08-29-86-64-72-35-B8-4B-7D-00-00-00-04-59-6B-F5-41-00-00-00-04-76-E1-1B-3D-00-00-00-CE-2A-EA-DE-D2-17-35-B8-E6-AB-3B-3A-00-0A-79-46-C6-09-3A-99-E9-C1-5B-B5-20-30-27-B7-D5-4F-2F-A3-1C-AF-F4-23-54-B2-5E-BD-00-AB-71-0A-3E-67-94-21-E3-B3-72-71-C0-29-50-00-19-8C-CD-6A-52-D4-CE-9E')
hashsum = utils.sha1(data)
real = get_bytes('6C-86-F7-6D-A2-F5-C2-A5-D0-4D-D5-45-8A-85-AE-62-8B-F7-84-A0')
assert hashsum == real, 'Invalid sha1 hashsum representation (should be {}, but is {})'\
.format(get_representation(real), get_representation(data))
with BinaryWriter() as writer:
writer.write(hashsum)
writer.write(data)
real = get_bytes('6C-86-F7-6D-A2-F5-C2-A5-D0-4D-D5-45-8A-85-AE-62-8B-F7-84-A0-EC-5A-C9-83-08-29-86-64-72-35-B8-4B-7D-00-00-00-04-59-6B-F5-41-00-00-00-04-76-E1-1B-3D-00-00-00-CE-2A-EA-DE-D2-17-35-B8-E6-AB-3B-3A-00-0A-79-46-C6-09-3A-99-E9-C1-5B-B5-20-30-27-B7-D5-4F-2F-A3-1C-AF-F4-23-54-B2-5E-BD-00-AB-71-0A-3E-67-94-21-E3-B3-72-71-C0-29-50-00-19-8C-CD-6A-52-D4-CE-9E')
assert writer.get_bytes() == real, 'Invalid written value'
# Since the random padding is random by nature, use the sample data we know the result for
data = get_bytes(
'6C-86-F7-6D-A2-F5-C2-A5-D0-4D-D5-45-8A-85-AE-62-8B-F7-84-A0-EC-5A-C9-83-08-29-86-64-72-35-B8-4B-7D-00-00-00-04-59-6B-F5-41-00-00-00-04-76-E1-1B-3D-00-00-00-CE-2A-EA-DE-D2-17-35-B8-E6-AB-3B-3A-00-0A-79-46-C6-09-3A-99-E9-C1-5B-B5-20-30-27-B7-D5-4F-2F-A3-1C-AF-F4-23-54-B2-5E-BD-00-AB-71-0A-3E-67-94-21-E3-B3-72-71-C0-29-50-00-19-8C-CD-6A-52-D4-CE-9E-10-F4-6E-C6-1F-CB-DC-8C-2A-7A-91-92-71-22-D6-08-AD-B4-6D-5F-D3-59-0F-F4-71-1A-57-FF-17-9E-AE-CD-D8-90-4B-DB-1A-1F-06-C1-22-8D-20-67-F8-F0-F2-D1-26-DF-E9-78-72-A7-DF-B6-E5-7A-55-04-73-DD-74-8B-CB-C3-B9-E1-D7-FA-EE-8E-AD-AB-3D-46-39-A8-FB-80-28-85-D8-38-B5-35-5B-30-B0-94-F6-A0-CA-02-4E-45-18-94-9B-35-36-11-FA-2C-F0-5B-CA-C6-6A-98-7D-3C-7E-D4-DB-ED-05-3C-D6-95-68-88-30-43-04-4E-C3-AB-5D-F7-2D-A5-0C-C6-49-17-8B-AC-48')
e = 65537
m = 24403446649145068056824081744112065346446136066297307473868293895086332508101251964919587745984311372853053253457835208829824428441874946556659953519213382748319518214765985662663680818277989736779506318868003755216402538945900388706898101286548187286716959100102939636333452457308619454821845196109544157601096359148241435922125602449263164512290854366930013825808102403072317738266383237191313714482187326643144603633877219028262697593882410403273959074350849923041765639673335775605842311578109726403165298875058941765362622936097839775380070572921007586266115476975819175319995527916042178582540628652481530373407
cipher_text = utils.get_byte_array(pow(int.from_bytes(data, byteorder='big'), e, m), signed=False)
real = get_bytes('13-8A-DC-F1-10-FF-59-29-2D-ED-4A-16-AA-D9-FA-15-A5-9A-A2-A6-33-D0-23-77-6F-E7-42-30-52-9E-4E-A9-CA-8F-CD-11-71-AB-C8-E2-97-2C-B9-A1-68-FA-4D-02-A9-56-30-84-5B-F6-5F-5D-1E-95-53-A4-A9-8F-1F-66-82-0C-20-8F-6D-EB-6F-B0-F5-D2-6C-45-89-14-1F-69-85-C8-6F-C7-41-A5-76-5F-F5-BA-9B-18-32-F7-02-C8-29-A7-70-BE-8E-FD-9E-86-48-6D-00-1E-AF-77-3F-7C-A4-1E-CD-03-21-18-4A-4D-57-FB-D9-6F-B0-4A-AD-24-A4-6F-01-07-CB-56-AC-37-22-9F-50-1F-EA-B9-17-51-EB-4B-A9-30-14-5A-A8-A9-5F-9D-9D-A5-AE-46-86-0D-0B-07-2D-84-C6-3B-DD-AD-4B-EA-89-07-CF-6B-DD-D4-68-38-F9-A9-62-A7-A3-3A-CB-79-F3-42-1B-28-E4-25-90-9B-B2-ED-EE-BC-65-8B-10-21-38-27-8B-66-98-51-A2-30-4B-F0-EA-BD-5D-E1-7D-D0-55-6E-A5-D1-FB-12-01-C2-44-D7-1F-B5-28-37-3B-08-8D-3B-79-59-D6-15-76-A4-4B-E6-3C-B3-16-58-88-9F-F9-77-21-C1-99-4E')
assert cipher_text == real, 'Invalid ciphered text (should be {}, but is {})'\
.format(get_representation(real), get_representation(cipher_text))
if __name__ == '__main__': if __name__ == '__main__':

0
utils/__init__.py Normal file → Executable file
View File

0
utils/auth_key.py Normal file → Executable file
View File

20
utils/binary_reader.py Normal file → Executable file
View File

@ -21,34 +21,38 @@ class BinaryReader:
# region Reading # region Reading
# "All numbers are written as little endian." |> Source: https://core.telegram.org/mtproto
def read_byte(self): def read_byte(self):
"""Reads a single byte value""" """Reads a single byte value"""
return self.reader.read(1)[0] return self.read(1)[0]
def read_int(self, signed=True): def read_int(self, signed=True):
"""Reads an integer (4 bytes) value""" """Reads an integer (4 bytes) value"""
return int.from_bytes(self.reader.read(4), byteorder='big', signed=signed) return int.from_bytes(self.read(4), byteorder='little', signed=signed)
def read_long(self, signed=True): def read_long(self, signed=True):
"""Reads a long integer (8 bytes) value""" """Reads a long integer (8 bytes) value"""
return int.from_bytes(self.reader.read(8), byteorder='big', signed=signed) return int.from_bytes(self.read(8), byteorder='little', signed=signed)
# Network is always big-endian, this is, '>'
def read_float(self): def read_float(self):
"""Reads a real floating point (4 bytes) value""" """Reads a real floating point (4 bytes) value"""
return unpack('>f', self.reader.read(4))[0] return unpack('<f', self.read(4))[0]
def read_double(self): def read_double(self):
"""Reads a real floating point (8 bytes) value""" """Reads a real floating point (8 bytes) value"""
return unpack('>d', self.reader.read(8))[0] return unpack('<d', self.read(8))[0]
def read_large_int(self, bits, signed=True): def read_large_int(self, bits, signed=True):
"""Reads a n-bits long integer value""" """Reads a n-bits long integer value"""
return int.from_bytes(self.reader.read(bits // 8), byteorder='big', signed=signed) return int.from_bytes(self.read(bits // 8), byteorder='little', signed=signed)
def read(self, length): def read(self, length):
"""Read the given amount of bytes""" """Read the given amount of bytes"""
return self.reader.read(length) result = self.reader.read(length)
if len(result) != length:
raise BufferError('Trying to read outside the data bounds (no more data left to read)')
return result
def get_bytes(self): def get_bytes(self):
"""Gets the byte array representing the current buffer as a whole""" """Gets the byte array representing the current buffer as a whole"""

16
utils/binary_writer.py Normal file → Executable file
View File

@ -17,34 +17,30 @@ class BinaryWriter:
# region Writing # region Writing
# "All numbers are written as little endian." |> Source: https://core.telegram.org/mtproto
def write_byte(self, value): def write_byte(self, value):
"""Writes a single byte value""" """Writes a single byte value"""
self.writer.write(pack('B', value)) self.writer.write(pack('B', value))
def write_int(self, value, signed=True): def write_int(self, value, signed=True):
"""Writes an integer value (4 bytes), which can or cannot be signed""" """Writes an integer value (4 bytes), which can or cannot be signed"""
if not signed: self.writer.write(int.to_bytes(value, length=4, byteorder='little', signed=signed))
value &= 0xFFFFFFFF # Ensure it's unsigned (see http://stackoverflow.com/a/30092291/4759433)
self.writer.write(int.to_bytes(value, length=4, byteorder='big', signed=signed))
def write_long(self, value, signed=True): def write_long(self, value, signed=True):
"""Writes a long integer value (8 bytes), which can or cannot be signed""" """Writes a long integer value (8 bytes), which can or cannot be signed"""
if not signed: self.writer.write(int.to_bytes(value, length=8, byteorder='little', signed=signed))
value &= 0xFFFFFFFFFFFFFFFF
self.writer.write(int.to_bytes(value, length=8, byteorder='big', signed=signed))
# Network is always big-endian, this is, '>' when packing
def write_float(self, value): def write_float(self, value):
"""Writes a floating point value (4 bytes)""" """Writes a floating point value (4 bytes)"""
self.writer.write(pack('>f', value)) self.writer.write(pack('<f', value))
def write_double(self, value): def write_double(self, value):
"""Writes a floating point value (8 bytes)""" """Writes a floating point value (8 bytes)"""
self.writer.write(pack('>d', value)) self.writer.write(pack('<d', value))
def write_large_int(self, value, bits, signed=True): def write_large_int(self, value, bits, signed=True):
"""Writes a n-bits long integer value""" """Writes a n-bits long integer value"""
self.writer.write(int.to_bytes(value, length=bits // 8, byteorder='big', signed=signed)) self.writer.write(int.to_bytes(value, length=bits // 8, byteorder='little', signed=signed))
def write(self, data): def write(self, data):
"""Writes the given bytes array""" """Writes the given bytes array"""

21
utils/factorizator.py Normal file → Executable file
View File

@ -1,7 +1,6 @@
# This file is based on TLSharp # This file is based on TLSharp
# https://github.com/sochix/TLSharp/blob/master/TLSharp.Core/MTProto/Crypto/Factorizator.cs # https://github.com/sochix/TLSharp/blob/master/TLSharp.Core/MTProto/Crypto/Factorizator.cs
from random import randint from random import randint
from math import gcd
class Factorizator: class Factorizator:
@ -10,7 +9,7 @@ class Factorizator:
g = 0 g = 0
for i in range(3): for i in range(3):
q = (randint(0, 127) & 15) + 17 q = (randint(0, 127) & 15) + 17
x = randint(1000000000) + 1 x = randint(0, 1000000000) + 1
y = x y = x
lim = 1 << (i + 18) lim = 1 << (i + 18)
for j in range(1, lim): for j in range(1, lim):
@ -27,7 +26,7 @@ class Factorizator:
x = c x = c
z = y - x if x < y else x - y z = y - x if x < y else x - y
g = gcd(z, what) g = Factorizator.gcd(z, what)
if g != 1: if g != 1:
break break
@ -40,6 +39,22 @@ class Factorizator:
p = what // g p = what // g
return min(p, g) return min(p, g)
@staticmethod
def gcd(a, b):
while a != 0 and b != 0:
while b & 1 == 0:
b >>= 1
while a & 1 == 0:
a >>= 1
if a > b:
a -= b
else:
b -= a
return a if b == 0 else b
@staticmethod @staticmethod
def factorize(pq): def factorize(pq):
divisor = Factorizator.find_small_multiplier_lopatin(pq) divisor = Factorizator.find_small_multiplier_lopatin(pq)

5
utils/helpers.py Normal file → Executable file
View File

@ -8,7 +8,7 @@ bits_per_byte = 8
def generate_random_long(signed=True): def generate_random_long(signed=True):
"""Generates a random long integer (8 bytes), which is optionally signed""" """Generates a random long integer (8 bytes), which is optionally signed"""
return int.from_bytes(os.urandom(8), signed=signed, byteorder='big') return int.from_bytes(os.urandom(8), signed=signed, byteorder='little')
def generate_random_bytes(count): def generate_random_bytes(count):
@ -19,6 +19,7 @@ def generate_random_bytes(count):
def get_byte_array(integer, signed): def get_byte_array(integer, signed):
bits = integer.bit_length() bits = integer.bit_length()
byte_length = (bits + bits_per_byte - 1) // bits_per_byte byte_length = (bits + bits_per_byte - 1) // bits_per_byte
# For some strange reason, this has to be big!
return int.to_bytes(integer, length=byte_length, byteorder='big', signed=signed) return int.to_bytes(integer, length=byte_length, byteorder='big', signed=signed)
@ -65,6 +66,6 @@ def generate_key_data_from_nonces(server_nonce, new_nonce):
def sha1(data): def sha1(data):
sha = hashlib.sha1 sha = hashlib.sha1()
sha.update(data) sha.update(data)
return sha.digest() return sha.digest()

62
utils/rsa.py Executable file
View File

@ -0,0 +1,62 @@
# This file is based on TLSharp
# https://github.com/sochix/TLSharp/blob/master/TLSharp.Core/Auth/Authenticator.cs
from utils.binary_writer import BinaryWriter
import utils.helpers as utils
class RSAServerKey:
def __init__(self, fingerprint, m, e):
self.fingerprint = fingerprint
self.m = m
self.e = e
def encrypt(self, data, offset=None, length=None):
if offset is None:
offset = 0
if length is None:
length = len(data)
with BinaryWriter() as writer:
# Write SHA
writer.write(utils.sha1(data[offset:offset+length]))
# Write data
writer.write(data[offset:offset+length])
# Add padding if required
if length < 235:
writer.write(utils.generate_random_bytes(235 - length))
cipher_text = utils.get_byte_array(
pow(int.from_bytes(writer.get_bytes(), byteorder='big'), self.e, self.m),
signed=False)
if len(cipher_text) == 256:
return cipher_text
else:
padding = bytes(256 - len(cipher_text))
return padding + cipher_text
class RSA:
_server_keys = {
'216be86c022bb4c3':
RSAServerKey('216be86c022bb4c3', int('C150023E2F70DB7985DED064759CFECF0AF328E69A41DAF4D6F01B538135A6F9'
'1F8F8B2A0EC9BA9720CE352EFCF6C5680FFC424BD634864902DE0B4BD6D49F4E'
'580230E3AE97D95C8B19442B3C0A10D8F5633FECEDD6926A7F6DAB0DDB7D457F'
'9EA81B8465FCD6FFFEED114011DF91C059CAEDAF97625F6C96ECC74725556934'
'EF781D866B34F011FCE4D835A090196E9A5F0E4449AF7EB697DDB9076494CA5F'
'81104A305B6DD27665722C46B60E5DF680FB16B210607EF217652E60236C255F'
'6A28315F4083A96791D7214BF64C1DF4FD0DB1944FB26A2A57031B32EEE64AD1'
'5A8BA68885CDE74A5BFC920F6ABF59BA5C75506373E7130F9042DA922179251F',
16), int('010001', 16))
}
@staticmethod
def encrypt(fingerprint, data, offset=None, length=None):
if fingerprint.lower() not in RSA._server_keys:
return None
key = RSA._server_keys[fingerprint.lower()]
return key.encrypt(data, offset, length)