453 lines
16 KiB
Python
453 lines
16 KiB
Python
from flask import Flask, render_template, request, redirect, url_for, flash, abort, make_response, session, jsonify
|
|
from flask_sqlalchemy import SQLAlchemy
|
|
from flask_limiter import Limiter
|
|
from flask_limiter.util import get_remote_address
|
|
from flask_cors import CORS
|
|
import datetime
|
|
import json
|
|
import random
|
|
from argon2 import PasswordHasher
|
|
from argon2.exceptions import VerifyMismatchError
|
|
from werkzeug.middleware.proxy_fix import ProxyFix # Import ProxyFix
|
|
import logging
|
|
|
|
app = Flask(__name__)
|
|
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///quotes.db'
|
|
app.config['SECRET_KEY'] = open("instance/flask_secret_key", "r").read().strip()
|
|
db = SQLAlchemy(app)
|
|
|
|
# Apply ProxyFix middleware
|
|
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_port=1, x_prefix=1)
|
|
|
|
# Initialize Argon2 password hasher
|
|
ph = PasswordHasher()
|
|
|
|
# Initialize logging for debugging
|
|
logging.basicConfig(level=logging.DEBUG)
|
|
|
|
# Hardcoded admin credentials (hashed password using Argon2)
|
|
ADMIN_CREDENTIALS = {
|
|
'username': 'admin',
|
|
# Replace this with the hashed password generated by Argon2
|
|
'password': '$argon2i$v=19$m=65536,t=4,p=1$cWZDc1pQaUJLTUJoaVI4cw$kn8XKz6AEZi8ebXfyyZuzommSypliVFrsGqzOyUEIHA' # Example hash
|
|
}
|
|
|
|
# Define the Quote model
|
|
class Quote(db.Model):
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
text = db.Column(db.Text, nullable=False)
|
|
votes = db.Column(db.Integer, default=0)
|
|
date = db.Column(db.DateTime, default=datetime.datetime.utcnow)
|
|
status = db.Column(db.Integer, default=0) # 0 = pending, 1 = approved, 2 = rejected
|
|
ip_address = db.Column(db.String(45)) # Store IPv4 and IPv6 addresses
|
|
user_agent = db.Column(db.String(255)) # Store user-agent strings
|
|
submitted_at = db.Column(db.DateTime, default=datetime.datetime.utcnow)
|
|
|
|
# Home route to display quotes
|
|
@app.route('/')
|
|
def index():
|
|
page = request.args.get('page', 1, type=int)
|
|
quotes = Quote.query.filter_by(status=1).order_by(Quote.date.desc()).paginate(page=page, per_page=5)
|
|
|
|
# Get the count of approved and pending quotes
|
|
approved_count = Quote.query.filter_by(status=1).count()
|
|
pending_count = Quote.query.filter_by(status=0).count()
|
|
|
|
return render_template('index.html', quotes=quotes, approved_count=approved_count, pending_count=pending_count)
|
|
|
|
# Separate route for submitting quotes
|
|
@app.route('/submit', methods=['GET', 'POST'])
|
|
def submit():
|
|
if request.method == 'POST':
|
|
quote_text = request.form.get('quote')
|
|
if not quote_text:
|
|
flash("Quote cannot be empty.", 'error')
|
|
return redirect(url_for('submit'))
|
|
|
|
ip_address = request.headers.get('CF-Connecting-IP', request.remote_addr) # Get the user's IP address
|
|
user_agent = request.headers.get('User-Agent') # Get the user's browser info
|
|
|
|
new_quote = Quote(text=quote_text, ip_address=ip_address, user_agent=user_agent)
|
|
|
|
try:
|
|
db.session.add(new_quote)
|
|
db.session.commit()
|
|
flash("Quote submitted successfully!", 'success')
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
flash("Error submitting quote: {}".format(e), 'error')
|
|
|
|
return redirect(url_for('index'))
|
|
|
|
# Get the count of approved and pending quotes
|
|
approved_count = Quote.query.filter_by(status=1).count()
|
|
pending_count = Quote.query.filter_by(status=0).count()
|
|
|
|
return render_template('submit.html', approved_count=approved_count, pending_count=pending_count)
|
|
|
|
@app.route('/vote/<int:id>/<action>')
|
|
def vote(id, action):
|
|
quote = Quote.query.get_or_404(id)
|
|
|
|
# Retrieve vote history from the cookie
|
|
vote_cookie = request.cookies.get('votes')
|
|
if vote_cookie:
|
|
vote_data = json.loads(vote_cookie)
|
|
else:
|
|
vote_data = {}
|
|
|
|
# If no prior vote, apply the new vote
|
|
if str(id) not in vote_data:
|
|
if action == 'upvote':
|
|
quote.votes += 1
|
|
vote_data[str(id)] = 'upvote'
|
|
elif action == 'downvote':
|
|
quote.votes -= 1
|
|
vote_data[str(id)] = 'downvote'
|
|
flash("Thank you for voting!", 'success')
|
|
|
|
else:
|
|
previous_action = vote_data[str(id)]
|
|
|
|
if previous_action == action:
|
|
# If the user clicks the same action again, undo the vote
|
|
if action == 'upvote':
|
|
quote.votes -= 1
|
|
elif action == 'downvote':
|
|
quote.votes += 1
|
|
del vote_data[str(id)] # Remove the vote record (undo)
|
|
flash("Your vote has been undone.", 'success')
|
|
else:
|
|
# If the user switches votes (upvote -> downvote or vice versa)
|
|
if previous_action == 'upvote' and action == 'downvote':
|
|
quote.votes -= 2 # Undo upvote (+1) and apply downvote (-1)
|
|
vote_data[str(id)] = 'downvote'
|
|
elif previous_action == 'downvote' and action == 'upvote':
|
|
quote.votes += 2 # Undo downvote (-1) and apply upvote (+1)
|
|
vote_data[str(id)] = 'upvote'
|
|
flash("Your vote has been changed.", 'success')
|
|
|
|
# Save the updated vote data to the cookie
|
|
try:
|
|
db.session.commit()
|
|
page = request.args.get('page', 1)
|
|
resp = make_response(redirect(url_for('browse', page=page)))
|
|
resp.set_cookie('votes', json.dumps(vote_data), max_age=60*60*24*365) # Store vote history in cookies for 1 year
|
|
return resp
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
flash(f"Error while voting: {e}", 'error')
|
|
return redirect(url_for('browse', page=page))
|
|
|
|
# Route for displaying a random quote
|
|
@app.route('/random')
|
|
def random_quote():
|
|
approved_count = Quote.query.filter_by(status=1).count()
|
|
pending_count = Quote.query.filter_by(status=0).count()
|
|
count = Quote.query.count()
|
|
|
|
if count == 0:
|
|
flash("No quotes available yet.", 'error')
|
|
return redirect(url_for('index'))
|
|
|
|
random_id = random.randint(1, count)
|
|
random_quote = Quote.query.get(random_id)
|
|
|
|
return render_template('random.html', quote=random_quote, approved_count=approved_count, pending_count=pending_count)
|
|
|
|
|
|
@app.route('/<int:id>')
|
|
def quote_homepathid(id):
|
|
quote = Quote.query.get_or_404(id)
|
|
return render_template('quote.html', quote=quote)
|
|
|
|
@app.route('/quote')
|
|
def quote():
|
|
quote_id = request.args.get('id')
|
|
if not quote_id:
|
|
flash("Quote ID not provided.", 'error')
|
|
return redirect(url_for('browse'))
|
|
|
|
quote = Quote.query.get_or_404(quote_id)
|
|
return render_template('quote.html', quote=quote)
|
|
|
|
@app.route('/faq')
|
|
def faq():
|
|
return render_template('faq.html')
|
|
|
|
# Admin login route
|
|
@app.route('/login', methods=['GET', 'POST'])
|
|
def login():
|
|
if request.method == 'POST':
|
|
username = request.form['username']
|
|
password = request.form['password']
|
|
|
|
# Check if the username is correct and verify the password using Argon2
|
|
if username == ADMIN_CREDENTIALS['username']:
|
|
try:
|
|
ph.verify(ADMIN_CREDENTIALS['password'], password) # Verify password using Argon2
|
|
session['admin'] = True
|
|
flash('Login successful!', 'success')
|
|
return redirect(url_for('modapp'))
|
|
except VerifyMismatchError:
|
|
flash('Invalid password. Please try again.', 'danger')
|
|
else:
|
|
flash('Invalid username. Please try again.', 'danger')
|
|
|
|
return render_template('login.html')
|
|
|
|
# Admin panel route (accessible only to logged-in admins)
|
|
@app.route('/modapp')
|
|
def modapp():
|
|
if not session.get('admin'):
|
|
flash('You need to log in first.', 'danger')
|
|
return redirect(url_for('login'))
|
|
|
|
# Apply filtering (pending, approved, rejected)
|
|
filter_status = request.args.get('filter', 'pending')
|
|
page = request.args.get('page', 1, type=int)
|
|
|
|
if filter_status == 'approved':
|
|
quotes = Quote.query.filter_by(status=1).order_by(Quote.date.desc()).paginate(page=page, per_page=10)
|
|
elif filter_status == 'rejected':
|
|
quotes = Quote.query.filter_by(status=2).order_by(Quote.date.desc()).paginate(page=page, per_page=10)
|
|
else: # Default to pending
|
|
quotes = Quote.query.filter_by(status=0).order_by(Quote.date.desc()).paginate(page=page, per_page=10)
|
|
|
|
# Get counts for each status
|
|
approved_count = Quote.query.filter_by(status=1).count()
|
|
pending_count = Quote.query.filter_by(status=0).count()
|
|
rejected_count = Quote.query.filter_by(status=2).count()
|
|
|
|
return render_template('modapp.html', quotes=quotes, filter_status=filter_status,
|
|
approved_count=approved_count, pending_count=pending_count,
|
|
rejected_count=rejected_count)
|
|
|
|
|
|
# Helper function to approve a quote
|
|
def approve_quote(quote_id):
|
|
quote = Quote.query.get(quote_id)
|
|
if quote and quote.status != 1: # Only approve if not already approved
|
|
quote.status = 1 # Approved
|
|
db.session.commit()
|
|
|
|
# Helper function to reject a quote
|
|
def reject_quote(quote_id):
|
|
quote = Quote.query.get(quote_id)
|
|
if quote and quote.status != 2: # Only reject if not already rejected
|
|
logging.debug(f"Rejecting quote ID: {quote.id}") # Add logging for rejection
|
|
quote.status = 2 # Rejected
|
|
db.session.commit()
|
|
|
|
# Helper function to delete a quote
|
|
def delete_quote(quote_id):
|
|
quote = Quote.query.get(quote_id)
|
|
if quote:
|
|
logging.debug(f"Deleting quote ID: {quote.id}") # Add logging for deletion
|
|
db.session.delete(quote)
|
|
db.session.commit()
|
|
|
|
@app.route('/search', methods=['GET'])
|
|
def search():
|
|
query = request.args.get('q', '').strip() # Get the search query
|
|
quotes = []
|
|
|
|
# Query counts of approved and pending quotes
|
|
approved_count = Quote.query.filter_by(status=1).count()
|
|
pending_count = Quote.query.filter_by(status=0).count()
|
|
|
|
if query:
|
|
# Perform text search in quotes
|
|
quotes = Quote.query.filter(Quote.text.like(f'%{query}%'), Quote.status == 1).all()
|
|
|
|
return render_template('search.html', quotes=quotes, query=query, approved_count=approved_count, pending_count=pending_count)
|
|
|
|
@app.route('/read', methods=['GET'])
|
|
def read_quote():
|
|
quote_id = request.args.get('id', type=int) # Get the quote number
|
|
|
|
if not quote_id:
|
|
flash("Quote number is required.", 'error')
|
|
return redirect(url_for('search'))
|
|
|
|
# Find the quote by ID (only approved quotes)
|
|
quote = Quote.query.filter_by(id=quote_id, status=1).first()
|
|
|
|
if quote:
|
|
return render_template('quote.html', quote=quote)
|
|
else:
|
|
flash(f"No quote found with ID {quote_id}", 'error')
|
|
return redirect(url_for('search'))
|
|
|
|
# Route for browsing approved quotes
|
|
@app.route('/browse', methods=['GET'])
|
|
def browse():
|
|
# Query the counts of approved and pending quotes
|
|
approved_count = Quote.query.filter_by(status=1).count()
|
|
pending_count = Quote.query.filter_by(status=0).count()
|
|
|
|
# Pagination setup
|
|
page = request.args.get('page', 1, type=int)
|
|
quotes = Quote.query.filter_by(status=1).order_by(Quote.date.desc()).paginate(page=page, per_page=10)
|
|
|
|
# Pass the counts and the quotes to the template
|
|
return render_template('browse.html', quotes=quotes, approved_count=approved_count, pending_count=pending_count)
|
|
|
|
|
|
# Approve a quote (admin only)
|
|
@app.route('/approve/<int:id>')
|
|
def approve(id):
|
|
if not session.get('admin'):
|
|
return redirect(url_for('login'))
|
|
|
|
quote = Quote.query.get_or_404(id)
|
|
quote.status = 1
|
|
db.session.commit()
|
|
|
|
# Redirect back to the same page
|
|
page = request.args.get('page', 1)
|
|
return redirect(url_for('modapp', page=page))
|
|
|
|
# Reject a quote (admin only)
|
|
@app.route('/reject/<int:id>')
|
|
def reject(id):
|
|
if not session.get('admin'):
|
|
return redirect(url_for('login'))
|
|
|
|
quote = Quote.query.get_or_404(id)
|
|
quote.status = 2 # 2 = rejected
|
|
db.session.commit()
|
|
return redirect(url_for('modapp'))
|
|
|
|
# Delete a quote (admin only)
|
|
@app.route('/delete/<int:id>')
|
|
def delete(id):
|
|
if not session.get('admin'):
|
|
return redirect(url_for('login'))
|
|
|
|
quote = Quote.query.get_or_404(id)
|
|
db.session.delete(quote)
|
|
db.session.commit()
|
|
return redirect(url_for('modapp'))
|
|
|
|
# Admin logout route
|
|
@app.route('/logout')
|
|
def logout():
|
|
session.pop('admin', None)
|
|
flash('Logged out successfully.', 'success')
|
|
return redirect(url_for('login'))
|
|
|
|
# Automatically create the database tables using app context
|
|
with app.app_context():
|
|
db.create_all()
|
|
|
|
# Initialize rate limiter and CORS for cross-origin API access
|
|
limiter = Limiter(app, key_func=get_remote_address)
|
|
CORS(app)
|
|
|
|
# API to get all approved quotes
|
|
@app.route('/api/quotes', methods=['GET'])
|
|
def get_all_quotes():
|
|
quotes = Quote.query.filter_by(status=1).all() # Only approved quotes
|
|
quote_list = [{
|
|
'id': quote.id,
|
|
'text': quote.text,
|
|
'votes': quote.votes,
|
|
'date': quote.date.strftime('%Y-%m-%d')
|
|
} for quote in quotes]
|
|
|
|
return jsonify(quote_list), 200
|
|
|
|
# API to get a specific quote by ID
|
|
@app.route('/api/quotes/<int:id>', methods=['GET'])
|
|
def get_quote(id):
|
|
quote = Quote.query.filter_by(id=id, status=1).first_or_404() # Only approved quotes
|
|
quote_data = {
|
|
'id': quote.id,
|
|
'text': quote.text,
|
|
'votes': quote.votes,
|
|
'date': quote.date.strftime('%Y-%m-%d')
|
|
}
|
|
return jsonify(quote_data), 200
|
|
|
|
# API to get a random approved quote
|
|
@app.route('/api/random', methods=['GET'])
|
|
def get_random_quote():
|
|
count = Quote.query.filter_by(status=1).count()
|
|
if count == 0:
|
|
return jsonify({"error": "No approved quotes available"}), 404
|
|
|
|
random_offset = random.randint(0, count - 1)
|
|
random_quote = Quote.query.filter_by(status=1).offset(random_offset).first()
|
|
|
|
quote_data = {
|
|
'id': random_quote.id,
|
|
'text': random_quote.text,
|
|
'votes': random_quote.votes,
|
|
'date': random_quote.date.strftime('%Y-%m-%d')
|
|
}
|
|
return jsonify(quote_data), 200
|
|
|
|
# API to get the top quotes by vote count
|
|
@app.route('/api/top', methods=['GET'])
|
|
def get_top_quotes():
|
|
top_quotes = Quote.query.filter_by(status=1).order_by(Quote.votes.desc()).limit(10).all() # Limit to top 10
|
|
quote_list = [{
|
|
'id': quote.id,
|
|
'text': quote.text,
|
|
'votes': quote.votes,
|
|
'date': quote.date.strftime('%Y-%m-%d')
|
|
} for quote in top_quotes]
|
|
|
|
return jsonify(quote_list), 200
|
|
|
|
# API to search for quotes
|
|
@app.route('/api/search', methods=['GET'])
|
|
def search_quotes():
|
|
query = request.args.get('q', '').strip()
|
|
if not query:
|
|
return jsonify({"error": "No search term provided"}), 400
|
|
|
|
quotes = Quote.query.filter(Quote.text.ilike(f'%{query}%'), Quote.status == 1).all() # Search in approved quotes
|
|
if not quotes:
|
|
return jsonify({"error": "No quotes found for search term"}), 404
|
|
|
|
quote_list = [{
|
|
'id': quote.id,
|
|
'text': quote.text,
|
|
'votes': quote.votes,
|
|
'date': quote.date.strftime('%Y-%m-%d')
|
|
} for quote in quotes]
|
|
|
|
return jsonify(quote_list), 200
|
|
|
|
# API to submit a new quote
|
|
@app.route('/api/submit', methods=['POST'])
|
|
@limiter.limit("5 per minute") # Rate limiting to prevent abuse
|
|
def submit_quote():
|
|
data = request.get_json()
|
|
|
|
# Validate the input
|
|
if not data or not data.get('text'):
|
|
return jsonify({"error": "Quote text is required"}), 400
|
|
|
|
quote_text = data.get('text').strip()
|
|
|
|
# Basic validation to prevent spam
|
|
if len(quote_text) < 5 or len(quote_text) > 1000:
|
|
return jsonify({"error": "Quote must be between 5 and 1000 characters"}), 400
|
|
|
|
new_quote = Quote(text=quote_text)
|
|
|
|
try:
|
|
db.session.add(new_quote)
|
|
db.session.commit()
|
|
return jsonify({"success": "Quote submitted successfully!", "id": new_quote.id}), 201
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
return jsonify({"error": f"Error submitting quote: {str(e)}"}), 500
|
|
|
|
# Run the Flask app
|
|
if __name__ == '__main__':
|
|
app.run(host='127.0.0.1', port=5050, debug=True)
|