Files
agent-orchestrator/agents.py
autonomic-bot 289ef07df4 feat: agent-orchestrator v0.1.0 — generic multi-agent harness
Extracted and generalized from a project-specific agent launch engine. No project
specifics remain in code: paths, the loop kickoff preamble, handoff conventions, and the
on-complete hook are all config/template driven; session_prefix + log_dir are required.

- agents.py: driver + watchdog (data-driven backends via prompt_delivery arg|ping|exec;
  required session_prefix/log_dir; project-rooted path resolution; configurable kickoff
  template, handoff patterns, on_complete task; tmux-safe; selftest + init verbs)
- agent-log.py: config-driven claude transcript renderer
- agents.example.toml: self-contained 2-agent example (dependency-free demo backend)
- prompts/: generic builder/adversary/kickoff templates
- smoke.sh: isolated up+down sandbox proof that cleans up after itself
- flake.nix/.lock: devShell (python311 + tmux + git)
- README.md: schema + verbs + AI-PO usage + nix

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-13 18:39:00 +00:00

930 lines
40 KiB
Python
Executable File

#!/usr/bin/env python3
"""agent-orchestrator — one driver, one config (agents.toml) for a fleet of agents.

A generic, reusable harness for running and supervising AI-agent sessions in tmux. Every
agent — a Builder/Adversary loop pair, a persistent supervisor, a one-shot task — is declared
in a single TOML config; the watchdog reads the SAME file, so there is no env-vs-file drift.
Nothing about any particular project lives in this code: paths, the loop kickoff preamble, the
handoff conventions, and the on-complete hook are all supplied by the project's config.

Usage:
  agents.py up [name...]            start enabled agents (or just the named ones); use-or-create
  agents.py down [name...]          stop agents (or all)
  agents.py status                  one table: every agent — kind, backend, model, session, phase
  agents.py watchdog                the supervisor loop (reads the config every tick)
  agents.py logs <name>             tail an agent's session log
  agents.py phase [set N|next|show] inspect / move the loop phase
  agents.py selftest                backend activity-detector regression checks (no config needed)
  agents.py init [dir]              scaffold a starter agents.toml + prompts/ in a project dir

Options:
  --config PATH                     config file (default: ./agents.toml, else <script dir>/agents.toml)

Config is authoritative. A one-off override env AGENT_MODEL_<name> / AGENT_BACKEND_<name>
affects a single invocation only; the persisted watchdog always re-reads the file.
"""
import hashlib, json, os, re, shlex, subprocess, sys, time, tomllib
from datetime import datetime, timedelta
from pathlib import Path
# Directory holding this script; _cfg_path falls back to <SCRIPT_DIR>/agents.toml when the
# current working directory has no agents.toml.
SCRIPT_DIR = Path(__file__).resolve().parent
# ── config loading ──────────────────────────────────────────────────────────────
def _cfg_path(argv):
    """Return the config file path selected by the command line.

    An explicit ``--config PATH`` wins. Otherwise ``./agents.toml`` is used when it exists,
    falling back to ``agents.toml`` next to this script.

    Raises:
        SystemExit: ``--config`` is the last argument, i.e. no PATH follows it (previously
            this surfaced as a bare IndexError).
    """
    if "--config" in argv:
        value_pos = argv.index("--config") + 1
        if value_pos >= len(argv):
            raise SystemExit("ERROR: --config requires a PATH argument")
        return Path(argv[value_pos])
    cwd_cfg = Path.cwd() / "agents.toml"
    return cwd_cfg if cwd_cfg.exists() else SCRIPT_DIR / "agents.toml"
def _resolve(base, p):
    """Anchor *p* at *base* unless it is already absolute (``~`` is expanded first)."""
    candidate = Path(os.path.expanduser(str(p)))
    if candidate.is_absolute():
        return candidate
    return Path(base) / candidate
def load_config(path):
    """Load agents.toml and normalise it into the runtime config dict.

    The returned dict has the keys ``watchdog``, ``backends``, ``defaults``, ``loop``,
    ``project_dir``, ``log_dir``, ``session_prefix``, ``agents``, ``services`` and
    ``state_dir``. Relative paths are resolved against the project root. As a side effect
    the state directory (``<log_dir>/state``) is created.

    Exits via die() with a readable message when the file cannot be read or parsed, when a
    required [defaults] key is missing, or when an [[agent]] / [[service]] entry has no
    ``name`` (these used to surface as raw tracebacks).
    """
    path = Path(path)
    try:
        with open(path, "rb") as f:
            raw = tomllib.load(f)
    except OSError as e:
        die(f"config error: cannot read {path}: {e}")
    except tomllib.TOMLDecodeError as e:
        die(f"config error: {path} is not valid TOML: {e}")
    defaults = raw.get("defaults", {})
    # The project root: everything project-supplied (prompts, templates, relative paths) is
    # resolved against it. Defaults to the directory holding the config file; override with
    # defaults.project_dir (useful when the config lives in a sandbox but prompts live elsewhere).
    project_dir = _resolve(path.resolve().parent, defaults.get("project_dir", ".")).resolve()
    session_prefix = defaults.get("session_prefix")
    if not session_prefix:
        die("config error: [defaults].session_prefix is required (e.g. session_prefix = \"myproj-\")")
    log_dir_raw = defaults.get("log_dir")
    if not log_dir_raw:
        die("config error: [defaults].log_dir is required")
    # Every entry is keyed by its name below, so a nameless one is a config error.
    for table in ("agent", "service"):
        for entry in raw.get(table, []):
            if not entry.get("name"):
                die(f"config error: every [[{table}]] entry needs a name")
    cfg = {
        "watchdog": raw.get("watchdog", {}),
        "backends": raw.get("backend", {}),
        "defaults": defaults,
        "loop": raw.get("loop", {}),
        "project_dir": str(project_dir),
        "log_dir": str(_resolve(project_dir, log_dir_raw)),
        "session_prefix": session_prefix,
    }
    agents = {}
    for a in raw.get("agent", []):
        m = {**defaults, **a}
        m["session"] = a.get("session", session_prefix + a["name"])
        m["kind"] = a.get("kind", "persistent")
        m["dir"] = str(_resolve(project_dir, a.get("dir", defaults.get("dir", "."))))
        # one-off env override (single invocation; watchdog ignores via fresh load each tick)
        env_model = os.environ.get(f"AGENT_MODEL_{a['name']}")
        env_backend = os.environ.get(f"AGENT_BACKEND_{a['name']}")
        if env_model:
            m["model"] = env_model
        if env_backend:
            m["backend"] = env_backend
        agents[a["name"]] = m
    cfg["agents"] = agents
    cfg["services"] = {s["name"]: {**defaults, **s,
                                   "session": session_prefix + s["name"],
                                   "dir": str(_resolve(project_dir, s.get("dir", ".")))}
                       for s in raw.get("service", [])}
    cfg["state_dir"] = os.path.join(cfg["log_dir"], "state")
    Path(cfg["state_dir"]).mkdir(parents=True, exist_ok=True)
    return cfg
def backend_of(cfg, agent):
    """Return the [backend.<name>] table the agent is configured to use.

    Exits via die() when the agent names no backend at all (neither in its own table nor via
    [defaults]) or names one that is not declared in the config.
    """
    name = agent.get("backend")
    if not name:
        die(f"agent {agent['name']}: no backend configured")
    b = cfg["backends"].get(name)
    if not b:
        die(f"agent {agent['name']}: unknown backend {name!r}")
    return b
