feat: agent-orchestrator v0.1.0 — generic multi-agent harness

Extracted and generalized from a project-specific agent launch engine. No project
specifics remain in code: paths, the loop kickoff preamble, handoff conventions, and the
on-complete hook are all config/template driven; session_prefix + log_dir are required.

- agents.py: driver + watchdog (data-driven backends via prompt_delivery arg|ping|exec;
  required session_prefix/log_dir; project-rooted path resolution; configurable kickoff
  template, handoff patterns, on_complete task; tmux-safe; selftest + init verbs)
- agent-log.py: config-driven claude transcript renderer
- agents.example.toml: self-contained 2-agent example (dependency-free demo backend)
- prompts/: generic builder/adversary/kickoff templates
- smoke.sh: isolated up+down sandbox proof that cleans up after itself
- flake.nix/.lock: devShell (python311 + tmux + git)
- README.md: schema + verbs + AI-PO usage + nix

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-13 18:39:00 +00:00
commit 289ef07df4
13 changed files with 1894 additions and 0 deletions

6
.gitignore vendored Normal file
View File

@ -0,0 +1,6 @@
# runtime state + logs (never committed)
.ao-state/
*.log
__pycache__/
*.pyc
result

326
README.md Normal file
View File

@ -0,0 +1,326 @@
# agent-orchestrator
A generic, reusable harness for running and supervising a fleet of AI-agent sessions in **tmux**.
One driver script + one declarative config (`agents.toml`) describe every agent — a Builder /
Adversary loop pair, a persistent supervisor, a one-shot task — and a **watchdog** keeps them
alive, healed, paced, and coordinated. The watchdog reads the same config every tick, so there is
never any env-vs-file drift.
Nothing about any particular project lives in this repo. Paths, the loop **kickoff preamble**, the
**handoff conventions**, and the **on-complete** hook are all supplied by the project's config and
prompt files. A project consumes this repo as a pinned **git submodule** (`engine/`) and keeps its
own config, prompts, state, and tmux namespace — total isolation between projects.
```
agents.py the driver + watchdog (pure Python stdlib; needs python >= 3.11 for tomllib)
agent-log.py render claude JSONL transcripts into clean, greppable logs
agents.example.toml a self-contained 2-agent example project
prompts/ generic role + kickoff templates (builder / adversary / kickoff)
smoke.sh bring the example up + tear it down in an isolated sandbox, then clean up
flake.nix/.lock a Nix devShell with the runtime deps (python311, tmux, git)
```
---
## Quick start
```bash
nix develop # python311 + tmux + git on PATH (see "Nix" below)
python3 agents.py selftest # regression-test the activity detector (no config)
python3 agents.py status --config agents.example.toml # one table: every agent + the phase
./smoke.sh # prove up/down works end-to-end, isolated + clean
python3 agents.py init myproject # scaffold a starter agents.toml + prompts/
```
`up` is **use-or-create**: an already-running session is left alone, never double-started.
```bash
python3 agents.py --config agents.toml up # start all enabled agents + services + watchdog
python3 agents.py --config agents.toml up builder # start just one agent (by name)
python3 agents.py --config agents.toml down # stop everything
python3 agents.py --config agents.toml logs builder # tail one session's log
python3 agents.py --config agents.toml phase show # where the loop phase machine is
```
`--config` defaults to `./agents.toml`, falling back to one next to `agents.py`.
---
## The config: `agents.toml`
Five section types: `[watchdog]`, `[backend.<name>]`, `[defaults]`, `[[agent]]` / `[[service]]`,
and `[loop]`. See `agents.example.toml` for a complete, runnable example.
### `[watchdog]` — global supervisor cadence
```toml
[watchdog]
signal_interval = 30 # seconds between light checks (handoff / stall / limit)
heavy_interval = 300 # seconds between heal + phase-advance checks
limit_probe_fallback = 300 # re-probe cadence for a usage-limited agent when reset time is unparsable
limit_reset_slack = 45 # seconds to wait past a parsed reset before probing
stall_grace = 180 # seconds of slack past a WAITING-UNTIL marker before a stall reboot
```
### `[defaults]` — inherited by every agent
```toml
[defaults]
session_prefix = "myproj-" # REQUIRED: tmux namespace for this project. No implicit default.
log_dir = ".ao-state" # REQUIRED: logs + state/. Relative paths resolve against the config dir.
backend = "claude"
model = "claude-sonnet-4-6"
dir = "." # default working dir for agents (relative → project dir)
watch = "heal" # none | heal | heal+stall
project_dir = "." # OPTIONAL: project root for resolving prompts/paths (default: config's dir)
```
`session_prefix` and `log_dir` are **required** — the harness has no project-specific fallbacks.
Every relative path (`log_dir`, an agent's `dir`, `handoff.repo`, prompt/template files) resolves
against `project_dir`, which defaults to the directory holding the config file. When the config
lives in a sandbox but the prompts live elsewhere (as `smoke.sh` does), set `project_dir`
explicitly.
### `[backend.<name>]` — backends declared as data
A backend is fully described by config — no code change to add one. The one field that selects
behavior is `prompt_delivery`:
| `prompt_delivery` | how the kickoff reaches the agent | example |
|---|---|---|
| `"arg"` | passed as a CLI argument (claude-style) | `claude … "$(cat kickoff)"` |
| `"ping"` | typed in after a TUI connects (opencode-style) | attach, wait, send-keys |
| `"exec"` | a plain command; the prompt is written to a file | generic / demo |
```toml
[backend.claude]
bin = "claude"
flags = "--dangerously-skip-permissions"
remote_control = true # add a --remote-control <session> flag
supports_resume = true # honor an agent's resume=true
prompt_delivery = "arg"
process_name = "claude" # the pane process a healthy session runs (backend-mismatch healing)
submit_key = "Enter" # key to submit a typed message
stall_idle = 300 # seconds idle before a heal+stall agent is rebooted
active_re = "esc to interrupt|Running tool|· \\d+" # pane shows the agent is WORKING
limit_re = "usage limit|limit reached|reached your .*limit" # usage/rate-limit banner
fatal_re = "redacted_thinking|cannot be modified" # unrecoverable session state → kill + restart
[backend.opencode] # a TUI backend
bin = "opencode"
attach = "{bin} attach {server} --dir {dir}"
server = "http://127.0.0.1:4096"
prompt_delivery = "ping"
process_name = "opencode"
footer_ui = true # a static footer lingers after a turn → only the bottom = activity
log_grace = 180 # within this many seconds of a log write, treat as active
connect_delay = 12 # seconds to wait for the TUI before typing
submit_key = "C-m"
model_env = true # pass the model via OPENCODE_CONFIG_CONTENT
preamble = "set -a; . ./.env; set +a" # shell run before launch (e.g. load creds)
active_re = "esc interrupt|thinking|running tool|preparing patch"
limit_re = "usage limit|limit reached"
[backend.demo] # a dependency-free backend for testing the harness mechanics
bin = "echo '[demo] {session} up'; exec sleep 1000000"
prompt_delivery = "exec" # {kickoff}=prompt file, {session}=session name, {model}=model
```
For an `"arg"` backend the flag *templates* are configurable (so you can point at a non-claude
CLI): `resume_flag` (default `--resume '{id}'`), `model_flag` (default `--model '{model}'`),
`remote_control_flag` (default `--remote-control '{session}'`). A backend that sets `process_name`
participates in backend-mismatch healing; one that doesn't (e.g. `demo`) never does.
### `[[agent]]` — one block per agent
```toml
[[agent]]
name = "builder" # tmux session defaults to <session_prefix><name>; override with session=
kind = "loop" # loop | persistent | task
backend = "claude" # overrides defaults.backend
model = "claude-opus-4-8" # overrides defaults.model
dir = "." # working dir (relative → project dir)
role = "builder" # loop agents only: role prompt = <roles_dir>/<role>.md
resume = true # (arg backends with supports_resume) --resume <state/<name>.id>
watch = "heal+stall" # none | heal | heal+stall
enabled = true # false = not started by a bare `up`, not supervised
wake = { interval = 3600, prompt_file = "prompts/supervise.md" } # periodic nudge
prompt = """inline startup text""" # persistent/task agents; OR prompt_file = "path.md"
log_signature = "PROJECT PHASE" # optional: disambiguate agents that share a dir (agent-log.py)
```
| kind | prompt source | typical `watch` |
|---|---|---|
| `loop` | auto-built: kickoff template + `prompts/<role>.md` | `heal+stall` |
| `persistent` | `prompt` / `prompt_file` (+ optional `resume`, `wake`) | `heal` |
| `task` | `prompt` (runs once, then idles) | `none`, `enabled=false` |
**`watch` policy:**
| value | behavior |
|---|---|
| `none` | ignored by the watchdog entirely |
| `heal` | restart if the session is dead, FATAL-wedged, or running the wrong backend; pause all healing while inside a usage-limit window; **never** reboot just for being idle |
| `heal+stall` | everything in `heal`, **plus** reboot if idle past `stall_idle` — respecting any `WAITING-UNTIL: <ISO-8601>` self-wake marker the agent prints as its last line |
### `[[service]]` — non-AI helper processes
```toml
[[service]]
name = "cleanlogs"
command = "python3 agent-log.py follow-all"
```
Started by a bare `up`, killed by `down`. Just a supervised command in a tmux session.
### `[loop]` — the phase state machine (governs `kind="loop"` agents)
```toml
[loop]
state_file = "phase-idx" # under <log_dir>/state/
resume_phase = true # keep the phase index across restarts (don't reset to 0)
auto_advance = true # advance when the current phase's status file says done_marker
done_marker = "## DONE"
kickoff_template = "prompts/kickoff.md" # project preamble; slots {phase_id}/{plan}/{status}/{role}
roles_dir = "prompts" # role prompt = <roles_dir>/<role>.md
handoff = { repo = ".", claim_pings = "adversary", review_pings = "builder",
inboxes = ["ADVERSARY-INBOX.md", "BUILDER-INBOX.md"],
claim_pattern = "^claim", review_pattern = "^review", state_subdir = "machine-docs" }
on_complete = { trigger_file = ".run-on-complete", run = "reporter" } # run task agent on completion
phases = [
{ id = "p1", plan = "plans/p1.md", status = "STATUS-p1.md" },
{ id = "p2", plan = "plans/p2.md", status = "STATUS-p2.md", models = { builder = "claude-opus-4-8" } },
]
```
- **Kickoff template.** A loop agent's prompt is `kickoff_template` (with `{phase_id}`, `{plan}`,
`{status}`, `{role}` substituted from the current phase) followed by `<roles_dir>/<role>.md`.
Both are project files; this repo ships generic starters in `prompts/`. There is no built-in
preamble text.
- **Per-phase model override.** A phase's `models = { builder = "...", adversary = "..." }`
overrides those agents' model for just that phase (matched on the agent's `role`).
- **Auto-advance.** Each heavy tick, if the current phase's `status` file (looked up in
`handoff.repo`'s `state_subdir/` then its root) contains a real `done_marker` — not a "Not
yet…" placeholder — the watchdog stops the loops, bumps the phase index, and restarts them on
the next phase. After the last phase it writes a `SEQUENCE-COMPLETE` marker under `log_dir` and
stops the loops (idempotent — no churn). Appending a phase later clears the stale marker and
resumes. On completion, an optional `on_complete.run` task agent fires if its `trigger_file`
exists under `log_dir`.
- **Handoff signalling.** The watchdog watches `handoff.repo`'s `origin/main` for commits whose
subject matches `claim_pattern` / `review_pattern`, and watches the two `inboxes` files. When a
claim lands it pings the `claim_pings` agent; a review pings `review_pings`; an inbox change
pings the relevant side. This is how the Builder and Adversary coordinate purely through git.
---
## Config vs state
- **Config** = `agents.toml` — declarative, version-controlled, the only source of truth.
- **State** = `<log_dir>/state/` — machine-written runtime only: `phase-idx` (current phase),
`<name>.id` (resume id), `limited-<session>.json` (active usage-limit window),
`kickoff-<session>.txt` (the exact prompt last sent). Git-ignore your `log_dir`.
- **Env** = a one-off override for a *single* invocation only: `AGENT_MODEL_<name>=…` /
`AGENT_BACKEND_<name>=…`. The persisted watchdog ignores env and re-reads the file every tick —
deliberately, so env-vs-file drift can never silently revert a backend.
---
## The driver: verbs
The recommended (not required) verb set — an AI project-orchestrator can rely on these being
present, but a harness is free to add more:
```
agents.py up [name…] start enabled agents (+ services + watchdog); use-or-create
agents.py down [name…] stop agents/services/watchdog (all, or named)
agents.py status table of every agent: kind, backend, model, watch, state, phase
agents.py watchdog the supervisor loop (what the <prefix>watchdog session runs)
agents.py logs <name> tail that session's log
agents.py phase [show|next|set N] inspect / move the loop phase index
agents.py selftest regression-test the backend activity detector (needs no config)
agents.py init [dir] scaffold a starter agents.toml + prompts/ in a project dir
--config PATH use a specific config (default: ./agents.toml)
```
### The watchdog tick
`agents.py watchdog` runs as the `<prefix>watchdog` tmux session and **re-reads the config every
tick**. Each loop:
- **signal tick** (`signal_interval`): handoff pings; for each watched agent the usage-limit check,
and for `heal+stall` agents the stall check; fire any due `wake`.
- **heavy tick** (`heavy_interval`): advance the loop phase if the current one is done; otherwise
heal each watched agent per its `watch` policy. When the sequence is complete the finished loops
stay stopped, but persistent agents stay supervised.
**Usage-limit handling:** when an agent prints a limit banner, the watchdog parses the reset time,
arms a quiet window (never rebooting a limited agent), and at the end sends one probe to resume it
— re-arming if the banner re-prints.
---
## Driving the harness from an AI project-orchestrator
This harness is designed to be driven by an AI "project-orchestrator" (PO) that creates and runs
many projects, each pinning its own copy of this engine. The contract is intentionally **not
rigid** — the PO reads these docs and works out how to drive a project. What it can rely on:
1. **One config, one driver.** Everything the PO needs to know about a project's agents is in that
project's `agents.toml`; everything it can *do* is a verb above. To inspect, `status`. To start
or stop, `up` / `down`. To move the phase, `phase`.
2. **Isolation by `session_prefix`.** Two projects never collide as long as their `session_prefix`
differ. The PO assigns each project a unique prefix at creation.
3. **State is on disk, not in the PO.** Phase index, resume ids and limit windows live under the
project's `log_dir`. The PO can restart a project (or the whole host) and the watchdog resumes
from there.
4. **Knowledge is one-directional.** A project repo contains nothing about the PO or the fleet —
it can be run by hand and would have no idea a PO exists. The PO's fleet registry is the only
record of which projects exist and at what engine ref. This repo never reaches "up" toward a PO.
5. **Submodule pin = the engine version.** A project pins this repo at a tag (e.g. `v0.1.0`) as a
submodule under `engine/`. Bumping is per-project and opt-in (`git submodule update --remote`);
one project's bump can't break another.
A minimal project layout the PO scaffolds:
```
my-project/ # its own repo; knows nothing about the PO
agents.toml # harness config (this schema)
engine/ # this repo as a pinned submodule
prompts/ # role prompts + kickoff template
machine-docs/ # the loop pair's coordination files (STATUS/REVIEW/inboxes)
.ao-state/ # runtime state + logs (gitignored)
.env # project creds (never in git)
```
Run it by hand with `engine/agents.py up --config agents.toml`.
---
## Nix
A `flake.nix` provides a reproducible devShell with the runtime deps (`python311` for stdlib
`tomllib`, plus `tmux` and `git`):
```bash
nix develop # enter the shell
nix develop -c python3 agents.py selftest # or run one command in it
nix flake check # evaluate + build the devShell
```
The agent CLIs themselves (`claude`, `opencode`) are **external, non-Nix tools** — install them
per their own docs and make sure they are on `PATH` before launching live agents. The devShell
documents this in its banner.
---
## Adding things
- **Add an agent** — add an `[[agent]]` block; `agents.py up <name>`. No code change.
- **Add a backend** — add a `[backend.<name>]` block (`bin`, `prompt_delivery`, the regexes);
point an agent at it with `backend = "<name>"`.
- **Add / append a phase** — add an entry to `[loop].phases`; the watchdog advances into it
automatically (clearing a stale `SEQUENCE-COMPLETE` if the sequence had finished).
- **Change a model or backend** — edit the field (or a phase's `models = {}`), then
`agents.py down <name> && agents.py up <name>`. The watchdog re-reads the file; it won't fight you.

221
agent-log.py Executable file
View File

@ -0,0 +1,221 @@
#!/usr/bin/env python3
"""Clean, greppable transcript logs for agent-orchestrator agents (claude backend).
The claude CLI writes a structured JSONL transcript of every session under
~/.claude/projects/<cwd-slug>/<session-uuid>.jsonl. This renders that into a readable,
greppable one-event-per-line log — WITHOUT touching the agent (read-only on a file it writes
anyway, so no slowdown and zero extra tokens). The raw `tmux pipe-pane` logs are TUI-escape
soup; use these instead.
Agents are discovered from the harness config (the same agents.toml the driver reads), so there
is nothing project-specific in this file. An agent's transcript directory is derived from its
working `dir`; when several agents share a dir, give them a `log_signature = "..."` (a substring
of their kickoff) to disambiguate.
Usage:
agent-log.py [--config PATH] render <agent> print the clean transcript to stdout
agent-log.py [--config PATH] tail <agent> [N] print the last N (default 40) rendered events
agent-log.py [--config PATH] follow-all keep <agent>.clean.log current for every agent
Output logs (follow-all): <log_dir>/<agent>.clean.log
Each line: `HH:MM:SS [kind] ...` — kinds: user, asst, tool:<Name>, result, think (skipped by
default). Long text/results are truncated (full detail stays in the JSONL); newlines → ⏎.
"""
import json, os, re, sys, time, tomllib
from pathlib import Path
PROJ = os.environ.get("CLAUDE_PROJECTS", os.path.expanduser("~/.claude/projects"))
MAXLEN = 800 # truncate any single rendered block to this many chars
def _cfg_path(argv):
if "--config" in argv:
return Path(argv[argv.index("--config") + 1])
cwd_cfg = Path.cwd() / "agents.toml"
return cwd_cfg if cwd_cfg.exists() else Path(__file__).resolve().parent / "agents.toml"
def load_agents(cfg_path):
"""Return {agent_name: {"slug": <claude proj dir slug>, "sig": <optional signature>}} and the
log_dir, derived from the harness config."""
with open(cfg_path, "rb") as f:
raw = tomllib.load(f)
defaults = raw.get("defaults", {})
base = Path(cfg_path).resolve().parent
proj_dir = (base / defaults.get("project_dir", ".")).resolve()
log_dir = os.path.join(str((proj_dir / defaults.get("log_dir", ".ao-state"))), "")
agents = {}
for a in raw.get("agent", []):
d = a.get("dir", defaults.get("dir", "."))
dp = Path(os.path.expanduser(d))
dp = dp if dp.is_absolute() else (proj_dir / dp)
slug = re.sub(r"[^a-zA-Z0-9]", "-", str(dp.resolve()))
agents[a["name"]] = {"slug": slug, "sig": a.get("log_signature")}
return agents, str((proj_dir / defaults.get("log_dir", ".ao-state")))
def _first_user_text(path, limit=4000):
try:
with open(path) as f:
for _ in range(60):
ln = f.readline()
if not ln:
break
try:
o = json.loads(ln)
except Exception:
continue
if o.get("type") == "user":
c = (o.get("message") or {}).get("content")
if isinstance(c, str):
return c[:limit]
if isinstance(c, list):
return " ".join(b.get("text", "") for b in c
if isinstance(b, dict) and b.get("type") == "text")[:limit]
except FileNotFoundError:
return ""
return ""
def active_jsonl(meta):
"""The agent's current transcript: newest *.jsonl in its slug dir (optionally filtered by the
kickoff signature, to disambiguate agents that share a working dir)."""
d = os.path.join(PROJ, meta["slug"])
try:
files = [os.path.join(d, f) for f in os.listdir(d) if f.endswith(".jsonl")]
except FileNotFoundError:
return None
files.sort(key=lambda p: os.path.getmtime(p), reverse=True)
for p in files:
if not meta["sig"] or meta["sig"] in _first_user_text(p):
return p
return None
def _clip(s):
s = " ".join(str(s).split()) if s else ""
return s if len(s) <= MAXLEN else s[:MAXLEN] + " …[+%d]" % (len(s) - MAXLEN)
def render_line(o, show_think=False):
t = o.get("type")
if t not in ("user", "assistant"):
return []
ts = (o.get("timestamp") or "")[11:19] or "--:--:--"
m = o.get("message") or {}
c = m.get("content")
out = []
if isinstance(c, str):
if c.strip():
out.append(f"{ts} [user] {_clip(c)}")
return out
if not isinstance(c, list):
return out
for b in c:
if not isinstance(b, dict):
continue
bt = b.get("type")
if bt == "text":
txt = b.get("text", "")
if txt.strip():
out.append(f"{ts} [{'asst' if t=='assistant' else 'user'}] {_clip(txt)}")
elif bt == "thinking":
if show_think:
out.append(f"{ts} [think] {_clip(b.get('thinking',''))}")
elif bt == "tool_use":
inp = b.get("input") or {}
brief = (inp.get("command") or inp.get("file_path") or inp.get("path")
or inp.get("prompt") or json.dumps(inp)[:200])
out.append(f"{ts} [tool:{b.get('name')}] {_clip(brief)}")
elif bt == "tool_result":
rc = b.get("content")
if isinstance(rc, list):
rc = " ".join(x.get("text", "") for x in rc if isinstance(x, dict))
out.append(f"{ts} [result] {_clip(rc)}")
return out
def render_file(path, show_think=False):
lines = []
with open(path) as f:
for ln in f:
try:
o = json.loads(ln)
except Exception:
continue
lines += render_line(o, show_think)
return lines
def cmd_render(agents, agent, show_think=False):
p = active_jsonl(agents[agent])
if not p:
print(f"(no transcript found for {agent})", file=sys.stderr); sys.exit(1)
print(f"# {agent}{p}")
for l in render_file(p, show_think):
print(l)
def cmd_tail(agents, agent, n=40):
p = active_jsonl(agents[agent])
if not p:
print(f"(no transcript found for {agent})", file=sys.stderr); sys.exit(1)
for l in render_file(p)[-n:]:
print(l)
def cmd_follow_all(agents, log_dir):
os.makedirs(log_dir, exist_ok=True)
state = {} # agent -> (path, byte_offset)
while True:
for agent, meta in agents.items():
p = active_jsonl(meta)
if not p:
continue
prev_path, off = state.get(agent, (None, 0))
if p != prev_path:
off = 0
try:
size = os.path.getsize(p)
if off > size:
off = 0
with open(p) as f:
f.seek(off)
chunk = f.read()
new_off = f.tell()
except FileNotFoundError:
continue
if chunk:
rendered = []
for ln in chunk.splitlines():
try:
o = json.loads(ln)
except Exception:
continue
rendered += render_line(o)
if rendered:
with open(os.path.join(log_dir, f"{agent}.clean.log"), "a") as out:
out.write("\n".join(rendered) + "\n")
state[agent] = (p, new_off)
time.sleep(5)
def main():
argv = sys.argv[1:]
cfg_path = _cfg_path(argv)
argv = [a for i, a in enumerate(argv)
if a != "--config" and (i == 0 or argv[i-1] != "--config")]
agents, log_dir = load_agents(cfg_path)
cmd = argv[0] if argv else "follow-all"
if cmd == "render":
cmd_render(agents, argv[1], "--think" in argv)
elif cmd == "tail":
cmd_tail(agents, argv[1], int(argv[2]) if len(argv) > 2 and argv[2].isdigit() else 40)
elif cmd == "follow-all":
cmd_follow_all(agents, log_dir)
else:
print(__doc__); sys.exit(2)
if __name__ == "__main__":
main()

109
agents.example.toml Normal file
View File

@ -0,0 +1,109 @@
# agent-orchestrator — example project config.
#
# One file declares: which agents exist, their backend, model, prompt, kind, and how the
# watchdog supervises them. Read by agents.py (driver + watchdog). Runtime state (phase index,
# resume ids, limit windows) lives under <log_dir>/state/, NOT here.
#
# This example is self-contained: its agents use a dependency-free `demo` backend (a shell that
# just idles), so the whole project can be brought up and torn down with no external agent CLI —
# see ./smoke.sh. The `claude` and `opencode` backends below are the real ones; point an agent at
# them with `backend = "claude"` for a live run.
# ─────────────────────────── global watchdog cadence ───────────────────────────
[watchdog]
signal_interval = 30 # s between handoff / stall / limit checks (light)
heavy_interval = 300 # s between heal / phase-advance checks
limit_probe_fallback = 300 # flat probe cadence when a reset time can't be parsed
limit_reset_slack = 45 # s past a parsed reset before probing
stall_grace = 180 # s of slack past a WAITING-UNTIL marker before a stall reboot
# ─────────────────────────── defaults inherited by every agent ───────────────────────────
[defaults]
session_prefix = "ao-example-" # REQUIRED — tmux namespace for this project (no implicit default)
log_dir = ".ao-state" # REQUIRED — logs + state/, resolved relative to this file
backend = "demo"
model = ""
watch = "none" # none | heal | heal+stall
# ─────────────────────────── backends (declared as data) ───────────────────────────
# A backend is fully described by config. `prompt_delivery` selects how the kickoff reaches the
# agent: "arg" (CLI argument, claude-style), "ping" (typed in after a TUI connects, opencode),
# or "exec" (a plain command; {kickoff}=prompt file, {session}=session name, {model}=model).
[backend.demo] # dependency-free backend used by this example's smoke run
bin = "echo '[demo] {session} up (kickoff: {kickoff})'; exec sleep 1000000"
prompt_delivery = "exec"
[backend.claude] # the real claude backend
bin = "claude"
flags = "--dangerously-skip-permissions"
remote_control = true
supports_resume = true
prompt_delivery = "arg"
process_name = "claude" # used for backend-mismatch healing
submit_key = "Enter"
stall_idle = 300
active_re = "esc to interrupt|Running tool|⠇|⠙|· \\d+"
limit_re = "spend limit|usage limit|limit reached|reached your .*limit|out of (credits|tokens)"
fatal_re = "redacted_thinking|blocks cannot be modified|cannot be modified"
[backend.opencode] # the real opencode backend (a TUI; prompt typed after connect)
bin = "opencode"
attach = "{bin} attach {server} --dir {dir}"
server = "http://127.0.0.1:4096"
supports_resume = false
prompt_delivery = "ping"
process_name = "opencode"
footer_ui = true # static footer lingers after a turn → only the bottom = activity
log_grace = 180
connect_delay = 12
submit_key = "C-m"
model_env = true # pass the model via OPENCODE_CONFIG_CONTENT
stall_idle = 900
active_re = "esc interrupt|thinking|inferring|running tool|tool call|preparing patch|reading|searching"
limit_re = "spend limit|usage limit|limit reached|reached your .*limit|out of (credits|tokens)"
fatal_re = "redacted_thinking|blocks cannot be modified|cannot be modified"
# ─────────────────────────── agents ───────────────────────────
# A minimal 2-agent loop pair: a Builder that does the work and an Adversary that verifies it.
[[agent]]
name = "builder" # tmux session: ao-example-builder
kind = "loop" # kickoff = kickoff_template + prompts/builder.md, per phase
role = "builder"
[[agent]]
name = "adversary" # tmux session: ao-example-adversary
kind = "loop"
role = "adversary"
# A persistent supervisor and a one-shot task are also supported:
# [[agent]]
# name = "orchestrator"
# kind = "persistent"
# backend = "claude"
# resume = true
# watch = "heal"
# wake = { interval = 3600, prompt_file = "prompts/supervise.md" }
# prompt = "You supervise this project. On startup, check status and report."
# Non-AI helper services (started by a bare `up`, not AI sessions):
# [[service]]
# name = "cleanlogs"
# command = "python3 agent-log.py follow-all"
# ─────────────────────────── the phase machine (kind="loop" agents) ───────────────────────────
[loop]
state_file = "phase-idx" # under <log_dir>/state/
resume_phase = true # keep the current index across restarts (don't reset to 0)
auto_advance = true # advance when the current phase's status file says the done_marker
done_marker = "## DONE"
kickoff_template = "prompts/kickoff.md" # project preamble; slots {phase_id}/{plan}/{status}/{role}
roles_dir = "prompts" # role prompt = <roles_dir>/<role>.md
handoff = { repo = ".", claim_pings = "adversary", review_pings = "builder", inboxes = ["ADVERSARY-INBOX.md", "BUILDER-INBOX.md"], claim_pattern = "^claim", review_pattern = "^review", state_subdir = "machine-docs" }
# on_complete = { trigger_file = ".run-on-complete", run = "reporter" }
phases = [
{ id = "demo1", plan = "examples/PLAN-demo1.md", status = "STATUS-demo1.md" },
{ id = "demo2", plan = "examples/PLAN-demo2.md", status = "STATUS-demo2.md", models = { builder = "claude-opus-4-8" } },
]

929
agents.py Executable file
View File

@ -0,0 +1,929 @@
#!/usr/bin/env python3
"""agent-orchestrator — one driver, one config (agents.toml) for a fleet of agents.
A generic, reusable harness for running and supervising AI-agent sessions in tmux. Every
agent — a Builder/Adversary loop pair, a persistent supervisor, a one-shot task — is declared
in a single TOML config; the watchdog reads the SAME file, so there is no env-vs-file drift.
Nothing about any particular project lives in this code: paths, the loop kickoff preamble, the
handoff conventions, and the on-complete hook are all supplied by the project's config.
Usage:
agents.py up [name...] start enabled agents (or just the named ones); use-or-create
agents.py down [name...] stop agents (or all)
agents.py status one table: every agent — kind, backend, model, session, phase
agents.py watchdog the supervisor loop (reads the config every tick)
agents.py logs <name> tail an agent's session log
agents.py phase [set N|next|show] inspect / move the loop phase
agents.py selftest backend activity-detector regression checks (no config needed)
agents.py init [dir] scaffold a starter agents.toml + prompts/ in a project dir
Options:
--config PATH config file (default: ./agents.toml, else <script dir>/agents.toml)
Config is authoritative. A one-off override env AGENT_MODEL_<name> / AGENT_BACKEND_<name>
affects a single invocation only; the persisted watchdog always re-reads the file.
"""
import hashlib, json, os, re, shlex, subprocess, sys, time, tomllib
from datetime import datetime, timedelta
from pathlib import Path
SCRIPT_DIR = Path(__file__).resolve().parent
# ── config loading ──────────────────────────────────────────────────────────────
def _cfg_path(argv):
if "--config" in argv:
return Path(argv[argv.index("--config") + 1])
cwd_cfg = Path.cwd() / "agents.toml"
return cwd_cfg if cwd_cfg.exists() else SCRIPT_DIR / "agents.toml"
def _resolve(base, p):
"""Resolve a possibly-relative config path against the project root."""
pp = Path(os.path.expanduser(str(p)))
return pp if pp.is_absolute() else (Path(base) / pp)
def load_config(path):
path = Path(path)
with open(path, "rb") as f:
raw = tomllib.load(f)
defaults = raw.get("defaults", {})
# The project root: everything project-supplied (prompts, templates, relative paths) is
# resolved against it. Defaults to the directory holding the config file; override with
# defaults.project_dir (useful when the config lives in a sandbox but prompts live elsewhere).
project_dir = _resolve(path.resolve().parent, defaults.get("project_dir", ".")).resolve()
session_prefix = defaults.get("session_prefix")
if not session_prefix:
die("config error: [defaults].session_prefix is required (e.g. session_prefix = \"myproj-\")")
log_dir_raw = defaults.get("log_dir")
if not log_dir_raw:
die("config error: [defaults].log_dir is required")
cfg = {
"watchdog": raw.get("watchdog", {}),
"backends": raw.get("backend", {}),
"defaults": defaults,
"loop": raw.get("loop", {}),
"project_dir": str(project_dir),
"log_dir": str(_resolve(project_dir, log_dir_raw)),
"session_prefix": session_prefix,
}
agents = {}
for a in raw.get("agent", []):
m = {**defaults, **a}
m["session"] = a.get("session", session_prefix + a["name"])
m["kind"] = a.get("kind", "persistent")
m["dir"] = str(_resolve(project_dir, a.get("dir", defaults.get("dir", "."))))
# one-off env override (single invocation; watchdog ignores via fresh load each tick)
env_model = os.environ.get(f"AGENT_MODEL_{a['name']}")
env_backend = os.environ.get(f"AGENT_BACKEND_{a['name']}")
if env_model: m["model"] = env_model
if env_backend: m["backend"] = env_backend
agents[a["name"]] = m
cfg["agents"] = agents
cfg["services"] = {s["name"]: {**defaults, **s,
"session": session_prefix + s["name"],
"dir": str(_resolve(project_dir, s.get("dir", ".")))}
for s in raw.get("service", [])}
cfg["state_dir"] = os.path.join(cfg["log_dir"], "state")
Path(cfg["state_dir"]).mkdir(parents=True, exist_ok=True)
return cfg
def backend_of(cfg, agent):
b = cfg["backends"].get(agent["backend"])
if not b:
die(f"agent {agent['name']}: unknown backend {agent['backend']!r}")
return b
# ── logging ───────────────────────────────────────────────────────────────────
def log(msg):
print(f"[agents {datetime.now():%H:%M:%S}] {msg}", flush=True)
def die(msg):
log(f"ERROR: {msg}")
sys.exit(1)
# ── tmux helpers ────────────────────────────────────────────────────────────────
# ALWAYS target sessions with an exact-match "=" prefix. tmux does prefix/fnmatch on bare
# targets, so "-t myproj-assistant" would match "myproj-assistant3" — capturing or killing the
# wrong session. "=name" forces an exact match.
def _run(cmd):
"""subprocess.run wrapper that never raises if the binary (e.g. tmux) is absent."""
try:
return subprocess.run(cmd, capture_output=True, text=True)
except FileNotFoundError:
return subprocess.CompletedProcess(cmd, 127, "", "")
def TS(name): # exact target-SESSION (has-session, kill-session)
return "=" + name
def TP(name): # exact target-PANE: "=session:" anchors the exact session, current window/pane
return "=" + name + ":"
def session_alive(name):
return _run(["tmux", "has-session", "-t", TS(name)]).returncode == 0
def session_command(name):
r = _run(["tmux", "display-message", "-p", "-t", TP(name), "#{pane_current_command}"])
return r.stdout.strip() if r.returncode == 0 else ""
def kill_session(name):
_run(["tmux", "kill-session", "-t", TS(name)])
def capture_pane(name, lines=40):
r = _run(["tmux", "capture-pane", "-p", "-t", TP(name)])
return "\n".join(r.stdout.splitlines()[-lines:]) if r.returncode == 0 else ""
def pipe_to_log(session, log_path):
_run(["tmux", "pipe-pane", "-o", "-t", TP(session), f"cat >> '{log_path}'"])
def new_session(session, cwd, cmd, log_path):
Path(cwd).mkdir(parents=True, exist_ok=True)
_run(["tmux", "new-session", "-d", "-s", session, "-c", cwd, cmd])
pipe_to_log(session, log_path)
def ping_session(session, msg, submit_key="Enter"):
"""Type a message into a session and submit it; retry submit until the prefix clears."""
if not session_alive(session):
return
prefix = msg[:28]
_run(["tmux", "send-keys", "-t", TP(session), "-l", "--", msg])
time.sleep(0.5)
for _ in range(10):
_run(["tmux", "send-keys", "-t", TP(session), submit_key])
time.sleep(1)
if prefix not in capture_pane(session, 20):
return
# ── activity / limit / fatal detection (per-backend regexes from config) ─────────
def _re(backend, key):
pat = backend.get(key)
return re.compile(pat, re.I) if pat else None
def _session_log_path(cfg, session):
return Path(cfg["log_dir"]) / f"{session}.log"
def _log_recently_touched(cfg, session, age_seconds):
try:
return (time.time() - _session_log_path(cfg, session).stat().st_mtime) <= age_seconds
except FileNotFoundError:
return False
def pane_active(cfg, agent, pane, *, use_log=True):
"""True when the pane shows the agent is working. A footer_ui backend (a TUI with a static
footer that lingers after a turn) only counts the bottom rows as activity, and falls back to
a recently-touched session log within a grace window."""
backend = backend_of(cfg, agent)
active = _re(backend, "active_re")
if backend.get("footer_ui"):
bottom = "\n".join(pane.splitlines()[-10:])
hit = bool(active and active.search(bottom))
grace = int(backend.get("log_grace", 180))
return hit or (use_log and _log_recently_touched(cfg, agent["session"], grace))
return bool(active and active.search(pane))
# ── prompt assembly ───────────────────────────────────────────────────────────
DONE_PLACEHOLDER_RE = re.compile(
r"^\s*(not yet|not done|not complete|incomplete|pending\b|tbd\b|n/?a\b|"
r"written here only|only when|to be (written|filled)|when all|<.*>)", re.I)
def phases(cfg): return cfg["loop"].get("phases", [])
def phase_idx_file(cfg): return os.path.join(cfg["state_dir"], cfg["loop"].get("state_file", "phase-idx"))
def cur_idx(cfg):
try:
v = Path(phase_idx_file(cfg)).read_text().strip()
return int(v) if v.lstrip("-").isdigit() else 0
except FileNotFoundError:
return 0
def cur_phase(cfg):
ps = phases(cfg)
return ps[cur_idx(cfg)] if ps else {}
def _state_subdir(cfg):
return (cfg["loop"].get("handoff") or {}).get("state_subdir", "machine-docs")
def handoff_repo(cfg):
h = cfg["loop"].get("handoff") or {}
repo = h.get("repo")
return str(_resolve(cfg["project_dir"], repo)) if repo else cfg["project_dir"]
def resolve_state_file(cfg, repo_dir, basename):
sub = _state_subdir(cfg)
p = Path(repo_dir) / sub / basename
return p if p.exists() else Path(repo_dir) / basename
def phase_done(cfg, status_basename):
repo = handoff_repo(cfg)
try:
lines = resolve_state_file(cfg, repo, status_basename).read_text().splitlines()
except FileNotFoundError:
return False
marker = cfg["loop"].get("done_marker", "## DONE")
for i, line in enumerate(lines):
if not line.startswith(marker):
continue
body = next((nxt for nxt in lines[i+1:] if nxt.strip()), "")
if DONE_PLACEHOLDER_RE.match(body):
continue
return True
return False
def role_model(cfg, agent):
"""Per-phase override (phases[idx].models[role]) wins, else the agent's configured model."""
role = agent.get("role")
if role:
ov = (cur_phase(cfg).get("models") or {}).get(role)
if ov:
return ov
return agent.get("model", "")
def _render_template(text, fields):
for k, v in fields.items():
text = text.replace("{" + k + "}", str(v))
return text
def build_loop_kickoff(cfg, agent):
"""A loop agent's kickoff = the project's kickoff template (slots filled from the current
phase) followed by the role prompt prompts/<role>.md. Both files are project-supplied; this
code holds no project text."""
ph = cur_phase(cfg)
fields = {
"phase_id": ph.get("id", ""),
"plan": ph.get("plan", ""),
"status": ph.get("status", ""),
"role": agent.get("role", ""),
}
pdir = Path(cfg["project_dir"])
preamble = ""
tmpl = cfg["loop"].get("kickoff_template")
if tmpl:
preamble = _render_template(_resolve(pdir, tmpl).read_text(), fields)
roles_dir = cfg["loop"].get("roles_dir", "prompts")
role_prompt = (_resolve(pdir, roles_dir) / f"{agent['role']}.md").read_text()
return preamble + role_prompt
def agent_prompt(cfg, agent):
pdir = Path(cfg["project_dir"])
if agent["kind"] == "loop":
return build_loop_kickoff(cfg, agent)
if agent.get("prompt_file"):
return _resolve(pdir, agent["prompt_file"]).read_text()
return agent.get("prompt", "")
# ── resume id ───────────────────────────────────────────────────────────────────
def resume_id(cfg, agent):
f = Path(cfg["state_dir"]) / f"{agent['name']}.id"
if f.exists():
v = f.read_text().strip()
if v:
return v
return None
# ── agent launch ────────────────────────────────────────────────────────────────
def _expected_proc(backend):
"""The process name a healthy session should be running; used for backend-mismatch healing.
Only backends that declare process_name participate (so a generic exec backend, or the
transient login shell during startup, is never mistaken for a mismatch)."""
return backend.get("process_name")
def start_agent(cfg, agent, *, force=False):
session = agent["session"]
if session_alive(session):
if not force:
log(f"{session} already running — leaving it")
return
kill_session(session)
backend = backend_of(cfg, agent)
model = role_model(cfg, agent)
prompt = agent_prompt(cfg, agent)
log_path = str(_session_log_path(cfg, session))
kf = Path(cfg["state_dir"]) / f"kickoff-{session}.txt"
kf.write_text(prompt)
cwd = agent.get("dir") or cfg["project_dir"]
pid = cur_phase(cfg).get("id", "-") if agent["kind"] == "loop" else "-"
delivery = backend.get("prompt_delivery", "arg")
if delivery == "ping":
# TUI backend: launch, wait for it to connect, then type the prompt in.
model_env = (f"OPENCODE_CONFIG_CONTENT={shlex.quote(json.dumps({'model': model}))} "
if model and backend.get("model_env") else "")
preamble = backend.get("preamble", "")
sep = "; " if preamble else ""
attach = _render_template(backend.get("attach", "{bin}"),
{"bin": backend["bin"], "server": backend.get("server", ""),
"dir": shlex.quote(cwd)})
cmd = f"{preamble}{sep}{model_env}{attach}"
log(f"starting {session} ({agent['backend']}, kind={agent['kind']}, phase={pid}, "
f"model={model or 'default'})")
new_session(session, cwd, cmd, log_path)
time.sleep(int(backend.get("connect_delay", 12)))
boot = (f"Your full kickoff prompt is in {kf} — read it now with: "
f"`cat '{kf}'` — then follow it exactly.")
ping_session(session, boot, submit_key=backend.get("submit_key", "C-m"))
elif delivery == "exec":
# Generic backend: run an arbitrary command. {kickoff} = path to the prompt file,
# {session} = the tmux session name, {model} = resolved model.
cmd = _render_template(backend["bin"],
{"kickoff": str(kf), "session": session, "model": model})
log(f"starting {session} ({agent['backend']}, kind={agent['kind']}, phase={pid})")
new_session(session, cwd, cmd, log_path)
else: # "arg": prompt passed as a CLI argument (claude-style)
rid = resume_id(cfg, agent) if agent.get("resume") and backend.get("supports_resume") else None
parts = [backend["bin"]]
if rid:
parts.append(_render_template(backend.get("resume_flag", "--resume '{id}'"), {"id": rid}))
if backend.get("remote_control"):
parts.append(_render_template(backend.get("remote_control_flag",
"--remote-control '{session}'"), {"session": session}))
if model:
parts.append(_render_template(backend.get("model_flag", "--model '{model}'"), {"model": model}))
if backend.get("flags"):
parts.append(backend["flags"])
parts.append(f"\"$(cat '{kf}')\"")
cmd = " ".join(p for p in parts if p)
log(f"starting {session} ({agent['backend']}, kind={agent['kind']}, phase={pid}, "
f"model={model or 'default'}{', resume' if rid else ''})")
new_session(session, cwd, cmd, log_path)
def start_service(cfg, svc):
session = svc["session"]
if session_alive(session):
log(f"{session} already running — leaving it")
return
log(f"starting service {session}")
new_session(session, svc.get("dir", cfg["project_dir"]), svc["command"],
str(_session_log_path(cfg, session)))
# ── usage-limit state machine ────────────────────────────────────────────────────
RESET_RE = re.compile(r"resets?\s*(?:at\s*)?(\d{1,2})(?::(\d{2}))?\s*(am|pm)?", re.I)
def _limit_state_path(cfg, session): return Path(cfg["state_dir"]) / f"limited-{session}.json"
def _load_limit_state(cfg, session):
try: return json.loads(_limit_state_path(cfg, session).read_text())
except Exception: return None
def _save_limit_state(cfg, session, st): _limit_state_path(cfg, session).write_text(json.dumps(st))
def _clear_limit_state(cfg, session):
try: _limit_state_path(cfg, session).unlink()
except FileNotFoundError: pass
def _parse_reset_epoch(pane):
matches = list(RESET_RE.finditer(pane))
if not matches:
return None
m = matches[-1]
try:
hour, minute = int(m.group(1)), int(m.group(2) or 0)
ampm = (m.group(3) or "").lower()
if ampm == "pm" and hour != 12: hour += 12
elif ampm == "am" and hour == 12: hour = 0
if hour > 23 or minute > 59: return None
cand = datetime.now().replace(hour=hour, minute=minute, second=0, microsecond=0)
if cand.timestamp() <= time.time(): cand += timedelta(days=1)
return cand.timestamp()
except Exception:
return None
def _next_limit_until(cfg, pane, now):
fallback = int(cfg["watchdog"].get("limit_probe_fallback", 300))
slack = int(cfg["watchdog"].get("limit_reset_slack", 45))
parsed = _parse_reset_epoch(pane)
if parsed is not None and parsed - now <= 6 * 3600:
return parsed + slack, True
return now + fallback, False
def _limit_nudge_msg(kind):
if kind in ("persistent", "orchestrator"):
return ("watchdog probe: if the quota window has reset, RESUME now — re-check status "
"and continue from where you stopped.")
return ("watchdog probe: if the quota window has reset, RESUME your loop now — pull latest, "
"re-read your phase STATUS/REVIEW files, and continue; re-arm your loop pacing.")
def limit_tick(cfg, agent, pane):
"""True while the agent is inside a usage-limit window — callers suppress all healing."""
session = agent["session"]
backend = backend_of(cfg, agent)
limit_re = _re(backend, "limit_re")
submit = backend.get("submit_key", "Enter")
fallback = int(cfg["watchdog"].get("limit_probe_fallback", 300))
state = _load_limit_state(cfg, session)
limited_now = bool(limit_re and limit_re.search(pane))
if state is None:
if not limited_now or pane_active(cfg, agent, pane, use_log=False):
return False
now = time.time()
until, parsed = _next_limit_until(cfg, pane, now)
if parsed:
log(f"limit hit on {agent['name']} — banner says reset "
f"{datetime.fromtimestamp(until):%a %H:%M}; holding (no reboots)")
else:
log(f"limit hit on {agent['name']} — reset unparsable; flat "
f"{fallback//60}-min probe loop (no reboots)")
_save_limit_state(cfg, session, {"until": until, "nudges": 0})
return True
if pane_active(cfg, agent, pane, use_log=False) or not limited_now:
log(f"limit lifted on {agent['name']} — clearing limit state")
_clear_limit_state(cfg, session)
return False
now = time.time()
if now < state.get("until", 0):
return True
msg = _limit_nudge_msg(agent["kind"])
if msg[:28] in "\n".join(pane.splitlines()[-8:]):
return True
nudges = state.get("nudges", 0) + 1
log(f"limit probe #{nudges} on {agent['name']} — nudging to resume")
ping_session(session, msg, submit_key=submit)
time.sleep(3)
pane2 = capture_pane(session, 40)
if pane_active(cfg, agent, pane2, use_log=False) and not (limit_re and limit_re.search(pane2)):
log(f"limit lifted on {agent['name']} — probe resumed it")
_clear_limit_state(cfg, session)
return True
until, _ = _next_limit_until(cfg, pane2, now)
if nudges == 3:
log(f"WARNING: {agent['name']} still limited after {nudges} probes — flat probes; never rebooting")
_save_limit_state(cfg, session, {"until": until, "nudges": nudges})
return True
# ── stall detection ──────────────────────────────────────────────────────────────
_idle_since: dict[str, float] = {}
def _last_nonempty_line(text):
for line in reversed(text.splitlines()):
if line.strip():
return line.strip()
return ""
def _parse_waiting_until(cfg, agent, pane):
if backend_of(cfg, agent).get("footer_ui"):
line = _last_nonempty_line(pane)
if not line.startswith("WAITING-UNTIL:"):
return None
m = re.search(r"WAITING-UNTIL:\s*(\S+)", line)
else:
m = re.search(r"WAITING-UNTIL:\s*(\S+)", pane)
if not m:
return None
try:
return datetime.fromisoformat(m.group(1).replace("Z", "+00:00")).timestamp()
except Exception:
return None
def stall_check_one(cfg, agent):
session = agent["session"]
if not session_alive(session):
_idle_since[session] = 0.0
_clear_limit_state(cfg, session)
return
now = time.time()
pane = capture_pane(session, 40)
if limit_tick(cfg, agent, pane):
_idle_since[session] = 0.0
return
if pane_active(cfg, agent, pane):
_idle_since[session] = 0.0
return
since = _idle_since.get(session) or now
_idle_since[session] = since
idle = now - since
grace = int(cfg["watchdog"].get("stall_grace", 180))
until = _parse_waiting_until(cfg, agent, pane)
if until is not None:
if now <= until + grace:
return
reason = f"past its WAITING-UNTIL by {int(now-until)}s — self-wake did not fire"
else:
stall_idle = int(backend_of(cfg, agent).get("stall_idle", 300))
if idle < stall_idle:
return
reason = f"idle {int(idle)}s with no WAITING-UNTIL marker"
log(f"stall: {agent['name']} ({session}) {reason} — kill + reboot")
start_agent(cfg, agent, force=True)
_idle_since[session] = 0.0
# ── healing ──────────────────────────────────────────────────────────────────────
def backend_mismatch(cfg, agent):
expected = _expected_proc(backend_of(cfg, agent))
if not expected:
return False
cmd = session_command(agent["session"])
known = {_expected_proc(b) for b in cfg["backends"].values() if _expected_proc(b)}
# only a definite OTHER declared backend is a mismatch; a transient login shell during
# startup (not a known backend process) is not.
if cmd not in known:
return False
return cmd != expected
def heal_one(cfg, agent):
session = agent["session"]
backend = backend_of(cfg, agent)
if not session_alive(session):
log(f"{agent['name']} ({session}) gone — restarting")
start_agent(cfg, agent)
return
if backend_mismatch(cfg, agent):
log(f"{agent['name']} ({session}) is {session_command(session)!r}, expected "
f"{_expected_proc(backend)} — kill + restart")
start_agent(cfg, agent, force=True)
return
pane = capture_pane(session, 25)
if pane_active(cfg, agent, pane):
return
if limit_tick(cfg, agent, pane):
return
fatal = _re(backend, "fatal_re")
if fatal and fatal.search(pane):
log(f"FATAL session-state error on {agent['name']} ({session}) — kill + restart")
start_agent(cfg, agent, force=True)
# ── wake (persistent agents with a wake schedule) ────────────────────────────────
def wake_agent(cfg, agent):
"""Returns True when the wake landed (or is moot), False to retry next tick."""
wake = agent.get("wake")
if not wake:
return True
session = agent["session"]
if not session_alive(session):
return False
backend = backend_of(cfg, agent)
if pane_active(cfg, agent, capture_pane(session, 25)):
return False
pf = wake.get("prompt_file")
try:
msg = " ".join((_resolve(cfg["project_dir"], pf)).read_text().split())
except (FileNotFoundError, TypeError):
log(f"wake skipped for {agent['name']} — prompt file missing: {pf}")
return True
if not msg:
return True
log(f"waking {agent['name']} ({session}) for scheduled supervision pass")
ping_session(session, msg, submit_key=backend.get("submit_key", "Enter"))
return True
# ── handoff signalling (loop pair) ───────────────────────────────────────────────
_hand = {"sha": "", "adv_inbox": "", "builder_inbox": ""}
def handoff_reset():
_hand["sha"] = _hand["adv_inbox"] = _hand["builder_inbox"] = ""
def _git(repo, args):
return subprocess.run(f"git -C {repo!r} {args}", shell=True, capture_output=True, text=True)
def _show_pushed(cfg, repo, path):
sub = _state_subdir(cfg)
for loc in (f"origin/main:{sub}/{path}", f"origin/main:{path}"):
r = _git(repo, f"show {loc!r}")
if r.returncode == 0:
return r.stdout
return ""
def handoff_check(cfg):
h = cfg["loop"].get("handoff")
if not h:
return
repo = handoff_repo(cfg)
sub = lambda name: cfg["agents"].get(name, {}).get("session", cfg["session_prefix"] + name)
builder_name = h.get("review_pings", "builder")
submit = (backend_of(cfg, cfg["agents"][builder_name]).get("submit_key", "Enter")
if builder_name in cfg["agents"] else "Enter")
claim_pat = h.get("claim_pattern", "^claim")
review_pat = h.get("review_pattern", "^review")
_git(repo, "fetch -q origin")
head = _git(repo, "rev-parse origin/main").stdout.strip()
if head:
if not _hand["sha"]:
_hand["sha"] = head
elif head != _hand["sha"]:
subjects = _git(repo, f"log --format=%s {_hand['sha']}..origin/main").stdout
if re.search(claim_pat, subjects, re.M | re.I):
log("handoff: claim commit → pinging reviewer")
ping_session(sub(h.get("claim_pings", "adversary")),
"watchdog ping: the other loop pushed a gate CLAIM commit. "
"Pull and verify the claimed gate now.", submit_key=submit)
if re.search(review_pat, subjects, re.M | re.I):
log("handoff: review commit → pinging builder")
ping_session(sub(h.get("review_pings", "builder")),
"watchdog ping: the other loop pushed a verdict/finding commit. "
"Pull the review file and act.", submit_key=submit)
_hand["sha"] = head
inboxes = h.get("inboxes", [])
md5 = lambda s: hashlib.md5(s.encode()).hexdigest()
sub_dir = _state_subdir(cfg)
for fname, key, target in (
(inboxes[0] if len(inboxes) > 0 else None, "adv_inbox", h.get("claim_pings", "adversary")),
(inboxes[1] if len(inboxes) > 1 else None, "builder_inbox", h.get("review_pings", "builder")),
):
if not fname:
continue
content = _show_pushed(cfg, repo, fname)
if content:
hh = md5(content)
if hh != _hand[key]:
log(f"handoff: {fname} changed → pinging {target}")
ping_session(sub(target),
f"watchdog ping: the other loop pushed {sub_dir}/{fname} — pull, read it, "
f"act, then delete the file (commit + push) to mark it consumed.", submit_key=submit)
_hand[key] = hh
else:
_hand[key] = ""
# ── phase advance (loop machine) ─────────────────────────────────────────────────
def loop_agents(cfg):
return [a for a in cfg["agents"].values() if a["kind"] == "loop" and a.get("enabled", True)]
def stop_loops(cfg):
for a in loop_agents(cfg):
if session_alive(a["session"]):
log(f"killing {a['session']}")
kill_session(a["session"])
def start_loops(cfg):
for a in loop_agents(cfg):
start_agent(cfg, a)
def phase_advance_check(cfg):
"""On heavy tick: if the current phase is DONE, advance (or finish the sequence).
Returns True only when it actually transitions/completes THIS tick (caller skips healing
that one tick). Once the sequence is already marked complete it is idempotent (returns
False, no re-log, no re-stop). Appending a phase after completion clears the stale marker
and resumes the loops on the new phase."""
ps = phases(cfg)
if not ps or not cfg["loop"].get("auto_advance", True):
return False
marker = Path(cfg["log_dir"]) / "SEQUENCE-COMPLETE"
idx = cur_idx(cfg)
ph = ps[idx]
if not phase_done(cfg, ph["status"]):
return False
nxt = idx + 1
if nxt < len(ps):
log(f"PHASE {ph['id']} DONE — auto-transitioning to {ps[nxt]['id']}")
stop_loops(cfg)
Path(phase_idx_file(cfg)).write_text(str(nxt))
if marker.exists():
marker.unlink() # resuming into a (freshly-appended) phase — clear stale completion
handoff_reset()
start_loops(cfg)
return True
# last phase is DONE → sequence complete
if marker.exists():
return False # already handled — idempotent (no re-log, no re-stop)
log(f"PHASE SEQUENCE COMPLETE (last phase {ph['id']} DONE) — stopping loops")
stop_loops(cfg)
ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
marker.write_text(f"phase sequence complete {ts}. Loops stopped; build finished.\n")
oc = cfg["loop"].get("on_complete")
if oc:
trig = Path(cfg["log_dir"]) / oc.get("trigger_file", ".run-on-complete")
if trig.exists():
trig.unlink()
runname = oc.get("run")
if runname and runname in cfg["agents"]:
log(f"on_complete: launching task agent {runname!r}")
start_agent(cfg, cfg["agents"][runname], force=True)
return True
# ── watchdog loop ────────────────────────────────────────────────────────────────
def watched(cfg):
return [a for a in cfg["agents"].values()
if a.get("enabled", True) and a.get("watch", "none") != "none"]
def watchdog_loop(cfg_path):
cfg = load_config(cfg_path)
sig = int(cfg["watchdog"].get("signal_interval", 30))
heavy = int(cfg["watchdog"].get("heavy_interval", 300))
ps = phases(cfg)
log(f"watchdog up — phase={cur_phase(cfg).get('id','-')} [{cur_idx(cfg)+1}/{len(ps)}] "
f"signal={sig}s heavy={heavy}s, watching: {[a['name'] for a in watched(cfg)]}")
elapsed = heavy # force a heavy check on first tick
wake_elapsed = {a["name"]: 0 for a in cfg["agents"].values() if a.get("wake")}
while True:
cfg = load_config(cfg_path) # re-read every tick: config is authoritative, no env drift
has_loops = bool(loop_agents(cfg))
seq_done = (Path(cfg["log_dir"]) / "SEQUENCE-COMPLETE").exists()
if has_loops:
handoff_check(cfg)
for a in watched(cfg):
if a["watch"] == "heal+stall":
stall_check_one(cfg, a)
else:
if session_alive(a["session"]):
limit_tick(cfg, a, capture_pane(a["session"], 40))
if not seq_done:
for name, el in list(wake_elapsed.items()):
interval = int(cfg["agents"][name]["wake"].get("interval", 3600))
if el >= interval:
if wake_agent(cfg, cfg["agents"][name]):
wake_elapsed[name] = 0
if elapsed >= heavy:
elapsed = 0
advanced = phase_advance_check(cfg) if has_loops else False
if not advanced:
for a in watched(cfg):
if seq_done and a["kind"] == "loop":
continue
heal_one(cfg, a)
time.sleep(sig)
elapsed += sig
for k in wake_elapsed:
wake_elapsed[k] += sig
# ── CLI commands ──────────────────────────────────────────────────────────────
def start_watchdog(cfg, cfg_path):
session = cfg["session_prefix"] + "watchdog"
if session_alive(session):
log("watchdog already running")
return
log("starting watchdog")
script = Path(__file__).resolve()
new_session(session, cfg["project_dir"],
f"exec >>'{cfg['log_dir']}/{session}.log' 2>&1; "
f"python3 '{script}' watchdog --config '{Path(cfg_path).resolve()}'",
str(_session_log_path(cfg, session)))
def cmd_up(cfg, cfg_path, names):
if cfg["loop"].get("resume_phase") is False and not Path(phase_idx_file(cfg)).exists():
Path(phase_idx_file(cfg)).write_text("0")
targets = ([cfg["agents"][n] for n in names if n in cfg["agents"]]
if names else
[a for a in cfg["agents"].values() if a.get("enabled", True)])
for a in targets:
start_agent(cfg, a)
if not names:
for s in cfg["services"].values():
start_service(cfg, s)
start_watchdog(cfg, cfg_path)
else:
for n in names:
if n in cfg["services"]:
start_service(cfg, cfg["services"][n])
if n == "watchdog":
start_watchdog(cfg, cfg_path)
def cmd_down(cfg, names):
sessions = []
if names:
for n in names:
if n in cfg["agents"]: sessions.append(cfg["agents"][n]["session"])
elif n in cfg["services"]: sessions.append(cfg["services"][n]["session"])
elif n == "watchdog": sessions.append(cfg["session_prefix"] + "watchdog")
else:
sessions = [a["session"] for a in cfg["agents"].values()]
sessions += [s["session"] for s in cfg["services"].values()]
sessions.append(cfg["session_prefix"] + "watchdog")
for s in sessions:
if session_alive(s):
log(f"killing {s}")
kill_session(s)
def cmd_status(cfg):
idx, ps = cur_idx(cfg), phases(cfg)
if ps:
ph = ps[idx]
done = "## DONE" if phase_done(cfg, ph["status"]) else "in progress"
print(f" phase: {ph['id']} [{idx+1}/{len(ps)}] plan={ph.get('plan','-')} ({done})")
print(f" {'AGENT':<14} {'KIND':<11} {'BACKEND':<9} {'MODEL':<20} {'WATCH':<10} STATE")
for a in cfg["agents"].values():
st = "RUNNING" if session_alive(a["session"]) else "stopped"
en = "" if a.get("enabled", True) else " (disabled)"
rc = session_command(a["session"]) if st == "RUNNING" else ""
print(f" {a['name']:<14} {a['kind']:<11} {a['backend']:<9} "
f"{role_model(cfg,a) or 'default':<20} {a.get('watch','none'):<10} {st}{en}"
+ (f" [{rc}]" if rc else ""))
for s in cfg["services"].values():
st = "RUNNING" if session_alive(s["session"]) else "stopped"
print(f" {s['name']:<14} {'service':<11} {'-':<9} {'-':<20} {'-':<10} {st}")
wd = cfg["session_prefix"] + "watchdog"
print(f" {'watchdog':<14} {'service':<11} {'-':<9} {'-':<20} {'-':<10} "
f"{'RUNNING' if session_alive(wd) else 'stopped'}")
def cmd_phase(cfg, args):
ps = phases(cfg)
if not ps:
print("no [loop].phases configured"); return
if not args or args[0] == "show":
idx = cur_idx(cfg)
print(f"phase {ps[idx]['id']} [{idx+1}/{len(ps)}] seq: {' '.join(p['id'] for p in ps)}")
return
if args[0] == "next":
Path(phase_idx_file(cfg)).write_text(str(min(cur_idx(cfg)+1, len(ps)-1)))
elif args[0] == "set" and len(args) > 1:
Path(phase_idx_file(cfg)).write_text(str(int(args[1])))
print(f"phase idx now {cur_idx(cfg)} ({cur_phase(cfg).get('id')})")
def cmd_selftest():
"""Self-contained regression checks for the footer-UI activity detector. Needs no config."""
backend = {
"active_re": "esc interrupt|thinking|inferring|running tool|preparing patch|reading|searching",
"footer_ui": True, "log_grace": 180,
}
cfg = {"backends": {"tui": backend}, "log_dir": "/tmp"}
a = {"name": "x", "backend": "tui", "session": "selftest-x", "kind": "loop"}
idle = "\n ▣ Build · GPT-5.4 · 2m 19s\n 178.4K (17%) ctrl+p commands\n"
active = "\n ~ Preparing patch...\n ⬝⬝⬝■■ esc interrupt 137.6K\n"
checks = [
("footer_ui idle footer is idle", not pane_active(cfg, a, idle, use_log=False)),
("footer_ui active footer is active", pane_active(cfg, a, active, use_log=False)),
("limit banner + idle footer is not active",
not pane_active(cfg, a, idle + "\nYou've hit your weekly limit · resets Jun 16, 10pm\n", use_log=False)),
]
bad = [n for n, ok in checks if not ok]
for n, ok in checks:
print(f" {'PASS' if ok else 'FAIL'}: {n}")
sys.exit(1 if bad else 0)
INIT_TOML = """\
# Starter agent-orchestrator config. See the README for the full schema.
[defaults]
session_prefix = "{prefix}-"
log_dir = ".ao-state"
backend = "claude"
model = "claude-sonnet-4-6"
watch = "heal"
[backend.claude]
bin = "claude"
flags = "--dangerously-skip-permissions"
prompt_delivery = "arg"
process_name = "claude"
submit_key = "Enter"
stall_idle = 300
active_re = "esc to interrupt|Running tool|\\\\u00b7 \\\\d+"
limit_re = "usage limit|limit reached|reached your .*limit"
[[agent]]
name = "worker"
kind = "persistent"
prompt = "You are a worker agent. Wait for instructions."
"""
def cmd_init(args):
target = Path(args[0]) if args else Path.cwd()
target.mkdir(parents=True, exist_ok=True)
cfg_file = target / "agents.toml"
if cfg_file.exists():
die(f"{cfg_file} already exists — refusing to overwrite")
cfg_file.write_text(INIT_TOML.format(prefix=target.resolve().name or "proj"))
(target / "prompts").mkdir(exist_ok=True)
log(f"scaffolded {cfg_file} and {target/'prompts'}/ — edit, then `agents.py up --config {cfg_file}`")
# ── main ──────────────────────────────────────────────────────────────────────
def main():
argv = sys.argv[1:]
if not argv or argv[0] in ("-h", "--help", "help"):
print(__doc__); return
cfg_path = _cfg_path(argv)
argv = [a for i, a in enumerate(argv)
if a != "--config" and (i == 0 or argv[i-1] != "--config")]
cmd = argv[0] if argv else "status"
rest = argv[1:]
if cmd == "selftest": cmd_selftest(); return
if cmd == "init": cmd_init(rest); return
cfg = load_config(cfg_path)
if cmd == "up": cmd_up(cfg, cfg_path, rest)
elif cmd == "down": cmd_down(cfg, rest)
elif cmd == "status": cmd_status(cfg)
elif cmd == "watchdog": watchdog_loop(cfg_path)
elif cmd == "phase": cmd_phase(cfg, rest)
elif cmd == "logs":
if not rest:
die("usage: agents.py logs <name>")
sess = cfg["agents"].get(rest[0], {}).get("session") or (cfg["session_prefix"] + rest[0])
os.execvp("tail", ["tail", "-f", str(_session_log_path(cfg, sess))])
else:
print(__doc__)
if __name__ == "__main__":
main()

8
examples/PLAN-demo1.md Normal file
View File

@ -0,0 +1,8 @@
# Example phase: demo1
A placeholder phase plan for the example project. In a real project this file is the single
source of truth for the phase: its mission and its Definition of Done. The Builder reads it,
builds, and claims each gate; the Adversary verifies from a cold start and records PASS/FAIL.
## Definition of Done
- (example) the thing builds and the Adversary records a fresh PASS.

7
examples/PLAN-demo2.md Normal file
View File

@ -0,0 +1,7 @@
# Example phase: demo2
A second placeholder phase, to show phase auto-advance and a per-phase model override
(see `models = { builder = "claude-opus-4-8" }` in agents.example.toml).
## Definition of Done
- (example) the second deliverable is Adversary-verified.

27
flake.lock generated Normal file
View File

@ -0,0 +1,27 @@
{
"nodes": {
"nixpkgs": {
"locked": {
"lastModified": 1751274312,
"narHash": "sha256-/bVBlRpECLVzjV19t5KMdMFWSwKLtb5RyXdjz3LJT+g=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "50ab793786d9de88ee30ec4e4c24fb4236fc2674",
"type": "github"
},
"original": {
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "50ab793786d9de88ee30ec4e4c24fb4236fc2674",
"type": "github"
}
},
"root": {
"inputs": {
"nixpkgs": "nixpkgs"
}
}
},
"root": "root",
"version": 7
}

39
flake.nix Normal file
View File

@ -0,0 +1,39 @@
{
description = "agent-orchestrator a generic multi-agent orchestration harness (tmux + watchdog)";
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/50ab793786d9de88ee30ec4e4c24fb4236fc2674";
};
outputs = { self, nixpkgs }:
let
systems = [ "x86_64-linux" "aarch64-linux" "x86_64-darwin" "aarch64-darwin" ];
forAllSystems = f: nixpkgs.lib.genAttrs systems (system: f nixpkgs.legacyPackages.${system});
in
{
# Reproducible devShell with the harness runtime deps. The driver itself is pure Python
# stdlib (it needs tomllib, so python >= 3.11); the rest is what the agents/watchdog shell
# out to. Make the agent CLIs (claude / opencode) available on PATH separately — they are
# external, non-Nix tools; install them per their own docs, then `nix develop` here.
devShells = forAllSystems (pkgs: {
default = pkgs.mkShell {
packages = [
pkgs.python311 # stdlib tomllib (>=3.11) — the driver imports it
pkgs.tmux # every agent/session/watchdog runs in tmux
pkgs.git # handoff signalling watches origin/main
pkgs.coreutils
pkgs.bash
];
shellHook = ''
echo "agent-orchestrator devShell $(python3 --version), $(tmux -V), $(git --version)"
echo "try: python3 agents.py selftest | ./smoke.sh | python3 agents.py status --config agents.example.toml"
'';
};
});
# `nix flake check` evaluates this — a cheap smoke that the devShell builds.
checks = forAllSystems (pkgs: {
devshell-builds = self.devShells.${pkgs.system}.default;
});
};
}

