techdj/server.py

900 lines
34 KiB
Python

# Monkey patch MUST be first - before any other imports!
import eventlet
eventlet.monkey_patch()
import os
import json
import re
import subprocess
import threading
import queue
import time
import collections
from flask import Flask, send_from_directory, jsonify, request, session, Response, stream_with_context, abort
from flask_socketio import SocketIO, emit
from dotenv import load_dotenv
load_dotenv()
def _load_config():
"""Loads optional config.json from the project root.
If missing or invalid, returns an empty dict.
"""
try:
with open('config.json', 'r', encoding='utf-8') as f:
data = json.load(f)
return data if isinstance(data, dict) else {}
except FileNotFoundError:
return {}
except Exception:
return {}
CONFIG = _load_config()
# --- Config-driven settings (config.json > env vars > defaults) ---
CONFIG_HOST = (CONFIG.get('host') or os.environ.get('HOST') or '0.0.0.0').strip()
CONFIG_DJ_PORT = int(CONFIG.get('dj_port') or os.environ.get('DJ_PORT') or 5000)
CONFIG_LISTENER_PORT = int(CONFIG.get('listener_port') or os.environ.get('LISTEN_PORT') or 5001)
CONFIG_SECRET = (CONFIG.get('secret_key') or '').strip() or 'dj_panel_secret'
CONFIG_CORS = CONFIG.get('cors_origins', '*')
CONFIG_MAX_UPLOAD_MB = int(CONFIG.get('max_upload_mb') or 500)
CONFIG_DEBUG = bool(CONFIG.get('debug', False))
CONFIG_LISTENER_URL = (CONFIG.get('listener_url') or '').strip()
DJ_PANEL_PASSWORD = (CONFIG.get('dj_panel_password') or '').strip()
DJ_AUTH_ENABLED = bool(DJ_PANEL_PASSWORD)
# Broadcast State
broadcast_state = {
'active': False,
}
listener_sids = set()
dj_sids = set()
# Grace-period greenlet: auto-stop broadcast if DJ doesn't reconnect in time
_dj_grace_greenlet = None
DJ_GRACE_PERIOD_SECS = 20 # seconds to wait before auto-stopping
# === Optional MP3 fallback stream (server-side transcoding) ===
_ffmpeg_proc = None
_ffmpeg_in_q = queue.Queue(maxsize=20)
_current_bitrate = (CONFIG.get('stream_bitrate') or '192k').strip()
_mp3_clients = set() # set[queue.Queue]
_mp3_lock = threading.Lock()
_transcoder_bytes_out = 0
_transcoder_last_error = None
_last_audio_chunk_ts = 0.0
_mp3_preroll = collections.deque(maxlen=1024) # ~83s at 96kbps for fast reconnect buffer fill
def _start_transcoder_if_needed(is_mp3_input=False):
global _ffmpeg_proc, _transcoder_last_error
# If already running, check if we need to restart
if _ffmpeg_proc is not None and _ffmpeg_proc.poll() is None:
return
# Ensure stale process is cleaned up
if _ffmpeg_proc:
try: _ffmpeg_proc.terminate()
except: pass
_ffmpeg_proc = None
codec = 'copy' if is_mp3_input else 'libmp3lame'
cmd = [
'ffmpeg',
'-hide_banner',
'-loglevel', 'error',
'-fflags', 'nobuffer',
'-flags', 'low_delay',
'-probesize', '32',
'-analyzeduration', '0'
]
if is_mp3_input:
cmd.extend(['-f', 'mp3'])
cmd.extend([
'-i', 'pipe:0',
'-vn',
'-acodec', codec,
])
if not is_mp3_input:
cmd.extend(['-b:a', _current_bitrate])
cmd.extend([
'-flush_packets', '1',
'-f', 'mp3',
'pipe:1',
])
try:
_ffmpeg_proc = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=0,
)
except FileNotFoundError:
_ffmpeg_proc = None
print('WARNING: ffmpeg not found; /stream.mp3 fallback disabled')
return
mode_str = "PASSTHROUGH (copy)" if is_mp3_input else f"TRANSCODE ({_current_bitrate})"
print(f'INFO: ffmpeg transcoder started ({mode_str})')
_transcoder_last_error = None
# Clear queue to avoid stale data
while not _ffmpeg_in_q.empty():
try: _ffmpeg_in_q.get_nowait()
except: break
# Define threads INSIDE so they close over THIS specific 'proc'
def _writer(proc):
global _transcoder_last_error
print(f"[THREAD] Transcoder writer started (PID: {proc.pid})")
while proc.poll() is None:
try:
chunk = _ffmpeg_in_q.get(timeout=1.0)
if chunk is None: break
if proc.stdin:
proc.stdin.write(chunk)
proc.stdin.flush()
except queue.Empty:
continue
except (BrokenPipeError, ConnectionResetError):
_transcoder_last_error = "Broken pipe in writer"
break
except Exception as e:
print(f"WARNING: Transcoder writer error: {e}")
_transcoder_last_error = str(e)
break
# Ensure process is killed if thread exits unexpectedly
if proc.poll() is None:
try: proc.terminate()
except: pass
print(f"[THREAD] Transcoder writer finished (PID: {proc.pid})")
def _reader(proc):
global _transcoder_bytes_out, _transcoder_last_error
print(f"[THREAD] Transcoder reader started (PID: {proc.pid})")
while proc.poll() is None:
try:
# Smaller read for smoother delivery (1KB)
# This prevents buffering delays at lower bitrates
data = proc.stdout.read(1024)
if not data: break
_transcoder_bytes_out += len(data)
with _mp3_lock:
_mp3_preroll.append(data)
clients = list(_mp3_clients)
for q in clients:
try:
q.put_nowait(data)
except queue.Full:
# Client is too slow, skip this chunk for them
pass
except Exception as e:
print(f"WARNING: Transcoder reader error: {e}")
_transcoder_last_error = str(e)
break
# Ensure process is killed if thread exits unexpectedly
if proc.poll() is None:
try: proc.terminate()
except: pass
print(f"[THREAD] Transcoder reader finished (PID: {proc.pid})")
# Start greenlets/threads for THIS process specifically
eventlet.spawn(_writer, _ffmpeg_proc)
eventlet.spawn(_reader, _ffmpeg_proc)
def _stop_transcoder():
global _ffmpeg_proc
print("STOPPING: Transcoder process")
# Signal threads to stop via the queue
try: _ffmpeg_in_q.put_nowait(None)
except: pass
# Shutdown the process
proc = _ffmpeg_proc
_ffmpeg_proc = None
if proc:
try:
proc.terminate()
# Drain stdout/stderr to satisfy OS buffers
proc.communicate(timeout=1.0)
except:
try: proc.kill()
except: pass
# Clear client state
with _mp3_lock:
clients = list(_mp3_clients)
for q in clients:
try: q.put_nowait(None)
except: pass
_mp3_clients.clear()
_mp3_preroll.clear()
def _feed_transcoder(data: bytes):
global _last_audio_chunk_ts
if _ffmpeg_proc is None or _ffmpeg_proc.poll() is not None:
# If active but dead, restart it automatically (non-MP3 mode only)
if broadcast_state.get('active'):
_start_transcoder_if_needed(is_mp3_input=False)
else:
return
_last_audio_chunk_ts = time.time()
try:
_ffmpeg_in_q.put_nowait(data)
except queue.Full:
# Drop chunk if overflow to prevent memory bloat
pass
def _distribute_mp3(data: bytes):
"""Distribute MP3 bytes directly to preroll buffer and all connected listener
clients, bypassing the ffmpeg transcoder entirely.
Used when the DJ is already sending valid MP3 (e.g. the Qt desktop client)
to eliminate the unnecessary encode/decode round-trip and cut pipeline latency
roughly in half.
"""
global _transcoder_bytes_out, _last_audio_chunk_ts
_last_audio_chunk_ts = time.time()
_transcoder_bytes_out += len(data)
with _mp3_lock:
_mp3_preroll.append(data)
for q in list(_mp3_clients):
try:
q.put_nowait(data)
except queue.Full:
pass
# Load settings to get MUSIC_FOLDER
def _load_settings():
try:
if os.path.exists('settings.json'):
with open('settings.json', 'r', encoding='utf-8') as f:
return json.load(f)
except:
pass
return {}
SETTINGS = _load_settings()
_config_music = (CONFIG.get('music_folder') or '').strip()
MUSIC_FOLDER = _config_music or SETTINGS.get('library', {}).get('music_folder', 'music')
# Ensure music folder exists
if not os.path.exists(MUSIC_FOLDER):
try:
os.makedirs(MUSIC_FOLDER)
except:
# Fallback to default if custom path fails
MUSIC_FOLDER = "music"
if not os.path.exists(MUSIC_FOLDER):
os.makedirs(MUSIC_FOLDER)
# Helper for shared routes
def setup_shared_routes(app, index_file='index.html'):
@app.route('/library.json')
def get_library():
library = []
global MUSIC_FOLDER
if os.path.exists(MUSIC_FOLDER):
for root, dirs, files in os.walk(MUSIC_FOLDER):
for filename in sorted(files):
if filename.lower().endswith(('.mp3', '.m4a', '.wav', '.flac', '.ogg')):
rel_path = os.path.relpath(os.path.join(root, filename), MUSIC_FOLDER)
library.append({
"title": os.path.splitext(filename)[0],
"file": f"music_proxy/{rel_path}",
# Absolute path exposed for Tauri's asset protocol (convertFileSrc).
# The web fallback keeps using the music_proxy route above.
"absolutePath": os.path.abspath(os.path.join(root, filename))
})
break # Top-level only
return jsonify(library)
@app.route('/music_proxy/<path:filename>')
def music_proxy(filename):
return send_from_directory(MUSIC_FOLDER, filename)
@app.route('/browse_directories', methods=['GET'])
def browse_directories():
path = request.args.get('path', os.path.expanduser('~'))
try:
entries = []
if os.path.exists(path) and os.path.isdir(path):
# Add parent
parent = os.path.dirname(os.path.abspath(path))
entries.append({"name": "..", "path": parent, "isDir": True})
for item in sorted(os.listdir(path)):
full_path = os.path.join(path, item)
if os.path.isdir(full_path) and not item.startswith('.'):
entries.append({
"name": item,
"path": full_path,
"isDir": True
})
return jsonify({"success": True, "path": os.path.abspath(path), "entries": entries})
except Exception as e:
return jsonify({"success": False, "error": str(e)}), 500
@app.route('/update_settings', methods=['POST'])
def update_settings():
try:
data = request.get_json()
# Load existing
settings = _load_settings()
# Update selectively
if 'library' not in settings: settings['library'] = {}
if 'music_folder' in data.get('library', {}):
new_folder = data['library']['music_folder']
if os.path.exists(new_folder) and os.path.isdir(new_folder):
settings['library']['music_folder'] = new_folder
global MUSIC_FOLDER
MUSIC_FOLDER = new_folder
else:
return jsonify({"success": False, "error": "Invalid folder path"}), 400
with open('settings.json', 'w', encoding='utf-8') as f:
json.dump(settings, f, indent=4)
return jsonify({"success": True})
except Exception as e:
return jsonify({"success": False, "error": str(e)}), 500
@app.route('/<path:filename>')
def serve_static(filename):
# Prevent path traversal
if '..' in filename or filename.startswith('/'):
abort(403)
# Allow only known safe static file extensions
allowed_extensions = ('.css', '.js', '.html', '.htm', '.png', '.jpg', '.jpeg',
'.gif', '.svg', '.ico', '.woff', '.woff2', '.ttf', '.eot', '.map')
if not filename.endswith(allowed_extensions):
abort(403)
response = send_from_directory('.', filename)
if filename.endswith(('.css', '.js', '.html')):
response.headers['Cache-Control'] = 'no-store, no-cache, must-revalidate, max-age=0'
return response
@app.route('/')
def index():
response = send_from_directory('.', index_file)
response.headers['Cache-Control'] = 'no-store, no-cache, must-revalidate, max-age=0'
response.headers['Pragma'] = 'no-cache'
response.headers['Expires'] = '0'
return response
@app.route('/upload', methods=['POST'])
def upload_file():
if 'file' not in request.files:
return jsonify({"success": False, "error": "No file provided"}), 400
file = request.files['file']
if file.filename == '':
return jsonify({"success": False, "error": "No file selected"}), 400
allowed_exts = ('.mp3', '.m4a', '.wav', '.flac', '.ogg')
ext = os.path.splitext(file.filename)[1].lower()
if ext not in allowed_exts:
return jsonify({"success": False, "error": f"Supported formats: {', '.join(allowed_exts)}"}), 400
# Sanitize filename (keep extension)
name_without_ext = os.path.splitext(file.filename)[0]
name_without_ext = re.sub(r'[^\w\s-]', '', name_without_ext)
name_without_ext = re.sub(r'\s+', ' ', name_without_ext).strip()
filename = f"{name_without_ext}{ext}"
filepath = os.path.join(MUSIC_FOLDER, filename)
if os.path.exists(filepath):
return jsonify({"success": False, "error": "File already exists in library"}), 409
try:
file.save(filepath)
print(f"UPLOADED: {filename}")
return jsonify({"success": True, "filename": filename})
except Exception as e:
print(f"ERROR: Upload error: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@app.route('/save_keymaps', methods=['POST'])
def save_keymaps():
try:
data = request.get_json()
with open('keymaps.json', 'w', encoding='utf-8') as f:
json.dump(data, f, indent=4)
print("SAVED: Keymaps saved to keymaps.json")
return jsonify({"success": True})
except Exception as e:
print(f"ERROR: Save keymaps error: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@app.route('/load_keymaps', methods=['GET'])
def load_keymaps():
try:
if os.path.exists('keymaps.json'):
with open('keymaps.json', 'r', encoding='utf-8') as f:
data = json.load(f)
return jsonify({"success": True, "keymaps": data})
else:
return jsonify({"success": True, "keymaps": None})
except Exception as e:
print(f"ERROR: Load keymaps error: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@app.route('/stream.mp3')
def stream_mp3():
"""Live MP3 audio stream from the ffmpeg transcoder."""
# If broadcast is not active, return 503 with audio/mpeg content type.
# Returning JSON here would confuse the browser's <audio> element.
if not broadcast_state.get('active'):
return Response(b'', status=503, content_type='audio/mpeg', headers={
'Retry-After': '5',
'Access-Control-Allow-Origin': '*',
'Cache-Control': 'no-cache, no-store',
})
# For non-MP3 input the server runs an ffmpeg transcoder; wait for it to start.
# For MP3 input (e.g. Qt client) chunks are distributed directly — no ffmpeg needed.
is_mp3_direct = broadcast_state.get('is_mp3_input', False)
if not is_mp3_direct:
waited = 0.0
while (_ffmpeg_proc is None or _ffmpeg_proc.poll() is not None) and waited < 5.0:
eventlet.sleep(0.5)
waited += 0.5
if _ffmpeg_proc is None or _ffmpeg_proc.poll() is not None:
return Response(b'', status=503, content_type='audio/mpeg', headers={
'Retry-After': '3',
'Access-Control-Allow-Origin': '*',
'Cache-Control': 'no-cache, no-store',
})
preroll_count = len(_mp3_preroll)
print(f"LISTENER: New listener joined stream (Pre-roll: {preroll_count} chunks)")
client_q = queue.Queue(maxsize=500)
with _mp3_lock:
for chunk in _mp3_preroll:
try:
client_q.put_nowait(chunk)
except Exception:
break
_mp3_clients.add(client_q)
def gen():
idle_checks = 0
try:
while True:
try:
chunk = client_q.get(timeout=5)
idle_checks = 0
except queue.Empty:
if not broadcast_state.get('active'):
break
idle_checks += 1
if idle_checks >= 12: # 60s total with no audio data
break
continue
if chunk is None:
break
yield chunk
finally:
with _mp3_lock:
_mp3_clients.discard(client_q)
print("LISTENER: Listener disconnected from stream")
return Response(stream_with_context(gen()), content_type='audio/mpeg', headers={
'Cache-Control': 'no-cache, no-store, must-revalidate',
'Connection': 'keep-alive',
'Pragma': 'no-cache',
'Expires': '0',
'X-Content-Type-Options': 'nosniff',
'Access-Control-Allow-Origin': '*',
'Icy-Name': 'TechDJ Live',
'X-Accel-Buffering': 'no', # Tell nginx/Cloudflare not to buffer this stream
})
@app.route('/stream_debug')
def stream_debug():
proc = _ffmpeg_proc
running = proc is not None and proc.poll() is None
return jsonify({
'broadcast_active': broadcast_state.get('active', False),
'broadcast_mimeType': broadcast_state.get('mimeType'),
'ffmpeg_running': running,
'ffmpeg_found': (proc is not None),
'mp3_clients': len(_mp3_clients),
'transcoder_bytes_out': _transcoder_bytes_out,
'transcoder_last_error': _transcoder_last_error,
'last_audio_chunk_ts': _last_audio_chunk_ts,
})
# === DJ SERVER ===
dj_app = Flask(__name__, static_folder='.', static_url_path='')
dj_app.config['SECRET_KEY'] = CONFIG_SECRET
dj_app.config['MAX_CONTENT_LENGTH'] = CONFIG_MAX_UPLOAD_MB * 1024 * 1024
setup_shared_routes(dj_app)
@dj_app.before_request
def _protect_dj_panel():
"""Optionally require a password for the DJ panel only (port 5000).
This does not affect the listener server (port 5001).
"""
if not DJ_AUTH_ENABLED:
return None
# Allow login/logout endpoints
if request.path in ('/login', '/logout'):
return None
# If already authenticated, allow
if session.get('dj_authed') is True:
return None
# Redirect everything else to login
return (
"<!doctype html><html><head><meta http-equiv='refresh' content='0; url=/login' /></head>"
"<body>Redirecting to <a href='/login'>/login</a>...</body></html>",
302,
{'Location': '/login'}
)
@dj_app.route('/login', methods=['GET', 'POST'])
def dj_login():
if not DJ_AUTH_ENABLED:
# If auth is disabled, just go to the panel.
session['dj_authed'] = True
return (
"<!doctype html><html><head><meta http-equiv='refresh' content='0; url=/' /></head>"
"<body>Auth disabled. Redirecting...</body></html>",
302,
{'Location': '/'}
)
error = None
if request.method == 'POST':
pw = (request.form.get('password') or '').strip()
if pw == DJ_PANEL_PASSWORD:
session['dj_authed'] = True
return (
"<!doctype html><html><head><meta http-equiv='refresh' content='0; url=/' /></head>"
"<body>Logged in. Redirecting...</body></html>",
302,
{'Location': '/'}
)
error = 'Invalid password'
# Minimal inline login page (no new assets)
return f"""<!doctype html>
<html lang=\"en\">
<head>
<meta charset=\"utf-8\" />
<meta name=\"viewport\" content=\"width=device-width, initial-scale=1\" />
<title>TechDJ - DJ Login</title>
<style>
body {{ background:#0a0a12; color:#eee; font-family: system-ui, -apple-system, Segoe UI, Roboto, Arial; margin:0; }}
.wrap {{ min-height:100vh; display:flex; align-items:center; justify-content:center; padding:24px; }}
.card {{ width:100%; max-width:420px; background:rgba(10,10,20,0.85); border:2px solid #bc13fe; border-radius:16px; padding:24px; box-shadow:0 0 40px rgba(188,19,254,0.25); }}
h1 {{ margin:0 0 16px 0; font-size:22px; }}
label {{ display:block; margin:12px 0 8px; opacity:0.9; }}
input {{ width:100%; padding:12px; border-radius:10px; border:1px solid rgba(255,255,255,0.15); background:rgba(0,0,0,0.35); color:#fff; }}
button {{ width:100%; margin-top:14px; padding:12px; border-radius:10px; border:2px solid #bc13fe; background:rgba(188,19,254,0.15); color:#fff; font-weight:700; cursor:pointer; }}
.err {{ margin-top:12px; color:#ffb3ff; }}
.hint {{ margin-top:10px; font-size:12px; opacity:0.7; }}
</style>
</head>
<body>
<div class=\"wrap\">
<div class=\"card\">
<h1>DJ Panel Locked</h1>
<form method=\"post\" action=\"/login\">
<label for=\"password\">Password</label>
<input id=\"password\" name=\"password\" type=\"password\" autocomplete=\"current-password\" autofocus />
<button type=\"submit\">Unlock DJ Panel</button>
{f"<div class='err'>{error}</div>" if error else ""}
</form>
</div>
</div>
</body>
</html>"""
@dj_app.route('/logout')
def dj_logout():
session.pop('dj_authed', None)
return (
"<!doctype html><html><head><meta http-equiv='refresh' content='0; url=/login' /></head>"
"<body>Logged out. Redirecting...</body></html>",
302,
{'Location': '/login'}
)
@dj_app.route('/client_config')
def client_config():
"""Expose server-side config values needed by the DJ panel client."""
return jsonify({'listener_url': CONFIG_LISTENER_URL})
dj_socketio = SocketIO(
dj_app,
cors_allowed_origins=CONFIG_CORS,
async_mode='eventlet',
max_http_buffer_size=CONFIG_MAX_UPLOAD_MB * 1024 * 1024,
ping_timeout=60,
ping_interval=25,
logger=CONFIG_DEBUG,
engineio_logger=CONFIG_DEBUG
)
@dj_socketio.on('connect')
def dj_connect():
if DJ_AUTH_ENABLED and session.get('dj_authed') is not True:
print(f"REJECTED: DJ socket rejected (unauthorized): {request.sid}")
return False
print(f"STREAMPANEL: DJ connected: {request.sid}")
dj_sids.add(request.sid)
def _dj_disconnect_grace():
"""Auto-stop broadcast if no DJ reconnects within the grace period."""
global _dj_grace_greenlet
print(f"INFO: Grace period started — {DJ_GRACE_PERIOD_SECS}s for DJ to reconnect")
eventlet.sleep(DJ_GRACE_PERIOD_SECS)
if broadcast_state.get('active') and not dj_sids:
print("WARNING: Grace period expired, no DJ reconnected — auto-stopping broadcast")
broadcast_state['active'] = False
_stop_transcoder()
listener_socketio.emit('broadcast_stopped', namespace='/')
listener_socketio.emit('stream_status', {'active': False}, namespace='/')
_dj_grace_greenlet = None
@dj_socketio.on('disconnect')
def dj_disconnect():
global _dj_grace_greenlet
dj_sids.discard(request.sid)
print(f"WARNING: DJ disconnected ({request.sid}). Remaining DJs: {len(dj_sids)}")
# If broadcast is active and no other DJs remain, start the grace period
if broadcast_state.get('active') and not dj_sids:
if _dj_grace_greenlet is None:
_dj_grace_greenlet = eventlet.spawn(_dj_disconnect_grace)
elif not broadcast_state.get('active'):
# Nothing to do — no active broadcast
pass
else:
print("INFO: Other DJ(s) still connected — broadcast continues uninterrupted")
@dj_socketio.on('start_broadcast')
def dj_start(data=None):
global _dj_grace_greenlet
# Cancel any pending auto-stop grace period (DJ reconnected in time)
if _dj_grace_greenlet is not None:
_dj_grace_greenlet.kill()
_dj_grace_greenlet = None
print("INFO: DJ reconnected within grace period — broadcast continues")
was_already_active = broadcast_state.get('active', False)
broadcast_state['active'] = True
session['is_dj'] = True
print("BROADCAST: ACTIVE")
is_mp3_input = False
if data:
if 'bitrate' in data:
global _current_bitrate
_current_bitrate = data['bitrate']
print(f"BITRATE: Setting stream bitrate to: {_current_bitrate}")
if data.get('format') == 'mp3':
is_mp3_input = True
broadcast_state['is_mp3_input'] = is_mp3_input
if not was_already_active:
# Fresh broadcast start — clear pre-roll.
# For non-MP3 input start the ffmpeg transcoder; for MP3 input chunks are
# distributed directly via _distribute_mp3(), no transcoder required.
with _mp3_lock:
_mp3_preroll.clear()
if not is_mp3_input:
_start_transcoder_if_needed(is_mp3_input=False)
# Tell listeners a new broadcast has begun (triggers audio player reload)
listener_socketio.emit('broadcast_started', namespace='/')
else:
# DJ reconnected mid-broadcast - just ensure transcoder is alive (non-MP3 only)
# Do NOT clear pre-roll or trigger listener reload
print("BROADCAST: DJ reconnected - resuming existing broadcast")
if not is_mp3_input:
_start_transcoder_if_needed(is_mp3_input=False)
# Always send current status so any waiting listeners get unblocked
listener_socketio.emit('stream_status', {'active': True}, namespace='/')
@dj_socketio.on('get_listener_count')
def dj_get_listener_count():
emit('listener_count', {'count': len(listener_sids)})
@dj_socketio.on('listener_glow')
def dj_listener_glow(data):
"""DJ sets the glow intensity on the listener page."""
intensity = int(data.get('intensity', 30)) if isinstance(data, dict) else 30
intensity = max(0, min(100, intensity))
listener_socketio.emit('listener_glow', {'intensity': intensity}, namespace='/')
@dj_socketio.on('deck_glow')
def dj_deck_glow(data):
"""Relay which decks are playing so the listener page can mirror the glow colour."""
if not isinstance(data, dict):
return
listener_socketio.emit('deck_glow', {
'A': bool(data.get('A', False)),
'B': bool(data.get('B', False)),
}, namespace='/')
@dj_socketio.on('stop_broadcast')
def dj_stop():
broadcast_state['active'] = False
session['is_dj'] = False
print("STOPPED: DJ stopped broadcasting")
_stop_transcoder()
listener_socketio.emit('broadcast_stopped', namespace='/')
listener_socketio.emit('stream_status', {'active': False}, namespace='/')
@dj_socketio.on('audio_chunk')
def dj_audio(data):
if broadcast_state['active'] and isinstance(data, (bytes, bytearray)):
if broadcast_state.get('is_mp3_input', False):
# MP3 input (e.g. Qt client): skip ffmpeg, send directly to listeners
_distribute_mp3(bytes(data))
else:
# Other formats (e.g. webm/opus from browser): route through ffmpeg transcoder
if _ffmpeg_proc is None or _ffmpeg_proc.poll() is not None:
_start_transcoder_if_needed(is_mp3_input=False)
_feed_transcoder(bytes(data))
# === LISTENER SERVER ===
# static_folder=None prevents Flask's built-in static handler from serving
# DJ files (like index.html) at /<path> — all static files go through our
# custom serve_static route which has security checks.
listener_app = Flask(__name__, static_folder=None)
listener_app.config['SECRET_KEY'] = CONFIG_SECRET + '_listener'
listener_app.config['MAX_CONTENT_LENGTH'] = CONFIG_MAX_UPLOAD_MB * 1024 * 1024
setup_shared_routes(listener_app, index_file='listener.html')
# Block write/admin endpoints AND DJ-only files on the listener server
@listener_app.before_request
def _restrict_listener_routes():
"""Prevent listeners from accessing DJ-only endpoints and files."""
blocked_paths = ('/update_settings', '/upload', '/save_keymaps', '/browse_directories')
if request.path in blocked_paths:
abort(403)
# Block DJ-only files — prevents serving the DJ panel even via direct URL
dj_only_files = ('/index.html', '/script.js', '/style.css')
if request.path in dj_only_files:
abort(403)
listener_socketio = SocketIO(
listener_app,
cors_allowed_origins=CONFIG_CORS,
async_mode='eventlet',
max_http_buffer_size=CONFIG_MAX_UPLOAD_MB * 1024 * 1024,
# Lower timeouts: stale connections detected in ~25s instead of ~85s
# ping_interval: how often to probe (seconds)
# ping_timeout: how long to wait for pong before declaring dead
ping_timeout=15,
ping_interval=10,
logger=CONFIG_DEBUG,
engineio_logger=CONFIG_DEBUG
)
def _broadcast_listener_count():
"""Compute the most accurate listener count and broadcast to both panels.
Uses the larger of:
- listener_sids: Socket.IO connections (people with the page open)
- _mp3_clients: active /stream.mp3 HTTP connections (people actually hearing audio)
Taking the max avoids undercounting when someone hasn't clicked Enable Audio
yet, and also avoids undercounting direct stream URL listeners (e.g. VLC).
"""
with _mp3_lock:
stream_count = len(_mp3_clients)
count = max(len(listener_sids), stream_count)
listener_socketio.emit('listener_count', {'count': count}, namespace='/')
dj_socketio.emit('listener_count', {'count': count}, namespace='/')
return count
@listener_socketio.on('connect')
def listener_connect():
# Count immediately on connect — don't wait for join_listener
listener_sids.add(request.sid)
count = _broadcast_listener_count()
print(f"LISTENER: Connected {request.sid}. Total: {count}")
@listener_socketio.on('disconnect')
def listener_disconnect():
listener_sids.discard(request.sid)
count = _broadcast_listener_count()
print(f"REMOVED: Listener left {request.sid}. Total: {count}")
@listener_socketio.on('join_listener')
def listener_join():
# SID already added in listener_connect(); just send stream status back
emit('stream_status', {'active': broadcast_state['active']})
@listener_socketio.on('get_listener_count')
def listener_get_count():
emit('listener_count', {'count': len(listener_sids)})
# DJ Panel Routes (No engine commands needed in local mode)
def _transcoder_watchdog():
"""Periodic check to ensure the ffmpeg transcoder stays alive.
Only applies to non-MP3 input; MP3 input (Qt client) is distributed directly.
"""
while True:
is_mp3_direct = broadcast_state.get('is_mp3_input', False)
if broadcast_state.get('active') and not is_mp3_direct:
if _ffmpeg_proc is None or _ffmpeg_proc.poll() is not None:
print("WARNING: Watchdog: Transcoder dead during active broadcast, reviving...")
_start_transcoder_if_needed(is_mp3_input=False)
eventlet.sleep(5)
def _listener_count_sync_loop():
"""Periodic reconciliation — catches any edge cases where connect/disconnect
events were missed (e.g. server under load, eventlet greenlet delays)."""
while True:
eventlet.sleep(5)
_broadcast_listener_count()
if __name__ == '__main__':
print("=" * 50)
print("TECHDJ PRO - DUAL PORT ARCHITECTURE")
print("=" * 50)
print(f"HOST: {CONFIG_HOST}")
print(f"URL: DJ PANEL -> http://{CONFIG_HOST}:{CONFIG_DJ_PORT}")
print(f"URL: LISTENER -> http://{CONFIG_HOST}:{CONFIG_LISTENER_PORT}")
if DJ_AUTH_ENABLED:
print("AUTH: DJ panel password ENABLED")
else:
print("AUTH: DJ panel password DISABLED")
print(f"DEBUG: {CONFIG_DEBUG}")
print("=" * 50)
print(f"READY: Server ready on {CONFIG_HOST}:{CONFIG_DJ_PORT} & {CONFIG_HOST}:{CONFIG_LISTENER_PORT}")
# Run both servers using eventlet's spawn
eventlet.spawn(_listener_count_sync_loop)
eventlet.spawn(_transcoder_watchdog)
eventlet.spawn(dj_socketio.run, dj_app, host=CONFIG_HOST, port=CONFIG_DJ_PORT, debug=False)
listener_socketio.run(listener_app, host=CONFIG_HOST, port=CONFIG_LISTENER_PORT, debug=False)