"""Authentik admin API client. Provides an AuthentikAdmin class for managing OAuth2 providers, applications, and users via the Authentik REST API. Used by setup_authentik_integration.py and setup_docs_integration.py scripts. """ import json import urllib.error import urllib.parse import urllib.request class AuthentikAdmin: """Client for the Authentik Admin REST API.""" def __init__(self, base_url: str, token: str): self.base_url = base_url.rstrip("/") self.api_url = f"{self.base_url}/api/v3" self.token = token def _request(self, method: str, endpoint: str, *, data: dict | None = None, timeout: int = 30) -> tuple[int, dict | list | None]: """Make an authenticated API request.""" url = f"{self.api_url}{endpoint}" body = json.dumps(data).encode() if data is not None else None req = urllib.request.Request(url, data=body, method=method) req.add_header("Authorization", f"Bearer {self.token}") req.add_header("Content-Type", "application/json") try: with urllib.request.urlopen(req, timeout=timeout) as resp: raw = resp.read() if not raw: return resp.getcode(), None return resp.getcode(), json.loads(raw) except urllib.error.HTTPError as e: try: raw = e.read().decode(errors="replace") return e.code, json.loads(raw) if raw.strip() else None except Exception: return e.code, None def _get_first(self, endpoint: str, match_field: str, match_value: str) -> dict | None: """GET a list endpoint and return the first result matching a field.""" _, data = self._request("GET", endpoint) if not data or "results" not in data: return None for r in data["results"]: if r.get(match_field) == match_value: return r return None def resolve_uuids(self) -> dict: """Look up Authentik flow/mapping/certificate UUIDs dynamically.""" print("=== Resolving Authentik UUIDs ===", flush=True) uuids = {} lookups = [ ("authorization_flow", "/flows/instances/?slug=default-provider-authorization-implicit-consent"), ("invalidation_flow", "/flows/instances/?slug=default-provider-invalidation-flow"), ("authentication_flow", "/flows/instances/?slug=default-authentication-flow"), ("signing_key", "/crypto/certificatekeypairs/?name=authentik+Self-signed+Certificate"), ("scope_openid", "/propertymappings/all/?managed=goauthentik.io/providers/oauth2/scope-openid"), ("scope_email", "/propertymappings/all/?managed=goauthentik.io/providers/oauth2/scope-email"), ("scope_profile", "/propertymappings/all/?managed=goauthentik.io/providers/oauth2/scope-profile"), ] for name, endpoint in lookups: _, data = self._request("GET", endpoint) pk = data["results"][0]["pk"] uuids[name] = pk print(f" {name}: {pk}", flush=True) return uuids def ensure_provider(self, name: str, client_id: str, redirect_uris: list[dict], uuids: dict) -> tuple[int, str]: """Create an OAuth2 provider if needed. Returns (provider_pk, client_secret).""" print(f"=== Ensure OAuth2 provider '{name}' ===", flush=True) existing = self._get_first( f"/providers/oauth2/?search={name}", "name", name) if existing: pk = existing["pk"] print(f" Provider '{name}' already exists (pk: {pk}), fetching secret", flush=True) _, provider_data = self._request("GET", f"/providers/oauth2/{pk}/") client_secret = provider_data["client_secret"] else: _, resp = self._request("POST", "/providers/oauth2/", data={ "name": name, "client_type": "confidential", "client_id": client_id, "authorization_flow": uuids["authorization_flow"], "authentication_flow": uuids["authentication_flow"], "invalidation_flow": uuids["invalidation_flow"], "redirect_uris": redirect_uris, "property_mappings": [ uuids["scope_openid"], uuids["scope_email"], uuids["scope_profile"], ], "signing_key": uuids["signing_key"], "include_claims_in_id_token": True, }) pk = resp["pk"] client_secret = resp["client_secret"] print(f" Created provider (pk: {pk})", flush=True) print(f" Client ID: {client_id}", flush=True) print(f" Client secret: {client_secret}", flush=True) return pk, client_secret def ensure_application(self, name: str, slug: str, provider_pk: int, launch_url: str) -> None: """Create an application if it doesn't exist.""" print(f"=== Ensure application '{slug}' ===", flush=True) existing = self._get_first( f"/core/applications/?slug={slug}", "slug", slug) if existing: print(f" Application '{slug}' already exists, skipping", flush=True) else: self._request("POST", "/core/applications/", data={ "name": name, "slug": slug, "provider": provider_pk, "meta_launch_url": launch_url, }) print(f" Created application '{slug}'", flush=True) def ensure_user(self, username: str, email: str, password: str) -> int: """Create a user if needed, set password. Returns user pk.""" print(f"=== Ensure user '{username}' ===", flush=True) existing = self._get_first( f"/core/users/?search={username}", "username", username) if existing: user_pk = existing["pk"] print(f" User '{username}' already exists (pk: {user_pk})", flush=True) else: _, resp = self._request("POST", "/core/users/", data={ "username": username, "name": "Test User", "email": email, "is_active": True, }) user_pk = resp["pk"] print(f" Created user '{username}' (pk: {user_pk})", flush=True) # Set password print(f" Setting password for '{username}' ...", flush=True) self._request( "POST", f"/core/users/{user_pk}/set_password/", data={"password": password}, ) print(" Password set", flush=True) return user_pk def ensure_app_password(self, user_pk: int, identifier: str = "testuser-app-password") -> str: """Create an APP_PASSWORD token for password grant. Returns the key.""" print(" Creating APP_PASSWORD token for password grant ...", flush=True) # Delete existing token if present existing = self._get_first( f"/core/tokens/?identifier={identifier}", "identifier", identifier) if existing: self._request("DELETE", f"/core/tokens/{identifier}/") print(" Deleted existing APP_PASSWORD token", flush=True) self._request("POST", "/core/tokens/", data={ "identifier": identifier, "intent": "app_password", "user": user_pk, "description": "Test user app password for OIDC password grant", "expiring": False, }) _, key_data = self._request( "GET", f"/core/tokens/{identifier}/view_key/") app_password = key_data["key"] print(f" APP_PASSWORD created: {app_password[:10]}...", flush=True) return app_password