mirror of
https://github.com/Alexander-D-Karpov/akarpov
synced 2024-11-24 02:03:49 +03:00
Compare commits
No commits in common. "5e9c01fc29c6e28cb7dce30be1dda72de721d1b9" and "b63beb8da8b2171eb8fd4479277ca53e6487b7bb" have entirely different histories.
5e9c01fc29
...
b63beb8da8
|
@ -21,33 +21,30 @@
|
||||||
from akarpov.utils.text import is_similar_artist, normalize_text
|
from akarpov.utils.text import is_similar_artist, normalize_text
|
||||||
|
|
||||||
|
|
||||||
def generate_readable_slug(name: str, model: Model) -> str:
|
def generate_readable_slug(name: str, model) -> str:
|
||||||
# Translate and slugify the name
|
# Translate and slugify the name
|
||||||
slug = safe_translate(name)
|
slug = str(
|
||||||
|
slugify(
|
||||||
|
GoogleTranslator(source="auto", target="en").translate(
|
||||||
|
name,
|
||||||
|
target_language="en",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
# Truncate slug if it's too long
|
|
||||||
if len(slug) > 20:
|
if len(slug) > 20:
|
||||||
slug = slug[:20]
|
slug = slug[:20]
|
||||||
last_dash = slug.rfind("-")
|
last_dash = slug.rfind("-")
|
||||||
if last_dash != -1:
|
if last_dash != -1:
|
||||||
slug = slug[:last_dash]
|
slug = slug[:last_dash]
|
||||||
|
|
||||||
original_slug = slug
|
|
||||||
|
|
||||||
# Ensure uniqueness
|
|
||||||
counter = 1
|
|
||||||
while model.objects.filter(slug=slug).exists():
|
while model.objects.filter(slug=slug).exists():
|
||||||
if len(original_slug) > 14:
|
if len(slug) > 14:
|
||||||
truncated_slug = original_slug[:14]
|
slug = slug[:14]
|
||||||
last_dash = truncated_slug.rfind("-")
|
last_dash = slug.rfind("-")
|
||||||
if last_dash != -1:
|
if last_dash != -1:
|
||||||
truncated_slug = truncated_slug[:last_dash]
|
slug = slug[:last_dash]
|
||||||
else:
|
slug = slug + "_" + generate_charset(5)
|
||||||
truncated_slug = original_slug
|
|
||||||
|
|
||||||
suffix = f"_{generate_charset(5)}" if counter == 1 else f"_{counter}"
|
|
||||||
slug = f"{truncated_slug}{suffix}"
|
|
||||||
counter += 1
|
|
||||||
|
|
||||||
return slug
|
return slug
|
||||||
|
|
||||||
|
@ -220,12 +217,16 @@ def update_album_info(album: AlbumModel, author_name: str = None) -> None:
|
||||||
client = yandex_login()
|
client = yandex_login()
|
||||||
spotify_session = create_spotify_session()
|
spotify_session = create_spotify_session()
|
||||||
|
|
||||||
search_term = f"{album.name} - {author_name}" if author_name else album.name
|
if author_name:
|
||||||
|
yandex_album_info = get_yandex_album_info(
|
||||||
yandex_album_info = get_api_info(get_yandex_album_info, search_term, client)
|
album.name + " - " + author_name, client
|
||||||
spotify_album_info = get_api_info(
|
)
|
||||||
get_spotify_album_info, search_term, spotify_session
|
spotify_album_info = get_spotify_album_info(
|
||||||
)
|
album.name + " - " + author_name, spotify_session
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
yandex_album_info = get_yandex_album_info(album.name, client)
|
||||||
|
spotify_album_info = get_spotify_album_info(album.name, spotify_session)
|
||||||
|
|
||||||
# Combine and prioritize Spotify data
|
# Combine and prioritize Spotify data
|
||||||
album_data = {}
|
album_data = {}
|
||||||
|
@ -235,14 +236,14 @@ def update_album_info(album: AlbumModel, author_name: str = None) -> None:
|
||||||
"name": spotify_album_info.get("name", album.name),
|
"name": spotify_album_info.get("name", album.name),
|
||||||
"release_date": spotify_album_info.get("release_date", ""),
|
"release_date": spotify_album_info.get("release_date", ""),
|
||||||
"total_tracks": spotify_album_info.get("total_tracks", ""),
|
"total_tracks": spotify_album_info.get("total_tracks", ""),
|
||||||
"link": spotify_album_info.get("external_urls", {}).get("spotify", ""),
|
"link": spotify_album_info["external_urls"]["spotify"],
|
||||||
"genre": spotify_album_info.get("genres", []),
|
"genre": spotify_album_info.get("genres", []),
|
||||||
}
|
}
|
||||||
if yandex_album_info:
|
if yandex_album_info:
|
||||||
album_data.update(
|
album_data.update(
|
||||||
{
|
{
|
||||||
"name": album_data.get("name") or yandex_album_info.title,
|
"name": album_data.get("name", yandex_album_info.title),
|
||||||
"genre": album_data.get("genre") or yandex_album_info.genre,
|
"genre": album_data.get("genre", yandex_album_info.genre),
|
||||||
"description": yandex_album_info.description,
|
"description": yandex_album_info.description,
|
||||||
"type": yandex_album_info.type,
|
"type": yandex_album_info.type,
|
||||||
}
|
}
|
||||||
|
@ -252,120 +253,102 @@ def update_album_info(album: AlbumModel, author_name: str = None) -> None:
|
||||||
album.save()
|
album.save()
|
||||||
|
|
||||||
# Handle Album Image - Prefer Spotify, fallback to Yandex
|
# Handle Album Image - Prefer Spotify, fallback to Yandex
|
||||||
image_path = get_album_image(spotify_album_info, yandex_album_info)
|
image_path = None
|
||||||
|
if (
|
||||||
|
spotify_album_info
|
||||||
|
and "images" in spotify_album_info
|
||||||
|
and spotify_album_info["images"]
|
||||||
|
):
|
||||||
|
image_path = download_image(
|
||||||
|
spotify_album_info["images"][0]["url"], settings.MEDIA_ROOT
|
||||||
|
)
|
||||||
|
elif yandex_album_info and yandex_album_info.cover_uri:
|
||||||
|
image_path = download_image(
|
||||||
|
"https://" + yandex_album_info.cover_uri, settings.MEDIA_ROOT
|
||||||
|
)
|
||||||
|
|
||||||
|
generated_name = slugify(
|
||||||
|
GoogleTranslator(source="auto", target="en").translate(
|
||||||
|
album.name,
|
||||||
|
target_language="en",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
if image_path:
|
if image_path:
|
||||||
save_album_image(album, image_path)
|
|
||||||
|
|
||||||
# Update Album Authors from Spotify data if available
|
|
||||||
if spotify_album_info and "artists" in spotify_album_info:
|
|
||||||
update_album_authors(album, spotify_album_info["artists"])
|
|
||||||
|
|
||||||
album.slug = generate_readable_slug(album.name, AlbumModel)
|
|
||||||
album.save()
|
|
||||||
|
|
||||||
|
|
||||||
def get_album_image(spotify_info, yandex_info):
|
|
||||||
if spotify_info and "images" in spotify_info and spotify_info["images"]:
|
|
||||||
return download_image(spotify_info["images"][0]["url"], settings.MEDIA_ROOT)
|
|
||||||
elif yandex_info and yandex_info.cover_uri:
|
|
||||||
return download_image("https://" + yandex_info.cover_uri, settings.MEDIA_ROOT)
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
def save_album_image(album, image_path):
|
|
||||||
if not image_path:
|
|
||||||
return
|
|
||||||
|
|
||||||
try:
|
|
||||||
generated_name = safe_translate(album.name)
|
|
||||||
with open(image_path, "rb") as f:
|
with open(image_path, "rb") as f:
|
||||||
album.image.save(
|
album.image.save(
|
||||||
generated_name + ".png",
|
generated_name + ".png",
|
||||||
File(f, name=generated_name + ".png"),
|
File(
|
||||||
|
f,
|
||||||
|
name=generated_name + ".png",
|
||||||
|
),
|
||||||
save=True,
|
save=True,
|
||||||
)
|
)
|
||||||
os.remove(image_path)
|
os.remove(image_path)
|
||||||
album.save()
|
album.save()
|
||||||
except Exception as e:
|
|
||||||
print(f"Error saving album image: {str(e)}")
|
|
||||||
|
|
||||||
|
# Update Album Authors from Spotify data if available
|
||||||
|
if spotify_album_info and "artists" in spotify_album_info:
|
||||||
|
album_authors = []
|
||||||
|
for artist in spotify_album_info["artists"]:
|
||||||
|
author, created = Author.objects.get_or_create(name=artist["name"])
|
||||||
|
album_authors.append(author)
|
||||||
|
album.authors.set(album_authors)
|
||||||
|
|
||||||
def update_album_authors(album, artists):
|
album.slug = generate_readable_slug(album.name, AlbumModel)
|
||||||
album_authors = []
|
album.save()
|
||||||
for artist in artists:
|
|
||||||
author, created = Author.objects.get_or_create(name=artist["name"])
|
|
||||||
album_authors.append(author)
|
|
||||||
album.authors.set(album_authors)
|
|
||||||
|
|
||||||
|
|
||||||
def update_author_info(author: Author) -> None:
|
def update_author_info(author: Author) -> None:
|
||||||
client = yandex_login()
|
client = yandex_login()
|
||||||
spotify_session = create_spotify_session()
|
spotify_session = create_spotify_session()
|
||||||
|
|
||||||
yandex_artist_info = get_api_info(get_yandex_artist_info, author.name, client)
|
# Retrieve info from both services
|
||||||
spotify_artist_info = get_api_info(
|
yandex_artist_info = get_yandex_artist_info(author.name, client)
|
||||||
get_spotify_artist_info, author.name, spotify_session
|
spotify_artist_info = get_spotify_artist_info(author.name, spotify_session)
|
||||||
)
|
|
||||||
|
|
||||||
author_data = combine_artist_data(author, spotify_artist_info, yandex_artist_info)
|
# Combine and prioritize Spotify data
|
||||||
|
|
||||||
with transaction.atomic():
|
|
||||||
author.meta = author_data
|
|
||||||
author.save()
|
|
||||||
|
|
||||||
image_path = get_author_image(spotify_artist_info, yandex_artist_info)
|
|
||||||
|
|
||||||
if image_path:
|
|
||||||
save_author_image(author, image_path)
|
|
||||||
|
|
||||||
author.slug = generate_readable_slug(author.name, Author)
|
|
||||||
with transaction.atomic():
|
|
||||||
author.save()
|
|
||||||
|
|
||||||
|
|
||||||
def get_api_info(api_func, search_term, session):
|
|
||||||
try:
|
|
||||||
return api_func(search_term, session)
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Error fetching info from {api_func.__name__}: {str(e)}")
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
def combine_artist_data(author, spotify_info, yandex_info):
|
|
||||||
author_data = {}
|
author_data = {}
|
||||||
if spotify_info:
|
if spotify_artist_info:
|
||||||
author_data = {
|
author_data = {
|
||||||
"name": spotify_info.get("name", author.name),
|
"name": spotify_artist_info.get("name", author.name),
|
||||||
"genres": spotify_info.get("genres", []),
|
"genres": spotify_artist_info.get("genres", []),
|
||||||
"popularity": spotify_info.get("popularity", 0),
|
"popularity": spotify_artist_info.get("popularity", 0),
|
||||||
"link": spotify_info.get("external_urls", {}).get("spotify", ""),
|
"link": spotify_artist_info["external_urls"]["spotify"],
|
||||||
}
|
}
|
||||||
if yandex_info:
|
if yandex_artist_info:
|
||||||
author_data.update(
|
author_data.update(
|
||||||
{
|
{
|
||||||
"name": author_data.get("name") or yandex_info.name,
|
"name": author_data.get("name", yandex_artist_info.name),
|
||||||
"genres": author_data.get("genres") or yandex_info.genres,
|
"genres": author_data.get("genres", yandex_artist_info.genres),
|
||||||
"description": yandex_info.description,
|
"description": yandex_artist_info.description,
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
return author_data
|
|
||||||
|
|
||||||
|
author.meta = author_data
|
||||||
|
with transaction.atomic():
|
||||||
|
author.save()
|
||||||
|
|
||||||
def get_author_image(spotify_info, yandex_info):
|
# Handle Author Image - Prefer Spotify, fallback to Yandex
|
||||||
if spotify_info and "images" in spotify_info and spotify_info["images"]:
|
image_path = None
|
||||||
return download_image(spotify_info["images"][0]["url"], settings.MEDIA_ROOT)
|
if (
|
||||||
elif yandex_info and yandex_info.cover:
|
spotify_artist_info
|
||||||
return download_image(yandex_info.cover, settings.MEDIA_ROOT)
|
and "images" in spotify_artist_info
|
||||||
return None
|
and spotify_artist_info["images"]
|
||||||
|
):
|
||||||
|
image_path = download_image(
|
||||||
|
spotify_artist_info["images"][0]["url"], settings.MEDIA_ROOT
|
||||||
|
)
|
||||||
|
elif yandex_artist_info and yandex_artist_info.cover:
|
||||||
|
image_path = download_image(yandex_artist_info.cover, settings.MEDIA_ROOT)
|
||||||
|
|
||||||
|
generated_name = slugify(
|
||||||
def save_author_image(author, image_path):
|
GoogleTranslator(source="auto", target="en").translate(
|
||||||
if not image_path:
|
author.name,
|
||||||
return
|
target_language="en",
|
||||||
|
)
|
||||||
try:
|
)
|
||||||
generated_name = safe_translate(author.name)
|
if image_path:
|
||||||
with open(image_path, "rb") as f:
|
with open(image_path, "rb") as f:
|
||||||
author.image.save(
|
author.image.save(
|
||||||
generated_name + ".png",
|
generated_name + ".png",
|
||||||
|
@ -374,17 +357,10 @@ def save_author_image(author, image_path):
|
||||||
)
|
)
|
||||||
os.remove(image_path)
|
os.remove(image_path)
|
||||||
author.save()
|
author.save()
|
||||||
except Exception as e:
|
|
||||||
print(f"Error saving author image: {str(e)}")
|
|
||||||
|
|
||||||
|
author.slug = generate_readable_slug(author.name, Author)
|
||||||
def safe_translate(text):
|
with transaction.atomic():
|
||||||
try:
|
author.save()
|
||||||
translated = GoogleTranslator(source="auto", target="en").translate(text)
|
|
||||||
return slugify(translated)
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Error translating text: {str(e)}")
|
|
||||||
return slugify(text)
|
|
||||||
|
|
||||||
|
|
||||||
def search_all_platforms(track_name: str) -> dict:
|
def search_all_platforms(track_name: str) -> dict:
|
||||||
|
|
|
@ -31,97 +31,66 @@
|
||||||
|
|
||||||
@shared_task(soft_time_limit=60 * 60, time_limit=60 * 120)
|
@shared_task(soft_time_limit=60 * 60, time_limit=60 * 120)
|
||||||
def list_tracks(url, user_id):
|
def list_tracks(url, user_id):
|
||||||
url = normalize_url(url)
|
if "music.youtube.com" in url or "youtu.be" in url:
|
||||||
|
url = url.replace("music.youtube.com", "youtube.com")
|
||||||
|
url = url.replace("youtu.be", "youtube.com")
|
||||||
|
if "spotify.com" in url:
|
||||||
|
spotify.download_url(url, user_id)
|
||||||
|
elif "music.yandex.ru" in url:
|
||||||
|
yandex.load_url(url, user_id)
|
||||||
|
if "youtube.com" in url:
|
||||||
|
if "channel" in url or "/c/" in url:
|
||||||
|
ytmusic = ytmusicapi.YTMusic()
|
||||||
|
channel_id = url.split("/")[-1]
|
||||||
|
channel_songs = ytmusic.get_artist(channel_id)["songs"]["results"]
|
||||||
|
|
||||||
handlers = {
|
for song in channel_songs:
|
||||||
"spotify.com": handle_spotify,
|
process_yb.apply_async(
|
||||||
"music.yandex.ru": handle_yandex,
|
kwargs={
|
||||||
"youtube.com": handle_youtube,
|
"url": f"https://youtube.com/watch?v={song['videoId']}",
|
||||||
}
|
"user_id": user_id,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
for domain, handler in handlers.items():
|
elif "playlist" in url or "&list=" in url:
|
||||||
if domain in url:
|
ytmusic = ytmusicapi.YTMusic()
|
||||||
return handler(url, user_id)
|
|
||||||
print("failed to find handler, falling back to search")
|
|
||||||
return fallback_search(url, user_id)
|
|
||||||
|
|
||||||
|
# Parse the URL and the query string
|
||||||
|
parsed_url = urlparse(url)
|
||||||
|
parsed_qs = parse_qs(parsed_url.query)
|
||||||
|
|
||||||
def normalize_url(url):
|
# Get the playlist ID from the parsed query string
|
||||||
return url.replace("music.youtube.com", "youtube.com").replace(
|
playlist_id = parsed_qs.get("list", [None])[0]
|
||||||
"youtu.be", "youtube.com"
|
|
||||||
)
|
|
||||||
|
|
||||||
|
if playlist_id:
|
||||||
|
playlist_songs = ytmusic.get_playlist(playlist_id)["tracks"]
|
||||||
|
|
||||||
def handle_spotify(url, user_id):
|
else:
|
||||||
spotify.download_url(url, user_id)
|
raise ValueError("No playlist ID found in the URL.")
|
||||||
return url
|
for song in playlist_songs:
|
||||||
|
process_yb.apply_async(
|
||||||
|
kwargs={
|
||||||
def handle_yandex(url, user_id):
|
"url": f"https://music.youtube.com/watch?v={song['videoId']}",
|
||||||
yandex.load_url(url, user_id)
|
"user_id": user_id,
|
||||||
return url
|
}
|
||||||
|
)
|
||||||
|
else:
|
||||||
def handle_youtube(url, user_id):
|
process_yb.apply_async(kwargs={"url": url, "user_id": user_id})
|
||||||
if "channel" in url or "/c/" in url:
|
|
||||||
return handle_youtube_channel(url, user_id)
|
|
||||||
elif "playlist" in url or "&list=" in url:
|
|
||||||
return handle_youtube_playlist(url, user_id)
|
|
||||||
else:
|
else:
|
||||||
process_yb.apply_async(kwargs={"url": url, "user_id": user_id})
|
spotify_manager = SpotifyClientCredentials(
|
||||||
return url
|
client_id=settings.MUSIC_SPOTIFY_ID,
|
||||||
|
client_secret=settings.MUSIC_SPOTIFY_SECRET,
|
||||||
|
|
||||||
def handle_youtube_channel(url, user_id):
|
|
||||||
ytmusic = YTMusic()
|
|
||||||
channel_id = url.split("/")[-1]
|
|
||||||
channel_songs = ytmusic.get_artist(channel_id)["songs"]["results"]
|
|
||||||
|
|
||||||
for song in channel_songs:
|
|
||||||
process_yb.apply_async(
|
|
||||||
kwargs={
|
|
||||||
"url": f"https://youtube.com/watch?v={song['videoId']}",
|
|
||||||
"user_id": user_id,
|
|
||||||
}
|
|
||||||
)
|
)
|
||||||
return url
|
spotify_search = spotipy.Spotify(client_credentials_manager=spotify_manager)
|
||||||
|
|
||||||
|
results = spotify_search.search(q=url, type="track", limit=1)
|
||||||
def handle_youtube_playlist(url, user_id):
|
top_track = (
|
||||||
ytmusic = YTMusic()
|
results["tracks"]["items"][0] if results["tracks"]["items"] else None
|
||||||
parsed_url = urlparse(url)
|
|
||||||
parsed_qs = parse_qs(parsed_url.query)
|
|
||||||
playlist_id = parsed_qs.get("list", [None])[0]
|
|
||||||
|
|
||||||
if not playlist_id:
|
|
||||||
raise ValueError("No playlist ID found in the URL.")
|
|
||||||
|
|
||||||
playlist_songs = ytmusic.get_playlist(playlist_id)["tracks"]
|
|
||||||
|
|
||||||
for song in playlist_songs:
|
|
||||||
process_yb.apply_async(
|
|
||||||
kwargs={
|
|
||||||
"url": f"https://music.youtube.com/watch?v={song['videoId']}",
|
|
||||||
"user_id": user_id,
|
|
||||||
}
|
|
||||||
)
|
)
|
||||||
return url
|
|
||||||
|
|
||||||
|
if top_track:
|
||||||
def fallback_search(url, user_id):
|
spotify.download_url(top_track["external_urls"]["spotify"], user_id)
|
||||||
spotify_manager = SpotifyClientCredentials(
|
url = top_track["external_urls"]["spotify"]
|
||||||
client_id=settings.MUSIC_SPOTIFY_ID,
|
|
||||||
client_secret=settings.MUSIC_SPOTIFY_SECRET,
|
|
||||||
)
|
|
||||||
spotify_search = spotipy.Spotify(client_credentials_manager=spotify_manager)
|
|
||||||
|
|
||||||
results = spotify_search.search(q=url, type="track", limit=1)
|
|
||||||
top_track = results["tracks"]["items"][0] if results["tracks"]["items"] else None
|
|
||||||
|
|
||||||
if top_track:
|
|
||||||
spotify_url = top_track["external_urls"]["spotify"]
|
|
||||||
spotify.download_url(spotify_url, user_id)
|
|
||||||
return spotify_url
|
|
||||||
|
|
||||||
return url
|
return url
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user