# techdj/server.py
#
# (file-viewer metadata: 1283 lines, 48 KiB, Python)

# Monkey patch MUST be first - before any other imports!
import eventlet
eventlet.monkey_patch()
import os
import json
import re
import subprocess
import threading
import queue
import time
import collections
import hmac
from flask import Flask, send_from_directory, jsonify, request, session, Response, stream_with_context, abort, render_template, make_response
from flask_socketio import SocketIO, emit
from dotenv import load_dotenv
load_dotenv()
def _load_config():
    """Load the optional ``config.json`` from the current working directory.

    Returns:
        dict: The parsed configuration.  An empty dict is returned when the
        file is missing, cannot be read, is not valid JSON, or its top-level
        value is not a JSON object.
    """
    try:
        with open('config.json', 'r', encoding='utf-8') as f:
            data = json.load(f)
    except FileNotFoundError:
        # A missing file is the normal "use defaults" case — stay quiet.
        return {}
    except (OSError, ValueError) as e:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        # Tell the operator instead of silently ignoring a broken config.
        print(f"WARNING: config.json could not be loaded ({e}); using defaults")
        return {}
    if not isinstance(data, dict):
        print("WARNING: config.json must contain a JSON object; using defaults")
        return {}
    return data
CONFIG = _load_config()

# --- Config-driven settings (config.json > env vars > defaults) ---
CONFIG_HOST = (CONFIG.get('host') or os.environ.get('HOST') or '0.0.0.0').strip()
CONFIG_DJ_PORT = int(CONFIG.get('dj_port') or os.environ.get('DJ_PORT') or 5000)
CONFIG_LISTENER_PORT = int(CONFIG.get('listener_port') or os.environ.get('LISTEN_PORT') or 5001)

# Key used as Flask's SECRET_KEY, i.e. the key that signs session cookies —
# including the 'dj_authed' login flag.  A publicly known fallback value would
# let anyone forge an authenticated session, so when nothing is configured a
# random per-process key is generated instead.
CONFIG_SECRET = (CONFIG.get('secret_key') or os.environ.get('SECRET_KEY') or '').strip()
if not CONFIG_SECRET:
    CONFIG_SECRET = os.urandom(32).hex()
    print("WARNING: no secret_key configured; generated a random per-process key "
          "(existing sessions will not survive a restart)")

CONFIG_CORS = CONFIG.get('cors_origins', '*')
CONFIG_MAX_UPLOAD_MB = int(CONFIG.get('max_upload_mb') or 500)
CONFIG_DEBUG = bool(CONFIG.get('debug', False))
CONFIG_LISTENER_URL = (CONFIG.get('listener_url') or '').strip()
# MediaMTX / SRT integration
# Secret shared with MediaMTX webhook requests.  Per the original note a blank
# value disables webhook auth — the check itself is outside this excerpt.
_SRT_WEBHOOK_SECRET = (CONFIG.get('mediamtx_webhook_secret') or '').strip()
# SRT ingest port MediaMTX listens on (Flask does not bind this — configure in mediamtx.yml)
_MEDIAMTX_SRT_PORT = int(CONFIG.get('mediamtx_srt_port') or 8890)
# RTSP URL MediaMTX exposes for the incoming SRT path (used by ffmpeg to pull audio)
_MEDIAMTX_RTSP_URL = (CONFIG.get('mediamtx_rtsp_url') or 'rtsp://127.0.0.1:8554/live').strip()
# HLS playlist URL.  NOTE(review): the default points at one specific host and
# its consumer is not visible in this excerpt — confirm it suits other deployments.
_MEDIAMTX_HLS_URL = (CONFIG.get('mediamtx_hls_url') or 'http://techy.music:8888/aussie_dj/index.m3u8').strip()
# DJ panel password; an empty value leaves DJ_AUTH_ENABLED False, which turns
# the login requirement off.
DJ_PANEL_PASSWORD = (CONFIG.get('dj_panel_password') or '').strip()
DJ_AUTH_ENABLED = bool(DJ_PANEL_PASSWORD)
# Broadcast state.  The start_broadcast handler also stores an 'is_mp3_input'
# key in this dict at runtime.
broadcast_state = {
'active': False,
}
# SRT stream state — updated by the /api/webhook MediaMTX callback
_srt_state = {
'active': False,
'path': None,
'source_id': None,
'started_at': None,
}
# ffmpeg process that pulls audio from MediaMTX's RTSP output for an SRT session
_srt_ffmpeg_proc = None
# Socket.IO session ids of the currently connected listeners and DJs.
listener_sids = set()
dj_sids = set()
# Set to True once the first audio chunk arrives in MP3-direct mode.
# broadcast_started is held back until then so listeners don't connect
# to /stream.mp3 before any data is flowing (e.g. before the DJ presses play).
_mp3_broadcast_announced = False
# Current listener glow intensity — persisted so new/reloaded listeners get the
# correct level immediately without waiting for the DJ to move the slider again.
_current_glow_intensity = 30
# Grace-period greenlet: auto-stop broadcast if DJ doesn't reconnect in time
_dj_grace_greenlet = None
DJ_GRACE_PERIOD_SECS = 20 # seconds to wait before auto-stopping
# === Optional MP3 fallback stream (server-side transcoding) ===
# Popen handle of the stdin-fed ffmpeg transcoder, or None when not running.
_ffmpeg_proc = None
# Bounded queue of raw DJ audio chunks waiting to be written to ffmpeg's stdin;
# _feed_transcoder drops chunks when it is full.
_ffmpeg_in_q = queue.Queue(maxsize=200)
# Bitrate passed to ffmpeg's -b:a; the start_broadcast handler can override it.
_current_bitrate = (CONFIG.get('stream_bitrate') or '192k').strip()
_mp3_clients = set() # set[queue.Queue] — one queue per open /stream.mp3 response
# Guards _mp3_clients and the pre-roll buffer below.
_mp3_lock = threading.Lock()
# Counters and diagnostics reported by /stream_debug.
_transcoder_bytes_out = 0
_transcoder_last_error = None
_last_audio_chunk_ts = 0.0
_mp3_preroll = collections.deque() # byte-budget FIFO; evicted from the front when over limit
_mp3_preroll_bytes = 0
_MP3_PREROLL_MAX_BYTES = 256 * 1024 # 256 KB ≈ 10 s at 192 kbps — enough for smooth reconnects
def _preroll_append(data: bytes) -> None:
    """Append *data* to the pre-roll FIFO and trim it back to its byte budget.

    The oldest chunks are dropped from the left end until the buffered total
    is no larger than ``_MP3_PREROLL_MAX_BYTES`` (or the FIFO is empty).

    The caller MUST already hold ``_mp3_lock``.
    """
    global _mp3_preroll_bytes
    _mp3_preroll.append(data)
    buffered = _mp3_preroll_bytes + len(data)
    while _mp3_preroll and buffered > _MP3_PREROLL_MAX_BYTES:
        evicted = _mp3_preroll.popleft()
        buffered -= len(evicted)
    _mp3_preroll_bytes = buffered
def _preroll_clear() -> None:
    """Discard every buffered pre-roll chunk and zero the byte counter.

    The caller MUST already hold ``_mp3_lock``.
    """
    global _mp3_preroll_bytes
    _mp3_preroll_bytes = 0
    _mp3_preroll.clear()
