#!/usr/bin/env python3
"""
cc-ci loop launcher — phase-aware Builder/Adversary loops + watchdog.

Usage:
  launch.py start                            start loops + watchdog (resets to phase 0 unless RESUME_PHASE=1)
  launch.py stop                             stop loops + watchdog
  launch.py status                           show phase + session state
  launch.py watchdog                         run the watchdog in the foreground (called by start_watchdog)
  launch.py logs builder|adversary|watchdog  tail a log

Env (all optional — defaults shown):
  LOOP_BACKEND        claude (default) | opencode
  LOOP_MODEL          model flag, e.g. "sonnet" (claude) or "tinfoil/deepseek-v4-pro" (opencode)
  RESUME_PHASE        1 = keep current phase index on start (default resets to 0)
  REMOTE_CONTROL      1  (pass --remote-control <session> to claude; any other value disables it)
  CLAUDE_BIN          claude
  CLAUDE_FLAGS        --dangerously-skip-permissions (non-root) | empty (root)
  OPENCODE_BIN        /home/loops/.local/bin/opencode
  OPENCODE_SERVER     http://127.0.0.1:4096
  PLAN_DIR            /srv/cc-ci/cc-ci-plan
  BUILDER_DIR         /srv/cc-ci/cc-ci
  ADV_DIR             /srv/cc-ci/cc-ci-adv
  LOG_DIR             /srv/cc-ci/.cc-ci-logs
  ORCH_SESSION        cc-ci-orchestrator-vm
  ORCH_LAUNCHER       $PLAN_DIR/launch-orchestrator.sh
  WATCH_ORCHESTRATOR  1  (watchdog also heals the orchestrator; any other value disables it)
  PHASES_SPEC         semicolon-separated "id|planfile|statusfile" entries
  PHASE_IDX_FILE      $LOG_DIR/.phase-idx
  WATCH_INTERVAL      300  (seconds between heavy checks: phase DONE / heal sessions)
  SIGNAL_INTERVAL     30   (seconds between handoff / stall checks)
  STALL_IDLE          300  (idle seconds without a WAITING-UNTIL before reboot)
  STALL_GRACE         180  (seconds past a WAITING-UNTIL before reboot)
"""

import hashlib
import os
import re
import subprocess
import sys
import time
from datetime import datetime, timezone
from pathlib import Path

# ── config ────────────────────────────────────────────────────────────────────

PLAN_DIR = os.environ.get("PLAN_DIR", "/srv/cc-ci/cc-ci-plan")
BUILDER_DIR = os.environ.get("BUILDER_DIR", "/srv/cc-ci/cc-ci")
ADV_DIR = os.environ.get("ADV_DIR", "/srv/cc-ci/cc-ci-adv")
LOG_DIR = os.environ.get("LOG_DIR", "/srv/cc-ci/.cc-ci-logs")

BACKEND = os.environ.get("LOOP_BACKEND", "claude")
LOOP_MODEL = os.environ.get("LOOP_MODEL", "")
REMOTE_CONTROL = os.environ.get("REMOTE_CONTROL", "1") == "1"

CLAUDE_BIN = os.environ.get("CLAUDE_BIN", "claude")
if os.getuid() == 0:
    # As root the permission bypass is requested through the environment and no
    # flag is added by default.
    # NOTE(review): presumably claude rejects the CLI flag when run as root — confirm.
    os.environ.setdefault("CLAUDE_DANGEROUSLY_SKIP_PERMISSIONS", "1")
    CLAUDE_FLAGS = os.environ.get("CLAUDE_FLAGS", "")
else:
    CLAUDE_FLAGS = os.environ.get("CLAUDE_FLAGS", "--dangerously-skip-permissions")

OPENCODE_BIN = os.environ.get("OPENCODE_BIN", "/home/loops/.local/bin/opencode")
OPENCODE_SERVER = os.environ.get("OPENCODE_SERVER", "http://127.0.0.1:4096")

ORCH_SESSION = os.environ.get("ORCH_SESSION", "cc-ci-orchestrator-vm")
ORCH_LAUNCHER = os.environ.get("ORCH_LAUNCHER", f"{PLAN_DIR}/launch-orchestrator.sh")
WATCH_ORCHESTRATOR = os.environ.get("WATCH_ORCHESTRATOR", "1") == "1"

