Files
cc-ci/runner/harness/generic.py
autonomic-bot b7e6cbd7be feat(1e): HC3 additive generic + op/assertion split (orchestrator owns the op)
- orchestrator: per mutating tier, run optional pre-op seed hook (ops.py pre_<op>) → perform the op
  ONCE (harness-owned) → run generic assertion (unless opted out) AND overlay assertion, both against
  the shared post-op deployment. Op results passed op→assertion via run-scoped CCCI_OP_STATE_FILE.
- opt-out: CCCI_SKIP_GENERIC / CCCI_SKIP_GENERIC_<OP> / recipe_meta.SKIP_GENERIC (declarative).
- generic.py: split do_* into op primitives (perform_upgrade/backup/restore) + assertions
  (assert_upgraded/backup_artifact/restore_healthy) reading op_state(); deployed_identity now returns
  {version,image,chaos} (chaos label ready for HC1).
- generic test_<op>.py + all 6 recipe overlays migrated to assertion-only; pre-op seeding moved to
  per-recipe ops.py (pre_upgrade/pre_backup/pre_restore). install overlays unchanged (no op).
- deploy-count stays 1 (op primitives never call deploy_app). lint PASS; 8 unit tests PASS on cc-ci.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-28 03:12:04 +01:00

215 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Generic, recipe-agnostic lifecycle assertions + op helpers (Phase 1d, plan §2.1).
These are THE default for each lifecycle op: when a recipe ships no `test_<op>.py` overlay, the
generic tier (tests/_generic/test_<op>.py) runs these against the single shared deployment the
orchestrator brought up. The lifecycle OPERATIONS (upgrade/backup/restore) live here too — owned by
the shared harness, not copy-pasted per recipe (DG7 DRY) — so overlays are assertions-only and may
reuse these by composition (`from harness import generic; generic.assert_serving(...)`).
Design + precedence: machine-docs/DECISIONS.md (Phase 1d).
"""
from __future__ import annotations
import glob
import os
import re
import socket
import ssl
import time
from . import lifecycle
# A recipe is backup-capable iff a compose file carries a truthy backupbot.backup label: the label
# name followed, on the same line, by the word "true" (matched case-insensitively).
_BACKUPBOT_RE = re.compile(r"backupbot\.backup\b[^\n]*\btrue\b", re.IGNORECASE)


def _recipe_dir(recipe: str) -> str:
    """Return the directory of `recipe` under the current user's `~/.abra/recipes`."""
    return os.path.expanduser(f"~/.abra/recipes/{recipe}")


def backup_capable(recipe: str, meta: dict | None = None) -> bool:
    """Whether the harness should run the backup/restore tiers (else they are a clean N/A skip, DG3).

    `recipe_meta.BACKUP_CAPABLE` (bool) overrides; otherwise auto-detect by scanning the recipe's
    compose*.yml for a truthy `backupbot.backup` label (the Co-op Cloud backup convention).

    Args:
        recipe: recipe name; its files are looked up in `_recipe_dir(recipe)`.
        meta: optional recipe metadata; when it contains `BACKUP_CAPABLE`, that value (coerced to
            bool) is returned without touching the filesystem.

    Returns:
        True if the override says so or any readable compose*.yml matches `_BACKUPBOT_RE`;
        False otherwise (including when the recipe directory or compose files do not exist).
    """
    if meta and "BACKUP_CAPABLE" in meta:
        return bool(meta["BACKUP_CAPABLE"])
    # Escape the directory part so glob metacharacters in the path are taken literally; only the
    # trailing `compose*.yml` is meant to be a pattern.
    pattern = os.path.join(glob.escape(_recipe_dir(recipe)), "compose*.yml")
    for path in glob.glob(pattern):
        try:
            # errors="replace": a stray non-UTF-8 byte (e.g. in a comment) must not abort the scan.
            # UnicodeDecodeError is a ValueError, so `except OSError` alone would not catch it.
            with open(path, encoding="utf-8", errors="replace") as fh:
                text = fh.read()
        except OSError:
            # Unreadable file: best-effort detection, keep looking at the other compose files.
            continue
        if _BACKUPBOT_RE.search(text):
            return True
    return False