def _start_transcoder_if_needed(is_mp3_input=False):
    """Start the stdin-fed ffmpeg process and its I/O greenlets if none is running.

    Args:
        is_mp3_input: When True the input is declared as MP3 and stream-copied
            (``-acodec copy``); otherwise ffmpeg probes the input and encodes
            with libmp3lame at ``_current_bitrate``.

    Side effects:
        Rebinds the module-level ``_ffmpeg_proc``, resets
        ``_transcoder_last_error``, empties ``_ffmpeg_in_q`` and spawns three
        greenlets bound to the new process (stdin writer, stdout reader and
        stderr logger).  When the ffmpeg binary is missing a warning is
        printed and ``_ffmpeg_proc`` is left as ``None``.
    """
    global _ffmpeg_proc, _transcoder_last_error

    # Nothing to do while the current process is still alive.
    if _ffmpeg_proc is not None and _ffmpeg_proc.poll() is None:
        return

    # The previous process has exited; drop the stale handle.
    if _ffmpeg_proc:
        try:
            _ffmpeg_proc.terminate()
        except OSError:
            pass
        _ffmpeg_proc = None

    codec = 'copy' if is_mp3_input else 'libmp3lame'
    cmd = [
        'ffmpeg',
        '-hide_banner',
        '-loglevel', 'error',
        '-fflags', 'nobuffer',
        '-flags', 'low_delay',
        '-probesize', '32',
        '-analyzeduration', '0',
    ]
    if is_mp3_input:
        cmd.extend(['-f', 'mp3'])
    cmd.extend([
        '-i', 'pipe:0',
        '-vn',
        '-acodec', codec,
    ])
    if not is_mp3_input:
        # str() because the bitrate can be replaced at runtime by a client
        # supplied value; Popen rejects non-string arguments with a TypeError.
        cmd.extend(['-b:a', str(_current_bitrate)])
    cmd.extend([
        '-flush_packets', '1',
        '-f', 'mp3',
        'pipe:1',
    ])

    try:
        _ffmpeg_proc = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            bufsize=0,
        )
    except FileNotFoundError:
        _ffmpeg_proc = None
        print('WARNING: ffmpeg not found; /stream.mp3 fallback disabled')
        return

    mode_str = "PASSTHROUGH (copy)" if is_mp3_input else f"TRANSCODE ({_current_bitrate})"
    print(f'INFO: ffmpeg transcoder started ({mode_str})')
    _transcoder_last_error = None

    # Discard input queued for a previous process so stale audio is not replayed.
    while True:
        try:
            _ffmpeg_in_q.get_nowait()
        except queue.Empty:
            break

    # The workers are defined here and receive the process as an argument so
    # each one stays bound to THIS process, even after _ffmpeg_proc is rebound.
    # NOTE(review): nothing in this function calls eventlet.tpool; the loops
    # rely on the pipes and queue being cooperative under
    # eventlet.monkey_patch() — confirm.
    def _terminate_if_alive(proc):
        # Make sure ffmpeg does not outlive the worker that was serving it.
        if proc.poll() is None:
            try:
                proc.terminate()
            except OSError:
                pass

    def _writer(proc):
        """Move chunks from _ffmpeg_in_q to ffmpeg's stdin until stopped."""
        global _transcoder_last_error
        print(f"[THREAD] Transcoder writer started (PID: {proc.pid})")
        while proc.poll() is None:
            try:
                chunk = _ffmpeg_in_q.get(timeout=1.0)
                if chunk is None:  # sentinel pushed by _stop_transcoder
                    break
                if proc.stdin:
                    proc.stdin.write(chunk)
                    proc.stdin.flush()
            except queue.Empty:
                continue
            except (BrokenPipeError, ConnectionResetError):
                _transcoder_last_error = "Broken pipe in writer"
                break
            except Exception as e:
                print(f"WARNING: Transcoder writer error: {e}")
                _transcoder_last_error = str(e)
                break
        _terminate_if_alive(proc)
        print(f"[THREAD] Transcoder writer finished (PID: {proc.pid})")

    def _reader(proc):
        """Fan ffmpeg's MP3 output out to the pre-roll buffer and all clients."""
        global _transcoder_bytes_out, _transcoder_last_error
        print(f"[THREAD] Transcoder reader started (PID: {proc.pid})")
        while proc.poll() is None:
            try:
                data = proc.stdout.read(1024)
                if not data:
                    break
                _transcoder_bytes_out += len(data)
                with _mp3_lock:
                    _preroll_append(data)
                    clients = list(_mp3_clients)
                for q in clients:
                    try:
                        q.put_nowait(data)
                    except queue.Full:
                        # Client is too slow — skip this chunk for them
                        pass
            except Exception as e:
                print(f"WARNING: Transcoder reader error: {e}")
                _transcoder_last_error = str(e)
                break
        _terminate_if_alive(proc)
        print(f"[THREAD] Transcoder reader finished (PID: {proc.pid})")

    def _stderr_logger(proc):
        """Drain ffmpeg's stderr so the pipe cannot fill up and stall ffmpeg."""
        global _transcoder_last_error
        try:
            while True:
                raw = proc.stderr.read(4096)
                if not raw:
                    break
                message = raw.decode(errors='replace').strip()
                if message:
                    print(f"[FFMPEG] {message}")
                    _transcoder_last_error = message
        except Exception as e:
            # Typically the pipe being closed during shutdown; nothing to recover.
            print(f"INFO: Transcoder stderr logger stopped: {e}")

    eventlet.spawn(_writer, _ffmpeg_proc)
    eventlet.spawn(_reader, _ffmpeg_proc)
    eventlet.spawn(_stderr_logger, _ffmpeg_proc)
def _stop_transcoder():
    """Stop the stdin-fed ffmpeg transcoder and disconnect every stream client.

    Side effects:
        Sets ``_ffmpeg_proc`` to ``None``, empties ``_ffmpeg_in_q`` and leaves
        a ``None`` sentinel in it for the writer greenlet, terminates (then, if
        needed, kills) the ffmpeg process, pushes a ``None`` sentinel to every
        client queue, and clears ``_mp3_clients`` and the pre-roll buffer.
    """
    global _ffmpeg_proc
    print("STOPPING: Transcoder process")
    proc = _ffmpeg_proc
    # Null first so that, once the broadcast is inactive, _feed_transcoder
    # returns early instead of enqueuing more audio.
    _ffmpeg_proc = None

    # Drain stale input chunks so the None sentinel is guaranteed to fit and
    # the writer greenlet exits on its next get() instead of stalling.
    while True:
        try:
            _ffmpeg_in_q.get_nowait()
        except queue.Empty:
            break
    try:
        _ffmpeg_in_q.put_nowait(None)
    except queue.Full:
        pass

    if proc is not None:
        try:
            proc.terminate()
            # wait() rather than communicate(): communicate() would read the
            # stdout/stderr pipes that the worker greenlets are still reading.
            proc.wait(timeout=1.0)
        except Exception as e:
            if not isinstance(e, subprocess.TimeoutExpired):
                print(f"WARNING: Transcoder shutdown error: {e}")
            try:
                proc.kill()
                proc.wait(timeout=1.0)  # reap so no zombie is left behind
            except Exception as kill_err:
                print(f"WARNING: Could not kill transcoder: {kill_err}")

    with _mp3_lock:
        clients = list(_mp3_clients)
        for q in clients:
            try:
                q.put_nowait(None)  # tells the client's generator to finish
            except queue.Full:
                # The client's generator also stops on its own once it sees
                # that the broadcast is no longer active.
                pass
        _mp3_clients.clear()
        _preroll_clear()
# === SRT / MediaMTX transcoder ===
# When MediaMTX receives the incoming SRT stream it exposes it as an RTSP endpoint.
# We launch a dedicated ffmpeg process that *pulls* from that RTSP URL and feeds the
# same _mp3_clients / _mp3_preroll distribution system used by all other stream modes.
def _start_srt_transcoder():
    """Start an ffmpeg process that pulls from MediaMTX's RTSP output.

    ffmpeg reads ``_MEDIAMTX_RTSP_URL`` over TCP, encodes the audio to MP3 at
    ``_current_bitrate`` and writes it to stdout; a reader greenlet forwards
    that output to the pre-roll buffer and every queue in ``_mp3_clients``.

    Side effects:
        Rebinds ``_srt_ffmpeg_proc``, resets ``_transcoder_last_error`` and
        spawns the reader plus a stderr-logging greenlet.  Does nothing when a
        process is already running; prints a warning and leaves
        ``_srt_ffmpeg_proc`` as ``None`` when ffmpeg is not installed.
    """
    global _srt_ffmpeg_proc, _transcoder_last_error

    if _srt_ffmpeg_proc is not None and _srt_ffmpeg_proc.poll() is None:
        return  # Already running

    # The previous process has exited; drop the stale handle.
    if _srt_ffmpeg_proc:
        try:
            _srt_ffmpeg_proc.terminate()
        except OSError:
            pass
        _srt_ffmpeg_proc = None

    cmd = [
        'ffmpeg',
        '-hide_banner', '-loglevel', 'error',
        '-fflags', 'nobuffer', '-flags', 'low_delay',
        '-rtsp_transport', 'tcp',
        '-i', _MEDIAMTX_RTSP_URL,
        '-vn',
        '-acodec', 'libmp3lame',
        # str() because the bitrate can be replaced at runtime by a client
        # supplied value; Popen rejects non-string arguments with a TypeError.
        '-b:a', str(_current_bitrate),
        '-flush_packets', '1',
        '-f', 'mp3', 'pipe:1',
    ]
    try:
        _srt_ffmpeg_proc = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            bufsize=0,
        )
    except FileNotFoundError:
        _srt_ffmpeg_proc = None
        print('WARNING: ffmpeg not found; SRT→MP3 bridge disabled')
        return

    print(f'INFO: SRT transcoder started — pulling from {_MEDIAMTX_RTSP_URL} @ {_current_bitrate}')
    _transcoder_last_error = None

    def _srt_reader(proc):
        """Fan ffmpeg's MP3 output out to the pre-roll buffer and all clients."""
        global _transcoder_bytes_out, _transcoder_last_error
        print(f'[THREAD] SRT reader started (PID: {proc.pid})')
        while proc.poll() is None:
            try:
                data = proc.stdout.read(4096)
                if not data:
                    break
                _transcoder_bytes_out += len(data)
                with _mp3_lock:
                    _preroll_append(data)
                    clients = list(_mp3_clients)
                for q in clients:
                    try:
                        q.put_nowait(data)
                    except queue.Full:
                        # Client is too slow — skip this chunk for them
                        pass
            except Exception as e:
                print(f'WARNING: SRT reader error: {e}')
                _transcoder_last_error = str(e)
                break
        # Stop ffmpeg straight away if the loop ended while it is still alive.
        # (Previously a blocking stderr.read() ran first, which could not
        # return until ffmpeg had exited.)
        if proc.poll() is None:
            try:
                proc.terminate()
            except OSError:
                pass
        print(f'[THREAD] SRT reader finished (PID: {proc.pid})')

    def _srt_stderr_logger(proc):
        """Log ffmpeg's stderr as it arrives so the pipe can never fill up."""
        global _transcoder_last_error
        try:
            while True:
                raw = proc.stderr.read(4096)
                if not raw:
                    break
                err = raw.decode(errors='replace').strip()
                if err:
                    print(f'[FFMPEG SRT] {err}')
                    _transcoder_last_error = err
        except Exception as e:
            # Typically the pipe being closed during shutdown; nothing to recover.
            print(f'INFO: SRT stderr logger stopped: {e}')

    eventlet.spawn(_srt_reader, _srt_ffmpeg_proc)
    eventlet.spawn(_srt_stderr_logger, _srt_ffmpeg_proc)
