Files
cc-ci/tests/keycloak/functional/test_create_client_and_use.py
autonomic-bot d5f5e86c7b feat(2): Q2.1 — keycloak Phase-2 parity + functional (full e2e green)
- tests/keycloak/PARITY.md: parity table (health_check ported); oidc_integration.py
  noted as Q3-deferred (cross-recipe test needs lasuite-docs + dep resolver).
- tests/keycloak/functional/test_health_check.py: parity port of
  recipe-info/keycloak/tests/health_check.py — SOURCE comment.
- tests/keycloak/functional/test_password_grant_token.py: NEW recipe-specific —
  password grant against /realms/master/protocol/openid-connect/token; decodes
  the JWT payload; asserts iss=https://<live_app>/realms/master, azp=admin-cli,
  typ=Bearer, exp in future, iat reasonable past. Reuses kc_admin.py helpers.
- tests/keycloak/functional/test_create_client_and_use.py: NEW recipe-specific —
  admin creates a UUID-named confidential client via admin API → uses client
  credentials grant to obtain a service-account token → decodes JWT, asserts azp
  matches the new clientId, iss matches per-run domain → idempotent DELETE cleanup.
- tests/keycloak/recipe_meta.py: bumped DEPLOY_TIMEOUT + HTTP_TIMEOUT 600 -> 900
  (cold-start JVM + mariadb migration intermittently exceeds 600s on a 2-vCPU host;
  observed 502 fallback after 600s in run #1).

Cold-verifiable on cc-ci (log /root/ccci-q2-keycloak-r3.log):
  RECIPE=keycloak cc-ci-run runner/run_recipe_ci.py
  all 5 stages PASS, deploy-count=1, head_ref=666649a6==chaos-version=666649a6
  (HC1 non-vacuous), version 10.7.0+26.6.1 -> 10.7.1+26.6.2.
  Custom tier 3 PASS: parity health_check, JWT password-grant, client_credentials.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-28 07:34:14 +01:00

170 lines
6.9 KiB
Python

"""keycloak — recipe-specific functional test (Phase 2 P3, ≥2 beyond parity).
The canonical "real app integrates with keycloak" flow: an admin creates a confidential client +
secret in the master realm via the admin API; the client uses the client_credentials grant to
obtain its own service-account token; the JWT's `azp` (authorized party) matches the new client's
clientId. Then idempotent cleanup deletes the client.
Non-vacuous: tests both the admin-API CRUD on clients AND the client_credentials grant — distinct
from the password-grant test which only exercises the admin user's own token. A keycloak that
returns 200 on /realms/master but has a broken admin API or signing failure for service accounts
would fail this test.
Reuses `tests/keycloak/kc_admin.py` for admin_password + admin_token; adds an inline minimal
admin-API client + token request — these are recipe-specific enough that they're not pulled into
the shared harness yet (when SSO-flow harness lands in Q2.3, the client-create + token-fetch will
move there).
"""
from __future__ import annotations
import base64
import json
import os
import ssl
import sys
import urllib.error
import urllib.parse
import urllib.request
import uuid
# kc_admin.py lives in tests/keycloak/, one level up from this file in tests/keycloak/functional/
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "..", "runner"))
import kc_admin # noqa: E402
_CTX = ssl.create_default_context()
_CTX.check_hostname = False
_CTX.verify_mode = ssl.CERT_NONE
def _admin_post(domain, token, path, body):
data = json.dumps(body).encode()
req = urllib.request.Request(
f"https://{domain}/admin{path}",
data=data,
headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=30, context=_CTX) as r:
return r.status, r.headers.get("Location", "")
except urllib.error.HTTPError as e:
return e.code, e.headers.get("Location", "") if e.headers else ""
def _admin_get_clients(domain, token, realm="master"):
"""List clients in a realm; returns the list (each item a dict with id + clientId)."""
req = urllib.request.Request(
f"https://{domain}/admin/realms/{realm}/clients",
headers={"Authorization": f"Bearer {token}"},
method="GET",
)
try:
with urllib.request.urlopen(req, timeout=30, context=_CTX) as r:
return r.status, json.load(r)
except urllib.error.HTTPError as e:
return e.code, None
def _admin_delete(domain, token, path):
req = urllib.request.Request(
f"https://{domain}/admin{path}",
headers={"Authorization": f"Bearer {token}"},
method="DELETE",
)
try:
with urllib.request.urlopen(req, timeout=30, context=_CTX) as r:
return r.status
except urllib.error.HTTPError as e:
return e.code
def _client_credentials_token(domain, client_id, client_secret, realm="master"):
data = urllib.parse.urlencode(
{
"grant_type": "client_credentials",
"client_id": client_id,
"client_secret": client_secret,
}
).encode()
req = urllib.request.Request(
f"https://{domain}/realms/{realm}/protocol/openid-connect/token",
data=data,
headers={"Content-Type": "application/x-www-form-urlencoded"},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=30, context=_CTX) as r:
return r.status, json.load(r)
except urllib.error.HTTPError as e:
try:
return e.code, json.loads(e.read().decode())
except Exception: # noqa: BLE001
return e.code, None
def _b64url_decode(seg: str) -> bytes:
pad = "=" * ((4 - len(seg) % 4) % 4)
return base64.urlsafe_b64decode(seg + pad)
def test_create_confidential_client_and_obtain_token(live_app):
"""Admin creates a confidential client; client obtains its own token via client_credentials."""
admin_token = kc_admin.admin_token(live_app, kc_admin.admin_password(live_app))
# Use a unique client id so concurrent or repeated runs don't collide. The client is per-run.
client_id = f"ccci-client-{uuid.uuid4().hex[:8]}"
client_secret = uuid.uuid4().hex
body = {
"clientId": client_id,
"enabled": True,
"secret": client_secret,
"publicClient": False, # confidential client
"serviceAccountsEnabled": True, # required for client_credentials grant
"standardFlowEnabled": False, # not needed for service-account-only client
"directAccessGrantsEnabled": False,
"protocol": "openid-connect",
}
cleanup_id = None
try:
status, location = _admin_post(live_app, admin_token, "/realms/master/clients", body)
assert status in (201, 409), f"create client returned HTTP {status} (expected 201)"
# On 201, Location header carries the new client's internal id; otherwise look it up.
if status == 201 and "/clients/" in (location or ""):
cleanup_id = location.rsplit("/clients/", 1)[1]
else:
# 409 collision (shouldn't happen with UUID) — find by clientId
_, clients = _admin_get_clients(live_app, admin_token)
cleanup_id = next(
(c["id"] for c in (clients or []) if c.get("clientId") == client_id), None
)
assert cleanup_id, "could not determine the new client's id"
# Use the client to obtain its own token (client_credentials grant)
tok_status, tok_resp = _client_credentials_token(live_app, client_id, client_secret)
assert tok_status == 200, (
f"client_credentials token returned HTTP {tok_status}: {tok_resp!r}"
)
access_token = tok_resp.get("access_token") if isinstance(tok_resp, dict) else None
assert isinstance(access_token, str) and access_token.count(".") == 2, (
f"client_credentials access_token not a JWT: {access_token!r}"
)
# Decode the JWT payload; assert azp matches the new client
payload = json.loads(_b64url_decode(access_token.split(".")[1]))
assert payload.get("azp") == client_id, (
f"client_credentials JWT azp={payload.get('azp')!r} != client_id={client_id!r}"
)
# Service-account token does NOT carry a session-scoped user (azp + clientId differ from
# admin-cli token). The presence of azp + iss == per-run-domain proves the issuance flow.
expected_iss = f"https://{live_app}/realms/master"
assert payload.get("iss") == expected_iss, (
f"JWT iss={payload.get('iss')!r} != {expected_iss!r}"
)
finally:
# Idempotent cleanup
if cleanup_id:
_admin_delete(live_app, admin_token, f"/realms/master/clients/{cleanup_id}")