added control bot

This commit is contained in:
Alexander Karpov 2024-10-30 22:05:47 +03:00
parent 1c3188a289
commit 6bbd507018
2 changed files with 262 additions and 24 deletions

View File

@ -7,6 +7,8 @@ services:
- API_HOST=https://new.akarpov.ru
- WEB_HOST=https://next.akarpov.ru
- RTMP_URL=rtmp://your.rtmp.server/live/stream_key
- TELEGRAM_TOKEN=your_bot_token_here
- ADMIN_IDS=123,321 # Comma-separated list of Telegram user IDs
volumes:
- /tmp:/tmp
restart: unless-stopped

View File

@ -1,7 +1,12 @@
import asyncio
import os
import queue
import random
import signal
import subprocess
import threading
import aiohttp
import requests
import time
from PIL import Image, ImageDraw, ImageFont, UnidentifiedImageError
@ -10,6 +15,114 @@ from colorthief import ColorThief
from urllib.parse import urljoin
from qrcode.main import QRCode
from telegram import Update
from telegram.ext import (
CallbackContext,
MessageHandler,
filters,
CommandHandler,
Application,
)
class StreamBot:
def __init__(self, api_host):
self.api_host = api_host.rstrip("/")
self.queue = None
self.admin_ids = list(map(int, os.getenv("ADMIN_IDS", "").split(",")))
# Initialize bot
self.app = Application.builder().token(os.getenv("TELEGRAM_TOKEN")).build()
# Add handlers
self.app.add_handler(CommandHandler("next", self.next_song))
self.app.add_handler(CommandHandler("play", self.play_song))
self.app.add_handler(CommandHandler("start", self.start))
self.app.add_handler(CommandHandler("help", self.help))
self.app.add_handler(
MessageHandler(filters.TEXT & ~filters.COMMAND, self.search_song)
)
def is_admin(self, user_id: int) -> bool:
return user_id in self.admin_ids
async def start(self, update, context):
await update.message.reply_text(
"Welcome to the stream bot. Use /play <song_slug> to play a song."
" Use /help for more information."
" Use /next to skip to the next song."
)
async def help(self, update, context):
await update.message.reply_text(
"Use /play <song_slug> to play a song."
" Use /next to skip to the next song."
)
async def next_song(self, update, context):
if not self.is_admin(update.effective_user.id):
await update.message.reply_text(
"You're not authorized to control the stream."
)
return
if self.queue:
self.queue.put({"action": "next"})
await update.message.reply_text("Skipping to next song...")
async def play_song(self, update, context):
if not self.is_admin(update.effective_user.id):
await update.message.reply_text(
"You're not authorized to control the stream."
)
return
if not context.args:
await update.message.reply_text("Usage: /play <song_slug>")
return
slug = context.args[0]
if self.queue:
self.queue.put({"action": "play", "slug": slug})
await update.message.reply_text(f"Playing song with slug: {slug}")
async def search_song(self, update, context):
query = update.message.text
try:
response = requests.get(
f"{self.api_host}/api/v1/music/song/?search={query}"
)
response.raise_for_status()
data = response.json()
if not data.get("results"):
await update.message.reply_text("No songs found.")
return
songs = data["results"][:5] # Limit to 5 results
response_text = "Found songs:\n"
for song in songs:
authors = ", ".join(a["name"] for a in song["authors"])
response_text += (
f"\n{song['name']} by {authors}\nSlug: {song['slug']}\n"
)
response_text += "\nUse /play <slug> to play a song"
await update.message.reply_text(response_text)
except Exception as e:
await update.message.reply_text(f"Error searching for songs: {str(e)}")
def run(self):
self.app.run_polling(allowed_updates=True)
def run_stream_service(service):
"""Function to run stream service in a separate thread"""
try:
service.stream()
except Exception as e:
print(f"Stream service error: {e}")
class StreamService:
@ -21,6 +134,8 @@ class StreamService:
self.rtmp_url = rtmp_url
self.overlay_path = "/tmp/overlay.png"
self.session = requests.Session()
self.command_queue = queue.Queue()
self.current_process = None
# Automatically find a font that supports Unicode characters
self.font_path = self.find_font()
@ -267,6 +382,16 @@ class StreamService:
print(f"Processing '{song_info['name']}' with length {length} seconds")
if self.current_process:
print("Stopping previous song...")
self.current_process.terminate()
try:
self.current_process.wait(timeout=5)
except subprocess.TimeoutExpired:
self.current_process.kill()
self.current_process.wait()
self.current_process = None
cmd = [
"ffmpeg",
"-re",
@ -300,44 +425,155 @@ class StreamService:
self.rtmp_url,
]
process = subprocess.Popen(cmd)
process.wait()
self.current_process = subprocess.Popen(cmd)
start_time = time.time()
print(f"Finished streaming {song_info['name']}")
while self.current_process.poll() is None:
# Check command queue every second
try:
cmd = self.command_queue.get_nowait()
if cmd["action"] in ["next", "play"]:
print("Received command during playback, stopping current song...")
self.current_process.terminate()
try:
self.current_process.wait(timeout=5)
except subprocess.TimeoutExpired:
self.current_process.kill()
self.current_process.wait()
self.current_process = None
return False # Signal that we were interrupted
except queue.Empty:
pass
time.sleep(1)
elapsed = time.time() - start_time
if elapsed >= length:
break
if self.current_process:
self.current_process.terminate()
try:
self.current_process.wait(timeout=5)
except subprocess.TimeoutExpired:
self.current_process.kill()
self.current_process.wait()
self.current_process = None
return True # Signal normal completion
def stream(self):
while True:
slugs = self.get_song_slugs()
if not slugs:
time.sleep(10)
continue
random.shuffle(slugs)
for slug in slugs:
try:
# Check for commands
try:
song_info = self.get_song_info(slug)
if not song_info:
cmd = self.command_queue.get_nowait()
if cmd["action"] == "next":
print("Skipping to next song...")
if self.current_process:
print("Stopping current song...")
self.current_process.terminate()
self.current_process.wait()
self.current_process = None
continue
elif cmd["action"] == "play":
song_info = self.get_song_info(cmd["slug"])
if song_info:
if song_info.get("meta", {}).get("genre", "").lower() in [
"rusrap"
]:
print(
f"Skipping {song_info['name']} because of genre 🤮"
)
continue
print(f"Playing requested song: {song_info['name']}")
self.create_overlay(song_info)
self.stream_song(song_info)
continue
except queue.Empty:
pass
print(f"Starting stream for {song_info['name']}")
self.create_overlay(song_info)
self.stream_song(song_info)
print(f"Finished streaming {song_info['name']}")
slugs = self.get_song_slugs()
if not slugs:
time.sleep(10)
continue
except Exception as e:
print(f"Error processing song {slug}: {e}")
time.sleep(5)
random.shuffle(slugs)
for slug in slugs:
try:
# Check for commands again
if not self.command_queue.empty():
break
song_info = self.get_song_info(slug)
if not song_info:
continue
if song_info.get("meta", {}).get("genre", "").lower() in [
"rusrap"
]:
print(f"Skipping {song_info['name']} because of genre 🤮")
continue
print(f"Starting stream for {song_info['name']}")
self.create_overlay(song_info)
if not self.stream_song(song_info): # If interrupted
break
print(f"Finished streaming {song_info['name']}")
except Exception as e:
print(f"Error processing song {slug}: {e}")
if self.current_process:
self.current_process.terminate()
self.current_process.wait()
self.current_process = None
time.sleep(5)
except Exception as e:
print(f"Stream error: {e}")
if self.current_process:
self.current_process.terminate()
self.current_process.wait()
self.current_process = None
time.sleep(5)
def cleanup(self):
if self.current_process:
print("Cleaning up stream process...")
self.current_process.terminate()
try:
self.current_process.wait(timeout=5)
except subprocess.TimeoutExpired:
self.current_process.kill()
self.current_process.wait()
self.current_process = None
if __name__ == "__main__":
API_HOST = os.getenv("API_HOST", "https://new.akarpov.ru")
WEB_HOST = os.getenv("WEB_HOST", "https://next.akarpov.ru")
RTMP_URL = os.getenv("RTMP_URL", None)
RTMP_URL = os.getenv("RTMP_URL")
TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN")
ADMIN_IDS = os.getenv("ADMIN_IDS")
if not RTMP_URL:
print("RTMP_URL environment variable is not set")
if not all([RTMP_URL, TELEGRAM_TOKEN, ADMIN_IDS]):
print("Missing required environment variables")
exit(1)
# Initialize services
service = StreamService(API_HOST, RTMP_URL, WEB_HOST)
service.stream()
bot = StreamBot(API_HOST)
bot.queue = service.command_queue
# Run stream service in separate thread
service_thread = threading.Thread(target=run_stream_service, args=(service,))
service_thread.daemon = True
service_thread.start()
# Run bot in main thread
try:
bot.run()
except KeyboardInterrupt:
print("\nStopping the bot...")
if service:
service.cleanup()