bridge/bridge.py: parse_trigger(body) → (is_trigger, quick); accepts exactly '!testme' (cold, default) and '!testme --quick' (opt-in fast lane), rejects '!testmexyz'/'!testme foo'/etc. Threaded through both poll + webhook paths and process_testme → trigger_build adds the CCCI_QUICK=1 Drone param (auto-exposed to run_recipe_ci). PR comment labels a quick run lower-confidence. .drone.yml echoes quick=. +3 unit tests (incl. the !testmexyz negative). 64 unit pass. WC7: default !testme stays full cold; --quick opt-in, never gates merge. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
338 lines
13 KiB
Python
338 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
"""cc-ci comment-bridge (§4.1).
|
|
|
|
When an *authorized* user comments exactly `!testme` (or `!testme --quick`, the opt-in fast lane) on an open PR in an enrolled recipe repo,
|
|
trigger a parameterized Drone build of the cc-ci pipeline for that PR's head commit and post a PR
|
|
comment linking the run. Everything else is ignored.
|
|
|
|
Trigger paths (§4.1, SETTLED):
|
|
* POLLING is PRIMARY (always on): the bridge polls each enrolled repo's open PRs for new
|
|
`!testme` comments every POLL_INTERVAL seconds. This is outbound (cc-ci -> git.autonomic.zone)
|
|
and needs only READ + comment access — never repo-admin. It is the source of truth for D1.
|
|
* WEBHOOK is an OPTIONAL push optimization: the `/hook` endpoint stays live so a Gitea
|
|
`issue_comment` webhook, *if an admin registered one*, lowers latency. The bridge NEVER
|
|
self-registers a webhook (that needs repo-admin, which we refuse). Manual registration is
|
|
documented in docs/enroll-recipe.md.
|
|
|
|
Both paths share an in-memory seen-set keyed by comment id, so a comment seen by both fires at most
|
|
once (no double-trigger). On startup the first poll marks pre-existing comments seen so old comments
|
|
don't re-fire. Python stdlib only.
|
|
|
|
Authorization: a commenter is allowed iff they are a member of the repo's owning org
|
|
(`GET /orgs/{owner}/members/{user}` -> 204), which is readable by any org member (read-level, no
|
|
admin). An optional AUTH_ALLOWLIST (csv of usernames) is also honored. Fail-closed on any error.
|
|
|
|
Config (env): BRIDGE_LISTEN, GITEA_API, DRONE_URL, CI_REPO, HMAC_FILE, DRONE_TOKEN_FILE,
|
|
GITEA_TOKEN_FILE, POLL_INTERVAL (default 30), POLL_REPOS (csv of enrolled repos), AUTH_ALLOWLIST
|
|
(csv, optional).
|
|
"""
|
|
|
|
import hashlib
|
|
import hmac
|
|
import json
|
|
import os
|
|
import sys
|
|
import threading
|
|
import time
|
|
import urllib.error
|
|
import urllib.parse
|
|
import urllib.request
|
|
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
|
|
|
# Service endpoints and the CI pipeline repo. Each value can be overridden through the
# environment variable of the same name; the literals below are the fallbacks.
GITEA_API = os.getenv("GITEA_API", "https://git.autonomic.zone/api/v1")
DRONE_URL = os.getenv("DRONE_URL", "https://drone.ci.commoninternet.net")
CI_REPO = os.getenv("CI_REPO", "recipe-maintainers/cc-ci")
|
|
TRIGGER = "!testme"


def parse_trigger(body):
    """Classify a PR comment body as a CI trigger.

    Only two forms are accepted, compared after trimming surrounding whitespace:
      `!testme`          -> (True, False): full COLD run (default, authoritative);
      `!testme --quick`  -> (True, True):  opt-in LOWER-CONFIDENCE fast lane (WC4/WC7).
    Anything else (`!testmexyz`, `!testme foo`, prose, an empty or missing body) yields
    (False, False) and must NOT trigger.

    Args:
        body: The raw comment body. Values that are not `str` (None, or a non-string value
            from a decoded JSON payload) are treated as "not a trigger" instead of raising.

    Returns:
        A `(is_trigger, quick)` tuple of booleans.
    """
    if not isinstance(body, str):
        return False, False
    text = body.strip()
    if text == TRIGGER:
        return True, False
    if text == f"{TRIGGER} --quick":
        return True, True
    return False, False
