cmsmanage/doorcontrol/tasks/update_doors.py
Adam Goldsmith b4329a5b77
Some checks failed
Ruff / ruff (push) Successful in 34s
Test / test (push) Failing after 3m31s
doorcontrol: Keep better track of which cards are 26 bit
2024-12-03 18:58:52 -05:00

317 lines
11 KiB
Python

import dataclasses
import logging
from datetime import timedelta
from typing import TypedDict
from django.utils import timezone
from django_q.tasks import async_task
from cmsmanage.django_q2_helper import q_task_group
from doorcontrol.hid.Credential import Credential
from doorcontrol.hid.DoorController import ROOT, E
from doorcontrol.models import AttributeScheduleRule, Door, FlagScheduleRule
from membershipworks.models import Member
logger = logging.getLogger(__name__)
class CardholderAttribs(TypedDict):
forename: str
middleName: str
surname: str
email: str
phone: str
custom1: str
custom2: str
@dataclasses.dataclass
class DoorMember:
door: Door
attribs: CardholderAttribs
credentials: set[Credential]
schedules: set[str]
cardholderID: str | None = None
@classmethod
def from_membershipworks_member(cls, member: Member, door: Door):
credentials = set()
if member.access_card_facility_code and member.access_card_number:
credentials.add(
Credential.from_code(
member.access_card_facility_code,
member.access_card_number,
)
)
if member.nfc_card_number:
credentials.add(Credential.from_raw_hex(member.nfc_card_number))
reasons_and_schedules: dict[str, str] = {}
if (
member.is_active
or member.flags.filter(name="Misc. Access", type="folder").exists()
) and getattr(member, door.access_field):
# TODO: could this be annotated?
reasons_and_schedules |= FlagScheduleRule.objects.filter(
doors=door, flag__members=member
).values_list("flag__name", "schedule__name")
# TODO: this seems like it could be cleaner
reasons_and_schedules |= {
attribute_rule.access_field: attribute_rule.schedule.name
for attribute_rule in AttributeScheduleRule.objects.filter(doors=door)
if getattr(member, attribute_rule.access_field)
}
# grant instructors access for ~1 hour around their class times
if hasattr(member, "eventinstructor") and getattr(member, door.access_field):
now = timezone.now()
margin = timedelta(hours=1)
if member.eventinstructor.eventext_set.filter(
occurred=True,
meeting_times__start__lt=now + margin,
meeting_times__end__gt=now - margin,
).exists():
reasons_and_schedules["Instructor for Active Class"] = "Unlimited"
reasons = sorted(reasons_and_schedules.keys())
return cls(
door=door,
attribs={
"forename": member.first_name,
"middleName": "",
"surname": member.last_name,
"email": member.email,
"phone": member.phone,
"custom1": "|".join(reasons).replace("&", "and"),
"custom2": member.uid,
},
credentials=credentials,
schedules=set(reasons_and_schedules.values()),
)
@classmethod
def from_cardholder(cls, data, door: Door):
return cls(
door=door,
attribs={
"forename": data.get("forename", ""),
"middleName": data.attrib.get("middleName", ""),
"surname": data.get("surname", ""),
"email": data.attrib.get("email", ""),
"phone": data.attrib.get("phone", ""),
"custom1": data.attrib.get("custom1", ""),
"custom2": data.attrib.get("custom2", ""),
},
cardholderID=data.attrib["cardholderID"],
credentials={
(
Credential.from_26bit_hex(c.attrib["rawCardNumber"])
if "formatID" in c.attrib
else Credential.from_raw_hex(c.attrib["rawCardNumber"])
)
for c in data.findall("{*}Credential")
},
schedules={r.attrib["scheduleName"] for r in data.findall("{*}Role")},
)
@property
def membershipworks_id(self):
return self.attribs["custom2"]
@property
def full_name(self):
return f"{self.attribs['forename']} {self.attribs['surname']}"
def update_attribs(self):
self.door.controller.doXMLRequest(
ROOT(
E.Cardholders(
{"action": "UD", "cardholderID": self.cardholderID},
E.CardHolder(self.attribs),
)
)
)
def update_credentials(
self,
existing_door_credentials: set[Credential],
all_members: list["DoorMember"],
old_credentials: set[Credential],
):
# cardholderID should be set on a member before this is called
assert self.cardholderID is not None
other_assigned_cards = {
card for m in all_members if m != self for card in m.credentials
}
# cards removed, and won't be reassigned to someone else
for card in (old_credentials - self.credentials) - other_assigned_cards:
self.door.controller.update_credential(card.hex, "")
if self.credentials - old_credentials: # cards added
for card in (
self.credentials & other_assigned_cards
): # new card exists in another member
logger.info(
[
m
for m in all_members
for card in m.credentials
if card in self.credentials
]
)
raise Exception(f"Duplicate Card in input data! {card}")
# card existed in door.controller, and needs to be reassigned
for card in self.credentials & existing_door_credentials:
self.door.controller.update_credential(card.hex, self.cardholderID)
# cards that never existed, and need to be created
if self.credentials - existing_door_credentials:
xml_credentials = [
(
E.Credential(
{
"formatName": str(credential.facility_code),
"cardNumber": str(credential.card_number),
"formatID": self.door.card_formats[
str(credential.facility_code)
],
"isCard": "true",
"cardholderID": self.cardholderID,
}
)
if credential.is_26bit
else E.Credential(
{
"cardNumber": credential.hex,
"isCard": "true",
"cardholderID": self.cardholderID,
}
)
)
for credential in self.credentials - existing_door_credentials
]
self.door.controller.doXMLRequest(
ROOT(E.Credentials({"action": "AD"}, *xml_credentials))
)
def update_schedules(self):
roles = [
E.Role(
{
"roleID": self.cardholderID,
"scheduleID": self.door.schedules_map[schedule],
"resourceID": "0",
}
)
for schedule in self.schedules
]
self.door.controller.doXMLRequest(
ROOT(
E.RoleSet(
{"action": "UD", "roleSetID": self.cardholderID}, E.Roles(*roles)
)
)
)
def update_door(door: Door, dry_run: bool = False):
logger.info(f"Updating {door}")
logger.debug(f"Fetching members from database for {door}")
members = [
DoorMember.from_membershipworks_member(membershipworks_member, door)
for membershipworks_member in (Member.objects.with_is_active()).all()
]
logger.debug(f"Fetching cardholders from {door}")
cardholders = {
member.membershipworks_id: member
for member in [
DoorMember.from_cardholder(ch, door)
for ch in door.controller.get_cardholders()
]
}
logger.debug(f"Fetching credentials from {door}")
existing_door_credentials = {
(
Credential.from_26bit_hex(c.attrib["rawCardNumber"])
if "formatID" in c.attrib
else Credential.from_raw_hex(c.attrib["rawCardNumber"])
)
for c in door.controller.get_credentials()
}
logger.debug(f"Syncing members with {door}")
# TODO: can I combine requests?
for member in members:
# cardholder did not exist, so add them
if member.membershipworks_id not in cardholders:
logger.info(f"Adding Member {member.full_name}: {member}")
if not dry_run:
resp = door.controller.doXMLRequest(
ROOT(E.Cardholders({"action": "AD"}, E.Cardholder(member.attribs)))
)
member.cardholderID = resp.find("{*}Cardholders/{*}Cardholder").attrib[
"cardholderID"
]
member.update_attribs()
member.update_credentials(existing_door_credentials, members, set())
member.update_schedules()
# cardholder exists, compare contents
else:
existing_cardholder = cardholders.pop(member.membershipworks_id)
member.cardholderID = existing_cardholder.cardholderID
if member.attribs != existing_cardholder.attribs:
changes = {
k: f'"{existing_cardholder.attribs[k]}" -> "{v}"'
for k, v in member.attribs.items()
if existing_cardholder.attribs[k] != v
}
logger.info(f"Updating profile for {member.full_name}: {changes}")
if not dry_run:
member.update_attribs()
if member.credentials != existing_cardholder.credentials:
logger.info(
f"Updating card for {member.full_name}:"
f" {existing_cardholder.credentials} -> {member.credentials}"
)
if not dry_run:
member.update_credentials(
existing_door_credentials,
members,
old_credentials=existing_cardholder.credentials,
)
if member.schedules != existing_cardholder.schedules:
logger.info(
f"Updating schedule for {member.full_name}:"
f" {existing_cardholder.schedules} -> {member.schedules}"
)
if not dry_run:
member.update_schedules()
# TODO: delete cardholders that are no longer members?
@q_task_group("Update Door Controller Members and Cards")
def q_update_all_doors():
for door in Door.objects.all():
async_task(
update_door,
door,
cluster="internal",
group=f"Update HID Door Controller - {door.name}",
)