import datetime
from typing import TYPE_CHECKING

from django.contrib.auth.mixins import PermissionRequiredMixin
from django.core.exceptions import BadRequest
from django.db.models import Count, F, FloatField, Q, Window
from django.db.models.functions import Lead, Trunc
from django.urls import path, reverse_lazy
from django.utils.text import slugify
from django.views.generic.list import ListView

import django_filters
import django_tables2 as tables
from django_filters.views import BaseFilterView
from django_mysql.models.aggregates import GroupConcat
from django_mysql.models.functions import ConcatWS
from django_tables2 import SingleTableMixin
from django_tables2.export.views import ExportMixin

from .models import Door, HIDEvent
from .tables import (
    BusiestDayOfWeekTable,
    BusiestTimeOfDayTable,
    DeniedAccessTable,
    DetailByDayTable,
    MostActiveMembersTable,
    UnitTimeTable,
)

if TYPE_CHECKING:
    from django.core.paginator import Page

# Every report view class decorated with ``register_report``, in definition
# order.  Used to build the list of report links shown on each report page.
REPORTS: "list[type[BaseAccessReport]]" = []


def register_report(cls: "type[BaseAccessReport]"):
    """Class decorator: add ``cls`` to ``REPORTS`` and return it unchanged."""
    REPORTS.append(cls)
    return cls


class AccessReportFilterSet(django_filters.FilterSet):
    """Filters shared by all access reports: a timestamp range and doors."""

    timestamp = django_filters.DateFromToRangeFilter()
    door = django_filters.ModelMultipleChoiceFilter(
        queryset=Door.objects.all(), distinct=False
    )


class BaseAccessReport(
    BaseFilterView, ExportMixin, SingleTableMixin, PermissionRequiredMixin, ListView
):
    """Common base for the door access report views.

    Combines filtering, table rendering, export and pagination over
    ``HIDEvent``.  Concrete reports set ``_report_name`` and ``table_class``
    and usually override ``get_table_data`` to shape the queryset.
    """

    model = HIDEvent
    permission_required = "doorcontrol.view_hidevent"
    paginate_by = 20
    context_object_name = "object_list"
    template_name = "doorcontrol/access_report.dj.html"
    export_formats = ("csv", "xlsx", "ods")
    filterset_class = AccessReportFilterSet

    # Human-readable report title; its slug is used as URL name and path.
    _report_name: str

    @classmethod
    def _report_types(cls):
        """Yield ``[title, url]`` entries for the report links of this view."""
        yield [
            cls._report_name,
            reverse_lazy("doorcontrol:" + slugify(cls._report_name)),
        ]

    @classmethod
    def _urlpattern(cls):
        """Return the URL pattern for this report, named after its slug."""
        slug = slugify(cls._report_name)
        return path(f"reports/{slug}", cls.as_view(), name=slug)

    @property
    def export_name(self):
        """Base name for exported files: the slugified report name."""
        return slugify(self._report_name)

    def _selected_report(self):
        """Return the title of the report currently being displayed."""
        return self._report_name

    def get_paginate_by(self, queryset) -> int | None:
        """Return the page size, honouring the ``items_per_page`` query param.

        Falls back to the class default when the parameter is absent.

        Raises:
            BadRequest: if ``items_per_page`` is not a positive integer.
                Previously a non-numeric value raised ``ValueError`` and ``0``
                disabled pagination, which broke ``get_context_data``.
        """
        if "items_per_page" not in self.request.GET:
            return super().get_paginate_by(queryset)

        try:
            per_page = int(self.request.GET["items_per_page"])
        except (TypeError, ValueError):
            raise BadRequest("items_per_page must be a positive integer") from None
        if per_page < 1:
            raise BadRequest("items_per_page must be a positive integer")
        return per_page

    def get_queryset(self):
        """Return the base queryset with the related door joined in."""
        return super().get_queryset().select_related("door")

    def get_context_data(self, **kwargs):
        """Add report navigation and pagination helpers to the context."""
        context = super().get_context_data(**kwargs)

        context["report_types"] = [
            rt for report in REPORTS for rt in report._report_types()
        ]

        page: Page = context["page_obj"]
        context["paginator_range"] = page.paginator.get_elided_page_range(page.number)
        context["selected_report"] = self._selected_report()
        context["items_per_page"] = self.get_paginate_by(None)

        # Current query string without "page", so the template can build
        # page links that keep the active filters.
        query_params = self.request.GET.copy()
        query_params.pop("page", None)
        context["query_params"] = query_params.urlencode()

        return context


