style: repo-wide lint pass — make the lint gate green again

Push builds have been RED on the lint step since ~build 209 from accumulated
formatting drift. This is the mechanical cleanup: ruff format + ruff --fix
(UP038 isinstance unions, SIM105 contextlib.suppress, UP031 f-strings, SIM115
tempfile context manager), shfmt -i 2 -ci, nixpkgs-fmt/statix/deadnix (merged
attrsets, dropped unused lib args), yamllint, and shell quoting fixes in
tests/lasuite-docs/setup_custom_tests.sh. No behaviour changes intended;
lint: PASS, unit tests: 138 passed.
This commit is contained in:
2026-06-09 21:56:15 +00:00
parent e76d4005ab
commit 9a7772563a
115 changed files with 952 additions and 660 deletions

View File

@ -168,7 +168,9 @@ def secret_generate(domain: str, timeout: int = 300) -> None:
)
def deploy(domain: str, chaos: bool = True, timeout: int = 900, no_converge_checks: bool = False) -> None:
def deploy(
domain: str, chaos: bool = True, timeout: int = 900, no_converge_checks: bool = False
) -> None:
args = ["app", "deploy", domain, "-o", "-n"]
if chaos:
args.append("-C")
@ -203,7 +205,10 @@ def backup_create(domain: str, timeout: int = 900) -> str:
# remote and fails "authentication required: Unauthorized". Returns the captured output, whose
# restic JSON summary line carries the produced "snapshot_id" (the backup artifact, DG3) — note
# `abra app backup snapshots` needs a TTY and is awkward to script, so we read the create output.
out = _run_pty(["app", "backup", "create", domain, "-n", "-C", "-o"], timeout=timeout).stdout or ""
out = (
_run_pty(["app", "backup", "create", domain, "-n", "-C", "-o"], timeout=timeout).stdout
or ""
)
# Echo the backup output (incl. backupbot's pre-hook run / any "Failed to run command" or
# "Container ... not running" ERROR) into the run log. Backup is otherwise opaque: a pre-hook that
# fails to register/run leaves the DB dump out of the snapshot, surfacing only as a downstream

View File

