Extracted and generalized from a project-specific agent launch engine. No project specifics remain in code: paths, the loop kickoff preamble, handoff conventions, and the on-complete hook are all config/template driven; session_prefix + log_dir are required. - agents.py: driver + watchdog (data-driven backends via prompt_delivery arg|ping|exec; required session_prefix/log_dir; project-rooted path resolution; configurable kickoff template, handoff patterns, on_complete task; tmux-safe; selftest + init verbs) - agent-log.py: config-driven claude transcript renderer - agents.example.toml: self-contained 2-agent example (dependency-free demo backend) - prompts/: generic builder/adversary/kickoff templates - smoke.sh: isolated up+down sandbox proof that cleans up after itself - flake.nix/.lock: devShell (python311 + tmux + git) - README.md: schema + verbs + AI-PO usage + nix Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
930 lines
40 KiB
Python
Executable File
930 lines
40 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""agent-orchestrator — one driver, one config (agents.toml) for a fleet of agents.
|
|
|
|
A generic, reusable harness for running and supervising AI-agent sessions in tmux. Every
|
|
agent — a Builder/Adversary loop pair, a persistent supervisor, a one-shot task — is declared
|
|
in a single TOML config; the watchdog reads the SAME file, so there is no env-vs-file drift.
|
|
Nothing about any particular project lives in this code: paths, the loop kickoff preamble, the
|
|
handoff conventions, and the on-complete hook are all supplied by the project's config.
|
|
|
|
Usage:
|
|
agents.py up [name...] start enabled agents (or just the named ones); use-or-create
|
|
agents.py down [name...] stop agents (or all)
|
|
agents.py status one table: every agent — kind, backend, model, session, phase
|
|
agents.py watchdog the supervisor loop (reads the config every tick)
|
|
agents.py logs <name> tail an agent's session log
|
|
agents.py phase [set N|next|show] inspect / move the loop phase
|
|
agents.py selftest backend activity-detector regression checks (no config needed)
|
|
agents.py init [dir] scaffold a starter agents.toml + prompts/ in a project dir
|
|
|
|
Options:
|
|
--config PATH config file (default: ./agents.toml, else <script dir>/agents.toml)
|
|
|
|
Config is authoritative. A one-off override env AGENT_MODEL_<name> / AGENT_BACKEND_<name>
|
|
affects a single invocation only; the persisted watchdog always re-reads the file.
|
|
"""
|
|
|
|
import hashlib, json, os, re, shlex, subprocess, sys, time, tomllib
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
|
|
# Absolute directory containing this script. _cfg_path falls back to <SCRIPT_DIR>/agents.toml
# when the current working directory has no agents.toml.
SCRIPT_DIR = Path(__file__).resolve().parent
|
|
|
|
# ── config loading ──────────────────────────────────────────────────────────────
|
|
|
|
def _cfg_path(argv):
    """Pick the config file for this invocation.

    ``--config PATH`` wins. Otherwise ./agents.toml in the current directory is used if it
    exists, else agents.toml next to this script.

    A trailing ``--config`` with no PATH (or an empty one) now exits with a clear error.
    Previously it raised a bare IndexError, or failed later on an empty path.
    """
    if "--config" in argv:
        value_at = argv.index("--config") + 1
        if value_at >= len(argv) or not argv[value_at]:
            die("--config requires a PATH argument")
        return Path(argv[value_at])
    cwd_cfg = Path.cwd() / "agents.toml"
    return cwd_cfg if cwd_cfg.exists() else SCRIPT_DIR / "agents.toml"
|
|
|
|
def _resolve(base, p):
    """Turn the config path ``p`` into a Path.

    ``~`` is expanded first. A path that is still relative is anchored at ``base`` (the project
    root); an absolute path is returned as-is.
    """
    candidate = Path(os.path.expanduser(str(p)))
    if candidate.is_absolute():
        return candidate
    return Path(base) / candidate
|
|
|
|
def load_config(path):
    """Load and normalize the TOML config at ``path``.

    Returns a dict with these keys:
      * watchdog, backends, defaults, loop;
      * project_dir, log_dir (absolute strings) and session_prefix;
      * agents (name -> merged agent dict) and services (name -> merged service dict);
      * state_dir.

    Each agent/service dict is ``{**defaults, **entry}`` plus a resolved ``session`` and an
    absolute ``dir``. For agents only, a one-off AGENT_MODEL_<name> / AGENT_BACKEND_<name> env
    var overrides model/backend. Side effect: creates <log_dir>/state.

    Exits via die() with a readable config error when:
      * the file is missing or is not valid TOML;
      * [defaults].session_prefix or [defaults].log_dir is absent;
      * an [[agent]] / [[service]] entry has no name.
    The first and last cases used to surface as raw tracebacks.
    """
    path = Path(path)
    try:
        with open(path, "rb") as f:
            raw = tomllib.load(f)
    except FileNotFoundError:
        die(f"config error: {path} not found (use --config PATH)")
    except tomllib.TOMLDecodeError as e:
        die(f"config error: {path} is not valid TOML: {e}")
    defaults = raw.get("defaults", {})
    # The project root: everything project-supplied (prompts, templates, relative paths) is
    # resolved against it. Defaults to the directory holding the config file; override with
    # defaults.project_dir (useful when the config lives in a sandbox but prompts live elsewhere).
    project_dir = _resolve(path.resolve().parent, defaults.get("project_dir", ".")).resolve()

    session_prefix = defaults.get("session_prefix")
    if not session_prefix:
        die("config error: [defaults].session_prefix is required (e.g. session_prefix = \"myproj-\")")
    log_dir_raw = defaults.get("log_dir")
    if not log_dir_raw:
        die("config error: [defaults].log_dir is required")

    cfg = {
        "watchdog": raw.get("watchdog", {}),
        "backends": raw.get("backend", {}),
        "defaults": defaults,
        "loop": raw.get("loop", {}),
        "project_dir": str(project_dir),
        "log_dir": str(_resolve(project_dir, log_dir_raw)),
        "session_prefix": session_prefix,
    }
    agents = {}
    for a in raw.get("agent", []):
        if not a.get("name"):
            die(f"config error: every [[agent]] needs a name (got {a!r})")
        m = {**defaults, **a}
        m["session"] = a.get("session", session_prefix + a["name"])
        m["kind"] = a.get("kind", "persistent")
        m["dir"] = str(_resolve(project_dir, a.get("dir", defaults.get("dir", "."))))
        # one-off env override (single invocation; watchdog ignores via fresh load each tick)
        env_model = os.environ.get(f"AGENT_MODEL_{a['name']}")
        env_backend = os.environ.get(f"AGENT_BACKEND_{a['name']}")
        if env_model:
            m["model"] = env_model
        if env_backend:
            m["backend"] = env_backend
        agents[a["name"]] = m
    cfg["agents"] = agents
    services = {}
    for s in raw.get("service", []):
        if not s.get("name"):
            die(f"config error: every [[service]] needs a name (got {s!r})")
        services[s["name"]] = {**defaults, **s,
                               "session": session_prefix + s["name"],
                               "dir": str(_resolve(project_dir, s.get("dir", ".")))}
    cfg["services"] = services
    cfg["state_dir"] = os.path.join(cfg["log_dir"], "state")
    Path(cfg["state_dir"]).mkdir(parents=True, exist_ok=True)
    return cfg
|
|
|
|
def backend_of(cfg, agent):
    """Return the [backend.<name>] table the agent is configured to use.

    Exits via die() when the agent names no backend, or names one that is not declared (or
    is declared empty). Reading the key with .get means an agent without a ``backend`` key
    gets this config error instead of a bare KeyError.
    """
    name = agent.get("backend")
    b = cfg["backends"].get(name) if name else None
    if not b:
        die(f"agent {agent.get('name', '?')}: unknown backend {name!r}")
    return b
|
|
|
|
# ── logging ───────────────────────────────────────────────────────────────────
|
|
|
|
def log(msg):
    """Print ``msg`` to stdout as an ``[agents HH:MM:SS]`` line and flush immediately."""
    stamp = datetime.now().strftime("%H:%M:%S")
    print(f"[agents {stamp}] {msg}", flush=True)


def die(msg):
    """Log ``msg`` prefixed with ERROR, then terminate the process with exit status 1."""
    log(f"ERROR: {msg}")
    raise SystemExit(1)
|
|
|
|
# ── tmux helpers ────────────────────────────────────────────────────────────────
|
|
# ALWAYS target sessions with an exact-match "=" prefix. tmux does prefix/fnmatch on bare
|
|
# targets, so "-t myproj-assistant" would match "myproj-assistant3" — capturing or killing the
|
|
# wrong session. "=name" forces an exact match.
|
|
|
|
def _run(cmd):
    """Run the argv list ``cmd`` with stdout/stderr captured as text, never raising when the
    executable itself is missing.

    A missing executable yields a CompletedProcess with returncode 127 and empty output,
    mirroring a shell's "command not found".
    """
    try:
        completed = subprocess.run(cmd, capture_output=True, text=True)
    except FileNotFoundError:
        completed = subprocess.CompletedProcess(cmd, 127, "", "")
    return completed
|
|
|
|
def TS(name):  # exact target-SESSION (has-session, kill-session)
    """Exact-match tmux session target: ``=name``."""
    return "=" + name


def TP(name):  # exact target-PANE (current window/pane of the exact session)
    """Exact-match tmux pane target: ``=name:``, anchoring the session by exact name."""
    return TS(name) + ":"
|
|
|
|
def session_alive(name):
    """True when a tmux session with exactly this name exists."""
    probe = _run(["tmux", "has-session", "-t", TS(name)])
    return probe.returncode == 0


def session_command(name):
    """Foreground command of the session's current pane (``#{pane_current_command}``).

    Returns "" when tmux cannot answer (no such session, tmux missing).
    """
    r = _run(["tmux", "display-message", "-p", "-t", TP(name), "#{pane_current_command}"])
    if r.returncode != 0:
        return ""
    return r.stdout.strip()


def kill_session(name):
    """Kill the exact-named tmux session; any failure (e.g. already gone) is ignored."""
    _run(["tmux", "kill-session", "-t", TS(name)])
|
|
|
|
def capture_pane(name, lines=40):
    """Return the last ``lines`` lines of the session's pane text, or "" if tmux fails.

    Note: lines=0 slices as ``[-0:]`` and therefore returns every captured line.
    """
    r = _run(["tmux", "capture-pane", "-p", "-t", TP(name)])
    if r.returncode != 0:
        return ""
    tail = r.stdout.splitlines()[-lines:]
    return "\n".join(tail)
|
|
|
|
def pipe_to_log(session, log_path):
    """Append the session pane's output to ``log_path`` via ``tmux pipe-pane -o``.

    The path is shell-quoted with shlex.quote. The previous hand-rolled '...' quoting broke
    the command for a path containing a single quote, and let such a path inject shell code.
    """
    _run(["tmux", "pipe-pane", "-o", "-t", TP(session),
          f"cat >> {shlex.quote(str(log_path))}"])
|
|
|
|
def new_session(session, cwd, cmd, log_path):
    """Start a detached tmux session ``session`` running ``cmd`` in ``cwd``.

    ``cwd`` is created if needed, and the pane output is then mirrored into ``log_path``.
    """
    Path(cwd).mkdir(exist_ok=True, parents=True)
    argv = ["tmux", "new-session", "-d", "-s", session, "-c", cwd, cmd]
    _run(argv)
    pipe_to_log(session, log_path)
|
|
|
|
def ping_session(session, msg, submit_key="Enter"):
    """Type ``msg`` literally into the session's pane, then submit it.

    The submit key is re-sent up to 10 times, 1 s apart, until the first 28 characters of
    ``msg`` are no longer visible in the last 20 pane lines (presumably: the typed text has
    been accepted). Does nothing when the session is not running.
    """
    if not session_alive(session):
        return
    target = TP(session)
    marker = msg[:28]
    _run(["tmux", "send-keys", "-t", target, "-l", "--", msg])
    time.sleep(0.5)
    attempts = 0
    while attempts < 10:
        _run(["tmux", "send-keys", "-t", target, submit_key])
        time.sleep(1)
        if marker not in capture_pane(session, 20):
            break
        attempts += 1
|
|
|
|
# ── activity / limit / fatal detection (per-backend regexes from config) ─────────
|
|
|
|
def _re(backend, key):
    """Compile ``backend[key]`` as a case-insensitive regex; None when absent or empty."""
    pattern = backend.get(key)
    if not pattern:
        return None
    return re.compile(pattern, re.I)


def _session_log_path(cfg, session):
    """Pane log file of ``session``: <log_dir>/<session>.log."""
    return Path(cfg["log_dir"]).joinpath(f"{session}.log")
|
|
|
|
def _log_recently_touched(cfg, session, age_seconds):
    """True when the session's pane log was modified within the last ``age_seconds`` seconds.

    Returns False when the log is older than that or does not exist.
    """
    log_file = _session_log_path(cfg, session)
    try:
        mtime = log_file.stat().st_mtime
    except FileNotFoundError:
        return False
    return time.time() - mtime <= age_seconds
|
|
|
|
def pane_active(cfg, agent, pane, *, use_log=True):
    """Decide whether ``pane`` shows the agent mid-turn, using the backend's ``active_re``.

    ``footer_ui`` backends are TUIs whose static footer lingers after a turn. For them:
      * only the bottom 10 pane lines are searched;
      * unless ``use_log`` is False, a session log touched within ``log_grace`` seconds
        (default 180) also counts as activity.
    Other backends search the whole pane. Without an ``active_re`` nothing matches.
    """
    backend = backend_of(cfg, agent)
    active = _re(backend, "active_re")
    if not backend.get("footer_ui"):
        return bool(active and active.search(pane))
    bottom_rows = "\n".join(pane.splitlines()[-10:])
    grace = int(backend.get("log_grace", 180))
    if active and active.search(bottom_rows):
        return True
    return use_log and _log_recently_touched(cfg, agent["session"], grace)
|
|
|
|
# ── prompt assembly ───────────────────────────────────────────────────────────
|
|
|
|
# Matches text that is still template/placeholder text rather than a real completion note,
# e.g. "not yet", "TBD", "n/a", "<fill me>", "written here only when ...". It is applied to
# the first non-blank line after the done marker. Anchored at the line start (after optional
# whitespace) and case-insensitive. phase_done() ignores a done marker whose body matches.
DONE_PLACEHOLDER_RE = re.compile(
    r"^\s*(not yet|not done|not complete|incomplete|pending\b|tbd\b|n/?a\b|"
    r"written here only|only when|to be (written|filled)|when all|<.*>)", re.I)
|
|
|
|
def phases(cfg):
    """The ordered ``loop.phases`` list from the config; [] when none are declared."""
    loop_cfg = cfg["loop"]
    return loop_cfg.get("phases", [])


def phase_idx_file(cfg):
    """Path (str) of the file persisting the current phase index, inside state_dir."""
    file_name = cfg["loop"].get("state_file", "phase-idx")
    return os.path.join(cfg["state_dir"], file_name)
|
|
|
|
def cur_idx(cfg):
    """Index of the current loop phase, read from the phase-idx state file.

    A missing or unparsable file yields 0. The stored value is clamped into the range of the
    configured phases (0 when none are declared). The state file outlives config edits, so an
    index left over after phases were removed, or a negative one, used to make ``ps[idx]``
    raise IndexError or silently select a phase counted from the end.
    """
    try:
        raw = Path(phase_idx_file(cfg)).read_text().strip()
    except FileNotFoundError:
        return 0
    try:
        idx = int(raw)
    except ValueError:
        return 0
    last = len(phases(cfg)) - 1
    if last < 0:
        return 0
    return max(0, min(idx, last))
|
|
|
|
def cur_phase(cfg):
    """Config table of the current phase, or {} when no phases are declared."""
    ps = phases(cfg)
    if not ps:
        return {}
    return ps[cur_idx(cfg)]
|
|
|
|
def _state_subdir(cfg):
    """Sub-directory of the handoff repo that holds machine state files.

    Taken from loop.handoff.state_subdir; defaults to "machine-docs".
    """
    handoff = cfg["loop"].get("handoff") or {}
    return handoff.get("state_subdir", "machine-docs")


def handoff_repo(cfg):
    """Directory of the git repo used for handoff signalling.

    This is loop.handoff.repo resolved against the project root, or the project root itself
    when that key is unset.
    """
    handoff = cfg["loop"].get("handoff") or {}
    repo = handoff.get("repo")
    if not repo:
        return cfg["project_dir"]
    return str(_resolve(cfg["project_dir"], repo))


def resolve_state_file(cfg, repo_dir, basename):
    """Locate a state file in the repo.

    Prefers <repo>/<state_subdir>/<basename> when it exists, otherwise returns
    <repo>/<basename>, which may itself not exist.
    """
    preferred = Path(repo_dir, _state_subdir(cfg), basename)
    if preferred.exists():
        return preferred
    return Path(repo_dir, basename)
|
|
|
|
def phase_done(cfg, status_basename):
    """True when the phase STATUS file in the handoff repo records real completion.

    A line starting with loop.done_marker (default "## DONE") counts only when the first
    non-blank line after it is real content. It must not be:
      * placeholder text (DONE_PLACEHOLDER_RE);
      * another Markdown heading.
    An empty DONE section (marker at end of file, or directly followed by the next heading)
    used to count as done, which could fire auto-advance before anything was written there.
    A missing STATUS file means not done.
    """
    repo = handoff_repo(cfg)
    try:
        lines = resolve_state_file(cfg, repo, status_basename).read_text().splitlines()
    except FileNotFoundError:
        return False
    marker = cfg["loop"].get("done_marker", "## DONE")
    for i, line in enumerate(lines):
        if not line.startswith(marker):
            continue
        body = next((nxt for nxt in lines[i + 1:] if nxt.strip()), "")
        if not body or re.match(r"#{1,6}(\s|$)", body.lstrip()):
            continue  # empty DONE section: nothing written under the marker
        if DONE_PLACEHOLDER_RE.match(body):
            continue
        return True
    return False
|
|
|
|
def role_model(cfg, agent):
    """Model to run ``agent`` with.

    If the agent has a role and the current phase sets ``models[<role>]``, that override
    wins. Otherwise the agent's own ``model`` is used ("" when unset).
    """
    role = agent.get("role")
    override = None
    if role:
        override = (cur_phase(cfg).get("models") or {}).get(role)
    return override or agent.get("model", "")
|
|
|
|
def _render_template(text, fields):
    """Replace every ``{key}`` placeholder in ``text`` with ``str(fields[key])``.

    Substitution is a single pass, so inserted values are never re-scanned. A value that
    itself contains ``{other_key}`` (e.g. a path with braces) is inserted literally; the
    previous sequential str.replace loop expanded it depending on dict order. Placeholders
    whose key is not in ``fields`` are left untouched.
    """
    if not fields:
        return text
    # longest tokens first so no placeholder can shadow another in the alternation
    tokens = sorted(("{" + k + "}" for k in fields), key=len, reverse=True)
    pattern = re.compile("|".join(re.escape(t) for t in tokens))
    return pattern.sub(lambda m: str(fields[m.group(0)[1:-1]]), text)
|
|
|
|
def build_loop_kickoff(cfg, agent):
    """Assemble a loop agent's kickoff prompt.

    The prompt is:
      1. the project's kickoff template (loop.kickoff_template, optional), with {phase_id},
         {plan}, {status} and {role} filled in from the current phase;
      2. followed by <roles_dir>/<role>.md (roles_dir defaults to "prompts").
    Both files are project-supplied; this code holds no project text.

    A newline is inserted after the template when it does not end with one. Previously the
    template's last line fused with the role prompt's first line. Exits via die() when the
    agent has no ``role``; this used to be a bare KeyError.
    """
    role = agent.get("role")
    if not role:
        die(f"loop agent {agent.get('name', '?')}: `role` is required "
            f"(selects <roles_dir>/<role>.md)")
    ph = cur_phase(cfg)
    fields = {
        "phase_id": ph.get("id", ""),
        "plan": ph.get("plan", ""),
        "status": ph.get("status", ""),
        "role": role,
    }
    pdir = Path(cfg["project_dir"])
    preamble = ""
    tmpl = cfg["loop"].get("kickoff_template")
    if tmpl:
        preamble = _render_template(_resolve(pdir, tmpl).read_text(), fields)
        if preamble and not preamble.endswith("\n"):
            preamble += "\n"
    roles_dir = cfg["loop"].get("roles_dir", "prompts")
    role_prompt = (_resolve(pdir, roles_dir) / f"{role}.md").read_text()
    return preamble + role_prompt
|
|
|
|
def agent_prompt(cfg, agent):
    """Kickoff prompt text for ``agent``.

    Loop agents get build_loop_kickoff(). Other agents get the contents of ``prompt_file``
    (resolved against the project root) when it is set, else the inline ``prompt`` string
    ("" when neither is set).
    """
    project_root = Path(cfg["project_dir"])
    if agent["kind"] == "loop":
        return build_loop_kickoff(cfg, agent)
    prompt_file = agent.get("prompt_file")
    if not prompt_file:
        return agent.get("prompt", "")
    return _resolve(project_root, prompt_file).read_text()
|
|
|
|
# ── resume id ───────────────────────────────────────────────────────────────────
|
|
|
|
def resume_id(cfg, agent):
    """Session id to resume, read from <state_dir>/<agent name>.id.

    Returns None when the file is missing or contains only whitespace.
    """
    id_file = Path(cfg["state_dir"]) / f"{agent['name']}.id"
    if not id_file.exists():
        return None
    value = id_file.read_text().strip()
    return value or None
|
|
|
|
# ── agent launch ────────────────────────────────────────────────────────────────
|
|
|
|
def _expected_proc(backend):
    """Process name a healthy session of this backend runs (its ``process_name``), else None.

    Used for backend-mismatch healing. Backends that do not declare process_name opt out,
    such as a generic exec backend, so a transient login shell during startup is never
    taken for a mismatch.
    """
    return backend.get("process_name", None)
|
|
|
|
def start_agent(cfg, agent, *, force=False):
    """Launch ``agent`` in its own tmux session (use-or-create).

    A running session is left alone unless ``force`` is set, in which case it is killed and
    relaunched. The kickoff prompt is written to <state_dir>/kickoff-<session>.txt and handed
    over according to the backend's ``prompt_delivery``:

    * ``ping``: start the TUI (optional ``preamble``, then the ``attach`` template), wait
      ``connect_delay`` s (default 12), then type a one-line pointer to the kickoff file.
      When ``model_env`` is truthy and a model is resolved, ``{"model": ...}`` JSON is
      passed in the env var named by ``model_env_var`` (default OPENCODE_CONFIG_CONTENT,
      which was previously hard-coded).
    * ``exec``: run the ``bin`` template ({kickoff} = prompt path, {session}, {model}).
    * ``arg`` (default): claude-style CLI, built as
      ``bin [resume] [remote-control] [model] [flags] "$(cat <kickoff file>)"``.
      The kickoff path is now shell-quoted with shlex.quote.

    An unknown ``prompt_delivery`` still falls back to ``arg`` as before, but now logs a
    warning instead of doing so silently.
    """
    session = agent["session"]
    if session_alive(session):
        if not force:
            log(f"{session} already running — leaving it")
            return
        kill_session(session)

    backend = backend_of(cfg, agent)
    model = role_model(cfg, agent)
    prompt = agent_prompt(cfg, agent)
    log_path = str(_session_log_path(cfg, session))
    kf = Path(cfg["state_dir"]) / f"kickoff-{session}.txt"
    kf.write_text(prompt)
    cwd = agent.get("dir") or cfg["project_dir"]
    pid = cur_phase(cfg).get("id", "-") if agent["kind"] == "loop" else "-"
    delivery = backend.get("prompt_delivery", "arg")
    if delivery not in ("arg", "ping", "exec"):
        log(f"WARNING: backend {agent['backend']!r}: unknown prompt_delivery {delivery!r} "
            f"— treating it as 'arg'")
        delivery = "arg"

    if delivery == "ping":
        # TUI backend: launch, wait for it to connect, then type the prompt in.
        model_env = ""
        if model and backend.get("model_env"):
            env_var = backend.get("model_env_var", "OPENCODE_CONFIG_CONTENT")
            model_env = f"{env_var}={shlex.quote(json.dumps({'model': model}))} "
        preamble = backend.get("preamble", "")
        sep = "; " if preamble else ""
        attach = _render_template(backend.get("attach", "{bin}"),
                                  {"bin": backend["bin"], "server": backend.get("server", ""),
                                   "dir": shlex.quote(cwd)})
        cmd = f"{preamble}{sep}{model_env}{attach}"
        log(f"starting {session} ({agent['backend']}, kind={agent['kind']}, phase={pid}, "
            f"model={model or 'default'})")
        new_session(session, cwd, cmd, log_path)
        time.sleep(int(backend.get("connect_delay", 12)))
        boot = (f"Your full kickoff prompt is in {kf} — read it now with: "
                f"`cat '{kf}'` — then follow it exactly.")
        ping_session(session, boot, submit_key=backend.get("submit_key", "C-m"))

    elif delivery == "exec":
        # Generic backend: run an arbitrary command. {kickoff} = path to the prompt file,
        # {session} = the tmux session name, {model} = resolved model.
        cmd = _render_template(backend["bin"],
                               {"kickoff": str(kf), "session": session, "model": model})
        log(f"starting {session} ({agent['backend']}, kind={agent['kind']}, phase={pid})")
        new_session(session, cwd, cmd, log_path)

    else:  # "arg": prompt passed as a CLI argument (claude-style)
        rid = resume_id(cfg, agent) if agent.get("resume") and backend.get("supports_resume") else None
        parts = [backend["bin"]]
        if rid:
            parts.append(_render_template(backend.get("resume_flag", "--resume '{id}'"), {"id": rid}))
        if backend.get("remote_control"):
            parts.append(_render_template(backend.get("remote_control_flag",
                                                      "--remote-control '{session}'"),
                                          {"session": session}))
        if model:
            parts.append(_render_template(backend.get("model_flag", "--model '{model}'"),
                                          {"model": model}))
        if backend.get("flags"):
            parts.append(backend["flags"])
        parts.append(f"\"$(cat {shlex.quote(str(kf))})\"")
        cmd = " ".join(p for p in parts if p)
        log(f"starting {session} ({agent['backend']}, kind={agent['kind']}, phase={pid}, "
            f"model={model or 'default'}{', resume' if rid else ''})")
        new_session(session, cwd, cmd, log_path)
|
|
|
|
def start_service(cfg, svc):
    """Start a [[service]] command in its own tmux session unless it is already running."""
    session = svc["session"]
    if session_alive(session):
        log(f"{session} already running — leaving it")
        return
    log(f"starting service {session}")
    workdir = svc.get("dir", cfg["project_dir"])
    new_session(session, workdir, svc["command"], str(_session_log_path(cfg, session)))
|
|
|
|
# ── usage-limit state machine ────────────────────────────────────────────────────
|
|
|
|
# Extracts the reset time from a usage-limit banner, e.g. "resets 5pm", "reset at 10:30 am",
# "resets 17:00". Groups: 1 = hour, 2 = optional minutes, 3 = optional am/pm. Case-insensitive.
# Any timezone text after the time is not captured; _parse_reset_epoch interprets the time in
# the local timezone.
RESET_RE = re.compile(r"resets?\s*(?:at\s*)?(\d{1,2})(?::(\d{2}))?\s*(am|pm)?", re.I)
|
|
|
|
def _limit_state_path(cfg, session):
    """JSON file recording a session's usage-limit hold: <state_dir>/limited-<session>.json."""
    return Path(cfg["state_dir"]).joinpath(f"limited-{session}.json")


def _load_limit_state(cfg, session):
    """Saved limit state dict, or None if there is none or it cannot be read or parsed."""
    try:
        text = _limit_state_path(cfg, session).read_text()
        return json.loads(text)
    except Exception:  # best-effort: missing/corrupt state simply means "not limited"
        return None


def _save_limit_state(cfg, session, st):
    """Persist ``st`` as JSON, replacing any previous state for the session."""
    target = _limit_state_path(cfg, session)
    target.write_text(json.dumps(st))


def _clear_limit_state(cfg, session):
    """Delete the session's limit state; a missing file is not an error."""
    try:
        _limit_state_path(cfg, session).unlink()
    except FileNotFoundError:
        pass
|
|
|
|
def _parse_reset_epoch(pane):
    """Epoch seconds of the reset time in the LAST RESET_RE match in ``pane``, or None.

    The time is local wall-clock: 12-hour when am/pm is present, otherwise taken as
    24-hour. It is rolled to tomorrow when that moment has already passed today.
    Out-of-range hours or minutes give None.
    """
    last = None
    for last in RESET_RE.finditer(pane):  # keep only the final match
        pass
    if last is None:
        return None
    try:
        hour = int(last.group(1))
        minute = int(last.group(2) or 0)
        suffix = (last.group(3) or "").lower()
        if suffix == "pm" and hour != 12:
            hour += 12
        elif suffix == "am" and hour == 12:
            hour = 0
        if hour > 23 or minute > 59:
            return None
        moment = datetime.now().replace(hour=hour, minute=minute, second=0, microsecond=0)
        if moment.timestamp() <= time.time():
            moment += timedelta(days=1)
        return moment.timestamp()
    except Exception:
        return None
|
|
|
|
def _next_limit_until(cfg, pane, now):
    """When to next probe a limited agent, returned as ``(epoch, parsed)``.

    If the banner's reset time parses and is at most 6 h after ``now``, returns that time
    plus watchdog.limit_reset_slack (default 45 s) together with True. Otherwise returns
    ``now`` + watchdog.limit_probe_fallback (default 300 s) together with False.
    """
    wd = cfg["watchdog"]
    fallback = int(wd.get("limit_probe_fallback", 300))
    slack = int(wd.get("limit_reset_slack", 45))
    reset_at = _parse_reset_epoch(pane)
    if reset_at is None or reset_at - now > 6 * 3600:
        return now + fallback, False
    return reset_at + slack, True
|
|
|
|
def _limit_nudge_msg(kind):
    """Probe text typed into a usage-limited agent once its hold expires.

    Only ``loop`` agents get the loop-specific wording (pull, re-read phase STATUS/REVIEW,
    re-arm pacing); every other kind gets the generic resume message. Previously every kind
    except persistent/orchestrator, e.g. a one-shot ``task``, was told to resume a loop it
    does not have. Both messages share their first 28 characters, which limit_tick uses to
    spot a probe still sitting in the pane.
    """
    if kind != "loop":
        return ("watchdog probe: if the quota window has reset, RESUME now — re-check status "
                "and continue from where you stopped.")
    return ("watchdog probe: if the quota window has reset, RESUME your loop now — pull latest, "
            "re-read your phase STATUS/REVIEW files, and continue; re-arm your loop pacing.")
