Files
recipe-maintainer/utils/tests/test_immich_sso.py
autonomic-bot f283a371bb recipe-maintainer: public snapshot (secrets + deployment plans removed, single commit)
Sanitized single-commit public mirror of recipe-maintainer.
- Removed test-ssh/.testenv (live creds); added test-ssh/.testenv.example placeholders.
- Removed plans/ and planned-updates/ (deployment-planning docs) so no client/
  deployment domains appear in the public repo.
- All other secret stores were already gitignored.
- docs.coopcloud.tech retained as a submodule (public upstream).
2026-06-16 20:18:24 +00:00

277 lines
9.3 KiB
Python
Executable File

#!/usr/bin/env python3
"""End-to-end test: create fresh Authentik + Immich instances, run SSO setup, verify.
Creates BRAND NEW app instances with unique domains via abra app new.
This ensures the scripts work on a completely clean install that has
never been configured before.
Usage:
python3 utils/tests/test_immich_sso.py
"""
import atexit
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from helpers import (
WORKSPACE, resolve_instance, run, abra, fresh_app, deploy_and_wait,
assert_converges, http_get, http_post, retry_http_get,
)
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
INSTANCE = resolve_instance()
SERVER = f"{INSTANCE}.commoninternet.net"
AK_TEST_DOMAIN = f"ak-immichtest.{SERVER}"
IMMICH_TEST_DOMAIN = f"immich-ssotest.{SERVER}"
AK_TEST_TOKEN = "ssotest-immich-admin-token"
IMMICH_ADMIN_EMAIL = "admin@immich-ssotest.test"
IMMICH_ADMIN_PASS = "adminpass123"
print("=== Immich SSO End-to-End Test ===")
print(f" Server: {SERVER}")
print(f" Authentik (new): {AK_TEST_DOMAIN}")
print(f" Immich (new): {IMMICH_TEST_DOMAIN}")
print()
# ---------------------------------------------------------------------------
# Cleanup on exit
# ---------------------------------------------------------------------------
def cleanup():
print()
print("=== Cleanup ===")
print(" Undeploying test instances ...")
abra(f"app undeploy {IMMICH_TEST_DOMAIN} --no-input", check=False, timeout=60)
abra(f"app undeploy {AK_TEST_DOMAIN} --no-input", check=False, timeout=60)
print(" Cleanup done")
atexit.register(cleanup)
failures = []
# ---------------------------------------------------------------------------
# Step 1: Create fresh Authentik
# ---------------------------------------------------------------------------
print("=== Step 1: Create fresh Authentik instance ===")
fresh_app("authentik", SERVER, AK_TEST_DOMAIN,
preset_secrets={"admin_token": AK_TEST_TOKEN})
deploy_and_wait(AK_TEST_DOMAIN, SERVER, f"https://{AK_TEST_DOMAIN}", "Authentik",
deploy_timeout=60, wait_max=300)
# The deploy post-command set_admin_pass gets killed because it runs before
# the worker has finished migrations. Re-run it with retries — the worker
# needs time to complete its bootstrap before akadmin exists.
def _set_admin_pass():
result = abra(
f"app cmd {AK_TEST_DOMAIN} worker set_admin_pass --chaos --no-input",
tty_wrap=True, check=False, timeout=120,
)
output = result.stdout if result else ""
if "Created authentik-bootstrap-token" in output or "Changed authentik-bootstrap-token" in output:
return True
if "Changed akadmin password" in output:
return True
return None
assert_converges(_set_admin_pass, "set_admin_pass (bootstrap token)",
max_wait=600, interval=15)
# Wait for the API to become ready with the admin token
ak_api = f"https://{AK_TEST_DOMAIN}/api/v3"
ak_headers = {"Authorization": f"Bearer {AK_TEST_TOKEN}"}
def _ak_api_ready():
code, body = http_get(f"{ak_api}/flows/instances/", headers=ak_headers)
if code == 200:
return True
print(f" API check: HTTP {code}", flush=True)
return False
assert_converges(_ak_api_ready, "Authentik API ready", max_wait=120)
print()
# ---------------------------------------------------------------------------
# Step 2: Create fresh Immich
# ---------------------------------------------------------------------------
print("=== Step 2: Create fresh Immich instance ===")
fresh_app("immich", SERVER, IMMICH_TEST_DOMAIN)
deploy_and_wait(IMMICH_TEST_DOMAIN, SERVER, f"https://{IMMICH_TEST_DOMAIN}", "Immich",
deploy_timeout=60, wait_max=180)
# Wait for Immich API to be ready (not just the web UI)
assert_converges(
lambda: http_get(f"https://{IMMICH_TEST_DOMAIN}/api/server/ping")[0] == 200,
"Immich API ready",
max_wait=120,
)
print()
# ---------------------------------------------------------------------------
# Step 3: Run setup script
# ---------------------------------------------------------------------------
print("=== Step 3: Run setup_immich_sso.py ===")
setup_script = os.path.join(WORKSPACE, "utils", "setup_immich_sso.py")
run(f"python3 {setup_script}"
f" --authentik-domain {AK_TEST_DOMAIN}"
f" --authentik-token {AK_TEST_TOKEN}"
f" --immich-domain {IMMICH_TEST_DOMAIN}"
f" --immich-admin-email {IMMICH_ADMIN_EMAIL}"
f" --immich-admin-pass {IMMICH_ADMIN_PASS}",
timeout=120)
print()
# ---------------------------------------------------------------------------
# Step 4: Verify OIDC discovery endpoint (with retries)
# ---------------------------------------------------------------------------
print("=== Step 4: Verify OIDC discovery endpoint ===")
discovery_url = (
f"https://{AK_TEST_DOMAIN}/application/o/immich"
f"/.well-known/openid-configuration"
)
try:
retry_http_get(discovery_url, expect_status=200, max_wait=60)
print(" PASS: OIDC discovery endpoint OK")
except RuntimeError as e:
print(f" FAIL: {e}")
failures.append("OIDC discovery")
print()
# ---------------------------------------------------------------------------
# Step 5: Obtain token from Authentik via password grant (with retries)
# ---------------------------------------------------------------------------
print("=== Step 5: Obtain token from Authentik ===")
# Get client secret — retry
def _get_client_secret():
_, providers = http_get(
f"{ak_api}/providers/oauth2/?search=immich", headers=ak_headers,
)
if not providers:
return None
for r in providers.get("results", []):
if r["name"] == "immich":
_, detail = http_get(
f"{ak_api}/providers/oauth2/{r['pk']}/", headers=ak_headers,
)
if detail:
return detail.get("client_secret")
return None
try:
client_secret = assert_converges(
_get_client_secret, "retrieve client secret", max_wait=60,
)
except RuntimeError:
client_secret = None
print(" FAIL: Could not retrieve client secret from Authentik")
failures.append("client secret retrieval")
if client_secret:
# Get app password — retry
def _get_app_password():
_, resp = http_get(
f"{ak_api}/core/tokens/testuser-app-password/view_key/",
headers=ak_headers,
)
return resp.get("key") if resp else None
try:
app_password = assert_converges(
_get_app_password, "retrieve app password", max_wait=60,
)
except RuntimeError:
app_password = None
print(" FAIL: Could not retrieve app password")
failures.append("app password retrieval")
if app_password:
# Token grant — retry
token_url = f"https://{AK_TEST_DOMAIN}/application/o/token/"
def _get_token():
_, resp = http_post(token_url, data={
"grant_type": "password",
"client_id": "immich",
"client_secret": client_secret,
"username": "testuser",
"password": app_password,
"scope": "openid email profile",
}, content_type="application/x-www-form-urlencoded")
return (resp or {}).get("access_token")
try:
access_token = assert_converges(
_get_token, "password grant token", max_wait=60,
)
print(f" PASS: Got access token ({len(access_token)} chars)")
except RuntimeError:
print(" FAIL: Token request did not succeed")
failures.append("token grant")
print()
# ---------------------------------------------------------------------------
# Step 6: Verify Immich OAuth authorize endpoint (with retries)
# ---------------------------------------------------------------------------
print("=== Step 6: Verify Immich /api/oauth/authorize ===")
def _check_authorize():
_, resp = http_post(
f"https://{IMMICH_TEST_DOMAIN}/api/oauth/authorize",
data={"redirectUri": f"https://{IMMICH_TEST_DOMAIN}/auth/login"},
)
url = (resp or {}).get("url", "")
return url if url else None
try:
redirect_url = assert_converges(
_check_authorize, "OAuth authorize returns redirect URL", max_wait=60,
)
if AK_TEST_DOMAIN in redirect_url:
print(" PASS: OAuth authorize returns Authentik redirect URL")
else:
print(f" WARN: Redirect URL doesn't contain Authentik domain: {redirect_url}")
print(" (May still be OK if using a different OIDC discovery URL)")
except RuntimeError:
print(" FAIL: /api/oauth/authorize did not return a redirect URL")
failures.append("OAuth authorize")
print()
# ---------------------------------------------------------------------------
# Result
# ---------------------------------------------------------------------------
if failures:
print(f"FAIL: Immich SSO end-to-end test failed: {', '.join(failures)}")
sys.exit(1)
else:
print("PASS: Immich SSO end-to-end test passed")
print(f" Fresh Authentik ({AK_TEST_DOMAIN}) + Immich ({IMMICH_TEST_DOMAIN})")
print(" created from scratch, SSO setup, OIDC discovery OK,")
print(" token grant OK, OAuth authorize endpoint returns redirect URL.")