Use types. namespace in utils

This commit is contained in:
Lonami Exo 2018-07-22 19:40:00 +02:00
parent ace7254344
commit 52292d77fb

View File

@ -14,26 +14,6 @@ from types import GeneratorType
from .extensions import markdown, html
from .helpers import add_surrogate, del_surrogate
from .tl import types
from .tl.types import (
Channel, ChannelForbidden, Chat, ChatEmpty, ChatForbidden, ChatFull,
ChatPhoto, InputPeerChannel, InputPeerChat, InputPeerUser, InputPeerEmpty,
MessageMediaDocument, MessageMediaPhoto, PeerChannel, InputChannel,
UserEmpty, InputUser, InputUserEmpty, InputUserSelf, InputPeerSelf,
PeerChat, PeerUser, User, UserFull, UserProfilePhoto, Document,
MessageMediaContact, MessageMediaEmpty, MessageMediaGame, MessageMediaGeo,
MessageMediaUnsupported, MessageMediaVenue, InputMediaContact,
InputMediaDocument, InputMediaEmpty, InputMediaGame,
InputMediaGeoPoint, InputMediaPhoto, InputMediaVenue, InputDocument,
DocumentEmpty, InputDocumentEmpty, GeoPoint, InputGeoPoint,
GeoPointEmpty, InputGeoPointEmpty, Photo, InputPhoto, PhotoEmpty,
InputPhotoEmpty, FileLocation, ChatPhotoEmpty, UserProfilePhotoEmpty,
FileLocationUnavailable, InputMediaUploadedDocument, ChannelFull,
InputMediaUploadedPhoto, DocumentAttributeFilename, photos,
TopPeer, InputNotifyPeer, InputMessageID, InputFileLocation,
InputDocumentFileLocation, PhotoSizeEmpty, InputDialogPeer,
DocumentAttributeAudio, DocumentAttributeVideo
)
from .tl.types.contacts import ResolvedPeer
try:
import hachoir
@ -82,7 +62,7 @@ def get_display_name(entity):
Gets the display name for the given entity, if it's an :tl:`User`,
:tl:`Chat` or :tl:`Channel`. Returns an empty string otherwise.
"""
if isinstance(entity, User):
if isinstance(entity, types.User):
if entity.last_name and entity.first_name:
return '{} {}'.format(entity.first_name, entity.last_name)
elif entity.first_name:
@ -92,7 +72,7 @@ def get_display_name(entity):
else:
return ''
elif isinstance(entity, (Chat, Channel)):
elif isinstance(entity, (types.Chat, types.Channel)):
return entity.title
return ''
@ -102,13 +82,14 @@ def get_extension(media):
"""Gets the corresponding extension for any Telegram media."""
# Photos are always compressed as .jpg by Telegram
if isinstance(media, (UserProfilePhoto, ChatPhoto, MessageMediaPhoto)):
if isinstance(media, (types.UserProfilePhoto,
types.ChatPhoto, types.MessageMediaPhoto)):
return '.jpg'
# Documents will come with a mime type
if isinstance(media, MessageMediaDocument):
if isinstance(media, types.MessageMediaDocument):
media = media.document
if isinstance(media, Document):
if isinstance(media, types.Document):
if media.mime_type == 'application/octet-stream':
# Octet stream are just bytes, which have no default extension
return ''
@ -140,38 +121,38 @@ def get_input_peer(entity, allow_self=True):
else:
_raise_cast_fail(entity, 'InputPeer')
if isinstance(entity, User):
if isinstance(entity, types.User):
if entity.is_self and allow_self:
return InputPeerSelf()
return types.InputPeerSelf()
else:
return InputPeerUser(entity.id, entity.access_hash or 0)
return types.InputPeerUser(entity.id, entity.access_hash or 0)
if isinstance(entity, (Chat, ChatEmpty, ChatForbidden)):
return InputPeerChat(entity.id)
if isinstance(entity, (types.Chat, types.ChatEmpty, types.ChatForbidden)):
return types.InputPeerChat(entity.id)
if isinstance(entity, (Channel, ChannelForbidden)):
return InputPeerChannel(entity.id, entity.access_hash or 0)
if isinstance(entity, (types.Channel, types.ChannelForbidden)):
return types.InputPeerChannel(entity.id, entity.access_hash or 0)
if isinstance(entity, InputUser):
return InputPeerUser(entity.user_id, entity.access_hash)
if isinstance(entity, types.InputUser):
return types.InputPeerUser(entity.user_id, entity.access_hash)
if isinstance(entity, InputChannel):
return InputPeerChannel(entity.channel_id, entity.access_hash)
if isinstance(entity, types.InputChannel):
return types.InputPeerChannel(entity.channel_id, entity.access_hash)
if isinstance(entity, InputUserSelf):
return InputPeerSelf()
if isinstance(entity, types.InputUserSelf):
return types.InputPeerSelf()
if isinstance(entity, UserEmpty):
return InputPeerEmpty()
if isinstance(entity, types.UserEmpty):
return types.InputPeerEmpty()
if isinstance(entity, UserFull):
if isinstance(entity, types.UserFull):
return get_input_peer(entity.user)
if isinstance(entity, ChatFull):
return InputPeerChat(entity.id)
if isinstance(entity, types.ChatFull):
return types.InputPeerChat(entity.id)
if isinstance(entity, PeerChat):
return InputPeerChat(entity.chat_id)
if isinstance(entity, types.PeerChat):
return types.InputPeerChat(entity.chat_id)
_raise_cast_fail(entity, 'InputPeer')
@ -184,11 +165,11 @@ def get_input_channel(entity):
except AttributeError:
_raise_cast_fail(entity, 'InputChannel')
if isinstance(entity, (Channel, ChannelForbidden)):
return InputChannel(entity.id, entity.access_hash or 0)
if isinstance(entity, (types.Channel, types.ChannelForbidden)):
return types.InputChannel(entity.id, entity.access_hash or 0)
if isinstance(entity, InputPeerChannel):
return InputChannel(entity.channel_id, entity.access_hash)
if isinstance(entity, types.InputPeerChannel):
return types.InputChannel(entity.channel_id, entity.access_hash)
_raise_cast_fail(entity, 'InputChannel')
@ -201,23 +182,23 @@ def get_input_user(entity):
except AttributeError:
_raise_cast_fail(entity, 'InputUser')
if isinstance(entity, User):
if isinstance(entity, types.User):
if entity.is_self:
return InputUserSelf()
return types.InputUserSelf()
else:
return InputUser(entity.id, entity.access_hash or 0)
return types.InputUser(entity.id, entity.access_hash or 0)
if isinstance(entity, InputPeerSelf):
return InputUserSelf()
if isinstance(entity, types.InputPeerSelf):
return types.InputUserSelf()
if isinstance(entity, (UserEmpty, InputPeerEmpty)):
return InputUserEmpty()
if isinstance(entity, (types.UserEmpty, types.InputPeerEmpty)):
return types.InputUserEmpty()
if isinstance(entity, UserFull):
if isinstance(entity, types.UserFull):
return get_input_user(entity.user)
if isinstance(entity, InputPeerUser):
return InputUser(entity.user_id, entity.access_hash)
if isinstance(entity, types.InputPeerUser):
return types.InputUser(entity.user_id, entity.access_hash)
_raise_cast_fail(entity, 'InputUser')
@ -228,12 +209,12 @@ def get_input_dialog(dialog):
if dialog.SUBCLASS_OF_ID == 0xa21c9795: # crc32(b'InputDialogPeer')
return dialog
if dialog.SUBCLASS_OF_ID == 0xc91c90b6: # crc32(b'InputPeer')
return InputDialogPeer(dialog)
return types.InputDialogPeer(dialog)
except AttributeError:
_raise_cast_fail(dialog, 'InputDialogPeer')
try:
return InputDialogPeer(get_input_peer(dialog))
return types.InputDialogPeer(get_input_peer(dialog))
except TypeError:
pass
@ -248,13 +229,13 @@ def get_input_document(document):
except AttributeError:
_raise_cast_fail(document, 'InputDocument')
if isinstance(document, Document):
return InputDocument(id=document.id, access_hash=document.access_hash)
if isinstance(document, types.Document):
return types.InputDocument(id=document.id, access_hash=document.access_hash)
if isinstance(document, DocumentEmpty):
return InputDocumentEmpty()
if isinstance(document, types.DocumentEmpty):
return types.InputDocumentEmpty()
if isinstance(document, MessageMediaDocument):
if isinstance(document, types.MessageMediaDocument):
return get_input_document(document.document)
if isinstance(document, types.Message):
@ -271,14 +252,14 @@ def get_input_photo(photo):
except AttributeError:
_raise_cast_fail(photo, 'InputPhoto')
if isinstance(photo, photos.Photo):
if isinstance(photo, types.photos.Photo):
photo = photo.photo
if isinstance(photo, Photo):
return InputPhoto(id=photo.id, access_hash=photo.access_hash)
if isinstance(photo, types.Photo):
return types.InputPhoto(id=photo.id, access_hash=photo.access_hash)
if isinstance(photo, PhotoEmpty):
return InputPhotoEmpty()
if isinstance(photo, types.PhotoEmpty):
return types.InputPhotoEmpty()
_raise_cast_fail(photo, 'InputPhoto')
@ -291,13 +272,13 @@ def get_input_geo(geo):
except AttributeError:
_raise_cast_fail(geo, 'InputGeoPoint')
if isinstance(geo, GeoPoint):
return InputGeoPoint(lat=geo.lat, long=geo.long)
if isinstance(geo, types.GeoPoint):
return types.InputGeoPoint(lat=geo.lat, long=geo.long)
if isinstance(geo, GeoPointEmpty):
return InputGeoPointEmpty()
if isinstance(geo, types.GeoPointEmpty):
return types.InputGeoPointEmpty()
if isinstance(geo, MessageMediaGeo):
if isinstance(geo, types.MessageMediaGeo):
return get_input_geo(geo.geo)
if isinstance(geo, types.Message):
@ -317,67 +298,67 @@ def get_input_media(media, is_photo=False):
if media.SUBCLASS_OF_ID == 0xfaf846f4: # crc32(b'InputMedia')
return media
elif media.SUBCLASS_OF_ID == 0x846363e0: # crc32(b'InputPhoto')
return InputMediaPhoto(media)
return types.InputMediaPhoto(media)
elif media.SUBCLASS_OF_ID == 0xf33fdb68: # crc32(b'InputDocument')
return InputMediaDocument(media)
return types.InputMediaDocument(media)
except AttributeError:
_raise_cast_fail(media, 'InputMedia')
if isinstance(media, MessageMediaPhoto):
return InputMediaPhoto(
if isinstance(media, types.MessageMediaPhoto):
return types.InputMediaPhoto(
id=get_input_photo(media.photo),
ttl_seconds=media.ttl_seconds
)
if isinstance(media, (Photo, photos.Photo, PhotoEmpty)):
return InputMediaPhoto(
if isinstance(media, (types.Photo, types.photos.Photo, types.PhotoEmpty)):
return types.InputMediaPhoto(
id=get_input_photo(media)
)
if isinstance(media, MessageMediaDocument):
return InputMediaDocument(
if isinstance(media, types.MessageMediaDocument):
return types.InputMediaDocument(
id=get_input_document(media.document),
ttl_seconds=media.ttl_seconds
)
if isinstance(media, (Document, DocumentEmpty)):
return InputMediaDocument(
if isinstance(media, (types.Document, types.DocumentEmpty)):
return types.InputMediaDocument(
id=get_input_document(media)
)
if isinstance(media, FileLocation):
if isinstance(media, types.FileLocation):
if is_photo:
return InputMediaUploadedPhoto(file=media)
return types.InputMediaUploadedPhoto(file=media)
else:
return InputMediaUploadedDocument(
return types.InputMediaUploadedDocument(
file=media,
mime_type='application/octet-stream', # unknown, assume bytes
attributes=[DocumentAttributeFilename('unnamed')]
attributes=[types.DocumentAttributeFilename('unnamed')]
)
if isinstance(media, MessageMediaGame):
return InputMediaGame(id=media.game.id)
if isinstance(media, types.MessageMediaGame):
return types.InputMediaGame(id=media.game.id)
if isinstance(media, (ChatPhoto, UserProfilePhoto)):
if isinstance(media.photo_big, FileLocationUnavailable):
if isinstance(media, (types.ChatPhoto, types.UserProfilePhoto)):
if isinstance(media.photo_big, types.FileLocationUnavailable):
media = media.photo_small
else:
media = media.photo_big
return get_input_media(media, is_photo=True)
if isinstance(media, MessageMediaContact):
return InputMediaContact(
if isinstance(media, types.MessageMediaContact):
return types.InputMediaContact(
phone_number=media.phone_number,
first_name=media.first_name,
last_name=media.last_name,
vcard=''
)
if isinstance(media, MessageMediaGeo):
return InputMediaGeoPoint(geo_point=get_input_geo(media.geo))
if isinstance(media, types.MessageMediaGeo):
return types.InputMediaGeoPoint(geo_point=get_input_geo(media.geo))
if isinstance(media, MessageMediaVenue):
return InputMediaVenue(
if isinstance(media, types.MessageMediaVenue):
return types.InputMediaVenue(
geo_point=get_input_geo(media.geo),
title=media.title,
address=media.address,
@ -387,9 +368,10 @@ def get_input_media(media, is_photo=False):
)
if isinstance(media, (
MessageMediaEmpty, MessageMediaUnsupported,
ChatPhotoEmpty, UserProfilePhotoEmpty, FileLocationUnavailable)):
return InputMediaEmpty()
types.MessageMediaEmpty, types.MessageMediaUnsupported,
types.ChatPhotoEmpty, types.UserProfilePhotoEmpty,
types.FileLocationUnavailable)):
return types.InputMediaEmpty()
if isinstance(media, types.Message):
return get_input_media(media.media, is_photo=is_photo)
@ -401,11 +383,11 @@ def get_input_message(message):
"""Similar to :meth:`get_input_peer`, but for input messages."""
try:
if isinstance(message, int): # This case is really common too
return InputMessageID(message)
return types.InputMessageID(message)
elif message.SUBCLASS_OF_ID == 0x54b6bcc5: # crc32(b'InputMessage'):
return message
elif message.SUBCLASS_OF_ID == 0x790009e3: # crc32(b'Message'):
return InputMessageID(message.id)
return types.InputMessageID(message.id)
except AttributeError:
pass
@ -445,14 +427,14 @@ def get_attributes(file, *, attributes=None, mime_type=None,
if mime_type is None:
mime_type = mimetypes.guess_type(file)[0]
attr_dict = {DocumentAttributeFilename:
DocumentAttributeFilename(os.path.basename(file))}
attr_dict = {types.DocumentAttributeFilename:
types.DocumentAttributeFilename(os.path.basename(file))}
if is_audio(file) and hachoir is not None:
with hachoir.parser.createParser(file) as parser:
m = hachoir.metadata.extractMetadata(parser)
attr_dict[DocumentAttributeAudio] = \
DocumentAttributeAudio(
attr_dict[types.DocumentAttributeAudio] = \
types.DocumentAttributeAudio(
voice=voice_note,
title=m.get('title') if m.has('title') else None,
performer=m.get('author') if m.has('author') else None,
@ -464,7 +446,7 @@ def get_attributes(file, *, attributes=None, mime_type=None,
if hachoir:
with hachoir.parser.createParser(file) as parser:
m = hachoir.metadata.extractMetadata(parser)
doc = DocumentAttributeVideo(
doc = types.DocumentAttributeVideo(
round_message=video_note,
w=m.get('width') if m.has('width') else 0,
h=m.get('height') if m.has('height') else 0,
@ -472,21 +454,21 @@ def get_attributes(file, *, attributes=None, mime_type=None,
if m.has('duration') else 0)
)
else:
doc = DocumentAttributeVideo(
doc = types.DocumentAttributeVideo(
0, 1, 1, round_message=video_note)
attr_dict[DocumentAttributeVideo] = doc
attr_dict[types.DocumentAttributeVideo] = doc
else:
attr_dict = {DocumentAttributeFilename:
DocumentAttributeFilename(
attr_dict = {types.DocumentAttributeFilename:
types.DocumentAttributeFilename(
os.path.basename(getattr(file, 'name', None) or 'unnamed'))}
if voice_note:
if DocumentAttributeAudio in attr_dict:
attr_dict[DocumentAttributeAudio].voice = True
if types.DocumentAttributeAudio in attr_dict:
attr_dict[types.DocumentAttributeAudio].voice = True
else:
attr_dict[DocumentAttributeAudio] = \
DocumentAttributeAudio(0, voice=True)
attr_dict[types.DocumentAttributeAudio] = \
types.DocumentAttributeAudio(0, voice=True)
# Now override the attributes if any. As we have a dict of
# {cls: instance}, we can override any class with the list
@ -553,23 +535,26 @@ def get_input_location(location):
if isinstance(location, types.Message):
location = location.media
if isinstance(location, MessageMediaDocument):
if isinstance(location, types.MessageMediaDocument):
location = location.document
elif isinstance(location, MessageMediaPhoto):
elif isinstance(location, types.MessageMediaPhoto):
location = location.photo
if isinstance(location, Document):
return (location.dc_id, InputDocumentFileLocation(
if isinstance(location, types.Document):
return (location.dc_id, types.InputDocumentFileLocation(
location.id, location.access_hash, location.version))
elif isinstance(location, Photo):
elif isinstance(location, types.Photo):
try:
location = next(x for x in reversed(location.sizes)
if not isinstance(x, PhotoSizeEmpty)).location
location = next(
x for x in reversed(location.sizes)
if not isinstance(x, types.PhotoSizeEmpty)
).location
except StopIteration:
pass
if isinstance(location, (FileLocation, FileLocationUnavailable)):
return (getattr(location, 'dc_id', None), InputFileLocation(
if isinstance(location, (
types.FileLocation, types.FileLocationUnavailable)):
return (getattr(location, 'dc_id', None), types.InputFileLocation(
location.volume_id, location.local_id, location.secret))
_raise_cast_fail(location, 'InputFileLocation')
@ -694,7 +679,8 @@ def get_peer_id(peer, add_mark=True):
try:
if peer.SUBCLASS_OF_ID not in (0x2d45687, 0xc91c90b6):
if isinstance(peer, (ResolvedPeer, InputNotifyPeer, TopPeer)):
if isinstance(peer, (
types.ResolvedPeer, types.InputNotifyPeer, types.TopPeer)):
peer = peer.peer
else:
# Not a Peer or an InputPeer, so first get its Input version
@ -703,16 +689,17 @@ def get_peer_id(peer, add_mark=True):
_raise_cast_fail(peer, 'int')
# Set the right ID/kind, or raise if the TLObject is not recognised
if isinstance(peer, (PeerUser, InputPeerUser)):
if isinstance(peer, (types.PeerUser, types.InputPeerUser)):
return peer.user_id
elif isinstance(peer, (PeerChat, InputPeerChat)):
elif isinstance(peer, (types.PeerChat, types.InputPeerChat)):
# Check in case the user mixed things up to avoid blowing up
if not (0 < peer.chat_id <= 0x7fffffff):
peer.chat_id = resolve_id(peer.chat_id)[0]
return -peer.chat_id if add_mark else peer.chat_id
elif isinstance(peer, (PeerChannel, InputPeerChannel, ChannelFull)):
if isinstance(peer, ChannelFull):
elif isinstance(peer, (
types.PeerChannel, types.InputPeerChannel, types.ChannelFull)):
if isinstance(peer, types.ChannelFull):
# Special case: .get_input_peer can't return InputChannel from
# ChannelFull since it doesn't have an .access_hash attribute.
i = peer.id
@ -722,7 +709,7 @@ def get_peer_id(peer, add_mark=True):
# Check in case the user mixed things up to avoid blowing up
if not (0 < i <= 0x7fffffff):
i = resolve_id(i)[0]
if isinstance(peer, ChannelFull):
if isinstance(peer, types.ChannelFull):
peer.id = i
else:
peer.channel_id = i
@ -740,7 +727,7 @@ def get_peer_id(peer, add_mark=True):
def resolve_id(marked_id):
"""Given a marked ID, returns the original ID and its :tl:`Peer` type."""
if marked_id >= 0:
return marked_id, PeerUser
return marked_id, types.PeerUser
# There have been report of chat IDs being 10000xyz, which means their
# marked version is -10000xyz, which in turn looks like a channel but
@ -748,9 +735,9 @@ def resolve_id(marked_id):
# two zeroes.
m = re.match(r'-100([^0]\d*)', str(marked_id))
if m:
return int(m.group(1)), PeerChannel
return int(m.group(1)), types.PeerChannel
return -marked_id, PeerChat
return -marked_id, types.PeerChat
def get_appropriated_part_size(file_size):