feat(3 U0.2+U0.3): per-test results + results.json with computed level

harness/results.py: JUnit-XML parsing (stdlib) → per-stage/per-test rows; derive_rungs (documented
tier+deps/SSO → rung mapping); build_results assembles results.json {recipe,version,pr,ref,run_id,
stages[],level,level_cap_reason,rungs,flags{clean_teardown,no_secret_leak},screenshot,summary_card};
write_results (atomic). run_recipe_ci.py: tiers emit --junitxml + append {tier,source,file,rc,junit}
records; main() assembles+writes results.json wrapped so a failure NEVER changes the verdict (R7),
incl. a narrow leak-scan of the serialised artifact. 17 new unit tests (test_results.py).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
autonomic-bot
2026-05-31 05:55:52 +00:00
parent df54693449
commit 52e5d210d8
5 changed files with 819 additions and 63 deletions

View File

@ -66,7 +66,9 @@ def compute_level(rungs: dict[str, str]) -> tuple[int, str]:
for name in RUNGS: for name in RUNGS:
st = rungs.get(name) st = rungs.get(name)
if st not in VALID: if st not in VALID:
raise ValueError(f"rung {name!r} has invalid status {st!r} (expect one of {sorted(VALID)})") raise ValueError(
f"rung {name!r} has invalid status {st!r} (expect one of {sorted(VALID)})"
)
# L0: install did not pass. # L0: install did not pass.
if rungs["install"] != "pass": if rungs["install"] != "pass":

268
runner/harness/results.py Normal file
View File

@ -0,0 +1,268 @@
"""Phase 3 — structured run results + results.json (plan-phase3-results-ux.md §4.2, R1/R3).
Turns a run's per-tier pytest outcomes into a single `results.json` artifact carrying, per the plan:
{ recipe, version, pr, ref, run_id, finished, stages:[{name,status,tests:[{name,status,ms}]}],
level, level_cap_reason, rungs, flags:{clean_teardown,no_secret_leak}, screenshot, summary_card }
The per-test breakdown comes from JUnit XML emitted by each tier's pytest invocation (`--junitxml`),
parsed here with the stdlib (no new dep). The integer **level** is computed by harness.level from a
rung-status dict derived here (`derive_rungs`) from the tier results + deps/SSO signals the
orchestrator holds; that mapping is documented in DECISIONS.md (Phase 3).
This module is import-pure (no side effects at import). `write_results` is the only writer; the
orchestrator calls the build/write path inside a try/except so a results failure NEVER changes the
run's exit code (R7 — cosmetics never block the pipeline).
"""
from __future__ import annotations
import json
import os
import xml.etree.ElementTree as ET
from . import level as level_mod
# Where per-run artifacts (results.json, screenshot, summary card) are written on the runner host.
# The dashboard serves these read-only at /runs/<run_id>/... (U0.4). Overridable for tests.
RUNS_DIR_DEFAULT = "/var/lib/cc-ci-runs"


def runs_dir() -> str:
    """Return the root directory under which each run gets its own artifact sub-directory.

    The value comes from the ``CCCI_RUNS_DIR`` environment variable. An unset, empty or
    whitespace-only value falls back to ``RUNS_DIR_DEFAULT`` — returning "" here would make every
    later ``os.path.join(runs_dir(), ...)`` silently resolve relative to the current directory.
    A non-blank override is returned verbatim.
    """
    configured = os.environ.get("CCCI_RUNS_DIR", "")
    return configured if configured.strip() else RUNS_DIR_DEFAULT
def run_id() -> str:
    """Stable id for this run, used as the name of the run's artifact directory.

    Candidates are tried in order: ``DRONE_BUILD_NUMBER`` (the Drone build number — what the PR
    comment + dashboard link to), then ``CCCI_APP_DOMAIN`` (the unique run domain, so a hand-run
    still gets a distinct artifact dir), then ``CCCI_RUN_ID``. Every candidate is stripped and a
    blank one is skipped, so a whitespace-only variable can never become the id. Returns
    ``"manual"`` when none of them is usable.
    """
    for var in ("DRONE_BUILD_NUMBER", "CCCI_APP_DOMAIN", "CCCI_RUN_ID"):
        value = (os.environ.get(var) or "").strip()
        if value:
            return value
    return "manual"
def junit_file(junit_dir: str, tier: str, source: str, path: str) -> str:
    """Build the JUnit XML path for one test file of one tier/source, located in `junit_dir`.

    The file name is ``<tier>__<source>__<stem>.xml`` where ``<stem>`` is the basename of `path`
    without its extension; any path separator left in the name is replaced by ``_`` so the result
    always sits directly inside `junit_dir`.

    NOTE(review): only the basename of `path` takes part in the name, so two test files sharing a
    basename within the same (tier, source) would map to the same XML path — confirm discovery
    never yields such a pair.
    """
    stem, _ext = os.path.splitext(os.path.basename(path))
    name = "__".join((tier, source, stem))
    for sep in ("/", os.sep):
        name = name.replace(sep, "_")
    return os.path.join(junit_dir, f"{name}.xml")
def _case_status(case: ET.Element) -> tuple[str, str]:
    """(status, message) for one <testcase>. JUnit: child <failure>/<error>/<skipped>, else passed.

    The children are probed in the order error → failure → skipped and the first one present wins.
    The message is that child's ``message`` attribute, stripped ("" when absent).
    """
    for tag, st in (("error", "error"), ("failure", "fail"), ("skipped", "skip")):
        el = case.find(tag)
        if el is not None:
            return st, (el.get("message") or "").strip()
    return "pass", ""


def parse_junit(xml_path: str) -> list[dict]:
    """Parse one JUnit XML file → list of per-test rows {name, classname, status, ms, message}.

    Tolerant by contract: a missing/unreadable/corrupt file yields [], and a ``time`` attribute
    that cannot be turned into whole milliseconds (empty, non-numeric, NaN, ±inf) yields ms=0
    instead of raising. Missing ``name``/``classname`` attributes default to "?" and "".
    """
    try:
        tree = ET.parse(xml_path)
    except (OSError, ET.ParseError):
        return []
    rows: list[dict] = []
    for case in tree.iter("testcase"):
        status, message = _case_status(case)
        try:
            ms = int(round(float(case.get("time", "0")) * 1000))
        except (TypeError, ValueError, OverflowError):
            # ValueError: unparsable text or NaN; OverflowError: round() of an infinite duration.
            ms = 0
        rows.append(
            {
                "name": case.get("name", "?"),
                "classname": case.get("classname", ""),
                "status": status,
                "ms": ms,
                "message": message,
            }
        )
    return rows
def _stage_status(tests: list[dict]) -> str:
    """Roll per-test rows up to a stage status. Any error/fail → fail; else if any pass → pass;
    else (all skipped / empty) → skip."""
    sts = {t["status"] for t in tests}
    if "fail" in sts or "error" in sts:
        return "fail"
    if "pass" in sts:
        return "pass"
    return "skip"


def collect_stages(records: list[dict]) -> list[dict]:
    """Group per-file run records into ordered stage dicts with their per-test breakdown.

    `records` items: {tier, source, file, rc, junit}. Tests are read from each file's JUnit XML.
    Two situations are patched up from the exit code so a stage never looks healthier than the run:

    * the file produced no JUnit rows (e.g. pytest crashed before writing) → a single synthetic
      row is derived from its exit code (rc!=0 → fail);
    * JUnit rows exist and none of them failed, yet rc!=0 (e.g. an interrupted or internally
      errored pytest session) → a synthetic fail row is appended.

    Every row is tagged with the record's `source`. Stages are returned in the fixed order
    install, upgrade, backup, restore, custom; a tier with no record is omitted, and a record
    whose tier is not in that list does not appear in the output.
    """
    order = ("install", "upgrade", "backup", "restore", "custom")
    by_tier: dict[str, list[dict]] = {}
    for rec in records:
        source = rec.get("source", "")
        rc = rec.get("rc", 1)
        junit = rec.get("junit")
        base = os.path.basename(rec.get("file", "?"))
        rows = parse_junit(junit) if junit else []
        if not rows:
            # No JUnit rows — synthesize from the exit code so a crash isn't shown as "no tests".
            rows = [
                {
                    "name": base,
                    "classname": source,
                    "status": "pass" if rc == 0 else "fail",
                    "ms": 0,
                    "message": "" if rc == 0 else "tier produced no JUnit; exit!=0",
                }
            ]
        elif rc != 0 and _stage_status(rows) != "fail":
            # The process failed but no parsed test did: surface the failure explicitly.
            rows.append(
                {
                    "name": base,
                    "classname": source,
                    "status": "fail",
                    "ms": 0,
                    "message": "exit!=0 but JUnit reports no failing test",
                }
            )
        for row in rows:
            row["source"] = source
        by_tier.setdefault(rec["tier"], []).extend(rows)
    return [
        {"name": tier, "status": _stage_status(by_tier[tier]), "tests": by_tier[tier]}
        for tier in order
        if tier in by_tier
    ]