# ── logging ───────────────────────────────────────────────────────────────────
def log(msg):
    """Print *msg* to stdout behind an ``[agents HH:MM:SS]`` prefix, flushing immediately."""
    stamp = datetime.now().strftime("%H:%M:%S")
    print(f"[agents {stamp}] {msg}", flush=True)
def die(msg):
    """Log *msg* as an error, then terminate the process with exit status 1."""
    log(f"ERROR: {msg}")
    raise SystemExit(1)
# ── tmux helpers ────────────────────────────────────────────────────────────────
# ALWAYS target sessions with an exact-match "=" prefix. tmux does prefix/fnmatch on bare
# targets, so "-t myproj-assistant" would match "myproj-assistant3" — capturing or killing the
# wrong session. "=name" forces an exact match.
def _run(cmd):
    """Run *cmd* capturing text output; a missing executable (e.g. tmux not installed)
    yields a result with exit status 127 instead of an exception."""
    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except FileNotFoundError:
        result = subprocess.CompletedProcess(cmd, 127, "", "")
    return result
def TS(name):
    """Exact target-SESSION spec for tmux (used with has-session / kill-session)."""
    return f"={name}"
def TP(name):
    """Exact target-PANE spec: "=session:" anchors the exact session, current window/pane."""
    return f"={name}:"
def session_alive(name):
    """True when `tmux has-session` succeeds for the session named exactly *name*."""
    probe = _run(["tmux", "has-session", "-t", TS(name)])
    return probe.returncode == 0
def session_command(name):
    """Return the session pane's #{pane_current_command}, or "" when tmux reports failure."""
    result = _run(["tmux", "display-message", "-p", "-t", TP(name), "#{pane_current_command}"])
    if result.returncode != 0:
        return ""
    return result.stdout.strip()
def kill_session(name):
    """Ask tmux to kill the session named exactly *name*; the outcome is not checked."""
    tmux_cmd = ["tmux", "kill-session", "-t", TS(name)]
    _run(tmux_cmd)
def capture_pane(name, lines=40):
    """Return the last *lines* lines of the session's captured pane text ("" on failure)."""
    result = _run(["tmux", "capture-pane", "-p", "-t", TP(name)])
    if result.returncode != 0:
        return ""
    tail = result.stdout.splitlines()[-lines:]
    return "\n".join(tail)
def pipe_to_log(session, log_path):
    """Append the session pane's output to *log_path* via `tmux pipe-pane -o`.

    The path is shell-quoted with shlex.quote because tmux hands the command to a shell;
    the previous hand-written single quotes broke on paths containing a quote character.
    """
    sink = f"cat >> {shlex.quote(str(log_path))}"
    _run(["tmux", "pipe-pane", "-o", "-t", TP(session), sink])
def new_session(session, cwd, cmd, log_path):
    """Create a detached tmux session running *cmd* in *cwd* and pipe its pane to *log_path*.

    *cwd* is created if missing. When tmux fails to create the session (including tmux not
    being installed, which _run reports as exit status 127) the failure is logged and the
    pipe-pane step is skipped, instead of being silently ignored.
    """
    Path(cwd).mkdir(parents=True, exist_ok=True)
    created = _run(["tmux", "new-session", "-d", "-s", session, "-c", cwd, cmd])
    if created.returncode != 0:
        detail = created.stderr.strip() or f"exit status {created.returncode}"
        log(f"ERROR: could not start tmux session {session}: {detail}")
        return
    pipe_to_log(session, log_path)
def ping_session(session, msg, submit_key="Enter"):
    """Type *msg* into a live session, then press *submit_key* until it is accepted.

    Does nothing when the session is not alive. After typing the text literally, the submit
    key is sent up to 10 times, one second apart, stopping as soon as the first 28
    characters of the message no longer appear in the last 20 pane lines.
    """
    if not session_alive(session):
        return
    target = TP(session)
    head = msg[:28]
    _run(["tmux", "send-keys", "-t", target, "-l", "--", msg])
    time.sleep(0.5)
    attempts = 0
    while attempts < 10:
        _run(["tmux", "send-keys", "-t", target, submit_key])
        time.sleep(1)
        if head not in capture_pane(session, 20):
            break
        attempts += 1
# ── activity / limit / fatal detection (per-backend regexes from config) ─────────
def _re(backend, key):
    """Compile backend[key] as a case-insensitive regex; None when the key is unset/empty."""
    pattern = backend.get(key)
    if not pattern:
        return None
    return re.compile(pattern, re.I)
def _session_log_path(cfg, session):
    """Path of the session's log file: <log_dir>/<session>.log."""
    return Path(cfg["log_dir"]).joinpath(f"{session}.log")
def _log_recently_touched(cfg, session, age_seconds):
    """True when the session log was modified within the last *age_seconds* seconds.

    A missing log file counts as not recently touched.
    """
    log_file = _session_log_path(cfg, session)
    try:
        modified = log_file.stat().st_mtime
    except FileNotFoundError:
        return False
    return (time.time() - modified) <= age_seconds
def pane_active(cfg, agent, pane, *, use_log=True):
    """Report whether *pane* shows the agent working, per its backend's ``active_re``.

    Plain backends: the regex is searched over the whole pane text. ``footer_ui`` backends
    (a TUI whose static footer lingers after a turn): only the bottom 10 rows are searched,
    and with *use_log* a session log touched within ``log_grace`` seconds (default 180)
    also counts as activity.
    """
    backend = backend_of(cfg, agent)
    active = _re(backend, "active_re")
    if not backend.get("footer_ui"):
        return bool(active and active.search(pane))
    footer = "\n".join(pane.splitlines()[-10:])
    hit = bool(active and active.search(footer))
    grace = int(backend.get("log_grace", 180))
    return hit or (use_log and _log_recently_touched(cfg, agent["session"], grace))
# ── prompt assembly ───────────────────────────────────────────────────────────
# Matches (case-insensitively) a line that begins with placeholder text such as "not yet",
# "pending", "tbd", "n/a" or "<...>". phase_done uses it to ignore a done marker whose first
# following non-blank line is still such a placeholder.
DONE_PLACEHOLDER_RE = re.compile(
    r"^\s*(not yet|not done|not complete|incomplete|pending\b|tbd\b|n/?a\b|"
    r"written here only|only when|to be (written|filled)|when all|<.*>)", re.I)
def phases(cfg):
    """Return the configured [loop].phases list (empty when none are declared)."""
    loop_cfg = cfg["loop"]
    return loop_cfg.get("phases", [])
def phase_idx_file(cfg):
    """Path (str) of the file persisting the current phase index inside the state dir.

    The file name is [loop].state_file, defaulting to "phase-idx".
    """
    state_dir = cfg["state_dir"]
    file_name = cfg["loop"].get("state_file", "phase-idx")
    return os.path.join(state_dir, file_name)
def cur_idx(cfg):
    """Return the persisted loop phase index, always safe to use as ``phases(cfg)[idx]``.

    A missing or unparsable state file counts as phase 0. Negative values are raised to 0,
    and when phases are configured the value is capped at the last phase, so a stale index
    (e.g. phases were removed from the config) can no longer raise IndexError in callers or
    silently wrap around through negative indexing. Content such as "--5", which passed the
    old digit check but crashed int(), now also falls back to 0.
    """
    try:
        raw = Path(phase_idx_file(cfg)).read_text().strip()
    except FileNotFoundError:
        return 0
    try:
        idx = int(raw)
    except ValueError:
        idx = 0
    idx = max(idx, 0)
    count = len(phases(cfg))
    return min(idx, count - 1) if count else idx
def cur_phase(cfg):
    """Return the phase table at the current index, or {} when no phases are configured."""
    configured = phases(cfg)
    if not configured:
        return {}
    return configured[cur_idx(cfg)]
def _state_subdir(cfg):
    """Sub-directory holding handoff state files: [loop.handoff].state_subdir, defaulting
    to "machine-docs" (also when no handoff table is configured)."""
    handoff = cfg["loop"].get("handoff") or {}
    return handoff.get("state_subdir", "machine-docs")
def handoff_repo(cfg):
    """Return the handoff repository directory as a string.

    Uses [loop.handoff].repo resolved against the project dir when set, else the project
    dir itself.
    """
    repo = (cfg["loop"].get("handoff") or {}).get("repo")
    if not repo:
        return cfg["project_dir"]
    return str(_resolve(cfg["project_dir"], repo))
def resolve_state_file(cfg, repo_dir, basename):
    """Locate a state file: <repo>/<state_subdir>/<basename> when that exists, otherwise
    <repo>/<basename> (returned whether or not it exists)."""
    root = Path(repo_dir)
    preferred = root / _state_subdir(cfg) / basename
    if preferred.exists():
        return preferred
    return root / basename
def phase_done(cfg, status_basename):
    """True when the phase's status file carries a genuine done marker.

    A line starting with [loop].done_marker (default "## DONE") counts only if the first
    non-blank line after it does not look like placeholder text (DONE_PLACEHOLDER_RE).
    A missing status file means not done.
    """
    status_file = resolve_state_file(cfg, handoff_repo(cfg), status_basename)
    try:
        lines = status_file.read_text().splitlines()
    except FileNotFoundError:
        return False
    marker = cfg["loop"].get("done_marker", "## DONE")
    for following, line in enumerate(lines, start=1):
        if not line.startswith(marker):
            continue
        # first non-blank line after the marker ("" when the marker is the last content)
        body = next((later for later in lines[following:] if later.strip()), "")
        if not DONE_PLACEHOLDER_RE.match(body):
            return True
    return False
def role_model(cfg, agent):
    """Resolve the agent's model: the current phase's models[role] override when the agent
    has a role and the override is set, otherwise the agent's own model ("" if none)."""
    role = agent.get("role")
    override = (cur_phase(cfg).get("models") or {}).get(role) if role else None
    return override or agent.get("model", "")
def _render_template(text, fields):
    """Replace each ``{key}`` in *text* with ``str(fields[key])``.

    Substitution is sequential in the dict's iteration order, so a value that itself
    contains a later key's placeholder gets expanded too. Unknown placeholders are left
    untouched.
    """
    rendered = text
    for key in fields:
        rendered = rendered.replace("{" + key + "}", str(fields[key]))
    return rendered
def build_loop_kickoff(cfg, agent):
    """Assemble a loop agent's kickoff prompt.

    The result is the project's kickoff template ([loop].kickoff_template, with the
    {phase_id}, {plan}, {status} and {role} slots filled from the current phase) followed by
    the role prompt ``<roles_dir>/<role>.md`` (roles_dir defaults to "prompts"). Both files
    are project-supplied; this code holds no project text.

    Exits via die() when the agent declares no ``role``, since the role prompt file cannot
    be chosen without one (previously a bare KeyError).
    """
    role = agent.get("role")
    if not role:
        die(f"agent {agent.get('name', '?')}: kind=loop requires a role (selects the role prompt file)")
    ph = cur_phase(cfg)
    fields = {
        "phase_id": ph.get("id", ""),
        "plan": ph.get("plan", ""),
        "status": ph.get("status", ""),
        "role": role,
    }
    pdir = Path(cfg["project_dir"])
    preamble = ""
    tmpl = cfg["loop"].get("kickoff_template")
    if tmpl:
        preamble = _render_template(_resolve(pdir, tmpl).read_text(), fields)
    roles_dir = cfg["loop"].get("roles_dir", "prompts")
    role_prompt = (_resolve(pdir, roles_dir) / f"{role}.md").read_text()
    return preamble + role_prompt
def agent_prompt(cfg, agent):
    """Return the agent's kickoff prompt text.

    Loop agents get the assembled loop kickoff; otherwise the contents of ``prompt_file``
    (resolved against the project dir) when set, else the inline ``prompt`` ("" if absent).
    """
    root = Path(cfg["project_dir"])
    if agent["kind"] == "loop":
        return build_loop_kickoff(cfg, agent)
    prompt_file = agent.get("prompt_file")
    if not prompt_file:
        return agent.get("prompt", "")
    return _resolve(root, prompt_file).read_text()
# ── resume id ───────────────────────────────────────────────────────────────────
def resume_id(cfg, agent):
    """Return the id stored in <state_dir>/<agent name>.id, or None when the file is missing
    or holds only whitespace."""
    id_file = Path(cfg["state_dir"]) / f"{agent['name']}.id"
    if not id_file.exists():
        return None
    saved = id_file.read_text().strip()
    return saved or None
# ── agent launch ────────────────────────────────────────────────────────────────
def _expected_proc(backend):
    """Process name a healthy session of this backend should be running, or None.

    Only backends declaring ``process_name`` take part in backend-mismatch healing, so a
    generic exec backend (or the transient login shell during startup) is never mistaken
    for a mismatch.
    """
    return backend.get("process_name", None)
