Add typing annotation for DoorController

This commit is contained in:
Adam Goldsmith 2020-06-01 14:49:15 -04:00
parent 53b457b0a9
commit ab6d0bdbc4

View File

@ -1,6 +1,17 @@
import csv
from datetime import datetime
from io import StringIO
from typing import (
IO,
Callable,
Iterable,
List,
Literal,
Mapping,
Optional,
Tuple,
Union,
)
import requests
from lxml import etree
@ -8,6 +19,8 @@ from lxml.builder import ElementMaker
from requests import Session
from requests.adapters import HTTPAdapter
from .Credential import Credential
E_plain = ElementMaker(nsmap={"hid": "http://www.hidglobal.com/VertX"})
E = ElementMaker(
namespace="http://www.hidglobal.com/VertX",
@ -25,17 +38,25 @@ fieldnames = "CardNumber,CardFormat,PinRequired,PinCode,ExtendedAccess,ExpiryDat
class HostNameIgnoringAdapter(HTTPAdapter):
def init_poolmanager(self, *args, **kwargs):
def init_poolmanager(self, *args, **kwargs) -> None:
super().init_poolmanager(*args, **kwargs, assert_hostname=False)
class RemoteError(Exception):
def __init__(self, r):
def __init__(self, r: requests.Response) -> None:
super().__init__(f"Door Updating Error: {r.status_code} {r.reason}\n{r.text}")
class DoorController:
def __init__(self, ip, username, password, name="", access="", cert=None):
def __init__(
self,
ip: str,
username: str,
password: str,
name: str = "",
access: str = "",
cert: Optional[str] = None,
) -> None:
self.ip = ip
self.username = username
self.password = password
@ -46,20 +67,27 @@ class DoorController:
self.session.mount("https://", HostNameIgnoringAdapter())
self.session.verify = cert
self._cardFormats: Optional[Mapping[str, str]] = None
self._scheduleMap: Optional[Mapping[str, str]] = None
# lazy evaluated, hopefully won't change for the lifetime of this object
@property
def cardFormats(self):
def cardFormats(self) -> Mapping[str, str]:
if not self._cardFormats:
self._cardFormats = self.get_cardFormats()
return self._cardFormats
@property
def schedulesMap(self):
if not self._schedulesMap:
self._schedulesMap = self.get_schedulesMap()
return self._schedulesMap
def scheduleMap(self) -> Mapping[str, str]:
if not self._scheduleMap:
self._scheduleMap = self.get_scheduleMap()
return self._scheduleMap
def doImport(self, params=None, files=None):
def doImport(
self,
params: Optional[Mapping[str, str]] = None,
files: Optional[Mapping[str, Tuple[str, Union[IO[str], str], str]]] = None,
) -> None:
"""Send a request to the door control import script"""
r = self.session.post(
"https://" + self.ip + "/cgi-bin/import.cgi",
@ -75,7 +103,7 @@ class DoorController:
):
raise RemoteError(r)
def doCSVImport(self, csv):
def doCSVImport(self, csv: Union[IO[str], str]) -> None:
"""Do the CSV import procedure on a door control"""
self.doImport({"task": "importInit"})
self.doImport(
@ -84,7 +112,11 @@ class DoorController:
)
self.doImport({"task": "importDone"})
def doXMLRequest(self, xml, prefix=b'<?xml version="1.0" encoding="UTF-8"?>'):
def doXMLRequest(
self,
xml: Union[etree.Element, bytes],
prefix: bytes = b'<?xml version="1.0" encoding="UTF-8"?>',
) -> etree.XML:
if not isinstance(xml, bytes):
xml = etree.tostring(xml)
r = self.session.get(
@ -98,7 +130,7 @@ class DoorController:
raise RemoteError(r)
return resp_xml
def get_scheduleMap(self):
def get_scheduleMap(self) -> Mapping[str, str]:
schedules = self.doXMLRequest(
ROOT(E.Schedules({"action": "LR", "recordOffset": "0", "recordCount": "8"}))
)
@ -106,7 +138,7 @@ class DoorController:
fmt.attrib["scheduleName"]: fmt.attrib["scheduleID"] for fmt in schedules[0]
}
def get_schedules(self):
def get_schedules(self) -> etree.Element:
# TODO: might be able to do in one request
schedules = self.doXMLRequest(ROOT(E.Schedules({"action": "LR"})))
etree.dump(schedules)
@ -123,7 +155,7 @@ class DoorController:
)
return ROOT(E_corp.Schedules({"action": "AD"}, *[s[0] for s in data]))
def set_schedules(self, schedules):
def set_schedules(self, schedules: etree.Element) -> None:
# clear all people
outString = StringIO()
writer = csv.DictWriter(outString, fieldnames)
@ -148,12 +180,14 @@ class DoorController:
# load new schedules
self.doXMLRequest(schedules)
def set_cardholder_schedules(self, cardholderID, schedules):
def set_cardholder_schedules(
self, cardholderID: str, schedules: Iterable[str]
) -> etree.XML:
roles = [
E.Role(
{
"roleID": cardholderID,
"scheduleID": self.schedulesMap[schedule],
"scheduleID": self.scheduleMap[schedule],
"resourceID": "0",
}
)
@ -166,7 +200,7 @@ class DoorController:
return self.doXMLRequest(ROOT(roleSet))
def get_cardFormats(self):
def get_cardFormats(self) -> Mapping[str, str]:
cardFormats = self.doXMLRequest(
ROOT(E.CardFormats({"action": "LR", "responseFormat": "expanded"}))
)
@ -176,7 +210,9 @@ class DoorController:
for fmt in cardFormats[0].findall("{*}CardFormat[{*}FixedField]")
}
def set_cardFormat(self, formatName, templateID, facilityCode):
def set_cardFormat(
self, formatName: str, templateID: int, facilityCode: int
) -> etree.XML:
# TODO: add ability to delete formats
# delete example: <hid:CardFormats action="DD" formatID="7-1-244"/>
@ -191,8 +227,14 @@ class DoorController:
)
return self.doXMLRequest(el)
def get_records(self, req, count, params={}, stopFunction=None):
result = []
def get_records(
self,
req: ElementMaker,
count: int,
params: Mapping[str, str] = {},
stopFunction: Optional[Callable[[List[etree.Element]], bool]] = None,
) -> List[etree.Element]:
result: List[etree.Element] = []
recordCount = 0
moreRecords = True
@ -224,15 +266,17 @@ class DoorController:
return result
def get_cardholders(self):
def get_cardholders(self) -> List[etree.Element]:
return self.get_records(E.Cardholders, 1000, {"responseFormat": "expanded"})
def add_cardholder(self, attribs):
def add_cardholder(self, attribs: Mapping[str, str]) -> etree.XML:
return self.doXMLRequest(
ROOT(E.Cardholders({"action": "AD"}, E.Cardholder(attribs)))
)
def update_cardholder(self, cardholderID, attribs):
def update_cardholder(
self, cardholderID: str, attribs: Mapping[str, str]
) -> etree.XML:
return self.doXMLRequest(
ROOT(
E.Cardholders(
@ -242,10 +286,12 @@ class DoorController:
)
)
def get_credentials(self):
def get_credentials(self) -> List[etree.Element]:
return self.get_records(E.Credentials, 1000)
def add_credentials(self, credentials, cardholderID=None):
def add_credentials(
self, credentials: Iterable[Credential], cardholderID: Optional[str] = None
) -> etree.XML:
"""Create new Credentials. If a cardholderID is provided, assign the
new credentials to that cardholder"""
creds = [
@ -263,7 +309,9 @@ class DoorController:
return self.doXMLRequest(ROOT(E.Credentials({"action": "AD"}, *creds)))
def assign_credential(self, credential, cardholderID=None):
def assign_credential(
self, credential: Credential, cardholderID: Optional[str] = None
) -> etree.XML:
# empty string removes assignment
if cardholderID is None:
cardholderID = ""
@ -281,11 +329,13 @@ class DoorController:
)
)
def get_events(self, threshold):
def event_newer_than_threshold(event):
def get_events(self, threshold: datetime) -> List[etree.Element]:
def event_newer_than_threshold(event: etree.Element) -> bool:
return datetime.fromisoformat(event.attrib["timestamp"]) > threshold
def last_event_newer_than_threshold(events):
def last_event_newer_than_threshold(
events: List[etree.Element],
) -> etree.Element:
return (not events) or event_newer_than_threshold(events[-1])
return [
@ -296,13 +346,13 @@ class DoorController:
if event_newer_than_threshold(event)
]
def get_lock(self):
def get_lock(self) -> Union[Literal["locked"], Literal["unlocked"]]:
el = ROOT(E.Doors({"action": "LR", "responseFormat": "status"}))
xml = self.doXMLRequest(el)
relayState = xml.find("./{*}Doors/{*}Door").attrib["relayState"]
return "unlocked" if relayState == "set" else "locked"
def set_lock(self, lock=True):
def set_lock(self, lock: bool = True) -> etree.XML:
el = ROOT(
E.Doors({"action": "CM", "command": "lockDoor" if lock else "unlockDoor"})
)