#!/usr/bin/env python3
"""
cc-ci loop launcher — phase-aware Builder/Adversary loops + watchdog.

Usage:
  launch.py start                            start loops + watchdog (resets to phase 0 unless RESUME_PHASE=1)
  launch.py stop                             stop loops + watchdog
  launch.py status                           show phase + session state
  launch.py watchdog                         run the watchdog in the foreground (called by start_watchdog)
  launch.py logs builder|adversary|watchdog  tail a log

Env (all optional — defaults shown):
  LOOP_BACKEND         claude (default) | opencode
  LOOP_MODEL           model flag, e.g. "sonnet" (claude) or "tinfoil/deepseek-v4-pro" (opencode)
  ADV_MODEL            optional model override for the Adversary only (falls back to LOOP_MODEL)
  RESUME_PHASE         1 = keep current phase index on start (default resets to 0)
  REMOTE_CONTROL       1 (claude: pass --remote-control <session>; any other value disables it)
  CLAUDE_BIN           claude
  CLAUDE_FLAGS         --dangerously-skip-permissions (empty when running as root)
  OPENCODE_BIN         /home/loops/.local/bin/opencode
  OPENCODE_SERVER      http://127.0.0.1:4096
  PLAN_DIR             /srv/cc-ci/cc-ci-plan
  BUILDER_DIR          /srv/cc-ci/cc-ci
  ADV_DIR              /srv/cc-ci/cc-ci-adv
  LOG_DIR              /srv/cc-ci/.cc-ci-logs
  PHASES_SPEC          semicolon-separated "id|planfile|statusfile" entries
  PHASE_IDX_FILE       $LOG_DIR/.phase-idx
  ORCH_SESSION         cc-ci-orchestrator (tmux session of the orchestrator)
  ORCH_LAUNCHER        $PLAN_DIR/launch-orchestrator.sh (invoked with "start" to (re)start it)
  WATCH_ORCHESTRATOR   1 = the watchdog heals and wakes the orchestrator; anything else = off
  WATCH_INTERVAL       300 (seconds between heavy checks: phase DONE / heal sessions)
  SIGNAL_INTERVAL      30 (seconds between handoff / stall checks)
  ORCH_WAKE_INTERVAL   3600 (seconds between supervision wakes typed into the orchestrator session)
  ORCH_WAKE_PROMPT     $PLAN_DIR/ai-progress-monitor-prompt.txt (the supervision prompt)
  STALL_IDLE           300 (idle seconds without a WAITING-UNTIL before reboot)
  STALL_GRACE          180 (seconds past a WAITING-UNTIL before reboot)
  OPENCODE_STALL_IDLE  900 (STALL_IDLE used for the opencode backend)
  OPENCODE_LOG_GRACE   180 (a session log written this recently counts as opencode activity)
  LIMIT_PROBE_FALLBACK 300 (seconds between usage-limit probes when the reset time is unparsable)
  LIMIT_RESET_SLACK    45 (seconds past a parsed usage-limit reset before probing)

Per-phase model pins: a file $LOG_DIR/.loop-model-<phase id> (builder/shared) or
$LOG_DIR/.loop-model-adv-<phase id> (adversary) overrides the model for that phase only.
"""
import hashlib
import json
import os
import re
import subprocess
import sys
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path

# ── config ────────────────────────────────────────────────────────────────────
PLAN_DIR = os.environ.get("PLAN_DIR", "/srv/cc-ci/cc-ci-plan")
BUILDER_DIR = os.environ.get("BUILDER_DIR", "/srv/cc-ci/cc-ci")
ADV_DIR = os.environ.get("ADV_DIR", "/srv/cc-ci/cc-ci-adv")
LOG_DIR = os.environ.get("LOG_DIR", "/srv/cc-ci/.cc-ci-logs")
# Backend is read from env, falling back to a
# persisted file written by `start`.
# This ensures the watchdog (which runs in its own tmux session without the caller's env)
# uses the same backend/model when it restarts a dead session.
_BACKEND_FILE = os.path.join(LOG_DIR, ".loop-backend")
_PHASES_FILE = os.path.join(LOG_DIR, ".phases-spec")
_MODEL_FILE = os.path.join(LOG_DIR, ".loop-model")
_ADV_MODEL_FILE = os.path.join(LOG_DIR, ".loop-model-adv")


def _read_file_default(path, default):
    """Return the stripped text of *path*, or *default* if it is missing, unreadable or empty.

    Called at import time for persisted settings, so any OS-level read failure
    (missing file, permission denied, a directory at that path) must degrade to the
    default rather than crash every launch.py subcommand.
    """
    try:
        value = Path(path).read_text().strip()
    except OSError:
        return default
    return value or default


BACKEND = os.environ.get("LOOP_BACKEND") or _read_file_default(_BACKEND_FILE, "claude")
LOOP_MODEL = os.environ.get("LOOP_MODEL") or _read_file_default(_MODEL_FILE, "")
# Per-role override: the Adversary may run a different model (e.g. opus for review while the
# Builder runs fable). Empty → falls back to LOOP_MODEL at the call site.
ADV_MODEL = os.environ.get("ADV_MODEL") or _read_file_default(_ADV_MODEL_FILE, "")


def role_model(role):
    """Return the model name for *role* ("adversary", else builder/shared); "" = backend default.

    Per-phase override wins: a file LOG_DIR/.loop-model-<pid> (builder/shared) or
    LOG_DIR/.loop-model-adv-<pid> (adversary) pins a model for just that phase. It is
    read fresh on every call (cur_idx → PHASE_IDX_FILE), so a phase transition flips the
    model automatically — e.g. opus for a hard phase, sonnet after — with no watchdog bounce.
    """
    pid = phase_id(cur_idx())
    if role == "adversary":
        override = _read_file_default(os.path.join(LOG_DIR, f".loop-model-adv-{pid}"), "")
        return override or ADV_MODEL or LOOP_MODEL
    override = _read_file_default(os.path.join(LOG_DIR, f".loop-model-{pid}"), "")
    return override or LOOP_MODEL


