Sanitized single-commit public mirror of recipe-maintainer. - Removed test-ssh/.testenv (live creds); added test-ssh/.testenv.example placeholders. - Removed plans/ and planned-updates/ (deployment-planning docs) so no client/ deployment domains appear in the public repo. - All other secret stores were already gitignored. - docs.coopcloud.tech retained as a submodule (public upstream).
227 lines
8.1 KiB
Python
227 lines
8.1 KiB
Python
"""Shared Authentik API client for SSO integration scripts.
|
|
|
|
No pip dependencies -- uses urllib.request + json from stdlib.
|
|
"""
|
|
|
|
import json
|
|
import urllib.request
|
|
import urllib.error
|
|
import urllib.parse
|
|
|
|
|
|
class AuthentikClient:
|
|
"""Encapsulates all Authentik API interactions for SSO setup."""
|
|
|
|
def __init__(self, base_url: str, token: str):
|
|
self.base_url = base_url.rstrip("/")
|
|
self.api_url = f"{self.base_url}/api/v3"
|
|
self.token = token
|
|
|
|
# -- low-level helpers ---------------------------------------------------
|
|
|
|
def _request(self, method, endpoint, data=None):
|
|
"""Make an authenticated API request. Returns parsed JSON."""
|
|
url = f"{self.api_url}{endpoint}"
|
|
body = json.dumps(data).encode() if data is not None else None
|
|
req = urllib.request.Request(url, data=body, method=method)
|
|
req.add_header("Authorization", f"Bearer {self.token}")
|
|
req.add_header("Content-Type", "application/json")
|
|
try:
|
|
with urllib.request.urlopen(req, timeout=30) as resp:
|
|
raw = resp.read()
|
|
return json.loads(raw) if raw else {}
|
|
except urllib.error.HTTPError as e:
|
|
raw = e.read().decode(errors="replace")
|
|
raise RuntimeError(
|
|
f"Authentik API {method} {endpoint} returned {e.code}: {raw}"
|
|
) from e
|
|
|
|
def _get(self, endpoint):
|
|
return self._request("GET", endpoint)
|
|
|
|
def _post(self, endpoint, data):
|
|
return self._request("POST", endpoint, data)
|
|
|
|
def _delete(self, endpoint):
|
|
return self._request("DELETE", endpoint)
|
|
|
|
# -- resolve UUIDs -------------------------------------------------------
|
|
|
|
def resolve_uuids(self):
|
|
"""Fetch PKs for default flows, signing key, and scope mappings.
|
|
|
|
Returns a dict with keys:
|
|
authorization_flow, invalidation_flow, authentication_flow,
|
|
signing_key, scope_openid, scope_email, scope_profile
|
|
"""
|
|
uuids = {}
|
|
|
|
# Flows
|
|
flow_slugs = {
|
|
"authorization_flow": "default-provider-authorization-implicit-consent",
|
|
"invalidation_flow": "default-provider-invalidation-flow",
|
|
"authentication_flow": "default-authentication-flow",
|
|
}
|
|
for key, slug in flow_slugs.items():
|
|
resp = self._get(f"/flows/instances/?slug={slug}")
|
|
results = resp.get("results", [])
|
|
if not results:
|
|
raise RuntimeError(f"Flow '{slug}' not found in Authentik")
|
|
uuids[key] = results[0]["pk"]
|
|
print(f" {key}: {uuids[key]}")
|
|
|
|
# Signing key
|
|
resp = self._get(
|
|
"/crypto/certificatekeypairs/"
|
|
"?name=authentik+Self-signed+Certificate"
|
|
)
|
|
results = resp.get("results", [])
|
|
if not results:
|
|
raise RuntimeError("Authentik Self-signed Certificate not found")
|
|
uuids["signing_key"] = results[0]["pk"]
|
|
print(f" signing_key: {uuids['signing_key']}")
|
|
|
|
# Scope property mappings
|
|
scope_managed = {
|
|
"scope_openid": "goauthentik.io/providers/oauth2/scope-openid",
|
|
"scope_email": "goauthentik.io/providers/oauth2/scope-email",
|
|
"scope_profile": "goauthentik.io/providers/oauth2/scope-profile",
|
|
}
|
|
for key, managed in scope_managed.items():
|
|
resp = self._get(f"/propertymappings/all/?managed={managed}")
|
|
results = resp.get("results", [])
|
|
if not results:
|
|
raise RuntimeError(f"Scope mapping '{managed}' not found")
|
|
uuids[key] = results[0]["pk"]
|
|
print(f" {key}: {uuids[key]}")
|
|
|
|
return uuids
|
|
|
|
# -- ensure OAuth2 provider ----------------------------------------------
|
|
|
|
def ensure_oauth2_provider(self, name, client_id, redirect_uris, uuids):
|
|
"""Create or reuse an OAuth2 provider.
|
|
|
|
redirect_uris: list of {"matching_mode": "strict", "url": "..."}
|
|
Returns (provider_pk, client_secret).
|
|
"""
|
|
resp = self._get(
|
|
f"/providers/oauth2/?search={urllib.parse.quote(name)}"
|
|
)
|
|
for r in resp.get("results", []):
|
|
if r["name"] == name:
|
|
pk = r["pk"]
|
|
print(f" Provider '{name}' already exists (pk: {pk})")
|
|
detail = self._get(f"/providers/oauth2/{pk}/")
|
|
return pk, detail["client_secret"]
|
|
|
|
# Create new provider
|
|
provider = self._post("/providers/oauth2/", {
|
|
"name": name,
|
|
"client_type": "confidential",
|
|
"client_id": client_id,
|
|
"authorization_flow": uuids["authorization_flow"],
|
|
"authentication_flow": uuids["authentication_flow"],
|
|
"invalidation_flow": uuids["invalidation_flow"],
|
|
"redirect_uris": redirect_uris,
|
|
"property_mappings": [
|
|
uuids["scope_openid"],
|
|
uuids["scope_email"],
|
|
uuids["scope_profile"],
|
|
],
|
|
"signing_key": uuids["signing_key"],
|
|
"include_claims_in_id_token": True,
|
|
})
|
|
pk = provider["pk"]
|
|
secret = provider["client_secret"]
|
|
print(f" Created provider '{name}' (pk: {pk})")
|
|
return pk, secret
|
|
|
|
# -- ensure application --------------------------------------------------
|
|
|
|
def ensure_application(self, name, slug, provider_pk, launch_url):
|
|
"""Create or reuse an application linked to the provider.
|
|
|
|
Returns the application slug.
|
|
"""
|
|
resp = self._get(f"/core/applications/?slug={urllib.parse.quote(slug)}")
|
|
for r in resp.get("results", []):
|
|
if r["slug"] == slug:
|
|
print(f" Application '{slug}' already exists")
|
|
return slug
|
|
|
|
self._post("/core/applications/", {
|
|
"name": name,
|
|
"slug": slug,
|
|
"provider": provider_pk,
|
|
"meta_launch_url": launch_url,
|
|
})
|
|
print(f" Created application '{slug}'")
|
|
return slug
|
|
|
|
# -- ensure test user ----------------------------------------------------
|
|
|
|
def ensure_test_user(self, username, email, password):
|
|
"""Create or reuse a user and set their password.
|
|
|
|
Returns user_pk.
|
|
"""
|
|
resp = self._get(
|
|
f"/core/users/?search={urllib.parse.quote(username)}"
|
|
)
|
|
user_pk = None
|
|
for r in resp.get("results", []):
|
|
if r["username"] == username:
|
|
user_pk = r["pk"]
|
|
print(f" User '{username}' already exists (pk: {user_pk})")
|
|
break
|
|
|
|
if user_pk is None:
|
|
user = self._post("/core/users/", {
|
|
"username": username,
|
|
"name": "Test User",
|
|
"email": email,
|
|
"is_active": True,
|
|
})
|
|
user_pk = user["pk"]
|
|
print(f" Created user '{username}' (pk: {user_pk})")
|
|
|
|
# Always set password
|
|
self._post(f"/core/users/{user_pk}/set_password/", {
|
|
"password": password,
|
|
})
|
|
print(f" Password set for '{username}'")
|
|
|
|
return user_pk
|
|
|
|
# -- ensure app password -------------------------------------------------
|
|
|
|
def ensure_app_password(self, user_pk, identifier):
|
|
"""Create an APP_PASSWORD token (deletes existing one first).
|
|
|
|
Returns the app password string.
|
|
"""
|
|
# Delete existing token if present
|
|
resp = self._get(
|
|
f"/core/tokens/?identifier={urllib.parse.quote(identifier)}"
|
|
)
|
|
if resp.get("results"):
|
|
try:
|
|
self._delete(f"/core/tokens/{identifier}/")
|
|
print(f" Deleted existing token '{identifier}'")
|
|
except RuntimeError:
|
|
pass # Already gone
|
|
|
|
self._post("/core/tokens/", {
|
|
"identifier": identifier,
|
|
"intent": "app_password",
|
|
"user": user_pk,
|
|
"description": "App password for OIDC password grant",
|
|
"expiring": False,
|
|
})
|
|
|
|
key_resp = self._get(f"/core/tokens/{identifier}/view_key/")
|
|
app_password = key_resp["key"]
|
|
print(f" APP_PASSWORD created: {app_password[:10]}...")
|
|
return app_password
|