cmsmanage/doorcontrol/hid/DoorController.py

238 lines
8.1 KiB
Python

import contextlib
import csv
from datetime import datetime
from io import StringIO
import requests
import urllib3
from lxml import etree
from lxml.builder import ElementMaker
# XML namespace used by the elements sent to the controller.
_VERTX_NS = "http://www.hidglobal.com/VertX"
# A second, differently-spelled namespace; only E_corp builds elements in it.
_VERTX_CORP_NS = "http://www.hidcorp.com/VertX"

# Element factory that declares the "hid" prefix but puts elements in no
# namespace; ROOT below is built from it.
E_plain = ElementMaker(nsmap={"hid": _VERTX_NS})
# Element factory for elements in the hidglobal.com namespace.
E = ElementMaker(namespace=_VERTX_NS, nsmap={"hid": _VERTX_NS})
# Element factory for elements in the hidcorp.com namespace.
E_corp = ElementMaker(namespace=_VERTX_CORP_NS, nsmap={"hid": _VERTX_CORP_NS})

# Factory for the <VertXMessage> root element that wraps every request.
ROOT = E_plain.VertXMessage

# Column names, in order, of the CSV written for the controller's import script.
fieldnames = (
    "CardNumber,CardFormat,PinRequired,PinCode,ExtendedAccess,ExpiryDate,Forename,Initial,Surname,Email,Phone,Custom1,Custom2,Schedule1,Schedule2,Schedule3,Schedule4,Schedule5,Schedule6,Schedule7,Schedule8"
).split(",")

# TODO: where should this live?
# Requests in this module are sent with verify=False, so silence urllib3's
# InsecureRequestWarning instead of emitting it on every call.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class RemoteError(Exception):
    """Raised when a door controller request fails.

    The exception message combines the HTTP status code, the reason phrase
    and the response body of the response object passed in.
    """

    def __init__(self, r):
        # ``r`` only needs ``status_code``, ``reason`` and ``text`` attributes.
        message = f"Door Updating Error: {r.status_code} {r.reason}\n{r.text}"
        super().__init__(message)