|
|
|
|
def limit_tick(cfg, agent, pane):
    """True while the agent is inside a usage-limit window — callers suppress all healing.

    A small state machine, persisted per session as {"until": epoch, "nudges": count} via
    _load/_save/_clear_limit_state:

    * no saved state: a hold starts only when the backend's ``limit_re`` matches ``pane``
      and the pane is not active (pane_active with use_log=False). ``until`` comes from
      _next_limit_until: the banner's reset time plus slack, or the flat fallback interval.
    * saved state, and the pane is active or the banner is gone: the hold is cleared and
      False is returned.
    * saved state, before ``until``: keep holding (True) and do nothing else.
    * saved state, ``until`` has passed: type the resume probe from _limit_nudge_msg.
      - The probe is skipped while its first 28 characters are visible in the last 8 lines.
      - After 3 s the pane is re-captured. If the agent is then active with no banner, the
        hold is cleared (this tick still returns True).
      - Otherwise ``until`` is re-armed from the new pane and ``nudges`` is incremented.

    This function never restarts the agent, however many probes fail.
    """
    session = agent["session"]
    backend = backend_of(cfg, agent)
    limit_re = _re(backend, "limit_re")
    submit = backend.get("submit_key", "Enter")
    fallback = int(cfg["watchdog"].get("limit_probe_fallback", 300))  # only used in a log line
    state = _load_limit_state(cfg, session)
    limited_now = bool(limit_re and limit_re.search(pane))

    if state is None:
        # Not holding yet: only a visible limit banner on an idle pane starts a hold.
        if not limited_now or pane_active(cfg, agent, pane, use_log=False):
            return False
        now = time.time()
        until, parsed = _next_limit_until(cfg, pane, now)
        if parsed:
            # the logged time is the parsed reset plus limit_reset_slack
            log(f"limit hit on {agent['name']} — banner says reset "
                f"{datetime.fromtimestamp(until):%a %H:%M}; holding (no reboots)")
        else:
            log(f"limit hit on {agent['name']} — reset unparsable; flat "
                f"{fallback//60}-min probe loop (no reboots)")
        _save_limit_state(cfg, session, {"until": until, "nudges": 0})
        return True

    # Holding: working again, or banner no longer on screen -> the hold is over.
    if pane_active(cfg, agent, pane, use_log=False) or not limited_now:
        log(f"limit lifted on {agent['name']} — clearing limit state")
        _clear_limit_state(cfg, session)
        return False

    now = time.time()
    if now < state.get("until", 0):
        return True

    msg = _limit_nudge_msg(agent["kind"])
    # A probe is still visible near the bottom of the pane: don't type another one.
    if msg[:28] in "\n".join(pane.splitlines()[-8:]):
        return True
    nudges = state.get("nudges", 0) + 1
    log(f"limit probe #{nudges} on {agent['name']} — nudging to resume")
    ping_session(session, msg, submit_key=submit)
    time.sleep(3)
    pane2 = capture_pane(session, 40)
    if pane_active(cfg, agent, pane2, use_log=False) and not (limit_re and limit_re.search(pane2)):
        log(f"limit lifted on {agent['name']} — probe resumed it")
        _clear_limit_state(cfg, session)
        return True  # still True on this tick; the next tick sees no state
    # Re-arm from the post-probe pane (note: ``now`` is the pre-probe timestamp).
    until, _ = _next_limit_until(cfg, pane2, now)
    if nudges == 3:
        log(f"WARNING: {agent['name']} still limited after {nudges} probes — flat probes; never rebooting")
    _save_limit_state(cfg, session, {"until": until, "nudges": nudges})
    return True
