added multiple rtmp support

This commit is contained in:
Alexander Karpov 2024-11-01 00:43:36 +03:00
parent 4f2323c364
commit a4543a9d2c
2 changed files with 155 additions and 88 deletions

View File

@ -6,7 +6,7 @@ services:
environment:
- API_HOST=https://new.akarpov.ru
- WEB_HOST=https://next.akarpov.ru
- RTMP_URL=rtmp://your.rtmp.server/live/stream_key
- RTMP_URLS=rtmp://your.rtmp.server/live/stream_key
- TELEGRAM_TOKEN=your_bot_token_here
- ADMIN_IDS=123,321 # Comma-separated list of Telegram user IDs
volumes:

View File

@ -1,10 +1,12 @@
import asyncio
import atexit
import os
import queue
import random
import signal
import subprocess
import threading
from typing import Optional
import aiohttp
import requests
@ -126,16 +128,17 @@ def run_stream_service(service):
class StreamService:
def __init__(self, api_host, rtmp_url, web_host):
def __init__(self, api_host, rtmp_urls, web_host):
self.qr_color = None
self.bg_color = None
self.api_host = api_host.rstrip("/")
self.web_host = web_host.rstrip("/")
self.rtmp_url = rtmp_url
self.rtmp_urls = rtmp_urls
self.overlay_path = "/tmp/overlay.png"
self.session = requests.Session()
self.command_queue = queue.Queue()
self.current_process = None
self.current_processes = [] # List of current FFmpeg processes
self.play_random_songs = True # Flag to control random song playback
# Automatically find a font that supports Unicode characters
self.font_path = self.find_font()
@ -371,7 +374,10 @@ class StreamService:
except Exception as e:
print(f"QR code generation failed: {e}")
img.save(self.overlay_path, "PNG")
# Ensure the image is in RGBA mode before saving as PNG
if img.mode != "RGBA":
img = img.convert("RGBA")
img.save(self.overlay_path, format="PNG")
def stream_song(self, song_info):
song_file = self.get_full_url(song_info["file"])
@ -382,16 +388,22 @@ class StreamService:
print(f"Processing '{song_info['name']}' with length {length} seconds")
if self.current_process:
# Stop any previous FFmpeg processes
if self.current_processes:
print("Stopping previous song...")
self.current_process.terminate()
for proc in self.current_processes:
proc.terminate()
for proc in self.current_processes:
try:
self.current_process.wait(timeout=5)
proc.wait(timeout=5)
except subprocess.TimeoutExpired:
self.current_process.kill()
self.current_process.wait()
self.current_process = None
proc.kill()
proc.wait()
self.current_processes = []
# Start FFmpeg processes for each RTMP URL
self.current_processes = []
for rtmp_url in self.rtmp_urls:
cmd = [
"ffmpeg",
"-re",
@ -422,25 +434,29 @@ class StreamService:
"-y",
"-f",
"flv",
self.rtmp_url,
rtmp_url,
]
print(f"Starting FFmpeg process for {rtmp_url}")
proc = subprocess.Popen(cmd)
self.current_processes.append(proc)
self.current_process = subprocess.Popen(cmd)
start_time = time.time()
while self.current_process.poll() is None:
while any(proc.poll() is None for proc in self.current_processes):
# Check command queue every second
try:
cmd = self.command_queue.get_nowait()
if cmd["action"] in ["next", "play"]:
print("Received command during playback, stopping current song...")
self.current_process.terminate()
for proc in self.current_processes:
proc.terminate()
for proc in self.current_processes:
try:
self.current_process.wait(timeout=5)
proc.wait(timeout=5)
except subprocess.TimeoutExpired:
self.current_process.kill()
self.current_process.wait()
self.current_process = None
proc.kill()
proc.wait()
self.current_processes = []
return False # Signal that we were interrupted
except queue.Empty:
pass
@ -450,14 +466,15 @@ class StreamService:
if elapsed >= length:
break
if self.current_process:
self.current_process.terminate()
# Terminate processes after the song is done
for proc in self.current_processes:
proc.terminate()
try:
self.current_process.wait(timeout=5)
proc.wait(timeout=5)
except subprocess.TimeoutExpired:
self.current_process.kill()
self.current_process.wait()
self.current_process = None
proc.kill()
proc.wait()
self.current_processes = []
return True # Signal normal completion
@ -469,11 +486,13 @@ class StreamService:
cmd = self.command_queue.get_nowait()
if cmd["action"] == "next":
print("Skipping to next song...")
if self.current_process:
if self.current_processes:
print("Stopping current song...")
self.current_process.terminate()
self.current_process.wait()
self.current_process = None
for proc in self.current_processes:
proc.terminate()
for proc in self.current_processes:
proc.wait()
self.current_processes = []
continue
elif cmd["action"] == "play":
song_info = self.get_song_info(cmd["slug"])
@ -486,12 +505,18 @@ class StreamService:
)
continue
print(f"Playing requested song: {song_info['name']}")
self.play_random_songs = False # Disable random songs
self.create_overlay(song_info)
self.stream_song(song_info)
continue
self.play_random_songs = True # Re-enable random songs
continue # Go back to check for commands
except queue.Empty:
pass
if not self.play_random_songs:
time.sleep(5)
continue
slugs = self.get_song_slugs()
if not slugs:
time.sleep(10)
@ -523,45 +548,84 @@ class StreamService:
except Exception as e:
print(f"Error processing song {slug}: {e}")
if self.current_process:
self.current_process.terminate()
self.current_process.wait()
self.current_process = None
if self.current_processes:
for proc in self.current_processes:
proc.terminate()
proc.wait()
self.current_processes = []
time.sleep(5)
except Exception as e:
print(f"Stream error: {e}")
if self.current_process:
self.current_process.terminate()
self.current_process.wait()
self.current_process = None
if self.current_processes:
for proc in self.current_processes:
proc.terminate()
proc.wait()
self.current_processes = []
time.sleep(5)
def cleanup(self):
if self.current_process:
print("Cleaning up stream process...")
self.current_process.terminate()
if self.current_processes:
print("Cleaning up stream processes...")
for proc in self.current_processes:
try:
self.current_process.wait(timeout=5)
proc.terminate()
proc.wait(timeout=3)
except subprocess.TimeoutExpired:
self.current_process.kill()
self.current_process.wait()
self.current_process = None
print(f"Force killing process {proc.pid}")
proc.kill()
proc.wait()
except Exception as e:
print(f"Error killing process: {e}")
self.current_processes = []
service: Optional[StreamService] = None
bot: Optional[StreamBot] = None
should_exit = threading.Event()
def cleanup_handler():
"""Cleanup handler for both normal exit and signals"""
print("\nCleaning up...")
should_exit.set()
if service:
service.cleanup()
# Give processes time to cleanup
time.sleep(2)
def signal_handler(signum, frame):
"""Handle SIGINT and SIGTERM"""
if signum in (signal.SIGINT, signal.SIGTERM):
print(f"\nReceived signal {signum}, initiating shutdown...")
cleanup_handler()
os._exit(0) # Force exit if normal exit fails
if __name__ == "__main__":
API_HOST = os.getenv("API_HOST", "https://new.akarpov.ru")
WEB_HOST = os.getenv("WEB_HOST", "https://next.akarpov.ru")
RTMP_URL = os.getenv("RTMP_URL")
RTMP_URLS = os.getenv("RTMP_URLS")
TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN")
ADMIN_IDS = os.getenv("ADMIN_IDS")
if not all([RTMP_URL, TELEGRAM_TOKEN, ADMIN_IDS]):
if not all([RTMP_URLS, TELEGRAM_TOKEN, ADMIN_IDS]):
print("Missing required environment variables")
exit(1)
rtmp_urls = [url.strip() for url in RTMP_URLS.split(",") if url.strip()]
if not rtmp_urls:
print("No valid RTMP URLs provided")
exit(1)
# Register cleanup handlers
atexit.register(cleanup_handler)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# Initialize services
service = StreamService(API_HOST, RTMP_URL, WEB_HOST)
service = StreamService(API_HOST, rtmp_urls, WEB_HOST)
bot = StreamBot(API_HOST)
bot.queue = service.command_queue
@ -574,6 +638,9 @@ if __name__ == "__main__":
try:
bot.run()
except KeyboardInterrupt:
print("\nStopping the bot...")
if service:
service.cleanup()
cleanup_handler()
except Exception as e:
print(f"Unexpected error: {e}")
cleanup_handler()
finally:
cleanup_handler()