Files
techdj-test/downloader.py

195 lines
6.4 KiB
Python

import requests
import os
import re
import shutil
def clean_filename(title):
# Remove quotes and illegal characters
title = title.strip("'").strip('"')
return re.sub(r'[\\/*?:"<>|]', "", title)
def _ensure_music_dir():
if not os.path.exists("music"):
os.makedirs("music")
def _normalize_quality_kbps(quality):
try:
q = int(str(quality).strip())
except Exception:
q = 320
# Clamp to the expected UI values
if q <= 128:
return 128
if q <= 192:
return 192
return 320
def _can_use_ytdlp():
# yt-dlp needs ffmpeg to reliably output MP3.
if shutil.which("ffmpeg") is None:
return False
try:
import yt_dlp # noqa: F401
return True
except Exception:
return False
def _download_with_ytdlp(url, quality_kbps):
import yt_dlp
_ensure_music_dir()
# Force a predictable output location. yt-dlp will sanitize filenames.
outtmpl = os.path.join("music", "%(title)s.%(ext)s")
ydl_opts = {
"format": "bestaudio/best",
"outtmpl": outtmpl,
"noplaylist": True,
"quiet": True,
"no_warnings": True,
"overwrites": False,
"postprocessors": [
{
"key": "FFmpegExtractAudio",
"preferredcodec": "mp3",
}
],
# Best-effort attempt to honor the UI bitrate selector.
# Note: This depends on ffmpeg, and the source may not have enough fidelity.
"postprocessor_args": ["-b:a", f"{quality_kbps}k"],
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(url, download=True)
# Handle playlist-like responses defensively even though noplaylist=True.
if isinstance(info, dict) and "entries" in info and info["entries"]:
info = info["entries"][0]
# Compute final mp3 path.
# prepare_filename returns the pre-postprocessed path, so swap extension.
base_path = None
if isinstance(info, dict):
try:
base_path = yt_dlp.YoutubeDL({"outtmpl": outtmpl}).prepare_filename(info)
except Exception:
base_path = None
if base_path:
mp3_path = os.path.splitext(base_path)[0] + ".mp3"
title = os.path.splitext(os.path.basename(mp3_path))[0]
return {"success": True, "title": title}
# Fallback return if we couldn't derive paths
return {"success": True, "title": "downloaded"}
def download_mp3(url, quality='320'):
print(f"\n🔍 Processing: {url}")
quality_kbps = _normalize_quality_kbps(quality)
# Prefer yt-dlp for YouTube because it can actually control MP3 output bitrate.
if _can_use_ytdlp():
try:
print(f"⬇️ Downloading via yt-dlp @ {quality_kbps}kbps...")
return _download_with_ytdlp(url, quality_kbps)
except Exception as e:
# If yt-dlp fails for any reason, fall back to the existing Cobalt flow.
print(f"⚠️ yt-dlp failed, falling back to Cobalt: {e}")
try:
# Use Cobalt v9 API to download
print("🌐 Requesting download from Cobalt API v9...")
_ensure_music_dir()
response = requests.post(
'https://api.cobalt.tools/api/v9/process',
headers={
'Accept': 'application/json',
'Content-Type': 'application/json'
},
json={
'url': url,
'downloadMode': 'audio',
'audioFormat': 'mp3'
},
timeout=30
)
print(f"📡 API Response Status: {response.status_code}")
if response.status_code != 200:
try:
error_data = response.json()
print(f"❌ Cobalt API error: {error_data}")
except:
print(f"❌ Cobalt API error: {response.text}")
return {"success": False, "error": f"API returned {response.status_code}"}
data = response.json()
print(f"📦 API Response: {data}")
# Check for errors in response
if data.get('status') == 'error':
error_msg = data.get('text', 'Unknown error')
print(f"❌ Cobalt error: {error_msg}")
return {"success": False, "error": error_msg}
# Get download URL
download_url = data.get('url')
if not download_url:
print(f"❌ No download URL in response: {data}")
return {"success": False, "error": "No download URL received"}
print(f"📥 Downloading audio...")
# Download the audio file
audio_response = requests.get(download_url, stream=True, timeout=60)
if audio_response.status_code != 200:
print(f"❌ Download failed: {audio_response.status_code}")
return {"success": False, "error": f"Download failed with status {audio_response.status_code}"}
# Try to get filename from Content-Disposition header
content_disposition = audio_response.headers.get('Content-Disposition', '')
if 'filename=' in content_disposition:
filename = content_disposition.split('filename=')[1].strip('"')
filename = clean_filename(os.path.splitext(filename)[0])
else:
# Fallback: extract video ID and use it
video_id = url.split('v=')[-1].split('&')[0]
filename = f"youtube_{video_id}"
# Ensure .mp3 extension
output_path = f"music/{filename}.mp3"
# Save the file
with open(output_path, 'wb') as f:
for chunk in audio_response.iter_content(chunk_size=8192):
f.write(chunk)
print(f"✅ Success! Saved as: {filename}.mp3")
print(" (Hit Refresh in the App)")
return {"success": True, "title": filename}
except requests.exceptions.Timeout:
print("❌ Request timed out")
return {"success": False, "error": "Request timed out"}
except requests.exceptions.RequestException as e:
print(f"❌ Network error: {e}")
return {"success": False, "error": str(e)}
except Exception as e:
print(f"❌ Error: {e}")
return {"success": False, "error": str(e)}
if __name__ == "__main__":
_ensure_music_dir()
print("--- TECHDJ DOWNLOADER (via Cobalt API) ---")
while True:
url = input("\n🔗 URL (q to quit): ").strip()
if url.lower() == 'q': break
if url: download_mp3(url)