#!/usr/bin/env python3
"""End-to-end test: create fresh Authentik + Immich instances, run SSO setup, verify.

Creates BRAND NEW app instances with unique domains via abra app new.
This ensures the scripts work on a completely clean install that has
never been configured before.

Usage:
    python3 utils/tests/test_immich_sso.py
"""

import atexit
import os
import sys

# Make the sibling ``helpers`` module importable when this file is run directly.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from helpers import (
    WORKSPACE,
    resolve_instance,
    run,
    abra,
    fresh_app,
    deploy_and_wait,
    assert_converges,
    http_get,
    http_post,
    retry_http_get,
)

# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
INSTANCE = resolve_instance()
SERVER = f"{INSTANCE}.commoninternet.net"
AK_TEST_DOMAIN = f"ak-immichtest.{SERVER}"
IMMICH_TEST_DOMAIN = f"immich-ssotest.{SERVER}"
AK_TEST_TOKEN = "ssotest-immich-admin-token"
IMMICH_ADMIN_EMAIL = "admin@immich-ssotest.test"
IMMICH_ADMIN_PASS = "adminpass123"

print("=== Immich SSO End-to-End Test ===")
print(f" Server: {SERVER}")
print(f" Authentik (new): {AK_TEST_DOMAIN}")
print(f" Immich (new): {IMMICH_TEST_DOMAIN}")
print()


# ---------------------------------------------------------------------------
# Cleanup on exit
# ---------------------------------------------------------------------------
def cleanup():
    """Undeploy both test instances (Immich first, then Authentik).

    Registered with ``atexit`` below, so it runs when the interpreter exits.
    ``check=False`` is passed to ``abra`` -- presumably so a failed undeploy
    does not abort the rest of the cleanup; confirm against ``helpers.abra``.
    """
    print()
    print("=== Cleanup ===")
    print(" Undeploying test instances ...")
    for domain in (IMMICH_TEST_DOMAIN, AK_TEST_DOMAIN):
        abra(f"app undeploy {domain} --no-input", check=False, timeout=60)
    print(" Cleanup done")


atexit.register(cleanup)

# Labels of the verification steps that failed; reported at the end of the run.
failures = []

# ---------------------------------------------------------------------------
# Step 1: Create fresh Authentik
# ---------------------------------------------------------------------------
print("=== Step 1: Create fresh Authentik instance ===")
fresh_app(
    "authentik",
    SERVER,
    AK_TEST_DOMAIN,
    preset_secrets={"admin_token": AK_TEST_TOKEN},
)
deploy_and_wait(
    AK_TEST_DOMAIN,
    SERVER,
    f"https://{AK_TEST_DOMAIN}",
    "Authentik",
    deploy_timeout=60,
    wait_max=300,
)


# The deploy post-command set_admin_pass gets killed because it runs before
# the worker has finished migrations. Re-run it with retries -- the worker
# needs time to complete its bootstrap before akadmin exists.
def _set_admin_pass():
    """Run ``set_admin_pass`` on the worker once.

    Returns ``True`` when the command output contains one of the known
    success messages, otherwise ``None`` so the caller keeps retrying.
    """
    success_markers = (
        "Created authentik-bootstrap-token",
        "Changed authentik-bootstrap-token",
        "Changed akadmin password",
    )
    proc = abra(
        f"app cmd {AK_TEST_DOMAIN} worker set_admin_pass --chaos --no-input",
        tty_wrap=True,
        check=False,
        timeout=120,
    )
    stdout = proc.stdout if proc else ""
    if any(marker in stdout for marker in success_markers):
        return True
    return None


assert_converges(
    _set_admin_pass,
    "set_admin_pass (bootstrap token)",
    max_wait=600,
    interval=15,
)

# Wait for the API to become ready with the admin token
ak_api = f"https://{AK_TEST_DOMAIN}/api/v3"
ak_headers = {"Authorization": f"Bearer {AK_TEST_TOKEN}"}


def _ak_api_ready():
    """Return ``True`` once an authenticated Authentik API call answers HTTP 200."""
    status, _ = http_get(f"{ak_api}/flows/instances/", headers=ak_headers)
    if status != 200:
        print(f" API check: HTTP {status}", flush=True)
        return False
    return True


assert_converges(_ak_api_ready, "Authentik API ready", max_wait=120)
print()

# ---------------------------------------------------------------------------
# Step 2: Create fresh Immich
# ---------------------------------------------------------------------------
print("=== Step 2: Create fresh Immich instance ===")
fresh_app("immich", SERVER, IMMICH_TEST_DOMAIN)
deploy_and_wait(
    IMMICH_TEST_DOMAIN,
    SERVER,
    f"https://{IMMICH_TEST_DOMAIN}",
    "Immich",
    deploy_timeout=60,
    wait_max=180,
)

# Wait for Immich API to be ready (not just the web UI)
immich_ping_url = f"https://{IMMICH_TEST_DOMAIN}/api/server/ping"
assert_converges(
    lambda: http_get(immich_ping_url)[0] == 200,
    "Immich API ready",
    max_wait=120,
)
print()

# ---------------------------------------------------------------------------
# Step 3: Run setup script
# ---------------------------------------------------------------------------
print("=== Step 3: Run setup_immich_sso.py ===")

# Imported next to its only use: shell-quoting the interpolated arguments below.
import shlex

