capsul-flask/capsulflask/auth.py
3wc cf42ac5e4d Add basic "create" API..
.. using server-side API tokens
2021-07-17 12:18:16 +02:00

126 lines
4.3 KiB
Python

from base64 import b64decode
import functools
import re
from nanoid import generate
from flask import Blueprint
from flask import flash
from flask import current_app
from flask import g
from flask import redirect
from flask import url_for
from flask import request
from flask import session
from flask import render_template
from flask_mail import Message
from werkzeug.exceptions import abort
from capsulflask.db import get_model
# Blueprint that groups the authentication views; its routes are mounted under the "/auth" URL prefix.
bp = Blueprint("auth", __name__, url_prefix="/auth")
def account_required(view):
    """View decorator that redirects non-logged-in users to the login page.

    If the request carries an ``Authorization`` header, its value is treated as
    a base64-encoded API token and passed to ``get_model().authenticate_token``.
    When that returns an email, the session is reset and populated for that
    account before the usual session check runs.

    A header that cannot be base64/UTF-8 decoded is treated as "no valid token"
    rather than raising, so the request falls through to the session check.

    Args:
        view: The view function to protect. It is called with keyword
            arguments only.

    Returns:
        The wrapped view. It returns a redirect to ``auth.login`` when the
        session lacks ``account`` or ``csrf-token``; otherwise the result of
        ``view``.
    """
    @functools.wraps(view)
    def wrapped_view(**kwargs):
        api_token = request.headers.get('authorization', None)
        if api_token is not None:
            try:
                decoded_token = b64decode(api_token).decode('utf-8')
            except ValueError:
                # binascii.Error (bad base64) and UnicodeDecodeError (bad UTF-8)
                # are both ValueError subclasses; a malformed header must not
                # turn into an unhandled server error.
                decoded_token = None

            email = None
            if decoded_token is not None:
                email = get_model().authenticate_token(decoded_token)

            if email is not None:
                session.clear()
                session["account"] = email
                session["csrf-token"] = generate()

        if session.get("account") is None or session.get("csrf-token") is None:
            return redirect(url_for("auth.login"))

        return view(**kwargs)

    return wrapped_view
def admin_account_required(view):
    """View decorator that answers 404 unless the session belongs to an admin.

    The session must contain both ``account`` and ``csrf-token``, and the
    account must appear in the comma-separated
    ``ADMIN_PANEL_ALLOW_EMAIL_ADDRESSES`` config value. Any failure aborts
    with 404 (not a redirect).

    Args:
        view: The view function to protect. It is called with keyword
            arguments only.

    Returns:
        The wrapped view.
    """
    @functools.wraps(view)
    def wrapped_view(**kwargs):
        account = session.get("account")
        if account is None or session.get("csrf-token") is None:
            return abort(404)

        raw_allow_list = current_app.config["ADMIN_PANEL_ALLOW_EMAIL_ADDRESSES"]
        # Trim whitespace around each entry so "a@x.org, b@y.org" works, and
        # drop empty entries so an empty setting never matches anything.
        allowed_accounts = {entry.strip() for entry in raw_allow_list.split(",")}
        allowed_accounts.discard("")

        if account not in allowed_accounts:
            return abort(404)

        return view(**kwargs)

    return wrapped_view
@bp.route("/login", methods=("GET", "POST"))
def login():
    """Render the login form and, on POST, email a magic login link.

    GET renders ``login.html``. POST validates the submitted ``email`` form
    field, asks ``get_model().login(email)`` for a token, and mails a link of
    the form ``<BASE_URL>/auth/magic/<token>`` to that address. On success it
    renders ``login-landing.html``; on any validation or rate-limit error the
    messages are flashed and ``login.html`` is rendered again.
    """
    if request.method != "POST":
        return render_template("login.html")

    email = request.form["email"]
    problems = []

    if not email:
        problems.append("email is required")
    elif len(email.strip()) < 6 or email.count('@') != 1 or email.count('.') == 0:
        problems.append("enter a valid email address")

    if not problems:
        login_result = get_model().login(email)
        token = login_result[0]
        # Second element: other addresses to suggest in the email (presumably
        # accounts that differ from `email` only by letter case).
        case_variants = login_result[1]

        if token is None:
            problems.append("too many logins. please use one of the existing login links that have been emailed to you")
        else:
            link = f"{current_app.config['BASE_URL']}/auth/magic/{token}"

            if len(case_variants) > 0:
                quoted_variants = " or ".join(f"'{variant}'" for variant in case_variants)
                mail_body = (
                    f"You tried to log in as '{email}', but that account doesn't exist yet. \n"
                    f"If you would like to create a new account for '{email}', click here {link} \n\n"
                    f"If you meant to log in as {quoted_variants}, please return to https://capsul.org \n"
                    "and log in again with the correct (case-sensitive) email address."
                )
            else:
                mail_body = (
                    f"Navigate to {link} to log into Capsul.\n"
                    "\nIf you didn't request this, ignore this message."
                )

            mailer = current_app.config["FLASK_MAIL_INSTANCE"]
            login_mail = Message(
                "Click This Link to Login to Capsul",
                sender=current_app.config["MAIL_DEFAULT_SENDER"],
                body=mail_body,
                recipients=[email],
            )
            mailer.send(login_mail)

            smtp_configured = current_app.config["MAIL_SERVER"] != ""
            return render_template("login-landing.html", email=email, has_smtp=smtp_configured)

    for problem in problems:
        flash(problem)

    return render_template("login.html")
@bp.route("/magic/<string:token>", methods=("GET", ))
def magiclink(token):
    """Log the visitor in via a magic-link token.

    ``get_model().consume_token(token)`` is asked for the email tied to the
    token. If it returns one, the session is reset, populated with the
    account and a fresh CSRF token, and the visitor is redirected to
    ``console.index``. Otherwise the request is aborted with 404.

    Args:
        token: The token segment taken from the URL.
    """
    email = get_model().consume_token(token)
    if email is not None:
        session.clear()
        session["account"] = email
        session["csrf-token"] = generate()
        return redirect(url_for("console.index"))

    # this is here to prevent xss: only echo the token back when it consists
    # entirely of whitelisted characters. fullmatch is used because "$" with
    # re.match would also accept a token that ends in a newline.
    if token and not re.fullmatch(r"[a-zA-Z0-9_-]+", token):
        token = '___________'

    abort(404, f"Token {token} doesn't exist or has already been used.")
@bp.route("/logout")
def logout():
    """Drop everything stored in the session and send the visitor to the index page."""
    session.clear()
    landing_url = url_for("index")
    return redirect(landing_url)