diff --git a/runner/harness/level.py b/runner/harness/level.py index 834f9f9..f88d802 100644 --- a/runner/harness/level.py +++ b/runner/harness/level.py @@ -66,7 +66,9 @@ def compute_level(rungs: dict[str, str]) -> tuple[int, str]: for name in RUNGS: st = rungs.get(name) if st not in VALID: - raise ValueError(f"rung {name!r} has invalid status {st!r} (expect one of {sorted(VALID)})") + raise ValueError( + f"rung {name!r} has invalid status {st!r} (expect one of {sorted(VALID)})" + ) # L0: install did not pass. if rungs["install"] != "pass": diff --git a/runner/harness/results.py b/runner/harness/results.py new file mode 100644 index 0000000..c3fae07 --- /dev/null +++ b/runner/harness/results.py @@ -0,0 +1,268 @@ +"""Phase 3 — structured run results + results.json (plan-phase3-results-ux.md §4.2, R1/R3). + +Turns a run's per-tier pytest outcomes into a single `results.json` artifact carrying, per the plan: + { recipe, version, pr, ref, run_id, finished, stages:[{name,status,tests:[{name,status,ms}]}], + level, level_cap_reason, rungs, flags:{clean_teardown,no_secret_leak}, screenshot, summary_card } + +The per-test breakdown comes from JUnit XML emitted by each tier's pytest invocation (`--junitxml`), +parsed here with the stdlib (no new dep). The integer **level** is computed by harness.level from a +rung-status dict derived here (`derive_rungs`) from the tier results + deps/SSO signals the +orchestrator holds; that mapping is documented in DECISIONS.md (Phase 3). + +This module is import-pure (no side effects at import). `write_results` is the only writer; the +orchestrator calls the build/write path inside a try/except so a results failure NEVER changes the +run's exit code (R7 — cosmetics never block the pipeline). +""" + +from __future__ import annotations + +import json +import os +import xml.etree.ElementTree as ET + +from . import level as level_mod + +# Where per-run artifacts (results.json, screenshot, summary card) are written on the runner host. 
# The dashboard serves these read-only at /runs/<run_id>/... (U0.4). Overridable for tests.
RUNS_DIR_DEFAULT = "/var/lib/cc-ci-runs"


def runs_dir() -> str:
    """Return the artifact root directory: $CCCI_RUNS_DIR if set, else RUNS_DIR_DEFAULT."""
    return os.environ.get("CCCI_RUNS_DIR", RUNS_DIR_DEFAULT)


def run_id() -> str:
    """Return the id used to name this run's artifact directory.

    Prefers the (stripped) Drone build number; falls back to $CCCI_APP_DOMAIN, then $CCCI_RUN_ID,
    then the literal "manual" so a hand-run still gets an artifact dir.
    """
    build = os.environ.get("DRONE_BUILD_NUMBER")
    if build and build.strip():
        return build.strip()
    return os.environ.get("CCCI_APP_DOMAIN") or os.environ.get("CCCI_RUN_ID") or "manual"


def junit_file(junit_dir: str, tier: str, source: str, path: str) -> str:
    """Return the deterministic per-(tier, source, file) JUnit XML path under `junit_dir`.

    Only the basename of `path` (without extension) is used, and any path separators in the
    combined name are replaced with "_" so the result is always a direct child of `junit_dir`.
    """
    base = os.path.splitext(os.path.basename(path))[0]
    safe = f"{tier}__{source}__{base}".replace("/", "_").replace(os.sep, "_")
    return os.path.join(junit_dir, safe + ".xml")


def _case_status(case: ET.Element) -> tuple[str, str]:
    """Return (status, message) for one <testcase> element.

    JUnit marks a non-passing case with a child <error>/<failure>/<skipped> element (checked in
    that order); a case with none of them passed. The message is the child's `message` attribute,
    stripped, or "" when absent.
    """
    for tag, st in (("error", "error"), ("failure", "fail"), ("skipped", "skip")):
        el = case.find(tag)
        if el is not None:
            return st, (el.get("message") or "").strip()
    return "pass", ""


def _duration_ms(raw: str | None) -> int:
    """Convert a JUnit `time` attribute (seconds, as text) to whole milliseconds.

    Returns 0 for anything that is not a finite number: missing/garbled text (ValueError/TypeError),
    NaN (round() raises ValueError) and infinity or overflowing values (round() raises
    OverflowError). The overflow case is what keeps parse_junit tolerant of a hostile/corrupt file.
    """
    try:
        return int(round(float(raw if raw is not None else "0") * 1000))
    except (TypeError, ValueError, OverflowError):
        return 0


def parse_junit(xml_path: str) -> list[dict]:
    """Parse one JUnit XML file into per-test rows {name, classname, status, ms, message}.

    Tolerant by design: a missing, unreadable or malformed file yields [], and an unparsable
    duration yields ms == 0, so results assembly never raises because of a test artifact.
    """
    try:
        tree = ET.parse(xml_path)
    except (OSError, ET.ParseError):
        return []
    rows: list[dict] = []
    for case in tree.iter("testcase"):
        status, message = _case_status(case)
        rows.append(
            {
                "name": case.get("name", "?"),
                "classname": case.get("classname", ""),
                "status": status,
                "ms": _duration_ms(case.get("time", "0")),
                "message": message,
            }
        )
    return rows


def _stage_status(tests: list[dict]) -> str:
    """Roll per-test rows up to a stage status.

    Any error/fail row -> "fail"; otherwise any pass row -> "pass"; otherwise (all skipped, or no
    rows at all) -> "skip".
    """
    seen = {t["status"] for t in tests}
    if "fail" in seen or "error" in seen:
        return "fail"
    if "pass" in seen:
        return "pass"
    return "skip"


def _synthetic_row(rec: dict, status: str, message: str) -> dict:
    """Build a stand-in test row for a run record whose JUnit output cannot be trusted alone."""
    return {
        "name": os.path.basename(rec.get("file", "?")),
        "classname": rec.get("source", ""),
        "status": status,
        "ms": 0,
        "message": message,
    }


