|
|
|
|
@ -33,8 +33,8 @@ Env (all optional — defaults shown):
|
|
|
|
|
STALL_GRACE 180 (seconds past a WAITING-UNTIL before reboot)
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
import hashlib, os, re, subprocess, sys, time
|
|
|
|
|
from datetime import datetime, timezone
|
|
|
|
|
import hashlib, json, os, re, subprocess, sys, time
|
|
|
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
|
from pathlib import Path
|
|
|
|
|
|
|
|
|
|
# ── config ────────────────────────────────────────────────────────────────────
|
|
|
|
|
@ -79,7 +79,7 @@ else:
|
|
|
|
|
OPENCODE_BIN = os.environ.get("OPENCODE_BIN", "/home/loops/.local/bin/opencode")
|
|
|
|
|
OPENCODE_SERVER = os.environ.get("OPENCODE_SERVER", "http://127.0.0.1:4096")
|
|
|
|
|
|
|
|
|
|
ORCH_SESSION = os.environ.get("ORCH_SESSION", "cc-ci-orchestrator-vm")
|
|
|
|
|
ORCH_SESSION = os.environ.get("ORCH_SESSION", "cc-ci-orchestrator")
|
|
|
|
|
ORCH_LAUNCHER = os.environ.get("ORCH_LAUNCHER", f"{PLAN_DIR}/launch-orchestrator.sh")
|
|
|
|
|
WATCH_ORCHESTRATOR = os.environ.get("WATCH_ORCHESTRATOR", "1") == "1"
|
|
|
|
|
|
|
|
|
|
@ -301,10 +301,148 @@ def stop_loops():
|
|
|
|
|
log(f"killing {s}")
|
|
|
|
|
kill_session(s)
|
|
|
|
|
|
|
|
|
|
# ── usage-limit handling ──────────────────────────────────────────────────────
|
|
|
|
|
#
|
|
|
|
|
# When a session hits the usage/spend limit it prints a banner ("…limit reached ∙
|
|
|
|
|
# resets 3am") and sits idle. The old behaviour either blind-nudged every heavy tick
|
|
|
|
|
# (claude) or let the generic idle-stall reboot it — losing the banner, re-hitting
|
|
|
|
|
# the limit fresh, and churning. The state machine below instead:
|
|
|
|
|
# 1. parses the reset time from the banner and arms a quiet window until reset+45s
|
|
|
|
|
# (parse failure → flat 5-minute probe loop, never exponential backoff);
|
|
|
|
|
# 2. suppresses ALL kill/reboot healing while the window is armed — a limit-stalled
|
|
|
|
|
# session is NEVER rebooted;
|
|
|
|
|
# 3. at the window's end sends ONE nudge as a self-verifying probe: if the limit
|
|
|
|
|
# really lifted the session resumes (spinner clears the state next tick); if
|
|
|
|
|
# Claude Code re-prints the banner we re-parse and re-arm.
|
|
|
|
|
# State is persisted per session in LOG_DIR so a watchdog restart keeps the window.
|
|
|
|
|
|
|
|
|
|
LIMIT_PROBE_FALLBACK = int(os.environ.get("LIMIT_PROBE_FALLBACK", 300)) # flat probe cadence
|
|
|
|
|
LIMIT_RESET_SLACK = int(os.environ.get("LIMIT_RESET_SLACK", 45)) # s past parsed reset
|
|
|
|
|
|
|
|
|
|
RESET_RE = re.compile(r"resets?\s*(?:at\s*)?(\d{1,2})(?::(\d{2}))?\s*(am|pm)?", re.I)
|
|
|
|
|
|
|
|
|
|
def _limit_state_path(session):
|
|
|
|
|
return Path(LOG_DIR) / f".limited-{session}"
|
|
|
|
|
|
|
|
|
|
def _load_limit_state(session):
|
|
|
|
|
try:
|
|
|
|
|
return json.loads(_limit_state_path(session).read_text())
|
|
|
|
|
except Exception:
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
def _save_limit_state(session, state):
|
|
|
|
|
_limit_state_path(session).write_text(json.dumps(state))
|
|
|
|
|
|
|
|
|
|
def _clear_limit_state(session):
|
|
|
|
|
try:
|
|
|
|
|
_limit_state_path(session).unlink()
|
|
|
|
|
except FileNotFoundError:
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
def _parse_reset_epoch(pane):
|
|
|
|
|
"""Parse the reset time-of-day from a limit banner (last match = freshest banner).
|
|
|
|
|
Returns an epoch in local time, rolled to tomorrow if the time already passed today,
|
|
|
|
|
or None when unparsable."""
|
|
|
|
|
matches = list(RESET_RE.finditer(pane))
|
|
|
|
|
if not matches:
|
|
|
|
|
return None
|
|
|
|
|
m = matches[-1]
|
|
|
|
|
try:
|
|
|
|
|
hour, minute = int(m.group(1)), int(m.group(2) or 0)
|
|
|
|
|
ampm = (m.group(3) or "").lower()
|
|
|
|
|
if ampm == "pm" and hour != 12:
|
|
|
|
|
hour += 12
|
|
|
|
|
elif ampm == "am" and hour == 12:
|
|
|
|
|
hour = 0
|
|
|
|
|
if hour > 23 or minute > 59:
|
|
|
|
|
return None
|
|
|
|
|
cand = datetime.now().replace(hour=hour, minute=minute, second=0, microsecond=0)
|
|
|
|
|
if cand.timestamp() <= time.time():
|
|
|
|
|
cand += timedelta(days=1)
|
|
|
|
|
return cand.timestamp()
|
|
|
|
|
except Exception:
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
def _next_limit_until(pane, now):
|
|
|
|
|
"""When should we next probe? Parsed reset + slack when the banner is parsable and
|
|
|
|
|
plausibly fresh; else flat LIMIT_PROBE_FALLBACK. Time-of-day-only banners come from
|
|
|
|
|
the 5-hour limit window, so a fresh reset is always ≤ ~5h away — a parse landing
|
|
|
|
|
further out means the stated time already passed (stale banner rolled to tomorrow):
|
|
|
|
|
probe soon instead of waiting most of a day. Longer (weekly) limits print a date,
|
|
|
|
|
which doesn't parse → flat probe loop by design."""
|
|
|
|
|
parsed = _parse_reset_epoch(pane)
|
|
|
|
|
if parsed is not None and parsed - now <= 6 * 3600:
|
|
|
|
|
return parsed + LIMIT_RESET_SLACK, True
|
|
|
|
|
return now + LIMIT_PROBE_FALLBACK, False
|
|
|
|
|
|
|
|
|
|
def _limit_nudge_msg(role):
|
|
|
|
|
if role == "orchestrator":
|
|
|
|
|
return ("watchdog limit-probe: if the usage limit has reset, RESUME now — "
|
|
|
|
|
"you are the cc-ci orchestrator; re-check loop status and continue "
|
|
|
|
|
"supervising from where you stopped.")
|
|
|
|
|
return ("watchdog limit-probe: if the usage limit has reset, RESUME your loop now — "
|
|
|
|
|
"pull latest, re-read your phase STATUS/REVIEW files, and continue from "
|
|
|
|
|
"where you stopped; re-arm your loop pacing.")
|
|
|
|
|
|
|
|
|
|
def limit_tick(role, session, pane):
|
|
|
|
|
"""Unified limit state machine for both backends and the orchestrator.
|
|
|
|
|
Returns True while the session is inside a limit window — callers must then
|
|
|
|
|
suppress idle accounting, reboots, and other healing."""
|
|
|
|
|
state = _load_limit_state(session)
|
|
|
|
|
limited_now = bool(LIMIT_RE.search(pane))
|
|
|
|
|
|
|
|
|
|
if state is None:
|
|
|
|
|
if not limited_now or ACTIVE_RE.search(pane):
|
|
|
|
|
return False
|
|
|
|
|
now = time.time()
|
|
|
|
|
until, parsed = _next_limit_until(pane, now)
|
|
|
|
|
if parsed:
|
|
|
|
|
log(f"limit hit on {role} ({session}) — banner says reset "
|
|
|
|
|
f"{datetime.fromtimestamp(until):%a %H:%M}; holding until then (no reboots)")
|
|
|
|
|
else:
|
|
|
|
|
log(f"limit hit on {role} ({session}) — reset time unparsable; flat "
|
|
|
|
|
f"{LIMIT_PROBE_FALLBACK // 60}-min probe loop (no reboots)")
|
|
|
|
|
_save_limit_state(session, {"until": until, "nudges": 0})
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
# Armed window: a spinner or a vanished banner means the limit lifted.
|
|
|
|
|
if ACTIVE_RE.search(pane) or not limited_now:
|
|
|
|
|
log(f"limit lifted on {role} ({session}) — clearing limit state")
|
|
|
|
|
_clear_limit_state(session)
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
now = time.time()
|
|
|
|
|
if now < state.get("until", 0):
|
|
|
|
|
return True # quietly inside the window
|
|
|
|
|
|
|
|
|
|
# Window elapsed, banner still showing → probe. Dedupe: never stack a second
|
|
|
|
|
# probe while our own text is still visible (typed-but-unsubmitted, or echoed).
|
|
|
|
|
msg = _limit_nudge_msg(role)
|
|
|
|
|
if msg[:28] in pane:
|
|
|
|
|
return True
|
|
|
|
|
nudges = state.get("nudges", 0) + 1
|
|
|
|
|
log(f"limit probe #{nudges} on {role} ({session}) — nudging to resume")
|
|
|
|
|
ping_session(session, msg, submit_key=_SUBMIT)
|
|
|
|
|
|
|
|
|
|
# Self-verify: re-read the pane. A spinner with no banner = resumed; a re-printed
|
|
|
|
|
# banner = still limited → re-arm from the fresh banner (else flat fallback).
|
|
|
|
|
time.sleep(3)
|
|
|
|
|
pane2 = capture_pane(session, 40)
|
|
|
|
|
if ACTIVE_RE.search(pane2) and not LIMIT_RE.search(pane2):
|
|
|
|
|
log(f"limit lifted on {role} ({session}) — probe resumed it")
|
|
|
|
|
_clear_limit_state(session)
|
|
|
|
|
return False
|
|
|
|
|
until, _ = _next_limit_until(pane2, now)
|
|
|
|
|
if nudges == 3:
|
|
|
|
|
log(f"WARNING: {role} ({session}) still limited after {nudges} probes — "
|
|
|
|
|
f"continuing flat {LIMIT_PROBE_FALLBACK // 60}-min probes; never rebooting")
|
|
|
|
|
_save_limit_state(session, {"until": until, "nudges": nudges})
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
# ── session healing ───────────────────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
def heal_session(role, session, workdir):
|
|
|
|
|
"""Restart a dead session; kill+restart a FATAL-wedged one; nudge a limit-stalled one."""
|
|
|
|
|
"""Restart a dead session; kill+restart a FATAL-wedged one; never touch a limit-stalled one."""
|
|
|
|
|
if not session_alive(session):
|
|
|
|
|
log(f"{role} ({session}) gone — restarting (phase {phase_id(cur_idx())})")
|
|
|
|
|
start_agent(role, session, workdir)
|
|
|
|
|
@ -314,43 +452,18 @@ def heal_session(role, session, workdir):
|
|
|
|
|
if ACTIVE_RE.search(pane):
|
|
|
|
|
return # actively working — leave it alone
|
|
|
|
|
|
|
|
|
|
if limit_tick(role, session, pane):
|
|
|
|
|
return # inside a usage-limit window — limit_tick owns nudging; NEVER kill
|
|
|
|
|
|
|
|
|
|
if FATAL_RE.search(pane):
|
|
|
|
|
log(f"FATAL session-state error on {role} ({session}) — kill + restart fresh")
|
|
|
|
|
kill_session(session)
|
|
|
|
|
start_agent(role, session, workdir)
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
if BACKEND != "opencode" and LIMIT_RE.search(pane):
|
|
|
|
|
log(f"limit-stall on {role} ({session}) — nudging to resume")
|
|
|
|
|
ping_session(session,
|
|
|
|
|
"watchdog: the usage/spend limit appears lifted — RESUME your loop now. "
|
|
|
|
|
"Pull latest, re-read your phase STATUS/REVIEW files, and continue from where you "
|
|
|
|
|
"stopped; re-arm your loop pacing.", submit_key=_SUBMIT)
|
|
|
|
|
|
|
|
|
|
# ── stall detection ───────────────────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
_idle_since: dict[str, float] = {}
|
|
|
|
|
_limit_nudged_at: dict[str, float] = {}
|
|
|
|
|
|
|
|
|
|
def _maybe_nudge_limit(role, session, pane):
|
|
|
|
|
if not LIMIT_RE.search(pane):
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
now = time.time()
|
|
|
|
|
last = _limit_nudged_at.get(session, 0.0)
|
|
|
|
|
if now - last < 300:
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
_limit_nudged_at[session] = now
|
|
|
|
|
log(f"limit-stall on {role} ({session}) — nudging to resume")
|
|
|
|
|
ping_session(
|
|
|
|
|
session,
|
|
|
|
|
"watchdog: the usage/spend limit appears lifted or is about to reset. "
|
|
|
|
|
"RESUME your loop now. Pull latest, re-read your phase STATUS/REVIEW files, "
|
|
|
|
|
"and continue from where you stopped; re-arm your loop pacing.",
|
|
|
|
|
submit_key=_SUBMIT,
|
|
|
|
|
)
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
def _parse_waiting_until(pane):
|
|
|
|
|
"""Extract the epoch timestamp from a WAITING-UNTIL marker, or None."""
|
|
|
|
|
@ -373,14 +486,16 @@ def _parse_waiting_until(pane):
|
|
|
|
|
def stall_check_one(role, session, workdir):
|
|
|
|
|
if not session_alive(session):
|
|
|
|
|
_idle_since[session] = 0.0
|
|
|
|
|
_limit_nudged_at[session] = 0.0
|
|
|
|
|
_clear_limit_state(session)
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
now = time.time()
|
|
|
|
|
pane = capture_pane(session, 40)
|
|
|
|
|
|
|
|
|
|
if BACKEND == "opencode" and _maybe_nudge_limit(role, session, pane):
|
|
|
|
|
_idle_since[session] = now
|
|
|
|
|
# Limit windows (both backends): suppress idle accounting and reboots entirely —
|
|
|
|
|
# a limit-stalled session must never reach the kill+reboot below.
|
|
|
|
|
if limit_tick(role, session, pane):
|
|
|
|
|
_idle_since[session] = 0.0
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
if ACTIVE_RE.search(pane) or (BACKEND == "opencode" and (
|
|
|
|
|
@ -449,6 +564,8 @@ def heal_orchestrator():
|
|
|
|
|
pane = capture_pane(ORCH_SESSION, 25)
|
|
|
|
|
if ACTIVE_RE.search(pane):
|
|
|
|
|
return
|
|
|
|
|
if limit_tick("orchestrator", ORCH_SESSION, pane):
|
|
|
|
|
return # limit window — never kill/restart the orchestrator over a limit
|
|
|
|
|
if FATAL_RE.search(pane):
|
|
|
|
|
log(f"FATAL session-state error on orchestrator ({ORCH_SESSION}) — kill + restart")
|
|
|
|
|
kill_session(ORCH_SESSION)
|
|
|
|
|
@ -470,6 +587,8 @@ def wake_orchestrator():
|
|
|
|
|
return True # feature off — treat as handled so the timer doesn't spin
|
|
|
|
|
if not session_alive(ORCH_SESSION):
|
|
|
|
|
return False
|
|
|
|
|
if _load_limit_state(ORCH_SESSION) is not None:
|
|
|
|
|
return False # in a usage-limit window — defer the wake until the limit clears
|
|
|
|
|
if ACTIVE_RE.search(capture_pane(ORCH_SESSION, 25)):
|
|
|
|
|
return False # busy — don't interrupt; retry when idle
|
|
|
|
|
try:
|
|
|
|
|
@ -483,6 +602,14 @@ def wake_orchestrator():
|
|
|
|
|
ping_session(ORCH_SESSION, msg, submit_key=_SUBMIT)
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
def orch_limit_check():
|
|
|
|
|
"""Signal-tick limit care for the orchestrator: heal_orchestrator only runs on
|
|
|
|
|
heavy ticks, so this keeps the orchestrator's limit-resume latency ≤ SIGNAL_INTERVAL —
|
|
|
|
|
the same best treatment the loops get."""
|
|
|
|
|
if not WATCH_ORCHESTRATOR or not session_alive(ORCH_SESSION):
|
|
|
|
|
return
|
|
|
|
|
limit_tick("orchestrator", ORCH_SESSION, capture_pane(ORCH_SESSION, 40))
|
|
|
|
|
|
|
|
|
|
# ── handoff signalling ────────────────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
_last_sha = ""
|
|
|
|
|
@ -578,6 +705,7 @@ def watchdog_loop():
|
|
|
|
|
while True:
|
|
|
|
|
handoff_check()
|
|
|
|
|
stall_check()
|
|
|
|
|
orch_limit_check()
|
|
|
|
|
|
|
|
|
|
if wake_elapsed >= ORCH_WAKE_INTERVAL:
|
|
|
|
|
# Reset only once the wake actually lands; if the orchestrator is busy/absent,
|
|
|
|
|
|