diff --git a/telethon_examples/anytime.png b/telethon_examples/anytime.png new file mode 100644 index 00000000..c8663cfa Binary files /dev/null and b/telethon_examples/anytime.png differ diff --git a/telethon_examples/auto_reply.py b/telethon_examples/auto_reply.py deleted file mode 100755 index 2af7d8ca..00000000 --- a/telethon_examples/auto_reply.py +++ /dev/null @@ -1,113 +0,0 @@ -#!/usr/bin/env python3 -# disclaimer: you should not actually use this. it can be quite spammy. -from telethon import TelegramClient -from telethon.errors import SessionPasswordNeededError -from getpass import getpass -from telethon.tl.types import InputPeerUser,InputPeerChannel -from telethon.tl.types import Updates -from telethon.tl.types import UpdateNewChannelMessage,UpdateNewMessage -from telethon.tl.functions.messages import SendMessageRequest,EditMessageRequest -from telethon.tl.types import MessageService -from nltk.tokenize import word_tokenize -from os import environ -from time import sleep - -CHANNELS = {} -CHANNELNAMES = {} -USERS = {} -EMACS_BLACKLIST = [1058260578, # si @linux_group - 123456789] -REACTS = {'emacs':'Needs more vim.', - 'chrome':'Needs more firefox.', -} - -class NeedsMore(TelegramClient): - def __init__(self): - settings = {'api_id':int(environ['TG_API_ID']), - 'api_hash':environ['TG_API_HASH'], - 'user_phone':environ['TG_PHONE'], - 'session_name':'needsmore'} - super().__init__( - settings.get('session_name','session1'), - settings['api_id'], - settings['api_hash'], - proxy=None, - process_updates=True) - - user_phone = settings['user_phone'] - - print('INFO: Connecting to Telegram Servers...', end='', flush=True) - self.connect() - print('Done!') - - if not self.is_user_authorized(): - print('INFO: Unauthorized user') - self.send_code_request(user_phone) - code_ok = False - while not code_ok: - code = input('Enter the auth code: ') - try: - code_ok = self.sign_in(user_phone, code) - except SessionPasswordNeededError: - pw = getpass('Two step verification enabled. Please enter your password: ') - self.sign_in(password=pw) - print('INFO: Client initialized succesfully!') - - def run(self): - # Listen for updates - while True: - update = self.updates.poll() # This will block until an update is available - triggers = [] - if isinstance(update, Updates): - for x in update.updates: - if not isinstance(x,UpdateNewChannelMessage): continue - if isinstance(x.message,MessageService): continue - # We're only interested in messages to supergroups - words = word_tokenize(x.message.message.lower()) - # Avoid matching 'emacs' in 'spacemacs' and similar - if 'emacs' in words and x.message.to_id.channel_id not in EMACS_BLACKLIST: - triggers.append(('emacs',x.message)) - if 'chrome' in words: - triggers.append(('chrome',x.message)) - if 'x files theme' == x.message.message.lower() and x.message.out: - # Automatically reply to yourself saying 'x files theme' with the audio - msg = x.message - chan = InputPeerChannel(msg.to_id.channel_id,CHANNELS[msg.to_id.channel_id]) - self.send_voice_note(chan,'xfiles.m4a',reply_to=msg.id) - sleep(1) - if '.shrug' in x.message.message.lower() and x.message.out: - # Automatically replace '.shrug' in any message you - # send to a supergroup with the shrug emoticon - msg = x.message - chan = InputPeerChannel(msg.to_id.channel_id,CHANNELS[msg.to_id.channel_id]) - self(EditMessageRequest(chan,msg.id, - message=msg.message.replace('.shrug','¯\_(ツ)_/¯'))) - sleep(1) - - for trigger in triggers: - msg = trigger[1] - chan = InputPeerChannel(msg.to_id.channel_id,CHANNELS[msg.to_id.channel_id]) - log_chat = InputPeerUser(user_id=123456789,access_hash=987654321234567890) - self.send_message(log_chat,"{} said {} in {}. Sending react {}".format( - msg.from_id,msg.message,CHANNELNAMES[msg.to_id.channel_id],REACTS[trigger[0]][:20])) - react = '>{}\n{}'.format(trigger[0],REACTS[trigger[0]]) - self.invoke(SendMessageRequest(chan,react,reply_to_msg_id=msg.id)) - sleep(1) - -if __name__ == "__main__": - #TODO: this block could be moved to __init__ - # You can create these text files using https://github.com/LonamiWebs/Telethon/wiki/Retrieving-all-dialogs - with open('channels.txt','r') as f: - # Format: channel_id access_hash #Channel Name - lines = f.readlines() - chans = [l.split(' #',1)[0].split(' ') for l in lines] - CHANNELS = {int(c[0]):int(c[1]) for c in chans} # id:hash - CHANNELNAMES = {int(l.split()[0]):l.split('#',1)[1].strip() for l in lines} #id:name - with open('users','r') as f: - # Format: [user_id, access_hash, 'username', 'Firstname Lastname'] - lines = f.readlines() - uss = [l.strip()[1:-1].split(',') for l in lines] - USERS = {int(user[0]):int(user[1]) for user in uss} # id:hash - - needsmore = NeedsMore() - needsmore.run() diff --git a/telethon_examples/replier.py b/telethon_examples/replier.py new file mode 100755 index 00000000..66026363 --- /dev/null +++ b/telethon_examples/replier.py @@ -0,0 +1,137 @@ +#!/usr/bin/env python3 +""" +A example script to automatically send messages based on certain triggers. + +The script makes uses of environment variables to determine the API ID, +hash, phone and such to be used. You may want to add these to your .bashrc +file, including TG_API_ID, TG_API_HASH, TG_PHONE and optionally TG_SESSION. + +This script assumes that you have certain files on the working directory, +such as "xfiles.m4a" or "anytime.png" for some of the automated replies. +""" +from getpass import getpass +from collections import defaultdict +from datetime import datetime, timedelta +from os import environ + +import re + +from telethon import TelegramClient +from telethon.errors import SessionPasswordNeededError +from telethon.tl.types import UpdateNewChannelMessage, UpdateShortMessage, MessageService +from telethon.tl.functions.messages import EditMessageRequest + +"""Uncomment this for debugging +import logging +logging.basicConfig(level=logging.DEBUG) +logging.debug('dbg') +logging.info('info') +""" + +REACTS = {'emacs': 'Needs more vim', + 'chrome': 'Needs more Firefox'} + +# A list of dates of reactions we've sent, so we can keep track of floods +recent_reacts = defaultdict(list) + + +def update_handler(update): + global recent_reacts + try: + msg = update.message + except AttributeError: + # print(update, 'did not have update.message') + return + if isinstance(msg, MessageService): + print(msg, 'was service msg') + return + + # React to messages in supergroups and PMs + if isinstance(update, UpdateNewChannelMessage): + words = re.split('\W+', msg.message) + for trigger, response in REACTS.items(): + if len(recent_reacts[msg.to_id.channel_id]) > 3: + # Silently ignore triggers if we've recently sent 3 reactions + break + + if trigger in words: + # Remove recent replies older than 10 minutes + recent_reacts[msg.to_id.channel_id] = [ + a for a in recent_reacts[msg.to_id.channel_id] if + datetime.now() - a < timedelta(minutes=10) + ] + # Send a reaction + client.send_message(msg.to_id, response, reply_to=msg.id) + # Add this reaction to the list of recent actions + recent_reacts[msg.to_id.channel_id].append(datetime.now()) + + if isinstance(update, UpdateShortMessage): + words = re.split('\W+', msg) + for trigger, response in REACTS.items(): + if len(recent_reacts[update.user_id]) > 3: + # Silently ignore triggers if we've recently sent 3 reactions + break + + if trigger in words: + # Send a reaction + client.send_message(update.user_id, response, reply_to=update.id) + # Add this reaction to the list of recent reactions + recent_reacts[update.user_id].append(datetime.now()) + + # Automatically send relevant media when we say certain things + # When invoking requests, get_input_entity needs to be called manually + if isinstance(update, UpdateNewChannelMessage) and msg.out: + if msg.message.lower() == 'x files theme': + client.send_voice_note(msg.to_id, 'xfiles.m4a', reply_to=msg.id) + if msg.message.lower() == 'anytime': + client.send_file(msg.to_id, 'anytime.png', reply_to=msg.id) + if '.shrug' in msg.message: + client(EditMessageRequest( + client.get_input_entity(msg.to_id), msg.id, + message=msg.message.replace('.shrug', r'¯\_(ツ)_/¯') + )) + + if isinstance(update, UpdateShortMessage) and update.out: + if msg.lower() == 'x files theme': + client.send_voice_note(update.user_id, 'xfiles.m4a', reply_to=update.id) + if msg.lower() == 'anytime': + client.send_file(update.user_id, 'anytime.png', reply_to=update.id) + if '.shrug' in msg: + client(EditMessageRequest( + client.get_input_entity(update.user_id), update.id, + message=msg.replace('.shrug', r'¯\_(ツ)_/¯') + )) + + +if __name__ == '__main__': + session_name = environ.get('TG_SESSION', 'session') + user_phone = environ['TG_PHONE'] + client = TelegramClient( + session_name, int(environ['TG_API_ID']), environ['TG_API_HASH'], + proxy=None, update_workers=4 + ) + try: + print('INFO: Connecting to Telegram Servers...', end='', flush=True) + client.connect() + print('Done!') + + if not client.is_user_authorized(): + print('INFO: Unauthorized user') + client.send_code_request(user_phone) + code_ok = False + while not code_ok: + code = input('Enter the auth code: ') + try: + code_ok = client.sign_in(user_phone, code) + except SessionPasswordNeededError: + password = getpass('Two step verification enabled. ' + 'Please enter your password: ') + code_ok = client.sign_in(password=password) + print('INFO: Client initialized successfully!') + + client.add_update_handler(update_handler) + input('Press Enter to stop this!\n') + except KeyboardInterrupt: + pass + finally: + client.disconnect()