Files
cc-ci-orchestrator/cc-ci-plan/agents.py
autonomic-bot 97303abc25 watchdog: suppress scheduled wakes once the build sequence is complete
The unified agents.py watchdog kept firing the hourly orchestrator supervision
ping even after SEQUENCE-COMPLETE (the old launch.py watchdog exited on
completion, which stopped them). Gate the wake loop on the SEQUENCE-COMPLETE
marker so a finished build stays fully at rest — no pings. Resumes
automatically when new work is queued (that clears the marker, line 631).
2026-06-13 12:04:49 +00:00

851 lines
38 KiB
Python

#!/usr/bin/env python3
"""
cc-ci unified agent driver — one script, one config (agents.toml).
Replaces the five bespoke launchers (launch.py / launch-orchestrator.py / launch-assistant.py
/ launch-upgrader.py / launch-report.py) and the ~15 scattered dotfiles. Every agent — loop
pair, orchestrator, assistant, one-shot tasks — is declared in agents.toml; the watchdog reads
the SAME file, so there is no env-vs-file drift.
Usage:
agents.py up [name…] start enabled agents (or just the named ones); use-or-create
agents.py down [name…] stop agents (or all)
agents.py status one table: every agent — kind, backend, model, session, phase
agents.py watchdog the supervisor loop (reads agents.toml)
agents.py logs <name> tail an agent's session log
agents.py phase [set N|next|show] inspect / move the loop phase
agents.py selftest backend activity-detector regression checks
agents.py migrate seed state/ from the legacy dotfiles (resume ids, phase idx)
Options:
--config PATH config file (default: <script dir>/agents.toml)
Config is authoritative. A one-off override env AGENT_MODEL_<name> / AGENT_BACKEND_<name>
affects a single invocation only; the persisted watchdog always re-reads the file.
"""
import hashlib, json, os, re, shlex, subprocess, sys, time, tomllib
from datetime import datetime, timedelta
from pathlib import Path
# Directory containing this script: the default location of agents.toml and the value
# stored as cfg["plan_dir"] by load_config().
SCRIPT_DIR = Path(__file__).resolve().parent
# ── config loading ──────────────────────────────────────────────────────────────
def _cfg_path(argv):
    """Return the config path selected by ``--config PATH`` in *argv*.

    Falls back to ``agents.toml`` next to this script when the flag is absent.
    When ``--config`` is the last argument (no value follows) the process exits
    with status 2 and a usage message instead of crashing with an IndexError.
    """
    if "--config" not in argv:
        return SCRIPT_DIR / "agents.toml"
    pos = argv.index("--config")
    if pos + 1 >= len(argv):
        print("agents.py: --config requires a PATH argument", file=sys.stderr)
        sys.exit(2)
    return Path(argv[pos + 1])
def load_config(path):
    """Parse the TOML file at *path* into the runtime config dict.

    The result carries the raw ``watchdog`` / ``backend`` / ``defaults`` / ``loop``
    tables plus derived entries: ``agents`` and ``services`` (each merged over
    ``defaults`` and given a tmux session name), ``plan_dir``, ``log_dir``,
    ``session_prefix`` and ``state_dir``. The state directory is created if missing.
    """
    with open(path, "rb") as fh:
        raw = tomllib.load(fh)

    defaults = raw.get("defaults", {})
    cfg = {
        "watchdog": raw.get("watchdog", {}),
        "backends": raw.get("backend", {}),
        "defaults": defaults,
        "loop": raw.get("loop", {}),
        "plan_dir": str(SCRIPT_DIR),
        "log_dir": defaults.get("log_dir", "/srv/cc-ci/.cc-ci-logs"),
        "session_prefix": defaults.get("session_prefix", "cc-ci-"),
    }
    prefix = cfg["session_prefix"]

    agents = {}
    for spec in raw.get("agent", []):
        merged = {**defaults, **spec}
        merged["session"] = spec.get("session", prefix + spec["name"])
        merged["kind"] = spec.get("kind", "persistent")
        # One-off env overrides apply to this load only; the file itself is never rewritten.
        model_override = os.environ.get(f"AGENT_MODEL_{spec['name']}")
        backend_override = os.environ.get(f"AGENT_BACKEND_{spec['name']}")
        if model_override:
            merged["model"] = model_override
        if backend_override:
            merged["backend"] = backend_override
        agents[spec["name"]] = merged
    cfg["agents"] = agents

    services = {}
    for svc in raw.get("service", []):
        services[svc["name"]] = {**defaults, **svc, "session": prefix + svc["name"]}
    cfg["services"] = services

    cfg["state_dir"] = os.path.join(cfg["log_dir"], "state")
    Path(cfg["state_dir"]).mkdir(parents=True, exist_ok=True)
    return cfg
def backend_of(cfg, agent):
    """Return the backend table named by ``agent["backend"]``; die() when it is unknown."""
    found = cfg["backends"].get(agent["backend"])
    if found:
        return found
    die(f"agent {agent['name']}: unknown backend {agent['backend']!r}")
    return found
# ── logging ───────────────────────────────────────────────────────────────────
def log(msg):
    """Print *msg* to stdout behind an ``[agents HH:MM:SS]`` prefix, flushing immediately."""
    stamp = datetime.now().strftime("%H:%M:%S")
    print(f"[agents {stamp}] {msg}", flush=True)
def die(msg):
    """Log *msg* as an error, then terminate the process with exit status 1."""
    log(f"ERROR: {msg}")
    raise SystemExit(1)
# ── tmux helpers ────────────────────────────────────────────────────────────────
# ALWAYS target sessions with an exact-match "=" prefix. tmux does prefix/fnmatch on bare
# targets, so "-t cc-ci-assistant" would match "cc-ci-assistant3" — capturing or killing the
# wrong session. "=name" forces an exact match. (Bug found pre-cutover 2026-06-13.)
def TS(name):
    """Exact target-SESSION spec for tmux (used with has-session / kill-session)."""
    return "".join(("=", name))
def TP(name):
    """Exact target-PANE spec: ``=session:`` anchors the exact session, current window/pane."""
    return "".join(("=", name, ":"))
def session_alive(name):
    """True when ``tmux has-session`` succeeds for the exact session *name*."""
    probe = subprocess.run(["tmux", "has-session", "-t", TS(name)], capture_output=True)
    return probe.returncode == 0
def session_command(name):
    """Return the session pane's ``pane_current_command``, or "" when tmux fails."""
    query = ["tmux", "display-message", "-p", "-t", TP(name), "#{pane_current_command}"]
    result = subprocess.run(query, capture_output=True, text=True)
    if result.returncode != 0:
        return ""
    return result.stdout.strip()
def kill_session(name):
    """Kill the exact tmux session *name*; tmux output and exit status are discarded."""
    argv = ["tmux", "kill-session", "-t", TS(name)]
    subprocess.run(argv, capture_output=True)
