Files
cc-ci/runner/harness/sso.py
autonomic-bot 4d6b040ba7 feat(2): Q2.3 — dep resolver + SSO-setup harness primitives
- runner/harness/deps.py: dep resolver primitive (Phase 2 §4.2 / Q2.3).
  - declared_deps(recipe) reads DEPS list from tests/<recipe>/recipe_meta.py
  - dep_domain(parent, pr, ref, dep) — per-run domain per (parent, dep) pair
    so two recipes' deps of the same kind don't collide on a host
  - deploy_deps / teardown_deps — sequential deploy + reverse-order teardown
  - read/write of run-scoped $CCCI_DEPS_FILE
- runner/harness/sso.py: SSO-setup / OIDC-flow primitive (Phase 2 §4.2 / Q2.3).
  - setup_keycloak_realm: idempotent realm + confidential OIDC client +
    test user with generated 25-char alphanumeric password (class-B per §4.4-B);
    returns SsoCreds dict with discovery_url, token_url, all identifiers.
  - oidc_password_grant: exercises the password-grant OIDC flow; returns
    access_token (a JWT) or raises.
  - assert_discovery_endpoint: GET /.well-known/openid-configuration; asserts
    issuer matches the per-run provider domain+realm.
- runner/run_recipe_ci.py: wired in dep deploy BEFORE recipe-under-test, dep
  teardown LAST in finally (reverse order). DG4.1 deploy-count guard now
  expects 1 + len(deps_state) — accommodates declared deps without breaking
  the no-extra-deploys invariant.
- tests/conftest.py: deps_apps fixture reads $CCCI_DEPS_FILE -> dict mapping
  dep_recipe -> dep_domain.
- tests/unit/test_deps.py: 7 unit tests covering declared_deps parsing,
  per-(parent,dep) domain distinctness, run-state JSON write/load, env-var
  no-op semantics. 28/28 unit tests PASS on cc-ci.

Smoke test confirmed deploy_count == expected (1) when no deps declared
(custom-html install run, log /root/ccci-q2-deps-smoke.log).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-28 07:41:56 +01:00

287 lines
11 KiB
Python

"""SSO-setup / OIDC-flow harness primitive (Phase 2 §4.2 / Q2.3).
Given a deployed SSO provider (keycloak today; authentik in a follow-up), this module:
1. Reads the provider's admin password (the abra-generated `admin_password` secret in the
container's `/run/secrets/admin_password`).
2. Authenticates as admin (admin-cli password grant).
3. Creates a realm/client/test-user idempotently with cc-ci-controlled identifiers.
4. Returns a `SsoCreds` dict the dependent recipe's tests can use:
- `provider`, `provider_domain`, `realm`, `client_id`, `client_secret`
- `user`, `password`, `email`
- `discovery_url`, `token_url`, `auth_url`, `userinfo_url`
5. Provides `oidc_password_grant(...)` that performs the OIDC password-grant flow against the
provider, returns the access_token (a JWT).
Reusable by every SSO-dependent recipe (cryptpad, lasuite-docs, lasuite-meet, immich, etc.). Per
plan §4.4-B, generated client_secret + test_password are class-B run-scoped secrets that are
destroyed when the run's apps are torn down (the SSO provider app is torn down with the rest).
"""
from __future__ import annotations
import contextlib
import json
import os
import secrets
import ssl
import urllib.error
import urllib.parse
import urllib.request
from . import lifecycle
# Shared TLS context passed to every urlopen() call in this module. Certificate verification
# is switched off completely: no hostname check and no chain validation.
# NOTE(review): presumably because the per-run provider domains serve self-signed or
# not-yet-issued certificates — confirm; this context must not be reused outside the harness.
_CTX = ssl.create_default_context()
# Order matters: check_hostname has to be disabled first, because the ssl module raises
# ValueError when verify_mode is set to CERT_NONE while check_hostname is still enabled.
_CTX.check_hostname = False
_CTX.verify_mode = ssl.CERT_NONE
# ---------------------------------------------------------------------------
# Keycloak admin-API helpers (port + adaptation of tests/keycloak/kc_admin.py)
# ---------------------------------------------------------------------------
def _kc_admin_password(provider_domain: str) -> str:
    """Return keycloak's admin password as stored in the app container's secret file.

    Runs `cat /run/secrets/admin_password` in the app deployed at `provider_domain` through
    `lifecycle.exec_in_app` and strips leading/trailing whitespace from the command output.
    """
    read_secret_cmd = ["cat", "/run/secrets/admin_password"]
    raw_output = lifecycle.exec_in_app(provider_domain, read_secret_cmd)
    return raw_output.strip()