def collect_stages(records: list[dict]) -> list[dict]:
    """Group per-file run records into ordered stage dicts with their per-test breakdown.

    `records` items: {tier, source, file, rc, junit}. Tests are read from each file's JUnit XML.
    The exit code stays authoritative so the stage never looks better than the run was:

    * no JUnit rows at all (e.g. pytest crashed before writing) -> one synthetic row derived from
      the exit code (rc == 0 -> pass, else fail);
    * JUnit rows exist but rc != 0 and no row is fail/error (stale or partial XML, interrupted or
      internally-failed pytest) -> a synthetic failing row is appended.

    Every row is tagged with the record's `source`. Stages are emitted in the fixed tier order
    install, upgrade, backup, restore, custom; tiers with no records (and tier names outside that
    list) are not emitted.
    """
    order = ("install", "upgrade", "backup", "restore", "custom")
    by_tier: dict[str, list[dict]] = {}
    for rec in records:
        rc = rec.get("rc", 1)
        junit = rec.get("junit")
        tests = parse_junit(junit) if junit else []
        if not tests:
            # No JUnit rows — synthesize from the exit code so a crash isn't shown as "no tests".
            if rc == 0:
                tests = [_synthetic_row(rec, "pass", "")]
            else:
                tests = [_synthetic_row(rec, "fail", "tier produced no JUnit; exit!=0")]
        elif rc != 0 and not any(t["status"] in ("fail", "error") for t in tests):
            # The XML claims nothing failed but the process did: never inflate the stage.
            tests.append(
                _synthetic_row(rec, "fail", f"exit code {rc} but JUnit reported no failing test")
            )
        for t in tests:
            t["source"] = rec.get("source", "")
        by_tier.setdefault(rec["tier"], []).extend(tests)
    return [
        {"name": tier, "status": _stage_status(by_tier[tier]), "tests": by_tier[tier]}
        for tier in order
        if tier in by_tier
    ]


def _has_repo_local(records: list[dict]) -> bool:
    """True if any run record came from the recipe repo's own tests (source "repo-local")."""
    return any(r.get("source") == "repo-local" for r in records)


def _repo_local_passed(records: list[dict]) -> bool:
    """True if at least one repo-local record exists and every one of them exited 0."""
    repo = [r for r in records if r.get("source") == "repo-local"]
    return bool(repo) and all(r.get("rc", 1) == 0 for r in repo)


def derive_rungs(
    results: dict[str, str],
    *,
    backup_capable: bool,
    declared: list[str] | None,
    deps_ready: bool,
    sso_unverified: bool,
    has_custom: bool,
    has_repo_local: bool,
    repo_local_passed: bool,
) -> dict[str, str]:
    """Translate the orchestrator's tier results + deps/SSO signals into the rung-status dict
    harness.level consumes. Documented in DECISIONS.md (Phase 3). Conservative by design — never
    reports a rung 'pass' it can't substantiate (cardinal guardrail: presentation never inflates).

      L1 install    : install tier pass.
      L2 upgrade    : upgrade tier (skip → N/A: only one published version).
      L3 backup/res : backup AND restore tiers pass (N/A if not backup-capable).
      L4 functional : the recipe-specific functional (non-deps) tests pass — the custom tier, minus
                      its SSO/integration tests. N/A if the recipe has no custom tests at all.
      L5 integration: SSO/OIDC + cross-app. Applies ONLY if the recipe declares deps (else N/A — the
                      "no integration surface caps at L4" rule, §4.1). pass iff deps wired
                      (deps_ready) and not sso_unverified and the custom tier didn't fail.
      L6 recipe-loc : the recipe repo's own tests/ (repo-local source) ran and passed (N/A if none).

    Returns a dict with keys install, upgrade, backup_restore, functional, integration,
    recipe_local.
    """
    declared = declared or []
    rungs: dict[str, str] = {}
    rungs["install"] = level_mod.tier_to_rung(results.get("install"))
    rungs["upgrade"] = level_mod.tier_to_rung(results.get("upgrade"))
    rungs["backup_restore"] = level_mod.backup_restore_status(
        results.get("backup"), results.get("restore"), backup_capable
    )

    custom = results.get("custom")
    # Functional rung (L4): the non-deps custom tests.
    if not has_custom or custom is None or custom == "skip":
        rungs["functional"] = "na"
    elif custom == "pass":
        rungs["functional"] = "pass"
    else:
        # custom == "fail": with declared deps we cannot cheaply tell functional-vs-SSO apart, so
        # conservatively fail the functional rung (caps at L3). Any unexpected status lands here
        # too rather than being silently promoted to "pass" — never inflate.
        rungs["functional"] = "fail"

    # Integration rung (L5): only recipes with an SSO/integration surface (declared deps) can climb.
    if not declared:
        rungs["integration"] = "na"
    elif sso_unverified or not deps_ready or custom == "fail":
        # SSO not wired/verified, or a custom test failed → integration not verified.
        rungs["integration"] = "fail"
    elif custom == "pass":
        rungs["integration"] = "pass"
    else:
        # declared deps but no custom tests ran — can't claim integration verified
        rungs["integration"] = "na"

    # Recipe-local rung (L6).
    if not has_repo_local:
        rungs["recipe_local"] = "na"
    else:
        rungs["recipe_local"] = "pass" if repo_local_passed else "fail"
    return rungs


