test(concurrency): real-kernel suite for the restructured model — 20 tests, 19 plan cases
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
tests/concurrency/ — NOT in the default `pytest tests/unit` gate; run explicitly with `pytest tests/concurrency -q`. flock/prctl/alarm are never mocked: helper subprocesses (helpers.py) hold real locks and install the real lifetime guards; locks live in a per-test tmp dir via CCCI_APP_LOCK_DIR; every helper (and recorded grandchild) is reaped by fixture cleanup. - test_locks.py (cases 1-4): SIGKILL auto-release; LOCK_NB held/unheld semantics; PEP 446 fd-not-inherited (holder's child survives, lock still releases); same-domain second acquire blocks until first holder exits. - test_janitor.py (cases 5-12): orphan reaped once + lockfile unlinked; live holder never reaped + logged; new-run acquire blocks until a slow reap completes (reap-under-probe-lock); two overlapping janitors -> exactly one reaps (flock arbitration); reboot sim (no lockfile) reaps immediately with no age wait; >120min-held lock flagged 'possible leaked run' and NOT stolen; warm/canonical names never probed (no lockfile even created); directory-as-lockfile and missing lock dir degrade to skip+log, never crash. - test_lifetime.py (cases 13-16): PDEATHSIG (wrapper parent SIGKILL'd -> guarded child TERM'd, teardown marker, lock released); already-orphaned helper REFUSES to run (ppid race); 2s deadline alarm -> teardown + exit 142 + lock released; SIGTERM -> teardown + exit 143 + lock released. - test_abra_dir.py (cases 17-19 + 18b): per-run dir built + $ABRA_DIR exported before the first abra call (recording stub abra on PATH); two CONCURRENT same-recipe fetch+checkout flows into different ABRA_DIRs -> divergent correct trees, canonical staged clone untouched; .env written through the servers/ symlink lands in the canonical path (env_get/env_set agree); manual runs get pid-suffixed dirs. On cc-ci: pytest tests/concurrency -q -> 20 passed; tests/unit -> 138 passed; lint PASS.
This commit is contained in:
189
tests/concurrency/test_janitor.py
Normal file
189
tests/concurrency/test_janitor.py
Normal file
@ -0,0 +1,189 @@
|
||||
"""Janitor / flock-probe semantics (concurrency-restructure plan, cases 5-12).
|
||||
|
||||
The janitor runs IN-PROCESS with its discovery monkeypatched (candidates injected via a stubbed
|
||||
abra.app_ls + empty docker sweep) and teardown_app stubbed to record calls — but the LOCKS are
|
||||
real kernel flocks, held by real helper subprocesses where a live owner is needed."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
|
||||
sys.path.insert(0, os.path.dirname(__file__))
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "runner"))
|
||||
from concutil import DOMAIN, lock_state, wait_marker # noqa: E402
|
||||
from harness import lifecycle # noqa: E402
|
||||
|
||||
|
||||
def _inject_candidates(monkeypatch, domains):
|
||||
"""Point janitor discovery at exactly `domains`: abra lists them, docker sweep is empty.
|
||||
teardown_app is stubbed to a recorder; returns the calls list."""
|
||||
calls = []
|
||||
monkeypatch.setattr(lifecycle.abra, "app_ls", lambda: [{"appName": d} for d in domains])
|
||||
monkeypatch.setattr(lifecycle, "_docker_names", lambda kind, stack: [])
|
||||
monkeypatch.setattr(lifecycle, "teardown_app", lambda d, verify=True: calls.append(d))
|
||||
return calls
|
||||
|
||||
|
||||
def test_5_orphan_reaped_lockfile_unlinked(lock_dir, pool, monkeypatch):
|
||||
"""Case 5: an orphan (lockfile exists, no holder — its run was SIGKILL'd) is reaped exactly
|
||||
once and its lockfile unlinked."""
|
||||
p, out = pool.spawn("hold", DOMAIN)
|
||||
assert wait_marker(out, "ACQUIRED")
|
||||
p.kill()
|
||||
p.wait(timeout=10)
|
||||
calls = _inject_candidates(monkeypatch, [DOMAIN])
|
||||
lifecycle.janitor()
|
||||
assert calls == [DOMAIN], f"teardown calls: {calls} (expected exactly one)"
|
||||
assert lock_state(DOMAIN) == "absent", "reaped orphan's lockfile must be unlinked"
|
||||
|
||||
|
||||
def test_6_live_run_never_reaped(lock_dir, pool, monkeypatch, capsys):
|
||||
"""Case 6: a held lock (live helper) is never reaped and is logged as live."""
|
||||
p, out = pool.spawn("hold", DOMAIN)
|
||||
assert wait_marker(out, "ACQUIRED")
|
||||
calls = _inject_candidates(monkeypatch, [DOMAIN])
|
||||
lifecycle.janitor()
|
||||
assert calls == []
|
||||
assert "live concurrent run" in capsys.readouterr().out
|
||||
assert lock_state(DOMAIN) == "held"
|
||||
|
||||
|
||||
def test_7_new_run_blocks_until_reap_finishes(lock_dir, pool, monkeypatch):
|
||||
"""Case 7: the janitor reaps WHILE HOLDING the probe lock, so a new run of the same domain
|
||||
blocks in acquire_app_lock until the reap completes — no window where a fresh app coexists
|
||||
with a half-reaped one."""
|
||||
# Make an orphan.
|
||||
p, out = pool.spawn("hold", DOMAIN)
|
||||
assert wait_marker(out, "ACQUIRED")
|
||||
p.kill()
|
||||
p.wait(timeout=10)
|
||||
|
||||
state = {"teardown_end": None, "acquirer_out": None}
|
||||
|
||||
def slow_teardown(domain, verify=True):
|
||||
# While the janitor holds the probe lock mid-reap, a new run starts acquiring.
|
||||
_, aout = pool.spawn("hold", DOMAIN)
|
||||
state["acquirer_out"] = aout
|
||||
time.sleep(2.0)
|
||||
state["teardown_end"] = time.time()
|
||||
|
||||
monkeypatch.setattr(lifecycle.abra, "app_ls", lambda: [{"appName": DOMAIN}])
|
||||
monkeypatch.setattr(lifecycle, "_docker_names", lambda kind, stack: [])
|
||||
monkeypatch.setattr(lifecycle, "teardown_app", slow_teardown)
|
||||
lifecycle.janitor()
|
||||
|
||||
line = wait_marker(state["acquirer_out"], "ACQUIRED", timeout=15)
|
||||
assert line, "new run never acquired after the reap"
|
||||
acquired_ts = float(line.split()[1])
|
||||
assert (
|
||||
acquired_ts >= state["teardown_end"]
|
||||
), f"new run acquired at {acquired_ts} BEFORE the reap finished at {state['teardown_end']}"
|
||||
# The new run must hold a lock the next probe can SEE (fresh inode at the path).
|
||||
assert lock_state(DOMAIN) == "held"
|
||||
|
||||
|
||||
def test_8_two_janitors_exactly_one_reaps(lock_dir, pool, monkeypatch):
|
||||
"""Case 8: two concurrent janitors arbitrate on the probe flock — exactly one reaps (the
|
||||
other sees 'held' and leaves). Teardown is slowed so the runs genuinely overlap."""
|
||||
p, out = pool.spawn("hold", DOMAIN)
|
||||
assert wait_marker(out, "ACQUIRED")
|
||||
p.kill()
|
||||
p.wait(timeout=10)
|
||||
|
||||
calls = []
|
||||
calls_lock = threading.Lock()
|
||||
|
||||
def slow_teardown(domain, verify=True):
|
||||
with calls_lock:
|
||||
calls.append(domain)
|
||||
time.sleep(2.0)
|
||||
|
||||
monkeypatch.setattr(lifecycle.abra, "app_ls", lambda: [{"appName": DOMAIN}])
|
||||
monkeypatch.setattr(lifecycle, "_docker_names", lambda kind, stack: [])
|
||||
monkeypatch.setattr(lifecycle, "teardown_app", slow_teardown)
|
||||
|
||||
barrier = threading.Barrier(2)
|
||||
|
||||
def run_janitor():
|
||||
barrier.wait()
|
||||
lifecycle.janitor()
|
||||
|
||||
t1, t2 = threading.Thread(target=run_janitor), threading.Thread(target=run_janitor)
|
||||
t1.start(), t2.start()
|
||||
t1.join(timeout=30), t2.join(timeout=30)
|
||||
assert calls == [DOMAIN], f"expected exactly one reap, got {calls}"
|
||||
assert lock_state(DOMAIN) == "absent"
|
||||
|
||||
|
||||
def test_9_reboot_lockfile_absent_reaped_immediately(lock_dir, monkeypatch):
|
||||
"""Case 9: post-reboot simulation — the app exists but its lockfile is gone (/run/lock is
|
||||
tmpfs). The probe trivially acquires -> immediate reap, NO age threshold (improvement over
|
||||
the old 2h fallback)."""
|
||||
assert lock_state(DOMAIN) == "absent"
|
||||
calls = _inject_candidates(monkeypatch, [DOMAIN])
|
||||
t0 = time.time()
|
||||
lifecycle.janitor()
|
||||
assert calls == [DOMAIN]
|
||||
assert time.time() - t0 < 5, "reap must be immediate (no age wait)"
|
||||
|
||||
|
||||
def test_10_long_held_lock_flagged_never_stolen(lock_dir, pool, monkeypatch, capsys):
|
||||
"""Case 10: a lock held with mtime older than 120min is flagged as a possible leaked run —
|
||||
and NOT reaped (never steal a held lock)."""
|
||||
p, out = pool.spawn("hold", DOMAIN)
|
||||
assert wait_marker(out, "ACQUIRED")
|
||||
path = lifecycle._app_lock_path(DOMAIN) # noqa: SLF001
|
||||
backdate = time.time() - (130 * 60)
|
||||
os.utime(path, (backdate, backdate))
|
||||
calls = _inject_candidates(monkeypatch, [DOMAIN])
|
||||
lifecycle.janitor()
|
||||
assert calls == []
|
||||
out_text = capsys.readouterr().out
|
||||
assert "possible leaked run" in out_text and "lslocks" in out_text
|
||||
assert lock_state(DOMAIN) == "held"
|
||||
|
||||
|
||||
def test_11_warm_canonical_names_never_probed(lock_dir, monkeypatch):
|
||||
"""Case 11: RUN_APP_RE allowlist — warm/canonical-shaped names never become candidates, so
|
||||
they are never probed (no lockfile is even created for them) and never reaped."""
|
||||
warmish = [
|
||||
"warm-keycloak.ci.commoninternet.net",
|
||||
"keycloak.ci.commoninternet.net",
|
||||
"warm-hedgedoc.ci.commoninternet.net",
|
||||
"drone.ci.commoninternet.net",
|
||||
]
|
||||
calls = []
|
||||
monkeypatch.setattr(lifecycle.abra, "app_ls", lambda: [{"appName": d} for d in warmish])
|
||||
monkeypatch.setattr(
|
||||
lifecycle,
|
||||
"_docker_names",
|
||||
lambda kind, stack: ["warm-keycloak_ci_commoninternet_net_app"]
|
||||
if kind == "service"
|
||||
else [],
|
||||
)
|
||||
monkeypatch.setattr(lifecycle, "teardown_app", lambda d, verify=True: calls.append(d))
|
||||
lifecycle.janitor()
|
||||
assert calls == []
|
||||
lockdir = os.environ["CCCI_APP_LOCK_DIR"]
|
||||
assert [
|
||||
f for f in os.listdir(lockdir) if f.startswith("cc-ci-app-")
|
||||
] == [], "janitor must not create lockfiles for non-run-app names"
|
||||
|
||||
|
||||
def test_12_degrades_safely_on_bad_lockfile_and_missing_dir(lock_dir, monkeypatch, capsys):
|
||||
"""Case 12: a garbled/unopenable lockfile (here: a DIRECTORY at the lockfile path) is skipped
|
||||
with a log line; a missing lock dir doesn't crash the janitor either. Never a crash."""
|
||||
path = lifecycle._app_lock_path(DOMAIN) # noqa: SLF001
|
||||
os.makedirs(path) # open(path, "a") -> IsADirectoryError (an OSError)
|
||||
calls = _inject_candidates(monkeypatch, [DOMAIN])
|
||||
lifecycle.janitor() # must not raise
|
||||
assert calls == []
|
||||
assert "skipping" in capsys.readouterr().out
|
||||
|
||||
os.rmdir(path)
|
||||
monkeypatch.setenv("CCCI_APP_LOCK_DIR", os.path.join(os.environ["CCCI_APP_LOCK_DIR"], "gone"))
|
||||
lifecycle.janitor() # missing dir: probe open fails -> skip; tidy glob -> empty. No crash.
|
||||
assert calls == []
|
||||
Reference in New Issue
Block a user