Migrate eventlet→gevent: fix WebSocket upgrade failure and EBADF errors
- eventlet worker deprecated in gunicorn 25+; WebSocket hijack broken, causing all socket.io connections to stay on long-polling forever and produce 'Bad file descriptor' errors on every poll cycle. - Replace eventlet monkey_patch / spawn / sleep with gevent equivalents - async_mode='gevent' in both SocketIO instances - worker_class='gevent' in gunicorn.conf.py - Add gevent-websocket to requirements (needed for WS upgrade in gunicorn)
This commit is contained in:
parent
69c078071d
commit
f4ca6a5ad1
|
|
@ -21,7 +21,7 @@ def _load_cfg():
|
||||||
_cfg = _load_cfg()
|
_cfg = _load_cfg()
|
||||||
|
|
||||||
# ── Gunicorn settings ─────────────────────────────────────────────────────────
|
# ── Gunicorn settings ─────────────────────────────────────────────────────────
|
||||||
worker_class = 'eventlet'
|
worker_class = 'gevent'
|
||||||
workers = 1 # Must be 1 — eventlet handles concurrency via greenlets
|
workers = 1 # Must be 1 — eventlet handles concurrency via greenlets
|
||||||
# and both apps share in-process state.
|
# and both apps share in-process state.
|
||||||
bind = f"{_cfg.get('host', '0.0.0.0')}:{_cfg.get('dj_port', 5000)}"
|
bind = f"{_cfg.get('host', '0.0.0.0')}:{_cfg.get('dj_port', 5000)}"
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,8 @@
|
||||||
# TechDJ Requirements - Web Server
|
# TechDJ Requirements - Web Server
|
||||||
flask
|
flask
|
||||||
flask-socketio
|
flask-socketio
|
||||||
eventlet
|
gevent
|
||||||
|
gevent-websocket
|
||||||
gunicorn
|
gunicorn
|
||||||
python-dotenv
|
python-dotenv
|
||||||
|
|
||||||
|
|
|
||||||
40
server.py
40
server.py
|
|
@ -1,6 +1,7 @@
|
||||||
# Monkey patch MUST be first - before any other imports!
|
# Monkey patch MUST be first - before any other imports!
|
||||||
import eventlet
|
from gevent import monkey
|
||||||
eventlet.monkey_patch()
|
monkey.patch_all()
|
||||||
|
import gevent
|
||||||
|
|
||||||
import os
|
import os
|
||||||
import json
|
import json
|
||||||
|
|
@ -191,8 +192,8 @@ def _start_transcoder_if_needed(is_mp3_input=False):
|
||||||
except: break
|
except: break
|
||||||
|
|
||||||
# Define greenlets INSIDE so they close over THIS specific 'proc'.
|
# Define greenlets INSIDE so they close over THIS specific 'proc'.
|
||||||
# Blocking subprocess pipe I/O is delegated to eventlet.tpool so it runs
|
# Blocking subprocess pipe I/O runs in a real OS thread via gevent's
|
||||||
# in a real OS thread, preventing it from stalling the eventlet hub.
|
# threadpool, preventing it from stalling the gevent hub.
|
||||||
|
|
||||||
def _stderr_drain(proc):
|
def _stderr_drain(proc):
|
||||||
"""Drain ffmpeg's stderr pipe so it never fills the OS buffer (64 KB on
|
"""Drain ffmpeg's stderr pipe so it never fills the OS buffer (64 KB on
|
||||||
|
|
@ -205,8 +206,7 @@ def _start_transcoder_if_needed(is_mp3_input=False):
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
eventlet.spawn(_stderr_drain, _ffmpeg_proc)
|
gevent.spawn(_stderr_drain, _ffmpeg_proc)
|
||||||
|
|
||||||
def _writer(proc):
|
def _writer(proc):
|
||||||
global _transcoder_last_error
|
global _transcoder_last_error
|
||||||
print(f"[THREAD] Transcoder writer started (PID: {proc.pid})")
|
print(f"[THREAD] Transcoder writer started (PID: {proc.pid})")
|
||||||
|
|
@ -265,8 +265,8 @@ def _start_transcoder_if_needed(is_mp3_input=False):
|
||||||
print(f"[THREAD] Transcoder reader finished (PID: {proc.pid})")
|
print(f"[THREAD] Transcoder reader finished (PID: {proc.pid})")
|
||||||
|
|
||||||
# Spawn as greenlets — tpool handles the blocking subprocess I/O internally
|
# Spawn as greenlets — tpool handles the blocking subprocess I/O internally
|
||||||
eventlet.spawn(_writer, _ffmpeg_proc)
|
gevent.spawn(_writer, _ffmpeg_proc)
|
||||||
eventlet.spawn(_reader, _ffmpeg_proc)
|
gevent.spawn(_reader, _ffmpeg_proc)
|
||||||
|
|
||||||
|
|
||||||
def _stop_transcoder():
|
def _stop_transcoder():
|
||||||
|
|
@ -381,7 +381,7 @@ def _start_srt_transcoder():
|
||||||
except: pass
|
except: pass
|
||||||
print(f'[THREAD] SRT reader finished (PID: {proc.pid})')
|
print(f'[THREAD] SRT reader finished (PID: {proc.pid})')
|
||||||
|
|
||||||
eventlet.spawn(_srt_reader, _srt_ffmpeg_proc)
|
gevent.spawn(_srt_reader, _srt_ffmpeg_proc)
|
||||||
|
|
||||||
|
|
||||||
def _stop_srt_transcoder():
|
def _stop_srt_transcoder():
|
||||||
|
|
@ -651,7 +651,7 @@ def setup_shared_routes(app, index_file='index.html'):
|
||||||
|
|
||||||
waited = 0.0
|
waited = 0.0
|
||||||
while not _active_transcoder() and waited < 5.0:
|
while not _active_transcoder() and waited < 5.0:
|
||||||
eventlet.sleep(0.5)
|
gevent.sleep(0.5)
|
||||||
waited += 0.5
|
waited += 0.5
|
||||||
|
|
||||||
if not _active_transcoder():
|
if not _active_transcoder():
|
||||||
|
|
@ -833,7 +833,7 @@ def client_config():
|
||||||
dj_socketio = SocketIO(
|
dj_socketio = SocketIO(
|
||||||
dj_app,
|
dj_app,
|
||||||
cors_allowed_origins=CONFIG_CORS,
|
cors_allowed_origins=CONFIG_CORS,
|
||||||
async_mode='eventlet',
|
async_mode='gevent',
|
||||||
max_http_buffer_size=CONFIG_MAX_UPLOAD_MB * 1024 * 1024,
|
max_http_buffer_size=CONFIG_MAX_UPLOAD_MB * 1024 * 1024,
|
||||||
ping_timeout=60,
|
ping_timeout=60,
|
||||||
ping_interval=25,
|
ping_interval=25,
|
||||||
|
|
@ -853,7 +853,7 @@ def _dj_disconnect_grace():
|
||||||
"""Auto-stop broadcast if no DJ reconnects within the grace period."""
|
"""Auto-stop broadcast if no DJ reconnects within the grace period."""
|
||||||
global _dj_grace_greenlet, _mp3_broadcast_announced
|
global _dj_grace_greenlet, _mp3_broadcast_announced
|
||||||
print(f"INFO: Grace period started — {DJ_GRACE_PERIOD_SECS}s for DJ to reconnect")
|
print(f"INFO: Grace period started — {DJ_GRACE_PERIOD_SECS}s for DJ to reconnect")
|
||||||
eventlet.sleep(DJ_GRACE_PERIOD_SECS)
|
gevent.sleep(DJ_GRACE_PERIOD_SECS)
|
||||||
if broadcast_state.get('active') and not dj_sids:
|
if broadcast_state.get('active') and not dj_sids:
|
||||||
print("WARNING: Grace period expired, no DJ reconnected — auto-stopping broadcast")
|
print("WARNING: Grace period expired, no DJ reconnected — auto-stopping broadcast")
|
||||||
broadcast_state['active'] = False
|
broadcast_state['active'] = False
|
||||||
|
|
@ -874,7 +874,7 @@ def dj_disconnect():
|
||||||
# If broadcast is active and no other DJs remain, start the grace period
|
# If broadcast is active and no other DJs remain, start the grace period
|
||||||
if broadcast_state.get('active') and not dj_sids:
|
if broadcast_state.get('active') and not dj_sids:
|
||||||
if _dj_grace_greenlet is None:
|
if _dj_grace_greenlet is None:
|
||||||
_dj_grace_greenlet = eventlet.spawn(_dj_disconnect_grace)
|
_dj_grace_greenlet = gevent.spawn(_dj_disconnect_grace)
|
||||||
elif not broadcast_state.get('active'):
|
elif not broadcast_state.get('active'):
|
||||||
# Nothing to do — no active broadcast
|
# Nothing to do — no active broadcast
|
||||||
pass
|
pass
|
||||||
|
|
@ -1049,7 +1049,7 @@ def _restrict_listener_routes():
|
||||||
listener_socketio = SocketIO(
|
listener_socketio = SocketIO(
|
||||||
listener_app,
|
listener_app,
|
||||||
cors_allowed_origins=CONFIG_CORS,
|
cors_allowed_origins=CONFIG_CORS,
|
||||||
async_mode='eventlet',
|
async_mode='gevent',
|
||||||
max_http_buffer_size=CONFIG_MAX_UPLOAD_MB * 1024 * 1024,
|
max_http_buffer_size=CONFIG_MAX_UPLOAD_MB * 1024 * 1024,
|
||||||
# Lower timeouts: stale connections detected in ~25s instead of ~85s
|
# Lower timeouts: stale connections detected in ~25s instead of ~85s
|
||||||
# ping_interval: how often to probe (seconds)
|
# ping_interval: how often to probe (seconds)
|
||||||
|
|
@ -1247,14 +1247,14 @@ def _transcoder_watchdog():
|
||||||
if _ffmpeg_proc is None or _ffmpeg_proc.poll() is not None:
|
if _ffmpeg_proc is None or _ffmpeg_proc.poll() is not None:
|
||||||
print('WARNING: Watchdog: Transcoder dead during active broadcast, reviving...')
|
print('WARNING: Watchdog: Transcoder dead during active broadcast, reviving...')
|
||||||
_start_transcoder_if_needed(is_mp3_input=False)
|
_start_transcoder_if_needed(is_mp3_input=False)
|
||||||
eventlet.sleep(5)
|
gevent.sleep(5)
|
||||||
|
|
||||||
|
|
||||||
def _listener_count_sync_loop():
|
def _listener_count_sync_loop():
|
||||||
"""Periodic reconciliation — catches any edge cases where connect/disconnect
|
"""Periodic reconciliation — catches any edge cases where connect/disconnect
|
||||||
events were missed (e.g. server under load, eventlet greenlet delays)."""
|
events were missed (e.g. server under load, gevent greenlet delays)."""
|
||||||
while True:
|
while True:
|
||||||
eventlet.sleep(5)
|
gevent.sleep(5)
|
||||||
_broadcast_listener_count()
|
_broadcast_listener_count()
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -1274,9 +1274,9 @@ def _start_background_tasks():
|
||||||
return
|
return
|
||||||
_background_tasks_started = True
|
_background_tasks_started = True
|
||||||
|
|
||||||
eventlet.spawn(_listener_count_sync_loop)
|
gevent.spawn(_listener_count_sync_loop)
|
||||||
eventlet.spawn(_transcoder_watchdog)
|
gevent.spawn(_transcoder_watchdog)
|
||||||
eventlet.spawn(
|
gevent.spawn(
|
||||||
listener_socketio.run,
|
listener_socketio.run,
|
||||||
listener_app,
|
listener_app,
|
||||||
host=CONFIG_HOST,
|
host=CONFIG_HOST,
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue