Files
cc-ci/bridge/bridge.py
autonomic-bot 23f1861b7a
Some checks failed
continuous-integration/drone/push Build is failing
fix(bridge): ignore pre-start trigger comments
2026-06-13 00:27:22 +00:00

431 lines
18 KiB
Python

#!/usr/bin/env python3
"""cc-ci comment-bridge (§4.1).
When an *authorized* user comments exactly `!testme` (or `!testme --quick` for the opt-in fast lane)
on an open PR in an enrolled recipe repo, trigger a parameterized Drone build of the cc-ci pipeline
for that PR's head commit and post a PR comment linking the run. Everything else is ignored.
Trigger paths (§4.1, SETTLED):
* POLLING is PRIMARY (always on): the bridge polls each enrolled repo's open PRs for new
`!testme` comments every POLL_INTERVAL seconds. This is outbound (cc-ci -> git.autonomic.zone)
and needs only READ + comment access — never repo-admin. It is the source of truth for D1.
* WEBHOOK is an OPTIONAL push optimization: the `/hook` endpoint stays live so a Gitea
`issue_comment` webhook, *if an admin registered one*, lowers latency. The bridge NEVER
self-registers a webhook (that needs repo-admin, which we refuse). Manual registration is
documented in docs/enroll-recipe.md.
Both paths share an in-memory seen-set keyed by comment id, so a comment seen by both fires at most
once (no double-trigger). On startup the first poll marks pre-existing comments seen so old comments
don't re-fire. Python stdlib only.
Authorization: a commenter is allowed iff they are a member of the repo's owning org
(`GET /orgs/{owner}/members/{user}` -> 204), which is readable by any org member (read-level, no
admin). An optional AUTH_ALLOWLIST (csv of usernames) is also honored. Fail-closed on any error.
Config (env): BRIDGE_LISTEN, GITEA_API, DRONE_URL, DASH_URL, CI_REPO, HMAC_FILE, DRONE_TOKEN_FILE,
GITEA_TOKEN_FILE, POLL_INTERVAL (default 30), POLL_REPOS (csv of enrolled repos; defaults to
CI_REPO), AUTH_ALLOWLIST (csv, optional).
"""
import hashlib
import hmac
import json
import os
import sys
import threading
import time
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime, timezone
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
# Base URL of the Gitea REST API (v1) that hosts the recipe repos; overridable via env.
GITEA_API = os.environ.get("GITEA_API", "https://git.autonomic.zone/api/v1")
# Base URL of the Drone server that runs the cc-ci pipeline; overridable via env.
DRONE_URL = os.environ.get("DRONE_URL", "https://drone.ci.commoninternet.net")
# Dashboard base URL — where per-run artifacts (summary card PNG, level badge SVG) are served
# (Phase 3 U2.3: /runs/<run_id>/...). The PR comment (U3) embeds the card + badge from here. The
# run_id is the Drone build number (== `num`), so the URLs are /runs/<num>/{summary.png,badge.svg}.
DASH_URL = os.environ.get("DASH_URL", "https://ci.commoninternet.net")
# Drone repo slug (owner/name) whose pipeline is triggered; also the default value of POLL_REPOS.
CI_REPO = os.environ.get("CI_REPO", "recipe-maintainers/cc-ci")
# Base trigger word; parse_trigger() accepts it alone or followed by ` --quick`.
TRIGGER = "!testme"
# Hidden HTML-comment marker embedded in the bot's PR comments (start placeholder and result) so
# find_existing_comment() can locate them. NOTE: process_testme() currently posts a fresh comment
# per `!testme` rather than updating one marked comment in place.
# Invisible in rendered Gitea markdown.
COMMENT_MARKER = "<!-- cc-ci:testme -->"
def parse_trigger(body):
    """Classify a PR comment body as a CI trigger; returns ``(is_trigger, quick)``.

    Only two forms are accepted, compared after trimming surrounding whitespace:
      `!testme`          → (True, False): full COLD run (default, authoritative);
      `!testme --quick`  → (True, True): opt-in LOWER-CONFIDENCE fast lane (WC4/WC7).
    Everything else (`!testmexyz`, `!testme foo`, prose, empty/None) → (False, False).
    """
    text = body.strip() if body else ""
    accepted = {
        TRIGGER: (True, False),
        f"{TRIGGER} --quick": (True, True),
    }
    return accepted.get(text, (False, False))
