Files
recipe-maintainer/lib/authentik.py
autonomic-bot f283a371bb recipe-maintainer: public snapshot (secrets + deployment plans removed, single commit)
Sanitized single-commit public mirror of recipe-maintainer.
- Removed test-ssh/.testenv (live creds); added test-ssh/.testenv.example placeholders.
- Removed plans/ and planned-updates/ (deployment-planning docs) so no client/
  deployment domains appear in the public repo.
- All other secret stores were already gitignored.
- docs.coopcloud.tech retained as a submodule (public upstream).
2026-06-16 20:18:24 +00:00

200 lines
7.9 KiB
Python

"""Authentik admin API client.
Provides an AuthentikAdmin class for managing OAuth2 providers, applications,
and users via the Authentik REST API. Used by setup_authentik_integration.py
and setup_docs_integration.py scripts.
"""
import json
import urllib.error
import urllib.parse
import urllib.request
class AuthentikAdmin:
"""Client for the Authentik Admin REST API."""
def __init__(self, base_url: str, token: str):
self.base_url = base_url.rstrip("/")
self.api_url = f"{self.base_url}/api/v3"
self.token = token
def _request(self, method: str, endpoint: str, *,
data: dict | None = None,
timeout: int = 30) -> tuple[int, dict | list | None]:
"""Make an authenticated API request."""
url = f"{self.api_url}{endpoint}"
body = json.dumps(data).encode() if data is not None else None
req = urllib.request.Request(url, data=body, method=method)
req.add_header("Authorization", f"Bearer {self.token}")
req.add_header("Content-Type", "application/json")
try:
with urllib.request.urlopen(req, timeout=timeout) as resp:
raw = resp.read()
if not raw:
return resp.getcode(), None
return resp.getcode(), json.loads(raw)
except urllib.error.HTTPError as e:
try:
raw = e.read().decode(errors="replace")
return e.code, json.loads(raw) if raw.strip() else None
except Exception:
return e.code, None
def _get_first(self, endpoint: str, match_field: str,
match_value: str) -> dict | None:
"""GET a list endpoint and return the first result matching a field."""
_, data = self._request("GET", endpoint)
if not data or "results" not in data:
return None
for r in data["results"]:
if r.get(match_field) == match_value:
return r
return None
def resolve_uuids(self) -> dict:
"""Look up Authentik flow/mapping/certificate UUIDs dynamically."""
print("=== Resolving Authentik UUIDs ===", flush=True)
uuids = {}
lookups = [
("authorization_flow",
"/flows/instances/?slug=default-provider-authorization-implicit-consent"),
("invalidation_flow",
"/flows/instances/?slug=default-provider-invalidation-flow"),
("authentication_flow",
"/flows/instances/?slug=default-authentication-flow"),
("signing_key",
"/crypto/certificatekeypairs/?name=authentik+Self-signed+Certificate"),
("scope_openid",
"/propertymappings/all/?managed=goauthentik.io/providers/oauth2/scope-openid"),
("scope_email",
"/propertymappings/all/?managed=goauthentik.io/providers/oauth2/scope-email"),
("scope_profile",
"/propertymappings/all/?managed=goauthentik.io/providers/oauth2/scope-profile"),
]
for name, endpoint in lookups:
_, data = self._request("GET", endpoint)
pk = data["results"][0]["pk"]
uuids[name] = pk
print(f" {name}: {pk}", flush=True)
return uuids
def ensure_provider(self, name: str, client_id: str,
redirect_uris: list[dict],
uuids: dict) -> tuple[int, str]:
"""Create an OAuth2 provider if needed. Returns (provider_pk, client_secret)."""
print(f"=== Ensure OAuth2 provider '{name}' ===", flush=True)
existing = self._get_first(
f"/providers/oauth2/?search={name}", "name", name)
if existing:
pk = existing["pk"]
print(f" Provider '{name}' already exists (pk: {pk}), fetching secret",
flush=True)
_, provider_data = self._request("GET", f"/providers/oauth2/{pk}/")
client_secret = provider_data["client_secret"]
else:
_, resp = self._request("POST", "/providers/oauth2/", data={
"name": name,
"client_type": "confidential",
"client_id": client_id,
"authorization_flow": uuids["authorization_flow"],
"authentication_flow": uuids["authentication_flow"],
"invalidation_flow": uuids["invalidation_flow"],
"redirect_uris": redirect_uris,
"property_mappings": [
uuids["scope_openid"],
uuids["scope_email"],
uuids["scope_profile"],
],
"signing_key": uuids["signing_key"],
"include_claims_in_id_token": True,
})
pk = resp["pk"]
client_secret = resp["client_secret"]
print(f" Created provider (pk: {pk})", flush=True)
print(f" Client ID: {client_id}", flush=True)
print(f" Client secret: {client_secret}", flush=True)
return pk, client_secret
def ensure_application(self, name: str, slug: str,
provider_pk: int, launch_url: str) -> None:
"""Create an application if it doesn't exist."""
print(f"=== Ensure application '{slug}' ===", flush=True)
existing = self._get_first(
f"/core/applications/?slug={slug}", "slug", slug)
if existing:
print(f" Application '{slug}' already exists, skipping", flush=True)
else:
self._request("POST", "/core/applications/", data={
"name": name,
"slug": slug,
"provider": provider_pk,
"meta_launch_url": launch_url,
})
print(f" Created application '{slug}'", flush=True)
def ensure_user(self, username: str, email: str,
password: str) -> int:
"""Create a user if needed, set password. Returns user pk."""
print(f"=== Ensure user '{username}' ===", flush=True)
existing = self._get_first(
f"/core/users/?search={username}", "username", username)
if existing:
user_pk = existing["pk"]
print(f" User '{username}' already exists (pk: {user_pk})",
flush=True)
else:
_, resp = self._request("POST", "/core/users/", data={
"username": username,
"name": "Test User",
"email": email,
"is_active": True,
})
user_pk = resp["pk"]
print(f" Created user '{username}' (pk: {user_pk})", flush=True)
# Set password
print(f" Setting password for '{username}' ...", flush=True)
self._request(
"POST", f"/core/users/{user_pk}/set_password/",
data={"password": password},
)
print(" Password set", flush=True)
return user_pk
def ensure_app_password(self, user_pk: int,
identifier: str = "testuser-app-password") -> str:
"""Create an APP_PASSWORD token for password grant. Returns the key."""
print(" Creating APP_PASSWORD token for password grant ...", flush=True)
# Delete existing token if present
existing = self._get_first(
f"/core/tokens/?identifier={identifier}", "identifier", identifier)
if existing:
self._request("DELETE", f"/core/tokens/{identifier}/")
print(" Deleted existing APP_PASSWORD token", flush=True)
self._request("POST", "/core/tokens/", data={
"identifier": identifier,
"intent": "app_password",
"user": user_pk,
"description": "Test user app password for OIDC password grant",
"expiring": False,
})
_, key_data = self._request(
"GET", f"/core/tokens/{identifier}/view_key/")
app_password = key_data["key"]
print(f" APP_PASSWORD created: {app_password[:10]}...", flush=True)
return app_password