diff --git a/telethon/client/auth.py b/telethon/client/auth.py index b6add62c..b1a932e7 100644 --- a/telethon/client/auth.py +++ b/telethon/client/auth.py @@ -378,8 +378,8 @@ class AuthMethods(MessageParseMethods, UserMethods): # By setting state.pts = 1 after logging in, the user or bot can # `catch_up` on all updates (and obtain necessary access hashes) # if they desire. The date parameter is ignored when pts = 1. - self._state.pts = 1 - self._state.date = datetime.datetime.now(tz=datetime.timezone.utc) + self._old_state = types.updates.State( + 1, 0, datetime.datetime.now(tz=datetime.timezone.utc), 0, 0) return user @@ -437,8 +437,8 @@ class AuthMethods(MessageParseMethods, UserMethods): self._bot = None self._self_input_peer = None self._authorized = False - self._state = types.updates.State( - 0, 0, datetime.datetime.now(tz=datetime.timezone.utc), 0, 0) + self._old_state = None + self._new_state = None await self.disconnect() self.session.delete() diff --git a/telethon/client/telegrambaseclient.py b/telethon/client/telegrambaseclient.py index ab21cae0..36b52469 100644 --- a/telethon/client/telegrambaseclient.py +++ b/telethon/client/telegrambaseclient.py @@ -304,10 +304,17 @@ class TelegramBaseClient(abc.ABC): self._dispatching_updates_queue = None self._authorized = None # None = unknown, False = no, True = yes - self._state = self.session.get_update_state(0) - if not self._state: - self._state = types.updates.State( - 0, 0, datetime.now(tz=timezone.utc), 0, 0) + + # Update state (for catching up after a disconnection) + self._old_state = self.session.get_update_state(0) + self._new_state = None + + # If we catch up, while we don't get disconnected, + # the old state will be the same as the new one. + # + # If we do get disconnected, then the old and new + # state may differ. + self._old_state_is_new = False # Some further state for subclasses self._event_builders = [] @@ -389,7 +396,13 @@ class TelegramBaseClient(abc.ABC): async def _disconnect_coro(self): await self._disconnect() - self.session.set_update_state(0, self._state) + + # If we disconnect, the old state is the last one we are aware of + self._old_state_is_new = True + + if self._new_state: + self.session.set_update_state(0, self._new_state) + self.session.close() async def _disconnect(self): diff --git a/telethon/client/updates.py b/telethon/client/updates.py index 6b35f4c8..8d481d1e 100644 --- a/telethon/client/updates.py +++ b/telethon/client/updates.py @@ -135,10 +135,19 @@ class UpdateMethods(UserMethods): This can also be used to forcibly fetch new updates if there are any. """ - state = self._state - if state.pts == 0: - # pts = 0 is invalid, pts = 1 will catch up since the beginning - state.pts = 1 + state = self._new_state if self._old_state_is_new else self._old_state + if not self._old_state_is_new and self._new_state: + max_pts = self._new_state.pts + else: + max_pts = float('inf') + + print('catching up since', state, 'up to', max_pts) + + # No known state -> catch up since the beginning (date is ignored). + # Note: pts = 0 is invalid (and so is no date/unix timestamp = 0). + if not state: + state = types.updates.State( + 1, 0, datetime.datetime.now(tz=datetime.timezone.utc), 0, 0) self.session.catching_up = True try: @@ -163,6 +172,22 @@ class UpdateMethods(UserMethods): for m in d.new_messages ] )) + + # We don't want to fetch updates we already know about. + # + # We may still get duplicates because the Difference + # contains a lot of updates and presumably only has + # the state for the last one, but at least we don't + # unnecessarily fetch too many. + # + # updates.getDifference's pts_total_limit seems to mean + # "how many pts is the request allowed to return", and + # if there is more than that, it returns "too long" (so + # there would be duplicate updates since we know about + # some). This can be used to detect collisions (i.e. + # it would return an update we have already seen). + if state.pts >= max_pts: + break else: if isinstance(d, types.updates.DifferenceEmpty): state.date = d.date @@ -171,7 +196,9 @@ class UpdateMethods(UserMethods): state.pts = d.pts break finally: - self._state = state + self._old_state = None + self._new_state = state + self._old_state_is_new = True self.session.set_update_state(0, state) self.session.catching_up = False @@ -201,17 +228,26 @@ class UpdateMethods(UserMethods): self._dispatching_updates_queue.set() self._loop.create_task(self._dispatch_queue_updates()) - need_diff = False - if hasattr(update, 'pts') and update.pts is not None: - if self._state.pts and (update.pts - self._state.pts) > 1: - need_diff = True - self._state.pts = update.pts - if hasattr(update, 'date'): - self._state.date = update.date - if hasattr(update, 'seq'): - self._state.seq = update.seq - # TODO make use of need_diff + need_diff = False + if getattr(update, 'pts', None): + if not self._new_state: + self._new_state = types.updates.State( + update.pts, + 0, + getattr(update, 'date', datetime.datetime.now(tz=datetime.timezone.utc)), + getattr(update, 'seq', 0), + 0 + ) + else: + if self._new_state.pts and (update.pts - self._new_state.pts) > 1: + need_diff = True + + self._new_state.pts = update.pts + if hasattr(update, 'date'): + self._new_state.date = update.date + if hasattr(update, 'seq'): + self._new_state.seq = update.seq async def _update_loop(self): # Pings' ID don't really need to be secure, just "random" @@ -307,10 +343,38 @@ class UpdateMethods(UserMethods): try: self._log[__name__].info( 'Asking for the current state after reconnect...') - state = await self(functions.updates.GetStateRequest()) - self._log[__name__].info('Got new state! %s', state) + + # TODO consider: + # If there aren't many updates while the client is disconnected + # (I tried with up to 20), Telegram seems to send them without + # asking for them (via updates.getDifference). + # + # On disconnection, the library should probably set a "need + # difference" or "catching up" flag so that any new updates are + # ignored, and then the library should call updates.getDifference + # itself to fetch them. + # + # In any case (either there are too many updates and Telegram + # didn't send them, or there isn't a lot and Telegram sent them + # but we dropped them), we fetch the new difference to get all + # missed updates. I feel like this would be the best solution. + + # If a disconnection occurs, the old known state will be + # the latest one we were aware of, so we can catch up since + # the most recent state we were aware of. + # TODO Ideally we set _old_state = _new_state *on* disconnect, + # not *after* we managed to reconnect since perhaps an update + # arrives just before we can get started. + self._old_state_is_new = True + await self.catch_up() + + self._log[__name__].info('Successfully fetched missed updates') except errors.RPCError as e: - self._log[__name__].info('Failed to get current state: %r', e) + self._log[__name__].warning('Failed to get missed updates after ' + 'reconnect: %r', e) + except Exception: + self._log[__name__].exception('Unhandled exception while getting ' + 'update difference after reconnect') # endregion