|
|
|
|
# ── stall detection ──────────────────────────────────────────────────────────────
|
|
|
|
# Maps session name -> epoch at which stall_check_one first saw that session idle. A value
# of 0.0 (or no entry) means "not currently idle". This is module-level state, so it survives
# across watchdog ticks within one process.
_idle_since: dict[str, float] = {}
|
|
|
|
def _last_nonempty_line(text):
    """Last line of ``text`` that has non-whitespace content, stripped; "" if there is none."""
    stripped = (ln.strip() for ln in reversed(text.splitlines()))
    return next((ln for ln in stripped if ln), "")
|
|
|
|
def _parse_waiting_until(cfg, agent, pane):
    """Epoch of the agent's self-declared ``WAITING-UNTIL: <ISO-8601>`` marker, or None.

    footer_ui backends honour a marker only on the pane's last non-empty line. Other backends
    use the LAST marker anywhere in ``pane``. An agent may print one marker per wait, so older
    markers still on screen are stale. The previous first-match lookup could read an expired
    deadline and trigger a needless kill + reboot.

    Parsing details:
      * trailing punctuation (``.,;:)]``, quotes, backticks) is stripped first;
      * a "Z" suffix is read as UTC;
      * a timestamp without an offset is a naive datetime, i.e. local time;
      * anything unparsable gives None.
    """
    marker_re = re.compile(r"WAITING-UNTIL:\s*(\S+)")
    if backend_of(cfg, agent).get("footer_ui"):
        line = _last_nonempty_line(pane)
        if not line.startswith("WAITING-UNTIL:"):
            return None
        found = marker_re.findall(line)
    else:
        found = marker_re.findall(pane)
    if not found:
        return None
    stamp = found[-1].rstrip(".,;:)]'\"`")
    try:
        return datetime.fromisoformat(stamp.replace("Z", "+00:00")).timestamp()
    except Exception:
        return None
|
|
|
|
def stall_check_one(cfg, agent):
    """Signal-tick stall detector for one ``heal+stall`` agent; kill + reboot when stuck.

    When the session is dead, only its idle clock and limit state are reset; restarts are
    heal_one's job. Nothing happens during a usage-limit hold or while the pane is active.
    An idle pane is rebooted when either:
      * it is more than watchdog.stall_grace (default 180 s) past its WAITING-UNTIL
        marker, or
      * it has no marker and has been idle for the backend's ``stall_idle`` (default 300 s).
    """
    session = agent["session"]
    if not session_alive(session):
        _idle_since[session] = 0.0
        _clear_limit_state(cfg, session)
        return
    now = time.time()
    pane = capture_pane(session, 40)
    if limit_tick(cfg, agent, pane) or pane_active(cfg, agent, pane):
        _idle_since[session] = 0.0
        return
    idle_start = _idle_since.get(session) or now
    _idle_since[session] = idle_start
    idle_for = now - idle_start
    grace = int(cfg["watchdog"].get("stall_grace", 180))
    deadline = _parse_waiting_until(cfg, agent, pane)
    if deadline is None:
        threshold = int(backend_of(cfg, agent).get("stall_idle", 300))
        if idle_for < threshold:
            return
        reason = f"idle {int(idle_for)}s with no WAITING-UNTIL marker"
    else:
        if now <= deadline + grace:
            return
        reason = f"past its WAITING-UNTIL by {int(now-deadline)}s — self-wake did not fire"
    log(f"stall: {agent['name']} ({session}) {reason} — kill + reboot")
    start_agent(cfg, agent, force=True)
    _idle_since[session] = 0.0
|
|
|
|
# ── healing ──────────────────────────────────────────────────────────────────────
|
|
|
|
def backend_mismatch(cfg, agent):
    """True only when the session is definitely running another declared backend.

    "Another backend" means some other backend's ``process_name``, not this one's. A backend
    without process_name never reports a mismatch. A foreground command that matches no
    declared backend process (e.g. a login shell during startup) is not a mismatch either.
    """
    expected = _expected_proc(backend_of(cfg, agent))
    if not expected:
        return False
    running = session_command(agent["session"])
    declared = set(filter(None, map(_expected_proc, cfg["backends"].values())))
    return running in declared and running != expected
|
|
|
|
def heal_one(cfg, agent):
    """Heavy-tick healing for one watched agent.

    * A missing session is restarted.
    * A session running a different declared backend's process is force-restarted.
    * An idle pane matching the backend's ``fatal_re`` is force-restarted.
    Active agents and agents inside a usage-limit hold are left alone.
    """
    session = agent["session"]
    backend = backend_of(cfg, agent)
    if not session_alive(session):
        log(f"{agent['name']} ({session}) gone — restarting")
        start_agent(cfg, agent)
        return
    if backend_mismatch(cfg, agent):
        log(f"{agent['name']} ({session}) is {session_command(session)!r}, expected "
            f"{_expected_proc(backend)} — kill + restart")
        start_agent(cfg, agent, force=True)
        return
    pane = capture_pane(session, 25)
    if pane_active(cfg, agent, pane) or limit_tick(cfg, agent, pane):
        return
    fatal = _re(backend, "fatal_re")
    if fatal is not None and fatal.search(pane):
        log(f"FATAL session-state error on {agent['name']} ({session}) — kill + restart")
        start_agent(cfg, agent, force=True)
|
|
|
|
# ── wake (persistent agents with a wake schedule) ────────────────────────────────
|
|
|
|
def wake_agent(cfg, agent):
    """Type the agent's scheduled wake prompt into its session.

    Returns True when the wake landed or is moot, and the caller then resets the timer. Moot
    means: no ``wake`` table, prompt file unset or unreadable, or an empty prompt. Returns
    False to retry next tick when the session is not running or the agent is busy.

    The prompt file (wake.prompt_file, resolved against the project root) is collapsed to a
    single line of single-spaced words before sending.

    Any OSError or decode error while reading the prompt skips this wake. Previously only
    FileNotFoundError was handled, so ``prompt_file = ""`` (which resolves to the project
    directory and raises IsADirectoryError), a permission error, or an undecodable file
    crashed the whole watchdog.
    """
    wake = agent.get("wake")
    if not wake:
        return True
    session = agent["session"]
    if not session_alive(session):
        return False
    backend = backend_of(cfg, agent)
    if pane_active(cfg, agent, capture_pane(session, 25)):
        return False
    pf = wake.get("prompt_file")
    if not pf:
        log(f"wake skipped for {agent['name']} — prompt file missing: {pf}")
        return True
    try:
        msg = " ".join(_resolve(cfg["project_dir"], pf).read_text().split())
    except (FileNotFoundError, TypeError):
        log(f"wake skipped for {agent['name']} — prompt file missing: {pf}")
        return True
    except (OSError, UnicodeDecodeError) as e:
        log(f"wake skipped for {agent['name']} — cannot read prompt file {pf}: {e}")
        return True
    if not msg:
        return True
    log(f"waking {agent['name']} ({session}) for scheduled supervision pass")
    ping_session(session, msg, submit_key=backend.get("submit_key", "Enter"))
    return True
