Files
cc-ci/bridge/bridge.py
autonomic-bot 7addb9686c
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone Build is passing
bridge: polling primary + org-membership auth (orchestrator design change)
Polling is now the primary, read-only trigger (always-on thread); the /hook
webhook is an optional admin-registered push optimization deduped by comment id.
Authorize commenters via GET /orgs/{owner}/members/{user} (204, read-level) +
optional allowlist, replacing the admin-requiring /collaborators permission
endpoint. Bot never self-registers webhooks. Enroll = POLL_REPOS + tests/<recipe>/.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-27 02:41:25 +01:00

257 lines
10 KiB
Python

#!/usr/bin/env python3
"""cc-ci comment-bridge (§4.1).
When an *authorized* user comments exactly `!testme` on an open PR in an enrolled recipe repo,
trigger a parameterized Drone build of the cc-ci pipeline for that PR's head commit and post a PR
comment linking the run. Everything else is ignored.
Trigger paths (§4.1, SETTLED):
* POLLING is PRIMARY (always on): the bridge polls each enrolled repo's open PRs for new
`!testme` comments every POLL_INTERVAL seconds. This is outbound (cc-ci -> git.autonomic.zone)
and needs only READ + comment access — never repo-admin. It is the source of truth for D1.
* WEBHOOK is an OPTIONAL push optimization: the `/hook` endpoint stays live so a Gitea
`issue_comment` webhook, *if an admin registered one*, lowers latency. The bridge NEVER
self-registers a webhook (that needs repo-admin, which we refuse). Manual registration is
documented in docs/enroll-recipe.md.
Both paths share an in-memory seen-set keyed by comment id, so a comment seen by both fires at most
once (no double-trigger). On startup the first poll marks pre-existing comments seen so old comments
don't re-fire. Python stdlib only.
Authorization: a commenter is allowed iff they are a member of the repo's owning org
(`GET /orgs/{owner}/members/{user}` -> 204), which is readable by any org member (read-level, no
admin). An optional AUTH_ALLOWLIST (csv of usernames) is also honored. Fail-closed on any error.
Config (env): BRIDGE_LISTEN, GITEA_API, DRONE_URL, CI_REPO, HMAC_FILE, DRONE_TOKEN_FILE,
GITEA_TOKEN_FILE, POLL_INTERVAL (default 30), POLL_REPOS (csv of enrolled repos), AUTH_ALLOWLIST
(csv, optional).
"""
import hashlib
import hmac
import json
import os
import sys
import threading
import time
import urllib.error
import urllib.parse
import urllib.request
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
# Exact (whitespace-stripped) comment body that requests a CI run.
TRIGGER = "!testme"
# Service endpoints and the CI repo slug; each may be overridden through the environment.
GITEA_API = os.environ.get("GITEA_API", "https://git.autonomic.zone/api/v1")
DRONE_URL = os.environ.get("DRONE_URL", "https://drone.ci.commoninternet.net")
CI_REPO = os.environ.get("CI_REPO", "recipe-maintainers/cc-ci")
# Optional static allowlist: comma-separated usernames, surrounding whitespace trimmed and
# empty entries dropped. Empty set when AUTH_ALLOWLIST is unset.
ALLOWLIST = set(
    filter(None, map(str.strip, os.environ.get("AUTH_ALLOWLIST", "").split(",")))
)
def _read(path):
    """Return the text content of the file at *path*, stripped of surrounding whitespace."""
    with open(path) as handle:
        contents = handle.read()
    return contents.strip()
# Secrets are read once, at import time, from files whose paths come from the environment.
# A missing variable raises KeyError and an unreadable file raises OSError, so a misconfigured
# bridge fails at startup rather than running without credentials.
HMAC_SECRET = _read(os.environ["HMAC_FILE"]).encode()  # bytes: key for webhook signature checks
DRONE_TOKEN = _read(os.environ["DRONE_TOKEN_FILE"])  # sent to Drone as a Bearer token
GITEA_TOKEN = _read(os.environ["GITEA_TOKEN_FILE"])  # sent to Gitea as "token <t>"
# Shared dedup across the poll + webhook paths: a comment id triggers at most one run.
# The set lives in memory only, so it is empty again after a restart; it is never pruned.
_PROCESSED: set = set()
# Guards every read-modify-write of _PROCESSED (the poller and HTTP handlers run in threads).
_PROCESSED_LOCK = threading.Lock()
def log(*parts):
    """Print *parts* space-separated on one line to stderr and flush immediately."""
    print(*parts, flush=True, file=sys.stderr)
