diff --git a/telethon/network/mtprotosender.py b/telethon/network/mtprotosender.py index 9b10817d..9b93c053 100644 --- a/telethon/network/mtprotosender.py +++ b/telethon/network/mtprotosender.py @@ -67,8 +67,8 @@ class MTProtoSender: self._pending_messages = {} # Containers are accepted or rejected as a whole when any of - # its inner requests are acknowledged. For this purpose we save - # {msg_id: container}. + # its inner requests are acknowledged. For this purpose we + # all the sent containers here. self._pending_containers = [] # We need to acknowledge every response from Telegram @@ -200,17 +200,33 @@ class MTProtoSender: self.session, MsgsAck(list(self._pending_ack)))) self._pending_ack.clear() - message = await self._send_queue.get() - if isinstance(message, list): - message = TLMessage(self.session, MessageContainer(message)) + messages = await self._send_queue.get() + if isinstance(messages, list): + message = TLMessage(self.session, MessageContainer(messages)) self._pending_messages[message.msg_id] = message self._pending_containers.append(message) + else: + message = messages + messages = [message] body = helpers.pack_message(self.session, message) - # TODO Handle exceptions - async with self._send_lock: - await self._connection.send(body) + while not any(m.future.cancelled() for m in messages): + try: + async with self._send_lock: + await self._connection.send(body) + break + # TODO Are there more exceptions besides timeout? + except asyncio.TimeoutError: + continue + else: + # Remove the cancelled messages from pending + self._clean_containers([m.msg_id for m in messages]) + for m in messages: + if m.future.cancelled(): + self._pending_messages.pop(m.msg_id, None) + else: + await self._send_queue.put(m) async def _recv_loop(self): """ @@ -220,9 +236,15 @@ class MTProtoSender: Besides `connect`, only this method ever receives data. """ while self._user_connected: - # TODO Handle exceptions - async with self._recv_lock: - body = await self._connection.recv() + # TODO Are there more exceptions besides timeout? + # Disconnecting or switching off WiFi only resulted in + # timeouts, and once the network was back it continued + # on its own after a short delay. + try: + async with self._recv_lock: + body = await self._connection.recv() + except asyncio.TimeoutError: + continue # TODO Check salt, session_id and sequence_number try: