# --- tests/concurrency/concutil.py ---
# Shared utilities for the real-kernel concurrency suite (imported by the test modules; the
# fixtures in conftest.py wrap these). No flock mocking anywhere: probes use real LOCK_NB.

HELPERS = os.path.join(os.path.dirname(__file__), "helpers.py")
DOMAIN = "test-abc123.ci.commoninternet.net"  # matches RUN_APP_RE


class HelperPool:
    """Spawn helpers.py subprocesses and guarantee their cleanup.

    Besides the direct children started by `spawn`, tests may register extra pids with
    `track_pid` (grandchildren reported through `CHILD`/`WRAPPED` marker lines) so that
    `cleanup` leaves no leaked process behind in the test VM.
    """

    def __init__(self, out_dir: str):
        """Remember `out_dir`, the directory that receives one marker file per helper."""
        self.out_dir = out_dir
        self.procs: list[subprocess.Popen] = []
        self.extra_pids: list[int] = []
        self._n = 0  # running counter used to give each helper a unique marker file

    def spawn(self, *args: str, env_extra: dict | None = None) -> tuple[subprocess.Popen, str]:
        """Start `helpers.py <args...>` and return `(proc, marker_file)`.

        The helper inherits this process's environment plus `CCCI_HELPER_OUT` (its marker
        file) and any `env_extra` overrides. Its stdout/stderr are discarded; progress is
        observed through the marker file only.
        """
        self._n += 1
        out = os.path.join(self.out_dir, f"helper-{self._n}.out")
        env = dict(os.environ, CCCI_HELPER_OUT=out, **(env_extra or {}))
        p = subprocess.Popen(  # noqa: S603
            [sys.executable, HELPERS, *args],
            env=env,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        )
        self.procs.append(p)
        return p, out

    def track_pid(self, pid: int) -> None:
        """Register a pid that is not our direct child to be SIGKILLed by `cleanup`."""
        self.extra_pids.append(pid)

    def cleanup(self) -> None:
        """Kill and reap every spawned helper, then SIGKILL every tracked pid.

        Both lists are detached before any signal is sent, so calling `cleanup` again is a
        no-op instead of re-signalling pids the kernel may already have recycled.
        """
        procs, self.procs = self.procs, []
        pids, self.extra_pids = self.extra_pids, []
        try:
            for p in procs:
                if p.poll() is None:
                    # Best effort: one stubborn helper must not prevent cleaning the rest.
                    with contextlib.suppress(OSError):
                        p.kill()
                with contextlib.suppress(subprocess.TimeoutExpired):
                    p.wait(timeout=10)
        finally:
            # Runs even if reaping a direct child raised, so tracked pids are never skipped.
            for pid in pids:
                with contextlib.suppress(OSError):
                    os.kill(pid, signal.SIGKILL)


def wait_marker(out: str, token: str, timeout: float = 15.0) -> str | None:
    """Poll a helper's marker file for a line containing `token`.

    Returns the stripped line, or None when `timeout` seconds pass without a match. The file
    is always read at least once (so `timeout=0` is a pure "is it there now?" probe) and one
    final read happens at the deadline. A missing/unreadable file counts as "not yet".
    """
    deadline = time.monotonic() + timeout  # monotonic: immune to wall-clock adjustments
    while True:
        try:
            with open(out) as f:
                for line in f:
                    if token in line:
                        return line.strip()
        except OSError:
            pass  # the helper has not created its marker file yet
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return None
        time.sleep(min(0.1, remaining))


def lock_state(domain: str) -> str:
    """Return 'held' | 'free' | 'absent' for the domain's lockfile, probed with a REAL LOCK_NB.

    The lockfile is opened WITHOUT O_CREAT, so the probe can never (re)create a lockfile that
    was unlinked concurrently; a missing file is reported as 'absent'. Other OSErrors (for
    example a directory sitting at the lockfile path) propagate to the caller.
    """
    path = lifecycle._app_lock_path(domain)  # noqa: SLF001
    try:
        fd = os.open(path, os.O_WRONLY | os.O_APPEND)
    except (FileNotFoundError, NotADirectoryError):
        return "absent"
    try:
        fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except BlockingIOError:
        return "held"
    else:
        return "free"
    finally:
        os.close(fd)  # closing the descriptor also drops the probe lock if we took it


def wait_lock_state(domain: str, want: str, timeout: float = 10.0) -> str:
    """Poll until lock_state(domain) == want (kernel release on process death is fast, but give
    the scheduler room). Returns the final observed state, whether or not it matches."""
    deadline = time.monotonic() + timeout
    state = lock_state(domain)
    while state != want and time.monotonic() < deadline:
        time.sleep(0.1)
        state = lock_state(domain)
    return state


