diff --git a/akarpov/music/services/external.py b/akarpov/music/services/external.py index 6bd7d96..372a975 100644 --- a/akarpov/music/services/external.py +++ b/akarpov/music/services/external.py @@ -1,4 +1,6 @@ +import os from functools import wraps +from random import randint from typing import Any import requests @@ -14,7 +16,7 @@ def __init__(self): def _make_request( self, endpoint: str, params: dict = None, **kwargs - ) -> dict | None: + ) -> dict[str, Any] | None: if not self.base_url: return None @@ -33,7 +35,58 @@ def _make_request( return None def get_spotify_info(self, track_name: str) -> dict[str, Any] | None: - return self._make_request("/spotify/search", params={"query": track_name}) + response = self._make_request("/spotify/search", params={"query": track_name}) + if not response: + return None + + if "album_image" in response: + # Download and save image + image_path = self._download_image(response["album_image"]) + if image_path: + response["album_image_path"] = image_path + + return response + + def get_spotify_album_info(self, album_name: str) -> dict[str, Any] | None: + response = self._make_request("/spotify/album", params={"query": album_name}) + if not response: + return None + + if response.get("images"): + image_path = self._download_image(response["images"][0].get("url")) + if image_path: + response["image_path"] = image_path + + return response + + def get_spotify_artist_info(self, artist_name: str) -> dict[str, Any] | None: + response = self._make_request("/spotify/artist", params={"query": artist_name}) + if not response: + return None + + if response.get("images"): + image_path = self._download_image(response["images"][0].get("url")) + if image_path: + response["image_path"] = image_path + + return response + + def _download_image(self, url: str) -> str | None: + if not url: + return None + + try: + response = requests.get(url) + if response.status_code == 200: + image_path = os.path.join( + settings.MEDIA_ROOT, f"tmp_{randint(10000, 99999)}.png" + ) + with open(image_path, "wb") as f: + f.write(response.content) + return image_path + except Exception as e: + logger.error("Failed to download image", error=str(e), url=url) + return None def translate_text( self, text: str, source_lang: str = "auto", target_lang: str = "en" @@ -60,16 +113,30 @@ def wrapper(*args, **kwargs): try: if fallback_func.__name__ == "get_spotify_info": result = client.get_spotify_info(args[1]) # args[1] is track_name + elif fallback_func.__name__ == "get_spotify_album_info": + result = client.get_spotify_album_info(args[0]) # args[0] is album_name + elif fallback_func.__name__ == "get_spotify_artist_info": + result = client.get_spotify_artist_info( + args[0] + ) # args[0] is artist_name elif fallback_func.__name__ == "safe_translate": result = client.translate_text(args[0]) # args[0] is text + else: + logger.warning( + "Unknown function for external service fallback", + function=fallback_func.__name__, + ) + return fallback_func(*args, **kwargs) if result: return result + except Exception as e: logger.error( "External service failed, falling back to local implementation", error=str(e), function=fallback_func.__name__, + args=args, ) return fallback_func(*args, **kwargs) diff --git a/akarpov/music/services/info.py b/akarpov/music/services/info.py index cee6373..cfc134b 100644 --- a/akarpov/music/services/info.py +++ b/akarpov/music/services/info.py @@ -1,5 +1,6 @@ import os from random import randint +from typing import Any import requests import spotipy @@ -81,6 +82,18 @@ def spotify_search(name: str, session: spotipy.Spotify, search_type="track"): return res +def clean_spotify_response(data: dict[str, Any]) -> dict[str, Any]: + if isinstance(data, dict): + return { + k: clean_spotify_response(v) + for k, v in data.items() + if k != "available_markets" + } + elif isinstance(data, list): + return [clean_spotify_response(item) for item in data] + return data + + @external_service_fallback def get_spotify_info(name: str, session: spotipy.Spotify) -> dict: info = { @@ -90,7 +103,11 @@ def get_spotify_info(name: str, session: spotipy.Spotify) -> dict: "artists": [], "artist": "", "title": "", - "genre": "", + "genre": [], + "meta": {}, + "album_meta": {}, + "external_urls": {}, + "full_data": {}, } try: @@ -99,32 +116,47 @@ def get_spotify_info(name: str, session: spotipy.Spotify) -> dict: return info track = results[0] + artist_data = session.artist(track["artists"][0]["external_urls"]["spotify"]) + album_data = session.album(track["album"]["id"]) + info.update( { "album_name": track["album"]["name"], + "album_image": track["album"]["images"][0]["url"] + if track["album"]["images"] + else "", "release": track["album"]["release_date"].split("-")[0], - "album_image": track["album"]["images"][0]["url"], "artists": [artist["name"] for artist in track["artists"]], "artist": track["artists"][0]["name"], "title": track["name"], - # Extract additional data as needed + "genre": artist_data.get("genres", []), + "meta": { + "duration_ms": track.get("duration_ms"), + "explicit": track.get("explicit"), + "popularity": track.get("popularity"), + "preview_url": track.get("preview_url"), + "track_number": track.get("track_number"), + "type": track.get("type"), + }, + "album_meta": clean_spotify_response(album_data), + "external_urls": track.get("external_urls", {}), + "full_data": clean_spotify_response(track), } ) - artist_data = session.artist(track["artists"][0]["external_urls"]["spotify"]) - info["genre"] = artist_data.get("genres", []) + if track["album"]["images"]: + album_image_url = track["album"]["images"][0]["url"] + image_response = requests.get(album_image_url) + if image_response.status_code == 200: + image_path = os.path.join( + settings.MEDIA_ROOT, f"tmp_{randint(10000, 99999)}.png" + ) + with open(image_path, "wb") as f: + f.write(image_response.content) + info["album_image_path"] = image_path - album_image_url = track["album"]["images"][0]["url"] - image_response = requests.get(album_image_url) - if image_response.status_code == 200: - image_path = os.path.join( - settings.MEDIA_ROOT, f"tmp_{randint(10000, 99999)}.png" - ) - with open(image_path, "wb") as f: - f.write(image_response.content) - info["album_image_path"] = image_path - - except Exception: + except Exception as e: + print("Failed to get Spotify info", error=str(e)) return info return info @@ -178,20 +210,101 @@ def search_yandex(name: str): return info -def get_spotify_album_info(album_name: str, session: spotipy.Spotify): - search_result = session.search(q="album:" + album_name, type="album") - albums = search_result.get("albums", {}).get("items", []) - if albums: - return albums[0] - return None +@external_service_fallback +def get_spotify_album_info(album_name: str, session: spotipy.Spotify) -> dict: + info = { + "name": "", + "link": "", + "meta": {}, + "image_url": "", + "release_date": "", + "total_tracks": 0, + "images": [], + "external_urls": {}, + "artists": [], + "genres": [], + "tracks": [], + "full_data": {}, + } + + try: + search_result = session.search(q="album:" + album_name, type="album") + albums = search_result.get("albums", {}).get("items", []) + if not albums: + return info + + album = albums[0] + album_id = album["id"] + full_album = session.album(album_id) + tracks = session.album_tracks(album_id) + + return { + "name": album.get("name", ""), + "link": album.get("external_urls", {}).get("spotify", ""), + "meta": { + "album_type": album.get("album_type", ""), + "release_date_precision": album.get("release_date_precision", ""), + "total_tracks": album.get("total_tracks", 0), + "type": album.get("type", ""), + }, + "image_url": next( + (img["url"] for img in album.get("images", []) if img.get("url")), "" + ), + "release_date": album.get("release_date", ""), + "total_tracks": album.get("total_tracks", 0), + "images": album.get("images", []), + "external_urls": album.get("external_urls", {}), + "artists": clean_spotify_response(album.get("artists", [])), + "genres": clean_spotify_response(full_album.get("genres", [])), + "tracks": clean_spotify_response(tracks.get("items", [])), + "full_data": clean_spotify_response(full_album), + } + except Exception as e: + print("Failed to get album info", error=str(e)) + return info -def get_spotify_artist_info(artist_name: str, session: spotipy.Spotify): - search_result = session.search(q="artist:" + artist_name, type="artist") - artists = search_result.get("artists", {}).get("items", []) - if artists: - return artists[0] - return None +@external_service_fallback +def get_spotify_artist_info(artist_name: str, session: spotipy.Spotify) -> dict: + info = { + "name": "", + "link": "", + "meta": {}, + "image_url": "", + "genres": [], + "popularity": 0, + "images": [], + "external_urls": {}, + "full_data": {}, + } + + try: + search_result = session.search(q="artist:" + artist_name, type="artist") + artists = search_result.get("artists", {}).get("items", []) + if not artists: + return info + + artist = artists[0] + return { + "name": artist.get("name", ""), + "link": artist.get("external_urls", {}).get("spotify", ""), + "meta": { + "followers": artist.get("followers", {}).get("total", 0), + "popularity": artist.get("popularity", 0), + "type": artist.get("type", ""), + }, + "image_url": next( + (img["url"] for img in artist.get("images", []) if img.get("url")), "" + ), + "genres": artist.get("genres", []), + "popularity": artist.get("popularity", 0), + "images": artist.get("images", []), + "external_urls": artist.get("external_urls", {}), + "full_data": clean_spotify_response(artist), + } + except Exception as e: + print("Failed to get artist info", error=str(e)) + return info def get_yandex_album_info(album_name: str, client: Client):