test(concurrency): real-kernel suite for the restructured model — 20 tests, 19 plan cases
All checks were successful
continuous-integration/drone/push Build is passing

tests/concurrency/ — NOT in the default `pytest tests/unit` gate; run explicitly with
`pytest tests/concurrency -q`. flock/prctl/alarm are never mocked: helper subprocesses
(helpers.py) hold real locks and install the real lifetime guards; locks live in a per-test
tmp dir via CCCI_APP_LOCK_DIR; every helper (and recorded grandchild) is reaped by fixture
cleanup.

- test_locks.py (cases 1-4): SIGKILL auto-release; LOCK_NB held/unheld semantics; PEP 446
  fd-not-inherited (holder's child survives, lock still releases); same-domain second acquire
  blocks until first holder exits.
- test_janitor.py (cases 5-12): orphan reaped once + lockfile unlinked; live holder never
  reaped + logged; new-run acquire blocks until a slow reap completes (reap-under-probe-lock);
  two overlapping janitors -> exactly one reaps (flock arbitration); reboot sim (no lockfile)
  reaps immediately with no age wait; >120min-held lock flagged 'possible leaked run' and NOT
  stolen; warm/canonical names never probed (no lockfile even created); directory-as-lockfile
  and missing lock dir degrade to skip+log, never crash.
- test_lifetime.py (cases 13-16): PDEATHSIG (wrapper parent SIGKILL'd -> guarded child TERM'd,
  teardown marker, lock released); already-orphaned helper REFUSES to run (ppid race); 2s
  deadline alarm -> teardown + exit 142 + lock released; SIGTERM -> teardown + exit 143 +
  lock released.
- test_abra_dir.py (cases 17-19 + 18b): per-run dir built + $ABRA_DIR exported before the
  first abra call (recording stub abra on PATH); two CONCURRENT same-recipe fetch+checkout
  flows into different ABRA_DIRs -> divergent correct trees, canonical staged clone untouched;
  .env written through the servers/ symlink lands in the canonical path (env_get/env_set
  agree); manual runs get pid-suffixed dirs.

On cc-ci: pytest tests/concurrency -q -> 20 passed; tests/unit -> 138 passed; lint PASS.
This commit is contained in:
autonomic-bot
2026-06-10 04:29:36 +00:00
parent 91d3cc7e99
commit 84d90fb655
7 changed files with 793 additions and 0 deletions

View File

@ -0,0 +1,108 @@
"""Shared utilities for the real-kernel concurrency suite (imported by the test modules; the
fixtures in conftest.py wrap these). No flock mocking anywhere — probes use real LOCK_NB."""
from __future__ import annotations
import contextlib
import fcntl
import os
import signal
import subprocess
import sys
import time
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "runner"))
from harness import lifecycle # noqa: E402
HELPERS = os.path.join(os.path.dirname(__file__), "helpers.py")
DOMAIN = "test-abc123.ci.commoninternet.net" # matches RUN_APP_RE
class HelperPool:
"""Spawns helpers.py subprocesses and GUARANTEES their cleanup (incl. recorded grandchild
pids from `hold-with-child`/`wrapper` markers) — no leaked children in the test VM."""
def __init__(self, out_dir: str):
self.out_dir = out_dir
self.procs: list[subprocess.Popen] = []
self.extra_pids: list[int] = []
self._n = 0
def spawn(self, *args: str, env_extra: dict | None = None) -> tuple[subprocess.Popen, str]:
"""Start `helpers.py <args...>`; returns (proc, marker_file)."""
self._n += 1
out = os.path.join(self.out_dir, f"helper-{self._n}.out")
env = dict(os.environ, CCCI_HELPER_OUT=out, **(env_extra or {}))
p = subprocess.Popen( # noqa: S603
[sys.executable, HELPERS, *args],
env=env,
stdout=subprocess.DEVNULL,
stderr=subprocess.STDOUT,
)
self.procs.append(p)
return p, out
def track_pid(self, pid: int) -> None:
self.extra_pids.append(pid)
def cleanup(self) -> None:
for p in self.procs:
if p.poll() is None:
p.kill()
with contextlib.suppress(subprocess.TimeoutExpired):
p.wait(timeout=10)
for pid in self.extra_pids:
with contextlib.suppress(OSError):
os.kill(pid, signal.SIGKILL)
def wait_marker(out: str, token: str, timeout: float = 15.0) -> str | None:
"""Poll a helper's marker file for a line containing `token`; returns the line or None."""
deadline = time.time() + timeout
while time.time() < deadline:
try:
with open(out) as f:
for line in f:
if token in line:
return line.strip()
except OSError:
pass
time.sleep(0.1)
return None
def lock_state(domain: str) -> str:
"""'held' | 'free' | 'absent' for the domain's lockfile, probed with a REAL LOCK_NB."""
path = lifecycle._app_lock_path(domain) # noqa: SLF001
if not os.path.exists(path):
return "absent"
with open(path, "a") as f:
try:
fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
return "free"
except BlockingIOError:
return "held"
def wait_lock_state(domain: str, want: str, timeout: float = 10.0) -> str:
"""Poll until lock_state(domain) == want (kernel release on process death is fast, but give
the scheduler room). Returns the final observed state."""
deadline = time.time() + timeout
state = lock_state(domain)
while state != want and time.time() < deadline:
time.sleep(0.1)
state = lock_state(domain)
return state
def pid_alive(pid: int) -> bool:
return os.path.exists(f"/proc/{pid}")
def wait_pid_gone(pid: int, timeout: float = 15.0) -> bool:
deadline = time.time() + timeout
while time.time() < deadline:
if not pid_alive(pid):
return True
time.sleep(0.1)
return False

