mirror of
https://github.com/Alexander-D-Karpov/akarpov
synced 2024-11-22 06:16:34 +03:00
fixed track processing, youtube handling
This commit is contained in:
parent
9c32235926
commit
b76a40aa02
|
@ -13,7 +13,7 @@ def create_cropped_model_image(sender, instance, created, **kwargs):
|
||||||
"app_label": model._meta.app_label,
|
"app_label": model._meta.app_label,
|
||||||
"model_name": model._meta.model_name,
|
"model_name": model._meta.model_name,
|
||||||
},
|
},
|
||||||
countdown=2,
|
countdown=5,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -5,7 +5,7 @@
|
||||||
from akarpov.utils.files import crop_image
|
from akarpov.utils.files import crop_image
|
||||||
|
|
||||||
|
|
||||||
@shared_task()
|
@shared_task(max_retries=3)
|
||||||
def crop_model_image(pk: int, app_label: str, model_name: str):
|
def crop_model_image(pk: int, app_label: str, model_name: str):
|
||||||
model = apps.get_model(app_label=app_label, model_name=model_name)
|
model = apps.get_model(app_label=app_label, model_name=model_name)
|
||||||
instance = model.objects.get(pk=pk)
|
instance = model.objects.get(pk=pk)
|
||||||
|
|
|
@ -1,7 +1,9 @@
|
||||||
import os
|
import os
|
||||||
|
import re
|
||||||
|
|
||||||
from deep_translator import GoogleTranslator
|
from deep_translator import GoogleTranslator
|
||||||
from django.core.files import File
|
from django.core.files import File
|
||||||
|
from django.db import transaction
|
||||||
from django.utils.text import slugify
|
from django.utils.text import slugify
|
||||||
from mutagen import File as MutagenFile
|
from mutagen import File as MutagenFile
|
||||||
from mutagen.id3 import APIC, ID3, TCON, TORY, TextFrame
|
from mutagen.id3 import APIC, ID3, TCON, TORY, TextFrame
|
||||||
|
@ -10,9 +12,24 @@
|
||||||
from pydub import AudioSegment
|
from pydub import AudioSegment
|
||||||
|
|
||||||
from akarpov.music.models import Album, Author, Song
|
from akarpov.music.models import Album, Author, Song
|
||||||
from akarpov.music.services.info import search_all_platforms
|
from akarpov.music.services.info import generate_readable_slug, search_all_platforms
|
||||||
from akarpov.users.models import User
|
from akarpov.users.models import User
|
||||||
from akarpov.utils.generators import generate_charset
|
|
||||||
|
|
||||||
|
def process_track_name(track_name: str) -> str:
|
||||||
|
# Split the track name by dash and parentheses
|
||||||
|
parts = track_name.split(" - ")
|
||||||
|
processed_parts = []
|
||||||
|
|
||||||
|
for part in parts:
|
||||||
|
if "feat" in part:
|
||||||
|
continue
|
||||||
|
if "(" in part:
|
||||||
|
part = part.split("(")[0].strip()
|
||||||
|
processed_parts.append(part)
|
||||||
|
|
||||||
|
processed_track_name = " - ".join(processed_parts)
|
||||||
|
return processed_track_name
|
||||||
|
|
||||||
|
|
||||||
def load_track(
|
def load_track(
|
||||||
|
@ -25,14 +42,31 @@ def load_track(
|
||||||
link: str | None = None,
|
link: str | None = None,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
) -> Song:
|
) -> Song:
|
||||||
p_name = path.split("/")[-1]
|
p_name = process_track_name(
|
||||||
query = f"{name if name else p_name} - {album if album else ''} - {', '.join(authors) if authors else ''}"
|
" ".join(path.split("/")[-1].split(".")[0].strip().split())
|
||||||
|
)
|
||||||
|
query = (
|
||||||
|
f"{process_track_name(name) if name else p_name} "
|
||||||
|
f"- {album if album else ''} - {', '.join(authors) if authors else ''}"
|
||||||
|
)
|
||||||
search_info = search_all_platforms(query)
|
search_info = search_all_platforms(query)
|
||||||
|
orig_name = name if name else p_name
|
||||||
|
|
||||||
if image_path and search_info.get("album_image", None):
|
if image_path and search_info.get("album_image", None):
|
||||||
os.remove(search_info["album_image"])
|
os.remove(search_info["album_image"])
|
||||||
|
if "title" in search_info:
|
||||||
|
title = re.sub(r"\W+", "", search_info["title"]).lower()
|
||||||
|
name_clean = re.sub(r"\W+", "", name).lower()
|
||||||
|
|
||||||
|
# Check if title is in name
|
||||||
|
if title in name_clean:
|
||||||
|
name = search_info["title"]
|
||||||
|
else:
|
||||||
|
name = process_track_name(" ".join(p_name.strip().split("-")))
|
||||||
|
|
||||||
|
if not name:
|
||||||
|
name = orig_name
|
||||||
|
|
||||||
name = name or search_info.get("title", p_name)
|
|
||||||
album = album or search_info.get("album_name", None)
|
album = album or search_info.get("album_name", None)
|
||||||
authors = authors or search_info.get("artists", [])
|
authors = authors or search_info.get("artists", [])
|
||||||
genre = kwargs.get("genre") or search_info.get("genre", None)
|
genre = kwargs.get("genre") or search_info.get("genre", None)
|
||||||
|
@ -47,12 +81,21 @@ def load_track(
|
||||||
re_authors = []
|
re_authors = []
|
||||||
if authors:
|
if authors:
|
||||||
for x in authors:
|
for x in authors:
|
||||||
try:
|
while True:
|
||||||
re_authors.append(Author.objects.get(name=x))
|
try:
|
||||||
except Author.DoesNotExist:
|
with transaction.atomic():
|
||||||
re_authors.append(Author.objects.create(name=x))
|
author, created = Author.objects.get_or_create(
|
||||||
|
name__iexact=x, defaults={"name": x}
|
||||||
|
)
|
||||||
|
re_authors.append(author)
|
||||||
|
break
|
||||||
|
except Author.MultipleObjectsReturned:
|
||||||
|
# If multiple authors are found, delete all but one
|
||||||
|
Author.objects.filter(name__iexact=x).exclude(
|
||||||
|
id=Author.objects.filter(name__iexact=x).first().id
|
||||||
|
).delete()
|
||||||
authors = re_authors
|
authors = re_authors
|
||||||
album_name = None
|
|
||||||
if album:
|
if album:
|
||||||
if type(album) is str:
|
if type(album) is str:
|
||||||
album_name = album
|
album_name = album
|
||||||
|
@ -61,12 +104,9 @@ def load_track(
|
||||||
else:
|
else:
|
||||||
album_name = None
|
album_name = None
|
||||||
if album_name:
|
if album_name:
|
||||||
try:
|
album, created = Album.objects.get_or_create(
|
||||||
album = Album.objects.get(name=album_name)
|
name__iexact=album_name, defaults={"name": album_name}
|
||||||
except Album.DoesNotExist:
|
)
|
||||||
album = Album.objects.create(name=album_name)
|
|
||||||
if not album_name:
|
|
||||||
album = None
|
|
||||||
|
|
||||||
if sng := Song.objects.filter(
|
if sng := Song.objects.filter(
|
||||||
name=name if name else p_name,
|
name=name if name else p_name,
|
||||||
|
@ -173,19 +213,7 @@ def load_track(
|
||||||
if os.path.exists(image_path):
|
if os.path.exists(image_path):
|
||||||
os.remove(image_path)
|
os.remove(image_path)
|
||||||
|
|
||||||
if generated_name and not Song.objects.filter(slug=generated_name).exists():
|
song.slug = generate_readable_slug(song.name, Song)
|
||||||
if len(generated_name) > 20:
|
song.save()
|
||||||
generated_name = generated_name.split("-")[0]
|
|
||||||
if len(generated_name) > 20:
|
|
||||||
generated_name = generated_name[:20]
|
|
||||||
if not Song.objects.filter(slug=generated_name).exists():
|
|
||||||
song.slug = generated_name
|
|
||||||
song.save()
|
|
||||||
else:
|
|
||||||
song.slug = generated_name[:14] + "_" + generate_charset(5)
|
|
||||||
song.save()
|
|
||||||
else:
|
|
||||||
song.slug = generated_name
|
|
||||||
song.save()
|
|
||||||
|
|
||||||
return song
|
return song
|
||||||
|
|
|
@ -16,6 +16,34 @@
|
||||||
from akarpov.utils.text import is_similar_artist, normalize_text
|
from akarpov.utils.text import is_similar_artist, normalize_text
|
||||||
|
|
||||||
|
|
||||||
|
def generate_readable_slug(name: str, model) -> str:
|
||||||
|
# Translate and slugify the name
|
||||||
|
slug = str(
|
||||||
|
slugify(
|
||||||
|
GoogleTranslator(source="auto", target="en").translate(
|
||||||
|
name,
|
||||||
|
target_language="en",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
if len(slug) > 20:
|
||||||
|
slug = slug[:20]
|
||||||
|
last_dash = slug.rfind("-")
|
||||||
|
if last_dash != -1:
|
||||||
|
slug = slug[:last_dash]
|
||||||
|
|
||||||
|
while model.objects.filter(slug=slug).exists():
|
||||||
|
if len(slug) > 14:
|
||||||
|
slug = slug[:14]
|
||||||
|
last_dash = slug.rfind("-")
|
||||||
|
if last_dash != -1:
|
||||||
|
slug = slug[:last_dash]
|
||||||
|
slug = slug + "_" + generate_charset(5)
|
||||||
|
|
||||||
|
return slug
|
||||||
|
|
||||||
|
|
||||||
def create_spotify_session() -> spotipy.Spotify:
|
def create_spotify_session() -> spotipy.Spotify:
|
||||||
if not settings.MUSIC_SPOTIFY_ID or not settings.MUSIC_SPOTIFY_SECRET:
|
if not settings.MUSIC_SPOTIFY_ID or not settings.MUSIC_SPOTIFY_SECRET:
|
||||||
raise ConnectionError("No spotify credentials provided")
|
raise ConnectionError("No spotify credentials provided")
|
||||||
|
@ -197,15 +225,6 @@ def update_album_info(album: AlbumModel, author_name: str = None) -> None:
|
||||||
|
|
||||||
# Combine and prioritize Spotify data
|
# Combine and prioritize Spotify data
|
||||||
album_data = {}
|
album_data = {}
|
||||||
if yandex_album_info:
|
|
||||||
album_data.update(
|
|
||||||
{
|
|
||||||
"name": album_data.get("name", yandex_album_info.title),
|
|
||||||
"genre": album_data.get("genre", yandex_album_info.genre),
|
|
||||||
"description": yandex_album_info.description,
|
|
||||||
"type": yandex_album_info.type,
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
if spotify_album_info:
|
if spotify_album_info:
|
||||||
album_data = {
|
album_data = {
|
||||||
|
@ -215,6 +234,15 @@ def update_album_info(album: AlbumModel, author_name: str = None) -> None:
|
||||||
"link": spotify_album_info["external_urls"]["spotify"],
|
"link": spotify_album_info["external_urls"]["spotify"],
|
||||||
"genre": spotify_album_info.get("genres", []),
|
"genre": spotify_album_info.get("genres", []),
|
||||||
}
|
}
|
||||||
|
if yandex_album_info:
|
||||||
|
album_data.update(
|
||||||
|
{
|
||||||
|
"name": album_data.get("name", yandex_album_info.title),
|
||||||
|
"genre": album_data.get("genre", yandex_album_info.genre),
|
||||||
|
"description": yandex_album_info.description,
|
||||||
|
"type": yandex_album_info.type,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
album.meta = album_data
|
album.meta = album_data
|
||||||
album.save()
|
album.save()
|
||||||
|
@ -262,20 +290,8 @@ def update_album_info(album: AlbumModel, author_name: str = None) -> None:
|
||||||
album_authors.append(author)
|
album_authors.append(author)
|
||||||
album.authors.set(album_authors)
|
album.authors.set(album_authors)
|
||||||
|
|
||||||
if generated_name and not AlbumModel.objects.filter(slug=generated_name).exists():
|
album.slug = generate_readable_slug(album.name, AlbumModel)
|
||||||
if len(generated_name) > 20:
|
album.save()
|
||||||
generated_name = generated_name.split("-")[0]
|
|
||||||
if len(generated_name) > 20:
|
|
||||||
generated_name = generated_name[:20]
|
|
||||||
if not AlbumModel.objects.filter(slug=generated_name).exists():
|
|
||||||
album.slug = generated_name
|
|
||||||
album.save()
|
|
||||||
else:
|
|
||||||
album.slug = generated_name[:14] + "_" + generate_charset(5)
|
|
||||||
album.save()
|
|
||||||
else:
|
|
||||||
album.slug = generated_name
|
|
||||||
album.save()
|
|
||||||
|
|
||||||
|
|
||||||
def update_author_info(author: Author) -> None:
|
def update_author_info(author: Author) -> None:
|
||||||
|
@ -288,6 +304,13 @@ def update_author_info(author: Author) -> None:
|
||||||
|
|
||||||
# Combine and prioritize Spotify data
|
# Combine and prioritize Spotify data
|
||||||
author_data = {}
|
author_data = {}
|
||||||
|
if spotify_artist_info:
|
||||||
|
author_data = {
|
||||||
|
"name": spotify_artist_info.get("name", author.name),
|
||||||
|
"genres": spotify_artist_info.get("genres", []),
|
||||||
|
"popularity": spotify_artist_info.get("popularity", 0),
|
||||||
|
"link": spotify_artist_info["external_urls"]["spotify"],
|
||||||
|
}
|
||||||
if yandex_artist_info:
|
if yandex_artist_info:
|
||||||
author_data.update(
|
author_data.update(
|
||||||
{
|
{
|
||||||
|
@ -297,14 +320,6 @@ def update_author_info(author: Author) -> None:
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
if spotify_artist_info:
|
|
||||||
author_data = {
|
|
||||||
"name": spotify_artist_info.get("name", author.name),
|
|
||||||
"genres": spotify_artist_info.get("genres", []),
|
|
||||||
"popularity": spotify_artist_info.get("popularity", 0),
|
|
||||||
"link": spotify_artist_info["external_urls"]["spotify"],
|
|
||||||
}
|
|
||||||
|
|
||||||
author.meta = author_data
|
author.meta = author_data
|
||||||
author.save()
|
author.save()
|
||||||
|
|
||||||
|
@ -337,20 +352,8 @@ def update_author_info(author: Author) -> None:
|
||||||
os.remove(image_path)
|
os.remove(image_path)
|
||||||
author.save()
|
author.save()
|
||||||
|
|
||||||
if generated_name and not Author.objects.filter(slug=generated_name).exists():
|
author.slug = generate_readable_slug(author.name, Author)
|
||||||
if len(generated_name) > 20:
|
author.save()
|
||||||
generated_name = generated_name.split("-")[0]
|
|
||||||
if len(generated_name) > 20:
|
|
||||||
generated_name = generated_name[:20]
|
|
||||||
if not Author.objects.filter(slug=generated_name).exists():
|
|
||||||
author.slug = generated_name
|
|
||||||
author.save()
|
|
||||||
else:
|
|
||||||
author.slug = generated_name[:14] + "_" + generate_charset(5)
|
|
||||||
author.save()
|
|
||||||
else:
|
|
||||||
author.slug = generated_name
|
|
||||||
author.save()
|
|
||||||
|
|
||||||
|
|
||||||
def search_all_platforms(track_name: str) -> dict:
|
def search_all_platforms(track_name: str) -> dict:
|
||||||
|
@ -373,7 +376,6 @@ def search_all_platforms(track_name: str) -> dict:
|
||||||
for existing_artist in combined_artists
|
for existing_artist in combined_artists
|
||||||
):
|
):
|
||||||
combined_artists.add(normalized_artist)
|
combined_artists.add(normalized_artist)
|
||||||
|
|
||||||
genre = spotify_info.get("genre") or yandex_info.get("genre")
|
genre = spotify_info.get("genre") or yandex_info.get("genre")
|
||||||
if type(genre) is list:
|
if type(genre) is list:
|
||||||
genre = sorted(genre, key=lambda x: len(x))
|
genre = sorted(genre, key=lambda x: len(x))
|
||||||
|
|
|
@ -6,6 +6,7 @@
|
||||||
import requests
|
import requests
|
||||||
import yt_dlp
|
import yt_dlp
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
|
from django.utils.text import slugify
|
||||||
from PIL import Image
|
from PIL import Image
|
||||||
from pydub import AudioSegment
|
from pydub import AudioSegment
|
||||||
from pytube import Search, YouTube
|
from pytube import Search, YouTube
|
||||||
|
@ -75,9 +76,15 @@ def download_from_youtube_link(link: str, user_id: int) -> Song:
|
||||||
|
|
||||||
# convert to mp3
|
# convert to mp3
|
||||||
print(f"[processing] {title} converting to mp3")
|
print(f"[processing] {title} converting to mp3")
|
||||||
path = orig_path.replace(orig_path.split(".")[-1], "mp3")
|
path = (
|
||||||
|
"/".join(orig_path.split("/")[:-1])
|
||||||
|
+ "/"
|
||||||
|
+ slugify(orig_path.split("/")[-1].split(".")[0])
|
||||||
|
+ ".mp3"
|
||||||
|
)
|
||||||
AudioSegment.from_file(orig_path).export(path)
|
AudioSegment.from_file(orig_path).export(path)
|
||||||
os.remove(orig_path)
|
if orig_path != path:
|
||||||
|
os.remove(orig_path)
|
||||||
print(f"[processing] {title} converting to mp3: done")
|
print(f"[processing] {title} converting to mp3: done")
|
||||||
|
|
||||||
# split in chapters
|
# split in chapters
|
||||||
|
@ -175,7 +182,8 @@ def download_from_youtube_link(link: str, user_id: int) -> Song:
|
||||||
info["album_name"],
|
info["album_name"],
|
||||||
title,
|
title,
|
||||||
)
|
)
|
||||||
os.remove(path)
|
if os.path.exists(path):
|
||||||
|
os.remove(path)
|
||||||
|
|
||||||
return song
|
return song
|
||||||
|
|
||||||
|
|
|
@ -17,7 +17,7 @@ def auto_delete_file_on_delete(sender, instance, **kwargs):
|
||||||
|
|
||||||
@receiver(post_save, sender=Song)
|
@receiver(post_save, sender=Song)
|
||||||
def song_create(sender, instance: Song, created, **kwargs):
|
def song_create(sender, instance: Song, created, **kwargs):
|
||||||
if instance.volume is None:
|
if instance.volume is None and instance.file:
|
||||||
set_song_volume(instance)
|
set_song_volume(instance)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -87,8 +87,6 @@ def start_next_song(previous_ids: list):
|
||||||
async_to_sync(channel_layer.group_send)(
|
async_to_sync(channel_layer.group_send)(
|
||||||
"radio_main", {"type": "song", "data": data}
|
"radio_main", {"type": "song", "data": data}
|
||||||
)
|
)
|
||||||
song.played += 1
|
|
||||||
song.save(update_fields=["played"])
|
|
||||||
if RadioSong.objects.filter(slug="").exists():
|
if RadioSong.objects.filter(slug="").exists():
|
||||||
r = RadioSong.objects.get(slug="")
|
r = RadioSong.objects.get(slug="")
|
||||||
r.song = song
|
r.song = song
|
||||||
|
|
Loading…
Reference in New Issue
Block a user