Move door controller code into a class

This commit is contained in:
Adam Goldsmith 2018-08-16 14:01:42 -04:00
parent 3787021ffe
commit 9d376a87b1
7 changed files with 133 additions and 118 deletions

2
.gitignore vendored
View File

@ -1,2 +1,2 @@
/__pycache__/ __pycache__/
/passwords.py /passwords.py

View File

@ -4,16 +4,11 @@ import urllib3
import os import os
import sys import sys
from io import StringIO from io import StringIO
from lxml import etree
from lxml.builder import ElementMaker
import requests import requests
from hid.DoorController import DoorController
from passwords import * from passwords import *
E_plain = ElementMaker(nsmap={"hid": "http://www.hidglobal.com/VertX"})
E = ElementMaker(namespace="http://www.hidglobal.com/VertX",
nsmap={"hid": "http://www.hidglobal.com/VertX"})
# it's fine, ssl certs are for losers anyway # it's fine, ssl certs are for losers anyway
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
@ -23,6 +18,11 @@ try:
except NameError: except NameError:
config = json.load(open("config.json")) config = json.load(open("config.json"))
doors = {doorName: DoorController(doorData['ip'],
DOOR_USERNAME, DOOR_PASSWORD,
name=doorName, access=doorData['access'])
for doorName, doorData in config["doors"].items()}
fieldnames = "CardNumber,CardFormat,PinRequired,PinCode,ExtendedAccess,ExpiryDate,Forename,Initial,Surname,Email,Phone,Custom1,Custom2,Schedule1,Schedule2,Schedule3,Schedule4,Schedule5,Schedule6,Schedule7,Schedule8".split(",") fieldnames = "CardNumber,CardFormat,PinRequired,PinCode,ExtendedAccess,ExpiryDate,Forename,Initial,Surname,Email,Phone,Custom1,Custom2,Schedule1,Schedule2,Schedule3,Schedule4,Schedule5,Schedule6,Schedule7,Schedule8".split(",")
def getMembershipworksData(folders, columns): def getMembershipworksData(folders, columns):
@ -73,43 +73,3 @@ def getMembershipworksData(folders, columns):
sys.exit(1) sys.exit(1)
return list(csv.DictReader(StringIO(r.text))) return list(csv.DictReader(StringIO(r.text)))
def doImportRequest(ip, params=None, files=None):
"""Send a request to the door control import script"""
r = requests.post(
'https://' + ip + '/cgi-bin/import.cgi',
params=params,
files=files,
auth=requests.auth.HTTPDigestAuth(DOOR_USERNAME, DOOR_PASSWORD),
timeout=60,
verify=False) # ignore insecure SSL
xml = etree.XML(r.content)
if r.status_code != 200 \
or len(xml.findall("{http://www.hidglobal.com/VertX}Error")) > 0:
print("Door Updating Error: ", r.status_code, r.reason)
print(r.text)
sys.exit(1)
def doCSVImport(doorIP, csv):
"""Do the CSV import procedure on a door control"""
doImportRequest(doorIP, {"task": "importInit"})
doImportRequest(doorIP,
{"task": "importCardsPeople", "name": "cardspeopleschedule.csv"},
{"importCardsPeopleButton": ("cardspeopleschedule.csv", csv, 'text/csv')})
doImportRequest(doorIP, {"task": "importDone"})
def doXMLRequest(doorIP, xml, prefix=b'<?xml version="1.0" encoding="UTF-8"?>'):
if not isinstance(xml, str): xml = etree.tostring(xml)
r = requests.get(
'https://' + doorIP + '/cgi-bin/vertx_xml.cgi',
params={'XML': prefix + xml},
auth=requests.auth.HTTPDigestAuth(DOOR_USERNAME, DOOR_PASSWORD),
verify=False)
resp_xml = etree.XML(r.content)
# probably meed to be more sane about this
if r.status_code != 200 \
or len(resp_xml.findall("{*}Error")) > 0:
print("Door Updating Error: ", r.status_code, r.reason)
print(r.text)
sys.exit(1)
return resp_xml

View File

