mirror of
https://github.com/Alexander-D-Karpov/akarpov
synced 2024-11-22 13:16:33 +03:00
added music app, inited channels
This commit is contained in:
parent
2cafea3cbc
commit
50348f7ba0
31
akarpov/common/channels.py
Normal file
31
akarpov/common/channels.py
Normal file
|
@ -0,0 +1,31 @@
|
|||
from channels.db import database_sync_to_async
|
||||
|
||||
from akarpov.common.jwt import read_jwt
|
||||
|
||||
|
||||
@database_sync_to_async
|
||||
def get_user(headers):
|
||||
# WARNING headers type is bytes
|
||||
if b"authorization" not in headers or not headers[b"authorization"]:
|
||||
return False
|
||||
|
||||
jwt = headers[b"authorization"].decode()
|
||||
payload = read_jwt(jwt)
|
||||
|
||||
if not payload or "id" not in payload:
|
||||
return False
|
||||
|
||||
return payload["id"]
|
||||
|
||||
|
||||
class HeaderAuthMiddleware:
|
||||
"""Custom middleware to read user auth token from string."""
|
||||
|
||||
def __init__(self, app):
|
||||
# Store the ASGI application we were passed
|
||||
self.app = app
|
||||
|
||||
async def __call__(self, scope, receive, send):
|
||||
scope["user"] = await get_user(dict(scope["headers"]))
|
||||
|
||||
return await self.app(scope, receive, send)
|
45
akarpov/common/jwt.py
Normal file
45
akarpov/common/jwt.py
Normal file
|
@ -0,0 +1,45 @@
|
|||
from datetime import datetime
|
||||
|
||||
import jwt
|
||||
import pytz
|
||||
from django.conf import settings
|
||||
from jwt import ExpiredSignatureError, InvalidSignatureError
|
||||
|
||||
TIMEZONE = pytz.timezone("Europe/Moscow")
|
||||
|
||||
|
||||
def sign_jwt(data: dict, t_life: None | int = None) -> str:
|
||||
"""generate and sign jwt with iat and exp using data from settings"""
|
||||
iat = int(datetime.now(tz=TIMEZONE).timestamp())
|
||||
exp = iat + settings.TOKEN_EXP if not t_life else iat + t_life
|
||||
payload = {"iat": iat, "exp": exp}
|
||||
for nm, el in data.items():
|
||||
if nm not in ["iat", "exp"]:
|
||||
payload[nm] = el
|
||||
|
||||
secret = settings.SECRET_KEY
|
||||
token = jwt.encode(payload=payload, key=secret)
|
||||
return token
|
||||
|
||||
|
||||
def read_jwt(token: str) -> dict | bool:
|
||||
"""reads jwt, validates it and return payload if correct"""
|
||||
header_data = jwt.get_unverified_header(token)
|
||||
secret = settings.SECRET_KEY
|
||||
try:
|
||||
payload = jwt.decode(token, key=secret, algorithms=[header_data["alg"]])
|
||||
except ExpiredSignatureError:
|
||||
return False
|
||||
except InvalidSignatureError:
|
||||
return False
|
||||
|
||||
if "exp" not in payload:
|
||||
return False
|
||||
|
||||
if int(datetime.now(tz=TIMEZONE).timestamp()) > payload["exp"]:
|
||||
return False
|
||||
|
||||
payload.pop("iat", None)
|
||||
payload.pop("exp", None)
|
||||
|
||||
return payload
|
6
akarpov/common/views.py
Normal file
6
akarpov/common/views.py
Normal file
|
@ -0,0 +1,6 @@
|
|||
from django.contrib.auth.mixins import LoginRequiredMixin, UserPassesTestMixin
|
||||
|
||||
|
||||
class SuperUserRequiredMixin(LoginRequiredMixin, UserPassesTestMixin):
|
||||
def test_func(self):
|
||||
return self.request.user.is_superuser
|
0
akarpov/music/__init__.py
Normal file
0
akarpov/music/__init__.py
Normal file
0
akarpov/music/admin.py
Normal file
0
akarpov/music/admin.py
Normal file
12
akarpov/music/apps.py
Normal file
12
akarpov/music/apps.py
Normal file
|
@ -0,0 +1,12 @@
|
|||
from django.apps import AppConfig
|
||||
|
||||
|
||||
class MusicConfig(AppConfig):
|
||||
default_auto_field = "django.db.models.BigAutoField"
|
||||
name = "akarpov.music"
|
||||
|
||||
def ready(self):
|
||||
try:
|
||||
import akarpov.music.signals # noqa F401
|
||||
except ImportError:
|
||||
pass
|
9
akarpov/music/forms.py
Normal file
9
akarpov/music/forms.py
Normal file
|
@ -0,0 +1,9 @@
|
|||
from django import forms
|
||||
|
||||
|
||||
class TracksLoadForm(forms.Form):
|
||||
address = forms.CharField(max_length=500)
|
||||
|
||||
|
||||
class FileUploadForm(forms.Form):
|
||||
file = forms.FileField(widget=forms.ClearableFileInput(attrs={"multiple": True}))
|
237
akarpov/music/migrations/0001_initial.py
Normal file
237
akarpov/music/migrations/0001_initial.py
Normal file
|
@ -0,0 +1,237 @@
|
|||
# Generated by Django 4.1.7 on 2023-03-27 21:25
|
||||
|
||||
import akarpov.utils.files
|
||||
from django.conf import settings
|
||||
from django.db import migrations, models
|
||||
import django.db.models.deletion
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
initial = True
|
||||
|
||||
dependencies = [
|
||||
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
|
||||
("shortener", "0001_initial"),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name="Album",
|
||||
fields=[
|
||||
(
|
||||
"id",
|
||||
models.BigAutoField(
|
||||
auto_created=True,
|
||||
primary_key=True,
|
||||
serialize=False,
|
||||
verbose_name="ID",
|
||||
),
|
||||
),
|
||||
(
|
||||
"image",
|
||||
models.ImageField(
|
||||
blank=True, upload_to=akarpov.utils.files.user_file_upload_mixin
|
||||
),
|
||||
),
|
||||
("image_cropped", models.ImageField(blank=True, upload_to="cropped/")),
|
||||
("slug", models.SlugField(blank=True, max_length=20, unique=True)),
|
||||
("name", models.CharField(max_length=200)),
|
||||
("link", models.URLField(blank=True)),
|
||||
(
|
||||
"short_link",
|
||||
models.ForeignKey(
|
||||
blank=True,
|
||||
null=True,
|
||||
on_delete=django.db.models.deletion.SET_NULL,
|
||||
to="shortener.link",
|
||||
),
|
||||
),
|
||||
],
|
||||
options={
|
||||
"abstract": False,
|
||||
},
|
||||
),
|
||||
migrations.CreateModel(
|
||||
name="Author",
|
||||
fields=[
|
||||
(
|
||||
"id",
|
||||
models.BigAutoField(
|
||||
auto_created=True,
|
||||
primary_key=True,
|
||||
serialize=False,
|
||||
verbose_name="ID",
|
||||
),
|
||||
),
|
||||
(
|
||||
"image",
|
||||
models.ImageField(
|
||||
blank=True, upload_to=akarpov.utils.files.user_file_upload_mixin
|
||||
),
|
||||
),
|
||||
("image_cropped", models.ImageField(blank=True, upload_to="cropped/")),
|
||||
("slug", models.SlugField(blank=True, max_length=20, unique=True)),
|
||||
("name", models.CharField(max_length=200)),
|
||||
("link", models.URLField(blank=True)),
|
||||
(
|
||||
"short_link",
|
||||
models.ForeignKey(
|
||||
blank=True,
|
||||
null=True,
|
||||
on_delete=django.db.models.deletion.SET_NULL,
|
||||
to="shortener.link",
|
||||
),
|
||||
),
|
||||
],
|
||||
options={
|
||||
"abstract": False,
|
||||
},
|
||||
),
|
||||
migrations.CreateModel(
|
||||
name="Playlist",
|
||||
fields=[
|
||||
(
|
||||
"id",
|
||||
models.BigAutoField(
|
||||
auto_created=True,
|
||||
primary_key=True,
|
||||
serialize=False,
|
||||
verbose_name="ID",
|
||||
),
|
||||
),
|
||||
("slug", models.SlugField(blank=True, max_length=20, unique=True)),
|
||||
("name", models.CharField(max_length=200)),
|
||||
("private", models.BooleanField(default=False)),
|
||||
("length", models.IntegerField(default=0)),
|
||||
(
|
||||
"creator",
|
||||
models.ForeignKey(
|
||||
on_delete=django.db.models.deletion.CASCADE,
|
||||
related_name="playlists",
|
||||
to=settings.AUTH_USER_MODEL,
|
||||
),
|
||||
),
|
||||
(
|
||||
"short_link",
|
||||
models.ForeignKey(
|
||||
blank=True,
|
||||
null=True,
|
||||
on_delete=django.db.models.deletion.SET_NULL,
|
||||
to="shortener.link",
|
||||
),
|
||||
),
|
||||
],
|
||||
options={
|
||||
"abstract": False,
|
||||
},
|
||||
),
|
||||
migrations.CreateModel(
|
||||
name="SongInQue",
|
||||
fields=[
|
||||
(
|
||||
"id",
|
||||
models.BigAutoField(
|
||||
auto_created=True,
|
||||
primary_key=True,
|
||||
serialize=False,
|
||||
verbose_name="ID",
|
||||
),
|
||||
),
|
||||
("name", models.CharField(blank=True, max_length=250)),
|
||||
("error", models.BooleanField(default=False)),
|
||||
],
|
||||
),
|
||||
migrations.CreateModel(
|
||||
name="Song",
|
||||
fields=[
|
||||
(
|
||||
"id",
|
||||
models.BigAutoField(
|
||||
auto_created=True,
|
||||
primary_key=True,
|
||||
serialize=False,
|
||||
verbose_name="ID",
|
||||
),
|
||||
),
|
||||
(
|
||||
"image",
|
||||
models.ImageField(
|
||||
blank=True, upload_to=akarpov.utils.files.user_file_upload_mixin
|
||||
),
|
||||
),
|
||||
("image_cropped", models.ImageField(blank=True, upload_to="cropped/")),
|
||||
("slug", models.SlugField(blank=True, max_length=20, unique=True)),
|
||||
("link", models.URLField(blank=True)),
|
||||
("length", models.IntegerField(null=True)),
|
||||
("played", models.IntegerField(default=0)),
|
||||
("name", models.CharField(max_length=200)),
|
||||
("file", models.FileField(upload_to="music")),
|
||||
(
|
||||
"album",
|
||||
models.ForeignKey(
|
||||
null=True,
|
||||
on_delete=django.db.models.deletion.SET_NULL,
|
||||
related_name="songs",
|
||||
to="music.album",
|
||||
),
|
||||
),
|
||||
(
|
||||
"author",
|
||||
models.ForeignKey(
|
||||
null=True,
|
||||
on_delete=django.db.models.deletion.SET_NULL,
|
||||
related_name="songs",
|
||||
to="music.author",
|
||||
),
|
||||
),
|
||||
(
|
||||
"short_link",
|
||||
models.ForeignKey(
|
||||
blank=True,
|
||||
null=True,
|
||||
on_delete=django.db.models.deletion.SET_NULL,
|
||||
to="shortener.link",
|
||||
),
|
||||
),
|
||||
],
|
||||
options={
|
||||
"abstract": False,
|
||||
},
|
||||
),
|
||||
migrations.CreateModel(
|
||||
name="PlaylistSong",
|
||||
fields=[
|
||||
(
|
||||
"id",
|
||||
models.BigAutoField(
|
||||
auto_created=True,
|
||||
primary_key=True,
|
||||
serialize=False,
|
||||
verbose_name="ID",
|
||||
),
|
||||
),
|
||||
("order", models.IntegerField()),
|
||||
(
|
||||
"playlist",
|
||||
models.ForeignKey(
|
||||
on_delete=django.db.models.deletion.CASCADE,
|
||||
related_name="songs",
|
||||
to="music.playlist",
|
||||
),
|
||||
),
|
||||
(
|
||||
"song",
|
||||
models.ForeignKey(
|
||||
on_delete=django.db.models.deletion.CASCADE,
|
||||
related_name="playlists",
|
||||
to="music.song",
|
||||
),
|
||||
),
|
||||
],
|
||||
options={
|
||||
"ordering": ["order"],
|
||||
"unique_together": {("playlist", "song"), ("playlist", "order")},
|
||||
},
|
||||
),
|
||||
]
|
0
akarpov/music/migrations/__init__.py
Normal file
0
akarpov/music/migrations/__init__.py
Normal file
84
akarpov/music/models.py
Normal file
84
akarpov/music/models.py
Normal file
|
@ -0,0 +1,84 @@
|
|||
from django.db import models
|
||||
from django.urls import reverse
|
||||
|
||||
from akarpov.common.models import BaseImageModel
|
||||
from akarpov.tools.shortener.models import ShortLink
|
||||
|
||||
|
||||
class Author(BaseImageModel, ShortLink):
|
||||
name = models.CharField(max_length=200)
|
||||
link = models.URLField(blank=True)
|
||||
|
||||
def get_absolute_url(self):
|
||||
return reverse("music:author", kwargs={"slug": self.slug})
|
||||
|
||||
def __str__(self):
|
||||
return self.name
|
||||
|
||||
|
||||
class Album(BaseImageModel, ShortLink):
|
||||
name = models.CharField(max_length=200)
|
||||
link = models.URLField(blank=True)
|
||||
|
||||
def get_absolute_url(self):
|
||||
return reverse("music:album", kwargs={"slug": self.slug})
|
||||
|
||||
def __str__(self):
|
||||
return self.name
|
||||
|
||||
|
||||
class Song(BaseImageModel, ShortLink):
|
||||
link = models.URLField(blank=True)
|
||||
length = models.IntegerField(null=True)
|
||||
played = models.IntegerField(default=0)
|
||||
name = models.CharField(max_length=200)
|
||||
file = models.FileField(upload_to="music")
|
||||
author = models.ForeignKey(
|
||||
Author, null=True, related_name="songs", on_delete=models.SET_NULL
|
||||
)
|
||||
album = models.ForeignKey(
|
||||
Album, null=True, related_name="songs", on_delete=models.SET_NULL
|
||||
)
|
||||
|
||||
def get_absolute_url(self):
|
||||
return reverse("music:song", kwargs={"slug": self.slug})
|
||||
|
||||
def __str__(self):
|
||||
return self.name
|
||||
|
||||
class SlugMeta:
|
||||
slug_length = 10
|
||||
|
||||
|
||||
class Playlist(ShortLink):
|
||||
name = models.CharField(max_length=200)
|
||||
private = models.BooleanField(default=False)
|
||||
creator = models.ForeignKey(
|
||||
"users.User", related_name="playlists", on_delete=models.CASCADE
|
||||
)
|
||||
length = models.IntegerField(default=0)
|
||||
|
||||
def get_absolute_url(self):
|
||||
return reverse("playlist:song", kwargs={"slug": self.slug})
|
||||
|
||||
def get_songs(self):
|
||||
return self.songs.all().values("song")
|
||||
|
||||
|
||||
class PlaylistSong(models.Model):
|
||||
order = models.IntegerField()
|
||||
playlist = models.ForeignKey(
|
||||
"Playlist", related_name="songs", on_delete=models.CASCADE
|
||||
)
|
||||
song = models.ForeignKey("Song", related_name="playlists", on_delete=models.CASCADE)
|
||||
|
||||
class Meta:
|
||||
unique_together = [("playlist", "song"), ("playlist", "order")]
|
||||
ordering = ["order"]
|
||||
|
||||
|
||||
class SongInQue(models.Model):
|
||||
"""It is very important for pk of SongInQue and Song to be the same"""
|
||||
|
||||
name = models.CharField(blank=True, max_length=250)
|
||||
error = models.BooleanField(default=False)
|
0
akarpov/music/services/__init__.py
Normal file
0
akarpov/music/services/__init__.py
Normal file
11
akarpov/music/services/base.py
Normal file
11
akarpov/music/services/base.py
Normal file
|
@ -0,0 +1,11 @@
|
|||
from akarpov.music.tasks import list_tracks, process_dir
|
||||
|
||||
|
||||
def load_tracks(address: str):
|
||||
if address.startswith("/"):
|
||||
process_dir.apply_async(kwargs={"path": address})
|
||||
list_tracks.apply_async(kwargs={"url": address})
|
||||
|
||||
|
||||
def load_track_file(file):
|
||||
...
|
44
akarpov/music/services/file.py
Normal file
44
akarpov/music/services/file.py
Normal file
|
@ -0,0 +1,44 @@
|
|||
from pathlib import Path
|
||||
|
||||
import mutagen
|
||||
from django.core.files import File
|
||||
|
||||
from akarpov.music.models import Album, Author, Song, SongInQue
|
||||
|
||||
|
||||
def load_dir(path: str):
|
||||
path = Path(path)
|
||||
|
||||
for f in list(path.glob("**/*.mp3")):
|
||||
with f.open("rb") as file:
|
||||
process_mp3_file(File(file, name=str(f).split("/")[-1]), str(f))
|
||||
|
||||
|
||||
def process_mp3_file(file: File, path: str) -> None:
|
||||
que = SongInQue.objects.create()
|
||||
try:
|
||||
tag = mutagen.File(path, easy=True)
|
||||
que.name = tag["title"][0] if "title" in tag else path.split("/")[-1]
|
||||
que.save()
|
||||
if "artist" in tag:
|
||||
author = Author.objects.get_or_create(name=tag["artist"][0])[0]
|
||||
else:
|
||||
author = None
|
||||
|
||||
if "album" in tag:
|
||||
album = Album.objects.get_or_create(name=tag["album"][0])[0]
|
||||
else:
|
||||
album = None
|
||||
|
||||
song, created = Song.objects.get_or_create(
|
||||
name=tag["title"][0] if "title" in tag else path.split("/")[-1],
|
||||
author=author,
|
||||
album=album,
|
||||
)
|
||||
song.file = file
|
||||
song.save(update_fields=["file"])
|
||||
que.delete()
|
||||
except Exception as e:
|
||||
que.name = e
|
||||
que.error = True
|
||||
que.save()
|
60
akarpov/music/services/spotify.py
Normal file
60
akarpov/music/services/spotify.py
Normal file
|
@ -0,0 +1,60 @@
|
|||
import spotipy
|
||||
from django.conf import settings
|
||||
from spotipy.oauth2 import SpotifyClientCredentials
|
||||
|
||||
from akarpov.music.services.yandex import search_ym
|
||||
|
||||
|
||||
def login() -> spotipy.Spotify:
|
||||
if not settings.SPOTIFY_ID or not settings.SPOTIFY_SECRET:
|
||||
raise ConnectionError("No spotify credentials provided")
|
||||
return spotipy.Spotify(
|
||||
auth_manager=SpotifyClientCredentials(
|
||||
client_id=settings.SPOTIFY_ID, client_secret=settings.SPOTIFY_SECRET
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def search(name: str, search_type="track"):
|
||||
sp = login()
|
||||
res = sp.search(name, type=search_type)
|
||||
return res
|
||||
|
||||
|
||||
def get_track_info(name: str) -> dict:
|
||||
info = {
|
||||
"album_name": "",
|
||||
"album_image": "",
|
||||
"release": "",
|
||||
"artists": [],
|
||||
"artist": "",
|
||||
"title": "",
|
||||
}
|
||||
|
||||
res = search(name)["tracks"]["items"]
|
||||
if not res:
|
||||
return info
|
||||
res = res[0]
|
||||
|
||||
info["album_name"] = res["album"]["name"]
|
||||
info["release"] = res["album"]["release_date"].split("-")[0]
|
||||
info["album_image"] = res["album"]["images"][0]["url"]
|
||||
info["artists"] = [x["name"] for x in res["artists"]]
|
||||
info["artist"] = [x["name"] for x in res["artists"]][0]
|
||||
info["title"] = res["name"]
|
||||
|
||||
# try to get genre
|
||||
sp = login()
|
||||
genres = sp.album(res["album"]["external_urls"]["spotify"])["genres"]
|
||||
if not genres:
|
||||
ym_info = search_ym(info["artist"] + " " + info["title"])
|
||||
if ym_info and "genre" in ym_info:
|
||||
info["genre"] = ym_info["genre"]
|
||||
else:
|
||||
genres = sp.artist(res["artists"][0]["external_urls"]["spotify"])["genres"]
|
||||
if genres:
|
||||
info["genre"] = genres[0]
|
||||
else:
|
||||
info["genre"] = genres[0]
|
||||
|
||||
return info
|
135
akarpov/music/services/yandex.py
Normal file
135
akarpov/music/services/yandex.py
Normal file
|
@ -0,0 +1,135 @@
|
|||
import os
|
||||
from pathlib import Path
|
||||
from random import randint
|
||||
|
||||
from django.conf import settings
|
||||
from django.core.files import File
|
||||
from django.utils.text import slugify
|
||||
from mutagen.easyid3 import EasyID3
|
||||
from mutagen.id3 import APIC, ID3, TCON, TORY
|
||||
from mutagen.mp3 import MP3
|
||||
from pydub import AudioSegment
|
||||
from yandex_music import Client, Playlist, Search, Track
|
||||
|
||||
from akarpov.music import tasks
|
||||
from akarpov.music.models import Album, Author, Song, SongInQue
|
||||
|
||||
|
||||
def login() -> Client:
|
||||
if not settings.YANDEX_TOKEN:
|
||||
raise ConnectionError("No yandex credentials provided")
|
||||
return Client(settings.YANDEX_TOKEN).init()
|
||||
|
||||
|
||||
def search_ym(name: str):
|
||||
client = login()
|
||||
info = {}
|
||||
search = client.search(name, type_="track") # type: Search
|
||||
|
||||
if search.tracks:
|
||||
best = search.tracks.results[0] # type: Track
|
||||
|
||||
info = {
|
||||
"artists": [artist.name for artist in best.artists],
|
||||
"title": best.title,
|
||||
}
|
||||
|
||||
# getting genre
|
||||
if best.albums[0].genre:
|
||||
genre = best.albums[0].genre
|
||||
elif best.artists[0].genres:
|
||||
genre = best.artists[0].genres[0]
|
||||
else:
|
||||
genre = None
|
||||
|
||||
if genre:
|
||||
info["genre"] = genre
|
||||
|
||||
return info
|
||||
|
||||
|
||||
def load_file_meta(track: int):
|
||||
que = SongInQue.objects.create()
|
||||
try:
|
||||
client = login()
|
||||
track = client.tracks(track)[0] # type: Track
|
||||
que.name = track.title
|
||||
que.save()
|
||||
|
||||
try:
|
||||
if sng := Song.objects.filter(
|
||||
name=track.title, album__name=track.albums[0].title
|
||||
):
|
||||
que.delete()
|
||||
return sng.first()
|
||||
except IndexError:
|
||||
que.delete()
|
||||
return
|
||||
|
||||
filename = slugify(f"{track.artists[0].name} - {track.title}")
|
||||
orig_path = f"{settings.MEDIA_ROOT}/{filename}"
|
||||
|
||||
track.download(filename=orig_path, codec="mp3")
|
||||
|
||||
path = orig_path + ".mp3"
|
||||
AudioSegment.from_file(orig_path).export(path)
|
||||
os.remove(orig_path)
|
||||
|
||||
# load album image
|
||||
img_pth = str(settings.MEDIA_ROOT + f"/_{str(randint(10000, 99999))}.png")
|
||||
|
||||
track.download_cover(filename=img_pth)
|
||||
|
||||
album = track.albums[0]
|
||||
|
||||
# set music meta
|
||||
tag = MP3(path, ID3=ID3)
|
||||
tag.tags.add(
|
||||
APIC(
|
||||
encoding=3, # 3 is for utf-8
|
||||
mime="image/png", # image/jpeg or image/png
|
||||
type=3, # 3 is for the cover image
|
||||
desc="Cover",
|
||||
data=open(img_pth, "rb").read(),
|
||||
)
|
||||
)
|
||||
tag.tags.add(TORY(text=str(album.year)))
|
||||
tag.tags.add(TCON(text=album.genre))
|
||||
tag.save()
|
||||
|
||||
os.remove(img_pth)
|
||||
tag = EasyID3(path)
|
||||
|
||||
tag["title"] = track.title
|
||||
tag["album"] = album.title
|
||||
tag["artist"] = track.artists[0].name
|
||||
|
||||
tag.save()
|
||||
|
||||
# save track
|
||||
ms_path = Path(path)
|
||||
song = Song(
|
||||
name=track.title,
|
||||
author=Author.objects.get_or_create(name=track.artists[0].name)[0],
|
||||
album=Album.objects.get_or_create(name=album.title)[0],
|
||||
)
|
||||
with ms_path.open(mode="rb") as f:
|
||||
song.file = File(f, name=ms_path.name)
|
||||
song.save()
|
||||
os.remove(path)
|
||||
que.delete()
|
||||
return song
|
||||
except Exception as e:
|
||||
que.name = e
|
||||
que.error = True
|
||||
que.save()
|
||||
|
||||
|
||||
def load_playlist(link: str):
|
||||
author = link.split("/")[4]
|
||||
playlist_id = link.split("/")[-1]
|
||||
|
||||
client = login()
|
||||
playlist = client.users_playlists(int(playlist_id), author) # type: Playlist
|
||||
for track in playlist.fetch_tracks():
|
||||
tasks.load_ym_file_meta.apply_async(kwargs={"track": track.track.id})
|
108
akarpov/music/services/youtube.py
Normal file
108
akarpov/music/services/youtube.py
Normal file
|
@ -0,0 +1,108 @@
|
|||
import os
|
||||
from pathlib import Path
|
||||
from random import randint
|
||||
|
||||
import requests
|
||||
from django.conf import settings
|
||||
from django.core.files import File
|
||||
from mutagen.easyid3 import EasyID3
|
||||
from mutagen.id3 import APIC, ID3, TCON, TORY
|
||||
from mutagen.mp3 import MP3
|
||||
from PIL import Image
|
||||
from pydub import AudioSegment
|
||||
from pytube import Search, YouTube
|
||||
|
||||
from akarpov.music.models import Album, Author, Song, SongInQue
|
||||
from akarpov.music.services.spotify import get_track_info
|
||||
|
||||
|
||||
def download_from_youtube_link(link: str) -> Song:
|
||||
que = SongInQue.objects.create()
|
||||
try:
|
||||
yt = YouTube(link)
|
||||
|
||||
if yt.length > 900:
|
||||
# TODO: add long video splitting
|
||||
raise ValueError("Track is too long")
|
||||
|
||||
if not len(yt.streams):
|
||||
raise ValueError("There is no such song")
|
||||
|
||||
info = get_track_info(yt.title)
|
||||
que.name = info["title"]
|
||||
que.save()
|
||||
if sng := Song.objects.filter(
|
||||
name=info["title"], album__name=info["album_name"]
|
||||
):
|
||||
que.delete()
|
||||
return sng.first()
|
||||
|
||||
authors = [Author.objects.get_or_create(name=x)[0] for x in info["artists"]]
|
||||
album = Album.objects.get_or_create(name=info["album_name"])[0]
|
||||
|
||||
audio = yt.streams.filter(only_audio=True).order_by("abr").desc().first()
|
||||
orig_path = audio.download(output_path=settings.MEDIA_ROOT)
|
||||
|
||||
# convert to mp3
|
||||
path = orig_path.replace(orig_path.split(".")[-1], "mp3")
|
||||
AudioSegment.from_file(orig_path).export(path)
|
||||
os.remove(orig_path)
|
||||
|
||||
# load album image
|
||||
r = requests.get(info["album_image"])
|
||||
img_pth = str(
|
||||
settings.MEDIA_ROOT
|
||||
+ f"/{info['album_image'].split('/')[-1]}_{str(randint(100, 999))}"
|
||||
)
|
||||
with open(img_pth, "wb") as f:
|
||||
f.write(r.content)
|
||||
|
||||
im = Image.open(img_pth)
|
||||
im.save(str(f"{img_pth}.png"))
|
||||
|
||||
os.remove(img_pth)
|
||||
|
||||
# set music meta
|
||||
tag = MP3(path, ID3=ID3)
|
||||
tag.tags.add(
|
||||
APIC(
|
||||
encoding=3, # 3 is for utf-8
|
||||
mime="image/png", # image/jpeg or image/png
|
||||
type=3, # 3 is for the cover image
|
||||
desc="Cover",
|
||||
data=open(str(f"{img_pth}.png"), "rb").read(),
|
||||
)
|
||||
)
|
||||
tag.tags.add(TORY(text=info["release"]))
|
||||
if "genre" in info:
|
||||
tag.tags.add(TCON(text=info["genre"]))
|
||||
|
||||
tag.save()
|
||||
os.remove(str(f"{img_pth}.png"))
|
||||
tag = EasyID3(path)
|
||||
|
||||
tag["title"] = info["title"]
|
||||
tag["album"] = info["album_name"]
|
||||
tag["artist"] = info["artist"]
|
||||
|
||||
tag.save()
|
||||
|
||||
# save track
|
||||
ms_path = Path(path)
|
||||
song = Song(name=info["title"], author=authors[0], album=album)
|
||||
with ms_path.open(mode="rb") as f:
|
||||
song.file = File(f, name=ms_path.name)
|
||||
song.save()
|
||||
os.remove(path)
|
||||
que.delete()
|
||||
return song
|
||||
except Exception as e:
|
||||
que.name = e
|
||||
que.error = True
|
||||
que.save()
|
||||
|
||||
|
||||
def search_channel(name):
|
||||
s = Search(name)
|
||||
vid = s.results[0] # type: YouTube
|
||||
return vid.channel_url
|
18
akarpov/music/signals.py
Normal file
18
akarpov/music/signals.py
Normal file
|
@ -0,0 +1,18 @@
|
|||
import os
|
||||
|
||||
from django.db.models.signals import post_delete, post_save
|
||||
from django.dispatch import receiver
|
||||
|
||||
from akarpov.music.models import Song
|
||||
|
||||
|
||||
@receiver(post_delete, sender=Song)
|
||||
def auto_delete_file_on_delete(sender, instance, **kwargs):
|
||||
if instance.file:
|
||||
if os.path.isfile(instance.file.path):
|
||||
os.remove(instance.file.path)
|
||||
|
||||
|
||||
@receiver(post_save)
|
||||
def send_que_status(sender, instance, created, **kwargs):
|
||||
...
|
41
akarpov/music/tasks.py
Normal file
41
akarpov/music/tasks.py
Normal file
|
@ -0,0 +1,41 @@
|
|||
from celery import shared_task
|
||||
from pytube import Channel, Playlist
|
||||
|
||||
from akarpov.music.services import yandex, youtube
|
||||
from akarpov.music.services.file import load_dir
|
||||
|
||||
|
||||
@shared_task
|
||||
def list_tracks(url):
|
||||
if "yandex" in url:
|
||||
yandex.load_playlist(url)
|
||||
elif "channel" in url or "/c/" in url:
|
||||
p = Channel(url)
|
||||
for video in p.video_urls:
|
||||
process_yb.apply_async(kwargs={"url": video})
|
||||
elif "playlist" in url or "&list=" in url:
|
||||
p = Playlist(url)
|
||||
for video in p.video_urls:
|
||||
process_yb.apply_async(kwargs={"url": video})
|
||||
|
||||
else:
|
||||
process_yb.apply_async(kwargs={"url": url})
|
||||
|
||||
return url
|
||||
|
||||
|
||||
@shared_task(max_retries=5)
|
||||
def process_yb(url):
|
||||
youtube.download_from_youtube_link(url)
|
||||
return url
|
||||
|
||||
|
||||
@shared_task
|
||||
def process_dir(path):
|
||||
load_dir(path)
|
||||
return path
|
||||
|
||||
|
||||
@shared_task
|
||||
def load_ym_file_meta(track):
|
||||
return yandex.load_file_meta(track)
|
14
akarpov/music/urls.py
Normal file
14
akarpov/music/urls.py
Normal file
|
@ -0,0 +1,14 @@
|
|||
from django.urls import path
|
||||
|
||||
from . import views
|
||||
|
||||
app_name = "music"
|
||||
|
||||
urlpatterns = [
|
||||
path("upload", views.load_track_view, name="load"),
|
||||
path("upload_file", views.load_track_file_view, name="upload"),
|
||||
path("<str:slug>", views.song_view, name="song"),
|
||||
path("album/<str:slug>", views.album_view, name="album"),
|
||||
path("author/<str:slug>", views.author_view, name="author"),
|
||||
path("playlist/<str:slug>", views.playlist_view, name="playlist"),
|
||||
]
|
76
akarpov/music/views.py
Normal file
76
akarpov/music/views.py
Normal file
|
@ -0,0 +1,76 @@
|
|||
from django.views import generic
|
||||
|
||||
from akarpov.common.views import SuperUserRequiredMixin
|
||||
from akarpov.music.forms import FileUploadForm, TracksLoadForm
|
||||
from akarpov.music.models import Album, Author, Playlist, Song
|
||||
from akarpov.music.services.base import load_track_file, load_tracks
|
||||
|
||||
|
||||
class AlbumView(generic.DetailView):
|
||||
model = Album
|
||||
slug_field = "slug"
|
||||
slug_url_kwarg = "slug"
|
||||
template_name = "music/album.html"
|
||||
|
||||
|
||||
album_view = AlbumView.as_view()
|
||||
|
||||
|
||||
class AuthorView(generic.DetailView):
|
||||
model = Author
|
||||
slug_field = "slug"
|
||||
slug_url_kwarg = "slug"
|
||||
template_name = "music/author.html"
|
||||
|
||||
|
||||
author_view = AuthorView.as_view()
|
||||
|
||||
|
||||
class SongView(generic.DetailView):
|
||||
model = Song
|
||||
slug_field = "slug"
|
||||
slug_url_kwarg = "slug"
|
||||
template_name = "music/song.html"
|
||||
|
||||
|
||||
song_view = SongView.as_view()
|
||||
|
||||
|
||||
class PlaylistView(generic.DetailView):
|
||||
model = Playlist
|
||||
slug_field = "slug"
|
||||
slug_url_kwarg = "slug"
|
||||
template_name = "music/playlist.html"
|
||||
|
||||
|
||||
playlist_view = SongView.as_view()
|
||||
|
||||
|
||||
class LoadTrackView(SuperUserRequiredMixin, generic.FormView):
|
||||
form_class = TracksLoadForm
|
||||
|
||||
def get_success_url(self):
|
||||
# TODO: add room to see tracks load
|
||||
return ""
|
||||
|
||||
def form_valid(self, form):
|
||||
load_tracks(form.address)
|
||||
return super().form_valid(form)
|
||||
|
||||
|
||||
load_track_view = LoadTrackView.as_view()
|
||||
|
||||
|
||||
class LoadTrackFileView(SuperUserRequiredMixin, generic.FormView):
|
||||
form_class = FileUploadForm
|
||||
|
||||
def get_success_url(self):
|
||||
# TODO: add room to see tracks load
|
||||
return ""
|
||||
|
||||
def form_valid(self, form):
|
||||
load_track_file(form.files)
|
||||
return super().form_valid(form)
|
||||
|
||||
|
||||
load_track_file_view = LoadTrackFileView.as_view()
|
0
akarpov/templates/music/album.html
Normal file
0
akarpov/templates/music/album.html
Normal file
10
akarpov/templates/music/author.html
Normal file
10
akarpov/templates/music/author.html
Normal file
|
@ -0,0 +1,10 @@
|
|||
<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<title>Title</title>
|
||||
</head>
|
||||
<body>
|
||||
|
||||
</body>
|
||||
</html>
|
0
akarpov/templates/music/playlist.html
Normal file
0
akarpov/templates/music/playlist.html
Normal file
0
akarpov/templates/music/song.html
Normal file
0
akarpov/templates/music/song.html
Normal file
16
config/asgi.py
Normal file
16
config/asgi.py
Normal file
|
@ -0,0 +1,16 @@
|
|||
import os
|
||||
|
||||
from channels.routing import ProtocolTypeRouter, URLRouter
|
||||
from django.core.asgi import get_asgi_application
|
||||
|
||||
from akarpov.common.channels import HeaderAuthMiddleware
|
||||
from config import routing
|
||||
|
||||
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings.local")
|
||||
|
||||
application = ProtocolTypeRouter(
|
||||
{
|
||||
"http": get_asgi_application(),
|
||||
"websocket": HeaderAuthMiddleware(URLRouter(routing.websocket_urlpatterns)),
|
||||
}
|
||||
)
|
1
config/routing.py
Normal file
1
config/routing.py
Normal file
|
@ -0,0 +1 @@
|
|||
websocket_urlpatterns = []
|
|
@ -78,6 +78,7 @@
|
|||
# APPS
|
||||
# ------------------------------------------------------------------------------
|
||||
DJANGO_APPS = [
|
||||
"daphne",
|
||||
"django.contrib.auth",
|
||||
"django.contrib.contenttypes",
|
||||
"django.contrib.sessions",
|
||||
|
@ -142,6 +143,7 @@
|
|||
"akarpov.about",
|
||||
"akarpov.blog",
|
||||
"akarpov.files",
|
||||
"akarpov.music",
|
||||
"akarpov.pipeliner",
|
||||
"akarpov.test_platform",
|
||||
"akarpov.tools.shortener",
|
||||
|
@ -522,3 +524,16 @@
|
|||
# ACTIVE_LINK
|
||||
# ------------------------------------------------------------------------------
|
||||
ACTIVE_LINK_CSS_CLASS = "nav-active"
|
||||
|
||||
# MUSIC
|
||||
# ------------------------------------------------------------------------------
|
||||
# MPD
|
||||
MUSIC_MPD_HOST = env("MPD_HOST", default="")
|
||||
MUSIC_MPD_PASSWORD = env("MPD_HOST", default="")
|
||||
|
||||
# SPOTIFY
|
||||
MUSIC_SPOTIFY_ID = env("MPD_HOST", default="")
|
||||
MUSIC_SPOTIFY_SECRET = env("MPD_HOST", default="")
|
||||
|
||||
# YANDEX_MUSIC
|
||||
MUSIC_YANDEX_TOKEN = env("MPD_HOST", default="")
|
||||
|
|
|
@ -22,6 +22,7 @@
|
|||
path("users/", include("akarpov.users.urls", namespace="users")),
|
||||
path("about/", include("akarpov.about.urls", namespace="about")),
|
||||
path("files/", include("akarpov.files.urls", namespace="files")),
|
||||
path("files/", include("akarpov.music.urls", namespace="music")),
|
||||
path("forms/", include("akarpov.test_platform.urls", namespace="forms")),
|
||||
path("tools/", include("akarpov.tools.urls", namespace="tools")),
|
||||
path("ckeditor/", include("ckeditor_uploader.urls")),
|
||||
|
|
859
poetry.lock
generated
859
poetry.lock
generated
File diff suppressed because it is too large
Load Diff
|
@ -67,11 +67,20 @@ django-cms = "^3.11.1"
|
|||
django-sekizai = "^4.0.0"
|
||||
amzqr = "^0.0.1"
|
||||
django-active-link = "^0.1.8"
|
||||
channels = "^4.0.0"
|
||||
channels = {extras = ["daphne"], version = "^4.0.0"}
|
||||
django-upload-validator = "^1.1.6"
|
||||
markdown = "^3.4.3"
|
||||
pydotplus = "^2.0.2"
|
||||
preview-generator = "^0.29"
|
||||
uuid = "^1.30"
|
||||
mutagen = "^1.46.0"
|
||||
pytube = "^12.1.3"
|
||||
ytmusicapi = "^0.25.1"
|
||||
pydub = "^0.25.1"
|
||||
python-mpd2 = "^3.0.5"
|
||||
spotipy = "^2.22.1"
|
||||
yandex-music = "^2.0.1"
|
||||
pyjwt = "^2.6.0"
|
||||
|
||||
|
||||
[build-system]
|
||||
|
|
Loading…
Reference in New Issue
Block a user