cc-ci/tests/keycloak/kc_admin.py
autonomic-bot 2cede01ed7 style(1b): auto-format + lint-clean the whole codebase (RL1)
Mechanical, semantics-preserving cleanup so the codebase passes the new lint stage:
- ruff format: all 32 Python files (wraps long signatures, normalizes quotes/blank lines).
- nixpkgs-fmt: modules/drone-runner.nix.
- shfmt (-i 2 -ci): scripts/*.sh.

Lint fixes (reviewed, behavior-preserving — no test weakened):
- ruff SIM105: try/except-pass -> contextlib.suppress (abra.py app_config rm; lifecycle.py janitor).
- ruff SIM115: open().read() -> with open() (run_recipe_ci.py redaction-values + gitea-token).
- statix: merge repeated sops `secrets.*` keys into one `secrets = { ... }` (comments kept);
  empty fn pattern `{ ... }:` -> `_:` (packages.nix).
- deadnix: drop unused lambda args (flake `self`; configuration.nix `lib`; overlay `final` -> `_`).
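
The two ruff rewrites listed above can be illustrated with a minimal sketch. This is not the repository's code: the helper names `remove_if_present`, `read_stripped`, and `demo`, and the file name `token.txt`, are hypothetical and chosen only for illustration.

```python
import contextlib
import os
import tempfile


def remove_if_present(path):
    # SIM105: replaces `try: os.remove(path)` / `except FileNotFoundError: pass`.
    with contextlib.suppress(FileNotFoundError):
        os.remove(path)


def read_stripped(path):
    # SIM115: replaces `open(path).read()` so the file handle is always closed.
    with open(path) as fh:
        return fh.read().strip()


def demo():
    # Hypothetical round trip: write a value, read it back, then remove it twice.
    path = os.path.join(tempfile.mkdtemp(), "token.txt")
    with open(path, "w") as fh:
        fh.write("s3cret\n")
    value = read_stripped(path)
    remove_if_present(path)
    remove_if_present(path)  # second call is a no-op instead of raising
    return value, os.path.exists(path)
```

Both forms keep the behavior of the code they replace; the difference is that the suppressed exception type is named explicitly and the file is closed deterministically.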

Verified on cc-ci: `scripts/lint.sh` -> lint: PASS; nixosConfigurations.cc-ci evaluates;
all Python byte-compiles. The deployed bridge/dashboard/runner source changes hash (reformat),
so cc-ci will be rebuilt to the new closure in W2 before the cold D1-D10 re-verification.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-27 20:52:05 +01:00

68 lines
2.2 KiB
Python

"""Recipe-specific keycloak admin-API helpers (not harness). Used by the upgrade/backup stages to
write a real data marker (a realm) into mariadb and verify it survives upgrade / backup-restore."""

import json
import ssl
import sys
import urllib.error
import urllib.parse
import urllib.request

sys.path.insert(0, __file__.rsplit("/tests/", 1)[0] + "/runner")
from harness import lifecycle  # noqa: E402

MARKER_REALM = "cimarker"

# TLS context with hostname checking and certificate verification disabled.
_CTX = ssl.create_default_context()
_CTX.check_hostname = False
_CTX.verify_mode = ssl.CERT_NONE


def admin_password(domain: str) -> str:
    """Read the abra-generated admin password from inside the running keycloak container."""
    return lifecycle.exec_in_app(domain, ["cat", "/run/secrets/admin_password"]).strip()


def admin_token(domain: str, password: str, user: str = "admin") -> str:
    """Fetch an access token from the master realm using the password grant (admin-cli client)."""
    data = urllib.parse.urlencode(
        {
            "grant_type": "password",
            "client_id": "admin-cli",
            "username": user,
            "password": password,
        }
    ).encode()
    req = urllib.request.Request(
        f"https://{domain}/realms/master/protocol/openid-connect/token",
        data=data,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=30, context=_CTX) as r:
        return json.load(r)["access_token"]


def _admin(domain, token, path, method="GET", body=None):
    """Call the admin REST API and return the HTTP status code, including for error responses."""
    data = json.dumps(body).encode() if body is not None else None
    headers = {"Authorization": "Bearer " + token}
    if data:
        headers["Content-Type"] = "application/json"
    req = urllib.request.Request(
        f"https://{domain}/admin{path}", data=data, headers=headers, method=method
    )
    try:
        with urllib.request.urlopen(req, timeout=30, context=_CTX) as r:
            return r.status
    except urllib.error.HTTPError as e:
        # urlopen raises on 4xx/5xx; callers only need the status code.
        return e.code


def create_marker_realm(domain, token):
    return _admin(domain, token, "/realms", "POST", {"realm": MARKER_REALM, "enabled": True})


def marker_realm_exists(domain, token) -> bool:
    return _admin(domain, token, f"/realms/{MARKER_REALM}") == 200


def delete_marker_realm(domain, token):
    return _admin(domain, token, f"/realms/{MARKER_REALM}", "DELETE")
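
The marker helpers above all reduce to "send a request, return the status code". The sketch below exercises that pattern against a hypothetical in-memory stand-in served over plain HTTP on localhost, so it runs without a Keycloak instance. `FakeKeycloak`, `admin`, and `run_demo` are illustrative names, and the status codes the fake returns (201, 200, 204, 404, 409) are assumptions made for the demo, not a statement of what a real deployment returns.

```python
import json
import threading
import urllib.error
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer

MARKER_REALM = "cimarker"


class FakeKeycloak(BaseHTTPRequestHandler):
    """Hypothetical in-memory stand-in for the realm endpoints (not real Keycloak)."""

    realms = set()  # shared across requests; mutated in place

    def _reply(self, status):
        self.send_response(status)
        self.send_header("Content-Length", "0")
        self.end_headers()

    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        name = json.loads(self.rfile.read(length))["realm"]
        if name in self.realms:
            self._reply(409)
        else:
            self.realms.add(name)
            self._reply(201)

    def do_GET(self):
        name = self.path.rsplit("/", 1)[-1]
        self._reply(200 if name in self.realms else 404)

    def do_DELETE(self):
        name = self.path.rsplit("/", 1)[-1]
        found = name in self.realms
        self.realms.discard(name)
        self._reply(204 if found else 404)

    def log_message(self, *args):
        pass  # keep the demo quiet


# Bypass any proxy configured in the environment so localhost is reached directly.
_OPENER = urllib.request.build_opener(urllib.request.ProxyHandler({}))


def admin(base, path, method="GET", body=None):
    # Same shape as _admin above, minus the bearer token and TLS context.
    data = json.dumps(body).encode() if body is not None else None
    headers = {"Content-Type": "application/json"} if data else {}
    req = urllib.request.Request(f"{base}/admin{path}", data=data, headers=headers, method=method)
    try:
        with _OPENER.open(req, timeout=5) as r:
            return r.status
    except urllib.error.HTTPError as e:
        return e.code  # 4xx/5xx arrive as exceptions; return the code instead


def run_demo():
    FakeKeycloak.realms.clear()
    server = HTTPServer(("127.0.0.1", 0), FakeKeycloak)  # port 0: pick any free port
    threading.Thread(target=server.serve_forever, daemon=True).start()
    base = f"http://127.0.0.1:{server.server_port}"
    try:
        return [
            admin(base, f"/realms/{MARKER_REALM}"),  # absent before creation
            admin(base, "/realms", "POST", {"realm": MARKER_REALM, "enabled": True}),
            admin(base, f"/realms/{MARKER_REALM}"),  # present after creation
            admin(base, f"/realms/{MARKER_REALM}", "DELETE"),
            admin(base, f"/realms/{MARKER_REALM}"),  # absent again
        ]
    finally:
        server.shutdown()
        server.server_close()
```

Returning the status code for both success and `HTTPError` lets a caller such as `marker_realm_exists` compare against a single value rather than wrapping every call in its own `try`/`except`.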