517 lines
17 KiB
Python
Raw Normal View History

import datetime
import itertools
from typing import TYPE_CHECKING, Any
from django.conf import settings
from django.contrib.auth.decorators import login_required, permission_required
from django.contrib.auth.mixins import PermissionRequiredMixin
from django.contrib.postgres.aggregates import StringAgg
from django.core.exceptions import BadRequest
from django.db.models import Count, F, FloatField, Func, Q, Value, Window
from django.db.models.functions import Lead, NullIf, Trunc
from django.http import Http404, HttpRequest, HttpResponse
from django.shortcuts import render
from django.urls import path, reverse_lazy
from django.utils.text import slugify
from django.views.generic import TemplateView
from django.views.generic.list import ListView
import django_filters
import django_q.tasks as q2_tasks
import django_tables2 as tables
from django_filters.views import BaseFilterView
from django_tables2 import SingleTableMixin
from django_tables2.export.views import ExportMixin
from pydantic import BaseModel, Field, ValidationError
from unifi_access import AccessClient, ResponseCode, UnifiAccessError
from unifi_access.schemas import (
FullUser,
NfcCard,
NfcCardEnrollmentSessionId,
NfcCardEnrollmentStatus,
User,
UserId,
UserStatus,
)
from .models import Door, HIDEvent
from .tables import (
BusiestDayOfWeekTable,
BusiestTimeOfDayTable,
DeniedAccessTable,
DetailByDayTable,
MostActiveMembersTable,
UnitTimeTable,
)
if TYPE_CHECKING:
from django.core.paginator import Page
# Registry of report view classes. It is filled by the @register_report
# decorator and iterated when building the list of available report types.
REPORTS: list["type[BaseAccessReport]"] = []


def register_report(cls: "type[BaseAccessReport]") -> "type[BaseAccessReport]":
    """Class decorator: record ``cls`` in ``REPORTS`` and return it unchanged."""
    REPORTS.append(cls)
    return cls
class AccessReportFilterSet(django_filters.FilterSet):
    """Filters shared by the access reports: a timestamp range and a door selection."""

    # From/to date range applied to the ``timestamp`` field.
    timestamp = django_filters.DateFromToRangeFilter()
    # Zero or more doors, chosen from every Door row.
    door = django_filters.ModelMultipleChoiceFilter(
        distinct=False,
        queryset=Door.objects.all(),
    )
class BaseAccessReport(
    BaseFilterView, ExportMixin, SingleTableMixin, PermissionRequiredMixin, ListView
):
    """Common base for the door-access report views.

    Combines filtering, table rendering, export and pagination over
    ``HIDEvent`` rows. Concrete reports set ``_report_name`` and
    ``table_class`` and usually override ``get_table_data``.
    """

    model = HIDEvent
    permission_required = "doorcontrol.view_hidevent"
    paginate_by = 20
    context_object_name = "object_list"
    template_name = "doorcontrol/access_report.dj.html"
    export_formats = ("csv", "xlsx", "ods")
    filterset_class = AccessReportFilterSet

    # Human-readable report title. It is also slugified to derive the URL
    # name, the URL path and the export file name.
    _report_name: str

    @classmethod
    def _report_types(cls):
        """Yield ``[title, url]`` entries describing this report."""
        yield [
            cls._report_name,
            reverse_lazy("doorcontrol:" + slugify(cls._report_name)),
        ]

    @classmethod
    def _urlpattern(cls):
        """Return the URL pattern ``reports/<slug>`` routed to this view."""
        slug = slugify(cls._report_name)
        return path(f"reports/{slug}", cls.as_view(), name=slug)

    @property
    def export_name(self):
        """Base file name for exports: the slugified report title."""
        return slugify(self._report_name)

    def _selected_report(self):
        """Return the title of the report currently being shown."""
        return self._report_name

    def get_paginate_by(self, queryset) -> int | None:
        """Return the page size, honouring an ``items_per_page`` query parameter.

        Raises:
            BadRequest: if ``items_per_page`` is present but is not a
                positive integer. Previously a non-numeric value surfaced as
                an unhandled ``ValueError``, and zero or negative values
                were passed straight on to pagination.
        """
        if "items_per_page" not in self.request.GET:
            return super().get_paginate_by(queryset)
        try:
            per_page = int(self.request.GET["items_per_page"])
        except ValueError as e:
            raise BadRequest("items_per_page must be a positive integer") from e
        if per_page <= 0:
            raise BadRequest("items_per_page must be a positive integer")
        return per_page

    def get_queryset(self):
        """Return the base queryset with the related door joined in."""
        return super().get_queryset().select_related("door")

    def get_context_data(self, **kwargs):
        """Add report navigation and pagination helpers to the template context."""
        context = super().get_context_data(**kwargs)
        # Every registered report contributes one or more entries.
        context["report_types"] = [
            rt for report in REPORTS for rt in report._report_types()
        ]
        page: "Page" = context["page_obj"]
        context["paginator_range"] = page.paginator.get_elided_page_range(page.number)
        context["selected_report"] = self._selected_report()
        context["items_per_page"] = self.get_paginate_by(None)
        return context
