#!/usr/bin/env python3
"""End-to-end test: create fresh Authentik + CryptPad instances, run SSO setup, verify.

Creates BRAND NEW app instances with unique domains via abra app new.
This ensures the scripts work on a completely clean install that has
never been configured before.

Usage:
    python3 utils/tests/test_cryptpad_sso.py
"""

import atexit
import os
import sys

# Make the sibling helpers module importable when run as a plain script.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from helpers import (
    WORKSPACE, resolve_instance, run, abra, fresh_app, deploy_and_wait,
    assert_converges, http_get, http_post, retry_http_get,
)

# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
INSTANCE = resolve_instance()
SERVER = f"{INSTANCE}.commoninternet.net"
AK_TEST_DOMAIN = f"ak-cpadtest.{SERVER}"
CPAD_TEST_DOMAIN = f"cpad-ssotest.{SERVER}"
AK_TEST_TOKEN = "ssotest-cpad-admin-token"

print("=== CryptPad SSO End-to-End Test ===")
print(f" Server: {SERVER}")
print(f" Authentik (new): {AK_TEST_DOMAIN}")
print(f" CryptPad (new): {CPAD_TEST_DOMAIN}")
print()


# ---------------------------------------------------------------------------
# Cleanup on exit
# ---------------------------------------------------------------------------
def cleanup():
    """Undeploy both test instances.

    Registered with ``atexit`` below, so it runs when the interpreter exits,
    including after the ``sys.exit(1)`` at the end of this script.
    Both undeploys pass ``check=False`` -- presumably so that a failed
    undeploy does not raise during shutdown (confirm against helpers.abra).
    """
    print()
    print("=== Cleanup ===")
    print(" Undeploying test instances ...")
    abra(f"app undeploy {CPAD_TEST_DOMAIN} --no-input", check=False, timeout=60)
    abra(f"app undeploy {AK_TEST_DOMAIN} --no-input", check=False, timeout=60)
    print(" Cleanup done")


atexit.register(cleanup)

# Names of the verification steps that failed; reported at the end.
failures = []

# ---------------------------------------------------------------------------
# Step 1: Create fresh Authentik
# ---------------------------------------------------------------------------
print("=== Step 1: Create fresh Authentik instance ===")
fresh_app("authentik", SERVER, AK_TEST_DOMAIN,
          preset_secrets={"admin_token": AK_TEST_TOKEN})
deploy_and_wait(AK_TEST_DOMAIN, SERVER, f"https://{AK_TEST_DOMAIN}",
                "Authentik", deploy_timeout=60, wait_max=300)


# The deploy post-command set_admin_pass gets killed because it runs before
# the worker has finished migrations. Re-run it with retries — the worker
# needs time to complete its bootstrap before akadmin exists.
def _set_admin_pass():
    """Run ``set_admin_pass`` on the worker once; return True if it took effect.

    Returns None (retry) when none of the expected success markers appear in
    the command output.
    """
    result = abra(
        f"app cmd {AK_TEST_DOMAIN} worker set_admin_pass --chaos --no-input",
        tty_wrap=True, check=False, timeout=120,
    )
    # Tolerate both a missing result and a result whose stdout is None, so
    # the substring checks below can never raise TypeError.
    output = getattr(result, "stdout", None) or ""
    success_markers = (
        "Created authentik-bootstrap-token",
        "Changed authentik-bootstrap-token",
        "Changed akadmin password",
    )
    if any(marker in output for marker in success_markers):
        return True
    return None


assert_converges(_set_admin_pass, "set_admin_pass (bootstrap token)",
                 max_wait=600, interval=15)

# Wait for the API to become ready with the admin token
ak_api = f"https://{AK_TEST_DOMAIN}/api/v3"
ak_headers = {"Authorization": f"Bearer {AK_TEST_TOKEN}"}


def _ak_api_ready():
    """Return True once the Authentik API answers HTTP 200 with the admin token.

    Returns None on any other status, matching the "not yet" value used by
    the other probes in this file that are passed to ``assert_converges``.
    """
    code, _ = http_get(f"{ak_api}/flows/instances/", headers=ak_headers)
    if code == 200:
        return True
    print(f" API check: HTTP {code}", flush=True)
    return None


assert_converges(_ak_api_ready, "Authentik API ready", max_wait=120)
print()

# ---------------------------------------------------------------------------
# Step 2: Create fresh CryptPad
# ---------------------------------------------------------------------------
print("=== Step 2: Create fresh CryptPad instance ===")
fresh_app("cryptpad", SERVER, CPAD_TEST_DOMAIN)
deploy_and_wait(CPAD_TEST_DOMAIN, SERVER, f"https://{CPAD_TEST_DOMAIN}",
                "CryptPad", deploy_timeout=60, wait_max=180)
print()

# ---------------------------------------------------------------------------
# Step 3: Run setup script
# ---------------------------------------------------------------------------
print("=== Step 3: Run setup_cryptpad_sso.py ===")
setup_script = os.path.join(WORKSPACE, "utils", "setup_cryptpad_sso.py")
run(f"python3 {setup_script}"
    f" --authentik-domain {AK_TEST_DOMAIN}"
    f" --authentik-token {AK_TEST_TOKEN}"
    f" --cryptpad-domain {CPAD_TEST_DOMAIN}",
    timeout=120)
print()

