techdj/techdj_qt.py

2668 lines
107 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import sys
import os
import json
import random
import math
import time
import shutil
import requests
import re
import socketio
from urllib.parse import urlparse
import subprocess
import atexit
from pathlib import Path
import soundfile as sf
# --- BACKEND OVERRIDE ---
# On Linux, GStreamer (default) often miscalculates MP3 duration for VBR files.
# FFmpeg backend is much more reliable if available.
os.environ["QT_MULTIMEDIA_BACKEND"] = "ffmpeg"
# --- DEPENDENCY CHECK ---
try:
import yt_dlp
HAS_YTDLP = True
except ImportError:
HAS_YTDLP = False
print("CRITICAL: yt-dlp not found. Run 'pip install yt-dlp'")
from PyQt6.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout,
QHBoxLayout, QPushButton, QSlider, QLabel,
QListWidget, QGroupBox, QListWidgetItem,
QLineEdit, QGridLayout, QAbstractItemView,
QDialog, QMessageBox, QFrame, QComboBox, QProgressBar,
QTableWidget, QTableWidgetItem, QHeaderView, QTabWidget,
QCheckBox, QSpinBox, QFileDialog)
from PyQt6.QtMultimedia import QMediaPlayer, QAudioOutput
from PyQt6.QtCore import Qt, QUrl, QTimer, QPointF, QRectF, pyqtSignal, QProcess, QThread
from PyQt6.QtGui import QPainter, QColor, QPen, QBrush, QKeySequence, QIcon, QRadialGradient, QPainterPath, QShortcut
# --- CONFIGURATION ---
DEFAULT_BPM = 124
ANIMATION_FPS = 30
ANIMATION_INTERVAL = 1000 // ANIMATION_FPS # ms between animation frames
LOOP_CHECK_INTERVAL = 20 # ms between loop boundary checks
MS_PER_MINUTE = 60000
NUM_EQ_BANDS = 3
MAX_SLIDER_VALUE = 100
STYLESHEET = """
QMainWindow { background-color: #050505; }
QGroupBox { background-color: #0a0a0a; border-radius: 6px; margin-top: 10px; font-family: "Courier New"; }
QGroupBox#Deck_A { border: 2px solid #00ffff; }
QGroupBox#Deck_A::title { color: #00ffff; font-weight: bold; subcontrol-origin: margin; left: 10px; }
QGroupBox#Deck_B { border: 2px solid #ff00ff; }
QGroupBox#Deck_B::title { color: #ff00ff; font-weight: bold; subcontrol-origin: margin; left: 10px; }
QPushButton { background-color: #000; color: #fff; border: 1px solid #444; padding: 6px; font-weight: bold; border-radius: 4px; }
QPushButton:hover { background-color: #222; border: 1px solid #fff; }
QPushButton:pressed { background-color: #444; }
QPushButton#btn_neon { font-family: "Courier New"; margin-bottom: 5px; font-size: 12px; }
QPushButton#btn_yt_go { background-color: #cc0000; border: 1px solid #ff0000; color: white; font-weight: bold; }
QPushButton#btn_yt_go:hover { background-color: #ff0000; }
QPushButton#btn_remove { background-color: #330000; color: #ff0000; border: 1px solid #550000; padding: 0px; font-size: 10px; min-width: 20px; min-height: 20px; }
QPushButton#btn_remove:hover { background-color: #ff0000; color: #fff; border-color: #ff5555; }
QPushButton#btn_loop { background-color: #1a1a1a; color: #888; border: 1px solid #333; font-size: 11px; }
QPushButton#btn_loop:hover { border-color: #ffa500; color: #ffa500; }
QPushButton#btn_loop:checked { background-color: #ffa500; color: #000; border: 1px solid #ffcc00; }
QPushButton#btn_loop_exit { color: #ff3333; border: 1px solid #550000; font-size: 11px; }
QPushButton#btn_loop_exit:hover { background-color: #330000; border-color: #ff0000; }
QPushButton[mode="0"] { color: #00ff00; border-color: #005500; }
QPushButton[mode="1"] { color: #ffa500; border-color: #553300; }
QPushButton#btn_lib_local { color: #00ffff; border-color: #008888; }
QPushButton#btn_lib_local:checked { background-color: #00ffff; color: #000; font-weight: bold; }
QPushButton#btn_lib_server { color: #ff00ff; border-color: #880088; }
QPushButton#btn_lib_server:checked { background-color: #ff00ff; color: #000; font-weight: bold; }
QPushButton[mode="2"] { color: #ff0000; border-color: #550000; }
QLineEdit { background-color: #111; color: #fff; border: 1px solid #555; padding: 6px; font-family: "Courier New"; }
QLineEdit:focus { border: 1px solid #00ff00; }
QListWidget { background-color: #000; border: 1px solid #333; color: #888; font-family: "Courier New"; }
QListWidget::item:selected { background-color: #222; color: #fff; border: 1px solid #00ff00; }
QListWidget#queue_list::item:selected { background-color: #331111; color: #ffaaaa; border: 1px solid #550000; }
QSlider::groove:horizontal { border: 1px solid #333; height: 4px; background: #222; }
QSlider::handle:horizontal { background: #fff; border: 2px solid #fff; width: 14px; height: 14px; margin: -6px 0; border-radius: 8px; }
QSlider::groove:vertical { border: 1px solid #333; width: 6px; background: #111; border-radius: 3px; }
QSlider::handle:vertical { background: #ccc; border: 1px solid #fff; height: 14px; width: 14px; margin: 0 -5px; border-radius: 4px; }
QSlider::sub-page:vertical { background: #444; border-radius: 3px; }
QSlider::add-page:vertical { background: #222; border-radius: 3px; }
QSlider[eq="vol"]::handle:vertical { background: #fff; border: 1px solid #fff; }
QSlider[eq="high"]::handle:vertical { background: #00ffff; border: 1px solid #00ffff; }
QSlider[eq="mid"]::handle:vertical { background: #00ff00; border: 1px solid #00ff00; }
QSlider[eq="low"]::handle:vertical { background: #ff0000; border: 1px solid #ff0000; }
QSlider#crossfader::groove:horizontal {
border: 1px solid #777;
height: 16px;
background: qlineargradient(x1:0, y1:0, x2:1, y2:0, stop:0 #00ffff, stop:0.5 #111, stop:1 #ff00ff);
border-radius: 8px;
}
QSlider#crossfader::handle:horizontal {
background: qlineargradient(x1:0, y1:0, x2:0, y2:1, stop:0 #eee, stop:1 #888);
border: 2px solid #fff;
width: 32px;
height: 36px;
margin: -11px 0;
border-radius: 6px;
}
QSlider#crossfader::handle:horizontal:hover {
background: qlineargradient(x1:0, y1:0, x2:0, y2:1, stop:0 #fff, stop:1 #aaa);
border-color: #00ff00;
}
"""
# --- AUDIO ISOLATION ---
# We create a PulseAudio virtual null sink and set PULSE_SINK *before* Qt
# initialises audio. That forces ALL audio from this process to the virtual
# sink automatically no fragile PID-based routing needed.
#
# A loopback copies the virtual-sink audio back to the real speakers so the
# DJ still hears their mix. Streaming/recording capture from the virtual
# sink's monitor, which contains ONLY this app's audio.
_AUDIO_SINK_NAME = "techdj_stream"
_AUDIO_MONITOR = f"{_AUDIO_SINK_NAME}.monitor"
_audio_sink_module = None # pactl module ID for the null sink
_audio_lb_module = None # pactl module ID for the loopback
_audio_isolated = False # True when the virtual sink is active
def _cleanup_stale_sinks():
"""Remove leftover techdj_stream modules from a previous crash."""
try:
result = subprocess.run(
['pactl', 'list', 'modules', 'short'],
capture_output=True, text=True, timeout=5,
)
for line in result.stdout.strip().split('\n'):
if _AUDIO_SINK_NAME in line and (
'module-null-sink' in line or 'module-loopback' in line):
mod_id = line.split()[0]
subprocess.run(['pactl', 'unload-module', mod_id],
capture_output=True, timeout=5)
print(f"[AUDIO] Cleaned up stale module {mod_id}")
except Exception:
pass
def _setup_audio_isolation():
"""Create the virtual sink and set PULSE_SINK.
**MUST** be called before QApplication() so that Qt's audio output
is automatically routed to the virtual sink.
"""
global _audio_sink_module, _audio_lb_module, _audio_isolated
if not shutil.which('pactl'):
print('[AUDIO] pactl not found audio isolation disabled '
'(install pulseaudio-utils or pipewire-pulse)')
return
_cleanup_stale_sinks()
# 1. Create null sink
r = subprocess.run(
['pactl', 'load-module', 'module-null-sink',
f'sink_name={_AUDIO_SINK_NAME}',
f'sink_properties=device.description="TechDJ_Stream"'],
capture_output=True, text=True, timeout=5,
)
if r.returncode != 0:
print(f'[AUDIO] Failed to create virtual sink: {r.stderr.strip()}')
return
_audio_sink_module = r.stdout.strip()
# 2. Loopback → real speakers so the DJ hears the mix
r = subprocess.run(
['pactl', 'load-module', 'module-loopback',
f'source={_AUDIO_MONITOR}',
'latency_msec=50'],
capture_output=True, text=True, timeout=5,
)
if r.returncode == 0:
_audio_lb_module = r.stdout.strip()
else:
print(f'[AUDIO] Loopback failed (DJ may not hear audio): {r.stderr.strip()}')
# 3. Verify the sink and its monitor actually exist
verify = subprocess.run(
['pactl', 'list', 'sources', 'short'],
capture_output=True, text=True, timeout=5,
)
if _AUDIO_MONITOR not in verify.stdout:
print(f'[AUDIO] ERROR: monitor source "{_AUDIO_MONITOR}" not found!')
print(f'[AUDIO] Available sources:\n{verify.stdout.strip()}')
# Tear down the sink we just created since the monitor isn't there
if _audio_sink_module:
subprocess.run(['pactl', 'unload-module', _audio_sink_module],
capture_output=True, timeout=5)
_audio_sink_module = None
return
# 4. Force this process's audio to the virtual sink
os.environ['PULSE_SINK'] = _AUDIO_SINK_NAME
_audio_isolated = True
print(f'[AUDIO] ✓ Virtual sink "{_AUDIO_SINK_NAME}" active (module {_audio_sink_module})')
print(f'[AUDIO] ✓ Loopback active (module {_audio_lb_module})')
print(f'[AUDIO] ✓ PULSE_SINK={_AUDIO_SINK_NAME} — all app audio routed to virtual sink')
print(f'[AUDIO] ✓ Capture source: {_AUDIO_MONITOR}')
def _teardown_audio_isolation():
"""Remove the virtual sink (called at process exit via atexit)."""
global _audio_sink_module, _audio_lb_module, _audio_isolated
for mod_id in (_audio_lb_module, _audio_sink_module):
if mod_id:
try:
subprocess.run(['pactl', 'unload-module', mod_id],
capture_output=True, timeout=5)
except Exception:
pass
_audio_sink_module = None
_audio_lb_module = None
if _audio_isolated:
_audio_isolated = False
os.environ.pop('PULSE_SINK', None)
print('[AUDIO] Virtual sink removed')
def get_audio_capture_source():
"""Return the PulseAudio source to capture from.
If the virtual sink is active this returns its monitor; otherwise
falls back to default.monitor (which captures ALL system audio).
"""
if _audio_isolated:
# Double-check the monitor source still exists (PipeWire can be fussy)
try:
r = subprocess.run(
['pactl', 'list', 'sources', 'short'],
capture_output=True, text=True, timeout=5,
)
if _AUDIO_MONITOR in r.stdout:
print(f'[AUDIO] Capturing from isolated source: {_AUDIO_MONITOR}')
return _AUDIO_MONITOR
else:
print(f'[AUDIO] ERROR: {_AUDIO_MONITOR} disappeared! '
f'Available: {r.stdout.strip()}')
except Exception as e:
print(f'[AUDIO] pactl check failed: {e}')
print('[AUDIO] WARNING: virtual sink not active capturing ALL system audio!')
print('[AUDIO] The listener WILL hear all your system audio (YouTube, Spotify, etc).')
return 'default.monitor'
def _route_qt_audio_to_virtual_sink():
"""Move any PulseAudio/PipeWire sink-inputs from this process to the
virtual techdj_stream sink.
Needed because Qt6 may output audio via the PipeWire-native backend
instead of the PulseAudio compatibility layer, which means PULSE_SINK
has no effect and the monitor source used by the streaming ffmpeg would
capture silence instead of the DJ's music. Calling pactl move-sink-input
works regardless of whether the client originally used PulseAudio or
PipeWire-native, as PipeWire exposes a PulseAudio-compatible socket.
"""
if not _audio_isolated:
return
pid_str = str(os.getpid())
try:
result = subprocess.run(
['pactl', 'list', 'sink-inputs'],
capture_output=True, text=True, timeout=3,
)
current_id = None
current_app_pid = None
for line in result.stdout.splitlines():
stripped = line.strip()
if stripped.startswith('Sink Input #'):
# Flush previous block
if current_id and current_app_pid == pid_str:
subprocess.run(
['pactl', 'move-sink-input', current_id, _AUDIO_SINK_NAME],
capture_output=True, timeout=3,
)
current_id = stripped.split('#')[1].strip()
current_app_pid = None
elif 'application.process.id' in stripped and '"' in stripped:
current_app_pid = stripped.split('"')[1]
# Handle last block
if current_id and current_app_pid == pid_str:
subprocess.run(
['pactl', 'move-sink-input', current_id, _AUDIO_SINK_NAME],
capture_output=True, timeout=3,
)
except Exception:
pass
atexit.register(_teardown_audio_isolation)
# --- WORKERS ---
class DownloadThread(QThread):
progress = pyqtSignal(int)
finished = pyqtSignal(str, bool)
def __init__(self, url, filepath):
super().__init__()
self.url = url
self.filepath = filepath
def run(self):
try:
response = requests.get(self.url, stream=True, timeout=30)
if response.status_code != 200:
self.finished.emit(self.filepath, False)
return
total_size = int(response.headers.get('content-length', 0))
os.makedirs(os.path.dirname(self.filepath), exist_ok=True)
downloaded = 0
with open(self.filepath, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
downloaded += len(chunk)
if total_size > 0:
self.progress.emit(int((downloaded / total_size) * 100))
self.finished.emit(self.filepath, True)
except Exception:
self.finished.emit(self.filepath, False)
class LibraryScannerThread(QThread):
files_found = pyqtSignal(list)
def __init__(self, lib_path):
super().__init__()
self.lib_path = lib_path
def run(self):
files = []
if self.lib_path.exists():
for f in self.lib_path.rglob('*'):
if f.suffix.lower() in ['.mp3', '.wav', '.ogg', '.m4a', '.flac']:
files.append(f)
files.sort(key=lambda x: x.name)
self.files_found.emit(files)
class ServerLibraryFetcher(QThread):
finished = pyqtSignal(list, str, bool)
def __init__(self, url):
super().__init__()
self.url = url
def run(self):
try:
response = requests.get(self.url, timeout=5)
if response.status_code == 200:
self.finished.emit(response.json(), "", True)
else:
self.finished.emit([], f"Server error: {response.status_code}", False)
except Exception as e:
self.finished.emit([], str(e), False)
class YTSearchWorker(QProcess):
results_ready = pyqtSignal(list)
error_occurred = pyqtSignal(str)
def __init__(self, parent=None):
super().__init__(parent)
self.output_buffer = b""
self.readyReadStandardOutput.connect(self.handle_output)
self.finished.connect(self.handle_finished)
def search(self, query):
self.output_buffer = b""
print(f"[DEBUG] Searching for: {query}")
cmd = sys.executable
args = [
"-m", "yt_dlp",
f"ytsearch5:{query}",
"--dump-json",
"--flat-playlist",
"--quiet",
"--no-warnings",
"--compat-options", "no-youtube-unavailable-videos"
]
self.start(cmd, args)
def handle_output(self):
self.output_buffer += self.readAllStandardOutput().data()
def handle_finished(self):
try:
results = []
decoded = self.output_buffer.decode('utf-8', errors='ignore').strip()
for line in decoded.split('\n'):
if line:
try:
results.append(json.loads(line))
except json.JSONDecodeError:
pass
if results:
self.results_ready.emit(results)
else:
self.error_occurred.emit("No results found or network error.")
except Exception as e:
self.error_occurred.emit(str(e))
class YTDownloadWorker(QProcess):
download_finished = pyqtSignal(str)
error_occurred = pyqtSignal(str)
download_progress = pyqtSignal(float) # Progress percentage (0-100)
def __init__(self, parent=None):
super().__init__(parent)
self.final_filename = ""
self.error_log = ""
self.readyReadStandardOutput.connect(self.handle_output)
self.readyReadStandardError.connect(self.handle_error)
self.finished.connect(self.handle_finished)
def download(self, url, dest, audio_format="mp3"):
# 1. Ensure Dest exists
if not os.path.exists(dest):
try:
os.makedirs(dest)
print(f"[DEBUG] Created directory: {dest}")
except Exception as e:
self.error_occurred.emit(f"Could not create folder: {e}")
return
# 2. Check FFmpeg (only needed for MP3 conversion)
if audio_format == "mp3" and not shutil.which("ffmpeg"):
self.error_occurred.emit("CRITICAL ERROR: FFmpeg is missing.\nRun 'sudo apt install ffmpeg' in terminal.")
return
self.final_filename = ""
self.error_log = ""
print(f"[DEBUG] Starting download: {url} -> {dest} (format: {audio_format})")
cmd = sys.executable
out_tmpl = os.path.join(dest, '%(title)s.%(ext)s')
# Build args based on format choice
args = ["-m", "yt_dlp"]
if audio_format == "mp3":
# MP3: Convert to MP3 (slower, universal)
args.extend([
"-f", "bestaudio/best",
"-x", "--audio-format", "mp3",
"--audio-quality", "192K",
])
else:
# Best Quality: Download original audio (faster, better quality)
args.extend([
"-f", "bestaudio[ext=m4a]/bestaudio", # Prefer m4a, fallback to best
])
# Common args
args.extend([
"-o", out_tmpl,
"--no-playlist",
"--newline",
"--no-warnings",
"--progress",
"--print", "after_move:filepath",
url
])
self.start(cmd, args)
def handle_output(self):
chunks = self.readAllStandardOutput().data().decode('utf-8', errors='ignore').splitlines()
for chunk in chunks:
line = chunk.strip()
if line:
# Progress parsing from stdout (newline mode)
if '[download]' in line and '%' in line:
try:
parts = line.split()
for part in parts:
if '%' in part:
p_str = part.replace('%', '')
self.download_progress.emit(float(p_str))
break
except: pass
# yt-dlp prints the final filepath via --print after_move:filepath
# Store it unconditionally — the file may not exist yet if FFmpeg
# post-processing is still running, so DON'T gate on os.path.exists here.
elif os.path.isabs(line) or (os.path.sep in line and any(
line.endswith(ext) for ext in ('.mp3', '.m4a', '.opus', '.ogg', '.wav', '.flac'))):
self.final_filename = line
print(f"[DEBUG] Captured output path: {line}")
def handle_error(self):
err_data = self.readAllStandardError().data().decode('utf-8', errors='ignore').strip()
if err_data:
# Only log actual errors
if "error" in err_data.lower():
print(f"[YT-DLP ERR] {err_data}")
self.error_log += err_data + "\n"
def handle_finished(self):
if self.exitCode() == 0 and self.final_filename:
# Use a non-blocking timer to poll for the file instead of blocking the GUI thread.
self._poll_deadline = time.time() + 10.0
self._poll_timer = QTimer()
self._poll_timer.setInterval(100)
self._poll_timer.timeout.connect(self._check_file_ready)
self._poll_timer.start()
elif self.exitCode() == 0 and not self.final_filename:
self.error_occurred.emit("Download finished but could not determine output filename.\nCheck the download folder manually.")
else:
self.error_occurred.emit(f"Download process failed.\n{self.error_log}")
def _check_file_ready(self):
"""Non-blocking poll: check if the downloaded file has appeared on disk."""
if os.path.exists(self.final_filename) and os.path.getsize(self.final_filename) > 0:
self._poll_timer.stop()
print(f"[DEBUG] Download complete: {self.final_filename} ({os.path.getsize(self.final_filename)} bytes)")
self.download_finished.emit(self.final_filename)
elif time.time() > self._poll_deadline:
self._poll_timer.stop()
self.error_occurred.emit(f"Download finished but file missing or empty:\n{self.final_filename}")
class SettingsDialog(QDialog):
def __init__(self, settings_data, parent=None):
super().__init__(parent)
self.setWindowTitle("Settings")
self.resize(650, 650)
self.setStyleSheet("background-color: #111; color: #fff;")
# Store all settings
self.shortcuts = settings_data.get("shortcuts", {}).copy()
self.audio_settings = settings_data.get("audio", {}).copy()
self.ui_settings = settings_data.get("ui", {}).copy()
self.library_settings = settings_data.get("library", {}).copy()
layout = QVBoxLayout(self)
# Create tab widget
self.tabs = QTabWidget()
self.tabs.setStyleSheet("""
QTabWidget::pane { border: 1px solid #333; background: #0a0a0a; }
QTabBar::tab { background: #222; color: #888; padding: 8px 16px; margin: 2px; }
QTabBar::tab:selected { background: #00ffff; color: #000; font-weight: bold; }
QTabBar::tab:hover { background: #333; color: #fff; }
""")
# Tab 1: Keyboard Shortcuts
self.shortcuts_tab = self.create_shortcuts_tab()
self.tabs.addTab(self.shortcuts_tab, "Keyboard")
# Tab 2: Audio & Recording
self.audio_tab = self.create_audio_tab()
self.tabs.addTab(self.audio_tab, "Audio")
# Tab 3: UI Preferences
self.ui_tab = self.create_ui_tab()
self.tabs.addTab(self.ui_tab, "UI")
# Tab 4: Library
self.library_tab = self.create_library_tab()
self.tabs.addTab(self.library_tab, "Library")
layout.addWidget(self.tabs)
# Buttons
btn_layout = QHBoxLayout()
self.save_btn = QPushButton("Save All")
self.save_btn.clicked.connect(self.accept)
self.cancel_btn = QPushButton("Cancel")
self.cancel_btn.clicked.connect(self.reject)
btn_layout.addWidget(self.save_btn)
btn_layout.addWidget(self.cancel_btn)
layout.addLayout(btn_layout)
def create_shortcuts_tab(self):
widget = QWidget()
layout = QVBoxLayout(widget)
self.shortcuts_table = QTableWidget(len(self.shortcuts), 2)
self.shortcuts_table.setHorizontalHeaderLabels(["Action", "Key"])
self.shortcuts_table.horizontalHeader().setSectionResizeMode(QHeaderView.ResizeMode.Stretch)
self.shortcuts_table.setStyleSheet("background-color: #000; border: 1px solid #333;")
self.shortcuts_table.setEditTriggers(QAbstractItemView.EditTrigger.NoEditTriggers)
self.shortcuts_table.setSelectionMode(QAbstractItemView.SelectionMode.SingleSelection)
self.shortcuts_table.setSelectionBehavior(QAbstractItemView.SelectionBehavior.SelectRows)
actions = sorted(self.shortcuts.keys())
for row, action in enumerate(actions):
self.shortcuts_table.setItem(row, 0, QTableWidgetItem(action))
self.shortcuts_table.setItem(row, 1, QTableWidgetItem(self.shortcuts[action]))
layout.addWidget(self.shortcuts_table)
rebind_btn = QPushButton("Rebind Selected Shortcut")
rebind_btn.clicked.connect(self.rebind_selected)
layout.addWidget(rebind_btn)
return widget
def create_audio_tab(self):
widget = QWidget()
layout = QVBoxLayout(widget)
layout.setAlignment(Qt.AlignmentFlag.AlignTop)
# Streaming section
stream_group = QLabel("Live Streaming")
stream_group.setStyleSheet("font-size: 14px; font-weight: bold; color: #00ffff; margin-top: 10px;")
layout.addWidget(stream_group)
stream_url_label = QLabel("Stream Server URL:")
self.stream_url_input = QLineEdit()
self.stream_url_input.setPlaceholderText("http://YOUR_SERVER_IP:8080/api/stream")
current_stream_url = self.audio_settings.get("stream_server_url", "http://localhost:8080/api/stream")
self.stream_url_input.setText(current_stream_url)
self.stream_url_input.setStyleSheet("""
QLineEdit {
background: #1a1a1a;
border: 1px solid #333;
padding: 8px;
color: #fff;
border-radius: 4px;
}
""")
pw_label = QLabel("DJ Panel Password (leave blank if none):")
self.dj_password_input = QLineEdit()
self.dj_password_input.setEchoMode(QLineEdit.EchoMode.Password)
self.dj_password_input.setPlaceholderText("Leave blank if no password is set")
self.dj_password_input.setText(self.audio_settings.get("dj_panel_password", ""))
self.dj_password_input.setStyleSheet("""
QLineEdit {
background: #1a1a1a;
border: 1px solid #333;
padding: 8px;
color: #fff;
border-radius: 4px;
}
""")
layout.addWidget(stream_url_label)
layout.addWidget(self.stream_url_input)
layout.addSpacing(8)
layout.addWidget(pw_label)
layout.addWidget(self.dj_password_input)
layout.addSpacing(20)
# Recording section
rec_group = QLabel("Recording")
rec_group.setStyleSheet("font-size: 14px; font-weight: bold; color: #ff00ff; margin-top: 10px;")
layout.addWidget(rec_group)
# Sample rate
rate_label = QLabel("Recording Sample Rate:")
self.sample_rate_combo = QComboBox()
self.sample_rate_combo.addItem("44.1 kHz", 44100)
self.sample_rate_combo.addItem("48 kHz (Recommended)", 48000)
current_rate = self.audio_settings.get("recording_sample_rate", 48000)
self.sample_rate_combo.setCurrentIndex(0 if current_rate == 44100 else 1)
# Format
format_label = QLabel("Recording Format:")
self.format_combo = QComboBox()
self.format_combo.addItem("WAV (Lossless)", "wav")
self.format_combo.addItem("MP3 (Compressed)", "mp3")
current_format = self.audio_settings.get("recording_format", "wav")
self.format_combo.setCurrentIndex(0 if current_format == "wav" else 1)
layout.addWidget(rate_label)
layout.addWidget(self.sample_rate_combo)
layout.addSpacing(10)
layout.addWidget(format_label)
layout.addWidget(self.format_combo)
layout.addStretch()
return widget
def create_ui_tab(self):
widget = QWidget()
layout = QVBoxLayout(widget)
layout.setAlignment(Qt.AlignmentFlag.AlignTop)
# Neon mode default
neon_label = QLabel("Default Neon Edge Mode:")
self.neon_combo = QComboBox()
self.neon_combo.addItem("Off", 0)
self.neon_combo.addItem("Blue (Cyan)", 1)
self.neon_combo.addItem("Purple (Magenta)", 2)
current_neon = self.ui_settings.get("neon_mode", 0)
self.neon_combo.setCurrentIndex(current_neon)
layout.addWidget(neon_label)
layout.addWidget(self.neon_combo)
layout.addStretch()
return widget
def create_library_tab(self):
widget = QWidget()
layout = QVBoxLayout(widget)
layout.setAlignment(Qt.AlignmentFlag.AlignTop)
# Auto-scan
self.auto_scan_check = QCheckBox("Auto-scan library on startup")
self.auto_scan_check.setChecked(self.library_settings.get("auto_scan", True))
# YouTube default format
yt_label = QLabel("YouTube Download Default Format:")
self.yt_format_combo = QComboBox()
self.yt_format_combo.addItem("MP3 (Universal)", "mp3")
self.yt_format_combo.addItem("Best Quality (Faster)", "best")
current_yt = self.library_settings.get("yt_default_format", "mp3")
self.yt_format_combo.setCurrentIndex(0 if current_yt == "mp3" else 1)
layout.addWidget(self.auto_scan_check)
layout.addSpacing(10)
layout.addWidget(yt_label)
layout.addWidget(self.yt_format_combo)
layout.addStretch()
return widget
def rebind_selected(self):
row = self.shortcuts_table.currentRow()
if row < 0:
QMessageBox.warning(self, "No Selection", "Please select an action to rebind.")
return
action = self.shortcuts_table.item(row, 0).text()
from PyQt6.QtWidgets import QInputDialog
new_key, ok = QInputDialog.getText(self, "Rebind Key", f"Enter new key sequence for {action}:", text=self.shortcuts[action])
if ok and new_key:
self.shortcuts[action] = new_key
self.shortcuts_table.item(row, 1).setText(new_key)
def get_all_settings(self):
"""Return all settings as a dictionary"""
return {
"shortcuts": self.shortcuts,
"audio": {
"recording_sample_rate": self.sample_rate_combo.currentData(),
"recording_format": self.format_combo.currentData(),
"stream_server_url": self.stream_url_input.text(),
"dj_panel_password": self.dj_password_input.text(),
},
"ui": {
"neon_mode": self.neon_combo.currentData(),
},
"library": {
"auto_scan": self.auto_scan_check.isChecked(),
"yt_default_format": self.yt_format_combo.currentData(),
}
}
class YTResultDialog(QDialog):
def __init__(self, results, parent=None):
super().__init__(parent)
self.setWindowTitle("YouTube Pro Search")
self.resize(600, 450)
self.setStyleSheet("""
QDialog { background-color: #0a0a0a; border: 2px solid #cc0000; }
QListWidget { background-color: #000; color: #0f0; border: 1px solid #333; font-family: 'Courier New'; font-size: 13px; }
QListWidget::item { padding: 10px; border-bottom: 1px solid #111; }
QListWidget::item:selected { background-color: #222; color: #fff; border: 1px solid #cc0000; }
QLabel { color: #fff; font-weight: bold; font-size: 16px; margin-bottom: 10px; }
QPushButton { background-color: #cc0000; color: white; border: none; padding: 12px; font-weight: bold; border-radius: 5px; }
QPushButton:hover { background-color: #ff0000; }
""")
layout = QVBoxLayout(self)
header = QLabel("YouTube Search Results")
layout.addWidget(header)
self.list_widget = QListWidget()
layout.addWidget(self.list_widget)
for vid in results:
duration_sec = vid.get('duration', 0)
if not duration_sec: duration_sec = 0
m, s = divmod(int(duration_sec), 60)
title_text = vid.get('title', 'Unknown Title')
channel = vid.get('uploader', 'Unknown Artist')
item = QListWidgetItem(f"{title_text}\n [{m:02}:{s:02}] - {channel}")
item.setData(Qt.ItemDataRole.UserRole, vid.get('url'))
self.list_widget.addItem(item)
self.list_widget.itemDoubleClicked.connect(self.accept)
btn_layout = QHBoxLayout()
self.cancel_btn = QPushButton("CANCEL")
self.cancel_btn.setStyleSheet("background-color: #333; color: #888; border-radius: 5px;")
self.cancel_btn.clicked.connect(self.reject)
btn_hl = QPushButton("DOWNLOAD & IMPORT")
btn_hl.clicked.connect(self.accept)
btn_layout.addWidget(self.cancel_btn)
btn_layout.addWidget(btn_hl)
layout.addLayout(btn_layout)
def get_selected_url(self):
i = self.list_widget.currentItem()
return i.data(Qt.ItemDataRole.UserRole) if i else None
class RecordingWorker(QProcess):
"""Records this app's audio output (isolated) using FFmpeg."""
recording_started = pyqtSignal()
recording_error = pyqtSignal(str)
def __init__(self, parent=None):
super().__init__(parent)
self.output_file = ""
self.readyReadStandardError.connect(self.handle_error)
def start_recording(self, output_path):
"""Start recording this app's audio to file."""
self.output_file = output_path
if not shutil.which("ffmpeg"):
self.recording_error.emit("FFmpeg not found. Install with: sudo apt install ffmpeg")
return False
source = get_audio_capture_source()
print(f"[RECORDING] Starting: {output_path} (source={source})")
args = [
"-f", "pulse",
"-i", source,
"-ac", "2",
"-ar", "48000",
"-acodec", "pcm_s16le",
"-sample_fmt", "s16",
"-y",
output_path
]
self.start("ffmpeg", args)
self.recording_started.emit()
return True
def stop_recording(self):
"""Stop the recording."""
if self.state() == QProcess.ProcessState.Running:
print("[RECORDING] Stopping...")
self.write(b"q")
self.waitForFinished(3000)
if self.state() == QProcess.ProcessState.Running:
self.kill()
def handle_error(self):
"""Handle FFmpeg stderr (which includes progress info)"""
err = self.readAllStandardError().data().decode('utf-8', errors='ignore').strip()
if err and "error" in err.lower():
print(f"[RECORDING ERROR] {err}")
class StreamingWorker(QThread):
"""Streams this app's isolated audio output to a server using Socket.IO.
PULSE_SINK is set at startup so ALL audio from this process goes to the
virtual sink automatically. We just capture from its monitor here.
"""
streaming_started = pyqtSignal()
streaming_error = pyqtSignal(str)
listener_count = pyqtSignal(int)
def __init__(self, parent=None):
super().__init__(parent)
self.sio = None
self.stream_url = ""
self.dj_password = ""
self.glow_intensity = 30 # mirrors the UI slider; sent to listeners on connect
self.is_running = False
self.ffmpeg_proc = None
self._broadcast_started = False
def on_connect(self):
print("[SOCKET] Connected to DJ server")
if not self._broadcast_started:
self._broadcast_started = True
self.sio.emit('start_broadcast', {'format': 'mp3', 'bitrate': '128k'})
# Push current glow intensity to all listeners as soon as we connect
self.sio.emit('listener_glow', {'intensity': self.glow_intensity})
self.streaming_started.emit()
else:
# Reconnected mid-stream — server handles resume gracefully
print("[SOCKET] Reconnected - resuming existing broadcast")
self.sio.emit('start_broadcast', {'format': 'mp3', 'bitrate': '128k'})
self.sio.emit('listener_glow', {'intensity': self.glow_intensity})
def on_disconnect(self):
print("[SOCKET] Disconnected from DJ server")
def on_connect_error(self, data):
self.streaming_error.emit(f"Connection error: {data}")
def on_listener_count(self, data):
self.listener_count.emit(data.get('count', 0))
@staticmethod
def _get_auth_cookie(base_url, password):
"""POST the DJ panel password to /login and return the session cookie string.
The server sets a Flask session cookie on success. We forward that
cookie as an HTTP header on the Socket.IO upgrade request so the
socket handler sees an authenticated session.
"""
try:
resp = requests.post(
f"{base_url}/login",
data={"password": password},
allow_redirects=False,
timeout=5,
)
# Server responds with 302 + Set-Cookie on success
if resp.cookies:
return "; ".join(f"{k}={v}" for k, v in resp.cookies.items())
print(f"[AUTH] Login response {resp.status_code} — no session cookie returned")
return None
except Exception as e:
print(f"[AUTH] Login request failed: {e}")
return None
@staticmethod
def _drain_stderr(proc):
"""Read ffmpeg's stderr in a daemon thread so the OS pipe buffer never
fills up and deadlocks the stdout read loop."""
try:
for raw_line in iter(proc.stderr.readline, b''):
line = raw_line.decode('utf-8', errors='ignore').strip()
if line:
print(f"[STREAM FFMPEG] {line}")
except Exception:
pass
def run(self):
try:
# Create a fresh Socket.IO client for each session
self.sio = socketio.Client()
self.sio.on('connect', self.on_connect)
self.sio.on('disconnect', self.on_disconnect)
self.sio.on('listener_count', self.on_listener_count)
self.sio.on('connect_error', self.on_connect_error)
# If the DJ panel has a password set, authenticate via HTTP first to
# obtain a Flask session cookie, then forward it on the WS upgrade
# request so the socket handler sees an authenticated session.
connect_headers = {}
if self.dj_password:
print("[AUTH] DJ panel password configured — authenticating...")
cookie = self._get_auth_cookie(self.stream_url, self.dj_password)
if cookie:
connect_headers['Cookie'] = cookie
print("[AUTH] Authenticated — session cookie obtained")
else:
self.streaming_error.emit(
"Authentication failed.\n"
"Check the DJ panel password in Settings → Audio."
)
return
# wait_timeout: how long to wait for the server to respond during connect
self.sio.connect(self.stream_url, wait_timeout=10, headers=connect_headers)
source = get_audio_capture_source()
print(f"[STREAM] Capturing from: {source}")
cmd = [
"ffmpeg",
"-hide_banner",
"-loglevel", "error",
# Disable input buffering so frames reach the pipe immediately
"-fflags", "nobuffer",
"-f", "pulse",
"-i", source,
"-ac", "2",
"-ar", "44100",
"-b:a", "128k",
"-af", "aresample=async=1",
# Flush every packet — critical for low-latency pipe streaming
"-flush_packets", "1",
"-f", "mp3",
"pipe:1",
]
self.ffmpeg_proc = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=0
)
# Drain stderr in a real daemon thread so the OS pipe buffer never
# fills up and blocks stdout (classic Python subprocess deadlock).
import threading as _threading
stderr_thread = _threading.Thread(
target=self._drain_stderr, args=(self.ffmpeg_proc,), daemon=True
)
stderr_thread.start()
while self.is_running and self.ffmpeg_proc.poll() is None:
# 4096 bytes ≈ 10 MP3 frames ≈ ~260 ms at 128 kbps — low-latency chunks
chunk = self.ffmpeg_proc.stdout.read(4096)
if not chunk:
break
sio = self.sio # local ref guards against stop_streaming() race
if sio and sio.connected:
sio.emit('audio_chunk', chunk)
# Detect unexpected ffmpeg exit during an active stream
if self.is_running:
ret = self.ffmpeg_proc.poll() if self.ffmpeg_proc else None
if ret is not None and ret != 0:
self.streaming_error.emit(
f"FFmpeg exited with code {ret}.\n"
"Check that PulseAudio / PipeWire is running and the "
"virtual audio sink was created successfully."
)
except Exception as e:
self.streaming_error.emit(f"Streaming error: {e}")
finally:
self.stop_streaming()
def start_streaming(self, base_url, bitrate=128, password=""):
self.stream_url = base_url
self.dj_password = password
self.is_running = True
self.start()
return True
def emit_if_connected(self, event, data=None):
"""Thread-safe emit from any thread — no-op if socket is not connected."""
sio = self.sio
if sio and sio.connected:
try:
sio.emit(event, data)
except Exception as e:
print(f"[SOCKET] emit_if_connected error: {e}")
def stop_streaming(self):
"""Thread-safe stop: capture refs locally before clearing to avoid TOCTOU."""
self.is_running = False
self._broadcast_started = False
proc = self.ffmpeg_proc
self.ffmpeg_proc = None
sio = self.sio
self.sio = None
if proc:
try:
proc.terminate()
except Exception:
pass
if sio:
try:
if sio.connected:
sio.emit('stop_broadcast')
sio.disconnect()
except Exception:
pass
# --- WIDGETS ---
class GlowFrame(QWidget):
"""Custom widget that paints a neon glow effect around the edges"""
def __init__(self, parent=None):
super().__init__(parent)
self.setAttribute(Qt.WidgetAttribute.WA_TransparentForMouseEvents) # Don't block mouse events
self.setAttribute(Qt.WidgetAttribute.WA_TranslucentBackground)
self.glow_color = QColor("#0ff")
self.glow_enabled = False
def set_glow(self, enabled, color="#0ff"):
self.glow_enabled = enabled
self.glow_color = QColor(color)
self.update()
def paintEvent(self, event):
if not self.glow_enabled:
return
painter = QPainter(self)
painter.setRenderHint(QPainter.RenderHint.Antialiasing)
rect = self.rect()
glow_width = 80 # Wider glow for more intensity
# Draw glow from each edge using linear gradients
from PyQt6.QtGui import QLinearGradient
# Top edge glow
top_gradient = QLinearGradient(0, 0, 0, glow_width)
for i in range(6):
pos = i / 5.0
alpha = int(255 * (1 - pos)) # Full opacity at edge
color = QColor(self.glow_color)
color.setAlpha(alpha)
top_gradient.setColorAt(pos, color)
painter.fillRect(0, 0, rect.width(), glow_width, top_gradient)
# Bottom edge glow
bottom_gradient = QLinearGradient(0, rect.height() - glow_width, 0, rect.height())
for i in range(6):
pos = i / 5.0
alpha = int(255 * pos)
color = QColor(self.glow_color)
color.setAlpha(alpha)
bottom_gradient.setColorAt(pos, color)
painter.fillRect(0, rect.height() - glow_width, rect.width(), glow_width, bottom_gradient)
# Left edge glow
left_gradient = QLinearGradient(0, 0, glow_width, 0)
for i in range(6):
pos = i / 5.0
alpha = int(255 * (1 - pos))
color = QColor(self.glow_color)
color.setAlpha(alpha)
left_gradient.setColorAt(pos, color)
painter.fillRect(0, 0, glow_width, rect.height(), left_gradient)
# Right edge glow
right_gradient = QLinearGradient(rect.width() - glow_width, 0, rect.width(), 0)
for i in range(6):
pos = i / 5.0
alpha = int(255 * pos)
color = QColor(self.glow_color)
color.setAlpha(alpha)
right_gradient.setColorAt(pos, color)
painter.fillRect(rect.width() - glow_width, 0, glow_width, rect.height(), right_gradient)
class VinylWidget(QWidget):
def __init__(self, color_hex, parent=None):
super().__init__(parent)
self.setMinimumSize(120, 120)
self.angle = 0
self.speed = 1.0
self.is_spinning = False
self.color = QColor(color_hex)
# Initialize drawing resources
self.brush_disk = QBrush(QColor("#111"))
self.pen_disk = QPen(QColor("#000"), 2)
self.brush_label = QBrush(self.color)
self.brush_white = QBrush(Qt.GlobalColor.white)
self.center = QPointF(0, 0)
self.radius = 0
self.timer = QTimer(self)
self.timer.timeout.connect(self.rotate)
def resizeEvent(self, event):
w, h = self.width(), self.height()
self.center = QPointF(w / 2, h / 2)
self.radius = min(w, h) / 2 - 5
super().resizeEvent(event)
def start_spin(self):
if not self.is_spinning:
self.is_spinning = True
self.timer.start(ANIMATION_INTERVAL)
def stop_spin(self):
self.is_spinning = False
self.timer.stop()
def set_speed(self, rate):
self.speed = rate
def rotate(self):
self.angle = (self.angle + 3.0 * self.speed) % 360
self.update()
def paintEvent(self, event):
p = QPainter(self)
p.setRenderHint(QPainter.RenderHint.Antialiasing)
p.translate(self.center)
p.rotate(self.angle)
# Draw vinyl disk
p.setBrush(self.brush_disk)
p.setPen(self.pen_disk)
p.drawEllipse(QPointF(0, 0), self.radius, self.radius)
# Draw grooves
p.setBrush(Qt.BrushStyle.NoBrush)
p.setPen(QPen(QColor("#222"), 1))
p.drawEllipse(QPointF(0, 0), self.radius * 0.8, self.radius * 0.8)
p.drawEllipse(QPointF(0, 0), self.radius * 0.6, self.radius * 0.6)
# Draw center label
p.setBrush(self.brush_label)
p.setPen(Qt.PenStyle.NoPen)
p.drawEllipse(QPointF(0, 0), self.radius * 0.35, self.radius * 0.35)
# Draw position marker
p.setBrush(self.brush_white)
p.drawRect(QRectF(-2, -self.radius * 0.35, 4, 12))
class WaveformWidget(QWidget):
seekRequested = pyqtSignal(int)
def __init__(self, color_hex, parent=None):
super().__init__(parent)
self.color = QColor(color_hex)
self.setMinimumHeight(60)
self.setCursor(Qt.CursorShape.PointingHandCursor)
self.duration = 1
self.position = 0
self.wave_data = []
self.loop_active = False
self.loop_start = 0
self.loop_end = 0
self.last_seek_time = 0
# Initialize drawing resources
self.brush_active = QBrush(self.color)
self.brush_inactive = QBrush(QColor("#444"))
self.pen_white = QPen(QColor("#fff"), 2)
self.loop_brush = QBrush(QColor(255, 165, 0, 100))
self.loop_pen = QPen(QColor("#ffa500"), 2)
def generate_wave(self, file_path):
random.seed(str(file_path))
self.wave_data = [max(0.1, random.random()**2) for _ in range(250)]
self.update()
def set_duration(self, d):
self.duration = max(1, d)
self.update()
def set_position(self, p):
self.position = p
self.update()
def set_loop_region(self, active, start, end):
self.loop_active = active
self.loop_start = start
self.loop_end = end
self.update()
def mousePressEvent(self, e):
if time.time() - self.last_seek_time > 0.1:
seek_pos = int((e.position().x() / self.width()) * self.duration)
self.seekRequested.emit(seek_pos)
self.last_seek_time = time.time()
def mouseMoveEvent(self, e):
if time.time() - self.last_seek_time > 0.1:
seek_pos = int((e.position().x() / self.width()) * self.duration)
self.seekRequested.emit(seek_pos)
self.last_seek_time = time.time()
def paintEvent(self, event):
p = QPainter(self)
p.setRenderHint(QPainter.RenderHint.Antialiasing)
w, h = self.width(), self.height()
p.fillRect(0, 0, w, h, QColor("#111"))
if not self.wave_data:
p.setPen(self.pen_white)
p.drawLine(0, int(h / 2), w, int(h / 2))
return
bar_width = w / len(self.wave_data)
play_x = (self.position / self.duration) * w
p.setPen(Qt.PenStyle.NoPen)
# Draw waveform bars
for i, val in enumerate(self.wave_data):
brush = self.brush_active if i * bar_width < play_x else self.brush_inactive
p.setBrush(brush)
bar_height = val * h * 0.9
p.drawRect(QRectF(i * bar_width, (h - bar_height) / 2, bar_width, bar_height))
# Draw loop region
if self.loop_active:
loop_x = (self.loop_start / self.duration) * w
loop_width = ((self.loop_end - self.loop_start) / self.duration) * w
p.setBrush(self.loop_brush)
p.drawRect(QRectF(loop_x, 0, loop_width, h))
p.setPen(self.loop_pen)
p.drawLine(int(loop_x), 0, int(loop_x), h)
p.drawLine(int(loop_x + loop_width), 0, int(loop_x + loop_width), h)
# Draw playhead
p.setPen(self.pen_white)
p.drawLine(int(play_x), 0, int(play_x), h)
class DeckWidget(QGroupBox):
def __init__(self, name, color_code, deck_id, parent=None):
super().__init__(name, parent)
self.setObjectName(name.replace(" ", "_"))
self.color_code = color_code
self.deck_id = deck_id
self.playback_mode = 0
self.loop_active = False
self.loop_start = 0
self.loop_end = 0
self.loop_btns = []
self.xf_vol = 100
self.current_title = ""
self.loop_timer = QTimer(self)
self.loop_timer.setInterval(LOOP_CHECK_INTERVAL)
self.loop_timer.timeout.connect(self.check_loop)
self.audio_output = QAudioOutput()
self.player = QMediaPlayer()
self.player.setAudioOutput(self.audio_output)
self.player.positionChanged.connect(self.on_position_changed)
self.player.durationChanged.connect(self.on_duration_changed)
self.player.mediaStatusChanged.connect(self.check_queue)
self.real_duration = 0
self.setup_ui()
def setup_ui(self):
layout = QVBoxLayout()
layout.setSpacing(5)
# Top row: Vinyl and track info
r1 = QHBoxLayout()
self.vinyl = VinylWidget(self.color_code)
r1.addWidget(self.vinyl)
c1 = QVBoxLayout()
self.lbl_tr = QLabel("NO MEDIA")
self.lbl_tr.setAlignment(Qt.AlignmentFlag.AlignCenter)
self.lbl_tr.setStyleSheet(
f"color: {self.color_code}; border: 1px solid {self.color_code}; "
f"background: #000; padding: 4px;"
)
self.lbl_tr.setWordWrap(True)
c1.addWidget(self.lbl_tr)
rt = QHBoxLayout()
self.lbl_cur = QLabel("00:00")
self.lbl_cur.setStyleSheet("color:#fff")
self.lbl_tot = QLabel("00:00")
self.lbl_tot.setStyleSheet("color:#fff")
rt.addWidget(self.lbl_cur)
rt.addStretch()
rt.addWidget(self.lbl_tot)
c1.addLayout(rt)
r1.addLayout(c1)
layout.addLayout(r1)
# Waveform
self.wave = WaveformWidget(self.color_code)
self.wave.seekRequested.connect(self.player.setPosition)
layout.addWidget(self.wave)
# Loop buttons
g = QGridLayout()
g.setSpacing(2)
loops = [("8", 8), ("4", 4), ("2", 2), ("1", 1), ("1/2", 0.5), ("1/4", 0.25), ("1/8", 0.125)]
for i, (label, beats) in enumerate(loops):
btn = QPushButton(label)
btn.setObjectName("btn_loop")
btn.setCheckable(True)
btn.setToolTip(f"Set loop to {beats} beat(s)")
btn.clicked.connect(lambda c, b=beats, o=btn: self.set_loop(b, o))
g.addWidget(btn, 0, i)
self.loop_btns.append(btn)
exit_btn = QPushButton("EXIT")
exit_btn.setObjectName("btn_loop_exit")
exit_btn.setToolTip("Clear active loop")
exit_btn.clicked.connect(self.clear_loop)
g.addWidget(exit_btn, 0, len(loops))
layout.addLayout(g)
# Playback controls
rc = QHBoxLayout()
bp = QPushButton("PLAY")
bp.setToolTip("Play track")
bp.clicked.connect(self.play)
bpa = QPushButton("PAUSE")
bpa.setToolTip("Pause playback")
bpa.clicked.connect(self.pause)
bs = QPushButton("STOP")
bs.setToolTip("Stop playback")
bs.clicked.connect(self.stop)
self.b_mode = QPushButton("MODE: CONT")
self.b_mode.setFixedWidth(100)
self.b_mode.setProperty("mode", "0")
self.b_mode.setToolTip("Cycle playback mode: Continuous / Loop 1 / Stop")
self.b_mode.clicked.connect(self.cycle_mode)
rc.addWidget(bp)
rc.addWidget(bpa)
rc.addWidget(bs)
rc.addSpacing(10)
rc.addWidget(self.b_mode)
layout.addLayout(rc)
# Pitch control
rp = QHBoxLayout()
self.sl_rate = QSlider(Qt.Orientation.Horizontal)
self.sl_rate.setRange(50, 150)
self.sl_rate.setValue(100)
self.sl_rate.setToolTip("Adjust playback speed / pitch")
self.sl_rate.valueChanged.connect(self.update_playback_rate)
br = QPushButton("R")
br.setToolTip("Reset pitch to 1.0x")
br.clicked.connect(lambda: self.sl_rate.setValue(100))
self.lbl_rate = QLabel("1.0x")
self.lbl_rate.setStyleSheet("color:#fff; font-weight:bold;")
rp.addWidget(QLabel("PITCH", styleSheet="color:#666"))
rp.addWidget(self.sl_rate)
rp.addWidget(br)
rp.addWidget(self.lbl_rate)
layout.addLayout(rp)
# Bottom section: Queue and EQ
bottom = QHBoxLayout()
# Queue widget
qc = QWidget()
ql = QVBoxLayout(qc)
ql.setContentsMargins(0, 0, 0, 0)
hq = QHBoxLayout()
hq.addWidget(QLabel(f"QUEUE {self.deck_id}", styleSheet="font-size:10px; color:#666"))
bd = QPushButton("X")
bd.setObjectName("btn_remove")
bd.setToolTip("Remove selected track from queue")
bd.clicked.connect(self.delete_selected)
hq.addStretch()
hq.addWidget(bd)
ql.addLayout(hq)
self.q_list = QListWidget()
self.q_list.setObjectName("queue_list")
self.q_list.setDragDropMode(QAbstractItemView.DragDropMode.InternalMove)
self.q_list.setDefaultDropAction(Qt.DropAction.MoveAction)
self.q_list.setSelectionMode(QAbstractItemView.SelectionMode.SingleSelection)
self.q_list.itemDoubleClicked.connect(
lambda i: self.q_list.takeItem(self.q_list.row(i))
)
QShortcut(QKeySequence(Qt.Key.Key_Delete), self.q_list).activated.connect(self.delete_selected)
ql.addWidget(self.q_list)
# EQ sliders widget
sc = QWidget()
sl = QVBoxLayout(sc)
sl.setContentsMargins(0, 0, 0, 0)
row_s = QHBoxLayout()
def make_slider(prop, label, tooltip):
v = QVBoxLayout()
s = QSlider(Qt.Orientation.Vertical)
s.setRange(0, MAX_SLIDER_VALUE)
s.setValue(MAX_SLIDER_VALUE)
s.setProperty("eq", prop)
s.setToolTip(tooltip)
s.valueChanged.connect(self.recalc_vol)
l = QLabel(label)
l.setAlignment(Qt.AlignmentFlag.AlignCenter)
l.setStyleSheet("font-size:8px; color:#aaa;")
v.addWidget(s, 1, Qt.AlignmentFlag.AlignHCenter)
v.addWidget(l)
row_s.addLayout(v)
return s
self.sl_vol = make_slider("vol", "LEV", "Volume level")
self.sl_hi = make_slider("high", "HI", "High frequencies (treble)")
self.sl_mid = make_slider("mid", "MID", "Mid frequencies")
self.sl_low = make_slider("low", "LO", "Low frequencies (bass)")
sl.addLayout(row_s)
if self.deck_id == "A":
bottom.addWidget(qc, 3)
bottom.addWidget(sc, 1)
else:
bottom.addWidget(sc, 1)
bottom.addWidget(qc, 3)
layout.addLayout(bottom, 1)
self.setLayout(layout)
def delete_selected(self):
for item in self.q_list.selectedItems():
self.q_list.takeItem(self.q_list.row(item))
def cycle_mode(self):
self.playback_mode = (self.playback_mode + 1) % 3
modes = {0: "CONT", 1: "LOOP 1", 2: "STOP"}
self.b_mode.setText(f"MODE: {modes[self.playback_mode]}")
self.b_mode.setProperty("mode", str(self.playback_mode))
self.b_mode.style().unpolish(self.b_mode)
self.b_mode.style().polish(self.b_mode)
def set_loop(self, beats, btn):
for x in self.loop_btns:
x.setChecked(x == btn)
if self.player.playbackState() != QMediaPlayer.PlaybackState.PlayingState:
return
ms_per_beat = MS_PER_MINUTE / DEFAULT_BPM
self.loop_start = self.player.position()
self.loop_end = self.loop_start + int(ms_per_beat * beats)
self.loop_active = True
self.loop_timer.start()
self.wave.set_loop_region(True, self.loop_start, self.loop_end)
def clear_loop(self):
self.loop_active = False
self.loop_timer.stop()
self.wave.set_loop_region(False, 0, 0)
for btn in self.loop_btns:
btn.setChecked(False)
def check_loop(self):
if self.loop_active and self.player.position() >= self.loop_end:
self.player.setPosition(int(self.loop_start))
def load_track(self, path):
if not path:
return
p = Path(path)
if not p.exists():
print(f"[ERROR] Track path does not exist: {p}")
return
try:
self.player.setSource(QUrl.fromLocalFile(str(p.absolute())))
self.current_title = p.stem
self.lbl_tr.setText(p.stem.upper())
self.vinyl.set_speed(0)
self.vinyl.angle = 0
self.vinyl.update()
self.wave.generate_wave(p)
# Find parent DJApp to show status
parent = self.window()
if hasattr(parent, 'status_label'):
parent.status_label.setText(f"Loaded: {p.name}")
# Use soundfile to get accurate duration (GStreamer/Qt6 can be wrong)
try:
info = sf.info(str(p.absolute()))
self.real_duration = int(info.duration * 1000)
print(f"[DEBUG] {self.deck_id} Real Duration: {self.real_duration}ms")
# Update UI immediately if possible, or wait for player durationChanged
self.wave.set_duration(self.real_duration)
except Exception as se:
print(f"[DEBUG] Could not get accurate duration with soundfile: {se}")
self.real_duration = 0
except Exception as e:
print(f"[ERROR] Failed to load track {p}: {e}")
self.lbl_tr.setText("LOAD ERROR")
def add_queue(self, path):
p = Path(path)
item = QListWidgetItem(p.name)
item.setData(Qt.ItemDataRole.UserRole, p)
self.q_list.addItem(item)
def check_queue(self, status):
if status == QMediaPlayer.MediaStatus.EndOfMedia:
# Premature EndOfMedia is common with GStreamer + VBR MP3s
if self.real_duration > 0 and self.player.position() < self.real_duration - 1000:
print(f"[DEBUG] {self.deck_id} Premature EndOfMedia at {self.player.position()}ms (expected {self.real_duration}ms)")
if self.playback_mode == 1:
# Loop 1 mode
self.player.setPosition(0)
self.play()
elif self.playback_mode == 0 and self.q_list.count() > 0:
# Continuous mode - load next from queue
next_item = self.q_list.takeItem(0)
self.load_track(next_item.data(Qt.ItemDataRole.UserRole))
self.play()
else:
# Stop mode or no queue items
self.stop()
def _emit_playing_state(self, is_playing):
"""Emit deck_glow and now_playing events to the listener page."""
mw = self.window()
if not hasattr(mw, 'streaming_worker') or not hasattr(mw, 'deck_a') or not hasattr(mw, 'deck_b'):
return
other = mw.deck_b if self.deck_id == 'A' else mw.deck_a
other_playing = (other.player.playbackState() == QMediaPlayer.PlaybackState.PlayingState)
a_playing = is_playing if self.deck_id == 'A' else other_playing
b_playing = is_playing if self.deck_id == 'B' else other_playing
mw.streaming_worker.emit_if_connected('deck_glow', {'A': a_playing, 'B': b_playing})
if is_playing and self.current_title:
mw.streaming_worker.emit_if_connected('now_playing', {
'title': self.current_title,
'deck': self.deck_id,
})
def play(self):
self.player.play()
self.vinyl.start_spin()
self._emit_playing_state(True)
def pause(self):
self.player.pause()
self.vinyl.stop_spin()
self._emit_playing_state(False)
def stop(self):
self.player.stop()
self.vinyl.stop_spin()
self.vinyl.angle = 0
self.vinyl.update()
self.clear_loop()
self._emit_playing_state(False)
def on_position_changed(self, pos):
self.wave.set_position(pos)
minutes = int(pos // MS_PER_MINUTE)
seconds = int((pos // 1000) % 60)
self.lbl_cur.setText(f"{minutes:02}:{seconds:02}")
def on_duration_changed(self, duration):
# Use our accurate duration if available, otherwise fallback to player's reported duration
final_duration = self.real_duration if self.real_duration > 0 else duration
self.wave.set_duration(final_duration)
minutes = int(final_duration // MS_PER_MINUTE)
seconds = int((final_duration // 1000) % 60)
self.lbl_tot.setText(f"{minutes:02}:{seconds:02}")
def update_playback_rate(self, value):
rate = value / 100.0
self.player.setPlaybackRate(rate)
self.lbl_rate.setText(f"{rate:.1f}x")
self.vinyl.set_speed(rate)
def set_xf_vol(self, volume):
self.xf_vol = volume
self.recalc_vol()
def recalc_vol(self):
eq_hi = self.sl_hi.value() / MAX_SLIDER_VALUE
eq_mid = self.sl_mid.value() / MAX_SLIDER_VALUE
eq_low = self.sl_low.value() / MAX_SLIDER_VALUE
eq_gain = eq_hi * eq_mid * eq_low
final = (self.xf_vol / 100.0) * (self.sl_vol.value() / MAX_SLIDER_VALUE) * eq_gain
self.audio_output.setVolume(final)
class DJApp(QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle("TechDJ Pro - Neon Edition")
self.resize(1200, 950)
self.setStyleSheet(STYLESHEET)
# Set window icon
icon_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "dj_icon.png")
if os.path.exists(icon_path):
self.setWindowIcon(QIcon(icon_path))
self.neon_state = 0
# --- LOCAL FOLDER SETUP ---
# Creates a folder named 'dj_tracks' inside your project directory
self.lib_path = Path(os.getcwd()) / "music"
if not self.lib_path.exists():
try:
self.lib_path.mkdir(exist_ok=True)
print(f"[INIT] Created library folder: {self.lib_path}")
except Exception as e:
print(f"[ERROR] Could not create library folder: {e}")
# Create recordings folder
self.recordings_path = Path(os.getcwd()) / "recordings"
if not self.recordings_path.exists():
try:
self.recordings_path.mkdir(exist_ok=True)
print(f"[INIT] Created recordings folder: {self.recordings_path}")
except Exception as e:
print(f"[ERROR] Could not create recordings folder: {e}")
self.search_worker = YTSearchWorker()
self.search_worker.results_ready.connect(self.on_search_results)
self.search_worker.error_occurred.connect(self.on_error)
self.download_worker = YTDownloadWorker()
self.download_worker.download_finished.connect(self.on_download_complete)
self.download_worker.error_occurred.connect(self.on_error)
self.download_worker.download_progress.connect(self.update_download_progress)
# Recording setup
self.recording_worker = RecordingWorker()
self.recording_worker.recording_error.connect(self.on_recording_error)
self.is_recording = False
self.recording_start_time = 0
self.recording_timer = QTimer()
self.recording_timer.timeout.connect(self.update_recording_time)
# Streaming setup
self.streaming_worker = StreamingWorker()
self.streaming_worker.streaming_error.connect(self.on_streaming_error)
self.streaming_worker.listener_count.connect(self.update_listener_count)
self.is_streaming = False
# Periodically ensure Qt's audio sink-inputs are routed to the virtual
# sink. Qt6 may use the PipeWire-native audio backend which ignores
# PULSE_SINK, causing the streaming ffmpeg to capture silence instead
# of the DJ's music. pactl move-sink-input fixes this regardless of
# which Qt audio backend is active.
self._audio_route_timer = QTimer()
self._audio_route_timer.timeout.connect(_route_qt_audio_to_virtual_sink)
self._audio_route_timer.start(2000) # run every 2 s
# Server library state
self.server_url = "http://localhost:5000"
self.library_mode = "local" # "local" or "server"
self.server_library = []
self.local_library = []
self.cache_dir = Path.home() / ".techdj_cache"
self.cache_dir.mkdir(exist_ok=True)
self.download_threads = {}
self.init_ui()
self.setup_keyboard_shortcuts()
self.apply_ui_settings() # Apply saved UI settings
# Filtering debounce timer
self.filter_timer = QTimer()
self.filter_timer.setSingleShot(True)
self.filter_timer.timeout.connect(self.perform_filter)
self.load_library()
def init_ui(self):
main = QWidget()
main.setObjectName("Central") # Set objectName for neon border styling
self.setCentralWidget(main)
layout = QVBoxLayout(main)
layout.setContentsMargins(15, 15, 15, 15)
# Create glow overlay
self.glow_frame = GlowFrame(main)
self.glow_frame.setGeometry(main.rect())
self.glow_frame.raise_() # Bring to front
# Top bar: Neon toggle and YouTube search
h = QHBoxLayout()
self.neon_button = QPushButton("NEON EDGE: OFF")
self.neon_button.setObjectName("btn_neon")
self.neon_button.setFixedWidth(150)
self.neon_button.setToolTip("Toggle neon border effect")
self.neon_button.clicked.connect(self.toggle_neon)
self.yt_input = QLineEdit()
self.yt_input.setPlaceholderText("Search YouTube or Paste URL...")
self.yt_input.setToolTip("Search YouTube with keywords or paste a YouTube/YT Music URL to download directly")
self.yt_input.returnPressed.connect(self.search_youtube)
# Format selector dropdown
self.format_selector = QComboBox()
self.format_selector.addItem("MP3 (slower, universal)", "mp3")
self.format_selector.addItem("Best Quality (faster)", "best")
self.format_selector.setCurrentIndex(0) # Default to MP3
self.format_selector.setToolTip("Choose download format:\nMP3 = Converted, slower\nBest = Original quality, faster")
self.format_selector.setFixedWidth(160)
self.search_button = QPushButton("GO")
self.search_button.setObjectName("btn_yt_go")
self.search_button.setFixedWidth(40)
self.search_button.setToolTip("Start YouTube search")
self.search_button.clicked.connect(self.search_youtube)
self.settings_btn = QPushButton("MAP")
self.settings_btn.setFixedWidth(40)
self.settings_btn.setToolTip("Open Keyboard Mapping Settings")
self.settings_btn.clicked.connect(self.open_settings)
self.status_label = QLabel("")
self.status_label.setStyleSheet("color:#0f0; font-weight:bold")
h.addWidget(self.neon_button)
h.addSpacing(10)
h.addWidget(self.yt_input)
h.addWidget(self.format_selector)
h.addWidget(self.search_button)
h.addWidget(self.settings_btn)
h.addWidget(self.status_label)
layout.addLayout(h)
# Download progress bar
self.download_progress_bar = QProgressBar()
self.download_progress_bar.setRange(0, 100)
self.download_progress_bar.setValue(0)
self.download_progress_bar.setTextVisible(True)
self.download_progress_bar.setFormat("%p% - Downloading...")
self.download_progress_bar.setStyleSheet("""
QProgressBar {
border: 1px solid #555;
border-radius: 3px;
text-align: center;
background-color: #111;
color: #fff;
}
QProgressBar::chunk {
background-color: qlineargradient(x1:0, y1:0, x2:1, y2:0,
stop:0 #00ff00, stop:1 #00aa00);
border-radius: 2px;
}
""")
self.download_progress_bar.setVisible(False) # Hidden by default
layout.addWidget(self.download_progress_bar)
# Decks
decks_layout = QHBoxLayout()
self.deck_a = DeckWidget("Deck A", "#00ffff", "A")
decks_layout.addWidget(self.deck_a)
self.deck_b = DeckWidget("Deck B", "#ff00ff", "B")
decks_layout.addWidget(self.deck_b)
layout.addLayout(decks_layout, 70)
# Crossfader
self.crossfader = QSlider(Qt.Orientation.Horizontal)
self.crossfader.setObjectName("crossfader")
self.crossfader.setRange(0, 100)
self.crossfader.setValue(50)
self.crossfader.setToolTip("Crossfade between decks (Left = Deck A, Right = Deck B)")
self.crossfader.valueChanged.connect(self.update_crossfade)
xf_layout = QVBoxLayout()
xf_layout.setContentsMargins(50, 5, 50, 5)
xf_layout.addWidget(self.crossfader)
layout.addLayout(xf_layout, 5)
# Recording controls
rec_layout = QHBoxLayout()
rec_layout.setContentsMargins(50, 10, 50, 10)
self.record_button = QPushButton("REC")
self.record_button.setFixedWidth(100)
self.record_button.setToolTip("Start/Stop recording your mix")
self.record_button.setStyleSheet("""
QPushButton {
background-color: #330000;
color: #ff3333;
border: 2px solid #550000;
font-weight: bold;
font-size: 14px;
padding: 8px;
}
QPushButton:hover {
background-color: #550000;
border-color: #ff0000;
}
""")
self.record_button.clicked.connect(self.toggle_recording)
self.recording_timer_label = QLabel("00:00")
self.recording_timer_label.setStyleSheet("""
color: #888;
font-size: 24px;
font-weight: bold;
font-family: 'Courier New';
""")
self.recording_timer_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
self.recording_status_label = QLabel("Ready to record")
self.recording_status_label.setStyleSheet("color: #666; font-size: 12px;")
self.recording_status_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
rec_left = QVBoxLayout()
rec_left.addWidget(self.recording_timer_label)
rec_left.addWidget(self.recording_status_label)
# Streaming button
self.stream_button = QPushButton("LIVE")
self.stream_button.setFixedWidth(100)
self.stream_button.setToolTip("Start/Stop live streaming")
self.stream_button.setStyleSheet("""
QPushButton {
background-color: #001a33;
color: #3399ff;
border: 2px solid #003366;
font-weight: bold;
font-size: 14px;
padding: 8px;
}
QPushButton:hover {
background-color: #003366;
border-color: #0066cc;
}
""")
self.stream_button.clicked.connect(self.toggle_streaming)
self.stream_status_label = QLabel("Offline")
self.stream_status_label.setStyleSheet("color: #666; font-size: 12px;")
self.stream_status_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
self.listener_count_label = QLabel("0 listeners")
self.listener_count_label.setStyleSheet("color: #3399ff; font-size: 10px; font-weight: bold;")
self.listener_count_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
stream_info = QVBoxLayout()
stream_info.addWidget(self.stream_status_label)
stream_info.addWidget(self.listener_count_label)
# --- Listener Glow Controls ---
glow_title = QLabel("LISTENER GLOW")
glow_title.setStyleSheet(
"color: #bc13fe; font-size: 10px; font-weight: bold; "
"font-family: 'Courier New'; letter-spacing: 1px;"
)
glow_title.setAlignment(Qt.AlignmentFlag.AlignCenter)
self.glow_slider = QSlider(Qt.Orientation.Horizontal)
self.glow_slider.setRange(0, 100)
self.glow_slider.setValue(30)
self.glow_slider.setFixedWidth(130)
self.glow_slider.setToolTip(
"Listener page glow intensity\n"
"0 = off | 100 = max\n"
"Sends to all listeners in real-time while streaming"
)
self.glow_slider.setStyleSheet("""
QSlider::groove:horizontal {
border: 1px solid #550088;
height: 6px;
background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
stop:0 #1a0026, stop:1 #bc13fe);
border-radius: 3px;
}
QSlider::handle:horizontal {
background: #bc13fe;
border: 2px solid #e040fb;
width: 14px;
height: 14px;
margin: -5px 0;
border-radius: 7px;
}
QSlider::handle:horizontal:hover {
background: #e040fb;
border-color: #fff;
}
""")
self.glow_slider.valueChanged.connect(self.on_glow_slider_changed)
self.glow_value_label = QLabel("30")
self.glow_value_label.setStyleSheet("color: #bc13fe; font-size: 10px;")
self.glow_value_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
glow_vbox = QVBoxLayout()
glow_vbox.setSpacing(2)
glow_vbox.addWidget(glow_title)
glow_vbox.addWidget(self.glow_slider)
glow_vbox.addWidget(self.glow_value_label)
rec_layout.addStretch()
rec_layout.addWidget(self.record_button)
rec_layout.addSpacing(20)
rec_layout.addLayout(rec_left)
rec_layout.addSpacing(40)
rec_layout.addWidget(self.stream_button)
rec_layout.addSpacing(20)
rec_layout.addLayout(stream_info)
rec_layout.addSpacing(40)
rec_layout.addLayout(glow_vbox)
rec_layout.addStretch()
layout.addLayout(rec_layout, 3)
# Library section
library_group = QGroupBox("LIBRARY")
lib_layout = QVBoxLayout(library_group)
button_row = QHBoxLayout()
self.local_mode_btn = QPushButton("LOCAL")
self.local_mode_btn.setObjectName("btn_lib_local")
self.local_mode_btn.setCheckable(True)
self.local_mode_btn.setChecked(True)
self.local_mode_btn.clicked.connect(lambda: self.set_library_mode("local"))
self.server_mode_btn = QPushButton("SERVER")
self.server_mode_btn.setObjectName("btn_lib_server")
self.server_mode_btn.setCheckable(True)
self.server_mode_btn.clicked.connect(lambda: self.set_library_mode("server"))
refresh_btn = QPushButton("REFRESH")
refresh_btn.setToolTip("Rescan library folder for audio files")
refresh_btn.clicked.connect(self.load_library)
upload_btn = QPushButton("UPLOAD")
upload_btn.setToolTip("Upload track to library")
upload_btn.clicked.connect(self.upload_track)
load_a_btn = QPushButton("LOAD A")
load_a_btn.setToolTip("Load selected track to Deck A (Ctrl+L)")
load_a_btn.clicked.connect(lambda: self.load_to_deck(self.deck_a))
queue_a_btn = QPushButton("Q A+")
queue_a_btn.setToolTip("Add selected track to Deck A queue (Ctrl+Shift+L)")
queue_a_btn.clicked.connect(lambda: self.queue_to_deck(self.deck_a))
load_b_btn = QPushButton("LOAD B")
load_b_btn.setToolTip("Load selected track to Deck B (Ctrl+R)")
load_b_btn.clicked.connect(lambda: self.load_to_deck(self.deck_b))
queue_b_btn = QPushButton("Q B+")
queue_b_btn.setToolTip("Add selected track to Deck B queue (Ctrl+Shift+R)")
queue_b_btn.clicked.connect(lambda: self.queue_to_deck(self.deck_b))
for btn in [self.local_mode_btn, self.server_mode_btn, refresh_btn, upload_btn, load_a_btn, queue_a_btn, load_b_btn, queue_b_btn]:
button_row.addWidget(btn)
lib_layout.addLayout(button_row)
self.search_filter = QLineEdit()
self.search_filter.setPlaceholderText("Filter...")
self.search_filter.setToolTip("Filter library by filename")
self.search_filter.textChanged.connect(self.filter_library)
lib_layout.addWidget(self.search_filter)
self.library_list = QListWidget()
self.library_list.setObjectName("main_lib")
lib_layout.addWidget(self.library_list)
layout.addWidget(library_group, 25)
# Initialize crossfade
self.update_crossfade()
def setup_keyboard_shortcuts(self):
# Clear existing shortcuts if any
if hasattr(self, '_shortcuts'):
for s in self._shortcuts:
s.setParent(None)
self._shortcuts = []
# Mapping actions to methods
mapping = {
"Deck A: Load": lambda: self.load_to_deck(self.deck_a),
"Deck A: Queue": lambda: self.queue_to_deck(self.deck_a),
"Deck A: Play/Pause": lambda: self.deck_a.play() if self.deck_a.player.playbackState() != QMediaPlayer.PlaybackState.PlayingState else self.deck_a.pause(),
"Deck B: Load": lambda: self.load_to_deck(self.deck_b),
"Deck B: Queue": lambda: self.queue_to_deck(self.deck_b),
"Deck B: Play/Pause": lambda: self.deck_b.play() if self.deck_b.player.playbackState() != QMediaPlayer.PlaybackState.PlayingState else self.deck_b.pause(),
"Global: Focus Search": lambda: self.search_filter.setFocus(),
"Global: Toggle Library": lambda: self.set_library_mode("server" if self.library_mode == "local" else "local"),
}
# Default shortcuts
self.default_shortcuts = {
"Deck A: Load": "Ctrl+L",
"Deck A: Queue": "Ctrl+Shift+L",
"Deck A: Play/Pause": "Space",
"Deck B: Load": "Ctrl+R",
"Deck B: Queue": "Ctrl+Shift+R",
"Deck B: Play/Pause": "Ctrl+Space",
"Global: Focus Search": "Ctrl+F",
"Global: Toggle Library": "Ctrl+Tab",
}
# Load all settings from file
self.settings_file = Path(os.getcwd()) / "settings.json"
self.all_settings = self.load_all_settings()
self.current_shortcuts = self.all_settings.get("shortcuts", self.default_shortcuts)
# Create shortcuts
for action, key in self.current_shortcuts.items():
if action in mapping:
sc = QShortcut(QKeySequence(key), self)
sc.activated.connect(mapping[action])
self._shortcuts.append(sc)
def load_all_settings(self):
"""Load all settings from settings.json"""
default_settings = {
"shortcuts": self.default_shortcuts if hasattr(self, 'default_shortcuts') else {},
"audio": {
"recording_sample_rate": 48000,
"recording_format": "wav",
},
"ui": {
"neon_mode": 0,
},
"library": {
"auto_scan": True,
"yt_default_format": "mp3",
}
}
if self.settings_file.exists():
try:
with open(self.settings_file, "r") as f:
loaded = json.load(f)
# Merge with defaults to ensure all keys exist
for key in default_settings:
if key not in loaded:
loaded[key] = default_settings[key]
return loaded
except Exception as e:
print(f"[SETTINGS] Error loading: {e}")
return default_settings
return default_settings
def apply_ui_settings(self):
"""Apply UI settings from loaded settings"""
ui_settings = self.all_settings.get("ui", {})
neon_mode = ui_settings.get("neon_mode", 0)
# Apply neon mode
if neon_mode != self.neon_state:
for _ in range(neon_mode):
self.toggle_neon()
# Apply library settings
library_settings = self.all_settings.get("library", {})
yt_default = library_settings.get("yt_default_format", "mp3")
self.format_selector.setCurrentIndex(0 if yt_default == "mp3" else 1)
def open_settings(self):
dialog = SettingsDialog(self.all_settings, self)
if dialog.exec():
# Get all updated settings
self.all_settings = dialog.get_all_settings()
self.current_shortcuts = self.all_settings["shortcuts"]
# Save all settings
with open(self.settings_file, "w") as f:
json.dump(self.all_settings, f, indent=4)
# Re-setup shortcuts
self.setup_keyboard_shortcuts()
# Apply UI settings
self.apply_ui_settings()
QMessageBox.information(self, "Settings Saved", "All settings have been updated!")
def search_youtube(self):
query = self.yt_input.text().strip()
if not query:
return
# Check if it's a direct URL
if "youtube.com/" in query or "youtu.be/" in query or "music.youtube.com/" in query:
# Direct Download mode
selected_format = self.format_selector.currentData()
self.status_label.setText("Downloading...")
# Show and reset progress bar
self.download_progress_bar.setValue(0)
self.download_progress_bar.setVisible(True)
self.download_worker.download(query, str(self.lib_path), selected_format)
self.yt_input.clear()
else:
# Keyword Search mode
self.status_label.setText("Searching...")
self.search_button.setEnabled(False)
self.search_worker.search(query)
def on_search_results(self, results):
self.status_label.setText("")
self.search_button.setEnabled(True)
dialog = YTResultDialog(results, self)
if dialog.exec():
url = dialog.get_selected_url()
if url:
# Get selected format from dropdown
selected_format = self.format_selector.currentData()
self.status_label.setText("Downloading...")
# Show and reset progress bar
self.download_progress_bar.setValue(0)
self.download_progress_bar.setVisible(True)
self.download_worker.download(url, str(self.lib_path), selected_format)
def update_download_progress(self, percentage):
"""Update download progress bar"""
self.download_progress_bar.setValue(int(percentage))
def on_download_complete(self, filepath):
self.status_label.setText("Done!")
self.download_progress_bar.setVisible(False) # Hide progress bar
self.load_library()
QMessageBox.information(self, "Download Complete", f"Saved: {os.path.basename(filepath)}")
self.status_label.setText("")
def on_error(self, error_msg):
self.status_label.setText("Error")
self.search_button.setEnabled(True)
self.download_progress_bar.setVisible(False) # Hide progress bar on error
QMessageBox.critical(self, "Error", error_msg)
def toggle_neon(self):
self.neon_state = (self.neon_state + 1) % 3
colors = {0: "#555", 1: "#0ff", 2: "#f0f"}
color = colors[self.neon_state]
labels = ["OFF", "BLUE", "PURPLE"]
self.neon_button.setText(f"NEON EDGE: {labels[self.neon_state]}")
self.neon_button.setStyleSheet(f"color: {color}; border: 1px solid {color};")
if self.neon_state == 0:
# Disable glow
self.glow_frame.set_glow(False)
self.centralWidget().setStyleSheet("QWidget#Central { border: none; }")
else:
# Enable glow with selected color
self.glow_frame.set_glow(True, color)
self.centralWidget().setStyleSheet("QWidget#Central { border: none; }")
def set_library_mode(self, mode):
self.library_mode = mode
self.local_mode_btn.setChecked(mode == "local")
self.server_mode_btn.setChecked(mode == "server")
self.load_library()
def load_library(self):
if self.library_mode == "local":
self.library_list.clear()
self.library_list.addItem(f"Reading: {self.lib_path.name}...")
self.library_scanner = LibraryScannerThread(self.lib_path)
self.library_scanner.files_found.connect(self.populate_library)
self.library_scanner.start()
else:
self.fetch_server_library()
def fetch_server_library(self):
self.library_list.clear()
self.library_list.addItem("Fetching server library...")
# Use the listener port (no auth required) for library / track data.
# The DJ port requires a password session which the Qt client doesn't hold.
listener_url = self.get_listener_base_url()
self.server_url = self.get_server_base_url() # kept for socket/streaming
self.fetcher = ServerLibraryFetcher(f"{listener_url}/library.json")
self.fetcher.finished.connect(
lambda tracks, err, success: self.on_server_library_fetched(tracks, listener_url, err, success)
)
self.fetcher.start()
def on_server_library_fetched(self, tracks, base_url, err, success):
self.library_list.clear()
if success:
self.server_library = tracks
self.populate_server_library(tracks, base_url)
else:
self.library_list.addItem(f"Error: {err}")
def populate_server_library(self, tracks, base_url):
self.library_list.clear()
for track in tracks:
item = QListWidgetItem(track['title'])
# Store URL and title
track_url = f"{base_url}/{track['file']}"
item.setData(Qt.ItemDataRole.UserRole, {"url": track_url, "title": track['title'], "is_server": True})
self.library_list.addItem(item)
def populate_library(self, files):
self.library_list.clear()
self.local_library = []
for file_path in files:
item = QListWidgetItem(file_path.name)
data = {"path": file_path, "title": file_path.stem, "is_server": False}
item.setData(Qt.ItemDataRole.UserRole, data)
self.library_list.addItem(item)
self.local_library.append(data)
def filter_library(self, filter_text):
# Debounce the search to prevent UI freezing while typing
self.filter_timer.start(250)
def perform_filter(self):
filter_text = self.search_filter.text().lower().strip()
self.library_list.setUpdatesEnabled(False)
for i in range(self.library_list.count()):
item = self.library_list.item(i)
is_match = not filter_text or filter_text in item.text().lower()
item.setHidden(not is_match)
self.library_list.setUpdatesEnabled(True)
def load_to_deck(self, deck):
item = self.library_list.currentItem()
if item:
data = item.data(Qt.ItemDataRole.UserRole)
if data:
if data.get("is_server"):
self.load_server_track(deck, data)
else:
path = data.get("path")
if path:
deck.load_track(path)
def load_server_track(self, deck, data):
url = data.get("url")
title = data.get("title")
filename = os.path.basename(url)
cache_path = self.cache_dir / filename
if cache_path.exists():
deck.load_track(cache_path)
else:
self.status_label.setText(f"Downloading: {title}...")
thread = DownloadThread(url, str(cache_path))
thread.finished.connect(lambda path, success: self.on_server_download_complete(deck, path, success))
thread.start()
self.download_threads[filename] = thread
def on_server_download_complete(self, deck, path, success):
self.status_label.setText("Download complete")
if success:
deck.load_track(Path(path))
else:
QMessageBox.warning(self, "Download Error", "Failed to download track from server")
def queue_to_deck(self, deck):
item = self.library_list.currentItem()
if item:
data = item.data(Qt.ItemDataRole.UserRole)
if data:
if data.get("is_server"):
# For server queueing, we download first then queue
url = data.get("url")
filename = os.path.basename(url)
cache_path = self.cache_dir / filename
if cache_path.exists():
deck.add_queue(cache_path)
else:
thread = DownloadThread(url, str(cache_path))
thread.finished.connect(lambda path, success: deck.add_queue(Path(path)) if success else None)
thread.start()
self.download_threads[os.path.basename(url)] = thread
else:
path = data.get("path")
if path:
deck.add_queue(path)
def upload_track(self):
file_path, _ = QFileDialog.getOpenFileName(self, "Select Track to Upload", "", "Audio Files (*.mp3 *.wav *.m4a *.flac *.ogg)")
if not file_path:
return
if self.library_mode == "local":
filename = os.path.basename(file_path)
# Check for duplicates in local_library
if any(Path(track['path']).name.lower() == filename.lower() for track in self.local_library):
QMessageBox.information(self, "Import Skipped", f"'{filename}' is already in your local library.")
return
# Copy to local music folder
dest = self.lib_path / filename
try:
shutil.copy2(file_path, dest)
self.status_label.setText(f"Imported: {filename}")
self.load_library()
except Exception as e:
QMessageBox.warning(self, "Import Error", f"Failed to import file: {e}")
else:
# Upload to server
filename = os.path.basename(file_path)
# Check for duplicates in server_library
if hasattr(self, 'server_library'):
if any(track['file'].split('/')[-1].lower() == filename.lower() for track in self.server_library):
QMessageBox.information(self, "Upload Skipped", f"'{filename}' already exists on the server.")
return
try:
self.status_label.setText("Uploading to server...")
base_url = self.get_server_base_url()
password = self.all_settings.get("audio", {}).get("dj_panel_password", "")
# Authenticate if a password is configured (same approach as StreamingWorker)
session_cookie = None
if password:
session_cookie = StreamingWorker._get_auth_cookie(base_url, password)
if not session_cookie:
QMessageBox.warning(self, "Upload Error", "DJ panel authentication failed.\nCheck the password in Settings → Audio.")
self.status_label.setText("Upload failed")
return
headers = {}
if session_cookie:
headers['Cookie'] = session_cookie
with open(file_path, 'rb') as f:
files = {'file': f}
response = requests.post(f"{base_url}/upload", files=files, headers=headers, timeout=60)
if response.status_code == 200:
self.status_label.setText("Upload successful!")
self.load_library()
else:
try:
err = response.json().get('error', 'Unknown error')
except:
err = f"Server returned {response.status_code}"
QMessageBox.warning(self, "Upload Error", f"Server error: {err}")
self.status_label.setText("Upload failed")
except Exception as e:
QMessageBox.warning(self, "Upload Error", f"Failed to upload: {e}")
self.status_label.setText("Upload error")
def update_crossfade(self):
value = self.crossfader.value()
ratio = value / 100.0
# Cosine crossfade curve for smooth transition
deck_a_vol = int(math.cos(ratio * 0.5 * math.pi) * 100)
deck_b_vol = int(math.cos((1 - ratio) * 0.5 * math.pi) * 100)
self.deck_a.set_xf_vol(deck_a_vol)
self.deck_b.set_xf_vol(deck_b_vol)
def toggle_recording(self):
"""Start or stop recording"""
if not self.is_recording:
# Start recording
from datetime import datetime
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
filename = f"mix_{timestamp}.wav"
output_path = str(self.recordings_path / filename)
if self.recording_worker.start_recording(output_path):
self.is_recording = True
self.recording_start_time = time.time()
self.recording_timer.start(1000) # Update every second
# Update UI
self.record_button.setText("STOP")
self.record_button.setStyleSheet("""
QPushButton {
background-color: #550000;
color: #ff0000;
border: 2px solid #ff0000;
font-weight: bold;
font-size: 14px;
padding: 8px;
}
QPushButton:hover {
background-color: #770000;
border-color: #ff3333;
}
""")
self.recording_status_label.setText(f"Recording to: {filename}")
self.recording_status_label.setStyleSheet("color: #ff0000; font-size: 12px;")
print(f"[RECORDING] Started: {output_path}")
else:
# Stop recording
self.recording_worker.stop_recording()
self.is_recording = False
self.recording_timer.stop()
# Update UI
self.record_button.setText("REC")
self.record_button.setStyleSheet("""
QPushButton {
background-color: #330000;
color: #ff3333;
border: 2px solid #550000;
font-weight: bold;
font-size: 14px;
padding: 8px;
}
QPushButton:hover {
background-color: #550000;
border-color: #ff0000;
}
""")
self.recording_timer_label.setText("00:00")
self.recording_timer_label.setStyleSheet("color: #888; font-size: 24px; font-weight: bold; font-family: 'Courier New';")
self.recording_status_label.setText("Recording saved!")
self.recording_status_label.setStyleSheet("color: #00ff00; font-size: 12px;")
print("[RECORDING] Stopped")
# Reset status after 3 seconds
def _reset_rec_status():
self.recording_status_label.setText("Ready to record")
self.recording_status_label.setStyleSheet("color: #666; font-size: 12px;")
QTimer.singleShot(3000, _reset_rec_status)
def update_recording_time(self):
"""Update the recording timer display"""
if self.is_recording:
elapsed = int(time.time() - self.recording_start_time)
minutes = elapsed // 60
seconds = elapsed % 60
self.recording_timer_label.setText(f"{minutes:02d}:{seconds:02d}")
self.recording_timer_label.setStyleSheet("color: #ff0000; font-size: 24px; font-weight: bold; font-family: 'Courier New';")
def on_recording_error(self, error_msg):
"""Handle recording errors"""
QMessageBox.critical(self, "Recording Error", error_msg)
if self.is_recording:
self.is_recording = False
self.recording_timer.stop()
self.record_button.setText("REC")
self.recording_status_label.setText("Recording failed")
self.recording_status_label.setStyleSheet("color: #ff0000; font-size: 12px;")
def toggle_streaming(self):
"""Toggle live streaming on/off"""
if not self.is_streaming:
# Get base URL and credentials from settings
base_url = self.get_server_base_url()
bitrate = self.all_settings.get("audio", {}).get("bitrate", 128)
password = self.all_settings.get("audio", {}).get("dj_panel_password", "")
# Start streaming (password handled inside StreamingWorker)
if self.streaming_worker.start_streaming(base_url, bitrate, password=password):
self.is_streaming = True
self.stream_button.setText("STOP")
self.stream_button.setStyleSheet("""
QPushButton {
background-color: #003366;
color: #00ff00;
border: 2px solid #0066cc;
font-weight: bold;
font-size: 14px;
padding: 8px;
}
QPushButton:hover {
background-color: #0066cc;
border-color: #00ff00;
}
""")
self.stream_status_label.setText("LIVE")
self.stream_status_label.setStyleSheet("color: #ff0000; font-size: 12px; font-weight: bold;")
print("[STREAMING] Started")
else:
# Stop streaming
self.streaming_worker.stop_streaming()
self.is_streaming = False
self.stream_button.setText("LIVE")
self.stream_button.setStyleSheet("""
QPushButton {
background-color: #001a33;
color: #3399ff;
border: 2px solid #003366;
font-weight: bold;
font-size: 14px;
padding: 8px;
}
QPushButton:hover {
background-color: #003366;
border-color: #0066cc;
}
""")
self.stream_status_label.setText("Offline")
self.stream_status_label.setStyleSheet("color: #666; font-size: 12px;")
print("[STREAMING] Stopped")
def on_streaming_error(self, error_msg):
"""Handle streaming errors"""
QMessageBox.critical(self, "Streaming Error", error_msg)
if self.is_streaming:
self.is_streaming = False
self.stream_button.setText("LIVE")
self.stream_status_label.setText("Error")
self.stream_status_label.setStyleSheet("color: #ff0000; font-size: 12px;")
def update_listener_count(self, count):
self.listener_count_label.setText(f"{count} listeners")
def on_glow_slider_changed(self, val):
"""Send listener glow intensity to all connected listeners in real-time."""
self.glow_value_label.setText(str(val))
# Keep the worker's glow_intensity in sync so it's sent on reconnect too
self.streaming_worker.glow_intensity = val
self.streaming_worker.emit_if_connected('listener_glow', {'intensity': val})
def get_listener_base_url(self):
"""Return the listener server base URL (no auth required).
The listener server runs on DJ_port + 1 by convention (default 5001).
Library JSON and music_proxy routes are available there without any
password, so we use this for all library / track-download requests.
"""
audio_settings = self.all_settings.get("audio", {})
raw = audio_settings.get("stream_server_url", "http://localhost:5000").strip()
if not raw.startswith(("http://", "https://", "ws://", "wss://")):
raw = "http://" + raw
parsed = urlparse(raw)
host = parsed.hostname or "localhost"
dj_port = parsed.port or 5000
# Normalise: if the configured port IS the listener port, keep it;
# otherwise derive listener port as DJ port + 1.
if dj_port == 5001:
listener_port = 5001
else:
# Remap any reverse-proxy or listener port back to DJ port first,
# then add 1 to get the listener port.
if dj_port == 8080:
dj_port = 5000
listener_port = dj_port + 1
return f"http://{host}:{listener_port}"
def get_server_base_url(self):
audio_settings = self.all_settings.get("audio", {})
raw = audio_settings.get("stream_server_url", "http://localhost:5000").strip()
# Ensure scheme is present so urlparse parses host/port correctly
if not raw.startswith(("http://", "https://", "ws://", "wss://")):
raw = "http://" + raw
parsed = urlparse(raw)
host = parsed.hostname or "localhost"
port = parsed.port or 5000
# Remap listener port and common reverse-proxy port to the DJ server port
if port in (5001, 8080):
port = 5000
# Always use plain HTTP — the server never terminates TLS directly.
# Any path component (e.g. /stream.mp3, /api/stream) is intentionally stripped.
return f"http://{host}:{port}"
def resizeEvent(self, event):
"""Update glow frame size when window is resized"""
super().resizeEvent(event)
if hasattr(self, 'glow_frame'):
self.glow_frame.setGeometry(self.centralWidget().rect())
def closeEvent(self, event):
# Stop recording if active
if self.is_recording:
self.recording_worker.stop_recording()
# Stop streaming if active
if self.is_streaming:
self.streaming_worker.stop_streaming()
self.deck_a.stop()
self.deck_b.stop()
self.search_worker.kill()
self.download_worker.kill()
event.accept()
if __name__ == "__main__":
# Create virtual sink BEFORE QApplication so Qt's audio output
# is automatically routed to it via PULSE_SINK env var.
_setup_audio_isolation()
app = QApplication(sys.argv)
app.setApplicationName("TechDJ Pro")
app.setDesktopFileName("techdj-pro")
window = DJApp()
window.show()
sys.exit(app.exec())