def build_results(
    *,
    recipe: str,
    version: str | None,
    pr: str,
    ref: str | None,
    records: list[dict],
    results: dict[str, str],
    backup_capable: bool,
    declared: list[str] | None,
    deps_ready: bool,
    sso_unverified: bool,
    clean_teardown: bool,
    no_secret_leak: bool,
    finished_ts: float | None,
    screenshot: str | None = None,
    summary_card: str | None = None,
) -> dict:
    """Assemble the full results.json dict (no file I/O).

    `finished_ts` is passed in (the orchestrator stamps it) so the output is deterministic for
    unit tests apart from `run_id`, which is read from the environment. `ref` is truncated to 12
    characters; `pr` is stringified; the two flags are coerced to bool. The level and its cap
    reason come from harness.level.compute_level applied to the rungs derived from `results`.
    """
    stages = collect_stages(records)
    has_custom = any(r["tier"] == "custom" for r in records)
    rungs = derive_rungs(
        results,
        backup_capable=backup_capable,
        declared=declared,
        deps_ready=deps_ready,
        sso_unverified=sso_unverified,
        has_custom=has_custom,
        has_repo_local=_has_repo_local(records),
        repo_local_passed=_repo_local_passed(records),
    )
    lvl, cap_reason = level_mod.compute_level(rungs)
    return {
        "schema": 1,
        "run_id": run_id(),
        "recipe": recipe,
        "version": version,
        "pr": str(pr),
        "ref": (ref or "")[:12],
        "finished": finished_ts,
        "level": lvl,
        "level_cap_reason": cap_reason,
        "rungs": rungs,
        "stages": stages,
        "results": results,
        "flags": {
            "clean_teardown": bool(clean_teardown),
            "no_secret_leak": bool(no_secret_leak),
        },
        "screenshot": screenshot,
        "summary_card": summary_card,
    }


def write_results(data: dict, runs_dir_override: str | None = None) -> str:
    """Write results.json into the run's artifact dir (created if needed); return its path.

    The file is written to a sibling ".tmp" and moved into place with os.replace, so a reader
    never sees a half-written results.json. If serialisation or the move fails, the temp file is
    removed (best-effort) and the original exception propagates — the caller decides whether that
    is fatal.
    """
    rd = runs_dir_override or runs_dir()
    out_dir = os.path.join(rd, str(data["run_id"]))
    os.makedirs(out_dir, exist_ok=True)
    path = os.path.join(out_dir, "results.json")
    tmp = path + ".tmp"
    try:
        # Explicit encoding: the artifact must not depend on the runner host's locale.
        with open(tmp, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, sort_keys=True)
        os.replace(tmp, path)
    except Exception:
        try:
            os.remove(tmp)
        except OSError:
            pass  # nothing to clean up (never created, or already moved)
        raise
    return path