# Optional static allowlist: usernames from the comma-separated AUTH_ALLOWLIST env var, with
# surrounding whitespace trimmed and empty entries dropped.
ALLOWLIST = set(
    filter(None, (entry.strip() for entry in os.environ.get("AUTH_ALLOWLIST", "").split(",")))
)
def _read(path):
    """Return the text of the file at *path* with leading/trailing whitespace stripped.

    The file is decoded explicitly as UTF-8 so the result does not depend on the process
    locale (the HMAC secret read through this helper is later re-encoded to bytes).

    Raises OSError if the file cannot be opened and UnicodeDecodeError if it is not UTF-8.
    """
    with open(path, encoding="utf-8") as fh:
        return fh.read().strip()
# Secrets are read once, at import time, from the files named by the HMAC_FILE, DRONE_TOKEN_FILE
# and GITEA_TOKEN_FILE env vars. These use os.environ[...] (no default), so a missing variable
# raises KeyError here.
HMAC_SECRET = _read(os.environ["HMAC_FILE"]).encode()
DRONE_TOKEN = _read(os.environ["DRONE_TOKEN_FILE"])
GITEA_TOKEN = _read(os.environ["GITEA_TOKEN_FILE"])
# Shared dedup across the poll + webhook paths: a comment id triggers at most one run.
# _PROCESSED holds the claimed comment ids; _PROCESSED_LOCK guards it (see _claim).
_PROCESSED: set = set()
_PROCESSED_LOCK = threading.Lock()
# UTC timestamp taken when this module is loaded; _is_preexisting_comment() treats comments
# created at or before it as already seen.
_PROCESS_STARTED_AT = datetime.now(timezone.utc)
def log(*values):
    """Print *values* to stderr (space-separated, newline-terminated) and flush immediately."""
    print(*values, flush=True, file=sys.stderr)
def _api(url, token, method="GET", data=None, scheme="token"):
    """Perform one HTTP request and return ``(status, parsed_json_body)``.

    Args:
        url: Full request URL.
        token: Credential placed in the Authorization header; falsy means no header.
        method: HTTP method.
        data: Optional JSON-serialisable payload, sent with Content-Type application/json.
        scheme: Authorization scheme prefix.

    Returns:
        ``(status, body)`` where *body* is the decoded JSON, or None when the response is empty
        or is not valid JSON. HTTP error responses yield ``(code, None)``; transport-level
        failures yield ``(None, None)``.

    Transport and decode failures are logged and reported through the return value rather than
    raised, so an unexpected response cannot propagate out of the callers' threads.
    """
    import http.client  # local import: only needed for the HTTPException guard below

    # Gitea wants "Authorization: token <t>"; Drone wants "Authorization: Bearer <t>".
    headers = {"Authorization": f"{scheme} {token}"} if token else {}
    body = None
    if data is not None:
        body = json.dumps(data).encode()
        headers["Content-Type"] = "application/json"
    req = urllib.request.Request(url, data=body, headers=headers, method=method)
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            status = resp.status
            raw = resp.read()
    except urllib.error.HTTPError as e:
        # Must precede URLError (its base class): an HTTP error still carries a status code.
        return e.code, None
    except (urllib.error.URLError, http.client.HTTPException, OSError) as e:
        log("api error", url, e)
        return None, None
    if not raw:
        return status, None
    try:
        return status, json.loads(raw)
    except ValueError as e:
        # e.g. an HTML error page from a proxy; callers treat a missing body as failure.
        log("api error: non-JSON response", url, status, e)
        return status, None
def is_authorized(full_name, user):
    """Decide whether *user* may trigger CI on the repo *full_name* (``owner/name``).

    A user is accepted when listed in the static ALLOWLIST, or when
    GET /orgs/{owner}/members/{user} answers 204 for the repo's owner. Fail-closed: an empty
    user, or any other status (including request errors), is rejected.
    """
    if not user:
        return False
    org = full_name.partition("/")[0]
    membership_url = f"{GITEA_API}/orgs/{org}/members/{user}"
    # Short-circuit keeps the allowlist hit free of any API call.
    return user in ALLOWLIST or _api(membership_url, GITEA_TOKEN)[0] == 204
def pr_head(owner, repo, number):
    """Fetch pull request *number* and return its head as ``{"sha": ..., "repo": ...}``.

    ``repo`` is the head repository's full name (None when the API reports no head repo).
    Returns None when the PR cannot be fetched (non-200 status or empty body).
    """
    code, pull = _api(f"{GITEA_API}/repos/{owner}/{repo}/pulls/{number}", GITEA_TOKEN)
    if code == 200 and pull:
        head = pull.get("head", {})
        head_repo = head.get("repo") or {}
        return {"sha": head.get("sha"), "repo": head_repo.get("full_name")}
    return None
def trigger_build(recipe, ref, pr, src, quick=False):
    """Start a parameterized Drone build on CI_REPO's `main` branch; return its build number.

    RECIPE, REF, PR and SRC are passed as custom build parameters (exposed to the pipeline as
    env vars). `--quick` (WC7) adds CCCI_QUICK=1 so run_recipe_ci takes the opt-in fast lane;
    absent => full cold run. Returns None (after logging) when Drone does not accept the build.
    """
    query = {"branch": "main", "RECIPE": recipe, "REF": ref, "PR": str(pr), "SRC": src}
    if quick:
        query["CCCI_QUICK"] = "1"
    endpoint = f"{DRONE_URL}/api/repos/{CI_REPO}/builds?{urllib.parse.urlencode(query)}"
    code, build = _api(endpoint, DRONE_TOKEN, method="POST", scheme="Bearer")
    if code not in (200, 201) or not build:
        log("drone trigger failed", code)
        return None
    return build.get("number")
def post_comment(owner, repo, number, body):
    """Post *body* as a new comment on issue/PR *number*; return the new comment's id.

    Returns None when the request does not succeed (status other than 200/201 or no body).
    """
    endpoint = f"{GITEA_API}/repos/{owner}/{repo}/issues/{number}/comments"
    code, created = _api(endpoint, GITEA_TOKEN, method="POST", data={"body": body})
    if code in (200, 201) and created:
        return created.get("id")
    return None
def edit_comment(owner, repo, comment_id, body):
    """Replace the text of an existing issue/PR comment with *body* (result is not checked)."""
    endpoint = f"{GITEA_API}/repos/{owner}/{repo}/issues/comments/{comment_id}"
    _api(endpoint, GITEA_TOKEN, method="PATCH", data={"body": body})
def post_commit_status(owner, repo, sha, state, target_url, description=""):
    """Post a Gitea commit status (context ``cc-ci/testme``) on commit *sha*.

    Lets testme-on-pr.sh read the verdict from GET /repos/{owner}/{repo}/commits/{sha}/status
    (Phase 5 / A5-2 fix). The API result is not checked.
    """
    status_payload = {
        "state": state,
        "target_url": target_url,
        "description": description,
        "context": "cc-ci/testme",
    }
    endpoint = f"{GITEA_API}/repos/{owner}/{repo}/statuses/{sha}"
    _api(endpoint, GITEA_TOKEN, method="POST", data=status_payload)
def build_status(num):
    """Return the ``status`` field of Drone build *num*, or None if it cannot be fetched."""
    endpoint = f"{DRONE_URL}/api/repos/{CI_REPO}/builds/{num}"
    code, build = _api(endpoint, DRONE_TOKEN, scheme="Bearer")
    if code == 200 and build:
        return build.get("status")
    return None
