cmsmanage/doorcontrol/hid/DoorController.py
Adam Goldsmith 32a91315ef doorcontrol: Improve pagination behavior of DoorController.get_records()
Use `DR` method to get total count of elements then paginate by
defined page size, instead of hacky bad automatically sized pagination
2024-08-29 21:45:56 -04:00

241 lines
8.0 KiB
Python

import contextlib
import csv
from datetime import datetime
from io import StringIO
from itertools import takewhile
import requests
import urllib3
from lxml import etree
from lxml.builder import ElementMaker
E_plain = ElementMaker(nsmap={"hid": "http://www.hidglobal.com/VertX"})
E = ElementMaker(
namespace="http://www.hidglobal.com/VertX",
nsmap={"hid": "http://www.hidglobal.com/VertX"},
)
E_corp = ElementMaker(
namespace="http://www.hidcorp.com/VertX", # stupid
nsmap={"hid": "http://www.hidcorp.com/VertX"},
)
ROOT = E_plain.VertXMessage
fieldnames = "CardNumber,CardFormat,PinRequired,PinCode,ExtendedAccess,ExpiryDate,Forename,Initial,Surname,Email,Phone,Custom1,Custom2,Schedule1,Schedule2,Schedule3,Schedule4,Schedule5,Schedule6,Schedule7,Schedule8".split(
","
)
# TODO: where should this live?
# it's fine, ssl certs are for losers anyway
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class RemoteError(Exception):
def __init__(self, r):
super().__init__(f"Door Updating Error: {r.status_code} {r.reason}\n{r.text}")
class UnsupportedPageSize(Exception):
def __init__(self, page_size: int) -> None:
super().__init__(
f"Page size {page_size} greater than supported by controller. "
"(controller returned moreRecords=true)"
)
class DoorController:
def __init__(self, ip, username, password):
self.ip = ip
self.username = username
self.password = password
def doImport(self, params=None, files=None):
"""Send a request to the door control import script"""
r = requests.post(
"https://" + self.ip + "/cgi-bin/import.cgi",
params=params,
files=files,
auth=requests.auth.HTTPDigestAuth(self.username, self.password),
timeout=60,
verify=False,
) # ignore insecure SSL
xml = etree.XML(r.content)
if (
r.status_code != requests.codes.ok
or len(xml.findall("{http://www.hidglobal.com/VertX}Error")) > 0
):
raise RemoteError(r)
def doCSVImport(self, csv):
"""Do the CSV import procedure on a door control"""
self.doImport({"task": "importInit"})
self.doImport(
{"task": "importCardsPeople", "name": "cardspeopleschedule.csv"},
{"importCardsPeopleButton": ("cardspeopleschedule.csv", csv, "text/csv")},
)
self.doImport({"task": "importDone"})
def doXMLRequest(self, xml, prefix=b'<?xml version="1.0" encoding="UTF-8"?>'):
if not isinstance(xml, bytes):
xml = etree.tostring(xml)
r = requests.get(
"https://" + self.ip + "/cgi-bin/vertx_xml.cgi",
params={"XML": prefix + xml},
auth=requests.auth.HTTPDigestAuth(self.username, self.password),
verify=False,
)
resp_xml = etree.XML(r.content)
# probably meed to be more sane about this
if r.status_code != requests.codes.ok or len(resp_xml.findall("{*}Error")) > 0:
raise RemoteError(r)
return resp_xml
def get_scheduleMap(self):
schedules = self.doXMLRequest(
ROOT(E.Schedules({"action": "LR", "recordOffset": "0", "recordCount": "8"}))
)
return {
fmt.attrib["scheduleName"]: fmt.attrib["scheduleID"] for fmt in schedules[0]
}
def get_schedules(self):
# TODO: might be able to do in one request
schedules = self.doXMLRequest(ROOT(E.Schedules({"action": "LR"})))
etree.dump(schedules)
data = self.doXMLRequest(
ROOT(
*[
E.Schedules(
{"action": "LR", "scheduleID": schedule.attrib["scheduleID"]}
)
for schedule in schedules[0]
]
)
)
return ROOT(E_corp.Schedules({"action": "AD"}, *[s[0] for s in data]))
def set_schedules(self, schedules):
# clear all people
outString = StringIO()
writer = csv.DictWriter(outString, fieldnames)
writer.writeheader()
writer.writerow({})
outString.seek(0)
self.doCSVImport(outString)
# clear all schedules
delXML = ROOT(
*[
E.Schedules({"action": "DD", "scheduleID": str(ii)})
for ii in range(1, 8)
]
)
# don't care about failure to delete, they probably just didn't exist
contextlib.suppress(self.doXMLRequest(delXML))
# load new schedules
self.doXMLRequest(schedules)
def get_cardFormats(self):
cardFormats = self.doXMLRequest(
ROOT(E.CardFormats({"action": "LR", "responseFormat": "expanded"}))
)
return {
fmt[0].attrib["value"]: fmt.attrib["formatID"]
for fmt in cardFormats[0].findall("{*}CardFormat[{*}FixedField]")
}
def set_cardFormat(self, formatName, templateID, facilityCode):
# TODO: add ability to delete formats
# delete example: <hid:CardFormats action="DD" formatID="7-1-244"/>
el = ROOT(
E.CardFormats(
{"action": "AD"},
E.CardFormat(
{"formatName": formatName, "templateID": str(templateID)},
E.FixedField({"value": str(facilityCode)}),
),
)
)
return self.doXMLRequest(el)
def get_records(
self,
req,
count_attr: str,
params: dict[str, str] | None = None,
page_size: int = 100,
):
dr = self.doXMLRequest(ROOT(req({"action": "DR"})))
for offset in range(0, int(dr[0].attrib[count_attr]), page_size):
res = self.doXMLRequest(
ROOT(
req(
{
"action": "LR",
"recordCount": str(page_size),
"recordOffset": str(offset),
**(params or {}),
}
)
)
)
# The web interface does sub-pagination when needed, but that is very messy.
# See previous versions of this function for an example :)
if res[0].attrib["moreRecords"] != "false":
raise UnsupportedPageSize(page_size)
yield list(res[0])
def get_cardholders(self):
for page in self.get_records(
E.Cardholders, "cardholdersInUse", params={"responseFormat": "expanded"}
):
yield from page
def get_credentials(self):
for page in self.get_records(E.Credentials, "credentialsInUse"):
yield from page
def update_credential(self, rawCardNumber: str, cardholderID: str):
return self.doXMLRequest(
ROOT(
E.Credentials(
{
"action": "UD",
"rawCardNumber": rawCardNumber,
"isCard": "true",
},
E.Credential({"cardholderID": cardholderID}),
)
)
)
def get_events(self, threshold: datetime):
def event_newer_than_threshold(event):
return datetime.fromisoformat(event.attrib["timestamp"]) > threshold
# smaller page size empirically determined
for page in self.get_records(E.EventMessages, "eventsInUse", page_size=25):
events = list(takewhile(event_newer_than_threshold, page))
if events:
yield events
else:
break
def get_lock(self):
el = ROOT(E.Doors({"action": "LR", "responseFormat": "status"}))
xml = self.doXMLRequest(el)
relayState = xml.find("./{*}Doors/{*}Door").attrib["relayState"]
return "unlocked" if relayState == "set" else "locked"
def set_lock(self, lock=True):
el = ROOT(
E.Doors({"action": "CM", "command": "lockDoor" if lock else "unlockDoor"})
)
return self.doXMLRequest(el)