|
|
# Optional static allowlist: comma-separated usernames from AUTH_ALLOWLIST, each trimmed,
# with blank entries dropped. Empty when the variable is unset.
ALLOWLIST = set(
    filter(None, map(str.strip, os.environ.get("AUTH_ALLOWLIST", "").split(",")))
)
|
|
|
|
|
|
def _read(path):
    """Return the text content of the file at `path`, stripped of surrounding whitespace.

    The file is decoded explicitly as UTF-8 rather than with the locale's default encoding,
    so that calling `.encode()` on the result (which uses UTF-8) reproduces the file's bytes
    regardless of the environment the process runs in.

    Raises:
        OSError: if the file cannot be opened or read.
        UnicodeDecodeError: if the file is not valid UTF-8.
    """
    with open(path, encoding="utf-8") as fh:
        return fh.read().strip()
|
|
|
|
|
|
# Secrets are loaded once, at import time, from the files named by the environment. A missing
# variable (KeyError) or an unreadable file aborts startup.
HMAC_SECRET = _read(os.environ["HMAC_FILE"]).encode()  # bytes: used as the HMAC key
DRONE_TOKEN = _read(os.environ["DRONE_TOKEN_FILE"])
GITEA_TOKEN = _read(os.environ["GITEA_TOKEN_FILE"])

# Dedup state shared by the poll and webhook paths: a comment id triggers at most one run.
# Access to the set is guarded by the lock.
_PROCESSED: set = set()
_PROCESSED_LOCK = threading.Lock()
|
|
|
|
|
|
def log(*parts):
    """Print the given values, space-separated, as one line on stderr and flush immediately."""
    print(*parts, flush=True, file=sys.stderr)
|
|
|
|
|
|
def _api(url, token, method="GET", data=None, scheme="token"):
    """Perform one JSON-over-HTTP request and return `(status, parsed_body)`.

    Args:
        url: Absolute URL to request.
        token: Credential placed in the Authorization header; a falsy value sends no header.
        method: HTTP verb.
        data: Optional JSON-serialisable payload, sent as the request body.
        scheme: Authorization scheme word placed before the token.

    Returns:
        `(status, body)` for a completed response, where `body` is the decoded JSON or None
        when the response is empty or is not valid JSON; `(code, None)` when the server
        answers with an HTTP error status; `(None, None)` on a transport-level failure.
        The function never raises for a bad response body, so long-running callers (the
        poller thread in particular) are not killed by a stray HTML error page or a
        truncated read.
    """
    # Function-scope import: http.client is not part of this module's top-level imports.
    import http.client

    # Gitea wants "Authorization: token <t>"; Drone wants "Authorization: Bearer <t>".
    headers = {"Authorization": f"{scheme} {token}"} if token else {}
    body = None
    if data is not None:
        body = json.dumps(data).encode()
        headers["Content-Type"] = "application/json"
    req = urllib.request.Request(url, data=body, headers=headers, method=method)
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            status = resp.status
            raw = resp.read()
    except urllib.error.HTTPError as e:
        return e.code, None
    except (urllib.error.URLError, http.client.HTTPException, OSError) as e:
        # HTTPException covers protocol-level failures (e.g. IncompleteRead) that are not
        # OSError subclasses and would otherwise escape.
        log("api error", url, e)
        return None, None
    if not raw:
        return status, None
    try:
        return status, json.loads(raw)
    except ValueError as e:
        # JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        log("api error: non-JSON response", url, status, e)
        return status, None
|
|
|
|
|
|
def is_authorized(full_name, user):
    """Decide whether `user` may trigger CI for the repo `full_name` ("owner/name").

    A user is accepted when listed in the static AUTH_ALLOWLIST, or when
    GET /orgs/{owner}/members/{user} answers 204 (org membership; a read-level check that
    needs no repo-admin rights). Fail-closed: an empty user, any other status, or a failed
    request all yield False.
    """
    if not user:
        return False
    if user in ALLOWLIST:
        return True
    org = full_name.split("/", 1)[0]
    code, _body = _api(f"{GITEA_API}/orgs/{org}/members/{user}", GITEA_TOKEN)
    return code == 204