REMOTE_CONTROL = os.environ.get("REMOTE_CONTROL", "1") == "1"
CLAUDE_BIN = os.environ.get("CLAUDE_BIN", "claude")
CLAUDE_FLAGS = os.environ.get("CLAUDE_FLAGS", "")
if os.getuid() == 0:
    # As root no default flag is passed; the env var is set instead.
    # NOTE(review): presumably because the CLI rejects the flag for root — confirm.
    os.environ.setdefault("CLAUDE_DANGEROUSLY_SKIP_PERMISSIONS", "1")
else:
    CLAUDE_FLAGS = os.environ.get("CLAUDE_FLAGS", "--dangerously-skip-permissions")
OPENCODE_BIN = os.environ.get("OPENCODE_BIN", "/home/loops/.local/bin/opencode")
OPENCODE_SERVER = os.environ.get("OPENCODE_SERVER", "http://127.0.0.1:4096")
ORCH_SESSION = os.environ.get("ORCH_SESSION", "cc-ci-orchestrator")
ORCH_LAUNCHER = os.environ.get("ORCH_LAUNCHER", f"{PLAN_DIR}/launch-orchestrator.sh")
WATCH_ORCHESTRATOR = os.environ.get("WATCH_ORCHESTRATOR", "1") == "1"
BUILDER_SESSION = "cc-ci-builder"
ADV_SESSION = "cc-ci-adv"
WATCHDOG_SESSION = "cc-ci-watchdog"
WATCH_INTERVAL = int(os.environ.get("WATCH_INTERVAL", 300))
SIGNAL_INTERVAL = int(os.environ.get("SIGNAL_INTERVAL", 30))
# Hourly supervision wake: the watchdog types this prompt into the orchestrator session
# so it reviews the loops and nudges as needed (replaces the standalone ai-progress-monitor waker).
ORCH_WAKE_INTERVAL = int(os.environ.get("ORCH_WAKE_INTERVAL", 3600))
ORCH_WAKE_PROMPT = os.environ.get("ORCH_WAKE_PROMPT", f"{PLAN_DIR}/ai-progress-monitor-prompt.txt")
# Stall thresholds (seconds): idle time with no WAITING-UNTIL marker, and grace past a
# declared WAITING-UNTIL, before a loop session is killed and rebooted.
STALL_IDLE = int(os.environ.get("STALL_IDLE", 300))
STALL_GRACE = int(os.environ.get("STALL_GRACE", 180))

# Default build sequence, in run order. Each entry is "id|planfile|statusfile": the plan
# file lives in PLAN_DIR and the status file is the one the Builder writes "## DONE" into.
_DEFAULT_PHASES_SPEC = ";".join([
    "1c|plan-phase1c-full-reproducibility.md|STATUS-1c.md",
    "1b|plan-phase1b-review-lint.md|STATUS-1b.md",
    "1d|plan-phase1d-generic-test-suite.md|STATUS-1d.md",
    "1e|plan-phase1e-harness-corrections.md|STATUS-1e.md",
    "2w|plan-phase2w-warm-canonical-quick.md|STATUS-2w.md",
    "2pc|plan-phase2pc-image-cache.md|STATUS-2pc.md",
    "2|plan-phase2-recipe-tests.md|STATUS-2.md",
    "2b|plan-phase2b-test-performance.md|STATUS-2b.md",
    "3|plan-phase3-results-ux.md|STATUS-3.md",
    "4|plan-phase4-final-review-polish-cleanup.md|STATUS-4.md",
    "5|plan-phase5-verify-upgrade-flow.md|STATUS-5.md",
])
# Env wins; else a persisted file written by `start` (so status/watchdog/reboot all agree on the
# current phase set); else the default build sequence above.
PHASES_SPEC = os.environ.get("PHASES_SPEC") or _read_file_default(_PHASES_FILE, _DEFAULT_PHASES_SPEC)
# One [id, planfile, statusfile] list per entry. Blank entries (a trailing ";" or stray
# whitespace in a hand-written spec) are skipped so they cannot become a bogus phase that
# makes phase_plan()/phase_status() raise IndexError.
PHASES = [
    [field.strip() for field in entry.split("|")]
    for entry in PHASES_SPEC.split(";")
    if entry.strip()
]
PHASE_IDX_FILE = os.environ.get("PHASE_IDX_FILE", f"{LOG_DIR}/.phase-idx")

# Regex patterns for session-state detection
ACTIVE_RE = re.compile(r"esc to interrupt|⠋|⠙|⠹|⠸|⠼|⠴|⠦|⠧|⠇|⠏|Running tool|▣|Build ·|· \d+")
LIMIT_RE = re.compile(r"spend limit|usage limit|limit reached|reached your .*limit|out of (credits|tokens)", re.I)
FATAL_RE = re.compile(r"redacted_thinking|blocks cannot be modified|cannot be modified", re.I)
RECENT_ACTIVITY_RE = re.compile(r"thinking|inferring|running tool|remote control (active|connecting)|tool call|schedulewake?up", re.I)
OPENCODE_STALL_IDLE = int(os.environ.get("OPENCODE_STALL_IDLE", 900))
OPENCODE_LOG_GRACE = int(os.environ.get("OPENCODE_LOG_GRACE", 180))


# ── logging ───────────────────────────────────────────────────────────────────
def log(msg):
    """Print *msg* with a [launch HH:MM:SS] prefix, flushed immediately."""
    ts = datetime.now().strftime("%H:%M:%S")
    print(f"[launch {ts}] {msg}", flush=True)


def die(msg):
    """Log *msg* as an error and exit the process with status 1."""
    log(f"ERROR: {msg}")
    sys.exit(1)


# ── tmux helpers ──────────────────────────────────────────────────────────────
def session_alive(name):
    """True if tmux reports a session called *name*."""
    return subprocess.run(
        ["tmux", "has-session", "-t", name], capture_output=True
    ).returncode == 0


