"""App lifecycle for the CI harness: deploy, wait-healthy, teardown, janitor (plan §4.3).

The teardown guarantee is sacred: a failed test must never leak an app/volume/secret into the
next run. Callers wrap deploy()/teardown() in try/finally (or a pytest finalizer).
"""

from __future__ import annotations

import contextlib
import datetime
import fcntl
import json
import os
import re
import socket
import ssl
import subprocess
import time
import urllib.error
import urllib.request

from . import abra

GATEWAY_IP = "143.244.213.108"  # *.ci.commoninternet.net -> gateway (TLS passthrough to cc-ci)

# A run app domain is "<1-4 alnum>-<6hex>.ci.commoninternet.net" (see DECISIONS.md). Used by the
# janitor to recognise orphaned run apps (infra apps like traefik/drone/backups don't match).
RUN_APP_RE = re.compile(r"^[a-z0-9]{1,4}-[0-9a-f]{6}\.ci\.commoninternet\.net$")


class TeardownError(RuntimeError):
    """Raised by teardown_app when docker still lists services/volumes/secrets for the stack."""


# --- Concurrent-run safety (capacity=2) -------------------------------------------------------
# Two cooperating mechanisms, both process-lifetime-scoped so SIGKILL can't leak a stale lock:
#   1. Per-recipe flock: ~/.abra/recipes/ is ONE shared working tree that fetch_recipe
#      rm-rf's/reclones and the upgrade tier git-checkouts mid-run. Concurrent runs of the SAME
#      recipe would corrupt each other's deploy tree (observed: immich builds 229/230 deployed a
#      tree missing its config), so they serialise on an exclusive flock; different recipes run in
#      parallel. The kernel drops a flock when the holder dies, however it dies.
#   2. Active-run registry: each run registers its app domain + pid before creating the app, so the
#      janitor can tell a live concurrent run from a crashed run's orphan (see janitor()).
RECIPE_LOCK_DIR = "/run/lock"
ACTIVE_RUN_DIR = "/run/cc-ci-active"


def acquire_recipe_lock(recipe: str):
    """Take the per-recipe exclusive lock; blocks (with a log line) if another run of the same
    recipe is in flight.

    Returns the open lock file — the CALLER must keep a reference for the whole run; the lock is
    released only when the process exits and the fd closes.
    """
    path = os.path.join(RECIPE_LOCK_DIR, f"cc-ci-recipe-{recipe}.lock")
    f = open(path, "w")  # noqa: SIM115 — deliberately held for the lifetime of the run
    try:
        try:
            # Non-blocking first, purely so we can log before a potentially long wait.
            fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except BlockingIOError:
            print(
                f"== recipe lock: another {recipe} run is in flight — waiting for {path} "
                "(shared ~/.abra/recipes checkout) ==",
                flush=True,
            )
            fcntl.flock(f, fcntl.LOCK_EX)
    except BaseException:
        # Lock not obtained (unexpected OSError, interrupt while waiting): don't leak the fd.
        f.close()
        raise
    print(f"== recipe lock: acquired {path} ==", flush=True)
    return f


def _registry_path(domain: str) -> str:
    """Path of the active-run registry entry for a run app domain."""
    return os.path.join(ACTIVE_RUN_DIR, domain)


def register_run_app(domain: str) -> None:
    """Record this process as the live owner of a run app (called BEFORE the app is created, so a
    concurrent run's janitor can never observe the app without its registration).

    Best-effort: any OSError (directory or file not writable) is swallowed.
    """
    with contextlib.suppress(OSError):
        os.makedirs(ACTIVE_RUN_DIR, exist_ok=True)
        with open(_registry_path(domain), "w") as f:
            f.write(str(os.getpid()))


def unregister_run_app(domain: str) -> None:
    """Drop the registry entry for a run app; a missing entry (or any OSError) is ignored."""
    with contextlib.suppress(OSError):
        os.remove(_registry_path(domain))


def _run_owner_state(domain: str) -> str:
    """'alive' if the registered owner is a live run_recipe_ci process, 'dead' if registered but
    gone (definite orphan), 'unknown' if never registered (pre-registry code or post-reboot)."""
    try:
        with open(_registry_path(domain)) as f:
            pid = int(f.read().strip())
    except (OSError, ValueError):
        return "unknown"
    try:
        with open(f"/proc/{pid}/cmdline", "rb") as f:
            cmdline = f.read().decode(errors="replace").replace("\0", " ")
    except OSError:
        return "dead"
    # Guard against pid reuse: the owner must still look like a harness run.
    return "alive" if "run_recipe_ci" in cmdline else "dead"


def _docker_names(kind: str, stack: str) -> list[str]:
    """docker <kind> ls names filtered to a stack (kind: service|volume|secret).

    An empty `stack` yields an empty name filter (janitor() uses this to list every service).
    """
    proc = subprocess.run(
        ["docker", kind, "ls", "--filter", f"name={stack}", "--format", "{{.Name}}"],
        capture_output=True,
        text=True,
    )
    return [n for n in proc.stdout.split("\n") if n.strip()]