# ---------------------------------------------------------------------------
# Step 4: Redeploy CryptPad with SSO config
# ---------------------------------------------------------------------------
print("=== Step 4: Redeploy CryptPad with SSO config ===")
deploy_and_wait(CPAD_TEST_DOMAIN, SERVER, f"https://{CPAD_TEST_DOMAIN}",
                "CryptPad (post-SSO)", deploy_timeout=60, wait_max=300)
print()

# ---------------------------------------------------------------------------
# Step 5: Verify OIDC discovery endpoint (with retries)
# ---------------------------------------------------------------------------
print("=== Step 5: Verify OIDC discovery endpoint ===")
discovery_url = (
    f"https://{AK_TEST_DOMAIN}/application/o/cryptpad"
    f"/.well-known/openid-configuration"
)
try:
    retry_http_get(discovery_url, expect_status=200, max_wait=60)
    print(" PASS: OIDC discovery endpoint OK")
except RuntimeError as e:
    print(f" FAIL: {e}")
    failures.append("OIDC discovery")
print()

# ---------------------------------------------------------------------------
# Step 6: Obtain token from Authentik via password grant (with retries)
# ---------------------------------------------------------------------------
print("=== Step 6: Obtain token from Authentik ===")


# Get client secret — retry because Authentik may still be indexing the provider
def _get_client_secret():
    """Return the ``client_secret`` of the OAuth2 provider named "cryptpad".

    Returns None (retry) when the provider list or the provider detail
    cannot be fetched, or when no provider with that exact name is listed.
    """
    _, providers = http_get(
        f"{ak_api}/providers/oauth2/?search=cryptpad", headers=ak_headers,
    )
    if not providers:
        return None
    for r in providers.get("results", []):
        # ?search= may match loosely; only accept the exact provider name.
        if r["name"] == "cryptpad":
            _, detail = http_get(
                f"{ak_api}/providers/oauth2/{r['pk']}/", headers=ak_headers,
            )
            if detail:
                return detail.get("client_secret")
    return None


try:
    client_secret = assert_converges(
        _get_client_secret, "retrieve client secret", max_wait=60,
    )
except RuntimeError:
    client_secret = None
    print(" FAIL: Could not retrieve client secret from Authentik")
    failures.append("client secret retrieval")

if client_secret:
    # Get app password — retry
    def _get_app_password():
        """Return the key of the ``testuser-app-password`` token, or None."""
        _, resp = http_get(
            f"{ak_api}/core/tokens/testuser-app-password/view_key/",
            headers=ak_headers,
        )
        return resp.get("key") if resp else None

    try:
        app_password = assert_converges(
            _get_app_password, "retrieve app password", max_wait=60,
        )
    except RuntimeError:
        app_password = None
        print(" FAIL: Could not retrieve app password")
        failures.append("app password retrieval")

    if app_password:
        # Token grant — retry (Authentik token endpoint may take a moment)
        token_url = f"https://{AK_TEST_DOMAIN}/application/o/token/"

        def _get_token():
            """Request a password-grant token; return the access token or None."""
            _, resp = http_post(token_url, data={
                "grant_type": "password",
                "client_id": "cryptpad",
                "client_secret": client_secret,
                "username": "testuser",
                "password": app_password,
                "scope": "openid email profile",
            }, content_type="application/x-www-form-urlencoded")
            return (resp or {}).get("access_token")

        try:
            access_token = assert_converges(
                _get_token, "password grant token", max_wait=60,
            )
            print(f" PASS: Got access token ({len(access_token)} chars)")
        except RuntimeError:
            print(" FAIL: Token request did not succeed")
            failures.append("token grant")
print()

# ---------------------------------------------------------------------------
# Step 7: Verify CryptPad /ssoauth endpoint (with retries)
# ---------------------------------------------------------------------------
print("=== Step 7: Verify CryptPad /ssoauth endpoint ===")


def _check_ssoauth():
    """Return the HTTP status of ``/ssoauth``, or None while it is unavailable.

    Any response except 404 means the SSO plugin is loaded. A status of 0 is
    treated as a connection error and retried as well.
    """
    status, _ = http_get(f"https://{CPAD_TEST_DOMAIN}/ssoauth")
    if status in (0, 404):
        return None
    return status


try:
    sso_status = assert_converges(
        _check_ssoauth, "/ssoauth endpoint exists (not 404)", max_wait=120,
    )
    print(f" PASS: /ssoauth endpoint exists (HTTP {sso_status})")
except RuntimeError:
    # The probe retries on both 404 and connection errors, so the message
    # must not claim that a 404 was necessarily what was seen.
    print(" FAIL: /ssoauth unavailable (404 or no response)"
          " -- SSO plugin may not be loaded")
    failures.append("/ssoauth endpoint")
print()

# ---------------------------------------------------------------------------
# Result
# ---------------------------------------------------------------------------
if failures:
    print(f"FAIL: CryptPad SSO end-to-end test failed: {', '.join(failures)}")
    sys.exit(1)
else:
    print("PASS: CryptPad SSO end-to-end test passed")
    print(f" Fresh Authentik ({AK_TEST_DOMAIN}) + CryptPad ({CPAD_TEST_DOMAIN})")
    print(" created from scratch, SSO setup, OIDC discovery OK,")
    print(" token grant OK, /ssoauth endpoint exists.")