import os import re import shutil from parser import SourceBuilder, TLParser def get_output_path(normal_path): return os.path.join('../telethon/tl', normal_path) class TLGenerator: @staticmethod def tlobjects_exist(): """Determines whether the TLObjects were previously generated (hence exist) or not""" return os.path.isfile(get_output_path('all_tlobjects.py')) @staticmethod def clean_tlobjects(): """Cleans the automatically generated TLObjects from disk""" if os.path.isdir(get_output_path('functions')): shutil.rmtree(get_output_path('functions')) if os.path.isdir(get_output_path('types')): shutil.rmtree(get_output_path('types')) if os.path.isfile(get_output_path('all_tlobjects.py')): os.remove(get_output_path('all_tlobjects.py')) @staticmethod def generate_tlobjects(scheme_file): """Generates all the TLObjects from scheme.tl to tl/functions and tl/types""" # First ensure that the required parent directories exist os.makedirs(get_output_path('functions'), exist_ok=True) os.makedirs(get_output_path('types'), exist_ok=True) # Store the parsed file in a tuple for iterating it more than once tlobjects = tuple(TLParser.parse_file(scheme_file)) for tlobject in tlobjects: # Determine the output directory and create it out_dir = get_output_path('functions' if tlobject.is_function else 'types') if tlobject.namespace: out_dir = os.path.join(out_dir, tlobject.namespace) os.makedirs(out_dir, exist_ok=True) # Also add this object to __init__.py, so we can import the whole packet at once init_py = os.path.join(out_dir, '__init__.py') with open(init_py, 'a', encoding='utf-8') as file: with SourceBuilder(file) as builder: builder.writeln('from {} import {}'.format( TLGenerator.get_full_file_name(tlobject), TLGenerator.get_class_name(tlobject))) # Create the file for this TLObject filename = os.path.join(out_dir, TLGenerator.get_file_name(tlobject, add_extension=True)) with open(filename, 'w', encoding='utf-8') as file: # Let's build the source code! with SourceBuilder(file) as builder: # Both types and functions inherit from MTProtoRequest so they all can be sent builder.writeln('from telethon.tl.mtproto_request import MTProtoRequest') builder.writeln() builder.writeln() builder.writeln('class {}(MTProtoRequest):'.format(TLGenerator.get_class_name(tlobject))) # Write the original .tl definition, along with a "generated automatically" message builder.writeln('"""Class generated by TLObjects\' generator. ' 'All changes will be ERASED. Original .tl definition below.') builder.writeln('{}"""'.format(repr(tlobject))) builder.writeln() # First sort the arguments so that those not being a flag come first args = sorted([arg for arg in tlobject.args if not arg.flag_indicator], key=lambda x: x.is_flag) # Then convert the args to string parameters, the flags having =None args = [(arg.name if not arg.is_flag else '{}=None'.format(arg.name)) for arg in args if not arg.flag_indicator and not arg.generic_definition] # Write the __init__ function if args: builder.writeln('def __init__(self, {}):'.format(', '.join(args))) else: builder.writeln('def __init__(self):') # Now update args to have the TLObject arguments, _except_ # those which are generated automatically: flag indicator and generic definitions. # We don't need the generic definitions in Python because arguments can be any type args = [arg for arg in tlobject.args if not arg.flag_indicator and not arg.generic_definition] if args: # Write the docstring, so we know the type of the arguments builder.writeln('"""') for arg in args: if not arg.flag_indicator: builder.write(':param {}: Telegram type: «{}».'.format(arg.name, arg.type)) if arg.is_vector: builder.write(' Must be a list.'.format(arg.name)) if arg.is_generic: builder.write(' This should be another MTProtoRequest.') builder.writeln() builder.writeln('"""') builder.writeln('super().__init__()') # Functions have a result object and are confirmed by default if tlobject.is_function: builder.writeln('self.result = None') builder.writeln('self.confirmed = True # Confirmed by default') # Create an attribute that stores the TLObject's constructor ID builder.writeln('self.constructor_id = {}'.format(hex(tlobject.id))) # Set the arguments if args: # Leave an empty line if there are any args builder.writeln() for arg in args: builder.writeln('self.{0} = {0}'.format(arg.name)) builder.end_block() # Write the on_send(self, writer) function builder.writeln('def on_send(self, writer):') builder.writeln('writer.write_int(self.constructor_id, signed=False)' .format(hex(tlobject.id), tlobject.name)) for arg in tlobject.args: TLGenerator.write_onsend_code(builder, arg, tlobject.args) builder.end_block() # Write the empty() function, which returns an "empty" # instance, in which all attributes are set to None builder.writeln('@staticmethod') builder.writeln('def empty():') builder.writeln('"""Returns an "empty" instance (all attributes are None)"""') builder.writeln('return {}({})'.format( TLGenerator.get_class_name(tlobject), ', '.join('None' for _ in range(len(args))) )) builder.end_block() # Write the on_response(self, reader) function builder.writeln('def on_response(self, reader):') # Do not read constructor's ID, since that's already been read somewhere else if tlobject.is_function: builder.writeln('self.result = reader.tgread_object()') else: if tlobject.args: for arg in tlobject.args: TLGenerator.write_onresponse_code(builder, arg, tlobject.args) else: # If there were no arguments, we still need an on_response method, and hence "pass" if empty builder.writeln('pass') builder.end_block() # Write the __repr__(self) and __str__(self) functions builder.writeln('def __repr__(self):') builder.writeln("return '{}'".format(repr(tlobject))) builder.end_block() builder.writeln('def __str__(self):') builder.writeln("return {}".format(str(tlobject))) # builder.end_block() # There is no need to end the last block # Once all the objects have been generated, we can now group them in a single file filename = os.path.join(get_output_path('all_tlobjects.py')) with open(filename, 'w', encoding='utf-8') as file: with SourceBuilder(file) as builder: builder.writeln('"""File generated by TLObjects\' generator. All changes will be ERASED"""') builder.writeln() # First add imports for tlobject in tlobjects: builder.writeln('import {}'.format(TLGenerator.get_full_file_name(tlobject))) builder.writeln() # Create a variable to indicate which layer this is builder.writeln('layer = {} # Current generated layer'.format(TLParser.find_layer(scheme_file))) builder.writeln() # Then create the dictionary containing constructor_id: class builder.writeln('tlobjects = {') builder.current_indent += 1 # Fill the dictionary (0x1a2b3c4f: tl.full.type.path.Class) for tlobject in tlobjects: builder.writeln('{}: {}.{},' .format(hex(tlobject.id), TLGenerator.get_full_file_name(tlobject), TLGenerator.get_class_name(tlobject))) builder.current_indent -= 1 builder.writeln('}') @staticmethod def get_class_name(tlobject): """Gets the class name following the Python style guidelines, in ThisClassFormat""" # Courtesy of http://stackoverflow.com/a/31531797/4759433 # Also, '_' could be replaced for ' ', then use .title(), and then remove ' ' result = re.sub(r'_([a-z])', lambda m: m.group(1).upper(), tlobject.name) result = result[:1].upper() + result[1:].replace('_', '') # Replace again to fully ensure! # If it's a function, let it end with "Request" to identify them more easily if tlobject.is_function: result += 'Request' return result @staticmethod def get_full_file_name(tlobject): """Gets the full file name for the given TLObject (tl.type.full.path)""" fullname = TLGenerator.get_file_name(tlobject, add_extension=False) if tlobject.namespace: fullname = '{}.{}'.format(tlobject.namespace, fullname) if tlobject.is_function: return 'telethon.tl.functions.{}'.format(fullname) else: return 'telethon.tl.types.{}'.format(fullname) @staticmethod def get_file_name(tlobject, add_extension): """Gets the file name in file_name_format.py for the given TLObject""" # Courtesy of http://stackoverflow.com/a/1176023/4759433 s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', tlobject.name) result = re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower() if add_extension: return result + '.py' else: return result @staticmethod def write_onsend_code(builder, arg, args, name=None): """ Writes the write code for the given argument :param builder: The source code builder :param arg: The argument to write :param args: All the other arguments in TLObject same on_send. This is required to determine the flags value :param name: The name of the argument. Defaults to «self.argname» This argument is an option because it's required when writing Vectors<> """ if arg.generic_definition: return # Do nothing, this only specifies a later type if name is None: name = 'self.{}'.format(arg.name) # The argument may be a flag, only write if it's not None AND if it's not a True type # True types are not actually sent, but instead only used to determine the flags if arg.is_flag: if arg.type == 'true': return # Exit, since True type is never written else: builder.writeln('if {}:'.format(name)) if arg.is_vector: builder.writeln("writer.write_int(0x1cb5c415, signed=False) # Vector's constructor ID") builder.writeln('writer.write_int(len({}))'.format(name)) builder.writeln('for {}_item in {}:'.format(arg.name, name)) # Temporary disable .is_vector, not to enter this if again arg.is_vector = False TLGenerator.write_onsend_code(builder, arg, args, name='{}_item'.format(arg.name)) arg.is_vector = True elif arg.flag_indicator: # Calculate the flags with those items which are not None builder.writeln('# Calculate the flags. This equals to those flag arguments which are NOT None') builder.writeln('flags = 0') for flag in args: if flag.is_flag: builder.writeln('flags |= (1 << {}) if {} else 0' .format(flag.flag_index, 'self.{}'.format(flag.name))) builder.writeln('writer.write_int(flags)') builder.writeln() elif 'int' == arg.type: builder.writeln('writer.write_int({})'.format(name)) elif 'long' == arg.type: builder.writeln('writer.write_long({})'.format(name)) elif 'int128' == arg.type: builder.writeln('writer.write_large_int({}, bits=128)'.format(name)) elif 'int256' == arg.type: builder.writeln('writer.write_large_int({}, bits=256)'.format(name)) elif 'double' == arg.type: builder.writeln('writer.write_double({})'.format(name)) elif 'string' == arg.type: builder.writeln('writer.tgwrite_string({})'.format(name)) elif 'Bool' == arg.type: builder.writeln('writer.tgwrite_bool({})'.format(name)) elif 'true' == arg.type: # Awkwardly enough, Telegram has both bool and "true", used in flags pass # These are actually NOT written! Only used for flags elif 'bytes' == arg.type: builder.writeln('writer.tgwrite_bytes({})'.format(name)) elif 'date' == arg.type: # Custom format builder.writeln('writer.tgwrite_date({})'.format(name)) else: # Else it may be a custom type builder.writeln('{}.on_send(writer)'.format(name)) # End vector and flag blocks if required (if we opened them before) if arg.is_vector: builder.end_block() if arg.is_flag: builder.end_block() @staticmethod def write_onresponse_code(builder, arg, args, name=None): """ Writes the receive code for the given argument :param builder: The source code builder :param arg: The argument to write :param args: All the other arguments in TLObject same on_send. This is required to determine the flags value :param name: The name of the argument. Defaults to «self.argname» This argument is an option because it's required when writing Vectors<> """ if arg.generic_definition: return # Do nothing, this only specifies a later type if name is None: name = 'self.{}'.format(arg.name) # The argument may be a flag, only write that flag was given! was_flag = False if arg.is_flag: was_flag = True builder.writeln('if (flags & (1 << {})) != 0:'.format(arg.flag_index)) # Temporary disable .is_flag not to enter this if again when calling the method recursively arg.is_flag = False if arg.is_vector: builder.writeln("reader.read_int() # Vector's constructor ID") builder.writeln('{} = [] # Initialize an empty list'.format(name)) builder.writeln('{}_len = reader.read_int()'.format(arg.name)) builder.writeln('for _ in range({}_len):'.format(arg.name)) # Temporary disable .is_vector, not to enter this if again arg.is_vector = False TLGenerator.write_onresponse_code(builder, arg, args, name='{}_item'.format(arg.name)) builder.writeln('{}.append({}_item)'.format(name, arg.name)) arg.is_vector = True elif arg.flag_indicator: # Read the flags, which will indicate what items we should read next builder.writeln('flags = reader.read_int()') builder.writeln() elif 'int' == arg.type: builder.writeln('{} = reader.read_int()'.format(name)) elif 'long' == arg.type: builder.writeln('{} = reader.read_long()'.format(name)) elif 'int128' == arg.type: builder.writeln('{} = reader.read_large_int(bits=128)'.format(name)) elif 'int256' == arg.type: builder.writeln('{} = reader.read_large_int(bits=256)'.format(name)) elif 'double' == arg.type: builder.writeln('{} = reader.read_double()'.format(name)) elif 'string' == arg.type: builder.writeln('{} = reader.tgread_string()'.format(name)) elif 'Bool' == arg.type: builder.writeln('{} = reader.tgread_bool()'.format(name)) elif 'true' == arg.type: # Awkwardly enough, Telegram has both bool and "true", used in flags builder.writeln('{} = True # Arbitrary not-None value, no need to read since it is a flag'.format(name)) elif 'bytes' == arg.type: builder.writeln('{} = reader.tgread_bytes()'.format(name)) elif 'date' == arg.type: # Custom format builder.writeln('{} = reader.tgread_date()'.format(name)) else: # Else it may be a custom type builder.writeln('{} = reader.tgread_object()'.format(name)) # End vector and flag blocks if required (if we opened them before) if arg.is_vector: builder.end_block() if was_flag: builder.end_block() # Restore .is_flag arg.is_flag = True if __name__ == '__main__': if TLGenerator.tlobjects_exist(): print('Detected previous TLObjects. Cleaning...') TLGenerator.clean_tlobjects() print('Generating TLObjects...') TLGenerator.generate_tlobjects('scheme.tl') print('Done.')