""" app.py – Flask-SocketIO backend for SexyChat (Phase 2). Architecture ------------ Single 'lobby' room – ephemeral, nothing persisted. PM rooms – AES-GCM encrypted; persisted for registered users. AI ('violet') room – transit-decrypted for Ollama, re-encrypted for DB. Inference queue – one Ollama request at a time; broadcasts violet_typing. Socket events (client → server) -------------------------------- join { mode, username, password?, email?, mod_password? } message { text } pm_open { target } pm_accept { room } pm_message { room, text? } | { room, ciphertext, nonce } ai_message { ciphertext, nonce, transit_key } mod_kick { target } mod_ban { target } mod_mute { target } Socket events (server → client) -------------------------------- joined { username, is_admin, is_registered, has_ai_access, ai_messages_used, token? } nicklist { users } message { username, text, is_admin, is_registered, ts } system { msg, ts } error { msg } kicked { msg } pm_invite { from, room } pm_ready { with, room } pm_message { from, text?, ciphertext?, nonce?, room, ts } violet_typing { busy: bool } ai_response { ciphertext, nonce, ai_messages_used, has_ai_access } or { error: 'ai_limit_reached' } ai_unlock { msg } """ import os import json import time import uuid import base64 import functools from collections import defaultdict from datetime import datetime, timedelta import bcrypt import jwt as pyjwt import eventlet # noqa – monkey-patched in start.py before any other import from eventlet.queue import Queue as EvQueue from flask import Flask, request, send_from_directory from flask_socketio import SocketIO, emit, join_room, disconnect from database import db, init_db from models import User, Message, UserIgnore # --------------------------------------------------------------------------- # Configuration Loader # --------------------------------------------------------------------------- def load_config(): conf = {} config_path = os.path.join(os.path.dirname(__file__), "config.json") if os.path.exists(config_path): try: with open(config_path, "r") as f: conf = json.load(f) except Exception as e: print(f"⚠️ Warning: Failed to load config.json: {e}") return conf _CONFIG = load_config() def _get_conf(key, default=None): # Order: Env Var > Config File > Default return os.environ.get(key, _CONFIG.get(key, default)) # --------------------------------------------------------------------------- # Configuration # --------------------------------------------------------------------------- SECRET_KEY = _get_conf("SECRET_KEY", uuid.uuid4().hex) JWT_SECRET = _get_conf("JWT_SECRET", uuid.uuid4().hex) ADMIN_USERNAME = _get_conf("ADMIN_USERNAME", "ComputerTech") ADMIN_PASSWORD = _get_conf("ADMIN_PASSWORD", "789abc//") MAX_MSG_LEN = 500 LOBBY = "lobby" AI_FREE_LIMIT = int(_get_conf("AI_FREE_LIMIT", 3)) AI_BOT_NAME = "Violet" # Ollama OLLAMA_URL = _get_conf("OLLAMA_URL", "http://localhost:11434") VIOLET_MODEL = _get_conf("VIOLET_MODEL", "sam860/dolphin3-llama3.2:3b") VIOLET_SYSTEM = ( "You are Violet, a flirtatious and sophisticated nightclub hostess at " "an exclusive, dimly-lit members-only club. You are charming, witty, " "and seductive — never crude or offensive. You speak with elegance, " "mystery, and a hint of playful danger. Keep every reply to 1–3 " "sentences maximum. You are in a private conversation with a special " "guest who has caught your eye." ) # --------------------------------------------------------------------------- # In-process state # --------------------------------------------------------------------------- # sid → { username, ip, is_admin, joined_at, user_id, is_registered, # has_ai_access, ai_messages_used } connected_users: dict = {} username_to_sid: dict = {} # lowercase_name → sid muted_users: set = set() banned_usernames: set = set() banned_ips: set = set() message_timestamps: dict = defaultdict(list) RATE_LIMIT = 6 RATE_WINDOW = 5 # AI inference queue (one Ollama call at a time) ai_queue: EvQueue = EvQueue() _app_ref = None # set in create_app() for greenlet app-context access # --------------------------------------------------------------------------- # AES-GCM helpers (server-side, transit only) # --------------------------------------------------------------------------- def _aesgcm_encrypt(key_b64: str, plaintext: str) -> tuple: from cryptography.hazmat.primitives.ciphers.aead import AESGCM key = base64.b64decode(key_b64) nonce = os.urandom(12) ct = AESGCM(key).encrypt(nonce, plaintext.encode("utf-8"), None) return base64.b64encode(ct).decode(), base64.b64encode(nonce).decode() def _aesgcm_decrypt(key_b64: str, ciphertext_b64: str, nonce_b64: str) -> str: from cryptography.hazmat.primitives.ciphers.aead import AESGCM key = base64.b64decode(key_b64) ct = base64.b64decode(ciphertext_b64) nonce = base64.b64decode(nonce_b64) return AESGCM(key).decrypt(nonce, ct, None).decode("utf-8") # --------------------------------------------------------------------------- # Ollama integration # --------------------------------------------------------------------------- def call_ollama(user_message: str) -> str: """Call the local Ollama API. Returns plaintext AI response.""" import requests as req try: resp = req.post( f"{OLLAMA_URL}/api/chat", json={ "model": VIOLET_MODEL, "messages": [ {"role": "system", "content": VIOLET_SYSTEM}, {"role": "user", "content": user_message}, ], "stream": False, "options": {"temperature": 0.88, "num_predict": 120}, }, timeout=90, ) resp.raise_for_status() return resp.json()["message"]["content"].strip() except Exception as exc: print(f"[Violet/Ollama] error: {exc}") return "Give me just a moment, darling... 💜" # --------------------------------------------------------------------------- # AI inference queue worker (single greenlet, serialises Ollama calls) # --------------------------------------------------------------------------- def _ai_worker() -> None: """Eventlet greenlet – drains ai_queue one task at a time.""" global _app_ref while True: task = ai_queue.get() # blocks cooperatively until item available # ── Announce Violet is busy ─────────────────────────────────────────── room = _pm_room(db.session.get(User, task["user_id"]).username, AI_BOT_NAME) if _app_ref else None if room: socketio.emit("violet_typing", {"busy": True, "room": room}, to=room) else: socketio.emit("violet_typing", {"busy": True}) # ── Decrypt user message (transit; key never stored) ────────────────── try: plaintext = _aesgcm_decrypt( task["transit_key"], task["ciphertext"], task["nonce_val"] ) ai_text = call_ollama(plaintext) except Exception as exc: print(f"[Violet] processing error: {exc}") ai_text = "Mmm, something went wrong, darling 💜" # ── Re-encrypt AI response ──────────────────────────────────────────── resp_ct, resp_nonce = _aesgcm_encrypt(task["transit_key"], ai_text) ai_messages_used = task.get("ai_messages_used", 0) has_ai_access = task.get("has_ai_access", False) # ── DB operations (need explicit app context in greenlet) ───────────── with _app_ref.app_context(): bot = User.query.filter_by(username=AI_BOT_NAME).first() if bot and task.get("user_id"): _save_pm(task["user_id"], bot.id, task["ciphertext"], task["nonce_val"]) # user → Violet _save_pm(bot.id, task["user_id"], resp_ct, resp_nonce) # Violet → user if task.get("user_id") and not has_ai_access: db_user = db.session.get(User, task["user_id"]) if db_user and not db_user.has_ai_access: db_user.ai_messages_used = min( db_user.ai_messages_used + 1, AI_FREE_LIMIT ) db.session.commit() ai_messages_used = db_user.ai_messages_used has_ai_access = db_user.has_ai_access # Update in-process cache sid = task["sid"] if sid in connected_users: connected_users[sid]["ai_messages_used"] = ai_messages_used connected_users[sid]["has_ai_access"] = has_ai_access # ── Emit response to originating client ─────────────────────────────── with _app_ref.app_context(): db_user = db.session.get(User, task["user_id"]) room = _pm_room(db_user.username, AI_BOT_NAME) socketio.emit("pm_message", { "from": AI_BOT_NAME, "ciphertext": resp_ct, "nonce": resp_nonce, "room": room, "ts": _ts() }, to=room) socketio.emit("violet_typing", {"busy": False, "room": room}, to=room) ai_queue.task_done() # Clear typing indicator when queue drains # Done in per-room emit above # --------------------------------------------------------------------------- # General helpers # --------------------------------------------------------------------------- def _pm_room(a: str, b: str) -> str: return "pm:" + ":".join(sorted([a.lower(), b.lower()])) def _get_nicklist() -> list: users = [] for info in connected_users.values(): if not info.get("username"): continue users.append({ "username": info["username"], "is_admin": info["is_admin"], "is_registered": info.get("is_registered", False), "is_verified": info.get("is_verified", False), }) # Static "Violet" AI user users.append({ "username": AI_BOT_NAME, "is_admin": False, "is_registered": True, "is_verified": True, "is_ai": True }) return sorted(users, key=lambda u: u["username"].lower()) def _require_admin(f): @functools.wraps(f) def wrapped(*args, **kwargs): user = connected_users.get(request.sid) if not user or not user.get("is_admin"): emit("error", {"msg": "Forbidden."}) return return f(*args, **kwargs) return wrapped def _rate_limited(sid: str) -> bool: now = time.time() message_timestamps[sid] = [t for t in message_timestamps[sid] if now - t < RATE_WINDOW] if len(message_timestamps[sid]) >= RATE_LIMIT: return True message_timestamps[sid].append(now) return False def _ts() -> str: return time.strftime("%H:%M", time.localtime()) def _do_disconnect(sid: str) -> None: try: socketio.server.disconnect(sid) except Exception: pass def _issue_jwt(user_id: int, username: str) -> str: return pyjwt.encode( {"user_id": user_id, "username": username, "exp": datetime.utcnow() + timedelta(days=7)}, JWT_SECRET, algorithm="HS256", ) def _verify_jwt(token: str): try: return pyjwt.decode(token, JWT_SECRET, algorithms=["HS256"]) except Exception: return None def _save_pm(sender_id: int, recipient_id: int, encrypted_content: str, nonce: str) -> None: msg = Message( sender_id=sender_id, recipient_id=recipient_id, encrypted_content=encrypted_content, nonce=nonce, ) db.session.add(msg) db.session.commit() # --------------------------------------------------------------------------- # SocketIO instance # --------------------------------------------------------------------------- socketio = SocketIO() # --------------------------------------------------------------------------- # App factory # --------------------------------------------------------------------------- def create_app() -> Flask: global _app_ref app = Flask(__name__, static_folder="static", template_folder=".") app.config.update( SECRET_KEY=SECRET_KEY, SQLALCHEMY_DATABASE_URI=os.environ.get( "DATABASE_URL", "sqlite:///sexchat.db" ), SQLALCHEMY_TRACK_MODIFICATIONS=False, SESSION_COOKIE_HTTPONLY=True, SESSION_COOKIE_SAMESITE="Lax", ) init_db(app) _app_ref = app msg_queue = ( os.environ.get("SOCKETIO_MESSAGE_QUEUE") or os.environ.get("REDIS_URL") or None ) socketio.init_app( app, async_mode="eventlet", cors_allowed_origins="*", message_queue=msg_queue, logger=False, engineio_logger=False, ) # Start the AI inference queue worker greenlet eventlet.spawn(_ai_worker) from routes import api app.register_blueprint(api) @app.route("/") def index(): return send_from_directory(".", "index.html") return app # --------------------------------------------------------------------------- # Socket – connection lifecycle # --------------------------------------------------------------------------- @socketio.on("connect") def on_connect(auth=None): sid = request.sid ip = request.environ.get("HTTP_X_FORWARDED_FOR", request.remote_addr) if ip in banned_ips: emit("error", {"msg": "You are banned."}) disconnect() return False user_id = None; is_registered = False has_ai_access = False; ai_used = 0; jwt_username = None if auth and isinstance(auth, dict) and auth.get("token"): payload = _verify_jwt(auth["token"]) if payload: db_user = db.session.get(User, payload.get("user_id")) if db_user: user_id = db_user.id is_registered = True has_ai_access = db_user.has_ai_access ai_used = db_user.ai_messages_used jwt_username = db_user.username connected_users[sid] = { "username": None, "ip": ip, "is_admin": False, "joined_at": time.time(), "user_id": user_id, "is_registered": is_registered, "has_ai_access": has_ai_access, "ai_messages_used": ai_used, "_jwt_username": jwt_username, } @socketio.on("disconnect") def on_disconnect(): sid = request.sid user = connected_users.pop(sid, None) message_timestamps.pop(sid, None) if user and user.get("username"): lower = user["username"].lower() username_to_sid.pop(lower, None) socketio.emit("system", {"msg": f"**{user['username']}** left the room.", "ts": _ts()}, to=LOBBY) socketio.emit("nicklist", {"users": _get_nicklist()}, to=LOBBY) # --------------------------------------------------------------------------- # Join / auth # --------------------------------------------------------------------------- @socketio.on("join") def on_join(data): sid = request.sid user = connected_users.get(sid) if not user: return mode = str(data.get("mode", "guest")).strip() username = str(data.get("username", "")).strip()[:20] password = str(data.get("password", "")).strip() email = str(data.get("email", "")).strip()[:255] or None token = None db_user = None if mode == "register": if not username or not username.replace("_","").replace("-","").isalnum(): emit("error", {"msg": "Invalid username."}); return if len(password) < 6: emit("error", {"msg": "Password must be at least 6 characters."}); return if username.lower() == AI_BOT_NAME.lower(): emit("error", {"msg": "That username is reserved."}); return if User.query.filter(db.func.lower(User.username) == username.lower()).first(): emit("error", {"msg": "Username already registered."}); return hashed = bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode() db_user = User(username=username, password_hash=hashed, email=email) db.session.add(db_user); db.session.commit() # DO NOT login yet – stay as guest and wait for mod emit("joined", { "username": username, "is_admin": False, "is_registered": False, "has_ai_access": False, "ai_messages_used": 0, "system_msg": "Account created! Please wait for a moderator to verify you before logging in." }) return elif mode == "login": # Check for Master Admin Override is_master = (username.lower() == ADMIN_USERNAME.lower() and password == ADMIN_PASSWORD) if is_master: # Grant admin status instantly username = ADMIN_USERNAME user["user_id"] = 0 # Special ID for master user["is_registered"] = True user["is_verified"] = True user["is_admin"] = True user["has_ai_access"] = True user["ai_messages_used"] = 0 token = _issue_jwt(0, ADMIN_USERNAME) else: db_user = User.query.filter( db.func.lower(User.username) == username.lower() ).first() if not db_user or not bcrypt.checkpw(password.encode(), db_user.password_hash.encode()): emit("error", {"msg": "Invalid username or password."}); return if not db_user.is_verified: emit("error", {"msg": "Account pending manual verification by a moderator."}); return username = db_user.username user["user_id"] = db_user.id user["is_registered"] = True user["is_verified"] = True user["is_admin"] = db_user.is_admin user["has_ai_access"] = db_user.has_ai_access user["ai_messages_used"] = db_user.ai_messages_used token = _issue_jwt(db_user.id, db_user.username) elif mode == "restore": if not user.get("user_id"): emit("error", {"msg": "Session expired. Please log in again."}); return db_user = db.session.get(User, user["user_id"]) if not db_user: emit("error", {"msg": "Account not found."}); return if not db_user.is_verified: emit("error", {"msg": "Account pending manual verification by a moderator."}); return username = db_user.username user["has_ai_access"] = db_user.has_ai_access user["ai_messages_used"] = db_user.ai_messages_used token = _issue_jwt(db_user.id, db_user.username) else: # guest if not username or not username.replace("_","").replace("-","").isalnum(): emit("error", {"msg": "Invalid username. Use letters, numbers, - or _."}); return lower = username.lower() if lower in banned_usernames: emit("error", {"msg": "That username is banned."}); return if lower in username_to_sid and username_to_sid[lower] != sid: emit("error", {"msg": "Username already in use."}); return is_admin = False mod_pw = str(data.get("mod_password", "")).strip() if mod_pw and mod_pw == ADMIN_PASSWORD: is_admin = True user["username"] = username user["is_admin"] = is_admin user["is_verified"] = db_user.is_verified if db_user else True # Guests are always "verified" for lobby username_to_sid[lower] = sid join_room(LOBBY) emit("joined", { "username": username, "is_admin": is_admin, "is_registered": user["is_registered"], "has_ai_access": user["has_ai_access"], "ai_messages_used": user["ai_messages_used"], "token": token, "ignored_list": [u.username for u in db_user.ignoring] if db_user else [] }) emit("system", { "msg": f"{'🛡️ ' if is_admin else ''}**{username}** joined the room.", "ts": _ts(), }, to=LOBBY) socketio.emit("nicklist", {"users": _get_nicklist()}, to=LOBBY) # --------------------------------------------------------------------------- # Lobby (ephemeral – never persisted) # --------------------------------------------------------------------------- @socketio.on("message") def on_message(data): sid = request.sid user = connected_users.get(sid) if not user or not user.get("username"): return if user["username"].lower() in muted_users: emit("error", {"msg": "You are muted."}); return if _rate_limited(sid): emit("error", {"msg": "Slow down!"}); return text = str(data.get("text", "")).strip()[:MAX_MSG_LEN] if not text: return emit("message", { "username": user["username"], "text": text, "is_admin": user["is_admin"], "is_registered": user.get("is_registered", False), "ts": _ts(), }, to=LOBBY) # --------------------------------------------------------------------------- # Private Messaging # --------------------------------------------------------------------------- @socketio.on("pm_open") def on_pm_open(data): sid = request.sid user = connected_users.get(sid) if not user or not user["username"]: return target = str(data.get("target", "")).strip() target_sid = username_to_sid.get(target.lower()) # "Violet" virtual connection if not target_sid and target.lower() == AI_BOT_NAME.lower(): # She's always online pass elif not target_sid: emit("error", {"msg": f"{target} is not online."}); return # Check if target has ignored me target_info = connected_users.get(target_sid) if target_info and target_info.get("user_id"): target_db = db.session.get(User, target_info["user_id"]) me_db = User.query.filter(db.func.lower(User.username) == user["username"].lower()).first() if target_db and me_db and me_db in target_db.ignoring: # We don't want to tell the requester they are ignored (stealth) # but we just don't send the invite. # Actually, to make it clear why nothing is happening: emit("error", {"msg": f"{target} is not accepting messages from you right now."}) return room = _pm_room(user["username"], target) join_room(room) socketio.emit("pm_invite", {"from": user["username"], "room": room}, to=target_sid) emit("pm_ready", {"with": target, "room": room}) @socketio.on("pm_accept") def on_pm_accept(data): join_room(data.get("room")) @socketio.on("pm_message") def on_pm_message(data): sid = request.sid user = connected_users.get(sid) if not user or not user["username"]: return room = str(data.get("room", "")) if not room.startswith("pm:"): return ciphertext = data.get("ciphertext", "") nonce_val = data.get("nonce", "") text = str(data.get("text", "")).strip()[:MAX_MSG_LEN] is_encrypted = bool(ciphertext and nonce_val) if not is_encrypted and not text: return ts = _ts() payload = ( {"from": user["username"], "ciphertext": ciphertext, "nonce": nonce_val, "room": room, "ts": ts} if is_encrypted else {"from": user["username"], "text": text, "room": room, "ts": ts} ) # Route to AI if recipient is Violet if f":{AI_BOT_NAME.lower()}" in room.lower(): if not user.get("user_id"): emit("error", {"msg": "You must be registered to chat with Violet."}); return if not user.get("is_verified"): emit("error", {"msg": "Your account is pending moderator approval. Please wait to chat with Violet."}); return if not user.get("has_ai_access") and user.get("ai_messages_used", 0) >= AI_FREE_LIMIT: emit("pm_message", {"from": AI_BOT_NAME, "text": "ai_limit_reached", "room": room, "system": True}, to=sid) return transit_key = data.get("transit_key", "") if not all([ciphertext, nonce_val, transit_key]): emit("error", {"msg": "AI Private Messaging requires transit encryption."}); return ai_queue.put({ "sid": sid, "user_id": user["user_id"], "has_ai_access": user["has_ai_access"], "ai_messages_used": user["ai_messages_used"], "ciphertext": ciphertext, "nonce_val": nonce_val, "transit_key": transit_key, }) return # ai_worker will handle the delivery emit("pm_message", payload, to=room) if is_encrypted and user.get("user_id"): parts = room.split(":")[1:] my_lower = user["username"].lower() other_low = next((p for p in parts if p != my_lower), None) if other_low: other_sid = username_to_sid.get(other_low) other_info = connected_users.get(other_sid, {}) if other_sid else {} if other_info.get("user_id"): _save_pm(user["user_id"], other_info["user_id"], ciphertext, nonce_val) # --------------------------------------------------------------------------- # AI – Violet (queued Ollama inference) # --------------------------------------------------------------------------- @socketio.on("ai_message") def on_ai_message(data): # Deprecated: use pm_message to Violet instead pass # --------------------------------------------------------------------------- # Mod tools # --------------------------------------------------------------------------- @socketio.on("mod_kick") @_require_admin def on_kick(data): target = str(data.get("target", "")).strip() target_sid = username_to_sid.get(target.lower()) if not target_sid: emit("error", {"msg": f"{target} is not online."}); return socketio.emit("kicked", {"msg": "You have been kicked by a moderator."}, to=target_sid) socketio.emit("system", {"msg": f"🚫 **{target}** was kicked.", "ts": _ts()}, to=LOBBY) eventlet.spawn_after(0.5, _do_disconnect, target_sid) @socketio.on("mod_ban") @_require_admin def on_ban(data): target = str(data.get("target", "")).strip() lower = target.lower() banned_usernames.add(lower) target_sid = username_to_sid.get(lower) if target_sid: info = connected_users.get(target_sid, {}) if info.get("ip"): banned_ips.add(info["ip"]) socketio.emit("kicked", {"msg": "You have been banned."}, to=target_sid) eventlet.spawn_after(0.5, _do_disconnect, target_sid) socketio.emit("system", {"msg": f"🔨 **{target}** was banned.", "ts": _ts()}, to=LOBBY) @socketio.on("mod_mute") @_require_admin def on_mute(data): target = str(data.get("target", "")).strip() lower = target.lower() if lower in muted_users: muted_users.discard(lower); action = "unmuted" else: muted_users.add(lower); action = "muted" emit("system", {"msg": f"🔇 **{target}** was {action}.", "ts": _ts()}, to=LOBBY) @socketio.on("mod_kickban") @_require_admin def on_kickban(data): target = str(data.get("target", "")).strip() lower = target.lower() # Ban banned_usernames.add(lower) target_sid = username_to_sid.get(lower) if target_sid: info = connected_users.get(target_sid, {}) if info.get("ip"): banned_ips.add(info["ip"]) socketio.emit("kicked", {"msg": "You have been banned."}, to=target_sid) eventlet.spawn_after(0.5, _do_disconnect, target_sid) # Announce socketio.emit("system", {"msg": f"💀 **{target}** was kickbanned.", "ts": _ts()}, to=LOBBY) @socketio.on("user_ignore") def on_ignore(data): sid = request.sid user = connected_users.get(sid) if not user or not user.get("user_id"): return target_name = str(data.get("target", "")).strip() target_user = User.query.filter(db.func.lower(User.username) == target_name.lower()).first() if target_user: me = db.session.get(User, user["user_id"]) if target_user not in me.ignoring: me.ignoring.append(target_user) db.session.commit() emit("ignore_status", {"target": target_user.username, "ignored": True}) @socketio.on("user_unignore") def on_unignore(data): sid = request.sid user = connected_users.get(sid) if not user or not user.get("user_id"): return target_name = str(data.get("target", "")).strip() me = db.session.get(User, user["user_id"]) target_user = me.ignoring.filter(db.func.lower(User.username) == target_name.lower()).first() if target_user: me.ignoring.remove(target_user) db.session.commit() emit("ignore_status", {"target": target_user.username, "ignored": False}) @socketio.on("mod_verify") @_require_admin def on_verify(data): target_name = str(data.get("target", "")).strip() target_user = User.query.filter(db.func.lower(User.username) == target_name.lower()).first() if target_user: target_user.is_verified = True db.session.commit() # Update online status if target is currently online as a guest target_sid = username_to_sid.get(target_name.lower()) if target_sid: target_info = connected_users.get(target_sid) if target_info: target_info["is_verified"] = True socketio.emit("system", { "msg": "🎉 **Your account has been verified!** You can now log in to access persistent features and chat with Violet.", "ts": _ts() }, to=target_sid) socketio.emit("system", {"msg": f"✅ **{target_user.username}** has been verified by a moderator.", "ts": _ts()}, to=LOBBY) socketio.emit("nicklist", {"users": _get_nicklist()}, to=LOBBY)