@ -44,29 +44,29 @@ def makeMember(member, doorAuth):
return out return out
def makeDoor(doorName, doorData, members, hashes): def makeDoor(door, members, hashes):
"""Create a CSV for the given door""" """Create a CSV for the given door"""
outString = StringIO() outString = StringIO()
writer = csv.DictWriter(outString, fieldnames) writer = csv.DictWriter(outString, fieldnames)
writer.writeheader() writer.writeheader()
for member in members: for member in members:
member = makeMember(member, "Access " + doorData["access"] + "?") member = makeMember(member, "Access " + door.access + "?")
if member is not None: if member is not None:
writer.writerow(member) writer.writerow(member)
import datetime as DT import datetime as DT
timestamp = DT.datetime.now().strftime("%Y-%m-%d %H:%M:%S") timestamp = DT.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
with open("/tmp/" + doorName + timestamp + ".csv", "w") as f: with open("/tmp/" + door.name + timestamp + ".csv", "w") as f:
f.write(outString.getvalue()) f.write(outString.getvalue())
outString.seek(0) outString.seek(0)
doorHash = md5(bytes(outString.getvalue(), 'utf8')).hexdigest() doorHash = md5(bytes(outString.getvalue(), 'utf8')).hexdigest()
if doorHash == hashes.get(doorName): if doorHash == hashes.get(door.name):
print("Door", doorName, "not changed, not updating") print("Door", door.name, "not changed, not updating")
else: else:
print("Door", doorName, "changed, trying to update") print("Door", door.name, "changed, trying to update")
hashes[doorName] = doorHash hashes[door.name] = doorHash
doCSVImport(doorData["ip"], outString) door.doCSVImport(outString)
# write out hash if we sucessfully updated this door # write out hash if we sucessfully updated this door
with open('/tmp/doorUpdaterLastHash', 'w') as f: with open('/tmp/doorUpdaterLastHash', 'w') as f:
json.dump(hashes, f) json.dump(hashes, f)
@ -83,9 +83,9 @@ def main():
else: else:
hashes = {} hashes = {}
for doorName, doorData in config["doors"].items(): for door in doors.values():
print(doorName, doorData) print(door.name, door.ip)
makeDoor(doorName, doorData, members, hashes) makeDoor(door, members, hashes)
if __name__ == '__main__': if __name__ == '__main__':
main() main()

View File

@ -16,53 +16,10 @@ def codeToHex(facility, code):
# hexToCode("01E29DA1") <-> codeToHex(241, 20176) # hexToCode("01E29DA1") <-> codeToHex(241, 20176)
def sendSchedule(target_ip):
# clear all people
outString = StringIO()
writer = csv.DictWriter(outString, fieldnames)
writer.writeheader()
writer.writerow({})
outString.seek(0)
doCSVImport(target_ip, outString)
# clear all schedules
delXML = E_plain.VertXMessage(
*[E.Schedules({"action": "DD", "scheduleID": str(ii)})
for ii in range(1, 8)])
doXMLRequest(target_ip, delXML)
# load new schedules
with open("schedules.xml", "rb") as f:
doXMLRequest(target_ip, f.read())
def sendCardFormat(targetIP, formatName, templateID, facilityCode):
# TODO: add delete formats
# delete example: <hid:CardFormats action="DD" formatID="7-1-244"/>
el = E_plain.VertXMessage(
E.CardFormats({"action": "AD"},
E.CardFormat({"formatName": formatName,
"templateID": str(templateID)}
E.FixedField({"value": str(facilityCode)}))))
return doXMLRequest(targetIP, el)
def lockOrUnlockDoor(targetIP, lock=True):
el = E_plain.VertXMessage(
E.Doors({"action": "CM",
"command": "lockDoor" if lock else "unlockDoor"}))
return doXMLRequest(targetIP, el)
def getStatus(targetIP):
el = E_plain.VertXMessage(
E.Doors({"action": "LR", "responseFormat": "status"}))
xml = doXMLRequest(targetIP, el)
relayState = xml.find('./{*}Doors/{*}Door').attrib['relayState']
return "unlocked" if relayState == "set" else "locked"
def forEachDoor(fxn): def forEachDoor(fxn):
for doorName, doorData in config["doors"].items(): for door in doors.values():
print(doorName) print(door.name)
fxn(doorName, doorData) fxn(door)
#forEachDoor(lambda name, data: sendCardFormat(data["ip"], "A901146A-244", 1, 244)) #forEachDoor(lambda door: door.sendCardFormat("A901146A-244", 1, 244))
#forEachDoor(lambda name, data: sendSchedule(data["ip"])) #forEachDoor(lambda door: door.sendSchedules())

