Compare commits

..

No commits in common. "5e9c01fc29c6e28cb7dce30be1dda72de721d1b9" and "b63beb8da8b2171eb8fd4479277ca53e6487b7bb" have entirely different histories.

2 changed files with 147 additions and 202 deletions

View File

@ -21,33 +21,30 @@
from akarpov.utils.text import is_similar_artist, normalize_text from akarpov.utils.text import is_similar_artist, normalize_text
def generate_readable_slug(name: str, model: Model) -> str: def generate_readable_slug(name: str, model) -> str:
# Translate and slugify the name # Translate and slugify the name
slug = safe_translate(name) slug = str(
slugify(
GoogleTranslator(source="auto", target="en").translate(
name,
target_language="en",
)
)
)
# Truncate slug if it's too long
if len(slug) > 20: if len(slug) > 20:
slug = slug[:20] slug = slug[:20]
last_dash = slug.rfind("-") last_dash = slug.rfind("-")
if last_dash != -1: if last_dash != -1:
slug = slug[:last_dash] slug = slug[:last_dash]
original_slug = slug
# Ensure uniqueness
counter = 1
while model.objects.filter(slug=slug).exists(): while model.objects.filter(slug=slug).exists():
if len(original_slug) > 14: if len(slug) > 14:
truncated_slug = original_slug[:14] slug = slug[:14]
last_dash = truncated_slug.rfind("-") last_dash = slug.rfind("-")
if last_dash != -1: if last_dash != -1:
truncated_slug = truncated_slug[:last_dash] slug = slug[:last_dash]
else: slug = slug + "_" + generate_charset(5)
truncated_slug = original_slug
suffix = f"_{generate_charset(5)}" if counter == 1 else f"_{counter}"
slug = f"{truncated_slug}{suffix}"
counter += 1
return slug return slug
@ -220,12 +217,16 @@ def update_album_info(album: AlbumModel, author_name: str = None) -> None:
client = yandex_login() client = yandex_login()
spotify_session = create_spotify_session() spotify_session = create_spotify_session()
search_term = f"{album.name} - {author_name}" if author_name else album.name if author_name:
yandex_album_info = get_yandex_album_info(
yandex_album_info = get_api_info(get_yandex_album_info, search_term, client) album.name + " - " + author_name, client
spotify_album_info = get_api_info( )
get_spotify_album_info, search_term, spotify_session spotify_album_info = get_spotify_album_info(
) album.name + " - " + author_name, spotify_session
)
else:
yandex_album_info = get_yandex_album_info(album.name, client)
spotify_album_info = get_spotify_album_info(album.name, spotify_session)
# Combine and prioritize Spotify data # Combine and prioritize Spotify data
album_data = {} album_data = {}
@ -235,14 +236,14 @@ def update_album_info(album: AlbumModel, author_name: str = None) -> None:
"name": spotify_album_info.get("name", album.name), "name": spotify_album_info.get("name", album.name),
"release_date": spotify_album_info.get("release_date", ""), "release_date": spotify_album_info.get("release_date", ""),
"total_tracks": spotify_album_info.get("total_tracks", ""), "total_tracks": spotify_album_info.get("total_tracks", ""),
"link": spotify_album_info.get("external_urls", {}).get("spotify", ""), "link": spotify_album_info["external_urls"]["spotify"],
"genre": spotify_album_info.get("genres", []), "genre": spotify_album_info.get("genres", []),
} }
if yandex_album_info: if yandex_album_info:
album_data.update( album_data.update(
{ {
"name": album_data.get("name") or yandex_album_info.title, "name": album_data.get("name", yandex_album_info.title),
"genre": album_data.get("genre") or yandex_album_info.genre, "genre": album_data.get("genre", yandex_album_info.genre),
"description": yandex_album_info.description, "description": yandex_album_info.description,
"type": yandex_album_info.type, "type": yandex_album_info.type,
} }
@ -252,120 +253,102 @@ def update_album_info(album: AlbumModel, author_name: str = None) -> None:
album.save() album.save()
# Handle Album Image - Prefer Spotify, fallback to Yandex # Handle Album Image - Prefer Spotify, fallback to Yandex
image_path = get_album_image(spotify_album_info, yandex_album_info) image_path = None
if (
spotify_album_info
and "images" in spotify_album_info
and spotify_album_info["images"]
):
image_path = download_image(
spotify_album_info["images"][0]["url"], settings.MEDIA_ROOT
)
elif yandex_album_info and yandex_album_info.cover_uri:
image_path = download_image(
"https://" + yandex_album_info.cover_uri, settings.MEDIA_ROOT
)
generated_name = slugify(
GoogleTranslator(source="auto", target="en").translate(
album.name,
target_language="en",
)
)
if image_path: if image_path:
save_album_image(album, image_path)
# Update Album Authors from Spotify data if available
if spotify_album_info and "artists" in spotify_album_info:
update_album_authors(album, spotify_album_info["artists"])
album.slug = generate_readable_slug(album.name, AlbumModel)
album.save()
def get_album_image(spotify_info, yandex_info):
if spotify_info and "images" in spotify_info and spotify_info["images"]:
return download_image(spotify_info["images"][0]["url"], settings.MEDIA_ROOT)
elif yandex_info and yandex_info.cover_uri:
return download_image("https://" + yandex_info.cover_uri, settings.MEDIA_ROOT)
return None
def save_album_image(album, image_path):
if not image_path:
return
try:
generated_name = safe_translate(album.name)
with open(image_path, "rb") as f: with open(image_path, "rb") as f:
album.image.save( album.image.save(
generated_name + ".png", generated_name + ".png",
File(f, name=generated_name + ".png"), File(
f,
name=generated_name + ".png",
),
save=True, save=True,
) )
os.remove(image_path) os.remove(image_path)
album.save() album.save()
except Exception as e:
print(f"Error saving album image: {str(e)}")
# Update Album Authors from Spotify data if available
if spotify_album_info and "artists" in spotify_album_info:
album_authors = []
for artist in spotify_album_info["artists"]:
author, created = Author.objects.get_or_create(name=artist["name"])
album_authors.append(author)
album.authors.set(album_authors)
def update_album_authors(album, artists): album.slug = generate_readable_slug(album.name, AlbumModel)
album_authors = [] album.save()
for artist in artists:
author, created = Author.objects.get_or_create(name=artist["name"])
album_authors.append(author)
album.authors.set(album_authors)
def update_author_info(author: Author) -> None: def update_author_info(author: Author) -> None:
client = yandex_login() client = yandex_login()
spotify_session = create_spotify_session() spotify_session = create_spotify_session()
yandex_artist_info = get_api_info(get_yandex_artist_info, author.name, client) # Retrieve info from both services
spotify_artist_info = get_api_info( yandex_artist_info = get_yandex_artist_info(author.name, client)
get_spotify_artist_info, author.name, spotify_session spotify_artist_info = get_spotify_artist_info(author.name, spotify_session)
)
author_data = combine_artist_data(author, spotify_artist_info, yandex_artist_info) # Combine and prioritize Spotify data
with transaction.atomic():
author.meta = author_data
author.save()
image_path = get_author_image(spotify_artist_info, yandex_artist_info)
if image_path:
save_author_image(author, image_path)
author.slug = generate_readable_slug(author.name, Author)
with transaction.atomic():
author.save()
def get_api_info(api_func, search_term, session):
try:
return api_func(search_term, session)
except Exception as e:
print(f"Error fetching info from {api_func.__name__}: {str(e)}")
return None
def combine_artist_data(author, spotify_info, yandex_info):
author_data = {} author_data = {}
if spotify_info: if spotify_artist_info:
author_data = { author_data = {
"name": spotify_info.get("name", author.name), "name": spotify_artist_info.get("name", author.name),
"genres": spotify_info.get("genres", []), "genres": spotify_artist_info.get("genres", []),
"popularity": spotify_info.get("popularity", 0), "popularity": spotify_artist_info.get("popularity", 0),
"link": spotify_info.get("external_urls", {}).get("spotify", ""), "link": spotify_artist_info["external_urls"]["spotify"],
} }
if yandex_info: if yandex_artist_info:
author_data.update( author_data.update(
{ {
"name": author_data.get("name") or yandex_info.name, "name": author_data.get("name", yandex_artist_info.name),
"genres": author_data.get("genres") or yandex_info.genres, "genres": author_data.get("genres", yandex_artist_info.genres),
"description": yandex_info.description, "description": yandex_artist_info.description,
} }
) )
return author_data
author.meta = author_data
with transaction.atomic():
author.save()
def get_author_image(spotify_info, yandex_info): # Handle Author Image - Prefer Spotify, fallback to Yandex
if spotify_info and "images" in spotify_info and spotify_info["images"]: image_path = None
return download_image(spotify_info["images"][0]["url"], settings.MEDIA_ROOT) if (
elif yandex_info and yandex_info.cover: spotify_artist_info
return download_image(yandex_info.cover, settings.MEDIA_ROOT) and "images" in spotify_artist_info
return None and spotify_artist_info["images"]
):
image_path = download_image(
spotify_artist_info["images"][0]["url"], settings.MEDIA_ROOT
)
elif yandex_artist_info and yandex_artist_info.cover:
image_path = download_image(yandex_artist_info.cover, settings.MEDIA_ROOT)
generated_name = slugify(
def save_author_image(author, image_path): GoogleTranslator(source="auto", target="en").translate(
if not image_path: author.name,
return target_language="en",
)
try: )
generated_name = safe_translate(author.name) if image_path:
with open(image_path, "rb") as f: with open(image_path, "rb") as f:
author.image.save( author.image.save(
generated_name + ".png", generated_name + ".png",
@ -374,17 +357,10 @@ def save_author_image(author, image_path):
) )
os.remove(image_path) os.remove(image_path)
author.save() author.save()
except Exception as e:
print(f"Error saving author image: {str(e)}")
author.slug = generate_readable_slug(author.name, Author)
def safe_translate(text): with transaction.atomic():
try: author.save()
translated = GoogleTranslator(source="auto", target="en").translate(text)
return slugify(translated)
except Exception as e:
print(f"Error translating text: {str(e)}")
return slugify(text)
def search_all_platforms(track_name: str) -> dict: def search_all_platforms(track_name: str) -> dict:

View File

@ -31,97 +31,66 @@
@shared_task(soft_time_limit=60 * 60, time_limit=60 * 120) @shared_task(soft_time_limit=60 * 60, time_limit=60 * 120)
def list_tracks(url, user_id): def list_tracks(url, user_id):
url = normalize_url(url) if "music.youtube.com" in url or "youtu.be" in url:
url = url.replace("music.youtube.com", "youtube.com")
url = url.replace("youtu.be", "youtube.com")
if "spotify.com" in url:
spotify.download_url(url, user_id)
elif "music.yandex.ru" in url:
yandex.load_url(url, user_id)
if "youtube.com" in url:
if "channel" in url or "/c/" in url:
ytmusic = ytmusicapi.YTMusic()
channel_id = url.split("/")[-1]
channel_songs = ytmusic.get_artist(channel_id)["songs"]["results"]
handlers = { for song in channel_songs:
"spotify.com": handle_spotify, process_yb.apply_async(
"music.yandex.ru": handle_yandex, kwargs={
"youtube.com": handle_youtube, "url": f"https://youtube.com/watch?v={song['videoId']}",
} "user_id": user_id,
}
)
for domain, handler in handlers.items(): elif "playlist" in url or "&list=" in url:
if domain in url: ytmusic = ytmusicapi.YTMusic()
return handler(url, user_id)
print("failed to find handler, falling back to search")
return fallback_search(url, user_id)
# Parse the URL and the query string
parsed_url = urlparse(url)
parsed_qs = parse_qs(parsed_url.query)
def normalize_url(url): # Get the playlist ID from the parsed query string
return url.replace("music.youtube.com", "youtube.com").replace( playlist_id = parsed_qs.get("list", [None])[0]
"youtu.be", "youtube.com"
)
if playlist_id:
playlist_songs = ytmusic.get_playlist(playlist_id)["tracks"]
def handle_spotify(url, user_id): else:
spotify.download_url(url, user_id) raise ValueError("No playlist ID found in the URL.")
return url for song in playlist_songs:
process_yb.apply_async(
kwargs={
def handle_yandex(url, user_id): "url": f"https://music.youtube.com/watch?v={song['videoId']}",
yandex.load_url(url, user_id) "user_id": user_id,
return url }
)
else:
def handle_youtube(url, user_id): process_yb.apply_async(kwargs={"url": url, "user_id": user_id})
if "channel" in url or "/c/" in url:
return handle_youtube_channel(url, user_id)
elif "playlist" in url or "&list=" in url:
return handle_youtube_playlist(url, user_id)
else: else:
process_yb.apply_async(kwargs={"url": url, "user_id": user_id}) spotify_manager = SpotifyClientCredentials(
return url client_id=settings.MUSIC_SPOTIFY_ID,
client_secret=settings.MUSIC_SPOTIFY_SECRET,
def handle_youtube_channel(url, user_id):
ytmusic = YTMusic()
channel_id = url.split("/")[-1]
channel_songs = ytmusic.get_artist(channel_id)["songs"]["results"]
for song in channel_songs:
process_yb.apply_async(
kwargs={
"url": f"https://youtube.com/watch?v={song['videoId']}",
"user_id": user_id,
}
) )
return url spotify_search = spotipy.Spotify(client_credentials_manager=spotify_manager)
results = spotify_search.search(q=url, type="track", limit=1)
def handle_youtube_playlist(url, user_id): top_track = (
ytmusic = YTMusic() results["tracks"]["items"][0] if results["tracks"]["items"] else None
parsed_url = urlparse(url)
parsed_qs = parse_qs(parsed_url.query)
playlist_id = parsed_qs.get("list", [None])[0]
if not playlist_id:
raise ValueError("No playlist ID found in the URL.")
playlist_songs = ytmusic.get_playlist(playlist_id)["tracks"]
for song in playlist_songs:
process_yb.apply_async(
kwargs={
"url": f"https://music.youtube.com/watch?v={song['videoId']}",
"user_id": user_id,
}
) )
return url
if top_track:
def fallback_search(url, user_id): spotify.download_url(top_track["external_urls"]["spotify"], user_id)
spotify_manager = SpotifyClientCredentials( url = top_track["external_urls"]["spotify"]
client_id=settings.MUSIC_SPOTIFY_ID,
client_secret=settings.MUSIC_SPOTIFY_SECRET,
)
spotify_search = spotipy.Spotify(client_credentials_manager=spotify_manager)
results = spotify_search.search(q=url, type="track", limit=1)
top_track = results["tracks"]["items"][0] if results["tracks"]["items"] else None
if top_track:
spotify_url = top_track["external_urls"]["spotify"]
spotify.download_url(spotify_url, user_id)
return spotify_url
return url return url