Compare commits

...

9 Commits

Author SHA1 Message Date
08bbb60343 fix(watchdog): stop phase-machine handoff/gate-token work after SEQUENCE-COMPLETE
Gate-token tracking + handoff pings kept running on the completed phase machine,
churning 0-token gate records every tick. Gate them on `not seq_done`.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01UWTdUq2bsic7JZGqJp3nD6
2026-06-24 15:38:02 +00:00
164df87e98 fix(wake): persistent-agent wakes survive SEQUENCE-COMPLETE
The watchdog gated ALL scheduled wakes behind `if not seq_done`, so once a phase
sequence completed, even a persistent operator-facing supervisor stopped waking.
That breaks follow-on supervision (e.g. a second build started after the first
sequence finishes). Now: loop-tied wakes (on-demand auditor etc.) still quiet after
completion, but persistent agents keep waking — their hourly supervision survives.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01UWTdUq2bsic7JZGqJp3nD6
2026-06-24 15:36:02 +00:00
44bb1da1be feat(watchdog): DONE-nudge for ceremony-lag (built-but-unmarked phase) before kill+reboot
Recurring stall: a phase is substantively complete (all DoD gates PASS from both
adversaries, no veto) but the builder never writes the done marker, so auto-advance
cannot fire and the loops idle. A blunt stall kill+reboot does not fix it (the
re-kickoffed agent just re-idles).

On a stall, if the agent is a loop agent and the current phase is NOT marked done,
send a one-time DONE-nudge (ping) telling it to write the done marker IF the DoD is
met (both adversaries PASS, no veto), giving a fresh idle window; only escalate to
the kill+reboot if it stays stalled. One nudge per phase (cleared on phase advance).
Gated by [loop].done_nudge (default true); message uses the configured done_marker
and the phase status file.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01UWTdUq2bsic7JZGqJp3nD6
2026-06-24 02:40:14 +00:00
e6b53513d4 feat(wake): re-run one-shot task agents on their wake interval (autonomous cadence)
wake_agent only re-prompted a live persistent/loop session and returned False for a
dead one, so a "task" agent (one-shot, exits after its run) could not be re-run on a
schedule — its wake never fired. Now, for kind=="task", a wake kills+restarts the
task for a clean re-run (skipping only while its previous run is still active). This
makes scheduled work like a coverage audit recur autonomously, no operator trigger.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01UWTdUq2bsic7JZGqJp3nD6
2026-06-23 05:17:07 +00:00
65ceeb3a7b fix(watchdog): seed stall clock from pane's real last-activity, not watchdog start
Stall detection tracked idle time in an in-memory _idle_since map seeded to now()
on first observation, so a freshly-(re)started watchdog reset every agent's stall
clock and had to wait a full stall_idle before it could nudge — an agent idle for
an hour looked freshly-idle after a watchdog restart. Seed  from the tmux
window's last-activity timestamp (#{window_activity}) instead, so idle duration
reflects the agent's real last activity regardless of when the watchdog started.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01UWTdUq2bsic7JZGqJp3nD6
2026-06-23 04:40:34 +00:00
57082acc05 fix(tokens): restore token_phase_flush re-baseline; drop stray block from gate_token_check
The per-gate functions were inserted immediately after token_phase_flush's log
line, which split the function: its trailing re-baseline block (the
'if next_phase_id is not None: ...' that re-seeds the per-phase baseline for the
next phase, or finalizes when None) was orphaned onto the end of gate_token_check,
where next_phase_id is undefined. The watchdog therefore crashed with NameError on
the first tick of every start. Move that block back into token_phase_flush (where
next_phase_id/cur/sf are in scope) and end gate_token_check at its log line.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01UWTdUq2bsic7JZGqJp3nD6
2026-06-22 07:27:50 +00:00
188c12ad9e feat: configurable per-gate token logging + responsive phase auto-advance
Two watchdog/metrics improvements to the loop machine:

