Per operator-2026-05-28 SSO-dep plan (plan-sso-dep-testing.md). Substantial orchestrator
restructuring:
NEW LIFECYCLE ORDER:
1. Recipe deploy ALONE (no deps).
2. install / upgrade / backup / restore — recipe-only generic tiers.
3. setup_custom_tests step (NEW):
   a. Deploy each declared dep + provision realm/client/test-user via harness.sso.
   b. Write $CCCI_DEPS_FILE in dict shape {dep_recipe: {domain, realm, client_id, client_secret,
      admin_user, admin_password, discovery_url, token_url, ...}}.
   c. Run tests/<recipe>/setup_custom_tests.sh hook (jq-readable; wires OIDC env via abra
      secret insert + .env edits + in-place 'abra app deploy --force --chaos').
4. CUSTOM tier with deps-ready flag; @pytest.mark.requires_deps tests skip with
'deps-not-ready: <reason>' when setup_custom_tests fails. NON-deps custom tests still run
normally — FAILURE ISOLATION (a DoD item per plan).
5. Teardown: recipe first, deps in reverse declaration order.
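
The failure isolation in steps 3 and 4 can be sketched roughly as follows. This is a minimal illustration, not the harness's actual code: `deps_env_after_setup`, `deps_skip_reason` and the `CCCI_DEPS_REASON` variable are hypothetical names, and treating an unset `CCCI_DEPS_READY` as "ready" is an assumption. Only `CCCI_DEPS_READY=0`, the `requires_deps` marker and the `deps-not-ready: <reason>` skip text come from this message.

```python
import os
from typing import Optional


def deps_env_after_setup(setup_hook) -> dict:
    """Run the setup step and turn its outcome into env vars for the CUSTOM tier.

    A failure is recorded instead of re-raised, so non-deps custom tests still run.
    """
    try:
        setup_hook()
    except Exception as exc:  # noqa: BLE001 - any setup failure only marks deps as not ready
        # CCCI_DEPS_REASON is a hypothetical variable used to carry the reason text.
        return {"CCCI_DEPS_READY": "0", "CCCI_DEPS_REASON": str(exc)}
    return {"CCCI_DEPS_READY": "1", "CCCI_DEPS_REASON": ""}


def deps_skip_reason(env=None) -> Optional[str]:
    """Return None when deps are ready, else the skip reason string."""
    env = os.environ if env is None else env
    # Assumption: an unset flag means "ready", so recipes without deps are unaffected.
    if env.get("CCCI_DEPS_READY", "1") != "0":
        return None
    reason = env.get("CCCI_DEPS_REASON") or "setup_custom_tests failed"
    return f"deps-not-ready: {reason}"


def pytest_configure(config):
    # Register the marker so pytest does not warn about an unknown mark.
    config.addinivalue_line("markers", "requires_deps: test needs deployed deps")


def pytest_collection_modifyitems(config, items):
    import pytest  # imported lazily so the helpers above work without pytest installed

    reason = deps_skip_reason()
    if reason is None:
        return
    skip = pytest.mark.skip(reason=reason)
    for item in items:
        # Only tests carrying the marker are skipped; everything else runs normally.
        if item.get_closest_marker("requires_deps"):
            item.add_marker(skip)
```

Keeping the decision in a plain function makes the skip logic checkable without starting a pytest session.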
Harness changes:
- runner/run_recipe_ci.py: deps deploy moves from BEFORE recipe deploy to AFTER restore tier.
Adds _enrich_deps_with_sso() + _run_setup_custom_tests_hook(). DG4.1 generalised to
'one abra app new per app' (recipe + each dep); in-place redeploys (--force) don't count.
- runner/harness/deps.py: write_run_state + load_run_state accept dict OR list shape;
deps_as_dict() coerces either to a recipe→entry map.
- runner/harness/sso.py: admin_password_inside() public re-export.
- tests/conftest.py: deps_creds fixture (full creds dict); deps_apps fixture flattens to
recipe→domain string. pytest_collection_modifyitems hook skips
@pytest.mark.requires_deps tests when CCCI_DEPS_READY=0.
pytest_configure registers the marker.
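
As an illustration of the dict-or-list coercion described for runner/harness/deps.py, here is a small sketch. It is not the real `deps_as_dict()`: the function names are hypothetical, and the list shape (a list of entries that each carry a `recipe` key) is an assumption, since this message does not spell it out.

```python
def deps_as_dict_sketch(deps) -> dict:
    """Coerce either deps shape to a recipe -> entry map (hypothetical stand-in)."""
    if deps is None:
        return {}
    if isinstance(deps, dict):
        # Already in dict shape: {dep_recipe: {domain, realm, ...}}.
        return dict(deps)
    if isinstance(deps, list):
        # Assumed list shape: [{"recipe": ..., "domain": ..., ...}, ...].
        return {entry["recipe"]: dict(entry) for entry in deps}
    raise TypeError(f"unsupported deps shape: {type(deps).__name__}")


def deps_domains_sketch(deps) -> dict:
    """Flatten to recipe -> domain string, as the deps_apps fixture is described to do."""
    return {recipe: entry["domain"] for recipe, entry in deps_as_dict_sketch(deps).items()}
```

With both shapes normalised in one place, fixtures such as deps_creds and deps_apps can be derived from the same map.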
Recipe content:
- tests/lasuite-docs/setup_custom_tests.sh: NEW hook reads $CCCI_DEPS_FILE via jq;
inserts oidc_rpcs secret at BUMPED version (v1→v2) since abra app new -S generates v1 first
and Swarm forbids overwriting; updates SECRET_OIDC_RPCS_VERSION in .env; writes 9 OIDC env
vars (REALM/DISCOVERY/AUTH/TOKEN/USERINFO/LOGOUT/JWKS/CLIENT_ID/SCOPES); ensures trailing
newline on .env so writes don't concatenate (caught a 'TIMEOUT=900OIDC_REALM=...' bug);
triggers in-place 'abra app deploy --force --chaos --no-input'.
- tests/lasuite-docs/functional/test_oidc_with_keycloak.py: refactored to consume deps_creds
fixture (no longer calls setup_keycloak_realm itself — the orchestrator does it in
setup_custom_tests). Marked @pytest.mark.requires_deps.
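
The trailing-newline fix for .env can be illustrated in Python. The real hook is a shell script, so this is only a sketch of the same idea; `append_env_vars` is a hypothetical helper, not part of the harness.

```python
from pathlib import Path


def append_env_vars(env_path, values: dict) -> None:
    """Append KEY=VALUE lines to a .env file without gluing onto the last line."""
    path = Path(env_path)
    text = path.read_text() if path.exists() else ""
    # Without this guard, a file ending in 'TIMEOUT=900' (no newline) would become
    # 'TIMEOUT=900OIDC_REALM=...' after the first append.
    if text and not text.endswith("\n"):
        text += "\n"
    text += "".join(f"{key}={value}\n" for key, value in values.items())
    path.write_text(text)
```

Every appended line ends with its own newline, so repeated calls stay safe as well.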
Cold-verifiable on cc-ci (log /root/ccci-refactor-lasuite-r5.log):
RECIPE=lasuite-docs STAGES=install,custom cc-ci-run runner/run_recipe_ci.py
install: PASS, custom: 3 PASS incl. test_oidc_password_grant_against_dep_keycloak.
deploy-count = 2 (expect 2) — DG4.1 generalised holds.
Smoke regression: RECIPE=custom-html STAGES=install,custom → 5 PASS, deploy-count=1.
Closes DEFERRED.md #5 (lasuite-docs OIDC parity ports via this plan).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
293 lines
11 KiB
Python
"""SSO-setup / OIDC-flow harness primitive (Phase 2 §4.2 / Q2.3).

Given a deployed SSO provider (keycloak today; authentik in a follow-up), this module:
  1. Reads the provider's admin password (the abra-generated `admin_password` secret in the
     container's `/run/secrets/admin_password`).
  2. Authenticates as admin (admin-cli password grant).
  3. Creates a realm/client/test-user idempotently with cc-ci-controlled identifiers.
  4. Returns a `SsoCreds` dict the dependent recipe's tests can use:
     - `provider`, `provider_domain`, `realm`, `client_id`, `client_secret`
     - `user`, `password`, `email`
     - `discovery_url`, `token_url`, `auth_url`, `userinfo_url`
  5. Provides `oidc_password_grant(...)`, which performs the OIDC password-grant flow against
     the provider and returns the access_token (a JWT).

Reusable by every SSO-dependent recipe (cryptpad, lasuite-docs, lasuite-meet, immich, etc.). Per
plan §4.4-B, generated client_secret + test_password are class-B run-scoped secrets that are
destroyed when the run's apps are torn down (the SSO provider app is torn down with the rest).
"""

from __future__ import annotations

import contextlib
import json
import os
import secrets
import ssl
import urllib.error
import urllib.parse
import urllib.request

from . import lifecycle

_CTX = ssl.create_default_context()
_CTX.check_hostname = False
_CTX.verify_mode = ssl.CERT_NONE


# ---------------------------------------------------------------------------
# Keycloak admin-API helpers (port + adaptation of tests/keycloak/kc_admin.py)
# ---------------------------------------------------------------------------


def _kc_admin_password(provider_domain: str) -> str:
    """Read the abra-generated admin_password from inside the running keycloak container."""
    return lifecycle.exec_in_app(provider_domain, ["cat", "/run/secrets/admin_password"]).strip()


def _kc_admin_token(provider_domain: str, password: str) -> str:
    data = urllib.parse.urlencode(
        {
            "grant_type": "password",
            "client_id": "admin-cli",
            "username": "admin",
            "password": password,
        }
    ).encode()
    req = urllib.request.Request(
        f"https://{provider_domain}/realms/master/protocol/openid-connect/token",
        data=data,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=30, context=_CTX) as r:
        return json.load(r)["access_token"]


def _kc_admin_call(provider_domain: str, token: str, path: str, method: str = "GET", body=None):
    """Admin-API call. Returns (status, body_json_or_None, location_header)."""
    data = json.dumps(body).encode() if body is not None else None
    headers = {"Authorization": f"Bearer {token}"}
    if data:
        headers["Content-Type"] = "application/json"
    req = urllib.request.Request(
        f"https://{provider_domain}/admin{path}", data=data, headers=headers, method=method
    )
    try:
        with urllib.request.urlopen(req, timeout=30, context=_CTX) as r:
            raw = r.read()
            try:
                parsed = json.loads(raw) if raw else None
            except (json.JSONDecodeError, ValueError):
                parsed = None
            return r.status, parsed, r.headers.get("Location", "")
    except urllib.error.HTTPError as e:
        try:
            raw = e.read()
            parsed = json.loads(raw) if raw else None
        except Exception:  # noqa: BLE001
            parsed = None
        return e.code, parsed, e.headers.get("Location", "") if e.headers else ""


def setup_keycloak_realm(
    provider_domain: str,
    realm: str,
    client_id: str,
    redirect_uris: list[str] | None = None,
    web_origins: list[str] | None = None,
) -> dict:
    """Create a realm + a confidential OIDC client + a test user idempotently. Returns an
    `SsoCreds` dict with all the identifiers + generated secrets a dependent recipe needs.

    - Generates a `client_secret` (32 hex chars); run-scoped class-B per plan §4.4-B.
    - Generates a test user `testuser` with a 26-char password (24 URL-safe chars + "A1").
    - Returns urls (`discovery_url`, `token_url`, `auth_url`, `userinfo_url`) the dependent
      recipe can configure with.
    """
    redirect_uris = redirect_uris or []
    web_origins = web_origins or []
    admin_pass = _kc_admin_password(provider_domain)
    token = _kc_admin_token(provider_domain, admin_pass)

    # 1) Realm
    status, _, _ = _kc_admin_call(
        provider_domain, token, "/realms", "POST", {"realm": realm, "enabled": True}
    )
    if status not in (201, 409):
        raise RuntimeError(f"realm create failed: HTTP {status}")

    # 2) Client (confidential, with secret we control)
    client_secret = secrets.token_hex(16)
    client_body = {
        "clientId": client_id,
        "enabled": True,
        "secret": client_secret,
        "publicClient": False,
        "serviceAccountsEnabled": False,
        "standardFlowEnabled": True,
        "directAccessGrantsEnabled": True,  # required for password grant
        "redirectUris": redirect_uris or ["*"],
        "webOrigins": web_origins or ["*"],
        "protocol": "openid-connect",
    }
    status, _, location = _kc_admin_call(
        provider_domain, token, f"/realms/{realm}/clients", "POST", client_body
    )
    if status == 409:
        # Client already exists: find its internal id and update the secret to our known value.
        s2, clients, _ = _kc_admin_call(
            provider_domain, token, f"/realms/{realm}/clients?clientId={client_id}", "GET"
        )
        if s2 == 200 and isinstance(clients, list) and clients:
            client_internal_id = clients[0]["id"]
            # Update the secret via PUT /clients/{id} (Keycloak admin API)
            s3, _, _ = _kc_admin_call(
                provider_domain,
                token,
                f"/realms/{realm}/clients/{client_internal_id}",
                "PUT",
                {**client_body, "id": client_internal_id},
            )
            # Fail loudly: otherwise the returned client_secret would not match the provider's.
            if not 200 <= s3 < 300:
                raise RuntimeError(f"client update failed: HTTP {s3}")
        else:
            raise RuntimeError(f"client {client_id} exists but couldn't be resolved")
    elif status != 201:
        raise RuntimeError(f"client create failed: HTTP {status}")

    # 3) Test user (idempotent: create, or reset the password if the user already exists)
    user = "testuser"
    email = f"{user}@example.test"
    # 26 chars; the "A1" suffix guarantees an uppercase letter and a digit for password policies
    password = secrets.token_urlsafe(18)[:24] + "A1"
    user_body = {
        "username": user,
        "email": email,
        "enabled": True,
        "emailVerified": True,
        "firstName": "Test",
        "lastName": "User",
        "credentials": [{"type": "password", "value": password, "temporary": False}],
    }
    status, _, location = _kc_admin_call(
        provider_domain, token, f"/realms/{realm}/users", "POST", user_body
    )
    if status == 409:
        # User already exists: reset their password to our known value
        s2, users, _ = _kc_admin_call(
            provider_domain, token, f"/realms/{realm}/users?username={user}", "GET"
        )
        if s2 == 200 and isinstance(users, list) and users:
            user_internal_id = users[0]["id"]
            s3, _, _ = _kc_admin_call(
                provider_domain,
                token,
                f"/realms/{realm}/users/{user_internal_id}/reset-password",
                "PUT",
                {"type": "password", "value": password, "temporary": False},
            )
            # Fail loudly: otherwise the returned password would not match the provider's.
            if not 200 <= s3 < 300:
                raise RuntimeError(f"user password reset failed: HTTP {s3}")
        else:
            raise RuntimeError(f"user {user} exists but couldn't be resolved")
    elif status not in (201, 204):
        raise RuntimeError(f"user create failed: HTTP {status}")

    base = f"https://{provider_domain}/realms/{realm}/protocol/openid-connect"
    return {
        "provider": "keycloak",
        "provider_domain": provider_domain,
        "realm": realm,
        "client_id": client_id,
        "client_secret": client_secret,
        "user": user,
        "password": password,
        "email": email,
        "discovery_url": f"https://{provider_domain}/realms/{realm}/.well-known/openid-configuration",
        "token_url": f"{base}/token",
        "auth_url": f"{base}/auth",
        "userinfo_url": f"{base}/userinfo",
    }


# ---------------------------------------------------------------------------
# OIDC flows
# ---------------------------------------------------------------------------


def oidc_password_grant(creds: dict) -> str:
    """Exercise the OIDC password grant against the provider; return the access_token (a JWT).
    Raises if the grant doesn't succeed.

    Reusable by dependent recipes' SSO tests."""
    data = urllib.parse.urlencode(
        {
            "grant_type": "password",
            "client_id": creds["client_id"],
            "client_secret": creds["client_secret"],
            "username": creds["user"],
            "password": creds["password"],
            "scope": "openid email profile",
        }
    ).encode()
    req = urllib.request.Request(
        creds["token_url"],
        data=data,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(req, timeout=30, context=_CTX) as r:
            body = json.load(r)
    except urllib.error.HTTPError as e:
        try:
            err = e.read().decode(errors="replace")[:200]
        except Exception:  # noqa: BLE001
            err = ""
        raise RuntimeError(f"password grant HTTP {e.code}: {err}") from e
    access_token = body.get("access_token")
    if not access_token:
        raise RuntimeError(f"password grant returned no access_token: keys={list(body.keys())}")
    return access_token


def assert_discovery_endpoint(creds: dict) -> dict:
    """GET the provider's OIDC discovery endpoint; assert it returns a well-formed JSON config with
    the expected `issuer`. Returns the discovery JSON."""
    req = urllib.request.Request(creds["discovery_url"], method="GET")
    with urllib.request.urlopen(req, timeout=20, context=_CTX) as r:
        body = json.load(r)
    expected_issuer = f"https://{creds['provider_domain']}/realms/{creds['realm']}"
    issuer = body.get("issuer", "")
    if issuer != expected_issuer:
        raise AssertionError(f"OIDC discovery issuer={issuer!r} != {expected_issuer!r}")
    return body


# ---------------------------------------------------------------------------
# Persistence for cross-test creds (class-B per §4.4-B)
# ---------------------------------------------------------------------------


def admin_password_inside(provider_domain: str) -> str:
    """Read the abra-generated admin_password from inside the provider container.
    Public re-export of the previously-private _kc_admin_password for the orchestrator wiring."""
    return _kc_admin_password(provider_domain)


def write_sso_creds(creds: dict) -> None:
    """Persist creds to $CCCI_SSO_CREDS_FILE for the dependent recipe's tests to read. The file is
    in /tmp (the runner's per-process tempdir) and deleted at run end alongside the deps file."""
    path = os.environ.get("CCCI_SSO_CREDS_FILE")
    if not path:
        return
    with contextlib.suppress(OSError), open(path, "w") as f:
        json.dump(creds, f)


def load_sso_creds() -> dict | None:
    """Load the run-scoped SSO creds. Returns None if not present."""
    path = os.environ.get("CCCI_SSO_CREDS_FILE")
    if not path or not os.path.exists(path):
        return None
    try:
        with open(path) as f:
            return json.load(f)
    except (OSError, ValueError):
        return None
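
Since `oidc_password_grant` returns the access token as a JWT, a dependent recipe's test may want to look at its claims. The helper below is a hypothetical sketch and not part of this module: it decodes the payload segment without verifying the signature, which is only reasonable for assertions in a CI test. Which claims are present depends on the provider configuration.

```python
import base64
import json


def jwt_claims_unverified(token: str) -> dict:
    """Decode a JWT's payload segment WITHOUT verifying its signature (test use only)."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("expected a JWT with three dot-separated segments")
    payload = parts[1]
    # JWT segments are base64url without padding; restore it before decoding.
    payload += "=" * (-len(payload) % 4)
    return json.loads(base64.urlsafe_b64decode(payload))
```

A test could then compare the `iss` claim with the issuer that `assert_discovery_endpoint` expects, assuming the provider sets it the same way in tokens.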