Mirror of https://github.com/Alexander-D-Karpov/akarpov (synced 2025-07-28 22:39:46 +03:00)

Compare commits: 909e0deb38 ... 868f5bf35d (17 commits)

Commits (SHA1):
868f5bf35d
03c05d7adf
0b439f43a5
6c15494aab
bf182dbd0a
f268212094
f4ca7db696
600c903a68
c3de9b45b8
66bd074149
b72ebe6e8c
85e8e3fe8b
2a7f1eae88
398820489f
a2607cd0d0
1b9b1d2bb5
9d974f9954

@@ -88,8 +88,12 @@ def get_liked(self, obj):

    @extend_schema_field(ListAlbumSerializer)
    def get_album(self, obj):
-        if obj.album:
-            return ListAlbumSerializer(Album.objects.cache().get(id=obj.album_id)).data
+        if obj.album_id:
+            try:
+                album = Album.objects.cache().get(id=obj.album_id)
+                return ListAlbumSerializer(album).data
+            except Album.DoesNotExist:
+                return None
        return None

    @extend_schema_field(ListAuthorSerializer(many=True))

@@ -105,16 +109,17 @@ def get_image(self, obj):
        img = None
        if obj.image_cropped:
            img = obj.image_cropped
-        else:
-            album = Album.objects.cache().get(id=obj.album_id)
-            if album.image_cropped:
-                img = album.image_cropped
-            else:
-                authors = Author.objects.cache().filter(
-                    Q(songs__id=obj.id) & ~Q(image="")
-                )
-                if authors:
-                    img = authors.first().image_cropped
+        elif obj.album_id:
+            try:
+                album = Album.objects.cache().get(id=obj.album_id)
+                if album.image_cropped:
+                    img = album.image_cropped
+            except Album.DoesNotExist:
+                pass
+        if not img:
+            authors = Author.objects.cache().filter(Q(songs__id=obj.id) & ~Q(image=""))
+            if authors.exists():
+                img = authors.first().image_cropped
        if img:
            return self.context["request"].build_absolute_uri(img.url)
        return None

@@ -11,7 +11,7 @@ class SongDocument(Document):
        properties={
            "name": fields.TextField(
                fields={
-                    "raw": fields.KeywordField(normalizer="lowercase"),
+                    "raw": fields.KeywordField(normalizer="lowercase_normalizer"),
                },
            ),
            "name_transliterated": fields.TextField(

@@ -21,7 +21,13 @@ class SongDocument(Document):
                },
            ),
            "link": fields.TextField(),
-            "meta": fields.ObjectField(dynamic=True),
+            "meta": fields.ObjectField(
+                dynamic=True,
+                properties={
+                    "genre": fields.TextField(),
+                    "release_year": fields.KeywordField(),
+                },
+            ),
        },
    )

@@ -30,7 +36,7 @@ class SongDocument(Document):
        properties={
            "name": fields.TextField(
                fields={
-                    "raw": fields.KeywordField(normalizer="lowercase"),
+                    "raw": fields.KeywordField(normalizer="lowercase_normalizer"),
                },
            ),
            "name_transliterated": fields.TextField(

@@ -40,7 +46,13 @@ class SongDocument(Document):
                },
            ),
            "link": fields.TextField(),
-            "meta": fields.ObjectField(dynamic=True),
+            "meta": fields.ObjectField(
+                dynamic=True,
+                properties={
+                    "genre": fields.TextField(),
+                    "release_year": fields.KeywordField(),
+                },
+            ),
        },
    )

@@ -48,7 +60,7 @@ class SongDocument(Document):
        attr="name",
        fields={
            "raw": fields.KeywordField(),
-            "exact": fields.KeywordField(normalizer="lowercase"),
+            "exact": fields.KeywordField(normalizer="lowercase_normalizer"),
        },
    )
    name_transliterated = fields.TextField(

@@ -60,7 +72,13 @@ class SongDocument(Document):
    )
    suggest = fields.CompletionField()

-    meta = fields.ObjectField(dynamic=True)
+    meta = fields.ObjectField(
+        dynamic=True,
+        properties={
+            "genre": fields.TextField(),
+            "release_year": fields.KeywordField(),
+        },
+    )

    class Index:
        name = "songs"

@@ -68,6 +86,13 @@ class Index:
            "number_of_shards": 1,
            "number_of_replicas": 0,
            "analysis": {
+                "normalizer": {
+                    "lowercase_normalizer": {
+                        "type": "custom",
+                        "char_filter": [],
+                        "filter": ["lowercase"],
+                    }
+                },
                "filter": {
                    "my_transliterator": {
                        "type": "icu_transform",

@@ -144,8 +169,8 @@ class Index:
                    "filter": [
                        "lowercase",
                        "autocomplete_filter",
-                        "english_stemmer",  # Apply English stemming for autocomplete
-                        "russian_stemmer",  # Include Russian stemming if applicable
+                        "english_stemmer",
+                        "russian_stemmer",
                    ],
                },
                "search_synonym_with_stemming": {

@@ -154,8 +179,8 @@ class Index:
                    "filter": [
                        "lowercase",
                        "synonym_filter",
-                        "english_stemmer",  # Apply English stemming for synonym search
-                        "russian_stemmer",  # Include Russian stemming if processing Russian synonyms
+                        "english_stemmer",
+                        "russian_stemmer",
                    ],
                },
            },

@@ -176,7 +201,7 @@ class AuthorDocument(Document):
    name = fields.TextField(
        fields={
            "raw": fields.KeywordField(),
-            "exact": fields.KeywordField(normalizer="lowercase"),
+            "exact": fields.KeywordField(normalizer="lowercase_normalizer"),
        },
    )
    name_transliterated = fields.TextField(

@@ -187,11 +212,18 @@ class AuthorDocument(Document):
        },
    )
    suggest = fields.CompletionField()
-    meta = fields.ObjectField(dynamic=True)
+    meta = fields.ObjectField(
+        dynamic=True,
+        properties={
+            "description": fields.TextField(),
+            # Ensure no empty date fields here either
+            "popularity": fields.IntegerField(),
+        },
+    )

    class Index:
        name = "authors"
-        settings = SongDocument.Index.settings  # Reuse settings
+        settings = SongDocument.Index.settings

    class Django:
        model = Author

@@ -202,7 +234,7 @@ class AlbumDocument(Document):
    name = fields.TextField(
        fields={
            "raw": fields.KeywordField(),
-            "exact": fields.KeywordField(normalizer="lowercase"),
+            "exact": fields.KeywordField(normalizer="lowercase_normalizer"),
        },
    )
    name_transliterated = fields.TextField(

@@ -213,7 +245,13 @@ class AlbumDocument(Document):
        },
    )
    suggest = fields.CompletionField()
-    meta = fields.ObjectField(dynamic=True)
+    meta = fields.ObjectField(
+        dynamic=True,
+        properties={
+            "genre": fields.TextField(),
+            "release_year": fields.KeywordField(),
+        },
+    )
    authors = fields.NestedField(
        attr="authors",
        properties={

@@ -223,7 +261,6 @@ class AlbumDocument(Document):
                },
            ),
            "name_transliterated": fields.TextField(
-                attr="name",
                analyzer="transliterate",
                fields={
                    "raw": fields.KeywordField(),

@@ -236,7 +273,7 @@ class AlbumDocument(Document):

    class Index:
        name = "albums"
-        settings = SongDocument.Index.settings  # Reuse settings
+        settings = SongDocument.Index.settings

    class Django:
        model = Album

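Note: the keyword subfields above now reference "lowercase_normalizer" by name; that name only resolves because the hunk at @@ -68,6 +86,13 @@ declares a custom normalizer with the same name under the index's analysis settings. A minimal, illustrative sketch of how the two pieces relate in raw Elasticsearch terms (the index layout and field names below are placeholders, not the repository's actual definitions):

    # Illustrative only: a custom normalizer must be declared in the index
    # settings before a keyword field can reference it by name.
    index_settings = {
        "analysis": {
            "normalizer": {
                "lowercase_normalizer": {
                    "type": "custom",
                    "char_filter": [],
                    "filter": ["lowercase"],
                }
            }
        }
    }

    mapping = {
        "properties": {
            "name": {
                "type": "text",
                "fields": {
                    # refers to the normalizer declared above by name
                    "raw": {"type": "keyword", "normalizer": "lowercase_normalizer"},
                },
            }
        }
    }
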
@@ -8,7 +8,7 @@
except requests.exceptions.JSONDecodeError:
    print("Failed to initialize GoogleTranslator due to external API issues.")
from django.core.files import File
-from django.db import transaction
+from django.db import IntegrityError, transaction
from django.utils.text import slugify
from mutagen import File as MutagenFile
from mutagen.id3 import APIC, ID3, TCON, TORY, TextFrame

@@ -19,14 +19,64 @@
from akarpov.music.models import Album, Author, Song
from akarpov.music.services.info import generate_readable_slug, search_all_platforms
from akarpov.users.models import User
+from akarpov.utils.generators import generate_charset  # Import generate_charset


def get_or_create_author(author_name):
+    """Get or create author with unique slug."""
    with transaction.atomic():
        author = Author.objects.filter(name__iexact=author_name).order_by("id").first()
        if author is None:
-            author = Author.objects.create(name=author_name)
-        return author
+            for attempt in range(5):
+                try:
+                    slug = generate_readable_slug(author_name, Author)
+                    author = Author.objects.create(name=author_name, slug=slug)
+                    return author
+                except IntegrityError:
+                    # Slug conflict, retry slug generation
+                    continue
+            else:
+                # If we still fail, get the existing author
+                author = (
+                    Author.objects.filter(name__iexact=author_name)
+                    .order_by("id")
+                    .first()
+                )
+                if author:
+                    return author
+                else:
+                    raise Exception("Failed to create or get author")
+        else:
+            return author
+
+
+def get_or_create_album(album_name):
+    """Get or create album with unique slug."""
+    if not album_name:
+        return None
+
+    with transaction.atomic():
+        album = Album.objects.filter(name__iexact=album_name).order_by("id").first()
+        if album is None:
+            for attempt in range(5):
+                try:
+                    slug = generate_readable_slug(album_name, Album)
+                    album = Album.objects.create(name=album_name, slug=slug)
+                    return album
+                except IntegrityError:
+                    # Slug conflict, retry slug generation
+                    continue
+            else:
+                # If we still fail, get the existing album
+                album = (
+                    Album.objects.filter(name__iexact=album_name).order_by("id").first()
+                )
+                if album:
+                    return album
+                else:
+                    raise Exception("Failed to create or get album")
+        else:
+            return album


def process_track_name(track_name: str) -> str:

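Note: the retry blocks above rely on Python's for/else: the else branch runs only when the loop finishes without break, which here means every attempt raised IntegrityError (a successful create returns from inside the loop and skips it). A small standalone sketch of the same pattern, with hypothetical names:

    class AlreadyExists(Exception):
        """Hypothetical stand-in for django.db.IntegrityError."""


    def create_with_retries(create_once, attempts: int = 5):
        # Same for/else shape as get_or_create_author/get_or_create_album above.
        for attempt in range(attempts):
            try:
                return create_once()  # success returns; the else below never runs
            except AlreadyExists:
                continue  # conflict: retry with a fresh value
        else:
            # reached only when the loop completes, i.e. every attempt failed
            raise RuntimeError("gave up after retries")
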
@@ -98,20 +148,18 @@ def load_track(
        kwargs["release"] if "release" in kwargs else search_info.get("release", None)
    )

-    if album and type(album) is str and album.startswith("['"):
+    if album and isinstance(album, str) and album.startswith("['"):
        album = album.replace("['", "").replace("']", "")

    if album:
-        if type(album) is str:
+        if isinstance(album, str):
            album_name = album
-        elif type(album) is list:
+        elif isinstance(album, list):
            album_name = album[0]
        else:
            album_name = None
        if album_name:
-            album, created = Album.objects.get_or_create(
-                name__iexact=album_name, defaults={"name": album_name}
-            )
+            album = get_or_create_album(album_name)

    processed_authors = []
    if authors:

@@ -121,7 +169,7 @@ def load_track(
        authors = processed_authors

    if sng := Song.objects.filter(
-        name=name if name else p_name,
+        name__iexact=name if name else p_name,
        authors__id__in=[x.id for x in authors],
        album=album,
    ):

@@ -186,15 +234,29 @@ def load_track(

    new_file_name = generated_name + ".mp3"

-    if image_path:
-        with open(path, "rb") as file, open(image_path, "rb") as image:
-            song.image = File(image, name=generated_name + ".png")
-            song.file = File(file, name=new_file_name)
-            song.save()
+    # Generate unique slug for the song
+    song.slug = generate_readable_slug(name if name else p_name, Song)
+
+    # Try to save the song, handling potential slug conflicts
+    for attempt in range(5):
+        try:
+            if image_path:
+                with open(path, "rb") as file, open(image_path, "rb") as image:
+                    song.image = File(image, name=generated_name + ".png")
+                    song.file = File(file, name=new_file_name)
+                    song.save()
+            else:
+                with open(path, "rb") as file:
+                    song.file = File(file, name=new_file_name)
+                    song.save()
+            break  # Successfully saved the song
+        except IntegrityError:
+            # Slug conflict, generate a new slug using generate_charset
+            song.slug = generate_readable_slug(
+                song.name + "_" + generate_charset(5), Song
+            )
    else:
-        with open(path, "rb") as file:
-            song.file = File(file, name=new_file_name)
-            song.save()
+        raise Exception("Failed to save song with unique slug after multiple attempts")

    if not album.image and song.image:
        album.image = song.image

@@ -203,7 +265,7 @@ def load_track(
    if authors:
        song.authors.set([x.id for x in authors])

-    # set music meta
+    # Set music metadata
    tag = MutagenFile(song.file.path)
    tag["title"] = TextFrame(encoding=3, text=[name])
    if album:

@@ -236,7 +298,4 @@ def load_track(
    if os.path.exists(image_path):
        os.remove(image_path)

-    song.slug = generate_readable_slug(song.name, Song)
-    song.save()
-
    return song

akarpov/music/services/external.py (new file, 144 lines)
@@ -0,0 +1,144 @@
+import os
+from functools import wraps
+from random import randint
+from typing import Any
+
+import requests
+import structlog
+from django.conf import settings
+
+logger = structlog.get_logger(__name__)
+
+
+class ExternalServiceClient:
+    def __init__(self):
+        self.base_url = getattr(settings, "MUSIC_EXTERNAL_SERVICE_URL", None)
+
+    def _make_request(
+        self, endpoint: str, params: dict = None, **kwargs
+    ) -> dict[str, Any] | None:
+        if not self.base_url:
+            return None
+
+        url = f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/')}"
+        try:
+            response = requests.post(url, params=params, **kwargs)
+            response.raise_for_status()
+            return response.json()
+        except requests.exceptions.RequestException as e:
+            logger.error(
+                "External service request failed",
+                error=str(e),
+                endpoint=endpoint,
+                params=params,
+            )
+            return None
+
+    def get_spotify_info(self, track_name: str) -> dict[str, Any] | None:
+        response = self._make_request("/spotify/search", params={"query": track_name})
+        if not response:
+            return None
+
+        if "album_image" in response:
+            # Download and save image
+            image_path = self._download_image(response["album_image"])
+            if image_path:
+                response["album_image_path"] = image_path
+
+        return response
+
+    def get_spotify_album_info(self, album_name: str) -> dict[str, Any] | None:
+        response = self._make_request("/spotify/album", params={"query": album_name})
+        if not response:
+            return None
+
+        if response.get("images"):
+            image_path = self._download_image(response["images"][0].get("url"))
+            if image_path:
+                response["image_path"] = image_path
+
+        return response
+
+    def get_spotify_artist_info(self, artist_name: str) -> dict[str, Any] | None:
+        response = self._make_request("/spotify/artist", params={"query": artist_name})
+        if not response:
+            return None
+
+        if response.get("images"):
+            image_path = self._download_image(response["images"][0].get("url"))
+            if image_path:
+                response["image_path"] = image_path
+
+        return response
+
+    def _download_image(self, url: str) -> str | None:
+        if not url:
+            return None
+
+        try:
+            response = requests.get(url)
+            if response.status_code == 200:
+                image_path = os.path.join(
+                    settings.MEDIA_ROOT, f"tmp_{randint(10000, 99999)}.png"
+                )
+                with open(image_path, "wb") as f:
+                    f.write(response.content)
+                return image_path
+        except Exception as e:
+            logger.error("Failed to download image", error=str(e), url=url)
+        return None
+
+    def translate_text(
+        self, text: str, source_lang: str = "auto", target_lang: str = "en"
+    ) -> str | None:
+        response = self._make_request(
+            "/translation/translate",
+            json={"text": text, "source_lang": source_lang, "target_lang": target_lang},
+        )
+        return response.get("translated_text") if response else None
+
+
+def external_service_fallback(fallback_func):
+    """Decorator to try external service first, then fall back to local implementation"""
+
+    @wraps(fallback_func)
+    def wrapper(*args, **kwargs):
+        if (
+            not hasattr(settings, "MUSIC_EXTERNAL_SERVICE_URL")
+            or not settings.MUSIC_EXTERNAL_SERVICE_URL
+        ):
+            return fallback_func(*args, **kwargs)
+
+        client = ExternalServiceClient()
+        try:
+            if fallback_func.__name__ == "get_spotify_info":
+                result = client.get_spotify_info(args[1])  # args[1] is track_name
+            elif fallback_func.__name__ == "get_spotify_album_info":
+                result = client.get_spotify_album_info(args[0])  # args[0] is album_name
+            elif fallback_func.__name__ == "get_spotify_artist_info":
+                result = client.get_spotify_artist_info(
+                    args[0]
+                )  # args[0] is artist_name
+            elif fallback_func.__name__ == "safe_translate":
+                result = client.translate_text(args[0])  # args[0] is text
+            else:
+                logger.warning(
+                    "Unknown function for external service fallback",
+                    function=fallback_func.__name__,
+                )
+                return fallback_func(*args, **kwargs)
+
+            if result:
+                return result
+
+        except Exception as e:
+            logger.error(
+                "External service failed, falling back to local implementation",
+                error=str(e),
+                function=fallback_func.__name__,
+                args=args,
+            )
+
+        return fallback_func(*args, **kwargs)
+
+    return wrapper

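Note: external_service_fallback dispatches on the wrapped function's __name__ and its positional arguments, so it only covers the four functions it checks for. A hedged usage sketch (the lowercasing body is a placeholder, not the project's real fallback logic):

    # How the decorator in the new file is applied; when MUSIC_EXTERNAL_SERVICE_URL
    # is unset in Django settings, the decorated local implementation runs unchanged.
    from akarpov.music.services.external import external_service_fallback


    @external_service_fallback
    def safe_translate(text: str) -> str:
        # placeholder local fallback used when the external service is unavailable
        return text.lower()
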
@@ -1,4 +1,5 @@
import os
+import time
from io import BytesIO
from pathlib import Path
from random import randint

@@ -6,11 +7,19 @@
import librosa
import mutagen
import numpy as np
+from django.core.files import File
+from django.db import IntegrityError, transaction
+from django.db.models import Count
+from django.utils.text import slugify
from mutagen.id3 import ID3
+from mutagen.mp3 import MP3
from PIL import Image, UnidentifiedImageError

-from akarpov.music.models import Song
+from akarpov.common.tasks import crop_model_image
+from akarpov.music.models import Album, Author, Song
from akarpov.music.services.db import load_track
+from akarpov.users.models import User
+from akarpov.utils.generators import generate_charset


def load_dir(path: str, user_id: int):

@@ -83,3 +92,493 @@ def set_song_volume(song: Song):
    mp3_file = song.file.path
    song.volume = analyze_music_loudness(mp3_file)
    song.save(update_fields=["volume"])
+
+
+BATCH_SIZE = 10
+BATCH_CHECK_DELAY = 10  # seconds
+
+
+class FileProcessor:
+    def __init__(self):
+        self.failed_files: list[str] = []
+        self.processed_files: set[str] = set()
+        self.current_batch: dict[str, dict] = {}
+
+    def load_dir(self, path: str, user_id: int) -> tuple[list[str], int]:
+        path = Path(path)
+        files = list(path.glob("**/*.mp3"))
+        total_files = len(files)
+
+        for i in range(0, len(files), BATCH_SIZE):
+            batch = files[i : i + BATCH_SIZE]  # noqa
+            self._process_batch(batch, user_id)
+
+            # Wait and verify batch
+            time.sleep(BATCH_CHECK_DELAY)
+            self._verify_batch()
+
+            print(
+                "Batch processed",
+                processed=len(self.processed_files),
+                failed=len(self.failed_files),
+                total=total_files,
+                remaining=total_files
+                - len(self.processed_files)
+                - len(self.failed_files),
+            )
+
+        return self.failed_files, len(self.processed_files)
+
+    def _process_batch(self, files: list[Path], user_id: int):
+        self.current_batch.clear()
+
+        for file_path in files:
+            file_str = str(file_path)
+            if file_str in self.processed_files or file_str in self.failed_files:
+                continue
+
+            try:
+                file_info = self._extract_file_info(file_str)
+                if self._check_exists(file_info):
+                    self.processed_files.add(file_str)
+                    continue
+
+                self.current_batch[file_str] = file_info
+                self._process_file(file_str, file_info, user_id)
+
+            except Exception as e:
+                print("File processing failed", file=file_str, error=str(e))
+                self.failed_files.append(file_str)
+
+    def _verify_batch(self):
+        for file_path, info in self.current_batch.items():
+            if not self._verify_file(file_path, info):
+                print("File verification failed", file=file_path)
+                self.failed_files.append(file_path)
+            else:
+                self.processed_files.add(file_path)
+
+    def _extract_file_info(self, path: str) -> dict:
+        tag = mutagen.File(path, easy=True)
+        return {
+            "author": tag.get("artist"),
+            "album": tag.get("album"),
+            "name": tag.get("title", [path.split("/")[-1]])[0],
+            "image": self._extract_image(path),
+        }
+
+    def _extract_image(self, path: str) -> str | None:
+        try:
+            tags = ID3(path)
+            pict = [x for x in tags.getall("APIC") if x]
+            if not pict:
+                return None
+
+            pict_data = pict[0].data
+            im = Image.open(BytesIO(pict_data))
+            image_path = f"/tmp/{randint(1, 1000000)}.png"
+            while os.path.exists(image_path):
+                image_path = f"/tmp/{randint(1, 1000000)}.png"
+            im.save(image_path)
+            return image_path
+        except (UnidentifiedImageError, Exception) as e:
+            print("Image extraction failed", error=str(e))
+            return None
+
+    def _check_exists(self, info: dict) -> bool:
+        query = Song.objects.filter(name=info["name"])
+        if info["author"]:
+            query = query.filter(authors__name__in=info["author"])
+        if info["album"]:
+            query = query.filter(album__name=info["album"])
+        return query.exists()
+
+    def _verify_file(self, file_path: str, info: dict) -> bool:
+        song = Song.objects.filter(name=info["name"], file__isnull=False).first()
+
+        if not song:
+            return False
+
+        # Verify file exists and is readable
+        if not os.path.exists(song.file.path):
+            return False
+
+        # Verify image if it was expected
+        if info["image"] and not song.image:
+            return False
+
+        # Verify metadata
+        if info["author"]:
+            if not song.authors.filter(name__in=info["author"]).exists():
+                return False
+        if info["album"]:
+            if not song.album or song.album.name != info["album"]:
+                return False
+
+        return True
+
+    def _process_file(self, path: str, info: dict, user_id: int):
+        try:
+            song = load_track(
+                path=path,
+                image_path=info["image"],
+                user_id=user_id,
+                authors=info["author"],
+                album=info["album"],
+                name=info["name"],
+            )
+            if info["image"] and os.path.exists(info["image"]):
+                os.remove(info["image"])
+
+            set_song_volume(song)
+
+        except Exception as e:
+            print("File processing failed", file=path, error=str(e))
+            self.failed_files.append(path)
+
+
+def clean_title_for_slug(title: str) -> str:
+    """Clean title for slug generation."""
+    # Remove common suffixes
+    suffixes = [
+        "(Original Mix)",
+        "(Radio Edit)",
+        "(Extended Mix)",
+        "(Official Video)",
+        "(Music Video)",
+        "(Lyric Video)",
+        "(Audio)",
+        "(Official Audio)",
+        "(Visualizer)",
+        "(Official Music Video)",
+        "(Official Lyric Video)",
+    ]
+    cleaned = title
+    for suffix in suffixes:
+        cleaned = cleaned.replace(suffix, "")
+    return cleaned.strip()
+
+
+def process_authors_string(authors_str: str) -> list[str]:
+    """Split author string into individual author names."""
+    if not authors_str:
+        return []
+
+    # First split by major separators
+    authors = []
+    for part in authors_str.split("/"):
+        for subpart in part.split("&"):
+            # Handle various featuring cases
+            for feat_marker in [
+                " feat.",
+                " ft.",
+                " featuring.",
+                " presents ",
+                " pres. ",
+            ]:
+                if feat_marker in subpart.lower():
+                    parts = subpart.lower().split(feat_marker, 1)
+                    authors.extend(part.strip() for part in parts)
+                    break
+            else:
+                # Handle collaboration markers
+                if " x " in subpart:
+                    authors.extend(p.strip() for p in subpart.split(" x "))
+                else:
+                    authors.append(subpart.strip())
+
+    # Remove duplicates while preserving order
+    seen = set()
+    return [x for x in authors if not (x.lower() in seen or seen.add(x.lower()))]
+
+
+def extract_mp3_metadata(file_path: str) -> dict | None:
+    """Extract metadata from MP3 file."""
+    try:
+        audio = MP3(file_path, ID3=ID3)
+        tags = audio.tags if audio.tags else {}
+
+        # Get filename without extension for fallback
+        base_filename = os.path.splitext(os.path.basename(file_path))[0]
+
+        metadata = {
+            "title": None,
+            "album": None,
+            "artists": None,
+            "genre": None,
+            "release_year": None,
+            "length": audio.info.length,
+            "image_data": None,
+            "image_mime": None,
+        }
+
+        if tags:
+            # Extract basic metadata with fallbacks
+            metadata["title"] = str(tags.get("TIT2", "")) or base_filename
+            metadata["album"] = str(tags.get("TALB", ""))
+            metadata["artists"] = str(tags.get("TPE1", "")) or str(tags.get("TPE2", ""))
+            metadata["genre"] = str(tags.get("TCON", ""))
+            metadata["release_year"] = str(tags.get("TDRC", "")) or str(
+                tags.get("TYER", "")
+            )
+
+            # Extract cover art
+            for tag in tags.getall("APIC"):
+                if tag.type == 3:  # Front cover
+                    metadata["image_data"] = tag.data
+                    metadata["image_mime"] = tag.mime
+                    break
+
+        # Clean up title if it came from filename
+        if metadata["title"] == base_filename:
+            parts = base_filename.split(" - ", 1)
+            if len(parts) > 1 and not metadata["artists"]:
+                metadata["artists"] = parts[0]
+                metadata["title"] = parts[1]
+
+        return metadata
+    except Exception as e:
+        print(f"Error extracting metadata from {file_path}: {str(e)}")
+        return None
+
+
+def generate_unique_slug(
+    base_name: str, model_class, existing_id=None, max_length=20
+) -> str:
+    """Generate a unique slug for a model instance."""
+    # Clean and slugify the base name
+    slug = slugify(clean_title_for_slug(base_name))
+
+    # Truncate if necessary
+    if len(slug) > max_length:
+        slug = slug[:max_length].rsplit("-", 1)[0]
+
+    original_slug = slug
+    counter = 1
+
+    # Check for uniqueness
+    while True:
+        if existing_id:
+            exists = (
+                model_class.objects.filter(slug=slug).exclude(id=existing_id).exists()
+            )
+        else:
+            exists = model_class.objects.filter(slug=slug).exists()
+
+        if not exists:
+            break
+
+        # Generate new slug
+        if counter == 1:
+            if len(original_slug) > (max_length - 7):  # Leave room for _XXXXX
+                base = original_slug[: (max_length - 7)]
+            else:
+                base = original_slug
+            slug = f"{base}_{generate_charset(5)}"
+        else:
+            if len(original_slug) > (max_length - len(str(counter)) - 1):
+                base = original_slug[: (max_length - len(str(counter)) - 1)]
+            else:
+                base = original_slug
+            slug = f"{base}_{counter}"
+
+        counter += 1
+
+    return slug
+
+
+def save_image_as_png(image_data: bytes, mime_type: str) -> str | None:
+    """Convert image data to PNG and save temporarily."""
+    try:
+        if not image_data:
+            return None
+
+        img = Image.open(BytesIO(image_data))
+        temp_path = f"/tmp/{generate_charset(10)}.png"
+        img.save(temp_path, "PNG")
+        return temp_path
+    except Exception as e:
+        print(f"Error processing image: {str(e)}")
+        return None
+
+
+def get_or_create_album(album_name: str, authors: list[Author]) -> Album | None:
+    """Get or create album with proper locking and uniqueness check."""
+    if not album_name:
+        return None
+
+    with transaction.atomic():
+        # Try to find existing album
+        album = (
+            Album.objects.select_for_update().filter(name__iexact=album_name).first()
+        )
+
+        if album:
+            # Add any missing authors
+            current_author_ids = set(album.authors.values_list("id", flat=True))
+            new_author_ids = {author.id for author in authors}
+            missing_authors = new_author_ids - current_author_ids
+            if missing_authors:
+                album.authors.add(*missing_authors)
+            return album
+
+        try:
+            # Create new album
+            album = Album.objects.create(
+                name=album_name, slug=generate_unique_slug(album_name, Album)
+            )
+            album.authors.set(authors)
+            return album
+        except IntegrityError:
+            # Handle race condition
+            album = (
+                Album.objects.select_for_update()
+                .filter(name__iexact=album_name)
+                .first()
+            )
+            if album:
+                album.authors.add(*authors)
+                return album
+            raise
+
+
+def check_song_exists(title: str, album: Album | None, authors: list[Author]) -> bool:
+    """Check if a song already exists with the given title, album and authors."""
+    query = Song.objects.filter(name__iexact=title)
+
+    if album:
+        query = query.filter(album=album)
+
+    if authors:
+        # Ensure exact author match
+        query = query.annotate(author_count=Count("authors")).filter(
+            author_count=len(authors), authors__in=authors
+        )
+
+    return query.exists()
+
+
+def load_mp3_directory(
+    directory_path: str, user_id: int | None = None
+) -> tuple[list[str], int]:
+    """
+    Load all MP3 files from a directory and its subdirectories.
+    Returns tuple of (failed_files, processed_count)
+    """
+    path = Path(directory_path)
+    failed_files = []
+    processed_count = 0
+
+    # Keep track of created/updated objects for image cropping
+    created_authors = set()
+    created_albums = set()
+    created_songs = set()
+
+    for mp3_path in path.glob("**/*.mp3"):
+        try:
+            metadata = extract_mp3_metadata(str(mp3_path))
+            if not metadata:
+                failed_files.append(str(mp3_path))
+                continue
+
+            with transaction.atomic():
+                # Process authors
+                author_names = process_authors_string(metadata["artists"])
+                authors = []
+                for author_name in author_names:
+                    author = Author.objects.filter(name__iexact=author_name).first()
+                    if not author:
+                        author = Author.objects.create(
+                            name=author_name,
+                            slug=generate_unique_slug(author_name, Author),
+                        )
+                        created_authors.add(author.id)
+                    authors.append(author)
+
+                # Process album
+                album = None
+                if metadata["album"]:
+                    try:
+                        album = get_or_create_album(metadata["album"], authors)
+                        if album.id not in created_albums and not album.image_cropped:
+                            created_albums.add(album.id)
+                    except IntegrityError as e:
+                        print(f"Error creating album for {mp3_path}: {str(e)}")
+                        failed_files.append(str(mp3_path))
+                        continue
+
+                # Check for existing song
+                if check_song_exists(metadata["title"], album, authors):
+                    print(
+                        f"Skipping existing song: {metadata['artists']} - {metadata['title']}"
+                    )
+                    continue
+
+                # Process cover image
+                temp_image_path = None
+                if metadata["image_data"]:
+                    temp_image_path = save_image_as_png(
+                        metadata["image_data"], metadata["image_mime"]
+                    )
+                    if album and not album.image and temp_image_path:
+                        with open(temp_image_path, "rb") as img_file:
+                            album.image.save(
+                                f"{album.slug}.png", File(img_file), save=True
+                            )
+
+                # Create song with proper slug from file name
+                file_name = os.path.splitext(os.path.basename(str(mp3_path)))[0]
+                song = Song(
+                    name=metadata["title"],
+                    length=metadata["length"],
+                    album=album,
+                    slug=generate_unique_slug(file_name, Song),
+                    creator=User.objects.get(id=user_id) if user_id else None,
+                    meta={
+                        "genre": metadata["genre"],
+                        "release_year": metadata["release_year"],
+                    },
+                )
+
+                # Save files
+                with open(mp3_path, "rb") as mp3_file:
+                    song.file.save(f"{song.slug}.mp3", File(mp3_file), save=True)
+
+                if temp_image_path:
+                    with open(temp_image_path, "rb") as img_file:
+                        song.image.save(f"{song.slug}.png", File(img_file), save=True)
+                    os.remove(temp_image_path)
+
+                # Set authors
+                song.authors.set(authors)
+                created_songs.add(song.id)
+                processed_count += 1
+
+                print(f"Successfully processed: {song.artists_names} - {song.name}")
+
+        except Exception as e:
+            print(f"Error processing {mp3_path}: {str(e)}")
+            failed_files.append(str(mp3_path))
+
+    # Trigger image cropping for all created/updated objects
+    print("Processing image cropping...")
+
+    for author_id in created_authors:
+        author = Author.objects.get(id=author_id)
+        if author.image and not author.image_cropped:
+            print(f"Cropping image for author: {author.name}")
+            crop_model_image(author_id, "music", "author")
+
+    for album_id in created_albums:
+        album = Album.objects.get(id=album_id)
+        if album.image and not album.image_cropped:
+            print(f"Cropping image for album: {album.name}")
+            crop_model_image(album_id, "music", "album")
+
+    for song_id in created_songs:
+        song = Song.objects.get(id=song_id)
+        if song.image and not song.image_cropped:
+            print(f"Cropping image for song: {song.name}")
+            crop_model_image(song_id, "music", "song")
+
+    return failed_files, processed_count

@@ -1,9 +1,16 @@
import os
+import re
from random import randint
+from typing import Any

import requests
import spotipy

+from akarpov.music.services.external import (
+    ExternalServiceClient,
+    external_service_fallback,
+)
+
try:
    from deep_translator import GoogleTranslator
except requests.exceptions.JSONDecodeError:

@@ -22,35 +29,74 @@
from akarpov.utils.text import is_similar_artist, normalize_text


+def clean_name(name: str) -> str:
+    # Replace special characters with underscores
+    cleaned = name.strip().replace(" ", "_")
+    cleaned = re.sub(r"[^\w\-]", "_", cleaned)
+    # Remove consecutive underscores
+    cleaned = re.sub(r"_+", "_", cleaned)
+    # Remove trailing underscores
+    cleaned = cleaned.strip("_")
+    return cleaned
+
+
+def split_authors(authors_str: str) -> list[str]:
+    # Split on common separators
+    if not authors_str:
+        return []
+
+    # First split by obvious delimiters
+    authors = []
+    for part in re.split(r"[,/&]", authors_str):
+        # Clean up each part
+        cleaned = part.strip()
+        if " feat." in cleaned.lower():
+            # Split on featuring
+            main_artist, feat_artist = cleaned.lower().split(" feat.", 1)
+            authors.extend([main_artist.strip(), feat_artist.strip()])
+        elif " ft." in cleaned.lower():
+            main_artist, feat_artist = cleaned.lower().split(" ft.", 1)
+            authors.extend([main_artist.strip(), feat_artist.strip()])
+        elif " x " in cleaned:
+            # Split artist collaborations
+            authors.extend(p.strip() for p in cleaned.split(" x "))
+        elif cleaned:
+            authors.append(cleaned)
+
+    # Remove duplicates while preserving order
+    seen = set()
+    return [x for x in authors if not (x in seen or seen.add(x))]
+
+
def generate_readable_slug(name: str, model: Model) -> str:
-    # Translate and slugify the name
+    # Clean and translate name
    slug = safe_translate(name)

-    # Truncate slug if it's too long
-    if len(slug) > 20:
-        slug = slug[:20]
-        last_dash = slug.rfind("-")
-        if last_dash != -1:
-            slug = slug[:last_dash]
+    # Remove any remaining spaces and ensure proper formatting
+    slug = clean_name(slug)

-    original_slug = slug
+    # Truncate if necessary
+    if len(slug) > 20:
+        # Try to cut at word boundary
+        truncated = slug[:20].rsplit("_", 1)[0]
+        slug = truncated if truncated else slug[:20]
+
+    original_slug = slug.lower()

    # Ensure uniqueness
    counter = 1
-    while model.objects.filter(slug=slug).exists():
+    while model.objects.filter(slug__iexact=slug).exists():
        if len(original_slug) > 14:
-            truncated_slug = original_slug[:14]
-            last_dash = truncated_slug.rfind("-")
-            if last_dash != -1:
-                truncated_slug = truncated_slug[:last_dash]
+            truncated = original_slug[:14].rsplit("_", 1)[0]
+            base_slug = truncated if truncated else original_slug[:14]
        else:
-            truncated_slug = original_slug
+            base_slug = original_slug

        suffix = f"_{generate_charset(5)}" if counter == 1 else f"_{counter}"
-        slug = f"{truncated_slug}{suffix}"
+        slug = f"{base_slug}{suffix}"
        counter += 1

-    return slug
+    return slug.lower()


def create_spotify_session() -> spotipy.Spotify:

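Note: a quick illustration of the word-boundary truncation introduced above (plain Python, independent of the project code):

    slug = "hello_world_another_thing"
    print(slug[:20])                    # hello_world_another_
    print(slug[:20].rsplit("_", 1)[0])  # hello_world_another
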
@@ -76,6 +122,19 @@ def spotify_search(name: str, session: spotipy.Spotify, search_type="track"):
    return res


+def clean_spotify_response(data: dict[str, Any]) -> dict[str, Any]:
+    if isinstance(data, dict):
+        return {
+            k: clean_spotify_response(v)
+            for k, v in data.items()
+            if k != "available_markets"
+        }
+    elif isinstance(data, list):
+        return [clean_spotify_response(item) for item in data]
+    return data
+
+
+@external_service_fallback
def get_spotify_info(name: str, session: spotipy.Spotify) -> dict:
    info = {
        "album_name": "",

@@ -84,7 +143,11 @@ def get_spotify_info(name: str, session: spotipy.Spotify) -> dict:
        "artists": [],
        "artist": "",
        "title": "",
-        "genre": "",
+        "genre": [],
+        "meta": {},
+        "album_meta": {},
+        "external_urls": {},
+        "full_data": {},
    }

    try:

@@ -93,32 +156,47 @@ def get_spotify_info(name: str, session: spotipy.Spotify) -> dict:
            return info

        track = results[0]
+        artist_data = session.artist(track["artists"][0]["external_urls"]["spotify"])
+        album_data = session.album(track["album"]["id"])
+
        info.update(
            {
                "album_name": track["album"]["name"],
+                "album_image": track["album"]["images"][0]["url"]
+                if track["album"]["images"]
+                else "",
                "release": track["album"]["release_date"].split("-")[0],
-                "album_image": track["album"]["images"][0]["url"],
                "artists": [artist["name"] for artist in track["artists"]],
                "artist": track["artists"][0]["name"],
                "title": track["name"],
-                # Extract additional data as needed
+                "genre": artist_data.get("genres", []),
+                "meta": {
+                    "duration_ms": track.get("duration_ms"),
+                    "explicit": track.get("explicit"),
+                    "popularity": track.get("popularity"),
+                    "preview_url": track.get("preview_url"),
+                    "track_number": track.get("track_number"),
+                    "type": track.get("type"),
+                },
+                "album_meta": clean_spotify_response(album_data),
+                "external_urls": track.get("external_urls", {}),
+                "full_data": clean_spotify_response(track),
            }
        )

-        artist_data = session.artist(track["artists"][0]["external_urls"]["spotify"])
-        info["genre"] = artist_data.get("genres", [])
-
-        album_image_url = track["album"]["images"][0]["url"]
-        image_response = requests.get(album_image_url)
-        if image_response.status_code == 200:
-            image_path = os.path.join(
-                settings.MEDIA_ROOT, f"tmp_{randint(10000, 99999)}.png"
-            )
-            with open(image_path, "wb") as f:
-                f.write(image_response.content)
-            info["album_image_path"] = image_path
+        if track["album"]["images"]:
+            album_image_url = track["album"]["images"][0]["url"]
+            image_response = requests.get(album_image_url)
+            if image_response.status_code == 200:
+                image_path = os.path.join(
+                    settings.MEDIA_ROOT, f"tmp_{randint(10000, 99999)}.png"
+                )
+                with open(image_path, "wb") as f:
+                    f.write(image_response.content)
+                info["album_image_path"] = image_path

-    except Exception:
+    except Exception as e:
+        print("Failed to get Spotify info", error=str(e))
        return info

    return info

@@ -172,20 +250,101 @@ def search_yandex(name: str):
    return info


-def get_spotify_album_info(album_name: str, session: spotipy.Spotify):
-    search_result = session.search(q="album:" + album_name, type="album")
-    albums = search_result.get("albums", {}).get("items", [])
-    if albums:
-        return albums[0]
-    return None
+@external_service_fallback
+def get_spotify_album_info(album_name: str, session: spotipy.Spotify) -> dict:
+    info = {
+        "name": "",
+        "link": "",
+        "meta": {},
+        "image_url": "",
+        "release_date": "",
+        "total_tracks": 0,
+        "images": [],
+        "external_urls": {},
+        "artists": [],
+        "genres": [],
+        "tracks": [],
+        "full_data": {},
+    }
+
+    try:
+        search_result = session.search(q="album:" + album_name, type="album")
+        albums = search_result.get("albums", {}).get("items", [])
+        if not albums:
+            return info
+
+        album = albums[0]
+        album_id = album["id"]
+        full_album = session.album(album_id)
+        tracks = session.album_tracks(album_id)
+
+        return {
+            "name": album.get("name", ""),
+            "link": album.get("external_urls", {}).get("spotify", ""),
+            "meta": {
+                "album_type": album.get("album_type", ""),
+                "release_date_precision": album.get("release_date_precision", ""),
+                "total_tracks": album.get("total_tracks", 0),
+                "type": album.get("type", ""),
+            },
+            "image_url": next(
+                (img["url"] for img in album.get("images", []) if img.get("url")), ""
+            ),
+            "release_date": album.get("release_date", ""),
+            "total_tracks": album.get("total_tracks", 0),
+            "images": album.get("images", []),
+            "external_urls": album.get("external_urls", {}),
+            "artists": clean_spotify_response(album.get("artists", [])),
+            "genres": clean_spotify_response(full_album.get("genres", [])),
+            "tracks": clean_spotify_response(tracks.get("items", [])),
+            "full_data": clean_spotify_response(full_album),
+        }
+    except Exception as e:
+        print("Failed to get album info", error=str(e))
+        return info


-def get_spotify_artist_info(artist_name: str, session: spotipy.Spotify):
-    search_result = session.search(q="artist:" + artist_name, type="artist")
-    artists = search_result.get("artists", {}).get("items", [])
-    if artists:
-        return artists[0]
-    return None
+@external_service_fallback
+def get_spotify_artist_info(artist_name: str, session: spotipy.Spotify) -> dict:
+    info = {
+        "name": "",
+        "link": "",
+        "meta": {},
+        "image_url": "",
+        "genres": [],
+        "popularity": 0,
+        "images": [],
+        "external_urls": {},
+        "full_data": {},
+    }
+
+    try:
+        search_result = session.search(q="artist:" + artist_name, type="artist")
+        artists = search_result.get("artists", {}).get("items", [])
+        if not artists:
+            return info
+
+        artist = artists[0]
+        return {
+            "name": artist.get("name", ""),
+            "link": artist.get("external_urls", {}).get("spotify", ""),
+            "meta": {
+                "followers": artist.get("followers", {}).get("total", 0),
+                "popularity": artist.get("popularity", 0),
+                "type": artist.get("type", ""),
+            },
+            "image_url": next(
+                (img["url"] for img in artist.get("images", []) if img.get("url")), ""
+            ),
+            "genres": artist.get("genres", []),
+            "popularity": artist.get("popularity", 0),
+            "images": artist.get("images", []),
+            "external_urls": artist.get("external_urls", {}),
+            "full_data": clean_spotify_response(artist),
+        }
+    except Exception as e:
+        print("Failed to get artist info", error=str(e))
+        return info


def get_yandex_album_info(album_name: str, client: Client):

@@ -379,29 +538,46 @@ def save_author_image(author, image_path):
        print(f"Error saving author image: {str(e)}")


-def safe_translate(text):
+@external_service_fallback
+def safe_translate(text: str) -> str:
    try:
+        text = clean_name(text)  # Clean before translation
        translated = GoogleTranslator(source="auto", target="en").translate(text)
-        return slugify(translated)
+        # Clean after translation and ensure proper slugification
+        return slugify(clean_name(translated)).replace(" ", "_").lower()
    except Exception as e:
-        print(f"Error translating text: {str(e)}")
-        return slugify(text)
+        print(f"Translation failed: {str(e)}")
+        # Fallback to direct slugification
+        return slugify(clean_name(text)).replace(" ", "_").lower()


def search_all_platforms(track_name: str) -> dict:
    print(track_name)
-    # session = spotipy.Spotify(
-    #     auth_manager=spotipy.SpotifyClientCredentials(
-    #         client_id=settings.MUSIC_SPOTIFY_ID,
-    #         client_secret=settings.MUSIC_SPOTIFY_SECRET,
-    #     )
-    # )
-    # spotify_info = get_spotify_info(track_name, session)
-    spotify_info = {}  # TODO: add proxy for info retrieve
+
+    if settings.MUSIC_EXTERNAL_SERVICE_URL:
+        # Use external service if configured
+        client = ExternalServiceClient()
+        spotify_info = client.get_spotify_info(track_name) or {}
+    else:
+        # Local implementation fallback
+        try:
+            session = spotipy.Spotify(
+                auth_manager=SpotifyClientCredentials(
+                    client_id=settings.MUSIC_SPOTIFY_ID,
+                    client_secret=settings.MUSIC_SPOTIFY_SECRET,
+                )
+            )
+            spotify_info = get_spotify_info(track_name, session)
+        except Exception as e:
+            print("Local Spotify implementation failed", error=str(e))
+            spotify_info = {}

    yandex_info = search_yandex(track_name)

    if "album_image_path" in spotify_info and "album_image_path" in yandex_info:
        os.remove(yandex_info["album_image_path"])

+    # Combine artist information
    combined_artists = set()
    for artist in spotify_info.get("artists", []) + yandex_info.get("artists", []):
        normalized_artist = normalize_text(artist)

@ -410,12 +586,13 @@ def search_all_platforms(track_name: str) -> dict:
|
||||||
for existing_artist in combined_artists
|
for existing_artist in combined_artists
|
||||||
):
|
):
|
||||||
combined_artists.add(normalized_artist)
|
combined_artists.add(normalized_artist)
|
||||||
genre = spotify_info.get("genre") or yandex_info.get("genre")
|
|
||||||
if type(genre) is list:
|
|
||||||
genre = sorted(genre, key=lambda x: len(x))
|
|
||||||
genre = genre[0]
|
|
||||||
|
|
||||||
track_info = {
|
# Process genre information
|
||||||
|
genre = spotify_info.get("genre") or yandex_info.get("genre")
|
||||||
|
if isinstance(genre, list) and genre:
|
||||||
|
genre = sorted(genre, key=len)[0]
|
||||||
|
|
||||||
|
return {
|
||||||
"album_name": spotify_info.get("album_name")
|
"album_name": spotify_info.get("album_name")
|
||||||
or yandex_info.get("album_name", ""),
|
or yandex_info.get("album_name", ""),
|
||||||
"release": spotify_info.get("release") or yandex_info.get("release", ""),
|
"release": spotify_info.get("release") or yandex_info.get("release", ""),
|
||||||
|
@ -425,5 +602,3 @@ def search_all_platforms(track_name: str) -> dict:
|
||||||
"album_image": spotify_info.get("album_image_path")
|
"album_image": spotify_info.get("album_image_path")
|
||||||
or yandex_info.get("album_image_path", None),
|
or yandex_info.get("album_image_path", None),
|
||||||
}
|
}
|
||||||
|
|
||||||
return track_info
|
|
||||||
|
|
|
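
search_all_platforms now prefers an external lookup service when MUSIC_EXTERNAL_SERVICE_URL is set and only falls back to a direct spotipy session otherwise. The ExternalServiceClient itself is not shown in this excerpt; a minimal sketch of what such a client could look like is below. The endpoint path, response shape, and timeout are assumptions; only the settings name is confirmed by the diff.

# Illustrative sketch, not part of the commit: a minimal HTTP client along the
# lines of the ExternalServiceClient used above. The /spotify/track endpoint is
# hypothetical; only settings.MUSIC_EXTERNAL_SERVICE_URL comes from the diff.
import requests
from django.conf import settings


class ExternalServiceClient:
    def __init__(self, timeout: int = 10):
        self.base_url = settings.MUSIC_EXTERNAL_SERVICE_URL.rstrip("/")
        self.timeout = timeout

    def get_spotify_info(self, track_name: str) -> dict | None:
        try:
            resp = requests.get(
                f"{self.base_url}/spotify/track",  # hypothetical endpoint
                params={"q": track_name},
                timeout=self.timeout,
            )
            resp.raise_for_status()
            return resp.json()
        except requests.RequestException:
            # Returning None lets the caller fall back to an empty dict,
            # matching the "or {}" guard in search_all_platforms.
            return None
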
@ -8,66 +8,85 @@


 def search_song(query):
+    if not query:
+        return Song.objects.none()
+
     search = SongDocument.search()

-    should_queries = [
-        ES_Q("match_phrase", name={"query": query, "boost": 5}),
-        ES_Q(
-            "nested",
-            path="authors",
-            query=ES_Q("match_phrase", name={"query": query, "boost": 4}),
-        ),
-        ES_Q(
-            "nested",
-            path="album",
-            query=ES_Q("match_phrase", name={"query": query, "boost": 4}),
-        ),
-        ES_Q("match", name={"query": query, "fuzziness": "AUTO", "boost": 3}),
-        ES_Q(
-            "nested",
-            path="authors",
-            query=ES_Q("match", name={"query": query, "fuzziness": "AUTO", "boost": 2}),
-        ),
-        ES_Q(
-            "nested",
-            path="album",
-            query=ES_Q("match", name={"query": query, "fuzziness": "AUTO", "boost": 2}),
-        ),
-        ES_Q("wildcard", name={"value": f"*{query.lower()}*", "boost": 1}),
-        ES_Q(
-            "nested",
-            path="authors",
-            query=ES_Q("wildcard", name={"value": f"*{query.lower()}*", "boost": 0.8}),
-        ),
-        ES_Q(
-            "nested",
-            path="album",
-            query=ES_Q("wildcard", name={"value": f"*{query.lower()}*", "boost": 0.8}),
-        ),
-        ES_Q(
-            "match",
-            name_transliterated={"query": query, "fuzziness": "AUTO", "boost": 1},
-        ),
-        ES_Q(
-            "nested",
-            path="authors",
-            query=ES_Q(
-                "match",
-                name_transliterated={"query": query, "fuzziness": "AUTO", "boost": 0.8},
-            ),
-        ),
-        ES_Q(
-            "nested",
-            path="album",
-            query=ES_Q(
-                "match",
-                name_transliterated={"query": query, "fuzziness": "AUTO", "boost": 0.8},
-            ),
-        ),
-    ]
-
-    search_query = ES_Q("bool", should=should_queries, minimum_should_match=1)
+    # Priorities:
+    # 1. Exact phrase matches in name, author name, album name
+    # 2. Part of author/album name
+    # 3. Exact name (exact matches)
+    # 4. Fuzzy matches
+    # 5. Wildcards
+
+    # phrase matches (highest priority)
+    phrase_queries = [
+        ES_Q("match_phrase", name={"query": query, "boost": 10}),
+        ES_Q(
+            "nested",
+            path="authors",
+            query=ES_Q("match_phrase", authors__name={"query": query, "boost": 9}),
+        ),
+        ES_Q(
+            "nested",
+            path="album",
+            query=ES_Q("match_phrase", album__name={"query": query, "boost": 9}),
+        ),
+    ]
+
+    # exact keyword matches (non-case sensitive due to normalizers)
+    exact_queries = [
+        ES_Q("term", **{"name.exact": {"value": query.lower(), "boost": 8}})
+    ]
+
+    # fuzzy matches
+    fuzzy_queries = [
+        ES_Q("match", name={"query": query, "fuzziness": "AUTO", "boost": 5}),
+        ES_Q(
+            "nested",
+            path="authors",
+            query=ES_Q(
+                "match", authors__name={"query": query, "fuzziness": "AUTO", "boost": 4}
+            ),
+        ),
+        ES_Q(
+            "nested",
+            path="album",
+            query=ES_Q(
+                "match", album__name={"query": query, "fuzziness": "AUTO", "boost": 4}
+            ),
+        ),
+    ]
+
+    # wildcard matches
+    wildcard_queries = [
+        ES_Q("wildcard", name={"value": f"*{query.lower()}*", "boost": 2}),
+        ES_Q(
+            "nested",
+            path="authors",
+            query=ES_Q(
+                "wildcard", authors__name={"value": f"*{query.lower()}*", "boost": 2}
+            ),
+        ),
+        ES_Q(
+            "nested",
+            path="album",
+            query=ES_Q(
+                "wildcard", album__name={"value": f"*{query.lower()}*", "boost": 2}
+            ),
+        ),
+    ]
+
+    # Combine queries
+    # We'll use a should query to incorporate all of these, relying on boosting
+    search_query = ES_Q(
+        "bool",
+        should=phrase_queries + exact_queries + fuzzy_queries + wildcard_queries,
+        minimum_should_match=1,
+    )
+
+    # Execute search with size limit
     search = search.query(search_query).extra(size=20)
     response = search.execute()
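
The rewritten search_song builds four boost tiers (exact phrase 9-10, exact keyword 8, fuzzy 4-5, wildcard 2) and combines them in one bool/should query with minimum_should_match=1, so the strongest matching tier dominates the ranking. The executed response can then be mapped back to Django objects while keeping Elasticsearch's order; a sketch that mirrors the Case/When ordering used for authors and albums further down (the id handling is an assumption based on that pattern):

# Illustrative sketch, not part of the commit: preserve the ES ranking when
# turning hits into a Song queryset, as search_author/search_album do below.
from django.db.models import Case, When

hit_ids = [hit.meta.id for hit in response]  # response = search.execute() above
songs = Song.objects.filter(id__in=hit_ids).order_by(
    Case(*[When(pk=pk, then=pos) for pos, pk in enumerate(hit_ids)])
)
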
@ -103,8 +122,9 @@ def bulk_update_index(model_class):


 def search_author(query):
+    if not query:
+        return Author.objects.none()
     search = AuthorDocument.search()

     should_queries = [
         ES_Q("match_phrase", name={"query": query, "boost": 5}),
         ES_Q("match", name={"query": query, "fuzziness": "AUTO", "boost": 3}),
@ -114,7 +134,6 @@ def search_author(query):
             name_transliterated={"query": query, "fuzziness": "AUTO", "boost": 1},
         ),
     ]

     search_query = ES_Q("bool", should=should_queries, minimum_should_match=1)
     search = search.query(search_query).extra(size=10)
     response = search.execute()
@ -125,11 +144,12 @@ def search_author(query):
             Case(*[When(pk=pk, then=pos) for pos, pk in enumerate(hit_ids)])
         )
         return authors

     return Author.objects.none()


 def search_album(query):
+    if not query:
+        return Album.objects.none()
     search = AlbumDocument.search()

     should_queries = [
@ -141,7 +161,6 @@ def search_album(query):
             name_transliterated={"query": query, "fuzziness": "AUTO", "boost": 1},
         ),
     ]

     search_query = ES_Q("bool", should=should_queries, minimum_should_match=1)
     search = search.query(search_query).extra(size=10)
     response = search.execute()
@ -152,5 +171,4 @@ def search_album(query):
             Case(*[When(pk=pk, then=pos) for pos, pk in enumerate(hit_ids)])
         )
         return albums

     return Album.objects.none()
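
search_author and search_album gain the same empty-query guard as search_song; the rest of both helpers is unchanged context, including the Case/When trick that keeps Elasticsearch's ranking in the returned querysets. For reference, the bool/should combination used throughout compiles to a plain Elasticsearch bool query; a small sketch with an arbitrary query string:

# Illustrative sketch, not part of the commit: ES_Q is elasticsearch_dsl.Q,
# as in the search module shown above.
from elasticsearch_dsl import Q as ES_Q

q = ES_Q(
    "bool",
    should=[ES_Q("match_phrase", name={"query": "dark side", "boost": 5})],
    minimum_should_match=1,
)
print(q.to_dict())
# {'bool': {'should': [{'match_phrase': {'name': {'query': 'dark side', 'boost': 5}}}],
#  'minimum_should_match': 1}}
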
@ -1,4 +1,4 @@
-FROM traefik:2.10.7
+FROM traefik:2.11.0
 RUN mkdir -p /etc/traefik/acme \
     && touch /etc/traefik/acme/acme.json \
     && chmod 600 /etc/traefik/acme/acme.json
@ -715,6 +715,9 @@
 LAST_FM_API_KEY = env("LAST_FM_API_KET", default="")
 LAST_FM_SECRET = env("LAST_FM_SECRET", default="")

+# EXTERNAL
+MUSIC_EXTERNAL_SERVICE_URL = env("MUSIC_EXTERNAL_SERVICE_URL", default="")
+
 # ROBOTS
 # ------------------------------------------------------------------------------
 ROBOTS_USE_SITEMAP = True
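
The new MUSIC_EXTERNAL_SERVICE_URL setting is read through django-environ with an empty default, so the external branch in search_all_platforms stays disabled unless the variable is supplied. A tiny sketch of opting in for one process; the URL is a placeholder, not a real endpoint.

# Illustrative sketch, not part of the commit: enable the external music service
# via the environment; the URL below is a placeholder.
import os

os.environ.setdefault("MUSIC_EXTERNAL_SERVICE_URL", "https://music-api.example.org")
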
@ -69,6 +69,11 @@ services:
   redis:
     image: redis:6
     container_name: akarpov_local_redis
+    command: redis-server --maxmemory 4gb --maxmemory-policy allkeys-lru
+    deploy:
+      resources:
+        limits:
+          memory: 4G

   celeryworker:
     <<: *django
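
The local Redis container is now started with a 4 GB memory cap and allkeys-lru eviction, plus a matching compose-level memory limit, so old cache entries are evicted instead of the container hitting its limit. A quick way to confirm a running instance picked the settings up; host and port assume the default local setup and are not taken from this diff.

# Illustrative sketch, not part of the commit: check the eviction policy and
# memory cap on the local Redis (host/port are assumptions).
import redis

r = redis.Redis(host="localhost", port=6379)
print(r.config_get("maxmemory-policy"))  # expected: {'maxmemory-policy': 'allkeys-lru'}
print(r.config_get("maxmemory"))         # 4gb, reported in bytes
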