From 924874aafae02cec8521acae6ae80f6d8d915441 Mon Sep 17 00:00:00 2001 From: mfowler Date: Sun, 14 Jun 2026 21:48:17 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20optional=20log=5Ftokens=20=E2=80=94=20p?= =?UTF-8?q?er-phase=20token=20+=20time=20accounting?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When [watchdog].log_tokens (or [loop].log_tokens) is true, the watchdog records for each phase how many tokens each agent used (and the total) and how long the phase took, appended to /token-log.jsonl. Tokens are summed from each agent's session transcript, attributed by working dir. View with `agents.py tokens`. Baseline snapshot at phase start + delta at phase advance/complete; robust across watchdog restarts. Validated: the transcript sum matches an independent external collector exactly. Co-Authored-By: Claude Opus 4.8 --- README.md | 18 ++++++++ agents.py | 131 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 149 insertions(+) diff --git a/README.md b/README.md index 39a8b1d..9316de4 100644 --- a/README.md +++ b/README.md @@ -63,6 +63,23 @@ heavy_interval = 300 # seconds between heal + phase-advance checks limit_probe_fallback = 300 # re-probe cadence for a usage-limited agent when reset time is unparsable limit_reset_slack = 45 # seconds to wait past a parsed reset before probing stall_grace = 180 # seconds of slack past a WAITING-UNTIL marker before a stall reboot +log_tokens = false # opt-in: record per-phase token + time usage (see below) +``` + +**Per-phase token + time logging (`log_tokens`).** Set `log_tokens = true` (under `[watchdog]` or +`[loop]`) and the watchdog records, for **each phase**, how many tokens **each agent** used and how +long the phase took — appended as one JSON object per phase to `/token-log.jsonl`. Tokens +are summed from each agent's Claude Code session transcript and attributed **by working dir**, so +give each agent its own `dir` (the Builder/Adversary loop pair already uses separate clones) for +accurate per-agent numbers. The watchdog snapshots a baseline when a phase starts and writes the +delta (per agent, and the total) when the phase advances or the sequence completes — robust across +watchdog restarts. Pretty-print it with `agents.py tokens`: + +``` +phase dur(s) builder adversary TOTAL +----------------------------------------------------- +lex 372.0 3,910,118 3,221,447 7,131,565 +parse 410.5 ... ``` ### `[defaults]` — inherited by every agent @@ -240,6 +257,7 @@ agents.py status table of every agent: kind, backend, model, w agents.py watchdog the supervisor loop (what the watchdog session runs) agents.py logs tail that session's log agents.py phase [show|next|set N] inspect / move the loop phase index +agents.py tokens per-phase token + time report (when [watchdog].log_tokens = true) agents.py selftest regression-test the backend activity detector (needs no config) agents.py init [dir] scaffold a starter agents.toml + prompts/ in a project dir --config PATH use a specific config (default: ./agents.toml) diff --git a/agents.py b/agents.py index d7eab02..50ed503 100755 --- a/agents.py +++ b/agents.py @@ -14,6 +14,7 @@ Usage: agents.py watchdog the supervisor loop (reads the config every tick) agents.py logs tail an agent's session log agents.py phase [set N|next|show] inspect / move the loop phase + agents.py tokens per-phase token + time report (needs [watchdog].log_tokens = true) agents.py selftest backend activity-detector regression checks (no config needed) agents.py init [dir] scaffold a starter agents.toml + prompts/ in a project dir @@ -663,6 +664,99 @@ def start_loops(cfg): for a in loop_agents(cfg): start_agent(cfg, a) +# ── optional per-phase token + time logging (log_tokens) ────────────────────────── +# When [watchdog].log_tokens (or [loop].log_tokens) is true, the watchdog records, for each phase, +# how many tokens each agent used and how long the phase took, appended to /token-log.jsonl. +# Tokens are summed from each agent's Claude Code session transcript, attributed by working dir — so +# give each agent its OWN dir for accurate per-agent numbers (the Builder/Adversary loop pair already +# uses separate clones). View with: agents.py tokens. + +def log_tokens_enabled(cfg): + return bool(cfg.get("watchdog", {}).get("log_tokens") or cfg.get("loop", {}).get("log_tokens")) + +def _transcript_dir(workdir): + name = str(workdir).rstrip("/").replace("/", "-").replace(".", "-") + return Path(os.path.expanduser("~/.claude/projects")) / name + +def _sum_tokens(workdir): + t = {"input": 0, "output": 0, "cache_create": 0, "cache_read": 0} + d = _transcript_dir(workdir) + if d.is_dir(): + for f in d.glob("*.jsonl"): + try: + for line in f.open(errors="ignore"): + try: + o = json.loads(line) + except Exception: + continue + if o.get("type") == "assistant": + u = (o.get("message", {}) or {}).get("usage", {}) or {} + t["input"] += u.get("input_tokens", 0) or 0 + t["output"] += u.get("output_tokens", 0) or 0 + t["cache_create"] += u.get("cache_creation_input_tokens", 0) or 0 + t["cache_read"] += u.get("cache_read_input_tokens", 0) or 0 + except OSError: + continue + t["total"] = t["input"] + t["output"] + t["cache_create"] + t["cache_read"] + return t + +def _token_cumulative(cfg): + """Cumulative tokens per agent so far, summed from each agent's transcript dir.""" + return {a["name"]: _sum_tokens(a["dir"]) for a in cfg["agents"].values()} + +_TOKEN_KEYS = ("input", "output", "cache_create", "cache_read", "total") +def _token_state_path(cfg): return Path(cfg["state_dir"]) / "token-phase.json" +def _token_log_path(cfg): return Path(cfg["log_dir"]) / "token-log.jsonl" +def _tok_delta(cur, base): return {k: cur.get(k, 0) - base.get(k, 0) for k in _TOKEN_KEYS} + +def token_phase_begin(cfg, phase_id): + """Set the baseline (cumulative tokens + start time) for the phase now starting. Idempotent + across watchdog restarts: keeps the original baseline if already tracking this phase.""" + if not log_tokens_enabled(cfg): + return + sf = _token_state_path(cfg) + try: + if json.loads(sf.read_text()).get("phase_id") == phase_id: + return + except Exception: + pass + sf.write_text(json.dumps({"phase_id": phase_id, + "started": datetime.now().isoformat(timespec="seconds"), + "baseline": _token_cumulative(cfg)})) + +def token_phase_flush(cfg, next_phase_id): + """Close the current phase: append its per-agent + total token deltas and duration to the + token-log, then re-baseline for next_phase_id (or finalize tracking if None).""" + if not log_tokens_enabled(cfg): + return + sf = _token_state_path(cfg) + try: + st = json.loads(sf.read_text()) + except Exception: + return + cur = _token_cumulative(cfg) + base = st.get("baseline", {}) + started = st.get("started") + try: + dur = round((datetime.now() - datetime.fromisoformat(started)).total_seconds(), 1) + except Exception: + dur = None + per_agent = {n: _tok_delta(cur.get(n, {}), base.get(n, {})) for n in cur} + total = {k: sum(per_agent[n][k] for n in per_agent) for k in _TOKEN_KEYS} + rec = {"phase_id": st.get("phase_id"), "started": started, + "ended": datetime.now().isoformat(timespec="seconds"), "duration_s": dur, + "agents": per_agent, "total": total} + with _token_log_path(cfg).open("a") as fh: + fh.write(json.dumps(rec) + "\n") + parts = ", ".join(f"{n}={per_agent[n]['total']:,}" for n in per_agent) + log(f"[log_tokens] phase {rec['phase_id']}: {total['total']:,} tok in {dur}s ({parts})") + if next_phase_id is not None: + sf.write_text(json.dumps({"phase_id": next_phase_id, + "started": datetime.now().isoformat(timespec="seconds"), + "baseline": cur})) + else: + sf.unlink(missing_ok=True) + def phase_advance_check(cfg): """On heavy tick: if the current phase is DONE, advance (or finish the sequence). @@ -681,6 +775,7 @@ def phase_advance_check(cfg): nxt = idx + 1 if nxt < len(ps): log(f"PHASE {ph['id']} DONE — auto-transitioning to {ps[nxt]['id']}") + token_phase_flush(cfg, ps[nxt]["id"]) stop_loops(cfg) Path(phase_idx_file(cfg)).write_text(str(nxt)) if marker.exists(): @@ -692,6 +787,7 @@ def phase_advance_check(cfg): if marker.exists(): return False # already handled — idempotent (no re-log, no re-stop) log(f"PHASE SEQUENCE COMPLETE (last phase {ph['id']} DONE) — stopping loops") + token_phase_flush(cfg, None) stop_loops(cfg) ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S") marker.write_text(f"phase sequence complete {ts}. Loops stopped; build finished.\n") @@ -721,6 +817,9 @@ def watchdog_loop(cfg_path): f"signal={sig}s heavy={heavy}s, watching: {[a['name'] for a in watched(cfg)]}") elapsed = heavy # force a heavy check on first tick wake_elapsed = {a["name"]: 0 for a in cfg["agents"].values() if a.get("wake")} + if log_tokens_enabled(cfg): + token_phase_begin(cfg, cur_phase(cfg).get("id")) + log("[log_tokens] enabled — per-phase token + time logging to token-log.jsonl") while True: cfg = load_config(cfg_path) # re-read every tick: config is authoritative, no env drift has_loops = bool(loop_agents(cfg)) @@ -840,6 +939,37 @@ def cmd_phase(cfg, args): Path(phase_idx_file(cfg)).write_text(str(int(args[1]))) print(f"phase idx now {cur_idx(cfg)} ({cur_phase(cfg).get('id')})") +def cmd_tokens(cfg): + """Pretty-print /token-log.jsonl: per-phase tokens by agent + total + duration.""" + p = _token_log_path(cfg) + if not p.exists(): + print(f"no token log at {p}\n(set [watchdog].log_tokens = true and run the loop)"); return + recs = [] + for line in p.read_text().splitlines(): + try: recs.append(json.loads(line)) + except Exception: pass + if not recs: + print("token log is empty"); return + names = [] + for r in recs: + for n in r.get("agents", {}): + if n not in names: names.append(n) + w = max([7] + [len(n) for n in names]) + hdr = f"{'phase':<10} {'dur(s)':>8} " + " ".join(f"{n:>{w}}" for n in names) + f" {'TOTAL':>13}" + print(hdr); print("-" * len(hdr)) + grand = {n: 0 for n in names} + durtot = 0.0 + for r in recs: + ag = r.get("agents", {}) + cells = " ".join(f"{ag.get(n,{}).get('total',0):>{w},}" for n in names) + print(f"{str(r.get('phase_id')):<10} {str(r.get('duration_s')):>8} {cells} " + f"{r.get('total',{}).get('total',0):>13,}") + for n in names: grand[n] += ag.get(n,{}).get("total",0) + durtot += r.get("duration_s") or 0 + print("-" * len(hdr)) + cells = " ".join(f"{grand[n]:>{w},}" for n in names) + print(f"{'TOTAL':<10} {durtot:>8.0f} {cells} {sum(grand.values()):>13,}") + def cmd_selftest(): """Self-contained regression checks for the footer-UI activity detector. Needs no config.""" backend = { @@ -917,6 +1047,7 @@ def main(): elif cmd == "status": cmd_status(cfg) elif cmd == "watchdog": watchdog_loop(cfg_path) elif cmd == "phase": cmd_phase(cfg, rest) + elif cmd == "tokens": cmd_tokens(cfg) elif cmd == "logs": if not rest: die("usage: agents.py logs ")