test: add tests/ — unit suite + isolated live claude/opencode smokes + runner
Unit tests (no agents/tmux): config load + defaults merge, kickoff-template assembly, phase machine (advance/idempotent-complete/append-resumes), limit reset-banner parsing, WAITING-UNTIL/stall parsing, claude+opencode activity detectors. Live smokes bring a throwaway project up THROUGH agents.py on each real backend in an isolated sandbox (unique prefix, opencode on a non-4096 port), verify attach + status + down, and clean up. tests/run.sh runs unit always + smokes when backends present; README documents it. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
526
tests/test_unit.py
Executable file
526
tests/test_unit.py
Executable file
@ -0,0 +1,526 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Unit tests for the agent-orchestrator harness (agents.py).
|
||||
|
||||
Pure-logic tests — NO agent CLIs spawned, NO live tmux sessions created. Every test builds a
|
||||
throwaway config + fixture files in a tempdir and exercises the harness functions directly.
|
||||
The one function that would spawn sessions (phase_advance_check → start/stop_loops) is tested
|
||||
with those two hooks monkeypatched to recorders, so the phase-machine *logic* is covered without
|
||||
launching anything.
|
||||
|
||||
Run: python3 -m unittest tests.test_unit (from repo root)
|
||||
or python3 tests/test_unit.py
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import textwrap
|
||||
import tempfile
|
||||
import shutil
|
||||
import unittest
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
|
||||
REPO_ROOT = Path(__file__).resolve().parent.parent
|
||||
sys.path.insert(0, str(REPO_ROOT))
|
||||
import agents # noqa: E402
|
||||
|
||||
|
||||
# ── shared fixture config ────────────────────────────────────────────────────────
|
||||
|
||||
BASE_TOML = r"""
|
||||
[watchdog]
|
||||
signal_interval = 30
|
||||
heavy_interval = 300
|
||||
limit_probe_fallback = 300
|
||||
limit_reset_slack = 45
|
||||
stall_grace = 180
|
||||
|
||||
[defaults]
|
||||
session_prefix = "aotest-ut-"
|
||||
log_dir = "state"
|
||||
backend = "claude"
|
||||
model = "claude-sonnet-4-6"
|
||||
watch = "none"
|
||||
|
||||
[backend.claude]
|
||||
bin = "claude"
|
||||
flags = "--dangerously-skip-permissions"
|
||||
remote_control = true
|
||||
supports_resume = true
|
||||
prompt_delivery = "arg"
|
||||
process_name = "claude"
|
||||
submit_key = "Enter"
|
||||
stall_idle = 300
|
||||
active_re = "esc to interrupt|Running tool|\\u00b7 \\d+"
|
||||
limit_re = "spend limit|usage limit|limit reached|reached your .*limit|out of (credits|tokens)"
|
||||
fatal_re = "redacted_thinking|blocks cannot be modified"
|
||||
|
||||
[backend.opencode]
|
||||
bin = "opencode"
|
||||
attach = "{bin} attach {server} --dir {dir}"
|
||||
server = "http://127.0.0.1:4096"
|
||||
supports_resume = false
|
||||
prompt_delivery = "ping"
|
||||
process_name = "opencode"
|
||||
footer_ui = true
|
||||
log_grace = 180
|
||||
connect_delay = 12
|
||||
submit_key = "C-m"
|
||||
stall_idle = 900
|
||||
active_re = "esc interrupt|thinking|inferring|running tool|tool call|preparing patch|reading|searching"
|
||||
limit_re = "usage limit|limit reached"
|
||||
|
||||
[backend.demo]
|
||||
bin = "echo up; exec sleep 100000"
|
||||
prompt_delivery = "exec"
|
||||
|
||||
[[agent]]
|
||||
name = "builder"
|
||||
kind = "loop"
|
||||
role = "builder"
|
||||
backend = "demo"
|
||||
|
||||
[[agent]]
|
||||
name = "adversary"
|
||||
kind = "loop"
|
||||
role = "adversary"
|
||||
backend = "demo"
|
||||
|
||||
[[agent]]
|
||||
name = "cl"
|
||||
kind = "persistent"
|
||||
backend = "claude"
|
||||
prompt = "hi"
|
||||
|
||||
[[agent]]
|
||||
name = "oc"
|
||||
kind = "persistent"
|
||||
backend = "opencode"
|
||||
prompt = "hi"
|
||||
|
||||
[[agent]]
|
||||
name = "custom"
|
||||
kind = "persistent"
|
||||
session = "explicit-session"
|
||||
model = "override-model"
|
||||
dir = "/abs/somewhere"
|
||||
backend = "demo"
|
||||
prompt = "x"
|
||||
|
||||
[[service]]
|
||||
name = "svc"
|
||||
command = "sleep 1"
|
||||
|
||||
[loop]
|
||||
state_file = "phase-idx"
|
||||
resume_phase = true
|
||||
auto_advance = true
|
||||
done_marker = "## DONE"
|
||||
kickoff_template = "prompts/kickoff.md"
|
||||
roles_dir = "prompts"
|
||||
handoff = { repo = ".", claim_pings = "adversary", review_pings = "builder", inboxes = ["ADVERSARY-INBOX.md", "BUILDER-INBOX.md"], state_subdir = "machine-docs" }
|
||||
phases = [
|
||||
{ id = "p1", plan = "PLAN1.md", status = "STATUS-p1.md" },
|
||||
{ id = "p2", plan = "PLAN2.md", status = "STATUS-p2.md", models = { builder = "opus-x" } },
|
||||
]
|
||||
"""
|
||||
|
||||
KICKOFF_TMPL = "*** PROJECT PHASE: {phase_id} ***\nPLAN: {plan}\nSTATUS: {status}\nROLE: {role}\n---\n"
|
||||
BUILDER_PROMPT = "You are the **Builder** agent. (builder role body marker)\n"
|
||||
ADVERSARY_PROMPT = "You are the **Adversary** agent. (adversary role body marker)\n"
|
||||
|
||||
|
||||
def _make_project(tmp, toml=BASE_TOML):
|
||||
"""Write a self-contained project (config + prompts + machine-docs) into tmp; return cfg path."""
|
||||
root = Path(tmp)
|
||||
(root / "prompts").mkdir(parents=True, exist_ok=True)
|
||||
(root / "machine-docs").mkdir(parents=True, exist_ok=True)
|
||||
(root / "prompts" / "kickoff.md").write_text(KICKOFF_TMPL)
|
||||
(root / "prompts" / "builder.md").write_text(BUILDER_PROMPT)
|
||||
(root / "prompts" / "adversary.md").write_text(ADVERSARY_PROMPT)
|
||||
cfg_path = root / "agents.toml"
|
||||
cfg_path.write_text(toml)
|
||||
return cfg_path
|
||||
|
||||
|
||||
# ── config loading + defaults merge ────────────────────────────────────────────────
|
||||
|
||||
class TestConfigLoad(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.tmp = tempfile.mkdtemp(prefix="aotest-ut-")
|
||||
self.cfg_path = _make_project(self.tmp)
|
||||
self.cfg = agents.load_config(self.cfg_path)
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.tmp, ignore_errors=True)
|
||||
|
||||
def test_defaults_merge_into_agents(self):
|
||||
b = self.cfg["agents"]["builder"]
|
||||
self.assertEqual(b["session_prefix"], "aotest-ut-")
|
||||
self.assertEqual(b["watch"], "none") # from defaults
|
||||
self.assertEqual(b["kind"], "loop") # explicit
|
||||
|
||||
def test_session_name_defaults_to_prefix_plus_name(self):
|
||||
self.assertEqual(self.cfg["agents"]["builder"]["session"], "aotest-ut-builder")
|
||||
|
||||
def test_explicit_session_overrides_prefix(self):
|
||||
self.assertEqual(self.cfg["agents"]["custom"]["session"], "explicit-session")
|
||||
|
||||
def test_per_agent_override_wins_over_default(self):
|
||||
# default model is claude-sonnet-4-6; custom overrides
|
||||
self.assertEqual(self.cfg["agents"]["custom"]["model"], "override-model")
|
||||
self.assertEqual(self.cfg["agents"]["builder"]["model"], "claude-sonnet-4-6")
|
||||
|
||||
def test_relative_dir_resolved_against_project_root(self):
|
||||
# builder has no dir → defaults dir "." → project_dir
|
||||
self.assertEqual(self.cfg["agents"]["builder"]["dir"], self.cfg["project_dir"])
|
||||
|
||||
def test_absolute_dir_kept(self):
|
||||
self.assertEqual(self.cfg["agents"]["custom"]["dir"], "/abs/somewhere")
|
||||
|
||||
def test_log_dir_and_state_dir_resolved(self):
|
||||
self.assertEqual(self.cfg["log_dir"], str(Path(self.cfg["project_dir"]) / "state"))
|
||||
self.assertEqual(self.cfg["state_dir"], os.path.join(self.cfg["log_dir"], "state"))
|
||||
self.assertTrue(Path(self.cfg["state_dir"]).is_dir()) # created on load
|
||||
|
||||
def test_service_session_named(self):
|
||||
self.assertIn("svc", self.cfg["services"])
|
||||
self.assertEqual(self.cfg["services"]["svc"]["session"], "aotest-ut-svc")
|
||||
|
||||
def test_backend_of_resolves(self):
|
||||
b = agents.backend_of(self.cfg, self.cfg["agents"]["cl"])
|
||||
self.assertEqual(b["prompt_delivery"], "arg")
|
||||
self.assertEqual(b["submit_key"], "Enter")
|
||||
|
||||
def test_backend_of_unknown_dies(self):
|
||||
a = dict(self.cfg["agents"]["cl"]); a["backend"] = "nope"
|
||||
with self.assertRaises(SystemExit):
|
||||
agents.backend_of(self.cfg, a)
|
||||
|
||||
def test_missing_session_prefix_dies(self):
|
||||
bad = self.tmp + "/bad1"
|
||||
p = _make_project(bad, toml='[defaults]\nlog_dir = "state"\n')
|
||||
with self.assertRaises(SystemExit):
|
||||
agents.load_config(p)
|
||||
|
||||
def test_missing_log_dir_dies(self):
|
||||
bad = self.tmp + "/bad2"
|
||||
p = _make_project(bad, toml='[defaults]\nsession_prefix = "x-"\n')
|
||||
with self.assertRaises(SystemExit):
|
||||
agents.load_config(p)
|
||||
|
||||
def test_env_override_model_single_invocation(self):
|
||||
os.environ["AGENT_MODEL_cl"] = "env-only-model"
|
||||
try:
|
||||
cfg2 = agents.load_config(self.cfg_path)
|
||||
self.assertEqual(cfg2["agents"]["cl"]["model"], "env-only-model")
|
||||
finally:
|
||||
del os.environ["AGENT_MODEL_cl"]
|
||||
# without the env var the file value stands again
|
||||
cfg3 = agents.load_config(self.cfg_path)
|
||||
self.assertEqual(cfg3["agents"]["cl"]["model"], "claude-sonnet-4-6")
|
||||
|
||||
|
||||
class TestExampleConfig(unittest.TestCase):
|
||||
"""The SHIPPED agents.example.toml must parse and define the documented shape."""
|
||||
def test_example_config_loads(self):
|
||||
ex = REPO_ROOT / "agents.example.toml"
|
||||
self.assertTrue(ex.exists(), "agents.example.toml missing from repo")
|
||||
cfg = agents.load_config(ex)
|
||||
self.assertIn("builder", cfg["agents"])
|
||||
self.assertIn("adversary", cfg["agents"])
|
||||
for be in ("demo", "claude", "opencode"):
|
||||
self.assertIn(be, cfg["backends"], f"backend {be} missing from example")
|
||||
self.assertEqual(len(agents.phases(cfg)), 2)
|
||||
|
||||
|
||||
# ── kickoff-template assembly ──────────────────────────────────────────────────────
|
||||
|
||||
class TestKickoff(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.tmp = tempfile.mkdtemp(prefix="aotest-ut-")
|
||||
self.cfg = agents.load_config(_make_project(self.tmp))
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.tmp, ignore_errors=True)
|
||||
|
||||
def test_kickoff_renders_slots_and_appends_role(self):
|
||||
out = agents.build_loop_kickoff(self.cfg, self.cfg["agents"]["builder"])
|
||||
self.assertIn("PROJECT PHASE: p1", out) # phase_id slot filled (phase idx 0)
|
||||
self.assertIn("PLAN: PLAN1.md", out)
|
||||
self.assertIn("STATUS: STATUS-p1.md", out)
|
||||
self.assertIn("ROLE: builder", out)
|
||||
self.assertIn("builder role body marker", out) # role prompt appended
|
||||
self.assertNotIn("{phase_id}", out) # no unrendered slot
|
||||
self.assertNotIn("{role}", out)
|
||||
|
||||
def test_kickoff_picks_correct_role_prompt(self):
|
||||
out = agents.build_loop_kickoff(self.cfg, self.cfg["agents"]["adversary"])
|
||||
self.assertIn("adversary role body marker", out)
|
||||
self.assertNotIn("builder role body marker", out)
|
||||
|
||||
def test_agent_prompt_loop_returns_kickoff(self):
|
||||
out = agents.agent_prompt(self.cfg, self.cfg["agents"]["builder"])
|
||||
self.assertIn("PROJECT PHASE: p1", out)
|
||||
|
||||
def test_agent_prompt_persistent_returns_inline_prompt(self):
|
||||
out = agents.agent_prompt(self.cfg, self.cfg["agents"]["cl"])
|
||||
self.assertEqual(out, "hi")
|
||||
|
||||
def test_role_model_phase_override(self):
|
||||
# phase p2 overrides builder model to opus-x; advance index to 1
|
||||
Path(agents.phase_idx_file(self.cfg)).write_text("1")
|
||||
self.assertEqual(agents.role_model(self.cfg, self.cfg["agents"]["builder"]), "opus-x")
|
||||
# adversary has no override → its configured/default model
|
||||
self.assertEqual(agents.role_model(self.cfg, self.cfg["agents"]["adversary"]),
|
||||
"claude-sonnet-4-6")
|
||||
|
||||
|
||||
# ── phase machine ──────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestPhaseMachine(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.tmp = tempfile.mkdtemp(prefix="aotest-ut-")
|
||||
self.cfg = agents.load_config(_make_project(self.tmp))
|
||||
self.md = Path(self.cfg["project_dir"]) / "machine-docs"
|
||||
# monkeypatch the session-spawning hooks so the machine logic runs without tmux
|
||||
self._orig = (agents.stop_loops, agents.start_loops, agents.handoff_reset)
|
||||
self.calls = []
|
||||
agents.stop_loops = lambda cfg: self.calls.append("stop")
|
||||
agents.start_loops = lambda cfg: self.calls.append("start")
|
||||
agents.handoff_reset = lambda: self.calls.append("reset")
|
||||
|
||||
def tearDown(self):
|
||||
agents.stop_loops, agents.start_loops, agents.handoff_reset = self._orig
|
||||
shutil.rmtree(self.tmp, ignore_errors=True)
|
||||
|
||||
def _status(self, basename, text):
|
||||
(self.md / basename).write_text(text)
|
||||
|
||||
def test_phase_done_detects_marker(self):
|
||||
self._status("STATUS-p1.md", "header\n## DONE\nall verified PASS\n")
|
||||
self.assertTrue(agents.phase_done(self.cfg, "STATUS-p1.md"))
|
||||
|
||||
def test_phase_done_rejects_placeholder_body(self):
|
||||
self._status("STATUS-p1.md", "## DONE\nnot yet — written here only when complete\n")
|
||||
self.assertFalse(agents.phase_done(self.cfg, "STATUS-p1.md"))
|
||||
|
||||
def test_phase_done_false_when_no_marker(self):
|
||||
self._status("STATUS-p1.md", "## In progress\nworking\n")
|
||||
self.assertFalse(agents.phase_done(self.cfg, "STATUS-p1.md"))
|
||||
|
||||
def test_phase_done_false_when_file_missing(self):
|
||||
self.assertFalse(agents.phase_done(self.cfg, "STATUS-nope.md"))
|
||||
|
||||
def test_cur_idx_reads_state_file(self):
|
||||
Path(agents.phase_idx_file(self.cfg)).write_text("1")
|
||||
self.assertEqual(agents.cur_idx(self.cfg), 1)
|
||||
|
||||
def test_advance_on_done(self):
|
||||
Path(agents.phase_idx_file(self.cfg)).write_text("0")
|
||||
self._status("STATUS-p1.md", "## DONE\nverified\n")
|
||||
advanced = agents.phase_advance_check(self.cfg)
|
||||
self.assertTrue(advanced)
|
||||
self.assertEqual(agents.cur_idx(self.cfg), 1) # moved to p2
|
||||
self.assertIn("stop", self.calls)
|
||||
self.assertIn("start", self.calls)
|
||||
|
||||
def test_no_advance_when_not_done(self):
|
||||
Path(agents.phase_idx_file(self.cfg)).write_text("0")
|
||||
self._status("STATUS-p1.md", "## In progress\n")
|
||||
self.assertFalse(agents.phase_advance_check(self.cfg))
|
||||
self.assertEqual(agents.cur_idx(self.cfg), 0)
|
||||
self.assertEqual(self.calls, [])
|
||||
|
||||
def test_sequence_complete_idempotent(self):
|
||||
Path(agents.phase_idx_file(self.cfg)).write_text("1") # last phase
|
||||
self._status("STATUS-p2.md", "## DONE\nverified\n")
|
||||
marker = Path(self.cfg["log_dir"]) / "SEQUENCE-COMPLETE"
|
||||
# first call: completes the sequence
|
||||
self.assertTrue(agents.phase_advance_check(self.cfg))
|
||||
self.assertTrue(marker.exists())
|
||||
self.assertEqual(self.calls.count("stop"), 1)
|
||||
# second call: idempotent — no re-stop, returns False
|
||||
self.assertFalse(agents.phase_advance_check(self.cfg))
|
||||
self.assertEqual(self.calls.count("stop"), 1)
|
||||
|
||||
def test_append_phase_clears_marker_and_resumes(self):
|
||||
# simulate "sequence already complete", then a 3rd phase appended to the config
|
||||
Path(agents.phase_idx_file(self.cfg)).write_text("1")
|
||||
self._status("STATUS-p2.md", "## DONE\nverified\n")
|
||||
marker = Path(self.cfg["log_dir"]) / "SEQUENCE-COMPLETE"
|
||||
marker.write_text("stale completion\n")
|
||||
self.cfg["loop"]["phases"].append(
|
||||
{"id": "p3", "plan": "PLAN3.md", "status": "STATUS-p3.md"})
|
||||
advanced = agents.phase_advance_check(self.cfg)
|
||||
self.assertTrue(advanced)
|
||||
self.assertEqual(agents.cur_idx(self.cfg), 2) # resumed onto p3
|
||||
self.assertFalse(marker.exists()) # stale marker cleared
|
||||
self.assertIn("start", self.calls)
|
||||
|
||||
def test_custom_done_marker(self):
|
||||
self.cfg["loop"]["done_marker"] = "## SHIPPED"
|
||||
self._status("STATUS-p1.md", "## SHIPPED\nverified\n")
|
||||
self.assertTrue(agents.phase_done(self.cfg, "STATUS-p1.md"))
|
||||
self.assertFalse(agents.phase_done(self.cfg, "STATUS-p2.md"))
|
||||
|
||||
|
||||
# ── usage-limit banner reset parsing ───────────────────────────────────────────────
|
||||
|
||||
class TestLimitParsing(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.tmp = tempfile.mkdtemp(prefix="aotest-ut-")
|
||||
self.cfg = agents.load_config(_make_project(self.tmp))
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.tmp, ignore_errors=True)
|
||||
|
||||
def test_parse_reset_pm(self):
|
||||
ep = agents._parse_reset_epoch("You've hit your limit · resets at 10pm")
|
||||
self.assertIsNotNone(ep)
|
||||
self.assertEqual(datetime.fromtimestamp(ep).hour, 22)
|
||||
|
||||
def test_parse_reset_am_with_minutes(self):
|
||||
ep = agents._parse_reset_epoch("resets 3:30am")
|
||||
self.assertIsNotNone(ep)
|
||||
dt = datetime.fromtimestamp(ep)
|
||||
self.assertEqual((dt.hour, dt.minute), (3, 30))
|
||||
|
||||
def test_parse_reset_12am_is_midnight(self):
|
||||
ep = agents._parse_reset_epoch("resets at 12am")
|
||||
self.assertEqual(datetime.fromtimestamp(ep).hour, 0)
|
||||
|
||||
def test_parse_reset_invalid_hour_none(self):
|
||||
self.assertIsNone(agents._parse_reset_epoch("resets at 25"))
|
||||
|
||||
def test_parse_reset_no_match_none(self):
|
||||
self.assertIsNone(agents._parse_reset_epoch("everything is fine here"))
|
||||
|
||||
def test_parse_reset_picks_last_match(self):
|
||||
ep = agents._parse_reset_epoch("resets at 9am ... actually resets at 11am")
|
||||
self.assertEqual(datetime.fromtimestamp(ep).hour, 11)
|
||||
|
||||
def test_next_limit_until_unparsable_fallback(self):
|
||||
now = time.time()
|
||||
until, parsed = agents._next_limit_until(self.cfg, "limit reached, no time given", now)
|
||||
self.assertFalse(parsed)
|
||||
self.assertEqual(int(until), int(now + 300)) # limit_probe_fallback
|
||||
|
||||
def test_next_limit_until_within_window_uses_banner(self):
|
||||
now = time.time()
|
||||
t = datetime.now() + timedelta(hours=2)
|
||||
h12 = t.hour % 12 or 12
|
||||
ampm = "am" if t.hour < 12 else "pm"
|
||||
banner = f"weekly limit · resets at {h12}:{t.minute:02d}{ampm}"
|
||||
until, parsed = agents._next_limit_until(self.cfg, banner, now)
|
||||
self.assertTrue(parsed)
|
||||
self.assertGreater(until, now)
|
||||
self.assertLessEqual(until - now, 6 * 3600 + 60) # within 6h window (+slack)
|
||||
|
||||
def test_next_limit_until_far_future_falls_back(self):
|
||||
now = time.time()
|
||||
t = datetime.now() + timedelta(hours=7) # > 6h window
|
||||
h12 = t.hour % 12 or 12
|
||||
ampm = "am" if t.hour < 12 else "pm"
|
||||
banner = f"limit · resets at {h12}:{t.minute:02d}{ampm}"
|
||||
until, parsed = agents._next_limit_until(self.cfg, banner, now)
|
||||
self.assertFalse(parsed)
|
||||
self.assertEqual(int(until), int(now + 300))
|
||||
|
||||
|
||||
# ── stall / WAITING-UNTIL parsing ──────────────────────────────────────────────────
|
||||
|
||||
class TestWaitingUntil(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.tmp = tempfile.mkdtemp(prefix="aotest-ut-")
|
||||
self.cfg = agents.load_config(_make_project(self.tmp))
|
||||
self.claude_agent = self.cfg["agents"]["cl"] # non-footer backend
|
||||
self.oc_agent = self.cfg["agents"]["oc"] # footer_ui backend
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.tmp, ignore_errors=True)
|
||||
|
||||
def test_non_footer_finds_marker_anywhere(self):
|
||||
pane = "blah blah\nWAITING-UNTIL: 2030-06-13T12:00:00Z\nmore output after\n"
|
||||
ep = agents._parse_waiting_until(self.cfg, self.claude_agent, pane)
|
||||
self.assertIsNotNone(ep)
|
||||
self.assertEqual(ep, datetime.fromisoformat("2030-06-13T12:00:00+00:00").timestamp())
|
||||
|
||||
def test_non_footer_none_without_marker(self):
|
||||
self.assertIsNone(agents._parse_waiting_until(
|
||||
self.cfg, self.claude_agent, "just working, no marker"))
|
||||
|
||||
def test_footer_requires_marker_as_last_line(self):
|
||||
# marker present but NOT the last non-empty line → ignored for a footer UI
|
||||
pane = "WAITING-UNTIL: 2030-06-13T12:00:00Z\n ▣ Build · GPT · 2m 19s\n"
|
||||
self.assertIsNone(agents._parse_waiting_until(self.cfg, self.oc_agent, pane))
|
||||
|
||||
def test_footer_honors_marker_when_last_line(self):
|
||||
pane = "some work\nWAITING-UNTIL: 2030-06-13T12:00:00Z\n\n"
|
||||
ep = agents._parse_waiting_until(self.cfg, self.oc_agent, pane)
|
||||
self.assertIsNotNone(ep)
|
||||
|
||||
def test_bad_timestamp_none(self):
|
||||
self.assertIsNone(agents._parse_waiting_until(
|
||||
self.cfg, self.claude_agent, "WAITING-UNTIL: not-a-time"))
|
||||
|
||||
|
||||
# ── backend activity detectors (claude + opencode footers) ──────────────────────────
|
||||
|
||||
class TestActivityDetection(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.tmp = tempfile.mkdtemp(prefix="aotest-ut-")
|
||||
self.cfg = agents.load_config(_make_project(self.tmp))
|
||||
self.claude_agent = self.cfg["agents"]["cl"]
|
||||
self.oc_agent = self.cfg["agents"]["oc"]
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.tmp, ignore_errors=True)
|
||||
|
||||
# claude: non-footer, active_re matched anywhere in the pane
|
||||
def test_claude_active_esc_to_interrupt(self):
|
||||
self.assertTrue(agents.pane_active(
|
||||
self.cfg, self.claude_agent, "thinking...\n esc to interrupt", use_log=False))
|
||||
|
||||
def test_claude_active_running_tool(self):
|
||||
self.assertTrue(agents.pane_active(
|
||||
self.cfg, self.claude_agent, "Running tool: Bash", use_log=False))
|
||||
|
||||
def test_claude_active_spinner_dot_count(self):
|
||||
self.assertTrue(agents.pane_active(
|
||||
self.cfg, self.claude_agent, "Compiling · 137 tokens", use_log=False))
|
||||
|
||||
def test_claude_idle_is_not_active(self):
|
||||
self.assertFalse(agents.pane_active(
|
||||
self.cfg, self.claude_agent, "Done.\n> ", use_log=False))
|
||||
|
||||
# opencode: footer_ui — only the bottom rows count as activity
|
||||
def test_opencode_active_footer(self):
|
||||
pane = "~ Preparing patch...\n ⬝⬝■ esc interrupt 137.6K\n"
|
||||
self.assertTrue(agents.pane_active(self.cfg, self.oc_agent, pane, use_log=False))
|
||||
|
||||
def test_opencode_idle_footer_not_active(self):
|
||||
pane = " ▣ Build · GPT-5.4 · 2m 19s\n 178.4K (17%) ctrl+p commands\n"
|
||||
self.assertFalse(agents.pane_active(self.cfg, self.oc_agent, pane, use_log=False))
|
||||
|
||||
def test_opencode_active_only_at_top_is_ignored(self):
|
||||
# active marker far above the bottom 10 lines → a footer UI ignores it
|
||||
pane = "running tool now\n" + "\n".join(f"line {i}" for i in range(20)) + \
|
||||
"\n ▣ Build · GPT · idle\n"
|
||||
self.assertFalse(agents.pane_active(self.cfg, self.oc_agent, pane, use_log=False))
|
||||
|
||||
def test_opencode_log_grace_fallback(self):
|
||||
# idle footer, but a freshly-touched session log within the grace window → active
|
||||
idle = " ▣ Build · GPT · idle\n 178K (17%) ctrl+p\n"
|
||||
logp = agents._session_log_path(self.cfg, self.oc_agent["session"])
|
||||
logp.parent.mkdir(parents=True, exist_ok=True)
|
||||
logp.write_text("recent activity\n") # mtime = now
|
||||
self.assertTrue(agents.pane_active(self.cfg, self.oc_agent, idle, use_log=True))
|
||||
# remove the log → no fallback → idle footer reads as not active
|
||||
logp.unlink()
|
||||
self.assertFalse(agents.pane_active(self.cfg, self.oc_agent, idle, use_log=True))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main(verbosity=2)
|
||||
Reference in New Issue
Block a user