Externalized TelegramClient utilities and added more

This commit is contained in:
Lonami 2016-10-09 12:57:38 +02:00
parent b52cb12b2a
commit a42effc2b5
4 changed files with 114 additions and 87 deletions

View File

@ -1,6 +1,9 @@
from telethon.tl.types import UpdateShortChatMessage
from telethon.tl.types import UpdateShortMessage
from telethon import TelegramClient
from telethon.utils import get_display_name, get_input_peer
import shutil
# Get the (current) number of lines in the terminal
@ -71,7 +74,7 @@ class InteractiveTelegramClient(TelegramClient):
# Display them so the user can choose
for i, entity in enumerate(entities):
i += 1 # 1-based index for normies
print('{}. {}'.format(i, self.get_display_name(entity)))
print('{}. {}'.format(i, get_display_name(entity)))
# Let the user decide who they want to talk to
print()
@ -97,10 +100,10 @@ class InteractiveTelegramClient(TelegramClient):
# Retrieve the selected user (or chat, or channel)
entity = entities[i]
input_peer = self.get_input_peer(entity)
input_peer = get_input_peer(entity)
# Show some information
print_title('Chat with "{}"'.format(self.get_display_name(entity)))
print_title('Chat with "{}"'.format(get_display_name(entity)))
print('Available commands:')
print(' !q: Quits the current chat.')
print(' !Q: Quits the current chat and exits.')
@ -166,7 +169,7 @@ class InteractiveTelegramClient(TelegramClient):
print('Profile picture downloaded to {}'.format(output))
else:
print('"{}" does not seem to have a profile picture.'
.format(self.get_display_name(entity)))
.format(get_display_name(entity)))
# Send chat message (if any)
elif msg:

View File

