Files
cc-ci/runner/harness/lifecycle.py
autonomic-bot 17ebdf39ac
All checks were successful
continuous-integration/drone/push Build is passing
feat(harness): P3 per-run ABRA_DIR — structural recipe-tree isolation, recipe flock deleted
- run_recipe_ci.setup_run_abra_dir(): builds <runs_dir>/<run-id>/abra with servers/ and
  catalogue/ symlinked to the canonical ~/.abra (app .env files keep landing in the shared
  canonical path, so janitor discovery and env-based teardown are unchanged; per-domain
  filenames + the P2 app-domain lock prevent write conflicts) and a FRESH empty recipes/ —
  each run clones + checkouts its own recipe trees. Exported as $ABRA_DIR (honored by the
  abra CLI, verified on-host) before ANY abra call. Manual runs get manual-<pid> isolation.
- fetch_recipe(): plain clone into $ABRA_DIR/recipes/<recipe> — no shared-tree rm-rf, no lock.
  CCCI_SKIP_FETCH=1 now copies the canonically-staged clone into the per-run tree (same staging
  workflow, run reads staged state).
- abra.abra_dir()/recipe_dir(): single resolution rule ($ABRA_DIR else ~/.abra), used by
  recipe_checkout, has_lightweight_version_tags, recipe_head_commit, recipe_versions,
  generic._recipe_dir, lifecycle.prepull_images, snapshot_recipe_tests, and
  warm_reconcile._recipe_dir (which keeps the canonical default for its own systemd runs but
  follows the per-run tree when imported by promote_canonical inside a run).
- deleted: lifecycle.acquire_recipe_lock, RECIPE_LOCK_DIR, the main() call site and the
  must-lock-before-fetch ordering rule.
- tests/{ghost,discourse}/install_steps.sh: RECIPE_DIR resolves ${ABRA_DIR:-$HOME/.abra} so the
  compose.ccci.yml overlay lands in the tree the run actually deploys from (mechanical path fix
  required by per-run trees; no assertion/gate touched — see DECISIONS.md).
- .drone.yml comments updated (HOME=/root rationale now via the servers symlink).
2026-06-10 04:18:33 +00:00

803 lines
39 KiB
Python

