import csv from datetime import datetime from io import StringIO import requests import urllib3 from lxml import etree from lxml.builder import ElementMaker E_plain = ElementMaker(nsmap={"hid": "http://www.hidglobal.com/VertX"}) E = ElementMaker( namespace="http://www.hidglobal.com/VertX", nsmap={"hid": "http://www.hidglobal.com/VertX"}, ) E_corp = ElementMaker( namespace="http://www.hidcorp.com/VertX", # stupid nsmap={"hid": "http://www.hidcorp.com/VertX"}, ) ROOT = E_plain.VertXMessage fieldnames = "CardNumber,CardFormat,PinRequired,PinCode,ExtendedAccess,ExpiryDate,Forename,Initial,Surname,Email,Phone,Custom1,Custom2,Schedule1,Schedule2,Schedule3,Schedule4,Schedule5,Schedule6,Schedule7,Schedule8".split( "," ) # TODO: where should this live? # it's fine, ssl certs are for losers anyway urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) class RemoteError(Exception): def __init__(self, r): super().__init__(f"Door Updating Error: {r.status_code} {r.reason}\n{r.text}") class DoorController: def __init__(self, ip, username, password): self.ip = ip self.username = username self.password = password def doImport(self, params=None, files=None): """Send a request to the door control import script""" r = requests.post( "https://" + self.ip + "/cgi-bin/import.cgi", params=params, files=files, auth=requests.auth.HTTPDigestAuth(self.username, self.password), timeout=60, verify=False, ) # ignore insecure SSL xml = etree.XML(r.content) if ( r.status_code != 200 or len(xml.findall("{http://www.hidglobal.com/VertX}Error")) > 0 ): raise RemoteError(r) def doCSVImport(self, csv): """Do the CSV import procedure on a door control""" self.doImport({"task": "importInit"}) self.doImport( {"task": "importCardsPeople", "name": "cardspeopleschedule.csv"}, {"importCardsPeopleButton": ("cardspeopleschedule.csv", csv, "text/csv")}, ) self.doImport({"task": "importDone"}) def doXMLRequest(self, xml, prefix=b''): if not isinstance(xml, bytes): xml = etree.tostring(xml) r = requests.get( "https://" + self.ip + "/cgi-bin/vertx_xml.cgi", params={"XML": prefix + xml}, auth=requests.auth.HTTPDigestAuth(self.username, self.password), verify=False, ) resp_xml = etree.XML(r.content) # probably meed to be more sane about this if r.status_code != 200 or len(resp_xml.findall("{*}Error")) > 0: raise RemoteError(r) return resp_xml def get_scheduleMap(self): schedules = self.doXMLRequest( ROOT(E.Schedules({"action": "LR", "recordOffset": "0", "recordCount": "8"})) ) return { fmt.attrib["scheduleName"]: fmt.attrib["scheduleID"] for fmt in schedules[0] } def get_schedules(self): # TODO: might be able to do in one request schedules = self.doXMLRequest(ROOT(E.Schedules({"action": "LR"}))) etree.dump(schedules) data = self.doXMLRequest( ROOT( *[ E.Schedules( {"action": "LR", "scheduleID": schedule.attrib["scheduleID"]} ) for schedule in schedules[0] ] ) ) return ROOT(E_corp.Schedules({"action": "AD"}, *[s[0] for s in data])) def set_schedules(self, schedules): # clear all people outString = StringIO() writer = csv.DictWriter(outString, fieldnames) writer.writeheader() writer.writerow({}) outString.seek(0) self.doCSVImport(outString) # clear all schedules delXML = ROOT( *[ E.Schedules({"action": "DD", "scheduleID": str(ii)}) for ii in range(1, 8) ] ) try: self.doXMLRequest(delXML) except RemoteError: # don't care about failure to delete, they probably just didn't exist pass # load new schedules self.doXMLRequest(schedules) def get_cardFormats(self): cardFormats = self.doXMLRequest( ROOT(E.CardFormats({"action": "LR", "responseFormat": "expanded"})) ) return { fmt[0].attrib["value"]: fmt.attrib["formatID"] for fmt in cardFormats[0].findall("{*}CardFormat[{*}FixedField]") } def set_cardFormat(self, formatName, templateID, facilityCode): # TODO: add ability to delete formats # delete example: el = ROOT( E.CardFormats( {"action": "AD"}, E.CardFormat( {"formatName": formatName, "templateID": str(templateID)}, E.FixedField({"value": str(facilityCode)}), ), ) ) return self.doXMLRequest(el) def get_records(self, req, count, params={}, stopFunction=None): recordCount = 0 moreRecords = True # note: all the "+/-1" bits are to work around a bug where the # last returned entry is incomplete. There is probably a # better way to do this, but for now I just get the last entry # again in the next request. I suspect this probably ends # poorly if the numbers line up poorly (ie an exact multiple # of the returned record limit) while True: res = self.doXMLRequest( ROOT( req( { "action": "LR", "recordCount": str(count - recordCount + 1), "recordOffset": str( recordCount - 1 if recordCount > 0 else 0 ), **params, } ) ) ) recordCount += int(res[0].get("recordCount")) - 1 moreRecords = res[0].get("moreRecords") == "true" if moreRecords and (stopFunction is None or stopFunction(list(res[0]))): yield list(res[0])[:-1] else: yield list(res[0]) break def get_cardholders(self): for page in self.get_records( E.Cardholders, 1000, {"responseFormat": "expanded"} ): yield from page def get_credentials(self): for page in self.get_records(E.Credentials, 1000): yield from page def update_credential(self, rawCardNumber: str, cardholderID: str): return self.doXMLRequest( ROOT( E.Credentials( { "action": "UD", "rawCardNumber": rawCardNumber, "isCard": "true", }, E.Credential({"cardholderID": cardholderID}), ) ) ) def get_events(self, threshold): def event_newer_than_threshold(event): return datetime.fromisoformat(event.attrib["timestamp"]) > threshold # These door controllers only store 5000 events max for page in self.get_records( E.EventMessages, 5000, stopFunction=lambda events: event_newer_than_threshold(events[-1]), ): events = [event for event in page if event_newer_than_threshold(event)] if events: yield events def get_lock(self): el = ROOT(E.Doors({"action": "LR", "responseFormat": "status"})) xml = self.doXMLRequest(el) relayState = xml.find("./{*}Doors/{*}Door").attrib["relayState"] return "unlocked" if relayState == "set" else "locked" def set_lock(self, lock=True): el = ROOT( E.Doors({"action": "CM", "command": "lockDoor" if lock else "unlockDoor"}) ) return self.doXMLRequest(el)