def served_cert(domain: str, port: int = 443) -> tuple[bool, str]:
    """Do a CA-verified TLS handshake with `domain` and describe the certificate it serves.

    Returns `(verified, detail)`: on success `detail` is `CN=<common name> SAN=<DNS names>`; on
    failure it is the reason (verification failure vs. any other connect/handshake error).

    Scope (per Adversary finding F1d-1): this is an INFRA TLS sanity check. It proves the served
    wildcard cert is publicly trusted, unexpired and hostname-valid, so it fails if the operator's
    LE wildcard lapsed or was mis-rotated (plan §4.0 renewal). It does NOT tell a routed app from an
    un-routed host: Traefik's file provider serves the wildcard for the WHOLE
    `*.ci.commoninternet.net` zone, so any in-zone subdomain verifies whether or not an app is
    deployed. The app-vs-Traefik-fallback proof is `services_converged` plus a non-404 status in
    `assert_serving`, not this function.
    """
    # Default context: chain is verified against the system CAs and the hostname is checked.
    tls_context = ssl.create_default_context()
    try:
        with socket.create_connection((domain, port), timeout=20) as raw_sock:
            with tls_context.wrap_socket(raw_sock, server_hostname=domain) as tls_sock:
                peer_cert = tls_sock.getpeercert()
    except ssl.SSLCertVerificationError as exc:
        return (False, f"cert did not verify (Traefik default/self-signed?): {exc}")
    except (OSError, ssl.SSLError) as exc:
        return (False, f"TLS handshake error: {exc}")

    # First commonName found in the subject, or "" when the subject carries none.
    common_name = ""
    subject_attrs = (attr for rdn in peer_cert.get("subject", ()) for attr in rdn)
    for key, value in subject_attrs:
        if key == "commonName":
            common_name = value
            break

    dns_names = []
    for kind, name in peer_cert.get("subjectAltName", ()):
        if kind == "DNS":
            dns_names.append(name)
    return (True, f"CN={common_name} SAN={dns_names}")
def assert_serving(domain: str, meta: dict) -> None:
    """The single generic "is the app really serving?" assertion (DG1).

    Checks, in order:
      1. every service in the stack converged (`lifecycle.services_converged`) — the app's OWN
         containers are N/N; an un-routed host has no app service, so this is False for a
         non-deployment;
      2. a real HTTP(S) response whose status is in `meta["HEALTH_OK"]` — which EXCLUDES 404, so a
         Traefik unmatched-router fallback (404) fails, as does a routed-but-dead backend (502/503);
      3. the body (from the SAME request as the status, so no race) is not Traefik's default
         404 page;
      4. an INFRA TLS sanity check (`served_cert`): the served wildcard cert is trusted + unexpired.
         This does NOT distinguish the app from an un-routed host (Traefik serves the wildcard
         zone-wide, F1d-1); it only catches a lapsed/mis-rotated cert.

    Steps 1-2 are the load-bearing app-vs-Traefik-fallback proof and are BOUNDED POLLS (5 s between
    attempts, limited by `meta["DEPLOY_TIMEOUT"]` and `meta["HTTP_TIMEOUT"]` respectively), so a
    state-mutating op (upgrade/restore) that leaves the app briefly reconverging settles, while a
    persistent failure still fails within the timeout.

    Raises:
        AssertionError: if any of the checks above does not hold.
    """
    # Step 1: wait (bounded) for convergence, then assert on a final check.
    converge_by = time.time() + meta["DEPLOY_TIMEOUT"]
    while time.time() < converge_by:
        if lifecycle.services_converged(domain):
            break
        time.sleep(5)
    assert lifecycle.services_converged(domain), f"{domain}: services did not converge"

    # Steps 2+3: poll the health path until one response has an accepted status and a real body.
    health_path = meta["HEALTH_PATH"]
    accepted = tuple(meta["HEALTH_OK"])
    respond_by = time.time() + meta["HTTP_TIMEOUT"]
    last_status, last_body = 0, ""
    is_served = False
    while time.time() < respond_by:
        last_status, last_body = lifecycle.http_fetch(domain, health_path)
        if last_status in accepted:
            default_404_page = last_status == 200 and "404 page not found" in last_body
            if not default_404_page:
                is_served = True
                break
        time.sleep(5)
    assert is_served, (
        f"{domain}{health_path}: not serving — last HTTP {last_status} (Traefik 404 fallback, "
        "unhealthy backend, or default-404 body)"
    )

    # Step 4: infra TLS sanity only (F1d-1). Catches a lapsed/mis-rotated wildcard cert; it does NOT
    # prove the app is routed (Traefik serves the wildcard zone-wide) — steps 1-2 are that proof.
    cert_ok, cert_detail = served_cert(domain)
    assert cert_ok, f"{domain}: served wildcard cert is not trusted/valid — {cert_detail}"
    assert "commoninternet.net" in cert_detail.lower(), f"{domain}: served cert unexpected — {cert_detail}"