def capture_pane(name, lines=40):
    """Return the last *lines* lines of the session's pane text, or "" when tmux fails."""
    result = subprocess.run(["tmux", "capture-pane", "-p", "-t", TP(name)],
                            capture_output=True, text=True)
    if result.returncode != 0:
        return ""
    tail = result.stdout.splitlines()[-lines:]
    return "\n".join(tail)
def pipe_to_log(session, log_path):
    """Append everything the session's pane prints to *log_path* via ``tmux pipe-pane -o``.

    The path is shell-quoted with shlex.quote because tmux hands the pipe command to a
    shell; hand-written single quotes would break on a path containing a quote character.
    """
    sink = f"cat >> {shlex.quote(str(log_path))}"
    subprocess.run(["tmux", "pipe-pane", "-o", "-t", TP(session), sink])
def new_session(session, cwd, cmd, log_path):
    """Create a detached tmux session running *cmd* in *cwd* and pipe its pane to *log_path*.

    *cwd* is created first when it does not exist.
    """
    workdir = Path(cwd)
    workdir.mkdir(parents=True, exist_ok=True)
    launch = ["tmux", "new-session", "-d", "-s", session, "-c", cwd, cmd]
    subprocess.run(launch)
    pipe_to_log(session, log_path)
def ping_session(session, msg, submit_key="Enter"):
    """Type *msg* into a live session and submit it.

    The submit key is re-sent (up to 10 times, one second apart) until the first
    28 characters of *msg* no longer appear in the last 20 pane lines. Does nothing
    when the session is not alive.
    """
    if not session_alive(session):
        return
    target = TP(session)
    echo = msg[:28]
    subprocess.run(["tmux", "send-keys", "-t", target, "-l", "--", msg], capture_output=True)
    time.sleep(0.5)
    attempts = 0
    while attempts < 10:
        subprocess.run(["tmux", "send-keys", "-t", target, submit_key], capture_output=True)
        time.sleep(1)
        if echo not in capture_pane(session, 20):
            break
        attempts += 1
# ── activity / limit / fatal detection (per-backend regexes from config) ─────────
def _re(backend, key):
    """Compile ``backend[key]`` as a case-insensitive regex; None when unset or empty."""
    pattern = backend.get(key)
    if not pattern:
        return None
    return re.compile(pattern, re.I)
def _session_log_path(cfg, session):
    """Path of the session's log file: ``<log_dir>/<session>.log``."""
    return Path(cfg["log_dir"], f"{session}.log")
def _log_recently_touched(cfg, session, age_seconds):
    """True when the session log was modified within the last *age_seconds*; False if missing."""
    log_file = _session_log_path(cfg, session)
    try:
        modified = log_file.stat().st_mtime
    except FileNotFoundError:
        return False
    return (time.time() - modified) <= age_seconds
def pane_active(cfg, agent, pane, *, use_log=True):
    """Decide whether *pane* shows the agent actively working.

    For every backend except opencode this is a plain ``active_re`` search over the
    pane text. For opencode only the last 10 pane lines count (completed turns leave
    a static footer higher up), and — when *use_log* is true — a session log touched
    within ``log_grace`` seconds also counts as activity.
    """
    backend = backend_of(cfg, agent)
    active_re = _re(backend, "active_re")
    if agent["backend"] != "opencode":
        return bool(active_re and active_re.search(pane))
    footer = "\n".join(pane.splitlines()[-10:])
    footer_hit = bool(active_re and active_re.search(footer))
    grace = int(backend.get("log_grace", 180))
    if footer_hit:
        return True
    return use_log and _log_recently_touched(cfg, agent["session"], grace)
# ── prompt assembly ───────────────────────────────────────────────────────────
# Matches a line that is only placeholder text ("not yet", "pending", "TBD", "<...>", …).
# phase_done() applies it to the first non-blank line after a done-marker heading and
# skips the heading when it matches.
DONE_PLACEHOLDER_RE = re.compile(
r"^\s*(not yet|not done|not complete|incomplete|pending\b|tbd\b|n/?a\b|"
r"written here only|only when|to be (written|filled)|when all|<.*>)", re.I)
def phases(cfg):
    """Return the configured loop phase list (empty when none is configured)."""
    return cfg["loop"].get("phases", [])
def phase_idx_file(cfg):
    """Path (as a string) of the file holding the current phase index inside state_dir."""
    basename = cfg["loop"].get("state_file", "phase-idx")
    return os.path.join(cfg["state_dir"], basename)
def cur_idx(cfg):
    """Return the persisted phase index, or 0 when the file is missing or malformed.

    Only an optional single leading "-" followed by ASCII digits is accepted. The
    earlier ``lstrip("-").isdigit()`` guard let values such as "--3" or "²" through
    and then raised ValueError from int(), which would crash the caller.
    """
    try:
        raw = Path(phase_idx_file(cfg)).read_text().strip()
    except FileNotFoundError:
        return 0
    if re.fullmatch(r"-?[0-9]+", raw):
        return int(raw)
    return 0
def cur_phase(cfg):
    """Return the phase table at the current index, or {} when no phases are configured."""
    sequence = phases(cfg)
    if not sequence:
        return {}
    return sequence[cur_idx(cfg)]
def resolve_state_file(repo_dir, basename):
    """Locate a loop-state file: ``machine-docs/<basename>`` if it exists, else the repo root."""
    root = Path(repo_dir)
    preferred = root / "machine-docs" / basename
    if preferred.exists():
        return preferred
    return root / basename
def phase_done(cfg, status_basename):
    """True when the phase status file contains a genuine done marker.

    A line starting with the configured ``done_marker`` (default "## DONE") counts
    only if the first non-blank line after it is not placeholder text. A missing
    status file means "not done".
    """
    repo = cfg["loop"].get("handoff", {}).get("repo", "/srv/cc-ci/cc-ci")
    try:
        text = resolve_state_file(repo, status_basename).read_text()
    except FileNotFoundError:
        return False
    lines = text.splitlines()
    marker = cfg["loop"].get("done_marker", "## DONE")
    for pos, line in enumerate(lines):
        if line.startswith(marker):
            non_blank = (later for later in lines[pos + 1:] if later.strip())
            first_body = next(non_blank, "")
            if not DONE_PLACEHOLDER_RE.match(first_body):
                return True
    return False
def role_model(cfg, agent):
    """Model for *agent*: the current phase's ``models[role]`` override if set, else its own."""
    role = agent.get("role")
    override = (cur_phase(cfg).get("models") or {}).get(role) if role else None
    return override or agent.get("model", "")
