"""HTTP convergence + JSON helpers for Phase-2 recipe tests (plan §4.2). Vendored from `references/recipe-maintainer/utils/tests/helpers.py` and adapted to the cc-ci harness (self-contained at runtime per DECISIONS Phase-2). The lifecycle / generic modules already have infra-level `http_get`/`http_fetch`/`http_body` (status + body) for the recipe-agnostic serving assertion; this module is the **canonical recipe-test API** — same shape as recipe-maintainer's helpers so parity-port tests read 1:1, but ignores TLS hostname/chain because per-run domains use the wildcard cert served via Traefik file provider (verified-by-real-cert is `generic.served_cert`, done once in `assert_serving`). Functions: http_get(url, headers=, timeout=) -> (status, json_or_None) http_post(url, data=, headers=, content_type=, timeout=) -> (status, json_or_None) retry_http_get(url, expect_status=200, max_wait=90, interval=10, ...) -> (status, json) retry_http_post(url, expect_fn=..., max_wait=90, interval=10, ...) -> (status, json) wait_for_http(url, label, max_wait=300, interval=10) -> int (polls until non-5xx) assert_converges(fn, description, max_wait=120, interval=10) -> truthy fn() result on success """ from __future__ import annotations import json import ssl import time import urllib.error import urllib.parse import urllib.request # Phase-2 tests hit per-run *.ci.commoninternet.net domains; Traefik file-provider serves the # operator's pre-issued wildcard cert. `generic.served_cert` does the real-cert sanity check once in # the install assertion; for content/API checks we don't want every assertion to re-verify the chain. 
# Shared TLS context for every request in this module. Hostname and chain verification are
# deliberately disabled; check_hostname has to be cleared before verify_mode may be CERT_NONE.
_CTX = ssl.create_default_context()
_CTX.check_hostname = False
_CTX.verify_mode = ssl.CERT_NONE


def _parse_body(raw: bytes) -> object | None:
    """Return ``raw`` decoded as JSON, or None when it is not valid JSON (e.g. an empty body)."""
    try:
        return json.loads(raw)
    except ValueError:  # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses
        return None


def _encode_body(data: object, content_type: str) -> bytes | None:
    """Turn a caller-supplied payload into request bytes.

    bytes/bytearray are sent as-is regardless of content_type. Any other non-None value is
    JSON-encoded for 'application/json' and form-encoded for 'application/x-www-form-urlencoded'.
    For every other content type (or data=None) no body is sent.
    """
    if isinstance(data, (bytes, bytearray)):
        return bytes(data)
    if data is None:
        return None
    if content_type == "application/json":
        return json.dumps(data).encode()
    if content_type == "application/x-www-form-urlencoded":
        return urllib.parse.urlencode(data).encode()
    return None


def _send(req: urllib.request.Request, timeout: int) -> tuple[int, object | None, dict[str, str]]:
    """Execute ``req`` and return (status, parsed_json_or_None, response_headers).

    HTTP error statuses (4xx/5xx) are returned, not raised. Any other failure while opening or
    reading (DNS, connect, TLS, timeout, ...) yields (0, None, {}) so callers can simply poll.
    """
    try:
        with urllib.request.urlopen(req, timeout=timeout, context=_CTX) as resp:
            return resp.getcode(), _parse_body(resp.read()), dict(resp.headers)
    except urllib.error.HTTPError as e:
        resp_headers = dict(getattr(e, "headers", None) or {})
        try:
            payload = _parse_body(e.read())
        except Exception:  # noqa: BLE001 — unreadable error body: still report the status
            payload = None
        finally:
            # HTTPError doubles as the response object; release its connection explicitly.
            try:
                e.close()
            except Exception:  # noqa: BLE001 — best-effort cleanup only
                pass
        return e.code, payload, resp_headers
    except Exception:  # noqa: BLE001 — transport-level (DNS, connect, TLS): caller polls
        return 0, None, {}


def _request(
    method: str,
    url: str,
    body: bytes | None = None,
    headers: dict[str, str] | None = None,
    content_type: str | None = None,
    timeout: int = 15,
) -> tuple[int, object | None, dict[str, str]]:
    """Build the request (Content-Type first so caller headers can override it) and send it."""
    req = urllib.request.Request(url, data=body, method=method)
    if content_type is not None:
        req.add_header("Content-Type", content_type)
    for k, v in (headers or {}).items():
        req.add_header(k, v)
    return _send(req, timeout)


def http_get(
    url: str, headers: dict[str, str] | None = None, timeout: int = 15
) -> tuple[int, object | None]:
    """GET a URL, return (status, parsed_json_or_None).

    status=0 on transport failure. No retry — use retry_http_get or assert_converges for that.
    """
    status, payload, _ = _request("GET", url, headers=headers, timeout=timeout)
    return status, payload


def http_post(
    url: str,
    data: dict | bytes | None = None,
    headers: dict[str, str] | None = None,
    content_type: str = "application/json",
    timeout: int = 15,
) -> tuple[int, object | None]:
    """POST to a URL, return (status, parsed_json_or_None).

    `data` is JSON-encoded if content_type='application/json', form-encoded if
    'application/x-www-form-urlencoded' (the OIDC token endpoint form), or sent as raw bytes if
    data is already bytes. The Content-Type header is sent even when there is no body.
    status=0 on transport failure.
    """
    status, payload, _ = _request(
        "POST",
        url,
        body=_encode_body(data, content_type),
        headers=headers,
        content_type=content_type,
        timeout=timeout,
    )
    return status, payload


