This commit is contained in:
New-dev0 2021-06-13 13:41:31 +05:30
parent 8b1cb2894b
commit fb1d19a1eb
7 changed files with 184 additions and 160 deletions

View File

@ -348,4 +348,6 @@ library a lot easier. Only the interesting ones will be listed here.
get_inner_text
get_peer_id
resolve_id
pack_bot_file_id
resolve_bot_file_id
resolve_invite_link

View File

@ -484,38 +484,6 @@ class ChatMethods:
get_participants.__signature__ = inspect.signature(iter_participants)
async def get_participant(
self: 'TelegramClient',
chat: 'hints.EntityLike',
entity: 'hints.EntityLike'
):
"""
Get One Participant of Specific Channel or Chat.
Arguments
chat (`entity`):
The channel/chat entity from where to Get Participant.
entity (`entity`):
Username/UserId of person to look for.
Raises
UserNotParticipantError - User is Not Participant of Chat/Channel.
Returns
The resulting :tl:`ChannelParticipant` object.
Example
.. code-block:: python
await client.get_participant(chat, user)
"""
chat = await self.get_input_entity(chat)
user = await self.get_input_entity(entity)
return await self(functions.channels.GetParticipantRequest(
channel=chat,
participant=user)
)
def iter_admin_log(
self: 'TelegramClient',
@ -1372,130 +1340,4 @@ class ChatMethods:
finally:
await self._return_exported_sender(sender)
async def modify_groupcall(
self: 'TelegramClient',
method: str,
entity: 'hints.EntityLike',
schedule: 'hints.DateLike' = None,
title: str = None,
reset_invite_hash: bool = None,
join_muted: bool = None
):
"""
Stuff to do with Group Calls, Possible for Administrator and
Creator.
will raise ValueError, if `method` is None or Invalid.
Arguments
method (`str`):
Any of `create`, `discard`, `start_schedule` and
`edit_title` will work. Check Examples to know more Clearly.
entity (`int` | `str`):
Username/ID of Channel or Chat, where to Modify Group Call.
schedule (`hints.Datelike`, `optional`):
Used to Schedule the GroupCall.
title (`str`, `optional`):
Edits the Group call Title. It should be used with
`edit_title` as method parameter.
reset_invite_hash (`bool`, `optional`):
Reset the Invite Hash of Group Call of Specific Channel or
Chat.
join_muted (`bool`, `optional`):
Whether the New Group Call Participant should be Muted or
Not.
Method Parameter :
- `create` : `Create a Group Call or Schedule It.`
- `discard` : `Stop a Group Call.`
- `edit_title` : `Edit title of Group Call.`
- `edit_perms` : `Edit Permissions of Group Call.`
- `start_schedule` : `Start a Group Call which was Scheduled.`
Returns
The resulting :tl:`Updates` object.
Raises
ChatAdminRequiredError - You Should be Admin, in order to
Modify Group Call.
Example
.. code-block:: python
# Starting a Group Call
await client.modify_groupcall("create", -100123456789)
# Scheduling a Group Call, within 5-Minutes
from datetime import timedelta
schedule_for = timedelta(minutes=5)
await client.modify_groupcall(
"create",
schedule=schedule_for
)
# Editing a Group Call Title
new_title = "Having Fun with Telethon"
await client.modify_groupcall(
"edit_title",
title=new_title
)
# Stopping/Closing a Group Call
await client.modify_groupcall("discard", -100123456789)
# Force Start Scheduled Group Call
await client.modify_groupcall("start_schedule", chat)
# Toggle Group Call Setting
await client.modify_groupcall(
"edit_perms",
reset_invite_hash=True,
join_muted=True) # Mute New Group call Participants
"""
if not method:
raise ValueError("Method Cant be None.")
if method == "create":
return (await self(
functions.phone.CreateGroupCallRequest(
peer=entity,
schedule_date=schedule)
)).updates[0]
try:
Call = await self(
functions.messages.GetFullChatRequest(entity))
except errors.rpcerrorlist.ChatIdInvalidError:
Call = await self(
functions.channels.GetFullChannelRequest(entity))
Call = Call.full_chat.call
if method == "edit_title":
return await self(functions.phone.EditGroupCallTitleRequest(
Call, title=title if title else ""))
elif method == "edit_perms":
return await self(
functions.phone.ToggleGroupCallSettingsRequest(
reset_invite_hash=reset_invite_hash,
call=Call,
join_muted=join_muted))
elif method == "discard":
return await self(
functions.phone.DiscardGroupCallRequest(Call))
elif method == "start_schedule":
return await self(
functions.phone.StartScheduledGroupCallRequest(Call))
else:
raise ValueError(
"Invalid Method Used while using Modifying GroupCall")
# endregion

View File

@ -395,6 +395,9 @@ class DownloadMethods:
date = datetime.datetime.now()
media = message
if isinstance(media, str):
media = utils.resolve_bot_file_id(media)
if isinstance(media, types.MessageService):
if isinstance(message.action,
types.MessageActionChatEditPhoto):

View File

@ -152,6 +152,9 @@ class UploadMethods:
send the file as "external" media, and Telegram is the
one that will fetch the media and send it.
.
* A Bot API-like ``file_id``. You can convert previously
sent media to file IDs for later reusing with
`telethon.utils.pack_bot_file_id`.
* A handle to an existing file (for example, if you sent a
message with media before, you can use its ``message.media``
@ -696,6 +699,10 @@ class UploadMethods:
media = types.InputMediaPhotoExternal(file)
else:
media = types.InputMediaDocumentExternal(file)
else:
bot_file = utils.resolve_bot_file_id(file)
if bot_file:
media = utils.get_input_media(bot_file)
if media:
pass # Already have media, don't check the rest

View File

@ -18,6 +18,20 @@ class File:
def __init__(self, media):
self.media = media
@property
def id(self):
"""
The bot-API style ``file_id`` representing this file.
.. note::
This file ID may not work under user accounts,
but should still be usable by bot accounts.
You can, however, still use it to identify
a file in for example a database.
"""
return utils.pack_bot_file_id(self.media)
@property
def name(self):

View File

@ -855,6 +855,8 @@ def is_image(file):
match = re.match(r'\.(png|jpe?g)', _get_extension(file), re.IGNORECASE)
if match:
return True
else:
return isinstance(resolve_bot_file_id(file), types.Photo)
def is_gif(file):
@ -1113,6 +1115,160 @@ def _encode_telegram_base64(string):
return None # not valid base64, not valid ascii, not a string
def resolve_bot_file_id(file_id):
"""
Given a Bot API-style `file_id <telethon.tl.custom.file.File.id>`,
returns the media it represents. If the `file_id <telethon.tl.custom.file.File.id>`
is not valid, `None` is returned instead.
Note that the `file_id <telethon.tl.custom.file.File.id>` does not have information
such as image dimensions or file size, so these will be zero if present.
For thumbnails, the photo ID and hash will always be zero.
"""
data = _rle_decode(_decode_telegram_base64(file_id))
if not data:
return None
# This isn't officially documented anywhere, but
# we assume the last byte is some kind of "version".
data, version = data[:-1], data[-1]
if version not in (2, 4):
return None
if (version == 2 and len(data) == 24) or (version == 4 and len(data) == 25):
if version == 2:
file_type, dc_id, media_id, access_hash = struct.unpack('<iiqq', data)
# elif version == 4:
else:
# TODO Figure out what the extra byte means
file_type, dc_id, media_id, access_hash, _ = struct.unpack('<iiqqb', data)
if not (1 <= dc_id <= 5):
# Valid `file_id`'s must have valid DC IDs. Since this method is
# called when sending a file and the user may have entered a path
# they believe is correct but the file doesn't exist, this method
# may detect a path as "valid" bot `file_id` even when it's not.
# By checking the `dc_id`, we greatly reduce the chances of this
# happening.
return None
attributes = []
if file_type == 3 or file_type == 9:
attributes.append(types.DocumentAttributeAudio(
duration=0,
voice=file_type == 3
))
elif file_type == 4 or file_type == 13:
attributes.append(types.DocumentAttributeVideo(
duration=0,
w=0,
h=0,
round_message=file_type == 13
))
# elif file_type == 5: # other, cannot know which
elif file_type == 8:
attributes.append(types.DocumentAttributeSticker(
alt='',
stickerset=types.InputStickerSetEmpty()
))
elif file_type == 10:
attributes.append(types.DocumentAttributeAnimated())
return types.Document(
id=media_id,
access_hash=access_hash,
date=None,
mime_type='',
size=0,
thumbs=None,
dc_id=dc_id,
attributes=attributes,
file_reference=b''
)
elif (version == 2 and len(data) == 44) or (version == 4 and len(data) in (49, 77)):
if version == 2:
(file_type, dc_id, media_id, access_hash,
volume_id, secret, local_id) = struct.unpack('<iiqqqqi', data)
# else version == 4:
elif len(data) == 49:
# TODO Figure out what the extra five bytes mean
(file_type, dc_id, media_id, access_hash,
volume_id, secret, local_id, _) = struct.unpack('<iiqqqqi5s', data)
elif len(data) == 77:
# See #1613.
(file_type, dc_id, _, media_id, access_hash, volume_id, _, local_id, _) = struct.unpack('<ii28sqqq12sib', data)
else:
return None
if not (1 <= dc_id <= 5):
return None
# Thumbnails (small) always have ID 0; otherwise size 'x'
photo_size = 's' if media_id or access_hash else 'x'
return types.Photo(
id=media_id,
access_hash=access_hash,
file_reference=b'',
date=None,
sizes=[types.PhotoSize(
type=photo_size,
w=0,
h=0,
size=0
)],
dc_id=dc_id,
has_stickers=None
)
def pack_bot_file_id(file):
"""
Inverse operation for `resolve_bot_file_id`.
The only parameters this method will accept are :tl:`Document` and
:tl:`Photo`, and it will return a variable-length ``file_id`` string.
If an invalid parameter is given, it will ``return None``.
"""
if isinstance(file, types.MessageMediaDocument):
file = file.document
elif isinstance(file, types.MessageMediaPhoto):
file = file.photo
if isinstance(file, types.Document):
file_type = 5
for attribute in file.attributes:
if isinstance(attribute, types.DocumentAttributeAudio):
file_type = 3 if attribute.voice else 9
elif isinstance(attribute, types.DocumentAttributeVideo):
file_type = 13 if attribute.round_message else 4
elif isinstance(attribute, types.DocumentAttributeSticker):
file_type = 8
elif isinstance(attribute, types.DocumentAttributeAnimated):
file_type = 10
else:
continue
break
return _encode_telegram_base64(_rle_encode(struct.pack(
'<iiqqb', file_type, file.dc_id, file.id, file.access_hash, 2)))
elif isinstance(file, types.Photo):
size = next((x for x in reversed(file.sizes) if isinstance(
x, (types.PhotoSize, types.PhotoCachedSize))), None)
if not size:
return None
size = size.location
return _encode_telegram_base64(_rle_encode(struct.pack(
'<iiqqqqib', 2, file.dc_id, file.id, file.access_hash,
size.volume_id, 0, size.local_id, 2 # 0 = old `secret`
)))
else:
return None
def resolve_invite_link(link):
"""
@ -1393,7 +1549,7 @@ def _photo_size_byte_count(size):
return len(size.bytes)
elif isinstance(size, types.PhotoSizeEmpty):
return 0
elif isinstance(size, types.PhotoSizeProgressive):
elif isinstance(size, types.):
return max(size.sizes)
else:
return None