Files
cc-ci/bridge/bridge.py
autonomic-bot 5d48436577
Some checks failed
continuous-integration/drone/push Build is failing
fix(5 A5-1/A5-2): bridge commit status posting + enroll custom-html-tiny
A5-2: bridge.py now posts Gitea commit statuses on the recipe PR's head SHA:
- pending on build trigger (so testme-on-pr.sh sees the run immediately)
- success/failure on build finish (so testme-on-pr.sh returns VERDICT=GREEN/RED)
Added post_commit_status() using the existing _api() helper + GITEA_TOKEN.
Called from process_testme() (pending) and watch_and_reflect() (terminal state).

A5-1: added recipe-maintainers/custom-html-tiny to bridge POLL_REPOS in
bridge.nix so !testme on custom-html-tiny PRs is picked up by the bridge poller.
2026-05-31 13:48:12 +00:00

408 lines
17 KiB
Python

#!/usr/bin/env python3
"""cc-ci comment-bridge (§4.1).
When an *authorized* user comments exactly `!testme` on an open PR in an enrolled recipe repo,
trigger a parameterized Drone build of the cc-ci pipeline for that PR's head commit and post a PR
comment linking the run. Everything else is ignored.
Trigger paths (§4.1, SETTLED):
* POLLING is PRIMARY (always on): the bridge polls each enrolled repo's open PRs for new
`!testme` comments every POLL_INTERVAL seconds. This is outbound (cc-ci -> git.autonomic.zone)
and needs only READ + comment access — never repo-admin. It is the source of truth for D1.
* WEBHOOK is an OPTIONAL push optimization: the `/hook` endpoint stays live so a Gitea
`issue_comment` webhook, *if an admin registered one*, lowers latency. The bridge NEVER
self-registers a webhook (that needs repo-admin, which we refuse). Manual registration is
documented in docs/enroll-recipe.md.
Both paths share an in-memory seen-set keyed by comment id, so a comment seen by both fires at most
once (no double-trigger). On startup the first poll marks pre-existing comments seen so old comments
don't re-fire. Python stdlib only.
Authorization: a commenter is allowed iff they are a member of the repo's owning org
(`GET /orgs/{owner}/members/{user}` -> 204), which is readable by any org member (read-level, no
admin). An optional AUTH_ALLOWLIST (csv of usernames) is also honored. Fail-closed on any error.
Config (env): BRIDGE_LISTEN, GITEA_API, DRONE_URL, CI_REPO, HMAC_FILE, DRONE_TOKEN_FILE,
GITEA_TOKEN_FILE, POLL_INTERVAL (default 30), POLL_REPOS (csv of enrolled repos), AUTH_ALLOWLIST
(csv, optional).
"""
import hashlib
import hmac
import json
import os
import sys
import threading
import time
import urllib.error
import urllib.parse
import urllib.request
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
GITEA_API = os.environ.get("GITEA_API", "https://git.autonomic.zone/api/v1")
DRONE_URL = os.environ.get("DRONE_URL", "https://drone.ci.commoninternet.net")
# Dashboard base URL — where per-run artifacts (summary card PNG, level badge SVG) are served
# (Phase 3 U2.3: /runs/<run_id>/...). The PR comment (U3) embeds the card + badge from here. The
# run_id is the Drone build number (== `num`), so the URLs are /runs/<num>/{summary.png,badge.svg}.
DASH_URL = os.environ.get("DASH_URL", "https://ci.commoninternet.net")
CI_REPO = os.environ.get("CI_REPO", "recipe-maintainers/cc-ci")
TRIGGER = "!testme"
# Hidden HTML-comment marker embedded in the bot's PR comment so a re-`!testme` UPDATES the same
# comment in place (R2/U3 "one comment per PR, updated in place") instead of stacking new ones.
# Invisible in rendered Gitea markdown.
COMMENT_MARKER = "<!-- cc-ci:testme -->"
def parse_trigger(body):
"""Parse a PR comment body into (is_trigger, quick). Exactly two accepted forms (trimmed):
`!testme` → (True, False) = full COLD run (default, authoritative);
`!testme --quick` → (True, True) = opt-in LOWER-CONFIDENCE fast lane (WC4/WC7).
Anything else (`!testmexyz`, `!testme foo`, prose) → (False, False) — must NOT trigger."""
s = (body or "").strip()
if s == TRIGGER:
return True, False
if s == f"{TRIGGER} --quick":
return True, True
return False, False
ALLOWLIST = {u.strip() for u in os.environ.get("AUTH_ALLOWLIST", "").split(",") if u.strip()}
def _read(path):
with open(path) as fh:
return fh.read().strip()
HMAC_SECRET = _read(os.environ["HMAC_FILE"]).encode()
DRONE_TOKEN = _read(os.environ["DRONE_TOKEN_FILE"])
GITEA_TOKEN = _read(os.environ["GITEA_TOKEN_FILE"])
# Shared dedup across the poll + webhook paths: a comment id triggers at most one run.
_PROCESSED: set = set()
_PROCESSED_LOCK = threading.Lock()
def log(*a):
print(*a, file=sys.stderr, flush=True)
def _api(url, token, method="GET", data=None, scheme="token"):
# Gitea wants "Authorization: token <t>"; Drone wants "Authorization: Bearer <t>".
headers = {"Authorization": f"{scheme} {token}"} if token else {}
body = None
if data is not None:
body = json.dumps(data).encode()
headers["Content-Type"] = "application/json"
req = urllib.request.Request(url, data=body, headers=headers, method=method)
try:
with urllib.request.urlopen(req, timeout=30) as resp:
raw = resp.read()
return resp.status, (json.loads(raw) if raw else None)
except urllib.error.HTTPError as e:
return e.code, None
except (urllib.error.URLError, OSError) as e:
log("api error", url, e)
return None, None
def is_authorized(full_name, user):
"""Allowed iff the user is a member of the repo's owning org (read-level membership check) or in
the static AUTH_ALLOWLIST. Uses GET /orgs/{owner}/members/{user} (204=member), which any org
member can read — no repo-admin needed. Fail-closed: anything other than a clean 204/allowlist
hit is rejected."""
if not user:
return False
if user in ALLOWLIST:
return True
owner = full_name.partition("/")[0]
status, _ = _api(f"{GITEA_API}/orgs/{owner}/members/{user}", GITEA_TOKEN)
return status == 204
def pr_head(owner, repo, number):
status, pr = _api(f"{GITEA_API}/repos/{owner}/{repo}/pulls/{number}", GITEA_TOKEN)
if status != 200 or not pr:
return None
head = pr.get("head", {})
return {"sha": head.get("sha"), "repo": (head.get("repo") or {}).get("full_name")}
def trigger_build(recipe, ref, pr, src, quick=False):
# Drone "create build" with custom params -> exposed to the pipeline as env vars. `--quick`
# (WC7) sets CCCI_QUICK=1 so run_recipe_ci takes the opt-in fast lane; absent => full cold.
params = {"branch": "main", "RECIPE": recipe, "REF": ref, "PR": str(pr), "SRC": src}
if quick:
params["CCCI_QUICK"] = "1"
q = urllib.parse.urlencode(params)
url = f"{DRONE_URL}/api/repos/{CI_REPO}/builds?{q}"
status, build = _api(url, DRONE_TOKEN, method="POST", scheme="Bearer")
if status in (200, 201) and build:
return build.get("number")
log("drone trigger failed", status)
return None
def post_comment(owner, repo, number, body):
status, c = _api(
f"{GITEA_API}/repos/{owner}/{repo}/issues/{number}/comments",
GITEA_TOKEN,
method="POST",
data={"body": body},
)
return c.get("id") if status in (200, 201) and c else None
def edit_comment(owner, repo, comment_id, body):
_api(
f"{GITEA_API}/repos/{owner}/{repo}/issues/comments/{comment_id}",
GITEA_TOKEN,
method="PATCH",
data={"body": body},
)
def post_commit_status(owner, repo, sha, state, target_url, description=""):
"""Post a Gitea commit status on a recipe PR's head SHA so testme-on-pr.sh can read
the verdict from GET /repos/{owner}/{repo}/commits/{sha}/status (Phase 5 / A5-2 fix)."""
_api(
f"{GITEA_API}/repos/{owner}/{repo}/statuses/{sha}",
GITEA_TOKEN,
method="POST",
data={"state": state, "target_url": target_url,
"description": description, "context": "cc-ci/testme"},
)
def build_status(num):
status, b = _api(f"{DRONE_URL}/api/repos/{CI_REPO}/builds/{num}", DRONE_TOKEN, scheme="Bearer")
return b.get("status") if status == 200 and b else None
_TERMINAL = {"success", "failure", "error", "killed"}
def artifact_available(url):
"""True iff the dashboard serves `url` (HTTP 200). Used to decide image-vs-text fallback for the
PR comment (R7: a render failure → text, never a broken image). Best-effort; any error → False."""
try:
req = urllib.request.Request(url, method="HEAD")
with urllib.request.urlopen(req, timeout=10) as r:
return getattr(r, "status", r.getcode()) == 200
except Exception: # noqa: BLE001 — unreachable/404/timeout all mean "fall back to text"
return False
def start_comment_body(recipe, sha, run_url, mode=""):
"""U3.1 — the YunoHost-shaped placeholder posted when a run starts: 🌻 marker + ⏳ + live-logs
link. Edited in place to the image-forward result by watch_and_reflect on completion."""
return (
f"{COMMENT_MARKER}\n"
f"🌻 **cc-ci** — testing `{recipe}` @ `{sha[:8]}`{mode}\n\n"
f"⏳ Run in progress — level pending. [Live logs]({run_url})."
)
def result_comment_body(recipe, sha, num, run_url, status):
"""U3.2 — the YunoHost-shaped result comment: 🌻 marker + a level/status **badge** + the
**summary card** image, both linking to the run; falls back to a compact text verdict if the
rendered card/badge isn't available (render failed, or the build didn't complete) — R7."""
badge_url = f"{DASH_URL}/runs/{num}/badge.svg"
card_url = f"{DASH_URL}/runs/{num}/summary.png"
icon = "" if status == "success" else ""
verdict = "passed" if status == "success" else (status or "did not complete")
header = f"{COMMENT_MARKER}\n🌻 **cc-ci** — `{recipe}` @ `{sha[:8]}` {icon} **{verdict}**"
links = f"[full logs]({run_url}) · [dashboard]({DASH_URL}/)"
# Image-forward (YunoHost style) only when the card actually rendered + is served; else text.
if artifact_available(card_url):
body = f"{header}\n\n[![cc-ci result card]({card_url})]({run_url})"
if artifact_available(badge_url):
body += f"\n\n[![level]({badge_url})]({run_url})"
return f"{body}\n\n{links}"
return f"{header}{run_url}\n\n_(summary card unavailable — see the run for details.)_ {links}"
def watch_and_reflect(owner, name, number, num, recipe, sha, comment_id, run_url):
"""Poll the Drone build to completion, then edit the PR comment to the YunoHost-style image-forward
result (🌻 + badge + summary card, linked; text fallback) — D7/R2/U3. Bounded by build timeout."""
import time as _t
deadline = _t.time() + 75 * 60
last = None
while _t.time() < deadline:
last = build_status(num)
if last in _TERMINAL:
break
_t.sleep(15)
if comment_id:
edit_comment(owner, name, comment_id, result_comment_body(recipe, sha, num, run_url, last))
git_state = "success" if last == "success" else "failure"
post_commit_status(owner, name, sha, git_state, run_url, f"cc-ci: {git_state}")
log(f"reflected outcome build {num} ({recipe} PR #{number}): {last}")
def list_open_prs(full_name):
status, prs = _api(f"{GITEA_API}/repos/{full_name}/pulls?state=open&limit=50", GITEA_TOKEN)
return prs if status == 200 and prs else []
def list_comments(full_name, number):
status, cs = _api(f"{GITEA_API}/repos/{full_name}/issues/{number}/comments", GITEA_TOKEN)
return cs if status == 200 and cs else []
def find_existing_comment(full_name, number):
"""Return the id of the bot's existing cc-ci PR comment (carrying COMMENT_MARKER), or None — so a
re-`!testme` UPDATES that comment in place (R2/U3) rather than stacking a new one each run."""
for c in list_comments(full_name, number):
if COMMENT_MARKER in (c.get("body") or ""):
return c.get("id")
return None
def _claim(comment_id) -> bool:
"""Atomically claim a comment id for processing. Returns False if already claimed (dedup)."""
if comment_id is None:
return True
with _PROCESSED_LOCK:
if comment_id in _PROCESSED:
return False
_PROCESSED.add(comment_id)
return True
def process_testme(full_name, owner, name, number, user, comment_id, source, quick=False):
"""Shared by both paths. Dedupes by comment id, checks authorization, resolves the PR head,
triggers the build, comments the run link. Returns (run_url|None, reason)."""
if not _claim(comment_id):
return None, "duplicate"
if not is_authorized(full_name, user):
log(f"rejected: {user} is not an authorized org member on {full_name}")
return None, "not authorized"
head = pr_head(owner, name, number)
if not head or not head["sha"]:
return None, "cannot resolve PR head"
num = trigger_build(name, head["sha"], number, head["repo"] or full_name, quick=quick)
if not num:
post_comment(owner, name, number, "cc-ci: failed to start a CI run (see bridge logs).")
return None, "trigger failed"
run_url = f"{DRONE_URL}/{CI_REPO}/{num}"
post_commit_status(owner, name, head["sha"], "pending", run_url, "cc-ci run in progress")
mode = " **(--quick: lower-confidence fast lane; does not gate merge)**" if quick else ""
# R2/U3: one comment per PR, updated in place. Reuse the existing marked comment if present
# (re-`!testme` refreshes it back to the ⏳ placeholder), else post a new one.
start_body = start_comment_body(name, head["sha"], run_url, mode)
existing = find_existing_comment(full_name, number)
if existing:
edit_comment(owner, name, existing, start_body)
cid = existing
else:
cid = post_comment(owner, name, number, start_body)
log(
f"[{source}] triggered build {num} for {name}@{head['sha'][:8]} "
f"(PR #{number}, comment {comment_id}) by {user}"
)
# Reflect the final pass/fail back onto that comment when the build finishes (D7).
threading.Thread(
target=watch_and_reflect,
args=(owner, name, number, num, name, head["sha"], cid, run_url),
daemon=True,
).start()
return run_url, "ok"
class Handler(BaseHTTPRequestHandler):
def _send(self, code, msg=""):
self.send_response(code)
self.send_header("Content-Type", "text/plain")
self.end_headers()
self.wfile.write(msg.encode())
def do_GET(self):
if self.path.rstrip("/") in ("/hook/healthz", "/healthz"):
return self._send(200, "ok")
return self._send(404, "not found")
def do_POST(self):
# Optional push optimization; polling is primary. Deduped against the poller by comment id.
length = int(self.headers.get("Content-Length", 0))
body = self.rfile.read(length)
sig = self.headers.get("X-Gitea-Signature", "")
expected = hmac.new(HMAC_SECRET, body, hashlib.sha256).hexdigest()
if not hmac.compare_digest(sig, expected):
log(f"rejected: bad signature event={self.headers.get('X-Gitea-Event')}")
return self._send(401, "bad signature")
if self.headers.get("X-Gitea-Event") != "issue_comment":
return self._send(204, "ignored")
try:
payload = json.loads(body)
except ValueError:
return self._send(400, "bad json")
action = payload.get("action")
c = payload.get("comment") or {}
issue = payload.get("issue") or {}
repo = payload.get("repository") or {}
is_trigger, quick = parse_trigger(c.get("body"))
if action != "created" or not is_trigger:
return self._send(204, "ignored")
if not issue.get("pull_request"):
return self._send(204, "not a PR")
run_url, reason = process_testme(
repo.get("full_name", ""),
(repo.get("owner") or {}).get("login", ""),
repo.get("name", ""),
issue.get("number"),
c.get("user", {}).get("login", ""),
c.get("id"),
"webhook",
quick=quick,
)
if not run_url:
if reason == "duplicate":
return self._send(200, "already handled")
return self._send(403 if reason == "not authorized" else 502, reason)
return self._send(201, run_url)
def log_message(self, *a):
pass
def poll_loop():
"""Primary trigger path. Outbound, read-only. Fires on NEW `!testme` comments only (the first
pass marks pre-existing comments seen)."""
repos = [r.strip() for r in os.environ.get("POLL_REPOS", CI_REPO).split(",") if r.strip()]
interval = int(os.environ.get("POLL_INTERVAL", "30"))
first = True
log(f"poller (primary) watching {repos} every {interval}s")
while True:
for full_name in repos:
owner, _, name = full_name.partition("/")
for pr in list_open_prs(full_name):
number = pr.get("number")
for c in list_comments(full_name, number):
is_trigger, quick = parse_trigger(c.get("body"))
if not is_trigger:
continue
cid = c.get("id")
if first:
_claim(cid) # mark pre-existing comments seen; don't fire on startup
continue
user = (c.get("user") or {}).get("login", "")
process_testme(full_name, owner, name, number, user, cid, "poll", quick=quick)
first = False
time.sleep(interval)
def main():
# Polling is the primary trigger; start it unconditionally.
threading.Thread(target=poll_loop, daemon=True).start()
host, _, port = os.environ.get("BRIDGE_LISTEN", "0.0.0.0:8080").rpartition(":")
srv = ThreadingHTTPServer((host or "0.0.0.0", int(port)), Handler)
log(f"comment-bridge listening on {host or '0.0.0.0'}:{port} (poll primary + optional webhook)")
srv.serve_forever()
if __name__ == "__main__":
main()