feat: configurable per-gate token logging + responsive phase auto-advance
Two watchdog/metrics improvements to the loop machine: 1) Token-logging granularity is configurable via [watchdog].token_granularity: 'gate' (default) or 'phase'. In 'gate' mode, tokens are attributed to each claimed gate -- any 'claim(<label>)' commit on the work repo's origin/main (e.g. claim(D1-D5), claim(feat:multi-file); a leading 'feat:' is stripped) -- in addition to the per-phase rollup, appended to token-log.jsonl tagged phase_id='<phase>:<label>'. A change in the most-recently-claimed label is the boundary; the in-flight gate is also flushed when the phase ends. 'phase' mode keeps the original per-phase-only behaviour. 2) Phase auto-advance is now evaluated on EVERY signal tick instead of only the heavy tick, so a completed phase advances within signal_interval of its '## DONE' landing rather than idling up to heavy_interval. Healing stays on the heavy cadence. Note: gate-boundary detection assumes the loop's 'claim(<label>)' commit convention. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01UWTdUq2bsic7JZGqJp3nD6
This commit is contained in:
97
agents.py
97
agents.py
@ -757,6 +757,93 @@ def token_phase_flush(cfg, next_phase_id):
|
||||
fh.write(json.dumps(rec) + "\n")
|
||||
parts = ", ".join(f"{n}={per_agent[n]['total']:,}" for n in per_agent)
|
||||
log(f"[log_tokens] phase {rec['phase_id']}: {total['total']:,} tok in {dur}s ({parts})")
|
||||
|
||||
# ── token logging granularity: per phase, or also per GATE (log_tokens) ────────────
|
||||
# Phases are tracked per phase (token_phase_begin/flush). With token_granularity="gate" (the default)
|
||||
# tokens are ALSO attributed to each gate. A "gate" is a claimed unit — any `claim(<label>)` commit on
|
||||
# the work repo's origin/main (e.g. claim(D1-D5), claim(feat:multi-file)); a leading "feat:" is
|
||||
# stripped for readability. A change in the most-recently-claimed label is a boundary; on each
|
||||
# boundary the previous gate's per-agent token delta + duration is appended to token-log.jsonl tagged
|
||||
# phase_id="<phase>:<label>", so `agents.py tokens` lists it as its own row. The per-phase rollup
|
||||
# record is written either way; "phase" granularity logs only that.
|
||||
_gate_claim_re = re.compile(r"^claim\(\s*([^)]+?)\s*\)", re.I)
|
||||
|
||||
def token_granularity(cfg):
|
||||
"""'gate' (default; per claimed gate, plus the per-phase rollup) or 'phase' (per phase only)."""
|
||||
g = (cfg.get("watchdog", {}).get("token_granularity")
|
||||
or cfg.get("loop", {}).get("token_granularity") or "gate")
|
||||
return g if g in ("gate", "phase") else "gate"
|
||||
|
||||
def _token_gate_state_path(cfg):
|
||||
return Path(cfg["state_dir"]) / "token-gate.json"
|
||||
|
||||
def _latest_claimed_gate(cfg):
|
||||
"""Label of the most-recent `claim(<label>)` subject on the work repo's origin/main, or None.
|
||||
A leading 'feat:' is stripped so feature gates read as their bare name."""
|
||||
r = _git(handoff_repo(cfg), "log -1 --format=%s --grep 'claim(' origin/main")
|
||||
m = _gate_claim_re.match((r.stdout or "").strip())
|
||||
if not m:
|
||||
return None
|
||||
label = m.group(1).strip()
|
||||
return label[5:].strip() if label.lower().startswith("feat:") else label
|
||||
|
||||
def _write_token_delta(cfg, phase_id, st):
|
||||
"""Append one per-agent token delta record (vs the baseline in st) to token-log.jsonl."""
|
||||
cur = _token_cumulative(cfg)
|
||||
base = st.get("baseline", {})
|
||||
started = st.get("started")
|
||||
try:
|
||||
dur = round((datetime.now() - datetime.fromisoformat(started)).total_seconds(), 1)
|
||||
except Exception:
|
||||
dur = None
|
||||
per_agent = {n: _tok_delta(cur.get(n, {}), base.get(n, {})) for n in cur}
|
||||
total = {k: sum(per_agent[n][k] for n in per_agent) for k in _TOKEN_KEYS}
|
||||
rec = {"phase_id": phase_id, "started": started,
|
||||
"ended": datetime.now().isoformat(timespec="seconds"), "duration_s": dur,
|
||||
"agents": per_agent, "total": total}
|
||||
with _token_log_path(cfg).open("a") as fh:
|
||||
fh.write(json.dumps(rec) + "\n")
|
||||
return rec, per_agent, total
|
||||
|
||||
def gate_token_flush(cfg):
|
||||
"""Close out the currently-tracked gate (if any): write its token delta, then drop the state."""
|
||||
sf = _token_gate_state_path(cfg)
|
||||
try:
|
||||
st = json.loads(sf.read_text())
|
||||
except Exception:
|
||||
return
|
||||
gate = st.get("gate")
|
||||
if not gate:
|
||||
return
|
||||
phase_id = f"{st['phase']}:{gate}" if st.get("phase") else gate
|
||||
rec, per_agent, total = _write_token_delta(cfg, phase_id, st)
|
||||
parts = ", ".join(f"{n}={per_agent[n]['total']:,}" for n in per_agent)
|
||||
log(f"[log_tokens] gate {phase_id}: {total['total']:,} tok in {rec['duration_s']}s ({parts})")
|
||||
try:
|
||||
sf.unlink()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def gate_token_check(cfg):
|
||||
"""When token_granularity=='gate', detect gate boundaries and flush per-gate token deltas."""
|
||||
if not log_tokens_enabled(cfg) or token_granularity(cfg) != "gate":
|
||||
return
|
||||
current = _latest_claimed_gate(cfg)
|
||||
if not current:
|
||||
return
|
||||
sf = _token_gate_state_path(cfg)
|
||||
try:
|
||||
tracked = json.loads(sf.read_text()).get("gate")
|
||||
except Exception:
|
||||
tracked = None
|
||||
if tracked == current:
|
||||
return
|
||||
if tracked:
|
||||
gate_token_flush(cfg) # close out the previous gate before starting the next
|
||||
sf.write_text(json.dumps({"gate": current, "phase": cur_phase(cfg).get("id"),
|
||||
"started": datetime.now().isoformat(timespec="seconds"),
|
||||
"baseline": _token_cumulative(cfg)}))
|
||||
log(f"[log_tokens] tracking gate: {current}")
|
||||
if next_phase_id is not None:
|
||||
sf.write_text(json.dumps({"phase_id": next_phase_id,
|
||||
"started": datetime.now().isoformat(timespec="seconds"),
|
||||
@ -779,6 +866,8 @@ def phase_advance_check(cfg):
|
||||
ph = ps[idx]
|
||||
if not phase_done(cfg, ph["status"]):
|
||||
return False
|
||||
if log_tokens_enabled(cfg) and token_granularity(cfg) == "gate":
|
||||
gate_token_flush(cfg) # close out the last in-flight gate before leaving the phase
|
||||
nxt = idx + 1
|
||||
if nxt < len(ps):
|
||||
log(f"PHASE {ph['id']} DONE — auto-transitioning to {ps[nxt]['id']}")
|
||||
@ -826,7 +915,7 @@ def watchdog_loop(cfg_path):
|
||||
wake_elapsed = {a["name"]: 0 for a in cfg["agents"].values() if a.get("wake")}
|
||||
if log_tokens_enabled(cfg):
|
||||
token_phase_begin(cfg, cur_phase(cfg).get("id"))
|
||||
log("[log_tokens] enabled — per-phase token + time logging to token-log.jsonl")
|
||||
log(f"[log_tokens] enabled (granularity={token_granularity(cfg)}) — token-log.jsonl")
|
||||
while True:
|
||||
cfg = load_config(cfg_path) # re-read every tick: config is authoritative, no env drift
|
||||
has_loops = bool(loop_agents(cfg))
|
||||
@ -834,6 +923,7 @@ def watchdog_loop(cfg_path):
|
||||
|
||||
if has_loops:
|
||||
handoff_check(cfg)
|
||||
gate_token_check(cfg)
|
||||
for a in watched(cfg):
|
||||
if a["watch"] == "heal+stall":
|
||||
stall_check_one(cfg, a)
|
||||
@ -848,9 +938,12 @@ def watchdog_loop(cfg_path):
|
||||
if wake_agent(cfg, cfg["agents"][name]):
|
||||
wake_elapsed[name] = 0
|
||||
|
||||
# Auto-advance is checked EVERY tick (not just the heavy tick) so a completed phase advances
|
||||
# within signal_interval of its `## DONE` landing, instead of idling up to heavy_interval.
|
||||
advanced = phase_advance_check(cfg) if has_loops else False
|
||||
|
||||
if elapsed >= heavy:
|
||||
elapsed = 0
|
||||
advanced = phase_advance_check(cfg) if has_loops else False
|
||||
if not advanced:
|
||||
for a in watched(cfg):
|
||||
if seq_done and a["kind"] == "loop":
|
||||
|
||||
Reference in New Issue
Block a user