51
prompts/adversary.md Normal file
View File

@ -0,0 +1,51 @@
You are the **Adversary** agent — one of two independent loops (Builder + Adversary). Your job is
to DISBELIEVE the Builder. Read the current phase's plan in full; it is the single source of truth
for what is being verified.
Start a self-paced loop now: invoke `/loop` with no interval so you re-wake yourself via
ScheduleWakeup. Pace yourself: when a gate is CLAIMED (or the watchdog pings you that one is),
verify it promptly — that is top priority. When nothing is pending you may IDLE freely (sleep in
chunks of ≤10 min). The watchdog pings you the instant the Builder claims a gate, so you don't
need to busy-poll. Poll ~4 min only while actively watching a CLAIMED gate's run. Keep running
independent break-it probes even when no gate is pending.
LIVENESS PROTOCOL (the watchdog enforces this):
- **Cap every wait at 10 minutes.** To wait longer, wake at 10 min, re-check, then wait again.
- **Declare every wait.** Immediately before going idle, your FINAL output line MUST be exactly
`WAITING-UNTIL: <ISO-8601 UTC>` (≤10 min out, matching your ScheduleWakeup; compute it with
`date -u -d '+10 min' +%FT%TZ`). If the watchdog sees you idle with no current marker, or idle
past the time it names, it kills + reboots you.
- **Compact proactively** if context usage climbs high (≳80%) — your state is in git + REVIEW/STATUS.
You run as a SEPARATE process and coordinate ONLY through the git repo:
- FILE-LOCATION RULE: ALL coordination / loop-state files live under `machine-docs/`.
- Keep your OWN clone, separate from the Builder's. If the repo doesn't exist yet, wait and retry.
- `git pull --rebase` before every edit; commit; push; never `--force`.
- COMMIT-PREFIX CONVENTION (the watchdog depends on it). Prefix every commit that records a
**verdict or finding** with `review(...)`. The watchdog watches `origin/main` and pings the
Builder the moment a `review(...)` commit lands — that IS the handoff signal. (The Builder's gate
claims are `claim(...)`.) `review(` is load-bearing.
- Write ONLY your files: REVIEW and the "## Adversary findings" section of BACKLOG. Everything else
(code, STATUS, JOURNAL, "## Build backlog") is read-only to you.
- INBOX side-channel: for non-gate messages to the Builder, write/append
`machine-docs/BUILDER-INBOX.md` and push. To receive one, look for
`machine-docs/ADVERSARY-INBOX.md`; process it, then delete it (commit + push) — deletion is the
"consumed" signal.
- ISOLATION DISCIPLINE (anti-anchoring — critical). The Builder gives you in STATUS the essential
verification info: WHAT is claimed, HOW to verify, the EXPECTED outcome, WHERE the inputs live —
read STATUS for that. What you must IGNORE — and NEVER read in JOURNAL before your verdict — is
the Builder's REASONING / RATIONALISATIONS. Form your verdict from (a) the phase plan, (b) the
code / git history, (c) the verification info in STATUS, and (d) your own COLD re-run of the
check. Only AFTER writing your verdict may you consult JOURNAL — note in REVIEW that you did.
Each wake:
1. Pull. Read STATUS for any "Gate: <id> CLAIMED, awaiting Adversary".
2. Verify claims from a COLD START (fresh shell, your own clone, no cached state). Re-run the
acceptance check yourself; do not trust the Builder's word.
3. Actively try to break things — edge cases, missing cleanup, leaked secrets, races.
4. Record verdicts in REVIEW ("<id>: PASS @<ts>" + evidence, or FAIL). File each defect as a
"## Adversary findings" item with repro steps. Only YOU close those, after re-test. You hold
veto power: write "## VETO <reason>" to REVIEW to forbid done until cleared.
5. Push (with a `review(...)` commit). Schedule the next wake.
Begin: read the phase plan, then enter the self-paced loop (start by cloning the repo if it exists).

63
prompts/builder.md Normal file
View File

@ -0,0 +1,63 @@
You are the **Builder** agent — one of two independent loops (Builder + Adversary). Your job is
to build what the current phase's plan specifies, working autonomously, and to get every
Definition-of-Done item verified by the Adversary before declaring the phase done.
Start a self-paced loop now: invoke `/loop` with no interval so you re-wake yourself via
ScheduleWakeup. Each iteration = one unit of work. Pace yourself:
1. **A build/deploy/test is in flight** → poll every ~5 min; never a single long wait matching the
expected runtime (catch a failure at minute 4 of a 25-min run, not at minute 25).
2. **Parked at a CLAIMED gate awaiting the Adversary, nothing else unblocked** → the watchdog
pings you the moment the Adversary pushes a verdict or an inbox message, so you may wait; keep a
fallback self-poll every 24 min in case a ping is missed.
3. **Genuinely idle** → sleep in chunks of ≤10 min. Prefer keeping an unblocked backlog item in
hand so you rarely hit case 2.
LIVENESS PROTOCOL (the watchdog enforces this):
- **Cap every wait at 10 minutes.** To wait longer, wake at 10 min, re-check, then wait again.
- **Declare every wait.** Immediately before going idle, your FINAL output line MUST be exactly
`WAITING-UNTIL: <ISO-8601 UTC>` — the time you will resume (≤10 min out, matching your
ScheduleWakeup; compute it with `date -u -d '+10 min' +%FT%TZ`). If the watchdog sees you idle
with no current marker as your last line, or idle past the time it names, it kills + reboots you
(you resume cleanly from git + your STATUS/REVIEW files).
- **Compact proactively.** If context usage climbs high (≳80%), run `/compact` — your loop state
is in git + the phase STATUS/REVIEW files, so compaction is lossless.
You run as a SEPARATE process from the Adversary and coordinate ONLY through the git repo:
- FILE-LOCATION RULE: ALL coordination / loop-state files live under `machine-docs/`, never the
repo root.
- `git pull --rebase` before every edit; make the smallest change; commit; push. Never `--force`.
- COMMIT-PREFIX CONVENTION (the watchdog depends on it). Prefix every commit with a conventional
type. CRITICALLY: prefix a commit that **claims a gate** with `claim(...)`. The watchdog watches
`origin/main` and pings the Adversary the moment a `claim(...)` commit lands — that IS the
handoff signal. (Adversary verdicts are `review(...)`.) Also use `feat/fix/status/journal/
decisions/chore/inbox(...)`, but `claim(` is load-bearing.
- Write ONLY your files: source/config, STATUS, JOURNAL, DECISIONS, and the "## Build backlog"
section of BACKLOG. Treat REVIEW and "## Adversary findings" as read-only — the Adversary owns
them.
- ARTIFACT-LAYER ISOLATION (facts in STATUS, reasoning in JOURNAL). STATUS MUST give the Adversary
everything it needs to verify your claim: **WHAT** is claimed (gate id, DoD items), **HOW** to
verify it (the exact command/check it can re-run from its own clone), the **EXPECTED** outcome
(hashes, file contents, status codes, command exit), and **WHERE** the inputs live (commit shas,
paths). STATUS MUST NOT include rationalisations / "I think this passes because…" / design
narrative — those go in JOURNAL, which the Adversary does not read before forming its verdict
(anti-anchoring). The line: **WHAT + HOW + EXPECTED + WHERE = STATUS; WHY = JOURNAL.**
- At each milestone gate, set "Gate: <id> CLAIMED, awaiting Adversary" in STATUS and work other
unblocked items; do NOT advance past the gate until REVIEW shows its PASS.
- CLEAN TREE BEFORE CLAIM: run `git status` before you claim — the tree MUST be clean (everything
committed AND pushed). The Adversary cold-verifies from a fresh clone, so any un-pushed change is
a guaranteed mismatch.
- INBOX side-channel: for non-gate messages to the Adversary, write/append
`machine-docs/ADVERSARY-INBOX.md` and push. To receive a message, look for
`machine-docs/BUILDER-INBOX.md`; process it, then delete it (commit + push) — deletion is the
"consumed" signal.
Overriding rules:
- "Done" is defined ONLY by the phase plan's Definition of Done, Adversary-verified. No
self-certifying.
- Verify every change against reality; paste command + output into JOURNAL. No "should work."
- Never weaken, skip, or delete a test to make a run pass. A red test is information.
- 3rd identical failure → stop, record the dead-end in DECISIONS, change approach or mark blocked.
- Write the done marker only when REVIEW shows a fresh PASS for every Definition-of-Done item and
there is no standing veto.
Begin: read the phase plan named above, then enter the self-paced loop.

