2024-01-19 15:16:47 -05:00
|
|
|
import contextlib
|
2023-11-08 12:34:11 -05:00
|
|
|
import csv
|
|
|
|
from datetime import datetime
|
|
|
|
from io import StringIO
|
2024-08-26 22:04:03 -04:00
|
|
|
from itertools import takewhile
|
2023-11-08 12:34:11 -05:00
|
|
|
|
|
|
|
import requests
|
|
|
|
import urllib3
|
|
|
|
from lxml import etree
|
|
|
|
from lxml.builder import ElementMaker
|
|
|
|
|
|
|
|
E_plain = ElementMaker(nsmap={"hid": "http://www.hidglobal.com/VertX"})
|
|
|
|
E = ElementMaker(
|
|
|
|
namespace="http://www.hidglobal.com/VertX",
|
|
|
|
nsmap={"hid": "http://www.hidglobal.com/VertX"},
|
|
|
|
)
|
|
|
|
E_corp = ElementMaker(
|
|
|
|
namespace="http://www.hidcorp.com/VertX", # stupid
|
|
|
|
nsmap={"hid": "http://www.hidcorp.com/VertX"},
|
|
|
|
)
|
|
|
|
ROOT = E_plain.VertXMessage
|
|
|
|
|
|
|
|
fieldnames = "CardNumber,CardFormat,PinRequired,PinCode,ExtendedAccess,ExpiryDate,Forename,Initial,Surname,Email,Phone,Custom1,Custom2,Schedule1,Schedule2,Schedule3,Schedule4,Schedule5,Schedule6,Schedule7,Schedule8".split(
|
|
|
|
","
|
|
|
|
)
|
|
|
|
|
|
|
|
# TODO: where should this live?
|
|
|
|
# it's fine, ssl certs are for losers anyway
|
|
|
|
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
|
|
|
|
|
|
|
|
|
|
|
class RemoteError(Exception):
|
|
|
|
def __init__(self, r):
|
|
|
|
super().__init__(f"Door Updating Error: {r.status_code} {r.reason}\n{r.text}")
|
|
|
|
|
|
|
|
|
2024-08-26 22:04:03 -04:00
|
|
|
class UnsupportedPageSize(Exception):
|
|
|
|
def __init__(self, page_size: int) -> None:
|
|
|
|
super().__init__(
|
|
|
|
f"Page size {page_size} greater than supported by controller. "
|
|
|
|
"(controller returned moreRecords=true)"
|
|
|
|
)
|
|
|
|
|
|
|
|
|
2023-11-08 12:34:11 -05:00
|
|
|
class DoorController:
|
|
|
|
def __init__(self, ip, username, password):
|
|
|
|
self.ip = ip
|
|
|
|
self.username = username
|
|
|
|
self.password = password
|
|
|
|
|
|
|
|
def doImport(self, params=None, files=None):
|
|
|
|
"""Send a request to the door control import script"""
|
|
|
|
r = requests.post(
|
|
|
|
"https://" + self.ip + "/cgi-bin/import.cgi",
|
|
|
|
params=params,
|
|
|
|
files=files,
|
|
|
|
auth=requests.auth.HTTPDigestAuth(self.username, self.password),
|
|
|
|
timeout=60,
|
|
|
|
verify=False,
|
|
|
|
) # ignore insecure SSL
|
|
|
|
xml = etree.XML(r.content)
|
|
|
|
if (
|
2024-01-18 14:00:36 -05:00
|
|
|
r.status_code != requests.codes.ok
|
2023-11-08 12:34:11 -05:00
|
|
|
or len(xml.findall("{http://www.hidglobal.com/VertX}Error")) > 0
|
|
|
|
):
|
|
|
|
raise RemoteError(r)
|
|
|
|
|
|
|
|
def doCSVImport(self, csv):
|
|
|
|
"""Do the CSV import procedure on a door control"""
|
|
|
|
self.doImport({"task": "importInit"})
|
|
|
|
self.doImport(
|
|
|
|
{"task": "importCardsPeople", "name": "cardspeopleschedule.csv"},
|
|
|
|
{"importCardsPeopleButton": ("cardspeopleschedule.csv", csv, "text/csv")},
|
|
|
|
)
|
|
|
|
self.doImport({"task": "importDone"})
|
|
|
|
|
|
|
|
def doXMLRequest(self, xml, prefix=b'<?xml version="1.0" encoding="UTF-8"?>'):
|
|
|
|
if not isinstance(xml, bytes):
|
|
|
|
xml = etree.tostring(xml)
|
|
|
|
r = requests.get(
|
|
|
|
"https://" + self.ip + "/cgi-bin/vertx_xml.cgi",
|
|
|
|
params={"XML": prefix + xml},
|
|
|
|
auth=requests.auth.HTTPDigestAuth(self.username, self.password),
|
|
|
|
verify=False,
|
|
|
|
)
|
|
|
|
resp_xml = etree.XML(r.content)
|
|
|
|
# probably meed to be more sane about this
|
2024-01-18 14:00:36 -05:00
|
|
|
if r.status_code != requests.codes.ok or len(resp_xml.findall("{*}Error")) > 0:
|
2023-11-08 12:34:11 -05:00
|
|
|
raise RemoteError(r)
|
|
|
|
return resp_xml
|
|
|
|
|
|
|
|
def get_scheduleMap(self):
|
|
|
|
schedules = self.doXMLRequest(
|
|
|
|
ROOT(E.Schedules({"action": "LR", "recordOffset": "0", "recordCount": "8"}))
|
|
|
|
)
|
|
|
|
return {
|
|
|
|
fmt.attrib["scheduleName"]: fmt.attrib["scheduleID"] for fmt in schedules[0]
|
|
|
|
}
|
|
|
|
|
|
|
|
def get_schedules(self):
|
|
|
|
# TODO: might be able to do in one request
|
|
|
|
schedules = self.doXMLRequest(ROOT(E.Schedules({"action": "LR"})))
|
|
|
|
etree.dump(schedules)
|
|
|
|
|
|
|
|
data = self.doXMLRequest(
|
|
|
|
ROOT(
|
|
|
|
*[
|
|
|
|
E.Schedules(
|
|
|
|
{"action": "LR", "scheduleID": schedule.attrib["scheduleID"]}
|
|
|
|
)
|
|
|
|
for schedule in schedules[0]
|
|
|
|
]
|
|
|
|
)
|
|
|
|
)
|
|
|
|
return ROOT(E_corp.Schedules({"action": "AD"}, *[s[0] for s in data]))
|
|
|
|
|
|
|
|
def set_schedules(self, schedules):
|
|
|
|
# clear all people
|
|
|
|
outString = StringIO()
|
|
|
|
writer = csv.DictWriter(outString, fieldnames)
|
|
|
|
writer.writeheader()
|
|
|
|
writer.writerow({})
|
|
|
|
outString.seek(0)
|
|
|
|
self.doCSVImport(outString)
|
|
|
|
|
|
|
|
# clear all schedules
|
|
|
|
delXML = ROOT(
|
|
|
|
*[
|
|
|
|
E.Schedules({"action": "DD", "scheduleID": str(ii)})
|
|
|
|
for ii in range(1, 8)
|
|
|
|
]
|
|
|
|
)
|
2024-01-19 15:16:47 -05:00
|
|
|
# don't care about failure to delete, they probably just didn't exist
|
|
|
|
contextlib.suppress(self.doXMLRequest(delXML))
|
2023-11-08 12:34:11 -05:00
|
|
|
|
|
|
|
# load new schedules
|
|
|
|
self.doXMLRequest(schedules)
|
|
|
|
|
|
|
|
def get_cardFormats(self):
|
|
|
|
cardFormats = self.doXMLRequest(
|
|
|
|
ROOT(E.CardFormats({"action": "LR", "responseFormat": "expanded"}))
|
|
|
|
)
|
|
|
|
|
|
|
|
return {
|
|
|
|
fmt[0].attrib["value"]: fmt.attrib["formatID"]
|
|
|
|
for fmt in cardFormats[0].findall("{*}CardFormat[{*}FixedField]")
|
|
|
|
}
|
|
|
|
|
|
|
|
def set_cardFormat(self, formatName, templateID, facilityCode):
|
|
|
|
# TODO: add ability to delete formats
|
|
|
|
# delete example: <hid:CardFormats action="DD" formatID="7-1-244"/>
|
|
|
|
|
|
|
|
el = ROOT(
|
|
|
|
E.CardFormats(
|
|
|
|
{"action": "AD"},
|
|
|
|
E.CardFormat(
|
|
|
|
{"formatName": formatName, "templateID": str(templateID)},
|
|
|
|
E.FixedField({"value": str(facilityCode)}),
|
|
|
|
),
|
|
|
|
)
|
|
|
|
)
|
|
|
|
return self.doXMLRequest(el)
|
|
|
|
|
2024-08-26 22:04:03 -04:00
|
|
|
def get_records(
|
|
|
|
self,
|
|
|
|
req,
|
|
|
|
count_attr: str,
|
|
|
|
params: dict[str, str] | None = None,
|
|
|
|
page_size: int = 100,
|
|
|
|
):
|
|
|
|
dr = self.doXMLRequest(ROOT(req({"action": "DR"})))
|
|
|
|
|
|
|
|
for offset in range(0, int(dr[0].attrib[count_attr]), page_size):
|
2023-11-08 12:34:11 -05:00
|
|
|
res = self.doXMLRequest(
|
|
|
|
ROOT(
|
|
|
|
req(
|
|
|
|
{
|
|
|
|
"action": "LR",
|
2024-08-26 22:04:03 -04:00
|
|
|
"recordCount": str(page_size),
|
|
|
|
"recordOffset": str(offset),
|
2024-08-07 14:29:52 -04:00
|
|
|
**(params or {}),
|
2023-11-08 12:34:11 -05:00
|
|
|
}
|
|
|
|
)
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
2024-08-26 22:04:03 -04:00
|
|
|
# The web interface does sub-pagination when needed, but that is very messy.
|
|
|
|
# See previous versions of this function for an example :)
|
|
|
|
if res[0].attrib["moreRecords"] != "false":
|
|
|
|
raise UnsupportedPageSize(page_size)
|
|
|
|
|
|
|
|
yield list(res[0])
|
2023-11-08 12:34:11 -05:00
|
|
|
|
|
|
|
def get_cardholders(self):
|
|
|
|
for page in self.get_records(
|
2024-08-26 22:04:03 -04:00
|
|
|
E.Cardholders, "cardholdersInUse", params={"responseFormat": "expanded"}
|
2023-11-08 12:34:11 -05:00
|
|
|
):
|
|
|
|
yield from page
|
|
|
|
|
|
|
|
def get_credentials(self):
|
2024-08-26 22:04:03 -04:00
|
|
|
for page in self.get_records(E.Credentials, "credentialsInUse"):
|
2023-11-08 12:34:11 -05:00
|
|
|
yield from page
|
|
|
|
|
|
|
|
def update_credential(self, rawCardNumber: str, cardholderID: str):
|
|
|
|
return self.doXMLRequest(
|
|
|
|
ROOT(
|
|
|
|
E.Credentials(
|
|
|
|
{
|
|
|
|
"action": "UD",
|
|
|
|
"rawCardNumber": rawCardNumber,
|
|
|
|
"isCard": "true",
|
|
|
|
},
|
|
|
|
E.Credential({"cardholderID": cardholderID}),
|
|
|
|
)
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
2024-08-26 22:04:03 -04:00
|
|
|
def get_events(self, threshold: datetime):
|
2023-11-08 12:34:11 -05:00
|
|
|
def event_newer_than_threshold(event):
|
|
|
|
return datetime.fromisoformat(event.attrib["timestamp"]) > threshold
|
|
|
|
|
2024-08-26 22:04:03 -04:00
|
|
|
# smaller page size empirically determined
|
|
|
|
for page in self.get_records(E.EventMessages, "eventsInUse", page_size=25):
|
|
|
|
events = list(takewhile(event_newer_than_threshold, page))
|
2023-11-08 12:34:11 -05:00
|
|
|
if events:
|
|
|
|
yield events
|
2024-08-26 22:04:03 -04:00
|
|
|
else:
|
|
|
|
break
|
2023-11-08 12:34:11 -05:00
|
|
|
|
|
|
|
def get_lock(self):
|
|
|
|
el = ROOT(E.Doors({"action": "LR", "responseFormat": "status"}))
|
|
|
|
xml = self.doXMLRequest(el)
|
|
|
|
relayState = xml.find("./{*}Doors/{*}Door").attrib["relayState"]
|
|
|
|
return "unlocked" if relayState == "set" else "locked"
|
|
|
|
|
|
|
|
def set_lock(self, lock=True):
|
|
|
|
el = ROOT(
|
|
|
|
E.Doors({"action": "CM", "command": "lockDoor" if lock else "unlockDoor"})
|
|
|
|
)
|
|
|
|
return self.doXMLRequest(el)
|