# Monkey patch MUST be first - before any other imports! import eventlet eventlet.monkey_patch() import os import json import re import subprocess import threading import queue import time import collections from flask import Flask, send_from_directory, jsonify, request, session, Response, stream_with_context, abort from flask_socketio import SocketIO, emit from dotenv import load_dotenv # Load environment variables from .env file load_dotenv() def _load_config(): """Loads optional config.json from the project root. If missing or invalid, returns an empty dict. """ try: with open('config.json', 'r', encoding='utf-8') as f: data = json.load(f) return data if isinstance(data, dict) else {} except FileNotFoundError: return {} except Exception: return {} CONFIG = _load_config() # --- Config-driven settings (config.json > env vars > defaults) --- CONFIG_HOST = (CONFIG.get('host') or os.environ.get('HOST') or '0.0.0.0').strip() CONFIG_DJ_PORT = int(CONFIG.get('dj_port') or os.environ.get('DJ_PORT') or 5000) CONFIG_LISTENER_PORT = int(CONFIG.get('listener_port') or os.environ.get('LISTEN_PORT') or 5001) CONFIG_SECRET = (CONFIG.get('secret_key') or '').strip() or 'dj_panel_secret' CONFIG_CORS = CONFIG.get('cors_origins', '*') CONFIG_MAX_UPLOAD_MB = int(CONFIG.get('max_upload_mb') or 500) CONFIG_DEBUG = bool(CONFIG.get('debug', False)) DJ_PANEL_PASSWORD = (CONFIG.get('dj_panel_password') or '').strip() DJ_AUTH_ENABLED = bool(DJ_PANEL_PASSWORD) # Relay State broadcast_state = { 'active': False, } listener_sids = set() dj_sids = set() # === Optional MP3 fallback stream (server-side transcoding) === _ffmpeg_proc = None _ffmpeg_in_q = queue.Queue(maxsize=20) # Optimized for low-latency live streaming _current_bitrate = (CONFIG.get('stream_bitrate') or '192k').strip() _mp3_clients = set() # set[queue.Queue] _mp3_lock = threading.Lock() _transcoder_bytes_out = 0 _transcoder_last_error = None _last_audio_chunk_ts = 0.0 _mp3_preroll = collections.deque(maxlen=512) # Larger pre-roll (~512KB) def _start_transcoder_if_needed(is_mp3_input=False): global _ffmpeg_proc, _transcoder_last_error # If already running, check if we need to restart if _ffmpeg_proc is not None and _ffmpeg_proc.poll() is None: return # Ensure stale process is cleaned up if _ffmpeg_proc: try: _ffmpeg_proc.terminate() except: pass _ffmpeg_proc = None codec = 'copy' if is_mp3_input else 'libmp3lame' cmd = [ 'ffmpeg', '-hide_banner', '-loglevel', 'error', '-fflags', 'nobuffer', '-flags', 'low_delay', '-probesize', '32', '-analyzeduration', '0' ] if is_mp3_input: cmd.extend(['-f', 'mp3']) cmd.extend([ '-i', 'pipe:0', '-vn', '-acodec', codec, ]) if not is_mp3_input: cmd.extend(['-b:a', _current_bitrate]) cmd.extend([ '-flush_packets', '1', '-f', 'mp3', 'pipe:1', ]) try: _ffmpeg_proc = subprocess.Popen( cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=0, ) except FileNotFoundError: _ffmpeg_proc = None print('WARNING: ffmpeg not found; /stream.mp3 fallback disabled') return mode_str = "PASSTHROUGH (copy)" if is_mp3_input else f"TRANSCODE ({_current_bitrate})" print(f'INFO: ffmpeg transcoder started ({mode_str})') _transcoder_last_error = None # Clear queue to avoid stale data while not _ffmpeg_in_q.empty(): try: _ffmpeg_in_q.get_nowait() except: break # Define threads INSIDE so they close over THIS specific 'proc' def _writer(proc): global _transcoder_last_error print(f"[THREAD] Transcoder writer started (PID: {proc.pid})") while proc.poll() is None: try: chunk = _ffmpeg_in_q.get(timeout=1.0) if chunk is None: break if proc.stdin: proc.stdin.write(chunk) proc.stdin.flush() except queue.Empty: continue except (BrokenPipeError, ConnectionResetError): _transcoder_last_error = "Broken pipe in writer" break except Exception as e: print(f"WARNING: Transcoder writer error: {e}") _transcoder_last_error = str(e) break # Ensure process is killed if thread exits unexpectedly if proc.poll() is None: try: proc.terminate() except: pass print(f"[THREAD] Transcoder writer finished (PID: {proc.pid})") def _reader(proc): global _transcoder_bytes_out, _transcoder_last_error print(f"[THREAD] Transcoder reader started (PID: {proc.pid})") while proc.poll() is None: try: # Smaller read for smoother delivery (1KB) # This prevents buffering delays at lower bitrates data = proc.stdout.read(1024) if not data: break _transcoder_bytes_out += len(data) with _mp3_lock: _mp3_preroll.append(data) clients = list(_mp3_clients) for q in clients: try: q.put_nowait(data) except queue.Full: # Client is too slow, skip this chunk for them pass except Exception as e: print(f"WARNING: Transcoder reader error: {e}") _transcoder_last_error = str(e) break # Ensure process is killed if thread exits unexpectedly if proc.poll() is None: try: proc.terminate() except: pass print(f"[THREAD] Transcoder reader finished (PID: {proc.pid})") # Start greenlets/threads for THIS process specifically eventlet.spawn(_writer, _ffmpeg_proc) eventlet.spawn(_reader, _ffmpeg_proc) def _stop_transcoder(): global _ffmpeg_proc print("STOPPING: Transcoder process") # Signal threads to stop via the queue try: _ffmpeg_in_q.put_nowait(None) except: pass # Shutdown the process proc = _ffmpeg_proc _ffmpeg_proc = None if proc: try: proc.terminate() # Drain stdout/stderr to satisfy OS buffers proc.communicate(timeout=1.0) except: try: proc.kill() except: pass # Clear client state with _mp3_lock: clients = list(_mp3_clients) for q in clients: try: q.put_nowait(None) except: pass _mp3_clients.clear() _mp3_preroll.clear() def _feed_transcoder(data: bytes): global _last_audio_chunk_ts if _ffmpeg_proc is None or _ffmpeg_proc.poll() is not None: # If active but dead, restart it automatically if broadcast_state.get('active'): _start_transcoder_if_needed(is_mp3_input=broadcast_state.get('is_mp3_input', False)) else: return _last_audio_chunk_ts = time.time() try: _ffmpeg_in_q.put_nowait(data) except queue.Full: # Drop chunk if overflow to prevent memory bloat pass # Load settings to get MUSIC_FOLDER def _load_settings(): try: if os.path.exists('settings.json'): with open('settings.json', 'r', encoding='utf-8') as f: return json.load(f) except: pass return {} SETTINGS = _load_settings() # Config.json music_folder overrides settings.json if set _config_music = (CONFIG.get('music_folder') or '').strip() MUSIC_FOLDER = _config_music or SETTINGS.get('library', {}).get('music_folder', 'music') # Ensure music folder exists if not os.path.exists(MUSIC_FOLDER): try: os.makedirs(MUSIC_FOLDER) except: # Fallback to default if custom path fails MUSIC_FOLDER = "music" if not os.path.exists(MUSIC_FOLDER): os.makedirs(MUSIC_FOLDER) # Helper for shared routes def setup_shared_routes(app, index_file='index.html'): @app.route('/library.json') def get_library(): library = [] global MUSIC_FOLDER if os.path.exists(MUSIC_FOLDER): # Recursively find music files if desired, or stay top-level. # The prompt says "choose which folder", so maybe top-level of that folder is fine. for root, dirs, files in os.walk(MUSIC_FOLDER): for filename in sorted(files): if filename.lower().endswith(('.mp3', '.m4a', '.wav', '.flac', '.ogg')): rel_path = os.path.relpath(os.path.join(root, filename), MUSIC_FOLDER) library.append({ "title": os.path.splitext(filename)[0], "file": f"music_proxy/{rel_path}" }) break # Only top level for now to keep it simple, or remove break for recursive return jsonify(library) @app.route('/music_proxy/') def music_proxy(filename): return send_from_directory(MUSIC_FOLDER, filename) @app.route('/browse_directories', methods=['GET']) def browse_directories(): path = request.args.get('path', os.path.expanduser('~')) try: entries = [] if os.path.exists(path) and os.path.isdir(path): # Add parent parent = os.path.dirname(os.path.abspath(path)) entries.append({"name": "..", "path": parent, "isDir": True}) for item in sorted(os.listdir(path)): full_path = os.path.join(path, item) if os.path.isdir(full_path) and not item.startswith('.'): entries.append({ "name": item, "path": full_path, "isDir": True }) return jsonify({"success": True, "path": os.path.abspath(path), "entries": entries}) except Exception as e: return jsonify({"success": False, "error": str(e)}), 500 @app.route('/update_settings', methods=['POST']) def update_settings(): try: data = request.get_json() # Load existing settings = _load_settings() # Update selectively if 'library' not in settings: settings['library'] = {} if 'music_folder' in data.get('library', {}): new_folder = data['library']['music_folder'] if os.path.exists(new_folder) and os.path.isdir(new_folder): settings['library']['music_folder'] = new_folder global MUSIC_FOLDER MUSIC_FOLDER = new_folder else: return jsonify({"success": False, "error": "Invalid folder path"}), 400 with open('settings.json', 'w', encoding='utf-8') as f: json.dump(settings, f, indent=4) return jsonify({"success": True}) except Exception as e: return jsonify({"success": False, "error": str(e)}), 500 @app.route('/') def serve_static(filename): # Prevent path traversal if '..' in filename or filename.startswith('/'): abort(403) # Allow only known safe static file extensions allowed_extensions = ('.css', '.js', '.html', '.htm', '.png', '.jpg', '.jpeg', '.gif', '.svg', '.ico', '.woff', '.woff2', '.ttf', '.eot', '.map') if not filename.endswith(allowed_extensions): abort(403) response = send_from_directory('.', filename) if filename.endswith(('.css', '.js', '.html')): response.headers['Cache-Control'] = 'no-store, no-cache, must-revalidate, max-age=0' return response @app.route('/') def index(): response = send_from_directory('.', index_file) response.headers['Cache-Control'] = 'no-store, no-cache, must-revalidate, max-age=0' response.headers['Pragma'] = 'no-cache' response.headers['Expires'] = '0' return response @app.route('/upload', methods=['POST']) def upload_file(): if 'file' not in request.files: return jsonify({"success": False, "error": "No file provided"}), 400 file = request.files['file'] if file.filename == '': return jsonify({"success": False, "error": "No file selected"}), 400 allowed_exts = ('.mp3', '.m4a', '.wav', '.flac', '.ogg') ext = os.path.splitext(file.filename)[1].lower() if ext not in allowed_exts: return jsonify({"success": False, "error": f"Supported formats: {', '.join(allowed_exts)}"}), 400 # Sanitize filename (keep extension) name_without_ext = os.path.splitext(file.filename)[0] name_without_ext = re.sub(r'[^\w\s-]', '', name_without_ext) name_without_ext = re.sub(r'\s+', ' ', name_without_ext).strip() filename = f"{name_without_ext}{ext}" filepath = os.path.join(MUSIC_FOLDER, filename) try: file.save(filepath) print(f"UPLOADED: {filename}") return jsonify({"success": True, "filename": filename}) except Exception as e: print(f"ERROR: Upload error: {e}") return jsonify({"success": False, "error": str(e)}), 500 @app.route('/save_keymaps', methods=['POST']) def save_keymaps(): try: data = request.get_json() with open('keymaps.json', 'w', encoding='utf-8') as f: json.dump(data, f, indent=4) print("SAVED: Keymaps saved to keymaps.json") return jsonify({"success": True}) except Exception as e: print(f"ERROR: Save keymaps error: {e}") return jsonify({"success": False, "error": str(e)}), 500 @app.route('/load_keymaps', methods=['GET']) def load_keymaps(): try: if os.path.exists('keymaps.json'): with open('keymaps.json', 'r', encoding='utf-8') as f: data = json.load(f) return jsonify({"success": True, "keymaps": data}) else: return jsonify({"success": True, "keymaps": None}) except Exception as e: print(f"ERROR: Load keymaps error: {e}") return jsonify({"success": False, "error": str(e)}), 500 @app.route('/stream.mp3') def stream_mp3(): # Streaming response from the ffmpeg transcoder output. # If ffmpeg isn't available, return 503. if _ffmpeg_proc is None or _ffmpeg_proc.poll() is not None: return jsonify({"success": False, "error": "MP3 stream not available"}), 503 print(f"LISTENER: New listener joined stream (Bursting {_mp3_preroll.maxlen} frames)") client_q: queue.Queue = queue.Queue(maxsize=500) with _mp3_lock: # Burst pre-roll to new client so they start playing instantly for chunk in _mp3_preroll: try: client_q.put_nowait(chunk) except Exception: break _mp3_clients.add(client_q) def gen(): try: while True: try: chunk = client_q.get(timeout=30) except queue.Empty: # No data for 30s - check if broadcast is still active if not broadcast_state.get('active'): break continue if chunk is None: break yield chunk finally: with _mp3_lock: _mp3_clients.discard(client_q) print(f"LISTENER: Listener disconnected from stream") return Response(stream_with_context(gen()), content_type='audio/mpeg', headers={ 'Cache-Control': 'no-cache, no-store, must-revalidate', 'Connection': 'keep-alive', 'Pragma': 'no-cache', 'Expires': '0', 'X-Content-Type-Options': 'nosniff' }) @app.route('/stream_debug') def stream_debug(): proc = _ffmpeg_proc running = proc is not None and proc.poll() is None return jsonify({ 'broadcast_active': broadcast_state.get('active', False), 'broadcast_mimeType': broadcast_state.get('mimeType'), 'ffmpeg_running': running, 'ffmpeg_found': (proc is not None), 'mp3_clients': len(_mp3_clients), 'transcoder_bytes_out': _transcoder_bytes_out, 'transcoder_last_error': _transcoder_last_error, 'last_audio_chunk_ts': _last_audio_chunk_ts, }) # === DJ SERVER === dj_app = Flask(__name__, static_folder='.', static_url_path='') dj_app.config['SECRET_KEY'] = CONFIG_SECRET dj_app.config['MAX_CONTENT_LENGTH'] = CONFIG_MAX_UPLOAD_MB * 1024 * 1024 setup_shared_routes(dj_app) @dj_app.before_request def _protect_dj_panel(): """Optionally require a password for the DJ panel only (port 5000). This does not affect the listener server (port 5001). """ if not DJ_AUTH_ENABLED: return None # Allow login/logout endpoints if request.path in ('/login', '/logout'): return None # If already authenticated, allow if session.get('dj_authed') is True: return None # Redirect everything else to login return ( "" "Redirecting to /login...", 302, {'Location': '/login'} ) @dj_app.route('/login', methods=['GET', 'POST']) def dj_login(): if not DJ_AUTH_ENABLED: # If auth is disabled, just go to the panel. session['dj_authed'] = True return ( "" "Auth disabled. Redirecting...", 302, {'Location': '/'} ) error = None if request.method == 'POST': pw = (request.form.get('password') or '').strip() if pw == DJ_PANEL_PASSWORD: session['dj_authed'] = True return ( "" "Logged in. Redirecting...", 302, {'Location': '/'} ) error = 'Invalid password' # Minimal inline login page (no new assets) return f""" TechDJ - DJ Login

