bastebin/production.py

306 lines
9.7 KiB
Python

#!/usr/bin/env python3
"""
production.py — Manage the Bastebin Gunicorn process.
Usage:
python production.py start [--host HOST] [--port PORT] [--workers N]
python production.py stop
python production.py restart [--host HOST] [--port PORT] [--workers N]
python production.py status
"""
import argparse
import json
import multiprocessing
import os
import signal
import subprocess
import sys
import time
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
PID_FILE = os.path.join(BASE_DIR, 'gunicorn.pid')
LOG_FILE = os.path.join(BASE_DIR, 'gunicorn.log')
CONF_FILE = os.path.join(BASE_DIR, 'gunicorn.conf.py')
VENV_BIN = os.path.join(BASE_DIR, '.venv', 'bin')
def _gunicorn_bin() -> str:
candidate = os.path.join(VENV_BIN, 'gunicorn')
if os.path.isfile(candidate):
return candidate
import shutil
found = shutil.which('gunicorn')
if not found:
sys.exit('gunicorn not found — run python setup.py first.')
return found
def _read_pid() -> int | None:
try:
with open(PID_FILE) as f:
return int(f.read().strip())
except (FileNotFoundError, ValueError):
return None
def _process_alive(pid: int) -> bool:
try:
os.kill(pid, 0)
return True
except (ProcessLookupError, PermissionError):
return False
def _find_all_pids() -> list[int]:
"""Search 'ps' for all Gunicorn/Bastebin processes associated with this directory."""
pids = []
try:
# Use 'ps' to find processes related to this Gunicorn instance
out = subprocess.check_output(['ps', 'wax', '-o', 'pid,command'], text=True)
for line in out.splitlines():
# Match gunicorn running from this directory or with this config
if 'gunicorn' in line and (BASE_DIR in line or CONF_FILE in line):
if 'production.py' in line: continue
parts = line.strip().split()
if parts:
try:
pids.append(int(parts[0]))
except ValueError:
continue
except (subprocess.SubprocessError, FileNotFoundError):
pass
return sorted(list(set(pids)))
def _is_port_in_use(port: int) -> bool:
"""Check if the given port is already occupied."""
import socket
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(0.5)
return s.connect_ex(('127.0.0.1', port)) == 0
def cmd_status() -> None:
pid_file_val = _read_pid()
all_pids = _find_all_pids()
if pid_file_val and _process_alive(pid_file_val):
print(f'Running (Master PID {pid_file_val})')
if len(all_pids) > 1:
print(f'Workers ({len(all_pids) - 1} processes)')
elif all_pids:
print(f'Warning: Found {len(all_pids)} orphaned processes not tracked in {PID_FILE}:')
print(f' PIDs: {", ".join(map(str, all_pids))}')
print(' Action: Use "python production.py kill" to clean these up.')
else:
if pid_file_val:
try: os.remove(PID_FILE)
except OSError: pass
print('Stopped')
def cmd_start(host: str, port: int, workers: int) -> None:
pid = _read_pid()
all_pids = _find_all_pids()
if (pid and _process_alive(pid)) or all_pids:
print(f'Already running (Master PID {pid or all_pids[0]}). Use "restart" to reload.')
return
if _is_port_in_use(port):
sys.exit(f'Error: Port {port} is already in use by another program (possibly an old zombie instance).')
gunicorn = _gunicorn_bin()
log = open(LOG_FILE, 'a')
try:
proc = subprocess.Popen(
[
gunicorn,
'--config', CONF_FILE,
'--bind', f'{host}:{port}',
'--workers', str(workers),
'--pid', PID_FILE,
'wsgi:app',
],
stdout=log,
stderr=log,
cwd=BASE_DIR,
start_new_session=True,
)
except Exception:
log.close()
raise
# Wait briefly to confirm the process stayed alive.
time.sleep(1.5)
log.close() # Gunicorn has inherited the fd; safe to close our end.
if proc.poll() is not None:
sys.exit(f'Gunicorn exited immediately. Check {LOG_FILE} for details.')
# Gunicorn writes its own PID file; confirm it appeared.
pid = _read_pid()
print(f'Started (PID {pid or proc.pid}) → http://{host}:{port}')
print(f'Logs: {LOG_FILE}')
def cmd_stop(graceful: bool = True, force: bool = False) -> bool:
pid = _read_pid()
all_pids = _find_all_pids()
if not pid or not _process_alive(pid):
if all_pids:
if force:
print(f"Cleaning up {len(all_pids)} orphaned processes...")
return cmd_kill()
print(f'Error: PID file is missing, but {len(all_pids)} orphaned processes were found.')
print('Use "python production.py kill" to force stop.')
else:
print('Not running.')
if pid:
try: os.remove(PID_FILE)
except OSError: pass
return False
# Standard shutdown
sig = signal.SIGTERM if graceful else signal.SIGKILL
os.kill(pid, sig)
# Wait up to 10 s for a clean shutdown.
print(f'Stopping PID {pid}...', end='', flush=True)
for _ in range(20):
time.sleep(0.5)
print('.', end='', flush=True)
if not _process_alive(pid):
break
print(' Done.')
if _process_alive(pid):
# Force-kill if still alive after graceful period.
os.kill(pid, signal.SIGKILL)
time.sleep(0.5)
if os.path.exists(PID_FILE):
try: os.remove(PID_FILE)
except OSError: pass
# Double check for ANY remaining processes in this directory
remaining = _find_all_pids()
if remaining and force:
for p in remaining:
try: os.kill(p, signal.SIGKILL)
except OSError: pass
return True
def cmd_kill() -> bool:
"""Nuke all Bastebin-related processes."""
pids = _find_all_pids()
if not pids:
print("Nothing to kill.")
if os.path.exists(PID_FILE):
os.remove(PID_FILE)
return False
print(f"Killing {len(pids)} processes...")
for pid in pids:
try:
print(f" - {pid}")
os.kill(pid, signal.SIGKILL)
except OSError:
pass
if os.path.exists(PID_FILE):
os.remove(PID_FILE)
print("Clean slate achieved.")
return True
def cmd_restart(host: str, port: int, workers: int) -> None:
pid = _read_pid()
if pid and _process_alive(pid):
# Send SIGHUP — Gunicorn performs a graceful reload without downtime.
os.kill(pid, signal.SIGHUP)
print(f'Reloaded (PID {pid})')
else:
print('Not running — starting fresh.')
cmd_start(host, port, workers)
def _build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
prog='production.py',
description='Manage the Bastebin Gunicorn process.',
)
sub = parser.add_subparsers(dest='command', metavar='COMMAND')
sub.required = True
def _add_server_args(p: argparse.ArgumentParser) -> None:
p.add_argument('--host', default=None, metavar='HOST',
help='Bind address (default: 0.0.0.0)')
p.add_argument('--port', default=None, type=int, metavar='PORT',
help='Bind port (default: 5000)')
p.add_argument('--workers', default=None, type=int, metavar='N',
help='Worker processes (default: cpu_count + 1, min 2)')
p_start = sub.add_parser('start', help='Start Gunicorn in the background')
p_restart = sub.add_parser('restart', help='Gracefully reload (SIGHUP) or start if stopped')
p_stop = sub.add_parser('stop', help='Stop Gunicorn gracefully')
sub.add_parser('kill', help='Forcefully kill all Bastebin processes')
sub.add_parser('status', help='Show whether Gunicorn is running')
p_stop.add_argument('--force', action='store_true', help='Kill all orphaned processes')
_add_server_args(p_start)
_add_server_args(p_restart)
return parser
def main() -> None:
# 1. Load config for defaults
config_path = os.path.join(BASE_DIR, 'config.json')
config = {}
if os.path.exists(config_path):
try:
with open(config_path, 'r') as f:
config = json.load(f)
except json.JSONDecodeError as e:
sys.exit(f"Error: Failed to parse '{config_path}': {e}\nPlease check for syntax errors (like trailing commas).")
except IOError as e:
print(f"Warning: Could not read '{config_path}': {e}")
server_cfg = config.get('server', {})
default_host = server_cfg.get('host', '0.0.0.0')
default_port = server_cfg.get('port', 5000)
# 2. Parse arguments
parser = _build_parser()
args = parser.parse_args()
# 3. Resolve merge (CLI > config > absolute defaults)
default_workers = max(2, multiprocessing.cpu_count() + 1)
# Check if user provided CLI arguments.
# args.host and args.port always have values because of 'default' in ArgumentParser.
# We'll re-parse or check the sys.argv.
host = getattr(args, 'host', None) or default_host or '0.0.0.0'
port = getattr(args, 'port', None) or default_port or 5000
workers = getattr(args, 'workers', None) or default_workers
if args.command == 'start':
cmd_start(host, port, workers)
elif args.command == 'stop':
cmd_stop(force=args.force)
elif args.command == 'kill':
cmd_kill()
elif args.command == 'restart':
cmd_restart(host, port, workers)
elif args.command == 'status':
cmd_status()
if __name__ == '__main__':
main()