mirror of
https://github.com/LonamiWebs/Telethon.git
synced 2024-11-22 17:36:34 +03:00
7e78b1b6dc
Uploading and sending media are different things. Once you have uploaded a media file, you can send it to many users without uploading it again, since you have a handle to the uploaded file. Other fixes include not showing additional data on error messages and not generating correct code for sending bytes
161 lines
5.1 KiB
Python
Executable File
161 lines
5.1 KiB
Python
Executable File
from datetime import datetime
|
|
from io import BytesIO, BufferedReader
|
|
from tl.all_tlobjects import tlobjects
|
|
from struct import unpack
|
|
from errors import *
|
|
import inspect
|
|
import os
|
|
|
|
|
|
class BinaryReader:
|
|
"""
|
|
Small utility class to read binary data.
|
|
Also creates a "Memory Stream" if necessary
|
|
"""
|
|
def __init__(self, data=None, stream=None):
|
|
if data:
|
|
self.stream = BytesIO(data)
|
|
elif stream:
|
|
self.stream = stream
|
|
else:
|
|
raise InvalidParameterError("Either bytes or a stream must be provided")
|
|
|
|
self.reader = BufferedReader(self.stream)
|
|
|
|
# region Reading
|
|
|
|
# "All numbers are written as little endian." |> Source: https://core.telegram.org/mtproto
|
|
def read_byte(self):
|
|
"""Reads a single byte value"""
|
|
return self.read(1)[0]
|
|
|
|
def read_int(self, signed=True):
|
|
"""Reads an integer (4 bytes) value"""
|
|
return int.from_bytes(self.read(4), byteorder='little', signed=signed)
|
|
|
|
def read_long(self, signed=True):
|
|
"""Reads a long integer (8 bytes) value"""
|
|
return int.from_bytes(self.read(8), byteorder='little', signed=signed)
|
|
|
|
def read_float(self):
|
|
"""Reads a real floating point (4 bytes) value"""
|
|
return unpack('<f', self.read(4))[0]
|
|
|
|
def read_double(self):
|
|
"""Reads a real floating point (8 bytes) value"""
|
|
return unpack('<d', self.read(8))[0]
|
|
|
|
def read_large_int(self, bits, signed=True):
|
|
"""Reads a n-bits long integer value"""
|
|
return int.from_bytes(self.read(bits // 8), byteorder='little', signed=signed)
|
|
|
|
def read(self, length):
|
|
"""Read the given amount of bytes"""
|
|
result = self.reader.read(length)
|
|
if len(result) != length:
|
|
raise BufferError('Trying to read outside the data bounds (no more data left to read)')
|
|
|
|
return result
|
|
|
|
def get_bytes(self):
|
|
"""Gets the byte array representing the current buffer as a whole"""
|
|
return self.stream.getvalue()
|
|
|
|
# endregion
|
|
|
|
# region Telegram custom reading
|
|
|
|
def tgread_bytes(self):
|
|
"""Reads a Telegram-encoded byte array, without the need of specifying its length"""
|
|
first_byte = self.read_byte()
|
|
if first_byte == 254:
|
|
length = self.read_byte() | (self.read_byte() << 8) | (self.read_byte() << 16)
|
|
padding = length % 4
|
|
else:
|
|
length = first_byte
|
|
padding = (length + 1) % 4
|
|
|
|
data = self.read(length)
|
|
if padding > 0:
|
|
padding = 4 - padding
|
|
self.read(padding)
|
|
|
|
return data
|
|
|
|
def tgread_string(self):
|
|
"""Reads a Telegram-encoded string"""
|
|
return str(self.tgread_bytes(), encoding='utf-8')
|
|
|
|
def tgread_bool(self):
|
|
"""Reads a Telegram boolean value"""
|
|
value = self.read_int(signed=False)
|
|
if value == 0x997275b5: # boolTrue
|
|
return True
|
|
elif value == 0xbc799737: # boolFalse
|
|
return False
|
|
else:
|
|
raise ValueError('Invalid boolean code {}'.format(hex(value)))
|
|
|
|
def tgread_date(self):
|
|
"""Reads and converts Unix time (used by Telegram) into a Python datetime object"""
|
|
value = self.read_int()
|
|
return None if value == 0 else datetime.fromtimestamp(value)
|
|
|
|
def tgread_object(self):
|
|
"""Reads a Telegram object"""
|
|
constructor_id = self.read_int(signed=False)
|
|
clazz = tlobjects.get(constructor_id, None)
|
|
if clazz is None:
|
|
# The class was None, but there's still a
|
|
# chance of it being a manually parsed value like bool!
|
|
value = constructor_id
|
|
if value == 0x997275b5: # boolTrue
|
|
return True
|
|
elif value == 0xbc799737: # boolFalse
|
|
return False
|
|
|
|
# If there was still no luck, give up
|
|
raise TypeNotFoundError(constructor_id)
|
|
|
|
# Now we need to determine the number of parameters of the class, so we can
|
|
# instantiate it with all of them set to None, and still, no need to write
|
|
# the default =None in all the classes, thus forcing the user to provide a real value
|
|
sig = inspect.signature(clazz.__init__)
|
|
params = [None] * (len(sig.parameters) - 1) # Subtract 1 (self)
|
|
result = clazz(*params) # https://docs.python.org/3/tutorial/controlflow.html#unpacking-argument-lists
|
|
|
|
# Finally, read the object and return the result
|
|
result.on_response(self)
|
|
return result
|
|
|
|
# endregion
|
|
|
|
def close(self):
|
|
self.reader.close()
|
|
|
|
# region Position related
|
|
|
|
def tell_position(self):
|
|
"""Tells the current position on the stream"""
|
|
return self.reader.tell()
|
|
|
|
def set_position(self, position):
|
|
"""Sets the current position on the stream"""
|
|
self.reader.seek(position)
|
|
|
|
def seek(self, offset):
|
|
"""Seeks the stream position given an offset from the current position. May be negative"""
|
|
self.reader.seek(offset, os.SEEK_CUR)
|
|
|
|
# endregion
|
|
|
|
# region with block
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
self.close()
|
|
|
|
# endregion
|