def _stop_srt_transcoder():
    """Terminate the SRT→MP3 ffmpeg process and flush all listener queues.

    Side effects:
        Sets ``_srt_ffmpeg_proc`` to ``None``, terminates (then, if needed,
        kills) the process, pushes a ``None`` sentinel to every client queue,
        and clears ``_mp3_clients`` and the pre-roll buffer.
    """
    global _srt_ffmpeg_proc
    print('STOPPING: SRT transcoder')
    proc = _srt_ffmpeg_proc
    _srt_ffmpeg_proc = None

    if proc is not None:
        try:
            proc.terminate()
            # wait() rather than communicate(): communicate() would read the
            # stdout/stderr pipes that the reader greenlet is still reading.
            proc.wait(timeout=2.0)
        except Exception as e:
            if not isinstance(e, subprocess.TimeoutExpired):
                print(f'WARNING: SRT transcoder shutdown error: {e}')
            try:
                proc.kill()
                proc.wait(timeout=1.0)  # reap so no zombie is left behind
            except Exception as kill_err:
                print(f'WARNING: Could not kill SRT transcoder: {kill_err}')

    with _mp3_lock:
        clients = list(_mp3_clients)
        for q in clients:
            try:
                q.put_nowait(None)  # tells the client's generator to finish
            except queue.Full:
                pass
        _mp3_clients.clear()
        _preroll_clear()
def _feed_transcoder(data: bytes):
    """Queue one raw audio chunk for the ffmpeg transcoder.

    If the transcoder is not running it is restarted (non-MP3 mode) when a
    broadcast is active; with no active broadcast the chunk is ignored.  The
    chunk is dropped silently when the input queue is full.
    """
    global _last_audio_chunk_ts

    transcoder_alive = _ffmpeg_proc is not None and _ffmpeg_proc.poll() is None
    if not transcoder_alive:
        if not broadcast_state.get('active'):
            return
        # Active broadcast but no live process: bring the transcoder back.
        _start_transcoder_if_needed(is_mp3_input=False)

    _last_audio_chunk_ts = time.time()
    try:
        _ffmpeg_in_q.put_nowait(data)
    except queue.Full:
        # Bounded queue: losing a chunk is preferred over unbounded memory use.
        pass
def _distribute_mp3(data: bytes):
    """Hand ready-made MP3 bytes straight to the pre-roll buffer and all clients.

    This path does not involve ffmpeg at all; it is used when the DJ client
    already sends MP3 (e.g. the Qt desktop client), which avoids an extra
    decode/encode round-trip.  Clients whose queue is full miss this chunk.
    """
    global _transcoder_bytes_out, _last_audio_chunk_ts

    _last_audio_chunk_ts = time.time()
    _transcoder_bytes_out += len(data)

    with _mp3_lock:
        _preroll_append(data)
        # Iterate over a snapshot so the set itself is not walked directly.
        for client_q in tuple(_mp3_clients):
            try:
                client_q.put_nowait(data)
            except queue.Full:
                continue
# Load settings to get MUSIC_FOLDER
def _load_settings():
    """Load ``settings.json`` from the current working directory.

    Returns:
        dict: The parsed settings.  An empty dict is returned when the file is
        missing, cannot be read, is not valid JSON, or its top-level value is
        not a JSON object — callers use ``.get()`` and item assignment on the
        result, so a non-dict must never leak out.
    """
    try:
        with open('settings.json', 'r', encoding='utf-8') as f:
            data = json.load(f)
    except FileNotFoundError:
        return {}
    except (OSError, ValueError) as e:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        print(f"WARNING: settings.json could not be loaded ({e}); using defaults")
        return {}
    return data if isinstance(data, dict) else {}
SETTINGS = _load_settings()
_config_music = (CONFIG.get('music_folder') or '').strip()

# Precedence: config.json > settings.json > the default "music" folder.
# Anything unexpected in settings.json (not a dict, empty value) falls back to
# the default instead of raising at import time.
_settings_library = SETTINGS.get('library') if isinstance(SETTINGS, dict) else None
if not isinstance(_settings_library, dict):
    _settings_library = {}
MUSIC_FOLDER = _config_music or _settings_library.get('music_folder') or 'music'

# Ensure music folder exists
try:
    os.makedirs(MUSIC_FOLDER, exist_ok=True)
except (OSError, TypeError, ValueError) as e:
    # Fallback to default if custom path fails
    print(f"WARNING: Cannot use music folder {MUSIC_FOLDER!r} ({e}); falling back to 'music'")
    MUSIC_FOLDER = "music"
    os.makedirs(MUSIC_FOLDER, exist_ok=True)
# Helper for shared routes
def setup_shared_routes(app, index_file='index.html'):
@app.route('/library.json')
def get_library():
    """Return a JSON list of the audio files directly inside MUSIC_FOLDER.

    Sub-folders are not descended into.  Each entry carries the title (file
    name without extension), a ``music_proxy/…`` path for the web player and
    the absolute path on disk.
    """
    global MUSIC_FOLDER
    audio_exts = ('.mp3', '.m4a', '.wav', '.flac', '.ogg')
    library = []

    # Only the first item produced by os.walk() is used: the top-level folder.
    top_level = next(os.walk(MUSIC_FOLDER), None) if os.path.exists(MUSIC_FOLDER) else None
    if top_level is not None:
        root, _subdirs, files = top_level
        for filename in sorted(files):
            if not filename.lower().endswith(audio_exts):
                continue
            full_path = os.path.join(root, filename)
            rel_path = os.path.relpath(full_path, MUSIC_FOLDER)
            library.append({
                "title": os.path.splitext(filename)[0],
                "file": f"music_proxy/{rel_path}",
                # Absolute path exposed for Tauri's asset protocol (convertFileSrc).
                # The web fallback keeps using the music_proxy route above.
                "absolutePath": os.path.abspath(full_path),
            })
    return jsonify(library)
@app.route('/music_proxy/<path:filename>')
def music_proxy(filename):
    """Serve *filename* from the folder MUSIC_FOLDER currently points to."""
    music_root = MUSIC_FOLDER
    return send_from_directory(music_root, filename)
