import datetime
import json
import os
import re
import secrets
import sqlite3
import sys
import threading
import time
import uuid

from flask import (
    Flask,
    Response,
    abort,
    g,
    jsonify,
    redirect,
    render_template,
    request,
    session,
    url_for,
)
from werkzeug.security import check_password_hash

_UTC = datetime.timezone.utc

# ── Load configuration ────────────────────────────────────────────────────────
_CONFIG_PATH = os.path.join(os.path.dirname(__file__), 'config.json')


def _load_config():
    """Read and parse the JSON configuration file at ``_CONFIG_PATH``.

    Returns:
        The decoded JSON document.

    Raises:
        FileNotFoundError: if the file does not exist.
        json.JSONDecodeError: if the file is not valid JSON.
    """
    with open(_CONFIG_PATH, 'r', encoding='utf-8') as f:
        return json.load(f)


try:
    CFG = _load_config()
except (FileNotFoundError, json.JSONDecodeError) as e:
    raise RuntimeError(f"Failed to load config.json: {e}") from e

# Required top-level sections; a missing one raises KeyError at import time.
_site = CFG['site']
_server = CFG['server']
_db_cfg = CFG['database']
_pastes = CFG['pastes']
_theme = CFG['theme']
_feat = CFG['features']
_ui = CFG['ui']

# ── Flask app setup ───────────────────────────────────────────────────────────
app = Flask(__name__)

# Prefer SECRET_KEY from the environment; fall back to config.json.
# Refuse to start if the key is still the default placeholder.
_secret_key = os.environ.get('SECRET_KEY') or _server.get('secret_key', '')
_DEFAULT_KEYS = {'', 'change-me', 'change-this-to-a-long-random-secret'}
if _secret_key in _DEFAULT_KEYS:
    raise RuntimeError(
        'SECRET_KEY is not set or is still the default placeholder. '
        'Set a long random value in the SECRET_KEY environment variable '
        'or in config.json before starting the server.'
    )
app.config['SECRET_KEY'] = _secret_key

# ── Session hardening ─────────────────────────────────────────────────────────
app.config['SESSION_COOKIE_HTTPONLY'] = True
app.config['SESSION_COOKIE_SECURE'] = _server.get('use_https', False)
app.config['SESSION_COOKIE_SAMESITE'] = 'Lax'

if _server.get('debug', False):
    print('WARNING: debug=true is set in config.json — never use debug mode in production!',
          file=sys.stderr, flush=True)

_BASE_DIR = os.path.dirname(os.path.abspath(__file__))
_raw_db_path = _db_cfg.get('path', 'bastebin.db')
DATABASE = os.path.realpath(os.path.join(_BASE_DIR, _raw_db_path))

# Containment check. Both sides are resolved with realpath so a symlinked
# checkout does not fail spuriously, and commonpath compares whole path
# components so a sibling such as "<project>-other/x.db" is not mistaken for a
# path inside "<project>" (which a plain startswith() prefix test would allow).
_real_base_dir = os.path.realpath(_BASE_DIR)
try:
    _db_inside_project = os.path.commonpath([_real_base_dir, DATABASE]) == _real_base_dir
except ValueError:
    # commonpath raises when the paths cannot be compared (e.g. different
    # drives on Windows); such a database is certainly outside the project.
    _db_inside_project = False
if not _db_inside_project:
    raise RuntimeError(f"Database path '{DATABASE}' must be within the project directory.")

MAX_ENCRYPTED_BYTES = _pastes.get('max_size_bytes', 2 * 1024 * 1024)
MAX_TITLE_BYTES = 200

_ENCRYPTED_RE = re.compile(r'^[A-Za-z0-9_-]+:[A-Za-z0-9_-]+$')
_LANGUAGE_RE = re.compile(r'^[a-z0-9_+-]{1,32}$')
_PASTE_ID_RE = re.compile(r'^[0-9a-f]{4,32}$')


# ── Security headers ─────────────────────────────────────────────────────────

@app.before_request
def _generate_csp_nonce():
    """Generate a unique nonce per request for the Content-Security-Policy."""
    g.csp_nonce = secrets.token_urlsafe(16)


