Files
techdj-test/server.py
2026-01-02 22:29:32 -06:00

451 lines
14 KiB
Python

# Monkey patch MUST be first - before any other imports!
import eventlet
eventlet.monkey_patch()
import os
import subprocess
import threading
import queue
import time
from flask import Flask, send_from_directory, jsonify, request, session, Response, stream_with_context
from flask_socketio import SocketIO, emit
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
import downloader
# --- Relay state shared by the DJ and listener servers ---
# Whether a broadcast is live and the MIME type announced for it.
broadcast_state = dict(active=False, mimeType=None)

# Socket.IO session ids of joined listeners and connected DJs.
listener_sids, dj_sids = set(), set()

# === Optional MP3 fallback stream (server-side transcoding) ===
# Lets listeners on browsers without WebM/Opus MediaSource support
# (notably some Safari / locked-down environments) still hear the stream.
_ffmpeg_proc = None                # current ffmpeg Popen object, or None
_ffmpeg_in_q = queue.Queue(200)    # bounded queue of chunks waiting for ffmpeg stdin
_mp3_lock = threading.Lock()       # held while reading/modifying _mp3_clients
_mp3_clients = set()               # set[queue.Queue], one queue per /stream.mp3 client
_transcode_threads_started = False

# Diagnostics reported by /stream_debug.
_transcoder_bytes_out = 0
_transcoder_last_error = None
_last_audio_chunk_ts = 0.0
def _start_transcoder_if_needed():
    """Launch the ffmpeg MP3 transcoder and its pump threads if not running.

    Does nothing when a live ffmpeg process already exists. Otherwise it
    spawns ``ffmpeg`` (stdin -> 192k MP3 on stdout) and starts two daemon
    threads bound to that process:

    * a writer that moves chunks from the input queue to ffmpeg's stdin, and
    * a reader that copies ffmpeg's stdout to every /stream.mp3 client queue.

    The threads are created for *every* new process. They used to be started
    only once per server lifetime, and both exit when the process stops, so a
    transcoder restarted after a stop or crash was never fed or drained.

    Each process also gets a fresh input queue (rebinding ``_ffmpeg_in_q``),
    so a stale ``None`` sentinel or audio left over from the previous process
    can never reach the new one.

    If ffmpeg is not installed, the fallback is disabled: ``_ffmpeg_proc`` is
    set to ``None`` and the function returns.
    """
    global _ffmpeg_proc, _ffmpeg_in_q, _transcode_threads_started

    if _ffmpeg_proc is not None and _ffmpeg_proc.poll() is None:
        return

    cmd = [
        'ffmpeg',
        '-hide_banner',
        '-loglevel', 'error',
        '-i', 'pipe:0',
        '-vn',
        '-acodec', 'libmp3lame',
        '-b:a', '192k',
        '-f', 'mp3',
        'pipe:1',
    ]
    try:
        proc = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            # Nothing ever reads ffmpeg's stderr; an undrained pipe would
            # block ffmpeg once the pipe buffer fills, so discard it.
            stderr=subprocess.DEVNULL,
            bufsize=0,
        )
    except FileNotFoundError:
        _ffmpeg_proc = None
        print('⚠️ ffmpeg not found; /stream.mp3 fallback disabled')
        return

    # Fresh queue per process, same capacity as the one it replaces.
    in_q = queue.Queue(maxsize=_ffmpeg_in_q.maxsize)
    _ffmpeg_in_q = in_q
    _ffmpeg_proc = proc
    print('🎛️ ffmpeg transcoder started for /stream.mp3')

    def _writer():
        """Pump queued chunks into this process's stdin until told to stop."""
        global _transcoder_last_error
        while proc.poll() is None:
            try:
                # Wake up periodically so the thread exits once ffmpeg has
                # died, even if no sentinel was delivered.
                chunk = in_q.get(timeout=1.0)
            except queue.Empty:
                continue
            if chunk is None:
                break
            try:
                proc.stdin.write(chunk)
            except Exception:
                # If ffmpeg dies or the pipe breaks, just stop writing.
                _transcoder_last_error = 'stdin write failed'
                break

    def _reader():
        """Fan this process's MP3 output out to every connected client."""
        global _transcoder_bytes_out, _transcoder_last_error
        while True:
            try:
                data = proc.stdout.read(4096)
            except Exception:
                _transcoder_last_error = 'stdout read failed'
                break
            if not data:
                # EOF: ffmpeg has exited.
                break
            _transcoder_bytes_out += len(data)
            # Snapshot under the lock, deliver outside it.
            with _mp3_lock:
                clients = list(_mp3_clients)
            for client_q in clients:
                try:
                    client_q.put_nowait(data)
                except queue.Full:
                    # Slow client: drop the chunk rather than stall the others.
                    pass

    threading.Thread(target=_writer, daemon=True).start()
    threading.Thread(target=_reader, daemon=True).start()
    _transcode_threads_started = True
