"""bluesky-pds — recipe-specific functional test (Phase 2 P3 §4.3 prescribed create-and-read). Plan §4.3 explicitly: "bluesky-pds — create a test account (goat CLI), create a post via atproto, fetch it back, delete the account." The recipe-maintainer corpus's `goat_account.py` ports here; we also add the atproto post round-trip — proves the recipe's defining behavior (account lifecycle + atproto repo CRUD). Flow (all account-management via the `goat` CLI inside the PDS container, atproto repo CRUD via the public XRPC API): 1. `goat pds describe ` (in-container) — assert the recipe-served DID `did:web:` appears in the output (proves the PDS is self-identifying correctly). 2. `goat pds admin account create --handle --email --password ` to create a per-run UUID-suffixed test account. Parse the new account's DID from output. 3. `POST /xrpc/com.atproto.server.createSession` (public XRPC) with the new account's handle + password → obtain accessJwt. 4. `POST /xrpc/com.atproto.repo.createRecord` with collection=`app.bsky.feed.post`, the new account's DID as `repo`, and a `text` field carrying a unique marker. Parse the returned `uri` (atproto record URI: `at:///app.bsky.feed.post/`). 5. `GET /xrpc/com.atproto.repo.getRecord?repo=&collection=app.bsky.feed.post&rkey=` → assert the returned record's `value.text` matches the marker (post round-trip ✓). 6. `goat pds admin account delete ` cleanup (idempotent — best-effort; per-run teardown would clean it anyway). Non-vacuous: every step exercises a different PDS layer (PDS-DID, admin API, public auth, repo CRUD). A wedged PDS subsystem fails AT its layer. """ from __future__ import annotations import contextlib import os import re import secrets import shlex import sys import uuid sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "..", "runner")) from harness import http as harness_http # noqa: E402 from harness import lifecycle PDS_HOST_LOCAL = "http://localhost:3000" def _in_container(domain: str, shell_cmd: str) -> str: """Run `shell_cmd` inside the PDS app container via exec_in_app (sh -c wrapper).""" # The admin_pw_flag uses $(cat ...) which only the sh inside the container can expand — # callers pass the raw shell command including those substitutions. return lifecycle.exec_in_app(domain, ["sh", "-c", shell_cmd], timeout=120) def _goat_admin(domain: str, args: str) -> str: """`goat pds admin ` inside the container, with --admin-password from /run/secrets and --pds-host pointing at localhost:3000 (the PDS's internal listener).""" cmd = ( f"goat pds admin {args} " f'--admin-password "$(cat /run/secrets/pds_admin_password)" ' f"--pds-host {PDS_HOST_LOCAL} 2>&1" ) return _in_container(domain, cmd) def _xrpc_post( domain: str, nsid: str, data: dict, token: str | None = None ) -> tuple[int, dict | None]: headers = {} if token: headers["Authorization"] = f"Bearer {token}" return harness_http.http_post(f"https://{domain}/xrpc/{nsid}", data=data, headers=headers) def _xrpc_get( domain: str, nsid: str, query: str, token: str | None = None ) -> tuple[int, dict | None]: headers = {} if token: headers["Authorization"] = f"Bearer {token}" return harness_http.http_get(f"https://{domain}/xrpc/{nsid}?{query}", headers=headers) def test_account_lifecycle_and_post_roundtrip(live_app): """Full account + post round-trip via goat CLI + atproto XRPC.""" domain = live_app suffix = uuid.uuid4().hex[:8] handle = f"ccci-{suffix}.{domain}" email = f"ccci-{suffix}@{domain}" password = "ccci-" + secrets.token_urlsafe(12) # Step 1: PDS describe via goat — recipe self-identifies as did:web: out = _in_container(domain, f"goat pds describe {PDS_HOST_LOCAL} 2>&1") assert ( f"did:web:{domain}" in out ), f"goat pds describe did not contain expected DID 'did:web:{domain}'. Output:\n{out[:500]!r}" # Step 2: Create account (UUID-suffixed handle = no run-to-run collision) out = _goat_admin( domain, f"account create --handle {shlex.quote(handle)} --email {shlex.quote(email)} " f"--password {shlex.quote(password)}", ) m = re.search(r"did:plc:[a-z0-9]+", out) assert m, f"goat account create produced no DID. Output:\n{out[:500]!r}" new_did = m.group(0) cleanup_did = new_did # for the finally cleanup try: # Step 3: Public-API session create (login as the new account) s, body = _xrpc_post( domain, "com.atproto.server.createSession", data={"identifier": handle, "password": password}, ) assert s == 200, f"createSession HTTP {s}: {body!r}" token = (body or {}).get("accessJwt") assert token, f"createSession returned no accessJwt: {body!r}" # Step 4: Create a post via atproto repo.createRecord marker = f"ccci-bskypost-{uuid.uuid4().hex}" s, body = _xrpc_post( domain, "com.atproto.repo.createRecord", data={ "repo": new_did, "collection": "app.bsky.feed.post", "record": { "$type": "app.bsky.feed.post", "text": marker, "createdAt": "2026-05-28T12:00:00Z", }, }, token=token, ) assert s == 200, f"createRecord HTTP {s}: {body!r}" record_uri = (body or {}).get("uri", "") # URI format: at:///app.bsky.feed.post/ assert record_uri.startswith( f"at://{new_did}/app.bsky.feed.post/" ), f"unexpected record uri: {record_uri!r}" rkey = record_uri.rsplit("/", 1)[-1] assert rkey, f"no rkey in uri: {record_uri!r}" # Step 5: Fetch the post back via repo.getRecord — assert text round-trips s, body = _xrpc_get( domain, "com.atproto.repo.getRecord", f"repo={new_did}&collection=app.bsky.feed.post&rkey={rkey}", token=token, ) assert s == 200, f"getRecord HTTP {s}: {body!r}" record_value = (body or {}).get("value", {}) assert ( record_value.get("text") == marker ), f"post text did not round-trip: created={marker!r}, fetched={record_value.get('text')!r}" assert record_value.get("$type") == "app.bsky.feed.post" finally: # Step 6: Best-effort cleanup. (The per-run domain teardown will discard the volume # too, but we exercise the delete-account path because it's part of §4.3.) if cleanup_did: with contextlib.suppress(Exception): _goat_admin(domain, f"account delete {cleanup_did}")