mirror of
https://github.com/LonamiWebs/Telethon.git
synced 2025-01-24 00:04:14 +03:00
First attempt at TelegramClient. Added fixes and doc
This commit is contained in:
parent
c863537b7b
commit
39a23559f0
4
.gitignore
vendored
4
.gitignore
vendored
|
@ -3,6 +3,10 @@ tl/functions/
|
|||
tl/types/
|
||||
tl/all_tlobjects.py
|
||||
|
||||
# User session
|
||||
*.session
|
||||
api/settings
|
||||
|
||||
# Byte-compiled / optimized / DLL files
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
|
|
11
README.md
11
README.md
|
@ -11,9 +11,14 @@ This project requires the following Python modules, which can be installed by is
|
|||
Linux terminal:
|
||||
- `pyaes` ([GitHub](https://github.com/ricmoo/pyaes), [package index](https://pypi.python.org/pypi/pyaes))
|
||||
|
||||
### We need your help!
|
||||
As of now, the project is fully **untested** and with many pending things to do. If you know both Python and C#, please don't
|
||||
think it twice and help us (me)!
|
||||
Also, you need to obtain your both [API ID and Hash](my.telegram.org). Once you have them, head to `api/` and create a copy of
|
||||
the `settings_example` file, naming it `settings` (lowercase!). Then fill the file with the corresponding values (your `api_id`,
|
||||
`api_hash` and phone number in international format). Now it is when you're ready to go!
|
||||
|
||||
### Plans for the future
|
||||
If everything works well, this probably ends up being a Python package :)
|
||||
|
||||
But as of now, and until that happens, help is highly appreciated!
|
||||
|
||||
### Code generator limitations
|
||||
The current code generator is not complete, yet adding the missing features would only over-complicate an already hard-to-read code.
|
||||
|
|
0
api/__init__.py
Normal file
0
api/__init__.py
Normal file
4
api/settings_example
Normal file
4
api/settings_example
Normal file
|
@ -0,0 +1,4 @@
|
|||
api_id=12345
|
||||
api_hash=0123456789abcdef0123456789abcdef
|
||||
user_phone=34600000000
|
||||
session_name=anonymous
|
0
crypto/__init__.py
Normal file
0
crypto/__init__.py
Normal file
|
@ -1,9 +1,12 @@
|
|||
# This file is based on TLSharp
|
||||
# https://github.com/sochix/TLSharp/blob/master/TLSharp.Core/MTProto/Crypto/AES.cs
|
||||
import pyaes
|
||||
|
||||
|
||||
class AES:
|
||||
@staticmethod
|
||||
def decrypt_ige(cipher_text, key, iv):
|
||||
"""Decrypts the given text in 16-bytes blocks by using the given key and 32-bytes initialization vector"""
|
||||
iv1 = iv[:len(iv)//2]
|
||||
iv2 = iv[len(iv)//2:]
|
||||
|
||||
|
@ -31,6 +34,7 @@ class AES:
|
|||
|
||||
@staticmethod
|
||||
def encrypt_ige(plain_text, key, iv):
|
||||
"""Encrypts the given text in 16-bytes blocks by using the given key and 32-bytes initialization vector"""
|
||||
# TODO: Random padding
|
||||
padding = bytes(16 - len(plain_text) % 16)
|
||||
plain_text += padding
|
|
@ -20,6 +20,7 @@ class AuthKey:
|
|||
self.key_id = reader.read_long(signed=False)
|
||||
|
||||
def calc_new_nonce_hash(self, new_nonce, number):
|
||||
"""Calculates the new nonce hash based on the current class fields' values"""
|
||||
with BinaryWriter() as writer:
|
||||
writer.write(new_nonce)
|
||||
writer.write_byte(number)
|
|
@ -6,6 +6,7 @@ from random import randint
|
|||
class Factorizator:
|
||||
@staticmethod
|
||||
def find_small_multiplier_lopatin(what):
|
||||
"""Finds the small multiplier by using Lopatin's method"""
|
||||
g = 0
|
||||
for i in range(3):
|
||||
q = (randint(0, 127) & 15) + 17
|
||||
|
@ -41,6 +42,7 @@ class Factorizator:
|
|||
|
||||
@staticmethod
|
||||
def gcd(a, b):
|
||||
"""Calculates the greatest common divisor"""
|
||||
while a != 0 and b != 0:
|
||||
while b & 1 == 0:
|
||||
b >>= 1
|
||||
|
@ -57,5 +59,6 @@ class Factorizator:
|
|||
|
||||
@staticmethod
|
||||
def factorize(pq):
|
||||
"""Factorizes the given number and returns both the divisor and the number divided by the divisor"""
|
||||
divisor = Factorizator.find_small_multiplier_lopatin(pq)
|
||||
return divisor, pq // divisor
|
|
@ -1,5 +1,5 @@
|
|||
# This file is based on TLSharp
|
||||
# https://github.com/sochix/TLSharp/blob/master/TLSharp.Core/Auth/Authenticator.cs
|
||||
# https://github.com/sochix/TLSharp/blob/master/TLSharp.Core/MTProto/Crypto/RSA.cs
|
||||
from utils.binary_writer import BinaryWriter
|
||||
import utils.helpers as utils
|
||||
|
||||
|
@ -11,6 +11,7 @@ class RSAServerKey:
|
|||
self.e = e
|
||||
|
||||
def encrypt(self, data, offset=None, length=None):
|
||||
"""Encrypts the given data with the current key"""
|
||||
if offset is None:
|
||||
offset = 0
|
||||
if length is None:
|
||||
|
@ -37,8 +38,6 @@ class RSAServerKey:
|
|||
return padding + cipher_text
|
||||
|
||||
|
||||
|
||||
|
||||
class RSA:
|
||||
_server_keys = {
|
||||
'216be86c022bb4c3':
|
||||
|
@ -55,6 +54,7 @@ class RSA:
|
|||
|
||||
@staticmethod
|
||||
def encrypt(fingerprint, data, offset=None, length=None):
|
||||
"""Encrypts the given data given a fingerprint"""
|
||||
if fingerprint.lower() not in RSA._server_keys:
|
||||
return None
|
||||
|
30
main.py
30
main.py
|
@ -1,17 +1,21 @@
|
|||
import parser.tl_generator
|
||||
import tl_generator
|
||||
from tl.telegram_client import TelegramClient
|
||||
from utils.helpers import load_settings
|
||||
|
||||
from network.tcp_transport import TcpTransport
|
||||
from network.authenticator import do_authentication
|
||||
|
||||
if __name__ == '__main__':
|
||||
if not parser.tl_generator.tlobjects_exist():
|
||||
print('First run. Generating TLObjects...')
|
||||
parser.tl_generator.generate_tlobjects('scheme.tl')
|
||||
print('Done.')
|
||||
if not tl_generator.tlobjects_exist():
|
||||
print('Please run `python3 tl_generator.py` first!')
|
||||
|
||||
transport = TcpTransport('149.154.167.91', 443)
|
||||
auth_key, time_offset = do_authentication(transport)
|
||||
print(auth_key.aux_hash)
|
||||
print(auth_key.key)
|
||||
print(auth_key.key_id)
|
||||
print(time_offset)
|
||||
else:
|
||||
settings = load_settings()
|
||||
client = TelegramClient(session_user_id=settings.get('session_name', 'anonymous'),
|
||||
layer=54,
|
||||
api_id=settings['api_id'],
|
||||
api_hash=settings['api_hash'])
|
||||
|
||||
client.connect()
|
||||
if not client.is_user_authorized():
|
||||
phone_code_hash = client.send_code_request(settings['user_phone'])
|
||||
code = input('Enter the code you just received: ')
|
||||
client.make_auth(settings['user_phone'], phone_code_hash, code)
|
||||
|
|
|
@ -4,18 +4,21 @@
|
|||
# https://github.com/sochix/TLSharp/blob/master/TLSharp.Core/Auth/Step2_DHExchange.cs
|
||||
# https://github.com/sochix/TLSharp/blob/master/TLSharp.Core/Auth/Step3_CompleteDHExchange.cs
|
||||
|
||||
from network.mtproto_plain_sender import MtProtoPlainSender
|
||||
from utils.binary_writer import BinaryWriter
|
||||
from utils.binary_reader import BinaryReader
|
||||
from utils.factorizator import Factorizator
|
||||
from utils.auth_key import AuthKey
|
||||
import utils.helpers as utils
|
||||
import time
|
||||
from utils.rsa import RSA
|
||||
from utils.aes import AES
|
||||
|
||||
import utils.helpers as utils
|
||||
from crypto.aes import AES
|
||||
from crypto.auth_key import AuthKey
|
||||
from crypto.factorizator import Factorizator
|
||||
from crypto.rsa import RSA
|
||||
from network.mtproto_plain_sender import MtProtoPlainSender
|
||||
from utils.binary_reader import BinaryReader
|
||||
from utils.binary_writer import BinaryWriter
|
||||
|
||||
|
||||
def do_authentication(transport):
|
||||
"""Executes the authentication process with the Telegram servers.
|
||||
If no error is rose, returns both the authorization key and the time offset"""
|
||||
sender = MtProtoPlainSender(transport)
|
||||
|
||||
# Step 1 sending: PQ Request
|
||||
|
@ -201,4 +204,4 @@ def do_authentication(transport):
|
|||
|
||||
def get_fingerprint_text(fingerprint):
|
||||
"""Gets a fingerprint text in 01-23-45-67-89-AB-CD-EF format (no hyphens)"""
|
||||
return ''.join(hex(b)[2:].rjust(2, '0').upper() for b in fingerprint)
|
||||
return ''.join(hex(b)[2:].rjust(2, '0').upper() for b in fingerprint)
|
||||
|
|
|
@ -2,9 +2,9 @@
|
|||
# https://github.com/sochix/TLSharp/blob/master/TLSharp.Core/Network/MtProtoSender.cs
|
||||
import re
|
||||
import zlib
|
||||
import pyaes
|
||||
from time import sleep
|
||||
|
||||
from crypto.aes import AES
|
||||
from utils.binary_writer import BinaryWriter
|
||||
from utils.binary_reader import BinaryReader
|
||||
from tl.types.msgs_ack import MsgsAck
|
||||
|
@ -76,13 +76,12 @@ class MtProtoSender:
|
|||
|
||||
msg_key = helpers.calc_msg_key(writer.get_bytes())
|
||||
|
||||
key, iv = helpers.calc_key(self.session.auth_key.data, msg_key, True)
|
||||
aes = pyaes.AESModeOfOperationCFB(key, iv, 16)
|
||||
cipher_text = aes.encrypt(writer.get_bytes())
|
||||
key, iv = helpers.calc_key(self.session.auth_key.key, msg_key, True)
|
||||
cipher_text = AES.encrypt_ige(writer.get_bytes(), key, iv)
|
||||
|
||||
# And then finally send the packet
|
||||
with BinaryWriter() as writer:
|
||||
writer.write_long(self.session.auth_key.id, signed=False)
|
||||
writer.write_long(self.session.auth_key.key_id, signed=False)
|
||||
writer.write(msg_key)
|
||||
writer.write(cipher_text)
|
||||
|
||||
|
@ -96,15 +95,14 @@ class MtProtoSender:
|
|||
|
||||
with BinaryReader(body) as reader:
|
||||
if len(body) < 8:
|
||||
raise BufferError("Can't decode packet")
|
||||
raise BufferError("Can't decode packet ({})".format(body))
|
||||
|
||||
# TODO Check for both auth key ID and msg_key correctness
|
||||
remote_auth_key_id = reader.read_long()
|
||||
msg_key = reader.read(16)
|
||||
|
||||
key, iv = helpers.calc_key(self.session.auth_key.data, msg_key, False)
|
||||
aes = pyaes.AESModeOfOperationCFB(key, iv, 16)
|
||||
plain_text = aes.decrypt(reader.read(len(body) - reader.tell_position()))
|
||||
plain_text = AES.decrypt_ige(reader.read(len(body) - reader.tell_position()), key, iv)
|
||||
|
||||
with BinaryReader(plain_text) as plain_text_reader:
|
||||
remote_salt = plain_text_reader.read_long()
|
||||
|
@ -278,7 +276,7 @@ class MtProtoSender:
|
|||
|
||||
elif error_msg.startswith('PHONE_MIGRATE_'):
|
||||
dc_index = int(re.search(r'\d+', error_msg).group(0))
|
||||
raise ConnectionError('Your phone number registered to {} dc. Please update settings. '
|
||||
raise ConnectionError('Your phone number is registered to {} DC. Please update settings. '
|
||||
'See https://github.com/sochix/TLSharp#i-get-an-error-migrate_x '
|
||||
'for details.'.format(dc_index))
|
||||
else:
|
||||
|
|
|
@ -17,7 +17,6 @@ class Session:
|
|||
self.salt = 0 # Unsigned long
|
||||
self.time_offset = 0
|
||||
self.last_message_id = 0 # Long
|
||||
self.session_expires = 0
|
||||
self.user = None
|
||||
|
||||
def save(self):
|
||||
|
@ -26,13 +25,13 @@ class Session:
|
|||
pickle.dump(self, file)
|
||||
|
||||
@staticmethod
|
||||
def try_load_or_create_new(self, session_user_id):
|
||||
def try_load_or_create_new(session_user_id):
|
||||
"""Loads a saved session_user_id session, or creates a new one if none existed before"""
|
||||
filepath = '{}.session'.format(self.session_user_id)
|
||||
filepath = '{}.session'.format(session_user_id)
|
||||
|
||||
if file_exists(filepath):
|
||||
with open(filepath, 'rb') as file:
|
||||
return pickle.load(self)
|
||||
return pickle.load(file)
|
||||
else:
|
||||
return Session(session_user_id)
|
||||
|
||||
|
@ -40,7 +39,7 @@ class Session:
|
|||
"""Generates a new message ID based on the current time (in ms) since epoch"""
|
||||
# Refer to mtproto_plain_sender.py for the original method, this is a simple copy
|
||||
new_msg_id = int(self.time_offset + time.time() * 1000)
|
||||
if self._last_msg_id >= new_msg_id:
|
||||
if self.last_message_id >= new_msg_id:
|
||||
new_msg_id = self._last_msg_id + 4
|
||||
|
||||
self._last_msg_id = new_msg_id
|
||||
|
|
133
tl/telegram_client.py
Normal file
133
tl/telegram_client.py
Normal file
|
@ -0,0 +1,133 @@
|
|||
# This file is based on TLSharp
|
||||
# https://github.com/sochix/TLSharp/blob/master/TLSharp.Core/TelegramClient.cs
|
||||
from network.mtproto_sender import MtProtoSender
|
||||
from network.tcp_transport import TcpTransport
|
||||
import network.authenticator as authenticator
|
||||
from tl.session import Session
|
||||
import utils.helpers as utils
|
||||
import platform
|
||||
import re
|
||||
|
||||
from tl.functions.invoke_with_layer import InvokeWithLayer
|
||||
from tl.functions.init_connection import InitConnection
|
||||
from tl.functions.help.get_config import GetConfig
|
||||
from tl.functions.auth.check_phone import CheckPhone
|
||||
from tl.functions.auth.send_code import SendCode
|
||||
from tl.functions.auth.sign_in import SignIn
|
||||
from tl.functions.contacts.get_contacts import GetContacts
|
||||
from tl.types.input_peer_user import InputPeerUser
|
||||
from tl.functions.messages.send_message import SendMessage
|
||||
|
||||
|
||||
class TelegramClient:
|
||||
|
||||
def __init__(self, session_user_id, layer, api_id=None, api_hash=None):
|
||||
if api_id is None or api_hash is None:
|
||||
raise PermissionError('Your API ID or Hash are invalid. Make sure to obtain yours in http://my.telegram.org')
|
||||
|
||||
self.api_id = api_id
|
||||
self.api_hash = api_hash
|
||||
|
||||
self.layer = layer
|
||||
|
||||
self.session = Session.try_load_or_create_new(session_user_id)
|
||||
self.transport = TcpTransport(self.session.server_address, self.session.port)
|
||||
self.dc_options = None
|
||||
|
||||
# TODO Should this be async?
|
||||
def connect(self, reconnect=False):
|
||||
if self.session.auth_key is None or reconnect:
|
||||
self.session.auth_key, self.session.time_offset= authenticator.do_authentication(self.transport)
|
||||
|
||||
self.sender = MtProtoSender(self.transport, self.session)
|
||||
|
||||
if not reconnect:
|
||||
request = InvokeWithLayer(layer=self.layer,
|
||||
query=InitConnection(api_id=self.api_id,
|
||||
device_model=platform.node(),
|
||||
system_version=platform.system(),
|
||||
app_version='0.1',
|
||||
lang_code='en',
|
||||
query=GetConfig()))
|
||||
|
||||
self.sender.send(request)
|
||||
self.sender.receive(request)
|
||||
|
||||
# Result is a Config TLObject
|
||||
self.dc_options = request.result.dc_options
|
||||
|
||||
return True
|
||||
|
||||
def reconnect_to_dc(self, dc_id):
|
||||
if self.dc_options is None or not self.dc_options:
|
||||
raise ConnectionError("Can't reconnect. Stabilise an initial connection first.")
|
||||
|
||||
# dc is a DcOption TLObject
|
||||
dc = next(dc for dc in self.dc_options if dc.id == dc_id)
|
||||
|
||||
self.transport = TcpTransport(dc.ip_address, dc.port)
|
||||
self.session.server_address = dc.ip_address
|
||||
self.session.port = dc.port
|
||||
|
||||
self.connect(reconnect=True)
|
||||
|
||||
def is_user_authorized(self):
|
||||
return self.session.user is not None
|
||||
|
||||
def is_phone_registered(self, phone_number):
|
||||
assert self.sender is not None, 'Not connected!'
|
||||
|
||||
request = CheckPhone(phone_number)
|
||||
self.sender.send(request)
|
||||
self.sender.receive(request)
|
||||
|
||||
# Result is an Auth.CheckedPhone
|
||||
return request.result.phone_registered
|
||||
|
||||
def send_code_request(self, phone_number, destination='code'):
|
||||
if destination == 'code':
|
||||
destination = 5
|
||||
elif destination == 'sms':
|
||||
destination = 0
|
||||
else:
|
||||
raise ValueError('Destination must be either "code" or "sms"')
|
||||
|
||||
request = SendCode(phone_number, self.api_id, self.api_hash)
|
||||
completed = False
|
||||
while not completed:
|
||||
try:
|
||||
self.sender.send(request)
|
||||
self.sender.receive(request)
|
||||
completed = True
|
||||
except ConnectionError as error:
|
||||
if str(error).startswith('Your phone number is registered to'):
|
||||
dc = int(re.search(r'\d+', str(error)).group(0))
|
||||
self.reconnect_to_dc(dc)
|
||||
else:
|
||||
raise error
|
||||
|
||||
return request.result.phone_code_hash
|
||||
|
||||
def make_auth(self, phone_number, phone_code_hash, code):
|
||||
request = SignIn(phone_number, phone_code_hash, code)
|
||||
self.sender.send(request)
|
||||
self.sender.receive(request)
|
||||
|
||||
# Result is an Auth.Authorization TLObject
|
||||
self.session.user = request.result.user
|
||||
self.session.save()
|
||||
|
||||
return self.session.user
|
||||
|
||||
def import_contacts(self, phone_code_hash):
|
||||
request = GetContacts(phone_code_hash)
|
||||
self.sender.send(request)
|
||||
self.sender.receive(request)
|
||||
return request.result.contacts, request.result.users
|
||||
|
||||
def send_message(self, user, message):
|
||||
peer = InputPeerUser(user.id, user.access_hash)
|
||||
request = SendMessage(peer, message, utils.generate_random_long())
|
||||
|
||||
self.sender.send(request)
|
||||
self.sender.send(request)
|
|
@ -263,7 +263,7 @@ def write_onsend_code(builder, arg, args, name=None):
|
|||
|
||||
else:
|
||||
# Else it may be a custom type
|
||||
builder.writeln('{}.write(writer)'.format(name))
|
||||
builder.writeln('{}.on_send(writer)'.format(name))
|
||||
|
||||
# End vector and flag blocks if required (if we opened them before)
|
||||
if arg.is_vector:
|
||||
|
@ -347,3 +347,12 @@ def write_onresponse_code(builder, arg, args, name=None):
|
|||
|
||||
if arg.is_flag:
|
||||
builder.end_block()
|
||||
|
||||
if __name__ == '__main__':
|
||||
if tlobjects_exist():
|
||||
print('Detected previous TLObjects. Cleaning...')
|
||||
clean_tlobjects()
|
||||
|
||||
print('Generating TLObjects...')
|
||||
generate_tlobjects('scheme.tl')
|
||||
print('Done.')
|
19
unit_test.py
19
unit_test.py
|
@ -1,15 +1,16 @@
|
|||
import unittest
|
||||
import random
|
||||
import socket
|
||||
import threading
|
||||
import random
|
||||
import utils.helpers as utils
|
||||
import unittest
|
||||
|
||||
import utils.helpers as utils
|
||||
from crypto.aes import AES
|
||||
from crypto.factorizator import Factorizator
|
||||
from network.authenticator import do_authentication
|
||||
from network.tcp_client import TcpClient
|
||||
from network.tcp_transport import TcpTransport
|
||||
from utils.binary_reader import BinaryReader
|
||||
from utils.binary_writer import BinaryWriter
|
||||
from utils.factorizator import Factorizator
|
||||
from utils.aes import AES
|
||||
|
||||
|
||||
host = 'localhost'
|
||||
port = random.randint(50000, 60000) # Arbitrary non-privileged port
|
||||
|
@ -214,5 +215,11 @@ class UnitTest(unittest.TestCase):
|
|||
assert cipher_text == real, 'Decrypted text does not equal the real value (expected "{}", got "{}")'\
|
||||
.format(get_representation(real), get_representation(cipher_text))
|
||||
|
||||
@staticmethod
|
||||
def test_authenticator():
|
||||
transport = TcpTransport('149.154.167.91', 443)
|
||||
auth_key, time_offset = do_authentication(transport)
|
||||
transport.dispose()
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
from io import BytesIO, BufferedReader
|
||||
from tl.all_tlobjects import tlobjects
|
||||
from struct import unpack
|
||||
import inspect
|
||||
import os
|
||||
|
||||
|
||||
|
@ -85,14 +86,20 @@ class BinaryReader:
|
|||
|
||||
def tgread_object(self):
|
||||
"""Reads a Telegram object"""
|
||||
id = self.read_int()
|
||||
clazz = tlobjects.get(id, None)
|
||||
constructor_id = self.read_int()
|
||||
clazz = tlobjects.get(constructor_id, None)
|
||||
if clazz is None:
|
||||
raise ImportError('Could not find a matching ID for the TLObject that was supposed to be read. '
|
||||
'Found ID: {}'.format(hex(id)))
|
||||
'Found ID: {}'.format(hex(constructor_id)))
|
||||
|
||||
# Instantiate the class and return the result
|
||||
result = clazz()
|
||||
# Now we need to determine the number of parameters of the class, so we can
|
||||
# instantiate it with all of them set to None, and still, no need to write
|
||||
# the default =None in all the classes, thus forcing the user to provide a real value
|
||||
sig = inspect.signature(clazz.__init__)
|
||||
params = [None] * (len(sig.parameters) - 1) # Subtract 1 (self)
|
||||
result = clazz(*params) # https://docs.python.org/3/tutorial/controlflow.html#unpacking-argument-lists
|
||||
|
||||
# Finally, read the object and return the result
|
||||
result.on_response(self)
|
||||
return result
|
||||
|
||||
|
@ -100,7 +107,6 @@ class BinaryReader:
|
|||
|
||||
def close(self):
|
||||
self.reader.close()
|
||||
# TODO Do I need to close the underlying stream?
|
||||
|
||||
# region Position related
|
||||
|
||||
|
|
|
@ -65,18 +65,11 @@ class BinaryWriter:
|
|||
if padding != 0:
|
||||
padding = 4 - padding
|
||||
|
||||
# TODO ensure that _this_ is right (it appears to be)
|
||||
self.write(bytes([254]))
|
||||
self.write(bytes([len(data) % 256]))
|
||||
self.write(bytes([(len(data) >> 8) % 256]))
|
||||
self.write(bytes([(len(data) >> 16) % 256]))
|
||||
self.write(data)
|
||||
""" Original:
|
||||
binaryWriter.Write((byte)254);
|
||||
binaryWriter.Write((byte)(bytes.Length));
|
||||
binaryWriter.Write((byte)(bytes.Length >> 8));
|
||||
binaryWriter.Write((byte)(bytes.Length >> 16));
|
||||
"""
|
||||
|
||||
self.write(bytes(padding))
|
||||
|
||||
|
@ -84,10 +77,10 @@ class BinaryWriter:
|
|||
"""Write a string by using Telegram guidelines"""
|
||||
return self.tgwrite_bytes(string.encode('utf-8'))
|
||||
|
||||
def tgwrite_bool(self, bool):
|
||||
def tgwrite_bool(self, boolean):
|
||||
"""Write a boolean value by using Telegram guidelines"""
|
||||
# boolTrue boolFalse
|
||||
return self.write_int(0x997275b5 if bool else 0xbc799737, signed=False)
|
||||
return self.write_int(0x997275b5 if boolean else 0xbc799737, signed=False)
|
||||
|
||||
# endregion
|
||||
|
||||
|
@ -98,7 +91,6 @@ class BinaryWriter:
|
|||
def close(self):
|
||||
"""Close the current stream"""
|
||||
self.writer.close()
|
||||
# TODO Do I need to close the underlying stream?
|
||||
|
||||
def get_bytes(self, flush=True):
|
||||
"""Get the current bytes array content from the buffer, optionally flushing first"""
|
||||
|
|
|
@ -17,6 +17,7 @@ def generate_random_bytes(count):
|
|||
|
||||
|
||||
def get_byte_array(integer, signed):
|
||||
"""Gets the arbitrary-length byte array corresponding to the given integer"""
|
||||
bits = integer.bit_length()
|
||||
byte_length = (bits + bits_per_byte - 1) // bits_per_byte
|
||||
# For some strange reason, this has to be big!
|
||||
|
@ -49,6 +50,7 @@ def calc_msg_key_offset(data, offset, limit):
|
|||
|
||||
|
||||
def generate_key_data_from_nonces(server_nonce, new_nonce):
|
||||
"""Generates the key data corresponding to the given nonces"""
|
||||
hash1 = sha1(bytes(new_nonce + server_nonce))
|
||||
hash2 = sha1(bytes(server_nonce + new_nonce))
|
||||
hash3 = sha1(bytes(new_nonce + new_nonce))
|
||||
|
@ -66,6 +68,23 @@ def generate_key_data_from_nonces(server_nonce, new_nonce):
|
|||
|
||||
|
||||
def sha1(data):
|
||||
"""Calculates the SHA1 digest for the given data"""
|
||||
sha = hashlib.sha1()
|
||||
sha.update(data)
|
||||
return sha.digest()
|
||||
|
||||
|
||||
def load_settings():
|
||||
"""Loads the user settings located under `api/`"""
|
||||
settings = {}
|
||||
with open('api/settings', 'r', encoding='utf-8') as file:
|
||||
for line in file:
|
||||
value_pair = line.split('=')
|
||||
left = value_pair[0].strip()
|
||||
right = value_pair[1].strip()
|
||||
if right.isnumeric():
|
||||
settings[left] = int(right)
|
||||
else:
|
||||
settings[left] = right
|
||||
|
||||
return settings
|
||||
|
|
Loading…
Reference in New Issue
Block a user