def build_loop_kickoff(cfg, agent):
    """Assemble the kickoff prompt for a loop agent.

    The result is a phase-specific header (built from the current phase's ``id``,
    ``plan`` and ``status`` entries) followed by the contents of
    ``<plan_dir>/prompts/<role>.md`` for the agent's role.
    """
    phase = cur_phase(cfg)
    pid = phase["id"]
    plan = phase["plan"]
    status = phase["status"]
    header = (
        f"*** cc-ci SUB-PHASE {pid} ***\n"
        f"SINGLE SOURCE OF TRUTH for THIS phase: /srv/cc-ci/cc-ci-plan/{plan} — read it in full "
        f"now; it defines this phase's mission and Definition of Done.\n"
        f"The general loop protocol still applies and lives in /srv/cc-ci/cc-ci-plan/plan.md "
        f"(§6.1 coordination, §7 pacing, §9 guardrails) — read those sections too.\n"
        f"Track loop state in PHASE-NAMESPACED files UNDER machine-docs/ in your repo clone "
        f"(create the dir if missing): machine-docs/{status}, machine-docs/BACKLOG-{pid}.md, "
        f"machine-docs/REVIEW-{pid}.md, machine-docs/JOURNAL-{pid}.md. machine-docs/DECISIONS.md "
        f"is shared (append).\n"
        f"FILE-LOCATION RULE (mandatory): ALL coordination / loop-state files live in "
        f"machine-docs/, NEVER the repo root — that includes STATUS/BACKLOG/REVIEW/JOURNAL "
        f"(phase-namespaced), DECISIONS.md, DEFERRED.md, and the ADVERSARY-INBOX.md / "
        f"BUILDER-INBOX.md side-channels. If you ever find one at the root, git mv it into "
        f"machine-docs/.\n"
        f'"Done" for this phase = the Builder writes "## DONE" to machine-docs/{status} ONLY after '
        f"every Definition-of-Done item is Adversary-verified with a fresh PASS in "
        f"machine-docs/REVIEW-{pid}.md (handshake per §6.1).\n"
        f"The repo's Phase-1 STATUS.md / BACKLOG.md / REVIEW.md are HISTORY from the completed "
        f"Phase 1 — do NOT use them as your state; use the phase-namespaced files above.\n"
        f'Wherever the standing rules below say "plan.md"/"STATUS.md"/"BACKLOG.md"/"REVIEW.md", '
        f"substitute the phase plan and these machine-docs/ phase-namespaced files.\n\n"
        f"=== standing role & rules ===\n"
    )
    prompts_dir = Path(cfg["plan_dir"]) / "prompts"
    standing_rules = (prompts_dir / f"{agent['role']}.md").read_text()
    return header + standing_rules
def agent_prompt(cfg, agent):
    """Return the agent's startup prompt.

    Loop agents get the generated phase kickoff; otherwise ``prompt_file`` (relative
    to plan_dir) wins over the inline ``prompt`` string, which defaults to "".
    """
    if agent["kind"] == "loop":
        return build_loop_kickoff(cfg, agent)
    prompt_file = agent.get("prompt_file")
    if not prompt_file:
        return agent.get("prompt", "")
    return (Path(cfg["plan_dir"]) / prompt_file).read_text()
# ── resume id ───────────────────────────────────────────────────────────────────
# Agents whose resume id may still live in a legacy dotfile under log_dir.
_LEGACY_ID = {
    "orchestrator": ".orchestrator-session-id",
    "assistant": ".assistant-session-id",
}


def resume_id(cfg, agent):
    """Return the stored resume id for *agent*, or None when there is none.

    ``state/<name>.id`` is read first. If it is absent or blank and the agent has a
    legacy dotfile with a non-blank value, that value is copied into the state file
    and returned.
    """
    state_file = Path(cfg["state_dir"]) / f"{agent['name']}.id"
    if state_file.exists():
        stored = state_file.read_text().strip()
        if stored:
            return stored

    legacy_name = _LEGACY_ID.get(agent["name"])
    if not legacy_name:
        return None
    legacy_file = Path(cfg["log_dir"]) / legacy_name
    if not legacy_file.exists():
        return None
    legacy_value = legacy_file.read_text().strip()
    if not legacy_value:
        return None
    state_file.write_text(legacy_value)  # seed state from legacy
    return legacy_value
# ── agent launch ────────────────────────────────────────────────────────────────
def start_agent(cfg, agent, *, force=False):
    """Launch *agent* in its own tmux session.

    A session that is already alive is left untouched unless *force* is true, in
    which case it is killed and recreated. The assembled prompt is written to
    ``state/kickoff-<session>.txt``; the claude backend receives it as a command-line
    argument, the opencode backend is told (via a typed message) to read the file.

    Every value interpolated into the shell command goes through shlex.quote, and
    the opencode ``preamble`` is only prepended when it is non-empty — an empty
    preamble previously produced a command starting with ";", which a shell rejects.
    """
    session = agent["session"]
    if session_alive(session):
        if not force:
            log(f"{session} already running — leaving it")
            return
        kill_session(session)
    backend = backend_of(cfg, agent)
    model = role_model(cfg, agent)
    prompt = agent_prompt(cfg, agent)
    log_path = str(_session_log_path(cfg, session))
    kf = Path(cfg["state_dir"]) / f"kickoff-{session}.txt"
    kf.write_text(prompt)
    cwd = agent.get("dir", "/srv/cc-ci-orch")
    pid = cur_phase(cfg).get("id", "-") if agent["kind"] == "loop" else "-"
    if agent["backend"] == "claude":
        rid = resume_id(cfg, agent) if agent.get("resume") and backend.get("supports_resume") else None
        parts = [backend["bin"]]
        if rid:
            parts.append(f"--resume {shlex.quote(str(rid))}")
        if backend.get("remote_control"):
            parts.append(f"--remote-control {shlex.quote(session)}")
        if model:
            parts.append(f"--model {shlex.quote(str(model))}")
        flags = backend.get("flags", "")
        if flags:
            # flags is a pre-formatted option string from config; passed through as-is
            parts.append(flags)
        # the prompt is read from the kickoff file by the shell at launch time
        parts.append(f"\"$(cat {shlex.quote(str(kf))})\"")
        cmd = " ".join(parts)
        log(f"starting {session} (claude, kind={agent['kind']}, phase={pid}, "
            f"model={model or 'default'}{', resume' if rid else ''})")
        new_session(session, cwd, cmd, log_path)
    elif agent["backend"] == "opencode":
        cwd = "/srv/cc-ci-orch/cc-ci" if agent["kind"] == "loop" else cwd
        model_env = (f"OPENCODE_CONFIG_CONTENT={shlex.quote(json.dumps({'model': model}))} "
                     if model else "")
        preamble = backend.get("preamble", "")
        launch = (f"{model_env}NO_COLOR=1 {backend['bin']} "
                  f"attach {backend['server']} --dir {shlex.quote(cwd)}")
        cmd = f"{preamble}; {launch}" if preamble.strip() else launch
        log(f"starting {session} (opencode, kind={agent['kind']}, phase={pid}, model={model or 'default'})")
        new_session(session, cwd, cmd, log_path)
        # give the client time to attach before typing into it
        time.sleep(int(backend.get("connect_delay", 12)))
        boot = f"Your full kickoff prompt is in {kf} — read it now with: `cat '{kf}'` — then follow it exactly."
        ping_session(session, boot, submit_key=backend.get("submit_key", "C-m"))
    else:
        die(f"unknown backend {agent['backend']!r}")