|
|
|
|
# ── handoff signalling (loop pair) ───────────────────────────────────────────────
|
|
|
|
# handoff_check's memory across watchdog ticks:
#   sha           -> last origin/main sha seen;
#   adv_inbox,
#   builder_inbox -> md5 of each inbox file's last-seen pushed content.
# "" means nothing has been seen yet.
_hand = {"sha": "", "adv_inbox": "", "builder_inbox": ""}


def handoff_reset():
    """Forget everything handoff_check has recorded, so its next call starts from scratch."""
    for key in ("sha", "adv_inbox", "builder_inbox"):
        _hand[key] = ""
|
|
|
|
def _git(repo, args):
    """Run ``git -C <repo> <args>`` and return the CompletedProcess (text, captured output).

    ``args`` is a shell-style string. It is split with shlex.split and git is executed
    directly through _run, with no shell. The previous version built a ``shell=True``
    command with Python repr() "quoting", which is not shell quoting: a repo path containing
    ``$``, backticks or quotes was expanded or broke the command. As with the other
    subprocess calls here, a missing git binary yields returncode 127 instead of raising.
    """
    return _run(["git", "-C", str(repo), *shlex.split(args)])
|
|
|
|
def _show_pushed(cfg, repo, path):
    """Content of ``path`` as pushed to origin/main.

    Looks under the state subdir first, then at the repo root. Returns "" when neither
    exists or git fails.
    """
    subdir = _state_subdir(cfg)
    candidates = (f"origin/main:{subdir}/{path}", f"origin/main:{path}")
    for ref in candidates:
        shown = _git(repo, f"show {ref!r}")
        if shown.returncode == 0:
            return shown.stdout
    return ""
|
|
|
|
def _submit_key_for(cfg, name):
    """submit_key of the named agent's backend; "Enter" when no such agent is configured."""
    agent = cfg["agents"].get(name)
    if agent is None:
        return "Enter"
    return backend_of(cfg, agent).get("submit_key", "Enter")


def handoff_check(cfg):
    """Signal-tick handoff watcher for the loop pair; a no-op without a loop.handoff table.

    After ``git fetch`` in the handoff repo, it pings the other loop when:
      * a new origin/main commit subject matches ``claim_pattern`` (default "^claim"):
        ping the ``claim_pings`` agent (default "adversary");
      * a new commit subject matches ``review_pattern`` (default "^review"):
        ping the ``review_pings`` agent (default "builder");
      * the pushed content of inboxes[0] changed since last seen: ping the claim_pings
        agent; for inboxes[1], ping the review_pings agent.
    Subject patterns are matched per line and case-insensitively.

    The first tick only records origin/main's sha. Inbox hashes start empty, so an inbox
    file that is already present is announced once. State lives in the module-level _hand.

    Each ping is now submitted with the TARGET agent's backend submit_key. Previously every
    ping used the review_pings (builder) agent's key, so an adversary on a backend with a
    different submit key could be sent the wrong key.
    """
    h = cfg["loop"].get("handoff")
    if not h:
        return
    repo = handoff_repo(cfg)
    claim_target = h.get("claim_pings", "adversary")
    review_target = h.get("review_pings", "builder")

    def ping(name, msg):
        session = cfg["agents"].get(name, {}).get("session", cfg["session_prefix"] + name)
        ping_session(session, msg, submit_key=_submit_key_for(cfg, name))

    claim_pat = h.get("claim_pattern", "^claim")
    review_pat = h.get("review_pattern", "^review")
    _git(repo, "fetch -q origin")
    head = _git(repo, "rev-parse origin/main").stdout.strip()
    if head:
        if not _hand["sha"]:
            _hand["sha"] = head
        elif head != _hand["sha"]:
            subjects = _git(repo, f"log --format=%s {_hand['sha']}..origin/main").stdout
            if re.search(claim_pat, subjects, re.M | re.I):
                log("handoff: claim commit → pinging reviewer")
                ping(claim_target,
                     "watchdog ping: the other loop pushed a gate CLAIM commit. "
                     "Pull and verify the claimed gate now.")
            if re.search(review_pat, subjects, re.M | re.I):
                log("handoff: review commit → pinging builder")
                ping(review_target,
                     "watchdog ping: the other loop pushed a verdict/finding commit. "
                     "Pull the review file and act.")
            _hand["sha"] = head

    inboxes = h.get("inboxes", [])
    sub_dir = _state_subdir(cfg)
    for fname, key, target in (
        (inboxes[0] if len(inboxes) > 0 else None, "adv_inbox", claim_target),
        (inboxes[1] if len(inboxes) > 1 else None, "builder_inbox", review_target),
    ):
        if not fname:
            continue
        content = _show_pushed(cfg, repo, fname)
        if not content:
            _hand[key] = ""
            continue
        digest = hashlib.md5(content.encode()).hexdigest()
        if digest != _hand[key]:
            log(f"handoff: {fname} changed → pinging {target}")
            ping(target,
                 f"watchdog ping: the other loop pushed {sub_dir}/{fname} — pull, read it, "
                 f"act, then delete the file (commit + push) to mark it consumed.")
            _hand[key] = digest
|
|
|
|
# ── phase advance (loop machine) ─────────────────────────────────────────────────
|
|
|
|
def loop_agents(cfg):
    """Enabled agents of kind ``loop``, in config order."""
    return list(filter(lambda a: a["kind"] == "loop" and a.get("enabled", True),
                       cfg["agents"].values()))


def stop_loops(cfg):
    """Kill the tmux session of every enabled loop agent that is currently running."""
    for loop_agent in loop_agents(cfg):
        session = loop_agent["session"]
        if not session_alive(session):
            continue
        log(f"killing {session}")
        kill_session(session)


def start_loops(cfg):
    """Start (use-or-create) every enabled loop agent."""
    for loop_agent in loop_agents(cfg):
        start_agent(cfg, loop_agent)
|
|
|
|
def phase_advance_check(cfg):
    """On heavy tick: if the current phase is DONE, advance (or finish the sequence).

    Returns True only when it actually transitions/completes THIS tick (caller skips healing
    that one tick). Once the sequence is already marked complete it is idempotent (returns
    False, no re-log, no re-stop). Appending a phase after completion clears the stale marker
    and resumes the loops on the new phase.

    Returns False without doing anything when no phases are declared or loop.auto_advance
    is false.

    When the last phase completes:
      * <log_dir>/SEQUENCE-COMPLETE is written;
      * if loop.on_complete is set and its trigger file exists (default
        <log_dir>/.run-on-complete), the trigger is deleted and the agent named by
        on_complete.run is force-started, provided that agent is configured.
    """
    ps = phases(cfg)
    if not ps or not cfg["loop"].get("auto_advance", True):
        return False
    marker = Path(cfg["log_dir"]) / "SEQUENCE-COMPLETE"
    idx = cur_idx(cfg)
    ph = ps[idx]  # assumes cur_idx() returns an index within ps
    if not phase_done(cfg, ph["status"]):
        return False
    nxt = idx + 1
    if nxt < len(ps):
        log(f"PHASE {ph['id']} DONE — auto-transitioning to {ps[nxt]['id']}")
        stop_loops(cfg)
        Path(phase_idx_file(cfg)).write_text(str(nxt))
        if marker.exists():
            marker.unlink()  # resuming into a (freshly-appended) phase — clear stale completion
        handoff_reset()  # new phase: forget the previous phase's commit/inbox signals
        start_loops(cfg)  # relaunch loops; their kickoff now reflects the new phase index
        return True
    # last phase is DONE → sequence complete
    if marker.exists():
        return False  # already handled — idempotent (no re-log, no re-stop)
    log(f"PHASE SEQUENCE COMPLETE (last phase {ph['id']} DONE) — stopping loops")
    stop_loops(cfg)
    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    marker.write_text(f"phase sequence complete {ts}. Loops stopped; build finished.\n")
    oc = cfg["loop"].get("on_complete")
    if oc:
        trig = Path(cfg["log_dir"]) / oc.get("trigger_file", ".run-on-complete")
        if trig.exists():
            # the trigger file is consumed; the on_complete task only launches when it was present
            trig.unlink()
            runname = oc.get("run")
            if runname and runname in cfg["agents"]:
                log(f"on_complete: launching task agent {runname!r}")
                start_agent(cfg, cfg["agents"][runname], force=True)
    return True