def _stop_transcoder():
    """Shut down the ffmpeg transcoder, if one is running.

    Forgets the process in ``_ffmpeg_proc``, discards audio that was queued
    but not yet written, posts the ``None`` sentinel that tells the writer
    thread to exit, then terminates and reaps the process.

    When no process is registered this is a no-op. The sentinel is
    deliberately not posted in that case: it would sit in the queue and make
    the next writer thread exit as soon as it started.
    """
    global _ffmpeg_proc

    proc = _ffmpeg_proc
    _ffmpeg_proc = None
    if proc is None:
        return

    # Empty the queue first so the sentinel always fits (it used to be dropped
    # silently when the queue was full) and so leftover chunks are not fed
    # into a later ffmpeg process.
    while True:
        try:
            _ffmpeg_in_q.get_nowait()
        except queue.Empty:
            break
    try:
        _ffmpeg_in_q.put_nowait(None)
    except queue.Full:
        # Only possible if a producer refilled the queue in between;
        # terminating the process below still ends the transcode.
        pass

    try:
        proc.terminate()
        # Reap the child so it does not linger as a zombie.
        proc.wait(timeout=2)
    except subprocess.TimeoutExpired:
        # ffmpeg ignored the terminate request; force it.
        proc.kill()
        proc.wait()
    except OSError:
        # Process already gone.
        pass
def _feed_transcoder(data: bytes):
    """Queue one audio chunk for the MP3 transcoder, best effort.

    The chunk is ignored when there is no live ffmpeg process. Otherwise the
    arrival time is recorded in ``_last_audio_chunk_ts`` and the chunk is
    enqueued without blocking.
    """
    global _last_audio_chunk_ts

    transcoder_alive = _ffmpeg_proc is not None and _ffmpeg_proc.poll() is None
    if not transcoder_alive:
        return

    _last_audio_chunk_ts = time.time()
    try:
        _ffmpeg_in_q.put_nowait(data)
    except Exception:
        # Queue is full: drop the chunk so latency stays bounded.
        pass
MUSIC_FOLDER = "music"
# Ensure the music folder exists. exist_ok=True does this in one call, so
# there is no window between an exists() check and the makedirs() call in
# which another process could create the directory and make startup fail.
os.makedirs(MUSIC_FOLDER, exist_ok=True)
# Helpers for shared routes
def _sanitize_mp3_filename(raw_name):
    """Return a filesystem-safe ``<stem>.mp3`` for an uploaded file name.

    Everything except word characters, whitespace and hyphens is removed from
    the stem, then runs of hyphens/whitespace collapse to a single hyphen.
    Returns ``None`` when no usable stem is left (e.g. ``"???.mp3"``), so the
    caller never writes a bare ``.mp3`` dotfile.
    """
    import re  # local import, as the upload handler originally did

    stem = os.path.splitext(raw_name)[0]
    stem = re.sub(r'[^\w\s-]', '', stem)
    stem = re.sub(r'[-\s]+', '-', stem)
    if not stem:
        return None
    return f"{stem}.mp3"


def _has_hidden_component(path):
    """Return True if any component of *path* starts with a dot."""
    return any(part.startswith('.') for part in path.replace('\\', '/').split('/'))