def _kc_admin_token(provider_domain: str, password: str) -> str:
    """Fetch an admin access token from the `master` realm using the `admin-cli` client.

    POSTs a form-encoded password grant for the `admin` user to the master realm's token
    endpoint and returns the `access_token` field of the JSON reply. Errors raised by
    `urllib` (HTTP/transport) or by JSON decoding are not caught here.
    """
    token_endpoint = f"https://{provider_domain}/realms/master/protocol/openid-connect/token"
    form_fields = {
        "grant_type": "password",
        "client_id": "admin-cli",
        "username": "admin",
        "password": password,
    }
    request = urllib.request.Request(
        token_endpoint,
        data=urllib.parse.urlencode(form_fields).encode(),
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=30, context=_CTX) as response:
        token_payload = json.load(response)
        return token_payload["access_token"]
def _kc_admin_call(provider_domain: str, token: str, path: str, method: str = "GET", body=None):
    """Issue one request against the keycloak admin REST API.

    `path` is appended to `https://<provider_domain>/admin`; `body`, when not None, is sent
    as a JSON document. HTTP error responses are not raised: they are folded into the return
    value so callers can branch on the status code (e.g. 409 for "already exists").

    Returns a `(status, parsed_json_or_None, location_header)` tuple. The parsed body is
    None when the response is empty or is not valid JSON; the location is "" when absent.
    """
    payload = None if body is None else json.dumps(body).encode()
    request_headers = {"Authorization": f"Bearer {token}"}
    if payload:
        request_headers["Content-Type"] = "application/json"
    request = urllib.request.Request(
        f"https://{provider_domain}/admin{path}",
        data=payload,
        headers=request_headers,
        method=method,
    )
    try:
        with urllib.request.urlopen(request, timeout=30, context=_CTX) as response:
            content = response.read()
            try:
                decoded = json.loads(content) if content else None
            except ValueError:  # json.JSONDecodeError is a subclass of ValueError
                decoded = None
            return response.status, decoded, response.headers.get("Location", "")
    except urllib.error.HTTPError as http_err:
        # Error responses may carry a JSON body too; reading it is strictly best-effort.
        try:
            content = http_err.read()
            decoded = json.loads(content) if content else None
        except Exception:  # noqa: BLE001
            decoded = None
        location = http_err.headers.get("Location", "") if http_err.headers else ""
        return http_err.code, decoded, location
def _generate_test_password(length: int = 25) -> str:
    """Return a random, strictly alphanumeric password of exactly `length` characters."""
    import string  # function-scope import: the module's import block does not include `string`

    alphabet = string.ascii_letters + string.digits
    random_part = "".join(secrets.choice(alphabet) for _ in range(length - 2))
    # The fixed "A1" suffix guarantees at least one upper-case letter and one digit, so the
    # password passes common complexity policies regardless of what the random part contains.
    return random_part + "A1"


def _find_exact(items, field: str, wanted: str):
    """Return the first dict in `items` that has an `id` and whose `field` equals `wanted`."""
    if not isinstance(items, list):
        return None
    return next(
        (
            item
            for item in items
            if isinstance(item, dict) and item.get(field) == wanted and "id" in item
        ),
        None,
    )


