Files
cc-ci/tests/lasuite-docs/functional/test_oidc_login.py
autonomic-bot 9a7772563a style: repo-wide lint pass — make the lint gate green again
Push builds have been RED on the lint step since ~build 209 from accumulated
formatting drift. This is the mechanical cleanup: ruff format + ruff --fix
(UP038 isinstance unions, SIM105 contextlib.suppress, UP031 f-strings, SIM115
tempfile context manager), shfmt -i 2 -ci, nixpkgs-fmt/statix/deadnix (merged
attrsets, dropped unused lib args), yamllint, and shell quoting fixes in
tests/lasuite-docs/setup_custom_tests.sh. No behaviour changes intended;
lint: PASS, unit tests: 138 passed.
2026-06-09 21:56:15 +00:00

95 lines
3.9 KiB
Python

"""lasuite-docs — parity port of recipe-maintainer's oidc_login.py (Phase 2 P2).
SOURCE: references/recipe-maintainer/recipe-info/lasuite-docs/tests/oidc_login.py
End-to-end flow:
1. GET `/api/v1.0/users/me/` without auth → asserts the response REDIRECTS to the dep
keycloak's realm auth endpoint (the recipe is correctly configured to challenge
unauthenticated callers — wired via setup_custom_tests.sh).
2. Obtain an OIDC token from the dep keycloak via password grant
(the test user provisioned by the orchestrator's realm setup).
3. Call `/api/v1.0/users/me/` with `Authorization: Bearer <jwt>` → asserts 200 and the
returned user's email matches the provisioned test user.
Marked @pytest.mark.requires_deps — skips with `deps-not-ready` if setup_custom_tests failed.
"""
from __future__ import annotations
import os
import ssl
import sys
import urllib.error
import urllib.request
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "..", "runner"))
from harness import http as harness_http # noqa: E402
from harness import sso
_CTX = ssl.create_default_context()
_CTX.check_hostname = False
_CTX.verify_mode = ssl.CERT_NONE
class _NoFollow(urllib.request.HTTPRedirectHandler):
def redirect_request(self, req, fp, code, msg, headers, newurl):
raise urllib.error.HTTPError(newurl, code, msg, headers, fp)
def _get_no_redirect(url: str) -> tuple[int, str]:
"""GET without auto-following redirects. Returns (status, redirect_url-or-body)."""
opener = urllib.request.build_opener(_NoFollow, urllib.request.HTTPSHandler(context=_CTX))
try:
with opener.open(url, timeout=15) as resp:
return resp.status, resp.read().decode(errors="replace")
except urllib.error.HTTPError as e:
if e.code in (301, 302, 303, 307, 308):
return e.code, e.headers.get("Location", "")
return e.code, ""
@pytest.mark.requires_deps
def test_oidc_login_via_keycloak(live_app, deps_creds):
"""Anonymous → redirect to keycloak; password-grant token → 200 from /api/v1.0/users/me/."""
kc = deps_creds["keycloak"]
# Step 1: unauthenticated GET → 302 to keycloak realm's auth endpoint
status, redirect = _get_no_redirect(f"https://{live_app}/api/v1.0/users/me/")
expected_prefix = f"https://{kc['domain']}/realms/{kc['realm']}/protocol/openid-connect/auth"
# Some configurations return 401 with WWW-Authenticate (an OIDC challenge) rather than a
# 302 redirect. Both are valid "auth-required" indicators — accept either, but if a
# redirect is returned it must point at the dep keycloak realm.
if status in (301, 302, 303, 307, 308):
assert expected_prefix in (
redirect or ""
), f"Docs redirected to {redirect!r}, expected to start with {expected_prefix!r}"
else:
assert status in (401, 403), (
f"GET /api/v1.0/users/me/ unauth: HTTP {status}; expected redirect to keycloak "
f"OR 401/403. (200 would be an auth leak.)"
)
# Step 2: obtain an OIDC token via password grant against the dep keycloak
creds = {
"client_id": kc["client_id"],
"client_secret": kc["client_secret"],
"user": kc["user"],
"password": kc["password"],
"token_url": kc["token_url"],
}
access_token = sso.oidc_password_grant(creds)
assert isinstance(access_token, str) and access_token.count(".") == 2, "expected JWT"
# Step 3: call the protected API with the Bearer token; assert 200 + user email
status, body = harness_http.http_get(
f"https://{live_app}/api/v1.0/users/me/",
headers={"Authorization": f"Bearer {access_token}"},
)
assert status == 200, f"GET /api/v1.0/users/me/ with token HTTP {status}: {body!r}"
assert isinstance(body, dict), f"unexpected response: {body!r}"
assert (
body.get("email") == kc["email"]
), f"unexpected user email: got {body.get('email')!r}, expected {kc['email']!r}"