Files
cc-ci/tests/mumble/functional/_mumble_proto.py
autonomic-bot 9a7772563a style: repo-wide lint pass — make the lint gate green again
Push builds have been RED on the lint step since ~build 209 from accumulated
formatting drift. This is the mechanical cleanup: ruff format + ruff --fix
(UP038 isinstance unions, SIM105 contextlib.suppress, UP031 f-strings, SIM115
tempfile context manager), shfmt -i 2 -ci, nixpkgs-fmt/statix/deadnix (merged
attrsets, dropped unused lib args), yamllint, and shell quoting fixes in
tests/lasuite-docs/setup_custom_tests.sh. No behaviour changes intended;
lint: PASS, unit tests: 138 passed.
2026-06-09 21:56:15 +00:00

276 lines
9.3 KiB
Python

"""Minimal Mumble protocol client — vendored/adapted from the recipe-maintainer corpus
`recipe-info/mumble/tests/mumble_connect.py` (zero external deps; stdlib ssl/socket/struct only).
Mumble has no HTTP API for its voice server, so the canonical "is it really working" check is the
control-channel TLS handshake: connect, send Version + Authenticate, and read messages until the
server completes the handshake with a ServerSync. This module performs that handshake and returns a
structured result the cc-ci functional tests (parity port + recipe-specific) assert against.
On cc-ci the recipe is deployed with `compose.host-ports.yml`, which publishes 64738 directly on the
cc-ci host (`mode: host`); tests run on-host via cc-ci-run, so they connect to 127.0.0.1:64738.
"""
from __future__ import annotations
import contextlib
import socket
import ssl
import struct
import time
# TCP port the probe connects to by default (the port published on the cc-ci host).
PORT = 64738

# Message type IDs carried in the control-channel frame header (numbering from Mumble.proto).
MSG_VERSION = 0
MSG_AUTHENTICATE = 2
MSG_REJECT = 4
MSG_SERVERSYNC = 5
MSG_CHANNELSTATE = 7
MSG_USERSTATE = 9
MSG_SERVERCONFIG = 24

# Display names for the type code of a Reject message; the position in the tuple is the code.
REJECT_TYPES = dict(
    enumerate(
        (
            "None",
            "WrongVersion",
            "InvalidUsername",
            "WrongUserPW",
            "WrongServerPW",
            "UsernameInUse",
            "ServerFull",
            "NoCertificate",
            "AuthenticatorFail",
        )
    )
)

# Frame header: big-endian unsigned 16-bit message type, then unsigned 32-bit payload length.
_HEADER = ">HI"
_HEADER_SIZE = struct.calcsize(_HEADER)
def _enc_varint(value: int) -> bytes:
    """Encode ``value`` as a protobuf base-128 varint (least-significant 7-bit group first)."""
    encoded = bytearray()
    rest = value
    while True:
        septet = rest & 0x7F
        if rest <= 0x7F:
            # Final group: continuation bit stays clear.
            encoded.append(septet)
            return bytes(encoded)
        # More groups follow: flag this one with the continuation bit.
        encoded.append(septet | 0x80)
        rest >>= 7
def _enc_field_varint(field: int, value: int) -> bytes:
    """Encode one protobuf field of wire type 0 (varint): the tag followed by the value."""
    tag = field << 3  # low three bits are the wire type; 0 means varint
    return b"".join((_enc_varint(tag), _enc_varint(value)))
def _enc_field_string(field: int, value: str) -> bytes:
    """Encode one protobuf field of wire type 2: tag, UTF-8 byte length, then the UTF-8 bytes."""
    encoded = value.encode("utf-8")
    tag = (field << 3) | 2  # wire type 2 = length-delimited
    return b"".join((_enc_varint(tag), _enc_varint(len(encoded)), encoded))
def _dec_varint(data: bytes, off: int) -> tuple[int, int]:
    """Decode a base-128 varint from ``data`` starting at ``off``.

    Returns ``(value, next_offset)``. If the buffer ends before a terminating byte is seen, the
    bits gathered so far are returned along with the end-of-buffer offset.
    """
    value = 0
    bits = 0
    end = len(data)
    while off < end:
        byte = data[off]
        off += 1
        value |= (byte & 0x7F) << bits
        if (byte & 0x80) == 0:
            # Continuation bit clear: this was the last group.
            break
        bits += 7
    return value, off
