adversarial-mailman2to3/import_mailman3_list_config.py

164 lines
4.6 KiB
Python
Executable File

#!/bin/env python3
import sys
import requests
import argparse
import json
from pathlib import Path
from requests.auth import HTTPBasicAuth
# Connection settings for the REST API that every request in this script targets.
# The user name and base URL are hard-coded; all endpoint URLs below are built
# by appending a path to REST_PATH.
REST_USER="restadmin"
REST_PATH="http://127.0.0.1:8001/3.0"
# NOTE: the password file is read at import time, so merely importing this
# module raises (e.g. FileNotFoundError) when the secret file is not present.
# Surrounding whitespace, including a trailing newline, is stripped.
REST_PASS = Path("/run/secrets/mailman_rest_password").read_text().strip()
# HTTP basic-auth object passed as `auth=` to every requests call below.
REST_AUTH = HTTPBasicAuth(REST_USER, REST_PASS)
# ASCII-art banner printed once at start-up by main().
# Declared as a raw string: the art is full of backslashes that are not valid
# escape sequences ("\ " and "\_"), which a normal string literal only accepts
# with a DeprecationWarning/SyntaxWarning on current Python versions. The raw
# form produces exactly the same text without the warning.
BANNER = r"""
_) | __ / _|_)
` \ _` | | | ` \ _` | \ _ \ _| _ \ \ _| | _` |
_|_|_|\__,_|_|_|_|_|_|\__,_|_| _|___/\__|\___/_| _|_| _|\__, |
____/
"""
# def create_user(email):
# result = requests.get(f"{REST_PATH}/users/{email}")
# print(result)
# print(result.text)
# if not result.ok:
# print(f"user {email} not found, creating")
# result = requests.post(f"{REST_PATH}/users", auth=REST_AUTH, json={"email": email})
# if (result.ok):
# print("success")
# print(result)
# print(result.text)
# else:
# print("failed")
# print(result)
# print(result.reason)
# return False
# return True
def create_member(email, mlist, role):
    """Add *email* to the list *mlist* with the given *role*.

    Sends a POST to ``{REST_PATH}/members``. The ``list_id`` field is derived
    from *mlist* by replacing every ``@`` with ``.``. The outcome and the
    server response are printed to stdout.

    Returns:
        True when the response status is ok, False otherwise.
    """
    payload = {
        "list_id": mlist.replace('@', '.'),
        "subscriber": email,
        "role": role,
    }
    response = requests.post(f"{REST_PATH}/members", auth=REST_AUTH, json=payload)

    # Failure path first: it additionally reports the HTTP reason phrase.
    if not response.ok:
        print(f"{email} <- {role} failed")
        print(response)
        print(response.reason)
        print(response.text)
        return False

    print(f"{email} <- {role} success")
    print(response)
    print(response.text)
    return True
def create_ban(email, mlist):
    """Ban *email* from the list *mlist*.

    Sends a POST to ``{REST_PATH}/lists/{mlist}/bans`` with the address in the
    ``email`` field. The outcome and the server response are printed to
    stdout.

    Returns:
        True when the response status is ok, False otherwise.
    """
    bans_url = f"{REST_PATH}/lists/{mlist}/bans"
    response = requests.post(bans_url, auth=REST_AUTH, json={"email": email})

    # Failure path first: it additionally reports the HTTP reason phrase.
    if not response.ok:
        print(f"{email} <- BAN failed")
        print(response)
        print(response.reason)
        print(response.text)
        return False

    print(f"{email} <- BAN success")
    print(response)
    print(response.text)
    return True
