517 lines
17 KiB
Python
517 lines
17 KiB
Python
import datetime
|
|
import itertools
|
|
from typing import TYPE_CHECKING, Any
|
|
|
|
from django.conf import settings
|
|
from django.contrib.auth.decorators import login_required, permission_required
|
|
from django.contrib.auth.mixins import PermissionRequiredMixin
|
|
from django.contrib.postgres.aggregates import StringAgg
|
|
from django.core.exceptions import BadRequest
|
|
from django.db.models import Count, F, FloatField, Func, Q, Value, Window
|
|
from django.db.models.functions import Lead, NullIf, Trunc
|
|
from django.http import Http404, HttpRequest, HttpResponse
|
|
from django.shortcuts import render
|
|
from django.urls import path, reverse_lazy
|
|
from django.utils.text import slugify
|
|
from django.views.generic import TemplateView
|
|
from django.views.generic.list import ListView
|
|
|
|
import django_filters
|
|
import django_q.tasks as q2_tasks
|
|
import django_tables2 as tables
|
|
from django_filters.views import BaseFilterView
|
|
from django_tables2 import SingleTableMixin
|
|
from django_tables2.export.views import ExportMixin
|
|
from pydantic import BaseModel, Field, ValidationError
|
|
from unifi_access import AccessClient, ResponseCode, UnifiAccessError
|
|
from unifi_access.schemas import (
|
|
FullUser,
|
|
NfcCard,
|
|
NfcCardEnrollmentSessionId,
|
|
NfcCardEnrollmentStatus,
|
|
User,
|
|
UserId,
|
|
UserStatus,
|
|
)
|
|
|
|
from .models import Door, HIDEvent
|
|
from .tables import (
|
|
BusiestDayOfWeekTable,
|
|
BusiestTimeOfDayTable,
|
|
DeniedAccessTable,
|
|
DetailByDayTable,
|
|
MostActiveMembersTable,
|
|
UnitTimeTable,
|
|
)
|
|
|
|
if TYPE_CHECKING:
|
|
from django.core.paginator import Page
|
|
|
|
REPORTS = []
|
|
|
|
|
|
def register_report(cls: "type[BaseAccessReport]"):
|
|
REPORTS.append(cls)
|
|
return cls
|
|
|
|
|
|
class AccessReportFilterSet(django_filters.FilterSet):
|
|
timestamp = django_filters.DateFromToRangeFilter()
|
|
door = django_filters.ModelMultipleChoiceFilter(
|
|
queryset=Door.objects.all(), distinct=False
|
|
)
|
|
|
|
|
|
class BaseAccessReport(
|
|
BaseFilterView, ExportMixin, SingleTableMixin, PermissionRequiredMixin, ListView
|
|
):
|
|
model = HIDEvent
|
|
permission_required = "doorcontrol.view_hidevent"
|
|
paginate_by = 20
|
|
context_object_name = "object_list"
|
|
template_name = "doorcontrol/access_report.dj.html"
|
|
|
|
export_formats = ("csv", "xlsx", "ods")
|
|
|
|
filterset_class = AccessReportFilterSet
|
|
|
|
_report_name: str
|
|
|
|
@classmethod
|
|
def _report_types(cls):
|
|
yield [
|
|
cls._report_name,
|
|
reverse_lazy("doorcontrol:" + slugify(cls._report_name)),
|
|
]
|
|
|
|
@classmethod
|
|
def _urlpattern(cls):
|
|
slug = slugify(cls._report_name)
|
|
return path(f"reports/{slug}", cls.as_view(), name=slug)
|
|
|
|
@property
|
|
def export_name(self):
|
|
return slugify(self._report_name)
|
|
|
|
def _selected_report(self):
|
|
return self._report_name
|
|
|
|
def get_paginate_by(self, queryset) -> int | None:
|
|
if "items_per_page" in self.request.GET:
|
|
return int(self.request.GET["items_per_page"])
|
|
return super().get_paginate_by(queryset)
|
|
|
|
def get_queryset(self):
|
|
return super().get_queryset().select_related("door")
|
|
|
|
def get_context_data(self, **kwargs):
|
|
context = super().get_context_data(**kwargs)
|
|
context["report_types"] = [
|
|
rt for report in REPORTS for rt in report._report_types()
|
|
]
|
|
|
|
page: Page = context["page_obj"]
|
|
context["paginator_range"] = page.paginator.get_elided_page_range(page.number)
|
|
context["selected_report"] = self._selected_report()
|
|
context["items_per_page"] = self.get_paginate_by(None)
|
|
|
|
return context
|
|
|
|
|
|
@register_report
|
|
class AccessPerUnitTime(BaseAccessReport):
|
|
table_class = UnitTimeTable
|
|
UNIT_TIMES = ["day", "week", "month", "year"]
|
|
|
|
@classmethod
|
|
def _report_types(cls):
|
|
for unit_time in cls.UNIT_TIMES:
|
|
yield (
|
|
"Access per " + unit_time.title(),
|
|
reverse_lazy("doorcontrol:access-per-unit-time", args=[unit_time]),
|
|
)
|
|
|
|
@classmethod
|
|
def _urlpattern(cls):
|
|
return path(
|
|
"reports/access-per-<unit_time>",
|
|
cls.as_view(),
|
|
name="access-per-unit-time",
|
|
)
|
|
|
|
@property
|
|
def _report_name(self):
|
|
unit_time = self.kwargs["unit_time"]
|
|
return "Access per " + unit_time.title()
|
|
|
|
def _selected_report(self) -> str:
|
|
return "Access per " + self.kwargs["unit_time"].title()
|
|
|
|
def get_table_kwargs(self):
|
|
unit_time = self.kwargs["unit_time"]
|
|
if unit_time == "week":
|
|
unit_time_column = tables.TemplateColumn(
|
|
verbose_name=unit_time.title(),
|
|
template_code=(
|
|
"{{ value|date|default:default }} - "
|
|
"{{ value|add:one_week|date|default:default }}"
|
|
),
|
|
extra_context={"one_week": datetime.timedelta(weeks=1)},
|
|
)
|
|
else:
|
|
if unit_time == "day":
|
|
date_format = "DATE_FORMAT"
|
|
elif unit_time == "month":
|
|
date_format = "N Y"
|
|
elif unit_time == "year":
|
|
date_format = "Y"
|
|
|
|
unit_time_column = tables.DateColumn(
|
|
date_format, verbose_name=unit_time.title()
|
|
)
|
|
|
|
return {
|
|
"sequence": ("unit_time", "..."),
|
|
"extra_columns": (("unit_time", unit_time_column),),
|
|
}
|
|
|
|
def get_table_data(self):
|
|
unit_time = self.kwargs["unit_time"]
|
|
if unit_time not in self.UNIT_TIMES:
|
|
raise BadRequest("unit time must be one of day, week, month, or year")
|
|
|
|
return (
|
|
super()
|
|
.get_table_data()
|
|
.filter(event_type__in=HIDEvent.EventType.any_granted_access())
|
|
.with_member_id()
|
|
.values(unit_time=Trunc("timestamp", unit_time))
|
|
.annotate(
|
|
members=Count("member_id", distinct=True),
|
|
members_delta=(
|
|
F("members")
|
|
/ Window(
|
|
Lead(NullIf("members", 0.0)),
|
|
order_by="-unit_time",
|
|
output_field=FloatField(),
|
|
)
|
|
* 100
|
|
- 100
|
|
),
|
|
access_count=Count("cardholder_id"),
|
|
access_count_delta=(
|
|
F("access_count")
|
|
/ Window(
|
|
Lead(NullIf("access_count", 0.0)),
|
|
order_by="-unit_time",
|
|
output_field=FloatField(),
|
|
)
|
|
* 100
|
|
- 100
|
|
),
|
|
)
|
|
.order_by("-unit_time")
|
|
)
|
|
|
|
|
|
@register_report
|
|
class DeniedAccess(BaseAccessReport):
|
|
_report_name = "Denied Access"
|
|
table_class = DeniedAccessTable
|
|
|
|
def get_table_data(self):
|
|
denied_event_types = [
|
|
t for t in HIDEvent.EventType if t.name.startswith("DENIED_ACCESS")
|
|
]
|
|
return super().get_table_data().filter(event_type__in=denied_event_types)
|
|
|
|
|
|
@register_report
|
|
class MostActiveMembers(BaseAccessReport):
|
|
_report_name = "Most Active Members"
|
|
table_class = MostActiveMembersTable
|
|
|
|
def get_table_data(self):
|
|
return (
|
|
super()
|
|
.get_table_data()
|
|
.with_member_id()
|
|
.filter(member_id__isnull=False)
|
|
.values("member_id")
|
|
.annotate(
|
|
access_count=Count("member_id"),
|
|
name=StringAgg(
|
|
Func(Value(" "), "forename", "surname", function="concat_ws"),
|
|
", ",
|
|
distinct=True,
|
|
),
|
|
)
|
|
.order_by("-access_count")
|
|
)
|
|
|
|
|
|
@register_report
|
|
class DetailByDay(BaseAccessReport):
|
|
_report_name = "Detail by Day"
|
|
table_class = DetailByDayTable
|
|
|
|
def get_table_data(self):
|
|
return (
|
|
super()
|
|
.get_table_data()
|
|
.with_member_id()
|
|
.values("timestamp__date", "member_id")
|
|
.filter(member_id__isnull=False)
|
|
.annotate(
|
|
access_count=Count("member_id"),
|
|
granted_access_count=Count(
|
|
"member_id",
|
|
filter=Q(event_type__in=HIDEvent.EventType.any_granted_access()),
|
|
),
|
|
name=StringAgg(
|
|
Func(Value(" "), "forename", "surname", function="concat_ws"),
|
|
", ",
|
|
distinct=True,
|
|
),
|
|
)
|
|
.order_by("-timestamp__date")
|
|
)
|
|
|
|
|
|
@register_report
|
|
class BusiestDayOfWeek(BaseAccessReport):
|
|
_report_name = "Busiest Day of the Week"
|
|
table_pagination = False
|
|
table_class = BusiestDayOfWeekTable
|
|
|
|
def get_table_data(self):
|
|
return (
|
|
super()
|
|
.get_table_data()
|
|
.with_member_id()
|
|
.values("timestamp__week_day")
|
|
.annotate(
|
|
events=Count("timestamp"), members=Count("member_id", distinct=True)
|
|
)
|
|
)
|
|
|
|
|
|
@register_report
|
|
class BusiestTimeOfDay(BaseAccessReport):
|
|
_report_name = "Busiest Time of Day"
|
|
table_pagination = False
|
|
table_class = BusiestTimeOfDayTable
|
|
|
|
def get_table_data(self):
|
|
return (
|
|
super()
|
|
.get_table_data()
|
|
.with_member_id()
|
|
.values("timestamp__hour")
|
|
.annotate(
|
|
events=Count("timestamp"), members=Count("member_id", distinct=True)
|
|
)
|
|
)
|
|
|
|
|
|
def update_access_users() -> list[FullUser]:
|
|
access_client = AccessClient(
|
|
settings.UNIFI_ACCESS_HOST, settings.UNIFI_ACCESS_API_TOKEN, verify=False
|
|
)
|
|
return list(access_client.fetch_all_users__unpaged())
|
|
|
|
|
|
@login_required
|
|
@permission_required("doorcontrol.assign_nfc_card", raise_exception=True)
|
|
def assign_nfc_card_user_selector(request: HttpRequest):
|
|
template_name = "doorcontrol/assign_nfc_card_user_selector.dj.html"
|
|
task_group = "update_access_users"
|
|
|
|
all_users: list[FullUser] | None = None
|
|
refresh_task_id = None
|
|
update_users_results = q2_tasks.result_group(task_group, cached=True)
|
|
if (
|
|
update_users_results
|
|
and len(update_users_results) > 0
|
|
and not request.POST.get("force_refresh")
|
|
):
|
|
all_users = update_users_results[0]
|
|
else:
|
|
q2_tasks.delete_group(task_group)
|
|
refresh_task_id = q2_tasks.async_task(
|
|
update_access_users, group=task_group, cached=5 * 60
|
|
)
|
|
|
|
filtered_users = []
|
|
|
|
if request.method == "POST":
|
|
if refresh_task_id:
|
|
all_users = q2_tasks.result(refresh_task_id, wait=-1, cached=True)
|
|
|
|
template_name += "#results"
|
|
all_filtered_users = (
|
|
user
|
|
for user in all_users or []
|
|
if user.status == UserStatus.ACTIVE
|
|
and request.POST.get("search", "").lower() in user.full_name.lower()
|
|
)
|
|
filtered_users = list(itertools.islice(all_filtered_users, 10))
|
|
|
|
return render(request, template_name, {"users": filtered_users})
|
|
|
|
|
|
class AssignNfcCardStatus(BaseModel):
|
|
class ErrorEntry(BaseModel):
|
|
count: int
|
|
code: ResponseCode | None
|
|
msg: str
|
|
extra_details: str | None
|
|
|
|
session_id: NfcCardEnrollmentSessionId | None = None
|
|
last_status: NfcCardEnrollmentStatus | None = None
|
|
errors: list[ErrorEntry] = Field(default_factory=list)
|
|
card: NfcCard | None = None
|
|
user: User
|
|
|
|
def append_error(
|
|
self, error: UnifiAccessError, extra_details: str | None = None
|
|
) -> None:
|
|
if self.errors and self.errors[-1].code == error.code:
|
|
self.errors[-1].count += 1
|
|
else:
|
|
self.errors.append(
|
|
self.ErrorEntry(
|
|
count=1,
|
|
code=error.code,
|
|
msg=error.msg,
|
|
extra_details=extra_details,
|
|
)
|
|
)
|
|
|
|
def append_raw_error(self, msg: str, extra_details: str | None = None) -> None:
|
|
self.errors.append(
|
|
self.ErrorEntry(
|
|
count=1,
|
|
code=None,
|
|
msg=msg,
|
|
extra_details=extra_details,
|
|
)
|
|
)
|
|
|
|
|
|
class AssignNfcCardView(PermissionRequiredMixin, TemplateView):
|
|
# for storage in request.session
|
|
ENROLLMENT_STATUS_SESSION_KEY = "unifi_access_enrollment_status"
|
|
requires_permission = "doorcontrol.assign_nfc_card"
|
|
template_name = "doorcontrol/assign_nfc_card.dj.html"
|
|
|
|
def get_template_names(self) -> list[str]:
|
|
templates = super().get_template_names()
|
|
if (
|
|
self.request.method == "GET" and (part := self.request.GET.get("part"))
|
|
) or (
|
|
self.request.method == "POST" and (part := self.request.POST.get("part"))
|
|
):
|
|
return [f"{template_name}#{part}" for template_name in templates]
|
|
else:
|
|
return templates
|
|
|
|
def setup(self, request: HttpRequest, *args: Any, **kwargs: Any) -> None:
|
|
super().setup(request, *args, **kwargs)
|
|
|
|
self.access_client = AccessClient(
|
|
settings.UNIFI_ACCESS_HOST, settings.UNIFI_ACCESS_API_TOKEN, verify=False
|
|
)
|
|
|
|
try:
|
|
status = AssignNfcCardStatus.model_validate(
|
|
request.session.get(self.ENROLLMENT_STATUS_SESSION_KEY, "{}")
|
|
)
|
|
except ValidationError:
|
|
status = None
|
|
|
|
if status is None or status.user.id != self.kwargs["user_id"]:
|
|
try:
|
|
user = self.access_client.fetch_user(UserId(self.kwargs["user_id"]))
|
|
except UnifiAccessError as e:
|
|
if e.code == ResponseCode.USER_ACCOUNT_NOT_EXIST:
|
|
raise Http404(
|
|
"No account with that id exists in UniFi Access"
|
|
) from e
|
|
else:
|
|
raise e
|
|
status = AssignNfcCardStatus(user=user)
|
|
self.status = status
|
|
|
|
def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
|
|
self.request.session[self.ENROLLMENT_STATUS_SESSION_KEY] = (
|
|
self.status.model_dump()
|
|
)
|
|
return super().get_context_data(**kwargs) | self.status.model_dump()
|
|
|
|
def get(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
|
|
# poll an in-progress session
|
|
if self.status.session_id:
|
|
try:
|
|
self.status.last_status = self.access_client.fetch_enroll_card_status(
|
|
self.status.session_id
|
|
)
|
|
self.status.card = self.access_client.fetch_nfc_card(
|
|
self.status.last_status.token
|
|
)
|
|
|
|
self.access_client.remove_enrollment_session(self.status.session_id)
|
|
self.status.session_id = None
|
|
|
|
except UnifiAccessError as e:
|
|
match e.code:
|
|
case ResponseCode.CREDS_NFC_READ_SESSION_NOT_FOUND:
|
|
self.status.session_id = None
|
|
case ResponseCode.CREDS_NFC_READ_POLL_TOKEN_EMPTY:
|
|
# all is well, the reader just hasn't seen a card yet
|
|
pass
|
|
case ResponseCode.CREDS_NFC_CARD_IS_PROVISION:
|
|
self.status.session_id = None
|
|
self.status.append_error(
|
|
e,
|
|
"This card will need to be added by someone with admin access to the UniFi Access application",
|
|
)
|
|
case _:
|
|
self.status.append_error(e)
|
|
|
|
return super().get(request, *args, **kwargs)
|
|
|
|
def post(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
|
|
if "assign" in request.POST:
|
|
if not self.status.last_status:
|
|
self.status.append_raw_error(
|
|
"Missing session status. Please start a new session and try again."
|
|
)
|
|
elif request.POST.get("id") != self.status.last_status.id:
|
|
self.status.append_raw_error(
|
|
"Mismatched session status. Please start a new session and try again."
|
|
)
|
|
else:
|
|
try:
|
|
self.access_client.assign_nfc_card_to_user(
|
|
self.status.user.id, self.status.last_status.token
|
|
)
|
|
self.status.card = self.access_client.fetch_nfc_card(
|
|
self.status.last_status.token
|
|
)
|
|
except UnifiAccessError as e:
|
|
self.status.append_error(e)
|
|
|
|
else:
|
|
# remove old session, if it exists
|
|
if self.status.session_id:
|
|
self.access_client.remove_enrollment_session(self.status.session_id)
|
|
|
|
# start a new session
|
|
self.status = AssignNfcCardStatus(user=self.status.user)
|
|
self.status.session_id = self.access_client.begin_enroll_card(
|
|
settings.UNIFI_ACCESS_CARD_ASSIGNMENT_DEVICE
|
|
).session_id
|
|
|
|
return super().get(request, *args, **kwargs)
|