feat(2): Q0.1/Q0.2 — harness.http + discovery recurses functional/playwright (Phase 2)
- runner/harness/http.py: canonical Phase-2 recipe-test HTTP API (vendored from
recipe-maintainer/utils/tests/helpers.py): http_get/http_post, retry variants,
wait_for_http, assert_converges. JSON-parsing, header support, form/JSON POST
bodies, transport-failure -> status=0. Self-contained (cc-ci does not import
recipe-maintainer at runtime per DECISIONS Phase 2).
- harness.discovery.custom_tests now also recurses into
tests/<recipe>/{functional,playwright}/test_*.py (Phase 2 §4.1 layout) while
excluding lifecycle test_<op>.py names and honoring the HC2 repo-local gate.
- Unit tests:
tests/unit/test_http.py — in-process http.server fixture; deterministic
proofs of parsing/retry/convergence semantics, no network egress.
tests/unit/test_discovery_phase2.py — functional/+playwright/ recursion
+ HC2 gate still applies to subdirs.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@ -101,16 +101,30 @@ def resolve_op(recipe: str, op: str, repo_local_dir: str | None) -> tuple[str, s
|
|||||||
|
|
||||||
def custom_tests(recipe: str, repo_local_dir: str | None) -> list[tuple[str, str]]:
|
def custom_tests(recipe: str, repo_local_dir: str | None) -> list[tuple[str, str]]:
|
||||||
"""All non-lifecycle test_*.py from cc-ci's tests/<recipe>/ and (if approved) the recipe's
|
"""All non-lifecycle test_*.py from cc-ci's tests/<recipe>/ and (if approved) the recipe's
|
||||||
repo-local tests/. These have no generic equivalent and run only when present (opt-in), additively
|
repo-local tests/. Discovered locations (Phase 2 §4.1):
|
||||||
from both. Repo-local is consulted only for allowlist-approved recipes (HC2)."""
|
- the top-level dir tests/<recipe>/test_*.py (legacy + cross-cutting)
|
||||||
|
- functional/ tests/<recipe>/functional/test_*.py (parity ports + recipe-specific)
|
||||||
|
- playwright/ tests/<recipe>/playwright/test_*.py (UI flows P6)
|
||||||
|
Files named `test_<op>.py` (lifecycle ops) are excluded from this list — the orchestrator runs
|
||||||
|
those in their lifecycle tier, not the custom one. Repo-local is consulted only for
|
||||||
|
allowlist-approved recipes (HC2)."""
|
||||||
lifecycle_names = {f"test_{op}.py" for op in LIFECYCLE_OPS}
|
lifecycle_names = {f"test_{op}.py" for op in LIFECYCLE_OPS}
|
||||||
|
subdirs = ("functional", "playwright")
|
||||||
found: list[tuple[str, str]] = []
|
found: list[tuple[str, str]] = []
|
||||||
for source, d in (("cc-ci", cc_ci_dir(recipe)), ("repo-local", _gated(recipe, repo_local_dir))):
|
for source, d in (("cc-ci", cc_ci_dir(recipe)), ("repo-local", _gated(recipe, repo_local_dir))):
|
||||||
if not d or not os.path.isdir(d):
|
if not d or not os.path.isdir(d):
|
||||||
continue
|
continue
|
||||||
|
# top-level (legacy / cross-cutting tests not under functional/playwright)
|
||||||
for p in sorted(glob.glob(os.path.join(d, "test_*.py"))):
|
for p in sorted(glob.glob(os.path.join(d, "test_*.py"))):
|
||||||
if os.path.basename(p) not in lifecycle_names:
|
if os.path.basename(p) not in lifecycle_names:
|
||||||
found.append((source, p))
|
found.append((source, p))
|
||||||
|
# functional/ and playwright/ subdirs (Phase 2 §4.1)
|
||||||
|
for sub in subdirs:
|
||||||
|
for p in sorted(glob.glob(os.path.join(d, sub, "test_*.py"))):
|
||||||
|
# Phase-2 layout: lifecycle ops never live under functional/playwright, but be
|
||||||
|
# explicit so a misfiled file doesn't silently get double-run.
|
||||||
|
if os.path.basename(p) not in lifecycle_names:
|
||||||
|
found.append((source, p))
|
||||||
return found
|
return found
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
232
runner/harness/http.py
Normal file
232
runner/harness/http.py
Normal file
@ -0,0 +1,232 @@
|
|||||||
|
"""HTTP convergence + JSON helpers for Phase-2 recipe tests (plan §4.2).
|
||||||
|
|
||||||
|
Vendored from `references/recipe-maintainer/utils/tests/helpers.py` and adapted to the cc-ci harness
|
||||||
|
(self-contained at runtime per DECISIONS Phase-2). The lifecycle / generic modules already have
|
||||||
|
infra-level `http_get`/`http_fetch`/`http_body` (status + body) for the recipe-agnostic serving
|
||||||
|
assertion; this module is the **canonical recipe-test API** — same shape as recipe-maintainer's
|
||||||
|
helpers so parity-port tests read 1:1, but ignores TLS hostname/chain because per-run domains use
|
||||||
|
the wildcard cert served via Traefik file provider (verified-by-real-cert is `generic.served_cert`,
|
||||||
|
done once in `assert_serving`).
|
||||||
|
|
||||||
|
Functions:
|
||||||
|
http_get(url, headers=, timeout=) -> (status, json_or_None)
|
||||||
|
http_post(url, data=, headers=, content_type=, timeout=) -> (status, json_or_None)
|
||||||
|
retry_http_get(url, expect_status=200, max_wait=90, interval=10, ...) -> (status, json)
|
||||||
|
retry_http_post(url, expect_fn=..., max_wait=90, interval=10, ...) -> (status, json)
|
||||||
|
wait_for_http(url, label, max_wait=300, interval=10) -> int (polls until non-5xx)
|
||||||
|
assert_converges(fn, description, max_wait=120, interval=10) -> truthy fn() result on success
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
import ssl
|
||||||
|
import time
|
||||||
|
import urllib.error
|
||||||
|
import urllib.parse
|
||||||
|
import urllib.request
|
||||||
|
|
||||||
|
# Phase-2 tests hit per-run *.ci.commoninternet.net domains; Traefik file-provider serves the
|
||||||
|
# operator's pre-issued wildcard cert. `generic.served_cert` does the real-cert sanity check once in
|
||||||
|
# the install assertion; for content/API checks we don't want every assertion to re-verify the chain.
|
||||||
|
_CTX = ssl.create_default_context()
|
||||||
|
_CTX.check_hostname = False
|
||||||
|
_CTX.verify_mode = ssl.CERT_NONE
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_body(raw: bytes) -> object | None:
|
||||||
|
try:
|
||||||
|
return json.loads(raw)
|
||||||
|
except (json.JSONDecodeError, ValueError):
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def http_get(
|
||||||
|
url: str, headers: dict[str, str] | None = None, timeout: int = 15
|
||||||
|
) -> tuple[int, object | None]:
|
||||||
|
"""GET a URL, return (status, parsed_json_or_None). status=0 on transport failure.
|
||||||
|
No retry — use retry_http_get or assert_converges for that."""
|
||||||
|
req = urllib.request.Request(url, method="GET")
|
||||||
|
for k, v in (headers or {}).items():
|
||||||
|
req.add_header(k, v)
|
||||||
|
try:
|
||||||
|
with urllib.request.urlopen(req, timeout=timeout, context=_CTX) as resp:
|
||||||
|
return resp.getcode(), _parse_body(resp.read())
|
||||||
|
except urllib.error.HTTPError as e:
|
||||||
|
try:
|
||||||
|
return e.code, _parse_body(e.read())
|
||||||
|
except Exception: # noqa: BLE001
|
||||||
|
return e.code, None
|
||||||
|
except Exception: # noqa: BLE001 — transport-level (DNS, connect, TLS): caller polls
|
||||||
|
return 0, None
|
||||||
|
|
||||||
|
|
||||||
|
def http_post(
|
||||||
|
url: str,
|
||||||
|
data: dict | None = None,
|
||||||
|
headers: dict[str, str] | None = None,
|
||||||
|
content_type: str = "application/json",
|
||||||
|
timeout: int = 15,
|
||||||
|
) -> tuple[int, object | None]:
|
||||||
|
"""POST to a URL, return (status, parsed_json_or_None).
|
||||||
|
|
||||||
|
`data` is JSON-encoded if content_type='application/json',
|
||||||
|
form-encoded if 'application/x-www-form-urlencoded' (the OIDC token endpoint form),
|
||||||
|
or sent raw bytes if data is already bytes."""
|
||||||
|
if isinstance(data, (bytes, bytearray)):
|
||||||
|
body: bytes | None = bytes(data)
|
||||||
|
elif content_type == "application/json" and data is not None:
|
||||||
|
body = json.dumps(data).encode()
|
||||||
|
elif content_type == "application/x-www-form-urlencoded" and data is not None:
|
||||||
|
body = urllib.parse.urlencode(data).encode()
|
||||||
|
else:
|
||||||
|
body = None
|
||||||
|
req = urllib.request.Request(url, data=body, method="POST")
|
||||||
|
req.add_header("Content-Type", content_type)
|
||||||
|
for k, v in (headers or {}).items():
|
||||||
|
req.add_header(k, v)
|
||||||
|
try:
|
||||||
|
with urllib.request.urlopen(req, timeout=timeout, context=_CTX) as resp:
|
||||||
|
return resp.getcode(), _parse_body(resp.read())
|
||||||
|
except urllib.error.HTTPError as e:
|
||||||
|
try:
|
||||||
|
return e.code, _parse_body(e.read())
|
||||||
|
except Exception: # noqa: BLE001
|
||||||
|
return e.code, None
|
||||||
|
except Exception: # noqa: BLE001
|
||||||
|
return 0, None
|
||||||
|
|
||||||
|
|
||||||
|
def http_request(
|
||||||
|
method: str,
|
||||||
|
url: str,
|
||||||
|
data: dict | bytes | None = None,
|
||||||
|
headers: dict[str, str] | None = None,
|
||||||
|
content_type: str = "application/json",
|
||||||
|
timeout: int = 15,
|
||||||
|
) -> tuple[int, object | None]:
|
||||||
|
"""Arbitrary-method HTTP (PUT/DELETE/PATCH) for parity tests that mutate. Same shape as
|
||||||
|
http_post (returns (status, json_or_None))."""
|
||||||
|
if isinstance(data, (bytes, bytearray)):
|
||||||
|
body: bytes | None = bytes(data)
|
||||||
|
elif content_type == "application/json" and data is not None:
|
||||||
|
body = json.dumps(data).encode()
|
||||||
|
elif content_type == "application/x-www-form-urlencoded" and data is not None:
|
||||||
|
body = urllib.parse.urlencode(data).encode()
|
||||||
|
else:
|
||||||
|
body = None
|
||||||
|
req = urllib.request.Request(url, data=body, method=method.upper())
|
||||||
|
if body is not None:
|
||||||
|
req.add_header("Content-Type", content_type)
|
||||||
|
for k, v in (headers or {}).items():
|
||||||
|
req.add_header(k, v)
|
||||||
|
try:
|
||||||
|
with urllib.request.urlopen(req, timeout=timeout, context=_CTX) as resp:
|
||||||
|
return resp.getcode(), _parse_body(resp.read())
|
||||||
|
except urllib.error.HTTPError as e:
|
||||||
|
try:
|
||||||
|
return e.code, _parse_body(e.read())
|
||||||
|
except Exception: # noqa: BLE001
|
||||||
|
return e.code, None
|
||||||
|
except Exception: # noqa: BLE001
|
||||||
|
return 0, None
|
||||||
|
|
||||||
|
|
||||||
|
def assert_converges(
|
||||||
|
fn,
|
||||||
|
description: str,
|
||||||
|
max_wait: int = 120,
|
||||||
|
interval: int = 10,
|
||||||
|
):
|
||||||
|
"""Retry fn() until it returns a truthy value or we time out. Returns the truthy value on success
|
||||||
|
(so callers can keep it). Raises RuntimeError with last-seen error/result on timeout.
|
||||||
|
|
||||||
|
fn() should return a truthy value on success, or raise / return falsy on failure."""
|
||||||
|
deadline = time.time() + max_wait
|
||||||
|
last_error = None
|
||||||
|
last_result = None
|
||||||
|
attempts = 0
|
||||||
|
while time.time() < deadline:
|
||||||
|
attempts += 1
|
||||||
|
try:
|
||||||
|
result = fn()
|
||||||
|
if result:
|
||||||
|
return result
|
||||||
|
last_result = result
|
||||||
|
except Exception as e: # noqa: BLE001 — caller cares about timeout, not interim raises
|
||||||
|
last_error = e
|
||||||
|
time.sleep(interval)
|
||||||
|
detail = ""
|
||||||
|
if last_error:
|
||||||
|
detail = f" Last error: {last_error}"
|
||||||
|
elif last_result is not None:
|
||||||
|
detail = f" Last result: {last_result}"
|
||||||
|
raise RuntimeError(
|
||||||
|
f"Did not converge: {description} after {max_wait}s ({attempts} attempts).{detail}"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def wait_for_http(
|
||||||
|
url: str,
|
||||||
|
label: str = "",
|
||||||
|
max_wait: int = 300,
|
||||||
|
interval: int = 10,
|
||||||
|
) -> int:
|
||||||
|
"""Poll a URL until it returns a non-5xx response (any 2xx/3xx/4xx). Returns the status.
|
||||||
|
Raises RuntimeError on timeout. Use for "wait until the app is answering" — for "wait until a
|
||||||
|
specific resource is available", use retry_http_get with expect_status."""
|
||||||
|
|
||||||
|
def _check():
|
||||||
|
status, _ = http_get(url)
|
||||||
|
if status and status < 500:
|
||||||
|
return status
|
||||||
|
return None
|
||||||
|
|
||||||
|
return assert_converges(_check, f"{label or url} responding", max_wait, interval)
|
||||||
|
|
||||||
|
|
||||||
|
def retry_http_get(
|
||||||
|
url: str,
|
||||||
|
headers: dict[str, str] | None = None,
|
||||||
|
expect_status: int | tuple[int, ...] = 200,
|
||||||
|
max_wait: int = 90,
|
||||||
|
interval: int = 10,
|
||||||
|
timeout: int = 15,
|
||||||
|
) -> tuple[int, object | None]:
|
||||||
|
"""GET with retry until status matches expect_status (int or tuple). Returns (status, json)."""
|
||||||
|
expect = (expect_status,) if isinstance(expect_status, int) else tuple(expect_status)
|
||||||
|
result: list[tuple[int, object | None]] = [(0, None)]
|
||||||
|
|
||||||
|
def _check():
|
||||||
|
s, j = http_get(url, headers=headers, timeout=timeout)
|
||||||
|
result[0] = (s, j)
|
||||||
|
return s in expect
|
||||||
|
|
||||||
|
assert_converges(_check, f"GET {url} -> {expect}", max_wait, interval)
|
||||||
|
return result[0]
|
||||||
|
|
||||||
|
|
||||||
|
def retry_http_post(
|
||||||
|
url: str,
|
||||||
|
data: dict | None = None,
|
||||||
|
headers: dict[str, str] | None = None,
|
||||||
|
content_type: str = "application/json",
|
||||||
|
expect_fn=None,
|
||||||
|
max_wait: int = 90,
|
||||||
|
interval: int = 10,
|
||||||
|
timeout: int = 15,
|
||||||
|
) -> tuple[int, object | None]:
|
||||||
|
"""POST with retry until expect_fn(status, json) is truthy. Defaults to any 2xx."""
|
||||||
|
if expect_fn is None:
|
||||||
|
def expect_fn(s, _j): # noqa: ARG001
|
||||||
|
return 200 <= s < 300
|
||||||
|
|
||||||
|
result: list[tuple[int, object | None]] = [(0, None)]
|
||||||
|
|
||||||
|
def _check():
|
||||||
|
s, j = http_post(url, data=data, headers=headers, content_type=content_type, timeout=timeout)
|
||||||
|
result[0] = (s, j)
|
||||||
|
return expect_fn(s, j)
|
||||||
|
|
||||||
|
assert_converges(_check, f"POST {url}", max_wait, interval)
|
||||||
|
return result[0]
|
||||||
77
tests/unit/test_discovery_phase2.py
Normal file
77
tests/unit/test_discovery_phase2.py
Normal file
@ -0,0 +1,77 @@
|
|||||||
|
"""Unit tests for Phase-2 discovery additions (plan §4.1).
|
||||||
|
|
||||||
|
Proves the `custom_tests` discovery recurses into the per-recipe `functional/` + `playwright/`
|
||||||
|
subdirs as well as the top-level dir, while still excluding lifecycle `test_<op>.py` names and
|
||||||
|
honouring the HC2 repo-local approval gate.
|
||||||
|
|
||||||
|
Run with: `cc-ci-run -m pytest tests/unit`. Located under tests/unit/ so the orchestrator never
|
||||||
|
picks these up as overlays/custom tests.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
|
||||||
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "runner"))
|
||||||
|
from harness import discovery # noqa: E402
|
||||||
|
|
||||||
|
|
||||||
|
def _approve(tmp_path, *recipes):
|
||||||
|
f = tmp_path / "approved.txt"
|
||||||
|
f.write_text("".join(f"{r}\n" for r in recipes))
|
||||||
|
os.environ["CCCI_REPO_LOCAL_APPROVED_FILE"] = str(f)
|
||||||
|
|
||||||
|
|
||||||
|
def teardown_function():
|
||||||
|
os.environ.pop("CCCI_REPO_LOCAL_APPROVED_FILE", None)
|
||||||
|
|
||||||
|
|
||||||
|
def test_custom_tests_recurses_functional_and_playwright(tmp_path, monkeypatch):
|
||||||
|
"""A Phase-2 cc-ci recipe layout: functional/test_*.py + playwright/test_*.py + top-level
|
||||||
|
test_*.py — all are discovered as custom tests; the lifecycle names are excluded."""
|
||||||
|
# Point cc-ci's per-recipe dir at a fake recipe in tmp_path
|
||||||
|
fake_recipe = "ccci-phase2-fixture"
|
||||||
|
fake_dir = tmp_path / "tests" / fake_recipe
|
||||||
|
(fake_dir / "functional").mkdir(parents=True)
|
||||||
|
(fake_dir / "playwright").mkdir()
|
||||||
|
# legitimate custom tests at multiple levels
|
||||||
|
(fake_dir / "test_sso_smoke.py").write_text("# top-level cross-cutting\n")
|
||||||
|
(fake_dir / "functional" / "test_health_check.py").write_text("# parity port\n")
|
||||||
|
(fake_dir / "functional" / "test_content_roundtrip.py").write_text("# recipe-specific\n")
|
||||||
|
(fake_dir / "playwright" / "test_login_flow.py").write_text("# UI flow\n")
|
||||||
|
# lifecycle name in functional/ should be ignored (defensive)
|
||||||
|
(fake_dir / "functional" / "test_install.py").write_text("# misfiled lifecycle name\n")
|
||||||
|
|
||||||
|
# Patch the cc-ci dir resolver to point at our fixture
|
||||||
|
monkeypatch.setattr(discovery, "cc_ci_dir", lambda r: str(tmp_path / "tests" / r))
|
||||||
|
|
||||||
|
customs = discovery.custom_tests(fake_recipe, None)
|
||||||
|
names = sorted((src, os.path.basename(p)) for src, p in customs)
|
||||||
|
|
||||||
|
# Top-level + functional/ + playwright/ all discovered; lifecycle name excluded
|
||||||
|
assert ("cc-ci", "test_sso_smoke.py") in names
|
||||||
|
assert ("cc-ci", "test_health_check.py") in names
|
||||||
|
assert ("cc-ci", "test_content_roundtrip.py") in names
|
||||||
|
assert ("cc-ci", "test_login_flow.py") in names
|
||||||
|
assert ("cc-ci", "test_install.py") not in names
|
||||||
|
|
||||||
|
|
||||||
|
def test_custom_tests_repo_local_subdirs_gated(tmp_path, monkeypatch):
|
||||||
|
"""HC2 gate still applies to functional/playwright subdirs under repo-local: not approved -> the
|
||||||
|
repo-local subdir contents are ignored even if they exist."""
|
||||||
|
fake_recipe = "ccci-phase2-fixture"
|
||||||
|
monkeypatch.setattr(discovery, "cc_ci_dir", lambda r: str(tmp_path / "cc-ci" / r))
|
||||||
|
(tmp_path / "cc-ci" / fake_recipe).mkdir(parents=True)
|
||||||
|
|
||||||
|
rl = tmp_path / "repo"
|
||||||
|
(rl / "functional").mkdir(parents=True)
|
||||||
|
(rl / "functional" / "test_repo_local_specific.py").write_text("# repo-local custom\n")
|
||||||
|
|
||||||
|
_approve(tmp_path) # empty allowlist → default-deny
|
||||||
|
assert discovery.custom_tests(fake_recipe, str(rl)) == []
|
||||||
|
|
||||||
|
_approve(tmp_path, fake_recipe) # approved → repo-local subdir honored
|
||||||
|
customs = discovery.custom_tests(fake_recipe, str(rl))
|
||||||
|
names = {(src, os.path.basename(p)) for src, p in customs}
|
||||||
|
assert ("repo-local", "test_repo_local_specific.py") in names
|
||||||
189
tests/unit/test_http.py
Normal file
189
tests/unit/test_http.py
Normal file
@ -0,0 +1,189 @@
|
|||||||
|
"""Unit tests for runner/harness/http.py (Phase 2 §4.2 vendored helpers).
|
||||||
|
|
||||||
|
Pure-Python: a tiny in-process http.server on 127.0.0.1 serves rigged responses so we can prove the
|
||||||
|
parsing + retry semantics deterministically. No network egress.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import http.server
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import socket
|
||||||
|
import socketserver
|
||||||
|
import sys
|
||||||
|
import threading
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "runner"))
|
||||||
|
from harness import http as harness_http # noqa: E402
|
||||||
|
|
||||||
|
|
||||||
|
class _State:
|
||||||
|
counter = 0
|
||||||
|
healthy_after = 0 # set by tests
|
||||||
|
|
||||||
|
|
||||||
|
class _Handler(http.server.BaseHTTPRequestHandler):
|
||||||
|
def log_message(self, *_a, **_k): # silence the test log
|
||||||
|
pass
|
||||||
|
|
||||||
|
def _respond(self, code: int, body: bytes, content_type: str = "application/json"):
|
||||||
|
self.send_response(code)
|
||||||
|
self.send_header("Content-Type", content_type)
|
||||||
|
self.send_header("Content-Length", str(len(body)))
|
||||||
|
self.end_headers()
|
||||||
|
self.wfile.write(body)
|
||||||
|
|
||||||
|
def do_GET(self):
|
||||||
|
if self.path == "/ok-json":
|
||||||
|
self._respond(200, b'{"hello": "world"}')
|
||||||
|
return
|
||||||
|
if self.path == "/notfound":
|
||||||
|
self._respond(404, b'{"error": "nope"}')
|
||||||
|
return
|
||||||
|
if self.path == "/text":
|
||||||
|
self._respond(200, b"plain", content_type="text/plain")
|
||||||
|
return
|
||||||
|
if self.path == "/flaky":
|
||||||
|
_State.counter += 1
|
||||||
|
if _State.counter <= _State.healthy_after:
|
||||||
|
self._respond(503, b'{"err": "unhealthy"}')
|
||||||
|
else:
|
||||||
|
self._respond(200, b'{"ok": true}')
|
||||||
|
return
|
||||||
|
if self.path.startswith("/echo-headers"):
|
||||||
|
self._respond(200, json.dumps(dict(self.headers)).encode())
|
||||||
|
return
|
||||||
|
self._respond(500, b'{"err": "unknown"}')
|
||||||
|
|
||||||
|
def do_POST(self):
|
||||||
|
length = int(self.headers.get("Content-Length", "0"))
|
||||||
|
body = self.rfile.read(length) if length else b""
|
||||||
|
ctype = self.headers.get("Content-Type", "")
|
||||||
|
if self.path == "/echo-json":
|
||||||
|
try:
|
||||||
|
parsed = json.loads(body) if body else None
|
||||||
|
except Exception: # noqa: BLE001
|
||||||
|
parsed = None
|
||||||
|
self._respond(200, json.dumps({"content_type": ctype, "body": parsed}).encode())
|
||||||
|
return
|
||||||
|
if self.path == "/echo-form":
|
||||||
|
self._respond(
|
||||||
|
200,
|
||||||
|
json.dumps({"content_type": ctype, "raw": body.decode()}).encode(),
|
||||||
|
)
|
||||||
|
return
|
||||||
|
self._respond(500, b'{"err": "unknown"}')
|
||||||
|
|
||||||
|
|
||||||
|
def _free_port() -> int:
|
||||||
|
with socket.socket() as s:
|
||||||
|
s.bind(("127.0.0.1", 0))
|
||||||
|
return s.getsockname()[1]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope="module")
|
||||||
|
def server():
|
||||||
|
port = _free_port()
|
||||||
|
srv = socketserver.TCPServer(("127.0.0.1", port), _Handler)
|
||||||
|
thread = threading.Thread(target=srv.serve_forever, daemon=True)
|
||||||
|
thread.start()
|
||||||
|
yield f"http://127.0.0.1:{port}"
|
||||||
|
srv.shutdown()
|
||||||
|
srv.server_close()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def _reset_state():
|
||||||
|
_State.counter = 0
|
||||||
|
_State.healthy_after = 0
|
||||||
|
|
||||||
|
|
||||||
|
def test_http_get_parses_json(server):
|
||||||
|
status, body = harness_http.http_get(f"{server}/ok-json")
|
||||||
|
assert status == 200
|
||||||
|
assert body == {"hello": "world"}
|
||||||
|
|
||||||
|
|
||||||
|
def test_http_get_returns_status_on_4xx_with_body(server):
|
||||||
|
status, body = harness_http.http_get(f"{server}/notfound")
|
||||||
|
assert status == 404
|
||||||
|
assert body == {"error": "nope"}
|
||||||
|
|
||||||
|
|
||||||
|
def test_http_get_handles_non_json_body(server):
|
||||||
|
status, body = harness_http.http_get(f"{server}/text")
|
||||||
|
assert status == 200
|
||||||
|
assert body is None # not JSON => None
|
||||||
|
|
||||||
|
|
||||||
|
def test_http_get_transport_failure_returns_status_zero():
|
||||||
|
status, body = harness_http.http_get("http://127.0.0.1:1/nope", timeout=1)
|
||||||
|
assert status == 0
|
||||||
|
assert body is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_http_get_sends_headers(server):
|
||||||
|
status, body = harness_http.http_get(
|
||||||
|
f"{server}/echo-headers", headers={"X-Auth": "token-123"}
|
||||||
|
)
|
||||||
|
assert status == 200
|
||||||
|
assert body["X-Auth"] == "token-123"
|
||||||
|
|
||||||
|
|
||||||
|
def test_http_post_json(server):
|
||||||
|
status, body = harness_http.http_post(f"{server}/echo-json", data={"k": "v"})
|
||||||
|
assert status == 200
|
||||||
|
assert body["content_type"] == "application/json"
|
||||||
|
assert body["body"] == {"k": "v"}
|
||||||
|
|
||||||
|
|
||||||
|
def test_http_post_form(server):
|
||||||
|
status, body = harness_http.http_post(
|
||||||
|
f"{server}/echo-form",
|
||||||
|
data={"grant_type": "password", "username": "alice"},
|
||||||
|
content_type="application/x-www-form-urlencoded",
|
||||||
|
)
|
||||||
|
assert status == 200
|
||||||
|
assert body["content_type"] == "application/x-www-form-urlencoded"
|
||||||
|
assert "grant_type=password" in body["raw"]
|
||||||
|
assert "username=alice" in body["raw"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_retry_http_get_converges(server):
|
||||||
|
_State.healthy_after = 2 # fail twice, then succeed
|
||||||
|
status, body = harness_http.retry_http_get(
|
||||||
|
f"{server}/flaky", expect_status=200, max_wait=5, interval=1
|
||||||
|
)
|
||||||
|
assert status == 200
|
||||||
|
assert body == {"ok": True}
|
||||||
|
assert _State.counter == 3
|
||||||
|
|
||||||
|
|
||||||
|
def test_retry_http_get_times_out():
|
||||||
|
# nonexistent host; expect a RuntimeError after the short max_wait
|
||||||
|
with pytest.raises(RuntimeError, match="Did not converge"):
|
||||||
|
harness_http.retry_http_get(
|
||||||
|
"http://127.0.0.1:1/never",
|
||||||
|
expect_status=200,
|
||||||
|
max_wait=2,
|
||||||
|
interval=1,
|
||||||
|
timeout=1,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_wait_for_http_returns_status(server):
|
||||||
|
status = harness_http.wait_for_http(f"{server}/ok-json", "ok", max_wait=2, interval=1)
|
||||||
|
assert status == 200
|
||||||
|
|
||||||
|
|
||||||
|
def test_assert_converges_returns_value():
|
||||||
|
n = [0]
|
||||||
|
|
||||||
|
def fn():
|
||||||
|
n[0] += 1
|
||||||
|
return "done" if n[0] >= 3 else None
|
||||||
|
|
||||||
|
assert harness_http.assert_converges(fn, "fn", max_wait=2, interval=0) == "done"
|
||||||
Reference in New Issue
Block a user