Update TechDJ: fix viewer controls, boost listener volume, improve deck sync

This commit is contained in:
3nd3r
2026-01-04 08:14:52 -06:00
parent e4f27c012d
commit 4a1844ae1b
4 changed files with 860 additions and 87 deletions

510
server.py
View File

@@ -38,16 +38,87 @@ DJ_AUTH_ENABLED = bool(DJ_PANEL_PASSWORD)
# Relay State
broadcast_state = {
'active': False,
'remote_relay': False,
'server_mix': False,
}
listener_sids = set()
dj_sids = set()
# DJ identity mapping (for auto-reclaim)
dj_identity_by_sid: dict[str, str] = {}
last_controller_identity: str | None = None
last_controller_released_at: float = 0.0
# === Multi-DJ controller lock (one active controller at a time) ===
active_controller_sid = None
def _emit_controller_status(to_sid: str | None = None):
payload = {
'controller_active': active_controller_sid is not None,
'controller_sid': active_controller_sid,
}
if to_sid:
payload['you_are_controller'] = (to_sid == active_controller_sid)
dj_socketio.emit('controller_status', payload, to=to_sid, namespace='/')
else:
# Send individualized payload so each DJ can reliably know whether they are the controller.
for sid in list(dj_sids):
dj_socketio.emit(
'controller_status',
{
**payload,
'you_are_controller': (sid == active_controller_sid),
},
to=sid,
namespace='/',
)
def _deny_if_not_controller() -> bool:
"""Returns True if caller is NOT the controller and was denied."""
if active_controller_sid is None:
dj_socketio.emit('error', {'message': 'No active DJ controller. Click Take Control.'}, to=request.sid)
return True
if request.sid != active_controller_sid:
dj_socketio.emit('error', {'message': 'Control is currently held by another DJ'}, to=request.sid)
return True
return False
def _sid_identity(sid: str) -> str | None:
return dj_identity_by_sid.get(sid)
# === Server-side mixer state (authoritative UI sync) ===
def _default_deck_state():
return {
'filename': None,
'duration': 0.0,
'position': 0.0,
'playing': False,
'pitch': 1.0,
'volume': 0.8,
'eq': {'low': 0.0, 'mid': 0.0, 'high': 0.0},
# Internal anchors for time interpolation
'_started_at': None,
'_started_pos': 0.0,
}
mixer_state = {
'deck_a': _default_deck_state(),
'deck_b': _default_deck_state(),
'crossfader': 50,
}
# === Optional MP3 fallback stream (server-side transcoding) ===
# This allows listeners on browsers that don't support WebM/Opus via MediaSource
# (notably some Safari / locked-down environments) to still hear the stream.
_ffmpeg_proc = None
_ffmpeg_in_q = queue.Queue(maxsize=200)
_mp3_clients = set() # set[queue.Queue]
_ffmpeg_in_q = eventlet.queue.LightQueue(maxsize=200)
_mp3_clients = set() # set[eventlet.queue.LightQueue]
_mp3_lock = threading.Lock()
_transcode_threads_started = False
_transcoder_bytes_out = 0
@@ -55,6 +126,9 @@ _transcoder_last_error = None
_last_audio_chunk_ts = 0.0
_remote_stream_url = None # For relaying remote streams
_mix_restart_timer = None
_mix_restart_lock = threading.Lock()
def _start_transcoder_if_needed():
global _ffmpeg_proc, _transcode_threads_started
@@ -62,12 +136,105 @@ def _start_transcoder_if_needed():
if _ffmpeg_proc is not None and _ffmpeg_proc.poll() is None:
return
if _remote_stream_url:
# Remote relay mode: input from URL
def _safe_float(val, default=0.0):
try:
return float(val)
except Exception:
return float(default)
def _clamp(val, lo, hi):
return max(lo, min(hi, val))
def _deck_runtime_position(d: dict) -> float:
if not d.get('playing'):
return _safe_float(d.get('position'), 0.0)
started_at = d.get('_started_at')
started_pos = _safe_float(d.get('_started_pos'), _safe_float(d.get('position'), 0.0))
if started_at is None:
return _safe_float(d.get('position'), 0.0)
pitch = _safe_float(d.get('pitch'), 1.0)
return max(0.0, started_pos + (time.time() - _safe_float(started_at)) * pitch)
def _ffmpeg_atempo_chain(speed: float) -> str:
# atempo supports ~[0.5, 2.0] per filter; clamp for now
s = _clamp(speed, 0.5, 2.0)
return f"atempo={s:.4f}"
def _build_server_mix_cmd() -> list[str]:
# Always include an infinite silent input so ffmpeg never exits.
silence_src = 'anullsrc=channel_layout=stereo:sample_rate=44100'
deck_a = mixer_state['deck_a']
deck_b = mixer_state['deck_b']
cf = int(mixer_state.get('crossfader', 50))
# Volumes: crossfader scaling * per-deck volume
cf_a = (100 - cf) / 100.0
cf_b = cf / 100.0
vol_a = _clamp(_safe_float(deck_a.get('volume'), 0.8) * cf_a, 0.0, 1.5)
vol_b = _clamp(_safe_float(deck_b.get('volume'), 0.8) * cf_b, 0.0, 1.5)
# Source selection
def _input_args(deck: dict) -> list[str]:
fn = deck.get('filename')
if fn and deck.get('playing'):
pos = _deck_runtime_position(deck)
path = os.path.join(os.getcwd(), fn)
return ['-re', '-ss', f"{pos:.3f}", '-i', path]
return ['-re', '-f', 'lavfi', '-i', silence_src]
cmd = [
'ffmpeg',
'-hide_banner',
'-loglevel', 'error',
*_input_args(deck_a),
*_input_args(deck_b),
'-re', '-f', 'lavfi', '-i', silence_src,
]
# Filters per deck
def _deck_filters(deck: dict, vol: float) -> str:
parts = [f"volume={vol:.4f}"]
eq = deck.get('eq') or {}
low = _clamp(_safe_float(eq.get('low'), 0.0), -20.0, 20.0)
mid = _clamp(_safe_float(eq.get('mid'), 0.0), -20.0, 20.0)
high = _clamp(_safe_float(eq.get('high'), 0.0), -20.0, 20.0)
# Use octave width (o) so it's somewhat musical.
if abs(low) > 0.001:
parts.append(f"equalizer=f=320:width_type=o:width=1:g={low:.2f}")
if abs(mid) > 0.001:
parts.append(f"equalizer=f=1000:width_type=o:width=1:g={mid:.2f}")
if abs(high) > 0.001:
parts.append(f"equalizer=f=3200:width_type=o:width=1:g={high:.2f}")
return ','.join(parts)
fc = (
f"[0:a]{_deck_filters(deck_a, vol_a)}[a0];"
f"[1:a]{_deck_filters(deck_b, vol_b)}[a1];"
f"[2:a]volume=0[sil];"
f"[a0][a1][sil]amix=inputs=3:duration=longest:dropout_transition=0[m]"
)
cmd += [
'-filter_complex', fc,
'-map', '[m]',
'-vn',
'-ac', '2',
'-ar', '44100',
'-acodec', 'libmp3lame',
'-b:a', '192k',
'-f', 'mp3',
'pipe:1',
]
return cmd
if _remote_stream_url:
cmd = [
'ffmpeg',
'-hide_banner',
'-loglevel', 'error',
'-re',
'-i', _remote_stream_url,
'-vn',
'-acodec', 'libmp3lame',
@@ -75,8 +242,10 @@ def _start_transcoder_if_needed():
'-f', 'mp3',
'pipe:1',
]
elif broadcast_state.get('server_mix'):
cmd = _build_server_mix_cmd()
else:
# Local broadcast mode: input from pipe
# Local browser-broadcast mode: input from pipe
cmd = [
'ffmpeg',
'-hide_banner',
@@ -89,10 +258,13 @@ def _start_transcoder_if_needed():
'pipe:1',
]
needs_stdin = (not _remote_stream_url) and (not broadcast_state.get('server_mix'))
try:
if _remote_stream_url:
if needs_stdin:
_ffmpeg_proc = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=0,
@@ -100,7 +272,6 @@ def _start_transcoder_if_needed():
else:
_ffmpeg_proc = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=0,
@@ -110,14 +281,15 @@ def _start_transcoder_if_needed():
print('⚠️ ffmpeg not found; /stream.mp3 fallback disabled')
return
print(f'🎛️ ffmpeg transcoder started for /stream.mp3 ({ "remote relay" if _remote_stream_url else "local broadcast" })')
mode = 'remote relay' if _remote_stream_url else ('server mix' if broadcast_state.get('server_mix') else 'local broadcast')
print(f'🎛️ ffmpeg transcoder started for /stream.mp3 ({mode})')
def _writer():
global _transcoder_last_error
while True:
chunk = _ffmpeg_in_q.get()
if chunk is None:
break
continue
proc = _ffmpeg_proc
if proc is None or proc.stdin is None:
continue
@@ -152,18 +324,14 @@ def _start_transcoder_if_needed():
pass
if not _transcode_threads_started:
threading.Thread(target=_writer, daemon=True).start()
threading.Thread(target=_reader, daemon=True).start()
eventlet.spawn_n(_writer)
_transcode_threads_started = True
eventlet.spawn_n(_reader)
def _stop_transcoder():
global _ffmpeg_proc
try:
_ffmpeg_in_q.put_nowait(None)
except Exception:
pass
proc = _ffmpeg_proc
_ffmpeg_proc = None
if proc is None:
@@ -174,9 +342,30 @@ def _stop_transcoder():
pass
def _schedule_mix_restart():
global _mix_restart_timer
if not broadcast_state.get('active') or not broadcast_state.get('server_mix'):
return
if _remote_stream_url:
return
with _mix_restart_lock:
if _mix_restart_timer is not None:
try:
_mix_restart_timer.cancel()
except Exception:
pass
def _do():
# Restart ffmpeg so changes apply.
_stop_transcoder()
_start_transcoder_if_needed()
_mix_restart_timer = eventlet.spawn_after(0.20, _do)
def _feed_transcoder(data: bytes):
global _last_audio_chunk_ts
if _ffmpeg_proc is None or _ffmpeg_proc.poll() is not None or _remote_stream_url:
if _ffmpeg_proc is None or _ffmpeg_proc.poll() is not None or _remote_stream_url or broadcast_state.get('server_mix'):
return
_last_audio_chunk_ts = time.time()
try:
@@ -313,7 +502,7 @@ def setup_shared_routes(app):
if _ffmpeg_proc is None or _ffmpeg_proc.poll() is not None:
return jsonify({"success": False, "error": "MP3 stream not available"}), 503
client_q: queue.Queue = queue.Queue(maxsize=200)
client_q = eventlet.queue.LightQueue(maxsize=200)
with _mp3_lock:
_mp3_clients.add(client_q)
@@ -473,39 +662,120 @@ def dj_connect():
print(f"🎧 DJ connected: {request.sid}")
dj_sids.add(request.sid)
# Send controller status + current stream status to the new DJ
_emit_controller_status(to_sid=request.sid)
dj_socketio.emit('mixer_status', {
'deck_a': _public_deck_state(mixer_state['deck_a']),
'deck_b': _public_deck_state(mixer_state['deck_b']),
'crossfader': mixer_state.get('crossfader', 50),
}, to=request.sid, namespace='/')
dj_socketio.emit('stream_status', {
'active': broadcast_state.get('active', False),
'remote_relay': bool(broadcast_state.get('remote_relay', False)),
'server_mix': bool(broadcast_state.get('server_mix', False)),
}, to=request.sid, namespace='/')
@dj_socketio.on('disconnect')
def dj_disconnect():
dj_sids.discard(request.sid)
ident = dj_identity_by_sid.get(request.sid)
dj_identity_by_sid.pop(request.sid, None)
global active_controller_sid
was_controller = (request.sid == active_controller_sid)
if was_controller:
global last_controller_identity, last_controller_released_at
last_controller_identity = ident
last_controller_released_at = time.time()
active_controller_sid = None
print("🧑‍✈️ DJ controller disconnected; control released")
_emit_controller_status()
print("⚠️ DJ disconnected - broadcast will continue until manually stopped")
@dj_socketio.on('dj_identity')
def dj_identity(data):
"""Associate a stable client identity with this socket and optionally auto-reclaim control."""
global active_controller_sid
ident = (data or {}).get('id')
auto_reclaim = bool((data or {}).get('auto_reclaim'))
if not ident or not isinstance(ident, str):
return
dj_identity_by_sid[request.sid] = ident
# Auto-reclaim: only if no controller exists AND you were the last controller.
if auto_reclaim and active_controller_sid is None:
if last_controller_identity and ident == last_controller_identity:
active_controller_sid = request.sid
print(f"🧑‍✈️ Auto-reclaimed control for identity: {ident}")
_emit_controller_status()
@dj_socketio.on('take_control')
def dj_take_control():
global active_controller_sid
if active_controller_sid is not None and active_controller_sid != request.sid:
dj_socketio.emit('error', {'message': 'Control is currently held by another DJ'}, to=request.sid)
_emit_controller_status(to_sid=request.sid)
return
active_controller_sid = request.sid
global last_controller_identity
last_controller_identity = _sid_identity(request.sid)
print(f"🧑‍✈️ DJ took control: {request.sid}")
_emit_controller_status()
def stop_broadcast_after_timeout():
"""No longer used - broadcasts don't auto-stop"""
pass
@dj_socketio.on('start_broadcast')
def dj_start(data=None):
if _deny_if_not_controller():
return
global _remote_stream_url
_remote_stream_url = None
broadcast_state['active'] = True
broadcast_state['remote_relay'] = False
broadcast_state['server_mix'] = True
session['is_dj'] = True
print("🎙️ Broadcast -> ACTIVE")
_start_transcoder_if_needed()
dj_socketio.emit('stream_status', {
'active': True,
'remote_relay': False,
'server_mix': True,
}, namespace='/')
listener_socketio.emit('broadcast_started', namespace='/')
listener_socketio.emit('stream_status', {'active': True}, namespace='/')
listener_socketio.emit('stream_status', {
'active': True,
'remote_relay': bool(broadcast_state.get('remote_relay', False)),
'server_mix': bool(broadcast_state.get('server_mix', False)),
}, namespace='/')
@dj_socketio.on('stop_broadcast')
def dj_stop():
if _deny_if_not_controller():
return
broadcast_state['active'] = False
session['is_dj'] = False
print("🛑 DJ stopped broadcasting")
broadcast_state['remote_relay'] = False
broadcast_state['server_mix'] = False
_stop_transcoder()
dj_socketio.emit('stream_status', {'active': False, 'remote_relay': False, 'server_mix': False}, namespace='/')
listener_socketio.emit('broadcast_stopped', namespace='/')
listener_socketio.emit('stream_status', {'active': False}, namespace='/')
listener_socketio.emit('stream_status', {'active': False, 'remote_relay': False, 'server_mix': False}, namespace='/')
@dj_socketio.on('start_remote_relay')
def dj_start_remote_relay(data):
if _deny_if_not_controller():
return
global _remote_stream_url
url = data.get('url', '').strip()
if not url:
@@ -519,32 +789,40 @@ def dj_start_remote_relay(data):
_remote_stream_url = url
broadcast_state['active'] = True
broadcast_state['remote_relay'] = True
broadcast_state['server_mix'] = False
session['is_dj'] = True
print(f"🔗 Starting remote relay from: {url}")
_start_transcoder_if_needed()
dj_socketio.emit('stream_status', {'active': True, 'remote_relay': True, 'server_mix': False}, namespace='/')
listener_socketio.emit('broadcast_started', namespace='/')
listener_socketio.emit('stream_status', {'active': True, 'remote_relay': True}, namespace='/')
listener_socketio.emit('stream_status', {'active': True, 'remote_relay': True, 'server_mix': False}, namespace='/')
@dj_socketio.on('stop_remote_relay')
def dj_stop_remote_relay():
if _deny_if_not_controller():
return
global _remote_stream_url
_remote_stream_url = None
broadcast_state['active'] = False
broadcast_state['remote_relay'] = False
broadcast_state['server_mix'] = False
session['is_dj'] = False
print("🛑 Remote relay stopped")
_stop_transcoder()
dj_socketio.emit('stream_status', {'active': False, 'remote_relay': False, 'server_mix': False}, namespace='/')
listener_socketio.emit('broadcast_stopped', namespace='/')
listener_socketio.emit('stream_status', {'active': False}, namespace='/')
listener_socketio.emit('stream_status', {'active': False, 'remote_relay': False, 'server_mix': False}, namespace='/')
@dj_socketio.on('audio_chunk')
def dj_audio(data):
# MP3-only mode: do not relay raw chunks to listeners; feed transcoder only.
if broadcast_state['active']:
if broadcast_state['active'] and not broadcast_state.get('server_mix'):
# Ensure MP3 fallback transcoder is running (if ffmpeg is installed)
if _ffmpeg_proc is None or _ffmpeg_proc.poll() is not None:
_start_transcoder_if_needed()
@@ -589,7 +867,11 @@ def listener_join():
listener_socketio.emit('listener_count', {'count': count}, namespace='/')
dj_socketio.emit('listener_count', {'count': count}, namespace='/')
emit('stream_status', {'active': broadcast_state['active']})
emit('stream_status', {
'active': broadcast_state.get('active', False),
'remote_relay': bool(broadcast_state.get('remote_relay', False)),
'server_mix': bool(broadcast_state.get('server_mix', False)),
})
@listener_socketio.on('get_listener_count')
def listener_get_count():
@@ -598,7 +880,185 @@ def listener_get_count():
# DJ Panel Routes (No engine commands needed in local mode)
@dj_socketio.on('get_mixer_status')
def get_mixer_status():
pass
emit('mixer_status', {
'deck_a': _public_deck_state(mixer_state['deck_a']),
'deck_b': _public_deck_state(mixer_state['deck_b']),
'crossfader': mixer_state.get('crossfader', 50),
})
def _ffprobe_duration_seconds(path: str) -> float:
try:
out = subprocess.check_output(
['ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'default=nw=1:nk=1', path],
stderr=subprocess.DEVNULL,
)
return float(out.decode('utf-8').strip() or 0.0)
except Exception:
return 0.0
def _deck_key(deck: str) -> str:
return 'deck_a' if deck == 'A' else 'deck_b'
def _deck_current_position(d: dict) -> float:
if not d.get('playing'):
return float(d.get('position') or 0.0)
started_at = d.get('_started_at')
started_pos = float(d.get('_started_pos') or 0.0)
if started_at is None:
return float(d.get('position') or 0.0)
pitch = float(d.get('pitch') or 1.0)
return max(0.0, started_pos + (time.time() - float(started_at)) * pitch)
def _public_deck_state(d: dict) -> dict:
out = {k: v for k, v in d.items() if not k.startswith('_')}
out['position'] = _deck_current_position(d)
return out
def _broadcast_mixer_status():
dj_socketio.emit('mixer_status', {
'deck_a': _public_deck_state(mixer_state['deck_a']),
'deck_b': _public_deck_state(mixer_state['deck_b']),
'crossfader': mixer_state.get('crossfader', 50),
}, namespace='/')
_schedule_mix_restart()
@dj_socketio.on('audio_load_track')
def audio_load_track(data):
if _deny_if_not_controller():
return
deck = (data or {}).get('deck')
filename = (data or {}).get('filename')
if deck not in ('A', 'B') or not filename:
dj_socketio.emit('error', {'message': 'Invalid load request'}, to=request.sid)
return
path = os.path.join(MUSIC_FOLDER, filename)
if not os.path.exists(path):
dj_socketio.emit('error', {'message': f'Track not found: {filename}'}, to=request.sid)
return
key = _deck_key(deck)
d = mixer_state[key]
d['filename'] = f"music/{filename}"
d['duration'] = _ffprobe_duration_seconds(path)
d['position'] = 0.0
d['playing'] = False
d['_started_at'] = None
d['_started_pos'] = 0.0
_broadcast_mixer_status()
@dj_socketio.on('audio_play')
def audio_play(data):
if _deny_if_not_controller():
return
deck = (data or {}).get('deck')
if deck not in ('A', 'B'):
return
d = mixer_state[_deck_key(deck)]
if not d.get('filename'):
dj_socketio.emit('error', {'message': f'No track loaded on Deck {deck}'}, to=request.sid)
return
# Anchor for interpolation
d['position'] = _deck_current_position(d)
d['playing'] = True
d['_started_at'] = time.time()
d['_started_pos'] = float(d['position'])
_broadcast_mixer_status()
@dj_socketio.on('audio_pause')
def audio_pause(data):
if _deny_if_not_controller():
return
deck = (data or {}).get('deck')
if deck not in ('A', 'B'):
return
d = mixer_state[_deck_key(deck)]
d['position'] = _deck_current_position(d)
d['playing'] = False
d['_started_at'] = None
d['_started_pos'] = float(d['position'])
_broadcast_mixer_status()
@dj_socketio.on('audio_seek')
def audio_seek(data):
if _deny_if_not_controller():
return
deck = (data or {}).get('deck')
pos = float((data or {}).get('position') or 0.0)
if deck not in ('A', 'B'):
return
d = mixer_state[_deck_key(deck)]
d['position'] = max(0.0, pos)
if d.get('playing'):
d['_started_at'] = time.time()
d['_started_pos'] = float(d['position'])
_broadcast_mixer_status()
@dj_socketio.on('audio_set_volume')
def audio_set_volume(data):
if _deny_if_not_controller():
return
deck = (data or {}).get('deck')
vol = float((data or {}).get('volume') or 0.0)
if deck not in ('A', 'B'):
return
d = mixer_state[_deck_key(deck)]
d['volume'] = max(0.0, min(1.0, vol))
_broadcast_mixer_status()
@dj_socketio.on('audio_set_pitch')
def audio_set_pitch(data):
if _deny_if_not_controller():
return
deck = (data or {}).get('deck')
pitch = float((data or {}).get('pitch') or 1.0)
if deck not in ('A', 'B'):
return
d = mixer_state[_deck_key(deck)]
d['pitch'] = max(0.5, min(2.0, pitch))
# Re-anchor so position interpolation is consistent
d['position'] = _deck_current_position(d)
if d.get('playing'):
d['_started_at'] = time.time()
d['_started_pos'] = float(d['position'])
_broadcast_mixer_status()
@dj_socketio.on('audio_set_eq')
def audio_set_eq(data):
if _deny_if_not_controller():
return
deck = (data or {}).get('deck')
band = (data or {}).get('band')
value = float((data or {}).get('value') or 0.0)
if deck not in ('A', 'B'):
return
if band not in ('low', 'mid', 'high'):
return
d = mixer_state[_deck_key(deck)]
eq = d.get('eq') or {'low': 0.0, 'mid': 0.0, 'high': 0.0}
eq[band] = max(-20.0, min(20.0, value))
d['eq'] = eq
_broadcast_mixer_status()
@dj_socketio.on('audio_set_crossfader')
def audio_set_crossfader(data):
if _deny_if_not_controller():
return
val = int((data or {}).get('value') or 50)
mixer_state['crossfader'] = max(0, min(100, val))
_broadcast_mixer_status()
@dj_socketio.on('audio_sync_queue')
def audio_sync_queue(data):