Port tl-types from grammers

This commit is contained in:
Lonami Exo 2023-07-05 22:35:06 +02:00
parent fed06f40ed
commit 7b707cfc6c
18 changed files with 311 additions and 19 deletions

5
.gitignore vendored
View File

@ -6,7 +6,12 @@ __pycache__/
 .mypy_cache/
 dist/
 build/
+**/tl/__init__.py
 **/tl/layer.py
 **/tl/abcs/
 **/tl/functions/
 **/tl/types/
+**/mtproto/layer.py
+**/mtproto/abcs/
+**/mtproto/functions/
+**/mtproto/types/

5
DEVELOPING.md Normal file
View File

@ -0,0 +1,5 @@
```sh
pip install -e generator/
python -m telethon_generator.codegen api.tl telethon/src/_impl/tl
python -m telethon_generator.codegen mtproto.tl telethon/src/_impl/tl/mtproto
```
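
For orientation, a quick sketch (not part of this commit) of what becomes importable once codegen has run; the names follow the `tl/__init__.py` added below, and the exact install layout is assumed:

```python
# Minimal usage sketch, assuming the generated code is importable as telethon._impl.tl.
from telethon._impl.tl import LAYER, TYPE_MAPPING, functions, types

print(LAYER)              # schema layer the code was generated from
print(len(TYPE_MAPPING))  # constructor ID -> generated type class
```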

21
client/LICENSE Normal file
View File

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2016-Present LonamiWebs
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

3
client/README.md Normal file
View File

@ -0,0 +1,3 @@
# Telethon
Full-featured Telegram client library.

41
client/pyproject.toml Normal file
View File

@ -0,0 +1,41 @@
[project]
name = "Telethon"
description = "Full-featured Telegram client library"
authors = [
{ name="Lonami", email="totufals@hotmail.com" },
]
readme = "README.md"
license = {file = "LICENSE"}
requires-python = ">=3.8"
keywords = ["telegram", "chat", "messaging", "mtproto", "telethon"]
classifiers = [
"Development Status :: 5 - Production/Stable",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
"Programming Language :: Python :: 3",
"Topic :: Communications :: Chat",
"Typing :: Typed",
]
dependencies = [
"pyaes~=1.6",
"rsa~=4.9",
]
dynamic = ["version"]
[project.optional-dependencies]
cryptg = ["cryptg~=0.4"]
[project.urls]
"Homepage" = "https://telethon.dev/"
"Source" = "https://telethon.dev/code/"
"Documentation" = "https://telethon.dev/docs/"
"Bug Tracker" = "https://telethon.dev/issues/"
[build-system]
requires = ["setuptools>=61.0"]
build-backend = "setuptools.build_meta"
[tool.setuptools.dynamic]
version = {attr = "telethon.__version__"}

View File

@ -0,0 +1 @@
from .version import __version__

View File

@ -0,0 +1,4 @@
from . import abcs, core, functions, mtproto, types
from .layer import LAYER, TYPE_MAPPING
__all__ = ["abcs", "core", "functions", "mtproto", "types", "LAYER", "TYPE_MAPPING"]

View File

@ -0,0 +1,5 @@
from .reader import Reader
from .request import Request
from .serializable import Serializable, serialize_bytes_to
__all__ = ["Reader", "Request", "Serializable", "serialize_bytes_to"]

View File

@ -0,0 +1,64 @@
import struct
from typing import TYPE_CHECKING, Any, Type, TypeVar

if TYPE_CHECKING:
    from .serializable import Serializable

T = TypeVar("T", bound="Serializable")


class Reader:
    __slots__ = ("_buffer", "_pos", "_view")

    def __init__(self, buffer: bytes) -> None:
        self._buffer = buffer
        self._pos = 0
        self._view = memoryview(self._buffer)

    def read(self, n: int) -> bytes:
        self._pos += n
        return self._view[self._pos - n : self._pos]

    def read_fmt(self, fmt: str, size: int) -> tuple[Any, ...]:
        assert struct.calcsize(fmt) == size
        self._pos += size
        return struct.unpack(fmt, self._view[self._pos - size : self._pos])

    def read_bytes(self) -> bytes:
        if self._buffer[self._pos] == 254:
            self._pos += 4
            (length,) = struct.unpack(
                "<i", self._buffer[self._pos - 3 : self._pos] + b"\0"
            )
            padding = length % 4
        else:
            length = self._buffer[self._pos]
            padding = (length + 1) % 4
            self._pos += 1

        self._pos += length
        data = self._view[self._pos - length : self._pos]
        if padding > 0:
            self._pos += 4 - padding

        return data

    @staticmethod
    def _get_ty(_: int) -> Type["Serializable"]:
        # Implementation replaced during import to prevent cycles,
        # without the performance hit of having the import inside.
        raise NotImplementedError

    def read_serializable(self, cls: Type[T]) -> T:
        # Calls to this method likely need to ignore "type-abstract".
        # See https://github.com/python/mypy/issues/4717.
        # Unfortunately `typing.cast` would add a tiny amount of runtime overhead
        # which cannot be removed with optimization enabled.
        self._pos += 4
        cid = struct.unpack("<I", self._view[self._pos - 4 : self._pos])[0]
        ty = self._get_ty(cid)
        if ty is None:
            raise ValueError(f"No type found for constructor ID: {cid:x}")
        assert issubclass(ty, cls)
        return ty._read_from(self)
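
As a usage sketch (not part of the commit): `read_bytes` follows the TL byte-string encoding, where payloads shorter than 254 bytes carry a single length byte, longer ones use `0xfe` plus a 3-byte length, and everything is padded to a 4-byte boundary.

```python
# Hedged example of reading a TL-encoded byte string with the Reader above.
from telethon._impl.tl.core import Reader

blob = b"\x03Hi!"  # length 3, payload "Hi!", already 4-byte aligned
assert bytes(Reader(blob).read_bytes()) == b"Hi!"
```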

View File

@ -0,0 +1,20 @@
import struct


class Request:
    __slots__ = "_body"

    def __init__(self, body: bytes):
        self._body = body

    @property
    def constructor_id(self) -> int:
        try:
            cid = struct.unpack("<i", self._body[:4])[0]
            assert isinstance(cid, int)
            return cid
        except struct.error:
            return 0

    def debug_name(self) -> str:
        return f"request#{self.constructor_id:x}"
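
An illustrative sketch (the constructor ID below is made up): a `Request` wraps an already-serialized function call, so the ID is simply its first four little-endian bytes.

```python
# Hypothetical usage; 0x12345678 is an arbitrary constructor ID, not a real one.
import struct

from telethon._impl.tl.core import Request

req = Request(struct.pack("<i", 0x12345678) + b"...params...")
assert req.constructor_id == 0x12345678
print(req.debug_name())  # request#12345678
```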

View File

@ -0,0 +1,52 @@
import abc
import struct
from typing import Self, Tuple

from .reader import Reader


class Serializable(abc.ABC):
    __slots__: Tuple[str, ...] = ()

    @classmethod
    @abc.abstractmethod
    def constructor_id(cls) -> int:
        pass

    @classmethod
    def _read_from(cls, reader: Reader) -> Self:
        return reader.read_serializable(cls)

    def _write_boxed_to(self, buffer: bytearray) -> None:
        buffer += struct.pack("<I", self.constructor_id())
        self._write_to(buffer)

    @abc.abstractmethod
    def _write_to(self, buffer: bytearray) -> None:
        pass

    @classmethod
    def from_bytes(cls, blob: bytes) -> Self:
        return Reader(blob).read_serializable(cls)

    def __bytes__(self) -> bytes:
        buffer = bytearray()
        self._write_boxed_to(buffer)
        return bytes(buffer)

    def __repr__(self) -> str:
        attrs = ", ".join(repr(getattr(self, attr)) for attr in self.__slots__)
        return f"{self.__class__.__name__}({attrs})"


def serialize_bytes_to(buffer: bytearray, data: bytes) -> None:
    length = len(data)
    if length < 0xFE:
        buffer += struct.pack("<B", length)
        length += 1
    else:
        buffer += b"\xfe"
        buffer += struct.pack("<i", length)[:-1]
    buffer += data
    buffer += bytes((4 - (length % 4)) % 4)
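
A round-trip sketch tying the two halves together (imports assume the `core` re-exports added in this commit):

```python
# serialize_bytes_to pads to a 4-byte boundary; Reader.read_bytes strips it again.
from telethon._impl.tl.core import Reader, serialize_bytes_to

buffer = bytearray()
serialize_bytes_to(buffer, b"Hello")
assert len(buffer) % 4 == 0
assert bytes(Reader(bytes(buffer)).read_bytes()) == b"Hello"
```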

View File

@ -0,0 +1 @@
from ..core import *

View File

@ -0,0 +1,2 @@
# https://peps.python.org/pep-0440/
__version__ = "2.0.0a0"

View File

@ -0,0 +1,26 @@
from pytest import mark

from telethon._impl.tl.core import Reader


@mark.parametrize(
    ("string", "prefix", "suffix"),
    [
        ("", b"\00", b"\00\x00\x00"),
        ("Hi", b"\02", b"\00"),
        ("Hi!", b"\03", b""),
        ("Hello", b"\05", b"\00\x00"),
        ("Hello, world!", b"\x0d", b"\00\x00"),
        (
            "This is a very long string, and it has to be longer than 253 \
characters, which are quite a few but we can make it! Although, \
it is quite challenging. The quick brown fox jumps over the lazy \
fox. There is still some more text we need to type. Oh, this \
sentence made it past!",
            b"\xfe\x11\x01\x00",
            b"\x00\x00\x00",
        ),
    ],
)
def test_string(string: str, prefix: bytes, suffix: bytes) -> None:
    data = prefix + string.encode("ascii") + suffix
    assert str(Reader(data).read_bytes(), "ascii") == string

View File

@ -0,0 +1,28 @@
from pytest import mark

from telethon._impl.tl.core import serialize_bytes_to


@mark.parametrize(
    ("string", "prefix", "suffix"),
    [
        ("", b"\00", b"\00\x00\x00"),
        ("Hi", b"\02", b"\00"),
        ("Hi!", b"\03", b""),
        ("Hello", b"\05", b"\00\x00"),
        ("Hello, world!", b"\x0d", b"\00\x00"),
        (
            "This is a very long string, and it has to be longer than 253 \
characters, which are quite a few but we can make it! Although, \
it is quite challenging. The quick brown fox jumps over the lazy \
fox. There is still some more text we need to type. Oh, this \
sentence made it past!",
            b"\xfe\x11\x01\x00",
            b"\x00\x00\x00",
        ),
    ],
)
def test_string(string: str, prefix: bytes, suffix: bytes) -> None:
    expected = prefix + string.encode("ascii") + suffix
    buffer = bytearray()
    serialize_bytes_to(buffer, string.encode("ascii"))
    assert bytes(buffer) == expected

View File

@ -14,17 +14,23 @@ from .serde.deserialization import generate_read
 from .serde.serialization import generate_function, generate_write

-def generate_init(writer: SourceWriter, namespaces: Set[str]) -> None:
-    sorted_ns = list(namespaces)
-    sorted_ns.sort()
+def generate_init(
+    writer: SourceWriter, namespaces: Set[str], classes: Set[str]
+) -> None:
+    sorted_cls = list(sorted(classes))
+    sorted_ns = list(sorted(namespaces))
+    if sorted_cls:
+        sorted_import = ", ".join(sorted_cls)
+        writer.write(f"from ._nons import {sorted_import}")
     if sorted_ns:
         sorted_import = ", ".join(sorted_ns)
-        writer.write(f"from ._nons import *")
         writer.write(f"from . import {sorted_import}")
-    sorted_all = ", ".join(f"{ns!r}" for ns in sorted_ns)
-    writer.write(f"__all__ = [{sorted_all}]")
+    if sorted_cls or sorted_ns:
+        sorted_all = ", ".join(f"{ns!r}" for ns in sorted_cls + sorted_ns)
+        writer.write(f"__all__ = [{sorted_all}]")

 def generate(fs: FakeFs, tl: ParsedTl) -> None:
@ -39,7 +45,10 @@ def generate(fs: FakeFs, tl: ParsedTl) -> None:
     type_namespaces = set()
     function_namespaces = set()
-    generated_type_names = []
+    abc_class_names = set()
+    type_class_names = set()
+    function_def_names = set()
+    generated_type_names = set()
     for typedef in tl.typedefs:
         if typedef.ty.full_name not in generated_types:
@ -49,11 +58,12 @@ def generate(fs: FakeFs, tl: ParsedTl) -> None:
             abc_namespaces.add(typedef.ty.namespace[0])
             abc_path = (Path("abcs") / typedef.ty.namespace[0]).with_suffix(".py")
         else:
+            abc_class_names.add(to_class_name(typedef.ty.name))
             abc_path = Path("abcs/_nons.py")
         if abc_path not in fs:
             fs.write(abc_path, "from abc import ABCMeta\n")
-            fs.write(abc_path, "from ..core.serializable import Serializable\n")
+            fs.write(abc_path, "from ..core import Serializable\n")
         fs.write(
             abc_path,
@ -72,6 +82,7 @@ def generate(fs: FakeFs, tl: ParsedTl) -> None:
             type_namespaces.add(typedef.namespace[0])
             type_path = (Path("types") / typedef.namespace[0]).with_suffix(".py")
         else:
+            type_class_names.add(to_class_name(typedef.name))
             type_path = Path("types/_nons.py")
         writer = fs.open(type_path)
@ -80,11 +91,10 @@ def generate(fs: FakeFs, tl: ParsedTl) -> None:
writer.write(f"import struct") writer.write(f"import struct")
writer.write(f"from typing import List, Optional, Self") writer.write(f"from typing import List, Optional, Self")
writer.write(f"from .. import abcs") writer.write(f"from .. import abcs")
writer.write(f"from ..core.reader import Reader") writer.write(f"from ..core import Reader, serialize_bytes_to")
writer.write(f"from ..core.serializable import serialize_bytes_to")
ns = f"{typedef.namespace[0]}." if typedef.namespace else "" ns = f"{typedef.namespace[0]}." if typedef.namespace else ""
generated_type_names.append(f"{ns}{to_class_name(typedef.name)}") generated_type_names.add(f"{ns}{to_class_name(typedef.name)}")
# class Type(BaseType) # class Type(BaseType)
writer.write( writer.write(
@ -138,6 +148,7 @@ def generate(fs: FakeFs, tl: ParsedTl) -> None:
".py" ".py"
) )
else: else:
function_def_names.add(to_method_name(functiondef.name))
function_path = Path("functions/_nons.py") function_path = Path("functions/_nons.py")
writer = fs.open(function_path) writer = fs.open(function_path)
@ -146,8 +157,7 @@ def generate(fs: FakeFs, tl: ParsedTl) -> None:
writer.write(f"import struct") writer.write(f"import struct")
writer.write(f"from typing import List, Optional, Self") writer.write(f"from typing import List, Optional, Self")
writer.write(f"from .. import abcs") writer.write(f"from .. import abcs")
writer.write(f"from ..core.request import Request") writer.write(f"from ..core import Request, serialize_bytes_to")
writer.write(f"from ..core.serializable import serialize_bytes_to")
# def name(params, ...) # def name(params, ...)
params = ", ".join(f"{p.name}: {param_type_fmt(p.ty)}" for p in required_params) params = ", ".join(f"{p.name}: {param_type_fmt(p.ty)}" for p in required_params)
@ -156,11 +166,12 @@ def generate(fs: FakeFs, tl: ParsedTl) -> None:
         generate_function(writer, functiondef)
         writer.dedent(2)

-    generate_init(fs.open(Path("abcs/__init__.py")), abc_namespaces)
-    generate_init(fs.open(Path("types/__init__.py")), type_namespaces)
-    generate_init(fs.open(Path("functions/__init__.py")), function_namespaces)
+    generate_init(fs.open(Path("abcs/__init__.py")), abc_namespaces, abc_class_names)
+    generate_init(fs.open(Path("types/__init__.py")), type_namespaces, type_class_names)
+    generate_init(
+        fs.open(Path("functions/__init__.py")), function_namespaces, function_def_names
+    )

-    generated_type_names.sort()
     writer = fs.open(Path("layer.py"))
     writer.write(f"from . import types")
     writer.write(f"from .core import Serializable, Reader")
@ -169,7 +180,7 @@ def generate(fs: FakeFs, tl: ParsedTl) -> None:
     writer.write(
         "TYPE_MAPPING = {t.constructor_id(): t for t in cast(Tuple[Type[Serializable]], ("
     )
-    for name in generated_type_names:
+    for name in sorted(generated_type_names):
         writer.write(f" types.{name},")
     writer.write("))}")
     writer.write(
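
For context, a hedged illustration of the kind of `__init__.py` the new `generate_init` would emit; the class and namespace names are invented:

```python
# Illustrative generated types/__init__.py (names are placeholders).
from ._nons import InputPeerEmpty, PeerUser
from . import account, auth

__all__ = ['InputPeerEmpty', 'PeerUser', 'account', 'auth']
```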

View File

@ -1,3 +1,4 @@
+import struct
 from itertools import groupby
 from typing import Iterator
@ -127,7 +128,8 @@ def generate_write(writer: SourceWriter, defn: Definition) -> None:
 def generate_function(writer: SourceWriter, defn: Definition) -> None:
     tmp_names = gen_tmp_names()
-    writer.write("_buffer = bytearray()")
+    serialized_cid = struct.pack("<I", defn.id)
+    writer.write(f"_buffer = bytearray({serialized_cid!r})")
     for trivial, iter in groupby(
         defn.params,
         key=lambda p: is_trivial(p.ty),
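
An illustrative sketch of the effect: the constructor ID is now baked into the `bytearray` literal at generation time instead of being packed on every call. The exact emitted code is assumed; `ping#7abe77ec` is the MTProto ping constructor.

```python
# Hedged sketch of a generated function body after this change.
import struct

from telethon._impl.tl.core import Request


def ping(ping_id: int) -> Request:
    _buffer = bytearray(b"\xec\x77\xbe\x7a")  # constructor ID, already little-endian
    _buffer += struct.pack("<q", ping_id)     # ping_id:long
    return Request(bytes(_buffer))
```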

View File

@ -1 +1,2 @@
+# https://peps.python.org/pep-0440/
 __version__ = "0.1.0"