mirror of
				https://github.com/LonamiWebs/Telethon.git
				synced 2025-11-04 09:57:29 +03:00 
			
		
		
		
	Update markdown parser to support nested entities
This commit is contained in:
		
							parent
							
								
									8b28f4ffbf
								
							
						
					
					
						commit
						4e80e21ba1
					
				| 
						 | 
					@ -4,6 +4,7 @@ for use within the library, which attempts to handle emojies correctly,
 | 
				
			||||||
since they seem to count as two characters and it's a bit strange.
 | 
					since they seem to count as two characters and it's a bit strange.
 | 
				
			||||||
"""
 | 
					"""
 | 
				
			||||||
import re
 | 
					import re
 | 
				
			||||||
 | 
					import warnings
 | 
				
			||||||
 | 
					
 | 
				
			||||||
from ..helpers import add_surrogate, del_surrogate, strip_text
 | 
					from ..helpers import add_surrogate, del_surrogate, strip_text
 | 
				
			||||||
from ..tl import TLObject
 | 
					from ..tl import TLObject
 | 
				
			||||||
| 
						 | 
					@ -25,6 +26,10 @@ DEFAULT_URL_RE = re.compile(r'\[([\S\s]+?)\]\((.+?)\)')
 | 
				
			||||||
DEFAULT_URL_FORMAT = '[{0}]({1})'
 | 
					DEFAULT_URL_FORMAT = '[{0}]({1})'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def overlap(a, b, x, y):
 | 
				
			||||||
 | 
					    return max(a, x) < min(b, y)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def parse(message, delimiters=None, url_re=None):
 | 
					def parse(message, delimiters=None, url_re=None):
 | 
				
			||||||
    """
 | 
					    """
 | 
				
			||||||
    Parses the given markdown message and returns its stripped representation
 | 
					    Parses the given markdown message and returns its stripped representation
 | 
				
			||||||