@register_report
class AccessPerUnitTime(BaseAccessReport):
    """Granted-access counts bucketed per day, week, month or year.

    The unit comes from the ``unit_time`` URL keyword argument. Each row also
    carries the percentage change against the chronologically previous bucket.
    """

    table_class = UnitTimeTable
    UNIT_TIMES = ["day", "week", "month", "year"]
    # DateColumn format per unit. "week" is absent because it is rendered as
    # a "start - end" range through a TemplateColumn instead.
    _DATE_FORMATS = {"day": "DATE_FORMAT", "month": "N Y", "year": "Y"}

    @classmethod
    def _report_types(cls):
        """Yield one ``(title, url)`` entry per supported unit of time."""
        for unit_time in cls.UNIT_TIMES:
            yield (
                "Access per " + unit_time.title(),
                reverse_lazy("doorcontrol:access-per-unit-time", args=[unit_time]),
            )

    @classmethod
    def _urlpattern(cls):
        """Return the single parameterised URL pattern serving every unit."""
        return path(
            "reports/access-per-<unit_time>",
            cls.as_view(),
            name="access-per-unit-time",
        )

    def _validated_unit_time(self) -> str:
        """Return the requested unit of time.

        Raises:
            BadRequest: if the URL value is not one of ``UNIT_TIMES``.
        """
        unit_time = self.kwargs["unit_time"]
        if unit_time not in self.UNIT_TIMES:
            raise BadRequest("unit time must be one of day, week, month, or year")
        return unit_time

    @staticmethod
    def _percent_change(field: str):
        """Build the expression for the percentage change of ``field``.

        Rows are ordered newest first, so ``Lead`` reads the chronologically
        previous bucket. ``NullIf`` turns a zero denominator into NULL rather
        than a division error.
        """
        return (
            F(field)
            / Window(
                Lead(NullIf(field, 0.0)),
                order_by="-unit_time",
                output_field=FloatField(),
            )
            * 100
            - 100
        )

    @property
    def _report_name(self):
        """Title of the report for the unit of time in the URL."""
        unit_time = self.kwargs["unit_time"]
        return "Access per " + unit_time.title()

    def _selected_report(self) -> str:
        """Return the title of the report currently being shown."""
        return self._report_name

    def get_table_kwargs(self):
        """Prepend a ``unit_time`` column formatted for the requested unit.

        Raises:
            BadRequest: for an unsupported unit. Without this check an
                unknown unit left the date format unbound and failed with
                ``UnboundLocalError``.
        """
        unit_time = self._validated_unit_time()
        if unit_time == "week":
            unit_time_column = tables.TemplateColumn(
                verbose_name=unit_time.title(),
                template_code=(
                    "{{ value|date|default:default }} - "
                    "{{ value|add:one_week|date|default:default }}"
                ),
                extra_context={"one_week": datetime.timedelta(weeks=1)},
            )
        else:
            unit_time_column = tables.DateColumn(
                self._DATE_FORMATS[unit_time], verbose_name=unit_time.title()
            )
        return {
            "sequence": ("unit_time", "..."),
            "extra_columns": (("unit_time", unit_time_column),),
        }

    def get_table_data(self):
        """Aggregate granted-access events per unit of time, newest first.

        Raises:
            BadRequest: for an unsupported unit of time.
        """
        unit_time = self._validated_unit_time()
        return (
            super()
            .get_table_data()
            .filter(event_type__in=HIDEvent.EventType.any_granted_access())
            .with_member_id()
            .values(unit_time=Trunc("timestamp", unit_time))
            .annotate(
                members=Count("member_id", distinct=True),
                members_delta=self._percent_change("members"),
                access_count=Count("cardholder_id"),
                access_count_delta=self._percent_change("access_count"),
            )
            .order_by("-unit_time")
        )