def pid_alive(pid: int) -> bool:
    """Return True while `pid` names a running process, judged from /proc.

    A process that has exited but has not been reaped yet (state Z or X in /proc/<pid>/stat)
    is reported as NOT alive: it can no longer run code or hold a lock.
    """
    try:
        with open(f"/proc/{pid}/stat", "rb") as f:
            raw = f.read()
    except (FileNotFoundError, ProcessLookupError):
        return False
    except OSError:
        # stat unreadable for another reason: fall back to the plain existence check.
        return os.path.exists(f"/proc/{pid}")
    # Layout is "pid (comm) state ..."; comm may itself contain ')' so split on the LAST one.
    fields = raw.rpartition(b")")[2].split()
    return not (fields and fields[0] in (b"Z", b"X"))


def wait_pid_gone(pid: int, timeout: float = 15.0) -> bool:
    """Poll until `pid` is no longer alive; True on success, False on timeout.

    The pid is always checked at least once, even with `timeout=0`.
    """
    deadline = time.monotonic() + timeout
    while True:
        if not pid_alive(pid):
            return True
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False
        time.sleep(min(0.1, remaining))
# --- tests/concurrency/conftest.py ---
# Fixtures for the real-kernel concurrency suite. Not part of the default `pytest tests/unit`
# gate: run explicitly with `pytest tests/concurrency -q`.


@pytest.fixture
def lock_dir(tmp_path, monkeypatch):
    """Create a sandbox lock directory and export it as CCCI_APP_LOCK_DIR.

    Because the variable is set in os.environ, both the lifecycle calls made in this process
    and the helper subprocesses (which inherit the environment) resolve their lockfiles under
    the per-test tmp dir instead of /run/lock. Returns the directory path as a string.
    """
    sandbox = tmp_path / "locks"
    sandbox.mkdir()
    location = str(sandbox)
    monkeypatch.setenv("CCCI_APP_LOCK_DIR", location)
    return location


@pytest.fixture
def pool(tmp_path):
    """Yield a HelperPool writing marker files into tmp_path; reap its helpers afterwards."""
    helper_pool = HelperPool(str(tmp_path))
    yield helper_pool
    helper_pool.cleanup()
# --- tests/concurrency/helpers.py ---
# Subprocess helpers for tests/concurrency: REAL kernel locks and the REAL lifetime guards in
# separate processes. Invoked as `python3 helpers.py <command> [args...]`.
#
# Env contract (set by the spawning test):
#   CCCI_APP_LOCK_DIR  sandbox lock dir (never /run/lock in tests)
#   CCCI_HELPER_OUT    marker file this helper APPENDS progress lines to

OUT = os.environ.get("CCCI_HELPER_OUT")


def mark(line: str) -> None:
    """Append `line` to the marker file named by CCCI_HELPER_OUT (if set) and echo it."""
    if OUT:
        with open(OUT, "a") as f:
            f.write(line + "\n")
            f.flush()
    print(line, flush=True)


def cmd_hold(domain: str) -> None:
    """Acquire the app lock, mark `ACQUIRED <wall-clock ts>`, then sleep for an hour."""
    lifecycle.acquire_app_lock(domain)
    mark(f"ACQUIRED {time.time()}")
    time.sleep(3600)


def cmd_hold_with_child(domain: str) -> None:
    """Acquire the lock, spawn a plain sleeping child, mark `ACQUIRED <ts>` + `CHILD <pid>`.

    Used to show that the child does not keep the lock alive after this holder dies.
    """
    lifecycle.acquire_app_lock(domain)
    child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(3600)"])
    mark(f"ACQUIRED {time.time()}")
    mark(f"CHILD {child.pid}")
    time.sleep(3600)


def cmd_guarded(domain: str, deadline: str) -> None:
    """Install the lifetime guards with `deadline` seconds, acquire the lock, mark `READY`.

    `TEARDOWN` is marked from the `finally:` clause, i.e. whenever the sleep is interrupted
    by an exception unwinding through it (or simply ends).
    """
    lifetime.install_lifetime_guards(deadline_seconds=int(deadline))
    lifecycle.acquire_app_lock(domain)
    mark("READY")
    try:
        time.sleep(3600)
    finally:
        mark("TEARDOWN")


def cmd_wrapper(domain: str) -> None:
    """Spawn `guarded <domain> 3600` as OUR child, mark `WRAPPED <child pid>`, then sleep.

    The test kills this wrapper to observe what happens to the guarded child.
    """
    p = subprocess.Popen(  # noqa: S603
        [sys.executable, os.path.abspath(__file__), "guarded", domain, "3600"],
        env=os.environ.copy(),
    )
    mark(f"WRAPPED {p.pid}")
    time.sleep(3600)


