fixed slug processing

This commit is contained in:
Alexander Karpov 2024-12-02 23:57:43 +03:00
parent 2a7f1eae88
commit 85e8e3fe8b
2 changed files with 202 additions and 15 deletions

View File

@ -1,4 +1,5 @@
import os
import time
from io import BytesIO
from pathlib import Path
from random import randint
@ -83,3 +84,146 @@ def set_song_volume(song: Song):
mp3_file = song.file.path
song.volume = analyze_music_loudness(mp3_file)
song.save(update_fields=["volume"])
# Number of files handed to FileProcessor._process_batch per load_dir iteration.
BATCH_SIZE = 10
# Pause between processing a batch and verifying it (passed to time.sleep).
BATCH_CHECK_DELAY = 10 # seconds
class FileProcessor:
def __init__(self):
self.failed_files: list[str] = []
self.processed_files: set[str] = set()
self.current_batch: dict[str, dict] = {}
def load_dir(self, path: str, user_id: int) -> tuple[list[str], int]:
path = Path(path)
files = list(path.glob("**/*.mp3"))
total_files = len(files)
for i in range(0, len(files), BATCH_SIZE):
batch = files[i : i + BATCH_SIZE] # noqa
self._process_batch(batch, user_id)
# Wait and verify batch
time.sleep(BATCH_CHECK_DELAY)
self._verify_batch()
print(
"Batch processed",
processed=len(self.processed_files),
failed=len(self.failed_files),
total=total_files,
remaining=total_files
- len(self.processed_files)
- len(self.failed_files),
)
return self.failed_files, len(self.processed_files)
def _process_batch(self, files: list[Path], user_id: int):
self.current_batch.clear()
for file_path in files:
file_str = str(file_path)
if file_str in self.processed_files or file_str in self.failed_files:
continue
try:
file_info = self._extract_file_info(file_str)
if self._check_exists(file_info):
self.processed_files.add(file_str)
continue
self.current_batch[file_str] = file_info
self._process_file(file_str, file_info, user_id)
except Exception as e:
print("File processing failed", file=file_str, error=str(e))
self.failed_files.append(file_str)
def _verify_batch(self):
for file_path, info in self.current_batch.items():
if not self._verify_file(file_path, info):
print("File verification failed", file=file_path)
self.failed_files.append(file_path)
else:
self.processed_files.add(file_path)
def _extract_file_info(self, path: str) -> dict:
tag = mutagen.File(path, easy=True)
return {
"author": tag.get("artist"),
"album": tag.get("album"),
"name": tag.get("title", [path.split("/")[-1]])[0],
"image": self._extract_image(path),
}
def _extract_image(self, path: str) -> str | None:
try:
tags = ID3(path)
pict = [x for x in tags.getall("APIC") if x]
if not pict:
return None
pict_data = pict[0].data
im = Image.open(BytesIO(pict_data))
image_path = f"/tmp/{randint(1, 1000000)}.png"
while os.path.exists(image_path):
image_path = f"/tmp/{randint(1, 1000000)}.png"
im.save(image_path)
return image_path
except (UnidentifiedImageError, Exception) as e:
print("Image extraction failed", error=str(e))
return None
def _check_exists(self, info: dict) -> bool:
query = Song.objects.filter(name=info["name"])
if info["author"]:
query = query.filter(authors__name__in=info["author"])
if info["album"]:
query = query.filter(album__name=info["album"])
return query.exists()
def _verify_file(self, file_path: str, info: dict) -> bool:
song = Song.objects.filter(name=info["name"], file__isnull=False).first()
if not song:
return False
# Verify file exists and is readable
if not os.path.exists(song.file.path):
return False
# Verify image if it was expected
if info["image"] and not song.image:
return False
# Verify metadata
if info["author"]:
if not song.authors.filter(name__in=info["author"]).exists():
return False
if info["album"]:
if not song.album or song.album.name != info["album"]:
return False
return True
def _process_file(self, path: str, info: dict, user_id: int):
try:
song = load_track(
path=path,
image_path=info["image"],
user_id=user_id,
authors=info["author"],
album=info["album"],
name=info["name"],
)
if info["image"] and os.path.exists(info["image"]):
os.remove(info["image"])
set_song_volume(song)
except Exception as e:
print("File processing failed", file=path, error=str(e))
self.failed_files.append(path)

View File

@ -1,4 +1,5 @@
import os
import re
from random import randint
from typing import Any
@ -28,16 +29,57 @@
from akarpov.utils.text import is_similar_artist, normalize_text
def clean_name(name: str) -> str:
    """Turn ``name`` into an underscore-separated token.

    Surrounding whitespace is dropped, every character that is neither a word
    character nor a dash becomes an underscore, runs of underscores collapse
    to one, and leading/trailing underscores are removed.
    """
    result = name.strip().replace(" ", "_")
    # First pass maps special characters to "_", second pass collapses runs.
    for pattern in (r"[^\w\-]", r"_+"):
        result = re.sub(pattern, "_", result)
    return result.strip("_")