def start_agent(cfg, agent, *, force=False):
    """Launch *agent* in its tmux session using its backend's ``prompt_delivery`` mode.

    An already-running session is left alone unless *force* is set, in which case it is
    killed first. The kickoff prompt is written to ``<state_dir>/kickoff-<session>.txt`` and
    delivered one of three ways:

    * ``ping`` — start the TUI, wait ``connect_delay`` seconds, then type a message telling
      the agent to read the kickoff file.
    * ``exec`` — run ``backend.bin`` as a command template with {kickoff}, {session} and
      {model} filled in.
    * ``arg`` (default) — pass the prompt as the last CLI argument via ``$(cat <file>)``.

    The kickoff path in the ``arg`` command is shell-quoted with shlex.quote; the previous
    hand-written single quotes broke when the state directory contained a quote character.
    """
    session = agent["session"]
    if session_alive(session):
        if not force:
            log(f"{session} already running — leaving it")
            return
        kill_session(session)
    backend = backend_of(cfg, agent)
    model = role_model(cfg, agent)
    prompt = agent_prompt(cfg, agent)
    log_path = str(_session_log_path(cfg, session))
    kf = Path(cfg["state_dir"]) / f"kickoff-{session}.txt"
    kf.write_text(prompt)
    cwd = agent.get("dir") or cfg["project_dir"]
    # phase id is only meaningful for loop agents; shown as "-" otherwise
    pid = cur_phase(cfg).get("id", "-") if agent["kind"] == "loop" else "-"
    delivery = backend.get("prompt_delivery", "arg")
    if delivery == "ping":
        # TUI backend: launch, wait for it to connect, then type the prompt in.
        model_env = (f"OPENCODE_CONFIG_CONTENT={shlex.quote(json.dumps({'model': model}))} "
                     if model and backend.get("model_env") else "")
        preamble = backend.get("preamble", "")
        sep = "; " if preamble else ""
        attach = _render_template(backend.get("attach", "{bin}"),
                                  {"bin": backend["bin"], "server": backend.get("server", ""),
                                   "dir": shlex.quote(cwd)})
        cmd = f"{preamble}{sep}{model_env}{attach}"
        log(f"starting {session} ({agent['backend']}, kind={agent['kind']}, phase={pid}, "
            f"model={model or 'default'})")
        new_session(session, cwd, cmd, log_path)
        time.sleep(int(backend.get("connect_delay", 12)))
        boot = (f"Your full kickoff prompt is in {kf} — read it now with: "
                f"`cat '{kf}'` — then follow it exactly.")
        ping_session(session, boot, submit_key=backend.get("submit_key", "C-m"))
    elif delivery == "exec":
        # Generic backend: run an arbitrary command. {kickoff} = path to the prompt file,
        # {session} = the tmux session name, {model} = resolved model.
        cmd = _render_template(backend["bin"],
                               {"kickoff": str(kf), "session": session, "model": model})
        log(f"starting {session} ({agent['backend']}, kind={agent['kind']}, phase={pid})")
        new_session(session, cwd, cmd, log_path)
    else:  # "arg": prompt passed as a CLI argument (claude-style)
        rid = resume_id(cfg, agent) if agent.get("resume") and backend.get("supports_resume") else None
        parts = [backend["bin"]]
        if rid:
            parts.append(_render_template(backend.get("resume_flag", "--resume '{id}'"), {"id": rid}))
        if backend.get("remote_control"):
            parts.append(_render_template(backend.get("remote_control_flag",
                                                      "--remote-control '{session}'"), {"session": session}))
        if model:
            parts.append(_render_template(backend.get("model_flag", "--model '{model}'"), {"model": model}))
        if backend.get("flags"):
            parts.append(backend["flags"])
        # The shell expands $(cat ...) inside double quotes, so the whole prompt arrives as
        # one argument.
        parts.append(f"\"$(cat {shlex.quote(str(kf))})\"")
        cmd = " ".join(p for p in parts if p)
        log(f"starting {session} ({agent['backend']}, kind={agent['kind']}, phase={pid}, "
            f"model={model or 'default'}{', resume' if rid else ''})")
        new_session(session, cwd, cmd, log_path)
def start_service(cfg, svc):
    """Start a service's tmux session running its ``command``, unless it is already up.

    The working directory is the service's ``dir``, falling back to the project dir.
    """
    session = svc["session"]
    if session_alive(session):
        log(f"{session} already running — leaving it")
        return
    log(f"starting service {session}")
    workdir = svc.get("dir", cfg["project_dir"])
    command = svc["command"]
    log_file = str(_session_log_path(cfg, session))
    new_session(session, workdir, command, log_file)
# ── usage-limit state machine ────────────────────────────────────────────────────
# Matches "reset"/"resets", an optional "at", then an hour with optional ":MM" and optional
# am/pm (case-insensitive). _parse_reset_epoch reads the LAST match found in the pane.
RESET_RE = re.compile(r"resets?\s*(?:at\s*)?(\d{1,2})(?::(\d{2}))?\s*(am|pm)?", re.I)
def _limit_state_path(cfg, session):
    """Path of the session's persisted usage-limit state: <state_dir>/limited-<session>.json."""
    return Path(cfg["state_dir"]).joinpath(f"limited-{session}.json")
def _load_limit_state(cfg, session):
    """Return the session's persisted limit state dict, or None when there is none.

    A missing, unreadable or corrupt state file is treated as "no state". So is valid JSON
    that is not an object, because callers use ``state.get(...)`` and would otherwise crash.
    Only I/O and decoding errors are swallowed, rather than every Exception.
    """
    try:
        state = json.loads(_limit_state_path(cfg, session).read_text())
    except (OSError, ValueError):  # ValueError covers JSONDecodeError and bad encodings
        return None
    return state if isinstance(state, dict) else None
def _save_limit_state(cfg, session, st):
    """Persist *st* as JSON in the session's limit-state file, replacing its contents."""
    payload = json.dumps(st)
    _limit_state_path(cfg, session).write_text(payload)
def _clear_limit_state(cfg, session):
    """Delete the session's limit-state file; a file that is already absent is fine."""
    _limit_state_path(cfg, session).unlink(missing_ok=True)
def _parse_reset_epoch(pane):
    """Turn the last "resets HH[:MM][am|pm]" mention in *pane* into an epoch timestamp.

    The time is read as local wall-clock time today, rolled to tomorrow when it is not in
    the future. Returns None when nothing matches or the hour/minute is out of range.
    """
    last = None
    for last in RESET_RE.finditer(pane):
        pass  # keep only the final match
    if last is None:
        return None
    try:
        hour = int(last.group(1))
        minute = int(last.group(2) or 0)
        meridiem = (last.group(3) or "").lower()
        if meridiem == "pm" and hour != 12:
            hour += 12
        elif meridiem == "am" and hour == 12:
            hour = 0
        if hour > 23 or minute > 59:
            return None
        cand = datetime.now().replace(hour=hour, minute=minute, second=0, microsecond=0)
        if cand.timestamp() <= time.time():
            cand += timedelta(days=1)
        return cand.timestamp()
    except Exception:
        return None
def _next_limit_until(cfg, pane, now):
    """Decide when to probe a limited agent next; returns ``(epoch, parsed_from_banner)``.

    When the pane states a reset time at most 6 hours ahead, the result is that time plus
    [watchdog].limit_reset_slack (default 45 s) and True. Otherwise it is *now* plus
    [watchdog].limit_probe_fallback (default 300 s) and False.
    """
    watchdog_cfg = cfg["watchdog"]
    fallback = int(watchdog_cfg.get("limit_probe_fallback", 300))
    slack = int(watchdog_cfg.get("limit_reset_slack", 45))
    parsed = _parse_reset_epoch(pane)
    if parsed is None or parsed - now > 6 * 3600:
        return now + fallback, False
    return parsed + slack, True
def _limit_nudge_msg(kind):
    """Return the resume-probe text typed into a limited agent, worded for its *kind*:
    one message for persistent/orchestrator agents, another for everything else (loops)."""
    if kind not in ("persistent", "orchestrator"):
        return ("watchdog probe: if the quota window has reset, RESUME your loop now — pull latest, "
                "re-read your phase STATUS/REVIEW files, and continue; re-arm your loop pacing.")
    return ("watchdog probe: if the quota window has reset, RESUME now — re-check status "
            "and continue from where you stopped.")
