import functools import os import re import shutil import struct from collections import defaultdict from zlib import crc32 from ..sourcebuilder import SourceBuilder from ..utils import snake_to_camel_case AUTO_GEN_NOTICE = \ '"""File generated by TLObjects\' generator. All changes will be ERASED"""' AUTO_CASTS = { 'InputPeer': 'utils.get_input_peer(await client.get_input_entity({}))', 'InputChannel': 'utils.get_input_channel(await client.get_input_entity({}))', 'InputUser': 'utils.get_input_user(await client.get_input_entity({}))', 'InputDialogPeer': 'await client._get_input_dialog({})', 'InputNotifyPeer': 'await client._get_input_notify({})', 'InputMedia': 'utils.get_input_media({})', 'InputPhoto': 'utils.get_input_photo({})', 'InputMessage': 'utils.get_input_message({})', 'InputDocument': 'utils.get_input_document({})', 'InputChatPhoto': 'utils.get_input_chat_photo({})', 'InputGroupCall': 'utils.get_input_group_call({})', } NAMED_AUTO_CASTS = { ('chat_id', 'int'): 'await client.get_peer_id({})' } # Secret chats have a chat_id which may be negative. # With the named auto-cast above, we would break it. # However there are plenty of other legit requests # with `chat_id:int` where it is useful. # # NOTE: This works because the auto-cast is not recursive. # There are plenty of types that would break if we # did recurse into them to resolve them. NAMED_BLACKLIST = { 'messages.discardEncryption' } BASE_TYPES = ('string', 'bytes', 'int', 'long', 'int128', 'int256', 'double', 'Bool', 'true', 'date') def _write_modules( out_dir, in_mod, kind, namespace_tlobjects, type_constructors, layer, all_tlobjects): # namespace_tlobjects: {'namespace', [TLObject]} out_dir.mkdir(parents=True, exist_ok=True) for ns, tlobjects in namespace_tlobjects.items(): file = out_dir / '{}.py'.format(ns or '__init__') with file.open('w') as f, SourceBuilder(f) as builder: builder.writeln(AUTO_GEN_NOTICE) if kind == 'TLObject': builder.writeln('from .._misc.tlobject import TLObject, TLRequest') builder.writeln('from . import fn') else: builder.writeln('from ..._misc.tlobject import TLObject, TLRequest') builder.writeln('from typing import Optional, List, ' 'Union, TYPE_CHECKING') # Add the relative imports to the namespaces, # unless we already are in a namespace. if not ns: builder.writeln('from . import {}', ', '.join(sorted( x for x in namespace_tlobjects.keys() if x ))) # Import 'os' for those needing access to 'os.urandom()' # Currently only 'random_id' needs 'os' to be imported, # for all those TLObjects with arg.can_be_inferred. builder.writeln('import os') # Import struct for the .__bytes__(self) serialization builder.writeln('import struct') # Import datetime for type hinting builder.writeln('from datetime import datetime') tlobjects.sort(key=lambda x: x.name) type_names = set() type_defs = [] # Find all the types in this file and generate type definitions # based on the types. The type definitions are written to the # file at the end. for t in tlobjects: if not t.is_function: type_name = t.result if '.' in type_name: type_name = type_name[type_name.rindex('.'):] if type_name in type_names: continue type_names.add(type_name) constructors = type_constructors[type_name] if not constructors: pass elif len(constructors) == 1: type_defs.append('Type{} = {}'.format( type_name, constructors[0].class_name)) else: type_defs.append('Type{} = Union[{}]'.format( type_name, ','.join(c.class_name for c in constructors))) imports = {} primitives = {'int', 'long', 'int128', 'int256', 'double', 'string', 'date', 'bytes', 'Bool', 'true'} # Find all the types in other files that are used in this file # and generate the information required to import those types. for t in tlobjects: for arg in t.args: name = arg.type if not name or name in primitives: continue if kind == 'TLObject': import_space = '.' else: import_space = '..' if '.' in name: namespace = name.split('.')[0] name = name.split('.')[1] import_space += '.{}'.format(namespace) if name not in type_names: type_names.add(name) if name == 'date': imports['datetime'] = ['datetime'] continue elif import_space not in imports: imports[import_space] = set() imports[import_space].add('Type{}'.format(name)) # Add imports required for type checking if imports: builder.writeln('if TYPE_CHECKING:') for namespace, names in imports.items(): builder.writeln('from {} import {}', namespace, ', '.join(sorted(names))) builder.end_block() # Generate the class for every TLObject for t in tlobjects: _write_source_code(t, kind, builder, type_constructors) builder.current_indent = 0 # Write the type definitions generated earlier. builder.writeln() for line in type_defs: builder.writeln(line) if not ns and kind == 'TLObject': _write_all_tlobjects(all_tlobjects, layer, builder) def _write_source_code(tlobject, kind, builder, type_constructors): """ Writes the source code corresponding to the given TLObject by making use of the ``builder`` `SourceBuilder`. Additional information such as file path depth and the ``Type: [Constructors]`` must be given for proper importing and documentation strings. """ _write_class_init(tlobject, kind, type_constructors, builder) _write_resolve(tlobject, builder) _write_to_bytes(tlobject, builder) _write_from_reader(tlobject, builder) _write_read_result(tlobject, builder) def _write_class_init(tlobject, kind, type_constructors, builder): builder.writeln() builder.writeln() builder.writeln('class {}({}):', tlobject.class_name, kind) # Define slots to help reduce the size of the objects a little bit. # It's also good for knowing what fields an object has. builder.write('__slots__ = (') sep = '' for arg in tlobject.real_args: builder.write('{}{!r},', sep, arg.name) sep = ' ' builder.writeln(')') # Class-level variable to store its Telegram's constructor ID builder.writeln('CONSTRUCTOR_ID = {:#x}', tlobject.id) builder.writeln('SUBCLASS_OF_ID = {:#x}', crc32(tlobject.result.encode('ascii'))) builder.writeln() # Convert the args to string parameters, flags having =None args = ['{}: {}{}'.format( a.name, a.type_hint(), '=None' if a.is_flag or a.can_be_inferred else '') for a in tlobject.real_args ] # Write the __init__ function if it has any argument if not tlobject.real_args: return if any(a.name in __builtins__ for a in tlobject.real_args): builder.writeln('# noinspection PyShadowingBuiltins') builder.writeln("def __init__({}):", ', '.join(['self'] + args)) builder.writeln('"""') if tlobject.is_function: builder.write(':returns {}: ', tlobject.result) else: builder.write('Constructor for {}: ', tlobject.result) constructors = type_constructors[tlobject.result] if not constructors: builder.writeln('This type has no constructors.') elif len(constructors) == 1: builder.writeln('Instance of {}.', constructors[0].class_name) else: builder.writeln('Instance of either {}.', ', '.join( c.class_name for c in constructors)) builder.writeln('"""') # Set the arguments for arg in tlobject.real_args: if not arg.can_be_inferred: builder.writeln('self.{0} = {0}', arg.name) # Currently the only argument that can be # inferred are those called 'random_id' elif arg.name == 'random_id': # Endianness doesn't really matter, and 'big' is shorter code = "int.from_bytes(os.urandom({}), 'big', signed=True)" \ .format(8 if arg.type == 'long' else 4) if arg.is_vector: # Currently for the case of "messages.forwardMessages" # Ensure we can infer the length from id:Vector<> if not next(a for a in tlobject.real_args if a.name == 'id').is_vector: raise ValueError( 'Cannot infer list of random ids for ', tlobject ) code = '[{} for _ in range(len(id))]'.format(code) builder.writeln( "self.random_id = random_id if random_id " "is not None else {}", code ) else: raise ValueError('Cannot infer a value for ', arg) builder.end_block() def _write_resolve(tlobject, builder): if tlobject.is_function and any( (arg.type in AUTO_CASTS or ((arg.name, arg.type) in NAMED_AUTO_CASTS and tlobject.fullname not in NAMED_BLACKLIST)) for arg in tlobject.real_args ): builder.writeln('async def resolve(self, client, utils):') for arg in tlobject.real_args: ac = AUTO_CASTS.get(arg.type) if not ac: ac = NAMED_AUTO_CASTS.get((arg.name, arg.type)) if not ac: continue if arg.is_flag: builder.writeln('if self.{}:', arg.name) if arg.is_vector: builder.writeln('_tmp = []') builder.writeln('for _x in self.{0}:', arg.name) builder.writeln('_tmp.append({})', ac.format('_x')) builder.end_block() builder.writeln('self.{} = _tmp', arg.name) else: builder.writeln('self.{} = {}', arg.name, ac.format('self.' + arg.name)) if arg.is_flag: builder.end_block() builder.end_block() def _write_to_bytes(tlobject, builder): builder.writeln('def _bytes(self):') # Some objects require more than one flag parameter to be set # at the same time. In this case, add an assertion. repeated_args = defaultdict(list) for arg in tlobject.args: if arg.is_flag: repeated_args[arg.flag_index].append(arg) for ra in repeated_args.values(): if len(ra) > 1: cnd1 = ('(self.{0} or self.{0} is not None)' .format(a.name) for a in ra) cnd2 = ('(self.{0} is None or self.{0} is False)' .format(a.name) for a in ra) builder.writeln( "assert ({}) or ({}), '{} parameters must all " "be False-y (like None) or all me True-y'", ' and '.join(cnd1), ' and '.join(cnd2), ', '.join(a.name for a in ra) ) builder.writeln("return b''.join((") builder.current_indent += 1 # First constructor code, we already know its bytes builder.writeln('{},', repr(struct.pack('/Vector. # If this weren't the case, we should check upper case after # max(index('<'), index('.')) (and if it is, it's boxed, so return). m = re.match(r'Vector<(int|long)>', tlobject.result) if not m: return builder.end_block() builder.writeln('@staticmethod') builder.writeln('def read_result(reader):') builder.writeln('reader.read_int() # Vector ID') builder.writeln('return [reader.read_{}() ' 'for _ in range(reader.read_int())]', m.group(1)) def _write_arg_to_bytes(builder, arg, tlobject, name=None): """ Writes the .__bytes__() code for the given argument :param builder: The source code builder :param arg: The argument to write :param tlobject: The parent TLObject :param name: The name of the argument. Defaults to "self.argname" This argument is an option because it's required when writing Vectors<> """ if arg.generic_definition: return # Do nothing, this only specifies a later type if name is None: name = 'self.{}'.format(arg.name) # The argument may be a flag, only write if it's not None AND # if it's not a True type. # True types are not actually sent, but instead only used to # determine the flags. if arg.is_flag: if arg.type == 'true': return # Exit, since True type is never written elif arg.is_vector: # Vector flags are special since they consist of 3 values, # so we need an extra join here. Note that empty vector flags # should NOT be sent either! builder.write("b'' if {0} is None or {0} is False " "else b''.join((", name) elif 'Bool' == arg.type: # `False` is a valid value for this type, so only check for `None`. builder.write("b'' if {0} is None else (", name) else: builder.write("b'' if {0} is None or {0} is False " "else (", name) if arg.is_vector: if arg.use_vector_id: # vector code, unsigned 0x1cb5c415 as little endian builder.write(r"b'\x15\xc4\xb5\x1c',") builder.write("struct.pack('3.5 feature, so add another join. builder.write("b''.join(") # Temporary disable .is_vector, not to enter this if again # Also disable .is_flag since it's not needed per element old_flag = arg.is_flag arg.is_vector = arg.is_flag = False _write_arg_to_bytes(builder, arg, tlobject, name='x') arg.is_vector = True arg.is_flag = old_flag builder.write(' for x in {})', name) elif arg.flag_indicator: # Calculate the flags with those items which are not None if not any(f.is_flag for f in tlobject.args): # There's a flag indicator, but no flag arguments so it's 0 builder.write(r"b'\0\0\0\0'") else: def fmt_flag(flag): if flag.type == 'Bool': fmt = '(0 if {0} is None else {1})' else: fmt = '(0 if {0} is None or {0} is False else {1})' return fmt.format('self.{}'.format(flag.name), 1 << flag.flag_index) builder.write("struct.pack(' """ if arg.generic_definition: return # Do nothing, this only specifies a later type # The argument may be a flag, only write that flag was given! was_flag = False if arg.is_flag: # Treat 'true' flags as a special case, since they're true if # they're set, and nothing else needs to actually be read. if 'true' == arg.type: builder.writeln('{} = bool(flags & {})', name, 1 << arg.flag_index) return was_flag = True builder.writeln('if flags & {}:', 1 << arg.flag_index) # Temporary disable .is_flag not to enter this if # again when calling the method recursively arg.is_flag = False if arg.is_vector: if arg.use_vector_id: # We have to read the vector's constructor ID builder.writeln("reader.read_int()") builder.writeln('{} = []', name) builder.writeln('for _ in range(reader.read_int()):') # Temporary disable .is_vector, not to enter this if again arg.is_vector = False _write_arg_read_code(builder, arg, tlobject, name='_x') builder.writeln('{}.append(_x)', name) arg.is_vector = True elif arg.flag_indicator: # Read the flags, which will indicate what items we should read next builder.writeln('flags = reader.read_int()') builder.writeln() elif 'int' == arg.type: # User IDs are becoming larger than 2³¹ - 1, which would translate # into reading a negative ID, which we would treat as a chat. So # special case them to read unsigned. See https://t.me/BotNews/57. if arg.name == 'user_id' or (arg.name == 'id' and tlobject.result == 'User'): builder.writeln('{} = reader.read_int(signed=False)', name) else: builder.writeln('{} = reader.read_int()', name) elif 'long' == arg.type: builder.writeln('{} = reader.read_long()', name) elif 'int128' == arg.type: builder.writeln('{} = reader.read_large_int(bits=128)', name) elif 'int256' == arg.type: builder.writeln('{} = reader.read_large_int(bits=256)', name) elif 'double' == arg.type: builder.writeln('{} = reader.read_double()', name) elif 'string' == arg.type: builder.writeln('{} = reader.tgread_string()', name) elif 'Bool' == arg.type: builder.writeln('{} = reader.tgread_bool()', name) elif 'true' == arg.type: # Arbitrary not-None value, don't actually read "true" flags builder.writeln('{} = True', name) elif 'bytes' == arg.type: builder.writeln('{} = reader.tgread_bytes()', name) elif 'date' == arg.type: # Custom format builder.writeln('{} = reader.tgread_date()', name) else: # Else it may be a custom type if not arg.skip_constructor_id: builder.writeln('{} = reader.tgread_object()', name) else: # Import the correct type inline to avoid cyclic imports. # There may be better solutions so that we can just access # all the types before the files have been parsed, but I # don't know of any. sep_index = arg.type.find('.') if sep_index == -1: ns, t = '.', arg.type else: ns, t = '.' + arg.type[:sep_index], arg.type[sep_index+1:] class_name = snake_to_camel_case(t) # There would be no need to import the type if we're in the # file with the same namespace, but since it does no harm # and we don't have information about such thing in the # method we just ignore that case. builder.writeln('from {} import {}', ns, class_name) builder.writeln('{} = {}.from_reader(reader)', name, class_name) # End vector and flag blocks if required (if we opened them before) if arg.is_vector: builder.end_block() if was_flag: builder.current_indent -= 1 builder.writeln('else:') builder.writeln('{} = None', name) builder.current_indent -= 1 # Restore .is_flag arg.is_flag = True def _write_all_tlobjects(tlobjects, layer, builder): # Create a constant variable to indicate which layer this is builder.writeln('LAYER = {}', layer) builder.writeln() # Then create the dictionary containing constructor_id: class builder.writeln('tlobjects = {') builder.current_indent += 1 # Fill the dictionary (0x1a2b3c4f: tl.full.type.path.Class) for tlobject in tlobjects: builder.write('{:#010x}: ', tlobject.id) if tlobject.is_function: builder.write('fn.') if tlobject.namespace: builder.write('{}.', tlobject.namespace) builder.writeln('{},', tlobject.class_name) builder.current_indent -= 1 builder.writeln('}') def generate_tlobjects(tlobjects, layer, input_mod, output_dir): # Group everything by {namespace: [tlobjects]} to generate __init__.py namespace_functions = defaultdict(list) namespace_types = defaultdict(list) # Group {type: [constructors]} to generate the documentation type_constructors = defaultdict(list) for tlobject in tlobjects: if tlobject.is_function: namespace_functions[tlobject.namespace].append(tlobject) else: namespace_types[tlobject.namespace].append(tlobject) type_constructors[tlobject.result].append(tlobject) _write_modules(output_dir, input_mod, 'TLObject', namespace_types, type_constructors, layer, tlobjects) _write_modules(output_dir / 'fn', input_mod + '.fn', 'TLRequest', namespace_functions, type_constructors, layer, tlobjects) def clean_tlobjects(output_dir): for d in ('functions', 'types'): d = output_dir / d if d.is_dir(): shutil.rmtree(str(d))