feat: MediaMTX SRT integration, IP allowlist, pre-roll bugfixes

This commit is contained in:
ComputerTech 2026-04-04 12:33:45 +01:00
parent 20e56f37b8
commit 3c0174ff31
3 changed files with 431 additions and 26 deletions

View File

@ -15,5 +15,12 @@
"debug": false,
"_comment_listener_url": "Public URL of the listener page. Shown in DJ panel as the shareable stream link. Leave empty to auto-detect.",
"listener_url": ""
"listener_url": "",
"_comment_mediamtx": "MediaMTX / SRT integration. mediamtx_webhook_secret is the shared secret sent in the X-MediaMTX-Webhook-Secret header by webhook callers (leave blank to disable auth). mediamtx_rtsp_url is the RTSP URL MediaMTX exposes for the incoming SRT path.",
"mediamtx_webhook_secret": "",
"mediamtx_rtsp_url": "rtsp://127.0.0.1:8554/live",
"mediamtx_hls_url": "http://techy.music:8888/aussie_dj/index.m3u8",
"_comment_srt_allowed_ips": "List of DJ source IPs allowed to publish an SRT stream. Empty = allow all.",
"srt_allowed_ips": []
}

View File

@ -13,9 +13,9 @@
<div class="listener-mode" id="listener-mode">
<div class="listener-header">
<h1>TECHY.MUSIC</h1>
<div class="live-indicator">
<div class="live-indicator" id="live-badge" {% if not live %}hidden{% endif %}>
<span class="pulse-dot"></span>
<span>LIVE</span>
<span>🔴 LIVE</span>
</div>
</div>
<div class="listener-content">
@ -23,6 +23,12 @@
<canvas id="viz-listener" width="400" height="100"></canvas>
<!-- HLS video player — shown only when the SRT stream is active -->
<div id="hls-player-container" {% if not live %}hidden{% endif %} style="width:100%;margin-top:16px;text-align:center;">
<video id="hls-video" controls autoplay muted playsinline
style="width:100%;max-width:720px;border-radius:12px;background:#000;"></video>
</div>
<!-- Enable Audio Button (shown when autoplay is blocked) -->
<button class="enable-audio-btn" id="enable-audio-btn" onclick="enableListenerAudio()">
<span class="audio-icon">🎧</span>
@ -41,6 +47,59 @@
<script src="https://cdn.socket.io/4.5.4/socket.io.min.js"></script>
<script src="listener.js?v=2.0"></script>
<script src="https://cdn.jsdelivr.net/npm/hls.js@1/dist/hls.min.js"></script>
<script>
(function () {
'use strict';
var HLS_URL = {{ hls_url | tojson }};
var badge = document.getElementById('live-badge');
var container = document.getElementById('hls-player-container');
var video = document.getElementById('hls-video');
var hlsInstance = null;
function startHls() {
if (hlsInstance) return;
if (typeof Hls !== 'undefined' && Hls.isSupported()) {
hlsInstance = new Hls({ lowLatencyMode: true });
hlsInstance.loadSource(HLS_URL);
hlsInstance.attachMedia(video);
} else if (video.canPlayType('application/vnd.apple.mpegurl')) {
// Safari — native HLS support
video.src = HLS_URL;
}
}
function stopHls() {
if (hlsInstance) { hlsInstance.destroy(); hlsInstance = null; }
video.src = '';
}
function setLive(isLive) {
if (isLive) {
if (badge) badge.removeAttribute('hidden');
if (container) container.removeAttribute('hidden');
startHls();
} else {
if (badge) badge.setAttribute('hidden', '');
if (container) container.setAttribute('hidden', '');
stopHls();
}
}
// Server-rendered as live — start HLS immediately.
{% if live %}startHls();{% endif %}
// Poll /api/srt_status every 5 s to react to live state changes
// without opening a second Socket.IO connection (which would inflate
// the listener count).
setInterval(function () {
fetch('/api/srt_status')
.then(function (r) { return r.json(); })
.then(function (d) { setLive(!!d.broadcast_active); })
.catch(function () {});
}, 5000);
}());
</script>
</body>
</html>

385
server.py
View File

