diff --git a/stream/docker-compose.yml b/stream/docker-compose.yml index 04fe995..c379477 100644 --- a/stream/docker-compose.yml +++ b/stream/docker-compose.yml @@ -6,7 +6,7 @@ services: environment: - API_HOST=https://new.akarpov.ru - WEB_HOST=https://next.akarpov.ru - - RTMP_URL=rtmp://your.rtmp.server/live/stream_key + - RTMP_URLS=rtmp://your.rtmp.server/live/stream_key - TELEGRAM_TOKEN=your_bot_token_here - ADMIN_IDS=123,321 # Comma-separated list of Telegram user IDs volumes: diff --git a/stream/main.py b/stream/main.py index 51a4f85..4e70a95 100644 --- a/stream/main.py +++ b/stream/main.py @@ -1,10 +1,12 @@ import asyncio +import atexit import os import queue import random import signal import subprocess import threading +from typing import Optional import aiohttp import requests @@ -126,16 +128,17 @@ def run_stream_service(service): class StreamService: - def __init__(self, api_host, rtmp_url, web_host): + def __init__(self, api_host, rtmp_urls, web_host): self.qr_color = None self.bg_color = None self.api_host = api_host.rstrip("/") self.web_host = web_host.rstrip("/") - self.rtmp_url = rtmp_url + self.rtmp_urls = rtmp_urls self.overlay_path = "/tmp/overlay.png" self.session = requests.Session() self.command_queue = queue.Queue() - self.current_process = None + self.current_processes = [] # List of current FFmpeg processes + self.play_random_songs = True # Flag to control random song playback # Automatically find a font that supports Unicode characters self.font_path = self.find_font() @@ -371,7 +374,10 @@ class StreamService: except Exception as e: print(f"QR code generation failed: {e}") - img.save(self.overlay_path, "PNG") + # Ensure the image is in RGBA mode before saving as PNG + if img.mode != "RGBA": + img = img.convert("RGBA") + img.save(self.overlay_path, format="PNG") def stream_song(self, song_info): song_file = self.get_full_url(song_info["file"]) @@ -382,65 +388,75 @@ class StreamService: print(f"Processing '{song_info['name']}' with length {length} seconds") - if self.current_process: + # Stop any previous FFmpeg processes + if self.current_processes: print("Stopping previous song...") - self.current_process.terminate() - try: - self.current_process.wait(timeout=5) - except subprocess.TimeoutExpired: - self.current_process.kill() - self.current_process.wait() - self.current_process = None + for proc in self.current_processes: + proc.terminate() + for proc in self.current_processes: + try: + proc.wait(timeout=5) + except subprocess.TimeoutExpired: + proc.kill() + proc.wait() + self.current_processes = [] - cmd = [ - "ffmpeg", - "-re", - "-i", - song_file, - "-stream_loop", - "-1", - "-i", - self.overlay_path, - "-c:a", - "aac", - "-c:v", - "libx264", - "-filter:a", - "volume=0.5", - "-preset", - "veryfast", - "-b:a", - "192k", - "-ar", - "44100", - "-r", - "30", - "-pix_fmt", - "yuv420p", - "-t", - str(length), - "-y", - "-f", - "flv", - self.rtmp_url, - ] + # Start FFmpeg processes for each RTMP URL + self.current_processes = [] + for rtmp_url in self.rtmp_urls: + cmd = [ + "ffmpeg", + "-re", + "-i", + song_file, + "-stream_loop", + "-1", + "-i", + self.overlay_path, + "-c:a", + "aac", + "-c:v", + "libx264", + "-filter:a", + "volume=0.5", + "-preset", + "veryfast", + "-b:a", + "192k", + "-ar", + "44100", + "-r", + "30", + "-pix_fmt", + "yuv420p", + "-t", + str(length), + "-y", + "-f", + "flv", + rtmp_url, + ] + print(f"Starting FFmpeg process for {rtmp_url}") + proc = subprocess.Popen(cmd) + self.current_processes.append(proc) - self.current_process = subprocess.Popen(cmd) start_time = time.time() - while self.current_process.poll() is None: + while any(proc.poll() is None for proc in self.current_processes): # Check command queue every second try: cmd = self.command_queue.get_nowait() if cmd["action"] in ["next", "play"]: print("Received command during playback, stopping current song...") - self.current_process.terminate() - try: - self.current_process.wait(timeout=5) - except subprocess.TimeoutExpired: - self.current_process.kill() - self.current_process.wait() - self.current_process = None + for proc in self.current_processes: + proc.terminate() + for proc in self.current_processes: + try: + proc.wait(timeout=5) + except subprocess.TimeoutExpired: + proc.kill() + proc.wait() + self.current_processes = [] return False # Signal that we were interrupted except queue.Empty: pass @@ -450,14 +466,15 @@ class StreamService: if elapsed >= length: break - if self.current_process: - self.current_process.terminate() + # Terminate processes after the song is done + for proc in self.current_processes: + proc.terminate() try: - self.current_process.wait(timeout=5) + proc.wait(timeout=5) except subprocess.TimeoutExpired: - self.current_process.kill() - self.current_process.wait() - self.current_process = None + proc.kill() + proc.wait() + self.current_processes = [] return True # Signal normal completion @@ -469,11 +486,13 @@ class StreamService: cmd = self.command_queue.get_nowait() if cmd["action"] == "next": print("Skipping to next song...") - if self.current_process: + if self.current_processes: print("Stopping current song...") - self.current_process.terminate() - self.current_process.wait() - self.current_process = None + for proc in self.current_processes: + proc.terminate() + for proc in self.current_processes: + proc.wait() + self.current_processes = [] continue elif cmd["action"] == "play": song_info = self.get_song_info(cmd["slug"]) @@ -486,12 +505,18 @@ class StreamService: ) continue print(f"Playing requested song: {song_info['name']}") + self.play_random_songs = False # Disable random songs self.create_overlay(song_info) self.stream_song(song_info) - continue + self.play_random_songs = True # Re-enable random songs + continue # Go back to check for commands except queue.Empty: pass + if not self.play_random_songs: + time.sleep(5) + continue + slugs = self.get_song_slugs() if not slugs: time.sleep(10) @@ -523,45 +548,84 @@ class StreamService: except Exception as e: print(f"Error processing song {slug}: {e}") - if self.current_process: - self.current_process.terminate() - self.current_process.wait() - self.current_process = None + if self.current_processes: + for proc in self.current_processes: + proc.terminate() + proc.wait() + self.current_processes = [] time.sleep(5) except Exception as e: print(f"Stream error: {e}") - if self.current_process: - self.current_process.terminate() - self.current_process.wait() - self.current_process = None + if self.current_processes: + for proc in self.current_processes: + proc.terminate() + proc.wait() + self.current_processes = [] time.sleep(5) def cleanup(self): - if self.current_process: - print("Cleaning up stream process...") - self.current_process.terminate() - try: - self.current_process.wait(timeout=5) - except subprocess.TimeoutExpired: - self.current_process.kill() - self.current_process.wait() - self.current_process = None + if self.current_processes: + print("Cleaning up stream processes...") + for proc in self.current_processes: + try: + proc.terminate() + proc.wait(timeout=3) + except subprocess.TimeoutExpired: + print(f"Force killing process {proc.pid}") + proc.kill() + proc.wait() + except Exception as e: + print(f"Error killing process: {e}") + self.current_processes = [] + + +service: Optional[StreamService] = None +bot: Optional[StreamBot] = None +should_exit = threading.Event() + + +def cleanup_handler(): + """Cleanup handler for both normal exit and signals""" + print("\nCleaning up...") + should_exit.set() + if service: + service.cleanup() + # Give processes time to cleanup + time.sleep(2) + + +def signal_handler(signum, frame): + """Handle SIGINT and SIGTERM""" + if signum in (signal.SIGINT, signal.SIGTERM): + print(f"\nReceived signal {signum}, initiating shutdown...") + cleanup_handler() + os._exit(0) # Force exit if normal exit fails if __name__ == "__main__": API_HOST = os.getenv("API_HOST", "https://new.akarpov.ru") WEB_HOST = os.getenv("WEB_HOST", "https://next.akarpov.ru") - RTMP_URL = os.getenv("RTMP_URL") + RTMP_URLS = os.getenv("RTMP_URLS") TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN") ADMIN_IDS = os.getenv("ADMIN_IDS") - if not all([RTMP_URL, TELEGRAM_TOKEN, ADMIN_IDS]): + if not all([RTMP_URLS, TELEGRAM_TOKEN, ADMIN_IDS]): print("Missing required environment variables") exit(1) + rtmp_urls = [url.strip() for url in RTMP_URLS.split(",") if url.strip()] + if not rtmp_urls: + print("No valid RTMP URLs provided") + exit(1) + + # Register cleanup handlers + atexit.register(cleanup_handler) + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + # Initialize services - service = StreamService(API_HOST, RTMP_URL, WEB_HOST) + service = StreamService(API_HOST, rtmp_urls, WEB_HOST) bot = StreamBot(API_HOST) bot.queue = service.command_queue @@ -574,6 +638,9 @@ if __name__ == "__main__": try: bot.run() except KeyboardInterrupt: - print("\nStopping the bot...") - if service: - service.cleanup() + cleanup_handler() + except Exception as e: + print(f"Unexpected error: {e}") + cleanup_handler() + finally: + cleanup_handler()