""" routes.py – REST API blueprint for SexyChat. Endpoints --------- POST /api/auth/register – Create account, return JWT POST /api/auth/login – Verify credentials, return JWT GET /api/pm/history – Last 500 encrypted messages for a conversation POST /api/ai/message – Transit-decrypt, get AI response, re-encrypt, persist POST /api/payment/success – Validate webhook secret, unlock AI, push socket event """ import os import hmac import hashlib import random import functools import base64 import bcrypt from flask import Blueprint, g, jsonify, request from database import db from models import User, Message from config import ( AI_FREE_LIMIT, AI_BOT_NAME, PAYMENT_SECRET, MAX_HISTORY, JWT_SECRET, aesgcm_encrypt, aesgcm_decrypt, issue_jwt, verify_jwt, generate_csrf_token, sanitize_user_input, ) api = Blueprint("api", __name__, url_prefix="/api") # --------------------------------------------------------------------------- # Config # --------------------------------------------------------------------------- AI_RESPONSES = [ "Mmm, you have my full attention 💋", "Oh my... keep going 😈 Don't stop there.", "You're bold. I like that 🔥 Most guests aren't this brave.", "I wasn't expecting that... but I'm definitely not complaining 😏", "Well well well, aren't you something 👀 Tell me everything.", "That's deliciously naughty 🌙 You're dangerous, aren't you?", "You certainly know how to make an evening interesting 💜", "I love the way you think 😌 It's... refreshing.", "A dark club, a conversation like this... 🕯️ This is my kind of night.", "Say that again. Slower. 💋", "*sets down the wine glass* Is it warm in here, or is it just you? 🔥", "You're trouble. The most delicious kind 😈", "I don't usually admit when a guest surprises me, but here we are 🌙", "Keep talking. I could listen to this all night 💜", ] # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _require_auth(f): """Decorator – parse Bearer JWT and populate g.current_user.""" @functools.wraps(f) def wrapped(*args, **kwargs): auth_header = request.headers.get("Authorization", "") if not auth_header.startswith("Bearer "): return jsonify({"error": "Unauthorized"}), 401 payload = verify_jwt(auth_header[7:]) if not payload: return jsonify({"error": "Invalid or expired token"}), 401 user = db.session.get(User, payload["user_id"]) if not user: return jsonify({"error": "User not found"}), 401 g.current_user = user return f(*args, **kwargs) return wrapped def _require_csrf(f): """Decorator – validate CSRF token from request header.""" @functools.wraps(f) def wrapped(*args, **kwargs): csrf_token = request.headers.get("X-CSRF-Token", "") session_csrf = request.headers.get("X-Session-CSRF", "") # CSRF check: token must match session token (simple HMAC validation) if not csrf_token or not session_csrf: return jsonify({"error": "Missing CSRF tokens"}), 403 # For this implementation, we just ensure token is non-empty # In production, validate against server-side session store if len(csrf_token) < 20: return jsonify({"error": "Invalid CSRF token"}), 403 return f(*args, **kwargs) return wrapped def _persist_message(sender_id: int, recipient_id: int, encrypted_content: str, nonce: str) -> None: """Save a PM to the database. Enforces MAX_HISTORY per conversation pair.""" msg = Message( sender_id=sender_id, recipient_id=recipient_id, encrypted_content=encrypted_content, nonce=nonce, ) db.session.add(msg) db.session.commit() # Lazy pruning: cap conversation history at MAX_HISTORY pair_ids = [sender_id, recipient_id] total = ( Message.query .filter( Message.sender_id.in_(pair_ids), Message.recipient_id.in_(pair_ids), ) .count() ) if total > MAX_HISTORY: excess = total - MAX_HISTORY oldest = ( Message.query .filter( Message.sender_id.in_(pair_ids), Message.recipient_id.in_(pair_ids), ) .order_by(Message.timestamp.asc()) .limit(excess) .all() ) for old_msg in oldest: db.session.delete(old_msg) db.session.commit() def _get_ai_bot() -> User: """Return the SexyAI virtual user, creating it if absent.""" bot = User.query.filter_by(username=AI_BOT_NAME).first() if not bot: bot = User( username=AI_BOT_NAME, password_hash=bcrypt.hashpw(os.urandom(32), bcrypt.gensalt()).decode(), has_ai_access=True, ) db.session.add(bot) db.session.commit() return bot # --------------------------------------------------------------------------- # Auth routes # --------------------------------------------------------------------------- @api.route("/auth/register", methods=["POST"]) def register(): data = request.get_json() or {} username = str(data.get("username", "")).strip()[:20] password = str(data.get("password", "")).strip() email = str(data.get("email", "")).strip()[:255] or None if not username or not username.replace("_", "").replace("-", "").isalnum(): return jsonify({"error": "Invalid username."}), 400 if len(password) < 6: return jsonify({"error": "Password must be at least 6 characters."}), 400 if username.lower() == AI_BOT_NAME.lower(): return jsonify({"error": "That username is reserved."}), 409 if User.query.filter(db.func.lower(User.username) == username.lower()).first(): return jsonify({"error": "Username already registered."}), 409 hashed = bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode() user = User(username=username, password_hash=hashed, email=email) db.session.add(user) db.session.commit() token = issue_jwt(user.id, user.username) csrf_token = generate_csrf_token() return jsonify({ "token": token, "csrf_token": csrf_token, "user": { "id": user.id, "username": user.username, "has_ai_access": user.has_ai_access, "ai_messages_used": user.ai_messages_used, }, }), 201 @api.route("/auth/login", methods=["POST"]) def login(): data = request.get_json() or {} username = str(data.get("username", "")).strip() password = str(data.get("password", "")).strip() user = User.query.filter( db.func.lower(User.username) == username.lower() ).first() if not user or not bcrypt.checkpw(password.encode(), user.password_hash.encode()): return jsonify({"error": "Invalid username or password."}), 401 token = issue_jwt(user.id, user.username) csrf_token = generate_csrf_token() return jsonify({ "token": token, "csrf_token": csrf_token, "user": { "id": user.id, "username": user.username, "has_ai_access": user.has_ai_access, "ai_messages_used": user.ai_messages_used, }, }) # --------------------------------------------------------------------------- # PM history # --------------------------------------------------------------------------- @api.route("/pm/history", methods=["GET"]) @_require_auth def pm_history(): me = g.current_user target_name = request.args.get("with", "").strip() if not target_name: return jsonify({"error": "Missing 'with' query parameter"}), 400 other = User.query.filter( db.func.lower(User.username) == target_name.lower() ).first() if not other: return jsonify({"messages": []}) # Derive the room key so the client can decrypt history pair = ":".join(sorted([me.username.lower(), other.username.lower()])) room_name = "pm:" + pair room_key = base64.b64encode( hmac.new(JWT_SECRET.encode(), room_name.encode(), hashlib.sha256).digest() ).decode() rows = ( Message.query .filter( db.or_( db.and_(Message.sender_id == me.id, Message.recipient_id == other.id), db.and_(Message.sender_id == other.id, Message.recipient_id == me.id), ) ) .order_by(Message.timestamp.desc()) .limit(MAX_HISTORY) .all() ) rows.reverse() # return in chronological order return jsonify({ "room_key": room_key, "messages": [ { "from_me": m.sender_id == me.id, "ciphertext": m.encrypted_content, "nonce": m.nonce, "ts": m.timestamp.strftime("%H:%M"), } for m in rows ] }) # --------------------------------------------------------------------------- # AI message (transit encryption – key never persisted) # --------------------------------------------------------------------------- @api.route("/ai/message", methods=["POST"]) @_require_auth @_require_csrf def ai_message(): user = g.current_user data = request.get_json() or {} # ── Gate ────────────────────────────────────────────────────────────────── if not user.has_ai_access and user.ai_messages_used >= AI_FREE_LIMIT: return jsonify({ "error": "ai_limit_reached", "limit": AI_FREE_LIMIT, "ai_messages_used": user.ai_messages_used, }), 402 ciphertext = data.get("ciphertext", "") nonce_b64 = data.get("nonce", "") transit_key = data.get("transit_key", "") if not all([ciphertext, nonce_b64, transit_key]): return jsonify({"error": "Missing encryption fields"}), 400 # ── Transit decrypt (message readable for AI; key NOT stored) ───────────── try: plaintext = aesgcm_decrypt(transit_key, ciphertext, nonce_b64) # Sanitize before using in AI prompt plaintext = sanitize_user_input(plaintext) except Exception: return jsonify({"error": "Decryption failed – wrong key or corrupted data"}), 400 # ── AI response (mock; swap in OpenAI / Anthropic here) ────────────────── ai_text = random.choice(AI_RESPONSES) # ── Persist both legs encrypted in DB (server uses transit key) ────────── bot = _get_ai_bot() _persist_message(user.id, bot.id, ciphertext, nonce_b64) # user → AI resp_ct, resp_nonce = aesgcm_encrypt(transit_key, ai_text) _persist_message(bot.id, user.id, resp_ct, resp_nonce) # AI → user # ── Update free trial counter ───────────────────────────────────────────── if not user.has_ai_access: user.ai_messages_used = min(user.ai_messages_used + 1, AI_FREE_LIMIT) db.session.commit() # ── Re-encrypt AI response for transit back ─────────────────────────────── resp_ct_transit, resp_nonce_transit = aesgcm_encrypt(transit_key, ai_text) return jsonify({ "ciphertext": resp_ct_transit, "nonce": resp_nonce_transit, "ai_messages_used": user.ai_messages_used, "has_ai_access": user.has_ai_access, }) # --------------------------------------------------------------------------- # Payment webhook # --------------------------------------------------------------------------- @api.route("/payment/success", methods=["POST"]) def payment_success(): """ Server-side payment webhook – NOT callable by clients. Validates the webhook secret and unlocks AI access for the user identified by the 'user_id' field in the JSON body. For Stripe production: replace the secret comparison with stripe.Webhook.construct_event() using the raw request body and the Stripe-Signature header. """ data = request.get_json() or {} secret = data.get("secret", "") # Constant-time comparison to prevent timing attacks if not secret or not hmac.compare_digest( secret.encode(), PAYMENT_SECRET.encode(), ): return jsonify({"error": "Invalid or missing payment secret"}), 403 # Identify the user from the webhook payload (NOT from client auth) user_id = data.get("user_id") if not user_id: return jsonify({"error": "Missing user_id in webhook payload"}), 400 user = db.session.get(User, user_id) if not user: return jsonify({"error": "User not found"}), 404 if not user.has_ai_access: user.has_ai_access = True db.session.commit() # Emit real-time unlock event to the user's active socket connection try: from app import socketio, username_to_sid target_sid = username_to_sid.get(user.username.lower()) if target_sid: socketio.emit("ai_unlock", { "msg": "🎉 AI access unlocked! Enjoy unlimited SexyAI chat.", }, to=target_sid) except Exception: pass # Non-critical: UI will refresh on next request anyway return jsonify({"status": "ok", "has_ai_access": True})