"""Flask application for submitting, moderating, browsing and voting on quotes.

Serves HTML pages for visitors and moderators plus a small JSON API under
``/api``. Quotes are persisted in SQLite through Flask-SQLAlchemy.
"""

import datetime
import json
import logging
import os
import random

from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError
from flask import (
    Flask,
    abort,
    flash,
    jsonify,
    make_response,
    redirect,
    render_template,
    request,
    session,
    url_for,
)
from flask_cors import CORS
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from flask_sqlalchemy import SQLAlchemy
from werkzeug.middleware.proxy_fix import ProxyFix

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///quotes.db'
# Prefer a key supplied by the environment; the literal fallback only keeps
# local development working and must not be relied on in production.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'your_secret_key')
db = SQLAlchemy(app)

# Apply ProxyFix middleware so forwarded headers from one proxy hop are used
app.wsgi_app = ProxyFix(
    app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_port=1, x_prefix=1
)

# Initialize Argon2 password hasher
ph = PasswordHasher()

# Initialize logging for debugging
logging.basicConfig(level=logging.DEBUG)

# Hardcoded admin credentials (hashed password using Argon2)
ADMIN_CREDENTIALS = {
    'username': 'admin',
    # Replace this with the hashed password generated by Argon2
    'password': '$argon2i$v=19$m=65536,t=4,p=1$cWZDc1pQaUJLTUJoaVI4cw$kn8XKz6AEZi8ebXfyyZuzommSypliVFrsGqzOyUEIHA'  # Example hash
}

# Moderation states stored in Quote.status
STATUS_PENDING = 0
STATUS_APPROVED = 1
STATUS_REJECTED = 2

# Maps the ``filter`` query argument of the admin panel to a status;
# anything not listed here falls back to pending.
_FILTER_TO_STATUS = {
    'approved': STATUS_APPROVED,
    'rejected': STATUS_REJECTED,
}

# Change applied to Quote.votes by each valid vote action
_VOTE_DELTAS = {'upvote': 1, 'downvote': -1}


# Define the Quote model
class Quote(db.Model):
    """A submitted quote together with its vote tally and moderation state."""

    id = db.Column(db.Integer, primary_key=True)
    text = db.Column(db.Text, nullable=False)
    votes = db.Column(db.Integer, default=0)
    date = db.Column(db.DateTime, default=datetime.datetime.utcnow)
    # 0 = pending, 1 = approved, 2 = rejected
    status = db.Column(db.Integer, default=STATUS_PENDING)
    ip_address = db.Column(db.String(45))  # Store IPv4 and IPv6 addresses
    user_agent = db.Column(db.String(255))  # Store user-agent strings
    submitted_at = db.Column(db.DateTime, default=datetime.datetime.utcnow)


def _quote_counts():
    """Return the approved/pending totals as template keyword arguments."""
    return {
        'approved_count': Quote.query.filter_by(status=STATUS_APPROVED).count(),
        'pending_count': Quote.query.filter_by(status=STATUS_PENDING).count(),
    }


def _serialize_quote(item):
    """Return the JSON-ready dict the API exposes for a single quote."""
    return {
        'id': item.id,
        'text': item.text,
        'votes': item.votes,
        'date': item.date.strftime('%Y-%m-%d'),
    }


def _load_vote_history():
    """Return the visitor's vote history parsed from the ``votes`` cookie.

    The cookie is client-controlled, so anything that is not a JSON object
    is treated as an empty history instead of raising.
    """
    raw = request.cookies.get('votes')
    if not raw:
        return {}
    try:
        history = json.loads(raw)
    except ValueError:
        return {}
    return history if isinstance(history, dict) else {}


def _is_valid_admin(username, password):
    """Return True when the credentials match ADMIN_CREDENTIALS."""
    if username != ADMIN_CREDENTIALS['username']:
        return False
    try:
        ph.verify(ADMIN_CREDENTIALS['password'], password)
    except VerifyMismatchError:
        return False
    return True


# Home route to display quotes
@app.route('/')
def index():
    """Render the home page with the newest approved quotes, 5 per page."""
    page = request.args.get('page', 1, type=int)
    quotes = (
        Quote.query.filter_by(status=STATUS_APPROVED)
        .order_by(Quote.date.desc())
        .paginate(page=page, per_page=5)
    )
    return render_template('index.html', quotes=quotes, **_quote_counts())