@ -13,8 +13,15 @@ from __future__ import annotations
import time
def goto_with_retry(page, url, *, deadline_seconds: int = 120, accept_statuses=(200, 304),
goto_timeout_ms: int = 30_000, wait_until: str = "domcontentloaded"):
def goto_with_retry(
page,
url,
*,
deadline_seconds: int = 120,
accept_statuses=(200, 304),
goto_timeout_ms: int = 30_000,
wait_until: str = "domcontentloaded",
):
"""Poll `page.goto(url)` until status is in `accept_statuses` OR the deadline expires.
Returns the final Playwright response. Raises AssertionError if the deadline expires without

View File

@ -55,7 +55,9 @@ def enrolled_recipes() -> list[str]:
out = []
try:
for name in sorted(os.listdir(tests_dir)):
if os.path.isfile(os.path.join(tests_dir, name, "recipe_meta.py")) and is_enrolled(name):
if os.path.isfile(os.path.join(tests_dir, name, "recipe_meta.py")) and is_enrolled(
name
):
out.append(name)
except OSError:
pass
@ -122,11 +124,15 @@ def deploy_canonical(recipe: str, timeout: int = 900) -> None:
abra.recipe_checkout(recipe, version)
r = subprocess.run(
["abra", "app", "deploy", domain, version, "-o", "-n", "-f"],
capture_output=True, text=True, timeout=timeout,
capture_output=True,
text=True,
timeout=timeout,
)
if r.returncode != 0:
raise RuntimeError(f"deploy canonical {domain} {version} failed: "
f"{(r.stderr + ' ' + r.stdout).strip()[:300]}")
raise RuntimeError(
f"deploy canonical {domain} {version} failed: "
f"{(r.stderr + ' ' + r.stdout).strip()[:300]}"
)
_set_status(recipe, "warm")

View File

@ -148,7 +148,9 @@ RUNG_LABEL = {
"backup_restore": "backup/restore",
"functional": "functional",
}
SKIP_GREEN = "#57ab5a" # muted green — an intentional skip reads like a pass (but labelled, never inflating)
SKIP_GREEN = (
"#57ab5a" # muted green — an intentional skip reads like a pass (but labelled, never inflating)
)
def _skip_rows(skips: dict) -> str:
@ -159,14 +161,16 @@ def _skip_rows(skips: dict) -> str:
for rung, reason in (skips.get("intentional") or {}).items():
rows.append(
f'<tr class="stage"><td colspan="2"><span class="mark" style="color:{SKIP_GREEN}">⊘</span>'
f'<b>{html.escape(RUNG_LABEL.get(rung, rung))}</b></td>'
f"<b>{html.escape(RUNG_LABEL.get(rung, rung))}</b></td>"
f'<td class="st" style="color:{SKIP_GREEN}">intentional skip</td></tr>'
)
rows.append(f'<tr class="skipreason"><td></td><td colspan="2">{html.escape(reason)}</td></tr>')
rows.append(
f'<tr class="skipreason"><td></td><td colspan="2">{html.escape(reason)}</td></tr>'
)
for rung in skips.get("unintentional") or []:
rows.append(
f'<tr class="stage"><td colspan="2"><span class="mark" style="color:{GAP_COLOR}">⊘</span>'
f'<b>{html.escape(RUNG_LABEL.get(rung, rung))}</b></td>'
f"<b>{html.escape(RUNG_LABEL.get(rung, rung))}</b></td>"
f'<td class="st" style="color:{GAP_COLOR}">unintentional skip</td></tr>'
)
rows.append(

View File

@ -28,7 +28,7 @@ from __future__ import annotations
import contextlib
import json
import os
from typing import Iterable
from collections.abc import Iterable
from . import lifecycle, naming
@ -36,9 +36,7 @@ from . import lifecycle, naming
def declared_deps(recipe: str) -> list[str]:
"""Read `DEPS` from `tests/<recipe>/recipe_meta.py` — a list of recipe names this recipe needs
deployed alongside it. Returns [] if none."""
path = os.path.join(
os.path.dirname(__file__), "..", "..", "tests", recipe, "recipe_meta.py"
)
path = os.path.join(os.path.dirname(__file__), "..", "..", "tests", recipe, "recipe_meta.py")
if not os.path.exists(path):
return []
ns: dict = {}

View File

@ -222,7 +222,11 @@ def assert_restore_healthy(domain: str, meta: dict) -> None:
def perform_upgrade(
domain: str, recipe: str, head_ref: str | None, deploy_timeout: int = 900, meta: dict | None = None
domain: str,
recipe: str,
head_ref: str | None,
deploy_timeout: int = 900,
meta: dict | None = None,
) -> dict[str, str | None]:
"""Perform the UPGRADE op once, in place, to the PR-HEAD code under test (HC1): re-checkout the
PR head (the prev-tag base deploy reset the recipe working tree), then `abra app deploy --chaos`
@ -267,7 +271,9 @@ def perform_upgrade(
deploy_timeout=int(meta.get("DEPLOY_TIMEOUT", deploy_timeout)),
http_timeout=int(meta.get("HTTP_TIMEOUT", 300)),
)
lifecycle.wait_ready_probes(meta, domain, timeout=int(meta.get("DEPLOY_TIMEOUT", deploy_timeout)))
lifecycle.wait_ready_probes(
meta, domain, timeout=int(meta.get("DEPLOY_TIMEOUT", deploy_timeout))
)
after = lifecycle.deployed_identity(domain)
# Evidence (HC1): the chaos-version label = the deployed recipe commit; it should match the
# PR-head we checked out — proving the upgrade deployed the code under test, not a published tag.

View File