| 
						 | 
					@ -48,85 +53,80 @@ def parse(message, delimiters=None, url_re=None):
 | 
				
			||||||
            return message, []
 | 
					            return message, []
 | 
				
			||||||
        delimiters = DEFAULT_DELIMITERS
 | 
					        delimiters = DEFAULT_DELIMITERS
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # Build a regex to efficiently test all delimiters at once
 | 
				
			||||||
 | 
					    delim_re = re.compile('|'.join('({})'.format(re.escape(k)) for k in delimiters))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # Cannot use a for loop because we need to skip some indices
 | 
					    # Cannot use a for loop because we need to skip some indices
 | 
				
			||||||
    i = 0
 | 
					    i = 0
 | 
				
			||||||
    result = []
 | 
					    result = []
 | 
				
			||||||
    current = None
 | 
					 | 
				
			||||||
    end_delimiter = None
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # Work on byte level with the utf-16le encoding to get the offsets right.
 | 
					    # Work on byte level with the utf-16le encoding to get the offsets right.
 | 
				
			||||||
    # The offset will just be half the index we're at.
 | 
					    # The offset will just be half the index we're at.
 | 
				
			||||||
    message = add_surrogate(message)
 | 
					    message = add_surrogate(message)
 | 
				
			||||||
    while i < len(message):
 | 
					    while i < len(message):
 | 
				
			||||||
        if url_re and current is None:
 | 
					        m = delim_re.match(message, pos=i)
 | 
				
			||||||
            # If we're not inside a previous match since Telegram doesn't allow
 | 
					
 | 
				
			||||||
            # nested message entities, try matching the URL from the i'th pos.
 | 
					        # Did we find some delimiter here at `i`?
 | 
				
			||||||
            url_match = url_re.match(message, pos=i)
 | 
					        if m:
 | 
				
			||||||
            if url_match:
 | 
					            delim = next(filter(None, m.groups()))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            # +1 to avoid matching right after (e.g. "****")
 | 
				
			||||||
 | 
					            end = message.find(delim, i + len(delim) + 1)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            # Did we find the earliest closing tag?
 | 
				
			||||||
 | 
					            if end != -1:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                # Remove the delimiter from the string
 | 
				
			||||||
 | 
					                message = ''.join((
 | 
				
			||||||
 | 
					                        message[:i],
 | 
				
			||||||
 | 
					                        message[i + len(delim):end],
 | 
				
			||||||
 | 
					                        message[end + len(delim):]
 | 
				
			||||||
 | 
					                ))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                # Check other affected entities
 | 
				
			||||||
 | 
					                for ent in result:
 | 
				
			||||||
 | 
					                    # If the end is after our start, it is affected
 | 
				
			||||||
 | 
					                    if ent.offset + ent.length > i:
 | 
				
			||||||
 | 
					                        ent.length -= len(delim)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                # Append the found entity
 | 
				
			||||||
 | 
					                ent = delimiters[delim]
 | 
				
			||||||
 | 
					                if ent == MessageEntityPre:
 | 
				
			||||||
 | 
					                    result.append(ent(i, end - i - len(delim), ''))  # has 'lang'
 | 
				
			||||||
 | 
					                else:
 | 
				
			||||||
 | 
					                    result.append(ent(i, end - i - len(delim)))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                # No nested entities inside code blocks
 | 
				
			||||||
 | 
					                if ent in (MessageEntityCode, MessageEntityPre):
 | 
				
			||||||
 | 
					                    i = end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                continue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        elif url_re:
 | 
				
			||||||
 | 
					            m = url_re.match(message, pos=i)
 | 
				
			||||||
 | 
					            if m:
 | 
				
			||||||
                # Replace the whole match with only the inline URL text.
 | 
					                # Replace the whole match with only the inline URL text.
 | 
				
			||||||
                message = ''.join((
 | 
					                message = ''.join((
 | 
				
			||||||
                    message[:url_match.start()],
 | 
					                    message[:m.start()],
 | 
				
			||||||
                    url_match.group(1),
 | 
					                    m.group(1),
 | 
				
			||||||
                    message[url_match.end():]
 | 
					                    message[m.end():]
 | 
				
			||||||
                ))
 | 
					                ))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                delim_size = m.end() - m.start() - len(m.group())
 | 
				
			||||||
 | 
					                for ent in result:
 | 
				
			||||||
 | 
					                    # If the end is after our start, it is affected
 | 
				
			||||||
 | 
					                    if ent.offset + ent.length > m.start():
 | 
				
			||||||
 | 
					                        ent.length -= delim_size
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                result.append(MessageEntityTextUrl(
 | 
					                result.append(MessageEntityTextUrl(
 | 
				
			||||||
                    offset=url_match.start(), length=len(url_match.group(1)),
 | 
					                    offset=m.start(), length=len(m.group(1)),
 | 
				
			||||||
                    url=del_surrogate(url_match.group(2))
 | 
					                    url=del_surrogate(m.group(2))
 | 
				
			||||||
                ))
 | 
					                ))
 | 
				
			||||||
                i += len(url_match.group(1))
 | 
					                i += len(m.group(1))
 | 
				
			||||||
                # Next loop iteration, don't check delimiters, since
 | 
					 | 
				
			||||||
                # a new inline URL might be right after this one.
 | 
					 | 
				
			||||||
                continue
 | 
					                continue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if end_delimiter is None:
 | 
					 | 
				
			||||||
            # We're not expecting any delimiter, so check them all
 | 
					 | 
				
			||||||
            for d, m in delimiters.items():
 | 
					 | 
				
			||||||
                # Slice the string at the current i'th position to see if
 | 
					 | 
				
			||||||
                # it matches the current delimiter d, otherwise skip it.
 | 
					 | 
				
			||||||
                if message[i:i + len(d)] != d:
 | 
					 | 
				
			||||||
                    continue
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                if message[i + len(d):i + 2 * len(d)] == d:
 | 
					 | 
				
			||||||
                    # The same delimiter can't be right afterwards, if
 | 
					 | 
				
			||||||
                    # this were the case we would match empty strings
 | 
					 | 
				
			||||||
                    # like `` which we don't want to.
 | 
					 | 
				
			||||||
                    continue
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                # Get rid of the delimiter by slicing it away
 | 
					 | 
				
			||||||
                message = message[:i] + message[i + len(d):]
 | 
					 | 
				
			||||||
                if m == MessageEntityPre:
 | 
					 | 
				
			||||||
                    # Special case, also has 'lang'
 | 
					 | 
				
			||||||
                    current = m(i, None, '')
 | 
					 | 
				
			||||||
                else:
 | 
					 | 
				
			||||||
                    current = m(i, None)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                end_delimiter = d  # We expect the same delimiter.
 | 
					 | 
				
			||||||
                break
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        elif message[i:i + len(end_delimiter)] == end_delimiter:
 | 
					 | 
				
			||||||
            message = message[:i] + message[i + len(end_delimiter):]
 | 
					 | 
				
			||||||
            current.length = i - current.offset
 | 
					 | 
				
			||||||
            result.append(current)
 | 
					 | 
				
			||||||
            current, end_delimiter = None, None
 | 
					 | 
				
			||||||
            # Don't increment i here as we matched a delimiter,
 | 
					 | 
				
			||||||
            # and there may be a new one right after. This is
 | 
					 | 
				
			||||||
            # different than when encountering the first delimiter,
 | 
					 | 
				
			||||||
            # as we already know there won't be the same right after.
 | 
					 | 
				
			||||||
            continue
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        # Next iteration
 | 
					 | 
				
			||||||
        i += 1
 | 
					        i += 1
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # We may have found some a delimiter but not its ending pair.
 | 
					 | 
				
			||||||
    # If this is the case, we want to insert the delimiter character back.
 | 
					 | 
				
			||||||
    if current is not None:
 | 
					 | 
				
			||||||
        message = (
 | 
					 | 
				
			||||||
            message[:current.offset]
 | 
					 | 
				
			||||||
            + end_delimiter
 | 
					 | 
				
			||||||
            + message[current.offset:]
 | 
					 | 
				
			||||||
        )
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    message = strip_text(message, result)
 | 
					    message = strip_text(message, result)
 | 
				
			||||||
    return del_surrogate(message), result
 | 
					    return del_surrogate(message), result
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -148,55 +148,35 @@ def unparse(text, entities, delimiters=None, url_fmt=None):
 | 
				
			||||||
            return text
 | 
					            return text
 | 
				
			||||||
        delimiters = DEFAULT_DELIMITERS
 | 
					        delimiters = DEFAULT_DELIMITERS
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    if url_fmt is None:
 | 
					    if url_fmt is not None:
 | 
				
			||||||
        url_fmt = DEFAULT_URL_FORMAT
 | 
					        warnings.warn('url_fmt is deprecated')  # since it complicates everything *a lot*
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    if isinstance(entities, TLObject):
 | 
					    if isinstance(entities, TLObject):
 | 
				
			||||||
        entities = (entities,)
 | 
					        entities = (entities,)
 | 
				
			||||||
    else:
 | 
					 | 
				
			||||||
        entities = tuple(sorted(entities, key=lambda e: e.offset, reverse=True))
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    text = add_surrogate(text)
 | 
					    text = add_surrogate(text)
 | 
				
			||||||
    delimiters = {v: k for k, v in delimiters.items()}
 | 
					    delimiters = {v: k for k, v in delimiters.items()}
 | 
				
			||||||
 | 
					    insert_at = []
 | 
				
			||||||
    for entity in entities:
 | 
					    for entity in entities:
 | 
				
			||||||
        s = entity.offset
 | 
					        s = entity.offset
 | 
				
			||||||
        e = entity.offset + entity.length
 | 
					        e = entity.offset + entity.length
 | 
				
			||||||
        delimiter = delimiters.get(type(entity), None)
 | 
					        delimiter = delimiters.get(type(entity), None)
 | 
				
			||||||
        if delimiter:
 | 
					        if delimiter:
 | 
				
			||||||
            text = text[:s] + delimiter + text[s:e] + delimiter + text[e:]
 | 
					            insert_at.append((s, delimiter))
 | 
				
			||||||
        elif url_fmt:
 | 
					            insert_at.append((e, delimiter))
 | 
				
			||||||
 | 
					        else:
 | 
				
			||||||
            url = None
 | 
					            url = None
 | 
				
			||||||
            if isinstance(entity, MessageEntityTextUrl):
 | 
					            if isinstance(entity, MessageEntityTextUrl):
 | 
				
			||||||
                url = entity.url
 | 
					                url = entity.url
 | 
				
			||||||
            elif isinstance(entity, MessageEntityMentionName):
 | 
					            elif isinstance(entity, MessageEntityMentionName):
 | 
				
			||||||
                url = 'tg://user?id={}'.format(entity.user_id)
 | 
					                url = 'tg://user?id={}'.format(entity.user_id)
 | 
				
			||||||
            if url:
 | 
					            if url:
 | 
				
			||||||
                # It's possible that entities are malformed and end up in the
 | 
					                insert_at.append((s, '['))
 | 
				
			||||||
                # middle of some character, like emoji, by using malformed
 | 
					                insert_at.append((e, ']({})'.format(url)))
 | 
				
			||||||
                # clients or bots. Try decoding the current one to check if
 | 
					 | 
				
			||||||
                # this is the case, and if it is, advance the entity.
 | 
					 | 
				
			||||||
                while e <= len(text):
 | 
					 | 
				
			||||||
                    try:
 | 
					 | 
				
			||||||
                        del_surrogate(text[s:e])
 | 
					 | 
				
			||||||
                        break
 | 
					 | 
				
			||||||
                    except UnicodeDecodeError:
 | 
					 | 
				
			||||||
                        e += 1
 | 
					 | 
				
			||||||
                else:
 | 
					 | 
				
			||||||
                    # Out of bounds, no luck going forward
 | 
					 | 
				
			||||||
                    while e > s:
 | 
					 | 
				
			||||||
                        try:
 | 
					 | 
				
			||||||
                            del_surrogate(text[s:e])
 | 
					 | 
				
			||||||
                            break
 | 
					 | 
				
			||||||
                        except UnicodeDecodeError:
 | 
					 | 
				
			||||||
                            e -= 1
 | 
					 | 
				
			||||||
                    else:
 | 
					 | 
				
			||||||
                        # No luck going backwards either, ignore entity
 | 
					 | 
				
			||||||
                        continue
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
                text = (
 | 
					    insert_at.sort(key=lambda t: t[0])
 | 
				
			||||||
                    text[:s] +
 | 
					    while insert_at:
 | 
				
			||||||
                    add_surrogate(url_fmt.format(text[s:e], url)) +
 | 
					        at, what = insert_at.pop()
 | 
				
			||||||
                    text[e:]
 | 
					        text = text[:at] + what + text[at:]
 | 
				
			||||||
                )
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    return del_surrogate(text)
 | 
					    return del_surrogate(text)
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue
	
	Block a user