View File

@ -0,0 +1,34 @@
"""Fixtures for the real-kernel concurrency suite (concurrency-restructure plan, 19 cases).
NOT part of the default `pytest tests/unit` gate — run explicitly with `pytest tests/concurrency
-q` (docs/concurrency.md). Locks live in a per-test tmp dir (CCCI_APP_LOCK_DIR); helper
subprocesses hold REAL flocks / install the REAL prctl+signal guards and are always reaped in
fixture finalizers (no leaked children in the test VM).
"""
from __future__ import annotations
import os
import sys
import pytest
sys.path.insert(0, os.path.dirname(__file__))
from concutil import HelperPool # noqa: E402
@pytest.fixture
def lock_dir(tmp_path, monkeypatch):
"""Sandbox lock dir, exported so BOTH this process's lifecycle calls and helper subprocesses
(which inherit os.environ) resolve their lockfiles here — never /run/lock."""
d = tmp_path / "locks"
d.mkdir()
monkeypatch.setenv("CCCI_APP_LOCK_DIR", str(d))
return str(d)
@pytest.fixture
def pool(tmp_path):
hp = HelperPool(str(tmp_path))
yield hp
hp.cleanup()

View File

@ -0,0 +1,120 @@
#!/usr/bin/env python3
"""Subprocess helpers for tests/concurrency — REAL kernel locks and the REAL lifetime guards in
separate processes (flock/prctl are never mocked; tests assert on actual kernel behavior).
Invoked as: python3 helpers.py <command> <args...>
Env contract (set by the spawning test):
CCCI_APP_LOCK_DIR sandbox lock dir (never /run/lock in tests)
CCCI_HELPER_OUT marker file this helper APPENDS progress lines to (ACQUIRED/READY/...)
Commands:
hold <domain> acquire the app lock, mark `ACQUIRED <ts>`, sleep forever
hold-with-child <domain> acquire the lock, spawn a plain sleeping subprocess child, mark
`ACQUIRED <ts>` + `CHILD <pid>` (PEP 446: the child must NOT
inherit the lock fd), sleep forever
guarded <domain> <deadline> install the REAL lifetime guards (alarm=<deadline>s), acquire the
lock, mark `READY`; when the teardown funnel runs (`finally:`),
mark `TEARDOWN` before exiting
wrapper <domain> spawn `guarded <domain> 3600` as MY child, mark `WRAPPED <pid>`,
sleep — the test kills me to prove PDEATHSIG TERMs the child
orphan-probe wait (bounded) until reparented (ppid==1), then install the
guards; mark `REFUSED` if they exit (expected) or `GUARDS_OK`
fetch-checkout <recipe> <ref> run run_recipe_ci.fetch_recipe (the test sets CCCI_SKIP_FETCH=1
+ a per-"run" ABRA_DIR), git-checkout <ref>, mark
`RESULT <head> <data.txt content>`
"""
from __future__ import annotations
import os
import subprocess
import sys
import time
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "..", "runner"))
from harness import abra, lifecycle, lifetime # noqa: E402
OUT = os.environ.get("CCCI_HELPER_OUT")
def mark(line: str) -> None:
if OUT:
with open(OUT, "a") as f:
f.write(line + "\n")
f.flush()
print(line, flush=True)
def cmd_hold(domain: str) -> None:
lifecycle.acquire_app_lock(domain)
mark(f"ACQUIRED {time.time()}")
time.sleep(3600)
def cmd_hold_with_child(domain: str) -> None:
lifecycle.acquire_app_lock(domain)
child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(3600)"])
mark(f"ACQUIRED {time.time()}")
mark(f"CHILD {child.pid}")
time.sleep(3600)
def cmd_guarded(domain: str, deadline: str) -> None:
lifetime.install_lifetime_guards(deadline_seconds=int(deadline))
lifecycle.acquire_app_lock(domain)
mark("READY")
try:
time.sleep(3600)
finally:
mark("TEARDOWN")
def cmd_wrapper(domain: str) -> None:
p = subprocess.Popen( # noqa: S603
[sys.executable, os.path.abspath(__file__), "guarded", domain, "3600"],
env=os.environ.copy(),
)
mark(f"WRAPPED {p.pid}")
time.sleep(3600)
def cmd_orphan_probe() -> None:
# Our spawner exits immediately after fork; wait (bounded) until we are reparented so the
# prctl is installed with the parent ALREADY dead — the exact race the ppid check closes.
for _ in range(200):
if os.getppid() == 1:
break
time.sleep(0.05)
else:
mark("NEVER_REPARENTED") # e.g. a subreaper environment — test will fail visibly
return
try:
lifetime.install_lifetime_guards()
except SystemExit:
mark("REFUSED")
raise
mark("GUARDS_OK")
def cmd_fetch_checkout(recipe: str, ref: str) -> None:
import run_recipe_ci
run_recipe_ci.fetch_recipe(recipe, None, None)
abra.recipe_checkout(recipe, ref)
head = abra.recipe_head_commit(recipe)
with open(os.path.join(abra.recipe_dir(recipe), "data.txt")) as f:
content = f.read().strip()
mark(f"RESULT {head} {content}")
if __name__ == "__main__":
cmd, *args = sys.argv[1:]
{
"hold": cmd_hold,
"hold-with-child": cmd_hold_with_child,
"guarded": cmd_guarded,
"wrapper": cmd_wrapper,
"orphan-probe": cmd_orphan_probe,
"fetch-checkout": cmd_fetch_checkout,
}[cmd](*args)

