From 9d376a87b1597df1f8cd6e1bab9dea21109ee6d3 Mon Sep 17 00:00:00 2001 From: Adam Goldsmith Date: Thu, 16 Aug 2018 14:01:42 -0400 Subject: [PATCH] Move door controller code into a class --- .gitignore | 2 +- common.py | 52 +++-------------------- doorUpdater.py | 22 +++++----- doorUtil.py | 53 +++-------------------- events.py | 20 ++++----- hid/DoorController.py | 98 +++++++++++++++++++++++++++++++++++++++++++ membershipViewer.py | 4 +- 7 files changed, 133 insertions(+), 118 deletions(-) create mode 100644 hid/DoorController.py diff --git a/.gitignore b/.gitignore index 08b3f43..b31a110 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,2 @@ -/__pycache__/ +__pycache__/ /passwords.py diff --git a/common.py b/common.py index 11bd9e2..cf64ff3 100644 --- a/common.py +++ b/common.py @@ -4,16 +4,11 @@ import urllib3 import os import sys from io import StringIO -from lxml import etree -from lxml.builder import ElementMaker import requests +from hid.DoorController import DoorController from passwords import * -E_plain = ElementMaker(nsmap={"hid": "http://www.hidglobal.com/VertX"}) -E = ElementMaker(namespace="http://www.hidglobal.com/VertX", - nsmap={"hid": "http://www.hidglobal.com/VertX"}) - # it's fine, ssl certs are for losers anyway urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) @@ -23,6 +18,11 @@ try: except NameError: config = json.load(open("config.json")) +doors = {doorName: DoorController(doorData['ip'], + DOOR_USERNAME, DOOR_PASSWORD, + name=doorName, access=doorData['access']) + for doorName, doorData in config["doors"].items()} + fieldnames = "CardNumber,CardFormat,PinRequired,PinCode,ExtendedAccess,ExpiryDate,Forename,Initial,Surname,Email,Phone,Custom1,Custom2,Schedule1,Schedule2,Schedule3,Schedule4,Schedule5,Schedule6,Schedule7,Schedule8".split(",") def getMembershipworksData(folders, columns): @@ -73,43 +73,3 @@ def getMembershipworksData(folders, columns): sys.exit(1) return list(csv.DictReader(StringIO(r.text))) - -def doImportRequest(ip, params=None, files=None): - """Send a request to the door control import script""" - r = requests.post( - 'https://' + ip + '/cgi-bin/import.cgi', - params=params, - files=files, - auth=requests.auth.HTTPDigestAuth(DOOR_USERNAME, DOOR_PASSWORD), - timeout=60, - verify=False) # ignore insecure SSL - xml = etree.XML(r.content) - if r.status_code != 200 \ - or len(xml.findall("{http://www.hidglobal.com/VertX}Error")) > 0: - print("Door Updating Error: ", r.status_code, r.reason) - print(r.text) - sys.exit(1) - -def doCSVImport(doorIP, csv): - """Do the CSV import procedure on a door control""" - doImportRequest(doorIP, {"task": "importInit"}) - doImportRequest(doorIP, - {"task": "importCardsPeople", "name": "cardspeopleschedule.csv"}, - {"importCardsPeopleButton": ("cardspeopleschedule.csv", csv, 'text/csv')}) - doImportRequest(doorIP, {"task": "importDone"}) - -def doXMLRequest(doorIP, xml, prefix=b''): - if not isinstance(xml, str): xml = etree.tostring(xml) - r = requests.get( - 'https://' + doorIP + '/cgi-bin/vertx_xml.cgi', - params={'XML': prefix + xml}, - auth=requests.auth.HTTPDigestAuth(DOOR_USERNAME, DOOR_PASSWORD), - verify=False) - resp_xml = etree.XML(r.content) - # probably meed to be more sane about this - if r.status_code != 200 \ - or len(resp_xml.findall("{*}Error")) > 0: - print("Door Updating Error: ", r.status_code, r.reason) - print(r.text) - sys.exit(1) - return resp_xml diff --git a/doorUpdater.py b/doorUpdater.py index d3db6a4..69003b8 100755 --- a/doorUpdater.py +++ b/doorUpdater.py @@ -44,29 +44,29 @@ def makeMember(member, doorAuth): return out -def makeDoor(doorName, doorData, members, hashes): +def makeDoor(door, members, hashes): """Create a CSV for the given door""" outString = StringIO() writer = csv.DictWriter(outString, fieldnames) writer.writeheader() for member in members: - member = makeMember(member, "Access " + doorData["access"] + "?") + member = makeMember(member, "Access " + door.access + "?") if member is not None: writer.writerow(member) import datetime as DT timestamp = DT.datetime.now().strftime("%Y-%m-%d %H:%M:%S") - with open("/tmp/" + doorName + timestamp + ".csv", "w") as f: + with open("/tmp/" + door.name + timestamp + ".csv", "w") as f: f.write(outString.getvalue()) outString.seek(0) doorHash = md5(bytes(outString.getvalue(), 'utf8')).hexdigest() - if doorHash == hashes.get(doorName): - print("Door", doorName, "not changed, not updating") + if doorHash == hashes.get(door.name): + print("Door", door.name, "not changed, not updating") else: - print("Door", doorName, "changed, trying to update") - hashes[doorName] = doorHash - doCSVImport(doorData["ip"], outString) + print("Door", door.name, "changed, trying to update") + hashes[door.name] = doorHash + door.doCSVImport(outString) # write out hash if we sucessfully updated this door with open('/tmp/doorUpdaterLastHash', 'w') as f: json.dump(hashes, f) @@ -83,9 +83,9 @@ def main(): else: hashes = {} - for doorName, doorData in config["doors"].items(): - print(doorName, doorData) - makeDoor(doorName, doorData, members, hashes) + for door in doors.values(): + print(door.name, door.ip) + makeDoor(door, members, hashes) if __name__ == '__main__': main() diff --git a/doorUtil.py b/doorUtil.py index 1feb77f..663fef3 100644 --- a/doorUtil.py +++ b/doorUtil.py @@ -16,53 +16,10 @@ def codeToHex(facility, code): # hexToCode("01E29DA1") <-> codeToHex(241, 20176) -def sendSchedule(target_ip): - # clear all people - outString = StringIO() - writer = csv.DictWriter(outString, fieldnames) - writer.writeheader() - writer.writerow({}) - outString.seek(0) - doCSVImport(target_ip, outString) - - # clear all schedules - delXML = E_plain.VertXMessage( - *[E.Schedules({"action": "DD", "scheduleID": str(ii)}) - for ii in range(1, 8)]) - doXMLRequest(target_ip, delXML) - - # load new schedules - with open("schedules.xml", "rb") as f: - doXMLRequest(target_ip, f.read()) - -def sendCardFormat(targetIP, formatName, templateID, facilityCode): - # TODO: add delete formats - # delete example: - - el = E_plain.VertXMessage( - E.CardFormats({"action": "AD"}, - E.CardFormat({"formatName": formatName, - "templateID": str(templateID)} - E.FixedField({"value": str(facilityCode)})))) - return doXMLRequest(targetIP, el) - -def lockOrUnlockDoor(targetIP, lock=True): - el = E_plain.VertXMessage( - E.Doors({"action": "CM", - "command": "lockDoor" if lock else "unlockDoor"})) - return doXMLRequest(targetIP, el) - -def getStatus(targetIP): - el = E_plain.VertXMessage( - E.Doors({"action": "LR", "responseFormat": "status"})) - xml = doXMLRequest(targetIP, el) - relayState = xml.find('./{*}Doors/{*}Door').attrib['relayState'] - return "unlocked" if relayState == "set" else "locked" - def forEachDoor(fxn): - for doorName, doorData in config["doors"].items(): - print(doorName) - fxn(doorName, doorData) + for door in doors.values(): + print(door.name) + fxn(door) -#forEachDoor(lambda name, data: sendCardFormat(data["ip"], "A901146A-244", 1, 244)) -#forEachDoor(lambda name, data: sendSchedule(data["ip"])) +#forEachDoor(lambda door: door.sendCardFormat("A901146A-244", 1, 244)) +#forEachDoor(lambda door: door.sendSchedules()) diff --git a/events.py b/events.py index e90aa2f..216debc 100755 --- a/events.py +++ b/events.py @@ -7,27 +7,27 @@ import requests from common import * -def getStrings(targetIP): +def getStrings(door): """Parses out the message strings from source.""" - r = requests.get('https://' + targetIP + '/html/en_EN/en_EN.js', - auth=requests.auth.HTTPDigestAuth(DOOR_USERNAME, DOOR_PASSWORD), + r = requests.get('https://' + door.ip + '/html/en_EN/en_EN.js', + auth=requests.auth.HTTPDigestAuth(door.username, door.password), verify=False) regex = re.compile(r'([0-9]+)="([^"]*)') strings = [regex.search(s) for s in r.text.split(';') if s.startswith('localeStrings.eventDetails')] print({int(g.group(1)): g.group(2) for g in strings}) -def getMessages(doorName, doorIP): +def getMessages(door): # get parameters for messages to get? # honestly not really sure why this is required, their API is confusing parXMLIn = E_plain.VertXMessage( E.EventMessages({"action": "LR"})) - parXMLOut = doXMLRequest(doorIP, parXMLIn) + parXMLOut = door.doXMLRequest(parXMLIn) etree.dump(parXMLOut) - if os.path.exists("logs/" + doorName + ".xml"): + if os.path.exists("logs/" + door.name + ".xml"): # read last log - tree = etree.ElementTree(file="logs/" + doorName + ".xml") + tree = etree.ElementTree(file="logs/" + door.name + ".xml") root = tree.getroot() recordCount = int(parXMLOut[0].attrib["historyRecordMarker"]) - \ int(root[0][0].attrib["recordMarker"]) @@ -46,7 +46,7 @@ def getMessages(doorName, doorIP): "recordCount": str(recordCount), "historyRecordMarker": parXMLOut[0].attrib["historyRecordMarker"], "historyTimestamp": parXMLOut[0].attrib["historyTimestamp"]})) - eventsXMLOut = doXMLRequest(doorIP, eventsXMLIn) + eventsXMLOut = door.doXMLRequest(eventsXMLIn) #TODO: handle modeRecords=true for index, event in enumerate(eventsXMLOut[0]): @@ -60,8 +60,8 @@ def getMessages(doorName, doorIP): tree.write("logs/" + doorName + ".xml") def main(): - for doorName, doorData in config["doors"].items(): - getMessages(doorName, doorData["ip"]) + for door in doors.values(): + getMessages(door) if __name__ == '__main__': main() diff --git a/hid/DoorController.py b/hid/DoorController.py new file mode 100644 index 0000000..cae8962 --- /dev/null +++ b/hid/DoorController.py @@ -0,0 +1,98 @@ +import sys +from lxml import etree +from lxml.builder import ElementMaker +import requests + +E_plain = ElementMaker(nsmap={"hid": "http://www.hidglobal.com/VertX"}) +E = ElementMaker(namespace="http://www.hidglobal.com/VertX", + nsmap={"hid": "http://www.hidglobal.com/VertX"}) + +class DoorController(): + def __init__(self, ip, username, password, name="", access=""): + self.ip = ip + self.username = username + self.password = password + self.name = name + self.access = access + + def doImportRequest(self, params=None, files=None): + """Send a request to the door control import script""" + r = requests.post( + 'https://' + self.ip + '/cgi-bin/import.cgi', + params=params, + files=files, + auth=requests.auth.HTTPDigestAuth(self.username, self.password), + timeout=60, + verify=False) # ignore insecure SSL + xml = etree.XML(r.content) + if r.status_code != 200 \ + or len(xml.findall("{http://www.hidglobal.com/VertX}Error")) > 0: + print("Door Updating Error: ", r.status_code, r.reason) + print(r.text) + sys.exit(1) + + def doCSVImport(self, csv): + """Do the CSV import procedure on a door control""" + self.doImportRequest({"task": "importInit"}) + self.doImportRequest({"task": "importCardsPeople", "name": "cardspeopleschedule.csv"}, + {"importCardsPeopleButton": ("cardspeopleschedule.csv", csv, 'text/csv')}) + self.doImportRequest({"task": "importDone"}) + + def doXMLRequest(self, xml, prefix=b''): + if not isinstance(xml, str): xml = etree.tostring(xml) + r = requests.get( + 'https://' + self.ip + '/cgi-bin/vertx_xml.cgi', + params={'XML': prefix + xml}, + auth=requests.auth.HTTPDigestAuth(self.username, self.password), + verify=False) + resp_xml = etree.XML(r.content) + # probably meed to be more sane about this + if r.status_code != 200 \ + or len(resp_xml.findall("{*}Error")) > 0: + print("Door Updating Error: ", r.status_code, r.reason) + print(r.text) + sys.exit(1) + return resp_xml + + def sendSchedules(self): + # clear all people + outString = StringIO() + writer = csv.DictWriter(outString, fieldnames) + writer.writeheader() + writer.writerow({}) + outString.seek(0) + self.doCSVImport(outString) + + # clear all schedules + delXML = E_plain.VertXMessage( + *[E.Schedules({"action": "DD", "scheduleID": str(ii)}) + for ii in range(1, 8)]) + self.doXMLRequest(delXML) + + # load new schedules + with open("schedules.xml", "rb") as f: + self.doXMLRequest(f.read()) + + def sendCardFormat(self, formatName, templateID, facilityCode): + # TODO: add delete formats + # delete example: + + el = E_plain.VertXMessage( + E.CardFormats({"action": "AD"}, + E.CardFormat({"formatName": formatName, + "templateID": str(templateID)}, + E.FixedField({"value": str(facilityCode)})))) + return self.doXMLRequest(el) + + def lockOrUnlockDoor(self, lock=True): + el = E_plain.VertXMessage( + E.Doors({"action": "CM", + "command": "lockDoor" if lock else "unlockDoor"})) + return self.doXMLRequest(el) + + def getStatus(self): + el = E_plain.VertXMessage( + E.Doors({"action": "LR", "responseFormat": "status"})) + xml = self.doXMLRequest(el) + relayState = xml.find('./{*}Doors/{*}Door').attrib['relayState'] + return "unlocked" if relayState == "set" else "locked" diff --git a/membershipViewer.py b/membershipViewer.py index 8b99177..a905d8f 100644 --- a/membershipViewer.py +++ b/membershipViewer.py @@ -7,7 +7,7 @@ from flask import Flask, render_template, request app = Flask(__name__) from common import * -from doorUtil import lockOrUnlockDoor +from hid.DoorController import DoorController def parse_list(member, regex): data_list = [] @@ -64,7 +64,7 @@ def main(): @app.route('/frontDoor/', methods=['POST']) def unlockLockDoor(lock): - lockOrUnlockDoor(config['doors']['Front Door']['ip'], lock != 'unlock') + doors['Front Door'].lockOrUnlockDoor(lock != 'unlock') return ('', http.HTTPStatus.NO_CONTENT) if __name__ == "__main__":