def cmd_orphan_probe() -> None:
    """Wait until reparented to pid 1, then try to install the lifetime guards.

    Marks `REFUSED` (and re-raises) if the guards exit via SystemExit, `GUARDS_OK` if they
    install, or `NEVER_REPARENTED` if ppid never became 1 within the bounded wait.
    """
    # Our spawner exits immediately after fork; wait (bounded, 200 * 0.05s) until we are
    # reparented so the guards are installed with the parent ALREADY dead.
    for _ in range(200):
        if os.getppid() == 1:
            break
        time.sleep(0.05)
    else:
        mark("NEVER_REPARENTED")  # e.g. a subreaper environment; test will fail visibly
        return
    try:
        lifetime.install_lifetime_guards()
    except SystemExit:
        mark("REFUSED")
        raise
    mark("GUARDS_OK")


def cmd_fetch_checkout(recipe: str, ref: str) -> None:
    """Fetch `recipe`, check out `ref`, and mark `RESULT <head commit> <data.txt content>`."""
    import run_recipe_ci

    run_recipe_ci.fetch_recipe(recipe, None, None)
    abra.recipe_checkout(recipe, ref)
    head = abra.recipe_head_commit(recipe)
    with open(os.path.join(abra.recipe_dir(recipe), "data.txt")) as f:
        content = f.read().strip()
    mark(f"RESULT {head} {content}")


# Command name -> (handler, argument synopsis shown in the usage text).
_COMMANDS = {
    "hold": (cmd_hold, "<domain>"),
    "hold-with-child": (cmd_hold_with_child, "<domain>"),
    "guarded": (cmd_guarded, "<domain> <deadline-seconds>"),
    "wrapper": (cmd_wrapper, "<domain>"),
    "orphan-probe": (cmd_orphan_probe, ""),
    "fetch-checkout": (cmd_fetch_checkout, "<recipe> <ref>"),
}


def main(argv: list[str]) -> int:
    """Dispatch `argv` (`[command, *args]`) to its handler.

    Returns 0 after the handler returns. A missing/unknown command or a wrong number of
    arguments prints a usage text to stderr and returns 2 without running anything.
    Exceptions raised by the handler itself (including SystemExit) propagate unchanged.
    """
    import inspect

    def usage() -> int:
        print("usage: helpers.py <command> [args...]", file=sys.stderr)
        for name, (_, synopsis) in _COMMANDS.items():
            print(f"  {name} {synopsis}".rstrip(), file=sys.stderr)
        return 2

    if not argv or argv[0] not in _COMMANDS:
        return usage()
    handler, _ = _COMMANDS[argv[0]]
    try:
        # Validate arity up front so a typo is a usage error, not a mid-run TypeError.
        inspect.signature(handler).bind(*argv[1:])
    except TypeError:
        return usage()
    handler(*argv[1:])
    return 0


if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))
# --- tests/concurrency/test_abra_dir.py ---
# Per-run ABRA_DIR isolation (concurrency-restructure plan, cases 17-19). Real directories,
# real symlinks, real git; abra itself is replaced by a recording stub where a CLI call is
# involved (case 17), because these cases test OUR dir/env plumbing, not abra.

RECIPE = "fakerecipe"


def _git(cwd, *args):
    """Run `git <args>` in `cwd` with a throwaway identity; raise on a non-zero exit."""
    subprocess.run(
        ["git", "-c", "user.email=t@t", "-c", "user.name=t", *args],
        cwd=cwd,
        check=True,
        capture_output=True,
    )


def _git_head(repo):
    """Return the HEAD commit of `repo`.

    Uses check=True so a failing git raises instead of returning an empty string that would
    make HEAD comparisons pass vacuously.
    """
    return subprocess.run(
        ["git", "-C", repo, "rev-parse", "HEAD"],
        check=True,
        capture_output=True,
        text=True,
    ).stdout.strip()


def _make_fake_home(tmp_path):
    """A fake $HOME with a canonical ~/.abra: servers/default + catalogue dirs, and a recipe git
    repo with two tags whose data.txt differs (v1 -> 'one', v2 -> 'two', HEAD at v2)."""
    home = tmp_path / "home"
    (home / ".abra" / "servers" / "default").mkdir(parents=True)
    (home / ".abra" / "catalogue").mkdir(parents=True)
    repo = home / ".abra" / "recipes" / RECIPE
    repo.mkdir(parents=True)
    _git(repo, "init", "-q")
    (repo / "data.txt").write_text("one\n")
    _git(repo, "add", "data.txt")
    _git(repo, "commit", "-qm", "v1")
    _git(repo, "tag", "v1")
    (repo / "data.txt").write_text("two\n")
    _git(repo, "add", "data.txt")
    _git(repo, "commit", "-qm", "v2")
    _git(repo, "tag", "v2")
    return home