def setup_shared_routes(app):
    """Register the HTTP routes shared by the DJ and listener apps on *app*.

    Routes: ``/library.json``, ``/download``, ``/search_youtube``,
    ``/<path:filename>``, ``/``, ``/upload``, ``/stream.mp3`` and
    ``/stream_debug``.

    Args:
        app: the Flask application to attach the routes to.
    """

    @app.route('/library.json')
    def get_library():
        """Return the audio files found in MUSIC_FOLDER as a JSON list."""
        library = []
        if os.path.exists(MUSIC_FOLDER):
            for filename in sorted(os.listdir(MUSIC_FOLDER)):
                if filename.lower().endswith(('.mp3', '.m4a', '.wav', '.flac', '.ogg')):
                    library.append({
                        "title": os.path.splitext(filename)[0],
                        "file": f"music/{filename}"
                    })
        return jsonify(library)

    @app.route('/download', methods=['POST'])
    def download():
        """Hand the posted URL (and optional quality) to the downloader."""
        data = request.get_json(silent=True) or {}
        url = data.get('url')
        quality = data.get('quality', '320')
        if not url:
            return jsonify({"success": False, "error": "No URL provided"}), 400
        result = downloader.download_mp3(url, quality)
        return jsonify(result)

    @app.route('/search_youtube', methods=['GET'])
    def search_youtube():
        """Search YouTube (Data API v3, music category) for the ``q`` parameter."""
        query = request.args.get('q', '')
        if not query:
            return jsonify({"success": False, "error": "No query provided"}), 400
        # Get API key from environment variable
        api_key = os.environ.get('YOUTUBE_API_KEY', '')
        if not api_key:
            return jsonify({
                "success": False,
                "error": "YouTube API key not configured. Set YOUTUBE_API_KEY environment variable."
            }), 500
        try:
            import requests
            # Search YouTube using Data API v3
            url = 'https://www.googleapis.com/youtube/v3/search'
            params = {
                'part': 'snippet',
                'q': query,
                'type': 'video',
                'videoCategoryId': '10',  # Music category
                'maxResults': 20,
                'key': api_key
            }
            # requests has no default timeout; without one an unresponsive
            # upstream would hang this request indefinitely.
            response = requests.get(url, params=params, timeout=10)
            data = response.json()
            if 'error' in data:
                return jsonify({
                    "success": False,
                    "error": data['error'].get('message', 'YouTube API error')
                }), 400
            # Format results
            results = []
            for item in data.get('items', []):
                results.append({
                    'videoId': item['id']['videoId'],
                    'title': item['snippet']['title'],
                    'channel': item['snippet']['channelTitle'],
                    'thumbnail': item['snippet']['thumbnails']['medium']['url'],
                    'url': f"https://www.youtube.com/watch?v={item['id']['videoId']}"
                })
            return jsonify({"success": True, "results": results})
        except Exception as e:
            # Boundary handler: report any failure (network, parsing, missing
            # fields) to the client as a JSON error.
            return jsonify({"success": False, "error": str(e)}), 500

    @app.route('/<path:filename>')
    def serve_static(filename):
        """Serve a file from the working directory; page assets are never cached."""
        # This route can reach everything under '.', including a .env file
        # holding secrets. Refuse any dot-prefixed path component.
        # NOTE(review): if the Flask app is created with a static_folder that
        # maps onto the same URL space, Flask's built-in static route may
        # match first and bypass this check -- confirm against the app setup.
        if _has_hidden_component(filename):
            return jsonify({"success": False, "error": "Not found"}), 404
        response = send_from_directory('.', filename)
        if filename.endswith(('.css', '.js', '.html')):
            response.headers['Cache-Control'] = 'no-store, no-cache, must-revalidate, max-age=0'
        return response

    @app.route('/')
    def index():
        """Serve the front page."""
        return send_from_directory('.', 'index.html')

    @app.route('/upload', methods=['POST'])
    def upload_file():
        """Save an uploaded MP3 into MUSIC_FOLDER under a sanitized name."""
        if 'file' not in request.files:
            return jsonify({"success": False, "error": "No file provided"}), 400
        upload = request.files['file']
        # Covers both an empty and a missing (None) filename.
        if not upload.filename:
            return jsonify({"success": False, "error": "No file selected"}), 400
        # Case-insensitive, matching how /library.json recognises audio files.
        if not upload.filename.lower().endswith('.mp3'):
            return jsonify({"success": False, "error": "Only MP3 files are allowed"}), 400
        # Sanitize filename (keep extension)
        filename = _sanitize_mp3_filename(upload.filename)
        if filename is None:
            return jsonify({"success": False, "error": "Invalid filename"}), 400
        filepath = os.path.join(MUSIC_FOLDER, filename)
        try:
            upload.save(filepath)
            print(f"✅ Uploaded: {filename}")
            return jsonify({"success": True, "filename": filename})
        except Exception as e:
            print(f"❌ Upload error: {e}")
            return jsonify({"success": False, "error": str(e)}), 500

    @app.route('/stream.mp3')
    def stream_mp3():
        """Stream the ffmpeg transcoder's MP3 output to this client."""
        # If ffmpeg isn't running, return 503.
        if _ffmpeg_proc is None or _ffmpeg_proc.poll() is not None:
            return jsonify({"success": False, "error": "MP3 stream not available"}), 503
        client_q: queue.Queue = queue.Queue(maxsize=200)
        with _mp3_lock:
            _mp3_clients.add(client_q)

        def gen():
            try:
                while True:
                    chunk = client_q.get()
                    if chunk is None:
                        break
                    yield chunk
            finally:
                # Always unregister, including when the client disconnects.
                with _mp3_lock:
                    _mp3_clients.discard(client_q)

        return Response(
            stream_with_context(gen()),
            mimetype='audio/mpeg',
            headers={
                'Cache-Control': 'no-store, no-cache, must-revalidate, max-age=0',
                'Connection': 'keep-alive',
            },
        )

    @app.route('/stream_debug')
    def stream_debug():
        """Report broadcast and transcoder diagnostics as JSON."""
        proc = _ffmpeg_proc
        running = proc is not None and proc.poll() is None
        # Read the client set under the same lock its writers use.
        with _mp3_lock:
            client_count = len(_mp3_clients)
        return jsonify({
            'broadcast_active': broadcast_state.get('active', False),
            'broadcast_mimeType': broadcast_state.get('mimeType'),
            'ffmpeg_running': running,
            'ffmpeg_found': (proc is not None),
            'mp3_clients': client_count,
            'transcoder_bytes_out': _transcoder_bytes_out,
            'transcoder_last_error': _transcoder_last_error,
            'last_audio_chunk_ts': _last_audio_chunk_ts,
        })
