300 lines
8.7 KiB
Python
300 lines
8.7 KiB
Python
import datetime
|
|
|
|
from django.contrib.auth.mixins import PermissionRequiredMixin
|
|
from django.core.exceptions import BadRequest
|
|
from django.core.paginator import Page
|
|
from django.db.models import Count, F, FloatField, Window
|
|
from django.db.models.functions import Lead, Trunc
|
|
from django.urls import path, reverse_lazy
|
|
from django.utils.text import slugify
|
|
from django.views.generic.list import ListView
|
|
|
|
import django_filters
|
|
import django_tables2 as tables
|
|
from django_filters.views import BaseFilterView
|
|
from django_mysql.models import GroupConcat
|
|
from django_mysql.models.functions import ConcatWS
|
|
from django_tables2 import SingleTableMixin
|
|
from django_tables2.export.views import ExportMixin
|
|
|
|
from .models import Door, HIDEvent
|
|
from .tables import (
|
|
BusiestDayOfWeekTable,
|
|
BusiestTimeOfDayTable,
|
|
DeniedAccessTable,
|
|
DetailByDayTable,
|
|
MostActiveMembersTable,
|
|
UnitTimeTable,
|
|
)
|
|
|
|
# Registry of report view classes; populated by the ``register_report``
# class decorator below, in decoration order.
REPORTS = []


def register_report(cls: "BaseAccessReport"):
    """Record *cls* in ``REPORTS`` and hand it back unchanged.

    Meant to be used as a class decorator on report views so that each
    decorated class is appended to the module-level ``REPORTS`` list.
    """
    REPORTS.append(cls)
    return cls
|
|
|
|
|
|
class AccessReportFilterSet(django_filters.FilterSet):
    """Filters offered on the access report views.

    ``timestamp`` is a from/to date range filter and ``door`` is a
    multiple-choice filter over all ``Door`` rows.
    """

    # Date range filter on the ``timestamp`` field.
    timestamp = django_filters.DateFromToRangeFilter()

    # Multiple-choice filter across every ``Door``; ``distinct`` is
    # explicitly switched off for this filter.
    door = django_filters.ModelMultipleChoiceFilter(
        queryset=Door.objects.all(),
        distinct=False,
    )
|
|
|
|
|
|
class BaseAccessReport(
    BaseFilterView, ExportMixin, SingleTableMixin, PermissionRequiredMixin, ListView
):
    """Common base view for the door access reports.

    Combines filtering (``BaseFilterView``), table rendering
    (``SingleTableMixin``), export (``ExportMixin``) and a permission check
    on top of a ``ListView`` over ``HIDEvent``.  Concrete reports set
    ``_report_name`` and ``table_class`` and usually override
    ``get_table_data``.
    """

    model = HIDEvent
    permission_required = "doorcontrol.view_hidevent"
    paginate_by = 20
    context_object_name = "object_list"
    template_name = "doorcontrol/access_report.dj.html"

    export_formats = ("csv", "xlsx", "ods")

    filterset_class = AccessReportFilterSet

    # Human-readable report title; subclasses must provide it.  Its slug is
    # used for the URL path, the URL name and the export file name.
    _report_name = None

    @classmethod
    def _report_types(cls):
        """Yield ``[name, url]`` entries for this report.

        The URL is resolved lazily from the ``doorcontrol`` namespace using
        the slugified report name.
        """
        yield [
            cls._report_name,
            reverse_lazy("doorcontrol:" + slugify(cls._report_name)),
        ]

    @classmethod
    def _urlpattern(cls):
        """Return the URL pattern ``reports/<slug>`` routing to this view."""
        slug = slugify(cls._report_name)
        return path(f"reports/{slug}", cls.as_view(), name=slug)

    @property
    def export_name(self):
        """Slugified report name, exposed as ``export_name``."""
        return slugify(self._report_name)

    def _selected_report(self):
        """Return the name of the report currently being displayed."""
        return self._report_name

    def get_paginate_by(self, queryset) -> int:
        """Return the page size, honouring the ``items_per_page`` query parameter.

        A missing or blank ``items_per_page`` falls back to the class default.

        Raises:
            BadRequest: if ``items_per_page`` is not a positive integer.
        """
        raw_value = self.request.GET.get("items_per_page")
        # Treat a blank value (e.g. an empty form field) as "not supplied".
        if raw_value is None or not raw_value.strip():
            return super().get_paginate_by(queryset)

        # The value comes straight from the query string, so reject anything
        # that is not a usable page size instead of failing with a
        # ValueError here or passing a zero/negative size to the paginator.
        try:
            items_per_page = int(raw_value)
        except ValueError:
            raise BadRequest("items_per_page must be a positive integer") from None
        if items_per_page < 1:
            raise BadRequest("items_per_page must be a positive integer")
        return items_per_page

    def get_queryset(self):
        """Return the base queryset with the related ``door`` joined in."""
        return super().get_queryset().select_related("door")

    def get_context_data(self, **kwargs):
        """Extend the template context with report navigation and paging data.

        Adds ``report_types`` (entries from every registered report),
        ``paginator_range``, ``selected_report``, ``items_per_page`` and
        ``query_params`` (the current query string without ``page``).
        """
        context = super().get_context_data(**kwargs)
        context["report_types"] = [
            rt for report in REPORTS for rt in report._report_types()
        ]

        page: Page = context["page_obj"]
        context["paginator_range"] = page.paginator.get_elided_page_range(page.number)
        context["selected_report"] = self._selected_report()
        context["items_per_page"] = self.get_paginate_by(None)

        # Work on a mutable copy of the query string and drop ``page`` so the
        # remaining parameters can be reused when building pagination links.
        query_params = self.request.GET.copy()
        query_params.pop("page", None)
        context["query_params"] = query_params.urlencode()

        return context
