Files
cc-ci-orchestrator/cc-ci-plan/launch.py
autonomic-bot 5ea17fca21 watchdog: fix limit-probe self-match + scrollback dedupe wedge; plan(lvl5): badge shows level only
Night-watch findings (monthly-spend-limit window, ~01:49-04:45):
- probe text said 'usage limit' which matches LIMIT_RE, so a submitted probe
  kept limited_now true forever -> reworded to 'quota window' with a CAUTION
  note (nudge text must never match LIMIT_RE)
- dedupe scanned all 40 captured lines, so once a probe scrolled into the
  conversation no further probe ever fired (builder/adv frozen at nudges=1,
  orchestrator probes degraded to hourly riding the wake scroll) -> dedupe
  now only checks the bottom 8 lines (input area)
Core invariant HELD: zero kill+reboots during the limit window.

plan(lvl5): operator addition - the top-corner level badge (card, dashboard
pill, badge SVG) shows only the level number+color, zero capping info; the
inline per-rung table keeps intentional-skip/unverified detail.
2026-06-11 05:52:26 +00:00

892 lines
40 KiB
Python

#!/usr/bin/env python3
"""
cc-ci loop launcher — phase-aware Builder/Adversary loops + watchdog.
Usage:
launch.py start start loops + watchdog (resets to phase 0 unless RESUME_PHASE=1)
launch.py stop stop loops + watchdog
launch.py status show phase + session state
launch.py watchdog run the watchdog in the foreground (called by start_watchdog)
launch.py logs builder|adversary|watchdog tail a log
Env (all optional — defaults shown):
LOOP_BACKEND claude (default) | opencode
LOOP_MODEL model flag, e.g. "sonnet" (claude) or "tinfoil/deepseek-v4-pro" (opencode)
ADV_MODEL optional model override for the Adversary only (falls back to LOOP_MODEL)
RESUME_PHASE 1 = keep current phase index on start (default resets to 0)
CLAUDE_BIN claude
OPENCODE_BIN /home/loops/.local/bin/opencode
OPENCODE_SERVER http://127.0.0.1:4096
PLAN_DIR /srv/cc-ci/cc-ci-plan
BUILDER_DIR /srv/cc-ci/cc-ci
ADV_DIR /srv/cc-ci/cc-ci-adv
LOG_DIR /srv/cc-ci/.cc-ci-logs
PHASES_SPEC semicolon-separated "id|planfile|statusfile" entries
PHASE_IDX_FILE $LOG_DIR/.phase-idx
WATCH_INTERVAL 300 (seconds between heavy checks: phase DONE / heal sessions)
SIGNAL_INTERVAL 30 (seconds between handoff / stall checks)
ORCH_WAKE_INTERVAL 3600 (seconds between supervision wakes typed into the orchestrator session)
ORCH_WAKE_PROMPT $PLAN_DIR/ai-progress-monitor-prompt.txt (the supervision prompt)
STALL_IDLE 300 (idle seconds without a WAITING-UNTIL before reboot)
STALL_GRACE 180 (seconds past a WAITING-UNTIL before reboot)
"""
import hashlib, json, os, re, subprocess, sys, time
from datetime import datetime, timedelta, timezone
from pathlib import Path
# ── config ────────────────────────────────────────────────────────────────────
# Working directories; each one can be overridden by the environment variable of the same name.
PLAN_DIR = os.getenv("PLAN_DIR", "/srv/cc-ci/cc-ci-plan")
BUILDER_DIR = os.getenv("BUILDER_DIR", "/srv/cc-ci/cc-ci")
ADV_DIR = os.getenv("ADV_DIR", "/srv/cc-ci/cc-ci-adv")
LOG_DIR = os.getenv("LOG_DIR", "/srv/cc-ci/.cc-ci-logs")
# Settings that `start` persists under LOG_DIR. The watchdog runs in its own tmux session
# without the caller's environment, so when it restarts a dead session it falls back to
# these files to reuse the same backend / model / phase set.
_BACKEND_FILE, _PHASES_FILE, _MODEL_FILE, _ADV_MODEL_FILE = (
    os.path.join(LOG_DIR, leaf)
    for leaf in (".loop-backend", ".phases-spec", ".loop-model", ".loop-model-adv")
)
def _read_file_default(path, default):
    """Return the whitespace-stripped text of *path*.

    Falls back to *default* when the file does not exist or contains only whitespace.
    Other I/O errors propagate.
    """
    try:
        content = Path(path).read_text().strip()
    except FileNotFoundError:
        return default
    return content or default
# Each setting resolves as: environment variable (an empty value counts as unset), then
# the value `start` persisted under LOG_DIR, then the built-in default.
BACKEND = os.getenv("LOOP_BACKEND") or _read_file_default(_BACKEND_FILE, "claude")
LOOP_MODEL = os.getenv("LOOP_MODEL") or _read_file_default(_MODEL_FILE, "")
# Adversary-only model override (e.g. a different reviewer model than the Builder's).
# Empty means "use LOOP_MODEL"; role_model() applies that fallback.
ADV_MODEL = os.getenv("ADV_MODEL") or _read_file_default(_ADV_MODEL_FILE, "")
def role_model(role):
    """Return the model name to launch *role* with.

    The adversary uses ADV_MODEL when that is set; every other case uses LOOP_MODEL.
    LOOP_MODEL may itself be empty, which start_agent treats as "no --model flag".
    """
    if role == "adversary" and ADV_MODEL:
        return ADV_MODEL
    return LOOP_MODEL
# claude backend: expose each loop via --remote-control unless REMOTE_CONTROL is set to anything but "1".
REMOTE_CONTROL = os.getenv("REMOTE_CONTROL", "1") == "1"
CLAUDE_BIN = os.getenv("CLAUDE_BIN", "claude")
if os.getuid() != 0:
    # Non-root: skip permission prompts via the CLI flag unless CLAUDE_FLAGS overrides it.
    CLAUDE_FLAGS = os.getenv("CLAUDE_FLAGS", "--dangerously-skip-permissions")
else:
    # root: no default flag; the skip is requested through the environment instead.
    CLAUDE_FLAGS = os.getenv("CLAUDE_FLAGS", "")
    os.environ.setdefault("CLAUDE_DANGEROUSLY_SKIP_PERMISSIONS", "1")
