test(concurrency): real-kernel suite for the restructured model — 20 tests, 19 plan cases
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
tests/concurrency/ — NOT in the default `pytest tests/unit` gate; run explicitly with `pytest tests/concurrency -q`. flock/prctl/alarm are never mocked: helper subprocesses (helpers.py) hold real locks and install the real lifetime guards; locks live in a per-test tmp dir via CCCI_APP_LOCK_DIR; every helper (and recorded grandchild) is reaped by fixture cleanup. - test_locks.py (cases 1-4): SIGKILL auto-release; LOCK_NB held/unheld semantics; PEP 446 fd-not-inherited (holder's child survives, lock still releases); same-domain second acquire blocks until first holder exits. - test_janitor.py (cases 5-12): orphan reaped once + lockfile unlinked; live holder never reaped + logged; new-run acquire blocks until a slow reap completes (reap-under-probe-lock); two overlapping janitors -> exactly one reaps (flock arbitration); reboot sim (no lockfile) reaps immediately with no age wait; >120min-held lock flagged 'possible leaked run' and NOT stolen; warm/canonical names never probed (no lockfile even created); directory-as-lockfile and missing lock dir degrade to skip+log, never crash. - test_lifetime.py (cases 13-16): PDEATHSIG (wrapper parent SIGKILL'd -> guarded child TERM'd, teardown marker, lock released); already-orphaned helper REFUSES to run (ppid race); 2s deadline alarm -> teardown + exit 142 + lock released; SIGTERM -> teardown + exit 143 + lock released. - test_abra_dir.py (cases 17-19 + 18b): per-run dir built + $ABRA_DIR exported before the first abra call (recording stub abra on PATH); two CONCURRENT same-recipe fetch+checkout flows into different ABRA_DIRs -> divergent correct trees, canonical staged clone untouched; .env written through the servers/ symlink lands in the canonical path (env_get/env_set agree); manual runs get pid-suffixed dirs. On cc-ci: pytest tests/concurrency -q -> 20 passed; tests/unit -> 138 passed; lint PASS.
This commit is contained in:
108
tests/concurrency/concutil.py
Normal file
108
tests/concurrency/concutil.py
Normal file
@ -0,0 +1,108 @@
|
||||
"""Shared utilities for the real-kernel concurrency suite (imported by the test modules; the
|
||||
fixtures in conftest.py wrap these). No flock mocking anywhere — probes use real LOCK_NB."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import contextlib
|
||||
import fcntl
|
||||
import os
|
||||
import signal
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "runner"))
|
||||
from harness import lifecycle # noqa: E402
|
||||
|
||||
HELPERS = os.path.join(os.path.dirname(__file__), "helpers.py")
|
||||
DOMAIN = "test-abc123.ci.commoninternet.net" # matches RUN_APP_RE
|
||||
|
||||
|
||||
class HelperPool:
    """Launch `helpers.py` subprocesses and make sure none of them outlive the test.

    Every spawned helper is remembered and killed by `cleanup()`, together with any extra
    pids registered through `track_pid()` (grandchild pids recorded from the
    `hold-with-child`/`wrapper` markers) — no leaked children in the test VM.
    """

    def __init__(self, out_dir: str):
        # Spawn counter; gives each helper its own numbered marker file.
        self._n = 0
        self.out_dir = out_dir
        self.procs: list[subprocess.Popen] = []
        self.extra_pids: list[int] = []

    def spawn(self, *args: str, env_extra: dict | None = None) -> tuple[subprocess.Popen, str]:
        """Run `helpers.py <args...>` in a new process; returns (proc, marker_file).

        The helper sees this process's environment plus CCCI_HELPER_OUT (its marker file) and
        whatever `env_extra` adds. Its stdout and stderr are discarded.
        """
        self._n += 1
        marker_file = os.path.join(self.out_dir, f"helper-{self._n}.out")
        overrides = env_extra or {}
        helper_env = dict(os.environ, CCCI_HELPER_OUT=marker_file, **overrides)
        argv = [sys.executable, HELPERS, *args]
        proc = subprocess.Popen(  # noqa: S603
            argv,
            env=helper_env,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        )
        self.procs.append(proc)
        return proc, marker_file

    def track_pid(self, pid: int) -> None:
        """Register a pid that is not one of our direct children for SIGKILL at cleanup."""
        self.extra_pids.append(pid)

    def cleanup(self) -> None:
        """SIGKILL every helper still running (waiting up to 10s each), then each tracked pid."""
        for proc in self.procs:
            if proc.poll() is not None:
                continue  # already exited and reaped
            proc.kill()
            try:
                proc.wait(timeout=10)
            except subprocess.TimeoutExpired:
                pass
        for pid in self.extra_pids:
            try:
                os.kill(pid, signal.SIGKILL)
            except OSError:
                pass  # best effort: the pid may already be gone
|
||||
|
||||
|
||||
def wait_marker(out: str, token: str, timeout: float = 15.0) -> str | None:
    """Poll a helper's marker file for a line containing `token`.

    The file is re-read from the start every 0.1s until a matching line shows up or `timeout`
    seconds have elapsed. A file that does not exist yet (or cannot be opened) simply counts
    as "not there yet". The file is always read at least once — so `timeout=0` is a one-shot
    probe — and once more after the final sleep.

    Returns the first matching line, stripped, or None on timeout.
    """
    # Monotonic clock: a wall-clock step (NTP, VM resume) must not cut the wait short or
    # stretch it.
    deadline = time.monotonic() + timeout
    while True:
        try:
            with open(out) as f:
                for line in f:
                    if token in line:
                        return line.strip()
        except OSError:
            pass  # the helper has not created its marker file yet
        if time.monotonic() >= deadline:
            return None
        time.sleep(0.1)
|
||||
|
||||
|
||||
def lock_state(domain: str) -> str:
    """'held' | 'free' | 'absent' for the domain's lockfile, probed with a REAL LOCK_NB.

    'absent' means there is no lockfile at the path; 'held' means another open file
    description holds the flock; 'free' means the probe could take the exclusive lock itself
    (it is released again as soon as the probe's fd is closed).

    The probe never creates the lockfile: it opens the path without O_CREAT, so a file that is
    unlinked between two polls is reported as 'absent' instead of being silently re-created
    (which an append-mode open after an exists() check would do).
    """
    path = lifecycle._app_lock_path(domain)  # noqa: SLF001
    try:
        fd = os.open(path, os.O_WRONLY | os.O_APPEND)
    except FileNotFoundError:
        return "absent"
    try:
        fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except BlockingIOError:
        return "held"
    else:
        return "free"
    finally:
        os.close(fd)  # closing the fd also drops the probe's own lock, if it got one
|
||||
|
||||
|
||||
def wait_lock_state(domain: str, want: str, timeout: float = 10.0) -> str:
    """Poll every 0.1s until lock_state(domain) == want or `timeout` seconds have passed.

    Kernel release on process death is fast, but the scheduler needs some room. The state is
    always probed at least once. Returns the final observed state, which the caller compares
    against `want`.
    """
    # Monotonic clock so a wall-clock adjustment cannot shorten or lengthen the wait.
    deadline = time.monotonic() + timeout
    while True:
        state = lock_state(domain)
        if state == want or time.monotonic() >= deadline:
            return state
        time.sleep(0.1)
|
||||
|
||||
|
||||
def pid_alive(pid: int) -> bool:
    """True while a /proc/<pid> entry exists for `pid`."""
    proc_entry = f"/proc/{pid}"
    return os.path.exists(proc_entry)
|
||||
|
||||
|
||||
def wait_pid_gone(pid: int, timeout: float = 15.0) -> bool:
    """Poll every 0.1s until `pid` is no longer alive; True if it went away within `timeout`.

    The pid is always checked at least once (so `timeout=0` is a one-shot probe) and once more
    after the final sleep.
    """
    # Monotonic clock so a wall-clock adjustment cannot shorten or lengthen the wait.
    deadline = time.monotonic() + timeout
    while True:
        if not pid_alive(pid):
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(0.1)
|
||||
34
tests/concurrency/conftest.py
Normal file
34
tests/concurrency/conftest.py
Normal file
@ -0,0 +1,34 @@
|
||||
"""Fixtures for the real-kernel concurrency suite (concurrency-restructure plan, 19 cases).
|
||||
|
||||
NOT part of the default `pytest tests/unit` gate — run explicitly with `pytest tests/concurrency
|
||||
-q` (docs/concurrency.md). Locks live in a per-test tmp dir (CCCI_APP_LOCK_DIR); helper
|
||||
subprocesses hold REAL flocks / install the REAL prctl+signal guards and are always reaped in
|
||||
fixture finalizers (no leaked children in the test VM).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, os.path.dirname(__file__))
|
||||
from concutil import HelperPool # noqa: E402
|
||||
|
||||
|
||||
@pytest.fixture
def lock_dir(tmp_path, monkeypatch):
    """Create a per-test `locks` directory and export it as CCCI_APP_LOCK_DIR.

    Exporting it means BOTH this process's lifecycle calls and the helper subprocesses (which
    inherit os.environ) resolve their lockfiles in the sandbox — never /run/lock. Returns the
    directory path as a string.
    """
    sandbox = tmp_path / "locks"
    sandbox.mkdir()
    sandbox_path = str(sandbox)
    monkeypatch.setenv("CCCI_APP_LOCK_DIR", sandbox_path)
    return sandbox_path
|
||||
|
||||
|
||||
@pytest.fixture
def pool(tmp_path):
    """Yield a HelperPool writing marker files into tmp_path; its helpers are killed afterwards."""
    helper_pool = HelperPool(str(tmp_path))
    yield helper_pool
    # Teardown half of the fixture: reap every helper (and tracked pid) the test started.
    helper_pool.cleanup()
|
||||
120
tests/concurrency/helpers.py
Normal file
120
tests/concurrency/helpers.py
Normal file
@ -0,0 +1,120 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Subprocess helpers for tests/concurrency — REAL kernel locks and the REAL lifetime guards in
|
||||
separate processes (flock/prctl are never mocked; tests assert on actual kernel behavior).
|
||||
|
||||
Invoked as: python3 helpers.py <command> <args...>
|
||||
|
||||
Env contract (set by the spawning test):
|
||||
CCCI_APP_LOCK_DIR sandbox lock dir (never /run/lock in tests)
|
||||
CCCI_HELPER_OUT marker file this helper APPENDS progress lines to (ACQUIRED/READY/...)
|
||||
|
||||
Commands:
|
||||
hold <domain> acquire the app lock, mark `ACQUIRED <ts>`, sleep forever
|
||||
hold-with-child <domain> acquire the lock, spawn a plain sleeping subprocess child, mark
|
||||
`ACQUIRED <ts>` + `CHILD <pid>` (PEP 446: the child must NOT
|
||||
inherit the lock fd), sleep forever
|
||||
guarded <domain> <deadline> install the REAL lifetime guards (alarm=<deadline>s), acquire the
|
||||
lock, mark `READY`; when the teardown funnel runs (`finally:`),
|
||||
mark `TEARDOWN` before exiting
|
||||
wrapper <domain> spawn `guarded <domain> 3600` as MY child, mark `WRAPPED <pid>`,
|
||||
sleep — the test kills me to prove PDEATHSIG TERMs the child
|
||||
orphan-probe wait (bounded) until reparented (ppid==1), then install the
|
||||
guards; mark `REFUSED` if they exit (expected) or `GUARDS_OK`
|
||||
fetch-checkout <recipe> <ref> run run_recipe_ci.fetch_recipe (the test sets CCCI_SKIP_FETCH=1
|
||||
+ a per-"run" ABRA_DIR), git-checkout <ref>, mark
|
||||
`RESULT <head> <data.txt content>`
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "..", "runner"))
|
||||
from harness import abra, lifecycle, lifetime # noqa: E402
|
||||
|
||||
OUT = os.environ.get("CCCI_HELPER_OUT")
|
||||
|
||||
|
||||
def mark(line: str) -> None:
    """Record a progress line: append it to the CCCI_HELPER_OUT marker file (when that env var
    was set at import time) and always echo it to stdout."""
    if OUT:
        record = line + "\n"
        with open(OUT, "a") as marker:
            marker.write(record)
            marker.flush()
    print(line, flush=True)
|
||||
|
||||
|
||||
def cmd_hold(domain: str) -> None:
    """`hold`: take the app lock for `domain`, mark `ACQUIRED <ts>`, then sleep (3600s) while
    holding it — the spawning test kills this process when it is done."""
    lifecycle.acquire_app_lock(domain)
    acquired_at = time.time()
    mark(f"ACQUIRED {acquired_at}")
    time.sleep(3600)
|
||||
|
||||
|
||||
def cmd_hold_with_child(domain: str) -> None:
    """`hold-with-child`: take the app lock, start a plain sleeping subprocess, mark
    `ACQUIRED <ts>` and `CHILD <pid>`, then sleep (3600s) holding the lock."""
    lifecycle.acquire_app_lock(domain)
    # The child is started AFTER the lock is taken, so it would carry the lock fd if that fd
    # were inheritable.
    sleeper_argv = [sys.executable, "-c", "import time; time.sleep(3600)"]
    sleeper = subprocess.Popen(sleeper_argv)
    mark(f"ACQUIRED {time.time()}")
    mark(f"CHILD {sleeper.pid}")
    time.sleep(3600)
|
||||
|
||||
|
||||
def cmd_guarded(domain: str, deadline: str) -> None:
    """`guarded`: install the lifetime guards with a `deadline`-second deadline, take the app
    lock, mark `READY`, then sleep. `TEARDOWN` is marked from the `finally:` on the way out."""
    deadline_seconds = int(deadline)  # arrives as a string from argv
    lifetime.install_lifetime_guards(deadline_seconds=deadline_seconds)
    lifecycle.acquire_app_lock(domain)
    mark("READY")
    try:
        time.sleep(3600)
    finally:
        # Stand-in for the teardown funnel: the tests look for this marker.
        mark("TEARDOWN")