def _residual(domain: str) -> dict:
    """Whatever docker still lists for the domain's stack, keyed services/volumes/secrets."""
    stack = _stack_name(domain)
    return {
        "services": _docker_names("service", stack),
        "volumes": _docker_names("volume", stack),
        "secrets": _docker_names("secret", stack),
    }


def _stack_age_seconds(stack: str) -> float | None:
    """Age of the stack's oldest service, or None if not present (or no timestamp parsed)."""
    svcs = _docker_names("service", stack)
    if not svcs:
        return None
    oldest = None
    for s in svcs:
        p = subprocess.run(
            ["docker", "service", "inspect", s, "--format", "{{.CreatedAt}}"],
            capture_output=True,
            text=True,
        )
        ts = p.stdout.strip()
        try:
            # docker emits e.g. 2026-05-27 00:12:33.123 +0000 UTC -> take the leading 19 chars
            dt = datetime.datetime.strptime(ts[:19], "%Y-%m-%d %H:%M:%S").replace(
                tzinfo=datetime.UTC
            )
        except ValueError:
            continue
        age = (datetime.datetime.now(datetime.UTC) - dt).total_seconds()
        oldest = age if oldest is None else max(oldest, age)
    return oldest


def _load_recipe_meta(recipe: str) -> dict:
    """Exec tests/<recipe>/recipe_meta.py and return its namespace ({} if the file is absent).

    Trusted in-repo code; shared by _recipe_extra_env and _recipe_meta_flag.
    """
    path = os.path.join(os.path.dirname(__file__), "..", "..", "tests", recipe, "recipe_meta.py")
    if not os.path.exists(path):
        return {}
    ns: dict = {}
    with open(path) as fh:
        exec(compile(fh.read(), path, "exec"), ns)  # noqa: S102 (trusted, in-repo)
    return ns


def _recipe_extra_env(recipe: str, domain: str) -> dict[str, str]:
    """Per-recipe extra .env keys, applied at every deploy (install + upgrade's old_app) so a
    recipe with multi-domain / config needs is enrolled with NO shared-harness change (D5/M6.5).

    A recipe declares `EXTRA_ENV` in tests/<recipe>/recipe_meta.py as either a dict or a callable
    `EXTRA_ENV(domain) -> dict` (callable form lets it derive values from the per-run domain, e.g.
    cryptpad's SANDBOX_DOMAIN). Returns {} if none.
    """
    ee = _load_recipe_meta(recipe).get("EXTRA_ENV")
    if callable(ee):
        ee = ee(domain)
    return {str(k): str(v) for k, v in (ee or {}).items()}


def _recipe_meta_flag(recipe: str, key: str) -> bool:
    """Read a boolean flag from tests/<recipe>/recipe_meta.py (e.g. CHAOS_BASE_DEPLOY).

    Returns False if the recipe ships no meta or the flag is absent/falsey. Trusted in-repo exec,
    same as _recipe_extra_env.
    """
    return bool(_load_recipe_meta(recipe).get(key))


def _record_deploy() -> None:
    """Increment the per-run deploy counter (DG4.1: one deploy per run).

    No-op unless the orchestrator set CCCI_DEPLOY_COUNT_FILE — so it never affects
    standalone/manual use. Read and write errors are swallowed (counter then restarts from 0 /
    is left unchanged).
    """
    path = os.environ.get("CCCI_DEPLOY_COUNT_FILE")
    if not path:
        return
    n = 0
    with contextlib.suppress(OSError, ValueError), open(path) as f:
        n = int(f.read().strip() or "0")
    with contextlib.suppress(OSError), open(path, "w") as f:
        f.write(str(n + 1))


def _run_install_steps(hook: tuple[str, str], recipe: str, domain: str) -> None:
    """Run a recipe's custom install-steps hook (install_steps.sh) during the install tier — after
    `abra app new` + env defaults + secret generate, before deploy (Phase 1d DG5).

    The hook gets the app .env path + domain so it can insert secrets / set env / seed before the
    app comes up. `hook` is a (source, path) pair; a non-zero exit raises CalledProcessError.
    """
    source, path = hook
    env_path = os.path.expanduser(f"~/.abra/servers/default/{domain}.env")
    print(f" install-steps hook ({source}): {path}", flush=True)
    subprocess.run(
        ["bash", path],
        check=True,
        env=dict(
            os.environ,
            CCCI_APP_DOMAIN=domain,
            CCCI_RECIPE=recipe,
            CCCI_APP_ENV=env_path,
        ),
    )


