diff --git a/telethon/client/messages.py b/telethon/client/messages.py index f4cb9cb7..e8d27961 100644 --- a/telethon/client/messages.py +++ b/telethon/client/messages.py @@ -1,6 +1,4 @@ -import asyncio import itertools -import time from .messageparse import MessageParseMethods from .uploads import UploadMethods @@ -10,9 +8,24 @@ from ..tl import types, functions from ..requestiter import RequestIter -class _GetHistoryIter(RequestIter): - async def _init(self, entity, offset_id, min_id, max_id, from_user, batch_size, offset_date, add_offset): - self.entity = await self.client.get_input_entity(entity) +# TODO Maybe RequestIter could rather have the update offset here? +# Maybe init should return the request to be used and it be +# called automatically? And another method to just process it. +class _MessagesIter(RequestIter): + """ + Common factor for all requests that need to iterate over messages. + """ + async def _init( + self, entity, offset_id, min_id, max_id, from_user, + batch_size, offset_date, add_offset, filter, search + ): + # Note that entity being ``None`` will perform a global search. + if entity: + self.entity = await self.client.get_input_entity(entity) + else: + self.entity = None + if self.reverse: + raise ValueError('Cannot reverse global search') # Telegram doesn't like min_id/max_id. If these IDs are low enough # (starting from last_id - 100), the request will return nothing. @@ -45,175 +58,59 @@ class _GetHistoryIter(RequestIter): types.InputPeerUser, types.InputPeerSelf)): from_user = None # Ignore from_user unless it's a user - self.from_id = (await self.client.get_peer_id(from_user)) if from_user else None - - self.request = functions.messages.GetHistoryRequest( - peer=entity, - limit=1, - offset_date=offset_date, - offset_id=offset_id, - min_id=0, - max_id=0, - add_offset=add_offset, - hash=0 - ) - - if self.limit == 0: - # No messages, but we still need to know the total message count - result = await self.client(self.request) - if isinstance(result, types.messages.MessagesNotModified): - self.total = result.count - else: - self.total = getattr(result, 'count', len(result.messages)) - raise StopAsyncIteration - - # When going in reverse we need an offset of `-limit`, but we - # also want to respect what the user passed, so add them together. - if self.reverse: - self.request.add_offset -= batch_size - - if self.wait_time is None: - self.wait_time = 1 if self.limit > 3000 else 0 - - # Telegram has a hard limit of 100. - # We don't need to fetch 100 if the limit is less. - self.batch_size = min(max(batch_size, 1), min(100, self.limit)) - self.add_offset = add_offset - self.max_id = max_id - self.min_id = min_id - self.last_id = 0 if self.reverse else float('inf') - - async def _load_next_chunk(self): - result = [] - - self.request.limit = min(self.left, self.batch_size) - if self.reverse and self.request.limit != self.batch_size: - # Remember that we need -limit when going in reverse - self.request.add_offset = self.add_offset - self.request.limit - - r = await self.client(self.request) - self.total = getattr(r, 'count', len(r.messages)) - - entities = {utils.get_peer_id(x): x - for x in itertools.chain(r.users, r.chats)} - - messages = reversed(r.messages) if self.reverse else r.messages - for message in messages: - if (isinstance(message, types.MessageEmpty) - or self.from_id and message.from_id != self.from_id): - continue - - # TODO We used to yield and return here (stopping the iterator) - # How should we go around that here? - if self.reverse: - if message.id <= self.last_id or message.id >= self.max_id: - break - else: - if message.id >= self.last_id or message.id <= self.min_id: - break - - # There has been reports that on bad connections this method - # was returning duplicated IDs sometimes. Using ``last_id`` - # is an attempt to avoid these duplicates, since the message - # IDs are returned in descending order (or asc if reverse). - self.last_id = message.id - message._finish_init(self.client, entities, self.entity) - result.append(message) - - if len(r.messages) < self.request.limit: - return result - - # Find the first message that's not empty (in some rare cases - # it can happen that the last message is :tl:`MessageEmpty`) - last_message = None - messages = r.messages if self.reverse else reversed(r.messages) - for m in messages: - if not isinstance(m, types.MessageEmpty): - last_message = m - break - - # TODO If it's None, we used to break (ending the iterator) - # Similar case as the return above. - if last_message is not None: - # There are some cases where all the messages we get start - # being empty. This can happen on migrated mega-groups if - # the history was cleared, and we're using search. Telegram - # acts incredibly weird sometimes. Messages are returned but - # only "empty", not their contents. If this is the case we - # should just give up since there won't be any new Message. - self.request.offset_id = last_message.id - self.request.offset_date = last_message.date - if self.reverse: - # We want to skip the one we already have - self.request.offset_id += 1 - - return result - - -class _SearchMessagesIter(RequestIter): - async def _init(self, entity, offset_id, min_id, max_id, from_user, batch_size, offset_date, add_offset, filter, search): - self.entity = await self.client.get_input_entity(entity) - - # Telegram doesn't like min_id/max_id. If these IDs are low enough - # (starting from last_id - 100), the request will return nothing. - # - # We can emulate their behaviour locally by setting offset = max_id - # and simply stopping once we hit a message with ID <= min_id. - if self.reverse: - offset_id = max(offset_id, min_id) - if offset_id and max_id: - if max_id - offset_id <= 1: - raise StopAsyncIteration - - if not max_id: - max_id = float('inf') - else: - offset_id = max(offset_id, max_id) - if offset_id and min_id: - if offset_id - min_id <= 1: - raise StopAsyncIteration - - if self.reverse: - if offset_id: - offset_id += 1 - else: - offset_id = 1 - if from_user: - from_user = await self.client.get_input_entity(from_user) - if not isinstance(from_user, ( - types.InputPeerUser, types.InputPeerSelf)): - from_user = None # Ignore from_user unless it's a user - - self.from_id = (await self.client.get_peer_id(from_user)) if from_user else None - - if filter is None: - filter = types.InputMessagesFilterEmpty() - - # Telegram completely ignores `from_id` in private chats - if isinstance(entity, (types.InputPeerUser, types.InputPeerSelf)): - # Don't bother sending `from_user` (it's ignored anyway), - # but keep `from_id` defined above to check it locally. - from_user = None + self.from_id = await self.client.get_peer_id(from_user) else: - # Do send `from_user` to do the filtering server-side, - # and set `from_id` to None to avoid checking it locally. self.from_id = None - self.request = functions.messages.SearchRequest( - peer=entity, - q=search or '', - filter=filter() if isinstance(filter, type) else filter, - min_date=None, - max_date=offset_date, - offset_id=offset_id, - add_offset=add_offset, - limit=0, # Search actually returns 0 items if we ask it to - max_id=0, - min_id=0, - hash=0, - from_id=from_user - ) + if not self.entity: + self.request = functions.messages.SearchGlobalRequest( + q=search or '', + offset_date=offset_date, + offset_peer=types.InputPeerEmpty(), + offset_id=offset_id, + limit=1 + ) + elif search is not None or filter or from_user: + if filter is None: + filter = types.InputMessagesFilterEmpty() + + # Telegram completely ignores `from_id` in private chats + if isinstance( + self.entity, (types.InputPeerUser, types.InputPeerSelf)): + # Don't bother sending `from_user` (it's ignored anyway), + # but keep `from_id` defined above to check it locally. + from_user = None + else: + # Do send `from_user` to do the filtering server-side, + # and set `from_id` to None to avoid checking it locally. + self.from_id = None + + self.request = functions.messages.SearchRequest( + peer=self.entity, + q=search or '', + filter=filter() if isinstance(filter, type) else filter, + min_date=None, + max_date=offset_date, + offset_id=offset_id, + add_offset=add_offset, + limit=0, # Search actually returns 0 items if we ask it to + max_id=0, + min_id=0, + hash=0, + from_id=from_user + ) + else: + self.request = functions.messages.GetHistoryRequest( + peer=self.entity, + limit=1, + offset_date=offset_date, + offset_id=offset_id, + min_id=0, + max_id=0, + add_offset=add_offset, + hash=0 + ) if self.limit == 0: # No messages, but we still need to know the total message count @@ -227,19 +124,20 @@ class _SearchMessagesIter(RequestIter): # When going in reverse we need an offset of `-limit`, but we # also want to respect what the user passed, so add them together. if self.reverse: - self.request.add_offset -= batch_size + self.request.add_offset -= self.batch_size if self.wait_time is None: self.wait_time = 1 if self.limit > 3000 else 0 - # Telegram has a hard limit of 100. - # We don't need to fetch 100 if the limit is less. - self.batch_size = min(max(batch_size, 1), min(100, self.limit)) self.add_offset = add_offset self.max_id = max_id self.min_id = min_id self.last_id = 0 if self.reverse else float('inf') + # Telegram has a hard limit of 100. + # We don't need to fetch 100 if the limit is less. + self.batch_size = min(max(batch_size, 1), min(100, self.limit)) + async def _load_next_chunk(self): result = [] @@ -260,14 +158,9 @@ class _SearchMessagesIter(RequestIter): or self.from_id and message.from_id != self.from_id): continue - # TODO We used to yield and return here (stopping the iterator) - # How should we go around that here? - if self.reverse: - if message.id <= self.last_id or message.id >= self.max_id: - break - else: - if message.id >= self.last_id or message.id <= self.min_id: - break + if not self._message_in_range(message): + self.left = len(result) + break # There has been reports that on bad connections this method # was returning duplicated IDs sometimes. Using ``last_id`` @@ -278,34 +171,74 @@ class _SearchMessagesIter(RequestIter): result.append(message) if len(r.messages) < self.request.limit: - return result + self.left = len(result) - # Find the first message that's not empty (in some rare cases + # Get the first message that's not empty (in some rare cases # it can happen that the last message is :tl:`MessageEmpty`) - last_message = None - messages = r.messages if self.reverse else reversed(r.messages) - for m in messages: - if not isinstance(m, types.MessageEmpty): - last_message = m - break - - # TODO If it's None, we used to break (ending the iterator) - # Similar case as the return above. - if last_message is not None: + if result: + self._update_offset(result[0]) + else: # There are some cases where all the messages we get start # being empty. This can happen on migrated mega-groups if # the history was cleared, and we're using search. Telegram # acts incredibly weird sometimes. Messages are returned but # only "empty", not their contents. If this is the case we # should just give up since there won't be any new Message. - self.request.offset_id = last_message.id - self.request.max_date = last_message.date # not offset_date - if self.reverse: - # We want to skip the one we already have - self.request.offset_id += 1 + self.left = len(result) return result + def _message_in_range(self, message): + """ + Determine whether the given message is in the range or + it should be ignored (and avoid loading more chunks). + """ + # No entity means message IDs between chats may vary + if self.entity: + if self.reverse: + if message.id <= self.last_id or message.id >= self.max_id: + return False + else: + if message.id >= self.last_id or message.id <= self.min_id: + return False + + return True + + def _update_offset(self, last_message): + """ + After making the request, update its offset with the last message. + """ + self.request.offset_id = last_message.id + if self.reverse: + # We want to skip the one we already have + self.request.offset_id += 1 + + if isinstance(self.request, functions.messages.SearchRequest): + self.request.max_date = last_message.date + else: + # getHistory and searchGlobal call it offset_date + self.request.offset_date = last_message.date + + if isinstance(self.request, functions.messages.SearchGlobalRequest): + self.request.offset_peer = last_message.input_chat + + +class _IDsIter(RequestIter): + async def _init(self, entity, from_user, ids): + if not utils.is_list_like(ids): + self.ids = [ids] + elif not ids: + raise StopAsyncIteration + elif self.reverse: + self.ids = list(reversed(ids)) + else: + self.ids = ids + + raise NotImplementedError + + async def _load_next_chunk(self): + raise NotImplementedError + class MessageMethods(UploadMethods, ButtonMethods, MessageParseMethods): @@ -428,242 +361,26 @@ class MessageMethods(UploadMethods, ButtonMethods, MessageParseMethods): an higher limit, so you're free to set the ``batch_size`` that you think may be good. """ - # TODO Handle global search - # TODO Handle yield IDs - # TODO Reuse code between search, global, get history - if search is not None or filter or from_user: - return _SearchMessagesIter( - self, - limit, - entity=entity, - offset_id=offset_id, - min_id=min_id, - max_id=max_id, - from_user=from_user, - batch_size=batch_size, - offset_date=offset_date, - add_offset=add_offset, - filter=filter, - search=search - ) - else: - return _GetHistoryIter( - self, - limit, - wait_time=wait_time, - entity=entity, - reverse=reverse, - offset_id=offset_id, - min_id=min_id, - max_id=max_id, - from_user=from_user, - batch_size=batch_size, - offset_date=offset_date, - add_offset=add_offset - ) - # Note that entity being ``None`` is intended to get messages by - # ID under no specific chat, and also to request a global search. - if entity: - entity = await self.get_input_entity(entity) + if ids is not None: + return _IDsIter(self, limit, entity=entity, ids=ids) - if ids: - if not utils.is_list_like(ids): - ids = (ids,) - if reverse: - ids = list(reversed(ids)) - async for x in self._iter_ids(entity, ids, total=_total): - await yield_(x) - return - - # Telegram doesn't like min_id/max_id. If these IDs are low enough - # (starting from last_id - 100), the request will return nothing. - # - # We can emulate their behaviour locally by setting offset = max_id - # and simply stopping once we hit a message with ID <= min_id. - if reverse: - offset_id = max(offset_id, min_id) - if offset_id and max_id: - if max_id - offset_id <= 1: - return - - if not max_id: - max_id = float('inf') - else: - offset_id = max(offset_id, max_id) - if offset_id and min_id: - if offset_id - min_id <= 1: - return - - if reverse: - if offset_id: - offset_id += 1 - else: - offset_id = 1 - - if from_user: - from_user = await self.get_input_entity(from_user) - if not isinstance(from_user, ( - types.InputPeerUser, types.InputPeerSelf)): - from_user = None # Ignore from_user unless it's a user - - from_id = (await self.get_peer_id(from_user)) if from_user else None - - limit = float('inf') if limit is None else int(limit) - if not entity: - if reverse: - raise ValueError('Cannot reverse global search') - - reverse = None - request = functions.messages.SearchGlobalRequest( - q=search or '', - offset_date=offset_date, - offset_peer=types.InputPeerEmpty(), - offset_id=offset_id, - limit=1 - ) - elif search is not None or filter or from_user: - if filter is None: - filter = types.InputMessagesFilterEmpty() - - # Telegram completely ignores `from_id` in private chats - if isinstance(entity, (types.InputPeerUser, types.InputPeerSelf)): - # Don't bother sending `from_user` (it's ignored anyway), - # but keep `from_id` defined above to check it locally. - from_user = None - else: - # Do send `from_user` to do the filtering server-side, - # and set `from_id` to None to avoid checking it locally. - from_id = None - - request = functions.messages.SearchRequest( - peer=entity, - q=search or '', - filter=filter() if isinstance(filter, type) else filter, - min_date=None, - max_date=offset_date, - offset_id=offset_id, - add_offset=add_offset, - limit=0, # Search actually returns 0 items if we ask it to - max_id=0, - min_id=0, - hash=0, - from_id=from_user - ) - else: - request = functions.messages.GetHistoryRequest( - peer=entity, - limit=1, - offset_date=offset_date, - offset_id=offset_id, - min_id=0, - max_id=0, - add_offset=add_offset, - hash=0 - ) - - if limit == 0: - if not _total: - return - # No messages, but we still need to know the total message count - result = await self(request) - if isinstance(result, types.messages.MessagesNotModified): - _total[0] = result.count - else: - _total[0] = getattr(result, 'count', len(result.messages)) - return - - if wait_time is None: - wait_time = 1 if limit > 3000 else 0 - - have = 0 - last_id = 0 if reverse else float('inf') - - # Telegram has a hard limit of 100. - # We don't need to fetch 100 if the limit is less. - batch_size = min(max(batch_size, 1), min(100, limit)) - - # When going in reverse we need an offset of `-limit`, but we - # also want to respect what the user passed, so add them together. - if reverse: - request.add_offset -= batch_size - - while have < limit: - start = time.time() - - request.limit = min(limit - have, batch_size) - if reverse and request.limit != batch_size: - # Remember that we need -limit when going in reverse - request.add_offset = add_offset - request.limit - - r = await self(request) - if _total: - _total[0] = getattr(r, 'count', len(r.messages)) - - entities = {utils.get_peer_id(x): x - for x in itertools.chain(r.users, r.chats)} - - messages = reversed(r.messages) if reverse else r.messages - for message in messages: - if (isinstance(message, types.MessageEmpty) - or from_id and message.from_id != from_id): - continue - - if reverse is None: - pass - elif reverse: - if message.id <= last_id or message.id >= max_id: - return - else: - if message.id >= last_id or message.id <= min_id: - return - - # There has been reports that on bad connections this method - # was returning duplicated IDs sometimes. Using ``last_id`` - # is an attempt to avoid these duplicates, since the message - # IDs are returned in descending order (or asc if reverse). - last_id = message.id - - message._finish_init(self, entities, entity) - await yield_(message) - have += 1 - - if len(r.messages) < request.limit: - break - - # Find the first message that's not empty (in some rare cases - # it can happen that the last message is :tl:`MessageEmpty`) - last_message = None - messages = r.messages if reverse else reversed(r.messages) - for m in messages: - if not isinstance(m, types.MessageEmpty): - last_message = m - break - - if last_message is None: - # There are some cases where all the messages we get start - # being empty. This can happen on migrated mega-groups if - # the history was cleared, and we're using search. Telegram - # acts incredibly weird sometimes. Messages are returned but - # only "empty", not their contents. If this is the case we - # should just give up since there won't be any new Message. - break - else: - request.offset_id = last_message.id - if isinstance(request, functions.messages.SearchRequest): - request.max_date = last_message.date - else: - # getHistory and searchGlobal call it offset_date - request.offset_date = last_message.date - - if isinstance(request, functions.messages.SearchGlobalRequest): - request.offset_peer = last_message.input_chat - elif reverse: - # We want to skip the one we already have - request.offset_id += 1 - - await asyncio.sleep( - max(wait_time - (time.time() - start), 0), loop=self._loop) + return _MessagesIter( + client=self, + reverse=reverse, + wait_time=wait_time, + limit=limit, + entity=entity, + offset_id=offset_id, + min_id=min_id, + max_id=max_id, + from_user=from_user, + batch_size=batch_size, + offset_date=offset_date, + add_offset=add_offset, + filter=filter, + search=search + ) async def get_messages(self, *args, **kwargs): """ @@ -682,6 +399,7 @@ class MessageMethods(UploadMethods, ButtonMethods, MessageParseMethods): a single `Message ` will be returned for convenience instead of a list. """ + # TODO Make RequestIter have a .collect() or similar total = [0] kwargs['_total'] = total if len(args) == 1 and 'limit' not in kwargs: diff --git a/telethon/requestiter.py b/telethon/requestiter.py index d8ccff53..ba897a2a 100644 --- a/telethon/requestiter.py +++ b/telethon/requestiter.py @@ -3,6 +3,10 @@ import asyncio import time +# TODO There are two types of iterators for requests. +# One has a limit of items to retrieve, and the +# other has a list that must be called in chunks. +# Make classes for both here so it's easy to use. class RequestIter(abc.ABC): """ Helper class to deal with requests that need offsets to iterate. @@ -50,6 +54,9 @@ class RequestIter(abc.ABC): if self.buffer is (): await self._init(**self.kwargs) + if self.left <= 0: # <= 0 because subclasses may change it + raise StopAsyncIteration + if self.index == len(self.buffer): # asyncio will handle times <= 0 to sleep 0 seconds if self.wait_time: @@ -84,7 +91,7 @@ class RequestIter(abc.ABC): 'is running (i.e. you are inside an "async def")' ) - raise NotImplementedError('lol!') + return self.__aiter__() @abc.abstractmethod async def _load_next_chunk(self):