diff --git a/akarpov/music/services/file.py b/akarpov/music/services/file.py
index 84f8133..95bc450 100644
--- a/akarpov/music/services/file.py
+++ b/akarpov/music/services/file.py
@@ -1,4 +1,5 @@
 import os
+import time
 from io import BytesIO
 from pathlib import Path
 from random import randint
@@ -83,3 +84,175 @@ def set_song_volume(song: Song):
     mp3_file = song.file.path
     song.volume = analyze_music_loudness(mp3_file)
     song.save(update_fields=["volume"])
+
+
+BATCH_SIZE = 10
+BATCH_CHECK_DELAY = 10  # seconds
+
+
+class FileProcessor:
+    """Import mp3 files from a directory in batches and verify each batch."""
+
+    def __init__(self):
+        self.failed_files: list[str] = []
+        self.processed_files: set[str] = set()
+        self.current_batch: dict[str, dict] = {}
+
+    def load_dir(self, path: str, user_id: int) -> tuple[list[str], int]:
+        """Process every mp3 file found under ``path``.
+
+        Returns the failed file paths and the number of processed files.
+        """
+        path = Path(path)
+        files = list(path.glob("**/*.mp3"))
+        total_files = len(files)
+
+        for i in range(0, len(files), BATCH_SIZE):
+            batch = files[i : i + BATCH_SIZE]  # noqa
+            self._process_batch(batch, user_id)
+
+            # Wait and verify only when this batch loaded something new
+            if self.current_batch:
+                time.sleep(BATCH_CHECK_DELAY)
+                self._verify_batch()
+
+            processed = len(self.processed_files)
+            failed = len(self.failed_files)
+            # print() accepts only sep/end/file/flush keywords, so the counters
+            # are formatted into the message itself.
+            print(
+                f"Batch processed: processed={processed} failed={failed} "
+                f"total={total_files} "
+                f"remaining={total_files - processed - failed}"
+            )
+
+        return self.failed_files, len(self.processed_files)
+
+    def _process_batch(self, files: list[Path], user_id: int):
+        """Load one batch and remember the files that still need verification."""
+        self.current_batch.clear()
+
+        for file_path in files:
+            file_str = str(file_path)
+            if file_str in self.processed_files or file_str in self.failed_files:
+                continue
+
+            try:
+                file_info = self._extract_file_info(file_str)
+                if self._check_exists(file_info):
+                    # The cover extracted for this file will never be loaded
+                    self._remove_temp_image(file_info)
+                    self.processed_files.add(file_str)
+                    continue
+
+                self.current_batch[file_str] = file_info
+                self._process_file(file_str, file_info, user_id)
+
+            except Exception as e:
+                print(f"File processing failed: file={file_str} error={e}")
+                self.failed_files.append(file_str)
+
+    def _verify_batch(self):
+        """Record each file of the current batch as processed or failed."""
+        for file_path, info in self.current_batch.items():
+            if not self._verify_file(file_path, info):
+                print(f"File verification failed: file={file_path}")
+                self.failed_files.append(file_path)
+            else:
+                self.processed_files.add(file_path)
+
+    def _extract_file_info(self, path: str) -> dict:
+        """Read artist, album, title and cover image from the file's tags."""
+        tag = mutagen.File(path, easy=True)
+        if tag is None:
+            # Raised here so _process_batch records a readable failure reason
+            raise ValueError(f"Unsupported audio file: {path}")
+        return {
+            "author": tag.get("artist"),
+            "album": tag.get("album"),
+            "name": tag.get("title", [path.split("/")[-1]])[0],
+            "image": self._extract_image(path),
+        }
+
+    def _extract_image(self, path: str) -> str | None:
+        """Save the embedded cover as a temporary png and return its path."""
+        try:
+            tags = ID3(path)
+            pict = [x for x in tags.getall("APIC") if x]
+            if not pict:
+                return None
+
+            pict_data = pict[0].data
+            im = Image.open(BytesIO(pict_data))
+            image_path = f"/tmp/{randint(1, 1000000)}.png"
+            while os.path.exists(image_path):
+                image_path = f"/tmp/{randint(1, 1000000)}.png"
+            im.save(image_path)
+            return image_path
+        except Exception as e:
+            # A missing or broken cover must not fail the whole file
+            print(f"Image extraction failed: error={e}")
+            return None
+
+    def _check_exists(self, info: dict) -> bool:
+        """Tell whether a song matching the name, authors and album exists."""
+        query = Song.objects.filter(name=info["name"])
+        if info["author"]:
+            query = query.filter(authors__name__in=info["author"])
+        if info["album"]:
+            # Tag values are lists (see the title lookup), so match any of them
+            query = query.filter(album__name__in=info["album"])
+        return query.exists()
+
+    def _verify_file(self, file_path: str, info: dict) -> bool:
+        """Check that the song was saved with its file, image and metadata."""
+        song = Song.objects.filter(name=info["name"], file__isnull=False).first()
+
+        if not song:
+            return False
+
+        # Verify the stored file exists
+        if not os.path.exists(song.file.path):
+            return False
+
+        # Verify image if it was expected
+        if info["image"] and not song.image:
+            return False
+
+        # Verify metadata
+        if info["author"]:
+            if not song.authors.filter(name__in=info["author"]).exists():
+                return False
+        if info["album"]:
+            # info["album"] is a list of tag values, so test membership
+            if not song.album or song.album.name not in info["album"]:
+                return False
+
+        return True
+
+    def _process_file(self, path: str, info: dict, user_id: int):
+        """Load a single track and record the file as failed on any error."""
+        try:
+            song = load_track(
+                path=path,
+                image_path=info["image"],
+                user_id=user_id,
+                authors=info["author"],
+                album=info["album"],
+                name=info["name"],
+            )
+            set_song_volume(song)
+        except Exception as e:
+            print(f"File processing failed: file={path} error={e}")
+            # Leave the batch so _verify_batch does not record it a second time
+            self.current_batch.pop(path, None)
+            self.failed_files.append(path)
+        finally:
+            self._remove_temp_image(info)
+
+    @staticmethod
+    def _remove_temp_image(info: dict) -> None:
+        """Delete the temporary cover image referenced by ``info``, if any."""
+        image_path = info.get("image")
+        if image_path and os.path.exists(image_path):
+            os.remove(image_path)
diff --git a/akarpov/music/services/info.py b/akarpov/music/services/info.py
index cfc134b..98a977f 100644
--- a/akarpov/music/services/info.py
+++ b/akarpov/music/services/info.py
@@ -1,4 +1,5 @@
 import os