# tmux session names for the two loops and the watchdog.
BUILDER_SESSION = "cc-ci-builder"
ADV_SESSION = "cc-ci-adv"
WATCHDOG_SESSION = "cc-ci-watchdog"

WATCH_INTERVAL = int(os.environ.get("WATCH_INTERVAL", 300))
SIGNAL_INTERVAL = int(os.environ.get("SIGNAL_INTERVAL", 30))
STALL_IDLE = int(os.environ.get("STALL_IDLE", 300))
STALL_GRACE = int(os.environ.get("STALL_GRACE", 180))

PHASES_SPEC = os.environ.get("PHASES_SPEC", ";".join([
    "1c|plan-phase1c-full-reproducibility.md|STATUS-1c.md",
    "1b|plan-phase1b-review-lint.md|STATUS-1b.md",
    "1d|plan-phase1d-generic-test-suite.md|STATUS-1d.md",
    "1e|plan-phase1e-harness-corrections.md|STATUS-1e.md",
    "2w|plan-phase2w-warm-canonical-quick.md|STATUS-2w.md",
    "2pc|plan-phase2pc-image-cache.md|STATUS-2pc.md",
    "2|plan-phase2-recipe-tests.md|STATUS-2.md",
    "2b|plan-phase2b-test-performance.md|STATUS-2b.md",
    "3|plan-phase3-results-ux.md|STATUS-3.md",
    "4|plan-phase4-final-review-polish-cleanup.md|STATUS-4.md",
    "5|plan-phase5-verify-upgrade-flow.md|STATUS-5.md",
]))


def _parse_phases(spec):
    """Parse a PHASES_SPEC string into a list of [id, planfile, statusfile] lists.

    Empty entries (e.g. from a trailing ';') are skipped and surrounding
    whitespace is stripped from every field. Fields beyond the third are kept
    but unused.

    Raises:
        ValueError: if an entry lacks a non-empty id, plan file or status file,
            or if the spec defines no phases at all. Failing here gives a clear
            message instead of an IndexError deep inside the watchdog.
    """
    phases = []
    for raw in spec.split(";"):
        entry = raw.strip()
        if not entry:
            continue
        fields = [field.strip() for field in entry.split("|")]
        if len(fields) < 3 or not all(fields[:3]):
            raise ValueError(
                f"bad PHASES_SPEC entry {entry!r} — expected 'id|planfile|statusfile'")
        phases.append(fields)
    if not phases:
        raise ValueError("PHASES_SPEC defines no phases")
    return phases


try:
    PHASES = _parse_phases(PHASES_SPEC)
except ValueError as err:
    sys.exit(f"[launch] ERROR: {err}")

PHASE_IDX_FILE = os.environ.get("PHASE_IDX_FILE", f"{LOG_DIR}/.phase-idx")

# Regex patterns for session-state detection (matched against captured pane text).
ACTIVE_RE = re.compile(r"esc to interrupt|⠋|⠙|⠹|⠸|⠼|⠴|⠦|⠧|⠇|⠏|Running tool")
LIMIT_RE = re.compile(r"spend limit|usage limit|limit reached|reached your .*limit|out of (credits|tokens)", re.I)
FATAL_RE = re.compile(r"redacted_thinking|blocks cannot be modified|cannot be modified", re.I)
# ── logging ───────────────────────────────────────────────────────────────────

def log(msg):
    """Print *msg* to stdout with a "[launch HH:MM:SS]" prefix (local time), flushed."""
    ts = datetime.now().strftime("%H:%M:%S")
    print(f"[launch {ts}] {msg}", flush=True)


def die(msg):
    """Log *msg* as an error and exit the process with status 1."""
    log(f"ERROR: {msg}")
    sys.exit(1)


# ── tmux helpers ──────────────────────────────────────────────────────────────

def session_alive(name):
    """Return True if `tmux has-session` succeeds for the session *name*."""
    return subprocess.run(
        ["tmux", "has-session", "-t", name], capture_output=True
    ).returncode == 0


def kill_session(name):
    """Kill the tmux session *name*; the result is ignored."""
    subprocess.run(["tmux", "kill-session", "-t", name], capture_output=True)