def test_17_per_run_dir_built_and_exported_before_abra(tmp_path, monkeypatch):
    """Case 17: setup_run_abra_dir builds the per-run dir correctly (servers/catalogue symlinks
    resolve to the canonical tree, recipes/ empty + writable) and $ABRA_DIR is exported before
    the first abra call, proven by a stub `abra` on PATH that records the env it saw."""
    home = _make_fake_home(tmp_path)
    monkeypatch.setenv("HOME", str(home))
    monkeypatch.setenv("CCCI_RUNS_DIR", str(tmp_path / "runs"))
    monkeypatch.setenv("DRONE_BUILD_NUMBER", "777")
    monkeypatch.setenv("ABRA_DIR", "sentinel-to-be-overwritten")  # so monkeypatch restores it

    d = run_recipe_ci.setup_run_abra_dir()
    assert d == str(tmp_path / "runs" / "777" / "abra")
    assert os.environ["ABRA_DIR"] == d
    assert os.readlink(os.path.join(d, "servers")) == str(home / ".abra" / "servers")
    assert os.readlink(os.path.join(d, "catalogue")) == str(home / ".abra" / "catalogue")
    # symlinks RESOLVE (targets exist) and recipes/ is empty + writable
    assert os.path.isdir(os.path.join(d, "servers", "default"))
    assert os.path.isdir(os.path.join(d, "catalogue"))
    assert os.listdir(os.path.join(d, "recipes")) == []
    probe = os.path.join(d, "recipes", ".write-probe")
    open(probe, "w").close()
    os.remove(probe)
    # idempotent re-entry (Drone build-number retry): must not raise on existing symlinks
    assert run_recipe_ci.setup_run_abra_dir() == d

    # stub abra records $ABRA_DIR at call time; fetch_recipe's catalogue branch invokes it
    stub_dir = tmp_path / "bin"
    stub_dir.mkdir()
    log = tmp_path / "abra-env.log"
    stub = stub_dir / "abra"
    stub.write_text(f'#!/bin/sh\necho "$ABRA_DIR" >> {log}\nexit 0\n')
    stub.chmod(stub.stat().st_mode | stat.S_IEXEC)
    monkeypatch.setenv("PATH", f"{stub_dir}{os.pathsep}{os.environ['PATH']}")
    monkeypatch.delenv("CCCI_SKIP_FETCH", raising=False)
    run_recipe_ci.fetch_recipe(RECIPE, None, None)
    assert log.read_text().strip() == d, "abra was called without the per-run ABRA_DIR exported"


def test_18_concurrent_same_recipe_fetch_no_cross_talk(tmp_path, monkeypatch, pool):
    """Case 18: two CONCURRENT fetch+checkout flows of the SAME recipe into different ABRA_DIRs
    produce two correct, divergent trees (v1 vs v2): the old shared-tree corruption scenario,
    now structurally safe with no lock. The canonical staged clone is untouched."""
    home = _make_fake_home(tmp_path)
    canonical_repo = home / ".abra" / "recipes" / RECIPE
    head_before = _git_head(canonical_repo)

    # Start both helpers before waiting on either, so the two flows genuinely overlap.
    runs = {}
    for name, ref in (("runA", "v1"), ("runB", "v2")):
        abra_dir = tmp_path / name / "abra"
        abra_dir.mkdir(parents=True)
        _, out = pool.spawn(
            "fetch-checkout",
            RECIPE,
            ref,
            env_extra={
                "HOME": str(home),
                "ABRA_DIR": str(abra_dir),
                "CCCI_SKIP_FETCH": "1",
            },
        )
        runs[name] = (out, ref, abra_dir)

    expect = {"v1": "one", "v2": "two"}
    for name, (out, ref, abra_dir) in runs.items():
        line = wait_marker(out, "RESULT", timeout=30)
        assert line, f"{name} never produced a RESULT"
        _, head, content = line.split()
        assert content == expect[ref], f"{name}@{ref}: tree content {content!r}"
        tree = abra_dir / "recipes" / RECIPE
        assert (tree / "data.txt").read_text().strip() == expect[ref]
        assert head == _git_head(tree)

    # the two trees genuinely diverge AND the canonical staged clone is untouched
    a = (runs["runA"][2] / "recipes" / RECIPE / "data.txt").read_text()
    b = (runs["runB"][2] / "recipes" / RECIPE / "data.txt").read_text()
    assert a != b
    head_after = _git_head(canonical_repo)
    assert head_after == head_before, "canonical clone must not be touched by per-run fetches"


