mailman-sync/mailman_sync.py
Adam Goldsmith 8fac612dda Set moderation_action to accept for moderators/owners
this is apparently not automatic with Mailman 3
2024-05-08 14:46:05 -04:00

298 lines
9.1 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Update Mailman 3 lists via a json API of the form:
{
LIST: {
"config"?: {
"real_name": REAL_NAME,
"subject_prefix": PREFIX,
"reply_to_address": REPLY_TO_ADDRESS,
}
"moderators"?: [ADDRESS, ...],
"members": [ADDRESS, ...]
},
...
}
"""
import json
import os
import email.utils
import configargparse
import mailmanclient
import requests
def config_list(
list: mailmanclient.MailingList,
dry_run: bool,
real_name: str,
subject_prefix: str,
reply_to_address: str,
):
# TODO: some of this is announcement list specific, some not
config_changes = {
"display_name": real_name,
"subject_prefix": subject_prefix,
"reply_to_address": reply_to_address,
"reply_goes_to_list": "explicit_header_only",
"first_strip_reply_to": True,
# quiet member management
"send_welcome_message": False,
"send_goodbye_message": False,
"allow_list_posts": False,
"anonymous_list": True,
"archive_policy": "private",
"member_roster_visibility": "moderators",
"subscription_policy": "confirm_then_moderate",
"unsubscription_policy": "confirm",
"default_member_action": "hold",
"default_nonmember_action": "discard",
"advertised": False,
}
for setting, value in config_changes.items():
old_value = list.settings.get(setting)
if old_value != value:
print(f"Changed {setting}: {old_value} -> {value}")
if not dry_run:
list.settings.update(config_changes)
list.settings.save()
# TODO: this will never update someone's display name
def diff_roster(
expected_members: set[str], existing_members: set[mailmanclient.Member]
) -> tuple[dict[str, str], list[mailmanclient.Member]]:
expected_members_dict = dict(
list(reversed(email.utils.parseaddr(member))) for member in expected_members
)
existing_members_dict = {str(member.address): member for member in existing_members}
members_to_add = {
k: expected_members_dict[k]
for k in set(expected_members_dict) - set(existing_members_dict)
}
members_to_remove = [
existing_members_dict[k]
for k in set(existing_members_dict) - set(expected_members_dict)
]
return members_to_add, members_to_remove
def sync_members(
list: mailmanclient.MailingList, dry_run: bool, expected_members: set[str]
):
members_to_add, members_to_remove = diff_roster(expected_members, set(list.members))
for address, display_name in members_to_add.items():
print(f"Adding '{display_name} <{address}>' to list {list}")
if not dry_run:
list.subscribe(
address,
display_name,
pre_verified=True,
pre_confirmed=True,
pre_approved=True,
send_welcome_message=False,
)
# TODO: could use `.mass_unsubscribe()` instead
for member in members_to_remove:
print(f"Removing {member} from list {list}")
if not dry_run:
list.unsubscribe(member.address, pre_approved=True, pre_confirmed=True)
def sync_moderators(
list: mailmanclient.MailingList, dry_run: bool, expected_members: set[str]
):
members_to_add, members_to_remove = diff_roster(
expected_members, set(list.moderators)
)
for address, display_name in members_to_add.items():
print(f"Adding '{display_name} <{address}>' as moderator to list {list}")
if not dry_run:
list.add_moderator(address, display_name)
for member in members_to_remove:
print(f"Removing {member} as moderator from list {list}")
if not dry_run:
list.remove_moderator(member.address.email)
def fixup_moderation_actions(list: mailmanclient.MailingList, dry_run: bool):
for member in list.members:
if list.is_owner_or_mod(member.address):
new_moderation_action = "accept"
else:
new_moderation_action = None
if member.moderation_action != new_moderation_action:
print(
f"Updating Moderation action for {member} ({member.moderation_action} -> {new_moderation_action}) in list {list}"
)
if not dry_run:
member.moderation_action = new_moderation_action
member.save()
def update_cloudflare_lists(
dry_run: bool,
auth_token: str,
account_id: str,
script_name: str,
binding_name: str,
expected: list[str],
):
settings_api_url = f"https://api.cloudflare.com/client/v4/accounts/{account_id}/workers/scripts/{script_name}/settings"
r = requests.get(
settings_api_url,
headers={"Authorization": auth_token},
)
data = r.json()
# TODO: more error handling
if not data["success"]:
print("Failed to fetch Cloudflare email worker settings")
bindings_dict = {b["name"]: b for b in data["result"]["bindings"]}
if set(bindings_dict.get(binding_name, {}).get("json", [])) != set(expected):
print("Cloudflare email worker list out of date, updating")
bindings_dict[binding_name] = {
"name": binding_name,
"json": expected,
"type": "json",
}
if not dry_run:
r = requests.patch(
settings_api_url,
headers={"Authorization": auth_token},
files={
"settings": json.dumps({"bindings": list(bindings_dict.values())}),
},
)
else:
print("Cloudflare email worker list already up to date")
def main(
api: str,
api_auth: str,
mailman_client: mailmanclient.Client,
dry_run: bool,
mail_host: str,
cf_auth_token: str,
cf_account_id: str,
cf_script_name: str,
cf_binding_name: str,
):
r = requests.get(api, headers={"Authorization": api_auth})
if not r.ok:
print(f"Failed to get mailing list data from api: {r.status_code} {r.text}")
return
expected_lists = r.json()
domain = mailman_client.get_domain(mail_host)
existing_lists = {list.list_name.lower(): list for list in domain.get_lists()}
for name, props in expected_lists.items():
if name.lower() in existing_lists:
list = existing_lists[name.lower()]
elif dry_run:
print(f"Skipping non-existing list {name} in dry run mode")
continue
else:
list = domain.create_list(name, props.get("style"))
if "config" not in props:
print(f"Not configuring {name}, as it has no config section")
else:
print(f"Configuring/syncing {name}...")
config_list(list, dry_run, **props["config"])
sync_members(
list, dry_run, set(props["members"]) | set(props.get("moderators", []))
)
if "moderators" in props:
sync_moderators(list, dry_run, set(props["moderators"]))
fixup_moderation_actions(list, dry_run)
lists = [list.list_name.lower() for list in domain.get_lists()]
update_cloudflare_lists(
dry_run, cf_auth_token, cf_account_id, cf_script_name, cf_binding_name, lists
)
def parse_arguments():
argp = configargparse.ArgumentParser(description=__doc__)
argp.add("-c", "--config", is_config_file=True, help="Config file path")
argp.add_argument("--api", required=True, help="API endpoint to retrieve JSON from")
argp.add_argument(
"--api-auth", required=True, help="API Authorization header token"
)
argp.add_argument("--mail-host", help="Base domain for all lists", required=True)
argp.add_argument(
"--mailman-url",
help="base URL for for Mailman3 REST API",
default="http://localhost:9001",
)
argp.add_argument(
"--mailman-user", help="Username for Mailman3 REST API", default="restadmin"
)
argp.add_argument(
"--mailman-pass", help="Password for Mailman3 REST API", required=True
)
argp.add_argument(
"--cf-auth-token", help="Auth token for CloudFlare API", required=True
)
argp.add_argument(
"--cf-account-id",
help="Account ID of CloudFlare email worker script",
required=True,
)
argp.add_argument(
"--cf-script-name", help="Name of CloudFlare email worker script", required=True
)
argp.add_argument(
"--cf-binding-name",
help="Name of environment variable in email worker script",
default="MAILMAN_LISTS",
)
argp.add_argument(
"-n",
"--dry-run",
action="store_true",
help="Don't make changes, just print what would happen",
)
return argp.parse_args()
if __name__ == "__main__":
args = parse_arguments()
mailman_client = mailmanclient.Client(
args.mailman_url + "/3.1",
args.mailman_user,
args.mailman_pass,
)
main(
args.api,
args.api_auth,
mailman_client,
args.dry_run,
args.mail_host,
args.cf_auth_token,
args.cf_account_id,
args.cf_script_name,
args.cf_binding_name,
)