204 lines
6.4 KiB
Python
204 lines
6.4 KiB
Python
import datetime
|
|
|
|
from django.contrib.auth.mixins import PermissionRequiredMixin
|
|
from django.core.paginator import Page
|
|
from django.core.exceptions import BadRequest
|
|
from django.db.models import Count
|
|
from django.db.models.functions import Trunc
|
|
from django.urls import reverse_lazy
|
|
from django.utils import dateparse
|
|
from django.utils.formats import date_format
|
|
from django.utils.text import slugify
|
|
from django.utils.timezone import localtime
|
|
from django.views.generic.list import ListView
|
|
|
|
from .models import HIDEvent
|
|
|
|
|
|
REPORT_TYPES = []
|
|
|
|
|
|
def register_report(cls: "BaseAccessReport"):
|
|
REPORT_TYPES.extend(cls._report_types())
|
|
return cls
|
|
|
|
|
|
class BaseAccessReport(PermissionRequiredMixin, ListView):
|
|
model = HIDEvent
|
|
permission_required = "doorcontrol.view_hidevent"
|
|
paginate_by = 20
|
|
context_object_name = "object_list"
|
|
template_name = "doorcontrol/access_report.dj.html"
|
|
|
|
_report_name = None
|
|
|
|
@classmethod
|
|
def _report_types(cls):
|
|
yield [
|
|
cls._report_name,
|
|
reverse_lazy("doorcontrol:" + slugify(cls._report_name)),
|
|
]
|
|
|
|
def _selected_report(self):
|
|
return self._report_name
|
|
|
|
def _get_timestamp_range(self):
|
|
timestamp__gte = dateparse.parse_datetime(
|
|
self.request.GET.get("timestamp__gte") or "2019-01-01"
|
|
)
|
|
timestamp__lte = self.request.GET.get("timestamp__lte")
|
|
if timestamp__lte:
|
|
timestamp__lte = dateparse.parse_datetime(timestamp__lte)
|
|
else:
|
|
timestamp__lte = localtime()
|
|
|
|
return timestamp__gte, timestamp__lte
|
|
|
|
def get_paginate_by(self, queryset) -> int:
|
|
if "items_per_page" in self.request.GET:
|
|
return int(self.request.GET.get("items_per_page"))
|
|
return super().get_paginate_by(queryset)
|
|
|
|
def get_queryset(self):
|
|
return (
|
|
super().get_queryset().filter(timestamp__range=self._get_timestamp_range())
|
|
)
|
|
|
|
def get_context_data(self, **kwargs):
|
|
context = super().get_context_data(**kwargs)
|
|
context["report_types"] = REPORT_TYPES
|
|
|
|
page: Page = context["page_obj"]
|
|
context["paginator_range"] = page.paginator.get_elided_page_range(page.number)
|
|
context["selected_report"] = self._selected_report()
|
|
context["items_per_page"] = self.get_paginate_by(None)
|
|
|
|
timestamp__gte, timestamp__lte = self._get_timestamp_range()
|
|
context["timestamp__gte"] = timestamp__gte
|
|
context["timestamp__lte"] = timestamp__lte
|
|
|
|
query_params = self.request.GET.copy()
|
|
if "page" in query_params:
|
|
query_params.pop("page")
|
|
context["query_params"] = query_params.urlencode()
|
|
|
|
return context
|
|
|
|
|
|
@register_report
|
|
class AccessPerUnitTime(BaseAccessReport):
|
|
UNIT_TIMES = ["day", "week", "month", "year"]
|
|
|
|
@classmethod
|
|
def _report_types(cls):
|
|
for unit_time in cls.UNIT_TIMES:
|
|
yield (
|
|
"Access per " + unit_time.title(),
|
|
reverse_lazy("doorcontrol:access-per-unit-time", args=[unit_time]),
|
|
)
|
|
|
|
def _selected_report(self) -> str:
|
|
return "Access per " + self.kwargs["unit_time"].title()
|
|
|
|
def _format_date(self, date: datetime.datetime) -> str:
|
|
unit_time = self.kwargs["unit_time"]
|
|
if unit_time == "day":
|
|
return date_format(date, "DATE_FORMAT")
|
|
elif unit_time == "week":
|
|
return (
|
|
date_format(date, "DATE_FORMAT")
|
|
+ " - "
|
|
+ date_format(date + datetime.timedelta(weeks=1), "DATE_FORMAT")
|
|
)
|
|
elif unit_time == "month":
|
|
return date_format(date, "N Y")
|
|
elif unit_time == "year":
|
|
return date_format(date, "Y")
|
|
|
|
def get_queryset(self):
|
|
unit_time = self.kwargs["unit_time"]
|
|
if unit_time not in self.UNIT_TIMES:
|
|
raise BadRequest("unit time must be one of day, week, month, or year")
|
|
|
|
granted_event_types = [
|
|
t for t in HIDEvent.EventType if t.name.startswith("GRANTED_ACCESS")
|
|
]
|
|
events = (
|
|
super()
|
|
.get_queryset()
|
|
.filter(event_type__in=granted_event_types)
|
|
.values(unit_time=Trunc("timestamp", unit_time))
|
|
.annotate(
|
|
members=Count("cardholder_id", distinct=True),
|
|
access_count=Count("cardholder_id"),
|
|
)
|
|
.order_by("-unit_time")
|
|
.values("unit_time", "members", "access_count")
|
|
)
|
|
return [
|
|
{
|
|
unit_time: self._format_date(event["unit_time"]),
|
|
"members": event["members"],
|
|
"access count": event["access_count"],
|
|
}
|
|
for event in events
|
|
]
|
|
|
|
|
|
@register_report
|
|
class DeniedAccess(BaseAccessReport):
|
|
_report_name = "Denied Access"
|
|
|
|
def get_queryset(self):
|
|
denied_event_types = [
|
|
t for t in HIDEvent.EventType if t.name.startswith("DENIED_ACCESS")
|
|
]
|
|
events = (
|
|
super()
|
|
.get_queryset()
|
|
.filter(event_type__in=denied_event_types)
|
|
.with_decoded_card_number()
|
|
)
|
|
|
|
return [
|
|
{
|
|
"timestamp": event.timestamp,
|
|
"door name": event.door_name,
|
|
"event type": HIDEvent.EventType(event.event_type).label,
|
|
"name": " ".join(
|
|
(n for n in [event.forename, event.surname] if n is not None)
|
|
),
|
|
"raw card number": (
|
|
event.raw_card_number if event.raw_card_number is not None else ""
|
|
),
|
|
"card number": event.decoded_card_number() or "",
|
|
}
|
|
for event in events
|
|
]
|
|
|
|
|
|
@register_report
|
|
class MostActiveMembers(BaseAccessReport):
|
|
_report_name = "Most Active Members"
|
|
|
|
def get_queryset(self):
|
|
counts = (
|
|
super()
|
|
.get_queryset()
|
|
.values("cardholder_id", "forename", "surname")
|
|
.order_by()
|
|
.annotate(access_count=Count("cardholder_id"))
|
|
.order_by("-access_count")
|
|
)
|
|
|
|
return [
|
|
{
|
|
"cardholder id": count["cardholder_id"],
|
|
"name": " ".join(
|
|
(n for n in [count["forename"], count["surname"]] if n is not None)
|
|
),
|
|
"access count": count["access_count"],
|
|
}
|
|
for count in counts
|
|
]
|