bastebin/app.py

366 lines
14 KiB
Python

import json
import os
import re
import secrets
import sqlite3
import sys
import threading
import uuid
import datetime
from flask import Flask, render_template, request, jsonify, abort, Response, g
_UTC = datetime.timezone.utc
# ── Load configuration ────────────────────────────────────────────────────────
_CONFIG_PATH = os.path.join(os.path.dirname(__file__), 'config.json')
def _load_config():
with open(_CONFIG_PATH, 'r', encoding='utf-8') as f:
return json.load(f)
try:
CFG = _load_config()
except (FileNotFoundError, json.JSONDecodeError) as e:
raise RuntimeError(f"Failed to load config.json: {e}") from e
_site = CFG['site']
_server = CFG['server']
_db_cfg = CFG['database']
_pastes = CFG['pastes']
_theme = CFG['theme']
_feat = CFG['features']
_ui = CFG['ui']
# ── Flask app setup ───────────────────────────────────────────────────────────
app = Flask(__name__)
# Prefer SECRET_KEY from the environment; fall back to config.json.
# Refuse to start if the key is still the default placeholder.
_secret_key = os.environ.get('SECRET_KEY') or _server.get('secret_key', '')
_DEFAULT_KEYS = {'', 'change-me', 'change-this-to-a-long-random-secret'}
if _secret_key in _DEFAULT_KEYS:
raise RuntimeError(
'SECRET_KEY is not set or is still the default placeholder. '
'Set a long random value in the SECRET_KEY environment variable '
'or in config.json before starting the server.'
)
app.config['SECRET_KEY'] = _secret_key
if _server.get('debug', False):
print('WARNING: debug=true is set in config.json — never use debug mode in production!',
file=sys.stderr, flush=True)
_BASE_DIR = os.path.dirname(os.path.abspath(__file__))
_raw_db_path = _db_cfg.get('path', 'bastebin.db')
DATABASE = os.path.realpath(os.path.join(_BASE_DIR, _raw_db_path))
if not DATABASE.startswith(_BASE_DIR):
raise RuntimeError(f"Database path '{DATABASE}' must be within the project directory.")
MAX_ENCRYPTED_BYTES = _pastes.get('max_size_bytes', 2 * 1024 * 1024)
MAX_TITLE_BYTES = 200
_ENCRYPTED_RE = re.compile(r'^[A-Za-z0-9_-]+:[A-Za-z0-9_-]+$')
_LANGUAGE_RE = re.compile(r'^[a-z0-9_+-]{1,32}$')
_PASTE_ID_RE = re.compile(r'^[0-9a-f]{4,32}$')
# ── Security headers ─────────────────────────────────────────────────────────
@app.before_request
def _generate_csp_nonce():
"""Generate a unique nonce per request for the Content-Security-Policy."""
g.csp_nonce = secrets.token_urlsafe(16)
@app.after_request
def set_security_headers(response):
nonce = getattr(g, 'csp_nonce', '')
response.headers['X-Frame-Options'] = 'DENY'
response.headers['X-Content-Type-Options'] = 'nosniff'
response.headers['Referrer-Policy'] = 'same-origin'
response.headers['Content-Security-Policy'] = (
"default-src 'self'; "
f"script-src 'self' https://cdnjs.cloudflare.com 'nonce-{nonce}'; "
"style-src 'self' https://cdnjs.cloudflare.com 'unsafe-inline'; "
"img-src 'self' data:; "
"connect-src 'self' blob:; "
"object-src 'none'; "
"base-uri 'self'; "
"frame-ancestors 'none';"
)
return response
# ── Jinja helpers ─────────────────────────────────────────────────────────────
@app.context_processor
def inject_config():
return {'cfg': CFG, 'csp_nonce': getattr(g, 'csp_nonce', '')}
# ── Database ──────────────────────────────────────────────────────────────────
def get_db_connection():
conn = sqlite3.connect(DATABASE, timeout=10)
conn.row_factory = sqlite3.Row
conn.execute('PRAGMA journal_mode=WAL')
return conn
_db_init_lock = threading.Lock()
_db_initialized = False
def init_db():
"""Initialise the database. Thread-safe and idempotent: runs DDL only once per process."""
global _db_initialized
with _db_init_lock:
if _db_initialized:
return
conn = sqlite3.connect(DATABASE)
try:
cursor = conn.cursor()
cursor.execute("PRAGMA table_info(pastes)")
columns = {row[1] for row in cursor.fetchall()}
if columns and 'content' in columns and 'encrypted_data' not in columns:
cursor.execute("DROP TABLE pastes")
cursor.execute('''
CREATE TABLE IF NOT EXISTS pastes (
id TEXT PRIMARY KEY,
encrypted_data TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
expires_at TIMESTAMP,
views INTEGER DEFAULT 0
)
''')
conn.commit()
finally:
conn.close()
_db_initialized = True
# ── Helpers ───────────────────────────────────────────────────────────────────
def generate_paste_id():
length = _pastes.get('id_length', 8)
return str(uuid.uuid4()).replace('-', '')[:length]
def validate_encrypted_data(value):
if not isinstance(value, str):
return False
# Use byte length for a consistent limit regardless of character encoding
if len(value.encode('utf-8')) > MAX_ENCRYPTED_BYTES:
return False
return bool(_ENCRYPTED_RE.match(value))
def _get_paste_or_abort(paste_id):
conn = get_db_connection()
try:
paste = conn.execute('SELECT * FROM pastes WHERE id = ?', (paste_id,)).fetchone()
if not paste:
abort(404)
if paste['expires_at']:
expires_at = datetime.datetime.fromisoformat(paste['expires_at']).replace(tzinfo=_UTC)
if expires_at < datetime.datetime.now(_UTC):
conn.execute('DELETE FROM pastes WHERE id = ?', (paste_id,))
conn.commit()
abort(410)
finally:
conn.close()
return paste
# ── Routes ────────────────────────────────────────────────────────────────────
@app.route('/')
def index():
return render_template('index.html')
@app.route('/create', methods=['POST'])
def create_paste():
data = request.get_json(silent=True)
if not data:
return jsonify({'error': 'JSON body required'}), 400
if 'encrypted_data' in data:
# Encrypted path — always accepted regardless of encrypt_pastes setting
store_data = data.get('encrypted_data', '')
if not validate_encrypted_data(store_data):
return jsonify({'error': 'encrypted_data must be a valid iv:ciphertext string'}), 400
elif 'content' in data:
# Plaintext path — always accepted regardless of encrypt_pastes setting
content = data.get('content', '')
title = data.get('title', 'Untitled') or 'Untitled'
language = data.get('language', _pastes.get('default_language', 'text'))
if not content or not isinstance(content, str):
return jsonify({'error': 'content must be a non-empty string'}), 400
if len(content.encode('utf-8')) > MAX_ENCRYPTED_BYTES:
return jsonify({'error': 'Paste too large'}), 413
# Enforce server-side limits on title and language
if not isinstance(title, str):
title = 'Untitled'
title = title.strip()[:MAX_TITLE_BYTES] or 'Untitled'
if not isinstance(language, str) or not _LANGUAGE_RE.match(language):
language = _pastes.get('default_language', 'text')
store_data = json.dumps({'title': title, 'content': content, 'language': language})
else:
return jsonify({'error': 'Provide either encrypted_data or content'}), 400
allowed_expiry = set(_pastes.get('allow_expiry_options', ['never']))
expires_in = data.get('expires_in', 'never')
if expires_in not in allowed_expiry:
expires_in = 'never'
expires_at = None
if expires_in != 'never':
delta_map = {
'1hour': datetime.timedelta(hours=1),
'1day': datetime.timedelta(days=1),
'1week': datetime.timedelta(weeks=1),
'1month': datetime.timedelta(days=30),
}
delta = delta_map.get(expires_in)
if delta is None:
# Config lists an expiry option not mapped in delta_map — treat as never
expires_in = 'never'
else:
expires_at = datetime.datetime.now(_UTC) + delta
paste_id = None
conn = get_db_connection()
try:
for _ in range(5):
paste_id = generate_paste_id()
try:
conn.execute(
'INSERT INTO pastes (id, encrypted_data, expires_at) VALUES (?, ?, ?)',
(paste_id, store_data, expires_at)
)
conn.commit()
break
except sqlite3.IntegrityError:
continue
else:
return jsonify({'error': 'Service temporarily unavailable'}), 503
finally:
conn.close()
return jsonify({'paste_id': paste_id, 'url': f'/{paste_id}'})
@app.route('/<string:paste_id>')
def view_paste(paste_id):
if not _PASTE_ID_RE.match(paste_id):
abort(404)
paste = _get_paste_or_abort(paste_id)
# Increment view count in a separate connection after expiry check.
conn = get_db_connection()
try:
conn.execute('UPDATE pastes SET views = views + 1 WHERE id = ?', (paste_id,))
conn.commit()
finally:
conn.close()
return render_template('view.html', paste=paste)
@app.route('/<string:paste_id>/raw')
def view_paste_raw(paste_id):
if not _PASTE_ID_RE.match(paste_id):
abort(404)
paste = _get_paste_or_abort(paste_id)
stored = paste['encrypted_data']
# Plaintext pastes are stored as a JSON object; return the content directly.
if not re.match(r'^[A-Za-z0-9_-]+:[A-Za-z0-9_-]+$', stored):
try:
data = json.loads(stored)
return Response(data.get('content', ''), mimetype='text/plain')
except (json.JSONDecodeError, TypeError):
pass
# Encrypted paste — return the raw ciphertext blob for API consumers.
return jsonify({
'id': paste['id'],
'encrypted_data': stored,
'created_at': paste['created_at'],
'expires_at': paste['expires_at'],
'views': paste['views'],
})
@app.route('/api/languages')
def get_languages():
return jsonify(CFG.get('languages', []))
@app.route('/api/config')
def get_client_config():
"""Expose a whitelisted subset of config to the browser. Never forward entire blobs."""
return jsonify({
'site': {
'name': _site.get('name', 'Bastebin'),
'tagline': _site.get('tagline', ''),
'brand_icon': _site.get('brand_icon', ''),
'footer_text': _site.get('footer_text', ''),
},
'theme': {
'default': _theme.get('default', 'auto'),
'allow_user_toggle': _theme.get('allow_user_toggle', True),
'light': _theme.get('light', {}),
'dark': _theme.get('dark', {}),
},
'features': {
'encrypt_pastes': _feat.get('encrypt_pastes', True),
'show_recent': _feat.get('show_recent', False),
'show_view_count': _feat.get('show_view_count', True),
'show_e2e_banner': _feat.get('show_e2e_banner', True),
'allow_raw_api': _feat.get('allow_raw_api', True),
'auto_save_draft': _feat.get('auto_save_draft', True),
'draft_max_age_days': _feat.get('draft_max_age_days', 7),
'keyboard_shortcuts': _feat.get('keyboard_shortcuts', True),
},
'ui': {
'code_font_family': _ui.get('code_font_family', 'monospace'),
'code_font_size': _ui.get('code_font_size', '0.875rem'),
'code_line_height': _ui.get('code_line_height', '1.6'),
'textarea_rows': _ui.get('textarea_rows', 20),
'border_radius': _ui.get('border_radius', '8px'),
'animation_speed': _ui.get('animation_speed', '0.2s'),
},
'pastes': {
'default_language': _pastes.get('default_language', 'text'),
'default_expiry': _pastes.get('default_expiry', 'never'),
'allow_expiry_options': _pastes.get('allow_expiry_options', []),
'expiry_labels': _pastes.get('expiry_labels', {}),
},
})
@app.route('/recent')
def recent_pastes():
"""Show recently created pastes (only available when show_recent is enabled in config)."""
if not _feat.get('show_recent', False):
abort(404)
limit = int(_pastes.get('recent_limit', 50))
now = datetime.datetime.now(_UTC).isoformat()
conn = get_db_connection()
try:
pastes = conn.execute(
'''
SELECT id, created_at, expires_at, views FROM pastes
WHERE expires_at IS NULL OR expires_at > ?
ORDER BY created_at DESC LIMIT ?
''',
(now, limit)
).fetchall()
finally:
conn.close()
return render_template('recent.html', pastes=pastes)
# ── Error handlers ────────────────────────────────────────────────────────────
@app.errorhandler(404)
def not_found(error):
return render_template('404.html'), 404
@app.errorhandler(410)
def gone(error):
return render_template('410.html'), 410
# ── Entry point ───────────────────────────────────────────────────────────────
if __name__ == '__main__':
init_db()
app.run(
debug=_server.get('debug', False),
host=_server.get('host', '0.0.0.0'),
port=_server.get('port', 5000)
)