# === DJ SERVER (Port 5000) ===
# static_folder=None: setup_shared_routes() registers its own
# '/<path:filename>' handler (serve_static). With static_folder='.' and
# static_url_path='' Flask also adds a built-in static route on that same
# rule; NOTE(review): being registered first, it presumably shadows
# serve_static and skips its no-cache headers -- confirm.
dj_app = Flask(__name__, static_folder=None, static_url_path='')
# The key signs session cookies, so allow overriding it via the environment
# (or .env); the old hard-coded value stays as the development default.
dj_app.config['SECRET_KEY'] = os.environ.get('DJ_SECRET_KEY', 'dj_panel_secret')
setup_shared_routes(dj_app)
dj_socketio = SocketIO(
    dj_app,
    cors_allowed_origins="*",
    async_mode='eventlet',
    max_http_buffer_size=100_000_000,  # 100MB buffer (integer byte count)
    ping_timeout=10,
    ping_interval=5,
    logger=False,
    engineio_logger=False
)
@dj_socketio.on('connect')
def dj_connect():
    """Log a new DJ socket and remember its session id."""
    sid = request.sid
    print(f"🎧 DJ connected: {sid}")
    dj_sids.add(sid)
@dj_socketio.on('disconnect')
def dj_disconnect():
    """Forget a DJ socket; the broadcast state is deliberately left alone."""
    departing_sid = request.sid
    dj_sids.discard(departing_sid)
    print("⚠️ DJ disconnected - broadcast will continue until manually stopped")
def stop_broadcast_after_timeout():
    """Do nothing: broadcasts no longer auto-stop, so this is unused."""
    return None
@dj_socketio.on('start_broadcast')
def dj_start(data=None):
    """Mark the broadcast active, start the transcoder and notify listeners.

    Args:
        data: optional dict from the DJ client; its ``mimeType`` entry, when
            present and truthy, is stored and forwarded to listeners.
    """
    # Anything that is not a dict, or has a falsy mimeType, counts as "no MIME type".
    mime_type = (data.get('mimeType') or None) if isinstance(data, dict) else None

    broadcast_state.update(active=True, mimeType=mime_type)
    session['is_dj'] = True
    print("🎙️ Broadcast -> ACTIVE")

    _start_transcoder_if_needed()

    notify = listener_socketio.emit
    notify('broadcast_started', namespace='/')
    notify('stream_status', {'active': True}, namespace='/')
    if mime_type:
        notify('stream_mime', {'mimeType': mime_type}, namespace='/')
@dj_socketio.on('stop_broadcast')
def dj_stop():
    """Mark the broadcast inactive, stop the transcoder and notify listeners."""
    broadcast_state.update(active=False, mimeType=None)
    session['is_dj'] = False
    print("🛑 DJ stopped broadcasting")

    _stop_transcoder()

    notify = listener_socketio.emit
    notify('broadcast_stopped', namespace='/')
    notify('stream_status', {'active': False}, namespace='/')