setup_script = os.path.join(WORKSPACE, "utils", "setup_immich_sso.py")
# Every interpolated value is quoted so a workspace path or credential that
# contains spaces or shell metacharacters still arrives as a single argument.
# Values made only of shell-safe characters pass through unchanged.
# NOTE(review): assumes helpers.run hands the string to a shell (or a
# shlex-style splitter) -- confirm against helpers.run.
run(
    f"python3 {shlex.quote(setup_script)}"
    f" --authentik-domain {shlex.quote(AK_TEST_DOMAIN)}"
    f" --authentik-token {shlex.quote(AK_TEST_TOKEN)}"
    f" --immich-domain {shlex.quote(IMMICH_TEST_DOMAIN)}"
    f" --immich-admin-email {shlex.quote(IMMICH_ADMIN_EMAIL)}"
    f" --immich-admin-pass {shlex.quote(IMMICH_ADMIN_PASS)}",
    timeout=120,
)
print()

# ---------------------------------------------------------------------------
# Step 4: Verify OIDC discovery endpoint (with retries)
# ---------------------------------------------------------------------------
print("=== Step 4: Verify OIDC discovery endpoint ===")
discovery_url = (
    f"https://{AK_TEST_DOMAIN}/application/o/immich"
    f"/.well-known/openid-configuration"
)
try:
    retry_http_get(discovery_url, expect_status=200, max_wait=60)
    print(" PASS: OIDC discovery endpoint OK")
except RuntimeError as e:
    print(f" FAIL: {e}")
    failures.append("OIDC discovery")
print()

# ---------------------------------------------------------------------------
# Step 5: Obtain token from Authentik via password grant (with retries)
# ---------------------------------------------------------------------------
print("=== Step 5: Obtain token from Authentik ===")


# Get client secret -- retry
def _get_client_secret():
    """Look up the ``immich`` OAuth2 provider and return its client secret.

    Returns ``None`` when the provider list or the provider detail is not
    available yet, so the polling caller tries again.
    """
    _, providers = http_get(
        f"{ak_api}/providers/oauth2/?search=immich",
        headers=ak_headers,
    )
    if not providers:
        return None
    for r in providers.get("results", []):
        # .get() so an entry without a "name" key is skipped instead of
        # raising KeyError and aborting the whole test run.
        if r.get("name") == "immich":
            _, detail = http_get(
                f"{ak_api}/providers/oauth2/{r['pk']}/",
                headers=ak_headers,
            )
            if detail:
                return detail.get("client_secret")
    return None


# assert_converges appears to return the callable's first truthy result and to
# raise RuntimeError on timeout (inferred from how it is used in this script).
try:
    client_secret = assert_converges(
        _get_client_secret,
        "retrieve client secret",
        max_wait=60,
    )
except RuntimeError:
    client_secret = None
    print(" FAIL: Could not retrieve client secret from Authentik")
    failures.append("client secret retrieval")

if client_secret:

    # Get app password -- retry
    def _get_app_password():
        """Return the key of the ``testuser-app-password`` token, or ``None``."""
        _, resp = http_get(
            f"{ak_api}/core/tokens/testuser-app-password/view_key/",
            headers=ak_headers,
        )
        return resp.get("key") if resp else None

    try:
        app_password = assert_converges(
            _get_app_password,
            "retrieve app password",
            max_wait=60,
        )
    except RuntimeError:
        app_password = None
        print(" FAIL: Could not retrieve app password")
        failures.append("app password retrieval")

    if app_password:
        # Token grant -- retry
        token_url = f"https://{AK_TEST_DOMAIN}/application/o/token/"

        def _get_token():
            """Request a password-grant token; return the access token or ``None``."""
            _, resp = http_post(
                token_url,
                data={
                    "grant_type": "password",
                    "client_id": "immich",
                    "client_secret": client_secret,
                    "username": "testuser",
                    "password": app_password,
                    "scope": "openid email profile",
                },
                content_type="application/x-www-form-urlencoded",
            )
            return (resp or {}).get("access_token")

        try:
            access_token = assert_converges(
                _get_token,
                "password grant token",
                max_wait=60,
            )
            print(f" PASS: Got access token ({len(access_token)} chars)")
        except RuntimeError:
            print(" FAIL: Token request did not succeed")
            failures.append("token grant")
print()

# ---------------------------------------------------------------------------
# Step 6: Verify Immich OAuth authorize endpoint (with retries)
# ---------------------------------------------------------------------------
print("=== Step 6: Verify Immich /api/oauth/authorize ===")


def _check_authorize():
    """Return the redirect URL from Immich's authorize endpoint, or ``None``."""
    _, resp = http_post(
        f"https://{IMMICH_TEST_DOMAIN}/api/oauth/authorize",
        data={"redirectUri": f"https://{IMMICH_TEST_DOMAIN}/auth/login"},
    )
    url = (resp or {}).get("url", "")
    return url if url else None


try:
    redirect_url = assert_converges(
        _check_authorize,
        "OAuth authorize returns redirect URL",
        max_wait=60,
    )
    if AK_TEST_DOMAIN in redirect_url:
        print(" PASS: OAuth authorize returns Authentik redirect URL")
    else:
        # Deliberately a warning only: this case is not added to ``failures``.
        print(f" WARN: Redirect URL doesn't contain Authentik domain: {redirect_url}")
        print(" (May still be OK if using a different OIDC discovery URL)")
except RuntimeError:
    print(" FAIL: /api/oauth/authorize did not return a redirect URL")
    failures.append("OAuth authorize")
print()

# ---------------------------------------------------------------------------
# Result
# ---------------------------------------------------------------------------
if failures:
    print(f"FAIL: Immich SSO end-to-end test failed: {', '.join(failures)}")
    sys.exit(1)
else:
    print("PASS: Immich SSO end-to-end test passed")
    print(f" Fresh Authentik ({AK_TEST_DOMAIN}) + Immich ({IMMICH_TEST_DOMAIN})")
    print(" created from scratch, SSO setup, OIDC discovery OK,")
    print(" token grant OK, OAuth authorize endpoint returns redirect URL.")