forked from ComputerTech/aprhodite
125 lines
4.6 KiB
Python
125 lines
4.6 KiB
Python
"""
|
||
config.py – Shared configuration and utilities for SexyChat.
|
||
|
||
Centralises constants, config loading, AES-GCM helpers, and JWT helpers
|
||
so that app.py and routes.py share a single source of truth.
|
||
"""
|
||
|
||
import os
|
||
import json
|
||
import uuid
|
||
import base64
|
||
from datetime import datetime, timezone, timedelta
|
||
|
||
import jwt as pyjwt
|
||
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Configuration Loader
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def load_config():
|
||
conf = {}
|
||
config_path = os.path.join(os.path.dirname(__file__), "config.json")
|
||
if os.path.exists(config_path):
|
||
try:
|
||
with open(config_path, "r") as f:
|
||
conf = json.load(f)
|
||
except Exception as e:
|
||
print(f"⚠️ Warning: Failed to load config.json: {e}")
|
||
return conf
|
||
|
||
_CONFIG = load_config()
|
||
|
||
def get_conf(key, default=None):
|
||
"""Resolve a config value: Env Var → config.json → default."""
|
||
return os.environ.get(key, _CONFIG.get(key, default))
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Shared Constants
|
||
# ---------------------------------------------------------------------------
|
||
|
||
SECRET_KEY = get_conf("SECRET_KEY", uuid.uuid4().hex)
|
||
JWT_SECRET = get_conf("JWT_SECRET", uuid.uuid4().hex)
|
||
ADMIN_PASSWORD = get_conf("ADMIN_PASSWORD", None) # Must be set in production
|
||
DATABASE_URL = get_conf("DATABASE_URL", "sqlite:///sexchat.db")
|
||
PAYMENT_SECRET = get_conf("PAYMENT_SECRET", "change-me-payment-secret")
|
||
CORS_ORIGINS = get_conf("CORS_ORIGINS", None)
|
||
|
||
MAX_MSG_LEN = 500
|
||
LOBBY = "lobby"
|
||
AI_FREE_LIMIT = int(get_conf("AI_FREE_LIMIT", 3))
|
||
AI_BOT_NAME = "Violet"
|
||
JWT_EXPIRY_DAYS = 1 # 24-hour expiry for security
|
||
JWT_EXPIRY_SECS = 60 # 60-second refresh token expiry
|
||
MAX_HISTORY = 500
|
||
CSRF_TOKEN_LEN = 32 # CSRF token length in bytes
|
||
|
||
# Ollama
|
||
OLLAMA_URL = get_conf("OLLAMA_URL", "http://localhost:11434")
|
||
VIOLET_MODEL = get_conf("VIOLET_MODEL", "sadiq-bd/llama3.2-3b-uncensored:latest")
|
||
VIOLET_SYSTEM = (
|
||
"You are Violet, a flirtatious and sophisticated nightclub hostess at "
|
||
"an exclusive, dimly-lit members-only club. You are charming, witty, "
|
||
"and seductive — never crude or offensive. You speak with elegance, "
|
||
"mystery, and a hint of playful danger. Keep every reply to 1–3 "
|
||
"sentences maximum. You are in a private conversation with a special "
|
||
"guest who has caught your eye."
|
||
)
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# AES-GCM Helpers
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def aesgcm_encrypt(key_b64: str, plaintext: str) -> tuple:
|
||
"""Encrypt plaintext with AES-GCM. Returns (ciphertext_b64, nonce_b64)."""
|
||
key = base64.b64decode(key_b64)
|
||
nonce = os.urandom(12)
|
||
ct = AESGCM(key).encrypt(nonce, plaintext.encode("utf-8"), None)
|
||
return base64.b64encode(ct).decode(), base64.b64encode(nonce).decode()
|
||
|
||
|
||
def aesgcm_decrypt(key_b64: str, ciphertext_b64: str, nonce_b64: str) -> str:
|
||
"""Decrypt AES-GCM ciphertext. Raises on authentication failure."""
|
||
key = base64.b64decode(key_b64)
|
||
ct = base64.b64decode(ciphertext_b64)
|
||
nonce = base64.b64decode(nonce_b64)
|
||
return AESGCM(key).decrypt(nonce, ct, None).decode("utf-8")
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# JWT Helpers
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def issue_jwt(user_id: int, username: str) -> str:
|
||
"""Issue a signed JWT with user_id and username claims."""
|
||
payload = {
|
||
"user_id": user_id,
|
||
"username": username,
|
||
"exp": datetime.now(timezone.utc) + timedelta(days=JWT_EXPIRY_DAYS),
|
||
}
|
||
return pyjwt.encode(payload, JWT_SECRET, algorithm="HS256")
|
||
|
||
|
||
def verify_jwt(token: str):
|
||
"""Decode and verify a JWT. Returns payload dict or None."""
|
||
try:
|
||
return pyjwt.decode(token, JWT_SECRET, algorithms=["HS256"])
|
||
except pyjwt.PyJWTError:
|
||
return None
|
||
|
||
|
||
def generate_csrf_token() -> str:
|
||
"""Generate a CSRF token for REST API requests."""
|
||
import secrets
|
||
return secrets.token_urlsafe(CSRF_TOKEN_LEN)
|
||
|
||
|
||
def sanitize_user_input(text: str, max_len: int = MAX_MSG_LEN) -> str:
|
||
"""Sanitize user input to prevent prompt injection and buffer overflow."""
|
||
if not isinstance(text, str):
|
||
return ""
|
||
# Remove null bytes and other control characters
|
||
sanitized = "".join(c for c in text if ord(c) >= 32 or c in "\n\r\t")
|
||
# Truncate to max length
|
||
return sanitized[:max_len].strip()
|