cmsmanage/doorcontrol/hid/DoorController.py

240 lines
8.2 KiB
Python
Raw Normal View History

import csv
from datetime import datetime
from io import StringIO
import requests
import urllib3
from lxml import etree
from lxml.builder import ElementMaker
# XML namespace URIs used when building controller messages.  Two different
# URIs appear: most elements use the "hidglobal" one, while E_corp builds
# elements under the "hidcorp" one.
_NS_HIDGLOBAL = "http://www.hidglobal.com/VertX"
_NS_HIDCORP = "http://www.hidcorp.com/VertX"

# Element factories.  E_plain only declares the "hid" prefix (elements it
# builds carry no namespace themselves); E and E_corp build elements inside
# their respective namespaces.
E_plain = ElementMaker(nsmap={"hid": _NS_HIDGLOBAL})
E = ElementMaker(namespace=_NS_HIDGLOBAL, nsmap={"hid": _NS_HIDGLOBAL})
E_corp = ElementMaker(namespace=_NS_HIDCORP, nsmap={"hid": _NS_HIDCORP})

# Factory for the un-namespaced <VertXMessage> root that wraps every request.
ROOT = E_plain.VertXMessage

# Column order of the cards/people CSV handed to the import script.
fieldnames = [
    "CardNumber",
    "CardFormat",
    "PinRequired",
    "PinCode",
    "ExtendedAccess",
    "ExpiryDate",
    "Forename",
    "Initial",
    "Surname",
    "Email",
    "Phone",
    "Custom1",
    "Custom2",
    *(f"Schedule{slot}" for slot in range(1, 9)),
]

# TODO: where should this live?
# Requests to the controller are made with certificate verification turned
# off, so silence the warning urllib3 would otherwise emit for each of them.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class RemoteError(Exception):
    """Raised when a request to the door controller fails.

    The exception message embeds the HTTP status code, reason phrase and body
    text of the offending response.

    Attributes:
        response: The response object the error was built from, kept so that
            handlers can inspect the status code or body instead of parsing
            the message string.
    """

    def __init__(self, r):
        """Build the error from response *r*.

        Args:
            r: Response-like object exposing ``status_code``, ``reason`` and
                ``text``.
        """
        super().__init__(f"Door Updating Error: {r.status_code} {r.reason}\n{r.text}")
        self.response = r
