feat(1e): HC3 additive generic + op/assertion split (orchestrator owns the op)

- orchestrator: per mutating tier, run optional pre-op seed hook (ops.py pre_<op>) → perform the op
  ONCE (harness-owned) → run generic assertion (unless opted out) AND overlay assertion, both against
  the shared post-op deployment. Op results passed op→assertion via run-scoped CCCI_OP_STATE_FILE.
- opt-out: CCCI_SKIP_GENERIC / CCCI_SKIP_GENERIC_<OP> / recipe_meta.SKIP_GENERIC (declarative).
- generic.py: split do_* into op primitives (perform_upgrade/backup/restore) + assertions
  (assert_upgraded/backup_artifact/restore_healthy) reading op_state(); deployed_identity now returns
  {version,image,chaos} (chaos label ready for HC1).
- generic test_<op>.py + all 6 recipe overlays migrated to assertion-only; pre-op seeding moved to
  per-recipe ops.py (pre_upgrade/pre_backup/pre_restore). install overlays unchanged (no op).
- deploy-count stays 1 (op primitives never call deploy_app). lint PASS; 8 unit tests PASS on cc-ci.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-28 03:12:04 +01:00
parent 6a59343996
commit b7e6cbd7be
31 changed files with 623 additions and 412 deletions

View File

