# Monkey patch MUST be first - before any other imports! import eventlet eventlet.monkey_patch() import os import json import re import subprocess import threading import queue import time import collections import hmac from flask import Flask, send_from_directory, jsonify, request, session, Response, stream_with_context, abort, render_template, make_response from flask_socketio import SocketIO, emit from dotenv import load_dotenv load_dotenv() def _load_config(): """Loads optional config.json from the project root. If missing or invalid, returns an empty dict. """ try: with open('config.json', 'r', encoding='utf-8') as f: data = json.load(f) return data if isinstance(data, dict) else {} except FileNotFoundError: return {} except Exception: return {} CONFIG = _load_config() # --- Config-driven settings (config.json > env vars > defaults) --- CONFIG_HOST = (CONFIG.get('host') or os.environ.get('HOST') or '0.0.0.0').strip() CONFIG_DJ_PORT = int(CONFIG.get('dj_port') or os.environ.get('DJ_PORT') or 5000) CONFIG_LISTENER_PORT = int(CONFIG.get('listener_port') or os.environ.get('LISTEN_PORT') or 5001) CONFIG_SECRET = (CONFIG.get('secret_key') or '').strip() or 'dj_panel_secret' CONFIG_CORS = CONFIG.get('cors_origins', '*') CONFIG_MAX_UPLOAD_MB = int(CONFIG.get('max_upload_mb') or 500) CONFIG_DEBUG = bool(CONFIG.get('debug', False)) CONFIG_LISTENER_URL = (CONFIG.get('listener_url') or '').strip() # MediaMTX / SRT integration # secret shared with MediaMTX webhook requests (leave blank to disable auth) _SRT_WEBHOOK_SECRET = (CONFIG.get('mediamtx_webhook_secret') or '').strip() # SRT ingest port MediaMTX listens on (Flask does not bind this — configure in mediamtx.yml) _MEDIAMTX_SRT_PORT = int(CONFIG.get('mediamtx_srt_port') or 8890) # RTSP URL MediaMTX exposes for the incoming SRT path (used by ffmpeg to pull audio) _MEDIAMTX_RTSP_URL = (CONFIG.get('mediamtx_rtsp_url') or 'rtsp://127.0.0.1:8554/live').strip() _MEDIAMTX_HLS_URL = (CONFIG.get('mediamtx_hls_url') or 'http://techy.music:8888/aussie_dj/index.m3u8').strip() DJ_PANEL_PASSWORD = (CONFIG.get('dj_panel_password') or '').strip() DJ_AUTH_ENABLED = bool(DJ_PANEL_PASSWORD) # Broadcast State broadcast_state = { 'active': False, } # SRT stream state — updated by the /api/webhook MediaMTX callback _srt_state = { 'active': False, 'path': None, 'source_id': None, 'started_at': None, } # ffmpeg process that pulls audio from MediaMTX's RTSP output for an SRT session _srt_ffmpeg_proc = None listener_sids = set() dj_sids = set() # Set to True once the first audio chunk arrives in MP3-direct mode. # broadcast_started is held back until then so listeners don't connect # to /stream.mp3 before any data is flowing (e.g. before the DJ presses play). _mp3_broadcast_announced = False # Current listener glow intensity — persisted so new/reloaded listeners get the # correct level immediately without waiting for the DJ to move the slider again. _current_glow_intensity = 30 # Grace-period greenlet: auto-stop broadcast if DJ doesn't reconnect in time _dj_grace_greenlet = None DJ_GRACE_PERIOD_SECS = 20 # seconds to wait before auto-stopping # === Optional MP3 fallback stream (server-side transcoding) === _ffmpeg_proc = None _ffmpeg_in_q = queue.Queue(maxsize=200) _current_bitrate = (CONFIG.get('stream_bitrate') or '192k').strip() _mp3_clients = set() # set[queue.Queue] _mp3_lock = threading.Lock() _transcoder_bytes_out = 0 _transcoder_last_error = None _last_audio_chunk_ts = 0.0 _mp3_preroll = collections.deque() # byte-budget FIFO; evicted from the front when over limit _mp3_preroll_bytes = 0 _MP3_PREROLL_MAX_BYTES = 256 * 1024 # 256 KB ≈ 10 s at 192 kbps — enough for smooth reconnects def _preroll_append(data: bytes) -> None: """Add a chunk to the pre-roll buffer, evicting the oldest data from the front when the byte budget is exceeded. MUST be called with _mp3_lock already held. """ global _mp3_preroll_bytes _mp3_preroll.append(data) _mp3_preroll_bytes += len(data) while _mp3_preroll_bytes > _MP3_PREROLL_MAX_BYTES and _mp3_preroll: _mp3_preroll_bytes -= len(_mp3_preroll.popleft()) def _preroll_clear() -> None: """Empty the pre-roll buffer and reset its byte counter. MUST be called with _mp3_lock already held. """ global _mp3_preroll_bytes _mp3_preroll.clear() _mp3_preroll_bytes = 0 def _start_transcoder_if_needed(is_mp3_input=False): global _ffmpeg_proc, _transcoder_last_error # If already running, check if we need to restart if _ffmpeg_proc is not None and _ffmpeg_proc.poll() is None: return # Ensure stale process is cleaned up if _ffmpeg_proc: try: _ffmpeg_proc.terminate() except: pass _ffmpeg_proc = None codec = 'copy' if is_mp3_input else 'libmp3lame' cmd = [ 'ffmpeg', '-hide_banner', '-loglevel', 'error', '-fflags', 'nobuffer', '-flags', 'low_delay', '-probesize', '32', '-analyzeduration', '0' ] if is_mp3_input: cmd.extend(['-f', 'mp3']) cmd.extend([ '-i', 'pipe:0', '-vn', '-acodec', codec, ]) if not is_mp3_input: cmd.extend(['-b:a', _current_bitrate]) cmd.extend([ '-flush_packets', '1', '-f', 'mp3', 'pipe:1', ]) try: _ffmpeg_proc = subprocess.Popen( cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=0, ) except FileNotFoundError: _ffmpeg_proc = None print('WARNING: ffmpeg not found; /stream.mp3 fallback disabled') return mode_str = "PASSTHROUGH (copy)" if is_mp3_input else f"TRANSCODE ({_current_bitrate})" print(f'INFO: ffmpeg transcoder started ({mode_str})') _transcoder_last_error = None # Clear queue to avoid stale data while not _ffmpeg_in_q.empty(): try: _ffmpeg_in_q.get_nowait() except: break # Define greenlets INSIDE so they close over THIS specific 'proc'. # Blocking subprocess pipe I/O is delegated to eventlet.tpool so it runs # in a real OS thread, preventing it from stalling the eventlet hub. def _writer(proc): global _transcoder_last_error print(f"[THREAD] Transcoder writer started (PID: {proc.pid})") while proc.poll() is None: try: chunk = _ffmpeg_in_q.get(timeout=1.0) if chunk is None: break if proc.stdin: proc.stdin.write(chunk) proc.stdin.flush() except queue.Empty: continue except (BrokenPipeError, ConnectionResetError): _transcoder_last_error = "Broken pipe in writer" break except Exception as e: print(f"WARNING: Transcoder writer error: {e}") _transcoder_last_error = str(e) break # Ensure process is killed if greenlet exits unexpectedly if proc.poll() is None: try: proc.terminate() except: pass print(f"[THREAD] Transcoder writer finished (PID: {proc.pid})") def _reader(proc): global _transcoder_bytes_out, _transcoder_last_error print(f"[THREAD] Transcoder reader started (PID: {proc.pid})") while proc.poll() is None: try: data = proc.stdout.read(1024) if not data: break _transcoder_bytes_out += len(data) with _mp3_lock: _preroll_append(data) clients = list(_mp3_clients) for q in clients: try: q.put_nowait(data) except queue.Full: # Client is too slow — skip this chunk for them pass except Exception as e: print(f"WARNING: Transcoder reader error: {e}") _transcoder_last_error = str(e) break # Ensure process is killed if greenlet exits unexpectedly if proc.poll() is None: try: proc.terminate() except: pass print(f"[THREAD] Transcoder reader finished (PID: {proc.pid})") # Spawn as greenlets — tpool handles the blocking subprocess I/O internally eventlet.spawn(_writer, _ffmpeg_proc) eventlet.spawn(_reader, _ffmpeg_proc) def _stop_transcoder(): global _ffmpeg_proc print("STOPPING: Transcoder process") proc = _ffmpeg_proc _ffmpeg_proc = None # Null first so _feed_transcoder stops enqueuing # Drain stale input chunks so the None sentinel is guaranteed to fit and # the writer greenlet exits on its next get() instead of stalling. while True: try: _ffmpeg_in_q.get_nowait() except queue.Empty: break try: _ffmpeg_in_q.put_nowait(None) except queue.Full: pass if proc: try: proc.terminate() proc.communicate(timeout=1.0) except: try: proc.kill() except: pass with _mp3_lock: clients = list(_mp3_clients) for q in clients: try: q.put_nowait(None) except: pass _mp3_clients.clear() _preroll_clear() # === SRT / MediaMTX transcoder === # When MediaMTX receives the incoming SRT stream it exposes it as an RTSP endpoint. # We launch a dedicated ffmpeg process that *pulls* from that RTSP URL and feeds the # same _mp3_clients / _mp3_preroll distribution system used by all other stream modes. def _start_srt_transcoder(): """Start an ffmpeg process that pulls from MediaMTX's RTSP output.""" global _srt_ffmpeg_proc, _transcoder_last_error if _srt_ffmpeg_proc is not None and _srt_ffmpeg_proc.poll() is None: return # Already running if _srt_ffmpeg_proc: try: _srt_ffmpeg_proc.terminate() except: pass _srt_ffmpeg_proc = None cmd = [ 'ffmpeg', '-hide_banner', '-loglevel', 'error', '-fflags', 'nobuffer', '-flags', 'low_delay', '-rtsp_transport', 'tcp', '-i', _MEDIAMTX_RTSP_URL, '-vn', '-acodec', 'libmp3lame', '-b:a', _current_bitrate, '-flush_packets', '1', '-f', 'mp3', 'pipe:1', ] try: _srt_ffmpeg_proc = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=0, ) except FileNotFoundError: _srt_ffmpeg_proc = None print('WARNING: ffmpeg not found; SRT→MP3 bridge disabled') return print(f'INFO: SRT transcoder started — pulling from {_MEDIAMTX_RTSP_URL} @ {_current_bitrate}') _transcoder_last_error = None def _srt_reader(proc): global _transcoder_bytes_out, _transcoder_last_error print(f'[THREAD] SRT reader started (PID: {proc.pid})') while proc.poll() is None: try: data = proc.stdout.read(4096) if not data: break _transcoder_bytes_out += len(data) with _mp3_lock: _preroll_append(data) clients = list(_mp3_clients) for q in clients: try: q.put_nowait(data) except queue.Full: pass except Exception as e: print(f'WARNING: SRT reader error: {e}') _transcoder_last_error = str(e) break # Log any ffmpeg stderr output to help diagnose failures try: err = proc.stderr.read().decode(errors='replace').strip() if err: print(f'[FFMPEG SRT] {err}') except Exception: pass if proc.poll() is None: try: proc.terminate() except: pass print(f'[THREAD] SRT reader finished (PID: {proc.pid})') eventlet.spawn(_srt_reader, _srt_ffmpeg_proc) def _stop_srt_transcoder(): """Terminate the SRT→MP3 ffmpeg process and flush all listener queues.""" global _srt_ffmpeg_proc print('STOPPING: SRT transcoder') proc = _srt_ffmpeg_proc _srt_ffmpeg_proc = None if proc: try: proc.terminate() proc.communicate(timeout=2.0) except Exception: try: proc.kill() except: pass with _mp3_lock: clients = list(_mp3_clients) for q in clients: try: q.put_nowait(None) except: pass _mp3_clients.clear() _preroll_clear() def _feed_transcoder(data: bytes): global _last_audio_chunk_ts if _ffmpeg_proc is None or _ffmpeg_proc.poll() is not None: # If active but dead, restart it automatically (non-MP3 mode only) if broadcast_state.get('active'): _start_transcoder_if_needed(is_mp3_input=False) else: return _last_audio_chunk_ts = time.time() try: _ffmpeg_in_q.put_nowait(data) except queue.Full: # Drop chunk if overflow to prevent memory bloat pass def _distribute_mp3(data: bytes): """Distribute MP3 bytes directly to preroll buffer and all connected listener clients, bypassing the ffmpeg transcoder entirely. Used when the DJ is already sending valid MP3 (e.g. the Qt desktop client) to eliminate the unnecessary encode/decode round-trip and cut pipeline latency roughly in half. """ global _transcoder_bytes_out, _last_audio_chunk_ts _last_audio_chunk_ts = time.time() _transcoder_bytes_out += len(data) with _mp3_lock: _preroll_append(data) for q in list(_mp3_clients): try: q.put_nowait(data) except queue.Full: pass # Load settings to get MUSIC_FOLDER def _load_settings(): try: if os.path.exists('settings.json'): with open('settings.json', 'r', encoding='utf-8') as f: return json.load(f) except: pass return {} SETTINGS = _load_settings() _config_music = (CONFIG.get('music_folder') or '').strip() MUSIC_FOLDER = _config_music or SETTINGS.get('library', {}).get('music_folder', 'music') # Ensure music folder exists if not os.path.exists(MUSIC_FOLDER): try: os.makedirs(MUSIC_FOLDER) except: # Fallback to default if custom path fails MUSIC_FOLDER = "music" if not os.path.exists(MUSIC_FOLDER): os.makedirs(MUSIC_FOLDER) # Helper for shared routes def setup_shared_routes(app, index_file='index.html'): @app.route('/library.json') def get_library(): library = [] global MUSIC_FOLDER if os.path.exists(MUSIC_FOLDER): for root, dirs, files in os.walk(MUSIC_FOLDER): for filename in sorted(files): if filename.lower().endswith(('.mp3', '.m4a', '.wav', '.flac', '.ogg')): rel_path = os.path.relpath(os.path.join(root, filename), MUSIC_FOLDER) library.append({ "title": os.path.splitext(filename)[0], "file": f"music_proxy/{rel_path}", # Absolute path exposed for Tauri's asset protocol (convertFileSrc). # The web fallback keeps using the music_proxy route above. "absolutePath": os.path.abspath(os.path.join(root, filename)) }) break # Top-level only return jsonify(library) @app.route('/music_proxy/') def music_proxy(filename): return send_from_directory(MUSIC_FOLDER, filename) @app.route('/browse_directories', methods=['GET']) def browse_directories(): path = request.args.get('path', os.path.expanduser('~')) try: entries = [] if os.path.exists(path) and os.path.isdir(path): # Add parent parent = os.path.dirname(os.path.abspath(path)) entries.append({"name": "..", "path": parent, "isDir": True}) for item in sorted(os.listdir(path)): full_path = os.path.join(path, item) if os.path.isdir(full_path) and not item.startswith('.'): entries.append({ "name": item, "path": full_path, "isDir": True }) return jsonify({"success": True, "path": os.path.abspath(path), "entries": entries}) except Exception as e: return jsonify({"success": False, "error": str(e)}), 500 @app.route('/update_settings', methods=['POST']) def update_settings(): try: data = request.get_json() # Load existing settings = _load_settings() # Update selectively if 'library' not in settings: settings['library'] = {} if 'music_folder' in data.get('library', {}): new_folder = data['library']['music_folder'] if os.path.exists(new_folder) and os.path.isdir(new_folder): settings['library']['music_folder'] = new_folder global MUSIC_FOLDER MUSIC_FOLDER = new_folder else: return jsonify({"success": False, "error": "Invalid folder path"}), 400 with open('settings.json', 'w', encoding='utf-8') as f: json.dump(settings, f, indent=4) return jsonify({"success": True}) except Exception as e: return jsonify({"success": False, "error": str(e)}), 500 @app.route('/') def serve_static(filename): # Prevent path traversal if '..' in filename or filename.startswith('/'): abort(403) # Allow only known safe static file extensions allowed_extensions = ('.css', '.js', '.html', '.htm', '.png', '.jpg', '.jpeg', '.gif', '.svg', '.ico', '.woff', '.woff2', '.ttf', '.eot', '.map') if not filename.endswith(allowed_extensions): abort(403) response = send_from_directory('.', filename) if filename.endswith(('.css', '.js', '.html')): response.headers['Cache-Control'] = 'no-store, no-cache, must-revalidate, max-age=0' return response @app.route('/') def index(): response = send_from_directory('.', index_file) response.headers['Cache-Control'] = 'no-store, no-cache, must-revalidate, max-age=0' response.headers['Pragma'] = 'no-cache' response.headers['Expires'] = '0' return response @app.route('/upload', methods=['POST']) def upload_file(): if 'file' not in request.files: return jsonify({"success": False, "error": "No file provided"}), 400 file = request.files['file'] if file.filename == '': return jsonify({"success": False, "error": "No file selected"}), 400 allowed_exts = ('.mp3', '.m4a', '.wav', '.flac', '.ogg') ext = os.path.splitext(file.filename)[1].lower() if ext not in allowed_exts: return jsonify({"success": False, "error": f"Supported formats: {', '.join(allowed_exts)}"}), 400 # Sanitize filename (keep extension) name_without_ext = os.path.splitext(file.filename)[0] name_without_ext = re.sub(r'[^\w\s-]', '', name_without_ext) name_without_ext = re.sub(r'\s+', ' ', name_without_ext).strip() filename = f"{name_without_ext}{ext}" filepath = os.path.join(MUSIC_FOLDER, filename) if os.path.exists(filepath): return jsonify({"success": False, "error": "File already exists in library"}), 409 try: file.save(filepath) print(f"UPLOADED: {filename}") return jsonify({"success": True, "filename": filename}) except Exception as e: print(f"ERROR: Upload error: {e}") return jsonify({"success": False, "error": str(e)}), 500 @app.route('/save_keymaps', methods=['POST']) def save_keymaps(): try: data = request.get_json() with open('keymaps.json', 'w', encoding='utf-8') as f: json.dump(data, f, indent=4) print("SAVED: Keymaps saved to keymaps.json") return jsonify({"success": True}) except Exception as e: print(f"ERROR: Save keymaps error: {e}") return jsonify({"success": False, "error": str(e)}), 500 @app.route('/load_keymaps', methods=['GET']) def load_keymaps(): try: if os.path.exists('keymaps.json'): with open('keymaps.json', 'r', encoding='utf-8') as f: data = json.load(f) return jsonify({"success": True, "keymaps": data}) else: return jsonify({"success": True, "keymaps": None}) except Exception as e: print(f"ERROR: Load keymaps error: {e}") return jsonify({"success": False, "error": str(e)}), 500 @app.route('/stream.mp3') def stream_mp3(): """Live MP3 audio stream from the ffmpeg transcoder.""" # If broadcast is not active, return 503 with audio/mpeg content type. # Returning JSON here would confuse the browser's