"""Generic, recipe-agnostic lifecycle assertions + op helpers (Phase 1d, plan §2.1). These are THE default for each lifecycle op: when a recipe ships no `test_.py` overlay, the generic tier (tests/_generic/test_.py) runs these against the single shared deployment the orchestrator brought up. The lifecycle OPERATIONS (upgrade/backup/restore) live here too — owned by the shared harness, not copy-pasted per recipe (DG7 DRY) — so overlays are assertions-only and may reuse these by composition (`from harness import generic; generic.assert_serving(...)`). Design + precedence: machine-docs/DECISIONS.md (Phase 1d). """ from __future__ import annotations import glob import os import re import socket import ssl import time from . import lifecycle # A recipe is backup-capable iff a compose file carries a truthy backupbot.backup label. _BACKUPBOT_RE = re.compile(r"backupbot\.backup\b[^\n]*\btrue\b", re.IGNORECASE) def _recipe_dir(recipe: str) -> str: return os.path.expanduser(f"~/.abra/recipes/{recipe}") def backup_capable(recipe: str, meta: dict | None = None) -> bool: """Whether the harness should run the backup/restore tiers (else they are a clean N/A skip, DG3). `recipe_meta.BACKUP_CAPABLE` (bool) overrides; otherwise auto-detect by scanning the recipe's compose*.yml for a truthy `backupbot.backup` label (the Co-op Cloud backup convention).""" if meta and "BACKUP_CAPABLE" in meta: return bool(meta["BACKUP_CAPABLE"]) for path in glob.glob(os.path.join(_recipe_dir(recipe), "compose*.yml")): try: with open(path) as fh: if _BACKUPBOT_RE.search(fh.read()): return True except OSError: continue return False def served_cert(domain: str, port: int = 443) -> tuple[bool, str]: """CA-verified TLS handshake to `domain` (via the gateway passthrough to cc-ci's Traefik). Returns (verified, detail) with CN+SAN on success, or the failure reason. Scope (per Adversary finding F1d-1): this is an INFRA TLS sanity check — it proves the served wildcard cert is publicly trusted, unexpired, and hostname-valid (so it would fail if the operator's LE wildcard lapsed/was mis-rotated — a real concern, plan §4.0 renewal). It does NOT distinguish a routed app from an un-routed host: Traefik's file provider serves the wildcard for the WHOLE `*.ci.commoninternet.net` zone, so any in-zone subdomain verifies whether or not an app is deployed. The app-vs-Traefik-fallback proof is `services_converged` + a non-404 status in `assert_serving`, not this.""" ctx = ssl.create_default_context() # verifies chain against system CAs + checks hostname try: with ( socket.create_connection((domain, port), timeout=20) as sock, ctx.wrap_socket(sock, server_hostname=domain) as ssock, ): cert = ssock.getpeercert() except ssl.SSLCertVerificationError as e: return (False, f"cert did not verify (Traefik default/self-signed?): {e}") except (OSError, ssl.SSLError) as e: return (False, f"TLS handshake error: {e}") cn = next( (v for rdn in cert.get("subject", ()) for k, v in rdn if k == "commonName"), "", ) sans = [v for typ, v in cert.get("subjectAltName", ()) if typ == "DNS"] return (True, f"CN={cn} SAN={sans}") def assert_serving(domain: str, meta: dict) -> None: """The single generic "is the app really serving?" assertion (DG1). The app-vs-Traefik-fallback proof is steps 1+2 (both load-bearing, verified by the Adversary): 1. every service in the stack converged (the app's OWN containers are N/N — an un-routed host has no app service, so this is False for a non-deployment); 2. a real HTTP(S) response with a status in HEALTH_OK — which EXCLUDES 404, so a Traefik unmatched-router fallback (404) fails, as does a routed-but-dead backend (502/503); 3. the body (from the SAME request as the status — no race) is not Traefik's default 404 page; 4. an INFRA TLS sanity check (served_cert): the served wildcard cert is trusted+unexpired. This does NOT distinguish the app from an un-routed host (Traefik serves the wildcard zone-wide, F1d-1) — it only catches a lapsed/mis-rotated cert. Steps 1–2 are BOUNDED POLLS (no bare sleep), so a state-mutating op (upgrade/restore) that leaves the app briefly reconverging settles, while a persistent failure still fails within the timeout.""" deadline = time.time() + meta["DEPLOY_TIMEOUT"] while time.time() < deadline and not lifecycle.services_converged(domain): time.sleep(5) assert lifecycle.services_converged(domain), f"{domain}: services did not converge" path = meta["HEALTH_PATH"] ok = tuple(meta["HEALTH_OK"]) deadline = time.time() + meta["HTTP_TIMEOUT"] served = False status, body = 0, "" while time.time() < deadline: status, body = lifecycle.http_fetch(domain, path) if status in ok and not (status == 200 and "404 page not found" in body): served = True break time.sleep(5) assert served, ( f"{domain}{path}: not serving — last HTTP {status} (Traefik 404 fallback, " "unhealthy backend, or default-404 body)" ) # Infra TLS sanity only (F1d-1): catches a lapsed/mis-rotated wildcard cert; does NOT prove the # app is routed (Traefik serves the wildcard zone-wide). The serving proof is steps 1–2 above. verified, detail = served_cert(domain) assert verified, f"{domain}: served wildcard cert is not trusted/valid — {detail}" assert "commoninternet.net" in detail.lower(), f"{domain}: served cert unexpected — {detail}" def do_upgrade(domain: str, target: str | None, meta: dict) -> None: """UPGRADE op (in place on the shared deployment): abra app upgrade -> target, then assert it reconverges + still serves AND that the deployment actually MOVED (version label and/or image changed). The move assertion guards against a vacuous no-op upgrade silently passing — the exact F1d-2 failure where a mis-pinned base deployed LATEST so 'upgrade to latest' changed nothing.""" before = lifecycle.deployed_identity(domain) lifecycle.upgrade_app(domain, version=target) assert_serving(domain, meta) after = lifecycle.deployed_identity(domain) moved = (before[0] and after[0] and before[0] != after[0]) or ( before[1] and after[1] and before[1] != after[1] ) assert moved, ( f"{domain}: upgrade did not move the deployment " f"(version {before[0]}->{after[0]}, image {before[1]}->{after[1]}) — " "not a real previous->target upgrade (DG2 must be non-vacuous)" ) _SNAPSHOT_ID_RE = re.compile(r'"snapshot_id"\s*:\s*"([0-9a-f]{8,})"') def parse_snapshot_id(backup_output: str) -> str | None: """The snapshot id from `abra app backup create` output (restic JSON summary line). This IS the backup artifact identity (DG3) — read from the create output because `abra app backup snapshots` requires a TTY and is awkward to script.""" m = _SNAPSHOT_ID_RE.search(backup_output) return m.group(1) if m else None def do_backup(domain: str) -> str: """BACKUP op: create a backup, then assert a snapshot artifact was produced (returns its id).""" out = lifecycle.backup_app(domain) snap_id = parse_snapshot_id(out) assert snap_id, ( f"{domain}: backup produced no snapshot artifact " "(no snapshot_id in `abra app backup create` output)" ) return snap_id def do_restore(domain: str, meta: dict) -> None: """RESTORE op: restore the latest snapshot, then assert the app is healthy + serving again (assert_serving polls, so the post-restore reconverge settles).""" lifecycle.restore_app(domain) assert_serving(domain, meta)