@app.route('/browse_directories', methods=['GET'])
def browse_directories():
    """List the visible sub-directories of ``?path=`` (default: the home folder).

    The response always echoes the absolute form of the requested path.  For
    an existing directory the entries start with a ".." link to its parent,
    followed by the non-hidden sub-directories in sorted order; otherwise the
    entry list is empty.  Any exception is reported as a 500 JSON error.
    """
    path = request.args.get('path', os.path.expanduser('~'))
    try:
        entries = []
        if os.path.isdir(path):  # False for missing paths as well
            # Add parent
            entries.append({
                "name": "..",
                "path": os.path.dirname(os.path.abspath(path)),
                "isDir": True,
            })
            for item in sorted(os.listdir(path)):
                if item.startswith('.'):
                    continue
                full_path = os.path.join(path, item)
                if os.path.isdir(full_path):
                    entries.append({"name": item, "path": full_path, "isDir": True})
        return jsonify({"success": True, "path": os.path.abspath(path), "entries": entries})
    except Exception as e:
        return jsonify({"success": False, "error": str(e)}), 500
@app.route('/update_settings', methods=['POST'])
def update_settings():
    """Persist settings sent by the client; only ``library.music_folder`` is applied.

    Expects a JSON object.  When ``library.music_folder`` is present it must
    name an existing directory; it is then written to settings.json and
    becomes the live MUSIC_FOLDER.

    Returns:
        ``{"success": true}`` on success, a 400 JSON error for a malformed
        payload or an invalid folder, and a 500 JSON error for anything else
        (for example a failure to write settings.json).
    """
    global MUSIC_FOLDER
    try:
        # silent=True: a missing or malformed body becomes None (answered
        # with a 400 below) instead of an exception reported as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"success": False, "error": "Expected a JSON object"}), 400
        library = data.get('library')
        if library is None:
            library = {}
        if not isinstance(library, dict):
            return jsonify({"success": False, "error": "'library' must be an object"}), 400

        # Load existing
        settings = _load_settings()
        if not isinstance(settings, dict):
            settings = {}
        # Update selectively
        if not isinstance(settings.get('library'), dict):
            settings['library'] = {}

        if 'music_folder' in library:
            new_folder = library['music_folder']
            if not (isinstance(new_folder, str) and os.path.isdir(new_folder)):
                return jsonify({"success": False, "error": "Invalid folder path"}), 400
            settings['library']['music_folder'] = new_folder
            MUSIC_FOLDER = new_folder

        with open('settings.json', 'w', encoding='utf-8') as f:
            json.dump(settings, f, indent=4)
        return jsonify({"success": True})
    except Exception as e:
        return jsonify({"success": False, "error": str(e)}), 500
@app.route('/<path:filename>')
def serve_static(filename):
    """Serve a static asset from the working directory, guarded by an allow-list.

    Requests containing ".." or starting with "/" and files whose extension
    is not on the allow-list are answered with 403.  CSS, JS and HTML are sent
    with caching disabled.
    """
    allowed_extensions = ('.css', '.js', '.html', '.htm', '.png', '.jpg', '.jpeg',
                          '.gif', '.svg', '.ico', '.woff', '.woff2', '.ttf', '.eot', '.map')
    uncached_extensions = ('.css', '.js', '.html')

    # Prevent path traversal
    looks_like_traversal = '..' in filename or filename.startswith('/')
    # Allow only known safe static file extensions
    if looks_like_traversal or not filename.endswith(allowed_extensions):
        abort(403)

    response = send_from_directory('.', filename)
    if filename.endswith(uncached_extensions):
        response.headers['Cache-Control'] = 'no-store, no-cache, must-revalidate, max-age=0'
    return response
@app.route('/')
def index():
    """Serve ``index_file`` from the working directory with caching disabled."""
    response = send_from_directory('.', index_file)
    no_cache_headers = (
        ('Cache-Control', 'no-store, no-cache, must-revalidate, max-age=0'),
        ('Pragma', 'no-cache'),
        ('Expires', '0'),
    )
    for header, value in no_cache_headers:
        response.headers[header] = value
    return response
@app.route('/upload', methods=['POST'])
def upload_file():
    """Store an uploaded audio file in MUSIC_FOLDER under a sanitized name.

    The multipart field must be called ``file``.  Only .mp3/.m4a/.wav/.flac/
    .ogg are accepted.  Everything except word characters, whitespace and
    hyphens is stripped from the base name, and whitespace runs are collapsed
    to a single space.

    Returns:
        ``{"success": true, "filename": …}`` on success; 400 for a missing
        file, unsupported extension or a name with no usable characters; 409
        when a file with that name already exists; 500 when saving fails.
    """
    if 'file' not in request.files:
        return jsonify({"success": False, "error": "No file provided"}), 400

    upload = request.files['file']
    if not upload.filename:  # covers both '' and None
        return jsonify({"success": False, "error": "No file selected"}), 400

    allowed_exts = ('.mp3', '.m4a', '.wav', '.flac', '.ogg')
    name_without_ext, ext = os.path.splitext(upload.filename)
    ext = ext.lower()
    if ext not in allowed_exts:
        return jsonify({"success": False, "error": f"Supported formats: {', '.join(allowed_exts)}"}), 400

    # Sanitize filename (keep extension)
    name_without_ext = re.sub(r'[^\w\s-]', '', name_without_ext)
    name_without_ext = re.sub(r'\s+', ' ', name_without_ext).strip()
    if not name_without_ext:
        # Nothing survived sanitizing; saving would create a hidden ".mp3"-style file.
        return jsonify({"success": False, "error": "Invalid filename"}), 400

    filename = f"{name_without_ext}{ext}"
    filepath = os.path.join(MUSIC_FOLDER, filename)
    if os.path.exists(filepath):
        return jsonify({"success": False, "error": "File already exists in library"}), 409

    try:
        upload.save(filepath)
        print(f"UPLOADED: {filename}")
        return jsonify({"success": True, "filename": filename})
    except Exception as e:
        print(f"ERROR: Upload error: {e}")
        return jsonify({"success": False, "error": str(e)}), 500
@app.route('/save_keymaps', methods=['POST'])
def save_keymaps():
    """Write the request's JSON body to keymaps.json in the working directory.

    Returns ``{"success": true}``, or a 500 JSON error if reading the body or
    writing the file raises.
    """
    try:
        keymaps = request.get_json()
        with open('keymaps.json', 'w', encoding='utf-8') as out:
            json.dump(keymaps, out, indent=4)
    except Exception as e:
        print(f"ERROR: Save keymaps error: {e}")
        return jsonify({"success": False, "error": str(e)}), 500
    else:
        try:
            print("SAVED: Keymaps saved to keymaps.json")
            return jsonify({"success": True})
        except Exception as e:
            print(f"ERROR: Save keymaps error: {e}")
            return jsonify({"success": False, "error": str(e)}), 500
@app.route('/load_keymaps', methods=['GET'])
def load_keymaps():
    """Return the contents of keymaps.json, or ``null`` when the file is absent.

    A read or parse failure is reported as a 500 JSON error.
    """
    try:
        keymaps = None
        if os.path.exists('keymaps.json'):
            with open('keymaps.json', 'r', encoding='utf-8') as src:
                keymaps = json.load(src)
        return jsonify({"success": True, "keymaps": keymaps})
    except Exception as e:
        print(f"ERROR: Load keymaps error: {e}")
        return jsonify({"success": False, "error": str(e)}), 500
@app.route('/stream.mp3')
def stream_mp3():
    """Stream the live MP3 feed to one HTTP client.

    Answers 503 (with an empty audio/mpeg body) when no broadcast is active,
    or when — in transcoded mode — ffmpeg has not come up within about five
    seconds.  Otherwise the client gets its own queue, pre-filled with the
    pre-roll buffer, and the response streams chunks from that queue until a
    ``None`` sentinel arrives, the broadcast ends, or roughly 60 seconds pass
    without audio.
    """
    def _unavailable(retry_after):
        # Deliberately audio/mpeg rather than JSON: a JSON body would confuse
        # the browser's <audio> element.
        return Response(b'', status=503, content_type='audio/mpeg', headers={
            'Retry-After': retry_after,
            'Access-Control-Allow-Origin': '*',
            'Cache-Control': 'no-cache, no-store',
        })

    def _transcoder_down():
        return _ffmpeg_proc is None or _ffmpeg_proc.poll() is not None

    if not broadcast_state.get('active'):
        return _unavailable('5')

    # Non-MP3 input goes through ffmpeg, so give the transcoder time to start.
    # MP3 input (e.g. the Qt client) is distributed directly — no ffmpeg needed.
    if not broadcast_state.get('is_mp3_input', False):
        for _ in range(10):  # 10 × 0.5 s
            if not _transcoder_down():
                break
            eventlet.sleep(0.5)
        if _transcoder_down():
            return _unavailable('3')

    listener_q = queue.Queue(maxsize=500)
    with _mp3_lock:
        preroll_bytes = _mp3_preroll_bytes
        for buffered in _mp3_preroll:
            try:
                listener_q.put_nowait(buffered)
            except Exception:
                break
        _mp3_clients.add(listener_q)
    print(f"LISTENER: New listener joined stream (Pre-roll: {preroll_bytes} bytes)")

    def gen():
        silent_polls = 0
        try:
            while True:
                try:
                    chunk = listener_q.get(timeout=5)
                except queue.Empty:
                    if not broadcast_state.get('active'):
                        break
                    silent_polls += 1
                    if silent_polls >= 12:  # 60s total with no audio data
                        break
                    continue
                silent_polls = 0
                if chunk is None:  # sentinel: the stream was stopped
                    break
                yield chunk
        finally:
            with _mp3_lock:
                _mp3_clients.discard(listener_q)
            print("LISTENER: Listener disconnected from stream")

    stream_headers = {
        'Cache-Control': 'no-cache, no-store, must-revalidate',
        'Connection': 'keep-alive',
        'Pragma': 'no-cache',
        'Expires': '0',
        'X-Content-Type-Options': 'nosniff',
        'Access-Control-Allow-Origin': '*',
        'Icy-Name': 'TechDJ Live',
        'X-Accel-Buffering': 'no',  # Tell nginx/Cloudflare not to buffer this stream
    }
    return Response(stream_with_context(gen()), content_type='audio/mpeg', headers=stream_headers)