View File

@ -0,0 +1,175 @@
"""Per-run ABRA_DIR isolation (concurrency-restructure plan, cases 17-19). Real directories,
real symlinks, real git — abra itself is replaced by a recording stub where a CLI call is
involved (case 17), because these cases test OUR dir/env plumbing, not abra."""
from __future__ import annotations
import os
import stat
import subprocess
import sys
sys.path.insert(0, os.path.dirname(__file__))
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "runner"))
import run_recipe_ci # noqa: E402
from concutil import wait_marker # noqa: E402
from harness import abra # noqa: E402
RECIPE = "fakerecipe"
def _git(cwd, *args):
subprocess.run(
["git", "-c", "user.email=t@t", "-c", "user.name=t", *args],
cwd=cwd,
check=True,
capture_output=True,
)
def _make_fake_home(tmp_path):
"""A fake $HOME with a canonical ~/.abra: servers/default + catalogue dirs, and a recipe git
repo with two tags whose data.txt differs (v1 -> 'one', v2 -> 'two', HEAD at v2)."""
home = tmp_path / "home"
(home / ".abra" / "servers" / "default").mkdir(parents=True)
(home / ".abra" / "catalogue").mkdir(parents=True)
repo = home / ".abra" / "recipes" / RECIPE
repo.mkdir(parents=True)
_git(repo, "init", "-q")
(repo / "data.txt").write_text("one\n")
_git(repo, "add", "data.txt")
_git(repo, "commit", "-qm", "v1")
_git(repo, "tag", "v1")
(repo / "data.txt").write_text("two\n")
_git(repo, "add", "data.txt")
_git(repo, "commit", "-qm", "v2")
_git(repo, "tag", "v2")
return home
def test_17_per_run_dir_built_and_exported_before_abra(tmp_path, monkeypatch):
"""Case 17: setup_run_abra_dir builds the per-run dir correctly (servers/catalogue symlinks
resolve to the canonical tree, recipes/ empty + writable) and $ABRA_DIR is exported before
the first abra call — proven by a stub `abra` on PATH that records the env it saw."""
home = _make_fake_home(tmp_path)
monkeypatch.setenv("HOME", str(home))
monkeypatch.setenv("CCCI_RUNS_DIR", str(tmp_path / "runs"))
monkeypatch.setenv("DRONE_BUILD_NUMBER", "777")
monkeypatch.setenv("ABRA_DIR", "sentinel-to-be-overwritten") # so monkeypatch restores it
d = run_recipe_ci.setup_run_abra_dir()
assert d == str(tmp_path / "runs" / "777" / "abra")
assert os.environ["ABRA_DIR"] == d
assert os.readlink(os.path.join(d, "servers")) == str(home / ".abra" / "servers")
assert os.readlink(os.path.join(d, "catalogue")) == str(home / ".abra" / "catalogue")
# symlinks RESOLVE (targets exist) and recipes/ is empty + writable
assert os.path.isdir(os.path.join(d, "servers", "default"))
assert os.path.isdir(os.path.join(d, "catalogue"))
assert os.listdir(os.path.join(d, "recipes")) == []
probe = os.path.join(d, "recipes", ".write-probe")
open(probe, "w").close()
os.remove(probe)
# idempotent re-entry (Drone build-number retry): must not raise on existing symlinks
assert run_recipe_ci.setup_run_abra_dir() == d
# stub abra records $ABRA_DIR at call time; fetch_recipe's catalogue branch invokes it
stub_dir = tmp_path / "bin"
stub_dir.mkdir()
log = tmp_path / "abra-env.log"
stub = stub_dir / "abra"
stub.write_text(f'#!/bin/sh\necho "$ABRA_DIR" >> {log}\nexit 0\n')
stub.chmod(stub.stat().st_mode | stat.S_IEXEC)
monkeypatch.setenv("PATH", f"{stub_dir}{os.pathsep}{os.environ['PATH']}")
monkeypatch.delenv("CCCI_SKIP_FETCH", raising=False)
run_recipe_ci.fetch_recipe(RECIPE, None, None)
assert log.read_text().strip() == d, "abra was called without the per-run ABRA_DIR exported"
def test_18_concurrent_same_recipe_fetch_no_cross_talk(tmp_path, monkeypatch, pool):
"""Case 18: two CONCURRENT fetch+checkout flows of the SAME recipe into different ABRA_DIRs
produce two correct, divergent trees (v1 vs v2) — the old shared-tree corruption scenario,
now structurally safe with no lock. The canonical staged clone is untouched."""
home = _make_fake_home(tmp_path)
canonical_repo = home / ".abra" / "recipes" / RECIPE
head_before = subprocess.run(
["git", "-C", canonical_repo, "rev-parse", "HEAD"], capture_output=True, text=True
).stdout.strip()
runs = {}
for name, ref in (("runA", "v1"), ("runB", "v2")):
abra_dir = tmp_path / name / "abra"
abra_dir.mkdir(parents=True)
_, out = pool.spawn(
"fetch-checkout",
RECIPE,
ref,
env_extra={
"HOME": str(home),
"ABRA_DIR": str(abra_dir),
"CCCI_SKIP_FETCH": "1",
},
)
runs[name] = (out, ref, abra_dir)
expect = {"v1": "one", "v2": "two"}
for name, (out, ref, abra_dir) in runs.items():
line = wait_marker(out, "RESULT", timeout=30)
assert line, f"{name} never produced a RESULT"
_, head, content = line.split()
assert content == expect[ref], f"{name}@{ref}: tree content {content!r}"
tree = abra_dir / "recipes" / RECIPE
assert (tree / "data.txt").read_text().strip() == expect[ref]
assert (
head
== subprocess.run(
["git", "-C", tree, "rev-parse", "HEAD"], capture_output=True, text=True
).stdout.strip()
)
# the two trees genuinely diverge AND the canonical staged clone is untouched
a = (runs["runA"][2] / "recipes" / RECIPE / "data.txt").read_text()
b = (runs["runB"][2] / "recipes" / RECIPE / "data.txt").read_text()
assert a != b
head_after = subprocess.run(
["git", "-C", canonical_repo, "rev-parse", "HEAD"], capture_output=True, text=True
).stdout.strip()
assert head_after == head_before, "canonical clone must not be touched by per-run fetches"
def test_19_env_written_through_servers_symlink_lands_canonical(tmp_path, monkeypatch):
"""Case 19: an app .env written through the per-run servers/ symlink (what abra does under
$ABRA_DIR) lands in the CANONICAL shared path — so janitor discovery and every
expanduser('~/.abra/servers/...') reader keep working unchanged."""
home = _make_fake_home(tmp_path)
monkeypatch.setenv("HOME", str(home))
monkeypatch.setenv("CCCI_RUNS_DIR", str(tmp_path / "runs"))
monkeypatch.setenv("DRONE_BUILD_NUMBER", "778")
monkeypatch.setenv("ABRA_DIR", "sentinel-to-be-overwritten")
d = run_recipe_ci.setup_run_abra_dir()
domain = "test-abc123.ci.commoninternet.net"
via_symlink = os.path.join(d, "servers", "default", f"{domain}.env")
with open(via_symlink, "w") as f:
f.write("TYPE=fakerecipe:1.0.0\nDOMAIN=placeholder\n")
canonical = home / ".abra" / "servers" / "default" / f"{domain}.env"
assert canonical.is_file(), ".env written via the symlink must land in the canonical path"
# the canonical-path readers/writers (abra.env_get/env_set use ~/.abra) see the same file
assert abra.env_get(domain, "TYPE") == "fakerecipe:1.0.0"
abra.env_set(domain, "DOMAIN", domain)
with open(via_symlink) as f:
assert f"DOMAIN={domain}" in f.read()
def test_18b_run_id_manual_fallback_is_per_process(tmp_path, monkeypatch):
"""Companion to case 18: two concurrent MANUAL runs (no DRONE_BUILD_NUMBER) must not share an
abra dir either — the manual fallback is pid-suffixed."""
home = _make_fake_home(tmp_path)
monkeypatch.setenv("HOME", str(home))
monkeypatch.setenv("CCCI_RUNS_DIR", str(tmp_path / "runs"))
monkeypatch.delenv("DRONE_BUILD_NUMBER", raising=False)
monkeypatch.delenv("CCCI_APP_DOMAIN", raising=False)
monkeypatch.delenv("CCCI_RUN_ID", raising=False)
monkeypatch.setenv("ABRA_DIR", "sentinel-to-be-overwritten")
d = run_recipe_ci.setup_run_abra_dir()
assert f"manual-{os.getpid()}" in d

