forked from computertech/techdj
Add MP3 fallback stream when Opus unsupported
This commit is contained in:
164
server.py
164
server.py
@@ -3,7 +3,10 @@ import eventlet
|
||||
eventlet.monkey_patch()
|
||||
|
||||
import os
|
||||
from flask import Flask, send_from_directory, jsonify, request, session
|
||||
import subprocess
|
||||
import threading
|
||||
import queue
|
||||
from flask import Flask, send_from_directory, jsonify, request, session, Response, stream_with_context
|
||||
from flask_socketio import SocketIO, emit
|
||||
from dotenv import load_dotenv
|
||||
# Load environment variables from .env file
|
||||
@@ -12,10 +15,118 @@ import downloader
|
||||
|
||||
# Relay State
|
||||
broadcast_state = {
|
||||
'active': False
|
||||
'active': False,
|
||||
'mimeType': None,
|
||||
}
|
||||
listener_sids = set()
|
||||
dj_sids = set()
|
||||
|
||||
# === Optional MP3 fallback stream (server-side transcoding) ===
|
||||
# This allows listeners on browsers that don't support WebM/Opus via MediaSource
|
||||
# (notably some Safari / locked-down environments) to still hear the stream.
|
||||
_ffmpeg_proc = None
|
||||
_ffmpeg_in_q = queue.Queue(maxsize=200)
|
||||
_mp3_clients = set() # set[queue.Queue]
|
||||
_mp3_lock = threading.Lock()
|
||||
_transcode_threads_started = False
|
||||
|
||||
|
||||
def _start_transcoder_if_needed():
|
||||
global _ffmpeg_proc, _transcode_threads_started
|
||||
|
||||
if _ffmpeg_proc is not None and _ffmpeg_proc.poll() is None:
|
||||
return
|
||||
|
||||
cmd = [
|
||||
'ffmpeg',
|
||||
'-hide_banner',
|
||||
'-loglevel', 'error',
|
||||
'-i', 'pipe:0',
|
||||
'-vn',
|
||||
'-acodec', 'libmp3lame',
|
||||
'-b:a', '192k',
|
||||
'-f', 'mp3',
|
||||
'pipe:1',
|
||||
]
|
||||
|
||||
try:
|
||||
_ffmpeg_proc = subprocess.Popen(
|
||||
cmd,
|
||||
stdin=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
bufsize=0,
|
||||
)
|
||||
except FileNotFoundError:
|
||||
_ffmpeg_proc = None
|
||||
print('⚠️ ffmpeg not found; /stream.mp3 fallback disabled')
|
||||
return
|
||||
|
||||
def _writer():
|
||||
while True:
|
||||
chunk = _ffmpeg_in_q.get()
|
||||
if chunk is None:
|
||||
break
|
||||
proc = _ffmpeg_proc
|
||||
if proc is None or proc.stdin is None:
|
||||
continue
|
||||
try:
|
||||
proc.stdin.write(chunk)
|
||||
except Exception:
|
||||
# If ffmpeg dies or pipe breaks, just stop writing.
|
||||
break
|
||||
|
||||
def _reader():
|
||||
proc = _ffmpeg_proc
|
||||
if proc is None or proc.stdout is None:
|
||||
return
|
||||
while True:
|
||||
try:
|
||||
data = proc.stdout.read(4096)
|
||||
except Exception:
|
||||
break
|
||||
if not data:
|
||||
break
|
||||
with _mp3_lock:
|
||||
clients = list(_mp3_clients)
|
||||
for q in clients:
|
||||
try:
|
||||
q.put_nowait(data)
|
||||
except Exception:
|
||||
# Drop if client queue is full or gone.
|
||||
pass
|
||||
|
||||
if not _transcode_threads_started:
|
||||
threading.Thread(target=_writer, daemon=True).start()
|
||||
threading.Thread(target=_reader, daemon=True).start()
|
||||
_transcode_threads_started = True
|
||||
|
||||
|
||||
def _stop_transcoder():
|
||||
global _ffmpeg_proc
|
||||
try:
|
||||
_ffmpeg_in_q.put_nowait(None)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
proc = _ffmpeg_proc
|
||||
_ffmpeg_proc = None
|
||||
if proc is None:
|
||||
return
|
||||
try:
|
||||
proc.terminate()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def _feed_transcoder(data: bytes):
|
||||
if _ffmpeg_proc is None or _ffmpeg_proc.poll() is not None:
|
||||
return
|
||||
try:
|
||||
_ffmpeg_in_q.put_nowait(data)
|
||||
except Exception:
|
||||
# Queue full; drop to keep latency bounded.
|
||||
pass
|
||||
MUSIC_FOLDER = "music"
|
||||
# Ensure music folder exists
|
||||
if not os.path.exists(MUSIC_FOLDER):
|
||||
@@ -138,6 +249,37 @@ def setup_shared_routes(app):
|
||||
print(f"❌ Upload error: {e}")
|
||||
return jsonify({"success": False, "error": str(e)}), 500
|
||||
|
||||
@app.route('/stream.mp3')
|
||||
def stream_mp3():
|
||||
# Streaming response from the ffmpeg transcoder output.
|
||||
# If ffmpeg isn't available, return 503.
|
||||
if _ffmpeg_proc is None or _ffmpeg_proc.poll() is not None:
|
||||
return jsonify({"success": False, "error": "MP3 stream not available"}), 503
|
||||
|
||||
client_q: queue.Queue = queue.Queue(maxsize=200)
|
||||
with _mp3_lock:
|
||||
_mp3_clients.add(client_q)
|
||||
|
||||
def gen():
|
||||
try:
|
||||
while True:
|
||||
chunk = client_q.get()
|
||||
if chunk is None:
|
||||
break
|
||||
yield chunk
|
||||
finally:
|
||||
with _mp3_lock:
|
||||
_mp3_clients.discard(client_q)
|
||||
|
||||
return Response(
|
||||
stream_with_context(gen()),
|
||||
mimetype='audio/mpeg',
|
||||
headers={
|
||||
'Cache-Control': 'no-store, no-cache, must-revalidate, max-age=0',
|
||||
'Connection': 'keep-alive',
|
||||
},
|
||||
)
|
||||
|
||||
# === DJ SERVER (Port 5000) ===
|
||||
dj_app = Flask(__name__, static_folder='.', static_url_path='')
|
||||
dj_app.config['SECRET_KEY'] = 'dj_panel_secret'
|
||||
@@ -168,19 +310,31 @@ def stop_broadcast_after_timeout():
|
||||
pass
|
||||
|
||||
@dj_socketio.on('start_broadcast')
|
||||
def dj_start():
|
||||
def dj_start(data=None):
|
||||
mime_type = None
|
||||
if isinstance(data, dict):
|
||||
mime_type = data.get('mimeType') or None
|
||||
|
||||
broadcast_state['active'] = True
|
||||
broadcast_state['mimeType'] = mime_type
|
||||
session['is_dj'] = True
|
||||
print("🎙️ Broadcast -> ACTIVE")
|
||||
|
||||
_start_transcoder_if_needed()
|
||||
|
||||
listener_socketio.emit('broadcast_started', namespace='/')
|
||||
listener_socketio.emit('stream_status', {'active': True}, namespace='/')
|
||||
if mime_type:
|
||||
listener_socketio.emit('stream_mime', {'mimeType': mime_type}, namespace='/')
|
||||
|
||||
@dj_socketio.on('stop_broadcast')
|
||||
def dj_stop():
|
||||
broadcast_state['active'] = False
|
||||
broadcast_state['mimeType'] = None
|
||||
session['is_dj'] = False
|
||||
print("🛑 DJ stopped broadcasting")
|
||||
|
||||
_stop_transcoder()
|
||||
|
||||
listener_socketio.emit('broadcast_stopped', namespace='/')
|
||||
listener_socketio.emit('stream_status', {'active': False}, namespace='/')
|
||||
@@ -190,6 +344,8 @@ def dj_audio(data):
|
||||
# Relay audio chunk to all listeners immediately
|
||||
if broadcast_state['active']:
|
||||
listener_socketio.emit('audio_data', data, namespace='/')
|
||||
if isinstance(data, (bytes, bytearray)):
|
||||
_feed_transcoder(bytes(data))
|
||||
|
||||
# === LISTENER SERVER (Port 5001) ===
|
||||
listener_app = Flask(__name__, static_folder='.', static_url_path='')
|
||||
@@ -229,6 +385,8 @@ def listener_join():
|
||||
dj_socketio.emit('listener_count', {'count': count}, namespace='/')
|
||||
|
||||
emit('stream_status', {'active': broadcast_state['active']})
|
||||
if broadcast_state.get('mimeType'):
|
||||
emit('stream_mime', {'mimeType': broadcast_state['mimeType']})
|
||||
|
||||
@listener_socketio.on('get_listener_count')
|
||||
def listener_get_count():
|
||||
|
||||
Reference in New Issue
Block a user