1291 lines
50 KiB
Python
1291 lines
50 KiB
Python
# Monkey patch MUST be first - before any other imports!
|
||
import eventlet
|
||
eventlet.monkey_patch()
|
||
|
||
import os
|
||
import json
|
||
import re
|
||
import subprocess
|
||
import threading
|
||
import queue
|
||
import time
|
||
import collections
|
||
import hmac
|
||
from flask import Flask, send_from_directory, jsonify, request, session, Response, stream_with_context, abort, render_template, make_response
|
||
from flask_socketio import SocketIO, emit
|
||
from dotenv import load_dotenv
|
||
load_dotenv()
|
||
|
||
|
||
|
||
def _load_config():
|
||
"""Loads optional config.json from the project root.
|
||
|
||
If missing or invalid, returns an empty dict.
|
||
"""
|
||
try:
|
||
with open('config.json', 'r', encoding='utf-8') as f:
|
||
data = json.load(f)
|
||
return data if isinstance(data, dict) else {}
|
||
except FileNotFoundError:
|
||
return {}
|
||
except Exception:
|
||
return {}
|
||
|
||
|
||
CONFIG = _load_config()
|
||
|
||
# --- Config-driven settings (config.json > env vars > defaults) ---
|
||
CONFIG_HOST = (CONFIG.get('host') or os.environ.get('HOST') or '0.0.0.0').strip()
|
||
CONFIG_DJ_PORT = int(CONFIG.get('dj_port') or os.environ.get('DJ_PORT') or 5000)
|
||
CONFIG_LISTENER_PORT = int(CONFIG.get('listener_port') or os.environ.get('LISTEN_PORT') or 5001)
|
||
CONFIG_SECRET = (CONFIG.get('secret_key') or '').strip() or 'dj_panel_secret'
|
||
CONFIG_CORS = CONFIG.get('cors_origins', '*')
|
||
CONFIG_MAX_UPLOAD_MB = int(CONFIG.get('max_upload_mb') or 500)
|
||
CONFIG_DEBUG = bool(CONFIG.get('debug', False))
|
||
CONFIG_LISTENER_URL = (CONFIG.get('listener_url') or '').strip()
|
||
|
||
# MediaMTX / SRT integration
|
||
# secret shared with MediaMTX webhook requests (leave blank to disable auth)
|
||
_SRT_WEBHOOK_SECRET = (CONFIG.get('mediamtx_webhook_secret') or '').strip()
|
||
# SRT ingest port MediaMTX listens on (Flask does not bind this — configure in mediamtx.yml)
|
||
_MEDIAMTX_SRT_PORT = int(CONFIG.get('mediamtx_srt_port') or 5005)
|
||
# RTSP URL MediaMTX exposes for the incoming SRT path (used by ffmpeg to pull audio)
|
||
_MEDIAMTX_RTSP_URL = (CONFIG.get('mediamtx_rtsp_url') or 'rtsp://127.0.0.1:8554/live').strip()
|
||
_MEDIAMTX_HLS_URL = (CONFIG.get('mediamtx_hls_url') or 'http://techy.music:8888/aussie_dj/index.m3u8').strip()
|
||
|
||
# Allowlist of DJ source IPs permitted to publish an SRT stream.
|
||
# Accepts a list of strings or a single string in config.json.
|
||
# Empty list / omitted = no restriction (allow any source IP).
|
||
_raw_srt_ips = CONFIG.get('srt_allowed_ips') or []
|
||
if isinstance(_raw_srt_ips, str):
|
||
_raw_srt_ips = [_raw_srt_ips]
|
||
_SRT_ALLOWED_IPS: set = {ip.strip() for ip in _raw_srt_ips if isinstance(ip, str) and ip.strip()}
|
||
|
||
DJ_PANEL_PASSWORD = (CONFIG.get('dj_panel_password') or '').strip()
|
||
DJ_AUTH_ENABLED = bool(DJ_PANEL_PASSWORD)
|
||
|
||
# Broadcast State
|
||
broadcast_state = {
|
||
'active': False,
|
||
}
|
||
|
||
# SRT stream state — updated by the /api/webhook MediaMTX callback
|
||
_srt_state = {
|
||
'active': False,
|
||
'path': None,
|
||
'source_id': None,
|
||
'started_at': None,
|
||
}
|
||
|
||
# ffmpeg process that pulls audio from MediaMTX's RTSP output for an SRT session
|
||
_srt_ffmpeg_proc = None
|
||
|
||
listener_sids = set()
|
||
dj_sids = set()
|
||
|
||
# Set to True once the first audio chunk arrives in MP3-direct mode.
|
||
# broadcast_started is held back until then so listeners don't connect
|
||
# to /stream.mp3 before any data is flowing (e.g. before the DJ presses play).
|
||
_mp3_broadcast_announced = False
|
||
|
||
# Current listener glow intensity — persisted so new/reloaded listeners get the
|
||
# correct level immediately without waiting for the DJ to move the slider again.
|
||
_current_glow_intensity = 30
|
||
|
||
# Grace-period greenlet: auto-stop broadcast if DJ doesn't reconnect in time
|
||
_dj_grace_greenlet = None
|
||
DJ_GRACE_PERIOD_SECS = 20 # seconds to wait before auto-stopping
|
||
|
||
# === Optional MP3 fallback stream (server-side transcoding) ===
|
||
_ffmpeg_proc = None
|
||
_ffmpeg_in_q = queue.Queue(maxsize=200)
|
||
_current_bitrate = (CONFIG.get('stream_bitrate') or '192k').strip()
|
||
_mp3_clients = set() # set[queue.Queue]
|
||
_mp3_lock = threading.Lock()
|
||
_transcoder_bytes_out = 0
|
||
_transcoder_last_error = None
|
||
_last_audio_chunk_ts = 0.0
|
||
_mp3_preroll = collections.deque() # byte-budget FIFO; evicted from the front when over limit
|
||
_mp3_preroll_bytes = 0
|
||
_MP3_PREROLL_MAX_BYTES = 256 * 1024 # 256 KB ≈ 10 s at 192 kbps — enough for smooth reconnects
|
||
|
||
|
||
def _preroll_append(data: bytes) -> None:
|
||
"""Add a chunk to the pre-roll buffer, evicting the oldest data from the
|
||
front when the byte budget is exceeded.
|
||
MUST be called with _mp3_lock already held.
|
||
"""
|
||
global _mp3_preroll_bytes
|
||
_mp3_preroll.append(data)
|
||
_mp3_preroll_bytes += len(data)
|
||
while _mp3_preroll_bytes > _MP3_PREROLL_MAX_BYTES and _mp3_preroll:
|
||
_mp3_preroll_bytes -= len(_mp3_preroll.popleft())
|
||
|
||
|
||
def _preroll_clear() -> None:
|
||
"""Empty the pre-roll buffer and reset its byte counter.
|
||
MUST be called with _mp3_lock already held.
|
||
"""
|
||
global _mp3_preroll_bytes
|
||
_mp3_preroll.clear()
|
||
_mp3_preroll_bytes = 0
|
||
|
||
|
||
def _start_transcoder_if_needed(is_mp3_input=False):
|
||
global _ffmpeg_proc, _transcoder_last_error
|
||
|
||
# If already running, check if we need to restart
|
||
if _ffmpeg_proc is not None and _ffmpeg_proc.poll() is None:
|
||
return
|
||
|
||
# Ensure stale process is cleaned up
|
||
if _ffmpeg_proc:
|
||
try: _ffmpeg_proc.terminate()
|
||
except: pass
|
||
_ffmpeg_proc = None
|
||
|
||
codec = 'copy' if is_mp3_input else 'libmp3lame'
|
||
|
||
cmd = [
|
||
'ffmpeg',
|
||
'-hide_banner',
|
||
'-loglevel', 'error',
|
||
'-fflags', 'nobuffer',
|
||
'-flags', 'low_delay',
|
||
'-probesize', '32',
|
||
'-analyzeduration', '0'
|
||
]
|
||
|
||
if is_mp3_input:
|
||
cmd.extend(['-f', 'mp3'])
|
||
|
||
cmd.extend([
|
||
'-i', 'pipe:0',
|
||
'-vn',
|
||
'-acodec', codec,
|
||
])
|
||
|
||
if not is_mp3_input:
|
||
cmd.extend(['-b:a', _current_bitrate])
|
||
|
||
cmd.extend([
|
||
'-flush_packets', '1',
|
||
'-f', 'mp3',
|
||
'pipe:1',
|
||
])
|
||
|
||
try:
|
||
_ffmpeg_proc = subprocess.Popen(
|
||
cmd,
|
||
stdin=subprocess.PIPE,
|
||
stdout=subprocess.PIPE,
|
||
stderr=subprocess.PIPE,
|
||
bufsize=0,
|
||
)
|
||
except FileNotFoundError:
|
||
_ffmpeg_proc = None
|
||
print('WARNING: ffmpeg not found; /stream.mp3 fallback disabled')
|
||
return
|
||
|
||
mode_str = "PASSTHROUGH (copy)" if is_mp3_input else f"TRANSCODE ({_current_bitrate})"
|
||
print(f'INFO: ffmpeg transcoder started ({mode_str})')
|
||
|
||
_transcoder_last_error = None
|
||
|
||
# Clear queue to avoid stale data
|
||
while not _ffmpeg_in_q.empty():
|
||
try: _ffmpeg_in_q.get_nowait()
|
||
except: break
|
||
|
||
# Define greenlets INSIDE so they close over THIS specific 'proc'.
|
||
# Blocking subprocess pipe I/O is delegated to eventlet.tpool so it runs
|
||
# in a real OS thread, preventing it from stalling the eventlet hub.
|
||
def _writer(proc):
|
||
global _transcoder_last_error
|
||
print(f"[THREAD] Transcoder writer started (PID: {proc.pid})")
|
||
while proc.poll() is None:
|
||
try:
|
||
chunk = _ffmpeg_in_q.get(timeout=1.0)
|
||
if chunk is None: break
|
||
|
||
if proc.stdin:
|
||
# Run blocking pipe-write in a real thread via tpool
|
||
eventlet.tpool.execute(proc.stdin.write, chunk)
|
||
eventlet.tpool.execute(proc.stdin.flush)
|
||
except queue.Empty:
|
||
continue
|
||
except (BrokenPipeError, ConnectionResetError):
|
||
_transcoder_last_error = "Broken pipe in writer"
|
||
break
|
||
except Exception as e:
|
||
print(f"WARNING: Transcoder writer error: {e}")
|
||
_transcoder_last_error = str(e)
|
||
break
|
||
|
||
# Ensure process is killed if greenlet exits unexpectedly
|
||
if proc.poll() is None:
|
||
try: proc.terminate()
|
||
except: pass
|
||
print(f"[THREAD] Transcoder writer finished (PID: {proc.pid})")
|
||
|
||
def _reader(proc):
|
||
global _transcoder_bytes_out, _transcoder_last_error
|
||
print(f"[THREAD] Transcoder reader started (PID: {proc.pid})")
|
||
while proc.poll() is None:
|
||
try:
|
||
# Run blocking pipe-read in a real thread via tpool (1 KB chunks
|
||
# for smooth delivery; prevents buffering delays at lower bitrates)
|
||
data = eventlet.tpool.execute(proc.stdout.read, 1024)
|
||
if not data: break
|
||
_transcoder_bytes_out += len(data)
|
||
|
||
with _mp3_lock:
|
||
_preroll_append(data)
|
||
clients = list(_mp3_clients)
|
||
|
||
for q in clients:
|
||
try:
|
||
q.put_nowait(data)
|
||
except queue.Full:
|
||
# Client is too slow — skip this chunk for them
|
||
pass
|
||
except Exception as e:
|
||
print(f"WARNING: Transcoder reader error: {e}")
|
||
_transcoder_last_error = str(e)
|
||
break
|
||
|
||
# Ensure process is killed if greenlet exits unexpectedly
|
||
if proc.poll() is None:
|
||
try: proc.terminate()
|
||
except: pass
|
||
print(f"[THREAD] Transcoder reader finished (PID: {proc.pid})")
|
||
|
||
# Spawn as greenlets — tpool handles the blocking subprocess I/O internally
|
||
eventlet.spawn(_writer, _ffmpeg_proc)
|
||
eventlet.spawn(_reader, _ffmpeg_proc)
|
||
|
||
|
||
def _stop_transcoder():
|
||
global _ffmpeg_proc
|
||
print("STOPPING: Transcoder process")
|
||
|
||
proc = _ffmpeg_proc
|
||
_ffmpeg_proc = None # Null first so _feed_transcoder stops enqueuing
|
||
|
||
# Drain stale input chunks so the None sentinel is guaranteed to fit and
|
||
# the writer greenlet exits on its next get() instead of stalling.
|
||
while True:
|
||
try: _ffmpeg_in_q.get_nowait()
|
||
except queue.Empty: break
|
||
try: _ffmpeg_in_q.put_nowait(None)
|
||
except queue.Full: pass
|
||
|
||
if proc:
|
||
try:
|
||
proc.terminate()
|
||
proc.communicate(timeout=1.0)
|
||
except:
|
||
try: proc.kill()
|
||
except: pass
|
||
|
||
with _mp3_lock:
|
||
clients = list(_mp3_clients)
|
||
for q in clients:
|
||
try: q.put_nowait(None)
|
||
except: pass
|
||
_mp3_clients.clear()
|
||
_preroll_clear()
|
||
|
||
|
||
# === SRT / MediaMTX transcoder ===
|
||
# When MediaMTX receives the incoming SRT stream it exposes it as an RTSP endpoint.
|
||
# We launch a dedicated ffmpeg process that *pulls* from that RTSP URL and feeds the
|
||
# same _mp3_clients / _mp3_preroll distribution system used by all other stream modes.
|
||
|
||
def _start_srt_transcoder():
|
||
"""Start an ffmpeg process that pulls from MediaMTX's RTSP output."""
|
||
global _srt_ffmpeg_proc, _transcoder_last_error
|
||
|
||
if _srt_ffmpeg_proc is not None and _srt_ffmpeg_proc.poll() is None:
|
||
return # Already running
|
||
|
||
if _srt_ffmpeg_proc:
|
||
try: _srt_ffmpeg_proc.terminate()
|
||
except: pass
|
||
_srt_ffmpeg_proc = None
|
||
|
||
cmd = [
|
||
'ffmpeg',
|
||
'-hide_banner', '-loglevel', 'error',
|
||
'-fflags', 'nobuffer', '-flags', 'low_delay',
|
||
'-rtsp_transport', 'tcp',
|
||
'-i', _MEDIAMTX_RTSP_URL,
|
||
'-vn',
|
||
'-acodec', 'libmp3lame',
|
||
'-b:a', _current_bitrate,
|
||
'-flush_packets', '1',
|
||
'-f', 'mp3', 'pipe:1',
|
||
]
|
||
|
||
try:
|
||
_srt_ffmpeg_proc = subprocess.Popen(
|
||
cmd,
|
||
stdout=subprocess.PIPE,
|
||
stderr=subprocess.PIPE,
|
||
bufsize=0,
|
||
)
|
||
except FileNotFoundError:
|
||
_srt_ffmpeg_proc = None
|
||
print('WARNING: ffmpeg not found; SRT→MP3 bridge disabled')
|
||
return
|
||
|
||
print(f'INFO: SRT transcoder started — pulling from {_MEDIAMTX_RTSP_URL} @ {_current_bitrate}')
|
||
_transcoder_last_error = None
|
||
|
||
def _srt_reader(proc):
|
||
global _transcoder_bytes_out, _transcoder_last_error
|
||
print(f'[THREAD] SRT reader started (PID: {proc.pid})')
|
||
while proc.poll() is None:
|
||
try:
|
||
data = eventlet.tpool.execute(proc.stdout.read, 4096)
|
||
if not data:
|
||
break
|
||
_transcoder_bytes_out += len(data)
|
||
with _mp3_lock:
|
||
_preroll_append(data)
|
||
clients = list(_mp3_clients)
|
||
for q in clients:
|
||
try:
|
||
q.put_nowait(data)
|
||
except queue.Full:
|
||
pass
|
||
except Exception as e:
|
||
print(f'WARNING: SRT reader error: {e}')
|
||
_transcoder_last_error = str(e)
|
||
break
|
||
|
||
if proc.poll() is None:
|
||
try: proc.terminate()
|
||
except: pass
|
||
print(f'[THREAD] SRT reader finished (PID: {proc.pid})')
|
||
|
||
eventlet.spawn(_srt_reader, _srt_ffmpeg_proc)
|
||
|
||
|
||
def _stop_srt_transcoder():
|
||
"""Terminate the SRT→MP3 ffmpeg process and flush all listener queues."""
|
||
global _srt_ffmpeg_proc
|
||
print('STOPPING: SRT transcoder')
|
||
|
||
proc = _srt_ffmpeg_proc
|
||
_srt_ffmpeg_proc = None
|
||
|
||
if proc:
|
||
try:
|
||
proc.terminate()
|
||
proc.communicate(timeout=2.0)
|
||
except Exception:
|
||
try: proc.kill()
|
||
except: pass
|
||
|
||
with _mp3_lock:
|
||
clients = list(_mp3_clients)
|
||
for q in clients:
|
||
try: q.put_nowait(None)
|
||
except: pass
|
||
_mp3_clients.clear()
|
||
_preroll_clear()
|
||
|
||
|
||
def _feed_transcoder(data: bytes):
|
||
global _last_audio_chunk_ts
|
||
if _ffmpeg_proc is None or _ffmpeg_proc.poll() is not None:
|
||
# If active but dead, restart it automatically (non-MP3 mode only)
|
||
if broadcast_state.get('active'):
|
||
_start_transcoder_if_needed(is_mp3_input=False)
|
||
else:
|
||
return
|
||
|
||
_last_audio_chunk_ts = time.time()
|
||
try:
|
||
_ffmpeg_in_q.put_nowait(data)
|
||
except queue.Full:
|
||
# Drop chunk if overflow to prevent memory bloat
|
||
pass
|
||
|
||
|
||
def _distribute_mp3(data: bytes):
|
||
"""Distribute MP3 bytes directly to preroll buffer and all connected listener
|
||
clients, bypassing the ffmpeg transcoder entirely.
|
||
|
||
Used when the DJ is already sending valid MP3 (e.g. the Qt desktop client)
|
||
to eliminate the unnecessary encode/decode round-trip and cut pipeline latency
|
||
roughly in half.
|
||
"""
|
||
global _transcoder_bytes_out, _last_audio_chunk_ts
|
||
_last_audio_chunk_ts = time.time()
|
||
_transcoder_bytes_out += len(data)
|
||
with _mp3_lock:
|
||
_preroll_append(data)
|
||
for q in list(_mp3_clients):
|
||
try:
|
||
q.put_nowait(data)
|
||
except queue.Full:
|
||
pass
|
||
|
||
|
||
# Load settings to get MUSIC_FOLDER
|
||
def _load_settings():
|
||
try:
|
||
if os.path.exists('settings.json'):
|
||
with open('settings.json', 'r', encoding='utf-8') as f:
|
||
return json.load(f)
|
||
except:
|
||
pass
|
||
return {}
|
||
|
||
SETTINGS = _load_settings()
|
||
_config_music = (CONFIG.get('music_folder') or '').strip()
|
||
MUSIC_FOLDER = _config_music or SETTINGS.get('library', {}).get('music_folder', 'music')
|
||
|
||
# Ensure music folder exists
|
||
if not os.path.exists(MUSIC_FOLDER):
|
||
try:
|
||
os.makedirs(MUSIC_FOLDER)
|
||
except:
|
||
# Fallback to default if custom path fails
|
||
MUSIC_FOLDER = "music"
|
||
if not os.path.exists(MUSIC_FOLDER):
|
||
os.makedirs(MUSIC_FOLDER)
|
||
|
||
# Helper for shared routes
|
||
def setup_shared_routes(app, index_file='index.html'):
|
||
@app.route('/library.json')
|
||
def get_library():
|
||
library = []
|
||
global MUSIC_FOLDER
|
||
if os.path.exists(MUSIC_FOLDER):
|
||
for root, dirs, files in os.walk(MUSIC_FOLDER):
|
||
for filename in sorted(files):
|
||
if filename.lower().endswith(('.mp3', '.m4a', '.wav', '.flac', '.ogg')):
|
||
rel_path = os.path.relpath(os.path.join(root, filename), MUSIC_FOLDER)
|
||
library.append({
|
||
"title": os.path.splitext(filename)[0],
|
||
"file": f"music_proxy/{rel_path}",
|
||
# Absolute path exposed for Tauri's asset protocol (convertFileSrc).
|
||
# The web fallback keeps using the music_proxy route above.
|
||
"absolutePath": os.path.abspath(os.path.join(root, filename))
|
||
})
|
||
break # Top-level only
|
||
return jsonify(library)
|
||
|
||
@app.route('/music_proxy/<path:filename>')
|
||
def music_proxy(filename):
|
||
return send_from_directory(MUSIC_FOLDER, filename)
|
||
|
||
@app.route('/browse_directories', methods=['GET'])
|
||
def browse_directories():
|
||
path = request.args.get('path', os.path.expanduser('~'))
|
||
try:
|
||
entries = []
|
||
if os.path.exists(path) and os.path.isdir(path):
|
||
# Add parent
|
||
parent = os.path.dirname(os.path.abspath(path))
|
||
entries.append({"name": "..", "path": parent, "isDir": True})
|
||
|
||
for item in sorted(os.listdir(path)):
|
||
full_path = os.path.join(path, item)
|
||
if os.path.isdir(full_path) and not item.startswith('.'):
|
||
entries.append({
|
||
"name": item,
|
||
"path": full_path,
|
||
"isDir": True
|
||
})
|
||
return jsonify({"success": True, "path": os.path.abspath(path), "entries": entries})
|
||
except Exception as e:
|
||
return jsonify({"success": False, "error": str(e)}), 500
|
||
|
||
@app.route('/update_settings', methods=['POST'])
|
||
def update_settings():
|
||
try:
|
||
data = request.get_json()
|
||
# Load existing
|
||
settings = _load_settings()
|
||
|
||
# Update selectively
|
||
if 'library' not in settings: settings['library'] = {}
|
||
if 'music_folder' in data.get('library', {}):
|
||
new_folder = data['library']['music_folder']
|
||
if os.path.exists(new_folder) and os.path.isdir(new_folder):
|
||
settings['library']['music_folder'] = new_folder
|
||
global MUSIC_FOLDER
|
||
MUSIC_FOLDER = new_folder
|
||
else:
|
||
return jsonify({"success": False, "error": "Invalid folder path"}), 400
|
||
|
||
with open('settings.json', 'w', encoding='utf-8') as f:
|
||
json.dump(settings, f, indent=4)
|
||
|
||
return jsonify({"success": True})
|
||
except Exception as e:
|
||
return jsonify({"success": False, "error": str(e)}), 500
|
||
|
||
|
||
@app.route('/<path:filename>')
|
||
def serve_static(filename):
|
||
# Prevent path traversal
|
||
if '..' in filename or filename.startswith('/'):
|
||
abort(403)
|
||
# Allow only known safe static file extensions
|
||
allowed_extensions = ('.css', '.js', '.html', '.htm', '.png', '.jpg', '.jpeg',
|
||
'.gif', '.svg', '.ico', '.woff', '.woff2', '.ttf', '.eot', '.map')
|
||
if not filename.endswith(allowed_extensions):
|
||
abort(403)
|
||
response = send_from_directory('.', filename)
|
||
if filename.endswith(('.css', '.js', '.html')):
|
||
response.headers['Cache-Control'] = 'no-store, no-cache, must-revalidate, max-age=0'
|
||
return response
|
||
|
||
@app.route('/')
|
||
def index():
|
||
response = send_from_directory('.', index_file)
|
||
response.headers['Cache-Control'] = 'no-store, no-cache, must-revalidate, max-age=0'
|
||
response.headers['Pragma'] = 'no-cache'
|
||
response.headers['Expires'] = '0'
|
||
return response
|
||
|
||
@app.route('/upload', methods=['POST'])
|
||
def upload_file():
|
||
if 'file' not in request.files:
|
||
return jsonify({"success": False, "error": "No file provided"}), 400
|
||
|
||
file = request.files['file']
|
||
|
||
if file.filename == '':
|
||
return jsonify({"success": False, "error": "No file selected"}), 400
|
||
|
||
allowed_exts = ('.mp3', '.m4a', '.wav', '.flac', '.ogg')
|
||
ext = os.path.splitext(file.filename)[1].lower()
|
||
if ext not in allowed_exts:
|
||
return jsonify({"success": False, "error": f"Supported formats: {', '.join(allowed_exts)}"}), 400
|
||
|
||
# Sanitize filename (keep extension)
|
||
name_without_ext = os.path.splitext(file.filename)[0]
|
||
name_without_ext = re.sub(r'[^\w\s-]', '', name_without_ext)
|
||
name_without_ext = re.sub(r'\s+', ' ', name_without_ext).strip()
|
||
filename = f"{name_without_ext}{ext}"
|
||
|
||
filepath = os.path.join(MUSIC_FOLDER, filename)
|
||
|
||
if os.path.exists(filepath):
|
||
return jsonify({"success": False, "error": "File already exists in library"}), 409
|
||
|
||
try:
|
||
file.save(filepath)
|
||
print(f"UPLOADED: {filename}")
|
||
return jsonify({"success": True, "filename": filename})
|
||
except Exception as e:
|
||
print(f"ERROR: Upload error: {e}")
|
||
return jsonify({"success": False, "error": str(e)}), 500
|
||
|
||
@app.route('/save_keymaps', methods=['POST'])
|
||
def save_keymaps():
|
||
try:
|
||
data = request.get_json()
|
||
with open('keymaps.json', 'w', encoding='utf-8') as f:
|
||
json.dump(data, f, indent=4)
|
||
print("SAVED: Keymaps saved to keymaps.json")
|
||
return jsonify({"success": True})
|
||
except Exception as e:
|
||
print(f"ERROR: Save keymaps error: {e}")
|
||
return jsonify({"success": False, "error": str(e)}), 500
|
||
|
||
@app.route('/load_keymaps', methods=['GET'])
|
||
def load_keymaps():
|
||
try:
|
||
if os.path.exists('keymaps.json'):
|
||
with open('keymaps.json', 'r', encoding='utf-8') as f:
|
||
data = json.load(f)
|
||
return jsonify({"success": True, "keymaps": data})
|
||
else:
|
||
return jsonify({"success": True, "keymaps": None})
|
||
except Exception as e:
|
||
print(f"ERROR: Load keymaps error: {e}")
|
||
return jsonify({"success": False, "error": str(e)}), 500
|
||
|
||
@app.route('/stream.mp3')
|
||
def stream_mp3():
|
||
"""Live MP3 audio stream from the ffmpeg transcoder."""
|
||
# If broadcast is not active, return 503 with audio/mpeg content type.
|
||
# Returning JSON here would confuse the browser's <audio> element.
|
||
if not broadcast_state.get('active'):
|
||
return Response(b'', status=503, content_type='audio/mpeg', headers={
|
||
'Retry-After': '5',
|
||
'Access-Control-Allow-Origin': '*',
|
||
'Cache-Control': 'no-cache, no-store',
|
||
})
|
||
|
||
# For non-MP3 input the server runs an ffmpeg transcoder; wait for it to start.
|
||
# For MP3 input (e.g. Qt client) chunks are distributed directly — no ffmpeg needed.
|
||
is_mp3_direct = broadcast_state.get('is_mp3_input', False)
|
||
if not is_mp3_direct:
|
||
waited = 0.0
|
||
while (_ffmpeg_proc is None or _ffmpeg_proc.poll() is not None) and waited < 5.0:
|
||
eventlet.sleep(0.5)
|
||
waited += 0.5
|
||
|
||
if _ffmpeg_proc is None or _ffmpeg_proc.poll() is not None:
|
||
return Response(b'', status=503, content_type='audio/mpeg', headers={
|
||
'Retry-After': '3',
|
||
'Access-Control-Allow-Origin': '*',
|
||
'Cache-Control': 'no-cache, no-store',
|
||
})
|
||
|
||
client_q = queue.Queue(maxsize=500)
|
||
with _mp3_lock:
|
||
preroll_bytes = _mp3_preroll_bytes
|
||
for chunk in _mp3_preroll:
|
||
try:
|
||
client_q.put_nowait(chunk)
|
||
except Exception:
|
||
break
|
||
_mp3_clients.add(client_q)
|
||
print(f"LISTENER: New listener joined stream (Pre-roll: {preroll_bytes} bytes)")
|
||
|
||
def gen():
|
||
idle_checks = 0
|
||
try:
|
||
while True:
|
||
try:
|
||
chunk = client_q.get(timeout=5)
|
||
idle_checks = 0
|
||
except queue.Empty:
|
||
if not broadcast_state.get('active'):
|
||
break
|
||
idle_checks += 1
|
||
if idle_checks >= 12: # 60s total with no audio data
|
||
break
|
||
continue
|
||
if chunk is None:
|
||
break
|
||
yield chunk
|
||
finally:
|
||
with _mp3_lock:
|
||
_mp3_clients.discard(client_q)
|
||
print("LISTENER: Listener disconnected from stream")
|
||
|
||
return Response(stream_with_context(gen()), content_type='audio/mpeg', headers={
|
||
'Cache-Control': 'no-cache, no-store, must-revalidate',
|
||
'Connection': 'keep-alive',
|
||
'Pragma': 'no-cache',
|
||
'Expires': '0',
|
||
'X-Content-Type-Options': 'nosniff',
|
||
'Access-Control-Allow-Origin': '*',
|
||
'Icy-Name': 'TechDJ Live',
|
||
'X-Accel-Buffering': 'no', # Tell nginx/Cloudflare not to buffer this stream
|
||
})
|
||
|
||
@app.route('/stream_debug')
|
||
def stream_debug():
|
||
proc = _ffmpeg_proc
|
||
running = proc is not None and proc.poll() is None
|
||
return jsonify({
|
||
'broadcast_active': broadcast_state.get('active', False),
|
||
'broadcast_mimeType': broadcast_state.get('mimeType'),
|
||
'ffmpeg_running': running,
|
||
'ffmpeg_found': (proc is not None),
|
||
'mp3_clients': len(_mp3_clients),
|
||
'preroll_chunks': len(_mp3_preroll),
|
||
'preroll_bytes': _mp3_preroll_bytes,
|
||
'transcoder_bytes_out': _transcoder_bytes_out,
|
||
'transcoder_last_error': _transcoder_last_error,
|
||
'last_audio_chunk_ts': _last_audio_chunk_ts,
|
||
})
|
||
|
||
# === DJ SERVER ===
|
||
dj_app = Flask(__name__, static_folder='.', static_url_path='')
|
||
dj_app.config['SECRET_KEY'] = CONFIG_SECRET
|
||
dj_app.config['MAX_CONTENT_LENGTH'] = CONFIG_MAX_UPLOAD_MB * 1024 * 1024
|
||
setup_shared_routes(dj_app)
|
||
|
||
|
||
@dj_app.before_request
|
||
def _protect_dj_panel():
|
||
"""Optionally require a password for the DJ panel only (port 5000).
|
||
|
||
This does not affect the listener server (port 5001).
|
||
"""
|
||
if not DJ_AUTH_ENABLED:
|
||
return None
|
||
|
||
# Allow login/logout endpoints
|
||
if request.path in ('/login', '/logout'):
|
||
return None
|
||
|
||
# If already authenticated, allow
|
||
if session.get('dj_authed') is True:
|
||
return None
|
||
|
||
# Redirect everything else to login
|
||
return (
|
||
"<!doctype html><html><head><meta http-equiv='refresh' content='0; url=/login' /></head>"
|
||
"<body>Redirecting to <a href='/login'>/login</a>...</body></html>",
|
||
302,
|
||
{'Location': '/login'}
|
||
)
|
||
|
||
|
||
@dj_app.route('/login', methods=['GET', 'POST'])
|
||
def dj_login():
|
||
if not DJ_AUTH_ENABLED:
|
||
# If auth is disabled, just go to the panel.
|
||
session['dj_authed'] = True
|
||
return (
|
||
"<!doctype html><html><head><meta http-equiv='refresh' content='0; url=/' /></head>"
|
||
"<body>Auth disabled. Redirecting...</body></html>",
|
||
302,
|
||
{'Location': '/'}
|
||
)
|
||
|
||
error = None
|
||
if request.method == 'POST':
|
||
pw = (request.form.get('password') or '').strip()
|
||
if pw == DJ_PANEL_PASSWORD:
|
||
session['dj_authed'] = True
|
||
return (
|
||
"<!doctype html><html><head><meta http-equiv='refresh' content='0; url=/' /></head>"
|
||
"<body>Logged in. Redirecting...</body></html>",
|
||
302,
|
||
{'Location': '/'}
|
||
)
|
||
error = 'Invalid password'
|
||
|
||
# Minimal inline login page (no new assets)
|
||
return f"""<!doctype html>
|
||
<html lang=\"en\">
|
||
<head>
|
||
<meta charset=\"utf-8\" />
|
||
<meta name=\"viewport\" content=\"width=device-width, initial-scale=1\" />
|
||
<title>TechDJ - DJ Login</title>
|
||
<style>
|
||
body {{ background:#0a0a12; color:#eee; font-family: system-ui, -apple-system, Segoe UI, Roboto, Arial; margin:0; }}
|
||
.wrap {{ min-height:100vh; display:flex; align-items:center; justify-content:center; padding:24px; }}
|
||
.card {{ width:100%; max-width:420px; background:rgba(10,10,20,0.85); border:2px solid #bc13fe; border-radius:16px; padding:24px; box-shadow:0 0 40px rgba(188,19,254,0.25); }}
|
||
h1 {{ margin:0 0 16px 0; font-size:22px; }}
|
||
label {{ display:block; margin:12px 0 8px; opacity:0.9; }}
|
||
input {{ width:100%; padding:12px; border-radius:10px; border:1px solid rgba(255,255,255,0.15); background:rgba(0,0,0,0.35); color:#fff; }}
|
||
button {{ width:100%; margin-top:14px; padding:12px; border-radius:10px; border:2px solid #bc13fe; background:rgba(188,19,254,0.15); color:#fff; font-weight:700; cursor:pointer; }}
|
||
.err {{ margin-top:12px; color:#ffb3ff; }}
|
||
.hint {{ margin-top:10px; font-size:12px; opacity:0.7; }}
|
||
</style>
|
||
</head>
|
||
<body>
|
||
<div class=\"wrap\">
|
||
<div class=\"card\">
|
||
<h1>DJ Panel Locked</h1>
|
||
<form method=\"post\" action=\"/login\">
|
||
<label for=\"password\">Password</label>
|
||
<input id=\"password\" name=\"password\" type=\"password\" autocomplete=\"current-password\" autofocus />
|
||
<button type=\"submit\">Unlock DJ Panel</button>
|
||
{f"<div class='err'>{error}</div>" if error else ""}
|
||
</form>
|
||
</div>
|
||
</div>
|
||
</body>
|
||
</html>"""
|
||
|
||
|
||
@dj_app.route('/logout')
|
||
def dj_logout():
|
||
session.pop('dj_authed', None)
|
||
return (
|
||
"<!doctype html><html><head><meta http-equiv='refresh' content='0; url=/login' /></head>"
|
||
"<body>Logged out. Redirecting...</body></html>",
|
||
302,
|
||
{'Location': '/login'}
|
||
)
|
||
|
||
@dj_app.route('/client_config')
|
||
def client_config():
|
||
"""Expose server-side config values needed by the DJ panel client."""
|
||
return jsonify({'listener_url': CONFIG_LISTENER_URL})
|
||
|
||
dj_socketio = SocketIO(
|
||
dj_app,
|
||
cors_allowed_origins=CONFIG_CORS,
|
||
async_mode='eventlet',
|
||
max_http_buffer_size=CONFIG_MAX_UPLOAD_MB * 1024 * 1024,
|
||
ping_timeout=60,
|
||
ping_interval=25,
|
||
logger=CONFIG_DEBUG,
|
||
engineio_logger=CONFIG_DEBUG
|
||
)
|
||
|
||
@dj_socketio.on('connect')
|
||
def dj_connect():
|
||
if DJ_AUTH_ENABLED and session.get('dj_authed') is not True:
|
||
print(f"REJECTED: DJ socket rejected (unauthorized): {request.sid}")
|
||
return False
|
||
print(f"STREAMPANEL: DJ connected: {request.sid}")
|
||
dj_sids.add(request.sid)
|
||
|
||
def _dj_disconnect_grace():
|
||
"""Auto-stop broadcast if no DJ reconnects within the grace period."""
|
||
global _dj_grace_greenlet, _mp3_broadcast_announced
|
||
print(f"INFO: Grace period started — {DJ_GRACE_PERIOD_SECS}s for DJ to reconnect")
|
||
eventlet.sleep(DJ_GRACE_PERIOD_SECS)
|
||
if broadcast_state.get('active') and not dj_sids:
|
||
print("WARNING: Grace period expired, no DJ reconnected — auto-stopping broadcast")
|
||
broadcast_state['active'] = False
|
||
broadcast_state['is_mp3_input'] = False
|
||
_mp3_broadcast_announced = False
|
||
_stop_transcoder()
|
||
listener_socketio.emit('broadcast_stopped', namespace='/')
|
||
listener_socketio.emit('stream_status', {'active': False}, namespace='/')
|
||
_dj_grace_greenlet = None
|
||
|
||
|
||
@dj_socketio.on('disconnect')
|
||
def dj_disconnect():
|
||
global _dj_grace_greenlet
|
||
dj_sids.discard(request.sid)
|
||
print(f"WARNING: DJ disconnected ({request.sid}). Remaining DJs: {len(dj_sids)}")
|
||
|
||
# If broadcast is active and no other DJs remain, start the grace period
|
||
if broadcast_state.get('active') and not dj_sids:
|
||
if _dj_grace_greenlet is None:
|
||
_dj_grace_greenlet = eventlet.spawn(_dj_disconnect_grace)
|
||
elif not broadcast_state.get('active'):
|
||
# Nothing to do — no active broadcast
|
||
pass
|
||
else:
|
||
print("INFO: Other DJ(s) still connected — broadcast continues uninterrupted")
|
||
|
||
@dj_socketio.on('start_broadcast')
|
||
def dj_start(data=None):
|
||
global _dj_grace_greenlet, _mp3_broadcast_announced
|
||
|
||
# Cancel any pending auto-stop grace period (DJ reconnected in time)
|
||
if _dj_grace_greenlet is not None:
|
||
_dj_grace_greenlet.kill(block=True) # block=True prevents a GreenletExit race
|
||
_dj_grace_greenlet = None
|
||
print("INFO: DJ reconnected within grace period — broadcast continues")
|
||
|
||
was_already_active = broadcast_state.get('active', False)
|
||
broadcast_state['active'] = True
|
||
session['is_dj'] = True
|
||
print("BROADCAST: ACTIVE")
|
||
|
||
is_mp3_input = False
|
||
if data:
|
||
if 'bitrate' in data:
|
||
global _current_bitrate
|
||
_current_bitrate = data['bitrate']
|
||
print(f"BITRATE: Setting stream bitrate to: {_current_bitrate}")
|
||
if data.get('format') == 'mp3':
|
||
is_mp3_input = True
|
||
broadcast_state['is_mp3_input'] = is_mp3_input
|
||
|
||
if not was_already_active:
|
||
# Fresh broadcast start — clear pre-roll and reset announcement flag.
|
||
with _mp3_lock:
|
||
_preroll_clear()
|
||
|
||
if is_mp3_input:
|
||
# MP3-direct mode (Qt client): the DJ still needs to press play before
|
||
# audio flows. Firing broadcast_started now would cause listeners to
|
||
# connect to /stream.mp3 before any data exists, wait 60 s, then
|
||
# disconnect. Instead we hold back the announcement and send it on
|
||
# the very first audio_chunk so listeners connect when data is ready.
|
||
_mp3_broadcast_announced = False
|
||
print("BROADCAST: MP3-direct mode — deferring broadcast_started until first chunk")
|
||
else:
|
||
# Non-MP3 (browser webm/opus): ffmpeg transcoder starts immediately,
|
||
# so listeners can connect right away.
|
||
_start_transcoder_if_needed(is_mp3_input=False)
|
||
listener_socketio.emit('broadcast_started', namespace='/')
|
||
else:
|
||
# DJ reconnected mid-broadcast - just ensure transcoder is alive (non-MP3 only)
|
||
# Do NOT clear pre-roll or trigger listener reload
|
||
print("BROADCAST: DJ reconnected - resuming existing broadcast")
|
||
if not is_mp3_input:
|
||
_start_transcoder_if_needed(is_mp3_input=False)
|
||
|
||
# For MP3-direct fresh broadcast, hold back stream_status active=true too
|
||
# so listeners don't auto-connect before audio data is flowing.
|
||
# It will be sent alongside broadcast_started on the first audio_chunk.
|
||
# For non-MP3 (browser) mode or DJ reconnects, send immediately.
|
||
if not (is_mp3_input and not was_already_active):
|
||
listener_socketio.emit('stream_status', {'active': True}, namespace='/')
|
||
|
||
@dj_socketio.on('get_listener_count')
|
||
def dj_get_listener_count():
|
||
emit('listener_count', {'count': len(listener_sids)})
|
||
|
||
@dj_socketio.on('listener_glow')
|
||
def dj_listener_glow(data):
|
||
"""DJ sets the glow intensity on the listener page."""
|
||
global _current_glow_intensity
|
||
intensity = int(data.get('intensity', 30)) if isinstance(data, dict) else 30
|
||
intensity = max(0, min(100, intensity))
|
||
_current_glow_intensity = intensity
|
||
listener_socketio.emit('listener_glow', {'intensity': intensity}, namespace='/')
|
||
|
||
@dj_socketio.on('deck_glow')
|
||
def dj_deck_glow(data):
|
||
"""Relay which decks are playing so the listener page can mirror the glow colour."""
|
||
if not isinstance(data, dict):
|
||
return
|
||
listener_socketio.emit('deck_glow', {
|
||
'A': bool(data.get('A', False)),
|
||
'B': bool(data.get('B', False)),
|
||
}, namespace='/')
|
||
|
||
@dj_socketio.on('now_playing')
|
||
def dj_now_playing(data):
|
||
"""Relay the currently playing track title to all listener pages."""
|
||
if not isinstance(data, dict):
|
||
return
|
||
# Track changed — flush the pre-roll so listeners who reconnect (e.g. after
|
||
# a stall watchdog trigger) start at the current song, not a replay of the
|
||
# previous one. This is the primary guard against the audio-loop symptom.
|
||
with _mp3_lock:
|
||
_preroll_clear()
|
||
listener_socketio.emit('now_playing', {
|
||
'title': str(data.get('title', '')),
|
||
'deck': str(data.get('deck', '')),
|
||
}, namespace='/')
|
||
|
||
@dj_socketio.on('stop_broadcast')
|
||
def dj_stop():
|
||
global _mp3_broadcast_announced
|
||
broadcast_state['active'] = False
|
||
broadcast_state['is_mp3_input'] = False
|
||
_mp3_broadcast_announced = False
|
||
session['is_dj'] = False
|
||
print("STOPPED: DJ stopped broadcasting")
|
||
|
||
_stop_transcoder()
|
||
|
||
listener_socketio.emit('broadcast_stopped', namespace='/')
|
||
listener_socketio.emit('stream_status', {'active': False}, namespace='/')
|
||
|
||
@dj_socketio.on('audio_chunk')
|
||
def dj_audio(data):
|
||
global _mp3_broadcast_announced
|
||
if broadcast_state['active'] and isinstance(data, (bytes, bytearray)):
|
||
if broadcast_state.get('is_mp3_input', False):
|
||
# MP3 input (e.g. Qt client): skip ffmpeg, send directly to listeners.
|
||
# Fire broadcast_started on the very first chunk so listeners connect
|
||
# only once actual audio data is flowing (the DJ has pressed play).
|
||
if not _mp3_broadcast_announced:
|
||
_mp3_broadcast_announced = True
|
||
print("BROADCAST: First MP3 chunk received — announcing broadcast_started to listeners")
|
||
listener_socketio.emit('broadcast_started', namespace='/')
|
||
listener_socketio.emit('stream_status', {'active': True}, namespace='/')
|
||
_distribute_mp3(bytes(data))
|
||
else:
|
||
# Other formats (e.g. webm/opus from browser): route through ffmpeg transcoder
|
||
if _ffmpeg_proc is None or _ffmpeg_proc.poll() is not None:
|
||
_start_transcoder_if_needed(is_mp3_input=False)
|
||
_feed_transcoder(bytes(data))
|
||
|
||
# === LISTENER SERVER ===
|
||
# static_folder=None prevents Flask's built-in static handler from serving
|
||
# DJ files (like index.html) at /<path> — all static files go through our
|
||
# custom serve_static route which has security checks.
|
||
listener_app = Flask(__name__, static_folder=None, template_folder='.')
|
||
listener_app.config['SECRET_KEY'] = CONFIG_SECRET + '_listener'
|
||
listener_app.config['MAX_CONTENT_LENGTH'] = CONFIG_MAX_UPLOAD_MB * 1024 * 1024
|
||
setup_shared_routes(listener_app, index_file='listener.html')
|
||
|
||
|
||
def _listener_home():
|
||
"""Serve listener.html as a Jinja2 template so live context is injected."""
|
||
ctx = _listener_template_context()
|
||
response = make_response(render_template('listener.html', **ctx))
|
||
response.headers['Cache-Control'] = 'no-store, no-cache, must-revalidate, max-age=0'
|
||
response.headers['Pragma'] = 'no-cache'
|
||
response.headers['Expires'] = '0'
|
||
return response
|
||
|
||
|
||
# Replace the static file serve registered by setup_shared_routes with the
|
||
# template-rendering version so Jinja2 context is injected on every page load.
|
||
listener_app.view_functions['index'] = _listener_home
|
||
|
||
|
||
# Block write/admin endpoints AND DJ-only files on the listener server
|
||
@listener_app.before_request
|
||
def _restrict_listener_routes():
|
||
"""Prevent listeners from accessing DJ-only endpoints and files."""
|
||
blocked_paths = ('/update_settings', '/upload', '/save_keymaps', '/browse_directories')
|
||
if request.path in blocked_paths:
|
||
abort(403)
|
||
# Block DJ-only files — prevents serving the DJ panel even via direct URL
|
||
dj_only_files = ('/index.html', '/script.js', '/style.css')
|
||
if request.path in dj_only_files:
|
||
abort(403)
|
||
listener_socketio = SocketIO(
|
||
listener_app,
|
||
cors_allowed_origins=CONFIG_CORS,
|
||
async_mode='eventlet',
|
||
max_http_buffer_size=CONFIG_MAX_UPLOAD_MB * 1024 * 1024,
|
||
# Lower timeouts: stale connections detected in ~25s instead of ~85s
|
||
# ping_interval: how often to probe (seconds)
|
||
# ping_timeout: how long to wait for pong before declaring dead
|
||
ping_timeout=15,
|
||
ping_interval=10,
|
||
logger=CONFIG_DEBUG,
|
||
engineio_logger=CONFIG_DEBUG
|
||
)
|
||
|
||
def _broadcast_listener_count():
|
||
"""Compute the most accurate listener count and broadcast to both panels.
|
||
|
||
Uses the larger of:
|
||
- listener_sids: Socket.IO connections (people with the page open)
|
||
- _mp3_clients: active /stream.mp3 HTTP connections (people actually hearing audio)
|
||
|
||
Taking the max avoids undercounting when someone hasn't clicked Enable Audio
|
||
yet, and also avoids undercounting direct stream URL listeners (e.g. VLC).
|
||
"""
|
||
with _mp3_lock:
|
||
stream_count = len(_mp3_clients)
|
||
count = max(len(listener_sids), stream_count)
|
||
listener_socketio.emit('listener_count', {'count': count}, namespace='/')
|
||
dj_socketio.emit('listener_count', {'count': count}, namespace='/')
|
||
return count
|
||
|
||
|
||
@listener_socketio.on('connect')
|
||
def listener_connect():
|
||
# Count immediately on connect — don't wait for join_listener
|
||
listener_sids.add(request.sid)
|
||
count = _broadcast_listener_count()
|
||
print(f"LISTENER: Connected {request.sid}. Total: {count}")
|
||
|
||
@listener_socketio.on('disconnect')
|
||
def listener_disconnect():
|
||
listener_sids.discard(request.sid)
|
||
count = _broadcast_listener_count()
|
||
print(f"REMOVED: Listener left {request.sid}. Total: {count}")
|
||
|
||
@listener_socketio.on('join_listener')
|
||
def listener_join():
|
||
# SID already added in listener_connect(); send stream status and current glow
|
||
emit('stream_status', {'active': broadcast_state['active']})
|
||
emit('listener_glow', {'intensity': _current_glow_intensity})
|
||
|
||
@listener_socketio.on('get_listener_count')
|
||
def listener_get_count():
|
||
emit('listener_count', {'count': len(listener_sids)})
|
||
|
||
|
||
# === MediaMTX webhook — SRT publish / unpublish events ===
|
||
|
||
def _listener_template_context() -> dict:
|
||
"""Return Jinja2 template context for listener.html.
|
||
|
||
Keys:
|
||
live – True when the SRT stream is currently active.
|
||
hls_url – MediaMTX HLS playlist URL for the video player.
|
||
"""
|
||
return {
|
||
'live': _srt_state['active'],
|
||
'hls_url': _MEDIAMTX_HLS_URL,
|
||
}
|
||
|
||
|
||
@listener_app.route('/api/webhook', methods=['POST'])
|
||
def mediamtx_webhook():
|
||
"""Receive publish/unpublish events from MediaMTX.
|
||
|
||
MediaMTX path config example (mediamtx.yml):
|
||
paths:
|
||
live:
|
||
runOnPublish: 'curl -s -X POST http://127.0.0.1:5001/api/webhook -H "Content-Type: application/json" -d \'{"event":"publish","path":"%MTX_PATH%","id":"%MTX_ID%"}\''
|
||
runOnUnpublish: 'curl -s -X POST http://127.0.0.1:5001/api/webhook -H "Content-Type: application/json" -d \'{"event":"unpublish","path":"%MTX_PATH%","id":"%MTX_ID%"}\''
|
||
|
||
Or if using MediaMTX >= 1.x webhook integration, point:
|
||
api > hooks > [publish|unpublish] > url: http://127.0.0.1:5001/api/webhook
|
||
"""
|
||
global _mp3_broadcast_announced
|
||
|
||
# ── Localhost-only: MediaMTX is co-located on the same server ──────────
|
||
if request.remote_addr not in ('127.0.0.1', '::1'):
|
||
abort(403)
|
||
|
||
# ── Optional shared-secret authentication ──────────────────────────────
|
||
# Set mediamtx_webhook_secret in config.json and pass the same value in
|
||
# the X-MediaMTX-Webhook-Secret request header (or as ?secret= query arg).
|
||
if _SRT_WEBHOOK_SECRET:
|
||
provided = (
|
||
request.headers.get('X-MediaMTX-Webhook-Secret')
|
||
or request.args.get('secret', '')
|
||
)
|
||
# Constant-time comparison prevents timing side-channels.
|
||
if not hmac.compare_digest(provided, _SRT_WEBHOOK_SECRET):
|
||
abort(403)
|
||
|
||
data = request.get_json(silent=True) or {}
|
||
# Support both JSON body fields and legacy query-string parameters.
|
||
event = (data.get('event') or request.args.get('event', '')).lower().strip()
|
||
path = (data.get('path') or data.get('MTX_PATH', '') or request.args.get('path', '')).strip()
|
||
source_id = (data.get('id') or data.get('sourceID', '') or '').strip()
|
||
|
||
# ── Source-IP allowlist (mediamtx >= 1.x sends source.remoteAddr) ──────
|
||
# Extract publisher IP from the nested source object MediaMTX sends in
|
||
# its native webhook payload. curl-based runOnPublish callers won't
|
||
# include this field, so we fall through silently when it's absent.
|
||
raw_remote = ''
|
||
if isinstance(data.get('source'), dict):
|
||
raw_remote = data['source'].get('remoteAddr', '')
|
||
# remoteAddr is "ip:port" — strip the port
|
||
publisher_ip = raw_remote.rsplit(':', 1)[0].strip('[]') if raw_remote else ''
|
||
|
||
if _SRT_ALLOWED_IPS and publisher_ip and publisher_ip not in _SRT_ALLOWED_IPS:
|
||
print(f"SRT WEBHOOK: Rejected publish from unauthorized IP '{publisher_ip}'")
|
||
return jsonify({'ok': False, 'error': 'source IP not in allowlist'}), 403
|
||
|
||
if event == 'publish':
|
||
_srt_state.update({
|
||
'active': True,
|
||
'path': path,
|
||
'source_id': source_id,
|
||
'started_at': time.time(),
|
||
})
|
||
broadcast_state['active'] = True
|
||
broadcast_state['is_mp3_input'] = False
|
||
_mp3_broadcast_announced = True # SRT audio starts flowing immediately
|
||
|
||
print(f"SRT: Stream PUBLISHED — path='{path}' source='{source_id}' ip='{publisher_ip or "unknown"}'")
|
||
with _mp3_lock:
|
||
_preroll_clear()
|
||
|
||
_start_srt_transcoder()
|
||
listener_socketio.emit('broadcast_started', namespace='/')
|
||
listener_socketio.emit('stream_status', {'active': True}, namespace='/')
|
||
dj_socketio.emit( 'stream_status', {'active': True, 'source': 'srt'}, namespace='/')
|
||
return jsonify({'ok': True, 'event': 'publish', 'path': path})
|
||
|
||
if event == 'unpublish':
|
||
_srt_state.update({
|
||
'active': False,
|
||
'path': None,
|
||
'source_id': None,
|
||
'started_at': None,
|
||
})
|
||
broadcast_state['active'] = False
|
||
broadcast_state['is_mp3_input'] = False
|
||
_mp3_broadcast_announced = False
|
||
|
||
print(f"SRT: Stream UNPUBLISHED — path='{path}'")
|
||
_stop_srt_transcoder()
|
||
listener_socketio.emit('broadcast_stopped', namespace='/')
|
||
listener_socketio.emit('stream_status', {'active': False}, namespace='/')
|
||
dj_socketio.emit( 'stream_status', {'active': False, 'source': 'srt'}, namespace='/')
|
||
return jsonify({'ok': True, 'event': 'unpublish', 'path': path})
|
||
|
||
return jsonify({'ok': False, 'error': f"Unknown or missing 'event' field: '{event}'"}), 400
|
||
|
||
|
||
@listener_app.route('/api/srt_auth', methods=['POST'])
|
||
def srt_auth():
|
||
"""MediaMTX externalAuthenticationURL handler.
|
||
|
||
MediaMTX calls this endpoint for every new SRT (and RTSP/HLS) connection
|
||
attempt. Returning HTTP 200 allows the connection; anything else rejects it.
|
||
|
||
mediamtx.yml:
|
||
externalAuthenticationURL: http://127.0.0.1:5001/api/srt_auth
|
||
|
||
MediaMTX sends JSON like:
|
||
{"ip": "1.2.3.4", "action": "publish", "protocol": "srt",
|
||
"path": "aussie_dj", "user": "", "password": "", "query": "", "id": "..."}
|
||
"""
|
||
# Only MediaMTX (localhost) should be calling this
|
||
if request.remote_addr not in ('127.0.0.1', '::1'):
|
||
abort(403)
|
||
|
||
data = request.get_json(silent=True) or {}
|
||
action = (data.get('action') or '').strip().lower()
|
||
protocol = (data.get('protocol') or '').strip().lower()
|
||
client_ip = (data.get('ip') or '').strip()
|
||
|
||
# Only gate SRT publish actions — allow reads and other protocols through.
|
||
if action == 'publish' and protocol == 'srt':
|
||
if _SRT_ALLOWED_IPS and client_ip not in _SRT_ALLOWED_IPS:
|
||
print(f"SRT AUTH: Blocked '{client_ip}' — not in srt_allowed_ips")
|
||
return '', 403
|
||
print(f"SRT AUTH: Allowed '{client_ip}'")
|
||
|
||
return '', 200
|
||
|
||
|
||
@listener_app.route('/api/srt_status')
|
||
def srt_status():
|
||
"""Polling endpoint — returns current SRT / broadcast live state."""
|
||
proc = _srt_ffmpeg_proc
|
||
return jsonify({
|
||
'srt_active': _srt_state['active'],
|
||
'srt_path': _srt_state['path'],
|
||
'srt_source_id': _srt_state['source_id'],
|
||
'srt_started_at': _srt_state['started_at'],
|
||
'broadcast_active': broadcast_state.get('active', False),
|
||
'srt_transcoder_running': proc is not None and proc.poll() is None,
|
||
'srt_allowed_ips': sorted(_SRT_ALLOWED_IPS) if _SRT_ALLOWED_IPS else 'any',
|
||
})
|
||
|
||
|
||
# DJ Panel Routes (No engine commands needed in local mode)
|
||
def _transcoder_watchdog():
|
||
"""Periodic check to ensure the ffmpeg transcoder stays alive.
|
||
Only applies to non-MP3 input; MP3 input (Qt client) is distributed directly.
|
||
"""
|
||
while True:
|
||
is_mp3_direct = broadcast_state.get('is_mp3_input', False)
|
||
if broadcast_state.get('active') and not is_mp3_direct:
|
||
# Browser WebM/opus path — watchdog for the push-based transcoder
|
||
if _srt_state['active']:
|
||
# SRT path — watchdog for the pull-based SRT transcoder
|
||
if _srt_ffmpeg_proc is None or _srt_ffmpeg_proc.poll() is not None:
|
||
print('WARNING: Watchdog: SRT transcoder dead during active stream, reviving...')
|
||
_start_srt_transcoder()
|
||
else:
|
||
if _ffmpeg_proc is None or _ffmpeg_proc.poll() is not None:
|
||
print('WARNING: Watchdog: Transcoder dead during active broadcast, reviving...')
|
||
_start_transcoder_if_needed(is_mp3_input=False)
|
||
eventlet.sleep(5)
|
||
|
||
|
||
def _listener_count_sync_loop():
|
||
"""Periodic reconciliation — catches any edge cases where connect/disconnect
|
||
events were missed (e.g. server under load, eventlet greenlet delays)."""
|
||
while True:
|
||
eventlet.sleep(5)
|
||
_broadcast_listener_count()
|
||
|
||
|
||
if __name__ == '__main__':
|
||
print("=" * 50)
|
||
print("TECHDJ PRO - DUAL PORT ARCHITECTURE")
|
||
print("=" * 50)
|
||
print(f"HOST: {CONFIG_HOST}")
|
||
print(f"URL: DJ PANEL -> http://{CONFIG_HOST}:{CONFIG_DJ_PORT}")
|
||
print(f"URL: LISTENER -> http://{CONFIG_HOST}:{CONFIG_LISTENER_PORT}")
|
||
if DJ_AUTH_ENABLED:
|
||
print("AUTH: DJ panel password ENABLED")
|
||
else:
|
||
print("AUTH: DJ panel password DISABLED")
|
||
print(f"DEBUG: {CONFIG_DEBUG}")
|
||
print("=" * 50)
|
||
print(f"READY: Server ready on {CONFIG_HOST}:{CONFIG_DJ_PORT} & {CONFIG_HOST}:{CONFIG_LISTENER_PORT}")
|
||
|
||
# Run both servers using eventlet's spawn
|
||
eventlet.spawn(_listener_count_sync_loop)
|
||
eventlet.spawn(_transcoder_watchdog)
|
||
eventlet.spawn(dj_socketio.run, dj_app, host=CONFIG_HOST, port=CONFIG_DJ_PORT, debug=False)
|
||
listener_socketio.run(listener_app, host=CONFIG_HOST, port=CONFIG_LISTENER_PORT, debug=False)
|