aprhodite/routes.py

378 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
routes.py - REST API blueprint for SexyChat.
Endpoints
---------
POST /api/auth/register Create account, return JWT
POST /api/auth/login Verify credentials, return JWT
GET /api/pm/history Last 500 encrypted messages for a conversation
POST /api/ai/message Transit-decrypt, get AI response, re-encrypt, persist
POST /api/payment/success Validate webhook secret, unlock AI, push socket event
"""
import os
import hmac
import hashlib
import random
import functools
import base64
import bcrypt
from flask import Blueprint, g, jsonify, request
from database import db
from models import User, Message
from config import (
AI_FREE_LIMIT, AI_BOT_NAME, PAYMENT_SECRET, MAX_HISTORY, JWT_SECRET,
aesgcm_encrypt, aesgcm_decrypt, issue_jwt, verify_jwt,
generate_csrf_token, sanitize_user_input,
)
# Blueprint that every route in this module registers on; all URLs are
# served under the /api prefix.
api = Blueprint("api", __name__, url_prefix="/api")
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
# Canned replies for the mock AI persona. ai_message() picks one at random
# with random.choice() instead of calling a real language model.
AI_RESPONSES = [
    "Mmm, you have my full attention 💋",
    "Oh my... keep going 😈 Don't stop there.",
    "You're bold. I like that 🔥 Most guests aren't this brave.",
    "I wasn't expecting that... but I'm definitely not complaining 😏",
    "Well well well, aren't you something 👀 Tell me everything.",
    "That's deliciously naughty 🌙 You're dangerous, aren't you?",
    "You certainly know how to make an evening interesting 💜",
    "I love the way you think 😌 It's... refreshing.",
    "A dark club, a conversation like this... 🕯️ This is my kind of night.",
    "Say that again. Slower. 💋",
    "*sets down the wine glass* Is it warm in here, or is it just you? 🔥",
    "You're trouble. The most delicious kind 😈",
    "I don't usually admit when a guest surprises me, but here we are 🌙",
    "Keep talking. I could listen to this all night 💜",
]
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _require_auth(f):
    """Decorator: parse the Bearer JWT and populate ``g.current_user``.

    The wrapped view only runs when the ``Authorization`` header carries a
    ``Bearer`` token that ``verify_jwt`` accepts and whose ``user_id`` claim
    resolves to an existing ``User`` row. Every other case short-circuits
    with a JSON error body and HTTP 401.
    """
    @functools.wraps(f)
    def wrapped(*args, **kwargs):
        auth_header = request.headers.get("Authorization", "")
        if not auth_header.startswith("Bearer "):
            return jsonify({"error": "Unauthorized"}), 401
        # Strip the 7-character "Bearer " prefix to get the raw token.
        payload = verify_jwt(auth_header[7:])
        if not payload:
            return jsonify({"error": "Invalid or expired token"}), 401
        # A token that verifies but lacks the expected claim (or is not a
        # mapping at all) is treated as invalid instead of crashing the
        # request with a KeyError/TypeError.
        try:
            user_id = payload["user_id"]
        except (KeyError, TypeError):
            return jsonify({"error": "Invalid or expired token"}), 401
        user = db.session.get(User, user_id)
        if not user:
            return jsonify({"error": "User not found"}), 401
        g.current_user = user
        return f(*args, **kwargs)
    return wrapped
def _require_csrf(f):
    """Decorator: reject requests that do not carry CSRF headers.

    Both ``X-CSRF-Token`` and ``X-Session-CSRF`` must be present and
    non-empty, and ``X-CSRF-Token`` must be at least 20 characters long.
    Failing either check yields a JSON error with HTTP 403.

    NOTE(review): the two header values are never compared with each other
    or with any server-side state, so this is a presence/length check only.
    """
    @functools.wraps(f)
    def wrapped(*args, **kwargs):
        headers = request.headers
        submitted = headers.get("X-CSRF-Token", "")
        session_copy = headers.get("X-Session-CSRF", "")

        # Both headers are mandatory.
        if not (submitted and session_copy):
            return jsonify({"error": "Missing CSRF tokens"}), 403

        # Placeholder validation: only the submitted token's length is
        # checked. A production deployment should validate it against a
        # server-side session store.
        if len(submitted) < 20:
            return jsonify({"error": "Invalid CSRF token"}), 403

        return f(*args, **kwargs)
    return wrapped
def _persist_message(sender_id: int, recipient_id: int,
                     encrypted_content: str, nonce: str) -> None:
    """Save a PM to the database and cap the conversation at MAX_HISTORY.

    The new message is committed first. Afterwards, if the conversation
    between the two users holds more than ``MAX_HISTORY`` rows, the oldest
    rows (by ``Message.timestamp``) are deleted in a second commit.

    Args:
        sender_id: Primary key of the sending user.
        recipient_id: Primary key of the receiving user.
        encrypted_content: Ciphertext to store as-is.
        nonce: Nonce that accompanies the ciphertext.
    """
    db.session.add(Message(
        sender_id=sender_id,
        recipient_id=recipient_id,
        encrypted_content=encrypted_content,
        nonce=nonce,
    ))
    db.session.commit()

    # Match exactly the two directions of this conversation. Filtering with
    # ``in_`` on both columns would also match self-addressed messages
    # (A -> A or B -> B), which belong to a different conversation and must
    # be neither counted nor pruned here.
    conversation = db.or_(
        db.and_(Message.sender_id == sender_id,
                Message.recipient_id == recipient_id),
        db.and_(Message.sender_id == recipient_id,
                Message.recipient_id == sender_id),
    )

    # Lazy pruning: only touch old rows once the cap is exceeded.
    excess = Message.query.filter(conversation).count() - MAX_HISTORY
    if excess <= 0:
        return

    oldest = (
        Message.query
        .filter(conversation)
        .order_by(Message.timestamp.asc())
        .limit(excess)
        .all()
    )
    for old_msg in oldest:
        db.session.delete(old_msg)
    db.session.commit()
def _get_ai_bot() -> User:
    """Return the AI bot's ``User`` row, creating it if absent.

    The bot is looked up by ``AI_BOT_NAME``. When it does not exist yet it
    is created with AI access enabled and a password hash derived from a
    random secret that is never stored, so nobody can log in as the bot.
    """
    bot = User.query.filter_by(username=AI_BOT_NAME).first()
    if bot:
        return bot

    # Base64-encode the random bytes before hashing: raw os.urandom output
    # can contain NUL bytes, which some bcrypt releases reject with
    # ValueError. The encoded secret is 44 bytes, within bcrypt's 72-byte
    # input limit.
    throwaway_secret = base64.b64encode(os.urandom(32))
    bot = User(
        username=AI_BOT_NAME,
        password_hash=bcrypt.hashpw(throwaway_secret, bcrypt.gensalt()).decode(),
        has_ai_access=True,
    )
    db.session.add(bot)
    db.session.commit()
    return bot
# ---------------------------------------------------------------------------
# Auth routes
# ---------------------------------------------------------------------------
@api.route("/auth/register", methods=["POST"])
def register():
    """Create a user account and return a JWT plus a CSRF token.

    Reads ``username``, ``password`` and optional ``email`` from the JSON
    body. Responds with:

    * 400 - username is empty or contains characters other than
      alphanumerics, ``_`` and ``-``, or the password is shorter than 6
      characters.
    * 409 - username equals the reserved bot name or is already taken
      (both compared case-insensitively).
    * 201 - account created; body holds ``token``, ``csrf_token`` and a
      ``user`` summary.
    """
    data = request.get_json() or {}
    # A JSON array/string/number has no .get(); treat it as an empty
    # payload so the normal validation errors are returned instead of a 500.
    if not isinstance(data, dict):
        data = {}

    username = str(data.get("username", "")).strip()[:20]
    password = str(data.get("password", "")).strip()
    email = str(data.get("email", "")).strip()[:255] or None

    if not username or not username.replace("_", "").replace("-", "").isalnum():
        return jsonify({"error": "Invalid username."}), 400
    if len(password) < 6:
        return jsonify({"error": "Password must be at least 6 characters."}), 400
    if username.lower() == AI_BOT_NAME.lower():
        return jsonify({"error": "That username is reserved."}), 409
    if User.query.filter(db.func.lower(User.username) == username.lower()).first():
        return jsonify({"error": "Username already registered."}), 409

    hashed = bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
    user = User(username=username, password_hash=hashed, email=email)
    db.session.add(user)
    db.session.commit()

    token = issue_jwt(user.id, user.username)
    csrf_token = generate_csrf_token()
    return jsonify({
        "token": token,
        "csrf_token": csrf_token,
        "user": {
            "id": user.id,
            "username": user.username,
            "has_ai_access": user.has_ai_access,
            "ai_messages_used": user.ai_messages_used,
        },
    }), 201
@api.route("/auth/login", methods=["POST"])
def login():
    """Verify credentials and return a JWT plus a CSRF token.

    Reads ``username`` and ``password`` from the JSON body. The username is
    matched case-insensitively and the password is checked against the
    stored bcrypt hash. An unknown user and a wrong password yield the same
    401 response. On success the body holds ``token``, ``csrf_token`` and a
    ``user`` summary.
    """
    data = request.get_json() or {}
    # A JSON array/string/number has no .get(); treat it as an empty
    # payload so the request fails with 401 instead of a 500.
    if not isinstance(data, dict):
        data = {}

    username = str(data.get("username", "")).strip()
    password = str(data.get("password", "")).strip()

    user = User.query.filter(
        db.func.lower(User.username) == username.lower()
    ).first()
    if not user or not bcrypt.checkpw(password.encode(), user.password_hash.encode()):
        return jsonify({"error": "Invalid username or password."}), 401

    token = issue_jwt(user.id, user.username)
    csrf_token = generate_csrf_token()
    return jsonify({
        "token": token,
        "csrf_token": csrf_token,
        "user": {
            "id": user.id,
            "username": user.username,
            "has_ai_access": user.has_ai_access,
            "ai_messages_used": user.ai_messages_used,
        },
    })
# ---------------------------------------------------------------------------
# PM history
# ---------------------------------------------------------------------------
@api.route("/pm/history", methods=["GET"])
@_require_auth
def pm_history():
    """Return the stored private-message history with another user.

    The other party is named by the ``with`` query parameter and matched
    case-insensitively. A missing parameter yields 400; an unknown user
    yields an empty ``messages`` list. Otherwise the response carries the
    derived ``room_key`` and up to ``MAX_HISTORY`` of the most recent
    messages, in chronological order.
    """
    viewer = g.current_user

    peer_name = request.args.get("with", "").strip()
    if not peer_name:
        return jsonify({"error": "Missing 'with' query parameter"}), 400

    peer = (
        User.query
        .filter(db.func.lower(User.username) == peer_name.lower())
        .first()
    )
    if not peer:
        return jsonify({"messages": []})

    # Room key: HMAC-SHA256 over "pm:<name>:<name>" (lower-cased names,
    # sorted so both participants derive the same value), base64-encoded.
    names = sorted([viewer.username.lower(), peer.username.lower()])
    room_name = "pm:" + ":".join(names)
    digest = hmac.new(
        JWT_SECRET.encode(), room_name.encode(), hashlib.sha256
    ).digest()
    room_key = base64.b64encode(digest).decode()

    # Either direction of the conversation between the two users.
    sent = db.and_(
        Message.sender_id == viewer.id, Message.recipient_id == peer.id
    )
    received = db.and_(
        Message.sender_id == peer.id, Message.recipient_id == viewer.id
    )
    newest_first = (
        Message.query
        .filter(db.or_(sent, received))
        .order_by(Message.timestamp.desc())
        .limit(MAX_HISTORY)
        .all()
    )

    # The query fetched newest-first; emit oldest-first.
    messages = []
    for m in reversed(newest_first):
        messages.append({
            "from_me": m.sender_id == viewer.id,
            "ciphertext": m.encrypted_content,
            "nonce": m.nonce,
            "ts": m.timestamp.strftime("%H:%M"),
        })

    return jsonify({
        "room_key": room_key,
        "messages": messages,
    })
# ---------------------------------------------------------------------------
# AI message (transit encryption key never persisted)
# ---------------------------------------------------------------------------
@api.route("/ai/message", methods=["POST"])
@_require_auth
@_require_csrf
def ai_message():
    """Handle one encrypted chat turn with the AI bot.

    Expects ``ciphertext``, ``nonce`` and ``transit_key`` in the JSON body.
    Responds with:

    * 402 - the user has no AI access and has used up ``AI_FREE_LIMIT``.
    * 400 - an encryption field is missing, or decryption/sanitizing fails.
    * 200 - the AI reply encrypted with the transit key, plus the user's
      updated ``ai_messages_used`` and ``has_ai_access``.

    Both the user's message and the reply are persisted in encrypted form;
    the transit key itself is not written to the database here.
    """
    user = g.current_user
    data = request.get_json() or {}
    # A JSON array/string/number has no .get(); treat it as an empty
    # payload so the request fails with the 400 below instead of a 500.
    if not isinstance(data, dict):
        data = {}

    # Gate: free-tier users are cut off once the trial quota is used up.
    if not user.has_ai_access and user.ai_messages_used >= AI_FREE_LIMIT:
        return jsonify({
            "error": "ai_limit_reached",
            "limit": AI_FREE_LIMIT,
            "ai_messages_used": user.ai_messages_used,
        }), 402

    ciphertext = data.get("ciphertext", "")
    nonce_b64 = data.get("nonce", "")
    transit_key = data.get("transit_key", "")
    if not all([ciphertext, nonce_b64, transit_key]):
        return jsonify({"error": "Missing encryption fields"}), 400

    # Transit decrypt: proves the key matches the ciphertext and yields the
    # text a real AI backend would consume. The mock below ignores it.
    try:
        plaintext = aesgcm_decrypt(transit_key, ciphertext, nonce_b64)
        # Sanitize before using in AI prompt
        plaintext = sanitize_user_input(plaintext)
    except Exception:
        return jsonify({"error": "Decryption failed wrong key or corrupted data"}), 400

    # AI response (mock; swap in a real model call here).
    ai_text = random.choice(AI_RESPONSES)

    # Persist both legs encrypted: the user's original ciphertext, and the
    # reply encrypted under the same transit key.
    bot = _get_ai_bot()
    _persist_message(user.id, bot.id, ciphertext, nonce_b64)  # user -> AI
    resp_ct, resp_nonce = aesgcm_encrypt(transit_key, ai_text)
    _persist_message(bot.id, user.id, resp_ct, resp_nonce)  # AI -> user

    # Update the free-trial counter, never letting it exceed the limit.
    if not user.has_ai_access:
        user.ai_messages_used = min(user.ai_messages_used + 1, AI_FREE_LIMIT)
        db.session.commit()

    # Encrypt the reply again for the response body (a separate
    # aesgcm_encrypt call from the stored copy).
    resp_ct_transit, resp_nonce_transit = aesgcm_encrypt(transit_key, ai_text)
    return jsonify({
        "ciphertext": resp_ct_transit,
        "nonce": resp_nonce_transit,
        "ai_messages_used": user.ai_messages_used,
        "has_ai_access": user.has_ai_access,
    })
# ---------------------------------------------------------------------------
# Payment webhook
# ---------------------------------------------------------------------------
@api.route("/payment/success", methods=["POST"])
def payment_success():
    """
    Server-side payment webhook; not meant to be called by clients.

    Validates the ``secret`` field of the JSON body against
    ``PAYMENT_SECRET`` and unlocks AI access for the user identified by the
    ``user_id`` field. Responds with:

    * 403 - secret missing, not a string, or not matching.
    * 400 - ``user_id`` missing from the payload.
    * 404 - no such user.
    * 200 - access is (now) unlocked; a best-effort ``ai_unlock`` socket
      event is sent to the user's active connection, if any.

    For Stripe production: replace the secret comparison with
    stripe.Webhook.construct_event() using the raw request body and
    the Stripe-Signature header.
    """
    import logging

    data = request.get_json() or {}
    # A JSON array/string/number has no .get(); treat it as an empty
    # payload so the request is rejected with 403 instead of a 500.
    if not isinstance(data, dict):
        data = {}

    secret = data.get("secret", "")
    # Reject non-string secrets up front (they have no .encode()), then use
    # a constant-time comparison to prevent timing attacks.
    if not isinstance(secret, str) or not secret or not hmac.compare_digest(
        secret.encode(),
        PAYMENT_SECRET.encode(),
    ):
        return jsonify({"error": "Invalid or missing payment secret"}), 403

    # Identify the user from the webhook payload (NOT from client auth)
    user_id = data.get("user_id")
    if not user_id:
        return jsonify({"error": "Missing user_id in webhook payload"}), 400
    user = db.session.get(User, user_id)
    if not user:
        return jsonify({"error": "User not found"}), 404

    # Only write when the flag actually changes.
    if not user.has_ai_access:
        user.has_ai_access = True
        db.session.commit()

    # Emit real-time unlock event to the user's active socket connection.
    # The import is local to this function rather than at module level.
    try:
        from app import socketio, username_to_sid
        target_sid = username_to_sid.get(user.username.lower())
        if target_sid:
            socketio.emit("ai_unlock", {
                "msg": "🎉 AI access unlocked! Enjoy unlimited SexyAI chat.",
            }, to=target_sid)
    except Exception:
        # Non-critical: the UI will refresh on the next request anyway, so
        # record the failure and still acknowledge the webhook.
        logging.getLogger(__name__).warning(
            "ai_unlock socket notification failed for user_id=%s",
            user.id,
            exc_info=True,
        )

    return jsonify({"status": "ok", "has_ai_access": True})