def prepull_images(recipe: str, domain: str) -> None:
    """HQ1 (plan-prepull-images.md): pre-pull a recipe's images into the local store BEFORE the
    deploy.

    A pull failure (rate-limit / bad tag / slow) then fails FAST as a CLEAR pull error here,
    instead of surfacing later as a murky 'not converged' deploy timeout (the F2-12-class
    confusion); and images-already-local lets the deploy converge within abra's native window.

    Resolves images via `docker compose config --images` using abra's COMPOSE_FILE from the app
    .env (handles $VERSION interpolation + multi-compose recipes — a naive `grep image:` misses
    both), then `docker pull` each, SKIP-IF-PRESENT (zero network for already-cached pinned tags).
    The deploy itself stays UNCHANGED (real `abra app deploy`) — this only warms the local store.
    Removes PULL time, NOT app-INIT time (slow-init apps like collabora/immich still need their
    recipe healthcheck/READY_PROBE).

    Best-effort on resolution failure (skip + let the deploy pull as usual); HARD-fails
    (RuntimeError) on a real pull error (don't mask it).
    """
    recipe_dir = os.path.expanduser(f"~/.abra/recipes/{recipe}")
    env_path = os.path.expanduser(f"~/.abra/servers/default/{domain}.env")
    if not os.path.isdir(recipe_dir) or not os.path.isfile(env_path):
        print(f" prepull: recipe dir or .env missing for {recipe} — skipping", flush=True)
        return
    # COMPOSE_FILE is a shell-style ':'-separated list (may self-reference $COMPOSE_FILE for
    # multi-compose); evaluate it the way abra does, then pass each file to docker compose. The
    # --env-file supplies $VERSION-style interpolation so pinned tags resolve correctly.
    cf = subprocess.run(
        ["bash", "-c", f'set -a; . "{env_path}"; printf "%s" "${{COMPOSE_FILE:-compose.yml}}"'],
        capture_output=True,
        text=True,
    ).stdout.strip()
    files = [f for f in cf.split(":") if f] or ["compose.yml"]
    args = ["docker", "compose", "--env-file", env_path]
    for f in files:
        args += ["-f", f]
    args += ["config", "--images"]
    proc = subprocess.run(args, cwd=recipe_dir, capture_output=True, text=True)
    # `config --images` prints one image ref per line to stdout (warnings go to stderr).
    images = sorted({ln.strip() for ln in proc.stdout.splitlines() if ln.strip()})
    if not images:
        print(
            f" prepull: no images resolved for {recipe} (config --images rc={proc.returncode}) — "
            f"skipping (deploy will pull as usual). stderr: {proc.stderr.strip()[-160:]}",
            flush=True,
        )
        return
    for img in images:
        if subprocess.run(["docker", "image", "inspect", img], capture_output=True).returncode == 0:
            print(f" prepull: present {img}", flush=True)
            continue
        print(f" prepull: pulling {img} …", flush=True)
        r = subprocess.run(["docker", "pull", img], capture_output=True, text=True)
        if r.returncode != 0:
            raise RuntimeError(
                f"prepull: `docker pull {img}` failed (rc={r.returncode}) — clear pull error BEFORE "
                f"deploy: {r.stderr.strip()[-300:] or r.stdout.strip()[-300:]}"
            )
    print(f" prepull: {len(images)} image(s) present/pulled for {recipe}", flush=True)


