keycloak-scripts/add-users-keycloak-api.py

70 lines
1.7 KiB
Python
Executable File

#!/usr/bin/env python3
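# See https://python-keycloak.readthedocs.io/en/latest/ (python-keycloak; presumably the package providing keycloak.KeycloakAdmin used below - verify against the installed dependency)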
import json
from os import environ
from os.path import exists
from pathlib import Path
from keycloak import KeycloakAdmin
def init_keycloak():
    """Build a KeycloakAdmin client from environment configuration.

    Reads KEYCLOAK_DOMAIN, KEYCLOAK_REALM and KEYCLOAK_CLIENT_SECRET from the
    process environment and passes them to ``KeycloakAdmin`` with TLS
    verification enabled.

    Returns:
        The ``KeycloakAdmin`` instance targeting
        ``https://<KEYCLOAK_DOMAIN>/auth/``.

    Raises:
        RuntimeError: If any of the three variables is unset or empty.
    """
    required = ("KEYCLOAK_DOMAIN", "KEYCLOAK_REALM", "KEYCLOAK_CLIENT_SECRET")
    settings = {name: environ.get(name) for name in required}

    # Fail early with a clear message: an unset variable would otherwise be
    # interpolated as the string "None" (e.g. "https://None/auth/").
    missing = [name for name in required if not settings[name]]
    if missing:
        raise RuntimeError(
            "Missing required environment variable(s): " + ", ".join(missing)
        )

    domain = settings["KEYCLOAK_DOMAIN"]
    return KeycloakAdmin(
        server_url=f"https://{domain}/auth/",
        realm_name=settings["KEYCLOAK_REALM"],
        client_secret_key=settings["KEYCLOAK_CLIENT_SECRET"],
        verify=True,
    )
def confirm():
    """Ask the operator whether to continue and return the decision.

    Prompts on stdin until the answer is "y" or "n". The comparison is
    case-insensitive and ignores surrounding whitespace.

    Returns:
        True if the answer was "y"; False if it was "n" or if stdin reached
        end-of-file before a valid answer was given.
    """
    while True:
        try:
            answer = input("OK to continue [Y/N]? ").strip().lower()
        except EOFError:
            # stdin is closed or exhausted (e.g. piped input): nobody can
            # approve continuing, so take the safe choice and decline.
            return False
        if answer in ("y", "n"):
            return answer == "y"
# Script body: read one e-mail address per line from emails.txt, create a
# Keycloak user for each, then trigger the "update account" and "verify
# e-mail" messages for the new user.

# Single source of truth for the input file, so the existence check and the
# read below can never refer to different files.
EMAILS_FILE = Path("emails.txt")

if not EMAILS_FILE.is_file():
    print(f"Missing {EMAILS_FILE}!")
    raise SystemExit(1)

with EMAILS_FILE.open(encoding="utf-8") as handle:
    # Strip the line terminator (it must not end up in the e-mail address sent
    # to Keycloak) and skip blank lines, which would yield an empty username.
    emails = [line.strip() for line in handle if line.strip()]

keycloak = init_keycloak()

for email in emails:
    # The username is the local part of the address (text before the first "@").
    username = email.split("@")[0].strip()
    print(f"processing {email} now...")
    print(f"deriving {username} from {email} for account creation...")
    payload = {
        "email": email,
        "username": username,
        "enabled": True,
        "realmRoles": [
            "user_default",
        ],
    }
    try:
        # exist_ok=False: an already existing user is reported as a failure.
        user_id = keycloak.create_user(payload, exist_ok=False)
        keycloak.send_update_account(
            user_id=user_id, payload=json.dumps(["UPDATE_PASSWORD", "UPDATE_PROFILE"])
        )
        keycloak.send_verify_email(user_id=user_id)
    except Exception as exception:
        # Deliberately broad: any failure for one address is reported and the
        # operator decides whether to carry on with the remaining addresses.
        # Note that a failure in one of the two send_* calls happens after the
        # user has already been created.
        print(f"Keycloak user registration failed, saw: {exception}")
        if not confirm():
            print("Bailing out on request...")
            raise SystemExit(1)