|
||||
|
||||
|
||||
def cmd_wrapper(domain: str) -> None:
    """`wrapper`: start `guarded <domain> 3600` as a child of this process, mark
    `WRAPPED <pid>` with the child's pid, then sleep until the test kills us."""
    this_script = os.path.abspath(__file__)
    child_argv = [sys.executable, this_script, "guarded", domain, "3600"]
    guarded_child = subprocess.Popen(  # noqa: S603
        child_argv,
        env=os.environ.copy(),
    )
    mark(f"WRAPPED {guarded_child.pid}")
    time.sleep(3600)
|
||||
|
||||
|
||||
def cmd_orphan_probe() -> None:
    """`orphan-probe`: wait (bounded) until reparented to pid 1, then install the guards.

    Marks `REFUSED` (and re-raises) if installing the guards exits, `GUARDS_OK` if it returns,
    or `NEVER_REPARENTED` if the parent pid never became 1.
    """
    # Our spawner exits immediately after fork; wait (bounded) until we are reparented so the
    # prctl is installed with the parent ALREADY dead — the exact race the ppid check closes.
    polls = 0
    while os.getppid() != 1:
        time.sleep(0.05)
        polls += 1
        if polls == 200:
            mark("NEVER_REPARENTED")  # e.g. a subreaper environment — test will fail visibly
            return
    try:
        lifetime.install_lifetime_guards()
    except SystemExit:
        mark("REFUSED")
        raise
    else:
        mark("GUARDS_OK")
