from typing import Optional, Any, Type, cast from django import forms from django.core import mail from django.contrib import admin, messages from django.db.models import Value from django.db.models.query import QuerySet from django.db.models.functions import Now, Concat, LPad from django.http import HttpRequest from .models import ( AbstractAudit, CmsRedRiverVeteransScholarship, Department, CertificationDefinition, Certification, CertificationAudit, CertificationVersion, CertificationVersionAnnotated, InstructorOrVendor, SpecialProgram, Waiver, WaiverAudit, ) from .forms import CertificationForm from .certification_emails import all_certification_emails class AlwaysChangedModelForm(forms.models.ModelForm): """By always returning true even unchanged inlines will get validated and saved.""" def has_changed(self) -> bool: return True class AbstractAuditInline(admin.TabularInline): extra = 0 form = AlwaysChangedModelForm def get_formset( self, request: HttpRequest, obj: Optional[AbstractAudit] = None, **kwargs: Any ) -> Type[ "forms.models.BaseInlineFormSet[AbstractAudit, Any, forms.models.ModelForm[Any]]" ]: formset = super().get_formset(request, obj, **kwargs) formset.form.base_fields["user"].initial = request.user return formset @admin.register(Department) class DepartmentAdmin(admin.ModelAdmin): search_fields = ["name"] list_display = [ "name", "parent", "has_mailing_list", "shop_lead_flag", "list_reply_to_address", ] class CertificationVersionInline(admin.TabularInline): model = CertificationVersion extra = 1 readonly_fields = ( "semantic_version", "is_latest", "is_current", ) @admin.display(description="Latest", boolean=True) def is_latest(self, obj: CertificationVersionAnnotated) -> bool: return obj.is_latest @admin.display(description="Current", boolean=True) def is_current(self, obj: CertificationVersionAnnotated) -> bool: return obj.is_current @admin.register(CertificationDefinition) class CertificationDefinitionAdmin(admin.ModelAdmin): search_fields = ["name"] list_display = [ "name", "department", "latest_semantic_version", ] list_filter = ["department"] inlines = [CertificationVersionInline] @admin.display(description="Latest Version") def latest_semantic_version(self, obj: CertificationDefinition) -> str: return str(obj.latest_version().semantic_version()) class CertificationAuditInline(AbstractAuditInline): model = CertificationAudit @admin.register(Certification) class CertificationAdmin(admin.ModelAdmin): form = CertificationForm search_fields = [ "name", "certification_version__definition__name", "certification_version__definition__department__name", ] autocomplete_fields = ["member"] exclude = ["shop_lead_notified"] inlines = [CertificationAuditInline] def get_queryset(self, request: HttpRequest) -> QuerySet[Certification]: qs = super().get_queryset(request) return qs.prefetch_related("certification_version__definition__department") @admin.display( description="Certification Name", ordering="certification_version__definition__name", ) def name(self, obj: Certification) -> str: return obj.certification_version.definition.name @admin.display( description="Certification Version", ordering=( Concat( LPad("certification_version__major", 4, Value("0")), LPad("certification_version__minor", 4, Value("0")), LPad("certification_version__patch", 4, Value("0")), "certification_version__prerelease", "certification_version__approval_date", ) ), ) def certification_semantic_version(self, obj: Certification) -> str: return str(obj.certification_version.semantic_version()) @admin.display(description="Current", boolean=True) def is_current(self, obj: Certification) -> bool: return cast(CertificationVersionAnnotated, obj.certification_version).is_current @admin.display( description="Department", ordering="certification_version__definition__department", ) def certification_department(self, obj: Certification) -> Department: return obj.certification_version.definition.department @admin.display(description="Latest Audit") def latest_audit(self, obj: Certification) -> CertificationAudit: return obj.audits.latest() list_display = [ "name", "name", "certification_semantic_version", "is_current", "certification_department", "date", "shop_lead_notified", "certified_by", "latest_audit", ] list_display_links = [ "name", "name", ] list_filter = [ "certification_version__definition__department", ("shop_lead_notified", admin.EmptyFieldListFilter), ("audits", admin.EmptyFieldListFilter), ] actions = ["send_notifications"] @admin.action( description="Notify Shop Leads and Members of selected certifications" ) def send_notifications( self, request: HttpRequest, queryset: QuerySet[Certification] ) -> None: try: emails = list(all_certification_emails(queryset)) with mail.get_connection() as conn: conn.send_messages(emails) for cert in queryset: cert.shop_lead_notified = Now() cert.save() self.message_user( request, f"{len(emails)} notifications sent for {len(queryset)} certifications", messages.SUCCESS, ) except Exception as e: self.message_user( request, f"Failed to send notifications! {e}", messages.ERROR, ) raise @admin.register(InstructorOrVendor) class InstructorOrVendorAdmin(admin.ModelAdmin): search_fields = ["name"] list_display = [ "name", "instructor_agreement_date", "w9_date", "phone", "email_address", ] @admin.register(SpecialProgram) class SpecialProgramAdmin(admin.ModelAdmin): search_fields = ["program_name"] list_display = [ "program_name", "discount_percent", "discount_code", "membership_code", "start_date", "end_date", "program_amount", "program_status", ] class WaiverAuditInline(AbstractAuditInline): model = WaiverAudit @admin.register(Waiver) class WaiverAdmin(admin.ModelAdmin): search_fields = ["name"] list_display = [ "name", "date", "emergency_contact_name", "emergency_contact_number", "waiver_version", "guardian_name", "guardian_relation", "guardian_date", "latest_audit", ] list_filter = [ "waiver_version", ("audits", admin.EmptyFieldListFilter), ] inlines = [WaiverAuditInline] @admin.display(description="Latest Audit") def latest_audit(self, obj: Waiver) -> WaiverAudit: return obj.audits.latest() admin.site.register(CmsRedRiverVeteransScholarship)