# memberPlumbing/doorUpdater.py
#
# 149 lines
# 5.3 KiB
# Python

import sys
import requests
import csv
from io import StringIO
import urllib3
from passwords import *
# Requests to the door controllers are sent with verify=False (see doRequest),
# so silence the InsecureRequestWarning that urllib3 would otherwise report.
# NOTE(review): certificate verification is skipped on purpose -- presumably
# the controllers present self-signed certificates; confirm.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Column names, in order, of the CSV that makeDoor writes and uploads to the
# door controllers.
fieldnames = [
    "CardNumber",
    "CardFormat",
    "PinRequired",
    "PinCode",
    "ExtendedAccess",
    "ExpiryDate",
    "Forename",
    "Initial",
    "Surname",
    "Email",
    "Phone",
    "Custom1",
    "Custom2",
    "Schedule1",
    "Schedule2",
    "Schedule3",
    "Schedule4",
    "Schedule5",
    "Schedule6",
    "Schedule7",
    "Schedule8",
]
# Mapping of member levels to door-controller schedules.
# Each key is a membership level name; makeMember treats a member as holding
# that level when the member CSV column of the same name contains the name
# itself.  Each value is the schedule name written to the "Schedule1" column.
memberLevels = {
    "CMS Staff": "7x24",
    "CMS Weekends Only": "Weekends Only",
    "CMS Weekdays Only": "Weekdays Only",
    "CMS Unlimited": "Unlimited",
    "CMS Nights & Weekends": "Nights and Weekends",
}
# Door access permissions mapped to controller IP addresses.
# Each key is a door name; makeDoor looks up the member CSV column
# "Access <door name>?" for it.  Each value lists the IP address of every
# controller that receives that door's card list.
doors = {
    "Rental Studio": [
        "172.18.51.11",
    ],
    "Storage Closet": [
        "172.18.51.12",
    ],
    "Metal Shop": [
        "172.18.51.13",
    ],
    "Wood Shop": [
        "172.18.51.14",
        "172.18.51.15",
    ],
}
def getMembershipworksData():
    """Download the members CSV from the MembershipWorks API.

    Three requests are made in sequence: a login that yields a session
    token, a listing of member/staff account IDs, and a CSV export for those
    IDs.  If any step fails, the HTTP status and response body are printed
    and the process exits with status 1.

    Returns:
        The body of the CSV export response, as text.
    """
    apiRoot = "https://api.membershipworks.com/v1/"

    def abort(label, response):
        # Shared failure path: report the failing step, then stop the run.
        print(label, response.status_code, response.reason)
        print(response.text)
        sys.exit(1)

    # Step 1: log in.  A successful reply is JSON containing the token 'SF'.
    login = requests.post(
        apiRoot + 'usr',
        data={
            "_st": "all",
            "eml": MEMBERSHIPWORKS_USERNAME,
            "org": "10000",
            "pwd": MEMBERSHIPWORKS_PASSWORD,
        },
    )
    if login.status_code != 200 or 'SF' not in login.json():
        abort("MembershipWorks Login Error: ", login)
    token = login.json()['SF']

    # Step 2: list the IDs of the member and staff accounts.
    listing = requests.get(
        apiRoot + "ylp",
        params={
            "SF": token,
            # label IDs for members and staff
            "lbl": "5ae37979f033bfe8534f8799,5771675edcdf126302a2f6b9",
            "org": "15475",  # unknown
            "var": "_id,nam,ctc",
        },
    )
    if listing.status_code != 200 or 'usr' not in listing.json():
        abort("MembershipWorks User Listing Error: ", listing)
    idList = ",".join(user['_id'] for user in listing.json()['usr'])

    # Step 3: request the CSV export for those IDs.
    # TODO: maybe can just use previous get instead? would return JSON
    export = requests.post(
        apiRoot + "csv",
        params={"SF": token},
        data={
            "_rt": "946702800",  # unknown
            "mux": "",  # unknown
            "tid": idList,  # ids of members to get
            # which columns to get
            "var": "lvl,xws,xms,xsc,xrs,xfd,xac,phn,eml,lbl,xcf,nam",
        },
    )
    if export.status_code != 200:
        abort("MembershipWorks CSV Generation Error: ", export)
    return export.text
