mirror of
https://github.com/Alexander-D-Karpov/akarpov
synced 2025-07-28 22:39:46 +03:00
Compare commits
17 Commits
2f68af0b07
...
65ec88a8bf
Author | SHA1 | Date | |
---|---|---|---|
|
65ec88a8bf | ||
03c05d7adf | |||
0b439f43a5 | |||
6c15494aab | |||
bf182dbd0a | |||
f268212094 | |||
f4ca7db696 | |||
600c903a68 | |||
c3de9b45b8 | |||
66bd074149 | |||
b72ebe6e8c | |||
85e8e3fe8b | |||
2a7f1eae88 | |||
398820489f | |||
a2607cd0d0 | |||
1b9b1d2bb5 | |||
|
7d359c56dc |
|
@ -88,8 +88,12 @@ def get_liked(self, obj):
|
|||
|
||||
@extend_schema_field(ListAlbumSerializer)
|
||||
def get_album(self, obj):
|
||||
if obj.album:
|
||||
return ListAlbumSerializer(Album.objects.cache().get(id=obj.album_id)).data
|
||||
if obj.album_id:
|
||||
try:
|
||||
album = Album.objects.cache().get(id=obj.album_id)
|
||||
return ListAlbumSerializer(album).data
|
||||
except Album.DoesNotExist:
|
||||
return None
|
||||
return None
|
||||
|
||||
@extend_schema_field(ListAuthorSerializer(many=True))
|
||||
|
@ -105,16 +109,17 @@ def get_image(self, obj):
|
|||
img = None
|
||||
if obj.image_cropped:
|
||||
img = obj.image_cropped
|
||||
else:
|
||||
album = Album.objects.cache().get(id=obj.album_id)
|
||||
if album.image_cropped:
|
||||
img = album.image_cropped
|
||||
else:
|
||||
authors = Author.objects.cache().filter(
|
||||
Q(songs__id=obj.id) & ~Q(image="")
|
||||
)
|
||||
if authors:
|
||||
img = authors.first().image_cropped
|
||||
elif obj.album_id:
|
||||
try:
|
||||
album = Album.objects.cache().get(id=obj.album_id)
|
||||
if album.image_cropped:
|
||||
img = album.image_cropped
|
||||
except Album.DoesNotExist:
|
||||
pass
|
||||
if not img:
|
||||
authors = Author.objects.cache().filter(Q(songs__id=obj.id) & ~Q(image=""))
|
||||
if authors.exists():
|
||||
img = authors.first().image_cropped
|
||||
if img:
|
||||
return self.context["request"].build_absolute_uri(img.url)
|
||||
return None
|
||||
|
|
|
@ -11,7 +11,7 @@ class SongDocument(Document):
|
|||
properties={
|
||||
"name": fields.TextField(
|
||||
fields={
|
||||
"raw": fields.KeywordField(normalizer="lowercase"),
|
||||
"raw": fields.KeywordField(normalizer="lowercase_normalizer"),
|
||||
},
|
||||
),
|
||||
"name_transliterated": fields.TextField(
|
||||
|
@ -21,7 +21,13 @@ class SongDocument(Document):
|
|||
},
|
||||
),
|
||||
"link": fields.TextField(),
|
||||
"meta": fields.ObjectField(dynamic=True),
|
||||
"meta": fields.ObjectField(
|
||||
dynamic=True,
|
||||
properties={
|
||||
"genre": fields.TextField(),
|
||||
"release_year": fields.KeywordField(),
|
||||
},
|
||||
),
|
||||
},
|
||||
)
|
||||
|
||||
|
@ -30,7 +36,7 @@ class SongDocument(Document):
|
|||
properties={
|
||||
"name": fields.TextField(
|
||||
fields={
|
||||
"raw": fields.KeywordField(normalizer="lowercase"),
|
||||
"raw": fields.KeywordField(normalizer="lowercase_normalizer"),
|
||||
},
|
||||
),
|
||||
"name_transliterated": fields.TextField(
|
||||
|
@ -40,7 +46,13 @@ class SongDocument(Document):
|
|||
},
|
||||
),
|
||||
"link": fields.TextField(),
|
||||
"meta": fields.ObjectField(dynamic=True),
|
||||
"meta": fields.ObjectField(
|
||||
dynamic=True,
|
||||
properties={
|
||||
"genre": fields.TextField(),
|
||||
"release_year": fields.KeywordField(),
|
||||
},
|
||||
),
|
||||
},
|
||||
)
|
||||
|
||||
|
@ -48,7 +60,7 @@ class SongDocument(Document):
|
|||
attr="name",
|
||||
fields={
|
||||
"raw": fields.KeywordField(),
|
||||
"exact": fields.KeywordField(normalizer="lowercase"),
|
||||
"exact": fields.KeywordField(normalizer="lowercase_normalizer"),
|
||||
},
|
||||
)
|
||||
name_transliterated = fields.TextField(
|
||||
|
@ -60,7 +72,13 @@ class SongDocument(Document):
|
|||
)
|
||||
suggest = fields.CompletionField()
|
||||
|
||||
meta = fields.ObjectField(dynamic=True)
|
||||
meta = fields.ObjectField(
|
||||
dynamic=True,
|
||||
properties={
|
||||
"genre": fields.TextField(),
|
||||
"release_year": fields.KeywordField(),
|
||||
},
|
||||
)
|
||||
|
||||
class Index:
|
||||
name = "songs"
|
||||
|
@ -68,6 +86,13 @@ class Index:
|
|||
"number_of_shards": 1,
|
||||
"number_of_replicas": 0,
|
||||
"analysis": {
|
||||
"normalizer": {
|
||||
"lowercase_normalizer": {
|
||||
"type": "custom",
|
||||
"char_filter": [],
|
||||
"filter": ["lowercase"],
|
||||
}
|
||||
},
|
||||
"filter": {
|
||||
"my_transliterator": {
|
||||
"type": "icu_transform",
|
||||
|
@ -144,8 +169,8 @@ class Index:
|
|||
"filter": [
|
||||
"lowercase",
|
||||
"autocomplete_filter",
|
||||
"english_stemmer", # Apply English stemming for autocomplete
|
||||
"russian_stemmer", # Include Russian stemming if applicable
|
||||
"english_stemmer",
|
||||
"russian_stemmer",
|
||||
],
|
||||
},
|
||||
"search_synonym_with_stemming": {
|
||||
|
@ -154,8 +179,8 @@ class Index:
|
|||
"filter": [
|
||||
"lowercase",
|
||||
"synonym_filter",
|
||||
"english_stemmer", # Apply English stemming for synonym search
|
||||
"russian_stemmer", # Include Russian stemming if processing Russian synonyms
|
||||
"english_stemmer",
|
||||
"russian_stemmer",
|
||||
],
|
||||
},
|
||||
},
|
||||
|
@ -176,7 +201,7 @@ class AuthorDocument(Document):
|
|||
name = fields.TextField(
|
||||
fields={
|
||||
"raw": fields.KeywordField(),
|
||||
"exact": fields.KeywordField(normalizer="lowercase"),
|
||||
"exact": fields.KeywordField(normalizer="lowercase_normalizer"),
|
||||
},
|
||||
)
|
||||
name_transliterated = fields.TextField(
|
||||
|
@ -187,11 +212,18 @@ class AuthorDocument(Document):
|
|||
},
|
||||
)
|
||||
suggest = fields.CompletionField()
|
||||
meta = fields.ObjectField(dynamic=True)
|
||||
meta = fields.ObjectField(
|
||||
dynamic=True,
|
||||
properties={
|
||||
"description": fields.TextField(),
|
||||
# Ensure no empty date fields here either
|
||||
"popularity": fields.IntegerField(),
|
||||
},
|
||||
)
|
||||
|
||||
class Index:
|
||||
name = "authors"
|
||||
settings = SongDocument.Index.settings # Reuse settings
|
||||
settings = SongDocument.Index.settings
|
||||
|
||||
class Django:
|
||||
model = Author
|
||||
|
@ -202,7 +234,7 @@ class AlbumDocument(Document):
|
|||
name = fields.TextField(
|
||||
fields={
|
||||
"raw": fields.KeywordField(),
|
||||
"exact": fields.KeywordField(normalizer="lowercase"),
|
||||
"exact": fields.KeywordField(normalizer="lowercase_normalizer"),
|
||||
},
|
||||
)
|
||||
name_transliterated = fields.TextField(
|
||||
|
@ -213,7 +245,13 @@ class AlbumDocument(Document):
|
|||
},
|
||||
)
|
||||
suggest = fields.CompletionField()
|
||||
meta = fields.ObjectField(dynamic=True)
|
||||
meta = fields.ObjectField(
|
||||
dynamic=True,
|
||||
properties={
|
||||
"genre": fields.TextField(),
|
||||
"release_year": fields.KeywordField(),
|
||||
},
|
||||
)
|
||||
authors = fields.NestedField(
|
||||
attr="authors",
|
||||
properties={
|
||||
|
@ -223,7 +261,6 @@ class AlbumDocument(Document):
|
|||
},
|
||||
),
|
||||
"name_transliterated": fields.TextField(
|
||||
attr="name",
|
||||
analyzer="transliterate",
|
||||
fields={
|
||||
"raw": fields.KeywordField(),
|
||||
|
@ -236,7 +273,7 @@ class AlbumDocument(Document):
|
|||
|
||||
class Index:
|
||||
name = "albums"
|
||||
settings = SongDocument.Index.settings # Reuse settings
|
||||
settings = SongDocument.Index.settings
|
||||
|
||||
class Django:
|
||||
model = Album
|
||||
|
|
|
@ -8,7 +8,7 @@
|
|||
except requests.exceptions.JSONDecodeError:
|
||||
print("Failed to initialize GoogleTranslator due to external API issues.")
|
||||
from django.core.files import File
|
||||
from django.db import transaction
|
||||
from django.db import IntegrityError, transaction
|
||||
from django.utils.text import slugify
|
||||
from mutagen import File as MutagenFile
|
||||
from mutagen.id3 import APIC, ID3, TCON, TORY, TextFrame
|
||||
|
@ -19,14 +19,64 @@
|
|||
from akarpov.music.models import Album, Author, Song
|
||||
from akarpov.music.services.info import generate_readable_slug, search_all_platforms
|
||||
from akarpov.users.models import User
|
||||
from akarpov.utils.generators import generate_charset # Import generate_charset
|
||||
|
||||
|
||||
def get_or_create_author(author_name):
|
||||
"""Get or create author with unique slug."""
|
||||
with transaction.atomic():
|
||||
author = Author.objects.filter(name__iexact=author_name).order_by("id").first()
|
||||
if author is None:
|
||||
author = Author.objects.create(name=author_name)
|
||||
return author
|
||||
for attempt in range(5):
|
||||
try:
|
||||
slug = generate_readable_slug(author_name, Author)
|
||||
author = Author.objects.create(name=author_name, slug=slug)
|
||||
return author
|
||||
except IntegrityError:
|
||||
# Slug conflict, retry slug generation
|
||||
continue
|
||||
else:
|
||||
# If we still fail, get the existing author
|
||||
author = (
|
||||
Author.objects.filter(name__iexact=author_name)
|
||||
.order_by("id")
|
||||
.first()
|
||||
)
|
||||
if author:
|
||||
return author
|
||||
else:
|
||||
raise Exception("Failed to create or get author")
|
||||
else:
|
||||
return author
|
||||
|
||||
|
||||
def get_or_create_album(album_name):
|
||||
"""Get or create album with unique slug."""
|
||||
if not album_name:
|
||||
return None
|
||||
|
||||
with transaction.atomic():
|
||||
album = Album.objects.filter(name__iexact=album_name).order_by("id").first()
|
||||
if album is None:
|
||||
for attempt in range(5):
|
||||
try:
|
||||
slug = generate_readable_slug(album_name, Album)
|
||||
album = Album.objects.create(name=album_name, slug=slug)
|
||||
return album
|
||||
except IntegrityError:
|
||||
# Slug conflict, retry slug generation
|
||||
continue
|
||||
else:
|
||||
# If we still fail, get the existing album
|
||||
album = (
|
||||
Album.objects.filter(name__iexact=album_name).order_by("id").first()
|
||||
)
|
||||
if album:
|
||||
return album
|
||||
else:
|
||||
raise Exception("Failed to create or get album")
|
||||
else:
|
||||
return album
|
||||
|
||||
|
||||
def process_track_name(track_name: str) -> str:
|
||||
|
@ -98,20 +148,18 @@ def load_track(
|
|||
kwargs["release"] if "release" in kwargs else search_info.get("release", None)
|
||||
)
|
||||
|
||||
if album and type(album) is str and album.startswith("['"):
|
||||
if album and isinstance(album, str) and album.startswith("['"):
|
||||
album = album.replace("['", "").replace("']", "")
|
||||
|
||||
if album:
|
||||
if type(album) is str:
|
||||
if isinstance(album, str):
|
||||
album_name = album
|
||||
elif type(album) is list:
|
||||
elif isinstance(album, list):
|
||||
album_name = album[0]
|
||||
else:
|
||||
album_name = None
|
||||
if album_name:
|
||||
album, created = Album.objects.get_or_create(
|
||||
name__iexact=album_name, defaults={"name": album_name}
|
||||
)
|
||||
album = get_or_create_album(album_name)
|
||||
|
||||
processed_authors = []
|
||||
if authors:
|
||||
|
@ -121,7 +169,7 @@ def load_track(
|
|||
authors = processed_authors
|
||||
|
||||
if sng := Song.objects.filter(
|
||||
name=name if name else p_name,
|
||||
name__iexact=name if name else p_name,
|
||||
authors__id__in=[x.id for x in authors],
|
||||
album=album,
|
||||
):
|
||||
|
@ -186,15 +234,29 @@ def load_track(
|
|||
|
||||
new_file_name = generated_name + ".mp3"
|
||||
|
||||
if image_path:
|
||||
with open(path, "rb") as file, open(image_path, "rb") as image:
|
||||
song.image = File(image, name=generated_name + ".png")
|
||||
song.file = File(file, name=new_file_name)
|
||||
song.save()
|
||||
# Generate unique slug for the song
|
||||
song.slug = generate_readable_slug(name if name else p_name, Song)
|
||||
|
||||
# Try to save the song, handling potential slug conflicts
|
||||
for attempt in range(5):
|
||||
try:
|
||||
if image_path:
|
||||
with open(path, "rb") as file, open(image_path, "rb") as image:
|
||||
song.image = File(image, name=generated_name + ".png")
|
||||
song.file = File(file, name=new_file_name)
|
||||
song.save()
|
||||
else:
|
||||
with open(path, "rb") as file:
|
||||
song.file = File(file, name=new_file_name)
|
||||
song.save()
|
||||
break # Successfully saved the song
|
||||
except IntegrityError:
|
||||
# Slug conflict, generate a new slug using generate_charset
|
||||
song.slug = generate_readable_slug(
|
||||
song.name + "_" + generate_charset(5), Song
|
||||
)
|
||||
else:
|
||||
with open(path, "rb") as file:
|
||||
song.file = File(file, name=new_file_name)
|
||||
song.save()
|
||||
raise Exception("Failed to save song with unique slug after multiple attempts")
|
||||
|
||||
if not album.image and song.image:
|
||||
album.image = song.image
|
||||
|
@ -203,7 +265,7 @@ def load_track(
|
|||
if authors:
|
||||
song.authors.set([x.id for x in authors])
|
||||
|
||||
# set music meta
|
||||
# Set music metadata
|
||||
tag = MutagenFile(song.file.path)
|
||||
tag["title"] = TextFrame(encoding=3, text=[name])
|
||||
if album:
|
||||
|
@ -236,7 +298,4 @@ def load_track(
|
|||
if os.path.exists(image_path):
|
||||
os.remove(image_path)
|
||||
|
||||
song.slug = generate_readable_slug(song.name, Song)
|
||||
song.save()
|
||||
|
||||
return song
|
||||
|
|
144
akarpov/music/services/external.py
Normal file
144
akarpov/music/services/external.py
Normal file
|
@ -0,0 +1,144 @@
|
|||
import os
|
||||
from functools import wraps
|
||||
from random import randint
|
||||
from typing import Any
|
||||
|
||||
import requests
|
||||
import structlog
|
||||
from django.conf import settings
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
|
||||
class ExternalServiceClient:
|
||||
def __init__(self):
|
||||
self.base_url = getattr(settings, "MUSIC_EXTERNAL_SERVICE_URL", None)
|
||||
|
||||
def _make_request(
|
||||
self, endpoint: str, params: dict = None, **kwargs
|
||||
) -> dict[str, Any] | None:
|
||||
if not self.base_url:
|
||||
return None
|
||||
|
||||
url = f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/')}"
|
||||
try:
|
||||
response = requests.post(url, params=params, **kwargs)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
except requests.exceptions.RequestException as e:
|
||||
logger.error(
|
||||
"External service request failed",
|
||||
error=str(e),
|
||||
endpoint=endpoint,
|
||||
params=params,
|
||||
)
|
||||
return None
|
||||
|
||||
def get_spotify_info(self, track_name: str) -> dict[str, Any] | None:
|
||||
response = self._make_request("/spotify/search", params={"query": track_name})
|
||||
if not response:
|
||||
return None
|
||||
|
||||
if "album_image" in response:
|
||||
# Download and save image
|
||||
image_path = self._download_image(response["album_image"])
|
||||
if image_path:
|
||||
response["album_image_path"] = image_path
|
||||
|
||||
return response
|
||||
|
||||
def get_spotify_album_info(self, album_name: str) -> dict[str, Any] | None:
|
||||
response = self._make_request("/spotify/album", params={"query": album_name})
|
||||
if not response:
|
||||
return None
|
||||
|
||||
if response.get("images"):
|
||||
image_path = self._download_image(response["images"][0].get("url"))
|
||||
if image_path:
|
||||
response["image_path"] = image_path
|
||||
|
||||
return response
|
||||
|
||||
def get_spotify_artist_info(self, artist_name: str) -> dict[str, Any] | None:
|
||||
response = self._make_request("/spotify/artist", params={"query": artist_name})
|
||||
if not response:
|
||||
return None
|
||||
|
||||
if response.get("images"):
|
||||
image_path = self._download_image(response["images"][0].get("url"))
|
||||
if image_path:
|
||||
response["image_path"] = image_path
|
||||
|
||||
return response
|
||||
|
||||
def _download_image(self, url: str) -> str | None:
|
||||
if not url:
|
||||
return None
|
||||
|
||||
try:
|
||||
response = requests.get(url)
|
||||
if response.status_code == 200:
|
||||
image_path = os.path.join(
|
||||
settings.MEDIA_ROOT, f"tmp_{randint(10000, 99999)}.png"
|
||||
)
|
||||
with open(image_path, "wb") as f:
|
||||
f.write(response.content)
|
||||
return image_path
|
||||
except Exception as e:
|
||||
logger.error("Failed to download image", error=str(e), url=url)
|
||||
return None
|
||||
|
||||
def translate_text(
|
||||
self, text: str, source_lang: str = "auto", target_lang: str = "en"
|
||||
) -> str | None:
|
||||
response = self._make_request(
|
||||
"/translation/translate",
|
||||
json={"text": text, "source_lang": source_lang, "target_lang": target_lang},
|
||||
)
|
||||
return response.get("translated_text") if response else None
|
||||
|
||||
|
||||
def external_service_fallback(fallback_func):
|
||||
"""Decorator to try external service first, then fall back to local implementation"""
|
||||
|
||||
@wraps(fallback_func)
|
||||
def wrapper(*args, **kwargs):
|
||||
if (
|
||||
not hasattr(settings, "MUSIC_EXTERNAL_SERVICE_URL")
|
||||
or not settings.MUSIC_EXTERNAL_SERVICE_URL
|
||||
):
|
||||
return fallback_func(*args, **kwargs)
|
||||
|
||||
client = ExternalServiceClient()
|
||||
try:
|
||||
if fallback_func.__name__ == "get_spotify_info":
|
||||
result = client.get_spotify_info(args[1]) # args[1] is track_name
|
||||
elif fallback_func.__name__ == "get_spotify_album_info":
|
||||
result = client.get_spotify_album_info(args[0]) # args[0] is album_name
|
||||
elif fallback_func.__name__ == "get_spotify_artist_info":
|
||||
result = client.get_spotify_artist_info(
|
||||
args[0]
|
||||
) # args[0] is artist_name
|
||||
elif fallback_func.__name__ == "safe_translate":
|
||||
result = client.translate_text(args[0]) # args[0] is text
|
||||
else:
|
||||
logger.warning(
|
||||
"Unknown function for external service fallback",
|
||||
function=fallback_func.__name__,
|
||||
)
|
||||
return fallback_func(*args, **kwargs)
|
||||
|
||||
if result:
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
"External service failed, falling back to local implementation",
|
||||
error=str(e),
|
||||
function=fallback_func.__name__,
|
||||
args=args,
|
||||
)
|
||||
|
||||
return fallback_func(*args, **kwargs)
|
||||
|
||||
return wrapper
|
|
@ -1,4 +1,5 @@
|
|||
import os
|
||||
import time
|
||||
from io import BytesIO
|
||||
from pathlib import Path
|
||||
from random import randint
|
||||
|
@ -6,11 +7,19 @@
|
|||
import librosa
|
||||
import mutagen
|
||||
import numpy as np
|
||||
from django.core.files import File
|
||||
from django.db import IntegrityError, transaction
|
||||
from django.db.models import Count
|
||||
from django.utils.text import slugify
|
||||
from mutagen.id3 import ID3
|
||||
from mutagen.mp3 import MP3
|
||||
from PIL import Image, UnidentifiedImageError
|
||||
|
||||
from akarpov.music.models import Song
|
||||
from akarpov.common.tasks import crop_model_image
|
||||
from akarpov.music.models import Album, Author, Song
|
||||
from akarpov.music.services.db import load_track
|
||||
from akarpov.users.models import User
|
||||
from akarpov.utils.generators import generate_charset
|
||||
|
||||
|
||||
def load_dir(path: str, user_id: int):
|
||||
|
@ -83,3 +92,493 @@ def set_song_volume(song: Song):
|
|||
mp3_file = song.file.path
|
||||
song.volume = analyze_music_loudness(mp3_file)
|
||||
song.save(update_fields=["volume"])
|
||||
|
||||
|
||||
BATCH_SIZE = 10
|
||||
BATCH_CHECK_DELAY = 10 # seconds
|
||||
|
||||
|
||||
class FileProcessor:
|
||||
def __init__(self):
|
||||
self.failed_files: list[str] = []
|
||||
self.processed_files: set[str] = set()
|
||||
self.current_batch: dict[str, dict] = {}
|
||||
|
||||
def load_dir(self, path: str, user_id: int) -> tuple[list[str], int]:
|
||||
path = Path(path)
|
||||
files = list(path.glob("**/*.mp3"))
|
||||
total_files = len(files)
|
||||
|
||||
for i in range(0, len(files), BATCH_SIZE):
|
||||
batch = files[i : i + BATCH_SIZE] # noqa
|
||||
self._process_batch(batch, user_id)
|
||||
|
||||
# Wait and verify batch
|
||||
time.sleep(BATCH_CHECK_DELAY)
|
||||
self._verify_batch()
|
||||
|
||||
print(
|
||||
"Batch processed",
|
||||
processed=len(self.processed_files),
|
||||
failed=len(self.failed_files),
|
||||
total=total_files,
|
||||
remaining=total_files
|
||||
- len(self.processed_files)
|
||||
- len(self.failed_files),
|
||||
)
|
||||
|
||||
return self.failed_files, len(self.processed_files)
|
||||
|
||||
def _process_batch(self, files: list[Path], user_id: int):
|
||||
self.current_batch.clear()
|
||||
|
||||
for file_path in files:
|
||||
file_str = str(file_path)
|
||||
if file_str in self.processed_files or file_str in self.failed_files:
|
||||
continue
|
||||
|
||||
try:
|
||||
file_info = self._extract_file_info(file_str)
|
||||
if self._check_exists(file_info):
|
||||
self.processed_files.add(file_str)
|
||||
continue
|
||||
|
||||
self.current_batch[file_str] = file_info
|
||||
self._process_file(file_str, file_info, user_id)
|
||||
|
||||
except Exception as e:
|
||||
print("File processing failed", file=file_str, error=str(e))
|
||||
self.failed_files.append(file_str)
|
||||
|
||||
def _verify_batch(self):
|
||||
for file_path, info in self.current_batch.items():
|
||||
if not self._verify_file(file_path, info):
|
||||
print("File verification failed", file=file_path)
|
||||
self.failed_files.append(file_path)
|
||||
else:
|
||||
self.processed_files.add(file_path)
|
||||
|
||||
def _extract_file_info(self, path: str) -> dict:
|
||||
tag = mutagen.File(path, easy=True)
|
||||
return {
|
||||
"author": tag.get("artist"),
|
||||
"album": tag.get("album"),
|
||||
"name": tag.get("title", [path.split("/")[-1]])[0],
|
||||
"image": self._extract_image(path),
|
||||
}
|
||||
|
||||
def _extract_image(self, path: str) -> str | None:
|
||||
try:
|
||||
tags = ID3(path)
|
||||
pict = [x for x in tags.getall("APIC") if x]
|
||||
if not pict:
|
||||
return None
|
||||
|
||||
pict_data = pict[0].data
|
||||
im = Image.open(BytesIO(pict_data))
|
||||
image_path = f"/tmp/{randint(1, 1000000)}.png"
|
||||
while os.path.exists(image_path):
|
||||
image_path = f"/tmp/{randint(1, 1000000)}.png"
|
||||
im.save(image_path)
|
||||
return image_path
|
||||
except (UnidentifiedImageError, Exception) as e:
|
||||
print("Image extraction failed", error=str(e))
|
||||
return None
|
||||
|
||||
def _check_exists(self, info: dict) -> bool:
|
||||
query = Song.objects.filter(name=info["name"])
|
||||
if info["author"]:
|
||||
query = query.filter(authors__name__in=info["author"])
|
||||
if info["album"]:
|
||||
query = query.filter(album__name=info["album"])
|
||||
return query.exists()
|
||||
|
||||
def _verify_file(self, file_path: str, info: dict) -> bool:
|
||||
song = Song.objects.filter(name=info["name"], file__isnull=False).first()
|
||||
|
||||
if not song:
|
||||
return False
|
||||
|
||||
# Verify file exists and is readable
|
||||
if not os.path.exists(song.file.path):
|
||||
return False
|
||||
|
||||
# Verify image if it was expected
|
||||
if info["image"] and not song.image:
|
||||
return False
|
||||
|
||||
# Verify metadata
|
||||
if info["author"]:
|
||||
if not song.authors.filter(name__in=info["author"]).exists():
|
||||
return False
|
||||
if info["album"]:
|
||||
if not song.album or song.album.name != info["album"]:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def _process_file(self, path: str, info: dict, user_id: int):
|
||||
try:
|
||||
song = load_track(
|
||||
path=path,
|
||||
image_path=info["image"],
|
||||
user_id=user_id,
|
||||
authors=info["author"],
|
||||
album=info["album"],
|
||||
name=info["name"],
|
||||
)
|
||||
if info["image"] and os.path.exists(info["image"]):
|
||||
os.remove(info["image"])
|
||||
|
||||
set_song_volume(song)
|
||||
|
||||
except Exception as e:
|
||||
print("File processing failed", file=path, error=str(e))
|
||||
self.failed_files.append(path)
|
||||
|
||||
|
||||
def clean_title_for_slug(title: str) -> str:
|
||||
"""Clean title for slug generation."""
|
||||
# Remove common suffixes
|
||||
suffixes = [
|
||||
"(Original Mix)",
|
||||
"(Radio Edit)",
|
||||
"(Extended Mix)",
|
||||
"(Official Video)",
|
||||
"(Music Video)",
|
||||
"(Lyric Video)",
|
||||
"(Audio)",
|
||||
"(Official Audio)",
|
||||
"(Visualizer)",
|
||||
"(Official Music Video)",
|
||||
"(Official Lyric Video)",
|
||||
]
|
||||
cleaned = title
|
||||
for suffix in suffixes:
|
||||
cleaned = cleaned.replace(suffix, "")
|
||||
return cleaned.strip()
|
||||
|
||||
|
||||
def process_authors_string(authors_str: str) -> list[str]:
|
||||
"""Split author string into individual author names."""
|
||||
if not authors_str:
|
||||
return []
|
||||
|
||||
# First split by major separators
|
||||
authors = []
|
||||
for part in authors_str.split("/"):
|
||||
for subpart in part.split("&"):
|
||||
# Handle various featuring cases
|
||||
for feat_marker in [
|
||||
" feat.",
|
||||
" ft.",
|
||||
" featuring.",
|
||||
" presents ",
|
||||
" pres. ",
|
||||
]:
|
||||
if feat_marker in subpart.lower():
|
||||
parts = subpart.lower().split(feat_marker, 1)
|
||||
authors.extend(part.strip() for part in parts)
|
||||
break
|
||||
else:
|
||||
# Handle collaboration markers
|
||||
if " x " in subpart:
|
||||
authors.extend(p.strip() for p in subpart.split(" x "))
|
||||
else:
|
||||
authors.append(subpart.strip())
|
||||
|
||||
# Remove duplicates while preserving order
|
||||
seen = set()
|
||||
return [x for x in authors if not (x.lower() in seen or seen.add(x.lower()))]
|
||||
|
||||
|
||||
def extract_mp3_metadata(file_path: str) -> dict | None:
|
||||
"""Extract metadata from MP3 file."""
|
||||
try:
|
||||
audio = MP3(file_path, ID3=ID3)
|
||||
tags = audio.tags if audio.tags else {}
|
||||
|
||||
# Get filename without extension for fallback
|
||||
base_filename = os.path.splitext(os.path.basename(file_path))[0]
|
||||
|
||||
metadata = {
|
||||
"title": None,
|
||||
"album": None,
|
||||
"artists": None,
|
||||
"genre": None,
|
||||
"release_year": None,
|
||||
"length": audio.info.length,
|
||||
"image_data": None,
|
||||
"image_mime": None,
|
||||
}
|
||||
|
||||
if tags:
|
||||
# Extract basic metadata with fallbacks
|
||||
metadata["title"] = str(tags.get("TIT2", "")) or base_filename
|
||||
metadata["album"] = str(tags.get("TALB", ""))
|
||||
metadata["artists"] = str(tags.get("TPE1", "")) or str(tags.get("TPE2", ""))
|
||||
metadata["genre"] = str(tags.get("TCON", ""))
|
||||
metadata["release_year"] = str(tags.get("TDRC", "")) or str(
|
||||
tags.get("TYER", "")
|
||||
)
|
||||
|
||||
# Extract cover art
|
||||
for tag in tags.getall("APIC"):
|
||||
if tag.type == 3: # Front cover
|
||||
metadata["image_data"] = tag.data
|
||||
metadata["image_mime"] = tag.mime
|
||||
break
|
||||
|
||||
# Clean up title if it came from filename
|
||||
if metadata["title"] == base_filename:
|
||||
parts = base_filename.split(" - ", 1)
|
||||
if len(parts) > 1 and not metadata["artists"]:
|
||||
metadata["artists"] = parts[0]
|
||||
metadata["title"] = parts[1]
|
||||
|
||||
return metadata
|
||||
except Exception as e:
|
||||
print(f"Error extracting metadata from {file_path}: {str(e)}")
|
||||
return None
|
||||
|
||||
|
||||
def generate_unique_slug(
|
||||
base_name: str, model_class, existing_id=None, max_length=20
|
||||
) -> str:
|
||||
"""Generate a unique slug for a model instance."""
|
||||
# Clean and slugify the base name
|
||||
slug = slugify(clean_title_for_slug(base_name))
|
||||
|
||||
# Truncate if necessary
|
||||
if len(slug) > max_length:
|
||||
slug = slug[:max_length].rsplit("-", 1)[0]
|
||||
|
||||
original_slug = slug
|
||||
counter = 1
|
||||
|
||||
# Check for uniqueness
|
||||
while True:
|
||||
if existing_id:
|
||||
exists = (
|
||||
model_class.objects.filter(slug=slug).exclude(id=existing_id).exists()
|
||||
)
|
||||
else:
|
||||
exists = model_class.objects.filter(slug=slug).exists()
|
||||
|
||||
if not exists:
|
||||
break
|
||||
|
||||
# Generate new slug
|
||||
if counter == 1:
|
||||
if len(original_slug) > (max_length - 7): # Leave room for _XXXXX
|
||||
base = original_slug[: (max_length - 7)]
|
||||
else:
|
||||
base = original_slug
|
||||
slug = f"{base}_{generate_charset(5)}"
|
||||
else:
|
||||
if len(original_slug) > (max_length - len(str(counter)) - 1):
|
||||
base = original_slug[: (max_length - len(str(counter)) - 1)]
|
||||
else:
|
||||
base = original_slug
|
||||
slug = f"{base}_{counter}"
|
||||
|
||||
counter += 1
|
||||
|
||||
return slug
|
||||
|
||||
|
||||
def save_image_as_png(image_data: bytes, mime_type: str) -> str | None:
|
||||
"""Convert image data to PNG and save temporarily."""
|
||||
try:
|
||||
if not image_data:
|
||||
return None
|
||||
|
||||
img = Image.open(BytesIO(image_data))
|
||||
temp_path = f"/tmp/{generate_charset(10)}.png"
|
||||
img.save(temp_path, "PNG")
|
||||
return temp_path
|
||||
except Exception as e:
|
||||
print(f"Error processing image: {str(e)}")
|
||||
return None
|
||||
|
||||
|
||||
def get_or_create_album(album_name: str, authors: list[Author]) -> Album | None:
|
||||
"""Get or create album with proper locking and uniqueness check."""
|
||||
if not album_name:
|
||||
return None
|
||||
|
||||
with transaction.atomic():
|
||||
# Try to find existing album
|
||||
album = (
|
||||
Album.objects.select_for_update().filter(name__iexact=album_name).first()
|
||||
)
|
||||
|
||||
if album:
|
||||
# Add any missing authors
|
||||
current_author_ids = set(album.authors.values_list("id", flat=True))
|
||||
new_author_ids = {author.id for author in authors}
|
||||
missing_authors = new_author_ids - current_author_ids
|
||||
if missing_authors:
|
||||
album.authors.add(*missing_authors)
|
||||
return album
|
||||
|
||||
try:
|
||||
# Create new album
|
||||
album = Album.objects.create(
|
||||
name=album_name, slug=generate_unique_slug(album_name, Album)
|
||||
)
|
||||
album.authors.set(authors)
|
||||
return album
|
||||
except IntegrityError:
|
||||
# Handle race condition
|
||||
album = (
|
||||
Album.objects.select_for_update()
|
||||
.filter(name__iexact=album_name)
|
||||
.first()
|
||||
)
|
||||
if album:
|
||||
album.authors.add(*authors)
|
||||
return album
|
||||
raise
|
||||
|
||||
|
||||
def check_song_exists(title: str, album: Album | None, authors: list[Author]) -> bool:
|
||||
"""Check if a song already exists with the given title, album and authors."""
|
||||
query = Song.objects.filter(name__iexact=title)
|
||||
|
||||
if album:
|
||||
query = query.filter(album=album)
|
||||
|
||||
if authors:
|
||||
# Ensure exact author match
|
||||
query = query.annotate(author_count=Count("authors")).filter(
|
||||
author_count=len(authors), authors__in=authors
|
||||
)
|
||||
|
||||
return query.exists()
|
||||
|
||||
|
||||
def load_mp3_directory(
|
||||
directory_path: str, user_id: int | None = None
|
||||
) -> tuple[list[str], int]:
|
||||
"""
|
||||
Load all MP3 files from a directory and its subdirectories.
|
||||
Returns tuple of (failed_files, processed_count)
|
||||
"""
|
||||
path = Path(directory_path)
|
||||
failed_files = []
|
||||
processed_count = 0
|
||||
|
||||
# Keep track of created/updated objects for image cropping
|
||||
created_authors = set()
|
||||
created_albums = set()
|
||||
created_songs = set()
|
||||
|
||||
for mp3_path in path.glob("**/*.mp3"):
|
||||
try:
|
||||
metadata = extract_mp3_metadata(str(mp3_path))
|
||||
if not metadata:
|
||||
failed_files.append(str(mp3_path))
|
||||
continue
|
||||
|
||||
with transaction.atomic():
|
||||
# Process authors
|
||||
author_names = process_authors_string(metadata["artists"])
|
||||
authors = []
|
||||
for author_name in author_names:
|
||||
author = Author.objects.filter(name__iexact=author_name).first()
|
||||
if not author:
|
||||
author = Author.objects.create(
|
||||
name=author_name,
|
||||
slug=generate_unique_slug(author_name, Author),
|
||||
)
|
||||
created_authors.add(author.id)
|
||||
authors.append(author)
|
||||
|
||||
# Process album
|
||||
album = None
|
||||
if metadata["album"]:
|
||||
try:
|
||||
album = get_or_create_album(metadata["album"], authors)
|
||||
if album.id not in created_albums and not album.image_cropped:
|
||||
created_albums.add(album.id)
|
||||
except IntegrityError as e:
|
||||
print(f"Error creating album for {mp3_path}: {str(e)}")
|
||||
failed_files.append(str(mp3_path))
|
||||
continue
|
||||
|
||||
# Check for existing song
|
||||
if check_song_exists(metadata["title"], album, authors):
|
||||
print(
|
||||
f"Skipping existing song: {metadata['artists']} - {metadata['title']}"
|
||||
)
|
||||
continue
|
||||
|
||||
# Process cover image
|
||||
temp_image_path = None
|
||||
if metadata["image_data"]:
|
||||
temp_image_path = save_image_as_png(
|
||||
metadata["image_data"], metadata["image_mime"]
|
||||
)
|
||||
if album and not album.image and temp_image_path:
|
||||
with open(temp_image_path, "rb") as img_file:
|
||||
album.image.save(
|
||||
f"{album.slug}.png", File(img_file), save=True
|
||||
)
|
||||
|
||||
# Create song with proper slug from file name
|
||||
file_name = os.path.splitext(os.path.basename(str(mp3_path)))[0]
|
||||
song = Song(
|
||||
name=metadata["title"],
|
||||
length=metadata["length"],
|
||||
album=album,
|
||||
slug=generate_unique_slug(file_name, Song),
|
||||
creator=User.objects.get(id=user_id) if user_id else None,
|
||||
meta={
|
||||
"genre": metadata["genre"],
|
||||
"release_year": metadata["release_year"],
|
||||
},
|
||||
)
|
||||
|
||||
# Save files
|
||||
with open(mp3_path, "rb") as mp3_file:
|
||||
song.file.save(f"{song.slug}.mp3", File(mp3_file), save=True)
|
||||
|
||||
if temp_image_path:
|
||||
with open(temp_image_path, "rb") as img_file:
|
||||
song.image.save(f"{song.slug}.png", File(img_file), save=True)
|
||||
os.remove(temp_image_path)
|
||||
|
||||
# Set authors
|
||||
song.authors.set(authors)
|
||||
created_songs.add(song.id)
|
||||
processed_count += 1
|
||||
|
||||
print(f"Successfully processed: {song.artists_names} - {song.name}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error processing {mp3_path}: {str(e)}")
|
||||
failed_files.append(str(mp3_path))
|
||||
|
||||
# Trigger image cropping for all created/updated objects
|
||||
print("Processing image cropping...")
|
||||
|
||||
for author_id in created_authors:
|
||||
author = Author.objects.get(id=author_id)
|
||||
if author.image and not author.image_cropped:
|
||||
print(f"Cropping image for author: {author.name}")
|
||||
crop_model_image(author_id, "music", "author")
|
||||
|
||||
for album_id in created_albums:
|
||||
album = Album.objects.get(id=album_id)
|
||||
if album.image and not album.image_cropped:
|
||||
print(f"Cropping image for album: {album.name}")
|
||||
crop_model_image(album_id, "music", "album")
|
||||
|
||||
for song_id in created_songs:
|
||||
song = Song.objects.get(id=song_id)
|
||||
if song.image and not song.image_cropped:
|
||||
print(f"Cropping image for song: {song.name}")
|
||||
crop_model_image(song_id, "music", "song")
|
||||
|
||||
return failed_files, processed_count
|
||||
|
|
|
@ -1,9 +1,16 @@
|
|||
import os
|
||||
import re
|
||||
from random import randint
|
||||
from typing import Any
|
||||
|
||||
import requests
|
||||
import spotipy
|
||||
|
||||
from akarpov.music.services.external import (
|
||||
ExternalServiceClient,
|
||||
external_service_fallback,
|
||||
)
|
||||
|
||||
try:
|
||||
from deep_translator import GoogleTranslator
|
||||
except requests.exceptions.JSONDecodeError:
|
||||
|
@ -22,35 +29,74 @@
|
|||
from akarpov.utils.text import is_similar_artist, normalize_text
|
||||
|
||||
|
||||
def clean_name(name: str) -> str:
|
||||
# Replace special characters with underscores
|
||||
cleaned = name.strip().replace(" ", "_")
|
||||
cleaned = re.sub(r"[^\w\-]", "_", cleaned)
|
||||
# Remove consecutive underscores
|
||||
cleaned = re.sub(r"_+", "_", cleaned)
|
||||
# Remove trailing underscores
|
||||
cleaned = cleaned.strip("_")
|
||||
return cleaned
|
||||
|
||||
|
||||
def split_authors(authors_str: str) -> list[str]:
|
||||
# Split on common separators
|
||||
if not authors_str:
|
||||
return []
|
||||
|
||||
# First split by obvious delimiters
|
||||
authors = []
|
||||
for part in re.split(r"[,/&]", authors_str):
|
||||
# Clean up each part
|
||||
cleaned = part.strip()
|
||||
if " feat." in cleaned.lower():
|
||||
# Split on featuring
|
||||
main_artist, feat_artist = cleaned.lower().split(" feat.", 1)
|
||||
authors.extend([main_artist.strip(), feat_artist.strip()])
|
||||
elif " ft." in cleaned.lower():
|
||||
main_artist, feat_artist = cleaned.lower().split(" ft.", 1)
|
||||
authors.extend([main_artist.strip(), feat_artist.strip()])
|
||||
elif " x " in cleaned:
|
||||
# Split artist collaborations
|
||||
authors.extend(p.strip() for p in cleaned.split(" x "))
|
||||
elif cleaned:
|
||||
authors.append(cleaned)
|
||||
|
||||
# Remove duplicates while preserving order
|
||||
seen = set()
|
||||
return [x for x in authors if not (x in seen or seen.add(x))]
|
||||
|
||||
|
||||
def generate_readable_slug(name: str, model: Model) -> str:
|
||||
# Translate and slugify the name
|
||||
# Clean and translate name
|
||||
slug = safe_translate(name)
|
||||
|
||||
# Truncate slug if it's too long
|
||||
if len(slug) > 20:
|
||||
slug = slug[:20]
|
||||
last_dash = slug.rfind("-")
|
||||
if last_dash != -1:
|
||||
slug = slug[:last_dash]
|
||||
# Remove any remaining spaces and ensure proper formatting
|
||||
slug = clean_name(slug)
|
||||
|
||||
original_slug = slug
|
||||
# Truncate if necessary
|
||||
if len(slug) > 20:
|
||||
# Try to cut at word boundary
|
||||
truncated = slug[:20].rsplit("_", 1)[0]
|
||||
slug = truncated if truncated else slug[:20]
|
||||
|
||||
original_slug = slug.lower()
|
||||
|
||||
# Ensure uniqueness
|
||||
counter = 1
|
||||
while model.objects.filter(slug=slug).exists():
|
||||
while model.objects.filter(slug__iexact=slug).exists():
|
||||
if len(original_slug) > 14:
|
||||
truncated_slug = original_slug[:14]
|
||||
last_dash = truncated_slug.rfind("-")
|
||||
if last_dash != -1:
|
||||
truncated_slug = truncated_slug[:last_dash]
|
||||
truncated = original_slug[:14].rsplit("_", 1)[0]
|
||||
base_slug = truncated if truncated else original_slug[:14]
|
||||
else:
|
||||
truncated_slug = original_slug
|
||||
base_slug = original_slug
|
||||
|
||||
suffix = f"_{generate_charset(5)}" if counter == 1 else f"_{counter}"
|
||||
slug = f"{truncated_slug}{suffix}"
|
||||
slug = f"{base_slug}{suffix}"
|
||||
counter += 1
|
||||
|
||||
return slug
|
||||
return slug.lower()
|
||||
|
||||
|
||||
def create_spotify_session() -> spotipy.Spotify:
|
||||
|
@ -76,6 +122,19 @@ def spotify_search(name: str, session: spotipy.Spotify, search_type="track"):
|
|||
return res
|
||||
|
||||
|
||||
def clean_spotify_response(data: dict[str, Any]) -> dict[str, Any]:
|
||||
if isinstance(data, dict):
|
||||
return {
|
||||
k: clean_spotify_response(v)
|
||||
for k, v in data.items()
|
||||
if k != "available_markets"
|
||||
}
|
||||
elif isinstance(data, list):
|
||||
return [clean_spotify_response(item) for item in data]
|
||||
return data
|
||||
|
||||
|
||||
@external_service_fallback
|
||||
def get_spotify_info(name: str, session: spotipy.Spotify) -> dict:
|
||||
info = {
|
||||
"album_name": "",
|
||||
|
@ -84,7 +143,11 @@ def get_spotify_info(name: str, session: spotipy.Spotify) -> dict:
|
|||
"artists": [],
|
||||
"artist": "",
|
||||
"title": "",
|
||||
"genre": "",
|
||||
"genre": [],
|
||||
"meta": {},
|
||||
"album_meta": {},
|
||||
"external_urls": {},
|
||||
"full_data": {},
|
||||
}
|
||||
|
||||
try:
|
||||
|
@ -93,32 +156,47 @@ def get_spotify_info(name: str, session: spotipy.Spotify) -> dict:
|
|||
return info
|
||||
|
||||
track = results[0]
|
||||
artist_data = session.artist(track["artists"][0]["external_urls"]["spotify"])
|
||||
album_data = session.album(track["album"]["id"])
|
||||
|
||||
info.update(
|
||||
{
|
||||
"album_name": track["album"]["name"],
|
||||
"album_image": track["album"]["images"][0]["url"]
|
||||
if track["album"]["images"]
|
||||
else "",
|
||||
"release": track["album"]["release_date"].split("-")[0],
|
||||
"album_image": track["album"]["images"][0]["url"],
|
||||
"artists": [artist["name"] for artist in track["artists"]],
|
||||
"artist": track["artists"][0]["name"],
|
||||
"title": track["name"],
|
||||
# Extract additional data as needed
|
||||
"genre": artist_data.get("genres", []),
|
||||
"meta": {
|
||||
"duration_ms": track.get("duration_ms"),
|
||||
"explicit": track.get("explicit"),
|
||||
"popularity": track.get("popularity"),
|
||||
"preview_url": track.get("preview_url"),
|
||||
"track_number": track.get("track_number"),
|
||||
"type": track.get("type"),
|
||||
},
|
||||
"album_meta": clean_spotify_response(album_data),
|
||||
"external_urls": track.get("external_urls", {}),
|
||||
"full_data": clean_spotify_response(track),
|
||||
}
|
||||
)
|
||||
|
||||
artist_data = session.artist(track["artists"][0]["external_urls"]["spotify"])
|
||||
info["genre"] = artist_data.get("genres", [])
|
||||
if track["album"]["images"]:
|
||||
album_image_url = track["album"]["images"][0]["url"]
|
||||
image_response = requests.get(album_image_url)
|
||||
if image_response.status_code == 200:
|
||||
image_path = os.path.join(
|
||||
settings.MEDIA_ROOT, f"tmp_{randint(10000, 99999)}.png"
|
||||
)
|
||||
with open(image_path, "wb") as f:
|
||||
f.write(image_response.content)
|
||||
info["album_image_path"] = image_path
|
||||
|
||||
album_image_url = track["album"]["images"][0]["url"]
|
||||
image_response = requests.get(album_image_url)
|
||||
if image_response.status_code == 200:
|
||||
image_path = os.path.join(
|
||||
settings.MEDIA_ROOT, f"tmp_{randint(10000, 99999)}.png"
|
||||
)
|
||||
with open(image_path, "wb") as f:
|
||||
f.write(image_response.content)
|
||||
info["album_image_path"] = image_path
|
||||
|
||||
except Exception:
|
||||
except Exception as e:
|
||||
print("Failed to get Spotify info", error=str(e))
|
||||
return info
|
||||
|
||||
return info
|
||||
|
@ -172,20 +250,101 @@ def search_yandex(name: str):
|
|||
return info
|
||||
|
||||
|
||||
def get_spotify_album_info(album_name: str, session: spotipy.Spotify):
|
||||
search_result = session.search(q="album:" + album_name, type="album")
|
||||
albums = search_result.get("albums", {}).get("items", [])
|
||||
if albums:
|
||||
return albums[0]
|
||||
return None
|
||||
@external_service_fallback
|
||||
def get_spotify_album_info(album_name: str, session: spotipy.Spotify) -> dict:
|
||||
info = {
|
||||
"name": "",
|
||||
"link": "",
|
||||
"meta": {},
|
||||
"image_url": "",
|
||||
"release_date": "",
|
||||
"total_tracks": 0,
|
||||
"images": [],
|
||||
"external_urls": {},
|
||||
"artists": [],
|
||||
"genres": [],
|
||||
"tracks": [],
|
||||
"full_data": {},
|
||||
}
|
||||
|
||||
try:
|
||||
search_result = session.search(q="album:" + album_name, type="album")
|
||||
albums = search_result.get("albums", {}).get("items", [])
|
||||
if not albums:
|
||||
return info
|
||||
|
||||
album = albums[0]
|
||||
album_id = album["id"]
|
||||
full_album = session.album(album_id)
|
||||
tracks = session.album_tracks(album_id)
|
||||
|
||||
return {
|
||||
"name": album.get("name", ""),
|
||||
"link": album.get("external_urls", {}).get("spotify", ""),
|
||||
"meta": {
|
||||
"album_type": album.get("album_type", ""),
|
||||
"release_date_precision": album.get("release_date_precision", ""),
|
||||
"total_tracks": album.get("total_tracks", 0),
|
||||
"type": album.get("type", ""),
|
||||
},
|
||||
"image_url": next(
|
||||
(img["url"] for img in album.get("images", []) if img.get("url")), ""
|
||||
),
|
||||
"release_date": album.get("release_date", ""),
|
||||
"total_tracks": album.get("total_tracks", 0),
|
||||
"images": album.get("images", []),
|
||||
"external_urls": album.get("external_urls", {}),
|
||||
"artists": clean_spotify_response(album.get("artists", [])),
|
||||
"genres": clean_spotify_response(full_album.get("genres", [])),
|
||||
"tracks": clean_spotify_response(tracks.get("items", [])),
|
||||
"full_data": clean_spotify_response(full_album),
|
||||
}
|
||||
except Exception as e:
|
||||
print("Failed to get album info", error=str(e))
|
||||
return info
|
||||
|
||||
|
||||
def get_spotify_artist_info(artist_name: str, session: spotipy.Spotify):
|
||||
search_result = session.search(q="artist:" + artist_name, type="artist")
|
||||
artists = search_result.get("artists", {}).get("items", [])
|
||||
if artists:
|
||||
return artists[0]
|
||||
return None
|
||||
@external_service_fallback
|
||||
def get_spotify_artist_info(artist_name: str, session: spotipy.Spotify) -> dict:
|
||||
info = {
|
||||
"name": "",
|
||||
"link": "",
|
||||
"meta": {},
|
||||
"image_url": "",
|
||||
"genres": [],
|
||||
"popularity": 0,
|
||||
"images": [],
|
||||
"external_urls": {},
|
||||
"full_data": {},
|
||||
}
|
||||
|
||||
try:
|
||||
search_result = session.search(q="artist:" + artist_name, type="artist")
|
||||
artists = search_result.get("artists", {}).get("items", [])
|
||||
if not artists:
|
||||
return info
|
||||
|
||||
artist = artists[0]
|
||||
return {
|
||||
"name": artist.get("name", ""),
|
||||
"link": artist.get("external_urls", {}).get("spotify", ""),
|
||||
"meta": {
|
||||
"followers": artist.get("followers", {}).get("total", 0),
|
||||
"popularity": artist.get("popularity", 0),
|
||||
"type": artist.get("type", ""),
|
||||
},
|
||||
"image_url": next(
|
||||
(img["url"] for img in artist.get("images", []) if img.get("url")), ""
|
||||
),
|
||||
"genres": artist.get("genres", []),
|
||||
"popularity": artist.get("popularity", 0),
|
||||
"images": artist.get("images", []),
|
||||
"external_urls": artist.get("external_urls", {}),
|
||||
"full_data": clean_spotify_response(artist),
|
||||
}
|
||||
except Exception as e:
|
||||
print("Failed to get artist info", error=str(e))
|
||||
return info
|
||||
|
||||
|
||||
def get_yandex_album_info(album_name: str, client: Client):
|
||||
|
@ -379,29 +538,46 @@ def save_author_image(author, image_path):
|
|||
print(f"Error saving author image: {str(e)}")
|
||||
|
||||
|
||||
def safe_translate(text):
|
||||
@external_service_fallback
|
||||
def safe_translate(text: str) -> str:
|
||||
try:
|
||||
text = clean_name(text) # Clean before translation
|
||||
translated = GoogleTranslator(source="auto", target="en").translate(text)
|
||||
return slugify(translated)
|
||||
# Clean after translation and ensure proper slugification
|
||||
return slugify(clean_name(translated)).replace(" ", "_").lower()
|
||||
except Exception as e:
|
||||
print(f"Error translating text: {str(e)}")
|
||||
return slugify(text)
|
||||
print(f"Translation failed: {str(e)}")
|
||||
# Fallback to direct slugification
|
||||
return slugify(clean_name(text)).replace(" ", "_").lower()
|
||||
|
||||
|
||||
def search_all_platforms(track_name: str) -> dict:
|
||||
print(track_name)
|
||||
# session = spotipy.Spotify(
|
||||
# auth_manager=spotipy.SpotifyClientCredentials(
|
||||
# client_id=settings.MUSIC_SPOTIFY_ID,
|
||||
# client_secret=settings.MUSIC_SPOTIFY_SECRET,
|
||||
# )
|
||||
# )
|
||||
# spotify_info = get_spotify_info(track_name, session)
|
||||
spotify_info = {} # TODO: add proxy for info retrieve
|
||||
|
||||
if settings.MUSIC_EXTERNAL_SERVICE_URL:
|
||||
# Use external service if configured
|
||||
client = ExternalServiceClient()
|
||||
spotify_info = client.get_spotify_info(track_name) or {}
|
||||
else:
|
||||
# Local implementation fallback
|
||||
try:
|
||||
session = spotipy.Spotify(
|
||||
auth_manager=SpotifyClientCredentials(
|
||||
client_id=settings.MUSIC_SPOTIFY_ID,
|
||||
client_secret=settings.MUSIC_SPOTIFY_SECRET,
|
||||
)
|
||||
)
|
||||
spotify_info = get_spotify_info(track_name, session)
|
||||
except Exception as e:
|
||||
print("Local Spotify implementation failed", error=str(e))
|
||||
spotify_info = {}
|
||||
|
||||
yandex_info = search_yandex(track_name)
|
||||
|
||||
if "album_image_path" in spotify_info and "album_image_path" in yandex_info:
|
||||
os.remove(yandex_info["album_image_path"])
|
||||
|
||||
# Combine artist information
|
||||
combined_artists = set()
|
||||
for artist in spotify_info.get("artists", []) + yandex_info.get("artists", []):
|
||||
normalized_artist = normalize_text(artist)
|
||||
|
@ -410,12 +586,13 @@ def search_all_platforms(track_name: str) -> dict:
|
|||
for existing_artist in combined_artists
|
||||
):
|
||||
combined_artists.add(normalized_artist)
|
||||
genre = spotify_info.get("genre") or yandex_info.get("genre")
|
||||
if type(genre) is list:
|
||||
genre = sorted(genre, key=lambda x: len(x))
|
||||
genre = genre[0]
|
||||
|
||||
track_info = {
|
||||
# Process genre information
|
||||
genre = spotify_info.get("genre") or yandex_info.get("genre")
|
||||
if isinstance(genre, list) and genre:
|
||||
genre = sorted(genre, key=len)[0]
|
||||
|
||||
return {
|
||||
"album_name": spotify_info.get("album_name")
|
||||
or yandex_info.get("album_name", ""),
|
||||
"release": spotify_info.get("release") or yandex_info.get("release", ""),
|
||||
|
@ -425,5 +602,3 @@ def search_all_platforms(track_name: str) -> dict:
|
|||
"album_image": spotify_info.get("album_image_path")
|
||||
or yandex_info.get("album_image_path", None),
|
||||
}
|
||||
|
||||
return track_info
|
||||
|
|
|
@ -8,66 +8,85 @@
|
|||
|
||||
|
||||
def search_song(query):
|
||||
if not query:
|
||||
return Song.objects.none()
|
||||
|
||||
search = SongDocument.search()
|
||||
|
||||
should_queries = [
|
||||
ES_Q("match_phrase", name={"query": query, "boost": 5}),
|
||||
# Priorities:
|
||||
# 1. Exact phrase matches in name, author name, album name
|
||||
# 2. Part of author/album name
|
||||
# 3. Exact name (exact matches)
|
||||
# 4. Fuzzy matches
|
||||
# 5. Wildcards
|
||||
|
||||
# phrase matches (highest priority)
|
||||
phrase_queries = [
|
||||
ES_Q("match_phrase", name={"query": query, "boost": 10}),
|
||||
ES_Q(
|
||||
"nested",
|
||||
path="authors",
|
||||
query=ES_Q("match_phrase", name={"query": query, "boost": 4}),
|
||||
query=ES_Q("match_phrase", authors__name={"query": query, "boost": 9}),
|
||||
),
|
||||
ES_Q(
|
||||
"nested",
|
||||
path="album",
|
||||
query=ES_Q("match_phrase", name={"query": query, "boost": 4}),
|
||||
),
|
||||
ES_Q("match", name={"query": query, "fuzziness": "AUTO", "boost": 3}),
|
||||
ES_Q(
|
||||
"nested",
|
||||
path="authors",
|
||||
query=ES_Q("match", name={"query": query, "fuzziness": "AUTO", "boost": 2}),
|
||||
),
|
||||
ES_Q(
|
||||
"nested",
|
||||
path="album",
|
||||
query=ES_Q("match", name={"query": query, "fuzziness": "AUTO", "boost": 2}),
|
||||
),
|
||||
ES_Q("wildcard", name={"value": f"*{query.lower()}*", "boost": 1}),
|
||||
ES_Q(
|
||||
"nested",
|
||||
path="authors",
|
||||
query=ES_Q("wildcard", name={"value": f"*{query.lower()}*", "boost": 0.8}),
|
||||
),
|
||||
ES_Q(
|
||||
"nested",
|
||||
path="album",
|
||||
query=ES_Q("wildcard", name={"value": f"*{query.lower()}*", "boost": 0.8}),
|
||||
),
|
||||
ES_Q(
|
||||
"match",
|
||||
name_transliterated={"query": query, "fuzziness": "AUTO", "boost": 1},
|
||||
query=ES_Q("match_phrase", album__name={"query": query, "boost": 9}),
|
||||
),
|
||||
]
|
||||
|
||||
# exact keyword matches (non-case sensitive due to normalizers)
|
||||
exact_queries = [
|
||||
ES_Q("term", **{"name.exact": {"value": query.lower(), "boost": 8}})
|
||||
]
|
||||
|
||||
# fuzzy matches
|
||||
fuzzy_queries = [
|
||||
ES_Q("match", name={"query": query, "fuzziness": "AUTO", "boost": 5}),
|
||||
ES_Q(
|
||||
"nested",
|
||||
path="authors",
|
||||
query=ES_Q(
|
||||
"match",
|
||||
name_transliterated={"query": query, "fuzziness": "AUTO", "boost": 0.8},
|
||||
"match", authors__name={"query": query, "fuzziness": "AUTO", "boost": 4}
|
||||
),
|
||||
),
|
||||
ES_Q(
|
||||
"nested",
|
||||
path="album",
|
||||
query=ES_Q(
|
||||
"match",
|
||||
name_transliterated={"query": query, "fuzziness": "AUTO", "boost": 0.8},
|
||||
"match", album__name={"query": query, "fuzziness": "AUTO", "boost": 4}
|
||||
),
|
||||
),
|
||||
]
|
||||
|
||||
search_query = ES_Q("bool", should=should_queries, minimum_should_match=1)
|
||||
# wildcard matches
|
||||
wildcard_queries = [
|
||||
ES_Q("wildcard", name={"value": f"*{query.lower()}*", "boost": 2}),
|
||||
ES_Q(
|
||||
"nested",
|
||||
path="authors",
|
||||
query=ES_Q(
|
||||
"wildcard", authors__name={"value": f"*{query.lower()}*", "boost": 2}
|
||||
),
|
||||
),
|
||||
ES_Q(
|
||||
"nested",
|
||||
path="album",
|
||||
query=ES_Q(
|
||||
"wildcard", album__name={"value": f"*{query.lower()}*", "boost": 2}
|
||||
),
|
||||
),
|
||||
]
|
||||
|
||||
# Combine queries
|
||||
# We'll use a should query to incorporate all of these, relying on boosting
|
||||
search_query = ES_Q(
|
||||
"bool",
|
||||
should=phrase_queries + exact_queries + fuzzy_queries + wildcard_queries,
|
||||
minimum_should_match=1,
|
||||
)
|
||||
|
||||
# Execute search with size limit
|
||||
search = search.query(search_query).extra(size=20)
|
||||
response = search.execute()
|
||||
|
||||
|
@ -103,8 +122,9 @@ def bulk_update_index(model_class):
|
|||
|
||||
|
||||
def search_author(query):
|
||||
if not query:
|
||||
return Author.objects.none()
|
||||
search = AuthorDocument.search()
|
||||
|
||||
should_queries = [
|
||||
ES_Q("match_phrase", name={"query": query, "boost": 5}),
|
||||
ES_Q("match", name={"query": query, "fuzziness": "AUTO", "boost": 3}),
|
||||
|
@ -114,7 +134,6 @@ def search_author(query):
|
|||
name_transliterated={"query": query, "fuzziness": "AUTO", "boost": 1},
|
||||
),
|
||||
]
|
||||
|
||||
search_query = ES_Q("bool", should=should_queries, minimum_should_match=1)
|
||||
search = search.query(search_query).extra(size=10)
|
||||
response = search.execute()
|
||||
|
@ -125,11 +144,12 @@ def search_author(query):
|
|||
Case(*[When(pk=pk, then=pos) for pos, pk in enumerate(hit_ids)])
|
||||
)
|
||||
return authors
|
||||
|
||||
return Author.objects.none()
|
||||
|
||||
|
||||
def search_album(query):
|
||||
if not query:
|
||||
return Album.objects.none()
|
||||
search = AlbumDocument.search()
|
||||
|
||||
should_queries = [
|
||||
|
@ -141,7 +161,6 @@ def search_album(query):
|
|||
name_transliterated={"query": query, "fuzziness": "AUTO", "boost": 1},
|
||||
),
|
||||
]
|
||||
|
||||
search_query = ES_Q("bool", should=should_queries, minimum_should_match=1)
|
||||
search = search.query(search_query).extra(size=10)
|
||||
response = search.execute()
|
||||
|
@ -152,5 +171,4 @@ def search_album(query):
|
|||
Case(*[When(pk=pk, then=pos) for pos, pk in enumerate(hit_ids)])
|
||||
)
|
||||
return albums
|
||||
|
||||
return Album.objects.none()
|
||||
|
|
|
@ -715,6 +715,9 @@
|
|||
LAST_FM_API_KEY = env("LAST_FM_API_KET", default="")
|
||||
LAST_FM_SECRET = env("LAST_FM_SECRET", default="")
|
||||
|
||||
# EXTERNAL
|
||||
MUSIC_EXTERNAL_SERVICE_URL = env("MUSIC_EXTERNAL_SERVICE_URL", default="")
|
||||
|
||||
# ROBOTS
|
||||
# ------------------------------------------------------------------------------
|
||||
ROBOTS_USE_SITEMAP = True
|
||||
|
|
Loading…
Reference in New Issue
Block a user