2022-02-28 16:06:10 -05:00
|
|
|
import csv
|
|
|
|
from io import StringIO
|
|
|
|
|
|
|
|
import requests
|
|
|
|
|
|
|
|
BASE_URL = "https://api.membershipworks.com"
|
|
|
|
|
|
|
|
# extracted from `SF._is.crm` in https://cdn.membershipworks.com/all.js
|
|
|
|
CRM = {
|
|
|
|
0: "Note",
|
|
|
|
4: "Profile Updated",
|
|
|
|
8: "Scheduled/Reminder Email",
|
|
|
|
9: "Renewal Notice",
|
|
|
|
10: "Join Date",
|
|
|
|
11: "Next Renewal Date",
|
|
|
|
12: "Membership Payment",
|
|
|
|
13: "Donation",
|
|
|
|
14: "Event Activity",
|
|
|
|
15: "Conversation",
|
|
|
|
16: "Contact Change",
|
|
|
|
17: "Label Change",
|
|
|
|
18: "Other Payment",
|
|
|
|
19: "Cart Payment",
|
|
|
|
20: "Payment Failed",
|
|
|
|
21: "Billing Updated",
|
|
|
|
22: "Form Checkout",
|
|
|
|
23: "Event Payment",
|
|
|
|
24: "Invoice",
|
|
|
|
25: "Invoice Payment",
|
|
|
|
26: "Renewal",
|
|
|
|
27: "Payment",
|
|
|
|
}
|
|
|
|
|
|
|
|
# Types of fields, extracted from a html snippet in all.js + some guessing
|
|
|
|
typ = {
|
|
|
|
1: "Text input",
|
|
|
|
2: "Password", # inferred from data
|
|
|
|
3: "Simple text area",
|
|
|
|
4: "Rich text area",
|
|
|
|
7: "Address",
|
|
|
|
8: "Check box",
|
|
|
|
9: "Select",
|
|
|
|
11: "Display value stored in field (ie. read only)",
|
|
|
|
12: "Required waiver/terms",
|
|
|
|
}
|
|
|
|
|
|
|
|
# more constants, this time extracted from the members csv export in all.js
|
|
|
|
staticFlags = {
|
|
|
|
"pos": {"lbl": "Position/relation (contacts)"},
|
|
|
|
"nte": {"lbl": "Admin note (contacts)"},
|
|
|
|
"pwd": {"lbl": "Password"},
|
|
|
|
"lgo": {"lbl": "Business card image URLs"},
|
|
|
|
"pfx": {"lbl": "Profile gallery image URLs"},
|
|
|
|
"lvl": {"lbl": "Membership levels"},
|
|
|
|
"aon": {"lbl": "Membership add-ons"},
|
|
|
|
"lbl": {"lbl": "Labels"},
|
|
|
|
"joi": {"lbl": "Join date"},
|
|
|
|
"end": {"lbl": "Renewal date"},
|
|
|
|
"spy": {"lbl": "Billing method"},
|
|
|
|
"rid": {"lbl": "Auto recurring billing ID"},
|
|
|
|
"ipa": {"lbl": "IP address"},
|
|
|
|
"_id": {"lbl": "Account ID"},
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
class MembershipWorksRemoteError(Exception):
|
|
|
|
def __init__(self, reason, r):
|
|
|
|
super().__init__(
|
|
|
|
f"Error when attempting {reason}: {r.status_code} {r.reason}\n{r.text}"
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
class MembershipWorks:
|
|
|
|
def __init__(self):
|
|
|
|
self.sess = requests.Session()
|
|
|
|
self.org_info = None
|
|
|
|
self.auth_token = None
|
|
|
|
self.org_num = None
|
|
|
|
|
|
|
|
def login(self, username, password):
|
|
|
|
"""Authenticate against the membershipworks api"""
|
|
|
|
r = self.sess.post(
|
|
|
|
BASE_URL + "/v2/account/session",
|
|
|
|
data={"eml": username, "pwd": password},
|
|
|
|
headers={"X-Org": "10000"},
|
|
|
|
)
|
|
|
|
if r.status_code != 200 or "SF" not in r.json():
|
|
|
|
raise MembershipWorksRemoteError("login", r)
|
|
|
|
self.org_info = r.json()
|
|
|
|
self.auth_token = self.org_info["SF"]
|
|
|
|
self.org_num = self.org_info["org"]
|
|
|
|
self.sess.headers.update(
|
|
|
|
{
|
|
|
|
"X-Org": str(self.org_num),
|
|
|
|
"X-Role": "admin",
|
|
|
|
"Authorization": "Bearer " + self.auth_token,
|
|
|
|
}
|
|
|
|
)
|
|
|
|
|
|
|
|
def _inject_auth(self, kwargs):
|
|
|
|
# TODO: should probably be a decorator or something
|
|
|
|
if self.auth_token is None:
|
|
|
|
raise RuntimeError("Not Logged in to MembershipWorks")
|
|
|
|
# add auth token to params
|
|
|
|
if "params" not in kwargs:
|
|
|
|
kwargs["params"] = {}
|
|
|
|
kwargs["params"]["SF"] = self.auth_token
|
|
|
|
|
|
|
|
def _get_v1(self, *args, **kwargs):
|
|
|
|
self._inject_auth(kwargs)
|
|
|
|
# TODO: should probably do some error handling in here
|
|
|
|
return requests.get(*args, **kwargs)
|
|
|
|
|
|
|
|
def _post_v1(self, *args, **kwargs):
|
|
|
|
self._inject_auth(kwargs)
|
|
|
|
# TODO: should probably do some error handling in here
|
|
|
|
return requests.post(*args, **kwargs)
|
|
|
|
|
|
|
|
def _all_fields(self):
|
|
|
|
"""Parse out a list of fields from the org data.
|
|
|
|
|
|
|
|
Is this terrible? Yes. Also, not dissimilar to how MW does it
|
|
|
|
in all.js.
|
|
|
|
"""
|
|
|
|
fields = staticFlags.copy()
|
|
|
|
|
|
|
|
# TODO: this will take the later option, if the same field
|
|
|
|
# used in mulitple places. I don't know which lbl is used for
|
|
|
|
# csv export
|
|
|
|
|
|
|
|
# anm: member signup, acc: member manage, adm: admin manage
|
|
|
|
for screen_type in ["anm", "acc", "adm"]:
|
|
|
|
for box in self.org_info["tpl"][screen_type]:
|
|
|
|
for element in box["box"]:
|
|
|
|
if type(element["dat"]) != str:
|
|
|
|
for field in element["dat"]:
|
|
|
|
if "_id" in field:
|
|
|
|
if field["_id"] not in fields:
|
|
|
|
fields[field["_id"]] = field
|
|
|
|
|
|
|
|
return fields
|
|
|
|
|
|
|
|
def _parse_flags(self):
|
|
|
|
"""Parse the flags out of the org data.
|
|
|
|
|
|
|
|
This is terrible, and there might be a better way to do this.
|
|
|
|
"""
|
|
|
|
ret = {"folders": {}, "levels": {}, "addons": {}, "labels": {}}
|
|
|
|
|
|
|
|
for dek in self.org_info["dek"]:
|
|
|
|
# TODO: there must be a better way. this is stupid
|
|
|
|
if dek["dek"] == 1:
|
|
|
|
ret["folders"][dek["lbl"]] = dek["did"]
|
|
|
|
elif "cur" in dek:
|
|
|
|
ret["levels"][dek["lbl"]] = dek["did"]
|
|
|
|
elif "mux" in dek:
|
|
|
|
ret["addons"][dek["lbl"]] = dek["did"]
|
|
|
|
else:
|
|
|
|
ret["labels"][dek["lbl"]] = dek["did"]
|
|
|
|
|
|
|
|
return ret
|
|
|
|
|
|
|
|
def get_member_ids(self, folders):
|
|
|
|
folder_map = self._parse_flags()["folders"]
|
|
|
|
|
|
|
|
r = self.sess.get(
|
|
|
|
BASE_URL + "/v2/accounts",
|
|
|
|
params={"dek": ",".join([folder_map[f] for f in folders])},
|
|
|
|
)
|
|
|
|
if r.status_code != 200 or "usr" not in r.json():
|
|
|
|
raise MembershipWorksRemoteError("user listing", r)
|
|
|
|
|
|
|
|
# get list of member ID matching the search
|
|
|
|
# dedup with set() to work around people with alt uids
|
|
|
|
# TODO: figure out why people have alt uids
|
|
|
|
return set(user["uid"] for user in r.json()["usr"])
|
|
|
|
|
|
|
|
# TODO: has issues with aliasing header names:
|
|
|
|
# ex: "Personal Studio Space" Label vs Membership Addon/Field
|
|
|
|
def get_members(self, folders, columns):
|
|
|
|
"""Pull the members csv from the membershipworks api
|
|
|
|
folders: a list of the names of the folders to get
|
|
|
|
(see folder_map in this function for mapping to ids)
|
|
|
|
columns: which columns to get"""
|
|
|
|
ids = self.get_member_ids(folders)
|
|
|
|
|
|
|
|
# get members CSV
|
|
|
|
# TODO: maybe can just use previous get instead? would return JSON
|
|
|
|
r = self._post_v1(
|
|
|
|
BASE_URL + "/v1/csv",
|
|
|
|
data={
|
|
|
|
"_rt": "946702800", # unknown
|
|
|
|
"mux": "", # unknown
|
|
|
|
"tid": ",".join(ids), # ids of members to get
|
|
|
|
"var": columns,
|
|
|
|
},
|
|
|
|
)
|
|
|
|
if r.status_code != 200:
|
|
|
|
raise MembershipWorksRemoteError("csv generation", r)
|
2022-05-31 12:31:25 -04:00
|
|
|
|
|
|
|
if r.text[0] == "\ufeff":
|
|
|
|
r.encoding = r.encoding + "-sig"
|
|
|
|
|
2022-02-28 16:06:10 -05:00
|
|
|
return list(csv.DictReader(StringIO(r.text)))
|
|
|
|
|
|
|
|
def get_transactions(self, start_date, end_date, json=False):
|
|
|
|
"""Get the transactions between start_date and end_date
|
|
|
|
|
|
|
|
Dates can be datetime.date or datetime.datetime
|
|
|
|
|
|
|
|
json gets a different version of the transactions list,
|
|
|
|
which contains a different set information
|
|
|
|
"""
|
|
|
|
r = self._get_v1(
|
|
|
|
BASE_URL + "/v1/csv",
|
|
|
|
params={
|
|
|
|
"crm": ",".join(str(k) for k in CRM.keys()),
|
|
|
|
**({"txl": ""} if json else {}),
|
|
|
|
"sdp": start_date.strftime("%s"),
|
|
|
|
"edp": end_date.strftime("%s"),
|
|
|
|
},
|
|
|
|
)
|
|
|
|
if r.status_code != 200:
|
|
|
|
raise MembershipWorksRemoteError("csv generation", r)
|
|
|
|
if json:
|
|
|
|
return r.json()
|
|
|
|
else:
|
2022-05-31 12:31:25 -04:00
|
|
|
if r.text[0] == "\ufeff":
|
|
|
|
r.encoding = r.encoding + "-sig"
|
|
|
|
|
2022-02-28 16:06:10 -05:00
|
|
|
return list(csv.DictReader(StringIO(r.text)))
|
|
|
|
|
|
|
|
def get_all_members(self):
|
|
|
|
"""Get all the data for all the members"""
|
|
|
|
folders = self._parse_flags()["folders"].keys()
|
|
|
|
fields = self._all_fields()
|
|
|
|
members = self.get_members(folders, ",".join(fields.keys()))
|
|
|
|
return members
|