Fix streaming: bypass ffmpeg for MP3 input, fix PyQt latency, fix routing bugs

- server.py: Add _distribute_mp3() to route MP3 chunks directly to listener
  queues without a second ffmpeg passthrough (halves pipeline latency, removes
  the eventlet/subprocess blocking read that caused the Qt client to fail)
- server.py: dj_start no longer starts ffmpeg for is_mp3_input=True
- server.py: dj_audio routes to _distribute_mp3 vs _feed_transcoder based on format
- server.py: _transcoder_watchdog skips MP3-direct mode
- server.py: stream_mp3 endpoint no longer waits for ffmpeg proc when MP3 direct
- techdj_qt.py: Add -fflags nobuffer + -flush_packets 1 to reduce source latency
- techdj_qt.py: bufsize=0 and read(4096) instead of read(8192) for ~260ms chunks
- listener.js: Reduce broadcast_started connect delay 800ms -> 300ms
This commit is contained in:
ComputerTech 2026-03-10 19:54:06 +00:00
parent 514f9899a3
commit 6027f2e973
3 changed files with 71 additions and 35 deletions

View File

@ -294,9 +294,10 @@ function initSocket() {
updateNowPlaying('Stream is live!');
if (window.listenerAudioEnabled) {
// Small delay to let the transcoder produce initial data
// Brief delay: 300ms is enough for ffmpeg to produce its first output
// (was 800ms — reduced to cut perceived startup lag)
resetReconnectBackoff();
setTimeout(() => connectStream(), 800);
setTimeout(() => connectStream(), 300);
}
});

View File

