Unit tests (no agents/tmux): config load + defaults merge, kickoff-template assembly, phase machine (advance/idempotent-complete/append-resumes), limit reset-banner parsing, WAITING-UNTIL/stall parsing, claude+opencode activity detectors. Live smokes bring a throwaway project up THROUGH agents.py on each real backend in an isolated sandbox (unique prefix, opencode on a non-4096 port), verify attach + status + down, and clean up. tests/run.sh runs unit always + smokes when backends present; README documents it. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
527 lines
22 KiB
Python
Executable File
527 lines
22 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Unit tests for the agent-orchestrator harness (agents.py).
|
|
|
|
Pure-logic tests — NO agent CLIs spawned, NO live tmux sessions created. Every test builds a
|
|
throwaway config + fixture files in a tempdir and exercises the harness functions directly.
|
|
The one function that would spawn sessions (phase_advance_check → start/stop_loops) is tested
|
|
with those two hooks monkeypatched to recorders, so the phase-machine *logic* is covered without
|
|
launching anything.
|
|
|
|
Run: python3 -m unittest tests.test_unit (from repo root)
|
|
or python3 tests/test_unit.py
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import time
|
|
import textwrap
|
|
import tempfile
|
|
import shutil
|
|
import unittest
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
|
|
REPO_ROOT = Path(__file__).resolve().parent.parent
|
|
sys.path.insert(0, str(REPO_ROOT))
|
|
import agents # noqa: E402
|
|
|
|
|
|
# ── shared fixture config ────────────────────────────────────────────────────────
|
|
|
|
BASE_TOML = r"""
|
|
[watchdog]
|
|
signal_interval = 30
|
|
heavy_interval = 300
|
|
limit_probe_fallback = 300
|
|
limit_reset_slack = 45
|
|
stall_grace = 180
|
|
|
|
[defaults]
|
|
session_prefix = "aotest-ut-"
|
|
log_dir = "state"
|
|
backend = "claude"
|
|
model = "claude-sonnet-4-6"
|
|
watch = "none"
|
|
|
|
[backend.claude]
|
|
bin = "claude"
|
|
flags = "--dangerously-skip-permissions"
|
|
remote_control = true
|
|
supports_resume = true
|
|
prompt_delivery = "arg"
|
|
process_name = "claude"
|
|
submit_key = "Enter"
|
|
stall_idle = 300
|
|
active_re = "esc to interrupt|Running tool|\\u00b7 \\d+"
|
|
limit_re = "spend limit|usage limit|limit reached|reached your .*limit|out of (credits|tokens)"
|
|
fatal_re = "redacted_thinking|blocks cannot be modified"
|
|
|
|
[backend.opencode]
|
|
bin = "opencode"
|
|
attach = "{bin} attach {server} --dir {dir}"
|
|
server = "http://127.0.0.1:4096"
|
|
supports_resume = false
|
|
prompt_delivery = "ping"
|
|
process_name = "opencode"
|
|
footer_ui = true
|
|
log_grace = 180
|
|
connect_delay = 12
|
|
submit_key = "C-m"
|
|
stall_idle = 900
|
|
active_re = "esc interrupt|thinking|inferring|running tool|tool call|preparing patch|reading|searching"
|
|
limit_re = "usage limit|limit reached"
|
|
|
|
[backend.demo]
|
|
bin = "echo up; exec sleep 100000"
|
|
prompt_delivery = "exec"
|
|
|
|
[[agent]]
|
|
name = "builder"
|
|
kind = "loop"
|
|
role = "builder"
|
|
backend = "demo"
|
|
|
|
[[agent]]
|
|
name = "adversary"
|
|
kind = "loop"
|
|
role = "adversary"
|
|
backend = "demo"
|
|
|
|
[[agent]]
|
|
name = "cl"
|
|
kind = "persistent"
|
|
backend = "claude"
|
|
prompt = "hi"
|
|
|
|
[[agent]]
|
|
name = "oc"
|
|
kind = "persistent"
|
|
backend = "opencode"
|
|
prompt = "hi"
|
|
|
|
[[agent]]
|
|
name = "custom"
|
|
kind = "persistent"
|
|
session = "explicit-session"
|
|
model = "override-model"
|
|
dir = "/abs/somewhere"
|
|
backend = "demo"
|
|
prompt = "x"
|
|
|
|
[[service]]
|
|
name = "svc"
|
|
command = "sleep 1"
|
|
|
|
[loop]
|
|
state_file = "phase-idx"
|
|
resume_phase = true
|
|
auto_advance = true
|
|
done_marker = "## DONE"
|
|
kickoff_template = "prompts/kickoff.md"
|
|
roles_dir = "prompts"
|
|
handoff = { repo = ".", claim_pings = "adversary", review_pings = "builder", inboxes = ["ADVERSARY-INBOX.md", "BUILDER-INBOX.md"], state_subdir = "machine-docs" }
|
|
phases = [
|
|
{ id = "p1", plan = "PLAN1.md", status = "STATUS-p1.md" },
|
|
{ id = "p2", plan = "PLAN2.md", status = "STATUS-p2.md", models = { builder = "opus-x" } },
|
|
]
|
|
"""
|
|
|
|
KICKOFF_TMPL = "*** PROJECT PHASE: {phase_id} ***\nPLAN: {plan}\nSTATUS: {status}\nROLE: {role}\n---\n"
|
|
BUILDER_PROMPT = "You are the **Builder** agent. (builder role body marker)\n"
|
|
ADVERSARY_PROMPT = "You are the **Adversary** agent. (adversary role body marker)\n"
|
|
|
|
|
|
def _make_project(tmp, toml=BASE_TOML):
|
|
"""Write a self-contained project (config + prompts + machine-docs) into tmp; return cfg path."""
|
|
root = Path(tmp)
|
|
(root / "prompts").mkdir(parents=True, exist_ok=True)
|
|
(root / "machine-docs").mkdir(parents=True, exist_ok=True)
|
|
(root / "prompts" / "kickoff.md").write_text(KICKOFF_TMPL)
|
|
(root / "prompts" / "builder.md").write_text(BUILDER_PROMPT)
|
|
(root / "prompts" / "adversary.md").write_text(ADVERSARY_PROMPT)
|
|
cfg_path = root / "agents.toml"
|
|
cfg_path.write_text(toml)
|
|
return cfg_path
|
|
|
|
|
|
# ── config loading + defaults merge ────────────────────────────────────────────────
|
|
|
|
class TestConfigLoad(unittest.TestCase):
|
|
def setUp(self):
|
|
self.tmp = tempfile.mkdtemp(prefix="aotest-ut-")
|
|
self.cfg_path = _make_project(self.tmp)
|
|
self.cfg = agents.load_config(self.cfg_path)
|
|
|
|
def tearDown(self):
|
|
shutil.rmtree(self.tmp, ignore_errors=True)
|
|
|
|
def test_defaults_merge_into_agents(self):
|
|
b = self.cfg["agents"]["builder"]
|
|
self.assertEqual(b["session_prefix"], "aotest-ut-")
|
|
self.assertEqual(b["watch"], "none") # from defaults
|
|
self.assertEqual(b["kind"], "loop") # explicit
|
|
|
|
def test_session_name_defaults_to_prefix_plus_name(self):
|
|
self.assertEqual(self.cfg["agents"]["builder"]["session"], "aotest-ut-builder")
|
|
|
|
def test_explicit_session_overrides_prefix(self):
|
|
self.assertEqual(self.cfg["agents"]["custom"]["session"], "explicit-session")
|
|
|
|
def test_per_agent_override_wins_over_default(self):
|
|
# default model is claude-sonnet-4-6; custom overrides
|
|
self.assertEqual(self.cfg["agents"]["custom"]["model"], "override-model")
|
|
self.assertEqual(self.cfg["agents"]["builder"]["model"], "claude-sonnet-4-6")
|
|
|
|
def test_relative_dir_resolved_against_project_root(self):
|
|
# builder has no dir → defaults dir "." → project_dir
|
|
self.assertEqual(self.cfg["agents"]["builder"]["dir"], self.cfg["project_dir"])
|
|
|
|
def test_absolute_dir_kept(self):
|
|
self.assertEqual(self.cfg["agents"]["custom"]["dir"], "/abs/somewhere")
|
|
|
|
def test_log_dir_and_state_dir_resolved(self):
|
|
self.assertEqual(self.cfg["log_dir"], str(Path(self.cfg["project_dir"]) / "state"))
|
|
self.assertEqual(self.cfg["state_dir"], os.path.join(self.cfg["log_dir"], "state"))
|
|
self.assertTrue(Path(self.cfg["state_dir"]).is_dir()) # created on load
|
|
|
|
def test_service_session_named(self):
|
|
self.assertIn("svc", self.cfg["services"])
|
|
self.assertEqual(self.cfg["services"]["svc"]["session"], "aotest-ut-svc")
|
|
|
|
def test_backend_of_resolves(self):
|
|
b = agents.backend_of(self.cfg, self.cfg["agents"]["cl"])
|
|
self.assertEqual(b["prompt_delivery"], "arg")
|
|
self.assertEqual(b["submit_key"], "Enter")
|
|
|
|
def test_backend_of_unknown_dies(self):
|
|
a = dict(self.cfg["agents"]["cl"]); a["backend"] = "nope"
|
|
with self.assertRaises(SystemExit):
|
|
agents.backend_of(self.cfg, a)
|
|
|
|
def test_missing_session_prefix_dies(self):
|
|
bad = self.tmp + "/bad1"
|
|
p = _make_project(bad, toml='[defaults]\nlog_dir = "state"\n')
|
|
with self.assertRaises(SystemExit):
|
|
agents.load_config(p)
|
|
|
|
def test_missing_log_dir_dies(self):
|
|
bad = self.tmp + "/bad2"
|
|
p = _make_project(bad, toml='[defaults]\nsession_prefix = "x-"\n')
|
|
with self.assertRaises(SystemExit):
|
|
agents.load_config(p)
|
|
|
|
def test_env_override_model_single_invocation(self):
|
|
os.environ["AGENT_MODEL_cl"] = "env-only-model"
|
|
try:
|
|
cfg2 = agents.load_config(self.cfg_path)
|
|
self.assertEqual(cfg2["agents"]["cl"]["model"], "env-only-model")
|
|
finally:
|
|
del os.environ["AGENT_MODEL_cl"]
|
|
# without the env var the file value stands again
|
|
cfg3 = agents.load_config(self.cfg_path)
|
|
self.assertEqual(cfg3["agents"]["cl"]["model"], "claude-sonnet-4-6")
|
|
|
|
|
|
class TestExampleConfig(unittest.TestCase):
|
|
"""The SHIPPED agents.example.toml must parse and define the documented shape."""
|
|
def test_example_config_loads(self):
|
|
ex = REPO_ROOT / "agents.example.toml"
|
|
self.assertTrue(ex.exists(), "agents.example.toml missing from repo")
|
|
cfg = agents.load_config(ex)
|
|
self.assertIn("builder", cfg["agents"])
|
|
self.assertIn("adversary", cfg["agents"])
|
|
for be in ("demo", "claude", "opencode"):
|
|
self.assertIn(be, cfg["backends"], f"backend {be} missing from example")
|
|
self.assertEqual(len(agents.phases(cfg)), 2)
|
|
|
|
|
|
# ── kickoff-template assembly ──────────────────────────────────────────────────────
|
|
|
|
class TestKickoff(unittest.TestCase):
|
|
def setUp(self):
|
|
self.tmp = tempfile.mkdtemp(prefix="aotest-ut-")
|
|
self.cfg = agents.load_config(_make_project(self.tmp))
|
|
|
|
def tearDown(self):
|
|
shutil.rmtree(self.tmp, ignore_errors=True)
|
|
|
|
def test_kickoff_renders_slots_and_appends_role(self):
|
|
out = agents.build_loop_kickoff(self.cfg, self.cfg["agents"]["builder"])
|
|
self.assertIn("PROJECT PHASE: p1", out) # phase_id slot filled (phase idx 0)
|
|
self.assertIn("PLAN: PLAN1.md", out)
|
|
self.assertIn("STATUS: STATUS-p1.md", out)
|
|
self.assertIn("ROLE: builder", out)
|
|
self.assertIn("builder role body marker", out) # role prompt appended
|
|
self.assertNotIn("{phase_id}", out) # no unrendered slot
|
|
self.assertNotIn("{role}", out)
|
|
|
|
def test_kickoff_picks_correct_role_prompt(self):
|
|
out = agents.build_loop_kickoff(self.cfg, self.cfg["agents"]["adversary"])
|
|
self.assertIn("adversary role body marker", out)
|
|
self.assertNotIn("builder role body marker", out)
|
|
|
|
def test_agent_prompt_loop_returns_kickoff(self):
|
|
out = agents.agent_prompt(self.cfg, self.cfg["agents"]["builder"])
|
|
self.assertIn("PROJECT PHASE: p1", out)
|
|
|
|
def test_agent_prompt_persistent_returns_inline_prompt(self):
|
|
out = agents.agent_prompt(self.cfg, self.cfg["agents"]["cl"])
|
|
self.assertEqual(out, "hi")
|
|
|
|
def test_role_model_phase_override(self):
|
|
# phase p2 overrides builder model to opus-x; advance index to 1
|
|
Path(agents.phase_idx_file(self.cfg)).write_text("1")
|
|
self.assertEqual(agents.role_model(self.cfg, self.cfg["agents"]["builder"]), "opus-x")
|
|
# adversary has no override → its configured/default model
|
|
self.assertEqual(agents.role_model(self.cfg, self.cfg["agents"]["adversary"]),
|
|
"claude-sonnet-4-6")
|
|
|
|
|
|
# ── phase machine ──────────────────────────────────────────────────────────────────
|
|
|
|
class TestPhaseMachine(unittest.TestCase):
|
|
def setUp(self):
|
|
self.tmp = tempfile.mkdtemp(prefix="aotest-ut-")
|
|
self.cfg = agents.load_config(_make_project(self.tmp))
|
|
self.md = Path(self.cfg["project_dir"]) / "machine-docs"
|
|
# monkeypatch the session-spawning hooks so the machine logic runs without tmux
|
|
self._orig = (agents.stop_loops, agents.start_loops, agents.handoff_reset)
|
|
self.calls = []
|
|
agents.stop_loops = lambda cfg: self.calls.append("stop")
|
|
agents.start_loops = lambda cfg: self.calls.append("start")
|
|
agents.handoff_reset = lambda: self.calls.append("reset")
|
|
|
|
def tearDown(self):
|
|
agents.stop_loops, agents.start_loops, agents.handoff_reset = self._orig
|
|
shutil.rmtree(self.tmp, ignore_errors=True)
|
|
|
|
def _status(self, basename, text):
|
|
(self.md / basename).write_text(text)
|
|
|
|
def test_phase_done_detects_marker(self):
|
|
self._status("STATUS-p1.md", "header\n## DONE\nall verified PASS\n")
|
|
self.assertTrue(agents.phase_done(self.cfg, "STATUS-p1.md"))
|
|
|
|
def test_phase_done_rejects_placeholder_body(self):
|
|
self._status("STATUS-p1.md", "## DONE\nnot yet — written here only when complete\n")
|
|
self.assertFalse(agents.phase_done(self.cfg, "STATUS-p1.md"))
|
|
|
|
def test_phase_done_false_when_no_marker(self):
|
|
self._status("STATUS-p1.md", "## In progress\nworking\n")
|
|
self.assertFalse(agents.phase_done(self.cfg, "STATUS-p1.md"))
|
|
|
|
def test_phase_done_false_when_file_missing(self):
|
|
self.assertFalse(agents.phase_done(self.cfg, "STATUS-nope.md"))
|
|
|
|
def test_cur_idx_reads_state_file(self):
|
|
Path(agents.phase_idx_file(self.cfg)).write_text("1")
|
|
self.assertEqual(agents.cur_idx(self.cfg), 1)
|
|
|
|
def test_advance_on_done(self):
|
|
Path(agents.phase_idx_file(self.cfg)).write_text("0")
|
|
self._status("STATUS-p1.md", "## DONE\nverified\n")
|
|
advanced = agents.phase_advance_check(self.cfg)
|
|
self.assertTrue(advanced)
|
|
self.assertEqual(agents.cur_idx(self.cfg), 1) # moved to p2
|
|
self.assertIn("stop", self.calls)
|
|
self.assertIn("start", self.calls)
|
|
|
|
def test_no_advance_when_not_done(self):
|
|
Path(agents.phase_idx_file(self.cfg)).write_text("0")
|
|
self._status("STATUS-p1.md", "## In progress\n")
|
|
self.assertFalse(agents.phase_advance_check(self.cfg))
|
|
self.assertEqual(agents.cur_idx(self.cfg), 0)
|
|
self.assertEqual(self.calls, [])
|
|
|
|
def test_sequence_complete_idempotent(self):
|
|
Path(agents.phase_idx_file(self.cfg)).write_text("1") # last phase
|
|
self._status("STATUS-p2.md", "## DONE\nverified\n")
|
|
marker = Path(self.cfg["log_dir"]) / "SEQUENCE-COMPLETE"
|
|
# first call: completes the sequence
|
|
self.assertTrue(agents.phase_advance_check(self.cfg))
|
|
self.assertTrue(marker.exists())
|
|
self.assertEqual(self.calls.count("stop"), 1)
|
|
# second call: idempotent — no re-stop, returns False
|
|
self.assertFalse(agents.phase_advance_check(self.cfg))
|
|
self.assertEqual(self.calls.count("stop"), 1)
|
|
|
|
def test_append_phase_clears_marker_and_resumes(self):
|
|
# simulate "sequence already complete", then a 3rd phase appended to the config
|
|
Path(agents.phase_idx_file(self.cfg)).write_text("1")
|
|
self._status("STATUS-p2.md", "## DONE\nverified\n")
|
|
marker = Path(self.cfg["log_dir"]) / "SEQUENCE-COMPLETE"
|
|
marker.write_text("stale completion\n")
|
|
self.cfg["loop"]["phases"].append(
|
|
{"id": "p3", "plan": "PLAN3.md", "status": "STATUS-p3.md"})
|
|
advanced = agents.phase_advance_check(self.cfg)
|
|
self.assertTrue(advanced)
|
|
self.assertEqual(agents.cur_idx(self.cfg), 2) # resumed onto p3
|
|
self.assertFalse(marker.exists()) # stale marker cleared
|
|
self.assertIn("start", self.calls)
|
|
|
|
def test_custom_done_marker(self):
|
|
self.cfg["loop"]["done_marker"] = "## SHIPPED"
|
|
self._status("STATUS-p1.md", "## SHIPPED\nverified\n")
|
|
self.assertTrue(agents.phase_done(self.cfg, "STATUS-p1.md"))
|
|
self.assertFalse(agents.phase_done(self.cfg, "STATUS-p2.md"))
|
|
|
|
|
|
# ── usage-limit banner reset parsing ───────────────────────────────────────────────
|
|
|
|
class TestLimitParsing(unittest.TestCase):
|
|
def setUp(self):
|
|
self.tmp = tempfile.mkdtemp(prefix="aotest-ut-")
|
|
self.cfg = agents.load_config(_make_project(self.tmp))
|
|
|
|
def tearDown(self):
|
|
shutil.rmtree(self.tmp, ignore_errors=True)
|
|
|
|
def test_parse_reset_pm(self):
|
|
ep = agents._parse_reset_epoch("You've hit your limit · resets at 10pm")
|
|
self.assertIsNotNone(ep)
|
|
self.assertEqual(datetime.fromtimestamp(ep).hour, 22)
|
|
|
|
def test_parse_reset_am_with_minutes(self):
|
|
ep = agents._parse_reset_epoch("resets 3:30am")
|
|
self.assertIsNotNone(ep)
|
|
dt = datetime.fromtimestamp(ep)
|
|
self.assertEqual((dt.hour, dt.minute), (3, 30))
|
|
|
|
def test_parse_reset_12am_is_midnight(self):
|
|
ep = agents._parse_reset_epoch("resets at 12am")
|
|
self.assertEqual(datetime.fromtimestamp(ep).hour, 0)
|
|
|
|
def test_parse_reset_invalid_hour_none(self):
|
|
self.assertIsNone(agents._parse_reset_epoch("resets at 25"))
|
|
|
|
def test_parse_reset_no_match_none(self):
|
|
self.assertIsNone(agents._parse_reset_epoch("everything is fine here"))
|
|
|
|
def test_parse_reset_picks_last_match(self):
|
|
ep = agents._parse_reset_epoch("resets at 9am ... actually resets at 11am")
|
|
self.assertEqual(datetime.fromtimestamp(ep).hour, 11)
|
|
|
|
def test_next_limit_until_unparsable_fallback(self):
|
|
now = time.time()
|
|
until, parsed = agents._next_limit_until(self.cfg, "limit reached, no time given", now)
|
|
self.assertFalse(parsed)
|
|
self.assertEqual(int(until), int(now + 300)) # limit_probe_fallback
|
|
|
|
def test_next_limit_until_within_window_uses_banner(self):
|
|
now = time.time()
|
|
t = datetime.now() + timedelta(hours=2)
|
|
h12 = t.hour % 12 or 12
|
|
ampm = "am" if t.hour < 12 else "pm"
|
|
banner = f"weekly limit · resets at {h12}:{t.minute:02d}{ampm}"
|
|
until, parsed = agents._next_limit_until(self.cfg, banner, now)
|
|
self.assertTrue(parsed)
|
|
self.assertGreater(until, now)
|
|
self.assertLessEqual(until - now, 6 * 3600 + 60) # within 6h window (+slack)
|
|
|
|
def test_next_limit_until_far_future_falls_back(self):
|
|
now = time.time()
|
|
t = datetime.now() + timedelta(hours=7) # > 6h window
|
|
h12 = t.hour % 12 or 12
|
|
ampm = "am" if t.hour < 12 else "pm"
|
|
banner = f"limit · resets at {h12}:{t.minute:02d}{ampm}"
|
|
until, parsed = agents._next_limit_until(self.cfg, banner, now)
|
|
self.assertFalse(parsed)
|
|
self.assertEqual(int(until), int(now + 300))
|
|
|
|
|
|
# ── stall / WAITING-UNTIL parsing ──────────────────────────────────────────────────
|
|
|
|
class TestWaitingUntil(unittest.TestCase):
|
|
def setUp(self):
|
|
self.tmp = tempfile.mkdtemp(prefix="aotest-ut-")
|
|
self.cfg = agents.load_config(_make_project(self.tmp))
|
|
self.claude_agent = self.cfg["agents"]["cl"] # non-footer backend
|
|
self.oc_agent = self.cfg["agents"]["oc"] # footer_ui backend
|
|
|
|
def tearDown(self):
|
|
shutil.rmtree(self.tmp, ignore_errors=True)
|
|
|
|
def test_non_footer_finds_marker_anywhere(self):
|
|
pane = "blah blah\nWAITING-UNTIL: 2030-06-13T12:00:00Z\nmore output after\n"
|
|
ep = agents._parse_waiting_until(self.cfg, self.claude_agent, pane)
|
|
self.assertIsNotNone(ep)
|
|
self.assertEqual(ep, datetime.fromisoformat("2030-06-13T12:00:00+00:00").timestamp())
|
|
|
|
def test_non_footer_none_without_marker(self):
|
|
self.assertIsNone(agents._parse_waiting_until(
|
|
self.cfg, self.claude_agent, "just working, no marker"))
|
|
|
|
def test_footer_requires_marker_as_last_line(self):
|
|
# marker present but NOT the last non-empty line → ignored for a footer UI
|
|
pane = "WAITING-UNTIL: 2030-06-13T12:00:00Z\n ▣ Build · GPT · 2m 19s\n"
|
|
self.assertIsNone(agents._parse_waiting_until(self.cfg, self.oc_agent, pane))
|
|
|
|
def test_footer_honors_marker_when_last_line(self):
|
|
pane = "some work\nWAITING-UNTIL: 2030-06-13T12:00:00Z\n\n"
|
|
ep = agents._parse_waiting_until(self.cfg, self.oc_agent, pane)
|
|
self.assertIsNotNone(ep)
|
|
|
|
def test_bad_timestamp_none(self):
|
|
self.assertIsNone(agents._parse_waiting_until(
|
|
self.cfg, self.claude_agent, "WAITING-UNTIL: not-a-time"))
|
|
|
|
|
|
# ── backend activity detectors (claude + opencode footers) ──────────────────────────
|
|
|
|
class TestActivityDetection(unittest.TestCase):
|
|
def setUp(self):
|
|
self.tmp = tempfile.mkdtemp(prefix="aotest-ut-")
|
|
self.cfg = agents.load_config(_make_project(self.tmp))
|
|
self.claude_agent = self.cfg["agents"]["cl"]
|
|
self.oc_agent = self.cfg["agents"]["oc"]
|
|
|
|
def tearDown(self):
|
|
shutil.rmtree(self.tmp, ignore_errors=True)
|
|
|
|
# claude: non-footer, active_re matched anywhere in the pane
|
|
def test_claude_active_esc_to_interrupt(self):
|
|
self.assertTrue(agents.pane_active(
|
|
self.cfg, self.claude_agent, "thinking...\n esc to interrupt", use_log=False))
|
|
|
|
def test_claude_active_running_tool(self):
|
|
self.assertTrue(agents.pane_active(
|
|
self.cfg, self.claude_agent, "Running tool: Bash", use_log=False))
|
|
|
|
def test_claude_active_spinner_dot_count(self):
|
|
self.assertTrue(agents.pane_active(
|
|
self.cfg, self.claude_agent, "Compiling · 137 tokens", use_log=False))
|
|
|
|
def test_claude_idle_is_not_active(self):
|
|
self.assertFalse(agents.pane_active(
|
|
self.cfg, self.claude_agent, "Done.\n> ", use_log=False))
|
|
|
|
# opencode: footer_ui — only the bottom rows count as activity
|
|
def test_opencode_active_footer(self):
|
|
pane = "~ Preparing patch...\n ⬝⬝■ esc interrupt 137.6K\n"
|
|
self.assertTrue(agents.pane_active(self.cfg, self.oc_agent, pane, use_log=False))
|
|
|
|
def test_opencode_idle_footer_not_active(self):
|
|
pane = " ▣ Build · GPT-5.4 · 2m 19s\n 178.4K (17%) ctrl+p commands\n"
|
|
self.assertFalse(agents.pane_active(self.cfg, self.oc_agent, pane, use_log=False))
|
|
|
|
def test_opencode_active_only_at_top_is_ignored(self):
|
|
# active marker far above the bottom 10 lines → a footer UI ignores it
|
|
pane = "running tool now\n" + "\n".join(f"line {i}" for i in range(20)) + \
|
|
"\n ▣ Build · GPT · idle\n"
|
|
self.assertFalse(agents.pane_active(self.cfg, self.oc_agent, pane, use_log=False))
|
|
|
|
def test_opencode_log_grace_fallback(self):
|
|
# idle footer, but a freshly-touched session log within the grace window → active
|
|
idle = " ▣ Build · GPT · idle\n 178K (17%) ctrl+p\n"
|
|
logp = agents._session_log_path(self.cfg, self.oc_agent["session"])
|
|
logp.parent.mkdir(parents=True, exist_ok=True)
|
|
logp.write_text("recent activity\n") # mtime = now
|
|
self.assertTrue(agents.pane_active(self.cfg, self.oc_agent, idle, use_log=True))
|
|
# remove the log → no fallback → idle footer reads as not active
|
|
logp.unlink()
|
|
self.assertFalse(agents.pane_active(self.cfg, self.oc_agent, idle, use_log=True))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main(verbosity=2)
|