def deploy_app(
    recipe: str,
    domain: str,
    version: str | None = None,
    secrets: bool = True,
    install_steps_hook: tuple[str, str] | None = None,
    deploy_timeout: int = 900,
) -> None:
    """Create + configure + deploy an app.

    Forces LETS_ENCRYPT_ENV='' so traefik serves the wildcard cert via the file provider and NEVER
    attempts ACME (adversary finding A1). Applies any per-recipe EXTRA_ENV (recipe_meta.py) and
    the custom install-steps hook (Phase 1d) before deploy.

    `deploy_timeout` is the subprocess timeout for `abra app deploy`. Caller (orchestrator) passes
    `recipe_meta.DEPLOY_TIMEOUT` so heavy recipes (ghost, matrix-synapse, lasuite-meet) can extend
    past the 900s default. abra's INTERNAL TIMEOUT (recipe's TIMEOUT env, default 300s) is set via
    EXTRA_ENV; this is the Python subprocess wrapper's timeout so abra doesn't get SIGKILLed
    mid-deploy.
    """
    _record_deploy()
    # Register BEFORE the app exists: a concurrent run's janitor must never see this app without
    # its live-owner registration (it would reap an in-flight deploy).
    register_run_app(domain)
    abra.app_config_remove(domain)  # clear any stale .env from a prior crashed run
    abra.app_new(recipe, domain, version=version, secrets=secrets)
    # A pinned version must actually deploy that version: check the recipe out to the tag so the
    # on-disk compose/.env match, and deploy NON-chaos below (chaos ignores the pin → deployed
    # LATEST, Adversary F1d-2). Chaos is correct ONLY for the version=None case (deploy the current
    # PR-head checkout). Order matters: checkout before secret_generate (-C) so secrets match the
    # pinned tree.
    chaos = version is None
    if version:
        abra.recipe_checkout(recipe, version)
        # A pinned (non-chaos) deploy runs `abra recipe lint`, which FATAs R014 ('only annotated
        # tags') if the upstream recipe ships a stray lightweight version tag (e.g. lasuite-meet's
        # 0.3.0+v1.16.0). In that case deploy the EXPLICITLY-checked-out pinned version with chaos:
        # chaos skips lint and deploys the current checkout (we just checked out `version`), so it
        # still deploys the intended pinned version — not LATEST (the F1d-2 hazard was a *missing*
        # checkout, which recipe_checkout above fixes). No-op for all-annotated recipes (stays
        # pinned).
        if abra.has_lightweight_version_tags(recipe):
            print(
                f" deploy_app({recipe}@{version}): lightweight upstream tag present → chaos base "
                "deploy of the checked-out pinned version (skips R014 lint; not LATEST)",
                flush=True,
            )
            chaos = True
        # A recipe may force a chaos base deploy via recipe_meta CHAOS_BASE_DEPLOY=True when an
        # install_steps hook adds an untracked compose overlay to the recipe checkout (e.g.
        # discourse's compose.ccci.yml, provided by install_steps for the pinned base). The
        # untracked file makes abra's pinned-deploy clean-tree check FATA ('has locally unstaged
        # changes'); chaos skips lint + the clean-tree gate and deploys the EXPLICITLY-checked-out
        # pinned version (we already ran recipe_checkout(version) above) — NOT latest. Same
        # mechanism as the lightweight-tag branch.
        elif _recipe_meta_flag(recipe, "CHAOS_BASE_DEPLOY"):
            print(
                f" deploy_app({recipe}@{version}): CHAOS_BASE_DEPLOY set → chaos base deploy of the "
                "checked-out pinned version (skips clean-tree/lint; deploys version, not LATEST)",
                flush=True,
            )
            chaos = True
    # Pin DOMAIN to the run domain explicitly. `abra app new -D` fills it for recipes whose
    # .env.sample uses a literal placeholder, but NOT for ones using a `{{ .Domain }}` Go-template
    # (this abra version leaves it unexpanded → deploy fails "can't evaluate field Domain"). Setting
    # it ourselves is recipe-agnostic and canonical (the run domain IS the app's domain).
    abra.env_set(domain, "DOMAIN", domain)
    abra.env_set(domain, "LETS_ENCRYPT_ENV", "")
    for k, v in _recipe_extra_env(recipe, domain).items():
        abra.env_set(domain, k, v)
    if secrets:
        abra.secret_generate(domain)
    if install_steps_hook:
        _run_install_steps(install_steps_hook, recipe, domain)
    # HQ1: warm the local image store before the (real, unchanged) abra deploy.
    prepull_images(recipe, domain)
    abra.deploy(domain, chaos=chaos, timeout=deploy_timeout)


def _stack_name(domain: str) -> str:
    """Swarm stack name for an app domain."""
    # abra derives the swarm stack name from the domain by replacing dots with underscores
    # and KEEPING hyphens (e.g. custom-html-x.ci.commoninternet.net -> custom-html-x_ci_...).
    return domain.replace(".", "_")


def services_converged(domain: str) -> bool:
    """True when every service in the stack reports replicas at its desired count (cur == want,
    including 0/0) AND no service is mid-rolling-update (swarm UpdateStatus settled).

    False when the stack lists no services at all.
    """
    stack = _stack_name(domain)
    proc = subprocess.run(
        ["docker", "stack", "services", stack, "--format", "{{.Name}} {{.Replicas}}"],
        capture_output=True,
        text=True,
    )
    rows = [r for r in proc.stdout.split("\n") if r.strip()]
    if not rows:
        return False
    names = []
    for r in rows:
        name, _, replicas = r.partition(" ")
        names.append(name)
        cur, _, want = replicas.partition("/")
        # A service at its DESIRED replica count is converged — including a `replicas: 0`
        # on-demand one-shot (e.g. lasuite-drive's `minio-createbuckets`, which is scaled up
        # manually only when buckets need (re)creating), which reports "0/0". The earlier
        # `want == "0"` rejection wrongly treated those as never-converged, hanging the deploy
        # forever. `cur == want` (with `want` present) is the correct convergence test; a service
        # still spinning up shows e.g. "0/1" (cur != want) and is correctly not-yet-converged.
        if not want or cur != want:
            return False
    # N/N alone is NOT convergence during a stop-first rolling update: a chaos redeploy that
    # changes a non-app service image (e.g. immich's db pin) registers the update immediately, but
    # swarm may not have cycled that service's task yet — the OLD task still shows 1/1, then dies
    # seconds later (immich CI 238: backupbot exec'd the db pre-hook into the just-killed
    # container → 409). Require every service's UpdateStatus to be settled too, so the wait spans
    # the whole rolling update.
    proc = subprocess.run(
        [
            "docker",
            "service",
            "inspect",
            *names,
            "--format",
            "{{if .UpdateStatus}}{{.UpdateStatus.State}}{{end}}",
        ],
        capture_output=True,
        text=True,
    )
    if proc.returncode != 0:
        return False  # a service vanished mid-check — not settled
    for state in proc.stdout.split("\n"):
        # Only ACTIVE states block convergence. 'paused'/'rollback_paused' are terminal-without-
        # intervention: swarm's default update-failure-action pauses the update on one task flicker
        # and the flag then persists FOREVER (immich CI 241: app service 'paused' from a restart
        # during restore, service back at 1/1 and healthy — the wait hung to its deadline). With
        # N/N already required above, a paused update is settled for our purposes; the HTTP-health
        # and tier assertions still gate whether the app actually works.
        if state.strip() in ("updating", "rollback_started"):
            return False
    return True


