Bring client parsers for commonmark and HTML

This commit is contained in:
Lonami Exo 2023-08-31 23:05:49 +02:00
parent 69d7941852
commit 332215ea2e
7 changed files with 636 additions and 0 deletions

View File

@ -21,6 +21,7 @@ classifiers = [
dependencies = [
"pyaes~=1.6",
"rsa~=4.9",
"markdown-it-py~=3.0",
]
dynamic = ["version"]

View File

@ -0,0 +1,11 @@
from .html import parse as parse_html_message
from .html import unparse as generate_html_message
from .markdown import parse as parse_markdown_message
from .markdown import unparse as generate_markdown_message
__all__ = [
"generate_html_message",
"parse_html_message",
"generate_markdown_message",
"parse_markdown_message",
]

View File

@ -0,0 +1,207 @@
from collections import deque
from html import escape
from html.parser import HTMLParser
from typing import Any, Deque, Dict, Iterable, List, Optional, Tuple, Type, cast
from ...tl.abcs import MessageEntity
from ...tl.types import (
MessageEntityBlockquote,
MessageEntityBold,
MessageEntityCode,
MessageEntityEmail,
MessageEntityItalic,
MessageEntityMentionName,
MessageEntityPre,
MessageEntitySpoiler,
MessageEntityStrike,
MessageEntityTextUrl,
MessageEntityUnderline,
MessageEntityUrl,
)
from .strings import add_surrogate, del_surrogate, strip_text, within_surrogate
class HTMLToTelegramParser(HTMLParser):
def __init__(self) -> None:
super().__init__()
self.text = ""
self.entities: List[MessageEntity] = []
self._building_entities: Dict[str, MessageEntity] = {}
self._open_tags: Deque[str] = deque()
self._open_tags_meta: Deque[Optional[str]] = deque()
def handle_starttag(
self, tag: str, attrs_seq: List[Tuple[str, Optional[str]]]
) -> None:
self._open_tags.appendleft(tag)
self._open_tags_meta.appendleft(None)
attrs = dict(attrs_seq)
EntityType: Optional[Type[MessageEntity]] = None
args = {}
if tag == "strong" or tag == "b":
EntityType = MessageEntityBold
elif tag == "em" or tag == "i":
EntityType = MessageEntityItalic
elif tag == "u":
EntityType = MessageEntityUnderline
elif tag == "del" or tag == "s":
EntityType = MessageEntityStrike
elif tag == "blockquote":
EntityType = MessageEntityBlockquote
elif tag == "details":
EntityType = MessageEntitySpoiler
elif tag == "code":
try:
# If we're in the middle of a <pre> tag, this <code> tag is
# probably intended for syntax highlighting.
#
# Syntax highlighting is set with
# <code class='language-...'>codeblock</code>
# inside <pre> tags
pre = self._building_entities["pre"]
assert isinstance(pre, MessageEntityPre)
if cls := attrs.get("class"):
pre.language = cls[len("language-") :]
except KeyError:
EntityType = MessageEntityCode
elif tag == "pre":
EntityType = MessageEntityPre
args["language"] = ""
elif tag == "a":
url = attrs.get("href")
if not url:
return
if url.startswith("mailto:"):
url = url[len("mailto:") :]
EntityType = MessageEntityEmail
else:
if self.get_starttag_text() == url:
EntityType = MessageEntityUrl
else:
EntityType = MessageEntityTextUrl
args["url"] = del_surrogate(url)
url = None
self._open_tags_meta.popleft()
self._open_tags_meta.appendleft(url)
if EntityType and tag not in self._building_entities:
Et = cast(Any, EntityType)
self._building_entities[tag] = Et(
offset=len(self.text),
# The length will be determined when closing the tag.
length=0,
**args,
)
def handle_data(self, text: str) -> None:
previous_tag = self._open_tags[0] if len(self._open_tags) > 0 else ""
if previous_tag == "a":
url = self._open_tags_meta[0]
if url:
text = url
for entity in self._building_entities.values():
assert hasattr(entity, "length")
entity.length += len(text)
self.text += text
def handle_endtag(self, tag: str) -> None:
try:
self._open_tags.popleft()
self._open_tags_meta.popleft()
except IndexError:
pass
entity = self._building_entities.pop(tag, None)
if entity and hasattr(entity, "length") and entity.length:
self.entities.append(entity)
def parse(html: str) -> Tuple[str, List[MessageEntity]]:
"""
Parses the given HTML message and returns its stripped representation
plus a list of the MessageEntity's that were found.
:param html: the message with HTML to be parsed.
:return: a tuple consisting of (clean message, [message entities]).
"""
if not html:
return html, []
parser = HTMLToTelegramParser()
parser.feed(add_surrogate(html))
text = strip_text(parser.text, parser.entities)
return del_surrogate(text), parser.entities
ENTITY_TO_FORMATTER = {
MessageEntityBold: ("<strong>", "</strong>"),
MessageEntityItalic: ("<em>", "</em>"),
MessageEntityCode: ("<code>", "</code>"),
MessageEntityUnderline: ("<u>", "</u>"),
MessageEntityStrike: ("<del>", "</del>"),
MessageEntityBlockquote: ("<blockquote>", "</blockquote>"),
MessageEntitySpoiler: ("<details>", "</details>"),
MessageEntityPre: lambda e, _: (
'<pre><code class="language-{}">'.format(e.language) if e.language else "<pre>",
"</code></pre>" if e.language else "</pre>",
),
MessageEntityEmail: lambda _, t: ('<a href="mailto:{}">'.format(t), "</a>"),
MessageEntityUrl: lambda _, t: ('<a href="{}">'.format(t), "</a>"),
MessageEntityTextUrl: lambda e, _: ('<a href="{}">'.format(escape(e.url)), "</a>"),
MessageEntityMentionName: lambda e, _: (
'<a href="tg://user?id={}">'.format(e.user_id),
"</a>",
),
}
def unparse(text: str, entities: Iterable[MessageEntity]) -> str:
"""
Performs the reverse operation to .parse(), effectively returning HTML
given a normal text and its MessageEntity's.
:param text: the text to be reconverted into HTML.
:param entities: the MessageEntity's applied to the text.
:return: a HTML representation of the combination of both inputs.
"""
if not text:
return text
elif not entities:
return escape(text)
text = add_surrogate(text)
insert_at: List[Tuple[int, str]] = []
for entity in entities:
assert hasattr(entity, "offset") and hasattr(entity, "length")
s = entity.offset
e = entity.offset + entity.length
delimiter = ENTITY_TO_FORMATTER.get(type(entity), None)
if delimiter:
if callable(delimiter):
delim = delimiter(entity, text[s:e])
else:
delim = delimiter
insert_at.append((s, delim[0]))
insert_at.append((e, delim[1]))
insert_at.sort(key=lambda t: t[0])
next_escape_bound = len(text)
while insert_at:
# Same logic as markdown.py
at, what = insert_at.pop()
while within_surrogate(text, at):
at += 1
text = (
text[:at]
+ what
+ escape(text[at:next_escape_bound])
+ text[next_escape_bound:]
)
next_escape_bound = at
text = escape(text[:next_escape_bound]) + text[next_escape_bound:]
return del_surrogate(text)

View File

@ -0,0 +1,191 @@
import re
from typing import Any, Iterator, List, Tuple
import markdown_it
import markdown_it.token
from ...tl.abcs import MessageEntity
from ...tl.types import (
MessageEntityBlockquote,
MessageEntityBold,
MessageEntityCode,
MessageEntityItalic,
MessageEntityMentionName,
MessageEntityPre,
MessageEntityStrike,
MessageEntityTextUrl,
MessageEntityUnderline,
)
from .strings import add_surrogate, del_surrogate, within_surrogate
MARKDOWN = markdown_it.MarkdownIt().enable("strikethrough")
DELIMITERS = {
MessageEntityBlockquote: ("> ", ""),
MessageEntityBold: ("**", "**"),
MessageEntityCode: ("`", "`"),
MessageEntityItalic: ("_", "_"),
MessageEntityStrike: ("~~", "~~"),
MessageEntityUnderline: ("# ", ""),
}
# Not trying to be complete; just enough to have an alternative (mostly for inline underline).
# The fact headings are treated as underline is an implementation detail.
TAG_PATTERN = re.compile(r"<\s*(/?)\s*(\w+)")
HTML_TO_TYPE = {
"i": ("em_close", "em_open"),
"em": ("em_close", "em_open"),
"b": ("strong_close", "strong_open"),
"strong": ("strong_close", "strong_open"),
"s": ("s_close", "s_open"),
"del": ("s_close", "s_open"),
"u": ("heading_open", "heading_close"),
"mark": ("heading_open", "heading_close"),
}
def expand_inline_and_html(
tokens: List[markdown_it.token.Token],
) -> Iterator[markdown_it.token.Token]:
for token in tokens:
if token.type == "inline":
if token.children:
yield from expand_inline_and_html(token.children)
elif token.type == "html_inline":
match = TAG_PATTERN.match(token.content)
if match:
close, tag = match.groups()
tys = HTML_TO_TYPE.get(tag.lower())
if tys:
token.type = tys[bool(close)]
token.nesting = -1 if close else 1
yield token
else:
yield token
def parse(message: str) -> Tuple[str, List[MessageEntity]]:
"""
Parses the given markdown message and returns its stripped representation
plus a list of the MessageEntity's that were found.
"""
if not message:
return message, []
entities: List[MessageEntity]
token: markdown_it.token.Token
def push(ty: Any, **extra: object) -> None:
nonlocal message, entities, token
if token.nesting > 0:
entities.append(ty(offset=len(message), length=0, **extra))
else:
for entity in reversed(entities):
if isinstance(entity, ty):
entity.length = len(message) - entity.offset
break
parsed = MARKDOWN.parse(add_surrogate(message.strip()))
message = ""
entities = []
last_map = [0, 0]
for token in expand_inline_and_html(parsed):
if token.map is not None and token.map != last_map:
# paragraphs, quotes fences have a line mapping. Use it to determine how many newlines to insert.
# But don't inssert any (leading) new lines if we're yet to reach the first textual content, or
# if the mappings are the same (e.g. a quote then opens a paragraph but the mapping is equal).
if message:
message += "\n" + "\n" * (token.map[0] - last_map[-1])
last_map = token.map
if token.type in ("blockquote_close", "blockquote_open"):
push(MessageEntityBlockquote)
elif token.type == "code_block":
entities.append(
MessageEntityPre(
offset=len(message), length=len(token.content), language=""
)
)
message += token.content
elif token.type == "code_inline":
entities.append(
MessageEntityCode(offset=len(message), length=len(token.content))
)
message += token.content
elif token.type in ("em_close", "em_open"):
push(MessageEntityItalic)
elif token.type == "fence":
entities.append(
MessageEntityPre(
offset=len(message), length=len(token.content), language=token.info
)
)
message += token.content[:-1] # remove a single trailing newline
elif token.type == "hardbreak":
message += "\n"
elif token.type in ("heading_close", "heading_open"):
push(MessageEntityUnderline)
elif token.type == "hr":
message += "\u2015\n\n"
elif token.type in ("link_close", "link_open"):
if (
token.markup != "autolink"
): # telegram already picks up on these automatically
push(MessageEntityTextUrl, url=token.attrs.get("href"))
elif token.type in ("s_close", "s_open"):
push(MessageEntityStrike)
elif token.type == "softbreak":
message += " "
elif token.type in ("strong_close", "strong_open"):
push(MessageEntityBold)
elif token.type == "text":
message += token.content
return del_surrogate(message), entities
def unparse(text: str, entities: List[MessageEntity]) -> str:
"""
Performs the reverse operation to .parse(), effectively returning
markdown-like syntax given a normal text and its MessageEntity's.
Because there are many possible ways for markdown to produce a certain
output, this function cannot invert .parse() perfectly.
"""
if not text or not entities:
return text
text = add_surrogate(text)
insert_at: List[Tuple[int, str]] = []
for entity in entities:
assert hasattr(entity, "offset")
assert hasattr(entity, "length")
s = entity.offset
e = entity.offset + entity.length
delimiter = DELIMITERS.get(type(entity), None)
if delimiter:
insert_at.append((s, delimiter[0]))
insert_at.append((e, delimiter[1]))
elif isinstance(entity, MessageEntityPre):
insert_at.append((s, f"```{entity.language}\n"))
insert_at.append((e, "```\n"))
elif isinstance(entity, MessageEntityTextUrl):
insert_at.append((s, "["))
insert_at.append((e, f"]({entity.url})"))
elif isinstance(entity, MessageEntityMentionName):
insert_at.append((s, "["))
insert_at.append((e, f"](tg://user?id={entity.user_id})"))
insert_at.sort(key=lambda t: t[0])
while insert_at:
at, what = insert_at.pop()
# If we are in the middle of a surrogate nudge the position by -1.
# Otherwise we would end up with malformed text and fail to encode.
# For example of bad input: "Hi \ud83d\ude1c"
# https://en.wikipedia.org/wiki/UTF-16#U+010000_to_U+10FFFF
while within_surrogate(text, at):
at += 1
text = text[:at] + what + text[at:]
return del_surrogate(text)

View File

@ -0,0 +1,75 @@
import struct
from typing import List, Optional
from ...tl.abcs import MessageEntity
def add_surrogate(text: str) -> str:
return "".join(
# SMP -> Surrogate Pairs (Telegram offsets are calculated with these).
# See https://en.wikipedia.org/wiki/Plane_(Unicode)#Overview for more.
"".join(chr(y) for y in struct.unpack("<HH", x.encode("utf-16le")))
if (0x10000 <= ord(x) <= 0x10FFFF)
else x
for x in text
)
def del_surrogate(text: str) -> str:
return text.encode("utf-16", "surrogatepass").decode("utf-16")
def within_surrogate(text: str, index: int, *, length: Optional[int] = None) -> bool:
"""
`True` if ``index`` is within a surrogate (before and after it, not at!).
"""
if length is None:
length = len(text)
return (
1 < index < len(text) # in bounds
and "\ud800" <= text[index - 1] <= "\udfff" # previous is
and "\ud800" <= text[index] <= "\udfff" # current is
)
def strip_text(text: str, entities: List[MessageEntity]) -> str:
"""
Strips whitespace from the given text modifying the provided entities.
This assumes that there are no overlapping entities, that their length
is greater or equal to one, and that their length is not out of bounds.
"""
if not entities:
return text.strip()
while text and text[-1].isspace():
e = entities[-1]
assert hasattr(e, "offset") and hasattr(e, "length")
if e.offset + e.length == len(text):
if e.length == 1:
del entities[-1]
if not entities:
return text.strip()
else:
e.length -= 1
text = text[:-1]
while text and text[0].isspace():
for i in reversed(range(len(entities))):
e = entities[i]
assert hasattr(e, "offset") and hasattr(e, "length")
if e.offset != 0:
e.offset -= 1
continue
if e.length == 1:
del entities[0]
if not entities:
return text.lstrip()
else:
e.length -= 1
text = text[1:]
return text

View File

@ -0,0 +1,151 @@
from telethon._impl.client.parsers import (
generate_html_message,
generate_markdown_message,
parse_html_message,
parse_markdown_message,
)
from telethon._impl.tl import types
def test_parse_leading_markdown() -> None:
markdown = "**Hello** world!"
text, entities = parse_markdown_message(markdown)
assert text == "Hello world!"
assert entities == [types.MessageEntityBold(offset=0, length=5)]
def test_parse_trailing_markdown() -> None:
markdown = "Hello **world!**"
text, entities = parse_markdown_message(markdown)
assert text == "Hello world!"
assert entities == [types.MessageEntityBold(offset=6, length=6)]
def test_parse_emoji_markdown() -> None:
markdown = "A **little 🦀** here"
text, entities = parse_markdown_message(markdown)
assert text == "A little 🦀 here"
assert entities == [types.MessageEntityBold(offset=2, length=9)]
def test_parse_all_entities_markdown() -> None:
markdown = "Some **bold** (__strong__), *italics* (_cursive_), inline `code`, a\n```rust\npre\n```\nblock, a [link](https://example.com), and [mentions](tg://user?id=12345678)"
text, entities = parse_markdown_message(markdown)
assert (
text
== "Some bold (strong), italics (cursive), inline code, a\npre\nblock, a link, and mentions"
)
assert entities == [
types.MessageEntityBold(offset=5, length=4),
types.MessageEntityBold(offset=11, length=6),
types.MessageEntityItalic(offset=20, length=7),
types.MessageEntityItalic(offset=29, length=7),
types.MessageEntityCode(offset=46, length=4),
types.MessageEntityPre(offset=54, length=4, language="rust"),
types.MessageEntityTextUrl(offset=67, length=4, url="https://example.com"),
types.MessageEntityTextUrl(offset=77, length=8, url="tg://user?id=12345678"),
]
def test_parse_nested_entities_markdown() -> None:
# CommonMark won't allow the following="Some **bold _both** italics_"
markdown = "Some **bold _both_** _italics_"
text, entities = parse_markdown_message(markdown)
assert text == "Some bold both italics"
assert entities == [
types.MessageEntityBold(offset=5, length=9),
types.MessageEntityItalic(offset=10, length=4),
types.MessageEntityItalic(offset=15, length=7),
]
def test_parse_then_unparse_markdown() -> None:
markdown = "Some **bold 🤷🏽‍♀️**, _italics_, inline `🤷🏽‍♀️ code`, a\n\n```rust\npre\n```\nblock, a [link](https://example.com), and [mentions](tg://user?id=12345678)"
text, entities = parse_markdown_message(markdown)
generated = generate_markdown_message(text, entities)
assert generated == markdown
def test_parse_leading_html() -> None:
# Intentionally use different casing to make sure that is handled well
html = "<B>Hello</b> world!"
text, entities = parse_html_message(html)
assert text == "Hello world!"
assert entities == [types.MessageEntityBold(offset=0, length=5)]
def test_parse_trailing_html() -> None:
html = "Hello <strong>world!</strong>"
text, entities = parse_html_message(html)
assert text == "Hello world!"
assert entities == [types.MessageEntityBold(offset=6, length=6)]
def test_parse_emoji_html() -> None:
html = "A <b>little 🦀</b> here"
text, entities = parse_html_message(html)
assert text == "A little 🦀 here"
assert entities == [types.MessageEntityBold(offset=2, length=9)]
def test_parse_all_entities_html() -> None:
html = 'Some <b>bold</b> (<strong>strong</strong>), <i>italics</i> (<em>cursive</em>), inline <code>code</code>, a <pre>pre</pre> block, a <a href="https://example.com">link</a>, <details>spoilers</details> and <a href="tg://user?id=12345678">mentions</a>'
text, entities = parse_html_message(html)
assert (
text
== "Some bold (strong), italics (cursive), inline code, a pre block, a link, spoilers and mentions"
)
assert entities == [
types.MessageEntityBold(offset=5, length=4),
types.MessageEntityBold(offset=11, length=6),
types.MessageEntityItalic(offset=20, length=7),
types.MessageEntityItalic(offset=29, length=7),
types.MessageEntityCode(offset=46, length=4),
types.MessageEntityPre(offset=54, length=3, language=""),
types.MessageEntityTextUrl(offset=67, length=4, url="https://example.com"),
types.MessageEntitySpoiler(offset=73, length=8),
types.MessageEntityTextUrl(offset=86, length=8, url="tg://user?id=12345678"),
]
def test_parse_pre_with_lang_html() -> None:
html = 'Some <pre>pre</pre>, <code>normal</code> and <pre><code class="language-rust">rusty</code></pre> code'
text, entities = parse_html_message(html)
assert text == "Some pre, normal and rusty code"
assert entities == [
types.MessageEntityPre(offset=5, length=3, language=""),
types.MessageEntityCode(offset=10, length=6),
types.MessageEntityPre(offset=21, length=5, language="rust"),
]
def test_parse_empty_pre_and_lang_html() -> None:
html = 'Some empty <pre></pre> and <code class="language-rust">code</code>'
text, entities = parse_html_message(html)
assert text == "Some empty and code"
assert entities == [types.MessageEntityCode(offset=16, length=4)]
def test_parse_link_no_href_html() -> None:
html = "Some <a>empty link</a>, it does nothing"
text, entities = parse_html_message(html)
assert text == "Some empty link, it does nothing"
assert entities == []
def test_parse_nested_entities_html() -> None:
html = "Some <b>bold <i>both</b> italics</i>"
text, entities = parse_html_message(html)
assert text == "Some bold both italics"
assert entities == [
types.MessageEntityBold(offset=5, length=9),
types.MessageEntityItalic(offset=10, length=12),
]
def test_parse_then_unparse_html() -> None:
html = 'Some <strong>bold</strong>, <em>italics</em> inline <code>code</code>, a <pre>pre</pre> block <pre><code class="language-rs">use rust;</code></pre>, a <a href="https://example.com">link</a>, <details>spoilers</details> and <a href="tg://user?id=12345678">mentions</a>'
text, entities = parse_html_message(html)
generated = generate_html_message(text, entities)
assert generated == html