class DoorController:
    """Client for a door controller's HTTPS CGI interface.

    Two endpoints are used: ``/cgi-bin/import.cgi`` for the CSV import
    procedure and ``/cgi-bin/vertx_xml.cgi`` for XML commands.  Every request
    uses HTTP digest authentication and is sent with TLS certificate
    verification disabled.
    """

    # Timeout (seconds) handed to ``requests`` for every call, so that an
    # unreachable or wedged controller cannot block the caller indefinitely.
    REQUEST_TIMEOUT = 60

    def __init__(self, ip, username, password):
        """Remember the controller address and digest-auth credentials.

        Args:
            ip: Host part of the controller URL (joined as ``https://<ip>/...``).
            username: Digest-auth user name.
            password: Digest-auth password.
        """
        self.ip = ip
        self.username = username
        self.password = password

    def doImport(self, params=None, files=None):
        """Send a request to the door control import script.

        Args:
            params: Query parameters for the POST (e.g. ``{"task": ...}``).
            files: Multipart file payload, in the form ``requests`` accepts.

        Raises:
            RemoteError: If the HTTP status is not OK, or the XML reply
                contains a VertX ``Error`` element directly under its root.
        """
        r = requests.post(
            "https://" + self.ip + "/cgi-bin/import.cgi",
            params=params,
            files=files,
            auth=requests.auth.HTTPDigestAuth(self.username, self.password),
            timeout=self.REQUEST_TIMEOUT,
            verify=False,  # ignore insecure SSL
        )
        # Check the status before parsing: an HTTP error page is not
        # necessarily XML, and a parse failure would hide the real problem.
        if r.status_code != requests.codes.ok:
            raise RemoteError(r)
        xml = etree.XML(r.content)
        if xml.findall("{http://www.hidglobal.com/VertX}Error"):
            raise RemoteError(r)

    def doCSVImport(self, csv):
        """Do the CSV import procedure on a door control.

        Runs the three import tasks in order: ``importInit``, the upload
        itself (``importCardsPeople``) and ``importDone``.

        Args:
            csv: CSV payload to upload as ``cardspeopleschedule.csv``; passed
                straight to ``requests`` as the file content.  (The parameter
                name shadows the ``csv`` module inside this method only.)

        Raises:
            RemoteError: Propagated from any of the three import requests.
        """
        self.doImport({"task": "importInit"})
        self.doImport(
            {"task": "importCardsPeople", "name": "cardspeopleschedule.csv"},
            {"importCardsPeopleButton": ("cardspeopleschedule.csv", csv, "text/csv")},
        )
        self.doImport({"task": "importDone"})

    def doXMLRequest(self, xml, prefix=b'<?xml version="1.0" encoding="UTF-8"?>'):
        """Send an XML command to the controller and return the parsed reply.

        Args:
            xml: The request, either already serialised as ``bytes`` or an
                lxml element that will be serialised with ``etree.tostring``.
            prefix: Bytes prepended to the serialised request (the XML
                declaration by default).

        Returns:
            The root element of the parsed response.

        Raises:
            RemoteError: If the HTTP status is not OK, or the reply contains
                an ``Error`` element (in any namespace) directly under its
                root.
        """
        if not isinstance(xml, bytes):
            xml = etree.tostring(xml)
        r = requests.get(
            "https://" + self.ip + "/cgi-bin/vertx_xml.cgi",
            params={"XML": prefix + xml},
            auth=requests.auth.HTTPDigestAuth(self.username, self.password),
            timeout=self.REQUEST_TIMEOUT,
            verify=False,
        )
        # Check the status before parsing: an HTTP error page is not
        # necessarily XML, and a parse failure would hide the real problem.
        if r.status_code != requests.codes.ok:
            raise RemoteError(r)
        resp_xml = etree.XML(r.content)
        # probably need to be more sane about this
        if resp_xml.findall("{*}Error"):
            raise RemoteError(r)
        return resp_xml

    def get_scheduleMap(self):
        """Return a dict mapping each schedule's name to its schedule ID.

        Requests up to 8 schedule records starting at offset 0 and reads the
        ``scheduleName`` / ``scheduleID`` attributes of the children of the
        first element in the reply.
        """
        schedules = self.doXMLRequest(
            ROOT(E.Schedules({"action": "LR", "recordOffset": "0", "recordCount": "8"}))
        )
        return {
            fmt.attrib["scheduleName"]: fmt.attrib["scheduleID"] for fmt in schedules[0]
        }

    def get_schedules(self):
        """Fetch every schedule and repackage them as a single "AD" message.

        First lists the schedules, then requests each one by ID in a single
        batched message, and finally wraps the first child of every reply
        element in a new ``Schedules`` element with ``action="AD"``.

        Returns:
            A ``VertXMessage`` element, in the shape ``set_schedules`` sends
            back to a controller.
        """
        # TODO: might be able to do in one request
        schedules = self.doXMLRequest(ROOT(E.Schedules({"action": "LR"})))
        # NOTE(review): this prints the listing to stdout and looks like
        # leftover debugging output -- confirm nobody relies on it, then drop.
        etree.dump(schedules)
        data = self.doXMLRequest(
            ROOT(
                *[
                    E.Schedules(
                        {"action": "LR", "scheduleID": schedule.attrib["scheduleID"]}
                    )
                    for schedule in schedules[0]
                ]
            )
        )
        return ROOT(E_corp.Schedules({"action": "AD"}, *[s[0] for s in data]))

    def set_schedules(self, schedules):
        """Replace the controller's schedules with *schedules*.

        Steps, in order: import a CSV holding only the header and one empty
        row (clearing people), send delete requests for schedule IDs 1-7
        (failures ignored), then send *schedules*.

        Args:
            schedules: Request passed to ``doXMLRequest`` (bytes or element),
                e.g. the value returned by ``get_schedules``.

        Raises:
            RemoteError: If the CSV import or the final load request fails.
        """
        # clear all people
        outString = StringIO()
        writer = csv.DictWriter(outString, fieldnames)
        writer.writeheader()
        writer.writerow({})
        outString.seek(0)
        self.doCSVImport(outString)

        # clear all schedules
        # NOTE(review): range(1, 8) covers IDs 1-7 only, while the CSV has
        # Schedule1..Schedule8 columns and get_scheduleMap lists 8 records --
        # confirm whether ID 8 is intentionally left alone.
        delXML = ROOT(
            *[
                E.Schedules({"action": "DD", "scheduleID": str(ii)})
                for ii in range(1, 8)
            ]
        )
        try:
            self.doXMLRequest(delXML)
        except RemoteError:
            # don't care about failure to delete, they probably just didn't exist
            pass

        # load new schedules
        self.doXMLRequest(schedules)

    def get_cardFormats(self):
        """Return a dict built from the controller's card formats.

        Only ``CardFormat`` elements that have a ``FixedField`` child are
        considered.  For each, the key is the ``value`` attribute of its first
        child and the value is the format's ``formatID``.
        """
        cardFormats = self.doXMLRequest(
            ROOT(E.CardFormats({"action": "LR", "responseFormat": "expanded"}))
        )
        return {
            fmt[0].attrib["value"]: fmt.attrib["formatID"]
            for fmt in cardFormats[0].findall("{*}CardFormat[{*}FixedField]")
        }

    def set_cardFormat(self, formatName, templateID, facilityCode):
        """Add a card format with one fixed field.

        Args:
            formatName: Value for the ``formatName`` attribute.
            templateID: Value for ``templateID`` (stringified).
            facilityCode: Value for the ``FixedField`` element (stringified).

        Returns:
            The parsed response root from ``doXMLRequest``.
        """
        # TODO: add ability to delete formats
        # delete example: <hid:CardFormats action="DD" formatID="7-1-244"/>
        el = ROOT(
            E.CardFormats(
                {"action": "AD"},
                E.CardFormat(
                    {"formatName": formatName, "templateID": str(templateID)},
                    E.FixedField({"value": str(facilityCode)}),
                ),
            )
        )
        return self.doXMLRequest(el)

    def get_records(self, req, count, params=None, stopFunction=None):
        """Yield pages (lists of elements) from a paged "LR" listing.

        Args:
            req: Element factory for the request element (e.g.
                ``E.Cardholders``); called with the attribute dict.
            count: Upper bound used to compute each request's ``recordCount``.
            params: Optional extra attributes merged into every request.
            stopFunction: Optional callable given the current page as a list.
                Despite the name, paging CONTINUES only while it returns a
                truthy value (and the controller reports more records).

        Yields:
            One list of child elements per request.  Intermediate pages omit
            their last element; the final page is yielded whole.
        """
        extra = {} if params is None else params
        recordCount = 0

        # note: all the "+/-1" bits are to work around a bug where the
        # last returned entry is incomplete. There is probably a
        # better way to do this, but for now I just get the last entry
        # again in the next request. I suspect this probably ends
        # poorly if the numbers line up poorly (ie an exact multiple
        # of the returned record limit)
        while True:
            res = self.doXMLRequest(
                ROOT(
                    req(
                        {
                            "action": "LR",
                            "recordCount": str(count - recordCount + 1),
                            "recordOffset": str(
                                recordCount - 1 if recordCount > 0 else 0
                            ),
                            **extra,
                        }
                    )
                )
            )
            listing = res[0]
            records = list(listing)
            recordCount += int(listing.get("recordCount")) - 1
            moreRecords = listing.get("moreRecords") == "true"

            if moreRecords and (stopFunction is None or stopFunction(records)):
                # The last entry is re-fetched by the next request.
                yield records[:-1]
            else:
                yield records
                break

    def get_cardholders(self):
        """Yield every cardholder element, using the expanded response format."""
        for page in self.get_records(
            E.Cardholders, 1000, {"responseFormat": "expanded"}
        ):
            yield from page

    def get_credentials(self):
        """Yield every credential element."""
        for page in self.get_records(E.Credentials, 1000):
            yield from page

    def update_credential(self, rawCardNumber: str, cardholderID: str):
        """Send an "UD" request setting the cardholder ID on a card credential.

        Args:
            rawCardNumber: Value for the ``rawCardNumber`` attribute that
                selects the credential.
            cardholderID: Value for the nested ``Credential`` element's
                ``cardholderID`` attribute.

        Returns:
            The parsed response root from ``doXMLRequest``.
        """
        return self.doXMLRequest(
            ROOT(
                E.Credentials(
                    {
                        "action": "UD",
                        "rawCardNumber": rawCardNumber,
                        "isCard": "true",
                    },
                    E.Credential({"cardholderID": cardholderID}),
                )
            )
        )

    def get_events(self, threshold):
        """Yield non-empty lists of events whose timestamp is after *threshold*.

        Paging continues only while the last event of the current page is
        newer than the threshold.

        Args:
            threshold: ``datetime`` compared against each event's ISO-format
                ``timestamp`` attribute.
        """

        def event_newer_than_threshold(event):
            return datetime.fromisoformat(event.attrib["timestamp"]) > threshold

        # These door controllers only store 5000 events max
        for page in self.get_records(
            E.EventMessages,
            5000,
            stopFunction=lambda events: event_newer_than_threshold(events[-1]),
        ):
            events = [event for event in page if event_newer_than_threshold(event)]
            if events:
                yield events

    def get_lock(self):
        """Return ``"unlocked"`` if the door's relayState is ``"set"``, else ``"locked"``."""
        el = ROOT(E.Doors({"action": "LR", "responseFormat": "status"}))
        xml = self.doXMLRequest(el)
        relayState = xml.find("./{*}Doors/{*}Door").attrib["relayState"]
        return "unlocked" if relayState == "set" else "locked"

    def set_lock(self, lock=True):
        """Send the ``lockDoor`` (default) or ``unlockDoor`` command.

        Args:
            lock: Truthy to lock the door, falsy to unlock it.

        Returns:
            The parsed response root from ``doXMLRequest``.
        """
        el = ROOT(
            E.Doors({"action": "CM", "command": "lockDoor" if lock else "unlockDoor"})
        )
        return self.doXMLRequest(el)