297 lines
8.7 KiB
Python

import datetime
from typing import TYPE_CHECKING
from django.contrib.auth.mixins import PermissionRequiredMixin
from django.contrib.postgres.aggregates import StringAgg
from django.core.exceptions import BadRequest
from django.db.models import Count, F, FloatField, Func, Q, Value, Window
from django.db.models.functions import Lead, NullIf, Trunc
from django.urls import path, reverse_lazy
from django.utils.text import slugify
from django.views.generic.list import ListView
import django_filters
import django_tables2 as tables
from django_filters.views import BaseFilterView
from django_tables2 import SingleTableMixin
from django_tables2.export.views import ExportMixin
from .models import Door, HIDEvent
from .tables import (
BusiestDayOfWeekTable,
BusiestTimeOfDayTable,
DeniedAccessTable,
DetailByDayTable,
MostActiveMembersTable,
UnitTimeTable,
)
if TYPE_CHECKING:
from django.core.paginator import Page
# Registry of report view classes, filled in by the ``register_report``
# decorator in definition order.
REPORTS = []


def register_report(cls: "type[BaseAccessReport]"):
    """Class decorator that records *cls* in ``REPORTS``.

    The class is appended to the module-level registry and then returned
    unchanged, so the decorated name still refers to the original class.
    """
    REPORTS.append(cls)
    return cls
class AccessReportFilterSet(django_filters.FilterSet):
    """Filter set used by the access reports.

    Declares two filters: a from/to date range on ``timestamp`` and a
    multiple-choice selection of ``Door`` objects on ``door``.
    """

    # Date range (from / to) applied to the ``timestamp`` field.
    timestamp = django_filters.DateFromToRangeFilter()

    # Any number of doors may be selected; ``distinct=False`` is passed
    # through to the filter as in the original declaration.
    door = django_filters.ModelMultipleChoiceFilter(
        queryset=Door.objects.all(),
        distinct=False,
    )
class BaseAccessReport(
    BaseFilterView, ExportMixin, SingleTableMixin, PermissionRequiredMixin, ListView
):
    """Base view for the door-access reports.

    Combines filtering (``AccessReportFilterSet``), table rendering, export
    (csv / xlsx / ods) and a permission check on top of a ``ListView`` over
    ``HIDEvent``. Concrete reports set ``_report_name`` and ``table_class``
    and usually override ``get_table_data``.
    """

    model = HIDEvent
    permission_required = "doorcontrol.view_hidevent"
    paginate_by = 20
    context_object_name = "object_list"
    template_name = "doorcontrol/access_report.dj.html"
    export_formats = ("csv", "xlsx", "ods")
    filterset_class = AccessReportFilterSet

    # Human-readable report title. Its slug is used for the URL name, the
    # URL path and the export name.
    _report_name: str

    @classmethod
    def _report_types(cls):
        """Yield ``[title, url]`` entries for this report.

        The entries of all registered reports are collected into the
        ``report_types`` context variable by ``get_context_data``.
        """
        yield [
            cls._report_name,
            reverse_lazy("doorcontrol:" + slugify(cls._report_name)),
        ]

    @classmethod
    def _urlpattern(cls):
        """Return the ``path()`` entry for this report.

        The route is ``reports/<slug>`` and the URL name is the slug of
        ``_report_name``.
        """
        slug = slugify(cls._report_name)
        return path(f"reports/{slug}", cls.as_view(), name=slug)

    @property
    def export_name(self):
        """Slug of the report title, exposed as the export name."""
        return slugify(self._report_name)

    def _selected_report(self):
        """Return the title of the report currently being displayed."""
        return self._report_name

    def get_paginate_by(self, queryset) -> int | None:
        """Return the page size, honouring the ``items_per_page`` query parameter.

        A missing or blank parameter falls back to the inherited page size.

        Raises:
            BadRequest: if ``items_per_page`` is not an integer or is
                smaller than 1. Such values previously surfaced as
                unhandled errors (``ValueError`` from ``int()``, or a
                missing ``page_obj`` when the size was 0).
        """
        raw_value = self.request.GET.get("items_per_page", "").strip()
        if not raw_value:
            return super().get_paginate_by(queryset)
        try:
            per_page = int(raw_value)
        except ValueError:
            raise BadRequest("items_per_page must be a positive integer") from None
        if per_page < 1:
            raise BadRequest("items_per_page must be a positive integer")
        return per_page

    def get_queryset(self):
        """Return the base queryset with the related ``door`` joined in."""
        return super().get_queryset().select_related("door")

    def get_context_data(self, **kwargs):
        """Extend the template context with report navigation and paging data.

        Adds ``report_types`` (entries from every registered report),
        ``paginator_range`` (elided page range around the current page),
        ``selected_report`` and ``items_per_page``.
        """
        context = super().get_context_data(**kwargs)
        context["report_types"] = [
            rt for report in REPORTS for rt in report._report_types()
        ]
        page: Page = context["page_obj"]
        context["paginator_range"] = page.paginator.get_elided_page_range(page.number)
        context["selected_report"] = self._selected_report()
        context["items_per_page"] = self.get_paginate_by(None)
        return context
