mirror of
				https://github.com/Alexander-D-Karpov/akarpov
				synced 2025-10-31 23:47:30 +03:00 
			
		
		
		
	added yt-dlp for youtube music download
This commit is contained in:
		
							parent
							
								
									08b1479401
								
							
						
					
					
						commit
						b7773181ae
					
				|  | @ -28,8 +28,10 @@ def __init__(self, app): | |||
| 
 | ||||
|     async def __call__(self, scope, receive, send): | ||||
|         scope["user"] = await get_user(dict(scope["headers"])) | ||||
| 
 | ||||
|         try: | ||||
|             return await self.app(scope, receive, send) | ||||
|         except ValueError: | ||||
|             return | ||||
| 
 | ||||
| 
 | ||||
| class BaseConsumer(AsyncJsonWebsocketConsumer): | ||||
|  |  | |||
|  | @ -6,11 +6,12 @@ | |||
| 
 | ||||
| 
 | ||||
| def login() -> spotipy.Spotify: | ||||
|     if not settings.SPOTIFY_ID or not settings.SPOTIFY_SECRET: | ||||
|     if not settings.MUSIC_SPOTIFY_ID or not settings.MUSIC_SPOTIFY_SECRET: | ||||
|         raise ConnectionError("No spotify credentials provided") | ||||
|     return spotipy.Spotify( | ||||
|         auth_manager=SpotifyClientCredentials( | ||||
|             client_id=settings.SPOTIFY_ID, client_secret=settings.SPOTIFY_SECRET | ||||
|             client_id=settings.MUSIC_SPOTIFY_ID, | ||||
|             client_secret=settings.MUSIC_SPOTIFY_SECRET, | ||||
|         ) | ||||
|     ) | ||||
| 
 | ||||
|  |  | |||
|  | @ -16,9 +16,9 @@ | |||
| 
 | ||||
| 
 | ||||
| def login() -> Client: | ||||
|     if not settings.YANDEX_TOKEN: | ||||
|     if not settings.MUSIC_YANDEX_TOKEN: | ||||
|         raise ConnectionError("No yandex credentials provided") | ||||
|     return Client(settings.YANDEX_TOKEN).init() | ||||
|     return Client(settings.MUSIC_YANDEX_TOKEN).init() | ||||
| 
 | ||||
| 
 | ||||
| def search_ym(name: str): | ||||
|  |  | |||
|  | @ -1,51 +1,111 @@ | |||
| import datetime | ||||
| import os | ||||
| import re | ||||
| from random import randint | ||||
| 
 | ||||
| import requests | ||||
| import yt_dlp | ||||
| from django.conf import settings | ||||
| from PIL import Image | ||||
| from pydub import AudioSegment | ||||
| from pytube import Search, YouTube | ||||
| from yt_dlp import YoutubeDL | ||||
| 
 | ||||
| from akarpov.music.models import Song, SongInQue | ||||
| from akarpov.music.models import Song | ||||
| from akarpov.music.services.db import load_track | ||||
| from akarpov.music.services.spotify import get_track_info | ||||
| 
 | ||||
| final_filename = None | ||||
| 
 | ||||
| 
 | ||||
| ydl_opts = { | ||||
|     "format": "m4a/bestaudio/best", | ||||
|     "postprocessors": [ | ||||
|         {  # Extract audio using ffmpeg | ||||
|             "key": "FFmpegExtractAudio", | ||||
|             "preferredcodec": "m4a", | ||||
|         } | ||||
|     ], | ||||
|     "outtmpl": f"{settings.MEDIA_ROOT}/%(uploader)s_%(title)s.%(ext)s", | ||||
| } | ||||
| 
 | ||||
| 
 | ||||
| def download_file(url): | ||||
|     with yt_dlp.YoutubeDL(ydl_opts) as ydl: | ||||
|         info = ydl.extract_info(url) | ||||
|     return info["requested_downloads"][0]["_filename"] | ||||
| 
 | ||||
| 
 | ||||
| def parse_description(description: str) -> list: | ||||
|     # Read the description file | ||||
|     # Split into time and chapter name | ||||
| 
 | ||||
|     list_of_chapters = [] | ||||
| 
 | ||||
|     # only increment chapter number on a chapter line | ||||
|     # chapter lines start with timecode | ||||
|     line_counter = 1 | ||||
|     for line in description.split("\n"): | ||||
|         result = re.search(r"\(?(\d?[:]?\d+[:]\d+)\)?", line) | ||||
|         try: | ||||
|             time_count = datetime.datetime.strptime(result.group(1), "%H:%M:%S") | ||||
|         except Exception: | ||||
|             try: | ||||
|                 time_count = datetime.datetime.strptime(result.group(1), "%M:%S") | ||||
|             except Exception: | ||||
|                 continue | ||||
|         chap_name = line.replace(result.group(0), "").rstrip(" :\n") | ||||
|         chap_pos = ( | ||||
|             time_count.timestamp() - datetime.datetime(1900, 1, 1, 0, 0).timestamp() | ||||
|         ) * 1000 | ||||
|         list_of_chapters.append((str(line_counter).zfill(2), chap_pos, chap_name)) | ||||
|         line_counter += 1 | ||||
| 
 | ||||
|     return list_of_chapters | ||||
| 
 | ||||
| 
 | ||||
| def download_from_youtube_link(link: str) -> Song: | ||||
|     que = SongInQue.objects.create() | ||||
|     try: | ||||
|         yt = YouTube(link) | ||||
|     song = None | ||||
| 
 | ||||
|         if yt.length > 900: | ||||
|             # TODO: add long video splitting | ||||
|             raise ValueError("Track is too long") | ||||
| 
 | ||||
|         if not len(yt.streams): | ||||
|             raise ValueError("There is no such song") | ||||
| 
 | ||||
|         info = get_track_info(yt.title) | ||||
|         que.name = info["title"] | ||||
|         que.save() | ||||
|         if sng := Song.objects.filter( | ||||
|             name=info["title"], album__name=info["album_name"] | ||||
|         ): | ||||
|             que.delete() | ||||
|             return sng.first() | ||||
| 
 | ||||
|         audio = yt.streams.filter(only_audio=True).order_by("abr").desc().first() | ||||
|         orig_path = audio.download(output_path=settings.MEDIA_ROOT) | ||||
|     with YoutubeDL(ydl_opts) as ydl: | ||||
|         info_dict = ydl.extract_info(link, download=False) | ||||
|         title = info_dict.get("title", None) | ||||
|         description = info_dict.get("description", None) | ||||
|     chapters = parse_description(description) | ||||
|     orig_path = download_file(link) | ||||
| 
 | ||||
|     # convert to mp3 | ||||
|     print(f"[processing] {title} converting to mp3") | ||||
|     path = orig_path.replace(orig_path.split(".")[-1], "mp3") | ||||
|     AudioSegment.from_file(orig_path).export(path) | ||||
|     os.remove(orig_path) | ||||
|     print(f"[processing] {title} converting to mp3: done") | ||||
| 
 | ||||
|         # load album image | ||||
|     # split in chapters | ||||
|     if len(chapters) > 1: | ||||
|         sound = AudioSegment.from_mp3(path) | ||||
|         for i in range(len(chapters)): | ||||
|             if i != len(chapters) - 1: | ||||
|                 print( | ||||
|                     f"[processing] loading {chapters[i][2]} from {chapters[i][1] // 1000} to", | ||||
|                     f"{chapters[i + 1][1] // 1000}", | ||||
|                 ) | ||||
|                 st = chapters[i][1] | ||||
|                 end = chapters[i + 1][1] | ||||
|                 audio = sound[st:end] | ||||
|             else: | ||||
|                 print( | ||||
|                     f"[processing] loading {chapters[i][2]} from {chapters[i][1] // 1000}" | ||||
|                 ) | ||||
|                 st = chapters[i][1] | ||||
|                 audio = sound[st:] | ||||
|             chapter_path = path.split(".")[0] + chapters[i][2] + ".mp3" | ||||
|             info = get_track_info(chapters[i][2]) | ||||
|             audio.export(chapter_path, format="mp3") | ||||
|             r = requests.get(info["album_image"]) | ||||
|             img_pth = str( | ||||
|                 settings.MEDIA_ROOT | ||||
|             + f"/{info['album_image'].split('/')[-1]}_{str(randint(100, 999))}.png" | ||||
|                 + f"/{info['album_image'].split('/')[-1]}_{str(randint(100, 999))}" | ||||
|             ) | ||||
|             with open(img_pth, "wb") as f: | ||||
|                 f.write(r.content) | ||||
|  | @ -54,13 +114,60 @@ def download_from_youtube_link(link: str) -> Song: | |||
|             im.save(str(f"{img_pth}.png")) | ||||
| 
 | ||||
|             os.remove(img_pth) | ||||
|             if "genre" in info: | ||||
|                 song = load_track( | ||||
|                     chapter_path, | ||||
|                     f"{img_pth}.png", | ||||
|                     info["artists"], | ||||
|                     info["album_name"], | ||||
|                     chapters[i][2], | ||||
|                     genre=info["genre"], | ||||
|                 ) | ||||
|             else: | ||||
|                 song = load_track( | ||||
|                     chapter_path, | ||||
|                     f"{img_pth}.png", | ||||
|                     info["artists"], | ||||
|                     info["album_name"], | ||||
|                     chapters[i][2], | ||||
|                 ) | ||||
|             os.remove(chapter_path) | ||||
|     else: | ||||
|         print(f"[processing] loading {title}") | ||||
| 
 | ||||
|         load_track(path, img_pth, info["artists"], info["album_name"]) | ||||
|     except Exception as e: | ||||
|         print(e) | ||||
|         que.name = e | ||||
|         que.error = True | ||||
|         que.save() | ||||
|         info = get_track_info(title) | ||||
|         r = requests.get(info["album_image"]) | ||||
|         img_pth = str( | ||||
|             settings.MEDIA_ROOT | ||||
|             + f"/{info['album_image'].split('/')[-1]}_{str(randint(100, 999))}" | ||||
|         ) | ||||
|         with open(img_pth, "wb") as f: | ||||
|             f.write(r.content) | ||||
| 
 | ||||
|         im = Image.open(img_pth) | ||||
|         im.save(str(f"{img_pth}.png")) | ||||
| 
 | ||||
|         os.remove(img_pth) | ||||
|         if "genre" in info: | ||||
|             song = load_track( | ||||
|                 path, | ||||
|                 f"{img_pth}.png", | ||||
|                 info["artists"], | ||||
|                 info["album_name"], | ||||
|                 title, | ||||
|                 genre=info["genre"], | ||||
|             ) | ||||
|         else: | ||||
|             song = load_track( | ||||
|                 path, | ||||
|                 f"{img_pth}.png", | ||||
|                 info["artists"], | ||||
|                 info["album_name"], | ||||
|                 title, | ||||
|             ) | ||||
|     os.remove(path) | ||||
| 
 | ||||
|     return song | ||||
| 
 | ||||
| 
 | ||||
| def search_channel(name): | ||||
|  |  | |||
|  | @ -66,3 +66,8 @@ class Meta: | |||
| 
 | ||||
|     def __str__(self): | ||||
|         return self | ||||
| 
 | ||||
| 
 | ||||
| class UserNotification: | ||||
|     # TODO: add notification system | ||||
|     ... | ||||
|  |  | |||
|  | @ -407,9 +407,9 @@ | |||
| # https://docs.celeryq.dev/en/stable/userguide/configuration.html#std:setting-result_serializer | ||||
| CELERY_RESULT_SERIALIZER = "json" | ||||
| # https://docs.celeryq.dev/en/stable/userguide/configuration.html#task-time-limit | ||||
| CELERY_TASK_TIME_LIMIT = 5 * 60 | ||||
| CELERY_TASK_TIME_LIMIT = 20 * 60 | ||||
| # https://docs.celeryq.dev/en/stable/userguide/configuration.html#task-soft-time-limit | ||||
| CELERY_TASK_SOFT_TIME_LIMIT = 60 | ||||
| CELERY_TASK_SOFT_TIME_LIMIT = 10 * 60 | ||||
| # https://docs.celeryq.dev/en/stable/userguide/configuration.html#beat-scheduler | ||||
| CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler" | ||||
| 
 | ||||
|  |  | |||
							
								
								
									
										511
									
								
								poetry.lock
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										511
									
								
								poetry.lock
									
									
									
										generated
									
									
									
								
							
										
											
												File diff suppressed because it is too large
												Load Diff
											
										
									
								
							|  | @ -73,7 +73,6 @@ pydotplus = "^2.0.2" | |||
| preview-generator = "^0.29" | ||||
| uuid = "^1.30" | ||||
| mutagen = "^1.46.0" | ||||
| pytube = "^12.1.3" | ||||
| ytmusicapi = "^1.0.2" | ||||
| pydub = "^0.25.1" | ||||
| python-mpd2 = "^3.0.5" | ||||
|  | @ -97,6 +96,10 @@ django-ipware = "^5.0.0" | |||
| fastapi = {extras = ["all"], version = "^0.101.0"} | ||||
| sqlalchemy = "^2.0.19" | ||||
| pydantic-settings = "^2.0.2" | ||||
| yt-dlp = "^2023.7.6" | ||||
| pytube = "^15.0.0" | ||||
| urllib3 = ">=1.26" | ||||
| requests = ">=2.25" | ||||
| 
 | ||||
| 
 | ||||
| [build-system] | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue
	
	Block a user