forked from ComputerTech/bastebin
560 lines
22 KiB
Python
560 lines
22 KiB
Python
import json
|
|
import os
|
|
import re
|
|
import secrets
|
|
import sqlite3
|
|
import sys
|
|
import threading
|
|
import time
|
|
import uuid
|
|
import datetime
|
|
from flask import Flask, render_template, request, jsonify, abort, Response, g, session, redirect, url_for
|
|
from werkzeug.security import check_password_hash
|
|
|
|
_UTC = datetime.timezone.utc
|
|
|
|
|
|
# ── Load configuration ────────────────────────────────────────────────────────
|
|
|
|
_CONFIG_PATH = os.path.join(os.path.dirname(__file__), 'config.json')
|
|
|
|
def _load_config():
|
|
with open(_CONFIG_PATH, 'r', encoding='utf-8') as f:
|
|
return json.load(f)
|
|
|
|
try:
|
|
CFG = _load_config()
|
|
except (FileNotFoundError, json.JSONDecodeError) as e:
|
|
raise RuntimeError(f"Failed to load config.json: {e}") from e
|
|
|
|
_site = CFG['site']
|
|
_server = CFG['server']
|
|
_db_cfg = CFG['database']
|
|
_pastes = CFG['pastes']
|
|
_theme = CFG['theme']
|
|
_feat = CFG['features']
|
|
_ui = CFG['ui']
|
|
|
|
# ── Flask app setup ───────────────────────────────────────────────────────────
|
|
|
|
app = Flask(__name__)
|
|
|
|
# Prefer SECRET_KEY from the environment; fall back to config.json.
|
|
# Refuse to start if the key is still the default placeholder.
|
|
_secret_key = os.environ.get('SECRET_KEY') or _server.get('secret_key', '')
|
|
_DEFAULT_KEYS = {'', 'change-me', 'change-this-to-a-long-random-secret'}
|
|
if _secret_key in _DEFAULT_KEYS:
|
|
raise RuntimeError(
|
|
'SECRET_KEY is not set or is still the default placeholder. '
|
|
'Set a long random value in the SECRET_KEY environment variable '
|
|
'or in config.json before starting the server.'
|
|
)
|
|
app.config['SECRET_KEY'] = _secret_key
|
|
# ── Session hardening ─────────────────────────────────────────────────────────
|
|
app.config['SESSION_COOKIE_HTTPONLY'] = True
|
|
app.config['SESSION_COOKIE_SECURE'] = _server.get('use_https', False)
|
|
app.config['SESSION_COOKIE_SAMESITE'] = 'Lax'
|
|
|
|
if _server.get('debug', False):
|
|
print('WARNING: debug=true is set in config.json — never use debug mode in production!',
|
|
file=sys.stderr, flush=True)
|
|
|
|
_BASE_DIR = os.path.dirname(os.path.abspath(__file__))
|
|
_raw_db_path = _db_cfg.get('path', 'bastebin.db')
|
|
DATABASE = os.path.realpath(os.path.join(_BASE_DIR, _raw_db_path))
|
|
if not DATABASE.startswith(_BASE_DIR):
|
|
raise RuntimeError(f"Database path '{DATABASE}' must be within the project directory.")
|
|
MAX_ENCRYPTED_BYTES = _pastes.get('max_size_bytes', 2 * 1024 * 1024)
|
|
MAX_TITLE_BYTES = 200
|
|
_ENCRYPTED_RE = re.compile(r'^[A-Za-z0-9_-]+:[A-Za-z0-9_-]+$')
|
|
_LANGUAGE_RE = re.compile(r'^[a-z0-9_+-]{1,32}$')
|
|
_PASTE_ID_RE = re.compile(r'^[0-9a-f]{4,32}$')
|
|
|
|
# ── Security headers ─────────────────────────────────────────────────────────
|
|
|
|
@app.before_request
|
|
def _generate_csp_nonce():
|
|
"""Generate a unique nonce per request for the Content-Security-Policy."""
|
|
g.csp_nonce = secrets.token_urlsafe(16)
|
|
|
|
@app.after_request
|
|
def set_security_headers(response):
|
|
nonce = getattr(g, 'csp_nonce', '')
|
|
response.headers['X-Frame-Options'] = 'DENY'
|
|
response.headers['X-Content-Type-Options'] = 'nosniff'
|
|
response.headers['Referrer-Policy'] = 'same-origin'
|
|
response.headers['Content-Security-Policy'] = (
|
|
"default-src 'self'; "
|
|
f"script-src 'self' https://cdnjs.cloudflare.com 'nonce-{nonce}'; "
|
|
"style-src 'self' https://cdnjs.cloudflare.com 'unsafe-inline'; "
|
|
"img-src 'self' data:; "
|
|
"connect-src 'self' blob:; "
|
|
"object-src 'none'; "
|
|
"base-uri 'self'; "
|
|
"frame-ancestors 'none';"
|
|
)
|
|
return response
|
|
|
|
# ── Jinja helpers ─────────────────────────────────────────────────────────────
|
|
|
|
@app.context_processor
|
|
def inject_config():
|
|
return {'cfg': CFG, 'csp_nonce': getattr(g, 'csp_nonce', '')}
|
|
|
|
# ── Database ──────────────────────────────────────────────────────────────────
|
|
|
|
def get_db_connection():
|
|
conn = sqlite3.connect(DATABASE, timeout=10)
|
|
conn.row_factory = sqlite3.Row
|
|
conn.execute('PRAGMA journal_mode=WAL')
|
|
return conn
|
|
|
|
_db_init_lock = threading.Lock()
|
|
_db_initialized = False
|
|
|
|
def init_db():
|
|
"""Initialise the database. Thread-safe and idempotent: runs DDL only once per process."""
|
|
global _db_initialized
|
|
with _db_init_lock:
|
|
if _db_initialized:
|
|
return
|
|
conn = sqlite3.connect(DATABASE)
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute("PRAGMA table_info(pastes)")
|
|
columns = {row[1] for row in cursor.fetchall()}
|
|
if columns and 'content' in columns and 'encrypted_data' not in columns:
|
|
cursor.execute("DROP TABLE pastes")
|
|
cursor.execute('''
|
|
CREATE TABLE IF NOT EXISTS pastes (
|
|
id TEXT PRIMARY KEY,
|
|
encrypted_data TEXT NOT NULL,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
expires_at TIMESTAMP,
|
|
views INTEGER DEFAULT 0,
|
|
deletion_token TEXT
|
|
)
|
|
''')
|
|
# Migration: add deletion_token column to existing table if missing
|
|
if columns and 'id' in columns and 'deletion_token' not in columns:
|
|
cursor.execute('ALTER TABLE pastes ADD COLUMN deletion_token TEXT')
|
|
cursor.execute('''
|
|
CREATE TABLE IF NOT EXISTS rate_limits (
|
|
ip_address TEXT,
|
|
timestamp REAL,
|
|
PRIMARY KEY (ip_address, timestamp)
|
|
)
|
|
''')
|
|
cursor.execute('CREATE INDEX IF NOT EXISTS idx_rate_limit_ts ON rate_limits(timestamp)')
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
_db_initialized = True
|
|
_start_cleanup_task()
|
|
|
|
_cleanup_running = False
|
|
|
|
def _start_cleanup_task():
|
|
"""Start a background thread to purge expired pastes every hour."""
|
|
global _cleanup_running
|
|
if _cleanup_running:
|
|
return
|
|
_cleanup_running = True
|
|
def run():
|
|
while True:
|
|
try:
|
|
# Sleep first to avoid running immediately on startup
|
|
time.sleep(3600)
|
|
now = datetime.datetime.now(_UTC).isoformat()
|
|
conn = sqlite3.connect(DATABASE)
|
|
try:
|
|
with conn:
|
|
res = conn.execute('DELETE FROM pastes WHERE expires_at < ?', (now,))
|
|
purged_pastes = res.rowcount
|
|
res = conn.execute('DELETE FROM rate_limits WHERE timestamp < ?', (time.time() - 3600,))
|
|
purged_ips = res.rowcount
|
|
if purged_pastes > 0 or purged_ips > 0:
|
|
print(f"[Cleanup] Purged {purged_pastes} expired pastes and {purged_ips} rate limit entries.", flush=True)
|
|
finally:
|
|
conn.close()
|
|
except Exception as e:
|
|
print(f"Error in background cleanup: {e}", file=sys.stderr)
|
|
|
|
t = threading.Thread(target=run, daemon=True)
|
|
t.start()
|
|
|
|
# ── Helpers ───────────────────────────────────────────────────────────────────
|
|
|
|
def generate_paste_id():
|
|
length = _pastes.get('id_length', 8)
|
|
return str(uuid.uuid4()).replace('-', '')[:length]
|
|
|
|
def validate_encrypted_data(value):
|
|
if not isinstance(value, str):
|
|
return False
|
|
# Use byte length for a consistent limit regardless of character encoding
|
|
if len(value.encode('utf-8')) > MAX_ENCRYPTED_BYTES:
|
|
return False
|
|
return bool(_ENCRYPTED_RE.match(value))
|
|
|
|
def _is_same_origin():
|
|
"""Verify that the request is from the same origin to prevent CSRF."""
|
|
origin = request.headers.get('Origin')
|
|
referer = request.headers.get('Referer')
|
|
base_url = request.host_url.rstrip('/') # e.g. http://localhost:5500
|
|
|
|
if origin:
|
|
return origin.rstrip('/') == base_url
|
|
if referer:
|
|
return referer.startswith(base_url)
|
|
return False
|
|
|
|
def _check_rate_limit(remote_ip, key_prefix='rl', window=600, limit=10):
|
|
"""Generic rate limiting via SQLite. Window is in seconds."""
|
|
now_ts = time.time()
|
|
conn = get_db_connection()
|
|
try:
|
|
count = conn.execute(
|
|
'SELECT COUNT(*) FROM rate_limits WHERE ip_address = ? AND timestamp > ?',
|
|
(f"{key_prefix}:{remote_ip}", now_ts - window)
|
|
).fetchone()[0]
|
|
|
|
if count >= limit:
|
|
return False
|
|
|
|
conn.execute('INSERT INTO rate_limits (ip_address, timestamp) VALUES (?, ?)', (f"{key_prefix}:{remote_ip}", now_ts))
|
|
conn.commit()
|
|
return True
|
|
finally:
|
|
conn.close()
|
|
|
|
def _get_paste_or_abort(paste_id):
|
|
conn = get_db_connection()
|
|
try:
|
|
paste = conn.execute('SELECT * FROM pastes WHERE id = ?', (paste_id,)).fetchone()
|
|
if not paste:
|
|
abort(404)
|
|
if paste['expires_at']:
|
|
expires_at = datetime.datetime.fromisoformat(paste['expires_at']).replace(tzinfo=_UTC)
|
|
if expires_at < datetime.datetime.now(_UTC):
|
|
conn.execute('DELETE FROM pastes WHERE id = ?', (paste_id,))
|
|
conn.commit()
|
|
abort(410)
|
|
finally:
|
|
conn.close()
|
|
return paste
|
|
|
|
# ── Routes ────────────────────────────────────────────────────────────────────
|
|
|
|
@app.route('/')
|
|
def index():
|
|
return render_template('index.html')
|
|
|
|
@app.route('/create', methods=['POST'])
|
|
def create_paste():
|
|
# ── Rate limiting ────────────────────────────────────────────────────────
|
|
# 10 pastes per 10 minutes per IP address (SQLite backed for worker safety)
|
|
# 10 pastes per 10 minutes per IP address (SQLite backed for worker safety)
|
|
if not _check_rate_limit(request.remote_addr, key_prefix='create', limit=10):
|
|
return jsonify({'error': 'Rate limit exceeded. Please wait a few minutes.'}), 429
|
|
|
|
data = request.get_json(silent=True)
|
|
if not data:
|
|
return jsonify({'error': 'JSON body required'}), 400
|
|
|
|
if 'encrypted_data' in data:
|
|
# Encrypted path — always accepted regardless of encrypt_pastes setting
|
|
store_data = data.get('encrypted_data', '')
|
|
if not validate_encrypted_data(store_data):
|
|
return jsonify({'error': 'encrypted_data must be a valid iv:ciphertext string'}), 400
|
|
elif 'content' in data:
|
|
# Plaintext path — always accepted regardless of encrypt_pastes setting
|
|
content = data.get('content', '')
|
|
title = data.get('title', 'Untitled') or 'Untitled'
|
|
language = data.get('language', _pastes.get('default_language', 'text'))
|
|
if not content or not isinstance(content, str):
|
|
return jsonify({'error': 'content must be a non-empty string'}), 400
|
|
if len(content.encode('utf-8')) > MAX_ENCRYPTED_BYTES:
|
|
return jsonify({'error': 'Paste too large'}), 413
|
|
# Enforce server-side limits on title and language
|
|
if not isinstance(title, str):
|
|
title = 'Untitled'
|
|
title = title.strip()[:MAX_TITLE_BYTES] or 'Untitled'
|
|
if not isinstance(language, str) or not _LANGUAGE_RE.match(language):
|
|
language = _pastes.get('default_language', 'text')
|
|
store_data = json.dumps({'title': title, 'content': content, 'language': language})
|
|
else:
|
|
return jsonify({'error': 'Provide either encrypted_data or content'}), 400
|
|
|
|
allowed_expiry = set(_pastes.get('allow_expiry_options', ['1year']))
|
|
expires_in = data.get('expires_in', _pastes.get('default_expiry', '1year'))
|
|
if expires_in not in allowed_expiry:
|
|
# Fallback to the first allowed option if everything is missing
|
|
expires_in = _pastes.get('default_expiry', list(allowed_expiry)[0])
|
|
|
|
expires_at = None
|
|
if expires_in != 'never':
|
|
delta_map = {
|
|
'1hour': datetime.timedelta(hours=1),
|
|
'1day': datetime.timedelta(days=1),
|
|
'1week': datetime.timedelta(weeks=1),
|
|
'1month': datetime.timedelta(days=30),
|
|
'1year': datetime.timedelta(days=365),
|
|
}
|
|
delta = delta_map.get(expires_in)
|
|
if delta is None:
|
|
# Config lists an expiry option not mapped in delta_map — treat as never
|
|
expires_in = 'never'
|
|
else:
|
|
expires_at = datetime.datetime.now(_UTC) + delta
|
|
|
|
deletion_token = secrets.token_urlsafe(32)
|
|
paste_id = None
|
|
conn = get_db_connection()
|
|
try:
|
|
for _ in range(5):
|
|
paste_id = generate_paste_id()
|
|
try:
|
|
conn.execute(
|
|
'INSERT INTO pastes (id, encrypted_data, expires_at, deletion_token) VALUES (?, ?, ?, ?)',
|
|
(paste_id, store_data, expires_at, deletion_token)
|
|
)
|
|
conn.commit()
|
|
break
|
|
except sqlite3.IntegrityError:
|
|
continue
|
|
else:
|
|
return jsonify({'error': 'Service temporarily unavailable'}), 503
|
|
finally:
|
|
conn.close()
|
|
return jsonify({'paste_id': paste_id, 'url': f'/{paste_id}', 'deletion_token': deletion_token})
|
|
|
|
@app.route('/api/delete/<string:paste_id>', methods=['POST'])
|
|
def delete_paste(paste_id):
|
|
if not _PASTE_ID_RE.match(paste_id):
|
|
abort(404)
|
|
|
|
# CSRF check
|
|
if not _is_same_origin():
|
|
return jsonify({'error': 'Cross-Origin request blocked'}), 403
|
|
|
|
data = request.get_json(silent=True) or {}
|
|
token = data.get('token')
|
|
|
|
if not token:
|
|
return jsonify({'error': 'Deletion token required'}), 400
|
|
|
|
conn = get_db_connection()
|
|
try:
|
|
paste = conn.execute('SELECT deletion_token FROM pastes WHERE id = ?', (paste_id,)).fetchone()
|
|
if not paste:
|
|
return jsonify({'error': 'Paste not found'}), 404
|
|
|
|
# Verify token
|
|
if paste['deletion_token'] != token:
|
|
return jsonify({'error': 'Invalid deletion token'}), 403
|
|
|
|
conn.execute('DELETE FROM pastes WHERE id = ?', (paste_id,))
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
|
|
return jsonify({'success': True, 'message': 'Paste deleted successfully'})
|
|
|
|
@app.route('/<string:paste_id>')
|
|
def view_paste(paste_id):
|
|
if not _PASTE_ID_RE.match(paste_id):
|
|
abort(404)
|
|
paste = _get_paste_or_abort(paste_id)
|
|
# Increment view count in a separate connection after expiry check.
|
|
conn = get_db_connection()
|
|
try:
|
|
conn.execute('UPDATE pastes SET views = views + 1 WHERE id = ?', (paste_id,))
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
return render_template('view.html', paste=paste)
|
|
|
|
@app.route('/<string:paste_id>/raw')
|
|
def view_paste_raw(paste_id):
|
|
if not _PASTE_ID_RE.match(paste_id):
|
|
abort(404)
|
|
paste = _get_paste_or_abort(paste_id)
|
|
stored = paste['encrypted_data']
|
|
|
|
# 1. Plaintext paste — Return content directly as text/plain
|
|
if not re.match(r'^[A-Za-z0-9_-]+:[A-Za-z0-9_-]+$', stored):
|
|
try:
|
|
data = json.loads(stored)
|
|
return Response(data.get('content', ''), mimetype='text/plain; charset=utf-8')
|
|
except (json.JSONDecodeError, TypeError):
|
|
pass
|
|
|
|
# 2. Encrypted paste — Browsers get a minimal decryptor; API consumers get JSON
|
|
accept = request.headers.get('Accept', '')
|
|
if 'text/html' in accept:
|
|
# Minimal HTML shell that handles decryption for browsers (E2E)
|
|
return render_template('raw_decryptor.html', paste=paste)
|
|
|
|
# API response
|
|
return jsonify({
|
|
'id': paste['id'],
|
|
'encrypted_data': stored,
|
|
'created_at': paste['created_at'],
|
|
'expires_at': paste['expires_at'],
|
|
'views': paste['views'],
|
|
})
|
|
|
|
@app.route('/api/languages')
|
|
def get_languages():
|
|
return jsonify(CFG.get('languages', []))
|
|
|
|
@app.route('/api/config')
|
|
def get_client_config():
|
|
"""Expose a whitelisted subset of config to the browser. Never forward entire blobs."""
|
|
return jsonify({
|
|
'site': {
|
|
'name': _site.get('name', 'Bastebin'),
|
|
'tagline': _site.get('tagline', ''),
|
|
'brand_icon': _site.get('brand_icon', ''),
|
|
'footer_text': _site.get('footer_text', ''),
|
|
},
|
|
'theme': {
|
|
'default': _theme.get('default', 'auto'),
|
|
'allow_user_toggle': _theme.get('allow_user_toggle', True),
|
|
'light': _theme.get('light', {}),
|
|
'dark': _theme.get('dark', {}),
|
|
},
|
|
'features': {
|
|
'encrypt_pastes': _feat.get('encrypt_pastes', True),
|
|
'show_recent': _feat.get('show_recent', False),
|
|
'show_view_count': _feat.get('show_view_count', True),
|
|
'show_e2e_banner': _feat.get('show_e2e_banner', True),
|
|
'allow_raw_api': _feat.get('allow_raw_api', True),
|
|
'auto_save_draft': _feat.get('auto_save_draft', True),
|
|
'draft_max_age_days': _feat.get('draft_max_age_days', 7),
|
|
'keyboard_shortcuts': _feat.get('keyboard_shortcuts', True),
|
|
},
|
|
'ui': {
|
|
'code_font_family': _ui.get('code_font_family', 'monospace'),
|
|
'code_font_size': _ui.get('code_font_size', '0.875rem'),
|
|
'code_line_height': _ui.get('code_line_height', '1.6'),
|
|
'textarea_rows': _ui.get('textarea_rows', 20),
|
|
'border_radius': _ui.get('border_radius', '8px'),
|
|
'animation_speed': _ui.get('animation_speed', '0.2s'),
|
|
},
|
|
'pastes': {
|
|
'default_language': _pastes.get('default_language', 'text'),
|
|
'default_expiry': _pastes.get('default_expiry', 'never'),
|
|
'allow_expiry_options': _pastes.get('allow_expiry_options', []),
|
|
'expiry_labels': _pastes.get('expiry_labels', {}),
|
|
'encryption_key_bits': _pastes.get('encryption_key_bits', 128),
|
|
},
|
|
})
|
|
|
|
@app.route('/recent')
|
|
def recent_pastes():
|
|
"""Show recently created pastes (only available when show_recent is enabled in config)."""
|
|
if not _feat.get('show_recent', False):
|
|
abort(404)
|
|
limit = int(_pastes.get('recent_limit', 50))
|
|
now = datetime.datetime.now(_UTC).isoformat()
|
|
conn = get_db_connection()
|
|
try:
|
|
pastes = conn.execute(
|
|
'''
|
|
SELECT id, created_at, expires_at, views FROM pastes
|
|
WHERE expires_at IS NULL OR expires_at > ?
|
|
ORDER BY created_at DESC LIMIT ?
|
|
''',
|
|
(now, limit)
|
|
).fetchall()
|
|
finally:
|
|
conn.close()
|
|
return render_template('recent.html', pastes=pastes)
|
|
|
|
# ── Admin Panel ───────────────────────────────────────────────────────────────
|
|
|
|
def is_admin():
|
|
return session.get('admin_logged_in') == True
|
|
|
|
@app.route('/admin/login', methods=['GET', 'POST'])
|
|
def admin_login():
|
|
error = None
|
|
admin_cfg = CFG.get('admin', {})
|
|
if request.method == 'POST':
|
|
# 1. Rate limit
|
|
if not _check_rate_limit(request.remote_addr, key_prefix='admin_login', window=3600, limit=5):
|
|
return render_template('admin_login.html', error='Too many login attempts. Try again later.'), 429
|
|
|
|
# 2. CSRF
|
|
if not _is_same_origin():
|
|
return "CSRF Blocked", 403
|
|
|
|
user = request.form.get('username')
|
|
pw = request.form.get('password')
|
|
stored_hash = admin_cfg.get('pass')
|
|
if user == admin_cfg.get('user') and check_password_hash(stored_hash, pw):
|
|
session['admin_logged_in'] = True
|
|
return redirect(url_for('admin_dashboard'))
|
|
else:
|
|
time.sleep(1) # Slow down brute force
|
|
error = 'Invalid credentials'
|
|
return render_template('admin_login.html', error=error)
|
|
|
|
@app.route('/admin/logout')
|
|
def admin_logout():
|
|
session.pop('admin_logged_in', None)
|
|
return redirect(url_for('index'))
|
|
|
|
@app.route('/admin')
|
|
def admin_dashboard():
|
|
if not is_admin():
|
|
return redirect(url_for('admin_login'))
|
|
|
|
now = datetime.datetime.now(_UTC).isoformat()
|
|
conn = get_db_connection()
|
|
try:
|
|
pastes = conn.execute(
|
|
'SELECT id, created_at, expires_at, views FROM pastes ORDER BY created_at DESC'
|
|
).fetchall()
|
|
finally:
|
|
conn.close()
|
|
return render_template('admin_dashboard.html', pastes=pastes)
|
|
|
|
@app.route('/admin/delete/<string:paste_id>', methods=['POST'])
|
|
def admin_delete_paste(paste_id):
|
|
if not is_admin():
|
|
abort(403)
|
|
|
|
if not _is_same_origin():
|
|
abort(403)
|
|
|
|
conn = get_db_connection()
|
|
try:
|
|
conn.execute('DELETE FROM pastes WHERE id = ?', (paste_id,))
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
return redirect(url_for('admin_dashboard'))
|
|
|
|
# ── Error handlers ────────────────────────────────────────────────────────────
|
|
|
|
@app.errorhandler(404)
|
|
def not_found(error):
|
|
return render_template('404.html'), 404
|
|
|
|
@app.errorhandler(410)
|
|
def gone(error):
|
|
return render_template('410.html'), 410
|
|
|
|
# ── Entry point ───────────────────────────────────────────────────────────────
|
|
|
|
if __name__ == '__main__':
|
|
init_db()
|
|
app.run(
|
|
debug=_server.get('debug', False),
|
|
host=_server.get('host', '0.0.0.0'),
|
|
port=_server.get('port', 5000)
|
|
)
|