def limit_tick(cfg, agent, pane):
    """True while the agent is inside a usage-limit window — callers suppress all healing.

    Drives a per-session state machine persisted as {"until": epoch, "nudges": count} in the
    session's limit-state file:

    * no state yet: enter the limited state when the backend's ``limit_re`` matches the pane
      and the pane is not active; otherwise return False.
    * state present: clear it (return False) once the pane is active or the limit banner is
      gone; hold (return True) until ``until``; after that, nudge the agent to resume and
      schedule the next probe.
    """
    session = agent["session"]
    backend = backend_of(cfg, agent)
    limit_re = _re(backend, "limit_re")
    submit = backend.get("submit_key", "Enter")
    fallback = int(cfg["watchdog"].get("limit_probe_fallback", 300))
    state = _load_limit_state(cfg, session)
    limited_now = bool(limit_re and limit_re.search(pane))
    if state is None:
        # Not held yet: only a limit banner on an inactive pane starts a hold.
        if not limited_now or pane_active(cfg, agent, pane, use_log=False):
            return False
        now = time.time()
        until, parsed = _next_limit_until(cfg, pane, now)
        if parsed:
            log(f"limit hit on {agent['name']} — banner says reset "
                f"{datetime.fromtimestamp(until):%a %H:%M}; holding (no reboots)")
        else:
            log(f"limit hit on {agent['name']} — reset unparsable; flat "
                f"{fallback//60}-min probe loop (no reboots)")
        _save_limit_state(cfg, session, {"until": until, "nudges": 0})
        return True
    # Already held: activity, or the banner disappearing, ends the hold.
    if pane_active(cfg, agent, pane, use_log=False) or not limited_now:
        log(f"limit lifted on {agent['name']} — clearing limit state")
        _clear_limit_state(cfg, session)
        return False
    now = time.time()
    if now < state.get("until", 0):
        return True
    msg = _limit_nudge_msg(agent["kind"])
    # A previous nudge is still visible in the last 8 pane lines — do not type it again.
    if msg[:28] in "\n".join(pane.splitlines()[-8:]):
        return True
    nudges = state.get("nudges", 0) + 1
    log(f"limit probe #{nudges} on {agent['name']} — nudging to resume")
    ping_session(session, msg, submit_key=submit)
    time.sleep(3)
    pane2 = capture_pane(session, 40)
    if pane_active(cfg, agent, pane2, use_log=False) and not (limit_re and limit_re.search(pane2)):
        log(f"limit lifted on {agent['name']} — probe resumed it")
        _clear_limit_state(cfg, session)
        # still True: the caller skips healing for this tick even though the hold is over
        return True
    # Still limited: schedule the next probe from the freshly captured pane.
    until, _ = _next_limit_until(cfg, pane2, now)
    if nudges == 3:
        log(f"WARNING: {agent['name']} still limited after {nudges} probes — flat probes; never rebooting")
    _save_limit_state(cfg, session, {"until": until, "nudges": nudges})
    return True
# ── stall detection ──────────────────────────────────────────────────────────────
# session name -> epoch seconds at which stall_check_one first saw the session idle.
# 0.0 (or a missing key) means "not currently idle"; stall_check_one resets the entry to 0.0
# whenever the session is gone, limited, active, or has just been rebooted.
_idle_since: dict[str, float] = {}
def _last_nonempty_line(text):
    """Return the last line of *text* that is not blank, stripped; "" when there is none."""
    stripped = (line.strip() for line in reversed(text.splitlines()))
    return next((line for line in stripped if line), "")
def _parse_waiting_until(cfg, agent, pane):
    """Extract the ``WAITING-UNTIL: <iso timestamp>`` marker from *pane* as an epoch, or None.

    footer_ui backends honour the marker only when it starts the last non-blank pane line;
    other backends accept the first occurrence anywhere in the pane. A trailing "Z" is read
    as UTC. Unparsable timestamps give None.
    """
    if backend_of(cfg, agent).get("footer_ui"):
        haystack = _last_nonempty_line(pane)
        if not haystack.startswith("WAITING-UNTIL:"):
            return None
    else:
        haystack = pane
    found = re.search(r"WAITING-UNTIL:\s*(\S+)", haystack)
    if not found:
        return None
    stamp = found.group(1).replace("Z", "+00:00")
    try:
        return datetime.fromisoformat(stamp).timestamp()
    except Exception:
        return None
def stall_check_one(cfg, agent):
    """Reboot the agent when it has been idle too long.

    Nothing happens (and the idle clock resets) while the session is gone, inside a
    usage-limit window, or active. An idle agent is rebooted when it is past its
    WAITING-UNTIL marker by more than [watchdog].stall_grace seconds, or — with no marker —
    once it has been idle for the backend's ``stall_idle`` seconds.
    """
    session = agent["session"]
    if not session_alive(session):
        _idle_since[session] = 0.0
        _clear_limit_state(cfg, session)
        return
    now = time.time()
    pane = capture_pane(session, 40)
    # limit_tick runs first (it has side effects); pane_active only when not limited
    if limit_tick(cfg, agent, pane) or pane_active(cfg, agent, pane):
        _idle_since[session] = 0.0
        return
    # 0.0 / missing means the idle period starts now
    idle_start = _idle_since.get(session) or now
    _idle_since[session] = idle_start
    idle = now - idle_start
    grace = int(cfg["watchdog"].get("stall_grace", 180))
    until = _parse_waiting_until(cfg, agent, pane)
    if until is None:
        stall_idle = int(backend_of(cfg, agent).get("stall_idle", 300))
        if idle < stall_idle:
            return
        reason = f"idle {int(idle)}s with no WAITING-UNTIL marker"
    else:
        if now <= until + grace:
            return
        reason = f"past its WAITING-UNTIL by {int(now-until)}s — self-wake did not fire"
    log(f"stall: {agent['name']} ({session}) {reason} — kill + reboot")
    start_agent(cfg, agent, force=True)
    _idle_since[session] = 0.0
# ── healing ──────────────────────────────────────────────────────────────────────
def backend_mismatch(cfg, agent):
    """True when the session is running the process of a DIFFERENT declared backend.

    Backends without ``process_name`` never mismatch. A command that is not any declared
    backend's process (e.g. a transient login shell during startup) is not a mismatch.
    """
    expected = _expected_proc(backend_of(cfg, agent))
    if not expected:
        return False
    running = session_command(agent["session"])
    declared = set()
    for backend in cfg["backends"].values():
        proc = _expected_proc(backend)
        if proc:
            declared.add(proc)
    return running in declared and running != expected
def heal_one(cfg, agent):
    """Heavy-tick healing for one agent.

    Restarts a session that is gone, force-restarts one running another backend's process,
    and force-restarts one whose pane matches the backend's ``fatal_re`` — unless the pane
    is active or the agent is inside a usage-limit window.
    """
    session = agent["session"]
    backend = backend_of(cfg, agent)
    if not session_alive(session):
        log(f"{agent['name']} ({session}) gone — restarting")
        start_agent(cfg, agent)
        return
    if backend_mismatch(cfg, agent):
        log(f"{agent['name']} ({session}) is {session_command(session)!r}, expected "
            f"{_expected_proc(backend)} — kill + restart")
        start_agent(cfg, agent, force=True)
        return
    pane = capture_pane(session, 25)
    # activity is checked before the limit state machine, exactly in this order
    if pane_active(cfg, agent, pane) or limit_tick(cfg, agent, pane):
        return
    fatal = _re(backend, "fatal_re")
    if fatal is not None and fatal.search(pane):
        log(f"FATAL session-state error on {agent['name']} ({session}) — kill + restart")
        start_agent(cfg, agent, force=True)
def wake_agent(cfg, agent):
    """Type the agent's scheduled wake prompt into its session.

    Returns True when the wake landed or is moot (no wake config, prompt file missing or
    empty), and False when it should be retried next tick (session down, or pane busy).
    The prompt file's whitespace is collapsed so the message is a single line.
    """
    wake = agent.get("wake")
    if not wake:
        return True
    session = agent["session"]
    if not session_alive(session):
        return False
    backend = backend_of(cfg, agent)
    if pane_active(cfg, agent, capture_pane(session, 25)):
        return False
    pf = wake.get("prompt_file")
    try:
        text = _resolve(cfg["project_dir"], pf).read_text()
    except (FileNotFoundError, TypeError):
        log(f"wake skipped for {agent['name']} — prompt file missing: {pf}")
        return True
    msg = " ".join(text.split())
    if not msg:
        return True
    log(f"waking {agent['name']} ({session}) for scheduled supervision pass")
    ping_session(session, msg, submit_key=backend.get("submit_key", "Enter"))
    return True