def start_service(cfg, svc):
    """Start a service's command in its tmux session unless that session already exists."""
    session = svc["session"]
    if session_alive(session):
        log(f"{session} already running — leaving it")
        return
    log(f"starting service {session}")
    workdir = svc.get("dir", cfg["plan_dir"])
    log_file = str(_session_log_path(cfg, session))
    new_session(session, workdir, svc["command"], log_file)
# ── usage-limit state machine (ported verbatim from launch.py) ───────────────────
# Matches a quota banner's reset time: "reset"/"resets", optional "at", a 1-2 digit hour,
# optional ":MM", optional am/pm — case-insensitive. Groups: (hour, minute, am|pm).
RESET_RE = re.compile(r"resets?\s*(?:at\s*)?(\d{1,2})(?::(\d{2}))?\s*(am|pm)?", re.I)
def _limit_state_path(cfg, session):
    """Path of the per-session usage-limit state file: ``state/limited-<session>.json``."""
    return Path(cfg["state_dir"], f"limited-{session}.json")
def _load_limit_state(cfg, session):
    """Load the persisted usage-limit state dict for *session*, or None.

    None is returned when the file is missing, unreadable, not valid JSON, or does
    not contain a JSON object — callers use ``state.get(...)``, so any other JSON
    type is treated the same as "no state". Only I/O and decoding errors are
    swallowed; programming errors propagate.
    """
    try:
        state = json.loads(_limit_state_path(cfg, session).read_text())
    except (OSError, ValueError):  # ValueError covers JSONDecodeError / UnicodeDecodeError
        return None
    return state if isinstance(state, dict) else None
def _save_limit_state(cfg, session, st):
    """Persist the usage-limit state *st* for *session* as JSON."""
    target = _limit_state_path(cfg, session)
    target.write_text(json.dumps(st))
def _clear_limit_state(cfg, session):
    """Delete the usage-limit state file for *session*; a missing file is not an error."""
    _limit_state_path(cfg, session).unlink(missing_ok=True)
def _parse_reset_epoch(pane):
    """Return the epoch time of the LAST reset time mentioned in *pane*, or None.

    The hour/minute is interpreted as local wall-clock time today; if that moment
    has already passed, the same time tomorrow is used. Out-of-range values or any
    parsing failure yield None.
    """
    last = None
    for last in RESET_RE.finditer(pane):
        pass
    if last is None:
        return None
    try:
        hour = int(last.group(1))
        minute = int(last.group(2) or 0)
        meridiem = (last.group(3) or "").lower()
        if meridiem == "pm" and hour != 12:
            hour += 12
        elif meridiem == "am" and hour == 12:
            hour = 0
        if hour > 23 or minute > 59:
            return None
        when = datetime.now().replace(hour=hour, minute=minute, second=0, microsecond=0)
        if when.timestamp() <= time.time():
            when = when + timedelta(days=1)
        return when.timestamp()
    except Exception:
        return None
def _next_limit_until(cfg, pane, now):
    """Compute when to probe a limited agent next.

    Returns ``(epoch, parsed)``: the banner's reset time plus ``limit_reset_slack``
    with parsed=True when a reset time was found at most six hours ahead, otherwise
    ``now + limit_probe_fallback`` with parsed=False.
    """
    watchdog_cfg = cfg["watchdog"]
    fallback = int(watchdog_cfg.get("limit_probe_fallback", 300))
    slack = int(watchdog_cfg.get("limit_reset_slack", 45))
    reset_at = _parse_reset_epoch(pane)
    if reset_at is None or reset_at - now > 6 * 3600:
        return now + fallback, False
    return reset_at + slack, True
def _limit_nudge_msg(kind):
    """Text typed into a limited agent to probe whether its quota window has reset."""
    if kind not in ("persistent", "orchestrator"):
        return ("watchdog probe: if the quota window has reset, RESUME your loop now — pull latest, "
                "re-read your phase STATUS/REVIEW files, and continue; re-arm your loop pacing.")
    return ("watchdog probe: if the quota window has reset, RESUME now — re-check status "
            "and continue from where you stopped.")
def limit_tick(cfg, agent, pane):
    """Advance the usage-limit state machine for one agent.

    Returns True while the agent is considered inside a usage-limit window, so that
    callers suppress all healing; False otherwise. State is persisted per session
    (``until`` = next probe time, ``nudges`` = probes sent so far).
    """
    session = agent["session"]
    backend = backend_of(cfg, agent)
    limit_re = _re(backend, "limit_re")
    submit = backend.get("submit_key", "Enter")
    fallback = int(cfg["watchdog"].get("limit_probe_fallback", 300))
    state = _load_limit_state(cfg, session)
    limited_now = bool(limit_re and limit_re.search(pane))

    if state is None:
        # Not tracked yet: enter the limit window only for an idle pane showing the banner.
        if not limited_now:
            return False
        if pane_active(cfg, agent, pane, use_log=False):
            return False
        started = time.time()
        until, parsed = _next_limit_until(cfg, pane, started)
        if parsed:
            log(f"limit hit on {agent['name']} — banner says reset "
                f"{datetime.fromtimestamp(until):%a %H:%M}; holding (no reboots)")
        else:
            log(f"limit hit on {agent['name']} — reset unparsable; flat "
                f"{fallback//60}-min probe loop (no reboots)")
        _save_limit_state(cfg, session, {"until": until, "nudges": 0})
        return True

    # Already tracked: activity, or the banner disappearing, ends the window.
    if pane_active(cfg, agent, pane, use_log=False) or not limited_now:
        log(f"limit lifted on {agent['name']} — clearing limit state")
        _clear_limit_state(cfg, session)
        return False

    now = time.time()
    if now < state.get("until", 0):
        return True  # still holding until the next probe time

    msg = _limit_nudge_msg(agent["kind"])
    recent_lines = "\n".join(pane.splitlines()[-8:])
    if msg[:28] in recent_lines:
        return True  # a previous probe is still visible near the bottom; do not stack another

    nudges = state.get("nudges", 0) + 1
    log(f"limit probe #{nudges} on {agent['name']} — nudging to resume")
    ping_session(session, msg, submit_key=submit)
    time.sleep(3)
    after = capture_pane(session, 40)
    still_limited = bool(limit_re and limit_re.search(after))
    if pane_active(cfg, agent, after, use_log=False) and not still_limited:
        log(f"limit lifted on {agent['name']} — probe resumed it")
        _clear_limit_state(cfg, session)
        return True

    until, _ = _next_limit_until(cfg, after, now)
    if nudges == 3:
        log(f"WARNING: {agent['name']} still limited after {nudges} probes — flat probes; never rebooting")
    _save_limit_state(cfg, session, {"until": until, "nudges": nudges})
    return True