View File

@ -0,0 +1,189 @@
"""Janitor / flock-probe semantics (concurrency-restructure plan, cases 5-12).
The janitor runs IN-PROCESS with its discovery monkeypatched (candidates injected via a stubbed
abra.app_ls + empty docker sweep) and teardown_app stubbed to record calls — but the LOCKS are
real kernel flocks, held by real helper subprocesses where a live owner is needed."""
from __future__ import annotations
import os
import sys
import threading
import time
sys.path.insert(0, os.path.dirname(__file__))
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "runner"))
from concutil import DOMAIN, lock_state, wait_marker # noqa: E402
from harness import lifecycle # noqa: E402
def _inject_candidates(monkeypatch, domains):
"""Point janitor discovery at exactly `domains`: abra lists them, docker sweep is empty.
teardown_app is stubbed to a recorder; returns the calls list."""
calls = []
monkeypatch.setattr(lifecycle.abra, "app_ls", lambda: [{"appName": d} for d in domains])
monkeypatch.setattr(lifecycle, "_docker_names", lambda kind, stack: [])
monkeypatch.setattr(lifecycle, "teardown_app", lambda d, verify=True: calls.append(d))
return calls
def test_5_orphan_reaped_lockfile_unlinked(lock_dir, pool, monkeypatch):
"""Case 5: an orphan (lockfile exists, no holder — its run was SIGKILL'd) is reaped exactly
once and its lockfile unlinked."""
p, out = pool.spawn("hold", DOMAIN)
assert wait_marker(out, "ACQUIRED")
p.kill()
p.wait(timeout=10)
calls = _inject_candidates(monkeypatch, [DOMAIN])
lifecycle.janitor()
assert calls == [DOMAIN], f"teardown calls: {calls} (expected exactly one)"
assert lock_state(DOMAIN) == "absent", "reaped orphan's lockfile must be unlinked"
def test_6_live_run_never_reaped(lock_dir, pool, monkeypatch, capsys):
"""Case 6: a held lock (live helper) is never reaped and is logged as live."""
p, out = pool.spawn("hold", DOMAIN)
assert wait_marker(out, "ACQUIRED")
calls = _inject_candidates(monkeypatch, [DOMAIN])
lifecycle.janitor()
assert calls == []
assert "live concurrent run" in capsys.readouterr().out
assert lock_state(DOMAIN) == "held"
def test_7_new_run_blocks_until_reap_finishes(lock_dir, pool, monkeypatch):
"""Case 7: the janitor reaps WHILE HOLDING the probe lock, so a new run of the same domain
blocks in acquire_app_lock until the reap completes — no window where a fresh app coexists
with a half-reaped one."""
# Make an orphan.
p, out = pool.spawn("hold", DOMAIN)
assert wait_marker(out, "ACQUIRED")
p.kill()
p.wait(timeout=10)
state = {"teardown_end": None, "acquirer_out": None}
def slow_teardown(domain, verify=True):
# While the janitor holds the probe lock mid-reap, a new run starts acquiring.
_, aout = pool.spawn("hold", DOMAIN)
state["acquirer_out"] = aout
time.sleep(2.0)
state["teardown_end"] = time.time()
monkeypatch.setattr(lifecycle.abra, "app_ls", lambda: [{"appName": DOMAIN}])
monkeypatch.setattr(lifecycle, "_docker_names", lambda kind, stack: [])
monkeypatch.setattr(lifecycle, "teardown_app", slow_teardown)
lifecycle.janitor()
line = wait_marker(state["acquirer_out"], "ACQUIRED", timeout=15)
assert line, "new run never acquired after the reap"
acquired_ts = float(line.split()[1])
assert (
acquired_ts >= state["teardown_end"]
), f"new run acquired at {acquired_ts} BEFORE the reap finished at {state['teardown_end']}"
# The new run must hold a lock the next probe can SEE (fresh inode at the path).
assert lock_state(DOMAIN) == "held"
def test_8_two_janitors_exactly_one_reaps(lock_dir, pool, monkeypatch):
"""Case 8: two concurrent janitors arbitrate on the probe flock — exactly one reaps (the
other sees 'held' and leaves). Teardown is slowed so the runs genuinely overlap."""
p, out = pool.spawn("hold", DOMAIN)
assert wait_marker(out, "ACQUIRED")
p.kill()
p.wait(timeout=10)
calls = []
calls_lock = threading.Lock()
def slow_teardown(domain, verify=True):
with calls_lock:
calls.append(domain)
time.sleep(2.0)
monkeypatch.setattr(lifecycle.abra, "app_ls", lambda: [{"appName": DOMAIN}])
monkeypatch.setattr(lifecycle, "_docker_names", lambda kind, stack: [])
monkeypatch.setattr(lifecycle, "teardown_app", slow_teardown)
barrier = threading.Barrier(2)
def run_janitor():
barrier.wait()
lifecycle.janitor()
t1, t2 = threading.Thread(target=run_janitor), threading.Thread(target=run_janitor)
t1.start(), t2.start()
t1.join(timeout=30), t2.join(timeout=30)
assert calls == [DOMAIN], f"expected exactly one reap, got {calls}"
assert lock_state(DOMAIN) == "absent"
def test_9_reboot_lockfile_absent_reaped_immediately(lock_dir, monkeypatch):
"""Case 9: post-reboot simulation — the app exists but its lockfile is gone (/run/lock is
tmpfs). The probe trivially acquires -> immediate reap, NO age threshold (improvement over
the old 2h fallback)."""
assert lock_state(DOMAIN) == "absent"
calls = _inject_candidates(monkeypatch, [DOMAIN])
t0 = time.time()
lifecycle.janitor()
assert calls == [DOMAIN]
assert time.time() - t0 < 5, "reap must be immediate (no age wait)"
def test_10_long_held_lock_flagged_never_stolen(lock_dir, pool, monkeypatch, capsys):
"""Case 10: a lock held with mtime older than 120min is flagged as a possible leaked run —
and NOT reaped (never steal a held lock)."""
p, out = pool.spawn("hold", DOMAIN)
assert wait_marker(out, "ACQUIRED")
path = lifecycle._app_lock_path(DOMAIN) # noqa: SLF001
backdate = time.time() - (130 * 60)
os.utime(path, (backdate, backdate))
calls = _inject_candidates(monkeypatch, [DOMAIN])
lifecycle.janitor()
assert calls == []
out_text = capsys.readouterr().out
assert "possible leaked run" in out_text and "lslocks" in out_text
assert lock_state(DOMAIN) == "held"
def test_11_warm_canonical_names_never_probed(lock_dir, monkeypatch):
"""Case 11: RUN_APP_RE allowlist — warm/canonical-shaped names never become candidates, so
they are never probed (no lockfile is even created for them) and never reaped."""
warmish = [
"warm-keycloak.ci.commoninternet.net",
"keycloak.ci.commoninternet.net",
"warm-hedgedoc.ci.commoninternet.net",
"drone.ci.commoninternet.net",
]
calls = []
monkeypatch.setattr(lifecycle.abra, "app_ls", lambda: [{"appName": d} for d in warmish])
monkeypatch.setattr(
lifecycle,
"_docker_names",
lambda kind, stack: ["warm-keycloak_ci_commoninternet_net_app"]
if kind == "service"
else [],
)
monkeypatch.setattr(lifecycle, "teardown_app", lambda d, verify=True: calls.append(d))
lifecycle.janitor()
assert calls == []
lockdir = os.environ["CCCI_APP_LOCK_DIR"]
assert [
f for f in os.listdir(lockdir) if f.startswith("cc-ci-app-")
] == [], "janitor must not create lockfiles for non-run-app names"
def test_12_degrades_safely_on_bad_lockfile_and_missing_dir(lock_dir, monkeypatch, capsys):
"""Case 12: a garbled/unopenable lockfile (here: a DIRECTORY at the lockfile path) is skipped
with a log line; a missing lock dir doesn't crash the janitor either. Never a crash."""
path = lifecycle._app_lock_path(DOMAIN) # noqa: SLF001
os.makedirs(path) # open(path, "a") -> IsADirectoryError (an OSError)
calls = _inject_candidates(monkeypatch, [DOMAIN])
lifecycle.janitor() # must not raise
assert calls == []
assert "skipping" in capsys.readouterr().out
os.rmdir(path)
monkeypatch.setenv("CCCI_APP_LOCK_DIR", os.path.join(os.environ["CCCI_APP_LOCK_DIR"], "gone"))
lifecycle.janitor() # missing dir: probe open fails -> skip; tidy glob -> empty. No crash.
assert calls == []

