Merge pull request #766 from LonamiWebs/tidygenerator

Tidy up the telethon-generator package
Lonami 2018-04-15 13:21:26 +02:00 committed by GitHub
commit ffdfa8f262
31 changed files with 1475 additions and 1582 deletions

.gitignore

@ -1,5 +1,6 @@
# Docs
_build/
docs/
# Generated code
telethon/tl/functions/

docs/.gitignore

@ -1 +0,0 @@
generated/


@ -54,7 +54,7 @@ Manual Installation
3. Enter the cloned repository: ``cd Telethon``
4. Run the code generator: ``python3 setup.py gen_tl``
4. Run the code generator: ``python3 setup.py gen tl errors``
5. Done!

setup.py

@ -6,16 +6,16 @@ https://packaging.python.org/en/latest/distributing.html
https://github.com/pypa/sampleproject
Extra supported commands are:
* gen_tl, to generate the classes required for Telethon to run
* clean_tl, to clean these generated classes
* gen, to generate the classes required for Telethon to run or docs
* pypi, to generate sdist, bdist_wheel, and push to PyPi
"""
# To use a consistent encoding
from codecs import open
from sys import argv, version_info
import os
import re
# To use a consistent encoding
import shutil
from codecs import open
from sys import argv, version_info
# Always prefer setuptools over distutils
from setuptools import find_packages, setup
@ -37,44 +37,85 @@ class TempWorkDir:
os.chdir(self.original)
ERROR_LIST = 'telethon/errors/rpc_error_list.py'
ERRORS_JSON = 'telethon_generator/errors.json'
ERRORS_DESC = 'telethon_generator/error_descriptions'
SCHEME_TL = 'telethon_generator/scheme.tl'
GENERATOR_DIR = 'telethon/tl'
GENERATOR_DIR = 'telethon_generator'
LIBRARY_DIR = 'telethon'
ERRORS_IN_JSON = os.path.join(GENERATOR_DIR, 'data', 'errors.json')
ERRORS_IN_DESC = os.path.join(GENERATOR_DIR, 'data', 'error_descriptions')
ERRORS_OUT = os.path.join(LIBRARY_DIR, 'errors', 'rpc_error_list.py')
TLOBJECT_IN_TL = os.path.join(GENERATOR_DIR, 'data', 'scheme.tl')
TLOBJECT_OUT = os.path.join(LIBRARY_DIR, 'tl')
IMPORT_DEPTH = 2
DOCS_IN_RES = os.path.join(GENERATOR_DIR, 'data', 'html')
DOCS_OUT = 'docs'
def gen_tl(force=True):
from telethon_generator.tl_generator import TLGenerator
from telethon_generator.error_generator import generate_code
generator = TLGenerator(GENERATOR_DIR)
if generator.tlobjects_exist():
if not force:
return
print('Detected previous TLObjects. Cleaning...')
generator.clean_tlobjects()
print('Generating TLObjects...')
generator.generate_tlobjects(SCHEME_TL, import_depth=IMPORT_DEPTH)
print('Generating errors...')
generate_code(ERROR_LIST, json_file=ERRORS_JSON, errors_desc=ERRORS_DESC)
print('Done.')
def generate(which):
from telethon_generator.parsers import parse_errors, parse_tl, find_layer
from telethon_generator.generators import\
generate_errors, generate_tlobjects, generate_docs, clean_tlobjects
tlobjects = list(parse_tl(TLOBJECT_IN_TL, ignore_core=True))
errors = list(parse_errors(ERRORS_IN_JSON, ERRORS_IN_DESC))
layer = find_layer(TLOBJECT_IN_TL)
if not which:
which.extend(('tl', 'errors'))
clean = 'clean' in which
action = 'Cleaning' if clean else 'Generating'
if clean:
which.remove('clean')
if 'all' in which:
which.remove('all')
for x in ('tl', 'errors', 'docs'):
if x not in which:
which.append(x)
if 'tl' in which:
which.remove('tl')
print(action, 'TLObjects...')
if clean:
clean_tlobjects(TLOBJECT_OUT)
else:
generate_tlobjects(tlobjects, layer, IMPORT_DEPTH, TLOBJECT_OUT)
if 'errors' in which:
which.remove('errors')
print(action, 'RPCErrors...')
if clean:
if os.path.isfile(ERRORS_OUT):
os.remove(ERRORS_OUT)
else:
with open(ERRORS_OUT, 'w', encoding='utf-8') as file:
generate_errors(errors, file)
if 'docs' in which:
which.remove('docs')
print(action, 'documentation...')
if clean:
if os.path.isdir(DOCS_OUT):
shutil.rmtree(DOCS_OUT)
else:
generate_docs(tlobjects, errors, layer, DOCS_IN_RES, DOCS_OUT)
if which:
print('The following items were not understood:', which)
print(' Consider using only "tl", "errors" and/or "docs".')
print(' Using only "clean" will clean them. "all" to act on all.')
print(' For instance "gen tl errors".')
def main():
if len(argv) >= 2 and argv[1] == 'gen_tl':
gen_tl()
elif len(argv) >= 2 and argv[1] == 'clean_tl':
from telethon_generator.tl_generator import TLGenerator
print('Cleaning...')
TLGenerator(GENERATOR_DIR).clean_tlobjects()
print('Done.')
if len(argv) >= 2 and argv[1] == 'gen':
generate(argv[2:])
elif len(argv) >= 2 and argv[1] == 'pypi':
# (Re)generate the code to make sure we don't push without it
gen_tl()
generate(['clean', 'tl', 'errors'])
# Try importing the telethon module to assert it has no errors
try:
@ -96,14 +137,10 @@ def main():
for x in ('build', 'dist', 'Telethon.egg-info'):
rmtree(x, ignore_errors=True)
elif len(argv) >= 2 and argv[1] == 'fetch_errors':
from telethon_generator.error_generator import fetch_errors
fetch_errors(ERRORS_JSON)
else:
# Call gen_tl() if the scheme.tl file exists, e.g. install from GitHub
if os.path.isfile(SCHEME_TL):
gen_tl(force=False)
# e.g. install from GitHub
if os.path.isdir(GENERATOR_DIR):
generate(['clean', 'tl', 'errors'])
# Get the long description from the README file
with open('README.rst', encoding='utf-8') as f:
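For quick reference, the new ``gen`` sub-command maps onto ``generate()`` as follows (an illustrative sketch derived from ``generate()`` and ``main()`` above, not part of the commit itself):

# Sketch: what `python3 setup.py gen ...` ends up calling, per main() above.
generate([])                         # "setup.py gen": defaults to 'tl' and 'errors'
generate(['docs'])                   # "setup.py gen docs": build the HTML docs into docs/
generate(['clean', 'tl', 'errors'])  # "setup.py gen clean tl errors": remove generated code
generate(['clean', 'all'])           # "setup.py gen clean all": also removes the docs/ output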


@ -12,6 +12,7 @@ from .common import (
)
# This imports the base errors too, as they're imported there
from .rpc_base_errors import *
from .rpc_error_list import *


@ -189,6 +189,6 @@ class TLObject:
def __bytes__(self):
return b''
@staticmethod
def from_reader(reader):
@classmethod
def from_reader(cls, reader):
return TLObject()
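The switch from ``@staticmethod`` to ``@classmethod`` matches the generated subclasses, whose ``from_reader`` builds the instance through ``cls(...)`` (see ``_write_from_reader`` further down in this diff). A minimal illustrative sketch with made-up names:

class ExampleRequest(TLObject):          # illustrative generated subclass
    def __init__(self, user_id=None):
        super().__init__()
        self.user_id = user_id

    @classmethod
    def from_reader(cls, reader):
        _user_id = reader.read_int()     # generated per-argument read code
        return cls(user_id=_user_id)     # works for every subclass thanks to cls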

(Binary image file changed; 1.0 KiB before and after.)


@ -4,7 +4,7 @@ import re
class DocsWriter:
"""Utility class used to write the HTML files used on the documentation"""
def __init__(self, filename, type_to_path_function):
def __init__(self, filename, type_to_path):
"""Initializes the writer to the specified output file,
creating the parent directories when used if required.
@ -19,7 +19,7 @@ class DocsWriter:
self.menu_separator_tag = None
# Utility functions TODO There must be a better way
self.type_to_path = lambda t: type_to_path_function(
self.type_to_path = lambda t: type_to_path(
t, relative_to=self.filename
)


@ -1,177 +0,0 @@
import json
import re
import urllib.request
from collections import defaultdict
URL = 'https://rpc.pwrtelegram.xyz/?all'
known_base_classes = {
303: 'InvalidDCError',
400: 'BadRequestError',
401: 'UnauthorizedError',
403: 'ForbiddenError',
404: 'NotFoundError',
406: 'AuthKeyError',
420: 'FloodError',
500: 'ServerError',
}
# The API doesn't return the code for some (vital) errors. They are
# all assumed to be 400, except these well-known ones that aren't.
known_codes = {
'ACTIVE_USER_REQUIRED': 401,
'AUTH_KEY_UNREGISTERED': 401,
'USER_DEACTIVATED': 401
}
def fetch_errors(output, url=URL):
print('Opening a connection to', url, '...')
r = urllib.request.urlopen(urllib.request.Request(
url, headers={'User-Agent' : 'Mozilla/5.0'}
))
print('Checking response...')
data = json.loads(
r.read().decode(r.info().get_param('charset') or 'utf-8')
)
if data.get('ok'):
print('Response was okay, saving data')
with open(output, 'w', encoding='utf-8') as f:
json.dump(data, f, sort_keys=True)
return True
else:
print('The data received was not okay:')
print(json.dumps(data, indent=4, sort_keys=True))
return False
def get_class_name(error_code):
if isinstance(error_code, int):
return known_base_classes.get(
error_code, 'RPCError' + str(error_code).replace('-', 'Neg')
)
if 'FIRSTNAME' in error_code:
error_code = error_code.replace('FIRSTNAME', 'FIRST_NAME')
result = re.sub(
r'_([a-z])', lambda m: m.group(1).upper(), error_code.lower()
)
return result[:1].upper() + result[1:].replace('_', '') + 'Error'
def write_error(f, code, name, desc, capture_name):
f.write(
'\n\nclass {}({}):\n def __init__(self, **kwargs):\n '
''.format(name, get_class_name(code))
)
if capture_name:
f.write(
"self.{} = int(kwargs.get('capture', 0))\n ".format(capture_name)
)
f.write('super(Exception, self).__init__({}'.format(repr(desc)))
if capture_name:
f.write('.format(self.{})'.format(capture_name))
f.write(')\n')
def generate_code(output, json_file, errors_desc):
with open(json_file, encoding='utf-8') as f:
data = json.load(f)
errors = defaultdict(set)
# PWRTelegram's API doesn't return all errors, which we do need here.
# Add some special known-cases manually first.
errors[420].update((
'FLOOD_WAIT_X', 'FLOOD_TEST_PHONE_WAIT_X'
))
errors[401].update((
'AUTH_KEY_INVALID', 'SESSION_EXPIRED', 'SESSION_REVOKED'
))
errors[303].update((
'FILE_MIGRATE_X', 'PHONE_MIGRATE_X',
'NETWORK_MIGRATE_X', 'USER_MIGRATE_X'
))
for error_code, method_errors in data['result'].items():
for error_list in method_errors.values():
for error in error_list:
errors[int(error_code)].add(re.sub('_\d+', '_X', error).upper())
# Some errors are in the human result, but not with a code. Assume code 400
for error in data['human_result']:
if error[0] != '-' and not error.isdigit():
error = re.sub('_\d+', '_X', error).upper()
if not any(error in es for es in errors.values()):
errors[known_codes.get(error, 400)].add(error)
# Some error codes are not known, so create custom base classes if needed
needed_base_classes = [
(e, get_class_name(e)) for e in errors if e not in known_base_classes
]
# Prefer the descriptions that are related with Telethon way of coding to
# those that PWRTelegram's API provides.
telethon_descriptions = {}
with open(errors_desc, encoding='utf-8') as f:
for line in f:
line = line.strip()
if line and not line.startswith('#'):
equal = line.index('=')
message, description = line[:equal], line[equal + 1:]
telethon_descriptions[message.rstrip()] = description.lstrip()
# Names for the captures, or 'x' if unknown
capture_names = {
'FloodWaitError': 'seconds',
'FloodTestPhoneWaitError': 'seconds',
'FileMigrateError': 'new_dc',
'NetworkMigrateError': 'new_dc',
'PhoneMigrateError': 'new_dc',
'UserMigrateError': 'new_dc',
'FilePartMissingError': 'which'
}
# Everything ready, generate the code
with open(output, 'w', encoding='utf-8') as f:
f.write(
'from .rpc_base_errors import RPCError, BadMessageError, {}\n'.format(
", ".join(known_base_classes.values()))
)
for code, cls in needed_base_classes:
f.write(
'\n\nclass {}(RPCError):\n code = {}\n'.format(cls, code)
)
patterns = [] # Save this dictionary later in the generated code
for error_code, error_set in errors.items():
for error in sorted(error_set):
description = telethon_descriptions.get(
error, '\n'.join(data['human_result'].get(
error, ['No description known.']
))
)
has_captures = '_X' in error
if has_captures:
name = get_class_name(error.replace('_X', ''))
pattern = error.replace('_X', r'_(\d+)')
else:
name, pattern = get_class_name(error), error
patterns.append((pattern, name))
capture = capture_names.get(name, 'x') if has_captures else None
# TODO Some errors have the same name but different code,
# split this across different files?
write_error(f, error_code, name, description, capture)
f.write('\n\nrpc_errors_all = {\n')
for pattern, name in patterns:
f.write(' {}: {},\n'.format(repr(pattern), name))
f.write('}\n')
if __name__ == '__main__':
if input('generate (y/n)?: ').lower() == 'y':
generate_code('../telethon/errors/rpc_error_list.py',
'errors.json', 'error_descriptions')
elif input('fetch (y/n)?: ').lower() == 'y':
fetch_errors('errors.json')


@ -0,0 +1,32 @@
import sys
import json
import urllib.request
OUT = 'data/errors.json'
URL = 'https://rpc.pwrtelegram.xyz/?all'
def fetch_errors(output, url=URL):
print('Opening a connection to', url, '...')
r = urllib.request.urlopen(urllib.request.Request(
url, headers={'User-Agent' : 'Mozilla/5.0'}
))
print('Checking response...')
data = json.loads(
r.read().decode(r.info().get_param('charset') or 'utf-8')
)
if data.get('ok'):
print('Response was okay, saving data')
with open(output, 'w', encoding='utf-8') as f:
json.dump(data, f, sort_keys=True)
return True
else:
print('The data received was not okay:')
print(json.dumps(data, indent=4, sort_keys=True))
return False
if __name__ == '__main__':
out = OUT if len(sys.argv) < 2 else sys.argv[1]
url = URL if len(sys.argv) < 3 else sys.argv[2]
fetch_errors(out, url)
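For context, the JSON this script downloads is what the error parser and generator consume; judging from how the old ``error_generator`` above reads it, the payload roughly has the following shape (a hedged sketch with illustrative entries, not actual API output):

# Rough shape of data/errors.json (illustrative values):
example_payload = {
    'ok': True,
    'result': {                  # str(code) -> {method: [error names]}
        '400': {'messages.sendMessage': ['PEER_ID_INVALID']},
        '420': {'messages.sendMessage': ['FLOOD_WAIT_3']},
    },
    'human_result': {            # error name -> [description lines]
        'PEER_ID_INVALID': ['The provided peer id is invalid'],
    },
}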


@ -0,0 +1,26 @@
from telethon_generator.parsers import parse_errors, parse_tl, find_layer
from telethon_generator.generators import\
generate_errors, generate_tlobjects, generate_docs
ERRORS_INPUT_JSON = 'data/errors.json'
ERRORS_INPUT_DESC = 'data/error_descriptions'
ERRORS_OUTPUT = '../telethon/errors/rpc_error_list.py'
TLOBJECT_INPUT_TL = 'data/scheme.tl'
TLOBJECT_OUTPUT = '../telethon/tl'
DOCS_INPUT_RES = 'data/html'
DOCS_OUTPUT = '../docs'
if __name__ == '__main__':
tlobjects = list(parse_tl(TLOBJECT_INPUT_TL, ignore_core=True))
errors = list(parse_errors(ERRORS_INPUT_JSON, ERRORS_INPUT_DESC))
layer = find_layer(TLOBJECT_INPUT_TL)
generate_tlobjects(tlobjects, layer, TLOBJECT_OUTPUT)
with open(ERRORS_OUTPUT, 'w', encoding='utf-8') as file:
generate_errors(errors, file)
generate_docs(tlobjects, errors, layer, DOCS_INPUT_RES, DOCS_OUTPUT)


@ -0,0 +1,3 @@
from .errors import generate_errors
from .tlobject import generate_tlobjects, clean_tlobjects
from .docs import generate_docs


@ -1,124 +1,74 @@
#!/usr/bin/env python3
import functools
import os
import re
import sys
import shutil
try:
from .docs_writer import DocsWriter
except (ImportError, SystemError):
from docs_writer import DocsWriter
from collections import defaultdict
# Small trick so importing telethon_generator works
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from telethon_generator.parser import TLParser, TLObject
from ..docs_writer import DocsWriter
from ..parsers import TLObject
from ..utils import snake_to_camel_case
# TLObject -> Python class name
def get_class_name(tlobject):
"""Gets the class name following the Python style guidelines"""
# Courtesy of http://stackoverflow.com/a/31531797/4759433
CORE_TYPES = {
'int', 'long', 'int128', 'int256', 'double',
'vector', 'string', 'bool', 'true', 'bytes', 'date'
}
def _get_file_name(tlobject):
"""``ClassName -> class_name.html``."""
name = tlobject.name if isinstance(tlobject, TLObject) else tlobject
result = re.sub(r'_([a-z])', lambda m: m.group(1).upper(), name)
# Replace '_' with '' once again to make sure it doesn't appear on the name
result = result[:1].upper() + result[1:].replace('_', '')
# If it's a function, let it end with "Request" to identify them more easily
if isinstance(tlobject, TLObject) and tlobject.is_function:
result += 'Request'
return result
# TLObject -> filename
def get_file_name(tlobject, add_extension=False):
"""Gets the file name in file_name_format.html for the given TLObject.
Only its name may also be given if the full TLObject is not available"""
if isinstance(tlobject, TLObject):
name = tlobject.name
else:
name = tlobject
# Courtesy of http://stackoverflow.com/a/1176023/4759433
s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
result = re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()
if add_extension:
return result + '.html'
else:
return result
return '{}.html'.format(result)
# TLObject -> from ... import ...
def get_import_code(tlobject):
"""``TLObject -> from ... import ...``."""
kind = 'functions' if tlobject.is_function else 'types'
ns = '.' + tlobject.namespace if tlobject.namespace else ''
return 'from telethon.tl.{}{} import {}'\
.format(kind, ns, get_class_name(tlobject))
.format(kind, ns, tlobject.class_name)
def get_create_path_for(tlobject):
"""Gets the file path (and creates the parent directories)
for the given 'tlobject', relative to nothing; only its local path"""
# Determine the output directory
def _get_create_path_for(root, tlobject):
"""Creates and returns the path for the given TLObject at root."""
out_dir = 'methods' if tlobject.is_function else 'constructors'
if tlobject.namespace:
out_dir = os.path.join(out_dir, tlobject.namespace)
# Ensure that it exists
out_dir = os.path.join(root, out_dir)
os.makedirs(out_dir, exist_ok=True)
# Return the resulting filename
return os.path.join(out_dir, get_file_name(tlobject, add_extension=True))
return os.path.join(out_dir, _get_file_name(tlobject))
def is_core_type(type_):
"""Returns "true" if the type is considered a core type"""
return type_.lower() in {
'int', 'long', 'int128', 'int256', 'double',
'vector', 'string', 'bool', 'true', 'bytes', 'date'
}
def get_path_for_type(type_, relative_to='.'):
"""Similar to getting the path for a TLObject, it might not be possible
to have the TLObject itself but rather its name (the type);
this method works in the same way, returning a relative path"""
if is_core_type(type_):
def get_path_for_type(root, type_, relative_to='.'):
"""Similar to `_get_create_path_for` but for only type names."""
if type_.lower() in CORE_TYPES:
path = 'index.html#%s' % type_.lower()
elif '.' in type_:
# If it's not a core type, then it has to be a custom Telegram type
namespace, name = type_.split('.')
path = 'types/%s/%s' % (namespace, get_file_name(name, True))
path = 'types/%s/%s' % (namespace, _get_file_name(name))
else:
path = 'types/%s' % get_file_name(type_, True)
path = 'types/%s' % _get_file_name(type_)
return get_relative_path(path, relative_to)
return _get_relative_path(os.path.join(root, path), relative_to)
# Destination path from the current position -> relative to the given path
def get_relative_path(destination, relative_to):
if os.path.isfile(relative_to):
def _get_relative_path(destination, relative_to, folder=False):
"""Return the relative path to destination from relative_to."""
if not folder:
relative_to = os.path.dirname(relative_to)
return os.path.relpath(destination, start=relative_to)
def get_relative_paths(original, relative_to):
"""Converts the dictionary of 'original' paths to relative paths
starting from the given 'relative_to' file"""
return {k: get_relative_path(v, relative_to) for k, v in original.items()}
# Generate a index.html file for the given folder
def find_title(html_file):
"""Finds the <title> for the given HTML file, or (Unknown)"""
with open(html_file) as handle:
for line in handle:
def _find_title(html_file):
"""Finds the <title> for the given HTML file, or (Unknown)."""
with open(html_file) as fp:
for line in fp:
if '<title>' in line:
# + 7 to skip len('<title>')
return line[line.index('<title>') + 7:line.index('</title>')]
@ -126,10 +76,11 @@ def find_title(html_file):
return '(Unknown)'
def build_menu(docs, filename, relative_main_index):
def _build_menu(docs, filename, root, relative_main_index):
"""Builds the menu using the given DocumentWriter up to 'filename',
which must be a file (it cannot be a directory)"""
# TODO Maybe this could be part of DocsWriter itself, "build path menu"
filename = _get_relative_path(filename, root)
docs.add_menu('API', relative_main_index)
items = filename.split('/')
@ -144,9 +95,8 @@ def build_menu(docs, filename, relative_main_index):
docs.end_menu()
def generate_index(folder, original_paths):
def _generate_index(folder, original_paths, root):
"""Generates the index file for the specified folder"""
# Determine the namespaces listed here (as sub folders)
# and the files (.html files) that we should link to
namespaces = []
@ -157,27 +107,28 @@ def generate_index(folder, original_paths):
elif item != 'index.html':
files.append(item)
# We work with relative paths
paths = get_relative_paths(original_paths, relative_to=folder)
paths = {k: _get_relative_path(v, folder, folder=True)
for k, v in original_paths.items()}
# Now that everything is setup, write the index.html file
filename = os.path.join(folder, 'index.html')
with DocsWriter(filename, type_to_path_function=get_path_for_type) as docs:
with DocsWriter(filename, type_to_path=get_path_for_type) as docs:
# Title should be the current folder name
docs.write_head(folder.title(), relative_css_path=paths['css'])
docs.set_menu_separator(paths['arrow'])
build_menu(docs, filename, relative_main_index=paths['index_all'])
docs.write_title(folder.title())
_build_menu(docs, filename, root,
relative_main_index=paths['index_all'])
docs.write_title(_get_relative_path(folder, root, folder=True).title())
if namespaces:
docs.write_title('Namespaces', level=3)
docs.begin_table(4)
namespaces.sort()
for namespace in namespaces:
# For every namespace, also write the index of it
generate_index(os.path.join(folder, namespace), original_paths)
_generate_index(os.path.join(folder, namespace),
original_paths, root)
docs.add_row(namespace.title(),
link=os.path.join(namespace, 'index.html'))
@ -186,7 +137,7 @@ def generate_index(folder, original_paths):
docs.write_title('Available items')
docs.begin_table(2)
files = [(f, find_title(os.path.join(folder, f))) for f in files]
files = [(f, _find_title(os.path.join(folder, f))) for f in files]
files.sort(key=lambda t: t[1])
for file, title in files:
@ -196,8 +147,8 @@ def generate_index(folder, original_paths):
docs.end_body()
def get_description(arg):
"""Generates a proper description for the given argument"""
def _get_description(arg):
"""Generates a proper description for the given argument."""
desc = []
otherwise = False
if arg.can_be_inferred:
@ -235,7 +186,7 @@ def get_description(arg):
)
def copy_replace(src, dst, replacements):
def _copy_replace(src, dst, replacements):
"""Copies the src file into dst applying the replacements dict"""
with open(src) as infile, open(dst, 'w') as outfile:
outfile.write(re.sub(
@ -245,10 +196,15 @@ def copy_replace(src, dst, replacements):
))
def generate_documentation(scheme_file):
"""Generates the documentation HTML files from from scheme.tl to
/methods and /constructors, etc.
def _write_html_pages(tlobjects, errors, layer, input_res, output_dir):
"""
Generates the documentation HTML files from ``scheme.tl``
to ``/methods`` and ``/constructors``, etc.
"""
# Save 'Type: [Constructors]' for use in both:
# * Seeing the return type or constructors belonging to the same type.
# * Generating the types documentation, showing available constructors.
# TODO Tried using 'defaultdict(list)' with strange results, make it work.
original_paths = {
'css': 'css/docs.css',
'arrow': 'img/arrow.svg',
@ -259,46 +215,46 @@ def generate_documentation(scheme_file):
'index_methods': 'methods/index.html',
'index_constructors': 'constructors/index.html'
}
tlobjects = tuple(TLParser.parse_file(scheme_file))
original_paths = {k: os.path.join(output_dir, v)
for k, v in original_paths.items()}
print('Generating constructors and functions documentation...')
# Save 'Type: [Constructors]' for use in both:
# * Seeing the return type or constructors belonging to the same type.
# * Generating the types documentation, showing available constructors.
# TODO Tried using 'defaultdict(list)' with strange results, make it work.
tltypes = {}
tlfunctions = {}
type_to_constructors = {}
type_to_functions = {}
for tlobject in tlobjects:
# Select to which dictionary we want to store this type
dictionary = tlfunctions if tlobject.is_function else tltypes
if tlobject.result in dictionary:
dictionary[tlobject.result].append(tlobject)
d = type_to_functions if tlobject.is_function else type_to_constructors
if tlobject.result in d:
d[tlobject.result].append(tlobject)
else:
dictionary[tlobject.result] = [tlobject]
d[tlobject.result] = [tlobject]
for tltype, constructors in tltypes.items():
tltypes[tltype] = list(sorted(constructors, key=lambda c: c.name))
for t, cs in type_to_constructors.items():
type_to_constructors[t] = list(sorted(cs, key=lambda c: c.name))
method_causes_errors = defaultdict(list)
for error in errors:
for method in error.caused_by:
method_causes_errors[method].append(error)
# Since the output directory is needed everywhere partially apply it now
create_path_for = functools.partial(_get_create_path_for, output_dir)
path_for_type = functools.partial(get_path_for_type, output_dir)
for tlobject in tlobjects:
filename = get_create_path_for(tlobject)
filename = create_path_for(tlobject)
paths = {k: _get_relative_path(v, filename)
for k, v in original_paths.items()}
# Determine the relative paths for this file
paths = get_relative_paths(original_paths, relative_to=filename)
with DocsWriter(filename, type_to_path_function=get_path_for_type) \
as docs:
docs.write_head(
title=get_class_name(tlobject),
relative_css_path=paths['css'])
with DocsWriter(filename, type_to_path=path_for_type) as docs:
docs.write_head(title=tlobject.class_name,
relative_css_path=paths['css'])
# Create the menu (path to the current TLObject)
docs.set_menu_separator(paths['arrow'])
build_menu(docs, filename, relative_main_index=paths['index_all'])
_build_menu(docs, filename, output_dir,
relative_main_index=paths['index_all'])
# Create the page title
docs.write_title(get_class_name(tlobject))
docs.write_title(tlobject.class_name)
# Write the code definition for this TLObject
docs.write_code(tlobject)
@ -328,24 +284,24 @@ def generate_documentation(scheme_file):
inner = tlobject.result
docs.begin_table(column_count=1)
docs.add_row(inner, link=get_path_for_type(
docs.add_row(inner, link=path_for_type(
inner, relative_to=filename
))
docs.end_table()
constructors = tltypes.get(inner, [])
if not constructors:
cs = type_to_constructors.get(inner, [])
if not cs:
docs.write_text('This type has no instances available.')
elif len(constructors) == 1:
elif len(cs) == 1:
docs.write_text('This type can only be an instance of:')
else:
docs.write_text('This type can be an instance of either:')
docs.begin_table(column_count=2)
for constructor in constructors:
link = get_create_path_for(constructor)
link = get_relative_path(link, relative_to=filename)
docs.add_row(get_class_name(constructor), link=link)
for constructor in cs:
link = create_path_for(constructor)
link = _get_relative_path(link, relative_to=filename)
docs.add_row(constructor.class_name, link=link)
docs.end_table()
# Return (or similar types) written. Now parameters/members
@ -375,11 +331,11 @@ def generate_documentation(scheme_file):
else:
docs.add_row(
arg.type, align='center', link=
get_path_for_type(arg.type, relative_to=filename)
path_for_type(arg.type, relative_to=filename)
)
# Add a description for this argument
docs.add_row(get_description(arg))
docs.add_row(_get_description(arg))
docs.end_table()
else:
@ -388,6 +344,25 @@ def generate_documentation(scheme_file):
else:
docs.write_text('This type has no members.')
if tlobject.is_function:
docs.write_title('Known RPC errors')
errors = method_causes_errors[tlobject.fullname]
if not errors:
docs.write_text("This request can't cause any RPC error "
"as far as we know.")
else:
docs.write_text(
'This request can cause {} known error{}:'.format(
len(errors), '' if len(errors) == 1 else 's'
))
docs.begin_table(column_count=2)
for error in errors:
docs.add_row('<code>{}</code>'.format(error.name))
docs.add_row('{}.'.format(error.description))
docs.end_table()
docs.write_text('You can import these from '
'<code>telethon.errors</code>.')
# TODO Bit hacky, make everything like this? (prepending '../')
depth = '../' * (2 if tlobject.namespace else 1)
docs.add_script(src='prependPath = "{}";'.format(depth))
@ -396,55 +371,54 @@ def generate_documentation(scheme_file):
# Find all the available types (which are not the same as the constructors)
# Each type has a list of constructors associated to it, hence is a map
print('Generating types documentation...')
for tltype, constructors in tltypes.items():
filename = get_path_for_type(tltype)
for t, cs in type_to_constructors.items():
filename = path_for_type(t)
out_dir = os.path.dirname(filename)
if out_dir:
os.makedirs(out_dir, exist_ok=True)
# Since we don't have access to the full TLObject, split the type
if '.' in tltype:
namespace, name = tltype.split('.')
if '.' in t:
namespace, name = t.split('.')
else:
namespace, name = None, tltype
namespace, name = None, t
# Determine the relative paths for this file
paths = get_relative_paths(original_paths, relative_to=out_dir)
paths = {k: _get_relative_path(v, out_dir, folder=True)
for k, v in original_paths.items()}
with DocsWriter(filename, type_to_path_function=get_path_for_type) \
as docs:
with DocsWriter(filename, type_to_path=path_for_type) as docs:
docs.write_head(
title=get_class_name(name),
title=snake_to_camel_case(name),
relative_css_path=paths['css'])
docs.set_menu_separator(paths['arrow'])
build_menu(docs, filename, relative_main_index=paths['index_all'])
_build_menu(docs, filename, output_dir,
relative_main_index=paths['index_all'])
# Main file title
docs.write_title(get_class_name(name))
docs.write_title(snake_to_camel_case(name))
# List available constructors for this type
docs.write_title('Available constructors', level=3)
if not constructors:
if not cs:
docs.write_text('This type has no constructors available.')
elif len(constructors) == 1:
elif len(cs) == 1:
docs.write_text('This type has one constructor available.')
else:
docs.write_text('This type has %d constructors available.' %
len(constructors))
len(cs))
docs.begin_table(2)
for constructor in constructors:
for constructor in cs:
# Constructor full name
link = get_create_path_for(constructor)
link = get_relative_path(link, relative_to=filename)
docs.add_row(get_class_name(constructor), link=link)
link = create_path_for(constructor)
link = _get_relative_path(link, relative_to=filename)
docs.add_row(constructor.class_name, link=link)
docs.end_table()
# List all the methods which return this type
docs.write_title('Methods returning this type', level=3)
functions = tlfunctions.get(tltype, [])
functions = type_to_functions.get(t, [])
if not functions:
docs.write_text('No method returns this type.')
elif len(functions) == 1:
@ -457,16 +431,16 @@ def generate_documentation(scheme_file):
docs.begin_table(2)
for func in functions:
link = get_create_path_for(func)
link = get_relative_path(link, relative_to=filename)
docs.add_row(get_class_name(func), link=link)
link = create_path_for(func)
link = _get_relative_path(link, relative_to=filename)
docs.add_row(func.class_name, link=link)
docs.end_table()
# List all the methods which take this type as input
docs.write_title('Methods accepting this type as input', level=3)
other_methods = sorted(
(t for t in tlobjects
if any(tltype == a.type for a in t.args) and t.is_function),
if any(t == a.type for a in t.args) and t.is_function),
key=lambda t: t.name
)
if not other_methods:
@ -482,16 +456,16 @@ def generate_documentation(scheme_file):
docs.begin_table(2)
for ot in other_methods:
link = get_create_path_for(ot)
link = get_relative_path(link, relative_to=filename)
docs.add_row(get_class_name(ot), link=link)
link = create_path_for(ot)
link = _get_relative_path(link, relative_to=filename)
docs.add_row(ot.class_name, link=link)
docs.end_table()
# List every other type which has this type as a member
docs.write_title('Other types containing this type', level=3)
other_types = sorted(
(t for t in tlobjects
if any(tltype == a.type for a in t.args)
if any(t == a.type for a in t.args)
and not t.is_function
), key=lambda t: t.name
)
@ -509,9 +483,9 @@ def generate_documentation(scheme_file):
docs.begin_table(2)
for ot in other_types:
link = get_create_path_for(ot)
link = get_relative_path(link, relative_to=filename)
docs.add_row(get_class_name(ot), link=link)
link = create_path_for(ot)
link = _get_relative_path(link, relative_to=filename)
docs.add_row(ot.class_name, link=link)
docs.end_table()
docs.end_body()
@ -519,22 +493,21 @@ def generate_documentation(scheme_file):
# This will be done automatically and not taking into account any extra
# information that we have available, simply a file listing all the others
# accessible by clicking on their title
print('Generating indices...')
for folder in ['types', 'methods', 'constructors']:
generate_index(folder, original_paths)
_generate_index(os.path.join(output_dir, folder), original_paths,
output_dir)
# Write the final core index, the main index for the rest of files
layer = TLParser.find_layer(scheme_file)
types = set()
methods = []
constructors = []
cs = []
for tlobject in tlobjects:
if tlobject.is_function:
methods.append(tlobject)
else:
constructors.append(tlobject)
cs.append(tlobject)
if not is_core_type(tlobject.result):
if not tlobject.result.lower() in CORE_TYPES:
if re.search('^vector<', tlobject.result, re.IGNORECASE):
types.add(tlobject.result.split('<')[1].strip('>'))
else:
@ -542,41 +515,43 @@ def generate_documentation(scheme_file):
types = sorted(types)
methods = sorted(methods, key=lambda m: m.name)
constructors = sorted(constructors, key=lambda c: c.name)
cs = sorted(cs, key=lambda c: c.name)
def fmt(xs):
ys = {x: get_class_name(x) for x in xs} # cache TLObject: display
zs = {} # create a dict to hold those which have duplicated keys
for y in ys.values():
zs[y] = y in zs
return ', '.join(
'"{}.{}"'.format(x.namespace, ys[x])
if zs[ys[x]] and getattr(x, 'namespace', None)
else '"{}"'.format(ys[x]) for x in xs
)
request_names = fmt(methods)
type_names = fmt(types)
constructor_names = fmt(constructors)
def fmt(xs, formatter):
return ', '.join('"{}"'.format(formatter(x)) for x in xs)
request_urls = fmt(methods, get_create_path_for)
type_urls = fmt(types, get_path_for_type)
constructor_urls = fmt(constructors, get_create_path_for)
shutil.copy('../res/404.html', original_paths['404'])
copy_replace('../res/core.html', original_paths['index_all'], {
shutil.copy(os.path.join(input_res, '404.html'), original_paths['404'])
_copy_replace(os.path.join(input_res, 'core.html'),
original_paths['index_all'], {
'{type_count}': len(types),
'{method_count}': len(methods),
'{constructor_count}': len(tlobjects) - len(methods),
'{layer}': layer,
})
def fmt(xs):
zs = {} # create a dict to hold those which have duplicated keys
for x in xs:
zs[x.class_name] = x.class_name in zs
return ', '.join(
'"{}.{}"'.format(x.namespace, x.class_name)
if zs[x.class_name] and x.namespace
else '"{}"'.format(x.class_name) for x in xs
)
request_names = fmt(methods)
constructor_names = fmt(cs)
def fmt(xs, formatter):
return ', '.join('"{}"'.format(formatter(x)) for x in xs)
type_names = fmt(types, formatter=lambda x: x)
request_urls = fmt(methods, create_path_for)
type_urls = fmt(types, path_for_type)
constructor_urls = fmt(cs, create_path_for)
os.makedirs(os.path.abspath(os.path.join(
original_paths['search.js'], os.path.pardir
)), exist_ok=True)
copy_replace('../res/js/search.js', original_paths['search.js'], {
_copy_replace(os.path.join(input_res, 'js', 'search.js'),
original_paths['search.js'], {
'{request_names}': request_names,
'{type_names}': type_names,
'{constructor_names}': constructor_names,
@ -585,23 +560,16 @@ def generate_documentation(scheme_file):
'{constructor_urls}': constructor_urls
})
# Everything done
print('Documentation generated.')
def _copy_resources(res_dir, out_dir):
for dirname, files in [('css', ['docs.css']), ('img', ['arrow.svg'])]:
dirpath = os.path.join(out_dir, dirname)
os.makedirs(dirpath, exist_ok=True)
for file in files:
shutil.copy(os.path.join(res_dir, dirname, file), dirpath)
def copy_resources():
for d in ('css', 'img'):
os.makedirs(d, exist_ok=True)
shutil.copy('../res/img/arrow.svg', 'img')
shutil.copy('../res/css/docs.css', 'css')
if __name__ == '__main__':
os.makedirs('generated', exist_ok=True)
os.chdir('generated')
try:
generate_documentation('../../telethon_generator/scheme.tl')
copy_resources()
finally:
os.chdir(os.pardir)
def generate_docs(tlobjects, errors, layer, input_res, output_dir):
os.makedirs(output_dir, exist_ok=True)
_write_html_pages(tlobjects, errors, layer, input_res, output_dir)
_copy_resources(input_res, output_dir)
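Both new generators import ``snake_to_camel_case`` from ``..utils``, a module not shown in this diff. Judging by the removed ``get_class_name`` helpers it replaces, it is roughly equivalent to the following sketch (not the actual implementation):

import re

def snake_to_camel_case(name):
    # e.g. 'send_message' -> 'SendMessage' (mirrors the removed get_class_name).
    result = re.sub(r'_([a-z])', lambda m: m.group(1).upper(), name)
    return result[:1].upper() + result[1:].replace('_', '')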


@ -0,0 +1,52 @@
import itertools
def generate_errors(errors, f):
# Exact/regex match to create {CODE: ErrorClassName}
exact_match = []
regex_match = []
# Find out what subclasses to import and which to create
import_base, create_base = set(), {}
for error in errors:
if error.subclass_exists:
import_base.add(error.subclass)
else:
create_base[error.subclass] = error.int_code
if error.has_captures:
regex_match.append(error)
else:
exact_match.append(error)
# Imports and new subclass creation
f.write('from .rpc_base_errors import RPCError, {}\n'
.format(", ".join(sorted(import_base))))
for cls, int_code in sorted(create_base.items(), key=lambda t: t[1]):
f.write('\n\nclass {}(RPCError):\n code = {}\n'
.format(cls, int_code))
# Error classes generation
for error in errors:
f.write('\n\nclass {}({}):\n def __init__(self, **kwargs):\n'
' '.format(error.name, error.subclass))
if error.has_captures:
f.write("self.{} = int(kwargs.get('capture', 0))\n "
.format(error.capture_name))
f.write('super(Exception, self).__init__({}'
.format(repr(error.description)))
if error.has_captures:
f.write('.format(self.{})'.format(error.capture_name))
f.write(')\n')
# Create the actual {CODE: ErrorClassName} dict once classes are defined
# TODO Actually make a difference between regex/exact
f.write('\n\nrpc_errors_all = {\n')
for error in itertools.chain(regex_match, exact_match):
f.write(' {}: {},\n'.format(repr(error.pattern), error.name))
f.write('}\n')
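To make the format strings above more concrete, this is roughly what ``generate_errors`` emits for a single error that has a capture group (illustrative class name and description; the real file lists every parsed error):

# Illustrative excerpt of the generated telethon/errors/rpc_error_list.py:
from .rpc_base_errors import RPCError, FloodError


class FloodWaitError(FloodError):
    def __init__(self, **kwargs):
        self.seconds = int(kwargs.get('capture', 0))
        super(Exception, self).__init__('A wait of {} seconds is required.'.format(self.seconds))


rpc_errors_all = {
    'FLOOD_WAIT_(\\d+)': FloodWaitError,
}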


@ -0,0 +1,660 @@
import functools
import os
import re
import shutil
import struct
from collections import defaultdict
from zlib import crc32
from ..source_builder import SourceBuilder
from ..utils import snake_to_camel_case
AUTO_GEN_NOTICE = \
'"""File generated by TLObjects\' generator. All changes will be ERASED"""'
AUTO_CASTS = {
'InputPeer': 'utils.get_input_peer(client.get_input_entity({}))',
'InputChannel': 'utils.get_input_channel(client.get_input_entity({}))',
'InputUser': 'utils.get_input_user(client.get_input_entity({}))',
'InputMedia': 'utils.get_input_media({})',
'InputPhoto': 'utils.get_input_photo({})'
}
BASE_TYPES = ('string', 'bytes', 'int', 'long', 'int128',
'int256', 'double', 'Bool', 'true', 'date')
def _write_modules(out_dir, depth, namespace_tlobjects, type_constructors):
# namespace_tlobjects: {'namespace': [TLObject]}
os.makedirs(out_dir, exist_ok=True)
for ns, tlobjects in namespace_tlobjects.items():
file = os.path.join(out_dir, '{}.py'.format(ns or '__init__'))
with open(file, 'w', encoding='utf-8') as f,\
SourceBuilder(f) as builder:
builder.writeln(AUTO_GEN_NOTICE)
builder.writeln('from {}.tl.tlobject import TLObject', '.' * depth)
builder.writeln('from typing import Optional, List, '
'Union, TYPE_CHECKING')
# Add the relative imports to the namespaces,
# unless we already are in a namespace.
if not ns:
builder.writeln('from . import {}', ', '.join(
x for x in namespace_tlobjects.keys() if x
))
# Import 'os' for those needing access to 'os.urandom()'
# Currently only 'random_id' needs 'os' to be imported,
# for all those TLObjects with arg.can_be_inferred.
builder.writeln('import os')
# Import struct for the .__bytes__(self) serialization
builder.writeln('import struct')
tlobjects.sort(key=lambda x: x.name)
type_names = set()
type_defs = []
# Find all the types in this file and generate type definitions
# based on the types. The type definitions are written to the
# file at the end.
for t in tlobjects:
if not t.is_function:
type_name = t.result
if '.' in type_name:
type_name = type_name[type_name.rindex('.'):]
if type_name in type_names:
continue
type_names.add(type_name)
constructors = type_constructors[type_name]
if not constructors:
pass
elif len(constructors) == 1:
type_defs.append('Type{} = {}'.format(
type_name, constructors[0].class_name))
else:
type_defs.append('Type{} = Union[{}]'.format(
type_name, ','.join(c.class_name
for c in constructors)))
imports = {}
primitives = ('int', 'long', 'int128', 'int256', 'string',
'date', 'bytes', 'true')
# Find all the types in other files that are used in this file
# and generate the information required to import those types.
for t in tlobjects:
for arg in t.args:
name = arg.type
if not name or name in primitives:
continue
import_space = '{}.tl.types'.format('.' * depth)
if '.' in name:
namespace = name.split('.')[0]
name = name.split('.')[1]
import_space += '.{}'.format(namespace)
if name not in type_names:
type_names.add(name)
if name == 'date':
imports['datetime'] = ['datetime']
continue
elif import_space not in imports:
imports[import_space] = set()
imports[import_space].add('Type{}'.format(name))
# Add imports required for type checking
if imports:
builder.writeln('if TYPE_CHECKING:')
for namespace, names in imports.items():
builder.writeln('from {} import {}',
namespace, ', '.join(names))
builder.end_block()
# Generate the class for every TLObject
for t in tlobjects:
_write_source_code(t, builder, type_constructors)
builder.current_indent = 0
# Write the type definitions generated earlier.
builder.writeln('')
for line in type_defs:
builder.writeln(line)
def _write_source_code(tlobject, builder, type_constructors):
"""
Writes the source code corresponding to the given TLObject
by making use of the ``builder`` `SourceBuilder`.
Additional information such as file path depth and
the ``Type: [Constructors]`` mapping must be given for proper
importing and documentation strings.
"""
_write_class_init(tlobject, type_constructors, builder)
_write_resolve(tlobject, builder)
_write_to_dict(tlobject, builder)
_write_to_bytes(tlobject, builder)
_write_from_reader(tlobject, builder)
_write_on_response(tlobject, builder)
def _write_class_init(tlobject, type_constructors, builder):
builder.writeln()
builder.writeln()
builder.writeln('class {}(TLObject):', tlobject.class_name)
# Class-level variable to store its Telegram's constructor ID
builder.writeln('CONSTRUCTOR_ID = {:#x}', tlobject.id)
builder.writeln('SUBCLASS_OF_ID = {:#x}',
crc32(tlobject.result.encode('ascii')))
builder.writeln()
# Convert the args to string parameters, flags having =None
args = [(a.name if not a.is_flag and not a.can_be_inferred
else '{}=None'.format(a.name)) for a in tlobject.real_args]
# Write the __init__ function
builder.writeln('def __init__({}):', ', '.join(['self'] + args))
if tlobject.real_args:
# Write the docstring, to know the type of the args
builder.writeln('"""')
for arg in tlobject.real_args:
if not arg.flag_indicator:
builder.writeln(':param {} {}:', arg.type_hint(), arg.name)
builder.current_indent -= 1 # It will auto-indent (':')
# We also want to know what type this request returns
# or to which type this constructor belongs to
builder.writeln()
if tlobject.is_function:
builder.write(':returns {}: ', tlobject.result)
else:
builder.write('Constructor for {}: ', tlobject.result)
constructors = type_constructors[tlobject.result]
if not constructors:
builder.writeln('This type has no constructors.')
elif len(constructors) == 1:
builder.writeln('Instance of {}.',
constructors[0].class_name)
else:
builder.writeln('Instance of either {}.', ', '.join(
c.class_name for c in constructors))
builder.writeln('"""')
builder.writeln('super().__init__()')
# Functions have a result object and are confirmed by default
if tlobject.is_function:
builder.writeln('self.result = None')
builder.writeln('self.content_related = True')
# Set the arguments
if tlobject.real_args:
builder.writeln()
for arg in tlobject.real_args:
if not arg.can_be_inferred:
builder.writeln('self.{0} = {0} # type: {1}',
arg.name, arg.type_hint())
# Currently the only argument that can be
# inferred are those called 'random_id'
elif arg.name == 'random_id':
# Endianness doesn't really matter, and 'big' is shorter
code = "int.from_bytes(os.urandom({}), 'big', signed=True)" \
.format(8 if arg.type == 'long' else 4)
if arg.is_vector:
# Currently for the case of "messages.forwardMessages"
# Ensure we can infer the length from id:Vector<>
if not next(a for a in tlobject.real_args
if a.name == 'id').is_vector:
raise ValueError(
'Cannot infer list of random ids for ', tlobject
)
code = '[{} for _ in range(len(id))]'.format(code)
builder.writeln(
"self.random_id = random_id if random_id "
"is not None else {}", code
)
else:
raise ValueError('Cannot infer a value for ', arg)
builder.end_block()
def _write_resolve(tlobject, builder):
if any(arg.type in AUTO_CASTS for arg in tlobject.real_args):
builder.writeln('def resolve(self, client, utils):')
for arg in tlobject.real_args:
ac = AUTO_CASTS.get(arg.type, None)
if not ac:
continue
if arg.is_vector:
builder.write('self.{0} = [{1} for _x in self.{0}]',
arg.name, ac.format('_x'))
else:
builder.write('self.{} = {}', arg.name,
ac.format('self.' + arg.name))
builder.writeln(' if self.{} else None'.format(arg.name)
if arg.is_flag else '')
builder.end_block()
def _write_to_dict(tlobject, builder):
builder.writeln('def to_dict(self):')
builder.writeln('return {')
builder.current_indent += 1
builder.write("'_': '{}'", tlobject.class_name)
for arg in tlobject.real_args:
builder.writeln(',')
builder.write("'{}': ", arg.name)
if arg.type in BASE_TYPES:
if arg.is_vector:
builder.write('[] if self.{0} is None else self.{0}[:]',
arg.name)
else:
builder.write('self.{}', arg.name)
else:
if arg.is_vector:
builder.write(
'[] if self.{0} is None else [None '
'if x is None else x.to_dict() for x in self.{0}]',
arg.name
)
else:
builder.write(
'None if self.{0} is None else self.{0}.to_dict()',
arg.name
)
builder.writeln()
builder.current_indent -= 1
builder.writeln("}")
builder.end_block()
def _write_to_bytes(tlobject, builder):
builder.writeln('def __bytes__(self):')
# Some objects require more than one flag parameter to be set
# at the same time. In this case, add an assertion.
repeated_args = defaultdict(list)
for arg in tlobject.args:
if arg.is_flag:
repeated_args[arg.flag_index].append(arg)
for ra in repeated_args.values():
if len(ra) > 1:
cnd1 = ('(self.{0} or self.{0} is not None)'
.format(a.name) for a in ra)
cnd2 = ('(self.{0} is None or self.{0} is False)'
.format(a.name) for a in ra)
builder.writeln(
"assert ({}) or ({}), '{} parameters must all "
"be False-y (like None) or all me True-y'",
' and '.join(cnd1), ' and '.join(cnd2),
', '.join(a.name for a in ra)
)
builder.writeln("return b''.join((")
builder.current_indent += 1
# First constructor code, we already know its bytes
builder.writeln('{},', repr(struct.pack('<I', tlobject.id)))
for arg in tlobject.args:
if _write_arg_to_bytes(builder, arg, tlobject.args):
builder.writeln(',')
builder.current_indent -= 1
builder.writeln('))')
builder.end_block()
def _write_from_reader(tlobject, builder):
builder.writeln('@classmethod')
builder.writeln('def from_reader(cls, reader):')
for arg in tlobject.args:
_write_arg_read_code(builder, arg, tlobject.args, name='_' + arg.name)
builder.writeln('return cls({})', ', '.join(
'{0}=_{0}'.format(a.name) for a in tlobject.real_args))
def _write_on_response(tlobject, builder):
# Only requests can have a different response that's not their
# serialized body, that is, we'll be setting their .result.
#
# The default behaviour is reading a TLObject too, so no need
# to override it unless necessary.
if not tlobject.is_function:
return
# https://core.telegram.org/mtproto/serialize#boxed-and-bare-types
# TL;DR; boxed types start with uppercase always, so we can use
# this to check whether everything in it is boxed or not.
#
# Currently only un-boxed responses are Vector<int>/Vector<long>.
# If this weren't the case, we should check upper case after
# max(index('<'), index('.')) (and if it is, it's boxed, so return).
m = re.match(r'Vector<(int|long)>', tlobject.result)
if not m:
return
builder.end_block()
builder.writeln('def on_response(self, reader):')
builder.writeln('reader.read_int() # Vector ID')
builder.writeln('self.result = [reader.read_{}() '
'for _ in range(reader.read_int())]', m.group(1))
def _write_arg_to_bytes(builder, arg, args, name=None):
"""
Writes the .__bytes__() code for the given argument
:param builder: The source code builder
:param arg: The argument to write
:param args: All the other arguments in TLObject same __bytes__.
This is required to determine the flags value
:param name: The name of the argument. Defaults to "self.argname"
This argument is an option because it's required when
writing Vectors<>
"""
if arg.generic_definition:
return # Do nothing, this only specifies a later type
if name is None:
name = 'self.{}'.format(arg.name)
# The argument may be a flag, only write if it's not None AND
# if it's not a True type.
# True types are not actually sent, but instead only used to
# determine the flags.
if arg.is_flag:
if arg.type == 'true':
return # Exit, since True type is never written
elif arg.is_vector:
# Vector flags are special since they consist of 3 values,
# so we need an extra join here. Note that empty vector flags
# should NOT be sent either!
builder.write("b'' if {0} is None or {0} is False "
"else b''.join((", name)
else:
builder.write("b'' if {0} is None or {0} is False "
"else (", name)
if arg.is_vector:
if arg.use_vector_id:
# vector code, unsigned 0x1cb5c415 as little endian
builder.write(r"b'\x15\xc4\xb5\x1c',")
builder.write("struct.pack('<i', len({})),", name)
# Cannot unpack the values for the outer tuple through *[(
# since that's a Python >3.5 feature, so add another join.
builder.write("b''.join(")
# Temporary disable .is_vector, not to enter this if again
# Also disable .is_flag since it's not needed per element
old_flag = arg.is_flag
arg.is_vector = arg.is_flag = False
_write_arg_to_bytes(builder, arg, args, name='x')
arg.is_vector = True
arg.is_flag = old_flag
builder.write(' for x in {})', name)
elif arg.flag_indicator:
# Calculate the flags with those items which are not None
if not any(f.is_flag for f in args):
# There's a flag indicator, but no flag arguments so it's 0
builder.write(r"b'\0\0\0\0'")
else:
builder.write("struct.pack('<I', ")
builder.write(
' | '.join('(0 if {0} is None or {0} is False else {1})'
.format('self.{}'.format(flag.name),
1 << flag.flag_index)
for flag in args if flag.is_flag)
)
builder.write(')')
elif 'int' == arg.type:
# struct.pack is around 4 times faster than int.to_bytes
builder.write("struct.pack('<i', {})", name)
elif 'long' == arg.type:
builder.write("struct.pack('<q', {})", name)
elif 'int128' == arg.type:
builder.write("{}.to_bytes(16, 'little', signed=True)", name)
elif 'int256' == arg.type:
builder.write("{}.to_bytes(32, 'little', signed=True)", name)
elif 'double' == arg.type:
builder.write("struct.pack('<d', {})", name)
elif 'string' == arg.type:
builder.write('TLObject.serialize_bytes({})', name)
elif 'Bool' == arg.type:
# 0x997275b5 if boolean else 0xbc799737
builder.write(r"b'\xb5ur\x99' if {} else b'7\x97y\xbc'", name)
elif 'true' == arg.type:
pass # These are actually NOT written! Only used for flags
elif 'bytes' == arg.type:
builder.write('TLObject.serialize_bytes({})', name)
elif 'date' == arg.type: # Custom format
builder.write('TLObject.serialize_datetime({})', name)
else:
# Else it may be a custom type
builder.write('bytes({})', name)
if arg.is_flag:
builder.write(')')
if arg.is_vector:
builder.write(')') # We were using a tuple
return True # Something was written
def _write_arg_read_code(builder, arg, args, name):
"""
Writes the read code for the given argument, setting the
arg.name variable to its read value.
:param builder: The source code builder
:param arg: The argument to write
:param args: All the other arguments in TLObject same on_send.
This is required to determine the flags value
:param name: The name of the argument. Defaults to "self.argname"
This argument is an option because it's required when
writing Vectors<>
"""
if arg.generic_definition:
return # Do nothing, this only specifies a later type
# The argument may be a flag, only write that flag was given!
was_flag = False
if arg.is_flag:
# Treat 'true' flags as a special case, since they're true if
# they're set, and nothing else needs to actually be read.
if 'true' == arg.type:
builder.writeln('{} = bool(flags & {})',
name, 1 << arg.flag_index)
return
was_flag = True
builder.writeln('if flags & {}:', 1 << arg.flag_index)
# Temporary disable .is_flag not to enter this if
# again when calling the method recursively
arg.is_flag = False
if arg.is_vector:
if arg.use_vector_id:
# We have to read the vector's constructor ID
builder.writeln("reader.read_int()")
builder.writeln('{} = []', name)
builder.writeln('for _ in range(reader.read_int()):')
# Temporary disable .is_vector, not to enter this if again
arg.is_vector = False
_write_arg_read_code(builder, arg, args, name='_x')
builder.writeln('{}.append(_x)', name)
arg.is_vector = True
elif arg.flag_indicator:
# Read the flags, which will indicate what items we should read next
builder.writeln('flags = reader.read_int()')
builder.writeln()
elif 'int' == arg.type:
builder.writeln('{} = reader.read_int()', name)
elif 'long' == arg.type:
builder.writeln('{} = reader.read_long()', name)
elif 'int128' == arg.type:
builder.writeln('{} = reader.read_large_int(bits=128)', name)
elif 'int256' == arg.type:
builder.writeln('{} = reader.read_large_int(bits=256)', name)
elif 'double' == arg.type:
builder.writeln('{} = reader.read_double()', name)
elif 'string' == arg.type:
builder.writeln('{} = reader.tgread_string()', name)
elif 'Bool' == arg.type:
builder.writeln('{} = reader.tgread_bool()', name)
elif 'true' == arg.type:
# Arbitrary not-None value, don't actually read "true" flags
builder.writeln('{} = True', name)
elif 'bytes' == arg.type:
builder.writeln('{} = reader.tgread_bytes()', name)
elif 'date' == arg.type: # Custom format
builder.writeln('{} = reader.tgread_date()', name)
else:
# Else it may be a custom type
if not arg.skip_constructor_id:
builder.writeln('{} = reader.tgread_object()', name)
else:
# Import the correct type inline to avoid cyclic imports.
# There may be better solutions so that we can just access
# all the types before the files have been parsed, but I
# don't know of any.
sep_index = arg.type.find('.')
if sep_index == -1:
ns, t = '.', arg.type
else:
ns, t = '.' + arg.type[:sep_index], arg.type[sep_index+1:]
class_name = snake_to_camel_case(t)
# There would be no need to import the type if we're in the
# file with the same namespace, but since it does no harm
# and we don't have information about such thing in the
# method we just ignore that case.
builder.writeln('from {} import {}', ns, class_name)
builder.writeln('{} = {}.from_reader(reader)',
name, class_name)
# End vector and flag blocks if required (if we opened them before)
if arg.is_vector:
builder.end_block()
if was_flag:
builder.current_indent -= 1
builder.writeln('else:')
builder.writeln('{} = None', name)
builder.current_indent -= 1
# Restore .is_flag
arg.is_flag = True
def _write_all_tlobjects(tlobjects, layer, builder):
builder.writeln(AUTO_GEN_NOTICE)
builder.writeln()
builder.writeln('from . import types, functions')
builder.writeln()
# Create a constant variable to indicate which layer this is
builder.writeln('LAYER = {}', layer)
builder.writeln()
# Then create the dictionary containing constructor_id: class
builder.writeln('tlobjects = {')
builder.current_indent += 1
# Fill the dictionary (0x1a2b3c4f: tl.full.type.path.Class)
for tlobject in tlobjects:
builder.write('{:#010x}: ', tlobject.id)
builder.write('functions' if tlobject.is_function else 'types')
if tlobject.namespace:
builder.write('.' + tlobject.namespace)
builder.writeln('.{},', tlobject.class_name)
builder.current_indent -= 1
builder.writeln('}')
def generate_tlobjects(tlobjects, layer, import_depth, output_dir):
get_file = functools.partial(os.path.join, output_dir)
os.makedirs(get_file('functions'), exist_ok=True)
os.makedirs(get_file('types'), exist_ok=True)
# Group everything by {namespace: [tlobjects]} to generate __init__.py
namespace_functions = defaultdict(list)
namespace_types = defaultdict(list)
# Group {type: [constructors]} to generate the documentation
type_constructors = defaultdict(list)
for tlobject in tlobjects:
if tlobject.is_function:
namespace_functions[tlobject.namespace].append(tlobject)
else:
namespace_types[tlobject.namespace].append(tlobject)
type_constructors[tlobject.result].append(tlobject)
_write_modules(get_file('functions'), import_depth,
namespace_functions, type_constructors)
_write_modules(get_file('types'), import_depth,
namespace_types, type_constructors)
filename = os.path.join(get_file('all_tlobjects.py'))
with open(filename, 'w', encoding='utf-8') as file:
with SourceBuilder(file) as builder:
_write_all_tlobjects(tlobjects, layer, builder)
def clean_tlobjects(output_dir):
get_file = functools.partial(os.path.join, output_dir)
for d in ('functions', 'types'):
d = get_file(d)
if os.path.isdir(d):
shutil.rmtree(d)
tl = get_file('all_tlobjects.py')
if os.path.isfile(tl):
os.remove(tl)
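Tying the ``_write_*`` helpers above together, each generated module contains classes roughly shaped like the sketch below (condensed, with made-up names and IDs; the real output also includes docstrings, typing imports and flag handling):

# Illustrative sketch of one generated class:
import struct

from ..tl.tlobject import TLObject   # written by the generator relative to `depth`


class ExampleRequest(TLObject):
    CONSTRUCTOR_ID = 0x12345678
    SUBCLASS_OF_ID = 0x9abcdef0

    def __init__(self, user_id):
        super().__init__()
        self.result = None            # only emitted for functions
        self.content_related = True
        self.user_id = user_id  # type: int

    def to_dict(self):
        return {
            '_': 'ExampleRequest',
            'user_id': self.user_id
        }

    def __bytes__(self):
        return b''.join((
            b'\x78\x56\x34\x12',                  # packed CONSTRUCTOR_ID
            struct.pack('<i', self.user_id),
        ))

    @classmethod
    def from_reader(cls, reader):
        _user_id = reader.read_int()
        return cls(user_id=_user_id)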


@ -1,3 +0,0 @@
from .source_builder import SourceBuilder
from .tl_parser import TLParser
from .tl_object import TLObject


@ -1,323 +0,0 @@
import re
from zlib import crc32
class TLObject:
""".tl core types IDs (such as vector, booleans, etc.)"""
CORE_TYPES = (
0xbc799737, # boolFalse#bc799737 = Bool;
0x997275b5, # boolTrue#997275b5 = Bool;
0x3fedd339, # true#3fedd339 = True;
0x1cb5c415, # vector#1cb5c415 {t:Type} # [ t ] = Vector t;
)
def __init__(self, fullname, object_id, args, result, is_function):
"""
Initializes a new TLObject, given its properties.
Usually, this will be called from `from_tl` instead
:param fullname: The fullname of the TL object (namespace.name)
The namespace can be omitted
:param object_id: The hexadecimal string representing the object ID
:param args: The arguments, if any, of the TL object
:param result: The result type of the TL object
:param is_function: Is the object a function or a type?
"""
# The name may or may not have a namespace
if '.' in fullname:
self.namespace = fullname.split('.')[0]
self.name = fullname.split('.')[1]
else:
self.namespace = None
self.name = fullname
self.args = args
self.result = result
self.is_function = is_function
# The ID should be a hexadecimal string, or None to infer it
if object_id is None:
self.id = self.infer_id()
else:
self.id = int(object_id, base=16)
assert self.id == self.infer_id(),\
'Invalid inferred ID for ' + repr(self)
@staticmethod
def from_tl(tl, is_function):
"""Returns a TL object from the given TL scheme line"""
# Regex to match the whole line
match = re.match(r'''
^ # We want to match from the beginning to the end
([\w.]+) # The .tl object can contain alpha_name or namespace.alpha_name
(?:
\# # After the name, comes the ID of the object
([0-9a-f]+) # The constructor ID is in hexadecimal form
)? # If no constructor ID was given, CRC32 the 'tl' to determine it
(?:\s # After that, we want to match its arguments (name:type)
{? # For handling the start of the '{X:Type}' case
\w+ # The argument name will always be an alpha-only name
: # Then comes the separator between name:type
[\w\d<>#.?!]+ # The type is slightly more complex, since it's alphanumeric and it can
# also have Vector<type>, flags:# and flags.0?default, plus :!X as type
}? # For handling the end of the '{X:Type}' case
)* # Match 0 or more arguments
\s # Leave a space between the arguments and the equal
=
\s # Leave another space between the equal and the result
([\w\d<>#.?]+) # The result can again be as complex as any argument type
;$ # Finally, the line should always end with ;
''', tl, re.IGNORECASE | re.VERBOSE)
if match is None:
# Probably "vector#1cb5c415 {t:Type} # [ t ] = Vector t;"
raise ValueError('Cannot parse TLObject', tl)
# Sub-regex to match the arguments (sadly, it cannot be embedded in the first regex)
args_match = re.findall(r'''
({)? # We may or may not capture the opening brace
(\w+) # First we capture any alpha name with length 1 or more
: # Which is separated from its type by a colon
([\w\d<>#.?!]+) # The type is slightly more complex, since it's alphanumeric and it can
# also have Vector<type>, flags:# and flags.0?default, plus :!X as type
(})? # We may or not capture the closing brace
''', tl, re.IGNORECASE | re.VERBOSE)
# Retrieve the matched arguments
args = [TLArg(name, arg_type, brace != '')
for brace, name, arg_type, _ in args_match]
# And initialize the TLObject
return TLObject(
fullname=match.group(1),
object_id=match.group(2),
args=args,
result=match.group(3),
is_function=is_function)
def class_name(self):
"""Gets the class name following the Python style guidelines"""
return self.class_name_for(self.name, self.is_function)
@staticmethod
def class_name_for(typename, is_function=False):
"""Gets the class name following the Python style guidelines"""
# Courtesy of http://stackoverflow.com/a/31531797/4759433
result = re.sub(r'_([a-z])', lambda m: m.group(1).upper(), typename)
result = result[:1].upper() + result[1:].replace('_', '')
# If it's a function, let it end with "Request" to identify them
if is_function:
result += 'Request'
return result
def sorted_args(self):
"""Returns the arguments properly sorted and ready to plug-in
into a Python's method header (i.e., flags and those which
can be inferred will go last so they can default =None)
"""
return sorted(self.args,
key=lambda x: x.is_flag or x.can_be_inferred)
def is_core_type(self):
"""Determines whether the TLObject is a "core type"
(and thus should be embedded in the generated code) or not"""
return self.id in TLObject.CORE_TYPES
def __repr__(self, ignore_id=False):
fullname = ('{}.{}'.format(self.namespace, self.name)
if self.namespace is not None else self.name)
if getattr(self, 'id', None) is None or ignore_id:
hex_id = ''
else:
# Skip 0x and add 0's for padding
hex_id = '#' + hex(self.id)[2:].rjust(8, '0')
if self.args:
args = ' ' + ' '.join([repr(arg) for arg in self.args])
else:
args = ''
return '{}{}{} = {}'.format(fullname, hex_id, args, self.result)
def infer_id(self):
representation = self.__repr__(ignore_id=True)
# Clean the representation
representation = representation\
.replace(':bytes ', ':string ')\
.replace('?bytes ', '?string ')\
.replace('<', ' ').replace('>', '')\
.replace('{', '').replace('}', '')
representation = re.sub(
r' \w+:flags\.\d+\?true',
r'',
representation
)
return crc32(representation.encode('ascii'))
def __str__(self):
fullname = ('{}.{}'.format(self.namespace, self.name)
if self.namespace is not None else self.name)
# Some arguments are not valid for being represented, such as the flag indicator or generic definition
# (these have no explicit values until used)
valid_args = [arg for arg in self.args
if not arg.flag_indicator and not arg.generic_definition]
args = ', '.join(['{}={{}}'.format(arg.name) for arg in valid_args])
# Since Python's default representation for lists is using repr(), we need to str() manually on every item
args_format = ', '.join(
['str(self.{})'.format(arg.name) if not arg.is_vector else
'None if not self.{0} else [str(_) for _ in self.{0}]'.format(
arg.name) for arg in valid_args])
return ("'({} (ID: {}) = ({}))'.format({})"
.format(fullname, hex(self.id), args, args_format))
class TLArg:
def __init__(self, name, arg_type, generic_definition):
"""
Initializes a new .tl argument
:param name: The name of the .tl argument
:param arg_type: The type of the .tl argument
:param generic_definition: Is the argument a generic definition?
(i.e. {X:Type})
"""
if name == 'self': # 'self' is the only restricted name
self.name = 'is_self'
else:
self.name = name
# Default values
self.is_vector = False
self.is_flag = False
self.skip_constructor_id = False
self.flag_index = -1
# Special case: some types can be inferred, which makes it
# less annoying to type. Currently the only type that can
# be inferred is if the name is 'random_id', to which a
# random ID will be assigned if left as None (the default)
self.can_be_inferred = name == 'random_id'
# The type can be an indicator that other arguments will be flags
if arg_type == '#':
self.flag_indicator = True
self.type = None
self.is_generic = False
else:
self.flag_indicator = False
self.is_generic = arg_type.startswith('!')
# Strip the exclamation mark always to have only the name
self.type = arg_type.lstrip('!')
# The type may be a flag (flags.IDX?REAL_TYPE)
# Note that 'flags' is NOT the flags name; this is determined by a previous argument
# However, we assume that the argument will always be called 'flags'
flag_match = re.match(r'flags.(\d+)\?([\w<>.]+)', self.type)
if flag_match:
self.is_flag = True
self.flag_index = int(flag_match.group(1))
# Update the type to match the exact type, not the "flagged" one
self.type = flag_match.group(2)
# Then check if the type is a Vector<REAL_TYPE>
vector_match = re.match(r'[Vv]ector<([\w\d.]+)>', self.type)
if vector_match:
self.is_vector = True
# If the type's first letter is not uppercase, then
# it is a constructor and we use (read/write) its ID
# as pinpointed on issue #81.
self.use_vector_id = self.type[0] == 'V'
# Update the type to match the one inside the vector
self.type = vector_match.group(1)
# See use_vector_id. An example of such case is ipPort in
# help.configSpecial
if self.type.split('.')[-1][0].islower():
self.skip_constructor_id = True
# If the name contains "date" and the type is "int", we can
# safely assume that this should be treated as a "date" object.
# Note that this is not a valid Telegram object, but it's easier to work with
if self.type == 'int' and (
re.search(r'(\b|_)date\b', name) or
name in ('expires', 'expires_at', 'was_online')):
self.type = 'date'
self.generic_definition = generic_definition
def doc_type_hint(self):
result = {
'int': 'int',
'long': 'int',
'int128': 'int',
'int256': 'int',
'string': 'str',
'date': 'datetime.datetime | None', # None date = 0 timestamp
'bytes': 'bytes',
'true': 'bool',
}.get(self.type, self.type)
if self.is_vector:
result = 'list[{}]'.format(result)
if self.is_flag and self.type != 'date':
result += ' | None'
return result
def python_type_hint(self):
type = self.type
if '.' in type:
type = type.split('.')[1]
result = {
'int': 'int',
'long': 'int',
'int128': 'int',
'int256': 'int',
'string': 'str',
'date': 'Optional[datetime]', # None date = 0 timestamp
'bytes': 'bytes',
'true': 'bool',
}.get(type, "Type{}".format(type))
if self.is_vector:
result = 'List[{}]'.format(result)
if self.is_flag and type != 'date':
result = 'Optional[{}]'.format(result)
return result
def __str__(self):
# Find the real type representation by updating it as required
real_type = self.type
if self.flag_indicator:
real_type = '#'
if self.is_vector:
if self.use_vector_id:
real_type = 'Vector<{}>'.format(real_type)
else:
real_type = 'vector<{}>'.format(real_type)
if self.is_generic:
real_type = '!{}'.format(real_type)
if self.is_flag:
real_type = 'flags.{}?{}'.format(self.flag_index, real_type)
if self.generic_definition:
return '{{{}:{}}}'.format(self.name, real_type)
else:
return '{}:{}'.format(self.name, real_type)
def __repr__(self):
# Get rid of our special type
return str(self)\
.replace(':date', ':int')\
.replace('?date', '?int')

View File

@@ -1,51 +0,0 @@
import re
from .tl_object import TLObject
class TLParser:
"""Class used to parse .tl files"""
@staticmethod
def parse_file(file_path, ignore_core=False):
"""This method yields TLObjects from a given .tl file"""
with open(file_path, encoding='utf-8') as file:
# Start by assuming that the next found line won't
# be a function (and will hence be a type)
is_function = False
# Read all the lines from the .tl file
for line in file:
# Strip comments from the line
comment_index = line.find('//')
if comment_index != -1:
line = line[:comment_index]
line = line.strip()
if line:
# Check whether the line is a type change
# (types <-> functions) or not
match = re.match(r'---(\w+)---', line)
if match:
following_types = match.group(1)
is_function = following_types == 'functions'
else:
try:
result = TLObject.from_tl(line, is_function)
if not ignore_core or not result.is_core_type():
yield result
except ValueError as e:
if 'vector#1cb5c415' not in str(e):
raise
@staticmethod
def find_layer(file_path):
"""Finds the layer used on the specified scheme.tl file"""
layer_regex = re.compile(r'^//\s*LAYER\s*(\d+)$')
with open(file_path, encoding='utf-8') as file:
for line in file:
match = layer_regex.match(line)
if match:
return int(match.group(1))

View File

@@ -0,0 +1,2 @@
from .errors import Error, parse_errors
from .tlobject import TLObject, parse_tl, find_layer

View File

@@ -0,0 +1,146 @@
import json
import re
from collections import defaultdict
from ..utils import snake_to_camel_case
# Core base classes depending on the integer error code
KNOWN_BASE_CLASSES = {
303: 'InvalidDCError',
400: 'BadRequestError',
401: 'UnauthorizedError',
403: 'ForbiddenError',
404: 'NotFoundError',
406: 'AuthKeyError',
420: 'FloodError',
500: 'ServerError',
}
# The API doesn't return the code for some (vital) errors. They are
# all assumed to be 400, except these well-known ones that aren't.
KNOWN_CODES = {
'ACTIVE_USER_REQUIRED': 401,
'AUTH_KEY_UNREGISTERED': 401,
'USER_DEACTIVATED': 401
}
# Give better semantic names to some captures
CAPTURE_NAMES = {
'FloodWaitError': 'seconds',
'FloodTestPhoneWaitError': 'seconds',
'FileMigrateError': 'new_dc',
'NetworkMigrateError': 'new_dc',
'PhoneMigrateError': 'new_dc',
'UserMigrateError': 'new_dc',
'FilePartMissingError': 'which'
}
def _get_class_name(error_code):
"""
Gets the corresponding class name for the given error code,
which may be either an integer (a known base error) or a string.
"""
if isinstance(error_code, int):
return KNOWN_BASE_CLASSES.get(
error_code, 'RPCError' + str(error_code).replace('-', 'Neg')
)
return snake_to_camel_case(
error_code.replace('FIRSTNAME', 'FIRST_NAME').lower(), suffix='Error')
class Error:
def __init__(self, int_code, str_code, description, caused_by):
# TODO Some errors have the same str_code but a different int_code.
# Should these be split into different files, or does it not really matter?
# Telegram isn't exactly consistent with the errors it returns anyway.
self.int_code = int_code
self.str_code = str_code
self.subclass = _get_class_name(int_code)
self.subclass_exists = int_code in KNOWN_BASE_CLASSES
self.description = description
self.caused_by = list(sorted(caused_by))
self.has_captures = '_X' in str_code
if self.has_captures:
self.name = _get_class_name(str_code.replace('_X', ''))
self.pattern = str_code.replace('_X', r'_(\d+)')
self.capture_name = CAPTURE_NAMES.get(self.name, 'x')
else:
self.name = _get_class_name(str_code)
self.pattern = str_code
self.capture_name = None
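def _example_error_captures():
    # Illustrative sketch, not part of the original module: how the capture
    # handling above behaves for two representative codes.
    flood = Error(420, 'FLOOD_WAIT_X', 'Wait some seconds', caused_by=[])
    assert flood.name == 'FloodWaitError'
    assert flood.pattern == r'FLOOD_WAIT_(\d+)'
    assert flood.capture_name == 'seconds'

    plain = Error(401, 'USER_DEACTIVATED', 'Deactivated', caused_by=[])
    assert plain.name == 'UserDeactivatedError' and plain.capture_name is None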
def parse_errors(json_file, descriptions_file):
"""
Parses the given JSON file in the following format:
{
"ok": true,
"human_result": {"int_code": ["descriptions"]},
"result": {"int_code": {"full_method_name": ["str_error"]}}
}
The descriptions file, which has precedence over the JSON's human_result,
should have the following format:
# comment
str_error=Description
This function yields `Error` instances as a result.
"""
with open(json_file, encoding='utf-8') as f:
data = json.load(f)
errors = defaultdict(set)
error_to_method = defaultdict(set)
# PWRTelegram's API doesn't return all errors, which we do need here.
# Add some special known-cases manually first.
errors[420].update((
'FLOOD_WAIT_X', 'FLOOD_TEST_PHONE_WAIT_X'
))
errors[401].update((
'AUTH_KEY_INVALID', 'SESSION_EXPIRED', 'SESSION_REVOKED'
))
errors[303].update((
'FILE_MIGRATE_X', 'PHONE_MIGRATE_X',
'NETWORK_MIGRATE_X', 'USER_MIGRATE_X'
))
for int_code, method_errors in data['result'].items():
for method, error_list in method_errors.items():
for error in error_list:
error = re.sub(r'_\d+', '_X', error).upper()
errors[int(int_code)].add(error)
error_to_method[error].add(method)
# Some errors are in the human result, but not with a code. Assume 400
for error in data['human_result']:
if error[0] != '-' and not error.isdigit():
error = re.sub(r'_\d+', '_X', error).upper()
if not any(error in es for es in errors.values()):
errors[KNOWN_CODES.get(error, 400)].add(error)
# Prefer the descriptions that are written for Telethon's way of working
# over those that PWRTelegram's API provides.
telethon_descriptions = {}
with open(descriptions_file, encoding='utf-8') as f:
for line in f:
line = line.strip()
if line and not line.startswith('#'):
equal = line.index('=')
message, description = line[:equal], line[equal + 1:]
telethon_descriptions[message.rstrip()] = description.lstrip()
for int_code, error_set in errors.items():
for str_code in sorted(error_set):
description = telethon_descriptions.get(
str_code, '\n'.join(data['human_result'].get(
str_code, ['No description known']
))
)
yield Error(
int_code=int_code,
str_code=str_code,
description=description,
caused_by=error_to_method[str_code]
)
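def _example_parse_errors(json_path, descriptions_path):
    # Illustrative sketch, not part of the original module: typical use of
    # parse_errors(); both paths are placeholders for the data files.
    errors = {e.str_code: e for e in parse_errors(json_path, descriptions_path)}
    flood = errors['FLOOD_WAIT_X']  # added manually above, always present
    return flood.subclass, flood.capture_name  # ('FloodError', 'seconds')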

View File

@@ -0,0 +1,274 @@
import re
from zlib import crc32
from ..utils import snake_to_camel_case
CORE_TYPES = (
0xbc799737, # boolFalse#bc799737 = Bool;
0x997275b5, # boolTrue#997275b5 = Bool;
0x3fedd339, # true#3fedd339 = True;
0x1cb5c415, # vector#1cb5c415 {t:Type} # [ t ] = Vector t;
)
class TLObject:
def __init__(self, fullname, object_id, args, result, is_function):
"""
Initializes a new TLObject, given its properties.
:param fullname: The fullname of the TL object (namespace.name)
The namespace can be omitted.
:param object_id: The hexadecimal string representing the object ID
:param args: The arguments, if any, of the TL object
:param result: The result type of the TL object
:param is_function: Is the object a function or a type?
"""
# The name may or may not have a namespace
self.fullname = fullname
if '.' in fullname:
self.namespace, self.name = fullname.split('.', maxsplit=1)
else:
self.namespace, self.name = None, fullname
self.args = args
self.result = result
self.is_function = is_function
self.id = None
if object_id is None:
self.id = self.infer_id()
else:
self.id = int(object_id, base=16)
assert self.id == self.infer_id(),\
'Invalid inferred ID for ' + repr(self)
self.class_name = snake_to_camel_case(
self.name, suffix='Request' if self.is_function else '')
self.real_args = list(a for a in self.sorted_args() if not
(a.flag_indicator or a.generic_definition))
def sorted_args(self):
"""Returns the arguments properly sorted and ready to plug-in
into a Python's method header (i.e., flags and those which
can be inferred will go last so they can default =None)
"""
return sorted(self.args,
key=lambda x: x.is_flag or x.can_be_inferred)
def __repr__(self, ignore_id=False):
if self.id is None or ignore_id:
hex_id = ''
else:
hex_id = '#{:08x}'.format(self.id)
if self.args:
args = ' ' + ' '.join([repr(arg) for arg in self.args])
else:
args = ''
return '{}{}{} = {}'.format(self.fullname, hex_id, args, self.result)
def infer_id(self):
representation = self.__repr__(ignore_id=True)
representation = representation\
.replace(':bytes ', ':string ')\
.replace('?bytes ', '?string ')\
.replace('<', ' ').replace('>', '')\
.replace('{', '').replace('}', '')
representation = re.sub(
r' \w+:flags\.\d+\?true',
r'',
representation
)
return crc32(representation.encode('ascii'))
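def _example_infer_id():
    # Illustrative sketch, not part of the original module: constructor IDs
    # are the CRC32 of the normalized representation, so an explicit ID such
    # as boolFalse's #bc799737 should match what infer_id() computes.
    obj = TLObject('boolFalse', 'bc799737', args=[],
                   result='Bool', is_function=False)
    return hex(obj.id)  # '0xbc799737'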
class TLArg:
def __init__(self, name, arg_type, generic_definition):
"""
Initializes a new .tl argument
:param name: The name of the .tl argument
:param arg_type: The type of the .tl argument
:param generic_definition: Is the argument a generic definition?
(i.e. {X:Type})
"""
self.name = 'is_self' if name == 'self' else name
# Default values
self.is_vector = False
self.is_flag = False
self.skip_constructor_id = False
self.flag_index = -1
# Special case: some arguments can be inferred, which makes it
# less annoying to type. Currently the only argument that can
# be inferred is 'random_id', to which a
# random ID will be assigned if left as None (the default)
self.can_be_inferred = name == 'random_id'
# The type can be an indicator that other arguments will be flags
if arg_type == '#':
self.flag_indicator = True
self.type = None
self.is_generic = False
else:
self.flag_indicator = False
self.is_generic = arg_type.startswith('!')
# Strip the exclamation mark always to have only the name
self.type = arg_type.lstrip('!')
# The type may be a flag (flags.IDX?REAL_TYPE)
# Note that 'flags' is NOT the flags name; this
# is determined by a previous argument
# However, we assume that the argument will always be called 'flags'
flag_match = re.match(r'flags.(\d+)\?([\w<>.]+)', self.type)
if flag_match:
self.is_flag = True
self.flag_index = int(flag_match.group(1))
# Update the type to match the exact type, not the "flagged" one
self.type = flag_match.group(2)
# Then check if the type is a Vector<REAL_TYPE>
vector_match = re.match(r'[Vv]ector<([\w\d.]+)>', self.type)
if vector_match:
self.is_vector = True
# If the type's first letter is not uppercase, then
# it is a constructor and we use (read/write) its ID
# as pinpointed on issue #81.
self.use_vector_id = self.type[0] == 'V'
# Update the type to match the one inside the vector
self.type = vector_match.group(1)
# See use_vector_id. An example of such case is ipPort in
# help.configSpecial
if self.type.split('.')[-1][0].islower():
self.skip_constructor_id = True
# If the name contains "date" and the type is "int", we can
# safely assume that this should be treated as a "date" object.
# Note that this is not a valid Telegram object, but it's easier to work with
if self.type == 'int' and (
re.search(r'(\b|_)date\b', name) or
name in ('expires', 'expires_at', 'was_online')):
self.type = 'date'
self.generic_definition = generic_definition
def type_hint(self):
type = self.type
if '.' in type:
type = type.split('.')[1]
result = {
'int': 'int',
'long': 'int',
'int128': 'int',
'int256': 'int',
'string': 'str',
'date': 'Optional[datetime]', # None date = 0 timestamp
'bytes': 'bytes',
'true': 'bool',
}.get(type, "Type{}".format(type))
if self.is_vector:
result = 'List[{}]'.format(result)
if self.is_flag and type != 'date':
result = 'Optional[{}]'.format(result)
return result
def __str__(self):
# Find the real type representation by updating it as required
real_type = self.type
if self.flag_indicator:
real_type = '#'
if self.is_vector:
if self.use_vector_id:
real_type = 'Vector<{}>'.format(real_type)
else:
real_type = 'vector<{}>'.format(real_type)
if self.is_generic:
real_type = '!{}'.format(real_type)
if self.is_flag:
real_type = 'flags.{}?{}'.format(self.flag_index, real_type)
if self.generic_definition:
return '{{{}:{}}}'.format(self.name, real_type)
else:
return '{}:{}'.format(self.name, real_type)
def __repr__(self):
return str(self).replace(':date', ':int').replace('?date', '?int')
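def _example_tlarg():
    # Illustrative sketch, not part of the original module: how two
    # representative argument declarations are decomposed by TLArg.
    flag = TLArg('silent', 'flags.5?true', False)
    assert flag.is_flag and flag.flag_index == 5 and flag.type == 'true'
    assert flag.type_hint() == 'Optional[bool]'

    vec = TLArg('users', 'Vector<InputUser>', False)
    assert vec.is_vector and vec.use_vector_id and vec.type == 'InputUser'
    assert vec.type_hint() == 'List[TypeInputUser]'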
def _from_line(line, is_function):
match = re.match(
r'^([\w.]+)' # 'name'
r'(?:#([0-9a-fA-F]+))?' # '#optionalcode'
r'(?:\s{?\w+:[\w\d<>#.?!]+}?)*' # '{args:.0?type}'
r'\s=\s' # ' = '
r'([\w\d<>#.?]+);$', # '<result.type>;'
line
)
if match is None:
# Probably "vector#1cb5c415 {t:Type} # [ t ] = Vector t;"
raise ValueError('Cannot parse TLObject {}'.format(line))
args_match = re.findall(
r'({)?'
r'(\w+)'
r':'
r'([\w\d<>#.?!]+)'
r'}?',
line
)
return TLObject(
fullname=match.group(1),
object_id=match.group(2),
result=match.group(3),
is_function=is_function,
args=[TLArg(name, arg_type, brace != '')
for brace, name, arg_type in args_match]
)
def parse_tl(file_path, ignore_core=False):
"""This method yields TLObjects from a given .tl file."""
with open(file_path, encoding='utf-8') as file:
is_function = False
for line in file:
comment_index = line.find('//')
if comment_index != -1:
line = line[:comment_index]
line = line.strip()
if not line:
continue
match = re.match(r'---(\w+)---', line)
if match:
following_types = match.group(1)
is_function = following_types == 'functions'
continue
try:
result = _from_line(line, is_function)
if not ignore_core or result.id not in CORE_TYPES:
yield result
except ValueError as e:
if 'vector#1cb5c415' not in str(e):
raise
def find_layer(file_path):
"""Finds the layer used on the specified scheme.tl file."""
layer_regex = re.compile(r'^//\s*LAYER\s*(\d+)$')
with open(file_path, encoding='utf-8') as file:
for line in file:
match = layer_regex.match(line)
if match:
return int(match.group(1))
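def _example_parse_scheme(scheme_path):
    # Illustrative sketch, not part of the original module: typical use of
    # parse_tl() and find_layer(); ``scheme_path`` is a placeholder for the
    # scheme.tl data file.
    for obj in parse_tl(scheme_path, ignore_core=True):
        print('{:#010x} {}'.format(obj.id, obj.class_name))
    print('LAYER =', find_layer(scheme_path))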

View File

@@ -1,762 +0,0 @@
import os
import re
import shutil
import struct
from zlib import crc32
from collections import defaultdict
from .parser import SourceBuilder, TLParser, TLObject
AUTO_GEN_NOTICE = \
'"""File generated by TLObjects\' generator. All changes will be ERASED"""'
AUTO_CASTS = {
'InputPeer': 'utils.get_input_peer(client.get_input_entity({}))',
'InputChannel': 'utils.get_input_channel(client.get_input_entity({}))',
'InputUser': 'utils.get_input_user(client.get_input_entity({}))',
'InputMedia': 'utils.get_input_media({})',
'InputPhoto': 'utils.get_input_photo({})'
}
class TLGenerator:
def __init__(self, output_dir):
self.output_dir = output_dir
def _get_file(self, *paths):
"""Wrapper around ``os.path.join()`` with output as first path."""
return os.path.join(self.output_dir, *paths)
def _rm_if_exists(self, filename):
"""Recursively deletes the given filename if it exists."""
file = self._get_file(filename)
if os.path.exists(file):
if os.path.isdir(file):
shutil.rmtree(file)
else:
os.remove(file)
def tlobjects_exist(self):
"""
Determines whether the TLObjects were previously
generated (hence exist) or not.
"""
return os.path.isfile(self._get_file('all_tlobjects.py'))
def clean_tlobjects(self):
"""Cleans the automatically generated TLObjects from disk."""
for name in ('functions', 'types', 'all_tlobjects.py'):
self._rm_if_exists(name)
def generate_tlobjects(self, scheme_file, import_depth):
"""
Generates all the TLObjects from the ``scheme_file`` to
``tl/functions`` and ``tl/types``.
"""
# First ensure that the required parent directories exist
os.makedirs(self._get_file('functions'), exist_ok=True)
os.makedirs(self._get_file('types'), exist_ok=True)
# Step 0: Cache the parsed file on a tuple
tlobjects = tuple(TLParser.parse_file(scheme_file, ignore_core=True))
# Step 1: Group everything by {namespace: [tlobjects]} so we can
# easily generate __init__.py files with all the TLObjects on them.
namespace_functions = defaultdict(list)
namespace_types = defaultdict(list)
# Make use of this iteration to also store 'Type: [Constructors]',
# used when generating the documentation for the classes.
type_constructors = defaultdict(list)
for tlobject in tlobjects:
if tlobject.is_function:
namespace_functions[tlobject.namespace].append(tlobject)
else:
namespace_types[tlobject.namespace].append(tlobject)
type_constructors[tlobject.result].append(tlobject)
# Step 2: Generate the actual code
self._write_init_py(
self._get_file('functions'), import_depth,
namespace_functions, type_constructors
)
self._write_init_py(
self._get_file('types'), import_depth,
namespace_types, type_constructors
)
# Step 3: Once all the objects have been generated,
# we can now group them in a single file
filename = os.path.join(self._get_file('all_tlobjects.py'))
with open(filename, 'w', encoding='utf-8') as file,\
SourceBuilder(file) as builder:
builder.writeln(AUTO_GEN_NOTICE)
builder.writeln()
builder.writeln('from . import types, functions')
builder.writeln()
# Create a constant variable to indicate which layer this is
builder.writeln('LAYER = {}', TLParser.find_layer(scheme_file))
builder.writeln()
# Then create the dictionary containing constructor_id: class
builder.writeln('tlobjects = {')
builder.current_indent += 1
# Fill the dictionary (0x1a2b3c4f: tl.full.type.path.Class)
for tlobject in tlobjects:
builder.write('{:#010x}: ', tlobject.id)
builder.write('functions' if tlobject.is_function else 'types')
if tlobject.namespace:
builder.write('.' + tlobject.namespace)
builder.writeln('.{},', tlobject.class_name())
builder.current_indent -= 1
builder.writeln('}')
@staticmethod
def _write_init_py(out_dir, depth, namespace_tlobjects, type_constructors):
# namespace_tlobjects: {'namespace', [TLObject]}
os.makedirs(out_dir, exist_ok=True)
for ns, tlobjects in namespace_tlobjects.items():
file = os.path.join(out_dir, ns + '.py' if ns else '__init__.py')
with open(file, 'w', encoding='utf-8') as f, \
SourceBuilder(f) as builder:
builder.writeln(AUTO_GEN_NOTICE)
# Both types and functions inherit from the TLObject class
# so they can all be serialized and sent; however, only the
# functions are "content_related".
builder.writeln(
'from {}.tl.tlobject import TLObject', '.' * depth
)
builder.writeln('from typing import Optional, List, '
'Union, TYPE_CHECKING')
# Add the relative imports to the namespaces,
# unless we already are in a namespace.
if not ns:
builder.writeln('from . import {}', ', '.join(
x for x in namespace_tlobjects.keys() if x
))
# Import 'os' for those needing access to 'os.urandom()'
# Currently only 'random_id' needs 'os' to be imported,
# for all those TLObjects with arg.can_be_inferred.
builder.writeln('import os')
# Import struct for the .__bytes__(self) serialization
builder.writeln('import struct')
tlobjects.sort(key=lambda x: x.name)
type_names = set()
type_defs = []
# Find all the types in this file and generate type definitions
# based on the types. The type definitions are written to the
# file at the end.
for t in tlobjects:
if not t.is_function:
type_name = t.result
if '.' in type_name:
type_name = type_name[type_name.rindex('.'):]
if type_name in type_names:
continue
type_names.add(type_name)
constructors = type_constructors[type_name]
if not constructors:
pass
elif len(constructors) == 1:
type_defs.append('Type{} = {}'.format(
type_name, constructors[0].class_name()))
else:
type_defs.append('Type{} = Union[{}]'.format(
type_name, ','.join(c.class_name()
for c in constructors)))
imports = {}
primitives = ('int', 'long', 'int128', 'int256', 'string',
'date', 'bytes', 'true')
# Find all the types in other files that are used in this file
# and generate the information required to import those types.
for t in tlobjects:
for arg in t.args:
name = arg.type
if not name or name in primitives:
continue
import_space = '{}.tl.types'.format('.' * depth)
if '.' in name:
namespace = name.split('.')[0]
name = name.split('.')[1]
import_space += '.{}'.format(namespace)
if name not in type_names:
type_names.add(name)
if name == 'date':
imports['datetime'] = ['datetime']
continue
elif import_space not in imports:
imports[import_space] = set()
imports[import_space].add('Type{}'.format(name))
# Add imports required for type checking
if imports:
builder.writeln('if TYPE_CHECKING:')
for namespace, names in imports.items():
builder.writeln('from {} import {}',
namespace, ', '.join(names))
builder.end_block()
# Generate the class for every TLObject
for t in tlobjects:
TLGenerator._write_source_code(
t, builder, depth, type_constructors
)
builder.current_indent = 0
# Write the type definitions generated earlier.
builder.writeln('')
for line in type_defs:
builder.writeln(line)
@staticmethod
def _write_source_code(tlobject, builder, depth, type_constructors):
"""
Writes the source code corresponding to the given TLObject
by making use of the ``builder`` `SourceBuilder`.
Additional information such as file path depth and
the ``Type: [Constructors]`` must be given for proper
importing and documentation strings.
"""
builder.writeln()
builder.writeln()
builder.writeln('class {}(TLObject):', tlobject.class_name())
# Class-level variable to store its Telegram's constructor ID
builder.writeln('CONSTRUCTOR_ID = {:#x}', tlobject.id)
builder.writeln('SUBCLASS_OF_ID = {:#x}',
crc32(tlobject.result.encode('ascii')))
builder.writeln()
# Flag arguments must go last
args = [
a for a in tlobject.sorted_args()
if not a.flag_indicator and not a.generic_definition
]
# Convert the args to string parameters, flags having =None
args = [
(a.name if not a.is_flag and not a.can_be_inferred
else '{}=None'.format(a.name))
for a in args
]
# Write the __init__ function
if args:
builder.writeln('def __init__(self, {}):', ', '.join(args))
else:
builder.writeln('def __init__(self):')
# Now update args to have the TLObject arguments, _except_
# those which are calculated on send or ignored, this is
# flag indicator and generic definitions.
#
# We don't need the generic definitions in Python
# because arguments can be any type
args = [arg for arg in tlobject.args
if not arg.flag_indicator and
not arg.generic_definition]
if args:
# Write the docstring, to know the type of the args
builder.writeln('"""')
for arg in args:
if not arg.flag_indicator:
builder.writeln(':param {} {}:',
arg.doc_type_hint(), arg.name)
builder.current_indent -= 1 # It will auto-indent (':')
# We also want to know what type this request returns
# or to which type this constructor belongs to
builder.writeln()
if tlobject.is_function:
builder.write(':returns {}: ', tlobject.result)
else:
builder.write('Constructor for {}: ', tlobject.result)
constructors = type_constructors[tlobject.result]
if not constructors:
builder.writeln('This type has no constructors.')
elif len(constructors) == 1:
builder.writeln('Instance of {}.',
constructors[0].class_name())
else:
builder.writeln('Instance of either {}.', ', '.join(
c.class_name() for c in constructors))
builder.writeln('"""')
builder.writeln('super().__init__()')
# Functions have a result object and are confirmed by default
if tlobject.is_function:
builder.writeln('self.result = None')
builder.writeln(
'self.content_related = True')
# Set the arguments
if args:
# Leave an empty line if there are any args
builder.writeln()
for arg in args:
if not arg.can_be_inferred:
builder.writeln('self.{0} = {0} # type: {1}',
arg.name, arg.python_type_hint())
continue
# Currently the only argument that can be
# inferred are those called 'random_id'
if arg.name == 'random_id':
# Endianness doesn't really matter, and 'big' is shorter
code = "int.from_bytes(os.urandom({}), 'big', signed=True)" \
.format(8 if arg.type == 'long' else 4)
if arg.is_vector:
# Currently for the case of "messages.forwardMessages"
# Ensure we can infer the length from id:Vector<>
if not next(
a for a in args if a.name == 'id').is_vector:
raise ValueError(
'Cannot infer list of random ids for ', tlobject
)
code = '[{} for _ in range(len(id))]'.format(code)
builder.writeln(
"self.random_id = random_id if random_id "
"is not None else {}", code
)
else:
raise ValueError('Cannot infer a value for ', arg)
builder.end_block()
# Write the resolve(self, client, utils) method
if any(arg.type in AUTO_CASTS for arg in args):
builder.writeln('def resolve(self, client, utils):')
for arg in args:
ac = AUTO_CASTS.get(arg.type, None)
if ac:
TLGenerator._write_self_assign(builder, arg, ac)
builder.end_block()
# Write the to_dict(self) method
builder.writeln('def to_dict(self):')
builder.writeln('return {')
builder.current_indent += 1
base_types = ('string', 'bytes', 'int', 'long', 'int128',
'int256', 'double', 'Bool', 'true', 'date')
builder.write("'_': '{}'", tlobject.class_name())
for arg in args:
builder.writeln(',')
builder.write("'{}': ", arg.name)
if arg.type in base_types:
if arg.is_vector:
builder.write('[] if self.{0} is None else self.{0}[:]',
arg.name)
else:
builder.write('self.{}', arg.name)
else:
if arg.is_vector:
builder.write(
'[] if self.{0} is None else [None '
'if x is None else x.to_dict() for x in self.{0}]',
arg.name
)
else:
builder.write(
'None if self.{0} is None else self.{0}.to_dict()',
arg.name
)
builder.writeln()
builder.current_indent -= 1
builder.writeln("}")
builder.end_block()
# Write the .__bytes__() function
builder.writeln('def __bytes__(self):')
# Some objects require more than one flag parameter to be set
# at the same time. In this case, add an assertion.
repeated_args = defaultdict(list)
for arg in tlobject.args:
if arg.is_flag:
repeated_args[arg.flag_index].append(arg)
for ra in repeated_args.values():
if len(ra) > 1:
cnd1 = ('(self.{0} or self.{0} is not None)'
.format(a.name) for a in ra)
cnd2 = ('(self.{0} is None or self.{0} is False)'
.format(a.name) for a in ra)
builder.writeln(
"assert ({}) or ({}), '{} parameters must all "
"be False-y (like None) or all me True-y'",
' and '.join(cnd1), ' and '.join(cnd2),
', '.join(a.name for a in ra)
)
builder.writeln("return b''.join((")
builder.current_indent += 1
# First constructor code, we already know its bytes
builder.writeln('{},', repr(struct.pack('<I', tlobject.id)))
for arg in tlobject.args:
if TLGenerator.write_to_bytes(builder, arg, tlobject.args):
builder.writeln(',')
builder.current_indent -= 1
builder.writeln('))')
builder.end_block()
# Write the static from_reader(reader) function
builder.writeln('@staticmethod')
builder.writeln('def from_reader(reader):')
for arg in tlobject.args:
TLGenerator.write_read_code(
builder, arg, tlobject.args, name='_' + arg.name
)
builder.writeln(
'return {}({})',
tlobject.class_name(),
', '.join(
'{0}=_{0}'.format(a.name) for a in tlobject.sorted_args()
if not a.flag_indicator and not a.generic_definition
)
)
# Only requests can have a different response that's not their
# serialized body, that is, we'll be setting their .result.
#
# The default behaviour is reading a TLObject too, so no need
# to override it unless necessary.
if tlobject.is_function and not TLGenerator._is_boxed(tlobject.result):
builder.end_block()
builder.writeln('def on_response(self, reader):')
TLGenerator.write_request_result_code(builder, tlobject)
@staticmethod
def _is_boxed(type_):
# https://core.telegram.org/mtproto/serialize#boxed-and-bare-types
# TL;DR; boxed types start with uppercase always, so we can use
# this to check whether everything in it is boxed or not.
#
# The API always returns a boxed type, but it may be inside a Vector<>
# or a namespace, and the Vector may have a not-boxed type. For this
# reason we find whatever index, '<' or '.'. If neither are present
# we will get -1, and the 0th char is always upper case thus works.
# For Vector types and namespaces, it will check in the right place.
check_after = max(type_.find('<'), type_.find('.'))
return type_[check_after + 1].isupper()
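# Illustrative examples of the check above: for 'messages.Messages' the
# character after the separator is 'M' (uppercase), so the type is boxed
# and the default result reading applies; for 'Vector<int>' it is 'i'
# (lowercase), so it is not boxed and a custom on_response() is generated.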
@staticmethod
def _write_self_assign(builder, arg, get_input_code):
"""Writes self.arg = input.format(self.arg), considering vectors."""
if arg.is_vector:
builder.write('self.{0} = [{1} for _x in self.{0}]',
arg.name, get_input_code.format('_x'))
else:
builder.write('self.{} = {}',
arg.name, get_input_code.format('self.' + arg.name))
builder.writeln(
' if self.{} else None'.format(arg.name) if arg.is_flag else ''
)
@staticmethod
def get_file_name(tlobject, add_extension=False):
"""Gets the file name in file_name_format.py for the given TLObject"""
# Courtesy of http://stackoverflow.com/a/1176023/4759433
s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', tlobject.name)
result = re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()
if add_extension:
return result + '.py'
else:
return result
@staticmethod
def write_to_bytes(builder, arg, args, name=None):
"""
Writes the .__bytes__() code for the given argument
:param builder: The source code builder
:param arg: The argument to write
:param args: All the other arguments of the same TLObject's __bytes__.
These are required to determine the flags value
:param name: The name of the argument. Defaults to "self.argname".
This parameter exists mainly because it is needed when
writing the elements of a Vector<>
"""
if arg.generic_definition:
return # Do nothing, this only specifies a later type
if name is None:
name = 'self.{}'.format(arg.name)
# The argument may be a flag, only write if it's not None AND
# if it's not a True type.
# True types are not actually sent, but instead only used to
# determine the flags.
if arg.is_flag:
if arg.type == 'true':
return # Exit, since True type is never written
elif arg.is_vector:
# Vector flags are special since they consist of 3 values,
# so we need an extra join here. Note that empty vector flags
# should NOT be sent either!
builder.write("b'' if {0} is None or {0} is False "
"else b''.join((", name)
else:
builder.write("b'' if {0} is None or {0} is False "
"else (", name)
if arg.is_vector:
if arg.use_vector_id:
# vector code, unsigned 0x1cb5c415 as little endian
builder.write(r"b'\x15\xc4\xb5\x1c',")
builder.write("struct.pack('<i', len({})),", name)
# Cannot unpack the values for the outer tuple through *[(
# since that's a Python >3.5 feature, so add another join.
builder.write("b''.join(")
# Temporary disable .is_vector, not to enter this if again
# Also disable .is_flag since it's not needed per element
old_flag = arg.is_flag
arg.is_vector = arg.is_flag = False
TLGenerator.write_to_bytes(builder, arg, args, name='x')
arg.is_vector = True
arg.is_flag = old_flag
builder.write(' for x in {})', name)
elif arg.flag_indicator:
# Calculate the flags with those items which are not None
if not any(f.is_flag for f in args):
# There's a flag indicator, but no flag arguments so it's 0
builder.write(r"b'\0\0\0\0'")
else:
builder.write("struct.pack('<I', ")
builder.write(
' | '.join('(0 if {0} is None or {0} is False else {1})'
.format('self.{}'.format(flag.name),
1 << flag.flag_index)
for flag in args if flag.is_flag)
)
builder.write(')')
elif 'int' == arg.type:
# struct.pack is around 4 times faster than int.to_bytes
builder.write("struct.pack('<i', {})", name)
elif 'long' == arg.type:
builder.write("struct.pack('<q', {})", name)
elif 'int128' == arg.type:
builder.write("{}.to_bytes(16, 'little', signed=True)", name)
elif 'int256' == arg.type:
builder.write("{}.to_bytes(32, 'little', signed=True)", name)
elif 'double' == arg.type:
builder.write("struct.pack('<d', {})", name)
elif 'string' == arg.type:
builder.write('TLObject.serialize_bytes({})', name)
elif 'Bool' == arg.type:
# 0x997275b5 if boolean else 0xbc799737
builder.write(r"b'\xb5ur\x99' if {} else b'7\x97y\xbc'", name)
elif 'true' == arg.type:
pass # These are actually NOT written! Only used for flags
elif 'bytes' == arg.type:
builder.write('TLObject.serialize_bytes({})', name)
elif 'date' == arg.type: # Custom format
builder.write('TLObject.serialize_datetime({})', name)
else:
# Else it may be a custom type
builder.write('bytes({})', name)
if arg.is_flag:
builder.write(')')
if arg.is_vector:
builder.write(')') # We were using a tuple
return True # Something was written
@staticmethod
def write_read_code(builder, arg, args, name):
"""
Writes the read code for the given argument, setting the
arg.name variable to its read value.
:param builder: The source code builder
:param arg: The argument to write
:param args: All the other arguments of the same TLObject's on_send.
These are required to determine the flags value
:param name: The name of the argument. Defaults to "self.argname".
This parameter exists mainly because it is needed when
writing the elements of a Vector<>
"""
if arg.generic_definition:
return # Do nothing, this only specifies a later type
# The argument may be a flag, only write that flag was given!
was_flag = False
if arg.is_flag:
# Treat 'true' flags as a special case, since they're true if
# they're set, and nothing else needs to actually be read.
if 'true' == arg.type:
builder.writeln('{} = bool(flags & {})',
name, 1 << arg.flag_index)
return
was_flag = True
builder.writeln('if flags & {}:', 1 << arg.flag_index)
# Temporary disable .is_flag not to enter this if
# again when calling the method recursively
arg.is_flag = False
if arg.is_vector:
if arg.use_vector_id:
# We have to read the vector's constructor ID
builder.writeln("reader.read_int()")
builder.writeln('{} = []', name)
builder.writeln('for _ in range(reader.read_int()):')
# Temporary disable .is_vector, not to enter this if again
arg.is_vector = False
TLGenerator.write_read_code(builder, arg, args, name='_x')
builder.writeln('{}.append(_x)', name)
arg.is_vector = True
elif arg.flag_indicator:
# Read the flags, which will indicate what items we should read next
builder.writeln('flags = reader.read_int()')
builder.writeln()
elif 'int' == arg.type:
builder.writeln('{} = reader.read_int()', name)
elif 'long' == arg.type:
builder.writeln('{} = reader.read_long()', name)
elif 'int128' == arg.type:
builder.writeln('{} = reader.read_large_int(bits=128)', name)
elif 'int256' == arg.type:
builder.writeln('{} = reader.read_large_int(bits=256)', name)
elif 'double' == arg.type:
builder.writeln('{} = reader.read_double()', name)
elif 'string' == arg.type:
builder.writeln('{} = reader.tgread_string()', name)
elif 'Bool' == arg.type:
builder.writeln('{} = reader.tgread_bool()', name)
elif 'true' == arg.type:
# Arbitrary not-None value, don't actually read "true" flags
builder.writeln('{} = True', name)
elif 'bytes' == arg.type:
builder.writeln('{} = reader.tgread_bytes()', name)
elif 'date' == arg.type: # Custom format
builder.writeln('{} = reader.tgread_date()', name)
else:
# Else it may be a custom type
if not arg.skip_constructor_id:
builder.writeln('{} = reader.tgread_object()', name)
else:
# Import the correct type inline to avoid cyclic imports.
# There may be better solutions so that we can just access
# all the types before the files have been parsed, but I
# don't know of any.
sep_index = arg.type.find('.')
if sep_index == -1:
ns, t = '.', arg.type
else:
ns, t = '.' + arg.type[:sep_index], arg.type[sep_index+1:]
class_name = TLObject.class_name_for(t)
# There would be no need to import the type if we're in the
# file with the same namespace, but since it does no harm
# and we don't have information about such thing in the
# method we just ignore that case.
builder.writeln('from {} import {}', ns, class_name)
builder.writeln('{} = {}.from_reader(reader)',
name, class_name)
# End vector and flag blocks if required (if we opened them before)
if arg.is_vector:
builder.end_block()
if was_flag:
builder.current_indent -= 1
builder.writeln('else:')
builder.writeln('{} = None', name)
builder.current_indent -= 1
# Restore .is_flag
arg.is_flag = True
@staticmethod
def write_request_result_code(builder, tlobject):
"""
Writes the receive code for the given function
:param builder: The source code builder
:param tlobject: The TLObject for which the 'self.result = '
will be written
"""
if tlobject.result.startswith('Vector<'):
# Vector results are a bit special since they can also be composed
# of integer values and such; however, the result of requests is
# not parsed as arguments are and it's a bit harder to tell which
# is which.
if tlobject.result == 'Vector<int>':
builder.writeln('reader.read_int() # Vector ID')
builder.writeln('count = reader.read_int()')
builder.writeln(
'self.result = [reader.read_int() for _ in range(count)]'
)
elif tlobject.result == 'Vector<long>':
builder.writeln('reader.read_int() # Vector ID')
builder.writeln('count = reader.read_long()')
builder.writeln(
'self.result = [reader.read_long() for _ in range(count)]'
)
else:
builder.writeln('self.result = reader.tgread_vector()')
else:
builder.writeln('self.result = reader.tgread_object()')

View File

@@ -0,0 +1,8 @@
import re
def snake_to_camel_case(name, suffix=None):
# Courtesy of http://stackoverflow.com/a/31531797/4759433
result = re.sub(r'_([a-z])', lambda m: m.group(1).upper(), name)
result = result[:1].upper() + result[1:].replace('_', '')
return result + suffix if suffix else result
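def _example_snake_to_camel_case():
    # Illustrative sketch, not part of the original module.
    assert snake_to_camel_case('send_message', suffix='Request') == 'SendMessageRequest'
    assert snake_to_camel_case('flood_wait', suffix='Error') == 'FloodWaitError'
    assert snake_to_camel_case('updates') == 'Updates'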