aprhodite/app.py

1166 lines
43 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
app.py – Flask-SocketIO backend for SexyChat (Phase 2).
Architecture
------------
Single 'lobby' room – ephemeral, nothing persisted.
PM rooms – AES-GCM encrypted; persisted for registered users.
AI ('violet') room – transit-decrypted for Ollama, re-encrypted for DB.
Inference queue – one Ollama request at a time; broadcasts violet_typing.
Socket events (client → server)
--------------------------------
join { mode, username, password?, email?, mod_password? }
message { text }
pm_open { target }
pm_accept { room }
pm_message { room, text? } | { room, ciphertext, nonce }
ai_message { ciphertext, nonce, transit_key } (deprecated no-op; send pm_message to the AI room instead)
mod_kick { target }
mod_ban { target }
mod_mute { target }
Socket events (server → client)
--------------------------------
joined { username, is_admin, is_registered, has_ai_access,
ai_messages_used, token? }
nicklist { users }
message { username, text, is_admin, is_registered, ts }
system { msg, ts }
error { msg }
kicked { msg }
pm_invite { from, room }
pm_ready { with, room }
pm_message { from, text?, ciphertext?, nonce?, room, ts }
violet_typing { busy: bool }
ai_response { ciphertext, nonce, ai_messages_used, has_ai_access }
or { error: 'ai_limit_reached' }
ai_unlock { msg }
"""
import os
import time
import hmac
import hashlib
import functools
import logging
from collections import defaultdict
import bcrypt
import eventlet # noqa monkey-patched in start.py before any other import
from eventlet.queue import Queue as EvQueue
from flask import Flask, request, send_from_directory
from flask_socketio import SocketIO, emit, join_room, disconnect
from database import db, init_db
from models import User, Message, UserIgnore, Ban, Mute, VioletHistory
from config import (
SECRET_KEY, ADMIN_PASSWORD, DATABASE_URL, CORS_ORIGINS,
MAX_MSG_LEN, LOBBY, AI_FREE_LIMIT, AI_BOT_NAME,
OLLAMA_URL, VIOLET_MODEL, VIOLET_SYSTEM,
aesgcm_encrypt, aesgcm_decrypt, issue_jwt, verify_jwt,
)
# ─────────────────────────────────────────────────────────────────────────
# Security Logging Setup
# ─────────────────────────────────────────────────────────────────────────
# Dedicated "security" logger: the handlers in this module write auth and
# moderation events through it into security.log.  The file handler is only
# attached when the logger has none yet, so the setup is not repeated if this
# code runs more than once in the same process.
security_logger = logging.getLogger("security")
security_logger.setLevel(logging.INFO)
if len(security_logger.handlers) == 0:
    formatter = logging.Formatter(
        "%(asctime)s - %(levelname)s - [%(name)s] - %(message)s"
    )
    handler = logging.FileHandler("security.log")
    handler.setFormatter(formatter)
    security_logger.addHandler(handler)
# ---------------------------------------------------------------------------
# In-process state
# ---------------------------------------------------------------------------
# Per-connection session state, keyed by Socket.IO sid:
# sid → { username, ip, is_admin, role, joined_at, user_id, is_registered,
#         has_ai_access, ai_messages_used }
# on_connect also stores "_jwt_username"; on_join adds "is_verified".
# "username" stays None until the client has sent "join".
connected_users: dict = {}
# Role hierarchy: a higher number means more power.
ROLE_POWER = {"user": 0, "mod": 1, "admin": 2, "root": 3}
username_to_sid: dict = {} # lowercase_name → sid (filled in by on_join)
muted_users: set = set() # lowercased usernames blocked from lobby messages
banned_usernames: set = set() # lowercased usernames refused by on_join
banned_ips: set = set() # IP strings refused by on_connect
message_timestamps: dict = defaultdict(list) # sid → time.time() stamps used by _rate_limited
pending_pm_invites: dict = {} # sid → set of room names they were invited to
# Lobby rate limit: at most RATE_LIMIT messages per RATE_WINDOW seconds per sid.
RATE_LIMIT = 6
RATE_WINDOW = 5
# AI inference queue (one Ollama call at a time); drained by _ai_worker.
ai_queue: EvQueue = EvQueue()
_app_ref = None # set in create_app() for greenlet app-context access
# ---------------------------------------------------------------------------
# Ollama integration
# ---------------------------------------------------------------------------
# Upper bound on the VioletHistory rows that _load_violet_history reads back
# per request.  Each row is a single turn (one user or one assistant message).
MAX_HISTORY_PER_USER = 20 # last N turns loaded into Violet prompt
def call_ollama(messages: list) -> str:
    """Send a chat transcript to the Ollama server and return its reply text.

    ``messages`` is forwarded unchanged as the ``messages`` field of a
    non-streaming ``/api/chat`` request.  Any ``Exception`` raised while
    posting or while reading the response (network failure, HTTP error
    status, unexpected JSON shape) is printed and replaced by a canned
    in-character line, so callers always receive a string.
    """
    import requests as req

    request_body = {
        "model": VIOLET_MODEL,
        "messages": messages,
        "stream": False,
        "options": {"temperature": 0.88, "num_predict": 120},
    }
    try:
        reply = req.post(f"{OLLAMA_URL}/api/chat", json=request_body, timeout=90)
        reply.raise_for_status()
        content = reply.json()["message"]["content"]
        return content.strip()
    except Exception as exc:
        print(f"[Violet/Ollama] error: {exc}")
        return "Give me just a moment, darling... 💜"
def _load_violet_history(user_id: int) -> list:
    """Return the user's most recent Violet turns in chronological order.

    Reads at most ``MAX_HISTORY_PER_USER`` rows, newest first by id, then
    flips them so the oldest comes first.  Each row becomes a
    ``{"role": ..., "content": ...}`` dict.
    """
    newest_first = (
        VioletHistory.query
        .filter_by(user_id=user_id)
        .order_by(VioletHistory.id.desc())
        .limit(MAX_HISTORY_PER_USER)
        .all()
    )
    history = []
    for row in reversed(newest_first):
        history.append({"role": row.role, "content": row.text})
    return history
def _save_violet_turn(user_id: int, role: str, text: str) -> None:
    """Store one turn of a Violet conversation and commit it immediately."""
    turn = VioletHistory(user_id=user_id, role=role, text=text)
    db.session.add(turn)
    db.session.commit()
# ---------------------------------------------------------------------------
# AI inference queue worker (single greenlet, serialises Ollama calls)
# ---------------------------------------------------------------------------
def _resolve_ai_room(task: dict) -> str:
    """Return the PM room shared by the task's sender and the AI bot.

    Registered senders are looked up by ``user_id``.  If that row no longer
    exists, or the sender is not registered, the username of the live socket
    is used instead.  The result is therefore always a concrete room name.
    """
    username = None
    if task.get("user_id"):
        # DB access from the worker greenlet needs an explicit app context.
        with _app_ref.app_context():
            db_user = db.session.get(User, task["user_id"])
            if db_user:
                username = db_user.username
    if not username:
        username = connected_users.get(task["sid"], {}).get("username") or "unknown"
    return _pm_room(username, AI_BOT_NAME)


def _ai_worker() -> None:
    """Eventlet greenlet that drains ``ai_queue`` one task at a time.

    For each task it announces ``violet_typing``, decrypts the user's message
    with the task's transit key, calls Ollama with the stored history,
    persists the exchange for registered users, counts the message against
    the free quota, and emits the reply to the sender's PM room.

    Every emit is addressed to a concrete room.  Previously a missing DB row
    left ``room`` as ``None``, and ``socketio.emit(..., to=None)`` delivers to
    every connected client, which leaked the AI reply to the whole server.
    """
    import traceback

    while True:
        task = ai_queue.get()  # blocks cooperatively until an item is available
        sid = task["sid"]
        room = None
        try:
            room = _resolve_ai_room(task)

            # Announce that Violet is busy.
            socketio.emit("violet_typing", {"busy": True, "room": room}, to=room)

            # Decrypt the user message (transit only; the key is never stored).
            plaintext = aesgcm_decrypt(
                task["transit_key"], task["ciphertext"], task["nonce_val"]
            )

            # Build the messages array: system prompt, history, new message.
            messages = [{"role": "system", "content": VIOLET_SYSTEM}]
            if task.get("user_id"):
                with _app_ref.app_context():
                    messages.extend(_load_violet_history(task["user_id"]))
            messages.append({"role": "user", "content": plaintext})
            ai_text = call_ollama(messages)

            # Save both conversation turns for registered users.
            if task.get("user_id"):
                with _app_ref.app_context():
                    _save_violet_turn(task["user_id"], "user", plaintext)
                    _save_violet_turn(task["user_id"], "assistant", ai_text)

            # Re-encrypt the AI response with the same transit key.
            resp_ct, resp_nonce = aesgcm_encrypt(task["transit_key"], ai_text)
            ai_messages_used = task.get("ai_messages_used", 0)
            has_ai_access = task.get("has_ai_access", False)

            with _app_ref.app_context():
                bot = User.query.filter_by(username=AI_BOT_NAME).first()
                if bot and task.get("user_id"):
                    _save_pm(task["user_id"], bot.id,
                             task["ciphertext"], task["nonce_val"])  # user → Violet
                    _save_pm(bot.id, task["user_id"],
                             resp_ct, resp_nonce)                     # Violet → user
                if task.get("user_id") and not has_ai_access:
                    db_user = db.session.get(User, task["user_id"])
                    if db_user and not db_user.has_ai_access:
                        # The counter is capped at the free limit.
                        db_user.ai_messages_used = min(
                            db_user.ai_messages_used + 1, AI_FREE_LIMIT
                        )
                        db.session.commit()
                        ai_messages_used = db_user.ai_messages_used
                        has_ai_access = db_user.has_ai_access

            # Keep the in-process session cache in step with the DB.
            if sid in connected_users:
                connected_users[sid]["ai_messages_used"] = ai_messages_used
                connected_users[sid]["has_ai_access"] = has_ai_access

            # Emit the response to the sender's PM room.
            if task.get("plaintext_mode"):
                socketio.emit("pm_message", {
                    "from": AI_BOT_NAME,
                    "text": ai_text,
                    "room": room,
                    "ts": _ts()
                }, to=room)
            else:
                socketio.emit("pm_message", {
                    "from": AI_BOT_NAME,
                    "ciphertext": resp_ct,
                    "nonce": resp_nonce,
                    "room": room,
                    "ts": _ts()
                }, to=room)
            socketio.emit("violet_typing", {"busy": False, "room": room}, to=room)
        except Exception:
            traceback.print_exc()
            # Best effort: tell the user something went wrong and clear the
            # typing indicator.  A failure here must not kill the worker.
            try:
                if room is None:
                    uname = connected_users.get(sid, {}).get("username") or "unknown"
                    room = _pm_room(uname, AI_BOT_NAME)
                socketio.emit("pm_message", {
                    "from": AI_BOT_NAME,
                    "text": "Mmm, something went wrong, darling 💜",
                    "room": room,
                    "ts": _ts()
                }, to=room)
                socketio.emit("violet_typing", {"busy": False, "room": room}, to=room)
            except Exception:
                traceback.print_exc()
        finally:
            ai_queue.task_done()
# ---------------------------------------------------------------------------
# General helpers
# ---------------------------------------------------------------------------
def _pm_room(a: str, b: str) -> str:
return "pm:" + ":".join(sorted([a.lower(), b.lower()]))
def _pm_room_key(room: str) -> str:
    """Derive the deterministic AES-256 key for a PM room, base64-encoded.

    The key is HMAC-SHA256 of the room name keyed with ``JWT_SECRET``, so a
    given room always yields the same key and stored history stays
    decryptable across sessions.  Because the server computes and hands out
    the key, this is NOT end-to-end encryption.
    """
    from config import JWT_SECRET
    import base64

    digest = hmac.new(JWT_SECRET.encode(), room.encode(), hashlib.sha256).digest()
    encoded = base64.b64encode(digest)
    return encoded.decode()
def _get_nicklist() -> list:
    """Build the lobby roster, sorted case-insensitively by username.

    Sockets that have not joined yet (no username) are left out.  A static
    entry for the AI bot is always included and is the only one carrying
    ``is_ai``.
    """
    roster = [
        {
            "username": info["username"],
            "is_admin": info["is_admin"],
            "is_registered": info.get("is_registered", False),
            "is_verified": info.get("is_verified", False),
            "role": info.get("role", "user"),
        }
        for info in connected_users.values()
        if info.get("username")
    ]
    # Static "Violet" AI user.
    roster.append({
        "username": AI_BOT_NAME,
        "is_admin": False,
        "is_registered": True,
        "is_verified": True,
        "is_ai": True,
        "role": "user",
    })
    roster.sort(key=lambda entry: entry["username"].lower())
    return roster
def _require_admin(f):
    """Decorator: run the handler only when the calling socket is ``is_admin``.

    Any other caller receives an ``error`` event with "Forbidden." and the
    wrapped handler is not invoked.
    """
    @functools.wraps(f)
    def wrapped(*args, **kwargs):
        caller = connected_users.get(request.sid)
        if caller and caller.get("is_admin"):
            return f(*args, **kwargs)
        emit("error", {"msg": "Forbidden."})
        return None
    return wrapped
def _require_role(min_role):
    """Decorator factory: require at least *min_role* on the ROLE_POWER scale.

    Unknown role names on either side count as power 0.  Callers below the
    threshold, or sockets with no session entry, get an ``error`` event with
    "Forbidden." and the handler is not run.
    """
    def decorator(f):
        @functools.wraps(f)
        def wrapped(*args, **kwargs):
            caller = connected_users.get(request.sid)
            if caller:
                have = ROLE_POWER.get(caller.get("role", "user"), 0)
                need = ROLE_POWER.get(min_role, 0)
                if have >= need:
                    return f(*args, **kwargs)
            emit("error", {"msg": "Forbidden."})
            return None
        return wrapped
    return decorator
def _rate_limited(sid: str) -> bool:
    """Return True when *sid* has used up its message budget.

    Timestamps older than ``RATE_WINDOW`` seconds are dropped first.  If
    fewer than ``RATE_LIMIT`` remain, the current time is recorded and the
    message is allowed; a rejected message is not recorded.
    """
    now = time.time()
    recent = [stamp for stamp in message_timestamps[sid] if now - stamp < RATE_WINDOW]
    message_timestamps[sid] = recent
    if len(recent) < RATE_LIMIT:
        recent.append(now)
        return False
    return True
def _ts() -> str:
return time.strftime("%H:%M", time.localtime())
def _do_disconnect(sid: str) -> None:
    """Force-disconnect *sid* on a best-effort basis.

    Scheduled with a short delay after a kick or ban so the ``kicked`` event
    can be sent first.  Any failure is deliberately ignored.
    """
    try:
        server = socketio.server
        server.disconnect(sid)
    except Exception:
        return
def _save_pm(sender_id: int, recipient_id: int,
             encrypted_content: str, nonce: str) -> None:
    """Persist one encrypted private message and commit it immediately.

    The ciphertext and nonce are stored exactly as given; nothing is
    decrypted here.
    """
    fields = {
        "sender_id": sender_id,
        "recipient_id": recipient_id,
        "encrypted_content": encrypted_content,
        "nonce": nonce,
    }
    db.session.add(Message(**fields))
    db.session.commit()
# ---------------------------------------------------------------------------
# SocketIO instance
# ---------------------------------------------------------------------------
# Created unbound so the @socketio.on handlers in this module can register
# against it; create_app() attaches it to the Flask app via init_app().
socketio = SocketIO()
# ---------------------------------------------------------------------------
# App factory
# ---------------------------------------------------------------------------
def create_app() -> Flask:
    """Build and wire up the Flask application.

    Steps: configure Flask, initialise the database, publish the app through
    ``_app_ref``, load persisted bans and mutes into the in-memory sets, bind
    Socket.IO (with an optional message queue taken from the environment),
    spawn the AI worker greenlet, register the ``routes`` blueprint, and add
    the ``/`` route that serves ``index.html``.
    """
    global _app_ref

    app = Flask(__name__, static_folder="static", template_folder=".")
    app.config.update(
        SECRET_KEY=SECRET_KEY,
        SQLALCHEMY_DATABASE_URI=DATABASE_URL,
        SQLALCHEMY_TRACK_MODIFICATIONS=False,
        SESSION_COOKIE_HTTPONLY=True,
        SESSION_COOKIE_SAMESITE="Lax",
    )
    init_db(app)
    _app_ref = app

    # Load persisted bans and mutes into the in-memory sets.
    with app.app_context():
        stored_bans = Ban.query.all()
        banned_usernames.update(entry.username.lower() for entry in stored_bans)
        banned_ips.update(entry.ip for entry in stored_bans if entry.ip)
        muted_users.update(entry.username.lower() for entry in Mute.query.all())

    # The first non-empty variable wins; with neither set there is no queue.
    queue_url = os.environ.get("SOCKETIO_MESSAGE_QUEUE")
    if not queue_url:
        queue_url = os.environ.get("REDIS_URL") or None

    socketio.init_app(
        app,
        async_mode="eventlet",
        cors_allowed_origins=CORS_ORIGINS,
        message_queue=queue_url,
        logger=False,
        engineio_logger=False,
    )

    # Start the AI inference queue worker greenlet.
    eventlet.spawn(_ai_worker)

    from routes import api
    app.register_blueprint(api)

    @app.route("/")
    def index():
        return send_from_directory(".", "index.html")

    return app
# ---------------------------------------------------------------------------
# Socket connection lifecycle
# ---------------------------------------------------------------------------
@socketio.on("connect")
def on_connect(auth=None):
    """Admit a new socket and create its entry in ``connected_users``.

    Connections from a banned IP are refused (the handler returns ``False``).
    When ``auth`` is a dict whose ``token`` is accepted by ``verify_jwt`` and
    maps to an existing ``User`` row, that account's id, AI quota and
    username are preloaded into the session.  ``username`` itself stays
    ``None`` until the client sends ``join``.
    """
    sid = request.sid
    # NOTE(review): X-Forwarded-For is client-supplied unless a trusted proxy
    # overwrites it, and may hold a comma-separated chain. Confirm against
    # the deployment before relying on it for IP bans.
    ip = request.environ.get("HTTP_X_FORWARDED_FOR", request.remote_addr)
    if ip in banned_ips:
        emit("error", {"msg": "You are banned."})
        disconnect()
        return False

    # Defaults for an anonymous connection.
    account = {
        "user_id": None,
        "is_registered": False,
        "has_ai_access": False,
        "ai_messages_used": 0,
        "_jwt_username": None,
    }
    token = auth.get("token") if isinstance(auth, dict) else None
    payload = verify_jwt(token) if token else None
    if payload:
        db_user = db.session.get(User, payload.get("user_id"))
        if db_user:
            account.update({
                "user_id": db_user.id,
                "is_registered": True,
                "has_ai_access": db_user.has_ai_access,
                "ai_messages_used": db_user.ai_messages_used,
                "_jwt_username": db_user.username,
            })

    connected_users[sid] = {
        "username": None,
        "ip": ip,
        "is_admin": False,
        "role": "user",
        "joined_at": time.time(),
        **account,
    }
@socketio.on("disconnect")
def on_disconnect():
    """Forget a departed socket and, if it had joined, tell the lobby.

    The session entry, rate-limit history and pending PM invites are always
    dropped.  The leave notice and refreshed nicklist are only sent for
    sockets that had a username.
    """
    sid = request.sid
    departed = connected_users.pop(sid, None)
    message_timestamps.pop(sid, None)
    pending_pm_invites.pop(sid, None)
    if not departed or not departed.get("username"):
        return

    name = departed["username"]
    username_to_sid.pop(name.lower(), None)
    notice = {"msg": f"**{name}** left the room.", "ts": _ts()}
    socketio.emit("system", notice, to=LOBBY)
    socketio.emit("nicklist", {"users": _get_nicklist()}, to=LOBBY)
# ---------------------------------------------------------------------------
# Join / auth
# ---------------------------------------------------------------------------
@socketio.on("join")
def on_join(data):
    """Authenticate the socket in one of four modes and put it in the lobby.

    ``data["mode"]`` selects the path:

    * ``register`` - create a new account from username/password/email.
    * ``login``    - check username/password against the stored bcrypt hash.
    * ``restore``  - resume the account preloaded from the JWT at connect.
    * anything else (``guest``) - join under a free-form name, no account.

    All modes then share the banned-name and name-in-use checks, role
    derivation, the optional ``mod_password`` elevation, and the ``joined`` /
    ``system`` / ``nicklist`` emits.  Every failure emits ``error`` and
    returns without joining.

    Fixes relative to the previous version:

    * the moderator password is compared in constant time;
    * guests can no longer take the reserved AI bot name, which would hijack
      the AI's PM rooms and duplicate its nicklist entry;
    * registration rejects banned or currently-in-use names *before* writing
      the account, instead of creating the row and then returning an error.
    """
    sid = request.sid
    user = connected_users.get(sid)
    if not user:
        return

    mode = str(data.get("mode", "guest")).strip()
    username = str(data.get("username", "")).strip()[:20]
    password = str(data.get("password", "")).strip()
    email = str(data.get("email", "")).strip()[:255] or None
    token = None
    db_user = None

    if mode == "register":
        if not username or not username.replace("_", "").replace("-", "").isalnum():
            security_logger.warning(f"REGISTER_FAIL: Invalid username format from IP {request.remote_addr}")
            emit("error", {"msg": "Invalid username."})
            return
        if len(password) < 6:
            emit("error", {"msg": "Password must be at least 6 characters."})
            return
        if username.lower() == AI_BOT_NAME.lower():
            emit("error", {"msg": "That username is reserved."})
            return
        if User.query.filter(db.func.lower(User.username) == username.lower()).first():
            security_logger.info(f"REGISTER_FAIL: Duplicate username {username}")
            emit("error", {"msg": "Username already registered."})
            return
        # Run the join-time checks before creating the row, so a rejected
        # join cannot leave an orphan account behind.
        if username.lower() in banned_usernames:
            emit("error", {"msg": "That username is banned."})
            return
        if username_to_sid.get(username.lower(), sid) != sid:
            emit("error", {"msg": "Username already in use."})
            return
        hashed = bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
        db_user = User(username=username, password_hash=hashed, email=email)
        db.session.add(db_user)
        db.session.commit()
        user.update(user_id=db_user.id, is_registered=True,
                    has_ai_access=False, ai_messages_used=0)
        token = issue_jwt(db_user.id, db_user.username)
        security_logger.info(f"REGISTER_SUCCESS: {username} from IP {request.remote_addr}")
    elif mode == "login":
        db_user = User.query.filter(
            db.func.lower(User.username) == username.lower()
        ).first()
        if not db_user or not bcrypt.checkpw(password.encode(), db_user.password_hash.encode()):
            security_logger.warning(f"LOGIN_FAIL: Invalid credentials for {username} from IP {request.remote_addr}")
            emit("error", {"msg": "Invalid username or password."})
            return
        if not db_user.is_verified:
            security_logger.info(f"LOGIN_FAIL: Unverified account {username}")
            emit("error", {"msg": "Account pending manual verification by a moderator."})
            return
        username = db_user.username  # use the stored casing
        user["user_id"] = db_user.id
        user["is_registered"] = True
        user["has_ai_access"] = db_user.has_ai_access
        user["ai_messages_used"] = db_user.ai_messages_used
        token = issue_jwt(db_user.id, db_user.username)
        security_logger.info(f"LOGIN_SUCCESS: {username} from IP {request.remote_addr}")
    elif mode == "restore":
        # user_id is only present if on_connect accepted a JWT.
        if not user.get("user_id"):
            emit("error", {"msg": "Session expired. Please log in again."})
            return
        db_user = db.session.get(User, user["user_id"])
        if not db_user:
            emit("error", {"msg": "Account not found."})
            return
        if not db_user.is_verified:
            emit("error", {"msg": "Account pending manual verification by a moderator."})
            return
        username = db_user.username
        user["has_ai_access"] = db_user.has_ai_access
        user["ai_messages_used"] = db_user.ai_messages_used
        token = issue_jwt(db_user.id, db_user.username)
    else:  # guest
        if not username or not username.replace("_", "").replace("-", "").isalnum():
            emit("error", {"msg": "Invalid username. Use letters, numbers, - or _."})
            return
        # The bot name is reserved for guests too: PM routing and the
        # nicklist both assume only the static AI entry uses it.
        if username.lower() == AI_BOT_NAME.lower():
            emit("error", {"msg": "That username is reserved."})
            return

    lower = username.lower()
    if lower in banned_usernames:
        emit("error", {"msg": "That username is banned."})
        return
    if lower in username_to_sid and username_to_sid[lower] != sid:
        emit("error", {"msg": "Username already in use."})
        return

    # Derive role from DB (root/admin/mod grant is_admin automatically).
    db_role = db_user.role if db_user else "user"
    is_admin = ROLE_POWER.get(db_role, 0) >= ROLE_POWER["mod"]

    # Legacy mod-password fallback (temporary mod access for the session).
    # Compared in constant time on bytes; an unset or non-string
    # ADMIN_PASSWORD never matches.
    mod_pw = str(data.get("mod_password", "")).strip()
    expected_pw = ADMIN_PASSWORD if isinstance(ADMIN_PASSWORD, str) else ""
    if (
        mod_pw
        and expected_pw
        and not is_admin
        and hmac.compare_digest(mod_pw.encode("utf-8"), expected_pw.encode("utf-8"))
    ):
        is_admin = True
        if db_role == "user":
            db_role = "mod"  # temporary elevation for the session

    user["username"] = username
    user["is_admin"] = is_admin
    user["role"] = db_role
    user["is_verified"] = db_user.is_verified if db_user else True
    username_to_sid[lower] = sid
    join_room(LOBBY)

    # Role badge for the join message.
    role_icon = {"root": "👑 ", "admin": "⚔️ ", "mod": "🛡️ "}.get(db_role, "")
    emit("joined", {
        "username": username,
        "is_admin": is_admin,
        "role": db_role,
        "is_registered": user["is_registered"],
        "has_ai_access": user["has_ai_access"],
        "ai_messages_used": user["ai_messages_used"],
        "email": db_user.email if db_user else None,
        "token": token,
        "ignored_list": [u.username for u in db_user.ignoring] if db_user else []
    })
    emit("system", {
        "msg": f"{role_icon}**{username}** joined the room.",
        "ts": _ts(),
    }, to=LOBBY)
    socketio.emit("nicklist", {"users": _get_nicklist()}, to=LOBBY)
# ---------------------------------------------------------------------------
# Lobby (ephemeral – never persisted)
# ---------------------------------------------------------------------------
@socketio.on("message")
def on_message(data):
    """Relay a lobby chat line to everyone in the lobby.

    Sockets that have not joined are ignored silently.  Muted and
    rate-limited senders get an ``error`` event.  The text is stripped and
    cut to ``MAX_MSG_LEN``; an empty result is dropped.
    """
    sid = request.sid
    sender = connected_users.get(sid)
    if not sender or not sender.get("username"):
        return
    if sender["username"].lower() in muted_users:
        emit("error", {"msg": "You are muted."})
        return
    if _rate_limited(sid):
        emit("error", {"msg": "Slow down!"})
        return

    body = str(data.get("text", "")).strip()[:MAX_MSG_LEN]
    if not body:
        return

    outgoing = {
        "username": sender["username"],
        "text": body,
        "is_admin": sender["is_admin"],
        "is_registered": sender.get("is_registered", False),
        "ts": _ts(),
    }
    emit("message", outgoing, to=LOBBY)
# ---------------------------------------------------------------------------
# Private Messaging
# ---------------------------------------------------------------------------
@socketio.on("pm_open")
def on_pm_open(data):
    """Open a private room with ``data["target"]`` and invite them to it.

    The target must be online unless it is the AI bot, which is treated as
    always available.  If a registered target is ignoring the caller, the
    request is refused with an ``error`` event.  Otherwise the caller joins
    the room and gets ``pm_ready``; an online target gets ``pm_invite`` and
    the room is recorded in ``pending_pm_invites``.  Both events carry the
    server-derived room key.
    """
    sid = request.sid
    user = connected_users.get(sid)
    if not user or not user["username"]:
        return

    target = str(data.get("target", "")).strip()
    target_sid = username_to_sid.get(target.lower())
    # The AI bot has no socket of its own, so only other targets need one.
    wants_bot = target.lower() == AI_BOT_NAME.lower()
    if not target_sid and not wants_bot:
        emit("error", {"msg": f"{target} is not online."})
        return

    # Refuse if the target is ignoring the caller.
    target_info = connected_users.get(target_sid)
    if target_info and target_info.get("user_id"):
        target_db = db.session.get(User, target_info["user_id"])
        me_db = User.query.filter(
            db.func.lower(User.username) == user["username"].lower()
        ).first()
        if target_db and me_db and me_db in target_db.ignoring:
            emit("error", {"msg": f"{target} is not accepting messages from you right now."})
            return

    room = _pm_room(user["username"], target)
    join_room(room)
    room_key = _pm_room_key(room)
    if target_sid:
        invites = pending_pm_invites.setdefault(target_sid, set())
        invites.add(room)
        invitation = {"from": user["username"], "room": room, "room_key": room_key}
        socketio.emit("pm_invite", invitation, to=target_sid)
    emit("pm_ready", {"with": target, "room": room, "room_key": room_key})
@socketio.on("pm_accept")
def on_pm_accept(data):
    """Join a PM room the caller was invited to.

    Only rooms recorded for this socket in ``pending_pm_invites`` are
    accepted, and each invitation can be used once.  Anything else gets an
    ``error`` event.
    """
    sid = request.sid
    room = str(data.get("room", ""))
    invites = pending_pm_invites.get(sid, set())
    if room in invites:
        invites.discard(room)
        join_room(room)
        return
    emit("error", {"msg": "Invalid or expired PM invitation."})
@socketio.on("pm_message")
def on_pm_message(data):
    """Deliver a private message, or hand it to the AI queue for AI rooms.

    ``data`` carries ``room`` plus either ``text`` or a
    ``ciphertext``/``nonce`` pair; AI-bound encrypted messages also carry
    ``transit_key``.

    * AI rooms: unregistered non-admin users get a canned "please register"
      reply; users past the free quota get ``ai_response`` with
      ``ai_limit_reached``; everyone else has the message echoed back and
      queued for ``_ai_worker``, which delivers the reply.
    * Other rooms: the payload is emitted to the room, and encrypted
      messages between two registered, online users are persisted.

    Fixes relative to the previous version:

    * The sender must be one of the two names encoded in the room, so users
      cannot inject into, or persist messages against, other people's rooms.
    * AI rooms are detected by membership of the bot name in the room's
      participants.  ``_pm_room`` sorts the two names, so the bot name is
      not necessarily last and the old ``endswith`` test missed every
      username that sorts after it.
    """
    sid = request.sid
    user = connected_users.get(sid)
    if not user or not user["username"]:
        return

    room = str(data.get("room", ""))
    if not room.startswith("pm:"):
        return

    # Room names are "pm:<name>:<name>"; usernames cannot contain ":".
    participants = room.split(":")[1:]
    my_lower = user["username"].lower()
    if len(participants) != 2 or my_lower not in participants:
        emit("error", {"msg": "You are not a participant in that conversation."})
        return

    ciphertext = data.get("ciphertext", "")
    nonce_val = data.get("nonce", "")
    text = str(data.get("text", "")).strip()[:MAX_MSG_LEN]
    is_encrypted = bool(ciphertext and nonce_val)
    if not is_encrypted and not text:
        return

    ts = _ts()
    payload = (
        {"from": user["username"], "ciphertext": ciphertext,
         "nonce": nonce_val, "room": room, "ts": ts}
        if is_encrypted else
        {"from": user["username"], "text": text, "room": room, "ts": ts}
    )

    # Route to the AI if the other participant is the bot.
    if AI_BOT_NAME.lower() in participants:
        if not user.get("user_id") and not user.get("is_admin"):
            # Echo their message, then reply as the bot.
            emit("pm_message", payload, to=sid)
            emit("pm_message", {
                "from": AI_BOT_NAME,
                "text": "Hey hun 💜 You'll need to register an account before we can chat. "
                        "Go back and sign up — I'll be waiting for you! 😘",
                "room": room,
                "ts": _ts(),
            }, to=sid)
            return

        if not user.get("has_ai_access") and user.get("ai_messages_used", 0) >= AI_FREE_LIMIT:
            username = user.get("username", "unknown")
            security_logger.warning(f"AI_LIMIT_REACHED: {username} tried to use AI after free trial exhausted")
            emit("ai_response", {"error": "ai_limit_reached", "room": room}, to=sid)
            return

        # Echo the user's own message back so it appears in their chat.
        emit("pm_message", payload, to=sid)

        transit_key = data.get("transit_key", "")
        if not all([ciphertext, nonce_val, transit_key]):
            # Plaintext fallback (e.g. session restore without crypto key):
            # wrap the text with a throwaway key so the worker sees the same
            # task shape as for encrypted messages.
            if text:
                import base64 as _b64
                transit_key = _b64.b64encode(os.urandom(32)).decode()
                ciphertext_new, nonce_new = aesgcm_encrypt(transit_key, text)
                ai_queue.put({
                    "sid": sid,
                    "user_id": user.get("user_id"),
                    "has_ai_access": user.get("has_ai_access", False),
                    "ai_messages_used": user.get("ai_messages_used", 0),
                    "ciphertext": ciphertext_new,
                    "nonce_val": nonce_new,
                    "transit_key": transit_key,
                    "plaintext_mode": True,
                })
                return
            emit("error", {"msg": "Message cannot be empty."})
            return

        ai_queue.put({
            "sid": sid,
            "user_id": user["user_id"],
            "has_ai_access": user["has_ai_access"],
            "ai_messages_used": user["ai_messages_used"],
            "ciphertext": ciphertext,
            "nonce_val": nonce_val,
            "transit_key": transit_key,
        })
        return  # _ai_worker will handle the delivery

    emit("pm_message", payload, to=room)

    # Persist only encrypted messages between two registered users; the
    # recipient's user id is only known while they are online.
    if is_encrypted and user.get("user_id"):
        other_low = next((p for p in participants if p != my_lower), None)
        if other_low:
            other_sid = username_to_sid.get(other_low)
            other_info = connected_users.get(other_sid, {}) if other_sid else {}
            if other_info.get("user_id"):
                _save_pm(user["user_id"], other_info["user_id"],
                         ciphertext, nonce_val)
# ---------------------------------------------------------------------------
# AI Violet (queued Ollama inference)
# ---------------------------------------------------------------------------
@socketio.on("ai_message")
def on_ai_message(data):
    """Deprecated no-op: use pm_message to Violet instead."""
    return None
@socketio.on("violet_reset")
def on_violet_reset(_data=None):
    """Delete the caller's stored Violet history and confirm in the AI room.

    Only sockets tied to a registered account may do this; others get an
    ``error`` event.
    """
    sid = request.sid
    caller = connected_users.get(sid)
    if not caller or not caller.get("user_id"):
        emit("error", {"msg": "You must be registered to reset Violet history."})
        return

    VioletHistory.query.filter_by(user_id=caller["user_id"]).delete()
    db.session.commit()

    confirmation = {
        "from": AI_BOT_NAME,
        "text": "Memory cleared, darling. Let's start fresh! 💜",
        "room": _pm_room(caller["username"], AI_BOT_NAME),
        "ts": _ts(),
    }
    emit("pm_message", confirmation, to=sid)
# ---------------------------------------------------------------------------
# Mod tools
# ---------------------------------------------------------------------------
@socketio.on("mod_kick")
@_require_admin
def on_kick(data):
    """Kick an online user: notify them, announce it, then disconnect them.

    The disconnect is scheduled half a second later so the ``kicked`` event
    can be sent first.  The action is written to the security log.
    """
    target = str(data.get("target", "")).strip()
    target_sid = username_to_sid.get(target.lower())
    if not target_sid:
        emit("error", {"msg": f"{target} is not online."})
        return

    # Security logging.
    moderator = connected_users.get(request.sid, {})
    mod_name = moderator.get("username", "unknown")
    security_logger.warning(f"MOD_KICK: {mod_name} kicked {target}")

    socketio.emit("kicked", {"msg": "You have been kicked by a moderator."}, to=target_sid)
    announcement = {"msg": f"🚫 **{target}** was kicked.", "ts": _ts()}
    socketio.emit("system", announcement, to=LOBBY)
    eventlet.spawn_after(0.5, _do_disconnect, target_sid)
@socketio.on("mod_ban")
@_require_admin
def on_ban(data):
    """Ban a username, and its current IP if the user is online.

    The ban is applied in memory at once and persisted as a ``Ban`` row if
    none exists for that name.  An online target is notified and then
    disconnected after a short delay.  The action is written to the security
    log and announced in the lobby.

    An empty target is rejected; previously it banned and persisted the
    empty username.
    """
    target = str(data.get("target", "")).strip()
    if not target:
        emit("error", {"msg": "No target specified."})
        return
    lower = target.lower()

    banned_usernames.add(lower)
    ip = None
    target_sid = username_to_sid.get(lower)
    if target_sid:
        info = connected_users.get(target_sid, {})
        if info.get("ip"):
            banned_ips.add(info["ip"])
            ip = info["ip"]
        socketio.emit("kicked", {"msg": "You have been banned."}, to=target_sid)
        # Delay the disconnect so the "kicked" event can be sent first.
        eventlet.spawn_after(0.5, _do_disconnect, target_sid)

    # Security logging.
    mod_name = connected_users.get(request.sid, {}).get("username", "unknown")
    security_logger.warning(f"MOD_BAN: {mod_name} banned {target} (IP: {ip})")

    # Persist to DB (one row per username).
    if not Ban.query.filter_by(username=lower).first():
        db.session.add(Ban(username=lower, ip=ip))
    db.session.commit()

    socketio.emit("system", {"msg": f"🔨 **{target}** was banned.", "ts": _ts()}, to=LOBBY)
@socketio.on("mod_mute")
@_require_admin
def on_mute(data):
    """Toggle the lobby mute on a username and announce the result.

    A name that is already muted is unmuted and its ``Mute`` row deleted;
    otherwise it is muted and a row is added if none exists.

    An empty target is rejected; previously it toggled and persisted a mute
    on the empty username.
    """
    target = str(data.get("target", "")).strip()
    if not target:
        emit("error", {"msg": "No target specified."})
        return
    lower = target.lower()

    if lower in muted_users:
        muted_users.discard(lower)
        Mute.query.filter_by(username=lower).delete()
        db.session.commit()
        action = "unmuted"
    else:
        muted_users.add(lower)
        if not Mute.query.filter_by(username=lower).first():
            db.session.add(Mute(username=lower))
        db.session.commit()
        action = "muted"

    emit("system", {"msg": f"🔇 **{target}** was {action}.", "ts": _ts()}, to=LOBBY)
@socketio.on("mod_kickban")
@_require_admin
def on_kickban(data):
    """Ban a username (and its current IP if online) and kick the user.

    The ban is applied in memory at once and persisted as a ``Ban`` row if
    none exists for that name.  An online target is notified and then
    disconnected after a short delay.

    Changes relative to the previous version: the action is now written to
    the security log, as the other kick and ban handlers do, and an empty
    target is rejected instead of banning the empty username.
    """
    target = str(data.get("target", "")).strip()
    if not target:
        emit("error", {"msg": "No target specified."})
        return
    lower = target.lower()

    # Ban.
    banned_usernames.add(lower)
    ip = None
    target_sid = username_to_sid.get(lower)
    if target_sid:
        info = connected_users.get(target_sid, {})
        if info.get("ip"):
            banned_ips.add(info["ip"])
            ip = info["ip"]
        socketio.emit("kicked", {"msg": "You have been banned."}, to=target_sid)
        # Delay the disconnect so the "kicked" event can be sent first.
        eventlet.spawn_after(0.5, _do_disconnect, target_sid)

    # Security logging.
    mod_name = connected_users.get(request.sid, {}).get("username", "unknown")
    security_logger.warning(f"MOD_KICKBAN: {mod_name} kickbanned {target} (IP: {ip})")

    # Persist to DB (one row per username).
    if not Ban.query.filter_by(username=lower).first():
        db.session.add(Ban(username=lower, ip=ip))
    db.session.commit()

    # Announce.
    socketio.emit("system", {"msg": f"💀 **{target}** was kickbanned.", "ts": _ts()}, to=LOBBY)
@socketio.on("user_ignore")
def on_ignore(data):
    """Add ``data["target"]`` to the caller's ignore list.

    Only registered callers can ignore, and only existing accounts can be
    ignored; other requests are dropped silently.  The caller receives
    ``ignore_status`` with ``ignored: True``.
    """
    sid = request.sid
    caller = connected_users.get(sid)
    if not caller or not caller.get("user_id"):
        return

    target_name = str(data.get("target", "")).strip()
    target_user = User.query.filter(
        db.func.lower(User.username) == target_name.lower()
    ).first()
    if not target_user:
        return

    me = db.session.get(User, caller["user_id"])
    already_ignored = target_user in me.ignoring
    if not already_ignored:
        me.ignoring.append(target_user)
        db.session.commit()
    emit("ignore_status", {"target": target_user.username, "ignored": True})
@socketio.on("user_unignore")
def on_unignore(data):
    """Remove ``data["target"]`` from the caller's ignore list.

    Only registered callers are served.  If the target is not currently
    ignored, nothing happens and nothing is emitted; otherwise the caller
    receives ``ignore_status`` with ``ignored: False``.
    """
    sid = request.sid
    caller = connected_users.get(sid)
    if not caller or not caller.get("user_id"):
        return

    target_name = str(data.get("target", "")).strip()
    me = db.session.get(User, caller["user_id"])
    # Case-insensitive lookup within the caller's own ignore list.
    target_user = me.ignoring.filter(
        db.func.lower(User.username) == target_name.lower()
    ).first()
    if not target_user:
        return

    me.ignoring.remove(target_user)
    db.session.commit()
    emit("ignore_status", {"target": target_user.username, "ignored": False})
@socketio.on("mod_verify")
@_require_admin
def on_verify(data):
    """Mark the named account as verified and refresh the lobby roster.

    Unknown usernames are ignored silently.  If the user is online, the
    live session's ``is_verified`` flag is updated as well.
    """
    target_name = str(data.get("target", "")).strip()
    target_user = User.query.filter(
        db.func.lower(User.username) == target_name.lower()
    ).first()
    if not target_user:
        return

    target_user.is_verified = True
    db.session.commit()

    # Update the live session if the target is currently online.
    target_sid = username_to_sid.get(target_name.lower())
    live_session = connected_users.get(target_sid) if target_sid else None
    if live_session:
        live_session["is_verified"] = True

    socketio.emit("system", {"msg": f"✅ **{target_user.username}** has been verified by a moderator.", "ts": _ts()}, to=LOBBY)
    socketio.emit("nicklist", {"users": _get_nicklist()}, to=LOBBY)
# ---------------------------------------------------------------------------
# Account management
# ---------------------------------------------------------------------------
@socketio.on("change_password")
def on_change_password(data):
    """Change the caller's password after checking the current one.

    The outcome is always reported through a ``password_changed`` event with
    ``success`` and, on failure, a ``msg``.

    Both passwords are stripped of surrounding whitespace, matching how the
    ``join`` handler normalises passwords at registration and login.
    Without that, a new password with leading or trailing spaces was hashed
    verbatim and could never match the stripped password at the next login.
    """
    sid = request.sid
    user = connected_users.get(sid)
    if not user or not user.get("user_id"):
        emit("password_changed", {"success": False, "msg": "You must be registered."})
        return

    old_pw = str(data.get("old_password", "")).strip()
    new_pw = str(data.get("new_password", "")).strip()
    if not old_pw or not new_pw:
        emit("password_changed", {"success": False, "msg": "Both fields are required."})
        return
    if len(new_pw) < 6:
        emit("password_changed", {"success": False, "msg": "Password must be at least 6 characters."})
        return

    db_user = db.session.get(User, user["user_id"])
    if not db_user:
        emit("password_changed", {"success": False, "msg": "User not found."})
        return
    if not bcrypt.checkpw(old_pw.encode("utf-8"), db_user.password_hash.encode("utf-8")):
        emit("password_changed", {"success": False, "msg": "Current password is incorrect."})
        return

    db_user.password_hash = bcrypt.hashpw(new_pw.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
    db.session.commit()
    emit("password_changed", {"success": True})
# ---------------------------------------------------------------------------
# Admin panel
# ---------------------------------------------------------------------------
@socketio.on("admin_get_users")
@_require_role("mod")
def on_admin_get_users(_data=None):
    """Send every account except the AI bot to the admin panel, ordered by id.

    ``online`` is true when the username currently maps to a connected
    socket.
    """
    listing = [
        {
            "id": account.id,
            "username": account.username,
            "role": account.role,
            "is_verified": account.is_verified,
            "has_ai_access": account.has_ai_access,
            "email": account.email or "",
            "created_at": account.created_at.strftime("%Y-%m-%d"),
            "online": username_to_sid.get(account.username.lower()) is not None,
        }
        for account in User.query.order_by(User.id).all()
        if account.username != AI_BOT_NAME
    ]
    emit("admin_users", {"users": listing})
@socketio.on("admin_get_bans")
@_require_role("mod")
def on_admin_get_bans(_data=None):
    """Send the persisted ban list to the admin panel, newest first."""
    listing = []
    for ban in Ban.query.order_by(Ban.created_at.desc()).all():
        listing.append({
            "id": ban.id,
            "username": ban.username,
            "ip": ban.ip or "",
            "reason": ban.reason or "",
            "created_at": ban.created_at.strftime("%Y-%m-%d"),
        })
    emit("admin_bans", {"bans": listing})
@socketio.on("admin_get_mutes")
@_require_role("mod")
def on_admin_get_mutes(_data=None):
    """Send the persisted mute list to the admin panel, newest first."""
    listing = []
    for mute in Mute.query.order_by(Mute.created_at.desc()).all():
        listing.append({
            "id": mute.id,
            "username": mute.username,
            "created_at": mute.created_at.strftime("%Y-%m-%d"),
        })
    emit("admin_mutes", {"mutes": listing})
@socketio.on("admin_set_role")
@_require_role("admin")
def on_admin_set_role(data):
    """Change a user's role, strictly below the caller's own power level.

    The caller can neither assign a role equal to or above their own nor
    modify a user whose current role is equal to or above their own.  With
    ``ROLE_POWER`` that means admins can set mod/user and only root can set
    admin.  On success the DB row and any live session are updated, the
    target is told via ``role_updated``, and the lobby gets an announcement
    plus a fresh nicklist.

    Changes relative to the previous version: a non-numeric ``user_id`` now
    produces an ``error`` event instead of an unhandled exception, and the
    ``admin_action_ok`` message separates username and role, which used to
    be run together as e.g. "alicemod".
    """
    sid = request.sid
    me = connected_users.get(sid)
    my_power = ROLE_POWER.get(me.get("role", "user"), 0)

    try:
        target_id = int(data.get("user_id", 0))
    except (TypeError, ValueError):
        emit("error", {"msg": "Invalid user id."})
        return

    new_role = str(data.get("role", "")).strip().lower()
    if new_role not in ROLE_POWER:
        emit("error", {"msg": "Invalid role."})
        return
    target_power = ROLE_POWER[new_role]
    if target_power >= my_power:
        emit("error", {"msg": "Cannot assign a role equal/above your own."})
        return

    target_user = db.session.get(User, target_id)
    if not target_user:
        emit("error", {"msg": "User not found."})
        return
    # Can't change someone with equal or higher power.
    if ROLE_POWER.get(target_user.role, 0) >= my_power:
        emit("error", {"msg": "Cannot modify a user with equal/higher privileges."})
        return

    target_user.role = new_role
    db.session.commit()

    # Update the live session if they're online.
    target_sid = username_to_sid.get(target_user.username.lower())
    if target_sid and target_sid in connected_users:
        connected_users[target_sid]["role"] = new_role
        connected_users[target_sid]["is_admin"] = ROLE_POWER[new_role] >= ROLE_POWER["mod"]
        # Notify the target user of their new role.
        socketio.emit("role_updated", {"role": new_role}, to=target_sid)

    socketio.emit("system", {
        "msg": f"⚙️ **{target_user.username}** is now **{new_role}**.",
        "ts": _ts()
    }, to=LOBBY)
    socketio.emit("nicklist", {"users": _get_nicklist()}, to=LOBBY)
    emit("admin_action_ok", {"msg": f"{target_user.username} → {new_role}"})
@socketio.on("admin_verify_user")
@_require_role("mod")
def on_admin_verify(data):
    """Toggle a stored user's ``is_verified`` flag.

    Expected payload: ``{"user_id": <int>}``.

    On success the flipped flag is committed and mirrored into the target's
    ``connected_users`` entry (if any); the lobby receives a ``system``
    notice plus a fresh ``nicklist`` and the caller receives
    ``admin_action_ok``.  A malformed id or unknown user is reported to the
    caller as an ``error`` event.
    """
    # The payload comes straight from the client; never let a malformed one
    # raise out of the handler.
    payload = data if isinstance(data, dict) else {}
    try:
        target_id = int(payload.get("user_id", 0))
    except (TypeError, ValueError, OverflowError):
        emit("error", {"msg": "Invalid user id."})
        return

    target_user = db.session.get(User, target_id)
    if not target_user:
        emit("error", {"msg": "User not found."})
        return

    target_user.is_verified = not target_user.is_verified
    db.session.commit()
    status = "verified" if target_user.is_verified else "unverified"

    # Keep the in-memory session in step with the database.
    target_sid = username_to_sid.get(target_user.username.lower())
    if target_sid and target_sid in connected_users:
        connected_users[target_sid]["is_verified"] = target_user.is_verified

    socketio.emit("system", {
        "msg": f"{'✅' if target_user.is_verified else '❌'} **{target_user.username}** was {status}.",
        "ts": _ts()
    }, to=LOBBY)
    socketio.emit("nicklist", {"users": _get_nicklist()}, to=LOBBY)
    emit("admin_action_ok", {"msg": f"{target_user.username} → {status}"})
@socketio.on("admin_toggle_ai")
@_require_role("admin")
def on_admin_toggle_ai(data):
    """Toggle a stored user's ``has_ai_access`` flag.

    Expected payload: ``{"user_id": <int>}``.

    On success the flipped flag is committed; if the target has an entry in
    ``connected_users`` it is updated and the target is sent ``ai_unlock``.
    The caller receives ``admin_action_ok``.  A malformed id or unknown user
    is reported to the caller as an ``error`` event.
    """
    # The payload comes straight from the client; never let a malformed one
    # raise out of the handler.
    payload = data if isinstance(data, dict) else {}
    try:
        target_id = int(payload.get("user_id", 0))
    except (TypeError, ValueError, OverflowError):
        emit("error", {"msg": "Invalid user id."})
        return

    target_user = db.session.get(User, target_id)
    if not target_user:
        emit("error", {"msg": "User not found."})
        return

    target_user.has_ai_access = not target_user.has_ai_access
    db.session.commit()

    target_sid = username_to_sid.get(target_user.username.lower())
    if target_sid and target_sid in connected_users:
        connected_users[target_sid]["has_ai_access"] = target_user.has_ai_access
        # Notify target so their UI updates immediately.
        socketio.emit("ai_unlock", {
            "has_ai_access": target_user.has_ai_access,
            "msg": "Premium access granted! Unlimited Violet." if target_user.has_ai_access else "Premium access revoked."
        }, to=target_sid)

    status = "granted" if target_user.has_ai_access else "revoked"
    emit("admin_action_ok", {"msg": f"AI access {status} for {target_user.username}"})
@socketio.on("admin_unban")
@_require_role("mod")
def on_admin_unban(data):
    """Delete a ban record and lift it from the in-memory ban sets.

    Expected payload: ``{"ban_id": <int>}``.

    On success the ``Ban`` row is deleted and committed, its username (and
    ip, when set) are discarded from ``banned_usernames`` / ``banned_ips``,
    the lobby receives a ``system`` notice and the caller receives
    ``admin_action_ok``.  A malformed id or unknown ban is reported to the
    caller as an ``error`` event.
    """
    # The payload comes straight from the client; never let a malformed one
    # raise out of the handler.
    payload = data if isinstance(data, dict) else {}
    try:
        ban_id = int(payload.get("ban_id", 0))
    except (TypeError, ValueError, OverflowError):
        emit("error", {"msg": "Invalid ban id."})
        return

    ban = db.session.get(Ban, ban_id)
    if not ban:
        emit("error", {"msg": "Ban not found."})
        return

    # Read what we need before the row is deleted so nothing below touches
    # a deleted instance.
    banned_name = ban.username
    banned_ip = ban.ip

    db.session.delete(ban)
    db.session.commit()

    # Only lift the in-memory ban once the delete has been committed, so a
    # failed commit cannot leave the sets and the table disagreeing.
    banned_usernames.discard(banned_name.lower())
    if banned_ip:
        banned_ips.discard(banned_ip)

    socketio.emit("system", {
        "msg": f"🔓 **{banned_name}** was unbanned.",
        "ts": _ts()
    }, to=LOBBY)
    emit("admin_action_ok", {"msg": f"Unbanned {banned_name}"})
@socketio.on("admin_unmute")
@_require_role("mod")
def on_admin_unmute(data):
    """Delete a mute record and lift it from the in-memory mute set.

    Expected payload: ``{"mute_id": <int>}``.

    On success the ``Mute`` row is deleted and committed, its username is
    discarded from ``muted_users``, the lobby receives a ``system`` notice
    and the caller receives ``admin_action_ok``.  A malformed id or unknown
    mute is reported to the caller as an ``error`` event.
    """
    # The payload comes straight from the client; never let a malformed one
    # raise out of the handler.
    payload = data if isinstance(data, dict) else {}
    try:
        mute_id = int(payload.get("mute_id", 0))
    except (TypeError, ValueError, OverflowError):
        emit("error", {"msg": "Invalid mute id."})
        return

    mute = db.session.get(Mute, mute_id)
    if not mute:
        emit("error", {"msg": "Mute not found."})
        return

    # Read what we need before the row is deleted so nothing below touches
    # a deleted instance.
    muted_name = mute.username

    db.session.delete(mute)
    db.session.commit()

    # Only lift the in-memory mute once the delete has been committed, so a
    # failed commit cannot leave the set and the table disagreeing.
    muted_users.discard(muted_name.lower())

    socketio.emit("system", {
        "msg": f"🔊 **{muted_name}** was unmuted.",
        "ts": _ts()
    }, to=LOBBY)
    emit("admin_action_ok", {"msg": f"Unmuted {muted_name}"})