feat(2): Q2.1 — keycloak Phase-2 parity + functional (full e2e green)

- tests/keycloak/PARITY.md: parity table (health_check ported); oidc_integration.py
  noted as Q3-deferred (cross-recipe test needs lasuite-docs + dep resolver).
- tests/keycloak/functional/test_health_check.py: parity port of
  recipe-info/keycloak/tests/health_check.py — SOURCE comment.
- tests/keycloak/functional/test_password_grant_token.py: NEW recipe-specific —
  password grant against /realms/master/protocol/openid-connect/token; decodes
  the JWT payload; asserts iss=https://<live_app>/realms/master, azp=admin-cli,
  typ=Bearer, exp in future, iat reasonable past. Reuses kc_admin.py helpers.
- tests/keycloak/functional/test_create_client_and_use.py: NEW recipe-specific —
  admin creates a UUID-named confidential client via admin API → uses client
  credentials grant to obtain a service-account token → decodes JWT, asserts azp
  matches the new clientId, iss matches per-run domain → idempotent DELETE cleanup.
- tests/keycloak/recipe_meta.py: bumped DEPLOY_TIMEOUT + HTTP_TIMEOUT 600 -> 900
  (cold-start JVM + mariadb migration intermittently exceeds 600s on a 2-vCPU host;
  observed 502 fallback after 600s in run #1).

Cold-verifiable on cc-ci (log /root/ccci-q2-keycloak-r3.log):
  RECIPE=keycloak cc-ci-run runner/run_recipe_ci.py
  all 5 stages PASS, deploy-count=1, head_ref=666649a6==chaos-version=666649a6
  (HC1 non-vacuous), version 10.7.0+26.6.1 -> 10.7.1+26.6.2.
  Custom tier 3 PASS: parity health_check, JWT password-grant, client_credentials.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-28 07:34:14 +01:00
parent 9c79215fb9
commit d5f5e86c7b
5 changed files with 309 additions and 2 deletions

40
tests/keycloak/PARITY.md Normal file
View File

@ -0,0 +1,40 @@
# Parity — keycloak
Phase-2 P2 mapping table. The Adversary cold-verifies parity by reading the source
`recipe-info/keycloak/tests/<file>` and the cc-ci file side-by-side.
| recipe-maintainer file | cc-ci file | what's verified | status |
|---|---|---|---|
| `recipe-info/keycloak/tests/health_check.py` | `tests/keycloak/functional/test_health_check.py` | The keycloak master realm endpoint (`/realms/master`) returns HTTP 200 — the original's assertion shape, preserved. The cc-ci port adapts to the ephemeral per-run domain via the `live_app` fixture. | **ported** |
| `recipe-info/keycloak/tests/oidc_integration.py` | (deferred to Q3 lasuite-docs) | The original is a **cross-recipe** integration test: it expects both keycloak AND lasuite-docs deployed, with a pre-seeded credentials TOML, and proves a keycloak-issued token is accepted by lasuite-docs. This requires the Phase-2 dependency resolver (Q0.4/Q2.3) + lasuite-docs Phase-2 enrollment. Will land as `tests/lasuite-docs/functional/test_oidc_with_keycloak.py` in Q3, sharing the SSO-setup harness. | **deferred to Q3** (logged in DECISIONS.md) |
## Recipe-specific tests (Phase-2 P3, ≥2 beyond parity)
keycloak's characteristic behavior is **realm/client management + OIDC token issuance**. The
lifecycle overlays already prove the admin API works (`kc_admin.py` creates/queries/deletes a
marker realm across upgrade/backup/restore — Phase 1d/1e). Two new functional tests beyond parity:
| cc-ci file | what's verified | rationale |
|---|---|---|
| `tests/keycloak/functional/test_password_grant_token.py` | Obtains an admin-CLI access token via the password grant (`grant_type=password`) against `/realms/master/protocol/openid-connect/token`, asserts the token is a valid JWT (3 base64url-encoded segments), decodes the payload, and asserts the JWT claims include `iss` matching the live domain, `azp == "admin-cli"`, `typ == "Bearer"`, and a future `exp`. | The defining keycloak behavior is issuing JWTs; this test does the canonical password-grant flow against the real running keycloak (real admin user, real password from the abra-generated secret) and proves the JWT contract is intact. Non-vacuous: a wrongly-configured realm, broken signing key, or wrong issuer would fail the claim assertions. |
| `tests/keycloak/functional/test_create_client_and_use.py` | Authenticates as admin → creates a confidential client in the master realm via admin API with a known `clientId` + a known client secret → obtains a token via `grant_type=client_credentials` for that client → asserts the token's `azp` (authorized party) matches the new client's clientId → deletes the client (idempotent cleanup). | Proves the full lifecycle of admin-API client creation + service-account token issuance, the canonical "real app integrating with keycloak" flow. Non-vacuous: tests TWO grant types (password + client-credentials) and the admin-API CRUD on clients. |
Both tests run in the **custom** tier against the same `live_app` shared deployment as the
lifecycle overlays — no extra deploy, no extra teardown.
## Backup data-integrity (P4)
Already exercised by the lifecycle overlays: `kc_admin.create_marker_realm` (mariadb data) before
backup, mutated by deletion before restore, asserted present after restore. See
`tests/keycloak/{ops.py,test_upgrade.py,test_backup.py,test_restore.py}` + `kc_admin.py`.
## Playwright (P6)
The keycloak admin console (login form) is loaded by `tests/keycloak/test_install.py::
test_serving_and_admin_console` (lifecycle install overlay). Adequate for P6 — the admin UI is the
recipe's primary browser surface.
## Non-ports
`oidc_integration.py` is deferred to Q3 (cross-recipe test; needs lasuite-docs Phase-2 enrollment +
the dependency-resolver harness primitive). Logged in DECISIONS.md Phase-2 section.

View File

@ -0,0 +1,169 @@
"""keycloak — recipe-specific functional test (Phase 2 P3, ≥2 beyond parity).
The canonical "real app integrates with keycloak" flow: an admin creates a confidential client +
secret in the master realm via the admin API; the client uses the client_credentials grant to
obtain its own service-account token; the JWT's `azp` (authorized party) matches the new client's
clientId. Then idempotent cleanup deletes the client.
Non-vacuous: tests both the admin-API CRUD on clients AND the client_credentials grant — distinct
from the password-grant test which only exercises the admin user's own token. A keycloak that
returns 200 on /realms/master but has a broken admin API or signing failure for service accounts
would fail this test.
Reuses `tests/keycloak/kc_admin.py` for admin_password + admin_token; adds an inline minimal
admin-API client + token request — these are recipe-specific enough that they're not pulled into
the shared harness yet (when SSO-flow harness lands in Q2.3, the client-create + token-fetch will
move there).
"""
from __future__ import annotations
import base64
import json
import os
import ssl
import sys
import urllib.error
import urllib.parse
import urllib.request
import uuid
# kc_admin.py lives in tests/keycloak/, one level up from this file in tests/keycloak/functional/
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "..", "runner"))
import kc_admin # noqa: E402
_CTX = ssl.create_default_context()
_CTX.check_hostname = False
_CTX.verify_mode = ssl.CERT_NONE
def _admin_post(domain, token, path, body):
data = json.dumps(body).encode()
req = urllib.request.Request(
f"https://{domain}/admin{path}",
data=data,
headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=30, context=_CTX) as r:
return r.status, r.headers.get("Location", "")
except urllib.error.HTTPError as e:
return e.code, e.headers.get("Location", "") if e.headers else ""
def _admin_get_clients(domain, token, realm="master"):
"""List clients in a realm; returns the list (each item a dict with id + clientId)."""
req = urllib.request.Request(
f"https://{domain}/admin/realms/{realm}/clients",
headers={"Authorization": f"Bearer {token}"},
method="GET",
)
try:
with urllib.request.urlopen(req, timeout=30, context=_CTX) as r:
return r.status, json.load(r)
except urllib.error.HTTPError as e:
return e.code, None
def _admin_delete(domain, token, path):
req = urllib.request.Request(
f"https://{domain}/admin{path}",
headers={"Authorization": f"Bearer {token}"},
method="DELETE",
)
try:
with urllib.request.urlopen(req, timeout=30, context=_CTX) as r:
return r.status
except urllib.error.HTTPError as e:
return e.code
def _client_credentials_token(domain, client_id, client_secret, realm="master"):
data = urllib.parse.urlencode(
{
"grant_type": "client_credentials",
"client_id": client_id,
"client_secret": client_secret,
}
).encode()
req = urllib.request.Request(
f"https://{domain}/realms/{realm}/protocol/openid-connect/token",
data=data,
headers={"Content-Type": "application/x-www-form-urlencoded"},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=30, context=_CTX) as r:
return r.status, json.load(r)
except urllib.error.HTTPError as e:
try:
return e.code, json.loads(e.read().decode())
except Exception: # noqa: BLE001
return e.code, None
def _b64url_decode(seg: str) -> bytes:
pad = "=" * ((4 - len(seg) % 4) % 4)
return base64.urlsafe_b64decode(seg + pad)
def test_create_confidential_client_and_obtain_token(live_app):
"""Admin creates a confidential client; client obtains its own token via client_credentials."""
admin_token = kc_admin.admin_token(live_app, kc_admin.admin_password(live_app))
# Use a unique client id so concurrent or repeated runs don't collide. The client is per-run.
client_id = f"ccci-client-{uuid.uuid4().hex[:8]}"
client_secret = uuid.uuid4().hex
body = {
"clientId": client_id,
"enabled": True,
"secret": client_secret,
"publicClient": False, # confidential client
"serviceAccountsEnabled": True, # required for client_credentials grant
"standardFlowEnabled": False, # not needed for service-account-only client
"directAccessGrantsEnabled": False,
"protocol": "openid-connect",
}
cleanup_id = None
try:
status, location = _admin_post(live_app, admin_token, "/realms/master/clients", body)
assert status in (201, 409), f"create client returned HTTP {status} (expected 201)"
# On 201, Location header carries the new client's internal id; otherwise look it up.
if status == 201 and "/clients/" in (location or ""):
cleanup_id = location.rsplit("/clients/", 1)[1]
else:
# 409 collision (shouldn't happen with UUID) — find by clientId
_, clients = _admin_get_clients(live_app, admin_token)
cleanup_id = next(
(c["id"] for c in (clients or []) if c.get("clientId") == client_id), None
)
assert cleanup_id, "could not determine the new client's id"
# Use the client to obtain its own token (client_credentials grant)
tok_status, tok_resp = _client_credentials_token(live_app, client_id, client_secret)
assert tok_status == 200, (
f"client_credentials token returned HTTP {tok_status}: {tok_resp!r}"
)
access_token = tok_resp.get("access_token") if isinstance(tok_resp, dict) else None
assert isinstance(access_token, str) and access_token.count(".") == 2, (
f"client_credentials access_token not a JWT: {access_token!r}"
)
# Decode the JWT payload; assert azp matches the new client
payload = json.loads(_b64url_decode(access_token.split(".")[1]))
assert payload.get("azp") == client_id, (
f"client_credentials JWT azp={payload.get('azp')!r} != client_id={client_id!r}"
)
# Service-account token does NOT carry a session-scoped user (azp + clientId differ from
# admin-cli token). The presence of azp + iss == per-run-domain proves the issuance flow.
expected_iss = f"https://{live_app}/realms/master"
assert payload.get("iss") == expected_iss, (
f"JWT iss={payload.get('iss')!r} != {expected_iss!r}"
)
finally:
# Idempotent cleanup
if cleanup_id:
_admin_delete(live_app, admin_token, f"/realms/master/clients/{cleanup_id}")

