Adam Goldsmith c8b3edcacf
All checks were successful
Ruff / ruff (push) Successful in 1m0s
Test / test (push) Successful in 8m52s
doorcontrol: Add report for assigned NFC cards
2025-01-03 21:26:51 -05:00

573 lines
19 KiB
Python

import datetime
import itertools
from typing import TYPE_CHECKING, Any
from django.conf import settings
from django.contrib.auth.decorators import login_required, permission_required
from django.contrib.auth.mixins import PermissionRequiredMixin
from django.contrib.postgres.aggregates import StringAgg
from django.core.exceptions import BadRequest
from django.db.models import Count, F, FloatField, Func, Q, Value, Window
from django.db.models.functions import Lead, NullIf, Trunc
from django.http import Http404, HttpRequest, HttpResponse
from django.shortcuts import render
from django.urls import path, reverse_lazy
from django.utils.text import slugify
from django.views.generic import TemplateView
from django.views.generic.list import ListView
import django_filters
import django_q.tasks as q2_tasks
import django_tables2 as tables
from django_filters.views import BaseFilterView
from django_q.signing import BadSignature
from django_tables2 import SingleTableMixin
from django_tables2.export.views import ExportMixin
from pydantic import BaseModel, Field, ValidationError
from unifi_access import AccessClient, ResponseCode, UnifiAccessError
from unifi_access.schemas import (
FullUser,
NfcCard,
NfcCardEnrollmentSessionId,
NfcCardEnrollmentStatus,
User,
UserId,
UserStatus,
)
from doorcontrol.forms import AssignedNfcCardsReportFilters
from doorcontrol.tasks.update_unifi_access import update_access_users
from membershipworks.models import Member
from .models import Door, HIDEvent
from .tables import (
AssignedNfcCardsTable,
BusiestDayOfWeekTable,
BusiestTimeOfDayTable,
DeniedAccessTable,
DetailByDayTable,
MostActiveMembersTable,
UnitTimeTable,
)
if TYPE_CHECKING:
from django.core.paginator import Page
REPORTS = []
def register_report(cls: "type[BaseAccessReport]"):
REPORTS.append(cls)
return cls
class AccessReportFilterSet(django_filters.FilterSet):
timestamp = django_filters.DateFromToRangeFilter()
door = django_filters.ModelMultipleChoiceFilter(
queryset=Door.objects.all(), distinct=False
)
class BaseAccessReport(
BaseFilterView, ExportMixin, SingleTableMixin, PermissionRequiredMixin, ListView
):
model = HIDEvent
permission_required = "doorcontrol.view_hidevent"
paginate_by = 20
context_object_name = "object_list"
template_name = "doorcontrol/access_report.dj.html"
export_formats = ("csv", "xlsx", "ods")
filterset_class = AccessReportFilterSet
_report_name: str
@classmethod
def _report_types(cls):
yield [
cls._report_name,
reverse_lazy("doorcontrol:" + slugify(cls._report_name)),
]
@classmethod
def _urlpattern(cls):
slug = slugify(cls._report_name)
return path(f"reports/{slug}", cls.as_view(), name=slug)
@property
def export_name(self):
return slugify(self._report_name)
def _selected_report(self):
return self._report_name
def get_paginate_by(self, queryset) -> int | None:
if "items_per_page" in self.request.GET:
return int(self.request.GET["items_per_page"])
return super().get_paginate_by(queryset)
def get_queryset(self):
return super().get_queryset().select_related("door")
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context["report_types"] = [
rt for report in REPORTS for rt in report._report_types()
]
page: Page = context["page_obj"]
context["paginator_range"] = page.paginator.get_elided_page_range(page.number)
context["selected_report"] = self._selected_report()
context["items_per_page"] = self.get_paginate_by(None)
return context
@register_report
class AccessPerUnitTime(BaseAccessReport):
table_class = UnitTimeTable
UNIT_TIMES = ["day", "week", "month", "year"]
@classmethod
def _report_types(cls):
for unit_time in cls.UNIT_TIMES:
yield (
"Access per " + unit_time.title(),
reverse_lazy("doorcontrol:access-per-unit-time", args=[unit_time]),
)
@classmethod
def _urlpattern(cls):
return path(
"reports/access-per-<unit_time>",
cls.as_view(),
name="access-per-unit-time",
)
@property
def _report_name(self):
unit_time = self.kwargs["unit_time"]
return "Access per " + unit_time.title()
def _selected_report(self) -> str:
return "Access per " + self.kwargs["unit_time"].title()
def get_table_kwargs(self):
unit_time = self.kwargs["unit_time"]
if unit_time == "week":
unit_time_column = tables.TemplateColumn(
verbose_name=unit_time.title(),
template_code=(
"{{ value|date|default:default }} - "
"{{ value|add:one_week|date|default:default }}"
),
extra_context={"one_week": datetime.timedelta(weeks=1)},
)
else:
if unit_time == "day":
date_format = "DATE_FORMAT"
elif unit_time == "month":
date_format = "N Y"
elif unit_time == "year":
date_format = "Y"
unit_time_column = tables.DateColumn(
date_format, verbose_name=unit_time.title()
)
return {
"sequence": ("unit_time", "..."),
"extra_columns": (("unit_time", unit_time_column),),
}
def get_table_data(self):
unit_time = self.kwargs["unit_time"]
if unit_time not in self.UNIT_TIMES:
raise BadRequest("unit time must be one of day, week, month, or year")
return (
super()
.get_table_data()
.filter(event_type__in=HIDEvent.EventType.any_granted_access())
.with_member_id()
.values(unit_time=Trunc("timestamp", unit_time))
.annotate(
members=Count("member_id", distinct=True),
members_delta=(
F("members")
/ Window(
Lead(NullIf("members", 0.0)),
order_by="-unit_time",
output_field=FloatField(),
)
* 100
- 100
),
access_count=Count("cardholder_id"),
access_count_delta=(
F("access_count")
/ Window(
Lead(NullIf("access_count", 0.0)),
order_by="-unit_time",
output_field=FloatField(),
)
* 100
- 100
),
)
.order_by("-unit_time")
)
@register_report
class DeniedAccess(BaseAccessReport):
_report_name = "Denied Access"
table_class = DeniedAccessTable
def get_table_data(self):
denied_event_types = [
t for t in HIDEvent.EventType if t.name.startswith("DENIED_ACCESS")
]
return super().get_table_data().filter(event_type__in=denied_event_types)
@register_report
class MostActiveMembers(BaseAccessReport):
_report_name = "Most Active Members"
table_class = MostActiveMembersTable
def get_table_data(self):
return (
super()
.get_table_data()
.with_member_id()
.filter(member_id__isnull=False)
.values("member_id")
.annotate(
access_count=Count("member_id"),
name=StringAgg(
Func(Value(" "), "forename", "surname", function="concat_ws"),
", ",
distinct=True,
),
)
.order_by("-access_count")
)
@register_report
class DetailByDay(BaseAccessReport):
_report_name = "Detail by Day"
table_class = DetailByDayTable
def get_table_data(self):
return (
super()
.get_table_data()
.with_member_id()
.values("timestamp__date", "member_id")
.filter(member_id__isnull=False)
.annotate(
access_count=Count("member_id"),
granted_access_count=Count(
"member_id",
filter=Q(event_type__in=HIDEvent.EventType.any_granted_access()),
),
name=StringAgg(
Func(Value(" "), "forename", "surname", function="concat_ws"),
", ",
distinct=True,
),
)
.order_by("-timestamp__date")
)
@register_report
class BusiestDayOfWeek(BaseAccessReport):
_report_name = "Busiest Day of the Week"
table_pagination = False
table_class = BusiestDayOfWeekTable
def get_table_data(self):
return (
super()
.get_table_data()
.with_member_id()
.values("timestamp__week_day")
.annotate(
events=Count("timestamp"), members=Count("member_id", distinct=True)
)
)
@register_report
class BusiestTimeOfDay(BaseAccessReport):
_report_name = "Busiest Time of Day"
table_pagination = False
table_class = BusiestTimeOfDayTable
def get_table_data(self):
return (
super()
.get_table_data()
.with_member_id()
.values("timestamp__hour")
.annotate(
events=Count("timestamp"), members=Count("member_id", distinct=True)
)
)
def fetch_access_users_results(force_refresh: bool = False) -> list[FullUser] | None:
task_group = update_access_users.q_task_group
try:
if force_refresh:
q2_tasks.delete_group(task_group)
refresh_task_id = q2_tasks.async_task(
update_access_users, group=task_group, cached=5 * 60
)
return q2_tasks.result(refresh_task_id, wait=-1, cached=True)
update_users_results = q2_tasks.result_group(task_group, cached=True)
if update_users_results and len(update_users_results) > 0:
return update_users_results[0]
# TODO: this could be better
except BadSignature:
return None
@login_required
@permission_required("doorcontrol.assign_nfc_card", raise_exception=True)
def assign_nfc_card_user_selector(request: HttpRequest):
template_name = "doorcontrol/assign_nfc_card_user_selector.dj.html"
filtered_users = []
if request.method == "POST":
all_users = fetch_access_users_results(
request.POST.get("force_refresh") == "true"
)
template_name += "#results"
all_filtered_users = (
user
for user in all_users or []
if user.status == UserStatus.ACTIVE
and request.POST.get("search", "").lower() in user.full_name.lower()
)
filtered_users = list(itertools.islice(all_filtered_users, 10))
return render(request, template_name, {"users": filtered_users})
class AssignNfcCardStatus(BaseModel):
class ErrorEntry(BaseModel):
count: int
code: ResponseCode | None
msg: str
extra_details: str | None
session_id: NfcCardEnrollmentSessionId | None = None
last_status: NfcCardEnrollmentStatus | None = None
errors: list[ErrorEntry] = Field(default_factory=list)
card: NfcCard | None = None
user: User
def append_error(
self, error: UnifiAccessError, extra_details: str | None = None
) -> None:
if self.errors and self.errors[-1].code == error.code:
self.errors[-1].count += 1
else:
self.errors.append(
self.ErrorEntry(
count=1,
code=error.code,
msg=error.msg,
extra_details=extra_details,
)
)
def append_raw_error(self, msg: str, extra_details: str | None = None) -> None:
self.errors.append(
self.ErrorEntry(
count=1,
code=None,
msg=msg,
extra_details=extra_details,
)
)
class AssignNfcCardView(PermissionRequiredMixin, TemplateView):
# for storage in request.session
ENROLLMENT_STATUS_SESSION_KEY = "unifi_access_enrollment_status"
permission_required = "doorcontrol.assign_nfc_card"
template_name = "doorcontrol/assign_nfc_card.dj.html"
def get_template_names(self) -> list[str]:
templates = super().get_template_names()
if (
self.request.method == "GET" and (part := self.request.GET.get("part"))
) or (
self.request.method == "POST" and (part := self.request.POST.get("part"))
):
return [f"{template_name}#{part}" for template_name in templates]
else:
return templates
def setup(self, request: HttpRequest, *args: Any, **kwargs: Any) -> None:
super().setup(request, *args, **kwargs)
self.access_client = AccessClient(
settings.UNIFI_ACCESS_HOST, settings.UNIFI_ACCESS_API_TOKEN, verify=False
)
try:
status = AssignNfcCardStatus.model_validate(
request.session.get(self.ENROLLMENT_STATUS_SESSION_KEY, "{}")
)
except ValidationError:
status = None
if status is None or status.user.id != self.kwargs["user_id"]:
try:
user = self.access_client.fetch_user(UserId(self.kwargs["user_id"]))
except UnifiAccessError as e:
if e.code == ResponseCode.USER_ACCOUNT_NOT_EXIST:
raise Http404(
"No account with that id exists in UniFi Access"
) from e
else:
raise e
status = AssignNfcCardStatus(user=user)
self.status = status
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
self.request.session[self.ENROLLMENT_STATUS_SESSION_KEY] = (
self.status.model_dump()
)
return super().get_context_data(**kwargs) | self.status.model_dump()
def get(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
# poll an in-progress session
if self.status.session_id:
try:
self.status.last_status = self.access_client.fetch_enroll_card_status(
self.status.session_id
)
self.status.card = self.access_client.fetch_nfc_card(
self.status.last_status.token
)
self.access_client.remove_enrollment_session(self.status.session_id)
self.status.session_id = None
except UnifiAccessError as e:
match e.code:
case ResponseCode.CREDS_NFC_READ_SESSION_NOT_FOUND:
self.status.session_id = None
case ResponseCode.CREDS_NFC_READ_POLL_TOKEN_EMPTY:
# all is well, the reader just hasn't seen a card yet
pass
case ResponseCode.CREDS_NFC_CARD_IS_PROVISION:
self.status.session_id = None
self.status.append_error(
e,
"This card will need to be added by someone with admin access to the UniFi Access application",
)
case _:
self.status.append_error(e)
return super().get(request, *args, **kwargs)
def post(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
if "assign" in request.POST:
if not self.status.last_status:
self.status.append_raw_error(
"Missing session status. Please start a new session and try again."
)
elif request.POST.get("id") != self.status.last_status.id:
self.status.append_raw_error(
"Mismatched session status. Please start a new session and try again."
)
else:
try:
self.access_client.assign_nfc_card_to_user(
self.status.user.id, self.status.last_status.token
)
self.status.card = self.access_client.fetch_nfc_card(
self.status.last_status.token
)
except UnifiAccessError as e:
self.status.append_error(e)
else:
# remove old session, if it exists
if self.status.session_id:
self.access_client.remove_enrollment_session(self.status.session_id)
# start a new session
self.status = AssignNfcCardStatus(user=self.status.user)
self.status.session_id = self.access_client.begin_enroll_card(
settings.UNIFI_ACCESS_CARD_ASSIGNMENT_DEVICE
).session_id
return super().get(request, *args, **kwargs)
class AssignedNfcCardsReport(
ExportMixin, SingleTableMixin, PermissionRequiredMixin, TemplateView
):
permission_required = "doorcontrol.assign_nfc_card"
template_name = "doorcontrol/assigned_nfc_cards_report.dj.html"
table_class = AssignedNfcCardsTable
export_formats = ("csv", "xlsx", "ods")
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
if "form" not in kwargs:
kwargs["form"] = AssignedNfcCardsReportFilters(self.request.GET)
return super().get_context_data(**kwargs)
def get_table_data(self):
access_users = fetch_access_users_results(
force_refresh=("refresh" in self.request.GET)
)
if access_users:
access_users_by_employee_number = {
user.employee_number: user for user in access_users
}
else:
access_users_by_employee_number = {}
form = AssignedNfcCardsReportFilters(self.request.GET)
def get_filtered_members():
members = Member.objects.with_is_active().filter(is_active=True)
if form.is_valid() and form.cleaned_data["has_mw_nfc_card"] is not None:
members = members.alias(
has_nfc_card_number=(
Q(nfc_card_number__isnull=False) & ~Q(nfc_card_number="")
)
).filter(has_nfc_card_number=form.cleaned_data["has_mw_nfc_card"])
for member in members.all():
access_user = access_users_by_employee_number.get(member.uid, None)
if (
form.is_valid()
and form.cleaned_data["has_access_nfc_card"] is not None
and access_user
and (
bool(access_user.nfc_cards)
!= form.cleaned_data["has_access_nfc_card"]
)
):
continue
yield {"member": member, "access_user": access_user}
return list(get_filtered_members())