updated music listing

This commit is contained in:
Alexander Karpov 2024-08-28 20:29:39 +03:00
parent b63beb8da8
commit 1fa8f1b9e3

View File

@ -31,66 +31,97 @@
@shared_task(soft_time_limit=60 * 60, time_limit=60 * 120) @shared_task(soft_time_limit=60 * 60, time_limit=60 * 120)
def list_tracks(url, user_id): def list_tracks(url, user_id):
if "music.youtube.com" in url or "youtu.be" in url: url = normalize_url(url)
url = url.replace("music.youtube.com", "youtube.com")
url = url.replace("youtu.be", "youtube.com")
if "spotify.com" in url:
spotify.download_url(url, user_id)
elif "music.yandex.ru" in url:
yandex.load_url(url, user_id)
if "youtube.com" in url:
if "channel" in url or "/c/" in url:
ytmusic = ytmusicapi.YTMusic()
channel_id = url.split("/")[-1]
channel_songs = ytmusic.get_artist(channel_id)["songs"]["results"]
for song in channel_songs: handlers = {
process_yb.apply_async( "spotify.com": handle_spotify,
kwargs={ "music.yandex.ru": handle_yandex,
"url": f"https://youtube.com/watch?v={song['videoId']}", "youtube.com": handle_youtube,
"user_id": user_id, }
}
)
elif "playlist" in url or "&list=" in url: for domain, handler in handlers.items():
ytmusic = ytmusicapi.YTMusic() if domain in url:
return handler(url, user_id)
print("failed to find handler, falling back to search")
return fallback_search(url, user_id)
# Parse the URL and the query string
parsed_url = urlparse(url)
parsed_qs = parse_qs(parsed_url.query)
# Get the playlist ID from the parsed query string def normalize_url(url):
playlist_id = parsed_qs.get("list", [None])[0] return url.replace("music.youtube.com", "youtube.com").replace(
"youtu.be", "youtube.com"
)
if playlist_id:
playlist_songs = ytmusic.get_playlist(playlist_id)["tracks"]
else: def handle_spotify(url, user_id):
raise ValueError("No playlist ID found in the URL.") spotify.download_url(url, user_id)
for song in playlist_songs: return url
process_yb.apply_async(
kwargs={
"url": f"https://music.youtube.com/watch?v={song['videoId']}", def handle_yandex(url, user_id):
"user_id": user_id, yandex.load_url(url, user_id)
} return url
)
else:
process_yb.apply_async(kwargs={"url": url, "user_id": user_id}) def handle_youtube(url, user_id):
if "channel" in url or "/c/" in url:
return handle_youtube_channel(url, user_id)
elif "playlist" in url or "&list=" in url:
return handle_youtube_playlist(url, user_id)
else: else:
spotify_manager = SpotifyClientCredentials( process_yb.apply_async(kwargs={"url": url, "user_id": user_id})
client_id=settings.MUSIC_SPOTIFY_ID, return url
client_secret=settings.MUSIC_SPOTIFY_SECRET,
)
spotify_search = spotipy.Spotify(client_credentials_manager=spotify_manager)
results = spotify_search.search(q=url, type="track", limit=1)
top_track = (
results["tracks"]["items"][0] if results["tracks"]["items"] else None
)
if top_track: def handle_youtube_channel(url, user_id):
spotify.download_url(top_track["external_urls"]["spotify"], user_id) ytmusic = YTMusic()
url = top_track["external_urls"]["spotify"] channel_id = url.split("/")[-1]
channel_songs = ytmusic.get_artist(channel_id)["songs"]["results"]
for song in channel_songs:
process_yb.apply_async(
kwargs={
"url": f"https://youtube.com/watch?v={song['videoId']}",
"user_id": user_id,
}
)
return url
def handle_youtube_playlist(url, user_id):
ytmusic = YTMusic()
parsed_url = urlparse(url)
parsed_qs = parse_qs(parsed_url.query)
playlist_id = parsed_qs.get("list", [None])[0]
if not playlist_id:
raise ValueError("No playlist ID found in the URL.")
playlist_songs = ytmusic.get_playlist(playlist_id)["tracks"]
for song in playlist_songs:
process_yb.apply_async(
kwargs={
"url": f"https://music.youtube.com/watch?v={song['videoId']}",
"user_id": user_id,
}
)
return url
def fallback_search(url, user_id):
spotify_manager = SpotifyClientCredentials(
client_id=settings.MUSIC_SPOTIFY_ID,
client_secret=settings.MUSIC_SPOTIFY_SECRET,
)
spotify_search = spotipy.Spotify(client_credentials_manager=spotify_manager)
results = spotify_search.search(q=url, type="track", limit=1)
top_track = results["tracks"]["items"][0] if results["tracks"]["items"] else None
if top_track:
spotify_url = top_track["external_urls"]["spotify"]
spotify.download_url(spotify_url, user_id)
return spotify_url
return url return url