@ -2,7 +2,7 @@ import platform
from datetime import datetime, timedelta
from hashlib import md5
from os import path, listdir
from mimetypes import guess_extension, guess_type
from mimetypes import guess_type
# For sending and receiving requests
from telethon.tl import MTProtoRequest
@ -10,15 +10,6 @@ from telethon.tl import Session
# The Requests and types that we'll be using
from telethon.tl.functions.upload import SaveBigFilePartRequest
from telethon.tl.types import \
User, Chat, Channel, \
PeerUser, PeerChat, PeerChannel, \
InputPeerUser, InputPeerChat, InputPeerChannel, InputPeerEmpty, \
UserProfilePhotoEmpty, ChatPhotoEmpty, \
InputFile, InputFileLocation, InputMediaUploadedPhoto, InputMediaUploadedDocument, \
MessageMediaContact, MessageMediaDocument, MessageMediaPhoto, \
DocumentAttributeAudio, DocumentAttributeFilename, InputDocumentFileLocation
from telethon.tl.functions import InvokeWithLayerRequest, InitConnectionRequest
from telethon.tl.functions.help import GetConfigRequest
from telethon.tl.functions.auth import SendCodeRequest, SignInRequest, SignUpRequest, LogOutRequest
@ -28,8 +19,18 @@ from telethon.tl.functions.messages import \
SendMessageRequest, SendMediaRequest, \
ReadHistoryRequest
# All the types we need to work with
from telethon.tl.types import \
InputPeerEmpty, \
UserProfilePhotoEmpty, ChatPhotoEmpty, \
InputFile, InputFileLocation, InputMediaUploadedPhoto, InputMediaUploadedDocument, \
MessageMediaContact, MessageMediaDocument, MessageMediaPhoto, \
DocumentAttributeAudio, DocumentAttributeFilename, InputDocumentFileLocation
# Import some externalized utilities to work with the Telegram types and more
import telethon.helpers as utils
import telethon.network.authenticator as authenticator
from telethon.utils import find_user_or_chat, get_appropiate_part_size, get_extension
from telethon.errors import *
from telethon.network import MtProtoSender, TcpTransport
@ -227,7 +228,7 @@ class TelegramClient:
offset_peer=offset_peer,
limit=count))
return (r.dialogs,
[self.find_user_or_chat(d.peer, r.users, r.chats) for d in r.dialogs])
[find_user_or_chat(d.peer, r.users, r.chats) for d in r.dialogs])
# endregion
@ -325,7 +326,7 @@ class TelegramClient:
"""
file_size = path.getsize(file_path)
if not part_size_kb:
part_size_kb = self.find_appropiate_part_size(file_size)
part_size_kb = get_appropiate_part_size(file_size)
if part_size_kb > 512:
raise ValueError('The part size must be less or equal to 512KB')
@ -422,9 +423,8 @@ class TelegramClient:
isinstance(profile_photo, ChatPhotoEmpty)):
return False
# Photos are always compressed into a .jpg by Telegram
if add_extension:
file_path += '.jpg'
file_path += get_extension(profile_photo)
if download_big:
photo_location = profile_photo.photo_big
@ -467,9 +467,8 @@ class TelegramClient:
file_size = largest_size.size
largest_size = largest_size.location
# Photos are always compressed into a .jpg by Telegram
if add_extension:
file_path += '.jpg'
file_path += get_extension(message_media_photo)
# Download the media with the largest size input file location
self.download_file_loc(InputFileLocation(volume_id=largest_size.volume_id,
@ -502,11 +501,8 @@ class TelegramClient:
if file_path is None:
print('Could not determine a filename for the document')
# Guess the extension based on the mime_type
if add_extension:
ext = guess_extension(document.mime_type)
if ext is not None:
file_path += ext
file_path += get_extension(document.mime_type)
self.download_file_loc(InputDocumentFileLocation(id=document.id,
access_hash=document.access_hash,
@ -551,7 +547,7 @@ class TelegramClient:
if not file_size:
raise ValueError('A part size value must be provided')
else:
part_size_kb = self.find_appropiate_part_size(file_size)
part_size_kb = get_appropiate_part_size(file_size)
part_size = int(part_size_kb * 1024)
if part_size % 1024 != 0:
@ -582,68 +578,6 @@ class TelegramClient:
# endregion
# region Utilities
@staticmethod
def get_display_name(entity):
"""Gets the input peer for the given "entity" (user, chat or channel)
Returns None if it was not found"""
if isinstance(entity, User):
if entity.last_name is not None:
return '{} {}'.format(entity.first_name, entity.last_name)
return entity.first_name
if isinstance(entity, Chat) or isinstance(entity, Channel):
return entity.title
@staticmethod
def get_input_peer(entity):
"""Gets the input peer for the given "entity" (user, chat or channel).
Returns None if it was not found"""
if isinstance(entity, User):
return InputPeerUser(entity.id, entity.access_hash)
if isinstance(entity, Chat):
return InputPeerChat(entity.id)
if isinstance(entity, Channel):
return InputPeerChannel(entity.id, entity.access_hash)
@staticmethod
def find_user_or_chat(peer, users, chats):
"""Finds the corresponding user or chat given a peer.
Returns None if it was not found"""
try:
if isinstance(peer, PeerUser):
user = next(u for u in users if u.id == peer.user_id)
return user
elif isinstance(peer, PeerChat):
chat = next(c for c in chats if c.id == peer.chat_id)
return chat
elif isinstance(peer, PeerChannel):
channel = next(c for c in chats if c.id == peer.channel_id)
return channel
except StopIteration:
return None
@staticmethod
def find_appropiate_part_size(file_size):
if file_size <= 1048576: # 1MB
return 32
if file_size <= 10485760: # 10MB
return 64
if file_size <= 393216000: # 375MB
return 128
if file_size <= 786432000: # 750MB
return 256
if file_size <= 1572864000: # 1500MB
return 512
raise ValueError('File size too large')
# endregion
# region Updates handling
def add_update_handler(self, handler):

View File

@ -1,2 +1,3 @@
from .binary_writer import BinaryWriter
from .binary_reader import BinaryReader
from .tl_utils import *

View File

@ -0,0 +1,89 @@
"""Utilities for working with TLObjects.
We have these because some TLObjects differ in very little things,
for example, some may have an `user_id` attribute and other a `chat_id` but,
after all, both are the same attribute, IDs."""
from mimetypes import guess_extension
from telethon.tl.types import \
User, Chat, Channel, \
PeerUser, PeerChat, PeerChannel, \
InputPeerUser, InputPeerChat, InputPeerChannel, \
UserProfilePhoto, ChatPhoto, \
MessageMediaPhoto, MessageMediaDocument
def get_display_name(entity):
"""Gets the input peer for the given "entity" (user, chat or channel)
Returns None if it was not found"""
if isinstance(entity, User):
if entity.last_name is not None:
return '{} {}'.format(entity.first_name, entity.last_name)
return entity.first_name
if isinstance(entity, Chat) or isinstance(entity, Channel):
return entity.title
def get_extension(media):
"""Gets the corresponding extension for any Telegram media"""
# Photos are always compressed as .jpg by Telegram
if (isinstance(media, UserProfilePhoto) or
isinstance(media, ChatPhoto) or
isinstance(media, MessageMediaPhoto)):
return '.jpg'
# Documents will come with a mime type, from which we can guess their mime type
if isinstance(media, MessageMediaDocument):
extension = guess_extension(media.document.mime_type)
return extension if extension else ''
return None
def get_input_peer(entity):
"""Gets the input peer for the given "entity" (user, chat or channel).
Returns None if it was not found"""
if isinstance(entity, User):
return InputPeerUser(entity.id, entity.access_hash)
if isinstance(entity, Chat):
return InputPeerChat(entity.id)
if isinstance(entity, Channel):
return InputPeerChannel(entity.id, entity.access_hash)
def find_user_or_chat(peer, users, chats):
"""Finds the corresponding user or chat given a peer.
Returns None if it was not found"""
try:
if isinstance(peer, PeerUser):
user = next(u for u in users if u.id == peer.user_id)
return user
elif isinstance(peer, PeerChat):
chat = next(c for c in chats if c.id == peer.chat_id)
return chat
elif isinstance(peer, PeerChannel):
channel = next(c for c in chats if c.id == peer.channel_id)
return channel
except StopIteration:
return None
def get_appropiate_part_size(file_size):
"""Gets the appropiate part size when uploading or downloading files,
given an initial file size"""
if file_size <= 1048576: # 1MB
return 32
if file_size <= 10485760: # 10MB
return 64
if file_size <= 393216000: # 375MB
return 128
if file_size <= 786432000: # 750MB
return 256
if file_size <= 1572864000: # 1500MB
return 512
raise ValueError('File size too large')