"""Shared utilities for the real-kernel concurrency suite (imported by the test modules; the fixtures in conftest.py wrap these). No flock mocking anywhere — probes use real LOCK_NB.""" from __future__ import annotations import contextlib import fcntl import os import signal import subprocess import sys import time sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "runner")) from harness import lifecycle # noqa: E402 HELPERS = os.path.join(os.path.dirname(__file__), "helpers.py") DOMAIN = "test-abc123.ci.commoninternet.net" # matches RUN_APP_RE class HelperPool: """Spawns helpers.py subprocesses and GUARANTEES their cleanup (incl. recorded grandchild pids from `hold-with-child`/`wrapper` markers) — no leaked children in the test VM.""" def __init__(self, out_dir: str): self.out_dir = out_dir self.procs: list[subprocess.Popen] = [] self.extra_pids: list[int] = [] self._n = 0 def spawn(self, *args: str, env_extra: dict | None = None) -> tuple[subprocess.Popen, str]: """Start `helpers.py `; returns (proc, marker_file).""" self._n += 1 out = os.path.join(self.out_dir, f"helper-{self._n}.out") env = dict(os.environ, CCCI_HELPER_OUT=out, **(env_extra or {})) p = subprocess.Popen( # noqa: S603 [sys.executable, HELPERS, *args], env=env, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT, ) self.procs.append(p) return p, out def track_pid(self, pid: int) -> None: self.extra_pids.append(pid) def cleanup(self) -> None: for p in self.procs: if p.poll() is None: p.kill() with contextlib.suppress(subprocess.TimeoutExpired): p.wait(timeout=10) for pid in self.extra_pids: with contextlib.suppress(OSError): os.kill(pid, signal.SIGKILL) def wait_marker(out: str, token: str, timeout: float = 15.0) -> str | None: """Poll a helper's marker file for a line containing `token`; returns the line or None.""" deadline = time.time() + timeout while time.time() < deadline: try: with open(out) as f: for line in f: if token in line: return line.strip() except OSError: pass time.sleep(0.1) return None def lock_state(domain: str) -> str: """'held' | 'free' | 'absent' for the domain's lockfile, probed with a REAL LOCK_NB.""" path = lifecycle._app_lock_path(domain) # noqa: SLF001 if not os.path.exists(path): return "absent" with open(path, "a") as f: try: fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB) return "free" except BlockingIOError: return "held" def wait_lock_state(domain: str, want: str, timeout: float = 10.0) -> str: """Poll until lock_state(domain) == want (kernel release on process death is fast, but give the scheduler room). Returns the final observed state.""" deadline = time.time() + timeout state = lock_state(domain) while state != want and time.time() < deadline: time.sleep(0.1) state = lock_state(domain) return state def pid_alive(pid: int) -> bool: return os.path.exists(f"/proc/{pid}") def wait_pid_gone(pid: int, timeout: float = 15.0) -> bool: deadline = time.time() + timeout while time.time() < deadline: if not pid_alive(pid): return True time.sleep(0.1) return False