fix(harness): make concurrent recipe runs safe (per-recipe flock + active-run registry)
All checks were successful
continuous-integration/drone/push Build is passing

capacity=2 went live with three stale capacity=1-era assumptions that corrupted
concurrent runs (immich 229/230 '/pg_backup.sh: No such file'):

- ~/.abra/recipes/<recipe> is ONE shared working tree that fetch_recipe rm-rf's/
  reclones and the upgrade tier git-checkouts mid-run. Same-recipe runs now
  serialise on an exclusive flock (/run/lock/cc-ci-recipe-<recipe>.lock), taken
  in main() BEFORE fetch_recipe and held for the whole run; the kernel releases
  it on any process death, so there is no stale-lock failure mode. Different
  recipes still run in parallel.

- CCCI_JANITOR_MAX_AGE=0 made a starting build reap ANY in-flight run app. Every
  run now registers its app domain + pid in /run/cc-ci-active/<domain> before
  app creation; the janitor checks the owner: alive (pid is a live run_recipe_ci
  process) -> never reaped; dead -> reaped immediately; unknown (pre-registry or
  post-reboot) -> age fallback (default 2h). The MAX_AGE=0 env override is gone
  from .drone.yml.

- .drone.yml: concurrency.limit 1 -> 2 to match DRONE_RUNNER_CAPACITY=2; the
  'safe because capacity=1' comments now describe the flock+registry model.

lint: PASS, unit tests: 138 passed.
This commit is contained in:
2026-06-09 21:56:25 +00:00
parent 9a7772563a
commit c0df77d0d9
3 changed files with 131 additions and 24 deletions

View File

@ -35,10 +35,12 @@ steps:
# the comment-bridge). Deploys the recipe at the PR head, runs install/upgrade/backup + any # the comment-bridge). Deploys the recipe at the PR head, runs install/upgrade/backup + any
# recipe-local tests via the shared harness, then guarantees teardown (plan §4.2/§4.3). # recipe-local tests via the shared harness, then guarantees teardown (plan §4.2/§4.3).
# #
# Resource safety (plan §4.2/§4.3): MAX_TESTS=DRONE_RUNNER_CAPACITY=1 (nix/modules/drone-runner.nix) is # Resource safety (plan §4.2/§4.3): DRONE_RUNNER_CAPACITY=2 (nix/modules/drone-runner.nix) +
# the primary concurrency cap; concurrency.limit below is a redundant belt. CCCI_JANITOR_MAX_AGE=0 # concurrency.limit=2 below allow two recipe runs in parallel. Concurrent-run safety is enforced by
# makes the run-start janitor reap ANY orphaned run app before deploying — safe because capacity=1 # the harness, not by serialisation: same-recipe runs serialise on a per-recipe flock
# means no concurrent run exists (a SIGKILL'd/timed-out build leaves an orphan with no teardown). # (lifecycle.acquire_recipe_lock — the shared ~/.abra/recipes/<recipe> checkout is the conflict),
# and every run registers its app domain + pid in /run/cc-ci-active so the run-start janitor only
# reaps orphans whose owning run is DEAD (alive → never touched; unknown → age fallback, default 2h).
kind: pipeline kind: pipeline
type: exec type: exec
name: recipe-ci name: recipe-ci
@ -52,16 +54,16 @@ trigger:
- custom - custom
concurrency: concurrency:
limit: 1 limit: 2
steps: steps:
- name: ci - name: ci
environment: environment:
STAGES: install,upgrade,backup,restore,custom STAGES: install,upgrade,backup,restore,custom
CCCI_JANITOR_MAX_AGE: "0"
# The exec runner points HOME at a per-build workspace; force it to /root so abra finds its # The exec runner points HOME at a per-build workspace; force it to /root so abra finds its
# server config + recipes under /root/.abra (as the manual M4/M5 runs did). Safe: capacity=1 # server config + recipes under /root/.abra (as the manual M4/M5 runs did). Safe with
# means no concurrent build shares /root/.abra. # capacity=2: app names are unique per (recipe,pr,ref) and same-recipe runs serialise on the
# per-recipe flock, so concurrent builds never touch the same recipe checkout or app.
HOME: /root HOME: /root
commands: commands:
# RECIPE/REF/PR/SRC (+ CCCI_QUICK for `!testme --quick`) are injected as env vars from the # RECIPE/REF/PR/SRC (+ CCCI_QUICK for `!testme --quick`) are injected as env vars from the

