""" routes.py – REST API blueprint for SexyChat. Endpoints --------- POST /api/auth/register – Create account, return JWT POST /api/auth/login – Verify credentials, return JWT GET /api/pm/history – Last 500 encrypted messages for a conversation POST /api/ai/message – Transit-decrypt, get AI response, re-encrypt, persist POST /api/payment/success – Validate webhook secret, unlock AI, push socket event """ import os import base64 import hmac import random import functools from datetime import datetime, timedelta import bcrypt import jwt as pyjwt from cryptography.hazmat.primitives.ciphers.aead import AESGCM from flask import Blueprint, g, jsonify, request from database import db from models import User, Message api = Blueprint("api", __name__, url_prefix="/api") # --------------------------------------------------------------------------- # Config # --------------------------------------------------------------------------- JWT_SECRET = os.environ.get("JWT_SECRET", "change-me-jwt-secret") JWT_EXPIRY_DAYS = 7 AI_FREE_LIMIT = 3 PAYMENT_SECRET = os.environ.get("PAYMENT_SECRET", "change-me-payment-secret") MAX_HISTORY = 500 AI_BOT_NAME = "Violet" AI_RESPONSES = [ "Mmm, you have my full attention 💋", "Oh my... keep going 😈 Don't stop there.", "You're bold. I like that 🔥 Most guests aren't this brave.", "I wasn't expecting that... but I'm definitely not complaining 😏", "Well well well, aren't you something 👀 Tell me everything.", "That's deliciously naughty 🌙 You're dangerous, aren't you?", "You certainly know how to make an evening interesting 💜", "I love the way you think 😌 It's... refreshing.", "A dark club, a conversation like this... 🕯️ This is my kind of night.", "Say that again. Slower. 💋", "*sets down the wine glass* Is it warm in here, or is it just you? 🔥", "You're trouble. The most delicious kind 😈", "I don't usually admit when a guest surprises me, but here we are 🌙", "Keep talking. 
I could listen to this all night 💜", ] # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _issue_jwt(user_id: int, username: str) -> str: payload = { "user_id": user_id, "username": username, "exp": datetime.utcnow() + timedelta(days=JWT_EXPIRY_DAYS), } return pyjwt.encode(payload, JWT_SECRET, algorithm="HS256") def _verify_jwt(token: str): try: return pyjwt.decode(token, JWT_SECRET, algorithms=["HS256"]) except pyjwt.PyJWTError: return None def _require_auth(f): """Decorator – parse Bearer JWT and populate g.current_user.""" @functools.wraps(f) def wrapped(*args, **kwargs): auth_header = request.headers.get("Authorization", "") if not auth_header.startswith("Bearer "): return jsonify({"error": "Unauthorized"}), 401 payload = _verify_jwt(auth_header[7:]) if not payload: return jsonify({"error": "Invalid or expired token"}), 401 user = db.session.get(User, payload["user_id"]) if not user: return jsonify({"error": "User not found"}), 401 g.current_user = user return f(*args, **kwargs) return wrapped def _aesgcm_encrypt(key_b64: str, plaintext: str) -> tuple: """Encrypt plaintext with AES-GCM. Returns (ciphertext_b64, nonce_b64).""" key_bytes = base64.b64decode(key_b64) nonce = os.urandom(12) ct = AESGCM(key_bytes).encrypt(nonce, plaintext.encode("utf-8"), None) return base64.b64encode(ct).decode(), base64.b64encode(nonce).decode() def _aesgcm_decrypt(key_b64: str, ciphertext_b64: str, nonce_b64: str) -> str: """Decrypt AES-GCM ciphertext. Raises on authentication failure.""" key_bytes = base64.b64decode(key_b64) ct = base64.b64decode(ciphertext_b64) nonce = base64.b64decode(nonce_b64) return AESGCM(key_bytes).decrypt(nonce, ct, None).decode("utf-8") def _persist_message(sender_id: int, recipient_id: int, encrypted_content: str, nonce: str) -> None: """Save a PM to the database. 
Enforces MAX_HISTORY per conversation pair.""" msg = Message( sender_id=sender_id, recipient_id=recipient_id, encrypted_content=encrypted_content, nonce=nonce, ) db.session.add(msg) db.session.commit() # Lazy pruning: cap conversation history at MAX_HISTORY pair_ids = [sender_id, recipient_id] total = ( Message.query .filter( Message.sender_id.in_(pair_ids), Message.recipient_id.in_(pair_ids), ) .count() ) if total > MAX_HISTORY: excess = total - MAX_HISTORY oldest = ( Message.query .filter( Message.sender_id.in_(pair_ids), Message.recipient_id.in_(pair_ids), ) .order_by(Message.timestamp.asc()) .limit(excess) .all() ) for old_msg in oldest: db.session.delete(old_msg) db.session.commit() def _get_ai_bot() -> User: """Return the SexyAI virtual user, creating it if absent.""" bot = User.query.filter_by(username=AI_BOT_NAME).first() if not bot: bot = User( username=AI_BOT_NAME, password_hash=bcrypt.hashpw(os.urandom(32), bcrypt.gensalt()).decode(), has_ai_access=True, ) db.session.add(bot) db.session.commit() return bot # --------------------------------------------------------------------------- # Auth routes # --------------------------------------------------------------------------- @api.route("/auth/register", methods=["POST"]) def register(): data = request.get_json() or {} username = str(data.get("username", "")).strip()[:20] password = str(data.get("password", "")).strip() email = str(data.get("email", "")).strip()[:255] or None if not username or not username.replace("_", "").replace("-", "").isalnum(): return jsonify({"error": "Invalid username."}), 400 if len(password) < 6: return jsonify({"error": "Password must be at least 6 characters."}), 400 if username.lower() == AI_BOT_NAME.lower(): return jsonify({"error": "That username is reserved."}), 409 if User.query.filter(db.func.lower(User.username) == username.lower()).first(): return jsonify({"error": "Username already registered."}), 409 hashed = bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode() 
user = User(username=username, password_hash=hashed, email=email) db.session.add(user) db.session.commit() token = _issue_jwt(user.id, user.username) return jsonify({ "token": token, "user": { "id": user.id, "username": user.username, "has_ai_access": user.has_ai_access, "ai_messages_used": user.ai_messages_used, }, }), 201 @api.route("/auth/login", methods=["POST"]) def login(): data = request.get_json() or {} username = str(data.get("username", "")).strip() password = str(data.get("password", "")).strip() user = User.query.filter( db.func.lower(User.username) == username.lower() ).first() if not user or not bcrypt.checkpw(password.encode(), user.password_hash.encode()): return jsonify({"error": "Invalid username or password."}), 401 token = _issue_jwt(user.id, user.username) return jsonify({ "token": token, "user": { "id": user.id, "username": user.username, "has_ai_access": user.has_ai_access, "ai_messages_used": user.ai_messages_used, }, }) # --------------------------------------------------------------------------- # PM history # --------------------------------------------------------------------------- @api.route("/pm/history", methods=["GET"]) @_require_auth def pm_history(): me = g.current_user target_name = request.args.get("with", "").strip() if not target_name: return jsonify({"error": "Missing 'with' query parameter"}), 400 other = User.query.filter( db.func.lower(User.username) == target_name.lower() ).first() if not other: return jsonify({"messages": []}) rows = ( Message.query .filter( db.or_( db.and_(Message.sender_id == me.id, Message.recipient_id == other.id), db.and_(Message.sender_id == other.id, Message.recipient_id == me.id), ) ) .order_by(Message.timestamp.desc()) .limit(MAX_HISTORY) .all() ) rows.reverse() # return in chronological order return jsonify({ "messages": [ { "from_me": m.sender_id == me.id, "ciphertext": m.encrypted_content, "nonce": m.nonce, "ts": m.timestamp.strftime("%H:%M"), } for m in rows ] }) # 
# ---------------------------------------------------------------------------
# AI message (transit encryption – key never persisted)
# ---------------------------------------------------------------------------

@api.route("/ai/message", methods=["POST"])
@_require_auth
def ai_message():
    """Decrypt the user's message, produce an AI reply, persist both, reply encrypted.

    Expected JSON body: ``ciphertext``, ``nonce`` and ``transit_key`` (all
    base64). Returns 402 once a user without AI access has used
    AI_FREE_LIMIT messages, and 400 for missing fields or a failed decrypt.
    """
    user = g.current_user
    data = request.get_json() or {}
    if not isinstance(data, dict):
        # A JSON array/scalar body has no .get(); treat it as missing fields
        # instead of crashing with a 500.
        data = {}

    # ── Gate ──────────────────────────────────────────────────────────────────
    if not user.has_ai_access and user.ai_messages_used >= AI_FREE_LIMIT:
        return jsonify({
            "error": "ai_limit_reached",
            "limit": AI_FREE_LIMIT,
            "ai_messages_used": user.ai_messages_used,
        }), 402

    ciphertext = data.get("ciphertext", "")
    nonce_b64 = data.get("nonce", "")
    transit_key = data.get("transit_key", "")

    if not all([ciphertext, nonce_b64, transit_key]):
        return jsonify({"error": "Missing encryption fields"}), 400

    # ── Transit decrypt (message readable for AI; key NOT stored) ─────────────
    try:
        _plaintext = _aesgcm_decrypt(transit_key, ciphertext, nonce_b64)
    except Exception:
        # Request boundary: bad base64, wrong key size, wrong field types and
        # failed authentication are all reported to the client as one 400.
        return jsonify({"error": "Decryption failed – wrong key or corrupted data"}), 400

    # ── AI response (mock; swap in OpenAI / Anthropic here) ──────────────────
    ai_text = random.choice(AI_RESPONSES)

    # ── Persist both legs encrypted in DB (server uses transit key) ──────────
    bot = _get_ai_bot()
    _persist_message(user.id, bot.id, ciphertext, nonce_b64)   # user → AI
    resp_ct, resp_nonce = _aesgcm_encrypt(transit_key, ai_text)
    _persist_message(bot.id, user.id, resp_ct, resp_nonce)     # AI → user

    # ── Update free trial counter ─────────────────────────────────────────────
    if not user.has_ai_access:
        user.ai_messages_used = min(user.ai_messages_used + 1, AI_FREE_LIMIT)
        db.session.commit()

    # ── Re-encrypt AI response for transit back ───────────────────────────────
    # Encrypted again with a fresh nonce, so the returned ciphertext differs
    # from the stored copy.
    resp_ct_transit, resp_nonce_transit = _aesgcm_encrypt(transit_key, ai_text)

    return jsonify({
        "ciphertext": resp_ct_transit,
        "nonce": resp_nonce_transit,
        "ai_messages_used": user.ai_messages_used,
        "has_ai_access": user.has_ai_access,
    })
# ---------------------------------------------------------------------------
# Payment webhook
# ---------------------------------------------------------------------------

@api.route("/payment/success", methods=["POST"])
@_require_auth
def payment_success():
    """
    Validate a payment webhook and flip user.has_ai_access.

    Expected body: { "secret": "<PAYMENT_SECRET value>" }

    Returns 403 when the secret is missing, not a string, or does not match;
    otherwise unlocks AI access for the authenticated user and returns 200.

    For Stripe production: replace the secret comparison with
    stripe.Webhook.construct_event() using the raw request body and
    the Stripe-Signature header.
    """
    import logging  # local import: the failure log below is the only user

    data = request.get_json() or {}
    # A non-object body or a non-string secret must be rejected with 403
    # rather than crash on .get()/.encode() and produce a 500.
    secret = data.get("secret", "") if isinstance(data, dict) else ""
    if not isinstance(secret, str):
        secret = ""

    # Constant-time comparison to prevent timing attacks
    if not secret or not hmac.compare_digest(
        secret.encode(),
        PAYMENT_SECRET.encode(),
    ):
        return jsonify({"error": "Invalid or missing payment secret"}), 403

    user = g.current_user
    if not user.has_ai_access:
        user.has_ai_access = True
        db.session.commit()

    # Emit real-time unlock event to the user's active socket connection
    try:
        # Imported here rather than at module top (presumably to avoid a
        # circular import with app – confirm).
        from app import socketio, username_to_sid
        target_sid = username_to_sid.get(user.username.lower())
        if target_sid:
            socketio.emit("ai_unlock", {
                "msg": "🎉 AI access unlocked! Enjoy unlimited SexyAI chat.",
            }, to=target_sid)
    except Exception:
        # Non-critical: UI will refresh on next request anyway. Log instead
        # of silently swallowing so broken socket wiring stays visible.
        logging.getLogger(__name__).warning(
            "Failed to emit ai_unlock socket event", exc_info=True
        )

    return jsonify({"status": "ok", "has_ai_access": True})