def capture_pane(name, lines=40):
    """Return the last *lines* lines of the pane of session *name*, or "" if capture fails."""
    r = subprocess.run(["tmux", "capture-pane", "-pt", name], capture_output=True, text=True)
    if r.returncode != 0:
        return ""
    return "\n".join(r.stdout.splitlines()[-lines:])


def pipe_to_log(session, log_path):
    """Append everything the pane of *session* prints to the file *log_path*."""
    subprocess.run(["tmux", "pipe-pane", "-o", "-t", session, f"cat >> '{log_path}'"])


def ping_session(session, msg):
    """Type a message into a tmux session and submit it, retrying Enter until accepted.

    Does nothing when the session is not alive. "Accepted" means the first 28
    characters of *msg* no longer appear in the last 4 pane lines.
    """
    if not session_alive(session):
        return
    prefix = msg[:28]
    # -l sends the text literally; "--" stops a leading "-" being read as an option.
    subprocess.run(["tmux", "send-keys", "-t", session, "-l", "--", msg], capture_output=True)
    time.sleep(0.5)
    for _ in range(5):
        subprocess.run(["tmux", "send-keys", "-t", session, "Enter"], capture_output=True)
        time.sleep(1)
        if prefix not in capture_pane(session, 4):
            return  # message was accepted
        subprocess.run(["tmux", "send-keys", "-t", session, "C-m"], capture_output=True)
        time.sleep(0.5)


# ── phase helpers ─────────────────────────────────────────────────────────────

def cur_idx():
    """Return the current phase index read from PHASE_IDX_FILE.

    A missing file or non-numeric content yields 0. A value past the end of
    PHASES (e.g. a stale index after PHASES_SPEC was shortened) is clamped to
    the last phase so callers never index out of range.
    """
    try:
        raw = Path(PHASE_IDX_FILE).read_text().strip()
    except FileNotFoundError:
        return 0
    # isascii() excludes digit-like characters that int() cannot parse.
    if not (raw.isascii() and raw.isdigit()):
        return 0
    return min(int(raw), len(PHASES) - 1)


def phase_id(idx):
    """Return the id of phase *idx*."""
    return PHASES[idx][0]


def phase_plan(idx):
    """Return the plan file name of phase *idx*."""
    return PHASES[idx][1]


def phase_status(idx):
    """Return the status file name of phase *idx*."""
    return PHASES[idx][2]


def all_ids():
    """Return every phase id, space-separated, in sequence order."""
    return " ".join(p[0] for p in PHASES)


def resolve_state(repo_dir, basename):
    """Return the path to a loop-state file — machine-docs/ if present, else repo root."""
    nested = Path(repo_dir) / "machine-docs" / basename
    return nested if nested.exists() else Path(repo_dir) / basename


def phase_done(status_basename):
    """Return True if the Builder's status file has a line starting with "## DONE".

    A missing status file counts as not done.
    """
    path = resolve_state(BUILDER_DIR, status_basename)
    try:
        with path.open() as fh:  # close the handle deterministically
            return any(line.startswith("## DONE") for line in fh)
    except FileNotFoundError:
        return False


# ── kickoff prompt ────────────────────────────────────────────────────────────

def build_kickoff(role, idx):
    """Return the kickoff prompt for *role* ("builder"/"adversary") in phase *idx*.

    The prompt is a phase-specific preamble followed by the standing role
    prompt read from PLAN_DIR/prompts/<role>.md.

    Raises:
        FileNotFoundError: if the role prompt file does not exist.
    """
    pid, plan, status = phase_id(idx), phase_plan(idx), phase_status(idx)
    # Paths are built from PLAN_DIR so the preamble stays right when it is overridden.
    preamble = (
        f"*** cc-ci SUB-PHASE {pid} ***\n"
        f"SINGLE SOURCE OF TRUTH for THIS phase: {PLAN_DIR}/{plan} — read it in full "
        f"now; it defines this phase's mission and Definition of Done.\n"
        f"The general loop protocol still applies and lives in {PLAN_DIR}/plan.md "
        f"(§6.1 coordination, §7 pacing, §9 guardrails) — read those sections too.\n"
        f"Track loop state in PHASE-NAMESPACED files in your repo clone: {status}, "
        f"BACKLOG-{pid}.md, REVIEW-{pid}.md, JOURNAL-{pid}.md. DECISIONS.md is shared (append).\n"
        f'"Done" for this phase = the Builder writes "## DONE" to {status} ONLY after every '
        f"Definition-of-Done item is Adversary-verified with a fresh PASS in REVIEW-{pid}.md "
        f"(handshake per §6.1).\n"
        f"The repo's Phase-1 STATUS.md / BACKLOG.md / REVIEW.md are HISTORY from the completed "
        f"Phase 1 — do NOT use them as your state; use the phase-namespaced files above.\n"
        f'Wherever the standing rules below say "plan.md"/"STATUS.md"/"BACKLOG.md"/"REVIEW.md", '
        f"substitute the phase plan and these phase-namespaced files.\n\n"
        f"=== standing role & rules ===\n"
    )
    role_prompt = (Path(PLAN_DIR) / "prompts" / f"{role}.md").read_text()
    return preamble + role_prompt