@app.after_request
def set_security_headers(response):
    """Attach the anti-framing, nosniff, referrer and CSP headers to *response*.

    The CSP ``script-src`` directive embeds the per-request nonce stored on
    ``g`` by ``_generate_csp_nonce``; an empty nonce is used when none is set.

    Returns:
        The same response object, with the headers added.
    """
    nonce = getattr(g, 'csp_nonce', '')
    response.headers['X-Frame-Options'] = 'DENY'
    response.headers['X-Content-Type-Options'] = 'nosniff'
    response.headers['Referrer-Policy'] = 'same-origin'
    response.headers['Content-Security-Policy'] = (
        "default-src 'self'; "
        f"script-src 'self' https://cdnjs.cloudflare.com 'nonce-{nonce}'; "
        "style-src 'self' https://cdnjs.cloudflare.com 'unsafe-inline'; "
        "img-src 'self' data:; "
        "connect-src 'self' blob:; "
        "object-src 'none'; "
        "base-uri 'self'; "
        "frame-ancestors 'none';"
    )
    return response


# ── Jinja helpers ─────────────────────────────────────────────────────────────

@app.context_processor
def inject_config():
    """Expose the loaded config (``cfg``) and the CSP nonce to every template."""
    return {'cfg': CFG, 'csp_nonce': getattr(g, 'csp_nonce', '')}

# ── Database ──────────────────────────────────────────────────────────────────

def get_db_connection():
    """Open a new SQLite connection to ``DATABASE``.

    Rows are returned as ``sqlite3.Row`` and WAL journal mode is requested on
    every connection. The caller is responsible for closing the connection.
    """
    conn = sqlite3.connect(DATABASE, timeout=10)
    conn.row_factory = sqlite3.Row
    conn.execute('PRAGMA journal_mode=WAL')
    return conn


def _db_timestamp(moment):
    """Format *moment* the way ``expires_at`` values are stored.

    A space separates date and time. Every string compared against
    ``expires_at`` in SQL must use the same separator: with a 'T' separator,
    any stored value on the same calendar day sorts *before* "now"
    (' ' < 'T'), which makes unexpired pastes look expired.
    """
    return moment.isoformat(sep=' ')


def _paste_table_columns(cursor):
    """Return the set of column names currently defined on the pastes table."""
    cursor.execute("PRAGMA table_info(pastes)")
    return {row[1] for row in cursor.fetchall()}


def _delete_paste_rows(conn, paste_id):
    """Delete a paste and its comments; the caller commits.

    Comments are removed too so that a later paste that happens to receive the
    same ID does not inherit a stale discussion.
    """
    conn.execute('DELETE FROM comments WHERE paste_id = ?', (paste_id,))
    conn.execute('DELETE FROM pastes WHERE id = ?', (paste_id,))


_db_init_lock = threading.Lock()
_db_initialized = False