@register_report
class AccessPerUnitTime(BaseAccessReport):
    """Granted-access counts grouped by day, week, month or year.

    The unit comes from the ``unit_time`` URL keyword argument. Because the
    URL pattern accepts any string, the value is validated before it is
    used to build either the table columns or the query.
    """

    table_class = UnitTimeTable
    UNIT_TIMES = ["day", "week", "month", "year"]

    # Date format passed to ``DateColumn`` for each unit that is rendered as
    # a single date. "week" is absent because it is rendered as a range by a
    # ``TemplateColumn`` instead.
    _DATE_FORMATS = {"day": "DATE_FORMAT", "month": "N Y", "year": "Y"}

    @classmethod
    def _report_types(cls):
        """Yield one ``(title, url)`` entry per supported unit of time."""
        for unit_time in cls.UNIT_TIMES:
            yield (
                "Access per " + unit_time.title(),
                reverse_lazy("doorcontrol:access-per-unit-time", args=[unit_time]),
            )

    @classmethod
    def _urlpattern(cls):
        """Return the single parameterised URL pattern serving every unit."""
        return path(
            "reports/access-per-<unit_time>",
            cls.as_view(),
            name="access-per-unit-time",
        )

    @property
    def _report_name(self):
        """Report title derived from the requested unit of time."""
        unit_time = self.kwargs["unit_time"]
        return "Access per " + unit_time.title()

    def _selected_report(self) -> str:
        """Return the title of the report currently being displayed."""
        return self._report_name

    def _validated_unit_time(self) -> str:
        """Return the requested unit of time.

        Raises:
            BadRequest: if the URL value is not listed in ``UNIT_TIMES``.
        """
        unit_time = self.kwargs["unit_time"]
        if unit_time not in self.UNIT_TIMES:
            raise BadRequest("unit time must be one of day, week, month, or year")
        return unit_time

    @staticmethod
    def _percent_change(count_name):
        """Build the percentage-change expression for the annotation *count_name*.

        The value is compared with the next row in ``-unit_time`` order,
        i.e. the chronologically preceding period. ``NullIf(..., 0.0)``
        turns a zero divisor into NULL, so the result is NULL rather than a
        division-by-zero error.
        """
        previous_period = Window(
            Lead(NullIf(count_name, 0.0)),
            order_by="-unit_time",
            output_field=FloatField(),
        )
        return F(count_name) / previous_period * 100 - 100

    def get_table_kwargs(self):
        """Return table kwargs that add a leading ``unit_time`` column.

        Raises:
            BadRequest: if the requested unit of time is not supported.
                Validating here matters because these kwargs are built
                before ``get_table_data`` runs; without it an unsupported
                unit failed on an unassigned ``date_format`` instead.
        """
        unit_time = self._validated_unit_time()
        title = unit_time.title()
        if unit_time == "week":
            # A week is shown as "start - end", where the end is the
            # truncated value plus one week.
            unit_time_column = tables.TemplateColumn(
                verbose_name=title,
                template_code=(
                    "{{ value|date|default:default }} - "
                    "{{ value|add:one_week|date|default:default }}"
                ),
                extra_context={"one_week": datetime.timedelta(weeks=1)},
            )
        else:
            unit_time_column = tables.DateColumn(
                self._DATE_FORMATS[unit_time], verbose_name=title
            )
        return {
            "sequence": ("unit_time", "..."),
            "extra_columns": (("unit_time", unit_time_column),),
        }

    def get_table_data(self):
        """Return per-period member and access counts with percentage deltas.

        Only granted-access events are counted. Rows are grouped by the
        timestamp truncated to the requested unit and ordered newest first.

        Raises:
            BadRequest: if the requested unit of time is not supported.
        """
        unit_time = self._validated_unit_time()
        return (
            super()
            .get_table_data()
            .filter(event_type__in=HIDEvent.EventType.any_granted_access())
            .with_member_id()
            .values(unit_time=Trunc("timestamp", unit_time))
            .annotate(
                members=Count("member_id", distinct=True),
                members_delta=self._percent_change("members"),
                access_count=Count("cardholder_id"),
                access_count_delta=self._percent_change("access_count"),
            )
            .order_by("-unit_time")
        )