# Separate route for submitting quotes
@app.route('/submit', methods=['GET', 'POST'])
def submit():
    """Show the submission form (GET) or store a new pending quote (POST)."""
    if request.method == 'POST':
        # Strip so that whitespace-only submissions count as empty.
        quote_text = (request.form.get('quote') or '').strip()
        if not quote_text:
            flash("Quote cannot be empty.", 'error')
            return redirect(url_for('submit'))

        # Get the user's IP address and browser info
        ip_address = request.headers.get('CF-Connecting-IP', request.remote_addr)
        user_agent = request.headers.get('User-Agent')

        new_quote = Quote(
            text=quote_text, ip_address=ip_address, user_agent=user_agent
        )
        try:
            db.session.add(new_quote)
            db.session.commit()
        except Exception as e:
            db.session.rollback()
            logging.exception("Failed to store submitted quote")
            flash("Error submitting quote: {}".format(e), 'error')
        else:
            flash("Quote submitted successfully!", 'success')
        return redirect(url_for('index'))

    return render_template('submit.html', **_quote_counts())


# NOTE(review): voting changes state on a GET request; consider POST + CSRF.
@app.route('/vote/<int:id>/<action>')
def vote(id, action):
    """Apply, undo or switch the visitor's vote on a quote.

    ``action`` must be ``upvote`` or ``downvote``; anything else is a 404.
    The visitor's previous votes are tracked in the ``votes`` cookie: voting
    the same way twice undoes the vote, voting the other way switches it.
    Redirects back to the browse page given by the ``page`` query argument.
    """
    delta = _VOTE_DELTAS.get(action)
    if delta is None:
        abort(404)

    quote = Quote.query.get_or_404(id)
    vote_data = _load_vote_history()
    key = str(id)
    previous_action = vote_data.get(key)

    if previous_action not in _VOTE_DELTAS:
        # No (usable) prior vote: apply the new one.
        quote.votes += delta
        vote_data[key] = action
        message = "Thank you for voting!"
    elif previous_action == action:
        # Same action clicked again: undo the earlier vote.
        quote.votes -= delta
        del vote_data[key]
        message = "Your vote has been undone."
    else:
        # Switching sides: cancel the old vote and apply the new one.
        quote.votes += 2 * delta
        vote_data[key] = action
        message = "Your vote has been changed."

    # Resolved before the try so the error path can redirect as well.
    page = request.args.get('page', 1, type=int)
    try:
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        flash(f"Error while voting: {e}", 'error')
        return redirect(url_for('browse', page=page))

    flash(message, 'success')
    resp = make_response(redirect(url_for('browse', page=page)))
    # Store vote history in cookies for 1 year
    resp.set_cookie('votes', json.dumps(vote_data), max_age=60 * 60 * 24 * 365)
    return resp


# Route for displaying a random quote
@app.route('/random')
def random_quote():
    """Render a randomly chosen approved quote."""
    counts = _quote_counts()
    approved_total = counts['approved_count']
    if approved_total == 0:
        flash("No quotes available yet.", 'error')
        return redirect(url_for('index'))

    # Pick by offset rather than by id: ids have gaps after deletions and
    # would also match pending or rejected quotes.
    chosen = (
        Quote.query.filter_by(status=STATUS_APPROVED)
        .order_by(Quote.id)
        .offset(random.randint(0, approved_total - 1))
        .first()
    )
    if chosen is None:
        # The table changed between the count and the fetch.
        flash("No quotes available yet.", 'error')
        return redirect(url_for('index'))
    return render_template('random.html', quote=chosen, **counts)


# NOTE(review): this and /quote show quotes of any status; /read only shows
# approved ones. Confirm whether pending quotes should be publicly reachable.
@app.route('/<int:id>')
def quote_homepathid(id):
    """Render a single quote addressed directly as ``/<id>``."""
    quote = Quote.query.get_or_404(id)
    return render_template('quote.html', quote=quote)


@app.route('/quote')
def quote():
    """Render a single quote selected with the ``id`` query argument."""
    quote_id = request.args.get('id')
    if not quote_id:
        flash("Quote ID not provided.", 'error')
        return redirect(url_for('browse'))

    quote = Quote.query.get_or_404(quote_id)
    return render_template('quote.html', quote=quote)


@app.route('/faq')
def faq():
    """Render the static FAQ page."""
    return render_template('faq.html')