@register_report
class DeniedAccess(BaseAccessReport):
    """Report listing events whose type is one of the denied-access variants."""

    _report_name = "Denied Access"
    table_class = DeniedAccessTable

    def get_table_data(self):
        """Keep only events whose EventType member name starts with DENIED_ACCESS."""
        denied = [
            event_type
            for event_type in HIDEvent.EventType
            if event_type.name.startswith("DENIED_ACCESS")
        ]
        events = super().get_table_data()
        return events.filter(event_type__in=denied)
@register_report
class MostActiveMembers(BaseAccessReport):
    """Report ranking members by how many events they generated."""

    _report_name = "Most Active Members"
    table_class = MostActiveMembersTable

    def get_table_data(self):
        """Group events by member and order by event count, highest first."""
        # "forename surname" joined with a space via SQL concat_ws.
        full_name = Func(Value(" "), "forename", "surname", function="concat_ws")
        # Every distinct name recorded for the member, comma separated.
        names = StringAgg(full_name, ", ", distinct=True)

        member_events = (
            super().get_table_data().with_member_id().filter(member_id__isnull=False)
        )
        per_member = member_events.values("member_id").annotate(
            access_count=Count("member_id"),
            name=names,
        )
        return per_member.order_by("-access_count")
@register_report
class DetailByDay(BaseAccessReport):
    """Report with one row per member per calendar day."""

    _report_name = "Detail by Day"
    table_class = DetailByDayTable

    def get_table_data(self):
        """Group events by date and member, most recent date first."""
        # "forename surname" joined with a space via SQL concat_ws.
        full_name = Func(Value(" "), "forename", "surname", function="concat_ws")
        # Only events whose type counts as granted access.
        granted_only = Q(event_type__in=HIDEvent.EventType.any_granted_access())

        per_day_member = (
            super()
            .get_table_data()
            .with_member_id()
            .values("timestamp__date", "member_id")
            .filter(member_id__isnull=False)
        )
        summarised = per_day_member.annotate(
            access_count=Count("member_id"),
            granted_access_count=Count("member_id", filter=granted_only),
            name=StringAgg(full_name, ", ", distinct=True),
        )
        return summarised.order_by("-timestamp__date")
@register_report
class BusiestDayOfWeek(BaseAccessReport):
    """Report counting events and distinct members per day of the week."""

    _report_name = "Busiest Day of the Week"
    table_pagination = False
    table_class = BusiestDayOfWeekTable

    def get_table_data(self):
        """Group events by the week day of their timestamp."""
        by_week_day = (
            super().get_table_data().with_member_id().values("timestamp__week_day")
        )
        return by_week_day.annotate(
            events=Count("timestamp"),
            members=Count("member_id", distinct=True),
        )
@register_report
class BusiestTimeOfDay(BaseAccessReport):
    """Report counting events and distinct members per hour of the day."""

    _report_name = "Busiest Time of Day"
    table_pagination = False
    table_class = BusiestTimeOfDayTable

    def get_table_data(self):
        """Group events by the hour of their timestamp."""
        by_hour = super().get_table_data().with_member_id().values("timestamp__hour")
        return by_hour.annotate(
            events=Count("timestamp"),
            members=Count("member_id", distinct=True),
        )
def update_access_users() -> list[FullUser]:
    """Fetch every user from UniFi Access and return them as a list.

    Scheduled as a background task by ``assign_nfc_card_user_selector``.
    """
    # NOTE(review): verify=False presumably disables TLS certificate
    # verification for the controller connection; confirm this is intended.
    client = AccessClient(
        settings.UNIFI_ACCESS_HOST,
        settings.UNIFI_ACCESS_API_TOKEN,
        verify=False,
    )
    users = client.fetch_all_users__unpaged()
    return list(users)
@login_required
@permission_required("doorcontrol.assign_nfc_card", raise_exception=True)
def assign_nfc_card_user_selector(request: HttpRequest):
    """Render the user picker for NFC card assignment.

    The UniFi Access user list is fetched by a background task and cached.
    A cached result is reused unless the POST data carries ``force_refresh``;
    otherwise the task group is cleared and a new fetch is queued. On POST
    the first 10 active users whose full name contains the ``search`` term
    (case-insensitive) are rendered; on other methods the list is empty.
    """
    task_group = "update_access_users"
    template_name = "doorcontrol/assign_nfc_card_user_selector.dj.html"
    all_users: list[FullUser] | None = None
    refresh_task_id = None

    cached_results = q2_tasks.result_group(task_group, cached=True)
    use_cached = (
        cached_results
        and len(cached_results) > 0
        and not request.POST.get("force_refresh")
    )
    if use_cached:
        all_users = cached_results[0]
    else:
        # Drop stale results and queue a new fetch.
        q2_tasks.delete_group(task_group)
        refresh_task_id = q2_tasks.async_task(
            update_access_users, group=task_group, cached=5 * 60
        )

    matches = []
    if request.method == "POST":
        if refresh_task_id:
            # NOTE(review): wait=-1 presumably blocks until the task has a
            # result; confirm a worker is always available to run it.
            all_users = q2_tasks.result(refresh_task_id, wait=-1, cached=True)
        # Presumably selects the "results" partial of the template.
        template_name += "#results"
        needle = request.POST.get("search", "").lower()
        candidates = (
            user
            for user in all_users or []
            if user.status == UserStatus.ACTIVE and needle in user.full_name.lower()
        )
        matches = list(itertools.islice(candidates, 10))
    return render(request, template_name, {"users": matches})
class AssignNfcCardStatus(BaseModel):
    """State of one NFC card assignment flow, kept in the Django session."""

    class ErrorEntry(BaseModel):
        """One error shown to the user, with a repeat counter."""

        count: int
        code: ResponseCode | None
        msg: str
        extra_details: str | None

    session_id: NfcCardEnrollmentSessionId | None = None
    last_status: NfcCardEnrollmentStatus | None = None
    errors: list[ErrorEntry] = Field(default_factory=list)
    card: NfcCard | None = None
    user: User

    def append_error(
        self, error: UnifiAccessError, extra_details: str | None = None
    ) -> None:
        """Record a UniFi Access error.

        If the most recent entry has the same code, only its counter is
        incremented; otherwise a new entry is appended.
        """
        latest = self.errors[-1] if self.errors else None
        if latest is not None and latest.code == error.code:
            latest.count += 1
            return
        entry = self.ErrorEntry(
            count=1,
            code=error.code,
            msg=error.msg,
            extra_details=extra_details,
        )
        self.errors.append(entry)

    def append_raw_error(self, msg: str, extra_details: str | None = None) -> None:
        """Record an error that has no UniFi Access response code."""
        entry = self.ErrorEntry(
            count=1,
            code=None,
            msg=msg,
            extra_details=extra_details,
        )
        self.errors.append(entry)
class AssignNfcCardView(PermissionRequiredMixin, TemplateView):
# for storage in request.session
ENROLLMENT_STATUS_SESSION_KEY = "unifi_access_enrollment_status"
requires_permission = "doorcontrol.assign_nfc_card"
template_name = "doorcontrol/assign_nfc_card.dj.html"
def get_template_names(self) -> list[str]:
templates = super().get_template_names()
if (
self.request.method == "GET" and (part := self.request.GET.get("part"))
) or (
self.request.method == "POST" and (part := self.request.POST.get("part"))
):
return [f"{template_name}#{part}" for template_name in templates]
else:
return templates
def setup(self, request: HttpRequest, *args: Any, **kwargs: Any) -> None:
super().setup(request, *args, **kwargs)
self.access_client = AccessClient(
settings.UNIFI_ACCESS_HOST, settings.UNIFI_ACCESS_API_TOKEN, verify=False
)
try:
status = AssignNfcCardStatus.model_validate(
request.session.get(self.ENROLLMENT_STATUS_SESSION_KEY, "{}")
)
except ValidationError:
status = None
if status is None or status.user.id != self.kwargs["user_id"]:
try:
user = self.access_client.fetch_user(UserId(self.kwargs["user_id"]))
except UnifiAccessError as e:
if e.code == ResponseCode.USER_ACCOUNT_NOT_EXIST:
raise Http404(
"No account with that id exists in UniFi Access"
) from e
else:
raise e
status = AssignNfcCardStatus(user=user)
self.status = status
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
self.request.session[self.ENROLLMENT_STATUS_SESSION_KEY] = (
self.status.model_dump()
)
return super().get_context_data(**kwargs) | self.status.model_dump()
def get(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
# poll an in-progress session
if self.status.session_id:
try:
self.status.last_status = self.access_client.fetch_enroll_card_status(
self.status.session_id
)
self.status.card = self.access_client.fetch_nfc_card(
self.status.last_status.token
)
self.access_client.remove_enrollment_session(self.status.session_id)
self.status.session_id = None
except UnifiAccessError as e:
match e.code:
case ResponseCode.CREDS_NFC_READ_SESSION_NOT_FOUND:
self.status.session_id = None
case ResponseCode.CREDS_NFC_READ_POLL_TOKEN_EMPTY:
# all is well, the reader just hasn't seen a card yet
pass
case ResponseCode.CREDS_NFC_CARD_IS_PROVISION:
self.status.session_id = None
self.status.append_error(
e,
"This card will need to be added by someone with admin access to the UniFi Access application",
)
case _:
self.status.append_error(e)
return super().get(request, *args, **kwargs)
def post(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
if "assign" in request.POST:
if not self.status.last_status:
self.status.append_raw_error(
"Missing session status. Please start a new session and try again."
)
elif request.POST.get("id") != self.status.last_status.id:
self.status.append_raw_error(
"Mismatched session status. Please start a new session and try again."
)
else:
try:
self.access_client.assign_nfc_card_to_user(
self.status.user.id, self.status.last_status.token
)
self.status.card = self.access_client.fetch_nfc_card(
self.status.last_status.token
)
except UnifiAccessError as e:
self.status.append_error(e)
else:
# remove old session, if it exists
if self.status.session_id:
self.access_client.remove_enrollment_session(self.status.session_id)
# start a new session
self.status = AssignNfcCardStatus(user=self.status.user)
self.status.session_id = self.access_client.begin_enroll_card(
settings.UNIFI_ACCESS_CARD_ASSIGNMENT_DEVICE
).session_id
return super().get(request, *args, **kwargs)