|
|
|
|
|
|
def pr_head(owner, repo, number):
    """Look up pull request `number` and describe its head.

    Returns a dict `{"sha": <head commit sha>, "repo": <head repo full_name>}` (either value
    may be None when absent from the API response), or None when the PR cannot be fetched.
    """
    code, pull = _api(f"{GITEA_API}/repos/{owner}/{repo}/pulls/{number}", GITEA_TOKEN)
    if code == 200 and pull:
        head = pull.get("head", {})
        # The head repo may be null in the response; fall back to an empty mapping.
        source_repo = head.get("repo") or {}
        return {"sha": head.get("sha"), "repo": source_repo.get("full_name")}
    return None
|
|
|
|
|
|
def trigger_build(recipe, ref, pr, src, quick=False):
    """Start a parameterized Drone build of the CI repo and return its build number.

    The custom parameters are sent in the query string of Drone's "create build" call and
    are exposed to the pipeline as env vars. `quick` (WC7) adds CCCI_QUICK=1 so
    run_recipe_ci takes the opt-in fast lane; when absent the run is full cold.

    Returns the build number, or None (after logging) if Drone did not accept the request.
    """
    query = {"branch": "main", "RECIPE": recipe, "REF": ref, "PR": str(pr), "SRC": src}
    if quick:
        query.update(CCCI_QUICK="1")
    endpoint = f"{DRONE_URL}/api/repos/{CI_REPO}/builds?{urllib.parse.urlencode(query)}"
    code, build = _api(endpoint, DRONE_TOKEN, method="POST", scheme="Bearer")
    if code not in (200, 201) or not build:
        log("drone trigger failed", code)
        return None
    return build.get("number")
|
|
|
|
|
|
def post_comment(owner, repo, number, body):
    """Post `body` as a new comment on issue/PR `number`; return the comment id or None."""
    endpoint = f"{GITEA_API}/repos/{owner}/{repo}/issues/{number}/comments"
    code, created = _api(endpoint, GITEA_TOKEN, method="POST", data={"body": body})
    if code in (200, 201) and created:
        return created.get("id")
    return None
|
|
|
|
|
|
def edit_comment(owner, repo, comment_id, body):
    """Replace the text of an existing comment with `body`. The API result is ignored."""
    endpoint = f"{GITEA_API}/repos/{owner}/{repo}/issues/comments/{comment_id}"
    _api(endpoint, GITEA_TOKEN, method="PATCH", data={"body": body})
|
|
|
|
|
|
def build_status(num):
    """Return the Drone status string of build `num`, or None if it cannot be fetched."""
    code, build = _api(f"{DRONE_URL}/api/repos/{CI_REPO}/builds/{num}", DRONE_TOKEN, scheme="Bearer")
    if code != 200 or not build:
        return None
    return build.get("status")
|
|
|
|
|
|
# Build states after which the watcher stops polling.
_TERMINAL = {"success", "failure", "error", "killed"}


def watch_and_reflect(owner, name, number, num, recipe, sha, comment_id, run_url):
    """Wait for Drone build `num` to finish, then rewrite the PR comment with the outcome (D7).

    The build is polled every 15 seconds until it reaches a terminal state or 75 minutes
    have elapsed (the build timeout of 60m plus margin). The comment is edited only when
    `comment_id` is truthy; the outcome is logged either way.
    """
    give_up_at = time.time() + 75 * 60
    state = None
    while time.time() < give_up_at:
        state = build_status(num)
        if state in _TERMINAL:
            break
        time.sleep(15)

    if state == "success":
        icon, verdict = "✅", "passed"
    else:
        # Any other terminal state, or no terminal state before the deadline.
        icon, verdict = "❌", state or "did not complete"

    if comment_id:
        summary = f"cc-ci: run for `{recipe}` @ `{sha[:8]}` {icon} **{verdict}** → {run_url}"
        edit_comment(owner, name, comment_id, summary)
    log(f"reflected outcome build {num} ({recipe} PR #{number}): {state}")
|
|
|
|
|
|
def list_open_prs(full_name):
    """Return up to 50 open pull requests of `full_name`; an empty list on any failure."""
    code, pulls = _api(f"{GITEA_API}/repos/{full_name}/pulls?state=open&limit=50", GITEA_TOKEN)
    if code != 200 or not pulls:
        return []
    return pulls
|
|
|
|
|
|
def list_comments(full_name, number):
    """Return the comments on issue/PR `number` of `full_name`; an empty list on any failure."""
    code, comments = _api(f"{GITEA_API}/repos/{full_name}/issues/{number}/comments", GITEA_TOKEN)
    if code != 200 or not comments:
        return []
    return comments
