From 80a4286bc0272113e509bf61c32de520d1bc24b0 Mon Sep 17 00:00:00 2001 From: ComputerTech Date: Fri, 3 Apr 2026 13:44:09 +0100 Subject: [PATCH] fix: defer broadcast_started for Qt MP3-direct mode until first audio chunk MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, start_broadcast immediately emitted broadcast_started and stream_status: active to listeners, even before the DJ pressed play. Listeners would connect to /stream.mp3, wait 60s with no data, then disconnect — so music never reached them from the PyQt client. The web DJ panel was unaffected because it only sends audio_chunk while music is actively playing, so start_broadcast and data arrive together. Fix: in MP3-direct mode (Qt client, format=mp3), hold back broadcast_started and stream_status: active until the very first audio_chunk arrives. The first chunk guarantees the preroll buffer has data and listeners will immediately receive audio when they connect. - _mp3_broadcast_announced flag tracks whether the deferred announcement has been sent in the current session - Resets on stop_broadcast, grace-period auto-stop, and fresh start_broadcast - Browser (webm/opus) mode is unaffected — announces immediately as before --- server.py | 100 ++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 75 insertions(+), 25 deletions(-) diff --git a/server.py b/server.py index 400a96e..74782c7 100644 --- a/server.py +++ b/server.py @@ -54,6 +54,15 @@ broadcast_state = { listener_sids = set() dj_sids = set() +# Set to True once the first audio chunk arrives in MP3-direct mode. +# broadcast_started is held back until then so listeners don't connect +# to /stream.mp3 before any data is flowing (e.g. before the DJ presses play). +_mp3_broadcast_announced = False + +# Current listener glow intensity — persisted so new/reloaded listeners get the +# correct level immediately without waiting for the DJ to move the slider again. +_current_glow_intensity = 30 + # Grace-period greenlet: auto-stop broadcast if DJ doesn't reconnect in time _dj_grace_greenlet = None DJ_GRACE_PERIOD_SECS = 20 # seconds to wait before auto-stopping @@ -136,7 +145,9 @@ def _start_transcoder_if_needed(is_mp3_input=False): try: _ffmpeg_in_q.get_nowait() except: break - # Define threads INSIDE so they close over THIS specific 'proc' + # Define greenlets INSIDE so they close over THIS specific 'proc'. + # Blocking subprocess pipe I/O is delegated to eventlet.tpool so it runs + # in a real OS thread, preventing it from stalling the eventlet hub. def _writer(proc): global _transcoder_last_error print(f"[THREAD] Transcoder writer started (PID: {proc.pid})") @@ -144,10 +155,11 @@ def _start_transcoder_if_needed(is_mp3_input=False): try: chunk = _ffmpeg_in_q.get(timeout=1.0) if chunk is None: break - + if proc.stdin: - proc.stdin.write(chunk) - proc.stdin.flush() + # Run blocking pipe-write in a real thread via tpool + eventlet.tpool.execute(proc.stdin.write, chunk) + eventlet.tpool.execute(proc.stdin.flush) except queue.Empty: continue except (BrokenPipeError, ConnectionResetError): @@ -157,8 +169,8 @@ def _start_transcoder_if_needed(is_mp3_input=False): print(f"WARNING: Transcoder writer error: {e}") _transcoder_last_error = str(e) break - - # Ensure process is killed if thread exits unexpectedly + + # Ensure process is killed if greenlet exits unexpectedly if proc.poll() is None: try: proc.terminate() except: pass @@ -169,9 +181,9 @@ def _start_transcoder_if_needed(is_mp3_input=False): print(f"[THREAD] Transcoder reader started (PID: {proc.pid})") while proc.poll() is None: try: - # Smaller read for smoother delivery (1KB) - # This prevents buffering delays at lower bitrates - data = proc.stdout.read(1024) + # Run blocking pipe-read in a real thread via tpool (1 KB chunks + # for smooth delivery; prevents buffering delays at lower bitrates) + data = eventlet.tpool.execute(proc.stdout.read, 1024) if not data: break _transcoder_bytes_out += len(data) @@ -183,20 +195,20 @@ def _start_transcoder_if_needed(is_mp3_input=False): try: q.put_nowait(data) except queue.Full: - # Client is too slow, skip this chunk for them + # Client is too slow — skip this chunk for them pass except Exception as e: print(f"WARNING: Transcoder reader error: {e}") _transcoder_last_error = str(e) break - # Ensure process is killed if thread exits unexpectedly + # Ensure process is killed if greenlet exits unexpectedly if proc.poll() is None: try: proc.terminate() except: pass print(f"[THREAD] Transcoder reader finished (PID: {proc.pid})") - # Start greenlets/threads for THIS process specifically + # Spawn as greenlets — tpool handles the blocking subprocess I/O internally eventlet.spawn(_writer, _ffmpeg_proc) eventlet.spawn(_reader, _ffmpeg_proc) @@ -664,12 +676,14 @@ def dj_connect(): def _dj_disconnect_grace(): """Auto-stop broadcast if no DJ reconnects within the grace period.""" - global _dj_grace_greenlet + global _dj_grace_greenlet, _mp3_broadcast_announced print(f"INFO: Grace period started — {DJ_GRACE_PERIOD_SECS}s for DJ to reconnect") eventlet.sleep(DJ_GRACE_PERIOD_SECS) if broadcast_state.get('active') and not dj_sids: print("WARNING: Grace period expired, no DJ reconnected — auto-stopping broadcast") broadcast_state['active'] = False + broadcast_state['is_mp3_input'] = False + _mp3_broadcast_announced = False _stop_transcoder() listener_socketio.emit('broadcast_stopped', namespace='/') listener_socketio.emit('stream_status', {'active': False}, namespace='/') @@ -694,11 +708,11 @@ def dj_disconnect(): @dj_socketio.on('start_broadcast') def dj_start(data=None): - global _dj_grace_greenlet + global _dj_grace_greenlet, _mp3_broadcast_announced # Cancel any pending auto-stop grace period (DJ reconnected in time) if _dj_grace_greenlet is not None: - _dj_grace_greenlet.kill() + _dj_grace_greenlet.kill(block=True) # block=True prevents a GreenletExit race _dj_grace_greenlet = None print("INFO: DJ reconnected within grace period — broadcast continues") @@ -718,15 +732,23 @@ def dj_start(data=None): broadcast_state['is_mp3_input'] = is_mp3_input if not was_already_active: - # Fresh broadcast start — clear pre-roll. - # For non-MP3 input start the ffmpeg transcoder; for MP3 input chunks are - # distributed directly via _distribute_mp3(), no transcoder required. + # Fresh broadcast start — clear pre-roll and reset announcement flag. with _mp3_lock: _mp3_preroll.clear() - if not is_mp3_input: + + if is_mp3_input: + # MP3-direct mode (Qt client): the DJ still needs to press play before + # audio flows. Firing broadcast_started now would cause listeners to + # connect to /stream.mp3 before any data exists, wait 60 s, then + # disconnect. Instead we hold back the announcement and send it on + # the very first audio_chunk so listeners connect when data is ready. + _mp3_broadcast_announced = False + print("BROADCAST: MP3-direct mode — deferring broadcast_started until first chunk") + else: + # Non-MP3 (browser webm/opus): ffmpeg transcoder starts immediately, + # so listeners can connect right away. _start_transcoder_if_needed(is_mp3_input=False) - # Tell listeners a new broadcast has begun (triggers audio player reload) - listener_socketio.emit('broadcast_started', namespace='/') + listener_socketio.emit('broadcast_started', namespace='/') else: # DJ reconnected mid-broadcast - just ensure transcoder is alive (non-MP3 only) # Do NOT clear pre-roll or trigger listener reload @@ -734,8 +756,12 @@ def dj_start(data=None): if not is_mp3_input: _start_transcoder_if_needed(is_mp3_input=False) - # Always send current status so any waiting listeners get unblocked - listener_socketio.emit('stream_status', {'active': True}, namespace='/') + # For MP3-direct fresh broadcast, hold back stream_status active=true too + # so listeners don't auto-connect before audio data is flowing. + # It will be sent alongside broadcast_started on the first audio_chunk. + # For non-MP3 (browser) mode or DJ reconnects, send immediately. + if not (is_mp3_input and not was_already_active): + listener_socketio.emit('stream_status', {'active': True}, namespace='/') @dj_socketio.on('get_listener_count') def dj_get_listener_count(): @@ -744,8 +770,10 @@ def dj_get_listener_count(): @dj_socketio.on('listener_glow') def dj_listener_glow(data): """DJ sets the glow intensity on the listener page.""" + global _current_glow_intensity intensity = int(data.get('intensity', 30)) if isinstance(data, dict) else 30 intensity = max(0, min(100, intensity)) + _current_glow_intensity = intensity listener_socketio.emit('listener_glow', {'intensity': intensity}, namespace='/') @dj_socketio.on('deck_glow') @@ -758,9 +786,22 @@ def dj_deck_glow(data): 'B': bool(data.get('B', False)), }, namespace='/') +@dj_socketio.on('now_playing') +def dj_now_playing(data): + """Relay the currently playing track title to all listener pages.""" + if not isinstance(data, dict): + return + listener_socketio.emit('now_playing', { + 'title': str(data.get('title', '')), + 'deck': str(data.get('deck', '')), + }, namespace='/') + @dj_socketio.on('stop_broadcast') def dj_stop(): + global _mp3_broadcast_announced broadcast_state['active'] = False + broadcast_state['is_mp3_input'] = False + _mp3_broadcast_announced = False session['is_dj'] = False print("STOPPED: DJ stopped broadcasting") @@ -771,9 +812,17 @@ def dj_stop(): @dj_socketio.on('audio_chunk') def dj_audio(data): + global _mp3_broadcast_announced if broadcast_state['active'] and isinstance(data, (bytes, bytearray)): if broadcast_state.get('is_mp3_input', False): - # MP3 input (e.g. Qt client): skip ffmpeg, send directly to listeners + # MP3 input (e.g. Qt client): skip ffmpeg, send directly to listeners. + # Fire broadcast_started on the very first chunk so listeners connect + # only once actual audio data is flowing (the DJ has pressed play). + if not _mp3_broadcast_announced: + _mp3_broadcast_announced = True + print("BROADCAST: First MP3 chunk received — announcing broadcast_started to listeners") + listener_socketio.emit('broadcast_started', namespace='/') + listener_socketio.emit('stream_status', {'active': True}, namespace='/') _distribute_mp3(bytes(data)) else: # Other formats (e.g. webm/opus from browser): route through ffmpeg transcoder @@ -848,8 +897,9 @@ def listener_disconnect(): @listener_socketio.on('join_listener') def listener_join(): - # SID already added in listener_connect(); just send stream status back + # SID already added in listener_connect(); send stream status and current glow emit('stream_status', {'active': broadcast_state['active']}) + emit('listener_glow', {'intensity': _current_glow_intensity}) @listener_socketio.on('get_listener_count') def listener_get_count():