# Drone build statuses that watch_and_reflect() treats as final; any other value keeps it polling.
_TERMINAL = {"success", "failure", "error", "killed"}
def artifact_available(url):
    """Report whether a HEAD request for *url* answers HTTP 200.

    Used to choose between the image and text forms of the PR comment (R7: a render failure →
    text, never a broken image). Best-effort: any exception yields False.
    """
    try:
        probe = urllib.request.Request(url, method="HEAD")
        with urllib.request.urlopen(probe, timeout=10) as resp:
            code = getattr(resp, "status", resp.getcode())
    except Exception:  # noqa: BLE001 — unreachable/404/timeout all mean "fall back to text"
        return False
    return code == 200
def start_comment_body(recipe, sha, run_url, mode=""):
    """U3.1 — build the placeholder comment posted when a run starts.

    Layout: hidden COMMENT_MARKER, a 🌻 title naming the recipe and the first 8 characters of
    *sha* (with *mode* appended verbatim), then a ⏳ line linking the live logs.
    watch_and_reflect later edits this comment to the final result.
    """
    short_sha = sha[:8]
    title = f"🌻 **cc-ci** — testing `{recipe}` @ `{short_sha}`{mode}"
    progress = f"⏳ Run in progress — level pending. [Live logs]({run_url})."
    return f"{COMMENT_MARKER}\n{title}\n\n{progress}"
def result_comment_body(recipe, sha, num, run_url, status):
    """U3.2 — build the result comment for a finished (or abandoned) run.

    Args:
        recipe: Recipe name shown in the header.
        sha: Head commit SHA; only the first 8 characters are shown.
        num: Drone build number, used to derive the dashboard artifact URLs.
        run_url: Link to the Drone run.
        status: Final Drone status; "success" means passed, None/empty means "did not complete".

    Returns:
        Markdown starting with COMMENT_MARKER. When the dashboard serves the summary card the
        body is image-forward (card, plus the level badge if it is served too, both linking to
        the run); otherwise it is a compact text verdict (R7: never a broken image).
    """
    badge_url = f"{DASH_URL}/runs/{num}/badge.svg"
    card_url = f"{DASH_URL}/runs/{num}/summary.png"
    passed = status == "success"
    # Distinct pass/fail glyphs (both branches were previously empty strings).
    icon = "✅" if passed else "❌"
    verdict = "passed" if passed else (status or "did not complete")
    header = f"{COMMENT_MARKER}\n🌻 **cc-ci** — `{recipe}` @ `{sha[:8]}` {icon} **{verdict}**"
    links = f"[full logs]({run_url}) · [dashboard]({DASH_URL}/)"
    # Image-forward (YunoHost style) only when the card actually rendered + is served; else text.
    if artifact_available(card_url):
        body = f"{header}\n\n[![cc-ci result card]({card_url})]({run_url})"
        if artifact_available(badge_url):
            body += f"\n\n[![level]({badge_url})]({run_url})"
        return f"{body}\n\n{links}"
    # Keep the run URL in its own paragraph instead of gluing it onto the bold verdict.
    return (
        f"{header}\n\n{run_url}\n\n"
        f"_(summary card unavailable — see the run for details.)_ {links}"
    )