# ── handoff signalling (loop pair) ───────────────────────────────────────────────
# Handoff state remembered between watchdog ticks (see handoff_check): "sha" is the last
# seen origin/main head; "adv_inbox" / "builder_inbox" hold the md5 of the last seen inbox
# file contents ("" when the file is absent or nothing has been seen yet).
_hand = {"sha": "", "adv_inbox": "", "builder_inbox": ""}
def handoff_reset():
    """Forget all remembered handoff state (head sha and both inbox digests)."""
    for key in ("sha", "adv_inbox", "builder_inbox"):
        _hand[key] = ""
def _git(repo, args):
    """Run ``git -C <repo> <args>`` and return the CompletedProcess (text output captured).

    *args* is a command-line fragment, split with shlex so existing callers that quote an
    argument (``show 'origin/main:path'``) keep working. git is invoked directly from an
    argv list, not through a shell: the old ``shell=True`` form relied on Python's repr()
    for quoting, which is not shell quoting and mishandled paths containing quotes, ``$``
    or backslashes. A missing git binary yields exit status 127 rather than an exception,
    matching what the shell form reported.
    """
    cmd = ["git", "-C", str(repo), *shlex.split(args)]
    try:
        return subprocess.run(cmd, capture_output=True, text=True)
    except FileNotFoundError:
        return subprocess.CompletedProcess(cmd, 127, "", "")
def _show_pushed(cfg, repo, path):
    """Return the contents of *path* as pushed on origin/main, or "" when it is not there.

    The file is looked up under the handoff state sub-directory first, then at the
    repository root.
    """
    sub = _state_subdir(cfg)
    candidates = [f"origin/main:{sub}/{path}", f"origin/main:{path}"]
    for ref in candidates:
        shown = _git(repo, f"show {ref!r}")
        if shown.returncode == 0:
            return shown.stdout
    return ""
def handoff_check(cfg):
    """Signal the loop pair when the other side pushed something to origin/main.

    Does nothing without a [loop.handoff] table. Otherwise it fetches origin and:

    * when origin/main moved since the last tick, pings the reviewer (``claim_pings``,
      default "adversary") if a new commit subject matches ``claim_pattern``, and the
      builder (``review_pings``, default "builder") if one matches ``review_pattern``;
    * when a configured inbox file's pushed contents changed, pings that inbox's target.

    Each ping is submitted with the submit key of the TARGET agent's backend. Previously
    the builder's key was used for every ping, which is wrong when the reviewer runs on a
    backend with a different submit key.
    """
    h = cfg["loop"].get("handoff")
    if not h:
        return
    repo = handoff_repo(cfg)
    reviewer = h.get("claim_pings", "adversary")
    builder = h.get("review_pings", "builder")

    def session_of(name):
        # configured session name, else the conventional <prefix><name>
        return cfg["agents"].get(name, {}).get("session", cfg["session_prefix"] + name)

    def submit_of(name):
        # submit key of the target's own backend; "Enter" for names not in the config
        target_agent = cfg["agents"].get(name)
        if not target_agent:
            return "Enter"
        return backend_of(cfg, target_agent).get("submit_key", "Enter")

    claim_pat = h.get("claim_pattern", "^claim")
    review_pat = h.get("review_pattern", "^review")
    _git(repo, "fetch -q origin")
    head = _git(repo, "rev-parse origin/main").stdout.strip()
    if head:
        if not _hand["sha"]:
            # first sighting: remember the head without pinging anyone
            _hand["sha"] = head
        elif head != _hand["sha"]:
            subjects = _git(repo, f"log --format=%s {_hand['sha']}..origin/main").stdout
            if re.search(claim_pat, subjects, re.M | re.I):
                log("handoff: claim commit → pinging reviewer")
                ping_session(session_of(reviewer),
                             "watchdog ping: the other loop pushed a gate CLAIM commit. "
                             "Pull and verify the claimed gate now.",
                             submit_key=submit_of(reviewer))
            if re.search(review_pat, subjects, re.M | re.I):
                log("handoff: review commit → pinging builder")
                ping_session(session_of(builder),
                             "watchdog ping: the other loop pushed a verdict/finding commit. "
                             "Pull the review file and act.",
                             submit_key=submit_of(builder))
            _hand["sha"] = head
    inboxes = h.get("inboxes", [])
    sub_dir = _state_subdir(cfg)
    # inboxes[0] is addressed to the reviewer, inboxes[1] to the builder
    for fname, key, target in (
        (inboxes[0] if len(inboxes) > 0 else None, "adv_inbox", reviewer),
        (inboxes[1] if len(inboxes) > 1 else None, "builder_inbox", builder),
    ):
        if not fname:
            continue
        content = _show_pushed(cfg, repo, fname)
        if content:
            digest = hashlib.md5(content.encode()).hexdigest()
            if digest != _hand[key]:
                log(f"handoff: {fname} changed → pinging {target}")
                ping_session(session_of(target),
                             f"watchdog ping: the other loop pushed {sub_dir}/{fname} — pull, read it, "
                             f"act, then delete the file (commit + push) to mark it consumed.",
                             submit_key=submit_of(target))
                _hand[key] = digest
        else:
            # file absent (consumed): forget the digest so a re-push pings again
            _hand[key] = ""
# ── phase advance (loop machine) ─────────────────────────────────────────────────
def loop_agents(cfg):
    """Return the agents of kind "loop" that are enabled (enabled defaults to True)."""
    selected = []
    for agent in cfg["agents"].values():
        if agent["kind"] == "loop" and agent.get("enabled", True):
            selected.append(agent)
    return selected
def stop_loops(cfg):
    """Kill the tmux session of every enabled loop agent that is currently running."""
    for session in (agent["session"] for agent in loop_agents(cfg)):
        if not session_alive(session):
            continue
        log(f"killing {session}")
        kill_session(session)
def start_loops(cfg):
    """Start every enabled loop agent; sessions already running are left alone."""
    agents = loop_agents(cfg)
    for agent in agents:
        start_agent(cfg, agent)
def phase_advance_check(cfg):
    """On heavy tick: if the current phase is DONE, advance (or finish the sequence).
    Returns True only when it actually transitions/completes THIS tick (caller skips healing
    that one tick). Once the sequence is already marked complete it is idempotent (returns
    False, no re-log, no re-stop). Appending a phase after completion clears the stale marker
    and resumes the loops on the new phase."""
    ps = phases(cfg)
    if not ps or not cfg["loop"].get("auto_advance", True):
        return False
    # marker file written once the last phase is done
    marker = Path(cfg["log_dir"]) / "SEQUENCE-COMPLETE"
    idx = cur_idx(cfg)
    ph = ps[idx]
    if not phase_done(cfg, ph["status"]):
        return False
    nxt = idx + 1
    if nxt < len(ps):
        # Transition: stop the loops, persist the new index, then restart them on it.
        log(f"PHASE {ph['id']} DONE — auto-transitioning to {ps[nxt]['id']}")
        stop_loops(cfg)
        Path(phase_idx_file(cfg)).write_text(str(nxt))
        if marker.exists():
            marker.unlink()  # resuming into a (freshly-appended) phase — clear stale completion
        handoff_reset()
        start_loops(cfg)
        return True
    # last phase is DONE → sequence complete
    if marker.exists():
        return False  # already handled — idempotent (no re-log, no re-stop)
    log(f"PHASE SEQUENCE COMPLETE (last phase {ph['id']} DONE) — stopping loops")
    stop_loops(cfg)
    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    marker.write_text(f"phase sequence complete {ts}. Loops stopped; build finished.\n")
    # Optional [loop].on_complete hook: remove its trigger file if present, then launch the
    # named agent (only when that name is a configured agent).
    oc = cfg["loop"].get("on_complete")
    if oc:
        trig = Path(cfg["log_dir"]) / oc.get("trigger_file", ".run-on-complete")
        if trig.exists():
            trig.unlink()
        runname = oc.get("run")
        if runname and runname in cfg["agents"]:
            log(f"on_complete: launching task agent {runname!r}")
            start_agent(cfg, cfg["agents"][runname], force=True)
    return True
