From 6027f2e9738e9cc46eefed26bd9d17995e822a25 Mon Sep 17 00:00:00 2001 From: ComputerTech Date: Tue, 10 Mar 2026 19:54:06 +0000 Subject: [PATCH] Fix streaming: bypass ffmpeg for MP3 input, fix PyQt latency, fix routing bugs - server.py: Add _distribute_mp3() to route MP3 chunks directly to listener queues without a second ffmpeg passthrough (halves pipeline latency, removes the eventlet/subprocess blocking read that caused the Qt client to fail) - server.py: dj_start no longer starts ffmpeg for is_mp3_input=True - server.py: dj_audio routes to _distribute_mp3 vs _feed_transcoder based on format - server.py: _transcoder_watchdog skips MP3-direct mode - server.py: stream_mp3 endpoint no longer waits for ffmpeg proc when MP3 direct - techdj_qt.py: Add -fflags nobuffer + -flush_packets 1 to reduce source latency - techdj_qt.py: bufsize=0 and read(4096) instead of read(8192) for ~260ms chunks - listener.js: Reduce broadcast_started connect delay 800ms -> 300ms --- listener.js | 5 +-- server.py | 90 ++++++++++++++++++++++++++++++++++------------------ techdj_qt.py | 11 +++++-- 3 files changed, 71 insertions(+), 35 deletions(-) diff --git a/listener.js b/listener.js index e071bf0..cc000ea 100644 --- a/listener.js +++ b/listener.js @@ -294,9 +294,10 @@ function initSocket() { updateNowPlaying('Stream is live!'); if (window.listenerAudioEnabled) { - // Small delay to let the transcoder produce initial data + // Brief delay: 300ms is enough for ffmpeg to produce its first output + // (was 800ms — reduced to cut perceived startup lag) resetReconnectBackoff(); - setTimeout(() => connectStream(), 800); + setTimeout(() => connectStream(), 300); } }); diff --git a/server.py b/server.py index cd58571..09da226 100644 --- a/server.py +++ b/server.py @@ -235,9 +235,9 @@ def _stop_transcoder(): def _feed_transcoder(data: bytes): global _last_audio_chunk_ts if _ffmpeg_proc is None or _ffmpeg_proc.poll() is not None: - # If active but dead, restart it automatically + # If active but dead, restart it automatically (non-MP3 mode only) if broadcast_state.get('active'): - _start_transcoder_if_needed(is_mp3_input=broadcast_state.get('is_mp3_input', False)) + _start_transcoder_if_needed(is_mp3_input=False) else: return @@ -247,6 +247,28 @@ def _feed_transcoder(data: bytes): except queue.Full: # Drop chunk if overflow to prevent memory bloat pass + + +def _distribute_mp3(data: bytes): + """Distribute MP3 bytes directly to preroll buffer and all connected listener + clients, bypassing the ffmpeg transcoder entirely. + + Used when the DJ is already sending valid MP3 (e.g. the Qt desktop client) + to eliminate the unnecessary encode/decode round-trip and cut pipeline latency + roughly in half. + """ + global _transcoder_bytes_out, _last_audio_chunk_ts + _last_audio_chunk_ts = time.time() + _transcoder_bytes_out += len(data) + with _mp3_lock: + _mp3_preroll.append(data) + for q in list(_mp3_clients): + try: + q.put_nowait(data) + except queue.Full: + pass + + # Load settings to get MUSIC_FOLDER def _load_settings(): try: @@ -432,18 +454,21 @@ def setup_shared_routes(app, index_file='index.html'): 'Cache-Control': 'no-cache, no-store', }) - # If the transcoder isn't ready yet, wait briefly (it may still be starting) - waited = 0.0 - while (_ffmpeg_proc is None or _ffmpeg_proc.poll() is not None) and waited < 5.0: - eventlet.sleep(0.5) - waited += 0.5 + # For non-MP3 input the server runs an ffmpeg transcoder; wait for it to start. + # For MP3 input (e.g. Qt client) chunks are distributed directly — no ffmpeg needed. + is_mp3_direct = broadcast_state.get('is_mp3_input', False) + if not is_mp3_direct: + waited = 0.0 + while (_ffmpeg_proc is None or _ffmpeg_proc.poll() is not None) and waited < 5.0: + eventlet.sleep(0.5) + waited += 0.5 - if _ffmpeg_proc is None or _ffmpeg_proc.poll() is not None: - return Response(b'', status=503, content_type='audio/mpeg', headers={ - 'Retry-After': '3', - 'Access-Control-Allow-Origin': '*', - 'Cache-Control': 'no-cache, no-store', - }) + if _ffmpeg_proc is None or _ffmpeg_proc.poll() is not None: + return Response(b'', status=503, content_type='audio/mpeg', headers={ + 'Retry-After': '3', + 'Access-Control-Allow-Origin': '*', + 'Cache-Control': 'no-cache, no-store', + }) preroll_count = len(_mp3_preroll) print(f"LISTENER: New listener joined stream (Pre-roll: {preroll_count} chunks)") @@ -688,17 +713,21 @@ def dj_start(data=None): broadcast_state['is_mp3_input'] = is_mp3_input if not was_already_active: - # Fresh broadcast start - clear pre-roll and start transcoder cleanly + # Fresh broadcast start — clear pre-roll. + # For non-MP3 input start the ffmpeg transcoder; for MP3 input chunks are + # distributed directly via _distribute_mp3(), no transcoder required. with _mp3_lock: _mp3_preroll.clear() - _start_transcoder_if_needed(is_mp3_input=is_mp3_input) + if not is_mp3_input: + _start_transcoder_if_needed(is_mp3_input=False) # Tell listeners a new broadcast has begun (triggers audio player reload) listener_socketio.emit('broadcast_started', namespace='/') else: - # DJ reconnected mid-broadcast - just ensure transcoder is alive + # DJ reconnected mid-broadcast - just ensure transcoder is alive (non-MP3 only) # Do NOT clear pre-roll or trigger listener reload print("BROADCAST: DJ reconnected - resuming existing broadcast") - _start_transcoder_if_needed(is_mp3_input=is_mp3_input) + if not is_mp3_input: + _start_transcoder_if_needed(is_mp3_input=False) # Always send current status so any waiting listeners get unblocked listener_socketio.emit('stream_status', {'active': True}, namespace='/') @@ -720,15 +749,14 @@ def dj_stop(): @dj_socketio.on('audio_chunk') def dj_audio(data): - # MP3-only mode: do not relay raw chunks to listeners; feed transcoder only. - if broadcast_state['active']: - # Ensure MP3 fallback transcoder is running (if ffmpeg is installed) - if _ffmpeg_proc is None or _ffmpeg_proc.poll() is not None: - # If we don't know the format, default to transcode, - # but usually start_broadcast handles this - _start_transcoder_if_needed() - - if isinstance(data, (bytes, bytearray)): + if broadcast_state['active'] and isinstance(data, (bytes, bytearray)): + if broadcast_state.get('is_mp3_input', False): + # MP3 input (e.g. Qt client): skip ffmpeg, send directly to listeners + _distribute_mp3(bytes(data)) + else: + # Other formats (e.g. webm/opus from browser): route through ffmpeg transcoder + if _ffmpeg_proc is None or _ffmpeg_proc.poll() is not None: + _start_transcoder_if_needed(is_mp3_input=False) _feed_transcoder(bytes(data)) # === LISTENER SERVER === @@ -792,13 +820,15 @@ def listener_get_count(): # DJ Panel Routes (No engine commands needed in local mode) def _transcoder_watchdog(): - """Periodic check to ensure the transcoder stays alive during active broadcasts.""" + """Periodic check to ensure the ffmpeg transcoder stays alive. + Only applies to non-MP3 input; MP3 input (Qt client) is distributed directly. + """ while True: - if broadcast_state.get('active'): + is_mp3_direct = broadcast_state.get('is_mp3_input', False) + if broadcast_state.get('active') and not is_mp3_direct: if _ffmpeg_proc is None or _ffmpeg_proc.poll() is not None: - # Only log if it's actually dead and supposed to be alive print("WARNING: Watchdog: Transcoder dead during active broadcast, reviving...") - _start_transcoder_if_needed(is_mp3_input=broadcast_state.get('is_mp3_input', False)) + _start_transcoder_if_needed(is_mp3_input=False) eventlet.sleep(5) diff --git a/techdj_qt.py b/techdj_qt.py index cb24f84..0e1fdf5 100644 --- a/techdj_qt.py +++ b/techdj_qt.py @@ -852,21 +852,26 @@ class StreamingWorker(QThread): "ffmpeg", "-hide_banner", "-loglevel", "error", + # Disable input buffering so frames reach the pipe immediately + "-fflags", "nobuffer", "-f", "pulse", "-i", source, "-ac", "2", "-ar", "44100", - "-f", "mp3", "-b:a", "128k", "-af", "aresample=async=1", + # Flush every packet — critical for low-latency pipe streaming + "-flush_packets", "1", + "-f", "mp3", "pipe:1" ] self.ffmpeg_proc = subprocess.Popen( - cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=8192 + cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=0 ) while self.is_running and self.ffmpeg_proc.poll() is None: - chunk = self.ffmpeg_proc.stdout.read(8192) + # 4096 bytes ≈ 10 MP3 frames ≈ ~260ms at 128kbps — low-latency chunks + chunk = self.ffmpeg_proc.stdout.read(4096) if not chunk: break sio = self.sio # Local ref to avoid race with stop_streaming()