import datetime from django.contrib.auth.mixins import PermissionRequiredMixin from django.core.paginator import Page from django.core.exceptions import BadRequest from django.db.models import Count from django.db.models.functions import Trunc from django.urls import reverse_lazy from django.utils import dateparse from django.utils.formats import date_format from django.utils.text import slugify from django.utils.timezone import localtime from django.views.generic.list import ListView from .models import HIDEvent REPORT_TYPES = [] def register_report(cls: "BaseAccessReport"): REPORT_TYPES.extend(cls._report_types()) return cls class BaseAccessReport(PermissionRequiredMixin, ListView): model = HIDEvent permission_required = "doorcontrol.view_hidevent" paginate_by = 20 context_object_name = "object_list" template_name = "doorcontrol/access_report.dj.html" _report_name = None @classmethod def _report_types(cls): yield [ cls._report_name, reverse_lazy("doorcontrol:" + slugify(cls._report_name)), ] def _selected_report(self): return self._report_name def _get_timestamp_range(self): timestamp__gte = dateparse.parse_datetime( self.request.GET.get("timestamp__gte") or "2019-01-01" ) timestamp__lte = self.request.GET.get("timestamp__lte") if timestamp__lte: timestamp__lte = dateparse.parse_datetime(timestamp__lte) else: timestamp__lte = localtime() return timestamp__gte, timestamp__lte def get_paginate_by(self, queryset) -> int: if "items_per_page" in self.request.GET: return int(self.request.GET.get("items_per_page")) return super().get_paginate_by(queryset) def get_queryset(self): return ( super().get_queryset().filter(timestamp__range=self._get_timestamp_range()) ) def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) context["report_types"] = REPORT_TYPES page: Page = context["page_obj"] context["paginator_range"] = page.paginator.get_elided_page_range(page.number) context["selected_report"] = self._selected_report() context["items_per_page"] = self.get_paginate_by(None) timestamp__gte, timestamp__lte = self._get_timestamp_range() context["timestamp__gte"] = timestamp__gte context["timestamp__lte"] = timestamp__lte query_params = self.request.GET.copy() if "page" in query_params: query_params.pop("page") context["query_params"] = query_params.urlencode() return context @register_report class AccessPerUnitTime(BaseAccessReport): UNIT_TIMES = ["day", "week", "month", "year"] @classmethod def _report_types(cls): for unit_time in cls.UNIT_TIMES: yield ( "Access per " + unit_time.title(), reverse_lazy("doorcontrol:access-per-unit-time", args=[unit_time]), ) def _selected_report(self) -> str: return "Access per " + self.kwargs["unit_time"].title() def _format_date(self, date: datetime.datetime) -> str: unit_time = self.kwargs["unit_time"] if unit_time == "day": return date_format(date, "DATE_FORMAT") elif unit_time == "week": return ( date_format(date, "DATE_FORMAT") + " - " + date_format(date + datetime.timedelta(weeks=1), "DATE_FORMAT") ) elif unit_time == "month": return date_format(date, "N Y") elif unit_time == "year": return date_format(date, "Y") def get_queryset(self): unit_time = self.kwargs["unit_time"] if unit_time not in self.UNIT_TIMES: raise BadRequest("unit time must be one of day, week, month, or year") granted_event_types = [ t for t in HIDEvent.EventType if t.name.startswith("GRANTED_ACCESS") ] events = ( super() .get_queryset() .filter(event_type__in=granted_event_types) .values(unit_time=Trunc("timestamp", unit_time)) .annotate( members=Count("cardholder_id", distinct=True), access_count=Count("cardholder_id"), ) .order_by("-unit_time") .values("unit_time", "members", "access_count") ) return [ { unit_time: self._format_date(event["unit_time"]), "members": event["members"], "access count": event["access_count"], } for event in events ] @register_report class DeniedAccess(BaseAccessReport): _report_name = "Denied Access" def get_queryset(self): denied_event_types = [ t for t in HIDEvent.EventType if t.name.startswith("DENIED_ACCESS") ] events = ( super() .get_queryset() .filter(event_type__in=denied_event_types) .with_decoded_card_number() ) return [ { "timestamp": event.timestamp, "door name": event.door_name, "event type": HIDEvent.EventType(event.event_type).label, "name": " ".join( (n for n in [event.forename, event.surname] if n is not None) ), "raw card number": ( event.raw_card_number if event.raw_card_number is not None else "" ), "card number": event.decoded_card_number() or "", } for event in events ] @register_report class MostActiveMembers(BaseAccessReport): _report_name = "Most Active Members" def get_queryset(self): counts = ( super() .get_queryset() .values("cardholder_id", "forename", "surname") .order_by() .annotate(access_count=Count("cardholder_id")) .order_by("-access_count") ) return [ { "cardholder id": count["cardholder_id"], "name": " ".join( (n for n in [count["forename"], count["surname"]] if n is not None) ), "access count": count["access_count"], } for count in counts ]