"""App lifecycle for the CI harness: deploy, wait-healthy, teardown, janitor (plan §4.3).
The teardown guarantee is sacred: a failed test must never leak an app/volume/secret into the
next run. Callers wrap deploy()/teardown() in try/finally (or a pytest finalizer).
"""
from __future__ import annotations
import contextlib
import fcntl
import glob
import json
import os
import re
import socket
import ssl
import subprocess
import time
import urllib.request
from . import abra, lifetime
GATEWAY_IP = "143.244.213.108" # *.ci.commoninternet.net -> gateway (TLS passthrough to cc-ci)
# A run app domain is "<recipe[:4]>-<6hex>.ci.commoninternet.net" (see DECISIONS.md). Used by the
# janitor to recognise orphaned run apps (infra apps like traefik/drone/backups don't match).
RUN_APP_RE = re.compile(r"^[a-z0-9]{1,4}-[0-9a-f]{6}\.ci\.commoninternet\.net$")
class TeardownError(RuntimeError):
pass
# --- Concurrent-run safety (capacity=2) -------------------------------------------------------
# ONE mechanism, process-lifetime-scoped so SIGKILL can't leak a stale claim: every run holds an
# exclusive kernel flock on its app DOMAIN (/run/lock/cc-ci-app-<domain>.lock) for the whole run.
# A held lock implies a live owner — the kernel releases a flock when the holding process dies,
# however it dies. The janitor probes the lock (LOCK_NB) to tell a live concurrent run (held →
# leave it) from a crashed run's orphan (acquirable → reap it); it never inspects pids and never
# steals a held lock. Recipe-tree corruption between same-recipe runs is gone structurally (each
# run deploys from its own per-run ABRA_DIR — there is no shared recipe tree and no recipe lock),
# and same-domain runs (double-!testme of one PR) serialise on this app lock.
# See docs/concurrency.md.
# Acquired app-lock file objects are retained here for the REMAINING PROCESS LIFETIME: if the
# caller drops the returned file object, GC would close the fd and silently release the lock —
# this list is the lock's owner of record. Never cleared; release is process exit.
_held_app_locks: list = []
def _app_lock_dir() -> str:
"""The app-domain lockfile dir. /run/lock (tmpfs: a reboot clears locks AND lockfiles, so
post-reboot apps probe as orphans and are reaped immediately). Env-overridable so the
tests/concurrency suite (and its helper subprocesses) can use a sandbox dir."""
return os.environ.get("CCCI_APP_LOCK_DIR", "/run/lock")
def _app_lock_path(domain: str) -> str:
return os.path.join(_app_lock_dir(), f"cc-ci-app-{domain}.lock")
def acquire_app_lock(domain: str):
"""Take the per-app-domain exclusive lock; blocks (with a log line) if another run of the
same domain is in flight (double-!testme serialisation). Returns the open lock file, which is
ALSO retained in _held_app_locks so the flock lives exactly as long as the process.
Unlink/recreate race guard: the janitor unlinks a reaped orphan's lockfile while holding its
flock, so a waiter blocked on the OLD inode can win a lock no later opener can observe (a new
open() at the path creates a FRESH inode). After every acquisition, verify the locked fd is
still the file at the path (st_ino match); if not, drop it and retry on the live path."""
path = _app_lock_path(domain)
waited = False
while True:
# PEP 446: the fd is non-inheritable, so subprocess children never carry the lock.
f = open(path, "a") # noqa: SIM115 — deliberately held for the rest of the process
try:
fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
except BlockingIOError:
if not waited:
print(f"== app lock: another run of {domain} is in flight — waiting ==", flush=True)
waited = True
fcntl.flock(f, fcntl.LOCK_EX)
try:
if os.fstat(f.fileno()).st_ino == os.stat(path).st_ino:
break # we hold the lock on the inode the path names — done
except FileNotFoundError:
pass
f.close() # locked a stale (unlinked) inode — retry on the live path
os.utime(f.fileno()) # mtime = acquisition time = lock age (janitor's long-held flag)
_held_app_locks.append(f)
if waited:
print(f"== app lock: acquired {path} ==", flush=True)
return f
def _docker_names(kind: str, stack: str) -> list[str]:
"""docker <kind> ls names filtered to a stack (kind: service|volume|secret)."""
proc = subprocess.run(
["docker", kind, "ls", "--filter", f"name={stack}", "--format", "{{.Name}}"],
capture_output=True,
text=True,
)
return [n for n in proc.stdout.split("\n") if n.strip()]
def _residual(domain: str) -> dict:
stack = _stack_name(domain)
return {
"services": _docker_names("service", stack),
"volumes": _docker_names("volume", stack),
"secrets": _docker_names("secret", stack),
}
def _recipe_extra_env(recipe: str, domain: str) -> dict[str, str]:
"""Per-recipe extra .env keys, applied at every deploy (install + upgrade's old_app) so a recipe
with multi-domain / config needs is enrolled with NO shared-harness change (D5/M6.5). A recipe
declares `EXTRA_ENV` in tests/<recipe>/recipe_meta.py as either a dict or a callable
`EXTRA_ENV(domain) -> dict` (callable form lets it derive values from the per-run domain, e.g.
cryptpad's SANDBOX_DOMAIN). Returns {} if none."""
path = os.path.join(os.path.dirname(__file__), "..", "..", "tests", recipe, "recipe_meta.py")
if not os.path.exists(path):
return {}
ns: dict = {}
with open(path) as fh:
exec(compile(fh.read(), path, "exec"), ns) # noqa: S102 (trusted, in-repo)
ee = ns.get("EXTRA_ENV")
if callable(ee):
ee = ee(domain)
return {str(k): str(v) for k, v in (ee or {}).items()}
def _recipe_meta_flag(recipe: str, key: str) -> bool:
"""Read a boolean flag from tests/<recipe>/recipe_meta.py (e.g. CHAOS_BASE_DEPLOY). Returns
False if the recipe ships no meta or the flag is absent/falsey. Trusted in-repo exec, same as
_recipe_extra_env."""
path = os.path.join(os.path.dirname(__file__), "..", "..", "tests", recipe, "recipe_meta.py")
if not os.path.exists(path):
return False
ns: dict = {}
with open(path) as fh:
exec(compile(fh.read(), path, "exec"), ns) # noqa: S102 (trusted, in-repo)
return bool(ns.get(key))
def _record_deploy() -> None:
"""Increment the per-run deploy counter (DG4.1: one deploy per run). No-op unless the
orchestrator set CCCI_DEPLOY_COUNT_FILE — so it never affects standalone/manual use."""
path = os.environ.get("CCCI_DEPLOY_COUNT_FILE")
if not path:
return
n = 0
with contextlib.suppress(OSError, ValueError), open(path) as f:
n = int(f.read().strip() or "0")
with contextlib.suppress(OSError), open(path, "w") as f:
f.write(str(n + 1))
def _run_install_steps(hook: tuple[str, str], recipe: str, domain: str) -> None:
"""Run a recipe's custom install-steps hook (install_steps.sh) during the install tier — after
`abra app new` + env defaults + secret generate, before deploy (Phase 1d DG5). The hook gets the
app .env path + domain so it can insert secrets / set env / seed before the app comes up."""
source, path = hook
env_path = os.path.expanduser(f"~/.abra/servers/default/{domain}.env")
print(f" install-steps hook ({source}): {path}", flush=True)
subprocess.run(
["bash", path],
check=True,
env=dict(
os.environ,
CCCI_APP_DOMAIN=domain,
CCCI_RECIPE=recipe,
CCCI_APP_ENV=env_path,
),
)
def prepull_images(recipe: str, domain: str) -> None:
"""HQ1 (plan-prepull-images.md): pre-pull a recipe's images into the local store BEFORE the deploy.
A pull failure (rate-limit / bad tag / slow) then fails FAST as a CLEAR pull error here, instead
of surfacing later as a murky 'not converged' deploy timeout (the F2-12-class confusion); and
images-already-local lets the deploy converge within abra's native window. Resolves images via
`docker compose config --images` using abra's COMPOSE_FILE from the app .env (handles $VERSION
interpolation + multi-compose recipes — a naive `grep image:` misses both), then `docker pull`
each, SKIP-IF-PRESENT (zero network for already-cached pinned tags). The deploy itself stays
UNCHANGED (real `abra app deploy`) — this only warms the local store. Removes PULL time, NOT
app-INIT time (slow-init apps like collabora/immich still need their recipe healthcheck/READY_PROBE).
Best-effort on resolution failure (skip + let the deploy pull as usual); HARD-fails on a real
pull error (don't mask it)."""
recipe_dir = abra.recipe_dir(recipe) # per-run tree inside a CI run
# The app .env lives in the CANONICAL servers path (the per-run ABRA_DIR's servers/ is a
# symlink to it, so abra and this path agree on the same file).
env_path = os.path.expanduser(f"~/.abra/servers/default/{domain}.env")
if not os.path.isdir(recipe_dir) or not os.path.isfile(env_path):
print(f" prepull: recipe dir or .env missing for {recipe} — skipping", flush=True)
return
# COMPOSE_FILE is a shell-style ':'-separated list (may self-reference $COMPOSE_FILE for
# multi-compose); evaluate it the way abra does, then pass each file to docker compose. The
# --env-file supplies $VERSION-style interpolation so pinned tags resolve correctly.
cf = subprocess.run(
["bash", "-c", f'set -a; . "{env_path}"; printf "%s" "${{COMPOSE_FILE:-compose.yml}}"'],
capture_output=True,
text=True,
).stdout.strip()
files = [f for f in cf.split(":") if f] or ["compose.yml"]
args = ["docker", "compose", "--env-file", env_path]
for f in files:
args += ["-f", f]
args += ["config", "--images"]
proc = subprocess.run(args, cwd=recipe_dir, capture_output=True, text=True)
# `config --images` prints one image ref per line to stdout (warnings go to stderr).
images = sorted({ln.strip() for ln in proc.stdout.splitlines() if ln.strip()})
if not images:
print(
f" prepull: no images resolved for {recipe} (config --images rc={proc.returncode}) — "
f"skipping (deploy will pull as usual). stderr: {proc.stderr.strip()[-160:]}",
flush=True,
)
return
for img in images:
if subprocess.run(["docker", "image", "inspect", img], capture_output=True).returncode == 0:
print(f" prepull: present {img}", flush=True)
continue
print(f" prepull: pulling {img}", flush=True)
r = subprocess.run(["docker", "pull", img], capture_output=True, text=True)
if r.returncode != 0:
raise RuntimeError(
f"prepull: `docker pull {img}` failed (rc={r.returncode}) — clear pull error BEFORE "
f"deploy: {r.stderr.strip()[-300:] or r.stdout.strip()[-300:]}"
)
print(f" prepull: {len(images)} image(s) present/pulled for {recipe}", flush=True)
def deploy_app(
recipe: str,
domain: str,
version: str | None = None,
secrets: bool = True,
install_steps_hook: tuple[str, str] | None = None,
deploy_timeout: int = 900,
) -> None:
"""Create + configure + deploy an app. Forces LETS_ENCRYPT_ENV='' so traefik serves the
wildcard cert via the file provider and NEVER attempts ACME (adversary finding A1). Applies any
per-recipe EXTRA_ENV (recipe_meta.py) and the custom install-steps hook (Phase 1d) before deploy.
`deploy_timeout` is the subprocess timeout for `abra app deploy`. Caller (orchestrator) passes
`recipe_meta.DEPLOY_TIMEOUT` so heavy recipes (ghost, matrix-synapse, lasuite-meet) can extend
past the 900s default. abra's INTERNAL TIMEOUT (recipe's TIMEOUT env, default 300s) is set via
EXTRA_ENV; this is the Python subprocess wrapper's timeout so abra doesn't get SIGKILLed mid-deploy."""
_record_deploy()
# Lock BEFORE the app exists: a concurrent run's janitor must never see this app without a
# held app lock (it would probe it as an orphan and reap an in-flight deploy). Also the
# double-!testme serialisation point: a second run of the same domain blocks here.
acquire_app_lock(domain)
abra.app_config_remove(domain) # clear any stale .env from a prior crashed run
abra.app_new(recipe, domain, version=version, secrets=secrets)
# A pinned version must actually deploy that version: check the recipe out to the tag so the
# on-disk compose/.env match, and deploy NON-chaos below (chaos ignores the pin → deployed LATEST,
# Adversary F1d-2). Chaos is correct ONLY for the version=None case (deploy the current PR-head
# checkout). Order matters: checkout before secret_generate (-C) so secrets match the pinned tree.
chaos = version is None
if version:
abra.recipe_checkout(recipe, version)
# A pinned (non-chaos) deploy runs `abra recipe lint`, which FATAs R014 ('only annotated
# tags') if the upstream recipe ships a stray lightweight version tag (e.g. lasuite-meet's
# 0.3.0+v1.16.0). In that case deploy the EXPLICITLY-checked-out pinned version with chaos:
# chaos skips lint and deploys the current checkout (we just checked out `version`), so it
# still deploys the intended pinned version — not LATEST (the F1d-2 hazard was a *missing*
# checkout, which recipe_checkout above fixes). No-op for all-annotated recipes (stays pinned).
if abra.has_lightweight_version_tags(recipe):
print(
f" deploy_app({recipe}@{version}): lightweight upstream tag present → chaos base "
"deploy of the checked-out pinned version (skips R014 lint; not LATEST)",
flush=True,
)
chaos = True
# A recipe may force a chaos base deploy via recipe_meta CHAOS_BASE_DEPLOY=True when an
# install_steps hook adds an untracked compose overlay to the recipe checkout (e.g. discourse's
# compose.ccci.yml, provided by install_steps for the pinned base). The untracked file makes
# abra's pinned-deploy clean-tree check FATA ('has locally unstaged changes'); chaos skips lint +
# the clean-tree gate and deploys the EXPLICITLY-checked-out pinned version (we already ran
# recipe_checkout(version) above) — NOT latest. Same mechanism as the lightweight-tag branch.
elif _recipe_meta_flag(recipe, "CHAOS_BASE_DEPLOY"):
print(
f" deploy_app({recipe}@{version}): CHAOS_BASE_DEPLOY set → chaos base deploy of the "
"checked-out pinned version (skips clean-tree/lint; deploys version, not LATEST)",
flush=True,
)
chaos = True
# Pin DOMAIN to the run domain explicitly. `abra app new -D` fills it for recipes whose
# .env.sample uses a literal placeholder, but NOT for ones using a `{{ .Domain }}` Go-template
# (this abra version leaves it unexpanded → deploy fails "can't evaluate field Domain"). Setting
# it ourselves is recipe-agnostic and canonical (the run domain IS the app's domain).
abra.env_set(domain, "DOMAIN", domain)
abra.env_set(domain, "LETS_ENCRYPT_ENV", "")
for k, v in _recipe_extra_env(recipe, domain).items():
abra.env_set(domain, k, v)
if secrets:
abra.secret_generate(domain)
if install_steps_hook:
_run_install_steps(install_steps_hook, recipe, domain)
# HQ1: warm the local image store before the (real, unchanged) abra deploy.
prepull_images(recipe, domain)
abra.deploy(domain, chaos=chaos, timeout=deploy_timeout)
def _stack_name(domain: str) -> str:
# abra derives the swarm stack name from the domain by replacing dots with underscores
# and KEEPING hyphens (e.g. custom-html-x.ci.commoninternet.net -> custom-html-x_ci_...).
return domain.replace(".", "_")
def services_converged(domain: str) -> bool:
"""True when every service in the stack reports replicas N/N (N>0) AND no service is
mid-rolling-update (swarm UpdateStatus settled)."""
stack = _stack_name(domain)
proc = subprocess.run(
["docker", "stack", "services", stack, "--format", "{{.Name}} {{.Replicas}}"],
capture_output=True,
text=True,
)
rows = [r for r in proc.stdout.split("\n") if r.strip()]
if not rows:
return False
names = []
for r in rows:
name, _, replicas = r.partition(" ")
names.append(name)
cur, _, want = replicas.partition("/")
# A service at its DESIRED replica count is converged — including a `replicas: 0`
# on-demand one-shot (e.g. lasuite-drive's `minio-createbuckets`, which is scaled up
# manually only when buckets need (re)creating), which reports "0/0". The earlier
# `want == "0"` rejection wrongly treated those as never-converged, hanging the deploy
# forever. `cur == want` (with `want` present) is the correct convergence test; a service
# still spinning up shows e.g. "0/1" (cur != want) and is correctly not-yet-converged.
if not want or cur != want:
return False
# N/N alone is NOT convergence during a stop-first rolling update: a chaos redeploy that changes
# a non-app service image (e.g. immich's db pin) registers the update immediately, but swarm may
# not have cycled that service's task yet — the OLD task still shows 1/1, then dies seconds later
# (immich CI 238: backupbot exec'd the db pre-hook into the just-killed container → 409). Require
# every service's UpdateStatus to be settled too, so the wait spans the whole rolling update.
proc = subprocess.run(
[
"docker",
"service",
"inspect",
*names,
"--format",
"{{if .UpdateStatus}}{{.UpdateStatus.State}}{{end}}",
],
capture_output=True,
text=True,
)
if proc.returncode != 0:
return False # a service vanished mid-check — not settled
for state in proc.stdout.split("\n"):
# Only ACTIVE states block convergence. 'paused'/'rollback_paused' are terminal-without-
# intervention: swarm's default update-failure-action pauses the update on one task flicker
# and the flag then persists FOREVER (immich CI 241: app service 'paused' from a restart
# during restore, service back at 1/1 and healthy — the wait hung to its deadline). With
# N/N already required above, a paused update is settled for our purposes; the HTTP-health
# and tier assertions still gate whether the app actually works.
if state.strip() in ("updating", "rollback_started"):
return False
return True
def http_get(domain: str, path: str = "/", timeout: int = 15) -> int:
"""HTTPS GET the app by its real hostname. On cc-ci the *.ci.commoninternet.net wildcard
resolves (public DNS) to the gateway, which SNI-passthroughs to cc-ci's traefik — so using
the real URL keeps SNI correct (connecting to the bare IP would drop SNI and fail to route)."""
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
req = urllib.request.Request(f"https://{domain}{path}", method="GET")
try:
with urllib.request.urlopen(req, timeout=timeout, context=ctx) as resp:
return resp.status
except urllib.error.HTTPError as e:
return e.code
except Exception:
return 0
def http_fetch(domain: str, path: str = "/", timeout: int = 15) -> tuple[int, str]:
"""One HTTPS GET → (status, body) in a SINGLE request, never raising. Lets a caller check the
status and body together with no race between two requests (assert_serving) — and captures the
error body on a 4xx/5xx instead of throwing."""
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
req = urllib.request.Request(f"https://{domain}{path}", method="GET")
try:
with urllib.request.urlopen(req, timeout=timeout, context=ctx) as resp:
return resp.status, resp.read().decode(errors="replace")
except urllib.error.HTTPError as e:
try:
body = e.read().decode(errors="replace")
except Exception: # noqa: BLE001
body = ""
return e.code, body
except Exception: # noqa: BLE001
return 0, ""
def wait_healthy(
domain: str,
ok_codes=(200, 301, 302),
path: str = "/",
deploy_timeout: int = 600,
http_timeout: int = 300,
) -> None:
"""Wait for stack services converged, then for the app to answer ok over HTTPS at `path`.
`path` is per-recipe (recipe_meta.HEALTH_PATH), e.g. keycloak uses /realms/master."""
deadline = time.time() + deploy_timeout
while time.time() < deadline:
if services_converged(domain):
break
time.sleep(5)
else:
raise TimeoutError(f"{domain}: services did not converge in {deploy_timeout}s")
deadline = time.time() + http_timeout
last = 0
while time.time() < deadline:
last = http_get(domain, path)
if last in ok_codes:
return
time.sleep(5)
raise TimeoutError(f"{domain}: not healthy over HTTPS {path} (last status {last})")
def deployed_identity(domain: str, service: str = "app") -> dict[str, str | None]:
"""Identity of the running app service: {"version", "image", "chaos"}. Used to prove an upgrade
actually MOVED the deployment (not a vacuous no-op — Adversary F1d-2), AND (Phase 1e HC1) that an
`abra app deploy --chaos` upgrade actually deployed the PR-head code under test.
- `version` = the `coop-cloud.<stack>.version` label (bumped per published recipe version).
- `image` = the running container image (usually bumps with a published version).
- `chaos` = the chaos deploy's recipe git commit. abra stamps `coop-cloud.<stack>.chaos-version`
= the deployed recipe commit (e.g. "91b27ceb") + `coop-cloud.<stack>.chaos`="true" on a
`--chaos` deploy; both are absent on a clean pinned-tag deploy. We prefer the `.chaos-version`
commit — for prev→PR-head it IS the proof the PR-head code under test was deployed even when the
version label is unbumped (HC1); fall back to the `.chaos` flag if no commit is present."""
name = f"{_stack_name(domain)}_{service}"
proc = subprocess.run(
[
"docker",
"service",
"inspect",
name,
"--format",
"{{json .Spec.Labels}}|{{.Spec.TaskTemplate.ContainerSpec.Image}}",
],
capture_output=True,
text=True,
)
out = proc.stdout.strip()
if "|" not in out:
return {"version": None, "image": None, "chaos": None}
labels_json, _, image = out.partition("|")
ver = chaos = chaos_flag = None
with contextlib.suppress(ValueError, json.JSONDecodeError):
for k, v in json.loads(labels_json).items():
if not k.startswith("coop-cloud."):
continue
if k.endswith(".version"):
ver = v
elif k.endswith(".chaos-version"):
chaos = v # the deployed recipe commit — the strongest signal
elif k.endswith(".chaos"):
chaos_flag = v
return {"version": ver, "image": image.strip() or None, "chaos": chaos or chaos_flag}
def upgrade_app(domain: str, version: str | None = None) -> None:
abra.upgrade(domain, version=version)
def recipe_head_commit(recipe: str) -> str | None:
"""The recipe checkout's current HEAD commit (captured right after fetch, before any version-tag
checkout) so the upgrade tier can re-checkout the PR head for the chaos redeploy (HC1)."""
return abra.recipe_head_commit(recipe)
def recipe_checkout_ref(recipe: str, ref: str) -> None:
"""git-checkout the recipe to an arbitrary ref/commit (HC1: restore the PR-head checkout before
the chaos upgrade — the prev-tag base deploy reset it to the published tag)."""
abra.recipe_checkout(recipe, ref)
def chaos_redeploy(
domain: str, deploy_timeout: int = 900, no_converge_checks: bool = False
) -> None:
"""In-place `abra app deploy --chaos`: redeploy the running app at the CURRENT recipe checkout
(HC1: the PR-head code under test). This is the upgrade op, not a fresh install — it does NOT go
through deploy_app, so the deploy-count guard (DG4.1) is not incremented.
`deploy_timeout` is the abra subprocess wrapper timeout; pass the recipe's DEPLOY_TIMEOUT so a
heavy stack's reconverge (e.g. lasuite-drive's slow collabora/onlyoffice boot) isn't SIGKILLed
by the 900s default while abra is still legitimately waiting (its internal TIMEOUT can be larger
via the .env). Mirrors the install deploy_app timeout plumbing.
`no_converge_checks` (`abra … -c`): skip abra's own convergence monitor — the caller then owns a
stricter convergence+health wait (F2-12: abra FATAs on the heavy lasuite-drive prev→PR-head
crossover while the new collabora's healthcheck is still in its start_period, even though it
converges given swarm's healthcheck retries). The stack spec IS applied either way (docker stack
deploy runs before the monitor)."""
abra.deploy(domain, chaos=True, timeout=deploy_timeout, no_converge_checks=no_converge_checks)
def wait_ready_probes(meta: dict, domain: str, timeout: int = 600) -> None:
"""Poll a recipe's optional READY_PROBE endpoints until each returns an accepted status, or raise.
A recipe_meta may define `READY_PROBE(domain) -> [{"host":..., "path":..., "ok":(200,)}, ...]`
for readiness signals NOT captured by container-replica convergence or the app's HEALTH_PATH —
e.g. lasuite-drive's collabora WOPI discovery (`/hosting/discovery` on the collabora sibling
host): swarm reports collabora 1/1 'running' while coolwsd is still doing jail/config init and
its discovery endpoint 404s, so replica-convergence alone is not real readiness. Used after the
install deploy and after the upgrade chaos redeploy so 'reconverged' means genuinely ready.
A probe may instead be a TCP-listen check: `{"tcp_host":..., "tcp_port": int, "stable": N}` — poll
until a socket connect succeeds N consecutive times (default 2). This is for NON-HTTP services
whose HEALTH_PATH doesn't reflect them, e.g. mumble's voice server on 64738: the app's HTTP
readiness comes from the mumble-web sidecar, so after a chaos upgrade redeploy (host-mode 64738
must be released by the old task + rebound by the new) the voice server can be down while
HTTP-200 still passes — and backup-bot then execs into a not-running app container (409). Requiring
the voice port to be stably listening before proceeding closes that window."""
probe_fn = meta.get("READY_PROBE")
if not callable(probe_fn):
return
probes = probe_fn(domain) or []
for probe in probes:
if "tcp_port" in probe:
host = probe.get("tcp_host", "127.0.0.1")
port = int(probe["tcp_port"])
needed = int(probe.get("stable", 2))
deadline = time.time() + timeout
consec = 0
last_err = None
while time.time() < deadline:
try:
with socket.create_connection((host, port), timeout=10):
consec += 1
if consec >= needed:
print(f" ready-probe OK (tcp {needed}x): {host}:{port}", flush=True)
break
except OSError as e:
consec = 0
last_err = e
time.sleep(3)
else:
raise TimeoutError(
f"READY_PROBE tcp {host}:{port} not stably listening ({needed}x) within "
f"{timeout}s — last error: {last_err}"
)
continue
host = probe["host"]
path = probe.get("path", "/")
ok = tuple(probe.get("ok", (200,)))
deadline = time.time() + timeout
last = 0
while time.time() < deadline:
last = http_get(host, path, timeout=15)
if last in ok:
print(f" ready-probe OK ({last}): https://{host}{path}", flush=True)
break
time.sleep(5)
else:
raise TimeoutError(
f"READY_PROBE not ready: https://{host}{path} (last status {last}) within {timeout}s"
)
def backup_app(domain: str) -> str:
"""Create a backup; return the abra/restic output (carries the produced snapshot_id)."""
# Never back up a stack that is still converging/rolling-updating: backupbot resolves each
# service's hook container ONCE up front, so a task that cycles between that lookup and the
# pre-hook exec crashes the whole backup with a 409 (immich CI 238). Bounded wait — on timeout
# we still attempt the backup and let the tier's assertion deliver the verdict.
deadline = time.time() + 300
while time.time() < deadline and not services_converged(domain):
print(
f" backup: {domain} stack not settled yet — waiting before backup create", flush=True
)
time.sleep(5)
return abra.backup_create(domain)
def restore_app(domain: str) -> None:
abra.restore(domain)
def previous_version(recipe: str) -> str | None:
"""The second-newest published version (to deploy before upgrading to latest)."""
vers = abra.recipe_versions(recipe)
return vers[-2] if len(vers) >= 2 else None
def _app_container(domain: str, service: str = "app", timeout: int = 60) -> str:
"""The running container id for <stack>_<service>, with a BOUNDED POLL for it to (re)appear.
A lifecycle op can briefly leave no running task — notably `abra app backup create`, where
backup-bot-two stops/cycles the app container, so a mutate exec right after backup hit an empty
`docker ps` and raised. Poll (no bare sleep) until the container is back or timeout."""
name = f"{_stack_name(domain)}_{service}"
deadline = time.time() + timeout
while True:
proc = subprocess.run(
["docker", "ps", "--filter", f"name={name}", "--format", "{{.ID}}"],
capture_output=True,
text=True,
)
cid = proc.stdout.strip().split("\n")[0]
if cid:
return cid
if time.time() >= deadline:
raise RuntimeError(f"no running container for {name} after {timeout}s")
time.sleep(3)
def exec_in_app(domain: str, cmd: list[str], service: str = "app", timeout: int = 90) -> str:
"""Run `docker exec` in the app's container and return stdout. Hardened (Adversary F1e-1): a
lifecycle op (backup/restore) cycles the container, so a freshly-resolved container can be
mid-transition and `docker exec` FAILS — poll (re-resolving the container each try) until the exec
succeeds (returncode 0) or timeout, then RAISE. Never silently return '' on a failed exec: that
masked a container-cycle race as empty data, flipping a healthy recipe RED under opt-out (no
accidental generic-pytest timing buffer) — and could mask a real failure as a pass elsewhere."""
deadline = time.time() + timeout
last = ""
while True:
cid = _app_container(domain, service)
proc = subprocess.run(["docker", "exec", cid, *cmd], capture_output=True, text=True)
if proc.returncode == 0:
return proc.stdout
last = (proc.stderr or proc.stdout).strip()
if time.time() >= deadline:
raise RuntimeError(
f"docker exec in {domain}/{service} failed (rc={proc.returncode}) after {timeout}s: {last}"
)
time.sleep(3)
def http_body(domain: str, path: str = "/", timeout: int = 15) -> str:
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
req = urllib.request.Request(f"https://{domain}{path}", method="GET")
with urllib.request.urlopen(req, timeout=timeout, context=ctx) as resp:
return resp.read().decode(errors="replace")
def _force_stack_rm(stack: str, timeout: int = 120) -> None:
"""Remove a stack's services directly (no .env needed) and wait for them to disappear."""
subprocess.run(["docker", "stack", "rm", stack], capture_output=True, text=True)
deadline = time.time() + timeout
while time.time() < deadline and _docker_names("service", stack):
time.sleep(2)
def teardown_app(domain: str, verify: bool = True) -> None:
"""Full teardown with a docker fallback, then VERIFY nothing is left (raise otherwise).
Order matters (A3): undeploy, then remove volumes/secrets *while the .env still exists* (abra
needs it), then drop the .env LAST — and only after the stack is confirmed gone. If abra
undeploy fails, fall back to `docker stack rm` (which needs no .env)."""
stack = _stack_name(domain)
abra.undeploy(domain)
if _docker_names("service", stack):
_force_stack_rm(stack) # fallback: abra undeploy didn't clear it
abra.volume_remove(domain) # needs the .env -> before removing it
abra.secret_remove_all(domain)
# belt-and-suspenders: drop any volumes/secrets abra missed, by stack name. A volume can be
# briefly held by a just-stopped task after `stack rm`, so retry the volume removal.
deadline = time.time() + 60
while time.time() < deadline:
vols = _docker_names("volume", stack)
if not vols:
break
for v in vols:
subprocess.run(["docker", "volume", "rm", v], capture_output=True, text=True)
if not _docker_names("volume", stack):
break
time.sleep(3)
for s in _docker_names("secret", stack):
subprocess.run(["docker", "secret", "rm", s], capture_output=True, text=True)
abra.app_config_remove(domain) # only now (stack gone) drop the .env
if verify:
residual = _residual(domain)
if any(residual.values()):
raise TeardownError(f"teardown left residual for {domain}: {residual}")
# No unregistration step: the app lock releases implicitly at process exit. The clean run's
# leftover lockfile (unheld) is unlinked on sight by the next janitor's stale-lockfile sweep.
# A lock held longer than 2x the 60-min hard deadline can only be a leaked run (the deadline
# bounds every healthy run). Flag it for a human — NEVER steal a held lock.
LONG_HELD_LOCK_SECONDS = 2 * lifetime.HARD_DEADLINE_SECONDS
def _probe_and_reap(domain: str) -> None:
"""Probe one run app's lock; reap iff nobody holds it (kernel-guaranteed orphan).
Reaping happens WHILE HOLDING the probe lock, closing the janitor-vs-new-run race: a new run
of the same domain blocks in acquire_app_lock until the reap finishes, so a fresh app never
coexists with a half-reaped one. The lockfile is unlinked before release (still holding the
lock); a waiter that blocked on the unlinked inode re-checks identity and retries. Two racing
janitors arbitrate on the same flock: one reaps, the other sees 'held' and leaves —
teardown_app(verify=False) is idempotent either way."""
path = _app_lock_path(domain)
try:
# PEP 446: non-inheritable fd, same as acquire_app_lock.
f = open(path, "a") # noqa: SIM115 — closed in the finally below, lock released with it
except OSError as e:
print(f"!! janitor: cannot open lockfile {path} ({e}) — skipping {domain}", flush=True)
return
try:
try:
fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
except BlockingIOError:
# Held -> live run. Never steal; flag if it has been held implausibly long.
try:
held_for = time.time() - os.stat(path).st_mtime
except OSError:
held_for = 0
if held_for > LONG_HELD_LOCK_SECONDS:
print(
f"!! lock for {domain} held >{LONG_HELD_LOCK_SECONDS // 60}min — possible "
"leaked run; inspect with lslocks",
flush=True,
)
else:
print(
f" janitor: {domain} lock held — live concurrent run, leaving it", flush=True
)
return
# Acquired — but only the inode the PATH names counts (another janitor may have reaped
# and unlinked this inode while we raced; a lock on an unlinked inode protects nothing
# and unlinking the path now would delete a NEWER run's lockfile).
try:
if os.fstat(f.fileno()).st_ino != os.stat(path).st_ino:
return
except FileNotFoundError:
return
# Orphan: no live owner (the kernel released the lock when the owner died). Reap while
# holding the probe lock, then unlink the lockfile before releasing.
print(f" janitor: {domain} lock acquirable — orphan, reaping", flush=True)
with contextlib.suppress(Exception):
teardown_app(domain, verify=False)
with contextlib.suppress(OSError):
os.unlink(path)
finally:
f.close()
def janitor() -> None:
"""Reap orphaned run apps from crashed/rebooted runs; the kernel flock is the only liveness
oracle. For every candidate run app, probe its app-domain lock (LOCK_NB):
acquirable -> nobody holds it -> orphan -> reap under the probe lock + unlink lockfile
held -> live concurrent run -> leave it (warn if held >2x the hard deadline)
Candidate discovery is unchanged: `abra app ls` + a docker-service sweep (catches stacks
whose .env is already gone), both matched against RUN_APP_RE — warm/canonical apps never
match and are never probed. Post-reboot, /run/lock (tmpfs) is empty, so every surviving app
probes as an orphan and is reaped immediately (no age threshold). Stale lockfiles with no
app behind them are unlinked on sight. Degrades safely: an unreadable lockfile/dir is
skipped with a log line, never a crash. Reaps via docker primitives so it works even when
the .env is gone (A2/A3)."""
seen = set()
for app in abra.app_ls():
name = app.get("appName") or app.get("domain") or ""
if RUN_APP_RE.match(name):
seen.add(name)
# also catch stacks whose .env was already deleted (abra ls won't list them)
for svc in _docker_names("service", ""):
# svc like cust-c95a69_ci_commoninternet_net_app -> reconstruct domain
m = re.match(r"^([a-z0-9]{1,4}-[0-9a-f]{6})_ci_commoninternet_net_", svc)
if m:
seen.add(f"{m.group(1)}.ci.commoninternet.net")
for name in seen:
_probe_and_reap(name)
# Tidy /run/lock: a clean run's leftover lockfile is unheld and appless — unlink it (under
# its own probe lock, with the same identity check as above).
with contextlib.suppress(OSError):
for path in glob.glob(os.path.join(_app_lock_dir(), "cc-ci-app-*.lock")):
domain = os.path.basename(path)[len("cc-ci-app-") : -len(".lock")]
if domain in seen:
continue # handled (or deliberately left) above
with contextlib.suppress(OSError):
f = open(path, "a") # noqa: SIM115 — closed below, lock released with it
try:
fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
if os.fstat(f.fileno()).st_ino == os.stat(path).st_ino:
os.unlink(path)
except (BlockingIOError, FileNotFoundError):
pass # held (live run pre-deploy) or already gone — leave it
finally:
f.close()