Refactor music service and add Spotify support

This commit is contained in:
Alexander Karpov 2024-02-01 02:47:29 +03:00
parent 0189377aeb
commit db72084d64
4 changed files with 110 additions and 19 deletions

View File

@ -63,7 +63,7 @@ def filter(self, queryset):
if search_type in search_classes:
search_instance = search_classes[search_type](
queryset=File.objects.filter(user=self.request.user)
queryset=File.objects.filter(user=self.request.user).nocache()
)
queryset = search_instance.search(query)
return queryset

View File

@ -1,9 +1,11 @@
import os
import re
import requests
from deep_translator import GoogleTranslator
from django.core.files import File
from django.db import transaction
from django.db.models import Min
from django.utils.text import slugify
from mutagen import File as MutagenFile
from mutagen.id3 import APIC, ID3, TCON, TORY, TextFrame
@ -16,6 +18,32 @@
from akarpov.users.models import User
def get_or_create_author(author_name):
retry = True
while retry:
retry = False
try:
with transaction.atomic():
author, created = Author.objects.get_or_create(
name__iexact=author_name, defaults={"name": author_name}
)
return author
except Author.MultipleObjectsReturned:
with transaction.atomic():
# If multiple authors are found, get the first one and delete the rest
min_id = Author.objects.filter(name__iexact=author_name).aggregate(
Min("id")
)["id__min"]
author = Author.objects.get(id=min_id)
Author.objects.filter(name__iexact=author_name).exclude(
id=min_id
).delete()
return author
except Exception as e:
if "could not serialize access due to concurrent update" in str(e):
retry = True
def process_track_name(track_name: str) -> str:
# Split the track name by dash and parentheses
parts = track_name.split(" - ")
@ -78,23 +106,12 @@ def load_track(
if album and type(album) is str and album.startswith("['"):
album = album.replace("['", "").replace("']", "")
re_authors = []
processed_authors = []
if authors:
for x in authors:
while True:
try:
with transaction.atomic():
author, created = Author.objects.get_or_create(
name__iexact=x, defaults={"name": x}
)
re_authors.append(author)
break
except Author.MultipleObjectsReturned:
# If multiple authors are found, delete all but one
Author.objects.filter(name__iexact=x).exclude(
id=Author.objects.filter(name__iexact=x).first().id
).delete()
authors = re_authors
for author_name in authors:
author = get_or_create_author(author_name)
processed_authors.append(author)
authors = processed_authors
if album:
if type(album) is str:
@ -122,6 +139,14 @@ def load_track(
path = mp3_path
tag = MP3(path, ID3=ID3)
if image_path and image_path.startswith("http"):
response = requests.get(image_path)
se = image_path.split("/")[-1]
image_path = f'/tmp/{generate_readable_slug(name, Song)}.{"png" if "." not in se else se.split(".")[-1]}'
with open(image_path, "wb") as f:
f.write(response.content)
if image_path:
if not image_path.endswith(".png"):
nm = image_path

View File

@ -1,7 +1,10 @@
import spotipy
from django.conf import settings
from spotdl import Song, Spotdl
from spotipy.oauth2 import SpotifyClientCredentials
from akarpov.music.services.db import load_track
def create_session() -> spotipy.Spotify:
if not settings.MUSIC_SPOTIFY_ID or not settings.MUSIC_SPOTIFY_SECRET:
@ -18,3 +21,62 @@ def create_session() -> spotipy.Spotify:
def search(name: str, session: spotipy.Spotify, search_type="track"):
res = session.search(name, type=search_type)
return res
def download_url(url, user_id=None):
spot_settings = {
"simple_tui": True,
"log_level": "DEBUG",
"lyrics_providers": ["genius", "musixmatch"],
}
spotdl_client = Spotdl(
client_id=settings.MUSIC_SPOTIFY_ID,
client_secret=settings.MUSIC_SPOTIFY_SECRET,
user_auth=False,
cache_path="/tmp/",
headless=False,
downloader_settings=spot_settings,
)
session = create_session()
if "track" in url:
songs = [Song.from_url(url)]
elif "album" in url:
album_tracks = session.album(url)["tracks"]["items"]
songs = [
Song.from_url(track["external_urls"]["spotify"]) for track in album_tracks
]
elif "artist" in url:
artist_top_tracks = session.artist_top_tracks(url)["tracks"]
songs = [
Song.from_url(track["external_urls"]["spotify"])
for track in artist_top_tracks
]
elif "playlist" in url:
playlist_tracks = session.playlist_items(url)["items"]
songs = [
Song.from_url(track["track"]["external_urls"]["spotify"])
for track in playlist_tracks
]
else:
return None
for song in songs:
res = spotdl_client.download(song)
if res:
song, path = res
else:
return None
load_track(
path=str(path),
image_path=song.cover_url,
user_id=user_id,
authors=song.artists,
album=song.album_name,
name=song.name,
link=song.url,
genre=song.genres[0] if song.genres else None,
release=song.date,
)

View File

@ -19,7 +19,7 @@
UserListenHistory,
UserMusicProfile,
)
from akarpov.music.services import yandex, youtube
from akarpov.music.services import spotify, yandex, youtube
from akarpov.music.services.file import load_dir, load_file
from akarpov.utils.celery import get_scheduled_tasks_name
@ -28,7 +28,11 @@
@shared_task
def list_tracks(url, user_id):
if "music.yandex.ru" in url:
if "music.youtube.com" in url:
url = url.replace("music.youtube.com", "youtube.com")
if "spotify.com" in url:
spotify.download_url(url, user_id)
elif "music.yandex.ru" in url:
yandex.load_playlist(url, user_id)
elif "channel" in url or "/c/" in url:
p = Channel(url)