diff --git a/listener.js b/listener.js index ee503ed..381b7bf 100644 --- a/listener.js +++ b/listener.js @@ -26,6 +26,10 @@ let reconnectTimer = null; const MAX_RECONNECT_DELAY = 30000; // 30 s cap const BASE_RECONNECT_DELAY = 2000; // Start at 2 s +// Guard: prevents two simultaneous connectStream() calls (e.g. stream_status and +// broadcast_started can arrive back-to-back and both call connectStream) +let _streamConnecting = false; + // Stall watchdog state let stallWatchdogInterval = null; let lastWatchdogTime = 0; @@ -208,6 +212,10 @@ function connectStream() { return; } + // Prevent simultaneous duplicate connection attempts + if (_streamConnecting) return; + _streamConnecting = true; + console.log('[STREAM] Connecting to MP3 stream...'); updateStatus('Connecting...', false); @@ -217,6 +225,7 @@ function connectStream() { window.listenerAudio.play() .then(() => { + _streamConnecting = false; console.log('[OK] Stream playback started'); resetReconnectBackoff(); updateStatus('Audio Active — Enjoy the stream!', true); @@ -224,6 +233,7 @@ function connectStream() { startListenerVUMeter(); }) .catch(e => { + _streamConnecting = false; console.warn('[WARN] play() rejected:', e.name, e.message); // If broadcast went offline while we were connecting, don't retry if (!broadcastActive) { @@ -317,11 +327,18 @@ function initSocket() { document.body.classList.toggle('playing-B', !!data.B); }); + socket.on('now_playing', (data) => { + if (data && data.title) { + updateNowPlaying(data.title); + } + }); + return socket; } /** Clean up when broadcast goes offline. */ function handleBroadcastOffline() { + _streamConnecting = false; // cancel any in-flight connect attempt cancelScheduledReconnect(); stopStallWatchdog(); diff --git a/script.js b/script.js index 952f33b..5f8fd30 100644 --- a/script.js +++ b/script.js @@ -1292,6 +1292,14 @@ function tauriResolve(track) { async function fetchLibrary() { try { + // In Tauri: scan disk directly via native Rust commands — no Flask needed. + if (window.__TAURI__?.core?.invoke) { + const musicDir = await window.__TAURI__.core.invoke('get_music_folder'); + allSongs = await window.__TAURI__.core.invoke('scan_library', { musicDir }); + renderLibrary(allSongs); + return; + } + // Browser / Flask fallback const res = await fetch('library.json?t=' + new Date().getTime()); allSongs = await res.json(); renderLibrary(allSongs); @@ -1536,6 +1544,27 @@ async function handleFileUpload(event) { const files = Array.from(event.target.files); if (!files || files.length === 0) return; + // In Tauri: files are already local — no server upload needed. + // Create session blob URLs and add directly to the in-memory library. + if (window.__TAURI__?.core?.invoke) { + const allowed = ['.mp3', '.m4a', '.wav', '.flac', '.ogg']; + let added = 0; + files.forEach(file => { + const ext = file.name.substring(file.name.lastIndexOf('.')).toLowerCase(); + if (!allowed.includes(ext)) return; + const blobUrl = URL.createObjectURL(file); + const title = file.name.replace(/\.[^.]+$/, ''); + // absolutePath is null — tauriResolve will fall back to the blob URL. + allSongs.push({ title, file: blobUrl, absolutePath: null }); + added++; + }); + if (added > 0) { + renderLibrary(allSongs); + showToast(`${added} track(s) loaded into library`, 'success'); + } + return; + } + console.log(`Uploading ${files.length} file(s)...`); // Create/Show progress container @@ -1700,8 +1729,13 @@ async function browseToPath(targetPath) { } try { - const res = await fetch(`/browse_directories?path=${encodeURIComponent(path)}`); - const data = await res.json(); + let data; + if (window.__TAURI__?.core?.invoke) { + data = await window.__TAURI__.core.invoke('list_dirs', { path }); + } else { + const res = await fetch(`/browse_directories?path=${encodeURIComponent(path)}`); + data = await res.json(); + } if (data.success) { currentInput.value = data.path; const list = document.getElementById('dir-list'); @@ -1723,23 +1757,26 @@ async function browseToPath(targetPath) { async function confirmFolderSelection() { const path = document.getElementById('current-folder-path').value; try { - const res = await fetch('/update_settings', { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ - library: { music_folder: path } - }) - }); - const result = await res.json(); + let result; + if (window.__TAURI__?.core?.invoke) { + result = await window.__TAURI__.core.invoke('save_music_folder', { path }); + } else { + const res = await fetch('/update_settings', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ library: { music_folder: path } }) + }); + result = await res.json(); + } if (result.success) { - alert("Music folder updated! Refreshing library..."); + alert('Music folder updated! Refreshing library...'); closeFolderPicker(); fetchLibrary(); } else { - alert("Error: " + result.error); + alert('Error: ' + (result.error || 'Unknown error')); } } catch (e) { - alert("Failed to update settings"); + alert('Failed to update settings'); } } @@ -1870,6 +1907,12 @@ let currentStreamMimeType = null; function initSocket() { if (socket) return socket; + // Socket.IO is loaded from a CDN; in Tauri (offline) it may not be available. + if (typeof io === 'undefined') { + console.warn('[SOCKET] Socket.IO not loaded — live streaming is unavailable'); + return null; + } + const serverUrl = window.location.origin; console.log(`[SOCKET] Connecting to ${serverUrl}`); @@ -2223,6 +2266,15 @@ function startBroadcast() { // 250ms chunks: More frequent smaller chunks reduces stall gaps on weak connections. // A 1-second chunk creates a 1-second starvation gap if the network hiccups; // 250ms chunks keep the server fed 4x more often. + + // Notify server FIRST so broadcast_state is active on the server before + // the first audio_chunk arrives — prevents the first ~250 ms of audio + // being silently dropped by the server's isinstance() guard. + if (!socket) initSocket(); + const bitrateValue = document.getElementById('stream-quality').value + 'k'; + socket.emit('start_broadcast', { bitrate: bitrateValue }); + socket.emit('get_listener_count'); + // Validate state before starting if (mediaRecorder.state === 'inactive') { mediaRecorder.start(250); @@ -2240,12 +2292,6 @@ function startBroadcast() { document.getElementById('broadcast-status').textContent = 'LIVE'; document.getElementById('broadcast-status').classList.add('live'); - // Notify server that broadcast is active (listeners use MP3 stream) - if (!socket) initSocket(); - const bitrateValue = document.getElementById('stream-quality').value + 'k'; - socket.emit('start_broadcast', { bitrate: bitrateValue }); - socket.emit('get_listener_count'); - console.log('[OK] Broadcasting started successfully!'); console.log('TIP TIP: Play a track on Deck A or B to stream audio'); @@ -2276,23 +2322,16 @@ function stopBroadcast() { } if (streamDestination) { + // Only disconnect the specific stream connection — do NOT call .disconnect() + // with no args as that also removes the audioCtx.destination connection and + // causes an audible pop / silence gap for locally-monitored decks. if (decks.A.crossfaderGain) { - try { - decks.A.crossfaderGain.disconnect(streamDestination); - decks.A.crossfaderGain.disconnect(); - decks.A.crossfaderGain.connect(audioCtx.destination); - } catch (e) { - console.warn('Error restoring Deck A audio:', e); - } + try { decks.A.crossfaderGain.disconnect(streamDestination); } + catch (e) { console.warn('Error disconnecting Deck A from stream:', e); } } if (decks.B.crossfaderGain) { - try { - decks.B.crossfaderGain.disconnect(streamDestination); - decks.B.crossfaderGain.disconnect(); - decks.B.crossfaderGain.connect(audioCtx.destination); - } catch (e) { - console.warn('Error restoring Deck B audio:', e); - } + try { decks.B.crossfaderGain.disconnect(streamDestination); } + catch (e) { console.warn('Error disconnecting Deck B from stream:', e); } } streamDestination = null; } @@ -2325,25 +2364,16 @@ function restartBroadcast() { streamProcessor = null; } - // Clean up old stream destination + // Clean up old stream destination — only disconnect the stream leg, + // not the audioCtx.destination leg, to avoid an audible glitch. if (streamDestination) { if (decks.A.crossfaderGain) { - try { - decks.A.crossfaderGain.disconnect(streamDestination); - decks.A.crossfaderGain.disconnect(); - decks.A.crossfaderGain.connect(audioCtx.destination); - } catch (e) { - console.warn('Error cleaning up Deck A:', e); - } + try { decks.A.crossfaderGain.disconnect(streamDestination); } + catch (e) { console.warn('Error cleaning up Deck A:', e); } } if (decks.B.crossfaderGain) { - try { - decks.B.crossfaderGain.disconnect(streamDestination); - decks.B.crossfaderGain.disconnect(); - decks.B.crossfaderGain.connect(audioCtx.destination); - } catch (e) { - console.warn('Error cleaning up Deck B:', e); - } + try { decks.B.crossfaderGain.disconnect(streamDestination); } + catch (e) { console.warn('Error cleaning up Deck B:', e); } } streamDestination = null; } diff --git a/settings.json b/settings.json index 1ce10d5..aa403f3 100644 --- a/settings.json +++ b/settings.json @@ -10,7 +10,8 @@ "audio": { "recording_sample_rate": 48000, "recording_format": "wav", - "stream_server_url": "http://54.37.246.24:5001/" + "stream_server_url": "http://54.37.246.24:5000", + "dj_panel_password": "techymusic//789//" }, "ui": { "neon_mode": 2 diff --git a/src-tauri/src/lib.rs b/src-tauri/src/lib.rs index fc0d76b..0490d1e 100644 --- a/src-tauri/src/lib.rs +++ b/src-tauri/src/lib.rs @@ -1,10 +1,147 @@ +use std::path::{Path, PathBuf}; +use std::fs; +use serde_json::{json, Value}; +use tauri::Manager; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +fn home_dir() -> PathBuf { + std::env::var("HOME") + .map(PathBuf::from) + .unwrap_or_else(|_| PathBuf::from("/home")) +} + +fn settings_file(app: &tauri::AppHandle) -> PathBuf { + app.path() + .app_local_data_dir() + .unwrap_or_else(|_| home_dir().join(".local/share/techdj")) + .join("settings.json") +} + +fn read_settings(app: &tauri::AppHandle) -> Value { + fs::read_to_string(settings_file(app)) + .ok() + .and_then(|s| serde_json::from_str::(&s).ok()) + .unwrap_or_else(|| json!({})) +} + +// --------------------------------------------------------------------------- +// Tauri commands +// --------------------------------------------------------------------------- + +/// Returns the current music folder path stored in app settings. +/// Falls back to ~/Music if nothing is saved. +#[tauri::command] +fn get_music_folder(app: tauri::AppHandle) -> String { + let settings = read_settings(&app); + if let Some(s) = settings["music_folder"].as_str() { + if !s.is_empty() { + return s.to_string(); + } + } + home_dir().join("Music").to_string_lossy().into_owned() +} + +/// Persists the chosen music folder to the Tauri app-local settings file. +#[tauri::command] +fn save_music_folder(app: tauri::AppHandle, path: String) -> Value { + let mut settings = read_settings(&app); + settings["music_folder"] = json!(path); + let sf = settings_file(&app); + if let Some(parent) = sf.parent() { + let _ = fs::create_dir_all(parent); + } + match fs::write(&sf, serde_json::to_string_pretty(&settings).unwrap_or_default()) { + Ok(_) => json!({ "success": true }), + Err(e) => json!({ "success": false, "error": e.to_string() }), + } +} + +/// Recursively scans `music_dir` for supported audio files and returns an +/// array of `{ title, file, absolutePath }` objects — same shape as the +/// Flask `/library.json` endpoint so the front-end works without changes. +#[tauri::command] +fn scan_library(music_dir: String) -> Vec { + let mut tracks = Vec::new(); + let p = Path::new(&music_dir); + if p.is_dir() { + scan_dir(p, &mut tracks); + } + tracks +} + +fn scan_dir(dir: &Path, tracks: &mut Vec) { + let Ok(rd) = fs::read_dir(dir) else { return }; + let mut entries: Vec<_> = rd.flatten().collect(); + entries.sort_by_key(|e| e.file_name()); + for entry in entries { + let p = entry.path(); + if p.is_dir() { + scan_dir(&p, tracks); + } else if let Some(ext) = p.extension() { + let ext_lc = ext.to_string_lossy().to_lowercase(); + if matches!(ext_lc.as_str(), "mp3" | "m4a" | "wav" | "flac" | "ogg" | "aac") { + let title = p + .file_stem() + .map(|s| s.to_string_lossy().into_owned()) + .unwrap_or_default(); + let file_name = p + .file_name() + .map(|s| s.to_string_lossy().into_owned()) + .unwrap_or_default(); + let abs = p.to_string_lossy().into_owned(); + tracks.push(json!({ + "title": title, + "file": format!("music_proxy/{}", file_name), + "absolutePath": abs, + })); + } + } + } +} + +/// Lists subdirectories at `path` — replaces the Flask `/browse_directories` +/// endpoint for the in-app folder picker. +#[tauri::command] +fn list_dirs(path: String) -> Value { + let dir = Path::new(&path); + let mut entries: Vec = Vec::new(); + + if let Some(parent) = dir.parent() { + entries.push(json!({ "name": "..", "path": parent.to_string_lossy(), "isDir": true })); + } + + if let Ok(rd) = fs::read_dir(dir) { + let mut dirs: Vec<_> = rd.flatten().filter(|e| e.path().is_dir()).collect(); + dirs.sort_by_key(|e| e.file_name()); + for d in dirs { + entries.push(json!({ + "name": d.file_name().to_string_lossy(), + "path": d.path().to_string_lossy(), + "isDir": true, + })); + } + } + + json!({ "success": true, "path": path, "entries": entries }) +} + +// --------------------------------------------------------------------------- +// App entry point +// --------------------------------------------------------------------------- + #[cfg_attr(mobile, tauri::mobile_entry_point)] pub fn run() { tauri::Builder::default() - // Grant the WebView direct read access to local audio files so - // convertFileSrc() can serve tracks from $HOME without going - // through the Flask proxy. .plugin(tauri_plugin_fs::init()) + .invoke_handler(tauri::generate_handler![ + get_music_folder, + save_music_folder, + scan_library, + list_dirs, + ]) .run(tauri::generate_context!()) .expect("error while running TechDJ"); } diff --git a/src-tauri/tauri.conf.json b/src-tauri/tauri.conf.json index 4730f4c..b167baf 100644 --- a/src-tauri/tauri.conf.json +++ b/src-tauri/tauri.conf.json @@ -23,10 +23,7 @@ "security": { "assetProtocol": { "enable": true, - "scope": [ - "$HOME/**", - "$HOME/Music/**" - ] + "scope": ["$HOME/**"] } } }, diff --git a/techdj_qt.py b/techdj_qt.py index 6ece249..a915314 100644 --- a/techdj_qt.py +++ b/techdj_qt.py @@ -5,7 +5,9 @@ import json import random import math import time +import queue import shutil +import threading import requests import re import socketio @@ -861,10 +863,13 @@ class RecordingWorker(QProcess): print(f"[RECORDING ERROR] {err}") class StreamingWorker(QThread): - """Streams this app's isolated audio output to a server using Socket.IO. + """Streams the currently-playing deck's audio file to the server. - PULSE_SINK is set at startup so ALL audio from this process goes to the - virtual sink automatically. We just capture from its monitor here. + Instead of capturing from a PulseAudio monitor (which is unreliable when + Qt6 uses the PipeWire-native or ALSA audio backend), this worker reads the + file being played directly through ffmpeg with '-re' (real-time rate) and + sends the resulting MP3 bytes to the server as audio_chunk events. The + server distributes them to all connected listener browsers via /stream.mp3. """ streaming_started = pyqtSignal() streaming_error = pyqtSignal(str) @@ -879,6 +884,8 @@ class StreamingWorker(QThread): self.is_running = False self.ffmpeg_proc = None self._broadcast_started = False + # Thread-safe command queue: ('play', file_path, position_ms) or ('stop',) + self._file_cmd_queue = queue.Queue(maxsize=20) def on_connect(self): print("[SOCKET] Connected to DJ server") @@ -968,56 +975,106 @@ class StreamingWorker(QThread): # wait_timeout: how long to wait for the server to respond during connect self.sio.connect(self.stream_url, wait_timeout=10, headers=connect_headers) - source = get_audio_capture_source() - print(f"[STREAM] Capturing from: {source}") + print("[STREAM] Connected — waiting for deck to start playing...") - cmd = [ - "ffmpeg", - "-hide_banner", - "-loglevel", "error", - # Disable input buffering so frames reach the pipe immediately - "-fflags", "nobuffer", - "-f", "pulse", - "-i", source, - "-ac", "2", - "-ar", "44100", - "-b:a", "128k", - "-af", "aresample=async=1", - # Flush every packet — critical for low-latency pipe streaming - "-flush_packets", "1", - "-f", "mp3", - "pipe:1", - ] - self.ffmpeg_proc = subprocess.Popen( - cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=0 - ) + # File-based streaming loop. + # Waits for ('play', path, pos_ms) commands from the main thread, + # then pipes the file through ffmpeg at real-time rate to the server. + # This is reliable regardless of Qt's audio backend (PulseAudio / + # PipeWire-native / ALSA), since we read the file directly. + current_proc = None - # Drain stderr in a real daemon thread so the OS pipe buffer never - # fills up and blocks stdout (classic Python subprocess deadlock). - import threading as _threading - stderr_thread = _threading.Thread( - target=self._drain_stderr, args=(self.ffmpeg_proc,), daemon=True - ) - stderr_thread.start() + while self.is_running: + # Block briefly waiting for a command; loop allows is_running re-check + try: + cmd = self._file_cmd_queue.get(timeout=0.25) + except queue.Empty: + # If the current ffmpeg exited on its own (track ended), clean up + if current_proc is not None and current_proc.poll() is not None: + current_proc = None + self.ffmpeg_proc = None + continue - while self.is_running and self.ffmpeg_proc.poll() is None: - # 4096 bytes ≈ 10 MP3 frames ≈ ~260 ms at 128 kbps — low-latency chunks - chunk = self.ffmpeg_proc.stdout.read(4096) - if not chunk: - break - sio = self.sio # local ref guards against stop_streaming() race - if sio and sio.connected: - sio.emit('audio_chunk', chunk) + if cmd[0] == 'play': + # Kill previous ffmpeg before starting a new one + if current_proc and current_proc.poll() is None: + current_proc.terminate() + try: + current_proc.wait(timeout=1.0) + except Exception: + current_proc.kill() - # Detect unexpected ffmpeg exit during an active stream - if self.is_running: - ret = self.ffmpeg_proc.poll() if self.ffmpeg_proc else None - if ret is not None and ret != 0: - self.streaming_error.emit( - f"FFmpeg exited with code {ret}.\n" - "Check that PulseAudio / PipeWire is running and the " - "virtual audio sink was created successfully." + _, file_path, position_ms = cmd + position_secs = max(0.0, position_ms / 1000.0) + print(f"[STREAM] Streaming file: {file_path} from {position_secs:.1f}s") + + ffmpeg_cmd = [ + "ffmpeg", + "-hide_banner", + "-loglevel", "error", + "-re", # real-time output rate — prevents flooding the socket + "-ss", f"{position_secs:.3f}", # seek to playback position + "-i", file_path, + "-vn", # discard video (cover art etc.) + "-ac", "2", + "-ar", "44100", + "-b:a", "128k", + "-flush_packets", "1", + "-f", "mp3", + "pipe:1", + ] + current_proc = subprocess.Popen( + ffmpeg_cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + bufsize=0, ) + self.ffmpeg_proc = current_proc + threading.Thread( + target=self._drain_stderr, args=(current_proc,), daemon=True + ).start() + + # Inner read loop for this file + while self.is_running: + # Non-blocking check for new command (switch track / stop) + try: + next_cmd = self._file_cmd_queue.get_nowait() + # Re-queue so the outer loop handles it + try: + self._file_cmd_queue.put_nowait(next_cmd) + except queue.Full: + pass + # Kill current proc so read() below returns immediately + if current_proc.poll() is None: + current_proc.terminate() + break + except queue.Empty: + pass + + if current_proc.poll() is not None: + current_proc = None + self.ffmpeg_proc = None + break + + chunk = current_proc.stdout.read(4096) + if not chunk: + current_proc = None + self.ffmpeg_proc = None + break + + sio = self.sio + if sio and sio.connected: + sio.emit('audio_chunk', chunk) + + elif cmd[0] == 'stop': + if current_proc and current_proc.poll() is None: + current_proc.terminate() + try: + current_proc.wait(timeout=1.0) + except Exception: + current_proc.kill() + current_proc = None + self.ffmpeg_proc = None except Exception as e: self.streaming_error.emit(f"Streaming error: {e}") @@ -1040,6 +1097,37 @@ class StreamingWorker(QThread): except Exception as e: print(f"[SOCKET] emit_if_connected error: {e}") + def switch_file(self, file_path, position_ms=0): + """Called from the main thread when a deck starts playing. + + Signals the streaming loop to kill the current ffmpeg (if any) and + start a new one reading *file_path* from *position_ms*. + """ + if not file_path: + return + # Drop any stale pending commands so only the latest file matters + while not self._file_cmd_queue.empty(): + try: + self._file_cmd_queue.get_nowait() + except queue.Empty: + break + try: + self._file_cmd_queue.put_nowait(('play', file_path, int(position_ms))) + except queue.Full: + pass + + def stop_file(self): + """Called from the main thread when a deck pauses or stops.""" + while not self._file_cmd_queue.empty(): + try: + self._file_cmd_queue.get_nowait() + except queue.Empty: + break + try: + self._file_cmd_queue.put_nowait(('stop',)) + except queue.Full: + pass + def stop_streaming(self): """Thread-safe stop: capture refs locally before clearing to avoid TOCTOU.""" self.is_running = False @@ -1305,6 +1393,7 @@ class DeckWidget(QGroupBox): self.loop_btns = [] self.xf_vol = 100 self.current_title = "" + self.current_file_path = None self.loop_timer = QTimer(self) self.loop_timer.setInterval(LOOP_CHECK_INTERVAL) @@ -1541,6 +1630,7 @@ class DeckWidget(QGroupBox): try: self.player.setSource(QUrl.fromLocalFile(str(p.absolute()))) self.current_title = p.stem + self.current_file_path = str(p.absolute()) self.lbl_tr.setText(p.stem.upper()) self.vinyl.set_speed(0) self.vinyl.angle = 0 @@ -1610,11 +1700,18 @@ class DeckWidget(QGroupBox): self.player.play() self.vinyl.start_spin() self._emit_playing_state(True) + # Stream the file directly to listeners (reliable regardless of audio backend) + mw = self.window() + if hasattr(mw, 'streaming_worker') and self.current_file_path: + mw.streaming_worker.switch_file(self.current_file_path, self.player.position()) def pause(self): self.player.pause() self.vinyl.stop_spin() self._emit_playing_state(False) + mw = self.window() + if hasattr(mw, 'streaming_worker'): + mw.streaming_worker.stop_file() def stop(self): self.player.stop() @@ -1623,6 +1720,9 @@ class DeckWidget(QGroupBox): self.vinyl.update() self.clear_loop() self._emit_playing_state(False) + mw = self.window() + if hasattr(mw, 'streaming_worker'): + mw.streaming_worker.stop_file() def on_position_changed(self, pos): self.wave.set_position(pos)