mirror of
https://github.com/LonamiWebs/Telethon.git
synced 2025-07-10 16:12:22 +03:00
Pre-pack outgoing TLMessage
This has several benefits: - The message can be resent without re-calling bytes(), which for some requests may be expensive. - Converting requests to bytes early lets us detect errors early, such as OverflowError on bad requests. - Containers can't exceed 1044456 bytes so knowing their length is important. This can now be done in O(1). But also several drawbacks: - If the object is modified the bytes won't reflect this. This isn't an issue because it's only done for in msgs. - Incoming messages can no longer be reconverted into bytes but this was never needed anyway.
This commit is contained in:
parent
b237947af1
commit
33ce702ab9
|
@ -507,7 +507,6 @@ class MTProtoSender:
|
||||||
rpc_result.req_msg_id)
|
rpc_result.req_msg_id)
|
||||||
|
|
||||||
if rpc_result.error:
|
if rpc_result.error:
|
||||||
# TODO Report errors if possible/enabled
|
|
||||||
error = rpc_message_to_error(rpc_result.error)
|
error = rpc_message_to_error(rpc_result.error)
|
||||||
self._send_queue.put_nowait(self.state.create_message(
|
self._send_queue.put_nowait(self.state.create_message(
|
||||||
MsgsAck([message.msg_id])
|
MsgsAck([message.msg_id])
|
||||||
|
@ -517,10 +516,13 @@ class MTProtoSender:
|
||||||
message.future.set_exception(error)
|
message.future.set_exception(error)
|
||||||
return
|
return
|
||||||
elif message:
|
elif message:
|
||||||
|
# TODO Would be nice to avoid accessing a per-obj read_result
|
||||||
|
# Instead have a variable that indicated how the result should
|
||||||
|
# be read (an enum) and dispatch to read the result, mostly
|
||||||
|
# always it's just a normal TLObject.
|
||||||
with BinaryReader(rpc_result.body) as reader:
|
with BinaryReader(rpc_result.body) as reader:
|
||||||
result = message.obj.read_result(reader)
|
result = message.obj.read_result(reader)
|
||||||
|
|
||||||
# TODO Process entities
|
|
||||||
if not message.future.cancelled():
|
if not message.future.cancelled():
|
||||||
message.future.set_result(result)
|
message.future.set_result(result)
|
||||||
return
|
return
|
||||||
|
|
|
@ -46,7 +46,8 @@ class MTProtoState:
|
||||||
msg_id=self._get_new_msg_id(),
|
msg_id=self._get_new_msg_id(),
|
||||||
seq_no=self._get_seq_no(isinstance(obj, TLRequest)),
|
seq_no=self._get_seq_no(isinstance(obj, TLRequest)),
|
||||||
obj=obj,
|
obj=obj,
|
||||||
after_id=after.msg_id if after else None
|
after_id=after.msg_id if after else None,
|
||||||
|
out=True # Pre-convert the request into bytes
|
||||||
)
|
)
|
||||||
|
|
||||||
def update_message_id(self, message):
|
def update_message_id(self, message):
|
||||||
|
|
|
@ -21,9 +21,7 @@ class TLMessage(TLObject):
|
||||||
sent `TLMessage`, and this result can be represented as a `Future`
|
sent `TLMessage`, and this result can be represented as a `Future`
|
||||||
that will eventually be set with either a result, error or cancelled.
|
that will eventually be set with either a result, error or cancelled.
|
||||||
"""
|
"""
|
||||||
def __init__(self, msg_id, seq_no, obj=None, after_id=0):
|
def __init__(self, msg_id, seq_no, obj, out=False, after_id=0):
|
||||||
self.msg_id = msg_id
|
|
||||||
self.seq_no = seq_no
|
|
||||||
self.obj = obj
|
self.obj = obj
|
||||||
self.container_msg_id = None
|
self.container_msg_id = None
|
||||||
self.future = asyncio.Future()
|
self.future = asyncio.Future()
|
||||||
|
@ -31,23 +29,56 @@ class TLMessage(TLObject):
|
||||||
# After which message ID this one should run. We do this so
|
# After which message ID this one should run. We do this so
|
||||||
# InvokeAfterMsgRequest is transparent to the user and we can
|
# InvokeAfterMsgRequest is transparent to the user and we can
|
||||||
# easily invoke after while confirming the original request.
|
# easily invoke after while confirming the original request.
|
||||||
|
# TODO Currently we don't update this if another message ID changes
|
||||||
self.after_id = after_id
|
self.after_id = after_id
|
||||||
|
|
||||||
|
# There are two use-cases for the TLMessage, outgoing and incoming.
|
||||||
|
# Outgoing messages are meant to be serialized and sent across the
|
||||||
|
# network so it makes sense to pack them as early as possible and
|
||||||
|
# avoid this computation if it needs to be resent, and also shows
|
||||||
|
# serializing-errors as early as possible (foreground task).
|
||||||
|
#
|
||||||
|
# We assume obj won't change so caching the bytes is safe to do.
|
||||||
|
# Caching bytes lets us get the size in a fast way, necessary for
|
||||||
|
# knowing whether a container can be sent (<1MB) or not (too big).
|
||||||
|
#
|
||||||
|
# Incoming messages don't really need this body, but we save the
|
||||||
|
# msg_id and seq_no inside the body for consistency and raise if
|
||||||
|
# one tries to bytes()-ify the entire message (len == 12).
|
||||||
|
if not out:
|
||||||
|
self._body = struct.pack('<qi', msg_id, seq_no)
|
||||||
|
else:
|
||||||
|
if self.after_id is None:
|
||||||
|
body = GzipPacked.gzip_if_smaller(self.obj)
|
||||||
|
else:
|
||||||
|
body = GzipPacked.gzip_if_smaller(
|
||||||
|
InvokeAfterMsgRequest(self.after_id, self.obj))
|
||||||
|
|
||||||
|
self._body = struct.pack('<qii', msg_id, seq_no, len(body)) + body
|
||||||
|
|
||||||
def to_dict(self):
|
def to_dict(self):
|
||||||
return {
|
return {
|
||||||
'_': 'TLMessage',
|
'_': 'TLMessage',
|
||||||
'msg_id': self.msg_id,
|
'msg_id': self.msg_id,
|
||||||
'seq_no': self.seq_no,
|
'seq_no': self.seq_no,
|
||||||
'obj': self.obj,
|
'obj': self.obj,
|
||||||
'container_msg_id': self.container_msg_id,
|
'container_msg_id': self.container_msg_id
|
||||||
'after_id': self.after_id
|
|
||||||
}
|
}
|
||||||
|
|
||||||
def __bytes__(self):
|
@property
|
||||||
if self.after_id is None:
|
def msg_id(self):
|
||||||
body = GzipPacked.gzip_if_smaller(self.obj)
|
return struct.unpack('<q', self._body[:8])[0]
|
||||||
else:
|
|
||||||
body = GzipPacked.gzip_if_smaller(
|
|
||||||
InvokeAfterMsgRequest(self.after_id, self.obj))
|
|
||||||
|
|
||||||
return struct.pack('<qii', self.msg_id, self.seq_no, len(body)) + body
|
@msg_id.setter
|
||||||
|
def msg_id(self, value):
|
||||||
|
self._body = struct.pack('<q', value) + self._body[8:]
|
||||||
|
|
||||||
|
@property
|
||||||
|
def seq_no(self):
|
||||||
|
return struct.unpack('<i', self._body[8:12])[0]
|
||||||
|
|
||||||
|
def __bytes__(self):
|
||||||
|
if len(self._body) == 12: # msg_id, seqno
|
||||||
|
raise TypeError('Incoming messages should not be bytes()-ed')
|
||||||
|
|
||||||
|
return self._body
|
||||||
|
|
Loading…
Reference in New Issue
Block a user