mirror of
https://github.com/LonamiWebs/Telethon.git
synced 2024-11-29 21:03:45 +03:00
d1836ab899
If the type is not correct, we can let the API fail instead. This way, if the API supports more types in the future, there does not need to be an update. This has affected participant permissions in groups.
687 lines
25 KiB
Python
687 lines
25 KiB
Python
import asyncio
|
|
import inspect
|
|
import itertools
|
|
import string
|
|
import typing
|
|
import dataclasses
|
|
|
|
from .. import errors, _tl
|
|
from .._misc import helpers, utils, requestiter, tlobject, enums, hints
|
|
from ..types import _custom
|
|
|
|
if typing.TYPE_CHECKING:
|
|
from .telegramclient import TelegramClient
|
|
|
|
_MAX_PARTICIPANTS_CHUNK_SIZE = 200
|
|
_MAX_ADMIN_LOG_CHUNK_SIZE = 100
|
|
_MAX_PROFILE_PHOTO_CHUNK_SIZE = 100
|
|
|
|
|
|
class _ChatAction:
|
|
def __init__(self, client, chat, action, *, delay, auto_cancel):
|
|
self._client = client
|
|
self._delay = delay
|
|
self._auto_cancel = auto_cancel
|
|
self._request = _tl.fn.messages.SetTyping(chat, action)
|
|
self._task = None
|
|
self._running = False
|
|
|
|
def __await__(self):
|
|
return self._once().__await__()
|
|
|
|
async def __aenter__(self):
|
|
self._request = dataclasses.replace(self._request, peer=await self._client.get_input_entity(self._request.peer))
|
|
self._running = True
|
|
self._task = asyncio.create_task(self._update())
|
|
return self
|
|
|
|
async def __aexit__(self, *args):
|
|
self._running = False
|
|
if self._task:
|
|
self._task.cancel()
|
|
try:
|
|
await self._task
|
|
except asyncio.CancelledError:
|
|
pass
|
|
|
|
self._task = None
|
|
|
|
async def _once(self):
|
|
self._request = dataclasses.replace(self._request, peer=await self._client.get_input_entity(self._request.peer))
|
|
await self._client(_tl.fn.messages.SetTyping(self._chat, self._action))
|
|
|
|
async def _update(self):
|
|
try:
|
|
while self._running:
|
|
await self._client(self._request)
|
|
await asyncio.sleep(self._delay)
|
|
except ConnectionError:
|
|
pass
|
|
except asyncio.CancelledError:
|
|
if self._auto_cancel:
|
|
await self._client(_tl.fn.messages.SetTyping(
|
|
self._chat, _tl.SendMessageCancelAction()))
|
|
|
|
@staticmethod
|
|
def _parse(action):
|
|
if isinstance(action, tlobject.TLObject) and action.SUBCLASS_OF_ID != 0x20b2cc21:
|
|
return action
|
|
|
|
return {
|
|
enums.Action.TYPING: _tl.SendMessageTypingAction(),
|
|
enums.Action.CONTACT: _tl.SendMessageChooseContactAction(),
|
|
enums.Action.GAME: _tl.SendMessageGamePlayAction(),
|
|
enums.Action.LOCATION: _tl.SendMessageGeoLocationAction(),
|
|
enums.Action.STICKER: _tl.SendMessageChooseStickerAction(),
|
|
enums.Action.RECORD_AUDIO: _tl.SendMessageRecordAudioAction(),
|
|
enums.Action.RECORD_ROUND: _tl.SendMessageRecordRoundAction(),
|
|
enums.Action.RECORD_VIDEO: _tl.SendMessageRecordVideoAction(),
|
|
enums.Action.AUDIO: _tl.SendMessageUploadAudioAction(1),
|
|
enums.Action.ROUND: _tl.SendMessageUploadRoundAction(1),
|
|
enums.Action.VIDEO: _tl.SendMessageUploadVideoAction(1),
|
|
enums.Action.PHOTO: _tl.SendMessageUploadPhotoAction(1),
|
|
enums.Action.DOCUMENT: _tl.SendMessageUploadDocumentAction(1),
|
|
enums.Action.CANCEL: _tl.SendMessageCancelAction(),
|
|
}[enums.Action(action)]
|
|
|
|
def progress(self, current, total):
|
|
if hasattr(self._request.action, 'progress'):
|
|
self._request = dataclasses.replace(
|
|
self._request,
|
|
action=dataclasses.replace(self._request.action, progress=100 * round(current / total))
|
|
)
|
|
|
|
|
|
class _ParticipantsIter(requestiter.RequestIter):
|
|
async def _init(self, entity, filter, search):
|
|
if not filter:
|
|
if search:
|
|
filter = _tl.ChannelParticipantsSearch(search)
|
|
else:
|
|
filter = _tl.ChannelParticipantsRecent()
|
|
else:
|
|
filter = enums.Participant(filter)
|
|
if filter == enums.Participant.ADMIN:
|
|
filter = _tl.ChannelParticipantsAdmins()
|
|
elif filter == enums.Participant.BOT:
|
|
filter = _tl.ChannelParticipantsBots()
|
|
elif filter == enums.Participant.KICKED:
|
|
filter = _tl.ChannelParticipantsKicked(search)
|
|
elif filter == enums.Participant.BANNED:
|
|
filter = _tl.ChannelParticipantsBanned(search)
|
|
elif filter == enums.Participant.CONTACT:
|
|
filter = _tl.ChannelParticipantsContacts(search)
|
|
else:
|
|
raise RuntimeError('unhandled enum variant')
|
|
|
|
entity = await self.client.get_input_entity(entity)
|
|
ty = helpers._entity_type(entity)
|
|
if search and (filter or ty != helpers._EntityType.CHANNEL):
|
|
# We need to 'search' ourselves unless we have a PeerChannel
|
|
search = search.casefold()
|
|
|
|
self.filter_entity = lambda ent: (
|
|
search in utils.get_display_name(ent).casefold() or
|
|
search in (getattr(ent, 'username', None) or '').casefold()
|
|
)
|
|
else:
|
|
self.filter_entity = lambda ent: True
|
|
|
|
if ty == helpers._EntityType.CHANNEL:
|
|
if self.limit <= 0:
|
|
# May not have access to the channel, but getFull can get the .total.
|
|
self.total = (await self.client(
|
|
_tl.fn.channels.GetFullChannel(entity)
|
|
)).full_chat.participants_count
|
|
raise StopAsyncIteration
|
|
|
|
self.seen = set()
|
|
self.request = _tl.fn.channels.GetParticipants(
|
|
channel=entity,
|
|
filter=filter or _tl.ChannelParticipantsSearch(search),
|
|
offset=0,
|
|
limit=_MAX_PARTICIPANTS_CHUNK_SIZE,
|
|
hash=0
|
|
)
|
|
|
|
elif ty == helpers._EntityType.CHAT:
|
|
full = await self.client(
|
|
_tl.fn.messages.GetFullChat(entity.chat_id))
|
|
if not isinstance(
|
|
full.full_chat.participants, _tl.ChatParticipants):
|
|
# ChatParticipantsForbidden won't have ``.participants``
|
|
self.total = 0
|
|
raise StopAsyncIteration
|
|
|
|
self.total = len(full.full_chat.participants.participants)
|
|
|
|
users = {user.id: user for user in full.users}
|
|
for participant in full.full_chat.participants.participants:
|
|
if isinstance(participant, _tl.ChannelParticipantBanned):
|
|
user_id = participant.peer.user_id
|
|
else:
|
|
user_id = participant.user_id
|
|
user = users[user_id]
|
|
if not self.filter_entity(user):
|
|
continue
|
|
|
|
user = users[user_id]
|
|
self.buffer.append(user)
|
|
|
|
return True
|
|
else:
|
|
self.total = 1
|
|
if self.limit != 0:
|
|
user = await self.client.get_entity(entity)
|
|
if self.filter_entity(user):
|
|
self.buffer.append(user)
|
|
|
|
return True
|
|
|
|
async def _load_next_chunk(self):
|
|
# Only care about the limit for the first request
|
|
# (small amount of people).
|
|
#
|
|
# Most people won't care about getting exactly 12,345
|
|
# members so it doesn't really matter not to be 100%
|
|
# precise with being out of the offset/limit here.
|
|
self.request = dataclasses.replace(self.request, limit=min(
|
|
self.limit - self.request.offset, _MAX_PARTICIPANTS_CHUNK_SIZE))
|
|
|
|
if self.request.offset > self.limit:
|
|
return True
|
|
|
|
participants = await self.client(self.request)
|
|
self.total = participants.count
|
|
|
|
self.request = dataclasses.replace(self.request, offset=self.request.offset + len(participants.participants))
|
|
users = {user.id: user for user in participants.users}
|
|
for participant in participants.participants:
|
|
if isinstance(participant, _tl.ChannelParticipantBanned):
|
|
if not isinstance(participant.peer, _tl.PeerUser):
|
|
# May have the entire channel banned. See #3105.
|
|
continue
|
|
user_id = participant.peer.user_id
|
|
else:
|
|
user_id = participant.user_id
|
|
|
|
user = users[user_id]
|
|
if not self.filter_entity(user) or user.id in self.seen:
|
|
continue
|
|
self.seen.add(user_id)
|
|
user = users[user_id]
|
|
self.buffer.append(user)
|
|
|
|
|
|
class _AdminLogIter(requestiter.RequestIter):
|
|
async def _init(
|
|
self, entity, admins, search, min_id, max_id,
|
|
join, leave, invite, restrict, unrestrict, ban, unban,
|
|
promote, demote, info, settings, pinned, edit, delete,
|
|
group_call
|
|
):
|
|
if any((join, leave, invite, restrict, unrestrict, ban, unban,
|
|
promote, demote, info, settings, pinned, edit, delete,
|
|
group_call)):
|
|
events_filter = _tl.ChannelAdminLogEventsFilter(
|
|
join=join, leave=leave, invite=invite, ban=restrict,
|
|
unban=unrestrict, kick=ban, unkick=unban, promote=promote,
|
|
demote=demote, info=info, settings=settings, pinned=pinned,
|
|
edit=edit, delete=delete, group_call=group_call
|
|
)
|
|
else:
|
|
events_filter = None
|
|
|
|
self.entity = await self.client.get_input_entity(entity)
|
|
|
|
admin_list = []
|
|
if admins:
|
|
if not utils.is_list_like(admins):
|
|
admins = (admins,)
|
|
|
|
for admin in admins:
|
|
admin_list.append(await self.client.get_input_entity(admin))
|
|
|
|
self.request = _tl.fn.channels.GetAdminLog(
|
|
self.entity, q=search or '', min_id=min_id, max_id=max_id,
|
|
limit=0, events_filter=events_filter, admins=admin_list or None
|
|
)
|
|
|
|
async def _load_next_chunk(self):
|
|
self.request = dataclasses.replace(self.request, limit=min(self.left, _MAX_ADMIN_LOG_CHUNK_SIZE))
|
|
r = await self.client(self.request)
|
|
entities = {utils.get_peer_id(x): x
|
|
for x in itertools.chain(r.users, r.chats)}
|
|
|
|
self.request = dataclasses.replace(self.request, max_id=min((e.id for e in r.events), default=0))
|
|
for ev in r.events:
|
|
if isinstance(ev.action,
|
|
_tl.ChannelAdminLogEventActionEditMessage):
|
|
ev = dataclasses.replace(ev, action=dataclasses.replace(
|
|
ev.action,
|
|
prev_message=_custom.Message._new(self.client, ev.action.prev_message, entities, self.entity),
|
|
new_message=_custom.Message._new(self.client, ev.action.new_message, entities, self.entity)
|
|
))
|
|
|
|
elif isinstance(ev.action,
|
|
_tl.ChannelAdminLogEventActionDeleteMessage):
|
|
ev.action.message = _custom.Message._new(
|
|
self.client, ev.action.message, entities, self.entity)
|
|
|
|
self.buffer.append(_custom.AdminLogEvent(ev, entities))
|
|
|
|
if len(r.events) < self.request.limit:
|
|
return True
|
|
|
|
|
|
class _ProfilePhotoIter(requestiter.RequestIter):
|
|
async def _init(
|
|
self, entity, offset, max_id
|
|
):
|
|
entity = await self.client.get_input_entity(entity)
|
|
ty = helpers._entity_type(entity)
|
|
if ty == helpers._EntityType.USER:
|
|
self.request = _tl.fn.photos.GetUserPhotos(
|
|
entity,
|
|
offset=offset,
|
|
max_id=max_id,
|
|
limit=1
|
|
)
|
|
else:
|
|
self.request = _tl.fn.messages.Search(
|
|
peer=entity,
|
|
q='',
|
|
filter=_tl.InputMessagesFilterChatPhotos(),
|
|
min_date=None,
|
|
max_date=None,
|
|
offset_id=0,
|
|
add_offset=offset,
|
|
limit=1,
|
|
max_id=max_id,
|
|
min_id=0,
|
|
hash=0
|
|
)
|
|
|
|
if self.limit == 0:
|
|
self.request = dataclasses.replace(self.request, limit=1)
|
|
result = await self.client(self.request)
|
|
if isinstance(result, _tl.photos.Photos):
|
|
self.total = len(result.photos)
|
|
elif isinstance(result, _tl.messages.Messages):
|
|
self.total = len(result.messages)
|
|
else:
|
|
# Luckily both photosSlice and messages have a count for total
|
|
self.total = getattr(result, 'count', None)
|
|
|
|
async def _load_next_chunk(self):
|
|
self.request = dataclasses.replace(self.request, limit=min(self.left, _MAX_PROFILE_PHOTO_CHUNK_SIZE))
|
|
result = await self.client(self.request)
|
|
|
|
if isinstance(result, _tl.photos.Photos):
|
|
self.buffer = result.photos
|
|
self.left = len(self.buffer)
|
|
self.total = len(self.buffer)
|
|
elif isinstance(result, _tl.messages.Messages):
|
|
self.buffer = [x.action.photo for x in result.messages
|
|
if isinstance(x.action, _tl.MessageActionChatEditPhoto)]
|
|
|
|
self.left = len(self.buffer)
|
|
self.total = len(self.buffer)
|
|
elif isinstance(result, _tl.photos.PhotosSlice):
|
|
self.buffer = result.photos
|
|
self.total = result.count
|
|
if len(self.buffer) < self.request.limit:
|
|
self.left = len(self.buffer)
|
|
else:
|
|
self.request = dataclasses.replace(self.request, offset=self.request.offset + len(result.photos))
|
|
else:
|
|
# Some broadcast channels have a photo that this request doesn't
|
|
# retrieve for whatever random reason the Telegram server feels.
|
|
#
|
|
# This means the `total` count may be wrong but there's not much
|
|
# that can be done around it (perhaps there are too many photos
|
|
# and this is only a partial result so it's not possible to just
|
|
# use the len of the result).
|
|
self.total = getattr(result, 'count', None)
|
|
|
|
# Unconditionally fetch the full channel to obtain this photo and
|
|
# yield it with the rest (unless it's a duplicate).
|
|
seen_id = None
|
|
if isinstance(result, _tl.messages.ChannelMessages):
|
|
channel = await self.client(_tl.fn.channels.GetFullChannel(self.request.peer))
|
|
photo = channel.full_chat.chat_photo
|
|
if isinstance(photo, _tl.Photo):
|
|
self.buffer.append(photo)
|
|
seen_id = photo.id
|
|
|
|
self.buffer.extend(
|
|
x.action.photo for x in result.messages
|
|
if isinstance(x.action, _tl.MessageActionChatEditPhoto)
|
|
and x.action.photo.id != seen_id
|
|
)
|
|
|
|
if len(result.messages) < self.request.limit:
|
|
self.left = len(self.buffer)
|
|
elif result.messages:
|
|
self.request = dataclasses.replace(
|
|
self.request,
|
|
add_offset=0,
|
|
offset_id=result.messages[-1].id
|
|
)
|
|
|
|
|
|
def get_participants(
|
|
self: 'TelegramClient',
|
|
entity: 'hints.EntityLike',
|
|
limit: float = (),
|
|
*,
|
|
search: str = '',
|
|
filter: '_tl.TypeChannelParticipantsFilter' = None) -> _ParticipantsIter:
|
|
return _ParticipantsIter(
|
|
self,
|
|
limit,
|
|
entity=entity,
|
|
filter=filter,
|
|
search=search
|
|
)
|
|
|
|
|
|
def get_admin_log(
|
|
self: 'TelegramClient',
|
|
entity: 'hints.EntityLike',
|
|
limit: float = (),
|
|
*,
|
|
max_id: int = 0,
|
|
min_id: int = 0,
|
|
search: str = None,
|
|
admins: 'hints.EntitiesLike' = None,
|
|
join: bool = None,
|
|
leave: bool = None,
|
|
invite: bool = None,
|
|
restrict: bool = None,
|
|
unrestrict: bool = None,
|
|
ban: bool = None,
|
|
unban: bool = None,
|
|
promote: bool = None,
|
|
demote: bool = None,
|
|
info: bool = None,
|
|
settings: bool = None,
|
|
pinned: bool = None,
|
|
edit: bool = None,
|
|
delete: bool = None,
|
|
group_call: bool = None) -> _AdminLogIter:
|
|
return _AdminLogIter(
|
|
self,
|
|
limit,
|
|
entity=entity,
|
|
admins=admins,
|
|
search=search,
|
|
min_id=min_id,
|
|
max_id=max_id,
|
|
join=join,
|
|
leave=leave,
|
|
invite=invite,
|
|
restrict=restrict,
|
|
unrestrict=unrestrict,
|
|
ban=ban,
|
|
unban=unban,
|
|
promote=promote,
|
|
demote=demote,
|
|
info=info,
|
|
settings=settings,
|
|
pinned=pinned,
|
|
edit=edit,
|
|
delete=delete,
|
|
group_call=group_call
|
|
)
|
|
|
|
|
|
def get_profile_photos(
|
|
self: 'TelegramClient',
|
|
entity: 'hints.EntityLike',
|
|
limit: int = (),
|
|
*,
|
|
offset: int = 0,
|
|
max_id: int = 0) -> _ProfilePhotoIter:
|
|
return _ProfilePhotoIter(
|
|
self,
|
|
limit,
|
|
entity=entity,
|
|
offset=offset,
|
|
max_id=max_id
|
|
)
|
|
|
|
|
|
def action(
|
|
self: 'TelegramClient',
|
|
entity: 'hints.EntityLike',
|
|
action: 'typing.Union[str, _tl.TypeSendMessageAction]',
|
|
*,
|
|
delay: float = 4,
|
|
auto_cancel: bool = True) -> 'typing.Union[_ChatAction, typing.Coroutine]':
|
|
action = _ChatAction._parse(action)
|
|
|
|
return _ChatAction(
|
|
self, entity, action, delay=delay, auto_cancel=auto_cancel)
|
|
|
|
async def edit_admin(
|
|
self: 'TelegramClient',
|
|
entity: 'hints.EntityLike',
|
|
user: 'hints.EntityLike',
|
|
*,
|
|
change_info: bool = None,
|
|
post_messages: bool = None,
|
|
edit_messages: bool = None,
|
|
delete_messages: bool = None,
|
|
ban_users: bool = None,
|
|
invite_users: bool = None,
|
|
pin_messages: bool = None,
|
|
add_admins: bool = None,
|
|
manage_call: bool = None,
|
|
anonymous: bool = None,
|
|
is_admin: bool = None,
|
|
title: str = None) -> _tl.Updates:
|
|
entity = await self.get_input_entity(entity)
|
|
user = await self.get_input_entity(user)
|
|
ty = helpers._entity_type(user)
|
|
|
|
perm_names = (
|
|
'change_info', 'post_messages', 'edit_messages', 'delete_messages',
|
|
'ban_users', 'invite_users', 'pin_messages', 'add_admins',
|
|
'anonymous', 'manage_call',
|
|
)
|
|
|
|
ty = helpers._entity_type(entity)
|
|
if ty == helpers._EntityType.CHANNEL:
|
|
# If we try to set these permissions in a megagroup, we
|
|
# would get a RIGHT_FORBIDDEN. However, it makes sense
|
|
# that an admin can post messages, so we want to avoid the error
|
|
if post_messages or edit_messages:
|
|
# TODO get rid of this once sessions cache this information
|
|
if entity.channel_id not in self._megagroup_cache:
|
|
full_entity = await self.get_entity(entity)
|
|
self._megagroup_cache[entity.channel_id] = full_entity.megagroup
|
|
|
|
if self._megagroup_cache[entity.channel_id]:
|
|
post_messages = None
|
|
edit_messages = None
|
|
|
|
perms = locals()
|
|
return await self(_tl.fn.channels.EditAdmin(entity, user, _tl.ChatAdminRights(**{
|
|
# A permission is its explicit (not-None) value or `is_admin`.
|
|
# This essentially makes `is_admin` be the default value.
|
|
name: perms[name] if perms[name] is not None else is_admin
|
|
for name in perm_names
|
|
}), rank=title or ''))
|
|
|
|
elif ty == helpers._EntityType.CHAT:
|
|
# If the user passed any permission in a small
|
|
# group chat, they must be a full admin to have it.
|
|
if is_admin is None:
|
|
is_admin = any(locals()[x] for x in perm_names)
|
|
|
|
return await self(_tl.fn.messages.EditChatAdmin(
|
|
entity, user, is_admin=is_admin))
|
|
|
|
else:
|
|
raise ValueError(
|
|
'You can only edit permissions in groups and channels')
|
|
|
|
async def edit_permissions(
|
|
self: 'TelegramClient',
|
|
entity: 'hints.EntityLike',
|
|
user: 'typing.Optional[hints.EntityLike]' = None,
|
|
until_date: 'hints.DateLike' = None,
|
|
*,
|
|
view_messages: bool = True,
|
|
send_messages: bool = True,
|
|
send_media: bool = True,
|
|
send_stickers: bool = True,
|
|
send_gifs: bool = True,
|
|
send_games: bool = True,
|
|
send_inline: bool = True,
|
|
embed_link_previews: bool = True,
|
|
send_polls: bool = True,
|
|
change_info: bool = True,
|
|
invite_users: bool = True,
|
|
pin_messages: bool = True) -> _tl.Updates:
|
|
entity = await self.get_input_entity(entity)
|
|
ty = helpers._entity_type(entity)
|
|
|
|
rights = _tl.ChatBannedRights(
|
|
until_date=until_date,
|
|
view_messages=not view_messages,
|
|
send_messages=not send_messages,
|
|
send_media=not send_media,
|
|
send_stickers=not send_stickers,
|
|
send_gifs=not send_gifs,
|
|
send_games=not send_games,
|
|
send_inline=not send_inline,
|
|
embed_links=not embed_link_previews,
|
|
send_polls=not send_polls,
|
|
change_info=not change_info,
|
|
invite_users=not invite_users,
|
|
pin_messages=not pin_messages
|
|
)
|
|
|
|
if user is None:
|
|
return await self(_tl.fn.messages.EditChatDefaultBannedRights(
|
|
peer=entity,
|
|
banned_rights=rights
|
|
))
|
|
|
|
user = await self.get_input_entity(user)
|
|
|
|
if isinstance(user, _tl.InputPeerSelf):
|
|
raise ValueError('You cannot restrict yourself')
|
|
|
|
return await self(_tl.fn.channels.EditBanned(
|
|
channel=entity,
|
|
participant=user,
|
|
banned_rights=rights
|
|
))
|
|
|
|
async def kick_participant(
|
|
self: 'TelegramClient',
|
|
entity: 'hints.EntityLike',
|
|
user: 'typing.Optional[hints.EntityLike]'
|
|
):
|
|
entity = await self.get_input_entity(entity)
|
|
user = await self.get_input_entity(user)
|
|
|
|
ty = helpers._entity_type(entity)
|
|
if ty == helpers._EntityType.CHAT:
|
|
resp = await self(_tl.fn.messages.DeleteChatUser(entity.chat_id, user))
|
|
elif ty == helpers._EntityType.CHANNEL:
|
|
if isinstance(user, _tl.InputPeerSelf):
|
|
# Despite no longer being in the channel, the account still
|
|
# seems to get the service message.
|
|
resp = await self(_tl.fn.channels.LeaveChannel(entity))
|
|
else:
|
|
resp = await self(_tl.fn.channels.EditBanned(
|
|
channel=entity,
|
|
participant=user,
|
|
banned_rights=_tl.ChatBannedRights(
|
|
until_date=None, view_messages=True)
|
|
))
|
|
await asyncio.sleep(0.5)
|
|
await self(_tl.fn.channels.EditBanned(
|
|
channel=entity,
|
|
participant=user,
|
|
banned_rights=_tl.ChatBannedRights(until_date=None)
|
|
))
|
|
else:
|
|
raise ValueError('You must pass either a channel or a chat')
|
|
|
|
return self._get_response_message(None, resp, entity)
|
|
|
|
async def get_permissions(
|
|
self: 'TelegramClient',
|
|
entity: 'hints.EntityLike',
|
|
user: 'hints.EntityLike' = None
|
|
) -> 'typing.Optional[_custom.ParticipantPermissions]':
|
|
entity = await self.get_entity(entity)
|
|
|
|
if not user:
|
|
if helpers._entity_type(entity) != helpers._EntityType.USER:
|
|
return entity.default_banned_rights
|
|
|
|
entity = await self.get_input_entity(entity)
|
|
user = await self.get_input_entity(user)
|
|
|
|
if helpers._entity_type(entity) == helpers._EntityType.CHANNEL:
|
|
participant = await self(_tl.fn.channels.GetParticipant(
|
|
entity,
|
|
user
|
|
))
|
|
return _custom.ParticipantPermissions(participant.participant, False)
|
|
elif helpers._entity_type(entity) == helpers._EntityType.CHAT:
|
|
chat = await self(_tl.fn.messages.GetFullChat(
|
|
entity
|
|
))
|
|
if isinstance(user, _tl.InputPeerSelf):
|
|
user = _tl.PeerUser(self._session_state.user_id)
|
|
for participant in chat.full_chat.participants.participants:
|
|
if participant.user_id == user.user_id:
|
|
return _custom.ParticipantPermissions(participant, True)
|
|
raise errors.USER_NOT_PARTICIPANT(400, 'USER_NOT_PARTICIPANT')
|
|
|
|
raise ValueError('You must pass either a channel or a chat')
|
|
|
|
async def get_stats(
|
|
self: 'TelegramClient',
|
|
entity: 'hints.EntityLike',
|
|
message: 'typing.Union[int, _tl.Message]' = None,
|
|
):
|
|
entity = await self.get_input_entity(entity)
|
|
|
|
message = utils.get_message_id(message)
|
|
if message is not None:
|
|
try:
|
|
req = _tl.fn.stats.GetMessageStats(entity, message)
|
|
return await self(req)
|
|
except errors.STATS_MIGRATE as e:
|
|
dc = e.dc
|
|
else:
|
|
# Don't bother fetching the Channel entity (costs a request), instead
|
|
# try to guess and if it fails we know it's the other one (best case
|
|
# no extra request, worst just one).
|
|
try:
|
|
req = _tl.fn.stats.GetBroadcastStats(entity)
|
|
return await self(req)
|
|
except errors.STATS_MIGRATE as e:
|
|
dc = e.dc
|
|
except errors.BROADCAST_REQUIRED:
|
|
req = _tl.fn.stats.GetMegagroupStats(entity)
|
|
try:
|
|
return await self(req)
|
|
except errors.STATS_MIGRATE as e:
|
|
dc = e.dc
|
|
|
|
sender = await self._borrow_exported_sender(dc)
|
|
try:
|
|
# req will be resolved to use the right types inside by now
|
|
return await sender.send(req)
|
|
finally:
|
|
await self._return_exported_sender(sender)
|