scripts/bots/antiwhisper/bot.py
2024-12-07 03:32:26 +03:00

173 lines
5.5 KiB
Python

import os
import html
import json
import uuid
from typing import Dict
from telegram import (
InlineQueryResultArticle,
InputTextMessageContent,
InlineKeyboardButton,
InlineKeyboardMarkup,
Update
)
from telegram.ext import (
ApplicationBuilder,
InlineQueryHandler,
CallbackQueryHandler,
CommandHandler,
ContextTypes,
)
from telegram.constants import ParseMode
# Path of the JSON file in which whispers are saved (see save_messages)
# and from which they are read back (see load_messages).
MESSAGES_FILE = "messages.json"

# Whispers currently known to the bot, keyed by their unique id.
# Shape: {unique_id: {"message": str, "target_username": str}}
SECRET_MESSAGES: Dict[str, Dict[str, str]] = {}
def load_messages():
    """Load previously saved whispers from ``MESSAGES_FILE``.

    Returns:
        A ``{unique_id: {"message": str, "target_username": str}}`` dict.
        It is empty when the file does not exist, cannot be decoded as
        UTF-8 JSON, or does not hold a JSON object. Entries that are not
        dicts carrying both string fields are dropped.

    Raises:
        OSError: If the file exists but cannot be opened or read
            (for example because of missing permissions).
    """
    import logging

    try:
        with open(MESSAGES_FILE, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except FileNotFoundError:
        # Nothing has been saved yet.
        return {}
    except ValueError as exc:
        # json.JSONDecodeError and UnicodeDecodeError both derive from
        # ValueError. A truncated or corrupt store must not keep the bot
        # from starting, but the operator should hear about it.
        logging.getLogger(__name__).warning(
            "Ignoring unreadable %s: %s", MESSAGES_FILE, exc
        )
        return {}

    if not isinstance(data, dict):
        return {}

    # JSON object keys are always strings; only the values need checking.
    # The callback handler indexes both fields directly, so an entry
    # missing either one would raise there.
    return {
        key: entry
        for key, entry in data.items()
        if isinstance(entry, dict)
        and isinstance(entry.get("message"), str)
        and isinstance(entry.get("target_username"), str)
    }
def save_messages():
    """Persist ``SECRET_MESSAGES`` to ``MESSAGES_FILE`` as UTF-8 JSON.

    The data is first written to a sibling ``<MESSAGES_FILE>.tmp`` file and
    then moved over the target with ``os.replace``. Opening the target in
    ``'w'`` mode directly would truncate it before the new content is
    written, so an interrupted write would leave invalid JSON behind.

    Raises:
        OSError: If the temporary file cannot be written or renamed.
        TypeError: If ``SECRET_MESSAGES`` holds a value ``json`` cannot
            serialize; the existing store is left untouched in that case.
    """
    tmp_path = f"{MESSAGES_FILE}.tmp"
    with open(tmp_path, 'w', encoding='utf-8') as f:
        json.dump(SECRET_MESSAGES, f, ensure_ascii=False, indent=2)
    # Only swap the files once the new content is completely written.
    os.replace(tmp_path, MESSAGES_FILE)
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Reply to the /start command with a short usage hint.

    Args:
        update: The incoming update that carried the command.
        context: Handler context (unused).
    """
    # CommandHandler also accepts edited messages by default; for those
    # update.message is None, so use effective_message instead.
    message = update.effective_message
    if message is None:
        return
    await message.reply_text("Hi! I'm a whisper bot. Use inline mode in group chats.")
# Telegram documents a 0-200 character limit for callback answer text, which
# is how callback_query_handler reveals a whisper.
# NOTE(review): len() counts code points; confirm how Telegram counts.
_MAX_WHISPER_LENGTH = 200


def _split_whisper_query(query: str) -> tuple:
    """Split an inline query into ``(secret_message, username)``.

    The last word that starts with '@' and is not made up of '@' characters
    only is the target. All remaining words, in their original order and
    joined by single spaces, form the secret message.

    Returns:
        ``(secret_message, username)`` where ``username`` has its '@'
        characters stripped, or ``(joined_words, None)`` when the query
        names no target.
    """
    words = query.split()
    for index, word in reversed(list(enumerate(words))):
        username = word.strip('@')
        if word.startswith('@') and username:
            others = words[:index] + words[index + 1:]
            return " ".join(others), username
    return " ".join(words), None


def _hint_results(title: str, text: str) -> list:
    """Build a single-article result list whose message content is *text*."""
    return [
        InlineQueryResultArticle(
            id=str(uuid.uuid4()),
            title=title,
            input_message_content=InputTextMessageContent(text),
        )
    ]


async def inline_query_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle inline queries. User should type something like: @YourBotName hello @username

    A usable query is stored in ``SECRET_MESSAGES`` under a fresh id, saved
    to disk, and answered with a "locked" article carrying a "Reveal"
    button whose callback data is that id. Queries with no target, no
    message, or a message too long to reveal are answered with a usage
    hint. An empty query is ignored.

    Args:
        update: The incoming update; ``update.inline_query`` is read.
        context: Handler context (unused).
    """
    query = update.inline_query.query.strip()
    if not query:
        return

    secret_message, username = _split_whisper_query(query)

    if username is None:
        # No usable '@username' anywhere in the query.
        hint = _hint_results(
            "How to whisper",
            "Please mention a user with '@username' at the end of your message.",
        )
        await update.inline_query.answer(results=hint, cache_time=0)
        return

    if not secret_message:
        # The query consisted of the target only.
        hint = _hint_results(
            "No message provided",
            "Please provide a message before the @username.",
        )
        await update.inline_query.answer(results=hint, cache_time=0)
        return

    if len(secret_message) > _MAX_WHISPER_LENGTH:
        # Such a whisper could never be shown through a callback answer.
        hint = _hint_results(
            "Message too long",
            f"Please keep the message within {_MAX_WHISPER_LENGTH} characters.",
        )
        await update.inline_query.answer(results=hint, cache_time=0)
        return

    # NOTE(review): Telegram presumably sends a new inline query as the user
    # keeps typing, so every partial query gets stored and the whole file
    # rewritten. Consider storing only once a result is actually chosen.
    unique_id = str(uuid.uuid4())
    SECRET_MESSAGES[unique_id] = {
        "message": secret_message,
        # Lower-cased so the callback handler can compare case-insensitively.
        "target_username": username.lower(),
    }
    save_messages()

    # The message visible to everyone is a "locked" placeholder. Its button
    # reveals the secret message to non-target users and shows "соси" to
    # the target user.
    keyboard = InlineKeyboardMarkup([
        [InlineKeyboardButton("Reveal", callback_data=unique_id)]
    ])
    results = [
        InlineQueryResultArticle(
            id=unique_id,
            title="Whisper",
            description="Send a private whisper message",
            input_message_content=InputTextMessageContent(
                f"🔒 A whisper message to everyone except @{username}."
            ),
            reply_markup=keyboard,
        )
    ]
    await update.inline_query.answer(results=results, cache_time=0)
async def callback_query_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle presses of the "Reveal" button.

    The callback data is the whisper's id in ``SECRET_MESSAGES``. The
    target user gets a brief "соси" notification; everybody else gets the
    secret message in an alert popup. Unknown ids are answered with
    "Message not found.".

    Args:
        update: The incoming update; ``update.callback_query`` is read.
        context: Handler context (unused).
    """
    # Telegram documents a 0-200 character limit for callback answer text;
    # a longer text makes the answer call fail instead of showing anything.
    # NOTE(review): len() counts code points; confirm how Telegram counts.
    max_answer_length = 200

    query = update.callback_query
    secret_info = SECRET_MESSAGES.get(query.data)
    if secret_info is None:
        await query.answer("Message not found.", show_alert=True)
        return

    # Usernames are stored lower-cased, so compare case-insensitively.
    # Users without a username can never be the target.
    presser = query.from_user.username
    if presser and presser.lower() == secret_info["target_username"]:
        # The target only sees a short notification, not the secret.
        await query.answer("соси", show_alert=False)
        return

    # Everyone else sees the secret in an alert shown only to the presser;
    # the chat message itself is not modified.
    secret_message = secret_info["message"]
    if len(secret_message) > max_answer_length:
        secret_message = secret_message[:max_answer_length - 1] + "…"
    await query.answer(secret_message, show_alert=True)
def main():
    """Run the bot: read the token, restore saved whispers, start polling.

    Raises:
        RuntimeError: If ``TELEGRAM_BOT_TOKEN`` is unset or empty.
    """
    global SECRET_MESSAGES

    bot_token = os.getenv("TELEGRAM_BOT_TOKEN")
    if not bot_token:
        raise RuntimeError("TELEGRAM_BOT_TOKEN environment variable not set.")

    # Bring back whatever was stored on disk before handling updates.
    SECRET_MESSAGES = load_messages()

    app = ApplicationBuilder().token(bot_token).build()
    handlers = (
        CommandHandler("start", start),
        InlineQueryHandler(inline_query_handler),
        CallbackQueryHandler(callback_query_handler),
    )
    for handler in handlers:
        app.add_handler(handler)
    app.run_polling()


if __name__ == "__main__":
    main()