|
||||
|
||||
|
||||
def cmd_fetch_checkout(recipe: str, ref: str) -> None:
    """`fetch-checkout`: run run_recipe_ci.fetch_recipe for `recipe`, check out `ref`, and mark
    `RESULT <head> <data.txt content>` describing the resulting tree."""
    import run_recipe_ci

    run_recipe_ci.fetch_recipe(recipe, None, None)
    abra.recipe_checkout(recipe, ref)
    head_commit = abra.recipe_head_commit(recipe)
    data_path = os.path.join(abra.recipe_dir(recipe), "data.txt")
    with open(data_path) as data_file:
        data_content = data_file.read().strip()
    mark(f"RESULT {head_commit} {data_content}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # argv[1] selects the command; the remaining arguments are passed to it positionally.
    cmd, *args = sys.argv[1:]
    commands = {
        "hold": cmd_hold,
        "hold-with-child": cmd_hold_with_child,
        "guarded": cmd_guarded,
        "wrapper": cmd_wrapper,
        "orphan-probe": cmd_orphan_probe,
        "fetch-checkout": cmd_fetch_checkout,
    }
    handler = commands[cmd]
    handler(*args)
|
||||
175
tests/concurrency/test_abra_dir.py
Normal file
175
tests/concurrency/test_abra_dir.py
Normal file
@ -0,0 +1,175 @@
|
||||
"""Per-run ABRA_DIR isolation (concurrency-restructure plan, cases 17-19). Real directories,
|
||||
real symlinks, real git — abra itself is replaced by a recording stub where a CLI call is
|
||||
involved (case 17), because these cases test OUR dir/env plumbing, not abra."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import stat
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
sys.path.insert(0, os.path.dirname(__file__))
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "runner"))
|
||||
import run_recipe_ci # noqa: E402
|
||||
from concutil import wait_marker # noqa: E402
|
||||
from harness import abra # noqa: E402
|
||||
|
||||
RECIPE = "fakerecipe"
|
||||
|
||||
|
||||
def _git(cwd, *args):
    """Run `git <args...>` in `cwd` with a throwaway identity; raises if git exits non-zero."""
    identity = ["-c", "user.email=t@t", "-c", "user.name=t"]
    command = ["git", *identity, *args]
    subprocess.run(command, cwd=cwd, check=True, capture_output=True)
|
||||
|
||||
|
||||
def _make_fake_home(tmp_path):
    """Build a fake $HOME under tmp_path and return it.

    It holds a canonical ~/.abra with servers/default and catalogue dirs, plus a recipe git
    repo with two tagged commits whose data.txt differs (v1 -> 'one', v2 -> 'two', HEAD at v2).
    """
    home = tmp_path / "home"
    abra_root = home / ".abra"
    (abra_root / "servers" / "default").mkdir(parents=True)
    (abra_root / "catalogue").mkdir(parents=True)

    repo = abra_root / "recipes" / RECIPE
    repo.mkdir(parents=True)
    _git(repo, "init", "-q")
    # One commit + tag per version, in order, so HEAD ends up at v2.
    for tag, content in (("v1", "one\n"), ("v2", "two\n")):
        (repo / "data.txt").write_text(content)
        _git(repo, "add", "data.txt")
        _git(repo, "commit", "-qm", tag)
        _git(repo, "tag", tag)
    return home
|
||||
|
||||
|
||||
def test_17_per_run_dir_built_and_exported_before_abra(tmp_path, monkeypatch):
    """Case 17: setup_run_abra_dir builds the per-run dir (servers/ and catalogue/ symlinks that
    resolve into the canonical tree, an empty writable recipes/) and exports $ABRA_DIR before
    the first abra call — proven by a stub `abra` on PATH that records the env it saw."""
    fake_home = _make_fake_home(tmp_path)
    canonical_abra = fake_home / ".abra"
    runs_root = tmp_path / "runs"
    monkeypatch.setenv("HOME", str(fake_home))
    monkeypatch.setenv("CCCI_RUNS_DIR", str(runs_root))
    monkeypatch.setenv("DRONE_BUILD_NUMBER", "777")
    monkeypatch.setenv("ABRA_DIR", "sentinel-to-be-overwritten")  # so monkeypatch restores it

    run_dir = run_recipe_ci.setup_run_abra_dir()
    assert run_dir == str(runs_root / "777" / "abra")
    assert os.environ["ABRA_DIR"] == run_dir
    for link_name in ("servers", "catalogue"):
        link_target = os.readlink(os.path.join(run_dir, link_name))
        assert link_target == str(canonical_abra / link_name)
    # symlinks RESOLVE (targets exist) and recipes/ is empty + writable
    assert os.path.isdir(os.path.join(run_dir, "servers", "default"))
    assert os.path.isdir(os.path.join(run_dir, "catalogue"))
    recipes_dir = os.path.join(run_dir, "recipes")
    assert os.listdir(recipes_dir) == []
    write_probe = os.path.join(recipes_dir, ".write-probe")
    with open(write_probe, "w"):
        pass
    os.remove(write_probe)
    # idempotent re-entry (Drone build-number retry): must not raise on existing symlinks
    assert run_recipe_ci.setup_run_abra_dir() == run_dir

    # stub abra records $ABRA_DIR at call time; fetch_recipe's catalogue branch invokes it
    bin_dir = tmp_path / "bin"
    bin_dir.mkdir()
    env_log = tmp_path / "abra-env.log"
    fake_abra = bin_dir / "abra"
    fake_abra.write_text(f'#!/bin/sh\necho "$ABRA_DIR" >> {env_log}\nexit 0\n')
    fake_abra.chmod(fake_abra.stat().st_mode | stat.S_IEXEC)
    monkeypatch.setenv("PATH", f"{bin_dir}{os.pathsep}{os.environ['PATH']}")
    monkeypatch.delenv("CCCI_SKIP_FETCH", raising=False)
    run_recipe_ci.fetch_recipe(RECIPE, None, None)
    recorded = env_log.read_text().strip()
    assert recorded == run_dir, "abra was called without the per-run ABRA_DIR exported"
|
||||
|
||||
|
||||
def test_18_concurrent_same_recipe_fetch_no_cross_talk(tmp_path, monkeypatch, pool):
    """Case 18: two CONCURRENT fetch+checkout flows of the SAME recipe into different ABRA_DIRs
    produce two correct, divergent trees (v1 vs v2) — the old shared-tree corruption scenario,
    now structurally safe with no lock. The canonical staged clone is untouched."""

    def head_of(repo):
        # check=True: a failing `git rev-parse` must fail the test loudly. Without it both the
        # "before" and "after" HEAD would read as "" and the untouched-clone assertion would
        # pass vacuously.
        result = subprocess.run(
            ["git", "-C", str(repo), "rev-parse", "HEAD"],
            capture_output=True,
            text=True,
            check=True,
        )
        return result.stdout.strip()

    home = _make_fake_home(tmp_path)
    canonical_repo = home / ".abra" / "recipes" / RECIPE
    head_before = head_of(canonical_repo)

    # Start both helpers before waiting on either, so the two flows really overlap.
    runs = {}
    for name, ref in (("runA", "v1"), ("runB", "v2")):
        abra_dir = tmp_path / name / "abra"
        abra_dir.mkdir(parents=True)
        _, out = pool.spawn(
            "fetch-checkout",
            RECIPE,
            ref,
            env_extra={
                "HOME": str(home),
                "ABRA_DIR": str(abra_dir),
                "CCCI_SKIP_FETCH": "1",
            },
        )
        runs[name] = (out, ref, abra_dir)

    expect = {"v1": "one", "v2": "two"}
    for name, (out, ref, abra_dir) in runs.items():
        line = wait_marker(out, "RESULT", timeout=30)
        assert line, f"{name} never produced a RESULT"
        _, head, content = line.split()
        assert content == expect[ref], f"{name}@{ref}: tree content {content!r}"
        tree = abra_dir / "recipes" / RECIPE
        assert (tree / "data.txt").read_text().strip() == expect[ref]
        # The HEAD the helper reported must be the HEAD actually on disk in its own tree.
        assert head == head_of(tree)

    # the two trees genuinely diverge AND the canonical staged clone is untouched
    a = (runs["runA"][2] / "recipes" / RECIPE / "data.txt").read_text()
    b = (runs["runB"][2] / "recipes" / RECIPE / "data.txt").read_text()
    assert a != b
    head_after = head_of(canonical_repo)
    assert head_after == head_before, "canonical clone must not be touched by per-run fetches"
|
||||
|
||||
|
||||
def test_19_env_written_through_servers_symlink_lands_canonical(tmp_path, monkeypatch):
    """Case 19: an app .env written through the per-run servers/ symlink (what abra does under
    $ABRA_DIR) lands in the CANONICAL shared path — so janitor discovery and every
    expanduser('~/.abra/servers/...') reader keep working unchanged."""
    fake_home = _make_fake_home(tmp_path)
    monkeypatch.setenv("HOME", str(fake_home))
    monkeypatch.setenv("CCCI_RUNS_DIR", str(tmp_path / "runs"))
    monkeypatch.setenv("DRONE_BUILD_NUMBER", "778")
    monkeypatch.setenv("ABRA_DIR", "sentinel-to-be-overwritten")
    run_dir = run_recipe_ci.setup_run_abra_dir()

    domain = "test-abc123.ci.commoninternet.net"
    env_name = f"{domain}.env"
    via_symlink = os.path.join(run_dir, "servers", "default", env_name)
    with open(via_symlink, "w") as env_file:
        env_file.write("TYPE=fakerecipe:1.0.0\nDOMAIN=placeholder\n")

    canonical = fake_home / ".abra" / "servers" / "default" / env_name
    assert canonical.is_file(), ".env written via the symlink must land in the canonical path"
    # the canonical-path readers/writers (abra.env_get/env_set use ~/.abra) see the same file
    assert abra.env_get(domain, "TYPE") == "fakerecipe:1.0.0"
    abra.env_set(domain, "DOMAIN", domain)
    with open(via_symlink) as env_file:
        written = env_file.read()
    assert f"DOMAIN={domain}" in written
|
||||
|
||||
|
||||
def test_18b_run_id_manual_fallback_is_per_process(tmp_path, monkeypatch):
    """Companion to case 18: two concurrent MANUAL runs (no DRONE_BUILD_NUMBER) must not share an
    abra dir either — the manual fallback is pid-suffixed."""
    fake_home = _make_fake_home(tmp_path)
    monkeypatch.setenv("HOME", str(fake_home))
    monkeypatch.setenv("CCCI_RUNS_DIR", str(tmp_path / "runs"))
    # Clear the run-identifying variables so the manual fallback is what gets exercised.
    for var in ("DRONE_BUILD_NUMBER", "CCCI_APP_DOMAIN", "CCCI_RUN_ID"):
        monkeypatch.delenv(var, raising=False)
    monkeypatch.setenv("ABRA_DIR", "sentinel-to-be-overwritten")
    run_dir = run_recipe_ci.setup_run_abra_dir()
    expected_fragment = f"manual-{os.getpid()}"
    assert expected_fragment in run_dir
|
||||
189
tests/concurrency/test_janitor.py
Normal file
189
tests/concurrency/test_janitor.py
Normal file
@ -0,0 +1,189 @@
|
||||
"""Janitor / flock-probe semantics (concurrency-restructure plan, cases 5-12).
|
||||
|
||||
The janitor runs IN-PROCESS with its discovery monkeypatched (candidates injected via a stubbed
|
||||
abra.app_ls + empty docker sweep) and teardown_app stubbed to record calls — but the LOCKS are
|
||||
real kernel flocks, held by real helper subprocesses where a live owner is needed."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
|
||||
sys.path.insert(0, os.path.dirname(__file__))
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "runner"))
|
||||
from concutil import DOMAIN, lock_state, wait_marker # noqa: E402
|
||||
from harness import lifecycle # noqa: E402
|
||||
|
||||
|
||||
def _inject_candidates(monkeypatch, domains):
    """Point janitor discovery at exactly `domains` and record teardowns.

    abra.app_ls is stubbed to list `domains`, the docker sweep (`_docker_names`) to return
    nothing, and teardown_app to append the domain it was given to a list. Returns that list.
    """
    calls = []

    def fake_app_ls():
        return [{"appName": d} for d in domains]

    def empty_docker_names(kind, stack):
        return []

    def record_teardown(d, verify=True):
        calls.append(d)

    monkeypatch.setattr(lifecycle.abra, "app_ls", fake_app_ls)
    monkeypatch.setattr(lifecycle, "_docker_names", empty_docker_names)
    monkeypatch.setattr(lifecycle, "teardown_app", record_teardown)
    return calls
