Rewrite container packing to support arbitrary sizes

This commit is contained in:
Lonami Exo 2018-10-05 12:26:59 +02:00
parent 7e7bbcf4c0
commit ef60ade647
2 changed files with 76 additions and 45 deletions

View File

@ -4,6 +4,7 @@ import struct
from .mtprotostate import MTProtoState
from ..tl import TLRequest
from ..tl.core.tlmessage import TLMessage
from ..tl.core.messagecontainer import MessageContainer
__log__ = logging.getLogger(__name__)
@ -48,8 +49,8 @@ class MTProtoLayer:
Nested lists imply an order is required for the messages in them.
Message containers will be used if there is more than one item.
"""
data = self._pack_state_list(state_list)
await self._connection.send(self._state.encrypt_message_data(data))
for data in filter(None, self._pack_state_list(state_list)):
await self._connection.send(self._state.encrypt_message_data(data))
async def recv(self):
"""
@ -67,9 +68,6 @@ class MTProtoLayer:
nested inside another message and message container) and
returns the serialized message data.
"""
# TODO This method could be an iterator yielding messages while small
# respecting the ``MessageContainer.MAXIMUM_SIZE`` limit.
#
# Note that the simplest case is writing a single query data into
# a message, and returning the message data and ID. For efficiency
# purposes this method supports more than one message and automatically
@ -79,51 +77,82 @@ class MTProtoLayer:
# to store and serialize the data. However, to keep the context local
# and relevant to the only place where such feature is actually used,
# this is not done.
n = 0
#
# When iterating over the state_list there are two branches, one
# being just a state and the other being a list so the inner states
# depend on each other. In either case, if the packed size exceeds
# the maximum container size, it must be sent. This code is non-
# trivial so it has been factored into an inner function.
#
# A new buffer instance will be used once the size should be "flushed"
buffer = io.BytesIO()
for state in state_list:
if not isinstance(state, list):
n += 1
state.msg_id = self._state.write_data_as_message(
buffer, state.data, isinstance(state.request, TLRequest))
# The batch of requests sent in a single buffer-flush. We need to
# remember which states were written to set their container ID.
batch = []
# The currently written size. Reset when it exceeds the maximum.
size = 0
__log__.debug('Assigned msg_id = %d to %s (%x)',
state.msg_id, state.request.__class__.__name__,
id(state.request))
else:
last_id = None
for s in state:
n += 1
last_id = s.msg_id = self._state.write_data_as_message(
buffer, s.data, isinstance(s.request, TLRequest),
after_id=last_id)
def write_state(state, after_id=None):
nonlocal buffer, batch, size
if state:
batch.append(state)
size += len(state.data) + TLMessage.SIZE_OVERHEAD
__log__.debug('Assigned msg_id = %d to %s (%x)',
s.msg_id, s.request.__class__.__name__,
id(s.request))
if n > 1:
# Inlined code to pack several messages into a container
#
# TODO This part and encrypting data prepend a few bytes but
# force a potentially large payload to be appended, which
# may be expensive. Can we do better?
data = struct.pack(
'<Ii', MessageContainer.CONSTRUCTOR_ID, n
) + buffer.getvalue()
buffer = io.BytesIO()
container_id = self._state.write_data_as_message(
buffer, data, content_related=False
)
for state in state_list:
if not isinstance(state, list):
state.container_id = container_id
else:
for s in state:
# Flush whenever the current size exceeds the maximum,
# or if there's no state, which indicates force flush.
if not state or size > MessageContainer.MAXIMUM_SIZE:
size -= MessageContainer.MAXIMUM_SIZE
if len(batch) > 1:
# Inlined code to pack several messages into a container
data = struct.pack(
'<Ii', MessageContainer.CONSTRUCTOR_ID, len(batch)
) + buffer.getvalue()
buffer = io.BytesIO()
container_id = self._state.write_data_as_message(
buffer, data, content_related=False
)
for s in batch:
s.container_id = container_id
r = buffer.getvalue()
__log__.debug('Packed %d message(s) in %d bytes for sending', n, len(r))
return r
# At this point it's either a single msg or a msg + container
data = buffer.getvalue()
__log__.debug('Packed %d message(s) in %d bytes for sending',
len(batch), len(data))
batch.clear()
buffer = io.BytesIO()
return data
if not state:
return # Just forcibly flushing
# If even after flushing it still exceeds the maximum size,
# this message payload cannot be sent. Telegram would forcibly
# close the connection, and the message would never be confirmed.
if size > MessageContainer.MAXIMUM_SIZE:
state.future.set_exception(
ValueError('Request payload is too big'))
return
# This is the only requirement to make this work.
state.msg_id = self._state.write_data_as_message(
buffer, state.data, isinstance(state.request, TLRequest),
after_id=after_id
)
__log__.debug('Assigned msg_id = %d to %s (%x)',
state.msg_id, state.request.__class__.__name__,
id(state.request))
# TODO Yield in the inner loop -> Telegram "Invalid container". Why?
for state in state_list:
if not isinstance(state, list):
yield write_state(state)
else:
after_id = None
for s in state:
yield write_state(s, after_id)
after_id = s.msg_id
yield write_state(None)
def __str__(self):
return str(self._connection)

View File

@ -22,6 +22,8 @@ class TLMessage(TLObject):
inlined and is unlikely to change. Thus these are only needed to
encapsulate responses.
"""
SIZE_OVERHEAD = 12
def __init__(self, msg_id, seq_no, obj):
self.msg_id = msg_id
self.seq_no = seq_no