Files
cc-ci/runner/harness/sso.py
autonomic-bot 1be74fb9e1
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone Build is passing
fix(lint): F821 undefined 'e' in test_scm_configured; shfmt/ruff auto-fixes
- test_scm_configured.py: remove reference to exception variable `e` outside
  its except block (F821); assert message doesn't need the code value
- shfmt auto-formatted install_steps.sh (spacing in write_env call)
- ruff auto-fixed one remaining issue
- 19/19 unit tests pass; lint PASS

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-11 22:17:19 +00:00

506 lines
20 KiB
Python

"""SSO-setup / OIDC-flow harness primitive (Phase 2 §4.2 / Q2.3).
Given a deployed SSO provider (keycloak today; authentik in a follow-up), this module:
1. Reads the provider's admin password (the abra-generated `admin_password` secret in the
container's `/run/secrets/admin_password`).
2. Authenticates as admin (admin-cli password grant).
3. Creates a realm/client/test-user idempotently with cc-ci-controlled identifiers.
4. Returns a `SsoCreds` dict the dependent recipe's tests can use:
- `provider`, `provider_domain`, `realm`, `client_id`, `client_secret`
- `user`, `password`, `email`
- `discovery_url`, `token_url`
5. Provides `oidc_password_grant(...)` that performs the OIDC password-grant flow against the
provider, returns the access_token (a JWT).
Reusable by every SSO-dependent recipe (cryptpad, lasuite-docs, lasuite-meet, immich, etc.). Per
plan §4.4-B, generated client_secret + test_password are class-B run-scoped secrets that are
destroyed when the run's apps are torn down (the SSO provider app is torn down with the rest).
"""
from __future__ import annotations
import contextlib
import json
import os
import re
import secrets
import ssl
import urllib.error
import urllib.parse
import urllib.request
from . import lifecycle
# A per-run realm is named "<parent_recipe>-<6hex>" (the parent's per-run domain hex). This regex
# extracts that trailing hex so reaping can map a realm to the live app stack it belongs to.
_REALM_HEX_RE = re.compile(r"-([0-9a-f]{6})$")
_CTX = ssl.create_default_context()
_CTX.check_hostname = False
_CTX.verify_mode = ssl.CERT_NONE
# ---------------------------------------------------------------------------
# Keycloak admin-API helpers (port + adaptation of tests/keycloak/kc_admin.py)
# ---------------------------------------------------------------------------
def _kc_admin_password(provider_domain: str) -> str:
"""Read the abra-generated admin_password from inside the running keycloak container."""
return lifecycle.exec_in_app(provider_domain, ["cat", "/run/secrets/admin_password"]).strip()
def _kc_admin_token(provider_domain: str, password: str) -> str:
data = urllib.parse.urlencode(
{
"grant_type": "password",
"client_id": "admin-cli",
"username": "admin",
"password": password,
}
).encode()
req = urllib.request.Request(
f"https://{provider_domain}/realms/master/protocol/openid-connect/token",
data=data,
headers={"Content-Type": "application/x-www-form-urlencoded"},
method="POST",
)
with urllib.request.urlopen(req, timeout=30, context=_CTX) as r:
return json.load(r)["access_token"]
def _kc_admin_call(provider_domain: str, token: str, path: str, method: str = "GET", body=None):
"""Admin-API call. Returns (status, body_json_or_None, location_header)."""
data = json.dumps(body).encode() if body is not None else None
headers = {"Authorization": f"Bearer {token}"}
if data:
headers["Content-Type"] = "application/json"
req = urllib.request.Request(
f"https://{provider_domain}/admin{path}", data=data, headers=headers, method=method
)
try:
with urllib.request.urlopen(req, timeout=30, context=_CTX) as r:
raw = r.read()
try:
parsed = json.loads(raw) if raw else None
except (json.JSONDecodeError, ValueError):
parsed = None
return r.status, parsed, r.headers.get("Location", "")
except urllib.error.HTTPError as e:
try:
raw = e.read()
parsed = json.loads(raw) if raw else None
except Exception: # noqa: BLE001
parsed = None
return e.code, parsed, e.headers.get("Location", "") if e.headers else ""
def setup_keycloak_realm(
provider_domain: str,
realm: str,
client_id: str,
redirect_uris: list[str] | None = None,
web_origins: list[str] | None = None,
) -> dict:
"""Create a realm + a confidential OIDC client + a test user idempotently. Returns an
`SsoCreds` dict with all the identifiers + generated secrets a dependent recipe needs.
- Generates a `client_secret` (32 hex chars) — run-scoped class-B per plan §4.4-B.
- Generates a test user `testuser` with a 25-char alphanumeric password.
- Returns urls (`discovery_url`, `token_url`) the dependent recipe can configure with.
"""
redirect_uris = redirect_uris or []
web_origins = web_origins or []
admin_pass = _kc_admin_password(provider_domain)
token = _kc_admin_token(provider_domain, admin_pass)
# 1) Realm
status, _, _ = _kc_admin_call(
provider_domain, token, "/realms", "POST", {"realm": realm, "enabled": True}
)
if status not in (201, 409):
raise RuntimeError(f"realm create failed: HTTP {status}")
# 2) Client (confidential, with secret we control)
client_secret = secrets.token_hex(16)
client_body = {
"clientId": client_id,
"enabled": True,
"secret": client_secret,
"publicClient": False,
"serviceAccountsEnabled": False,
"standardFlowEnabled": True,
"directAccessGrantsEnabled": True, # required for password grant
"redirectUris": redirect_uris or ["*"],
"webOrigins": web_origins or ["*"],
"protocol": "openid-connect",
}
status, _, location = _kc_admin_call(
provider_domain, token, f"/realms/{realm}/clients", "POST", client_body
)
if status == 409:
# Client already exists — find its internal id and update the secret to our known value.
s2, clients, _ = _kc_admin_call(
provider_domain, token, f"/realms/{realm}/clients?clientId={client_id}", "GET"
)
if s2 == 200 and isinstance(clients, list) and clients:
client_internal_id = clients[0]["id"]
# Update the secret via PUT /clients/{id} (Keycloak admin API)
_kc_admin_call(
provider_domain,
token,
f"/realms/{realm}/clients/{client_internal_id}",
"PUT",
{**client_body, "id": client_internal_id},
)
else:
raise RuntimeError(f"client {client_id} exists but couldn't be resolved")
elif status != 201:
raise RuntimeError(f"client create failed: HTTP {status}")
# 3) Test user (idempotent: create or skip)
user = "testuser"
email = f"{user}@example.test"
password = secrets.token_urlsafe(18)[:24] + "A1" # >= 8 chars, mixed alnum, defeats policies
user_body = {
"username": user,
"email": email,
"enabled": True,
"emailVerified": True,
"firstName": "Test",
"lastName": "User",
"credentials": [{"type": "password", "value": password, "temporary": False}],
}
status, _, location = _kc_admin_call(
provider_domain, token, f"/realms/{realm}/users", "POST", user_body
)
if status == 409:
# User already exists — reset their password to our known value
s2, users, _ = _kc_admin_call(
provider_domain, token, f"/realms/{realm}/users?username={user}", "GET"
)
if s2 == 200 and isinstance(users, list) and users:
user_internal_id = users[0]["id"]
_kc_admin_call(
provider_domain,
token,
f"/realms/{realm}/users/{user_internal_id}/reset-password",
"PUT",
{"type": "password", "value": password, "temporary": False},
)
else:
raise RuntimeError(f"user {user} exists but couldn't be resolved")
elif status not in (201, 204):
raise RuntimeError(f"user create failed: HTTP {status}")
base = f"https://{provider_domain}/realms/{realm}/protocol/openid-connect"
return {
"provider": "keycloak",
"provider_domain": provider_domain,
"realm": realm,
"client_id": client_id,
"client_secret": client_secret,
"user": user,
"password": password,
"email": email,
"discovery_url": f"https://{provider_domain}/realms/{realm}/.well-known/openid-configuration",
"token_url": f"{base}/token",
"auth_url": f"{base}/auth",
"userinfo_url": f"{base}/userinfo",
}
# ---------------------------------------------------------------------------
# Realm lifecycle on a shared (live-warm) keycloak (WC1)
# ---------------------------------------------------------------------------
#
# When keycloak is live-warm and shared, the per-run realm is the isolation unit: each dependent run
# creates a namespaced realm "<parent_recipe>-<6hex>" (setup_keycloak_realm) and deletes it at
# teardown (delete_keycloak_realm). Crashed/killed runs leave orphan realms behind; reap_orphaned_
# realms removes those whose hex no longer maps to a live app stack (concurrency-safe — a realm
# belonging to a still-running run keeps its app stack, so its hex stays in `live_hexes`).
def list_realms(provider_domain: str, admin_password: str | None = None) -> list[str]:
"""List realm names on the provider (admin API GET /admin/realms)."""
admin_password = admin_password or _kc_admin_password(provider_domain)
token = _kc_admin_token(provider_domain, admin_password)
status, body, _ = _kc_admin_call(provider_domain, token, "/realms", "GET")
if status != 200 or not isinstance(body, list):
raise RuntimeError(f"list realms failed: HTTP {status}")
return [r.get("realm", "") for r in body if r.get("realm")]
def delete_keycloak_realm(
provider_domain: str, realm: str, admin_password: str | None = None
) -> bool:
"""Delete a realm idempotently (admin API DELETE /admin/realms/{realm}). Returns True if the
realm was deleted (204) or already absent (404); raises on any other status. Never deletes
`master` (guard against a caller passing the wrong name)."""
if realm == "master":
raise ValueError("refusing to delete the keycloak master realm")
admin_password = admin_password or _kc_admin_password(provider_domain)
token = _kc_admin_token(provider_domain, admin_password)
status, _, _ = _kc_admin_call(provider_domain, token, f"/realms/{realm}", "DELETE")
if status in (204, 404):
return True
raise RuntimeError(f"delete realm {realm} failed: HTTP {status}")
def realms_to_reap(realm_names, live_hexes) -> list[str]:
"""PURE predicate (unit-tested): given all realm names on the provider and the set of 6hex
suffixes of currently-live app stacks, return the per-run realms to reap — those matching the
"-<6hex>" namespace whose hex is NOT live (orphans from crashed/killed runs). Never returns
`master` or realms that don't match the per-run pattern (e.g. an operator-created realm)."""
live = set(live_hexes or ())
out: list[str] = []
for name in realm_names or ():
if name == "master":
continue
m = _REALM_HEX_RE.search(name)
if m and m.group(1) not in live:
out.append(name)
return out
def reap_orphaned_realms(
provider_domain: str, live_hexes, admin_password: str | None = None
) -> list[str]:
"""Reap per-run realms left behind by crashed/killed dependent runs. `live_hexes` is the set of
6hex suffixes of currently-deployed app stacks (the caller derives these from docker). Returns
the list of realms actually deleted. Concurrency-safe: a realm whose hex maps to a live stack is
kept."""
admin_password = admin_password or _kc_admin_password(provider_domain)
names = list_realms(provider_domain, admin_password)
reaped: list[str] = []
for realm in realms_to_reap(names, live_hexes):
with contextlib.suppress(Exception):
delete_keycloak_realm(provider_domain, realm, admin_password)
reaped.append(realm)
return reaped
# ---------------------------------------------------------------------------
# OIDC flows
# ---------------------------------------------------------------------------
def oidc_password_grant(creds: dict) -> str:
"""Exercise the OIDC password grant against the provider; return the access_token (a JWT).
Raises if the grant doesn't succeed.
Reusable by dependent recipes' SSO tests."""
data = urllib.parse.urlencode(
{
"grant_type": "password",
"client_id": creds["client_id"],
"client_secret": creds["client_secret"],
"username": creds["user"],
"password": creds["password"],
"scope": "openid email profile",
}
).encode()
req = urllib.request.Request(
creds["token_url"],
data=data,
headers={"Content-Type": "application/x-www-form-urlencoded"},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=30, context=_CTX) as r:
body = json.load(r)
except urllib.error.HTTPError as e:
try:
err = e.read().decode(errors="replace")[:200]
except Exception: # noqa: BLE001
err = ""
raise RuntimeError(f"password grant HTTP {e.code}: {err}") from e
access_token = body.get("access_token")
if not access_token:
raise RuntimeError(f"password grant returned no access_token: keys={list(body.keys())}")
return access_token
def assert_discovery_endpoint(creds: dict) -> dict:
"""GET the provider's OIDC discovery endpoint; assert it returns a well-formed JSON config with
the expected `issuer`. Returns the discovery JSON."""
req = urllib.request.Request(creds["discovery_url"], method="GET")
with urllib.request.urlopen(req, timeout=20, context=_CTX) as r:
body = json.load(r)
expected_issuer = f"https://{creds['provider_domain']}/realms/{creds['realm']}"
issuer = body.get("issuer", "")
if issuer != expected_issuer:
raise AssertionError(f"OIDC discovery issuer={issuer!r} != {expected_issuer!r}")
return body
# ---------------------------------------------------------------------------
# Persistence for cross-test creds (class-B per §4.4-B)
# ---------------------------------------------------------------------------
def admin_password_inside(provider_domain: str) -> str:
"""Read the abra-generated admin_password from inside the provider container.
Public re-export of the previously-private _kc_admin_password for the orchestrator wiring."""
return _kc_admin_password(provider_domain)
def write_sso_creds(creds: dict) -> None:
"""Persist creds to $CCCI_SSO_CREDS_FILE for the dependent recipe's tests to read. The file is
in /tmp (the runner's per-process tempdir) and deleted at run end alongside the deps file."""
path = os.environ.get("CCCI_SSO_CREDS_FILE")
if not path:
return
with contextlib.suppress(OSError), open(path, "w") as f:
json.dump(creds, f)
def load_sso_creds() -> dict | None:
"""Load the run-scoped SSO creds. Returns None if not present."""
path = os.environ.get("CCCI_SSO_CREDS_FILE")
if not path or not os.path.exists(path):
return None
try:
with open(path) as f:
return json.load(f)
except (OSError, ValueError):
return None
# ---------------------------------------------------------------------------
# Gitea OAuth2 setup — drone dep provider (phase drone)
# ---------------------------------------------------------------------------
def _gitea_api(
provider_domain: str,
path: str,
method: str = "GET",
body=None,
*,
username: str,
password: str,
) -> tuple[int, object]:
"""Call the gitea REST API (basic-auth). Returns (status, body_json_or_None)."""
import base64
data = json.dumps(body).encode() if body is not None else None
auth = base64.b64encode(f"{username}:{password}".encode()).decode()
headers: dict[str, str] = {"Authorization": f"Basic {auth}"}
if data:
headers["Content-Type"] = "application/json"
req = urllib.request.Request(
f"https://{provider_domain}/api/v1{path}",
data=data,
headers=headers,
method=method,
)
try:
with urllib.request.urlopen(req, timeout=30, context=_CTX) as r:
raw = r.read()
return r.status, (json.loads(raw) if raw else None)
except urllib.error.HTTPError as e:
raw = e.read()
try:
parsed = json.loads(raw) if raw else None
except (ValueError, json.JSONDecodeError):
parsed = None
return e.code, parsed
def setup_gitea_oauth(provider_domain: str, parent_domain: str) -> dict:
"""Create a gitea admin user + OAuth2 application for a drone dep.
Steps:
1. Create admin user via `gitea admin user create` CLI inside the container.
2. Create an OAuth2 app via the gitea REST API (basic auth as the new admin).
3. Return a creds dict: {admin_user, admin_password, client_id, client_secret}.
The caller (orchestrator) stores creds in $CCCI_DEPS_FILE so drone's install_steps.sh
can wire DRONE_GITEA_CLIENT_ID + the client_secret Docker secret before the first deploy.
Per plan §4.4-B, the client_secret is class-B run-scoped and destroyed on teardown.
"""
admin_user = "ci_admin"
# 32-char alphanumeric password — safe to pass as a CLI arg (no shell metacharacters)
admin_password = secrets.token_hex(16)
admin_email = "ci@ci.local"
# 1. Create admin user via gitea CLI inside the running container.
# The rootless gitea image has GITEA_WORK_DIR + GITEA_CUSTOM set as ENV; docker exec
# inherits those (image ENV is part of the container config). The gitea binary is in PATH.
print(f" gitea dep: creating admin user {admin_user!r} on {provider_domain}", flush=True)
try:
out = lifecycle.exec_in_app(
provider_domain,
[
"gitea",
"admin",
"user",
"create",
"--admin",
"--username",
admin_user,
"--password",
admin_password,
"--email",
admin_email,
"--must-change-password=false", # equals-form required; gitea BoolFlag default=true
],
timeout=120,
)
print(f" gitea dep: admin user created: {out.strip()[:80]}", flush=True)
except RuntimeError as e:
msg = str(e)
if "already exists" in msg.lower() or "user already exists" in msg.lower():
# Stale volume from a prior run — reset the password to the newly-generated one
# so the API call below can authenticate. In production CI, teardown_deps removes
# volumes so this branch is only hit in re-runs against a stale volume.
print(f" gitea dep: {admin_user!r} already exists — resetting password", flush=True)
lifecycle.exec_in_app(
provider_domain,
[
"gitea",
"admin",
"user",
"change-password",
"--username",
admin_user,
"--password",
admin_password,
],
timeout=60,
)
else:
raise
# 2. Create OAuth2 application via gitea API.
oauth_app_name = f"drone-{parent_domain[:8]}"
redirect_uri = f"https://{parent_domain}/login"
status, resp = _gitea_api(
provider_domain,
"/user/applications/oauth2",
method="POST",
body={
"name": oauth_app_name,
"redirect_uris": [redirect_uri],
"confidential_client": True,
},
username=admin_user,
password=admin_password,
)
if status not in (201, 200):
raise RuntimeError(f"gitea OAuth2 app create failed: HTTP {status}{resp!r}")
client_id = resp["client_id"]
client_secret = resp["client_secret"]
print(
f" gitea dep: OAuth2 app {oauth_app_name!r} created (client_id={client_id})",
flush=True,
)
return {
"admin_user": admin_user,
"admin_password": admin_password,
"client_id": client_id,
"client_secret": client_secret,
}