feat(gtea): build full gitea test suite (M1 build — all files)
Some checks failed
continuous-integration/drone/push Build is failing
Some checks failed
continuous-integration/drone/push Build is failing
- tests/gitea/recipe_meta.py: updated from dep-provider stub to dual-role (dep + recipe-under-test).
Adds BACKUP_CAPABLE=True, READY_PROBE (/api/v1/version), SCREENSHOT (sign-in page), LFS-
conditional EXTRA_ENV (compose.lfs.yml + GITEA_LFS_START_SERVER only when RECIPE=gitea AND
overlay present — dep path unchanged). All existing dep keys preserved; 10/10 dep unit tests pass.
- tests/gitea/ops.py: NEW — admin user creation via gitea CLI (ci_admin, creds in /tmp per-domain
file), marker repo lifecycle (pre_install/pre_upgrade/pre_backup create; pre_restore deletes to
diverge from backup state).
- tests/gitea/test_{install,upgrade,backup,restore}.py: NEW — lifecycle overlays. Install checks
API + admin auth + Playwright sign-in. Upgrade/backup/restore assert marker repo continuity.
- tests/gitea/custom/: NEW — test_health.py (parity: HTTP 200 root), test_git_push.py (parity:
create→clone→push→verify→delete), test_admin_api.py (beyond-parity: user+org+token CRUD),
test_lfs_roundtrip.py (LFS OID round-trip + JWT stability; skips on main, runs on PR #1 head).
- tests/gitea/PARITY.md: NEW — mapping table, source note (recipe-info corpus not upstream repo),
beyond-parity rationale, backup/restore real-tier note, DB choice, dep-split mechanism, LFS skip.
- machine-docs/STATUS-gtea.md: NEW — phase status (building M1).
- machine-docs/BACKLOG-gtea.md: merged with Adversary init.
- machine-docs/JOURNAL-gtea.md: Builder log with design decisions + unit test results.
- machine-docs/REVIEW-gtea.md: kept Adversary init content.
- machine-docs/DECISIONS.md: appended gtea section (LFS split, admin mgmt, marker design).
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
130
tests/gitea/custom/test_admin_api.py
Normal file
130
tests/gitea/custom/test_admin_api.py
Normal file
@ -0,0 +1,130 @@
|
||||
"""gitea — beyond-parity: admin API CRUD lifecycle (phase gtea).
|
||||
|
||||
Proves the gitea admin REST API: user + org + token creation, read-back, and deletion.
|
||||
Non-vacuous: a misconfigured gitea (broken DB, broken API routing) fails any of these.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import json
|
||||
import os
|
||||
import secrets
|
||||
import sys
|
||||
import urllib.error
|
||||
import urllib.request
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "..", "runner"))
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
||||
from ops import admin_creds # noqa: E402
|
||||
|
||||
import ssl
|
||||
|
||||
_CTX = ssl.create_default_context()
|
||||
_CTX.check_hostname = False
|
||||
_CTX.verify_mode = ssl.CERT_NONE
|
||||
|
||||
|
||||
def _api(domain, path, method="GET", body=None, user="", password="", token=""):
|
||||
data = json.dumps(body).encode() if body is not None else None
|
||||
if token:
|
||||
headers = {"Authorization": f"token {token}"}
|
||||
else:
|
||||
auth = base64.b64encode(f"{user}:{password}".encode()).decode()
|
||||
headers = {"Authorization": f"Basic {auth}"}
|
||||
if data:
|
||||
headers["Content-Type"] = "application/json"
|
||||
req = urllib.request.Request(
|
||||
f"https://{domain}/api/v1{path}", data=data, headers=headers, method=method
|
||||
)
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=20, context=_CTX) as r:
|
||||
raw = r.read()
|
||||
return r.status, (json.loads(raw) if raw else {})
|
||||
except urllib.error.HTTPError as e:
|
||||
raw = e.read()
|
||||
try:
|
||||
return e.code, json.loads(raw)
|
||||
except (ValueError, json.JSONDecodeError):
|
||||
return e.code, {}
|
||||
|
||||
|
||||
def test_admin_api_user_org_token_lifecycle(live_app):
|
||||
"""Admin API CRUD: create user → create org → create API token → read-back → delete.
|
||||
|
||||
Proves the full admin-API lifecycle:
|
||||
1. Create a test user (admin-creates-user API)
|
||||
2. Create an org (admin-creates-org API)
|
||||
3. Create an API token for the test user (user-generates-token API)
|
||||
4. Read back the user + org via the token
|
||||
5. Delete the token, org, and user (cleanup)
|
||||
"""
|
||||
adm_user, adm_pass = admin_creds(live_app)
|
||||
suffix = secrets.token_hex(4)
|
||||
test_user = f"testuser-{suffix}"
|
||||
test_org = f"testorg-{suffix}"
|
||||
token_name = f"ci-test-token-{suffix}"
|
||||
test_pass = secrets.token_hex(12) + "A1" # at least one uppercase + digit for any policy
|
||||
|
||||
# 1. Create test user
|
||||
status, body = _api(
|
||||
live_app, "/admin/users", method="POST",
|
||||
body={
|
||||
"username": test_user,
|
||||
"email": f"{test_user}@ci.local",
|
||||
"password": test_pass,
|
||||
"must_change_password": False,
|
||||
"login_name": test_user,
|
||||
"source_id": 0,
|
||||
},
|
||||
user=adm_user, password=adm_pass,
|
||||
)
|
||||
assert status == 201, f"user create HTTP {status}: {body}"
|
||||
assert body.get("login") == test_user, f"unexpected login: {body}"
|
||||
|
||||
try:
|
||||
# 2. Create org (as admin, add test user as member)
|
||||
status, body = _api(
|
||||
live_app, "/orgs", method="POST",
|
||||
body={"username": test_org, "visibility": "public"},
|
||||
user=adm_user, password=adm_pass,
|
||||
)
|
||||
assert status == 201, f"org create HTTP {status}: {body}"
|
||||
assert body.get("username") == test_org, f"unexpected org: {body}"
|
||||
|
||||
try:
|
||||
# 3. Create API token for test user (admin creates token on behalf of user)
|
||||
status, tok_body = _api(
|
||||
live_app, f"/users/{test_user}/tokens", method="POST",
|
||||
body={"name": token_name},
|
||||
user=adm_user, password=adm_pass,
|
||||
)
|
||||
assert status == 201, f"token create HTTP {status}: {tok_body}"
|
||||
token = tok_body.get("sha1")
|
||||
assert token, f"token sha1 missing from response: {tok_body}"
|
||||
|
||||
# 4. Read back via token: authenticated call as test_user
|
||||
status, me = _api(live_app, "/user", token=token)
|
||||
assert status == 200, f"GET /user with token HTTP {status}"
|
||||
assert me.get("login") == test_user, f"token authenticated as wrong user: {me}"
|
||||
|
||||
# 5. Read org via token
|
||||
status, org = _api(live_app, f"/orgs/{test_org}", token=token)
|
||||
assert status == 200, f"GET /orgs/{test_org} HTTP {status}: {org}"
|
||||
assert org.get("username") == test_org
|
||||
|
||||
# 6. Delete the token
|
||||
status, _ = _api(
|
||||
live_app, f"/users/{test_user}/tokens/{token_name}", method="DELETE",
|
||||
user=adm_user, password=adm_pass,
|
||||
)
|
||||
assert status in (204, 404), f"token delete HTTP {status}"
|
||||
|
||||
finally:
|
||||
# Delete org
|
||||
_api(live_app, f"/orgs/{test_org}", method="DELETE", user=adm_user, password=adm_pass)
|
||||
|
||||
finally:
|
||||
# Delete test user (admin only)
|
||||
_api(live_app, f"/admin/users/{test_user}", method="DELETE",
|
||||
user=adm_user, password=adm_pass)
|
||||
122
tests/gitea/custom/test_git_push.py
Normal file
122
tests/gitea/custom/test_git_push.py
Normal file
@ -0,0 +1,122 @@
|
||||
"""gitea — parity port of recipe-info/gitea/tests/git_push.py (phase gtea).
|
||||
|
||||
SOURCE: references/recipe-maintainer/recipe-info/gitea/tests/git_push.py
|
||||
|
||||
Original: create repo via API → clone → commit → push over HTTPS → verify commit via API → delete.
|
||||
cc-ci port: same flow, adapted to the per-run domain and the harness admin creds from ops.py.
|
||||
Non-vacuous: a broken SCM fails the push or the API verification.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import json
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
import urllib.error
|
||||
import urllib.request
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "..", "runner"))
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
||||
from ops import admin_creds # noqa: E402
|
||||
|
||||
import ssl
|
||||
|
||||
_CTX = ssl.create_default_context()
|
||||
_CTX.check_hostname = False
|
||||
_CTX.verify_mode = ssl.CERT_NONE
|
||||
|
||||
|
||||
def _api(domain, path, method="GET", body=None, user="", password=""):
|
||||
data = json.dumps(body).encode() if body is not None else None
|
||||
auth = base64.b64encode(f"{user}:{password}".encode()).decode()
|
||||
headers = {"Authorization": f"Basic {auth}"}
|
||||
if data:
|
||||
headers["Content-Type"] = "application/json"
|
||||
req = urllib.request.Request(
|
||||
f"https://{domain}/api/v1{path}", data=data, headers=headers, method=method
|
||||
)
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=20, context=_CTX) as r:
|
||||
raw = r.read()
|
||||
return r.status, (json.loads(raw) if raw else {})
|
||||
except urllib.error.HTTPError as e:
|
||||
raw = e.read()
|
||||
try:
|
||||
return e.code, json.loads(raw)
|
||||
except (ValueError, json.JSONDecodeError):
|
||||
return e.code, {}
|
||||
|
||||
|
||||
def _run_git(args, cwd, env=None):
|
||||
result = subprocess.run(
|
||||
["git"] + args,
|
||||
cwd=cwd,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
env={**os.environ, **(env or {})},
|
||||
timeout=60,
|
||||
)
|
||||
if result.returncode != 0:
|
||||
raise RuntimeError(f"git {' '.join(args)} failed:\n{result.stderr}")
|
||||
return result.stdout.strip()
|
||||
|
||||
|
||||
def test_git_push(live_app):
|
||||
"""Parity with recipe-info/gitea/tests/git_push.py: create repo → clone → push → verify."""
|
||||
user, password = admin_creds(live_app)
|
||||
repo_name = "ci-test-push"
|
||||
|
||||
# 1. Create test repo
|
||||
status, body = _api(
|
||||
live_app, "/user/repos", method="POST",
|
||||
body={"name": repo_name, "private": False, "auto_init": False},
|
||||
user=user, password=password,
|
||||
)
|
||||
assert status == 201, f"repo create HTTP {status}: {body}"
|
||||
clone_url = body.get("clone_url") or f"https://{live_app}/{user}/{repo_name}.git"
|
||||
|
||||
tmpdir = tempfile.mkdtemp(prefix="ccci-gitea-push-")
|
||||
try:
|
||||
git_env = {
|
||||
"GIT_AUTHOR_NAME": "CI Test Bot",
|
||||
"GIT_AUTHOR_EMAIL": "ci@ci.local",
|
||||
"GIT_COMMITTER_NAME": "CI Test Bot",
|
||||
"GIT_COMMITTER_EMAIL": "ci@ci.local",
|
||||
# Embed credentials so HTTPS push works without interactive prompt.
|
||||
"GIT_CONFIG_COUNT": "1",
|
||||
"GIT_CONFIG_KEY_0": f"url.https://{user}:{password}@{live_app}/.insteadOf",
|
||||
"GIT_CONFIG_VALUE_0": f"https://{live_app}/",
|
||||
}
|
||||
|
||||
# 2. Clone (empty repo)
|
||||
_run_git(["clone", clone_url, tmpdir], cwd="/tmp", env=git_env)
|
||||
_run_git(["checkout", "-b", "main"], cwd=tmpdir, env=git_env)
|
||||
|
||||
# 3. Commit a file
|
||||
readme = os.path.join(tmpdir, "README.md")
|
||||
with open(readme, "w") as f:
|
||||
f.write(f"# {repo_name}\n\nAutomated ci push test.\n")
|
||||
_run_git(["add", "README.md"], cwd=tmpdir, env=git_env)
|
||||
_run_git(["commit", "-m", "test: automated push test"], cwd=tmpdir, env=git_env)
|
||||
|
||||
# 4. Push
|
||||
_run_git(["push", "origin", "HEAD:main"], cwd=tmpdir, env=git_env)
|
||||
|
||||
# 5. Verify commit landed via API
|
||||
status, commits = _api(
|
||||
live_app, f"/repos/{user}/{repo_name}/commits?limit=1",
|
||||
user=user, password=password,
|
||||
)
|
||||
assert status == 200 and commits, f"commit list HTTP {status}: {commits}"
|
||||
commit_msg = commits[0].get("commit", {}).get("message", "").strip()
|
||||
assert "automated push test" in commit_msg, (
|
||||
f"Unexpected commit message: {commit_msg!r}"
|
||||
)
|
||||
finally:
|
||||
shutil.rmtree(tmpdir, ignore_errors=True)
|
||||
# 6. Cleanup — delete the test repo
|
||||
_api(live_app, f"/repos/{user}/{repo_name}", method="DELETE", user=user, password=password)
|
||||
22
tests/gitea/custom/test_health.py
Normal file
22
tests/gitea/custom/test_health.py
Normal file
@ -0,0 +1,22 @@
|
||||
"""gitea — parity port of recipe-info/gitea/tests/health_check.py (phase gtea).
|
||||
|
||||
SOURCE: references/recipe-maintainer/recipe-info/gitea/tests/health_check.py
|
||||
|
||||
The original checks HTTP 200 from the root URL. The cc-ci port preserves the assertion shape,
|
||||
adapted to the ephemeral per-run domain via the `live_app` fixture.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "..", "runner"))
|
||||
from harness import http as harness_http # noqa: E402
|
||||
|
||||
|
||||
def test_gitea_root_returns_200(live_app):
|
||||
"""Parity with recipe-info/gitea/tests/health_check.py: HTTP 200 from the root URL."""
|
||||
url = f"https://{live_app}"
|
||||
status, _ = harness_http.retry_http_get(url, expect_status=200, max_wait=60, interval=3)
|
||||
assert status == 200, f"gitea at {url} returned HTTP {status} (expected 200)"
|
||||
216
tests/gitea/custom/test_lfs_roundtrip.py
Normal file
216
tests/gitea/custom/test_lfs_roundtrip.py
Normal file
@ -0,0 +1,216 @@
|
||||
"""gitea — LFS round-trip capstone (phase gtea, PR #1 lfs-plain-gitea).
|
||||
|
||||
This test is the PROOF for PR #1 (feat: support Git LFS on plain gitea):
|
||||
- On gitea main: compose.lfs.yml is ABSENT → test skips (declared EXPECTED_NA in PARITY.md).
|
||||
- On lfs-plain-gitea PR head: compose.lfs.yml is PRESENT → test runs and must pass.
|
||||
|
||||
What it tests:
|
||||
1. LFS object upload (git lfs push) → download round-trip: fetched bytes hash to the OID.
|
||||
2. LFS JWT secret stability: after `abra app restart`, the rendered app.ini LFS_JWT_SECRET
|
||||
is unchanged and tokens still validate (the regression the PR fixes).
|
||||
|
||||
Non-vacuous: a gitea without LFS enabled cannot accept lfs push (step 1 fails); a rotating
|
||||
LFS_JWT_SECRET breaks existing tokens (step 2 fails).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
import urllib.error
|
||||
import urllib.request
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "..", "runner"))
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
||||
from harness import abra as harness_abra, lifecycle # noqa: E402
|
||||
from ops import admin_creds # noqa: E402
|
||||
|
||||
import ssl
|
||||
|
||||
_CTX = ssl.create_default_context()
|
||||
_CTX.check_hostname = False
|
||||
_CTX.verify_mode = ssl.CERT_NONE
|
||||
|
||||
_LFS_OVERLAY = "compose.lfs.yml"
|
||||
|
||||
|
||||
def _lfs_available() -> bool:
|
||||
abra_dir = os.environ.get("ABRA_DIR") or os.path.expanduser("~/.abra")
|
||||
return os.path.exists(os.path.join(abra_dir, "recipes", "gitea", _LFS_OVERLAY))
|
||||
|
||||
|
||||
def _api(domain, path, method="GET", body=None, user="", password=""):
|
||||
data = json.dumps(body).encode() if body is not None else None
|
||||
auth = base64.b64encode(f"{user}:{password}".encode()).decode()
|
||||
headers = {"Authorization": f"Basic {auth}"}
|
||||
if data:
|
||||
headers["Content-Type"] = "application/json"
|
||||
req = urllib.request.Request(
|
||||
f"https://{domain}/api/v1{path}", data=data, headers=headers, method=method
|
||||
)
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=20, context=_CTX) as r:
|
||||
raw = r.read()
|
||||
return r.status, (json.loads(raw) if raw else {})
|
||||
except urllib.error.HTTPError as e:
|
||||
raw = e.read()
|
||||
try:
|
||||
return e.code, json.loads(raw)
|
||||
except (ValueError, json.JSONDecodeError):
|
||||
return e.code, {}
|
||||
|
||||
|
||||
def _run_git(args, cwd, env=None):
|
||||
full_env = {**os.environ, **(env or {})}
|
||||
result = subprocess.run(
|
||||
["git"] + args, cwd=cwd, capture_output=True, text=True, env=full_env, timeout=120
|
||||
)
|
||||
if result.returncode != 0:
|
||||
raise RuntimeError(f"git {' '.join(args)} failed:\n{result.stderr}")
|
||||
return result.stdout.strip()
|
||||
|
||||
|
||||
def test_lfs_roundtrip(live_app):
|
||||
"""LFS object upload→download round-trip + JWT-secret stability after restart.
|
||||
|
||||
Skips when compose.lfs.yml is absent (gitea main — LFS not enabled in this deploy).
|
||||
"""
|
||||
if not _lfs_available():
|
||||
import pytest
|
||||
pytest.skip(
|
||||
"compose.lfs.yml absent in gitea recipe checkout — LFS is not enabled on this branch. "
|
||||
"This test runs on lfs-plain-gitea (PR #1) and is EXPECTED_NA on main."
|
||||
)
|
||||
|
||||
user, password = admin_creds(live_app)
|
||||
repo_name = "ci-lfs-test"
|
||||
git_env = {
|
||||
"GIT_AUTHOR_NAME": "CI LFS Bot",
|
||||
"GIT_AUTHOR_EMAIL": "ci@ci.local",
|
||||
"GIT_COMMITTER_NAME": "CI LFS Bot",
|
||||
"GIT_COMMITTER_EMAIL": "ci@ci.local",
|
||||
"GIT_CONFIG_COUNT": "1",
|
||||
"GIT_CONFIG_KEY_0": f"url.https://{user}:{password}@{live_app}/.insteadOf",
|
||||
"GIT_CONFIG_VALUE_0": f"https://{live_app}/",
|
||||
# Suppress interactive LFS credential prompts
|
||||
"GIT_TERMINAL_PROMPT": "0",
|
||||
}
|
||||
|
||||
# 1. Create LFS test repo
|
||||
status, body = _api(
|
||||
live_app, "/user/repos", method="POST",
|
||||
body={"name": repo_name, "private": False, "auto_init": True, "default_branch": "main"},
|
||||
user=user, password=password,
|
||||
)
|
||||
assert status in (201, 409), f"repo create HTTP {status}: {body}"
|
||||
clone_url = f"https://{live_app}/{user}/{repo_name}.git"
|
||||
|
||||
tmpdir = tempfile.mkdtemp(prefix="ccci-gitea-lfs-")
|
||||
try:
|
||||
# 2. Clone repo
|
||||
_run_git(["clone", clone_url, tmpdir], cwd="/tmp", env=git_env)
|
||||
_run_git(["lfs", "install"], cwd=tmpdir, env=git_env)
|
||||
|
||||
# 3. Track *.bin as LFS
|
||||
_run_git(["lfs", "track", "*.bin"], cwd=tmpdir, env=git_env)
|
||||
_run_git(["add", ".gitattributes"], cwd=tmpdir, env=git_env)
|
||||
|
||||
# 4. Create a 1KB binary blob (content: random-looking but deterministic per test)
|
||||
blob = bytes(range(256)) * 4 # 1024 bytes
|
||||
blob_path = os.path.join(tmpdir, "testblob.bin")
|
||||
with open(blob_path, "wb") as f:
|
||||
f.write(blob)
|
||||
expected_sha256 = hashlib.sha256(blob).hexdigest()
|
||||
expected_oid = f"sha256:{expected_sha256}"
|
||||
|
||||
_run_git(["add", "testblob.bin"], cwd=tmpdir, env=git_env)
|
||||
_run_git(["commit", "-m", "test: add LFS blob"], cwd=tmpdir, env=git_env)
|
||||
|
||||
# 5. Push (LFS upload happens here)
|
||||
_run_git(["push", "origin", "HEAD:main"], cwd=tmpdir, env=git_env)
|
||||
|
||||
# Verify git-lfs pointer was tracked correctly
|
||||
lfs_ls = subprocess.run(
|
||||
["git", "lfs", "ls-files"],
|
||||
cwd=tmpdir, capture_output=True, text=True, env={**os.environ, **git_env}
|
||||
)
|
||||
assert "testblob.bin" in lfs_ls.stdout, f"testblob.bin not in git-lfs ls-files: {lfs_ls.stdout}"
|
||||
|
||||
# 6. Download in a FRESH clone (proves the LFS server stores and serves the object)
|
||||
fresh_dir = tempfile.mkdtemp(prefix="ccci-gitea-lfs-dl-")
|
||||
try:
|
||||
_run_git(["clone", clone_url, fresh_dir], cwd="/tmp", env=git_env)
|
||||
fetched_path = os.path.join(fresh_dir, "testblob.bin")
|
||||
assert os.path.exists(fetched_path), "testblob.bin not fetched in fresh clone"
|
||||
with open(fetched_path, "rb") as f:
|
||||
fetched = f.read()
|
||||
fetched_sha256 = hashlib.sha256(fetched).hexdigest()
|
||||
assert fetched_sha256 == expected_sha256, (
|
||||
f"LFS round-trip OID mismatch: expected {expected_oid}, got sha256:{fetched_sha256}"
|
||||
)
|
||||
finally:
|
||||
shutil.rmtree(fresh_dir, ignore_errors=True)
|
||||
|
||||
# 7. JWT-secret stability: restart gitea + assert LFS_JWT_SECRET unchanged.
|
||||
# Read the current secret from inside the container's rendered app.ini.
|
||||
current_jwt = lifecycle.exec_in_app(
|
||||
live_app,
|
||||
["sh", "-c", "grep -E '^LFS_JWT_SECRET' /etc/gitea/app.ini || echo NOT_FOUND"],
|
||||
timeout=30,
|
||||
).strip()
|
||||
assert current_jwt and "NOT_FOUND" not in current_jwt, (
|
||||
"Could not read LFS_JWT_SECRET from /etc/gitea/app.ini before restart"
|
||||
)
|
||||
|
||||
# Restart the gitea container
|
||||
lifecycle.exec_in_app(live_app, ["true"], timeout=5) # no-op to confirm exec works
|
||||
subprocess.run(
|
||||
["docker", "service", "update", "--force",
|
||||
live_app.replace(".", "_") + "_app"],
|
||||
capture_output=True, timeout=120,
|
||||
)
|
||||
# Wait for gitea to come back up
|
||||
from harness import generic
|
||||
# Re-read meta from the live_app fixture (meta is not in scope here — use the stored meta)
|
||||
import time
|
||||
deadline = time.time() + 120
|
||||
while time.time() < deadline:
|
||||
status2, _ = _api(live_app, "/api/v1/version", user=user, password=password)
|
||||
if status2 == 200:
|
||||
break
|
||||
time.sleep(5)
|
||||
assert status2 == 200, "gitea did not come back up after restart"
|
||||
|
||||
jwt_after = lifecycle.exec_in_app(
|
||||
live_app,
|
||||
["sh", "-c", "grep -E '^LFS_JWT_SECRET' /etc/gitea/app.ini || echo NOT_FOUND"],
|
||||
timeout=30,
|
||||
).strip()
|
||||
assert jwt_after == current_jwt, (
|
||||
"LFS_JWT_SECRET changed after restart — the regression the PR fixes is still present: "
|
||||
f"before={current_jwt!r} after={jwt_after!r}"
|
||||
)
|
||||
|
||||
# 8. Verify a fresh clone still works after restart (tokens still validate)
|
||||
post_restart_dir = tempfile.mkdtemp(prefix="ccci-gitea-lfs-restart-")
|
||||
try:
|
||||
_run_git(["clone", clone_url, post_restart_dir], cwd="/tmp", env=git_env)
|
||||
pr_blob = os.path.join(post_restart_dir, "testblob.bin")
|
||||
assert os.path.exists(pr_blob), "testblob.bin not fetched in post-restart clone"
|
||||
with open(pr_blob, "rb") as f:
|
||||
pr_data = f.read()
|
||||
assert hashlib.sha256(pr_data).hexdigest() == expected_sha256, (
|
||||
"LFS object corrupted after restart — JWT secret may have changed"
|
||||
)
|
||||
finally:
|
||||
shutil.rmtree(post_restart_dir, ignore_errors=True)
|
||||
|
||||
finally:
|
||||
shutil.rmtree(tmpdir, ignore_errors=True)
|
||||
_api(live_app, f"/repos/{user}/{repo_name}", method="DELETE", user=user, password=password)
|
||||
Reference in New Issue
Block a user