OPENCODE_BIN = os.getenv("OPENCODE_BIN", "/home/loops/.local/bin/opencode")
OPENCODE_SERVER = os.getenv("OPENCODE_SERVER", "http://127.0.0.1:4096")
# Orchestrator supervision.
ORCH_SESSION = os.getenv("ORCH_SESSION", "cc-ci-orchestrator")
ORCH_LAUNCHER = os.getenv("ORCH_LAUNCHER", f"{PLAN_DIR}/launch-orchestrator.sh")
WATCH_ORCHESTRATOR = os.getenv("WATCH_ORCHESTRATOR", "1") == "1"
# Fixed tmux session names.
BUILDER_SESSION, ADV_SESSION, WATCHDOG_SESSION = "cc-ci-builder", "cc-ci-adv", "cc-ci-watchdog"
# Watchdog cadence, in seconds.
WATCH_INTERVAL = int(os.getenv("WATCH_INTERVAL", 300))
SIGNAL_INTERVAL = int(os.getenv("SIGNAL_INTERVAL", 30))
# Supervision wake: every ORCH_WAKE_INTERVAL seconds the watchdog types the contents of
# ORCH_WAKE_PROMPT into the orchestrator session so it reviews the loops and nudges them
# (this replaces the standalone ai-progress-monitor waker).
ORCH_WAKE_INTERVAL = int(os.getenv("ORCH_WAKE_INTERVAL", 3600))
ORCH_WAKE_PROMPT = os.getenv("ORCH_WAKE_PROMPT", f"{PLAN_DIR}/ai-progress-monitor-prompt.txt")
# Stall thresholds, in seconds.
STALL_IDLE = int(os.getenv("STALL_IDLE", 300))
STALL_GRACE = int(os.getenv("STALL_GRACE", 180))
# Built-in build sequence: one "id|planfile|statusfile" entry per phase, in run order.
_DEFAULT_PHASES_SPEC = ";".join([
    "1c|plan-phase1c-full-reproducibility.md|STATUS-1c.md",
    "1b|plan-phase1b-review-lint.md|STATUS-1b.md",
    "1d|plan-phase1d-generic-test-suite.md|STATUS-1d.md",
    "1e|plan-phase1e-harness-corrections.md|STATUS-1e.md",
    "2w|plan-phase2w-warm-canonical-quick.md|STATUS-2w.md",
    "2pc|plan-phase2pc-image-cache.md|STATUS-2pc.md",
    "2|plan-phase2-recipe-tests.md|STATUS-2.md",
    "2b|plan-phase2b-test-performance.md|STATUS-2b.md",
    "3|plan-phase3-results-ux.md|STATUS-3.md",
    "4|plan-phase4-final-review-polish-cleanup.md|STATUS-4.md",
    "5|plan-phase5-verify-upgrade-flow.md|STATUS-5.md",
])
# Env wins; else a persisted file written by `start` (so status/watchdog/reboot all agree on the
# current phase set); else the default build sequence above.
PHASES_SPEC = os.environ.get("PHASES_SPEC") or _read_file_default(_PHASES_FILE, _DEFAULT_PHASES_SPEC)
# One [id, planfile, statusfile] list per phase. Blank entries (from a trailing or doubled
# ";") are skipped, and whitespace around each field is trimmed. A hand-written spec such
# as "1c | plan.md | STATUS-1c.md;" therefore parses like its compact form. Otherwise it
# would produce a stray one-field phase, which surfaces later as an IndexError, or a
# filename with a leading space.
PHASES = [
    [field.strip() for field in entry.split("|")]
    for entry in PHASES_SPEC.split(";")
    if entry.strip()
]
PHASE_IDX_FILE = os.environ.get("PHASE_IDX_FILE", f"{LOG_DIR}/.phase-idx")
# Regex patterns for session-state detection (matched against captured pane text).
# Spinner glyphs, "esc to interrupt" and tool/build indicators: the agent is mid-turn.
ACTIVE_RE = re.compile(r"esc to interrupt|⠋|⠙|⠹|⠸|⠼|⠴|⠦|⠧|⠇|⠏|Running tool|▣|Build ·|· \d+")
# Usage/spend-limit banners. Anything the watchdog itself types into a pane must never match this.
LIMIT_RE = re.compile(r"spend limit|usage limit|limit reached|reached your .*limit|out of (credits|tokens)", flags=re.IGNORECASE)
# Session-state errors that make the watchdog kill the session and start it fresh.
FATAL_RE = re.compile(r"redacted_thinking|blocks cannot be modified|cannot be modified", flags=re.IGNORECASE)
# Extra liveness hints consulted only for the opencode backend's stall detection.
RECENT_ACTIVITY_RE = re.compile(r"thinking|inferring|running tool|remote control (active|connecting)|tool call|schedulewake?up", flags=re.IGNORECASE)
# opencode stall thresholds, in seconds.
OPENCODE_STALL_IDLE = int(os.getenv("OPENCODE_STALL_IDLE", 900))
OPENCODE_LOG_GRACE = int(os.getenv("OPENCODE_LOG_GRACE", 180))
# ── logging ───────────────────────────────────────────────────────────────────
def log(msg):
    """Print *msg* prefixed with "[launch HH:MM:SS]" (local time) and flush right away.

    The flush matters because the watchdog's stdout is redirected into watchdog.log.
    """
    print(f"[launch {datetime.now():%H:%M:%S}] {msg}", flush=True)


def die(msg):
    """Log *msg* as an ERROR line and exit the process with status 1."""
    log(f"ERROR: {msg}")
    sys.exit(1)
# ── tmux helpers ──────────────────────────────────────────────────────────────
def session_alive(name):
    """Return True when `tmux has-session` succeeds for session *name*."""
    probe = subprocess.run(["tmux", "has-session", "-t", name], capture_output=True)
    return probe.returncode == 0
def kill_session(name):
    """Kill tmux session *name*; tmux's exit status and output are ignored."""
    argv = ["tmux", "kill-session", "-t", name]
    subprocess.run(argv, capture_output=True)
def capture_pane(name, lines=40):
    """Return the last *lines* lines of `tmux capture-pane -p` output for session *name*.

    Returns "" when the capture command fails (for example, the session is gone).
    """
    result = subprocess.run(["tmux", "capture-pane", "-pt", name], capture_output=True, text=True)
    if result.returncode != 0:
        return ""
    tail = result.stdout.splitlines()[-lines:]
    return "\n".join(tail)
def _session_log_path(session):
    """Path of the pipe-pane transcript for *session*: LOG_DIR/<session>.log."""
    return Path(LOG_DIR, f"{session}.log")


def _log_recently_touched(session, age_seconds):
    """Return True if *session*'s transcript was modified at most *age_seconds* ago.

    Returns False when the transcript does not exist.
    """
    try:
        modified = _session_log_path(session).stat().st_mtime
    except FileNotFoundError:
        return False
    return time.time() - modified <= age_seconds
def _last_nonempty_line(text):
    """Return the last non-blank line of *text*, stripped; "" if every line is blank."""
    stripped_from_bottom = (line.strip() for line in reversed(text.splitlines()))
    return next((line for line in stripped_from_bottom if line), "")
def pipe_to_log(session, log_path):
    """Append *session*'s pane output to *log_path* with `tmux pipe-pane -o ... 'cat >> path'`.

    tmux runs the pipe command through a shell. The path is therefore shell-quoted with
    shlex.quote instead of being wrapped in literal single quotes, which broke for paths
    that contain a quote character. Ordinary paths produce the same effective command.
    """
    import shlex
    pipe_cmd = f"cat >> {shlex.quote(str(log_path))}"
    subprocess.run(["tmux", "pipe-pane", "-o", "-t", session, pipe_cmd])
def ping_session(session, msg, submit_key="Enter"):
    """Type *msg* into tmux *session* as literal text, then submit it.

    submit_key: "Enter" for claude; "C-m" (Ctrl+M, i.e. Enter) for opencode.
    After typing, the submit key is pressed up to 10 times, one second apart. It stops as
    soon as the first 28 characters of *msg* are no longer among the LAST 20 captured pane
    lines. A window wider than the bottom input rows is used because opencode renders its
    input in the content area.
    NOTE(review): if a submitted message is echoed into the conversation within those 20
    lines, the prefix stays visible and all 10 presses are spent. Confirm that this is
    harmless for both TUIs.
    Does nothing when the session does not exist.
    """
    if not session_alive(session):
        return
    marker = msg[:28]
    subprocess.run(["tmux", "send-keys", "-t", session, "-l", "--", msg], capture_output=True)
    time.sleep(0.5)
    attempts_left = 10
    while attempts_left:
        attempts_left -= 1
        subprocess.run(["tmux", "send-keys", "-t", session, submit_key], capture_output=True)
        time.sleep(1)
        if marker not in capture_pane(session, 20):
            break  # our text has left the captured window: treat as accepted