def setup_keycloak_realm(
    provider_domain: str,
    realm: str,
    client_id: str,
    redirect_uris: list[str] | None = None,
    web_origins: list[str] | None = None,
) -> dict:
    """Create a realm + a confidential OIDC client + a test user idempotently.

    Each of the three objects is created with a POST; an HTTP 409 ("already exists") is
    treated as success, in which case the existing client / user is looked up and updated so
    that the freshly generated secret / password returned here is the one actually in force.

    - `redirect_uris` / `web_origins`: values for the client; when empty or None the
      wildcard `["*"]` is used.
    - Generates a `client_secret` (32 hex chars) — run-scoped class-B per plan §4.4-B.
    - Generates a test user `testuser` with a 25-char alphanumeric password.

    Returns an `SsoCreds` dict with the keys `provider`, `provider_domain`, `realm`,
    `client_id`, `client_secret`, `user`, `password`, `email`, `discovery_url`, `token_url`,
    `auth_url` and `userinfo_url`.

    Raises RuntimeError when a create call returns an unexpected status, when an existing
    client / user cannot be resolved, or when updating an existing client / user fails.
    """
    admin_pass = _kc_admin_password(provider_domain)
    token = _kc_admin_token(provider_domain, admin_pass)

    # 1) Realm
    status, _, _ = _kc_admin_call(
        provider_domain, token, "/realms", "POST", {"realm": realm, "enabled": True}
    )
    if status not in (201, 409):
        raise RuntimeError(f"realm create failed: HTTP {status}")

    # 2) Client (confidential, with secret we control)
    client_secret = secrets.token_hex(16)
    client_body = {
        "clientId": client_id,
        "enabled": True,
        "secret": client_secret,
        "publicClient": False,
        "serviceAccountsEnabled": False,
        "standardFlowEnabled": True,
        "directAccessGrantsEnabled": True,  # required for password grant
        "redirectUris": redirect_uris or ["*"],
        "webOrigins": web_origins or ["*"],
        "protocol": "openid-connect",
    }
    status, _, _ = _kc_admin_call(
        provider_domain, token, f"/realms/{realm}/clients", "POST", client_body
    )
    if status == 409:
        # Client already exists — find its internal id and update the secret to our known value.
        query = urllib.parse.urlencode({"clientId": client_id})
        lookup_status, clients, _ = _kc_admin_call(
            provider_domain, token, f"/realms/{realm}/clients?{query}", "GET"
        )
        existing_client = _find_exact(clients, "clientId", client_id) if lookup_status == 200 else None
        if existing_client is None:
            raise RuntimeError(f"client {client_id} exists but couldn't be resolved")
        client_internal_id = existing_client["id"]
        # Update via PUT /clients/{id}. The status must be checked: if the update is rejected,
        # the secret returned below would not match what keycloak has stored.
        update_status, _, _ = _kc_admin_call(
            provider_domain,
            token,
            f"/realms/{realm}/clients/{client_internal_id}",
            "PUT",
            {**client_body, "id": client_internal_id},
        )
        if update_status not in (200, 204):
            raise RuntimeError(f"client {client_id} update failed: HTTP {update_status}")
    elif status != 201:
        raise RuntimeError(f"client create failed: HTTP {status}")

    # 3) Test user (idempotent: create, or reset the password of the existing one)
    user = "testuser"
    email = f"{user}@example.test"
    password = _generate_test_password(25)
    user_body = {
        "username": user,
        "email": email,
        "enabled": True,
        "emailVerified": True,
        "firstName": "Test",
        "lastName": "User",
        "credentials": [{"type": "password", "value": password, "temporary": False}],
    }
    status, _, _ = _kc_admin_call(
        provider_domain, token, f"/realms/{realm}/users", "POST", user_body
    )
    if status == 409:
        # User already exists — reset their password to our known value. The username search
        # is a substring match unless `exact=true`; the result is additionally filtered here
        # so that a different user (e.g. "testuser2") can never be picked up by accident.
        query = urllib.parse.urlencode({"username": user, "exact": "true"})
        lookup_status, users, _ = _kc_admin_call(
            provider_domain, token, f"/realms/{realm}/users?{query}", "GET"
        )
        existing_user = _find_exact(users, "username", user) if lookup_status == 200 else None
        if existing_user is None:
            raise RuntimeError(f"user {user} exists but couldn't be resolved")
        user_internal_id = existing_user["id"]
        reset_status, _, _ = _kc_admin_call(
            provider_domain,
            token,
            f"/realms/{realm}/users/{user_internal_id}/reset-password",
            "PUT",
            {"type": "password", "value": password, "temporary": False},
        )
        if reset_status not in (200, 204):
            raise RuntimeError(f"user {user} password reset failed: HTTP {reset_status}")
    elif status not in (201, 204):
        raise RuntimeError(f"user create failed: HTTP {status}")

    base = f"https://{provider_domain}/realms/{realm}/protocol/openid-connect"
    return {
        "provider": "keycloak",
        "provider_domain": provider_domain,
        "realm": realm,
        "client_id": client_id,
        "client_secret": client_secret,
        "user": user,
        "password": password,
        "email": email,
        "discovery_url": f"https://{provider_domain}/realms/{realm}/.well-known/openid-configuration",
        "token_url": f"{base}/token",
        "auth_url": f"{base}/auth",
        "userinfo_url": f"{base}/userinfo",
    }
