Sanitized single-commit public mirror of recipe-maintainer. - Removed test-ssh/.testenv (live creds); added test-ssh/.testenv.example placeholders. - Removed plans/ and planned-updates/ (deployment-planning docs) so no client/ deployment domains appear in the public repo. - All other secret stores were already gitignored. - docs.coopcloud.tech retained as a submodule (public upstream).
430 lines
15 KiB
Python
430 lines
15 KiB
Python
"""Shared test helpers for SSO end-to-end tests.
|
|
|
|
Provides:
|
|
- abra command runner with TTY wrapper support
|
|
- HTTP helpers with retry/convergence support
|
|
- assert_converges: retry a callable until it returns truthy or timeout
|
|
- wait_for_http: poll a URL until it responds
|
|
- resolve_instance / resolve_domain / resolve_server: read from settings.toml
|
|
- load_toml_credentials: load TOML credential files
|
|
|
|
Every subprocess and HTTP call has a hard timeout to prevent hangs.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import tomllib
|
|
import urllib.error
|
|
import urllib.request
|
|
import urllib.parse
|
|
|
|
WORKSPACE = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Settings / instance resolution
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _load_settings():
|
|
"""Load and return the parsed settings.toml."""
|
|
settings_path = os.path.join(WORKSPACE, "settings.toml")
|
|
with open(settings_path, 'rb') as f:
|
|
return tomllib.load(f)
|
|
|
|
|
|
def resolve_instance():
|
|
"""Read the default instance name from settings.toml."""
|
|
settings = _load_settings()
|
|
return settings["default_instance"]
|
|
|
|
|
|
def resolve_domain(recipe):
|
|
"""Get the domain for a recipe on the active instance."""
|
|
settings = _load_settings()
|
|
instance = settings["default_instance"]
|
|
suffix = settings["instances"][instance]["domain_suffix"]
|
|
return f"{recipe}.{suffix}"
|
|
|
|
|
|
def resolve_server():
|
|
"""Get the server for the active instance."""
|
|
settings = _load_settings()
|
|
instance = settings["default_instance"]
|
|
return settings["instances"][instance]["server"]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Credential loading
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def resolve_domain_suffix():
|
|
"""Get the domain suffix for the active instance."""
|
|
settings = _load_settings()
|
|
instance = settings["default_instance"]
|
|
return settings["instances"][instance]["domain_suffix"]
|
|
|
|
|
|
def load_toml_credentials(recipe_dir, provider):
|
|
"""Load credentials from recipe-info/<recipe>/<provider>-test-credentials.<suffix>.toml.
|
|
|
|
The domain suffix is auto-resolved from settings.toml.
|
|
|
|
recipe_dir: absolute path to the recipe-info/<recipe> directory
|
|
provider: credential provider name (e.g. 'keycloak', 'authentik')
|
|
Returns: dict of credentials, or None if file doesn't exist.
|
|
"""
|
|
suffix = resolve_domain_suffix()
|
|
path = os.path.join(recipe_dir, f"{provider}-test-credentials.{suffix}.toml")
|
|
if not os.path.exists(path):
|
|
return None
|
|
with open(path, 'rb') as f:
|
|
return tomllib.load(f)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Shell / abra command helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def run(cmd, check=True, timeout=120):
|
|
"""Run a shell command with a hard timeout.
|
|
|
|
Uses the Linux `timeout` command to guarantee the process tree is
|
|
killed after `timeout` seconds, even if the process ignores signals.
|
|
subprocess.run gets a slightly longer timeout as a fallback.
|
|
"""
|
|
# Wrap with Linux timeout --kill-after to hard-kill the entire process
|
|
wrapped = f"timeout --kill-after=5 {timeout} {cmd}"
|
|
print(f" $ {cmd}", flush=True)
|
|
try:
|
|
result = subprocess.run(
|
|
wrapped, shell=True, capture_output=True, text=True,
|
|
timeout=timeout + 15, # fallback: kill subprocess if Linux timeout fails
|
|
)
|
|
except subprocess.TimeoutExpired:
|
|
print(f" TIMEOUT after {timeout}s (subprocess fallback)", flush=True)
|
|
raise RuntimeError(f"Command timed out after {timeout}s: {cmd}")
|
|
if result.stdout.strip():
|
|
for line in result.stdout.strip().split("\n"):
|
|
print(f" {line}", flush=True)
|
|
if result.returncode != 0:
|
|
if result.stderr.strip():
|
|
for line in result.stderr.strip().split("\n"):
|
|
print(f" stderr: {line}", flush=True)
|
|
# exit code 124 = Linux timeout killed it
|
|
if result.returncode == 124:
|
|
print(f" TIMEOUT after {timeout}s", flush=True)
|
|
if check:
|
|
raise RuntimeError(f"Command timed out after {timeout}s: {cmd}")
|
|
elif check:
|
|
raise RuntimeError(
|
|
f"Command failed (exit {result.returncode}): {cmd}"
|
|
)
|
|
return result
|
|
|
|
|
|
def abra(args, tty_wrap=False, check=True, timeout=120):
|
|
"""Run an abra command, optionally with TTY wrapper."""
|
|
cmd = f"abra {args}"
|
|
if tty_wrap:
|
|
cmd = f'script -qefc "{cmd}" /dev/null 2>&1'
|
|
return run(cmd, check=check, timeout=timeout)
|
|
|
|
|
|
def fresh_app(recipe, server, domain, preset_secrets=None,
|
|
env_overrides=None):
|
|
"""Create a fresh app instance, cleaning up any leftovers first.
|
|
|
|
Undeploys, removes Docker secrets, volumes, and env file from previous
|
|
runs, then runs abra app new, applies env_overrides, inserts any
|
|
preset_secrets, and generates the rest.
|
|
|
|
preset_secrets: dict of {secret_name: value} to insert before
|
|
generating remaining secrets (e.g. {"admin_token": "..."}).
|
|
env_overrides: dict of {KEY: value} to set/uncomment in the .env file
|
|
after app new (e.g. {"COMPOSE_FILE": "compose.yml:compose.sso.yml"}).
|
|
"""
|
|
env_path = os.path.expanduser(
|
|
f"~/.abra/servers/{server}/{domain}.env"
|
|
)
|
|
|
|
# Undeploy if still running from a previous run
|
|
abra(f"app undeploy {domain} --no-input", check=False, timeout=60)
|
|
|
|
# Remove leftover volumes so the DB starts fresh (avoids password mismatch
|
|
# when secrets are regenerated but the old DB volume persists)
|
|
abra(f"app volume remove {domain} --force --no-input",
|
|
check=False, timeout=60)
|
|
|
|
# Remove leftover Docker secrets from the server
|
|
if os.path.exists(env_path):
|
|
abra(f"app secret remove {domain} --all --chaos --no-input",
|
|
tty_wrap=True, check=False, timeout=60)
|
|
|
|
# Remove leftover env file so app new succeeds
|
|
if os.path.exists(env_path):
|
|
print(f" Removing leftover env: {env_path}", flush=True)
|
|
os.remove(env_path)
|
|
|
|
abra(f"app new {recipe} --server {server} --domain {domain} --chaos --no-input",
|
|
timeout=60)
|
|
|
|
# Apply env overrides to the generated .env file
|
|
if env_overrides:
|
|
_apply_env_overrides(env_path, env_overrides)
|
|
|
|
# Insert preset secrets before generate so they use our known values
|
|
for name, value in (preset_secrets or {}).items():
|
|
abra(f"app secret insert {domain} {name} v1 {value} --chaos --no-input",
|
|
tty_wrap=True, check=False, timeout=60)
|
|
|
|
# Generate remaining secrets (check=False: warns if some already exist)
|
|
abra(f"app secret generate {domain} --all --chaos --no-input",
|
|
tty_wrap=True, check=False, timeout=60)
|
|
|
|
|
|
def _apply_env_overrides(env_path, overrides):
|
|
"""Set or uncomment values in an abra .env file."""
|
|
with open(env_path) as f:
|
|
lines = f.readlines()
|
|
|
|
remaining = dict(overrides)
|
|
new_lines = []
|
|
for line in lines:
|
|
stripped = line.strip()
|
|
matched = False
|
|
for key, value in list(remaining.items()):
|
|
# Match "KEY=...", "#KEY=...", or "# KEY=..."
|
|
if stripped.lstrip("#").strip().startswith(f"{key}="):
|
|
new_lines.append(f"{key}={value}\n")
|
|
remaining.pop(key)
|
|
matched = True
|
|
print(f" env: {key}={value}", flush=True)
|
|
break
|
|
if not matched:
|
|
new_lines.append(line)
|
|
|
|
# Append any keys that weren't found in the file
|
|
for key, value in remaining.items():
|
|
new_lines.append(f"{key}={value}\n")
|
|
print(f" env: {key}={value} (appended)", flush=True)
|
|
|
|
with open(env_path, "w") as f:
|
|
f.writelines(new_lines)
|
|
|
|
|
|
def deploy_and_wait(domain, server, url, label,
|
|
deploy_timeout=60, wait_max=300):
|
|
"""Fire off an abra deploy and then poll until all services are ready.
|
|
|
|
The deploy command sends the stack to Docker Swarm quickly, but
|
|
post-deploy hooks (set_admin_pass etc.) can hang for minutes.
|
|
We give the deploy command a short timeout — if it times out,
|
|
the deploy has already been submitted to Swarm.
|
|
|
|
Then we poll via SSH + `docker service ls` until all services
|
|
show full replicas (e.g. 1/1), followed by an HTTP check to
|
|
confirm the app is actually serving requests.
|
|
"""
|
|
print(f" Deploying {domain} (fire-and-poll) ...", flush=True)
|
|
abra(f"app deploy {domain} --chaos --force --no-input",
|
|
timeout=deploy_timeout, check=False)
|
|
|
|
# The Docker stack name is the domain with dots replaced by underscores
|
|
stack_prefix = domain.replace(".", "_")
|
|
|
|
# Poll replicas via SSH + docker service ls
|
|
def _all_replicas_ready():
|
|
result = run(
|
|
f"ssh {server} \"docker service ls"
|
|
f" --filter 'name={stack_prefix}'"
|
|
f" --format '{{{{.Replicas}}}}'\"",
|
|
check=False, timeout=30,
|
|
)
|
|
if result.returncode != 0:
|
|
return None
|
|
lines = [l.strip() for l in result.stdout.strip().split("\n") if l.strip()]
|
|
if not lines:
|
|
return None
|
|
for replicas in lines:
|
|
# Format is "1/1" — desired/running must match
|
|
parts = replicas.split("/")
|
|
if len(parts) != 2:
|
|
return None
|
|
if parts[0] != parts[1]:
|
|
return None
|
|
return True
|
|
|
|
assert_converges(
|
|
_all_replicas_ready,
|
|
f"{label} all replicas ready (docker service ls)",
|
|
max_wait=wait_max,
|
|
interval=15,
|
|
)
|
|
|
|
# HTTP check to confirm the app is actually serving
|
|
return wait_for_http(url, label, max_wait=120)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Convergence helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def assert_converges(fn, description, max_wait=120, interval=10):
|
|
"""Retry fn() until it returns a truthy value or we time out.
|
|
|
|
fn() should return a truthy value on success, or raise / return falsy
|
|
on failure. The last return value or exception is reported on timeout.
|
|
|
|
Returns the truthy value on success.
|
|
"""
|
|
print(f" Waiting for: {description} (up to {max_wait}s) ...", flush=True)
|
|
deadline = time.time() + max_wait
|
|
last_error = None
|
|
last_result = None
|
|
attempts = 0
|
|
|
|
while time.time() < deadline:
|
|
attempts += 1
|
|
try:
|
|
result = fn()
|
|
if result:
|
|
print(
|
|
f" Converged after ~{int(time.time() - (deadline - max_wait))}s"
|
|
f" ({attempts} attempts)",
|
|
flush=True,
|
|
)
|
|
return result
|
|
last_result = result
|
|
except Exception as e:
|
|
last_error = e
|
|
time.sleep(interval)
|
|
|
|
# Timed out
|
|
detail = ""
|
|
if last_error:
|
|
detail = f" Last error: {last_error}"
|
|
elif last_result is not None:
|
|
detail = f" Last result: {last_result}"
|
|
raise RuntimeError(
|
|
f"Did not converge: {description} after {max_wait}s"
|
|
f" ({attempts} attempts).{detail}"
|
|
)
|
|
|
|
|
|
def wait_for_http(url, label, max_wait=300, interval=10):
|
|
"""Poll a URL until it returns a non-5xx response. Raises on timeout."""
|
|
def _check():
|
|
try:
|
|
req = urllib.request.Request(url, method="GET")
|
|
with urllib.request.urlopen(req, timeout=10) as resp:
|
|
code = resp.getcode()
|
|
if 200 <= code < 500:
|
|
return code
|
|
except urllib.error.HTTPError as e:
|
|
if e.code < 500:
|
|
return e.code
|
|
except Exception:
|
|
pass
|
|
return None
|
|
|
|
code = assert_converges(_check, f"{label} responding at {url}", max_wait, interval)
|
|
print(f" {label} is up (HTTP {code})", flush=True)
|
|
return code
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# HTTP helpers with retry
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def http_get(url, headers=None, timeout=15):
|
|
"""GET a URL, return (status_code, parsed_json_or_None).
|
|
|
|
Does NOT retry — use retry_http_get or assert_converges for that.
|
|
"""
|
|
req = urllib.request.Request(url, method="GET")
|
|
for k, v in (headers or {}).items():
|
|
req.add_header(k, v)
|
|
try:
|
|
with urllib.request.urlopen(req, timeout=timeout) as resp:
|
|
raw = resp.read()
|
|
try:
|
|
return resp.getcode(), json.loads(raw)
|
|
except (json.JSONDecodeError, ValueError):
|
|
return resp.getcode(), None
|
|
except urllib.error.HTTPError as e:
|
|
try:
|
|
raw = e.read().decode(errors="replace")
|
|
return e.code, json.loads(raw)
|
|
except Exception:
|
|
return e.code, None
|
|
except Exception:
|
|
return 0, None
|
|
|
|
|
|
def http_post(url, data=None, headers=None, content_type="application/json",
|
|
timeout=15):
|
|
"""POST to a URL, return (status_code, parsed_json_or_None).
|
|
|
|
Does NOT retry — use assert_converges for that.
|
|
"""
|
|
if content_type == "application/json" and data is not None:
|
|
body = json.dumps(data).encode()
|
|
elif content_type == "application/x-www-form-urlencoded" and data is not None:
|
|
body = urllib.parse.urlencode(data).encode()
|
|
else:
|
|
body = None
|
|
req = urllib.request.Request(url, data=body, method="POST")
|
|
req.add_header("Content-Type", content_type)
|
|
for k, v in (headers or {}).items():
|
|
req.add_header(k, v)
|
|
try:
|
|
with urllib.request.urlopen(req, timeout=timeout) as resp:
|
|
raw = resp.read()
|
|
try:
|
|
return resp.getcode(), json.loads(raw)
|
|
except (json.JSONDecodeError, ValueError):
|
|
return resp.getcode(), None
|
|
except urllib.error.HTTPError as e:
|
|
try:
|
|
raw = e.read().decode(errors="replace")
|
|
return e.code, json.loads(raw)
|
|
except Exception:
|
|
return e.code, None
|
|
except Exception:
|
|
return 0, None
|
|
|
|
|
|
def retry_http_get(url, headers=None, expect_status=200, max_wait=90,
|
|
interval=10, timeout=15):
|
|
"""GET with retries until expected status. Returns (status, json)."""
|
|
result = [None, None]
|
|
def _check():
|
|
s, j = http_get(url, headers=headers, timeout=timeout)
|
|
result[0], result[1] = s, j
|
|
return s == expect_status
|
|
assert_converges(_check, f"GET {url} -> {expect_status}", max_wait, interval)
|
|
return result[0], result[1]
|
|
|
|
|
|
def retry_http_post(url, data=None, headers=None,
|
|
content_type="application/json", expect_fn=None,
|
|
max_wait=90, interval=10, timeout=15):
|
|
"""POST with retries until expect_fn(status, json) returns truthy.
|
|
|
|
If expect_fn is None, succeeds on any 2xx.
|
|
Returns (status, json).
|
|
"""
|
|
if expect_fn is None:
|
|
expect_fn = lambda s, j: 200 <= s < 300
|
|
result = [None, None]
|
|
def _check():
|
|
s, j = http_post(url, data=data, headers=headers,
|
|
content_type=content_type, timeout=timeout)
|
|
result[0], result[1] = s, j
|
|
return expect_fn(s, j)
|
|
assert_converges(_check, f"POST {url}", max_wait, interval)
|
|
return result[0], result[1]
|