added castbox script, moved yandex

This commit is contained in:
Alexander Karpov 2023-06-13 11:08:34 +03:00
parent dd738006a5
commit 9800d8d345
6 changed files with 188 additions and 107 deletions

View File

@ -0,0 +1,84 @@
import requests
import json
import os
from urllib.parse import unquote
from pydub import AudioSegment
from mutagen.easyid3 import EasyID3
from mutagen.mp3 import MP3
from mutagen.id3 import APIC, ID3
url = input("https://castbox.fm/channel/: ")
if not url.startswith("https://castbox.fm/channel/"):
url = "https://castbox.fm/channel/" + url
def download_file(file_url):
local_filename = file_url.split("/")[-1]
with requests.get(file_url, stream=True) as r:
r.raise_for_status()
with open(local_filename, "wb") as f:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)
return local_filename
r = requests.get(url)
if r.status_code != 200:
raise LookupError("Site not found")
inner_data = r.text.splitlines()
data = []
for line in inner_data:
if "window.__INITIAL_STATE__" in line:
data.append(line)
if len(data) != 1:
raise ValueError("Payload not found")
d = json.loads(unquote(data[0].split('"')[1::2][0])) # type: dict
title = d["ch"]["chInfo"]["title"]
main_image = d["ch"]["chInfo"]["cover_web"]
author = d["ch"]["chInfo"]["author"]
print("Downloading podcast " + title)
episodes = d["ch"]["eps"]
if not os.path.isdir(title):
os.mkdir(title)
for i, episode in enumerate(episodes):
print(f"Downloading: {episode['title']}", end="\r")
if "url" in episode and episode["url"]:
ep_url = episode["url"]
else:
ep_url = episode["urls"][0]
orig_path = download_file(ep_url)
n_path = title + "/" + f"{title}.mp3"
AudioSegment.from_file(orig_path).export(n_path)
os.remove(orig_path)
if "cover_url" not in episode or not episode["cover_url"]:
img_path = download_file(main_image)
else:
img_path = download_file(episode["cover_url"])
if "author" in episode and episode["author"]:
ep_author = episode["author"]
else:
ep_author = author
tag = MP3(n_path, ID3=ID3)
tag.tags.add(
APIC(
encoding=3,
mime="image/png",
type=3,
desc="Cover",
data=open(img_path, "rb").read(),
)
)
tag.save()
tag = EasyID3(n_path)
tag["title"] = episode["title"]
tag["album"] = title
tag["artist"] = ep_author
tag.save()
os.remove(img_path)

View File

@ -1,107 +0,0 @@
import asyncio
import os
import daemon
from io import BytesIO
from time import sleep
from aiogram import Bot
from aiogram.bot.api import TelegramAPIServer
from mutagen.easyid3 import EasyID3
from mutagen.mp3 import MP3
from mutagen.id3 import APIC, ID3, TORY
from pydub import AudioSegment
from yandex_music import Client, Track
from dotenv import load_dotenv
load_dotenv(dotenv_path=".env")
YANDEX_TOKEN = os.getenv("YANDEX_TOKEN")
CHAT_ID = os.getenv("CHAT_ID")
TOKEN = os.getenv("BOT_TOKEN")
TELEGRAM_SERVER = os.getenv("TELEGRAM_SERVER", default=None)
if TELEGRAM_SERVER:
local_server = TelegramAPIServer.from_base(TELEGRAM_SERVER)
bot = Bot(TOKEN, server=local_server)
else:
bot = Bot(TOKEN)
client = Client(YANDEX_TOKEN).init()
latest_podcast = None
latest_sent = True
podcasts_listened = []
with daemon.DaemonContext():
while True:
try:
queues = client.queues_list()
last_queue = client.queue(queues[0].id)
last_track_id = last_queue.get_current_track()
last_track: Track = last_track_id.fetch_track()
if "podcast" in last_track.type:
if last_track_id not in podcasts_listened:
if last_track_id == latest_podcast and not latest_sent:
latest_sent = True
podcasts_listened.append(last_track_id)
title = last_track.title
album = last_track.albums[0]
url = f"https://music.yandex.ru/track/{last_track.id}"
desc = last_track.short_description.split("\n")[0]
last_track.download_cover(filename="cover.png")
img_path = os.path.abspath("cover.png")
last_track.download(filename="file", codec="mp3")
orig_path = os.path.abspath("file")
path = os.path.abspath("file.mp3")
AudioSegment.from_file(orig_path).export(path)
os.remove(orig_path)
# set music meta
tag = MP3(path, ID3=ID3)
tag.tags.add(
APIC(
encoding=3, # 3 is for utf-8
mime="image/png", # image/jpeg or image/png
type=3, # 3 is for the cover image
desc="Cover",
data=open(img_path, "rb").read(),
)
)
tag.tags.add(TORY(text=str(album.year)))
tag.save()
tag = EasyID3(path)
tag["title"] = title
tag["album"] = album.title
tag.save()
with open(path, "rb") as tmp:
obj = BytesIO(tmp.read())
obj.name = f"{title}.mp3"
loop = asyncio.get_event_loop()
coroutine = bot.send_audio(
chat_id=CHAT_ID,
audio=obj,
caption=f"{title} - {album.title}\n{desc}\n\n{url}",
title=title,
performer=album.title,
)
loop.run_until_complete(coroutine)
else:
latest_podcast = last_track_id
latest_sent = False
except BaseException as e:
loop = asyncio.get_event_loop()
coroutine = bot.send_message(CHAT_ID, text=str(e))
loop.run_until_complete(coroutine)
sleep(5 * 60)

