from base64 import b64decode import functools import re from nanoid import generate from flask import Blueprint from flask import flash from flask import current_app from flask import g from flask import redirect from flask import url_for from flask import request from flask import session from flask import render_template from flask_mail import Message from werkzeug.exceptions import abort from capsulflask.db import get_model bp = Blueprint("auth", __name__, url_prefix="/auth") def account_required(view): """View decorator that redirects non-logged-in users to the login page.""" @functools.wraps(view) def wrapped_view(**kwargs): api_token = request.headers.get('authorization', None) if api_token is not None: email = get_model().authenticate_token(b64decode(api_token).decode('utf-8')) if email is not None: session.clear() session["account"] = email session["csrf-token"] = generate() if session.get("account") is None or session.get("csrf-token") is None : return redirect(url_for("auth.login")) return view(**kwargs) return wrapped_view def admin_account_required(view): """View decorator that redirects non-admin users to the login page.""" @functools.wraps(view) def wrapped_view(**kwargs): if session.get("account") is None or session.get("csrf-token") is None: return abort(404) if session.get("account") not in current_app.config["ADMIN_PANEL_ALLOW_EMAIL_ADDRESSES"].split(","): return abort(404) return view(**kwargs) return wrapped_view @bp.route("/login", methods=("GET", "POST")) def login(): if request.method == "POST": email = request.form["email"] errors = list() if not email: errors.append("email is required") elif len(email.strip()) < 6 or email.count('@') != 1 or email.count('.') == 0: errors.append("enter a valid email address") if len(errors) == 0: result = get_model().login(email) token = result[0] ignoreCaseMatches = result[1] if token is None: errors.append("too many logins. please use one of the existing login links that have been emailed to you") else: link = f"{current_app.config['BASE_URL']}/auth/magic/{token}" message = (f"Navigate to {link} to log into Capsul.\n" "\nIf you didn't request this, ignore this message.") if len(ignoreCaseMatches) > 0: joinedMatches = " or ".join(map(lambda x: f"'{x}'", ignoreCaseMatches)) message = (f"You tried to log in as '{email}', but that account doesn't exist yet. \n" f"If you would like to create a new account for '{email}', click here {link} \n\n" f"If you meant to log in as {joinedMatches}, please return to https://capsul.org \n" "and log in again with the correct (case-sensitive) email address.") current_app.config["FLASK_MAIL_INSTANCE"].send( Message( "Click This Link to Login to Capsul", sender=current_app.config["MAIL_DEFAULT_SENDER"], body=message, recipients=[email] ) ) return render_template("login-landing.html", email=email, has_smtp=(current_app.config["MAIL_SERVER"] != "")) for error in errors: flash(error) return render_template("login.html") @bp.route("/magic/", methods=("GET", )) def magiclink(token): email = get_model().consume_token(token) if email is not None: session.clear() session["account"] = email session["csrf-token"] = generate() return redirect(url_for("console.index")) else: # this is here to prevent xss if token and not re.match(r"^[a-zA-Z0-9_-]+$", token): token = '___________' abort(404, f"Token {token} doesn't exist or has already been used.") @bp.route("/logout") def logout(): session.clear() return redirect(url_for("index"))