def http_request(
    method: str,
    url: str,
    data: dict | bytes | None = None,
    headers: dict[str, str] | None = None,
    content_type: str = "application/json",
    timeout: int = 15,
) -> tuple[int, object | None]:
    """Arbitrary-method HTTP (PUT/DELETE/PATCH) for parity tests that mutate.

    Same shape as http_post (returns (status, json_or_None)); `method` is upper-cased. Unlike
    http_post, the Content-Type header is only sent when a body is actually present.
    """
    body = _encode_body(data, content_type)
    status, payload, _ = _request(
        method.upper(),
        url,
        body=body,
        headers=headers,
        content_type=content_type if body is not None else None,
        timeout=timeout,
    )
    return status, payload


def post_with_headers(
    url: str,
    data: dict | bytes | None = None,
    headers: dict[str, str] | None = None,
    content_type: str = "application/json",
    timeout: int = 15,
) -> tuple[int, object | None, dict[str, str]]:
    """Like http_post but ALSO returns the response headers as a dict.

    For APIs that hand back an auth token in a response header rather than the body (e.g.
    mattermost login → `Token` header). Returns (status, parsed_json_or_None, response_headers).
    status=0 + {} on transport failure.
    """
    return _request(
        "POST",
        url,
        body=_encode_body(data, content_type),
        headers=headers,
        content_type=content_type,
        timeout=timeout,
    )


def assert_converges(
    fn,
    description: str,
    max_wait: int = 120,
    interval: int = 10,
):
    """Retry fn() until it returns a truthy value or we time out.

    fn() should return a truthy value on success, or raise / return falsy on failure. fn() is
    always attempted at least once, even when max_wait is 0. The pause between attempts is
    `interval` seconds, shortened so it never runs past the deadline; a final attempt is made when
    the deadline is reached.

    Returns the truthy value on success (so callers can keep it).
    Raises RuntimeError on timeout, describing the outcome of the LAST attempt (its exception, or
    its falsy result when that is not None); a last-attempt exception is chained as the cause.
    """
    # Monotonic clock: the deadline must not move if the wall clock is adjusted mid-wait.
    deadline = time.monotonic() + max_wait
    attempts = 0
    last_error = None
    last_result = None
    while True:
        attempts += 1
        try:
            result = fn()
        except Exception as e:  # noqa: BLE001 — caller cares about timeout, not interim raises
            last_error, last_result = e, None
        else:
            if result:
                return result
            last_error, last_result = None, result
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        time.sleep(max(0.0, min(interval, remaining)))

    detail = ""
    if last_error is not None:
        detail = f" Last error: {last_error}"
    elif last_result is not None:
        detail = f" Last result: {last_result}"
    raise RuntimeError(
        f"Did not converge: {description} after {max_wait}s ({attempts} attempts).{detail}"
    ) from last_error


def wait_for_http(
    url: str,
    label: str = "",
    max_wait: int = 300,
    interval: int = 10,
) -> int:
    """Poll a URL until it returns a non-5xx response (any 2xx/3xx/4xx).

    Returns the status. Raises RuntimeError on timeout. Use for "wait until the app is
    answering" — for "wait until a specific resource is available", use retry_http_get with
    expect_status.
    """

    def _check() -> int | None:
        status, _ = http_get(url)
        # status 0 means transport failure; 5xx means the app is not answering properly yet.
        return status if status and status < 500 else None

    return assert_converges(_check, f"{label or url} responding", max_wait, interval)


def retry_http_get(
    url: str,
    headers: dict[str, str] | None = None,
    expect_status: int | tuple[int, ...] = 200,
    max_wait: int = 90,
    interval: int = 10,
    timeout: int = 15,
) -> tuple[int, object | None]:
    """GET with retry until status matches expect_status (int or tuple). Returns (status, json).

    Raises RuntimeError (from assert_converges) if no attempt matched within max_wait.
    """
    expect = (expect_status,) if isinstance(expect_status, int) else tuple(expect_status)
    last: tuple[int, object | None] = (0, None)

    def _check() -> bool:
        nonlocal last
        last = http_get(url, headers=headers, timeout=timeout)
        return last[0] in expect

    assert_converges(_check, f"GET {url} -> {expect}", max_wait, interval)
    return last


def retry_http_post(
    url: str,
    data: dict | bytes | None = None,
    headers: dict[str, str] | None = None,
    content_type: str = "application/json",
    expect_fn=None,
    max_wait: int = 90,
    interval: int = 10,
    timeout: int = 15,
) -> tuple[int, object | None]:
    """POST with retry until expect_fn(status, json) is truthy. Defaults to any 2xx.

    Returns the (status, json) of the accepted attempt. Raises RuntimeError (from
    assert_converges) if no attempt was accepted within max_wait.
    """
    if expect_fn is None:

        def expect_fn(s, _j):  # noqa: ARG001
            return 200 <= s < 300

    last: tuple[int, object | None] = (0, None)

    def _check():
        nonlocal last
        last = http_post(
            url, data=data, headers=headers, content_type=content_type, timeout=timeout
        )
        return expect_fn(*last)

    assert_converges(_check, f"POST {url}", max_wait, interval)
    return last