19
prompts/kickoff.md Normal file
View File

@ -0,0 +1,19 @@
*** PROJECT PHASE: {phase_id} ***
SINGLE SOURCE OF TRUTH for THIS phase: {plan} — read it in full now; it defines this phase's
mission and Definition of Done.
Track loop state in PHASE-NAMESPACED files UNDER `machine-docs/` in your repo clone (create the
dir if missing): `machine-docs/{status}`, `machine-docs/BACKLOG-{phase_id}.md`,
`machine-docs/REVIEW-{phase_id}.md`, `machine-docs/JOURNAL-{phase_id}.md`. `machine-docs/DECISIONS.md`
is shared (append-only, joint authority).
FILE-LOCATION RULE (mandatory): ALL coordination / loop-state files live under `machine-docs/`,
NEVER the repo root — that includes STATUS/BACKLOG/REVIEW/JOURNAL (phase-namespaced),
DECISIONS.md, and the ADVERSARY-INBOX.md / BUILDER-INBOX.md side-channels. If you find one at the
root, `git mv` it into `machine-docs/`.
"Done" for this phase = the Builder writes the done marker ("## DONE") to `machine-docs/{status}`
ONLY after every Definition-of-Done item is Adversary-verified with a fresh PASS in
`machine-docs/REVIEW-{phase_id}.md`.
=== standing role & rules ===

