Files
techdj-test/downloader.py
2026-01-02 19:37:56 +00:00

106 lines
3.8 KiB
Python

import requests
import os
import re
def clean_filename(title):
# Remove quotes and illegal characters
title = title.strip("'").strip('"')
return re.sub(r'[\\/*?:"<>|]', "", title)
def download_mp3(url, quality='320'):
print(f"\n🔍 Processing: {url}")
try:
# Use Cobalt v9 API to download
print("🌐 Requesting download from Cobalt API v9...")
response = requests.post(
'https://api.cobalt.tools/api/v9/process',
headers={
'Accept': 'application/json',
'Content-Type': 'application/json'
},
json={
'url': url,
'downloadMode': 'audio',
'audioFormat': 'mp3'
},
timeout=30
)
print(f"📡 API Response Status: {response.status_code}")
if response.status_code != 200:
try:
error_data = response.json()
print(f"❌ Cobalt API error: {error_data}")
except:
print(f"❌ Cobalt API error: {response.text}")
return {"success": False, "error": f"API returned {response.status_code}"}
data = response.json()
print(f"📦 API Response: {data}")
# Check for errors in response
if data.get('status') == 'error':
error_msg = data.get('text', 'Unknown error')
print(f"❌ Cobalt error: {error_msg}")
return {"success": False, "error": error_msg}
# Get download URL
download_url = data.get('url')
if not download_url:
print(f"❌ No download URL in response: {data}")
return {"success": False, "error": "No download URL received"}
print(f"📥 Downloading audio...")
# Download the audio file
audio_response = requests.get(download_url, stream=True, timeout=60)
if audio_response.status_code != 200:
print(f"❌ Download failed: {audio_response.status_code}")
return {"success": False, "error": f"Download failed with status {audio_response.status_code}"}
# Try to get filename from Content-Disposition header
content_disposition = audio_response.headers.get('Content-Disposition', '')
if 'filename=' in content_disposition:
filename = content_disposition.split('filename=')[1].strip('"')
filename = clean_filename(os.path.splitext(filename)[0])
else:
# Fallback: extract video ID and use it
video_id = url.split('v=')[-1].split('&')[0]
filename = f"youtube_{video_id}"
# Ensure .mp3 extension
output_path = f"music/{filename}.mp3"
# Save the file
with open(output_path, 'wb') as f:
for chunk in audio_response.iter_content(chunk_size=8192):
f.write(chunk)
print(f"✅ Success! Saved as: {filename}.mp3")
print(" (Hit Refresh in the App)")
return {"success": True, "title": filename}
except requests.exceptions.Timeout:
print("❌ Request timed out")
return {"success": False, "error": "Request timed out"}
except requests.exceptions.RequestException as e:
print(f"❌ Network error: {e}")
return {"success": False, "error": str(e)}
except Exception as e:
print(f"❌ Error: {e}")
return {"success": False, "error": str(e)}
if __name__ == "__main__":
if not os.path.exists("music"):
os.makedirs("music")
print("--- TECHDJ DOWNLOADER (via Cobalt API) ---")
while True:
url = input("\n🔗 URL (q to quit): ").strip()
if url.lower() == 'q': break
if url: download_mp3(url)