import os import re import shutil from zlib import crc32 from collections import defaultdict from .parser import SourceBuilder, TLParser AUTO_GEN_NOTICE = \ '"""File generated by TLObjects\' generator. All changes will be ERASED"""' class TLGenerator: def __init__(self, output_dir): self.output_dir = output_dir def _get_file(self, *paths): return os.path.join(self.output_dir, *paths) def _rm_if_exists(self, filename): file = self._get_file(filename) if os.path.exists(file): if os.path.isdir(file): shutil.rmtree(file) else: os.remove(file) def tlobjects_exist(self): """Determines whether the TLObjects were previously generated (hence exist) or not """ return os.path.isfile(self._get_file('all_tlobjects.py')) def clean_tlobjects(self): """Cleans the automatically generated TLObjects from disk""" for name in ('functions', 'types', 'all_tlobjects.py'): self._rm_if_exists(name) def generate_tlobjects(self, scheme_file, import_depth): """Generates all the TLObjects from scheme.tl to tl/functions and tl/types """ # First ensure that the required parent directories exist os.makedirs(self._get_file('functions'), exist_ok=True) os.makedirs(self._get_file('types'), exist_ok=True) # Step 0: Cache the parsed file on a tuple tlobjects = tuple(TLParser.parse_file(scheme_file, ignore_core=True)) # Step 1: Group everything by {namespace: [tlobjects]} so we can # easily generate __init__.py files with all the TLObjects on them. namespace_functions = defaultdict(list) namespace_types = defaultdict(list) # Make use of this iteration to also store 'Type: [Constructors]', # used when generating the documentation for the classes. type_constructors = defaultdict(list) for tlobject in tlobjects: if tlobject.is_function: namespace_functions[tlobject.namespace].append(tlobject) else: namespace_types[tlobject.namespace].append(tlobject) type_constructors[tlobject.result].append(tlobject) # Step 2: Generate the actual code self._write_init_py( self._get_file('functions'), import_depth, namespace_functions, type_constructors ) self._write_init_py( self._get_file('types'), import_depth, namespace_types, type_constructors ) # Step 4: Once all the objects have been generated, # we can now group them in a single file filename = os.path.join(self._get_file('all_tlobjects.py')) with open(filename, 'w', encoding='utf-8') as file: with SourceBuilder(file) as builder: builder.writeln(AUTO_GEN_NOTICE) builder.writeln() builder.writeln('from . import types, functions') builder.writeln() # Create a variable to indicate which layer this is builder.writeln('layer = {} # Current generated layer'.format( TLParser.find_layer(scheme_file))) builder.writeln() # Then create the dictionary containing constructor_id: class builder.writeln('tlobjects = {') builder.current_indent += 1 # Fill the dictionary (0x1a2b3c4f: tl.full.type.path.Class) for tlobject in tlobjects: constructor = hex(tlobject.id) if len(constructor) != 10: # Make it a nice length 10 so it fits well constructor = '0x' + constructor[2:].zfill(8) builder.write('{}: '.format(constructor)) builder.write( 'functions' if tlobject.is_function else 'types') if tlobject.namespace: builder.write('.' + tlobject.namespace) builder.writeln('.{},'.format( TLGenerator.get_class_name(tlobject))) builder.current_indent -= 1 builder.writeln('}') @staticmethod def _write_init_py(out_dir, depth, namespace_tlobjects, type_constructors): # namespace_tlobjects: {'namespace', [TLObject]} os.makedirs(out_dir, exist_ok=True) for ns, tlobjects in namespace_tlobjects.items(): file = os.path.join(out_dir, ns + '.py' if ns else '__init__.py') with open(file, 'w', encoding='utf-8') as f, \ SourceBuilder(f) as builder: builder.writeln(AUTO_GEN_NOTICE) # Both types and functions inherit from the TLObject class # so they all can be serialized and sent, however, only the # functions are "content_related". builder.writeln( 'from {}.tl.tlobject import TLObject'.format('.' * depth) ) # Add the relative imports to the namespaces, # unless we already are in a namespace. if not ns: builder.writeln('from . import {}'.format(', '.join( x for x in namespace_tlobjects.keys() if x ))) # Generate the class for every TLObject for t in sorted(tlobjects, key=lambda x: x.name): TLGenerator._write_source_code( t, builder, depth, type_constructors ) while builder.current_indent != 0: builder.end_block() @staticmethod def _write_source_code(tlobject, builder, depth, type_constructors): """Writes the source code corresponding to the given TLObject by making use of the 'builder' SourceBuilder. Additional information such as file path depth and the Type: [Constructors] must be given for proper importing and documentation strings. """ if tlobject.is_function: util_imports = set() for a in tlobject.args: # We can automatically convert some "full" types to # "input only" (like User -> InputPeerUser, etc.) if a.type == 'InputPeer': util_imports.add('get_input_peer') elif a.type == 'InputChannel': util_imports.add('get_input_channel') elif a.type == 'InputUser': util_imports.add('get_input_user') if util_imports: builder.writeln('from {}.utils import {}'.format( '.' * depth, ', '.join(util_imports))) if any(a for a in tlobject.args if a.can_be_inferred): # Currently only 'random_id' needs 'os' to be imported builder.writeln('import os') builder.writeln() builder.writeln() builder.writeln('class {}(TLObject):'.format( TLGenerator.get_class_name(tlobject))) # Class-level variable to store its Telegram's constructor ID builder.writeln('constructor_id = {}'.format(hex(tlobject.id))) builder.writeln('subclass_of_id = {}'.format( hex(crc32(tlobject.result.encode('ascii'))))) builder.writeln() # Flag arguments must go last args = [ a for a in tlobject.sorted_args() if not a.flag_indicator and not a.generic_definition ] # Convert the args to string parameters, flags having =None args = [ (a.name if not a.is_flag and not a.can_be_inferred else '{}=None'.format(a.name)) for a in args ] # Write the __init__ function if args: builder.writeln( 'def __init__(self, {}):'.format(', '.join(args)) ) else: builder.writeln('def __init__(self):') # Now update args to have the TLObject arguments, _except_ # those which are calculated on send or ignored, this is # flag indicator and generic definitions. # # We don't need the generic definitions in Python # because arguments can be any type args = [arg for arg in tlobject.args if not arg.flag_indicator and not arg.generic_definition] if args: # Write the docstring, to know the type of the args builder.writeln('"""') for arg in args: if not arg.flag_indicator: builder.write( ':param {}: Telegram type: "{}".' .format(arg.name, arg.type) ) if arg.is_vector: builder.write(' Must be a list.'.format(arg.name)) if arg.is_generic: builder.write(' Must be another TLObject request.') builder.writeln() # We also want to know what type this request returns # or to which type this constructor belongs to builder.writeln() if tlobject.is_function: builder.write(':returns {}: '.format(tlobject.result)) else: builder.write('Constructor for {}: '.format(tlobject.result)) constructors = type_constructors[tlobject.result] if not constructors: builder.writeln('This type has no constructors.') elif len(constructors) == 1: builder.writeln('Instance of {}.'.format( TLGenerator.get_class_name(constructors[0]) )) else: builder.writeln('Instance of either {}.'.format( ', '.join(TLGenerator.get_class_name(c) for c in constructors) )) builder.writeln('"""') builder.writeln('super().__init__()') # Functions have a result object and are confirmed by default if tlobject.is_function: builder.writeln('self.result = None') builder.writeln( 'self.content_related = True') # Set the arguments if args: # Leave an empty line if there are any args builder.writeln() for arg in args: TLGenerator._write_self_assigns(builder, tlobject, arg, args) builder.end_block() # Write the to_dict(self) method if args: builder.writeln('def to_dict(self):') builder.writeln('return {') builder.current_indent += 1 base_types = ('string', 'bytes', 'int', 'long', 'int128', 'int256', 'double', 'Bool', 'true', 'date') for arg in args: builder.write("'{}': ".format(arg.name)) if arg.type in base_types: if arg.is_vector: builder.write( '[] if self.{0} is None else self.{0}[:]' .format(arg.name) ) else: builder.write('self.{}'.format(arg.name)) else: if arg.is_vector: builder.write( '[] if self.{0} is None else [None ' 'if x is None else x.to_dict() for x in self.{0}]' .format(arg.name) ) else: builder.write( 'None if self.{0} is None else self.{0}.to_dict()' .format(arg.name) ) builder.writeln(',') builder.current_indent -= 1 builder.writeln("}") else: builder.writeln('@staticmethod') builder.writeln('def to_dict():') builder.writeln('return {}') builder.end_block() # Write the on_send(self, writer) function builder.writeln('def on_send(self, writer):') builder.writeln( 'writer.write_int({}.constructor_id, signed=False)' .format(TLGenerator.get_class_name(tlobject))) for arg in tlobject.args: TLGenerator.write_onsend_code(builder, arg, tlobject.args) builder.end_block() # Write the empty() function, which returns an "empty" # instance, in which all attributes are set to None builder.writeln('@staticmethod') builder.writeln('def empty():') builder.writeln('return {}({})'.format( TLGenerator.get_class_name(tlobject), ', '.join( 'None' for _ in range(len(args))))) builder.end_block() # Write the on_response(self, reader) function builder.writeln('def on_response(self, reader):') # Do not read constructor's ID, since # that's already been read somewhere else if tlobject.is_function: TLGenerator.write_request_result_code(builder, tlobject) else: if tlobject.args: for arg in tlobject.args: TLGenerator.write_onresponse_code( builder, arg, tlobject.args) else: # If there were no arguments, we still need an # on_response method, and hence "pass" if empty builder.writeln('pass') builder.end_block() # Write the __repr__(self) and __str__(self) functions builder.writeln('def __repr__(self):') builder.writeln("return '{}'".format(repr(tlobject))) builder.end_block() builder.writeln('def __str__(self):') builder.writeln('return TLObject.pretty_format(self)') builder.end_block() builder.writeln('def stringify(self):') builder.writeln('return TLObject.pretty_format(self, indent=0)') # builder.end_block() # No need to end the last block @staticmethod def _write_self_assigns(builder, tlobject, arg, args): if arg.can_be_inferred: # Currently the only argument that can be # inferred are those called 'random_id' if arg.name == 'random_id': # Endianness doesn't really matter, and 'big' is shorter code = "int.from_bytes(os.urandom({}), 'big', signed=True)"\ .format(8 if arg.type == 'long' else 4) if arg.is_vector: # Currently for the case of "messages.forwardMessages" # Ensure we can infer the length from id:Vector<> if not next(a for a in args if a.name == 'id').is_vector: raise ValueError( 'Cannot infer list of random ids for ', tlobject ) code = '[{} for _ in range(len(id))]'.format(code) builder.writeln( "self.random_id = random_id if random_id " "is not None else {}".format(code) ) else: raise ValueError('Cannot infer a value for ', arg) # Well-known cases, auto-cast it to the right type elif arg.type == 'InputPeer' and tlobject.is_function: TLGenerator.write_get_input(builder, arg, 'get_input_peer') elif arg.type == 'InputChannel' and tlobject.is_function: TLGenerator.write_get_input(builder, arg, 'get_input_channel') elif arg.type == 'InputUser' and tlobject.is_function: TLGenerator.write_get_input(builder, arg, 'get_input_user') else: builder.writeln('self.{0} = {0}'.format(arg.name)) @staticmethod def write_get_input(builder, arg, get_input_code): """Returns "True" if the get_input_* code was written when assigning a parameter upon creating the request. Returns False otherwise """ if arg.is_vector: builder.writeln( 'self.{0} = [{1}(_x) for _x in {0}]' .format(arg.name, get_input_code) ) pass else: builder.writeln( 'self.{0} = {1}({0})'.format(arg.name, get_input_code) ) @staticmethod def get_class_name(tlobject): """Gets the class name following the Python style guidelines""" # Courtesy of http://stackoverflow.com/a/31531797/4759433 result = re.sub(r'_([a-z])', lambda m: m.group(1).upper(), tlobject.name) result = result[:1].upper() + result[1:].replace( '_', '') # Replace again to fully ensure! # If it's a function, let it end with "Request" to identify them if tlobject.is_function: result += 'Request' return result @staticmethod def get_file_name(tlobject, add_extension=False): """Gets the file name in file_name_format.py for the given TLObject""" # Courtesy of http://stackoverflow.com/a/1176023/4759433 s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', tlobject.name) result = re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower() if add_extension: return result + '.py' else: return result @staticmethod def write_onsend_code(builder, arg, args, name=None): """ Writes the write code for the given argument :param builder: The source code builder :param arg: The argument to write :param args: All the other arguments in TLObject same on_send. This is required to determine the flags value :param name: The name of the argument. Defaults to "self.argname" This argument is an option because it's required when writing Vectors<> """ if arg.generic_definition: return # Do nothing, this only specifies a later type if name is None: name = 'self.{}'.format(arg.name) # The argument may be a flag, only write if it's not None AND # if it's not a True type. # True types are not actually sent, but instead only used to # determine the flags. if arg.is_flag: if arg.type == 'true': return # Exit, since True type is never written else: builder.writeln('if {}:'.format(name)) if arg.is_vector: if arg.use_vector_id: builder.writeln('writer.write_int(0x1cb5c415, signed=False)') builder.writeln('writer.write_int(len({}))'.format(name)) builder.writeln('for _x in {}:'.format(name)) # Temporary disable .is_vector, not to enter this if again arg.is_vector = False TLGenerator.write_onsend_code(builder, arg, args, name='_x') arg.is_vector = True elif arg.flag_indicator: # Calculate the flags with those items which are not None builder.writeln('flags = 0') for flag in args: if flag.is_flag: builder.writeln('flags |= (1 << {}) if {} else 0'.format( flag.flag_index, 'self.{}'.format(flag.name))) builder.writeln('writer.write_int(flags)') builder.writeln() elif 'int' == arg.type: builder.writeln('writer.write_int({})'.format(name)) elif 'long' == arg.type: builder.writeln('writer.write_long({})'.format(name)) elif 'int128' == arg.type: builder.writeln('writer.write_large_int({}, bits=128)'.format( name)) elif 'int256' == arg.type: builder.writeln('writer.write_large_int({}, bits=256)'.format( name)) elif 'double' == arg.type: builder.writeln('writer.write_double({})'.format(name)) elif 'string' == arg.type: builder.writeln('writer.tgwrite_string({})'.format(name)) elif 'Bool' == arg.type: builder.writeln('writer.tgwrite_bool({})'.format(name)) elif 'true' == arg.type: pass # These are actually NOT written! Only used for flags elif 'bytes' == arg.type: builder.writeln('writer.tgwrite_bytes({})'.format(name)) elif 'date' == arg.type: # Custom format builder.writeln('writer.tgwrite_date({})'.format(name)) else: # Else it may be a custom type builder.writeln('{}.on_send(writer)'.format(name)) # End vector and flag blocks if required (if we opened them before) if arg.is_vector: builder.end_block() if arg.is_flag: builder.end_block() @staticmethod def write_onresponse_code(builder, arg, args, name=None): """ Writes the receive code for the given argument :param builder: The source code builder :param arg: The argument to write :param args: All the other arguments in TLObject same on_send. This is required to determine the flags value :param name: The name of the argument. Defaults to "self.argname" This argument is an option because it's required when writing Vectors<> """ if arg.generic_definition: return # Do nothing, this only specifies a later type if name is None: name = 'self.{}'.format(arg.name) # The argument may be a flag, only write that flag was given! was_flag = False if arg.is_flag: was_flag = True builder.writeln('if (flags & (1 << {})) != 0:'.format( arg.flag_index )) # Temporary disable .is_flag not to enter this if # again when calling the method recursively arg.is_flag = False if arg.is_vector: if arg.use_vector_id: # We have to read the vector's constructor ID builder.writeln("reader.read_int()") builder.writeln('{} = []'.format(name)) builder.writeln('_len = reader.read_int()') builder.writeln('for _ in range(_len):') # Temporary disable .is_vector, not to enter this if again arg.is_vector = False TLGenerator.write_onresponse_code(builder, arg, args, name='_x') builder.writeln('{}.append(_x)'.format(name)) arg.is_vector = True elif arg.flag_indicator: # Read the flags, which will indicate what items we should read next builder.writeln('flags = reader.read_int()') builder.writeln() elif 'int' == arg.type: builder.writeln('{} = reader.read_int()'.format(name)) elif 'long' == arg.type: builder.writeln('{} = reader.read_long()'.format(name)) elif 'int128' == arg.type: builder.writeln( '{} = reader.read_large_int(bits=128)'.format(name) ) elif 'int256' == arg.type: builder.writeln( '{} = reader.read_large_int(bits=256)'.format(name) ) elif 'double' == arg.type: builder.writeln('{} = reader.read_double()'.format(name)) elif 'string' == arg.type: builder.writeln('{} = reader.tgread_string()'.format(name)) elif 'Bool' == arg.type: builder.writeln('{} = reader.tgread_bool()'.format(name)) elif 'true' == arg.type: # Arbitrary not-None value, don't actually read "true" flags builder.writeln('{} = True'.format(name)) elif 'bytes' == arg.type: builder.writeln('{} = reader.tgread_bytes()'.format(name)) elif 'date' == arg.type: # Custom format builder.writeln('{} = reader.tgread_date()'.format(name)) else: # Else it may be a custom type builder.writeln('{} = reader.tgread_object()'.format(name)) # End vector and flag blocks if required (if we opened them before) if arg.is_vector: builder.end_block() if was_flag: builder.end_block() # Restore .is_flag arg.is_flag = True @staticmethod def write_request_result_code(builder, tlobject): """ Writes the receive code for the given function :param builder: The source code builder :param tlobject: The TLObject for which the 'self.result = ' will be written """ if tlobject.result.startswith('Vector<'): # Vector results are a bit special since they can also be composed # of integer values and such; however, the result of requests is # not parsed as arguments are and it's a bit harder to tell which # is which. if tlobject.result == 'Vector': builder.writeln('reader.read_int() # Vector id') builder.writeln('count = reader.read_int()') builder.writeln( 'self.result = [reader.read_int() for _ in range(count)]' ) elif tlobject.result == 'Vector': builder.writeln('reader.read_int() # Vector id') builder.writeln('count = reader.read_long()') builder.writeln( 'self.result = [reader.read_long() for _ in range(count)]' ) else: builder.writeln('self.result = reader.tgread_vector()') else: builder.writeln('self.result = reader.tgread_object()')