# ── phase helpers ─────────────────────────────────────────────────────────────
def cur_idx():
    """Return the current phase index stored in PHASE_IDX_FILE.

    Returns 0 when the file is missing or its stripped contents are not all digits.
    """
    try:
        raw = Path(PHASE_IDX_FILE).read_text()
    except FileNotFoundError:
        return 0
    raw = raw.strip()
    return int(raw) if raw.isdigit() else 0
def phase_id(idx):
    """Short id of phase *idx* (first spec field, e.g. "1c")."""
    entry = PHASES[idx]
    return entry[0]


def phase_plan(idx):
    """Plan filename of phase *idx* (second spec field, relative to PLAN_DIR)."""
    entry = PHASES[idx]
    return entry[1]


def phase_status(idx):
    """STATUS filename of phase *idx* (third spec field, looked up in the Builder repo)."""
    entry = PHASES[idx]
    return entry[2]


def all_ids():
    """Space-separated ids of every configured phase, in run order."""
    return " ".join(pid for pid, *_rest in PHASES)
def resolve_state(repo_dir, basename):
    """Return the path to a loop-state file — machine-docs/ if present, else repo root."""
    p = Path(repo_dir) / "machine-docs" / basename
    return p if p.exists() else Path(repo_dir) / basename


def phase_done(status_basename):
    """Return True once the phase STATUS file in the Builder repo has a line starting "## DONE".

    A missing STATUS file counts as not done. The file is opened in a `with` block, so the
    handle is closed deterministically even when any() stops at the first DONE line.
    """
    path = resolve_state(BUILDER_DIR, status_basename)
    try:
        with path.open() as fh:
            return any(line.startswith("## DONE") for line in fh)
    except FileNotFoundError:
        return False
# ── kickoff prompt ────────────────────────────────────────────────────────────
def build_kickoff(role, idx):
    """Return the kickoff prompt for *role* ("builder"/"adversary") at phase *idx*.

    The prompt is a phase-specific preamble followed by PLAN_DIR/prompts/<role>.md. The
    plan paths in the preamble are built from PLAN_DIR, the same directory the role
    prompt is read from, so a non-default PLAN_DIR points the agent at the right files.
    With the default PLAN_DIR the text is unchanged.
    """
    pid, plan, status = phase_id(idx), phase_plan(idx), phase_status(idx)
    preamble = (
        f"*** cc-ci SUB-PHASE {pid} ***\n"
        f"SINGLE SOURCE OF TRUTH for THIS phase: {PLAN_DIR}/{plan} — read it in full "
        f"now; it defines this phase's mission and Definition of Done.\n"
        f"The general loop protocol still applies and lives in {PLAN_DIR}/plan.md "
        f"(§6.1 coordination, §7 pacing, §9 guardrails) — read those sections too.\n"
        f"Track loop state in PHASE-NAMESPACED files in your repo clone: {status}, "
        f"BACKLOG-{pid}.md, REVIEW-{pid}.md, JOURNAL-{pid}.md. DECISIONS.md is shared (append).\n"
        f'"Done" for this phase = the Builder writes "## DONE" to {status} ONLY after every '
        f"Definition-of-Done item is Adversary-verified with a fresh PASS in REVIEW-{pid}.md "
        f"(handshake per §6.1).\n"
        f"The repo's Phase-1 STATUS.md / BACKLOG.md / REVIEW.md are HISTORY from the completed "
        f"Phase 1 — do NOT use them as your state; use the phase-namespaced files above.\n"
        f'Wherever the standing rules below say "plan.md"/"STATUS.md"/"BACKLOG.md"/"REVIEW.md", '
        f"substitute the phase plan and these phase-namespaced files.\n\n"
        f"=== standing role & rules ===\n"
    )
    role_prompt = (Path(PLAN_DIR) / "prompts" / f"{role}.md").read_text()
    return preamble + role_prompt
# ── agent launch ──────────────────────────────────────────────────────────────
def start_agent(role, session, workdir):
    """Launch *role*'s agent for the current phase in a new detached tmux *session*.

    Writes the kickoff prompt to LOG_DIR/.kickoff-<session>.txt and pipes the pane to
    LOG_DIR/<session>.log. With claude, the kickoff is passed as the CLI's prompt argument.
    With opencode, the TUI attaches to OPENCODE_SERVER and, after a fixed 12 s wait, is
    told to read the kickoff file. A session that is already running is left untouched.
    NOTE(review): for opencode the role's model is only logged; the attach command does
    not pass it.
    """
    if session_alive(session):
        log(f"{session} already running — leaving it")
        return
    for directory in (workdir, LOG_DIR):
        Path(directory).mkdir(parents=True, exist_ok=True)
    idx = cur_idx()
    pid = phase_id(idx)
    plan = phase_plan(idx)
    kickoff_file = Path(LOG_DIR) / f".kickoff-{session}.txt"
    kickoff_file.write_text(build_kickoff(role, idx))
    model = role_model(role)
    shown_model = model or 'default'
    if BACKEND == "claude":
        session_cwd = workdir
        rc_flag = f"--remote-control '{session}'" if REMOTE_CONTROL else ""
        model_flag = f"--model '{model}'" if model else ""
        cmd = f"{CLAUDE_BIN} {rc_flag} {model_flag} {CLAUDE_FLAGS} \"$(cat '{kickoff_file}')\""
        log(f"starting {session} (backend=claude, phase={pid}, plan={plan}, model={shown_model})")
    elif BACKEND == "opencode":
        # Attach each TUI to the shared opencode web server so sessions are recorded the same
        # way as browser-created sessions, including a populated `path` in the DB.
        # The visible project root is pinned with --dir, while the kickoff instructions use
        # absolute repo paths for builder/adversary work.
        session_cwd = "/srv/cc-ci-orch/cc-ci"
        cmd = (
            f"set -a; . /srv/cc-ci/.testenv; set +a; "
            f"NO_COLOR=1 {OPENCODE_BIN} attach {OPENCODE_SERVER} --dir {session_cwd}"
        )
        log(f"starting {session} (backend=opencode, phase={pid}, model={shown_model})")
        log(f" visible at http://oc.commoninternet.net (tailnet only)")
    else:
        die(f"unknown BACKEND '{BACKEND}' — set LOOP_BACKEND=claude or LOOP_BACKEND=opencode")
    subprocess.run(["tmux", "new-session", "-d", "-s", session, "-c", session_cwd, cmd])
    pipe_to_log(session, f"{LOG_DIR}/{session}.log")
    if BACKEND != "opencode":
        return
    # opencode: give the TUI time to connect to the server, then type a short pointer to the
    # kickoff file (keeping the typed text small) and submit it with C-m.
    time.sleep(12)
    bootstrap = (
        f"Your full kickoff prompt is in {kickoff_file} — read it now with: "
        f"`cat '{kickoff_file}'` — then follow its instructions exactly."
    )
    ping_session(session, bootstrap, submit_key="C-m")
def start_loops():
    """Start the Builder, then the Adversary; start_agent skips any already running."""
    for role, session, workdir in (
        ("builder", BUILDER_SESSION, BUILDER_DIR),
        ("adversary", ADV_SESSION, ADV_DIR),
    ):
        start_agent(role, session, workdir)
def stop_loops():
    """Kill the Builder and Adversary tmux sessions, skipping any that are not running."""
    for name in (BUILDER_SESSION, ADV_SESSION):
        if not session_alive(name):
            continue
        log(f"killing {name}")
        kill_session(name)
