Totally refactored source files location

Now it *should* be easier to turn Telethon
into a pip package
This commit is contained in:
Lonami 2016-09-17 20:42:34 +02:00
parent 27ec7292d8
commit 51a531225f
42 changed files with 518 additions and 531 deletions

6
.gitignore vendored
View File

@ -1,7 +1,7 @@
.idea .idea
tl/functions/ telethon/tl/functions/
tl/types/ telethon/tl/types/
tl/all_tlobjects.py telethon/tl/all_tlobjects.py
# User session # User session
*.session *.session

View File

@ -48,9 +48,9 @@ head to `api/` directory and create a copy of the `settings_example` file, namin
Then fill the file with the corresponding values (your `api_id`, `api_hash` and phone number in international format). Then fill the file with the corresponding values (your `api_id`, `api_hash` and phone number in international format).
## Running Telethon ## Running Telethon
First of all, you need to run the `tl_generator.py` by issuing `python3 tl_generator.py`. This will generate all the First of all, you need to run the `tl_generator.py` (located under `telethon-generator/`) by issuing
TLObjects from the given `scheme.tl` file. When it's done, you can run `python3 interactive_telegram_client.py` to `python3 tl_generator.py`. This will generate all the TLObjects from the given `scheme.tl` file.
start the interactive example. When it's done, you can run `python3 try_telethon.py` to start the interactive example.
## Advanced uses ## Advanced uses
### Using more than just `TelegramClient` ### Using more than just `TelegramClient`

View File

@ -2,8 +2,7 @@ import unittest
if __name__ == '__main__': if __name__ == '__main__':
from telethon_tests import CryptoTests, ParserTests, TLTests, UtilsTests, NetworkTests
from unittests import CryptoTests, ParserTests, TLTests, UtilsTests, NetworkTests
test_classes = [CryptoTests, ParserTests, TLTests, UtilsTests] test_classes = [CryptoTests, ParserTests, TLTests, UtilsTests]
network = input('Run network tests (y/n)?: ').lower() == 'y' network = input('Run network tests (y/n)?: ').lower() == 'y'

3
telethon/__init__.py Normal file
View File

@ -0,0 +1,3 @@
from .errors import *
from .telegram_client import TelegramClient
from .interactive_telegram_client import InteractiveTelegramClient

View File

@ -1,5 +1,5 @@
from utils import BinaryWriter, BinaryReader from telethon.utils import BinaryWriter, BinaryReader
import utils import telethon.helpers as utils
class AuthKey: class AuthKey:

View File

@ -1,5 +1,5 @@
import utils from telethon.utils import BinaryWriter
from utils import BinaryWriter import telethon.helpers as utils
import os import os

View File

@ -7,12 +7,6 @@ class ReadCancelledError(Exception):
super().__init__(self, 'You must run `python3 tl_generator.py` first. #ReadTheDocs!') super().__init__(self, 'You must run `python3 tl_generator.py` first. #ReadTheDocs!')
class TLGeneratorNotRan(Exception):
"""Occurs when you should've ran `tl_generator.py`, but you haven't"""
def __init__(self):
super().__init__(self, 'You must run `python3 tl_generator.py` first. #ReadTheDocs!')
class InvalidParameterError(Exception): class InvalidParameterError(Exception):
"""Occurs when an invalid parameter is given, for example, """Occurs when an invalid parameter is given, for example,
when either A or B are required but none is given""" when either A or B are required but none is given"""

View File

@ -1,6 +1,4 @@
import os import os
import shutil
from utils import BinaryWriter
import hashlib import hashlib
# region Multiple utilities # region Multiple utilities
@ -11,22 +9,6 @@ def generate_random_long(signed=True):
return int.from_bytes(os.urandom(8), signed=signed, byteorder='little') return int.from_bytes(os.urandom(8), signed=signed, byteorder='little')
def load_settings(path='api/settings'):
"""Loads the user settings located under `api/`"""
settings = {}
with open(path, 'r', encoding='utf-8') as file:
for line in file:
value_pair = line.split('=')
left = value_pair[0].strip()
right = value_pair[1].strip()
if right.isnumeric():
settings[left] = int(right)
else:
settings[left] = right
return settings
def ensure_parent_dir_exists(file_path): def ensure_parent_dir_exists(file_path):
"""Ensures that the parent directory exists""" """Ensures that the parent directory exists"""
parent = os.path.dirname(file_path) parent = os.path.dirname(file_path)
@ -65,16 +47,9 @@ def generate_key_data_from_nonces(server_nonce, new_nonce):
hash2 = sha1(bytes(server_nonce + new_nonce)) hash2 = sha1(bytes(server_nonce + new_nonce))
hash3 = sha1(bytes(new_nonce + new_nonce)) hash3 = sha1(bytes(new_nonce + new_nonce))
with BinaryWriter() as key_buffer: key = hash1 + hash2[:12]
with BinaryWriter() as iv_buffer: iv = hash2[12:20] + hash3 + new_nonce[:4]
key_buffer.write(hash1) return key, iv
key_buffer.write(hash2[:12])
iv_buffer.write(hash2[12:20])
iv_buffer.write(hash3)
iv_buffer.write(new_nonce[:4])
return key_buffer.get_bytes(), iv_buffer.get_bytes()
def sha1(data): def sha1(data):

View File