|
||||
|
||||
|
||||
def test_5_orphan_reaped_lockfile_unlinked(lock_dir, pool, monkeypatch):
    """Case 5: an orphan (lockfile exists, no holder — its run was SIGKILL'd) is reaped exactly
    once and its lockfile unlinked."""
    # Create the orphan: a helper takes the lock and is then SIGKILL'd.
    holder, marker = pool.spawn("hold", DOMAIN)
    assert wait_marker(marker, "ACQUIRED")
    holder.kill()
    holder.wait(timeout=10)

    reaped = _inject_candidates(monkeypatch, [DOMAIN])
    lifecycle.janitor()
    assert reaped == [DOMAIN], f"teardown calls: {reaped} (expected exactly one)"
    assert lock_state(DOMAIN) == "absent", "reaped orphan's lockfile must be unlinked"
|
||||
|
||||
|
||||
def test_6_live_run_never_reaped(lock_dir, pool, monkeypatch, capsys):
    """Case 6: a held lock (live helper) is never reaped and is logged as live."""
    _holder, marker = pool.spawn("hold", DOMAIN)  # stays alive; the pool fixture reaps it
    assert wait_marker(marker, "ACQUIRED")

    reaped = _inject_candidates(monkeypatch, [DOMAIN])
    lifecycle.janitor()
    assert reaped == []
    janitor_output = capsys.readouterr().out
    assert "live concurrent run" in janitor_output
    assert lock_state(DOMAIN) == "held"
