keycloak-collective-portal/keycloak_collective_portal.py

63 lines
2.1 KiB
Python
Raw Normal View History

2021-06-11 11:32:41 +00:00
"""Community Keycloak SSO user management."""
2021-06-11 13:58:28 +00:00
from os import environ
from authlib.integrations.starlette_client import OAuth, OAuthError
2021-06-11 12:14:04 +00:00
from fastapi import FastAPI, Request
2021-06-11 12:23:13 +00:00
from fastapi.responses import HTMLResponse, RedirectResponse
2021-06-11 12:14:04 +00:00
from fastapi.templating import Jinja2Templates
2021-06-11 12:23:13 +00:00
from starlette.middleware.sessions import SessionMiddleware
2021-06-11 11:32:41 +00:00
2021-06-11 13:58:28 +00:00
APP_SECRET_KEY = environ.get("APP_SECRET_KEY")
KEYCLOAK_CLIENT_ID = environ.get("KEYCLOAK_CLIENT_ID")
KEYCLOAK_CLIENT_SECRET = environ.get("KEYCLOAK_CLIENT_SECRET")
2021-06-11 14:39:22 +00:00
KEYCLOAK_DOMAIN = environ.get("KEYCLOAK_DOMAIN")
KEYCLOAK_REALM = environ.get("KEYCLOAK_REALM")
2021-06-11 13:58:28 +00:00
2021-06-11 11:32:41 +00:00
app = FastAPI()
2021-06-11 13:58:28 +00:00
app.add_middleware(SessionMiddleware, secret_key=APP_SECRET_KEY)
2021-06-11 12:14:04 +00:00
templates = Jinja2Templates(directory="templates")
2021-06-11 11:32:41 +00:00
2021-06-11 13:58:28 +00:00
oauth = OAuth()
oauth.register(
name="keycloak",
client_kwargs={"scope": "openid profile email"},
client_id=KEYCLOAK_CLIENT_ID,
client_secret=KEYCLOAK_CLIENT_SECRET,
2021-06-11 14:54:28 +00:00
authorize_url=f"https://{KEYCLOAK_DOMAIN}/auth/realms/{KEYCLOAK_REALM}/protocol/openid-connect/auth",
access_token_url=f"https://{KEYCLOAK_DOMAIN}/auth/realms/{KEYCLOAK_REALM}/protocol/openid-connect/token",
2021-06-11 13:58:28 +00:00
)
2021-06-11 11:32:41 +00:00
2021-06-11 12:14:04 +00:00
@app.get("/", response_class=HTMLResponse)
async def home(request: Request):
2021-06-11 12:23:13 +00:00
user = request.session.get("user")
if user:
return templates.TemplateResponse(
2021-06-11 13:58:28 +00:00
"index.html", context={"request": request, "user": user}
2021-06-11 12:23:13 +00:00
)
2021-06-11 13:58:28 +00:00
return RedirectResponse(request.url_for("login_keycloak"))
@app.get("/login/keycloak")
async def login_keycloak(request: Request):
redirect_uri = request.url_for("auth")
return await oauth.keycloak.authorize_redirect(request, redirect_uri)
@app.get("/auth")
async def auth(request: Request):
try:
token = await oauth.keycloak.authorize_access_token(request)
except OAuthError as error:
return HTMLResponse(f"<h1>{error.error}</h1>")
user = await oauth.keycloak.parse_id_token(request, token)
request.session["user"] = dict(user)
return RedirectResponse(request.url_for("home"))
2021-06-11 12:23:13 +00:00
2021-06-11 13:58:28 +00:00
@app.route("/logout")
async def logout(request: Request):
request.session.pop("user", None)
return RedirectResponse(request.url_for("home"))