|
|
|
|
|
|
@register_report
class AccessPerUnitTime(BaseAccessReport):
    """Granted-access counts grouped per day, week, month or year.

    The unit of time comes from the ``unit_time`` URL keyword argument, so
    the report name is derived per request rather than being a class
    constant.
    """

    table_class = UnitTimeTable
    UNIT_TIMES = ["day", "week", "month", "year"]

    # Date format passed to ``DateColumn`` for each non-week unit of time.
    # "week" is absent because it is rendered by a template column instead.
    _DATE_FORMATS = {"day": "DATE_FORMAT", "month": "N Y", "year": "Y"}

    @classmethod
    def _report_types(cls):
        """Yield one ``(name, url)`` entry per supported unit of time."""
        for unit_time in cls.UNIT_TIMES:
            yield (
                "Access per " + unit_time.title(),
                reverse_lazy("doorcontrol:access-per-unit-time", args=[unit_time]),
            )

    @classmethod
    def _urlpattern(cls):
        """Return the single URL pattern that captures ``unit_time``."""
        return path(
            "reports/access-per-<unit_time>",
            cls.as_view(),
            name="access-per-unit-time",
        )

    @property
    def _report_name(self):
        """Report title built from the requested unit of time."""
        unit_time = self.kwargs["unit_time"]
        return "Access per " + unit_time.title()

    def _selected_report(self) -> str:
        """Return the title of the report for the requested unit of time."""
        return self._report_name

    def _get_unit_time(self) -> str:
        """Return the validated ``unit_time`` URL keyword argument.

        Raises:
            BadRequest: if the value is not one of ``UNIT_TIMES``.
        """
        unit_time = self.kwargs["unit_time"]
        if unit_time not in self.UNIT_TIMES:
            raise BadRequest("unit time must be one of day, week, month, or year")
        return unit_time

    @staticmethod
    def _percent_change(field_name):
        """Build the percent-change expression for an annotated count.

        The count is divided by the value of the following row when rows are
        ordered by descending ``unit_time`` (i.e. the chronologically earlier
        period), then converted to a percentage change.
        """
        return (
            F(field_name)
            / Window(
                Lead(field_name),
                order_by="-unit_time",
                output_field=FloatField(),
            )
            * 100
            - 100
        )

    def get_table_kwargs(self):
        """Return table kwargs that add a ``unit_time`` column as first column.

        Raises:
            BadRequest: if the requested unit of time is not supported.
        """
        # Validate here as well as in get_table_data: an unknown unit must
        # produce a BadRequest rather than an unbound-variable error while
        # picking the column format below.
        unit_time = self._get_unit_time()
        if unit_time == "week":
            # A week is shown as "<start> - <start + 7 days>".
            unit_time_column = tables.TemplateColumn(
                verbose_name=unit_time.title(),
                template_code=(
                    "{{ value|date|default:default }} - "
                    "{{ value|add:one_week|date|default:default }}"
                ),
                extra_context={"one_week": datetime.timedelta(weeks=1)},
            )
        else:
            unit_time_column = tables.DateColumn(
                self._DATE_FORMATS[unit_time], verbose_name=unit_time.title()
            )

        return {
            "sequence": ("unit_time", "..."),
            "extra_columns": (("unit_time", unit_time_column),),
        }

    def get_table_data(self):
        """Return granted-access events aggregated per unit of time.

        Each row carries the truncated ``unit_time``, the distinct member
        count, the total access count and the percent change of both,
        newest period first.

        Raises:
            BadRequest: if the requested unit of time is not supported.
        """
        unit_time = self._get_unit_time()

        granted_event_types = [
            t for t in HIDEvent.EventType if t.name.startswith("GRANTED_ACCESS")
        ]
        return (
            super()
            .get_table_data()
            .filter(event_type__in=granted_event_types)
            .with_member_id()
            .values(unit_time=Trunc("timestamp", unit_time))
            .annotate(
                members=Count("member_id", distinct=True),
                members_delta=self._percent_change("members"),
                access_count=Count("cardholder_id"),
                access_count_delta=self._percent_change("access_count"),
            )
            .order_by("-unit_time")
        )
|
|
|
|
|
|
@register_report
class DeniedAccess(BaseAccessReport):
    """Report listing events whose type is a denied-access type."""

    _report_name = "Denied Access"
    table_class = DeniedAccessTable

    def get_table_data(self):
        """Return only denied-access events, with the decoded card number added."""
        # Select every event type whose enum member name begins with
        # "DENIED_ACCESS".
        denied_event_types = []
        for event_type in HIDEvent.EventType:
            if event_type.name.startswith("DENIED_ACCESS"):
                denied_event_types.append(event_type)

        events = super().get_table_data()
        denied_events = events.filter(event_type__in=denied_event_types)
        return denied_events.with_decoded_card_number()
|
|
|
|
|
|
@register_report
class MostActiveMembers(BaseAccessReport):
    """Report ranking members by their number of access events."""

    _report_name = "Most Active Members"
    table_class = MostActiveMembersTable

    def get_table_data(self):
        """Return one row per member with its access count, busiest first."""
        # "forename surname" for each event; distinct values are concatenated
        # per member by GroupConcat below.
        full_name = ConcatWS("forename", "surname", separator=" ")

        events = super().get_table_data().with_member_id()
        # Only events that resolved to a member are counted.
        member_events = events.filter(member_id__isnull=False)
        per_member = member_events.values("member_id").annotate(
            access_count=Count("member_id"),
            name=GroupConcat(full_name, distinct=True),
        )
        return per_member.order_by("-access_count")
|
|
|
|
|
|
@register_report
class DetailByDay(BaseAccessReport):
    """Report of per-member access counts broken down by calendar date."""

    _report_name = "Detail by Day"
    table_class = DetailByDayTable

    def get_table_data(self):
        """Return one row per (date, member) with its access count, newest first."""
        # "forename surname" for each event; distinct values are concatenated
        # per group by GroupConcat below.
        full_name = ConcatWS("forename", "surname", separator=" ")

        events = super().get_table_data().with_member_id()
        grouped = events.values("timestamp__date", "member_id")
        # Only events that resolved to a member are counted.
        grouped = grouped.filter(member_id__isnull=False)
        per_day_member = grouped.annotate(
            access_count=Count("member_id"),
            name=GroupConcat(full_name, distinct=True),
        )
        return per_day_member.order_by("-timestamp__date")
|
|
|
|
|
|
@register_report
class BusiestDayOfWeek(BaseAccessReport):
    """Report of event and distinct-member counts per day of the week."""

    _report_name = "Busiest Day of the Week"
    table_pagination = False
    table_class = BusiestDayOfWeekTable

    def get_table_data(self):
        """Return one row per ``timestamp__week_day`` with event and member counts."""
        events = super().get_table_data().with_member_id()
        by_week_day = events.values("timestamp__week_day")
        return by_week_day.annotate(
            events=Count("timestamp"),
            members=Count("member_id", distinct=True),
        )
|
|
|
|
|
|
@register_report
class BusiestTimeOfDay(BaseAccessReport):
    """Report of event and distinct-member counts per hour of the day."""

    _report_name = "Busiest Time of Day"
    table_pagination = False
    table_class = BusiestTimeOfDayTable

    def get_table_data(self):
        """Return one row per ``timestamp__hour`` with event and member counts."""
        events = super().get_table_data().with_member_id()
        by_hour = events.values("timestamp__hour")
        return by_hour.annotate(
            events=Count("timestamp"),
            members=Count("member_id", distinct=True),
        )
|