mirror of
https://github.com/LonamiWebs/Telethon.git
synced 2024-11-22 17:36:34 +03:00
Make iter_messages use a common message iterator
This commit is contained in:
parent
19f38d6733
commit
e2f44ddbea
|
@ -1,6 +1,4 @@
|
||||||
import asyncio
|
|
||||||
import itertools
|
import itertools
|
||||||
import time
|
|
||||||
|
|
||||||
from .messageparse import MessageParseMethods
|
from .messageparse import MessageParseMethods
|
||||||
from .uploads import UploadMethods
|
from .uploads import UploadMethods
|
||||||
|
@ -10,9 +8,24 @@ from ..tl import types, functions
|
||||||
from ..requestiter import RequestIter
|
from ..requestiter import RequestIter
|
||||||
|
|
||||||
|
|
||||||
class _GetHistoryIter(RequestIter):
|
# TODO Maybe RequestIter could rather have the update offset here?
|
||||||
async def _init(self, entity, offset_id, min_id, max_id, from_user, batch_size, offset_date, add_offset):
|
# Maybe init should return the request to be used and it be
|
||||||
self.entity = await self.client.get_input_entity(entity)
|
# called automatically? And another method to just process it.
|
||||||
|
class _MessagesIter(RequestIter):
|
||||||
|
"""
|
||||||
|
Common factor for all requests that need to iterate over messages.
|
||||||
|
"""
|
||||||
|
async def _init(
|
||||||
|
self, entity, offset_id, min_id, max_id, from_user,
|
||||||
|
batch_size, offset_date, add_offset, filter, search
|
||||||
|
):
|
||||||
|
# Note that entity being ``None`` will perform a global search.
|
||||||
|
if entity:
|
||||||
|
self.entity = await self.client.get_input_entity(entity)
|
||||||
|
else:
|
||||||
|
self.entity = None
|
||||||
|
if self.reverse:
|
||||||
|
raise ValueError('Cannot reverse global search')
|
||||||
|
|
||||||
# Telegram doesn't like min_id/max_id. If these IDs are low enough
|
# Telegram doesn't like min_id/max_id. If these IDs are low enough
|
||||||
# (starting from last_id - 100), the request will return nothing.
|
# (starting from last_id - 100), the request will return nothing.
|
||||||
|
@ -45,175 +58,59 @@ class _GetHistoryIter(RequestIter):
|
||||||
types.InputPeerUser, types.InputPeerSelf)):
|
types.InputPeerUser, types.InputPeerSelf)):
|
||||||
from_user = None # Ignore from_user unless it's a user
|
from_user = None # Ignore from_user unless it's a user
|
||||||
|
|
||||||
self.from_id = (await self.client.get_peer_id(from_user)) if from_user else None
|
|
||||||
|
|
||||||
self.request = functions.messages.GetHistoryRequest(
|
|
||||||
peer=entity,
|
|
||||||
limit=1,
|
|
||||||
offset_date=offset_date,
|
|
||||||
offset_id=offset_id,
|
|
||||||
min_id=0,
|
|
||||||
max_id=0,
|
|
||||||
add_offset=add_offset,
|
|
||||||
hash=0
|
|
||||||
)
|
|
||||||
|
|
||||||
if self.limit == 0:
|
|
||||||
# No messages, but we still need to know the total message count
|
|
||||||
result = await self.client(self.request)
|
|
||||||
if isinstance(result, types.messages.MessagesNotModified):
|
|
||||||
self.total = result.count
|
|
||||||
else:
|
|
||||||
self.total = getattr(result, 'count', len(result.messages))
|
|
||||||
raise StopAsyncIteration
|
|
||||||
|
|
||||||
# When going in reverse we need an offset of `-limit`, but we
|
|
||||||
# also want to respect what the user passed, so add them together.
|
|
||||||
if self.reverse:
|
|
||||||
self.request.add_offset -= batch_size
|
|
||||||
|
|
||||||
if self.wait_time is None:
|
|
||||||
self.wait_time = 1 if self.limit > 3000 else 0
|
|
||||||
|
|
||||||
# Telegram has a hard limit of 100.
|
|
||||||
# We don't need to fetch 100 if the limit is less.
|
|
||||||
self.batch_size = min(max(batch_size, 1), min(100, self.limit))
|
|
||||||
self.add_offset = add_offset
|
|
||||||
self.max_id = max_id
|
|
||||||
self.min_id = min_id
|
|
||||||
self.last_id = 0 if self.reverse else float('inf')
|
|
||||||
|
|
||||||
async def _load_next_chunk(self):
|
|
||||||
result = []
|
|
||||||
|
|
||||||
self.request.limit = min(self.left, self.batch_size)
|
|
||||||
if self.reverse and self.request.limit != self.batch_size:
|
|
||||||
# Remember that we need -limit when going in reverse
|
|
||||||
self.request.add_offset = self.add_offset - self.request.limit
|
|
||||||
|
|
||||||
r = await self.client(self.request)
|
|
||||||
self.total = getattr(r, 'count', len(r.messages))
|
|
||||||
|
|
||||||
entities = {utils.get_peer_id(x): x
|
|
||||||
for x in itertools.chain(r.users, r.chats)}
|
|
||||||
|
|
||||||
messages = reversed(r.messages) if self.reverse else r.messages
|
|
||||||
for message in messages:
|
|
||||||
if (isinstance(message, types.MessageEmpty)
|
|
||||||
or self.from_id and message.from_id != self.from_id):
|
|
||||||
continue
|
|
||||||
|
|
||||||
# TODO We used to yield and return here (stopping the iterator)
|
|
||||||
# How should we go around that here?
|
|
||||||
if self.reverse:
|
|
||||||
if message.id <= self.last_id or message.id >= self.max_id:
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
if message.id >= self.last_id or message.id <= self.min_id:
|
|
||||||
break
|
|
||||||
|
|
||||||
# There has been reports that on bad connections this method
|
|
||||||
# was returning duplicated IDs sometimes. Using ``last_id``
|
|
||||||
# is an attempt to avoid these duplicates, since the message
|
|
||||||
# IDs are returned in descending order (or asc if reverse).
|
|
||||||
self.last_id = message.id
|
|
||||||
message._finish_init(self.client, entities, self.entity)
|
|
||||||
result.append(message)
|
|
||||||
|
|
||||||
if len(r.messages) < self.request.limit:
|
|
||||||
return result
|
|
||||||
|
|
||||||
# Find the first message that's not empty (in some rare cases
|
|
||||||
# it can happen that the last message is :tl:`MessageEmpty`)
|
|
||||||
last_message = None
|
|
||||||
messages = r.messages if self.reverse else reversed(r.messages)
|
|
||||||
for m in messages:
|
|
||||||
if not isinstance(m, types.MessageEmpty):
|
|
||||||
last_message = m
|
|
||||||
break
|
|
||||||
|
|
||||||
# TODO If it's None, we used to break (ending the iterator)
|
|
||||||
# Similar case as the return above.
|
|
||||||
if last_message is not None:
|
|
||||||
# There are some cases where all the messages we get start
|
|
||||||
# being empty. This can happen on migrated mega-groups if
|
|
||||||
# the history was cleared, and we're using search. Telegram
|
|
||||||
# acts incredibly weird sometimes. Messages are returned but
|
|
||||||
# only "empty", not their contents. If this is the case we
|
|
||||||
# should just give up since there won't be any new Message.
|
|
||||||
self.request.offset_id = last_message.id
|
|
||||||
self.request.offset_date = last_message.date
|
|
||||||
if self.reverse:
|
|
||||||
# We want to skip the one we already have
|
|
||||||
self.request.offset_id += 1
|
|
||||||
|
|
||||||
return result
|
|
||||||
|
|
||||||
|
|
||||||
class _SearchMessagesIter(RequestIter):
|
|
||||||
async def _init(self, entity, offset_id, min_id, max_id, from_user, batch_size, offset_date, add_offset, filter, search):
|
|
||||||
self.entity = await self.client.get_input_entity(entity)
|
|
||||||
|
|
||||||
# Telegram doesn't like min_id/max_id. If these IDs are low enough
|
|
||||||
# (starting from last_id - 100), the request will return nothing.
|
|
||||||
#
|
|
||||||
# We can emulate their behaviour locally by setting offset = max_id
|
|
||||||
# and simply stopping once we hit a message with ID <= min_id.
|
|
||||||
if self.reverse:
|
|
||||||
offset_id = max(offset_id, min_id)
|
|
||||||
if offset_id and max_id:
|
|
||||||
if max_id - offset_id <= 1:
|
|
||||||
raise StopAsyncIteration
|
|
||||||
|
|
||||||
if not max_id:
|
|
||||||
max_id = float('inf')
|
|
||||||
else:
|
|
||||||
offset_id = max(offset_id, max_id)
|
|
||||||
if offset_id and min_id:
|
|
||||||
if offset_id - min_id <= 1:
|
|
||||||
raise StopAsyncIteration
|
|
||||||
|
|
||||||
if self.reverse:
|
|
||||||
if offset_id:
|
|
||||||
offset_id += 1
|
|
||||||
else:
|
|
||||||
offset_id = 1
|
|
||||||
|
|
||||||
if from_user:
|
if from_user:
|
||||||
from_user = await self.client.get_input_entity(from_user)
|
self.from_id = await self.client.get_peer_id(from_user)
|
||||||
if not isinstance(from_user, (
|
|
||||||
types.InputPeerUser, types.InputPeerSelf)):
|
|
||||||
from_user = None # Ignore from_user unless it's a user
|
|
||||||
|
|
||||||
self.from_id = (await self.client.get_peer_id(from_user)) if from_user else None
|
|
||||||
|
|
||||||
if filter is None:
|
|
||||||
filter = types.InputMessagesFilterEmpty()
|
|
||||||
|
|
||||||
# Telegram completely ignores `from_id` in private chats
|
|
||||||
if isinstance(entity, (types.InputPeerUser, types.InputPeerSelf)):
|
|
||||||
# Don't bother sending `from_user` (it's ignored anyway),
|
|
||||||
# but keep `from_id` defined above to check it locally.
|
|
||||||
from_user = None
|
|
||||||
else:
|
else:
|
||||||
# Do send `from_user` to do the filtering server-side,
|
|
||||||
# and set `from_id` to None to avoid checking it locally.
|
|
||||||
self.from_id = None
|
self.from_id = None
|
||||||
|
|
||||||
self.request = functions.messages.SearchRequest(
|
if not self.entity:
|
||||||
peer=entity,
|
self.request = functions.messages.SearchGlobalRequest(
|
||||||
q=search or '',
|
q=search or '',
|
||||||
filter=filter() if isinstance(filter, type) else filter,
|
offset_date=offset_date,
|
||||||
min_date=None,
|
offset_peer=types.InputPeerEmpty(),
|
||||||
max_date=offset_date,
|
offset_id=offset_id,
|
||||||
offset_id=offset_id,
|
limit=1
|
||||||
add_offset=add_offset,
|
)
|
||||||
limit=0, # Search actually returns 0 items if we ask it to
|
elif search is not None or filter or from_user:
|
||||||
max_id=0,
|
if filter is None:
|
||||||
min_id=0,
|
filter = types.InputMessagesFilterEmpty()
|
||||||
hash=0,
|
|
||||||
from_id=from_user
|
# Telegram completely ignores `from_id` in private chats
|
||||||
)
|
if isinstance(
|
||||||
|
self.entity, (types.InputPeerUser, types.InputPeerSelf)):
|
||||||
|
# Don't bother sending `from_user` (it's ignored anyway),
|
||||||
|
# but keep `from_id` defined above to check it locally.
|
||||||
|
from_user = None
|
||||||
|
else:
|
||||||
|
# Do send `from_user` to do the filtering server-side,
|
||||||
|
# and set `from_id` to None to avoid checking it locally.
|
||||||
|
self.from_id = None
|
||||||
|
|
||||||
|
self.request = functions.messages.SearchRequest(
|
||||||
|
peer=self.entity,
|
||||||
|
q=search or '',
|
||||||
|
filter=filter() if isinstance(filter, type) else filter,
|
||||||
|
min_date=None,
|
||||||
|
max_date=offset_date,
|
||||||
|
offset_id=offset_id,
|
||||||
|
add_offset=add_offset,
|
||||||
|
limit=0, # Search actually returns 0 items if we ask it to
|
||||||
|
max_id=0,
|
||||||
|
min_id=0,
|
||||||
|
hash=0,
|
||||||
|
from_id=from_user
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
self.request = functions.messages.GetHistoryRequest(
|
||||||
|
peer=self.entity,
|
||||||
|
limit=1,
|
||||||
|
offset_date=offset_date,
|
||||||
|
offset_id=offset_id,
|
||||||
|
min_id=0,
|
||||||
|
max_id=0,
|
||||||
|
add_offset=add_offset,
|
||||||
|
hash=0
|
||||||
|
)
|
||||||
|
|
||||||
if self.limit == 0:
|
if self.limit == 0:
|
||||||
# No messages, but we still need to know the total message count
|
# No messages, but we still need to know the total message count
|
||||||
|
@ -227,19 +124,20 @@ class _SearchMessagesIter(RequestIter):
|
||||||
# When going in reverse we need an offset of `-limit`, but we
|
# When going in reverse we need an offset of `-limit`, but we
|
||||||
# also want to respect what the user passed, so add them together.
|
# also want to respect what the user passed, so add them together.
|
||||||
if self.reverse:
|
if self.reverse:
|
||||||
self.request.add_offset -= batch_size
|
self.request.add_offset -= self.batch_size
|
||||||
|
|
||||||
if self.wait_time is None:
|
if self.wait_time is None:
|
||||||
self.wait_time = 1 if self.limit > 3000 else 0
|
self.wait_time = 1 if self.limit > 3000 else 0
|
||||||
|
|
||||||
# Telegram has a hard limit of 100.
|
|
||||||
# We don't need to fetch 100 if the limit is less.
|
|
||||||
self.batch_size = min(max(batch_size, 1), min(100, self.limit))
|
|
||||||
self.add_offset = add_offset
|
self.add_offset = add_offset
|
||||||
self.max_id = max_id
|
self.max_id = max_id
|
||||||
self.min_id = min_id
|
self.min_id = min_id
|
||||||
self.last_id = 0 if self.reverse else float('inf')
|
self.last_id = 0 if self.reverse else float('inf')
|
||||||
|
|
||||||
|
# Telegram has a hard limit of 100.
|
||||||
|
# We don't need to fetch 100 if the limit is less.
|
||||||
|
self.batch_size = min(max(batch_size, 1), min(100, self.limit))
|
||||||
|
|
||||||
async def _load_next_chunk(self):
|
async def _load_next_chunk(self):
|
||||||
result = []
|
result = []
|
||||||
|
|
||||||
|
@ -260,14 +158,9 @@ class _SearchMessagesIter(RequestIter):
|
||||||
or self.from_id and message.from_id != self.from_id):
|
or self.from_id and message.from_id != self.from_id):
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# TODO We used to yield and return here (stopping the iterator)
|
if not self._message_in_range(message):
|
||||||
# How should we go around that here?
|
self.left = len(result)
|
||||||
if self.reverse:
|
break
|
||||||
if message.id <= self.last_id or message.id >= self.max_id:
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
if message.id >= self.last_id or message.id <= self.min_id:
|
|
||||||
break
|
|
||||||
|
|
||||||
# There has been reports that on bad connections this method
|
# There has been reports that on bad connections this method
|
||||||
# was returning duplicated IDs sometimes. Using ``last_id``
|
# was returning duplicated IDs sometimes. Using ``last_id``
|
||||||
|
@ -278,34 +171,74 @@ class _SearchMessagesIter(RequestIter):
|
||||||
result.append(message)
|
result.append(message)
|
||||||
|
|
||||||
if len(r.messages) < self.request.limit:
|
if len(r.messages) < self.request.limit:
|
||||||
return result
|
self.left = len(result)
|
||||||
|
|
||||||
# Find the first message that's not empty (in some rare cases
|
# Get the first message that's not empty (in some rare cases
|
||||||
# it can happen that the last message is :tl:`MessageEmpty`)
|
# it can happen that the last message is :tl:`MessageEmpty`)
|
||||||
last_message = None
|
if result:
|
||||||
messages = r.messages if self.reverse else reversed(r.messages)
|
self._update_offset(result[0])
|
||||||
for m in messages:
|
else:
|
||||||
if not isinstance(m, types.MessageEmpty):
|
|
||||||
last_message = m
|
|
||||||
break
|
|
||||||
|
|
||||||
# TODO If it's None, we used to break (ending the iterator)
|
|
||||||
# Similar case as the return above.
|
|
||||||
if last_message is not None:
|
|
||||||
# There are some cases where all the messages we get start
|
# There are some cases where all the messages we get start
|
||||||
# being empty. This can happen on migrated mega-groups if
|
# being empty. This can happen on migrated mega-groups if
|
||||||
# the history was cleared, and we're using search. Telegram
|
# the history was cleared, and we're using search. Telegram
|
||||||
# acts incredibly weird sometimes. Messages are returned but
|
# acts incredibly weird sometimes. Messages are returned but
|
||||||
# only "empty", not their contents. If this is the case we
|
# only "empty", not their contents. If this is the case we
|
||||||
# should just give up since there won't be any new Message.
|
# should just give up since there won't be any new Message.
|
||||||
self.request.offset_id = last_message.id
|
self.left = len(result)
|
||||||
self.request.max_date = last_message.date # not offset_date
|
|
||||||
if self.reverse:
|
|
||||||
# We want to skip the one we already have
|
|
||||||
self.request.offset_id += 1
|
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
def _message_in_range(self, message):
|
||||||
|
"""
|
||||||
|
Determine whether the given message is in the range or
|
||||||
|
it should be ignored (and avoid loading more chunks).
|
||||||
|
"""
|
||||||
|
# No entity means message IDs between chats may vary
|
||||||
|
if self.entity:
|
||||||
|
if self.reverse:
|
||||||
|
if message.id <= self.last_id or message.id >= self.max_id:
|
||||||
|
return False
|
||||||
|
else:
|
||||||
|
if message.id >= self.last_id or message.id <= self.min_id:
|
||||||
|
return False
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
def _update_offset(self, last_message):
|
||||||
|
"""
|
||||||
|
After making the request, update its offset with the last message.
|
||||||
|
"""
|
||||||
|
self.request.offset_id = last_message.id
|
||||||
|
if self.reverse:
|
||||||
|
# We want to skip the one we already have
|
||||||
|
self.request.offset_id += 1
|
||||||
|
|
||||||
|
if isinstance(self.request, functions.messages.SearchRequest):
|
||||||
|
self.request.max_date = last_message.date
|
||||||
|
else:
|
||||||
|
# getHistory and searchGlobal call it offset_date
|
||||||
|
self.request.offset_date = last_message.date
|
||||||
|
|
||||||
|
if isinstance(self.request, functions.messages.SearchGlobalRequest):
|
||||||
|
self.request.offset_peer = last_message.input_chat
|
||||||
|
|
||||||
|
|
||||||
|
class _IDsIter(RequestIter):
|
||||||
|
async def _init(self, entity, from_user, ids):
|
||||||
|
if not utils.is_list_like(ids):
|
||||||
|
self.ids = [ids]
|
||||||
|
elif not ids:
|
||||||
|
raise StopAsyncIteration
|
||||||
|
elif self.reverse:
|
||||||
|
self.ids = list(reversed(ids))
|
||||||
|
else:
|
||||||
|
self.ids = ids
|
||||||
|
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
async def _load_next_chunk(self):
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
|
||||||
class MessageMethods(UploadMethods, ButtonMethods, MessageParseMethods):
|
class MessageMethods(UploadMethods, ButtonMethods, MessageParseMethods):
|
||||||
|
|
||||||
|
@ -428,242 +361,26 @@ class MessageMethods(UploadMethods, ButtonMethods, MessageParseMethods):
|
||||||
an higher limit, so you're free to set the ``batch_size`` that
|
an higher limit, so you're free to set the ``batch_size`` that
|
||||||
you think may be good.
|
you think may be good.
|
||||||
"""
|
"""
|
||||||
# TODO Handle global search
|
|
||||||
# TODO Handle yield IDs
|
|
||||||
# TODO Reuse code between search, global, get history
|
|
||||||
if search is not None or filter or from_user:
|
|
||||||
return _SearchMessagesIter(
|
|
||||||
self,
|
|
||||||
limit,
|
|
||||||
entity=entity,
|
|
||||||
offset_id=offset_id,
|
|
||||||
min_id=min_id,
|
|
||||||
max_id=max_id,
|
|
||||||
from_user=from_user,
|
|
||||||
batch_size=batch_size,
|
|
||||||
offset_date=offset_date,
|
|
||||||
add_offset=add_offset,
|
|
||||||
filter=filter,
|
|
||||||
search=search
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
return _GetHistoryIter(
|
|
||||||
self,
|
|
||||||
limit,
|
|
||||||
wait_time=wait_time,
|
|
||||||
entity=entity,
|
|
||||||
reverse=reverse,
|
|
||||||
offset_id=offset_id,
|
|
||||||
min_id=min_id,
|
|
||||||
max_id=max_id,
|
|
||||||
from_user=from_user,
|
|
||||||
batch_size=batch_size,
|
|
||||||
offset_date=offset_date,
|
|
||||||
add_offset=add_offset
|
|
||||||
)
|
|
||||||
|
|
||||||
# Note that entity being ``None`` is intended to get messages by
|
if ids is not None:
|
||||||
# ID under no specific chat, and also to request a global search.
|
return _IDsIter(self, limit, entity=entity, ids=ids)
|
||||||
if entity:
|
|
||||||
entity = await self.get_input_entity(entity)
|
|
||||||
|
|
||||||
if ids:
|
return _MessagesIter(
|
||||||
if not utils.is_list_like(ids):
|
client=self,
|
||||||
ids = (ids,)
|
reverse=reverse,
|
||||||
if reverse:
|
wait_time=wait_time,
|
||||||
ids = list(reversed(ids))
|
limit=limit,
|
||||||
async for x in self._iter_ids(entity, ids, total=_total):
|
entity=entity,
|
||||||
await yield_(x)
|
offset_id=offset_id,
|
||||||
return
|
min_id=min_id,
|
||||||
|
max_id=max_id,
|
||||||
# Telegram doesn't like min_id/max_id. If these IDs are low enough
|
from_user=from_user,
|
||||||
# (starting from last_id - 100), the request will return nothing.
|
batch_size=batch_size,
|
||||||
#
|
offset_date=offset_date,
|
||||||
# We can emulate their behaviour locally by setting offset = max_id
|
add_offset=add_offset,
|
||||||
# and simply stopping once we hit a message with ID <= min_id.
|
filter=filter,
|
||||||
if reverse:
|
search=search
|
||||||
offset_id = max(offset_id, min_id)
|
)
|
||||||
if offset_id and max_id:
|
|
||||||
if max_id - offset_id <= 1:
|
|
||||||
return
|
|
||||||
|
|
||||||
if not max_id:
|
|
||||||
max_id = float('inf')
|
|
||||||
else:
|
|
||||||
offset_id = max(offset_id, max_id)
|
|
||||||
if offset_id and min_id:
|
|
||||||
if offset_id - min_id <= 1:
|
|
||||||
return
|
|
||||||
|
|
||||||
if reverse:
|
|
||||||
if offset_id:
|
|
||||||
offset_id += 1
|
|
||||||
else:
|
|
||||||
offset_id = 1
|
|
||||||
|
|
||||||
if from_user:
|
|
||||||
from_user = await self.get_input_entity(from_user)
|
|
||||||
if not isinstance(from_user, (
|
|
||||||
types.InputPeerUser, types.InputPeerSelf)):
|
|
||||||
from_user = None # Ignore from_user unless it's a user
|
|
||||||
|
|
||||||
from_id = (await self.get_peer_id(from_user)) if from_user else None
|
|
||||||
|
|
||||||
limit = float('inf') if limit is None else int(limit)
|
|
||||||
if not entity:
|
|
||||||
if reverse:
|
|
||||||
raise ValueError('Cannot reverse global search')
|
|
||||||
|
|
||||||
reverse = None
|
|
||||||
request = functions.messages.SearchGlobalRequest(
|
|
||||||
q=search or '',
|
|
||||||
offset_date=offset_date,
|
|
||||||
offset_peer=types.InputPeerEmpty(),
|
|
||||||
offset_id=offset_id,
|
|
||||||
limit=1
|
|
||||||
)
|
|
||||||
elif search is not None or filter or from_user:
|
|
||||||
if filter is None:
|
|
||||||
filter = types.InputMessagesFilterEmpty()
|
|
||||||
|
|
||||||
# Telegram completely ignores `from_id` in private chats
|
|
||||||
if isinstance(entity, (types.InputPeerUser, types.InputPeerSelf)):
|
|
||||||
# Don't bother sending `from_user` (it's ignored anyway),
|
|
||||||
# but keep `from_id` defined above to check it locally.
|
|
||||||
from_user = None
|
|
||||||
else:
|
|
||||||
# Do send `from_user` to do the filtering server-side,
|
|
||||||
# and set `from_id` to None to avoid checking it locally.
|
|
||||||
from_id = None
|
|
||||||
|
|
||||||
request = functions.messages.SearchRequest(
|
|
||||||
peer=entity,
|
|
||||||
q=search or '',
|
|
||||||
filter=filter() if isinstance(filter, type) else filter,
|
|
||||||
min_date=None,
|
|
||||||
max_date=offset_date,
|
|
||||||
offset_id=offset_id,
|
|
||||||
add_offset=add_offset,
|
|
||||||
limit=0, # Search actually returns 0 items if we ask it to
|
|
||||||
max_id=0,
|
|
||||||
min_id=0,
|
|
||||||
hash=0,
|
|
||||||
from_id=from_user
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
request = functions.messages.GetHistoryRequest(
|
|
||||||
peer=entity,
|
|
||||||
limit=1,
|
|
||||||
offset_date=offset_date,
|
|
||||||
offset_id=offset_id,
|
|
||||||
min_id=0,
|
|
||||||
max_id=0,
|
|
||||||
add_offset=add_offset,
|
|
||||||
hash=0
|
|
||||||
)
|
|
||||||
|
|
||||||
if limit == 0:
|
|
||||||
if not _total:
|
|
||||||
return
|
|
||||||
# No messages, but we still need to know the total message count
|
|
||||||
result = await self(request)
|
|
||||||
if isinstance(result, types.messages.MessagesNotModified):
|
|
||||||
_total[0] = result.count
|
|
||||||
else:
|
|
||||||
_total[0] = getattr(result, 'count', len(result.messages))
|
|
||||||
return
|
|
||||||
|
|
||||||
if wait_time is None:
|
|
||||||
wait_time = 1 if limit > 3000 else 0
|
|
||||||
|
|
||||||
have = 0
|
|
||||||
last_id = 0 if reverse else float('inf')
|
|
||||||
|
|
||||||
# Telegram has a hard limit of 100.
|
|
||||||
# We don't need to fetch 100 if the limit is less.
|
|
||||||
batch_size = min(max(batch_size, 1), min(100, limit))
|
|
||||||
|
|
||||||
# When going in reverse we need an offset of `-limit`, but we
|
|
||||||
# also want to respect what the user passed, so add them together.
|
|
||||||
if reverse:
|
|
||||||
request.add_offset -= batch_size
|
|
||||||
|
|
||||||
while have < limit:
|
|
||||||
start = time.time()
|
|
||||||
|
|
||||||
request.limit = min(limit - have, batch_size)
|
|
||||||
if reverse and request.limit != batch_size:
|
|
||||||
# Remember that we need -limit when going in reverse
|
|
||||||
request.add_offset = add_offset - request.limit
|
|
||||||
|
|
||||||
r = await self(request)
|
|
||||||
if _total:
|
|
||||||
_total[0] = getattr(r, 'count', len(r.messages))
|
|
||||||
|
|
||||||
entities = {utils.get_peer_id(x): x
|
|
||||||
for x in itertools.chain(r.users, r.chats)}
|
|
||||||
|
|
||||||
messages = reversed(r.messages) if reverse else r.messages
|
|
||||||
for message in messages:
|
|
||||||
if (isinstance(message, types.MessageEmpty)
|
|
||||||
or from_id and message.from_id != from_id):
|
|
||||||
continue
|
|
||||||
|
|
||||||
if reverse is None:
|
|
||||||
pass
|
|
||||||
elif reverse:
|
|
||||||
if message.id <= last_id or message.id >= max_id:
|
|
||||||
return
|
|
||||||
else:
|
|
||||||
if message.id >= last_id or message.id <= min_id:
|
|
||||||
return
|
|
||||||
|
|
||||||
# There has been reports that on bad connections this method
|
|
||||||
# was returning duplicated IDs sometimes. Using ``last_id``
|
|
||||||
# is an attempt to avoid these duplicates, since the message
|
|
||||||
# IDs are returned in descending order (or asc if reverse).
|
|
||||||
last_id = message.id
|
|
||||||
|
|
||||||
message._finish_init(self, entities, entity)
|
|
||||||
await yield_(message)
|
|
||||||
have += 1
|
|
||||||
|
|
||||||
if len(r.messages) < request.limit:
|
|
||||||
break
|
|
||||||
|
|
||||||
# Find the first message that's not empty (in some rare cases
|
|
||||||
# it can happen that the last message is :tl:`MessageEmpty`)
|
|
||||||
last_message = None
|
|
||||||
messages = r.messages if reverse else reversed(r.messages)
|
|
||||||
for m in messages:
|
|
||||||
if not isinstance(m, types.MessageEmpty):
|
|
||||||
last_message = m
|
|
||||||
break
|
|
||||||
|
|
||||||
if last_message is None:
|
|
||||||
# There are some cases where all the messages we get start
|
|
||||||
# being empty. This can happen on migrated mega-groups if
|
|
||||||
# the history was cleared, and we're using search. Telegram
|
|
||||||
# acts incredibly weird sometimes. Messages are returned but
|
|
||||||
# only "empty", not their contents. If this is the case we
|
|
||||||
# should just give up since there won't be any new Message.
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
request.offset_id = last_message.id
|
|
||||||
if isinstance(request, functions.messages.SearchRequest):
|
|
||||||
request.max_date = last_message.date
|
|
||||||
else:
|
|
||||||
# getHistory and searchGlobal call it offset_date
|
|
||||||
request.offset_date = last_message.date
|
|
||||||
|
|
||||||
if isinstance(request, functions.messages.SearchGlobalRequest):
|
|
||||||
request.offset_peer = last_message.input_chat
|
|
||||||
elif reverse:
|
|
||||||
# We want to skip the one we already have
|
|
||||||
request.offset_id += 1
|
|
||||||
|
|
||||||
await asyncio.sleep(
|
|
||||||
max(wait_time - (time.time() - start), 0), loop=self._loop)
|
|
||||||
|
|
||||||
async def get_messages(self, *args, **kwargs):
|
async def get_messages(self, *args, **kwargs):
|
||||||
"""
|
"""
|
||||||
|
@ -682,6 +399,7 @@ class MessageMethods(UploadMethods, ButtonMethods, MessageParseMethods):
|
||||||
a single `Message <telethon.tl.custom.message.Message>` will be
|
a single `Message <telethon.tl.custom.message.Message>` will be
|
||||||
returned for convenience instead of a list.
|
returned for convenience instead of a list.
|
||||||
"""
|
"""
|
||||||
|
# TODO Make RequestIter have a .collect() or similar
|
||||||
total = [0]
|
total = [0]
|
||||||
kwargs['_total'] = total
|
kwargs['_total'] = total
|
||||||
if len(args) == 1 and 'limit' not in kwargs:
|
if len(args) == 1 and 'limit' not in kwargs:
|
||||||
|
|
|
@ -3,6 +3,10 @@ import asyncio
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
|
||||||
|
# TODO There are two types of iterators for requests.
|
||||||
|
# One has a limit of items to retrieve, and the
|
||||||
|
# other has a list that must be called in chunks.
|
||||||
|
# Make classes for both here so it's easy to use.
|
||||||
class RequestIter(abc.ABC):
|
class RequestIter(abc.ABC):
|
||||||
"""
|
"""
|
||||||
Helper class to deal with requests that need offsets to iterate.
|
Helper class to deal with requests that need offsets to iterate.
|
||||||
|
@ -50,6 +54,9 @@ class RequestIter(abc.ABC):
|
||||||
if self.buffer is ():
|
if self.buffer is ():
|
||||||
await self._init(**self.kwargs)
|
await self._init(**self.kwargs)
|
||||||
|
|
||||||
|
if self.left <= 0: # <= 0 because subclasses may change it
|
||||||
|
raise StopAsyncIteration
|
||||||
|
|
||||||
if self.index == len(self.buffer):
|
if self.index == len(self.buffer):
|
||||||
# asyncio will handle times <= 0 to sleep 0 seconds
|
# asyncio will handle times <= 0 to sleep 0 seconds
|
||||||
if self.wait_time:
|
if self.wait_time:
|
||||||
|
@ -84,7 +91,7 @@ class RequestIter(abc.ABC):
|
||||||
'is running (i.e. you are inside an "async def")'
|
'is running (i.e. you are inside an "async def")'
|
||||||
)
|
)
|
||||||
|
|
||||||
raise NotImplementedError('lol!')
|
return self.__aiter__()
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
async def _load_next_chunk(self):
|
async def _load_next_chunk(self):
|
||||||
|
|
Loading…
Reference in New Issue
Block a user