|
||||
|
||||
|
||||
def test_7_new_run_blocks_until_reap_finishes(lock_dir, pool, monkeypatch):
    """Case 7: the janitor reaps WHILE HOLDING the probe lock, so a new run of the same domain
    blocks in acquire_app_lock until the reap completes — no window where a fresh app coexists
    with a half-reaped one."""
    # Make an orphan.
    p, out = pool.spawn("hold", DOMAIN)
    assert wait_marker(out, "ACQUIRED")
    p.kill()
    p.wait(timeout=10)

    state = {"teardown_end": None, "acquirer_out": None}

    def slow_teardown(domain, verify=True):
        # While the janitor holds the probe lock mid-reap, a new run starts acquiring.
        _, aout = pool.spawn("hold", DOMAIN)
        state["acquirer_out"] = aout
        time.sleep(2.0)
        # Wall clock on purpose: compared below with the helper's own time.time() stamp.
        state["teardown_end"] = time.time()

    monkeypatch.setattr(lifecycle.abra, "app_ls", lambda: [{"appName": DOMAIN}])
    monkeypatch.setattr(lifecycle, "_docker_names", lambda kind, stack: [])
    monkeypatch.setattr(lifecycle, "teardown_app", slow_teardown)
    lifecycle.janitor()

    # Fail with a readable message if the reap never happened, instead of crashing below on
    # open(None) / float >= None.
    assert state["acquirer_out"] is not None, "janitor never called teardown_app for the orphan"
    assert state["teardown_end"] is not None, "slow teardown did not run to completion"

    line = wait_marker(state["acquirer_out"], "ACQUIRED", timeout=15)
    assert line, "new run never acquired after the reap"
    acquired_ts = float(line.split()[1])
    assert (
        acquired_ts >= state["teardown_end"]
    ), f"new run acquired at {acquired_ts} BEFORE the reap finished at {state['teardown_end']}"
    # The new run must hold a lock the next probe can SEE (fresh inode at the path).
    assert lock_state(DOMAIN) == "held"