# ── stall detection (ported) ─────────────────────────────────────────────────────
# Per-session epoch time at which stall_check_one() first saw the session idle.
# 0.0 (or a missing key) means "not currently counted as idle".
_idle_since: dict[str, float] = {}
def _last_nonempty_line(text):
    """Return the last line of *text* that is not blank, stripped; "" when there is none."""
    stripped = (line.strip() for line in reversed(text.splitlines()))
    return next((line for line in stripped if line), "")
def _parse_waiting_until(agent, pane):
    """Return the epoch time of the agent's ``WAITING-UNTIL: <iso-timestamp>`` marker, or None.

    For opencode the marker only counts when it is the last non-blank pane line.
    For other backends the LAST marker in the captured pane is used: the capture
    spans several lines of scrollback, and taking the first (oldest) marker could
    make a correctly waiting agent look overdue. A trailing "Z" is accepted as UTC.
    Unparsable timestamps yield None.
    """
    marker_re = r"WAITING-UNTIL:\s*(\S+)"
    if agent["backend"] == "opencode":
        line = _last_nonempty_line(pane)
        if not line.startswith("WAITING-UNTIL:"):
            return None
        found = re.search(marker_re, line)
        token = found.group(1) if found else None
    else:
        tokens = re.findall(marker_re, pane)
        token = tokens[-1] if tokens else None
    if token is None:
        return None
    try:
        return datetime.fromisoformat(token.replace("Z", "+00:00")).timestamp()
    except (ValueError, OverflowError, OSError):
        return None
def stall_check_one(cfg, agent):
    """Signal-tick stall detection for one agent; kills and reboots a stalled session.

    A session counts as stalled when it is idle (not active, not usage-limited) and
    either its WAITING-UNTIL time plus ``stall_grace`` has passed, or — with no
    marker — it has been idle for at least the backend's ``stall_idle`` seconds.
    """
    session = agent["session"]
    if not session_alive(session):
        _idle_since[session] = 0.0
        _clear_limit_state(cfg, session)
        return

    now = time.time()
    pane = capture_pane(session, 40)
    # limit_tick runs first on purpose: a limited agent must never be treated as stalled
    if limit_tick(cfg, agent, pane) or pane_active(cfg, agent, pane):
        _idle_since[session] = 0.0
        return

    idle_start = _idle_since.get(session) or now
    _idle_since[session] = idle_start
    idle = now - idle_start
    grace = int(cfg["watchdog"].get("stall_grace", 180))
    until = _parse_waiting_until(agent, pane)
    if until is None:
        stall_idle = int(backend_of(cfg, agent).get("stall_idle", 300))
        if idle < stall_idle:
            return
        reason = f"idle {int(idle)}s with no WAITING-UNTIL marker"
    else:
        if now <= until + grace:
            return
        reason = f"past its WAITING-UNTIL by {int(now-until)}s — self-wake did not fire"

    log(f"stall: {agent['name']} ({session}) {reason} — kill + reboot")
    start_agent(cfg, agent, force=True)
    _idle_since[session] = 0.0
# ── healing (ported, config-driven) ──────────────────────────────────────────────
def _expected_backend_cmd(agent):
    """Process name expected in the agent's pane: "opencode" for that backend, else "claude"."""
    if agent["backend"] == "opencode":
        return "opencode"
    return "claude"
def backend_mismatch(agent):
    """True only when the pane is running the OTHER known backend binary.

    Anything that is not "claude" or "opencode" (e.g. a transient shell during
    startup) is not treated as a mismatch.
    """
    running = session_command(agent["session"])
    if running in ("claude", "opencode"):
        return running != _expected_backend_cmd(agent)
    return False
def heal_one(cfg, agent):
    """Heavy-tick healing for one agent.

    Restarts a missing session, force-restarts one running the wrong backend, and
    force-restarts an idle, non-limited session whose pane matches the backend's
    ``fatal_re``. Active or usage-limited sessions are left alone.
    """
    session = agent["session"]
    backend = backend_of(cfg, agent)
    name = agent["name"]

    if not session_alive(session):
        log(f"{name} ({session}) gone — restarting")
        start_agent(cfg, agent)
        return

    if backend_mismatch(agent):
        log(f"{name} ({session}) is {session_command(session)!r}, expected "
            f"{_expected_backend_cmd(agent)} — kill + restart")
        start_agent(cfg, agent, force=True)
        return

    pane = capture_pane(session, 25)
    if pane_active(cfg, agent, pane) or limit_tick(cfg, agent, pane):
        return

    fatal_re = _re(backend, "fatal_re")
    if fatal_re and fatal_re.search(pane):
        log(f"FATAL session-state error on {name} ({session}) — kill + restart")
        start_agent(cfg, agent, force=True)
# ── wake (persistent agents with a wake schedule) ────────────────────────────────
def wake_agent(cfg, agent):
    """Send the agent its scheduled wake prompt.

    Returns True when the wake landed or is moot (no wake config, missing or empty
    prompt file), and False when it should be retried next tick (session not alive,
    or the agent is currently active).
    """
    wake = agent.get("wake")
    if not wake:
        return True
    session = agent["session"]
    if not session_alive(session):
        return False
    backend = backend_of(cfg, agent)
    if pane_active(cfg, agent, capture_pane(session, 25)):
        return False

    pf = wake.get("prompt_file")
    try:
        raw_text = (Path(cfg["plan_dir"]) / pf).read_text()
        msg = " ".join(raw_text.split())  # collapse to one line for typing into the pane
    except (FileNotFoundError, TypeError):
        log(f"wake skipped for {agent['name']} — prompt file missing: {pf}")
        return True
    if not msg:
        return True

    log(f"waking {agent['name']} ({session}) for scheduled supervision pass")
    ping_session(session, msg, submit_key=backend.get("submit_key", "Enter"))
    return True
# ── handoff signalling (loop pair, ported) ───────────────────────────────────────
# Last observed handoff state: origin/main head sha and the md5 of each pushed inbox file.
_hand = {"sha": "", "adv_inbox": "", "builder_inbox": ""}


def handoff_reset():
    """Forget the remembered head sha and inbox hashes (used on phase transition)."""
    for key in ("sha", "adv_inbox", "builder_inbox"):
        _hand[key] = ""