@ -10,7 +10,8 @@ import threading
import queue
import time
import collections
from flask import Flask, send_from_directory, jsonify, request, session, Response, stream_with_context, abort
import hmac
from flask import Flask, send_from_directory, jsonify, request, session, Response, stream_with_context, abort, render_template, make_response
from flask_socketio import SocketIO, emit
from dotenv import load_dotenv
load_dotenv()
@ -44,6 +45,21 @@ CONFIG_MAX_UPLOAD_MB = int(CONFIG.get('max_upload_mb') or 500)
CONFIG_DEBUG = bool(CONFIG.get('debug', False))
CONFIG_LISTENER_URL = (CONFIG.get('listener_url') or '').strip()
# MediaMTX / SRT integration
# secret shared with MediaMTX webhook requests (leave blank to disable auth)
_SRT_WEBHOOK_SECRET = (CONFIG.get('mediamtx_webhook_secret') or '').strip()
# RTSP URL MediaMTX exposes for the incoming SRT path (used by ffmpeg to pull audio)
_MEDIAMTX_RTSP_URL = (CONFIG.get('mediamtx_rtsp_url') or 'rtsp://127.0.0.1:8554/live').strip()
_MEDIAMTX_HLS_URL = (CONFIG.get('mediamtx_hls_url') or 'http://techy.music:8888/aussie_dj/index.m3u8').strip()
# Allowlist of DJ source IPs permitted to publish an SRT stream.
# Accepts a list of strings or a single string in config.json.
# Empty list / omitted = no restriction (allow any source IP).
_raw_srt_ips = CONFIG.get('srt_allowed_ips') or []
if isinstance(_raw_srt_ips, str):
_raw_srt_ips = [_raw_srt_ips]
_SRT_ALLOWED_IPS: set = {ip.strip() for ip in _raw_srt_ips if isinstance(ip, str) and ip.strip()}
DJ_PANEL_PASSWORD = (CONFIG.get('dj_panel_password') or '').strip()
DJ_AUTH_ENABLED = bool(DJ_PANEL_PASSWORD)
@ -51,6 +67,18 @@ DJ_AUTH_ENABLED = bool(DJ_PANEL_PASSWORD)
broadcast_state = {
'active': False,
}
# SRT stream state — updated by the /api/webhook MediaMTX callback
_srt_state = {
'active': False,
'path': None,
'source_id': None,
'started_at': None,
}
# ffmpeg process that pulls audio from MediaMTX's RTSP output for an SRT session
_srt_ffmpeg_proc = None
listener_sids = set()
dj_sids = set()
@ -69,14 +97,37 @@ DJ_GRACE_PERIOD_SECS = 20 # seconds to wait before auto-stopping
# === Optional MP3 fallback stream (server-side transcoding) ===
_ffmpeg_proc = None
_ffmpeg_in_q = queue.Queue(maxsize=20)
_ffmpeg_in_q = queue.Queue(maxsize=200)
_current_bitrate = (CONFIG.get('stream_bitrate') or '192k').strip()
_mp3_clients = set() # set[queue.Queue]
_mp3_lock = threading.Lock()
_transcoder_bytes_out = 0
_transcoder_last_error = None
_last_audio_chunk_ts = 0.0
_mp3_preroll = collections.deque(maxlen=1024) # ~83s at 96kbps for fast reconnect buffer fill
_mp3_preroll = collections.deque() # byte-budget FIFO; evicted from the front when over limit
_mp3_preroll_bytes = 0
_MP3_PREROLL_MAX_BYTES = 256 * 1024 # 256 KB ≈ 10 s at 192 kbps — enough for smooth reconnects
def _preroll_append(data: bytes) -> None:
"""Add a chunk to the pre-roll buffer, evicting the oldest data from the
front when the byte budget is exceeded.
MUST be called with _mp3_lock already held.
"""
global _mp3_preroll_bytes
_mp3_preroll.append(data)
_mp3_preroll_bytes += len(data)
while _mp3_preroll_bytes > _MP3_PREROLL_MAX_BYTES and _mp3_preroll:
_mp3_preroll_bytes -= len(_mp3_preroll.popleft())
def _preroll_clear() -> None:
"""Empty the pre-roll buffer and reset its byte counter.
MUST be called with _mp3_lock already held.
"""
global _mp3_preroll_bytes
_mp3_preroll.clear()
_mp3_preroll_bytes = 0
def _start_transcoder_if_needed(is_mp3_input=False):
@ -188,7 +239,7 @@ def _start_transcoder_if_needed(is_mp3_input=False):
_transcoder_bytes_out += len(data)
with _mp3_lock:
_mp3_preroll.append(data)
_preroll_append(data)
clients = list(_mp3_clients)
for q in clients:
@ -216,32 +267,133 @@ def _start_transcoder_if_needed(is_mp3_input=False):
def _stop_transcoder():
global _ffmpeg_proc
print("STOPPING: Transcoder process")
# Signal threads to stop via the queue
try: _ffmpeg_in_q.put_nowait(None)
except: pass
# Shutdown the process
proc = _ffmpeg_proc
_ffmpeg_proc = None
_ffmpeg_proc = None # Null first so _feed_transcoder stops enqueuing
# Drain stale input chunks so the None sentinel is guaranteed to fit and
# the writer greenlet exits on its next get() instead of stalling.
while True:
try: _ffmpeg_in_q.get_nowait()
except queue.Empty: break
try: _ffmpeg_in_q.put_nowait(None)
except queue.Full: pass
if proc:
try:
proc.terminate()
# Drain stdout/stderr to satisfy OS buffers
proc.communicate(timeout=1.0)
except:
try: proc.kill()
except: pass
# Clear client state
with _mp3_lock:
clients = list(_mp3_clients)
for q in clients:
try: q.put_nowait(None)
except: pass
_mp3_clients.clear()
_mp3_preroll.clear()
_preroll_clear()
# === SRT / MediaMTX transcoder ===
# When MediaMTX receives the incoming SRT stream it exposes it as an RTSP endpoint.
# We launch a dedicated ffmpeg process that *pulls* from that RTSP URL and feeds the
# same _mp3_clients / _mp3_preroll distribution system used by all other stream modes.
def _start_srt_transcoder():
"""Start an ffmpeg process that pulls from MediaMTX's RTSP output."""
global _srt_ffmpeg_proc, _transcoder_last_error
if _srt_ffmpeg_proc is not None and _srt_ffmpeg_proc.poll() is None:
return # Already running
if _srt_ffmpeg_proc:
try: _srt_ffmpeg_proc.terminate()
except: pass
_srt_ffmpeg_proc = None
cmd = [
'ffmpeg',
'-hide_banner', '-loglevel', 'error',
'-fflags', 'nobuffer', '-flags', 'low_delay',
'-rtsp_transport', 'tcp',
'-i', _MEDIAMTX_RTSP_URL,
'-vn',
'-acodec', 'libmp3lame',
'-b:a', _current_bitrate,
'-flush_packets', '1',
'-f', 'mp3', 'pipe:1',
]
try:
_srt_ffmpeg_proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=0,
)
except FileNotFoundError:
_srt_ffmpeg_proc = None
print('WARNING: ffmpeg not found; SRT→MP3 bridge disabled')
return
print(f'INFO: SRT transcoder started — pulling from {_MEDIAMTX_RTSP_URL} @ {_current_bitrate}')
_transcoder_last_error = None
def _srt_reader(proc):
global _transcoder_bytes_out, _transcoder_last_error
print(f'[THREAD] SRT reader started (PID: {proc.pid})')
while proc.poll() is None:
try:
data = eventlet.tpool.execute(proc.stdout.read, 4096)
if not data:
break
_transcoder_bytes_out += len(data)
with _mp3_lock:
_preroll_append(data)
clients = list(_mp3_clients)
for q in clients:
try:
q.put_nowait(data)
except queue.Full:
pass
except Exception as e:
print(f'WARNING: SRT reader error: {e}')
_transcoder_last_error = str(e)
break
if proc.poll() is None:
try: proc.terminate()
except: pass
print(f'[THREAD] SRT reader finished (PID: {proc.pid})')
eventlet.spawn(_srt_reader, _srt_ffmpeg_proc)
def _stop_srt_transcoder():
"""Terminate the SRT→MP3 ffmpeg process and flush all listener queues."""
global _srt_ffmpeg_proc
print('STOPPING: SRT transcoder')
proc = _srt_ffmpeg_proc
_srt_ffmpeg_proc = None
if proc:
try:
proc.terminate()
proc.communicate(timeout=2.0)
except Exception:
try: proc.kill()
except: pass
with _mp3_lock:
clients = list(_mp3_clients)
for q in clients:
try: q.put_nowait(None)
except: pass
_mp3_clients.clear()
_preroll_clear()
def _feed_transcoder(data: bytes):
@ -273,7 +425,7 @@ def _distribute_mp3(data: bytes):
_last_audio_chunk_ts = time.time()
_transcoder_bytes_out += len(data)
with _mp3_lock:
_mp3_preroll.append(data)
_preroll_append(data)
for q in list(_mp3_clients):
try:
q.put_nowait(data)
@ -488,16 +640,16 @@ def setup_shared_routes(app, index_file='index.html'):
'Cache-Control': 'no-cache, no-store',
})
preroll_count = len(_mp3_preroll)
print(f"LISTENER: New listener joined stream (Pre-roll: {preroll_count} chunks)")
client_q = queue.Queue(maxsize=500)
with _mp3_lock:
preroll_bytes = _mp3_preroll_bytes
for chunk in _mp3_preroll:
try:
client_q.put_nowait(chunk)
except Exception:
break
_mp3_clients.add(client_q)
print(f"LISTENER: New listener joined stream (Pre-roll: {preroll_bytes} bytes)")
def gen():
idle_checks = 0
@ -542,6 +694,8 @@ def setup_shared_routes(app, index_file='index.html'):
'ffmpeg_running': running,
'ffmpeg_found': (proc is not None),
'mp3_clients': len(_mp3_clients),
'preroll_chunks': len(_mp3_preroll),
'preroll_bytes': _mp3_preroll_bytes,
'transcoder_bytes_out': _transcoder_bytes_out,
'transcoder_last_error': _transcoder_last_error,
'last_audio_chunk_ts': _last_audio_chunk_ts,
@ -734,7 +888,7 @@ def dj_start(data=None):
if not was_already_active:
# Fresh broadcast start — clear pre-roll and reset announcement flag.
with _mp3_lock:
_mp3_preroll.clear()
_preroll_clear()
if is_mp3_input:
# MP3-direct mode (Qt client): the DJ still needs to press play before
@ -791,6 +945,11 @@ def dj_now_playing(data):
"""Relay the currently playing track title to all listener pages."""
if not isinstance(data, dict):
return
# Track changed — flush the pre-roll so listeners who reconnect (e.g. after
# a stall watchdog trigger) start at the current song, not a replay of the
# previous one. This is the primary guard against the audio-loop symptom.
with _mp3_lock:
_preroll_clear()
listener_socketio.emit('now_playing', {
'title': str(data.get('title', '')),
'deck': str(data.get('deck', '')),
@ -834,11 +993,27 @@ def dj_audio(data):
# static_folder=None prevents Flask's built-in static handler from serving
# DJ files (like index.html) at /<path> — all static files go through our
# custom serve_static route which has security checks.
listener_app = Flask(__name__, static_folder=None)
listener_app = Flask(__name__, static_folder=None, template_folder='.')
listener_app.config['SECRET_KEY'] = CONFIG_SECRET + '_listener'
listener_app.config['MAX_CONTENT_LENGTH'] = CONFIG_MAX_UPLOAD_MB * 1024 * 1024
setup_shared_routes(listener_app, index_file='listener.html')
def _listener_home():
"""Serve listener.html as a Jinja2 template so live context is injected."""
ctx = _listener_template_context()
response = make_response(render_template('listener.html', **ctx))
response.headers['Cache-Control'] = 'no-store, no-cache, must-revalidate, max-age=0'
response.headers['Pragma'] = 'no-cache'
response.headers['Expires'] = '0'
return response
# Replace the static file serve registered by setup_shared_routes with the
# template-rendering version so Jinja2 context is injected on every page load.
listener_app.view_functions['index'] = _listener_home
# Block write/admin endpoints AND DJ-only files on the listener server
@listener_app.before_request
def _restrict_listener_routes():
@ -905,6 +1080,163 @@ def listener_join():
def listener_get_count():
emit('listener_count', {'count': len(listener_sids)})
# === MediaMTX webhook — SRT publish / unpublish events ===
def _listener_template_context() -> dict:
"""Return Jinja2 template context for listener.html.
Keys:
live True when the SRT stream is currently active.
hls_url MediaMTX HLS playlist URL for the video player.
"""
return {
'live': _srt_state['active'],
'hls_url': _MEDIAMTX_HLS_URL,
}
@listener_app.route('/api/webhook', methods=['POST'])
def mediamtx_webhook():
"""Receive publish/unpublish events from MediaMTX.
MediaMTX path config example (mediamtx.yml):
paths:
live:
runOnPublish: 'curl -s -X POST http://127.0.0.1:5001/api/webhook -H "Content-Type: application/json" -d \'{"event":"publish","path":"%MTX_PATH%","id":"%MTX_ID%"}\''
runOnUnpublish: 'curl -s -X POST http://127.0.0.1:5001/api/webhook -H "Content-Type: application/json" -d \'{"event":"unpublish","path":"%MTX_PATH%","id":"%MTX_ID%"}\''
Or if using MediaMTX >= 1.x webhook integration, point:
api > hooks > [publish|unpublish] > url: http://127.0.0.1:5001/api/webhook
"""
global _mp3_broadcast_announced
# ── Localhost-only: MediaMTX is co-located on the same server ──────────
if request.remote_addr not in ('127.0.0.1', '::1'):
abort(403)
# ── Optional shared-secret authentication ──────────────────────────────
# Set mediamtx_webhook_secret in config.json and pass the same value in
# the X-MediaMTX-Webhook-Secret request header (or as ?secret= query arg).
if _SRT_WEBHOOK_SECRET:
provided = (
request.headers.get('X-MediaMTX-Webhook-Secret')
or request.args.get('secret', '')
)
# Constant-time comparison prevents timing side-channels.
if not hmac.compare_digest(provided, _SRT_WEBHOOK_SECRET):
abort(403)
data = request.get_json(silent=True) or {}
# Support both JSON body fields and legacy query-string parameters.
event = (data.get('event') or request.args.get('event', '')).lower().strip()
path = (data.get('path') or data.get('MTX_PATH', '') or request.args.get('path', '')).strip()
source_id = (data.get('id') or data.get('sourceID', '') or '').strip()
# ── Source-IP allowlist (mediamtx >= 1.x sends source.remoteAddr) ──────
# Extract publisher IP from the nested source object MediaMTX sends in
# its native webhook payload. curl-based runOnPublish callers won't
# include this field, so we fall through silently when it's absent.
raw_remote = ''
if isinstance(data.get('source'), dict):
raw_remote = data['source'].get('remoteAddr', '')
# remoteAddr is "ip:port" — strip the port
publisher_ip = raw_remote.rsplit(':', 1)[0].strip('[]') if raw_remote else ''
if _SRT_ALLOWED_IPS and publisher_ip and publisher_ip not in _SRT_ALLOWED_IPS:
print(f"SRT WEBHOOK: Rejected publish from unauthorized IP '{publisher_ip}'")
return jsonify({'ok': False, 'error': 'source IP not in allowlist'}), 403
if event == 'publish':
_srt_state.update({
'active': True,
'path': path,
'source_id': source_id,
'started_at': time.time(),
})
broadcast_state['active'] = True
broadcast_state['is_mp3_input'] = False
_mp3_broadcast_announced = True # SRT audio starts flowing immediately
print(f"SRT: Stream PUBLISHED — path='{path}' source='{source_id}' ip='{publisher_ip or "unknown"}'")
with _mp3_lock:
_preroll_clear()
_start_srt_transcoder()
listener_socketio.emit('broadcast_started', namespace='/')
listener_socketio.emit('stream_status', {'active': True}, namespace='/')
dj_socketio.emit( 'stream_status', {'active': True, 'source': 'srt'}, namespace='/')
return jsonify({'ok': True, 'event': 'publish', 'path': path})
if event == 'unpublish':
_srt_state.update({
'active': False,
'path': None,
'source_id': None,
'started_at': None,
})
broadcast_state['active'] = False
broadcast_state['is_mp3_input'] = False
_mp3_broadcast_announced = False
print(f"SRT: Stream UNPUBLISHED — path='{path}'")
_stop_srt_transcoder()
listener_socketio.emit('broadcast_stopped', namespace='/')
listener_socketio.emit('stream_status', {'active': False}, namespace='/')
dj_socketio.emit( 'stream_status', {'active': False, 'source': 'srt'}, namespace='/')
return jsonify({'ok': True, 'event': 'unpublish', 'path': path})
return jsonify({'ok': False, 'error': f"Unknown or missing 'event' field: '{event}'"}), 400
@listener_app.route('/api/srt_auth', methods=['POST'])
def srt_auth():
"""MediaMTX externalAuthenticationURL handler.
MediaMTX calls this endpoint for every new SRT (and RTSP/HLS) connection
attempt. Returning HTTP 200 allows the connection; anything else rejects it.
mediamtx.yml:
externalAuthenticationURL: http://127.0.0.1:5001/api/srt_auth
MediaMTX sends JSON like:
{"ip": "1.2.3.4", "action": "publish", "protocol": "srt",
"path": "aussie_dj", "user": "", "password": "", "query": "", "id": "..."}
"""
# Only MediaMTX (localhost) should be calling this
if request.remote_addr not in ('127.0.0.1', '::1'):
abort(403)
data = request.get_json(silent=True) or {}
action = (data.get('action') or '').strip().lower()
protocol = (data.get('protocol') or '').strip().lower()
client_ip = (data.get('ip') or '').strip()
# Only gate SRT publish actions — allow reads and other protocols through.
if action == 'publish' and protocol == 'srt':
if _SRT_ALLOWED_IPS and client_ip not in _SRT_ALLOWED_IPS:
print(f"SRT AUTH: Blocked '{client_ip}' — not in srt_allowed_ips")
return '', 403
print(f"SRT AUTH: Allowed '{client_ip}'")
return '', 200
@listener_app.route('/api/srt_status')
def srt_status():
"""Polling endpoint — returns current SRT / broadcast live state."""
proc = _srt_ffmpeg_proc
return jsonify({
'srt_active': _srt_state['active'],
'srt_path': _srt_state['path'],
'srt_source_id': _srt_state['source_id'],
'srt_started_at': _srt_state['started_at'],
'broadcast_active': broadcast_state.get('active', False),
'srt_transcoder_running': proc is not None and proc.poll() is None,
'srt_allowed_ips': sorted(_SRT_ALLOWED_IPS) if _SRT_ALLOWED_IPS else 'any',
})
# DJ Panel Routes (No engine commands needed in local mode)
def _transcoder_watchdog():
"""Periodic check to ensure the ffmpeg transcoder stays alive.
@ -913,9 +1245,16 @@ def _transcoder_watchdog():
while True:
is_mp3_direct = broadcast_state.get('is_mp3_input', False)
if broadcast_state.get('active') and not is_mp3_direct:
if _ffmpeg_proc is None or _ffmpeg_proc.poll() is not None:
print("WARNING: Watchdog: Transcoder dead during active broadcast, reviving...")
_start_transcoder_if_needed(is_mp3_input=False)
# Browser WebM/opus path — watchdog for the push-based transcoder
if _srt_state['active']:
# SRT path — watchdog for the pull-based SRT transcoder
if _srt_ffmpeg_proc is None or _srt_ffmpeg_proc.poll() is not None:
print('WARNING: Watchdog: SRT transcoder dead during active stream, reviving...')
_start_srt_transcoder()
else:
if _ffmpeg_proc is None or _ffmpeg_proc.poll() is not None:
print('WARNING: Watchdog: Transcoder dead during active broadcast, reviving...')
_start_transcoder_if_needed(is_mp3_input=False)
eventlet.sleep(5)