cmsmanage/doorcontrol/views.py

283 lines
8.7 KiB
Python

import calendar
import datetime
from django.contrib.auth.mixins import PermissionRequiredMixin
from django.core.exceptions import BadRequest
from django.core.paginator import Page
from django.db.models import Count, F, FloatField, Window
from django.db.models.functions import Lead, Trunc
from django.urls import path, reverse_lazy
from django.utils import dateparse
from django.utils.formats import date_format
from django.utils.text import slugify
from django.utils.timezone import localtime
from django.views.generic.list import ListView
from .models import HIDEvent
# Report view classes collected by ``register_report``, in registration order.
REPORTS = []


def register_report(cls: "type[BaseAccessReport]") -> "type[BaseAccessReport]":
    """Class decorator that records a report view in ``REPORTS``.

    Args:
        cls: The report view class being decorated.

    Returns:
        ``cls`` itself, unchanged, so the decorated name still refers to the
        original class.
    """
    REPORTS.append(cls)
    return cls
class BaseAccessReport(PermissionRequiredMixin, ListView):
    """Common base for the paginated door-access report pages.

    Requires the ``doorcontrol.view_hidevent`` permission and lists
    ``HIDEvent`` rows restricted to a timestamp range taken from the query
    string.  Concrete reports set ``_report_name`` and override
    ``get_queryset``.

    Query-string parameters read here:

    * ``timestamp__gte`` / ``timestamp__lte`` -- bounds of the time range.
    * ``items_per_page`` -- page size override.
    * ``page`` -- removed from the ``query_params`` context entry.

    Malformed ``timestamp__*`` or ``items_per_page`` values raise
    ``BadRequest`` instead of surfacing as an unhandled error.
    """

    model = HIDEvent
    permission_required = "doorcontrol.view_hidevent"
    paginate_by = 20
    context_object_name = "object_list"
    template_name = "doorcontrol/access_report.dj.html"

    # Human-readable report title; its slug is used for the URL path and name.
    _report_name = None

    @classmethod
    def _report_types(cls):
        """Yield ``[title, url]`` entries describing the pages of this report."""
        yield [
            cls._report_name,
            reverse_lazy("doorcontrol:" + slugify(cls._report_name)),
        ]

    @classmethod
    def _urlpattern(cls):
        """Return the URL pattern that routes ``reports/<slug>`` to this view."""
        slug = slugify(cls._report_name)
        return path(f"reports/{slug}", cls.as_view(), name=slug)

    def _selected_report(self):
        """Return the title of the report currently being displayed."""
        return self._report_name

    def _parse_timestamp_param(self, name):
        """Parse the query parameter ``name`` as a datetime.

        Returns:
            The parsed datetime, or ``None`` when the parameter is missing or
            empty.

        Raises:
            BadRequest: If the value is present but is not a valid datetime.
        """
        raw = self.request.GET.get(name)
        if not raw:
            return None
        try:
            parsed = dateparse.parse_datetime(raw)
        except ValueError:
            # Well-formed text naming an impossible date or time.
            parsed = None
        if parsed is None:
            raise BadRequest(f"{name} must be a valid date and time")
        return parsed

    def _get_timestamp_range(self):
        """Return the ``(start, end)`` datetimes the report is limited to.

        ``start`` falls back to 2019-01-01 and ``end`` to the current local
        time when the corresponding query parameter is missing or empty.

        Raises:
            BadRequest: If either parameter is present but unparseable.
        """
        timestamp__gte = self._parse_timestamp_param("timestamp__gte")
        if timestamp__gte is None:
            timestamp__gte = dateparse.parse_datetime("2019-01-01")

        timestamp__lte = self._parse_timestamp_param("timestamp__lte")
        if timestamp__lte is None:
            timestamp__lte = localtime()

        return timestamp__gte, timestamp__lte

    def get_paginate_by(self, queryset) -> int:
        """Return the page size, honouring the ``items_per_page`` parameter.

        A missing or empty parameter falls back to the class default.

        Raises:
            BadRequest: If ``items_per_page`` is not a positive integer.
        """
        raw = self.request.GET.get("items_per_page")
        if not raw:
            return super().get_paginate_by(queryset)
        try:
            per_page = int(raw)
        except ValueError:
            raise BadRequest("items_per_page must be a positive integer") from None
        # Zero would switch pagination off, leaving no page object for
        # get_context_data to read; negative sizes are meaningless.
        if per_page < 1:
            raise BadRequest("items_per_page must be a positive integer")
        return per_page

    def get_queryset(self):
        """Return the events inside the requested range, with doors joined."""
        return (
            super()
            .get_queryset()
            .filter(timestamp__range=self._get_timestamp_range())
            .select_related("door")
        )

    def get_context_data(self, **kwargs):
        """Extend the list context with report navigation and filter state."""
        context = super().get_context_data(**kwargs)
        context["report_types"] = [
            rt for report in REPORTS for rt in report._report_types()
        ]

        page: Page = context["page_obj"]
        context["paginator_range"] = page.paginator.get_elided_page_range(page.number)
        context["selected_report"] = self._selected_report()
        context["items_per_page"] = self.get_paginate_by(None)

        timestamp__gte, timestamp__lte = self._get_timestamp_range()
        context["timestamp__gte"] = timestamp__gte
        context["timestamp__lte"] = timestamp__lte

        # Everything except the page number, so it can be re-attached to
        # links that supply their own ``page`` value.
        query_params = self.request.GET.copy()
        query_params.pop("page", None)
        context["query_params"] = query_params.urlencode()

        return context
@register_report
class AccessPerUnitTime(BaseAccessReport):
    """Granted-access activity bucketed per day, week, month or year.

    The bucket size comes from the ``unit_time`` URL keyword argument.  Each
    row reports the number of distinct cardholders and the number of events
    in the bucket, together with the percentage change relative to the
    chronologically preceding bucket.
    """

    UNIT_TIMES = ["day", "week", "month", "year"]

    @classmethod
    def _report_types(cls):
        """Yield one ``(title, url)`` entry per supported unit of time."""
        for unit_time in cls.UNIT_TIMES:
            yield (
                "Access per " + unit_time.title(),
                reverse_lazy("doorcontrol:access-per-unit-time", args=[unit_time]),
            )

    @classmethod
    def _urlpattern(cls):
        """Return the single URL pattern serving every unit of time."""
        return path(
            "reports/access-per-<unit_time>",
            cls.as_view(),
            name="access-per-unit-time",
        )

    def _selected_report(self) -> str:
        """Return the title matching the unit of time in the URL."""
        return "Access per " + self.kwargs["unit_time"].title()

    def _format_date(self, date: datetime.datetime) -> str:
        """Format the start of a bucket as a label for the current unit.

        Args:
            date: The truncated timestamp marking the start of the bucket.

        Returns:
            A single formatted date for day/month/year buckets, or a
            ``"start - end"`` range for week buckets.
        """
        unit_time = self.kwargs["unit_time"]
        if unit_time == "day":
            return date_format(date, "DATE_FORMAT")
        elif unit_time == "week":
            # The bucket covers seven days starting at ``date``, so its last
            # day is six days later; adding a whole week would name the first
            # day of the next bucket and make adjacent labels overlap.
            week_end = date + datetime.timedelta(days=6)
            return (
                date_format(date, "DATE_FORMAT")
                + " - "
                + date_format(week_end, "DATE_FORMAT")
            )
        elif unit_time == "month":
            return date_format(date, "N Y")
        elif unit_time == "year":
            return date_format(date, "Y")

    @staticmethod
    def _percent_change(field: str):
        """Build the expression for the percentage change of ``field``.

        Rows are ordered newest first, so ``Lead`` reads the value from the
        chronologically preceding bucket.  The oldest bucket has no such row
        and its result is NULL.
        """
        # NOTE(review): both operands are integer counts; whether the
        # division keeps its fractional part depends on the database
        # backend -- confirm against the one used in production.
        return (
            F(field)
            / Window(
                Lead(field),
                order_by="-unit_time",
                output_field=FloatField(),
            )
            * 100
            - 100
        )

    @staticmethod
    def _format_percent(value) -> str:
        """Render ``value`` with two decimals and a ``%`` sign; ``None`` -> ``""``."""
        if value is None:
            return ""
        return f"{value:.2f}%"

    def get_queryset(self):
        """Return one display row (a dict) per time bucket, newest first.

        Raises:
            BadRequest: If the ``unit_time`` URL argument is not supported.
        """
        unit_time = self.kwargs["unit_time"]
        if unit_time not in self.UNIT_TIMES:
            raise BadRequest("unit time must be one of day, week, month, or year")

        granted_event_types = [
            t for t in HIDEvent.EventType if t.name.startswith("GRANTED_ACCESS")
        ]

        events = (
            super()
            .get_queryset()
            .filter(event_type__in=granted_event_types)
            .values(unit_time=Trunc("timestamp", unit_time))
            .annotate(
                members=Count("cardholder_id", distinct=True),
                members_delta=self._percent_change("members"),
                access_count=Count("cardholder_id"),
                access_count_delta=self._percent_change("access_count"),
            )
            .order_by("-unit_time")
        )

        return [
            {
                # The first column is keyed by the unit itself ("day", ...).
                unit_time: self._format_date(event["unit_time"]),
                "members": event["members"],
                "Δ members": self._format_percent(event["members_delta"]),
                "access count": event["access_count"],
                "Δ access count": self._format_percent(event["access_count_delta"]),
            }
            for event in events
        ]