+import re
 from random import randint
 from typing import Any
 
@@ -28,16 +29,57 @@
 from akarpov.utils.text import is_similar_artist, normalize_text
 
 
+def clean_name(name: str) -> str:
+    """Reduce ``name`` to word characters and hyphens joined by underscores."""
+    # Replace special characters with underscores
+    cleaned = name.strip().replace(" ", "_")
+    cleaned = re.sub(r"[^\w\-]", "_", cleaned)
+    # Remove consecutive underscores
+    cleaned = re.sub(r"_+", "_", cleaned)
+    # Remove leading and trailing underscores
+    cleaned = cleaned.strip("_")
+    return cleaned
+
+
+def split_authors(authors_str: str) -> list[str]:
+    """Split a combined artist string into individual artist names.
+
+    Splits on ``,``, ``/``, ``&``, "feat.", "ft." and " x ". The original
+    casing is kept; duplicates are dropped while preserving order.
+    """
+    if not authors_str:
+        return []
+
+    authors = []
+    # First split by obvious delimiters
+    for part in re.split(r"[,/&]", authors_str):
+        cleaned = part.strip()
+        # Split on featuring markers case-insensitively instead of lowercasing
+        # the whole string, which would also lowercase the artist names
+        pieces = re.split(r" (?:feat|ft)\.", cleaned, flags=re.IGNORECASE)
+        if len(pieces) == 1:
+            # Split artist collaborations
+            pieces = cleaned.split(" x ")
+        authors.extend(p.strip() for p in pieces if p.strip())
+
+    # Remove duplicates while preserving order
+    return list(dict.fromkeys(authors))
+
+
 def generate_readable_slug(name: str, model: Model) -> str:
-    # Translate and slugify the name
+    """Return a unique lowercase slug for ``name`` among ``model`` rows."""
+    # Clean and translate name
     slug = safe_translate(name)
 
-    # Truncate slug if it's too long
+    # Remove any remaining spaces and ensure proper formatting; lowercase
+    # here so the uniqueness check sees the same value that is returned
+    slug = clean_name(slug).lower()
+
+    # Truncate if necessary
     if len(slug) > 20:
-        slug = slug[:20]
-        last_dash = slug.rfind("-")
-        if last_dash != -1:
-            slug = slug[:last_dash]
+        # Try to cut at word boundary
+        truncated = slug[:20].rsplit("_", 1)[0]
+        slug = truncated if truncated else slug[:20]
 
     original_slug = slug
 
@@ -45,18 +87,17 @@ def generate_readable_slug(name: str, model: Model) -> str:
     counter = 1
     while model.objects.filter(slug=slug).exists():
         if len(original_slug) > 14:
-            truncated_slug = original_slug[:14]
-            last_dash = truncated_slug.rfind("-")
-            if last_dash != -1:
-                truncated_slug = truncated_slug[:last_dash]
+            truncated = original_slug[:14].rsplit("_", 1)[0]
+            base_slug = truncated if truncated else original_slug[:14]
         else:
-            truncated_slug = original_slug
+            base_slug = original_slug
 
         suffix = f"_{generate_charset(5)}" if counter == 1 else f"_{counter}"
-        slug = f"{truncated_slug}{suffix}"
+        # Lowercase now: the loop must check the slug that is returned
+        slug = f"{base_slug}{suffix}".lower()
         counter += 1
 
-    return slug
+    return slug.lower()
 
 
 def create_spotify_session() -> spotipy.Spotify:
@@ -501,11 +542,15 @@ def save_author_image(author, image_path):
 @external_service_fallback
 def safe_translate(text: str) -> str:
+    """Translate ``text`` to English and return it as a lowercase slug."""
     try:
+        text = clean_name(text)  # Clean before translation
         translated = GoogleTranslator(source="auto", target="en").translate(text)
-        return slugify(translated)
+        # Clean after translation and ensure proper slugification
+        return slugify(clean_name(translated)).replace(" ", "_").lower()
     except Exception as e:
         print(f"Translation failed: {str(e)}")
-        return slugify(text)
+        # Fallback to direct slugification
+        return slugify(clean_name(text)).replace(" ", "_").lower()
 
 
 def search_all_platforms(track_name: str) -> dict: