211 lines
7.0 KiB
Python
211 lines
7.0 KiB
Python
import requests
|
|
import os
|
|
import re
|
|
import shutil
|
|
|
|
def clean_filename(title):
|
|
# Remove quotes and illegal characters
|
|
title = title.strip("'").strip('"')
|
|
return re.sub(r'[\\/*?:"<>|]', "", title)
|
|
|
|
def _ensure_music_dir():
|
|
if not os.path.exists("music"):
|
|
os.makedirs("music")
|
|
|
|
def _normalize_quality_kbps(quality):
|
|
try:
|
|
q = int(str(quality).strip())
|
|
except Exception:
|
|
q = 320
|
|
# Clamp to the expected UI values
|
|
if q <= 128:
|
|
return 128
|
|
if q <= 192:
|
|
return 192
|
|
return 320
|
|
|
|
def _can_use_ytdlp():
|
|
# yt-dlp needs ffmpeg to reliably output MP3.
|
|
if shutil.which("ffmpeg") is None:
|
|
return False
|
|
try:
|
|
import yt_dlp # noqa: F401
|
|
return True
|
|
except Exception:
|
|
return False
|
|
|
|
def _download_with_ytdlp(url, quality_kbps):
|
|
import yt_dlp
|
|
|
|
_ensure_music_dir()
|
|
|
|
# Force a predictable output location. yt-dlp will sanitize filenames.
|
|
outtmpl = os.path.join("music", "%(title)s.%(ext)s")
|
|
|
|
ydl_opts = {
|
|
"format": "bestaudio/best",
|
|
"outtmpl": outtmpl,
|
|
"noplaylist": True,
|
|
"quiet": True,
|
|
"no_warnings": True,
|
|
"overwrites": False,
|
|
"postprocessors": [
|
|
{
|
|
"key": "FFmpegExtractAudio",
|
|
"preferredcodec": "mp3",
|
|
}
|
|
],
|
|
# Best-effort attempt to honor the UI bitrate selector.
|
|
# Note: This depends on ffmpeg, and the source may not have enough fidelity.
|
|
"postprocessor_args": ["-b:a", f"{quality_kbps}k"],
|
|
}
|
|
|
|
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
|
|
info = ydl.extract_info(url, download=True)
|
|
|
|
# Handle playlist-like responses defensively even though noplaylist=True.
|
|
if isinstance(info, dict) and "entries" in info and info["entries"]:
|
|
info = info["entries"][0]
|
|
|
|
# Compute final mp3 path.
|
|
# prepare_filename returns the pre-postprocessed path, so swap extension.
|
|
base_path = None
|
|
if isinstance(info, dict):
|
|
try:
|
|
base_path = yt_dlp.YoutubeDL({"outtmpl": outtmpl}).prepare_filename(info)
|
|
except Exception:
|
|
base_path = None
|
|
|
|
if base_path:
|
|
mp3_path = os.path.splitext(base_path)[0] + ".mp3"
|
|
title = os.path.splitext(os.path.basename(mp3_path))[0]
|
|
return {"success": True, "title": title}
|
|
|
|
# Fallback return if we couldn't derive paths
|
|
return {"success": True, "title": "downloaded"}
|
|
|
|
def download_mp3(url, quality='320'):
|
|
print(f"\n🔍 Processing: {url}")
|
|
|
|
quality_kbps = _normalize_quality_kbps(quality)
|
|
|
|
# Prefer yt-dlp for YouTube because it can actually control MP3 output bitrate.
|
|
if _can_use_ytdlp():
|
|
try:
|
|
print(f"✨ Using yt-dlp (preferred method)")
|
|
print(f"⬇️ Downloading @ {quality_kbps}kbps...")
|
|
return _download_with_ytdlp(url, quality_kbps)
|
|
except Exception as e:
|
|
# If yt-dlp fails for any reason, fall back to the existing Cobalt flow.
|
|
print(f"⚠️ yt-dlp failed, falling back to Cobalt API: {e}")
|
|
else:
|
|
# Check what's missing
|
|
has_ffmpeg = shutil.which("ffmpeg") is not None
|
|
has_ytdlp = False
|
|
try:
|
|
import yt_dlp # noqa: F401
|
|
has_ytdlp = True
|
|
except:
|
|
pass
|
|
|
|
if not has_ffmpeg:
|
|
print("⚠️ ffmpeg not found - using Cobalt API fallback")
|
|
if not has_ytdlp:
|
|
print("⚠️ yt-dlp not installed - using Cobalt API fallback")
|
|
print(" 💡 Install with: pip install yt-dlp")
|
|
|
|
try:
|
|
# Use Cobalt v9 API to download
|
|
print("🌐 Requesting download from Cobalt API v9...")
|
|
|
|
_ensure_music_dir()
|
|
|
|
response = requests.post(
|
|
'https://api.cobalt.tools/api/v9/process',
|
|
headers={
|
|
'Accept': 'application/json',
|
|
'Content-Type': 'application/json'
|
|
},
|
|
json={
|
|
'url': url,
|
|
'downloadMode': 'audio',
|
|
'audioFormat': 'mp3'
|
|
},
|
|
timeout=30
|
|
)
|
|
|
|
print(f"📡 API Response Status: {response.status_code}")
|
|
|
|
if response.status_code != 200:
|
|
try:
|
|
error_data = response.json()
|
|
print(f"❌ Cobalt API error: {error_data}")
|
|
except:
|
|
print(f"❌ Cobalt API error: {response.text}")
|
|
return {"success": False, "error": f"API returned {response.status_code}"}
|
|
|
|
data = response.json()
|
|
print(f"📦 API Response: {data}")
|
|
|
|
# Check for errors in response
|
|
if data.get('status') == 'error':
|
|
error_msg = data.get('text', 'Unknown error')
|
|
print(f"❌ Cobalt error: {error_msg}")
|
|
return {"success": False, "error": error_msg}
|
|
|
|
# Get download URL
|
|
download_url = data.get('url')
|
|
if not download_url:
|
|
print(f"❌ No download URL in response: {data}")
|
|
return {"success": False, "error": "No download URL received"}
|
|
|
|
print(f"📥 Downloading audio...")
|
|
|
|
# Download the audio file
|
|
audio_response = requests.get(download_url, stream=True, timeout=60)
|
|
|
|
if audio_response.status_code != 200:
|
|
print(f"❌ Download failed: {audio_response.status_code}")
|
|
return {"success": False, "error": f"Download failed with status {audio_response.status_code}"}
|
|
|
|
# Try to get filename from Content-Disposition header
|
|
content_disposition = audio_response.headers.get('Content-Disposition', '')
|
|
if 'filename=' in content_disposition:
|
|
filename = content_disposition.split('filename=')[1].strip('"')
|
|
filename = clean_filename(os.path.splitext(filename)[0])
|
|
else:
|
|
# Fallback: extract video ID and use it
|
|
video_id = url.split('v=')[-1].split('&')[0]
|
|
filename = f"youtube_{video_id}"
|
|
|
|
# Ensure .mp3 extension
|
|
output_path = f"music/{filename}.mp3"
|
|
|
|
# Save the file
|
|
with open(output_path, 'wb') as f:
|
|
for chunk in audio_response.iter_content(chunk_size=8192):
|
|
f.write(chunk)
|
|
|
|
print(f"✅ Success! Saved as: {filename}.mp3")
|
|
print(" (Hit Refresh in the App)")
|
|
return {"success": True, "title": filename}
|
|
|
|
except requests.exceptions.Timeout:
|
|
print("❌ Request timed out")
|
|
return {"success": False, "error": "Request timed out"}
|
|
except requests.exceptions.RequestException as e:
|
|
print(f"❌ Network error: {e}")
|
|
return {"success": False, "error": str(e)}
|
|
except Exception as e:
|
|
print(f"❌ Error: {e}")
|
|
return {"success": False, "error": str(e)}
|
|
|
|
if __name__ == "__main__":
|
|
_ensure_music_dir()
|
|
|
|
print("--- TECHDJ DOWNLOADER (via Cobalt API) ---")
|
|
while True:
|
|
url = input("\n🔗 URL (q to quit): ").strip()
|
|
if url.lower() == 'q': break
|
|
if url: download_mp3(url)
|