def watch_and_reflect(owner, name, number, num, recipe, sha, comment_id, run_url):
    """Wait for Drone build *num* to finish, then reflect the outcome onto the PR (D7/R2/U3).

    Polls build_status() every 15 s for at most 75 minutes. Afterwards it edits comment
    *comment_id* (if one was posted) to the result body and posts a commit status on *sha*:
    "success" only for a successful build, "failure" for everything else.

    If the wait ends without a terminal status (timeout, or the status could not be read), the
    comment reports "did not complete" rather than presenting a transient state such as
    "running" as the verdict. The last observed raw status is still logged.
    """
    deadline = time.time() + 75 * 60
    last = None
    while time.time() < deadline:
        last = build_status(num)
        if last in _TERMINAL:
            break
        time.sleep(15)
    # Only a terminal status is a real verdict; anything else means we gave up waiting.
    final = last if last in _TERMINAL else None
    if comment_id:
        edit_comment(owner, name, comment_id, result_comment_body(recipe, sha, num, run_url, final))
    git_state = "success" if final == "success" else "failure"
    post_commit_status(owner, name, sha, git_state, run_url, f"cc-ci: {git_state}")
    log(f"reflected outcome build {num} ({recipe} PR #{number}): {last}")
def list_open_prs(full_name):
    """Return the open pull requests of *full_name* (one request, limit=50); [] on failure."""
    endpoint = f"{GITEA_API}/repos/{full_name}/pulls?state=open&limit=50"
    code, pulls = _api(endpoint, GITEA_TOKEN)
    if code == 200 and pulls:
        return pulls
    return []
def list_comments(full_name, number):
    """Return the comments on issue/PR *number* of *full_name*; [] on failure or when empty."""
    endpoint = f"{GITEA_API}/repos/{full_name}/issues/{number}/comments"
    code, comments = _api(endpoint, GITEA_TOKEN)
    if code == 200 and comments:
        return comments
    return []
def find_existing_comment(full_name, number):
    """Return the id of the first comment on PR *number* whose body contains COMMENT_MARKER.

    Returns None when no comment carries the marker. Intended for updating the bot's comment
    in place (R2/U3) rather than stacking a new one each run.
    """
    marked_ids = (
        comment.get("id")
        for comment in list_comments(full_name, number)
        if COMMENT_MARKER in (comment.get("body") or "")
    )
    return next(marked_ids, None)
def _claim(comment_id) -> bool:
    """Atomically claim *comment_id* for processing.

    Returns True when this call claimed the id, False when it was already claimed (dedup).
    A None id cannot be deduplicated and is always reported as claimable.
    """
    if comment_id is None:
        return True
    with _PROCESSED_LOCK:
        already_claimed = comment_id in _PROCESSED
        if not already_claimed:
            _PROCESSED.add(comment_id)
    return not already_claimed
def _is_preexisting_comment(comment) -> bool:
    """Return True when *comment* was created at or before this bridge process started.

    This closes the reopened-PR hole where a PR was CLOSED during bridge startup, so its old
    `!testme` comments were never marked seen by the first poll pass; when that PR is later
    reopened, the poller must not replay those historical comments as fresh triggers.

    Reads the comment's ``created_at`` field as an ISO-8601 string (a trailing ``Z`` is
    accepted). A missing, non-string or unparseable timestamp yields False. A timestamp
    without a UTC offset is interpreted as UTC so it can be compared with the timezone-aware
    start time instead of raising TypeError.
    """
    created = (comment or {}).get("created_at")
    if not created or not isinstance(created, str):
        return False
    try:
        created_at = datetime.fromisoformat(created.replace("Z", "+00:00"))
    except ValueError:
        return False
    if created_at.tzinfo is None:
        created_at = created_at.replace(tzinfo=timezone.utc)
    return created_at <= _PROCESS_STARTED_AT