View File

@ -0,0 +1,82 @@
"""Lifetime hardening (concurrency-restructure plan, cases 13-16): the REAL prctl/signal/alarm
guards installed by helper subprocesses; tests assert teardown ran, exit was non-zero, and the
lock was released."""
from __future__ import annotations
import os
import signal
import sys
sys.path.insert(0, os.path.dirname(__file__))
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "runner"))
from concutil import ( # noqa: E402
DOMAIN,
wait_lock_state,
wait_marker,
wait_pid_gone,
)
def test_13_pdeathsig_parent_kill_terms_harness(lock_dir, pool):
"""Case 13: wrapper-parent spawns a guarded harness-child; the parent is SIGKILL'd (the
harness gets no courtesy signal) -> the kernel's PDEATHSIG TERMs the child, its teardown
funnel runs, it exits, and the lock is released."""
p, out = pool.spawn("wrapper", DOMAIN)
line = wait_marker(out, "WRAPPED")
assert line, "wrapper never spawned its child"
child_pid = int(line.split()[1])
pool.track_pid(child_pid)
assert wait_marker(out, "READY"), "guarded child never got ready"
p.kill() # parent dies WITHOUT signalling the child — only PDEATHSIG can save us
p.wait(timeout=10)
assert wait_pid_gone(child_pid), "guarded child must exit on parent death (PDEATHSIG)"
assert wait_marker(out, "TEARDOWN", timeout=5), "teardown funnel did not run"
assert wait_lock_state(DOMAIN, "free") == "free"
def test_14_already_orphaned_helper_refuses_to_run(lock_dir, pool):
"""Case 14 (ppid race): a helper whose parent died BEFORE the prctl was armed (it starts
already reparented to pid 1) must refuse to run — PDEATHSIG would never fire for it."""
# Spawn an intermediate parent that forks orphan-probe and exits immediately.
import subprocess
out = os.path.join(pool.out_dir, "orphan.out")
intermediate = (
"import subprocess, sys, os; "
"subprocess.Popen([sys.executable, os.environ['CCCI_HELPERS'], 'orphan-probe']); "
)
env = dict(
os.environ,
CCCI_HELPER_OUT=out,
CCCI_HELPERS=os.path.join(os.path.dirname(__file__), "helpers.py"),
)
subprocess.run([sys.executable, "-c", intermediate], env=env, timeout=15, check=True)
line = wait_marker(out, "REFUSED", timeout=20)
assert line, "orphaned helper did not refuse to run (or never reparented to pid 1)"
def test_15_deadline_alarm_fires_teardown_and_releases(lock_dir, pool):
"""Case 15: the self-deadline (alarm). A guarded helper with a 2s deadline tears down via
the funnel (finally: ran), exits NON-zero, and its lock is released."""
p, out = pool.spawn("guarded", DOMAIN, "2")
assert wait_marker(out, "READY")
rc = p.wait(timeout=20)
assert rc != 0, f"deadline exit must be non-zero (got {rc})"
assert rc == 128 + signal.SIGALRM, f"expected 142 (128+SIGALRM), got {rc}"
assert wait_marker(out, "TEARDOWN", timeout=5), "teardown funnel did not run on deadline"
assert wait_lock_state(DOMAIN, "free") == "free"
def test_16_sigterm_runs_teardown_funnel_and_releases(lock_dir, pool):
"""Case 16: SIGTERM (drone cancel path) -> the finally: teardown funnel runs, exit is
non-zero, lock released."""
p, out = pool.spawn("guarded", DOMAIN, "3600")
assert wait_marker(out, "READY")
p.send_signal(signal.SIGTERM)
rc = p.wait(timeout=20)
assert rc != 0, f"SIGTERM exit must be non-zero (got {rc})"
assert rc == 128 + signal.SIGTERM, f"expected 143 (128+SIGTERM), got {rc}"
assert wait_marker(out, "TEARDOWN", timeout=5), "teardown funnel did not run on SIGTERM"
assert wait_lock_state(DOMAIN, "free") == "free"