DJ Panel Locked

{f"
{error}
" if error else ""}
Set/disable this in config.json (dj_panel_password).
""" @dj_app.route('/logout') def dj_logout(): session.pop('dj_authed', None) return ( "" "Logged out. Redirecting...", 302, {'Location': '/login'} ) dj_socketio = SocketIO( dj_app, cors_allowed_origins=CONFIG_CORS, async_mode='eventlet', max_http_buffer_size=CONFIG_MAX_UPLOAD_MB * 1024 * 1024, ping_timeout=60, ping_interval=25, logger=CONFIG_DEBUG, engineio_logger=CONFIG_DEBUG ) @dj_socketio.on('connect') def dj_connect(): if DJ_AUTH_ENABLED and session.get('dj_authed') is not True: print(f"REJECTED: DJ socket rejected (unauthorized): {request.sid}") return False print(f"STREAMPANEL: DJ connected: {request.sid}") dj_sids.add(request.sid) @dj_socketio.on('disconnect') def dj_disconnect(): dj_sids.discard(request.sid) print("WARNING: DJ disconnected - broadcast will continue until manually stopped") def stop_broadcast_after_timeout(): """No longer used - broadcasts don't auto-stop""" pass @dj_socketio.on('start_broadcast') def dj_start(data=None): was_already_active = broadcast_state.get('active', False) broadcast_state['active'] = True session['is_dj'] = True print("BROADCAST: ACTIVE") is_mp3_input = False if data: if 'bitrate' in data: global _current_bitrate _current_bitrate = data['bitrate'] print(f"BITRATE: Setting stream bitrate to: {_current_bitrate}") if data.get('format') == 'mp3': is_mp3_input = True broadcast_state['is_mp3_input'] = is_mp3_input if not was_already_active: # Fresh broadcast start - clear pre-roll and start transcoder cleanly with _mp3_lock: _mp3_preroll.clear() _start_transcoder_if_needed(is_mp3_input=is_mp3_input) # Tell listeners a new broadcast has begun (triggers audio player reload) listener_socketio.emit('broadcast_started', namespace='/') else: # DJ reconnected mid-broadcast - just ensure transcoder is alive # Do NOT clear pre-roll or trigger listener reload print("BROADCAST: DJ reconnected - resuming existing broadcast") _start_transcoder_if_needed(is_mp3_input=is_mp3_input) # Always send current status so any waiting listeners get unblocked listener_socketio.emit('stream_status', {'active': True}, namespace='/') @dj_socketio.on('get_listener_count') def dj_get_listener_count(): emit('listener_count', {'count': len(listener_sids)}) @dj_socketio.on('stop_broadcast') def dj_stop(): broadcast_state['active'] = False session['is_dj'] = False print("STOPPED: DJ stopped broadcasting") _stop_transcoder() listener_socketio.emit('broadcast_stopped', namespace='/') listener_socketio.emit('stream_status', {'active': False}, namespace='/') @dj_socketio.on('audio_chunk') def dj_audio(data): # MP3-only mode: do not relay raw chunks to listeners; feed transcoder only. if broadcast_state['active']: # Ensure MP3 fallback transcoder is running (if ffmpeg is installed) if _ffmpeg_proc is None or _ffmpeg_proc.poll() is not None: # If we don't know the format, default to transcode, # but usually start_broadcast handles this _start_transcoder_if_needed() if isinstance(data, (bytes, bytearray)): _feed_transcoder(bytes(data)) # === LISTENER SERVER === # static_folder=None prevents Flask's built-in static handler from serving # DJ files (like index.html) at / — all static files go through our # custom serve_static route which has security checks. listener_app = Flask(__name__, static_folder=None) listener_app.config['SECRET_KEY'] = CONFIG_SECRET + '_listener' listener_app.config['MAX_CONTENT_LENGTH'] = CONFIG_MAX_UPLOAD_MB * 1024 * 1024 setup_shared_routes(listener_app, index_file='listener.html') # Block write/admin endpoints AND DJ-only files on the listener server @listener_app.before_request def _restrict_listener_routes(): """Prevent listeners from accessing DJ-only endpoints and files.""" blocked_paths = ('/update_settings', '/upload', '/save_keymaps', '/browse_directories') if request.path in blocked_paths: abort(403) # Block DJ-only files — prevents serving the DJ panel even via direct URL dj_only_files = ('/index.html', '/script.js', '/style.css') if request.path in dj_only_files: abort(403) listener_socketio = SocketIO( listener_app, cors_allowed_origins=CONFIG_CORS, async_mode='eventlet', max_http_buffer_size=CONFIG_MAX_UPLOAD_MB * 1024 * 1024, ping_timeout=60, ping_interval=25, logger=CONFIG_DEBUG, engineio_logger=CONFIG_DEBUG ) @listener_socketio.on('connect') def listener_connect(): print(f"LISTENER: Listener Socket Connected: {request.sid}") @listener_socketio.on('disconnect') def listener_disconnect(): listener_sids.discard(request.sid) count = len(listener_sids) print(f"REMOVED: Listener left. Total: {count}") # Notify BOTH namespaces listener_socketio.emit('listener_count', {'count': count}, namespace='/') dj_socketio.emit('listener_count', {'count': count}, namespace='/') @listener_socketio.on('join_listener') def listener_join(): if request.sid not in listener_sids: listener_sids.add(request.sid) count = len(listener_sids) print(f"LISTENER: New listener joined. Total: {count}") listener_socketio.emit('listener_count', {'count': count}, namespace='/') dj_socketio.emit('listener_count', {'count': count}, namespace='/') emit('stream_status', {'active': broadcast_state['active']}) @listener_socketio.on('get_listener_count') def listener_get_count(): emit('listener_count', {'count': len(listener_sids)}) # DJ Panel Routes (No engine commands needed in local mode) def _transcoder_watchdog(): """Periodic check to ensure the transcoder stays alive during active broadcasts.""" while True: if broadcast_state.get('active'): if _ffmpeg_proc is None or _ffmpeg_proc.poll() is not None: # Only log if it's actually dead and supposed to be alive print("WARNING: Watchdog: Transcoder dead during active broadcast, reviving...") _start_transcoder_if_needed(is_mp3_input=broadcast_state.get('is_mp3_input', False)) eventlet.sleep(5) def _listener_count_sync_loop(): """Periodic background sync to ensure listener count is always accurate.""" while True: count = len(listener_sids) listener_socketio.emit('listener_count', {'count': count}, namespace='/') dj_socketio.emit('listener_count', {'count': count}, namespace='/') eventlet.sleep(5) if __name__ == '__main__': print("=" * 50) print("TECHDJ PRO - DUAL PORT ARCHITECTURE") print("=" * 50) print(f"HOST: {CONFIG_HOST}") print(f"URL: DJ PANEL -> http://{CONFIG_HOST}:{CONFIG_DJ_PORT}") print(f"URL: LISTENER -> http://{CONFIG_HOST}:{CONFIG_LISTENER_PORT}") if DJ_AUTH_ENABLED: print("AUTH: DJ panel password ENABLED") else: print("AUTH: DJ panel password DISABLED") print(f"DEBUG: {CONFIG_DEBUG}") print("=" * 50) print(f"READY: Server ready on {CONFIG_HOST}:{CONFIG_DJ_PORT} & {CONFIG_HOST}:{CONFIG_LISTENER_PORT}") # Run both servers using eventlet's spawn eventlet.spawn(_listener_count_sync_loop) eventlet.spawn(_transcoder_watchdog) eventlet.spawn(dj_socketio.run, dj_app, host=CONFIG_HOST, port=CONFIG_DJ_PORT, debug=False) listener_socketio.run(listener_app, host=CONFIG_HOST, port=CONFIG_LISTENER_PORT, debug=False)