View File

@ -8,6 +8,7 @@ from __future__ import annotations
import contextlib import contextlib
import datetime import datetime
import fcntl
import json import json
import os import os
import re import re
@ -29,6 +30,73 @@ class TeardownError(RuntimeError):
pass pass
# --- Concurrent-run safety (capacity=2) -------------------------------------------------------
# Two cooperating mechanisms, both process-lifetime-scoped so SIGKILL can't leak a stale lock:
# 1. Per-recipe flock: ~/.abra/recipes/<recipe> is ONE shared working tree that fetch_recipe
# rm-rf's/reclones and the upgrade tier git-checkouts mid-run. Concurrent runs of the SAME
# recipe would corrupt each other's deploy tree (observed: immich builds 229/230 deployed a
# tree missing its config), so they serialise on an exclusive flock; different recipes run in
# parallel. The kernel drops a flock when the holder dies, however it dies.
# 2. Active-run registry: each run registers its app domain + pid before creating the app, so the
# janitor can tell a live concurrent run from a crashed run's orphan (see janitor()).
RECIPE_LOCK_DIR = "/run/lock"
ACTIVE_RUN_DIR = "/run/cc-ci-active"
def acquire_recipe_lock(recipe: str):
"""Take the per-recipe exclusive lock; blocks (with a log line) if another run of the same
recipe is in flight. Returns the open lock file — the CALLER must keep a reference for the
whole run; the lock is released only when the process exits and the fd closes."""
path = os.path.join(RECIPE_LOCK_DIR, f"cc-ci-recipe-{recipe}.lock")
f = open(path, "w") # noqa: SIM115 — deliberately held for the lifetime of the run
try:
fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
except BlockingIOError:
print(
f"== recipe lock: another {recipe} run is in flight — waiting for {path} "
"(shared ~/.abra/recipes checkout) ==",
flush=True,
)
fcntl.flock(f, fcntl.LOCK_EX)
print(f"== recipe lock: acquired {path} ==", flush=True)
return f
def _registry_path(domain: str) -> str:
return os.path.join(ACTIVE_RUN_DIR, domain)
def register_run_app(domain: str) -> None:
"""Record this process as the live owner of a run app (called BEFORE the app is created, so a
concurrent run's janitor can never observe the app without its registration)."""
with contextlib.suppress(OSError):
os.makedirs(ACTIVE_RUN_DIR, exist_ok=True)
with open(_registry_path(domain), "w") as f:
f.write(str(os.getpid()))
def unregister_run_app(domain: str) -> None:
with contextlib.suppress(OSError):
os.remove(_registry_path(domain))
def _run_owner_state(domain: str) -> str:
"""'alive' if the registered owner is a live run_recipe_ci process, 'dead' if registered but
gone (definite orphan), 'unknown' if never registered (pre-registry code or post-reboot)."""
try:
with open(_registry_path(domain)) as f:
pid = int(f.read().strip())
except (OSError, ValueError):
return "unknown"
try:
with open(f"/proc/{pid}/cmdline", "rb") as f:
cmdline = f.read().decode(errors="replace").replace("\0", " ")
except OSError:
return "dead"
# Guard against pid reuse: the owner must still look like a harness run.
return "alive" if "run_recipe_ci" in cmdline else "dead"
def _docker_names(kind: str, stack: str) -> list[str]: def _docker_names(kind: str, stack: str) -> list[str]:
"""docker <kind> ls names filtered to a stack (kind: service|volume|secret).""" """docker <kind> ls names filtered to a stack (kind: service|volume|secret)."""
proc = subprocess.run( proc = subprocess.run(
@ -161,7 +229,8 @@ def prepull_images(recipe: str, domain: str) -> None:
# --env-file supplies $VERSION-style interpolation so pinned tags resolve correctly. # --env-file supplies $VERSION-style interpolation so pinned tags resolve correctly.
cf = subprocess.run( cf = subprocess.run(
["bash", "-c", f'set -a; . "{env_path}"; printf "%s" "${{COMPOSE_FILE:-compose.yml}}"'], ["bash", "-c", f'set -a; . "{env_path}"; printf "%s" "${{COMPOSE_FILE:-compose.yml}}"'],
capture_output=True, text=True, capture_output=True,
text=True,
).stdout.strip() ).stdout.strip()
files = [f for f in cf.split(":") if f] or ["compose.yml"] files = [f for f in cf.split(":") if f] or ["compose.yml"]
args = ["docker", "compose", "--env-file", env_path] args = ["docker", "compose", "--env-file", env_path]
@ -209,6 +278,9 @@ def deploy_app(
past the 900s default. abra's INTERNAL TIMEOUT (recipe's TIMEOUT env, default 300s) is set via past the 900s default. abra's INTERNAL TIMEOUT (recipe's TIMEOUT env, default 300s) is set via
EXTRA_ENV; this is the Python subprocess wrapper's timeout so abra doesn't get SIGKILLed mid-deploy.""" EXTRA_ENV; this is the Python subprocess wrapper's timeout so abra doesn't get SIGKILLed mid-deploy."""
_record_deploy() _record_deploy()
# Register BEFORE the app exists: a concurrent run's janitor must never see this app without
# its live-owner registration (it would reap an in-flight deploy).
register_run_app(domain)
abra.app_config_remove(domain) # clear any stale .env from a prior crashed run abra.app_config_remove(domain) # clear any stale .env from a prior crashed run
abra.app_new(recipe, domain, version=version, secrets=secrets) abra.app_new(recipe, domain, version=version, secrets=secrets)
# A pinned version must actually deploy that version: check the recipe out to the tag so the # A pinned version must actually deploy that version: check the recipe out to the tag so the
@ -415,7 +487,9 @@ def recipe_checkout_ref(recipe: str, ref: str) -> None:
abra.recipe_checkout(recipe, ref) abra.recipe_checkout(recipe, ref)
def chaos_redeploy(domain: str, deploy_timeout: int = 900, no_converge_checks: bool = False) -> None: def chaos_redeploy(
domain: str, deploy_timeout: int = 900, no_converge_checks: bool = False
) -> None:
"""In-place `abra app deploy --chaos`: redeploy the running app at the CURRENT recipe checkout """In-place `abra app deploy --chaos`: redeploy the running app at the CURRENT recipe checkout
(HC1: the PR-head code under test). This is the upgrade op, not a fresh install — it does NOT go (HC1: the PR-head code under test). This is the upgrade op, not a fresh install — it does NOT go
through deploy_app, so the deploy-count guard (DG4.1) is not incremented. through deploy_app, so the deploy-count guard (DG4.1) is not incremented.
@ -603,13 +677,19 @@ def teardown_app(domain: str, verify: bool = True) -> None:
residual = _residual(domain) residual = _residual(domain)
if any(residual.values()): if any(residual.values()):
raise TeardownError(f"teardown left residual for {domain}: {residual}") raise TeardownError(f"teardown left residual for {domain}: {residual}")
# The app is gone — drop its active-run registration (janitor() also clears it when reaping).
unregister_run_app(domain)
def janitor(max_age_seconds: int | None = None) -> None: def janitor(max_age_seconds: int | None = None) -> None:
"""Reap orphaned run apps from crashed/rebooted runs. Matches the real naming scheme and only """Reap orphaned run apps from crashed/rebooted runs. Matches the real naming scheme. Safe under
reaps apps older than max_age_seconds (so concurrent in-flight runs are never killed). Reaps via CONCURRENT runs (capacity=2): every harness run registers its app in the active-run registry
docker primitives so it works even when the .env is gone (A2/A3). Default 2h, env-overridable (register_run_app), so the janitor distinguishes the three cases instead of using age alone:
via CCCI_JANITOR_MAX_AGE (e.g. 0 to reap all matching orphans immediately).""" - registered + owner run_recipe_ci process ALIVE -> in-flight concurrent run: never reap
- registered + owner DEAD (crashed/SIGKILLed run) -> definite orphan: reap immediately
- no registry entry (pre-registry code, reboot) -> fall back to the age threshold
Reaps via docker primitives so it works even when the .env is gone (A2/A3). Age fallback default
2h, env-overridable via CCCI_JANITOR_MAX_AGE."""
import os import os
if max_age_seconds is None: if max_age_seconds is None:
@ -627,9 +707,18 @@ def janitor(max_age_seconds: int | None = None) -> None:
seen.add(f"{m.group(1)}.ci.commoninternet.net") seen.add(f"{m.group(1)}.ci.commoninternet.net")
for name in seen: for name in seen:
stack = _stack_name(name) owner = _run_owner_state(name)
age = _stack_age_seconds(stack) if owner == "alive":
if age is not None and age < max_age_seconds: print(f" janitor: {name} is a live concurrent run — leaving it", flush=True)
continue # likely a concurrent in-flight run; leave it continue
if owner == "unknown":
# No registry entry (manual run on pre-registry code, or post-reboot): only the age
# threshold protects it, as before.
stack = _stack_name(name)
age = _stack_age_seconds(stack)
if age is not None and age < max_age_seconds:
continue # young and of unknown provenance; leave it
# owner == "dead" (a crashed/killed run's definite orphan) or old enough -> reap
with contextlib.suppress(Exception): with contextlib.suppress(Exception):
teardown_app(name, verify=False) teardown_app(name, verify=False)
unregister_run_app(name)

View File

@ -44,17 +44,25 @@ sys.path.insert(0, os.path.join(ROOT, "runner"))
from harness import ( # noqa: E402 from harness import ( # noqa: E402
abra, abra,
canonical, canonical,
card as card_mod,
deps as deps_mod,
discovery, discovery,
generic, generic,
lifecycle, lifecycle,
naming, naming,
results as results_mod,
screenshot as screenshot_mod,
warm, warm,
warmsnap, warmsnap,
) )
from harness import ( # noqa: E402
card as card_mod,
)
from harness import ( # noqa: E402
deps as deps_mod,
)
from harness import ( # noqa: E402
results as results_mod,
)
from harness import ( # noqa: E402
screenshot as screenshot_mod,
)
ALL_STAGES = ("install", "upgrade", "backup", "restore", "custom") ALL_STAGES = ("install", "upgrade", "backup", "restore", "custom")
@ -827,6 +835,12 @@ def main() -> int:
print( print(
f"== cc-ci run: recipe={recipe} ref={ref} pr={os.environ.get('PR', '0')} stages={sorted(stages)}" f"== cc-ci run: recipe={recipe} ref={ref} pr={os.environ.get('PR', '0')} stages={sorted(stages)}"
) )
# Concurrent-run safety: runs of the SAME recipe serialise on a per-recipe flock — they share
# ONE ~/.abra/recipes/<recipe> working tree which fetch_recipe (below) rm-rf's/reclones and the
# upgrade tier git-checkouts mid-run. Must be taken BEFORE fetch_recipe. Different recipes run
# in parallel (capacity=2). The reference must stay alive for the whole run: the kernel drops
# the flock when the fd closes (including on any crash/SIGKILL — no stale-lock failure mode).
_recipe_lock = lifecycle.acquire_recipe_lock(recipe) # noqa: F841
fetch_recipe(recipe, ref, src) fetch_recipe(recipe, ref, src)
# The PR-head commit the upgrade tier re-checks out for the chaos redeploy to the code under test # The PR-head commit the upgrade tier re-checks out for the chaos redeploy to the code under test
# (HC1). Prefer the explicit PR head sha ($REF) — robust + exact; fall back to the recipe checkout # (HC1). Prefer the explicit PR head sha ($REF) — robust + exact; fall back to the recipe checkout
@ -1285,8 +1299,10 @@ def main() -> int:
capped = data.get("level_cap_rung") capped = data.get("level_cap_rung")
sk = data.get("skips", {}) sk = data.get("skips", {})
cap_skip = ( cap_skip = (
"intentional" if capped in (sk.get("intentional") or {}) "intentional"
else "unintentional" if capped in (sk.get("unintentional") or []) if capped in (sk.get("intentional") or {})
else "unintentional"
if capped in (sk.get("unintentional") or [])
else "" else ""
) )
with open(os.path.join(run_artifact_dir, "badge.svg"), "w", encoding="utf-8") as f: with open(os.path.join(run_artifact_dir, "badge.svg"), "w", encoding="utf-8") as f: