cmsmanage/doorcontrol/views.py

300 lines
8.7 KiB
Python
Raw Normal View History

import datetime
from django.contrib.auth.mixins import PermissionRequiredMixin
from django.core.exceptions import BadRequest
from django.core.paginator import Page
from django.db.models import Count, F, FloatField, Window
from django.db.models.functions import Lead, Trunc
from django.urls import path, reverse_lazy
from django.utils.text import slugify
from django.views.generic.list import ListView
import django_filters
import django_tables2 as tables
from django_filters.views import BaseFilterView
from django_mysql.models import GroupConcat
from django_mysql.models.functions import ConcatWS
from django_tables2 import SingleTableMixin
from django_tables2.export.views import ExportMixin
from .models import Door, HIDEvent
from .tables import (
BusiestDayOfWeekTable,
BusiestTimeOfDayTable,
DeniedAccessTable,
DetailByDayTable,
MostActiveMembersTable,
UnitTimeTable,
)
# Registry of report view classes, populated by the ``register_report``
# decorator as each report class is defined.
REPORTS: "list[type[BaseAccessReport]]" = []


def register_report(cls: "type[BaseAccessReport]") -> "type[BaseAccessReport]":
    """Append *cls* to ``REPORTS`` and return it unchanged.

    Returning the class untouched lets this function be used as a class
    decorator. The argument is the class itself, not an instance, which is
    why it is annotated as ``type[BaseAccessReport]``.
    """
    REPORTS.append(cls)
    return cls
class AccessReportFilterSet(django_filters.FilterSet):
    """Filter set used by the access report views.

    Declares two filters: a from/to date range named ``timestamp`` and a
    multiple-choice filter named ``door`` whose choices are all ``Door`` rows.
    """

    # Date range (from/to) filter.
    timestamp = django_filters.DateFromToRangeFilter()

    # Multiple-choice filter over every Door; ``distinct=False`` is passed
    # through to the filter as in the original declaration.
    door = django_filters.ModelMultipleChoiceFilter(
        distinct=False,
        queryset=Door.objects.all(),
    )
class BaseAccessReport(
    BaseFilterView, ExportMixin, SingleTableMixin, PermissionRequiredMixin, ListView
):
    """Shared base class for the door access report views.

    Combines filtering, table rendering, export (csv/xlsx/ods) and a
    permission check over ``HIDEvent`` rows. Subclasses set ``_report_name``
    and ``table_class`` and usually override ``get_table_data``.
    """

    model = HIDEvent
    permission_required = "doorcontrol.view_hidevent"
    paginate_by = 20
    context_object_name = "object_list"
    template_name = "doorcontrol/access_report.dj.html"
    export_formats = ("csv", "xlsx", "ods")
    filterset_class = AccessReportFilterSet

    # Human-readable report title; subclasses override it. It is slugified to
    # build the URL name, the URL path and the export file name.
    _report_name = None

    @classmethod
    def _report_types(cls):
        """Yield ``[title, url]`` entries for the report(s) served by this class."""
        yield [
            cls._report_name,
            reverse_lazy("doorcontrol:" + slugify(cls._report_name)),
        ]

    @classmethod
    def _urlpattern(cls):
        """Return the URL pattern ``reports/<slug>`` named after the report slug."""
        slug = slugify(cls._report_name)
        return path(f"reports/{slug}", cls.as_view(), name=slug)

    @property
    def export_name(self):
        """Slugified report name, used as the export name."""
        return slugify(self._report_name)

    def _selected_report(self):
        """Return the title of the report currently being displayed."""
        return self._report_name

    def get_paginate_by(self, queryset) -> int:
        """Return the page size, honouring the ``items_per_page`` query parameter.

        A missing or blank parameter falls back to the default page size.

        Raises:
            BadRequest: if ``items_per_page`` is not a positive integer. The
                value comes straight from the query string, so it must be
                validated instead of letting ``int()`` raise ``ValueError``
                (an HTTP 500) or passing zero/negative sizes to the paginator.
        """
        raw = self.request.GET.get("items_per_page")
        if raw is None or not raw.strip():
            return super().get_paginate_by(queryset)

        try:
            per_page = int(raw)
        except ValueError as err:
            raise BadRequest("items_per_page must be a positive integer") from err

        if per_page < 1:
            raise BadRequest("items_per_page must be a positive integer")
        return per_page

    def get_queryset(self):
        """Return the base queryset with the related ``door`` joined in."""
        return super().get_queryset().select_related("door")

    def get_context_data(self, **kwargs):
        """Extend the template context with report navigation and paging data.

        Adds ``report_types``, ``paginator_range``, ``selected_report``,
        ``items_per_page`` and ``query_params`` (the current query string
        without its ``page`` parameter).
        """
        context = super().get_context_data(**kwargs)

        context["report_types"] = [
            rt for report in REPORTS for rt in report._report_types()
        ]

        page: Page = context["page_obj"]
        context["paginator_range"] = page.paginator.get_elided_page_range(page.number)
        context["selected_report"] = self._selected_report()
        context["items_per_page"] = self.get_paginate_by(None)

        # Work on a copy (request.GET itself is immutable) and drop "page" so
        # the remaining parameters can be reused when building page links.
        query_params = self.request.GET.copy()
        query_params.pop("page", None)
        context["query_params"] = query_params.urlencode()

        return context
@register_report
class AccessPerUnitTime(BaseAccessReport):
    """Granted-access counts grouped per day, week, month or year.

    The unit comes from the ``unit_time`` URL keyword argument. It is
    validated by ``_validated_unit_time`` before either the table columns or
    the table data are built, so an unknown unit produces a ``BadRequest``
    rather than an unbound-variable error while choosing the column format.
    """

    table_class = UnitTimeTable
    UNIT_TIMES = ["day", "week", "month", "year"]

    # Date format passed to ``DateColumn`` for each unit that is rendered as a
    # single date; "week" is rendered as a start/end range instead.
    _DATE_FORMATS = {"day": "DATE_FORMAT", "month": "N Y", "year": "Y"}

    @classmethod
    def _report_types(cls):
        """Yield one ``(title, url)`` entry per supported unit of time."""
        for unit_time in cls.UNIT_TIMES:
            yield (
                "Access per " + unit_time.title(),
                reverse_lazy("doorcontrol:access-per-unit-time", args=[unit_time]),
            )

    @classmethod
    def _urlpattern(cls):
        """Return the single URL pattern that captures ``unit_time``."""
        return path(
            "reports/access-per-<unit_time>",
            cls.as_view(),
            name="access-per-unit-time",
        )

    @property
    def _report_name(self):
        """Report title derived from the requested unit of time."""
        unit_time = self.kwargs["unit_time"]
        return "Access per " + unit_time.title()

    def _selected_report(self) -> str:
        """Return the title matching the requested unit of time."""
        return "Access per " + self.kwargs["unit_time"].title()

    def _validated_unit_time(self) -> str:
        """Return the requested unit of time after checking it is supported.

        Raises:
            BadRequest: if ``unit_time`` is not one of ``UNIT_TIMES``.
        """
        unit_time = self.kwargs["unit_time"]
        if unit_time not in self.UNIT_TIMES:
            raise BadRequest("unit time must be one of day, week, month, or year")
        return unit_time

    def get_table_kwargs(self):
        """Return table kwargs that add a leading ``unit_time`` column.

        Raises:
            BadRequest: if the requested unit of time is not supported.
        """
        unit_time = self._validated_unit_time()

        if unit_time == "week":
            # A week is shown as "<start> - <start + 7 days>".
            unit_time_column = tables.TemplateColumn(
                verbose_name=unit_time.title(),
                template_code=(
                    "{{ value|date|default:default }} - "
                    "{{ value|add:one_week|date|default:default }}"
                ),
                extra_context={"one_week": datetime.timedelta(weeks=1)},
            )
        else:
            unit_time_column = tables.DateColumn(
                self._DATE_FORMATS[unit_time], verbose_name=unit_time.title()
            )

        return {
            "sequence": ("unit_time", "..."),
            "extra_columns": (("unit_time", unit_time_column),),
        }

    def get_table_data(self):
        """Return per-unit rows with member/access counts and their deltas.

        Only events whose type name starts with ``GRANTED_ACCESS`` are counted.
        Each ``*_delta`` is ``count / next_row_count * 100 - 100``, where the
        next row is taken in ``-unit_time`` order.

        Raises:
            BadRequest: if the requested unit of time is not supported.
        """
        unit_time = self._validated_unit_time()

        granted_event_types = [
            t for t in HIDEvent.EventType if t.name.startswith("GRANTED_ACCESS")
        ]

        return (
            super()
            .get_table_data()
            .filter(event_type__in=granted_event_types)
            .with_member_id()
            .values(unit_time=Trunc("timestamp", unit_time))
            .annotate(
                members=Count("member_id", distinct=True),
                members_delta=(
                    F("members")
                    / Window(
                        Lead("members"),
                        order_by="-unit_time",
                        output_field=FloatField(),
                    )
                    * 100
                    - 100
                ),
                access_count=Count("cardholder_id"),
                access_count_delta=(
                    F("access_count")
                    / Window(
                        Lead("access_count"),
                        order_by="-unit_time",
                        output_field=FloatField(),
                    )
                    * 100
                    - 100
                ),
            )
            .order_by("-unit_time")
        )