View File

@ -0,0 +1,23 @@
"""keycloak — parity port of recipe-maintainer's health_check.py (Phase 2 P2).
SOURCE: references/recipe-maintainer/recipe-info/keycloak/tests/health_check.py
The original asserted HTTP 200 from `https://keycloak.<DOMAIN_SUFFIX>/realms/master`. The cc-ci port
preserves the assertion shape, adapted to the ephemeral per-run domain via the `live_app` fixture.
Runs in the custom tier against the shared post-install live deployment.
"""
from __future__ import annotations
import os
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "..", "runner"))
from harness import http as harness_http # noqa: E402
def test_keycloak_master_realm_returns_200(live_app):
"""Parity with recipe-info/keycloak/tests/health_check.py: HTTP 200 from /realms/master."""
url = f"https://{live_app}/realms/master"
status, _ = harness_http.retry_http_get(url, expect_status=200, max_wait=60, interval=3)
assert status == 200, f"keycloak at {url} returned HTTP {status} (expected 200)"

View File

@ -0,0 +1,75 @@
"""keycloak — recipe-specific functional test (Phase 2 P3, ≥2 beyond parity).
The defining keycloak behavior is issuing JWTs. This test does the canonical password-grant flow
against the live keycloak (real admin user, real abra-generated admin_password secret), then
decodes the returned JWT and asserts the standard claims: `iss` (issuer URL), `azp` (authorized
party), `typ == "Bearer"`, and `exp` in the future. Non-vacuous: a wrongly-configured realm,
broken signing key, or wrong issuer would fail the claim assertions, not just status==200.
Reuses the per-recipe `tests/keycloak/kc_admin.py` helpers (admin_password + admin_token shape)
and the harness `lifecycle.exec_in_app` (already polled+raising per Phase 1e F1e-1 fix).
"""
from __future__ import annotations
import base64
import json
import os
import sys
import time
# kc_admin.py lives in tests/keycloak/, one level up from this file in tests/keycloak/functional/
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "..", "runner"))
import kc_admin # noqa: E402
def _b64url_decode(seg: str) -> bytes:
"""JWT segments are URL-safe base64 without padding — add padding before decode."""
pad = "=" * ((4 - len(seg) % 4) % 4)
return base64.urlsafe_b64decode(seg + pad)
def _decode_jwt_payload(token: str) -> dict:
parts = token.split(".")
assert len(parts) == 3, f"JWT must have 3 segments (header.payload.signature), got {len(parts)}"
payload = json.loads(_b64url_decode(parts[1]))
return payload
def test_password_grant_issues_valid_jwt(live_app):
"""Obtain a token via the password grant, validate the JWT shape + standard claims."""
password = kc_admin.admin_password(live_app)
token = kc_admin.admin_token(live_app, password)
# Shape: a JWT is exactly 3 base64url segments
assert isinstance(token, str) and token.count(".") == 2, (
f"access_token does not look like a JWT (no 3 segments): len={len(token) if token else 0}"
)
payload = _decode_jwt_payload(token)
# iss = the issuer URL, must be the per-run domain's /realms/master endpoint
expected_iss = f"https://{live_app}/realms/master"
assert payload.get("iss") == expected_iss, (
f"JWT iss claim {payload.get('iss')!r} != {expected_iss!r}"
)
# azp = authorized party (which client requested this token)
assert payload.get("azp") == "admin-cli", (
f"JWT azp claim {payload.get('azp')!r} != 'admin-cli'"
)
# typ = token type
assert payload.get("typ") == "Bearer", f"JWT typ claim {payload.get('typ')!r} != 'Bearer'"
# exp must be in the future (the token must be usable, not pre-expired)
exp = payload.get("exp")
assert isinstance(exp, int), f"JWT exp claim missing or not int: {exp!r}"
assert exp > time.time(), f"JWT exp {exp} not in the future (now={time.time():.0f})"
# iat (issued at) is also a standard claim
iat = payload.get("iat")
assert isinstance(iat, int) and iat <= time.time() + 60, (
f"JWT iat {iat!r} not a reasonable past timestamp"
)

View File

@ -2,5 +2,5 @@
# conftest — enrolling this recipe needs NO change to runner/harness code (D5).
HEALTH_PATH = "/realms/master" # 200 JSON once keycloak is up (not "/", which redirects)
HEALTH_OK = (200,)
DEPLOY_TIMEOUT = 600 # JVM + DB migration are slow on a 2-vCPU VM
HTTP_TIMEOUT = 600
DEPLOY_TIMEOUT = 900 # JVM + DB migration are slow on a 2-vCPU VM; observed 502 fallback up to ~10min
HTTP_TIMEOUT = 900