"""Keycloak admin API client. Provides a KeycloakAdmin class for managing realms, OIDC clients, and users via the Keycloak REST API. Used by setup_keycloak_integration.py scripts. """ import json import urllib.error import urllib.parse import urllib.request class KeycloakAdmin: """Client for the Keycloak Admin REST API.""" def __init__(self, base_url: str, admin_user: str, admin_password: str): self.base_url = base_url.rstrip("/") self.admin_user = admin_user self.admin_password = admin_password def _request(self, method: str, path: str, *, data: dict | None = None, headers: dict | None = None, timeout: int = 30) -> tuple[int, dict | list | None]: """Make an HTTP request, return (status_code, parsed_json).""" url = f"{self.base_url}{path}" body = json.dumps(data).encode() if data is not None else None req = urllib.request.Request(url, data=body, method=method) req.add_header("Content-Type", "application/json") for k, v in (headers or {}).items(): req.add_header(k, v) try: with urllib.request.urlopen(req, timeout=timeout) as resp: raw = resp.read() if not raw: return resp.getcode(), None return resp.getcode(), json.loads(raw) except urllib.error.HTTPError as e: try: raw = e.read().decode(errors="replace") return e.code, json.loads(raw) if raw.strip() else None except Exception: return e.code, None def get_admin_token(self) -> str: """Get an admin access token from the master realm.""" url = f"{self.base_url}/realms/master/protocol/openid-connect/token" body = urllib.parse.urlencode({ "username": self.admin_user, "password": self.admin_password, "grant_type": "password", "client_id": "admin-cli", }).encode() req = urllib.request.Request(url, data=body, method="POST") req.add_header("Content-Type", "application/x-www-form-urlencoded") with urllib.request.urlopen(req, timeout=30) as resp: data = json.loads(resp.read()) return data["access_token"] def _auth_headers(self) -> dict: token = self.get_admin_token() return {"Authorization": f"Bearer {token}"} def ensure_realm(self, realm: str) -> None: """Create a realm if it doesn't exist.""" print(f"=== Ensure Keycloak realm '{realm}' ===", flush=True) headers = self._auth_headers() status, _ = self._request( "GET", f"/admin/realms/{realm}", headers=headers) if status == 404: status, _ = self._request("POST", "/admin/realms", data={ "realm": realm, "enabled": True, "registrationAllowed": False, }, headers=headers) print(f" Created realm '{realm}'", flush=True) else: print(f" Realm '{realm}' already exists, skipping", flush=True) def ensure_client(self, realm: str, client_id: str, redirect_uris: list[str], web_origins: list[str]) -> tuple[str, str]: """Create an OIDC client if needed, return (client_uuid, client_secret).""" print(f"=== Ensure OIDC client '{client_id}' ===", flush=True) headers = self._auth_headers() # Check if client exists status, data = self._request( "GET", f"/admin/realms/{realm}/clients?clientId={client_id}", headers=headers, ) if data and len(data) > 0: client_uuid = data[0]["id"] print(f" Client '{client_id}' already exists, skipping", flush=True) else: status, _ = self._request( "POST", f"/admin/realms/{realm}/clients", data={ "clientId": client_id, "enabled": True, "protocol": "openid-connect", "publicClient": False, "standardFlowEnabled": True, "directAccessGrantsEnabled": True, "serviceAccountsEnabled": True, "authorizationServicesEnabled": True, "redirectUris": redirect_uris, "webOrigins": web_origins, "attributes": {"pkce.code.challenge.method": ""}, }, headers=headers, ) print(f" Created client '{client_id}'", flush=True) # Re-fetch to get UUID headers = self._auth_headers() _, data = self._request( "GET", f"/admin/realms/{realm}/clients?clientId={client_id}", headers=headers, ) client_uuid = data[0]["id"] # Get client secret headers = self._auth_headers() _, secret_data = self._request( "GET", f"/admin/realms/{realm}/clients/{client_uuid}/client-secret", headers=headers, ) client_secret = secret_data["value"] print(f" Client UUID: {client_uuid}", flush=True) print(f" Client secret: {client_secret}", flush=True) return client_uuid, client_secret def ensure_user(self, realm: str, username: str, email: str, password: str, *, first_name: str = "Test", last_name: str = "User") -> str: """Create a user if needed, set password. Returns user ID.""" print(f"=== Ensure user '{username}' ===", flush=True) headers = self._auth_headers() _, data = self._request( "GET", f"/admin/realms/{realm}/users?username={username}", headers=headers, ) if data and len(data) > 0: user_id = data[0]["id"] print(f" User '{username}' exists ({user_id}), resetting password", flush=True) headers = self._auth_headers() self._request( "PUT", f"/admin/realms/{realm}/users/{user_id}/reset-password", data={ "type": "password", "value": password, "temporary": False, }, headers=headers, ) else: headers = self._auth_headers() self._request( "POST", f"/admin/realms/{realm}/users", data={ "username": username, "email": email, "firstName": first_name, "lastName": last_name, "emailVerified": True, "enabled": True, "credentials": [{ "type": "password", "value": password, "temporary": False, }], }, headers=headers, ) print(f" Created user '{username}'", flush=True) # Re-fetch to get ID headers = self._auth_headers() _, data = self._request( "GET", f"/admin/realms/{realm}/users?username={username}", headers=headers, ) user_id = data[0]["id"] return user_id