def _git(repo, args):
    """Run ``git -C <repo> <args>`` and return the CompletedProcess (text mode, output captured).

    *args* is a single string split with shlex.split, so callers may keep quoting
    individual arguments. The command runs WITHOUT a shell: the previous form built
    a shell line using Python repr() as quoting, which is not shell quoting and
    exposed the arguments to shell expansion.

    Never raises for a missing git binary or an unparsable *args*; those cases are
    reported as a CompletedProcess with a non-zero return code and empty stdout,
    matching what callers already handle for a failing git command.
    """
    argv = ["git", "-C", str(repo)]
    try:
        argv.extend(shlex.split(args))
        return subprocess.run(argv, capture_output=True, text=True)
    except (OSError, ValueError) as exc:
        return subprocess.CompletedProcess(argv, 127, stdout="", stderr=str(exc))
def _show_pushed(repo, path):
    """Return *path*'s content as pushed on origin/main, or "" when it is not there.

    ``machine-docs/<path>`` is tried first, then the repo root.
    """
    candidates = (f"origin/main:machine-docs/{path}", f"origin/main:{path}")
    for loc in candidates:
        shown = _git(repo, f"show {loc!r}")
        if shown.returncode == 0:
            return shown.stdout
    return ""
def handoff_check(cfg):
    """Signal the loop pair about pushed handoffs (no-op without ``loop.handoff`` config).

    After fetching origin, new commits on origin/main whose subject starts with
    "claim" ping the ``claim_pings`` agent (default adversary) and subjects starting
    with "review" ping the ``review_pings`` agent (default builder). A pushed inbox
    file whose content hash changed pings its target agent.

    Each ping uses the submit key of the TARGET agent's backend. Previously the
    builder's key was used for every ping (wrong when the pair runs on different
    backends) and a config without a "builder" agent raised KeyError.
    """
    h = cfg["loop"].get("handoff")
    if not h:
        return
    repo = h["repo"]

    def session_of(name):
        # configured session name, else the conventional prefix + agent name
        return cfg["agents"].get(name, {}).get("session", cfg["session_prefix"] + name)

    def submit_key_of(name):
        # submit key of the target's own backend; "Enter" when the agent is not configured
        target = cfg["agents"].get(name)
        if not target:
            return "Enter"
        return backend_of(cfg, target).get("submit_key", "Enter")

    def ping(name, msg):
        ping_session(session_of(name), msg, submit_key=submit_key_of(name))

    _git(repo, "fetch -q origin")
    head = _git(repo, "rev-parse origin/main").stdout.strip()
    if head:
        if not _hand["sha"]:
            # first observation only establishes the baseline; nothing to announce yet
            _hand["sha"] = head
        elif head != _hand["sha"]:
            subjects = _git(repo, f"log --format=%s {_hand['sha']}..origin/main").stdout
            if re.search(r"^claim", subjects, re.M | re.I):
                log("handoff: claim(...) commit → pinging adversary")
                ping(h.get("claim_pings", "adversary"),
                     "watchdog ping: the Builder pushed a gate CLAIM (claim(...) commit). "
                     "Pull and verify the claimed gate now.")
            if re.search(r"^review", subjects, re.M | re.I):
                log("handoff: review(...) commit → pinging builder")
                ping(h.get("review_pings", "builder"),
                     "watchdog ping: the Adversary pushed a verdict/finding (review(...) commit). "
                     "Pull REVIEW and act.")
            _hand["sha"] = head

    inboxes = h.get("inboxes", [])
    watched_inboxes = (
        (inboxes[0] if len(inboxes) > 0 else None, "adv_inbox", "adversary"),
        (inboxes[1] if len(inboxes) > 1 else None, "builder_inbox", "builder"),
    )
    for fname, key, target in watched_inboxes:
        if not fname:
            continue
        content = _show_pushed(repo, fname)
        if not content:
            _hand[key] = ""  # file absent/consumed: forget its hash so a re-push pings again
            continue
        digest = hashlib.md5(content.encode()).hexdigest()
        if digest != _hand[key]:
            log(f"handoff: {fname} changed → pinging {target}")
            ping(target,
                 f"watchdog ping: the other loop pushed machine-docs/{fname} — pull, read it, "
                 f"act, then delete the file (commit + push) to mark it consumed.")
        _hand[key] = digest
# ── phase advance (loop machine, ported) ─────────────────────────────────────────
def loop_agents(cfg):
    """Return the enabled agents whose kind is "loop" (enabled defaults to True)."""
    selected = []
    for agent in cfg["agents"].values():
        if agent["kind"] == "loop" and agent.get("enabled", True):
            selected.append(agent)
    return selected
def stop_loops(cfg):
    """Kill the tmux session of every enabled loop agent that is currently alive."""
    for agent in loop_agents(cfg):
        session = agent["session"]
        if not session_alive(session):
            continue
        log(f"killing {session}")
        kill_session(session)
def start_loops(cfg):
    """Start every enabled loop agent (sessions already running are left as they are)."""
    for agent in loop_agents(cfg):
        start_agent(cfg, agent)
def phase_advance_check(cfg):
    """Heavy-tick check: advance to the next phase, or finish the sequence, when DONE.

    Returns True only when a transition or completion happens on THIS call (the
    caller skips healing for that tick). It returns False when there are no phases,
    auto_advance is off, the current phase is not done, or the sequence was already
    marked complete — in that last case nothing is re-logged and nothing is stopped
    again. If a phase is appended after completion, the current phase is no longer
    the last one, so this advances into it and removes the stale SEQUENCE-COMPLETE
    marker.
    """
    sequence = phases(cfg)
    if not sequence or not cfg["loop"].get("auto_advance", True):
        return False
    marker = Path(cfg["log_dir"]) / "SEQUENCE-COMPLETE"
    idx = cur_idx(cfg)
    current = sequence[idx]
    if not phase_done(cfg, current["status"]):
        return False

    following = idx + 1
    if following < len(sequence):
        log(f"PHASE {current['id']} DONE — auto-transitioning to {sequence[following]['id']}")
        stop_loops(cfg)
        Path(phase_idx_file(cfg)).write_text(str(following))
        if marker.exists():
            # resuming into a (freshly-appended) phase — clear stale completion
            marker.unlink()
        handoff_reset()
        start_loops(cfg)
        return True

    # The last phase is DONE: the whole sequence is complete.
    if marker.exists():
        return False  # already handled — idempotent (no re-log, no re-stop)
    log(f"PHASE SEQUENCE COMPLETE (last phase {current['id']} DONE) — stopping loops")
    stop_loops(cfg)
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    marker.write_text(f"cc-ci phase sequence complete {stamp}. Loops stopped; build finished.\n")

    on_complete = cfg["loop"].get("on_complete")
    if on_complete:
        trigger = Path(cfg["log_dir"]) / on_complete.get("trigger_file", ".run-upgrade-on-complete")
        if trigger.exists():
            # the trigger file is consumed so the task is launched once per request
            trigger.unlink()
            runname = on_complete.get("run", "upgrader")
            log(f"on_complete: launching task agent {runname!r}")
            if runname in cfg["agents"]:
                start_agent(cfg, cfg["agents"][runname], force=True)
    return True