# ── usage-limit handling ──────────────────────────────────────────────────────
#
# When a session hits the usage/spend limit it prints a banner ("…limit reached ∙
# resets 3am") and sits idle. The old behaviour either blind-nudged every heavy tick
# (claude) or let the generic idle-stall reboot it — losing the banner, re-hitting
# the limit fresh, and churning. The state machine below instead:
# 1. parses the reset time from the banner and arms a quiet window until reset+45s
# (parse failure → flat 5-minute probe loop, never exponential backoff);
# 2. suppresses ALL kill/reboot healing while the window is armed — a limit-stalled
# session is NEVER rebooted;
# 3. at the window's end sends ONE nudge as a self-verifying probe: if the limit
# really lifted the session resumes (spinner clears the state next tick); if
# Claude Code re-prints the banner we re-parse and re-arm.
# State is persisted per session in LOG_DIR so a watchdog restart keeps the window.
# Flat probe cadence (seconds), used whenever no usable reset time can be parsed.
LIMIT_PROBE_FALLBACK = int(os.getenv("LIMIT_PROBE_FALLBACK", 300))
# Seconds to wait past a parsed reset time before probing.
LIMIT_RESET_SLACK = int(os.getenv("LIMIT_RESET_SLACK", 45))
# "resets 3am" / "reset at 10:30 pm" / "resets 14:00": hour, optional :minute, optional am|pm.
RESET_RE = re.compile(r"resets?\s*(?:at\s*)?(\d{1,2})(?::(\d{2}))?\s*(am|pm)?", flags=re.IGNORECASE)
def _limit_state_path(session):
    """Per-session limit-window state file: LOG_DIR/.limited-<session>."""
    return Path(LOG_DIR) / f".limited-{session}"


def _load_limit_state(session):
    """Return *session*'s persisted limit state, or None when there is none usable.

    limit_tick stores a JSON object with "until" and "nudges" keys and immediately calls
    .get() on what this returns. A payload that is valid JSON but not an object is
    therefore rejected too. Only the expected failures are absorbed: a missing or
    unreadable file (OSError) and corrupt JSON or a bad encoding (ValueError). Programming
    errors are not masked by a blanket `except Exception`.
    """
    try:
        state = json.loads(_limit_state_path(session).read_text())
    except (OSError, ValueError):
        return None
    return state if isinstance(state, dict) else None


def _save_limit_state(session, state):
    """Persist *state* (JSON-serialisable) for *session*, replacing any previous state."""
    _limit_state_path(session).write_text(json.dumps(state))


def _clear_limit_state(session):
    """Delete *session*'s limit state; a no-op when none is stored."""
    _limit_state_path(session).unlink(missing_ok=True)
def _parse_reset_epoch(pane):
    """Return the epoch of the reset time stated by the last RESET_RE match in *pane*.

    The last match is used because it is the freshest banner. The time of day is
    interpreted in local time and moved to tomorrow if it is not later than now.
    Returns None when nothing matches or the hour/minute are out of range.
    """
    last = None
    for last in RESET_RE.finditer(pane):
        pass
    if last is None:
        return None
    try:
        hour = int(last.group(1))
        minute = int(last.group(2) or 0)
        meridiem = (last.group(3) or "").lower()
        if meridiem == "pm":
            hour = hour if hour == 12 else hour + 12
        elif meridiem == "am" and hour == 12:
            hour = 0
        if hour > 23 or minute > 59:
            return None
        target = datetime.now().replace(hour=hour, minute=minute, second=0, microsecond=0)
        if target.timestamp() <= time.time():
            target += timedelta(days=1)
        return target.timestamp()
    except Exception:
        return None
def _next_limit_until(pane, now):
    """Decide when to probe next; return (epoch, parsed_flag).

    If the banner's reset time parses and lies at most 6 h after *now*, the result is
    (reset + LIMIT_RESET_SLACK, True). Otherwise it is (now + LIMIT_PROBE_FALLBACK, False).
    The 6 h cut-off exists because time-of-day-only banners come from the 5-hour limit
    window. A parse that lands further out means the stated time already passed (a stale
    banner rolled to tomorrow), so probing soon beats waiting most of a day. Weekly
    limits print a date, which does not parse, and so fall into the flat probe loop by
    design.
    """
    reset_at = _parse_reset_epoch(pane)
    fresh = reset_at is not None and reset_at - now <= 6 * 3600
    if not fresh:
        return now + LIMIT_PROBE_FALLBACK, False
    return reset_at + LIMIT_RESET_SLACK, True
def _limit_nudge_msg(role):
    """Probe text typed into a limit-stalled session: the orchestrator wording for
    role == "orchestrator", the loop wording for anything else."""
    # CAUTION: this text lands in the pane that limit_tick later greps. It must NEVER
    # match LIMIT_RE (found 2026-06-11: "usage limit has reset" matched, making the
    # window self-sustaining once the probe scrolled into the conversation).
    # Both variants share their first 28 characters, which limit_tick's dedupe keys on.
    orchestrator_text = (
        "watchdog probe: if the quota window has reset, RESUME now — "
        "you are the cc-ci orchestrator; re-check loop status and continue "
        "supervising from where you stopped."
    )
    loop_text = (
        "watchdog probe: if the quota window has reset, RESUME your loop now — "
        "pull latest, re-read your phase STATUS/REVIEW files, and continue from "
        "where you stopped; re-arm your loop pacing."
    )
    return orchestrator_text if role == "orchestrator" else loop_text
def limit_tick(role, session, pane):
    """Unified limit state machine for both backends and the orchestrator.
    Returns True while the session is inside a limit window — callers must then
    suppress idle accounting, reboots, and other healing.

    Args:
        role: label used in log lines; "orchestrator" selects the orchestrator probe text.
        session: tmux session name; also keys the state file LOG_DIR/.limited-<session>.
        pane: captured pane text to classify.

    States (persisted through _load/_save/_clear_limit_state):
      * Unarmed: arms a window only when LIMIT_RE matches and no ACTIVE_RE spinner shows.
        "until" is the parsed reset + slack, or now + LIMIT_PROBE_FALLBACK.
      * Armed: a spinner or a vanished banner clears the state (returns False).
        Before "until" it stays quiet. Once "until" passes it types ONE probe (unless the
        probe already sits in the bottom 8 lines), waits 3 s and re-captures 40 lines.
        It then either clears (spinner and no banner) or re-arms from the fresh capture
        with the nudge count incremented.
    A probe tick blocks for ping_session's submit retries plus the 3 s re-check.
    NOTE(review): a banner still in scrollback after a session recovered and went idle
    (no spinner) arms a window again here. The probe that follows should clear it;
    confirm that this is acceptable.
    """
    state = _load_limit_state(session)
    limited_now = bool(LIMIT_RE.search(pane))
    if state is None:
        # Not armed yet: only a visible banner on an idle pane arms a window.
        if not limited_now or ACTIVE_RE.search(pane):
            return False
        now = time.time()
        until, parsed = _next_limit_until(pane, now)
        if parsed:
            log(f"limit hit on {role} ({session}) — banner says reset "
                f"{datetime.fromtimestamp(until):%a %H:%M}; holding until then (no reboots)")
        else:
            log(f"limit hit on {role} ({session}) — reset time unparsable; flat "
                f"{LIMIT_PROBE_FALLBACK // 60}-min probe loop (no reboots)")
        _save_limit_state(session, {"until": until, "nudges": 0})
        return True
    # Armed window: a spinner or a vanished banner means the limit lifted.
    if ACTIVE_RE.search(pane) or not limited_now:
        log(f"limit lifted on {role} ({session}) — clearing limit state")
        _clear_limit_state(session)
        return False
    now = time.time()
    if now < state.get("until", 0):
        return True  # quietly inside the window
    # Window elapsed, banner still showing → probe. Dedupe: never stack a second probe
    # while our own text sits typed-but-unsubmitted in the INPUT area (bottom lines only —
    # checking the whole pane wedged the machine once a submitted probe scrolled into the
    # conversation above, suppressing all further probes; found 2026-06-11).
    msg = _limit_nudge_msg(role)
    if msg[:28] in "\n".join(pane.splitlines()[-8:]):
        return True
    nudges = state.get("nudges", 0) + 1
    log(f"limit probe #{nudges} on {role} ({session}) — nudging to resume")
    ping_session(session, msg, submit_key=_SUBMIT)
    # Self-verify: re-read the pane. A spinner with no banner = resumed; a re-printed
    # banner = still limited → re-arm from the fresh banner (else flat fallback).
    time.sleep(3)
    pane2 = capture_pane(session, 40)
    if ACTIVE_RE.search(pane2) and not LIMIT_RE.search(pane2):
        log(f"limit lifted on {role} ({session}) — probe resumed it")
        _clear_limit_state(session)
        return False
    # `now` was taken before the probe, so the re-armed deadline is measured from then.
    until, _ = _next_limit_until(pane2, now)
    if nudges == 3:
        # One-off escalation log; probing continues at the same cadence afterwards.
        log(f"WARNING: {role} ({session}) still limited after {nudges} probes — "
            f"continuing flat {LIMIT_PROBE_FALLBACK // 60}-min probes; never rebooting")
    _save_limit_state(session, {"until": until, "nudges": nudges})
    return True