@app.route('/stream_debug')
def stream_debug():
    """Return a JSON snapshot of broadcast, transcoder and pre-roll state."""
    proc = _ffmpeg_proc
    have_proc = proc is not None
    snapshot = {
        'broadcast_active': broadcast_state.get('active', False),
        'broadcast_mimeType': broadcast_state.get('mimeType'),
        'ffmpeg_running': have_proc and proc.poll() is None,
        'ffmpeg_found': have_proc,
        'mp3_clients': len(_mp3_clients),
        'preroll_chunks': len(_mp3_preroll),
        'preroll_bytes': _mp3_preroll_bytes,
        'transcoder_bytes_out': _transcoder_bytes_out,
        'transcoder_last_error': _transcoder_last_error,
        'last_audio_chunk_ts': _last_audio_chunk_ts,
    }
    return jsonify(snapshot)
# === DJ SERVER ===
# Flask app for the DJ panel.
# NOTE(review): static_folder='.' with an empty static_url_path enables Flask's
# built-in static handler at the URL root.  The listener app disables that
# handler specifically so every file goes through serve_static's checks;
# confirm the DJ app is not serving files such as config.json past that
# allow-list.
dj_app = Flask(__name__, static_folder='.', static_url_path='')
# Signs the session cookie that carries the 'dj_authed' flag.
dj_app.config['SECRET_KEY'] = CONFIG_SECRET
# Upload size limit, converted from MB to bytes.
dj_app.config['MAX_CONTENT_LENGTH'] = CONFIG_MAX_UPLOAD_MB * 1024 * 1024
# Registers the routes shared with the listener app; '/' serves index.html here.
setup_shared_routes(dj_app)
@dj_app.before_request
def _protect_dj_panel():
    """Gate every DJ-app request behind the login when a password is configured.

    Requests pass through (``None``) when auth is disabled, when the path is
    /login or /logout, or when the session is already authenticated.  Anything
    else receives a 302 redirect to /login.  Registered on ``dj_app`` only, so
    the listener app is unaffected.
    """
    open_paths = ('/login', '/logout')
    if (
        not DJ_AUTH_ENABLED
        or request.path in open_paths
        or session.get('dj_authed') is True
    ):
        return None

    # Redirect everything else to login
    body = (
        "<!doctype html><html><head><meta http-equiv='refresh' content='0; url=/login' /></head>"
        "<body>Redirecting to <a href='/login'>/login</a>...</body></html>"
    )
    return body, 302, {'Location': '/login'}
@dj_app.route('/login', methods=['GET', 'POST'])
def dj_login():
    """Show the DJ login form and handle password submissions.

    With auth disabled the session is marked authenticated and the client is
    redirected to "/".  A POST carrying the correct password does the same;
    a wrong password re-renders the form with an error message.
    """
    if not DJ_AUTH_ENABLED:
        # If auth is disabled, just go to the panel.
        session['dj_authed'] = True
        return (
            "<!doctype html><html><head><meta http-equiv='refresh' content='0; url=/' /></head>"
            "<body>Auth disabled. Redirecting...</body></html>",
            302,
            {'Location': '/'}
        )
    error = None
    if request.method == 'POST':
        pw = (request.form.get('password') or '').strip()
        # Constant-time comparison so response timing does not reveal how much
        # of the password matched.  Compared as bytes because compare_digest
        # only accepts ASCII-only str objects.
        if hmac.compare_digest(pw.encode('utf-8'), DJ_PANEL_PASSWORD.encode('utf-8')):
            session['dj_authed'] = True
            return (
                "<!doctype html><html><head><meta http-equiv='refresh' content='0; url=/' /></head>"
                "<body>Logged in. Redirecting...</body></html>",
                302,
                {'Location': '/'}
            )
        error = 'Invalid password'
    # Minimal inline login page (no new assets)
    return f"""<!doctype html>
<html lang=\"en\">
<head>
<meta charset=\"utf-8\" />
<meta name=\"viewport\" content=\"width=device-width, initial-scale=1\" />
<title>TechDJ - DJ Login</title>
<style>
body {{ background:#0a0a12; color:#eee; font-family: system-ui, -apple-system, Segoe UI, Roboto, Arial; margin:0; }}
.wrap {{ min-height:100vh; display:flex; align-items:center; justify-content:center; padding:24px; }}
.card {{ width:100%; max-width:420px; background:rgba(10,10,20,0.85); border:2px solid #bc13fe; border-radius:16px; padding:24px; box-shadow:0 0 40px rgba(188,19,254,0.25); }}
h1 {{ margin:0 0 16px 0; font-size:22px; }}
label {{ display:block; margin:12px 0 8px; opacity:0.9; }}
input {{ width:100%; padding:12px; border-radius:10px; border:1px solid rgba(255,255,255,0.15); background:rgba(0,0,0,0.35); color:#fff; }}
button {{ width:100%; margin-top:14px; padding:12px; border-radius:10px; border:2px solid #bc13fe; background:rgba(188,19,254,0.15); color:#fff; font-weight:700; cursor:pointer; }}
.err {{ margin-top:12px; color:#ffb3ff; }}
.hint {{ margin-top:10px; font-size:12px; opacity:0.7; }}
</style>
</head>
<body>
<div class=\"wrap\">
<div class=\"card\">
<h1>DJ Panel Locked</h1>
<form method=\"post\" action=\"/login\">
<label for=\"password\">Password</label>
<input id=\"password\" name=\"password\" type=\"password\" autocomplete=\"current-password\" autofocus />
<button type=\"submit\">Unlock DJ Panel</button>
{f"<div class='err'>{error}</div>" if error else ""}
</form>
</div>
</div>
</body>
</html>"""
@dj_app.route('/logout')
def dj_logout():
    """Drop the session's login flag and redirect the client to /login."""
    session.pop('dj_authed', None)
    body = (
        "<!doctype html><html><head><meta http-equiv='refresh' content='0; url=/login' /></head>"
        "<body>Logged out. Redirecting...</body></html>"
    )
    headers = {'Location': '/login'}
    return body, 302, headers
@dj_app.route('/client_config')
def client_config():
    """Return the server-side settings the DJ panel front-end needs, as JSON."""
    payload = {'listener_url': CONFIG_LISTENER_URL}
    return jsonify(payload)
# Socket.IO server for the DJ panel, attached to dj_app and run on eventlet.
# max_http_buffer_size reuses the upload limit (CONFIG_MAX_UPLOAD_MB, in bytes);
# the Socket.IO and Engine.IO loggers are switched on by the debug flag.
dj_socketio = SocketIO(
dj_app,
cors_allowed_origins=CONFIG_CORS,
async_mode='eventlet',
max_http_buffer_size=CONFIG_MAX_UPLOAD_MB * 1024 * 1024,
ping_timeout=60,
ping_interval=25,
logger=CONFIG_DEBUG,
engineio_logger=CONFIG_DEBUG
)
@dj_socketio.on('connect')
def dj_connect():
    """Accept a DJ socket and record its sid, or refuse it when not logged in.

    Returning ``False`` rejects the connection; this happens only when auth is
    enabled and the session lacks the ``dj_authed`` flag.
    """
    sid = request.sid
    authorized = (not DJ_AUTH_ENABLED) or session.get('dj_authed') is True
    if not authorized:
        print(f"REJECTED: DJ socket rejected (unauthorized): {sid}")
        return False
    print(f"STREAMPANEL: DJ connected: {sid}")
    dj_sids.add(sid)