def init_db():
    """Initialise the database. Thread-safe and idempotent: runs DDL only once per process.

    Creates the pastes, rate_limits and comments tables (and their indexes),
    applies column migrations to an existing pastes table, then starts the
    background cleanup task.
    """
    global _db_initialized
    with _db_init_lock:
        if _db_initialized:
            return
        conn = sqlite3.connect(DATABASE)
        try:
            cursor = conn.cursor()
            columns = _paste_table_columns(cursor)
            # Legacy plaintext schema: drop it and start over with the new layout.
            if columns and 'content' in columns and 'encrypted_data' not in columns:
                cursor.execute("DROP TABLE pastes")
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS pastes (
                    id TEXT PRIMARY KEY,
                    encrypted_data TEXT NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    expires_at TIMESTAMP,
                    views INTEGER DEFAULT 0,
                    deletion_token TEXT,
                    discussions_enabled INTEGER DEFAULT 0
                )
            ''')
            # Re-read the schema: if the legacy table was just dropped and
            # recreated, the earlier column set is stale and would trigger an
            # ALTER TABLE for a column the new table already has.
            columns = _paste_table_columns(cursor)
            # Migration: add deletion_token column to existing table if missing
            if 'id' in columns and 'deletion_token' not in columns:
                cursor.execute('ALTER TABLE pastes ADD COLUMN deletion_token TEXT')
            # Migration: add discussions_enabled column to existing table if missing
            if 'id' in columns and 'discussions_enabled' not in columns:
                cursor.execute('ALTER TABLE pastes ADD COLUMN discussions_enabled INTEGER DEFAULT 0')
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS rate_limits (
                    ip_address TEXT,
                    timestamp REAL,
                    PRIMARY KEY (ip_address, timestamp)
                )
            ''')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_rate_limit_ts ON rate_limits(timestamp)')
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS comments (
                    id TEXT PRIMARY KEY,
                    paste_id TEXT NOT NULL,
                    content TEXT NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_comments_paste ON comments(paste_id)')
            conn.commit()
        finally:
            conn.close()
        _db_initialized = True
        _start_cleanup_task()


_cleanup_running = False


def _start_cleanup_task():
    """Start a background thread to purge expired pastes every hour.

    Does nothing if the task was already started in this process. Each pass
    also removes comments whose paste no longer exists and rate-limit entries
    older than one hour.
    """
    global _cleanup_running
    if _cleanup_running:
        return
    _cleanup_running = True

    def run():
        while True:
            try:
                # Sleep first to avoid running immediately on startup
                time.sleep(3600)
                # Must use the stored timestamp layout (space separator) or
                # same-day pastes would be purged before they expire.
                now = _db_timestamp(datetime.datetime.now(_UTC))
                conn = sqlite3.connect(DATABASE)
                try:
                    with conn:
                        res = conn.execute('DELETE FROM pastes WHERE expires_at < ?', (now,))
                        purged_pastes = res.rowcount
                        conn.execute(
                            'DELETE FROM comments WHERE paste_id NOT IN (SELECT id FROM pastes)'
                        )
                        res = conn.execute('DELETE FROM rate_limits WHERE timestamp < ?',
                                           (time.time() - 3600,))
                        purged_ips = res.rowcount
                    if purged_pastes > 0 or purged_ips > 0:
                        print(f"[Cleanup] Purged {purged_pastes} expired pastes and {purged_ips} rate limit entries.",
                              flush=True)
                finally:
                    conn.close()
            except Exception as e:
                # Top-level boundary of the worker thread: report and keep looping.
                print(f"Error in background cleanup: {e}", file=sys.stderr)

    t = threading.Thread(target=run, daemon=True)
    t.start()


# ── Helpers ───────────────────────────────────────────────────────────────────

# Supported expiry option names and the lifetime each one grants.
_EXPIRY_DELTAS = {
    '1hour': datetime.timedelta(hours=1),
    '1day': datetime.timedelta(days=1),
    '1week': datetime.timedelta(weeks=1),
    '1month': datetime.timedelta(days=30),
    '1year': datetime.timedelta(days=365),
}


def generate_paste_id():
    """Return a random lowercase-hex paste ID of the configured ``id_length`` (default 8)."""
    length = _pastes.get('id_length', 8)
    return uuid.uuid4().hex[:length]


def _utf8_size(text):
    """Return the UTF-8 byte length of *text*, or None if it cannot be encoded.

    JSON may legally carry lone surrogates, which Python cannot encode as
    UTF-8; callers treat None as invalid input instead of crashing.
    """
    try:
        return len(text.encode('utf-8'))
    except UnicodeEncodeError:
        return None


def validate_encrypted_data(value):
    """Return True if *value* is an ``iv:ciphertext`` string within the size limit.

    Both halves must consist of URL-safe base64 characters only.
    """
    if not isinstance(value, str):
        return False
    # Every character encodes to at least one byte, so this cheap check already
    # rejects anything whose encoded size would exceed the limit.
    if len(value) > MAX_ENCRYPTED_BYTES:
        return False
    # The pattern only admits ASCII, so a match implies bytes == characters and
    # the byte limit above is exact. Matching before encoding also keeps
    # un-encodable input from raising.
    return bool(_ENCRYPTED_RE.match(value))


def _is_same_origin():
    """Verify that the request is from the same origin to prevent CSRF.

    Uses the Origin header when present, otherwise the Referer header.
    Requests carrying neither header are rejected.
    """
    origin = request.headers.get('Origin')
    referer = request.headers.get('Referer')
    base_url = request.host_url.rstrip('/')  # e.g. http://localhost:5500
    if origin:
        return origin.rstrip('/') == base_url
    if referer:
        # Require a path separator right after the origin; a bare prefix test
        # would also accept e.g. "http://localhost:5500.attacker.example/".
        return referer == base_url or referer.startswith(base_url + '/')
    return False


def _ensure_rate_limits_table(conn):
    """Create the rate_limits table if it doesn't exist (migration safety net)."""
    conn.execute('''
        CREATE TABLE IF NOT EXISTS rate_limits (
            ip_address TEXT,
            timestamp REAL,
            PRIMARY KEY (ip_address, timestamp)
        )
    ''')
    conn.execute('CREATE INDEX IF NOT EXISTS idx_rate_limit_ts ON rate_limits(timestamp)')
    conn.commit()


def _check_rate_limit(remote_ip, key_prefix='rl', window=600, limit=10):
    """Generic rate limiting via SQLite. Window is in seconds.

    Args:
        remote_ip: client address used as the rate-limit subject.
        key_prefix: namespace so different actions are counted separately.
        window: length of the sliding window, in seconds.
        limit: maximum number of hits allowed inside the window.

    Returns:
        True if the request is allowed (and has been recorded), False if the
        limit has already been reached.
    """
    now_ts = time.time()
    key = f"{key_prefix}:{remote_ip}"
    conn = get_db_connection()
    try:
        try:
            count = conn.execute(
                'SELECT COUNT(*) FROM rate_limits WHERE ip_address = ? AND timestamp > ?',
                (key, now_ts - window)
            ).fetchone()[0]
        except sqlite3.OperationalError:
            # Table missing (pre-migration DB) — create it and proceed.
            _ensure_rate_limits_table(conn)
            count = 0

        if count >= limit:
            return False

        # OR IGNORE: two hits with an identical timestamp share a primary key;
        # that must not surface as an IntegrityError (HTTP 500).
        conn.execute('INSERT OR IGNORE INTO rate_limits (ip_address, timestamp) VALUES (?, ?)',
                     (key, now_ts))
        conn.commit()
        return True
    finally:
        conn.close()


def _is_expired(expires_at_value):
    """Return True if the stored ``expires_at`` value lies in the past.

    A NULL/empty value means the paste never expires.
    """
    if not expires_at_value:
        return False
    expires_at = datetime.datetime.fromisoformat(expires_at_value).replace(tzinfo=_UTC)
    return expires_at < datetime.datetime.now(_UTC)


def _tokens_match(expected, supplied):
    """Compare two deletion tokens in constant time.

    Returns False for missing, non-string or non-ASCII values
    (``secrets.compare_digest`` only accepts ASCII strings).
    """
    if not isinstance(expected, str) or not isinstance(supplied, str):
        return False
    if not (expected.isascii() and supplied.isascii()):
        return False
    return secrets.compare_digest(expected, supplied)


def _get_paste_or_abort(paste_id):
    """Fetch a paste row by ID.

    Aborts with 404 if the paste does not exist. If it has expired, the paste
    (and its comments) are deleted and the request aborts with 410.

    Returns:
        The ``sqlite3.Row`` for the paste.
    """
    conn = get_db_connection()
    try:
        paste = conn.execute('SELECT * FROM pastes WHERE id = ?', (paste_id,)).fetchone()
        if not paste:
            abort(404)
        if _is_expired(paste['expires_at']):
            _delete_paste_rows(conn, paste_id)
            conn.commit()
            abort(410)
    finally:
        conn.close()
    return paste


# ── Routes ────────────────────────────────────────────────────────────────────

@app.route('/')
def index():
    """Render the paste creation page."""
    return render_template('index.html')


@app.route('/create', methods=['POST'])
def create_paste():
    """Create a paste from a JSON body.

    The body carries either ``encrypted_data`` (an ``iv:ciphertext`` string) or
    ``content`` (plaintext, with optional ``title`` and ``language``), plus
    optional ``expires_in`` and ``discussions``.

    Returns:
        JSON with ``paste_id``, ``url`` and ``deletion_token`` on success;
        400/413/429/503 with an ``error`` message otherwise.
    """
    # ── Rate limiting ────────────────────────────────────────────────────────
    # 10 pastes per 10 minutes per IP address (SQLite backed for worker safety)
    if not _check_rate_limit(request.remote_addr, key_prefix='create', limit=10):
        return jsonify({'error': 'Rate limit exceeded. Please wait a few minutes.'}), 429

    data = request.get_json(silent=True)
    # A JSON array or scalar is valid JSON but has no fields to read.
    if not data or not isinstance(data, dict):
        return jsonify({'error': 'JSON body required'}), 400

    discussions_enabled = bool(data.get('discussions', False))

    if 'encrypted_data' in data:
        # Encrypted path — always accepted regardless of encrypt_pastes setting
        store_data = data.get('encrypted_data', '')
        if not validate_encrypted_data(store_data):
            return jsonify({'error': 'encrypted_data must be a valid iv:ciphertext string'}), 400
    elif 'content' in data:
        # Plaintext path — always accepted regardless of encrypt_pastes setting
        content = data.get('content', '')
        title = data.get('title', 'Untitled') or 'Untitled'
        language = data.get('language', _pastes.get('default_language', 'text'))
        if not content or not isinstance(content, str):
            return jsonify({'error': 'content must be a non-empty string'}), 400
        content_size = _utf8_size(content)
        if content_size is None:
            # Lone surrogates cannot be stored or served as UTF-8 text.
            return jsonify({'error': 'content must be a non-empty string'}), 400
        if content_size > MAX_ENCRYPTED_BYTES:
            return jsonify({'error': 'Paste too large'}), 413
        # Enforce server-side limits on title and language
        if not isinstance(title, str):
            title = 'Untitled'
        title = title.strip()[:MAX_TITLE_BYTES] or 'Untitled'
        if not isinstance(language, str) or not _LANGUAGE_RE.match(language):
            language = _pastes.get('default_language', 'text')
        store_data = json.dumps({'title': title, 'content': content, 'language': language,
                                 'discussions': discussions_enabled})
    else:
        return jsonify({'error': 'Provide either encrypted_data or content'}), 400

    # Keep the configured order so the fallback below is deterministic.
    allowed_expiry = list(_pastes.get('allow_expiry_options', ['1year']))
    expires_in = data.get('expires_in', _pastes.get('default_expiry', '1year'))
    # The isinstance guard keeps unhashable client values (lists, objects)
    # from raising inside the membership test and delta lookup.
    if not isinstance(expires_in, str) or expires_in not in allowed_expiry:
        # Fallback to the first allowed option if everything is missing
        first_allowed = allowed_expiry[0] if allowed_expiry else 'never'
        expires_in = _pastes.get('default_expiry', first_allowed)

    expires_at = None
    if expires_in != 'never':
        delta = _EXPIRY_DELTAS.get(expires_in)
        # An option listed in config but absent from _EXPIRY_DELTAS is treated
        # as "never" (expires_at stays None).
        if delta is not None:
            # Store an explicit string instead of relying on sqlite3's implicit
            # datetime adapter; the layout matches what that adapter wrote.
            expires_at = _db_timestamp(datetime.datetime.now(_UTC) + delta)

    deletion_token = secrets.token_urlsafe(32)
    paste_id = None
    conn = get_db_connection()
    try:
        # Retry a few times in case the random ID collides with an existing one.
        for _ in range(5):
            paste_id = generate_paste_id()
            try:
                conn.execute(
                    'INSERT INTO pastes (id, encrypted_data, expires_at, deletion_token, discussions_enabled) VALUES (?, ?, ?, ?, ?)',
                    (paste_id, store_data, expires_at, deletion_token, 1 if discussions_enabled else 0)
                )
                conn.commit()
                break
            except sqlite3.IntegrityError:
                continue
        else:
            return jsonify({'error': 'Service temporarily unavailable'}), 503
    finally:
        conn.close()

    return jsonify({'paste_id': paste_id, 'url': f'/{paste_id}', 'deletion_token': deletion_token})


@app.route('/api/delete/<paste_id>', methods=['POST'])
def delete_paste(paste_id):
    """Delete a paste when the caller presents its deletion token.

    Expects a same-origin POST with a JSON body ``{"token": ...}``.
    """
    if not _PASTE_ID_RE.match(paste_id):
        abort(404)

    # CSRF check
    if not _is_same_origin():
        return jsonify({'error': 'Cross-Origin request blocked'}), 403

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    token = data.get('token')
    if not token:
        return jsonify({'error': 'Deletion token required'}), 400

    conn = get_db_connection()
    try:
        paste = conn.execute('SELECT deletion_token FROM pastes WHERE id = ?', (paste_id,)).fetchone()
        if not paste:
            return jsonify({'error': 'Paste not found'}), 404

        # Verify token (constant-time; rows without a token can never match)
        if not _tokens_match(paste['deletion_token'], token):
            return jsonify({'error': 'Invalid deletion token'}), 403

        _delete_paste_rows(conn, paste_id)
        conn.commit()
    finally:
        conn.close()

    return jsonify({'success': True, 'message': 'Paste deleted successfully'})


@app.route('/<paste_id>')
def view_paste(paste_id):
    """Render a paste and increment its view counter."""
    if not _PASTE_ID_RE.match(paste_id):
        abort(404)

    paste = _get_paste_or_abort(paste_id)

    # Increment view count in a separate connection after expiry check.
    conn = get_db_connection()
    try:
        conn.execute('UPDATE pastes SET views = views + 1 WHERE id = ?', (paste_id,))
        conn.commit()
    finally:
        conn.close()

    return render_template('view.html', paste=paste)


@app.route('/<paste_id>/raw')
def view_paste_raw(paste_id):
    """Serve the raw form of a paste.

    Plaintext pastes are returned as ``text/plain``. For encrypted pastes,
    clients whose Accept header contains ``text/html`` get a decryptor page;
    all others get the stored ciphertext and metadata as JSON.
    """
    if not _PASTE_ID_RE.match(paste_id):
        abort(404)

    paste = _get_paste_or_abort(paste_id)
    stored = paste['encrypted_data']

    # 1. Plaintext paste — Return content directly as text/plain
    if not _ENCRYPTED_RE.match(stored):
        try:
            data = json.loads(stored)
        except (json.JSONDecodeError, TypeError):
            data = None
        # Only a JSON object can carry a 'content' field; anything else falls
        # through to the encrypted handling below.
        if isinstance(data, dict):
            return Response(data.get('content', ''), mimetype='text/plain; charset=utf-8')

    # 2. Encrypted paste — Browsers get a minimal decryptor; API consumers get JSON
    accept = request.headers.get('Accept', '')
    if 'text/html' in accept:
        # Minimal HTML shell that handles decryption for browsers (E2E)
        return render_template('raw_decryptor.html', paste=paste)

    # API response
    return jsonify({
        'id': paste['id'],
        'encrypted_data': stored,
        'created_at': paste['created_at'],
        'expires_at': paste['expires_at'],
        'views': paste['views'],
    })


@app.route('/api/languages')
def get_languages():
    """Return the configured language list (empty list if none is configured)."""
    return jsonify(CFG.get('languages', []))


@app.route('/api/config')
def get_client_config():
    """Expose a whitelisted subset of config to the browser. Never forward entire blobs."""
    return jsonify({
        'site': {
            'name': _site.get('name', 'Bastebin'),
            'tagline': _site.get('tagline', ''),
            'brand_icon': _site.get('brand_icon', ''),
            'footer_text': _site.get('footer_text', ''),
        },
        'theme': {
            'default': _theme.get('default', 'auto'),
            'allow_user_toggle': _theme.get('allow_user_toggle', True),
            'light': _theme.get('light', {}),
            'dark': _theme.get('dark', {}),
        },
        'features': {
            'encrypt_pastes': _feat.get('encrypt_pastes', True),
            'show_recent': _feat.get('show_recent', False),
            'show_view_count': _feat.get('show_view_count', True),
            'show_e2e_banner': _feat.get('show_e2e_banner', True),
            'allow_raw_api': _feat.get('allow_raw_api', True),
            'auto_save_draft': _feat.get('auto_save_draft', True),
            'draft_max_age_days': _feat.get('draft_max_age_days', 7),
            'keyboard_shortcuts': _feat.get('keyboard_shortcuts', True),
        },
        'ui': {
            'code_font_family': _ui.get('code_font_family', 'monospace'),
            'code_font_size': _ui.get('code_font_size', '0.875rem'),
            'code_line_height': _ui.get('code_line_height', '1.6'),
            'textarea_rows': _ui.get('textarea_rows', 20),
            'border_radius': _ui.get('border_radius', '8px'),
            'animation_speed': _ui.get('animation_speed', '0.2s'),
        },
        'pastes': {
            'default_language': _pastes.get('default_language', 'text'),
            'default_expiry': _pastes.get('default_expiry', 'never'),
            'allow_expiry_options': _pastes.get('allow_expiry_options', []),
            'expiry_labels': _pastes.get('expiry_labels', {}),
            'encryption_key_bits': _pastes.get('encryption_key_bits', 128),
        },
    })


@app.route('/recent')
def recent_pastes():
    """Show recently created pastes (only available when show_recent is enabled in config)."""
    if not _feat.get('show_recent', False):
        abort(404)
    limit = int(_pastes.get('recent_limit', 50))
    # Same layout as the stored expires_at values; a 'T' separator would hide
    # every paste that expires later on the current day.
    now = _db_timestamp(datetime.datetime.now(_UTC))
    conn = get_db_connection()
    try:
        pastes = conn.execute(
            '''
            SELECT id, created_at, expires_at, views
            FROM pastes
            WHERE expires_at IS NULL OR expires_at > ?
            ORDER BY created_at DESC
            LIMIT ?
            ''',
            (now, limit)
        ).fetchall()
    finally:
        conn.close()
    return render_template('recent.html', pastes=pastes)


# ── Comments API ──────────────────────────────────────────────────────────────

_MAX_COMMENT_BYTES = 10240
_MAX_COMMENTS_PER_PASTE = 500


@app.route('/api/comments/<paste_id>')
def get_comments(paste_id):
    """Return the comments of a paste, oldest first.

    Responds 404 for unknown pastes, 410 for expired ones and 403 when
    discussions are not enabled for the paste.
    """
    if not _PASTE_ID_RE.match(paste_id):
        abort(404)
    conn = get_db_connection()
    try:
        paste = conn.execute(
            'SELECT id, discussions_enabled, expires_at FROM pastes WHERE id = ?', (paste_id,)
        ).fetchone()
        if not paste:
            abort(404)
        if _is_expired(paste['expires_at']):
            abort(410)
        if not paste['discussions_enabled']:
            return jsonify({'error': 'Discussions not enabled for this paste'}), 403
        comments = conn.execute(
            'SELECT id, content, created_at FROM comments WHERE paste_id = ? ORDER BY created_at ASC',
            (paste_id,)
        ).fetchall()
    finally:
        conn.close()
    return jsonify({
        'discussions_enabled': True,
        'comments': [dict(c) for c in comments],
    })


@app.route('/api/comments/<paste_id>', methods=['POST'])
def post_comment(paste_id):
    """Add a comment to a paste that has discussions enabled.

    Expects a same-origin POST with a JSON body ``{"content": ...}``.
    Returns 201 with the new comment's ``id`` and ``created_at``.
    """
    if not _PASTE_ID_RE.match(paste_id):
        abort(404)
    if not _is_same_origin():
        return jsonify({'error': 'Cross-Origin request blocked'}), 403
    if not _check_rate_limit(request.remote_addr, key_prefix='comment', window=600, limit=20):
        return jsonify({'error': 'Rate limit exceeded. Please wait a few minutes.'}), 429

    data = request.get_json(silent=True)
    if not data or not isinstance(data, dict):
        return jsonify({'error': 'JSON body required'}), 400
    content = data.get('content', '')
    if not content or not isinstance(content, str):
        return jsonify({'error': 'content required'}), 400
    content_size = _utf8_size(content)
    if content_size is None:
        # Text that cannot be encoded as UTF-8 cannot be stored either.
        return jsonify({'error': 'content required'}), 400
    if content_size > _MAX_COMMENT_BYTES:
        return jsonify({'error': 'Comment too large'}), 413

    conn = get_db_connection()
    try:
        paste = conn.execute(
            'SELECT id, discussions_enabled, expires_at FROM pastes WHERE id = ?', (paste_id,)
        ).fetchone()
        if not paste:
            abort(404)
        if _is_expired(paste['expires_at']):
            abort(410)
        if not paste['discussions_enabled']:
            return jsonify({'error': 'Discussions not enabled for this paste'}), 403
        count = conn.execute(
            'SELECT COUNT(*) FROM comments WHERE paste_id = ?', (paste_id,)
        ).fetchone()[0]
        if count >= _MAX_COMMENTS_PER_PASTE:
            return jsonify({'error': 'Comment limit reached for this paste'}), 429
        comment_id = secrets.token_hex(8)
        conn.execute(
            'INSERT INTO comments (id, paste_id, content) VALUES (?, ?, ?)',
            (comment_id, paste_id, content)
        )
        conn.commit()
        # Read the row back to report the database-assigned created_at.
        comment = conn.execute(
            'SELECT id, created_at FROM comments WHERE id = ?', (comment_id,)
        ).fetchone()
    finally:
        conn.close()
    return jsonify({'id': comment['id'], 'created_at': comment['created_at']}), 201


# ── Admin Panel ───────────────────────────────────────────────────────────────

def is_admin():
    """Return True if the current session is marked as an authenticated admin."""
    return session.get('admin_logged_in') is True


@app.route('/admin/login', methods=['GET', 'POST'])
def admin_login():
    """Show the admin login form (GET) or process a login attempt (POST).

    POST attempts are rate limited to 5 per hour per client address and must
    be same-origin. Credentials are checked against the ``admin`` section of
    the config (``user`` and the password hash in ``pass``).
    """
    error = None
    admin_cfg = CFG.get('admin', {})

    if request.method == 'POST':
        # 1. Rate limit
        if not _check_rate_limit(request.remote_addr, key_prefix='admin_login', window=3600, limit=5):
            return render_template('admin_login.html', error='Too many login attempts. Try again later.'), 429

        # 2. CSRF
        if not _is_same_origin():
            return "CSRF Blocked", 403

        user = request.form.get('username')
        pw = request.form.get('password')
        expected_user = admin_cfg.get('user')
        stored_hash = admin_cfg.get('pass')

        # With no admin account configured (or a missing form field) the login
        # must simply fail; passing None to check_password_hash would raise.
        credentials_ok = (
            bool(expected_user)
            and bool(stored_hash)
            and pw is not None
            and user == expected_user
            and check_password_hash(stored_hash, pw)
        )
        if credentials_ok:
            session['admin_logged_in'] = True
            return redirect(url_for('admin_dashboard'))
        time.sleep(1)  # Slow down brute force
        error = 'Invalid credentials'

    return render_template('admin_login.html', error=error)


@app.route('/admin/logout')
def admin_logout():
    """Clear the admin flag from the session and return to the index page."""
    session.pop('admin_logged_in', None)
    return redirect(url_for('index'))


@app.route('/admin')
def admin_dashboard():
    """List all pastes, newest first, for a logged-in admin."""
    if not is_admin():
        return redirect(url_for('admin_login'))

    conn = get_db_connection()
    try:
        pastes = conn.execute(
            'SELECT id, created_at, expires_at, views FROM pastes ORDER BY created_at DESC'
        ).fetchall()
    finally:
        conn.close()

    return render_template('admin_dashboard.html', pastes=pastes)


@app.route('/admin/delete/<paste_id>', methods=['POST'])
def admin_delete_paste(paste_id):
    """Delete a paste (and its comments) on behalf of a logged-in admin."""
    if not is_admin():
        abort(403)
    if not _is_same_origin():
        abort(403)

    conn = get_db_connection()
    try:
        _delete_paste_rows(conn, paste_id)
        conn.commit()
    finally:
        conn.close()

    return redirect(url_for('admin_dashboard'))


# ── Error handlers ────────────────────────────────────────────────────────────

@app.errorhandler(404)
def not_found(error):
    """Render the custom 404 page."""
    return render_template('404.html'), 404


@app.errorhandler(410)
def gone(error):
    """Render the custom 410 (expired paste) page."""
    return render_template('410.html'), 410


# ── Entry point ───────────────────────────────────────────────────────────────

# Always initialise the DB when the module is imported (works under Gunicorn
# and other WSGI servers that import app directly, not just via wsgi.py).
init_db()

if __name__ == '__main__':
    app.run(
        debug=_server.get('debug', False),
        host=_server.get('host', '0.0.0.0'),
        port=_server.get('port', 5000)
    )