# ── session healing ───────────────────────────────────────────────────────────
def heal_session(role, session, workdir):
    """Restart a dead session; kill+restart a FATAL-wedged one; never touch a limit-stalled one.

    The limit state machine is fed the same 40-line pane that stall_check_one uses on
    every signal tick. With a shorter 25-line capture, a banner sitting 26-40 lines up
    looked "lifted" here and present again on the next signal tick. Each heavy tick then
    cleared the window the signal tick had just re-armed, which could keep the
    fallback-cadence probe from ever firing. Spinner and FATAL detection keep their
    25-line window, so their sensitivity is unchanged.
    """
    if not session_alive(session):
        log(f"{role} ({session}) gone — restarting (phase {phase_id(cur_idx())})")
        start_agent(role, session, workdir)
        return
    pane = capture_pane(session, 40)
    recent = "\n".join(pane.splitlines()[-25:])
    if ACTIVE_RE.search(recent):
        return  # actively working — leave it alone
    if limit_tick(role, session, pane):
        return  # inside a usage-limit window — limit_tick owns nudging; NEVER kill
    if FATAL_RE.search(recent):
        log(f"FATAL session-state error on {role} ({session}) — kill + restart fresh")
        kill_session(session)
        start_agent(role, session, workdir)
# ── stall detection ───────────────────────────────────────────────────────────
# Per-session time at which a session was first seen idle; 0.0 (or absent) = not idle.
_idle_since: dict[str, float] = {}


def _parse_waiting_until(pane):
    """Return the epoch of the session's WAITING-UNTIL marker, or None.

    opencode: only a marker on the last non-empty line counts.
    claude: the LAST marker anywhere in *pane* counts. Pane text runs top to bottom,
    oldest first, so that is the freshest declaration. Using the first match let an
    older, already-expired marker still in scrollback trigger a "past its WAITING-UNTIL"
    reboot of a session that had since declared a later wait.
    The timestamp is ISO-8601; a trailing "Z" is accepted as UTC and a value without an
    offset is read as local time. Unparsable or out-of-range values give None.
    """
    if BACKEND == "opencode":
        line = _last_nonempty_line(pane)
        if not line.startswith("WAITING-UNTIL:"):
            return None
        stamps = re.findall(r"WAITING-UNTIL:\s*(\S+)", line)
    else:
        stamps = re.findall(r"WAITING-UNTIL:\s*(\S+)", pane)
    if not stamps:
        return None
    try:
        return datetime.fromisoformat(stamps[-1].replace("Z", "+00:00")).timestamp()
    except (ValueError, OverflowError, OSError):
        return None
def stall_check_one(role, session, workdir):
    """Signal-tick stall check for one loop session; kill + reboot it when stalled.

    Args:
        role: "builder" or "adversary" (for logs and start_agent).
        session: tmux session name.
        workdir: repo directory passed to start_agent on reboot.

    Checks, in order:
      1. session gone    -> reset idle tracking and drop limit state, then return
                            (heal_session restarts dead sessions on heavy ticks);
      2. limit window    -> limit_tick True: reset idle tracking, never reboot;
      3. visibly active  -> reset idle tracking; for opencode, RECENT_ACTIVITY_RE text or a
                            transcript written within OPENCODE_LOG_GRACE also counts;
      4. WAITING-UNTIL   -> reboot only once more than STALL_GRACE seconds past it;
      5. otherwise       -> reboot after STALL_IDLE (claude) / OPENCODE_STALL_IDLE
                            (opencode) seconds of continuous idleness.
    Idle time is measured from the first tick that found the session idle.
    """
    if not session_alive(session):
        _idle_since[session] = 0.0
        _clear_limit_state(session)
        return
    now = time.time()
    pane = capture_pane(session, 40)
    # Limit windows (both backends): suppress idle accounting and reboots entirely —
    # a limit-stalled session must never reach the kill+reboot below.
    if limit_tick(role, session, pane):
        _idle_since[session] = 0.0
        return
    if ACTIVE_RE.search(pane) or (BACKEND == "opencode" and (
        RECENT_ACTIVITY_RE.search(pane) or _log_recently_touched(session, OPENCODE_LOG_GRACE)
    )):
        _idle_since[session] = 0.0
        return
    # Missing or 0.0 means this is the first idle observation: the idle clock starts now.
    since = _idle_since.get(session) or now
    _idle_since[session] = since
    idle = now - since
    until = _parse_waiting_until(pane)
    if until is not None:
        # Declared wait: only reboot once STALL_GRACE seconds past the stated time.
        # Never reboot before — that races with the healthy self-wake.
        if now <= until + STALL_GRACE:
            return
        reason = f"past its WAITING-UNTIL by {int(now - until)}s — self-wake did not fire"
    else:
        stall_idle = OPENCODE_STALL_IDLE if BACKEND == "opencode" else STALL_IDLE
        if idle < stall_idle:
            return
        reason = f"idle {int(idle)}s with no WAITING-UNTIL marker"
    log(f"stall: {role} ({session}) {reason} — kill + reboot")
    kill_session(session)
    start_agent(role, session, workdir)
    _idle_since[session] = 0.0
def stall_check():
    """Run stall_check_one for the Builder, then for the Adversary."""
    for role, session, workdir in (
        ("builder", BUILDER_SESSION, BUILDER_DIR),
        ("adversary", ADV_SESSION, ADV_DIR),
    ):
        stall_check_one(role, session, workdir)
