diff --git a/bots/antiwhisper/Dockerfile b/bots/antiwhisper/Dockerfile new file mode 100644 index 0000000..176ff5f --- /dev/null +++ b/bots/antiwhisper/Dockerfile @@ -0,0 +1,16 @@ +FROM python:3.10-slim + +# Set work directory +WORKDIR /app + +# Install dependencies +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Copy bot code +COPY bot.py . + +# Set environment variables (you can also set them in docker-compose or externally) +# ENV TELEGRAM_BOT_TOKEN=YOUR_BOT_TOKEN + +CMD ["python", "bot.py"] \ No newline at end of file diff --git a/bots/antiwhisper/__init__.py b/bots/antiwhisper/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/bots/antiwhisper/bot.py b/bots/antiwhisper/bot.py new file mode 100644 index 0000000..89e5598 --- /dev/null +++ b/bots/antiwhisper/bot.py @@ -0,0 +1,172 @@ +import os +import html +import json +import uuid +from typing import Dict + +from telegram import ( + InlineQueryResultArticle, + InputTextMessageContent, + InlineKeyboardButton, + InlineKeyboardMarkup, + Update +) +from telegram.ext import ( + ApplicationBuilder, + InlineQueryHandler, + CallbackQueryHandler, + CommandHandler, + ContextTypes, +) +from telegram.constants import ParseMode + +# File to store messages +MESSAGES_FILE = 'messages.json' + +# In-memory store to keep track of secret messages +# Format: {unique_id: {"message": str, "target_username": str}} +SECRET_MESSAGES: Dict[str, Dict[str, str]] = {} + +def load_messages(): + if os.path.exists(MESSAGES_FILE): + with open(MESSAGES_FILE, 'r', encoding='utf-8') as f: + data = json.load(f) + # Ensure keys are strings and values are dicts + if isinstance(data, dict): + return data + return {} + +def save_messages(): + with open(MESSAGES_FILE, 'w', encoding='utf-8') as f: + json.dump(SECRET_MESSAGES, f, ensure_ascii=False, indent=2) + +async def start(update: Update, context: ContextTypes.DEFAULT_TYPE): + """Simple /start command handler.""" + await update.message.reply_text("Hi! I'm a whisper bot. Use inline mode in group chats.") + +async def inline_query_handler(update: Update, context: ContextTypes.DEFAULT_TYPE): + """Handle inline queries. User should type something like: @YourBotName hello @username""" + query = update.inline_query.query.strip() + if not query: + return + + # Attempt to parse out the target username. + # We'll assume the last word that starts with '@' is the target. + words = query.split() + target_username = None + for w in reversed(words): + if w.startswith('@'): + target_username = w + break + + if target_username is None: + # If no target username found, just show a message that instructs how to use. + results = [ + InlineQueryResultArticle( + id=str(uuid.uuid4()), + title="How to whisper", + input_message_content=InputTextMessageContent( + "Please mention a user with '@username' at the end of your message." + ) + ) + ] + await update.inline_query.answer(results=results, cache_time=0) + return + + # Extract the secret message (everything except the target username) + if words[-1] == target_username: + message_parts = words[:-1] + else: + # find last occurrence of target_username and remove it + idx = len(words) - 1 - words[::-1].index(target_username) + message_parts = words[:idx] + words[idx+1:] + secret_message = " ".join(message_parts).strip() + + if not secret_message: + # If there's no secret message, prompt user. + results = [ + InlineQueryResultArticle( + id=str(uuid.uuid4()), + title="No message provided", + input_message_content=InputTextMessageContent( + "Please provide a message before the @username." + ) + ) + ] + await update.inline_query.answer(results=results, cache_time=0) + return + + # Create a unique ID to store the message + unique_id = str(uuid.uuid4()) + SECRET_MESSAGES[unique_id] = { + "message": secret_message, + "target_username": target_username.lower().strip('@') + } + + # Save to file + save_messages() + + # Display a "locked" message with a button + # The initial message visible to everyone: a "🔒 whisper message" + # The button will reveal the secret message to non-target users or show "соси" to the target user. + keyboard = InlineKeyboardMarkup([ + [InlineKeyboardButton("Reveal", callback_data=unique_id)] + ]) + + results = [ + InlineQueryResultArticle( + id=unique_id, + title="Whisper", + description="Send a private whisper message", + input_message_content=InputTextMessageContent( + f"🔒 A whisper message to everyone except @{target_username.strip('@')}." + ), + reply_markup=keyboard + ) + ] + + await update.inline_query.answer(results=results, cache_time=0) + + +async def callback_query_handler(update: Update, context: ContextTypes.DEFAULT_TYPE): + """Handle button presses.""" + query = update.callback_query + user = query.from_user + data = query.data + + if data not in SECRET_MESSAGES: + await query.answer("Message not found.", show_alert=True) + return + + secret_info = SECRET_MESSAGES[data] + secret_message = secret_info["message"] + target_username = secret_info["target_username"] + + if user.username and user.username.lower() == target_username: + # Target user sees a private popup "соси" just for them + await query.answer("соси", show_alert=False) + else: + # Non-target users see the secret message publicly (edit the chat message) + await query.answer(secret_message, show_alert=True) + + +def main(): + token = os.environ.get("TELEGRAM_BOT_TOKEN") + if not token: + raise RuntimeError("TELEGRAM_BOT_TOKEN environment variable not set.") + + # Load previously stored messages + global SECRET_MESSAGES + SECRET_MESSAGES = load_messages() + + application = ApplicationBuilder().token(token).build() + + application.add_handler(CommandHandler("start", start)) + application.add_handler(InlineQueryHandler(inline_query_handler)) + application.add_handler(CallbackQueryHandler(callback_query_handler)) + + application.run_polling() + + +if __name__ == "__main__": + main() diff --git a/bots/antiwhisper/docker-compose.yml b/bots/antiwhisper/docker-compose.yml new file mode 100644 index 0000000..14688ae --- /dev/null +++ b/bots/antiwhisper/docker-compose.yml @@ -0,0 +1,8 @@ +version: '3' +services: + telegram-bot: + build: . + container_name: telegram_bot + restart: always + env_file: + - .env \ No newline at end of file diff --git a/bots/antiwhisper/messages.json b/bots/antiwhisper/messages.json new file mode 100644 index 0000000..ec56953 --- /dev/null +++ b/bots/antiwhisper/messages.json @@ -0,0 +1,78 @@ +{ + "cef6279e-0edf-41b2-8e00-cd75cfdab5d5": { + "message": "bebra", + "target_username": "sanspie" + }, + "81821a7d-57b0-41b4-933d-3e848e7ae487": { + "message": "bebra", + "target_username": "" + }, + "6e489fc1-ee27-4df4-93eb-9c1aae9231d1": { + "message": "bebra", + "target_username": "sanspie" + }, + "cf76ebcb-3ba7-4c23-8397-a2f815bdc670": { + "message": "хуй", + "target_username": "" + }, + "4e2f8eb5-c3cb-414b-9feb-c7fc7d1065ba": { + "message": "хуй", + "target_username": "meowreef" + }, + "80e918eb-7926-4a67-aa68-6910f2097143": { + "message": "bebta", + "target_username": "" + }, + "bda3d44d-2050-4eef-b46c-5524c26e7785": { + "message": "bebta", + "target_username": "sanspie" + }, + "313479c8-d172-447f-9b12-0835665a9e93": { + "message": "bebta", + "target_username": "sanspie" + }, + "ed06f945-a241-40dd-9643-bcd45471c908": { + "message": "соси", + "target_username": "" + }, + "929d60c1-a1f1-48e6-bbbe-cf1d9e2e6c82": { + "message": "соси", + "target_username": "sanspie" + }, + "d62da696-66a0-4945-9e42-bab33c013480": { + "message": "аааа хуй", + "target_username": "" + }, + "3ead277f-54a4-4a3c-b3bf-b9f60ef666f5": { + "message": "аааа хуй", + "target_username": "a" + }, + "1bb7cb25-8e04-489e-9b63-58712bca9017": { + "message": "аааа хуй", + "target_username": "any" + }, + "b99799a7-672b-47f6-af53-3fe32416a83b": { + "message": "аааа хуй", + "target_username": "any" + }, + "f00c433c-c522-48d5-9309-6fe7790c59f4": { + "message": "аааа хуй", + "target_username": "an" + }, + "9df6f028-7acb-4f2b-a930-b46e277f234c": { + "message": "аааа хуй", + "target_username": "" + }, + "b0936990-4011-4840-9329-aa051e40cf90": { + "message": "аааа хуй", + "target_username": "фт" + }, + "6e711137-2f3f-4894-8ca8-3cc945554571": { + "message": "аааа хуй", + "target_username": "" + }, + "7f7d7b2b-bf8b-49b3-b4e3-928364dc726b": { + "message": "аааа хуй", + "target_username": "anyuser9999" + } +} \ No newline at end of file diff --git a/bots/antiwhisper/requirements.txt b/bots/antiwhisper/requirements.txt new file mode 100644 index 0000000..a72fb1d --- /dev/null +++ b/bots/antiwhisper/requirements.txt @@ -0,0 +1 @@ +python-telegram-bot==20.3 \ No newline at end of file diff --git a/bots/music/main.py b/bots/music/main.py new file mode 100644 index 0000000..86acfa1 --- /dev/null +++ b/bots/music/main.py @@ -0,0 +1,230 @@ +import os +import json +import yaml +import asyncio +from typing import Optional, Dict, Any +from telethon import TelegramClient +from telethon.tl.types import ( + MessageMediaDocument, + Document, + DocumentAttributeAudio, + PhotoSize, +) +from pydub import AudioSegment +from mutagen.mp3 import MP3 +from mutagen.id3 import ( + ID3, + TIT2, # Title + TPE1, # Artist + TALB, # Album + APIC, # Album Art + TDRC, # Recording time + TPE2, # Album Artist + TRCK, # Track number +) +import mimetypes +import aiofiles +import aiohttp +from pathlib import Path + + +class MusicDownloader: + def __init__(self, api_id: str, api_hash: str): + self.api_id = int(api_id) + self.api_hash = api_hash + self.client = TelegramClient('music_downloader', self.api_id, self.api_hash) + self.supported_audio_types = { + 'audio/mpeg': '.mp3', + 'audio/mp4': '.m4a', + 'audio/ogg': '.ogg', + 'audio/x-wav': '.wav', + 'audio/x-flac': '.flac' + } + + async def start(self): + await self.client.start() + + async def extract_audio_metadata(self, document: Document) -> Dict[str, Any]: + """Extract metadata from Telegram Document object.""" + metadata = { + 'title': None, + 'performer': None, + 'album': None, + 'duration': None, + 'track_num': None, + 'thumbnail': None + } + + for attr in document.attributes: + if isinstance(attr, DocumentAttributeAudio): + metadata['title'] = attr.title + metadata['performer'] = attr.performer + metadata['duration'] = attr.duration + + # Extract thumbnail if available + if document.thumbs: + for thumb in document.thumbs: + if isinstance(thumb, PhotoSize): + metadata['thumbnail'] = thumb + break + + return metadata + + async def download_thumbnail(self, thumb: PhotoSize) -> Optional[bytes]: + """Download thumbnail bytes.""" + if not thumb: + return None + + try: + return await self.client.download_media(thumb, bytes) + except Exception as e: + print(f"Error downloading thumbnail: {e}") + return None + + async def convert_to_mp3(self, input_path: str, output_path: str): + """Convert audio file to MP3 format.""" + try: + audio = AudioSegment.from_file(input_path) + audio.export(output_path, format='mp3', bitrate='320k') + return True + except Exception as e: + print(f"Error converting to MP3: {e}") + return False + + async def update_mp3_metadata(self, file_path: str, metadata: Dict[str, Any], thumb_data: Optional[bytes]): + """Update MP3 file with metadata and album art.""" + try: + audio = MP3(file_path) + if not audio.tags: + audio.tags = ID3() + + # Add basic metadata + if metadata['title']: + audio.tags.add(TIT2(text=metadata['title'])) + if metadata['performer']: + audio.tags.add(TPE1(text=metadata['performer'])) + audio.tags.add(TPE2(text=metadata['performer'])) + + # Add album art if available + if thumb_data: + audio.tags.add(APIC( + encoding=3, + mime='image/jpeg', + type=3, # Cover (front) + desc='Cover', + data=thumb_data + )) + + audio.save() + return True + except Exception as e: + print(f"Error updating MP3 metadata: {e}") + return False + + async def process_audio_message(self, message) -> bool: + """Process a single audio message.""" + if not message.media or not isinstance(message.media, MessageMediaDocument): + return False + + document = message.media.document + mime_type = document.mime_type + + if mime_type not in self.supported_audio_types: + return False + + # Extract metadata + metadata = await self.extract_audio_metadata(document) + thumb_data = await self.download_thumbnail(metadata['thumbnail']) + + # Create file paths + temp_path = f"temp_{message.id}{self.supported_audio_types[mime_type]}" + final_path = f"downloads/{metadata['performer'] if metadata['performer'] else 'Unknown Artist'}" + os.makedirs(final_path, exist_ok=True) + + final_filename = f"{metadata['title'] if metadata['title'] else f'track_{message.id}'}.mp3" + final_path = os.path.join(final_path, final_filename) + + # Download original file + try: + await self.client.download_media(message, temp_path) + except Exception as e: + print(f"Error downloading file: {e}") + return False + + try: + # Convert to MP3 if needed + if mime_type != 'audio/mpeg': + if not await self.convert_to_mp3(temp_path, final_path): + os.remove(temp_path) + return False + else: + os.rename(temp_path, final_path) + + # Update metadata + await self.update_mp3_metadata(final_path, metadata, thumb_data) + + print(f"Successfully processed: {final_filename}") + return True + + except Exception as e: + print(f"Error processing file: {e}") + if os.path.exists(temp_path): + os.remove(temp_path) + return False + + async def process_channel(self, channel_id: str, limit: int = None): + """Process all audio messages from a channel.""" + try: + print(f"Processing channel: {channel_id}") + + # Create downloads directory + os.makedirs("downloads", exist_ok=True) + + # Iterate through messages + async for message in self.client.iter_messages(channel_id, limit=limit): + await self.process_audio_message(message) + + except Exception as e: + print(f"Error processing channel: {e}") + + async def close(self): + await self.client.disconnect() + + +async def main(): + # Load config + if not os.path.isfile("poller.yaml"): + raise FileNotFoundError("Please create poller.yaml") + + with open("poller.yaml", "r") as stream: + config = yaml.safe_load(stream) + + api_id = os.getenv("api_id") + api_hash = os.getenv("api_hash") + + if not api_id or not api_hash: + raise ValueError("Please set api_id and api_hash environment variables") + + downloader = MusicDownloader(api_id, api_hash) + await downloader.start() + + try: + # Process channels from config + if 'channels' in config: + if 'usernames' in config['channels']: + for username in config['channels']['usernames']: + username = username.replace('https://t.me/', '').replace('@', '') + await downloader.process_channel(username) + + if 'ids' in config['channels']: + for channel_id in config['channels']['ids']: + await downloader.process_channel(channel_id) + + finally: + await downloader.close() + + +if __name__ == "__main__": + # Install required packages + # pip install telethon pydub mutagen cryptg + asyncio.run(main()) \ No newline at end of file diff --git a/bots/music/poller.yaml b/bots/music/poller.yaml new file mode 100644 index 0000000..90d739c --- /dev/null +++ b/bots/music/poller.yaml @@ -0,0 +1,3 @@ +channels: + usernames: + - astrophysiks \ No newline at end of file