# ── watchdog loop ────────────────────────────────────────────────────────────────
def watched(cfg):
    """Return the enabled agents whose ``watch`` setting is anything other than "none"."""
    result = []
    for agent in cfg["agents"].values():
        if not agent.get("enabled", True):
            continue
        if agent.get("watch", "none") == "none":
            continue
        result.append(agent)
    return result
def watchdog_loop(cfg_path):
    """Supervisor loop; never returns.

    Every ``signal_interval`` seconds: reload the config, run handoff signalling
    (when loop agents exist), limit/stall care for watched agents and scheduled
    wakes. Every ``heavy_interval`` seconds additionally: phase advance and healing.
    The two intervals are read once at startup.

    The wake timers are reconciled with the freshly loaded config on every tick.
    They used to be built once at startup, so removing an agent (or its ``wake``
    table) from the file crashed the loop with KeyError, and a wake schedule added
    later was never honoured.
    """
    cfg = load_config(cfg_path)
    sig = int(cfg["watchdog"].get("signal_interval", 30))
    heavy = int(cfg["watchdog"].get("heavy_interval", 300))
    ps = phases(cfg)
    log(f"watchdog up — phase={cur_phase(cfg).get('id','-')} [{cur_idx(cfg)+1}/{len(ps)}] "
        f"signal={sig}s heavy={heavy}s, watching: {[a['name'] for a in watched(cfg)]}")
    elapsed = heavy  # force a heavy check on first tick
    wake_elapsed = {}
    while True:
        cfg = load_config(cfg_path)  # re-read every tick: config is authoritative, no env drift
        # keep timers of agents that still have a wake schedule, start new ones at 0,
        # and drop timers of agents that were removed or lost their schedule
        wake_elapsed = {a["name"]: wake_elapsed.get(a["name"], 0)
                        for a in cfg["agents"].values() if a.get("wake")}
        has_loops = bool(loop_agents(cfg))
        seq_done = (Path(cfg["log_dir"]) / "SEQUENCE-COMPLETE").exists()
        if has_loops:
            handoff_check(cfg)
        # signal-tick limit + stall care for every watched agent
        for a in watched(cfg):
            if a["watch"] == "heal+stall":
                stall_check_one(cfg, a)
            else:
                # limit care only (low resume latency) for heal-only agents
                if session_alive(a["session"]):
                    limit_tick(cfg, a, capture_pane(a["session"], 40))
        # Scheduled wakes (e.g. the hourly orchestrator supervision ping). Suppressed once the
        # build SEQUENCE is COMPLETE — nothing left to supervise, so stay fully at rest. Resumes
        # automatically when new work is queued (that removes the SEQUENCE-COMPLETE marker).
        if not seq_done:
            for name, el in list(wake_elapsed.items()):
                agent = cfg["agents"][name]
                interval = int(agent["wake"].get("interval", 3600))
                if el >= interval:
                    # a False result keeps the timer running so the wake is retried next tick
                    if wake_agent(cfg, agent):
                        wake_elapsed[name] = 0
        if elapsed >= heavy:
            elapsed = 0
            advanced = phase_advance_check(cfg) if has_loops else False
            if not advanced:
                # When the build sequence is complete the loops are intentionally stopped —
                # keep healing the orchestrator (and any persistent agent) but do NOT restart
                # the finished loops. (seq_done computed at the top of the tick.)
                for a in watched(cfg):
                    if seq_done and a["kind"] == "loop":
                        continue
                    heal_one(cfg, a)
        time.sleep(sig)
        elapsed += sig
        for k in wake_elapsed:
            wake_elapsed[k] += sig
def loop_sessions_alive(cfg):
    """True when at least one enabled loop agent has a live tmux session."""
    for agent in loop_agents(cfg):
        if session_alive(agent["session"]):
            return True
    return False
# ── CLI commands ──────────────────────────────────────────────────────────────
def start_watchdog(cfg, cfg_path):
    """Start the watchdog in its own tmux session unless it is already running.

    The session runs this same script with the ``watchdog`` command and redirects
    its output to ``<log_dir>/<session>.log``. Paths are shell-quoted with
    shlex.quote; hand-written single quotes would break on a path containing a
    quote character.
    """
    session = cfg["session_prefix"] + "watchdog"
    if session_alive(session):
        log("watchdog already running")
        return
    log("starting watchdog")
    script = Path(__file__).resolve()
    out_file = shlex.quote(f"{cfg['log_dir']}/{session}.log")
    command = (f"exec >>{out_file} 2>&1; "
               f"python3 {shlex.quote(str(script))} watchdog --config {shlex.quote(str(cfg_path))}")
    new_session(session, cfg["plan_dir"], command, str(_session_log_path(cfg, session)))
def cmd_up(cfg, cfg_path, names):
    """``up [name…]``: start agents, services and the watchdog.

    Without names: every enabled agent, every service and the watchdog. With names:
    only the named agents (even if disabled), services and/or "watchdog". A name
    that matches nothing is now reported instead of being silently ignored.
    """
    idx_file = Path(phase_idx_file(cfg))
    if cfg["loop"].get("resume_phase") is False and not idx_file.exists():
        idx_file.write_text("0")

    if not names:
        for agent in cfg["agents"].values():
            if agent.get("enabled", True):
                start_agent(cfg, agent)
        for svc in cfg["services"].values():
            start_service(cfg, svc)
        start_watchdog(cfg, cfg_path)
        return

    # agents first (in the order given), then services / watchdog, as before
    for n in names:
        if n in cfg["agents"]:
            start_agent(cfg, cfg["agents"][n])
    for n in names:
        if n in cfg["services"]:
            start_service(cfg, cfg["services"][n])
        if n == "watchdog":
            start_watchdog(cfg, cfg_path)
    for n in names:
        if n not in cfg["agents"] and n not in cfg["services"] and n != "watchdog":
            log(f"up: unknown agent/service {n!r} — ignored")
def cmd_down(cfg, names):
    """``down [name…]``: kill the named sessions, or every agent, service and the watchdog.

    A name that is neither an agent, a service nor "watchdog" is now reported
    instead of being silently ignored.
    """
    watchdog_session = cfg["session_prefix"] + "watchdog"
    sessions = []
    if names:
        for n in names:
            if n in cfg["agents"]:
                sessions.append(cfg["agents"][n]["session"])
            elif n in cfg["services"]:
                sessions.append(cfg["services"][n]["session"])
            elif n == "watchdog":
                sessions.append(watchdog_session)
            else:
                log(f"down: unknown agent/service {n!r} — ignored")
    else:
        sessions = [a["session"] for a in cfg["agents"].values()]
        sessions += [s["session"] for s in cfg["services"].values()]
        sessions.append(watchdog_session)
    for s in sessions:
        if session_alive(s):
            log(f"killing {s}")
            kill_session(s)