def split_authors(authors_str: str) -> list[str]:
    """Split a combined artist string into individual artist names.

    The string is split on ``,``, ``/`` and ``&``. Each part is then split
    once on `` feat.`` or, failing that, `` ft.`` (both case-insensitive), or
    otherwise on every `` x ``. Original casing is preserved, empty pieces are
    dropped, and duplicates are removed keeping the first occurrence.

    Args:
        authors_str: Raw artist string; may be empty.

    Returns:
        Ordered list of unique, non-empty artist names.
    """
    if not authors_str:
        return []

    authors: list[str] = []
    for part in re.split(r"[,/&]", authors_str):
        cleaned = part.strip()

        # Split on the original text (not a lower-cased copy) so names keep
        # their casing; " feat." takes precedence over " ft.".
        pieces = None
        for pattern in (r" feat\.", r" ft\."):
            candidate = re.split(pattern, cleaned, maxsplit=1, flags=re.IGNORECASE)
            if len(candidate) == 2:
                pieces = candidate
                break
        if pieces is None:
            # Artist collaborations; yields [cleaned] when there is no " x ".
            pieces = cleaned.split(" x ")

        authors.extend(p.strip() for p in pieces if p.strip())

    # dict preserves insertion order, giving an order-stable de-duplication.
    return list(dict.fromkeys(authors))
def generate_readable_slug(name: str, model: Model) -> str:
# Builds a slug from `name` via safe_translate and clean_name, then keeps
# altering it while a `model` row with the same slug exists.
# NOTE(review): this span looks like a rendered diff in which removed and added
# lines are interleaved without +/- markers (dash-based and underscore-based
# truncation side by side, two consecutive `return` statements, an embedded
# hunk header) — confirm against the committed file before relying on the
# exact flow below.
# Translate and slugify the name
# Clean and translate name
slug = safe_translate(name)
# Truncate slug if it's too long
# Remove any remaining spaces and ensure proper formatting
slug = clean_name(slug)
# Truncate if necessary
# Slugs longer than 20 characters are cut back to at most 20.
if len(slug) > 20:
slug = slug[:20]
last_dash = slug.rfind("-")
if last_dash != -1:
slug = slug[:last_dash]
# Try to cut at word boundary
truncated = slug[:20].rsplit("_", 1)[0]
slug = truncated if truncated else slug[:20]
# Kept so that every collision retry starts from the same base.
original_slug = slug
@ -45,18 +87,16 @@ def generate_readable_slug(name: str, model: Model) -> str:
counter = 1
# Retry until no row of `model` uses the candidate slug.
while model.objects.filter(slug=slug).exists():
# Bases longer than 14 characters are shortened before a suffix is appended.
if len(original_slug) > 14:
truncated_slug = original_slug[:14]
last_dash = truncated_slug.rfind("-")
if last_dash != -1:
truncated_slug = truncated_slug[:last_dash]
truncated = original_slug[:14].rsplit("_", 1)[0]
base_slug = truncated if truncated else original_slug[:14]
else:
truncated_slug = original_slug
base_slug = original_slug
# The first collision appends generate_charset(5); later ones append the counter.
suffix = f"_{generate_charset(5)}" if counter == 1 else f"_{counter}"
slug = f"{truncated_slug}{suffix}"
slug = f"{base_slug}{suffix}"
counter += 1
return slug
return slug.lower()
def create_spotify_session() -> spotipy.Spotify:
@ -501,11 +541,14 @@ def save_author_image(author, image_path):
# Translates `text` to English with GoogleTranslator and returns a slugified
# result; on any exception the untranslated text is slugified instead.
# NOTE(review): removed and added diff lines appear interleaved here (each
# branch shows two `return` statements) — confirm which ones the committed
# file actually contains.
@external_service_fallback
def safe_translate(text: str) -> str:
try:
text = clean_name(text) # Clean before translation
translated = GoogleTranslator(source="auto", target="en").translate(text)
return slugify(translated)
# Clean after translation and ensure proper slugification
return slugify(clean_name(translated)).replace(" ", "_").lower()
except Exception as e:
# The failure is reported with print and handled by the fallback below.
print(f"Translation failed: {str(e)}")
return slugify(text)
# Fallback to direct slugification
return slugify(clean_name(text)).replace(" ", "_").lower()
def search_all_platforms(track_name: str) -> dict: