Files
cc-ci/tests/keycloak/kc_admin.py
autonomic-bot 8a7c0d8328
All checks were successful
continuous-integration/drone/push Build is passing
M6.5: keycloak upgrade + backup stages (DB data survival via realm marker)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-27 02:04:18 +01:00

58 lines
2.1 KiB
Python

"""Recipe-specific keycloak admin-API helpers (not harness). Used by the upgrade/backup stages to
write a real data marker (a realm) into mariadb and verify it survives upgrade / backup-restore."""
import json
import ssl
import sys
import urllib.parse
import urllib.request
sys.path.insert(0, __file__.rsplit("/tests/", 1)[0] + "/runner")
from harness import lifecycle # noqa: E402
# Name of the realm that the helpers below create, look up and delete as the data marker.
MARKER_REALM = "cimarker"
# Shared TLS context for every request in this module, with certificate and hostname
# verification switched off.
# NOTE(review): presumably the CI deployments serve certificates the runner does not trust — confirm.
_CTX = ssl.create_default_context()
# Order matters: check_hostname has to be disabled before verify_mode may be set to CERT_NONE.
_CTX.check_hostname = False
_CTX.verify_mode = ssl.CERT_NONE
def admin_password(domain: str) -> str:
    """Return the abra-generated keycloak admin password for the app at *domain*.

    The secret is read by running ``cat /run/secrets/admin_password`` inside the running
    app through ``lifecycle.exec_in_app``; leading and trailing whitespace is stripped.
    """
    read_secret_cmd = ["cat", "/run/secrets/admin_password"]
    raw_output = lifecycle.exec_in_app(domain, read_secret_cmd)
    return raw_output.strip()
def admin_token(domain: str, password: str, user: str = "admin") -> str:
    """Fetch an access token for the ``admin-cli`` client using the password grant.

    A form-encoded POST is sent to the master realm's OpenID Connect token endpoint on
    ``https://{domain}`` and the ``access_token`` field of the JSON reply is returned.
    Errors raised by ``urlopen`` (HTTP errors, connection failures) are not caught here.
    """
    token_url = f"https://{domain}/realms/master/protocol/openid-connect/token"
    form_fields = {
        "grant_type": "password",
        "client_id": "admin-cli",
        "username": user,
        "password": password,
    }
    request = urllib.request.Request(
        token_url,
        data=urllib.parse.urlencode(form_fields).encode(),
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=30, context=_CTX) as response:
        payload = json.load(response)
    return payload["access_token"]
def _admin(domain, token, path, method="GET", body=None):
    """Send one authenticated request to the keycloak admin API and return its HTTP status.

    Args:
        domain: Host of the keycloak deployment; the request goes to
            ``https://{domain}/admin{path}``.
        token: Bearer token placed in the ``Authorization`` header.
        path: Path below ``/admin``, including its leading ``/`` (e.g. ``/realms``).
        method: HTTP method to use.
        body: Optional JSON-serialisable payload. When given, it is sent as the request
            body with ``Content-Type: application/json``.

    Returns:
        The integer status code, both for successful responses and for HTTP error
        responses (``HTTPError`` is translated into its ``code``).

    Any other failure raised by ``urlopen`` (connection error, timeout, ...) propagates.
    """
    headers = {"Authorization": "Bearer " + token}
    data = None
    if body is not None:
        data = json.dumps(body).encode()
        headers["Content-Type"] = "application/json"
    req = urllib.request.Request(
        f"https://{domain}/admin{path}", data=data, headers=headers, method=method)
    try:
        with urllib.request.urlopen(req, timeout=30, context=_CTX) as r:
            return r.status
    except urllib.error.HTTPError as e:
        # HTTPError doubles as the (still open) error response; close it deterministically
        # instead of leaving the underlying connection to garbage collection.
        e.close()
        return e.code
def create_marker_realm(domain, token):
    """POST an enabled realm named ``MARKER_REALM`` to ``/realms``; return the HTTP status."""
    realm_spec = {"realm": MARKER_REALM, "enabled": True}
    return _admin(domain, token, "/realms", method="POST", body=realm_spec)
def marker_realm_exists(domain, token) -> bool:
    """Return True only when a GET of the marker realm answers with HTTP 200."""
    status = _admin(domain, token, "/realms/" + MARKER_REALM)
    return status == 200
def delete_marker_realm(domain, token):
    """Issue a DELETE for the marker realm and return the resulting HTTP status."""
    realm_path = "/realms/" + MARKER_REALM
    return _admin(domain, token, realm_path, method="DELETE")