def test_19_env_written_through_servers_symlink_lands_canonical(tmp_path, monkeypatch):
    """Case 19: an app .env written through the per-run servers/ symlink (what abra does under
    $ABRA_DIR) lands in the CANONICAL shared path, so janitor discovery and every
    expanduser('~/.abra/servers/...') reader keep working unchanged."""
    home = _make_fake_home(tmp_path)
    monkeypatch.setenv("HOME", str(home))
    monkeypatch.setenv("CCCI_RUNS_DIR", str(tmp_path / "runs"))
    monkeypatch.setenv("DRONE_BUILD_NUMBER", "778")
    monkeypatch.setenv("ABRA_DIR", "sentinel-to-be-overwritten")
    d = run_recipe_ci.setup_run_abra_dir()

    domain = "test-abc123.ci.commoninternet.net"
    via_symlink = os.path.join(d, "servers", "default", f"{domain}.env")
    with open(via_symlink, "w") as f:
        f.write("TYPE=fakerecipe:1.0.0\nDOMAIN=placeholder\n")

    canonical = home / ".abra" / "servers" / "default" / f"{domain}.env"
    assert canonical.is_file(), ".env written via the symlink must land in the canonical path"
    # the canonical-path readers/writers (abra.env_get/env_set use ~/.abra) see the same file
    assert abra.env_get(domain, "TYPE") == "fakerecipe:1.0.0"
    abra.env_set(domain, "DOMAIN", domain)
    with open(via_symlink) as f:
        assert f"DOMAIN={domain}" in f.read()


def test_18b_run_id_manual_fallback_is_per_process(tmp_path, monkeypatch):
    """Companion to case 18: two concurrent MANUAL runs (no DRONE_BUILD_NUMBER) must not share an
    abra dir either; the manual fallback is pid-suffixed."""
    home = _make_fake_home(tmp_path)
    monkeypatch.setenv("HOME", str(home))
    monkeypatch.setenv("CCCI_RUNS_DIR", str(tmp_path / "runs"))
    monkeypatch.delenv("DRONE_BUILD_NUMBER", raising=False)
    monkeypatch.delenv("CCCI_APP_DOMAIN", raising=False)
    monkeypatch.delenv("CCCI_RUN_ID", raising=False)
    monkeypatch.setenv("ABRA_DIR", "sentinel-to-be-overwritten")
    d = run_recipe_ci.setup_run_abra_dir()
    assert f"manual-{os.getpid()}" in d
# --- tests/concurrency/test_janitor.py ---
# Janitor / flock-probe semantics (concurrency-restructure plan, cases 5-12).
#
# The janitor runs IN-PROCESS with its discovery monkeypatched (candidates injected via a
# stubbed abra.app_ls + empty docker sweep) and teardown_app stubbed to record calls, but the
# LOCKS are real kernel flocks, held by real helper subprocesses where a live owner is needed.


def _inject_candidates(monkeypatch, domains):
    """Point janitor discovery at exactly `domains`: abra lists them, docker sweep is empty.
    teardown_app is stubbed to a recorder; returns the calls list."""
    calls = []
    monkeypatch.setattr(lifecycle.abra, "app_ls", lambda: [{"appName": d} for d in domains])
    monkeypatch.setattr(lifecycle, "_docker_names", lambda kind, stack: [])
    monkeypatch.setattr(lifecycle, "teardown_app", lambda d, verify=True: calls.append(d))
    return calls


def _make_orphan(pool):
    """Leave an orphaned lockfile for DOMAIN: a helper acquires the lock, then is SIGKILLed
    and reaped, so the lockfile exists but nobody holds it."""
    p, out = pool.spawn("hold", DOMAIN)
    assert wait_marker(out, "ACQUIRED")
    p.kill()
    p.wait(timeout=10)


def test_5_orphan_reaped_lockfile_unlinked(lock_dir, pool, monkeypatch):
    """Case 5: an orphan (lockfile exists, no holder: its run was SIGKILL'd) is reaped exactly
    once and its lockfile unlinked."""
    _make_orphan(pool)
    calls = _inject_candidates(monkeypatch, [DOMAIN])
    lifecycle.janitor()
    assert calls == [DOMAIN], f"teardown calls: {calls} (expected exactly one)"
    assert lock_state(DOMAIN) == "absent", "reaped orphan's lockfile must be unlinked"


def test_6_live_run_never_reaped(lock_dir, pool, monkeypatch, capsys):
    """Case 6: a held lock (live helper) is never reaped and is logged as live."""
    p, out = pool.spawn("hold", DOMAIN)
    assert wait_marker(out, "ACQUIRED")
    calls = _inject_candidates(monkeypatch, [DOMAIN])
    lifecycle.janitor()
    assert calls == []
    assert "live concurrent run" in capsys.readouterr().out
    assert lock_state(DOMAIN) == "held"


