keycloak-scripts/add-users-keycloak-api.py

84 lines
2.2 KiB
Python
Executable File

#!/usr/bin/env python3
# See https://python-keycloak-client.readthedocs.io/en/latest/
import json
from os import environ
from os.path import exists
from pathlib import Path
from time import sleep
from keycloak import KeycloakAdmin
def init_keycloak():
KEYCLOAK_DOMAIN = environ.get("KEYCLOAK_DOMAIN")
KEYCLOAK_REALM = environ.get("KEYCLOAK_REALM")
KEYCLOAK_CLIENT_SECRET = environ.get("KEYCLOAK_CLIENT_SECRET")
client = KeycloakAdmin(
server_url=f"https://{KEYCLOAK_DOMAIN}/auth/",
realm_name=KEYCLOAK_REALM,
client_secret_key=KEYCLOAK_CLIENT_SECRET,
verify=True,
)
return client
def confirm():
answer = ""
while answer not in ["y", "n"]:
answer = input("OK to continue [Y/N]? ").lower()
return answer == "y"
if not exists(Path("accounts.txt").absolute()):
print("Missing accounts.txt!")
exit(1)
with open("accounts.txt") as handle:
emails = [email.strip() for email in handle.readlines()]
print(f"Parsed {emails} from accounts.txt")
keycloak = init_keycloak()
print(f"Current user account count: {keycloak.users_count()}")
for email in emails:
username = email.split("@")[0].strip()
print(f"processing {email} now...")
print(f"deriving {username} from {email} for account creation...")
payload = {
"email": email,
"username": username,
"enabled": True,
"realmRoles": [
"user_default",
],
"requiredActions": ["UPDATE_PASSWORD", "UPDATE_PROFILE"],
}
print(f"payload: {payload}")
if not confirm():
print("Bailing out on request...")
exit(1)
try:
user_id = keycloak.create_user(payload, exist_ok=True)
print(f"Account created for {email}")
keycloak.send_verify_email(user_id=user_id)
print(f"Verification mail sent to {email}")
except Exception as exception:
print(f"Keycloak user registration failed, saw: {exception}")
if not confirm():
print("Bailing out on request...")
exit(1)
print("Sleeping one second to not overload the server...")
sleep(1)
print(f"Final user account count: {keycloak.users_count()}")