diff --git a/akarpov/music/services/file.py b/akarpov/music/services/file.py index 95bc450..054506c 100644 --- a/akarpov/music/services/file.py +++ b/akarpov/music/services/file.py @@ -7,11 +7,18 @@ import librosa import mutagen import numpy as np +from django.core.files import File +from django.db import IntegrityError, transaction +from django.db.models import Count +from django.utils.text import slugify from mutagen.id3 import ID3 +from mutagen.mp3 import MP3 from PIL import Image, UnidentifiedImageError -from akarpov.music.models import Song +from akarpov.music.models import Album, Author, Song from akarpov.music.services.db import load_track +from akarpov.users.models import User +from akarpov.utils.generators import generate_charset def load_dir(path: str, user_id: int): @@ -227,3 +234,316 @@ def _process_file(self, path: str, info: dict, user_id: int): except Exception as e: print("File processing failed", file=path, error=str(e)) self.failed_files.append(path) + + +def clean_title_for_slug(title: str) -> str: + """Clean title for slug generation.""" + # Remove common suffixes + suffixes = [ + "(Original Mix)", + "(Radio Edit)", + "(Extended Mix)", + "(Official Video)", + "(Music Video)", + "(Lyric Video)", + "(Audio)", + "(Official Audio)", + "(Visualizer)", + "(Official Music Video)", + "(Official Lyric Video)", + ] + cleaned = title + for suffix in suffixes: + cleaned = cleaned.replace(suffix, "") + return cleaned.strip() + + +def process_authors_string(authors_str: str) -> list[str]: + """Split author string into individual author names.""" + if not authors_str: + return [] + + # First split by major separators + authors = [] + for part in authors_str.split("/"): + for subpart in part.split("&"): + # Handle various featuring cases + for feat_marker in [ + " feat.", + " ft.", + " featuring.", + " presents ", + " pres. ", + ]: + if feat_marker in subpart.lower(): + parts = subpart.lower().split(feat_marker, 1) + authors.extend(part.strip() for part in parts) + break + else: + # Handle collaboration markers + if " x " in subpart: + authors.extend(p.strip() for p in subpart.split(" x ")) + else: + authors.append(subpart.strip()) + + # Remove duplicates while preserving order + seen = set() + return [x for x in authors if not (x.lower() in seen or seen.add(x.lower()))] + + +def extract_mp3_metadata(file_path: str) -> dict | None: + """Extract metadata from MP3 file.""" + try: + audio = MP3(file_path, ID3=ID3) + tags = audio.tags if audio.tags else {} + + # Get filename without extension for fallback + base_filename = os.path.splitext(os.path.basename(file_path))[0] + + metadata = { + "title": None, + "album": None, + "artists": None, + "genre": None, + "release_year": None, + "length": audio.info.length, + "image_data": None, + "image_mime": None, + } + + if tags: + # Extract basic metadata with fallbacks + metadata["title"] = str(tags.get("TIT2", "")) or base_filename + metadata["album"] = str(tags.get("TALB", "")) + metadata["artists"] = str(tags.get("TPE1", "")) or str(tags.get("TPE2", "")) + metadata["genre"] = str(tags.get("TCON", "")) + metadata["release_year"] = str(tags.get("TDRC", "")) or str( + tags.get("TYER", "") + ) + + # Extract cover art + for tag in tags.getall("APIC"): + if tag.type == 3: # Front cover + metadata["image_data"] = tag.data + metadata["image_mime"] = tag.mime + break + + # Clean up title if it came from filename + if metadata["title"] == base_filename: + parts = base_filename.split(" - ", 1) + if len(parts) > 1 and not metadata["artists"]: + metadata["artists"] = parts[0] + metadata["title"] = parts[1] + + return metadata + except Exception as e: + print(f"Error extracting metadata from {file_path}: {str(e)}") + return None + + +def generate_unique_slug( + base_name: str, model_class, existing_id=None, max_length=20 +) -> str: + """Generate a unique slug for a model instance.""" + # Clean and slugify the base name + slug = slugify(clean_title_for_slug(base_name)) + + # Truncate if necessary + if len(slug) > max_length: + slug = slug[:max_length].rsplit("-", 1)[0] + + original_slug = slug + counter = 1 + + # Check for uniqueness + while True: + if existing_id: + exists = ( + model_class.objects.filter(slug=slug).exclude(id=existing_id).exists() + ) + else: + exists = model_class.objects.filter(slug=slug).exists() + + if not exists: + break + + # Generate new slug + if counter == 1: + if len(original_slug) > (max_length - 7): # Leave room for _XXXXX + base = original_slug[: (max_length - 7)] + else: + base = original_slug + slug = f"{base}_{generate_charset(5)}" + else: + if len(original_slug) > (max_length - len(str(counter)) - 1): + base = original_slug[: (max_length - len(str(counter)) - 1)] + else: + base = original_slug + slug = f"{base}_{counter}" + + counter += 1 + + return slug + + +def save_image_as_png(image_data: bytes, mime_type: str) -> str | None: + """Convert image data to PNG and save temporarily.""" + try: + if not image_data: + return None + + img = Image.open(BytesIO(image_data)) + temp_path = f"/tmp/{generate_charset(10)}.png" + img.save(temp_path, "PNG") + return temp_path + except Exception as e: + print(f"Error processing image: {str(e)}") + return None + + +def get_or_create_album(album_name: str, authors: list[Author]) -> Album | None: + """Get or create album with proper locking and uniqueness check.""" + if not album_name: + return None + + with transaction.atomic(): + # Try to find existing album + album = ( + Album.objects.select_for_update().filter(name__iexact=album_name).first() + ) + + if album: + # Add any missing authors + current_author_ids = set(album.authors.values_list("id", flat=True)) + new_author_ids = {author.id for author in authors} + missing_authors = new_author_ids - current_author_ids + if missing_authors: + album.authors.add(*missing_authors) + return album + + try: + # Create new album + album = Album.objects.create( + name=album_name, slug=generate_unique_slug(album_name, Album) + ) + album.authors.set(authors) + return album + except IntegrityError: + # Handle race condition + album = ( + Album.objects.select_for_update() + .filter(name__iexact=album_name) + .first() + ) + if album: + album.authors.add(*authors) + return album + raise + + +def check_song_exists(title: str, album: Album | None, authors: list[Author]) -> bool: + """Check if a song already exists with the given title, album and authors.""" + query = Song.objects.filter(name__iexact=title) + + if album: + query = query.filter(album=album) + + if authors: + # Ensure exact author match + query = query.annotate(author_count=Count("authors")).filter( + author_count=len(authors), authors__in=authors + ) + + return query.exists() + + +def load_mp3_directory( + directory_path: str, user_id: int | None = None +) -> tuple[list[str], int]: + """ + Load all MP3 files from a directory and its subdirectories. + Returns tuple of (failed_files, processed_count) + """ + path = Path(directory_path) + failed_files = [] + processed_count = 0 + + for mp3_path in path.glob("**/*.mp3"): + try: + metadata = extract_mp3_metadata(str(mp3_path)) + if not metadata: + failed_files.append(str(mp3_path)) + continue + + with transaction.atomic(): + # Process authors + author_names = process_authors_string(metadata["artists"]) + authors = [] + for author_name in author_names: + author = Author.objects.filter(name__iexact=author_name).first() + if not author: + author = Author.objects.create( + name=author_name, + slug=generate_unique_slug(author_name, Author), + ) + authors.append(author) + + # Process album + album = None + if metadata["album"]: + try: + album = get_or_create_album(metadata["album"], authors) + except IntegrityError as e: + print(f"Error creating album for {mp3_path}: {str(e)}") + failed_files.append(str(mp3_path)) + continue + + # Check for existing song + if check_song_exists(metadata["title"], album, authors): + print(f"Skipping existing song: {metadata['title']}") + continue + + # Process cover image + temp_image_path = None + if metadata["image_data"]: + temp_image_path = save_image_as_png( + metadata["image_data"], metadata["image_mime"] + ) + if album and not album.image and temp_image_path: + with open(temp_image_path, "rb") as img_file: + album.image.save( + f"{album.slug}.png", File(img_file), save=True + ) + + # Create song with proper slug from file name + file_name = os.path.splitext(os.path.basename(str(mp3_path)))[0] + song = Song( + name=metadata["title"], + length=metadata["length"], + album=album, + slug=generate_unique_slug(file_name, Song), + creator=User.objects.get(id=user_id) if user_id else None, + meta={ + "genre": metadata["genre"], + "release_year": metadata["release_year"], + }, + ) + + # Save files + with open(mp3_path, "rb") as mp3_file: + song.file.save(f"{song.slug}.mp3", File(mp3_file), save=True) + + if temp_image_path: + with open(temp_image_path, "rb") as img_file: + song.image.save(f"{song.slug}.png", File(img_file), save=True) + os.remove(temp_image_path) + + # Set authors + song.authors.set(authors) + processed_count += 1 + + except Exception as e: + print(f"Error processing {mp3_path}: {str(e)}") + failed_files.append(str(mp3_path)) + + return failed_files, processed_count