# Admin login route
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form (GET) or authenticate the admin (POST)."""
    if request.method == 'POST':
        username = request.form.get('username', '')
        password = request.form.get('password', '')

        if _is_valid_admin(username, password):
            session['admin'] = True
            flash('Login successful!', 'success')
            return redirect(url_for('modapp'))

        # One message for both failure modes so the response does not
        # reveal whether the username exists.
        flash('Invalid username or password. Please try again.', 'danger')

    return render_template('login.html')


# Admin panel route (accessible only to logged-in admins)
@app.route('/modapp')
def modapp():
    """Render the moderation panel, filtered by quote status.

    The ``filter`` query argument selects ``approved`` or ``rejected``;
    any other value shows pending quotes.
    """
    if not session.get('admin'):
        flash('You need to log in first.', 'danger')
        return redirect(url_for('login'))

    # Apply filtering (pending, approved, rejected)
    filter_status = request.args.get('filter', 'pending')
    page = request.args.get('page', 1, type=int)
    status = _FILTER_TO_STATUS.get(filter_status, STATUS_PENDING)
    quotes = (
        Quote.query.filter_by(status=status)
        .order_by(Quote.date.desc())
        .paginate(page=page, per_page=10)
    )

    # Get counts for each status
    rejected_count = Quote.query.filter_by(status=STATUS_REJECTED).count()
    return render_template(
        'modapp.html',
        quotes=quotes,
        filter_status=filter_status,
        rejected_count=rejected_count,
        **_quote_counts(),
    )


# Helper function to approve a quote
def approve_quote(quote_id):
    """Mark the quote as approved if it exists and is not approved already."""
    quote = Quote.query.get(quote_id)
    if quote and quote.status != STATUS_APPROVED:
        quote.status = STATUS_APPROVED
        db.session.commit()


# Helper function to reject a quote
def reject_quote(quote_id):
    """Mark the quote as rejected if it exists and is not rejected already."""
    quote = Quote.query.get(quote_id)
    if quote and quote.status != STATUS_REJECTED:
        logging.debug("Rejecting quote ID: %s", quote.id)
        quote.status = STATUS_REJECTED
        db.session.commit()


# Helper function to delete a quote
def delete_quote(quote_id):
    """Delete the quote with the given id if it exists."""
    quote = Quote.query.get(quote_id)
    if quote:
        logging.debug("Deleting quote ID: %s", quote.id)
        db.session.delete(quote)
        db.session.commit()


@app.route('/search', methods=['GET'])
def search():
    """Render approved quotes whose text contains the ``q`` query argument."""
    query = request.args.get('q', '').strip()
    quotes = []
    if query:
        # Perform text search in approved quotes
        quotes = Quote.query.filter(
            Quote.text.like(f'%{query}%'),
            Quote.status == STATUS_APPROVED,
        ).all()

    return render_template(
        'search.html', quotes=quotes, query=query, **_quote_counts()
    )


@app.route('/read', methods=['GET'])
def read_quote():
    """Render one approved quote selected with the ``id`` query argument."""
    quote_id = request.args.get('id', type=int)
    if not quote_id:
        flash("Quote number is required.", 'error')
        return redirect(url_for('search'))

    # Find the quote by ID (only approved quotes)
    quote = Quote.query.filter_by(id=quote_id, status=STATUS_APPROVED).first()
    if quote:
        return render_template('quote.html', quote=quote)

    flash(f"No quote found with ID {quote_id}", 'error')
    return redirect(url_for('search'))


# Route for browsing approved quotes
@app.route('/browse', methods=['GET'])
def browse():
    """Render approved quotes, newest first, 10 per page."""
    page = request.args.get('page', 1, type=int)
    quotes = (
        Quote.query.filter_by(status=STATUS_APPROVED)
        .order_by(Quote.date.desc())
        .paginate(page=page, per_page=10)
    )
    return render_template('browse.html', quotes=quotes, **_quote_counts())


# NOTE(review): approve/reject/delete change state on GET requests, which
# makes them reachable through cross-site links; consider POST + CSRF tokens.

# Approve a quote (admin only)
@app.route('/approve/<int:id>')
def approve(id):
    """Approve a quote and return to the same page of the admin panel."""
    if not session.get('admin'):
        return redirect(url_for('login'))

    quote = Quote.query.get_or_404(id)
    quote.status = STATUS_APPROVED
    db.session.commit()

    # Redirect back to the same page
    page = request.args.get('page', 1)
    return redirect(url_for('modapp', page=page))


# Reject a quote (admin only)
@app.route('/reject/<int:id>')
def reject(id):
    """Reject a quote and return to the admin panel."""
    if not session.get('admin'):
        return redirect(url_for('login'))

    quote = Quote.query.get_or_404(id)
    quote.status = STATUS_REJECTED
    db.session.commit()
    return redirect(url_for('modapp'))


# Delete a quote (admin only)
@app.route('/delete/<int:id>')
def delete(id):
    """Delete a quote and return to the admin panel."""
    if not session.get('admin'):
        return redirect(url_for('login'))

    quote = Quote.query.get_or_404(id)
    db.session.delete(quote)
    db.session.commit()
    return redirect(url_for('modapp'))


# Admin logout route
@app.route('/logout')
def logout():
    """Clear the admin flag from the session and show the login page."""
    session.pop('admin', None)
    flash('Logged out successfully.', 'success')
    return redirect(url_for('login'))


# Automatically create the database tables using app context
with app.app_context():
    db.create_all()

# Initialize rate limiter and CORS for cross-origin API access.
# Keyword arguments are accepted by both Flask-Limiter 2.x and 3.x; the
# positional ``Limiter(app, ...)`` form is rejected by 3.x.
limiter = Limiter(key_func=get_remote_address, app=app)
CORS(app)


# API to get all approved quotes
@app.route('/api/quotes', methods=['GET'])
def get_all_quotes():
    """Return every approved quote as a JSON list."""
    quotes = Quote.query.filter_by(status=STATUS_APPROVED).all()
    return jsonify([_serialize_quote(item) for item in quotes]), 200


# API to get a specific quote by ID
@app.route('/api/quotes/<int:id>', methods=['GET'])
def get_quote(id):
    """Return one approved quote as JSON, or 404 if there is none."""
    quote = Quote.query.filter_by(id=id, status=STATUS_APPROVED).first_or_404()
    return jsonify(_serialize_quote(quote)), 200


# API to get a random approved quote
@app.route('/api/random', methods=['GET'])
def get_random_quote():
    """Return a randomly chosen approved quote as JSON."""
    approved = Quote.query.filter_by(status=STATUS_APPROVED)
    count = approved.count()
    if count == 0:
        return jsonify({"error": "No approved quotes available"}), 404

    # A fixed order makes the offset refer to a well-defined row.
    chosen = (
        approved.order_by(Quote.id)
        .offset(random.randint(0, count - 1))
        .first()
    )
    if chosen is None:
        # The table changed between the count and the fetch.
        return jsonify({"error": "No approved quotes available"}), 404
    return jsonify(_serialize_quote(chosen)), 200


# API to get the top quotes by vote count
@app.route('/api/top', methods=['GET'])
def get_top_quotes():
    """Return the ten approved quotes with the most votes as JSON."""
    top_quotes = (
        Quote.query.filter_by(status=STATUS_APPROVED)
        .order_by(Quote.votes.desc())
        .limit(10)
        .all()
    )
    return jsonify([_serialize_quote(item) for item in top_quotes]), 200


# API to search for quotes
@app.route('/api/search', methods=['GET'])
def search_quotes():
    """Return approved quotes whose text contains ``q`` (case-insensitive).

    Responds with 400 when ``q`` is missing and 404 when nothing matches.
    """
    query = request.args.get('q', '').strip()
    if not query:
        return jsonify({"error": "No search term provided"}), 400

    quotes = Quote.query.filter(
        Quote.text.ilike(f'%{query}%'),
        Quote.status == STATUS_APPROVED,
    ).all()
    if not quotes:
        return jsonify({"error": "No quotes found for search term"}), 404

    return jsonify([_serialize_quote(item) for item in quotes]), 200


# API to submit a new quote
@app.route('/api/submit', methods=['POST'])
@limiter.limit("5 per minute")  # Rate limiting to prevent abuse
def submit_quote():
    """Store a new pending quote from a JSON body of the form {"text": ...}.

    Responds with 400 for a missing, non-string or badly sized text,
    201 with the new id on success and 500 if the database write fails.
    """
    # silent=True: a missing or malformed JSON body yields None and is
    # answered with the JSON error below instead of a framework error page.
    data = request.get_json(silent=True)
    text = data.get('text') if isinstance(data, dict) else None
    if not isinstance(text, str) or not text:
        return jsonify({"error": "Quote text is required"}), 400

    quote_text = text.strip()

    # Basic validation to prevent spam
    if len(quote_text) < 5 or len(quote_text) > 1000:
        return jsonify({"error": "Quote must be between 5 and 1000 characters"}), 400

    # Record the submitter like the HTML form does, so moderators see the
    # same metadata for API submissions.
    new_quote = Quote(
        text=quote_text,
        ip_address=request.headers.get('CF-Connecting-IP', request.remote_addr),
        user_agent=request.headers.get('User-Agent'),
    )
    try:
        db.session.add(new_quote)
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        logging.exception("Failed to store quote submitted through the API")
        return jsonify({"error": f"Error submitting quote: {str(e)}"}), 500
    return jsonify({"success": "Quote submitted successfully!", "id": new_quote.id}), 201


# Run the Flask app
if __name__ == '__main__':
    app.run(host='127.0.0.1', port=5050, debug=True)