def _insecure_ssl_context() -> ssl.SSLContext:
    """TLS context with hostname check and certificate verification turned off (shared by the
    http_* helpers)."""
    ctx = ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE
    return ctx


def http_get(domain: str, path: str = "/", timeout: int = 15) -> int:
    """HTTPS GET the app by its real hostname; returns the status code, or 0 on any non-HTTP
    failure (never raises).

    On cc-ci the *.ci.commoninternet.net wildcard resolves (public DNS) to the gateway, which
    SNI-passthroughs to cc-ci's traefik — so using the real URL keeps SNI correct (connecting to
    the bare IP would drop SNI and fail to route).
    """
    ctx = _insecure_ssl_context()
    req = urllib.request.Request(f"https://{domain}{path}", method="GET")
    try:
        with urllib.request.urlopen(req, timeout=timeout, context=ctx) as resp:
            return resp.status
    except urllib.error.HTTPError as e:
        return e.code
    except Exception:
        return 0


def http_fetch(domain: str, path: str = "/", timeout: int = 15) -> tuple[int, str]:
    """One HTTPS GET → (status, body) in a SINGLE request, never raising.

    Lets a caller check the status and body together with no race between two requests
    (assert_serving) — and captures the error body on a 4xx/5xx instead of throwing. Any other
    failure yields (0, "").
    """
    ctx = _insecure_ssl_context()
    req = urllib.request.Request(f"https://{domain}{path}", method="GET")
    try:
        with urllib.request.urlopen(req, timeout=timeout, context=ctx) as resp:
            return resp.status, resp.read().decode(errors="replace")
    except urllib.error.HTTPError as e:
        try:
            body = e.read().decode(errors="replace")
        except Exception:  # noqa: BLE001
            body = ""
        return e.code, body
    except Exception:  # noqa: BLE001
        return 0, ""


def wait_healthy(
    domain: str,
    ok_codes=(200, 301, 302),
    path: str = "/",
    deploy_timeout: int = 600,
    http_timeout: int = 300,
) -> None:
    """Wait for stack services converged, then for the app to answer ok over HTTPS at `path`.

    `path` is per-recipe (recipe_meta.HEALTH_PATH), e.g. keycloak uses /realms/master.
    Raises TimeoutError if either phase exceeds its own timeout.
    """
    deadline = time.time() + deploy_timeout
    while time.time() < deadline:
        if services_converged(domain):
            break
        time.sleep(5)
    else:
        raise TimeoutError(f"{domain}: services did not converge in {deploy_timeout}s")
    deadline = time.time() + http_timeout
    last = 0
    while time.time() < deadline:
        last = http_get(domain, path)
        if last in ok_codes:
            return
        time.sleep(5)
    raise TimeoutError(f"{domain}: not healthy over HTTPS {path} (last status {last})")