@ -1,18 +1,7 @@
import tl_generator from telethon.tl.types import UpdateShortChatMessage
from tl.types import UpdateShortChatMessage from telethon.tl.types import UpdateShortMessage
from tl.types import UpdateShortMessage from telethon import TelegramClient
if not tl_generator.tlobjects_exist():
import errors
raise errors.TLGeneratorNotRan()
else:
del tl_generator
from telegram_client import TelegramClient
from utils.helpers import load_settings
import shutil import shutil
import traceback
# Get the (current) number of lines in the terminal # Get the (current) number of lines in the terminal
cols, rows = shutil.get_terminal_size() cols, rows = shutil.get_terminal_size()
@ -226,27 +215,3 @@ class InteractiveTelegramClient(TelegramClient):
elif type(update_object) is UpdateShortChatMessage: elif type(update_object) is UpdateShortChatMessage:
print('[Chat #{} sent {}]'.format(update_object.chat_id, update_object.message)) print('[Chat #{} sent {}]'.format(update_object.chat_id, update_object.message))
if __name__ == '__main__':
# Load the settings and initialize the client
settings = load_settings()
client = InteractiveTelegramClient(
session_user_id=settings.get('session_name', 'anonymous'),
user_phone=str(settings['user_phone']),
layer=55,
api_id=settings['api_id'],
api_hash=settings['api_hash'])
print('Initialization done!')
try:
client.run()
except Exception as e:
print('Unexpected error ({}): {} at\n{}'.format(type(e), e, traceback.format_exc()))
finally:
print_title('Exit')
print('Thanks for trying the interactive example! Exiting...')
client.disconnect()

View File

@ -1,9 +1,9 @@
import os import os
import time import time
import utils import telethon.helpers as utils
from utils import BinaryWriter, BinaryReader from telethon.utils import BinaryWriter, BinaryReader
from crypto import AES, AuthKey, Factorizator, RSA from telethon.crypto import AES, AuthKey, Factorizator, RSA
from network import MtProtoPlainSender from telethon.network import MtProtoPlainSender
def do_authentication(transport): def do_authentication(transport):

View File

@ -1,6 +1,6 @@
import time import time
import random import random
from utils import BinaryWriter, BinaryReader from telethon.utils import BinaryWriter, BinaryReader
class MtProtoPlainSender: class MtProtoPlainSender:

View File

@ -1,13 +1,13 @@
import gzip import gzip
from errors import * from telethon.errors import *
from time import sleep from time import sleep
from threading import Thread, Lock from threading import Thread, Lock
import utils import telethon.helpers as utils
from crypto import AES from telethon.crypto import AES
from utils import BinaryWriter, BinaryReader from telethon.utils import BinaryWriter, BinaryReader
from tl.types import MsgsAck from telethon.tl.types import MsgsAck
from tl.all_tlobjects import tlobjects from telethon.tl.all_tlobjects import tlobjects
class MtProtoSender: class MtProtoSender:

View File

@ -3,8 +3,8 @@ import socket
import time import time
from threading import Lock from threading import Lock
from errors import ReadCancelledError from telethon.errors import ReadCancelledError
from utils import BinaryWriter from telethon.utils import BinaryWriter
class TcpClient: class TcpClient:

View File

@ -1,7 +1,7 @@
from network import TcpClient
from binascii import crc32 from binascii import crc32
from errors import * from telethon.network import TcpClient
from utils import BinaryWriter from telethon.errors import *
from telethon.utils import BinaryWriter
class TcpTransport: class TcpTransport:

View File

@ -0,0 +1 @@
from .markdown_parser import parse_message_entities

View File

@ -1,4 +1,4 @@
from tl.types import MessageEntityBold, MessageEntityItalic, MessageEntityCode, MessageEntityTextUrl from telethon.tl.types import MessageEntityBold, MessageEntityItalic, MessageEntityCode, MessageEntityTextUrl
def parse_message_entities(msg): def parse_message_entities(msg):

View File

@ -4,31 +4,31 @@ from hashlib import md5
from os import path from os import path
from mimetypes import guess_extension, guess_type from mimetypes import guess_extension, guess_type
import utils
import network.authenticator
from errors import *
from network import MtProtoSender, TcpTransport
from parser.markdown_parser import parse_message_entities
# For sending and receiving requests # For sending and receiving requests
from tl import MTProtoRequest from telethon.tl import MTProtoRequest
from tl import Session from telethon.tl import Session
# The Requests and types that we'll be using # The Requests and types that we'll be using
from tl.functions.upload import SaveBigFilePartRequest from telethon.tl.functions.upload import SaveBigFilePartRequest
from tl.types import \ from telethon.tl.types import \
PeerUser, PeerChat, PeerChannel, \ PeerUser, PeerChat, PeerChannel, \
InputPeerUser, InputPeerChat, InputPeerChannel, InputPeerEmpty, \ InputPeerUser, InputPeerChat, InputPeerChannel, InputPeerEmpty, \
InputFile, InputFileLocation, InputMediaUploadedPhoto, InputMediaUploadedDocument, \ InputFile, InputFileLocation, InputMediaUploadedPhoto, InputMediaUploadedDocument, \
MessageMediaContact, MessageMediaDocument, MessageMediaPhoto, \ MessageMediaContact, MessageMediaDocument, MessageMediaPhoto, \
DocumentAttributeAudio, DocumentAttributeFilename, InputDocumentFileLocation DocumentAttributeAudio, DocumentAttributeFilename, InputDocumentFileLocation
from tl.functions import InvokeWithLayerRequest, InitConnectionRequest from telethon.tl.functions import InvokeWithLayerRequest, InitConnectionRequest
from tl.functions.help import GetConfigRequest from telethon.tl.functions.help import GetConfigRequest
from tl.functions.auth import SendCodeRequest, SignInRequest, SignUpRequest, LogOutRequest from telethon.tl.functions.auth import SendCodeRequest, SignInRequest, SignUpRequest, LogOutRequest
from tl.functions.upload import SaveFilePartRequest, GetFileRequest from telethon.tl.functions.upload import SaveFilePartRequest, GetFileRequest
from tl.functions.messages import GetDialogsRequest, GetHistoryRequest, SendMessageRequest, SendMediaRequest from telethon.tl.functions.messages import GetDialogsRequest, GetHistoryRequest, SendMessageRequest, SendMediaRequest
import telethon.helpers as utils
import telethon.network.authenticator as authenticator
from telethon.errors import *
from telethon.network import MtProtoSender, TcpTransport
from telethon.parser.markdown_parser import parse_message_entities
class TelegramClient: class TelegramClient:
@ -63,7 +63,7 @@ class TelegramClient:
try: try:
if not self.session.auth_key or reconnect: if not self.session.auth_key or reconnect:
self.session.auth_key, self.session.time_offset = \ self.session.auth_key, self.session.time_offset = \
network.authenticator.do_authentication(self.transport) authenticator.do_authentication(self.transport)
self.session.save() self.session.save()

