"""Community Keycloak SSO user management.""" import json from os import environ from uuid import uuid4 import httpx from aioredis import create_redis_pool from authlib.integrations.starlette_client import OAuth, OAuthError from fastapi import Depends, FastAPI, HTTPException, Request from fastapi.responses import HTMLResponse, RedirectResponse from fastapi.templating import Jinja2Templates from starlette.exceptions import HTTPException from starlette.middleware.sessions import SessionMiddleware APP_SECRET_KEY = environ.get("APP_SECRET_KEY") KEYCLOAK_CLIENT_ID = environ.get("KEYCLOAK_CLIENT_ID") KEYCLOAK_CLIENT_SECRET = environ.get("KEYCLOAK_CLIENT_SECRET") KEYCLOAK_DOMAIN = environ.get("KEYCLOAK_DOMAIN") KEYCLOAK_REALM = environ.get("KEYCLOAK_REALM") BASE_URL = f"https://{KEYCLOAK_DOMAIN}/auth/realms/{KEYCLOAK_REALM}/protocol/openid-connect" # noqa REDIS_DB = environ.get("REDIS_DB") REDIS_HOST = environ.get("REDIS_HOST") REDIS_PORT = environ.get("REDIS_PORT") app = FastAPI(docs_url=None, redoc_url=None) app.add_middleware(SessionMiddleware, secret_key=APP_SECRET_KEY) templates = Jinja2Templates(directory="templates") oauth = OAuth() oauth.register( name="keycloak", client_kwargs={"scope": "openid profile email"}, client_id=KEYCLOAK_CLIENT_ID, client_secret=KEYCLOAK_CLIENT_SECRET, authorize_url=f"{BASE_URL}/auth", access_token_url=f"{BASE_URL}/token", jwks_uri=f"{BASE_URL}/certs", ) class RequiresLoginException(Exception): pass @app.exception_handler(RequiresLoginException) async def requires_login(request, exception): return RedirectResponse(request.url_for("login")) @app.exception_handler(HTTPException) async def http_exception_handler(request, exc): home = request.url_for("login") return HTMLResponse(f"

{exc.detail} (home)

") async def logged_in(request: Request): user = request.session.get("user") if not user: raise RequiresLoginException return user async def get_user(request: Request): return request.session.get("user") @app.on_event("startup") async def starup_event(): app.state.redis = create_redis_pool( f"redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}?encoding=utf-8" ) @app.on_event("shutdown") async def shutdown_event(): app.state.redis.close() await app.state.redis.wait_closed() @app.get("/", dependencies=[Depends(logged_in)]) async def home(request: Request, user=Depends(get_user)): context = {"request": request, "user": user} return templates.TemplateResponse("admin.html", context=context) @app.get("/login") async def login(request: Request): return templates.TemplateResponse( "login.html", context={"request": request} ) @app.get("/login/keycloak") async def login_keycloak(request: Request): redirect_uri = request.url_for("auth_keycloak") return await oauth.keycloak.authorize_redirect(request, redirect_uri) @app.get("/auth/keycloak") async def auth_keycloak(request: Request): try: token = await oauth.keycloak.authorize_access_token(request) except Exception as exc: return HTMLResponse(f"

{exc} (home)

") user = await oauth.keycloak.parse_id_token(request, token) request.session["user"] = dict(user) return RedirectResponse(request.url_for("home")) @app.get("/logout", dependencies=[Depends(logged_in)]) async def logout(request: Request): try: httpx.get(f"{BASE_URL}/logout") except Exception as exc: return HTMLResponse(f"

{exc} (home)

") request.session.pop("user", None) return RedirectResponse(request.url_for("login")) @app.get("/invite/keycloak", dependencies=[Depends(logged_in)]) async def invite_keycloak(request: Request): pass