#!/usr/bin/env python3 import sys import requests import csv from io import StringIO import urllib3 from hashlib import md5 import os from passwords import * # it's fine, ssl certs are for losers anyway urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) fieldnames = "CardNumber,CardFormat,PinRequired,PinCode,ExtendedAccess,ExpiryDate,Forename,Initial,Surname,Email,Phone,Custom1,Custom2,Schedule1,Schedule2,Schedule3,Schedule4,Schedule5,Schedule6,Schedule7,Schedule8".split(",") # mapping of member levels to schedules memberLevels = {"CMS Staff": "7x24", "CMS Weekends Only": "Weekends Only", "CMS Weekdays Only": "Weekdays Only", "CMS Unlimited": "Unlimited", "CMS Nights & Weekends": "Nights and Weekends"} # door access permissions to controller IP addresses doors = { "Rental Studio": ["172.18.51.11"], "Storage Closet": ["172.18.51.12"], "Metal Shop": ["172.18.51.13"], "Wood Shop": ["172.18.51.14", "172.18.51.15"] } def getMembershipworksData(): """ Pull the members csv from the membershipworks api """ BASE_URL = "https://api.membershipworks.com/v1/" # login r = requests.post(BASE_URL + 'usr', data={"_st": "all", "eml": MEMBERSHIPWORKS_USERNAME, "org": "10000", "pwd": MEMBERSHIPWORKS_PASSWORD}) if r.status_code != 200 or 'SF' not in r.json(): print("MembershipWorks Login Error: ", r.status_code, r.reason) print(r.text) sys.exit(1) SFtoken = r.json()['SF'] # get list of member/staff IDs r = requests.get(BASE_URL + "ylp", params={"SF": SFtoken, "lbl": "5ae37979f033bfe8534f8799,5771675edcdf126302a2f6b9", # members and staff "org": "15475", # unknown "var": "_id,nam,ctc"}) if r.status_code != 200 or 'usr' not in r.json(): print("MembershipWorks User Listing Error: ", r.status_code, r.reason) print(r.text) sys.exit(1) ids = [user['_id'] for user in r.json()['usr']] # get members CSV # TODO: maybe can just use previous get instead? would return JSON r = requests.post(BASE_URL + "csv", params={"SF": SFtoken}, data={"_rt": "946702800", # unknown "mux": "", # unknown "tid": ",".join(ids), # ids of members to get "var": "lvl,xws,xms,xsc,xrs,xfd,xac,phn,eml,lbl,xcf,nam"}) # which columns to get if r.status_code != 200: print("MembershipWorks CSV Generation Error: ", r.status_code, r.reason) print(r.text) sys.exit(1) newHash = md5(bytes(r.text, 'utf8')).hexdigest() if os.path.exists('/tmp/doorUpdaterLastHash'): with open('/tmp/doorUpdaterLastHash', 'r') as f: if newHash == f.read(): print("hashes are the same, not updating") sys.exit(0) with open('/tmp/doorUpdaterLastHash', 'w') as f: f.write(newHash) return r.text def makeMember(member, doorAuth): """Create an output CSV row for the member""" if member["Access Card Number"] == "": #print(member["First Name"], member["Last Name"], " has no card number, ignoring") return out = {"Forename": member["First Name"], "Surname": member["Last Name"], "Initial": "", "CardNumber": member["Access Card Number"], "CardFormat": "A901146A-" + member["Access Card Facility Code"], "PinRequired": "0", "ExtendedAccess": "0", "ExpiryDate": "", "Email": member["Email"], "Phone": member["Phone"]} if member[doorAuth] == "Y" \ and not member["Account on Hold"] == "Account on Hold": memberLevel = [v for k, v in memberLevels.items() if member[k] == k] if len(memberLevel) == 1: out["Schedule1"] = memberLevel[0] else: print(member["First Name"], member["Last Name"], "has no/too many member levels!") return out def doRequest(ip, params=None, files=None): """Send a request to the door control import script""" r = requests.post( 'https://' + ip + '/cgi-bin/import.cgi', params=params, files=files, auth=requests.auth.HTTPDigestAuth(DOOR_USERNAME, DOOR_PASSWORD), timeout=10, verify=False) # ignore insecure SSL if r.status_code != 200: print("Door Updating Error: ", r.status_code, r.reason) print(r.text) sys.exit(1) def updateDoor(doorIP, csv): """Do the update import procedure on a door control""" doRequest(doorIP, {"task": "importInit"}) doRequest(doorIP, {"task": "importCardsPeople", "name": "cardspeopleschedule.csv"}, {"importCardsPeopleButton": ("cardspeopleschedule.csv", csv, 'text/csv')}) doRequest(doorIP, {"task": "importDone"}) def makeDoor(doorName, doorIPs, members): """Create a CSV for the given door""" outString = StringIO() writer = csv.DictWriter(outString, fieldnames) writer.writeheader() for member in members: member = makeMember(member, "Access " + doorName + "?") if member is not None: writer.writerow(member) print(outString.getvalue()) for doorIP in doorIPs: outString.seek(0) updateDoor(doorIP, outString) pass def main(): data = getMembershipworksData() reader = csv.DictReader(StringIO(data)) members = list(reader) for doorName, doorIP in doors.items(): makeDoor(doorName, doorIP, members) if __name__ == '__main__': main()