|
||||
|
||||
|
||||
def test_8_two_janitors_exactly_one_reaps(lock_dir, pool, monkeypatch):
    """Case 8: two concurrent janitors arbitrate on the probe flock — exactly one reaps (the
    other sees 'held' and leaves). Teardown is slowed so the runs genuinely overlap."""
    # Make an orphan.
    p, out = pool.spawn("hold", DOMAIN)
    assert wait_marker(out, "ACQUIRED")
    p.kill()
    p.wait(timeout=10)

    calls = []
    errors = []
    calls_lock = threading.Lock()

    def slow_teardown(domain, verify=True):
        with calls_lock:
            calls.append(domain)
        time.sleep(2.0)

    monkeypatch.setattr(lifecycle.abra, "app_ls", lambda: [{"appName": DOMAIN}])
    monkeypatch.setattr(lifecycle, "_docker_names", lambda kind, stack: [])
    monkeypatch.setattr(lifecycle, "teardown_app", slow_teardown)

    barrier = threading.Barrier(2)

    def run_janitor():
        try:
            barrier.wait()  # release both janitors at the same moment
            lifecycle.janitor()
        except Exception as exc:  # noqa: BLE001 — re-checked in the main thread below
            with calls_lock:
                errors.append(exc)

    # Daemon threads: a hung janitor must fail this test, not block interpreter exit.
    threads = [threading.Thread(target=run_janitor, daemon=True) for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join(timeout=30)
    assert not any(t.is_alive() for t in threads), "a janitor thread did not finish within 30s"
    assert errors == [], f"janitor raised inside a thread: {errors!r}"
    assert calls == [DOMAIN], f"expected exactly one reap, got {calls}"
    assert lock_state(DOMAIN) == "absent"
|
||||
|
||||
|
||||
def test_9_reboot_lockfile_absent_reaped_immediately(lock_dir, monkeypatch):
    """Case 9: post-reboot simulation — the app exists but its lockfile is gone (/run/lock is
    tmpfs). The probe trivially acquires -> immediate reap, NO age threshold (improvement over
    the old 2h fallback)."""
    assert lock_state(DOMAIN) == "absent"
    reaped = _inject_candidates(monkeypatch, [DOMAIN])
    started = time.time()
    lifecycle.janitor()
    assert reaped == [DOMAIN]
    elapsed = time.time() - started
    assert elapsed < 5, "reap must be immediate (no age wait)"
|
||||
|
||||
|
||||
def test_10_long_held_lock_flagged_never_stolen(lock_dir, pool, monkeypatch, capsys):
    """Case 10: a lock held with mtime older than 120min is flagged as a possible leaked run —
    and NOT reaped (never steal a held lock)."""
    _holder, marker = pool.spawn("hold", DOMAIN)
    assert wait_marker(marker, "ACQUIRED")

    # Backdate the lockfile's atime/mtime by 130 minutes while the helper still holds it.
    lock_path = lifecycle._app_lock_path(DOMAIN)  # noqa: SLF001
    stale_ts = time.time() - (130 * 60)
    os.utime(lock_path, (stale_ts, stale_ts))

    reaped = _inject_candidates(monkeypatch, [DOMAIN])
    lifecycle.janitor()
    assert reaped == []
    janitor_output = capsys.readouterr().out
    assert "possible leaked run" in janitor_output and "lslocks" in janitor_output
    assert lock_state(DOMAIN) == "held"
|
||||
|
||||
|
||||
def test_11_warm_canonical_names_never_probed(lock_dir, monkeypatch):
    """Case 11: RUN_APP_RE allowlist — warm/canonical-shaped names never become candidates, so
    they are never probed (no lockfile is even created for them) and never reaped."""
    warmish = [
        "warm-keycloak.ci.commoninternet.net",
        "keycloak.ci.commoninternet.net",
        "warm-hedgedoc.ci.commoninternet.net",
        "drone.ci.commoninternet.net",
    ]
    reaped = []

    def fake_app_ls():
        return [{"appName": d} for d in warmish]

    def fake_docker_names(kind, stack):
        # The docker sweep reports one warm-shaped service and nothing else.
        if kind == "service":
            return ["warm-keycloak_ci_commoninternet_net_app"]
        return []

    def record_teardown(d, verify=True):
        reaped.append(d)

    monkeypatch.setattr(lifecycle.abra, "app_ls", fake_app_ls)
    monkeypatch.setattr(lifecycle, "_docker_names", fake_docker_names)
    monkeypatch.setattr(lifecycle, "teardown_app", record_teardown)
    lifecycle.janitor()
    assert reaped == []

    lockdir = os.environ["CCCI_APP_LOCK_DIR"]
    created = [f for f in os.listdir(lockdir) if f.startswith("cc-ci-app-")]
    assert created == [], "janitor must not create lockfiles for non-run-app names"
|
||||
|
||||
|
||||
def test_12_degrades_safely_on_bad_lockfile_and_missing_dir(lock_dir, monkeypatch, capsys):
    """Case 12: a garbled/unopenable lockfile (here: a DIRECTORY at the lockfile path) is skipped
    with a log line; a missing lock dir doesn't crash the janitor either. Never a crash."""
    lock_path = lifecycle._app_lock_path(DOMAIN)  # noqa: SLF001
    os.makedirs(lock_path)  # open(path, "a") -> IsADirectoryError (an OSError)
    reaped = _inject_candidates(monkeypatch, [DOMAIN])
    lifecycle.janitor()  # must not raise
    assert reaped == []
    janitor_output = capsys.readouterr().out
    assert "skipping" in janitor_output

    # Second half: point the lock dir at a path that does not exist.
    os.rmdir(lock_path)
    missing_dir = os.path.join(os.environ["CCCI_APP_LOCK_DIR"], "gone")
    monkeypatch.setenv("CCCI_APP_LOCK_DIR", missing_dir)
    lifecycle.janitor()  # missing dir: probe open fails -> skip; tidy glob -> empty. No crash.
    assert reaped == []
|
||||
82
tests/concurrency/test_lifetime.py
Normal file
82
tests/concurrency/test_lifetime.py
Normal file
@ -0,0 +1,82 @@
|
||||
"""Lifetime hardening (concurrency-restructure plan, cases 13-16): the REAL prctl/signal/alarm
|
||||
guards installed by helper subprocesses; tests assert teardown ran, exit was non-zero, and the
|
||||
lock was released."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import signal
|
||||
import sys
|
||||
|
||||
sys.path.insert(0, os.path.dirname(__file__))
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "runner"))
|
||||
from concutil import ( # noqa: E402
|
||||
DOMAIN,
|
||||
wait_lock_state,
|
||||
wait_marker,
|
||||
wait_pid_gone,
|
||||
)
|
||||
|
||||
|
||||
def test_13_pdeathsig_parent_kill_terms_harness(lock_dir, pool):
    """Case 13: wrapper-parent spawns a guarded harness-child; the parent is SIGKILL'd (the
    harness gets no courtesy signal) -> the kernel's PDEATHSIG TERMs the child, its teardown
    funnel runs, it exits, and the lock is released."""
    wrapper, marker = pool.spawn("wrapper", DOMAIN)
    wrapped_line = wait_marker(marker, "WRAPPED")
    assert wrapped_line, "wrapper never spawned its child"
    child_pid = int(wrapped_line.split()[1])
    pool.track_pid(child_pid)  # grandchild of this test: make sure cleanup can kill it
    # The child inherits the wrapper's environment, so it marks into the same file.
    assert wait_marker(marker, "READY"), "guarded child never got ready"

    wrapper.kill()  # parent dies WITHOUT signalling the child — only PDEATHSIG can save us
    wrapper.wait(timeout=10)
    assert wait_pid_gone(child_pid), "guarded child must exit on parent death (PDEATHSIG)"
    assert wait_marker(marker, "TEARDOWN", timeout=5), "teardown funnel did not run"
    assert wait_lock_state(DOMAIN, "free") == "free"
|
||||
|
||||
|
||||
def test_14_already_orphaned_helper_refuses_to_run(lock_dir, pool):
    """Case 14 (ppid race): a helper whose parent died BEFORE the prctl was armed (it starts
    already reparented to pid 1) must refuse to run — PDEATHSIG would never fire for it."""
    # Spawn an intermediate parent that forks orphan-probe and exits immediately.
    import subprocess

    marker = os.path.join(pool.out_dir, "orphan.out")
    helpers_script = os.path.join(os.path.dirname(__file__), "helpers.py")
    intermediate = (
        "import subprocess, sys, os; "
        "subprocess.Popen([sys.executable, os.environ['CCCI_HELPERS'], 'orphan-probe']); "
    )
    child_env = dict(os.environ, CCCI_HELPER_OUT=marker, CCCI_HELPERS=helpers_script)
    launcher = [sys.executable, "-c", intermediate]
    subprocess.run(launcher, env=child_env, timeout=15, check=True)
    refused_line = wait_marker(marker, "REFUSED", timeout=20)
    assert refused_line, "orphaned helper did not refuse to run (or never reparented to pid 1)"
|
||||
|
||||
|
||||
def test_15_deadline_alarm_fires_teardown_and_releases(lock_dir, pool):
    """Case 15 (self-deadline alarm): a guarded helper given a 2s deadline must tear down.

    Expected outcome: the teardown funnel (the ``finally:`` path) runs, the exit status is
    non-zero — specifically 128 + SIGALRM — and the lock ends up free.
    """
    helper, out_path = pool.spawn("guarded", DOMAIN, "2")
    assert wait_marker(out_path, "READY")

    exit_code = helper.wait(timeout=20)
    expected_code = 128 + signal.SIGALRM
    assert exit_code != 0, f"deadline exit must be non-zero (got {exit_code})"
    assert exit_code == expected_code, f"expected 142 (128+SIGALRM), got {exit_code}"

    assert wait_marker(out_path, "TEARDOWN", timeout=5), "teardown funnel did not run on deadline"
    assert wait_lock_state(DOMAIN, "free") == "free"
|
||||
|
||||
|
||||
def test_16_sigterm_runs_teardown_funnel_and_releases(lock_dir, pool):
    """Case 16 (SIGTERM, the drone cancel path): a TERM'd guarded helper must tear down.

    Expected outcome: the ``finally:`` teardown funnel runs, the exit status is non-zero —
    specifically 128 + SIGTERM — and the lock ends up free.
    """
    # A 3600s deadline keeps the alarm out of the picture; only SIGTERM should end the helper.
    helper, out_path = pool.spawn("guarded", DOMAIN, "3600")
    assert wait_marker(out_path, "READY")

    helper.send_signal(signal.SIGTERM)
    exit_code = helper.wait(timeout=20)
    expected_code = 128 + signal.SIGTERM
    assert exit_code != 0, f"SIGTERM exit must be non-zero (got {exit_code})"
    assert exit_code == expected_code, f"expected 143 (128+SIGTERM), got {exit_code}"

    assert wait_marker(out_path, "TEARDOWN", timeout=5), "teardown funnel did not run on SIGTERM"
    assert wait_lock_state(DOMAIN, "free") == "free"
|
||||
85
tests/concurrency/test_locks.py
Normal file
85
tests/concurrency/test_locks.py
Normal file
@ -0,0 +1,85 @@
|
||||
"""Lock fundamentals (concurrency-restructure plan, cases 1-4). Real kernel flocks held by real
|
||||
subprocesses — nothing mocked."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import fcntl
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
|
||||
sys.path.insert(0, os.path.dirname(__file__))
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "runner"))
|
||||
from concutil import ( # noqa: E402
|
||||
DOMAIN,
|
||||
lock_state,
|
||||
wait_lock_state,
|
||||
wait_marker,
|
||||
)
|
||||
from harness import lifecycle # noqa: E402
|
||||
|
||||
|
||||
def test_1_sigkill_releases_lock(lock_dir, pool):
    """Case 1: once the holder is SIGKILL'd the lock becomes acquirable again.

    This is the kernel auto-release property — exactly what the old pidfile registry
    approximated with /proc checks.
    """
    holder, out_path = pool.spawn("hold", DOMAIN)
    assert wait_marker(out_path, "ACQUIRED"), "holder never acquired"
    assert lock_state(DOMAIN) == "held"

    # Kill the holder outright; no release code of its own gets a chance to run.
    holder.kill()
    holder.wait(timeout=10)

    assert wait_lock_state(DOMAIN, "free") == "free"
