This repository has been archived on 2024-02-23. You can view files and clone it, but cannot push or open issues or pull requests.
memberPlumbing/hid/DoorController.py

102 lines
4.0 KiB
Python

import csv
import sys
from io import StringIO

import requests
from lxml import etree
from lxml.builder import ElementMaker
# Namespace URI shared by the element builders below (bound to the "hid" prefix).
_VERTX_NS = "http://www.hidglobal.com/VertX"

# Builder for un-namespaced elements that still declare the "hid" prefix.
E_plain = ElementMaker(nsmap={"hid": _VERTX_NS})
# Builder for elements that live inside the VertX namespace.
E = ElementMaker(namespace=_VERTX_NS, nsmap={"hid": _VERTX_NS})
# Factory for the top-level <VertXMessage> wrapper element.
ROOT = E_plain.VertXMessage

# Column names, in order, of the cards/people CSV uploaded by doCSVImport.
fieldnames = [
    "CardNumber",
    "CardFormat",
    "PinRequired",
    "PinCode",
    "ExtendedAccess",
    "ExpiryDate",
    "Forename",
    "Initial",
    "Surname",
    "Email",
    "Phone",
    "Custom1",
    "Custom2",
    "Schedule1",
    "Schedule2",
    "Schedule3",
    "Schedule4",
    "Schedule5",
    "Schedule6",
    "Schedule7",
    "Schedule8",
]
class DoorController:
    """Client for one door controller reached over HTTPS with digest auth.

    Talks to two CGI endpoints on the controller: ``/cgi-bin/import.cgi``
    (CSV import) and ``/cgi-bin/vertx_xml.cgi`` (XML commands).  TLS
    certificate verification is disabled for every request.

    Any failed request (non-200 status or an ``Error`` element in the reply)
    is printed and terminates the process with ``sys.exit(1)``.
    """

    def __init__(self, ip, username, password, name="", access=""):
        """Store connection details.

        Args:
            ip: Host/IP of the controller; inserted into ``https://<ip>/...``.
            username: Digest-auth user name.
            password: Digest-auth password.
            name: Stored as-is; not used by this class.
            access: Stored as-is; not used by this class.
        """
        self.ip = ip
        self.username = username
        self.password = password
        self.name = name
        self.access = access

    @staticmethod
    def _toBytes(xml):
        """Return *xml* as bytes: bytes pass through, str is UTF-8 encoded,
        anything else is serialized with ``etree.tostring``."""
        if isinstance(xml, (bytes, bytearray)):
            return bytes(xml)
        if isinstance(xml, str):
            return xml.encode("utf-8")
        return etree.tostring(xml)

    @staticmethod
    def _parseOrExit(r, errorPath):
        """Return the parsed XML body of response *r*, or report and exit.

        The HTTP status is checked before parsing so that a non-XML error
        page is reported instead of raising a parse error.
        """
        failed = r.status_code != 200
        xml = None
        if not failed:
            xml = etree.XML(r.content)
            failed = len(xml.findall(errorPath)) > 0
        if failed:
            print("Door Updating Error: ", r.status_code, r.reason)
            print(r.text)
            sys.exit(1)
        return xml

    def doImportRequest(self, params=None, files=None):
        """Send a request to the door control import script.

        Args:
            params: Query parameters for the POST.
            files: Multipart file payload, passed to ``requests.post``.

        Exits the process (status 1) if the request fails.
        """
        r = requests.post(
            'https://' + self.ip + '/cgi-bin/import.cgi',
            params=params,
            files=files,
            auth=requests.auth.HTTPDigestAuth(self.username, self.password),
            timeout=60,
            verify=False)  # ignore insecure SSL
        self._parseOrExit(r, "{http://www.hidglobal.com/VertX}Error")

    def doCSVImport(self, csv):
        """Do the CSV import procedure on a door control.

        Runs the three import tasks in order: ``importInit``,
        ``importCardsPeople`` (uploading *csv*), then ``importDone``.

        Args:
            csv: CSV content to upload; handed to requests as the file body
                (``sendSchedules`` passes a ``StringIO``).
        """
        self.doImportRequest({"task": "importInit"})
        self.doImportRequest(
            {"task": "importCardsPeople", "name": "cardspeopleschedule.csv"},
            {"importCardsPeopleButton":
                ("cardspeopleschedule.csv", csv, 'text/csv')})
        self.doImportRequest({"task": "importDone"})

    def doXMLRequest(self, xml, prefix=b'<?xml version="1.0" encoding="UTF-8"?>'):
        """Send an XML command to the controller and return the parsed reply.

        Args:
            xml: The message as an lxml element, ``bytes`` or ``str``.
            prefix: XML declaration prepended to the message.

        Returns:
            The response body parsed with ``etree.XML``.

        Exits the process (status 1) if the request fails.
        """
        # Normalise both parts to bytes so they can always be concatenated.
        payload = self._toBytes(prefix) + self._toBytes(xml)
        r = requests.get(
            'https://' + self.ip + '/cgi-bin/vertx_xml.cgi',
            params={'XML': payload},
            auth=requests.auth.HTTPDigestAuth(self.username, self.password),
            timeout=60,  # same limit as doImportRequest; avoid hanging forever
            verify=False)
        # probably need to be more sane about this
        return self._parseOrExit(r, "{*}Error")

    def sendSchedules(self):
        """Clear people and schedules, then upload ``schedules.xml``.

        ``schedules.xml`` is read from the current working directory.
        """
        # clear all people: import a CSV holding only the header and one
        # empty row
        outString = StringIO()
        writer = csv.DictWriter(outString, fieldnames)
        writer.writeheader()
        writer.writerow({})
        outString.seek(0)
        self.doCSVImport(outString)

        # clear all schedules
        # NOTE(review): this covers scheduleID 1-7 only, while the CSV has
        # Schedule1..Schedule8 columns - confirm whether 8 should be included.
        delXML = ROOT(
            *[E.Schedules({"action": "DD", "scheduleID": str(ii)})
              for ii in range(1, 8)])
        self.doXMLRequest(delXML)

        # load new schedules
        with open("schedules.xml", "rb") as f:
            self.doXMLRequest(f.read())

    def sendCardFormat(self, formatName, templateID, facilityCode):
        """Send a ``CardFormats`` message with action ``AD`` for one format.

        Args:
            formatName: Value of the ``formatName`` attribute.
            templateID: Value of the ``templateID`` attribute (stringified).
            facilityCode: Value of the nested ``FixedField`` (stringified).

        Returns:
            The parsed XML response.
        """
        # TODO: add delete formats
        # delete example: <hid:CardFormats action="DD" formatID="7-1-244"/>
        el = ROOT(
            E.CardFormats({"action": "AD"},
                          E.CardFormat({"formatName": formatName,
                                        "templateID": str(templateID)},
                                       E.FixedField({"value": str(facilityCode)}))))
        return self.doXMLRequest(el)

    def lockOrUnlockDoor(self, lock=True):
        """Send the ``lockDoor`` (default) or ``unlockDoor`` door command.

        Returns:
            The parsed XML response.
        """
        el = ROOT(
            E.Doors({"action": "CM",
                     "command": "lockDoor" if lock else "unlockDoor"}))
        return self.doXMLRequest(el)

    def getStatus(self):
        """Return ``"unlocked"`` if the door's ``relayState`` is ``"set"``,
        otherwise ``"locked"``."""
        el = ROOT(
            E.Doors({"action": "LR", "responseFormat": "status"}))
        xml = self.doXMLRequest(el)
        relayState = xml.find('./{*}Doors/{*}Door').attrib['relayState']
        return "unlocked" if relayState == "set" else "locked"