104
podcasts/yandex/podcasts.py Normal file
View File

@ -0,0 +1,104 @@
import asyncio
import os
from io import BytesIO
from time import sleep
from aiogram import Bot
from aiogram.bot.api import TelegramAPIServer
from mutagen.easyid3 import EasyID3
from mutagen.mp3 import MP3
from mutagen.id3 import APIC, ID3, TORY
from pydub import AudioSegment
from yandex_music import Client, Track
from dotenv import load_dotenv
load_dotenv(dotenv_path=".env")
YANDEX_TOKEN = os.getenv("YANDEX_TOKEN")
CHAT_ID = os.getenv("CHAT_ID")
TOKEN = os.getenv("BOT_TOKEN")
TELEGRAM_SERVER = os.getenv("TELEGRAM_SERVER", default=None)
if TELEGRAM_SERVER:
local_server = TelegramAPIServer.from_base(TELEGRAM_SERVER)
bot = Bot(TOKEN, server=local_server)
else:
bot = Bot(TOKEN)
client = Client(YANDEX_TOKEN).init()
latest_podcast = None
latest_sent = True
podcasts_listened = []
while True:
try:
queues = client.queues_list()
last_queue = client.queue(queues[0].id)
last_track_id = last_queue.get_current_track()
last_track: Track = last_track_id.fetch_track()
if "podcast" in last_track.type:
if last_track_id not in podcasts_listened:
if last_track_id == latest_podcast and not latest_sent:
latest_sent = True
podcasts_listened.append(last_track_id)
title = last_track.title
album = last_track.albums[0]
url = f"https://music.yandex.ru/track/{last_track.id}"
desc = last_track.short_description.split("\n")[0]
last_track.download_cover(filename="cover.png")
img_path = os.path.abspath("cover.png")
last_track.download(filename="file", codec="mp3")
orig_path = os.path.abspath("file")
path = os.path.abspath("file.mp3")
AudioSegment.from_file(orig_path).export(path)
os.remove(orig_path)
# set music meta
tag = MP3(path, ID3=ID3)
tag.tags.add(
APIC(
encoding=3, # 3 is for utf-8
mime="image/png", # image/jpeg or image/png
type=3, # 3 is for the cover image
desc="Cover",
data=open(img_path, "rb").read(),
)
)
tag.tags.add(TORY(text=str(album.year)))
tag.save()
tag = EasyID3(path)
tag["title"] = title
tag["album"] = album.title
tag.save()
with open(path, "rb") as tmp:
obj = BytesIO(tmp.read())
obj.name = f"{title}.mp3"
loop = asyncio.get_event_loop()
coroutine = bot.send_audio(
chat_id=CHAT_ID,
audio=obj,
caption=f"{title} - {album.title}\n{desc}\n\n{url}",
title=title,
performer=album.title,
)
loop.run_until_complete(coroutine)
else:
latest_podcast = last_track_id
latest_sent = False
except BaseException as e:
loop = asyncio.get_event_loop()
coroutine = bot.send_message(CHAT_ID, text=str(e))
loop.run_until_complete(coroutine)
sleep(5 * 60)