mirror of
https://github.com/LonamiWebs/Telethon.git
synced 2025-01-24 00:04:14 +03:00
Replace broken auto_replier.py with new code (#389)
This commit is contained in:
parent
403c7bd00a
commit
39a1d5e90d
BIN
telethon_examples/anytime.png
Normal file
BIN
telethon_examples/anytime.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 2.2 KiB |
|
@ -1,113 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
# disclaimer: you should not actually use this. it can be quite spammy.
|
||||
from telethon import TelegramClient
|
||||
from telethon.errors import SessionPasswordNeededError
|
||||
from getpass import getpass
|
||||
from telethon.tl.types import InputPeerUser,InputPeerChannel
|
||||
from telethon.tl.types import Updates
|
||||
from telethon.tl.types import UpdateNewChannelMessage,UpdateNewMessage
|
||||
from telethon.tl.functions.messages import SendMessageRequest,EditMessageRequest
|
||||
from telethon.tl.types import MessageService
|
||||
from nltk.tokenize import word_tokenize
|
||||
from os import environ
|
||||
from time import sleep
|
||||
|
||||
CHANNELS = {}
|
||||
CHANNELNAMES = {}
|
||||
USERS = {}
|
||||
EMACS_BLACKLIST = [1058260578, # si @linux_group
|
||||
123456789]
|
||||
REACTS = {'emacs':'Needs more vim.',
|
||||
'chrome':'Needs more firefox.',
|
||||
}
|
||||
|
||||
class NeedsMore(TelegramClient):
|
||||
def __init__(self):
|
||||
settings = {'api_id':int(environ['TG_API_ID']),
|
||||
'api_hash':environ['TG_API_HASH'],
|
||||
'user_phone':environ['TG_PHONE'],
|
||||
'session_name':'needsmore'}
|
||||
super().__init__(
|
||||
settings.get('session_name','session1'),
|
||||
settings['api_id'],
|
||||
settings['api_hash'],
|
||||
proxy=None,
|
||||
process_updates=True)
|
||||
|
||||
user_phone = settings['user_phone']
|
||||
|
||||
print('INFO: Connecting to Telegram Servers...', end='', flush=True)
|
||||
self.connect()
|
||||
print('Done!')
|
||||
|
||||
if not self.is_user_authorized():
|
||||
print('INFO: Unauthorized user')
|
||||
self.send_code_request(user_phone)
|
||||
code_ok = False
|
||||
while not code_ok:
|
||||
code = input('Enter the auth code: ')
|
||||
try:
|
||||
code_ok = self.sign_in(user_phone, code)
|
||||
except SessionPasswordNeededError:
|
||||
pw = getpass('Two step verification enabled. Please enter your password: ')
|
||||
self.sign_in(password=pw)
|
||||
print('INFO: Client initialized succesfully!')
|
||||
|
||||
def run(self):
|
||||
# Listen for updates
|
||||
while True:
|
||||
update = self.updates.poll() # This will block until an update is available
|
||||
triggers = []
|
||||
if isinstance(update, Updates):
|
||||
for x in update.updates:
|
||||
if not isinstance(x,UpdateNewChannelMessage): continue
|
||||
if isinstance(x.message,MessageService): continue
|
||||
# We're only interested in messages to supergroups
|
||||
words = word_tokenize(x.message.message.lower())
|
||||
# Avoid matching 'emacs' in 'spacemacs' and similar
|
||||
if 'emacs' in words and x.message.to_id.channel_id not in EMACS_BLACKLIST:
|
||||
triggers.append(('emacs',x.message))
|
||||
if 'chrome' in words:
|
||||
triggers.append(('chrome',x.message))
|
||||
if 'x files theme' == x.message.message.lower() and x.message.out:
|
||||
# Automatically reply to yourself saying 'x files theme' with the audio
|
||||
msg = x.message
|
||||
chan = InputPeerChannel(msg.to_id.channel_id,CHANNELS[msg.to_id.channel_id])
|
||||
self.send_voice_note(chan,'xfiles.m4a',reply_to=msg.id)
|
||||
sleep(1)
|
||||
if '.shrug' in x.message.message.lower() and x.message.out:
|
||||
# Automatically replace '.shrug' in any message you
|
||||
# send to a supergroup with the shrug emoticon
|
||||
msg = x.message
|
||||
chan = InputPeerChannel(msg.to_id.channel_id,CHANNELS[msg.to_id.channel_id])
|
||||
self(EditMessageRequest(chan,msg.id,
|
||||
message=msg.message.replace('.shrug','¯\_(ツ)_/¯')))
|
||||
sleep(1)
|
||||
|
||||
for trigger in triggers:
|
||||
msg = trigger[1]
|
||||
chan = InputPeerChannel(msg.to_id.channel_id,CHANNELS[msg.to_id.channel_id])
|
||||
log_chat = InputPeerUser(user_id=123456789,access_hash=987654321234567890)
|
||||
self.send_message(log_chat,"{} said {} in {}. Sending react {}".format(
|
||||
msg.from_id,msg.message,CHANNELNAMES[msg.to_id.channel_id],REACTS[trigger[0]][:20]))
|
||||
react = '>{}\n{}'.format(trigger[0],REACTS[trigger[0]])
|
||||
self.invoke(SendMessageRequest(chan,react,reply_to_msg_id=msg.id))
|
||||
sleep(1)
|
||||
|
||||
if __name__ == "__main__":
|
||||
#TODO: this block could be moved to __init__
|
||||
# You can create these text files using https://github.com/LonamiWebs/Telethon/wiki/Retrieving-all-dialogs
|
||||
with open('channels.txt','r') as f:
|
||||
# Format: channel_id access_hash #Channel Name
|
||||
lines = f.readlines()
|
||||
chans = [l.split(' #',1)[0].split(' ') for l in lines]
|
||||
CHANNELS = {int(c[0]):int(c[1]) for c in chans} # id:hash
|
||||
CHANNELNAMES = {int(l.split()[0]):l.split('#',1)[1].strip() for l in lines} #id:name
|
||||
with open('users','r') as f:
|
||||
# Format: [user_id, access_hash, 'username', 'Firstname Lastname']
|
||||
lines = f.readlines()
|
||||
uss = [l.strip()[1:-1].split(',') for l in lines]
|
||||
USERS = {int(user[0]):int(user[1]) for user in uss} # id:hash
|
||||
|
||||
needsmore = NeedsMore()
|
||||
needsmore.run()
|
137
telethon_examples/replier.py
Executable file
137
telethon_examples/replier.py
Executable file
|
@ -0,0 +1,137 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
A example script to automatically send messages based on certain triggers.
|
||||
|
||||
The script makes uses of environment variables to determine the API ID,
|
||||
hash, phone and such to be used. You may want to add these to your .bashrc
|
||||
file, including TG_API_ID, TG_API_HASH, TG_PHONE and optionally TG_SESSION.
|
||||
|
||||
This script assumes that you have certain files on the working directory,
|
||||
such as "xfiles.m4a" or "anytime.png" for some of the automated replies.
|
||||
"""
|
||||
from getpass import getpass
|
||||
from collections import defaultdict
|
||||
from datetime import datetime, timedelta
|
||||
from os import environ
|
||||
|
||||
import re
|
||||
|
||||
from telethon import TelegramClient
|
||||
from telethon.errors import SessionPasswordNeededError
|
||||
from telethon.tl.types import UpdateNewChannelMessage, UpdateShortMessage, MessageService
|
||||
from telethon.tl.functions.messages import EditMessageRequest
|
||||
|
||||
"""Uncomment this for debugging
|
||||
import logging
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
logging.debug('dbg')
|
||||
logging.info('info')
|
||||
"""
|
||||
|
||||
REACTS = {'emacs': 'Needs more vim',
|
||||
'chrome': 'Needs more Firefox'}
|
||||
|
||||
# A list of dates of reactions we've sent, so we can keep track of floods
|
||||
recent_reacts = defaultdict(list)
|
||||
|
||||
|
||||
def update_handler(update):
|
||||
global recent_reacts
|
||||
try:
|
||||
msg = update.message
|
||||
except AttributeError:
|
||||
# print(update, 'did not have update.message')
|
||||
return
|
||||
if isinstance(msg, MessageService):
|
||||
print(msg, 'was service msg')
|
||||
return
|
||||
|
||||
# React to messages in supergroups and PMs
|
||||
if isinstance(update, UpdateNewChannelMessage):
|
||||
words = re.split('\W+', msg.message)
|
||||
for trigger, response in REACTS.items():
|
||||
if len(recent_reacts[msg.to_id.channel_id]) > 3:
|
||||
# Silently ignore triggers if we've recently sent 3 reactions
|
||||
break
|
||||
|
||||
if trigger in words:
|
||||
# Remove recent replies older than 10 minutes
|
||||
recent_reacts[msg.to_id.channel_id] = [
|
||||
a for a in recent_reacts[msg.to_id.channel_id] if
|
||||
datetime.now() - a < timedelta(minutes=10)
|
||||
]
|
||||
# Send a reaction
|
||||
client.send_message(msg.to_id, response, reply_to=msg.id)
|
||||
# Add this reaction to the list of recent actions
|
||||
recent_reacts[msg.to_id.channel_id].append(datetime.now())
|
||||
|
||||
if isinstance(update, UpdateShortMessage):
|
||||
words = re.split('\W+', msg)
|
||||
for trigger, response in REACTS.items():
|
||||
if len(recent_reacts[update.user_id]) > 3:
|
||||
# Silently ignore triggers if we've recently sent 3 reactions
|
||||
break
|
||||
|
||||
if trigger in words:
|
||||
# Send a reaction
|
||||
client.send_message(update.user_id, response, reply_to=update.id)
|
||||
# Add this reaction to the list of recent reactions
|
||||
recent_reacts[update.user_id].append(datetime.now())
|
||||
|
||||
# Automatically send relevant media when we say certain things
|
||||
# When invoking requests, get_input_entity needs to be called manually
|
||||
if isinstance(update, UpdateNewChannelMessage) and msg.out:
|
||||
if msg.message.lower() == 'x files theme':
|
||||
client.send_voice_note(msg.to_id, 'xfiles.m4a', reply_to=msg.id)
|
||||
if msg.message.lower() == 'anytime':
|
||||
client.send_file(msg.to_id, 'anytime.png', reply_to=msg.id)
|
||||
if '.shrug' in msg.message:
|
||||
client(EditMessageRequest(
|
||||
client.get_input_entity(msg.to_id), msg.id,
|
||||
message=msg.message.replace('.shrug', r'¯\_(ツ)_/¯')
|
||||
))
|
||||
|
||||
if isinstance(update, UpdateShortMessage) and update.out:
|
||||
if msg.lower() == 'x files theme':
|
||||
client.send_voice_note(update.user_id, 'xfiles.m4a', reply_to=update.id)
|
||||
if msg.lower() == 'anytime':
|
||||
client.send_file(update.user_id, 'anytime.png', reply_to=update.id)
|
||||
if '.shrug' in msg:
|
||||
client(EditMessageRequest(
|
||||
client.get_input_entity(update.user_id), update.id,
|
||||
message=msg.replace('.shrug', r'¯\_(ツ)_/¯')
|
||||
))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
session_name = environ.get('TG_SESSION', 'session')
|
||||
user_phone = environ['TG_PHONE']
|
||||
client = TelegramClient(
|
||||
session_name, int(environ['TG_API_ID']), environ['TG_API_HASH'],
|
||||
proxy=None, update_workers=4
|
||||
)
|
||||
try:
|
||||
print('INFO: Connecting to Telegram Servers...', end='', flush=True)
|
||||
client.connect()
|
||||
print('Done!')
|
||||
|
||||
if not client.is_user_authorized():
|
||||
print('INFO: Unauthorized user')
|
||||
client.send_code_request(user_phone)
|
||||
code_ok = False
|
||||
while not code_ok:
|
||||
code = input('Enter the auth code: ')
|
||||
try:
|
||||
code_ok = client.sign_in(user_phone, code)
|
||||
except SessionPasswordNeededError:
|
||||
password = getpass('Two step verification enabled. '
|
||||
'Please enter your password: ')
|
||||
code_ok = client.sign_in(password=password)
|
||||
print('INFO: Client initialized successfully!')
|
||||
|
||||
client.add_update_handler(update_handler)
|
||||
input('Press Enter to stop this!\n')
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
client.disconnect()
|
Loading…
Reference in New Issue
Block a user