# ── agent launch ──────────────────────────────────────────────────────────────

def start_agent(role, session, workdir):
    """Start the agent for *role* in a detached tmux session, unless it already runs.

    Writes the kickoff prompt to LOG_DIR/.kickoff-<session>.txt, launches the
    configured backend in *workdir*, and pipes the pane to LOG_DIR/<session>.log.
    Exits via die() on an unknown BACKEND.
    """
    if session_alive(session):
        log(f"{session} already running — leaving it")
        return
    Path(workdir).mkdir(parents=True, exist_ok=True)
    Path(LOG_DIR).mkdir(parents=True, exist_ok=True)

    idx = cur_idx()
    pid, plan = phase_id(idx), phase_plan(idx)
    kf = Path(LOG_DIR) / f".kickoff-{session}.txt"
    kf.write_text(build_kickoff(role, idx))

    model_flag = f"--model '{LOOP_MODEL}'" if LOOP_MODEL else ""

    if BACKEND == "claude":
        rc = f"--remote-control '{session}'" if REMOTE_CONTROL else ""
        # The kickoff is passed as the initial prompt argument via $(cat ...).
        cmd = f"{CLAUDE_BIN} {rc} {model_flag} {CLAUDE_FLAGS} \"$(cat '{kf}')\""
        log(f"starting {session} (backend=claude, phase={pid}, plan={plan}, model={LOOP_MODEL or 'default'})")
    elif BACKEND == "opencode":
        # `opencode attach` is the persistent TUI (stays alive in tmux, like the claude TUI).
        # We attach to the shared server, then send the kickoff message via tmux send-keys.
        # The server stores the session by title; NO_COLOR=1 skips the first-run theme picker.
        cmd = (
            f"set -a; . /srv/cc-ci/.testenv; set +a; "
            f"NO_COLOR=1 {OPENCODE_BIN} {model_flag} attach '{OPENCODE_SERVER}'"
        )
        log(f"starting {session} (backend=opencode, phase={pid}, model={LOOP_MODEL or 'default'})")
        log(" visible at http://oc.commoninternet.net (tailnet only)")
    else:
        die(f"unknown BACKEND '{BACKEND}' — set LOOP_BACKEND=claude or LOOP_BACKEND=opencode")

    subprocess.run(["tmux", "new-session", "-d", "-s", session, "-c", workdir, cmd])
    pipe_to_log(session, f"{LOG_DIR}/{session}.log")

    # opencode: send the kickoff prompt once the TUI is ready (give it a moment to connect).
    if BACKEND == "opencode":
        time.sleep(4)
        kickoff_text = kf.read_text().strip()
        ping_session(session, kickoff_text)


def start_loops():
    """Start the Builder and Adversary agents (each is left alone if already running)."""
    start_agent("builder", BUILDER_SESSION, BUILDER_DIR)
    start_agent("adversary", ADV_SESSION, ADV_DIR)


def stop_loops():
    """Kill the Builder and Adversary tmux sessions that are alive."""
    for s in (BUILDER_SESSION, ADV_SESSION):
        if session_alive(s):
            log(f"killing {s}")
            kill_session(s)


# ── session healing ───────────────────────────────────────────────────────────