def kill_session(name):
    """Kill tmux session *name* (silently ignores a missing session)."""
    subprocess.run(["tmux", "kill-session", "-t", name], capture_output=True)


def capture_pane(name, lines=40):
    """Return the LAST *lines* lines of session *name*'s pane, or "" if capture fails."""
    r = subprocess.run(["tmux", "capture-pane", "-pt", name], capture_output=True, text=True)
    return "\n".join(r.stdout.splitlines()[-lines:]) if r.returncode == 0 else ""


def _session_log_path(session):
    """Path of the pipe-pane transcript log for *session*."""
    return Path(LOG_DIR) / f"{session}.log"


def _log_recently_touched(session, age_seconds):
    """True if *session*'s log was modified within the last *age_seconds* seconds."""
    try:
        return (time.time() - _session_log_path(session).stat().st_mtime) <= age_seconds
    except FileNotFoundError:
        return False


def _last_nonempty_line(text):
    """Return the last non-blank line of *text*, stripped ("" if there is none)."""
    for line in reversed(text.splitlines()):
        if line.strip():
            return line.strip()
    return ""


def pipe_to_log(session, log_path):
    """Append everything *session* prints to *log_path* (tmux pipe-pane, only if not piped yet)."""
    import shlex  # local: keeps this fix self-contained

    # shlex.quote leaves ordinary paths unquoted and safely quotes paths containing
    # spaces or quote characters, which the old hand-written '...' quoting broke on.
    subprocess.run(["tmux", "pipe-pane", "-o", "-t", session,
                    f"cat >> {shlex.quote(str(log_path))}"])


def ping_session(session, msg, submit_key="Enter"):
    """Type *msg* into a tmux session and submit it.

    submit_key: "Enter" for claude; "C-m" for opencode (Ctrl+M = Enter).

    The text is sent literally (send-keys -l), then the submit key is pressed up to 10
    times, one second apart, until the first 28 characters of *msg* no longer appear in
    the last 20 lines of the pane. opencode renders the input in the content area, so more
    than just the bottom input row is checked. No-op when the session does not exist.
    """
    if not session_alive(session):
        return
    prefix = msg[:28]
    subprocess.run(["tmux", "send-keys", "-t", session, "-l", "--", msg], capture_output=True)
    time.sleep(0.5)
    for _ in range(10):
        subprocess.run(["tmux", "send-keys", "-t", session, submit_key], capture_output=True)
        time.sleep(1)
        # capture_pane returns the bottom 20 lines. NOTE(review): if the UI echoes the
        # submitted message into the transcript within those lines, the prefix stays
        # visible and extra submit keys are sent until the 10 tries run out.
        if prefix not in capture_pane(session, 20):
            return  # message was accepted


# ── phase helpers ─────────────────────────────────────────────────────────────
def cur_idx():
    """Return the current phase index from PHASE_IDX_FILE, clamped to a valid PHASES index.

    A missing/unreadable/non-numeric file reads as 0. An index past the end (e.g. a stale
    file after PHASES_SPEC was shortened) is clamped to the last phase instead of making
    every phase_* accessor raise IndexError.
    """
    try:
        raw = Path(PHASE_IDX_FILE).read_text().strip()
    except OSError:
        return 0
    try:
        idx = int(raw)
    except ValueError:
        return 0
    return max(0, min(idx, len(PHASES) - 1))


def phase_id(idx):
    """Phase id (first spec field) of phase *idx*."""
    return PHASES[idx][0]


def phase_plan(idx):
    """Plan file name (second spec field) of phase *idx*."""
    return PHASES[idx][1]


def phase_status(idx):
    """Status file name (third spec field) of phase *idx*."""
    return PHASES[idx][2]


def all_ids():
    """Space-separated ids of every phase in the sequence."""
    return " ".join(p[0] for p in PHASES)


def resolve_state(repo_dir, basename):
    """Return the path to a loop-state file — machine-docs/ if present, else repo root."""
    p = Path(repo_dir) / "machine-docs" / basename
    return p if p.exists() else Path(repo_dir) / basename


# A "## DONE" heading marks real completion ONLY when its body is a substantive completion
# statement. A Builder that scaffolds a "## DONE / Not yet ..." placeholder section must not
# trip the auto-advance — that false-completed and SKIPPED the mailu phase once (2026-06-11).
_DONE_PLACEHOLDER_RE = re.compile(
    r"^\s*(not yet|not done|not complete|incomplete|pending\b|tbd\b|n/?a\b|"
    r"written here only|only when|to be (written|filled)|when all|<.*>)", re.I)


def phase_done(status_basename):
    """True if the Builder's status file contains a real (non-placeholder) "## DONE" section.

    The body of a "## DONE" heading is its first non-blank following line; a body that
    matches _DONE_PLACEHOLDER_RE ("Not yet ...", "TBD", "<...>") is skipped and scanning
    continues. A missing status file means not done.
    """
    path = resolve_state(BUILDER_DIR, status_basename)
    try:
        lines = path.read_text().splitlines()
    except FileNotFoundError:
        return False
    for i, line in enumerate(lines):
        if not line.startswith("## DONE"):
            continue
        body = next((nxt for nxt in lines[i + 1:] if nxt.strip()), "")  # first non-empty body line
        if _DONE_PLACEHOLDER_RE.match(body):
            continue  # placeholder section ("Not yet ...") — keep scanning, not real
        return True
    return False