Records @@ -250,7 +264,9 @@ def _perform_op( upgrade chaos redeploy so a heavy reconverge isn't SIGKILLed by the 900s default mid-wait; `meta` lets the upgrade op own a recipe-aware convergence+health wait (F2-12, READY_PROBE).""" if op == "upgrade": - before = generic.perform_upgrade(domain, recipe, head_ref, deploy_timeout=deploy_timeout, meta=meta) + before = generic.perform_upgrade( + domain, recipe, head_ref, deploy_timeout=deploy_timeout, meta=meta + ) op_state["upgrade"] = {"before": before, "head_ref": head_ref} elif op == "backup": # Backup integrity + retry (F2-14b). A recipe may define BACKUP_VERIFY(domain) -> bool that @@ -273,7 +289,10 @@ def _perform_op( ) snap = generic.perform_backup(domain) if callable(verify) and not verify(domain): - print(f" !! backup-verify still FAILED after {attempt} attempts — backup is incomplete", flush=True) + print( + f" !! backup-verify still FAILED after {attempt} attempts — backup is incomplete", + flush=True, + ) op_state["backup"] = {"snapshot_id": snap} elif op == "restore": generic.perform_restore(domain) @@ -288,11 +307,17 @@ def run_lifecycle_tier( meta: dict, head_ref: str | None, op_state: dict, + records: list[dict] | None = None, + junit_dir: str | None = None, ) -> str: """Additive lifecycle tier (HC3): seed (pre-op hook) → perform the op ONCE → run the generic assertion file (unless opted out) AND the overlay assertion file, both against the shared post-op deployment. The upgrade op redeploys the PR head (head_ref) via chaos (HC1). Returns - 'pass' | 'fail' | 'skip'.""" + 'pass' | 'fail' | 'skip'. + + Phase 3 (R1/R3): when `records`/`junit_dir` are given, each pytest file is run with --junitxml and + a {tier,source,file,rc,junit} record appended, so the run can assemble per-stage/per-test + results.json + the level afterwards. 
Purely additive — does not change the verdict.""" overlay = discovery.resolve_overlay_op(recipe, op, repo_local) skip_gen = _skip_generic(op, meta) files: list[tuple[str, str]] = [] @@ -314,8 +339,13 @@ def run_lifecycle_tier( try: _run_pre_hook(recipe, op, repo_local, domain, meta) _perform_op( - op, domain, recipe, head_ref, op_state, - deploy_timeout=int(meta.get("DEPLOY_TIMEOUT", 900)), meta=meta, + op, + domain, + recipe, + head_ref, + op_state, + deploy_timeout=int(meta.get("DEPLOY_TIMEOUT", 900)), + meta=meta, ) with open(os.environ["CCCI_OP_STATE_FILE"], "w") as f: json.dump(op_state, f) @@ -328,9 +358,22 @@ def run_lifecycle_tier( rc_all = 0 for source, path in files: print(f" assert ({source}): {os.path.relpath(path, ROOT)}", flush=True) - rc = run_redacted( - [sys.executable, "-m", "pytest", "-v", "-rA", path], env=_tier_env(domain) - ) + cmd = [sys.executable, "-m", "pytest", "-v", "-rA", path] + jx = None + if junit_dir is not None: + jx = results_mod.junit_file(junit_dir, op, source, path) + cmd.append(f"--junitxml={jx}") + rc = run_redacted(cmd, env=_tier_env(domain)) + if records is not None: + records.append( + { + "tier": op, + "source": source, + "file": os.path.relpath(path, ROOT), + "rc": rc, + "junit": jx, + } + ) if rc != 0: rc_all = rc return "pass" if rc_all == 0 else "fail" @@ -390,7 +433,9 @@ def _enrich_deps_with_sso(parent_recipe: str, parent_domain: str, deps_list) -> return out -def _provision_deps(recipe: str, domain: str, ref: str | None, declared: list[str]) -> dict[str, dict]: +def _provision_deps( + recipe: str, domain: str, ref: str | None, declared: list[str] +) -> dict[str, dict]: """Provision a run's declared deps and write `$CCCI_DEPS_FILE`; return the recipe→entry deps_state. 
Splits deps into live-warm (shared provider at a stable domain + a per-run realm) vs cold @@ -438,7 +483,10 @@ def _run_setup_custom_tests_hook(recipe: str, domain: str, deps_file: str) -> No if not os.path.isfile(path): # No hook = recipe doesn't need post-deps wiring; deps are deployed + creds available # via deps_apps fixture as-is. - print(f" setup_custom_tests: no hook at {os.path.relpath(path, ROOT)} (deps creds ready in $CCCI_DEPS_FILE)", flush=True) + print( + f" setup_custom_tests: no hook at {os.path.relpath(path, ROOT)} (deps creds ready in $CCCI_DEPS_FILE)", + flush=True, + ) return print(f" setup_custom_tests hook: {os.path.relpath(path, ROOT)}", flush=True) rc = subprocess.run( @@ -452,9 +500,15 @@ def _run_setup_custom_tests_hook(recipe: str, domain: str, deps_file: str) -> No ) -def run_custom(recipe: str, repo_local: str | None, domain: str) -> str: +def run_custom( + recipe: str, + repo_local: str | None, + domain: str, + records: list[dict] | None = None, + junit_dir: str | None = None, +) -> str: """Run all discovered non-lifecycle custom test_*.py (both locations, additive). Returns - 'skip' if none defined, else 'pass'/'fail'.""" + 'skip' if none defined, else 'pass'/'fail'. 
Phase 3: emits JUnit + records when given.""" customs = discovery.custom_tests(recipe, repo_local) if not customs: return "skip" @@ -463,9 +517,14 @@ def run_custom(recipe: str, repo_local: str | None, domain: str) -> str: for source, path in customs: rel = os.path.relpath(path, ROOT) print(f" custom ({source}): {rel}", flush=True) - rc = run_redacted( - [sys.executable, "-m", "pytest", "-v", "-rA", path], env=_tier_env(domain) - ) + cmd = [sys.executable, "-m", "pytest", "-v", "-rA", path] + jx = None + if junit_dir is not None: + jx = results_mod.junit_file(junit_dir, "custom", source, path) + cmd.append(f"--junitxml={jx}") + rc = run_redacted(cmd, env=_tier_env(domain)) + if records is not None: + records.append({"tier": "custom", "source": source, "file": rel, "rc": rc, "junit": jx}) if rc != 0: rc_all = rc return "pass" if rc_all == 0 else "fail" @@ -482,8 +541,9 @@ def _wait_undeployed(domain: str, timeout: int = 120) -> None: time.sleep(2) -def run_quick(recipe: str, ref: str | None, head_ref: str | None, repo_local: str | None, - meta: dict) -> int: +def run_quick( + recipe: str, ref: str | None, head_ref: str | None, repo_local: str | None, meta: dict +) -> int: """WC4 `--quick` opt-in fast lane (plan §2). Reattach the data-warm canonical (known-good volume) → upgrade IN PLACE to the PR head (chaos) → assert generic UPGRADE (reconverge+moved+serving) + overlay + custom. 
PASS → undeploy-keep-volume, **known-good UNCHANGED (NEVER promote)**; FAIL → @@ -532,8 +592,11 @@ def run_quick(recipe: str, ref: str | None, head_ref: str | None, repo_local: st try: canonical.deploy_canonical(recipe, timeout=int(meta.get("DEPLOY_TIMEOUT", 900))) lifecycle.wait_healthy( - domain, ok_codes=tuple(meta["HEALTH_OK"]), path=meta["HEALTH_PATH"], - deploy_timeout=meta["DEPLOY_TIMEOUT"], http_timeout=meta["HTTP_TIMEOUT"], + domain, + ok_codes=tuple(meta["HEALTH_OK"]), + path=meta["HEALTH_PATH"], + deploy_timeout=meta["DEPLOY_TIMEOUT"], + http_timeout=meta["HTTP_TIMEOUT"], ) warm_ok = True except Exception as e: # noqa: BLE001 @@ -550,9 +613,11 @@ def run_quick(recipe: str, ref: str | None, head_ref: str | None, repo_local: st (warm_deps if (wd and warm.is_warm_up(d, wd)) else cold_deps).append(d) dep_metas = {d: _load_meta(d) for d in cold_deps} deps_list = ( - deps_mod.deploy_deps(recipe, os.environ.get("PR", "0"), ref, cold_deps, - meta_for=dep_metas) - if cold_deps else [] + deps_mod.deploy_deps( + recipe, os.environ.get("PR", "0"), ref, cold_deps, meta_for=dep_metas + ) + if cold_deps + else [] ) for d in warm_deps: wd = warm.warm_domain(d) @@ -565,8 +630,10 @@ def run_quick(recipe: str, ref: str | None, head_ref: str | None, repo_local: st except Exception as e: # noqa: BLE001 deps_ready = False deps_not_ready_reason = _scrub(str(e))[:300] - print(f"!! setup_custom_tests failed (deps-not-ready): {deps_not_ready_reason}", - flush=True) + print( + f"!! 
setup_custom_tests failed (deps-not-ready): {deps_not_ready_reason}", + flush=True, + ) # 3) UPGRADE to PR head (chaos) + assert (generic reconverge+moved+serving + overlay) results["upgrade"] = run_lifecycle_tier( @@ -589,19 +656,28 @@ def run_quick(recipe: str, ref: str | None, head_ref: str | None, repo_local: st pass sso_unverified = sso_dep_unverified(declared, deps_ready, requires_deps_skipped) passed = ( - warm_ok and bool(results) and all(v != "fail" for v in results.values()) + warm_ok + and bool(results) + and all(v != "fail" for v in results.values()) and not sso_unverified ) # dep teardown: delete per-run warm realms; undeploy cold deps (mirrors cold) if deps_state: - ordered = ([deps_state[d] for d in declared if d in deps_state] - if isinstance(deps_state, dict) else deps_state) + ordered = ( + [deps_state[d] for d in declared if d in deps_state] + if isinstance(deps_state, dict) + else deps_state + ) for e in [x for x in ordered if x.get("warm")]: try: from harness import sso + sso.delete_keycloak_realm(e["domain"], e["realm"]) - print(f" dep: deleted per-run realm {e['realm']} on warm {e['recipe']}", flush=True) + print( + f" dep: deleted per-run realm {e['realm']} on warm {e['recipe']}", + flush=True, + ) except Exception as ex: # noqa: BLE001 dep_teardown_error = f"warm realm delete failed for {e.get('realm')}: {ex}" print(f"!! 
{dep_teardown_error}", flush=True) @@ -617,10 +693,14 @@ def run_quick(recipe: str, ref: str | None, head_ref: str | None, repo_local: st try: if warm_ok and passed: canonical.undeploy_keep_volume(recipe) - print(" quick PASS → canonical undeployed, volume retained, known-good UNCHANGED", - flush=True) + print( + " quick PASS → canonical undeployed, volume retained, known-good UNCHANGED", + flush=True, + ) elif warm_ok: - print(" quick FAIL → rolling back canonical to last-known-good snapshot", flush=True) + print( + " quick FAIL → rolling back canonical to last-known-good snapshot", flush=True + ) abra.undeploy(domain) _wait_undeployed(domain) warmsnap.restore(recipe, domain) @@ -630,8 +710,10 @@ def run_quick(recipe: str, ref: str | None, head_ref: str | None, repo_local: st abra.env_set(domain, "TYPE", f"{recipe}:{reg['version']}") canonical._set_status(recipe, "idle") # noqa: SLF001 rolled_back = True - print(" quick FAIL → restored known-good data; canonical idle (NOT promoted)", - flush=True) + print( + " quick FAIL → restored known-good data; canonical idle (NOT promoted)", + flush=True, + ) except Exception as e: # noqa: BLE001 dep_teardown_error = (dep_teardown_error or "") + f" | quick teardown/rollback: {e}" print(f"!! 
quick teardown/rollback error: {e}", flush=True) @@ -644,8 +726,10 @@ def run_quick(recipe: str, ref: str | None, head_ref: str | None, repo_local: st os.remove(skipfile) print("\n===== RUN SUMMARY =====", flush=True) - print(f"mode = quick (LOWER-CONFIDENCE; opt-in; does not gate merge)") - print(f"canonical = {domain} known-good = {reg.get('version')} (UNCHANGED; quick never promotes)") + print("mode = quick (LOWER-CONFIDENCE; opt-in; does not gate merge)") + print( + f"canonical = {domain} known-good = {reg.get('version')} (UNCHANGED; quick never promotes)" + ) if rolled_back: print("rolled-back = yes (restored last-known-good snapshot)") for op in ("upgrade", "custom"): @@ -659,8 +743,11 @@ def run_quick(recipe: str, ref: str | None, head_ref: str | None, repo_local: st if any(v == "fail" for v in results.values()) or not warm_ok: overall = 1 if sso_unverified: - print(f"!! DEPS={declared} but setup_custom_tests failed and {requires_deps_skipped} " - "requires_deps SKIPPED — SSO NOT verified (F2-11)", file=sys.stderr) + print( + f"!! DEPS={declared} but setup_custom_tests failed and {requires_deps_skipped} " + "requires_deps SKIPPED — SSO NOT verified (F2-11)", + file=sys.stderr, + ) overall = 1 if dep_teardown_error: print(f"!! teardown leaked/erred: {dep_teardown_error}", file=sys.stderr) @@ -695,16 +782,31 @@ def promote_canonical(recipe: str, head_ref: str | None) -> None: meta = _load_meta(recipe) # The cold run's deploy-count was already asserted + the countfile removed; don't perturb it. 
os.environ.pop("CCCI_DEPLOY_COUNT_FILE", None) - print(f"\n===== WC5 promote-on-green-cold: (re)seed canonical {recipe} @ {latest} =====", flush=True) - lifecycle.deploy_app(recipe, domain, version=latest, secrets=True, - deploy_timeout=int(meta.get("DEPLOY_TIMEOUT", 900))) - lifecycle.wait_healthy(domain, ok_codes=tuple(meta["HEALTH_OK"]), path=meta["HEALTH_PATH"], - deploy_timeout=meta["DEPLOY_TIMEOUT"], http_timeout=meta["HTTP_TIMEOUT"]) + print( + f"\n===== WC5 promote-on-green-cold: (re)seed canonical {recipe} @ {latest} =====", + flush=True, + ) + lifecycle.deploy_app( + recipe, + domain, + version=latest, + secrets=True, + deploy_timeout=int(meta.get("DEPLOY_TIMEOUT", 900)), + ) + lifecycle.wait_healthy( + domain, + ok_codes=tuple(meta["HEALTH_OK"]), + path=meta["HEALTH_PATH"], + deploy_timeout=meta["DEPLOY_TIMEOUT"], + http_timeout=meta["HTTP_TIMEOUT"], + ) abra.undeploy(domain) _wait_undeployed(domain) canonical.seed_canonical(recipe, latest, commit=head_ref) - print(f"WC5 promote: canonical {recipe} advanced to known-good {latest} (idle, volume retained)", - flush=True) + print( + f"WC5 promote: canonical {recipe} advanced to known-good {latest} (idle, volume retained)", + flush=True, + ) def main() -> int: @@ -750,7 +852,11 @@ def main() -> int: # newest published tag, where the correct base is [-1] (the newest published), not [-2]. The # override must be an exact published version tag (deployed as a pinned base). (Adversary §7.1.) 
want_upgrade = "upgrade" in stages - prev = (meta.get("UPGRADE_BASE_VERSION") or lifecycle.previous_version(recipe)) if want_upgrade else None + prev = ( + (meta.get("UPGRADE_BASE_VERSION") or lifecycle.previous_version(recipe)) + if want_upgrade + else None + ) base = prev or target backup_cap = generic.backup_capable(recipe, meta) hook = discovery.install_steps(recipe, repo_local) @@ -761,6 +867,15 @@ def main() -> int: f.write("0") os.environ["CCCI_DEPLOY_COUNT_FILE"] = countfile + # Phase 3 (R1/R3): per-run artifact dir + JUnit dir. The tiers emit JUnit per file and append a + # {tier,source,file,rc,junit} record; after the run we assemble results.json (per-stage/per-test + + # level) into the artifact dir. Best-effort — never changes the verdict (R7). + run_artifact_dir = os.path.join(results_mod.runs_dir(), results_mod.run_id()) + junit_dir = os.path.join(run_artifact_dir, "junit") + records: list[dict] = [] + with contextlib.suppress(OSError): + os.makedirs(junit_dir, exist_ok=True) + # Run-scoped op state (HC3): the orchestrator records op results (pre-upgrade identity, backup # snapshot_id) here for the assertion tiers (generic + overlay) to read via generic.op_state(). statefile = os.path.join(tempfile.gettempdir(), f"ccci-opstate-{domain}.json") @@ -805,14 +920,23 @@ def main() -> int: # failure we mark deps-not-ready but STILL deploy the recipe alone (install_steps.sh no-ops # on an empty deps file) so the generic tiers run; the OIDC custom test then skips → F2-11. 
---- if oidc_at_install: - print(f"\n===== install-time OIDC: provisioning deps {declared} BEFORE deploy =====", flush=True) + print( + f"\n===== install-time OIDC: provisioning deps {declared} BEFORE deploy =====", + flush=True, + ) try: deps_state = _provision_deps(recipe, domain, ref, declared) - print(" install-time OIDC: deps provisioned; install_steps.sh will wire OIDC env", flush=True) + print( + " install-time OIDC: deps provisioned; install_steps.sh will wire OIDC env", + flush=True, + ) except Exception as e: # noqa: BLE001 — isolated; recipe still deploys, OIDC test skips deps_ready = False deps_not_ready_reason = _scrub(str(e))[:300] - print(f"!! install-time dep provisioning failed (deps-not-ready): {deps_not_ready_reason}", flush=True) + print( + f"!! install-time dep provisioning failed (deps-not-ready): {deps_not_ready_reason}", + flush=True, + ) # ---- deploy RECIPE FIRST, alone (no deps yet — generic tiers run recipe-only) ---- try: @@ -842,7 +966,17 @@ def main() -> int: # ---- INSTALL tier (always; additive generic + overlay, no op) ---- if "install" in stages: results["install"] = ( - run_lifecycle_tier(recipe, "install", repo_local, domain, meta, head_ref, op_state) + run_lifecycle_tier( + recipe, + "install", + repo_local, + domain, + meta, + head_ref, + op_state, + records=records, + junit_dir=junit_dir, + ) if deploy_ok else "fail" ) @@ -852,7 +986,15 @@ def main() -> int: if "upgrade" in stages: results["upgrade"] = ( run_lifecycle_tier( - recipe, "upgrade", repo_local, domain, meta, head_ref, op_state + recipe, + "upgrade", + repo_local, + domain, + meta, + head_ref, + op_state, + records=records, + junit_dir=junit_dir, ) if prev else "skip" # only one published version → nothing to upgrade from @@ -861,7 +1003,15 @@ def main() -> int: if "backup" in stages: results["backup"] = ( run_lifecycle_tier( - recipe, "backup", repo_local, domain, meta, head_ref, op_state + recipe, + "backup", + repo_local, + domain, + meta, + head_ref, + 
op_state, + records=records, + junit_dir=junit_dir, ) if backup_cap else "skip" @@ -869,7 +1019,15 @@ def main() -> int: if "restore" in stages: results["restore"] = ( run_lifecycle_tier( - recipe, "restore", repo_local, domain, meta, head_ref, op_state + recipe, + "restore", + repo_local, + domain, + meta, + head_ref, + op_state, + records=records, + junit_dir=junit_dir, ) if backup_cap else "skip" @@ -916,7 +1074,9 @@ def main() -> int: # tests when CCCI_DEPS_READY=0. os.environ["CCCI_DEPS_READY"] = "1" if deps_ready else "0" os.environ["CCCI_DEPS_NOT_READY_REASON"] = deps_not_ready_reason - results["custom"] = run_custom(recipe, repo_local, domain) + results["custom"] = run_custom( + recipe, repo_local, domain, records=records, junit_dir=junit_dir + ) else: # install failed → the shared deployment is dead; remaining tiers cannot run on it. for op in ("upgrade", "backup", "restore", "custom"): @@ -945,7 +1105,10 @@ def main() -> int: from harness import sso sso.delete_keycloak_realm(e["domain"], e["realm"]) - print(f" dep: deleted per-run realm {e['realm']} on warm {e['recipe']}", flush=True) + print( + f" dep: deleted per-run realm {e['realm']} on warm {e['recipe']}", + flush=True, + ) except Exception as ex: # noqa: BLE001 — a leaked realm is a teardown failure (§9) dep_teardown_error = f"warm realm delete failed for {e.get('realm')}: {ex}" print(f"!! {dep_teardown_error}", flush=True) @@ -980,13 +1143,16 @@ def main() -> int: # WC1: a live-warm dep (keycloak) is NOT deployed by the run — it only gets a per-run realm — so # warm deps contribute 0. So expected = 1 + (number of COLD deps that actually got deployed). 
_dep_entries = deps_state.values() if isinstance(deps_state, dict) else (deps_state or []) - deps_deployed_count = sum(1 for e in _dep_entries if not (isinstance(e, dict) and e.get("warm"))) + deps_deployed_count = sum( + 1 for e in _dep_entries if not (isinstance(e, dict) and e.get("warm")) + ) expected_deploy_count = 1 + deps_deployed_count print("\n===== RUN SUMMARY =====", flush=True) print(f"deploy-count = {deploy_count} (expect {expected_deploy_count})") if deps_state: deps_list_for_summary = ( - list(deps_state.keys()) if isinstance(deps_state, dict) + list(deps_state.keys()) + if isinstance(deps_state, dict) else [d.get("recipe", "?") for d in deps_state] ) print(f" deps deployed: {deps_list_for_summary}") @@ -1029,6 +1195,47 @@ def main() -> int: print("no tiers ran", file=sys.stderr) return 1 + # ---- Phase 3 (R1/R3): assemble results.json (per-stage/per-test + computed level). Best-effort: + # a failure here NEVER changes `overall` (R7 — cosmetics never block the pipeline). ---- + try: + sso_unverified = sso_dep_unverified(declared, deps_ready, requires_deps_skipped) + clean_teardown = (deploy_count == expected_deploy_count) and not dep_teardown_error + data = results_mod.build_results( + recipe=recipe, + version=target or (head_ref[:12] if head_ref else None), + pr=os.environ.get("PR", "0"), + ref=ref, + records=records, + results=results, + backup_capable=backup_cap, + declared=declared, + deps_ready=deps_ready, + sso_unverified=sso_unverified, + clean_teardown=clean_teardown, + no_secret_leak=True, # narrowed below by an actual scan of the serialised artifact + finished_ts=time.time(), + ) + # Real (if narrow) leak check: no known infra-secret value may appear in the artifact (R7). + blob = json.dumps(data) + leaked = any(v in blob for v in _REDACT) + data["flags"]["no_secret_leak"] = not leaked + if leaked: + print( + "!! 
results.json leak-scan: a known secret value appeared — scrubbing flag set False", + file=sys.stderr, + ) + path = results_mod.write_results(data) + print( + f"results.json written: {path} (level={data['level']}" + f"{' — ' + data['level_cap_reason'] if data['level_cap_reason'] else ''})", + flush=True, + ) + except Exception as e: # noqa: BLE001 — results assembly is cosmetic; never fail a run on it (R7) + print( + f"!! results.json assembly failed (non-fatal, verdict unaffected): {_scrub(str(e))}", + file=sys.stderr, + ) + # WC5 promote-on-green-cold: a GREEN COLD run on LATEST (no PR head) of an enrolled # (WARM_CANONICAL) recipe advances/seeds the canonical. ONLY cold-on-latest advances it (a PR # `!testme` carries REF and must NOT promote; `--quick` never promotes — handled in run_quick). @@ -1037,8 +1244,10 @@ def main() -> int: try: promote_canonical(recipe, head_ref) except Exception as e: # noqa: BLE001 — promote is a post-green bonus; never fail a green run - print(f"!! WC5 promote failed (non-fatal; known-good unchanged): {_scrub(str(e))}", - flush=True) + print( + f"!! 
WC5 promote failed (non-fatal; known-good unchanged): {_scrub(str(e))}", + flush=True, + ) return overall diff --git a/tests/unit/test_level.py b/tests/unit/test_level.py index 45fc198..bb8994a 100644 --- a/tests/unit/test_level.py +++ b/tests/unit/test_level.py @@ -14,8 +14,14 @@ sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "runner") from harness import level as L # noqa: E402 -def _rungs(install="pass", upgrade="pass", backup_restore="pass", functional="pass", - integration="pass", recipe_local="pass"): +def _rungs( + install="pass", + upgrade="pass", + backup_restore="pass", + functional="pass", + integration="pass", + recipe_local="pass", +): return { "install": install, "upgrade": upgrade, @@ -28,6 +34,7 @@ def _rungs(install="pass", upgrade="pass", backup_restore="pass", functional="pa # ---- the U0 gate: L4-pass and L2-cap ---- + def test_full_clean_climb_to_L6(): lvl, reason = L.compute_level(_rungs()) assert lvl == 6 @@ -50,6 +57,7 @@ def test_fails_at_L2_capped_at_L1(): # ---- L0 / install ---- + def test_install_fail_is_L0(): lvl, reason = L.compute_level(_rungs(install="fail")) assert lvl == 0 @@ -58,6 +66,7 @@ def test_install_fail_is_L0(): # ---- gap-caps semantics: a higher pass can't rescue a lower gap ---- + def test_higher_pass_does_not_rescue_lower_na(): # backup/restore N/A (stateless app) caps at L2 even though functional would pass. 
lvl, reason = L.compute_level(_rungs(backup_restore="na", functional="pass", integration="na")) @@ -94,6 +103,7 @@ def test_functional_fail_caps_at_L3(): # ---- input validation ---- + def test_invalid_status_raises(): bad = _rungs() bad["functional"] = "passed" # not in the vocabulary @@ -106,6 +116,7 @@ def test_invalid_status_raises(): # ---- helpers: backup_restore_status ---- + def test_backup_restore_status_pass(): assert L.backup_restore_status("pass", "pass", True) == "pass" @@ -126,6 +137,7 @@ def test_backup_restore_partial_is_na(): # ---- helpers: tier_to_rung ---- + def test_tier_to_rung_mapping(): assert L.tier_to_rung("pass") == "pass" assert L.tier_to_rung("fail") == "fail" diff --git a/tests/unit/test_results.py b/tests/unit/test_results.py new file mode 100644 index 0000000..d8bdd51 --- /dev/null +++ b/tests/unit/test_results.py @@ -0,0 +1,265 @@ +"""Unit tests for Phase-3 results assembly (harness.results), plan-phase3-results-ux.md §4.2 / R1/R3. + +Covers JUnit parsing, stage roll-up, the tier→rung derivation (the documented mapping the level +depends on), and full results.json assembly incl. the U0 gate cases. Pure / tmp-file only. 
Run cold: + cc-ci-run -m pytest tests/unit/test_results.py -q +""" + +from __future__ import annotations + +import json +import os +import sys + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "runner")) +from harness import results as R # noqa: E402 + +JUNIT_PASS = """ + + + +""" + +JUNIT_MIXED = """ + + +trace + +""" + + +def _write(tmp_path, name, content): + p = tmp_path / name + p.write_text(content) + return str(p) + + +def test_parse_junit_pass(tmp_path): + rows = R.parse_junit(_write(tmp_path, "p.xml", JUNIT_PASS)) + assert len(rows) == 2 + assert {r["status"] for r in rows} == {"pass"} + assert rows[1]["ms"] == 1500 + + +def test_parse_junit_mixed(tmp_path): + rows = R.parse_junit(_write(tmp_path, "m.xml", JUNIT_MIXED)) + by = {r["name"]: r["status"] for r in rows} + assert by == {"test_ok": "pass", "test_bad": "fail", "test_skipped": "skip"} + + +def test_parse_junit_missing_file_is_empty(): + assert R.parse_junit("/nonexistent/x.xml") == [] + + +def test_collect_stages_orders_and_rolls_up(tmp_path): + recs = [ + { + "tier": "install", + "source": "generic", + "file": "g/test_install.py", + "rc": 0, + "junit": _write(tmp_path, "i.xml", JUNIT_PASS), + }, + { + "tier": "custom", + "source": "cc-ci", + "file": "c/test_x.py", + "rc": 1, + "junit": _write(tmp_path, "c.xml", JUNIT_MIXED), + }, + ] + stages = R.collect_stages(recs) + assert [s["name"] for s in stages] == ["install", "custom"] # install before custom + assert stages[0]["status"] == "pass" + assert stages[1]["status"] == "fail" # the failure in JUNIT_MIXED + assert len(stages[1]["tests"]) == 3 + + +def test_collect_stages_synthesizes_when_no_junit(): + recs = [ + { + "tier": "install", + "source": "generic", + "file": "g/test_install.py", + "rc": 1, + "junit": None, + } + ] + stages = R.collect_stages(recs) + assert stages[0]["status"] == "fail" + assert len(stages[0]["tests"]) == 1 + + +# ---- derive_rungs: the documented mapping ---- + + +def _results(**kw): + base = { + 
"install": "pass", + "upgrade": "pass", + "backup": "pass", + "restore": "pass", + "custom": "pass", + } + base.update(kw) + return base + + +def test_derive_rungs_full_stateful_sso(): + rungs = R.derive_rungs( + _results(), + backup_capable=True, + declared=["keycloak"], + deps_ready=True, + sso_unverified=False, + has_custom=True, + has_repo_local=False, + repo_local_passed=False, + ) + assert rungs == { + "install": "pass", + "upgrade": "pass", + "backup_restore": "pass", + "functional": "pass", + "integration": "pass", + "recipe_local": "na", + } + + +def test_derive_rungs_no_sso_surface_is_integration_na(): + rungs = R.derive_rungs( + _results(), + backup_capable=True, + declared=[], + deps_ready=True, + sso_unverified=False, + has_custom=True, + has_repo_local=False, + repo_local_passed=False, + ) + assert rungs["integration"] == "na" + assert rungs["functional"] == "pass" + + +def test_derive_rungs_stateless_backup_na(): + rungs = R.derive_rungs( + _results(backup="skip", restore="skip", custom="skip"), + backup_capable=False, + declared=[], + deps_ready=True, + sso_unverified=False, + has_custom=False, + has_repo_local=False, + repo_local_passed=False, + ) + assert rungs["backup_restore"] == "na" + assert rungs["functional"] == "na" + + +def test_derive_rungs_sso_unverified_is_integration_fail(): + rungs = R.derive_rungs( + _results(), + backup_capable=True, + declared=["keycloak"], + deps_ready=False, + sso_unverified=True, + has_custom=True, + has_repo_local=False, + repo_local_passed=False, + ) + assert rungs["integration"] == "fail" + + +def test_derive_rungs_repo_local_pass(): + rungs = R.derive_rungs( + _results(), + backup_capable=True, + declared=[], + deps_ready=True, + sso_unverified=False, + has_custom=True, + has_repo_local=True, + repo_local_passed=True, + ) + assert rungs["recipe_local"] == "pass" + + +# ---- build_results: end-to-end incl level + flags ---- + + +def test_build_results_level_and_flags(tmp_path): + recs = [ + { + "tier": 
"install", + "source": "generic", + "file": "g/test_install.py", + "rc": 0, + "junit": _write(tmp_path, "i.xml", JUNIT_PASS), + }, + { + "tier": "custom", + "source": "cc-ci", + "file": "c/test_func.py", + "rc": 0, + "junit": _write(tmp_path, "c.xml", JUNIT_PASS), + }, + ] + data = R.build_results( + recipe="hedgedoc", + version="1.2.3", + pr="7", + ref="deadbeefcafe0000", + records=recs, + results=_results(), + backup_capable=True, + declared=[], + deps_ready=True, + sso_unverified=False, + clean_teardown=True, + no_secret_leak=True, + finished_ts=1234.0, + ) + # stateful, functional pass, no SSO surface, no repo-local → caps at L4 + assert data["level"] == 4 + assert "L5" in data["level_cap_reason"] + assert data["recipe"] == "hedgedoc" + assert data["ref"] == "deadbeefcafe" + assert data["flags"] == {"clean_teardown": True, "no_secret_leak": True} + assert [s["name"] for s in data["stages"]] == ["install", "custom"] + + +def test_build_results_capped_at_L1_on_upgrade_fail(tmp_path): + recs = [ + { + "tier": "install", + "source": "generic", + "file": "g/test_install.py", + "rc": 0, + "junit": _write(tmp_path, "i.xml", JUNIT_PASS), + } + ] + data = R.build_results( + recipe="x", + version=None, + pr="0", + ref=None, + records=recs, + results=_results(upgrade="fail"), + backup_capable=True, + declared=[], + deps_ready=True, + sso_unverified=False, + clean_teardown=True, + no_secret_leak=True, + finished_ts=0.0, + ) + assert data["level"] == 1 + assert "L2" in data["level_cap_reason"] + + +def test_write_results_roundtrip(tmp_path): + data = {"run_id": "42", "level": 3, "stages": []} + path = R.write_results(data, runs_dir_override=str(tmp_path)) + assert path.endswith("/42/results.json") + with open(path) as f: + assert json.load(f)["level"] == 3