@register_report
class DeniedAccess(BaseAccessReport):
    """Report restricted to events whose type name starts with ``DENIED_ACCESS``."""

    _report_name = "Denied Access"
    table_class = DeniedAccessTable

    def get_table_data(self):
        """Return the inherited table data limited to denied-access event types."""
        denied = []
        for event_type in HIDEvent.EventType:
            # Membership is decided purely by the enum member's name prefix.
            if event_type.name.startswith("DENIED_ACCESS"):
                denied.append(event_type)
        events = super().get_table_data()
        return events.filter(event_type__in=denied)
@register_report
class MostActiveMembers(BaseAccessReport):
    """Report ranking members by their number of access events."""

    _report_name = "Most Active Members"
    table_class = MostActiveMembersTable

    def get_table_data(self):
        """Return one row per member with its event count and display name.

        Events without a member id are excluded. ``name`` joins the distinct
        "forename surname" combinations seen for the member with ", ".
        Rows are ordered by descending ``access_count``.
        """
        return (
            super()
            .get_table_data()
            .with_member_id()
            .filter(member_id__isnull=False)
            .values("member_id")
            .annotate(
                access_count=Count("member_id"),
                name=StringAgg(
                    Func(Value(" "), "forename", "surname", function="concat_ws"),
                    ", ",
                    distinct=True,
                ),
            )
            # ``access_count`` is not unique; the ``member_id`` tie-breaker
            # keeps the order deterministic so tied rows cannot move between
            # pages when the result is paginated.
            .order_by("-access_count", "member_id")
        )
@register_report
class DetailByDay(BaseAccessReport):
    """Report of per-member activity for each calendar day."""

    _report_name = "Detail by Day"
    table_class = DetailByDayTable

    def get_table_data(self):
        """Return one row per (date, member) with total and granted counts.

        Events without a member id are excluded. ``access_count`` counts
        every event in the group, ``granted_access_count`` only those whose
        type is in ``HIDEvent.EventType.any_granted_access()``. ``name``
        joins the distinct "forename surname" combinations with ", ".
        Rows are ordered newest day first.
        """
        return (
            super()
            .get_table_data()
            .with_member_id()
            .values("timestamp__date", "member_id")
            .filter(member_id__isnull=False)
            .annotate(
                access_count=Count("member_id"),
                granted_access_count=Count(
                    "member_id",
                    filter=Q(event_type__in=HIDEvent.EventType.any_granted_access()),
                ),
                name=StringAgg(
                    Func(Value(" "), "forename", "surname", function="concat_ws"),
                    ", ",
                    distinct=True,
                ),
            )
            # Several members share each date, so ordering by date alone is
            # ambiguous. ``member_id`` completes the grouping key and keeps
            # the order stable across pages.
            .order_by("-timestamp__date", "member_id")
        )
@register_report
class BusiestDayOfWeek(BaseAccessReport):
    """Report of event and distinct-member counts for each day of the week."""

    _report_name = "Busiest Day of the Week"
    table_pagination = False
    table_class = BusiestDayOfWeekTable

    def get_table_data(self):
        """Return one row per ``timestamp__week_day`` with aggregate counts.

        ``events`` counts timestamps in the group and ``members`` counts
        distinct member ids.
        """
        events = super().get_table_data().with_member_id()
        per_week_day = events.values("timestamp__week_day")
        return per_week_day.annotate(
            events=Count("timestamp"),
            members=Count("member_id", distinct=True),
        )
@register_report
class BusiestTimeOfDay(BaseAccessReport):
    """Report of event and distinct-member counts for each hour of the day."""

    _report_name = "Busiest Time of Day"
    table_pagination = False
    table_class = BusiestTimeOfDayTable

    def get_table_data(self):
        """Return one row per ``timestamp__hour`` with aggregate counts.

        ``events`` counts timestamps in the group and ``members`` counts
        distinct member ids.
        """
        events = super().get_table_data().with_member_id()
        per_hour = events.values("timestamp__hour")
        return per_hour.annotate(
            events=Count("timestamp"),
            members=Count("member_id", distinct=True),
        )