# ── watchdog loop ────────────────────────────────────────────────────────────────
def watched(cfg):
    """Return the enabled agents whose ``watch`` setting is something other than "none"
    (an unset ``watch`` counts as "none")."""
    selected = []
    for agent in cfg["agents"].values():
        if not agent.get("enabled", True):
            continue
        if agent.get("watch", "none") == "none":
            continue
        selected.append(agent)
    return selected
def watchdog_loop(cfg_path):
    """Run the supervisor loop forever, re-reading the config on every tick.

    Every ``signal_interval`` seconds (default 30): handoff signalling for loop agents,
    stall checks for "heal+stall" agents, limit tracking for the other watched agents, and
    due wake pings (skipped once the phase sequence is complete). Every ``heavy_interval``
    seconds (default 300, and on the first tick): phase auto-advance, then healing unless a
    phase transition just happened.

    The wake schedule is re-synchronised with the freshly loaded config each tick. It used
    to be built once at startup, so removing an agent or its ``wake`` table while the
    watchdog was running raised KeyError, and newly added wake schedules were never picked
    up.
    """
    cfg = load_config(cfg_path)
    sig = int(cfg["watchdog"].get("signal_interval", 30))
    heavy = int(cfg["watchdog"].get("heavy_interval", 300))
    ps = phases(cfg)
    log(f"watchdog up — phase={cur_phase(cfg).get('id','-')} [{cur_idx(cfg)+1}/{len(ps)}] "
        f"signal={sig}s heavy={heavy}s, watching: {[a['name'] for a in watched(cfg)]}")
    elapsed = heavy  # force a heavy check on first tick
    wake_elapsed = {}  # agent name -> seconds since its last successful wake
    while True:
        cfg = load_config(cfg_path)  # re-read every tick: config is authoritative, no env drift
        # Keep counters only for agents that still have a wake schedule; new ones start at 0.
        wake_elapsed = {a["name"]: wake_elapsed.get(a["name"], 0)
                        for a in cfg["agents"].values() if a.get("wake")}
        has_loops = bool(loop_agents(cfg))
        seq_done = (Path(cfg["log_dir"]) / "SEQUENCE-COMPLETE").exists()
        if has_loops:
            handoff_check(cfg)
        for a in watched(cfg):
            if a["watch"] == "heal+stall":
                stall_check_one(cfg, a)
            else:
                if session_alive(a["session"]):
                    limit_tick(cfg, a, capture_pane(a["session"], 40))
        if not seq_done:
            for name, el in list(wake_elapsed.items()):
                interval = int(cfg["agents"][name]["wake"].get("interval", 3600))
                if el >= interval:
                    # a False result leaves the counter running, so the wake retries next tick
                    if wake_agent(cfg, cfg["agents"][name]):
                        wake_elapsed[name] = 0
        if elapsed >= heavy:
            elapsed = 0
            advanced = phase_advance_check(cfg) if has_loops else False
            if not advanced:
                for a in watched(cfg):
                    if seq_done and a["kind"] == "loop":
                        continue
                    heal_one(cfg, a)
        time.sleep(sig)
        elapsed += sig
        for k in wake_elapsed:
            wake_elapsed[k] += sig
# ── CLI commands ──────────────────────────────────────────────────────────────
def start_watchdog(cfg, cfg_path):
    """Start the watchdog in its own tmux session (``<session_prefix>watchdog``) unless it
    is already running.

    The session re-invokes this script as ``watchdog --config <abs path>`` with stdout and
    stderr appended to the session's log file. All paths in the shell command are quoted
    with shlex.quote; the previous hand-written single quotes broke on paths containing a
    quote character.
    """
    session = cfg["session_prefix"] + "watchdog"
    if session_alive(session):
        log("watchdog already running")
        return
    log("starting watchdog")
    script = Path(__file__).resolve()
    log_file = str(_session_log_path(cfg, session))
    config_file = str(Path(cfg_path).resolve())
    command = (f"exec >>{shlex.quote(log_file)} 2>&1; "
               f"python3 {shlex.quote(str(script))} watchdog --config {shlex.quote(config_file)}")
    new_session(session, cfg["project_dir"], command, log_file)
def cmd_up(cfg, cfg_path, names):
    """Implement ``up``: start agents, services and the watchdog.

    With no *names*: every enabled agent, every service, and the watchdog. With *names*:
    only the named agents / services (and the watchdog when "watchdog" is named). A name
    that matches nothing is reported with a warning instead of being silently ignored.
    """
    if cfg["loop"].get("resume_phase") is False and not Path(phase_idx_file(cfg)).exists():
        Path(phase_idx_file(cfg)).write_text("0")
    for n in names:
        if n not in cfg["agents"] and n not in cfg["services"] and n != "watchdog":
            log(f"WARNING: unknown target {n!r} — not an agent, a service, or 'watchdog'; skipping")
    targets = ([cfg["agents"][n] for n in names if n in cfg["agents"]]
               if names else
               [a for a in cfg["agents"].values() if a.get("enabled", True)])
    for a in targets:
        start_agent(cfg, a)
    if not names:
        for s in cfg["services"].values():
            start_service(cfg, s)
        start_watchdog(cfg, cfg_path)
    else:
        for n in names:
            if n in cfg["services"]:
                start_service(cfg, cfg["services"][n])
            if n == "watchdog":
                start_watchdog(cfg, cfg_path)
def cmd_down(cfg, names):
    """Implement ``down``: kill the tmux sessions of the named targets, or of everything.

    With no *names*: all agents, all services and the watchdog. A name is looked up as an
    agent first, then a service, then the literal "watchdog". A name that matches nothing
    is reported with a warning instead of being silently ignored. Sessions that are not
    running are skipped.
    """
    watchdog_session = cfg["session_prefix"] + "watchdog"
    sessions = []
    if names:
        for n in names:
            if n in cfg["agents"]:
                sessions.append(cfg["agents"][n]["session"])
            elif n in cfg["services"]:
                sessions.append(cfg["services"][n]["session"])
            elif n == "watchdog":
                sessions.append(watchdog_session)
            else:
                log(f"WARNING: unknown target {n!r} — nothing to stop")
    else:
        sessions = [a["session"] for a in cfg["agents"].values()]
        sessions += [s["session"] for s in cfg["services"].values()]
        sessions.append(watchdog_session)
    for s in sessions:
        if session_alive(s):
            log(f"killing {s}")
            kill_session(s)