def test_7_new_run_blocks_until_reap_finishes(lock_dir, pool, monkeypatch):
    """Case 7: the janitor reaps WHILE HOLDING the probe lock, so a new run of the same domain
    blocks in acquire_app_lock until the reap completes: no window where a fresh app coexists
    with a half-reaped one."""
    _make_orphan(pool)

    state = {"teardown_end": None, "acquirer_out": None}

    def slow_teardown(domain, verify=True):
        # While the janitor holds the probe lock mid-reap, a new run starts acquiring.
        _, aout = pool.spawn("hold", DOMAIN)
        state["acquirer_out"] = aout
        time.sleep(2.0)
        state["teardown_end"] = time.time()

    monkeypatch.setattr(lifecycle.abra, "app_ls", lambda: [{"appName": DOMAIN}])
    monkeypatch.setattr(lifecycle, "_docker_names", lambda kind, stack: [])
    monkeypatch.setattr(lifecycle, "teardown_app", slow_teardown)
    lifecycle.janitor()

    # Without a reap there is no acquirer to wait for; fail here with a readable message
    # instead of passing None on to wait_marker.
    assert state["teardown_end"] is not None, "janitor never reaped the orphan"
    line = wait_marker(state["acquirer_out"], "ACQUIRED", timeout=15)
    assert line, "new run never acquired after the reap"
    acquired_ts = float(line.split()[1])
    assert (
        acquired_ts >= state["teardown_end"]
    ), f"new run acquired at {acquired_ts} BEFORE the reap finished at {state['teardown_end']}"
    # The new run must hold a lock the next probe can SEE (fresh inode at the path).
    assert lock_state(DOMAIN) == "held"


