Fix: isolate DJ audio from system audio in streaming/recording

The Qt app was using PulseAudio's 'default.monitor' which captures ALL
system audio (YouTube Music, Spotify, browser, etc.). This caused listeners
to hear whatever was playing on the DJ's system, not just the DJ mix.

Added PulseAudioIsolator class that:
- Creates a virtual PulseAudio null sink ('techdj_stream')
- Routes only this app's audio to the virtual sink
- Creates a loopback so the DJ still hears their mix through speakers
- Captures from the virtual sink's monitor (only DJ audio)
- Reference-counted: shared between streaming and recording workers
- Automatically cleans up stale sinks from previous crashes
- Periodically re-routes audio to catch new tracks/streams
- Falls back to default.monitor if pactl is unavailable

Both StreamingWorker and RecordingWorker now use the isolator.
This commit is contained in:
ComputerTech 2026-03-09 19:45:43 +00:00
parent abf907ddfb
commit cddce99b29
1 changed files with 202 additions and 24 deletions

View File

@ -10,6 +10,7 @@ import requests
import re import re
import socketio import socketio
import subprocess import subprocess
import threading
from pathlib import Path from pathlib import Path
import soundfile as sf import soundfile as sf
@ -108,6 +109,164 @@ QSlider#crossfader::handle:horizontal:hover {
} }
""" """
# --- AUDIO ISOLATION ---
# PulseAudio virtual sink so streaming/recording only captures THIS app's audio,
# not YouTube Music, Spotify, system sounds, etc.
class PulseAudioIsolator:
"""Manages a PulseAudio/PipeWire virtual sink for app-only audio capture.
Usage:
source = PulseAudioIsolator.acquire() # "techdj_stream.monitor" or "default.monitor"
...capture from `source`...
PulseAudioIsolator.release()
Reference-counted: the virtual sink stays alive until the last user releases it.
A loopback is created so the DJ still hears their mix through speakers.
"""
_ref_count = 0
_sink_module_id = None
_loopback_module_id = None
_lock = threading.Lock()
SINK_NAME = "techdj_stream"
@classmethod
def acquire(cls):
"""Create the virtual sink (if needed) and route app audio to it.
Returns the PulseAudio monitor source name to capture from."""
with cls._lock:
cls._ref_count += 1
if cls._ref_count == 1:
if not cls._create_sink():
cls._ref_count -= 1
return "default.monitor"
cls._route_app_audio(cls.SINK_NAME)
return f"{cls.SINK_NAME}.monitor"
@classmethod
def release(cls):
"""Release the virtual sink. Removed when last user releases."""
with cls._lock:
cls._ref_count = max(0, cls._ref_count - 1)
if cls._ref_count == 0:
cls._destroy_sink()
@classmethod
def refresh_routes(cls):
"""Re-route app audio (call periodically to catch new PulseAudio streams)."""
with cls._lock:
if cls._ref_count > 0 and cls._sink_module_id:
cls._route_app_audio(cls.SINK_NAME)
# -- internal helpers --
@classmethod
def _create_sink(cls):
try:
if not shutil.which("pactl"):
print("[AUDIO] pactl not found - cannot isolate audio")
return False
# Clean up any stale sinks from a previous crash
cls._cleanup_stale_sinks()
# Create a null sink dedicated to this app
result = subprocess.run(
['pactl', 'load-module', 'module-null-sink',
f'sink_name={cls.SINK_NAME}',
f'sink_properties=device.description="TechDJ_Stream"'],
capture_output=True, text=True, timeout=5
)
if result.returncode != 0:
print(f"[AUDIO] Virtual sink failed: {result.stderr.strip()}")
return False
cls._sink_module_id = result.stdout.strip()
# Loopback: route virtual-sink audio back to speakers so DJ can monitor
result = subprocess.run(
['pactl', 'load-module', 'module-loopback',
f'source={cls.SINK_NAME}.monitor',
'latency_msec=50'],
capture_output=True, text=True, timeout=5
)
if result.returncode == 0:
cls._loopback_module_id = result.stdout.strip()
else:
print(f"[AUDIO] Loopback failed (DJ may not hear audio): {result.stderr.strip()}")
cls._route_app_audio(cls.SINK_NAME)
print("[AUDIO] Virtual sink active - app audio isolated")
return True
except Exception as e:
print(f"[AUDIO] Virtual sink error: {e}")
return False
@classmethod
def _cleanup_stale_sinks(cls):
"""Remove any leftover techdj_stream sinks from a previous crash."""
try:
result = subprocess.run(
['pactl', 'list', 'modules', 'short'],
capture_output=True, text=True, timeout=5
)
for line in result.stdout.strip().split('\n'):
if 'module-null-sink' in line and cls.SINK_NAME in line:
module_id = line.split()[0]
subprocess.run(['pactl', 'unload-module', module_id],
capture_output=True, timeout=5)
print(f"[AUDIO] Cleaned up stale sink module {module_id}")
if 'module-loopback' in line and cls.SINK_NAME in line:
module_id = line.split()[0]
subprocess.run(['pactl', 'unload-module', module_id],
capture_output=True, timeout=5)
print(f"[AUDIO] Cleaned up stale loopback module {module_id}")
except Exception:
pass
@classmethod
def _destroy_sink(cls):
try:
cls._route_app_audio('@DEFAULT_SINK@')
except Exception:
pass
for mid in (cls._loopback_module_id, cls._sink_module_id):
if mid:
try:
subprocess.run(['pactl', 'unload-module', mid],
capture_output=True, timeout=5)
except Exception:
pass
cls._sink_module_id = None
cls._loopback_module_id = None
print("[AUDIO] Virtual sink removed")
@classmethod
def _route_app_audio(cls, target_sink):
"""Move this process's PulseAudio sink-inputs to *target_sink*."""
pid = str(os.getpid())
try:
result = subprocess.run(
['pactl', 'list', 'sink-inputs'],
capture_output=True, text=True, timeout=5
)
current_idx = None
for line in result.stdout.split('\n'):
s = line.strip()
if s.startswith('Sink Input #'):
current_idx = s.split('#')[1].strip()
elif 'application.process.id' in s and current_idx:
m = re.search(r'"(\d+)"', s)
if m and m.group(1) == pid:
subprocess.run(
['pactl', 'move-sink-input', current_idx, target_sink],
capture_output=True, timeout=5
)
current_idx = None
except Exception:
pass
# --- WORKERS --- # --- WORKERS ---
class DownloadThread(QThread): class DownloadThread(QThread):
@ -601,42 +760,40 @@ class YTResultDialog(QDialog):
return i.data(Qt.ItemDataRole.UserRole) if i else None return i.data(Qt.ItemDataRole.UserRole) if i else None
class RecordingWorker(QProcess): class RecordingWorker(QProcess):
"""Records system audio output using FFmpeg""" """Records this app's audio output (isolated) using FFmpeg."""
recording_started = pyqtSignal() recording_started = pyqtSignal()
recording_error = pyqtSignal(str) recording_error = pyqtSignal(str)
def __init__(self, parent=None): def __init__(self, parent=None):
super().__init__(parent) super().__init__(parent)
self.output_file = "" self.output_file = ""
self._using_virtual_sink = False
self.readyReadStandardError.connect(self.handle_error) self.readyReadStandardError.connect(self.handle_error)
def start_recording(self, output_path): def start_recording(self, output_path):
"""Start recording system audio to file""" """Start recording this app's audio to file."""
self.output_file = output_path self.output_file = output_path
# Check if FFmpeg is available
if not shutil.which("ffmpeg"): if not shutil.which("ffmpeg"):
self.recording_error.emit("FFmpeg not found. Install with: sudo apt install ffmpeg") self.recording_error.emit("FFmpeg not found. Install with: sudo apt install ffmpeg")
return False return False
print(f"[RECORDING] Starting: {output_path}") print(f"[RECORDING] Starting: {output_path}")
# FFmpeg command to record PulseAudio output with high quality # Acquire isolated audio source (virtual sink or fallback)
# IMPORTANT: Use .monitor to capture OUTPUT (what you hear), not INPUT (microphone) source = PulseAudioIsolator.acquire()
# -f pulse: use PulseAudio self._using_virtual_sink = (source != "default.monitor")
# -i default.monitor: capture system audio OUTPUT (not microphone) if not self._using_virtual_sink:
# -ac 2: stereo print("[RECORDING] WARNING: Using default.monitor - ALL system audio will be recorded")
# -ar 48000: 48kHz sample rate (higher quality than 44.1kHz)
# -acodec pcm_s16le: uncompressed 16-bit PCM (lossless)
# -sample_fmt s16: 16-bit samples
args = [ args = [
"-f", "pulse", "-f", "pulse",
"-i", "default.monitor", # .monitor captures OUTPUT, not microphone! "-i", source,
"-ac", "2", "-ac", "2",
"-ar", "48000", # Higher sample rate for better quality "-ar", "48000",
"-acodec", "pcm_s16le", # Lossless PCM codec "-acodec", "pcm_s16le",
"-sample_fmt", "s16", "-sample_fmt", "s16",
"-y", # Overwrite if exists "-y",
output_path output_path
] ]
@ -645,14 +802,15 @@ class RecordingWorker(QProcess):
return True return True
def stop_recording(self): def stop_recording(self):
"""Stop the recording""" """Stop the recording and release the virtual sink."""
if self.state() == QProcess.ProcessState.Running: if self.state() == QProcess.ProcessState.Running:
print("[RECORDING] Stopping...") print("[RECORDING] Stopping...")
# Send 'q' to FFmpeg to gracefully stop
self.write(b"q") self.write(b"q")
self.waitForFinished(3000) self.waitForFinished(3000)
if self.state() == QProcess.ProcessState.Running: if self.state() == QProcess.ProcessState.Running:
self.kill() self.kill()
PulseAudioIsolator.release()
self._using_virtual_sink = False
def handle_error(self): def handle_error(self):
"""Handle FFmpeg stderr (which includes progress info)""" """Handle FFmpeg stderr (which includes progress info)"""
@ -661,7 +819,11 @@ class RecordingWorker(QProcess):
print(f"[RECORDING ERROR] {err}") print(f"[RECORDING ERROR] {err}")
class StreamingWorker(QThread): class StreamingWorker(QThread):
"""Streams system audio output to a server using Socket.IO Chunks""" """Streams this app's audio output to a server using Socket.IO.
Uses PulseAudioIsolator to capture ONLY the DJ app's audio,
not YouTube Music, Spotify, or other system sounds.
"""
streaming_started = pyqtSignal() streaming_started = pyqtSignal()
streaming_error = pyqtSignal(str) streaming_error = pyqtSignal(str)
listener_count = pyqtSignal(int) listener_count = pyqtSignal(int)
@ -672,6 +834,7 @@ class StreamingWorker(QThread):
self.stream_url = "" self.stream_url = ""
self.is_running = False self.is_running = False
self.ffmpeg_proc = None self.ffmpeg_proc = None
self._using_virtual_sink = False
def on_connect(self): def on_connect(self):
print("[SOCKET] Connected to DJ server") print("[SOCKET] Connected to DJ server")
@ -689,23 +852,27 @@ class StreamingWorker(QThread):
def run(self): def run(self):
try: try:
# Create a fresh Socket.IO client for each session to avoid stale state # Create a fresh Socket.IO client for each session
self.sio = socketio.Client() self.sio = socketio.Client()
self.sio.on('connect', self.on_connect) self.sio.on('connect', self.on_connect)
self.sio.on('disconnect', self.on_disconnect) self.sio.on('disconnect', self.on_disconnect)
self.sio.on('listener_count', self.on_listener_count) self.sio.on('listener_count', self.on_listener_count)
self.sio.on('connect_error', self.on_connect_error) self.sio.on('connect_error', self.on_connect_error)
# Connect to socket
self.sio.connect(self.stream_url) self.sio.connect(self.stream_url)
# Start FFmpeg to capture audio and output to pipe # Acquire isolated audio source (virtual sink or fallback)
source = PulseAudioIsolator.acquire()
self._using_virtual_sink = (source != "default.monitor")
if not self._using_virtual_sink:
print("[STREAM] WARNING: Capturing ALL system audio (pactl unavailable)")
cmd = [ cmd = [
"ffmpeg", "ffmpeg",
"-hide_banner", "-hide_banner",
"-loglevel", "error", "-loglevel", "error",
"-f", "pulse", "-f", "pulse",
"-i", "default.monitor", "-i", source,
"-ac", "2", "-ac", "2",
"-ar", "44100", "-ar", "44100",
"-f", "mp3", "-f", "mp3",
@ -713,13 +880,22 @@ class StreamingWorker(QThread):
"-af", "aresample=async=1", "-af", "aresample=async=1",
"pipe:1" "pipe:1"
] ]
self.ffmpeg_proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=8192) self.ffmpeg_proc = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=8192
)
last_reroute = time.time()
while self.is_running and self.ffmpeg_proc.poll() is None: while self.is_running and self.ffmpeg_proc.poll() is None:
chunk = self.ffmpeg_proc.stdout.read(8192) chunk = self.ffmpeg_proc.stdout.read(8192)
if not chunk: break if not chunk:
break
if self.sio.connected: if self.sio.connected:
self.sio.emit('audio_chunk', chunk) self.sio.emit('audio_chunk', chunk)
# Periodically re-route audio (handles new sink-inputs from track changes)
if self._using_virtual_sink and time.time() - last_reroute > 3:
PulseAudioIsolator.refresh_routes()
last_reroute = time.time()
except Exception as e: except Exception as e:
self.streaming_error.emit(f"Streaming thread error: {e}") self.streaming_error.emit(f"Streaming thread error: {e}")
@ -738,6 +914,8 @@ class StreamingWorker(QThread):
try: self.ffmpeg_proc.terminate() try: self.ffmpeg_proc.terminate()
except: pass except: pass
self.ffmpeg_proc = None self.ffmpeg_proc = None
PulseAudioIsolator.release()
self._using_virtual_sink = False
if self.sio and self.sio.connected: if self.sio and self.sio.connected:
try: try:
self.sio.emit('stop_broadcast') self.sio.emit('stop_broadcast')