def _dj_disconnect_grace():
    """Wait out the grace period, then end the broadcast if no DJ came back.

    Runs as a greenlet.  After ``DJ_GRACE_PERIOD_SECS`` seconds the broadcast
    is stopped only if it is still active and ``dj_sids`` is empty; listeners
    are then told via ``broadcast_stopped`` and ``stream_status``.  The
    greenlet handle is cleared in either case.
    """
    global _dj_grace_greenlet, _mp3_broadcast_announced
    print(f"INFO: Grace period started — {DJ_GRACE_PERIOD_SECS}s for DJ to reconnect")
    eventlet.sleep(DJ_GRACE_PERIOD_SECS)

    abandoned = broadcast_state.get('active') and not dj_sids
    if abandoned:
        print("WARNING: Grace period expired, no DJ reconnected — auto-stopping broadcast")
        broadcast_state.update({'active': False, 'is_mp3_input': False})
        _mp3_broadcast_announced = False
        _stop_transcoder()
        for event, payload in (('broadcast_stopped', None), ('stream_status', {'active': False})):
            if payload is None:
                listener_socketio.emit(event, namespace='/')
            else:
                listener_socketio.emit(event, payload, namespace='/')
    _dj_grace_greenlet = None
@dj_socketio.on('disconnect')
def dj_disconnect():
    """Forget a DJ socket; start the auto-stop timer if it was the last one live.

    The grace-period greenlet is spawned only when a broadcast is active, no
    other DJ socket remains, and no grace greenlet is already pending.
    """
    global _dj_grace_greenlet
    dj_sids.discard(request.sid)
    print(f"WARNING: DJ disconnected ({request.sid}). Remaining DJs: {len(dj_sids)}")

    if not broadcast_state.get('active'):
        # Nothing to do — no active broadcast
        return
    if dj_sids:
        print("INFO: Other DJ(s) still connected — broadcast continues uninterrupted")
        return
    # Active broadcast with no DJ left: give them a chance to reconnect.
    if _dj_grace_greenlet is None:
        _dj_grace_greenlet = eventlet.spawn(_dj_disconnect_grace)
@dj_socketio.on('start_broadcast')
def dj_start(data=None):
    """Start a broadcast, or resume one after a DJ reconnect.

    Args:
        data: Optional dict from the client.  ``bitrate`` (e.g. "192k")
            replaces the encoder bitrate when it is a plain number with an
            optional k/M suffix; ``format == 'mp3'`` selects MP3-direct mode,
            in which chunks bypass ffmpeg.  Anything that is not a dict is
            ignored.

    For a fresh broadcast the pre-roll buffer is cleared.  In transcoded mode
    ffmpeg is started and listeners are notified immediately; in MP3-direct
    mode the notification is deferred until the first audio chunk arrives.
    """
    global _dj_grace_greenlet, _mp3_broadcast_announced, _current_bitrate

    # Cancel any pending auto-stop grace period (DJ reconnected in time)
    if _dj_grace_greenlet is not None:
        _dj_grace_greenlet.kill(block=True)  # block=True prevents a GreenletExit race
        _dj_grace_greenlet = None
        print("INFO: DJ reconnected within grace period — broadcast continues")

    was_already_active = broadcast_state.get('active', False)
    broadcast_state['active'] = True
    session['is_dj'] = True
    print("BROADCAST: ACTIVE")

    is_mp3_input = False
    if isinstance(data, dict):
        if 'bitrate' in data:
            # The value ends up on ffmpeg's command line, so accept only a
            # number with an optional k/M suffix and always store a string.
            candidate = str(data['bitrate']).strip()
            if re.fullmatch(r'\d+(\.\d+)?[kKmM]?', candidate):
                _current_bitrate = candidate
                print(f"BITRATE: Setting stream bitrate to: {_current_bitrate}")
            else:
                print(f"WARNING: Ignoring invalid bitrate from client: {data['bitrate']!r}")
        if data.get('format') == 'mp3':
            is_mp3_input = True
    broadcast_state['is_mp3_input'] = is_mp3_input

    if not was_already_active:
        # Fresh broadcast start — clear pre-roll and reset announcement flag.
        with _mp3_lock:
            _preroll_clear()
        if is_mp3_input:
            # MP3-direct mode (Qt client): the DJ still needs to press play before
            # audio flows.  Firing broadcast_started now would cause listeners to
            # connect to /stream.mp3 before any data exists, wait 60 s, then
            # disconnect.  Instead we hold back the announcement and send it on
            # the very first audio_chunk so listeners connect when data is ready.
            _mp3_broadcast_announced = False
            print("BROADCAST: MP3-direct mode — deferring broadcast_started until first chunk")
        else:
            # Non-MP3 (browser webm/opus): ffmpeg transcoder starts immediately,
            # so listeners can connect right away.
            _start_transcoder_if_needed(is_mp3_input=False)
            listener_socketio.emit('broadcast_started', namespace='/')
    else:
        # DJ reconnected mid-broadcast - just ensure transcoder is alive (non-MP3 only)
        # Do NOT clear pre-roll or trigger listener reload
        print("BROADCAST: DJ reconnected - resuming existing broadcast")
        if not is_mp3_input:
            _start_transcoder_if_needed(is_mp3_input=False)

    # For MP3-direct fresh broadcast, hold back stream_status active=true too
    # so listeners don't auto-connect before audio data is flowing.
    # It will be sent alongside broadcast_started on the first audio_chunk.
    # For non-MP3 (browser) mode or DJ reconnects, send immediately.
    if not (is_mp3_input and not was_already_active):
        listener_socketio.emit('stream_status', {'active': True}, namespace='/')
@dj_socketio.on('get_listener_count')
def dj_get_listener_count():
    """Reply to the requesting DJ with the current listener count.

    Uses the same formula as the pushed ``listener_count`` updates — the
    larger of the Socket.IO listener connections and the open /stream.mp3
    connections — so polled and pushed numbers agree.
    """
    with _mp3_lock:
        stream_count = len(_mp3_clients)
    emit('listener_count', {'count': max(len(listener_sids), stream_count)})
@dj_socketio.on('listener_glow')
def dj_listener_glow(data):
    """DJ sets the glow intensity on the listener page.

    Args:
        data: Dict with an ``intensity`` value (default 30).  The value is
            clamped to 0–100, stored for listeners who join later, and relayed
            to all listeners.  A non-dict payload counts as the default; an
            intensity that cannot be converted to an integer is ignored and
            the current level is kept.
    """
    global _current_glow_intensity
    raw = data.get('intensity', 30) if isinstance(data, dict) else 30
    try:
        intensity = int(raw)
    except (TypeError, ValueError, OverflowError):
        # e.g. None, "abc", NaN or infinity sent by a misbehaving client
        print(f"WARNING: Ignoring invalid glow intensity: {raw!r}")
        return
    intensity = max(0, min(100, intensity))
    _current_glow_intensity = intensity
    listener_socketio.emit('listener_glow', {'intensity': intensity}, namespace='/')
@dj_socketio.on('deck_glow')
def dj_deck_glow(data):
    """Forward the playing state of decks A and B to every listener page.

    Non-dict payloads are ignored; missing decks count as not playing.
    """
    if not isinstance(data, dict):
        return
    playing = {deck: bool(data.get(deck, False)) for deck in ('A', 'B')}
    listener_socketio.emit('deck_glow', playing, namespace='/')
@dj_socketio.on('now_playing')
def dj_now_playing(data):
    """Forward the current track title and deck to every listener page.

    The pre-roll buffer is emptied first, so a listener who reconnects after a
    track change starts with the new song instead of replaying buffered audio
    from the previous one.  Non-dict payloads are ignored.
    """
    if not isinstance(data, dict):
        return

    with _mp3_lock:
        _preroll_clear()

    title = str(data.get('title', ''))
    deck = str(data.get('deck', ''))
    listener_socketio.emit('now_playing', {'title': title, 'deck': deck}, namespace='/')