def test_8_two_janitors_exactly_one_reaps(lock_dir, pool, monkeypatch):
    """Case 8: two concurrent janitors arbitrate on the probe flock: exactly one reaps (the
    other sees 'held' and leaves). Teardown is slowed so the runs genuinely overlap."""
    _make_orphan(pool)

    calls = []
    calls_lock = threading.Lock()

    def slow_teardown(domain, verify=True):
        with calls_lock:
            calls.append(domain)
        time.sleep(2.0)

    monkeypatch.setattr(lifecycle.abra, "app_ls", lambda: [{"appName": DOMAIN}])
    monkeypatch.setattr(lifecycle, "_docker_names", lambda kind, stack: [])
    monkeypatch.setattr(lifecycle, "teardown_app", slow_teardown)

    barrier = threading.Barrier(2)
    errors = []

    def run_janitor():
        # An exception in a worker thread would otherwise be printed and forgotten, letting
        # the "exactly one reap" assertion pass even though the losing janitor crashed.
        try:
            barrier.wait(timeout=30)
            lifecycle.janitor()
        except BaseException as exc:  # noqa: BLE001 - recorded and asserted on below
            errors.append(exc)

    threads = [threading.Thread(target=run_janitor) for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join(timeout=30)
    assert not any(t.is_alive() for t in threads), "a janitor thread did not finish in 30s"
    assert errors == [], f"janitor raised in a worker thread: {errors!r}"
    assert calls == [DOMAIN], f"expected exactly one reap, got {calls}"
    assert lock_state(DOMAIN) == "absent"


def test_9_reboot_lockfile_absent_reaped_immediately(lock_dir, monkeypatch):
    """Case 9: post-reboot simulation: the app exists but its lockfile is gone (/run/lock is
    tmpfs). The probe trivially acquires -> immediate reap, NO age threshold (improvement over
    the old 2h fallback)."""
    assert lock_state(DOMAIN) == "absent"
    calls = _inject_candidates(monkeypatch, [DOMAIN])
    t0 = time.time()
    lifecycle.janitor()
    assert calls == [DOMAIN]
    assert time.time() - t0 < 5, "reap must be immediate (no age wait)"


def test_10_long_held_lock_flagged_never_stolen(lock_dir, pool, monkeypatch, capsys):
    """Case 10: a lock held with mtime older than 120min is flagged as a possible leaked run
    and NOT reaped (never steal a held lock)."""
    p, out = pool.spawn("hold", DOMAIN)
    assert wait_marker(out, "ACQUIRED")
    path = lifecycle._app_lock_path(DOMAIN)  # noqa: SLF001
    backdate = time.time() - (130 * 60)
    os.utime(path, (backdate, backdate))
    calls = _inject_candidates(monkeypatch, [DOMAIN])
    lifecycle.janitor()
    assert calls == []
    out_text = capsys.readouterr().out
    assert "possible leaked run" in out_text and "lslocks" in out_text
    assert lock_state(DOMAIN) == "held"


def test_11_warm_canonical_names_never_probed(lock_dir, monkeypatch):
    """Case 11: RUN_APP_RE allowlist: warm/canonical-shaped names never become candidates, so
    they are never probed (no lockfile is even created for them) and never reaped."""
    warmish = [
        "warm-keycloak.ci.commoninternet.net",
        "keycloak.ci.commoninternet.net",
        "warm-hedgedoc.ci.commoninternet.net",
        "drone.ci.commoninternet.net",
    ]
    calls = []
    monkeypatch.setattr(lifecycle.abra, "app_ls", lambda: [{"appName": d} for d in warmish])
    monkeypatch.setattr(
        lifecycle,
        "_docker_names",
        lambda kind, stack: ["warm-keycloak_ci_commoninternet_net_app"]
        if kind == "service"
        else [],
    )
    monkeypatch.setattr(lifecycle, "teardown_app", lambda d, verify=True: calls.append(d))
    lifecycle.janitor()
    assert calls == []
    lockdir = os.environ["CCCI_APP_LOCK_DIR"]
    assert [
        f for f in os.listdir(lockdir) if f.startswith("cc-ci-app-")
    ] == [], "janitor must not create lockfiles for non-run-app names"


def test_12_degrades_safely_on_bad_lockfile_and_missing_dir(lock_dir, monkeypatch, capsys):
    """Case 12: a garbled/unopenable lockfile (here: a DIRECTORY at the lockfile path) is skipped
    with a log line; a missing lock dir doesn't crash the janitor either. Never a crash."""
    path = lifecycle._app_lock_path(DOMAIN)  # noqa: SLF001
    os.makedirs(path)  # open(path, "a") -> IsADirectoryError (an OSError)
    calls = _inject_candidates(monkeypatch, [DOMAIN])
    lifecycle.janitor()  # must not raise
    assert calls == []
    assert "skipping" in capsys.readouterr().out

    os.rmdir(path)
    monkeypatch.setenv("CCCI_APP_LOCK_DIR", os.path.join(os.environ["CCCI_APP_LOCK_DIR"], "gone"))
    lifecycle.janitor()  # missing dir: probe open fails -> skip; tidy glob -> empty. No crash.
    assert calls == []


# --- tests/concurrency/test_lifetime.py ---
# Lifetime hardening (concurrency-restructure plan, cases 13-16): the REAL prctl/signal/alarm
# guards installed by helper subprocesses; tests assert teardown ran, exit was non-zero, and
# the lock was released.


def test_13_pdeathsig_parent_kill_terms_harness(lock_dir, pool):
    """Case 13: wrapper-parent spawns a guarded harness-child; the parent is SIGKILL'd (the
    harness gets no courtesy signal) -> the kernel's PDEATHSIG TERMs the child, its teardown
    funnel runs, it exits, and the lock is released."""
    p, out = pool.spawn("wrapper", DOMAIN)
    line = wait_marker(out, "WRAPPED")
    assert line, "wrapper never spawned its child"
    child_pid = int(line.split()[1])
    pool.track_pid(child_pid)
    assert wait_marker(out, "READY"), "guarded child never got ready"

    p.kill()  # parent dies WITHOUT signalling the child: only PDEATHSIG can save us
    p.wait(timeout=10)
    assert wait_pid_gone(child_pid), "guarded child must exit on parent death (PDEATHSIG)"
    assert wait_marker(out, "TEARDOWN", timeout=5), "teardown funnel did not run"
    assert wait_lock_state(DOMAIN, "free") == "free"


def test_14_already_orphaned_helper_refuses_to_run(lock_dir, pool):
    """Case 14 (ppid race): a helper whose parent died BEFORE the prctl was armed (it starts
    already reparented to pid 1) must refuse to run: PDEATHSIG would never fire for it."""
    # Spawn an intermediate parent that forks orphan-probe and exits immediately.
    import subprocess

    out = os.path.join(pool.out_dir, "orphan.out")
    intermediate = (
        "import subprocess, sys, os; "
        "subprocess.Popen([sys.executable, os.environ['CCCI_HELPERS'], 'orphan-probe']); "
    )
    env = dict(
        os.environ,
        CCCI_HELPER_OUT=out,
        CCCI_HELPERS=os.path.join(os.path.dirname(__file__), "helpers.py"),
    )
    subprocess.run([sys.executable, "-c", intermediate], env=env, timeout=15, check=True)
    line = wait_marker(out, "REFUSED", timeout=20)
    assert line, "orphaned helper did not refuse to run (or never reparented to pid 1)"


def test_15_deadline_alarm_fires_teardown_and_releases(lock_dir, pool):
    """Case 15: the self-deadline (alarm). A guarded helper with a 2s deadline tears down via
    the funnel (finally: ran), exits NON-zero, and its lock is released."""
    p, out = pool.spawn("guarded", DOMAIN, "2")
    assert wait_marker(out, "READY")
    rc = p.wait(timeout=20)
    assert rc != 0, f"deadline exit must be non-zero (got {rc})"
    assert rc == 128 + signal.SIGALRM, f"expected 142 (128+SIGALRM), got {rc}"
    assert wait_marker(out, "TEARDOWN", timeout=5), "teardown funnel did not run on deadline"
    assert wait_lock_state(DOMAIN, "free") == "free"


def test_16_sigterm_runs_teardown_funnel_and_releases(lock_dir, pool):
    """Case 16: SIGTERM (drone cancel path) -> the finally: teardown funnel runs, exit is
    non-zero, lock released."""
    p, out = pool.spawn("guarded", DOMAIN, "3600")
    assert wait_marker(out, "READY")
    p.send_signal(signal.SIGTERM)
    rc = p.wait(timeout=20)
    assert rc != 0, f"SIGTERM exit must be non-zero (got {rc})"
    assert rc == 128 + signal.SIGTERM, f"expected 143 (128+SIGTERM), got {rc}"
    assert wait_marker(out, "TEARDOWN", timeout=5), "teardown funnel did not run on SIGTERM"
    assert wait_lock_state(DOMAIN, "free") == "free"


# --- tests/concurrency/test_locks.py ---
# Lock fundamentals (concurrency-restructure plan, cases 1-4). Real kernel flocks held by real
# subprocesses; nothing mocked.


def test_1_sigkill_releases_lock(lock_dir, pool):
    """Case 1: acquire -> holder SIGKILL'd -> lock immediately acquirable (kernel auto-release).
    The exact property the old pidfile registry approximated with /proc checks."""
    p, out = pool.spawn("hold", DOMAIN)
    assert wait_marker(out, "ACQUIRED"), "holder never acquired"
    assert lock_state(DOMAIN) == "held"
    p.kill()
    p.wait(timeout=10)
    assert wait_lock_state(DOMAIN, "free") == "free"


def test_2_nb_probe_held_vs_unheld(lock_dir, pool):
    """Case 2: LOCK_NB probe raises BlockingIOError against a held lock; succeeds when unheld."""
    p, out = pool.spawn("hold", DOMAIN)
    assert wait_marker(out, "ACQUIRED")
    path = lifecycle._app_lock_path(DOMAIN)  # noqa: SLF001
    with open(path, "a") as f:
        try:
            fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
            raise AssertionError("LOCK_NB succeeded against a held lock")
        except BlockingIOError:
            pass
    p.kill()
    p.wait(timeout=10)
    assert wait_lock_state(DOMAIN, "free") == "free"
    with open(path, "a") as f:
        fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)  # must not raise now


