import datetime import itertools from typing import TYPE_CHECKING, Any from django.conf import settings from django.contrib.auth.decorators import login_required, permission_required from django.contrib.auth.mixins import PermissionRequiredMixin from django.contrib.postgres.aggregates import StringAgg from django.core.exceptions import BadRequest from django.db.models import Count, F, FloatField, Func, Q, Value, Window from django.db.models.functions import Lead, NullIf, Trunc from django.http import Http404, HttpRequest, HttpResponse from django.shortcuts import render from django.urls import path, reverse_lazy from django.utils.text import slugify from django.views.generic import TemplateView from django.views.generic.list import ListView import django_filters import django_q.tasks as q2_tasks import django_tables2 as tables from django_filters.views import BaseFilterView from django_q.signing import BadSignature from django_tables2 import SingleTableMixin from django_tables2.export.views import ExportMixin from pydantic import BaseModel, Field, ValidationError from unifi_access import AccessClient, ResponseCode, UnifiAccessError from unifi_access.schemas import ( FullUser, NfcCard, NfcCardEnrollmentSessionId, NfcCardEnrollmentStatus, User, UserId, UserStatus, ) from doorcontrol.forms import AssignedNfcCardsReportFilters from doorcontrol.tasks.update_unifi_access import update_access_users from membershipworks.models import Member from .models import Door, HIDEvent from .tables import ( AssignedNfcCardsTable, BusiestDayOfWeekTable, BusiestTimeOfDayTable, DeniedAccessTable, DetailByDayTable, MostActiveMembersTable, UnitTimeTable, ) if TYPE_CHECKING: from django.core.paginator import Page REPORTS = [] def register_report(cls: "type[BaseAccessReport]"): REPORTS.append(cls) return cls class AccessReportFilterSet(django_filters.FilterSet): timestamp = django_filters.DateFromToRangeFilter() door = django_filters.ModelMultipleChoiceFilter( queryset=Door.objects.all(), distinct=False ) class BaseAccessReport( BaseFilterView, ExportMixin, SingleTableMixin, PermissionRequiredMixin, ListView ): model = HIDEvent permission_required = "doorcontrol.view_hidevent" paginate_by = 20 context_object_name = "object_list" template_name = "doorcontrol/access_report.dj.html" export_formats = ("csv", "xlsx", "ods") filterset_class = AccessReportFilterSet _report_name: str @classmethod def _report_types(cls): yield [ cls._report_name, reverse_lazy("doorcontrol:" + slugify(cls._report_name)), ] @classmethod def _urlpattern(cls): slug = slugify(cls._report_name) return path(f"reports/{slug}", cls.as_view(), name=slug) @property def export_name(self): return slugify(self._report_name) def _selected_report(self): return self._report_name def get_paginate_by(self, queryset) -> int | None: if "items_per_page" in self.request.GET: return int(self.request.GET["items_per_page"]) return super().get_paginate_by(queryset) def get_queryset(self): return super().get_queryset().select_related("door") def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) context["report_types"] = [ rt for report in REPORTS for rt in report._report_types() ] page: Page = context["page_obj"] context["paginator_range"] = page.paginator.get_elided_page_range(page.number) context["selected_report"] = self._selected_report() context["items_per_page"] = self.get_paginate_by(None) return context @register_report class AccessPerUnitTime(BaseAccessReport): table_class = UnitTimeTable UNIT_TIMES = ["day", "week", "month", "year"] @classmethod def _report_types(cls): for unit_time in cls.UNIT_TIMES: yield ( "Access per " + unit_time.title(), reverse_lazy("doorcontrol:access-per-unit-time", args=[unit_time]), ) @classmethod def _urlpattern(cls): return path( "reports/access-per-", cls.as_view(), name="access-per-unit-time", ) @property def _report_name(self): unit_time = self.kwargs["unit_time"] return "Access per " + unit_time.title() def _selected_report(self) -> str: return "Access per " + self.kwargs["unit_time"].title() def get_table_kwargs(self): unit_time = self.kwargs["unit_time"] if unit_time == "week": unit_time_column = tables.TemplateColumn( verbose_name=unit_time.title(), template_code=( "{{ value|date|default:default }} - " "{{ value|add:one_week|date|default:default }}" ), extra_context={"one_week": datetime.timedelta(weeks=1)}, ) else: if unit_time == "day": date_format = "DATE_FORMAT" elif unit_time == "month": date_format = "N Y" elif unit_time == "year": date_format = "Y" unit_time_column = tables.DateColumn( date_format, verbose_name=unit_time.title() ) return { "sequence": ("unit_time", "..."), "extra_columns": (("unit_time", unit_time_column),), } def get_table_data(self): unit_time = self.kwargs["unit_time"] if unit_time not in self.UNIT_TIMES: raise BadRequest("unit time must be one of day, week, month, or year") return ( super() .get_table_data() .filter(event_type__in=HIDEvent.EventType.any_granted_access()) .with_member_id() .values(unit_time=Trunc("timestamp", unit_time)) .annotate( members=Count("member_id", distinct=True), members_delta=( F("members") / Window( Lead(NullIf("members", 0.0)), order_by="-unit_time", output_field=FloatField(), ) * 100 - 100 ), access_count=Count("cardholder_id"), access_count_delta=( F("access_count") / Window( Lead(NullIf("access_count", 0.0)), order_by="-unit_time", output_field=FloatField(), ) * 100 - 100 ), ) .order_by("-unit_time") ) @register_report class DeniedAccess(BaseAccessReport): _report_name = "Denied Access" table_class = DeniedAccessTable def get_table_data(self): denied_event_types = [ t for t in HIDEvent.EventType if t.name.startswith("DENIED_ACCESS") ] return super().get_table_data().filter(event_type__in=denied_event_types) @register_report class MostActiveMembers(BaseAccessReport): _report_name = "Most Active Members" table_class = MostActiveMembersTable def get_table_data(self): return ( super() .get_table_data() .with_member_id() .filter(member_id__isnull=False) .values("member_id") .annotate( access_count=Count("member_id"), name=StringAgg( Func(Value(" "), "forename", "surname", function="concat_ws"), ", ", distinct=True, ), ) .order_by("-access_count") ) @register_report class DetailByDay(BaseAccessReport): _report_name = "Detail by Day" table_class = DetailByDayTable def get_table_data(self): return ( super() .get_table_data() .with_member_id() .values("timestamp__date", "member_id") .filter(member_id__isnull=False) .annotate( access_count=Count("member_id"), granted_access_count=Count( "member_id", filter=Q(event_type__in=HIDEvent.EventType.any_granted_access()), ), name=StringAgg( Func(Value(" "), "forename", "surname", function="concat_ws"), ", ", distinct=True, ), ) .order_by("-timestamp__date") ) @register_report class BusiestDayOfWeek(BaseAccessReport): _report_name = "Busiest Day of the Week" table_pagination = False table_class = BusiestDayOfWeekTable def get_table_data(self): return ( super() .get_table_data() .with_member_id() .values("timestamp__week_day") .annotate( events=Count("timestamp"), members=Count("member_id", distinct=True) ) ) @register_report class BusiestTimeOfDay(BaseAccessReport): _report_name = "Busiest Time of Day" table_pagination = False table_class = BusiestTimeOfDayTable def get_table_data(self): return ( super() .get_table_data() .with_member_id() .values("timestamp__hour") .annotate( events=Count("timestamp"), members=Count("member_id", distinct=True) ) ) def fetch_access_users_results(force_refresh: bool = False) -> list[FullUser] | None: task_group = update_access_users.q_task_group try: if force_refresh: q2_tasks.delete_group(task_group) refresh_task_id = q2_tasks.async_task( update_access_users, group=task_group, cached=5 * 60 ) return q2_tasks.result(refresh_task_id, wait=-1, cached=True) update_users_results = q2_tasks.result_group(task_group, cached=True) if update_users_results and len(update_users_results) > 0: return update_users_results[0] # TODO: this could be better except BadSignature: return None @login_required @permission_required("doorcontrol.assign_nfc_card", raise_exception=True) def assign_nfc_card_user_selector(request: HttpRequest): template_name = "doorcontrol/assign_nfc_card_user_selector.dj.html" filtered_users = [] if request.method == "POST": all_users = fetch_access_users_results( request.POST.get("force_refresh") == "true" ) template_name += "#results" all_filtered_users = ( user for user in all_users or [] if user.status == UserStatus.ACTIVE and request.POST.get("search", "").lower() in user.full_name.lower() ) filtered_users = list(itertools.islice(all_filtered_users, 10)) return render(request, template_name, {"users": filtered_users}) class AssignNfcCardStatus(BaseModel): class ErrorEntry(BaseModel): count: int code: ResponseCode | None msg: str extra_details: str | None session_id: NfcCardEnrollmentSessionId | None = None last_status: NfcCardEnrollmentStatus | None = None errors: list[ErrorEntry] = Field(default_factory=list) card: NfcCard | None = None user: User def append_error( self, error: UnifiAccessError, extra_details: str | None = None ) -> None: if self.errors and self.errors[-1].code == error.code: self.errors[-1].count += 1 else: self.errors.append( self.ErrorEntry( count=1, code=error.code, msg=error.msg, extra_details=extra_details, ) ) def append_raw_error(self, msg: str, extra_details: str | None = None) -> None: self.errors.append( self.ErrorEntry( count=1, code=None, msg=msg, extra_details=extra_details, ) ) class AssignNfcCardView(PermissionRequiredMixin, TemplateView): # for storage in request.session ENROLLMENT_STATUS_SESSION_KEY = "unifi_access_enrollment_status" permission_required = "doorcontrol.assign_nfc_card" template_name = "doorcontrol/assign_nfc_card.dj.html" def get_template_names(self) -> list[str]: templates = super().get_template_names() if ( self.request.method == "GET" and (part := self.request.GET.get("part")) ) or ( self.request.method == "POST" and (part := self.request.POST.get("part")) ): return [f"{template_name}#{part}" for template_name in templates] else: return templates def setup(self, request: HttpRequest, *args: Any, **kwargs: Any) -> None: super().setup(request, *args, **kwargs) self.access_client = AccessClient( settings.UNIFI_ACCESS_HOST, settings.UNIFI_ACCESS_API_TOKEN, verify=False ) try: status = AssignNfcCardStatus.model_validate( request.session.get(self.ENROLLMENT_STATUS_SESSION_KEY, "{}") ) except ValidationError: status = None if status is None or status.user.id != self.kwargs["user_id"]: try: user = self.access_client.fetch_user(UserId(self.kwargs["user_id"])) except UnifiAccessError as e: if e.code == ResponseCode.USER_ACCOUNT_NOT_EXIST: raise Http404( "No account with that id exists in UniFi Access" ) from e else: raise e status = AssignNfcCardStatus(user=user) self.status = status def get_context_data(self, **kwargs: Any) -> dict[str, Any]: self.request.session[self.ENROLLMENT_STATUS_SESSION_KEY] = ( self.status.model_dump() ) return super().get_context_data(**kwargs) | self.status.model_dump() def get(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse: # poll an in-progress session if self.status.session_id: try: self.status.last_status = self.access_client.fetch_enroll_card_status( self.status.session_id ) self.status.card = self.access_client.fetch_nfc_card( self.status.last_status.token ) self.access_client.remove_enrollment_session(self.status.session_id) self.status.session_id = None except UnifiAccessError as e: match e.code: case ResponseCode.CREDS_NFC_READ_SESSION_NOT_FOUND: self.status.session_id = None case ResponseCode.CREDS_NFC_READ_POLL_TOKEN_EMPTY: # all is well, the reader just hasn't seen a card yet pass case ResponseCode.CREDS_NFC_CARD_IS_PROVISION: self.status.session_id = None self.status.append_error( e, "This card will need to be added by someone with admin access to the UniFi Access application", ) case _: self.status.append_error(e) return super().get(request, *args, **kwargs) def post(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse: if "assign" in request.POST: if not self.status.last_status: self.status.append_raw_error( "Missing session status. Please start a new session and try again." ) elif request.POST.get("id") != self.status.last_status.id: self.status.append_raw_error( "Mismatched session status. Please start a new session and try again." ) else: try: self.access_client.assign_nfc_card_to_user( self.status.user.id, self.status.last_status.token ) self.status.card = self.access_client.fetch_nfc_card( self.status.last_status.token ) except UnifiAccessError as e: self.status.append_error(e) else: # remove old session, if it exists if self.status.session_id: self.access_client.remove_enrollment_session(self.status.session_id) # start a new session self.status = AssignNfcCardStatus(user=self.status.user) self.status.session_id = self.access_client.begin_enroll_card( settings.UNIFI_ACCESS_CARD_ASSIGNMENT_DEVICE ).session_id return super().get(request, *args, **kwargs) class AssignedNfcCardsReport( ExportMixin, SingleTableMixin, PermissionRequiredMixin, TemplateView ): permission_required = "doorcontrol.assign_nfc_card" template_name = "doorcontrol/assigned_nfc_cards_report.dj.html" table_class = AssignedNfcCardsTable export_formats = ("csv", "xlsx", "ods") def get_context_data(self, **kwargs: Any) -> dict[str, Any]: if "form" not in kwargs: kwargs["form"] = AssignedNfcCardsReportFilters(self.request.GET) return super().get_context_data(**kwargs) def get_table_data(self): access_users = fetch_access_users_results( force_refresh=("refresh" in self.request.GET) ) if access_users: access_users_by_employee_number = { user.employee_number: user for user in access_users } else: access_users_by_employee_number = {} form = AssignedNfcCardsReportFilters(self.request.GET) def get_filtered_members(): members = Member.objects.with_is_active().filter(is_active=True) if form.is_valid() and form.cleaned_data["has_mw_nfc_card"] is not None: members = members.alias( has_nfc_card_number=( Q(nfc_card_number__isnull=False) & ~Q(nfc_card_number="") ) ).filter(has_nfc_card_number=form.cleaned_data["has_mw_nfc_card"]) for member in members.all(): access_user = access_users_by_employee_number.get(member.uid, None) if ( form.is_valid() and form.cleaned_data["has_access_nfc_card"] is not None and access_user and ( bool(access_user.nfc_cards) != form.cleaned_data["has_access_nfc_card"] ) ): continue yield {"member": member, "access_user": access_user} return list(get_filtered_members())