def deployed_identity(domain: str, service: str = "app") -> dict[str, str | None]:
    """Identity of the running app service: {"version", "image", "chaos"}.

    Used to prove an upgrade actually MOVED the deployment (not a vacuous no-op — Adversary
    F1d-2), AND (Phase 1e HC1) that an `abra app deploy --chaos` upgrade actually deployed the
    PR-head code under test.

    - `version` = the `coop-cloud.<stack>.version` label (bumped per published recipe version).
    - `image` = the running container image (usually bumps with a published version).
    - `chaos` = the chaos deploy's recipe git commit. abra stamps
      `coop-cloud.<stack>.chaos-version` = the deployed recipe commit (e.g. "91b27ceb") +
      `coop-cloud.<stack>.chaos`="true" on a `--chaos` deploy; both are absent on a clean
      pinned-tag deploy. We prefer the `.chaos-version` commit — for prev→PR-head it IS the proof
      the PR-head code under test was deployed even when the version label is unbumped (HC1);
      fall back to the `.chaos` flag if no commit is present.

    All three values are None when the service cannot be inspected.
    """
    name = f"{_stack_name(domain)}_{service}"
    proc = subprocess.run(
        [
            "docker",
            "service",
            "inspect",
            name,
            "--format",
            "{{json .Spec.Labels}}|{{.Spec.TaskTemplate.ContainerSpec.Image}}",
        ],
        capture_output=True,
        text=True,
    )
    out = proc.stdout.strip()
    if "|" not in out:
        return {"version": None, "image": None, "chaos": None}
    # Split at the LAST '|': the image ref comes last, whereas the labels JSON may itself contain
    # '|' inside a label value (e.g. a traefik rule `Host(..) || Host(..)`). Splitting at the first
    # one would truncate the JSON and silently lose every label.
    labels_json, _, image = out.rpartition("|")
    ver = chaos = chaos_flag = None
    with contextlib.suppress(ValueError):  # json.JSONDecodeError is a ValueError
        labels = json.loads(labels_json)
        if isinstance(labels, dict):  # a service without labels may serialise as null
            for k, v in labels.items():
                if not k.startswith("coop-cloud."):
                    continue
                if k.endswith(".version"):
                    ver = v
                elif k.endswith(".chaos-version"):
                    chaos = v  # the deployed recipe commit — the strongest signal
                elif k.endswith(".chaos"):
                    chaos_flag = v
    return {"version": ver, "image": image.strip() or None, "chaos": chaos or chaos_flag}


def upgrade_app(domain: str, version: str | None = None) -> None:
    """Thin wrapper over abra.upgrade for the given app (optionally to a specific version)."""
    abra.upgrade(domain, version=version)


def recipe_head_commit(recipe: str) -> str | None:
    """The recipe checkout's current HEAD commit (captured right after fetch, before any
    version-tag checkout) so the upgrade tier can re-checkout the PR head for the chaos redeploy
    (HC1)."""
    return abra.recipe_head_commit(recipe)


def recipe_checkout_ref(recipe: str, ref: str) -> None:
    """git-checkout the recipe to an arbitrary ref/commit (HC1: restore the PR-head checkout
    before the chaos upgrade — the prev-tag base deploy reset it to the published tag)."""
    abra.recipe_checkout(recipe, ref)


def chaos_redeploy(
    domain: str, deploy_timeout: int = 900, no_converge_checks: bool = False
) -> None:
    """In-place `abra app deploy --chaos`: redeploy the running app at the CURRENT recipe checkout
    (HC1: the PR-head code under test).

    This is the upgrade op, not a fresh install — it does NOT go through deploy_app, so the
    deploy-count guard (DG4.1) is not incremented.

    `deploy_timeout` is the abra subprocess wrapper timeout; pass the recipe's DEPLOY_TIMEOUT so a
    heavy stack's reconverge (e.g. lasuite-drive's slow collabora/onlyoffice boot) isn't SIGKILLed
    by the 900s default while abra is still legitimately waiting (its internal TIMEOUT can be
    larger via the .env). Mirrors the install deploy_app timeout plumbing.

    `no_converge_checks` (`abra … -c`): skip abra's own convergence monitor — the caller then owns
    a stricter convergence+health wait (F2-12: abra FATAs on the heavy lasuite-drive prev→PR-head
    crossover while the new collabora's healthcheck is still in its start_period, even though it
    converges given swarm's healthcheck retries). The stack spec IS applied either way (docker
    stack deploy runs before the monitor).
    """
    abra.deploy(domain, chaos=True, timeout=deploy_timeout, no_converge_checks=no_converge_checks)


