"""L5 lint rung — run `abra recipe lint` against the exact ref under test (phase lvl5).

Executor + classifier for the fifth ladder rung. Design constraints (plan-phase-lvl5 §2):

- **Lints the recipe's CONTENT, not the harness plumbing.** abra lint reads every
  `compose*.yml` in the tree (including the CI's untracked install_steps overlays) and
  force-fetches tags from `origin` (which on PR runs is the private mirror, unauthenticated
  here → FATA). Both are harness artifacts, so the executor lints a PRISTINE scratch clone of
  the per-run tree, checked out at the exact tested ref: `origin` becomes a local path (tag
  fetch works offline, no auth) and the run's true tag set rides along (fetch_recipe pulls the
  upstream version tags into the per-run tree). No lint rule is filtered or ignored.
- **rc is not the verdict.** `abra recipe lint` exits non-zero only when it cannot lint
  (FATA); rule outcomes live in its table — error-severity ❌ rows print a trailing
  "WARN critical errors present …" sentinel but still exit 0. So the classifier parses the
  table: FAIL iff an error-severity rule is unsatisfied (or the FATA is content-attributable:
  "unable to validate recipe" — the recipe config itself is invalid). PASS iff the table
  rendered and no error rule failed. ANYTHING else — timeout, abra/script missing, tag-fetch
  FATA, unparseable output — is "unver": loud, never a silent pass, never an intentional skip.
- **Best-effort + time-bounded.** Hard ~60s timeout (observed runtime ≈0.7s); the caller
  also wraps run_lint in try/except — a wedged lint can never hang or fail a run, and the
  run VERDICT is untouched by any lint outcome (lint is a level rung, not a gate).
- Full command output (+ cmd, rc, ref header) is captured to `lint.txt` in the run artifact
  dir; results.json carries status + short excerpt (failing rule ids).

abra needs a PTY even with -n ("inappropriate ioctl on device") → run via util-linux
`script -qec`, same trick as harness.abra._run_pty.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import re
|
|
import shlex
|
|
import shutil
|
|
import subprocess
|
|
import tempfile
|
|
|
|
from . import abra
|
|
|
|
LINT_TIMEOUT = 60  # hard budget, seconds; observed ~0.7s per recipe

# Strip terminal escape sequences from PTY output before parsing. Two families are removed:
#   - CSI: ESC [ <parameter bytes 0x30-0x3F> <intermediate bytes 0x20-0x2F> <final 0x40-0x7E>
#     (colours, cursor movement, mode switches such as ESC[?25l);
#   - OSC: ESC ] <payload> terminated by BEL or by ST (ESC \)
#     (window titles, hyperlinks, terminal colour queries).
# Matching only a narrow CSI subset would leave the other sequences embedded in the text,
# where they can sit inside a table row or a sentinel phrase and defeat the parse.
_ANSI = re.compile(r"\x1b(?:\[[0-?]*[ -/]*[@-~]|\][^\x07\x1b]*(?:\x07|\x1b\\))")

# A table row: ┃ R014 ┃ description ┃ error ┃ ✅/❌ ┃ skipped ┃ how-to-fix ┃ — abra renders the
# grid with HEAVY box-drawing verticals (┃ U+2503); accept the light variant (│ U+2502) too.
# Groups: 1 rule id, 2 description, 3 severity, 4 satisfied mark, 5 skipped cell.
_ROW = re.compile(
    r"^\s*[│┃]\s*(R\d+)\s*[│┃](.*?)[│┃]\s*(warn|error)\s*[│┃]\s*(✅|❌)\s*[│┃]\s*([^│┃]*)[│┃]"
)

# abra's trailing sentinel when any error-severity rule is unsatisfied (cross-check only).
_SENTINEL = "critical errors present"

# FATA classes that are the RECIPE's fault (its config cannot even be validated) — a lint
# FAIL, not an unverified rung. Everything else non-zero is environmental → unver.
_CONTENT_FATA = "unable to validate recipe"
|
|
|
|
|
|
def parse_table(output: str) -> list[dict]:
    """Extract the rule rows of the lint table from raw command output.

    Escape sequences matched by `_ANSI` are removed and carriage returns are treated as
    line breaks before matching. Lines that are not table rows are skipped, so output
    without a rendered table yields [].

    Each returned dict has the keys:
      rule      -- rule id as printed in the table (e.g. "R014")
      desc      -- description cell, whitespace-trimmed
      severity  -- "warn" or "error"
      satisfied -- True when the mark cell is ✅
      skipped   -- True when the skipped cell holds anything other than "" or "-"
    """
    plain = _ANSI.sub("", output).replace("\r", "\n")
    hits = (_ROW.match(candidate) for candidate in plain.splitlines())
    return [
        {
            "rule": hit.group(1),
            "desc": hit.group(2).strip(),
            "severity": hit.group(3),
            "satisfied": hit.group(4) == "✅",
            "skipped": hit.group(5).strip() not in ("", "-"),
        }
        for hit in hits
        if hit
    ]
|
|
|
|
|
|
def classify(rc: int | None, output: str) -> tuple[str, str, list[str]]:
    """Return (status, detail, failed_rule_ids) for a finished lint invocation.

    status ∈ {"pass","fail","unver"}; never a silent pass:

    - `rc is None`: the run itself blew up (timeout/missing binary) — always "unver" with a
      generic detail; a caller that knows the real reason reports its own.
    - `rc != 0`: abra could not lint. "fail" when the output carries the
      content-attributable FATA (`_CONTENT_FATA`), otherwise "unver". The detail is the
      first FATA line when there is one.
    - `rc == 0`: "unver" when no table row parsed; "fail" when an error-severity rule is
      unsatisfied and not skipped (its ids are returned), or when abra's sentinel is
      present although the parse found no failure; "pass" otherwise.

    All textual checks (FATA line, content FATA, sentinel) run on the same escape-stripped
    copy of `output`, so a colour code embedded in a phrase cannot hide it.
    """
    if rc is None:
        return "unver", "lint did not run", []

    clean = _ANSI.sub("", output)

    if rc != 0:
        first = next((ln for ln in clean.splitlines() if "FATA" in ln), "").strip()
        if _CONTENT_FATA in clean:
            # The recipe config itself failed validation — attributable to recipe content.
            return "fail", first or "recipe config failed validation", []
        return "unver", first or f"abra recipe lint exited {rc} with no table", []

    rows = parse_table(output)
    if not rows:
        return "unver", "no lint table in output (rc=0)", []

    failed = [
        r["rule"]
        for r in rows
        if r["severity"] == "error" and not r["satisfied"] and not r["skipped"]
    ]
    if failed:
        return "fail", f"error rule(s) unsatisfied: {', '.join(failed)}", failed
    if _SENTINEL in clean:
        # abra says critical errors but our parse found none — distrust the parse, never inflate.
        return "fail", "abra reported critical errors (table parse found none)", []
    return "pass", "", []
|
|
|
|
|
|
def run_lint(recipe: str, ref: str | None, out_dir: str | None) -> dict:
|
|
"""Execute the lint rung for `recipe` at exactly `ref` (a sha; None → the per-run tree's
|
|
current HEAD). Returns {"status","detail","rules_failed"} and writes lint.txt into
|
|
`out_dir` (when given). Never raises: every failure mode is caught into status "unver"."""
|
|
scratch = None
|
|
rc: int | None = None
|
|
output = ""
|
|
try:
|
|
src_tree = abra.recipe_dir(recipe)
|
|
scratch = tempfile.mkdtemp(prefix="ccci-lint-")
|
|
lint_abra = os.path.join(scratch, "abra")
|
|
os.makedirs(os.path.join(lint_abra, "recipes"))
|
|
clone = os.path.join(lint_abra, "recipes", recipe)
|
|
subprocess.run(
|
|
["git", "clone", "--quiet", src_tree, clone],
|
|
check=True,
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=LINT_TIMEOUT,
|
|
)
|
|
if ref:
|
|
subprocess.run(
|
|
["git", "-C", clone, "checkout", "-f", "--quiet", ref],
|
|
check=True,
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=LINT_TIMEOUT,
|
|
)
|
|
# catalogue: R006 (published catalogue version) reads it; servers: harmless, some abra
|
|
# paths stat it. Symlink the live ones (read-only use).
|
|
for shared in ("catalogue", "servers"):
|
|
src = os.path.join(abra.abra_dir(), shared)
|
|
if os.path.exists(src):
|
|
os.symlink(os.path.realpath(src), os.path.join(lint_abra, shared))
|
|
env = dict(os.environ, ABRA_DIR=lint_abra)
|
|
proc = subprocess.run(
|
|
["script", "-qec", f"abra recipe lint -n {shlex.quote(recipe)}", "/dev/null"],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=LINT_TIMEOUT,
|
|
env=env,
|
|
)
|
|
rc, output = proc.returncode, proc.stdout + proc.stderr
|
|
status, detail, failed = classify(rc, output)
|
|
except subprocess.TimeoutExpired:
|
|
status, detail, failed = "unver", f"lint timed out after {LINT_TIMEOUT}s", []
|
|
except Exception as e: # noqa: BLE001 — rung must never break the run; unver is the honest floor
|
|
status, detail, failed = "unver", f"lint executor error: {e.__class__.__name__}: {e}", []
|
|
finally:
|
|
if scratch:
|
|
shutil.rmtree(scratch, ignore_errors=True)
|
|
if status == "unver":
|
|
print(f"!! lint rung UNVERIFIED for {recipe}: {detail}", flush=True)
|
|
if out_dir:
|
|
try:
|
|
os.makedirs(out_dir, exist_ok=True)
|
|
with open(os.path.join(out_dir, "lint.txt"), "w", encoding="utf-8") as f:
|
|
f.write(
|
|
f"$ abra recipe lint -n {recipe} (ref={ref or 'HEAD'})\n"
|
|
f"rc={rc} status={status} {detail}\n\n{output}"
|
|
)
|
|
except OSError as e:
|
|
print(f" lint: could not write lint.txt (non-fatal): {e}", flush=True)
|
|
return {"status": status, "detail": detail, "rules_failed": failed}
|