1) Token-logging granularity is configurable via [watchdog].token_granularity:
   'gate' (default) or 'phase'. In 'gate' mode, tokens are attributed to each
   claimed gate -- any 'claim(<label>)' commit on the work repo's origin/main
   (e.g. claim(D1-D5), claim(feat:multi-file); a leading 'feat:' is stripped) --
   in addition to the per-phase rollup, appended to token-log.jsonl tagged
   phase_id='<phase>:<label>'. A change in the most-recently-claimed label is the
   boundary; the in-flight gate is also flushed when the phase ends. 'phase' mode
   keeps the original per-phase-only behaviour.

2) Phase auto-advance is now evaluated on EVERY signal tick instead of only the
   heavy tick, so a completed phase advances within signal_interval of its
   '## DONE' landing rather than idling up to heavy_interval. Healing stays on the
   heavy cadence.

Note: gate-boundary detection assumes the loop's 'claim(<label>)' commit convention.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01UWTdUq2bsic7JZGqJp3nD6
2026-06-22 05:15:08 +00:00
98d198baa9 feat(handoff): claim_pings/review_pings accept a list — ping every reviewer
Multi-reviewer setups (e.g. a correctness + a readability adversary) can now have
the watchdog ping ALL reviewers on a claim, each in its own session with its own
submit key. A bare string still works (single agent). _ping_agents() helper.
2026-06-22 00:24:41 +00:00
781db071dd docs(readme): add Examples section (Builder/Adversary variants, snakepit) + benchmark note 2026-06-16 02:35:40 +00:00
2 changed files with 213 additions and 21 deletions

View File

