import os from getpass import getpass from telethon.utils import get_display_name from telethon import ConnectionMode, TelegramClient from telethon.errors import SessionPasswordNeededError from telethon.tl.types import ( PeerChat, UpdateShortChatMessage, UpdateShortMessage ) def sprint(string, *args, **kwargs): """Safe Print (handle UnicodeEncodeErrors on some terminals)""" try: print(string, *args, **kwargs) except UnicodeEncodeError: string = string.encode('utf-8', errors='ignore')\ .decode('ascii', errors='ignore') print(string, *args, **kwargs) def print_title(title): """Helper function to print titles to the console more nicely""" sprint('\n') sprint('=={}=='.format('=' * len(title))) sprint('= {} ='.format(title)) sprint('=={}=='.format('=' * len(title))) def bytes_to_string(byte_count): """Converts a byte count to a string (in KB, MB...)""" suffix_index = 0 while byte_count >= 1024: byte_count /= 1024 suffix_index += 1 return '{:.2f}{}'.format( byte_count, [' bytes', 'KB', 'MB', 'GB', 'TB'][suffix_index] ) class InteractiveTelegramClient(TelegramClient): """Full featured Telegram client, meant to be used on an interactive session to see what Telethon is capable off - This client allows the user to perform some basic interaction with Telegram through Telethon, such as listing dialogs (open chats), talking to people, downloading media, and receiving updates. """ def __init__(self, session_user_id, user_phone, api_id, api_hash, proxy=None): """ Initializes the InteractiveTelegramClient. :param session_user_id: Name of the *.session file. :param user_phone: The phone of the user that will login. :param api_id: Telegram's api_id acquired through my.telegram.org. :param api_hash: Telegram's api_hash. :param proxy: Optional proxy tuple/dictionary. """ print_title('Initialization') print('Initializing interactive example...') # The first step is to initialize the TelegramClient, as we are # subclassing it, we need to call super().__init__(). On a more # normal case you would want 'client = TelegramClient(...)' super().__init__( # These parameters should be passed always, session name and API session_user_id, api_id, api_hash, # You can optionally change the connection mode by using this enum. # This changes how much data will be sent over the network with # every request, and how it will be formatted. Default is # ConnectionMode.TCP_FULL, and smallest is TCP_TCP_ABRIDGED. connection_mode=ConnectionMode.TCP_ABRIDGED, # If you're using a proxy, set it here. proxy=proxy, # If you want to receive updates, you need to start one or more # "update workers" which are background threads that will allow # you to run things when your update handlers (callbacks) are # called with an Update object. update_workers=1 ) # Store {message.id: message} map here so that we can download # media known the message ID, for every message having media. self.found_media = {} # Calling .connect() may return False, so you need to assert it's # True before continuing. Otherwise you may want to retry as done here. print('Connecting to Telegram servers...') if not self.connect(): print('Initial connection failed. Retrying...') if not self.connect(): print('Could not connect to Telegram servers.') return # If the user hasn't called .sign_in() or .sign_up() yet, they won't # be authorized. The first thing you must do is authorize. Calling # .sign_in() should only be done once as the information is saved on # the *.session file so you don't need to enter the code every time. if not self.is_user_authorized(): print('First run. Sending code request...') self.sign_in(user_phone) self_user = None while self_user is None: code = input('Enter the code you just received: ') try: self_user = self.sign_in(code=code) # Two-step verification may be enabled, and .sign_in will # raise this error. If that's the case ask for the password. # Note that getpass() may not work on PyCharm due to a bug, # if that's the case simply change it for input(). except SessionPasswordNeededError: pw = getpass('Two step verification is enabled. ' 'Please enter your password: ') self_user = self.sign_in(password=pw) def run(self): """Main loop of the TelegramClient, will wait for user action""" # Once everything is ready, we can add an update handler. Every # update object will be passed to the self.update_handler method, # where we can process it as we need. self.add_update_handler(self.update_handler) # Enter a while loop to chat as long as the user wants while True: # Retrieve the top dialogs. You can set the limit to None to # retrieve all of them if you wish, but beware that may take # a long time if you have hundreds of them. dialog_count = 15 # Entities represent the user, chat or channel # corresponding to the dialog on the same index. dialogs = self.get_dialogs(limit=dialog_count) i = None while i is None: print_title('Dialogs window') # Display them so the user can choose for i, dialog in enumerate(dialogs, start=1): sprint('{}. {}'.format(i, get_display_name(dialog.entity))) # Let the user decide who they want to talk to print() print('> Who do you want to send messages to?') print('> Available commands:') print(' !q: Quits the dialogs window and exits.') print(' !l: Logs out, terminating this session.') print() i = input('Enter dialog ID or a command: ') if i == '!q': return if i == '!l': # Logging out will cause the user to need to reenter the # code next time they want to use the library, and will # also delete the *.session file off the filesystem. # # This is not the same as simply calling .disconnect(), # which simply shuts down everything gracefully. self.log_out() return try: i = int(i if i else 0) - 1 # Ensure it is inside the bounds, otherwise retry if not 0 <= i < dialog_count: i = None except ValueError: i = None # Retrieve the selected user (or chat, or channel) entity = dialogs[i].entity # Show some information print_title('Chat with "{}"'.format(get_display_name(entity))) print('Available commands:') print(' !q: Quits the current chat.') print(' !Q: Quits the current chat and exits.') print(' !h: prints the latest messages (message History).') print(' !up : Uploads and sends the Photo from path.') print(' !uf : Uploads and sends the File from path.') print(' !d : Deletes a message by its id') print(' !dm : Downloads the given message Media (if any).') print(' !dp: Downloads the current dialog Profile picture.') print(' !i: Prints information about this chat..') print() # And start a while loop to chat while True: msg = input('Enter a message: ') # Quit if msg == '!q': break elif msg == '!Q': return # History elif msg == '!h': # First retrieve the messages and some information messages = self.get_message_history(entity, limit=10) # Iterate over all (in reverse order so the latest appear # the last in the console) and print them with format: # "[hh:mm] Sender: Message" for msg in reversed(messages): # Note that the .sender attribute is only there for # convenience, the API returns it differently. But # this shouldn't concern us. See the documentation # for .get_message_history() for more information. name = get_display_name(msg.sender) # Format the message content if getattr(msg, 'media', None): self.found_media[msg.id] = msg content = '<{}> {}'.format( type(msg.media).__name__, msg.message) elif hasattr(msg, 'message'): content = msg.message elif hasattr(msg, 'action'): content = str(msg.action) else: # Unknown message, simply print its class name content = type(msg).__name__ # And print it to the user sprint('[{}:{}] (ID={}) {}: {}'.format( msg.date.hour, msg.date.minute, msg.id, name, content)) # Send photo elif msg.startswith('!up '): # Slice the message to get the path self.send_photo(path=msg[len('!up '):], entity=entity) # Send file (document) elif msg.startswith('!uf '): # Slice the message to get the path self.send_document(path=msg[len('!uf '):], entity=entity) # Delete messages elif msg.startswith('!d '): # Slice the message to get message ID deleted_msg = self.delete_messages(entity, msg[len('!d '):]) print('Deleted {}'.format(deleted_msg)) # Download media elif msg.startswith('!dm '): # Slice the message to get message ID self.download_media_by_id(msg[len('!dm '):]) # Download profile photo elif msg == '!dp': print('Downloading profile picture to usermedia/...') os.makedirs('usermedia', exist_ok=True) output = self.download_profile_photo(entity, 'usermedia') if output: print('Profile picture downloaded to', output) else: print('No profile picture found for this user!') elif msg == '!i': attributes = list(entity.to_dict().items()) pad = max(len(x) for x, _ in attributes) for name, val in attributes: print("{:<{width}} : {}".format(name, val, width=pad)) # Send chat message (if any) elif msg: self.send_message(entity, msg, link_preview=False) def send_photo(self, path, entity): """Sends the file located at path to the desired entity as a photo""" self.send_file( entity, path, progress_callback=self.upload_progress_callback ) print('Photo sent!') def send_document(self, path, entity): """Sends the file located at path to the desired entity as a document""" self.send_file( entity, path, force_document=True, progress_callback=self.upload_progress_callback ) print('Document sent!') def download_media_by_id(self, media_id): """Given a message ID, finds the media this message contained and downloads it. """ try: msg = self.found_media[int(media_id)] except (ValueError, KeyError): # ValueError when parsing, KeyError when accessing dictionary print('Invalid media ID given or message not found!') return print('Downloading media to usermedia/...') os.makedirs('usermedia', exist_ok=True) output = self.download_media( msg.media, file='usermedia/', progress_callback=self.download_progress_callback ) print('Media downloaded to {}!'.format(output)) @staticmethod def download_progress_callback(downloaded_bytes, total_bytes): InteractiveTelegramClient.print_progress( 'Downloaded', downloaded_bytes, total_bytes ) @staticmethod def upload_progress_callback(uploaded_bytes, total_bytes): InteractiveTelegramClient.print_progress( 'Uploaded', uploaded_bytes, total_bytes ) @staticmethod def print_progress(progress_type, downloaded_bytes, total_bytes): print('{} {} out of {} ({:.2%})'.format( progress_type, bytes_to_string(downloaded_bytes), bytes_to_string(total_bytes), downloaded_bytes / total_bytes) ) def update_handler(self, update): """Callback method for received Updates""" # We have full control over what we want to do with the updates. # In our case we only want to react to chat messages, so we use # isinstance() to behave accordingly on these cases. if isinstance(update, UpdateShortMessage): who = self.get_entity(update.user_id) if update.out: sprint('>> "{}" to user {}'.format( update.message, get_display_name(who) )) else: sprint('<< {} sent "{}"'.format( get_display_name(who), update.message )) elif isinstance(update, UpdateShortChatMessage): which = self.get_entity(PeerChat(update.chat_id)) if update.out: sprint('>> sent "{}" to chat {}'.format( update.message, get_display_name(which) )) else: who = self.get_entity(update.from_id) sprint('<< {} @ {} sent "{}"'.format( get_display_name(which), get_display_name(who), update.message ))