feat: optional log_tokens — per-phase token + time accounting
When [watchdog].log_tokens (or [loop].log_tokens) is true, the watchdog records for each phase how many tokens each agent used (and the total) and how long the phase took, appended to <log_dir>/token-log.jsonl. Tokens are summed from each agent's session transcript, attributed by working dir. View with `agents.py tokens`. Baseline snapshot at phase start + delta at phase advance/complete; robust across watchdog restarts. Validated: the transcript sum matches an independent external collector exactly. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
18
README.md
18
README.md
@ -63,6 +63,23 @@ heavy_interval = 300 # seconds between heal + phase-advance checks
|
||||
limit_probe_fallback = 300 # re-probe cadence for a usage-limited agent when reset time is unparsable
|
||||
limit_reset_slack = 45 # seconds to wait past a parsed reset before probing
|
||||
stall_grace = 180 # seconds of slack past a WAITING-UNTIL marker before a stall reboot
|
||||
log_tokens = false # opt-in: record per-phase token + time usage (see below)
|
||||
```
|
||||
|
||||
**Per-phase token + time logging (`log_tokens`).** Set `log_tokens = true` (under `[watchdog]` or
|
||||
`[loop]`) and the watchdog records, for **each phase**, how many tokens **each agent** used and how
|
||||
long the phase took — appended as one JSON object per phase to `<log_dir>/token-log.jsonl`. Tokens
|
||||
are summed from each agent's Claude Code session transcript and attributed **by working dir**, so
|
||||
give each agent its own `dir` (the Builder/Adversary loop pair already uses separate clones) for
|
||||
accurate per-agent numbers. The watchdog snapshots a baseline when a phase starts and writes the
|
||||
delta (per agent, and the total) when the phase advances or the sequence completes — robust across
|
||||
watchdog restarts. Pretty-print it with `agents.py tokens`:
|
||||
|
||||
```
|
||||
phase dur(s) builder adversary TOTAL
|
||||
-----------------------------------------------------
|
||||
lex 372.0 3,910,118 3,221,447 7,131,565
|
||||
parse 410.5 ...
|
||||
```
|
||||
|
||||
### `[defaults]` — inherited by every agent
|
||||
@ -240,6 +257,7 @@ agents.py status table of every agent: kind, backend, model, w
|
||||
agents.py watchdog the supervisor loop (what the <prefix>watchdog session runs)
|
||||
agents.py logs <name> tail that session's log
|
||||
agents.py phase [show|next|set N] inspect / move the loop phase index
|
||||
agents.py tokens per-phase token + time report (when [watchdog].log_tokens = true)
|
||||
agents.py selftest regression-test the backend activity detector (needs no config)
|
||||
agents.py init [dir] scaffold a starter agents.toml + prompts/ in a project dir
|
||||
--config PATH use a specific config (default: ./agents.toml)
|
||||
|
||||
131
agents.py
131
agents.py
@ -14,6 +14,7 @@ Usage:
|
||||
agents.py watchdog the supervisor loop (reads the config every tick)
|
||||
agents.py logs <name> tail an agent's session log
|
||||
agents.py phase [set N|next|show] inspect / move the loop phase
|
||||
agents.py tokens per-phase token + time report (needs [watchdog].log_tokens = true)
|
||||
agents.py selftest backend activity-detector regression checks (no config needed)
|
||||
agents.py init [dir] scaffold a starter agents.toml + prompts/ in a project dir
|
||||
|
||||
@ -663,6 +664,99 @@ def start_loops(cfg):
|
||||
for a in loop_agents(cfg):
|
||||
start_agent(cfg, a)
|
||||
|
||||
# ── optional per-phase token + time logging (log_tokens) ──────────────────────────
|
||||
# When [watchdog].log_tokens (or [loop].log_tokens) is true, the watchdog records, for each phase,
|
||||
# how many tokens each agent used and how long the phase took, appended to <log_dir>/token-log.jsonl.
|
||||
# Tokens are summed from each agent's Claude Code session transcript, attributed by working dir — so
|
||||
# give each agent its OWN dir for accurate per-agent numbers (the Builder/Adversary loop pair already
|
||||
# uses separate clones). View with: agents.py tokens.
|
||||
|
||||
def log_tokens_enabled(cfg):
    """Return True when per-phase token logging is switched on.

    The flag may be set in either the ``watchdog`` or the ``loop`` section of
    the config; a truthy ``log_tokens`` in either one enables the feature.
    """
    # "watchdog" is consulted first; "loop" is only looked at when it is unset/falsy.
    return any(
        cfg.get(section, {}).get("log_tokens")
        for section in ("watchdog", "loop")
    )
|
||||
|
||||
def _transcript_dir(workdir):
    """Map an agent working dir to the folder that holds its session transcripts.

    The folder lives under ``~/.claude/projects`` and is named after the
    working-dir path with trailing slashes removed and every ``/`` and ``.``
    replaced by ``-``.
    """
    flattened = str(workdir).rstrip("/").translate(str.maketrans("/.", "--"))
    projects_root = Path(os.path.expanduser("~/.claude/projects"))
    return projects_root / flattened
|
||||
|
||||
def _sum_tokens(workdir):
    """Sum the token usage recorded in every session transcript for ``workdir``.

    Reads each ``*.jsonl`` file in the directory returned by
    ``_transcript_dir(workdir)`` and adds up the ``usage`` counters of every
    entry whose ``type`` is ``"assistant"``.

    Returns:
        A dict with the keys ``input``, ``output``, ``cache_create``,
        ``cache_read`` and ``total`` (the sum of the other four). Every value
        is 0 when the transcript directory does not exist.

    Best-effort by design: unreadable files, unparsable lines and entries that
    do not have the expected object shape are skipped instead of raising.
    """
    # (usage field in the transcript, key in the returned totals)
    fields = (
        ("input_tokens", "input"),
        ("output_tokens", "output"),
        ("cache_creation_input_tokens", "cache_create"),
        ("cache_read_input_tokens", "cache_read"),
    )
    t = {key: 0 for _, key in fields}
    d = _transcript_dir(workdir)
    if d.is_dir():
        for f in d.glob("*.jsonl"):
            try:
                # Context manager: close each transcript as soon as it is read
                # instead of leaving the handle to the garbage collector.
                with f.open(errors="ignore") as fh:
                    for line in fh:
                        try:
                            o = json.loads(line)
                        except (ValueError, RecursionError):
                            continue
                        # A line can be valid JSON without being an object
                        # (e.g. a bare string or list); such lines carry no usage.
                        if not isinstance(o, dict) or o.get("type") != "assistant":
                            continue
                        msg = o.get("message")
                        u = msg.get("usage") if isinstance(msg, dict) else None
                        if not isinstance(u, dict):
                            continue
                        for src, key in fields:
                            # "or 0" also covers an explicit null counter.
                            t[key] += u.get(src, 0) or 0
            except OSError:
                # Unreadable file: keep whatever was summed so far and move on.
                continue
    t["total"] = t["input"] + t["output"] + t["cache_create"] + t["cache_read"]
    return t
|
||||
|
||||
def _token_cumulative(cfg):
    """Return ``{agent name: token totals}`` for every agent in ``cfg["agents"]``.

    Each value is whatever ``_sum_tokens`` reports for that agent's ``dir``,
    i.e. the cumulative usage found in its transcript dir so far.
    """
    totals = {}
    for agent in cfg["agents"].values():
        name = agent["name"]
        totals[name] = _sum_tokens(agent["dir"])
    return totals
|
||||
|
||||
# Counter names present in every per-agent token record.
_TOKEN_KEYS = ("input", "output", "cache_create", "cache_read", "total")


def _token_state_path(cfg):
    """Path of the JSON state file that stores the in-progress phase's baseline."""
    return Path(cfg["state_dir"]).joinpath("token-phase.json")


def _token_log_path(cfg):
    """Path of the JSONL file that receives one record per finished phase."""
    return Path(cfg["log_dir"]).joinpath("token-log.jsonl")


def _tok_delta(cur, base):
    """Return ``cur - base`` for each counter in ``_TOKEN_KEYS``.

    A counter missing from either dict is treated as 0.
    """
    delta = {}
    for key in _TOKEN_KEYS:
        delta[key] = cur.get(key, 0) - base.get(key, 0)
    return delta
|
||||
|
||||
def token_phase_begin(cfg, phase_id):
    """Record the baseline for the phase that is starting now.

    Writes the phase id, the start time and the current cumulative token
    counts to the token state file. If the state file already tracks
    ``phase_id`` it is left untouched, so calling this again (for example
    after a watchdog restart) keeps the original baseline. Does nothing when
    log_tokens is disabled.
    """
    if not log_tokens_enabled(cfg):
        return
    state_file = _token_state_path(cfg)
    try:
        saved = json.loads(state_file.read_text())
        already_tracking = saved.get("phase_id") == phase_id
    except Exception:
        # Missing, unreadable or malformed state: start a fresh baseline below.
        already_tracking = False
    if already_tracking:
        return
    # The timestamp is taken before the transcript scan, as a scan may take a while.
    started = datetime.now().isoformat(timespec="seconds")
    snapshot = {
        "phase_id": phase_id,
        "started": started,
        "baseline": _token_cumulative(cfg),
    }
    state_file.write_text(json.dumps(snapshot))
|
||||
|
||||
def token_phase_flush(cfg, next_phase_id):
    """Close the phase being tracked and record what it cost.

    Appends one JSON line to the token log holding the phase's per-agent and
    total token deltas (current cumulative counts minus the stored baseline),
    its start/end timestamps and its duration in seconds. Afterwards the state
    file is re-baselined for ``next_phase_id``, or removed when
    ``next_phase_id`` is None.

    Does nothing when log_tokens is disabled or when there is no usable
    baseline state (state file missing, unreadable or malformed).

    Args:
        cfg: loaded config; read for the log_tokens flag, ``state_dir``,
            ``log_dir`` and the agent list.
        next_phase_id: id of the phase starting now, or None to stop tracking.
    """
    if not log_tokens_enabled(cfg):
        return
    sf = _token_state_path(cfg)
    try:
        st = json.loads(sf.read_text())
    except Exception:
        return  # nothing was being tracked, or the state cannot be read
    if not isinstance(st, dict):
        return  # valid JSON but not a state record: same as no baseline

    cur = _token_cumulative(cfg)
    # One clock reading serves "ended", the duration and the next phase's
    # "started", so consecutive records neither overlap nor leave a gap.
    now = datetime.now()
    ended = now.isoformat(timespec="seconds")

    base = st.get("baseline")
    if not isinstance(base, dict):
        base = {}
    started = st.get("started")
    try:
        dur = round((now - datetime.fromisoformat(started)).total_seconds(), 1)
    except (TypeError, ValueError):
        dur = None  # start time missing or unparsable; the token deltas are still logged

    per_agent = {}
    for n, totals in cur.items():
        b = base.get(n)
        # An agent without a stored baseline is measured from zero.
        per_agent[n] = _tok_delta(totals, b if isinstance(b, dict) else {})
    total = {k: sum(d[k] for d in per_agent.values()) for k in _TOKEN_KEYS}

    rec = {"phase_id": st.get("phase_id"), "started": started,
           "ended": ended, "duration_s": dur,
           "agents": per_agent, "total": total}
    with _token_log_path(cfg).open("a") as fh:
        fh.write(json.dumps(rec) + "\n")
    parts = ", ".join(f"{n}={per_agent[n]['total']:,}" for n in per_agent)
    log(f"[log_tokens] phase {rec['phase_id']}: {total['total']:,} tok in {dur}s ({parts})")

    if next_phase_id is not None:
        # The counts just read become the next phase's baseline.
        sf.write_text(json.dumps({"phase_id": next_phase_id,
                                  "started": ended,
                                  "baseline": cur}))
    else:
        sf.unlink(missing_ok=True)
|
||||
|
||||
def phase_advance_check(cfg):
|
||||
"""On heavy tick: if the current phase is DONE, advance (or finish the sequence).
|
||||
|
||||
@ -681,6 +775,7 @@ def phase_advance_check(cfg):
|
||||
nxt = idx + 1
|
||||
if nxt < len(ps):
|
||||
log(f"PHASE {ph['id']} DONE — auto-transitioning to {ps[nxt]['id']}")
|
||||
token_phase_flush(cfg, ps[nxt]["id"])
|
||||
stop_loops(cfg)
|
||||
Path(phase_idx_file(cfg)).write_text(str(nxt))
|
||||
if marker.exists():
|
||||
@ -692,6 +787,7 @@ def phase_advance_check(cfg):
|
||||
if marker.exists():
|
||||
return False # already handled — idempotent (no re-log, no re-stop)
|
||||
log(f"PHASE SEQUENCE COMPLETE (last phase {ph['id']} DONE) — stopping loops")
|
||||
token_phase_flush(cfg, None)
|
||||
stop_loops(cfg)
|
||||
ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
marker.write_text(f"phase sequence complete {ts}. Loops stopped; build finished.\n")
|
||||
@ -721,6 +817,9 @@ def watchdog_loop(cfg_path):
|
||||
f"signal={sig}s heavy={heavy}s, watching: {[a['name'] for a in watched(cfg)]}")
|
||||
elapsed = heavy # force a heavy check on first tick
|
||||
wake_elapsed = {a["name"]: 0 for a in cfg["agents"].values() if a.get("wake")}
|
||||
if log_tokens_enabled(cfg):
|
||||
token_phase_begin(cfg, cur_phase(cfg).get("id"))
|
||||
log("[log_tokens] enabled — per-phase token + time logging to token-log.jsonl")
|
||||
while True:
|
||||
cfg = load_config(cfg_path) # re-read every tick: config is authoritative, no env drift
|
||||
has_loops = bool(loop_agents(cfg))
|
||||
@ -840,6 +939,37 @@ def cmd_phase(cfg, args):
|
||||
Path(phase_idx_file(cfg)).write_text(str(int(args[1])))
|
||||
print(f"phase idx now {cur_idx(cfg)} ({cur_phase(cfg).get('id')})")
|
||||
|
||||
def cmd_tokens(cfg):
    """Pretty-print <log_dir>/token-log.jsonl: per-phase tokens by agent + total + duration.

    Prints one row per logged phase (phase id, duration in seconds, each
    agent's total tokens, the phase total) followed by a TOTAL row. Agent
    columns appear in the order the agents are first seen in the log. Lines
    that are not JSON objects are skipped. Prints a hint and returns when the
    log is missing or holds no usable records.
    """
    p = _token_log_path(cfg)
    if not p.exists():
        print(f"no token log at {p}\n(set [watchdog].log_tokens = true and run the loop)")
        return

    recs = []
    for line in p.read_text().splitlines():
        try:
            rec = json.loads(line)
        except (ValueError, RecursionError):
            continue
        # Valid JSON that is not an object (e.g. "null") is not a phase record.
        if isinstance(rec, dict):
            recs.append(rec)
    if not recs:
        print("token log is empty")
        return

    def agent_totals(rec):
        """Return {agent name: total tokens} for one record, tolerating odd shapes."""
        agents = rec.get("agents")
        if not isinstance(agents, dict):
            return {}
        return {n: (v.get("total", 0) or 0) if isinstance(v, dict) else 0
                for n, v in agents.items()}

    def phase_total(rec):
        """Return the record's overall token total, or 0 when it is absent."""
        tot = rec.get("total")
        return (tot.get("total", 0) or 0) if isinstance(tot, dict) else 0

    rows = [(r, agent_totals(r)) for r in recs]
    # dict.fromkeys keeps first-seen order while dropping duplicates.
    names = list(dict.fromkeys(n for _, ag in rows for n in ag))
    grand = {n: sum(ag.get(n, 0) for _, ag in rows) for n in names}

    # Column width must fit the comma-formatted numbers as well as the agent
    # names, otherwise wide values push later columns out from under the header.
    w = max([7]
            + [len(n) for n in names]
            + [len(f"{v:,}") for _, ag in rows for v in ag.values()]
            + [len(f"{v:,}") for v in grand.values()])

    hdr = f"{'phase':<10} {'dur(s)':>8} " + " ".join(f"{n:>{w}}" for n in names) + f" {'TOTAL':>13}"
    print(hdr)
    print("-" * len(hdr))

    durtot = 0.0
    for r, ag in rows:
        cells = " ".join(f"{ag.get(n, 0):>{w},}" for n in names)
        print(f"{str(r.get('phase_id')):<10} {str(r.get('duration_s')):>8} {cells} "
              f"{phase_total(r):>13,}")
        d = r.get("duration_s")
        # duration_s is null when the start time could not be parsed at log time.
        if isinstance(d, (int, float)):
            durtot += d

    print("-" * len(hdr))
    cells = " ".join(f"{grand[n]:>{w},}" for n in names)
    print(f"{'TOTAL':<10} {durtot:>8.0f} {cells} {sum(grand.values()):>13,}")
|
||||
|
||||
def cmd_selftest():
|
||||
"""Self-contained regression checks for the footer-UI activity detector. Needs no config."""
|
||||
backend = {
|
||||
@ -917,6 +1047,7 @@ def main():
|
||||
elif cmd == "status": cmd_status(cfg)
|
||||
elif cmd == "watchdog": watchdog_loop(cfg_path)
|
||||
elif cmd == "phase": cmd_phase(cfg, rest)
|
||||
elif cmd == "tokens": cmd_tokens(cfg)
|
||||
elif cmd == "logs":
|
||||
if not rest:
|
||||
die("usage: agents.py logs <name>")
|
||||
|
||||
Reference in New Issue
Block a user