def test_3_lock_fd_not_inherited_by_children(lock_dir, pool):
    """Case 3 (PEP 446): the holder spawns a subprocess child, the holder dies, the child lives,
    and the lock is STILL released (the child never inherited the lock fd). This is what makes
    'held lock == live HARNESS owner' sound even though runs spawn abra/docker/pytest children."""
    p, out = pool.spawn("hold-with-child", DOMAIN)
    assert wait_marker(out, "ACQUIRED")
    child_line = wait_marker(out, "CHILD")
    assert child_line, "holder never reported its child pid"
    child_pid = int(child_line.split()[1])
    pool.track_pid(child_pid)
    p.kill()
    p.wait(timeout=10)
    assert os.path.exists(f"/proc/{child_pid}"), "child should outlive the holder"
    assert (
        wait_lock_state(DOMAIN, "free") == "free"
    ), "lock must release on holder death even with a live child (PEP 446 non-inheritable fd)"


def test_4_second_acquire_blocks_until_first_exits(lock_dir, pool):
    """Case 4: a second same-domain acquire blocks until the first holder exits: the
    double-!testme serialisation property."""
    p1, out1 = pool.spawn("hold", DOMAIN)
    assert wait_marker(out1, "ACQUIRED")
    p2, out2 = pool.spawn("hold", DOMAIN)
    # p2 must NOT acquire while p1 holds.
    time.sleep(1.5)
    assert wait_marker(out2, "ACQUIRED", timeout=0.1) is None, "second acquire did not block"
    t_kill = time.time()
    p1.kill()
    p1.wait(timeout=10)
    line = wait_marker(out2, "ACQUIRED", timeout=15)
    assert line, "second acquire never completed after first holder exited"
    acquired_ts = float(line.split()[1])
    assert acquired_ts >= t_kill - 0.05, "second holder acquired before the first exited"
    assert lock_state(DOMAIN) == "held"