@ -73,7 +73,7 @@ def http_post(
`data` is JSON-encoded if content_type='application/json',
form-encoded if 'application/x-www-form-urlencoded' (the OIDC token endpoint form),
or sent raw bytes if data is already bytes."""
if isinstance(data, (bytes, bytearray)):
if isinstance(data, bytes | bytearray):
body: bytes | None = bytes(data)
elif content_type == "application/json" and data is not None:
body = json.dumps(data).encode()
@ -107,7 +107,7 @@ def http_request(
) -> tuple[int, object | None]:
"""Arbitrary-method HTTP (PUT/DELETE/PATCH) for parity tests that mutate. Same shape as
http_post (returns (status, json_or_None))."""
if isinstance(data, (bytes, bytearray)):
if isinstance(data, bytes | bytearray):
body: bytes | None = bytes(data)
elif content_type == "application/json" and data is not None:
body = json.dumps(data).encode()
@ -142,7 +142,7 @@ def post_with_headers(
"""Like http_post but ALSO returns the response headers as a dict — for APIs that hand back an
auth token in a response header rather than the body (e.g. mattermost login → `Token` header).
Returns (status, parsed_json_or_None, response_headers). status=0 + {} on transport failure."""
if isinstance(data, (bytes, bytearray)):
if isinstance(data, bytes | bytearray):
body: bytes | None = bytes(data)
elif content_type == "application/json" and data is not None:
body = json.dumps(data).encode()
@ -252,13 +252,16 @@ def retry_http_post(
) -> tuple[int, object | None]:
"""POST with retry until expect_fn(status, json) is truthy. Defaults to any 2xx."""
if expect_fn is None:
def expect_fn(s, _j): # noqa: ARG001
return 200 <= s < 300
result: list[tuple[int, object | None]] = [(0, None)]
def _check():
s, j = http_post(url, data=data, headers=headers, content_type=content_type, timeout=timeout)
s, j = http_post(
url, data=data, headers=headers, content_type=content_type, timeout=timeout
)
result[0] = (s, j)
return expect_fn(s, j)

View File

@ -113,7 +113,9 @@ def _assert_undeployed(domain: str) -> None:
)
def snapshot(recipe: str, domain: str, commit: str | None = None, version: str | None = None) -> dict:
def snapshot(
recipe: str, domain: str, commit: str | None = None, version: str | None = None
) -> dict:
"""Take a last-known-good snapshot of every data volume of <domain>'s stack. The app MUST be
undeployed. Atomically replaces the prior last-good. Returns the written meta dict."""
_assert_undeployed(domain)
@ -169,7 +171,9 @@ def restore(recipe: str, domain: str) -> dict:
for vol in meta.get("volumes", []):
tar_path = os.path.join(volumes_dir(recipe), f"{vol}.tar")
if vol not in current:
raise SnapshotError(f"snapshot volume {vol} absent from current stack {sorted(current)}")
raise SnapshotError(
f"snapshot volume {vol} absent from current stack {sorted(current)}"
)
mp = _volume_mountpoint(vol)
# Clear the volume contents (incl. dotfiles) without removing the mountpoint itself.
r = _run(["sh", "-c", f'rm -rf -- "{mp}"/* "{mp}"/.[!.]* "{mp}"/..?* 2>/dev/null; true'])

View File

@ -60,14 +60,17 @@ def sweep() -> int:
for r in recipes:
print(f"\n===== nightly: full-cold {r} (latest) =====", flush=True)
env = dict(os.environ, RECIPE=r)
env.pop("REF", None) # latest, not a PR head
env.pop("REF", None) # latest, not a PR head
env.pop("CCCI_QUICK", None)
env.pop("MODE", None)
rc = subprocess.run(
[sys.executable, os.path.join(_here(), "run_recipe_ci.py")], env=env
).returncode
results[r] = rc
print(f"nightly: {r} rc={rc} ({'green→canonical refreshed' if rc == 0 else 'red'})", flush=True)
print(
f"nightly: {r} rc={rc} ({'green→canonical refreshed' if rc == 0 else 'red'})",
flush=True,
)
# WC8 disk hygiene: drop warm data for de-enrolled canonicals; log the disk budget.
pruned = canonical.prune_stale()
if pruned:

View File

@ -43,11 +43,16 @@ def _traefik_setup(recipe: str, domain: str, version: str) -> None:
ssl_cert/ssl_key swarm secrets; NO ACME). Uses the proven abra.env_set (newline-safe, unlike the
bash set_env that bit keycloak)."""
cert_dir = "/var/lib/ci-certs/live"
if not (os.path.isfile(f"{cert_dir}/fullchain.pem") and os.path.isfile(f"{cert_dir}/privkey.pem")):
if not (
os.path.isfile(f"{cert_dir}/fullchain.pem") and os.path.isfile(f"{cert_dir}/privkey.pem")
):
raise RuntimeError(f"FATAL: wildcard cert missing at {cert_dir} (sops decrypt broken?)")
if not os.path.isfile(env_file(domain)):
_run(["abra", "app", "new", recipe, "-s", "default", "-D", domain, version, "-o", "-n"],
timeout=120, check=True)
_run(
["abra", "app", "new", recipe, "-s", "default", "-D", domain, version, "-o", "-n"],
timeout=120,
check=True,
)
abra.env_set(domain, "DOMAIN", domain)
abra.env_set(domain, "LETS_ENCRYPT_ENV", "")
abra.env_set(domain, "WILDCARDS_ENABLED", "1")
@ -61,11 +66,39 @@ def _traefik_setup(recipe: str, domain: str, version: str) -> None:
return any(s.endswith(f"_{name}_v1") for s in have)
if not _has("ssl_cert"):
_run(["abra", "app", "secret", "insert", domain, "ssl_cert", "v1",
f"{cert_dir}/fullchain.pem", "-f", "-n"], timeout=120, check=True)
_run(
[
"abra",
"app",
"secret",
"insert",
domain,
"ssl_cert",
"v1",
f"{cert_dir}/fullchain.pem",
"-f",
"-n",
],
timeout=120,
check=True,
)
if not _has("ssl_key"):
_run(["abra", "app", "secret", "insert", domain, "ssl_key", "v1",
f"{cert_dir}/privkey.pem", "-f", "-n"], timeout=120, check=True)
_run(
[
"abra",
"app",
"secret",
"insert",
domain,
"ssl_key",
"v1",
f"{cert_dir}/privkey.pem",
"-f",
"-n",
],
timeout=120,
check=True,
)
SPECS: dict[str, dict] = {
@ -218,8 +251,17 @@ def health_code(spec: dict) -> int:
domain = spec.get("health_domain", spec["domain"])
r = _run(
[
"curl", "-sk", "-o", "/dev/null", "-w", "%{http_code}", "--max-time", "10",
"--resolve", f"{domain}:443:127.0.0.1", f"https://{domain}{spec['health_path']}",
"curl",
"-sk",
"-o",
"/dev/null",
"-w",
"%{http_code}",
"--max-time",
"10",
"--resolve",
f"{domain}:443:127.0.0.1",
f"https://{domain}{spec['health_path']}",
],
timeout=20,
)
@ -230,7 +272,6 @@ def health_code(spec: dict) -> int:
def wait_healthy(spec: dict, timeout: int | None = None) -> bool:
domain = spec["domain"]
deadline = time.time() + (timeout or spec["health_timeout"])
while time.time() < deadline:
if health_code(spec) in tuple(spec["health_ok"]):
@ -325,15 +366,18 @@ def ensure_server() -> None:
def ensure_app_config(recipe: str, domain: str, version: str) -> None:
if not os.path.isfile(env_file(domain)):
_run(["abra", "app", "new", recipe, "-s", "default", "-D", domain, version, "-o", "-n"],
timeout=120, check=True)
_run(
["abra", "app", "new", recipe, "-s", "default", "-D", domain, version, "-o", "-n"],
timeout=120,
check=True,
)
abra.env_set(domain, "DOMAIN", domain)
abra.env_set(domain, "LETS_ENCRYPT_ENV", "")
def ensure_secrets(domain: str) -> None:
stack = lifecycle._stack_name(domain) # noqa: SLF001
have = {n for n in lifecycle._docker_names("secret", stack)} # noqa: SLF001
have = set(lifecycle._docker_names("secret", stack)) # noqa: SLF001
if not any(n.endswith("_admin_password_v1") for n in have):
abra.secret_generate(domain)
@ -393,8 +437,9 @@ def reconcile(app: str) -> str:
write_alert(app, "held-major", current=current, latest=latest, release_notes=notes[:4000])
return f"held-major:{current}->{latest}"
if notes_flag_manual_migration(notes):
write_alert(app, "held-manual-migration", current=current, latest=latest,
release_notes=notes[:4000])
write_alert(
app, "held-manual-migration", current=current, latest=latest, release_notes=notes[:4000]
)
return f"held-manual-migration:{current}->{latest}"
# WC1.1 health-gated upgrade with rollback.
@ -428,8 +473,14 @@ def reconcile(app: str) -> str:
warmsnap.restore(recipe, domain)
deploy_version(recipe, domain, last_good, dt)
recovered = wait_healthy(spec)
write_alert(app, "rollback", last_good=last_good, attempted=latest, recovered=recovered,
release_notes=notes[:2000])
write_alert(
app,
"rollback",
last_good=last_good,
attempted=latest,
recovered=recovered,
release_notes=notes[:2000],
)
if not recovered:
raise RuntimeError(f"{app} rollback to {last_good} did not become healthy")
return f"rolled-back:{latest}->{last_good}"