# ---- Op/assertion split (Phase 1e HC3) -------------------------------------------------------
# The orchestrator performs each mutating op ONCE (the harness owns the op), records what an
# assertion needs (pre-upgrade identity, backup snapshot_id) into a run-scoped JSON state file at
# $CCCI_OP_STATE_FILE, then runs the generic assertion file (unless opted out) AND the overlay
# assertion file against the shared post-op state. The assertion functions below read that state via
# `op_state()`. They NEVER perform the op — that keeps the op single + lets generic+overlay coexist.
import json as _json # noqa: E402
def op_state() -> dict:
    """Return the run-scoped op state the orchestrator wrote between op and assertions.

    The state is a JSON object stored in the file named by `$CCCI_OP_STATE_FILE`; it carries e.g.
    {"upgrade": {"before": {...}}, "backup": {"snapshot_id": "..."}}.

    Returns:
        The decoded object, or {} when the variable is unset/empty, the file is missing or
        unreadable, its content is not valid JSON, or the JSON top level is not an object (so
        callers can always chain `.get(...)` on the result).
    """
    path = os.environ.get("CCCI_OP_STATE_FILE")
    if not path:
        return {}
    try:
        # A missing file surfaces as FileNotFoundError (an OSError), so no separate exists() check.
        with open(path, encoding="utf-8") as fh:
            state = _json.load(fh)
    except (OSError, ValueError):
        # ValueError covers both malformed JSON and undecodable bytes.
        return {}
    # Valid JSON such as `null` or `[...]` is not usable state; treat it like "no state".
    return state if isinstance(state, dict) else {}
def assert_upgraded(domain: str, meta: dict) -> None:
    """Generic UPGRADE assertion (post-op): the orchestrator already performed the upgrade once.

    Asserts the app reconverged and still serves (`assert_serving`) AND that the deployment
    actually MOVED, so a vacuous no-op upgrade cannot silently pass (F1d-2). The pre-upgrade
    identity comes from `op_state()["upgrade"]["before"]`; the current one from
    `lifecycle.deployed_identity`.

    HC1: prev->PR-head may NOT bump the version label, so a MOVE is ANY of:
      - the version label changed (both sides known and different),
      - the image changed (both sides known and different),
      - a chaos label is now present and differs from the one before (a chaos deploy stamps the
        PR-head commit — THE proof the code under test was deployed).

    Raises:
        AssertionError: if the app is not serving or none of the move conditions holds.
    """
    prior = op_state().get("upgrade", {}).get("before") or {}
    assert_serving(domain, meta)
    current = lifecycle.deployed_identity(domain)

    old_version, new_version = prior.get("version"), current.get("version")
    old_image, new_image = prior.get("image"), current.get("image")
    old_chaos, new_chaos = prior.get("chaos"), current.get("chaos")

    version_changed = bool(old_version and new_version and old_version != new_version)
    image_changed = bool(old_image and new_image and old_image != new_image)
    chaos_stamped = bool(new_chaos and new_chaos != old_chaos)

    moved = version_changed or image_changed or chaos_stamped
    assert moved, (
        f"{domain}: upgrade did not move the deployment "
        f"(version {old_version}->{new_version}, "
        f"image {old_image}->{new_image}, "
        f"chaos {old_chaos}->{new_chaos}) — "
        "not a real upgrade to the code under test (HC1/DG2 must be non-vacuous)"
    )