2
telethon/tl/__init__.py Executable file
View File

@ -0,0 +1,2 @@
from telethon.tl.mtproto_request import MTProtoRequest
from telethon.tl.session import Session

View File

@ -2,8 +2,8 @@ from os.path import isfile as file_exists
import os import os
import time import time
import pickle import pickle
import utils
import random import random
import telethon.helpers as utils
class Session: class Session:

View File

@ -1,3 +1,2 @@
from .binary_reader import BinaryReader
from .binary_writer import BinaryWriter from .binary_writer import BinaryWriter
from .helpers import * from .binary_reader import BinaryReader

View File

@ -1,8 +1,8 @@
from datetime import datetime from datetime import datetime
from io import BytesIO, BufferedReader from io import BytesIO, BufferedReader
from tl.all_tlobjects import tlobjects from telethon.tl.all_tlobjects import tlobjects
from struct import unpack from struct import unpack
from errors import * from telethon.errors import *
import inspect import inspect
import os import os

View File

@ -1,3 +1,3 @@
from .source_builder import SourceBuilder from .source_builder import SourceBuilder
from .tl_parser import TLParser from .tl_parser import TLParser
from .tlobject import TLObject from .tl_object import TLObject

View File

@ -1,6 +1,6 @@
import re import re
from parser.tlobject import TLObject from .tl_object import TLObject
class TLParser: class TLParser:

View File