View File

@ -7,27 +7,27 @@ import requests
from common import * from common import *
def getStrings(targetIP): def getStrings(door):
"""Parses out the message strings from source.""" """Parses out the message strings from source."""
r = requests.get('https://' + targetIP + '/html/en_EN/en_EN.js', r = requests.get('https://' + door.ip + '/html/en_EN/en_EN.js',
auth=requests.auth.HTTPDigestAuth(DOOR_USERNAME, DOOR_PASSWORD), auth=requests.auth.HTTPDigestAuth(door.username, door.password),
verify=False) verify=False)
regex = re.compile(r'([0-9]+)="([^"]*)') regex = re.compile(r'([0-9]+)="([^"]*)')
strings = [regex.search(s) for s in r.text.split(';') strings = [regex.search(s) for s in r.text.split(';')
if s.startswith('localeStrings.eventDetails')] if s.startswith('localeStrings.eventDetails')]
print({int(g.group(1)): g.group(2) for g in strings}) print({int(g.group(1)): g.group(2) for g in strings})
def getMessages(doorName, doorIP): def getMessages(door):
# get parameters for messages to get? # get parameters for messages to get?
# honestly not really sure why this is required, their API is confusing # honestly not really sure why this is required, their API is confusing
parXMLIn = E_plain.VertXMessage( parXMLIn = E_plain.VertXMessage(
E.EventMessages({"action": "LR"})) E.EventMessages({"action": "LR"}))
parXMLOut = doXMLRequest(doorIP, parXMLIn) parXMLOut = door.doXMLRequest(parXMLIn)
etree.dump(parXMLOut) etree.dump(parXMLOut)
if os.path.exists("logs/" + doorName + ".xml"): if os.path.exists("logs/" + door.name + ".xml"):
# read last log # read last log
tree = etree.ElementTree(file="logs/" + doorName + ".xml") tree = etree.ElementTree(file="logs/" + door.name + ".xml")
root = tree.getroot() root = tree.getroot()
recordCount = int(parXMLOut[0].attrib["historyRecordMarker"]) - \ recordCount = int(parXMLOut[0].attrib["historyRecordMarker"]) - \
int(root[0][0].attrib["recordMarker"]) int(root[0][0].attrib["recordMarker"])
@ -46,7 +46,7 @@ def getMessages(doorName, doorIP):
"recordCount": str(recordCount), "recordCount": str(recordCount),
"historyRecordMarker": parXMLOut[0].attrib["historyRecordMarker"], "historyRecordMarker": parXMLOut[0].attrib["historyRecordMarker"],
"historyTimestamp": parXMLOut[0].attrib["historyTimestamp"]})) "historyTimestamp": parXMLOut[0].attrib["historyTimestamp"]}))
eventsXMLOut = doXMLRequest(doorIP, eventsXMLIn) eventsXMLOut = door.doXMLRequest(eventsXMLIn)
#TODO: handle modeRecords=true #TODO: handle modeRecords=true
for index, event in enumerate(eventsXMLOut[0]): for index, event in enumerate(eventsXMLOut[0]):
@ -60,8 +60,8 @@ def getMessages(doorName, doorIP):
tree.write("logs/" + doorName + ".xml") tree.write("logs/" + doorName + ".xml")
def main(): def main():
for doorName, doorData in config["doors"].items(): for door in doors.values():
getMessages(doorName, doorData["ip"]) getMessages(door)
if __name__ == '__main__': if __name__ == '__main__':
main() main()

98
hid/DoorController.py Normal file
View File

