added more sample bots

This commit is contained in:
Alexander Karpov 2024-12-07 03:32:26 +03:00
parent a4543a9d2c
commit cb6e37274c
8 changed files with 508 additions and 0 deletions

View File

@ -0,0 +1,16 @@
FROM python:3.10-slim
# Set work directory
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy bot code
COPY bot.py .
# Set environment variables (you can also set them in docker-compose or externally)
# ENV TELEGRAM_BOT_TOKEN=YOUR_BOT_TOKEN
CMD ["python", "bot.py"]

View File

172
bots/antiwhisper/bot.py Normal file
View File

@ -0,0 +1,172 @@
import os
import html
import json
import uuid
from typing import Dict
from telegram import (
InlineQueryResultArticle,
InputTextMessageContent,
InlineKeyboardButton,
InlineKeyboardMarkup,
Update
)
from telegram.ext import (
ApplicationBuilder,
InlineQueryHandler,
CallbackQueryHandler,
CommandHandler,
ContextTypes,
)
from telegram.constants import ParseMode
# File to store messages
MESSAGES_FILE = 'messages.json'
# In-memory store to keep track of secret messages
# Format: {unique_id: {"message": str, "target_username": str}}
SECRET_MESSAGES: Dict[str, Dict[str, str]] = {}
def load_messages():
if os.path.exists(MESSAGES_FILE):
with open(MESSAGES_FILE, 'r', encoding='utf-8') as f:
data = json.load(f)
# Ensure keys are strings and values are dicts
if isinstance(data, dict):
return data
return {}
def save_messages():
with open(MESSAGES_FILE, 'w', encoding='utf-8') as f:
json.dump(SECRET_MESSAGES, f, ensure_ascii=False, indent=2)
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Simple /start command handler."""
await update.message.reply_text("Hi! I'm a whisper bot. Use inline mode in group chats.")
async def inline_query_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle inline queries. User should type something like: @YourBotName hello @username"""
query = update.inline_query.query.strip()
if not query:
return
# Attempt to parse out the target username.
# We'll assume the last word that starts with '@' is the target.
words = query.split()
target_username = None
for w in reversed(words):
if w.startswith('@'):
target_username = w
break
if target_username is None:
# If no target username found, just show a message that instructs how to use.
results = [
InlineQueryResultArticle(
id=str(uuid.uuid4()),
title="How to whisper",
input_message_content=InputTextMessageContent(
"Please mention a user with '@username' at the end of your message."
)
)
]
await update.inline_query.answer(results=results, cache_time=0)
return
# Extract the secret message (everything except the target username)
if words[-1] == target_username:
message_parts = words[:-1]
else:
# find last occurrence of target_username and remove it
idx = len(words) - 1 - words[::-1].index(target_username)
message_parts = words[:idx] + words[idx+1:]
secret_message = " ".join(message_parts).strip()
if not secret_message:
# If there's no secret message, prompt user.
results = [
InlineQueryResultArticle(
id=str(uuid.uuid4()),
title="No message provided",
input_message_content=InputTextMessageContent(
"Please provide a message before the @username."
)
)
]
await update.inline_query.answer(results=results, cache_time=0)
return
# Create a unique ID to store the message
unique_id = str(uuid.uuid4())
SECRET_MESSAGES[unique_id] = {
"message": secret_message,
"target_username": target_username.lower().strip('@')
}
# Save to file
save_messages()
# Display a "locked" message with a button
# The initial message visible to everyone: a "🔒 whisper message"
# The button will reveal the secret message to non-target users or show "соси" to the target user.
keyboard = InlineKeyboardMarkup([
[InlineKeyboardButton("Reveal", callback_data=unique_id)]
])
results = [
InlineQueryResultArticle(
id=unique_id,
title="Whisper",
description="Send a private whisper message",
input_message_content=InputTextMessageContent(
f"🔒 A whisper message to everyone except @{target_username.strip('@')}."
),
reply_markup=keyboard
)
]
await update.inline_query.answer(results=results, cache_time=0)
async def callback_query_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle button presses."""
query = update.callback_query
user = query.from_user
data = query.data
if data not in SECRET_MESSAGES:
await query.answer("Message not found.", show_alert=True)
return
secret_info = SECRET_MESSAGES[data]
secret_message = secret_info["message"]
target_username = secret_info["target_username"]
if user.username and user.username.lower() == target_username:
# Target user sees a private popup "соси" just for them
await query.answer("соси", show_alert=False)
else:
# Non-target users see the secret message publicly (edit the chat message)
await query.answer(secret_message, show_alert=True)
def main():
token = os.environ.get("TELEGRAM_BOT_TOKEN")
if not token:
raise RuntimeError("TELEGRAM_BOT_TOKEN environment variable not set.")
# Load previously stored messages
global SECRET_MESSAGES
SECRET_MESSAGES = load_messages()
application = ApplicationBuilder().token(token).build()
application.add_handler(CommandHandler("start", start))
application.add_handler(InlineQueryHandler(inline_query_handler))
application.add_handler(CallbackQueryHandler(callback_query_handler))
application.run_polling()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,8 @@
version: '3'
services:
telegram-bot:
build: .
container_name: telegram_bot
restart: always
env_file:
- .env

View File

@ -0,0 +1,78 @@
{
"cef6279e-0edf-41b2-8e00-cd75cfdab5d5": {
"message": "bebra",
"target_username": "sanspie"
},
"81821a7d-57b0-41b4-933d-3e848e7ae487": {
"message": "bebra",
"target_username": ""
},
"6e489fc1-ee27-4df4-93eb-9c1aae9231d1": {
"message": "bebra",
"target_username": "sanspie"
},
"cf76ebcb-3ba7-4c23-8397-a2f815bdc670": {
"message": "хуй",
"target_username": ""
},
"4e2f8eb5-c3cb-414b-9feb-c7fc7d1065ba": {
"message": "хуй",
"target_username": "meowreef"
},
"80e918eb-7926-4a67-aa68-6910f2097143": {
"message": "bebta",
"target_username": ""
},
"bda3d44d-2050-4eef-b46c-5524c26e7785": {
"message": "bebta",
"target_username": "sanspie"
},
"313479c8-d172-447f-9b12-0835665a9e93": {
"message": "bebta",
"target_username": "sanspie"
},
"ed06f945-a241-40dd-9643-bcd45471c908": {
"message": "соси",
"target_username": ""
},
"929d60c1-a1f1-48e6-bbbe-cf1d9e2e6c82": {
"message": "соси",
"target_username": "sanspie"
},
"d62da696-66a0-4945-9e42-bab33c013480": {
"message": "аааа хуй",
"target_username": ""
},
"3ead277f-54a4-4a3c-b3bf-b9f60ef666f5": {
"message": "аааа хуй",
"target_username": "a"
},
"1bb7cb25-8e04-489e-9b63-58712bca9017": {
"message": "аааа хуй",
"target_username": "any"
},
"b99799a7-672b-47f6-af53-3fe32416a83b": {
"message": "аааа хуй",
"target_username": "any"
},
"f00c433c-c522-48d5-9309-6fe7790c59f4": {
"message": "аааа хуй",
"target_username": "an"
},
"9df6f028-7acb-4f2b-a930-b46e277f234c": {
"message": "аааа хуй",
"target_username": ""
},
"b0936990-4011-4840-9329-aa051e40cf90": {
"message": "аааа хуй",
"target_username": "фт"
},
"6e711137-2f3f-4894-8ca8-3cc945554571": {
"message": "аааа хуй",
"target_username": ""
},
"7f7d7b2b-bf8b-49b3-b4e3-928364dc726b": {
"message": "аааа хуй",
"target_username": "anyuser9999"
}
}

View File

@ -0,0 +1 @@
python-telegram-bot==20.3

230
bots/music/main.py Normal file
View File

@ -0,0 +1,230 @@
import os
import json
import yaml
import asyncio
from typing import Optional, Dict, Any
from telethon import TelegramClient
from telethon.tl.types import (
MessageMediaDocument,
Document,
DocumentAttributeAudio,
PhotoSize,
)
from pydub import AudioSegment
from mutagen.mp3 import MP3
from mutagen.id3 import (
ID3,
TIT2, # Title
TPE1, # Artist
TALB, # Album
APIC, # Album Art
TDRC, # Recording time
TPE2, # Album Artist
TRCK, # Track number
)
import mimetypes
import aiofiles
import aiohttp
from pathlib import Path
class MusicDownloader:
def __init__(self, api_id: str, api_hash: str):
self.api_id = int(api_id)
self.api_hash = api_hash
self.client = TelegramClient('music_downloader', self.api_id, self.api_hash)
self.supported_audio_types = {
'audio/mpeg': '.mp3',
'audio/mp4': '.m4a',
'audio/ogg': '.ogg',
'audio/x-wav': '.wav',
'audio/x-flac': '.flac'
}
async def start(self):
await self.client.start()
async def extract_audio_metadata(self, document: Document) -> Dict[str, Any]:
"""Extract metadata from Telegram Document object."""
metadata = {
'title': None,
'performer': None,
'album': None,
'duration': None,
'track_num': None,
'thumbnail': None
}
for attr in document.attributes:
if isinstance(attr, DocumentAttributeAudio):
metadata['title'] = attr.title
metadata['performer'] = attr.performer
metadata['duration'] = attr.duration
# Extract thumbnail if available
if document.thumbs:
for thumb in document.thumbs:
if isinstance(thumb, PhotoSize):
metadata['thumbnail'] = thumb
break
return metadata
async def download_thumbnail(self, thumb: PhotoSize) -> Optional[bytes]:
"""Download thumbnail bytes."""
if not thumb:
return None
try:
return await self.client.download_media(thumb, bytes)
except Exception as e:
print(f"Error downloading thumbnail: {e}")
return None
async def convert_to_mp3(self, input_path: str, output_path: str):
"""Convert audio file to MP3 format."""
try:
audio = AudioSegment.from_file(input_path)
audio.export(output_path, format='mp3', bitrate='320k')
return True
except Exception as e:
print(f"Error converting to MP3: {e}")
return False
async def update_mp3_metadata(self, file_path: str, metadata: Dict[str, Any], thumb_data: Optional[bytes]):
"""Update MP3 file with metadata and album art."""
try:
audio = MP3(file_path)
if not audio.tags:
audio.tags = ID3()
# Add basic metadata
if metadata['title']:
audio.tags.add(TIT2(text=metadata['title']))
if metadata['performer']:
audio.tags.add(TPE1(text=metadata['performer']))
audio.tags.add(TPE2(text=metadata['performer']))
# Add album art if available
if thumb_data:
audio.tags.add(APIC(
encoding=3,
mime='image/jpeg',
type=3, # Cover (front)
desc='Cover',
data=thumb_data
))
audio.save()
return True
except Exception as e:
print(f"Error updating MP3 metadata: {e}")
return False
async def process_audio_message(self, message) -> bool:
"""Process a single audio message."""
if not message.media or not isinstance(message.media, MessageMediaDocument):
return False
document = message.media.document
mime_type = document.mime_type
if mime_type not in self.supported_audio_types:
return False
# Extract metadata
metadata = await self.extract_audio_metadata(document)
thumb_data = await self.download_thumbnail(metadata['thumbnail'])
# Create file paths
temp_path = f"temp_{message.id}{self.supported_audio_types[mime_type]}"
final_path = f"downloads/{metadata['performer'] if metadata['performer'] else 'Unknown Artist'}"
os.makedirs(final_path, exist_ok=True)
final_filename = f"{metadata['title'] if metadata['title'] else f'track_{message.id}'}.mp3"
final_path = os.path.join(final_path, final_filename)
# Download original file
try:
await self.client.download_media(message, temp_path)
except Exception as e:
print(f"Error downloading file: {e}")
return False
try:
# Convert to MP3 if needed
if mime_type != 'audio/mpeg':
if not await self.convert_to_mp3(temp_path, final_path):
os.remove(temp_path)
return False
else:
os.rename(temp_path, final_path)
# Update metadata
await self.update_mp3_metadata(final_path, metadata, thumb_data)
print(f"Successfully processed: {final_filename}")
return True
except Exception as e:
print(f"Error processing file: {e}")
if os.path.exists(temp_path):
os.remove(temp_path)
return False
async def process_channel(self, channel_id: str, limit: int = None):
"""Process all audio messages from a channel."""
try:
print(f"Processing channel: {channel_id}")
# Create downloads directory
os.makedirs("downloads", exist_ok=True)
# Iterate through messages
async for message in self.client.iter_messages(channel_id, limit=limit):
await self.process_audio_message(message)
except Exception as e:
print(f"Error processing channel: {e}")
async def close(self):
await self.client.disconnect()
async def main():
# Load config
if not os.path.isfile("poller.yaml"):
raise FileNotFoundError("Please create poller.yaml")
with open("poller.yaml", "r") as stream:
config = yaml.safe_load(stream)
api_id = os.getenv("api_id")
api_hash = os.getenv("api_hash")
if not api_id or not api_hash:
raise ValueError("Please set api_id and api_hash environment variables")
downloader = MusicDownloader(api_id, api_hash)
await downloader.start()
try:
# Process channels from config
if 'channels' in config:
if 'usernames' in config['channels']:
for username in config['channels']['usernames']:
username = username.replace('https://t.me/', '').replace('@', '')
await downloader.process_channel(username)
if 'ids' in config['channels']:
for channel_id in config['channels']['ids']:
await downloader.process_channel(channel_id)
finally:
await downloader.close()
if __name__ == "__main__":
# Install required packages
# pip install telethon pydub mutagen cryptg
asyncio.run(main())

3
bots/music/poller.yaml Normal file
View File

@ -0,0 +1,3 @@
channels:
usernames:
- astrophysiks