@ -235,9 +235,9 @@ def _stop_transcoder():
def _feed_transcoder(data: bytes):
global _last_audio_chunk_ts
if _ffmpeg_proc is None or _ffmpeg_proc.poll() is not None:
# If active but dead, restart it automatically
# If active but dead, restart it automatically (non-MP3 mode only)
if broadcast_state.get('active'):
_start_transcoder_if_needed(is_mp3_input=broadcast_state.get('is_mp3_input', False))
_start_transcoder_if_needed(is_mp3_input=False)
else:
return
@ -247,6 +247,28 @@ def _feed_transcoder(data: bytes):
except queue.Full:
# Drop chunk if overflow to prevent memory bloat
pass
def _distribute_mp3(data: bytes):
"""Distribute MP3 bytes directly to preroll buffer and all connected listener
clients, bypassing the ffmpeg transcoder entirely.
Used when the DJ is already sending valid MP3 (e.g. the Qt desktop client)
to eliminate the unnecessary encode/decode round-trip and cut pipeline latency
roughly in half.
"""
global _transcoder_bytes_out, _last_audio_chunk_ts
_last_audio_chunk_ts = time.time()
_transcoder_bytes_out += len(data)
with _mp3_lock:
_mp3_preroll.append(data)
for q in list(_mp3_clients):
try:
q.put_nowait(data)
except queue.Full:
pass
# Load settings to get MUSIC_FOLDER
def _load_settings():
try:
@ -432,18 +454,21 @@ def setup_shared_routes(app, index_file='index.html'):
'Cache-Control': 'no-cache, no-store',
})
# If the transcoder isn't ready yet, wait briefly (it may still be starting)
waited = 0.0
while (_ffmpeg_proc is None or _ffmpeg_proc.poll() is not None) and waited < 5.0:
eventlet.sleep(0.5)
waited += 0.5
# For non-MP3 input the server runs an ffmpeg transcoder; wait for it to start.
# For MP3 input (e.g. Qt client) chunks are distributed directly — no ffmpeg needed.
is_mp3_direct = broadcast_state.get('is_mp3_input', False)
if not is_mp3_direct:
waited = 0.0
while (_ffmpeg_proc is None or _ffmpeg_proc.poll() is not None) and waited < 5.0:
eventlet.sleep(0.5)
waited += 0.5
if _ffmpeg_proc is None or _ffmpeg_proc.poll() is not None:
return Response(b'', status=503, content_type='audio/mpeg', headers={
'Retry-After': '3',
'Access-Control-Allow-Origin': '*',
'Cache-Control': 'no-cache, no-store',
})
if _ffmpeg_proc is None or _ffmpeg_proc.poll() is not None:
return Response(b'', status=503, content_type='audio/mpeg', headers={
'Retry-After': '3',
'Access-Control-Allow-Origin': '*',
'Cache-Control': 'no-cache, no-store',
})
preroll_count = len(_mp3_preroll)
print(f"LISTENER: New listener joined stream (Pre-roll: {preroll_count} chunks)")
@ -688,17 +713,21 @@ def dj_start(data=None):
broadcast_state['is_mp3_input'] = is_mp3_input
if not was_already_active:
# Fresh broadcast start - clear pre-roll and start transcoder cleanly
# Fresh broadcast start — clear pre-roll.
# For non-MP3 input start the ffmpeg transcoder; for MP3 input chunks are
# distributed directly via _distribute_mp3(), no transcoder required.
with _mp3_lock:
_mp3_preroll.clear()
_start_transcoder_if_needed(is_mp3_input=is_mp3_input)
if not is_mp3_input:
_start_transcoder_if_needed(is_mp3_input=False)
# Tell listeners a new broadcast has begun (triggers audio player reload)
listener_socketio.emit('broadcast_started', namespace='/')
else:
# DJ reconnected mid-broadcast - just ensure transcoder is alive
# DJ reconnected mid-broadcast - just ensure transcoder is alive (non-MP3 only)
# Do NOT clear pre-roll or trigger listener reload
print("BROADCAST: DJ reconnected - resuming existing broadcast")
_start_transcoder_if_needed(is_mp3_input=is_mp3_input)
if not is_mp3_input:
_start_transcoder_if_needed(is_mp3_input=False)
# Always send current status so any waiting listeners get unblocked
listener_socketio.emit('stream_status', {'active': True}, namespace='/')
@ -720,15 +749,14 @@ def dj_stop():
@dj_socketio.on('audio_chunk')
def dj_audio(data):
# MP3-only mode: do not relay raw chunks to listeners; feed transcoder only.
if broadcast_state['active']:
# Ensure MP3 fallback transcoder is running (if ffmpeg is installed)
if _ffmpeg_proc is None or _ffmpeg_proc.poll() is not None:
# If we don't know the format, default to transcode,
# but usually start_broadcast handles this
_start_transcoder_if_needed()
if isinstance(data, (bytes, bytearray)):
if broadcast_state['active'] and isinstance(data, (bytes, bytearray)):
if broadcast_state.get('is_mp3_input', False):
# MP3 input (e.g. Qt client): skip ffmpeg, send directly to listeners
_distribute_mp3(bytes(data))
else:
# Other formats (e.g. webm/opus from browser): route through ffmpeg transcoder
if _ffmpeg_proc is None or _ffmpeg_proc.poll() is not None:
_start_transcoder_if_needed(is_mp3_input=False)
_feed_transcoder(bytes(data))
# === LISTENER SERVER ===
@ -792,13 +820,15 @@ def listener_get_count():
# DJ Panel Routes (No engine commands needed in local mode)
def _transcoder_watchdog():
"""Periodic check to ensure the transcoder stays alive during active broadcasts."""
"""Periodic check to ensure the ffmpeg transcoder stays alive.
Only applies to non-MP3 input; MP3 input (Qt client) is distributed directly.
"""
while True:
if broadcast_state.get('active'):
is_mp3_direct = broadcast_state.get('is_mp3_input', False)
if broadcast_state.get('active') and not is_mp3_direct:
if _ffmpeg_proc is None or _ffmpeg_proc.poll() is not None:
# Only log if it's actually dead and supposed to be alive
print("WARNING: Watchdog: Transcoder dead during active broadcast, reviving...")
_start_transcoder_if_needed(is_mp3_input=broadcast_state.get('is_mp3_input', False))
_start_transcoder_if_needed(is_mp3_input=False)
eventlet.sleep(5)

View File

@ -852,21 +852,26 @@ class StreamingWorker(QThread):
"ffmpeg",
"-hide_banner",
"-loglevel", "error",
# Disable input buffering so frames reach the pipe immediately
"-fflags", "nobuffer",
"-f", "pulse",
"-i", source,
"-ac", "2",
"-ar", "44100",
"-f", "mp3",
"-b:a", "128k",
"-af", "aresample=async=1",
# Flush every packet — critical for low-latency pipe streaming
"-flush_packets", "1",
"-f", "mp3",
"pipe:1"
]
self.ffmpeg_proc = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=8192
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=0
)
while self.is_running and self.ffmpeg_proc.poll() is None:
chunk = self.ffmpeg_proc.stdout.read(8192)
# 4096 bytes ≈ 10 MP3 frames ≈ ~260ms at 128kbps — low-latency chunks
chunk = self.ffmpeg_proc.stdout.read(4096)
if not chunk:
break
sio = self.sio # Local ref to avoid race with stop_streaming()