def process_testme(full_name, owner, name, number, user, comment_id, source, quick=False):
    """Handle one trigger comment; shared by the poll and webhook paths.

    Steps, in order: claim the comment id (dedup), check the commenter's authorization, resolve
    the PR head, trigger the Drone build, post a pending commit status and a placeholder
    comment, then start a daemon thread that reflects the final outcome.

    Returns ``(run_url, "ok")`` on success, otherwise ``(None, reason)`` with reason one of
    "duplicate", "not authorized", "cannot resolve PR head", "trigger failed".
    """
    if not _claim(comment_id):
        return None, "duplicate"
    if not is_authorized(full_name, user):
        log(f"rejected: {user} is not an authorized org member on {full_name}")
        return None, "not authorized"

    head = pr_head(owner, name, number)
    sha = head["sha"] if head else None
    if not sha:
        return None, "cannot resolve PR head"

    # Build from the head repo when the API reports one, else from the base repo itself.
    src_repo = head["repo"] or full_name
    build_no = trigger_build(name, sha, number, src_repo, quick=quick)
    if not build_no:
        post_comment(owner, name, number, "cc-ci: failed to start a CI run (see bridge logs).")
        return None, "trigger failed"

    run_url = f"{DRONE_URL}/{CI_REPO}/{build_no}"
    post_commit_status(owner, name, sha, "pending", run_url, "cc-ci run in progress")
    mode = " **(--quick: lower-confidence fast lane; does not gate merge)**" if quick else ""
    # One NEW comment PER `!testme` (operator preference 2026-06-02): every re-`!testme` gets its
    # own ⏳ placeholder in the PR timeline, which watch_and_reflect later edits to the result.
    placeholder_id = post_comment(
        owner, name, number, start_comment_body(name, sha, run_url, mode)
    )
    log(
        f"[{source}] triggered build {build_no} for {name}@{sha[:8]} "
        f"(PR #{number}, comment {comment_id}) by {user}"
    )
    # Reflect the final pass/fail back onto that comment when the build finishes (D7).
    watcher = threading.Thread(
        target=watch_and_reflect,
        args=(owner, name, number, build_no, name, sha, placeholder_id, run_url),
        daemon=True,
    )
    watcher.start()
    return run_url, "ok"
class Handler(BaseHTTPRequestHandler):
    """HTTP endpoint of the bridge: a health check (GET) and the optional Gitea webhook (POST)."""

    # Upper bound on accepted webhook bodies. The body has to be read before its signature can
    # be checked, so an unbounded Content-Length would let any client make us buffer arbitrary
    # amounts of data. Anything rejected here is still picked up by the poller.
    MAX_BODY_BYTES = 1024 * 1024

    def _send(self, code, msg=""):
        """Send a plain-text response with status *code* and body *msg*.

        A 204 (No Content) response is sent without body or entity headers.
        """
        payload = b"" if code == 204 else msg.encode()
        self.send_response(code)
        if code != 204:
            self.send_header("Content-Type", "text/plain")
            self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        if payload:
            self.wfile.write(payload)

    def do_GET(self):
        """Answer 200 "ok" on /healthz and /hook/healthz, 404 otherwise."""
        if self.path.rstrip("/") in ("/hook/healthz", "/healthz"):
            return self._send(200, "ok")
        return self._send(404, "not found")

    def do_POST(self):
        """Handle a Gitea `issue_comment` webhook delivery.

        Optional push optimization; polling is primary. Deduped against the poller by comment
        id. Responses: 400 malformed request, 413 oversized body, 401 bad signature, 204 ignored
        event/comment, 200 duplicate, 403 unauthorized commenter, 502 other failure, 201 with
        the run URL when a build was triggered.
        """
        try:
            length = int(self.headers.get("Content-Length", 0))
        except (TypeError, ValueError):
            return self._send(400, "bad content-length")
        if length < 0:
            # A negative length would make rfile.read() block until the peer closes.
            return self._send(400, "bad content-length")
        if length > self.MAX_BODY_BYTES:
            return self._send(413, "payload too large")
        body = self.rfile.read(length)
        sig = self.headers.get("X-Gitea-Signature", "")
        expected = hmac.new(HMAC_SECRET, body, hashlib.sha256).hexdigest()
        # Compare as bytes: compare_digest() raises TypeError for non-ASCII str input, which an
        # attacker-chosen header value could otherwise trigger.
        if not hmac.compare_digest(sig.encode(), expected.encode()):
            log(f"rejected: bad signature event={self.headers.get('X-Gitea-Event')}")
            return self._send(401, "bad signature")
        if self.headers.get("X-Gitea-Event") != "issue_comment":
            return self._send(204, "ignored")
        try:
            payload = json.loads(body)
        except ValueError:
            return self._send(400, "bad json")
        if not isinstance(payload, dict):
            return self._send(400, "bad json")
        action = payload.get("action")
        c = payload.get("comment") or {}
        issue = payload.get("issue") or {}
        repo = payload.get("repository") or {}
        is_trigger, quick = parse_trigger(c.get("body"))
        if action != "created" or not is_trigger:
            return self._send(204, "ignored")
        if not issue.get("pull_request"):
            return self._send(204, "not a PR")
        run_url, reason = process_testme(
            repo.get("full_name", ""),
            (repo.get("owner") or {}).get("login", ""),
            repo.get("name", ""),
            issue.get("number"),
            # `user` may be null; tolerate that the same way the poll path does.
            (c.get("user") or {}).get("login", ""),
            c.get("id"),
            "webhook",
            quick=quick,
        )
        if not run_url:
            if reason == "duplicate":
                return self._send(200, "already handled")
            return self._send(403 if reason == "not authorized" else 502, reason)
        return self._send(201, run_url)

    def log_message(self, *a):
        """Suppress the base class's per-request logging."""
        pass
