diff --git a/.gitignore b/.gitignore
index 08b3f43..b31a110 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,2 @@
-/__pycache__/
+__pycache__/
/passwords.py
diff --git a/common.py b/common.py
index 11bd9e2..cf64ff3 100644
--- a/common.py
+++ b/common.py
@@ -4,16 +4,11 @@ import urllib3
import os
import sys
from io import StringIO
-from lxml import etree
-from lxml.builder import ElementMaker
import requests
+from hid.DoorController import DoorController
from passwords import *
-E_plain = ElementMaker(nsmap={"hid": "http://www.hidglobal.com/VertX"})
-E = ElementMaker(namespace="http://www.hidglobal.com/VertX",
- nsmap={"hid": "http://www.hidglobal.com/VertX"})
-
# it's fine, ssl certs are for losers anyway
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
@@ -23,6 +18,11 @@ try:
except NameError:
config = json.load(open("config.json"))
+doors = {doorName: DoorController(doorData['ip'],
+ DOOR_USERNAME, DOOR_PASSWORD,
+ name=doorName, access=doorData['access'])
+ for doorName, doorData in config["doors"].items()}
+
fieldnames = "CardNumber,CardFormat,PinRequired,PinCode,ExtendedAccess,ExpiryDate,Forename,Initial,Surname,Email,Phone,Custom1,Custom2,Schedule1,Schedule2,Schedule3,Schedule4,Schedule5,Schedule6,Schedule7,Schedule8".split(",")
def getMembershipworksData(folders, columns):
@@ -73,43 +73,3 @@ def getMembershipworksData(folders, columns):
sys.exit(1)
return list(csv.DictReader(StringIO(r.text)))
-
-def doImportRequest(ip, params=None, files=None):
- """Send a request to the door control import script"""
- r = requests.post(
- 'https://' + ip + '/cgi-bin/import.cgi',
- params=params,
- files=files,
- auth=requests.auth.HTTPDigestAuth(DOOR_USERNAME, DOOR_PASSWORD),
- timeout=60,
- verify=False) # ignore insecure SSL
- xml = etree.XML(r.content)
- if r.status_code != 200 \
- or len(xml.findall("{http://www.hidglobal.com/VertX}Error")) > 0:
- print("Door Updating Error: ", r.status_code, r.reason)
- print(r.text)
- sys.exit(1)
-
-def doCSVImport(doorIP, csv):
- """Do the CSV import procedure on a door control"""
- doImportRequest(doorIP, {"task": "importInit"})
- doImportRequest(doorIP,
- {"task": "importCardsPeople", "name": "cardspeopleschedule.csv"},
- {"importCardsPeopleButton": ("cardspeopleschedule.csv", csv, 'text/csv')})
- doImportRequest(doorIP, {"task": "importDone"})
-
-def doXMLRequest(doorIP, xml, prefix=b''):
- if not isinstance(xml, str): xml = etree.tostring(xml)
- r = requests.get(
- 'https://' + doorIP + '/cgi-bin/vertx_xml.cgi',
- params={'XML': prefix + xml},
- auth=requests.auth.HTTPDigestAuth(DOOR_USERNAME, DOOR_PASSWORD),
- verify=False)
- resp_xml = etree.XML(r.content)
- # probably meed to be more sane about this
- if r.status_code != 200 \
- or len(resp_xml.findall("{*}Error")) > 0:
- print("Door Updating Error: ", r.status_code, r.reason)
- print(r.text)
- sys.exit(1)
- return resp_xml
diff --git a/doorUpdater.py b/doorUpdater.py
index d3db6a4..69003b8 100755
--- a/doorUpdater.py
+++ b/doorUpdater.py
@@ -44,29 +44,29 @@ def makeMember(member, doorAuth):
return out
-def makeDoor(doorName, doorData, members, hashes):
+def makeDoor(door, members, hashes):
"""Create a CSV for the given door"""
outString = StringIO()
writer = csv.DictWriter(outString, fieldnames)
writer.writeheader()
for member in members:
- member = makeMember(member, "Access " + doorData["access"] + "?")
+ member = makeMember(member, "Access " + door.access + "?")
if member is not None:
writer.writerow(member)
import datetime as DT
timestamp = DT.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
- with open("/tmp/" + doorName + timestamp + ".csv", "w") as f:
+ with open("/tmp/" + door.name + timestamp + ".csv", "w") as f:
f.write(outString.getvalue())
outString.seek(0)
doorHash = md5(bytes(outString.getvalue(), 'utf8')).hexdigest()
- if doorHash == hashes.get(doorName):
- print("Door", doorName, "not changed, not updating")
+ if doorHash == hashes.get(door.name):
+ print("Door", door.name, "not changed, not updating")
else:
- print("Door", doorName, "changed, trying to update")
- hashes[doorName] = doorHash
- doCSVImport(doorData["ip"], outString)
+ print("Door", door.name, "changed, trying to update")
+ hashes[door.name] = doorHash
+ door.doCSVImport(outString)
# write out hash if we sucessfully updated this door
with open('/tmp/doorUpdaterLastHash', 'w') as f:
json.dump(hashes, f)
@@ -83,9 +83,9 @@ def main():
else:
hashes = {}
- for doorName, doorData in config["doors"].items():
- print(doorName, doorData)
- makeDoor(doorName, doorData, members, hashes)
+ for door in doors.values():
+ print(door.name, door.ip)
+ makeDoor(door, members, hashes)
if __name__ == '__main__':
main()
diff --git a/doorUtil.py b/doorUtil.py
index 1feb77f..663fef3 100644
--- a/doorUtil.py
+++ b/doorUtil.py
@@ -16,53 +16,10 @@ def codeToHex(facility, code):
# hexToCode("01E29DA1") <-> codeToHex(241, 20176)
-def sendSchedule(target_ip):
- # clear all people
- outString = StringIO()
- writer = csv.DictWriter(outString, fieldnames)
- writer.writeheader()
- writer.writerow({})
- outString.seek(0)
- doCSVImport(target_ip, outString)
-
- # clear all schedules
- delXML = E_plain.VertXMessage(
- *[E.Schedules({"action": "DD", "scheduleID": str(ii)})
- for ii in range(1, 8)])
- doXMLRequest(target_ip, delXML)
-
- # load new schedules
- with open("schedules.xml", "rb") as f:
- doXMLRequest(target_ip, f.read())
-
-def sendCardFormat(targetIP, formatName, templateID, facilityCode):
- # TODO: add delete formats
- # delete example:
-
- el = E_plain.VertXMessage(
- E.CardFormats({"action": "AD"},
- E.CardFormat({"formatName": formatName,
- "templateID": str(templateID)}
- E.FixedField({"value": str(facilityCode)}))))
- return doXMLRequest(targetIP, el)
-
-def lockOrUnlockDoor(targetIP, lock=True):
- el = E_plain.VertXMessage(
- E.Doors({"action": "CM",
- "command": "lockDoor" if lock else "unlockDoor"}))
- return doXMLRequest(targetIP, el)
-
-def getStatus(targetIP):
- el = E_plain.VertXMessage(
- E.Doors({"action": "LR", "responseFormat": "status"}))
- xml = doXMLRequest(targetIP, el)
- relayState = xml.find('./{*}Doors/{*}Door').attrib['relayState']
- return "unlocked" if relayState == "set" else "locked"
-
def forEachDoor(fxn):
- for doorName, doorData in config["doors"].items():
- print(doorName)
- fxn(doorName, doorData)
+ for door in doors.values():
+ print(door.name)
+ fxn(door)
-#forEachDoor(lambda name, data: sendCardFormat(data["ip"], "A901146A-244", 1, 244))
-#forEachDoor(lambda name, data: sendSchedule(data["ip"]))
+#forEachDoor(lambda door: door.sendCardFormat("A901146A-244", 1, 244))
+#forEachDoor(lambda door: door.sendSchedules())
diff --git a/events.py b/events.py
index e90aa2f..216debc 100755
--- a/events.py
+++ b/events.py
@@ -7,27 +7,27 @@ import requests
from common import *
-def getStrings(targetIP):
+def getStrings(door):
"""Parses out the message strings from source."""
- r = requests.get('https://' + targetIP + '/html/en_EN/en_EN.js',
- auth=requests.auth.HTTPDigestAuth(DOOR_USERNAME, DOOR_PASSWORD),
+ r = requests.get('https://' + door.ip + '/html/en_EN/en_EN.js',
+ auth=requests.auth.HTTPDigestAuth(door.username, door.password),
verify=False)
regex = re.compile(r'([0-9]+)="([^"]*)')
strings = [regex.search(s) for s in r.text.split(';')
if s.startswith('localeStrings.eventDetails')]
print({int(g.group(1)): g.group(2) for g in strings})
-def getMessages(doorName, doorIP):
+def getMessages(door):
# get parameters for messages to get?
# honestly not really sure why this is required, their API is confusing
parXMLIn = E_plain.VertXMessage(
E.EventMessages({"action": "LR"}))
- parXMLOut = doXMLRequest(doorIP, parXMLIn)
+ parXMLOut = door.doXMLRequest(parXMLIn)
etree.dump(parXMLOut)
- if os.path.exists("logs/" + doorName + ".xml"):
+ if os.path.exists("logs/" + door.name + ".xml"):
# read last log
- tree = etree.ElementTree(file="logs/" + doorName + ".xml")
+ tree = etree.ElementTree(file="logs/" + door.name + ".xml")
root = tree.getroot()
recordCount = int(parXMLOut[0].attrib["historyRecordMarker"]) - \
int(root[0][0].attrib["recordMarker"])
@@ -46,7 +46,7 @@ def getMessages(doorName, doorIP):
"recordCount": str(recordCount),
"historyRecordMarker": parXMLOut[0].attrib["historyRecordMarker"],
"historyTimestamp": parXMLOut[0].attrib["historyTimestamp"]}))
- eventsXMLOut = doXMLRequest(doorIP, eventsXMLIn)
+ eventsXMLOut = door.doXMLRequest(eventsXMLIn)
#TODO: handle modeRecords=true
for index, event in enumerate(eventsXMLOut[0]):
@@ -60,8 +60,8 @@ def getMessages(doorName, doorIP):
tree.write("logs/" + doorName + ".xml")
def main():
- for doorName, doorData in config["doors"].items():
- getMessages(doorName, doorData["ip"])
+ for door in doors.values():
+ getMessages(door)
if __name__ == '__main__':
main()
diff --git a/hid/DoorController.py b/hid/DoorController.py
new file mode 100644
index 0000000..cae8962
--- /dev/null
+++ b/hid/DoorController.py
@@ -0,0 +1,98 @@
+import sys
+from lxml import etree
+from lxml.builder import ElementMaker
+import requests
+
+E_plain = ElementMaker(nsmap={"hid": "http://www.hidglobal.com/VertX"})
+E = ElementMaker(namespace="http://www.hidglobal.com/VertX",
+ nsmap={"hid": "http://www.hidglobal.com/VertX"})
+
+class DoorController():
+ def __init__(self, ip, username, password, name="", access=""):
+ self.ip = ip
+ self.username = username
+ self.password = password
+ self.name = name
+ self.access = access
+
+ def doImportRequest(self, params=None, files=None):
+ """Send a request to the door control import script"""
+ r = requests.post(
+ 'https://' + self.ip + '/cgi-bin/import.cgi',
+ params=params,
+ files=files,
+ auth=requests.auth.HTTPDigestAuth(self.username, self.password),
+ timeout=60,
+ verify=False) # ignore insecure SSL
+ xml = etree.XML(r.content)
+ if r.status_code != 200 \
+ or len(xml.findall("{http://www.hidglobal.com/VertX}Error")) > 0:
+ print("Door Updating Error: ", r.status_code, r.reason)
+ print(r.text)
+ sys.exit(1)
+
+ def doCSVImport(self, csv):
+ """Do the CSV import procedure on a door control"""
+ self.doImportRequest({"task": "importInit"})
+ self.doImportRequest({"task": "importCardsPeople", "name": "cardspeopleschedule.csv"},
+ {"importCardsPeopleButton": ("cardspeopleschedule.csv", csv, 'text/csv')})
+ self.doImportRequest({"task": "importDone"})
+
+ def doXMLRequest(self, xml, prefix=b''):
+ if not isinstance(xml, str): xml = etree.tostring(xml)
+ r = requests.get(
+ 'https://' + self.ip + '/cgi-bin/vertx_xml.cgi',
+ params={'XML': prefix + xml},
+ auth=requests.auth.HTTPDigestAuth(self.username, self.password),
+ verify=False)
+ resp_xml = etree.XML(r.content)
+ # probably meed to be more sane about this
+ if r.status_code != 200 \
+ or len(resp_xml.findall("{*}Error")) > 0:
+ print("Door Updating Error: ", r.status_code, r.reason)
+ print(r.text)
+ sys.exit(1)
+ return resp_xml
+
+ def sendSchedules(self):
+ # clear all people
+ outString = StringIO()
+ writer = csv.DictWriter(outString, fieldnames)
+ writer.writeheader()
+ writer.writerow({})
+ outString.seek(0)
+ self.doCSVImport(outString)
+
+ # clear all schedules
+ delXML = E_plain.VertXMessage(
+ *[E.Schedules({"action": "DD", "scheduleID": str(ii)})
+ for ii in range(1, 8)])
+ self.doXMLRequest(delXML)
+
+ # load new schedules
+ with open("schedules.xml", "rb") as f:
+ self.doXMLRequest(f.read())
+
+ def sendCardFormat(self, formatName, templateID, facilityCode):
+ # TODO: add delete formats
+ # delete example:
+
+ el = E_plain.VertXMessage(
+ E.CardFormats({"action": "AD"},
+ E.CardFormat({"formatName": formatName,
+ "templateID": str(templateID)},
+ E.FixedField({"value": str(facilityCode)}))))
+ return self.doXMLRequest(el)
+
+ def lockOrUnlockDoor(self, lock=True):
+ el = E_plain.VertXMessage(
+ E.Doors({"action": "CM",
+ "command": "lockDoor" if lock else "unlockDoor"}))
+ return self.doXMLRequest(el)
+
+ def getStatus(self):
+ el = E_plain.VertXMessage(
+ E.Doors({"action": "LR", "responseFormat": "status"}))
+ xml = self.doXMLRequest(el)
+ relayState = xml.find('./{*}Doors/{*}Door').attrib['relayState']
+ return "unlocked" if relayState == "set" else "locked"
diff --git a/membershipViewer.py b/membershipViewer.py
index 8b99177..a905d8f 100644
--- a/membershipViewer.py
+++ b/membershipViewer.py
@@ -7,7 +7,7 @@ from flask import Flask, render_template, request
app = Flask(__name__)
from common import *
-from doorUtil import lockOrUnlockDoor
+from hid.DoorController import DoorController
def parse_list(member, regex):
data_list = []
@@ -64,7 +64,7 @@ def main():
@app.route('/frontDoor/', methods=['POST'])
def unlockLockDoor(lock):
- lockOrUnlockDoor(config['doors']['Front Door']['ip'], lock != 'unlock')
+ doors['Front Door'].lockOrUnlockDoor(lock != 'unlock')
return ('', http.HTTPStatus.NO_CONTENT)
if __name__ == "__main__":