Clears cc-ci self-test lint failures: - ruff format: 9 files reformatted (all gtea test files + test_discovery.py) - ruff check --fix: bridge.py UP017 (datetime.UTC alias) + 6 gtea check errors - manifest.py B007: rename unused loop variable path → _path (no auto-fix available) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
145 lines
5.1 KiB
Python
145 lines
5.1 KiB
Python
"""gitea — beyond-parity: admin API CRUD lifecycle (phase gtea).
|
|
|
|
Proves the gitea admin REST API: user + org + token creation, read-back, and deletion.
|
|
Non-vacuous: a misconfigured gitea (broken DB, broken API routing) fails any of these.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import json
|
|
import os
|
|
import secrets
|
|
import sys
|
|
import urllib.error
|
|
import urllib.request
|
|
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "..", "runner"))
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
|
import ssl
|
|
|
|
from ops import admin_creds # noqa: E402
|
|
|
|
_CTX = ssl.create_default_context()
|
|
_CTX.check_hostname = False
|
|
_CTX.verify_mode = ssl.CERT_NONE
|
|
|
|
|
|
def _api(domain, path, method="GET", body=None, user="", password="", token=""):
|
|
data = json.dumps(body).encode() if body is not None else None
|
|
if token:
|
|
headers = {"Authorization": f"token {token}"}
|
|
else:
|
|
auth = base64.b64encode(f"{user}:{password}".encode()).decode()
|
|
headers = {"Authorization": f"Basic {auth}"}
|
|
if data:
|
|
headers["Content-Type"] = "application/json"
|
|
req = urllib.request.Request(
|
|
f"https://{domain}/api/v1{path}", data=data, headers=headers, method=method
|
|
)
|
|
try:
|
|
with urllib.request.urlopen(req, timeout=20, context=_CTX) as r:
|
|
raw = r.read()
|
|
return r.status, (json.loads(raw) if raw else {})
|
|
except urllib.error.HTTPError as e:
|
|
raw = e.read()
|
|
try:
|
|
return e.code, json.loads(raw)
|
|
except (ValueError, json.JSONDecodeError):
|
|
return e.code, {}
|
|
|
|
|
|
def test_admin_api_user_org_token_lifecycle(live_app):
|
|
"""Admin API CRUD: create user → create org → create API token → read-back → delete.
|
|
|
|
Proves the full admin-API lifecycle:
|
|
1. Create a test user (admin-creates-user API)
|
|
2. Create an org (admin-creates-org API)
|
|
3. Create an API token for the test user (user-generates-token API)
|
|
4. Read back the user + org via the token
|
|
5. Delete the token, org, and user (cleanup)
|
|
"""
|
|
adm_user, adm_pass = admin_creds(live_app)
|
|
suffix = secrets.token_hex(4)
|
|
test_user = f"testuser-{suffix}"
|
|
test_org = f"testorg-{suffix}"
|
|
token_name = f"ci-test-token-{suffix}"
|
|
test_pass = secrets.token_hex(12) + "A1" # at least one uppercase + digit for any policy
|
|
|
|
# 1. Create test user
|
|
status, body = _api(
|
|
live_app,
|
|
"/admin/users",
|
|
method="POST",
|
|
body={
|
|
"username": test_user,
|
|
"email": f"{test_user}@ci.local",
|
|
"password": test_pass,
|
|
"must_change_password": False,
|
|
"login_name": test_user,
|
|
"source_id": 0,
|
|
},
|
|
user=adm_user,
|
|
password=adm_pass,
|
|
)
|
|
assert status == 201, f"user create HTTP {status}: {body}"
|
|
assert body.get("login") == test_user, f"unexpected login: {body}"
|
|
|
|
try:
|
|
# 2. Create org (as admin, add test user as member)
|
|
status, body = _api(
|
|
live_app,
|
|
"/orgs",
|
|
method="POST",
|
|
body={"username": test_org, "visibility": "public"},
|
|
user=adm_user,
|
|
password=adm_pass,
|
|
)
|
|
assert status == 201, f"org create HTTP {status}: {body}"
|
|
assert body.get("username") == test_org, f"unexpected org: {body}"
|
|
|
|
try:
|
|
# 3. Create API token for test user (admin creates token on behalf of user).
|
|
# Gitea 1.22+ requires explicit scopes; supply the minimum needed for steps 4-5.
|
|
status, tok_body = _api(
|
|
live_app,
|
|
f"/users/{test_user}/tokens",
|
|
method="POST",
|
|
body={"name": token_name, "scopes": ["read:user", "read:organization"]},
|
|
user=adm_user,
|
|
password=adm_pass,
|
|
)
|
|
assert status == 201, f"token create HTTP {status}: {tok_body}"
|
|
token = tok_body.get("sha1")
|
|
assert token, f"token sha1 missing from response: {tok_body}"
|
|
|
|
# 4. Read back via token: authenticated call as test_user
|
|
status, me = _api(live_app, "/user", token=token)
|
|
assert status == 200, f"GET /user with token HTTP {status}"
|
|
assert me.get("login") == test_user, f"token authenticated as wrong user: {me}"
|
|
|
|
# 5. Read org via token
|
|
status, org = _api(live_app, f"/orgs/{test_org}", token=token)
|
|
assert status == 200, f"GET /orgs/{test_org} HTTP {status}: {org}"
|
|
assert org.get("username") == test_org
|
|
|
|
# 6. Delete the token
|
|
status, _ = _api(
|
|
live_app,
|
|
f"/users/{test_user}/tokens/{token_name}",
|
|
method="DELETE",
|
|
user=adm_user,
|
|
password=adm_pass,
|
|
)
|
|
assert status in (204, 404), f"token delete HTTP {status}"
|
|
|
|
finally:
|
|
# Delete org
|
|
_api(live_app, f"/orgs/{test_org}", method="DELETE", user=adm_user, password=adm_pass)
|
|
|
|
finally:
|
|
# Delete test user (admin only)
|
|
_api(
|
|
live_app, f"/admin/users/{test_user}", method="DELETE", user=adm_user, password=adm_pass
|
|
)
|