# Matches a JSON `"snapshot_id": "<id>"` pair whose id is at least 8 lowercase hex digits.
_SNAPSHOT_ID_RE = re.compile(r'"snapshot_id"\s*:\s*"([0-9a-f]{8,})"')


def parse_snapshot_id(backup_output: str) -> str | None:
    """Extract the snapshot id from `abra app backup create` output (restic JSON summary line).

    This IS the backup artifact identity (DG3). It is read from the create output because
    `abra app backup snapshots` requires a TTY and is awkward to script.

    Returns:
        The id of the first `"snapshot_id"` pair found, or None when the output has none.
    """
    found = _SNAPSHOT_ID_RE.search(backup_output)
    if found is None:
        return None
    return found.group(1)
def assert_backup_artifact(domain: str) -> str:
    """Generic BACKUP assertion (post-op): the orchestrator already ran the backup once.

    Asserts that a snapshot artifact was produced, i.e. that `op_state()["backup"]["snapshot_id"]`
    is present and non-empty.

    Returns:
        The recorded snapshot id.

    Raises:
        AssertionError: if no snapshot id was recorded.
    """
    backup_state = op_state().get("backup", {})
    recorded_id = backup_state.get("snapshot_id")
    assert recorded_id, (
        f"{domain}: backup produced no snapshot artifact "
        "(no snapshot_id in `abra app backup create` output)"
    )
    return recorded_id
def assert_restore_healthy(domain: str, meta: dict) -> None:
    """Generic RESTORE assertion (post-op): the orchestrator already restored.

    Delegates to `assert_serving`, whose bounded polls let the post-restore reconverge settle
    before the app is required to be healthy and serving again.

    Raises:
        AssertionError: if `assert_serving` fails for `domain`.
    """
    assert_serving(domain=domain, meta=meta)
# ---- Op primitives (orchestrator-only; perform the op once, never assert) --------------------
def perform_upgrade(domain: str, target: str | None) -> dict[str, str | None]:
    """Perform the UPGRADE op once (in place) and hand back the identity it started from.

    E1 baseline: `abra app upgrade` -> `target`. (HC1/E2 redefines this as a chaos redeploy of the
    PR-head checkout.) The identity is captured BEFORE the upgrade so the orchestrator can record
    it for `assert_upgraded`'s move check.

    Returns:
        The pre-upgrade result of `lifecycle.deployed_identity(domain)`.
    """
    identity_before = lifecycle.deployed_identity(domain)
    lifecycle.upgrade_app(domain, version=target)
    return identity_before
def perform_backup(domain: str) -> str | None:
    """Perform the BACKUP op once.

    Returns:
        The snapshot id parsed from the output of `lifecycle.backup_app(domain)`, or None when the
        output carries none; the orchestrator records it for the backup assertion.
    """
    create_output = lifecycle.backup_app(domain)
    return parse_snapshot_id(create_output)
def perform_restore(domain: str) -> None:
    """Perform the RESTORE op once (restore the latest snapshot).

    Only triggers `lifecycle.restore_app(domain)`; health is checked separately by the restore
    assertion.
    """
    lifecycle.restore_app(domain)
    return None