def _poll_repo(full_name, first):
    """Scan the open PRs of one repo once and act on its trigger comments.

    On the first pass (*first* true), and for any comment older than this process, the comment
    is only marked seen; otherwise it is handed to process_testme().
    """
    owner, _, name = full_name.partition("/")
    for pr in list_open_prs(full_name):
        number = pr.get("number")
        for c in list_comments(full_name, number):
            is_trigger, quick = parse_trigger(c.get("body"))
            if not is_trigger:
                continue
            cid = c.get("id")
            if first or _is_preexisting_comment(c):
                _claim(cid)  # mark pre-existing comments seen; don't fire on startup
                continue
            user = (c.get("user") or {}).get("login", "")
            process_testme(full_name, owner, name, number, user, cid, "poll", quick=quick)


def poll_loop():
    """Primary trigger path. Outbound, read-only. Fires on NEW trigger comments only.

    Repos come from POLL_REPOS (csv, default CI_REPO) and the pause between passes from
    POLL_INTERVAL seconds (default 30). Runs forever.

    Each repo scan is isolated: an unexpected exception is logged and the loop carries on, so
    one malformed response cannot silently stop all polling while the HTTP server stays up.
    If a repo's first scan fails, its old comments are still skipped later through the
    creation-time check in _is_preexisting_comment().
    """
    repos = [r.strip() for r in os.environ.get("POLL_REPOS", CI_REPO).split(",") if r.strip()]
    interval = int(os.environ.get("POLL_INTERVAL", "30"))
    first = True
    log(f"poller (primary) watching {repos} every {interval}s")
    while True:
        for full_name in repos:
            try:
                _poll_repo(full_name, first)
            except Exception as e:  # noqa: BLE001 — thread boundary: log and keep polling
                log(f"poll error on {full_name}: {e!r}")
        first = False
        time.sleep(interval)
def main():
    """Start the poller thread, then serve the HTTP endpoint on BRIDGE_LISTEN forever.

    BRIDGE_LISTEN is ``host:port`` (default ``0.0.0.0:8080``); an empty host binds 0.0.0.0.
    """
    # Polling is the primary trigger; start it unconditionally.
    poller = threading.Thread(target=poll_loop, daemon=True)
    poller.start()
    listen = os.environ.get("BRIDGE_LISTEN", "0.0.0.0:8080")
    host, _, port = listen.rpartition(":")
    bind_host = host or "0.0.0.0"
    srv = ThreadingHTTPServer((bind_host, int(port)), Handler)
    log(f"comment-bridge listening on {bind_host}:{port} (poll primary + optional webhook)")
    srv.serve_forever()


if __name__ == "__main__":
    main()