Files
cc-ci/tests/matrix-synapse/functional/test_register_and_message.py
autonomic-bot 9a7772563a style: repo-wide lint pass — make the lint gate green again
Push builds have been RED on the lint step since ~build 209 from accumulated
formatting drift. This is the mechanical cleanup: ruff format + ruff --fix
(UP038 isinstance unions, SIM105 contextlib.suppress, UP031 f-strings, SIM115
tempfile context manager), shfmt -i 2 -ci, nixpkgs-fmt/statix/deadnix (merged
attrsets, dropped unused lib args), yamllint, and shell quoting fixes in
tests/lasuite-docs/setup_custom_tests.sh. No behaviour changes intended;
lint: PASS, unit tests: 138 passed.
2026-06-09 21:56:15 +00:00

246 lines
10 KiB
Python

"""matrix-synapse — recipe-specific functional test (Phase 2 P3 §4.3 prescribed test).
Plan §4.3 explicitly: "register two users (admin API); one sends a room message, the other reads
it" — the canonical create-and-read-back for matrix-synapse.
Implementation note: matrix-synapse's `/_synapse/admin/v1/*` endpoints are NOT routed publicly
by this recipe (Synapse's recommended posture), AND the recipe doesn't expose
`enable_registration_without_verification` as an env var (so we cannot just set
`ENABLE_REGISTRATION=true` — synapse refuses to start with open registration absent that flag).
So we use the **shared-secret admin register endpoint via `exec_in_app`**: from inside the
synapse container, curl `http://localhost:8008/_synapse/admin/v1/register` — this bypasses the
public router (where /_synapse/admin/* returns 404), uses the registration_shared_secret directly,
and works without changing the recipe's registration posture.
Flow:
1. Read the abra-generated `registration_shared_secret` from `/run/secrets/registration` inside
the synapse container.
2. For each user: GET admin/v1/register via localhost to obtain a nonce; HMAC-SHA1 the message
`nonce\\0user\\0pass\\0notadmin` keyed by the shared secret; POST the register payload back.
3. Both users login via the public `/_matrix/client/v3/login` to obtain access_tokens (login IS
routed publicly).
4. user_a creates a private_chat room (`POST /_matrix/client/v3/createRoom`); invites user_b.
5. user_b joins (`POST /_matrix/client/v3/join/<room_id>`).
6. user_a PUTs an `m.room.message` with a unique marker.
7. user_b GETs `/_matrix/client/v3/rooms/<room_id>/messages?dir=b` and asserts the marker is
present.
Non-vacuous: every step exercises a different synapse layer (admin shared-secret register,
client login, room create/invite/join, message send/receive). A broken Synapse fails AT the
step where it's broken.
"""
from __future__ import annotations
import hashlib
import hmac
import json
import os
import shlex
import sys
import uuid
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "..", "runner"))
from harness import http as harness_http # noqa: E402
from harness import lifecycle
def _registration_secret(domain: str) -> str:
"""Read /run/secrets/registration from inside the synapse app container."""
return lifecycle.exec_in_app(domain, ["cat", "/run/secrets/registration"]).strip()
def _container_curl(domain: str, method: str, path: str, body: dict | None = None) -> dict:
"""curl http://localhost:8008<path> from inside the synapse container. Returns parsed JSON.
/_synapse/admin/* is bound on synapse's listener but NOT routed by the recipe's nginx, so we
have to talk to it via localhost from inside the container. The synapse container has curl in
its base image (matrixdotorg/synapse — Python image with curl available)."""
cmd_parts = ["curl", "-s", "-X", method, "-w", "\\n%{http_code}"]
if body is not None:
cmd_parts += ["-H", "Content-Type: application/json", "-d", json.dumps(body)]
cmd_parts.append(f"http://localhost:8008{path}")
# build a sh -c command so we can run curl with the JSON body properly quoted
sh_cmd = " ".join(shlex.quote(p) for p in cmd_parts)
out = lifecycle.exec_in_app(domain, ["sh", "-c", sh_cmd]).strip()
# Last newline-separated token is the HTTP status; everything before is the body
if "\n" in out:
body_str, _, status_str = out.rpartition("\n")
else:
body_str, status_str = out, "0"
try:
status = int(status_str.strip())
except ValueError:
status = 0
try:
parsed = json.loads(body_str) if body_str.strip() else None
except (json.JSONDecodeError, ValueError):
parsed = None
return {"status": status, "body": parsed, "raw": body_str}
def _admin_register(domain: str, secret: str, username: str, password: str, admin: bool) -> dict:
"""Register a user via the shared-secret admin endpoint, called from inside the container.
Readiness-robust: in the FULL lifecycle the custom tier runs right after the restore tier, which
`DROP DATABASE … WITH (FORCE)` + recreates synapse's postgres DB (pg_backup.sh restore). Synapse
is still re-establishing its DB connection pool in that window, so a registration (a DB *write*)
can transiently return HTTP 500 M_UNKNOWN even though HTTP health (a read) is already green. We
poll: re-fetch a fresh nonce + re-POST on 5xx/transport-error until 200 or timeout, then RAISE. A
4xx (real rejection — bad MAC, user exists, policy) is NOT retried (fail fast). The assertion is
unchanged (registration must succeed); only the post-restore recovery window is tolerated."""
import time
admin_flag = "admin" if admin else "notadmin"
deadline = time.monotonic() + 90 # bounded recovery window
attempt = 0
last = {"status": 0, "body": None, "raw": ""}
while time.monotonic() < deadline:
attempt += 1
# Step 1: GET a fresh nonce (single-use; re-fetch each attempt)
r = _container_curl(domain, "GET", "/_synapse/admin/v1/register")
if r["status"] in (500, 502, 503, 504, 0):
last = r
print(
f" [register] {username}: nonce GET transient {r['status']} "
f"(attempt {attempt}, synapse recovering) — retrying",
flush=True,
)
time.sleep(5)
continue
assert r["status"] == 200, f"nonce GET failed: status={r['status']} raw={r['raw'][:200]!r}"
nonce = (r["body"] or {}).get("nonce")
assert nonce, f"no nonce in response: {r['body']!r}"
# Step 2: HMAC and POST
msg = f"{nonce}\0{username}\0{password}\0{admin_flag}".encode()
mac = hmac.new(secret.encode(), msg, hashlib.sha1).hexdigest()
payload = {
"nonce": nonce,
"username": username,
"password": password,
"mac": mac,
"admin": admin,
}
r = _container_curl(domain, "POST", "/_synapse/admin/v1/register", body=payload)
if r["status"] == 200:
if attempt > 1:
print(
f" [register] {username}: succeeded on attempt {attempt} "
f"(synapse recovered)",
flush=True,
)
return r["body"] or {}
if r["status"] in (500, 502, 503, 504, 0):
last = r
print(
f" [register] {username}: POST transient {r['status']} "
f"(attempt {attempt}, synapse recovering) — retrying",
flush=True,
)
time.sleep(5)
continue
# a 4xx is a real rejection — fail fast, do not retry
raise AssertionError(
f"register {username!r} rejected: status={r['status']} body={r['body']!r}"
)
raise AssertionError(
f"register {username!r} never succeeded within the post-restore recovery window "
f"({attempt} attempts, 90s): last status={last['status']} body={last['body']!r}"
)
def _login(domain: str, username: str, password: str) -> str:
"""Public client-API password login → access_token."""
url = f"https://{domain}/_matrix/client/v3/login"
s, body = harness_http.http_post(
url,
data={
"type": "m.login.password",
"identifier": {"type": "m.id.user", "user": username},
"password": password,
},
)
assert s == 200, f"login {username} HTTP {s}: {body!r}"
token = (body or {}).get("access_token")
assert isinstance(token, str) and token, f"login returned no access_token: {body!r}"
return token
def _auth(token: str) -> dict:
return {"Authorization": f"Bearer {token}"}
def test_register_two_users_send_receive_message(live_app):
"""End-to-end: register 2 users via admin shared-secret (via container localhost); login;
create + invite + join a room; send and read a message."""
domain = live_app
secret = _registration_secret(domain)
assert (
secret and len(secret) >= 16
), f"registration shared secret missing/short: len={len(secret) if secret else 0}"
suffix = uuid.uuid4().hex[:8]
user_a = f"alice{suffix}"
user_b = f"bob{suffix}"
password = "TestPass-" + uuid.uuid4().hex[:8] + "1A"
# Register both via shared-secret admin register (container localhost)
_admin_register(domain, secret, user_a, password, admin=False)
_admin_register(domain, secret, user_b, password, admin=False)
# Login via the public client API
tok_a = _login(domain, user_a, password)
tok_b = _login(domain, user_b, password)
# user_a creates a room
s, body = harness_http.http_post(
f"https://{domain}/_matrix/client/v3/createRoom",
data={"preset": "private_chat", "name": f"ccci-room-{suffix}"},
headers=_auth(tok_a),
)
assert s == 200, f"createRoom HTTP {s}: {body!r}"
room_id = (body or {}).get("room_id")
assert isinstance(room_id, str) and room_id.startswith("!"), f"bad room_id: {room_id!r}"
# invite user_b
s, body = harness_http.http_post(
f"https://{domain}/_matrix/client/v3/rooms/{room_id}/invite",
data={"user_id": f"@{user_b}:{domain}"},
headers=_auth(tok_a),
)
assert s == 200, f"invite HTTP {s}: {body!r}"
# user_b joins
s, body = harness_http.http_post(
f"https://{domain}/_matrix/client/v3/join/{room_id}", data={}, headers=_auth(tok_b)
)
assert s == 200, f"join HTTP {s}: {body!r}"
# user_a sends a uniquely-marked message
marker = f"ccci-marker-{uuid.uuid4().hex}"
txn_id = uuid.uuid4().hex
s, body = harness_http.http_request(
"PUT",
f"https://{domain}/_matrix/client/v3/rooms/{room_id}/send/m.room.message/{txn_id}",
data={"msgtype": "m.text", "body": marker},
headers=_auth(tok_a),
)
assert s == 200, f"send HTTP {s}: {body!r}"
event_id = (body or {}).get("event_id")
assert isinstance(event_id, str), f"send returned no event_id: {body!r}"
# user_b reads the room's messages and finds the marker
s, body = harness_http.http_get(
f"https://{domain}/_matrix/client/v3/rooms/{room_id}/messages?dir=b&limit=20",
headers=_auth(tok_b),
)
assert s == 200, f"read messages HTTP {s}: {body!r}"
chunk = (body or {}).get("chunk", [])
bodies = [e.get("content", {}).get("body", "") for e in chunk if isinstance(e, dict)]
assert marker in bodies, (
f"user_b did not see user_a's marker {marker!r}. event_id={event_id}; "
f"messages={bodies[:10]}"
)