class DoorController:
    """HTTPS client for a HID VertX door controller.

    Talks to two CGI endpoints on the controller: ``import.cgi`` (CSV import)
    and ``vertx_xml.cgi`` (XML command interface). Every request uses HTTP
    digest authentication and skips TLS certificate verification.
    """

    # Seconds passed as ``timeout`` to every HTTP request made by this class.
    REQUEST_TIMEOUT = 60

    def __init__(self, ip, username, password):
        """Remember the controller address and the digest-auth credentials."""
        self.ip = ip
        self.username = username
        self.password = password

    def doImport(self, params=None, files=None):
        """Send a request to the door control import script.

        Args:
            params: Query parameters for ``import.cgi``.
            files: Multipart file payload, in the format ``requests`` expects.

        Raises:
            RemoteError: If the HTTP status is not OK or the XML reply
                contains an ``Error`` element.
        """
        r = requests.post(
            f"https://{self.ip}/cgi-bin/import.cgi",
            params=params,
            files=files,
            auth=requests.auth.HTTPDigestAuth(self.username, self.password),
            timeout=self.REQUEST_TIMEOUT,
            verify=False,  # ignore insecure SSL
        )
        # Check the status before parsing: an HTTP error page is not
        # necessarily XML and should surface as RemoteError, not a parse error.
        if r.status_code != requests.codes.ok:
            raise RemoteError(r)
        xml = etree.XML(r.content)
        if xml.find("{http://www.hidglobal.com/VertX}Error") is not None:
            raise RemoteError(r)

    def doCSVImport(self, csv):
        """Do the CSV import procedure on a door control.

        Runs the three import tasks in order: ``importInit``,
        ``importCardsPeople`` (uploading ``csv``) and ``importDone``.

        Args:
            csv: The CSV payload to upload (``set_schedules`` passes a
                ``StringIO``). Note the name shadows the ``csv`` module
                inside this method only.

        Raises:
            RemoteError: If any of the three requests fails.
        """
        self.doImport({"task": "importInit"})
        self.doImport(
            {"task": "importCardsPeople", "name": "cardspeopleschedule.csv"},
            {"importCardsPeopleButton": ("cardspeopleschedule.csv", csv, "text/csv")},
        )
        self.doImport({"task": "importDone"})

    def doXMLRequest(self, xml, prefix=b'<?xml version="1.0" encoding="UTF-8"?>'):
        """Send an XML command to the controller and return the parsed reply.

        Args:
            xml: The request, either as bytes or as an lxml element (which is
                serialized with ``etree.tostring``).
            prefix: Bytes prepended to the serialized request.

        Returns:
            The root element of the controller's XML response.

        Raises:
            RemoteError: If the HTTP status is not OK or the response root
                has an ``Error`` child (in any namespace).
        """
        if not isinstance(xml, bytes):
            xml = etree.tostring(xml)
        r = requests.get(
            f"https://{self.ip}/cgi-bin/vertx_xml.cgi",
            params={"XML": prefix + xml},
            auth=requests.auth.HTTPDigestAuth(self.username, self.password),
            # Without a timeout an unresponsive controller blocks forever.
            timeout=self.REQUEST_TIMEOUT,
            verify=False,
        )
        # Check the status before parsing so that a non-XML error body is
        # reported as RemoteError rather than an XML syntax error.
        if r.status_code != requests.codes.ok:
            raise RemoteError(r)
        resp_xml = etree.XML(r.content)
        # probably need to be more sane about this
        if resp_xml.find("{*}Error") is not None:
            raise RemoteError(r)
        return resp_xml

    def get_scheduleMap(self):
        """Return a dict mapping each schedule's name to its schedule ID.

        Issues one "LR" Schedules request with ``recordOffset`` 0 and
        ``recordCount`` 8 and reads the children of the first response element.
        """
        schedules = self.doXMLRequest(
            ROOT(E.Schedules({"action": "LR", "recordOffset": "0", "recordCount": "8"}))
        )
        return {
            fmt.attrib["scheduleName"]: fmt.attrib["scheduleID"] for fmt in schedules[0]
        }

    def get_schedules(self):
        """Fetch all schedules and repackage them as one "AD" Schedules message.

        First lists the schedules, then requests each one by ``scheduleID`` in
        a single follow-up message. The first child of every element in that
        reply is moved under a new ``Schedules action="AD"`` element in the
        hidcorp.com namespace.

        Returns:
            A ``VertXMessage`` element; presumably suitable as the argument to
            ``set_schedules`` — confirm against callers.
        """
        # TODO: might be able to do in one request
        schedules = self.doXMLRequest(ROOT(E.Schedules({"action": "LR"})))
        data = self.doXMLRequest(
            ROOT(
                *[
                    E.Schedules(
                        {"action": "LR", "scheduleID": schedule.attrib["scheduleID"]}
                    )
                    for schedule in schedules[0]
                ]
            )
        )
        return ROOT(E_corp.Schedules({"action": "AD"}, *[s[0] for s in data]))

    def set_schedules(self, schedules):
        """Replace the controller's schedules with ``schedules``.

        Steps: import a CSV containing only the header and one empty row (to
        clear all people), send "DD" requests for schedule IDs 1-7, then send
        ``schedules`` as-is.

        Args:
            schedules: XML message (bytes or lxml element) that loads the new
                schedules.

        Raises:
            RemoteError: If the CSV import or the final load fails. Failures
                of the delete step are deliberately ignored.
        """
        # clear all people
        outString = StringIO()
        writer = csv.DictWriter(outString, fieldnames)
        writer.writeheader()
        writer.writerow({})
        outString.seek(0)
        self.doCSVImport(outString)

        # clear all schedules
        # NOTE(review): range(1, 8) covers IDs 1-7, while get_scheduleMap asks
        # for 8 records and the CSV has Schedule1..Schedule8 — confirm whether
        # ID 8 should be deleted too.
        delXML = ROOT(
            *[
                E.Schedules({"action": "DD", "scheduleID": str(ii)})
                for ii in range(1, 8)
            ]
        )
        # don't care about failure to delete, they probably just didn't exist
        with contextlib.suppress(RemoteError):
            self.doXMLRequest(delXML)

        # load new schedules
        self.doXMLRequest(schedules)

    def get_cardFormats(self):
        """Return a dict mapping a card format's first-child ``value`` to its ``formatID``.

        Only ``CardFormat`` elements that contain a ``FixedField`` child are
        included. The key is the ``value`` attribute of the format's first
        child — presumably that ``FixedField`` (the facility code set by
        ``set_cardFormat``); confirm against a real response.
        """
        cardFormats = self.doXMLRequest(
            ROOT(E.CardFormats({"action": "LR", "responseFormat": "expanded"}))
        )
        return {
            fmt[0].attrib["value"]: fmt.attrib["formatID"]
            for fmt in cardFormats[0].findall("{*}CardFormat[{*}FixedField]")
        }

    def set_cardFormat(self, formatName, templateID, facilityCode):
        """Add a card format with one fixed field and return the controller's reply.

        Args:
            formatName: Value for the ``formatName`` attribute.
            templateID: Value for the ``templateID`` attribute (stringified).
            facilityCode: Value of the ``FixedField`` element (stringified).

        Raises:
            RemoteError: If the controller rejects the request.
        """
        # TODO: add ability to delete formats
        # delete example: <hid:CardFormats action="DD" formatID="7-1-244"/>
        el = ROOT(
            E.CardFormats(
                {"action": "AD"},
                E.CardFormat(
                    {"formatName": formatName, "templateID": str(templateID)},
                    E.FixedField({"value": str(facilityCode)}),
                ),
            )
        )
        return self.doXMLRequest(el)

    def get_records(self, req, count, params=None, stopFunction=None):
        """Yield pages (lists of elements) of records from an "LR" request.

        Args:
            req: Element factory for the record type, e.g. ``E.Cardholders``.
            count: Used to size each request (``count - fetched + 1``).
            params: Optional extra attributes merged into the request element.
            stopFunction: Optional callable given the current page as a list.
                Despite the name, paging continues only while it returns a
                truthy value; a falsy result ends iteration after that page.

        Yields:
            Lists of record elements. While paging continues, the last element
            of a page is dropped (see the note below); the final page is
            yielded in full.

        Raises:
            RemoteError: If any underlying request fails.
        """
        extra = {} if params is None else params
        recordCount = 0
        # note: all the "+/-1" bits are to work around a bug where the
        # last returned entry is incomplete. There is probably a
        # better way to do this, but for now I just get the last entry
        # again in the next request. I suspect this probably ends
        # poorly if the numbers line up poorly (ie an exact multiple
        # of the returned record limit)
        while True:
            res = self.doXMLRequest(
                ROOT(
                    req(
                        {
                            "action": "LR",
                            "recordCount": str(count - recordCount + 1),
                            "recordOffset": str(
                                recordCount - 1 if recordCount > 0 else 0
                            ),
                            **extra,
                        }
                    )
                )
            )
            container = res[0]
            recordCount += int(container.get("recordCount")) - 1
            moreRecords = container.get("moreRecords") == "true"
            if moreRecords and (stopFunction is None or stopFunction(list(container))):
                # Drop the trailing (incomplete) entry; it is fetched again.
                yield list(container)[:-1]
            else:
                yield list(container)
                break

    def get_cardholders(self):
        """Yield every cardholder element, requested in expanded format."""
        for page in self.get_records(
            E.Cardholders, 1000, {"responseFormat": "expanded"}
        ):
            yield from page

    def get_credentials(self):
        """Yield every credential element."""
        for page in self.get_records(E.Credentials, 1000):
            yield from page

    def update_credential(self, rawCardNumber: str, cardholderID: str):
        """Send a "UD" request tying a card to a cardholder; return the reply.

        Args:
            rawCardNumber: Value for the ``rawCardNumber`` attribute.
            cardholderID: Value for the nested ``Credential``'s ``cardholderID``.

        Raises:
            RemoteError: If the controller rejects the request.
        """
        return self.doXMLRequest(
            ROOT(
                E.Credentials(
                    {
                        "action": "UD",
                        "rawCardNumber": rawCardNumber,
                        "isCard": "true",
                    },
                    E.Credential({"cardholderID": cardholderID}),
                )
            )
        )

    def get_events(self, threshold):
        """Yield non-empty lists of event elements newer than ``threshold``.

        Paging stops once the last event of a page is not newer than
        ``threshold``.

        Args:
            threshold: A ``datetime`` that each event's ISO-format
                ``timestamp`` attribute is compared against; it must be
                comparable with the parsed timestamps.
        """

        def event_newer_than_threshold(event):
            return datetime.fromisoformat(event.attrib["timestamp"]) > threshold

        # These door controllers only store 5000 events max
        for page in self.get_records(
            E.EventMessages,
            5000,
            stopFunction=lambda events: event_newer_than_threshold(events[-1]),
        ):
            events = [event for event in page if event_newer_than_threshold(event)]
            if events:
                yield events

    def get_lock(self):
        """Return ``"unlocked"`` if the first door's relayState is ``"set"``, else ``"locked"``."""
        el = ROOT(E.Doors({"action": "LR", "responseFormat": "status"}))
        xml = self.doXMLRequest(el)
        relayState = xml.find("./{*}Doors/{*}Door").attrib["relayState"]
        return "unlocked" if relayState == "set" else "locked"

    def set_lock(self, lock=True):
        """Send ``lockDoor`` (default) or ``unlockDoor`` and return the reply.

        Args:
            lock: Truthy to lock the door, falsy to unlock it.

        Raises:
            RemoteError: If the controller rejects the command.
        """
        el = ROOT(
            E.Doors({"action": "CM", "command": "lockDoor" if lock else "unlockDoor"})
        )
        return self.doXMLRequest(el)