import json import os import re import sqlite3 import uuid import datetime from flask import Flask, render_template, request, jsonify, abort # ── Load configuration ──────────────────────────────────────────────────────── _CONFIG_PATH = os.path.join(os.path.dirname(__file__), 'config.json') def _load_config(): with open(_CONFIG_PATH, 'r', encoding='utf-8') as f: return json.load(f) try: CFG = _load_config() except (FileNotFoundError, json.JSONDecodeError) as e: raise RuntimeError(f"Failed to load config.json: {e}") from e _site = CFG['site'] _server = CFG['server'] _db_cfg = CFG['database'] _pastes = CFG['pastes'] _theme = CFG['theme'] _feat = CFG['features'] _ui = CFG['ui'] # ── Flask app setup ─────────────────────────────────────────────────────────── app = Flask(__name__) app.config['SECRET_KEY'] = _server.get('secret_key', 'change-me') DATABASE = _db_cfg.get('path', 'bastebin.db') MAX_ENCRYPTED_BYTES = _pastes.get('max_size_bytes', 2 * 1024 * 1024) _ENCRYPTED_RE = re.compile(r'^[A-Za-z0-9_-]+:[A-Za-z0-9_-]+$') # ── Jinja helpers ───────────────────────────────────────────────────────────── @app.context_processor def inject_config(): return {'cfg': CFG} # ── Database ────────────────────────────────────────────────────────────────── def get_db_connection(): conn = sqlite3.connect(DATABASE, timeout=10) conn.row_factory = sqlite3.Row conn.execute('PRAGMA journal_mode=WAL') return conn def init_db(): conn = sqlite3.connect(DATABASE) cursor = conn.cursor() cursor.execute("PRAGMA table_info(pastes)") columns = {row[1] for row in cursor.fetchall()} if columns and 'content' in columns and 'encrypted_data' not in columns: cursor.execute("DROP TABLE pastes") cursor.execute(''' CREATE TABLE IF NOT EXISTS pastes ( id TEXT PRIMARY KEY, encrypted_data TEXT NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, expires_at TIMESTAMP, views INTEGER DEFAULT 0 ) ''') conn.commit() conn.close() # ── Helpers ─────────────────────────────────────────────────────────────────── def generate_paste_id(): length = _pastes.get('id_length', 8) return str(uuid.uuid4()).replace('-', '')[:length] def validate_encrypted_data(value): if not isinstance(value, str): return False if len(value) > MAX_ENCRYPTED_BYTES: return False return bool(_ENCRYPTED_RE.match(value)) def _get_paste_or_abort(paste_id): conn = get_db_connection() paste = conn.execute('SELECT * FROM pastes WHERE id = ?', (paste_id,)).fetchone() conn.close() if not paste: abort(404) if paste['expires_at']: expires_at = datetime.datetime.fromisoformat(paste['expires_at']) if expires_at < datetime.datetime.now(): abort(410) return paste # ── Routes ──────────────────────────────────────────────────────────────────── @app.route('/') def index(): return render_template('index.html') @app.route('/create', methods=['POST']) def create_paste(): data = request.get_json(silent=True) if not data: return jsonify({'error': 'JSON body required'}), 400 if 'encrypted_data' in data: # Encrypted path — always accepted regardless of encrypt_pastes setting store_data = data.get('encrypted_data', '') if not validate_encrypted_data(store_data): return jsonify({'error': 'encrypted_data must be a valid iv:ciphertext string'}), 400 elif 'content' in data: # Plaintext path — always accepted regardless of encrypt_pastes setting content = data.get('content', '') title = data.get('title', 'Untitled') or 'Untitled' language = data.get('language', _pastes.get('default_language', 'text')) if not content or not isinstance(content, str): return jsonify({'error': 'content must be a non-empty string'}), 400 if len(content.encode('utf-8')) > MAX_ENCRYPTED_BYTES: return jsonify({'error': 'Paste too large'}), 413 store_data = json.dumps({'title': title, 'content': content, 'language': language}) else: return jsonify({'error': 'Provide either encrypted_data or content'}), 400 allowed_expiry = set(_pastes.get('allow_expiry_options', ['never'])) expires_in = data.get('expires_in', 'never') if expires_in not in allowed_expiry: expires_in = 'never' expires_at = None if expires_in != 'never': delta_map = { '1hour': datetime.timedelta(hours=1), '1day': datetime.timedelta(days=1), '1week': datetime.timedelta(weeks=1), '1month': datetime.timedelta(days=30), } expires_at = datetime.datetime.now() + delta_map[expires_in] paste_id = generate_paste_id() conn = get_db_connection() conn.execute( 'INSERT INTO pastes (id, encrypted_data, expires_at) VALUES (?, ?, ?)', (paste_id, store_data, expires_at) ) conn.commit() conn.close() return jsonify({'paste_id': paste_id, 'url': f'/{paste_id}'}) @app.route('/') def view_paste(paste_id): paste = _get_paste_or_abort(paste_id) conn = get_db_connection() conn.execute('UPDATE pastes SET views = views + 1 WHERE id = ?', (paste_id,)) conn.commit() paste = conn.execute('SELECT * FROM pastes WHERE id = ?', (paste_id,)).fetchone() conn.close() return render_template('view.html', paste=paste) @app.route('//raw') def view_paste_raw(paste_id): paste = _get_paste_or_abort(paste_id) return jsonify({ 'id': paste['id'], 'encrypted_data': paste['encrypted_data'], 'created_at': paste['created_at'], 'expires_at': paste['expires_at'], 'views': paste['views'], }) @app.route('/api/languages') def get_languages(): return jsonify(CFG.get('languages', [])) @app.route('/api/config') def get_client_config(): """Expose the browser-safe portion of config to JavaScript.""" return jsonify({ 'site': _site, 'theme': _theme, 'features': _feat, 'ui': _ui, 'pastes': { 'default_language': _pastes.get('default_language', 'text'), 'default_expiry': _pastes.get('default_expiry', 'never'), 'allow_expiry_options': _pastes.get('allow_expiry_options', []), 'expiry_labels': _pastes.get('expiry_labels', {}), } }) # ── Error handlers ──────────────────────────────────────────────────────────── @app.errorhandler(404) def not_found(error): return render_template('404.html'), 404 @app.errorhandler(410) def gone(error): return render_template('410.html'), 410 # ── Entry point ─────────────────────────────────────────────────────────────── if __name__ == '__main__': init_db() app.run( debug=_server.get('debug', False), host=_server.get('host', '0.0.0.0'), port=_server.get('port', 5000) )