"""matrix-synapse — pre-op seed hooks (Phase 1e HC3). The orchestrator runs these BEFORE the op; the matching test_.py asserts post-op (assertion-only). Backup/restore still use a dedicated `ci_marker` row in postgres (the recipe's pg_backup.sh dump path). Upgrade now seeds REAL Matrix application data instead: two users, a room, and a message. The helper persists only the test metadata to `/data/ccci-upgrade-state.json` so the post-upgrade assertion can log back in and prove the pre-upgrade message is still readable via the real Matrix API. """ import hashlib import hmac import json import os import sys import uuid sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "runner")) from harness import http as harness_http, lifecycle # noqa: E402 UPGRADE_STATE = "/data/ccci-upgrade-state.json" def _psql(domain, sql): cmd = f'PGPASSWORD=$(cat /run/secrets/db_password) psql -U synapse -d synapse -tAc "{sql}"' return lifecycle.exec_in_app(domain, ["sh", "-c", cmd], service="db").strip() def _seed(domain, value): _psql( domain, "CREATE TABLE IF NOT EXISTS ci_marker(v text); DELETE FROM ci_marker; " f"INSERT INTO ci_marker VALUES('{value}');", ) assert _psql(domain, "SELECT v FROM ci_marker;") == value def _registration_secret(domain: str) -> str: return lifecycle.exec_in_app(domain, ["cat", "/run/secrets/registration"]).strip() def _container_curl(domain: str, method: str, path: str, body: dict | None = None) -> dict: import shlex cmd_parts = ["curl", "-s", "-X", method, "-w", "\\n%{http_code}"] if body is not None: cmd_parts += ["-H", "Content-Type: application/json", "-d", json.dumps(body)] cmd_parts.append(f"http://localhost:8008{path}") sh_cmd = " ".join(shlex.quote(p) for p in cmd_parts) out = lifecycle.exec_in_app(domain, ["sh", "-c", sh_cmd]).strip() if "\n" in out: body_str, _, status_str = out.rpartition("\n") else: body_str, status_str = out, "0" try: status = int(status_str.strip()) except ValueError: status = 0 try: parsed = json.loads(body_str) if body_str.strip() else None except (json.JSONDecodeError, ValueError): parsed = None return {"status": status, "body": parsed, "raw": body_str} def _admin_register(domain: str, secret: str, username: str, password: str, admin: bool) -> dict: import time admin_flag = "admin" if admin else "notadmin" deadline = time.monotonic() + 90 last = {"status": 0, "body": None, "raw": ""} attempt = 0 while time.monotonic() < deadline: attempt += 1 r = _container_curl(domain, "GET", "/_synapse/admin/v1/register") if r["status"] in (500, 502, 503, 504, 0): last = r time.sleep(5) continue assert r["status"] == 200, f"nonce GET failed: status={r['status']} raw={r['raw'][:200]!r}" nonce = (r["body"] or {}).get("nonce") assert nonce, f"no nonce in response: {r['body']!r}" msg = f"{nonce}\0{username}\0{password}\0{admin_flag}".encode() mac = hmac.new(secret.encode(), msg, hashlib.sha1).hexdigest() payload = { "nonce": nonce, "username": username, "password": password, "mac": mac, "admin": admin, } r = _container_curl(domain, "POST", "/_synapse/admin/v1/register", body=payload) if r["status"] == 200: return r["body"] or {} if r["status"] in (500, 502, 503, 504, 0): last = r time.sleep(5) continue raise AssertionError(f"register {username!r} rejected: status={r['status']} body={r['body']!r}") raise AssertionError( f"register {username!r} never succeeded within 90s: " f"last status={last['status']} body={last['body']!r}" ) def _login(domain: str, username: str, password: str) -> str: url = f"https://{domain}/_matrix/client/v3/login" s, body = harness_http.http_post( url, data={ "type": "m.login.password", "identifier": {"type": "m.id.user", "user": username}, "password": password, }, ) assert s == 200, f"login {username} HTTP {s}: {body!r}" token = (body or {}).get("access_token") assert isinstance(token, str) and token, f"login returned no access_token: {body!r}" return token def _auth(token: str) -> dict: return {"Authorization": f"Bearer {token}"} def _write_upgrade_state(domain: str, payload: dict) -> None: script = ( "import json; " f"open({UPGRADE_STATE!r}, 'w').write(json.dumps({json.dumps(payload)}))" ) lifecycle.exec_in_app(domain, ["python", "-c", script]) def _read_messages(domain: str, room_id: str, token: str) -> list[str]: s, body = harness_http.http_get( f"https://{domain}/_matrix/client/v3/rooms/{room_id}/messages?dir=b&limit=20", headers=_auth(token), ) assert s == 200, f"read messages HTTP {s}: {body!r}" chunk = (body or {}).get("chunk", []) return [e.get("content", {}).get("body", "") for e in chunk if isinstance(e, dict)] def pre_upgrade(domain, meta): secret = _registration_secret(domain) assert secret and len(secret) >= 16, ( f"registration shared secret missing/short: len={len(secret) if secret else 0}" ) suffix = uuid.uuid4().hex[:8] user_a = f"upgradea{suffix}" user_b = f"upgradeb{suffix}" password = "UpgradePass-" + uuid.uuid4().hex[:8] + "1A" _admin_register(domain, secret, user_a, password, admin=False) _admin_register(domain, secret, user_b, password, admin=False) tok_a = _login(domain, user_a, password) tok_b = _login(domain, user_b, password) s, body = harness_http.http_post( f"https://{domain}/_matrix/client/v3/createRoom", data={"preset": "private_chat", "name": f"ccci-upgrade-room-{suffix}"}, headers=_auth(tok_a), ) assert s == 200, f"createRoom HTTP {s}: {body!r}" room_id = (body or {}).get("room_id") assert isinstance(room_id, str) and room_id.startswith("!"), f"bad room_id: {room_id!r}" s, body = harness_http.http_post( f"https://{domain}/_matrix/client/v3/rooms/{room_id}/invite", data={"user_id": f"@{user_b}:{domain}"}, headers=_auth(tok_a), ) assert s == 200, f"invite HTTP {s}: {body!r}" s, body = harness_http.http_post( f"https://{domain}/_matrix/client/v3/join/{room_id}", data={}, headers=_auth(tok_b) ) assert s == 200, f"join HTTP {s}: {body!r}" marker = f"ccci-upgrade-marker-{uuid.uuid4().hex}" txn_id = uuid.uuid4().hex s, body = harness_http.http_request( "PUT", f"https://{domain}/_matrix/client/v3/rooms/{room_id}/send/m.room.message/{txn_id}", data={"msgtype": "m.text", "body": marker}, headers=_auth(tok_a), ) assert s == 200, f"send HTTP {s}: {body!r}" assert isinstance((body or {}).get("event_id"), str), f"send returned no event_id: {body!r}" bodies = _read_messages(domain, room_id, tok_b) assert marker in bodies, f"pre-upgrade marker {marker!r} not visible before upgrade: {bodies[:10]}" _write_upgrade_state( domain, {"user_b": user_b, "password": password, "room_id": room_id, "marker": marker}, ) def pre_backup(domain, meta): _seed(domain, "original") def pre_restore(domain, meta): # drop the marker table (diverge from the backup) so a successful restore is observable _psql(domain, "DROP TABLE ci_marker;") assert _psql(domain, "SELECT to_regclass('public.ci_marker');") in ( "", "NULL", ), "drop did not take"