@dj_socketio.on('stop_broadcast')
def dj_stop():
    """End the broadcast on the DJ's request and tell every listener.

    Resets the broadcast flags, stops the transcoder (which also disconnects
    stream clients) and emits ``broadcast_stopped`` plus an inactive
    ``stream_status``.
    """
    global _mp3_broadcast_announced
    broadcast_state.update({'active': False, 'is_mp3_input': False})
    _mp3_broadcast_announced = False
    session['is_dj'] = False
    print("STOPPED: DJ stopped broadcasting")
    _stop_transcoder()
    listener_socketio.emit('broadcast_stopped', namespace='/')
    listener_socketio.emit('stream_status', {'active': False}, namespace='/')
@dj_socketio.on('audio_chunk')
def dj_audio(data):
    """Route one audio chunk from the DJ into the listener stream.

    Chunks are ignored unless a broadcast is active and the payload is
    bytes/bytearray.  MP3-direct chunks go straight to listeners, and the
    first one triggers the deferred ``broadcast_started`` / ``stream_status``
    announcement.  All other formats are fed to the ffmpeg transcoder, which
    is (re)started first when it is not running.
    """
    global _mp3_broadcast_announced

    if not (broadcast_state['active'] and isinstance(data, (bytes, bytearray))):
        return
    payload = bytes(data)

    if broadcast_state.get('is_mp3_input', False):
        if not _mp3_broadcast_announced:
            # First chunk: audio is now flowing, so listeners may connect.
            _mp3_broadcast_announced = True
            print("BROADCAST: First MP3 chunk received — announcing broadcast_started to listeners")
            listener_socketio.emit('broadcast_started', namespace='/')
            listener_socketio.emit('stream_status', {'active': True}, namespace='/')
        _distribute_mp3(payload)
        return

    # Other formats (e.g. webm/opus from browser): route through ffmpeg transcoder
    transcoder_down = _ffmpeg_proc is None or _ffmpeg_proc.poll() is not None
    if transcoder_down:
        _start_transcoder_if_needed(is_mp3_input=False)
    _feed_transcoder(payload)
# === LISTENER SERVER ===
# static_folder=None prevents Flask's built-in static handler from serving
# DJ files (like index.html) at /<path> — all static files go through our
# custom serve_static route which has security checks.
# template_folder='.' lets render_template() load listener.html from the
# working directory.
listener_app = Flask(__name__, static_folder=None, template_folder='.')
# Derived from the shared secret with a suffix, so the two apps sign their
# session cookies with different keys.
listener_app.config['SECRET_KEY'] = CONFIG_SECRET + '_listener'
# Upload size limit, converted from MB to bytes.
listener_app.config['MAX_CONTENT_LENGTH'] = CONFIG_MAX_UPLOAD_MB * 1024 * 1024
# Registers the routes shared with the DJ app; '/' is pointed at listener.html.
setup_shared_routes(listener_app, index_file='listener.html')
def _listener_home():
    """Render listener.html as a Jinja2 template with live context, uncached."""
    html = render_template('listener.html', **_listener_template_context())
    response = make_response(html)
    no_cache_headers = (
        ('Cache-Control', 'no-store, no-cache, must-revalidate, max-age=0'),
        ('Pragma', 'no-cache'),
        ('Expires', '0'),
    )
    for header, value in no_cache_headers:
        response.headers[header] = value
    return response
# Replace the static-file 'index' view registered by setup_shared_routes with
# the template-rendering version, so the Jinja2 context is injected on every
# page load.  Swapping the entry in view_functions keeps the existing '/' rule.
listener_app.view_functions['index'] = _listener_home
# Block write/admin endpoints AND DJ-only files on the listener server
@listener_app.before_request
def _restrict_listener_routes():
    """Reject listener-app requests for DJ-only endpoints and files with 403.

    Matching is by exact request path.
    """
    forbidden_paths = frozenset((
        # Write/admin endpoints
        '/update_settings', '/upload', '/save_keymaps', '/browse_directories',
        # DJ-only files — prevents serving the DJ panel even via direct URL
        '/index.html', '/script.js', '/style.css',
    ))
    if request.path in forbidden_paths:
        abort(403)
# Socket.IO server for listener pages, attached to listener_app and run on
# eventlet.  It uses much shorter ping settings than the DJ server.
listener_socketio = SocketIO(
listener_app,
cors_allowed_origins=CONFIG_CORS,
async_mode='eventlet',
max_http_buffer_size=CONFIG_MAX_UPLOAD_MB * 1024 * 1024,
# Lower timeouts: stale connections detected in ~25s instead of ~85s
# ping_interval: how often to probe (seconds)
# ping_timeout: how long to wait for pong before declaring dead
ping_timeout=15,
ping_interval=10,
logger=CONFIG_DEBUG,
engineio_logger=CONFIG_DEBUG
)
def _broadcast_listener_count():
    """Work out the listener count and push it to the listener and DJ panels.

    The count is the larger of two figures:
      - len(listener_sids): Socket.IO connections (page open)
      - len(_mp3_clients):  active /stream.mp3 HTTP connections (audio playing)

    Using the maximum avoids undercounting visitors who have not clicked
    Enable Audio yet, as well as direct stream URL listeners (e.g. VLC).

    Returns:
        The count that was broadcast.
    """
    # Read the stream-client collection under its lock.
    with _mp3_lock:
        active_streams = len(_mp3_clients)
    total = max(len(listener_sids), active_streams)
    for panel in (listener_socketio, dj_socketio):
        panel.emit('listener_count', {'count': total}, namespace='/')
    return total
@listener_socketio.on('connect')
def listener_connect():
    """Register a new listener socket and broadcast the updated count."""
    sid = request.sid
    # Counted as soon as the socket connects, without waiting for join_listener.
    listener_sids.add(sid)
    total = _broadcast_listener_count()
    print(f"LISTENER: Connected {sid}. Total: {total}")
@listener_socketio.on('disconnect')
def listener_disconnect():
    """Forget a departed listener socket and broadcast the updated count."""
    sid = request.sid
    # discard() is a no-op when the SID was never recorded.
    listener_sids.discard(sid)
    total = _broadcast_listener_count()
    print(f"REMOVED: Listener left {sid}. Total: {total}")
@listener_socketio.on('join_listener')
def listener_join():
    """Send the joining listener the current stream status and glow level.

    The SID itself is already tracked by listener_connect(), so nothing is
    added to listener_sids here.
    """
    status_payload = {'active': broadcast_state['active']}
    glow_payload = {'intensity': _current_glow_intensity}
    emit('stream_status', status_payload)
    emit('listener_glow', glow_payload)
@listener_socketio.on('get_listener_count')
def listener_get_count():
    """Answer an on-demand listener-count query from a client.

    The figure uses the same rule as _broadcast_listener_count(): the larger
    of the Socket.IO connection count and the number of active /stream.mp3
    clients. Replying with len(listener_sids) alone could contradict the
    value that the periodic broadcast sends out.
    """
    # Read the stream-client collection under its lock, as the broadcast does.
    with _mp3_lock:
        stream_count = len(_mp3_clients)
    emit('listener_count', {'count': max(len(listener_sids), stream_count)})
# === MediaMTX webhook — SRT publish / unpublish events ===
def _listener_template_context() -> dict:
    """Build the Jinja2 context for listener.html.

    Returns:
        A dict with one key, 'live', mirroring _srt_state['active'].
    """
    is_live = _srt_state['active']
    return {'live': is_live}
