- runner/harness/warm.py: stable-domain scheme (warm-<recipe>), is_warm_up probe, live_app_hexes scan, per-run realm_for naming, reap_orphan_realms. - run_recipe_ci.py: split declared deps into live-warm (shared provider + per-run realm, no deploy, realm deleted at teardown) vs cold (co-deploy). Warm path used only when provider is up; cold fallback otherwise. Reap orphan realms at run start (concurrency-safe). deploy-count excludes warm deps. Realm naming now per-run namespaced (<parent>-<6hex>). - dependent tests assert the namespaced realm pattern (stronger than ==parent). Live proof on warm keycloak: realm create -> password-grant JWT -> discovery issuer -> delete(idempotent) -> reap(keeps live hex, deletes orphan): PASS. 43 unit pass. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
93 lines
3.9 KiB
Python
93 lines
3.9 KiB
Python
"""lasuite-drive — Q3.2 SSO-flow test (operator-2026-05-28 SSO-dep plan).
|
|
|
|
Drive (La Suite Drive) is OIDC-required: login is gated by an external OpenID Connect provider.
|
|
Mirrors the proven lasuite-docs SSO model:
|
|
- The orchestrator deploys a per-run keycloak dep AFTER the generic tiers and provisions a fresh
|
|
realm/client/user via `harness.sso.setup_keycloak_realm`; `setup_custom_tests.sh` then wires the
|
|
OIDC env + client secret into the running drive app and redeploys. Creds land in `$CCCI_DEPS_FILE`
|
|
(read here via the `deps_creds` fixture).
|
|
- This test consumes those creds and exercises the real OIDC flow against the dep keycloak: discovery
|
|
endpoint advertises the realm, and a password grant yields a valid JWT with the expected claims.
|
|
- Marked `@pytest.mark.requires_deps` so if setup_custom_tests failed the test SKIPs with a clear
|
|
`deps-not-ready` reason — and (per F2-11) the orchestrator then fails the run rather than going
|
|
green on a skipped SSO test.
|
|
|
|
SOURCE: adapted from tests/lasuite-docs/functional/test_oidc_with_keycloak.py (Q2.4 acceptance).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
import time
|
|
|
|
import pytest
|
|
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "..", "runner"))
|
|
from harness import sso # noqa: E402
|
|
|
|
|
|
def _b64url_decode(seg: str) -> bytes:
|
|
pad = "=" * ((4 - len(seg) % 4) % 4)
|
|
return base64.urlsafe_b64decode(seg + pad)
|
|
|
|
|
|
@pytest.mark.requires_deps
|
|
def test_oidc_password_grant_against_dep_keycloak(live_app, deps_creds):
|
|
"""The dep keycloak issues a JWT for the pre-provisioned test user via OIDC password grant."""
|
|
assert "keycloak" in deps_creds, (
|
|
f"keycloak creds not in deps_creds; got {list(deps_creds.keys())}. "
|
|
"setup_custom_tests should have populated this."
|
|
)
|
|
kc = deps_creds["keycloak"]
|
|
|
|
# Creds shape. WC1: realm is per-run namespaced "<parent>-<6hex>"; client_id stays the parent.
|
|
assert kc["domain"]
|
|
assert re.fullmatch(r"lasuite-drive-[0-9a-f]{6}", kc["realm"]), (
|
|
f"realm {kc['realm']!r} not the per-run namespaced form lasuite-drive-<6hex>"
|
|
)
|
|
assert kc["client_id"] == "lasuite-drive"
|
|
assert isinstance(kc["client_secret"], str) and len(kc["client_secret"]) >= 16
|
|
assert isinstance(kc["password"], str) and len(kc["password"]) >= 16
|
|
|
|
creds = {
|
|
"provider": "keycloak",
|
|
"provider_domain": kc["domain"],
|
|
"realm": kc["realm"],
|
|
"client_id": kc["client_id"],
|
|
"client_secret": kc["client_secret"],
|
|
"user": kc["user"],
|
|
"password": kc["password"],
|
|
"email": kc["email"],
|
|
"discovery_url": kc["discovery_url"],
|
|
"token_url": kc["token_url"],
|
|
"auth_url": kc["auth_url"],
|
|
"userinfo_url": kc["userinfo_url"],
|
|
}
|
|
|
|
# OIDC discovery endpoint advertises the realm
|
|
discovery = sso.assert_discovery_endpoint(creds)
|
|
expected_iss = f"https://{kc['domain']}/realms/{kc['realm']}"
|
|
assert discovery.get("issuer") == expected_iss
|
|
assert discovery.get("token_endpoint", "").startswith(expected_iss + "/")
|
|
assert discovery.get("authorization_endpoint", "").startswith(expected_iss + "/")
|
|
|
|
# Password grant → real JWT
|
|
token = sso.oidc_password_grant(creds)
|
|
assert isinstance(token, str) and token.count(".") == 2, (
|
|
f"access_token is not a JWT: {token!r}"
|
|
)
|
|
payload = json.loads(_b64url_decode(token.split(".")[1]))
|
|
assert payload.get("iss") == expected_iss, f"JWT iss={payload.get('iss')!r} != {expected_iss!r}"
|
|
assert payload.get("azp") == kc["client_id"], (
|
|
f"JWT azp={payload.get('azp')!r} != {kc['client_id']!r}"
|
|
)
|
|
assert payload.get("typ") == "Bearer", f"JWT typ={payload.get('typ')!r} != 'Bearer'"
|
|
exp = payload.get("exp")
|
|
assert isinstance(exp, int) and exp > time.time(), (
|
|
f"JWT exp={exp!r} not a future timestamp (now={time.time():.0f})"
|
|
)
|