def cmd_status(cfg):
    """``status``: print the current phase (if any) and one table row per agent/service."""
    idx, ps = cur_idx(cfg), phases(cfg)
    if ps:
        ph = ps[idx]
        if phase_done(cfg, ph["status"]):
            done = "## DONE"
        else:
            done = "in progress"
        print(f" phase: {ph['id']} [{idx+1}/{len(ps)}] plan={ph['plan']} ({done})")
    print(f" {'AGENT':<14} {'KIND':<11} {'BACKEND':<9} {'MODEL':<20} {'WATCH':<10} STATE")

    for a in cfg["agents"].values():
        running = session_alive(a["session"])
        st = "RUNNING" if running else "stopped"
        en = "" if a.get("enabled", True) else " (disabled)"
        # the pane's current command is only queried for live sessions
        rc = session_command(a["session"]) if running else ""
        row = (f" {a['name']:<14} {a['kind']:<11} {a['backend']:<9} "
               f"{role_model(cfg,a) or 'default':<20} {a.get('watch','none'):<10} {st}{en}")
        if rc:
            row += f" [{rc}]"
        print(row)

    for s in cfg["services"].values():
        st = "RUNNING" if session_alive(s["session"]) else "stopped"
        print(f" {s['name']:<14} {'service':<11} {'-':<9} {'-':<20} {'-':<10} {st}")

    wd = cfg["session_prefix"] + "watchdog"
    wd_state = "RUNNING" if session_alive(wd) else "stopped"
    print(f" {'watchdog':<14} {'service':<11} {'-':<9} {'-':<20} {'-':<10} "
          f"{wd_state}")
def cmd_phase(cfg, args):
    """``phase [set N|next|show]``: inspect or move the persisted loop phase index.

    * ``show`` (or no argument) prints the current phase and the whole sequence.
    * ``next`` moves one phase forward, stopping at the last phase.
    * ``set N`` stores N, which must be an integer in ``0 .. len(phases) - 1``.

    Invalid input now ends with a clear error via die() instead of a traceback
    (no phases configured, non-integer N) or a persisted out-of-range index that
    would later crash every ``phases[idx]`` lookup.
    """
    usage = "usage: agents.py phase [set N|next|show]"
    ps = phases(cfg)
    if not ps:
        die("no loop phases configured — nothing to show or move")
    if not args or args[0] == "show":
        idx = cur_idx(cfg)
        print(f"phase {ps[idx]['id']} [{idx+1}/{len(ps)}] seq: {' '.join(p['id'] for p in ps)}")
        return
    if args[0] == "next":
        target = min(cur_idx(cfg) + 1, len(ps) - 1)
    elif args[0] == "set" and len(args) > 1:
        try:
            target = int(args[1])
        except ValueError:
            die(f"phase set: {args[1]!r} is not an integer ({usage})")
        if not 0 <= target < len(ps):
            die(f"phase set: index {target} out of range (valid: 0..{len(ps) - 1})")
    else:
        die(usage)
    Path(phase_idx_file(cfg)).write_text(str(target))
    print(f"phase idx now {cur_idx(cfg)} ({cur_phase(cfg).get('id')})")
def cmd_migrate(cfg):
    """``migrate``: seed state/ from the legacy dotfiles so a cutover is seamless.

    Resume ids are seeded through resume_id(); a legacy ``.phase-idx`` is only
    reported, never copied.
    """
    legacy_phase = Path(cfg["log_dir"]) / ".phase-idx"
    new_phase = Path(phase_idx_file(cfg))
    if legacy_phase.exists() and not new_phase.exists():
        log(f"NOTE: legacy .phase-idx={legacy_phase.read_text().strip()} — set state/phase-idx "
            f"to the DE-DUPED index yourself (cf55: 10→9). Not auto-copied.")
    for name in ("orchestrator", "assistant"):
        agent = cfg["agents"].get(name)
        if not agent:
            continue
        rid = resume_id(cfg, agent)  # seeds state/<name>.id from legacy
        log(f"{name} resume id: {rid or '(none)'}")
    log("migrate done — review state/ dir")
def cmd_selftest(cfg):
    """``selftest``: run the opencode activity-detector regression checks.

    Prints PASS/FAIL per check and exits with status 1 if any check failed, else 0.
    """
    probe_agent = {"name": "x", "backend": "opencode", "session": "x", "kind": "loop"}
    idle = "\n ▣ Build · GPT-5.4 · 2m 19s\n 178.4K (17%) ctrl+p commands\n"
    active = "\n ~ Preparing patch...\n ⬝⬝⬝■■ esc interrupt 137.6K\n"
    limited = idle + "\nYou've hit your weekly limit · resets Jun 16, 10pm\n"

    def is_active(pane):
        # the log-mtime fallback is disabled so only the pane text is judged
        return pane_active(cfg, probe_agent, pane, use_log=False)

    checks = [
        ("opencode idle footer is idle", not is_active(idle)),
        ("opencode active footer is active", is_active(active)),
        ("limit banner+idle footer not active", not is_active(limited)),
    ]
    failures = 0
    for label, ok in checks:
        if not ok:
            failures += 1
    for label, ok in checks:
        print(f" {'PASS' if ok else 'FAIL'}: {label}")
    sys.exit(1 if failures else 0)
# ── main ──────────────────────────────────────────────────────────────────────
def main():
    """CLI entry point: resolve ``--config``, load the config and dispatch the command.

    With no command, ``status`` is run; an unknown command prints the module usage text.
    """
    raw_args = sys.argv[1:]
    cfg_path = _cfg_path(raw_args)

    # Drop "--config" and the value that follows it; what remains is command + arguments.
    positional = []
    for pos, token in enumerate(raw_args):
        if token == "--config":
            continue
        if pos > 0 and raw_args[pos - 1] == "--config":
            continue
        positional.append(token)

    cmd = positional[0] if positional else "status"
    rest = positional[1:]
    cfg = load_config(cfg_path)

    def show_logs():
        if not rest:
            die("usage: agents.py logs <name>")
        sess = cfg["agents"].get(rest[0], {}).get("session") or (cfg["session_prefix"] + rest[0])
        # replaces this process with tail -f on the session log
        os.execvp("tail", ["tail", "-f", str(_session_log_path(cfg, sess))])

    handlers = {
        "up": lambda: cmd_up(cfg, cfg_path, rest),
        "down": lambda: cmd_down(cfg, rest),
        "status": lambda: cmd_status(cfg),
        "watchdog": lambda: watchdog_loop(cfg_path),
        "phase": lambda: cmd_phase(cfg, rest),
        "migrate": lambda: cmd_migrate(cfg),
        "selftest": lambda: cmd_selftest(cfg),
        "logs": show_logs,
    }
    handler = handlers.get(cmd)
    if handler is None:
        print(__doc__)
    else:
        handler()


if __name__ == "__main__":
    main()