def wait_ready_probes(meta: dict, domain: str, timeout: int = 600) -> None:
    """Poll a recipe's optional READY_PROBE endpoints until each returns an accepted status, or
    raise TimeoutError. No-op when `meta` has no callable READY_PROBE.

    A recipe_meta may define `READY_PROBE(domain) -> [{"host":..., "path":..., "ok":(200,)}, ...]`
    for readiness signals NOT captured by container-replica convergence or the app's HEALTH_PATH —
    e.g. lasuite-drive's collabora WOPI discovery (`/hosting/discovery` on the collabora sibling
    host): swarm reports collabora 1/1 'running' while coolwsd is still doing jail/config init and
    its discovery endpoint 404s, so replica-convergence alone is not real readiness. Used after
    the install deploy and after the upgrade chaos redeploy so 'reconverged' means genuinely
    ready.

    A probe may instead be a TCP-listen check: `{"tcp_host":..., "tcp_port": int, "stable": N}` —
    poll until a socket connect succeeds N consecutive times (default 2). This is for NON-HTTP
    services whose HEALTH_PATH doesn't reflect them, e.g. mumble's voice server on 64738: the
    app's HTTP readiness comes from the mumble-web sidecar, so after a chaos upgrade redeploy
    (host-mode 64738 must be released by the old task + rebound by the new) the voice server can
    be down while HTTP-200 still passes — and backup-bot then execs into a not-running app
    container (409). Requiring the voice port to be stably listening before proceeding closes that
    window.

    `timeout` applies to each probe separately.
    """
    probe_fn = meta.get("READY_PROBE")
    if not callable(probe_fn):
        return
    probes = probe_fn(domain) or []
    for probe in probes:
        if "tcp_port" in probe:
            host = probe.get("tcp_host", "127.0.0.1")
            port = int(probe["tcp_port"])
            needed = int(probe.get("stable", 2))
            deadline = time.time() + timeout
            consec = 0
            last_err = None
            while time.time() < deadline:
                try:
                    with socket.create_connection((host, port), timeout=10):
                        consec += 1
                        if consec >= needed:
                            print(f" ready-probe OK (tcp {needed}x): {host}:{port}", flush=True)
                            break
                except OSError as e:
                    # Any failed connect resets the streak: we want CONSECUTIVE successes.
                    consec = 0
                    last_err = e
                time.sleep(3)
            else:
                raise TimeoutError(
                    f"READY_PROBE tcp {host}:{port} not stably listening ({needed}x) within "
                    f"{timeout}s — last error: {last_err}"
                )
            continue
        host = probe["host"]
        path = probe.get("path", "/")
        ok = tuple(probe.get("ok", (200,)))
        deadline = time.time() + timeout
        last = 0
        while time.time() < deadline:
            last = http_get(host, path, timeout=15)
            if last in ok:
                print(f" ready-probe OK ({last}): https://{host}{path}", flush=True)
                break
            time.sleep(5)
        else:
            raise TimeoutError(
                f"READY_PROBE not ready: https://{host}{path} (last status {last}) within {timeout}s"
            )


def backup_app(domain: str) -> str:
    """Create a backup; return the abra/restic output (carries the produced snapshot_id)."""
    # Never back up a stack that is still converging/rolling-updating: backupbot resolves each
    # service's hook container ONCE up front, so a task that cycles between that lookup and the
    # pre-hook exec crashes the whole backup with a 409 (immich CI 238). Bounded wait — on timeout
    # we still attempt the backup and let the tier's assertion deliver the verdict.
    deadline = time.time() + 300
    while time.time() < deadline and not services_converged(domain):
        print(
            f" backup: {domain} stack not settled yet — waiting before backup create", flush=True
        )
        time.sleep(5)
    return abra.backup_create(domain)


def restore_app(domain: str) -> None:
    """Thin wrapper over abra.restore for the given app."""
    abra.restore(domain)


def previous_version(recipe: str) -> str | None:
    """The second-newest published version (to deploy before upgrading to latest), or None when
    fewer than two versions are listed."""
    vers = abra.recipe_versions(recipe)
    return vers[-2] if len(vers) >= 2 else None


def _app_container(domain: str, service: str = "app", timeout: int = 60) -> str:
    """The running container id for <stack>_<service>, with a BOUNDED POLL for it to (re)appear.

    A lifecycle op can briefly leave no running task — notably `abra app backup create`, where
    backup-bot-two stops/cycles the app container, so a mutate exec right after backup hit an
    empty `docker ps` and raised. Poll (no bare sleep) until the container is back or timeout
    (RuntimeError).
    """
    name = f"{_stack_name(domain)}_{service}"
    deadline = time.time() + timeout
    while True:
        proc = subprocess.run(
            ["docker", "ps", "--filter", f"name={name}", "--format", "{{.ID}}"],
            capture_output=True,
            text=True,
        )
        cid = proc.stdout.strip().split("\n")[0]
        if cid:
            return cid
        if time.time() >= deadline:
            raise RuntimeError(f"no running container for {name} after {timeout}s")
        time.sleep(3)


def exec_in_app(domain: str, cmd: list[str], service: str = "app", timeout: int = 90) -> str:
    """Run `docker exec` in the app's container and return stdout.

    Hardened (Adversary F1e-1): a lifecycle op (backup/restore) cycles the container, so a
    freshly-resolved container can be mid-transition and `docker exec` FAILS — poll (re-resolving
    the container each try) until the exec succeeds (returncode 0) or timeout, then RAISE
    (RuntimeError). Never silently return '' on a failed exec: that masked a container-cycle race
    as empty data, flipping a healthy recipe RED under opt-out (no accidental generic-pytest
    timing buffer) — and could mask a real failure as a pass elsewhere.
    """
    deadline = time.time() + timeout
    last = ""
    while True:
        cid = _app_container(domain, service)
        proc = subprocess.run(["docker", "exec", cid, *cmd], capture_output=True, text=True)
        if proc.returncode == 0:
            return proc.stdout
        last = (proc.stderr or proc.stdout).strip()
        if time.time() >= deadline:
            raise RuntimeError(
                f"docker exec in {domain}/{service} failed (rc={proc.returncode}) "
                f"after {timeout}s: {last}"
            )
        time.sleep(3)


