Complete ircquotes application with all features

- Added copy quote functionality with clipboard integration
- Implemented bulk moderation actions for admin
- Created mobile responsive design with bash.org styling
- Added API rate limiting per IP address
- Implemented dark mode toggle with flash prevention
- Enhanced error messages throughout application
- Fixed all security vulnerabilities (SQL injection, XSS, CSRF)
- Added comprehensive rate limiting on all endpoints
- Implemented secure session configuration
- Added input validation and length limits
- Created centralized configuration system with config.json
- Set up production deployment with Gunicorn
- Added security headers and production hardening
- Added password generation and config management tools
This commit is contained in:
2025-09-20 19:41:23 +01:00
parent 0b1241714d
commit f409977257
21 changed files with 1936 additions and 304 deletions

602
app.py
View File

@@ -3,6 +3,7 @@ from flask_sqlalchemy import SQLAlchemy
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from flask_cors import CORS
from flask_wtf.csrf import CSRFProtect
import datetime
import json
import random
@@ -10,10 +11,58 @@ from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError
from werkzeug.middleware.proxy_fix import ProxyFix # Import ProxyFix
import logging
from sqlalchemy import event
from sqlalchemy.engine import Engine
import sqlite3
from config_loader import config # Import configuration system
# Configure SQLite for better concurrency
@event.listens_for(Engine, "connect")
def set_sqlite_pragma(dbapi_connection, connection_record):
if isinstance(dbapi_connection, sqlite3.Connection):
cursor = dbapi_connection.cursor()
# Set WAL mode for better concurrency
cursor.execute("PRAGMA journal_mode=WAL")
# Set timeout for locked database
cursor.execute("PRAGMA busy_timeout=30000") # 30 seconds
# Optimize for performance
cursor.execute("PRAGMA synchronous=NORMAL")
cursor.execute("PRAGMA cache_size=1000")
cursor.execute("PRAGMA temp_store=memory")
cursor.close()
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///quotes.db'
app.config['SQLALCHEMY_DATABASE_URI'] = config.database_uri
app.config['SECRET_KEY'] = open("instance/flask_secret_key", "r").read().strip()
# Configure secure session settings from config
app.config['SESSION_COOKIE_SECURE'] = config.get('security.session_cookie_secure', False)
app.config['SESSION_COOKIE_HTTPONLY'] = config.get('security.session_cookie_httponly', True)
app.config['SESSION_COOKIE_SAMESITE'] = config.get('security.session_cookie_samesite', 'Lax')
# Configure CSRF protection from config
app.config['WTF_CSRF_ENABLED'] = config.csrf_enabled
app.config['WTF_CSRF_TIME_LIMIT'] = config.get('security.csrf_time_limit')
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
'pool_timeout': config.get('database.pool_timeout', 20),
'pool_recycle': config.get('database.pool_recycle', -1),
'pool_pre_ping': config.get('database.pool_pre_ping', True)
}
# Initialize CSRF protection
csrf = CSRFProtect(app)
# Exempt API endpoints from CSRF protection
csrf.exempt('get_all_quotes')
csrf.exempt('get_quote')
csrf.exempt('get_random_quote')
csrf.exempt('get_top_quotes')
csrf.exempt('search_quotes')
csrf.exempt('get_stats')
# Initialize rate limiter
limiter = Limiter(app, key_func=get_remote_address)
db = SQLAlchemy(app)
# Apply ProxyFix middleware
@@ -22,14 +71,32 @@ app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_port=1, x_
# Initialize Argon2 password hasher
ph = PasswordHasher()
# Initialize logging for debugging
logging.basicConfig(level=logging.DEBUG)
# Configure logging from config
logging.basicConfig(
level=getattr(logging, config.logging_level),
format=config.get('logging.format', '%(asctime)s [%(levelname)s] %(message)s')
)
# Hardcoded admin credentials (hashed password using Argon2)
# Add security headers from config
@app.after_request
def add_security_headers(response):
headers = config.get('security.security_headers', {})
if headers.get('x_content_type_options'):
response.headers['X-Content-Type-Options'] = headers['x_content_type_options']
if headers.get('x_frame_options'):
response.headers['X-Frame-Options'] = headers['x_frame_options']
if headers.get('x_xss_protection'):
response.headers['X-XSS-Protection'] = headers['x_xss_protection']
if headers.get('strict_transport_security'):
response.headers['Strict-Transport-Security'] = headers['strict_transport_security']
if headers.get('content_security_policy'):
response.headers['Content-Security-Policy'] = headers['content_security_policy']
return response
# Admin credentials from config
ADMIN_CREDENTIALS = {
'username': 'admin',
# Replace this with the hashed password generated by Argon2
'password': '$argon2i$v=19$m=65536,t=4,p=1$cWZDc1pQaUJLTUJoaVI4cw$kn8XKz6AEZi8ebXfyyZuzommSypliVFrsGqzOyUEIHA' # Example hash
'username': config.admin_username,
'password': config.admin_password_hash
}
# Define the Quote model
@@ -42,6 +109,7 @@ class Quote(db.Model):
ip_address = db.Column(db.String(45)) # Store IPv4 and IPv6 addresses
user_agent = db.Column(db.String(255)) # Store user-agent strings
submitted_at = db.Column(db.DateTime, default=datetime.datetime.utcnow)
flag_count = db.Column(db.Integer, default=0) # Track how many times quote has been flagged
# Home route to display quotes
@app.route('/')
@@ -57,12 +125,32 @@ def index():
# Separate route for submitting quotes
@app.route('/submit', methods=['GET', 'POST'])
@limiter.limit(config.get('rate_limiting.endpoints.submit', '5 per minute'))
def submit():
if request.method == 'POST':
quote_text = request.form.get('quote')
if not quote_text:
flash("Quote cannot be empty.", 'error')
flash("Oops! Your quote seems to be empty. Please enter some text before submitting.", 'error')
return redirect(url_for('submit'))
# Input validation and length limits from config
quote_text = quote_text.strip()
min_length = config.min_quote_length
max_length = config.max_quote_length
if len(quote_text) < min_length:
flash(f"Your quote is too short. Please enter at least {min_length} characters.", 'error')
return redirect(url_for('submit'))
if len(quote_text) > max_length:
flash(f"Your quote is too long. Please keep it under {max_length} characters.", 'error')
return redirect(url_for('submit'))
# Basic content validation (no scripts or dangerous content)
if not config.get('quotes.allow_html', False):
if '<script' in quote_text.lower() or 'javascript:' in quote_text.lower():
flash("Invalid content detected. Please remove any script tags or JavaScript.", 'error')
return redirect(url_for('submit'))
ip_address = request.headers.get('CF-Connecting-IP', request.remote_addr) # Get the user's IP address
user_agent = request.headers.get('User-Agent') # Get the user's browser info
@@ -72,10 +160,10 @@ def submit():
try:
db.session.add(new_quote)
db.session.commit()
flash("Quote submitted successfully!", 'success')
flash("Thanks! Your quote has been submitted and is awaiting approval by our moderators.", 'success')
except Exception as e:
db.session.rollback()
flash("Error submitting quote: {}".format(e), 'error')
flash("Sorry, something went wrong while submitting your quote. Please try again in a moment.", 'error')
return redirect(url_for('index'))
@@ -86,16 +174,22 @@ def submit():
return render_template('submit.html', approved_count=approved_count, pending_count=pending_count)
@app.route('/vote/<int:id>/<action>')
@limiter.limit("20 per minute")
def vote(id, action):
quote = Quote.query.get_or_404(id)
# Retrieve vote history from the cookie
vote_cookie = request.cookies.get('votes')
if vote_cookie:
vote_data = json.loads(vote_cookie)
try:
vote_data = json.loads(vote_cookie)
except (json.JSONDecodeError, ValueError):
# If cookie is corrupted, start fresh
vote_data = {}
else:
vote_data = {}
message = ""
# If no prior vote, apply the new vote
if str(id) not in vote_data:
if action == 'upvote':
@@ -104,7 +198,7 @@ def vote(id, action):
elif action == 'downvote':
quote.votes -= 1
vote_data[str(id)] = 'downvote'
flash("Thank you for voting!", 'success')
message = "Thank you for voting!"
else:
previous_action = vote_data[str(id)]
@@ -116,7 +210,7 @@ def vote(id, action):
elif action == 'downvote':
quote.votes += 1
del vote_data[str(id)] # Remove the vote record (undo)
flash("Your vote has been undone.", 'success')
message = "Your vote has been undone."
else:
# If the user switches votes (upvote -> downvote or vice versa)
if previous_action == 'upvote' and action == 'downvote':
@@ -125,33 +219,56 @@ def vote(id, action):
elif previous_action == 'downvote' and action == 'upvote':
quote.votes += 2 # Undo downvote (-1) and apply upvote (+1)
vote_data[str(id)] = 'upvote'
flash("Your vote has been changed.", 'success')
message = "Your vote has been changed."
# Save the updated vote data to the cookie
try:
db.session.commit()
page = request.args.get('page', 1)
resp = make_response(redirect(url_for('browse', page=page)))
resp.set_cookie('votes', json.dumps(vote_data), max_age=60*60*24*365) # Store vote history in cookies for 1 year
return resp
# Check if it's an AJAX request
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
# Return JSON response for AJAX
resp = make_response(jsonify({
'success': True,
'votes': quote.votes,
'user_vote': vote_data.get(str(id)),
'message': message
}))
resp.set_cookie('votes', json.dumps(vote_data), max_age=60*60*24*365)
return resp
else:
# Traditional redirect for non-AJAX requests
flash(message, 'success')
page = request.args.get('page', 1)
resp = make_response(redirect(url_for('browse', page=page)))
resp.set_cookie('votes', json.dumps(vote_data), max_age=60*60*24*365)
return resp
except Exception as e:
db.session.rollback()
flash(f"Error while voting: {e}", 'error')
return redirect(url_for('browse', page=page))
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({
'success': False,
'message': f"Error while voting: {e}"
}), 500
else:
flash(f"Error while voting: {e}", 'error')
page = request.args.get('page', 1)
return redirect(url_for('browse', page=page))
# Route for displaying a random quote
@app.route('/random')
def random_quote():
approved_count = Quote.query.filter_by(status=1).count()
pending_count = Quote.query.filter_by(status=0).count()
count = Quote.query.count()
count = Quote.query.filter_by(status=1).count() # Only count approved quotes
if count == 0:
flash("No quotes available yet.", 'error')
flash("No quotes have been approved yet. Check back later or submit the first one!", 'error')
return redirect(url_for('index'))
random_id = random.randint(1, count)
random_quote = Quote.query.get(random_id)
# Use offset to get a random quote from approved quotes
random_offset = random.randint(0, count - 1)
random_quote = Quote.query.filter_by(status=1).offset(random_offset).first()
return render_template('random.html', quote=random_quote, approved_count=approved_count, pending_count=pending_count)
@@ -165,7 +282,7 @@ def quote_homepathid(id):
def quote():
quote_id = request.args.get('id')
if not quote_id:
flash("Quote ID not provided.", 'error')
flash("Please enter a quote number to view that specific quote.", 'error')
return redirect(url_for('browse'))
quote = Quote.query.get_or_404(quote_id)
@@ -175,8 +292,50 @@ def quote():
def faq():
return render_template('faq.html')
# Flag/Report a quote route
@app.route('/flag/<int:id>')
@limiter.limit("10 per minute")
def flag_quote(id):
quote = Quote.query.get_or_404(id)
# Increment flag count
quote.flag_count += 1
try:
db.session.commit()
message = 'Quote has been flagged for review. Thank you for helping keep the site clean!'
# Check if it's an AJAX request
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({
'success': True,
'message': message,
'flag_count': quote.flag_count
})
else:
flash(message, 'success')
except Exception as e:
db.session.rollback()
error_message = 'Error flagging quote. Please try again.'
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({
'success': False,
'message': error_message
}), 500
else:
flash(error_message, 'error')
# For non-AJAX requests, redirect back to the same page
referer = request.headers.get('Referer')
if referer and any(path in referer for path in ['/browse', '/quote', '/random', '/search']):
return redirect(referer)
else:
return redirect(url_for('browse'))
# Admin login route
@app.route('/login', methods=['GET', 'POST'])
@limiter.limit(config.get('rate_limiting.endpoints.login', '5 per minute'))
def login():
if request.method == 'POST':
username = request.form['username']
@@ -187,23 +346,24 @@ def login():
try:
ph.verify(ADMIN_CREDENTIALS['password'], password) # Verify password using Argon2
session['admin'] = True
flash('Login successful!', 'success')
flash('Welcome back! You are now logged in as administrator.', 'success')
return redirect(url_for('modapp'))
except VerifyMismatchError:
flash('Invalid password. Please try again.', 'danger')
flash('The password you entered is incorrect. Please check your password and try again.', 'danger')
else:
flash('Invalid username. Please try again.', 'danger')
flash('The username you entered is not recognized. Please check your username and try again.', 'danger')
return render_template('login.html')
# Admin panel route (accessible only to logged-in admins)
@app.route('/modapp')
@limiter.limit("20 per minute")
def modapp():
if not session.get('admin'):
flash('You need to log in first.', 'danger')
flash('Access denied. Please log in with administrator credentials to access the moderation panel.', 'danger')
return redirect(url_for('login'))
# Apply filtering (pending, approved, rejected)
# Apply filtering (pending, approved, rejected, flagged)
filter_status = request.args.get('filter', 'pending')
page = request.args.get('page', 1, type=int)
@@ -211,6 +371,9 @@ def modapp():
quotes = Quote.query.filter_by(status=1).order_by(Quote.date.desc()).paginate(page=page, per_page=10)
elif filter_status == 'rejected':
quotes = Quote.query.filter_by(status=2).order_by(Quote.date.desc()).paginate(page=page, per_page=10)
elif filter_status == 'flagged':
# Show quotes with flag_count > 0, ordered by flag count (highest first)
quotes = Quote.query.filter(Quote.flag_count > 0).order_by(Quote.flag_count.desc(), Quote.date.desc()).paginate(page=page, per_page=10)
else: # Default to pending
quotes = Quote.query.filter_by(status=0).order_by(Quote.date.desc()).paginate(page=page, per_page=10)
@@ -218,10 +381,63 @@ def modapp():
approved_count = Quote.query.filter_by(status=1).count()
pending_count = Quote.query.filter_by(status=0).count()
rejected_count = Quote.query.filter_by(status=2).count()
flagged_count = Quote.query.filter(Quote.flag_count > 0).count()
return render_template('modapp.html', quotes=quotes, filter_status=filter_status,
approved_count=approved_count, pending_count=pending_count,
rejected_count=rejected_count)
rejected_count=rejected_count, flagged_count=flagged_count)
# Bulk actions route for modapp
@app.route('/modapp/bulk', methods=['POST'])
@limiter.limit("10 per minute")
def modapp_bulk():
if not session.get('admin'):
flash('Access denied. Administrator login required for bulk actions.', 'danger')
return redirect(url_for('login'))
action = request.form.get('action')
quote_ids = request.form.getlist('quote_ids')
if not quote_ids:
flash('Please select at least one quote before performing a bulk action.', 'error')
return redirect(url_for('modapp'))
if not action or action not in ['approve', 'reject', 'delete', 'clear_flags']:
flash('The requested action is not supported. Please try again or contact support.', 'error')
return redirect(url_for('modapp'))
success_count = 0
try:
for quote_id in quote_ids:
quote = Quote.query.get(int(quote_id))
if quote:
if action == 'approve':
quote.status = 1
success_count += 1
elif action == 'reject':
quote.status = 2
success_count += 1
elif action == 'delete':
db.session.delete(quote)
success_count += 1
elif action == 'clear_flags':
quote.flag_count = 0
success_count += 1
db.session.commit()
if action == 'clear_flags':
flash(f'Successfully cleared flags on {success_count} quote(s).', 'success')
else:
flash(f'Successfully {action}d {success_count} quote(s).', 'success')
except Exception as e:
db.session.rollback()
flash(f'Error performing bulk action: {str(e)}', 'error')
return redirect(url_for('modapp'))
# Helper function to approve a quote
@@ -235,15 +451,13 @@ def approve_quote(quote_id):
def reject_quote(quote_id):
quote = Quote.query.get(quote_id)
if quote and quote.status != 2: # Only reject if not already rejected
logging.debug(f"Rejecting quote ID: {quote.id}") # Add logging for rejection
quote.status = 2 # Rejected
quote.status = 'rejected'
db.session.commit()
# Helper function to delete a quote
def delete_quote(quote_id):
quote = Quote.query.get(quote_id)
if quote:
logging.debug(f"Deleting quote ID: {quote.id}") # Add logging for deletion
db.session.delete(quote)
db.session.commit()
@@ -257,8 +471,8 @@ def search():
pending_count = Quote.query.filter_by(status=0).count()
if query:
# Perform text search in quotes
quotes = Quote.query.filter(Quote.text.like(f'%{query}%'), Quote.status == 1).all()
# Perform text search in quotes using safe parameterized query
quotes = Quote.query.filter(Quote.text.contains(query), Quote.status == 1).all()
return render_template('search.html', quotes=quotes, query=query, approved_count=approved_count, pending_count=pending_count)
@@ -267,7 +481,7 @@ def read_quote():
quote_id = request.args.get('id', type=int) # Get the quote number
if not quote_id:
flash("Quote number is required.", 'error')
flash("Please enter a valid quote number to search for that specific quote.", 'error')
return redirect(url_for('search'))
# Find the quote by ID (only approved quotes)
@@ -286,9 +500,10 @@ def browse():
approved_count = Quote.query.filter_by(status=1).count()
pending_count = Quote.query.filter_by(status=0).count()
# Pagination setup
# Pagination setup with config
page = request.args.get('page', 1, type=int)
quotes = Quote.query.filter_by(status=1).order_by(Quote.date.desc()).paginate(page=page, per_page=10)
per_page = config.quotes_per_page
quotes = Quote.query.filter_by(status=1).order_by(Quote.date.desc()).paginate(page=page, per_page=per_page)
# Pass the counts and the quotes to the template
return render_template('browse.html', quotes=quotes, approved_count=approved_count, pending_count=pending_count)
@@ -296,6 +511,7 @@ def browse():
# Approve a quote (admin only)
@app.route('/approve/<int:id>')
@limiter.limit("30 per minute")
def approve(id):
if not session.get('admin'):
return redirect(url_for('login'))
@@ -310,6 +526,7 @@ def approve(id):
# Reject a quote (admin only)
@app.route('/reject/<int:id>')
@limiter.limit("30 per minute")
def reject(id):
if not session.get('admin'):
return redirect(url_for('login'))
@@ -321,6 +538,7 @@ def reject(id):
# Delete a quote (admin only)
@app.route('/delete/<int:id>')
@limiter.limit("20 per minute")
def delete(id):
if not session.get('admin'):
return redirect(url_for('login'))
@@ -330,6 +548,22 @@ def delete(id):
db.session.commit()
return redirect(url_for('modapp'))
# Clear flags from a quote (admin only)
@app.route('/clear_flags/<int:id>')
def clear_flags(id):
if not session.get('admin'):
return redirect(url_for('login'))
quote = Quote.query.get_or_404(id)
quote.flag_count = 0
db.session.commit()
flash(f'Flags cleared for quote #{id}. Quote remains {["pending", "approved", "rejected"][quote.status]}.', 'success')
# Redirect back to the same page with filter preserved
page = request.args.get('page', 1)
filter_status = request.args.get('filter', 'flagged')
return redirect(url_for('modapp', page=page, filter=filter_status))
# Admin logout route
@app.route('/logout')
def logout():
@@ -340,38 +574,86 @@ def logout():
# Automatically create the database tables using app context
with app.app_context():
db.create_all()
# Add flag_count column if it doesn't exist (for existing databases)
try:
# Try to access flag_count on a quote to test if column exists
test_query = db.session.execute(db.text("SELECT flag_count FROM quote LIMIT 1"))
except Exception as e:
if "no such column" in str(e).lower():
# Add the missing column using raw SQL
db.session.execute(db.text("ALTER TABLE quote ADD COLUMN flag_count INTEGER DEFAULT 0"))
db.session.commit()
print("Added flag_count column to existing database")
# Initialize rate limiter and CORS for cross-origin API access
limiter = Limiter(app, key_func=get_remote_address)
# Initialize CORS for cross-origin API access
CORS(app)
# API to get all approved quotes
# API to get all approved quotes with pagination
@app.route('/api/quotes', methods=['GET'])
@limiter.limit("60 per minute")
def get_all_quotes():
quotes = Quote.query.filter_by(status=1).all() # Only approved quotes
page = request.args.get('page', 1, type=int)
per_page = min(request.args.get('per_page', 20, type=int), 100) # Max 100 per page
sort_by = request.args.get('sort', 'date') # date, votes, id
order = request.args.get('order', 'desc') # asc, desc
# Build query
query = Quote.query.filter_by(status=1)
# Apply sorting
if sort_by == 'votes':
if order == 'asc':
query = query.order_by(Quote.votes.asc())
else:
query = query.order_by(Quote.votes.desc())
elif sort_by == 'id':
if order == 'asc':
query = query.order_by(Quote.id.asc())
else:
query = query.order_by(Quote.id.desc())
else: # Default to date
if order == 'asc':
query = query.order_by(Quote.date.asc())
else:
query = query.order_by(Quote.date.desc())
# Paginate
quotes = query.paginate(page=page, per_page=per_page, error_out=False)
quote_list = [{
'id': quote.id,
'text': quote.text,
'votes': quote.votes,
'date': quote.date.strftime('%Y-%m-%d')
} for quote in quotes]
'votes': quote.votes
} for quote in quotes.items]
return jsonify(quote_list), 200
return jsonify({
'quotes': quote_list,
'pagination': {
'page': quotes.page,
'pages': quotes.pages,
'per_page': quotes.per_page,
'total': quotes.total,
'has_next': quotes.has_next,
'has_prev': quotes.has_prev
}
}), 200
# API to get a specific quote by ID
@app.route('/api/quotes/<int:id>', methods=['GET'])
@limiter.limit("120 per minute")
def get_quote(id):
quote = Quote.query.filter_by(id=id, status=1).first_or_404() # Only approved quotes
quote_data = {
'id': quote.id,
'text': quote.text,
'votes': quote.votes,
'date': quote.date.strftime('%Y-%m-%d')
'votes': quote.votes
}
return jsonify(quote_data), 200
# API to get a random approved quote
@app.route('/api/random', methods=['GET'])
@limiter.limit("30 per minute")
def get_random_quote():
count = Quote.query.filter_by(status=1).count()
if count == 0:
@@ -383,70 +665,216 @@ def get_random_quote():
quote_data = {
'id': random_quote.id,
'text': random_quote.text,
'votes': random_quote.votes,
'date': random_quote.date.strftime('%Y-%m-%d')
'votes': random_quote.votes
}
return jsonify(quote_data), 200
# API to get the top quotes by vote count
@app.route('/api/top', methods=['GET'])
@limiter.limit("30 per minute")
def get_top_quotes():
top_quotes = Quote.query.filter_by(status=1).order_by(Quote.votes.desc()).limit(10).all() # Limit to top 10
limit = min(request.args.get('limit', 10, type=int), 100) # Default 10, max 100
min_votes = request.args.get('min_votes', 0, type=int) # Minimum vote threshold
top_quotes = Quote.query.filter(Quote.status == 1, Quote.votes >= min_votes).order_by(Quote.votes.desc()).limit(limit).all()
quote_list = [{
'id': quote.id,
'text': quote.text,
'votes': quote.votes,
'date': quote.date.strftime('%Y-%m-%d')
'votes': quote.votes
} for quote in top_quotes]
return jsonify(quote_list), 200
return jsonify({
'quotes': quote_list,
'meta': {
'limit': limit,
'min_votes': min_votes,
'count': len(quote_list)
}
}), 200
# API to search for quotes
# API to search for quotes with pagination
@app.route('/api/search', methods=['GET'])
@limiter.limit("40 per minute")
def search_quotes():
query = request.args.get('q', '').strip()
if not query:
return jsonify({"error": "No search term provided"}), 400
quotes = Quote.query.filter(Quote.text.ilike(f'%{query}%'), Quote.status == 1).all() # Search in approved quotes
if not quotes:
return jsonify({"error": "No quotes found for search term"}), 404
page = request.args.get('page', 1, type=int)
per_page = min(request.args.get('per_page', 20, type=int), 100) # Max 100 per page
# Search in approved quotes with pagination using safe parameterized query
quotes = Quote.query.filter(
Quote.text.contains(query),
Quote.status == 1
).order_by(Quote.votes.desc()).paginate(
page=page, per_page=per_page, error_out=False
)
if not quotes.items:
return jsonify({
"error": "No quotes found for search term",
"search_term": query,
"total_results": 0
}), 404
quote_list = [{
'id': quote.id,
'text': quote.text,
'votes': quote.votes,
'date': quote.date.strftime('%Y-%m-%d')
} for quote in quotes]
'votes': quote.votes
} for quote in quotes.items]
return jsonify(quote_list), 200
return jsonify({
'quotes': quote_list,
'search_term': query,
'pagination': {
'page': quotes.page,
'pages': quotes.pages,
'per_page': quotes.per_page,
'total': quotes.total,
'has_next': quotes.has_next,
'has_prev': quotes.has_prev
}
}), 200
# API to submit a new quote
# API to get quote statistics
@app.route('/api/stats', methods=['GET'])
@limiter.limit("20 per minute")
def get_stats():
total_quotes = Quote.query.count()
approved_quotes = Quote.query.filter_by(status=1).count()
pending_quotes = Quote.query.filter_by(status=0).count()
rejected_quotes = Quote.query.filter_by(status=2).count()
flagged_quotes = Quote.query.filter(Quote.flag_count > 0).count()
# Vote statistics
top_voted = Quote.query.filter_by(status=1).order_by(Quote.votes.desc()).first()
total_votes = db.session.query(db.func.sum(Quote.votes)).filter_by(status=1).scalar() or 0
avg_votes = db.session.query(db.func.avg(Quote.votes)).filter_by(status=1).scalar() or 0
return jsonify({
'total_quotes': total_quotes,
'approved_quotes': approved_quotes,
'pending_quotes': pending_quotes,
'rejected_quotes': rejected_quotes,
'flagged_quotes': flagged_quotes,
'vote_stats': {
'total_votes': int(total_votes),
'average_votes': round(float(avg_votes), 2),
'highest_voted': {
'id': top_voted.id if top_voted else None,
'votes': top_voted.votes if top_voted else 0,
'text_preview': top_voted.text[:100] + '...' if top_voted and len(top_voted.text) > 100 else (top_voted.text if top_voted else None)
}
}
}), 200
# API documentation endpoint
@app.route('/api/docs', methods=['GET'])
@limiter.limit("10 per minute")
def api_docs():
docs = {
"ircquotes.org API Documentation": {
"version": "1.0",
"description": "Read-only API for accessing IRC quotes",
"base_url": request.url_root + "api/",
"endpoints": {
"/api/quotes": {
"method": "GET",
"description": "Get paginated list of approved quotes",
"parameters": {
"page": "Page number (default: 1)",
"per_page": "Results per page (default: 20, max: 100)",
"sort": "Sort by 'date', 'votes', or 'id' (default: 'date')",
"order": "Sort order 'asc' or 'desc' (default: 'desc')"
},
"example": "/api/quotes?page=1&per_page=10&sort=votes&order=desc"
},
"/api/quotes/<id>": {
"method": "GET",
"description": "Get a specific quote by ID",
"parameters": {
"id": "Quote ID (required)"
},
"example": "/api/quotes/12345"
},
"/api/random": {
"method": "GET",
"description": "Get a random approved quote",
"parameters": "None",
"example": "/api/random"
},
"/api/top": {
"method": "GET",
"description": "Get top-voted quotes",
"parameters": {
"limit": "Number of quotes to return (default: 10, max: 100)",
"min_votes": "Minimum vote threshold (default: 0)"
},
"example": "/api/top?limit=20&min_votes=5"
},
"/api/search": {
"method": "GET",
"description": "Search quotes by text content",
"parameters": {
"q": "Search query (required)",
"page": "Page number (default: 1)",
"per_page": "Results per page (default: 20, max: 100)"
},
"example": "/api/search?q=linux&page=1&per_page=10"
},
"/api/stats": {
"method": "GET",
"description": "Get quote database statistics",
"parameters": "None",
"example": "/api/stats"
}
},
"response_format": {
"quotes": "Array of quote objects",
"quote_object": {
"id": "Quote ID",
"text": "Quote text content",
"votes": "Current vote count",
"date": "Creation date (YYYY-MM-DD)",
"datetime": "Full timestamp (YYYY-MM-DD HH:MM:SS)"
},
"pagination": {
"page": "Current page number",
"pages": "Total pages",
"per_page": "Results per page",
"total": "Total results",
"has_next": "Boolean - has next page",
"has_prev": "Boolean - has previous page"
}
},
"notes": [
"All endpoints return only approved quotes",
"Rate limiting may apply to prevent abuse",
"All responses are in JSON format",
"CORS is enabled for cross-origin requests"
]
}
}
return jsonify(docs), 200
# API to submit a new quote (DISABLED for abuse prevention)
@app.route('/api/submit', methods=['POST'])
@limiter.limit("5 per minute") # Rate limiting to prevent abuse
@limiter.limit("5 per minute")
def submit_quote():
data = request.get_json()
return jsonify({
"error": "Quote submission via API is currently disabled to prevent abuse.",
"message": "Please use the web interface at /submit to submit quotes.",
"web_submit_url": request.url_root + "submit"
}), 403
# Validate the input
if not data or not data.get('text'):
return jsonify({"error": "Quote text is required"}), 400
quote_text = data.get('text').strip()
# Create tables if they don't exist
with app.app_context():
db.create_all()
# Basic validation to prevent spam
if len(quote_text) < 5 or len(quote_text) > 1000:
return jsonify({"error": "Quote must be between 5 and 1000 characters"}), 400
new_quote = Quote(text=quote_text)
try:
db.session.add(new_quote)
db.session.commit()
return jsonify({"success": "Quote submitted successfully!", "id": new_quote.id}), 201
except Exception as e:
db.session.rollback()
return jsonify({"error": f"Error submitting quote: {str(e)}"}), 500
# Run the Flask app
# For Gunicorn deployment
if __name__ == '__main__':
app.run(host='127.0.0.1', port=5050, debug=True)
# This is only used for local development testing
# In production, use: gunicorn -w 4 -b 0.0.0.0:5050 app:app
print("Warning: Using Flask development server. Use Gunicorn for production!")
app.run(host='127.0.0.1', port=5050, debug=False)