"""Recipe-specific keycloak admin-API helpers (not harness). Used by the upgrade/backup stages to write a real data marker (a realm) into mariadb and verify it survives upgrade / backup-restore.""" import json import ssl import sys import urllib.parse import urllib.request sys.path.insert(0, __file__.rsplit("/tests/", 1)[0] + "/runner") from harness import lifecycle # noqa: E402 MARKER_REALM = "cimarker" _CTX = ssl.create_default_context() _CTX.check_hostname = False _CTX.verify_mode = ssl.CERT_NONE def admin_password(domain: str) -> str: """Read the abra-generated admin password from inside the running keycloak container.""" return lifecycle.exec_in_app(domain, ["cat", "/run/secrets/admin_password"]).strip() def admin_token(domain: str, password: str, user: str = "admin") -> str: data = urllib.parse.urlencode( { "grant_type": "password", "client_id": "admin-cli", "username": user, "password": password, } ).encode() req = urllib.request.Request( f"https://{domain}/realms/master/protocol/openid-connect/token", data=data, headers={"Content-Type": "application/x-www-form-urlencoded"}, method="POST", ) with urllib.request.urlopen(req, timeout=30, context=_CTX) as r: return json.load(r)["access_token"] def _admin(domain, token, path, method="GET", body=None): data = json.dumps(body).encode() if body is not None else None headers = {"Authorization": "Bearer " + token} if data: headers["Content-Type"] = "application/json" req = urllib.request.Request( f"https://{domain}/admin{path}", data=data, headers=headers, method=method ) try: with urllib.request.urlopen(req, timeout=30, context=_CTX) as r: return r.status except urllib.error.HTTPError as e: return e.code def create_marker_realm(domain, token): return _admin(domain, token, "/realms", "POST", {"realm": MARKER_REALM, "enabled": True}) def marker_realm_exists(domain, token) -> bool: return _admin(domain, token, f"/realms/{MARKER_REALM}") == 200 def delete_marker_realm(domain, token): return _admin(domain, token, f"/realms/{MARKER_REALM}", "DELETE")