# ── kickoff prompt ────────────────────────────────────────────────────────────
def build_kickoff(role, idx):
    """Return the full kickoff prompt for *role* at phase *idx*.

    A phase-specific preamble followed by PLAN_DIR/prompts/<role>.md. Plan paths are
    built from PLAN_DIR (previously hard-coded to /srv/cc-ci/cc-ci-plan, which pointed
    agents at the wrong plans whenever PLAN_DIR was overridden).
    """
    pid, plan, status = phase_id(idx), phase_plan(idx), phase_status(idx)
    preamble = (
        f"*** cc-ci SUB-PHASE {pid} ***\n"
        f"SINGLE SOURCE OF TRUTH for THIS phase: {PLAN_DIR}/{plan} — read it in full "
        f"now; it defines this phase's mission and Definition of Done.\n"
        f"The general loop protocol still applies and lives in {PLAN_DIR}/plan.md "
        f"(§6.1 coordination, §7 pacing, §9 guardrails) — read those sections too.\n"
        f"Track loop state in PHASE-NAMESPACED files UNDER machine-docs/ in your repo clone "
        f"(create the dir if missing): machine-docs/{status}, machine-docs/BACKLOG-{pid}.md, "
        f"machine-docs/REVIEW-{pid}.md, machine-docs/JOURNAL-{pid}.md. machine-docs/DECISIONS.md "
        f"is shared (append).\n"
        f"FILE-LOCATION RULE (mandatory): ALL coordination / loop-state files live in "
        f"machine-docs/, NEVER the repo root — that includes STATUS/BACKLOG/REVIEW/JOURNAL "
        f"(phase-namespaced), DECISIONS.md, DEFERRED.md, and the ADVERSARY-INBOX.md / "
        f"BUILDER-INBOX.md side-channels. If you ever find one at the root, git mv it into "
        f"machine-docs/.\n"
        f'"Done" for this phase = the Builder writes "## DONE" to machine-docs/{status} ONLY after '
        f"every Definition-of-Done item is Adversary-verified with a fresh PASS in "
        f"machine-docs/REVIEW-{pid}.md (handshake per §6.1).\n"
        f"The repo's Phase-1 STATUS.md / BACKLOG.md / REVIEW.md are HISTORY from the completed "
        f"Phase 1 — do NOT use them as your state; use the phase-namespaced files above.\n"
        f'Wherever the standing rules below say "plan.md"/"STATUS.md"/"BACKLOG.md"/"REVIEW.md", '
        f"substitute the phase plan and these machine-docs/ phase-namespaced files.\n\n"
        f"=== standing role & rules ===\n"
    )
    role_prompt = (Path(PLAN_DIR) / "prompts" / f"{role}.md").read_text()
    return preamble + role_prompt


# ── agent launch ──────────────────────────────────────────────────────────────
def start_agent(role, session, workdir):
    """Start *role*'s agent in detached tmux *session* for the current phase (no-op if running).

    Writes the kickoff prompt to LOG_DIR/.kickoff-<session>.txt, launches the configured
    backend, and pipes the pane into LOG_DIR/<session>.log.
    """
    if session_alive(session):
        log(f"{session} already running — leaving it")
        return
    Path(workdir).mkdir(parents=True, exist_ok=True)
    Path(LOG_DIR).mkdir(parents=True, exist_ok=True)
    idx = cur_idx()
    pid, plan = phase_id(idx), phase_plan(idx)
    kf = Path(LOG_DIR) / f".kickoff-{session}.txt"
    kf.write_text(build_kickoff(role, idx))
    model = role_model(role)
    model_flag = f"--model '{model}'" if model else ""
    session_cwd = workdir
    if BACKEND == "claude":
        rc = f"--remote-control '{session}'" if REMOTE_CONTROL else ""
        # The kickoff is passed as the initial prompt argument via $(cat ...).
        cmd = f"{CLAUDE_BIN} {rc} {model_flag} {CLAUDE_FLAGS} \"$(cat '{kf}')\""
        log(f"starting {session} (backend=claude, phase={pid}, plan={plan}, model={model or 'default'})")
    elif BACKEND == "opencode":
        # Attach each TUI to the shared opencode web server so sessions are recorded the same
        # way as browser-created sessions, including a populated `path` in the DB.
        # We still pin the visible project root with --dir, while the kickoff instructions use
        # absolute repo paths for builder/adversary work.
        # NOTE(review): the resolved model is only logged here, never passed to opencode.
        session_cwd = "/srv/cc-ci-orch/cc-ci"
        cmd = (
            f"set -a; . /srv/cc-ci/.testenv; set +a; "
            f"NO_COLOR=1 {OPENCODE_BIN} attach {OPENCODE_SERVER} --dir {session_cwd}"
        )
        log(f"starting {session} (backend=opencode, phase={pid}, model={model or 'default'})")
        log(f" visible at http://oc.commoninternet.net (tailnet only)")
    else:
        die(f"unknown BACKEND '{BACKEND}' — set LOOP_BACKEND=claude or LOOP_BACKEND=opencode")
    subprocess.run(["tmux", "new-session", "-d", "-s", session, "-c", session_cwd, cmd])
    pipe_to_log(session, f"{LOG_DIR}/{session}.log")
    # opencode: send a short bootstrap once the TUI is ready.
    # opencode TUI uses C-m (Ctrl+M = Enter) to submit messages.
    # The full kickoff lives in the kickoff file; we point to it to stay under send-keys limits.
    if BACKEND == "opencode":
        time.sleep(12)  # opencode TUI needs more time to connect to the server
        bootstrap = (
            f"Your full kickoff prompt is in {kf} — read it now with: "
            f"`cat '{kf}'` — then follow its instructions exactly."
        )
        ping_session(session, bootstrap, submit_key="C-m")


def start_loops():
    """Start the Builder and Adversary sessions (each is left alone if already running)."""
    start_agent("builder", BUILDER_SESSION, BUILDER_DIR)
    start_agent("adversary", ADV_SESSION, ADV_DIR)


def stop_loops():
    """Kill the Builder and Adversary tmux sessions if they exist."""
    for s in (BUILDER_SESSION, ADV_SESSION):
        if session_alive(s):
            log(f"killing {s}")
            kill_session(s)