@dj_socketio.on('audio_chunk')
def dj_audio(data):
    """Relay one audio chunk from the DJ to listeners and the MP3 transcoder.

    Chunks that arrive while no broadcast is active are ignored.
    NOTE(review): indentation was lost in the reviewed source; this assumes
    the transcoder steps are nested under the active check -- confirm.
    """
    if not broadcast_state['active']:
        return

    # Relay to all listeners immediately.
    listener_socketio.emit('audio_data', data, namespace='/')

    # Make sure the MP3 fallback transcoder is running (if ffmpeg is installed).
    transcoder_alive = _ffmpeg_proc is not None and _ffmpeg_proc.poll() is None
    if not transcoder_alive:
        _start_transcoder_if_needed()

    # Only raw binary payloads are fed to ffmpeg.
    if isinstance(data, (bytes, bytearray)):
        _feed_transcoder(bytes(data))
# === LISTENER SERVER (Port 5001) ===
# static_folder=None: setup_shared_routes() registers its own
# '/<path:filename>' handler (serve_static). With static_folder='.' and
# static_url_path='' Flask also adds a built-in static route on that same
# rule; NOTE(review): being registered first, it presumably shadows
# serve_static and skips its no-cache headers -- confirm.
listener_app = Flask(__name__, static_folder=None, static_url_path='')
# The key signs session cookies, so allow overriding it via the environment
# (or .env); the old hard-coded value stays as the development default.
listener_app.config['SECRET_KEY'] = os.environ.get('LISTENER_SECRET_KEY', 'listener_secret')
setup_shared_routes(listener_app)
listener_socketio = SocketIO(
    listener_app,
    cors_allowed_origins="*",
    async_mode='eventlet',
    max_http_buffer_size=100_000_000,  # 100MB buffer (integer byte count)
    ping_timeout=10,
    ping_interval=5,
    logger=False,
    engineio_logger=False
)
@listener_socketio.on('connect')
def listener_connect():
    """Log a new listener socket; it is only counted once it sends join_listener."""
    sid = request.sid
    print(f"👂 Listener Socket Connected: {sid}")
@listener_socketio.on('disconnect')
def listener_disconnect():
    """Remove a departing listener and publish the new count to both servers."""
    sid = request.sid
    if sid not in listener_sids:
        # Socket never joined as a listener, so the count is unchanged.
        return

    listener_sids.discard(sid)
    remaining = len(listener_sids)
    print(f"❌ Listener left. Total: {remaining}")
    for server in (listener_socketio, dj_socketio):
        server.emit('listener_count', {'count': remaining}, namespace='/')
@listener_socketio.on('join_listener')
def listener_join():
    """Register the caller as a listener and send it the current stream state."""
    sid = request.sid
    already_joined = sid in listener_sids
    if not already_joined:
        listener_sids.add(sid)
        total = len(listener_sids)
        print(f"👂 New listener joined. Total: {total}")
        for server in (listener_socketio, dj_socketio):
            server.emit('listener_count', {'count': total}, namespace='/')

    # Every join request gets the current status, including repeated ones.
    emit('stream_status', {'active': broadcast_state['active']})
    current_mime = broadcast_state.get('mimeType')
    if current_mime:
        emit('stream_mime', {'mimeType': current_mime})
@listener_socketio.on('get_listener_count')
def listener_get_count():
    """Reply to the caller with the number of joined listeners."""
    current_total = len(listener_sids)
    emit('listener_count', {'count': current_total})
# DJ Panel Routes (No engine commands needed in local mode)
@dj_socketio.on('get_mixer_status')
def get_mixer_status():
    """Accept the 'get_mixer_status' event and do nothing."""
    return None
@dj_socketio.on('audio_sync_queue')
def audio_sync_queue(data):
    """Accept the 'audio_sync_queue' event and ignore its payload."""
    return None
if __name__ == '__main__':
    banner_rule = "=" * 50
    startup_banner = (
        banner_rule,
        "🎧 TECHDJ PRO - DUAL PORT ARCHITECTURE",
        banner_rule,
        "👉 DJ PANEL: http://localhost:5000",
        "👉 LISTEN PAGE: http://localhost:5001",
        banner_rule,
        # Audio engine DISABLED
        "✅ Local Radio server ready",
    )
    for banner_line in startup_banner:
        print(banner_line)

    # Run both servers: the DJ server in a spawned eventlet green thread,
    # the listener server directly afterwards.
    eventlet.spawn(dj_socketio.run, dj_app, host='0.0.0.0', port=5000, debug=False)
    listener_socketio.run(listener_app, host='0.0.0.0', port=5001, debug=False)