@register_report
class AccessPerUnitTime(BaseAccessReport):
    """Granted-access counts grouped per day, week, month or year.

    One view class serves all units; the unit comes from the ``unit_time``
    URL keyword argument.
    """

    table_class = UnitTimeTable
    UNIT_TIMES = ["day", "week", "month", "year"]

    # Date format passed to ``DateColumn`` for each unit rendered as a single
    # date ("week" is rendered as a range by a template column instead).
    _DATE_FORMATS = {"day": "DATE_FORMAT", "month": "N Y", "year": "Y"}

    @classmethod
    def _report_types(cls):
        """Yield one ``(title, url)`` entry per supported unit of time."""
        for unit_time in cls.UNIT_TIMES:
            yield (
                "Access per " + unit_time.title(),
                reverse_lazy("doorcontrol:access-per-unit-time", args=[unit_time]),
            )

    @classmethod
    def _urlpattern(cls):
        """Return the URL pattern capturing ``unit_time`` from the path."""
        # The route must capture ``unit_time``: the view reads
        # ``self.kwargs["unit_time"]`` and ``_report_types`` reverses this URL
        # with one positional argument.
        return path(
            "reports/access-per-<unit_time>",
            cls.as_view(),
            name="access-per-unit-time",
        )

    @property
    def _report_name(self):
        """Report title derived from the requested unit of time."""
        unit_time = self.kwargs["unit_time"]
        return "Access per " + unit_time.title()

    def _selected_report(self) -> str:
        """Return the title of the report currently being displayed."""
        return self._report_name

    def _validated_unit_time(self) -> str:
        """Return the requested unit of time.

        Raises:
            BadRequest: if the unit is not one of ``UNIT_TIMES``.
        """
        unit_time = self.kwargs["unit_time"]
        if unit_time not in self.UNIT_TIMES:
            raise BadRequest("unit time must be one of day, week, month, or year")
        return unit_time

    def get_table_kwargs(self):
        """Build the table kwargs, adding a ``unit_time`` column as the first.

        Raises:
            BadRequest: if the requested unit of time is not supported.
                Validated here as well as in ``get_table_data`` because an
                unknown unit previously left ``date_format`` unbound.
        """
        unit_time = self._validated_unit_time()

        if unit_time == "week":
            # A week is shown as a "start - end" date range.
            unit_time_column = tables.TemplateColumn(
                verbose_name=unit_time.title(),
                template_code=(
                    "{{ value|date|default:default }} - "
                    "{{ value|add:one_week|date|default:default }}"
                ),
                extra_context={"one_week": datetime.timedelta(weeks=1)},
            )
        else:
            unit_time_column = tables.DateColumn(
                self._DATE_FORMATS[unit_time], verbose_name=unit_time.title()
            )

        return {
            "sequence": ("unit_time", "..."),
            "extra_columns": (("unit_time", unit_time_column),),
        }

    def get_table_data(self):
        """Return per-period member and access counts, newest period first.

        Each ``*_delta`` is the percentage change relative to the next row in
        descending ``unit_time`` order, i.e. the chronologically earlier
        period.

        Raises:
            BadRequest: if the requested unit of time is not supported.
        """
        unit_time = self._validated_unit_time()
        return (
            super()
            .get_table_data()
            .filter(event_type__in=HIDEvent.EventType.any_granted_access())
            .with_member_id()
            .values(unit_time=Trunc("timestamp", unit_time))
            .annotate(
                members=Count("member_id", distinct=True),
                members_delta=(
                    F("members")
                    / Window(
                        Lead("members"),
                        order_by="-unit_time",
                        output_field=FloatField(),
                    )
                    * 100
                    - 100
                ),
                access_count=Count("cardholder_id"),
                access_count_delta=(
                    F("access_count")
                    / Window(
                        Lead("access_count"),
                        order_by="-unit_time",
                        output_field=FloatField(),
                    )
                    * 100
                    - 100
                ),
            )
            .order_by("-unit_time")
        )


@register_report
class DeniedAccess(BaseAccessReport):
    """Events whose type name starts with ``DENIED_ACCESS``."""

    _report_name = "Denied Access"
    table_class = DeniedAccessTable

    def get_table_data(self):
        """Return denied-access events with the decoded card number."""
        denied_event_types = [
            t for t in HIDEvent.EventType if t.name.startswith("DENIED_ACCESS")
        ]
        return (
            super()
            .get_table_data()
            .filter(event_type__in=denied_event_types)
            .with_decoded_card_number()
        )


@register_report
class MostActiveMembers(BaseAccessReport):
    """Event counts per member, most active first."""

    _report_name = "Most Active Members"
    table_class = MostActiveMembersTable

    def get_table_data(self):
        """Return one row per non-null ``member_id`` with its event count."""
        return (
            super()
            .get_table_data()
            .with_member_id()
            .filter(member_id__isnull=False)
            .values("member_id")
            .annotate(
                access_count=Count("member_id"),
                # Distinct "forename surname" strings seen for this member.
                name=GroupConcat(
                    ConcatWS("forename", "surname", separator=" "), distinct=True
                ),
            )
            .order_by("-access_count")
        )


@register_report
class DetailByDay(BaseAccessReport):
    """Event counts per member per calendar day, newest day first."""

    _report_name = "Detail by Day"
    table_class = DetailByDayTable

    def get_table_data(self):
        """Return one row per (date, member) with total and granted counts."""
        return (
            super()
            .get_table_data()
            .with_member_id()
            .values("timestamp__date", "member_id")
            .filter(member_id__isnull=False)
            .annotate(
                access_count=Count("member_id"),
                granted_access_count=Count(
                    "member_id",
                    filter=Q(event_type__in=HIDEvent.EventType.any_granted_access()),
                ),
                # Distinct "forename surname" strings seen for this member.
                name=GroupConcat(
                    ConcatWS("forename", "surname", separator=" "), distinct=True
                ),
            )
            .order_by("-timestamp__date")
        )


@register_report
class BusiestDayOfWeek(BaseAccessReport):
    """Event and distinct-member counts grouped by day of the week."""

    _report_name = "Busiest Day of the Week"
    table_pagination = False
    table_class = BusiestDayOfWeekTable

    def get_table_data(self):
        """Return one row per ``timestamp__week_day`` value."""
        return (
            super()
            .get_table_data()
            .with_member_id()
            .values("timestamp__week_day")
            .annotate(
                events=Count("timestamp"), members=Count("member_id", distinct=True)
            )
        )


@register_report
class BusiestTimeOfDay(BaseAccessReport):
    """Event and distinct-member counts grouped by hour of the day."""

    _report_name = "Busiest Time of Day"
    table_pagination = False
    table_class = BusiestTimeOfDayTable

    def get_table_data(self):
        """Return one row per ``timestamp__hour`` value."""
        return (
            super()
            .get_table_data()
            .with_member_id()
            .values("timestamp__hour")
            .annotate(
                events=Count("timestamp"), members=Count("member_id", distinct=True)
            )
        )