def _dec_fields(data: bytes) -> dict:
    """Decode a protobuf message body into ``{field_number: value}``.

    Varint (0), 64-bit (1) and 32-bit (5) fields become ints; length-delimited (2) fields become
    ``str`` when the bytes are valid UTF-8 and stay ``bytes`` otherwise. When a field number
    repeats, the last occurrence wins. Raises ``ValueError`` for any other wire type.
    """
    decoded: dict = {}
    pos = 0
    end = len(data)
    while pos < end:
        key, pos = _dec_varint(data, pos)
        number = key >> 3
        kind = key & 0x07
        if kind == 0:
            item, pos = _dec_varint(data, pos)
        elif kind == 1:
            (item,) = struct.unpack_from("<Q", data, pos)
            pos += 8
        elif kind == 2:
            size, pos = _dec_varint(data, pos)
            blob = data[pos : pos + size]
            pos += size
            try:
                item = blob.decode("utf-8")
            except UnicodeDecodeError:
                # Not text (e.g. an embedded binary blob): hand back the raw bytes.
                item = blob
        elif kind == 5:
            (item,) = struct.unpack_from("<I", data, pos)
            pos += 4
        else:
            raise ValueError(f"unknown wire type {kind}")
        decoded[number] = item
    return decoded
def _send(sock, msg_type: int, payload: bytes = b"") -> None:
    """Write one framed message to ``sock``: type/length header immediately followed by the payload."""
    header = struct.pack(_HEADER, msg_type, len(payload))
    sock.sendall(header + payload)
def _recv(sock, timeout: float) -> tuple[int, bytes]:
    """Read one framed message from ``sock`` and return ``(msg_type, payload)``.

    ``timeout`` is installed on the socket before reading. Raises ``ConnectionError`` if the peer
    closes the connection before the header or the payload is complete.
    """
    sock.settimeout(timeout)

    def read_exact(count: int, closed_message: str) -> bytes:
        # Keep calling recv() until exactly `count` bytes have been gathered.
        gathered = b""
        while len(gathered) < count:
            piece = sock.recv(count - len(gathered))
            if not piece:
                raise ConnectionError(closed_message)
            gathered += piece
        return gathered

    frame_header = read_exact(_HEADER_SIZE, "connection closed reading header")
    msg_type, length = struct.unpack(_HEADER, frame_header)
    body = read_exact(length, "connection closed reading payload")
    return msg_type, body
def _build_version() -> bytes:
    """Build the Version message payload: packed version number, release string and OS name."""
    major, minor, patch = 1, 5, 0  # pretend client 1.5.0
    packed = (major << 16) | (minor << 8) | patch
    parts = (
        _enc_field_varint(1, packed),
        _enc_field_string(2, "cc-ci mumble probe 1.0"),
        _enc_field_string(3, "Linux"),
    )
    return b"".join(parts)
def _build_authenticate(username: str, password: str = "") -> bytes:
    """Build the Authenticate message payload.

    Field 1 carries the username, field 2 the password (left out entirely when empty), and
    field 5 is set to 1 to advertise Opus support.
    """
    parts = [_enc_field_string(1, username)]
    if password:
        parts.append(_enc_field_string(2, password))
    parts.append(_enc_field_varint(5, 1))  # opus = true
    return b"".join(parts)