def cmd_status(cfg):
    """Implement ``status``: print the current phase line (when phases are configured) and
    one table row per agent, per service, and for the watchdog."""

    def state_of(session):
        return "RUNNING" if session_alive(session) else "stopped"

    idx, ps = cur_idx(cfg), phases(cfg)
    if ps:
        ph = ps[idx]
        done = "## DONE" if phase_done(cfg, ph["status"]) else "in progress"
        print(f" phase: {ph['id']} [{idx+1}/{len(ps)}] plan={ph.get('plan','-')} ({done})")
    print(f" {'AGENT':<14} {'KIND':<11} {'BACKEND':<9} {'MODEL':<20} {'WATCH':<10} STATE")
    for a in cfg["agents"].values():
        st = state_of(a["session"])
        en = "" if a.get("enabled", True) else " (disabled)"
        # show the pane's current command only for running sessions
        rc = session_command(a["session"]) if st == "RUNNING" else ""
        suffix = f" [{rc}]" if rc else ""
        print(f" {a['name']:<14} {a['kind']:<11} {a['backend']:<9} "
              f"{role_model(cfg,a) or 'default':<20} {a.get('watch','none'):<10} {st}{en}"
              + suffix)
    for s in cfg["services"].values():
        st = state_of(s["session"])
        print(f" {s['name']:<14} {'service':<11} {'-':<9} {'-':<20} {'-':<10} {st}")
    wd = cfg["session_prefix"] + "watchdog"
    print(f" {'watchdog':<14} {'service':<11} {'-':<9} {'-':<20} {'-':<10} "
          f"{state_of(wd)}")
def cmd_phase(cfg, args):
    """Implement ``phase [set N|next|show]``.

    ``show`` (or no argument) prints the current phase and the sequence. ``next`` moves one
    phase forward, stopping at the last. ``set N`` stores index N.

    ``set`` now rejects a non-integer or out-of-range N via die() instead of raising
    ValueError or persisting an index that later crashes phase lookups, and a malformed
    subcommand is reported as a usage error instead of falling through silently.
    """
    ps = phases(cfg)
    if not ps:
        print("no [loop].phases configured")
        return
    if not args or args[0] == "show":
        idx = cur_idx(cfg)
        print(f"phase {ps[idx]['id']} [{idx+1}/{len(ps)}] seq: {' '.join(p['id'] for p in ps)}")
        return
    if args[0] == "next":
        Path(phase_idx_file(cfg)).write_text(str(min(cur_idx(cfg)+1, len(ps)-1)))
    elif args[0] == "set" and len(args) > 1:
        try:
            new_idx = int(args[1])
        except ValueError:
            die(f"phase set: {args[1]!r} is not an integer index")
        if not 0 <= new_idx < len(ps):
            die(f"phase set: index {new_idx} out of range (valid: 0..{len(ps)-1})")
        Path(phase_idx_file(cfg)).write_text(str(new_idx))
    else:
        die("usage: agents.py phase [set N|next|show]")
    print(f"phase idx now {cur_idx(cfg)} ({cur_phase(cfg).get('id')})")
def cmd_selftest():
    """Run the built-in regression checks for the footer-UI activity detector.

    Uses an in-memory backend and canned pane text, so no config file is needed. Prints one
    PASS/FAIL line per check and exits 0 when all pass, 1 otherwise.
    """
    tui_backend = {
        "active_re": "esc interrupt|thinking|inferring|running tool|preparing patch|reading|searching",
        "footer_ui": True, "log_grace": 180,
    }
    cfg = {"backends": {"tui": tui_backend}, "log_dir": "/tmp"}
    agent = {"name": "x", "backend": "tui", "session": "selftest-x", "kind": "loop"}
    idle = "\n ▣ Build · GPT-5.4 · 2m 19s\n 178.4K (17%) ctrl+p commands\n"
    active = "\n ~ Preparing patch...\n ⬝⬝⬝■■ esc interrupt 137.6K\n"
    limited = idle + "\nYou've hit your weekly limit · resets Jun 16, 10pm\n"
    results = [
        ("footer_ui idle footer is idle", not pane_active(cfg, agent, idle, use_log=False)),
        ("footer_ui active footer is active", pane_active(cfg, agent, active, use_log=False)),
        ("limit banner + idle footer is not active",
         not pane_active(cfg, agent, limited, use_log=False)),
    ]
    any_failed = False
    for label, ok in results:
        print(f" {'PASS' if ok else 'FAIL'}: {label}")
        if not ok:
            any_failed = True
    sys.exit(1 if any_failed else 0)
# Starter config written by `agents.py init`. cmd_init fills the single {prefix} slot via
# str.format, so any literal brace added to this template must be doubled.
INIT_TOML = """\
# Starter agent-orchestrator config. See the README for the full schema.
[defaults]
session_prefix = "{prefix}-"
log_dir = ".ao-state"
backend = "claude"
model = "claude-sonnet-4-6"
watch = "heal"
[backend.claude]
bin = "claude"
flags = "--dangerously-skip-permissions"
prompt_delivery = "arg"
process_name = "claude"
submit_key = "Enter"
stall_idle = 300
active_re = "esc to interrupt|Running tool|\\\\u00b7 \\\\d+"
limit_re = "usage limit|limit reached|reached your .*limit"
[[agent]]
name = "worker"
kind = "persistent"
prompt = "You are a worker agent. Wait for instructions."
"""
def cmd_init(args):
    """Implement ``init [dir]``: scaffold a starter agents.toml and a prompts/ directory.

    The target is ``args[0]`` when given, else the current directory; it is created if
    needed. An existing agents.toml is never overwritten (die()). The session prefix in the
    template is the target directory's name, or "proj" when that name is empty.
    """
    target = Path(args[0]) if args else Path.cwd()
    target.mkdir(parents=True, exist_ok=True)
    cfg_file = target / "agents.toml"
    if cfg_file.exists():
        die(f"{cfg_file} already exists — refusing to overwrite")
    prefix = target.resolve().name or "proj"
    cfg_file.write_text(INIT_TOML.format(prefix=prefix))
    prompts_dir = target / "prompts"
    prompts_dir.mkdir(exist_ok=True)
    log(f"scaffolded {cfg_file} and {prompts_dir}/ — edit, then `agents.py up --config {cfg_file}`")
# ── main ──────────────────────────────────────────────────────────────────────
def main():
argv = sys.argv[1:]
if not argv or argv[0] in ("-h", "--help", "help"):
print(__doc__); return
cfg_path = _cfg_path(argv)
argv = [a for i, a in enumerate(argv)
if a != "--config" and (i == 0 or argv[i-1] != "--config")]
cmd = argv[0] if argv else "status"
rest = argv[1:]
if cmd == "selftest": cmd_selftest(); return
if cmd == "init": cmd_init(rest); return
cfg = load_config(cfg_path)
if cmd == "up": cmd_up(cfg, cfg_path, rest)
elif cmd == "down": cmd_down(cfg, rest)
elif cmd == "status": cmd_status(cfg)
elif cmd == "watchdog": watchdog_loop(cfg_path)
elif cmd == "phase": cmd_phase(cfg, rest)
elif cmd == "logs":
if not rest:
die("usage: agents.py logs <name>")
sess = cfg["agents"].get(rest[0], {}).get("session") or (cfg["session_prefix"] + rest[0])
os.execvp("tail", ["tail", "-f", str(_session_log_path(cfg, sess))])
else:
print(__doc__)
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()