import datetime import itertools from typing import TYPE_CHECKING, Any from django.conf import settings from django.contrib.auth.mixins import PermissionRequiredMixin from django.contrib.postgres.aggregates import StringAgg from django.core.exceptions import BadRequest from django.db.models import Count, F, FloatField, Func, Q, Value, Window from django.db.models.functions import Lead, NullIf, Trunc from django.http import Http404, HttpRequest, HttpResponse from django.shortcuts import render from django.urls import path, reverse_lazy from django.utils.text import slugify from django.views.generic import TemplateView from django.views.generic.list import ListView import django_filters import django_q.tasks as q2_tasks import django_tables2 as tables from django_filters.views import BaseFilterView from django_tables2 import SingleTableMixin from django_tables2.export.views import ExportMixin from pydantic import BaseModel, Field, ValidationError from unifi_access import AccessClient, ResponseCode, UnifiAccessError from unifi_access.schemas import ( FullUser, NfcCard, NfcCardEnrollmentSessionId, NfcCardEnrollmentStatus, User, UserId, UserStatus, ) from .models import Door, HIDEvent from .tables import ( BusiestDayOfWeekTable, BusiestTimeOfDayTable, DeniedAccessTable, DetailByDayTable, MostActiveMembersTable, UnitTimeTable, ) if TYPE_CHECKING: from django.core.paginator import Page REPORTS = [] def register_report(cls: "type[BaseAccessReport]"): REPORTS.append(cls) return cls class AccessReportFilterSet(django_filters.FilterSet): timestamp = django_filters.DateFromToRangeFilter() door = django_filters.ModelMultipleChoiceFilter( queryset=Door.objects.all(), distinct=False ) class BaseAccessReport( BaseFilterView, ExportMixin, SingleTableMixin, PermissionRequiredMixin, ListView ): model = HIDEvent permission_required = "doorcontrol.view_hidevent" paginate_by = 20 context_object_name = "object_list" template_name = "doorcontrol/access_report.dj.html" export_formats = ("csv", "xlsx", "ods") filterset_class = AccessReportFilterSet _report_name: str @classmethod def _report_types(cls): yield [ cls._report_name, reverse_lazy("doorcontrol:" + slugify(cls._report_name)), ] @classmethod def _urlpattern(cls): slug = slugify(cls._report_name) return path(f"reports/{slug}", cls.as_view(), name=slug) @property def export_name(self): return slugify(self._report_name) def _selected_report(self): return self._report_name def get_paginate_by(self, queryset) -> int | None: if "items_per_page" in self.request.GET: return int(self.request.GET["items_per_page"]) return super().get_paginate_by(queryset) def get_queryset(self): return super().get_queryset().select_related("door") def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) context["report_types"] = [ rt for report in REPORTS for rt in report._report_types() ] page: Page = context["page_obj"] context["paginator_range"] = page.paginator.get_elided_page_range(page.number) context["selected_report"] = self._selected_report() context["items_per_page"] = self.get_paginate_by(None) return context @register_report class AccessPerUnitTime(BaseAccessReport): table_class = UnitTimeTable UNIT_TIMES = ["day", "week", "month", "year"] @classmethod def _report_types(cls): for unit_time in cls.UNIT_TIMES: yield ( "Access per " + unit_time.title(), reverse_lazy("doorcontrol:access-per-unit-time", args=[unit_time]), ) @classmethod def _urlpattern(cls): return path( "reports/access-per-", cls.as_view(), name="access-per-unit-time", ) @property def _report_name(self): unit_time = self.kwargs["unit_time"] return "Access per " + unit_time.title() def _selected_report(self) -> str: return "Access per " + self.kwargs["unit_time"].title() def get_table_kwargs(self): unit_time = self.kwargs["unit_time"] if unit_time == "week": unit_time_column = tables.TemplateColumn( verbose_name=unit_time.title(), template_code=( "{{ value|date|default:default }} - " "{{ value|add:one_week|date|default:default }}" ), extra_context={"one_week": datetime.timedelta(weeks=1)}, ) else: if unit_time == "day": date_format = "DATE_FORMAT" elif unit_time == "month": date_format = "N Y" elif unit_time == "year": date_format = "Y" unit_time_column = tables.DateColumn( date_format, verbose_name=unit_time.title() ) return { "sequence": ("unit_time", "..."), "extra_columns": (("unit_time", unit_time_column),), } def get_table_data(self): unit_time = self.kwargs["unit_time"] if unit_time not in self.UNIT_TIMES: raise BadRequest("unit time must be one of day, week, month, or year") return ( super() .get_table_data() .filter(event_type__in=HIDEvent.EventType.any_granted_access()) .with_member_id() .values(unit_time=Trunc("timestamp", unit_time)) .annotate( members=Count("member_id", distinct=True), members_delta=( F("members") / Window( Lead(NullIf("members", 0.0)), order_by="-unit_time", output_field=FloatField(), ) * 100 - 100 ), access_count=Count("cardholder_id"), access_count_delta=( F("access_count") / Window( Lead(NullIf("access_count", 0.0)), order_by="-unit_time", output_field=FloatField(), ) * 100 - 100 ), ) .order_by("-unit_time") ) @register_report class DeniedAccess(BaseAccessReport): _report_name = "Denied Access" table_class = DeniedAccessTable def get_table_data(self): denied_event_types = [ t for t in HIDEvent.EventType if t.name.startswith("DENIED_ACCESS") ] return super().get_table_data().filter(event_type__in=denied_event_types) @register_report class MostActiveMembers(BaseAccessReport): _report_name = "Most Active Members" table_class = MostActiveMembersTable def get_table_data(self): return ( super() .get_table_data() .with_member_id() .filter(member_id__isnull=False) .values("member_id") .annotate( access_count=Count("member_id"), name=StringAgg( Func(Value(" "), "forename", "surname", function="concat_ws"), ", ", distinct=True, ), ) .order_by("-access_count") ) @register_report class DetailByDay(BaseAccessReport): _report_name = "Detail by Day" table_class = DetailByDayTable def get_table_data(self): return ( super() .get_table_data() .with_member_id() .values("timestamp__date", "member_id") .filter(member_id__isnull=False) .annotate( access_count=Count("member_id"), granted_access_count=Count( "member_id", filter=Q(event_type__in=HIDEvent.EventType.any_granted_access()), ), name=StringAgg( Func(Value(" "), "forename", "surname", function="concat_ws"), ", ", distinct=True, ), ) .order_by("-timestamp__date") ) @register_report class BusiestDayOfWeek(BaseAccessReport): _report_name = "Busiest Day of the Week" table_pagination = False table_class = BusiestDayOfWeekTable def get_table_data(self): return ( super() .get_table_data() .with_member_id() .values("timestamp__week_day") .annotate( events=Count("timestamp"), members=Count("member_id", distinct=True) ) ) @register_report class BusiestTimeOfDay(BaseAccessReport): _report_name = "Busiest Time of Day" table_pagination = False table_class = BusiestTimeOfDayTable def get_table_data(self): return ( super() .get_table_data() .with_member_id() .values("timestamp__hour") .annotate( events=Count("timestamp"), members=Count("member_id", distinct=True) ) ) def update_access_users() -> list[FullUser]: access_client = AccessClient( settings.UNIFI_ACCESS_HOST, settings.UNIFI_ACCESS_API_TOKEN, verify=False ) return list(access_client.fetch_all_users__unpaged()) def assign_nfc_card_user_selector(request: HttpRequest): template_name = "doorcontrol/assign_nfc_card_user_selector.dj.html" task_group = "update_access_users" all_users: list[FullUser] | None = None refresh_task_id = None update_users_results = q2_tasks.result_group(task_group, cached=True) if ( update_users_results and len(update_users_results) > 0 and not request.POST.get("force_refresh") ): all_users = update_users_results[0] else: q2_tasks.delete_group(task_group) refresh_task_id = q2_tasks.async_task( update_access_users, group=task_group, cached=5 * 60 ) filtered_users = [] if request.method == "POST": if refresh_task_id: all_users = q2_tasks.result(refresh_task_id, wait=-1, cached=True) template_name += "#results" all_filtered_users = ( user for user in all_users or [] if user.status == UserStatus.ACTIVE and request.POST.get("search", "").lower() in user.full_name.lower() ) filtered_users = list(itertools.islice(all_filtered_users, 10)) return render(request, template_name, {"users": filtered_users}) class AssignNfcCardStatus(BaseModel): class ErrorEntry(BaseModel): count: int code: ResponseCode | None msg: str extra_details: str | None session_id: NfcCardEnrollmentSessionId | None = None last_status: NfcCardEnrollmentStatus | None = None errors: list[ErrorEntry] = Field(default_factory=list) card: NfcCard | None = None user: User def append_error( self, error: UnifiAccessError, extra_details: str | None = None ) -> None: if self.errors and self.errors[-1].code == error.code: self.errors[-1].count += 1 else: self.errors.append( self.ErrorEntry( count=1, code=error.code, msg=error.msg, extra_details=extra_details, ) ) def append_raw_error(self, msg: str, extra_details: str | None = None) -> None: self.errors.append( self.ErrorEntry( count=1, code=None, msg=msg, extra_details=extra_details, ) ) class AssignNfcCardView(TemplateView): # for storage in request.session ENROLLMENT_STATUS_SESSION_KEY = "unifi_access_enrollment_status" template_name = "doorcontrol/assign_nfc_card.dj.html" def get_template_names(self) -> list[str]: templates = super().get_template_names() if ( self.request.method == "GET" and (part := self.request.GET.get("part")) ) or ( self.request.method == "POST" and (part := self.request.POST.get("part")) ): return [f"{template_name}#{part}" for template_name in templates] else: return templates def setup(self, request: HttpRequest, *args: Any, **kwargs: Any) -> None: super().setup(request, *args, **kwargs) self.access_client = AccessClient( settings.UNIFI_ACCESS_HOST, settings.UNIFI_ACCESS_API_TOKEN, verify=False ) try: status = AssignNfcCardStatus.model_validate( request.session.get(self.ENROLLMENT_STATUS_SESSION_KEY, "{}") ) except ValidationError: status = None if status is None or status.user.id != self.kwargs["user_id"]: try: user = self.access_client.fetch_user(UserId(self.kwargs["user_id"])) except UnifiAccessError as e: if e.code == ResponseCode.USER_ACCOUNT_NOT_EXIST: raise Http404( "No account with that id exists in UniFi Access" ) from e else: raise e status = AssignNfcCardStatus(user=user) self.status = status def get_context_data(self, **kwargs: Any) -> dict[str, Any]: self.request.session[self.ENROLLMENT_STATUS_SESSION_KEY] = ( self.status.model_dump() ) return super().get_context_data(**kwargs) | self.status.model_dump() def get(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse: # poll an in-progress session if self.status.session_id: try: self.status.last_status = self.access_client.fetch_enroll_card_status( self.status.session_id ) self.status.card = self.access_client.fetch_nfc_card( self.status.last_status.token ) self.access_client.remove_enrollment_session(self.status.session_id) self.status.session_id = None except UnifiAccessError as e: match e.code: case ResponseCode.CREDS_NFC_READ_SESSION_NOT_FOUND: self.status.session_id = None case ResponseCode.CREDS_NFC_READ_POLL_TOKEN_EMPTY: # all is well, the reader just hasn't seen a card yet pass case ResponseCode.CREDS_NFC_CARD_IS_PROVISION: self.status.session_id = None self.status.append_error( e, "This card will need to be added by someone with admin access to the UniFi Access application", ) case _: self.status.append_error(e) return super().get(request, *args, **kwargs) def post(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse: if "assign" in request.POST: if not self.status.last_status: self.status.append_raw_error( "Missing session status. Please start a new session and try again." ) elif request.POST.get("id") != self.status.last_status.id: self.status.append_raw_error( "Mismatched session status. Please start a new session and try again." ) else: try: self.access_client.assign_nfc_card_to_user( self.status.user.id, self.status.last_status.token ) self.status.card = self.access_client.fetch_nfc_card( self.status.last_status.token ) except UnifiAccessError as e: self.status.append_error(e) else: # remove old session, if it exists if self.status.session_id: self.access_client.remove_enrollment_session(self.status.session_id) # start a new session self.status = AssignNfcCardStatus(user=self.status.user) self.status.session_id = self.access_client.begin_enroll_card( settings.UNIFI_ACCESS_CARD_ASSIGNMENT_DEVICE ).session_id return super().get(request, *args, **kwargs)