|
|
|
|
|
|
def _claim(comment_id) -> bool:
    """Atomically mark `comment_id` as taken for processing.

    Returns True when the caller now owns the id, False when it was already claimed
    (dedup). A `None` id cannot be tracked, so it is always reported as claimable.
    """
    if comment_id is None:
        return True
    with _PROCESSED_LOCK:
        fresh = comment_id not in _PROCESSED
        if fresh:
            _PROCESSED.add(comment_id)
    return fresh
|
|
|
|
|
|
def process_testme(full_name, owner, name, number, user, comment_id, source, quick=False):
    """Handle one trigger comment; shared by the poll and webhook paths.

    Steps, in order: dedupe by comment id, check the commenter's authorization, resolve the
    PR head, start the Drone build, post a PR comment linking the run, and spawn a daemon
    thread that later edits that comment with the final outcome (D7).

    Returns:
        `(run_url, "ok")` on success, otherwise `(None, reason)` where reason is one of
        "duplicate", "not authorized", "cannot resolve PR head" or "trigger failed".
    """
    if not _claim(comment_id):
        return None, "duplicate"

    if not is_authorized(full_name, user):
        log(f"rejected: {user} is not an authorized org member on {full_name}")
        return None, "not authorized"

    head = pr_head(owner, name, number)
    if not head or not head["sha"]:
        return None, "cannot resolve PR head"
    sha = head["sha"]
    short_sha = sha[:8]

    # The head repo is used as the build source; fall back to this repo if it is unknown.
    num = trigger_build(name, sha, number, head["repo"] or full_name, quick=quick)
    if not num:
        post_comment(owner, name, number, "cc-ci: failed to start a CI run (see bridge logs).")
        return None, "trigger failed"

    run_url = f"{DRONE_URL}/{CI_REPO}/{num}"
    if quick:
        mode = " **(--quick: lower-confidence fast lane; does not gate merge)**"
    else:
        mode = ""
    started = f"cc-ci: started CI run for `{name}` @ `{short_sha}`{mode} → {run_url}"
    cid = post_comment(owner, name, number, started)
    log(
        f"[{source}] triggered build {num} for {name}@{short_sha} "
        f"(PR #{number}, comment {comment_id}) by {user}"
    )

    # Reflect the final pass/fail back onto that comment when the build finishes (D7).
    watcher = threading.Thread(
        target=watch_and_reflect,
        args=(owner, name, number, num, name, sha, cid, run_url),
        daemon=True,
    )
    watcher.start()
    return run_url, "ok"
|
|
|
|
|
|
class Handler(BaseHTTPRequestHandler):
    """HTTP endpoint of the bridge: health checks on GET, optional Gitea webhook on POST."""

    def _send(self, code, msg=""):
        """Send a plain-text response with status `code` and body `msg`.

        For 204 (No Content) only the headers are sent: that status must not carry a body,
        so `msg` is dropped there.
        """
        self.send_response(code)
        self.send_header("Content-Type", "text/plain")
        self.end_headers()
        if code != 204:
            self.wfile.write(msg.encode())

    def do_GET(self):
        """Answer the health-check paths with 200 "ok"; everything else is 404."""
        if self.path.rstrip("/") in ("/hook/healthz", "/healthz"):
            return self._send(200, "ok")
        return self._send(404, "not found")

    def do_POST(self):
        """Handle a Gitea `issue_comment` webhook delivery.

        Optional push optimization; polling is primary. Deduped against the poller by
        comment id. The request is authenticated by its HMAC-SHA256 signature before the
        payload is parsed. Responses: 400 malformed request, 401 bad signature, 204 ignored
        event/comment, 200 already handled, 403 commenter not authorized, 502 other failure,
        201 (with the run URL) when a build was started.
        """
        try:
            length = int(self.headers.get("Content-Length", 0))
        except ValueError:
            return self._send(400, "bad content-length")
        # A negative length would turn read() into "read until EOF" and block the worker.
        body = self.rfile.read(length) if length > 0 else b""

        sig = self.headers.get("X-Gitea-Signature", "")
        expected = hmac.new(HMAC_SECRET, body, hashlib.sha256).hexdigest()
        # Compare as bytes: compare_digest raises TypeError for a str with non-ASCII
        # characters, which an attacker-controlled header can contain.
        if not hmac.compare_digest(sig.encode("utf-8", "replace"), expected.encode("ascii")):
            log(f"rejected: bad signature event={self.headers.get('X-Gitea-Event')}")
            return self._send(401, "bad signature")
        if self.headers.get("X-Gitea-Event") != "issue_comment":
            return self._send(204, "ignored")
        try:
            payload = json.loads(body)
        except ValueError:
            return self._send(400, "bad json")
        if not isinstance(payload, dict):
            # Valid JSON, but not the object shape a webhook delivery has.
            return self._send(400, "bad json")

        action = payload.get("action")
        c = payload.get("comment") or {}
        issue = payload.get("issue") or {}
        repo = payload.get("repository") or {}
        is_trigger, quick = parse_trigger(c.get("body"))
        if action != "created" or not is_trigger:
            return self._send(204, "ignored")
        if not issue.get("pull_request"):
            return self._send(204, "not a PR")

        run_url, reason = process_testme(
            repo.get("full_name", ""),
            (repo.get("owner") or {}).get("login", ""),
            repo.get("name", ""),
            issue.get("number"),
            # `or {}` tolerates an explicit null user, matching the poll path.
            (c.get("user") or {}).get("login", ""),
            c.get("id"),
            "webhook",
            quick=quick,
        )
        if not run_url:
            if reason == "duplicate":
                return self._send(200, "already handled")
            return self._send(403 if reason == "not authorized" else 502, reason)
        return self._send(201, run_url)

    def log_message(self, *a):
        """Silence the base class's per-request access log."""
        pass
