"""Lock fundamentals (concurrency-restructure plan, cases 1-4). Real kernel flocks held by real subprocesses — nothing mocked.""" from __future__ import annotations import fcntl import os import sys import time sys.path.insert(0, os.path.dirname(__file__)) sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "runner")) from concutil import ( # noqa: E402 DOMAIN, lock_state, wait_lock_state, wait_marker, ) from harness import lifecycle # noqa: E402 def test_1_sigkill_releases_lock(lock_dir, pool): """Case 1: acquire -> holder SIGKILL'd -> lock immediately acquirable (kernel auto-release). The exact property the old pidfile registry approximated with /proc checks.""" p, out = pool.spawn("hold", DOMAIN) assert wait_marker(out, "ACQUIRED"), "holder never acquired" assert lock_state(DOMAIN) == "held" p.kill() p.wait(timeout=10) assert wait_lock_state(DOMAIN, "free") == "free" def test_2_nb_probe_held_vs_unheld(lock_dir, pool): """Case 2: LOCK_NB probe raises BlockingIOError against a held lock; succeeds when unheld.""" p, out = pool.spawn("hold", DOMAIN) assert wait_marker(out, "ACQUIRED") path = lifecycle._app_lock_path(DOMAIN) # noqa: SLF001 with open(path, "a") as f: try: fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB) raise AssertionError("LOCK_NB succeeded against a held lock") except BlockingIOError: pass p.kill() p.wait(timeout=10) assert wait_lock_state(DOMAIN, "free") == "free" with open(path, "a") as f: fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB) # must not raise now def test_3_lock_fd_not_inherited_by_children(lock_dir, pool): """Case 3 (PEP 446): the holder spawns a subprocess child, the holder dies, the child lives — and the lock is STILL released (the child never inherited the lock fd). This is what makes 'held lock == live HARNESS owner' sound even though runs spawn abra/docker/pytest children.""" p, out = pool.spawn("hold-with-child", DOMAIN) assert wait_marker(out, "ACQUIRED") child_line = wait_marker(out, "CHILD") assert child_line, "holder never reported its child pid" child_pid = int(child_line.split()[1]) pool.track_pid(child_pid) p.kill() p.wait(timeout=10) assert os.path.exists(f"/proc/{child_pid}"), "child should outlive the holder" assert ( wait_lock_state(DOMAIN, "free") == "free" ), "lock must release on holder death even with a live child (PEP 446 non-inheritable fd)" def test_4_second_acquire_blocks_until_first_exits(lock_dir, pool): """Case 4: a second same-domain acquire blocks until the first holder exits — the double-!testme serialisation property.""" p1, out1 = pool.spawn("hold", DOMAIN) assert wait_marker(out1, "ACQUIRED") p2, out2 = pool.spawn("hold", DOMAIN) # p2 must NOT acquire while p1 holds. time.sleep(1.5) assert wait_marker(out2, "ACQUIRED", timeout=0.1) is None, "second acquire did not block" t_kill = time.time() p1.kill() p1.wait(timeout=10) line = wait_marker(out2, "ACQUIRED", timeout=15) assert line, "second acquire never completed after first holder exited" acquired_ts = float(line.split()[1]) assert acquired_ts >= t_kill - 0.05, "second holder acquired before the first exited" assert lock_state(DOMAIN) == "held"