Fix #1+#8: Extract shared config module, unify JWT secret

- Create config.py with shared constants, AES-GCM helpers, and JWT helpers
- app.py and routes.py now import from the single source of truth
- Eliminates JWT secret mismatch (routes.py had hardcoded default)
- Removes all duplicate _issue_jwt, _verify_jwt, _aesgcm_encrypt,
  _aesgcm_decrypt definitions
- start.py also uses shared config loader
This commit is contained in:
3nd3r 2026-04-12 12:49:44 -05:00
parent 1c17a9bcf0
commit 99859f009f
4 changed files with 134 additions and 160 deletions

107
app.py
View File

@ -39,16 +39,11 @@ Socket events (server → client)
"""
import os
import json
import time
import uuid
import base64
import functools
from collections import defaultdict
from datetime import datetime, timedelta
import bcrypt
import jwt as pyjwt
import eventlet # noqa monkey-patched in start.py before any other import
from eventlet.queue import Queue as EvQueue
@ -57,52 +52,15 @@ from flask_socketio import SocketIO, emit, join_room, disconnect
from database import db, init_db
from models import User, Message, UserIgnore
# ---------------------------------------------------------------------------
# Configuration Loader
# ---------------------------------------------------------------------------
def load_config():
conf = {}
config_path = os.path.join(os.path.dirname(__file__), "config.json")
if os.path.exists(config_path):
try:
with open(config_path, "r") as f:
conf = json.load(f)
except Exception as e:
print(f"⚠️ Warning: Failed to load config.json: {e}")
return conf
_CONFIG = load_config()
def _get_conf(key, default=None):
# Order: Env Var > Config File > Default
return os.environ.get(key, _CONFIG.get(key, default))
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
SECRET_KEY = _get_conf("SECRET_KEY", uuid.uuid4().hex)
JWT_SECRET = _get_conf("JWT_SECRET", uuid.uuid4().hex)
ADMIN_PASSWORD = _get_conf("ADMIN_PASSWORD", "admin1234")
MAX_MSG_LEN = 500
LOBBY = "lobby"
AI_FREE_LIMIT = int(_get_conf("AI_FREE_LIMIT", 3))
AI_BOT_NAME = "Violet"
# Ollama
OLLAMA_URL = _get_conf("OLLAMA_URL", "http://localhost:11434")
VIOLET_MODEL = _get_conf("VIOLET_MODEL", "sam860/dolphin3-llama3.2:3b")
VIOLET_SYSTEM = (
"You are Violet, a flirtatious and sophisticated nightclub hostess at "
"an exclusive, dimly-lit members-only club. You are charming, witty, "
"and seductive — never crude or offensive. You speak with elegance, "
"mystery, and a hint of playful danger. Keep every reply to 13 "
"sentences maximum. You are in a private conversation with a special "
"guest who has caught your eye."
from config import (
SECRET_KEY, ADMIN_PASSWORD, DATABASE_URL,
MAX_MSG_LEN, LOBBY, AI_FREE_LIMIT, AI_BOT_NAME,
OLLAMA_URL, VIOLET_MODEL, VIOLET_SYSTEM,
aesgcm_encrypt, aesgcm_decrypt, issue_jwt, verify_jwt,
)
# ---------------------------------------------------------------------------
# In-process state
# ---------------------------------------------------------------------------
@ -123,26 +81,6 @@ RATE_WINDOW = 5
ai_queue: EvQueue = EvQueue()
_app_ref = None # set in create_app() for greenlet app-context access
# ---------------------------------------------------------------------------
# AES-GCM helpers (server-side, transit only)
# ---------------------------------------------------------------------------
def _aesgcm_encrypt(key_b64: str, plaintext: str) -> tuple:
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
key = base64.b64decode(key_b64)
nonce = os.urandom(12)
ct = AESGCM(key).encrypt(nonce, plaintext.encode("utf-8"), None)
return base64.b64encode(ct).decode(), base64.b64encode(nonce).decode()
def _aesgcm_decrypt(key_b64: str, ciphertext_b64: str, nonce_b64: str) -> str:
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
key = base64.b64decode(key_b64)
ct = base64.b64decode(ciphertext_b64)
nonce = base64.b64decode(nonce_b64)
return AESGCM(key).decrypt(nonce, ct, None).decode("utf-8")
# ---------------------------------------------------------------------------
# Ollama integration
# ---------------------------------------------------------------------------
@ -191,7 +129,7 @@ def _ai_worker() -> None:
# ── Decrypt user message (transit; key never stored) ──────────────────
try:
plaintext = _aesgcm_decrypt(
plaintext = aesgcm_decrypt(
task["transit_key"], task["ciphertext"], task["nonce_val"]
)
ai_text = call_ollama(plaintext)
@ -200,7 +138,7 @@ def _ai_worker() -> None:
ai_text = "Mmm, something went wrong, darling 💜"
# ── Re-encrypt AI response ────────────────────────────────────────────
resp_ct, resp_nonce = _aesgcm_encrypt(task["transit_key"], ai_text)
resp_ct, resp_nonce = aesgcm_encrypt(task["transit_key"], ai_text)
ai_messages_used = task.get("ai_messages_used", 0)
has_ai_access = task.get("has_ai_access", False)
@ -311,21 +249,6 @@ def _do_disconnect(sid: str) -> None:
pass
def _issue_jwt(user_id: int, username: str) -> str:
return pyjwt.encode(
{"user_id": user_id, "username": username,
"exp": datetime.utcnow() + timedelta(days=7)},
JWT_SECRET, algorithm="HS256",
)
def _verify_jwt(token: str):
try:
return pyjwt.decode(token, JWT_SECRET, algorithms=["HS256"])
except Exception:
return None
def _save_pm(sender_id: int, recipient_id: int,
encrypted_content: str, nonce: str) -> None:
msg = Message(
@ -355,9 +278,7 @@ def create_app() -> Flask:
app = Flask(__name__, static_folder="static", template_folder=".")
app.config.update(
SECRET_KEY=SECRET_KEY,
SQLALCHEMY_DATABASE_URI=os.environ.get(
"DATABASE_URL", "sqlite:///sexchat.db"
),
SQLALCHEMY_DATABASE_URI=DATABASE_URL,
SQLALCHEMY_TRACK_MODIFICATIONS=False,
SESSION_COOKIE_HTTPONLY=True,
SESSION_COOKIE_SAMESITE="Lax",
@ -411,7 +332,7 @@ def on_connect(auth=None):
has_ai_access = False; ai_used = 0; jwt_username = None
if auth and isinstance(auth, dict) and auth.get("token"):
payload = _verify_jwt(auth["token"])
payload = verify_jwt(auth["token"])
if payload:
db_user = db.session.get(User, payload.get("user_id"))
if db_user:
@ -478,7 +399,7 @@ def on_join(data):
db.session.add(db_user); db.session.commit()
user.update(user_id=db_user.id, is_registered=True,
has_ai_access=False, ai_messages_used=0)
token = _issue_jwt(db_user.id, db_user.username)
token = issue_jwt(db_user.id, db_user.username)
elif mode == "login":
db_user = User.query.filter(
@ -493,7 +414,7 @@ def on_join(data):
user["is_registered"] = True
user["has_ai_access"] = db_user.has_ai_access
user["ai_messages_used"] = db_user.ai_messages_used
token = _issue_jwt(db_user.id, db_user.username)
token = issue_jwt(db_user.id, db_user.username)
elif mode == "restore":
if not user.get("user_id"):
@ -506,7 +427,7 @@ def on_join(data):
username = db_user.username
user["has_ai_access"] = db_user.has_ai_access
user["ai_messages_used"] = db_user.ai_messages_used
token = _issue_jwt(db_user.id, db_user.username)
token = issue_jwt(db_user.id, db_user.username)
else: # guest
if not username or not username.replace("_","").replace("-","").isalnum():

106
config.py Normal file
View File

@ -0,0 +1,106 @@
"""
config.py Shared configuration and utilities for SexyChat.
Centralises constants, config loading, AES-GCM helpers, and JWT helpers
so that app.py and routes.py share a single source of truth.
"""
import os
import json
import uuid
import base64
from datetime import datetime, timezone, timedelta
import jwt as pyjwt
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
# ---------------------------------------------------------------------------
# Configuration Loader
# ---------------------------------------------------------------------------
def load_config():
conf = {}
config_path = os.path.join(os.path.dirname(__file__), "config.json")
if os.path.exists(config_path):
try:
with open(config_path, "r") as f:
conf = json.load(f)
except Exception as e:
print(f"⚠️ Warning: Failed to load config.json: {e}")
return conf
_CONFIG = load_config()
def get_conf(key, default=None):
"""Resolve a config value: Env Var → config.json → default."""
return os.environ.get(key, _CONFIG.get(key, default))
# ---------------------------------------------------------------------------
# Shared Constants
# ---------------------------------------------------------------------------
SECRET_KEY = get_conf("SECRET_KEY", uuid.uuid4().hex)
JWT_SECRET = get_conf("JWT_SECRET", uuid.uuid4().hex)
ADMIN_PASSWORD = get_conf("ADMIN_PASSWORD", "admin1234")
DATABASE_URL = get_conf("DATABASE_URL", "sqlite:///sexchat.db")
PAYMENT_SECRET = get_conf("PAYMENT_SECRET", "change-me-payment-secret")
CORS_ORIGINS = get_conf("CORS_ORIGINS", None)
MAX_MSG_LEN = 500
LOBBY = "lobby"
AI_FREE_LIMIT = int(get_conf("AI_FREE_LIMIT", 3))
AI_BOT_NAME = "Violet"
JWT_EXPIRY_DAYS = 7
MAX_HISTORY = 500
# Ollama
OLLAMA_URL = get_conf("OLLAMA_URL", "http://localhost:11434")
VIOLET_MODEL = get_conf("VIOLET_MODEL", "sam860/dolphin3-llama3.2:3b")
VIOLET_SYSTEM = (
"You are Violet, a flirtatious and sophisticated nightclub hostess at "
"an exclusive, dimly-lit members-only club. You are charming, witty, "
"and seductive — never crude or offensive. You speak with elegance, "
"mystery, and a hint of playful danger. Keep every reply to 13 "
"sentences maximum. You are in a private conversation with a special "
"guest who has caught your eye."
)
# ---------------------------------------------------------------------------
# AES-GCM Helpers
# ---------------------------------------------------------------------------
def aesgcm_encrypt(key_b64: str, plaintext: str) -> tuple:
"""Encrypt plaintext with AES-GCM. Returns (ciphertext_b64, nonce_b64)."""
key = base64.b64decode(key_b64)
nonce = os.urandom(12)
ct = AESGCM(key).encrypt(nonce, plaintext.encode("utf-8"), None)
return base64.b64encode(ct).decode(), base64.b64encode(nonce).decode()
def aesgcm_decrypt(key_b64: str, ciphertext_b64: str, nonce_b64: str) -> str:
"""Decrypt AES-GCM ciphertext. Raises on authentication failure."""
key = base64.b64decode(key_b64)
ct = base64.b64decode(ciphertext_b64)
nonce = base64.b64decode(nonce_b64)
return AESGCM(key).decrypt(nonce, ct, None).decode("utf-8")
# ---------------------------------------------------------------------------
# JWT Helpers
# ---------------------------------------------------------------------------
def issue_jwt(user_id: int, username: str) -> str:
"""Issue a signed JWT with user_id and username claims."""
payload = {
"user_id": user_id,
"username": username,
"exp": datetime.now(timezone.utc) + timedelta(days=JWT_EXPIRY_DAYS),
}
return pyjwt.encode(payload, JWT_SECRET, algorithm="HS256")
def verify_jwt(token: str):
"""Decode and verify a JWT. Returns payload dict or None."""
try:
return pyjwt.decode(token, JWT_SECRET, algorithms=["HS256"])
except pyjwt.PyJWTError:
return None

View File

@ -11,19 +11,19 @@ POST /api/payment/success Validate webhook secret, unlock AI, push socke
"""
import os
import base64
import hmac
import random
import functools
from datetime import datetime, timedelta
import bcrypt
import jwt as pyjwt
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from flask import Blueprint, g, jsonify, request
from database import db
from models import User, Message
from config import (
AI_FREE_LIMIT, AI_BOT_NAME, PAYMENT_SECRET, MAX_HISTORY,
aesgcm_encrypt, aesgcm_decrypt, issue_jwt, verify_jwt,
)
api = Blueprint("api", __name__, url_prefix="/api")
@ -31,13 +31,6 @@ api = Blueprint("api", __name__, url_prefix="/api")
# Config
# ---------------------------------------------------------------------------
JWT_SECRET = os.environ.get("JWT_SECRET", "change-me-jwt-secret")
JWT_EXPIRY_DAYS = 7
AI_FREE_LIMIT = 3
PAYMENT_SECRET = os.environ.get("PAYMENT_SECRET", "change-me-payment-secret")
MAX_HISTORY = 500
AI_BOT_NAME = "Violet"
AI_RESPONSES = [
"Mmm, you have my full attention 💋",
"Oh my... keep going 😈 Don't stop there.",
@ -60,22 +53,6 @@ AI_RESPONSES = [
# ---------------------------------------------------------------------------
def _issue_jwt(user_id: int, username: str) -> str:
payload = {
"user_id": user_id,
"username": username,
"exp": datetime.utcnow() + timedelta(days=JWT_EXPIRY_DAYS),
}
return pyjwt.encode(payload, JWT_SECRET, algorithm="HS256")
def _verify_jwt(token: str):
try:
return pyjwt.decode(token, JWT_SECRET, algorithms=["HS256"])
except pyjwt.PyJWTError:
return None
def _require_auth(f):
"""Decorator parse Bearer JWT and populate g.current_user."""
@functools.wraps(f)
@ -83,7 +60,7 @@ def _require_auth(f):
auth_header = request.headers.get("Authorization", "")
if not auth_header.startswith("Bearer "):
return jsonify({"error": "Unauthorized"}), 401
payload = _verify_jwt(auth_header[7:])
payload = verify_jwt(auth_header[7:])
if not payload:
return jsonify({"error": "Invalid or expired token"}), 401
user = db.session.get(User, payload["user_id"])
@ -94,22 +71,6 @@ def _require_auth(f):
return wrapped
def _aesgcm_encrypt(key_b64: str, plaintext: str) -> tuple:
"""Encrypt plaintext with AES-GCM. Returns (ciphertext_b64, nonce_b64)."""
key_bytes = base64.b64decode(key_b64)
nonce = os.urandom(12)
ct = AESGCM(key_bytes).encrypt(nonce, plaintext.encode("utf-8"), None)
return base64.b64encode(ct).decode(), base64.b64encode(nonce).decode()
def _aesgcm_decrypt(key_b64: str, ciphertext_b64: str, nonce_b64: str) -> str:
"""Decrypt AES-GCM ciphertext. Raises on authentication failure."""
key_bytes = base64.b64decode(key_b64)
ct = base64.b64decode(ciphertext_b64)
nonce = base64.b64decode(nonce_b64)
return AESGCM(key_bytes).decrypt(nonce, ct, None).decode("utf-8")
def _persist_message(sender_id: int, recipient_id: int,
encrypted_content: str, nonce: str) -> None:
"""Save a PM to the database. Enforces MAX_HISTORY per conversation pair."""
@ -188,7 +149,7 @@ def register():
db.session.add(user)
db.session.commit()
token = _issue_jwt(user.id, user.username)
token = issue_jwt(user.id, user.username)
return jsonify({
"token": token,
"user": {
@ -212,7 +173,7 @@ def login():
if not user or not bcrypt.checkpw(password.encode(), user.password_hash.encode()):
return jsonify({"error": "Invalid username or password."}), 401
token = _issue_jwt(user.id, user.username)
token = issue_jwt(user.id, user.username)
return jsonify({
"token": token,
"user": {
@ -296,7 +257,7 @@ def ai_message():
# ── Transit decrypt (message readable for AI; key NOT stored) ─────────────
try:
_plaintext = _aesgcm_decrypt(transit_key, ciphertext, nonce_b64)
_plaintext = aesgcm_decrypt(transit_key, ciphertext, nonce_b64)
except Exception:
return jsonify({"error": "Decryption failed wrong key or corrupted data"}), 400
@ -306,7 +267,7 @@ def ai_message():
# ── Persist both legs encrypted in DB (server uses transit key) ──────────
bot = _get_ai_bot()
_persist_message(user.id, bot.id, ciphertext, nonce_b64) # user → AI
resp_ct, resp_nonce = _aesgcm_encrypt(transit_key, ai_text)
resp_ct, resp_nonce = aesgcm_encrypt(transit_key, ai_text)
_persist_message(bot.id, user.id, resp_ct, resp_nonce) # AI → user
# ── Update free trial counter ─────────────────────────────────────────────
@ -315,7 +276,7 @@ def ai_message():
db.session.commit()
# ── Re-encrypt AI response for transit back ───────────────────────────────
resp_ct_transit, resp_nonce_transit = _aesgcm_encrypt(transit_key, ai_text)
resp_ct_transit, resp_nonce_transit = aesgcm_encrypt(transit_key, ai_text)
return jsonify({
"ciphertext": resp_ct_transit,

View File

@ -12,7 +12,6 @@ Usage:
import os
import sys
import json
import subprocess
import signal
import time
@ -21,24 +20,11 @@ import eventlet
# Monkey-patch stdlib BEFORE any other import
eventlet.monkey_patch()
from config import get_conf
# PID file to track the daemon process
PID_FILE = "sexchat.pid"
def load_config():
conf = {}
config_path = os.path.join(os.path.dirname(__file__), "config.json")
if os.path.exists(config_path):
try:
with open(config_path, "r") as f:
conf = json.load(f)
except Exception:
pass
return conf
def _get_conf(key, default=None):
conf = load_config()
return os.environ.get(key, conf.get(key, default))
def get_pid():
if os.path.exists(PID_FILE):
with open(PID_FILE, "r") as f:
@ -68,7 +54,7 @@ def start_daemon():
"gunicorn",
"--worker-class", "eventlet",
"-w", "1",
"--bind", f"{_get_conf('HOST', '0.0.0.0')}:{_get_conf('PORT', 5000)}",
"--bind", f"{get_conf('HOST', '0.0.0.0')}:{get_conf('PORT', 5000)}",
"--daemon",
"--pid", PID_FILE,
"--access-logfile", "access.log",
@ -124,7 +110,7 @@ def run_debug():
"gunicorn",
"--worker-class", "eventlet",
"-w", "1",
"--bind", f"{_get_conf('HOST', '0.0.0.0')}:{_get_conf('PORT', 5000)}",
"--bind", f"{get_conf('HOST', '0.0.0.0')}:{get_conf('PORT', 5000)}",
"--log-level", "debug",
"--access-logfile", "-",
"--error-logfile", "-",