@register_report
class DeniedAccess(BaseAccessReport):
    """Report listing the events whose type name starts with DENIED_ACCESS."""

    _report_name = "Denied Access"

    def get_queryset(self):
        """Return one display row (a dict) per denied-access event."""
        denied_types = [
            event_type
            for event_type in HIDEvent.EventType
            if event_type.name.startswith("DENIED_ACCESS")
        ]
        denied_events = (
            super()
            .get_queryset()
            .filter(event_type__in=denied_types)
            .with_decoded_card_number()
        )

        rows = []
        for event in denied_events:
            row = {
                "timestamp": event.timestamp,
                "door name": event.door.name,
                "event type": HIDEvent.EventType(event.event_type).label,
            }
            # Either half of the name may be missing; join whatever exists.
            name_parts = [
                part for part in (event.forename, event.surname) if part is not None
            ]
            row["name"] = " ".join(name_parts)
            if event.raw_card_number is None:
                row["raw card number"] = ""
            else:
                row["raw card number"] = event.raw_card_number
            row["card number"] = event.decoded_card_number() or ""
            rows.append(row)
        return rows
@register_report
class MostActiveMembers(BaseAccessReport):
    """Report ranking cardholders by their number of events in the range."""

    _report_name = "Most Active Members"

    def get_queryset(self):
        """Return one display row (a dict) per cardholder, busiest first."""
        grouped = (
            super()
            .get_queryset()
            .values("cardholder_id", "forename", "surname")
            # Clear any existing ordering before the aggregate is added.
            .order_by()
        )
        ranked = grouped.annotate(access_count=Count("cardholder_id")).order_by(
            "-access_count"
        )

        rows = []
        for entry in ranked:
            name_parts = [
                part
                for part in (entry["forename"], entry["surname"])
                if part is not None
            ]
            rows.append(
                {
                    "cardholder id": entry["cardholder_id"],
                    "name": " ".join(name_parts),
                    "access count": entry["access_count"],
                }
            )
        return rows
@register_report
class BusiestDayOfWeek(BaseAccessReport):
    """Report counting events per day of the week."""

    _report_name = "Busiest Day of the Week"

    def get_queryset(self):
        """Return one display row (a dict) per weekday that has events."""
        per_weekday = (
            super()
            .get_queryset()
            .values("timestamp__week_day")
            .annotate(events=Count("timestamp"))
        )

        rows = []
        for entry in per_weekday:
            # Shift the database weekday number onto calendar.day_name's
            # zero-based index, wrapping around at the week boundary.
            day_index = (entry["timestamp__week_day"] - 2) % 7
            rows.append(
                {
                    "week day": calendar.day_name[day_index],
                    "events": entry["events"],
                }
            )
        return rows
@register_report
class BusiestTimeOfDay(BaseAccessReport):
    """Report counting events per hour of the day."""

    _report_name = "Busiest Time of Day"
    # Page size of 24 rows, replacing the base class default.
    paginate_by = 24

    def get_queryset(self):
        """Return one display row (a dict) per hour that has events."""
        per_hour = (
            super()
            .get_queryset()
            .values("timestamp__hour")
            .annotate(events=Count("timestamp"))
        )

        rows = []
        for entry in per_hour:
            hour = entry["timestamp__hour"]
            rows.append({"hour": f"{hour}:00", "events": entry["events"]})
        return rows