Also process own updates in MessageBox

This commit is contained in:
Lonami Exo 2022-05-19 16:40:32 +02:00
parent 80685191ab
commit 3a44f56f64
2 changed files with 29 additions and 0 deletions

View File

@ -345,12 +345,22 @@ class MessageBox:
# updatesTooLong is the only one with no date (we treat it as a gap)
raise GapError
# v1 has never sent updates produced by the client itself to the handlers.
# However proper update handling requires those to be processed.
# This is an ugly workaround for that.
self_outgoing = getattr(updates, '_self_outgoing', False)
real_result = result
result = []
seq = getattr(updates, 'seq', None) or NO_SEQ
seq_start = getattr(updates, 'seq_start', None) or seq
users = getattr(updates, 'users', None) or []
chats = getattr(updates, 'chats', None) or []
updates = getattr(updates, 'updates', None) or [updates]
for u in updates:
u._self_outgoing = self_outgoing
# > For all the other [not `updates` or `updatesCombined`] `Updates` type constructors
# > there is no need to check `seq` or change a local state.
if seq_start != NO_SEQ:
@ -392,6 +402,8 @@ class MessageBox:
# Clear now-empty gaps.
self.possible_gaps = {entry: gap for entry, gap in self.possible_gaps.items() if gap.updates}
real_result.extend(u for u in result if not u._self_outgoing)
return (users, chats)
# Tries to apply the input update if its `PtsInfo` follows the correct order.

View File

@ -22,6 +22,7 @@ from ..tl.types import (
MsgNewDetailedInfo, NewSessionCreated, MsgDetailedInfo, MsgsStateReq,
MsgsStateInfo, MsgsAllInfo, MsgResendReq, upload, DestroySessionOk, DestroySessionNone,
)
from ..tl import types as _tl
from ..crypto import AuthKey
from ..helpers import retry_range
@ -615,6 +616,7 @@ class MTProtoSender:
if not state.future.cancelled():
state.future.set_exception(e)
else:
self._store_own_updates(result)
if not state.future.cancelled():
state.future.set_result(result)
@ -649,6 +651,21 @@ class MTProtoSender:
self._log.debug('Handling update %s', message.obj.__class__.__name__)
self._updates_queue.put_nowait(message.obj)
def _store_own_updates(self, obj, *, _update_ids=frozenset((
_tl.UpdateShortMessage.CONSTRUCTOR_ID,
_tl.UpdateShortChatMessage.CONSTRUCTOR_ID,
_tl.UpdateShort.CONSTRUCTOR_ID,
_tl.UpdatesCombined.CONSTRUCTOR_ID,
_tl.Updates.CONSTRUCTOR_ID,
_tl.UpdateShortSentMessage.CONSTRUCTOR_ID,
))):
try:
if obj.CONSTRUCTOR_ID in _update_ids:
obj._self_outgoing = True # flag to only process, but not dispatch these
self._updates_queue.put_nowait(obj)
except AttributeError:
pass
async def _handle_pong(self, message):
"""
Handles pong results, which don't come inside a ``rpc_result``