diff --git a/telethon/client/messages.py b/telethon/client/messages.py index 2f928c69..f4cb9cb7 100644 --- a/telethon/client/messages.py +++ b/telethon/client/messages.py @@ -150,6 +150,163 @@ class _GetHistoryIter(RequestIter): return result +class _SearchMessagesIter(RequestIter): + async def _init(self, entity, offset_id, min_id, max_id, from_user, batch_size, offset_date, add_offset, filter, search): + self.entity = await self.client.get_input_entity(entity) + + # Telegram doesn't like min_id/max_id. If these IDs are low enough + # (starting from last_id - 100), the request will return nothing. + # + # We can emulate their behaviour locally by setting offset = max_id + # and simply stopping once we hit a message with ID <= min_id. + if self.reverse: + offset_id = max(offset_id, min_id) + if offset_id and max_id: + if max_id - offset_id <= 1: + raise StopAsyncIteration + + if not max_id: + max_id = float('inf') + else: + offset_id = max(offset_id, max_id) + if offset_id and min_id: + if offset_id - min_id <= 1: + raise StopAsyncIteration + + if self.reverse: + if offset_id: + offset_id += 1 + else: + offset_id = 1 + + if from_user: + from_user = await self.client.get_input_entity(from_user) + if not isinstance(from_user, ( + types.InputPeerUser, types.InputPeerSelf)): + from_user = None # Ignore from_user unless it's a user + + self.from_id = (await self.client.get_peer_id(from_user)) if from_user else None + + if filter is None: + filter = types.InputMessagesFilterEmpty() + + # Telegram completely ignores `from_id` in private chats + if isinstance(entity, (types.InputPeerUser, types.InputPeerSelf)): + # Don't bother sending `from_user` (it's ignored anyway), + # but keep `from_id` defined above to check it locally. + from_user = None + else: + # Do send `from_user` to do the filtering server-side, + # and set `from_id` to None to avoid checking it locally. + self.from_id = None + + self.request = functions.messages.SearchRequest( + peer=entity, + q=search or '', + filter=filter() if isinstance(filter, type) else filter, + min_date=None, + max_date=offset_date, + offset_id=offset_id, + add_offset=add_offset, + limit=0, # Search actually returns 0 items if we ask it to + max_id=0, + min_id=0, + hash=0, + from_id=from_user + ) + + if self.limit == 0: + # No messages, but we still need to know the total message count + result = await self.client(self.request) + if isinstance(result, types.messages.MessagesNotModified): + self.total = result.count + else: + self.total = getattr(result, 'count', len(result.messages)) + raise StopAsyncIteration + + # When going in reverse we need an offset of `-limit`, but we + # also want to respect what the user passed, so add them together. + if self.reverse: + self.request.add_offset -= batch_size + + if self.wait_time is None: + self.wait_time = 1 if self.limit > 3000 else 0 + + # Telegram has a hard limit of 100. + # We don't need to fetch 100 if the limit is less. + self.batch_size = min(max(batch_size, 1), min(100, self.limit)) + self.add_offset = add_offset + self.max_id = max_id + self.min_id = min_id + self.last_id = 0 if self.reverse else float('inf') + + async def _load_next_chunk(self): + result = [] + + self.request.limit = min(self.left, self.batch_size) + if self.reverse and self.request.limit != self.batch_size: + # Remember that we need -limit when going in reverse + self.request.add_offset = self.add_offset - self.request.limit + + r = await self.client(self.request) + self.total = getattr(r, 'count', len(r.messages)) + + entities = {utils.get_peer_id(x): x + for x in itertools.chain(r.users, r.chats)} + + messages = reversed(r.messages) if self.reverse else r.messages + for message in messages: + if (isinstance(message, types.MessageEmpty) + or self.from_id and message.from_id != self.from_id): + continue + + # TODO We used to yield and return here (stopping the iterator) + # How should we go around that here? + if self.reverse: + if message.id <= self.last_id or message.id >= self.max_id: + break + else: + if message.id >= self.last_id or message.id <= self.min_id: + break + + # There has been reports that on bad connections this method + # was returning duplicated IDs sometimes. Using ``last_id`` + # is an attempt to avoid these duplicates, since the message + # IDs are returned in descending order (or asc if reverse). + self.last_id = message.id + message._finish_init(self.client, entities, self.entity) + result.append(message) + + if len(r.messages) < self.request.limit: + return result + + # Find the first message that's not empty (in some rare cases + # it can happen that the last message is :tl:`MessageEmpty`) + last_message = None + messages = r.messages if self.reverse else reversed(r.messages) + for m in messages: + if not isinstance(m, types.MessageEmpty): + last_message = m + break + + # TODO If it's None, we used to break (ending the iterator) + # Similar case as the return above. + if last_message is not None: + # There are some cases where all the messages we get start + # being empty. This can happen on migrated mega-groups if + # the history was cleared, and we're using search. Telegram + # acts incredibly weird sometimes. Messages are returned but + # only "empty", not their contents. If this is the case we + # should just give up since there won't be any new Message. + self.request.offset_id = last_message.id + self.request.max_date = last_message.date # not offset_date + if self.reverse: + # We want to skip the one we already have + self.request.offset_id += 1 + + return result + + class MessageMethods(UploadMethods, ButtonMethods, MessageParseMethods): # region Public methods @@ -272,22 +429,39 @@ class MessageMethods(UploadMethods, ButtonMethods, MessageParseMethods): you think may be good. """ # TODO Handle global search - # TODO Handle search # TODO Handle yield IDs - return _GetHistoryIter( - self, - limit=limit, - wait_time=wait_time, - entity=entity, - reverse=reverse, - offset_id=offset_id, - min_id=min_id, - max_id=max_id, - from_user=from_user, - batch_size=batch_size, - offset_date=offset_date, - add_offset=add_offset - ) + # TODO Reuse code between search, global, get history + if search is not None or filter or from_user: + return _SearchMessagesIter( + self, + limit, + entity=entity, + offset_id=offset_id, + min_id=min_id, + max_id=max_id, + from_user=from_user, + batch_size=batch_size, + offset_date=offset_date, + add_offset=add_offset, + filter=filter, + search=search + ) + else: + return _GetHistoryIter( + self, + limit, + wait_time=wait_time, + entity=entity, + reverse=reverse, + offset_id=offset_id, + min_id=min_id, + max_id=max_id, + from_user=from_user, + batch_size=batch_size, + offset_date=offset_date, + add_offset=add_offset + ) + # Note that entity being ``None`` is intended to get messages by # ID under no specific chat, and also to request a global search. if entity: