The raw 'tmux pipe-pane' logs are TUI-escape soup (the 191MB builder log). agent-log.py renders Claude's own JSONL transcript into a clean one-event- per-line <agent>.clean.log — read-only on a file the agent writes anyway, so zero agent slowdown and zero extra tokens. Resolves each agent's transcript (disambiguating the shared project dir by kickoff signature; tracks restarts). 'follow-all' runs as the cc-ci-cleanlogs session, wired into launch.py start so it comes up with the loops. render/tail subcommands for ad-hoc use. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
200 lines
7.1 KiB
Python
Executable File
200 lines
7.1 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Clean, greppable transcript logs for the cc-ci agents.
|
|
|
|
Claude Code already writes a structured JSONL transcript of every session under
|
|
~/.claude/projects/<cwd-slug>/<session-uuid>.jsonl. This renders that into a readable, greppable
|
|
one-event-per-line log — WITHOUT touching the agent (read-only on a file it writes anyway, so no
|
|
slowdown and zero extra tokens). The raw `tmux pipe-pane` logs are TUI-escape soup; use these instead.
|
|
|
|
Usage:
|
|
agent-log.py render <agent> print the clean transcript to stdout
|
|
agent-log.py tail <agent> [N] print the last N (default 40) rendered events
|
|
agent-log.py follow-all loop: keep <agent>.clean.log up to date for every agent (run in tmux)
|
|
|
|
Agents: builder adversary upgrader assistant orchestrator
|
|
Output logs (follow-all): /srv/cc-ci/.cc-ci-logs/<agent>.clean.log
|
|
|
|
Each line: `HH:MM:SS [kind] …` — kinds: user, asst (assistant text), tool:<Name>, result, think(skipped
|
|
by default). Long text/results are truncated (full detail stays in the JSONL); newlines collapsed to ⏎.
|
|
"""
|
|
import json, os, sys, time
|
|
|
|
PROJ = "/home/loops/.claude/projects"
|
|
LOGDIR = "/srv/cc-ci/.cc-ci-logs"
|
|
MAXLEN = 800 # truncate any single rendered block to this many chars
|
|
|
|
# agent -> (project-slug dir, signature substring in the first user/kickoff message)
|
|
AGENTS = {
|
|
"builder": ("-srv-cc-ci-orch-cc-ci", "cc-ci SUB-PHASE"),
|
|
"adversary": ("-srv-cc-ci-orch-cc-ci-adv", "Adversary agent"),
|
|
"upgrader": ("-srv-cc-ci-orch", "cc-ci UPGRADER"),
|
|
"assistant": ("-srv-cc-ci-orch", "cc-ci ASSISTANT"),
|
|
"orchestrator": ("-srv-cc-ci-orch", "cc-ci orchestrator"),
|
|
}
|
|
|
|
|
|
def _first_user_text(path, limit=4000):
|
|
"""Read the start of a transcript; return the first user-message text (the kickoff signature)."""
|
|
try:
|
|
with open(path) as f:
|
|
for _ in range(60):
|
|
ln = f.readline()
|
|
if not ln:
|
|
break
|
|
try:
|
|
o = json.loads(ln)
|
|
except Exception:
|
|
continue
|
|
if o.get("type") == "user":
|
|
c = (o.get("message") or {}).get("content")
|
|
if isinstance(c, str):
|
|
return c[:limit]
|
|
if isinstance(c, list):
|
|
return " ".join(b.get("text", "") for b in c if isinstance(b, dict) and b.get("type") == "text")[:limit]
|
|
except FileNotFoundError:
|
|
return ""
|
|
return ""
|
|
|
|
|
|
def active_jsonl(agent):
|
|
"""The agent's current transcript: newest *.jsonl in its slug dir whose kickoff matches the
|
|
signature (disambiguates the shared -srv-cc-ci-orch dir; tracks restarts → new uuid)."""
|
|
slug, sig = AGENTS[agent]
|
|
d = os.path.join(PROJ, slug)
|
|
try:
|
|
files = [os.path.join(d, f) for f in os.listdir(d) if f.endswith(".jsonl")]
|
|
except FileNotFoundError:
|
|
return None
|
|
files.sort(key=lambda p: os.path.getmtime(p), reverse=True)
|
|
for p in files:
|
|
if sig in _first_user_text(p):
|
|
return p
|
|
return None
|
|
|
|
|
|
def _clip(s):
|
|
s = " ".join(str(s).split()) if s else "" # collapse all whitespace/newlines
|
|
return s if len(s) <= MAXLEN else s[:MAXLEN] + " …[+%d]" % (len(s) - MAXLEN)
|
|
|
|
|
|
def render_line(o, show_think=False):
|
|
"""Render one JSONL event to zero or more clean lines (list of strings)."""
|
|
t = o.get("type")
|
|
if t not in ("user", "assistant"):
|
|
return []
|
|
ts = (o.get("timestamp") or "")[11:19] or "--:--:--"
|
|
m = o.get("message") or {}
|
|
c = m.get("content")
|
|
out = []
|
|
if isinstance(c, str):
|
|
if c.strip():
|
|
out.append(f"{ts} [user] {_clip(c)}")
|
|
return out
|
|
if not isinstance(c, list):
|
|
return out
|
|
for b in c:
|
|
if not isinstance(b, dict):
|
|
continue
|
|
bt = b.get("type")
|
|
if bt == "text":
|
|
txt = b.get("text", "")
|
|
if txt.strip():
|
|
out.append(f"{ts} [{'asst' if t=='assistant' else 'user'}] {_clip(txt)}")
|
|
elif bt == "thinking":
|
|
if show_think:
|
|
out.append(f"{ts} [think] {_clip(b.get('thinking',''))}")
|
|
elif bt == "tool_use":
|
|
inp = b.get("input") or {}
|
|
brief = inp.get("command") or inp.get("file_path") or inp.get("path") or inp.get("prompt") or json.dumps(inp)[:200]
|
|
out.append(f"{ts} [tool:{b.get('name')}] {_clip(brief)}")
|
|
elif bt == "tool_result":
|
|
rc = b.get("content")
|
|
if isinstance(rc, list):
|
|
rc = " ".join(x.get("text", "") for x in rc if isinstance(x, dict))
|
|
out.append(f"{ts} [result] {_clip(rc)}")
|
|
return out
|
|
|
|
|
|
def render_file(path, show_think=False):
|
|
lines = []
|
|
with open(path) as f:
|
|
for ln in f:
|
|
try:
|
|
o = json.loads(ln)
|
|
except Exception:
|
|
continue
|
|
lines += render_line(o, show_think)
|
|
return lines
|
|
|
|
|
|
def cmd_render(agent, show_think=False):
|
|
p = active_jsonl(agent)
|
|
if not p:
|
|
print(f"(no transcript found for {agent})", file=sys.stderr); sys.exit(1)
|
|
print(f"# {agent} ← {p}")
|
|
for l in render_file(p, show_think):
|
|
print(l)
|
|
|
|
|
|
def cmd_tail(agent, n=40):
|
|
p = active_jsonl(agent)
|
|
if not p:
|
|
print(f"(no transcript found for {agent})", file=sys.stderr); sys.exit(1)
|
|
for l in render_file(p)[-n:]:
|
|
print(l)
|
|
|
|
|
|
def cmd_follow_all():
|
|
"""One cheap process: keep every agent's <agent>.clean.log current. Re-resolves the active
|
|
transcript each pass (handles restarts) and appends only newly-parsed events."""
|
|
os.makedirs(LOGDIR, exist_ok=True)
|
|
state = {} # agent -> (path, byte_offset)
|
|
while True:
|
|
for agent in AGENTS:
|
|
p = active_jsonl(agent)
|
|
if not p:
|
|
continue
|
|
prev_path, off = state.get(agent, (None, 0))
|
|
if p != prev_path:
|
|
off = 0 # new/restarted session → render from the top
|
|
try:
|
|
size = os.path.getsize(p)
|
|
if off > size: # file truncated/rotated
|
|
off = 0
|
|
with open(p) as f:
|
|
f.seek(off)
|
|
chunk = f.read()
|
|
new_off = f.tell()
|
|
except FileNotFoundError:
|
|
continue
|
|
if chunk:
|
|
rendered = []
|
|
for ln in chunk.splitlines():
|
|
try:
|
|
o = json.loads(ln)
|
|
except Exception:
|
|
continue
|
|
rendered += render_line(o)
|
|
if rendered:
|
|
with open(os.path.join(LOGDIR, f"{agent}.clean.log"), "a") as out:
|
|
out.write("\n".join(rendered) + "\n")
|
|
state[agent] = (p, new_off)
|
|
time.sleep(5)
|
|
|
|
|
|
def main():
|
|
a = sys.argv[1:] or ["follow-all"]
|
|
cmd = a[0]
|
|
if cmd == "render":
|
|
cmd_render(a[1], "--think" in a)
|
|
elif cmd == "tail":
|
|
cmd_tail(a[1], int(a[2]) if len(a) > 2 and a[2].isdigit() else 40)
|
|
elif cmd == "follow-all":
|
|
cmd_follow_all()
|
|
else:
|
|
print(__doc__); sys.exit(2)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|