# ---------------------------------------------------------------------------
# OIDC flows
# ---------------------------------------------------------------------------
def oidc_password_grant(creds: dict) -> str:
    """Run the OIDC password grant described by `creds` and return the access_token (a JWT).

    `creds` must provide `client_id`, `client_secret`, `user`, `password` and `token_url`.
    The request asks for the `openid email profile` scopes.

    Raises RuntimeError when the token endpoint answers with an HTTP error (the first 200
    characters of the error body are included) or when the reply has no `access_token`.
    Reusable by dependent recipes' SSO tests.
    """
    form_fields = {
        "grant_type": "password",
        "client_id": creds["client_id"],
        "client_secret": creds["client_secret"],
        "username": creds["user"],
        "password": creds["password"],
        "scope": "openid email profile",
    }
    request = urllib.request.Request(
        creds["token_url"],
        data=urllib.parse.urlencode(form_fields).encode(),
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(request, timeout=30, context=_CTX) as response:
            token_response = json.load(response)
    except urllib.error.HTTPError as http_err:
        # Reading the error body is best-effort; the status code alone is still reported.
        try:
            detail = http_err.read().decode(errors="replace")[:200]
        except Exception:  # noqa: BLE001
            detail = ""
        raise RuntimeError(f"password grant HTTP {http_err.code}: {detail}") from http_err
    access_token = token_response.get("access_token")
    if access_token:
        return access_token
    raise RuntimeError(
        f"password grant returned no access_token: keys={list(token_response.keys())}"
    )
def assert_discovery_endpoint(creds: dict) -> dict:
    """Fetch the OIDC discovery document at `creds["discovery_url"]` and check its issuer.

    The `issuer` field must equal `https://<provider_domain>/realms/<realm>` built from
    `creds`; otherwise AssertionError is raised. Returns the parsed discovery JSON.
    """
    request = urllib.request.Request(creds["discovery_url"], method="GET")
    with urllib.request.urlopen(request, timeout=20, context=_CTX) as response:
        discovery = json.load(response)
    expected_issuer = f"https://{creds['provider_domain']}/realms/{creds['realm']}"
    issuer = discovery.get("issuer", "")
    if issuer == expected_issuer:
        return discovery
    raise AssertionError(f"OIDC discovery issuer={issuer!r} != {expected_issuer!r}")
# ---------------------------------------------------------------------------
# Persistence for cross-test creds (class-B per §4.4-B)
# ---------------------------------------------------------------------------
def write_sso_creds(creds: dict) -> None:
    """Persist creds to $CCCI_SSO_CREDS_FILE for the dependent recipe's tests to read.

    No-op when the environment variable is unset or empty. Writing is best-effort: any
    OSError (missing directory, permissions, ...) is suppressed and the function returns
    normally.

    Because the payload contains secrets (client secret, test-user password), a file created
    here gets mode 0600 (owner read/write only) instead of the umask default. The mode of a
    file that already exists is left untouched; its content is truncated and replaced.

    NOTE(review): the runner is expected to point the variable at its per-run temp location
    and to delete the file at run end — that cleanup is not performed by this function.
    """
    path = os.environ.get("CCCI_SSO_CREDS_FILE")
    if not path:
        return
    with contextlib.suppress(OSError):
        fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        # os.fdopen takes ownership of fd, so the `with` block closes it on every path.
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(creds, f)
def load_sso_creds() -> dict | None:
"""Load the run-scoped SSO creds. Returns None if not present."""
path = os.environ.get("CCCI_SSO_CREDS_FILE")
if not path or not os.path.exists(path):
return None
try:
with open(path) as f:
return json.load(f)
except (OSError, ValueError):
return None