"""Shared test helpers for SSO end-to-end tests. Provides: - abra command runner with TTY wrapper support - HTTP helpers with retry/convergence support - assert_converges: retry a callable until it returns truthy or timeout - wait_for_http: poll a URL until it responds - resolve_instance / resolve_domain / resolve_server: read from settings.toml - load_toml_credentials: load TOML credential files Every subprocess and HTTP call has a hard timeout to prevent hangs. """ import json import os import subprocess import sys import time import tomllib import urllib.error import urllib.request import urllib.parse WORKSPACE = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) # --------------------------------------------------------------------------- # Settings / instance resolution # --------------------------------------------------------------------------- def _load_settings(): """Load and return the parsed settings.toml.""" settings_path = os.path.join(WORKSPACE, "settings.toml") with open(settings_path, 'rb') as f: return tomllib.load(f) def resolve_instance(): """Read the default instance name from settings.toml.""" settings = _load_settings() return settings["default_instance"] def resolve_domain(recipe): """Get the domain for a recipe on the active instance.""" settings = _load_settings() instance = settings["default_instance"] suffix = settings["instances"][instance]["domain_suffix"] return f"{recipe}.{suffix}" def resolve_server(): """Get the server for the active instance.""" settings = _load_settings() instance = settings["default_instance"] return settings["instances"][instance]["server"] # --------------------------------------------------------------------------- # Credential loading # --------------------------------------------------------------------------- def resolve_domain_suffix(): """Get the domain suffix for the active instance.""" settings = _load_settings() instance = settings["default_instance"] return settings["instances"][instance]["domain_suffix"] def load_toml_credentials(recipe_dir, provider): """Load credentials from recipe-info//-test-credentials..toml. The domain suffix is auto-resolved from settings.toml. recipe_dir: absolute path to the recipe-info/ directory provider: credential provider name (e.g. 'keycloak', 'authentik') Returns: dict of credentials, or None if file doesn't exist. """ suffix = resolve_domain_suffix() path = os.path.join(recipe_dir, f"{provider}-test-credentials.{suffix}.toml") if not os.path.exists(path): return None with open(path, 'rb') as f: return tomllib.load(f) # --------------------------------------------------------------------------- # Shell / abra command helpers # --------------------------------------------------------------------------- def run(cmd, check=True, timeout=120): """Run a shell command with a hard timeout. Uses the Linux `timeout` command to guarantee the process tree is killed after `timeout` seconds, even if the process ignores signals. subprocess.run gets a slightly longer timeout as a fallback. """ # Wrap with Linux timeout --kill-after to hard-kill the entire process wrapped = f"timeout --kill-after=5 {timeout} {cmd}" print(f" $ {cmd}", flush=True) try: result = subprocess.run( wrapped, shell=True, capture_output=True, text=True, timeout=timeout + 15, # fallback: kill subprocess if Linux timeout fails ) except subprocess.TimeoutExpired: print(f" TIMEOUT after {timeout}s (subprocess fallback)", flush=True) raise RuntimeError(f"Command timed out after {timeout}s: {cmd}") if result.stdout.strip(): for line in result.stdout.strip().split("\n"): print(f" {line}", flush=True) if result.returncode != 0: if result.stderr.strip(): for line in result.stderr.strip().split("\n"): print(f" stderr: {line}", flush=True) # exit code 124 = Linux timeout killed it if result.returncode == 124: print(f" TIMEOUT after {timeout}s", flush=True) if check: raise RuntimeError(f"Command timed out after {timeout}s: {cmd}") elif check: raise RuntimeError( f"Command failed (exit {result.returncode}): {cmd}" ) return result def abra(args, tty_wrap=False, check=True, timeout=120): """Run an abra command, optionally with TTY wrapper.""" cmd = f"abra {args}" if tty_wrap: cmd = f'script -qefc "{cmd}" /dev/null 2>&1' return run(cmd, check=check, timeout=timeout) def fresh_app(recipe, server, domain, preset_secrets=None, env_overrides=None): """Create a fresh app instance, cleaning up any leftovers first. Undeploys, removes Docker secrets, volumes, and env file from previous runs, then runs abra app new, applies env_overrides, inserts any preset_secrets, and generates the rest. preset_secrets: dict of {secret_name: value} to insert before generating remaining secrets (e.g. {"admin_token": "..."}). env_overrides: dict of {KEY: value} to set/uncomment in the .env file after app new (e.g. {"COMPOSE_FILE": "compose.yml:compose.sso.yml"}). """ env_path = os.path.expanduser( f"~/.abra/servers/{server}/{domain}.env" ) # Undeploy if still running from a previous run abra(f"app undeploy {domain} --no-input", check=False, timeout=60) # Remove leftover volumes so the DB starts fresh (avoids password mismatch # when secrets are regenerated but the old DB volume persists) abra(f"app volume remove {domain} --force --no-input", check=False, timeout=60) # Remove leftover Docker secrets from the server if os.path.exists(env_path): abra(f"app secret remove {domain} --all --chaos --no-input", tty_wrap=True, check=False, timeout=60) # Remove leftover env file so app new succeeds if os.path.exists(env_path): print(f" Removing leftover env: {env_path}", flush=True) os.remove(env_path) abra(f"app new {recipe} --server {server} --domain {domain} --chaos --no-input", timeout=60) # Apply env overrides to the generated .env file if env_overrides: _apply_env_overrides(env_path, env_overrides) # Insert preset secrets before generate so they use our known values for name, value in (preset_secrets or {}).items(): abra(f"app secret insert {domain} {name} v1 {value} --chaos --no-input", tty_wrap=True, check=False, timeout=60) # Generate remaining secrets (check=False: warns if some already exist) abra(f"app secret generate {domain} --all --chaos --no-input", tty_wrap=True, check=False, timeout=60) def _apply_env_overrides(env_path, overrides): """Set or uncomment values in an abra .env file.""" with open(env_path) as f: lines = f.readlines() remaining = dict(overrides) new_lines = [] for line in lines: stripped = line.strip() matched = False for key, value in list(remaining.items()): # Match "KEY=...", "#KEY=...", or "# KEY=..." if stripped.lstrip("#").strip().startswith(f"{key}="): new_lines.append(f"{key}={value}\n") remaining.pop(key) matched = True print(f" env: {key}={value}", flush=True) break if not matched: new_lines.append(line) # Append any keys that weren't found in the file for key, value in remaining.items(): new_lines.append(f"{key}={value}\n") print(f" env: {key}={value} (appended)", flush=True) with open(env_path, "w") as f: f.writelines(new_lines) def deploy_and_wait(domain, server, url, label, deploy_timeout=60, wait_max=300): """Fire off an abra deploy and then poll until all services are ready. The deploy command sends the stack to Docker Swarm quickly, but post-deploy hooks (set_admin_pass etc.) can hang for minutes. We give the deploy command a short timeout — if it times out, the deploy has already been submitted to Swarm. Then we poll via SSH + `docker service ls` until all services show full replicas (e.g. 1/1), followed by an HTTP check to confirm the app is actually serving requests. """ print(f" Deploying {domain} (fire-and-poll) ...", flush=True) abra(f"app deploy {domain} --chaos --force --no-input", timeout=deploy_timeout, check=False) # The Docker stack name is the domain with dots replaced by underscores stack_prefix = domain.replace(".", "_") # Poll replicas via SSH + docker service ls def _all_replicas_ready(): result = run( f"ssh {server} \"docker service ls" f" --filter 'name={stack_prefix}'" f" --format '{{{{.Replicas}}}}'\"", check=False, timeout=30, ) if result.returncode != 0: return None lines = [l.strip() for l in result.stdout.strip().split("\n") if l.strip()] if not lines: return None for replicas in lines: # Format is "1/1" — desired/running must match parts = replicas.split("/") if len(parts) != 2: return None if parts[0] != parts[1]: return None return True assert_converges( _all_replicas_ready, f"{label} all replicas ready (docker service ls)", max_wait=wait_max, interval=15, ) # HTTP check to confirm the app is actually serving return wait_for_http(url, label, max_wait=120) # --------------------------------------------------------------------------- # Convergence helpers # --------------------------------------------------------------------------- def assert_converges(fn, description, max_wait=120, interval=10): """Retry fn() until it returns a truthy value or we time out. fn() should return a truthy value on success, or raise / return falsy on failure. The last return value or exception is reported on timeout. Returns the truthy value on success. """ print(f" Waiting for: {description} (up to {max_wait}s) ...", flush=True) deadline = time.time() + max_wait last_error = None last_result = None attempts = 0 while time.time() < deadline: attempts += 1 try: result = fn() if result: print( f" Converged after ~{int(time.time() - (deadline - max_wait))}s" f" ({attempts} attempts)", flush=True, ) return result last_result = result except Exception as e: last_error = e time.sleep(interval) # Timed out detail = "" if last_error: detail = f" Last error: {last_error}" elif last_result is not None: detail = f" Last result: {last_result}" raise RuntimeError( f"Did not converge: {description} after {max_wait}s" f" ({attempts} attempts).{detail}" ) def wait_for_http(url, label, max_wait=300, interval=10): """Poll a URL until it returns a non-5xx response. Raises on timeout.""" def _check(): try: req = urllib.request.Request(url, method="GET") with urllib.request.urlopen(req, timeout=10) as resp: code = resp.getcode() if 200 <= code < 500: return code except urllib.error.HTTPError as e: if e.code < 500: return e.code except Exception: pass return None code = assert_converges(_check, f"{label} responding at {url}", max_wait, interval) print(f" {label} is up (HTTP {code})", flush=True) return code # --------------------------------------------------------------------------- # HTTP helpers with retry # --------------------------------------------------------------------------- def http_get(url, headers=None, timeout=15): """GET a URL, return (status_code, parsed_json_or_None). Does NOT retry — use retry_http_get or assert_converges for that. """ req = urllib.request.Request(url, method="GET") for k, v in (headers or {}).items(): req.add_header(k, v) try: with urllib.request.urlopen(req, timeout=timeout) as resp: raw = resp.read() try: return resp.getcode(), json.loads(raw) except (json.JSONDecodeError, ValueError): return resp.getcode(), None except urllib.error.HTTPError as e: try: raw = e.read().decode(errors="replace") return e.code, json.loads(raw) except Exception: return e.code, None except Exception: return 0, None def http_post(url, data=None, headers=None, content_type="application/json", timeout=15): """POST to a URL, return (status_code, parsed_json_or_None). Does NOT retry — use assert_converges for that. """ if content_type == "application/json" and data is not None: body = json.dumps(data).encode() elif content_type == "application/x-www-form-urlencoded" and data is not None: body = urllib.parse.urlencode(data).encode() else: body = None req = urllib.request.Request(url, data=body, method="POST") req.add_header("Content-Type", content_type) for k, v in (headers or {}).items(): req.add_header(k, v) try: with urllib.request.urlopen(req, timeout=timeout) as resp: raw = resp.read() try: return resp.getcode(), json.loads(raw) except (json.JSONDecodeError, ValueError): return resp.getcode(), None except urllib.error.HTTPError as e: try: raw = e.read().decode(errors="replace") return e.code, json.loads(raw) except Exception: return e.code, None except Exception: return 0, None def retry_http_get(url, headers=None, expect_status=200, max_wait=90, interval=10, timeout=15): """GET with retries until expected status. Returns (status, json).""" result = [None, None] def _check(): s, j = http_get(url, headers=headers, timeout=timeout) result[0], result[1] = s, j return s == expect_status assert_converges(_check, f"GET {url} -> {expect_status}", max_wait, interval) return result[0], result[1] def retry_http_post(url, data=None, headers=None, content_type="application/json", expect_fn=None, max_wait=90, interval=10, timeout=15): """POST with retries until expect_fn(status, json) returns truthy. If expect_fn is None, succeeds on any 2xx. Returns (status, json). """ if expect_fn is None: expect_fn = lambda s, j: 200 <= s < 300 result = [None, None] def _check(): s, j = http_post(url, data=data, headers=headers, content_type=content_type, timeout=timeout) result[0], result[1] = s, j return expect_fn(s, j) assert_converges(_check, f"POST {url}", max_wait, interval) return result[0], result[1]