@register_report
class DeniedAccess(BaseAccessReport):
    """Report of events whose type name starts with ``DENIED_ACCESS``."""

    _report_name = "Denied Access"
    table_class = DeniedAccessTable

    def get_table_data(self):
        """Return the denied-access events, passed through
        ``with_decoded_card_number()``.
        """
        denied = [
            event_type
            for event_type in HIDEvent.EventType
            if event_type.name.startswith("DENIED_ACCESS")
        ]
        events = super().get_table_data()
        events = events.filter(event_type__in=denied)
        return events.with_decoded_card_number()
@register_report
class MostActiveMembers(BaseAccessReport):
    """Report of event counts per member, highest count first."""

    _report_name = "Most Active Members"
    table_class = MostActiveMembersTable

    def get_table_data(self):
        """Return one row per ``member_id`` with ``access_count`` and ``name``.

        Rows without a ``member_id`` are excluded; ``name`` concatenates the
        distinct "forename surname" values seen for the member.
        """
        full_name = ConcatWS("forename", "surname", separator=" ")

        # Group by member first, then aggregate on top of that grouping.
        per_member = (
            super()
            .get_table_data()
            .with_member_id()
            .filter(member_id__isnull=False)
            .values("member_id")
        )
        counted = per_member.annotate(
            access_count=Count("member_id"),
            name=GroupConcat(full_name, distinct=True),
        )
        return counted.order_by("-access_count")
@register_report
class DetailByDay(BaseAccessReport):
    """Report of event counts per member per calendar date, newest date first."""

    _report_name = "Detail by Day"
    table_class = DetailByDayTable

    def get_table_data(self):
        """Return one row per (date, ``member_id``) with ``access_count`` and ``name``.

        Rows without a ``member_id`` are excluded; ``name`` concatenates the
        distinct "forename surname" values seen for the member.
        """
        full_name = ConcatWS("forename", "surname", separator=" ")

        # Group by date and member, then drop rows that have no member.
        per_day_and_member = (
            super()
            .get_table_data()
            .with_member_id()
            .values("timestamp__date", "member_id")
            .filter(member_id__isnull=False)
        )
        counted = per_day_and_member.annotate(
            access_count=Count("member_id"),
            name=GroupConcat(full_name, distinct=True),
        )
        return counted.order_by("-timestamp__date")
@register_report
class BusiestDayOfWeek(BaseAccessReport):
    """Report of event and distinct-member counts per day of the week."""

    _report_name = "Busiest Day of the Week"
    table_pagination = False
    table_class = BusiestDayOfWeekTable

    def get_table_data(self):
        """Return one row per ``timestamp__week_day`` with ``events`` and ``members``."""
        by_week_day = (
            super().get_table_data().with_member_id().values("timestamp__week_day")
        )
        return by_week_day.annotate(
            events=Count("timestamp"),
            members=Count("member_id", distinct=True),
        )
@register_report
class BusiestTimeOfDay(BaseAccessReport):
    """Report of event and distinct-member counts per hour of the day."""

    _report_name = "Busiest Time of Day"
    table_pagination = False
    table_class = BusiestTimeOfDayTable

    def get_table_data(self):
        """Return one row per ``timestamp__hour`` with ``events`` and ``members``."""
        by_hour = (
            super().get_table_data().with_member_id().values("timestamp__hour")
        )
        return by_hour.annotate(
            events=Count("timestamp"),
            members=Count("member_id", distinct=True),
        )