Fix Qt client music not playing on listener page

Replace PulseAudio monitor capture with direct file streaming.
Qt6's FFmpeg/PipeWire-native audio backend ignores PULSE_SINK so
the monitor captured silence. StreamingWorker now receives the
playing file path via a command queue and pipes it through ffmpeg
-re (real-time rate) directly to the server as audio_chunk events.

- Add switch_file() / stop_file() to StreamingWorker
- Replace ffmpeg pulse capture loop with file-based cmd loop
- DeckWidget.play() calls switch_file(path, position_ms)
- DeckWidget.pause()/stop() calls stop_file()
- Add now_playing socket relay in server.py
- listener.js handles now_playing event to show track title
- Add deck_glow emission from Qt deck play/pause/stop
This commit is contained in:
ComputerTech 2026-04-03 14:18:28 +01:00
parent 46128f5c58
commit 20e56f37b8
6 changed files with 386 additions and 104 deletions

View File

@ -26,6 +26,10 @@ let reconnectTimer = null;
const MAX_RECONNECT_DELAY = 30000; // 30 s cap
const BASE_RECONNECT_DELAY = 2000; // Start at 2 s
// Guard: prevents two simultaneous connectStream() calls (e.g. stream_status and
// broadcast_started can arrive back-to-back and both call connectStream)
let _streamConnecting = false;
// Stall watchdog state
let stallWatchdogInterval = null;
let lastWatchdogTime = 0;
@ -208,6 +212,10 @@ function connectStream() {
return;
}
// Prevent simultaneous duplicate connection attempts
if (_streamConnecting) return;
_streamConnecting = true;
console.log('[STREAM] Connecting to MP3 stream...');
updateStatus('Connecting...', false);
@ -217,6 +225,7 @@ function connectStream() {
window.listenerAudio.play()
.then(() => {
_streamConnecting = false;
console.log('[OK] Stream playback started');
resetReconnectBackoff();
updateStatus('Audio Active — Enjoy the stream!', true);
@ -224,6 +233,7 @@ function connectStream() {
startListenerVUMeter();
})
.catch(e => {
_streamConnecting = false;
console.warn('[WARN] play() rejected:', e.name, e.message);
// If broadcast went offline while we were connecting, don't retry
if (!broadcastActive) {
@ -317,11 +327,18 @@ function initSocket() {
document.body.classList.toggle('playing-B', !!data.B);
});
socket.on('now_playing', (data) => {
if (data && data.title) {
updateNowPlaying(data.title);
}
});
return socket;
}
/** Clean up when broadcast goes offline. */
function handleBroadcastOffline() {
_streamConnecting = false; // cancel any in-flight connect attempt
cancelScheduledReconnect();
stopStallWatchdog();

116
script.js
View File

@ -1292,6 +1292,14 @@ function tauriResolve(track) {
async function fetchLibrary() {
try {
// In Tauri: scan disk directly via native Rust commands — no Flask needed.
if (window.__TAURI__?.core?.invoke) {
const musicDir = await window.__TAURI__.core.invoke('get_music_folder');
allSongs = await window.__TAURI__.core.invoke('scan_library', { musicDir });
renderLibrary(allSongs);
return;
}
// Browser / Flask fallback
const res = await fetch('library.json?t=' + new Date().getTime());
allSongs = await res.json();
renderLibrary(allSongs);
@ -1536,6 +1544,27 @@ async function handleFileUpload(event) {
const files = Array.from(event.target.files);
if (!files || files.length === 0) return;
// In Tauri: files are already local — no server upload needed.
// Create session blob URLs and add directly to the in-memory library.
if (window.__TAURI__?.core?.invoke) {
const allowed = ['.mp3', '.m4a', '.wav', '.flac', '.ogg'];
let added = 0;
files.forEach(file => {
const ext = file.name.substring(file.name.lastIndexOf('.')).toLowerCase();
if (!allowed.includes(ext)) return;
const blobUrl = URL.createObjectURL(file);
const title = file.name.replace(/\.[^.]+$/, '');
// absolutePath is null — tauriResolve will fall back to the blob URL.
allSongs.push({ title, file: blobUrl, absolutePath: null });
added++;
});
if (added > 0) {
renderLibrary(allSongs);
showToast(`${added} track(s) loaded into library`, 'success');
}
return;
}
console.log(`Uploading ${files.length} file(s)...`);
// Create/Show progress container
@ -1700,8 +1729,13 @@ async function browseToPath(targetPath) {
}
try {
let data;
if (window.__TAURI__?.core?.invoke) {
data = await window.__TAURI__.core.invoke('list_dirs', { path });
} else {
const res = await fetch(`/browse_directories?path=${encodeURIComponent(path)}`);
const data = await res.json();
data = await res.json();
}
if (data.success) {
currentInput.value = data.path;
const list = document.getElementById('dir-list');
@ -1723,23 +1757,26 @@ async function browseToPath(targetPath) {
async function confirmFolderSelection() {
const path = document.getElementById('current-folder-path').value;
try {
let result;
if (window.__TAURI__?.core?.invoke) {
result = await window.__TAURI__.core.invoke('save_music_folder', { path });
} else {
const res = await fetch('/update_settings', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
library: { music_folder: path }
})
body: JSON.stringify({ library: { music_folder: path } })
});
const result = await res.json();
result = await res.json();
}
if (result.success) {
alert("Music folder updated! Refreshing library...");
alert('Music folder updated! Refreshing library...');
closeFolderPicker();
fetchLibrary();
} else {
alert("Error: " + result.error);
alert('Error: ' + (result.error || 'Unknown error'));
}
} catch (e) {
alert("Failed to update settings");
alert('Failed to update settings');
}
}
@ -1870,6 +1907,12 @@ let currentStreamMimeType = null;
function initSocket() {
if (socket) return socket;
// Socket.IO is loaded from a CDN; in Tauri (offline) it may not be available.
if (typeof io === 'undefined') {
console.warn('[SOCKET] Socket.IO not loaded — live streaming is unavailable');
return null;
}
const serverUrl = window.location.origin;
console.log(`[SOCKET] Connecting to ${serverUrl}`);
@ -2223,6 +2266,15 @@ function startBroadcast() {
// 250ms chunks: More frequent smaller chunks reduces stall gaps on weak connections.
// A 1-second chunk creates a 1-second starvation gap if the network hiccups;
// 250ms chunks keep the server fed 4x more often.
// Notify server FIRST so broadcast_state is active on the server before
// the first audio_chunk arrives — prevents the first ~250 ms of audio
// being silently dropped by the server's isinstance() guard.
if (!socket) initSocket();
const bitrateValue = document.getElementById('stream-quality').value + 'k';
socket.emit('start_broadcast', { bitrate: bitrateValue });
socket.emit('get_listener_count');
// Validate state before starting
if (mediaRecorder.state === 'inactive') {
mediaRecorder.start(250);
@ -2240,12 +2292,6 @@ function startBroadcast() {
document.getElementById('broadcast-status').textContent = 'LIVE';
document.getElementById('broadcast-status').classList.add('live');
// Notify server that broadcast is active (listeners use MP3 stream)
if (!socket) initSocket();
const bitrateValue = document.getElementById('stream-quality').value + 'k';
socket.emit('start_broadcast', { bitrate: bitrateValue });
socket.emit('get_listener_count');
console.log('[OK] Broadcasting started successfully!');
console.log('TIP TIP: Play a track on Deck A or B to stream audio');
@ -2276,23 +2322,16 @@ function stopBroadcast() {
}
if (streamDestination) {
// Only disconnect the specific stream connection — do NOT call .disconnect()
// with no args as that also removes the audioCtx.destination connection and
// causes an audible pop / silence gap for locally-monitored decks.
if (decks.A.crossfaderGain) {
try {
decks.A.crossfaderGain.disconnect(streamDestination);
decks.A.crossfaderGain.disconnect();
decks.A.crossfaderGain.connect(audioCtx.destination);
} catch (e) {
console.warn('Error restoring Deck A audio:', e);
}
try { decks.A.crossfaderGain.disconnect(streamDestination); }
catch (e) { console.warn('Error disconnecting Deck A from stream:', e); }
}
if (decks.B.crossfaderGain) {
try {
decks.B.crossfaderGain.disconnect(streamDestination);
decks.B.crossfaderGain.disconnect();
decks.B.crossfaderGain.connect(audioCtx.destination);
} catch (e) {
console.warn('Error restoring Deck B audio:', e);
}
try { decks.B.crossfaderGain.disconnect(streamDestination); }
catch (e) { console.warn('Error disconnecting Deck B from stream:', e); }
}
streamDestination = null;
}
@ -2325,25 +2364,16 @@ function restartBroadcast() {
streamProcessor = null;
}
// Clean up old stream destination
// Clean up old stream destination — only disconnect the stream leg,
// not the audioCtx.destination leg, to avoid an audible glitch.
if (streamDestination) {
if (decks.A.crossfaderGain) {
try {
decks.A.crossfaderGain.disconnect(streamDestination);
decks.A.crossfaderGain.disconnect();
decks.A.crossfaderGain.connect(audioCtx.destination);
} catch (e) {
console.warn('Error cleaning up Deck A:', e);
}
try { decks.A.crossfaderGain.disconnect(streamDestination); }
catch (e) { console.warn('Error cleaning up Deck A:', e); }
}
if (decks.B.crossfaderGain) {
try {
decks.B.crossfaderGain.disconnect(streamDestination);
decks.B.crossfaderGain.disconnect();
decks.B.crossfaderGain.connect(audioCtx.destination);
} catch (e) {
console.warn('Error cleaning up Deck B:', e);
}
try { decks.B.crossfaderGain.disconnect(streamDestination); }
catch (e) { console.warn('Error cleaning up Deck B:', e); }
}
streamDestination = null;
}

View File

@ -10,7 +10,8 @@
"audio": {
"recording_sample_rate": 48000,
"recording_format": "wav",
"stream_server_url": "http://54.37.246.24:5001/"
"stream_server_url": "http://54.37.246.24:5000",
"dj_panel_password": "techymusic//789//"
},
"ui": {
"neon_mode": 2

View File

@ -1,10 +1,147 @@
use std::path::{Path, PathBuf};
use std::fs;
use serde_json::{json, Value};
use tauri::Manager;
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
fn home_dir() -> PathBuf {
std::env::var("HOME")
.map(PathBuf::from)
.unwrap_or_else(|_| PathBuf::from("/home"))
}
fn settings_file(app: &tauri::AppHandle) -> PathBuf {
app.path()
.app_local_data_dir()
.unwrap_or_else(|_| home_dir().join(".local/share/techdj"))
.join("settings.json")
}
fn read_settings(app: &tauri::AppHandle) -> Value {
fs::read_to_string(settings_file(app))
.ok()
.and_then(|s| serde_json::from_str::<Value>(&s).ok())
.unwrap_or_else(|| json!({}))
}
// ---------------------------------------------------------------------------
// Tauri commands
// ---------------------------------------------------------------------------
/// Returns the current music folder path stored in app settings.
/// Falls back to ~/Music if nothing is saved.
#[tauri::command]
fn get_music_folder(app: tauri::AppHandle) -> String {
let settings = read_settings(&app);
if let Some(s) = settings["music_folder"].as_str() {
if !s.is_empty() {
return s.to_string();
}
}
home_dir().join("Music").to_string_lossy().into_owned()
}
/// Persists the chosen music folder to the Tauri app-local settings file.
#[tauri::command]
fn save_music_folder(app: tauri::AppHandle, path: String) -> Value {
let mut settings = read_settings(&app);
settings["music_folder"] = json!(path);
let sf = settings_file(&app);
if let Some(parent) = sf.parent() {
let _ = fs::create_dir_all(parent);
}
match fs::write(&sf, serde_json::to_string_pretty(&settings).unwrap_or_default()) {
Ok(_) => json!({ "success": true }),
Err(e) => json!({ "success": false, "error": e.to_string() }),
}
}
/// Recursively scans `music_dir` for supported audio files and returns an
/// array of `{ title, file, absolutePath }` objects — same shape as the
/// Flask `/library.json` endpoint so the front-end works without changes.
#[tauri::command]
fn scan_library(music_dir: String) -> Vec<Value> {
let mut tracks = Vec::new();
let p = Path::new(&music_dir);
if p.is_dir() {
scan_dir(p, &mut tracks);
}
tracks
}
fn scan_dir(dir: &Path, tracks: &mut Vec<Value>) {
let Ok(rd) = fs::read_dir(dir) else { return };
let mut entries: Vec<_> = rd.flatten().collect();
entries.sort_by_key(|e| e.file_name());
for entry in entries {
let p = entry.path();
if p.is_dir() {
scan_dir(&p, tracks);
} else if let Some(ext) = p.extension() {
let ext_lc = ext.to_string_lossy().to_lowercase();
if matches!(ext_lc.as_str(), "mp3" | "m4a" | "wav" | "flac" | "ogg" | "aac") {
let title = p
.file_stem()
.map(|s| s.to_string_lossy().into_owned())
.unwrap_or_default();
let file_name = p
.file_name()
.map(|s| s.to_string_lossy().into_owned())
.unwrap_or_default();
let abs = p.to_string_lossy().into_owned();
tracks.push(json!({
"title": title,
"file": format!("music_proxy/{}", file_name),
"absolutePath": abs,
}));
}
}
}
}
/// Lists subdirectories at `path` — replaces the Flask `/browse_directories`
/// endpoint for the in-app folder picker.
#[tauri::command]
fn list_dirs(path: String) -> Value {
let dir = Path::new(&path);
let mut entries: Vec<Value> = Vec::new();
if let Some(parent) = dir.parent() {
entries.push(json!({ "name": "..", "path": parent.to_string_lossy(), "isDir": true }));
}
if let Ok(rd) = fs::read_dir(dir) {
let mut dirs: Vec<_> = rd.flatten().filter(|e| e.path().is_dir()).collect();
dirs.sort_by_key(|e| e.file_name());
for d in dirs {
entries.push(json!({
"name": d.file_name().to_string_lossy(),
"path": d.path().to_string_lossy(),
"isDir": true,
}));
}
}
json!({ "success": true, "path": path, "entries": entries })
}
// ---------------------------------------------------------------------------
// App entry point
// ---------------------------------------------------------------------------
#[cfg_attr(mobile, tauri::mobile_entry_point)]
pub fn run() {
tauri::Builder::default()
// Grant the WebView direct read access to local audio files so
// convertFileSrc() can serve tracks from $HOME without going
// through the Flask proxy.
.plugin(tauri_plugin_fs::init())
.invoke_handler(tauri::generate_handler![
get_music_folder,
save_music_folder,
scan_library,
list_dirs,
])
.run(tauri::generate_context!())
.expect("error while running TechDJ");
}

View File

@ -23,10 +23,7 @@
"security": {
"assetProtocol": {
"enable": true,
"scope": [
"$HOME/**",
"$HOME/Music/**"
]
"scope": ["$HOME/**"]
}
}
},

View File

@ -5,7 +5,9 @@ import json
import random
import math
import time
import queue
import shutil
import threading
import requests
import re
import socketio
@ -861,10 +863,13 @@ class RecordingWorker(QProcess):
print(f"[RECORDING ERROR] {err}")
class StreamingWorker(QThread):
"""Streams this app's isolated audio output to a server using Socket.IO.
"""Streams the currently-playing deck's audio file to the server.
PULSE_SINK is set at startup so ALL audio from this process goes to the
virtual sink automatically. We just capture from its monitor here.
Instead of capturing from a PulseAudio monitor (which is unreliable when
Qt6 uses the PipeWire-native or ALSA audio backend), this worker reads the
file being played directly through ffmpeg with '-re' (real-time rate) and
sends the resulting MP3 bytes to the server as audio_chunk events. The
server distributes them to all connected listener browsers via /stream.mp3.
"""
streaming_started = pyqtSignal()
streaming_error = pyqtSignal(str)
@ -879,6 +884,8 @@ class StreamingWorker(QThread):
self.is_running = False
self.ffmpeg_proc = None
self._broadcast_started = False
# Thread-safe command queue: ('play', file_path, position_ms) or ('stop',)
self._file_cmd_queue = queue.Queue(maxsize=20)
def on_connect(self):
print("[SOCKET] Connected to DJ server")
@ -968,56 +975,106 @@ class StreamingWorker(QThread):
# wait_timeout: how long to wait for the server to respond during connect
self.sio.connect(self.stream_url, wait_timeout=10, headers=connect_headers)
source = get_audio_capture_source()
print(f"[STREAM] Capturing from: {source}")
print("[STREAM] Connected — waiting for deck to start playing...")
cmd = [
# File-based streaming loop.
# Waits for ('play', path, pos_ms) commands from the main thread,
# then pipes the file through ffmpeg at real-time rate to the server.
# This is reliable regardless of Qt's audio backend (PulseAudio /
# PipeWire-native / ALSA), since we read the file directly.
current_proc = None
while self.is_running:
# Block briefly waiting for a command; loop allows is_running re-check
try:
cmd = self._file_cmd_queue.get(timeout=0.25)
except queue.Empty:
# If the current ffmpeg exited on its own (track ended), clean up
if current_proc is not None and current_proc.poll() is not None:
current_proc = None
self.ffmpeg_proc = None
continue
if cmd[0] == 'play':
# Kill previous ffmpeg before starting a new one
if current_proc and current_proc.poll() is None:
current_proc.terminate()
try:
current_proc.wait(timeout=1.0)
except Exception:
current_proc.kill()
_, file_path, position_ms = cmd
position_secs = max(0.0, position_ms / 1000.0)
print(f"[STREAM] Streaming file: {file_path} from {position_secs:.1f}s")
ffmpeg_cmd = [
"ffmpeg",
"-hide_banner",
"-loglevel", "error",
# Disable input buffering so frames reach the pipe immediately
"-fflags", "nobuffer",
"-f", "pulse",
"-i", source,
"-re", # real-time output rate — prevents flooding the socket
"-ss", f"{position_secs:.3f}", # seek to playback position
"-i", file_path,
"-vn", # discard video (cover art etc.)
"-ac", "2",
"-ar", "44100",
"-b:a", "128k",
"-af", "aresample=async=1",
# Flush every packet — critical for low-latency pipe streaming
"-flush_packets", "1",
"-f", "mp3",
"pipe:1",
]
self.ffmpeg_proc = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=0
current_proc = subprocess.Popen(
ffmpeg_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=0,
)
self.ffmpeg_proc = current_proc
threading.Thread(
target=self._drain_stderr, args=(current_proc,), daemon=True
).start()
# Drain stderr in a real daemon thread so the OS pipe buffer never
# fills up and blocks stdout (classic Python subprocess deadlock).
import threading as _threading
stderr_thread = _threading.Thread(
target=self._drain_stderr, args=(self.ffmpeg_proc,), daemon=True
)
stderr_thread.start()
while self.is_running and self.ffmpeg_proc.poll() is None:
# 4096 bytes ≈ 10 MP3 frames ≈ ~260 ms at 128 kbps — low-latency chunks
chunk = self.ffmpeg_proc.stdout.read(4096)
if not chunk:
# Inner read loop for this file
while self.is_running:
# Non-blocking check for new command (switch track / stop)
try:
next_cmd = self._file_cmd_queue.get_nowait()
# Re-queue so the outer loop handles it
try:
self._file_cmd_queue.put_nowait(next_cmd)
except queue.Full:
pass
# Kill current proc so read() below returns immediately
if current_proc.poll() is None:
current_proc.terminate()
break
sio = self.sio # local ref guards against stop_streaming() race
except queue.Empty:
pass
if current_proc.poll() is not None:
current_proc = None
self.ffmpeg_proc = None
break
chunk = current_proc.stdout.read(4096)
if not chunk:
current_proc = None
self.ffmpeg_proc = None
break
sio = self.sio
if sio and sio.connected:
sio.emit('audio_chunk', chunk)
# Detect unexpected ffmpeg exit during an active stream
if self.is_running:
ret = self.ffmpeg_proc.poll() if self.ffmpeg_proc else None
if ret is not None and ret != 0:
self.streaming_error.emit(
f"FFmpeg exited with code {ret}.\n"
"Check that PulseAudio / PipeWire is running and the "
"virtual audio sink was created successfully."
)
elif cmd[0] == 'stop':
if current_proc and current_proc.poll() is None:
current_proc.terminate()
try:
current_proc.wait(timeout=1.0)
except Exception:
current_proc.kill()
current_proc = None
self.ffmpeg_proc = None
except Exception as e:
self.streaming_error.emit(f"Streaming error: {e}")
@ -1040,6 +1097,37 @@ class StreamingWorker(QThread):
except Exception as e:
print(f"[SOCKET] emit_if_connected error: {e}")
def switch_file(self, file_path, position_ms=0):
"""Called from the main thread when a deck starts playing.
Signals the streaming loop to kill the current ffmpeg (if any) and
start a new one reading *file_path* from *position_ms*.
"""
if not file_path:
return
# Drop any stale pending commands so only the latest file matters
while not self._file_cmd_queue.empty():
try:
self._file_cmd_queue.get_nowait()
except queue.Empty:
break
try:
self._file_cmd_queue.put_nowait(('play', file_path, int(position_ms)))
except queue.Full:
pass
def stop_file(self):
"""Called from the main thread when a deck pauses or stops."""
while not self._file_cmd_queue.empty():
try:
self._file_cmd_queue.get_nowait()
except queue.Empty:
break
try:
self._file_cmd_queue.put_nowait(('stop',))
except queue.Full:
pass
def stop_streaming(self):
"""Thread-safe stop: capture refs locally before clearing to avoid TOCTOU."""
self.is_running = False
@ -1305,6 +1393,7 @@ class DeckWidget(QGroupBox):
self.loop_btns = []
self.xf_vol = 100
self.current_title = ""
self.current_file_path = None
self.loop_timer = QTimer(self)
self.loop_timer.setInterval(LOOP_CHECK_INTERVAL)
@ -1541,6 +1630,7 @@ class DeckWidget(QGroupBox):
try:
self.player.setSource(QUrl.fromLocalFile(str(p.absolute())))
self.current_title = p.stem
self.current_file_path = str(p.absolute())
self.lbl_tr.setText(p.stem.upper())
self.vinyl.set_speed(0)
self.vinyl.angle = 0
@ -1610,11 +1700,18 @@ class DeckWidget(QGroupBox):
self.player.play()
self.vinyl.start_spin()
self._emit_playing_state(True)
# Stream the file directly to listeners (reliable regardless of audio backend)
mw = self.window()
if hasattr(mw, 'streaming_worker') and self.current_file_path:
mw.streaming_worker.switch_file(self.current_file_path, self.player.position())
def pause(self):
self.player.pause()
self.vinyl.stop_spin()
self._emit_playing_state(False)
mw = self.window()
if hasattr(mw, 'streaming_worker'):
mw.streaming_worker.stop_file()
def stop(self):
self.player.stop()
@ -1623,6 +1720,9 @@ class DeckWidget(QGroupBox):
self.vinyl.update()
self.clear_loop()
self._emit_playing_state(False)
mw = self.window()
if hasattr(mw, 'streaming_worker'):
mw.streaming_worker.stop_file()
def on_position_changed(self, pos):
self.wave.set_position(pos)