forked from computertech/techdj
106 lines
3.8 KiB
Python
106 lines
3.8 KiB
Python
import requests
|
|
import os
|
|
import re
|
|
|
|
def clean_filename(title):
|
|
# Remove quotes and illegal characters
|
|
title = title.strip("'").strip('"')
|
|
return re.sub(r'[\\/*?:"<>|]', "", title)
|
|
|
|
def download_mp3(url, quality='320'):
|
|
print(f"\n🔍 Processing: {url}")
|
|
|
|
try:
|
|
# Use Cobalt v9 API to download
|
|
print("🌐 Requesting download from Cobalt API v9...")
|
|
|
|
response = requests.post(
|
|
'https://api.cobalt.tools/api/v9/process',
|
|
headers={
|
|
'Accept': 'application/json',
|
|
'Content-Type': 'application/json'
|
|
},
|
|
json={
|
|
'url': url,
|
|
'downloadMode': 'audio',
|
|
'audioFormat': 'mp3'
|
|
},
|
|
timeout=30
|
|
)
|
|
|
|
print(f"📡 API Response Status: {response.status_code}")
|
|
|
|
if response.status_code != 200:
|
|
try:
|
|
error_data = response.json()
|
|
print(f"❌ Cobalt API error: {error_data}")
|
|
except:
|
|
print(f"❌ Cobalt API error: {response.text}")
|
|
return {"success": False, "error": f"API returned {response.status_code}"}
|
|
|
|
data = response.json()
|
|
print(f"📦 API Response: {data}")
|
|
|
|
# Check for errors in response
|
|
if data.get('status') == 'error':
|
|
error_msg = data.get('text', 'Unknown error')
|
|
print(f"❌ Cobalt error: {error_msg}")
|
|
return {"success": False, "error": error_msg}
|
|
|
|
# Get download URL
|
|
download_url = data.get('url')
|
|
if not download_url:
|
|
print(f"❌ No download URL in response: {data}")
|
|
return {"success": False, "error": "No download URL received"}
|
|
|
|
print(f"📥 Downloading audio...")
|
|
|
|
# Download the audio file
|
|
audio_response = requests.get(download_url, stream=True, timeout=60)
|
|
|
|
if audio_response.status_code != 200:
|
|
print(f"❌ Download failed: {audio_response.status_code}")
|
|
return {"success": False, "error": f"Download failed with status {audio_response.status_code}"}
|
|
|
|
# Try to get filename from Content-Disposition header
|
|
content_disposition = audio_response.headers.get('Content-Disposition', '')
|
|
if 'filename=' in content_disposition:
|
|
filename = content_disposition.split('filename=')[1].strip('"')
|
|
filename = clean_filename(os.path.splitext(filename)[0])
|
|
else:
|
|
# Fallback: extract video ID and use it
|
|
video_id = url.split('v=')[-1].split('&')[0]
|
|
filename = f"youtube_{video_id}"
|
|
|
|
# Ensure .mp3 extension
|
|
output_path = f"music/{filename}.mp3"
|
|
|
|
# Save the file
|
|
with open(output_path, 'wb') as f:
|
|
for chunk in audio_response.iter_content(chunk_size=8192):
|
|
f.write(chunk)
|
|
|
|
print(f"✅ Success! Saved as: {filename}.mp3")
|
|
print(" (Hit Refresh in the App)")
|
|
return {"success": True, "title": filename}
|
|
|
|
except requests.exceptions.Timeout:
|
|
print("❌ Request timed out")
|
|
return {"success": False, "error": "Request timed out"}
|
|
except requests.exceptions.RequestException as e:
|
|
print(f"❌ Network error: {e}")
|
|
return {"success": False, "error": str(e)}
|
|
except Exception as e:
|
|
print(f"❌ Error: {e}")
|
|
return {"success": False, "error": str(e)}
|
|
|
|
if __name__ == "__main__":
|
|
if not os.path.exists("music"):
|
|
os.makedirs("music")
|
|
|
|
print("--- TECHDJ DOWNLOADER (via Cobalt API) ---")
|
|
while True:
|
|
url = input("\n🔗 URL (q to quit): ").strip()
|
|
if url.lower() == 'q': break
|
|
if url: download_mp3(url)
|