import asyncio import os import sys import time from getpass import getpass from telethon import TelegramClient, events from telethon.errors import SessionPasswordNeededError from telethon.network import ConnectionTcpAbridged from telethon.utils import get_display_name def sprint(string, *args, **kwargs): """Safe Print (handle UnicodeEncodeErrors on some terminals)""" try: print(string, *args, **kwargs) except UnicodeEncodeError: string = string.encode('utf-8', errors='ignore')\ .decode('ascii', errors='ignore') print(string, *args, **kwargs) def print_title(title): """Helper function to print titles to the console more nicely""" sprint('\n') sprint('=={}=='.format('=' * len(title))) sprint('= {} ='.format(title)) sprint('=={}=='.format('=' * len(title))) def bytes_to_string(byte_count): """Converts a byte count to a string (in KB, MB...)""" suffix_index = 0 while byte_count >= 1024: byte_count /= 1024 suffix_index += 1 return '{:.2f}{}'.format( byte_count, [' bytes', 'KB', 'MB', 'GB', 'TB'][suffix_index] ) async def async_input(prompt): """ Python's ``input()`` is blocking, which means the event loop we set above can't be running while we're blocking there. This method will let the loop run while we wait for input. """ print(prompt, end='', flush=True) return (await asyncio.get_running_loop().run_in_executor(None, sys.stdin.readline)).rstrip() def get_env(name, message, cast=str): """Helper to get environment variables interactively""" if name in os.environ: return os.environ[name] while True: value = input(message) try: return cast(value) except ValueError as e: print(e, file=sys.stderr) time.sleep(1) class InteractiveTelegramClient(TelegramClient): """Full featured Telegram client, meant to be used on an interactive session to see what Telethon is capable off - This client allows the user to perform some basic interaction with Telegram through Telethon, such as listing dialogs (open chats), talking to people, downloading media, and receiving updates. """ def __init__(self, session_user_id, api_id, api_hash, proxy=None): """ Initializes the InteractiveTelegramClient. :param session_user_id: Name of the *.session file. :param api_id: Telegram's api_id acquired through my.telegram.org. :param api_hash: Telegram's api_hash. :param proxy: Optional proxy tuple/dictionary. """ print_title('Initialization') print('Initializing interactive example...') # The first step is to initialize the TelegramClient, as we are # subclassing it, we need to call super().__init__(). On a more # normal case you would want 'client = TelegramClient(...)' super().__init__( # These parameters should be passed always, session name and API session_user_id, api_id, api_hash, # You can optionally change the connection mode by passing a # type or an instance of it. This changes how the sent packets # look (low-level concept you normally shouldn't worry about). # Default is ConnectionTcpFull, smallest is ConnectionTcpAbridged. connection=ConnectionTcpAbridged, # If you're using a proxy, set it here. proxy=proxy ) # Store {message.id: message} map here so that we can download # media known the message ID, for every message having media. self.found_media = {} async def init(self): # Calling .connect() may raise a connection error False, so you need # to except those before continuing. Otherwise you may want to retry # as done here. print('Connecting to Telegram servers...') try: await self.connect() except IOError: # We handle IOError and not ConnectionError because # PySocks' errors do not subclass ConnectionError # (so this will work with and without proxies). print('Initial connection failed. Retrying...') await self.connect() # If the user hasn't called .sign_in() or .sign_up() yet, they won't # be authorized. The first thing you must do is authorize. Calling # .sign_in() should only be done once as the information is saved on # the *.session file so you don't need to enter the code every time. if not await self.is_user_authorized(): print('First run. Sending code request...') user_phone = input('Enter your phone: ') await self.sign_in(user_phone) self_user = None while self_user is None: code = input('Enter the code you just received: ') try: self_user = await self.sign_in(code=code) # Two-step verification may be enabled, and .sign_in will # raise this error. If that's the case ask for the password. # Note that getpass() may not work on PyCharm due to a bug, # if that's the case simply change it for input(). except SessionPasswordNeededError: pw = getpass('Two step verification is enabled. ' 'Please enter your password: ') self_user = await self.sign_in(password=pw) async def run(self): """Main loop of the TelegramClient, will wait for user action""" # Once everything is ready, we can add an event handler. # # Events are an abstraction over Telegram's "Updates" and # are much easier to use. self.add_event_handler(self.message_handler, events.NewMessage) # Enter a while loop to chat as long as the user wants while True: # Retrieve the top dialogs. You can set the limit to None to # retrieve all of them if you wish, but beware that may take # a long time if you have hundreds of them. dialog_count = 15 # Entities represent the user, chat or channel # corresponding to the dialog on the same index. dialogs = await self.get_dialogs(limit=dialog_count) i = None while i is None: print_title('Dialogs window') # Display them so the user can choose for i, dialog in enumerate(dialogs, start=1): sprint('{}. {}'.format(i, get_display_name(dialog.entity))) # Let the user decide who they want to talk to print() print('> Who do you want to send messages to?') print('> Available commands:') print(' !q: Quits the dialogs window and exits.') print(' !l: Logs out, terminating this session.') print() i = await async_input('Enter dialog ID or a command: ') if i == '!q': return if i == '!l': # Logging out will cause the user to need to reenter the # code next time they want to use the library, and will # also delete the *.session file off the filesystem. # # This is not the same as simply calling .disconnect(), # which simply shuts down everything gracefully. await self.log_out() return try: i = int(i if i else 0) - 1 # Ensure it is inside the bounds, otherwise retry if not 0 <= i < dialog_count: i = None except ValueError: i = None # Retrieve the selected user (or chat, or channel) entity = dialogs[i].entity # Show some information print_title('Chat with "{}"'.format(get_display_name(entity))) print('Available commands:') print(' !q: Quits the current chat.') print(' !Q: Quits the current chat and exits.') print(' !h: prints the latest messages (message History).') print(' !up : Uploads and sends the Photo from path.') print(' !uf : Uploads and sends the File from path.') print(' !d : Deletes a message by its id') print(' !dm : Downloads the given message Media (if any).') print(' !dp: Downloads the current dialog Profile picture.') print(' !i: Prints information about this chat..') print() # And start a while loop to chat while True: msg = await async_input('Enter a message: ') # Quit if msg == '!q': break elif msg == '!Q': return # History elif msg == '!h': # First retrieve the messages and some information messages = await self.get_messages(entity, limit=10) # Iterate over all (in reverse order so the latest appear # the last in the console) and print them with format: # "[hh:mm] Sender: Message" for msg in reversed(messages): # Note how we access .sender here. Since we made an # API call using the self client, it will always have # information about the sender. This is different to # events, where Telegram may not always send the user. name = get_display_name(msg.sender) # Format the message content if getattr(msg, 'media', None): self.found_media[msg.id] = msg content = '<{}> {}'.format( type(msg.media).__name__, msg.message) elif hasattr(msg, 'message'): content = msg.message elif hasattr(msg, 'action'): content = str(msg.action) else: # Unknown message, simply print its class name content = type(msg).__name__ # And print it to the user sprint('[{}:{}] (ID={}) {}: {}'.format( msg.date.hour, msg.date.minute, msg.id, name, content)) # Send photo elif msg.startswith('!up '): # Slice the message to get the path path = msg[len('!up '):] await self.send_photo(path=path, entity=entity) # Send file (document) elif msg.startswith('!uf '): # Slice the message to get the path path = msg[len('!uf '):] await self.send_document(path=path, entity=entity) # Delete messages elif msg.startswith('!d '): # Slice the message to get message ID msg = msg[len('!d '):] deleted_msg = await self.delete_messages(entity, msg) print('Deleted {}'.format(deleted_msg)) # Download media elif msg.startswith('!dm '): # Slice the message to get message ID await self.download_media_by_id(msg[len('!dm '):]) # Download profile photo elif msg == '!dp': print('Downloading profile picture to usermedia/...') os.makedirs('usermedia', exist_ok=True) output = await self.download_profile_photo(entity, 'usermedia') if output: print('Profile picture downloaded to', output) else: print('No profile picture found for this user!') elif msg == '!i': attributes = list(entity.to_dict().items()) pad = max(len(x) for x, _ in attributes) for name, val in attributes: print("{:<{width}} : {}".format(name, val, width=pad)) # Send chat message (if any) elif msg: await self.send_message(entity, msg, link_preview=False) async def send_photo(self, path, entity): """Sends the file located at path to the desired entity as a photo""" await self.send_file( entity, path, progress_callback=self.upload_progress_callback ) print('Photo sent!') async def send_document(self, path, entity): """Sends the file located at path to the desired entity as a document""" await self.send_file( entity, path, force_document=True, progress_callback=self.upload_progress_callback ) print('Document sent!') async def download_media_by_id(self, media_id): """Given a message ID, finds the media this message contained and downloads it. """ try: msg = self.found_media[int(media_id)] except (ValueError, KeyError): # ValueError when parsing, KeyError when accessing dictionary print('Invalid media ID given or message not found!') return print('Downloading media to usermedia/...') os.makedirs('usermedia', exist_ok=True) output = await self.download_media( msg.media, file='usermedia/', progress_callback=self.download_progress_callback ) print('Media downloaded to {}!'.format(output)) @staticmethod def download_progress_callback(downloaded_bytes, total_bytes): InteractiveTelegramClient.print_progress( 'Downloaded', downloaded_bytes, total_bytes ) @staticmethod def upload_progress_callback(uploaded_bytes, total_bytes): InteractiveTelegramClient.print_progress( 'Uploaded', uploaded_bytes, total_bytes ) @staticmethod def print_progress(progress_type, downloaded_bytes, total_bytes): print('{} {} out of {} ({:.2%})'.format( progress_type, bytes_to_string(downloaded_bytes), bytes_to_string(total_bytes), downloaded_bytes / total_bytes) ) async def message_handler(self, event): """Callback method for received events.NewMessage""" # Note that message_handler is called when a Telegram update occurs # and an event is created. Telegram may not always send information # about the ``.sender`` or the ``.chat``, so if you *really* want it # you should use ``get_chat()`` and ``get_sender()`` while working # with events. Since they are methods, you know they may make an API # call, which can be expensive. chat = await event.get_chat() if chat.is_group: if event.out: sprint('>> sent "{}" to chat {}'.format( event.text, get_display_name(chat) )) else: sprint('<< {} @ {} sent "{}"'.format( get_display_name(await event.get_sender()), get_display_name(chat), event.text )) else: if event.out: sprint('>> "{}" to user {}'.format( event.text, get_display_name(chat) )) else: sprint('<< {} sent "{}"'.format( get_display_name(chat), event.text )) async def main(): SESSION = os.environ.get('TG_SESSION', 'interactive') API_ID = get_env('TG_API_ID', 'Enter your API ID: ', int) API_HASH = get_env('TG_API_HASH', 'Enter your API hash: ') client = await InteractiveTelegramClient(SESSION, API_ID, API_HASH).init() await client.run() if __name__ == '__main__': asyncio.run(main())