capsul-flask/capsulflask/auth.py

88 lines
2.6 KiB
Python

import functools
import re
from nanoid import generate
from flask import Blueprint
from flask import flash
from flask import current_app
from flask import g
from flask import redirect
from flask import url_for
from flask import request
from flask import session
from flask import render_template
from flask_mail import Message
from werkzeug.exceptions import abort
from capsulflask.db import get_model
bp = Blueprint("auth", __name__, url_prefix="/auth")
def account_required(view):
"""View decorator that redirects non-logged-in users to the login page."""
@functools.wraps(view)
def wrapped_view(**kwargs):
if session.get("account") is None or session.get("csrf-token") is None :
return redirect(url_for("auth.login"))
return view(**kwargs)
return wrapped_view
@bp.route("/login", methods=("GET", "POST"))
def login():
if request.method == "POST":
email = request.form["email"]
errors = list()
if not email:
errors.append("email is required")
elif len(email.strip()) < 6 or email.count('@') != 1 or email.count('.') == 0:
errors.append("enter a valid email address")
if len(errors) == 0:
token = get_model().login(email)
if token is None:
errors.append("too many logins. please use one of the existing login links that have been emailed to you")
else:
link = f"{current_app.config['BASE_URL']}/auth/magic/{token}"
current_app.config["FLASK_MAIL_INSTANCE"].send(
Message(
"Click This Link to Login to Capsul",
body=(f"Navigate to {link} to log into Capsul.\n"
"\nIf you didn't request this, ignore this message."),
recipients=[email]
)
)
return render_template("login-landing.html", email=email)
for error in errors:
flash(error)
return render_template("login.html")
@bp.route("/magic/<string:token>", methods=("GET", ))
def magiclink(token):
email = get_model().consume_token(token)
if email is not None:
session.clear()
session["account"] = email
session["csrf-token"] = generate()
return redirect(url_for("console.index"))
else:
# this is here to prevent xss
if token and not re.match(r"^[a-zA-Z0-9_-]+$", token):
token = '___________'
abort(404, f"Token {token} doesn't exist or has already been used.")
@bp.route("/logout")
def logout():
session.clear()
return redirect(url_for("index"))