@ -0,0 +1,401 @@
import os
import re
import shutil
from parser import SourceBuilder, TLParser
def get_output_path(normal_path):
return os.path.join('../telethon/tl', normal_path)
class TLGenerator:
@staticmethod
def tlobjects_exist():
"""Determines whether the TLObjects were previously generated (hence exist) or not"""
return os.path.isfile(get_output_path('all_tlobjects.py'))
@staticmethod
def clean_tlobjects():
"""Cleans the automatically generated TLObjects from disk"""
if os.path.isdir(get_output_path('functions')):
shutil.rmtree(get_output_path('functions'))
if os.path.isdir(get_output_path('types')):
shutil.rmtree(get_output_path('types'))
if os.path.isfile(get_output_path('all_tlobjects.py')):
os.remove(get_output_path('all_tlobjects.py'))
@staticmethod
def generate_tlobjects(scheme_file):
"""Generates all the TLObjects from scheme.tl to tl/functions and tl/types"""
# First ensure that the required parent directories exist
os.makedirs(get_output_path('functions'), exist_ok=True)
os.makedirs(get_output_path('types'), exist_ok=True)
# Store the parsed file in a tuple for iterating it more than once
tlobjects = tuple(TLParser.parse_file(scheme_file))
for tlobject in tlobjects:
# Determine the output directory and create it
out_dir = get_output_path('functions' if tlobject.is_function
else 'types')
if tlobject.namespace:
out_dir = os.path.join(out_dir, tlobject.namespace)
os.makedirs(out_dir, exist_ok=True)
# Also add this object to __init__.py, so we can import the whole packet at once
init_py = os.path.join(out_dir, '__init__.py')
with open(init_py, 'a', encoding='utf-8') as file:
with SourceBuilder(file) as builder:
builder.writeln('from {} import {}'.format(
TLGenerator.get_full_file_name(tlobject),
TLGenerator.get_class_name(tlobject)))
# Create the file for this TLObject
filename = os.path.join(out_dir, TLGenerator.get_file_name(tlobject, add_extension=True))
with open(filename, 'w', encoding='utf-8') as file:
# Let's build the source code!
with SourceBuilder(file) as builder:
# Both types and functions inherit from MTProtoRequest so they all can be sent
builder.writeln('from telethon.tl.mtproto_request import MTProtoRequest')
builder.writeln()
builder.writeln()
builder.writeln('class {}(MTProtoRequest):'.format(TLGenerator.get_class_name(tlobject)))
# Write the original .tl definition, along with a "generated automatically" message
builder.writeln('"""Class generated by TLObjects\' generator. '
'All changes will be ERASED. Original .tl definition below.')
builder.writeln('{}"""'.format(repr(tlobject)))
builder.writeln()
# First sort the arguments so that those not being a flag come first
args = sorted([arg for arg in tlobject.args if not arg.flag_indicator],
key=lambda x: x.is_flag)
# Then convert the args to string parameters, the flags having =None
args = [(arg.name if not arg.is_flag
else '{}=None'.format(arg.name)) for arg in args
if not arg.flag_indicator and not arg.generic_definition]
# Write the __init__ function
if args:
builder.writeln('def __init__(self, {}):'.format(', '.join(args)))
else:
builder.writeln('def __init__(self):')
# Now update args to have the TLObject arguments, _except_
# those which are generated automatically: flag indicator and generic definitions.
# We don't need the generic definitions in Python because arguments can be any type
args = [arg for arg in tlobject.args
if not arg.flag_indicator and not arg.generic_definition]
if args:
# Write the docstring, so we know the type of the arguments
builder.writeln('"""')
for arg in args:
if not arg.flag_indicator:
builder.write(':param {}: Telegram type: «{}».'.format(arg.name, arg.type))
if arg.is_vector:
builder.write(' Must be a list.'.format(arg.name))
if arg.is_generic:
builder.write(' This should be another MTProtoRequest.')
builder.writeln()
builder.writeln('"""')
builder.writeln('super().__init__()')
# Functions have a result object and are confirmed by default
if tlobject.is_function:
builder.writeln('self.result = None')
builder.writeln('self.confirmed = True # Confirmed by default')
# Create an attribute that stores the TLObject's constructor ID
builder.writeln('self.constructor_id = {}'.format(hex(tlobject.id)))
# Set the arguments
if args:
# Leave an empty line if there are any args
builder.writeln()
for arg in args:
builder.writeln('self.{0} = {0}'.format(arg.name))
builder.end_block()
# Write the on_send(self, writer) function
builder.writeln('def on_send(self, writer):')
builder.writeln('writer.write_int(self.constructor_id, signed=False)'
.format(hex(tlobject.id), tlobject.name))
for arg in tlobject.args:
TLGenerator.write_onsend_code(builder, arg, tlobject.args)
builder.end_block()
# Write the on_response(self, reader) function
builder.writeln('def on_response(self, reader):')
# Do not read constructor's ID, since that's already been read somewhere else
if tlobject.is_function:
builder.writeln('self.result = reader.tgread_object()')
else:
if tlobject.args:
for arg in tlobject.args:
TLGenerator.write_onresponse_code(builder, arg, tlobject.args)
else:
# If there were no arguments, we still need an on_response method, and hence "pass" if empty
builder.writeln('pass')
builder.end_block()
# Write the __repr__(self) and __str__(self) functions
builder.writeln('def __repr__(self):')
builder.writeln("return '{}'".format(repr(tlobject)))
builder.end_block()
builder.writeln('def __str__(self):')
builder.writeln("return {}".format(str(tlobject)))
# builder.end_block() # There is no need to end the last block
# Once all the objects have been generated, we can now group them in a single file
filename = os.path.join(get_output_path('all_tlobjects.py'))
with open(filename, 'w', encoding='utf-8') as file:
with SourceBuilder(file) as builder:
builder.writeln('"""File generated by TLObjects\' generator. All changes will be ERASED"""')
builder.writeln()
# First add imports
for tlobject in tlobjects:
builder.writeln('import {}'.format(TLGenerator.get_full_file_name(tlobject)))
builder.writeln()
# Then create the dictionary containing constructor_id: class
builder.writeln('tlobjects = {')
builder.current_indent += 1
# Fill the dictionary (0x1a2b3c4f: tl.full.type.path.Class)
for tlobject in tlobjects:
builder.writeln('{}: {}.{},'
.format(hex(tlobject.id),
TLGenerator.get_full_file_name(tlobject),
TLGenerator.get_class_name(tlobject)))
builder.current_indent -= 1
builder.writeln('}')
@staticmethod
def get_class_name(tlobject):
"""Gets the class name following the Python style guidelines, in ThisClassFormat"""
# Courtesy of http://stackoverflow.com/a/31531797/4759433
# Also, '_' could be replaced for ' ', then use .title(), and then remove ' '
result = re.sub(r'_([a-z])', lambda m: m.group(1).upper(), tlobject.name)
result = result[:1].upper() + result[1:].replace('_', '') # Replace again to fully ensure!
# If it's a function, let it end with "Request" to identify them more easily
if tlobject.is_function:
result += 'Request'
return result
@staticmethod
def get_full_file_name(tlobject):
"""Gets the full file name for the given TLObject (tl.type.full.path)"""
fullname = TLGenerator.get_file_name(tlobject, add_extension=False)
if tlobject.namespace:
fullname = '{}.{}'.format(tlobject.namespace, fullname)
if tlobject.is_function:
return 'telethon.tl.functions.{}'.format(fullname)
else:
return 'telethon.tl.types.{}'.format(fullname)
@staticmethod
def get_file_name(tlobject, add_extension):
"""Gets the file name in file_name_format.py for the given TLObject"""
# Courtesy of http://stackoverflow.com/a/1176023/4759433
s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', tlobject.name)
result = re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()
if add_extension:
return result + '.py'
else:
return result
@staticmethod
def write_onsend_code(builder, arg, args, name=None):
"""
Writes the write code for the given argument
:param builder: The source code builder
:param arg: The argument to write
:param args: All the other arguments in TLObject same on_send. This is required to determine the flags value
:param name: The name of the argument. Defaults to «self.argname»
This argument is an option because it's required when writing Vectors<>
"""
if arg.generic_definition:
return # Do nothing, this only specifies a later type
if name is None:
name = 'self.{}'.format(arg.name)
# The argument may be a flag, only write if it's not None AND if it's not a True type
# True types are not actually sent, but instead only used to determine the flags
if arg.is_flag:
if arg.type == 'true':
return # Exit, since True type is never written
else:
builder.writeln('if {}:'.format(name))
if arg.is_vector:
builder.writeln("writer.write_int(0x1cb5c415, signed=False) # Vector's constructor ID")
builder.writeln('writer.write_int(len({}))'.format(name))
builder.writeln('for {}_item in {}:'.format(arg.name, name))
# Temporary disable .is_vector, not to enter this if again
arg.is_vector = False
TLGenerator.write_onsend_code(builder, arg, args, name='{}_item'.format(arg.name))
arg.is_vector = True
elif arg.flag_indicator:
# Calculate the flags with those items which are not None
builder.writeln('# Calculate the flags. This equals to those flag arguments which are NOT None')
builder.writeln('flags = 0')
for flag in args:
if flag.is_flag:
builder.writeln('flags |= (1 << {}) if {} else 0'
.format(flag.flag_index, 'self.{}'.format(flag.name)))
builder.writeln('writer.write_int(flags)')
builder.writeln()
elif 'int' == arg.type:
builder.writeln('writer.write_int({})'.format(name))
elif 'long' == arg.type:
builder.writeln('writer.write_long({})'.format(name))
elif 'int128' == arg.type:
builder.writeln('writer.write_large_int({}, bits=128)'.format(name))
elif 'int256' == arg.type:
builder.writeln('writer.write_large_int({}, bits=256)'.format(name))
elif 'double' == arg.type:
builder.writeln('writer.write_double({})'.format(name))
elif 'string' == arg.type:
builder.writeln('writer.tgwrite_string({})'.format(name))
elif 'Bool' == arg.type:
builder.writeln('writer.tgwrite_bool({})'.format(name))
elif 'true' == arg.type: # Awkwardly enough, Telegram has both bool and "true", used in flags
pass # These are actually NOT written! Only used for flags
elif 'bytes' == arg.type:
builder.writeln('writer.tgwrite_bytes({})'.format(name))
elif 'date' == arg.type: # Custom format
builder.writeln('writer.tgwrite_date({})'.format(name))
else:
# Else it may be a custom type
builder.writeln('{}.on_send(writer)'.format(name))
# End vector and flag blocks if required (if we opened them before)
if arg.is_vector:
builder.end_block()
if arg.is_flag:
builder.end_block()
@staticmethod
def write_onresponse_code(builder, arg, args, name=None):
"""
Writes the receive code for the given argument
:param builder: The source code builder
:param arg: The argument to write
:param args: All the other arguments in TLObject same on_send. This is required to determine the flags value
:param name: The name of the argument. Defaults to «self.argname»
This argument is an option because it's required when writing Vectors<>
"""
if arg.generic_definition:
return # Do nothing, this only specifies a later type
if name is None:
name = 'self.{}'.format(arg.name)
# The argument may be a flag, only write that flag was given!
was_flag = False
if arg.is_flag:
was_flag = True
builder.writeln('if (flags & (1 << {})) != 0:'.format(arg.flag_index))
# Temporary disable .is_flag not to enter this if again when calling the method recursively
arg.is_flag = False
if arg.is_vector:
builder.writeln("reader.read_int() # Vector's constructor ID")
builder.writeln('{} = [] # Initialize an empty list'.format(name))
builder.writeln('{}_len = reader.read_int()'.format(arg.name))
builder.writeln('for _ in range({}_len):'.format(arg.name))
# Temporary disable .is_vector, not to enter this if again
arg.is_vector = False
TLGenerator.write_onresponse_code(builder, arg, args, name='{}_item'.format(arg.name))
builder.writeln('{}.append({}_item)'.format(name, arg.name))
arg.is_vector = True
elif arg.flag_indicator:
# Read the flags, which will indicate what items we should read next
builder.writeln('flags = reader.read_int()')
builder.writeln()
elif 'int' == arg.type:
builder.writeln('{} = reader.read_int()'.format(name))
elif 'long' == arg.type:
builder.writeln('{} = reader.read_long()'.format(name))
elif 'int128' == arg.type:
builder.writeln('{} = reader.read_large_int(bits=128)'.format(name))
elif 'int256' == arg.type:
builder.writeln('{} = reader.read_large_int(bits=256)'.format(name))
elif 'double' == arg.type:
builder.writeln('{} = reader.read_double()'.format(name))
elif 'string' == arg.type:
builder.writeln('{} = reader.tgread_string()'.format(name))
elif 'Bool' == arg.type:
builder.writeln('{} = reader.tgread_bool()'.format(name))
elif 'true' == arg.type: # Awkwardly enough, Telegram has both bool and "true", used in flags
builder.writeln('{} = True # Arbitrary not-None value, no need to read since it is a flag'.format(name))
elif 'bytes' == arg.type:
builder.writeln('{} = reader.tgread_bytes()'.format(name))
elif 'date' == arg.type: # Custom format
builder.writeln('{} = reader.tgread_date()'.format(name))
else:
# Else it may be a custom type
builder.writeln('{} = reader.tgread_object()'.format(name))
# End vector and flag blocks if required (if we opened them before)
if arg.is_vector:
builder.end_block()
if was_flag:
builder.end_block()
# Restore .is_flag
arg.is_flag = True
if __name__ == '__main__':
if TLGenerator.tlobjects_exist():
print('Detected previous TLObjects. Cleaning...')
TLGenerator.clean_tlobjects()
print('Generating TLObjects...')
TLGenerator.generate_tlobjects('scheme.tl')
print('Done.')

