293 lines
10 KiB
Python
293 lines
10 KiB
Python
import dataclasses
|
|
import logging
|
|
from datetime import timedelta
|
|
from typing import TypedDict
|
|
|
|
from django.utils import timezone
|
|
|
|
from django_q.tasks import async_task
|
|
|
|
from cmsmanage.django_q2_helper import q_task_group
|
|
from doorcontrol.hid.Credential import Credential
|
|
from doorcontrol.hid.DoorController import ROOT, E
|
|
from doorcontrol.models import AttributeScheduleRule, Door, FlagScheduleRule
|
|
from membershipworks.models import Member
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class CardholderAttribs(TypedDict):
    """Attribute fields stored on a door-controller cardholder record."""

    # name fields
    forename: str
    middleName: str
    surname: str

    # contact fields
    email: str
    phone: str

    # free-form fields: DoorMember stores the "|"-joined access reasons in
    # custom1 and the MembershipWorks uid in custom2
    custom1: str
    custom2: str
|
|
|
|
|
|
@dataclasses.dataclass
class DoorMember:
    """One cardholder on one door, in a form that can be compared and pushed.

    Instances are built either from a MembershipWorks ``Member`` (see
    ``from_membershipworks_member``) or from a cardholder element returned by
    the door controller (see ``from_cardholder``). Both constructors fill the
    same fields, so the two can be compared attribute by attribute.
    """

    door: Door
    attribs: CardholderAttribs
    credentials: set[Credential]
    schedules: set[str]
    # controller-side ID; stays None until it has been read from the controller
    cardholderID: str | None = None

    @classmethod
    def from_membershipworks_member(cls, member: Member, door: Door):
        """Build the cardholder a MembershipWorks member should be on ``door``.

        The credential set holds the member's access card when both the
        facility code and the card number are set, and is empty otherwise.
        Schedules come from flag rules, attribute rules and a temporary
        instructor grant; the reason behind each one is written to the
        ``custom1`` attribute and the member's uid to ``custom2``.
        """
        if member.access_card_facility_code and member.access_card_number:
            credentials = {
                Credential(
                    code=(
                        member.access_card_facility_code,
                        member.access_card_number,
                    )
                )
            }
        else:
            credentials = set()

        # reason access was granted -> name of the schedule that reason grants
        reasons_and_schedules: dict[str, str] = {}
        has_door_access = getattr(member, door.access_field)
        if (
            member.is_active
            or member.flags.filter(name="Misc. Access", type="folder").exists()
        ) and has_door_access:
            # TODO: could this be annotated?
            reasons_and_schedules |= FlagScheduleRule.objects.filter(
                doors=door, flag__members=member
            ).values_list("flag__name", "schedule__name")

            # TODO: this seems like it could be cleaner
            reasons_and_schedules |= {
                attribute_rule.access_field: attribute_rule.schedule.name
                for attribute_rule in AttributeScheduleRule.objects.filter(doors=door)
                if getattr(member, attribute_rule.access_field)
            }

        # grant instructors access for ~1 hour around their class times
        if hasattr(member, "eventinstructor") and getattr(member, door.access_field):
            now = timezone.now()
            margin = timedelta(hours=1)
            if member.eventinstructor.eventext_set.filter(
                occurred=True,
                meeting_times__start__lt=now + margin,
                meeting_times__end__gt=now - margin,
            ).exists():
                reasons_and_schedules["Instructor for Active Class"] = "Unlimited"

        # sorted so the joined string is stable between runs
        reasons = sorted(reasons_and_schedules.keys())

        return cls(
            door=door,
            attribs={
                "forename": member.first_name,
                "middleName": "",
                "surname": member.last_name,
                "email": member.email,
                "phone": member.phone,
                "custom1": "|".join(reasons).replace("&", "and"),
                "custom2": member.uid,
            },
            credentials=credentials,
            schedules=set(reasons_and_schedules.values()),
        )

    @classmethod
    def from_cardholder(cls, data, door: Door):
        """Build a ``DoorMember`` from a cardholder element of the controller.

        ``data`` is an XML element: its attributes supply the cardholder
        fields (missing ones become ``""``), and its ``Credential`` and
        ``Role`` children, matched in any namespace, supply the cards and
        schedule names.

        Raises:
            KeyError: if ``cardholderID``, a credential's ``rawCardNumber`` or
                a role's ``scheduleName`` attribute is missing.
        """
        # every field is read through the attribute mapping for consistency
        attrib = data.attrib
        return cls(
            door=door,
            attribs={
                "forename": attrib.get("forename", ""),
                "middleName": attrib.get("middleName", ""),
                "surname": attrib.get("surname", ""),
                "email": attrib.get("email", ""),
                "phone": attrib.get("phone", ""),
                "custom1": attrib.get("custom1", ""),
                "custom2": attrib.get("custom2", ""),
            },
            cardholderID=attrib["cardholderID"],
            credentials={
                Credential(hex=(c.attrib["rawCardNumber"]))
                for c in data.findall("{*}Credential")
            },
            schedules={r.attrib["scheduleName"] for r in data.findall("{*}Role")},
        )

    @property
    def membershipworks_id(self):
        """The MembershipWorks uid, as stored in the ``custom2`` attribute."""
        return self.attribs["custom2"]

    @property
    def full_name(self):
        """Forename and surname joined by a single space."""
        return f"{self.attribs['forename']} {self.attribs['surname']}"

    def update_attribs(self):
        """Send this cardholder's attributes to the controller ("UD" action)."""
        # NOTE(review): the element is spelled `CardHolder` here, while the add
        # request in update_door uses `Cardholder` -- confirm the controller
        # accepts both spellings before changing either.
        self.door.controller.doXMLRequest(
            ROOT(
                E.Cardholders(
                    {"action": "UD", "cardholderID": self.cardholderID},
                    E.CardHolder(self.attribs),
                )
            )
        )

    def update_credentials(
        self,
        existing_door_credentials: set[Credential],
        all_members: list["DoorMember"],
        old_credentials: set[Credential] | None = None,
    ):
        """Make the controller's card assignments match ``self.credentials``.

        Args:
            existing_door_credentials: credentials already present on the
                controller, regardless of who holds them.
            all_members: every member being synchronised; used to find cards
                that are also claimed by somebody else.
            old_credentials: the cards this cardholder previously held on the
                controller. Omit it (or pass ``None``) for a cardholder that
                had none.

        Raises:
            Exception: if a card of this member is also listed for another
                member in ``all_members``.
        """
        # cardholderID should be set on a member before this is called
        assert self.cardholderID is not None

        # a None sentinel avoids sharing one mutable default set between calls
        previous = set() if old_credentials is None else old_credentials

        other_assigned_cards = {
            card for m in all_members if m != self for card in m.credentials
        }

        # cards removed, and won't be reassigned to someone else
        for card in (previous - self.credentials) - other_assigned_cards:
            self.door.controller.update_credential(card.hex, "")

        if self.credentials - previous:  # cards added
            for card in (
                self.credentials & other_assigned_cards
            ):  # new card exists in another member
                # log every member holding one of these cards, then abort
                logger.info(
                    [
                        m
                        for m in all_members
                        for card in m.credentials
                        if card in self.credentials
                    ]
                )
                raise Exception(f"Duplicate Card in input data! {card}")

            # card existed in door.controller, and needs to be reassigned
            for card in self.credentials & existing_door_credentials:
                self.door.controller.update_credential(card.hex, self.cardholderID)

            # cards that never existed, and need to be created
            missing_cards = self.credentials - existing_door_credentials
            if missing_cards:
                xml_credentials = [
                    E.Credential(
                        {
                            "formatName": str(credential.code[0]),
                            "cardNumber": str(credential.code[1]),
                            "formatID": self.door.card_formats[str(credential.code[0])],
                            "isCard": "true",
                            "cardholderID": self.cardholderID,
                        }
                    )
                    for credential in missing_cards
                ]

                self.door.controller.doXMLRequest(
                    ROOT(E.Credentials({"action": "AD"}, *xml_credentials))
                )

    def update_schedules(self):
        """Send a role-set update ("UD") with one role per schedule name.

        Raises:
            KeyError: if a schedule name is not in ``door.schedules_map``.
        """
        roles = [
            E.Role(
                {
                    "roleID": self.cardholderID,
                    "scheduleID": self.door.schedules_map[schedule],
                    "resourceID": "0",
                }
            )
            for schedule in self.schedules
        ]

        self.door.controller.doXMLRequest(
            ROOT(
                E.RoleSet(
                    {"action": "UD", "roleSetID": self.cardholderID}, E.Roles(*roles)
                )
            )
        )