def handshake(
    host: str = "127.0.0.1",
    port: int = PORT,
    username: str = "cc-ci-probe",
    password: str = "",
    timeout: float = 20.0,
) -> dict:
    """Run the full Mumble control-channel handshake and describe the outcome.

    Connects over TLS, sends Version + Authenticate, then reads server messages until ServerSync
    has arrived (plus ServerConfig, if it shows up within a short grace window) or the time budget
    is spent.

    Args:
        host: Address to connect to (the socket is AF_INET).
        port: TCP port of the Mumble server.
        username: Name sent in the Authenticate message.
        password: Optional password; omitted from Authenticate when empty.
        timeout: Budget in seconds, applied to the connect and then, separately, to the read loop.

    Returns:
        A result dict with keys:
        tls_connect (bool), server_version (dict|None), auth_accepted (bool), channels (list[str]),
        users (list[str]), server_sync (bool), welcome_text (str|None), server_config (dict),
        error (str|None).
        Transport, TLS and payload-decoding failures are reported through ``error`` rather than
        raised; ``channels`` and ``users`` hold whatever was received before the exchange ended.
    """
    result = {
        "tls_connect": False,
        "server_version": None,
        "auth_accepted": False,
        "channels": [],
        "users": [],
        "server_sync": False,
        "welcome_text": None,
        "server_config": {},
        "error": None,
    }
    # Keyed by channel id / session id so a repeated state message replaces the earlier entry.
    channels: dict = {}
    users: dict = {}
    raw = tls = None
    try:
        raw = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        raw.settimeout(timeout)
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        # Certificate and hostname verification are off: the probe only needs the TLS session up.
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        ctx.minimum_version = ssl.TLSVersion.TLSv1_2
        tls = ctx.wrap_socket(raw, server_hostname=host)
        tls.connect((host, port))
        result["tls_connect"] = True

        _send(tls, MSG_VERSION, _build_version())
        _send(tls, MSG_AUTHENTICATE, _build_authenticate(username, password))

        # Monotonic clock: a wall-clock adjustment must not stretch or cut short the budget.
        deadline = time.monotonic() + timeout
        # Murmur usually sends ServerConfig around ServerSync; the exact order varies by version.
        # So we don't stop at ServerSync — once it arrives we keep reading for a short grace window
        # to also capture ServerConfig (used by the recipe-specific config round-trip tests), then
        # stop. If both arrive we stop immediately.
        grace_deadline = None
        while True:
            if result["server_sync"] and result["server_config"]:
                break
            cutoff = deadline if grace_deadline is None else min(deadline, grace_deadline)
            remaining = cutoff - time.monotonic()
            if remaining <= 0:
                break
            try:
                msg_type, payload = _recv(tls, timeout=remaining)
            except (TimeoutError, ConnectionError):
                break
            except OSError:
                # Once ServerSync is in, a transport/TLS error while waiting for the optional
                # trailing ServerConfig must not turn a completed handshake into a failure.
                if result["server_sync"]:
                    break
                raise

            # Only the message types below are decoded; anything else is skipped undecoded.
            if msg_type == MSG_VERSION:
                f = _dec_fields(payload)
                v1 = f.get(1, 0)
                result["server_version"] = {
                    "string": f"{(v1 >> 16) & 0xFF}.{(v1 >> 8) & 0xFF}.{v1 & 0xFF}",
                    "release": f.get(2, ""),
                    "os": f.get(3, ""),
                }
            elif msg_type == MSG_REJECT:
                f = _dec_fields(payload)
                reason = REJECT_TYPES.get(f.get(1, 0), "Unknown")
                result["error"] = f"Rejected: {reason} {f.get(2, '')}"
                return result
            elif msg_type == MSG_CHANNELSTATE:
                f = _dec_fields(payload)
                ch_id = f.get(1, 0)
                channels[ch_id] = f.get(3, f"channel-{ch_id}")
            elif msg_type == MSG_USERSTATE:
                f = _dec_fields(payload)
                name = f.get(3, "")
                if name:
                    users[f.get(1, 0)] = name
            elif msg_type == MSG_SERVERCONFIG:
                f = _dec_fields(payload)
                # ServerConfig fields: 1 max_bandwidth, 2 welcome_text, 3 allow_html,
                # 4 message_length, 5 image_message_length, 6 max_users
                result["server_config"] = {
                    "max_bandwidth": f.get(1),
                    "welcome_text": f.get(2),
                    "allow_html": f.get(3),
                    "message_length": f.get(4),
                    "image_message_length": f.get(5),
                    "max_users": f.get(6),
                }
            elif msg_type == MSG_SERVERSYNC:
                f = _dec_fields(payload)
                result["welcome_text"] = f.get(3, "")
                result["server_sync"] = True
                result["auth_accepted"] = True
                # keep reading a short grace window to catch a trailing ServerConfig
                if grace_deadline is None:
                    grace_deadline = time.monotonic() + 3.0

        if not result["server_sync"] and not result["error"]:
            result["error"] = "timed out waiting for ServerSync"
    except (OSError, ValueError, struct.error) as e:
        # OSError covers ConnectionError, ssl.SSLError and socket timeouts; ValueError and
        # struct.error come from decoding a malformed or truncated payload.
        result["error"] = f"{type(e).__name__}: {e}"
    finally:
        # Publish whatever was collected, on every exit path (including Reject and errors).
        result["channels"] = list(channels.values())
        result["users"] = list(users.values())
        if tls is not None:
            with contextlib.suppress(OSError):
                tls.shutdown(socket.SHUT_RDWR)
            # Always attempt the close, even when shutdown failed, so the descriptor is released.
            with contextlib.suppress(OSError):
                tls.close()
        elif raw is not None:
            raw.close()
    return result
def retry_handshake(attempts: int = 12, interval: float = 5.0, **kwargs) -> dict:
    """Retry the handshake until ServerSync completes (cold-boot readiness) or attempts exhaust.

    Args:
        attempts: Maximum number of handshake attempts.
        interval: Seconds to wait between consecutive attempts.
        **kwargs: Passed through unchanged to ``handshake``.

    Returns:
        The last handshake result (caller asserts on it), or an empty dict if ``attempts`` is
        zero or negative.
    """
    last: dict = {}
    for attempt in range(attempts):
        if attempt:
            # Pause only *between* attempts: sleeping after the final failure would just delay
            # the caller without another try following it.
            time.sleep(interval)
        last = handshake(**kwargs)
        if last.get("server_sync"):
            break
    return last