@listener_app.route('/api/webhook', methods=['POST'])
def mediamtx_webhook():
    """Receive publish/unpublish events from MediaMTX.

    MediaMTX path config example (mediamtx.yml):
        paths:
          live:
            runOnPublish: 'curl -s -X POST http://127.0.0.1:5001/api/webhook -H "Content-Type: application/json" -d \'{"event":"publish","path":"%MTX_PATH%","id":"%MTX_ID%"}\''
            runOnUnpublish: 'curl -s -X POST http://127.0.0.1:5001/api/webhook -H "Content-Type: application/json" -d \'{"event":"unpublish","path":"%MTX_PATH%","id":"%MTX_ID%"}\''

    Or if using MediaMTX >= 1.x webhook integration, point:
        api > hooks > [publish|unpublish] > url: http://127.0.0.1:5001/api/webhook

    Request fields (JSON body, with query-string fallback for event/path):
        event: 'publish' or 'unpublish' (case-insensitive).
        path / MTX_PATH: name of the stream path.
        id / sourceID: identifier of the publishing source.

    Returns:
        200 JSON {'ok': True, 'event': ..., 'path': ...} for a handled event,
        400 JSON {'ok': False, 'error': ...} for an unknown or missing event.
        Aborts with 403 for non-localhost callers or a wrong shared secret.
    """
    global _mp3_broadcast_announced

    # ── Localhost-only: MediaMTX is co-located on the same server ──────────
    if request.remote_addr not in ('127.0.0.1', '::1'):
        abort(403)

    # ── Optional shared-secret authentication ──────────────────────────────
    # Set mediamtx_webhook_secret in config.json and pass the same value in
    # the X-MediaMTX-Webhook-Secret request header (or as ?secret= query arg).
    if _SRT_WEBHOOK_SECRET:
        provided = (
            request.headers.get('X-MediaMTX-Webhook-Secret')
            or request.args.get('secret', '')
        )
        # Constant-time comparison prevents timing side-channels. Compare as
        # bytes: compare_digest() raises TypeError for non-ASCII str, which
        # would turn a bad secret into a 500 instead of a 403.
        if not hmac.compare_digest(
            provided.encode('utf-8'),
            _SRT_WEBHOOK_SECRET.encode('utf-8'),
        ):
            abort(403)

    # A syntactically valid but non-object JSON body (list, string, number)
    # is treated the same as no body at all.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    def _as_text(value) -> str:
        """Coerce a webhook field to str so non-string JSON values cannot crash."""
        return str(value) if value else ''

    # Support both JSON body fields and legacy query-string parameters.
    event = _as_text(data.get('event') or request.args.get('event', '')).lower().strip()
    path = _as_text(
        data.get('path') or data.get('MTX_PATH', '') or request.args.get('path', '')
    ).strip()
    source_id = _as_text(data.get('id') or data.get('sourceID', '') or '').strip()

    if event == 'publish':
        _srt_state.update({
            'active': True,
            'path': path,
            'source_id': source_id,
            'started_at': time.time(),
        })
        broadcast_state['active'] = True
        broadcast_state['is_mp3_input'] = False
        _mp3_broadcast_announced = True  # SRT audio starts flowing immediately
        print(f"SRT: Stream PUBLISHED — path='{path}' source='{source_id}'")
        # Only the preroll reset runs under the lock; the transcoder is
        # started after the lock is released.
        with _mp3_lock:
            _preroll_clear()
        _start_srt_transcoder()
        listener_socketio.emit('broadcast_started', namespace='/')
        listener_socketio.emit('stream_status', {'active': True}, namespace='/')
        dj_socketio.emit('stream_status', {'active': True, 'source': 'srt'}, namespace='/')
        return jsonify({'ok': True, 'event': 'publish', 'path': path})

    if event == 'unpublish':
        _srt_state.update({
            'active': False,
            'path': None,
            'source_id': None,
            'started_at': None,
        })
        broadcast_state['active'] = False
        broadcast_state['is_mp3_input'] = False
        _mp3_broadcast_announced = False
        print(f"SRT: Stream UNPUBLISHED — path='{path}'")
        _stop_srt_transcoder()
        listener_socketio.emit('broadcast_stopped', namespace='/')
        listener_socketio.emit('stream_status', {'active': False}, namespace='/')
        dj_socketio.emit('stream_status', {'active': False, 'source': 'srt'}, namespace='/')
        return jsonify({'ok': True, 'event': 'unpublish', 'path': path})

    return jsonify({'ok': False, 'error': f"Unknown or missing 'event' field: '{event}'"}), 400
@listener_app.route('/api/srt_auth', methods=['POST'])
def srt_auth():
    """MediaMTX externalAuthenticationURL handler.

    MediaMTX calls this endpoint for every new SRT (and RTSP/HLS) connection
    attempt. Returning HTTP 200 allows the connection; anything else rejects it.

    mediamtx.yml:
        externalAuthenticationURL: http://127.0.0.1:5001/api/srt_auth

    MediaMTX sends JSON like:
        {"ip": "1.2.3.4", "action": "publish", "protocol": "srt",
         "path": "aussie_dj", "user": "", "password": "", "query": "", "id": "..."}

    Returns:
        An empty 200 response for any request arriving from localhost.
        Aborts with 403 for every other caller.
    """
    # Only MediaMTX (localhost) should be calling this
    if request.remote_addr not in ('127.0.0.1', '::1'):
        abort(403)
    # NOTE(review): every connection is authorized — no field of the request
    # body (ip / action / protocol / user / password) influences the decision.
    # The body is therefore not parsed, so a malformed payload cannot raise
    # and reject a connection by accident. Add checks here if publishing must
    # be restricted.
    return '', 200
@listener_app.route('/api/srt_status')
def srt_status():
    """Polling endpoint reporting the current SRT / broadcast live state as JSON."""
    # Snapshot the global once so the None check and poll() see the same object.
    transcoder = _srt_ffmpeg_proc
    transcoder_alive = transcoder is not None and transcoder.poll() is None
    status = {
        'srt_active': _srt_state['active'],
        'srt_path': _srt_state['path'],
        'srt_source_id': _srt_state['source_id'],
        'srt_started_at': _srt_state['started_at'],
        'broadcast_active': broadcast_state.get('active', False),
        'srt_transcoder_running': transcoder_alive,
    }
    return jsonify(status)
# DJ Panel Routes (No engine commands needed in local mode)
def _transcoder_watchdog():
    """Loop forever, reviving the ffmpeg transcoder if it dies mid-broadcast.

    Runs a check every 5 seconds. Nothing is done for MP3 input (Qt client),
    which is distributed directly without a transcoder.
    """
    def _is_dead(proc):
        # A process that was never started, or that has exited, counts as dead.
        return proc is None or proc.poll() is not None

    while True:
        needs_transcoder = (
            broadcast_state.get('active')
            and not broadcast_state.get('is_mp3_input', False)
        )
        if needs_transcoder and _srt_state['active']:
            # SRT path — watchdog for the pull-based SRT transcoder
            if _is_dead(_srt_ffmpeg_proc):
                print('WARNING: Watchdog: SRT transcoder dead during active stream, reviving...')
                _start_srt_transcoder()
        elif needs_transcoder:
            # Browser WebM/opus path — watchdog for the push-based transcoder
            if _is_dead(_ffmpeg_proc):
                print('WARNING: Watchdog: Transcoder dead during active broadcast, reviving...')
                _start_transcoder_if_needed(is_mp3_input=False)
        eventlet.sleep(5)
def _listener_count_sync_loop():
    """Re-broadcast the listener count every few seconds, forever.

    Acts as periodic reconciliation for any connect/disconnect events that
    were missed (e.g. server under load, eventlet greenlet delays).
    """
    resync_interval_s = 5
    while True:
        eventlet.sleep(resync_interval_s)
        _broadcast_listener_count()
# One-shot guard for _start_background_tasks(): the flag records whether the
# tasks were already launched, and the lock protects the check-and-set.
_background_tasks_started = False
_background_tasks_lock = threading.Lock()


def _start_background_tasks():
    """Launch the listener server and the background greenlets exactly once.

    Repeated calls are harmless: only the first one spawns anything.
    Invoked from __main__ (direct run) and from gunicorn's post_worker_init
    hook (production run via gunicorn.conf.py).
    """
    global _background_tasks_started
    with _background_tasks_lock:
        already_started = _background_tasks_started
        _background_tasks_started = True
    if already_started:
        return

    for loop in (_listener_count_sync_loop, _transcoder_watchdog):
        eventlet.spawn(loop)
    # The listener Socket.IO server runs in its own greenlet.
    eventlet.spawn(
        listener_socketio.run,
        listener_app,
        host=CONFIG_HOST,
        port=CONFIG_LISTENER_PORT,
        debug=False,
    )
if __name__ == '__main__':
    # Direct-run entry point: print the startup banner, launch the listener
    # server and background greenlets, then serve the DJ panel.
    banner = "=" * 50
    startup_lines = (
        banner,
        "TECHDJ PRO - DUAL PORT ARCHITECTURE",
        banner,
        f"HOST: {CONFIG_HOST}",
        f"URL: DJ PANEL -> http://{CONFIG_HOST}:{CONFIG_DJ_PORT}",
        f"URL: LISTENER -> http://{CONFIG_HOST}:{CONFIG_LISTENER_PORT}",
        "AUTH: DJ panel password ENABLED" if DJ_AUTH_ENABLED else "AUTH: DJ panel password DISABLED",
        f"DEBUG: {CONFIG_DEBUG}",
        banner,
        f"READY: Server ready on {CONFIG_HOST}:{CONFIG_DJ_PORT} & {CONFIG_HOST}:{CONFIG_LISTENER_PORT}",
    )
    for line in startup_lines:
        print(line)
    _start_background_tasks()
    dj_socketio.run(dj_app, host=CONFIG_HOST, port=CONFIG_DJ_PORT, debug=False)