def parse_args(args):
    """Parse the command-line arguments (without the program name).

    Args:
        args: argument strings, e.g. ``sys.argv[1:]``.

    Returns:
        An ``argparse.Namespace`` with ``list`` (str, the list's
        fully-qualified name) and ``patch`` (``pathlib.Path`` of the JSON
        config file).
    """
    # The text must be given as `description=`: the first positional parameter
    # of ArgumentParser is `prog`, which would put the sentence into the
    # "usage:" line in place of the program name.
    parser = argparse.ArgumentParser(
        description="load a json config into local mailman configuration"
    )
    # The value is sent as `fqdn_listname` and has '@' replaced by '.' to form
    # the list id elsewhere in this script, so it has to include the domain.
    parser.add_argument(
        "list", help="fully-qualified list name (eg `test@example.com`)"
    )
    parser.add_argument("patch", help="config to push to list", type=Path)
    return parser.parse_args(args)
def _ensure_list_exists(fqdn_listname):
    """Create the list with server defaults unless the server already has it.

    Prints progress and server responses to stdout. Returns True when the list
    exists afterwards, False when the list collection could not be fetched or
    the list could not be created.
    """
    result = requests.get(REST_PATH + '/lists', auth=REST_AUTH)
    if not result.ok:
        print("cannot get list of lists")
        print(result)
        print(result.reason)
        return False

    # Mailman leaves the "entries" key out entirely when the collection is
    # empty (the normal state of a fresh install), so it must not be indexed
    # unconditionally.
    known_lists = {
        entry['fqdn_listname'] for entry in result.json().get('entries', [])
    }
    if fqdn_listname in known_lists:
        return True

    print("list not found: creating")
    result = requests.post(
        f"{REST_PATH}/lists", auth=REST_AUTH, json={"fqdn_listname": fqdn_listname}
    )
    if not result.ok:
        print("failed")
        print(result)
        print(result.reason)
        print(result.text)
        return False

    print("success")
    print(result)
    print(result.text)
    return True


def main(args):
    """Push a JSON list configuration into the local Mailman instance.

    Steps: print the banner, parse the arguments, load the JSON patch file,
    make sure the list exists (creating it if needed), PATCH every remaining
    config key one request at a time, then add owners and moderators and
    finally ban the addresses in ``ban_list``.

    Args:
        args: the full argument vector including the program name
            (``sys.argv``).

    Returns:
        1 when the patch file is missing or the list could not be
        found/created, 0 otherwise. Failures of individual config keys,
        members or bans are reported on stdout but do not change the result.
    """
    print(BANNER)
    pargs = parse_args(args[1:])

    if not pargs.patch.exists():
        print("need input patchfile (json)")
        return 1
    with pargs.patch.open() as inf:
        p = json.load(inf)

    # determine if list exists, if not create with defaults
    if not _ensure_list_exists(pargs.list):
        return 1

    # Owners, moderators and bans are not list-config attributes: take them
    # out of the patch and apply them through their own endpoints below.
    owners = p.pop("owner", [])
    moderators = p.pop("moderator", [])
    ban_list = p.pop("ban_list", [])

    # patch config, one key per request so a single bad key does not block
    # the rest
    cnt = 0
    for k, v in p.items():
        if isinstance(v, bool):
            # Booleans are sent in their str() form ("True"/"False") --
            # presumably what the config endpoint expects; confirm before
            # changing.
            v = str(v)
        result = requests.patch(
            f"{REST_PATH}/lists/{pargs.list}/config", auth=REST_AUTH, json={k: v}
        )
        if result.ok:
            cnt += 1
            print(f"*** [OK] {k} success")
            print(result)
            print(result.text)
            print("")
        else:
            print(f"*** [!!] {k} failed")
            print(result)
            print(result.reason)
            print(result.text)
            print("")
    # The three special keys were already removed above, so the number of
    # attempted keys is exactly len(p).
    print(f"- {cnt} / {len(p)} succeeded.")

    print("assigning roles")
    for user in owners:
        create_member(user, pargs.list, "owner")
    for user in moderators:
        create_member(user, pargs.list, "moderator")

    print("banning")
    for user in ban_list:
        create_ban(user, pargs.list)
    return 0
if __name__ == "__main__":
    # Run as a script: use main()'s return value as the process exit status.
    raise SystemExit(main(sys.argv))