"""Shared Authentik API client for SSO integration scripts. No pip dependencies -- uses urllib.request + json from stdlib. """ import json import urllib.request import urllib.error import urllib.parse class AuthentikClient: """Encapsulates all Authentik API interactions for SSO setup.""" def __init__(self, base_url: str, token: str): self.base_url = base_url.rstrip("/") self.api_url = f"{self.base_url}/api/v3" self.token = token # -- low-level helpers --------------------------------------------------- def _request(self, method, endpoint, data=None): """Make an authenticated API request. Returns parsed JSON.""" url = f"{self.api_url}{endpoint}" body = json.dumps(data).encode() if data is not None else None req = urllib.request.Request(url, data=body, method=method) req.add_header("Authorization", f"Bearer {self.token}") req.add_header("Content-Type", "application/json") try: with urllib.request.urlopen(req, timeout=30) as resp: raw = resp.read() return json.loads(raw) if raw else {} except urllib.error.HTTPError as e: raw = e.read().decode(errors="replace") raise RuntimeError( f"Authentik API {method} {endpoint} returned {e.code}: {raw}" ) from e def _get(self, endpoint): return self._request("GET", endpoint) def _post(self, endpoint, data): return self._request("POST", endpoint, data) def _delete(self, endpoint): return self._request("DELETE", endpoint) # -- resolve UUIDs ------------------------------------------------------- def resolve_uuids(self): """Fetch PKs for default flows, signing key, and scope mappings. Returns a dict with keys: authorization_flow, invalidation_flow, authentication_flow, signing_key, scope_openid, scope_email, scope_profile """ uuids = {} # Flows flow_slugs = { "authorization_flow": "default-provider-authorization-implicit-consent", "invalidation_flow": "default-provider-invalidation-flow", "authentication_flow": "default-authentication-flow", } for key, slug in flow_slugs.items(): resp = self._get(f"/flows/instances/?slug={slug}") results = resp.get("results", []) if not results: raise RuntimeError(f"Flow '{slug}' not found in Authentik") uuids[key] = results[0]["pk"] print(f" {key}: {uuids[key]}") # Signing key resp = self._get( "/crypto/certificatekeypairs/" "?name=authentik+Self-signed+Certificate" ) results = resp.get("results", []) if not results: raise RuntimeError("Authentik Self-signed Certificate not found") uuids["signing_key"] = results[0]["pk"] print(f" signing_key: {uuids['signing_key']}") # Scope property mappings scope_managed = { "scope_openid": "goauthentik.io/providers/oauth2/scope-openid", "scope_email": "goauthentik.io/providers/oauth2/scope-email", "scope_profile": "goauthentik.io/providers/oauth2/scope-profile", } for key, managed in scope_managed.items(): resp = self._get(f"/propertymappings/all/?managed={managed}") results = resp.get("results", []) if not results: raise RuntimeError(f"Scope mapping '{managed}' not found") uuids[key] = results[0]["pk"] print(f" {key}: {uuids[key]}") return uuids # -- ensure OAuth2 provider ---------------------------------------------- def ensure_oauth2_provider(self, name, client_id, redirect_uris, uuids): """Create or reuse an OAuth2 provider. redirect_uris: list of {"matching_mode": "strict", "url": "..."} Returns (provider_pk, client_secret). """ resp = self._get( f"/providers/oauth2/?search={urllib.parse.quote(name)}" ) for r in resp.get("results", []): if r["name"] == name: pk = r["pk"] print(f" Provider '{name}' already exists (pk: {pk})") detail = self._get(f"/providers/oauth2/{pk}/") return pk, detail["client_secret"] # Create new provider provider = self._post("/providers/oauth2/", { "name": name, "client_type": "confidential", "client_id": client_id, "authorization_flow": uuids["authorization_flow"], "authentication_flow": uuids["authentication_flow"], "invalidation_flow": uuids["invalidation_flow"], "redirect_uris": redirect_uris, "property_mappings": [ uuids["scope_openid"], uuids["scope_email"], uuids["scope_profile"], ], "signing_key": uuids["signing_key"], "include_claims_in_id_token": True, }) pk = provider["pk"] secret = provider["client_secret"] print(f" Created provider '{name}' (pk: {pk})") return pk, secret # -- ensure application -------------------------------------------------- def ensure_application(self, name, slug, provider_pk, launch_url): """Create or reuse an application linked to the provider. Returns the application slug. """ resp = self._get(f"/core/applications/?slug={urllib.parse.quote(slug)}") for r in resp.get("results", []): if r["slug"] == slug: print(f" Application '{slug}' already exists") return slug self._post("/core/applications/", { "name": name, "slug": slug, "provider": provider_pk, "meta_launch_url": launch_url, }) print(f" Created application '{slug}'") return slug # -- ensure test user ---------------------------------------------------- def ensure_test_user(self, username, email, password): """Create or reuse a user and set their password. Returns user_pk. """ resp = self._get( f"/core/users/?search={urllib.parse.quote(username)}" ) user_pk = None for r in resp.get("results", []): if r["username"] == username: user_pk = r["pk"] print(f" User '{username}' already exists (pk: {user_pk})") break if user_pk is None: user = self._post("/core/users/", { "username": username, "name": "Test User", "email": email, "is_active": True, }) user_pk = user["pk"] print(f" Created user '{username}' (pk: {user_pk})") # Always set password self._post(f"/core/users/{user_pk}/set_password/", { "password": password, }) print(f" Password set for '{username}'") return user_pk # -- ensure app password ------------------------------------------------- def ensure_app_password(self, user_pk, identifier): """Create an APP_PASSWORD token (deletes existing one first). Returns the app password string. """ # Delete existing token if present resp = self._get( f"/core/tokens/?identifier={urllib.parse.quote(identifier)}" ) if resp.get("results"): try: self._delete(f"/core/tokens/{identifier}/") print(f" Deleted existing token '{identifier}'") except RuntimeError: pass # Already gone self._post("/core/tokens/", { "identifier": identifier, "intent": "app_password", "user": user_pk, "description": "App password for OIDC password grant", "expiring": False, }) key_resp = self._get(f"/core/tokens/{identifier}/view_key/") app_password = key_resp["key"] print(f" APP_PASSWORD created: {app_password[:10]}...") return app_password