mirror of
				https://github.com/LonamiWebs/Telethon.git
				synced 2025-11-04 18:07:35 +03:00 
			
		
		
		
	Furthermore, utils needs to access the message by reference through types.Message, because it is patched and replaced.
		
			
				
	
	
		
			165 lines
		
	
	
		
			4.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			165 lines
		
	
	
		
			4.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import abc
 | 
						|
import struct
 | 
						|
from datetime import datetime, date, timedelta
 | 
						|
 | 
						|
 | 
						|
class TLObject:
    """
    Base class providing formatting, comparison and serialization helpers.

    `to_dict`, `__bytes__` and `from_reader` raise `NotImplementedError`
    here, so they only do something useful once overridden.
    """
    # Both identifiers are None on this base class.
    # NOTE(review): presumably concrete subclasses assign real IDs - confirm.
    CONSTRUCTOR_ID = None
    SUBCLASS_OF_ID = None

    @staticmethod
    def pretty_format(obj, indent=None):
        """
        Pretty formats the given object as a string which is returned.

        `TLObject` instances are converted through `to_dict()` first.
        Dictionaries are rendered as ``Name(key=value, ...)`` where the
        name comes from the ``'_'`` key (``'dict'`` if missing), `str`
        and `bytes` use `repr`, any other iterable is rendered as a
        list, and everything else falls back to `repr`.

        If indent is None, a single line will be returned. Otherwise
        `indent` is the current nesting depth (an `int`) and the result
        spans several lines, using one tab per nesting level.
        """
        if isinstance(obj, TLObject):
            obj = obj.to_dict()

        if indent is None:
            if isinstance(obj, dict):
                return '{}({})'.format(obj.get('_', 'dict'), ', '.join(
                    '{}={}'.format(k, TLObject.pretty_format(v))
                    for k, v in obj.items() if k != '_'
                ))
            if isinstance(obj, (str, bytes)):
                return repr(obj)
            if hasattr(obj, '__iter__'):
                return '[{}]'.format(
                    ', '.join(TLObject.pretty_format(x) for x in obj)
                )
            return repr(obj)

        result = []
        if isinstance(obj, dict):
            result.append(obj.get('_', 'dict'))
            result.append('(')
            # The '_' key only carries the name. Deciding on the *other*
            # keys makes an object without fields render as 'Name()'
            # instead of 'Name(' + newline + ')'.
            fields = [(k, v) for k, v in obj.items() if k != '_']
            if fields:
                result.append('\n')
                indent += 1
                for k, v in fields:
                    result.append('\t' * indent)
                    result.append(k)
                    result.append('=')
                    result.append(TLObject.pretty_format(v, indent))
                    result.append(',\n')
                result.pop()  # last ',\n'
                indent -= 1
                result.append('\n')
                result.append('\t' * indent)
            result.append(')')

        elif isinstance(obj, (str, bytes)):
            result.append(repr(obj))

        elif hasattr(obj, '__iter__'):
            # Unlike dictionaries, every item keeps its trailing comma.
            result.append('[\n')
            indent += 1
            for x in obj:
                result.append('\t' * indent)
                result.append(TLObject.pretty_format(x, indent))
                result.append(',\n')
            indent -= 1
            result.append('\t' * indent)
            result.append(']')

        else:
            result.append(repr(obj))

        return ''.join(result)

    @staticmethod
    def serialize_bytes(data):
        """
        Write bytes by using Telegram guidelines.

        A `str` is encoded as UTF-8 first. Payloads shorter than 254
        bytes get a one-byte length prefix; longer ones get the marker
        byte 254 followed by the length as three little-endian bytes.
        The result is zero-padded to a multiple of four bytes.

        Raises `TypeError` if `data` is neither `bytes` nor `str`, and
        `ValueError` if the length does not fit in the three-byte prefix.
        """
        if not isinstance(data, bytes):
            if isinstance(data, str):
                data = data.encode('utf-8')
            else:
                raise TypeError(
                    'bytes or str expected, not {}'.format(type(data)))

        length = len(data)
        if length < 254:
            header = bytes([length])
        elif length < (1 << 24):
            header = bytes([
                254,
                length & 0xff,
                (length >> 8) & 0xff,
                (length >> 16) & 0xff
            ])
        else:
            # Three bytes cannot hold this length; writing it modulo
            # 2**24 would silently corrupt everything that follows.
            raise ValueError(
                'cannot serialize {} bytes, the limit is {}'.format(
                    length, (1 << 24) - 1))

        # Number of zero bytes needed to reach the next multiple of four.
        padding = -(len(header) + length) % 4
        return b''.join((header, data, bytes(padding)))

    @staticmethod
    def serialize_datetime(dt):
        """
        Pack a date-like value as a 4-byte little-endian unsigned integer.

        Any falsy value (such as `None` or ``0``) yields four zero bytes.
        `datetime` and `date` go through `datetime.timestamp()` (naive
        values are therefore interpreted as local time), a `float` is
        truncated, a `timedelta` is added to the current local time, and
        an `int` is packed as-is.

        Raises `TypeError` for any other type.
        """
        if not dt:
            return b'\0\0\0\0'

        # datetime must be tested before date because it subclasses it.
        if isinstance(dt, datetime):
            dt = int(dt.timestamp())
        elif isinstance(dt, date):
            dt = int(datetime(dt.year, dt.month, dt.day).timestamp())
        elif isinstance(dt, float):
            dt = int(dt)
        elif isinstance(dt, timedelta):
            # Timezones are tricky. datetime.now() + ... timestamp() works
            dt = int((datetime.now() + dt).timestamp())

        if isinstance(dt, int):
            return struct.pack('<I', dt)

        raise TypeError('Cannot interpret "{}" as a date.'.format(dt))

    def __eq__(self, o):
        """Equal when `o` is an instance of this type with the same `to_dict()`."""
        return isinstance(o, type(self)) and self.to_dict() == o.to_dict()

    def __ne__(self, o):
        """Exact negation of `__eq__`."""
        return not isinstance(o, type(self)) or self.to_dict() != o.to_dict()

    def __str__(self):
        """Return the single-line `pretty_format` of this object."""
        return TLObject.pretty_format(self)

    def stringify(self):
        """Return the multi-line `pretty_format` of this object."""
        return TLObject.pretty_format(self, indent=0)

    def to_dict(self):
        """Not implemented on the base class; raises `NotImplementedError`."""
        raise NotImplementedError

    def __bytes__(self):
        """Not implemented on the base class; raises `NotImplementedError`."""
        raise NotImplementedError

    @classmethod
    def from_reader(cls, reader):
        """Not implemented on the base class; raises `NotImplementedError`."""
        raise NotImplementedError
 | 
						|
 | 
						|
 | 
						|
class TLRequest(TLObject):
    """
    A `TLObject` that is content-related, i.e. a request that can be sent.
    """
    @staticmethod
    def read_result(reader):
        """Return whatever ``reader.tgread_object()`` yields."""
        result = reader.tgread_object()
        return result

    async def resolve(self, client, utils):
        """Do nothing on this base class; `client` and `utils` are unused."""
        return None
 |