fix(harness): make concurrent recipe runs safe (per-recipe flock + active-run registry)
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
capacity=2 went live with three stale capacity=1-era assumptions that corrupted concurrent runs (immich 229/230 '/pg_backup.sh: No such file'): - ~/.abra/recipes/<recipe> is ONE shared working tree that fetch_recipe rm-rf's/ reclones and the upgrade tier git-checkouts mid-run. Same-recipe runs now serialise on an exclusive flock (/run/lock/cc-ci-recipe-<recipe>.lock), taken in main() BEFORE fetch_recipe and held for the whole run; the kernel releases it on any process death, so there is no stale-lock failure mode. Different recipes still run in parallel. - CCCI_JANITOR_MAX_AGE=0 made a starting build reap ANY in-flight run app. Every run now registers its app domain + pid in /run/cc-ci-active/<domain> before app creation; the janitor checks the owner: alive (pid is a live run_recipe_ci process) -> never reaped; dead -> reaped immediately; unknown (pre-registry or post-reboot) -> age fallback (default 2h). The MAX_AGE=0 env override is gone from .drone.yml. - .drone.yml: concurrency.limit 1 -> 2 to match DRONE_RUNNER_CAPACITY=2; the 'safe because capacity=1' comments now describe the flock+registry model. lint: PASS, unit tests: 138 passed.
This commit is contained in:
18
.drone.yml
18
.drone.yml
@ -35,10 +35,12 @@ steps:
|
||||
# the comment-bridge). Deploys the recipe at the PR head, runs install/upgrade/backup + any
|
||||
# recipe-local tests via the shared harness, then guarantees teardown (plan §4.2/§4.3).
|
||||
#
|
||||
# Resource safety (plan §4.2/§4.3): MAX_TESTS=DRONE_RUNNER_CAPACITY=1 (nix/modules/drone-runner.nix) is
|
||||
# the primary concurrency cap; concurrency.limit below is a redundant belt. CCCI_JANITOR_MAX_AGE=0
|
||||
# makes the run-start janitor reap ANY orphaned run app before deploying — safe because capacity=1
|
||||
# means no concurrent run exists (a SIGKILL'd/timed-out build leaves an orphan with no teardown).
|
||||
# Resource safety (plan §4.2/§4.3): DRONE_RUNNER_CAPACITY=2 (nix/modules/drone-runner.nix) +
|
||||
# concurrency.limit=2 below allow two recipe runs in parallel. Concurrent-run safety is enforced by
|
||||
# the harness, not by serialisation: same-recipe runs serialise on a per-recipe flock
|
||||
# (lifecycle.acquire_recipe_lock — the shared ~/.abra/recipes/<recipe> checkout is the conflict),
|
||||
# and every run registers its app domain + pid in /run/cc-ci-active so the run-start janitor only
|
||||
# reaps orphans whose owning run is DEAD (alive → never touched; unknown → age fallback, default 2h).
|
||||
kind: pipeline
|
||||
type: exec
|
||||
name: recipe-ci
|
||||
@ -52,16 +54,16 @@ trigger:
|
||||
- custom
|
||||
|
||||
concurrency:
|
||||
limit: 1
|
||||
limit: 2
|
||||
|
||||
steps:
|
||||
- name: ci
|
||||
environment:
|
||||
STAGES: install,upgrade,backup,restore,custom
|
||||
CCCI_JANITOR_MAX_AGE: "0"
|
||||
# The exec runner points HOME at a per-build workspace; force it to /root so abra finds its
|
||||
# server config + recipes under /root/.abra (as the manual M4/M5 runs did). Safe: capacity=1
|
||||
# means no concurrent build shares /root/.abra.
|
||||
# server config + recipes under /root/.abra (as the manual M4/M5 runs did). Safe with
|
||||
# capacity=2: app names are unique per (recipe,pr,ref) and same-recipe runs serialise on the
|
||||
# per-recipe flock, so concurrent builds never touch the same recipe checkout or app.
|
||||
HOME: /root
|
||||
commands:
|
||||
# RECIPE/REF/PR/SRC (+ CCCI_QUICK for `!testme --quick`) are injected as env vars from the
|
||||
|
||||
@ -8,6 +8,7 @@ from __future__ import annotations
|
||||
|
||||
import contextlib
|
||||
import datetime
|
||||
import fcntl
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
@ -29,6 +30,73 @@ class TeardownError(RuntimeError):
|
||||
pass
|
||||
|
||||
|
||||
# --- Concurrent-run safety (capacity=2) -------------------------------------------------------
|
||||
# Two cooperating mechanisms, both process-lifetime-scoped so SIGKILL can't leak a stale lock:
|
||||
# 1. Per-recipe flock: ~/.abra/recipes/<recipe> is ONE shared working tree that fetch_recipe
|
||||
# rm-rf's/reclones and the upgrade tier git-checkouts mid-run. Concurrent runs of the SAME
|
||||
# recipe would corrupt each other's deploy tree (observed: immich builds 229/230 deployed a
|
||||
# tree missing its config), so they serialise on an exclusive flock; different recipes run in
|
||||
# parallel. The kernel drops a flock when the holder dies, however it dies.
|
||||
# 2. Active-run registry: each run registers its app domain + pid before creating the app, so the
|
||||
# janitor can tell a live concurrent run from a crashed run's orphan (see janitor()).
|
||||
RECIPE_LOCK_DIR = "/run/lock"
|
||||
ACTIVE_RUN_DIR = "/run/cc-ci-active"
|
||||
|
||||
|
||||
def acquire_recipe_lock(recipe: str):
|
||||
"""Take the per-recipe exclusive lock; blocks (with a log line) if another run of the same
|
||||
recipe is in flight. Returns the open lock file — the CALLER must keep a reference for the
|
||||
whole run; the lock is released only when the process exits and the fd closes."""
|
||||
path = os.path.join(RECIPE_LOCK_DIR, f"cc-ci-recipe-{recipe}.lock")
|
||||
f = open(path, "w") # noqa: SIM115 — deliberately held for the lifetime of the run
|
||||
try:
|
||||
fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||
except BlockingIOError:
|
||||
print(
|
||||
f"== recipe lock: another {recipe} run is in flight — waiting for {path} "
|
||||
"(shared ~/.abra/recipes checkout) ==",
|
||||
flush=True,
|
||||
)
|
||||
fcntl.flock(f, fcntl.LOCK_EX)
|
||||
print(f"== recipe lock: acquired {path} ==", flush=True)
|
||||
return f
|
||||
|
||||
|
||||
def _registry_path(domain: str) -> str:
|
||||
return os.path.join(ACTIVE_RUN_DIR, domain)
|
||||
|
||||
|
||||
def register_run_app(domain: str) -> None:
|
||||
"""Record this process as the live owner of a run app (called BEFORE the app is created, so a
|
||||
concurrent run's janitor can never observe the app without its registration)."""
|
||||
with contextlib.suppress(OSError):
|
||||
os.makedirs(ACTIVE_RUN_DIR, exist_ok=True)
|
||||
with open(_registry_path(domain), "w") as f:
|
||||
f.write(str(os.getpid()))
|
||||
|
||||
|
||||
def unregister_run_app(domain: str) -> None:
|
||||
with contextlib.suppress(OSError):
|
||||
os.remove(_registry_path(domain))
|
||||
|
||||
|
||||
def _run_owner_state(domain: str) -> str:
|
||||
"""'alive' if the registered owner is a live run_recipe_ci process, 'dead' if registered but
|
||||
gone (definite orphan), 'unknown' if never registered (pre-registry code or post-reboot)."""
|
||||
try:
|
||||
with open(_registry_path(domain)) as f:
|
||||
pid = int(f.read().strip())
|
||||
except (OSError, ValueError):
|
||||
return "unknown"
|
||||
try:
|
||||
with open(f"/proc/{pid}/cmdline", "rb") as f:
|
||||
cmdline = f.read().decode(errors="replace").replace("\0", " ")
|
||||
except OSError:
|
||||
return "dead"
|
||||
# Guard against pid reuse: the owner must still look like a harness run.
|
||||
return "alive" if "run_recipe_ci" in cmdline else "dead"
|
||||
|
||||
|
||||
def _docker_names(kind: str, stack: str) -> list[str]:
|
||||
"""docker <kind> ls names filtered to a stack (kind: service|volume|secret)."""
|
||||
proc = subprocess.run(
|
||||
@ -161,7 +229,8 @@ def prepull_images(recipe: str, domain: str) -> None:
|
||||
# --env-file supplies $VERSION-style interpolation so pinned tags resolve correctly.
|
||||
cf = subprocess.run(
|
||||
["bash", "-c", f'set -a; . "{env_path}"; printf "%s" "${{COMPOSE_FILE:-compose.yml}}"'],
|
||||
capture_output=True, text=True,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
).stdout.strip()
|
||||
files = [f for f in cf.split(":") if f] or ["compose.yml"]
|
||||
args = ["docker", "compose", "--env-file", env_path]
|
||||
@ -209,6 +278,9 @@ def deploy_app(
|
||||
past the 900s default. abra's INTERNAL TIMEOUT (recipe's TIMEOUT env, default 300s) is set via
|
||||
EXTRA_ENV; this is the Python subprocess wrapper's timeout so abra doesn't get SIGKILLed mid-deploy."""
|
||||
_record_deploy()
|
||||
# Register BEFORE the app exists: a concurrent run's janitor must never see this app without
|
||||
# its live-owner registration (it would reap an in-flight deploy).
|
||||
register_run_app(domain)
|
||||
abra.app_config_remove(domain) # clear any stale .env from a prior crashed run
|
||||
abra.app_new(recipe, domain, version=version, secrets=secrets)
|
||||
# A pinned version must actually deploy that version: check the recipe out to the tag so the
|
||||
@ -415,7 +487,9 @@ def recipe_checkout_ref(recipe: str, ref: str) -> None:
|
||||
abra.recipe_checkout(recipe, ref)
|
||||
|
||||
|
||||
def chaos_redeploy(domain: str, deploy_timeout: int = 900, no_converge_checks: bool = False) -> None:
|
||||
def chaos_redeploy(
|
||||
domain: str, deploy_timeout: int = 900, no_converge_checks: bool = False
|
||||
) -> None:
|
||||
"""In-place `abra app deploy --chaos`: redeploy the running app at the CURRENT recipe checkout
|
||||
(HC1: the PR-head code under test). This is the upgrade op, not a fresh install — it does NOT go
|
||||
through deploy_app, so the deploy-count guard (DG4.1) is not incremented.
|
||||
@ -603,13 +677,19 @@ def teardown_app(domain: str, verify: bool = True) -> None:
|
||||
residual = _residual(domain)
|
||||
if any(residual.values()):
|
||||
raise TeardownError(f"teardown left residual for {domain}: {residual}")
|
||||
# The app is gone — drop its active-run registration (janitor() also clears it when reaping).
|
||||
unregister_run_app(domain)
|
||||
|
||||
|
||||
def janitor(max_age_seconds: int | None = None) -> None:
|
||||
"""Reap orphaned run apps from crashed/rebooted runs. Matches the real naming scheme and only
|
||||
reaps apps older than max_age_seconds (so concurrent in-flight runs are never killed). Reaps via
|
||||
docker primitives so it works even when the .env is gone (A2/A3). Default 2h, env-overridable
|
||||
via CCCI_JANITOR_MAX_AGE (e.g. 0 to reap all matching orphans immediately)."""
|
||||
"""Reap orphaned run apps from crashed/rebooted runs. Matches the real naming scheme. Safe under
|
||||
CONCURRENT runs (capacity=2): every harness run registers its app in the active-run registry
|
||||
(register_run_app), so the janitor distinguishes the three cases instead of using age alone:
|
||||
- registered + owner run_recipe_ci process ALIVE -> in-flight concurrent run: never reap
|
||||
- registered + owner DEAD (crashed/SIGKILLed run) -> definite orphan: reap immediately
|
||||
- no registry entry (pre-registry code, reboot) -> fall back to the age threshold
|
||||
Reaps via docker primitives so it works even when the .env is gone (A2/A3). Age fallback default
|
||||
2h, env-overridable via CCCI_JANITOR_MAX_AGE."""
|
||||
import os
|
||||
|
||||
if max_age_seconds is None:
|
||||
@ -627,9 +707,18 @@ def janitor(max_age_seconds: int | None = None) -> None:
|
||||
seen.add(f"{m.group(1)}.ci.commoninternet.net")
|
||||
|
||||
for name in seen:
|
||||
stack = _stack_name(name)
|
||||
age = _stack_age_seconds(stack)
|
||||
if age is not None and age < max_age_seconds:
|
||||
continue # likely a concurrent in-flight run; leave it
|
||||
owner = _run_owner_state(name)
|
||||
if owner == "alive":
|
||||
print(f" janitor: {name} is a live concurrent run — leaving it", flush=True)
|
||||
continue
|
||||
if owner == "unknown":
|
||||
# No registry entry (manual run on pre-registry code, or post-reboot): only the age
|
||||
# threshold protects it, as before.
|
||||
stack = _stack_name(name)
|
||||
age = _stack_age_seconds(stack)
|
||||
if age is not None and age < max_age_seconds:
|
||||
continue # young and of unknown provenance; leave it
|
||||
# owner == "dead" (a crashed/killed run's definite orphan) or old enough -> reap
|
||||
with contextlib.suppress(Exception):
|
||||
teardown_app(name, verify=False)
|
||||
unregister_run_app(name)
|
||||
|
||||
@ -44,17 +44,25 @@ sys.path.insert(0, os.path.join(ROOT, "runner"))
|
||||
from harness import ( # noqa: E402
|
||||
abra,
|
||||
canonical,
|
||||
card as card_mod,
|
||||
deps as deps_mod,
|
||||
discovery,
|
||||
generic,
|
||||
lifecycle,
|
||||
naming,
|
||||
results as results_mod,
|
||||
screenshot as screenshot_mod,
|
||||
warm,
|
||||
warmsnap,
|
||||
)
|
||||
from harness import ( # noqa: E402
|
||||
card as card_mod,
|
||||
)
|
||||
from harness import ( # noqa: E402
|
||||
deps as deps_mod,
|
||||
)
|
||||
from harness import ( # noqa: E402
|
||||
results as results_mod,
|
||||
)
|
||||
from harness import ( # noqa: E402
|
||||
screenshot as screenshot_mod,
|
||||
)
|
||||
|
||||
ALL_STAGES = ("install", "upgrade", "backup", "restore", "custom")
|
||||
|
||||
@ -827,6 +835,12 @@ def main() -> int:
|
||||
print(
|
||||
f"== cc-ci run: recipe={recipe} ref={ref} pr={os.environ.get('PR', '0')} stages={sorted(stages)}"
|
||||
)
|
||||
# Concurrent-run safety: runs of the SAME recipe serialise on a per-recipe flock — they share
|
||||
# ONE ~/.abra/recipes/<recipe> working tree which fetch_recipe (below) rm-rf's/reclones and the
|
||||
# upgrade tier git-checkouts mid-run. Must be taken BEFORE fetch_recipe. Different recipes run
|
||||
# in parallel (capacity=2). The reference must stay alive for the whole run: the kernel drops
|
||||
# the flock when the fd closes (including on any crash/SIGKILL — no stale-lock failure mode).
|
||||
_recipe_lock = lifecycle.acquire_recipe_lock(recipe) # noqa: F841
|
||||
fetch_recipe(recipe, ref, src)
|
||||
# The PR-head commit the upgrade tier re-checks out for the chaos redeploy to the code under test
|
||||
# (HC1). Prefer the explicit PR head sha ($REF) — robust + exact; fall back to the recipe checkout
|
||||
@ -1285,8 +1299,10 @@ def main() -> int:
|
||||
capped = data.get("level_cap_rung")
|
||||
sk = data.get("skips", {})
|
||||
cap_skip = (
|
||||
"intentional" if capped in (sk.get("intentional") or {})
|
||||
else "unintentional" if capped in (sk.get("unintentional") or [])
|
||||
"intentional"
|
||||
if capped in (sk.get("intentional") or {})
|
||||
else "unintentional"
|
||||
if capped in (sk.get("unintentional") or [])
|
||||
else ""
|
||||
)
|
||||
with open(os.path.join(run_artifact_dir, "badge.svg"), "w", encoding="utf-8") as f:
|
||||
|
||||
Reference in New Issue
Block a user