|
|
|
|
|
|
def update_door(door: Door, dry_run: bool = False):
    """Synchronise the cardholders on ``door`` with the MembershipWorks members.

    Each member is converted to the cardholder they should be on this door
    and compared with what the controller reports. Members without a
    cardholder are added; existing cardholders get their attributes,
    credentials and schedules updated wherever they differ. Every change is
    logged, and when ``dry_run`` is true the changes are only logged and no
    update is sent to the controller.

    Raises:
        RuntimeError: if the controller's reply to an add request contains no
            ``Cardholder`` element to read the new ID from.
    """
    members = [
        DoorMember.from_membershipworks_member(membershipworks_member, door)
        for membershipworks_member in Member.objects.with_is_active().all()
    ]

    # cardholders currently on the controller, keyed by MembershipWorks ID
    cardholders = {}
    for ch in door.controller.get_cardholders():
        cardholder = DoorMember.from_cardholder(ch, door)
        cardholders[cardholder.membershipworks_id] = cardholder

    existing_door_credentials = {
        Credential(hex=c.attrib["rawCardNumber"])
        for c in door.controller.get_credentials()
    }

    # TODO: can I combine requests?
    for member in members:
        # popping leaves only cardholders without a matching member behind
        existing_cardholder = cardholders.pop(member.membershipworks_id, None)

        # cardholder did not exist, so add them
        if existing_cardholder is None:
            logger.info("Adding Member %s: %s", member.full_name, member)
            if dry_run:
                continue

            resp = door.controller.doXMLRequest(
                ROOT(E.Cardholders({"action": "AD"}, E.Cardholder(member.attribs)))
            )
            created = resp.find("{*}Cardholders/{*}Cardholder")
            if created is None:
                # fail with a clear message instead of an AttributeError on None
                raise RuntimeError(
                    f"Door controller did not return a Cardholder for"
                    f" {member.full_name}"
                )
            member.cardholderID = created.attrib["cardholderID"]

            member.update_attribs()
            member.update_credentials(existing_door_credentials, members)
            member.update_schedules()
            continue

        # cardholder exists, compare contents
        member.cardholderID = existing_cardholder.cardholderID

        if member.attribs != existing_cardholder.attribs:
            changes = {
                k: f'"{existing_cardholder.attribs[k]}" -> "{v}"'
                for k, v in member.attribs.items()
                if existing_cardholder.attribs[k] != v
            }
            logger.info("Updating profile for %s: %s", member.full_name, changes)
            if not dry_run:
                member.update_attribs()

        if member.credentials != existing_cardholder.credentials:
            logger.info(
                "Updating card for %s: %s -> %s",
                member.full_name,
                existing_cardholder.credentials,
                member.credentials,
            )
            if not dry_run:
                member.update_credentials(
                    existing_door_credentials,
                    members,
                    old_credentials=existing_cardholder.credentials,
                )

        if member.schedules != existing_cardholder.schedules:
            logger.info(
                "Updating schedule for %s: %s -> %s",
                member.full_name,
                existing_cardholder.schedules,
                member.schedules,
            )
            if not dry_run:
                member.update_schedules()

    # TODO: delete cardholders that are no longer members?
|
|
|
|
|
|
@q_task_group("Update Door Controller Members and Cards")
def q_update_all_doors():
    """Queue one ``update_door`` task per door on the "internal" cluster."""
    for door in Door.objects.all():
        # each door gets its own task group, named after the door
        task_group = f"Update HID Door Controller - {door.name}"
        async_task(update_door, door, cluster="internal", group=task_group)
|