Push builds have been RED on the lint step since ~build 209 from accumulated formatting drift. This is the mechanical cleanup: ruff format + ruff --fix (UP038 isinstance unions, SIM105 contextlib.suppress, UP031 f-strings, SIM115 tempfile context manager), shfmt -i 2 -ci, nixpkgs-fmt/statix/deadnix (merged attrsets, dropped unused lib args), yamllint, and shell quoting fixes in tests/lasuite-docs/setup_custom_tests.sh. No behaviour changes intended; lint: PASS, unit tests: 138 passed.
170 lines
6.9 KiB
Python
170 lines
6.9 KiB
Python
"""keycloak — recipe-specific functional test (Phase 2 P3, ≥2 beyond parity).
|
|
|
|
The canonical "real app integrates with keycloak" flow: an admin creates a confidential client +
|
|
secret in the master realm via the admin API; the client uses the client_credentials grant to
|
|
obtain its own service-account token; the JWT's `azp` (authorized party) matches the new client's
|
|
clientId. Then idempotent cleanup deletes the client.
|
|
|
|
Non-vacuous: tests both the admin-API CRUD on clients AND the client_credentials grant — distinct
|
|
from the password-grant test which only exercises the admin user's own token. A keycloak that
|
|
returns 200 on /realms/master but has a broken admin API or signing failure for service accounts
|
|
would fail this test.
|
|
|
|
Reuses `tests/keycloak/kc_admin.py` for admin_password + admin_token; adds an inline minimal
|
|
admin-API client + token request — these are recipe-specific enough that they're not pulled into
|
|
the shared harness yet (when SSO-flow harness lands in Q2.3, the client-create + token-fetch will
|
|
move there).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import json
|
|
import os
|
|
import ssl
|
|
import sys
|
|
import urllib.error
|
|
import urllib.parse
|
|
import urllib.request
|
|
import uuid
|
|
|
|
# kc_admin.py lives in tests/keycloak/, one level up from this file in tests/keycloak/functional/
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "..", "runner"))
|
|
import kc_admin # noqa: E402
|
|
|
|
_CTX = ssl.create_default_context()
|
|
_CTX.check_hostname = False
|
|
_CTX.verify_mode = ssl.CERT_NONE
|
|
|
|
|
|
def _admin_post(domain, token, path, body):
|
|
data = json.dumps(body).encode()
|
|
req = urllib.request.Request(
|
|
f"https://{domain}/admin{path}",
|
|
data=data,
|
|
headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
|
|
method="POST",
|
|
)
|
|
try:
|
|
with urllib.request.urlopen(req, timeout=30, context=_CTX) as r:
|
|
return r.status, r.headers.get("Location", "")
|
|
except urllib.error.HTTPError as e:
|
|
return e.code, e.headers.get("Location", "") if e.headers else ""
|
|
|
|
|
|
def _admin_get_clients(domain, token, realm="master"):
|
|
"""List clients in a realm; returns the list (each item a dict with id + clientId)."""
|
|
req = urllib.request.Request(
|
|
f"https://{domain}/admin/realms/{realm}/clients",
|
|
headers={"Authorization": f"Bearer {token}"},
|
|
method="GET",
|
|
)
|
|
try:
|
|
with urllib.request.urlopen(req, timeout=30, context=_CTX) as r:
|
|
return r.status, json.load(r)
|
|
except urllib.error.HTTPError as e:
|
|
return e.code, None
|
|
|
|
|
|
def _admin_delete(domain, token, path):
|
|
req = urllib.request.Request(
|
|
f"https://{domain}/admin{path}",
|
|
headers={"Authorization": f"Bearer {token}"},
|
|
method="DELETE",
|
|
)
|
|
try:
|
|
with urllib.request.urlopen(req, timeout=30, context=_CTX) as r:
|
|
return r.status
|
|
except urllib.error.HTTPError as e:
|
|
return e.code
|
|
|
|
|
|
def _client_credentials_token(domain, client_id, client_secret, realm="master"):
|
|
data = urllib.parse.urlencode(
|
|
{
|
|
"grant_type": "client_credentials",
|
|
"client_id": client_id,
|
|
"client_secret": client_secret,
|
|
}
|
|
).encode()
|
|
req = urllib.request.Request(
|
|
f"https://{domain}/realms/{realm}/protocol/openid-connect/token",
|
|
data=data,
|
|
headers={"Content-Type": "application/x-www-form-urlencoded"},
|
|
method="POST",
|
|
)
|
|
try:
|
|
with urllib.request.urlopen(req, timeout=30, context=_CTX) as r:
|
|
return r.status, json.load(r)
|
|
except urllib.error.HTTPError as e:
|
|
try:
|
|
return e.code, json.loads(e.read().decode())
|
|
except Exception: # noqa: BLE001
|
|
return e.code, None
|
|
|
|
|
|
def _b64url_decode(seg: str) -> bytes:
|
|
pad = "=" * ((4 - len(seg) % 4) % 4)
|
|
return base64.urlsafe_b64decode(seg + pad)
|
|
|
|
|
|
def test_create_confidential_client_and_obtain_token(live_app):
|
|
"""Admin creates a confidential client; client obtains its own token via client_credentials."""
|
|
admin_token = kc_admin.admin_token(live_app, kc_admin.admin_password(live_app))
|
|
|
|
# Use a unique client id so concurrent or repeated runs don't collide. The client is per-run.
|
|
client_id = f"ccci-client-{uuid.uuid4().hex[:8]}"
|
|
client_secret = uuid.uuid4().hex
|
|
body = {
|
|
"clientId": client_id,
|
|
"enabled": True,
|
|
"secret": client_secret,
|
|
"publicClient": False, # confidential client
|
|
"serviceAccountsEnabled": True, # required for client_credentials grant
|
|
"standardFlowEnabled": False, # not needed for service-account-only client
|
|
"directAccessGrantsEnabled": False,
|
|
"protocol": "openid-connect",
|
|
}
|
|
|
|
cleanup_id = None
|
|
try:
|
|
status, location = _admin_post(live_app, admin_token, "/realms/master/clients", body)
|
|
assert status in (201, 409), f"create client returned HTTP {status} (expected 201)"
|
|
# On 201, Location header carries the new client's internal id; otherwise look it up.
|
|
if status == 201 and "/clients/" in (location or ""):
|
|
cleanup_id = location.rsplit("/clients/", 1)[1]
|
|
else:
|
|
# 409 collision (shouldn't happen with UUID) — find by clientId
|
|
_, clients = _admin_get_clients(live_app, admin_token)
|
|
cleanup_id = next(
|
|
(c["id"] for c in (clients or []) if c.get("clientId") == client_id), None
|
|
)
|
|
assert cleanup_id, "could not determine the new client's id"
|
|
|
|
# Use the client to obtain its own token (client_credentials grant)
|
|
tok_status, tok_resp = _client_credentials_token(live_app, client_id, client_secret)
|
|
assert (
|
|
tok_status == 200
|
|
), f"client_credentials token returned HTTP {tok_status}: {tok_resp!r}"
|
|
access_token = tok_resp.get("access_token") if isinstance(tok_resp, dict) else None
|
|
assert (
|
|
isinstance(access_token, str) and access_token.count(".") == 2
|
|
), f"client_credentials access_token not a JWT: {access_token!r}"
|
|
|
|
# Decode the JWT payload; assert azp matches the new client
|
|
payload = json.loads(_b64url_decode(access_token.split(".")[1]))
|
|
assert (
|
|
payload.get("azp") == client_id
|
|
), f"client_credentials JWT azp={payload.get('azp')!r} != client_id={client_id!r}"
|
|
# Service-account token does NOT carry a session-scoped user (azp + clientId differ from
|
|
# admin-cli token). The presence of azp + iss == per-run-domain proves the issuance flow.
|
|
expected_iss = f"https://{live_app}/realms/master"
|
|
assert (
|
|
payload.get("iss") == expected_iss
|
|
), f"JWT iss={payload.get('iss')!r} != {expected_iss!r}"
|
|
finally:
|
|
# Idempotent cleanup
|
|
if cleanup_id:
|
|
_admin_delete(live_app, admin_token, f"/realms/master/clients/{cleanup_id}")
|