View File

@ -1,8 +1,8 @@
import unittest import unittest
from crypto import AES from telethon.crypto import AES
import utils.helpers as utils import telethon.helpers as utils
from crypto import Factorizator from telethon.crypto import Factorizator
class CryptoTests(unittest.TestCase): class CryptoTests(unittest.TestCase):
@ -32,8 +32,10 @@ class CryptoTests(unittest.TestCase):
def test_aes_encrypt(self): def test_aes_encrypt(self):
value = AES.encrypt_ige(self.plain_text, self.key, self.iv) value = AES.encrypt_ige(self.plain_text, self.key, self.iv)
assert value == self.cipher_text, ('Ciphered text ("{}") does not equal expected ("{}")' take = 16 # Don't take all the bytes, since latest involve are random padding
.format(value, self.cipher_text)) assert value[:take] == self.cipher_text[:take],\
('Ciphered text ("{}") does not equal expected ("{}")'
.format(value[:take], self.cipher_text[:take]))
value = AES.encrypt_ige(self.plain_text_padded, self.key, self.iv) value = AES.encrypt_ige(self.plain_text_padded, self.key, self.iv)
assert value == self.cipher_text_padded, ('Ciphered text ("{}") does not equal expected ("{}")' assert value == self.cipher_text_padded, ('Ciphered text ("{}") does not equal expected ("{}")'

View File

@ -3,8 +3,8 @@ import socket
import threading import threading
import unittest import unittest
from network import TcpTransport, TcpClient from telethon.network import TcpTransport, TcpClient
import network.authenticator import telethon.network.authenticator as authenticator
def run_server_echo_thread(port): def run_server_echo_thread(port):
@ -37,5 +37,5 @@ class NetworkTests(unittest.TestCase):
@staticmethod @staticmethod
def test_authenticator(): def test_authenticator():
transport = TcpTransport('149.154.167.91', 443) transport = TcpTransport('149.154.167.91', 443)
network.authenticator.do_authentication(transport) authenticator.do_authentication(transport)
transport.close() transport.close()

View File

@ -1,6 +1,6 @@
import os import os
import unittest import unittest
from utils import BinaryReader, BinaryWriter from telethon.utils import BinaryReader, BinaryWriter
class UtilsTests(unittest.TestCase): class UtilsTests(unittest.TestCase):

View File

@ -1,4 +0,0 @@
from .all_tlobjects import tlobjects
from .mtproto_request import MTProtoRequest
from .session import Session

View File

@ -1,393 +0,0 @@
import os
import re
import shutil
from parser import SourceBuilder, TLParser
def tlobjects_exist():
"""Determines whether the TLObjects were previously generated (hence exist) or not"""
return os.path.isfile('tl/all_tlobjects.py')
def clean_tlobjects():
"""Cleans the automatically generated TLObjects from disk"""
if os.path.isdir('tl/functions'):
shutil.rmtree('tl/functions')
if os.path.isdir('tl/types'):
shutil.rmtree('tl/types')
if os.path.isfile('tl/all_tlobjects.py'):
os.remove('tl/all_tlobjects.py')
def generate_tlobjects(scheme_file):
"""Generates all the TLObjects from scheme.tl to tl/functions and tl/types"""
# First ensure that the required parent directories exist
os.makedirs('tl/functions', exist_ok=True)
os.makedirs('tl/types', exist_ok=True)
# Store the parsed file in a tuple for iterating it more than once
tlobjects = tuple(TLParser.parse_file(scheme_file))
for tlobject in tlobjects:
# Determine the output directory and create it
out_dir = os.path.join('tl',
'functions' if tlobject.is_function
else 'types')
if tlobject.namespace:
out_dir = os.path.join(out_dir, tlobject.namespace)
os.makedirs(out_dir, exist_ok=True)
# Also add this object to __init__.py, so we can import the whole packet at once
init_py = os.path.join(out_dir, '__init__.py')
with open(init_py, 'a', encoding='utf-8') as file:
with SourceBuilder(file) as builder:
builder.writeln('from {} import {}'.format(
get_full_file_name(tlobject), get_class_name(tlobject)))
# Create the file for this TLObject
filename = os.path.join(out_dir, get_file_name(tlobject, add_extension=True))
with open(filename, 'w', encoding='utf-8') as file:
# Let's build the source code!
with SourceBuilder(file) as builder:
# Both types and functions inherit from MTProtoRequest so they all can be sent
builder.writeln('from tl.mtproto_request import MTProtoRequest')
builder.writeln()
builder.writeln()
builder.writeln('class {}(MTProtoRequest):'.format(get_class_name(tlobject)))
# Write the original .tl definition, along with a "generated automatically" message
builder.writeln('"""Class generated by TLObjects\' generator. '
'All changes will be ERASED. Original .tl definition below.')
builder.writeln('{}"""'.format(repr(tlobject)))
builder.writeln()
# First sort the arguments so that those not being a flag come first
args = sorted([arg for arg in tlobject.args if not arg.flag_indicator],
key=lambda x: x.is_flag)
# Then convert the args to string parameters, the flags having =None
args = [(arg.name if not arg.is_flag
else '{}=None'.format(arg.name)) for arg in args
if not arg.flag_indicator and not arg.generic_definition]
# Write the __init__ function
if args:
builder.writeln('def __init__(self, {}):'.format(', '.join(args)))
else:
builder.writeln('def __init__(self):')
# Now update args to have the TLObject arguments, _except_
# those which are generated automatically: flag indicator and generic definitions.
# We don't need the generic definitions in Python because arguments can be any type
args = [arg for arg in tlobject.args
if not arg.flag_indicator and not arg.generic_definition]
if args:
# Write the docstring, so we know the type of the arguments
builder.writeln('"""')
for arg in args:
if not arg.flag_indicator:
builder.write(':param {}: Telegram type: «{}».'.format(arg.name, arg.type))
if arg.is_vector:
builder.write(' Must be a list.'.format(arg.name))
if arg.is_generic:
builder.write(' This should be another MTProtoRequest.')
builder.writeln()
builder.writeln('"""')
builder.writeln('super().__init__()')
# Functions have a result object and are confirmed by default
if tlobject.is_function:
builder.writeln('self.result = None')
builder.writeln('self.confirmed = True # Confirmed by default')
# Create an attribute that stores the TLObject's constructor ID
builder.writeln('self.constructor_id = {}'.format(hex(tlobject.id)))
# Set the arguments
if args:
# Leave an empty line if there are any args
builder.writeln()
for arg in args:
builder.writeln('self.{0} = {0}'.format(arg.name))
builder.end_block()
# Write the on_send(self, writer) function
builder.writeln('def on_send(self, writer):')
builder.writeln('writer.write_int(self.constructor_id, signed=False)'
.format(hex(tlobject.id), tlobject.name))
for arg in tlobject.args:
write_onsend_code(builder, arg, tlobject.args)
builder.end_block()
# Write the on_response(self, reader) function
builder.writeln('def on_response(self, reader):')
# Do not read constructor's ID, since that's already been read somewhere else
if tlobject.is_function:
builder.writeln('self.result = reader.tgread_object()')
else:
if tlobject.args:
for arg in tlobject.args:
write_onresponse_code(builder, arg, tlobject.args)
else:
# If there were no arguments, we still need an on_response method, and hence "pass" if empty
builder.writeln('pass')
builder.end_block()
# Write the __repr__(self) and __str__(self) functions
builder.writeln('def __repr__(self):')
builder.writeln("return '{}'".format(repr(tlobject)))
builder.end_block()
builder.writeln('def __str__(self):')
builder.writeln("return {}".format(str(tlobject)))
# builder.end_block() # There is no need to end the last block
# Once all the objects have been generated, we can now group them in a single file
filename = os.path.join('tl', 'all_tlobjects.py')
with open(filename, 'w', encoding='utf-8') as file:
with SourceBuilder(file) as builder:
builder.writeln('"""File generated by TLObjects\' generator. All changes will be ERASED"""')
builder.writeln()
# First add imports
for tlobject in tlobjects:
builder.writeln('import {}'.format(get_full_file_name(tlobject)))
builder.writeln()
# Then create the dictionary containing constructor_id: class
builder.writeln('tlobjects = {')
builder.current_indent += 1
# Fill the dictionary (0x1a2b3c4f: tl.full.type.path.Class)
for tlobject in tlobjects:
builder.writeln('{}: {}.{},'
.format(hex(tlobject.id), get_full_file_name(tlobject), get_class_name(tlobject)))
builder.current_indent -= 1
builder.writeln('}')
def get_class_name(tlobject):
"""Gets the class name following the Python style guidelines, in ThisClassFormat"""
# Courtesy of http://stackoverflow.com/a/31531797/4759433
# Also, '_' could be replaced for ' ', then use .title(), and then remove ' '
result = re.sub(r'_([a-z])', lambda m: m.group(1).upper(), tlobject.name)
result = result[:1].upper() + result[1:].replace('_', '') # Replace again to fully ensure!
# If it's a function, let it end with "Request" to identify them more easily
if tlobject.is_function:
result += 'Request'
return result
def get_full_file_name(tlobject):
"""Gets the full file name for the given TLObject (tl.type.full.path)"""
fullname = get_file_name(tlobject, add_extension=False)
if tlobject.namespace:
fullname = '{}.{}'.format(tlobject.namespace, fullname)
if tlobject.is_function:
return 'tl.functions.{}'.format(fullname)
else:
return 'tl.types.{}'.format(fullname)
def get_file_name(tlobject, add_extension):
"""Gets the file name in file_name_format.py for the given TLObject"""
# Courtesy of http://stackoverflow.com/a/1176023/4759433
s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', tlobject.name)
result = re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()
if add_extension:
return result + '.py'
else:
return result
def write_onsend_code(builder, arg, args, name=None):
"""
Writes the write code for the given argument
:param builder: The source code builder
:param arg: The argument to write
:param args: All the other arguments in TLObject same on_send. This is required to determine the flags value
:param name: The name of the argument. Defaults to «self.argname»
This argument is an option because it's required when writing Vectors<>
"""
if arg.generic_definition:
return # Do nothing, this only specifies a later type
if name is None:
name = 'self.{}'.format(arg.name)
# The argument may be a flag, only write if it's not None AND if it's not a True type
# True types are not actually sent, but instead only used to determine the flags
if arg.is_flag:
if arg.type == 'true':
return # Exit, since True type is never written
else:
builder.writeln('if {}:'.format(name))
if arg.is_vector:
builder.writeln("writer.write_int(0x1cb5c415, signed=False) # Vector's constructor ID")
builder.writeln('writer.write_int(len({}))'.format(name))
builder.writeln('for {}_item in {}:'.format(arg.name, name))
# Temporary disable .is_vector, not to enter this if again
arg.is_vector = False
write_onsend_code(builder, arg, args, name='{}_item'.format(arg.name))
arg.is_vector = True
elif arg.flag_indicator:
# Calculate the flags with those items which are not None
builder.writeln('# Calculate the flags. This equals to those flag arguments which are NOT None')
builder.writeln('flags = 0')
for flag in args:
if flag.is_flag:
builder.writeln('flags |= (1 << {}) if {} else 0'
.format(flag.flag_index, 'self.{}'.format(flag.name)))
builder.writeln('writer.write_int(flags)')
builder.writeln()
elif 'int' == arg.type:
builder.writeln('writer.write_int({})'.format(name))
elif 'long' == arg.type:
builder.writeln('writer.write_long({})'.format(name))
elif 'int128' == arg.type:
builder.writeln('writer.write_large_int({}, bits=128)'.format(name))
elif 'int256' == arg.type:
builder.writeln('writer.write_large_int({}, bits=256)'.format(name))
elif 'double' == arg.type:
builder.writeln('writer.write_double({})'.format(name))
elif 'string' == arg.type:
builder.writeln('writer.tgwrite_string({})'.format(name))
elif 'Bool' == arg.type:
builder.writeln('writer.tgwrite_bool({})'.format(name))
elif 'true' == arg.type: # Awkwardly enough, Telegram has both bool and "true", used in flags
pass # These are actually NOT written! Only used for flags
elif 'bytes' == arg.type:
builder.writeln('writer.tgwrite_bytes({})'.format(name))
elif 'date' == arg.type: # Custom format
builder.writeln('writer.tgwrite_date({})'.format(name))
else:
# Else it may be a custom type
builder.writeln('{}.on_send(writer)'.format(name))
# End vector and flag blocks if required (if we opened them before)
if arg.is_vector:
builder.end_block()
if arg.is_flag:
builder.end_block()
def write_onresponse_code(builder, arg, args, name=None):
"""
Writes the receive code for the given argument
:param builder: The source code builder
:param arg: The argument to write
:param args: All the other arguments in TLObject same on_send. This is required to determine the flags value
:param name: The name of the argument. Defaults to «self.argname»
This argument is an option because it's required when writing Vectors<>
"""
if arg.generic_definition:
return # Do nothing, this only specifies a later type
if name is None:
name = 'self.{}'.format(arg.name)
# The argument may be a flag, only write that flag was given!
was_flag = False
if arg.is_flag:
was_flag = True
builder.writeln('if (flags & (1 << {})) != 0:'.format(arg.flag_index))
# Temporary disable .is_flag not to enter this if again when calling the method recursively
arg.is_flag = False
if arg.is_vector:
builder.writeln("reader.read_int() # Vector's constructor ID")
builder.writeln('{} = [] # Initialize an empty list'.format(name))
builder.writeln('{}_len = reader.read_int()'.format(arg.name))
builder.writeln('for _ in range({}_len):'.format(arg.name))
# Temporary disable .is_vector, not to enter this if again
arg.is_vector = False
write_onresponse_code(builder, arg, args, name='{}_item'.format(arg.name))
builder.writeln('{}.append({}_item)'.format(name, arg.name))
arg.is_vector = True
elif arg.flag_indicator:
# Read the flags, which will indicate what items we should read next
builder.writeln('flags = reader.read_int()')
builder.writeln()
elif 'int' == arg.type:
builder.writeln('{} = reader.read_int()'.format(name))
elif 'long' == arg.type:
builder.writeln('{} = reader.read_long()'.format(name))
elif 'int128' == arg.type:
builder.writeln('{} = reader.read_large_int(bits=128)'.format(name))
elif 'int256' == arg.type:
builder.writeln('{} = reader.read_large_int(bits=256)'.format(name))
elif 'double' == arg.type:
builder.writeln('{} = reader.read_double()'.format(name))
elif 'string' == arg.type:
builder.writeln('{} = reader.tgread_string()'.format(name))
elif 'Bool' == arg.type:
builder.writeln('{} = reader.tgread_bool()'.format(name))
elif 'true' == arg.type: # Awkwardly enough, Telegram has both bool and "true", used in flags
builder.writeln('{} = True # Arbitrary not-None value, no need to read since it is a flag'.format(name))
elif 'bytes' == arg.type:
builder.writeln('{} = reader.tgread_bytes()'.format(name))
elif 'date' == arg.type: # Custom format
builder.writeln('{} = reader.tgread_date()'.format(name))
else:
# Else it may be a custom type
builder.writeln('{} = reader.tgread_object()'.format(name))
# End vector and flag blocks if required (if we opened them before)
if arg.is_vector:
builder.end_block()
if was_flag:
builder.end_block()
# Restore .is_flag
arg.is_flag = True
if __name__ == '__main__':
if tlobjects_exist():
print('Detected previous TLObjects. Cleaning...')
clean_tlobjects()
print('Generating TLObjects...')
generate_tlobjects('scheme.tl')
print('Done.')

43
try_telethon.py Normal file
View File

@ -0,0 +1,43 @@
import traceback
from telethon.interactive_telegram_client import \
InteractiveTelegramClient, print_title
def load_settings(path='api/settings'):
"""Loads the user settings located under `api/`"""
settings = {}
with open(path, 'r', encoding='utf-8') as file:
for line in file:
value_pair = line.split('=')
left = value_pair[0].strip()
right = value_pair[1].strip()
if right.isnumeric():
settings[left] = int(right)
else:
settings[left] = right
return settings
if __name__ == '__main__':
# Load the settings and initialize the client
settings = load_settings()
client = InteractiveTelegramClient(
session_user_id=settings.get('session_name', 'anonymous'),
user_phone=str(settings['user_phone']),
layer=55,
api_id=settings['api_id'],
api_hash=settings['api_hash'])
print('Initialization done!')
try:
client.run()
except Exception as e:
print('Unexpected error ({}): {} at\n{}'.format(type(e), e, traceback.format_exc()))
finally:
print_title('Exit')
print('Thanks for trying the interactive example! Exiting...')
client.disconnect()