def _has_repo_local(records: list[dict]) -> bool:
    """True when at least one run record has ``source == "repo-local"``."""
    for rec in records:
        if rec.get("source") == "repo-local":
            return True
    return False
def _repo_local_passed(records: list[dict]) -> bool:
    """True when there is at least one ``repo-local`` record and every such record exited 0.

    A repo-local record without an ``rc`` key counts as failed; records from other sources are
    ignored.
    """
    seen_any = False
    for rec in records:
        if rec.get("source") != "repo-local":
            continue
        if rec.get("rc", 1) != 0:
            return False
        seen_any = True
    return seen_any
def derive_rungs(
    results: dict[str, str],
    *,
    backup_capable: bool,
    declared: list[str] | None,
    deps_ready: bool,
    sso_unverified: bool,
    has_custom: bool,
    has_repo_local: bool,
    repo_local_passed: bool,
) -> dict[str, str]:
    """Translate the orchestrator's tier results + deps/SSO signals into the rung-status dict
    harness.level consumes. Documented in DECISIONS.md (Phase 3). Conservative by design — never
    reports a rung 'pass' it can't substantiate (cardinal guardrail: presentation never inflates).

    L1 install    : install tier pass.
    L2 upgrade    : upgrade tier (skip → N/A: only one published version).
    L3 backup/res : backup AND restore tiers pass (N/A if not backup-capable).
    L4 functional : the recipe-specific functional (non-deps) tests pass — the custom tier, minus
                    its SSO/integration tests. N/A if the recipe has no custom tests at all.
                    Only an explicit custom == "pass" yields 'pass'; any other non-skip status
                    (including an unrecognised one) yields 'fail'.
    L5 integration: SSO/OIDC + cross-app. Applies ONLY if the recipe declares deps (else N/A — the
                    "no integration surface caps at L4" rule, §4.1). pass iff deps wired
                    (deps_ready) and not sso_unverified and the custom tier passed; N/A when deps
                    are declared but the custom tier did not produce pass/fail.
    L6 recipe-loc : the recipe repo's own tests/ (repo-local source) ran and passed (N/A if none).

    Returns a dict with the keys install, upgrade, backup_restore, functional, integration and
    recipe_local. The first three values come from harness.level helpers; the last three are one
    of 'pass' / 'fail' / 'na'.
    """
    declared = declared or []
    custom = results.get("custom")

    rungs: dict[str, str] = {}
    rungs["install"] = level_mod.tier_to_rung(results.get("install"))
    rungs["upgrade"] = level_mod.tier_to_rung(results.get("upgrade"))
    rungs["backup_restore"] = level_mod.backup_restore_status(
        results.get("backup"), results.get("restore"), backup_capable
    )

    # Functional rung (L4): the non-deps custom tests.
    if not has_custom or custom is None or custom == "skip":
        rungs["functional"] = "na"
    elif custom == "pass":
        rungs["functional"] = "pass"
    else:
        # custom == "fail" (or a status we do not recognise). With declared deps we cannot cheaply
        # tell functional-vs-SSO apart, so conservatively fail the functional rung (caps at L3) —
        # never inflate.
        rungs["functional"] = "fail"

    # Integration rung (L5): only recipes with an SSO/integration surface (declared deps) can climb.
    if not declared:
        rungs["integration"] = "na"
    elif sso_unverified or not deps_ready or custom == "fail":
        # SSO not wired/verified, or a custom test failed → integration not verified.
        rungs["integration"] = "fail"
    elif custom == "pass":
        rungs["integration"] = "pass"
    else:
        # declared deps but no custom tests ran — can't claim integration verified
        rungs["integration"] = "na"

    # Recipe-local rung (L6).
    if not has_repo_local:
        rungs["recipe_local"] = "na"
    else:
        rungs["recipe_local"] = "pass" if repo_local_passed else "fail"
    return rungs
def build_results(
    *,
    recipe: str,
    version: str | None,
    pr: str,
    ref: str | None,
    records: list[dict],
    results: dict[str, str],
    backup_capable: bool,
    declared: list[str] | None,
    deps_ready: bool,
    sso_unverified: bool,
    clean_teardown: bool,
    no_secret_leak: bool,
    finished_ts: float | None,
    screenshot: str | None = None,
    summary_card: str | None = None,
) -> dict:
    """Build the in-memory results.json payload for one run; performs no file I/O.

    The per-stage/per-test breakdown is taken from `records`, the rung statuses from `results`
    plus the deps/SSO signals, and the level and its cap reason from harness.level. `finished_ts`
    is supplied by the caller rather than read from the clock, which keeps the output reproducible
    in unit tests. `pr` is stringified, `ref` is cut to its first 12 characters ("" when None) and
    the two flags are coerced to bool. The run id is read from the environment via `run_id()`.
    """
    stage_rows = collect_stages(records)
    rung_status = derive_rungs(
        results,
        backup_capable=backup_capable,
        declared=declared,
        deps_ready=deps_ready,
        sso_unverified=sso_unverified,
        has_custom=any(rec["tier"] == "custom" for rec in records),
        has_repo_local=_has_repo_local(records),
        repo_local_passed=_repo_local_passed(records),
    )
    level_value, cap_reason = level_mod.compute_level(rung_status)

    short_ref = ref[:12] if ref else ""
    flags = {
        "clean_teardown": bool(clean_teardown),
        "no_secret_leak": bool(no_secret_leak),
    }

    payload: dict = {"schema": 1, "run_id": run_id()}
    payload.update(recipe=recipe, version=version, pr=str(pr), ref=short_ref)
    payload["finished"] = finished_ts
    payload["level"] = level_value
    payload["level_cap_reason"] = cap_reason
    payload["rungs"] = rung_status
    payload["stages"] = stage_rows
    payload["results"] = results
    payload["flags"] = flags
    payload["screenshot"] = screenshot
    payload["summary_card"] = summary_card
    return payload
def write_results(data: dict, runs_dir_override: str | None = None) -> str:
    """Write results.json into the run's artifact dir; return its path. Creates the dir.

    The target is ``<runs dir>/<data["run_id"]>/results.json`` where the runs dir is
    `runs_dir_override` when given (non-empty), else `runs_dir()`. The JSON is first written to a
    sibling ``results.json.tmp`` and then moved into place with ``os.replace``, so a reader never
    sees a half-written results.json. If serialising or replacing fails, the temp file is removed
    again (best-effort) and the original exception propagates — no stale ``.tmp`` is left behind in
    the artifact directory.

    Raises KeyError if `data` has no "run_id", OSError on filesystem errors, and TypeError /
    ValueError from ``json.dump`` when `data` is not JSON-serialisable.
    """
    rd = runs_dir_override or runs_dir()
    out_dir = os.path.join(rd, data["run_id"])
    os.makedirs(out_dir, exist_ok=True)
    path = os.path.join(out_dir, "results.json")
    tmp = path + ".tmp"
    try:
        with open(tmp, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, sort_keys=True)
        os.replace(tmp, path)
    except BaseException:
        # Don't leave a partial temp file next to the real artifact; cleanup itself is best-effort.
        try:
            os.remove(tmp)
        except OSError:
            pass
        raise
    return path

View File

