#!/usr/bin/env python3
"""ADVERSARY check4 (WC1 concurrency + reaping, deploy-free).

Cold-run from my own clone. Asserts: realm_for distinct per run-hex; realms
create on live warm kc + oidc_password_grant returns a JWT each;
reap_orphaned_realms keeps the live hex and deletes the orphans. Leaves kc
clean.

Every failed check is collected in ``fails`` instead of aborting, so a single
run reports all problems. The process exits 0 when ``fails`` is empty and 1
otherwise. The ``advchk-*`` realms this script creates are removed in a
``finally`` block, so an unexpected exception does not leave them behind.
"""
import os
import sys

# Make this script's own directory importable before importing `harness`
# (presumably the package lives next to this file -- confirm against the repo).
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from harness import warm, sso  # noqa: E402  (must follow the sys.path tweak)

EXPECTED_DOMAIN = "warm-keycloak.ci.commoninternet.net"

D = warm.warm_domain("keycloak")
# Guard the destructive steps below. Raised explicitly (rather than via a bare
# `assert`) so the check still runs under `python -O`; the exception type is
# the same as before.
if D != EXPECTED_DOMAIN:
    raise AssertionError(D)

# Human-readable descriptions of every check that failed.
fails = []

# 1) realm_for distinct per run-hex (two concurrent same-recipe runs never collide)
r_a = warm.realm_for("lasuite-docs", "lasu-aaa111.ci.commoninternet.net")
r_b = warm.realm_for("lasuite-docs", "lasu-bbb222.ci.commoninternet.net")
print(f"realm_for aaa111={r_a!r} bbb222={r_b!r}")
if r_a != "lasuite-docs-aaa111":
    fails.append(f"realm_for aaa111 -> {r_a}")
if r_b != "lasuite-docs-bbb222":
    fails.append(f"realm_for bbb222 -> {r_b}")
if r_a == r_b:
    fails.append("realm_for collision")

admin = sso.admin_password_inside(D)
before = sorted(sso.list_realms(D, admin))
print(f"realms BEFORE: {before}")

hexes = ["aaa111", "bbb222", "ccc333"]
# Realm names are computed up front so cleanup can target them even when
# creation fails part-way through.
planned = [f"advchk-{h}" for h in hexes]
created = []

try:
    # 2) create three realms; each must yield a working password-grant JWT
    for h, realm in zip(hexes, planned):
        creds = sso.setup_keycloak_realm(
            D, realm, f"client-{h}", redirect_uris=["*"], web_origins=["*"]
        )
        created.append(realm)
        tok = sso.oidc_password_grant(creds)
        tok_is_str = isinstance(tok, str)
        # Shape check only: three dot-separated segments and a plausible length.
        ok = tok_is_str and tok.count(".") == 2 and len(tok) > 40
        # Only measure the token when it really is a string; a None/other
        # return must be reported as a failure, not crash the status line.
        if tok_is_str:
            detail = f"len={len(tok)}, dots={tok.count('.')}"
        else:
            detail = f"type={type(tok).__name__}"
        print(f" {realm}: JWT={'OK' if ok else 'BAD'} ({detail})")
        if not ok:
            fails.append(f"{realm} no/!JWT")
        # confirm discovery issuer too (independent re-check)
        disc = sso.assert_discovery_endpoint(creds)
        if disc.get("issuer") != f"https://{D}/realms/{realm}":
            fails.append(f"{realm} issuer")

    mid = sorted(sso.list_realms(D, admin))
    print(f"realms AFTER CREATE: {mid}")
    for realm in created:
        if realm not in mid:
            fails.append(f"{realm} not present after create")

    # 3) reap with live_hexes={aaa111}: must delete bbb222+ccc333, KEEP aaa111
    reaped = sorted(sso.reap_orphaned_realms(D, live_hexes={"aaa111"}))
    print(f"REAPED (live=aaa111): {reaped}")
    if reaped != ["advchk-bbb222", "advchk-ccc333"]:
        fails.append(f"reaped set wrong: {reaped}")
    after = sorted(sso.list_realms(D, admin))
    print(f"realms AFTER REAP: {after}")
    if "advchk-aaa111" not in after:
        fails.append("aaa111 wrongly reaped (live run would lose its realm)")
    if "advchk-bbb222" in after or "advchk-ccc333" in after:
        fails.append("orphan not reaped")
finally:
    # cleanup: remove aaa111 too (and any of this run's realms that survived a
    # failed reap or an exception above); leave kc with only master (+ any
    # pre-existing non-advchk realms). Only realms that are actually present
    # are deleted, so a wrongly-reaped aaa111 does not trigger a second delete.
    still_present = set(sso.list_realms(D, admin))
    for realm in planned:
        if realm in still_present:
            sso.delete_keycloak_realm(D, realm, admin)

final = sorted(sso.list_realms(D, admin))
print(f"realms FINAL (after cleanup): {final}")
leftover = [r for r in final if r.startswith("advchk-")]
if leftover:
    fails.append(f"leftover advchk realms: {leftover}")

# The conditional expression binds looser than `+`, so this prints either
# "FAIL <joined failures>" or the PASS message.
print("\nRESULT:", "FAIL " + "; ".join(fails) if fails else "PASS — all check4 assertions hold")
sys.exit(1 if fails else 0)