From 6bbd5070180f57f47302a4aac13b7bdbda6d5fad Mon Sep 17 00:00:00 2001 From: Alexander-D-Karpov Date: Wed, 30 Oct 2024 22:05:47 +0300 Subject: [PATCH] added control bot --- stream/docker-compose.yml | 2 + stream/main.py | 284 ++++++++++++++++++++++++++++++++++---- 2 files changed, 262 insertions(+), 24 deletions(-) diff --git a/stream/docker-compose.yml b/stream/docker-compose.yml index 6a18244..04fe995 100644 --- a/stream/docker-compose.yml +++ b/stream/docker-compose.yml @@ -7,6 +7,8 @@ services: - API_HOST=https://new.akarpov.ru - WEB_HOST=https://next.akarpov.ru - RTMP_URL=rtmp://your.rtmp.server/live/stream_key + - TELEGRAM_TOKEN=your_bot_token_here + - ADMIN_IDS=123,321 # Comma-separated list of Telegram user IDs volumes: - /tmp:/tmp restart: unless-stopped \ No newline at end of file diff --git a/stream/main.py b/stream/main.py index e095d19..51a4f85 100644 --- a/stream/main.py +++ b/stream/main.py @@ -1,7 +1,12 @@ +import asyncio import os +import queue import random +import signal import subprocess +import threading +import aiohttp import requests import time from PIL import Image, ImageDraw, ImageFont, UnidentifiedImageError @@ -10,6 +15,114 @@ from colorthief import ColorThief from urllib.parse import urljoin from qrcode.main import QRCode +from telegram import Update +from telegram.ext import ( + CallbackContext, + MessageHandler, + filters, + CommandHandler, + Application, +) + + +class StreamBot: + def __init__(self, api_host): + self.api_host = api_host.rstrip("/") + self.queue = None + self.admin_ids = list(map(int, os.getenv("ADMIN_IDS", "").split(","))) + + # Initialize bot + self.app = Application.builder().token(os.getenv("TELEGRAM_TOKEN")).build() + + # Add handlers + self.app.add_handler(CommandHandler("next", self.next_song)) + self.app.add_handler(CommandHandler("play", self.play_song)) + self.app.add_handler(CommandHandler("start", self.start)) + self.app.add_handler(CommandHandler("help", self.help)) + self.app.add_handler( + MessageHandler(filters.TEXT & ~filters.COMMAND, self.search_song) + ) + + def is_admin(self, user_id: int) -> bool: + return user_id in self.admin_ids + + async def start(self, update, context): + await update.message.reply_text( + "Welcome to the stream bot. Use /play to play a song." + " Use /help for more information." + " Use /next to skip to the next song." + ) + + async def help(self, update, context): + await update.message.reply_text( + "Use /play to play a song." + " Use /next to skip to the next song." + ) + + async def next_song(self, update, context): + if not self.is_admin(update.effective_user.id): + await update.message.reply_text( + "You're not authorized to control the stream." + ) + return + + if self.queue: + self.queue.put({"action": "next"}) + await update.message.reply_text("Skipping to next song...") + + async def play_song(self, update, context): + if not self.is_admin(update.effective_user.id): + await update.message.reply_text( + "You're not authorized to control the stream." + ) + return + + if not context.args: + await update.message.reply_text("Usage: /play ") + return + + slug = context.args[0] + if self.queue: + self.queue.put({"action": "play", "slug": slug}) + await update.message.reply_text(f"Playing song with slug: {slug}") + + async def search_song(self, update, context): + query = update.message.text + + try: + response = requests.get( + f"{self.api_host}/api/v1/music/song/?search={query}" + ) + response.raise_for_status() + data = response.json() + + if not data.get("results"): + await update.message.reply_text("No songs found.") + return + + songs = data["results"][:5] # Limit to 5 results + response_text = "Found songs:\n" + for song in songs: + authors = ", ".join(a["name"] for a in song["authors"]) + response_text += ( + f"\n{song['name']} by {authors}\nSlug: {song['slug']}\n" + ) + + response_text += "\nUse /play to play a song" + await update.message.reply_text(response_text) + except Exception as e: + await update.message.reply_text(f"Error searching for songs: {str(e)}") + + def run(self): + self.app.run_polling(allowed_updates=True) + + +def run_stream_service(service): + """Function to run stream service in a separate thread""" + try: + service.stream() + except Exception as e: + print(f"Stream service error: {e}") class StreamService: @@ -21,6 +134,8 @@ class StreamService: self.rtmp_url = rtmp_url self.overlay_path = "/tmp/overlay.png" self.session = requests.Session() + self.command_queue = queue.Queue() + self.current_process = None # Automatically find a font that supports Unicode characters self.font_path = self.find_font() @@ -267,6 +382,16 @@ class StreamService: print(f"Processing '{song_info['name']}' with length {length} seconds") + if self.current_process: + print("Stopping previous song...") + self.current_process.terminate() + try: + self.current_process.wait(timeout=5) + except subprocess.TimeoutExpired: + self.current_process.kill() + self.current_process.wait() + self.current_process = None + cmd = [ "ffmpeg", "-re", @@ -300,44 +425,155 @@ class StreamService: self.rtmp_url, ] - process = subprocess.Popen(cmd) - process.wait() + self.current_process = subprocess.Popen(cmd) + start_time = time.time() - print(f"Finished streaming {song_info['name']}") + while self.current_process.poll() is None: + # Check command queue every second + try: + cmd = self.command_queue.get_nowait() + if cmd["action"] in ["next", "play"]: + print("Received command during playback, stopping current song...") + self.current_process.terminate() + try: + self.current_process.wait(timeout=5) + except subprocess.TimeoutExpired: + self.current_process.kill() + self.current_process.wait() + self.current_process = None + return False # Signal that we were interrupted + except queue.Empty: + pass + + time.sleep(1) + elapsed = time.time() - start_time + if elapsed >= length: + break + + if self.current_process: + self.current_process.terminate() + try: + self.current_process.wait(timeout=5) + except subprocess.TimeoutExpired: + self.current_process.kill() + self.current_process.wait() + self.current_process = None + + return True # Signal normal completion def stream(self): while True: - slugs = self.get_song_slugs() - if not slugs: - time.sleep(10) - continue - - random.shuffle(slugs) - - for slug in slugs: + try: + # Check for commands try: - song_info = self.get_song_info(slug) - if not song_info: + cmd = self.command_queue.get_nowait() + if cmd["action"] == "next": + print("Skipping to next song...") + if self.current_process: + print("Stopping current song...") + self.current_process.terminate() + self.current_process.wait() + self.current_process = None continue + elif cmd["action"] == "play": + song_info = self.get_song_info(cmd["slug"]) + if song_info: + if song_info.get("meta", {}).get("genre", "").lower() in [ + "rusrap" + ]: + print( + f"Skipping {song_info['name']} because of genre 🤮" + ) + continue + print(f"Playing requested song: {song_info['name']}") + self.create_overlay(song_info) + self.stream_song(song_info) + continue + except queue.Empty: + pass - print(f"Starting stream for {song_info['name']}") - self.create_overlay(song_info) - self.stream_song(song_info) - print(f"Finished streaming {song_info['name']}") + slugs = self.get_song_slugs() + if not slugs: + time.sleep(10) + continue - except Exception as e: - print(f"Error processing song {slug}: {e}") - time.sleep(5) + random.shuffle(slugs) + + for slug in slugs: + try: + # Check for commands again + if not self.command_queue.empty(): + break + + song_info = self.get_song_info(slug) + if not song_info: + continue + + if song_info.get("meta", {}).get("genre", "").lower() in [ + "rusrap" + ]: + print(f"Skipping {song_info['name']} because of genre 🤮") + continue + + print(f"Starting stream for {song_info['name']}") + self.create_overlay(song_info) + if not self.stream_song(song_info): # If interrupted + break + print(f"Finished streaming {song_info['name']}") + + except Exception as e: + print(f"Error processing song {slug}: {e}") + if self.current_process: + self.current_process.terminate() + self.current_process.wait() + self.current_process = None + time.sleep(5) + + except Exception as e: + print(f"Stream error: {e}") + if self.current_process: + self.current_process.terminate() + self.current_process.wait() + self.current_process = None + time.sleep(5) + + def cleanup(self): + if self.current_process: + print("Cleaning up stream process...") + self.current_process.terminate() + try: + self.current_process.wait(timeout=5) + except subprocess.TimeoutExpired: + self.current_process.kill() + self.current_process.wait() + self.current_process = None if __name__ == "__main__": API_HOST = os.getenv("API_HOST", "https://new.akarpov.ru") WEB_HOST = os.getenv("WEB_HOST", "https://next.akarpov.ru") - RTMP_URL = os.getenv("RTMP_URL", None) + RTMP_URL = os.getenv("RTMP_URL") + TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN") + ADMIN_IDS = os.getenv("ADMIN_IDS") - if not RTMP_URL: - print("RTMP_URL environment variable is not set") + if not all([RTMP_URL, TELEGRAM_TOKEN, ADMIN_IDS]): + print("Missing required environment variables") exit(1) + # Initialize services service = StreamService(API_HOST, RTMP_URL, WEB_HOST) - service.stream() + bot = StreamBot(API_HOST) + bot.queue = service.command_queue + + # Run stream service in separate thread + service_thread = threading.Thread(target=run_stream_service, args=(service,)) + service_thread.daemon = True + service_thread.start() + + # Run bot in main thread + try: + bot.run() + except KeyboardInterrupt: + print("\nStopping the bot...") + if service: + service.cleanup()