@ -0,0 +1,98 @@
import sys
from lxml import etree
from lxml.builder import ElementMaker
import requests
E_plain = ElementMaker(nsmap={"hid": "http://www.hidglobal.com/VertX"})
E = ElementMaker(namespace="http://www.hidglobal.com/VertX",
nsmap={"hid": "http://www.hidglobal.com/VertX"})
class DoorController():
def __init__(self, ip, username, password, name="", access=""):
self.ip = ip
self.username = username
self.password = password
self.name = name
self.access = access
def doImportRequest(self, params=None, files=None):
"""Send a request to the door control import script"""
r = requests.post(
'https://' + self.ip + '/cgi-bin/import.cgi',
params=params,
files=files,
auth=requests.auth.HTTPDigestAuth(self.username, self.password),
timeout=60,
verify=False) # ignore insecure SSL
xml = etree.XML(r.content)
if r.status_code != 200 \
or len(xml.findall("{http://www.hidglobal.com/VertX}Error")) > 0:
print("Door Updating Error: ", r.status_code, r.reason)
print(r.text)
sys.exit(1)
def doCSVImport(self, csv):
"""Do the CSV import procedure on a door control"""
self.doImportRequest({"task": "importInit"})
self.doImportRequest({"task": "importCardsPeople", "name": "cardspeopleschedule.csv"},
{"importCardsPeopleButton": ("cardspeopleschedule.csv", csv, 'text/csv')})
self.doImportRequest({"task": "importDone"})
def doXMLRequest(self, xml, prefix=b'<?xml version="1.0" encoding="UTF-8"?>'):
if not isinstance(xml, str): xml = etree.tostring(xml)
r = requests.get(
'https://' + self.ip + '/cgi-bin/vertx_xml.cgi',
params={'XML': prefix + xml},
auth=requests.auth.HTTPDigestAuth(self.username, self.password),
verify=False)
resp_xml = etree.XML(r.content)
# probably meed to be more sane about this
if r.status_code != 200 \
or len(resp_xml.findall("{*}Error")) > 0:
print("Door Updating Error: ", r.status_code, r.reason)
print(r.text)
sys.exit(1)
return resp_xml
def sendSchedules(self):
# clear all people
outString = StringIO()
writer = csv.DictWriter(outString, fieldnames)
writer.writeheader()
writer.writerow({})
outString.seek(0)
self.doCSVImport(outString)
# clear all schedules
delXML = E_plain.VertXMessage(
*[E.Schedules({"action": "DD", "scheduleID": str(ii)})
for ii in range(1, 8)])
self.doXMLRequest(delXML)
# load new schedules
with open("schedules.xml", "rb") as f:
self.doXMLRequest(f.read())
def sendCardFormat(self, formatName, templateID, facilityCode):
# TODO: add delete formats
# delete example: <hid:CardFormats action="DD" formatID="7-1-244"/>
el = E_plain.VertXMessage(
E.CardFormats({"action": "AD"},
E.CardFormat({"formatName": formatName,
"templateID": str(templateID)},
E.FixedField({"value": str(facilityCode)}))))
return self.doXMLRequest(el)
def lockOrUnlockDoor(self, lock=True):
el = E_plain.VertXMessage(
E.Doors({"action": "CM",
"command": "lockDoor" if lock else "unlockDoor"}))
return self.doXMLRequest(el)
def getStatus(self):
el = E_plain.VertXMessage(
E.Doors({"action": "LR", "responseFormat": "status"}))
xml = self.doXMLRequest(el)
relayState = xml.find('./{*}Doors/{*}Door').attrib['relayState']
return "unlocked" if relayState == "set" else "locked"

View File

@ -7,7 +7,7 @@ from flask import Flask, render_template, request
app = Flask(__name__) app = Flask(__name__)
from common import * from common import *
from doorUtil import lockOrUnlockDoor from hid.DoorController import DoorController
def parse_list(member, regex): def parse_list(member, regex):
data_list = [] data_list = []
@ -64,7 +64,7 @@ def main():
@app.route('/frontDoor/<lock>', methods=['POST']) @app.route('/frontDoor/<lock>', methods=['POST'])
def unlockLockDoor(lock): def unlockLockDoor(lock):
lockOrUnlockDoor(config['doors']['Front Door']['ip'], lock != 'unlock') doors['Front Door'].lockOrUnlockDoor(lock != 'unlock')
return ('', http.HTTPStatus.NO_CONTENT) return ('', http.HTTPStatus.NO_CONTENT)
if __name__ == "__main__": if __name__ == "__main__":