def _api(url, token, method="GET", data=None, scheme="token"):
    """Perform one HTTP request and return ``(status, decoded_json_body)``.

    Args:
        url: Absolute URL to call.
        token: Credential placed in the Authorization header; a falsy value sends no header.
        method: HTTP method.
        data: Optional JSON-serialisable payload, sent as an ``application/json`` body.
        scheme: Authorization scheme prefix ("token" for Gitea, "Bearer" for Drone).

    Returns:
        ``(status, body)`` where *body* is the decoded JSON, or None when the response is
        empty or is not valid JSON. An HTTP error status yields ``(code, None)``. A transport
        failure yields ``(None, None)``. This function never raises for network or decode
        problems, so the long-running poller thread cannot be killed by a bad response.
    """
    # Local import: only the exception base class is needed, and only here.
    import http.client

    # Gitea wants "Authorization: token <t>"; Drone wants "Authorization: Bearer <t>".
    headers = {"Authorization": f"{scheme} {token}"} if token else {}
    body = None
    if data is not None:
        body = json.dumps(data).encode()
        headers["Content-Type"] = "application/json"
    req = urllib.request.Request(url, data=body, headers=headers, method=method)
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            status = resp.status
            raw = resp.read()
    except urllib.error.HTTPError as e:
        return e.code, None
    except (urllib.error.URLError, OSError, http.client.HTTPException) as e:
        # HTTPException covers truncated/garbled responses (e.g. IncompleteRead), which are
        # not OSError subclasses.
        log("api error", url, e)
        return None, None
    if not raw:
        return status, None
    try:
        return status, json.loads(raw)
    except ValueError as e:
        # A proxy or error page can answer 2xx with non-JSON content; report the status but
        # no body so callers treat it as "nothing usable".
        log("api error: response is not valid JSON", url, status, e)
        return status, None
def is_authorized(full_name, user):
    """Decide whether *user* may trigger CI for the repo *full_name* ("owner/name").

    A user passes when listed in the static AUTH_ALLOWLIST, or when
    GET /orgs/{owner}/members/{user} answers 204 for the repo's owner. Fail-closed: an empty
    username, any other status, or a transport error all yield False.
    """
    if not user:
        return False
    if user not in ALLOWLIST:
        org = full_name.split("/", 1)[0]
        code, _body = _api(f"{GITEA_API}/orgs/{org}/members/{user}", GITEA_TOKEN)
        return code == 204
    return True
def pr_head(owner, repo, number):
    """Look up pull request *number* and describe its head.

    Returns ``{"sha": ..., "repo": ...}`` (head commit sha and the head repo's full name,
    either of which may be None if absent from the response), or None when the lookup does
    not return 200 with a body.
    """
    code, pull = _api(f"{GITEA_API}/repos/{owner}/{repo}/pulls/{number}", GITEA_TOKEN)
    if code == 200 and pull:
        head = pull.get("head", {})
        source_repo = head.get("repo") or {}
        return {"sha": head.get("sha"), "repo": source_repo.get("full_name")}
    return None
def trigger_build(recipe, ref, pr, src):
    """Ask Drone to create a build of CI_REPO on branch "main" with custom parameters.

    RECIPE, REF, PR and SRC are passed as query parameters, which Drone exposes to the
    pipeline as env vars. Returns the new build's number, or None (after logging) when Drone
    does not answer 200/201 with a body.
    """
    params = {"branch": "main", "RECIPE": recipe, "REF": ref, "PR": str(pr), "SRC": src}
    endpoint = f"{DRONE_URL}/api/repos/{CI_REPO}/builds?" + urllib.parse.urlencode(params)
    code, build = _api(endpoint, DRONE_TOKEN, method="POST", scheme="Bearer")
    if code not in (200, 201) or not build:
        log("drone trigger failed", code)
        return None
    return build.get("number")
def post_comment(owner, repo, number, body):
    """Post *body* as a comment on issue/PR *number*; the API result is ignored (best effort)."""
    endpoint = f"{GITEA_API}/repos/{owner}/{repo}/issues/{number}/comments"
    payload = {"body": body}
    _api(endpoint, GITEA_TOKEN, method="POST", data=payload)
def list_open_prs(full_name):
    """Return the open pull requests of repo *full_name* as a list of dicts.

    Pages through the listing 50 at a time so PRs beyond the first page are not silently
    skipped. Paging stops at the first short or empty page, on any non-200 answer, or after
    a fixed page cap (a guard against a server that ignores ``page``). A failure on the first
    page yields ``[]``; a failure on a later page yields the PRs collected so far.
    """
    page_size = 50
    max_pages = 20
    prs = []
    for page in range(1, max_pages + 1):
        status, batch = _api(
            f"{GITEA_API}/repos/{full_name}/pulls?state=open&limit={page_size}&page={page}",
            GITEA_TOKEN,
        )
        # Anything other than a non-empty JSON list means there is nothing more to read.
        if status != 200 or not batch or not isinstance(batch, list):
            break
        prs.extend(batch)
        if len(batch) < page_size:
            break
    return prs
def list_comments(full_name, number):
    """Return the comments on issue/PR *number* of *full_name*, or ``[]`` on any failure."""
    code, comments = _api(
        f"{GITEA_API}/repos/{full_name}/issues/{number}/comments", GITEA_TOKEN
    )
    if code != 200 or not comments:
        return []
    return comments
def _claim(comment_id) -> bool:
    """Atomically mark *comment_id* as handled.

    Returns True when this call is the first to claim the id, False when it was already
    claimed. A None id cannot be tracked, so it is always reported as claimable.
    """
    if comment_id is None:
        return True
    with _PROCESSED_LOCK:
        already_seen = comment_id in _PROCESSED
        if not already_seen:
            _PROCESSED.add(comment_id)
    return not already_seen
def process_testme(full_name, owner, name, number, user, comment_id, source):
    """Handle one `!testme` comment; used by both the poll and webhook paths.

    Steps, in order: claim the comment id (dedup), check the commenter's authorization,
    resolve the PR head, trigger the Drone build, and comment the run link on the PR.

    Returns ``(run_url, "ok")`` on success, otherwise ``(None, reason)`` with reason one of
    "duplicate", "not authorized", "cannot resolve PR head" or "trigger failed". The id is
    claimed before the other steps, so a comment that fails later is not retried.
    """
    if not _claim(comment_id):
        return None, "duplicate"

    if not is_authorized(full_name, user):
        log(f"rejected: {user} is not an authorized org member on {full_name}")
        return None, "not authorized"

    head = pr_head(owner, name, number)
    sha = head["sha"] if head else None
    if not sha:
        return None, "cannot resolve PR head"

    # Build from the PR's source repo when known (fork PRs), else from the target repo.
    build_no = trigger_build(name, sha, number, head["repo"] or full_name)
    if not build_no:
        post_comment(owner, name, number, "cc-ci: failed to start a CI run (see bridge logs).")
        return None, "trigger failed"

    short_sha = sha[:8]
    run_url = f"{DRONE_URL}/{CI_REPO}/{build_no}"
    post_comment(
        owner, name, number,
        f"cc-ci: started CI run for `{name}` @ `{short_sha}` → {run_url}",
    )
    log(
        f"[{source}] triggered build {build_no} for {name}@{short_sha} "
        f"(PR #{number}, comment {comment_id}) by {user}"
    )
    return run_url, "ok"