@ -118,22 +118,49 @@ def assert_serving(domain: str, meta: dict) -> None:
assert "commoninternet.net" in detail.lower(), f"{domain}: served cert unexpected — {detail}"
def do_upgrade(domain: str, target: str | None, meta: dict) -> None:
"""UPGRADE op (in place on the shared deployment): abra app upgrade -> target, then assert it
reconverges + still serves AND that the deployment actually MOVED (version label and/or image
changed). The move assertion guards against a vacuous no-op upgrade silently passing — the exact
F1d-2 failure where a mis-pinned base deployed LATEST so 'upgrade to latest' changed nothing."""
before = lifecycle.deployed_identity(domain)
lifecycle.upgrade_app(domain, version=target)
# ---- Op/assertion split (Phase 1e HC3) -------------------------------------------------------
# The orchestrator performs each mutating op ONCE (the harness owns the op), records what an
# assertion needs (pre-upgrade identity, backup snapshot_id) into a run-scoped JSON state file at
# $CCCI_OP_STATE_FILE, then runs the generic assertion file (unless opted out) AND the overlay
# assertion file against the shared post-op state. The assertion functions below read that state via
# `op_state()`. They NEVER perform the op — that keeps the op single + lets generic+overlay coexist.
import json as _json # noqa: E402
def op_state() -> dict:
"""The run-scoped op state the orchestrator wrote between op and assertions (or {} if unset).
Carries e.g. {"upgrade": {"before": {...}}, "backup": {"snapshot_id": "..."}}."""
path = os.environ.get("CCCI_OP_STATE_FILE")
if not path or not os.path.exists(path):
return {}
try:
with open(path) as f:
return _json.load(f)
except (OSError, ValueError):
return {}
def assert_upgraded(domain: str, meta: dict) -> None:
"""Generic UPGRADE assertion (post-op): the orchestrator already performed the upgrade once.
Assert it reconverged + still serves AND that the deployment actually MOVED — guarding against a
vacuous no-op upgrade silently passing (F1d-2). HC1: prev→PR-head may NOT bump the version label,
so a MOVE is ANY of: version-label change, image change, or a chaos label now present (a chaos
deploy stamps the PR-head commit — THE proof the code under test was deployed)."""
before = op_state().get("upgrade", {}).get("before") or {}
assert_serving(domain, meta)
after = lifecycle.deployed_identity(domain)
moved = (before[0] and after[0] and before[0] != after[0]) or (
before[1] and after[1] and before[1] != after[1]
moved = (
(before.get("version") and after.get("version") and before["version"] != after["version"])
or (before.get("image") and after.get("image") and before["image"] != after["image"])
or (after.get("chaos") and after.get("chaos") != before.get("chaos"))
)
assert moved, (
f"{domain}: upgrade did not move the deployment "
f"(version {before[0]}->{after[0]}, image {before[1]}->{after[1]}) — "
"not a real previous->target upgrade (DG2 must be non-vacuous)"
f"(version {before.get('version')}->{after.get('version')}, "
f"image {before.get('image')}->{after.get('image')}, "
f"chaos {before.get('chaos')}->{after.get('chaos')}) — "
"not a real upgrade to the code under test (HC1/DG2 must be non-vacuous)"
)
@ -148,10 +175,10 @@ def parse_snapshot_id(backup_output: str) -> str | None:
return m.group(1) if m else None
def do_backup(domain: str) -> str:
"""BACKUP op: create a backup, then assert a snapshot artifact was produced (returns its id)."""
out = lifecycle.backup_app(domain)
snap_id = parse_snapshot_id(out)
def assert_backup_artifact(domain: str) -> str:
"""Generic BACKUP assertion (post-op): the orchestrator already ran the backup once. Assert a
snapshot artifact was produced (its id recorded in op state). Returns the id."""
snap_id = op_state().get("backup", {}).get("snapshot_id")
assert snap_id, (
f"{domain}: backup produced no snapshot artifact "
"(no snapshot_id in `abra app backup create` output)"
@ -159,8 +186,29 @@ def do_backup(domain: str) -> str:
return snap_id
def do_restore(domain: str, meta: dict) -> None:
"""RESTORE op: restore the latest snapshot, then assert the app is healthy + serving again
(assert_serving polls, so the post-restore reconverge settles)."""
lifecycle.restore_app(domain)
def assert_restore_healthy(domain: str, meta: dict) -> None:
"""Generic RESTORE assertion (post-op): the orchestrator already restored. Assert the app is
healthy + serving again (assert_serving polls, so the post-restore reconverge settles)."""
assert_serving(domain, meta)
# ---- Op primitives (orchestrator-only; perform the op once, never assert) --------------------
def perform_upgrade(domain: str, target: str | None) -> dict[str, str | None]:
"""Perform the UPGRADE op once (in place). E1 baseline: `abra app upgrade` -> target. (HC1/E2
redefines this as a chaos redeploy of the PR-head checkout.) Returns the pre-upgrade identity so
the orchestrator can record it for `assert_upgraded`'s move check."""
before = lifecycle.deployed_identity(domain)
lifecycle.upgrade_app(domain, version=target)
return before
def perform_backup(domain: str) -> str | None:
"""Perform the BACKUP op once. Returns the produced snapshot_id (or None) for the assertion."""
return parse_snapshot_id(lifecycle.backup_app(domain))
def perform_restore(domain: str) -> None:
"""Perform the RESTORE op once (restore the latest snapshot)."""
lifecycle.restore_app(domain)

View File

@ -245,11 +245,18 @@ def wait_healthy(
raise TimeoutError(f"{domain}: not healthy over HTTPS {path} (last status {last})")
def deployed_identity(domain: str, service: str = "app") -> tuple[str | None, str | None]:
"""(coop-cloud version label, image) of the running app service. Used to prove an upgrade
actually MOVED the deployment prev→target (not a vacuous no-op — Adversary F1d-2). The version
label (`coop-cloud.<stack>.version`) is bumped per published recipe version; the image usually
bumps too. Either changing proves the upgrade did something."""
def deployed_identity(domain: str, service: str = "app") -> dict[str, str | None]:
"""Identity of the running app service: {"version", "image", "chaos"}. Used to prove an upgrade
actually MOVED the deployment (not a vacuous no-op — Adversary F1d-2), AND (Phase 1e HC1) that an
`abra app deploy --chaos` upgrade actually deployed the PR-head code under test.
- `version` = the `coop-cloud.<stack>.version` label (bumped per published recipe version).
- `image` = the running container image (usually bumps with a published version).
- `chaos` = the chaos label value (a chaos deploy stamps the recipe git commit/dirty state here)
— present after `abra app deploy --chaos`, absent on a clean pinned-tag deploy. For prev→PR-head
this is THE proof PR-head was deployed even when the version label is unbumped (HC1). The exact
chaos label key varies by abra version, so we capture any `coop-cloud.<stack>.*` label whose key
contains "chaos"."""
name = f"{_stack_name(domain)}_{service}"
proc = subprocess.run(
[
@ -265,15 +272,18 @@ def deployed_identity(domain: str, service: str = "app") -> tuple[str | None, st
)
out = proc.stdout.strip()
if "|" not in out:
return (None, None)
return {"version": None, "image": None, "chaos": None}
labels_json, _, image = out.partition("|")
ver = None
ver = chaos = None
with contextlib.suppress(ValueError, json.JSONDecodeError):
for k, v in json.loads(labels_json).items():
if k.startswith("coop-cloud.") and k.endswith(".version"):
if not k.startswith("coop-cloud."):
continue
if k.endswith(".version"):
ver = v
break
return (ver, image.strip() or None)
elif "chaos" in k:
chaos = v
return {"version": ver, "image": image.strip() or None, "chaos": chaos}
def upgrade_app(domain: str, version: str | None = None) -> None:

View File

@ -1,13 +1,18 @@
#!/usr/bin/env python3
"""Top-level CI orchestrator (plan §4.3 + Phase 1d), invoked by the Drone pipeline (or by hand).
"""Top-level CI orchestrator (plan §4.3 + Phase 1d/1e), invoked by the Drone pipeline (or by hand).
Phase 1d model: deploy the app ONCE, then run lifecycle TIERS against that single shared deployment
(install asserts; upgrade does `abra app upgrade` in place; backup/restore mutate in place; custom
asserts), then ONE teardown in `finally`. Each tier's assertions come from exactly one file — a
recipe overlay if present, else the generic default — discovered by `harness.discovery`
(precedence repo-local > cc-ci > generic). The generic is the default for every op, so ANY recipe is
testable with zero config (DG1DG4). The lifecycle OPS live in the shared harness (harness.generic),
not per-recipe (DG7 DRY).
Model: deploy the app ONCE, then run lifecycle TIERS against that single shared deployment, then ONE
teardown in `finally`. Per Phase 1e the orchestrator OWNS each mutating op (HC3): for a tier it runs
the optional pre-op seed hook (recipe ops.py `pre_<op>`), performs the op exactly ONCE
(upgrade/backup/restore — install has none), then runs BOTH the generic assertion file (the floor,
unless explicitly opted out) AND the recipe overlay assertion file (if any) against the shared
post-op state — generic and overlay are ADDITIVE, not override (HC3). Op results an assertion needs
(pre-upgrade identity, snapshot_id) pass op→assertion via a run-scoped JSON state file
($CCCI_OP_STATE_FILE). The upgrade op deploys the PR-HEAD code under test via `abra app deploy
--chaos` (HC1). Repo-local (PR-authored) overlays/hooks run only for allowlist-approved recipes (HC2,
gated in harness.discovery). The generic is the default for every op, so ANY recipe is testable with
zero config (DG1DG4). The lifecycle OPS live in the shared harness (harness.generic), not per-recipe
(DG7 DRY).
Run parameters from env (set by the comment-bridge via Drone build params):
RECIPE recipe name (e.g. custom-html) [required]
@ -23,7 +28,10 @@ invoke as: cc-ci-run runner/run_recipe_ci.py
from __future__ import annotations
import contextlib
import glob
import importlib.util
import json
import os
import shutil
import subprocess
@ -37,6 +45,10 @@ from harness import discovery, generic, lifecycle, naming # noqa: E402
ALL_STAGES = ("install", "upgrade", "backup", "restore", "custom")
def _truthy(v: str | None) -> bool:
return str(v or "").strip().lower() in ("1", "true", "yes", "on")
def _redact_values() -> list[str]:
"""Values to scrub from published logs (D6 redaction filter, plan §4.4). The infra secrets
materialised at /run/secrets/* — if any subprocess ever echoes one, mask it. Only >=8-char
@ -56,6 +68,14 @@ def _redact_values() -> list[str]:
_REDACT = _redact_values()
def _scrub(text: str) -> str:
"""Mask any known infra-secret value in a string (D6 redaction, plan §4.4)."""
for v in _REDACT:
if v in text:
text = text.replace(v, "***REDACTED***")
return text
def run_redacted(cmd: list[str], env: dict | None = None) -> int:
"""Run a subprocess, streaming output live (so Drone logs stay tail-able) but masking any known
infra-secret value first. Belt-and-suspenders: the harness never prints secrets and abra doesn't
@ -71,10 +91,7 @@ def run_redacted(cmd: list[str], env: dict | None = None) -> int:
)
assert proc.stdout is not None
for line in proc.stdout:
for v in _REDACT:
if v in line:
line = line.replace(v, "***REDACTED***")
sys.stdout.write(line)
sys.stdout.write(_scrub(line))
sys.stdout.flush()
return proc.wait()
@ -149,7 +166,7 @@ def _load_meta(recipe: str) -> dict:
ns: dict = {}
with open(path) as fh:
exec(compile(fh.read(), path, "exec"), ns) # noqa: S102 (trusted, in-repo)
for k in list(meta) + ["BACKUP_CAPABLE"]:
for k in list(meta) + ["BACKUP_CAPABLE", "SKIP_GENERIC"]:
if k in ns:
meta[k] = ns[k]
return meta
@ -159,15 +176,105 @@ def _tier_env(domain: str) -> dict:
return dict(os.environ, CCCI_APP_DOMAIN=domain, CCCI_BASE_URL=f"https://{domain}")
def run_op_tier(recipe: str, op: str, repo_local: str | None, domain: str) -> str:
"""Run the single assertion file for a lifecycle op (overlay or generic) against the shared
deployment. The file performs the op (upgrade/backup/restore) + asserts; install asserts only
(already deployed). Returns 'pass' | 'fail'."""
source, path = discovery.resolve_op(recipe, op, repo_local)
rel = os.path.relpath(path, ROOT)
print(f"\n===== TIER: {op} ({source}: {rel}) =====", flush=True)
rc = run_redacted([sys.executable, "-m", "pytest", "-v", "-rA", path], env=_tier_env(domain))
return "pass" if rc == 0 else "fail"
def _skip_generic(op: str, meta: dict) -> bool:
"""Whether the generic assertion for `op` is opted out (Phase 1e HC3). Default: run (additive).
Opt-out, any of: env CCCI_SKIP_GENERIC (all ops), env CCCI_SKIP_GENERIC_<OP>, or the recipe's
declarative recipe_meta.SKIP_GENERIC list (op name, or "all"/"*")."""
if _truthy(os.environ.get("CCCI_SKIP_GENERIC")):
return True
if _truthy(os.environ.get(f"CCCI_SKIP_GENERIC_{op.upper()}")):
return True
sg = [str(s).lower() for s in (meta.get("SKIP_GENERIC") or [])]
return "all" in sg or "*" in sg or op in sg
def _run_pre_hook(recipe: str, op: str, repo_local: str | None, domain: str, meta: dict) -> None:
"""Run the optional pre-op seed hook (recipe ops.py `pre_<op>`) BEFORE the harness performs the
op (HC3 op/assertion split): overlays seed data-continuity markers / the backup→restore mutation
here, then assert post-op in test_<op>.py. cc-ci's ops.py is trusted; a repo-local ops.py is
consulted only for allowlist-approved recipes (HC2 gate is inside discovery.pre_op_hook). Imported
in-process; the recipe dir is put on sys.path so an ops.py can import its sibling helpers."""
hook = discovery.pre_op_hook(recipe, op, repo_local)
if not hook:
return
source, path = hook
d = os.path.dirname(path)
sys.path.insert(0, d)
try:
spec = importlib.util.spec_from_file_location(f"ccci_ops_{recipe}_{op}", path)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
print(f" pre-op seed ({source}): {os.path.relpath(path, ROOT)}::pre_{op}", flush=True)
getattr(mod, f"pre_{op}")(domain, meta)
finally:
if d in sys.path:
sys.path.remove(d)
def _perform_op(op: str, domain: str, target: str | None, op_state: dict) -> None:
"""Perform the single mutating op ONCE (the harness owns the op, HC3). install has no op. Records
what the assertions need (pre-upgrade identity, backup snapshot_id) into op_state. None of these
call deploy_app, so the deploy-count guard (DG4.1) stays 1 — the in-place upgrade is not a new
install (HC1 reconciliation)."""
if op == "upgrade":
op_state["upgrade"] = {"before": generic.perform_upgrade(domain, target)}
elif op == "backup":
op_state["backup"] = {"snapshot_id": generic.perform_backup(domain)}
elif op == "restore":
generic.perform_restore(domain)
# install: already deployed; no op
def run_lifecycle_tier(
recipe: str,
op: str,
repo_local: str | None,
domain: str,
meta: dict,
target: str | None,
op_state: dict,
) -> str:
"""Additive lifecycle tier (HC3): seed (pre-op hook) → perform the op ONCE → run the generic
assertion file (unless opted out) AND the overlay assertion file, both against the shared post-op
deployment. Returns 'pass' | 'fail' | 'skip'."""
overlay = discovery.resolve_overlay_op(recipe, op, repo_local)
skip_gen = _skip_generic(op, meta)
files: list[tuple[str, str]] = []
if not skip_gen:
files.append(discovery.generic_op(op))
if overlay:
files.append(overlay)
if not files:
# generic opted out AND no overlay → nothing would assert; don't perform a pointless mutating op
print(f"\n===== TIER: {op} — SKIP (generic opted out, no overlay) =====", flush=True)
return "skip"
ov = f"{overlay[0]}:{os.path.relpath(overlay[1], ROOT)}" if overlay else "none"
print(
f"\n===== TIER: {op} (generic={'skip' if skip_gen else 'run'}, overlay={ov}) =====",
flush=True,
)
# 1) pre-op seed hook + 2) the op ONCE (harness-owned). A failure here is an op failure → tier fail.
try:
_run_pre_hook(recipe, op, repo_local, domain, meta)
_perform_op(op, domain, target, op_state)
with open(os.environ["CCCI_OP_STATE_FILE"], "w") as f:
json.dump(op_state, f)
except Exception as e: # noqa: BLE001 — a failed op is a reported tier failure, not a crash
print(f"!! {op} op failed: {_scrub(str(e))}", flush=True)
return "fail"
# 3) assertions: generic (unless opted out) + overlay, each its own pytest, all against the
# single post-op deployment. Generic runs first so an overlay may assume readiness.
rc_all = 0
for source, path in files:
print(f" assert ({source}): {os.path.relpath(path, ROOT)}", flush=True)
rc = run_redacted(
[sys.executable, "-m", "pytest", "-v", "-rA", path], env=_tier_env(domain)
)
if rc != 0:
rc_all = rc
return "pass" if rc_all == 0 else "fail"
def run_custom(recipe: str, repo_local: str | None, domain: str) -> str:
@ -223,6 +330,14 @@ def main() -> int:
f.write("0")
os.environ["CCCI_DEPLOY_COUNT_FILE"] = countfile
# Run-scoped op state (HC3): the orchestrator records op results (pre-upgrade identity, backup
# snapshot_id) here for the assertion tiers (generic + overlay) to read via generic.op_state().
statefile = os.path.join(tempfile.gettempdir(), f"ccci-opstate-{domain}.json")
with open(statefile, "w") as f:
json.dump({}, f)
os.environ["CCCI_OP_STATE_FILE"] = statefile
op_state: dict = {}
results: dict[str, str] = {}
lifecycle.janitor()
try:
@ -243,28 +358,38 @@ def main() -> int:
print(f"!! deploy/readiness failed: {e}", flush=True)
deploy_ok = False
# ---- INSTALL tier (always) ----
# ---- INSTALL tier (always; additive generic + overlay, no op) ----
if "install" in stages:
results["install"] = (
run_op_tier(recipe, "install", repo_local, domain) if deploy_ok else "fail"
run_lifecycle_tier(recipe, "install", repo_local, domain, meta, target, op_state)
if deploy_ok
else "fail"
)
if deploy_ok:
# ---- UPGRADE tier ----
# ---- UPGRADE tier (op once → generic + overlay assert) ----
if "upgrade" in stages:
results["upgrade"] = (
run_op_tier(recipe, "upgrade", repo_local, domain)
run_lifecycle_tier(
recipe, "upgrade", repo_local, domain, meta, target, op_state
)
if prev
else "skip" # only one published version → nothing to upgrade from
)
# ---- BACKUP + RESTORE tiers (backup-capable only; else clean N/A) ----
if "backup" in stages:
results["backup"] = (
run_op_tier(recipe, "backup", repo_local, domain) if backup_cap else "skip"
run_lifecycle_tier(recipe, "backup", repo_local, domain, meta, target, op_state)
if backup_cap
else "skip"
)
if "restore" in stages:
results["restore"] = (
run_op_tier(recipe, "restore", repo_local, domain) if backup_cap else "skip"
run_lifecycle_tier(
recipe, "restore", repo_local, domain, meta, target, op_state
)
if backup_cap
else "skip"
)
# ---- CUSTOM tier ----
if "custom" in stages:
@ -281,6 +406,8 @@ def main() -> int:
with open(countfile) as f:
deploy_count = int(f.read().strip() or "0")
os.remove(countfile)
with contextlib.suppress(OSError):
os.remove(statefile)
# ---- per-op summary (DG6 feed) ----
print("\n===== RUN SUMMARY =====", flush=True)