|
|
|
|
# ── watchdog loop ────────────────────────────────────────────────────────────────
|
|
|
|
def watched(cfg):
    """Enabled agents whose ``watch`` mode is anything other than "none" (the default)."""
    selected = []
    for agent in cfg["agents"].values():
        if not agent.get("enabled", True):
            continue
        if agent.get("watch", "none") == "none":
            continue
        selected.append(agent)
    return selected
|
|
|
|
def watchdog_loop(cfg_path):
    """The supervisor loop. It re-reads the config on every signal tick and never returns.

    Every tick (watchdog.signal_interval, default 30 s):
      * handoff_check, when loop agents exist;
      * stall_check_one for ``heal+stall`` agents, and limit_tick for other watched agents
        whose session is alive;
      * scheduled wakes, which are suspended once SEQUENCE-COMPLETE exists.

    Every heavy tick (watchdog.heavy_interval, default 300 s, and also the first tick):
      * phase_advance_check;
      * then heal_one for every watched agent. This is skipped on the tick a phase
        transition happens, and for loop agents once the sequence is complete.

    Wake timers and both intervals follow the re-read config. Previously the wake set was
    frozen at startup: removing a wake agent, or its ``wake`` table, from the config crashed
    the loop, and agents given a wake later were never woken. A newly scheduled agent's
    first wake comes one interval after it is first seen.
    """
    cfg = load_config(cfg_path)
    sig = int(cfg["watchdog"].get("signal_interval", 30))
    heavy = int(cfg["watchdog"].get("heavy_interval", 300))
    ps = phases(cfg)
    log(f"watchdog up — phase={cur_phase(cfg).get('id','-')} [{cur_idx(cfg)+1}/{len(ps)}] "
        f"signal={sig}s heavy={heavy}s, watching: {[a['name'] for a in watched(cfg)]}")
    elapsed = heavy  # force a heavy check on first tick
    wake_elapsed = {}  # agent name -> seconds since its last landed wake
    while True:
        cfg = load_config(cfg_path)  # re-read every tick: config is authoritative, no env drift
        sig = int(cfg["watchdog"].get("signal_interval", 30))
        heavy = int(cfg["watchdog"].get("heavy_interval", 300))
        # Re-sync wake timers with the config just read: keep running counts, start newly
        # scheduled agents at 0, drop agents that no longer have a wake schedule.
        wakers = {name: a for name, a in cfg["agents"].items() if a.get("wake")}
        wake_elapsed = {name: wake_elapsed.get(name, 0) for name in wakers}
        has_loops = bool(loop_agents(cfg))
        seq_done = (Path(cfg["log_dir"]) / "SEQUENCE-COMPLETE").exists()

        if has_loops:
            handoff_check(cfg)
        for a in watched(cfg):
            if a["watch"] == "heal+stall":
                stall_check_one(cfg, a)
            elif session_alive(a["session"]):
                limit_tick(cfg, a, capture_pane(a["session"], 40))

        if not seq_done:
            for name, agent in wakers.items():
                interval = int(agent["wake"].get("interval", 3600))
                if wake_elapsed[name] >= interval and wake_agent(cfg, agent):
                    wake_elapsed[name] = 0

        if elapsed >= heavy:
            elapsed = 0
            advanced = phase_advance_check(cfg) if has_loops else False
            if not advanced:
                for a in watched(cfg):
                    if seq_done and a["kind"] == "loop":
                        continue
                    heal_one(cfg, a)

        time.sleep(sig)
        elapsed += sig
        for k in wake_elapsed:
            wake_elapsed[k] += sig
|
|
|
|
# ── CLI commands ──────────────────────────────────────────────────────────────
|
|
|
|
def start_watchdog(cfg, cfg_path):
    """Start the watchdog in its own tmux session (<session_prefix>watchdog) unless running.

    The session re-runs this script's ``watchdog`` verb with the same (absolute) config
    path, appending its stdout/stderr to <log_dir>/<session>.log.

    It now runs under the interpreter executing this command (sys.executable, falling back
    to ``python3``). Previously it used whatever ``python3`` is first on the tmux server's
    PATH, which may be a different interpreter, e.g. a pre-3.11 one without tomllib. Every
    path is shell-quoted with shlex.quote instead of being wrapped in bare single quotes.
    """
    session = cfg["session_prefix"] + "watchdog"
    if session_alive(session):
        log("watchdog already running")
        return
    log("starting watchdog")
    script = Path(__file__).resolve()
    log_file = str(_session_log_path(cfg, session))
    python = sys.executable or "python3"
    q = shlex.quote
    cmd = (f"exec >>{q(log_file)} 2>&1; "
           f"{q(python)} {q(str(script))} watchdog --config {q(str(Path(cfg_path).resolve()))}")
    new_session(session, cfg["project_dir"], cmd, log_file)
|
|
|
|
def cmd_up(cfg, cfg_path, names):
    """Start agents (via start_agent), services and/or the watchdog.

    With no names: every enabled agent, then every service, then the watchdog.
    With names: exactly the named agents (even disabled ones) first. The named
    services, and the watchdog if "watchdog" was named, follow in argument order.
    Names that match no agent, service or "watchdog" are logged and skipped rather
    than silently ignored.
    """
    if cfg["loop"].get("resume_phase") is False and not Path(phase_idx_file(cfg)).exists():
        # resume_phase = false: seed the phase index at 0 when no index file exists yet.
        Path(phase_idx_file(cfg)).write_text("0")

    if not names:
        for a in cfg["agents"].values():
            if a.get("enabled", True):
                start_agent(cfg, a)
        for s in cfg["services"].values():
            start_service(cfg, s)
        # The watchdog is started after the agents and services it supervises.
        start_watchdog(cfg, cfg_path)
        return

    for n in names:
        if n not in cfg["agents"] and n not in cfg["services"] and n != "watchdog":
            log(f"unknown agent/service {n!r} — ignored")
    for n in names:
        if n in cfg["agents"]:
            start_agent(cfg, cfg["agents"][n])
    for n in names:
        if n in cfg["services"]:
            start_service(cfg, cfg["services"][n])
        if n == "watchdog":
            start_watchdog(cfg, cfg_path)
|
|
|
|
def cmd_down(cfg, names):
    """Kill the tmux sessions of the named agents/services/"watchdog", or of all of them.

    Whatever the selection, the watchdog's session is killed before any other. The
    watchdog loop heals watched agents (heal_one), so if it outlived them it could
    presumably bring back a session this command has just killed. Unknown names are
    logged and skipped.
    """
    wd = cfg["session_prefix"] + "watchdog"
    if names:
        sessions = []
        for n in names:
            if n in cfg["agents"]:
                sessions.append(cfg["agents"][n]["session"])
            elif n in cfg["services"]:
                sessions.append(cfg["services"][n]["session"])
            elif n == "watchdog":
                sessions.append(wd)
            else:
                log(f"unknown agent/service {n!r} — ignored")
    else:
        sessions = ([a["session"] for a in cfg["agents"].values()]
                    + [s["session"] for s in cfg["services"].values()]
                    + [wd])
    # Stable sort: the watchdog moves to the front and every other session keeps its order.
    sessions.sort(key=lambda s: s != wd)
    for s in sessions:
        if session_alive(s):
            log(f"killing {s}")
            kill_session(s)
|
|
|
|
def cmd_status(cfg):
    """Print the fleet status table: the loop phase (if any), then every agent, service and the watchdog.

    Agent rows show kind, backend, model (role_model(), else "default"), watch mode and whether
    the tmux session is alive. Running agents also show session_command() in brackets.
    """
    idx, ps = cur_idx(cfg), phases(cfg)
    if ps:
        if 0 <= idx < len(ps):
            ph = ps[idx]
            done = "## DONE" if phase_done(cfg, ph["status"]) else "in progress"
            print(f" phase: {ph['id']} [{idx+1}/{len(ps)}] plan={ph.get('plan','-')} ({done})")
        else:
            # A stale index (e.g. phases removed from the config) must not take status down.
            print(f" phase: index {idx} out of range ({len(ps)} phases) — see `agents.py phase`")
    print(f" {'AGENT':<14} {'KIND':<11} {'BACKEND':<9} {'MODEL':<20} {'WATCH':<10} STATE")
    for a in cfg["agents"].values():
        st = "RUNNING" if session_alive(a["session"]) else "stopped"
        en = "" if a.get("enabled", True) else " (disabled)"
        rc = session_command(a["session"]) if st == "RUNNING" else ""
        print(f" {a['name']:<14} {a['kind']:<11} {a['backend']:<9} "
              f"{role_model(cfg,a) or 'default':<20} {a.get('watch','none'):<10} {st}{en}"
              + (f" [{rc}]" if rc else ""))
    for s in cfg["services"].values():
        st = "RUNNING" if session_alive(s["session"]) else "stopped"
        print(f" {s['name']:<14} {'service':<11} {'-':<9} {'-':<20} {'-':<10} {st}")
    wd = cfg["session_prefix"] + "watchdog"
    print(f" {'watchdog':<14} {'service':<11} {'-':<9} {'-':<20} {'-':<10} "
          f"{'RUNNING' if session_alive(wd) else 'stopped'}")
