All checks were successful
continuous-integration/drone/push Build is passing
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
58 lines
2.1 KiB
Python
58 lines
2.1 KiB
Python
"""Recipe-specific keycloak admin-API helpers (not harness). Used by the upgrade/backup stages to
|
|
write a real data marker (a realm) into mariadb and verify it survives upgrade / backup-restore."""
|
|
import json
|
|
import ssl
|
|
import sys
|
|
import urllib.parse
|
|
import urllib.request
|
|
|
|
sys.path.insert(0, __file__.rsplit("/tests/", 1)[0] + "/runner")
|
|
from harness import lifecycle # noqa: E402
|
|
|
|
MARKER_REALM = "cimarker"
|
|
_CTX = ssl.create_default_context()
|
|
_CTX.check_hostname = False
|
|
_CTX.verify_mode = ssl.CERT_NONE
|
|
|
|
|
|
def admin_password(domain: str) -> str:
|
|
"""Read the abra-generated admin password from inside the running keycloak container."""
|
|
return lifecycle.exec_in_app(domain, ["cat", "/run/secrets/admin_password"]).strip()
|
|
|
|
|
|
def admin_token(domain: str, password: str, user: str = "admin") -> str:
|
|
data = urllib.parse.urlencode({
|
|
"grant_type": "password", "client_id": "admin-cli", "username": user, "password": password,
|
|
}).encode()
|
|
req = urllib.request.Request(
|
|
f"https://{domain}/realms/master/protocol/openid-connect/token", data=data,
|
|
headers={"Content-Type": "application/x-www-form-urlencoded"}, method="POST")
|
|
with urllib.request.urlopen(req, timeout=30, context=_CTX) as r:
|
|
return json.load(r)["access_token"]
|
|
|
|
|
|
def _admin(domain, token, path, method="GET", body=None):
|
|
data = json.dumps(body).encode() if body is not None else None
|
|
headers = {"Authorization": "Bearer " + token}
|
|
if data:
|
|
headers["Content-Type"] = "application/json"
|
|
req = urllib.request.Request(f"https://{domain}/admin{path}", data=data, headers=headers,
|
|
method=method)
|
|
try:
|
|
with urllib.request.urlopen(req, timeout=30, context=_CTX) as r:
|
|
return r.status
|
|
except urllib.error.HTTPError as e:
|
|
return e.code
|
|
|
|
|
|
def create_marker_realm(domain, token):
|
|
return _admin(domain, token, "/realms", "POST", {"realm": MARKER_REALM, "enabled": True})
|
|
|
|
|
|
def marker_realm_exists(domain, token) -> bool:
|
|
return _admin(domain, token, f"/realms/{MARKER_REALM}") == 200
|
|
|
|
|
|
def delete_marker_realm(domain, token):
|
|
return _admin(domain, token, f"/realms/{MARKER_REALM}", "DELETE")
|