added new functions for local upload

This commit is contained in:
Alexander Karpov 2024-12-05 06:25:43 +03:00
parent c3de9b45b8
commit 600c903a68

View File

@ -7,11 +7,18 @@
import librosa
import mutagen
import numpy as np
from django.core.files import File
from django.db import IntegrityError, transaction
from django.db.models import Count
from django.utils.text import slugify
from mutagen.id3 import ID3
from mutagen.mp3 import MP3
from PIL import Image, UnidentifiedImageError
from akarpov.music.models import Song
from akarpov.music.models import Album, Author, Song
from akarpov.music.services.db import load_track
from akarpov.users.models import User
from akarpov.utils.generators import generate_charset
def load_dir(path: str, user_id: int):
@ -227,3 +234,316 @@ def _process_file(self, path: str, info: dict, user_id: int):
except Exception as e:
print("File processing failed", file=path, error=str(e))
self.failed_files.append(path)
def clean_title_for_slug(title: str) -> str:
"""Clean title for slug generation."""
# Remove common suffixes
suffixes = [
"(Original Mix)",
"(Radio Edit)",
"(Extended Mix)",
"(Official Video)",
"(Music Video)",
"(Lyric Video)",
"(Audio)",
"(Official Audio)",
"(Visualizer)",
"(Official Music Video)",
"(Official Lyric Video)",
]
cleaned = title
for suffix in suffixes:
cleaned = cleaned.replace(suffix, "")
return cleaned.strip()
def process_authors_string(authors_str: str) -> list[str]:
"""Split author string into individual author names."""
if not authors_str:
return []
# First split by major separators
authors = []
for part in authors_str.split("/"):
for subpart in part.split("&"):
# Handle various featuring cases
for feat_marker in [
" feat.",
" ft.",
" featuring.",
" presents ",
" pres. ",
]:
if feat_marker in subpart.lower():
parts = subpart.lower().split(feat_marker, 1)
authors.extend(part.strip() for part in parts)
break
else:
# Handle collaboration markers
if " x " in subpart:
authors.extend(p.strip() for p in subpart.split(" x "))
else:
authors.append(subpart.strip())
# Remove duplicates while preserving order
seen = set()
return [x for x in authors if not (x.lower() in seen or seen.add(x.lower()))]
def extract_mp3_metadata(file_path: str) -> dict | None:
"""Extract metadata from MP3 file."""
try:
audio = MP3(file_path, ID3=ID3)
tags = audio.tags if audio.tags else {}
# Get filename without extension for fallback
base_filename = os.path.splitext(os.path.basename(file_path))[0]
metadata = {
"title": None,
"album": None,
"artists": None,
"genre": None,
"release_year": None,
"length": audio.info.length,
"image_data": None,
"image_mime": None,
}
if tags:
# Extract basic metadata with fallbacks
metadata["title"] = str(tags.get("TIT2", "")) or base_filename
metadata["album"] = str(tags.get("TALB", ""))
metadata["artists"] = str(tags.get("TPE1", "")) or str(tags.get("TPE2", ""))
metadata["genre"] = str(tags.get("TCON", ""))
metadata["release_year"] = str(tags.get("TDRC", "")) or str(
tags.get("TYER", "")
)
# Extract cover art
for tag in tags.getall("APIC"):
if tag.type == 3: # Front cover
metadata["image_data"] = tag.data
metadata["image_mime"] = tag.mime
break
# Clean up title if it came from filename
if metadata["title"] == base_filename:
parts = base_filename.split(" - ", 1)
if len(parts) > 1 and not metadata["artists"]:
metadata["artists"] = parts[0]
metadata["title"] = parts[1]
return metadata
except Exception as e:
print(f"Error extracting metadata from {file_path}: {str(e)}")
return None
def generate_unique_slug(
base_name: str, model_class, existing_id=None, max_length=20
) -> str:
"""Generate a unique slug for a model instance."""
# Clean and slugify the base name
slug = slugify(clean_title_for_slug(base_name))
# Truncate if necessary
if len(slug) > max_length:
slug = slug[:max_length].rsplit("-", 1)[0]
original_slug = slug
counter = 1
# Check for uniqueness
while True:
if existing_id:
exists = (
model_class.objects.filter(slug=slug).exclude(id=existing_id).exists()
)
else:
exists = model_class.objects.filter(slug=slug).exists()
if not exists:
break
# Generate new slug
if counter == 1:
if len(original_slug) > (max_length - 7): # Leave room for _XXXXX
base = original_slug[: (max_length - 7)]
else:
base = original_slug
slug = f"{base}_{generate_charset(5)}"
else:
if len(original_slug) > (max_length - len(str(counter)) - 1):
base = original_slug[: (max_length - len(str(counter)) - 1)]
else:
base = original_slug
slug = f"{base}_{counter}"
counter += 1
return slug
def save_image_as_png(image_data: bytes, mime_type: str) -> str | None:
"""Convert image data to PNG and save temporarily."""
try:
if not image_data:
return None
img = Image.open(BytesIO(image_data))
temp_path = f"/tmp/{generate_charset(10)}.png"
img.save(temp_path, "PNG")
return temp_path
except Exception as e:
print(f"Error processing image: {str(e)}")
return None
def get_or_create_album(album_name: str, authors: list[Author]) -> Album | None:
"""Get or create album with proper locking and uniqueness check."""
if not album_name:
return None
with transaction.atomic():
# Try to find existing album
album = (
Album.objects.select_for_update().filter(name__iexact=album_name).first()
)
if album:
# Add any missing authors
current_author_ids = set(album.authors.values_list("id", flat=True))
new_author_ids = {author.id for author in authors}
missing_authors = new_author_ids - current_author_ids
if missing_authors:
album.authors.add(*missing_authors)
return album
try:
# Create new album
album = Album.objects.create(
name=album_name, slug=generate_unique_slug(album_name, Album)
)
album.authors.set(authors)
return album
except IntegrityError:
# Handle race condition
album = (
Album.objects.select_for_update()
.filter(name__iexact=album_name)
.first()
)
if album:
album.authors.add(*authors)
return album
raise
def check_song_exists(title: str, album: Album | None, authors: list[Author]) -> bool:
"""Check if a song already exists with the given title, album and authors."""
query = Song.objects.filter(name__iexact=title)
if album:
query = query.filter(album=album)
if authors:
# Ensure exact author match
query = query.annotate(author_count=Count("authors")).filter(
author_count=len(authors), authors__in=authors
)
return query.exists()
def load_mp3_directory(
directory_path: str, user_id: int | None = None
) -> tuple[list[str], int]:
"""
Load all MP3 files from a directory and its subdirectories.
Returns tuple of (failed_files, processed_count)
"""
path = Path(directory_path)
failed_files = []
processed_count = 0
for mp3_path in path.glob("**/*.mp3"):
try:
metadata = extract_mp3_metadata(str(mp3_path))
if not metadata:
failed_files.append(str(mp3_path))
continue
with transaction.atomic():
# Process authors
author_names = process_authors_string(metadata["artists"])
authors = []
for author_name in author_names:
author = Author.objects.filter(name__iexact=author_name).first()
if not author:
author = Author.objects.create(
name=author_name,
slug=generate_unique_slug(author_name, Author),
)
authors.append(author)
# Process album
album = None
if metadata["album"]:
try:
album = get_or_create_album(metadata["album"], authors)
except IntegrityError as e:
print(f"Error creating album for {mp3_path}: {str(e)}")
failed_files.append(str(mp3_path))
continue
# Check for existing song
if check_song_exists(metadata["title"], album, authors):
print(f"Skipping existing song: {metadata['title']}")
continue
# Process cover image
temp_image_path = None
if metadata["image_data"]:
temp_image_path = save_image_as_png(
metadata["image_data"], metadata["image_mime"]
)
if album and not album.image and temp_image_path:
with open(temp_image_path, "rb") as img_file:
album.image.save(
f"{album.slug}.png", File(img_file), save=True
)
# Create song with proper slug from file name
file_name = os.path.splitext(os.path.basename(str(mp3_path)))[0]
song = Song(
name=metadata["title"],
length=metadata["length"],
album=album,
slug=generate_unique_slug(file_name, Song),
creator=User.objects.get(id=user_id) if user_id else None,
meta={
"genre": metadata["genre"],
"release_year": metadata["release_year"],
},
)
# Save files
with open(mp3_path, "rb") as mp3_file:
song.file.save(f"{song.slug}.mp3", File(mp3_file), save=True)
if temp_image_path:
with open(temp_image_path, "rb") as img_file:
song.image.save(f"{song.slug}.png", File(img_file), save=True)
os.remove(temp_image_path)
# Set authors
song.authors.set(authors)
processed_count += 1
except Exception as e:
print(f"Error processing {mp3_path}: {str(e)}")
failed_files.append(str(mp3_path))
return failed_files, processed_count