View File

@ -0,0 +1,85 @@
"""Lock fundamentals (concurrency-restructure plan, cases 1-4). Real kernel flocks held by real
subprocesses — nothing mocked."""
from __future__ import annotations
import fcntl
import os
import sys
import time
sys.path.insert(0, os.path.dirname(__file__))
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "runner"))
from concutil import ( # noqa: E402
DOMAIN,
lock_state,
wait_lock_state,
wait_marker,
)
from harness import lifecycle # noqa: E402
def test_1_sigkill_releases_lock(lock_dir, pool):
"""Case 1: acquire -> holder SIGKILL'd -> lock immediately acquirable (kernel auto-release).
The exact property the old pidfile registry approximated with /proc checks."""
p, out = pool.spawn("hold", DOMAIN)
assert wait_marker(out, "ACQUIRED"), "holder never acquired"
assert lock_state(DOMAIN) == "held"
p.kill()
p.wait(timeout=10)
assert wait_lock_state(DOMAIN, "free") == "free"
def test_2_nb_probe_held_vs_unheld(lock_dir, pool):
"""Case 2: LOCK_NB probe raises BlockingIOError against a held lock; succeeds when unheld."""
p, out = pool.spawn("hold", DOMAIN)
assert wait_marker(out, "ACQUIRED")
path = lifecycle._app_lock_path(DOMAIN) # noqa: SLF001
with open(path, "a") as f:
try:
fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
raise AssertionError("LOCK_NB succeeded against a held lock")
except BlockingIOError:
pass
p.kill()
p.wait(timeout=10)
assert wait_lock_state(DOMAIN, "free") == "free"
with open(path, "a") as f:
fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB) # must not raise now
def test_3_lock_fd_not_inherited_by_children(lock_dir, pool):
"""Case 3 (PEP 446): the holder spawns a subprocess child, the holder dies, the child lives —
and the lock is STILL released (the child never inherited the lock fd). This is what makes
'held lock == live HARNESS owner' sound even though runs spawn abra/docker/pytest children."""
p, out = pool.spawn("hold-with-child", DOMAIN)
assert wait_marker(out, "ACQUIRED")
child_line = wait_marker(out, "CHILD")
assert child_line, "holder never reported its child pid"
child_pid = int(child_line.split()[1])
pool.track_pid(child_pid)
p.kill()
p.wait(timeout=10)
assert os.path.exists(f"/proc/{child_pid}"), "child should outlive the holder"
assert (
wait_lock_state(DOMAIN, "free") == "free"
), "lock must release on holder death even with a live child (PEP 446 non-inheritable fd)"
def test_4_second_acquire_blocks_until_first_exits(lock_dir, pool):
"""Case 4: a second same-domain acquire blocks until the first holder exits — the
double-!testme serialisation property."""
p1, out1 = pool.spawn("hold", DOMAIN)
assert wait_marker(out1, "ACQUIRED")
p2, out2 = pool.spawn("hold", DOMAIN)
# p2 must NOT acquire while p1 holds.
time.sleep(1.5)
assert wait_marker(out2, "ACQUIRED", timeout=0.1) is None, "second acquire did not block"
t_kill = time.time()
p1.kill()
p1.wait(timeout=10)
line = wait_marker(out2, "ACQUIRED", timeout=15)
assert line, "second acquire never completed after first holder exited"
acquired_ts = float(line.split()[1])
assert acquired_ts >= t_kill - 0.05, "second holder acquired before the first exited"
assert lock_state(DOMAIN) == "held"