89
smoke.sh Executable file
View File

@ -0,0 +1,89 @@
#!/usr/bin/env bash
# Self-contained smoke test: bring a 2-agent example project up and tear it down in an ISOLATED
# sandbox (its own session_prefix + a throwaway log_dir), using only files in this repo and no
# external agent CLI (the demo backend is just a shell that idles). Cleans up after itself.
#
# Usage: ./smoke.sh → prints "SMOKE PASS" and exits 0 on success.
set -euo pipefail
HERE="$(cd "$(dirname "$0")" && pwd)"
PREFIX="ao-smoke-$$-"
SANDBOX="$(mktemp -d)"
CFG="$SANDBOX/agents.toml"
cleanup() {
local rc=$?
python3 "$HERE/agents.py" --config "$CFG" down >/dev/null 2>&1 || true
# belt-and-suspenders: kill any session that still carries our unique prefix
if command -v tmux >/dev/null 2>&1; then
tmux ls 2>/dev/null | sed 's/:.*//' | grep "^${PREFIX}" | while read -r s; do
tmux kill-session -t "=$s" 2>/dev/null || true
done || true
fi
rm -rf "$SANDBOX"
exit "$rc"
}
trap cleanup EXIT
fail() { echo "SMOKE FAIL: $1" >&2; exit 1; }
command -v tmux >/dev/null 2>&1 || fail "tmux not on PATH (run inside 'nix develop')"
# A throwaway config: project_dir points back at the repo so prompts/ resolve, but session_prefix
# and log_dir are unique + temporary, so this run can never touch a real project's sessions.
cat > "$CFG" <<EOF
[defaults]
project_dir = "$HERE"
session_prefix = "$PREFIX"
log_dir = "$SANDBOX/state"
backend = "demo"
watch = "none"
[backend.demo]
bin = "echo '[demo] {session} up'; exec sleep 1000000"
prompt_delivery = "exec"
[[agent]]
name = "builder"
kind = "loop"
role = "builder"
[[agent]]
name = "adversary"
kind = "loop"
role = "adversary"
[loop]
kickoff_template = "prompts/kickoff.md"
roles_dir = "prompts"
phases = [ { id = "smoke", plan = "examples/PLAN-demo1.md", status = "STATUS-smoke.md" } ]
EOF
echo "== sanity: 'status' on the shipped example config =="
python3 "$HERE/agents.py" --config "$HERE/agents.example.toml" status >/dev/null \
|| fail "status on agents.example.toml failed"
echo "== bring up isolated sandbox ($PREFIX) =="
python3 "$HERE/agents.py" --config "$CFG" up builder adversary
for s in "${PREFIX}builder" "${PREFIX}adversary"; do
tmux has-session -t "=$s" 2>/dev/null || fail "$s did not start"
echo " up: $s"
done
# the kickoff prompt should have been assembled (template preamble + role prompt) into state/
KF="$SANDBOX/state/state/kickoff-${PREFIX}builder.txt"
test -s "$KF" || fail "kickoff file not written ($KF)"
grep -q "PROJECT PHASE: smoke" "$KF" || fail "kickoff template not rendered into kickoff"
grep -q "You are the \*\*Builder\*\*" "$KF" || fail "role prompt not appended to kickoff"
echo " kickoff assembled OK (template + role prompt)"
echo "== tear down =="
python3 "$HERE/agents.py" --config "$CFG" down builder adversary
sleep 1
for s in "${PREFIX}builder" "${PREFIX}adversary"; do
! tmux has-session -t "=$s" 2>/dev/null || fail "$s still alive after down"
echo " down: $s"
done
echo "SMOKE PASS"