|
|
|
|
|
|
def _poll_repo(full_name, priming):
    """Scan one repo's open PRs for trigger comments.

    When `priming` is true the matching comments are only claimed (marked seen) so that
    pre-existing comments never fire; otherwise each one is handed to `process_testme`.
    """
    owner, _, name = full_name.partition("/")
    for pr in list_open_prs(full_name):
        number = pr.get("number")
        for c in list_comments(full_name, number):
            is_trigger, quick = parse_trigger(c.get("body"))
            if not is_trigger:
                continue
            cid = c.get("id")
            if priming:
                _claim(cid)  # mark pre-existing comments seen; don't fire on startup
                continue
            user = (c.get("user") or {}).get("login", "")
            process_testme(full_name, owner, name, number, user, cid, "poll", quick=quick)


def poll_loop():
    """Primary trigger path. Outbound, read-only. Fires on NEW `!testme` comments only.

    Repos come from POLL_REPOS (csv; defaults to CI_REPO) and are scanned every
    POLL_INTERVAL seconds (default 30). The first scan of each repo that completes without
    raising only marks its existing trigger comments as seen; later scans process new ones.
    Priming is tracked per repo, so a scan that raises leaves that repo unprimed and the
    next pass primes it again rather than firing on old comments.

    Never returns. An exception raised while scanning a repo is logged and the loop carries
    on: this runs in a daemon thread, where an uncaught exception would silently stop all
    polling.
    """
    repos = [r.strip() for r in os.environ.get("POLL_REPOS", CI_REPO).split(",") if r.strip()]
    interval = int(os.environ.get("POLL_INTERVAL", "30"))
    primed = set()
    log(f"poller (primary) watching {repos} every {interval}s")
    while True:
        for full_name in repos:
            try:
                _poll_repo(full_name, priming=full_name not in primed)
            except Exception as exc:  # thread boundary: log and keep polling
                log(f"poll error on {full_name}: {exc!r}")
            else:
                # NOTE(review): the list helpers return [] on API failure without raising,
                # so a repo whose first scan failed that way is still treated as primed.
                primed.add(full_name)
        time.sleep(interval)
|
|
|
|
|
|
def main():
    """Start the poller thread, then serve the HTTP endpoint forever.

    The listen address comes from BRIDGE_LISTEN ("host:port", default "0.0.0.0:8080"); an
    empty host part binds to 0.0.0.0.
    """
    # Polling is the primary trigger; start it unconditionally.
    poller = threading.Thread(target=poll_loop, daemon=True)
    poller.start()

    listen = os.environ.get("BRIDGE_LISTEN", "0.0.0.0:8080")
    host, _, port = listen.rpartition(":")
    bind_host = host or "0.0.0.0"
    srv = ThreadingHTTPServer((bind_host, int(port)), Handler)
    log(f"comment-bridge listening on {bind_host}:{port} (poll primary + optional webhook)")
    srv.serve_forever()


if __name__ == "__main__":
    main()
|