Adam Goldsmith
32a91315ef
Use `DR` method to get total count of elements then paginate by defined page size, instead of hacky bad automatically sized pagination
241 lines
8.0 KiB
Python
241 lines
8.0 KiB
Python
import contextlib
|
|
import csv
|
|
from datetime import datetime
|
|
from io import StringIO
|
|
from itertools import takewhile
|
|
|
|
import requests
|
|
import urllib3
|
|
from lxml import etree
|
|
from lxml.builder import ElementMaker
|
|
|
|
E_plain = ElementMaker(nsmap={"hid": "http://www.hidglobal.com/VertX"})
|
|
E = ElementMaker(
|
|
namespace="http://www.hidglobal.com/VertX",
|
|
nsmap={"hid": "http://www.hidglobal.com/VertX"},
|
|
)
|
|
E_corp = ElementMaker(
|
|
namespace="http://www.hidcorp.com/VertX", # stupid
|
|
nsmap={"hid": "http://www.hidcorp.com/VertX"},
|
|
)
|
|
ROOT = E_plain.VertXMessage
|
|
|
|
fieldnames = "CardNumber,CardFormat,PinRequired,PinCode,ExtendedAccess,ExpiryDate,Forename,Initial,Surname,Email,Phone,Custom1,Custom2,Schedule1,Schedule2,Schedule3,Schedule4,Schedule5,Schedule6,Schedule7,Schedule8".split(
|
|
","
|
|
)
|
|
|
|
# TODO: where should this live?
|
|
# it's fine, ssl certs are for losers anyway
|
|
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
|
|
|
|
|
class RemoteError(Exception):
|
|
def __init__(self, r):
|
|
super().__init__(f"Door Updating Error: {r.status_code} {r.reason}\n{r.text}")
|
|
|
|
|
|
class UnsupportedPageSize(Exception):
|
|
def __init__(self, page_size: int) -> None:
|
|
super().__init__(
|
|
f"Page size {page_size} greater than supported by controller. "
|
|
"(controller returned moreRecords=true)"
|
|
)
|
|
|
|
|
|
class DoorController:
|
|
def __init__(self, ip, username, password):
|
|
self.ip = ip
|
|
self.username = username
|
|
self.password = password
|
|
|
|
def doImport(self, params=None, files=None):
|
|
"""Send a request to the door control import script"""
|
|
r = requests.post(
|
|
"https://" + self.ip + "/cgi-bin/import.cgi",
|
|
params=params,
|
|
files=files,
|
|
auth=requests.auth.HTTPDigestAuth(self.username, self.password),
|
|
timeout=60,
|
|
verify=False,
|
|
) # ignore insecure SSL
|
|
xml = etree.XML(r.content)
|
|
if (
|
|
r.status_code != requests.codes.ok
|
|
or len(xml.findall("{http://www.hidglobal.com/VertX}Error")) > 0
|
|
):
|
|
raise RemoteError(r)
|
|
|
|
def doCSVImport(self, csv):
|
|
"""Do the CSV import procedure on a door control"""
|
|
self.doImport({"task": "importInit"})
|
|
self.doImport(
|
|
{"task": "importCardsPeople", "name": "cardspeopleschedule.csv"},
|
|
{"importCardsPeopleButton": ("cardspeopleschedule.csv", csv, "text/csv")},
|
|
)
|
|
self.doImport({"task": "importDone"})
|
|
|
|
def doXMLRequest(self, xml, prefix=b'<?xml version="1.0" encoding="UTF-8"?>'):
|
|
if not isinstance(xml, bytes):
|
|
xml = etree.tostring(xml)
|
|
r = requests.get(
|
|
"https://" + self.ip + "/cgi-bin/vertx_xml.cgi",
|
|
params={"XML": prefix + xml},
|
|
auth=requests.auth.HTTPDigestAuth(self.username, self.password),
|
|
verify=False,
|
|
)
|
|
resp_xml = etree.XML(r.content)
|
|
# probably meed to be more sane about this
|
|
if r.status_code != requests.codes.ok or len(resp_xml.findall("{*}Error")) > 0:
|
|
raise RemoteError(r)
|
|
return resp_xml
|
|
|
|
def get_scheduleMap(self):
|
|
schedules = self.doXMLRequest(
|
|
ROOT(E.Schedules({"action": "LR", "recordOffset": "0", "recordCount": "8"}))
|
|
)
|
|
return {
|
|
fmt.attrib["scheduleName"]: fmt.attrib["scheduleID"] for fmt in schedules[0]
|
|
}
|
|
|
|
def get_schedules(self):
|
|
# TODO: might be able to do in one request
|
|
schedules = self.doXMLRequest(ROOT(E.Schedules({"action": "LR"})))
|
|
etree.dump(schedules)
|
|
|
|
data = self.doXMLRequest(
|
|
ROOT(
|
|
*[
|
|
E.Schedules(
|
|
{"action": "LR", "scheduleID": schedule.attrib["scheduleID"]}
|
|
)
|
|
for schedule in schedules[0]
|
|
]
|
|
)
|
|
)
|
|
return ROOT(E_corp.Schedules({"action": "AD"}, *[s[0] for s in data]))
|
|
|
|
def set_schedules(self, schedules):
|
|
# clear all people
|
|
outString = StringIO()
|
|
writer = csv.DictWriter(outString, fieldnames)
|
|
writer.writeheader()
|
|
writer.writerow({})
|
|
outString.seek(0)
|
|
self.doCSVImport(outString)
|
|
|
|
# clear all schedules
|
|
delXML = ROOT(
|
|
*[
|
|
E.Schedules({"action": "DD", "scheduleID": str(ii)})
|
|
for ii in range(1, 8)
|
|
]
|
|
)
|
|
# don't care about failure to delete, they probably just didn't exist
|
|
contextlib.suppress(self.doXMLRequest(delXML))
|
|
|
|
# load new schedules
|
|
self.doXMLRequest(schedules)
|
|
|
|
def get_cardFormats(self):
|
|
cardFormats = self.doXMLRequest(
|
|
ROOT(E.CardFormats({"action": "LR", "responseFormat": "expanded"}))
|
|
)
|
|
|
|
return {
|
|
fmt[0].attrib["value"]: fmt.attrib["formatID"]
|
|
for fmt in cardFormats[0].findall("{*}CardFormat[{*}FixedField]")
|
|
}
|
|
|
|
def set_cardFormat(self, formatName, templateID, facilityCode):
|
|
# TODO: add ability to delete formats
|
|
# delete example: <hid:CardFormats action="DD" formatID="7-1-244"/>
|
|
|
|
el = ROOT(
|
|
E.CardFormats(
|
|
{"action": "AD"},
|
|
E.CardFormat(
|
|
{"formatName": formatName, "templateID": str(templateID)},
|
|
E.FixedField({"value": str(facilityCode)}),
|
|
),
|
|
)
|
|
)
|
|
return self.doXMLRequest(el)
|
|
|
|
def get_records(
|
|
self,
|
|
req,
|
|
count_attr: str,
|
|
params: dict[str, str] | None = None,
|
|
page_size: int = 100,
|
|
):
|
|
dr = self.doXMLRequest(ROOT(req({"action": "DR"})))
|
|
|
|
for offset in range(0, int(dr[0].attrib[count_attr]), page_size):
|
|
res = self.doXMLRequest(
|
|
ROOT(
|
|
req(
|
|
{
|
|
"action": "LR",
|
|
"recordCount": str(page_size),
|
|
"recordOffset": str(offset),
|
|
**(params or {}),
|
|
}
|
|
)
|
|
)
|
|
)
|
|
|
|
# The web interface does sub-pagination when needed, but that is very messy.
|
|
# See previous versions of this function for an example :)
|
|
if res[0].attrib["moreRecords"] != "false":
|
|
raise UnsupportedPageSize(page_size)
|
|
|
|
yield list(res[0])
|
|
|
|
def get_cardholders(self):
|
|
for page in self.get_records(
|
|
E.Cardholders, "cardholdersInUse", params={"responseFormat": "expanded"}
|
|
):
|
|
yield from page
|
|
|
|
def get_credentials(self):
|
|
for page in self.get_records(E.Credentials, "credentialsInUse"):
|
|
yield from page
|
|
|
|
def update_credential(self, rawCardNumber: str, cardholderID: str):
|
|
return self.doXMLRequest(
|
|
ROOT(
|
|
E.Credentials(
|
|
{
|
|
"action": "UD",
|
|
"rawCardNumber": rawCardNumber,
|
|
"isCard": "true",
|
|
},
|
|
E.Credential({"cardholderID": cardholderID}),
|
|
)
|
|
)
|
|
)
|
|
|
|
def get_events(self, threshold: datetime):
|
|
def event_newer_than_threshold(event):
|
|
return datetime.fromisoformat(event.attrib["timestamp"]) > threshold
|
|
|
|
# smaller page size empirically determined
|
|
for page in self.get_records(E.EventMessages, "eventsInUse", page_size=25):
|
|
events = list(takewhile(event_newer_than_threshold, page))
|
|
if events:
|
|
yield events
|
|
else:
|
|
break
|
|
|
|
def get_lock(self):
|
|
el = ROOT(E.Doors({"action": "LR", "responseFormat": "status"}))
|
|
xml = self.doXMLRequest(el)
|
|
relayState = xml.find("./{*}Doors/{*}Door").attrib["relayState"]
|
|
return "unlocked" if relayState == "set" else "locked"
|
|
|
|
def set_lock(self, lock=True):
|
|
el = ROOT(
|
|
E.Doors({"action": "CM", "command": "lockDoor" if lock else "unlockDoor"})
|
|
)
|
|
return self.doXMLRequest(el)
|