class Handler(BaseHTTPRequestHandler):
    """HTTP front end: health checks on GET and the optional Gitea webhook on POST."""

    def _send(self, code, msg=""):
        """Send a plain-text response with status *code* and body *msg*.

        A 204 (No Content) response is sent without a body, as HTTP requires.
        """
        self.send_response(code)
        self.send_header("Content-Type", "text/plain")
        self.end_headers()
        if code != 204:
            self.wfile.write(msg.encode())

    def do_GET(self):
        """Answer 200 "ok" on the health-check paths and 404 on everything else."""
        if self.path.rstrip("/") in ("/hook/healthz", "/healthz"):
            return self._send(200, "ok")
        return self._send(404, "not found")

    def do_POST(self):
        """Verify and handle a Gitea `issue_comment` webhook delivery.

        Responses: 400 for an unusable Content-Length or JSON body, 401 for a bad signature,
        204 for deliveries that are not a new `!testme` comment on a PR, 200 for a comment
        already handled, 403 when the commenter is not authorized, 502 for other failures,
        and 201 with the run URL on success.
        """
        # Optional push optimization; polling is primary. Deduped against the poller by comment id.
        try:
            length = int(self.headers.get("Content-Length", 0))
        except ValueError:
            return self._send(400, "bad content-length")
        if length < 0:
            # read(-1) would block until the client closes the connection.
            return self._send(400, "bad content-length")
        body = self.rfile.read(length)
        sig = self.headers.get("X-Gitea-Signature", "")
        expected = hmac.new(HMAC_SECRET, body, hashlib.sha256).hexdigest()
        # Compare as bytes: compare_digest raises TypeError for str containing non-ASCII,
        # which an unauthenticated client could otherwise use to crash the handler.
        if not hmac.compare_digest(sig.encode("utf-8", "replace"), expected.encode()):
            log(f"rejected: bad signature event={self.headers.get('X-Gitea-Event')}")
            return self._send(401, "bad signature")
        if self.headers.get("X-Gitea-Event") != "issue_comment":
            return self._send(204, "ignored")
        try:
            payload = json.loads(body)
        except ValueError:
            return self._send(400, "bad json")
        if not isinstance(payload, dict):
            return self._send(400, "bad json")
        action = payload.get("action")
        c = payload.get("comment") or {}
        issue = payload.get("issue") or {}
        repo = payload.get("repository") or {}
        if action != "created" or (c.get("body") or "").strip() != TRIGGER:
            return self._send(204, "ignored")
        if not issue.get("pull_request"):
            return self._send(204, "not a PR")
        # `or {}` guards against explicit nulls, matching how the poll path reads the user.
        run_url, reason = process_testme(
            repo.get("full_name", ""), (repo.get("owner") or {}).get("login", ""),
            repo.get("name", ""), issue.get("number"),
            (c.get("user") or {}).get("login", ""), c.get("id"), "webhook")
        if not run_url:
            if reason == "duplicate":
                return self._send(200, "already handled")
            return self._send(403 if reason == "not authorized" else 502, reason)
        return self._send(201, run_url)

    def log_message(self, *a):
        """Suppress the base class's per-request access log."""
        pass
def _poll_repo(full_name, fire):
    """Scan one repo's open PRs for comments whose stripped body equals TRIGGER.

    With *fire* False, matching comments are only claimed (marked seen) so that comments
    which existed before startup never trigger a run. With *fire* True, each match is handed
    to process_testme, which dedupes by comment id.
    """
    owner, _, name = full_name.partition("/")
    for pr in list_open_prs(full_name):
        number = pr.get("number")
        for c in list_comments(full_name, number):
            if (c.get("body") or "").strip() != TRIGGER:
                continue
            cid = c.get("id")
            if not fire:
                _claim(cid)  # mark pre-existing comments seen; don't fire on startup
                continue
            user = (c.get("user") or {}).get("login", "")
            process_testme(full_name, owner, name, number, user, cid, "poll")


def poll_loop():
    """Primary trigger path. Outbound, read-only. Fires on NEW `!testme` comments only.

    Reads POLL_REPOS (csv, default CI_REPO) and POLL_INTERVAL (seconds, default 30) from the
    environment, then loops forever. A repo's first completed scan only marks existing
    comments seen; later scans trigger runs.

    An unexpected error while scanning a repo is logged and that repo is retried on the next
    cycle, so one bad response cannot stop polling for the rest of the process lifetime.
    A repo whose priming scan raised is primed again rather than fired on.

    NOTE(review): the list helpers return an empty list on HTTP failure without raising, so
    a priming scan that failed that way still counts as completed.
    """
    repos = [r.strip() for r in os.environ.get("POLL_REPOS", CI_REPO).split(",") if r.strip()]
    interval = int(os.environ.get("POLL_INTERVAL", "30"))
    primed = set()  # repos whose startup (mark-seen) scan finished without raising
    log(f"poller (primary) watching {repos} every {interval}s")
    while True:
        for full_name in repos:
            try:
                _poll_repo(full_name, fire=full_name in primed)
            except Exception as exc:  # thread boundary: log and keep the poller alive
                log(f"poll error on {full_name}: {exc!r}")
            else:
                primed.add(full_name)
        time.sleep(interval)
def main():
    """Start the poller thread, then serve HTTP on BRIDGE_LISTEN (default 0.0.0.0:8080)."""
    # Polling is the primary trigger; start it unconditionally, before the server binds.
    poller = threading.Thread(target=poll_loop, daemon=True)
    poller.start()

    listen = os.environ.get("BRIDGE_LISTEN", "0.0.0.0:8080")
    host, _, port = listen.rpartition(":")
    bind_host = host or "0.0.0.0"  # a bare port (or ":port") binds every interface
    srv = ThreadingHTTPServer((bind_host, int(port)), Handler)
    log(f"comment-bridge listening on {bind_host}:{port} (poll primary + optional webhook)")
    srv.serve_forever()


if __name__ == "__main__":
    main()