diff --git a/telethon/network/mtprotosender.py b/telethon/network/mtprotosender.py index e6a7ba9e..d15216c1 100644 --- a/telethon/network/mtprotosender.py +++ b/telethon/network/mtprotosender.py @@ -62,6 +62,11 @@ class MTProtoSender: # {id: Message} to set their Future result upon arrival. self._pending_messages = {} + # Containers are accepted or rejected as a whole when any of + # its inner requests are acknowledged. For this purpose we save + # {msg_id: container}. + self._pending_containers = [] + # We need to acknowledge every response from Telegram self._pending_ack = set() @@ -145,6 +150,9 @@ class MTProtoSender: Since the receiving part is "built in" the future, it's impossible to await receive a result that was never sent. """ + # TODO Perhaps this method should be synchronous and just return + # a `Future` that you need to further ``await`` instead of the + # currently double ``await (await send())``? message = TLMessage(self.session, request) self._pending_messages[message.msg_id] = message await self._send_queue.put(message) @@ -160,9 +168,20 @@ class MTProtoSender: Besides `connect`, only this method ever sends data. """ while self._user_connected: - # TODO If there's more than one item, send them all at once - body = helpers.pack_message( - self.session, await self._send_queue.get()) + messages = [await self._send_queue.get()] + while not self._send_queue.empty(): + messages.append(self._send_queue.get_nowait()) + + # TODO if _send_queue has a container and we wrap it inside + # another then that will not work. + if len(messages) == 1: + message = messages[0] + else: + message = TLMessage(self.session, MessageContainer(messages)) + self._pending_messages[message.msg_id] = message + self._pending_containers.append(message) + + body = helpers.pack_message(self.session, message) # TODO Handle exceptions async with self._send_lock: @@ -357,6 +376,23 @@ class MTProtoSender: # TODO https://goo.gl/LMyN7A self.session.salt = reader.tgread_object().server_salt + def _clean_containers(self, msg_ids): + """ + Helper method to clean containers from the pending messages + once a wrapped msg_id of them has been acknowledged. + + This is the only way we can resend TLMessage(MessageContainer) + on bad notifications and also mark them as received once any + of their inner TLMessage is acknowledged. + """ + for i in reversed(range(len(self._pending_containers))): + message = self._pending_containers[i] + for msg in message.request.messages: + if msg.msg_id in msg_ids: + del self._pending_containers[i] + del self._pending_messages[message.msg_id] + break + async def _handle_ack(self, msg_id, seq, reader): """ Handles a server acknowledge about our messages. Normally @@ -366,8 +402,17 @@ class MTProtoSender: Telegram doesn't seem to send its result so we need to confirm it manually. No other request is known to have this behaviour. + + Since the ID of sent messages consisting of a container is + never returned (unless on a bad notification), this method + also removes containers messages when any of their inner + messages are acknowledged. """ - for msg_id in reader.tgread_object().msg_ids: + ack = reader.tgread_object() + if self._pending_containers: + self._clean_containers(ack.msg_ids) + + for msg_id in ack.msg_ids: msg = self._pending_messages.get(msg_id, None) if msg and isinstance(msg.request, LogOutRequest): del self._pending_messages[msg_id]