fix: defer broadcast_started for Qt MP3-direct mode until first audio chunk
Previously, start_broadcast immediately emitted broadcast_started and stream_status: active to listeners, even before the DJ pressed play. Listeners would connect to /stream.mp3, wait 60s with no data, then disconnect — so music never reached them from the PyQt client. The web DJ panel was unaffected because it only sends audio_chunk while music is actively playing, so start_broadcast and data arrive together. Fix: in MP3-direct mode (Qt client, format=mp3), hold back broadcast_started and stream_status: active until the very first audio_chunk arrives. The first chunk guarantees the preroll buffer has data and listeners will immediately receive audio when they connect. - _mp3_broadcast_announced flag tracks whether the deferred announcement has been sent in the current session - Resets on stop_broadcast, grace-period auto-stop, and fresh start_broadcast - Browser (webm/opus) mode is unaffected — announces immediately as before
This commit is contained in:
parent
35adfa7feb
commit
80a4286bc0
100
server.py
100
server.py
|
|
@ -54,6 +54,15 @@ broadcast_state = {
|
|||
listener_sids = set()
|
||||
dj_sids = set()
|
||||
|
||||
# Set to True once the first audio chunk arrives in MP3-direct mode.
|
||||
# broadcast_started is held back until then so listeners don't connect
|
||||
# to /stream.mp3 before any data is flowing (e.g. before the DJ presses play).
|
||||
_mp3_broadcast_announced = False
|
||||
|
||||
# Current listener glow intensity — persisted so new/reloaded listeners get the
|
||||
# correct level immediately without waiting for the DJ to move the slider again.
|
||||
_current_glow_intensity = 30
|
||||
|
||||
# Grace-period greenlet: auto-stop broadcast if DJ doesn't reconnect in time
|
||||
_dj_grace_greenlet = None
|
||||
DJ_GRACE_PERIOD_SECS = 20 # seconds to wait before auto-stopping
|
||||
|
|
@ -136,7 +145,9 @@ def _start_transcoder_if_needed(is_mp3_input=False):
|
|||
try: _ffmpeg_in_q.get_nowait()
|
||||
except: break
|
||||
|
||||
# Define threads INSIDE so they close over THIS specific 'proc'
|
||||
# Define greenlets INSIDE so they close over THIS specific 'proc'.
|
||||
# Blocking subprocess pipe I/O is delegated to eventlet.tpool so it runs
|
||||
# in a real OS thread, preventing it from stalling the eventlet hub.
|
||||
def _writer(proc):
|
||||
global _transcoder_last_error
|
||||
print(f"[THREAD] Transcoder writer started (PID: {proc.pid})")
|
||||
|
|
@ -144,10 +155,11 @@ def _start_transcoder_if_needed(is_mp3_input=False):
|
|||
try:
|
||||
chunk = _ffmpeg_in_q.get(timeout=1.0)
|
||||
if chunk is None: break
|
||||
|
||||
|
||||
if proc.stdin:
|
||||
proc.stdin.write(chunk)
|
||||
proc.stdin.flush()
|
||||
# Run blocking pipe-write in a real thread via tpool
|
||||
eventlet.tpool.execute(proc.stdin.write, chunk)
|
||||
eventlet.tpool.execute(proc.stdin.flush)
|
||||
except queue.Empty:
|
||||
continue
|
||||
except (BrokenPipeError, ConnectionResetError):
|
||||
|
|
@ -157,8 +169,8 @@ def _start_transcoder_if_needed(is_mp3_input=False):
|
|||
print(f"WARNING: Transcoder writer error: {e}")
|
||||
_transcoder_last_error = str(e)
|
||||
break
|
||||
|
||||
# Ensure process is killed if thread exits unexpectedly
|
||||
|
||||
# Ensure process is killed if greenlet exits unexpectedly
|
||||
if proc.poll() is None:
|
||||
try: proc.terminate()
|
||||
except: pass
|
||||
|
|
@ -169,9 +181,9 @@ def _start_transcoder_if_needed(is_mp3_input=False):
|
|||
print(f"[THREAD] Transcoder reader started (PID: {proc.pid})")
|
||||
while proc.poll() is None:
|
||||
try:
|
||||
# Smaller read for smoother delivery (1KB)
|
||||
# This prevents buffering delays at lower bitrates
|
||||
data = proc.stdout.read(1024)
|
||||
# Run blocking pipe-read in a real thread via tpool (1 KB chunks
|
||||
# for smooth delivery; prevents buffering delays at lower bitrates)
|
||||
data = eventlet.tpool.execute(proc.stdout.read, 1024)
|
||||
if not data: break
|
||||
_transcoder_bytes_out += len(data)
|
||||
|
||||
|
|
@ -183,20 +195,20 @@ def _start_transcoder_if_needed(is_mp3_input=False):
|
|||
try:
|
||||
q.put_nowait(data)
|
||||
except queue.Full:
|
||||
# Client is too slow, skip this chunk for them
|
||||
# Client is too slow — skip this chunk for them
|
||||
pass
|
||||
except Exception as e:
|
||||
print(f"WARNING: Transcoder reader error: {e}")
|
||||
_transcoder_last_error = str(e)
|
||||
break
|
||||
|
||||
# Ensure process is killed if thread exits unexpectedly
|
||||
# Ensure process is killed if greenlet exits unexpectedly
|
||||
if proc.poll() is None:
|
||||
try: proc.terminate()
|
||||
except: pass
|
||||
print(f"[THREAD] Transcoder reader finished (PID: {proc.pid})")
|
||||
|
||||
# Start greenlets/threads for THIS process specifically
|
||||
# Spawn as greenlets — tpool handles the blocking subprocess I/O internally
|
||||
eventlet.spawn(_writer, _ffmpeg_proc)
|
||||
eventlet.spawn(_reader, _ffmpeg_proc)
|
||||
|
||||
|
|
@ -664,12 +676,14 @@ def dj_connect():
|
|||
|
||||
def _dj_disconnect_grace():
|
||||
"""Auto-stop broadcast if no DJ reconnects within the grace period."""
|
||||
global _dj_grace_greenlet
|
||||
global _dj_grace_greenlet, _mp3_broadcast_announced
|
||||
print(f"INFO: Grace period started — {DJ_GRACE_PERIOD_SECS}s for DJ to reconnect")
|
||||
eventlet.sleep(DJ_GRACE_PERIOD_SECS)
|
||||
if broadcast_state.get('active') and not dj_sids:
|
||||
print("WARNING: Grace period expired, no DJ reconnected — auto-stopping broadcast")
|
||||
broadcast_state['active'] = False
|
||||
broadcast_state['is_mp3_input'] = False
|
||||
_mp3_broadcast_announced = False
|
||||
_stop_transcoder()
|
||||
listener_socketio.emit('broadcast_stopped', namespace='/')
|
||||
listener_socketio.emit('stream_status', {'active': False}, namespace='/')
|
||||
|
|
@ -694,11 +708,11 @@ def dj_disconnect():
|
|||
|
||||
@dj_socketio.on('start_broadcast')
|
||||
def dj_start(data=None):
|
||||
global _dj_grace_greenlet
|
||||
global _dj_grace_greenlet, _mp3_broadcast_announced
|
||||
|
||||
# Cancel any pending auto-stop grace period (DJ reconnected in time)
|
||||
if _dj_grace_greenlet is not None:
|
||||
_dj_grace_greenlet.kill()
|
||||
_dj_grace_greenlet.kill(block=True) # block=True prevents a GreenletExit race
|
||||
_dj_grace_greenlet = None
|
||||
print("INFO: DJ reconnected within grace period — broadcast continues")
|
||||
|
||||
|
|
@ -718,15 +732,23 @@ def dj_start(data=None):
|
|||
broadcast_state['is_mp3_input'] = is_mp3_input
|
||||
|
||||
if not was_already_active:
|
||||
# Fresh broadcast start — clear pre-roll.
|
||||
# For non-MP3 input start the ffmpeg transcoder; for MP3 input chunks are
|
||||
# distributed directly via _distribute_mp3(), no transcoder required.
|
||||
# Fresh broadcast start — clear pre-roll and reset announcement flag.
|
||||
with _mp3_lock:
|
||||
_mp3_preroll.clear()
|
||||
if not is_mp3_input:
|
||||
|
||||
if is_mp3_input:
|
||||
# MP3-direct mode (Qt client): the DJ still needs to press play before
|
||||
# audio flows. Firing broadcast_started now would cause listeners to
|
||||
# connect to /stream.mp3 before any data exists, wait 60 s, then
|
||||
# disconnect. Instead we hold back the announcement and send it on
|
||||
# the very first audio_chunk so listeners connect when data is ready.
|
||||
_mp3_broadcast_announced = False
|
||||
print("BROADCAST: MP3-direct mode — deferring broadcast_started until first chunk")
|
||||
else:
|
||||
# Non-MP3 (browser webm/opus): ffmpeg transcoder starts immediately,
|
||||
# so listeners can connect right away.
|
||||
_start_transcoder_if_needed(is_mp3_input=False)
|
||||
# Tell listeners a new broadcast has begun (triggers audio player reload)
|
||||
listener_socketio.emit('broadcast_started', namespace='/')
|
||||
listener_socketio.emit('broadcast_started', namespace='/')
|
||||
else:
|
||||
# DJ reconnected mid-broadcast - just ensure transcoder is alive (non-MP3 only)
|
||||
# Do NOT clear pre-roll or trigger listener reload
|
||||
|
|
@ -734,8 +756,12 @@ def dj_start(data=None):
|
|||
if not is_mp3_input:
|
||||
_start_transcoder_if_needed(is_mp3_input=False)
|
||||
|
||||
# Always send current status so any waiting listeners get unblocked
|
||||
listener_socketio.emit('stream_status', {'active': True}, namespace='/')
|
||||
# For MP3-direct fresh broadcast, hold back stream_status active=true too
|
||||
# so listeners don't auto-connect before audio data is flowing.
|
||||
# It will be sent alongside broadcast_started on the first audio_chunk.
|
||||
# For non-MP3 (browser) mode or DJ reconnects, send immediately.
|
||||
if not (is_mp3_input and not was_already_active):
|
||||
listener_socketio.emit('stream_status', {'active': True}, namespace='/')
|
||||
|
||||
@dj_socketio.on('get_listener_count')
|
||||
def dj_get_listener_count():
|
||||
|
|
@ -744,8 +770,10 @@ def dj_get_listener_count():
|
|||
@dj_socketio.on('listener_glow')
|
||||
def dj_listener_glow(data):
|
||||
"""DJ sets the glow intensity on the listener page."""
|
||||
global _current_glow_intensity
|
||||
intensity = int(data.get('intensity', 30)) if isinstance(data, dict) else 30
|
||||
intensity = max(0, min(100, intensity))
|
||||
_current_glow_intensity = intensity
|
||||
listener_socketio.emit('listener_glow', {'intensity': intensity}, namespace='/')
|
||||
|
||||
@dj_socketio.on('deck_glow')
|
||||
|
|
@ -758,9 +786,22 @@ def dj_deck_glow(data):
|
|||
'B': bool(data.get('B', False)),
|
||||
}, namespace='/')
|
||||
|
||||
@dj_socketio.on('now_playing')
|
||||
def dj_now_playing(data):
|
||||
"""Relay the currently playing track title to all listener pages."""
|
||||
if not isinstance(data, dict):
|
||||
return
|
||||
listener_socketio.emit('now_playing', {
|
||||
'title': str(data.get('title', '')),
|
||||
'deck': str(data.get('deck', '')),
|
||||
}, namespace='/')
|
||||
|
||||
@dj_socketio.on('stop_broadcast')
|
||||
def dj_stop():
|
||||
global _mp3_broadcast_announced
|
||||
broadcast_state['active'] = False
|
||||
broadcast_state['is_mp3_input'] = False
|
||||
_mp3_broadcast_announced = False
|
||||
session['is_dj'] = False
|
||||
print("STOPPED: DJ stopped broadcasting")
|
||||
|
||||
|
|
@ -771,9 +812,17 @@ def dj_stop():
|
|||
|
||||
@dj_socketio.on('audio_chunk')
|
||||
def dj_audio(data):
|
||||
global _mp3_broadcast_announced
|
||||
if broadcast_state['active'] and isinstance(data, (bytes, bytearray)):
|
||||
if broadcast_state.get('is_mp3_input', False):
|
||||
# MP3 input (e.g. Qt client): skip ffmpeg, send directly to listeners
|
||||
# MP3 input (e.g. Qt client): skip ffmpeg, send directly to listeners.
|
||||
# Fire broadcast_started on the very first chunk so listeners connect
|
||||
# only once actual audio data is flowing (the DJ has pressed play).
|
||||
if not _mp3_broadcast_announced:
|
||||
_mp3_broadcast_announced = True
|
||||
print("BROADCAST: First MP3 chunk received — announcing broadcast_started to listeners")
|
||||
listener_socketio.emit('broadcast_started', namespace='/')
|
||||
listener_socketio.emit('stream_status', {'active': True}, namespace='/')
|
||||
_distribute_mp3(bytes(data))
|
||||
else:
|
||||
# Other formats (e.g. webm/opus from browser): route through ffmpeg transcoder
|
||||
|
|
@ -848,8 +897,9 @@ def listener_disconnect():
|
|||
|
||||
@listener_socketio.on('join_listener')
|
||||
def listener_join():
|
||||
# SID already added in listener_connect(); just send stream status back
|
||||
# SID already added in listener_connect(); send stream status and current glow
|
||||
emit('stream_status', {'active': broadcast_state['active']})
|
||||
emit('listener_glow', {'intensity': _current_glow_intensity})
|
||||
|
||||
@listener_socketio.on('get_listener_count')
|
||||
def listener_get_count():
|
||||
|
|
|
|||
Loading…
Reference in New Issue