@ -16,6 +16,7 @@ agents.py the driver + watchdog (pure Python stdlib; needs python >=
agent-log.py render claude JSONL transcripts into clean, greppable logs
agents.example.toml a self-contained 2-agent example project
prompts/ generic role + kickoff templates (builder / adversary / kickoff)
examples/ runnable example projects — the Builder/Adversary variant family, snakepit, …
smoke.sh bring the example up + tear it down in an isolated sandbox, then clean up
tests/ the test suite — unit tests + isolated live backend smokes + a runner
flake.nix/.lock a Nix devShell with the runtime deps (python311, tmux, git)
@ -49,6 +50,42 @@ python3 agents.py --config agents.toml phase show # where the loop phase mach
---
## Examples
`examples/` holds runnable example projects — copy one, point `agents.py` at its `agents.toml`, and
go. The headline set is a family of **Builder/Adversary** variants that build the *same* task but each
differ in one dimension — useful both as templates and as a study of the pattern:
- **`builder-adversary`** — the canonical loop pair: a Builder that builds and an Adversary that
cold-verifies every claim, coordinating only through git (`claim(`/`review(` commits + the watchdog
handoff). **Start here.**
- **`builder-adversary-min`** — the same pattern with the prompts compressed to minimal tokens.
- **`builder-adversary-stateless`** — `builder-adversary` + **context hygiene** (compact at each
checkpoint, read diffs not trees, lean loads) to minimise carried/reloaded context.
- **`builder-adversary-lean`** — context hygiene + **per-gate** review (one claim/verdict per gate).
- **`builder-adversary-deferred`** — the Adversary verifies **once**, after the whole build, in a
final comprehensive `review` phase (vs per-phase / per-gate).
- **`builder-solo`** — a single Builder that self-certifies, with **no Adversary** (the control).
- **`snakepit`** — a different topology entirely: a pool of identical worker "snakes" pulling tasks
from a shared filesystem queue, plus cleanup specialists. (`examples/IDEAS.md` sketches more.)
Each example has its own `README.md`. Run one by hand:
```bash
cd examples/builder-adversary
python3 ../../agents.py status --config agents.toml # read-only
python3 ../../agents.py up --config agents.toml # needs `claude` on PATH
```
**Benchmark.** The separate
[`agent-orchestrator-benchmark`](https://git.autonomic.zone/recipe-maintainers/agent-orchestrator-benchmark)
repo runs these Builder/Adversary variants head-to-head (N=5, real `agents.py up` runs) to measure
what drives token cost. Short version: an independent adversary costs **~4.7×** a solo builder, but
the review *cadence* (per-gate / per-phase / deferred) is **nearly token-neutral**, and **context
hygiene** is the one clean **~22%** win. See that repo's `FINDINGS.md`.
---
## The config: `agents.toml`
Five section types: `[watchdog]`, `[backend.<name>]`, `[defaults]`, `[[agent]]` / `[[service]]`,
@ -230,6 +267,9 @@ phases = [
subject matches `claim_pattern` / `review_pattern`, and watches the two `inboxes` files. When a
claim lands it pings the `claim_pings` agent; a review pings `review_pings`; an inbox change
pings the relevant side. This is how the Builder and Adversary coordinate purely through git.
`claim_pings` / `review_pings` may be a single agent name **or a list** — e.g.
`claim_pings = ["correctness-adversary", "readability-adversary"]` pings every reviewer on a claim
(each in its own session), for multi-reviewer setups.
---

194
agents.py
View File

@ -466,6 +466,30 @@ def limit_tick(cfg, agent, pane):
# ── stall detection ──────────────────────────────────────────────────────────────
_idle_since: dict[str, float] = {}
_done_nudged: dict[str, bool] = {} # per-session: sent the one-time "write the done marker" nudge this phase
def _done_nudge_msg(cfg, ph):
"""The DONE-nudge: prompts a stalled loop agent to finalize a built-but-unmarked phase."""
dm = cfg["loop"].get("done_marker", "## DONE")
pid = ph.get("id", "")
status = ph.get("status", f"STATUS-{pid}.md")
sub = _state_subdir(cfg)
return (f"watchdog nudge: you've stalled in phase '{pid}', which is NOT yet marked '{dm}'. Resume now "
f"— pull any pending review/inbox and continue. If (and ONLY if) every DoD item has a fresh "
f"PASS from BOTH adversaries with no standing veto, write '{dm}' to {sub}/{status} and push, "
f"so the phase settles and auto-advances. Do not stay idle.")
def _pane_last_active(session):
"""Unix timestamp of the tmux window's last activity (last output change), or None.
Seeds idle-duration from the agent's REAL last activity rather than `now`, so stalls are
detected regardless of when the watchdog process started — restarting the watchdog no longer
resets every agent's stall clock."""
r = subprocess.run(f"tmux display-message -p -t {session!r} '#{{window_activity}}'",
shell=True, capture_output=True, text=True)
try:
return float(r.stdout.strip())
except (ValueError, AttributeError):
return None
def _last_nonempty_line(text):
for line in reversed(text.splitlines()):
@ -502,7 +526,9 @@ def stall_check_one(cfg, agent):
if pane_active(cfg, agent, pane):
_idle_since[session] = 0.0
return
since = _idle_since.get(session) or now
# Seed from the pane's real last-activity (not `now`), so a watchdog that just (re)started still
# sees an already-idle pane as idle-for-its-true-duration instead of resetting the clock.
since = _idle_since.get(session) or _pane_last_active(session) or now
_idle_since[session] = since
idle = now - since
grace = int(cfg["watchdog"].get("stall_grace", 180))
@ -516,6 +542,17 @@ def stall_check_one(cfg, agent):
if idle < stall_idle:
return
reason = f"idle {int(idle)}s with no WAITING-UNTIL marker"
# Ceremony-lag guard: a loop agent idling in a phase that's built but NOT marked done won't let the
# phase advance (the recurring "all gates PASS but no ## DONE written" stall). Nudge it ONCE per phase
# to finalize (write the done marker if the DoD is met) before falling back to the blunt kill+reboot.
if (cfg["loop"].get("done_nudge", True) and agent.get("kind") == "loop" and phases(cfg)
and not phase_done(cfg, cur_phase(cfg).get("status", "")) and not _done_nudged.get(session)):
log(f"stall: {agent['name']} ({session}) {reason} — DONE-nudge (phase built but not marked done)")
ping_session(session, _done_nudge_msg(cfg, cur_phase(cfg)),
submit_key=backend_of(cfg, agent).get("submit_key", "Enter"))
_done_nudged[session] = True
_idle_since[session] = now # fresh idle window to act on the nudge before reboot escalates
return
log(f"stall: {agent['name']} ({session}) {reason} — kill + reboot")
start_agent(cfg, agent, force=True)
_idle_since[session] = 0.0
@ -564,6 +601,15 @@ def wake_agent(cfg, agent):
if not wake:
return True
session = agent["session"]
# A one-shot `task` is "woken" by RE-RUNNING it fresh — it has no persistent REPL to re-prompt — so
# scheduled work (e.g. a coverage audit) recurs autonomously on its interval, no operator needed.
# Skip only while its previous run is still going; otherwise kill + restart for a clean re-run.
if agent.get("kind") == "task":
if session_alive(session) and pane_active(cfg, agent, capture_pane(session, 25)):
return False
log(f"wake: re-running task {agent['name']} ({session})")
start_agent(cfg, agent, force=True)
return True
if not session_alive(session):
return False
backend = backend_of(cfg, agent)
@ -599,15 +645,22 @@ def _show_pushed(cfg, repo, path):
return r.stdout
return ""
def _ping_agents(cfg, value, default, msg):
"""Ping one or more agents. `value` is an agent name, a LIST of names, or falsy (→ default).
Each target is pinged in its own session with its own backend's submit key — so a handoff can
notify multiple reviewers (e.g. claim_pings = ["correctness-adversary", "readability-adversary"])."""
names = value if isinstance(value, list) else [value or default]
for name in names:
agent = cfg["agents"].get(name)
session = agent["session"] if agent and agent.get("session") else (cfg["session_prefix"] + str(name))
submit = backend_of(cfg, agent).get("submit_key", "Enter") if agent else "Enter"
ping_session(session, msg, submit_key=submit)
def handoff_check(cfg):
h = cfg["loop"].get("handoff")
if not h:
return
repo = handoff_repo(cfg)
sub = lambda name: cfg["agents"].get(name, {}).get("session", cfg["session_prefix"] + name)
builder_name = h.get("review_pings", "builder")
submit = (backend_of(cfg, cfg["agents"][builder_name]).get("submit_key", "Enter")
if builder_name in cfg["agents"] else "Enter")
claim_pat = h.get("claim_pattern", "^claim")
review_pat = h.get("review_pattern", "^review")
_git(repo, "fetch -q origin")
@ -618,15 +671,15 @@ def handoff_check(cfg):
elif head != _hand["sha"]:
subjects = _git(repo, f"log --format=%s {_hand['sha']}..origin/main").stdout
if re.search(claim_pat, subjects, re.M | re.I):
log("handoff: claim commit → pinging reviewer")
ping_session(sub(h.get("claim_pings", "adversary")),
log("handoff: claim commit → pinging reviewer(s)")
_ping_agents(cfg, h.get("claim_pings", "adversary"), "adversary",
"watchdog ping: the other loop pushed a gate CLAIM commit. "
"Pull and verify the claimed gate now.", submit_key=submit)
"Pull and verify the claimed gate now.")
if re.search(review_pat, subjects, re.M | re.I):
log("handoff: review commit → pinging builder")
ping_session(sub(h.get("review_pings", "builder")),
_ping_agents(cfg, h.get("review_pings", "builder"), "builder",
"watchdog ping: the other loop pushed a verdict/finding commit. "
"Pull the review file and act.", submit_key=submit)
"Pull the review file and act.")
_hand["sha"] = head
inboxes = h.get("inboxes", [])
md5 = lambda s: hashlib.md5(s.encode()).hexdigest()
@ -642,9 +695,9 @@ def handoff_check(cfg):
hh = md5(content)
if hh != _hand[key]:
log(f"handoff: {fname} changed → pinging {target}")
ping_session(sub(target),
_ping_agents(cfg, target, target,
f"watchdog ping: the other loop pushed {sub_dir}/{fname} — pull, read it, "
f"act, then delete the file (commit + push) to mark it consumed.", submit_key=submit)
f"act, then delete the file (commit + push) to mark it consumed.")
_hand[key] = hh
else:
_hand[key] = ""
@ -757,6 +810,93 @@ def token_phase_flush(cfg, next_phase_id):
else:
sf.unlink(missing_ok=True)
# ── token logging granularity: per phase, or also per GATE (log_tokens) ────────────
# Phases are tracked per phase (token_phase_begin/flush). With token_granularity="gate" (the default)
# tokens are ALSO attributed to each gate. A "gate" is a claimed unit — any `claim(<label>)` commit on
# the work repo's origin/main (e.g. claim(D1-D5), claim(feat:multi-file)); a leading "feat:" is
# stripped for readability. A change in the most-recently-claimed label is a boundary; on each
# boundary the previous gate's per-agent token delta + duration is appended to token-log.jsonl tagged
# phase_id="<phase>:<label>", so `agents.py tokens` lists it as its own row. The per-phase rollup
# record is written either way; "phase" granularity logs only that.
_gate_claim_re = re.compile(r"^claim\(\s*([^)]+?)\s*\)", re.I)
def token_granularity(cfg):
"""'gate' (default; per claimed gate, plus the per-phase rollup) or 'phase' (per phase only)."""
g = (cfg.get("watchdog", {}).get("token_granularity")
or cfg.get("loop", {}).get("token_granularity") or "gate")
return g if g in ("gate", "phase") else "gate"
def _token_gate_state_path(cfg):
return Path(cfg["state_dir"]) / "token-gate.json"
def _latest_claimed_gate(cfg):
"""Label of the most-recent `claim(<label>)` subject on the work repo's origin/main, or None.
A leading 'feat:' is stripped so feature gates read as their bare name."""
r = _git(handoff_repo(cfg), "log -1 --format=%s --grep 'claim(' origin/main")
m = _gate_claim_re.match((r.stdout or "").strip())
if not m:
return None
label = m.group(1).strip()
return label[5:].strip() if label.lower().startswith("feat:") else label
def _write_token_delta(cfg, phase_id, st):
"""Append one per-agent token delta record (vs the baseline in st) to token-log.jsonl."""
cur = _token_cumulative(cfg)
base = st.get("baseline", {})
started = st.get("started")
try:
dur = round((datetime.now() - datetime.fromisoformat(started)).total_seconds(), 1)
except Exception:
dur = None
per_agent = {n: _tok_delta(cur.get(n, {}), base.get(n, {})) for n in cur}
total = {k: sum(per_agent[n][k] for n in per_agent) for k in _TOKEN_KEYS}
rec = {"phase_id": phase_id, "started": started,
"ended": datetime.now().isoformat(timespec="seconds"), "duration_s": dur,
"agents": per_agent, "total": total}
with _token_log_path(cfg).open("a") as fh:
fh.write(json.dumps(rec) + "\n")
return rec, per_agent, total
def gate_token_flush(cfg):
"""Close out the currently-tracked gate (if any): write its token delta, then drop the state."""
sf = _token_gate_state_path(cfg)
try:
st = json.loads(sf.read_text())
except Exception:
return
gate = st.get("gate")
if not gate:
return
phase_id = f"{st['phase']}:{gate}" if st.get("phase") else gate
rec, per_agent, total = _write_token_delta(cfg, phase_id, st)
parts = ", ".join(f"{n}={per_agent[n]['total']:,}" for n in per_agent)
log(f"[log_tokens] gate {phase_id}: {total['total']:,} tok in {rec['duration_s']}s ({parts})")
try:
sf.unlink()
except Exception:
pass
def gate_token_check(cfg):
"""When token_granularity=='gate', detect gate boundaries and flush per-gate token deltas."""
if not log_tokens_enabled(cfg) or token_granularity(cfg) != "gate":
return
current = _latest_claimed_gate(cfg)
if not current:
return
sf = _token_gate_state_path(cfg)
try:
tracked = json.loads(sf.read_text()).get("gate")
except Exception:
tracked = None
if tracked == current:
return
if tracked:
gate_token_flush(cfg) # close out the previous gate before starting the next
sf.write_text(json.dumps({"gate": current, "phase": cur_phase(cfg).get("id"),
"started": datetime.now().isoformat(timespec="seconds"),
"baseline": _token_cumulative(cfg)}))
log(f"[log_tokens] tracking gate: {current}")
def phase_advance_check(cfg):
"""On heavy tick: if the current phase is DONE, advance (or finish the sequence).
@ -772,6 +912,8 @@ def phase_advance_check(cfg):
ph = ps[idx]
if not phase_done(cfg, ph["status"]):
return False
if log_tokens_enabled(cfg) and token_granularity(cfg) == "gate":
gate_token_flush(cfg) # close out the last in-flight gate before leaving the phase
nxt = idx + 1
if nxt < len(ps):
log(f"PHASE {ph['id']} DONE — auto-transitioning to {ps[nxt]['id']}")
@ -781,6 +923,7 @@ def phase_advance_check(cfg):
if marker.exists():
marker.unlink() # resuming into a (freshly-appended) phase — clear stale completion
handoff_reset()
_done_nudged.clear() # fresh DONE-nudge budget for the new phase
start_loops(cfg)
return True
# last phase is DONE → sequence complete
@ -819,14 +962,15 @@ def watchdog_loop(cfg_path):
wake_elapsed = {a["name"]: 0 for a in cfg["agents"].values() if a.get("wake")}
if log_tokens_enabled(cfg):
token_phase_begin(cfg, cur_phase(cfg).get("id"))
log("[log_tokens] enabled — per-phase token + time logging to token-log.jsonl")
log(f"[log_tokens] enabled (granularity={token_granularity(cfg)}) — token-log.jsonl")
while True:
cfg = load_config(cfg_path) # re-read every tick: config is authoritative, no env drift
has_loops = bool(loop_agents(cfg))
seq_done = (Path(cfg["log_dir"]) / "SEQUENCE-COMPLETE").exists()
if has_loops:
if has_loops and not seq_done:
handoff_check(cfg)
gate_token_check(cfg)
for a in watched(cfg):
if a["watch"] == "heal+stall":
stall_check_one(cfg, a)
@ -834,16 +978,24 @@ def watchdog_loop(cfg_path):
if session_alive(a["session"]):
limit_tick(cfg, a, capture_pane(a["session"], 40))
if not seq_done:
for name, el in list(wake_elapsed.items()):
interval = int(cfg["agents"][name]["wake"].get("interval", 3600))
if el >= interval:
if wake_agent(cfg, cfg["agents"][name]):
wake_elapsed[name] = 0
for name, el in list(wake_elapsed.items()):
agent = cfg["agents"][name]
# After the phase sequence completes, quiet the loop-tied wakes (e.g. the on-demand
# auditor) — but a PERSISTENT agent (the operator-facing supervisor) keeps waking, so its
# hourly supervision survives SEQUENCE-COMPLETE and can drive follow-on work (a second build).
if seq_done and agent.get("kind") != "persistent":
continue
interval = int(agent["wake"].get("interval", 3600))
if el >= interval:
if wake_agent(cfg, agent):
wake_elapsed[name] = 0
# Auto-advance is checked EVERY tick (not just the heavy tick) so a completed phase advances
# within signal_interval of its `## DONE` landing, instead of idling up to heavy_interval.
advanced = phase_advance_check(cfg) if has_loops else False
if elapsed >= heavy:
elapsed = 0
advanced = phase_advance_check(cfg) if has_loops else False
if not advanced:
for a in watched(cfg):
if seq_done and a["kind"] == "loop":