Collapse multiple requests into a single container

This commit is contained in:
Lonami Exo 2018-06-07 13:33:32 +02:00
parent a3687b8bb5
commit 382355a22f

View File

@ -62,6 +62,11 @@ class MTProtoSender:
# {id: Message} to set their Future result upon arrival. # {id: Message} to set their Future result upon arrival.
self._pending_messages = {} self._pending_messages = {}
# Containers are accepted or rejected as a whole when any of
# its inner requests are acknowledged. For this purpose we save
# {msg_id: container}.
self._pending_containers = []
# We need to acknowledge every response from Telegram # We need to acknowledge every response from Telegram
self._pending_ack = set() self._pending_ack = set()
@ -145,6 +150,9 @@ class MTProtoSender:
Since the receiving part is "built in" the future, it's Since the receiving part is "built in" the future, it's
impossible to await receive a result that was never sent. impossible to await receive a result that was never sent.
""" """
# TODO Perhaps this method should be synchronous and just return
# a `Future` that you need to further ``await`` instead of the
# currently double ``await (await send())``?
message = TLMessage(self.session, request) message = TLMessage(self.session, request)
self._pending_messages[message.msg_id] = message self._pending_messages[message.msg_id] = message
await self._send_queue.put(message) await self._send_queue.put(message)
@ -160,9 +168,20 @@ class MTProtoSender:
Besides `connect`, only this method ever sends data. Besides `connect`, only this method ever sends data.
""" """
while self._user_connected: while self._user_connected:
# TODO If there's more than one item, send them all at once messages = [await self._send_queue.get()]
body = helpers.pack_message( while not self._send_queue.empty():
self.session, await self._send_queue.get()) messages.append(self._send_queue.get_nowait())
# TODO if _send_queue has a container and we wrap it inside
# another then that will not work.
if len(messages) == 1:
message = messages[0]
else:
message = TLMessage(self.session, MessageContainer(messages))
self._pending_messages[message.msg_id] = message
self._pending_containers.append(message)
body = helpers.pack_message(self.session, message)
# TODO Handle exceptions # TODO Handle exceptions
async with self._send_lock: async with self._send_lock:
@ -357,6 +376,23 @@ class MTProtoSender:
# TODO https://goo.gl/LMyN7A # TODO https://goo.gl/LMyN7A
self.session.salt = reader.tgread_object().server_salt self.session.salt = reader.tgread_object().server_salt
def _clean_containers(self, msg_ids):
"""
Helper method to clean containers from the pending messages
once a wrapped msg_id of them has been acknowledged.
This is the only way we can resend TLMessage(MessageContainer)
on bad notifications and also mark them as received once any
of their inner TLMessage is acknowledged.
"""
for i in reversed(range(len(self._pending_containers))):
message = self._pending_containers[i]
for msg in message.request.messages:
if msg.msg_id in msg_ids:
del self._pending_containers[i]
del self._pending_messages[message.msg_id]
break
async def _handle_ack(self, msg_id, seq, reader): async def _handle_ack(self, msg_id, seq, reader):
""" """
Handles a server acknowledge about our messages. Normally Handles a server acknowledge about our messages. Normally
@ -366,8 +402,17 @@ class MTProtoSender:
Telegram doesn't seem to send its result so we need to confirm Telegram doesn't seem to send its result so we need to confirm
it manually. No other request is known to have this behaviour. it manually. No other request is known to have this behaviour.
Since the ID of sent messages consisting of a container is
never returned (unless on a bad notification), this method
also removes containers messages when any of their inner
messages are acknowledged.
""" """
for msg_id in reader.tgread_object().msg_ids: ack = reader.tgread_object()
if self._pending_containers:
self._clean_containers(ack.msg_ids)
for msg_id in ack.msg_ids:
msg = self._pending_messages.get(msg_id, None) msg = self._pending_messages.get(msg_id, None)
if msg and isinstance(msg.request, LogOutRequest): if msg and isinstance(msg.request, LogOutRequest):
del self._pending_messages[msg_id] del self._pending_messages[msg_id]