Files
recipe-maintainer/utils/authentik_client.py
autonomic-bot f283a371bb recipe-maintainer: public snapshot (secrets + deployment plans removed, single commit)
Sanitized single-commit public mirror of recipe-maintainer.
- Removed test-ssh/.testenv (live creds); added test-ssh/.testenv.example placeholders.
- Removed plans/ and planned-updates/ (deployment-planning docs) so no client/
  deployment domains appear in the public repo.
- All other secret stores were already gitignored.
- docs.coopcloud.tech retained as a submodule (public upstream).
2026-06-16 20:18:24 +00:00

227 lines
8.1 KiB
Python

"""Shared Authentik API client for SSO integration scripts.
No pip dependencies -- uses urllib.request + json from stdlib.
"""
import json
import urllib.request
import urllib.error
import urllib.parse
class AuthentikClient:
"""Encapsulates all Authentik API interactions for SSO setup."""
def __init__(self, base_url: str, token: str):
self.base_url = base_url.rstrip("/")
self.api_url = f"{self.base_url}/api/v3"
self.token = token
# -- low-level helpers ---------------------------------------------------
def _request(self, method, endpoint, data=None):
"""Make an authenticated API request. Returns parsed JSON."""
url = f"{self.api_url}{endpoint}"
body = json.dumps(data).encode() if data is not None else None
req = urllib.request.Request(url, data=body, method=method)
req.add_header("Authorization", f"Bearer {self.token}")
req.add_header("Content-Type", "application/json")
try:
with urllib.request.urlopen(req, timeout=30) as resp:
raw = resp.read()
return json.loads(raw) if raw else {}
except urllib.error.HTTPError as e:
raw = e.read().decode(errors="replace")
raise RuntimeError(
f"Authentik API {method} {endpoint} returned {e.code}: {raw}"
) from e
def _get(self, endpoint):
return self._request("GET", endpoint)
def _post(self, endpoint, data):
return self._request("POST", endpoint, data)
def _delete(self, endpoint):
return self._request("DELETE", endpoint)
# -- resolve UUIDs -------------------------------------------------------
def resolve_uuids(self):
"""Fetch PKs for default flows, signing key, and scope mappings.
Returns a dict with keys:
authorization_flow, invalidation_flow, authentication_flow,
signing_key, scope_openid, scope_email, scope_profile
"""
uuids = {}
# Flows
flow_slugs = {
"authorization_flow": "default-provider-authorization-implicit-consent",
"invalidation_flow": "default-provider-invalidation-flow",
"authentication_flow": "default-authentication-flow",
}
for key, slug in flow_slugs.items():
resp = self._get(f"/flows/instances/?slug={slug}")
results = resp.get("results", [])
if not results:
raise RuntimeError(f"Flow '{slug}' not found in Authentik")
uuids[key] = results[0]["pk"]
print(f" {key}: {uuids[key]}")
# Signing key
resp = self._get(
"/crypto/certificatekeypairs/"
"?name=authentik+Self-signed+Certificate"
)
results = resp.get("results", [])
if not results:
raise RuntimeError("Authentik Self-signed Certificate not found")
uuids["signing_key"] = results[0]["pk"]
print(f" signing_key: {uuids['signing_key']}")
# Scope property mappings
scope_managed = {
"scope_openid": "goauthentik.io/providers/oauth2/scope-openid",
"scope_email": "goauthentik.io/providers/oauth2/scope-email",
"scope_profile": "goauthentik.io/providers/oauth2/scope-profile",
}
for key, managed in scope_managed.items():
resp = self._get(f"/propertymappings/all/?managed={managed}")
results = resp.get("results", [])
if not results:
raise RuntimeError(f"Scope mapping '{managed}' not found")
uuids[key] = results[0]["pk"]
print(f" {key}: {uuids[key]}")
return uuids
# -- ensure OAuth2 provider ----------------------------------------------
def ensure_oauth2_provider(self, name, client_id, redirect_uris, uuids):
"""Create or reuse an OAuth2 provider.
redirect_uris: list of {"matching_mode": "strict", "url": "..."}
Returns (provider_pk, client_secret).
"""
resp = self._get(
f"/providers/oauth2/?search={urllib.parse.quote(name)}"
)
for r in resp.get("results", []):
if r["name"] == name:
pk = r["pk"]
print(f" Provider '{name}' already exists (pk: {pk})")
detail = self._get(f"/providers/oauth2/{pk}/")
return pk, detail["client_secret"]
# Create new provider
provider = self._post("/providers/oauth2/", {
"name": name,
"client_type": "confidential",
"client_id": client_id,
"authorization_flow": uuids["authorization_flow"],
"authentication_flow": uuids["authentication_flow"],
"invalidation_flow": uuids["invalidation_flow"],
"redirect_uris": redirect_uris,
"property_mappings": [
uuids["scope_openid"],
uuids["scope_email"],
uuids["scope_profile"],
],
"signing_key": uuids["signing_key"],
"include_claims_in_id_token": True,
})
pk = provider["pk"]
secret = provider["client_secret"]
print(f" Created provider '{name}' (pk: {pk})")
return pk, secret
# -- ensure application --------------------------------------------------
def ensure_application(self, name, slug, provider_pk, launch_url):
"""Create or reuse an application linked to the provider.
Returns the application slug.
"""
resp = self._get(f"/core/applications/?slug={urllib.parse.quote(slug)}")
for r in resp.get("results", []):
if r["slug"] == slug:
print(f" Application '{slug}' already exists")
return slug
self._post("/core/applications/", {
"name": name,
"slug": slug,
"provider": provider_pk,
"meta_launch_url": launch_url,
})
print(f" Created application '{slug}'")
return slug
# -- ensure test user ----------------------------------------------------
def ensure_test_user(self, username, email, password):
"""Create or reuse a user and set their password.
Returns user_pk.
"""
resp = self._get(
f"/core/users/?search={urllib.parse.quote(username)}"
)
user_pk = None
for r in resp.get("results", []):
if r["username"] == username:
user_pk = r["pk"]
print(f" User '{username}' already exists (pk: {user_pk})")
break
if user_pk is None:
user = self._post("/core/users/", {
"username": username,
"name": "Test User",
"email": email,
"is_active": True,
})
user_pk = user["pk"]
print(f" Created user '{username}' (pk: {user_pk})")
# Always set password
self._post(f"/core/users/{user_pk}/set_password/", {
"password": password,
})
print(f" Password set for '{username}'")
return user_pk
# -- ensure app password -------------------------------------------------
def ensure_app_password(self, user_pk, identifier):
"""Create an APP_PASSWORD token (deletes existing one first).
Returns the app password string.
"""
# Delete existing token if present
resp = self._get(
f"/core/tokens/?identifier={urllib.parse.quote(identifier)}"
)
if resp.get("results"):
try:
self._delete(f"/core/tokens/{identifier}/")
print(f" Deleted existing token '{identifier}'")
except RuntimeError:
pass # Already gone
self._post("/core/tokens/", {
"identifier": identifier,
"intent": "app_password",
"user": user_pk,
"description": "App password for OIDC password grant",
"expiring": False,
})
key_resp = self._get(f"/core/tokens/{identifier}/view_key/")
app_password = key_resp["key"]
print(f" APP_PASSWORD created: {app_password[:10]}...")
return app_password