diff --git a/akarpov/music/tasks.py b/akarpov/music/tasks.py index 1e0203c..f481fb9 100644 --- a/akarpov/music/tasks.py +++ b/akarpov/music/tasks.py @@ -31,66 +31,97 @@ @shared_task(soft_time_limit=60 * 60, time_limit=60 * 120) def list_tracks(url, user_id): - if "music.youtube.com" in url or "youtu.be" in url: - url = url.replace("music.youtube.com", "youtube.com") - url = url.replace("youtu.be", "youtube.com") - if "spotify.com" in url: - spotify.download_url(url, user_id) - elif "music.yandex.ru" in url: - yandex.load_url(url, user_id) - if "youtube.com" in url: - if "channel" in url or "/c/" in url: - ytmusic = ytmusicapi.YTMusic() - channel_id = url.split("/")[-1] - channel_songs = ytmusic.get_artist(channel_id)["songs"]["results"] + url = normalize_url(url) - for song in channel_songs: - process_yb.apply_async( - kwargs={ - "url": f"https://youtube.com/watch?v={song['videoId']}", - "user_id": user_id, - } - ) + handlers = { + "spotify.com": handle_spotify, + "music.yandex.ru": handle_yandex, + "youtube.com": handle_youtube, + } - elif "playlist" in url or "&list=" in url: - ytmusic = ytmusicapi.YTMusic() + for domain, handler in handlers.items(): + if domain in url: + return handler(url, user_id) + print("failed to find handler, falling back to search") + return fallback_search(url, user_id) - # Parse the URL and the query string - parsed_url = urlparse(url) - parsed_qs = parse_qs(parsed_url.query) - # Get the playlist ID from the parsed query string - playlist_id = parsed_qs.get("list", [None])[0] +def normalize_url(url): + return url.replace("music.youtube.com", "youtube.com").replace( + "youtu.be", "youtube.com" + ) - if playlist_id: - playlist_songs = ytmusic.get_playlist(playlist_id)["tracks"] - else: - raise ValueError("No playlist ID found in the URL.") - for song in playlist_songs: - process_yb.apply_async( - kwargs={ - "url": f"https://music.youtube.com/watch?v={song['videoId']}", - "user_id": user_id, - } - ) - else: - process_yb.apply_async(kwargs={"url": url, "user_id": user_id}) +def handle_spotify(url, user_id): + spotify.download_url(url, user_id) + return url + + +def handle_yandex(url, user_id): + yandex.load_url(url, user_id) + return url + + +def handle_youtube(url, user_id): + if "channel" in url or "/c/" in url: + return handle_youtube_channel(url, user_id) + elif "playlist" in url or "&list=" in url: + return handle_youtube_playlist(url, user_id) else: - spotify_manager = SpotifyClientCredentials( - client_id=settings.MUSIC_SPOTIFY_ID, - client_secret=settings.MUSIC_SPOTIFY_SECRET, - ) - spotify_search = spotipy.Spotify(client_credentials_manager=spotify_manager) + process_yb.apply_async(kwargs={"url": url, "user_id": user_id}) + return url - results = spotify_search.search(q=url, type="track", limit=1) - top_track = ( - results["tracks"]["items"][0] if results["tracks"]["items"] else None - ) - if top_track: - spotify.download_url(top_track["external_urls"]["spotify"], user_id) - url = top_track["external_urls"]["spotify"] +def handle_youtube_channel(url, user_id): + ytmusic = YTMusic() + channel_id = url.split("/")[-1] + channel_songs = ytmusic.get_artist(channel_id)["songs"]["results"] + + for song in channel_songs: + process_yb.apply_async( + kwargs={ + "url": f"https://youtube.com/watch?v={song['videoId']}", + "user_id": user_id, + } + ) + return url + + +def handle_youtube_playlist(url, user_id): + ytmusic = YTMusic() + parsed_url = urlparse(url) + parsed_qs = parse_qs(parsed_url.query) + playlist_id = parsed_qs.get("list", [None])[0] + + if not playlist_id: + raise ValueError("No playlist ID found in the URL.") + + playlist_songs = ytmusic.get_playlist(playlist_id)["tracks"] + + for song in playlist_songs: + process_yb.apply_async( + kwargs={ + "url": f"https://music.youtube.com/watch?v={song['videoId']}", + "user_id": user_id, + } + ) + return url + + +def fallback_search(url, user_id): + spotify_manager = SpotifyClientCredentials( + client_id=settings.MUSIC_SPOTIFY_ID, + client_secret=settings.MUSIC_SPOTIFY_SECRET, + ) + spotify_search = spotipy.Spotify(client_credentials_manager=spotify_manager) + + results = spotify_search.search(q=url, type="track", limit=1) + top_track = results["tracks"]["items"][0] if results["tracks"]["items"] else None + + if top_track: + spotify_url = top_track["external_urls"]["spotify"] + spotify.download_url(spotify_url, user_id) + return spotify_url return url