From 332215ea2eee1293d0aa61d39ac7ae14e363d06d Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Thu, 31 Aug 2023 23:05:49 +0200 Subject: [PATCH] Bring client parsers for commonmark and HTML --- client/pyproject.toml | 1 + client/src/telethon/_impl/client/__init__.py | 0 .../telethon/_impl/client/parsers/__init__.py | 11 + .../src/telethon/_impl/client/parsers/html.py | 207 ++++++++++++++++++ .../telethon/_impl/client/parsers/markdown.py | 191 ++++++++++++++++ .../telethon/_impl/client/parsers/strings.py | 75 +++++++ client/tests/test_parsers.py | 151 +++++++++++++ 7 files changed, 636 insertions(+) create mode 100644 client/src/telethon/_impl/client/__init__.py create mode 100644 client/src/telethon/_impl/client/parsers/__init__.py create mode 100644 client/src/telethon/_impl/client/parsers/html.py create mode 100644 client/src/telethon/_impl/client/parsers/markdown.py create mode 100644 client/src/telethon/_impl/client/parsers/strings.py create mode 100644 client/tests/test_parsers.py diff --git a/client/pyproject.toml b/client/pyproject.toml index 3052714c..bd940936 100644 --- a/client/pyproject.toml +++ b/client/pyproject.toml @@ -21,6 +21,7 @@ classifiers = [ dependencies = [ "pyaes~=1.6", "rsa~=4.9", + "markdown-it-py~=3.0", ] dynamic = ["version"] diff --git a/client/src/telethon/_impl/client/__init__.py b/client/src/telethon/_impl/client/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/client/src/telethon/_impl/client/parsers/__init__.py b/client/src/telethon/_impl/client/parsers/__init__.py new file mode 100644 index 00000000..4d3985fb --- /dev/null +++ b/client/src/telethon/_impl/client/parsers/__init__.py @@ -0,0 +1,11 @@ +from .html import parse as parse_html_message +from .html import unparse as generate_html_message +from .markdown import parse as parse_markdown_message +from .markdown import unparse as generate_markdown_message + +__all__ = [ + "generate_html_message", + "parse_html_message", + "generate_markdown_message", + 
"parse_markdown_message", +] diff --git a/client/src/telethon/_impl/client/parsers/html.py b/client/src/telethon/_impl/client/parsers/html.py new file mode 100644 index 00000000..e7b8f739 --- /dev/null +++ b/client/src/telethon/_impl/client/parsers/html.py @@ -0,0 +1,207 @@ +from collections import deque +from html import escape +from html.parser import HTMLParser +from typing import Any, Deque, Dict, Iterable, List, Optional, Tuple, Type, cast + +from ...tl.abcs import MessageEntity +from ...tl.types import ( + MessageEntityBlockquote, + MessageEntityBold, + MessageEntityCode, + MessageEntityEmail, + MessageEntityItalic, + MessageEntityMentionName, + MessageEntityPre, + MessageEntitySpoiler, + MessageEntityStrike, + MessageEntityTextUrl, + MessageEntityUnderline, + MessageEntityUrl, +) +from .strings import add_surrogate, del_surrogate, strip_text, within_surrogate + + +class HTMLToTelegramParser(HTMLParser): + def __init__(self) -> None: + super().__init__() + self.text = "" + self.entities: List[MessageEntity] = [] + self._building_entities: Dict[str, MessageEntity] = {} + self._open_tags: Deque[str] = deque() + self._open_tags_meta: Deque[Optional[str]] = deque() + + def handle_starttag( + self, tag: str, attrs_seq: List[Tuple[str, Optional[str]]] + ) -> None: + self._open_tags.appendleft(tag) + self._open_tags_meta.appendleft(None) + + attrs = dict(attrs_seq) + EntityType: Optional[Type[MessageEntity]] = None + args = {} + if tag == "strong" or tag == "b": + EntityType = MessageEntityBold + elif tag == "em" or tag == "i": + EntityType = MessageEntityItalic + elif tag == "u": + EntityType = MessageEntityUnderline + elif tag == "del" or tag == "s": + EntityType = MessageEntityStrike + elif tag == "blockquote": + EntityType = MessageEntityBlockquote + elif tag == "details": + EntityType = MessageEntitySpoiler + elif tag == "code": + try: + # If we're in the middle of a
 tag, this  tag is
+                # probably intended for syntax highlighting.
+                #
+                # Syntax highlighting is set with
+                #     codeblock
+                # inside 
 tags
+                pre = self._building_entities["pre"]
+                assert isinstance(pre, MessageEntityPre)
+                if cls := attrs.get("class"):
+                    pre.language = cls[len("language-") :]
+            except KeyError:
+                EntityType = MessageEntityCode
+        elif tag == "pre":
+            EntityType = MessageEntityPre
+            args["language"] = ""
+        elif tag == "a":
+            url = attrs.get("href")
+            if not url:
+                return
+            if url.startswith("mailto:"):
+                url = url[len("mailto:") :]
+                EntityType = MessageEntityEmail
+            else:
+                if self.get_starttag_text() == url:
+                    EntityType = MessageEntityUrl
+                else:
+                    EntityType = MessageEntityTextUrl
+                    args["url"] = del_surrogate(url)
+                    url = None
+            self._open_tags_meta.popleft()
+            self._open_tags_meta.appendleft(url)
+
+        if EntityType and tag not in self._building_entities:
+            Et = cast(Any, EntityType)
+            self._building_entities[tag] = Et(
+                offset=len(self.text),
+                # The length will be determined when closing the tag.
+                length=0,
+                **args,
+            )
+
+    def handle_data(self, text: str) -> None:
+        previous_tag = self._open_tags[0] if len(self._open_tags) > 0 else ""  # innermost open tag, if any
+        if previous_tag == "a":
+            url = self._open_tags_meta[0]
+            if url:
+                text = url  # metadata stored for this <a> replaces its inner text
+        # Every entity still being built spans this run of text, so grow them all.
+        for entity in self._building_entities.values():
+            assert hasattr(entity, "length")
+            entity.length += len(text)
+        # Append to the plain-text buffer that entity offsets are measured against.
+        self.text += text
+
+    def handle_endtag(self, tag: str) -> None:
+        try:
+            self._open_tags.popleft()  # pops the most recent tag without checking it matches ``tag``
+            self._open_tags_meta.popleft()
+        except IndexError:
+            pass  # closing tag with no open tags left: nothing to pop
+        entity = self._building_entities.pop(tag, None)  # None if no entity is being built for this tag
+        if entity and hasattr(entity, "length") and entity.length:  # entities covering no text are dropped
+            self.entities.append(entity)
+
+
+def parse(html: str) -> Tuple[str, List[MessageEntity]]:
+    """
+    Parse the given HTML message and return its whitespace-stripped plain
+    text plus the list of MessageEntity objects that were found.
+
+    :param html: the message with HTML to be parsed.
+    :return: a tuple consisting of (clean message, [message entities]).
+    """
+    if not html:
+        return html, []
+    # Parse with surrogate pairs added and remove them again before returning.
+    parser = HTMLToTelegramParser()
+    parser.feed(add_surrogate(html))
+    text = strip_text(parser.text, parser.entities)  # strip_text may also modify parser.entities
+    return del_surrogate(text), parser.entities
+
+
+ENTITY_TO_FORMATTER = {
+    MessageEntityBold: ("", ""),
+    MessageEntityItalic: ("", ""),
+    MessageEntityCode: ("", ""),
+    MessageEntityUnderline: ("", ""),
+    MessageEntityStrike: ("", ""),
+    MessageEntityBlockquote: ("
", "
"), + MessageEntitySpoiler: ("
", "
"), + MessageEntityPre: lambda e, _: ( + '
'.format(e.language) if e.language else "
",
+        "
" if e.language else "
", + ), + MessageEntityEmail: lambda _, t: (''.format(t), ""), + MessageEntityUrl: lambda _, t: (''.format(t), ""), + MessageEntityTextUrl: lambda e, _: (''.format(escape(e.url)), ""), + MessageEntityMentionName: lambda e, _: ( + ''.format(e.user_id), + "", + ), +} + + +def unparse(text: str, entities: Iterable[MessageEntity]) -> str: + """ + Performs the reverse operation to .parse(), effectively returning HTML + given a normal text and its MessageEntity's. + + :param text: the text to be reconverted into HTML. + :param entities: the MessageEntity's applied to the text. + :return: a HTML representation of the combination of both inputs. + """ + if not text: + return text + elif not entities: + return escape(text) + + text = add_surrogate(text) + insert_at: List[Tuple[int, str]] = [] + for entity in entities: + assert hasattr(entity, "offset") and hasattr(entity, "length") + s = entity.offset + e = entity.offset + entity.length + delimiter = ENTITY_TO_FORMATTER.get(type(entity), None) + if delimiter: + if callable(delimiter): + delim = delimiter(entity, text[s:e]) + else: + delim = delimiter + insert_at.append((s, delim[0])) + insert_at.append((e, delim[1])) + + insert_at.sort(key=lambda t: t[0]) + next_escape_bound = len(text) + while insert_at: + # Same logic as markdown.py + at, what = insert_at.pop() + while within_surrogate(text, at): + at += 1 + + text = ( + text[:at] + + what + + escape(text[at:next_escape_bound]) + + text[next_escape_bound:] + ) + next_escape_bound = at + + text = escape(text[:next_escape_bound]) + text[next_escape_bound:] + + return del_surrogate(text) diff --git a/client/src/telethon/_impl/client/parsers/markdown.py b/client/src/telethon/_impl/client/parsers/markdown.py new file mode 100644 index 00000000..4701d531 --- /dev/null +++ b/client/src/telethon/_impl/client/parsers/markdown.py @@ -0,0 +1,191 @@ +import re +from typing import Any, Iterator, List, Tuple + +import markdown_it +import markdown_it.token + +from ...tl.abcs import 
MessageEntity +from ...tl.types import ( + MessageEntityBlockquote, + MessageEntityBold, + MessageEntityCode, + MessageEntityItalic, + MessageEntityMentionName, + MessageEntityPre, + MessageEntityStrike, + MessageEntityTextUrl, + MessageEntityUnderline, +) +from .strings import add_surrogate, del_surrogate, within_surrogate + +MARKDOWN = markdown_it.MarkdownIt().enable("strikethrough") +DELIMITERS = { + MessageEntityBlockquote: ("> ", ""), + MessageEntityBold: ("**", "**"), + MessageEntityCode: ("`", "`"), + MessageEntityItalic: ("_", "_"), + MessageEntityStrike: ("~~", "~~"), + MessageEntityUnderline: ("# ", ""), +} + +# Not trying to be complete; just enough to have an alternative (mostly for inline underline). +# The fact headings are treated as underline is an implementation detail. +TAG_PATTERN = re.compile(r"<\s*(/?)\s*(\w+)") +HTML_TO_TYPE = { + "i": ("em_close", "em_open"), + "em": ("em_close", "em_open"), + "b": ("strong_close", "strong_open"), + "strong": ("strong_close", "strong_open"), + "s": ("s_close", "s_open"), + "del": ("s_close", "s_open"), + "u": ("heading_open", "heading_close"), + "mark": ("heading_open", "heading_close"), +} + + +def expand_inline_and_html( + tokens: List[markdown_it.token.Token], +) -> Iterator[markdown_it.token.Token]: + for token in tokens: + if token.type == "inline": + if token.children: + yield from expand_inline_and_html(token.children) + elif token.type == "html_inline": + match = TAG_PATTERN.match(token.content) + if match: + close, tag = match.groups() + tys = HTML_TO_TYPE.get(tag.lower()) + if tys: + token.type = tys[bool(close)] + token.nesting = -1 if close else 1 + yield token + else: + yield token + + +def parse(message: str) -> Tuple[str, List[MessageEntity]]: + """ + Parses the given markdown message and returns its stripped representation + plus a list of the MessageEntity's that were found. 
+ """ + if not message: + return message, [] + + entities: List[MessageEntity] + token: markdown_it.token.Token + + def push(ty: Any, **extra: object) -> None: + nonlocal message, entities, token + if token.nesting > 0: + entities.append(ty(offset=len(message), length=0, **extra)) + else: + for entity in reversed(entities): + if isinstance(entity, ty): + entity.length = len(message) - entity.offset + break + + parsed = MARKDOWN.parse(add_surrogate(message.strip())) + message = "" + entities = [] + last_map = [0, 0] + for token in expand_inline_and_html(parsed): + if token.map is not None and token.map != last_map: + # paragraphs, quotes fences have a line mapping. Use it to determine how many newlines to insert. + # But don't inssert any (leading) new lines if we're yet to reach the first textual content, or + # if the mappings are the same (e.g. a quote then opens a paragraph but the mapping is equal). + if message: + message += "\n" + "\n" * (token.map[0] - last_map[-1]) + last_map = token.map + + if token.type in ("blockquote_close", "blockquote_open"): + push(MessageEntityBlockquote) + elif token.type == "code_block": + entities.append( + MessageEntityPre( + offset=len(message), length=len(token.content), language="" + ) + ) + message += token.content + elif token.type == "code_inline": + entities.append( + MessageEntityCode(offset=len(message), length=len(token.content)) + ) + message += token.content + elif token.type in ("em_close", "em_open"): + push(MessageEntityItalic) + elif token.type == "fence": + entities.append( + MessageEntityPre( + offset=len(message), length=len(token.content), language=token.info + ) + ) + message += token.content[:-1] # remove a single trailing newline + elif token.type == "hardbreak": + message += "\n" + elif token.type in ("heading_close", "heading_open"): + push(MessageEntityUnderline) + elif token.type == "hr": + message += "\u2015\n\n" + elif token.type in ("link_close", "link_open"): + if ( + token.markup != "autolink" + 
): # telegram already picks up on these automatically + push(MessageEntityTextUrl, url=token.attrs.get("href")) + elif token.type in ("s_close", "s_open"): + push(MessageEntityStrike) + elif token.type == "softbreak": + message += " " + elif token.type in ("strong_close", "strong_open"): + push(MessageEntityBold) + elif token.type == "text": + message += token.content + + return del_surrogate(message), entities + + +def unparse(text: str, entities: List[MessageEntity]) -> str: + """ + Performs the reverse operation to .parse(), effectively returning + markdown-like syntax given a normal text and its MessageEntity's. + + Because there are many possible ways for markdown to produce a certain + output, this function cannot invert .parse() perfectly. + """ + if not text or not entities: + return text + + text = add_surrogate(text) + insert_at: List[Tuple[int, str]] = [] + for entity in entities: + assert hasattr(entity, "offset") + assert hasattr(entity, "length") + s = entity.offset + e = entity.offset + entity.length + delimiter = DELIMITERS.get(type(entity), None) + if delimiter: + insert_at.append((s, delimiter[0])) + insert_at.append((e, delimiter[1])) + elif isinstance(entity, MessageEntityPre): + insert_at.append((s, f"```{entity.language}\n")) + insert_at.append((e, "```\n")) + elif isinstance(entity, MessageEntityTextUrl): + insert_at.append((s, "[")) + insert_at.append((e, f"]({entity.url})")) + elif isinstance(entity, MessageEntityMentionName): + insert_at.append((s, "[")) + insert_at.append((e, f"](tg://user?id={entity.user_id})")) + + insert_at.sort(key=lambda t: t[0]) + while insert_at: + at, what = insert_at.pop() + + # If we are in the middle of a surrogate nudge the position by -1. + # Otherwise we would end up with malformed text and fail to encode. 
+ # For example of bad input: "Hi \ud83d\ude1c" + # https://en.wikipedia.org/wiki/UTF-16#U+010000_to_U+10FFFF + while within_surrogate(text, at): + at += 1 + + text = text[:at] + what + text[at:] + + return del_surrogate(text) diff --git a/client/src/telethon/_impl/client/parsers/strings.py b/client/src/telethon/_impl/client/parsers/strings.py new file mode 100644 index 00000000..1d7f8fb3 --- /dev/null +++ b/client/src/telethon/_impl/client/parsers/strings.py @@ -0,0 +1,75 @@ +import struct +from typing import List, Optional + +from ...tl.abcs import MessageEntity + + +def add_surrogate(text: str) -> str: + return "".join( + # SMP -> Surrogate Pairs (Telegram offsets are calculated with these). + # See https://en.wikipedia.org/wiki/Plane_(Unicode)#Overview for more. + "".join(chr(y) for y in struct.unpack(" str: + return text.encode("utf-16", "surrogatepass").decode("utf-16") + + +def within_surrogate(text: str, index: int, *, length: Optional[int] = None) -> bool: + """ + `True` if ``index`` is within a surrogate (before and after it, not at!). + """ + if length is None: + length = len(text) + + return ( + 1 < index < len(text) # in bounds + and "\ud800" <= text[index - 1] <= "\udfff" # previous is + and "\ud800" <= text[index] <= "\udfff" # current is + ) + + +def strip_text(text: str, entities: List[MessageEntity]) -> str: + """ + Strips whitespace from the given text modifying the provided entities. + + This assumes that there are no overlapping entities, that their length + is greater or equal to one, and that their length is not out of bounds. 
+ """ + if not entities: + return text.strip() + + while text and text[-1].isspace(): + e = entities[-1] + assert hasattr(e, "offset") and hasattr(e, "length") + if e.offset + e.length == len(text): + if e.length == 1: + del entities[-1] + if not entities: + return text.strip() + else: + e.length -= 1 + text = text[:-1] + + while text and text[0].isspace(): + for i in reversed(range(len(entities))): + e = entities[i] + assert hasattr(e, "offset") and hasattr(e, "length") + if e.offset != 0: + e.offset -= 1 + continue + + if e.length == 1: + del entities[0] + if not entities: + return text.lstrip() + else: + e.length -= 1 + + text = text[1:] + + return text diff --git a/client/tests/test_parsers.py b/client/tests/test_parsers.py new file mode 100644 index 00000000..4541306a --- /dev/null +++ b/client/tests/test_parsers.py @@ -0,0 +1,151 @@ +from telethon._impl.client.parsers import ( + generate_html_message, + generate_markdown_message, + parse_html_message, + parse_markdown_message, +) +from telethon._impl.tl import types + + +def test_parse_leading_markdown() -> None: + markdown = "**Hello** world!" + text, entities = parse_markdown_message(markdown) + assert text == "Hello world!" + assert entities == [types.MessageEntityBold(offset=0, length=5)] + + +def test_parse_trailing_markdown() -> None: + markdown = "Hello **world!**" + text, entities = parse_markdown_message(markdown) + assert text == "Hello world!" 
+ assert entities == [types.MessageEntityBold(offset=6, length=6)] + + +def test_parse_emoji_markdown() -> None: + markdown = "A **little 🦀** here" + text, entities = parse_markdown_message(markdown) + assert text == "A little 🦀 here" + assert entities == [types.MessageEntityBold(offset=2, length=9)] + + +def test_parse_all_entities_markdown() -> None: + markdown = "Some **bold** (__strong__), *italics* (_cursive_), inline `code`, a\n```rust\npre\n```\nblock, a [link](https://example.com), and [mentions](tg://user?id=12345678)" + text, entities = parse_markdown_message(markdown) + + assert ( + text + == "Some bold (strong), italics (cursive), inline code, a\npre\nblock, a link, and mentions" + ) + assert entities == [ + types.MessageEntityBold(offset=5, length=4), + types.MessageEntityBold(offset=11, length=6), + types.MessageEntityItalic(offset=20, length=7), + types.MessageEntityItalic(offset=29, length=7), + types.MessageEntityCode(offset=46, length=4), + types.MessageEntityPre(offset=54, length=4, language="rust"), + types.MessageEntityTextUrl(offset=67, length=4, url="https://example.com"), + types.MessageEntityTextUrl(offset=77, length=8, url="tg://user?id=12345678"), + ] + + +def test_parse_nested_entities_markdown() -> None: + # CommonMark won't allow the following="Some **bold _both** italics_" + markdown = "Some **bold _both_** _italics_" + text, entities = parse_markdown_message(markdown) + assert text == "Some bold both italics" + assert entities == [ + types.MessageEntityBold(offset=5, length=9), + types.MessageEntityItalic(offset=10, length=4), + types.MessageEntityItalic(offset=15, length=7), + ] + + +def test_parse_then_unparse_markdown() -> None: + markdown = "Some **bold 🤷🏽‍♀️**, _italics_, inline `🤷🏽‍♀️ code`, a\n\n```rust\npre\n```\nblock, a [link](https://example.com), and [mentions](tg://user?id=12345678)" + text, entities = parse_markdown_message(markdown) + generated = generate_markdown_message(text, entities) + assert generated == markdown 
+ + +def test_parse_leading_html() -> None: + # Intentionally use different casing to make sure that is handled well + html = "Hello world!" + text, entities = parse_html_message(html) + assert text == "Hello world!" + assert entities == [types.MessageEntityBold(offset=0, length=5)] + + +def test_parse_trailing_html() -> None: + html = "Hello world!" + text, entities = parse_html_message(html) + assert text == "Hello world!" + assert entities == [types.MessageEntityBold(offset=6, length=6)] + + +def test_parse_emoji_html() -> None: + html = "A little 🦀 here" + text, entities = parse_html_message(html) + assert text == "A little 🦀 here" + assert entities == [types.MessageEntityBold(offset=2, length=9)] + + +def test_parse_all_entities_html() -> None: + html = 'Some bold (strong), italics (cursive), inline code, a
pre
block, a link,
spoilers
and mentions' + text, entities = parse_html_message(html) + assert ( + text + == "Some bold (strong), italics (cursive), inline code, a pre block, a link, spoilers and mentions" + ) + assert entities == [ + types.MessageEntityBold(offset=5, length=4), + types.MessageEntityBold(offset=11, length=6), + types.MessageEntityItalic(offset=20, length=7), + types.MessageEntityItalic(offset=29, length=7), + types.MessageEntityCode(offset=46, length=4), + types.MessageEntityPre(offset=54, length=3, language=""), + types.MessageEntityTextUrl(offset=67, length=4, url="https://example.com"), + types.MessageEntitySpoiler(offset=73, length=8), + types.MessageEntityTextUrl(offset=86, length=8, url="tg://user?id=12345678"), + ] + + +def test_parse_pre_with_lang_html() -> None: + html = 'Some
pre
, normal and
rusty
code' + text, entities = parse_html_message(html) + assert text == "Some pre, normal and rusty code" + assert entities == [ + types.MessageEntityPre(offset=5, length=3, language=""), + types.MessageEntityCode(offset=10, length=6), + types.MessageEntityPre(offset=21, length=5, language="rust"), + ] + + +def test_parse_empty_pre_and_lang_html() -> None: + html = 'Some empty
 and code'
+    text, entities = parse_html_message(html)
+    assert text == "Some empty  and code"
+    assert entities == [types.MessageEntityCode(offset=16, length=4)]
+
+
+def test_parse_link_no_href_html() -> None:
+    html = "Some <a>empty link</a>, it does nothing"  # anchor without href must yield no entity
+    text, entities = parse_html_message(html)
+    assert text == "Some empty link, it does nothing"
+    assert entities == []
+
+
+def test_parse_nested_entities_html() -> None:
+    html = "Some <b>bold <i>both</b> italics</i>"  # overlapping tags closed out of order
+    text, entities = parse_html_message(html)
+    assert text == "Some bold both italics"
+    assert entities == [
+        types.MessageEntityBold(offset=5, length=9),
+        types.MessageEntityItalic(offset=10, length=12),
+    ]
+
+
+def test_parse_then_unparse_html() -> None:
+    html = 'Some bold, italics inline code, a 
pre
block
use rust;
, a link,
spoilers
and mentions' + text, entities = parse_html_message(html) + generated = generate_html_message(text, entities) + assert generated == html