mirror of
https://github.com/LonamiWebs/Telethon.git
synced 2025-01-25 00:34:19 +03:00
Move tl_generator to generators/ and remove the class
This commit is contained in:
parent
200d3c0360
commit
0af4699994
|
@ -1,15 +1,24 @@
|
|||
from telethon_generator.parsers import parse_errors
|
||||
from telethon_generator.generators import generate_errors
|
||||
from telethon_generator.parsers import parse_errors, parse_tl, find_layer
|
||||
from telethon_generator.generators import generate_errors, generate_tlobjects
|
||||
|
||||
|
||||
INPUT_JSON = 'errors.json'
|
||||
INPUT_DESCRIPTIONS = 'error_descriptions'
|
||||
OUTPUT = '../telethon/errors/rpc_error_list.py'
|
||||
ERRORS_INPUT_JSON = 'errors.json'
|
||||
ERRORS_INPUT_DESC = 'error_descriptions'
|
||||
ERRORS_OUTPUT = '../telethon/errors/rpc_error_list.py'
|
||||
|
||||
TLOBJECT_INPUT_TL = 'scheme.tl'
|
||||
TLOBJECT_OUTPUT = '../telethon/tl'
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
with open(OUTPUT, 'w', encoding='utf-8') as file:
|
||||
generate_tlobjects(
|
||||
tlobjects=list(parse_tl(TLOBJECT_INPUT_TL, ignore_core=True)),
|
||||
layer=find_layer((TLOBJECT_INPUT_TL)),
|
||||
output_dir=TLOBJECT_OUTPUT
|
||||
)
|
||||
|
||||
with open(ERRORS_OUTPUT, 'w', encoding='utf-8') as file:
|
||||
generate_errors(
|
||||
errors=list(parse_errors(INPUT_JSON, INPUT_DESCRIPTIONS)),
|
||||
errors=list(parse_errors(ERRORS_INPUT_JSON, ERRORS_INPUT_DESC)),
|
||||
f=file
|
||||
)
|
||||
|
|
|
@ -1 +1,2 @@
|
|||
from .errors import generate_errors
|
||||
from .tlobject import generate_tlobjects
|
||||
|
|
729
telethon_generator/generators/tlobject.py
Normal file
729
telethon_generator/generators/tlobject.py
Normal file
|
@ -0,0 +1,729 @@
|
|||
import os
|
||||
import re
|
||||
import struct
|
||||
from collections import defaultdict
|
||||
from zlib import crc32
|
||||
|
||||
from ..source_builder import SourceBuilder
|
||||
from ..utils import snake_to_camel_case
|
||||
|
||||
AUTO_GEN_NOTICE = \
|
||||
'"""File generated by TLObjects\' generator. All changes will be ERASED"""'
|
||||
|
||||
|
||||
AUTO_CASTS = {
|
||||
'InputPeer': 'utils.get_input_peer(client.get_input_entity({}))',
|
||||
'InputChannel': 'utils.get_input_channel(client.get_input_entity({}))',
|
||||
'InputUser': 'utils.get_input_user(client.get_input_entity({}))',
|
||||
'InputMedia': 'utils.get_input_media({})',
|
||||
'InputPhoto': 'utils.get_input_photo({})'
|
||||
}
|
||||
|
||||
|
||||
def generate_tlobjects(tlobjects, layer, output_dir):
|
||||
def get_file(*paths):
|
||||
return os.path.join(output_dir, *paths)
|
||||
|
||||
# First ensure that the required parent directories exist
|
||||
os.makedirs(get_file('functions'), exist_ok=True)
|
||||
os.makedirs(get_file('types'), exist_ok=True)
|
||||
|
||||
# Step 1: Group everything by {namespace: [tlobjects]} so we can
|
||||
# easily generate __init__.py files with all the TLObjects on them.
|
||||
namespace_functions = defaultdict(list)
|
||||
namespace_types = defaultdict(list)
|
||||
|
||||
# Make use of this iteration to also store 'Type: [Constructors]',
|
||||
# used when generating the documentation for the classes.
|
||||
type_constructors = defaultdict(list)
|
||||
for tlobject in tlobjects:
|
||||
if tlobject.is_function:
|
||||
namespace_functions[tlobject.namespace].append(tlobject)
|
||||
else:
|
||||
namespace_types[tlobject.namespace].append(tlobject)
|
||||
type_constructors[tlobject.result].append(tlobject)
|
||||
|
||||
# Step 2: Generate the actual code
|
||||
import_depth = 2
|
||||
_write_init_py(
|
||||
get_file('functions'), import_depth,
|
||||
namespace_functions, type_constructors
|
||||
)
|
||||
_write_init_py(
|
||||
get_file('types'), import_depth,
|
||||
namespace_types, type_constructors
|
||||
)
|
||||
|
||||
# Step 4: Once all the objects have been generated,
|
||||
# we can now group them in a single file
|
||||
filename = os.path.join(get_file('all_tlobjects.py'))
|
||||
with open(filename, 'w', encoding='utf-8') as file,\
|
||||
SourceBuilder(file) as builder:
|
||||
builder.writeln(AUTO_GEN_NOTICE)
|
||||
builder.writeln()
|
||||
|
||||
builder.writeln('from . import types, functions')
|
||||
builder.writeln()
|
||||
|
||||
# Create a constant variable to indicate which layer this is
|
||||
builder.writeln('LAYER = {}', layer)
|
||||
builder.writeln()
|
||||
|
||||
# Then create the dictionary containing constructor_id: class
|
||||
builder.writeln('tlobjects = {')
|
||||
builder.current_indent += 1
|
||||
|
||||
# Fill the dictionary (0x1a2b3c4f: tl.full.type.path.Class)
|
||||
for tlobject in tlobjects:
|
||||
builder.write('{:#010x}: ', tlobject.id)
|
||||
builder.write('functions' if tlobject.is_function else 'types')
|
||||
if tlobject.namespace:
|
||||
builder.write('.' + tlobject.namespace)
|
||||
|
||||
builder.writeln('.{},', tlobject.class_name)
|
||||
|
||||
builder.current_indent -= 1
|
||||
builder.writeln('}')
|
||||
|
||||
def _write_init_py(out_dir, depth, namespace_tlobjects, type_constructors):
|
||||
# namespace_tlobjects: {'namespace', [TLObject]}
|
||||
os.makedirs(out_dir, exist_ok=True)
|
||||
for ns, tlobjects in namespace_tlobjects.items():
|
||||
file = os.path.join(out_dir, ns + '.py' if ns else '__init__.py')
|
||||
with open(file, 'w', encoding='utf-8') as f, \
|
||||
SourceBuilder(f) as builder:
|
||||
builder.writeln(AUTO_GEN_NOTICE)
|
||||
|
||||
# Both types and functions inherit from the TLObject class
|
||||
# so they all can be serialized and sent, however, only the
|
||||
# functions are "content_related".
|
||||
builder.writeln(
|
||||
'from {}.tl.tlobject import TLObject', '.' * depth
|
||||
)
|
||||
builder.writeln('from typing import Optional, List, '
|
||||
'Union, TYPE_CHECKING')
|
||||
|
||||
# Add the relative imports to the namespaces,
|
||||
# unless we already are in a namespace.
|
||||
if not ns:
|
||||
builder.writeln('from . import {}', ', '.join(
|
||||
x for x in namespace_tlobjects.keys() if x
|
||||
))
|
||||
|
||||
# Import 'os' for those needing access to 'os.urandom()'
|
||||
# Currently only 'random_id' needs 'os' to be imported,
|
||||
# for all those TLObjects with arg.can_be_inferred.
|
||||
builder.writeln('import os')
|
||||
|
||||
# Import struct for the .__bytes__(self) serialization
|
||||
builder.writeln('import struct')
|
||||
|
||||
tlobjects.sort(key=lambda x: x.name)
|
||||
|
||||
type_names = set()
|
||||
type_defs = []
|
||||
|
||||
# Find all the types in this file and generate type definitions
|
||||
# based on the types. The type definitions are written to the
|
||||
# file at the end.
|
||||
for t in tlobjects:
|
||||
if not t.is_function:
|
||||
type_name = t.result
|
||||
if '.' in type_name:
|
||||
type_name = type_name[type_name.rindex('.'):]
|
||||
if type_name in type_names:
|
||||
continue
|
||||
type_names.add(type_name)
|
||||
constructors = type_constructors[type_name]
|
||||
if not constructors:
|
||||
pass
|
||||
elif len(constructors) == 1:
|
||||
type_defs.append('Type{} = {}'.format(
|
||||
type_name, constructors[0].class_name))
|
||||
else:
|
||||
type_defs.append('Type{} = Union[{}]'.format(
|
||||
type_name, ','.join(c.class_name
|
||||
for c in constructors)))
|
||||
|
||||
imports = {}
|
||||
primitives = ('int', 'long', 'int128', 'int256', 'string',
|
||||
'date', 'bytes', 'true')
|
||||
# Find all the types in other files that are used in this file
|
||||
# and generate the information required to import those types.
|
||||
for t in tlobjects:
|
||||
for arg in t.args:
|
||||
name = arg.type
|
||||
if not name or name in primitives:
|
||||
continue
|
||||
|
||||
import_space = '{}.tl.types'.format('.' * depth)
|
||||
if '.' in name:
|
||||
namespace = name.split('.')[0]
|
||||
name = name.split('.')[1]
|
||||
import_space += '.{}'.format(namespace)
|
||||
|
||||
if name not in type_names:
|
||||
type_names.add(name)
|
||||
if name == 'date':
|
||||
imports['datetime'] = ['datetime']
|
||||
continue
|
||||
elif import_space not in imports:
|
||||
imports[import_space] = set()
|
||||
imports[import_space].add('Type{}'.format(name))
|
||||
|
||||
# Add imports required for type checking
|
||||
if imports:
|
||||
builder.writeln('if TYPE_CHECKING:')
|
||||
for namespace, names in imports.items():
|
||||
builder.writeln('from {} import {}',
|
||||
namespace, ', '.join(names))
|
||||
|
||||
builder.end_block()
|
||||
|
||||
# Generate the class for every TLObject
|
||||
for t in tlobjects:
|
||||
_write_source_code(
|
||||
t, builder, depth, type_constructors
|
||||
)
|
||||
builder.current_indent = 0
|
||||
|
||||
# Write the type definitions generated earlier.
|
||||
builder.writeln('')
|
||||
for line in type_defs:
|
||||
builder.writeln(line)
|
||||
|
||||
|
||||
def _write_source_code(tlobject, builder, depth, type_constructors):
|
||||
"""
|
||||
Writes the source code corresponding to the given TLObject
|
||||
by making use of the ``builder`` `SourceBuilder`.
|
||||
|
||||
Additional information such as file path depth and
|
||||
the ``Type: [Constructors]`` must be given for proper
|
||||
importing and documentation strings.
|
||||
"""
|
||||
builder.writeln()
|
||||
builder.writeln()
|
||||
builder.writeln('class {}(TLObject):', tlobject.class_name)
|
||||
|
||||
# Class-level variable to store its Telegram's constructor ID
|
||||
builder.writeln('CONSTRUCTOR_ID = {:#x}', tlobject.id)
|
||||
builder.writeln('SUBCLASS_OF_ID = {:#x}',
|
||||
crc32(tlobject.result.encode('ascii')))
|
||||
builder.writeln()
|
||||
|
||||
# Flag arguments must go last
|
||||
args = [
|
||||
a for a in tlobject.sorted_args()
|
||||
if not a.flag_indicator and not a.generic_definition
|
||||
]
|
||||
|
||||
# Convert the args to string parameters, flags having =None
|
||||
args = [
|
||||
(a.name if not a.is_flag and not a.can_be_inferred
|
||||
else '{}=None'.format(a.name))
|
||||
for a in args
|
||||
]
|
||||
|
||||
# Write the __init__ function
|
||||
if args:
|
||||
builder.writeln('def __init__(self, {}):', ', '.join(args))
|
||||
else:
|
||||
builder.writeln('def __init__(self):')
|
||||
|
||||
# Now update args to have the TLObject arguments, _except_
|
||||
# those which are calculated on send or ignored, this is
|
||||
# flag indicator and generic definitions.
|
||||
#
|
||||
# We don't need the generic definitions in Python
|
||||
# because arguments can be any type
|
||||
args = [arg for arg in tlobject.args
|
||||
if not arg.flag_indicator and
|
||||
not arg.generic_definition]
|
||||
|
||||
if args:
|
||||
# Write the docstring, to know the type of the args
|
||||
builder.writeln('"""')
|
||||
for arg in args:
|
||||
if not arg.flag_indicator:
|
||||
builder.writeln(':param {} {}:',
|
||||
arg.doc_type_hint(), arg.name)
|
||||
builder.current_indent -= 1 # It will auto-indent (':')
|
||||
|
||||
# We also want to know what type this request returns
|
||||
# or to which type this constructor belongs to
|
||||
builder.writeln()
|
||||
if tlobject.is_function:
|
||||
builder.write(':returns {}: ', tlobject.result)
|
||||
else:
|
||||
builder.write('Constructor for {}: ', tlobject.result)
|
||||
|
||||
constructors = type_constructors[tlobject.result]
|
||||
if not constructors:
|
||||
builder.writeln('This type has no constructors.')
|
||||
elif len(constructors) == 1:
|
||||
builder.writeln('Instance of {}.',
|
||||
constructors[0].class_name)
|
||||
else:
|
||||
builder.writeln('Instance of either {}.', ', '.join(
|
||||
c.class_name for c in constructors))
|
||||
|
||||
builder.writeln('"""')
|
||||
|
||||
builder.writeln('super().__init__()')
|
||||
# Functions have a result object and are confirmed by default
|
||||
if tlobject.is_function:
|
||||
builder.writeln('self.result = None')
|
||||
builder.writeln(
|
||||
'self.content_related = True')
|
||||
|
||||
# Set the arguments
|
||||
if args:
|
||||
# Leave an empty line if there are any args
|
||||
builder.writeln()
|
||||
|
||||
for arg in args:
|
||||
if not arg.can_be_inferred:
|
||||
builder.writeln('self.{0} = {0} # type: {1}',
|
||||
arg.name, arg.python_type_hint())
|
||||
continue
|
||||
|
||||
# Currently the only argument that can be
|
||||
# inferred are those called 'random_id'
|
||||
if arg.name == 'random_id':
|
||||
# Endianness doesn't really matter, and 'big' is shorter
|
||||
code = "int.from_bytes(os.urandom({}), 'big', signed=True)" \
|
||||
.format(8 if arg.type == 'long' else 4)
|
||||
|
||||
if arg.is_vector:
|
||||
# Currently for the case of "messages.forwardMessages"
|
||||
# Ensure we can infer the length from id:Vector<>
|
||||
if not next(
|
||||
a for a in args if a.name == 'id').is_vector:
|
||||
raise ValueError(
|
||||
'Cannot infer list of random ids for ', tlobject
|
||||
)
|
||||
code = '[{} for _ in range(len(id))]'.format(code)
|
||||
|
||||
builder.writeln(
|
||||
"self.random_id = random_id if random_id "
|
||||
"is not None else {}", code
|
||||
)
|
||||
else:
|
||||
raise ValueError('Cannot infer a value for ', arg)
|
||||
|
||||
builder.end_block()
|
||||
|
||||
# Write the resolve(self, client, utils) method
|
||||
if any(arg.type in AUTO_CASTS for arg in args):
|
||||
builder.writeln('def resolve(self, client, utils):')
|
||||
for arg in args:
|
||||
ac = AUTO_CASTS.get(arg.type, None)
|
||||
if ac:
|
||||
_write_self_assign(builder, arg, ac)
|
||||
builder.end_block()
|
||||
|
||||
# Write the to_dict(self) method
|
||||
builder.writeln('def to_dict(self):')
|
||||
builder.writeln('return {')
|
||||
builder.current_indent += 1
|
||||
|
||||
base_types = ('string', 'bytes', 'int', 'long', 'int128',
|
||||
'int256', 'double', 'Bool', 'true', 'date')
|
||||
|
||||
builder.write("'_': '{}'", tlobject.class_name)
|
||||
for arg in args:
|
||||
builder.writeln(',')
|
||||
builder.write("'{}': ", arg.name)
|
||||
if arg.type in base_types:
|
||||
if arg.is_vector:
|
||||
builder.write('[] if self.{0} is None else self.{0}[:]',
|
||||
arg.name)
|
||||
else:
|
||||
builder.write('self.{}', arg.name)
|
||||
else:
|
||||
if arg.is_vector:
|
||||
builder.write(
|
||||
'[] if self.{0} is None else [None '
|
||||
'if x is None else x.to_dict() for x in self.{0}]',
|
||||
arg.name
|
||||
)
|
||||
else:
|
||||
builder.write(
|
||||
'None if self.{0} is None else self.{0}.to_dict()',
|
||||
arg.name
|
||||
)
|
||||
|
||||
builder.writeln()
|
||||
builder.current_indent -= 1
|
||||
builder.writeln("}")
|
||||
|
||||
builder.end_block()
|
||||
|
||||
# Write the .__bytes__() function
|
||||
builder.writeln('def __bytes__(self):')
|
||||
|
||||
# Some objects require more than one flag parameter to be set
|
||||
# at the same time. In this case, add an assertion.
|
||||
repeated_args = defaultdict(list)
|
||||
for arg in tlobject.args:
|
||||
if arg.is_flag:
|
||||
repeated_args[arg.flag_index].append(arg)
|
||||
|
||||
for ra in repeated_args.values():
|
||||
if len(ra) > 1:
|
||||
cnd1 = ('(self.{0} or self.{0} is not None)'
|
||||
.format(a.name) for a in ra)
|
||||
cnd2 = ('(self.{0} is None or self.{0} is False)'
|
||||
.format(a.name) for a in ra)
|
||||
builder.writeln(
|
||||
"assert ({}) or ({}), '{} parameters must all "
|
||||
"be False-y (like None) or all me True-y'",
|
||||
' and '.join(cnd1), ' and '.join(cnd2),
|
||||
', '.join(a.name for a in ra)
|
||||
)
|
||||
|
||||
builder.writeln("return b''.join((")
|
||||
builder.current_indent += 1
|
||||
|
||||
# First constructor code, we already know its bytes
|
||||
builder.writeln('{},', repr(struct.pack('<I', tlobject.id)))
|
||||
|
||||
for arg in tlobject.args:
|
||||
if write_to_bytes(builder, arg, tlobject.args):
|
||||
builder.writeln(',')
|
||||
|
||||
builder.current_indent -= 1
|
||||
builder.writeln('))')
|
||||
builder.end_block()
|
||||
|
||||
# Write the static from_reader(reader) function
|
||||
builder.writeln('@staticmethod')
|
||||
builder.writeln('def from_reader(reader):')
|
||||
for arg in tlobject.args:
|
||||
write_read_code(
|
||||
builder, arg, tlobject.args, name='_' + arg.name
|
||||
)
|
||||
|
||||
builder.writeln(
|
||||
'return {}({})',
|
||||
tlobject.class_name,
|
||||
', '.join(
|
||||
'{0}=_{0}'.format(a.name) for a in tlobject.sorted_args()
|
||||
if not a.flag_indicator and not a.generic_definition
|
||||
)
|
||||
)
|
||||
|
||||
# Only requests can have a different response that's not their
|
||||
# serialized body, that is, we'll be setting their .result.
|
||||
#
|
||||
# The default behaviour is reading a TLObject too, so no need
|
||||
# to override it unless necessary.
|
||||
if tlobject.is_function and not _is_boxed(tlobject.result):
|
||||
builder.end_block()
|
||||
builder.writeln('def on_response(self, reader):')
|
||||
write_request_result_code(builder, tlobject)
|
||||
|
||||
|
||||
def _is_boxed(type_):
|
||||
# https://core.telegram.org/mtproto/serialize#boxed-and-bare-types
|
||||
# TL;DR; boxed types start with uppercase always, so we can use
|
||||
# this to check whether everything in it is boxed or not.
|
||||
#
|
||||
# The API always returns a boxed type, but it may inside a Vector<>
|
||||
# or a namespace, and the Vector may have a not-boxed type. For this
|
||||
# reason we find whatever index, '<' or '.'. If neither are present
|
||||
# we will get -1, and the 0th char is always upper case thus works.
|
||||
# For Vector types and namespaces, it will check in the right place.
|
||||
check_after = max(type_.find('<'), type_.find('.'))
|
||||
return type_[check_after + 1].isupper()
|
||||
|
||||
|
||||
def _write_self_assign(builder, arg, get_input_code):
|
||||
"""Writes self.arg = input.format(self.arg), considering vectors."""
|
||||
if arg.is_vector:
|
||||
builder.write('self.{0} = [{1} for _x in self.{0}]',
|
||||
arg.name, get_input_code.format('_x'))
|
||||
else:
|
||||
builder.write('self.{} = {}',
|
||||
arg.name, get_input_code.format('self.' + arg.name))
|
||||
|
||||
builder.writeln(
|
||||
' if self.{} else None'.format(arg.name) if arg.is_flag else ''
|
||||
)
|
||||
|
||||
|
||||
def get_file_name(tlobject, add_extension=False):
|
||||
"""Gets the file name in file_name_format.py for the given TLObject"""
|
||||
|
||||
# Courtesy of http://stackoverflow.com/a/1176023/4759433
|
||||
s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', tlobject.name)
|
||||
result = re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()
|
||||
if add_extension:
|
||||
return result + '.py'
|
||||
else:
|
||||
return result
|
||||
|
||||
|
||||
def write_to_bytes(builder, arg, args, name=None):
|
||||
"""
|
||||
Writes the .__bytes__() code for the given argument
|
||||
:param builder: The source code builder
|
||||
:param arg: The argument to write
|
||||
:param args: All the other arguments in TLObject same __bytes__.
|
||||
This is required to determine the flags value
|
||||
:param name: The name of the argument. Defaults to "self.argname"
|
||||
This argument is an option because it's required when
|
||||
writing Vectors<>
|
||||
"""
|
||||
if arg.generic_definition:
|
||||
return # Do nothing, this only specifies a later type
|
||||
|
||||
if name is None:
|
||||
name = 'self.{}'.format(arg.name)
|
||||
|
||||
# The argument may be a flag, only write if it's not None AND
|
||||
# if it's not a True type.
|
||||
# True types are not actually sent, but instead only used to
|
||||
# determine the flags.
|
||||
if arg.is_flag:
|
||||
if arg.type == 'true':
|
||||
return # Exit, since True type is never written
|
||||
elif arg.is_vector:
|
||||
# Vector flags are special since they consist of 3 values,
|
||||
# so we need an extra join here. Note that empty vector flags
|
||||
# should NOT be sent either!
|
||||
builder.write("b'' if {0} is None or {0} is False "
|
||||
"else b''.join((", name)
|
||||
else:
|
||||
builder.write("b'' if {0} is None or {0} is False "
|
||||
"else (", name)
|
||||
|
||||
if arg.is_vector:
|
||||
if arg.use_vector_id:
|
||||
# vector code, unsigned 0x1cb5c415 as little endian
|
||||
builder.write(r"b'\x15\xc4\xb5\x1c',")
|
||||
|
||||
builder.write("struct.pack('<i', len({})),", name)
|
||||
|
||||
# Cannot unpack the values for the outer tuple through *[(
|
||||
# since that's a Python >3.5 feature, so add another join.
|
||||
builder.write("b''.join(")
|
||||
|
||||
# Temporary disable .is_vector, not to enter this if again
|
||||
# Also disable .is_flag since it's not needed per element
|
||||
old_flag = arg.is_flag
|
||||
arg.is_vector = arg.is_flag = False
|
||||
write_to_bytes(builder, arg, args, name='x')
|
||||
arg.is_vector = True
|
||||
arg.is_flag = old_flag
|
||||
|
||||
builder.write(' for x in {})', name)
|
||||
|
||||
elif arg.flag_indicator:
|
||||
# Calculate the flags with those items which are not None
|
||||
if not any(f.is_flag for f in args):
|
||||
# There's a flag indicator, but no flag arguments so it's 0
|
||||
builder.write(r"b'\0\0\0\0'")
|
||||
else:
|
||||
builder.write("struct.pack('<I', ")
|
||||
builder.write(
|
||||
' | '.join('(0 if {0} is None or {0} is False else {1})'
|
||||
.format('self.{}'.format(flag.name),
|
||||
1 << flag.flag_index)
|
||||
for flag in args if flag.is_flag)
|
||||
)
|
||||
builder.write(')')
|
||||
|
||||
elif 'int' == arg.type:
|
||||
# struct.pack is around 4 times faster than int.to_bytes
|
||||
builder.write("struct.pack('<i', {})", name)
|
||||
|
||||
elif 'long' == arg.type:
|
||||
builder.write("struct.pack('<q', {})", name)
|
||||
|
||||
elif 'int128' == arg.type:
|
||||
builder.write("{}.to_bytes(16, 'little', signed=True)", name)
|
||||
|
||||
elif 'int256' == arg.type:
|
||||
builder.write("{}.to_bytes(32, 'little', signed=True)", name)
|
||||
|
||||
elif 'double' == arg.type:
|
||||
builder.write("struct.pack('<d', {})", name)
|
||||
|
||||
elif 'string' == arg.type:
|
||||
builder.write('TLObject.serialize_bytes({})', name)
|
||||
|
||||
elif 'Bool' == arg.type:
|
||||
# 0x997275b5 if boolean else 0xbc799737
|
||||
builder.write(r"b'\xb5ur\x99' if {} else b'7\x97y\xbc'", name)
|
||||
|
||||
elif 'true' == arg.type:
|
||||
pass # These are actually NOT written! Only used for flags
|
||||
|
||||
elif 'bytes' == arg.type:
|
||||
builder.write('TLObject.serialize_bytes({})', name)
|
||||
|
||||
elif 'date' == arg.type: # Custom format
|
||||
builder.write('TLObject.serialize_datetime({})', name)
|
||||
|
||||
else:
|
||||
# Else it may be a custom type
|
||||
builder.write('bytes({})', name)
|
||||
|
||||
if arg.is_flag:
|
||||
builder.write(')')
|
||||
if arg.is_vector:
|
||||
builder.write(')') # We were using a tuple
|
||||
|
||||
return True # Something was written
|
||||
|
||||
|
||||
def write_read_code(builder, arg, args, name):
|
||||
"""
|
||||
Writes the read code for the given argument, setting the
|
||||
arg.name variable to its read value.
|
||||
|
||||
:param builder: The source code builder
|
||||
:param arg: The argument to write
|
||||
:param args: All the other arguments in TLObject same on_send.
|
||||
This is required to determine the flags value
|
||||
:param name: The name of the argument. Defaults to "self.argname"
|
||||
This argument is an option because it's required when
|
||||
writing Vectors<>
|
||||
"""
|
||||
|
||||
if arg.generic_definition:
|
||||
return # Do nothing, this only specifies a later type
|
||||
|
||||
# The argument may be a flag, only write that flag was given!
|
||||
was_flag = False
|
||||
if arg.is_flag:
|
||||
# Treat 'true' flags as a special case, since they're true if
|
||||
# they're set, and nothing else needs to actually be read.
|
||||
if 'true' == arg.type:
|
||||
builder.writeln('{} = bool(flags & {})',
|
||||
name, 1 << arg.flag_index)
|
||||
return
|
||||
|
||||
was_flag = True
|
||||
builder.writeln('if flags & {}:', 1 << arg.flag_index)
|
||||
# Temporary disable .is_flag not to enter this if
|
||||
# again when calling the method recursively
|
||||
arg.is_flag = False
|
||||
|
||||
if arg.is_vector:
|
||||
if arg.use_vector_id:
|
||||
# We have to read the vector's constructor ID
|
||||
builder.writeln("reader.read_int()")
|
||||
|
||||
builder.writeln('{} = []', name)
|
||||
builder.writeln('for _ in range(reader.read_int()):')
|
||||
# Temporary disable .is_vector, not to enter this if again
|
||||
arg.is_vector = False
|
||||
write_read_code(builder, arg, args, name='_x')
|
||||
builder.writeln('{}.append(_x)', name)
|
||||
arg.is_vector = True
|
||||
|
||||
elif arg.flag_indicator:
|
||||
# Read the flags, which will indicate what items we should read next
|
||||
builder.writeln('flags = reader.read_int()')
|
||||
builder.writeln()
|
||||
|
||||
elif 'int' == arg.type:
|
||||
builder.writeln('{} = reader.read_int()', name)
|
||||
|
||||
elif 'long' == arg.type:
|
||||
builder.writeln('{} = reader.read_long()', name)
|
||||
|
||||
elif 'int128' == arg.type:
|
||||
builder.writeln('{} = reader.read_large_int(bits=128)', name)
|
||||
|
||||
elif 'int256' == arg.type:
|
||||
builder.writeln('{} = reader.read_large_int(bits=256)', name)
|
||||
|
||||
elif 'double' == arg.type:
|
||||
builder.writeln('{} = reader.read_double()', name)
|
||||
|
||||
elif 'string' == arg.type:
|
||||
builder.writeln('{} = reader.tgread_string()', name)
|
||||
|
||||
elif 'Bool' == arg.type:
|
||||
builder.writeln('{} = reader.tgread_bool()', name)
|
||||
|
||||
elif 'true' == arg.type:
|
||||
# Arbitrary not-None value, don't actually read "true" flags
|
||||
builder.writeln('{} = True', name)
|
||||
|
||||
elif 'bytes' == arg.type:
|
||||
builder.writeln('{} = reader.tgread_bytes()', name)
|
||||
|
||||
elif 'date' == arg.type: # Custom format
|
||||
builder.writeln('{} = reader.tgread_date()', name)
|
||||
|
||||
else:
|
||||
# Else it may be a custom type
|
||||
if not arg.skip_constructor_id:
|
||||
builder.writeln('{} = reader.tgread_object()', name)
|
||||
else:
|
||||
# Import the correct type inline to avoid cyclic imports.
|
||||
# There may be better solutions so that we can just access
|
||||
# all the types before the files have been parsed, but I
|
||||
# don't know of any.
|
||||
sep_index = arg.type.find('.')
|
||||
if sep_index == -1:
|
||||
ns, t = '.', arg.type
|
||||
else:
|
||||
ns, t = '.' + arg.type[:sep_index], arg.type[sep_index+1:]
|
||||
class_name = snake_to_camel_case(t)
|
||||
|
||||
# There would be no need to import the type if we're in the
|
||||
# file with the same namespace, but since it does no harm
|
||||
# and we don't have information about such thing in the
|
||||
# method we just ignore that case.
|
||||
builder.writeln('from {} import {}', ns, class_name)
|
||||
builder.writeln('{} = {}.from_reader(reader)',
|
||||
name, class_name)
|
||||
|
||||
# End vector and flag blocks if required (if we opened them before)
|
||||
if arg.is_vector:
|
||||
builder.end_block()
|
||||
|
||||
if was_flag:
|
||||
builder.current_indent -= 1
|
||||
builder.writeln('else:')
|
||||
builder.writeln('{} = None', name)
|
||||
builder.current_indent -= 1
|
||||
# Restore .is_flag
|
||||
arg.is_flag = True
|
||||
|
||||
|
||||
def write_request_result_code(builder, tlobject):
|
||||
"""
|
||||
Writes the receive code for the given function
|
||||
|
||||
:param builder: The source code builder
|
||||
:param tlobject: The TLObject for which the 'self.result = '
|
||||
will be written
|
||||
"""
|
||||
if tlobject.result.startswith('Vector<'):
|
||||
# Vector results are a bit special since they can also be composed
|
||||
# of integer values and such; however, the result of requests is
|
||||
# not parsed as arguments are and it's a bit harder to tell which
|
||||
# is which.
|
||||
if tlobject.result == 'Vector<int>':
|
||||
builder.writeln('reader.read_int() # Vector ID')
|
||||
builder.writeln('count = reader.read_int()')
|
||||
builder.writeln(
|
||||
'self.result = [reader.read_int() for _ in range(count)]'
|
||||
)
|
||||
elif tlobject.result == 'Vector<long>':
|
||||
builder.writeln('reader.read_int() # Vector ID')
|
||||
builder.writeln('count = reader.read_long()')
|
||||
builder.writeln(
|
||||
'self.result = [reader.read_long() for _ in range(count)]'
|
||||
)
|
||||
else:
|
||||
builder.writeln('self.result = reader.tgread_vector()')
|
||||
else:
|
||||
builder.writeln('self.result = reader.tgread_object()')
|
|
@ -1,764 +0,0 @@
|
|||
import os
|
||||
import re
|
||||
import shutil
|
||||
import struct
|
||||
from zlib import crc32
|
||||
from collections import defaultdict
|
||||
|
||||
from .source_builder import SourceBuilder
|
||||
from .parsers import TLObject, parse_tl, find_layer
|
||||
from .utils import snake_to_camel_case
|
||||
AUTO_GEN_NOTICE = \
|
||||
'"""File generated by TLObjects\' generator. All changes will be ERASED"""'
|
||||
|
||||
|
||||
AUTO_CASTS = {
|
||||
'InputPeer': 'utils.get_input_peer(client.get_input_entity({}))',
|
||||
'InputChannel': 'utils.get_input_channel(client.get_input_entity({}))',
|
||||
'InputUser': 'utils.get_input_user(client.get_input_entity({}))',
|
||||
'InputMedia': 'utils.get_input_media({})',
|
||||
'InputPhoto': 'utils.get_input_photo({})'
|
||||
}
|
||||
|
||||
|
||||
class TLGenerator:
|
||||
def __init__(self, output_dir):
|
||||
self.output_dir = output_dir
|
||||
|
||||
def _get_file(self, *paths):
|
||||
"""Wrapper around ``os.path.join()`` with output as first path."""
|
||||
return os.path.join(self.output_dir, *paths)
|
||||
|
||||
def _rm_if_exists(self, filename):
|
||||
"""Recursively deletes the given filename if it exists."""
|
||||
file = self._get_file(filename)
|
||||
if os.path.exists(file):
|
||||
if os.path.isdir(file):
|
||||
shutil.rmtree(file)
|
||||
else:
|
||||
os.remove(file)
|
||||
|
||||
def tlobjects_exist(self):
|
||||
"""
|
||||
Determines whether the TLObjects were previously
|
||||
generated (hence exist) or not.
|
||||
"""
|
||||
return os.path.isfile(self._get_file('all_tlobjects.py'))
|
||||
|
||||
def clean_tlobjects(self):
|
||||
"""Cleans the automatically generated TLObjects from disk."""
|
||||
for name in ('functions', 'types', 'all_tlobjects.py'):
|
||||
self._rm_if_exists(name)
|
||||
|
||||
def generate_tlobjects(self, scheme_file, import_depth):
|
||||
"""
|
||||
Generates all the TLObjects from the ``scheme_file`` to
|
||||
``tl/functions`` and ``tl/types``.
|
||||
"""
|
||||
|
||||
# First ensure that the required parent directories exist
|
||||
os.makedirs(self._get_file('functions'), exist_ok=True)
|
||||
os.makedirs(self._get_file('types'), exist_ok=True)
|
||||
|
||||
# Step 0: Cache the parsed file on a tuple
|
||||
tlobjects = tuple(parse_tl(scheme_file, ignore_core=True))
|
||||
|
||||
# Step 1: Group everything by {namespace: [tlobjects]} so we can
|
||||
# easily generate __init__.py files with all the TLObjects on them.
|
||||
namespace_functions = defaultdict(list)
|
||||
namespace_types = defaultdict(list)
|
||||
|
||||
# Make use of this iteration to also store 'Type: [Constructors]',
|
||||
# used when generating the documentation for the classes.
|
||||
type_constructors = defaultdict(list)
|
||||
for tlobject in tlobjects:
|
||||
if tlobject.is_function:
|
||||
namespace_functions[tlobject.namespace].append(tlobject)
|
||||
else:
|
||||
namespace_types[tlobject.namespace].append(tlobject)
|
||||
type_constructors[tlobject.result].append(tlobject)
|
||||
|
||||
# Step 2: Generate the actual code
|
||||
self._write_init_py(
|
||||
self._get_file('functions'), import_depth,
|
||||
namespace_functions, type_constructors
|
||||
)
|
||||
self._write_init_py(
|
||||
self._get_file('types'), import_depth,
|
||||
namespace_types, type_constructors
|
||||
)
|
||||
|
||||
# Step 4: Once all the objects have been generated,
|
||||
# we can now group them in a single file
|
||||
filename = os.path.join(self._get_file('all_tlobjects.py'))
|
||||
with open(filename, 'w', encoding='utf-8') as file,\
|
||||
SourceBuilder(file) as builder:
|
||||
builder.writeln(AUTO_GEN_NOTICE)
|
||||
builder.writeln()
|
||||
|
||||
builder.writeln('from . import types, functions')
|
||||
builder.writeln()
|
||||
|
||||
# Create a constant variable to indicate which layer this is
|
||||
builder.writeln('LAYER = {}', find_layer(scheme_file))
|
||||
builder.writeln()
|
||||
|
||||
# Then create the dictionary containing constructor_id: class
|
||||
builder.writeln('tlobjects = {')
|
||||
builder.current_indent += 1
|
||||
|
||||
# Fill the dictionary (0x1a2b3c4f: tl.full.type.path.Class)
|
||||
for tlobject in tlobjects:
|
||||
builder.write('{:#010x}: ', tlobject.id)
|
||||
builder.write('functions' if tlobject.is_function else 'types')
|
||||
if tlobject.namespace:
|
||||
builder.write('.' + tlobject.namespace)
|
||||
|
||||
builder.writeln('.{},', tlobject.class_name)
|
||||
|
||||
builder.current_indent -= 1
|
||||
builder.writeln('}')
|
||||
|
||||
@staticmethod
|
||||
def _write_init_py(out_dir, depth, namespace_tlobjects, type_constructors):
|
||||
# namespace_tlobjects: {'namespace', [TLObject]}
|
||||
os.makedirs(out_dir, exist_ok=True)
|
||||
for ns, tlobjects in namespace_tlobjects.items():
|
||||
file = os.path.join(out_dir, ns + '.py' if ns else '__init__.py')
|
||||
with open(file, 'w', encoding='utf-8') as f, \
|
||||
SourceBuilder(f) as builder:
|
||||
builder.writeln(AUTO_GEN_NOTICE)
|
||||
|
||||
# Both types and functions inherit from the TLObject class
|
||||
# so they all can be serialized and sent, however, only the
|
||||
# functions are "content_related".
|
||||
builder.writeln(
|
||||
'from {}.tl.tlobject import TLObject', '.' * depth
|
||||
)
|
||||
builder.writeln('from typing import Optional, List, '
|
||||
'Union, TYPE_CHECKING')
|
||||
|
||||
# Add the relative imports to the namespaces,
|
||||
# unless we already are in a namespace.
|
||||
if not ns:
|
||||
builder.writeln('from . import {}', ', '.join(
|
||||
x for x in namespace_tlobjects.keys() if x
|
||||
))
|
||||
|
||||
# Import 'os' for those needing access to 'os.urandom()'
|
||||
# Currently only 'random_id' needs 'os' to be imported,
|
||||
# for all those TLObjects with arg.can_be_inferred.
|
||||
builder.writeln('import os')
|
||||
|
||||
# Import struct for the .__bytes__(self) serialization
|
||||
builder.writeln('import struct')
|
||||
|
||||
tlobjects.sort(key=lambda x: x.name)
|
||||
|
||||
type_names = set()
|
||||
type_defs = []
|
||||
|
||||
# Find all the types in this file and generate type definitions
|
||||
# based on the types. The type definitions are written to the
|
||||
# file at the end.
|
||||
for t in tlobjects:
|
||||
if not t.is_function:
|
||||
type_name = t.result
|
||||
if '.' in type_name:
|
||||
type_name = type_name[type_name.rindex('.'):]
|
||||
if type_name in type_names:
|
||||
continue
|
||||
type_names.add(type_name)
|
||||
constructors = type_constructors[type_name]
|
||||
if not constructors:
|
||||
pass
|
||||
elif len(constructors) == 1:
|
||||
type_defs.append('Type{} = {}'.format(
|
||||
type_name, constructors[0].class_name))
|
||||
else:
|
||||
type_defs.append('Type{} = Union[{}]'.format(
|
||||
type_name, ','.join(c.class_name
|
||||
for c in constructors)))
|
||||
|
||||
imports = {}
|
||||
primitives = ('int', 'long', 'int128', 'int256', 'string',
|
||||
'date', 'bytes', 'true')
|
||||
# Find all the types in other files that are used in this file
|
||||
# and generate the information required to import those types.
|
||||
for t in tlobjects:
|
||||
for arg in t.args:
|
||||
name = arg.type
|
||||
if not name or name in primitives:
|
||||
continue
|
||||
|
||||
import_space = '{}.tl.types'.format('.' * depth)
|
||||
if '.' in name:
|
||||
namespace = name.split('.')[0]
|
||||
name = name.split('.')[1]
|
||||
import_space += '.{}'.format(namespace)
|
||||
|
||||
if name not in type_names:
|
||||
type_names.add(name)
|
||||
if name == 'date':
|
||||
imports['datetime'] = ['datetime']
|
||||
continue
|
||||
elif import_space not in imports:
|
||||
imports[import_space] = set()
|
||||
imports[import_space].add('Type{}'.format(name))
|
||||
|
||||
# Add imports required for type checking
|
||||
if imports:
|
||||
builder.writeln('if TYPE_CHECKING:')
|
||||
for namespace, names in imports.items():
|
||||
builder.writeln('from {} import {}',
|
||||
namespace, ', '.join(names))
|
||||
|
||||
builder.end_block()
|
||||
|
||||
# Generate the class for every TLObject
|
||||
for t in tlobjects:
|
||||
TLGenerator._write_source_code(
|
||||
t, builder, depth, type_constructors
|
||||
)
|
||||
builder.current_indent = 0
|
||||
|
||||
# Write the type definitions generated earlier.
|
||||
builder.writeln('')
|
||||
for line in type_defs:
|
||||
builder.writeln(line)
|
||||
|
||||
@staticmethod
|
||||
def _write_source_code(tlobject, builder, depth, type_constructors):
|
||||
"""
|
||||
Writes the source code corresponding to the given TLObject
|
||||
by making use of the ``builder`` `SourceBuilder`.
|
||||
|
||||
Additional information such as file path depth and
|
||||
the ``Type: [Constructors]`` must be given for proper
|
||||
importing and documentation strings.
|
||||
"""
|
||||
builder.writeln()
|
||||
builder.writeln()
|
||||
builder.writeln('class {}(TLObject):', tlobject.class_name)
|
||||
|
||||
# Class-level variable to store its Telegram's constructor ID
|
||||
builder.writeln('CONSTRUCTOR_ID = {:#x}', tlobject.id)
|
||||
builder.writeln('SUBCLASS_OF_ID = {:#x}',
|
||||
crc32(tlobject.result.encode('ascii')))
|
||||
builder.writeln()
|
||||
|
||||
# Flag arguments must go last
|
||||
args = [
|
||||
a for a in tlobject.sorted_args()
|
||||
if not a.flag_indicator and not a.generic_definition
|
||||
]
|
||||
|
||||
# Convert the args to string parameters, flags having =None
|
||||
args = [
|
||||
(a.name if not a.is_flag and not a.can_be_inferred
|
||||
else '{}=None'.format(a.name))
|
||||
for a in args
|
||||
]
|
||||
|
||||
# Write the __init__ function
|
||||
if args:
|
||||
builder.writeln('def __init__(self, {}):', ', '.join(args))
|
||||
else:
|
||||
builder.writeln('def __init__(self):')
|
||||
|
||||
# Now update args to have the TLObject arguments, _except_
|
||||
# those which are calculated on send or ignored, this is
|
||||
# flag indicator and generic definitions.
|
||||
#
|
||||
# We don't need the generic definitions in Python
|
||||
# because arguments can be any type
|
||||
args = [arg for arg in tlobject.args
|
||||
if not arg.flag_indicator and
|
||||
not arg.generic_definition]
|
||||
|
||||
if args:
|
||||
# Write the docstring, to know the type of the args
|
||||
builder.writeln('"""')
|
||||
for arg in args:
|
||||
if not arg.flag_indicator:
|
||||
builder.writeln(':param {} {}:',
|
||||
arg.doc_type_hint(), arg.name)
|
||||
builder.current_indent -= 1 # It will auto-indent (':')
|
||||
|
||||
# We also want to know what type this request returns
|
||||
# or to which type this constructor belongs to
|
||||
builder.writeln()
|
||||
if tlobject.is_function:
|
||||
builder.write(':returns {}: ', tlobject.result)
|
||||
else:
|
||||
builder.write('Constructor for {}: ', tlobject.result)
|
||||
|
||||
constructors = type_constructors[tlobject.result]
|
||||
if not constructors:
|
||||
builder.writeln('This type has no constructors.')
|
||||
elif len(constructors) == 1:
|
||||
builder.writeln('Instance of {}.',
|
||||
constructors[0].class_name)
|
||||
else:
|
||||
builder.writeln('Instance of either {}.', ', '.join(
|
||||
c.class_name for c in constructors))
|
||||
|
||||
builder.writeln('"""')
|
||||
|
||||
builder.writeln('super().__init__()')
|
||||
# Functions have a result object and are confirmed by default
|
||||
if tlobject.is_function:
|
||||
builder.writeln('self.result = None')
|
||||
builder.writeln(
|
||||
'self.content_related = True')
|
||||
|
||||
# Set the arguments
|
||||
if args:
|
||||
# Leave an empty line if there are any args
|
||||
builder.writeln()
|
||||
|
||||
for arg in args:
|
||||
if not arg.can_be_inferred:
|
||||
builder.writeln('self.{0} = {0} # type: {1}',
|
||||
arg.name, arg.python_type_hint())
|
||||
continue
|
||||
|
||||
# Currently the only argument that can be
|
||||
# inferred are those called 'random_id'
|
||||
if arg.name == 'random_id':
|
||||
# Endianness doesn't really matter, and 'big' is shorter
|
||||
code = "int.from_bytes(os.urandom({}), 'big', signed=True)" \
|
||||
.format(8 if arg.type == 'long' else 4)
|
||||
|
||||
if arg.is_vector:
|
||||
# Currently for the case of "messages.forwardMessages"
|
||||
# Ensure we can infer the length from id:Vector<>
|
||||
if not next(
|
||||
a for a in args if a.name == 'id').is_vector:
|
||||
raise ValueError(
|
||||
'Cannot infer list of random ids for ', tlobject
|
||||
)
|
||||
code = '[{} for _ in range(len(id))]'.format(code)
|
||||
|
||||
builder.writeln(
|
||||
"self.random_id = random_id if random_id "
|
||||
"is not None else {}", code
|
||||
)
|
||||
else:
|
||||
raise ValueError('Cannot infer a value for ', arg)
|
||||
|
||||
builder.end_block()
|
||||
|
||||
# Write the resolve(self, client, utils) method
|
||||
if any(arg.type in AUTO_CASTS for arg in args):
|
||||
builder.writeln('def resolve(self, client, utils):')
|
||||
for arg in args:
|
||||
ac = AUTO_CASTS.get(arg.type, None)
|
||||
if ac:
|
||||
TLGenerator._write_self_assign(builder, arg, ac)
|
||||
builder.end_block()
|
||||
|
||||
# Write the to_dict(self) method
|
||||
builder.writeln('def to_dict(self):')
|
||||
builder.writeln('return {')
|
||||
builder.current_indent += 1
|
||||
|
||||
base_types = ('string', 'bytes', 'int', 'long', 'int128',
|
||||
'int256', 'double', 'Bool', 'true', 'date')
|
||||
|
||||
builder.write("'_': '{}'", tlobject.class_name)
|
||||
for arg in args:
|
||||
builder.writeln(',')
|
||||
builder.write("'{}': ", arg.name)
|
||||
if arg.type in base_types:
|
||||
if arg.is_vector:
|
||||
builder.write('[] if self.{0} is None else self.{0}[:]',
|
||||
arg.name)
|
||||
else:
|
||||
builder.write('self.{}', arg.name)
|
||||
else:
|
||||
if arg.is_vector:
|
||||
builder.write(
|
||||
'[] if self.{0} is None else [None '
|
||||
'if x is None else x.to_dict() for x in self.{0}]',
|
||||
arg.name
|
||||
)
|
||||
else:
|
||||
builder.write(
|
||||
'None if self.{0} is None else self.{0}.to_dict()',
|
||||
arg.name
|
||||
)
|
||||
|
||||
builder.writeln()
|
||||
builder.current_indent -= 1
|
||||
builder.writeln("}")
|
||||
|
||||
builder.end_block()
|
||||
|
||||
# Write the .__bytes__() function
|
||||
builder.writeln('def __bytes__(self):')
|
||||
|
||||
# Some objects require more than one flag parameter to be set
|
||||
# at the same time. In this case, add an assertion.
|
||||
repeated_args = defaultdict(list)
|
||||
for arg in tlobject.args:
|
||||
if arg.is_flag:
|
||||
repeated_args[arg.flag_index].append(arg)
|
||||
|
||||
for ra in repeated_args.values():
|
||||
if len(ra) > 1:
|
||||
cnd1 = ('(self.{0} or self.{0} is not None)'
|
||||
.format(a.name) for a in ra)
|
||||
cnd2 = ('(self.{0} is None or self.{0} is False)'
|
||||
.format(a.name) for a in ra)
|
||||
builder.writeln(
|
||||
"assert ({}) or ({}), '{} parameters must all "
|
||||
"be False-y (like None) or all me True-y'",
|
||||
' and '.join(cnd1), ' and '.join(cnd2),
|
||||
', '.join(a.name for a in ra)
|
||||
)
|
||||
|
||||
builder.writeln("return b''.join((")
|
||||
builder.current_indent += 1
|
||||
|
||||
# First constructor code, we already know its bytes
|
||||
builder.writeln('{},', repr(struct.pack('<I', tlobject.id)))
|
||||
|
||||
for arg in tlobject.args:
|
||||
if TLGenerator.write_to_bytes(builder, arg, tlobject.args):
|
||||
builder.writeln(',')
|
||||
|
||||
builder.current_indent -= 1
|
||||
builder.writeln('))')
|
||||
builder.end_block()
|
||||
|
||||
# Write the static from_reader(reader) function
|
||||
builder.writeln('@staticmethod')
|
||||
builder.writeln('def from_reader(reader):')
|
||||
for arg in tlobject.args:
|
||||
TLGenerator.write_read_code(
|
||||
builder, arg, tlobject.args, name='_' + arg.name
|
||||
)
|
||||
|
||||
builder.writeln(
|
||||
'return {}({})',
|
||||
tlobject.class_name,
|
||||
', '.join(
|
||||
'{0}=_{0}'.format(a.name) for a in tlobject.sorted_args()
|
||||
if not a.flag_indicator and not a.generic_definition
|
||||
)
|
||||
)
|
||||
|
||||
# Only requests can have a different response that's not their
|
||||
# serialized body, that is, we'll be setting their .result.
|
||||
#
|
||||
# The default behaviour is reading a TLObject too, so no need
|
||||
# to override it unless necessary.
|
||||
if tlobject.is_function and not TLGenerator._is_boxed(tlobject.result):
|
||||
builder.end_block()
|
||||
builder.writeln('def on_response(self, reader):')
|
||||
TLGenerator.write_request_result_code(builder, tlobject)
|
||||
|
||||
@staticmethod
|
||||
def _is_boxed(type_):
|
||||
# https://core.telegram.org/mtproto/serialize#boxed-and-bare-types
|
||||
# TL;DR; boxed types start with uppercase always, so we can use
|
||||
# this to check whether everything in it is boxed or not.
|
||||
#
|
||||
# The API always returns a boxed type, but it may inside a Vector<>
|
||||
# or a namespace, and the Vector may have a not-boxed type. For this
|
||||
# reason we find whatever index, '<' or '.'. If neither are present
|
||||
# we will get -1, and the 0th char is always upper case thus works.
|
||||
# For Vector types and namespaces, it will check in the right place.
|
||||
check_after = max(type_.find('<'), type_.find('.'))
|
||||
return type_[check_after + 1].isupper()
|
||||
|
||||
@staticmethod
|
||||
def _write_self_assign(builder, arg, get_input_code):
|
||||
"""Writes self.arg = input.format(self.arg), considering vectors."""
|
||||
if arg.is_vector:
|
||||
builder.write('self.{0} = [{1} for _x in self.{0}]',
|
||||
arg.name, get_input_code.format('_x'))
|
||||
else:
|
||||
builder.write('self.{} = {}',
|
||||
arg.name, get_input_code.format('self.' + arg.name))
|
||||
|
||||
builder.writeln(
|
||||
' if self.{} else None'.format(arg.name) if arg.is_flag else ''
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def get_file_name(tlobject, add_extension=False):
|
||||
"""Gets the file name in file_name_format.py for the given TLObject"""
|
||||
|
||||
# Courtesy of http://stackoverflow.com/a/1176023/4759433
|
||||
s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', tlobject.name)
|
||||
result = re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()
|
||||
if add_extension:
|
||||
return result + '.py'
|
||||
else:
|
||||
return result
|
||||
|
||||
@staticmethod
|
||||
def write_to_bytes(builder, arg, args, name=None):
|
||||
"""
|
||||
Writes the .__bytes__() code for the given argument
|
||||
:param builder: The source code builder
|
||||
:param arg: The argument to write
|
||||
:param args: All the other arguments in TLObject same __bytes__.
|
||||
This is required to determine the flags value
|
||||
:param name: The name of the argument. Defaults to "self.argname"
|
||||
This argument is an option because it's required when
|
||||
writing Vectors<>
|
||||
"""
|
||||
if arg.generic_definition:
|
||||
return # Do nothing, this only specifies a later type
|
||||
|
||||
if name is None:
|
||||
name = 'self.{}'.format(arg.name)
|
||||
|
||||
# The argument may be a flag, only write if it's not None AND
|
||||
# if it's not a True type.
|
||||
# True types are not actually sent, but instead only used to
|
||||
# determine the flags.
|
||||
if arg.is_flag:
|
||||
if arg.type == 'true':
|
||||
return # Exit, since True type is never written
|
||||
elif arg.is_vector:
|
||||
# Vector flags are special since they consist of 3 values,
|
||||
# so we need an extra join here. Note that empty vector flags
|
||||
# should NOT be sent either!
|
||||
builder.write("b'' if {0} is None or {0} is False "
|
||||
"else b''.join((", name)
|
||||
else:
|
||||
builder.write("b'' if {0} is None or {0} is False "
|
||||
"else (", name)
|
||||
|
||||
if arg.is_vector:
|
||||
if arg.use_vector_id:
|
||||
# vector code, unsigned 0x1cb5c415 as little endian
|
||||
builder.write(r"b'\x15\xc4\xb5\x1c',")
|
||||
|
||||
builder.write("struct.pack('<i', len({})),", name)
|
||||
|
||||
# Cannot unpack the values for the outer tuple through *[(
|
||||
# since that's a Python >3.5 feature, so add another join.
|
||||
builder.write("b''.join(")
|
||||
|
||||
# Temporary disable .is_vector, not to enter this if again
|
||||
# Also disable .is_flag since it's not needed per element
|
||||
old_flag = arg.is_flag
|
||||
arg.is_vector = arg.is_flag = False
|
||||
TLGenerator.write_to_bytes(builder, arg, args, name='x')
|
||||
arg.is_vector = True
|
||||
arg.is_flag = old_flag
|
||||
|
||||
builder.write(' for x in {})', name)
|
||||
|
||||
elif arg.flag_indicator:
|
||||
# Calculate the flags with those items which are not None
|
||||
if not any(f.is_flag for f in args):
|
||||
# There's a flag indicator, but no flag arguments so it's 0
|
||||
builder.write(r"b'\0\0\0\0'")
|
||||
else:
|
||||
builder.write("struct.pack('<I', ")
|
||||
builder.write(
|
||||
' | '.join('(0 if {0} is None or {0} is False else {1})'
|
||||
.format('self.{}'.format(flag.name),
|
||||
1 << flag.flag_index)
|
||||
for flag in args if flag.is_flag)
|
||||
)
|
||||
builder.write(')')
|
||||
|
||||
elif 'int' == arg.type:
|
||||
# struct.pack is around 4 times faster than int.to_bytes
|
||||
builder.write("struct.pack('<i', {})", name)
|
||||
|
||||
elif 'long' == arg.type:
|
||||
builder.write("struct.pack('<q', {})", name)
|
||||
|
||||
elif 'int128' == arg.type:
|
||||
builder.write("{}.to_bytes(16, 'little', signed=True)", name)
|
||||
|
||||
elif 'int256' == arg.type:
|
||||
builder.write("{}.to_bytes(32, 'little', signed=True)", name)
|
||||
|
||||
elif 'double' == arg.type:
|
||||
builder.write("struct.pack('<d', {})", name)
|
||||
|
||||
elif 'string' == arg.type:
|
||||
builder.write('TLObject.serialize_bytes({})', name)
|
||||
|
||||
elif 'Bool' == arg.type:
|
||||
# 0x997275b5 if boolean else 0xbc799737
|
||||
builder.write(r"b'\xb5ur\x99' if {} else b'7\x97y\xbc'", name)
|
||||
|
||||
elif 'true' == arg.type:
|
||||
pass # These are actually NOT written! Only used for flags
|
||||
|
||||
elif 'bytes' == arg.type:
|
||||
builder.write('TLObject.serialize_bytes({})', name)
|
||||
|
||||
elif 'date' == arg.type: # Custom format
|
||||
builder.write('TLObject.serialize_datetime({})', name)
|
||||
|
||||
else:
|
||||
# Else it may be a custom type
|
||||
builder.write('bytes({})', name)
|
||||
|
||||
if arg.is_flag:
|
||||
builder.write(')')
|
||||
if arg.is_vector:
|
||||
builder.write(')') # We were using a tuple
|
||||
|
||||
return True # Something was written
|
||||
|
||||
@staticmethod
|
||||
def write_read_code(builder, arg, args, name):
|
||||
"""
|
||||
Writes the read code for the given argument, setting the
|
||||
arg.name variable to its read value.
|
||||
|
||||
:param builder: The source code builder
|
||||
:param arg: The argument to write
|
||||
:param args: All the other arguments in TLObject same on_send.
|
||||
This is required to determine the flags value
|
||||
:param name: The name of the argument. Defaults to "self.argname"
|
||||
This argument is an option because it's required when
|
||||
writing Vectors<>
|
||||
"""
|
||||
|
||||
if arg.generic_definition:
|
||||
return # Do nothing, this only specifies a later type
|
||||
|
||||
# The argument may be a flag, only write that flag was given!
|
||||
was_flag = False
|
||||
if arg.is_flag:
|
||||
# Treat 'true' flags as a special case, since they're true if
|
||||
# they're set, and nothing else needs to actually be read.
|
||||
if 'true' == arg.type:
|
||||
builder.writeln('{} = bool(flags & {})',
|
||||
name, 1 << arg.flag_index)
|
||||
return
|
||||
|
||||
was_flag = True
|
||||
builder.writeln('if flags & {}:', 1 << arg.flag_index)
|
||||
# Temporary disable .is_flag not to enter this if
|
||||
# again when calling the method recursively
|
||||
arg.is_flag = False
|
||||
|
||||
if arg.is_vector:
|
||||
if arg.use_vector_id:
|
||||
# We have to read the vector's constructor ID
|
||||
builder.writeln("reader.read_int()")
|
||||
|
||||
builder.writeln('{} = []', name)
|
||||
builder.writeln('for _ in range(reader.read_int()):')
|
||||
# Temporary disable .is_vector, not to enter this if again
|
||||
arg.is_vector = False
|
||||
TLGenerator.write_read_code(builder, arg, args, name='_x')
|
||||
builder.writeln('{}.append(_x)', name)
|
||||
arg.is_vector = True
|
||||
|
||||
elif arg.flag_indicator:
|
||||
# Read the flags, which will indicate what items we should read next
|
||||
builder.writeln('flags = reader.read_int()')
|
||||
builder.writeln()
|
||||
|
||||
elif 'int' == arg.type:
|
||||
builder.writeln('{} = reader.read_int()', name)
|
||||
|
||||
elif 'long' == arg.type:
|
||||
builder.writeln('{} = reader.read_long()', name)
|
||||
|
||||
elif 'int128' == arg.type:
|
||||
builder.writeln('{} = reader.read_large_int(bits=128)', name)
|
||||
|
||||
elif 'int256' == arg.type:
|
||||
builder.writeln('{} = reader.read_large_int(bits=256)', name)
|
||||
|
||||
elif 'double' == arg.type:
|
||||
builder.writeln('{} = reader.read_double()', name)
|
||||
|
||||
elif 'string' == arg.type:
|
||||
builder.writeln('{} = reader.tgread_string()', name)
|
||||
|
||||
elif 'Bool' == arg.type:
|
||||
builder.writeln('{} = reader.tgread_bool()', name)
|
||||
|
||||
elif 'true' == arg.type:
|
||||
# Arbitrary not-None value, don't actually read "true" flags
|
||||
builder.writeln('{} = True', name)
|
||||
|
||||
elif 'bytes' == arg.type:
|
||||
builder.writeln('{} = reader.tgread_bytes()', name)
|
||||
|
||||
elif 'date' == arg.type: # Custom format
|
||||
builder.writeln('{} = reader.tgread_date()', name)
|
||||
|
||||
else:
|
||||
# Else it may be a custom type
|
||||
if not arg.skip_constructor_id:
|
||||
builder.writeln('{} = reader.tgread_object()', name)
|
||||
else:
|
||||
# Import the correct type inline to avoid cyclic imports.
|
||||
# There may be better solutions so that we can just access
|
||||
# all the types before the files have been parsed, but I
|
||||
# don't know of any.
|
||||
sep_index = arg.type.find('.')
|
||||
if sep_index == -1:
|
||||
ns, t = '.', arg.type
|
||||
else:
|
||||
ns, t = '.' + arg.type[:sep_index], arg.type[sep_index+1:]
|
||||
class_name = snake_to_camel_case(t)
|
||||
|
||||
# There would be no need to import the type if we're in the
|
||||
# file with the same namespace, but since it does no harm
|
||||
# and we don't have information about such thing in the
|
||||
# method we just ignore that case.
|
||||
builder.writeln('from {} import {}', ns, class_name)
|
||||
builder.writeln('{} = {}.from_reader(reader)',
|
||||
name, class_name)
|
||||
|
||||
# End vector and flag blocks if required (if we opened them before)
|
||||
if arg.is_vector:
|
||||
builder.end_block()
|
||||
|
||||
if was_flag:
|
||||
builder.current_indent -= 1
|
||||
builder.writeln('else:')
|
||||
builder.writeln('{} = None', name)
|
||||
builder.current_indent -= 1
|
||||
# Restore .is_flag
|
||||
arg.is_flag = True
|
||||
|
||||
@staticmethod
|
||||
def write_request_result_code(builder, tlobject):
|
||||
"""
|
||||
Writes the receive code for the given function
|
||||
|
||||
:param builder: The source code builder
|
||||
:param tlobject: The TLObject for which the 'self.result = '
|
||||
will be written
|
||||
"""
|
||||
if tlobject.result.startswith('Vector<'):
|
||||
# Vector results are a bit special since they can also be composed
|
||||
# of integer values and such; however, the result of requests is
|
||||
# not parsed as arguments are and it's a bit harder to tell which
|
||||
# is which.
|
||||
if tlobject.result == 'Vector<int>':
|
||||
builder.writeln('reader.read_int() # Vector ID')
|
||||
builder.writeln('count = reader.read_int()')
|
||||
builder.writeln(
|
||||
'self.result = [reader.read_int() for _ in range(count)]'
|
||||
)
|
||||
elif tlobject.result == 'Vector<long>':
|
||||
builder.writeln('reader.read_int() # Vector ID')
|
||||
builder.writeln('count = reader.read_long()')
|
||||
builder.writeln(
|
||||
'self.result = [reader.read_long() for _ in range(count)]'
|
||||
)
|
||||
else:
|
||||
builder.writeln('self.result = reader.tgread_vector()')
|
||||
else:
|
||||
builder.writeln('self.result = reader.tgread_object()')
|
Loading…
Reference in New Issue
Block a user