def http_body(domain: str, path: str = "/", timeout: int = 15) -> str:
    """HTTPS GET and return the decoded body. Unlike http_get/http_fetch this does NOT catch
    errors: an HTTP error status or connection failure propagates from urlopen."""
    ctx = _insecure_ssl_context()
    req = urllib.request.Request(f"https://{domain}{path}", method="GET")
    with urllib.request.urlopen(req, timeout=timeout, context=ctx) as resp:
        return resp.read().decode(errors="replace")


def _force_stack_rm(stack: str, timeout: int = 120) -> None:
    """Remove a stack's services directly (no .env needed) and wait (bounded by `timeout`) for
    them to disappear. Does not raise if they are still listed at the deadline."""
    subprocess.run(["docker", "stack", "rm", stack], capture_output=True, text=True)
    deadline = time.time() + timeout
    while time.time() < deadline and _docker_names("service", stack):
        time.sleep(2)


def teardown_app(domain: str, verify: bool = True) -> None:
    """Full teardown with a docker fallback, then VERIFY nothing is left (raise TeardownError
    otherwise).

    Order matters (A3): undeploy, then remove volumes/secrets *while the .env still exists* (abra
    needs it), then drop the .env LAST — and only after the stack is confirmed gone. If abra
    undeploy fails, fall back to `docker stack rm` (which needs no .env).
    """
    stack = _stack_name(domain)
    abra.undeploy(domain)
    if _docker_names("service", stack):
        _force_stack_rm(stack)  # fallback: abra undeploy didn't clear it
    abra.volume_remove(domain)  # needs the .env -> before removing it
    abra.secret_remove_all(domain)
    # belt-and-suspenders: drop any volumes/secrets abra missed, by stack name. A volume can be
    # briefly held by a just-stopped task after `stack rm`, so retry the volume removal.
    deadline = time.time() + 60
    while time.time() < deadline:
        vols = _docker_names("volume", stack)
        if not vols:
            break
        for v in vols:
            subprocess.run(["docker", "volume", "rm", v], capture_output=True, text=True)
        if not _docker_names("volume", stack):
            break
        time.sleep(3)
    for s in _docker_names("secret", stack):
        subprocess.run(["docker", "secret", "rm", s], capture_output=True, text=True)
    abra.app_config_remove(domain)  # only now (stack gone) drop the .env
    if verify:
        residual = _residual(domain)
        if any(residual.values()):
            raise TeardownError(f"teardown left residual for {domain}: {residual}")
    # The app is gone — drop its active-run registration (janitor() also clears it when reaping).
    unregister_run_app(domain)


def janitor(max_age_seconds: int | None = None) -> None:
    """Reap orphaned run apps from crashed/rebooted runs. Matches the real naming scheme.

    Safe under CONCURRENT runs (capacity=2): every harness run registers its app in the active-run
    registry (register_run_app), so the janitor distinguishes the three cases instead of using age
    alone:
      - registered + owner run_recipe_ci process ALIVE -> in-flight concurrent run: never reap
      - registered + owner DEAD (crashed/SIGKILLed run) -> definite orphan: reap immediately
      - no registry entry (pre-registry code, reboot)   -> fall back to the age threshold

    Reaps via docker primitives so it works even when the .env is gone (A2/A3). Age fallback
    default 2h, env-overridable via CCCI_JANITOR_MAX_AGE.
    """
    if max_age_seconds is None:
        max_age_seconds = int(os.environ.get("CCCI_JANITOR_MAX_AGE", "7200"))
    seen = set()
    for app in abra.app_ls():
        name = app.get("appName") or app.get("domain") or ""
        if RUN_APP_RE.match(name):
            seen.add(name)
    # also catch stacks whose .env was already deleted (abra ls won't list them)
    for svc in _docker_names("service", ""):
        # svc like cust-c95a69_ci_commoninternet_net_app -> reconstruct domain
        m = re.match(r"^([a-z0-9]{1,4}-[0-9a-f]{6})_ci_commoninternet_net_", svc)
        if m:
            seen.add(f"{m.group(1)}.ci.commoninternet.net")
    for name in seen:
        owner = _run_owner_state(name)
        if owner == "alive":
            print(f" janitor: {name} is a live concurrent run — leaving it", flush=True)
            continue
        if owner == "unknown":
            # No registry entry (manual run on pre-registry code, or post-reboot): only the age
            # threshold protects it, as before.
            stack = _stack_name(name)
            age = _stack_age_seconds(stack)
            if age is not None and age < max_age_seconds:
                continue  # young and of unknown provenance; leave it
        # owner == "dead" (a crashed/killed run's definite orphan) or old enough -> reap
        with contextlib.suppress(Exception):
            teardown_app(name, verify=False)
        unregister_run_app(name)