mirror of
https://github.com/LonamiWebs/Telethon.git
synced 2024-11-10 19:46:36 +03:00
fd14a5a49a
A new thread is now created when MtProtoSender is created. This thread constantly listens for Telegram updates (such as incoming messages), so the updates can be received any time rather than only when sending another request.
388 lines
16 KiB
Python
Executable File
388 lines
16 KiB
Python
Executable File
import os
|
|
import re
|
|
import shutil
|
|
|
|
from parser import SourceBuilder, TLParser
|
|
|
|
|
|
def tlobjects_exist():
|
|
"""Determines whether the TLObjects were previously generated (hence exist) or not"""
|
|
return os.path.isfile('tl/all_tlobjects.py')
|
|
|
|
|
|
def clean_tlobjects():
|
|
"""Cleans the automatically generated TLObjects from disk"""
|
|
if os.path.isdir('tl/functions'):
|
|
shutil.rmtree('tl/functions')
|
|
|
|
if os.path.isdir('tl/types'):
|
|
shutil.rmtree('tl/types')
|
|
|
|
if os.path.isfile('tl/all_tlobjects.py'):
|
|
os.remove('tl/all_tlobjects.py')
|
|
|
|
|
|
def generate_tlobjects(scheme_file):
|
|
"""Generates all the TLObjects from scheme.tl to tl/functions and tl/types"""
|
|
|
|
# First ensure that the required parent directories exist
|
|
os.makedirs('tl/functions', exist_ok=True)
|
|
os.makedirs('tl/types', exist_ok=True)
|
|
|
|
# Store the parsed file in a tuple for iterating it more than once
|
|
tlobjects = tuple(TLParser.parse_file(scheme_file))
|
|
for tlobject in tlobjects:
|
|
# Determine the output directory and create it
|
|
out_dir = os.path.join('tl',
|
|
'functions' if tlobject.is_function
|
|
else 'types')
|
|
|
|
if tlobject.namespace:
|
|
out_dir = os.path.join(out_dir, tlobject.namespace)
|
|
|
|
os.makedirs(out_dir, exist_ok=True)
|
|
|
|
# Also add this object to __init__.py, so we can import the whole packet at once
|
|
init_py = os.path.join(out_dir, '__init__.py')
|
|
with open(init_py, 'a', encoding='utf-8') as file:
|
|
with SourceBuilder(file) as builder:
|
|
builder.writeln('from {} import {}'.format(
|
|
get_full_file_name(tlobject), get_class_name(tlobject)))
|
|
|
|
# Create the file for this TLObject
|
|
filename = os.path.join(out_dir, get_file_name(tlobject, add_extension=True))
|
|
with open(filename, 'w', encoding='utf-8') as file:
|
|
# Let's build the source code!
|
|
with SourceBuilder(file) as builder:
|
|
# Both types and functions inherit from MTProtoRequest so they all can be sent
|
|
builder.writeln('from tl.mtproto_request import MTProtoRequest')
|
|
builder.writeln()
|
|
builder.writeln()
|
|
builder.writeln('class {}(MTProtoRequest):'.format(get_class_name(tlobject)))
|
|
|
|
# Write the original .tl definition, along with a "generated automatically" message
|
|
builder.writeln('"""Class generated by TLObjects\' generator. '
|
|
'All changes will be ERASED. Original .tl definition below.')
|
|
builder.writeln('{}"""'.format(repr(tlobject)))
|
|
builder.writeln()
|
|
|
|
# First sort the arguments so that those not being a flag come first
|
|
args = sorted([arg for arg in tlobject.args if not arg.flag_indicator],
|
|
key=lambda x: x.is_flag)
|
|
|
|
# Then convert the args to string parameters, the flags having =None
|
|
args = [(arg.name if not arg.is_flag
|
|
else '{}=None'.format(arg.name)) for arg in args
|
|
if not arg.flag_indicator and not arg.generic_definition]
|
|
|
|
# Write the __init__ function
|
|
if args:
|
|
builder.writeln('def __init__(self, {}):'.format(', '.join(args)))
|
|
else:
|
|
builder.writeln('def __init__(self):')
|
|
|
|
# Now update args to have the TLObject arguments, _except_
|
|
# those which are generated automatically: flag indicator and generic definitions.
|
|
# We don't need the generic definitions in Python because arguments can be any type
|
|
args = [arg for arg in tlobject.args
|
|
if not arg.flag_indicator and not arg.generic_definition]
|
|
|
|
if args:
|
|
# Write the docstring, so we know the type of the arguments
|
|
builder.writeln('"""')
|
|
for arg in args:
|
|
if not arg.flag_indicator:
|
|
builder.write(':param {}: Telegram type: «{}».'.format(arg.name, arg.type))
|
|
if arg.is_vector:
|
|
builder.write(' Must be a list.'.format(arg.name))
|
|
if arg.is_generic:
|
|
builder.write(' This should be another MTProtoRequest.')
|
|
builder.writeln()
|
|
builder.writeln('"""')
|
|
|
|
builder.writeln('super().__init__()')
|
|
# Functions have a result object and are confirmed by default
|
|
if tlobject.is_function:
|
|
builder.writeln('self.result = None')
|
|
builder.writeln('self.confirmed = True # Confirmed by default')
|
|
|
|
# Create an attribute that stores the TLObject's constructor ID
|
|
builder.writeln('self.constructor_id = {}'.format(hex(tlobject.id)))
|
|
|
|
# Set the arguments
|
|
if args:
|
|
# Leave an empty line if there are any args
|
|
builder.writeln()
|
|
for arg in args:
|
|
builder.writeln('self.{0} = {0}'.format(arg.name))
|
|
builder.end_block()
|
|
|
|
# Write the on_send(self, writer) function
|
|
builder.writeln('def on_send(self, writer):')
|
|
builder.writeln('writer.write_int(self.constructor_id, signed=False)'
|
|
.format(hex(tlobject.id), tlobject.name))
|
|
|
|
for arg in tlobject.args:
|
|
write_onsend_code(builder, arg, tlobject.args)
|
|
builder.end_block()
|
|
|
|
# Write the on_response(self, reader) function
|
|
builder.writeln('def on_response(self, reader):')
|
|
# Do not read constructor's ID, since that's already been read somewhere else
|
|
if tlobject.is_function:
|
|
builder.writeln('self.result = reader.tgread_object()')
|
|
else:
|
|
if tlobject.args:
|
|
for arg in tlobject.args:
|
|
write_onresponse_code(builder, arg, tlobject.args)
|
|
else:
|
|
# If there were no arguments, we still need an on_response method, and hence "pass" if empty
|
|
builder.writeln('pass')
|
|
builder.end_block()
|
|
|
|
# Write the __repr__(self) and __str__(self) functions
|
|
builder.writeln('def __repr__(self):')
|
|
builder.writeln("return '{}'".format(repr(tlobject)))
|
|
builder.end_block()
|
|
|
|
builder.writeln('def __str__(self):')
|
|
builder.writeln("return {}".format(str(tlobject)))
|
|
# builder.end_block() # There is no need to end the last block
|
|
|
|
# Once all the objects have been generated, we can now group them in a single file
|
|
filename = os.path.join('tl', 'all_tlobjects.py')
|
|
with open(filename, 'w', encoding='utf-8') as file:
|
|
with SourceBuilder(file) as builder:
|
|
builder.writeln('"""File generated by TLObjects\' generator. All changes will be ERASED"""')
|
|
builder.writeln()
|
|
|
|
# First add imports
|
|
for tlobject in tlobjects:
|
|
builder.writeln('import {}'.format(get_full_file_name(tlobject)))
|
|
builder.writeln()
|
|
|
|
# Then create the dictionary containing constructor_id: class
|
|
builder.writeln('tlobjects = {')
|
|
builder.current_indent += 1
|
|
|
|
# Fill the dictionary (0x1a2b3c4f: tl.full.type.path.Class)
|
|
for tlobject in tlobjects:
|
|
builder.writeln('{}: {}.{},'
|
|
.format(hex(tlobject.id), get_full_file_name(tlobject), get_class_name(tlobject)))
|
|
|
|
builder.current_indent -= 1
|
|
builder.writeln('}')
|
|
|
|
|
|
def get_class_name(tlobject):
|
|
"""Gets the class name following the Python style guidelines, in ThisClassFormat"""
|
|
|
|
# Courtesy of http://stackoverflow.com/a/31531797/4759433
|
|
# Also, '_' could be replaced for ' ', then use .title(), and then remove ' '
|
|
result = re.sub(r'_([a-z])', lambda m: m.group(1).upper(), tlobject.name)
|
|
result = result[:1].upper() + result[1:].replace('_', '') # Replace again to fully ensure!
|
|
# If it's a function, let it end with "Request" to identify them more easily
|
|
if tlobject.is_function:
|
|
result += 'Request'
|
|
return result
|
|
|
|
|
|
def get_full_file_name(tlobject):
|
|
"""Gets the full file name for the given TLObject (tl.type.full.path)"""
|
|
|
|
fullname = get_file_name(tlobject, add_extension=False)
|
|
if tlobject.namespace:
|
|
fullname = '{}.{}'.format(tlobject.namespace, fullname)
|
|
|
|
if tlobject.is_function:
|
|
return 'tl.functions.{}'.format(fullname)
|
|
else:
|
|
return 'tl.types.{}'.format(fullname)
|
|
|
|
|
|
def get_file_name(tlobject, add_extension):
|
|
"""Gets the file name in file_name_format.py for the given TLObject"""
|
|
|
|
# Courtesy of http://stackoverflow.com/a/1176023/4759433
|
|
s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', tlobject.name)
|
|
result = re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()
|
|
if add_extension:
|
|
return result + '.py'
|
|
else:
|
|
return result
|
|
|
|
|
|
def write_onsend_code(builder, arg, args, name=None):
|
|
"""
|
|
Writes the write code for the given argument
|
|
:param builder: The source code builder
|
|
:param arg: The argument to write
|
|
:param args: All the other arguments in TLObject same on_send. This is required to determine the flags value
|
|
:param name: The name of the argument. Defaults to «self.argname»
|
|
This argument is an option because it's required when writing Vectors<>
|
|
"""
|
|
|
|
if arg.generic_definition:
|
|
return # Do nothing, this only specifies a later type
|
|
|
|
if name is None:
|
|
name = 'self.{}'.format(arg.name)
|
|
|
|
# The argument may be a flag, only write if it's not None AND if it's not a True type
|
|
# True types are not actually sent, but instead only used to determine the flags
|
|
if arg.is_flag:
|
|
if arg.type == 'true':
|
|
return # Exit, since True type is never written
|
|
else:
|
|
builder.writeln('if {}:'.format(name))
|
|
|
|
if arg.is_vector:
|
|
builder.writeln("writer.write_int(0x1cb5c415, signed=False) # Vector's constructor ID")
|
|
builder.writeln('writer.write_int(len({}))'.format(name))
|
|
builder.writeln('for {}_item in {}:'.format(arg.name, name))
|
|
# Temporary disable .is_vector, not to enter this if again
|
|
arg.is_vector = False
|
|
write_onsend_code(builder, arg, args, name='{}_item'.format(arg.name))
|
|
arg.is_vector = True
|
|
|
|
elif arg.flag_indicator:
|
|
# Calculate the flags with those items which are not None
|
|
builder.writeln('# Calculate the flags. This equals to those flag arguments which are NOT None')
|
|
builder.writeln('flags = 0')
|
|
for flag in args:
|
|
if flag.is_flag:
|
|
builder.writeln('flags |= (1 << {}) if {} else 0'
|
|
.format(flag.flag_index, 'self.{}'.format(flag.name)))
|
|
|
|
builder.writeln('writer.write_int(flags)')
|
|
builder.writeln()
|
|
|
|
elif 'int' == arg.type:
|
|
builder.writeln('writer.write_int({})'.format(name))
|
|
|
|
elif 'long' == arg.type:
|
|
builder.writeln('writer.write_long({})'.format(name))
|
|
|
|
elif 'int128' == arg.type:
|
|
builder.writeln('writer.write_large_int({}, bits=128)'.format(name))
|
|
|
|
elif 'int256' == arg.type:
|
|
builder.writeln('writer.write_large_int({}, bits=256)'.format(name))
|
|
|
|
elif 'double' == arg.type:
|
|
builder.writeln('writer.write_double({})'.format(name))
|
|
|
|
elif 'string' == arg.type:
|
|
builder.writeln('writer.tgwrite_string({})'.format(name))
|
|
|
|
elif 'Bool' == arg.type:
|
|
builder.writeln('writer.tgwrite_bool({})'.format(name))
|
|
|
|
elif 'true' == arg.type: # Awkwardly enough, Telegram has both bool and "true", used in flags
|
|
pass # These are actually NOT written! Only used for flags
|
|
|
|
elif 'bytes' == arg.type:
|
|
builder.writeln('writer.write({})'.format(name))
|
|
|
|
else:
|
|
# Else it may be a custom type
|
|
builder.writeln('{}.on_send(writer)'.format(name))
|
|
|
|
# End vector and flag blocks if required (if we opened them before)
|
|
if arg.is_vector:
|
|
builder.end_block()
|
|
|
|
if arg.is_flag:
|
|
builder.end_block()
|
|
|
|
|
|
def write_onresponse_code(builder, arg, args, name=None):
|
|
"""
|
|
Writes the receive code for the given argument
|
|
|
|
:param builder: The source code builder
|
|
:param arg: The argument to write
|
|
:param args: All the other arguments in TLObject same on_send. This is required to determine the flags value
|
|
:param name: The name of the argument. Defaults to «self.argname»
|
|
This argument is an option because it's required when writing Vectors<>
|
|
"""
|
|
|
|
if arg.generic_definition:
|
|
return # Do nothing, this only specifies a later type
|
|
|
|
if name is None:
|
|
name = 'self.{}'.format(arg.name)
|
|
|
|
# The argument may be a flag, only write that flag was given!
|
|
was_flag = False
|
|
if arg.is_flag:
|
|
was_flag = True
|
|
builder.writeln('if (flags & (1 << {})) != 0:'.format(arg.flag_index))
|
|
# Temporary disable .is_flag not to enter this if again when calling the method recursively
|
|
arg.is_flag = False
|
|
|
|
if arg.is_vector:
|
|
builder.writeln("reader.read_int() # Vector's constructor ID")
|
|
builder.writeln('{} = [] # Initialize an empty list'.format(name))
|
|
builder.writeln('{}_len = reader.read_int()'.format(arg.name))
|
|
builder.writeln('for _ in range({}_len):'.format(arg.name))
|
|
# Temporary disable .is_vector, not to enter this if again
|
|
arg.is_vector = False
|
|
write_onresponse_code(builder, arg, args, name='{}_item'.format(arg.name))
|
|
builder.writeln('{}.append({}_item)'.format(name, arg.name))
|
|
arg.is_vector = True
|
|
|
|
elif arg.flag_indicator:
|
|
# Read the flags, which will indicate what items we should read next
|
|
builder.writeln('flags = reader.read_int()')
|
|
builder.writeln()
|
|
|
|
elif 'int' == arg.type:
|
|
builder.writeln('{} = reader.read_int()'.format(name))
|
|
|
|
elif 'long' == arg.type:
|
|
builder.writeln('{} = reader.read_long()'.format(name))
|
|
|
|
elif 'int128' == arg.type:
|
|
builder.writeln('{} = reader.read_large_int(bits=128)'.format(name))
|
|
|
|
elif 'int256' == arg.type:
|
|
builder.writeln('{} = reader.read_large_int(bits=256)'.format(name))
|
|
|
|
elif 'double' == arg.type:
|
|
builder.writeln('{} = reader.read_double()'.format(name))
|
|
|
|
elif 'string' == arg.type:
|
|
builder.writeln('{} = reader.tgread_string()'.format(name))
|
|
|
|
elif 'Bool' == arg.type:
|
|
builder.writeln('{} = reader.tgread_bool()'.format(name))
|
|
|
|
elif 'true' == arg.type: # Awkwardly enough, Telegram has both bool and "true", used in flags
|
|
builder.writeln('{} = True # Arbitrary not-None value, no need to read since it is a flag'.format(name))
|
|
|
|
elif 'bytes' == arg.type:
|
|
builder.writeln('{} = reader.tgread_bytes()'.format(name))
|
|
|
|
else:
|
|
# Else it may be a custom type
|
|
builder.writeln('{} = reader.tgread_object()'.format(name))
|
|
|
|
# End vector and flag blocks if required (if we opened them before)
|
|
if arg.is_vector:
|
|
builder.end_block()
|
|
|
|
if was_flag:
|
|
builder.end_block()
|
|
# Restore .is_flag
|
|
arg.is_flag = True
|
|
|
|
if __name__ == '__main__':
|
|
if tlobjects_exist():
|
|
print('Detected previous TLObjects. Cleaning...')
|
|
clean_tlobjects()
|
|
|
|
print('Generating TLObjects...')
|
|
generate_tlobjects('scheme.tl')
|
|
print('Done.')
|