# ── usage-limit handling ──────────────────────────────────────────────────────
#
# When a session hits the usage/spend limit it prints a banner ("…limit reached ∙
# resets 3am") and sits idle. The old behaviour either blind-nudged every heavy tick
# (claude) or let the generic idle-stall reboot it — losing the banner, re-hitting
# the limit fresh, and churning. The state machine below instead:
#   1. parses the reset time from the banner and arms a quiet window until reset+45s
#      (parse failure → flat 5-minute probe loop, never exponential backoff);
#   2. suppresses ALL kill/reboot healing while the window is armed — a limit-stalled
#      session is NEVER rebooted;
#   3.
at the window's end sends ONE nudge as a self-verifying probe: if the limit # really lifted the session resumes (spinner clears the state next tick); if # Claude Code re-prints the banner we re-parse and re-arm. # State is persisted per session in LOG_DIR so a watchdog restart keeps the window. LIMIT_PROBE_FALLBACK = int(os.environ.get("LIMIT_PROBE_FALLBACK", 300)) # flat probe cadence LIMIT_RESET_SLACK = int(os.environ.get("LIMIT_RESET_SLACK", 45)) # s past parsed reset RESET_RE = re.compile(r"resets?\s*(?:at\s*)?(\d{1,2})(?::(\d{2}))?\s*(am|pm)?", re.I) def _limit_state_path(session): return Path(LOG_DIR) / f".limited-{session}" def _load_limit_state(session): try: return json.loads(_limit_state_path(session).read_text()) except Exception: return None def _save_limit_state(session, state): _limit_state_path(session).write_text(json.dumps(state)) def _clear_limit_state(session): try: _limit_state_path(session).unlink() except FileNotFoundError: pass def _parse_reset_epoch(pane): """Parse the reset time-of-day from a limit banner (last match = freshest banner). Returns an epoch in local time, rolled to tomorrow if the time already passed today, or None when unparsable.""" matches = list(RESET_RE.finditer(pane)) if not matches: return None m = matches[-1] try: hour, minute = int(m.group(1)), int(m.group(2) or 0) ampm = (m.group(3) or "").lower() if ampm == "pm" and hour != 12: hour += 12 elif ampm == "am" and hour == 12: hour = 0 if hour > 23 or minute > 59: return None cand = datetime.now().replace(hour=hour, minute=minute, second=0, microsecond=0) if cand.timestamp() <= time.time(): cand += timedelta(days=1) return cand.timestamp() except Exception: return None def _next_limit_until(pane, now): """When should we next probe? Parsed reset + slack when the banner is parsable and plausibly fresh; else flat LIMIT_PROBE_FALLBACK. 
Time-of-day-only banners come from the 5-hour limit window, so a fresh reset is always ≤ ~5h away — a parse landing further out means the stated time already passed (stale banner rolled to tomorrow): probe soon instead of waiting most of a day. Longer (weekly) limits print a date, which doesn't parse → flat probe loop by design.""" parsed = _parse_reset_epoch(pane) if parsed is not None and parsed - now <= 6 * 3600: return parsed + LIMIT_RESET_SLACK, True return now + LIMIT_PROBE_FALLBACK, False def _limit_nudge_msg(role): # CAUTION: this text lands in the pane that limit_tick later greps. It must NEVER # match LIMIT_RE (found 2026-06-11: "usage limit has reset" matched, making the # window self-sustaining once the probe scrolled into the conversation). if role == "orchestrator": return ("watchdog probe: if the quota window has reset, RESUME now — " "you are the cc-ci orchestrator; re-check loop status and continue " "supervising from where you stopped.") return ("watchdog probe: if the quota window has reset, RESUME your loop now — " "pull latest, re-read your phase STATUS/REVIEW files, and continue from " "where you stopped; re-arm your loop pacing.") def limit_tick(role, session, pane): """Unified limit state machine for both backends and the orchestrator. 
Returns True while the session is inside a limit window — callers must then suppress idle accounting, reboots, and other healing.""" state = _load_limit_state(session) limited_now = bool(LIMIT_RE.search(pane)) if state is None: if not limited_now or ACTIVE_RE.search(pane): return False now = time.time() until, parsed = _next_limit_until(pane, now) if parsed: log(f"limit hit on {role} ({session}) — banner says reset " f"{datetime.fromtimestamp(until):%a %H:%M}; holding until then (no reboots)") else: log(f"limit hit on {role} ({session}) — reset time unparsable; flat " f"{LIMIT_PROBE_FALLBACK // 60}-min probe loop (no reboots)") _save_limit_state(session, {"until": until, "nudges": 0}) return True # Armed window: a spinner or a vanished banner means the limit lifted. if ACTIVE_RE.search(pane) or not limited_now: log(f"limit lifted on {role} ({session}) — clearing limit state") _clear_limit_state(session) return False now = time.time() if now < state.get("until", 0): return True # quietly inside the window # Window elapsed, banner still showing → probe. Dedupe: never stack a second probe # while our own text sits typed-but-unsubmitted in the INPUT area (bottom lines only — # checking the whole pane wedged the machine once a submitted probe scrolled into the # conversation above, suppressing all further probes; found 2026-06-11). msg = _limit_nudge_msg(role) if msg[:28] in "\n".join(pane.splitlines()[-8:]): return True nudges = state.get("nudges", 0) + 1 log(f"limit probe #{nudges} on {role} ({session}) — nudging to resume") ping_session(session, msg, submit_key=_SUBMIT) # Self-verify: re-read the pane. A spinner with no banner = resumed; a re-printed # banner = still limited → re-arm from the fresh banner (else flat fallback). 
time.sleep(3) pane2 = capture_pane(session, 40) if ACTIVE_RE.search(pane2) and not LIMIT_RE.search(pane2): log(f"limit lifted on {role} ({session}) — probe resumed it") _clear_limit_state(session) # Return True ("handled this tick") so the caller does NOT continue evaluating # its pre-resume pane capture — that stale data held an old WAITING-UNTIL and # got a freshly-resumed session rebooted (2026-06-11 05:52). Next tick sees the # live session normally. return True until, _ = _next_limit_until(pane2, now) if nudges == 3: log(f"WARNING: {role} ({session}) still limited after {nudges} probes — " f"continuing flat {LIMIT_PROBE_FALLBACK // 60}-min probes; never rebooting") _save_limit_state(session, {"until": until, "nudges": nudges}) return True # ── session healing ─────────────────────────────────────────────────────────── def heal_session(role, session, workdir): """Restart a dead session; kill+restart a FATAL-wedged one; never touch a limit-stalled one.""" if not session_alive(session): log(f"{role} ({session}) gone — restarting (phase {phase_id(cur_idx())})") start_agent(role, session, workdir) return pane = capture_pane(session, 25) if ACTIVE_RE.search(pane): return # actively working — leave it alone if limit_tick(role, session, pane): return # inside a usage-limit window — limit_tick owns nudging; NEVER kill if FATAL_RE.search(pane): log(f"FATAL session-state error on {role} ({session}) — kill + restart fresh") kill_session(session) start_agent(role, session, workdir) return # ── stall detection ─────────────────────────────────────────────────────────── _idle_since: dict[str, float] = {} def _parse_waiting_until(pane): """Extract the epoch timestamp from a WAITING-UNTIL marker, or None.""" if BACKEND == "opencode": line = _last_nonempty_line(pane) if not line.startswith("WAITING-UNTIL:"): return None m = re.search(r"WAITING-UNTIL:\s*(\S+)", line) else: m = re.search(r"WAITING-UNTIL:\s*(\S+)", pane) if not m: return None try: ts = m.group(1) dt = 
datetime.fromisoformat(ts.replace("Z", "+00:00")) return dt.timestamp() except Exception: return None def stall_check_one(role, session, workdir): if not session_alive(session): _idle_since[session] = 0.0 _clear_limit_state(session) return now = time.time() pane = capture_pane(session, 40) # Limit windows (both backends): suppress idle accounting and reboots entirely — # a limit-stalled session must never reach the kill+reboot below. if limit_tick(role, session, pane): _idle_since[session] = 0.0 return if ACTIVE_RE.search(pane) or (BACKEND == "opencode" and ( RECENT_ACTIVITY_RE.search(pane) or _log_recently_touched(session, OPENCODE_LOG_GRACE) )): _idle_since[session] = 0.0 return since = _idle_since.get(session) or now _idle_since[session] = since idle = now - since until = _parse_waiting_until(pane) if until is not None: # Declared wait: only reboot once STALL_GRACE seconds past the stated time. # Never reboot before — that races with the healthy self-wake. if now <= until + STALL_GRACE: return reason = f"past its WAITING-UNTIL by {int(now - until)}s — self-wake did not fire" else: stall_idle = OPENCODE_STALL_IDLE if BACKEND == "opencode" else STALL_IDLE if idle < stall_idle: return reason = f"idle {int(idle)}s with no WAITING-UNTIL marker" log(f"stall: {role} ({session}) {reason} — kill + reboot") kill_session(session) start_agent(role, session, workdir) _idle_since[session] = 0.0 def stall_check(): stall_check_one("builder", BUILDER_SESSION, BUILDER_DIR) stall_check_one("adversary", ADV_SESSION, ADV_DIR) # ── orchestrator healing ────────────────────────────────────────────────────── def orchestrator_alive(): """ True if an orchestrator process is running anywhere. Conflict-safety: never launch a second orchestrator resuming the same session (double-resume causes "thinking blocks cannot be modified" crashes). 
""" for line in subprocess.run("pgrep -x claude || true", shell=True, capture_output=True, text=True).stdout.splitlines(): pid = line.strip() if not pid: continue try: cmdline = Path(f"/proc/{pid}/cmdline").read_bytes().decode(errors="replace").replace("\0", " ") # Skip the loop sessions and the upgrader — they're not the orchestrator. if re.search(r"--remote-control\s+'?cc-ci-(builder|adv|upgrader)'?", cmdline): continue return True except Exception: pass return session_alive(ORCH_SESSION) def heal_orchestrator(): if not WATCH_ORCHESTRATOR: return if not Path(ORCH_LAUNCHER).is_file(): return if orchestrator_alive(): if session_alive(ORCH_SESSION): pane = capture_pane(ORCH_SESSION, 25) if ACTIVE_RE.search(pane): return if limit_tick("orchestrator", ORCH_SESSION, pane): return # limit window — never kill/restart the orchestrator over a limit if FATAL_RE.search(pane): log(f"FATAL session-state error on orchestrator ({ORCH_SESSION}) — kill + restart") kill_session(ORCH_SESSION) subprocess.run([ORCH_LAUNCHER, "start"], capture_output=True) return log(f"orchestrator not running — restarting via {ORCH_LAUNCHER}") subprocess.run([ORCH_LAUNCHER, "start"], capture_output=True) def wake_orchestrator(): """Hourly supervision nudge: type the progress-monitor prompt into the orchestrator session so it reviews the loops. Returns True when the wake was delivered (or is moot), False when it should be retried on a later tick. Skips (retry later) if the orchestrator is absent — heal_orchestrator restarts it — or actively working, so we never interrupt a turn; the wake lands the moment it goes idle. """ if not WATCH_ORCHESTRATOR: return True # feature off — treat as handled so the timer doesn't spin if not session_alive(ORCH_SESSION): return False if _load_limit_state(ORCH_SESSION) is not None: # Deliberately wake anyway: the hourly prompt doubles as a fallback that keeps # the orchestrator on track even if the limit-state machinery breaks. 
If the # limit is genuinely still in force the wake is harmless (banner re-prints). log(f"orchestrator ({ORCH_SESSION}) is in a limit window — sending hourly wake anyway (fallback)") if ACTIVE_RE.search(capture_pane(ORCH_SESSION, 25)): return False # busy — don't interrupt; retry when idle try: msg = " ".join(Path(ORCH_WAKE_PROMPT).read_text().split()) except FileNotFoundError: log(f"orchestrator wake skipped — prompt file missing: {ORCH_WAKE_PROMPT}") return True if not msg: return True log(f"waking orchestrator ({ORCH_SESSION}) for scheduled supervision pass") ping_session(ORCH_SESSION, msg, submit_key=_SUBMIT) return True def orch_limit_check(): """Signal-tick limit care for the orchestrator: heal_orchestrator only runs on heavy ticks, so this keeps the orchestrator's limit-resume latency ≤ SIGNAL_INTERVAL — the same best treatment the loops get.""" if not WATCH_ORCHESTRATOR or not session_alive(ORCH_SESSION): return limit_tick("orchestrator", ORCH_SESSION, capture_pane(ORCH_SESSION, 40)) # ── handoff signalling ──────────────────────────────────────────────────────── _last_sha = "" _adv_inbox_seen = "" _builder_inbox_seen = "" def handoff_reset(): global _last_sha, _adv_inbox_seen, _builder_inbox_seen _last_sha = _adv_inbox_seen = _builder_inbox_seen = "" def _fetch_origin(): subprocess.run(f"git -C {BUILDER_DIR!r} fetch -q origin", shell=True, capture_output=True) def _show_pushed(path): """Read a file from origin/main (machine-docs/ first, then repo root).""" for loc in (f"origin/main:machine-docs/{path}", f"origin/main:{path}"): r = subprocess.run( f"git -C {BUILDER_DIR!r} show {loc!r}", shell=True, capture_output=True, text=True) if r.returncode == 0: return r.stdout return "" _SUBMIT = "C-m" if BACKEND == "opencode" else "Enter" def handoff_check(): global _last_sha, _adv_inbox_seen, _builder_inbox_seen _fetch_origin() r = subprocess.run( f"git -C {BUILDER_DIR!r} rev-parse origin/main", shell=True, capture_output=True, text=True) head = r.stdout.strip() if 
head: if not _last_sha: _last_sha = head # baseline silently on first tick elif head != _last_sha: subjects = subprocess.run( f"git -C {BUILDER_DIR!r} log --format=%s {_last_sha}..origin/main", shell=True, capture_output=True, text=True).stdout if re.search(r"^claim", subjects, re.MULTILINE | re.IGNORECASE): log("handoff: new claim(...) commit → pinging Adversary") ping_session(ADV_SESSION, "watchdog ping: the Builder pushed a gate CLAIM (claim(...) commit). " "Pull and verify the claimed gate now.", submit_key=_SUBMIT) if re.search(r"^review", subjects, re.MULTILINE | re.IGNORECASE): log("handoff: new review(...) commit → pinging Builder") ping_session(BUILDER_SESSION, "watchdog ping: the Adversary pushed a verdict/finding (review(...) commit). " "Pull REVIEW and act — proceed if it PASSes your gate, address it if it's a finding.", submit_key=_SUBMIT) _last_sha = head adv_inbox = _show_pushed("ADVERSARY-INBOX.md") builder_inbox = _show_pushed("BUILDER-INBOX.md") def md5(s): return hashlib.md5(s.encode()).hexdigest() if adv_inbox: h = md5(adv_inbox) if h != _adv_inbox_seen: log("handoff: ADVERSARY-INBOX.md changed → pinging Adversary") ping_session(ADV_SESSION, "watchdog ping: the Builder pushed machine-docs/ADVERSARY-INBOX.md — " "pull, read it, act, then delete the file (commit + push) to mark it consumed.", submit_key=_SUBMIT) _adv_inbox_seen = h else: _adv_inbox_seen = "" if builder_inbox: h = md5(builder_inbox) if h != _builder_inbox_seen: log("handoff: BUILDER-INBOX.md changed → pinging Builder") ping_session(BUILDER_SESSION, "watchdog ping: the Adversary pushed machine-docs/BUILDER-INBOX.md — " "pull, read it, act, then delete the file (commit + push) to mark it consumed.", submit_key=_SUBMIT) _builder_inbox_seen = h else: _builder_inbox_seen = "" # ── watchdog loop ───────────────────────────────────────────────────────────── def watchdog_loop(): idx = cur_idx() log(f"watchdog up — phase={phase_id(idx)} [{idx+1}/{len(PHASES)}] " f"seq='{all_ids()}' 
signal={SIGNAL_INTERVAL}s heavy={WATCH_INTERVAL}s") elapsed = WATCH_INTERVAL # force a heavy check on the first tick wake_elapsed = 0 # first orchestrator wake fires after a full interval, not at startup while True: handoff_check() stall_check() orch_limit_check() if wake_elapsed >= ORCH_WAKE_INTERVAL: # Reset only once the wake actually lands; if the orchestrator is busy/absent, # leave the timer tripped so we retry each tick until it's idle. if wake_orchestrator(): wake_elapsed = 0 if elapsed >= WATCH_INTERVAL: elapsed = 0 idx = cur_idx() pid = phase_id(idx) status = phase_status(idx) if phase_done(status): next_idx = idx + 1 if next_idx < len(PHASES): log(f"PHASE {pid} DONE — auto-transitioning to {phase_id(next_idx)}") stop_loops() Path(PHASE_IDX_FILE).write_text(str(next_idx)) handoff_reset() start_loops() else: log(f"PHASE SEQUENCE COMPLETE (last phase {pid} DONE) — stopping loops") stop_loops() ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S") Path(LOG_DIR, "SEQUENCE-COMPLETE").write_text( f"cc-ci phase sequence complete {ts}. Phases: {all_ids()}. " f"Loops stopped; entire build finished.\n") # One-shot: if the operator queued an upgrade to follow the phase run # (touch LOG_DIR/.run-upgrade-on-complete), launch /upgrade-all now in # place of the weekly cron. Flag is consumed so it never repeats. 
trigger = Path(LOG_DIR) / ".run-upgrade-on-complete" if trigger.exists(): trigger.unlink() log("upgrade-on-complete flag set — launching /upgrade-all") subprocess.run( ["python3", f"{PLAN_DIR}/launch-upgrader.py", "start"], env={**os.environ, "PATH": "/home/loops/.local/bin:" + os.environ.get("PATH", "")}, capture_output=True) log("watchdog exiting.") return else: heal_session("builder", BUILDER_SESSION, BUILDER_DIR) heal_session("adversary", ADV_SESSION, ADV_DIR) heal_orchestrator() time.sleep(SIGNAL_INTERVAL) elapsed += SIGNAL_INTERVAL wake_elapsed += SIGNAL_INTERVAL def start_watchdog(): if session_alive(WATCHDOG_SESSION): log("watchdog already running") return log("starting watchdog") script = Path(__file__).resolve() # Forward the phase spec / idx file / backend explicitly. The watchdog tmux session is spawned # into the existing tmux server and would otherwise NOT inherit a custom PHASES_SPEC from the # caller's env — it would fall back to the default spec and mis-detect phase completion. 
env_prefix = ( f"PHASES_SPEC='{PHASES_SPEC}' PHASE_IDX_FILE='{PHASE_IDX_FILE}' " f"LOOP_BACKEND='{BACKEND}' LOOP_MODEL='{LOOP_MODEL}' ADV_MODEL='{ADV_MODEL}' " ) subprocess.run([ "tmux", "new-session", "-d", "-s", WATCHDOG_SESSION, "-c", PLAN_DIR, f"exec >>'{LOG_DIR}/watchdog.log' 2>&1; {env_prefix}python3 '{script}' watchdog" ]) def start_cleanlogs(): """Maintain readable, greppable per-agent transcript logs (.clean.log) by tailing each session's JSONL — costs nothing on the agents (read-only on a file claude writes anyway).""" if session_alive("cc-ci-cleanlogs"): log("cleanlogs already running") return log("starting cleanlogs (per-agent .clean.log)") al = Path(__file__).resolve().parent / "agent-log.py" subprocess.run(["tmux", "new-session", "-d", "-s", "cc-ci-cleanlogs", "-c", PLAN_DIR, f"python3 '{al}' follow-all"]) # ── preflight ───────────────────────────────────────────────────────────────── def preflight(): import shutil if not shutil.which("tmux"): die("tmux not found") if BACKEND == "claude": if not shutil.which(CLAUDE_BIN): die(f"claude CLI not found — set CLAUDE_BIN (currently: {CLAUDE_BIN})") elif BACKEND == "opencode": if not Path(OPENCODE_BIN).exists(): die(f"opencode not found at {OPENCODE_BIN}") else: die(f"unknown LOOP_BACKEND '{BACKEND}' — use 'claude' or 'opencode'") for phase in PHASES: plan = Path(PLAN_DIR) / phase[1] if not plan.exists(): die(f"missing phase plan: {plan}") for prompt_file in ("builder.md", "adversary.md"): if not (Path(PLAN_DIR) / "prompts" / prompt_file).exists(): die(f"missing {PLAN_DIR}/prompts/{prompt_file}") Path(LOG_DIR).mkdir(parents=True, exist_ok=True) # ── status ──────────────────────────────────────────────────────────────────── def cmd_status(): idx = cur_idx() pid = phase_id(idx) print(f" phase: {pid} [{idx+1}/{len(PHASES)}] plan={phase_plan(idx)} status={phase_status(idx)}") for s in (BUILDER_SESSION, ADV_SESSION, WATCHDOG_SESSION): state = "RUNNING" if session_alive(s) else "stopped" print(f" {s}: {state}") 
done_str = "## DONE" if phase_done(phase_status(idx)) else "in progress" print(f" phase {pid}: {done_str}") seq = Path(LOG_DIR) / "SEQUENCE-COMPLETE" if seq.exists(): print(f" >>> {seq.read_text().strip()}") # ── main ────────────────────────────────────────────────────────────────────── def main(): cmd = sys.argv[1] if len(sys.argv) > 1 else "" if cmd == "start": preflight() stop_loops() if os.environ.get("RESUME_PHASE") != "1": Path(PHASE_IDX_FILE).write_text("0") seq = Path(LOG_DIR) / "SEQUENCE-COMPLETE" if seq.exists(): seq.unlink() # Persist backend/model so the watchdog uses them when restarting dead sessions. Path(_BACKEND_FILE).write_text(BACKEND) Path(_MODEL_FILE).write_text(LOOP_MODEL) Path(_ADV_MODEL_FILE).write_text(ADV_MODEL) Path(_PHASES_FILE).write_text(PHASES_SPEC) log(f"backend={BACKEND} model={LOOP_MODEL or ''} " f"adv-model={ADV_MODEL or ''} (persisted to {_BACKEND_FILE})") log(f"phases='{all_ids()}' (persisted to {_PHASES_FILE})") start_loops() start_watchdog() start_cleanlogs() log(f"started at phase {phase_id(cur_idx())}.") elif cmd == "watchdog": preflight() watchdog_loop() elif cmd == "status": cmd_status() elif cmd == "stop": stop_loops() if session_alive(WATCHDOG_SESSION): log(f"killing {WATCHDOG_SESSION}") kill_session(WATCHDOG_SESSION) log("stopped.") elif cmd == "logs": sub = sys.argv[2] if len(sys.argv) > 2 else "" log_files = { "builder": f"{LOG_DIR}/{BUILDER_SESSION}.log", "adversary": f"{LOG_DIR}/{ADV_SESSION}.log", "watchdog": f"{LOG_DIR}/watchdog.log", } if sub not in log_files: die("usage: launch.py logs builder|adversary|watchdog") os.execvp("tail", ["tail", "-f", log_files[sub]]) else: print(f"""cc-ci loop launcher (phase-aware) launch.py start start loops + watchdog (RESUME_PHASE=1 to keep current phase) launch.py stop stop loops + watchdog launch.py status show phase + session state launch.py logs builder|adversary|watchdog tail a log launch.py watchdog run watchdog in foreground Backend: {BACKEND} Model: {LOOP_MODEL or ''} 
Adversary model: {ADV_MODEL or ''} Phase sequence ({len(PHASES)} phases, auto-advance on ## DONE, stop after last): {all_ids()} """) if __name__ == "__main__": main()