diff --git a/telethon/network/mtproto_sender.py b/telethon/network/mtproto_sender.py index 95a64624..d1ccca48 100644 --- a/telethon/network/mtproto_sender.py +++ b/telethon/network/mtproto_sender.py @@ -106,7 +106,9 @@ class MtProtoSender: # We're done receiving, remove the request from pending, if any if request: - self._pending_receive.remove(request) + try: + self._pending_receive.remove(request) + except ValueError: pass self._logger.info('Request result received') self._logger.debug('receive() released the lock') @@ -184,7 +186,11 @@ class MtProtoSender: return message, remote_msg_id, remote_sequence def _process_msg(self, msg_id, sequence, reader, updates): - """Processes and handles a Telegram message""" + """Processes and handles a Telegram message. + + Returns True if the message was handled correctly and doesn't + need to be skipped. Returns False otherwise. + """ # TODO Check salt, session_id and sequence_number self._need_confirmation.append(msg_id) @@ -222,7 +228,7 @@ class MtProtoSender: self._logger.info('Message ack confirmed a request') r.confirm_received = True - return False + return True # If the code is not parsed manually, then it was parsed by the code generator! # In this case, we will simply treat the incoming TLObject as an Update, @@ -235,7 +241,7 @@ class MtProtoSender: self._logger.debug('Read update for %s', repr(result)) updates.append(result) - return False + return True self._logger.warning('Unknown message: {}'.format(hex(code))) return False @@ -257,7 +263,7 @@ class MtProtoSender: request.confirm_received = True except StopIteration: pass - return False + return True def _handle_container(self, msg_id, sequence, reader, updates): self._logger.debug('Handling container') @@ -275,7 +281,7 @@ class MtProtoSender: inner_msg_id, sequence, reader, updates): reader.set_position(begin_position + inner_length) - return False + return True def _handle_bad_server_salt(self, msg_id, sequence, reader): self._logger.debug('Handling bad server salt') @@ -310,6 +316,7 @@ class MtProtoSender: self.session.save() self._logger.warning('Read Bad Message error: ' + str(error)) self._logger.info('Attempting to use the correct time offset.') + return True else: raise error @@ -343,26 +350,22 @@ class MtProtoSender: raise error else: - if not request: - raise ValueError( - 'The request needed to read this RPC result was not ' - 'found (possibly called receive() from another thread).') - - self._logger.debug('Reading request response') - if inner_code == 0x3072cfa1: # GZip packed - unpacked_data = gzip.decompress(reader.tgread_bytes()) - with BinaryReader(unpacked_data) as compressed_reader: - request.on_response(compressed_reader) - else: - reader.seek(-4) - if request_id == request.msg_id: - request.on_response(reader) + if request: + self._logger.debug('Reading request response') + if inner_code == 0x3072cfa1: # GZip packed + unpacked_data = gzip.decompress(reader.tgread_bytes()) + with BinaryReader(unpacked_data) as compressed_reader: + request.on_response(compressed_reader) else: - # If it's really a result for RPC from previous connection - # session, it will be skipped by the handle_container() - self._logger.warning( - 'RPC result found for unknown request ' - '(maybe from previous connection session)') + reader.seek(-4) + request.on_response(reader) + + return True + else: + # If it's really a result for RPC from previous connection + # session, it will be skipped by the handle_container() + self._logger.warning('Lost request will be skipped.') + return False def _handle_gzip_packed(self, msg_id, sequence, reader, updates): self._logger.debug('Handling gzip packed data')