# ── orchestrator healing ──────────────────────────────────────────────────────
def orchestrator_alive():
    """
    True if an orchestrator process is running anywhere.
    Conflict-safety: never launch a second orchestrator resuming the same session
    (double-resume causes "thinking blocks cannot be modified" crashes).

    Any process named exactly "claude" counts, unless its command line carries
    --remote-control for cc-ci-builder / cc-ci-adv / cc-ci-upgrader. Failing that, a
    live ORCH_SESSION tmux session counts too.
    pgrep runs without a shell. A missing pgrep binary is treated as "no matches". A
    process that exits (or is unreadable) between pgrep and the /proc read is skipped.
    NOTE(review): loop sessions started with REMOTE_CONTROL off carry no such flag, so
    they are counted as an orchestrator here. Confirm that this is intended.
    """
    try:
        listing = subprocess.run(["pgrep", "-x", "claude"], capture_output=True, text=True).stdout
    except FileNotFoundError:  # no pgrep binary: no candidates
        listing = ""
    for pid in (line.strip() for line in listing.splitlines()):
        if not pid:
            continue
        try:
            raw = Path(f"/proc/{pid}/cmdline").read_bytes()
        except OSError:  # process already gone or not readable
            continue
        cmdline = raw.decode(errors="replace").replace("\0", " ")
        # Skip the loop sessions and the upgrader — they're not the orchestrator.
        if re.search(r"--remote-control\s+'?cc-ci-(builder|adv|upgrader)'?", cmdline):
            continue
        return True
    return session_alive(ORCH_SESSION)
def heal_orchestrator():
    """Heavy-tick care for the orchestrator.

    Does nothing unless WATCH_ORCHESTRATOR is on and ORCH_LAUNCHER exists. If no
    orchestrator is running, it is started via `ORCH_LAUNCHER start`. If one is running
    inside ORCH_SESSION, it is left alone while busy or inside a limit window, and is
    killed and restarted on a FATAL session-state error.

    limit_tick receives the same 40-line pane that orch_limit_check uses on signal
    ticks. A shorter capture here would clear a window whose banner sits 26-40 lines up,
    only for the next signal tick to re-arm it; that flapping could starve the probe.
    Spinner and FATAL checks keep their 25-line window.
    """
    if not WATCH_ORCHESTRATOR or not Path(ORCH_LAUNCHER).is_file():
        return
    if not orchestrator_alive():
        log(f"orchestrator not running — restarting via {ORCH_LAUNCHER}")
        subprocess.run([ORCH_LAUNCHER, "start"], capture_output=True)
        return
    if not session_alive(ORCH_SESSION):
        return  # running, but not in our tmux session — nothing to inspect
    pane = capture_pane(ORCH_SESSION, 40)
    recent = "\n".join(pane.splitlines()[-25:])
    if ACTIVE_RE.search(recent):
        return
    if limit_tick("orchestrator", ORCH_SESSION, pane):
        return  # limit window — never kill/restart the orchestrator over a limit
    if FATAL_RE.search(recent):
        log(f"FATAL session-state error on orchestrator ({ORCH_SESSION}) — kill + restart")
        kill_session(ORCH_SESSION)
        subprocess.run([ORCH_LAUNCHER, "start"], capture_output=True)
def wake_orchestrator():
    """Hourly supervision nudge: type the progress-monitor prompt into the orchestrator
    session so it reviews the loops. Returns True when the wake was delivered (or is moot),
    False when it should be retried on a later tick.
    Skips (retry later) if the orchestrator is absent — heal_orchestrator restarts it — or
    actively working, so we never interrupt a turn; the wake lands the moment it goes idle.

    The prompt file's whitespace is collapsed to single spaces before typing. Like the
    limit probe, this text ends up in the pane that limit_tick scans. If it matches
    LIMIT_RE it can be mistaken for a limit banner, so a WARNING is logged; the wake is
    still sent.
    """
    if not WATCH_ORCHESTRATOR:
        return True  # feature off — treat as handled so the timer doesn't spin
    if not session_alive(ORCH_SESSION):
        return False
    if _load_limit_state(ORCH_SESSION) is not None:
        # Deliberately wake anyway: the hourly prompt doubles as a fallback that keeps
        # the orchestrator on track even if the limit-state machinery breaks. If the
        # limit is genuinely still in force the wake is harmless (banner re-prints).
        log(f"orchestrator ({ORCH_SESSION}) is in a limit window — sending hourly wake anyway (fallback)")
    if ACTIVE_RE.search(capture_pane(ORCH_SESSION, 25)):
        return False  # busy — don't interrupt; retry when idle
    try:
        msg = " ".join(Path(ORCH_WAKE_PROMPT).read_text().split())
    except FileNotFoundError:
        log(f"orchestrator wake skipped — prompt file missing: {ORCH_WAKE_PROMPT}")
        return True
    if not msg:
        return True
    if LIMIT_RE.search(msg):
        log(f"WARNING: wake prompt {ORCH_WAKE_PROMPT} matches LIMIT_RE — once typed into the "
            f"pane it can read as a limit banner; reword it")
    log(f"waking orchestrator ({ORCH_SESSION}) for scheduled supervision pass")
    ping_session(ORCH_SESSION, msg, submit_key=_SUBMIT)
    return True
def orch_limit_check():
    """Signal-tick limit care for the orchestrator.

    heal_orchestrator only runs on heavy ticks. Running limit_tick here on a 40-line
    capture every tick keeps the orchestrator's limit-resume latency at or below
    SIGNAL_INTERVAL, the same treatment the loops get.
    """
    if WATCH_ORCHESTRATOR and session_alive(ORCH_SESSION):
        limit_tick("orchestrator", ORCH_SESSION, capture_pane(ORCH_SESSION, 40))
# ── handoff signalling ────────────────────────────────────────────────────────
# Handoff memory: the last origin/main SHA seen, and md5 digests of the last-seen
# inbox files ("" = nothing recorded / file absent).
_last_sha = ""
_adv_inbox_seen = ""
_builder_inbox_seen = ""


def handoff_reset():
    """Forget all handoff memory.

    After this, the next handoff_check re-baselines origin/main silently and pings again
    for any inbox file that is still present.
    """
    global _last_sha, _adv_inbox_seen, _builder_inbox_seen
    _last_sha = ""
    _adv_inbox_seen = ""
    _builder_inbox_seen = ""
def _fetch_origin():
    """Run `git fetch -q origin` in the Builder clone; failures are ignored.

    On failure, the next rev-parse simply sees the previous origin/main. git runs from an
    argument list rather than a shell string, so BUILDER_DIR needs no quoting. A missing
    git binary is treated like a failed fetch.
    """
    try:
        subprocess.run(["git", "-C", BUILDER_DIR, "fetch", "-q", "origin"], capture_output=True)
    except FileNotFoundError:
        pass
def _show_pushed(path):
    """Read a file from origin/main (machine-docs/ first, then repo root).

    Returns the file's text, or "" when it exists in neither location or git cannot run.
    git is invoked with an argument list (no shell), so neither BUILDER_DIR nor *path*
    needs quoting.
    """
    for rev in (f"origin/main:machine-docs/{path}", f"origin/main:{path}"):
        try:
            r = subprocess.run(["git", "-C", BUILDER_DIR, "show", rev],
                               capture_output=True, text=True)
        except FileNotFoundError:  # no git binary
            return ""
        if r.returncode == 0:
            return r.stdout
    return ""
# tmux key that submits typed input: opencode's TUI takes C-m (Ctrl+M), claude takes Enter.
_SUBMIT = "C-m" if BACKEND == "opencode" else "Enter"


