updated external service support

This commit is contained in:
Alexander Karpov 2024-12-02 20:57:19 +03:00
parent 398820489f
commit 2a7f1eae88
2 changed files with 210 additions and 30 deletions

View File

@ -1,4 +1,6 @@
import os
from functools import wraps
from random import randint
from typing import Any
import requests
@ -14,7 +16,7 @@ def __init__(self):
def _make_request(
self, endpoint: str, params: dict = None, **kwargs
) -> dict | None:
) -> dict[str, Any] | None:
if not self.base_url:
return None
@ -33,7 +35,58 @@ def _make_request(
return None
def get_spotify_info(self, track_name: str) -> dict[str, Any] | None:
return self._make_request("/spotify/search", params={"query": track_name})
response = self._make_request("/spotify/search", params={"query": track_name})
if not response:
return None
if "album_image" in response:
# Download and save image
image_path = self._download_image(response["album_image"])
if image_path:
response["album_image_path"] = image_path
return response
def get_spotify_album_info(self, album_name: str) -> dict[str, Any] | None:
response = self._make_request("/spotify/album", params={"query": album_name})
if not response:
return None
if response.get("images"):
image_path = self._download_image(response["images"][0].get("url"))
if image_path:
response["image_path"] = image_path
return response
def get_spotify_artist_info(self, artist_name: str) -> dict[str, Any] | None:
response = self._make_request("/spotify/artist", params={"query": artist_name})
if not response:
return None
if response.get("images"):
image_path = self._download_image(response["images"][0].get("url"))
if image_path:
response["image_path"] = image_path
return response
def _download_image(self, url: str) -> str | None:
if not url:
return None
try:
response = requests.get(url)
if response.status_code == 200:
image_path = os.path.join(
settings.MEDIA_ROOT, f"tmp_{randint(10000, 99999)}.png"
)
with open(image_path, "wb") as f:
f.write(response.content)
return image_path
except Exception as e:
logger.error("Failed to download image", error=str(e), url=url)
return None
def translate_text(
self, text: str, source_lang: str = "auto", target_lang: str = "en"
@ -60,16 +113,30 @@ def wrapper(*args, **kwargs):
try:
if fallback_func.__name__ == "get_spotify_info":
result = client.get_spotify_info(args[1]) # args[1] is track_name
elif fallback_func.__name__ == "get_spotify_album_info":
result = client.get_spotify_album_info(args[0]) # args[0] is album_name
elif fallback_func.__name__ == "get_spotify_artist_info":
result = client.get_spotify_artist_info(
args[0]
) # args[0] is artist_name
elif fallback_func.__name__ == "safe_translate":
result = client.translate_text(args[0]) # args[0] is text
else:
logger.warning(
"Unknown function for external service fallback",
function=fallback_func.__name__,
)
return fallback_func(*args, **kwargs)
if result:
return result
except Exception as e:
logger.error(
"External service failed, falling back to local implementation",
error=str(e),
function=fallback_func.__name__,
args=args,
)
return fallback_func(*args, **kwargs)

View File

@ -1,5 +1,6 @@
import os
from random import randint
from typing import Any
import requests
import spotipy
@ -81,6 +82,18 @@ def spotify_search(name: str, session: spotipy.Spotify, search_type="track"):
return res
def clean_spotify_response(data: dict[str, Any]) -> dict[str, Any]:
if isinstance(data, dict):
return {
k: clean_spotify_response(v)
for k, v in data.items()
if k != "available_markets"
}
elif isinstance(data, list):
return [clean_spotify_response(item) for item in data]
return data
@external_service_fallback
def get_spotify_info(name: str, session: spotipy.Spotify) -> dict:
info = {
@ -90,7 +103,11 @@ def get_spotify_info(name: str, session: spotipy.Spotify) -> dict:
"artists": [],
"artist": "",
"title": "",
"genre": "",
"genre": [],
"meta": {},
"album_meta": {},
"external_urls": {},
"full_data": {},
}
try:
@ -99,32 +116,47 @@ def get_spotify_info(name: str, session: spotipy.Spotify) -> dict:
return info
track = results[0]
artist_data = session.artist(track["artists"][0]["external_urls"]["spotify"])
album_data = session.album(track["album"]["id"])
info.update(
{
"album_name": track["album"]["name"],
"album_image": track["album"]["images"][0]["url"]
if track["album"]["images"]
else "",
"release": track["album"]["release_date"].split("-")[0],
"album_image": track["album"]["images"][0]["url"],
"artists": [artist["name"] for artist in track["artists"]],
"artist": track["artists"][0]["name"],
"title": track["name"],
# Extract additional data as needed
"genre": artist_data.get("genres", []),
"meta": {
"duration_ms": track.get("duration_ms"),
"explicit": track.get("explicit"),
"popularity": track.get("popularity"),
"preview_url": track.get("preview_url"),
"track_number": track.get("track_number"),
"type": track.get("type"),
},
"album_meta": clean_spotify_response(album_data),
"external_urls": track.get("external_urls", {}),
"full_data": clean_spotify_response(track),
}
)
artist_data = session.artist(track["artists"][0]["external_urls"]["spotify"])
info["genre"] = artist_data.get("genres", [])
if track["album"]["images"]:
album_image_url = track["album"]["images"][0]["url"]
image_response = requests.get(album_image_url)
if image_response.status_code == 200:
image_path = os.path.join(
settings.MEDIA_ROOT, f"tmp_{randint(10000, 99999)}.png"
)
with open(image_path, "wb") as f:
f.write(image_response.content)
info["album_image_path"] = image_path
album_image_url = track["album"]["images"][0]["url"]
image_response = requests.get(album_image_url)
if image_response.status_code == 200:
image_path = os.path.join(
settings.MEDIA_ROOT, f"tmp_{randint(10000, 99999)}.png"
)
with open(image_path, "wb") as f:
f.write(image_response.content)
info["album_image_path"] = image_path
except Exception:
except Exception as e:
print("Failed to get Spotify info", error=str(e))
return info
return info
@ -178,20 +210,101 @@ def search_yandex(name: str):
return info
def get_spotify_album_info(album_name: str, session: spotipy.Spotify):
search_result = session.search(q="album:" + album_name, type="album")
albums = search_result.get("albums", {}).get("items", [])
if albums:
return albums[0]
return None
@external_service_fallback
def get_spotify_album_info(album_name: str, session: spotipy.Spotify) -> dict:
info = {
"name": "",
"link": "",
"meta": {},
"image_url": "",
"release_date": "",
"total_tracks": 0,
"images": [],
"external_urls": {},
"artists": [],
"genres": [],
"tracks": [],
"full_data": {},
}
try:
search_result = session.search(q="album:" + album_name, type="album")
albums = search_result.get("albums", {}).get("items", [])
if not albums:
return info
album = albums[0]
album_id = album["id"]
full_album = session.album(album_id)
tracks = session.album_tracks(album_id)
return {
"name": album.get("name", ""),
"link": album.get("external_urls", {}).get("spotify", ""),
"meta": {
"album_type": album.get("album_type", ""),
"release_date_precision": album.get("release_date_precision", ""),
"total_tracks": album.get("total_tracks", 0),
"type": album.get("type", ""),
},
"image_url": next(
(img["url"] for img in album.get("images", []) if img.get("url")), ""
),
"release_date": album.get("release_date", ""),
"total_tracks": album.get("total_tracks", 0),
"images": album.get("images", []),
"external_urls": album.get("external_urls", {}),
"artists": clean_spotify_response(album.get("artists", [])),
"genres": clean_spotify_response(full_album.get("genres", [])),
"tracks": clean_spotify_response(tracks.get("items", [])),
"full_data": clean_spotify_response(full_album),
}
except Exception as e:
print("Failed to get album info", error=str(e))
return info
def get_spotify_artist_info(artist_name: str, session: spotipy.Spotify):
search_result = session.search(q="artist:" + artist_name, type="artist")
artists = search_result.get("artists", {}).get("items", [])
if artists:
return artists[0]
return None
@external_service_fallback
def get_spotify_artist_info(artist_name: str, session: spotipy.Spotify) -> dict:
info = {
"name": "",
"link": "",
"meta": {},
"image_url": "",
"genres": [],
"popularity": 0,
"images": [],
"external_urls": {},
"full_data": {},
}
try:
search_result = session.search(q="artist:" + artist_name, type="artist")
artists = search_result.get("artists", {}).get("items", [])
if not artists:
return info
artist = artists[0]
return {
"name": artist.get("name", ""),
"link": artist.get("external_urls", {}).get("spotify", ""),
"meta": {
"followers": artist.get("followers", {}).get("total", 0),
"popularity": artist.get("popularity", 0),
"type": artist.get("type", ""),
},
"image_url": next(
(img["url"] for img in artist.get("images", []) if img.get("url")), ""
),
"genres": artist.get("genres", []),
"popularity": artist.get("popularity", 0),
"images": artist.get("images", []),
"external_urls": artist.get("external_urls", {}),
"full_data": clean_spotify_response(artist),
}
except Exception as e:
print("Failed to get artist info", error=str(e))
return info
def get_yandex_album_info(album_name: str, client: Client):