def makeMember(member, doorAuth):
    """Create an output CSV row for the member.

    Args:
        member: One row of the member CSV, as a dict keyed by column name.
        doorAuth: Name of the member CSV column that holds "Y" when the
            member may use the door being processed.

    Returns:
        A dict keyed by door-controller CSV field names, or None when the
        member has no usable access card number and must be skipped.
        "Schedule1" is set only when the member is authorised for the door,
        the account is not on hold, and exactly one entry of memberLevels
        matches; otherwise the row carries no schedule.
    """
    # Treat a missing (None) or whitespace-only card number like an empty
    # one, so a blank-card row is never sent to a controller.  Members
    # without a card are skipped silently.
    cardNumber = member["Access Card Number"]
    if cardNumber is None or cardNumber.strip() == "":
        return None

    out = {
        "Forename": member["First Name"],
        "Surname": member["Last Name"],
        "Initial": "",
        "CardNumber": cardNumber,
        "CardFormat": "A901146A-" + member["Access Card Facility Code"],
        "PinRequired": "0",
        "ExtendedAccess": "0",
        "ExpiryDate": "",
        "Email": member["Email"],
        "Phone": member["Phone"],
    }

    # The hold column is only consulted for members authorised on this door.
    if member[doorAuth] == "Y" \
            and member["Account on Hold"] != "Account on Hold":
        # A level applies when the column named after it contains its name.
        schedules = [schedule for level, schedule in memberLevels.items()
                     if member[level] == level]
        if len(schedules) == 1:
            out["Schedule1"] = schedules[0]
        else:
            # Ambiguous or missing level: warn and leave the row unscheduled.
            print(member["First Name"], member["Last Name"], "has no/too many member levels!")

    return out
def doRequest(ip, params=None, files=None):
    """Send a request to the door control import script.

    POSTs to https://<ip>/cgi-bin/import.cgi using HTTP digest
    authentication, a one second timeout, and no certificate verification.
    Any status other than 200 is printed along with the response body, and
    the process exits with status 1.

    Args:
        ip: Address of the door controller.
        params: Optional query-string parameters for the request.
        files: Optional multipart file payload for the request.
    """
    endpoint = 'https://' + ip + '/cgi-bin/import.cgi'
    credentials = requests.auth.HTTPDigestAuth(DOOR_USERNAME, DOOR_PASSWORD)
    response = requests.post(
        endpoint,
        params=params,
        files=files,
        auth=credentials,
        timeout=1,
        verify=False,  # ignore insecure SSL
    )
    if response.status_code == 200:
        return

    print("Door Updating Error: ", response.status_code, response.reason)
    print(response.text)
    sys.exit(1)
def updateDoor(doorIP, csv):
    """Do the update import procedure on a door control.

    Issues three requests in order through doRequest: "importInit",
    "importCardsPeople" (which uploads the CSV), and "importDone".

    Args:
        doorIP: Address of the door controller to update.
        csv: The CSV content to upload; makeDoor passes a StringIO rewound
            to its start.  Inside this function the name shadows the csv
            module.
    """
    uploadName = "cardspeopleschedule.csv"
    doRequest(doorIP, params={"task": "importInit"})
    doRequest(
        doorIP,
        params={"task": "importCardsPeople", "name": uploadName},
        files={"importCardsPeopleButton": (uploadName, csv, 'text/csv')},
    )
    doRequest(doorIP, params={"task": "importDone"})
def makeDoor(doorName, doorIPs, members):
    """Create a CSV for the given door and send it to its controllers.

    Every member is converted with makeMember using the column
    "Access <doorName>?"; members for which it returns None are left out.
    The finished CSV is printed, then uploaded to each controller in turn.

    Args:
        doorName: Name of the door, used to build the access column name.
        doorIPs: Addresses of the controllers that serve this door.
        members: Rows of the member CSV, as dicts keyed by column name.
    """
    accessColumn = "Access " + doorName + "?"
    buffer = StringIO()
    writer = csv.DictWriter(buffer, fieldnames)
    writer.writeheader()

    # Rows are produced lazily, so each member is converted just before its
    # row is written.
    rows = (makeMember(member, accessColumn) for member in members)
    writer.writerows(row for row in rows if row is not None)

    print(buffer.getvalue())

    for controllerIP in doorIPs:
        # Rewind so every controller receives the CSV from its first byte.
        buffer.seek(0)
        updateDoor(controllerIP, buffer)
def main():
    """Fetch the member CSV and update the controllers of every door."""
    memberCsv = getMembershipworksData()
    # Parse once up front; the same member rows are reused for each door.
    members = list(csv.DictReader(StringIO(memberCsv)))
    for doorName, controllerIPs in doors.items():
        makeDoor(doorName, controllerIPs, members)


# Run the update only when executed as a script, not when imported.
if __name__ == '__main__':
    main()