|
|
|
|
def cmd_phase(cfg, args):
    """Inspect or move the loop phase: `phase [show | next | set <index|id>]`.

    `show` (the default) prints the current phase and the whole sequence.
    `next` advances one phase, clamped to the last one.
    `set` jumps to a 0-based index, or to the phase whose id matches. Out-of-range
    indices and unknown ids are rejected rather than written to the phase index file.
    Any other subcommand is a usage error.
    """
    ps = phases(cfg)
    if not ps:
        print("no [loop].phases configured"); return
    sub = args[0] if args else "show"
    if sub == "show":
        idx = cur_idx(cfg)
        # The index file can outlive a config edit that removed phases; report, don't crash.
        cur = ps[idx]["id"] if 0 <= idx < len(ps) else f"<index {idx} out of range>"
        print(f"phase {cur} [{idx+1}/{len(ps)}] seq: {' '.join(p['id'] for p in ps)}")
        return

    if sub == "next":
        new = min(cur_idx(cfg) + 1, len(ps) - 1)
    elif sub == "set" and len(args) > 1:
        ids = [p["id"] for p in ps]
        try:
            new = int(args[1])
        except ValueError:
            # Not an integer: treat the argument as a phase id.
            if args[1] not in ids:
                die(f"unknown phase {args[1]!r} (known: {' '.join(ids)})")
                return
            new = ids.index(args[1])
        if not 0 <= new < len(ps):
            die(f"phase index {new} out of range 0..{len(ps)-1}")
            return
    else:
        die("usage: agents.py phase [show | next | set <index|id>]")
        return
    Path(phase_idx_file(cfg)).write_text(str(new))
    print(f"phase idx now {cur_idx(cfg)} ({cur_phase(cfg).get('id')})")
|
|
|
|
def cmd_selftest():
    """Run the built-in regression checks for the footer-UI activity detector, then exit.

    No config file is needed. A minimal in-memory config with one footer_ui backend is built,
    and canned pane captures are fed to pane_active(). Exits 0 if every check passes, else 1.
    """
    tui = {
        "active_re": "esc interrupt|thinking|inferring|running tool|preparing patch|reading|searching",
        "footer_ui": True,
        "log_grace": 180,
    }
    cfg = {"backends": {"tui": tui}, "log_dir": "/tmp"}
    agent = {"name": "x", "backend": "tui", "session": "selftest-x", "kind": "loop"}
    idle_pane = "\n ▣ Build · GPT-5.4 · 2m 19s\n 178.4K (17%) ctrl+p commands\n"
    busy_pane = "\n ~ Preparing patch...\n ⬝⬝⬝■■ esc interrupt 137.6K\n"
    limit_pane = idle_pane + "\nYou've hit your weekly limit · resets Jun 16, 10pm\n"

    def active(pane):
        # Judge the pane text alone; the session log is deliberately not consulted.
        return pane_active(cfg, agent, pane, use_log=False)

    # All detector calls happen up front; reporting comes afterwards.
    results = (
        ("footer_ui idle footer is idle", not active(idle_pane)),
        ("footer_ui active footer is active", active(busy_pane)),
        ("limit banner + idle footer is not active", not active(limit_pane)),
    )
    failed = False
    for label, passed in results:
        print(f" {'PASS' if passed else 'FAIL'}: {label}")
        failed = failed or not passed
    sys.exit(1 if failed else 0)
|
|
|
|
INIT_TOML = """\
|
|
# Starter agent-orchestrator config. See the README for the full schema.
|
|
[defaults]
|
|
session_prefix = "{prefix}-"
|
|
log_dir = ".ao-state"
|
|
backend = "claude"
|
|
model = "claude-sonnet-4-6"
|
|
watch = "heal"
|
|
|
|
[backend.claude]
|
|
bin = "claude"
|
|
flags = "--dangerously-skip-permissions"
|
|
prompt_delivery = "arg"
|
|
process_name = "claude"
|
|
submit_key = "Enter"
|
|
stall_idle = 300
|
|
active_re = "esc to interrupt|Running tool|\\\\u00b7 \\\\d+"
|
|
limit_re = "usage limit|limit reached|reached your .*limit"
|
|
|
|
[[agent]]
|
|
name = "worker"
|
|
kind = "persistent"
|
|
prompt = "You are a worker agent. Wait for instructions."
|
|
"""
|
|
|
|
def cmd_init(args):
    """Scaffold a starter agents.toml and a prompts/ directory.

    args: optional target directory (default: the current directory), created if missing.
    Refuses (via die) to overwrite an existing agents.toml. The session prefix is derived
    from the directory name.
    """
    import re  # local import keeps this block self-contained

    target = Path(args[0]) if args else Path.cwd()
    target.mkdir(parents=True, exist_ok=True)
    cfg_file = target / "agents.toml"
    if cfg_file.exists():
        die(f"{cfg_file} already exists — refusing to overwrite")
    # The prefix is pasted into a TOML string and becomes part of every tmux session name.
    # Keep only [A-Za-z0-9_-]:
    #   - quotes and backslashes would produce invalid TOML;
    #   - tmux rewrites '.' and ':' in session names, so the names the watchdog checks
    #     would never match.
    prefix = re.sub(r"[^A-Za-z0-9_-]+", "-", target.resolve().name).strip("-") or "proj"
    cfg_file.write_text(INIT_TOML.format(prefix=prefix))
    (target / "prompts").mkdir(exist_ok=True)
    log(f"scaffolded {cfg_file} and {target/'prompts'}/ — edit, then `agents.py up --config {cfg_file}`")
|
|
|
|
# ── main ──────────────────────────────────────────────────────────────────────
|
|
|
|
def main():
    """CLI entry point: `agents.py [--config PATH] <verb> [args...]` (see the module docstring).

    `help`, `selftest` and `init` run without loading a config. Every other known verb loads
    it first. An unknown verb prints the usage text and aborts via die() before the
    config is touched.
    """
    argv = sys.argv[1:]
    if not argv or argv[0] in ("-h", "--help", "help"):
        print(__doc__); return
    cfg_path = _cfg_path(argv)
    # Drop `--config PATH` wherever it appears so the verb ends up in argv[0].
    argv = [a for i, a in enumerate(argv)
            if a != "--config" and (i == 0 or argv[i-1] != "--config")]
    cmd = argv[0] if argv else "status"
    rest = argv[1:]

    # Verbs that need no config file.
    if cmd in ("-h", "--help", "help"):
        print(__doc__); return
    if cmd == "selftest": cmd_selftest(); return
    if cmd == "init": cmd_init(rest); return
    if cmd not in ("up", "down", "status", "watchdog", "phase", "logs"):
        # Reject typos up front, with a failing exit, instead of exiting 0.
        print(__doc__)
        die(f"unknown command: {cmd}")
        return

    cfg = load_config(cfg_path)
    if cmd == "up": cmd_up(cfg, cfg_path, rest)
    elif cmd == "down": cmd_down(cfg, rest)
    elif cmd == "status": cmd_status(cfg)
    elif cmd == "watchdog": watchdog_loop(cfg_path)
    elif cmd == "phase": cmd_phase(cfg, rest)
    elif cmd == "logs":
        if not rest:
            die("usage: agents.py logs <name>")
        name = rest[0]
        # Agents and services carry their own configured session name (as cmd_down uses).
        # Anything else, e.g. the watchdog, falls back to <session_prefix><name>.
        entry = cfg["agents"].get(name) or cfg["services"].get(name) or {}
        sess = entry.get("session") or (cfg["session_prefix"] + name)
        os.execvp("tail", ["tail", "-f", str(_session_log_path(cfg, sess))])
|
|
|
|
if __name__ == "__main__":
    # Script entry: main() returns None, so the exit status is 0 unless a verb exits itself.
    sys.exit(main())
|