Sanitized single-commit public mirror of recipe-maintainer. - Removed test-ssh/.testenv (live creds); added test-ssh/.testenv.example placeholders. - Removed plans/ and planned-updates/ (deployment-planning docs) so no client/ deployment domains appear in the public repo. - All other secret stores were already gitignored. - docs.coopcloud.tech retained as a submodule (public upstream).
208 lines
7.9 KiB
Python
208 lines
7.9 KiB
Python
#!/usr/bin/env python3
|
|
"""Setup Authentik OIDC integration for Immich.
|
|
|
|
Creates an OAuth2 provider, application, and test user in Authentik,
|
|
then creates an Immich admin account and configures OAuth via the
|
|
Immich API.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
import urllib.error
|
|
import urllib.request
|
|
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
|
|
|
|
from lib.authentik import AuthentikAdmin
|
|
from lib.models import load_default_instance
|
|
from lib.secrets import load_secrets
|
|
|
|
# Configuration
|
|
PROVIDER_NAME = "immich"
|
|
APP_SLUG = "immich"
|
|
CLIENT_ID = "immich"
|
|
TEST_USER = "testuser"
|
|
TEST_PASS = "testpass123"
|
|
TEST_EMAIL = f"{TEST_USER}@test.example.com"
|
|
|
|
IMMICH_ADMIN_EMAIL = "admin@immich.test"
|
|
IMMICH_ADMIN_PASS = "adminpass123"
|
|
IMMICH_ADMIN_NAME = "Admin"
|
|
|
|
|
|
def _immich_request(method, url, data=None, headers=None, timeout=10):
|
|
"""Make an HTTP request to the Immich API."""
|
|
body = json.dumps(data).encode() if data is not None else None
|
|
req = urllib.request.Request(url, data=body, method=method)
|
|
req.add_header("Content-Type", "application/json")
|
|
for k, v in (headers or {}).items():
|
|
req.add_header(k, v)
|
|
try:
|
|
with urllib.request.urlopen(req, timeout=timeout) as resp:
|
|
raw = resp.read()
|
|
if not raw:
|
|
return resp.getcode(), None
|
|
return resp.getcode(), json.loads(raw)
|
|
except urllib.error.HTTPError as e:
|
|
try:
|
|
raw = e.read().decode(errors="replace")
|
|
return e.code, json.loads(raw) if raw.strip() else None
|
|
except Exception:
|
|
return e.code, None
|
|
|
|
|
|
def main():
|
|
inst = load_default_instance()
|
|
immich_domain = inst.default_domain("immich")
|
|
immich_url = f"https://{immich_domain}"
|
|
ak_domain = inst.default_domain("authentik")
|
|
ak_url = f"https://{ak_domain}"
|
|
|
|
# Get Authentik admin token from synced secrets
|
|
ak_secrets = load_secrets(ak_domain)
|
|
ak_token = ak_secrets["admin_token"]
|
|
|
|
ak = AuthentikAdmin(ak_url, ak_token)
|
|
|
|
# Resolve Authentik UUIDs
|
|
uuids = ak.resolve_uuids()
|
|
|
|
# Step 1: Create OAuth2 provider
|
|
provider_pk, client_secret = ak.ensure_provider(
|
|
PROVIDER_NAME, CLIENT_ID,
|
|
redirect_uris=[
|
|
{"matching_mode": "strict", "url": f"{immich_url}/auth/login"},
|
|
{"matching_mode": "strict", "url": f"{immich_url}/user-settings"},
|
|
{"matching_mode": "strict", "url": "app.immich:///oauth-callback"},
|
|
],
|
|
uuids=uuids,
|
|
)
|
|
|
|
# Step 2: Create application
|
|
ak.ensure_application("Immich", APP_SLUG, provider_pk, immich_url)
|
|
|
|
# Step 3: Ensure test user with APP_PASSWORD
|
|
user_pk = ak.ensure_user(TEST_USER, TEST_EMAIL, TEST_PASS)
|
|
app_password = ak.ensure_app_password(user_pk)
|
|
|
|
# Step 4: Configure Immich OAuth via API
|
|
print("=== Configure Immich OAuth via API ===", flush=True)
|
|
|
|
# Create admin account (skip if already exists)
|
|
print(" Creating Immich admin account ...", flush=True)
|
|
status, resp = _immich_request("POST", f"{immich_url}/api/auth/admin-sign-up", {
|
|
"email": IMMICH_ADMIN_EMAIL,
|
|
"password": IMMICH_ADMIN_PASS,
|
|
"name": IMMICH_ADMIN_NAME,
|
|
})
|
|
if resp and ("error" in resp or "message" in resp):
|
|
msg = resp.get("error", resp.get("message", ""))
|
|
if "admin" in msg.lower():
|
|
print(" Admin account already exists, continuing", flush=True)
|
|
else:
|
|
print(f" Admin signup response: {msg}", flush=True)
|
|
else:
|
|
print(" Admin account created (or already existed)", flush=True)
|
|
|
|
# Login to get access token
|
|
print(" Logging in as Immich admin ...", flush=True)
|
|
status, resp = _immich_request("POST", f"{immich_url}/api/auth/login", {
|
|
"email": IMMICH_ADMIN_EMAIL,
|
|
"password": IMMICH_ADMIN_PASS,
|
|
})
|
|
if not resp or "accessToken" not in resp:
|
|
print(f" FAIL: Could not login to Immich. Response: {resp}", flush=True)
|
|
sys.exit(1)
|
|
access_token = resp["accessToken"]
|
|
print(f" Logged in (token: {access_token[:10]}...)", flush=True)
|
|
|
|
auth_headers = {"Authorization": f"Bearer {access_token}"}
|
|
|
|
# Get current system config
|
|
print(" Fetching current Immich system config ...", flush=True)
|
|
_, config = _immich_request("GET", f"{immich_url}/api/system-config",
|
|
headers=auth_headers)
|
|
|
|
# Merge OAuth settings
|
|
print(" Updating OAuth settings ...", flush=True)
|
|
oidc_issuer = f"{ak_url}/application/o/{APP_SLUG}/.well-known/openid-configuration"
|
|
oauth = config.get("oauth", {})
|
|
oauth.update({
|
|
"enabled": True,
|
|
"issuerUrl": oidc_issuer,
|
|
"clientId": CLIENT_ID,
|
|
"clientSecret": client_secret,
|
|
"scope": "openid email profile",
|
|
"autoRegister": True,
|
|
"autoLaunch": False,
|
|
"buttonText": "Login with Authentik",
|
|
"tokenEndpointAuthMethod": "client_secret_post",
|
|
"timeout": 30000,
|
|
"mobileOverrideEnabled": False,
|
|
"mobileRedirectUri": "",
|
|
"signingAlgorithm": "RS256",
|
|
"storageLabelClaim": "preferred_username",
|
|
"storageQuotaClaim": "",
|
|
"defaultStorageQuota": 0,
|
|
"profileSigningAlgorithm": "none",
|
|
"roleClaim": "immich_role",
|
|
})
|
|
config["oauth"] = oauth
|
|
|
|
_, put_resp = _immich_request("PUT", f"{immich_url}/api/system-config",
|
|
data=config, headers=auth_headers)
|
|
oauth_enabled = (put_resp or {}).get("oauth", {}).get("enabled", False)
|
|
if oauth_enabled:
|
|
print(" OAuth enabled successfully", flush=True)
|
|
else:
|
|
print(f" WARNING: Could not confirm OAuth enabled. Response: {put_resp}",
|
|
flush=True)
|
|
|
|
# Step 5: Write credentials file
|
|
script_dir = os.path.dirname(os.path.abspath(__file__))
|
|
creds_file = os.path.join(script_dir, f"authentik-test-credentials.{inst.domain_suffix}.toml")
|
|
print(f"=== Write credentials to {creds_file} ===", flush=True)
|
|
with open(creds_file, "w") as f:
|
|
f.write(f'# Authentik OIDC credentials for Immich test instance\n')
|
|
f.write(f'#\n')
|
|
f.write(f'# Authentik instance: {ak_domain}\n')
|
|
f.write(f'# Application slug: {APP_SLUG}\n')
|
|
f.write(f'# Created by: setup_authentik_integration.py\n')
|
|
f.write(f'\n')
|
|
f.write(f'# Authentik admin\n')
|
|
f.write(f'ak_token = "{ak_token}"\n')
|
|
f.write(f'\n')
|
|
f.write(f'# OIDC provider\n')
|
|
f.write(f'ak_app_slug = "{APP_SLUG}"\n')
|
|
f.write(f'ak_client_id = "{CLIENT_ID}"\n')
|
|
f.write(f'ak_client_secret = "{client_secret}"\n')
|
|
f.write(f'\n')
|
|
f.write(f'# Authentik OIDC endpoints\n')
|
|
f.write(f'ak_token_endpoint = "https://{ak_domain}/application/o/token/"\n')
|
|
f.write(f'ak_userinfo_endpoint = "https://{ak_domain}/application/o/userinfo/"\n')
|
|
f.write(f'ak_discovery_endpoint = "https://{ak_domain}/application/o/{APP_SLUG}/.well-known/openid-configuration"\n')
|
|
f.write(f'\n')
|
|
f.write(f'# Test user (password for browser login, app_password for password grant)\n')
|
|
f.write(f'ak_test_user = "{TEST_USER}"\n')
|
|
f.write(f'ak_test_pass = "{TEST_PASS}"\n')
|
|
f.write(f'ak_test_app_password = "{app_password}"\n')
|
|
f.write(f'ak_test_email = "{TEST_EMAIL}"\n')
|
|
f.write(f'\n')
|
|
f.write(f'# Immich instance\n')
|
|
f.write(f'immich_domain = "{immich_domain}"\n')
|
|
f.write(f'immich_admin_email = "{IMMICH_ADMIN_EMAIL}"\n')
|
|
f.write(f'immich_admin_pass = "{IMMICH_ADMIN_PASS}"\n')
|
|
print(f" Written to {creds_file}", flush=True)
|
|
|
|
print("", flush=True)
|
|
print("=== Authentik OIDC integration setup for Immich complete ===", flush=True)
|
|
print("", flush=True)
|
|
print("Next steps:", flush=True)
|
|
print(f" 1. Run OIDC test: python3 recipe-info/immich/tests/oidc_login.py", flush=True)
|
|
print(f" 2. Manual: open {immich_url} and click 'Login with Authentik'", flush=True)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|