def heal_session(role, session, workdir):
    """Restart a dead session; kill+restart a FATAL-wedged one; nudge a limit-stalled one."""
    if not session_alive(session):
        log(f"{role} ({session}) gone — restarting (phase {phase_id(cur_idx())})")
        start_agent(role, session, workdir)
        return

    pane = capture_pane(session, 25)
    if ACTIVE_RE.search(pane):
        return  # actively working — leave it alone

    if FATAL_RE.search(pane):
        log(f"FATAL session-state error on {role} ({session}) — kill + restart fresh")
        kill_session(session)
        start_agent(role, session, workdir)
        return

    if LIMIT_RE.search(pane):
        log(f"limit-stall on {role} ({session}) — nudging to resume")
        ping_session(
            session,
            "watchdog: the usage/spend limit appears lifted — RESUME your loop now. "
            "Pull latest, re-read your phase STATUS/REVIEW files, and continue from where you "
            "stopped; re-arm your loop pacing.")


# ── stall detection ───────────────────────────────────────────────────────────

# session name -> epoch seconds when it was first seen idle; 0.0 means "not idle".
_idle_since: dict[str, float] = {}


def _parse_waiting_until(pane):
    """Extract the epoch timestamp from the newest WAITING-UNTIL marker, or None.

    The LAST marker in *pane* is used: an older marker still visible higher up
    has already expired, and acting on it would reboot a session during a newer
    declared wait. Markdown or punctuation wrapped around the timestamp is
    stripped. A timestamp without a UTC offset is interpreted by
    datetime.timestamp() as local time.
    """
    stamps = re.findall(r"WAITING-UNTIL:\s*(\S+)", pane)
    if not stamps:
        return None
    ts = stamps[-1].strip("`*_'\"()[]<>.,;")
    try:
        # fromisoformat() does not accept a trailing "Z" on older Pythons.
        return datetime.fromisoformat(ts.replace("Z", "+00:00")).timestamp()
    except (ValueError, OverflowError, OSError):
        return None


def stall_check_one(role, session, workdir):
    """Kill and reboot *session* if it is idle too long or overslept its declared wait.

    With a WAITING-UNTIL marker, it reboots only once STALL_GRACE seconds have
    passed after the stated time. Without one, it reboots after STALL_IDLE
    seconds of idleness.
    """
    if not session_alive(session):
        _idle_since[session] = 0.0
        return

    now = time.time()
    pane = capture_pane(session, 40)

    if ACTIVE_RE.search(pane):
        _idle_since[session] = 0.0
        return

    # First idle observation starts the clock; later ones keep the original start.
    since = _idle_since.get(session) or now
    _idle_since[session] = since
    idle = now - since

    until = _parse_waiting_until(pane)
    if until is not None:
        # Declared wait: only reboot once STALL_GRACE seconds past the stated time.
        # Never reboot before — that races with the healthy self-wake.
        if now <= until + STALL_GRACE:
            return
        reason = f"past its WAITING-UNTIL by {int(now - until)}s — self-wake did not fire"
    else:
        if idle < STALL_IDLE:
            return
        reason = f"idle {int(idle)}s with no WAITING-UNTIL marker"

    log(f"stall: {role} ({session}) {reason} — kill + reboot")
    kill_session(session)
    start_agent(role, session, workdir)
    _idle_since[session] = 0.0


def stall_check():
    """Run the stall check for both loops."""
    stall_check_one("builder", BUILDER_SESSION, BUILDER_DIR)
    stall_check_one("adversary", ADV_SESSION, ADV_DIR)


# ── orchestrator healing ──────────────────────────────────────────────────────

def orchestrator_alive():
    """
    True if an orchestrator process is running anywhere.
    Conflict-safety: never launch a second orchestrator resuming the same session
    (double-resume causes "thinking blocks cannot be modified" crashes).

    Any `claude` process whose command line is not a --remote-control loop or
    upgrader session counts as an orchestrator.
    NOTE(review): with REMOTE_CONTROL disabled the loop agents carry no such
    flag and would be counted too — confirm whether that matters.
    """
    try:
        found = subprocess.run(
            ["pgrep", "-x", "claude"], capture_output=True, text=True).stdout
    except OSError:
        found = ""  # pgrep unavailable — fall back to the tmux check below
    for line in found.splitlines():
        pid = line.strip()
        if not pid:
            continue
        try:
            cmdline = Path(f"/proc/{pid}/cmdline").read_bytes().decode(
                errors="replace").replace("\0", " ")
        except OSError:
            continue  # process exited between pgrep and the read
        # Skip the loop sessions and the upgrader — they're not the orchestrator.
        if re.search(r"--remote-control\s+'?cc-ci-(builder|adv|upgrader)'?", cmdline):
            continue
        return True
    return session_alive(ORCH_SESSION)


def heal_orchestrator():
    """Restart the orchestrator via ORCH_LAUNCHER when it is gone or FATAL-wedged.

    Does nothing when WATCH_ORCHESTRATOR is off or the launcher script is
    missing. A running orchestrator is restarted only if its tmux pane is idle
    and shows a FATAL session-state error.
    """
    if not WATCH_ORCHESTRATOR:
        return
    if not Path(ORCH_LAUNCHER).is_file():
        return

    if orchestrator_alive():
        if session_alive(ORCH_SESSION):
            pane = capture_pane(ORCH_SESSION, 25)
            if ACTIVE_RE.search(pane):
                return
            if FATAL_RE.search(pane):
                log(f"FATAL session-state error on orchestrator ({ORCH_SESSION}) — kill + restart")
                kill_session(ORCH_SESSION)
                subprocess.run([ORCH_LAUNCHER, "start"], capture_output=True)
        # Alive (possibly outside tmux): never start a second orchestrator.
        return

    log(f"orchestrator not running — restarting via {ORCH_LAUNCHER}")
    subprocess.run([ORCH_LAUNCHER, "start"], capture_output=True)


# ── handoff signalling ────────────────────────────────────────────────────────

_last_sha = ""            # last origin/main commit already examined
_adv_inbox_seen = ""      # digest of the ADVERSARY-INBOX.md content last pinged about
_builder_inbox_seen = ""  # digest of the BUILDER-INBOX.md content last pinged about


def handoff_reset():
    """Forget all handoff state so the next check re-baselines silently."""
    global _last_sha, _adv_inbox_seen, _builder_inbox_seen
    _last_sha = _adv_inbox_seen = _builder_inbox_seen = ""


def _git(*args):
    """Run git in the Builder clone with an argv list (no shell) and capture text output.

    Stays best-effort: if git cannot be executed, a failed CompletedProcess
    (returncode 127, empty stdout) is returned instead of raising.
    """
    argv = ["git", "-C", BUILDER_DIR, *args]
    try:
        return subprocess.run(argv, capture_output=True, text=True)
    except OSError as err:
        return subprocess.CompletedProcess(argv, 127, "", str(err))


def _fetch_origin():
    """Quietly fetch origin in the Builder clone; failures are ignored."""
    _git("fetch", "-q", "origin")


def _show_pushed(path):
    """Read a file from origin/main (machine-docs/ first, then repo root); "" if absent."""
    for loc in (f"origin/main:machine-docs/{path}", f"origin/main:{path}"):
        r = _git("show", loc)
        if r.returncode == 0:
            return r.stdout
    return ""


def handoff_check():
    """Ping the peer loop when new claim/review commits or inbox files appear on origin/main.

    Commit subjects starting with "claim" ping the Adversary, those starting
    with "review" ping the Builder. A changed ADVERSARY-INBOX.md or
    BUILDER-INBOX.md pings its addressee once per distinct content.
    """
    global _last_sha, _adv_inbox_seen, _builder_inbox_seen
    _fetch_origin()

    # --verify --quiet: on failure git prints nothing, so an unresolved ref name
    # can never be taken for a commit id.
    r = _git("rev-parse", "--verify", "--quiet", "origin/main")
    head = r.stdout.strip() if r.returncode == 0 else ""
    if head:
        if not _last_sha:
            _last_sha = head  # baseline silently on first tick
        elif head != _last_sha:
            subjects = _git("log", "--format=%s", f"{_last_sha}..{head}").stdout
            if re.search(r"^claim", subjects, re.MULTILINE | re.IGNORECASE):
                log("handoff: new claim(...) commit → pinging Adversary")
                ping_session(
                    ADV_SESSION,
                    "watchdog ping: the Builder pushed a gate CLAIM (claim(...) commit). "
                    "Pull and verify the claimed gate now.")
            if re.search(r"^review", subjects, re.MULTILINE | re.IGNORECASE):
                log("handoff: new review(...) commit → pinging Builder")
                ping_session(
                    BUILDER_SESSION,
                    "watchdog ping: the Adversary pushed a verdict/finding (review(...) commit). "
                    "Pull REVIEW and act — proceed if it PASSes your gate, address it if it's a finding.")
            _last_sha = head

    adv_inbox = _show_pushed("ADVERSARY-INBOX.md")
    builder_inbox = _show_pushed("BUILDER-INBOX.md")

    def md5(s):
        # Change detection only, not security.
        return hashlib.md5(s.encode()).hexdigest()

    if adv_inbox:
        h = md5(adv_inbox)
        if h != _adv_inbox_seen:
            log("handoff: ADVERSARY-INBOX.md changed → pinging Adversary")
            ping_session(
                ADV_SESSION,
                "watchdog ping: the Builder pushed machine-docs/ADVERSARY-INBOX.md — "
                "pull, read it, act, then delete the file (commit + push) to mark it consumed.")
            _adv_inbox_seen = h
    else:
        _adv_inbox_seen = ""  # file consumed — the next one pings again

    if builder_inbox:
        h = md5(builder_inbox)
        if h != _builder_inbox_seen:
            log("handoff: BUILDER-INBOX.md changed → pinging Builder")
            ping_session(
                BUILDER_SESSION,
                "watchdog ping: the Adversary pushed machine-docs/BUILDER-INBOX.md — "
                "pull, read it, act, then delete the file (commit + push) to mark it consumed.")
            _builder_inbox_seen = h
    else:
        _builder_inbox_seen = ""  # file consumed — the next one pings again


# ── watchdog loop ─────────────────────────────────────────────────────────────

def watchdog_loop():
    """Run the watchdog until the last phase is DONE.

    Every SIGNAL_INTERVAL seconds it runs the handoff and stall checks. Every
    WATCH_INTERVAL seconds it also either advances to the next phase (current
    phase DONE) or heals the loop sessions, then heals the orchestrator. After
    the last phase it stops the loops, writes LOG_DIR/SEQUENCE-COMPLETE and
    returns.
    """
    idx = cur_idx()
    log(f"watchdog up — phase={phase_id(idx)} [{idx+1}/{len(PHASES)}] "
        f"seq='{all_ids()}' signal={SIGNAL_INTERVAL}s heavy={WATCH_INTERVAL}s")

    elapsed = WATCH_INTERVAL  # force a heavy check on the first tick
    while True:
        handoff_check()
        stall_check()

        if elapsed >= WATCH_INTERVAL:
            elapsed = 0
            idx = cur_idx()
            pid = phase_id(idx)
            status = phase_status(idx)

            if phase_done(status):
                next_idx = idx + 1
                if next_idx < len(PHASES):
                    log(f"PHASE {pid} DONE — auto-transitioning to {phase_id(next_idx)}")
                    stop_loops()
                    Path(PHASE_IDX_FILE).write_text(str(next_idx))
                    handoff_reset()
                    start_loops()
                else:
                    log(f"PHASE SEQUENCE COMPLETE (last phase {pid} DONE) — stopping loops")
                    stop_loops()
                    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                    Path(LOG_DIR, "SEQUENCE-COMPLETE").write_text(
                        f"cc-ci phase sequence complete {ts}. Phases: {all_ids()}. "
                        f"Loops stopped; entire build finished.\n")
                    log("watchdog exiting.")
                    return
            else:
                heal_session("builder", BUILDER_SESSION, BUILDER_DIR)
                heal_session("adversary", ADV_SESSION, ADV_DIR)

            heal_orchestrator()

        time.sleep(SIGNAL_INTERVAL)
        elapsed += SIGNAL_INTERVAL


def start_watchdog():
    """Start the watchdog in its own detached tmux session, unless one already runs.

    The session re-invokes this script with the `watchdog` command and appends
    its output to LOG_DIR/watchdog.log.
    """
    if session_alive(WATCHDOG_SESSION):
        log("watchdog already running")
        return
    log("starting watchdog")
    script = Path(__file__).resolve()
    subprocess.run([
        "tmux", "new-session", "-d", "-s", WATCHDOG_SESSION, "-c", PLAN_DIR,
        f"exec >>'{LOG_DIR}/watchdog.log' 2>&1; python3 '{script}' watchdog"
    ])


# ── preflight ─────────────────────────────────────────────────────────────────

def preflight():
    """Check tmux, the backend binary, the phase plans and the role prompts; create LOG_DIR.

    Exits via die() on the first missing item or on an unknown backend.
    """
    import shutil

    if not shutil.which("tmux"):
        die("tmux not found")

    if BACKEND == "claude":
        if not shutil.which(CLAUDE_BIN):
            die(f"claude CLI not found — set CLAUDE_BIN (currently: {CLAUDE_BIN})")
    elif BACKEND == "opencode":
        if not Path(OPENCODE_BIN).exists():
            die(f"opencode not found at {OPENCODE_BIN}")
    else:
        die(f"unknown LOOP_BACKEND '{BACKEND}' — use 'claude' or 'opencode'")

    for phase in PHASES:
        plan = Path(PLAN_DIR) / phase[1]
        if not plan.exists():
            die(f"missing phase plan: {plan}")

    for prompt_file in ("builder.md", "adversary.md"):
        if not (Path(PLAN_DIR) / "prompts" / prompt_file).exists():
            die(f"missing {PLAN_DIR}/prompts/{prompt_file}")

    Path(LOG_DIR).mkdir(parents=True, exist_ok=True)


# ── status ────────────────────────────────────────────────────────────────────

def cmd_status():
    """Print the current phase, each session's state, and any SEQUENCE-COMPLETE note."""
    idx = cur_idx()
    pid = phase_id(idx)
    print(f" phase: {pid} [{idx+1}/{len(PHASES)}] plan={phase_plan(idx)} status={phase_status(idx)}")
    for s in (BUILDER_SESSION, ADV_SESSION, WATCHDOG_SESSION):
        state = "RUNNING" if session_alive(s) else "stopped"
        print(f" {s}: {state}")
    done_str = "## DONE" if phase_done(phase_status(idx)) else "in progress"
    print(f" phase {pid}: {done_str}")
    seq = Path(LOG_DIR) / "SEQUENCE-COMPLETE"
    if seq.exists():
        print(f" >>> {seq.read_text().strip()}")


# ── main ──────────────────────────────────────────────────────────────────────

def main():
    """Dispatch on the first CLI argument; print usage for anything unrecognised."""
    cmd = sys.argv[1] if len(sys.argv) > 1 else ""

    if cmd == "start":
        preflight()
        stop_loops()
        if os.environ.get("RESUME_PHASE") != "1":
            Path(PHASE_IDX_FILE).write_text("0")
        # A fresh start invalidates any earlier completion marker.
        (Path(LOG_DIR) / "SEQUENCE-COMPLETE").unlink(missing_ok=True)
        start_loops()
        start_watchdog()
        log(f"started at phase {phase_id(cur_idx())}.")

    elif cmd == "watchdog":
        preflight()
        watchdog_loop()

    elif cmd == "status":
        cmd_status()

    elif cmd == "stop":
        stop_loops()
        if session_alive(WATCHDOG_SESSION):
            log(f"killing {WATCHDOG_SESSION}")
            kill_session(WATCHDOG_SESSION)
        log("stopped.")

    elif cmd == "logs":
        sub = sys.argv[2] if len(sys.argv) > 2 else ""
        log_files = {
            "builder": f"{LOG_DIR}/{BUILDER_SESSION}.log",
            "adversary": f"{LOG_DIR}/{ADV_SESSION}.log",
            "watchdog": f"{LOG_DIR}/watchdog.log",
        }
        if sub not in log_files:
            die("usage: launch.py logs builder|adversary|watchdog")
        # Replace this process with tail so Ctrl-C goes straight to it.
        os.execvp("tail", ["tail", "-f", log_files[sub]])

    else:
        print(f"""cc-ci loop launcher (phase-aware)

  launch.py start                            start loops + watchdog (RESUME_PHASE=1 to keep current phase)
  launch.py stop                             stop loops + watchdog
  launch.py status                           show phase + session state
  launch.py logs builder|adversary|watchdog  tail a log
  launch.py watchdog                         run watchdog in foreground

Backend: {BACKEND}  Model: {LOOP_MODEL or 'default'}
Phase sequence ({len(PHASES)} phases, auto-advance on ## DONE, stop after last):
  {all_ids()}
""")


if __name__ == "__main__":
    main()