def handoff_check():
    """Signal-tick handoff detection between the Builder and the Adversary.

    Fetches origin in the Builder clone, then:
      * New commits on origin/main: a subject starting with "claim" pings the Adversary,
        and one starting with "review" pings the Builder. The first tick only records a
        baseline SHA.
      * ADVERSARY-INBOX.md / BUILDER-INBOX.md on origin/main: a content change (md5) pings
        the recipient. An absent inbox resets its memory, so a re-created one pings again.
    git runs from argument lists (no shell). `rev-parse --verify -q` plus a returncode
    check means an unresolvable origin/main is skipped. Plain `rev-parse` would echo the
    literal "origin/main", which would then be recorded as if it were a SHA.
    """
    global _last_sha, _adv_inbox_seen, _builder_inbox_seen

    def git(*args):
        """stdout of `git -C BUILDER_DIR <args>`, or "" on failure / missing git."""
        try:
            proc = subprocess.run(["git", "-C", BUILDER_DIR, *args],
                                  capture_output=True, text=True)
        except FileNotFoundError:
            return ""
        return proc.stdout if proc.returncode == 0 else ""

    def ping_if_changed(content, seen, session, log_msg, ping_msg):
        """Ping *session* when *content*'s md5 differs from *seen*; return the digest to
        remember ("" when the inbox is absent or empty)."""
        if not content:
            return ""
        digest = hashlib.md5(content.encode()).hexdigest()
        if digest != seen:
            log(log_msg)
            ping_session(session, ping_msg, submit_key=_SUBMIT)
        return digest

    _fetch_origin()
    head = git("rev-parse", "--verify", "-q", "origin/main").strip()
    if head:
        if not _last_sha:
            _last_sha = head  # baseline silently on first tick
        elif head != _last_sha:
            subjects = git("log", "--format=%s", f"{_last_sha}..origin/main")
            if re.search(r"^claim", subjects, re.MULTILINE | re.IGNORECASE):
                log("handoff: new claim(...) commit → pinging Adversary")
                ping_session(ADV_SESSION,
                             "watchdog ping: the Builder pushed a gate CLAIM (claim(...) commit). "
                             "Pull and verify the claimed gate now.", submit_key=_SUBMIT)
            if re.search(r"^review", subjects, re.MULTILINE | re.IGNORECASE):
                log("handoff: new review(...) commit → pinging Builder")
                ping_session(BUILDER_SESSION,
                             "watchdog ping: the Adversary pushed a verdict/finding (review(...) commit). "
                             "Pull REVIEW and act — proceed if it PASSes your gate, address it if it's a finding.",
                             submit_key=_SUBMIT)
            _last_sha = head
    adv_inbox = _show_pushed("ADVERSARY-INBOX.md")
    builder_inbox = _show_pushed("BUILDER-INBOX.md")
    _adv_inbox_seen = ping_if_changed(
        adv_inbox, _adv_inbox_seen, ADV_SESSION,
        "handoff: ADVERSARY-INBOX.md changed → pinging Adversary",
        "watchdog ping: the Builder pushed machine-docs/ADVERSARY-INBOX.md — "
        "pull, read it, act, then delete the file (commit + push) to mark it consumed.")
    _builder_inbox_seen = ping_if_changed(
        builder_inbox, _builder_inbox_seen, BUILDER_SESSION,
        "handoff: BUILDER-INBOX.md changed → pinging Builder",
        "watchdog ping: the Adversary pushed machine-docs/BUILDER-INBOX.md — "
        "pull, read it, act, then delete the file (commit + push) to mark it consumed.")
# ── watchdog loop ─────────────────────────────────────────────────────────────
def watchdog_loop():
    """Run the watchdog; returns only after the final phase reports DONE.

    Every SIGNAL_INTERVAL seconds it runs handoff pings, loop stall checks and
    orchestrator limit care. Once ORCH_WAKE_INTERVAL has elapsed it also sends the
    supervision wake, retried each tick until delivered. Every WATCH_INTERVAL seconds
    (and on the first tick) it runs _heavy_tick.

    An exception inside one tick is logged and the loop carries on at the next tick.
    Otherwise a single unexpected error (e.g. an unreadable state file) would end the
    process, and nothing in this script restarts the watchdog session except `start`.
    SystemExit (from die()) is not an Exception and still terminates.
    """
    idx = cur_idx()
    log(f"watchdog up — phase={phase_id(idx)} [{idx+1}/{len(PHASES)}] "
        f"seq='{all_ids()}' signal={SIGNAL_INTERVAL}s heavy={WATCH_INTERVAL}s")
    elapsed = WATCH_INTERVAL  # force a heavy check on the first tick
    wake_elapsed = 0  # first orchestrator wake fires after a full interval, not at startup
    while True:
        try:
            handoff_check()
            stall_check()
            orch_limit_check()
            if wake_elapsed >= ORCH_WAKE_INTERVAL:
                # Reset only once the wake actually lands; if the orchestrator is busy/absent,
                # leave the timer tripped so we retry each tick until it's idle.
                if wake_orchestrator():
                    wake_elapsed = 0
            if elapsed >= WATCH_INTERVAL:
                elapsed = 0
                if _heavy_tick():
                    log("watchdog exiting.")
                    return
        except Exception as exc:
            log(f"watchdog tick failed: {type(exc).__name__}: {exc} — retrying next tick")
        time.sleep(SIGNAL_INTERVAL)
        elapsed += SIGNAL_INTERVAL
        wake_elapsed += SIGNAL_INTERVAL


def _heavy_tick():
    """Advance the phase if the current one is DONE, else heal sessions.

    Returns True only when the last phase is DONE and the sequence is complete (loops
    stopped, LOG_DIR/SEQUENCE-COMPLETE written).
    """
    idx = cur_idx()
    pid = phase_id(idx)
    status = phase_status(idx)
    if not phase_done(status):
        heal_session("builder", BUILDER_SESSION, BUILDER_DIR)
        heal_session("adversary", ADV_SESSION, ADV_DIR)
        heal_orchestrator()
        return False
    next_idx = idx + 1
    if next_idx < len(PHASES):
        log(f"PHASE {pid} DONE — auto-transitioning to {phase_id(next_idx)}")
        stop_loops()
        Path(PHASE_IDX_FILE).write_text(str(next_idx))
        handoff_reset()
        start_loops()
        return False
    log(f"PHASE SEQUENCE COMPLETE (last phase {pid} DONE) — stopping loops")
    stop_loops()
    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    Path(LOG_DIR, "SEQUENCE-COMPLETE").write_text(
        f"cc-ci phase sequence complete {ts}. Phases: {all_ids()}. "
        f"Loops stopped; entire build finished.\n")
    return True
def start_watchdog():
    """Start `launch.py watchdog` in its own detached tmux session.

    Output is appended to LOG_DIR/watchdog.log. This is a no-op when the session already
    exists.

    The watchdog session is spawned into the existing tmux server and does NOT inherit
    the caller's environment. Settings that must agree with `start` are therefore
    forwarded explicitly: the phase spec, the phase-index file, backend and models, and
    the directory settings (PLAN_DIR/BUILDER_DIR/ADV_DIR/LOG_DIR). Without the directory
    settings, a custom LOG_DIR would make the watchdog read persisted state from a
    different directory than `start` wrote it to. Every value and path is quoted with
    shlex.quote, so a quote character in e.g. a phase spec or model name cannot break
    the command line.
    """
    import shlex
    if session_alive(WATCHDOG_SESSION):
        log("watchdog already running")
        return
    log("starting watchdog")
    script = Path(__file__).resolve()
    forwarded = {
        "PHASES_SPEC": PHASES_SPEC,
        "PHASE_IDX_FILE": PHASE_IDX_FILE,
        "LOOP_BACKEND": BACKEND,
        "LOOP_MODEL": LOOP_MODEL,
        "ADV_MODEL": ADV_MODEL,
        "PLAN_DIR": PLAN_DIR,
        "BUILDER_DIR": BUILDER_DIR,
        "ADV_DIR": ADV_DIR,
        "LOG_DIR": LOG_DIR,
    }
    env_prefix = "".join(f"{name}={shlex.quote(value)} " for name, value in forwarded.items())
    log_file = shlex.quote(f"{LOG_DIR}/watchdog.log")
    subprocess.run([
        "tmux", "new-session", "-d", "-s", WATCHDOG_SESSION, "-c", PLAN_DIR,
        f"exec >>{log_file} 2>&1; {env_prefix}python3 {shlex.quote(str(script))} watchdog",
    ])