|
||||
|
||||
|
||||
def test_2_nb_probe_held_vs_unheld(lock_dir, pool):
    """Case 2: a LOCK_NB probe raises BlockingIOError while held and succeeds once unheld."""
    holder, out_path = pool.spawn("hold", DOMAIN)
    assert wait_marker(out_path, "ACQUIRED")

    lock_path = lifecycle._app_lock_path(DOMAIN)  # noqa: SLF001
    probe_flags = fcntl.LOCK_EX | fcntl.LOCK_NB

    # Held phase: the non-blocking exclusive probe has to be rejected.
    with open(lock_path, "a") as probe:
        try:
            fcntl.flock(probe, probe_flags)
        except BlockingIOError:
            pass
        else:
            raise AssertionError("LOCK_NB succeeded against a held lock")

    holder.kill()
    holder.wait(timeout=10)
    assert wait_lock_state(DOMAIN, "free") == "free"

    # Unheld phase: the very same probe must now go through without raising.
    with open(lock_path, "a") as probe:
        fcntl.flock(probe, probe_flags)
|
||||
|
||||
|
||||
def test_3_lock_fd_not_inherited_by_children(lock_dir, pool):
    """Case 3 (PEP 446): the lock is released on holder death even though its child lives on.

    The holder spawns a subprocess child and is then killed. The child must survive, and the
    lock must STILL be released, because the child never inherited the lock fd. That is what
    keeps 'held lock == live HARNESS owner' sound even though runs spawn abra/docker/pytest
    children.
    """
    holder, out_path = pool.spawn("hold-with-child", DOMAIN)
    assert wait_marker(out_path, "ACQUIRED")

    child_marker = wait_marker(out_path, "CHILD")
    assert child_marker, "holder never reported its child pid"
    # The second whitespace-separated field of the CHILD line is the pid.
    survivor_pid = int(child_marker.split()[1])
    pool.track_pid(survivor_pid)

    holder.kill()
    holder.wait(timeout=10)

    assert os.path.exists(f"/proc/{survivor_pid}"), "child should outlive the holder"
    final_state = wait_lock_state(DOMAIN, "free")
    assert (
        final_state == "free"
    ), "lock must release on holder death even with a live child (PEP 446 non-inheritable fd)"
|
||||
|
||||
|
||||
def test_4_second_acquire_blocks_until_first_exits(lock_dir, pool):
    """Case 4: a second acquire on the same domain waits until the first holder has exited.

    This is the double-!testme serialisation property.
    """
    first, first_out = pool.spawn("hold", DOMAIN)
    assert wait_marker(first_out, "ACQUIRED")

    second, second_out = pool.spawn("hold", DOMAIN)
    # Give the second helper ample time to (wrongly) acquire while the first still holds.
    time.sleep(1.5)
    premature = wait_marker(second_out, "ACQUIRED", timeout=0.1)
    assert premature is None, "second acquire did not block"

    kill_time = time.time()
    first.kill()
    first.wait(timeout=10)

    acquired_line = wait_marker(second_out, "ACQUIRED", timeout=15)
    assert acquired_line, "second acquire never completed after first holder exited"
    # The ACQUIRED line's second field is the acquisition timestamp; allow 50ms of slack.
    second_acquired_at = float(acquired_line.split()[1])
    assert second_acquired_at >= kill_time - 0.05, "second holder acquired before the first exited"
    assert lock_state(DOMAIN) == "held"
|
||||
Reference in New Issue
Block a user