mirror of
https://github.com/LonamiWebs/Telethon.git
synced 2024-11-22 17:36:34 +03:00
Fix commit 65912f9
by properly skipping lost messages
This commit is contained in:
parent
65912f926b
commit
3ed59d08e5
|
@ -106,7 +106,9 @@ class MtProtoSender:
|
||||||
|
|
||||||
# We're done receiving, remove the request from pending, if any
|
# We're done receiving, remove the request from pending, if any
|
||||||
if request:
|
if request:
|
||||||
self._pending_receive.remove(request)
|
try:
|
||||||
|
self._pending_receive.remove(request)
|
||||||
|
except ValueError: pass
|
||||||
|
|
||||||
self._logger.info('Request result received')
|
self._logger.info('Request result received')
|
||||||
self._logger.debug('receive() released the lock')
|
self._logger.debug('receive() released the lock')
|
||||||
|
@ -184,7 +186,11 @@ class MtProtoSender:
|
||||||
return message, remote_msg_id, remote_sequence
|
return message, remote_msg_id, remote_sequence
|
||||||
|
|
||||||
def _process_msg(self, msg_id, sequence, reader, updates):
|
def _process_msg(self, msg_id, sequence, reader, updates):
|
||||||
"""Processes and handles a Telegram message"""
|
"""Processes and handles a Telegram message.
|
||||||
|
|
||||||
|
Returns True if the message was handled correctly and doesn't
|
||||||
|
need to be skipped. Returns False otherwise.
|
||||||
|
"""
|
||||||
|
|
||||||
# TODO Check salt, session_id and sequence_number
|
# TODO Check salt, session_id and sequence_number
|
||||||
self._need_confirmation.append(msg_id)
|
self._need_confirmation.append(msg_id)
|
||||||
|
@ -222,7 +228,7 @@ class MtProtoSender:
|
||||||
self._logger.info('Message ack confirmed a request')
|
self._logger.info('Message ack confirmed a request')
|
||||||
r.confirm_received = True
|
r.confirm_received = True
|
||||||
|
|
||||||
return False
|
return True
|
||||||
|
|
||||||
# If the code is not parsed manually, then it was parsed by the code generator!
|
# If the code is not parsed manually, then it was parsed by the code generator!
|
||||||
# In this case, we will simply treat the incoming TLObject as an Update,
|
# In this case, we will simply treat the incoming TLObject as an Update,
|
||||||
|
@ -235,7 +241,7 @@ class MtProtoSender:
|
||||||
self._logger.debug('Read update for %s', repr(result))
|
self._logger.debug('Read update for %s', repr(result))
|
||||||
updates.append(result)
|
updates.append(result)
|
||||||
|
|
||||||
return False
|
return True
|
||||||
|
|
||||||
self._logger.warning('Unknown message: {}'.format(hex(code)))
|
self._logger.warning('Unknown message: {}'.format(hex(code)))
|
||||||
return False
|
return False
|
||||||
|
@ -257,7 +263,7 @@ class MtProtoSender:
|
||||||
request.confirm_received = True
|
request.confirm_received = True
|
||||||
except StopIteration: pass
|
except StopIteration: pass
|
||||||
|
|
||||||
return False
|
return True
|
||||||
|
|
||||||
def _handle_container(self, msg_id, sequence, reader, updates):
|
def _handle_container(self, msg_id, sequence, reader, updates):
|
||||||
self._logger.debug('Handling container')
|
self._logger.debug('Handling container')
|
||||||
|
@ -275,7 +281,7 @@ class MtProtoSender:
|
||||||
inner_msg_id, sequence, reader, updates):
|
inner_msg_id, sequence, reader, updates):
|
||||||
reader.set_position(begin_position + inner_length)
|
reader.set_position(begin_position + inner_length)
|
||||||
|
|
||||||
return False
|
return True
|
||||||
|
|
||||||
def _handle_bad_server_salt(self, msg_id, sequence, reader):
|
def _handle_bad_server_salt(self, msg_id, sequence, reader):
|
||||||
self._logger.debug('Handling bad server salt')
|
self._logger.debug('Handling bad server salt')
|
||||||
|
@ -310,6 +316,7 @@ class MtProtoSender:
|
||||||
self.session.save()
|
self.session.save()
|
||||||
self._logger.warning('Read Bad Message error: ' + str(error))
|
self._logger.warning('Read Bad Message error: ' + str(error))
|
||||||
self._logger.info('Attempting to use the correct time offset.')
|
self._logger.info('Attempting to use the correct time offset.')
|
||||||
|
return True
|
||||||
else:
|
else:
|
||||||
raise error
|
raise error
|
||||||
|
|
||||||
|
@ -343,26 +350,22 @@ class MtProtoSender:
|
||||||
|
|
||||||
raise error
|
raise error
|
||||||
else:
|
else:
|
||||||
if not request:
|
if request:
|
||||||
raise ValueError(
|
self._logger.debug('Reading request response')
|
||||||
'The request needed to read this RPC result was not '
|
if inner_code == 0x3072cfa1: # GZip packed
|
||||||
'found (possibly called receive() from another thread).')
|
unpacked_data = gzip.decompress(reader.tgread_bytes())
|
||||||
|
with BinaryReader(unpacked_data) as compressed_reader:
|
||||||
self._logger.debug('Reading request response')
|
request.on_response(compressed_reader)
|
||||||
if inner_code == 0x3072cfa1: # GZip packed
|
|
||||||
unpacked_data = gzip.decompress(reader.tgread_bytes())
|
|
||||||
with BinaryReader(unpacked_data) as compressed_reader:
|
|
||||||
request.on_response(compressed_reader)
|
|
||||||
else:
|
|
||||||
reader.seek(-4)
|
|
||||||
if request_id == request.msg_id:
|
|
||||||
request.on_response(reader)
|
|
||||||
else:
|
else:
|
||||||
# If it's really a result for RPC from previous connection
|
reader.seek(-4)
|
||||||
# session, it will be skipped by the handle_container()
|
request.on_response(reader)
|
||||||
self._logger.warning(
|
|
||||||
'RPC result found for unknown request '
|
return True
|
||||||
'(maybe from previous connection session)')
|
else:
|
||||||
|
# If it's really a result for RPC from previous connection
|
||||||
|
# session, it will be skipped by the handle_container()
|
||||||
|
self._logger.warning('Lost request will be skipped.')
|
||||||
|
return False
|
||||||
|
|
||||||
def _handle_gzip_packed(self, msg_id, sequence, reader, updates):
|
def _handle_gzip_packed(self, msg_id, sequence, reader, updates):
|
||||||
self._logger.debug('Handling gzip packed data')
|
self._logger.debug('Handling gzip packed data')
|
||||||
|
|
Loading…
Reference in New Issue
Block a user