Telethon/telethon_examples/interactive_telegram_client.py

406 lines
16 KiB
Python
Raw Normal View History

import asyncio
import os
2018-06-24 13:04:23 +03:00
import sys
import time
from getpass import getpass
2018-05-24 11:58:42 +03:00
from telethon import TelegramClient, events
from telethon.errors import SessionPasswordNeededError
from telethon.network import ConnectionTcpAbridged
from telethon.utils import get_display_name
2018-06-24 13:04:23 +03:00
def sprint(string, *args, **kwargs):
"""Safe Print (handle UnicodeEncodeErrors on some terminals)"""
try:
print(string, *args, **kwargs)
except UnicodeEncodeError:
string = string.encode('utf-8', errors='ignore')\
.decode('ascii', errors='ignore')
print(string, *args, **kwargs)
def print_title(title):
"""Helper function to print titles to the console more nicely"""
sprint('\n')
sprint('=={}=='.format('=' * len(title)))
sprint('= {} ='.format(title))
sprint('=={}=='.format('=' * len(title)))
def bytes_to_string(byte_count):
"""Converts a byte count to a string (in KB, MB...)"""
suffix_index = 0
while byte_count >= 1024:
byte_count /= 1024
suffix_index += 1
return '{:.2f}{}'.format(
byte_count, [' bytes', 'KB', 'MB', 'GB', 'TB'][suffix_index]
)
2018-06-24 13:04:23 +03:00
async def async_input(prompt):
"""
Python's ``input()`` is blocking, which means the event loop we set
above can't be running while we're blocking there. This method will
let the loop run while we wait for input.
"""
print(prompt, end='', flush=True)
return (await asyncio.get_running_loop().run_in_executor(None, sys.stdin.readline)).rstrip()
2018-06-24 13:04:23 +03:00
def get_env(name, message, cast=str):
"""Helper to get environment variables interactively"""
if name in os.environ:
return os.environ[name]
while True:
value = input(message)
try:
return cast(value)
except ValueError as e:
print(e, file=sys.stderr)
time.sleep(1)
class InteractiveTelegramClient(TelegramClient):
"""Full featured Telegram client, meant to be used on an interactive
session to see what Telethon is capable off -
This client allows the user to perform some basic interaction with
Telegram through Telethon, such as listing dialogs (open chats),
talking to people, downloading media, and receiving updates.
"""
def __init__(self, session_user_id, api_id, api_hash,
proxy=None):
"""
Initializes the InteractiveTelegramClient.
:param session_user_id: Name of the *.session file.
:param api_id: Telegram's api_id acquired through my.telegram.org.
:param api_hash: Telegram's api_hash.
:param proxy: Optional proxy tuple/dictionary.
"""
print_title('Initialization')
print('Initializing interactive example...')
# The first step is to initialize the TelegramClient, as we are
# subclassing it, we need to call super().__init__(). On a more
# normal case you would want 'client = TelegramClient(...)'
super().__init__(
# These parameters should be passed always, session name and API
session_user_id, api_id, api_hash,
2018-05-24 11:58:42 +03:00
# You can optionally change the connection mode by passing a
# type or an instance of it. This changes how the sent packets
# look (low-level concept you normally shouldn't worry about).
# Default is ConnectionTcpFull, smallest is ConnectionTcpAbridged.
connection=ConnectionTcpAbridged,
# If you're using a proxy, set it here.
2018-06-24 13:04:23 +03:00
proxy=proxy
)
2018-01-16 20:36:50 +03:00
# Store {message.id: message} map here so that we can download
# media known the message ID, for every message having media.
self.found_media = {}
async def init(self):
2018-06-24 13:04:23 +03:00
# Calling .connect() may raise a connection error False, so you need
# to except those before continuing. Otherwise you may want to retry
# as done here.
print('Connecting to Telegram servers...')
2018-06-24 13:04:23 +03:00
try:
await self.connect()
except IOError:
# We handle IOError and not ConnectionError because
# PySocks' errors do not subclass ConnectionError
# (so this will work with and without proxies).
print('Initial connection failed. Retrying...')
await self.connect()
# If the user hasn't called .sign_in() or .sign_up() yet, they won't
# be authorized. The first thing you must do is authorize. Calling
# .sign_in() should only be done once as the information is saved on
# the *.session file so you don't need to enter the code every time.
if not await self.is_user_authorized():
print('First run. Sending code request...')
user_phone = input('Enter your phone: ')
await self.sign_in(user_phone)
self_user = None
while self_user is None:
code = input('Enter the code you just received: ')
try:
self_user = await self.sign_in(code=code)
# Two-step verification may be enabled, and .sign_in will
# raise this error. If that's the case ask for the password.
# Note that getpass() may not work on PyCharm due to a bug,
# if that's the case simply change it for input().
except SessionPasswordNeededError:
pw = getpass('Two step verification is enabled. '
'Please enter your password: ')
self_user = await self.sign_in(password=pw)
2018-06-24 13:04:23 +03:00
async def run(self):
"""Main loop of the TelegramClient, will wait for user action"""
2018-05-24 11:58:42 +03:00
# Once everything is ready, we can add an event handler.
#
# Events are an abstraction over Telegram's "Updates" and
# are much easier to use.
self.add_event_handler(self.message_handler, events.NewMessage)
# Enter a while loop to chat as long as the user wants
while True:
# Retrieve the top dialogs. You can set the limit to None to
# retrieve all of them if you wish, but beware that may take
# a long time if you have hundreds of them.
dialog_count = 15
# Entities represent the user, chat or channel
# corresponding to the dialog on the same index.
2018-06-24 13:04:23 +03:00
dialogs = await self.get_dialogs(limit=dialog_count)
i = None
while i is None:
print_title('Dialogs window')
# Display them so the user can choose
for i, dialog in enumerate(dialogs, start=1):
sprint('{}. {}'.format(i, get_display_name(dialog.entity)))
# Let the user decide who they want to talk to
print()
print('> Who do you want to send messages to?')
print('> Available commands:')
print(' !q: Quits the dialogs window and exits.')
print(' !l: Logs out, terminating this session.')
print()
2018-06-24 13:04:23 +03:00
i = await async_input('Enter dialog ID or a command: ')
if i == '!q':
return
if i == '!l':
# Logging out will cause the user to need to reenter the
# code next time they want to use the library, and will
# also delete the *.session file off the filesystem.
#
# This is not the same as simply calling .disconnect(),
# which simply shuts down everything gracefully.
2018-06-24 13:04:23 +03:00
await self.log_out()
return
try:
i = int(i if i else 0) - 1
# Ensure it is inside the bounds, otherwise retry
if not 0 <= i < dialog_count:
i = None
except ValueError:
i = None
# Retrieve the selected user (or chat, or channel)
entity = dialogs[i].entity
# Show some information
print_title('Chat with "{}"'.format(get_display_name(entity)))
print('Available commands:')
print(' !q: Quits the current chat.')
print(' !Q: Quits the current chat and exits.')
print(' !h: prints the latest messages (message History).')
print(' !up <path>: Uploads and sends the Photo from path.')
print(' !uf <path>: Uploads and sends the File from path.')
print(' !d <msg-id>: Deletes a message by its id')
print(' !dm <msg-id>: Downloads the given message Media (if any).')
print(' !dp: Downloads the current dialog Profile picture.')
print(' !i: Prints information about this chat..')
print()
# And start a while loop to chat
while True:
2018-06-24 13:04:23 +03:00
msg = await async_input('Enter a message: ')
# Quit
if msg == '!q':
break
elif msg == '!Q':
return
# History
elif msg == '!h':
# First retrieve the messages and some information
2018-06-24 13:04:23 +03:00
messages = await self.get_messages(entity, limit=10)
# Iterate over all (in reverse order so the latest appear
# the last in the console) and print them with format:
# "[hh:mm] Sender: Message"
2018-01-16 20:36:50 +03:00
for msg in reversed(messages):
2018-06-24 13:04:23 +03:00
# Note how we access .sender here. Since we made an
# API call using the self client, it will always have
# information about the sender. This is different to
# events, where Telegram may not always send the user.
2018-01-16 20:36:50 +03:00
name = get_display_name(msg.sender)
# Format the message content
if getattr(msg, 'media', None):
2018-01-16 20:36:50 +03:00
self.found_media[msg.id] = msg
content = '<{}> {}'.format(
type(msg.media).__name__, msg.message)
elif hasattr(msg, 'message'):
content = msg.message
elif hasattr(msg, 'action'):
content = str(msg.action)
else:
# Unknown message, simply print its class name
content = type(msg).__name__
# And print it to the user
sprint('[{}:{}] (ID={}) {}: {}'.format(
msg.date.hour, msg.date.minute, msg.id, name, content))
# Send photo
elif msg.startswith('!up '):
# Slice the message to get the path
2018-06-24 13:04:23 +03:00
path = msg[len('!up '):]
await self.send_photo(path=path, entity=entity)
# Send file (document)
elif msg.startswith('!uf '):
# Slice the message to get the path
2018-06-24 13:04:23 +03:00
path = msg[len('!uf '):]
await self.send_document(path=path, entity=entity)
# Delete messages
elif msg.startswith('!d '):
# Slice the message to get message ID
2018-06-24 13:04:23 +03:00
msg = msg[len('!d '):]
deleted_msg = await self.delete_messages(entity, msg)
2018-01-16 20:36:50 +03:00
print('Deleted {}'.format(deleted_msg))
# Download media
elif msg.startswith('!dm '):
# Slice the message to get message ID
2018-06-24 13:04:23 +03:00
await self.download_media_by_id(msg[len('!dm '):])
# Download profile photo
elif msg == '!dp':
print('Downloading profile picture to usermedia/...')
os.makedirs('usermedia', exist_ok=True)
2018-06-24 13:04:23 +03:00
output = await self.download_profile_photo(entity,
'usermedia')
if output:
print('Profile picture downloaded to', output)
else:
2018-01-16 20:36:50 +03:00
print('No profile picture found for this user!')
elif msg == '!i':
attributes = list(entity.to_dict().items())
pad = max(len(x) for x, _ in attributes)
for name, val in attributes:
print("{:<{width}} : {}".format(name, val, width=pad))
# Send chat message (if any)
elif msg:
2018-06-24 13:04:23 +03:00
await self.send_message(entity, msg, link_preview=False)
2018-06-24 13:04:23 +03:00
async def send_photo(self, path, entity):
"""Sends the file located at path to the desired entity as a photo"""
2018-06-24 13:04:23 +03:00
await self.send_file(
entity, path,
progress_callback=self.upload_progress_callback
)
print('Photo sent!')
2018-06-24 13:04:23 +03:00
async def send_document(self, path, entity):
"""Sends the file located at path to the desired entity as a document"""
2018-06-24 13:04:23 +03:00
await self.send_file(
entity, path,
force_document=True,
progress_callback=self.upload_progress_callback
)
print('Document sent!')
2018-06-24 13:04:23 +03:00
async def download_media_by_id(self, media_id):
"""Given a message ID, finds the media this message contained and
downloads it.
"""
try:
2018-01-16 20:36:50 +03:00
msg = self.found_media[int(media_id)]
except (ValueError, KeyError):
# ValueError when parsing, KeyError when accessing dictionary
print('Invalid media ID given or message not found!')
return
print('Downloading media to usermedia/...')
os.makedirs('usermedia', exist_ok=True)
2018-06-24 13:04:23 +03:00
output = await self.download_media(
2018-01-16 20:36:50 +03:00
msg.media,
file='usermedia/',
progress_callback=self.download_progress_callback
)
print('Media downloaded to {}!'.format(output))
@staticmethod
def download_progress_callback(downloaded_bytes, total_bytes):
InteractiveTelegramClient.print_progress(
'Downloaded', downloaded_bytes, total_bytes
)
@staticmethod
def upload_progress_callback(uploaded_bytes, total_bytes):
InteractiveTelegramClient.print_progress(
'Uploaded', uploaded_bytes, total_bytes
)
@staticmethod
def print_progress(progress_type, downloaded_bytes, total_bytes):
print('{} {} out of {} ({:.2%})'.format(
progress_type, bytes_to_string(downloaded_bytes),
bytes_to_string(total_bytes), downloaded_bytes / total_bytes)
)
2018-06-24 13:04:23 +03:00
async def message_handler(self, event):
2018-05-24 11:58:42 +03:00
"""Callback method for received events.NewMessage"""
2018-06-24 13:04:23 +03:00
# Note that message_handler is called when a Telegram update occurs
# and an event is created. Telegram may not always send information
# about the ``.sender`` or the ``.chat``, so if you *really* want it
# you should use ``get_chat()`` and ``get_sender()`` while working
# with events. Since they are methods, you know they may make an API
# call, which can be expensive.
chat = await event.get_chat()
2018-05-24 11:58:42 +03:00
if event.is_group:
if event.out:
sprint('>> sent "{}" to chat {}'.format(
2018-06-24 13:04:23 +03:00
event.text, get_display_name(chat)
))
else:
2018-05-24 11:58:42 +03:00
sprint('<< {} @ {} sent "{}"'.format(
2018-06-24 13:04:23 +03:00
get_display_name(await event.get_sender()),
get_display_name(chat),
2018-05-24 11:58:42 +03:00
event.text
))
2018-05-24 11:58:42 +03:00
else:
if event.out:
sprint('>> "{}" to user {}'.format(
2018-06-24 13:04:23 +03:00
event.text, get_display_name(chat)
))
else:
2018-05-24 11:58:42 +03:00
sprint('<< {} sent "{}"'.format(
2018-06-24 13:04:23 +03:00
get_display_name(chat), event.text
))
async def main():
SESSION = os.environ.get('TG_SESSION', 'interactive')
API_ID = get_env('TG_API_ID', 'Enter your API ID: ', int)
API_HASH = get_env('TG_API_HASH', 'Enter your API hash: ')
client = await InteractiveTelegramClient(SESSION, API_ID, API_HASH).init()
await client.run()
if __name__ == '__main__':
asyncio.run(main())