@ -49,6 +49,7 @@ from harness import ( # noqa: E402
generic, generic,
lifecycle, lifecycle,
naming, naming,
results as results_mod,
warm, warm,
warmsnap, warmsnap,
) )
@ -194,7 +195,15 @@ def _load_meta(recipe: str) -> dict:
ns: dict = {} ns: dict = {}
with open(path) as fh: with open(path) as fh:
exec(compile(fh.read(), path, "exec"), ns) # noqa: S102 (trusted, in-repo) exec(compile(fh.read(), path, "exec"), ns) # noqa: S102 (trusted, in-repo)
for k in list(meta) + ["BACKUP_CAPABLE", "SKIP_GENERIC", "OIDC_AT_INSTALL", "READY_PROBE", "UPGRADE_BASE_VERSION", "BACKUP_VERIFY", "UPGRADE_EXTRA_ENV"]: for k in list(meta) + [
"BACKUP_CAPABLE",
"SKIP_GENERIC",
"OIDC_AT_INSTALL",
"READY_PROBE",
"UPGRADE_BASE_VERSION",
"BACKUP_VERIFY",
"UPGRADE_EXTRA_ENV",
]:
if k in ns: if k in ns:
meta[k] = ns[k] meta[k] = ns[k]
return meta return meta
@ -240,7 +249,12 @@ def _run_pre_hook(recipe: str, op: str, repo_local: str | None, domain: str, met
def _perform_op( def _perform_op(
op: str, domain: str, recipe: str, head_ref: str | None, op_state: dict, deploy_timeout: int = 900, op: str,
domain: str,
recipe: str,
head_ref: str | None,
op_state: dict,
deploy_timeout: int = 900,
meta: dict | None = None, meta: dict | None = None,
) -> None: ) -> None:
"""Perform the single mutating op ONCE (the harness owns the op, HC3). install has no op. Records """Perform the single mutating op ONCE (the harness owns the op, HC3). install has no op. Records
@ -250,7 +264,9 @@ def _perform_op(
upgrade chaos redeploy so a heavy reconverge isn't SIGKILLed by the 900s default mid-wait; `meta` upgrade chaos redeploy so a heavy reconverge isn't SIGKILLed by the 900s default mid-wait; `meta`
lets the upgrade op own a recipe-aware convergence+health wait (F2-12, READY_PROBE).""" lets the upgrade op own a recipe-aware convergence+health wait (F2-12, READY_PROBE)."""
if op == "upgrade": if op == "upgrade":
before = generic.perform_upgrade(domain, recipe, head_ref, deploy_timeout=deploy_timeout, meta=meta) before = generic.perform_upgrade(
domain, recipe, head_ref, deploy_timeout=deploy_timeout, meta=meta
)
op_state["upgrade"] = {"before": before, "head_ref": head_ref} op_state["upgrade"] = {"before": before, "head_ref": head_ref}
elif op == "backup": elif op == "backup":
# Backup integrity + retry (F2-14b). A recipe may define BACKUP_VERIFY(domain) -> bool that # Backup integrity + retry (F2-14b). A recipe may define BACKUP_VERIFY(domain) -> bool that
@ -273,7 +289,10 @@ def _perform_op(
) )
snap = generic.perform_backup(domain) snap = generic.perform_backup(domain)
if callable(verify) and not verify(domain): if callable(verify) and not verify(domain):
print(f" !! backup-verify still FAILED after {attempt} attempts — backup is incomplete", flush=True) print(
f" !! backup-verify still FAILED after {attempt} attempts — backup is incomplete",
flush=True,
)
op_state["backup"] = {"snapshot_id": snap} op_state["backup"] = {"snapshot_id": snap}
elif op == "restore": elif op == "restore":
generic.perform_restore(domain) generic.perform_restore(domain)
@ -288,11 +307,17 @@ def run_lifecycle_tier(
meta: dict, meta: dict,
head_ref: str | None, head_ref: str | None,
op_state: dict, op_state: dict,
records: list[dict] | None = None,
junit_dir: str | None = None,
) -> str: ) -> str:
"""Additive lifecycle tier (HC3): seed (pre-op hook) → perform the op ONCE → run the generic """Additive lifecycle tier (HC3): seed (pre-op hook) → perform the op ONCE → run the generic
assertion file (unless opted out) AND the overlay assertion file, both against the shared post-op assertion file (unless opted out) AND the overlay assertion file, both against the shared post-op
deployment. The upgrade op redeploys the PR head (head_ref) via chaos (HC1). Returns deployment. The upgrade op redeploys the PR head (head_ref) via chaos (HC1). Returns
'pass' | 'fail' | 'skip'.""" 'pass' | 'fail' | 'skip'.
Phase 3 (R1/R3): when `records`/`junit_dir` are given, each pytest file is run with --junitxml and
a {tier,source,file,rc,junit} record appended, so the run can assemble per-stage/per-test
results.json + the level afterwards. Purely additive — does not change the verdict."""
overlay = discovery.resolve_overlay_op(recipe, op, repo_local) overlay = discovery.resolve_overlay_op(recipe, op, repo_local)
skip_gen = _skip_generic(op, meta) skip_gen = _skip_generic(op, meta)
files: list[tuple[str, str]] = [] files: list[tuple[str, str]] = []
@ -314,8 +339,13 @@ def run_lifecycle_tier(
try: try:
_run_pre_hook(recipe, op, repo_local, domain, meta) _run_pre_hook(recipe, op, repo_local, domain, meta)
_perform_op( _perform_op(
op, domain, recipe, head_ref, op_state, op,
deploy_timeout=int(meta.get("DEPLOY_TIMEOUT", 900)), meta=meta, domain,
recipe,
head_ref,
op_state,
deploy_timeout=int(meta.get("DEPLOY_TIMEOUT", 900)),
meta=meta,
) )
with open(os.environ["CCCI_OP_STATE_FILE"], "w") as f: with open(os.environ["CCCI_OP_STATE_FILE"], "w") as f:
json.dump(op_state, f) json.dump(op_state, f)
@ -328,9 +358,22 @@ def run_lifecycle_tier(
rc_all = 0 rc_all = 0
for source, path in files: for source, path in files:
print(f" assert ({source}): {os.path.relpath(path, ROOT)}", flush=True) print(f" assert ({source}): {os.path.relpath(path, ROOT)}", flush=True)
rc = run_redacted( cmd = [sys.executable, "-m", "pytest", "-v", "-rA", path]
[sys.executable, "-m", "pytest", "-v", "-rA", path], env=_tier_env(domain) jx = None
) if junit_dir is not None:
jx = results_mod.junit_file(junit_dir, op, source, path)
cmd.append(f"--junitxml={jx}")
rc = run_redacted(cmd, env=_tier_env(domain))
if records is not None:
records.append(
{
"tier": op,
"source": source,
"file": os.path.relpath(path, ROOT),
"rc": rc,
"junit": jx,
}
)
if rc != 0: if rc != 0:
rc_all = rc rc_all = rc
return "pass" if rc_all == 0 else "fail" return "pass" if rc_all == 0 else "fail"
@ -390,7 +433,9 @@ def _enrich_deps_with_sso(parent_recipe: str, parent_domain: str, deps_list) ->
return out return out
def _provision_deps(recipe: str, domain: str, ref: str | None, declared: list[str]) -> dict[str, dict]: def _provision_deps(
recipe: str, domain: str, ref: str | None, declared: list[str]
) -> dict[str, dict]:
"""Provision a run's declared deps and write `$CCCI_DEPS_FILE`; return the recipe→entry deps_state. """Provision a run's declared deps and write `$CCCI_DEPS_FILE`; return the recipe→entry deps_state.
Splits deps into live-warm (shared provider at a stable domain + a per-run realm) vs cold Splits deps into live-warm (shared provider at a stable domain + a per-run realm) vs cold
@ -438,7 +483,10 @@ def _run_setup_custom_tests_hook(recipe: str, domain: str, deps_file: str) -> No
if not os.path.isfile(path): if not os.path.isfile(path):
# No hook = recipe doesn't need post-deps wiring; deps are deployed + creds available # No hook = recipe doesn't need post-deps wiring; deps are deployed + creds available
# via deps_apps fixture as-is. # via deps_apps fixture as-is.
print(f" setup_custom_tests: no hook at {os.path.relpath(path, ROOT)} (deps creds ready in $CCCI_DEPS_FILE)", flush=True) print(
f" setup_custom_tests: no hook at {os.path.relpath(path, ROOT)} (deps creds ready in $CCCI_DEPS_FILE)",
flush=True,
)
return return
print(f" setup_custom_tests hook: {os.path.relpath(path, ROOT)}", flush=True) print(f" setup_custom_tests hook: {os.path.relpath(path, ROOT)}", flush=True)
rc = subprocess.run( rc = subprocess.run(
@ -452,9 +500,15 @@ def _run_setup_custom_tests_hook(recipe: str, domain: str, deps_file: str) -> No
) )
def run_custom(recipe: str, repo_local: str | None, domain: str) -> str: def run_custom(
recipe: str,
repo_local: str | None,
domain: str,
records: list[dict] | None = None,
junit_dir: str | None = None,
) -> str:
"""Run all discovered non-lifecycle custom test_*.py (both locations, additive). Returns """Run all discovered non-lifecycle custom test_*.py (both locations, additive). Returns
'skip' if none defined, else 'pass'/'fail'.""" 'skip' if none defined, else 'pass'/'fail'. Phase 3: emits JUnit + records when given."""
customs = discovery.custom_tests(recipe, repo_local) customs = discovery.custom_tests(recipe, repo_local)
if not customs: if not customs:
return "skip" return "skip"
@ -463,9 +517,14 @@ def run_custom(recipe: str, repo_local: str | None, domain: str) -> str:
for source, path in customs: for source, path in customs:
rel = os.path.relpath(path, ROOT) rel = os.path.relpath(path, ROOT)
print(f" custom ({source}): {rel}", flush=True) print(f" custom ({source}): {rel}", flush=True)
rc = run_redacted( cmd = [sys.executable, "-m", "pytest", "-v", "-rA", path]
[sys.executable, "-m", "pytest", "-v", "-rA", path], env=_tier_env(domain) jx = None
) if junit_dir is not None:
jx = results_mod.junit_file(junit_dir, "custom", source, path)
cmd.append(f"--junitxml={jx}")
rc = run_redacted(cmd, env=_tier_env(domain))
if records is not None:
records.append({"tier": "custom", "source": source, "file": rel, "rc": rc, "junit": jx})
if rc != 0: if rc != 0:
rc_all = rc rc_all = rc
return "pass" if rc_all == 0 else "fail" return "pass" if rc_all == 0 else "fail"
@ -482,8 +541,9 @@ def _wait_undeployed(domain: str, timeout: int = 120) -> None:
time.sleep(2) time.sleep(2)
def run_quick(recipe: str, ref: str | None, head_ref: str | None, repo_local: str | None, def run_quick(
meta: dict) -> int: recipe: str, ref: str | None, head_ref: str | None, repo_local: str | None, meta: dict
) -> int:
"""WC4 `--quick` opt-in fast lane (plan §2). Reattach the data-warm canonical (known-good volume) """WC4 `--quick` opt-in fast lane (plan §2). Reattach the data-warm canonical (known-good volume)
→ upgrade IN PLACE to the PR head (chaos) → assert generic UPGRADE (reconverge+moved+serving) + → upgrade IN PLACE to the PR head (chaos) → assert generic UPGRADE (reconverge+moved+serving) +
overlay + custom. PASS → undeploy-keep-volume, **known-good UNCHANGED (NEVER promote)**; FAIL → overlay + custom. PASS → undeploy-keep-volume, **known-good UNCHANGED (NEVER promote)**; FAIL →
@ -532,8 +592,11 @@ def run_quick(recipe: str, ref: str | None, head_ref: str | None, repo_local: st
try: try:
canonical.deploy_canonical(recipe, timeout=int(meta.get("DEPLOY_TIMEOUT", 900))) canonical.deploy_canonical(recipe, timeout=int(meta.get("DEPLOY_TIMEOUT", 900)))
lifecycle.wait_healthy( lifecycle.wait_healthy(
domain, ok_codes=tuple(meta["HEALTH_OK"]), path=meta["HEALTH_PATH"], domain,
deploy_timeout=meta["DEPLOY_TIMEOUT"], http_timeout=meta["HTTP_TIMEOUT"], ok_codes=tuple(meta["HEALTH_OK"]),
path=meta["HEALTH_PATH"],
deploy_timeout=meta["DEPLOY_TIMEOUT"],
http_timeout=meta["HTTP_TIMEOUT"],
) )
warm_ok = True warm_ok = True
except Exception as e: # noqa: BLE001 except Exception as e: # noqa: BLE001
@ -550,9 +613,11 @@ def run_quick(recipe: str, ref: str | None, head_ref: str | None, repo_local: st
(warm_deps if (wd and warm.is_warm_up(d, wd)) else cold_deps).append(d) (warm_deps if (wd and warm.is_warm_up(d, wd)) else cold_deps).append(d)
dep_metas = {d: _load_meta(d) for d in cold_deps} dep_metas = {d: _load_meta(d) for d in cold_deps}
deps_list = ( deps_list = (
deps_mod.deploy_deps(recipe, os.environ.get("PR", "0"), ref, cold_deps, deps_mod.deploy_deps(
meta_for=dep_metas) recipe, os.environ.get("PR", "0"), ref, cold_deps, meta_for=dep_metas
if cold_deps else [] )
if cold_deps
else []
) )
for d in warm_deps: for d in warm_deps:
wd = warm.warm_domain(d) wd = warm.warm_domain(d)
@ -565,8 +630,10 @@ def run_quick(recipe: str, ref: str | None, head_ref: str | None, repo_local: st
except Exception as e: # noqa: BLE001 except Exception as e: # noqa: BLE001
deps_ready = False deps_ready = False
deps_not_ready_reason = _scrub(str(e))[:300] deps_not_ready_reason = _scrub(str(e))[:300]
print(f"!! setup_custom_tests failed (deps-not-ready): {deps_not_ready_reason}", print(
flush=True) f"!! setup_custom_tests failed (deps-not-ready): {deps_not_ready_reason}",
flush=True,
)
# 3) UPGRADE to PR head (chaos) + assert (generic reconverge+moved+serving + overlay) # 3) UPGRADE to PR head (chaos) + assert (generic reconverge+moved+serving + overlay)
results["upgrade"] = run_lifecycle_tier( results["upgrade"] = run_lifecycle_tier(
@ -589,19 +656,28 @@ def run_quick(recipe: str, ref: str | None, head_ref: str | None, repo_local: st
pass pass
sso_unverified = sso_dep_unverified(declared, deps_ready, requires_deps_skipped) sso_unverified = sso_dep_unverified(declared, deps_ready, requires_deps_skipped)
passed = ( passed = (
warm_ok and bool(results) and all(v != "fail" for v in results.values()) warm_ok
and bool(results)
and all(v != "fail" for v in results.values())
and not sso_unverified and not sso_unverified
) )
# dep teardown: delete per-run warm realms; undeploy cold deps (mirrors cold) # dep teardown: delete per-run warm realms; undeploy cold deps (mirrors cold)
if deps_state: if deps_state:
ordered = ([deps_state[d] for d in declared if d in deps_state] ordered = (
if isinstance(deps_state, dict) else deps_state) [deps_state[d] for d in declared if d in deps_state]
if isinstance(deps_state, dict)
else deps_state
)
for e in [x for x in ordered if x.get("warm")]: for e in [x for x in ordered if x.get("warm")]:
try: try:
from harness import sso from harness import sso
sso.delete_keycloak_realm(e["domain"], e["realm"]) sso.delete_keycloak_realm(e["domain"], e["realm"])
print(f" dep: deleted per-run realm {e['realm']} on warm {e['recipe']}", flush=True) print(
f" dep: deleted per-run realm {e['realm']} on warm {e['recipe']}",
flush=True,
)
except Exception as ex: # noqa: BLE001 except Exception as ex: # noqa: BLE001
dep_teardown_error = f"warm realm delete failed for {e.get('realm')}: {ex}" dep_teardown_error = f"warm realm delete failed for {e.get('realm')}: {ex}"
print(f"!! {dep_teardown_error}", flush=True) print(f"!! {dep_teardown_error}", flush=True)
@ -617,10 +693,14 @@ def run_quick(recipe: str, ref: str | None, head_ref: str | None, repo_local: st
try: try:
if warm_ok and passed: if warm_ok and passed:
canonical.undeploy_keep_volume(recipe) canonical.undeploy_keep_volume(recipe)
print(" quick PASS → canonical undeployed, volume retained, known-good UNCHANGED", print(
flush=True) " quick PASS → canonical undeployed, volume retained, known-good UNCHANGED",
flush=True,
)
elif warm_ok: elif warm_ok:
print(" quick FAIL → rolling back canonical to last-known-good snapshot", flush=True) print(
" quick FAIL → rolling back canonical to last-known-good snapshot", flush=True
)
abra.undeploy(domain) abra.undeploy(domain)
_wait_undeployed(domain) _wait_undeployed(domain)
warmsnap.restore(recipe, domain) warmsnap.restore(recipe, domain)
@ -630,8 +710,10 @@ def run_quick(recipe: str, ref: str | None, head_ref: str | None, repo_local: st
abra.env_set(domain, "TYPE", f"{recipe}:{reg['version']}") abra.env_set(domain, "TYPE", f"{recipe}:{reg['version']}")
canonical._set_status(recipe, "idle") # noqa: SLF001 canonical._set_status(recipe, "idle") # noqa: SLF001
rolled_back = True rolled_back = True
print(" quick FAIL → restored known-good data; canonical idle (NOT promoted)", print(
flush=True) " quick FAIL → restored known-good data; canonical idle (NOT promoted)",
flush=True,
)
except Exception as e: # noqa: BLE001 except Exception as e: # noqa: BLE001
dep_teardown_error = (dep_teardown_error or "") + f" | quick teardown/rollback: {e}" dep_teardown_error = (dep_teardown_error or "") + f" | quick teardown/rollback: {e}"
print(f"!! quick teardown/rollback error: {e}", flush=True) print(f"!! quick teardown/rollback error: {e}", flush=True)
@ -644,8 +726,10 @@ def run_quick(recipe: str, ref: str | None, head_ref: str | None, repo_local: st
os.remove(skipfile) os.remove(skipfile)
print("\n===== RUN SUMMARY =====", flush=True) print("\n===== RUN SUMMARY =====", flush=True)
print(f"mode = quick (LOWER-CONFIDENCE; opt-in; does not gate merge)") print("mode = quick (LOWER-CONFIDENCE; opt-in; does not gate merge)")
print(f"canonical = {domain} known-good = {reg.get('version')} (UNCHANGED; quick never promotes)") print(
f"canonical = {domain} known-good = {reg.get('version')} (UNCHANGED; quick never promotes)"
)
if rolled_back: if rolled_back:
print("rolled-back = yes (restored last-known-good snapshot)") print("rolled-back = yes (restored last-known-good snapshot)")
for op in ("upgrade", "custom"): for op in ("upgrade", "custom"):
@ -659,8 +743,11 @@ def run_quick(recipe: str, ref: str | None, head_ref: str | None, repo_local: st
if any(v == "fail" for v in results.values()) or not warm_ok: if any(v == "fail" for v in results.values()) or not warm_ok:
overall = 1 overall = 1
if sso_unverified: if sso_unverified:
print(f"!! DEPS={declared} but setup_custom_tests failed and {requires_deps_skipped} " print(
"requires_deps SKIPPED — SSO NOT verified (F2-11)", file=sys.stderr) f"!! DEPS={declared} but setup_custom_tests failed and {requires_deps_skipped} "
"requires_deps SKIPPED — SSO NOT verified (F2-11)",
file=sys.stderr,
)
overall = 1 overall = 1
if dep_teardown_error: if dep_teardown_error:
print(f"!! teardown leaked/erred: {dep_teardown_error}", file=sys.stderr) print(f"!! teardown leaked/erred: {dep_teardown_error}", file=sys.stderr)
@ -695,16 +782,31 @@ def promote_canonical(recipe: str, head_ref: str | None) -> None:
meta = _load_meta(recipe) meta = _load_meta(recipe)
# The cold run's deploy-count was already asserted + the countfile removed; don't perturb it. # The cold run's deploy-count was already asserted + the countfile removed; don't perturb it.
os.environ.pop("CCCI_DEPLOY_COUNT_FILE", None) os.environ.pop("CCCI_DEPLOY_COUNT_FILE", None)
print(f"\n===== WC5 promote-on-green-cold: (re)seed canonical {recipe} @ {latest} =====", flush=True) print(
lifecycle.deploy_app(recipe, domain, version=latest, secrets=True, f"\n===== WC5 promote-on-green-cold: (re)seed canonical {recipe} @ {latest} =====",
deploy_timeout=int(meta.get("DEPLOY_TIMEOUT", 900))) flush=True,
lifecycle.wait_healthy(domain, ok_codes=tuple(meta["HEALTH_OK"]), path=meta["HEALTH_PATH"], )
deploy_timeout=meta["DEPLOY_TIMEOUT"], http_timeout=meta["HTTP_TIMEOUT"]) lifecycle.deploy_app(
recipe,
domain,
version=latest,
secrets=True,
deploy_timeout=int(meta.get("DEPLOY_TIMEOUT", 900)),
)
lifecycle.wait_healthy(
domain,
ok_codes=tuple(meta["HEALTH_OK"]),
path=meta["HEALTH_PATH"],
deploy_timeout=meta["DEPLOY_TIMEOUT"],
http_timeout=meta["HTTP_TIMEOUT"],
)
abra.undeploy(domain) abra.undeploy(domain)
_wait_undeployed(domain) _wait_undeployed(domain)
canonical.seed_canonical(recipe, latest, commit=head_ref) canonical.seed_canonical(recipe, latest, commit=head_ref)
print(f"WC5 promote: canonical {recipe} advanced to known-good {latest} (idle, volume retained)", print(
flush=True) f"WC5 promote: canonical {recipe} advanced to known-good {latest} (idle, volume retained)",
flush=True,
)
def main() -> int: def main() -> int:
@ -750,7 +852,11 @@ def main() -> int:
# newest published tag, where the correct base is [-1] (the newest published), not [-2]. The # newest published tag, where the correct base is [-1] (the newest published), not [-2]. The
# override must be an exact published version tag (deployed as a pinned base). (Adversary §7.1.) # override must be an exact published version tag (deployed as a pinned base). (Adversary §7.1.)
want_upgrade = "upgrade" in stages want_upgrade = "upgrade" in stages
prev = (meta.get("UPGRADE_BASE_VERSION") or lifecycle.previous_version(recipe)) if want_upgrade else None prev = (
(meta.get("UPGRADE_BASE_VERSION") or lifecycle.previous_version(recipe))
if want_upgrade
else None
)
base = prev or target base = prev or target
backup_cap = generic.backup_capable(recipe, meta) backup_cap = generic.backup_capable(recipe, meta)
hook = discovery.install_steps(recipe, repo_local) hook = discovery.install_steps(recipe, repo_local)
@ -761,6 +867,15 @@ def main() -> int:
f.write("0") f.write("0")
os.environ["CCCI_DEPLOY_COUNT_FILE"] = countfile os.environ["CCCI_DEPLOY_COUNT_FILE"] = countfile
# Phase 3 (R1/R3): per-run artifact dir + JUnit dir. The tiers emit JUnit per file and append a
# {tier,source,file,rc,junit} record; after the run we assemble results.json (per-stage/per-test +
# level) into the artifact dir. Best-effort — never changes the verdict (R7).
run_artifact_dir = os.path.join(results_mod.runs_dir(), results_mod.run_id())
junit_dir = os.path.join(run_artifact_dir, "junit")
records: list[dict] = []
with contextlib.suppress(OSError):
os.makedirs(junit_dir, exist_ok=True)
# Run-scoped op state (HC3): the orchestrator records op results (pre-upgrade identity, backup # Run-scoped op state (HC3): the orchestrator records op results (pre-upgrade identity, backup
# snapshot_id) here for the assertion tiers (generic + overlay) to read via generic.op_state(). # snapshot_id) here for the assertion tiers (generic + overlay) to read via generic.op_state().
statefile = os.path.join(tempfile.gettempdir(), f"ccci-opstate-{domain}.json") statefile = os.path.join(tempfile.gettempdir(), f"ccci-opstate-{domain}.json")
@ -805,14 +920,23 @@ def main() -> int:
# failure we mark deps-not-ready but STILL deploy the recipe alone (install_steps.sh no-ops # failure we mark deps-not-ready but STILL deploy the recipe alone (install_steps.sh no-ops
# on an empty deps file) so the generic tiers run; the OIDC custom test then skips → F2-11. ---- # on an empty deps file) so the generic tiers run; the OIDC custom test then skips → F2-11. ----
if oidc_at_install: if oidc_at_install:
print(f"\n===== install-time OIDC: provisioning deps {declared} BEFORE deploy =====", flush=True) print(
f"\n===== install-time OIDC: provisioning deps {declared} BEFORE deploy =====",
flush=True,
)
try: try:
deps_state = _provision_deps(recipe, domain, ref, declared) deps_state = _provision_deps(recipe, domain, ref, declared)
print(" install-time OIDC: deps provisioned; install_steps.sh will wire OIDC env", flush=True) print(
" install-time OIDC: deps provisioned; install_steps.sh will wire OIDC env",
flush=True,
)
except Exception as e: # noqa: BLE001 — isolated; recipe still deploys, OIDC test skips except Exception as e: # noqa: BLE001 — isolated; recipe still deploys, OIDC test skips
deps_ready = False deps_ready = False
deps_not_ready_reason = _scrub(str(e))[:300] deps_not_ready_reason = _scrub(str(e))[:300]
print(f"!! install-time dep provisioning failed (deps-not-ready): {deps_not_ready_reason}", flush=True) print(
f"!! install-time dep provisioning failed (deps-not-ready): {deps_not_ready_reason}",
flush=True,
)
# ---- deploy RECIPE FIRST, alone (no deps yet — generic tiers run recipe-only) ---- # ---- deploy RECIPE FIRST, alone (no deps yet — generic tiers run recipe-only) ----
try: try:
@ -842,7 +966,17 @@ def main() -> int:
# ---- INSTALL tier (always; additive generic + overlay, no op) ---- # ---- INSTALL tier (always; additive generic + overlay, no op) ----
if "install" in stages: if "install" in stages:
results["install"] = ( results["install"] = (
run_lifecycle_tier(recipe, "install", repo_local, domain, meta, head_ref, op_state) run_lifecycle_tier(
recipe,
"install",
repo_local,
domain,
meta,
head_ref,
op_state,
records=records,
junit_dir=junit_dir,
)
if deploy_ok if deploy_ok
else "fail" else "fail"
) )
@ -852,7 +986,15 @@ def main() -> int:
if "upgrade" in stages: if "upgrade" in stages:
results["upgrade"] = ( results["upgrade"] = (
run_lifecycle_tier( run_lifecycle_tier(
recipe, "upgrade", repo_local, domain, meta, head_ref, op_state recipe,
"upgrade",
repo_local,
domain,
meta,
head_ref,
op_state,
records=records,
junit_dir=junit_dir,
) )
if prev if prev
else "skip" # only one published version → nothing to upgrade from else "skip" # only one published version → nothing to upgrade from
@ -861,7 +1003,15 @@ def main() -> int:
if "backup" in stages: if "backup" in stages:
results["backup"] = ( results["backup"] = (
run_lifecycle_tier( run_lifecycle_tier(
recipe, "backup", repo_local, domain, meta, head_ref, op_state recipe,
"backup",
repo_local,
domain,
meta,
head_ref,
op_state,
records=records,
junit_dir=junit_dir,
) )
if backup_cap if backup_cap
else "skip" else "skip"
@ -869,7 +1019,15 @@ def main() -> int:
if "restore" in stages: if "restore" in stages:
results["restore"] = ( results["restore"] = (
run_lifecycle_tier( run_lifecycle_tier(
recipe, "restore", repo_local, domain, meta, head_ref, op_state recipe,
"restore",
repo_local,
domain,
meta,
head_ref,
op_state,
records=records,
junit_dir=junit_dir,
) )
if backup_cap if backup_cap
else "skip" else "skip"
@ -916,7 +1074,9 @@ def main() -> int:
# tests when CCCI_DEPS_READY=0. # tests when CCCI_DEPS_READY=0.
os.environ["CCCI_DEPS_READY"] = "1" if deps_ready else "0" os.environ["CCCI_DEPS_READY"] = "1" if deps_ready else "0"
os.environ["CCCI_DEPS_NOT_READY_REASON"] = deps_not_ready_reason os.environ["CCCI_DEPS_NOT_READY_REASON"] = deps_not_ready_reason
results["custom"] = run_custom(recipe, repo_local, domain) results["custom"] = run_custom(
recipe, repo_local, domain, records=records, junit_dir=junit_dir
)
else: else:
# install failed → the shared deployment is dead; remaining tiers cannot run on it. # install failed → the shared deployment is dead; remaining tiers cannot run on it.
for op in ("upgrade", "backup", "restore", "custom"): for op in ("upgrade", "backup", "restore", "custom"):
@ -945,7 +1105,10 @@ def main() -> int:
from harness import sso from harness import sso
sso.delete_keycloak_realm(e["domain"], e["realm"]) sso.delete_keycloak_realm(e["domain"], e["realm"])
print(f" dep: deleted per-run realm {e['realm']} on warm {e['recipe']}", flush=True) print(
f" dep: deleted per-run realm {e['realm']} on warm {e['recipe']}",
flush=True,
)
except Exception as ex: # noqa: BLE001 — a leaked realm is a teardown failure (§9) except Exception as ex: # noqa: BLE001 — a leaked realm is a teardown failure (§9)
dep_teardown_error = f"warm realm delete failed for {e.get('realm')}: {ex}" dep_teardown_error = f"warm realm delete failed for {e.get('realm')}: {ex}"
print(f"!! {dep_teardown_error}", flush=True) print(f"!! {dep_teardown_error}", flush=True)
@ -980,13 +1143,16 @@ def main() -> int:
# WC1: a live-warm dep (keycloak) is NOT deployed by the run — it only gets a per-run realm — so # WC1: a live-warm dep (keycloak) is NOT deployed by the run — it only gets a per-run realm — so
# warm deps contribute 0. So expected = 1 + (number of COLD deps that actually got deployed). # warm deps contribute 0. So expected = 1 + (number of COLD deps that actually got deployed).
_dep_entries = deps_state.values() if isinstance(deps_state, dict) else (deps_state or []) _dep_entries = deps_state.values() if isinstance(deps_state, dict) else (deps_state or [])
deps_deployed_count = sum(1 for e in _dep_entries if not (isinstance(e, dict) and e.get("warm"))) deps_deployed_count = sum(
1 for e in _dep_entries if not (isinstance(e, dict) and e.get("warm"))
)
expected_deploy_count = 1 + deps_deployed_count expected_deploy_count = 1 + deps_deployed_count
print("\n===== RUN SUMMARY =====", flush=True) print("\n===== RUN SUMMARY =====", flush=True)
print(f"deploy-count = {deploy_count} (expect {expected_deploy_count})") print(f"deploy-count = {deploy_count} (expect {expected_deploy_count})")
if deps_state: if deps_state:
deps_list_for_summary = ( deps_list_for_summary = (
list(deps_state.keys()) if isinstance(deps_state, dict) list(deps_state.keys())
if isinstance(deps_state, dict)
else [d.get("recipe", "?") for d in deps_state] else [d.get("recipe", "?") for d in deps_state]
) )
print(f" deps deployed: {deps_list_for_summary}") print(f" deps deployed: {deps_list_for_summary}")
@ -1029,6 +1195,47 @@ def main() -> int:
print("no tiers ran", file=sys.stderr) print("no tiers ran", file=sys.stderr)
return 1 return 1
# ---- Phase 3 (R1/R3): assemble results.json (per-stage/per-test + computed level). Best-effort:
# a failure here NEVER changes `overall` (R7 — cosmetics never block the pipeline). ----
try:
sso_unverified = sso_dep_unverified(declared, deps_ready, requires_deps_skipped)
clean_teardown = (deploy_count == expected_deploy_count) and not dep_teardown_error
data = results_mod.build_results(
recipe=recipe,
version=target or (head_ref[:12] if head_ref else None),
pr=os.environ.get("PR", "0"),
ref=ref,
records=records,
results=results,
backup_capable=backup_cap,
declared=declared,
deps_ready=deps_ready,
sso_unverified=sso_unverified,
clean_teardown=clean_teardown,
no_secret_leak=True, # narrowed below by an actual scan of the serialised artifact
finished_ts=time.time(),
)
# Real (if narrow) leak check: no known infra-secret value may appear in the artifact (R7).
blob = json.dumps(data)
leaked = any(v in blob for v in _REDACT)
data["flags"]["no_secret_leak"] = not leaked
if leaked:
print(
"!! results.json leak-scan: a known secret value appeared — scrubbing flag set False",
file=sys.stderr,
)
path = results_mod.write_results(data)
print(
f"results.json written: {path} (level={data['level']}"
f"{'' + data['level_cap_reason'] if data['level_cap_reason'] else ''})",
flush=True,
)
except Exception as e: # noqa: BLE001 — results assembly is cosmetic; never fail a run on it (R7)
print(
f"!! results.json assembly failed (non-fatal, verdict unaffected): {_scrub(str(e))}",
file=sys.stderr,
)
# WC5 promote-on-green-cold: a GREEN COLD run on LATEST (no PR head) of an enrolled # WC5 promote-on-green-cold: a GREEN COLD run on LATEST (no PR head) of an enrolled
# (WARM_CANONICAL) recipe advances/seeds the canonical. ONLY cold-on-latest advances it (a PR # (WARM_CANONICAL) recipe advances/seeds the canonical. ONLY cold-on-latest advances it (a PR
# `!testme` carries REF and must NOT promote; `--quick` never promotes — handled in run_quick). # `!testme` carries REF and must NOT promote; `--quick` never promotes — handled in run_quick).
@ -1037,8 +1244,10 @@ def main() -> int:
try: try:
promote_canonical(recipe, head_ref) promote_canonical(recipe, head_ref)
except Exception as e: # noqa: BLE001 — promote is a post-green bonus; never fail a green run except Exception as e: # noqa: BLE001 — promote is a post-green bonus; never fail a green run
print(f"!! WC5 promote failed (non-fatal; known-good unchanged): {_scrub(str(e))}", print(
flush=True) f"!! WC5 promote failed (non-fatal; known-good unchanged): {_scrub(str(e))}",
flush=True,
)
return overall return overall

View File

@ -14,8 +14,14 @@ sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "runner")
from harness import level as L # noqa: E402 from harness import level as L # noqa: E402
def _rungs(install="pass", upgrade="pass", backup_restore="pass", functional="pass", def _rungs(
integration="pass", recipe_local="pass"): install="pass",
upgrade="pass",
backup_restore="pass",
functional="pass",
integration="pass",
recipe_local="pass",
):
return { return {
"install": install, "install": install,
"upgrade": upgrade, "upgrade": upgrade,
@ -28,6 +34,7 @@ def _rungs(install="pass", upgrade="pass", backup_restore="pass", functional="pa
# ---- the U0 gate: L4-pass and L2-cap ---- # ---- the U0 gate: L4-pass and L2-cap ----
def test_full_clean_climb_to_L6(): def test_full_clean_climb_to_L6():
lvl, reason = L.compute_level(_rungs()) lvl, reason = L.compute_level(_rungs())
assert lvl == 6 assert lvl == 6
@ -50,6 +57,7 @@ def test_fails_at_L2_capped_at_L1():
# ---- L0 / install ---- # ---- L0 / install ----
def test_install_fail_is_L0(): def test_install_fail_is_L0():
lvl, reason = L.compute_level(_rungs(install="fail")) lvl, reason = L.compute_level(_rungs(install="fail"))
assert lvl == 0 assert lvl == 0
@ -58,6 +66,7 @@ def test_install_fail_is_L0():
# ---- gap-caps semantics: a higher pass can't rescue a lower gap ---- # ---- gap-caps semantics: a higher pass can't rescue a lower gap ----
def test_higher_pass_does_not_rescue_lower_na(): def test_higher_pass_does_not_rescue_lower_na():
# backup/restore N/A (stateless app) caps at L2 even though functional would pass. # backup/restore N/A (stateless app) caps at L2 even though functional would pass.
lvl, reason = L.compute_level(_rungs(backup_restore="na", functional="pass", integration="na")) lvl, reason = L.compute_level(_rungs(backup_restore="na", functional="pass", integration="na"))
@ -94,6 +103,7 @@ def test_functional_fail_caps_at_L3():
# ---- input validation ---- # ---- input validation ----
def test_invalid_status_raises(): def test_invalid_status_raises():
bad = _rungs() bad = _rungs()
bad["functional"] = "passed" # not in the vocabulary bad["functional"] = "passed" # not in the vocabulary
@ -106,6 +116,7 @@ def test_invalid_status_raises():
# ---- helpers: backup_restore_status ---- # ---- helpers: backup_restore_status ----
def test_backup_restore_status_pass(): def test_backup_restore_status_pass():
assert L.backup_restore_status("pass", "pass", True) == "pass" assert L.backup_restore_status("pass", "pass", True) == "pass"
@ -126,6 +137,7 @@ def test_backup_restore_partial_is_na():
# ---- helpers: tier_to_rung ---- # ---- helpers: tier_to_rung ----
def test_tier_to_rung_mapping(): def test_tier_to_rung_mapping():
assert L.tier_to_rung("pass") == "pass" assert L.tier_to_rung("pass") == "pass"
assert L.tier_to_rung("fail") == "fail" assert L.tier_to_rung("fail") == "fail"

265
tests/unit/test_results.py Normal file
View File

@ -0,0 +1,265 @@
"""Unit tests for Phase-3 results assembly (harness.results), plan-phase3-results-ux.md §4.2 / R1/R3.
Covers JUnit parsing, stage roll-up, the tier→rung derivation (the documented mapping the level
depends on), and full results.json assembly incl. the U0 gate cases. Pure / tmp-file only. Run cold:
cc-ci-run -m pytest tests/unit/test_results.py -q
"""
from __future__ import annotations
import json
import os
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "runner"))
from harness import results as R # noqa: E402
# All-green JUnit fixture: one suite with two <testcase> elements and no child elements
# (no <failure>/<skipped>), with recorded times of 0.012 s and 1.5 s.
JUNIT_PASS = """<?xml version="1.0"?>
<testsuites><testsuite name="pytest" tests="2">
<testcase classname="tests.x" name="test_a" time="0.012"/>
<testcase classname="tests.x" name="test_b" time="1.5"/>
</testsuite></testsuites>"""

# Mixed-outcome JUnit fixture: one plain testcase, one carrying a <failure> child and one
# carrying a <skipped> child. The tests below expect these to map to pass / fail / skip.
JUNIT_MIXED = """<?xml version="1.0"?>
<testsuites><testsuite name="pytest" tests="3">
<testcase classname="tests.y" name="test_ok" time="0.1"/>
<testcase classname="tests.y" name="test_bad" time="0.2"><failure message="boom">trace</failure></testcase>
<testcase classname="tests.y" name="test_skipped" time="0"><skipped message="no deps"/></testcase>
</testsuite></testsuites>"""
def _write(tmp_path, name, content):
p = tmp_path / name
p.write_text(content)
return str(p)
def test_parse_junit_pass(tmp_path):
    """An all-green report yields one "pass" row per testcase, with the time reported in ms."""
    junit_path = _write(tmp_path, "p.xml", JUNIT_PASS)
    parsed = R.parse_junit(junit_path)
    assert len(parsed) == 2
    statuses = {row["status"] for row in parsed}
    assert statuses == {"pass"}
    # The second testcase in the fixture has time="1.5".
    second = parsed[1]
    assert second["ms"] == 1500
def test_parse_junit_mixed(tmp_path):
    """Plain, <failure> and <skipped> testcases parse to pass, fail and skip respectively."""
    junit_path = _write(tmp_path, "m.xml", JUNIT_MIXED)
    status_by_name = {}
    for row in R.parse_junit(junit_path):
        status_by_name[row["name"]] = row["status"]
    expected = {"test_ok": "pass", "test_bad": "fail", "test_skipped": "skip"}
    assert status_by_name == expected
def test_parse_junit_missing_file_is_empty():
    """Parsing a path that does not exist returns an empty list."""
    parsed = R.parse_junit("/nonexistent/x.xml")
    assert parsed == []
def test_collect_stages_orders_and_rolls_up(tmp_path):
    """Stages come back install-then-custom, each with a status rolled up from its tests."""
    install_junit = _write(tmp_path, "i.xml", JUNIT_PASS)
    custom_junit = _write(tmp_path, "c.xml", JUNIT_MIXED)
    install_record = {
        "tier": "install",
        "source": "generic",
        "file": "g/test_install.py",
        "rc": 0,
        "junit": install_junit,
    }
    custom_record = {
        "tier": "custom",
        "source": "cc-ci",
        "file": "c/test_x.py",
        "rc": 1,
        "junit": custom_junit,
    }
    stages = R.collect_stages([install_record, custom_record])

    stage_names = [stage["name"] for stage in stages]
    assert stage_names == ["install", "custom"]  # install is listed ahead of custom
    first, second = stages[0], stages[1]
    assert first["status"] == "pass"
    assert second["status"] == "fail"  # JUNIT_MIXED contains a failing testcase
    assert len(second["tests"]) == 3
def test_collect_stages_synthesizes_when_no_junit():
    """A record with no JUnit file and rc=1 still produces a failed stage holding one test row."""
    record = {
        "tier": "install",
        "source": "generic",
        "file": "g/test_install.py",
        "rc": 1,
        "junit": None,
    }
    stages = R.collect_stages([record])
    only_stage = stages[0]
    assert only_stage["status"] == "fail"
    assert len(only_stage["tests"]) == 1
# ---- derive_rungs: the documented mapping ----
def _results(**kw):
base = {
"install": "pass",
"upgrade": "pass",
"backup": "pass",
"restore": "pass",
"custom": "pass",
}
base.update(kw)
return base
def test_derive_rungs_full_stateful_sso():
    """All tiers green, backup-capable, keycloak declared and ready: only recipe_local is "na"."""
    flags = {
        "backup_capable": True,
        "declared": ["keycloak"],
        "deps_ready": True,
        "sso_unverified": False,
        "has_custom": True,
        "has_repo_local": False,
        "repo_local_passed": False,
    }
    derived = R.derive_rungs(_results(), **flags)
    expected = dict(
        install="pass",
        upgrade="pass",
        backup_restore="pass",
        functional="pass",
        integration="pass",
        recipe_local="na",
    )
    assert derived == expected
def test_derive_rungs_no_sso_surface_is_integration_na():
    """With no declared deps the integration rung is "na" while functional still passes."""
    flags = {
        "backup_capable": True,
        "declared": [],
        "deps_ready": True,
        "sso_unverified": False,
        "has_custom": True,
        "has_repo_local": False,
        "repo_local_passed": False,
    }
    derived = R.derive_rungs(_results(), **flags)
    assert derived["integration"] == "na"
    assert derived["functional"] == "pass"
def test_derive_rungs_stateless_backup_na():
    """Not backup-capable and no custom tests: backup_restore and functional are both "na"."""
    tier_results = _results(backup="skip", restore="skip", custom="skip")
    flags = {
        "backup_capable": False,
        "declared": [],
        "deps_ready": True,
        "sso_unverified": False,
        "has_custom": False,
        "has_repo_local": False,
        "repo_local_passed": False,
    }
    derived = R.derive_rungs(tier_results, **flags)
    assert derived["backup_restore"] == "na"
    assert derived["functional"] == "na"
def test_derive_rungs_sso_unverified_is_integration_fail():
    """A declared keycloak dep that is not ready and flagged SSO-unverified fails integration."""
    flags = {
        "backup_capable": True,
        "declared": ["keycloak"],
        "deps_ready": False,
        "sso_unverified": True,
        "has_custom": True,
        "has_repo_local": False,
        "repo_local_passed": False,
    }
    derived = R.derive_rungs(_results(), **flags)
    assert derived["integration"] == "fail"
def test_derive_rungs_repo_local_pass():
    """Repo-local tests that exist and passed give a "pass" recipe_local rung."""
    flags = {
        "backup_capable": True,
        "declared": [],
        "deps_ready": True,
        "sso_unverified": False,
        "has_custom": True,
        "has_repo_local": True,
        "repo_local_passed": True,
    }
    derived = R.derive_rungs(_results(), **flags)
    assert derived["recipe_local"] == "pass"
# ---- build_results: end-to-end incl level + flags ----
def test_build_results_level_and_flags(tmp_path):
    """End-to-end assembly: level, cap reason, identity fields, flags and stage order."""
    install_junit = _write(tmp_path, "i.xml", JUNIT_PASS)
    custom_junit = _write(tmp_path, "c.xml", JUNIT_PASS)
    run_records = [
        {
            "tier": "install",
            "source": "generic",
            "file": "g/test_install.py",
            "rc": 0,
            "junit": install_junit,
        },
        {
            "tier": "custom",
            "source": "cc-ci",
            "file": "c/test_func.py",
            "rc": 0,
            "junit": custom_junit,
        },
    ]
    build_args = {
        "recipe": "hedgedoc",
        "version": "1.2.3",
        "pr": "7",
        "ref": "deadbeefcafe0000",
        "records": run_records,
        "results": _results(),
        "backup_capable": True,
        "declared": [],
        "deps_ready": True,
        "sso_unverified": False,
        "clean_teardown": True,
        "no_secret_leak": True,
        "finished_ts": 1234.0,
    }
    artifact = R.build_results(**build_args)

    # stateful, functional pass, no SSO surface, no repo-local → caps at L4
    assert artifact["level"] == 4
    assert "L5" in artifact["level_cap_reason"]
    assert artifact["recipe"] == "hedgedoc"
    # The 16-character ref passed in is expected back as its first 12 characters.
    assert artifact["ref"] == "deadbeefcafe"
    assert artifact["flags"] == {"clean_teardown": True, "no_secret_leak": True}
    stage_names = [stage["name"] for stage in artifact["stages"]]
    assert stage_names == ["install", "custom"]
def test_build_results_capped_at_L1_on_upgrade_fail(tmp_path):
    """A failed upgrade tier caps the computed level at 1 and the cap reason mentions L2."""
    install_record = {
        "tier": "install",
        "source": "generic",
        "file": "g/test_install.py",
        "rc": 0,
        "junit": _write(tmp_path, "i.xml", JUNIT_PASS),
    }
    build_args = {
        "recipe": "x",
        "version": None,
        "pr": "0",
        "ref": None,
        "records": [install_record],
        "results": _results(upgrade="fail"),
        "backup_capable": True,
        "declared": [],
        "deps_ready": True,
        "sso_unverified": False,
        "clean_teardown": True,
        "no_secret_leak": True,
        "finished_ts": 0.0,
    }
    artifact = R.build_results(**build_args)
    assert artifact["level"] == 1
    assert "L2" in artifact["level_cap_reason"]
def test_write_results_roundtrip(tmp_path):
    """write_results returns a path ending in /<run_id>/results.json whose JSON keeps the level."""
    payload = {"run_id": "42", "level": 3, "stages": []}
    written = R.write_results(payload, runs_dir_override=str(tmp_path))
    assert written.endswith("/42/results.json")
    with open(written) as fh:
        reloaded = json.load(fh)
    assert reloaded["level"] == 3