def start_cleanlogs():
    """Start `agent-log.py follow-all` (next to this script) in tmux session cc-ci-cleanlogs.

    Per its original description, this maintains readable, greppable per-agent transcript
    logs (<agent>.clean.log) by tailing each session's JSONL, read-only. No-op if the
    session already exists.
    """
    session_name = "cc-ci-cleanlogs"
    if session_alive(session_name):
        log("cleanlogs already running")
        return
    log("starting cleanlogs (per-agent <agent>.clean.log)")
    helper = Path(__file__).resolve().with_name("agent-log.py")
    subprocess.run(["tmux", "new-session", "-d", "-s", session_name, "-c", PLAN_DIR,
                    f"python3 '{helper}' follow-all"])
# ── preflight ─────────────────────────────────────────────────────────────────
def preflight():
    """Abort via die() unless the environment can run the loops, then ensure LOG_DIR exists.

    Checks that tmux and the selected backend's binary are present, that PHASES_SPEC is
    well formed, and that every phase plan and both role prompts exist. Each phase entry
    needs at least three non-empty fields (id|planfile|statusfile). A malformed or empty
    spec is reported here by name, instead of surfacing later as a bare IndexError.
    """
    import shutil
    if not shutil.which("tmux"):
        die("tmux not found")
    if BACKEND == "claude":
        if not shutil.which(CLAUDE_BIN):
            die(f"claude CLI not found — set CLAUDE_BIN (currently: {CLAUDE_BIN})")
    elif BACKEND == "opencode":
        if not Path(OPENCODE_BIN).exists():
            die(f"opencode not found at {OPENCODE_BIN}")
    else:
        die(f"unknown LOOP_BACKEND '{BACKEND}' — use 'claude' or 'opencode'")
    if not PHASES:
        die("PHASES_SPEC defines no phases")
    for phase in PHASES:
        if len(phase) < 3 or not all(phase[:3]):
            die(f"malformed PHASES_SPEC entry {'|'.join(phase)!r} — expected id|planfile|statusfile")
        plan = Path(PLAN_DIR) / phase[1]
        if not plan.exists():
            die(f"missing phase plan: {plan}")
    for prompt_file in ("builder.md", "adversary.md"):
        if not (Path(PLAN_DIR) / "prompts" / prompt_file).exists():
            die(f"missing {PLAN_DIR}/prompts/{prompt_file}")
    Path(LOG_DIR).mkdir(parents=True, exist_ok=True)
# ── status ────────────────────────────────────────────────────────────────────
def cmd_status():
    """Print the current phase and its state.

    Shows the current phase (id, position, plan and status files), whether each of the
    Builder/Adversary/watchdog sessions is running, whether the phase is DONE, and the
    SEQUENCE-COMPLETE note if one exists.
    """
    idx = cur_idx()
    pid = phase_id(idx)
    status_file = phase_status(idx)
    print(f" phase: {pid} [{idx+1}/{len(PHASES)}] plan={phase_plan(idx)} status={status_file}")
    for sess in (BUILDER_SESSION, ADV_SESSION, WATCHDOG_SESSION):
        print(f" {sess}: {'RUNNING' if session_alive(sess) else 'stopped'}")
    progress = "## DONE" if phase_done(status_file) else "in progress"
    print(f" phase {pid}: {progress}")
    marker = Path(LOG_DIR, "SEQUENCE-COMPLETE")
    if marker.exists():
        print(f" >>> {marker.read_text().strip()}")
# ── main ──────────────────────────────────────────────────────────────────────
def main():
    """Command-line entry point: dispatch on sys.argv[1].

    start    - preflight, stop running loops, reset the phase index to 0 unless
               RESUME_PHASE=1, clear SEQUENCE-COMPLETE, persist backend/models/phase spec
               under LOG_DIR, then start the loops, the watchdog and cleanlogs.
    watchdog - preflight, then run the watchdog loop in the foreground.
    status   - print phase and session state.
    stop     - kill the loop sessions and the watchdog.
    logs X   - exec `tail -f` on the builder, adversary or watchdog log.
    Anything else prints usage.
    """
    command = sys.argv[1] if len(sys.argv) > 1 else ""

    def _cmd_start():
        preflight()
        stop_loops()
        if os.environ.get("RESUME_PHASE") != "1":
            Path(PHASE_IDX_FILE).write_text("0")
        seq = Path(LOG_DIR) / "SEQUENCE-COMPLETE"
        if seq.exists():
            seq.unlink()
        # Persist backend/model (and the phase spec) so the watchdog uses them when
        # restarting dead sessions and status/watchdog agree on the phase set.
        for target, value in (
            (_BACKEND_FILE, BACKEND),
            (_MODEL_FILE, LOOP_MODEL),
            (_ADV_MODEL_FILE, ADV_MODEL),
            (_PHASES_FILE, PHASES_SPEC),
        ):
            Path(target).write_text(value)
        log(f"backend={BACKEND} model={LOOP_MODEL or '<default>'} "
            f"adv-model={ADV_MODEL or '<same>'} (persisted to {_BACKEND_FILE})")
        log(f"phases='{all_ids()}' (persisted to {_PHASES_FILE})")
        start_loops()
        start_watchdog()
        start_cleanlogs()
        log(f"started at phase {phase_id(cur_idx())}.")

    def _cmd_watchdog():
        preflight()
        watchdog_loop()

    def _cmd_stop():
        stop_loops()
        if session_alive(WATCHDOG_SESSION):
            log(f"killing {WATCHDOG_SESSION}")
            kill_session(WATCHDOG_SESSION)
        log("stopped.")

    def _cmd_logs():
        which = sys.argv[2] if len(sys.argv) > 2 else ""
        log_files = {
            "builder": f"{LOG_DIR}/{BUILDER_SESSION}.log",
            "adversary": f"{LOG_DIR}/{ADV_SESSION}.log",
            "watchdog": f"{LOG_DIR}/watchdog.log",
        }
        if which not in log_files:
            die("usage: launch.py logs builder|adversary|watchdog")
        os.execvp("tail", ["tail", "-f", log_files[which]])

    def _cmd_usage():
        print(f"""cc-ci loop launcher (phase-aware)
launch.py start start loops + watchdog (RESUME_PHASE=1 to keep current phase)
launch.py stop stop loops + watchdog
launch.py status show phase + session state
launch.py logs builder|adversary|watchdog tail a log
launch.py watchdog run watchdog in foreground
Backend: {BACKEND} Model: {LOOP_MODEL or '<default>'} Adversary model: {ADV_MODEL or '<same>'}
Phase sequence ({len(PHASES)} phases, auto-advance on ## DONE, stop after last):
{all_ids()}
""")

    handlers = {
        "start": _cmd_start,
        "watchdog": _cmd_watchdog,
        "status": cmd_status,
        "stop": _cmd_stop,
        "logs": _cmd_logs,
    }
    handlers.get(command, _cmd_usage)()


if __name__ == "__main__":
    main()