capsul-flask/capsulflask/auth.py

115 lines
3.9 KiB
Python
Raw Normal View History

2020-05-10 01:36:14 +00:00
import functools
2020-05-10 04:45:20 +00:00
import re
2020-05-10 01:36:14 +00:00
from nanoid import generate
2020-05-10 01:36:14 +00:00
from flask import Blueprint
2020-05-10 03:59:22 +00:00
from flask import flash
from flask import current_app
2020-05-10 01:36:14 +00:00
from flask import g
from flask import redirect
from flask import url_for
2020-05-10 03:59:22 +00:00
from flask import request
2020-05-10 01:36:14 +00:00
from flask import session
2020-05-10 03:59:22 +00:00
from flask import render_template
from flask_mail import Message
2020-05-10 04:32:13 +00:00
from werkzeug.exceptions import abort
2020-05-10 01:36:14 +00:00
from capsulflask.db import get_model
# Blueprint that groups the authentication views; its routes are mounted under the "/auth" URL prefix.
bp = Blueprint("auth", __name__, url_prefix="/auth")
def account_required(view):
    """View decorator that redirects non-logged-in users to the login page.

    A request counts as logged in only when the session holds both an
    ``account`` and a ``csrf-token`` entry. Otherwise the wrapped view is not
    called and a redirect to ``auth.login`` is returned instead.

    Positional as well as keyword arguments are forwarded to ``view`` so the
    decorator can be stacked with other wrappers.
    """

    @functools.wraps(view)
    def wrapped_view(*args, **kwargs):
        logged_in = (
            session.get("account") is not None
            and session.get("csrf-token") is not None
        )
        if not logged_in:
            return redirect(url_for("auth.login"))

        return view(*args, **kwargs)

    return wrapped_view
2021-07-08 19:10:14 +00:00
def admin_account_required(view):
    """View decorator that answers 404 unless the session belongs to an admin.

    The wrapped view only runs when the session holds both an ``account`` and
    a ``csrf-token`` entry and the account is listed in the comma-separated
    ``ADMIN_PANEL_ALLOW_EMAIL_ADDRESSES`` config value. In every other case
    the request is aborted with 404 (not redirected to the login page).

    Positional as well as keyword arguments are forwarded to ``view``.
    """

    @functools.wraps(view)
    def wrapped_view(*args, **kwargs):
        account = session.get("account")
        if account is None or session.get("csrf-token") is None:
            return abort(404)

        # Tolerate whitespace around the commas ("a@x.org, b@x.org") so a
        # harmless formatting choice in the config cannot lock an admin out.
        raw_allow_list = current_app.config["ADMIN_PANEL_ALLOW_EMAIL_ADDRESSES"]
        allowed = [entry.strip() for entry in raw_allow_list.split(",")]
        if account not in allowed:
            return abort(404)

        return view(*args, **kwargs)

    return wrapped_view
2020-05-10 03:59:22 +00:00
@bp.route("/login", methods=("GET", "POST"))
def login():
    """Show the login form and, on POST, email a magic login link.

    GET renders ``login.html``. POST validates the submitted ``email`` field,
    asks the model for a login token and mails a link pointing at
    ``/auth/magic/<token>``; on success ``login-landing.html`` is rendered.
    Validation problems, or the model returning no token, are flashed and the
    login form is rendered again.
    """
    if request.method == "POST":
        # Normalise once, so the value that is validated is the same value
        # that is handed to the model and used as the mail recipient. A
        # missing field falls through to the "email is required" message.
        email = request.form.get("email", "").strip()
        errors = []

        if not email:
            errors.append("email is required")
        elif len(email) < 6 or email.count('@') != 1 or email.count('.') == 0:
            errors.append("enter a valid email address")

        if not errors:
            result = get_model().login(email)
            token = result[0]
            ignore_case_matches = result[1]

            if token is None:
                errors.append("too many logins. please use one of the existing login links that have been emailed to you")
            else:
                link = f"{current_app.config['BASE_URL']}/auth/magic/{token}"

                message = (f"Navigate to {link} to log into Capsul.\n"
                           "\nIf you didn't request this, ignore this message.")

                # The model reported accounts whose address differs from the
                # submitted one only by letter case: warn the user instead of
                # sending the plain login message.
                if len(ignore_case_matches) > 0:
                    joined_matches = " or ".join(f"'{match}'" for match in ignore_case_matches)
                    message = (f"You tried to log in as '{email}', but that account doesn't exist yet. \n"
                               f"If you would like to create a new account for '{email}', click here {link} \n\n"
                               f"If you meant to log in as {joined_matches}, please return to https://capsul.org \n"
                               "and log in again with the correct (case-sensitive) email address.")

                current_app.config["FLASK_MAIL_INSTANCE"].send(
                    Message(
                        "Click This Link to Login to Capsul",
                        sender=current_app.config["MAIL_DEFAULT_SENDER"],
                        body=message,
                        recipients=[email]
                    )
                )

                return render_template("login-landing.html", email=email, has_smtp=(current_app.config["MAIL_SERVER"] != ""))

        for error in errors:
            flash(error)

    return render_template("login.html")
2020-05-10 04:32:13 +00:00
@bp.route("/magic/<string:token>", methods=("GET", ))
def magiclink(token):
    """Log the visitor in with a magic-link token taken from the URL.

    The token is handed to the model's ``consume_token``. When that yields an
    email, the session is reset, ``account`` and a fresh ``csrf-token`` are
    stored in it, and the visitor is redirected to ``console.index``.
    Otherwise the request is aborted with 404.
    """
    email = get_model().consume_token(token)

    if email is not None:
        session.clear()
        session["account"] = email
        session["csrf-token"] = generate()
        return redirect(url_for("console.index"))

    # this is here to prevent xss: the token is echoed back in the 404
    # description, so anything outside the expected alphabet is masked.
    # fullmatch is used instead of match + "$" because "$" also matches just
    # before a trailing newline, which would let "abc\n" through unmasked.
    if token and not re.fullmatch(r"[a-zA-Z0-9_-]+", token):
        token = '___________'

    abort(404, f"Token {token} doesn't exist or has already been used.")
2020-05-10 03:59:22 +00:00
@bp.route("/logout")
def logout():
    """Drop everything stored in the session and redirect to the ``index`` endpoint."""
    session.clear()
    landing_url = url_for("index")
    return redirect(landing_url)