mirror of https://github.com/Alexander-D-Karpov/akarpov
synced 2025-08-02 12:50:08 +03:00

Compare commits

No commits in common. "b72ebe6e8cf90c5ed3149da02422592ee1481f29" and "2a7f1eae882592dc6c620df631eefcab733807d6" have entirely different histories.

b72ebe6e8c ... 2a7f1eae88
@@ -22,30 +22,13 @@


 def get_or_create_author(author_name):
-    """Get or create author with unique slug."""
     with transaction.atomic():
         author = Author.objects.filter(name__iexact=author_name).order_by("id").first()
         if author is None:
-            author = Author.objects.create(
-                name=author_name, slug=generate_readable_slug(author_name, Author)
-            )
+            author = Author.objects.create(name=author_name)
         return author


-def get_or_create_album(album_name):
-    """Get or create album with unique slug."""
-    if not album_name:
-        return None
-
-    with transaction.atomic():
-        album = Album.objects.filter(name__iexact=album_name).order_by("id").first()
-        if album is None:
-            album = Album.objects.create(
-                name=album_name, slug=generate_readable_slug(album_name, Album)
-            )
-        return album
-
-
 def process_track_name(track_name: str) -> str:
     # Split the track name by dash and parentheses
     parts = track_name.split(" - ")
@@ -126,7 +109,9 @@ def load_track(
     else:
         album_name = None
     if album_name:
-        album = get_or_create_album(album_name)
+        album, created = Album.objects.get_or_create(
+            name__iexact=album_name, defaults={"name": album_name}
+        )

     processed_authors = []
     if authors:
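For context on the replacement above: Django's get_or_create accepts a case-insensitive lookup plus a defaults dict whose values are used only when a new row is created. A minimal sketch of the pattern, assuming the Album model from this diff and no pre-existing album of that name (the album title is made up):

first, created_first = Album.objects.get_or_create(
    name__iexact="Demo Album", defaults={"name": "Demo Album"}  # hypothetical title
)
second, created_second = Album.objects.get_or_create(
    name__iexact="demo album", defaults={"name": "demo album"}
)
# The second call matches the first row case-insensitively instead of creating a duplicate.
assert first.pk == second.pk and created_second is False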
@@ -1,5 +1,4 @@
 import os
-import time
 from io import BytesIO
 from pathlib import Path
 from random import randint
@@ -84,146 +83,3 @@ def set_song_volume(song: Song):
     mp3_file = song.file.path
     song.volume = analyze_music_loudness(mp3_file)
     song.save(update_fields=["volume"])
-
-
-BATCH_SIZE = 10
-BATCH_CHECK_DELAY = 10  # seconds
-
-
-class FileProcessor:
-    def __init__(self):
-        self.failed_files: list[str] = []
-        self.processed_files: set[str] = set()
-        self.current_batch: dict[str, dict] = {}
-
-    def load_dir(self, path: str, user_id: int) -> tuple[list[str], int]:
-        path = Path(path)
-        files = list(path.glob("**/*.mp3"))
-        total_files = len(files)
-
-        for i in range(0, len(files), BATCH_SIZE):
-            batch = files[i : i + BATCH_SIZE]  # noqa
-            self._process_batch(batch, user_id)
-
-            # Wait and verify batch
-            time.sleep(BATCH_CHECK_DELAY)
-            self._verify_batch()
-
-            print(
-                "Batch processed",
-                processed=len(self.processed_files),
-                failed=len(self.failed_files),
-                total=total_files,
-                remaining=total_files
-                - len(self.processed_files)
-                - len(self.failed_files),
-            )
-
-        return self.failed_files, len(self.processed_files)
-
-    def _process_batch(self, files: list[Path], user_id: int):
-        self.current_batch.clear()
-
-        for file_path in files:
-            file_str = str(file_path)
-            if file_str in self.processed_files or file_str in self.failed_files:
-                continue
-
-            try:
-                file_info = self._extract_file_info(file_str)
-                if self._check_exists(file_info):
-                    self.processed_files.add(file_str)
-                    continue
-
-                self.current_batch[file_str] = file_info
-                self._process_file(file_str, file_info, user_id)
-
-            except Exception as e:
-                print("File processing failed", file=file_str, error=str(e))
-                self.failed_files.append(file_str)
-
-    def _verify_batch(self):
-        for file_path, info in self.current_batch.items():
-            if not self._verify_file(file_path, info):
-                print("File verification failed", file=file_path)
-                self.failed_files.append(file_path)
-            else:
-                self.processed_files.add(file_path)
-
-    def _extract_file_info(self, path: str) -> dict:
-        tag = mutagen.File(path, easy=True)
-        return {
-            "author": tag.get("artist"),
-            "album": tag.get("album"),
-            "name": tag.get("title", [path.split("/")[-1]])[0],
-            "image": self._extract_image(path),
-        }
-
-    def _extract_image(self, path: str) -> str | None:
-        try:
-            tags = ID3(path)
-            pict = [x for x in tags.getall("APIC") if x]
-            if not pict:
-                return None
-
-            pict_data = pict[0].data
-            im = Image.open(BytesIO(pict_data))
-            image_path = f"/tmp/{randint(1, 1000000)}.png"
-            while os.path.exists(image_path):
-                image_path = f"/tmp/{randint(1, 1000000)}.png"
-            im.save(image_path)
-            return image_path
-        except (UnidentifiedImageError, Exception) as e:
-            print("Image extraction failed", error=str(e))
-            return None
-
-    def _check_exists(self, info: dict) -> bool:
-        query = Song.objects.filter(name=info["name"])
-        if info["author"]:
-            query = query.filter(authors__name__in=info["author"])
-        if info["album"]:
-            query = query.filter(album__name=info["album"])
-        return query.exists()
-
-    def _verify_file(self, file_path: str, info: dict) -> bool:
-        song = Song.objects.filter(name=info["name"], file__isnull=False).first()
-
-        if not song:
-            return False
-
-        # Verify file exists and is readable
-        if not os.path.exists(song.file.path):
-            return False
-
-        # Verify image if it was expected
-        if info["image"] and not song.image:
-            return False
-
-        # Verify metadata
-        if info["author"]:
-            if not song.authors.filter(name__in=info["author"]).exists():
-                return False
-        if info["album"]:
-            if not song.album or song.album.name != info["album"]:
-                return False
-
-        return True
-
-    def _process_file(self, path: str, info: dict, user_id: int):
-        try:
-            song = load_track(
-                path=path,
-                image_path=info["image"],
-                user_id=user_id,
-                authors=info["author"],
-                album=info["album"],
-                name=info["name"],
-            )
-            if info["image"] and os.path.exists(info["image"]):
-                os.remove(info["image"])
-
-            set_song_volume(song)
-
-        except Exception as e:
-            print("File processing failed", file=path, error=str(e))
-            self.failed_files.append(path)
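A brief note on the removed _extract_file_info above: mutagen's easy-tag interface returns each tag as a list of strings, which is why the code indexes [0] on the title. A minimal sketch of that behaviour (the file path is hypothetical):

import mutagen

tag = mutagen.File("/tmp/example.mp3", easy=True)  # hypothetical file; returns None if the format is unrecognised
if tag is not None:
    artists = tag.get("artist")                    # e.g. ["Some Artist"] or None
    title = tag.get("title", ["example.mp3"])[0]   # fall back to the file name
    print(artists, title)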
@@ -1,5 +1,4 @@
 import os
-import re
 from random import randint
 from typing import Any

@@ -29,57 +28,16 @@
 from akarpov.utils.text import is_similar_artist, normalize_text


-def clean_name(name: str) -> str:
-    # Replace special characters with underscores
-    cleaned = name.strip().replace(" ", "_")
-    cleaned = re.sub(r"[^\w\-]", "_", cleaned)
-    # Remove consecutive underscores
-    cleaned = re.sub(r"_+", "_", cleaned)
-    # Remove trailing underscores
-    cleaned = cleaned.strip("_")
-    return cleaned
-
-
-def split_authors(authors_str: str) -> list[str]:
-    # Split on common separators
-    if not authors_str:
-        return []
-
-    # First split by obvious delimiters
-    authors = []
-    for part in re.split(r"[,/&]", authors_str):
-        # Clean up each part
-        cleaned = part.strip()
-        if " feat." in cleaned.lower():
-            # Split on featuring
-            main_artist, feat_artist = cleaned.lower().split(" feat.", 1)
-            authors.extend([main_artist.strip(), feat_artist.strip()])
-        elif " ft." in cleaned.lower():
-            main_artist, feat_artist = cleaned.lower().split(" ft.", 1)
-            authors.extend([main_artist.strip(), feat_artist.strip()])
-        elif " x " in cleaned:
-            # Split artist collaborations
-            authors.extend(p.strip() for p in cleaned.split(" x "))
-        elif cleaned:
-            authors.append(cleaned)
-
-    # Remove duplicates while preserving order
-    seen = set()
-    return [x for x in authors if not (x in seen or seen.add(x))]
-
-
 def generate_readable_slug(name: str, model: Model) -> str:
-    # Clean and translate name
+    # Translate and slugify the name
     slug = safe_translate(name)

-    # Remove any remaining spaces and ensure proper formatting
-    slug = clean_name(slug)
-
-    # Truncate if necessary
+    # Truncate slug if it's too long
     if len(slug) > 20:
-        # Try to cut at word boundary
-        truncated = slug[:20].rsplit("_", 1)[0]
-        slug = truncated if truncated else slug[:20]
+        slug = slug[:20]
+        last_dash = slug.rfind("-")
+        if last_dash != -1:
+            slug = slug[:last_dash]

     original_slug = slug

@@ -87,16 +45,18 @@ def generate_readable_slug(name: str, model: Model) -> str:
     counter = 1
     while model.objects.filter(slug=slug).exists():
         if len(original_slug) > 14:
-            truncated = original_slug[:14].rsplit("_", 1)[0]
-            base_slug = truncated if truncated else original_slug[:14]
+            truncated_slug = original_slug[:14]
+            last_dash = truncated_slug.rfind("-")
+            if last_dash != -1:
+                truncated_slug = truncated_slug[:last_dash]
         else:
-            base_slug = original_slug
+            truncated_slug = original_slug

         suffix = f"_{generate_charset(5)}" if counter == 1 else f"_{counter}"
-        slug = f"{base_slug}{suffix}"
+        slug = f"{truncated_slug}{suffix}"
         counter += 1

-    return slug.lower()
+    return slug


 def create_spotify_session() -> spotipy.Spotify:
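To make the new truncation behaviour concrete, here is a small standalone sketch of the same logic (the helper name and the sample string are hypothetical):

def truncate_slug(slug: str, limit: int = 20) -> str:
    # Mirrors the new logic: hard-truncate, then cut back to the last dash
    # so the slug does not end mid-word.
    if len(slug) > limit:
        slug = slug[:limit]
        last_dash = slug.rfind("-")
        if last_dash != -1:
            slug = slug[:last_dash]
    return slug

# "never-gonna-give-you-up" -> first 20 chars "never-gonna-give-you",
# then cut at the last dash -> "never-gonna-give"
print(truncate_slug("never-gonna-give-you-up"))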
@@ -541,14 +501,11 @@ def save_author_image(author, image_path):
 @external_service_fallback
 def safe_translate(text: str) -> str:
     try:
-        text = clean_name(text)  # Clean before translation
         translated = GoogleTranslator(source="auto", target="en").translate(text)
-        # Clean after translation and ensure proper slugification
-        return slugify(clean_name(translated)).replace(" ", "_").lower()
+        return slugify(translated)
     except Exception as e:
         print(f"Translation failed: {str(e)}")
-        # Fallback to direct slugification
-        return slugify(text)
+        return slugify(text)


 def search_all_platforms(track_name: str) -> dict:
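The simplified safe_translate leans on slugify to do the lowercasing and separator handling that the removed clean_name/replace/lower chain did by hand. A short sketch, assuming the slugify in scope is django.utils.text.slugify (sample strings are made up):

from django.utils.text import slugify

# slugify lowercases, drops punctuation and joins words with dashes on its own,
# so no extra replace()/lower() pass is needed after translation.
print(slugify("My Track Name!"))         # -> "my-track-name"
print(slugify("  Weird --- Spacing  "))  # -> "weird-spacing"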