cmsmanage/membershipworks/models.py
Adam Goldsmith 79651731b1
All checks were successful
Ruff / ruff (push) Successful in 29s
Test / test (push) Successful in 4m53s
membershipworks: Fix missing @property on EventExt.ready_for_invoice
2024-05-03 12:37:30 -04:00

788 lines
28 KiB
Python

import uuid
from datetime import datetime
from typing import Optional
import django.core.mail.message
from django.conf import settings
from django.contrib.auth.models import AbstractBaseUser
from django.db import models
from django.db.models import (
Case,
Count,
Exists,
ExpressionWrapper,
F,
Func,
OuterRef,
Q,
Subquery,
Sum,
Value,
When,
)
from django.db.models.functions import Coalesce
from django.urls import reverse
from django.utils import timezone
import nh3
from django_db_views.db_view import DBView
class BaseModel(models.Model):
    """Abstract base for models that are built from API payload dicts.

    Subclasses tune the import through three class attributes:

    * ``_api_names_override`` maps a model attribute name to the payload key
      to read for it, for fields whose column name is not the payload key.
    * ``_date_fields`` maps a payload key to a ``strptime`` format string,
      or to ``None`` when the value is a Unix timestamp.
    * ``_allowed_missing_fields`` lists payload keys that may be absent;
      absent keys are filled in with ``None``.
    """

    _api_names_override = {}
    _date_fields = {}
    _allowed_missing_fields = []

    class Meta:
        abstract = True

    @classmethod
    def _remap_headers(cls, data):
        """Yield ``(attribute name, value)`` for every column-backed field.

        The payload key for a field is its database column name unless
        ``_api_names_override`` has an entry for the attribute name.
        Raises ``KeyError`` if the payload lacks the resolved key.
        """
        overrides = cls._api_names_override
        for field in cls._meta.get_fields():
            # TODO: more robust filtering of fields that don't have a column
            has_column = (
                not field.auto_created
                and not field.many_to_many
                and field.concrete
                and not field.generated
            )
            if has_column:
                attname, column = field.get_attname_column()
                yield attname, data[overrides.get(attname, column)]

    @classmethod
    def from_api_dict(cls, data):
        """Build an unsaved instance of ``cls`` from a payload dict.

        The input dict is copied, not modified.
        """
        payload = data.copy()

        for key in cls._allowed_missing_fields:
            payload.setdefault(key, None)

        # parse date fields to datetime objects
        for key, fmt in cls._date_fields.items():
            raw = payload[key]
            if not raw:
                # convert empty string to None to make NULL in SQL
                payload[key] = None
            elif fmt is None:
                # can't use '%s' format string, have to use the special function
                payload[key] = datetime.fromtimestamp(
                    raw, tz=timezone.get_current_timezone()
                )
            else:
                payload[key] = datetime.strptime(str(raw), fmt)

        return cls(**dict(cls._remap_headers(payload)))
class Flag(BaseModel):
    """A named, typed flag that can be attached to members."""

    id = models.CharField(max_length=24, primary_key=True)
    name = models.TextField(null=True, blank=True)
    type = models.CharField(max_length=6)

    class Meta:
        db_table = "flag"
        ordering = ("name",)

    def __str__(self):
        """Render as ``name (type)``."""
        return f"{self.name} ({self.type})"
class MemberQuerySet(models.QuerySet):
    """QuerySet for ``Member`` with flag-based annotations."""

    # TODO: maybe rename to reflect EXISTS?
    @staticmethod
    def has_flag(flag_type: str, flag_name: str):
        """Return an ``Exists`` expression: the outer member has this flag."""
        matching_flags = Flag.objects.filter(
            type=flag_type, name=flag_name, members=OuterRef("pk")
        )
        return Exists(matching_flags)

    # TODO: it should be fairly easy to reduce the number of EXISTS by
    # merging the ORed flags
    def with_is_active(self):
        """Annotate each member with a boolean ``is_active``.

        A member is active when it carries one of the membership folders
        and none of the on-hold / former-member flags.
        """
        in_active_folder = self.has_flag("folder", "Members") | self.has_flag(
            "folder", "CMS Staff"
        )
        on_hold_or_former = (
            self.has_flag("label", "Account On Hold")
            | self.has_flag("level", "CMS Membership on hold")
            | self.has_flag("folder", "Former Members")
        )
        return self.annotate(is_active=in_active_folder & ~on_hold_or_former)
# TODO: is this still a temporal table?
class Member(BaseModel):
    """A member account; most columns keep the source's human-readable names."""

    objects = MemberQuerySet.as_manager()

    # --- identity and contact details ---
    uid = models.CharField(max_length=24, primary_key=True)
    year_of_birth = models.TextField(db_column="Year of Birth", null=True, blank=True)
    account_name = models.TextField(db_column="Account Name", null=True, blank=True)
    first_name = models.TextField(db_column="First Name", null=True, blank=True)
    last_name = models.TextField(db_column="Last Name", null=True, blank=True)
    phone = models.TextField(db_column="Phone", null=True, blank=True)
    email = models.TextField(db_column="Email", null=True, blank=True)
    volunteer_email = models.TextField(
        db_column="Volunteer Email", null=True, blank=True
    )
    address_street = models.TextField(
        db_column="Address (Street)", null=True, blank=True
    )
    address_city = models.TextField(db_column="Address (City)", null=True, blank=True)
    address_state_province = models.TextField(
        db_column="Address (State/Province)", null=True, blank=True
    )
    address_postal_code = models.TextField(
        db_column="Address (Postal Code)", null=True, blank=True
    )
    address_country = models.TextField(
        db_column="Address (Country)", null=True, blank=True
    )
    profile_description = models.TextField(
        db_column="Profile description", null=True, blank=True
    )
    website = models.TextField(db_column="Website", null=True, blank=True)
    fax = models.TextField(db_column="Fax", null=True, blank=True)
    contact_person = models.TextField(db_column="Contact Person", null=True, blank=True)
    password = models.TextField(db_column="Password", null=True, blank=True)
    position_relation = models.TextField(
        db_column="Position/relation", null=True, blank=True
    )
    parent_account_id = models.TextField(
        db_column="Parent Account ID", null=True, blank=True
    )

    # --- storage and access ---
    closet_storage = models.TextField(
        db_column="Closet Storage #", null=True, blank=True
    )
    storage_shelf = models.TextField(db_column="Storage Shelf #", null=True, blank=True)
    personal_studio_space = models.TextField(
        db_column="Personal Studio Space #", null=True, blank=True
    )
    access_permitted_shops_during_extended_hours = models.BooleanField(
        db_column="Access Permitted Shops During Extended Hours?"
    )
    access_front_door_and_studio_space_during_extended_hours = models.BooleanField(
        db_column="Access Front Door and Studio Space During Extended Hours?"
    )
    access_wood_shop = models.BooleanField(db_column="Access Wood Shop?")
    access_metal_shop = models.BooleanField(db_column="Access Metal Shop?")
    access_storage_closet = models.BooleanField(db_column="Access Storage Closet?")
    access_studio_space = models.BooleanField(db_column="Access Studio Space?")
    access_front_door = models.BooleanField(db_column="Access Front Door?")
    access_card_number = models.TextField(
        db_column="Access Card Number", null=True, blank=True
    )
    access_card_facility_code = models.TextField(
        db_column="Access Card Facility Code", null=True, blank=True
    )

    # --- billing and membership dates ---
    auto_billing_id = models.TextField(
        db_column="Auto Billing ID", null=True, blank=True
    )
    billing_method = models.TextField(db_column="Billing Method", null=True, blank=True)
    renewal_date = models.DateField(db_column="Renewal Date", null=True, blank=True)
    join_date = models.DateField(db_column="Join Date", null=True, blank=True)
    admin_note = models.TextField(db_column="Admin note", null=True, blank=True)

    # --- public profile and social links ---
    profile_gallery_image_url = models.TextField(
        db_column="Profile gallery image URL", null=True, blank=True
    )
    business_card_image_url = models.TextField(
        db_column="Business card image URL", null=True, blank=True
    )
    instagram = models.TextField(db_column="Instagram", null=True, blank=True)
    pinterest = models.TextField(db_column="Pinterest", null=True, blank=True)
    youtube = models.TextField(db_column="Youtube", null=True, blank=True)
    yelp = models.TextField(db_column="Yelp", null=True, blank=True)
    google = models.TextField(db_column="Google+", null=True, blank=True)
    bbb = models.TextField(db_column="BBB", null=True, blank=True)
    twitter = models.TextField(db_column="Twitter", null=True, blank=True)
    facebook = models.TextField(db_column="Facebook", null=True, blank=True)
    linked_in = models.TextField(db_column="LinkedIn", null=True, blank=True)
    do_not_show_street_address_in_profile = models.TextField(
        db_column="Do not show street address in profile", null=True, blank=True
    )
    do_not_list_in_directory = models.TextField(
        db_column="Do not list in directory", null=True, blank=True
    )

    # --- sign-up answers and paperwork ---
    how_did_you_hear = models.TextField(
        db_column="HowDidYouHear", null=True, blank=True
    )
    authorize_charge = models.TextField(
        db_column="authorizeCharge", null=True, blank=True
    )
    policy_agreement = models.TextField(
        db_column="policyAgreement", null=True, blank=True
    )
    waiver_form_signed_and_on_file_date = models.DateField(
        db_column="Waiver form signed and on file date.", null=True, blank=True
    )
    membership_agreement_signed_and_on_file_date = models.DateField(
        db_column="Membership Agreement signed and on file date.", null=True, blank=True
    )
    ip_address = models.TextField(db_column="IP Address", null=True, blank=True)
    audit_date = models.DateField(db_column="Audit Date", null=True, blank=True)
    agreement_version = models.TextField(
        db_column="Agreement Version", null=True, blank=True
    )
    paperwork_status = models.TextField(
        db_column="Paperwork status", null=True, blank=True
    )
    membership_agreement_dated = models.BooleanField(
        db_column="Membership agreement dated"
    )
    membership_agreement_acknowledgement_page_filled_out = models.BooleanField(
        db_column="Membership Agreement Acknowledgement Page Filled Out"
    )
    membership_agreement_signed = models.BooleanField(
        db_column="Membership Agreement Signed"
    )
    liability_form_filled_out = models.BooleanField(
        db_column="Liability Form Filled Out"
    )

    flags = models.ManyToManyField(Flag, through="MemberFlag", related_name="members")

    # Payload keys that differ from the database column names above.
    _api_names_override = {
        "uid": "Account ID",
        "how_did_you_hear": "Please tell us how you heard about the Claremont MakerSpace and what tools or shops you are most excited to start using:",
        "authorize_charge": "Yes - I authorize TwinState MakerSpaces, Inc. to charge my credit card for the membership and other options that I have selected.",
        "policy_agreement": "I have read the Claremont MakerSpace Membership Agreement & Policies, and agree to all terms stated therein.",
    }
    # Payload key -> strptime format used when importing.
    _date_fields = {
        "Join Date": "%b %d, %Y",
        "Renewal Date": "%b %d, %Y",
        "Audit Date": "%m/%d/%Y",
        "Membership Agreement signed and on file date.": "%m/%d/%Y",
        "Waiver form signed and on file date.": "%m/%d/%Y",
    }

    class Meta:
        db_table = "members"
        ordering = ("first_name", "last_name")
        indexes = [
            models.Index(fields=["account_name"], name="account_name_idx"),
            models.Index(fields=["first_name"], name="first_name_idx"),
            models.Index(fields=["last_name"], name="last_name_idx"),
        ]

    def __str__(self):
        return f"{self.account_name}"

    @classmethod
    def from_user(cls, user) -> Optional["Member"]:
        """Look up the member for ``user`` via its LDAP ``employeeNumber``.

        Returns ``None`` when ``user`` has no ``ldap_user`` attribute. The
        lookup itself uses ``objects.get`` and so raises if no row matches.
        """
        if not hasattr(user, "ldap_user"):
            return None
        member_uid = user.ldap_user.attrs["employeeNumber"][0]
        return cls.objects.get(uid=member_uid)

    def sanitized_mailbox(self, use_volunteer=False) -> str:
        """Return this member's address, with display name when available.

        With ``use_volunteer`` set, the volunteer address is preferred and
        the regular address is the fallback. Raises ``Exception`` when no
        address is set at all.
        """
        address = (self.volunteer_email if use_volunteer else None) or self.email
        if not address:
            raise Exception(f"No Email Address for user: {self.uid}")

        if self.account_name:
            return django.core.mail.message.sanitize_address(
                (self.account_name, address), settings.DEFAULT_CHARSET
            )
        # No display name to attach: hand back the bare address.
        return address
class MemberFlag(BaseModel):
    """Through table linking a ``Member`` to a ``Flag``."""

    # Stored in column "uid"; no database-level FK constraint is created.
    member = models.ForeignKey(
        Member, on_delete=models.PROTECT, db_column="uid", db_constraint=False
    )
    flag = models.ForeignKey(Flag, on_delete=models.PROTECT)

    class Meta:
        db_table = "memberflag"
        constraints = [
            # Each (member, flag) pair may appear only once.
            models.UniqueConstraint(
                fields=["member", "flag"], name="unique_member_flag"
            )
        ]

    def __str__(self):
        return f"{self.member} - {self.flag}"
class Transaction(BaseModel):
    """A transaction record, optionally linked to a ``Member``."""

    sid = models.CharField(max_length=256, null=True, blank=True)
    # Stored in column "uid"; nullable and without a database-level FK
    # constraint.
    member = models.ForeignKey(
        Member,
        on_delete=models.PROTECT,
        db_column="uid",
        db_constraint=False,
        related_name="transactions",
        null=True,
        blank=True,
    )
    timestamp = models.DateTimeField()
    type = models.TextField(null=True, blank=True)
    sum = models.DecimalField(max_digits=13, decimal_places=4, null=True, blank=True)
    fee = models.DecimalField(max_digits=13, decimal_places=4, null=True, blank=True)
    event_id = models.TextField(null=True, blank=True)
    for_what = models.TextField(db_column="For", null=True, blank=True)
    items = models.TextField(db_column="Items", null=True, blank=True)
    discount_code = models.TextField(db_column="Discount Code", null=True, blank=True)
    note = models.TextField(db_column="Note", null=True, blank=True)
    name = models.TextField(db_column="Name", null=True, blank=True)
    contact_person = models.TextField(db_column="Contact Person", null=True, blank=True)
    full_address = models.TextField(db_column="Full Address", null=True, blank=True)
    street = models.TextField(db_column="Street", null=True, blank=True)
    city = models.TextField(db_column="City", null=True, blank=True)
    state_province = models.TextField(db_column="State/Province", null=True, blank=True)
    postal_code = models.TextField(db_column="Postal Code", null=True, blank=True)
    country = models.TextField(db_column="Country", null=True, blank=True)
    phone = models.TextField(db_column="Phone", null=True, blank=True)
    email = models.TextField(db_column="Email", null=True, blank=True)

    # Payload keys that may be absent; they are imported as None.
    _allowed_missing_fields = [
        "sid",
        "uid",
        "eid",
        "fee",
        "sum",
    ]
    # Payload keys that differ from the database column names above.
    _api_names_override = {
        "event_id": "eid",
        "timestamp": "_dp",
        "type": "Transaction Type",
        "for_what": "Reference",
    }
    # None means the value is parsed as a Unix timestamp.
    _date_fields = {
        "_dp": None,
    }

    class Meta:
        db_table = "transactions"

    def __str__(self):
        """Render as ``type [member or name] timestamp``."""
        who = self.member or self.name
        return f"{self.type} [{who}] {self.timestamp}"
class EventCategory(models.Model):
    """An event category, identified by an externally supplied integer id."""

    id = models.IntegerField(primary_key=True)
    title = models.TextField()

    def __str__(self):
        return self.title

    @classmethod
    def from_api_dict(cls, id: int, data):
        """Build an unsaved category from ``id`` and the payload's ``ttl``."""
        title = data["ttl"]
        return cls(id=id, title=title)
class Event(BaseModel):
    """An event imported from an API payload (see ``_api_names_override``)."""

    class EventCalendar(models.IntegerChoices):
        """Integer calendar codes; 0 marks a hidden event."""

        HIDDEN = 0
        GREEN = 1
        RED = 2
        YELLOW = 3
        BLUE = 4
        PURPLE = 5
        MAGENTA = 6
        GREY = 7
        TEAL = 8

    eid = models.CharField(max_length=255, primary_key=True)
    url = models.TextField()
    title = models.TextField()
    start = models.DateTimeField()
    end = models.DateTimeField(null=True, blank=True)
    cap = models.IntegerField(null=True, blank=True)
    count = models.IntegerField()
    category = models.ForeignKey(EventCategory, on_delete=models.PROTECT)
    calendar = models.IntegerField(choices=EventCalendar)
    venue = models.TextField(null=True, blank=True)

    # Computed by the database, not stored: false when the cap is 0, the
    # count is 0, or the event is on the hidden calendar.
    occurred = models.GeneratedField(
        expression=~(Q(cap=0) | Q(count=0) | Q(calendar=EventCalendar.HIDDEN)),
        output_field=models.BooleanField(),
        db_persist=False,
    )

    # TODO:
    # "lgo": {
    #     "l": "https://d1tif55lvfk8gc.cloudfront.net/656e3842ae3975908b05e304.jpg?1673405126",
    #     "s": "https://d1tif55lvfk8gc.cloudfront.net/656e3842ae3975908b05e304s.jpg?1673405126"
    # },

    # Payload keys that differ from the database column names above.
    _api_names_override = {
        "title": "ttl",
        "category_id": "grp",
        "start": "sdp",
        "end": "edp",
        "count": "cnt",
        "calendar": "cal",
        "venue": "adn",
    }
    # None means the value is parsed as a Unix timestamp.
    _date_fields = {
        "sdp": None,
        "edp": None,
    }
    _allowed_missing_fields = ["cap", "edp", "adn"]

    def __str__(self):
        return self.unescaped_title

    @property
    def unescaped_title(self):
        """The title run through ``nh3.clean`` with no tags allowed."""
        return nh3.clean(self.title, tags=set())
class EventInstructor(models.Model):
    """An instructor, either linked to a ``Member`` or known only by name."""

    name = models.TextField(blank=True)
    # Optional link to a member; no database-level FK constraint is created.
    member = models.OneToOneField(
        Member, on_delete=models.PROTECT, null=True, blank=True, db_constraint=False
    )

    def __str__(self):
        """Prefer the linked member's string form; fall back to ``name``."""
        if self.member:
            return str(self.member)
        return self.name
class EventExtQuerySet(models.QuerySet["EventExt"]):
    """QuerySet helpers for summarising events and their financials."""

    def summarize(self, aggregate: bool = False):
        """Compute summary statistics over events.

        With ``aggregate=True`` the statistics are collapsed via
        ``aggregate()``; otherwise they are added via ``annotate()``.
        Every statistic counts only events whose ``occurred`` is true,
        except ``canceled_event_count``, which counts the others.
        """
        stats = {
            "count__sum": Sum("count", filter=F("occurred")),
            "instructor__count": Count(
                "instructor", distinct=True, filter=F("occurred")
            ),
            "meetings__sum": Sum("meetings", filter=F("occurred")),
            "duration__sum": Sum("duration", filter=F("occurred")),
            "person_hours__sum": Sum("person_hours", filter=F("occurred")),
            "event_count": Count("eid", filter=F("occurred")),
            "canceled_event_count": Count("eid", filter=~F("occurred")),
            "gross_revenue__sum": Sum("gross_revenue", filter=F("occurred")),
            "total_due_to_instructor__sum": Sum(
                "total_due_to_instructor", filter=F("occurred")
            ),
            "net_revenue__sum": Sum("net_revenue", filter=F("occurred")),
        }
        if aggregate:
            return self.aggregate(**stats)
        return self.annotate(**stats)

    def with_financials(self):
        """Annotate events with per-ticket totals and revenue figures."""

        def ticket_total(column):
            # Sum of `column` over this event's ticket types.
            return Subquery(
                EventTicketType.objects.filter(event=OuterRef("pk"))
                .values("event__pk")
                .annotate(d=Sum(column))
                .values("d"),
                output_field=models.DecimalField(),
            )

        ticket_columns = [
            "quantity",
            "amount",
            "materials",
            "amount_without_materials",
            "instructor_revenue",
            "instructor_amount",
        ]
        ticket_totals = {column: ticket_total(column) for column in ticket_columns}

        return self.annotate(
            **ticket_totals,
            total_due_to_instructor=(
                F("instructor_amount") + F("instructor_flat_rate")
            ),
            # Gross revenue falls back to 0.0 when the stats value is NULL.
            gross_revenue=Coalesce(F("attendee_stats__gross_revenue"), 0.0),
            net_revenue=ExpressionWrapper(
                F("gross_revenue") - F("total_due_to_instructor"),
                output_field=models.DecimalField(),
            ),
        )
class EventExtManager(models.Manager["EventExt"]):
    """Manager whose querysets always carry meeting-derived annotations."""

    def get_queryset(self) -> models.QuerySet["EventExt"]:
        """Return the base queryset with the following annotations added.

        ``meetings`` is the number of meeting times, ``duration`` their
        summed duration, ``person_hours`` the duration times ``count``,
        and ``details_timestamp`` is derived from ``details._ts``.
        """

        def over_meeting_times(expression, output_field):
            # Single-value subquery aggregating this event's meeting times.
            return Subquery(
                EventMeetingTime.objects.filter(event=OuterRef("pk"))
                .values("event__pk")
                .annotate(d=expression)
                .values("d")[:1],
                output_field=output_field,
            )

        meeting_count = over_meeting_times(Count("pk"), models.IntegerField())
        total_duration = over_meeting_times(Sum("duration"), models.DurationField())

        # The duration is re-typed as an integer so it can be multiplied by
        # the attendee count, then the product is typed as a duration again.
        person_hours = ExpressionWrapper(
            ExpressionWrapper(F("duration"), models.IntegerField()) * F("count"),
            models.DurationField(),
        )

        # TODO: this could be a GeneratedField, but that
        # currently breaks saving when the primary key is
        # provided (Django 5.0.1)
        details_timestamp = Func(
            Func(F("details___ts"), function="FROM_UNIXTIME"),
            template="CONVERT_TZ(%(expressions)s, @@session.time_zone, 'UTC')",
            output_field=models.DateTimeField(),
        )

        base = super().get_queryset()
        return base.annotate(
            meetings=meeting_count,
            duration=total_duration,
            person_hours=person_hours,
            details_timestamp=details_timestamp,
        )
class EventExt(Event):
    """Extension of `Event` to capture some fields not supported in MembershipWorks"""

    objects = EventExtManager.from_queryset(EventExtQuerySet)()

    instructor = models.ForeignKey(
        EventInstructor, on_delete=models.PROTECT, null=True, blank=True
    )
    materials_fee = models.DecimalField(
        max_digits=13, decimal_places=4, null=True, blank=True
    )
    materials_fee_included_in_price = models.BooleanField(null=True)
    instructor_percentage = models.DecimalField(
        max_digits=5, decimal_places=4, default=0.5
    )
    instructor_flat_rate = models.DecimalField(
        max_digits=13, decimal_places=4, default=0
    )
    details = models.JSONField(null=True, blank=True)

    class Meta:
        verbose_name = "event"
        ordering = ["-start"]

    def get_absolute_url(self) -> str:
        """Return the URL of this event's detail page."""
        url_kwargs = {"eid": self.eid}
        return reverse("membershipworks:event-detail", kwargs=url_kwargs)

    def user_is_instructor(self, user: AbstractBaseUser) -> bool:
        """Report whether ``user`` maps to the member instructing this event."""
        if self.instructor is None:
            return False
        member = Member.from_user(user)
        return member is not None and self.instructor.member == member

    @property
    def ready_for_invoice(self) -> bool:
        """Whether everything needed to invoice this event has been filled in.

        Requires an instructor linked to a member, a materials fee, a
        decision on whether that fee is included in the price (unless the
        fee is zero), and a non-null ``total_due_to_instructor``.
        """
        if self.instructor is None or self.instructor.member is None:
            return False
        if self.materials_fee is None:
            return False

        fee_handling_known = (
            self.materials_fee_included_in_price is not None
            or self.materials_fee == 0
        )
        # `total_due_to_instructor` is not a model field; it is the
        # annotation added by EventExtQuerySet.with_financials().
        # NOTE(review): presumably instances must be loaded through that
        # method for this attribute to exist -- confirm with callers.
        return fee_handling_known and self.total_due_to_instructor is not None
class EventMeetingTime(models.Model):
    """One meeting (start/end pair) of an event."""

    event = models.ForeignKey(
        EventExt, on_delete=models.CASCADE, related_name="meeting_times"
    )
    start = models.DateTimeField()
    end = models.DateTimeField()

    # Computed by the database as end - start; not stored.
    duration = models.GeneratedField(
        expression=F("end") - F("start"),
        output_field=models.DurationField(),
        db_persist=False,
    )

    class Meta:
        constraints = [
            # The same meeting may not be recorded twice for an event.
            models.UniqueConstraint(
                fields=["event", "start", "end"], name="unique_event_start_end"
            ),
            # A meeting must end strictly after it starts.
            models.CheckConstraint(check=Q(end__gt=F("start")), name="end_after_start"),
        ]
class EventInvoice(models.Model):
    """An invoice submitted for an event; at most one per event."""

    # Random UUID primary key, not editable.
    uuid = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    event = models.OneToOneField(
        EventExt, on_delete=models.PROTECT, related_name="invoice"
    )
    date_submitted = models.DateField()
    date_paid = models.DateField(blank=True, null=True)
    # Uploaded files land under a date-based directory.
    pdf = models.FileField(upload_to="protected/invoices/%Y/%m/%d/")
    amount = models.DecimalField(max_digits=13, decimal_places=4)

    def __str__(self) -> str:
        """Render the event, submission date, payment date and amount."""
        return f'"{self.event}" submitted={self.date_submitted} paid:{self.date_paid} ${self.amount}'
class EventTicketTypeQuerySet(models.QuerySet["EventTicketType"]):
    """QuerySet helpers for ticket types."""

    def group_by_ticket_type(self):
        """Group rows by ``is_members_ticket`` and total the money columns.

        Each group gets a ``label`` of "Members" or "Non-Members", its
        ``actual_price``, and the sum of each quantity/amount column.
        """
        group_label = Case(
            When(Q(is_members_ticket=True), Value("Members")),
            default=Value("Non-Members"),
        )
        summed_columns = [
            "quantity",
            "materials",
            "amount",
            "amount_without_materials",
            "instructor_revenue",
            "instructor_amount",
        ]
        totals = {column: Sum(column) for column in summed_columns}

        grouped = self.values("is_members_ticket")
        return grouped.annotate(
            label=group_label,
            actual_price=F("actual_price"),
            **totals,
        )
class EventTicketTypeManager(models.Manager["EventTicketType"]):
    """Manager whose querysets always carry pricing annotations."""

    def get_queryset(self) -> models.QuerySet["EventTicketType"]:
        """Return ticket types annotated with price and revenue figures.

        The annotations build on one another (``amount`` uses
        ``actual_price``, and so on), so their order below matters.
        """
        # Id of the "Members" folder flag, as a single-value subquery.
        members_folder = Subquery(
            Flag.objects.filter(name="Members", type="folder").values("id")[:1]
        )
        qs = super().get_queryset()

        sold_at_list_price = Q(restrict_to=members_folder) | Q(
            restrict_to__isnull=True
        )
        # Use Members ticket price for any restricted ticket
        # which is not the Members ticket
        members_ticket_price = Subquery(
            qs.filter(event=OuterRef("event"), restrict_to=members_folder).values(
                "list_price"
            ),
            output_field=models.FloatField(),
        )

        # `&` binds tighter than `|`, so the not-null test only qualifies
        # the zero-fee branch; the parentheses spell out how it evaluates.
        # NOTE(review): confirm that "(included | zero) & not-null" was
        # not what was intended.
        materials_charged_per_ticket = Q(
            event__materials_fee_included_in_price=True
        ) | (Q(event__materials_fee=0) & Q(event__materials_fee__isnull=False))
        materials_per_row = ExpressionWrapper(
            F("event__materials_fee") * F("quantity"),
            output_field=models.DecimalField(),
        )

        return qs.annotate(
            actual_price=Case(
                When(sold_at_list_price, "list_price"),
                default=members_ticket_price,
            ),
            # True for every ticket that has any restriction set.
            is_members_ticket=(Q(restrict_to__isnull=False)),
            materials=Case(
                When(materials_charged_per_ticket, materials_per_row),
                # Unknown whether the fee is included: leave materials NULL.
                When(Q(event__materials_fee_included_in_price__isnull=True), None),
                default=0,
                output_field=models.DecimalField(),
            ),
            amount=ExpressionWrapper(
                F("actual_price") * F("quantity"),
                output_field=models.DecimalField(),
            ),
            amount_without_materials=ExpressionWrapper(
                F("amount") - F("materials"), output_field=models.DecimalField()
            ),
            instructor_revenue=ExpressionWrapper(
                F("amount_without_materials") * F("event__instructor_percentage"),
                output_field=models.DecimalField(),
            ),
            instructor_amount=ExpressionWrapper(
                F("instructor_revenue") + F("materials"),
                output_field=models.DecimalField(),
            ),
        )
class EventTicketType(DBView):
    """Database view with one row per ticket type in an event's details JSON.

    Rows come from the ``tkt`` array of ``EventExt.details``; the entries of
    each ticket's ``dsp`` array are joined into ``restrict_to`` with commas.
    """

    objects = EventTicketTypeManager.from_queryset(EventTicketTypeQuerySet)()
    event = models.ForeignKey(
        EventExt, on_delete=models.CASCADE, related_name="ticket_types"
    )
    label = models.TextField()
    restrict_to = models.TextField(null=True, blank=True)
    list_price = models.FloatField()
    quantity = models.IntegerField()
    # Due to the presence of JSON_TABLE, this view must (as of MariaDB
    # 11.2.2) be created as the root user. See
    # https://jira.mariadb.org/browse/MDEV-27898
    # nested path/group_concat to workaround inability to create JSON columns using
    # JSON_TABLE in views
    view_definition = """
SELECT
row_number() over () as id,
eventext.event_ptr_id AS event_id,
tkt.label,
tkt.list_price,
tkt.quantity,
GROUP_CONCAT(tkt.restrict_to SEPARATOR ',') as restrict_to
FROM
membershipworks_eventext AS eventext,
JSON_TABLE (eventext.details, '$.tkt[*]' COLUMNS (
id FOR ORDINALITY,
label VARCHAR(256) PATH '$.lbl' ERROR ON ERROR,
list_price DOUBLE PATH '$.amt' ERROR ON ERROR,
quantity INTEGER PATH '$.cnt' DEFAULT 0 ON EMPTY ERROR ON ERROR,
NESTED PATH '$.dsp[*]' COLUMNS (
restrict_to VARCHAR(100) PATH '$' ERROR ON ERROR
)
)) AS tkt
GROUP BY event_id, id
"""
    def __str__(self) -> str:
        return f"{self.label}: {self.quantity} * {self.list_price}"
    class Meta:
        # The backing object is a view, so Django does not manage a table.
        managed = False
        # Use the annotating manager for base-manager lookups as well.
        base_manager_name = "objects"
class EventAttendeeStats(DBView):
    """Database view with one row per event, summing attendee payments.

    ``gross_revenue`` is the sum of ``sum`` over the ``usr`` array of
    ``EventExt.details``; a missing ``sum`` counts as 0.
    """

    event = models.ForeignKey(
        EventExt, on_delete=models.CASCADE, related_name="attendee_stats"
    )
    gross_revenue = models.FloatField()
    # NOTE(review): the view selects no `id` column; presumably this model
    # is only reached through joins from EventExt -- confirm.
    view_definition = """
SELECT eventext.event_ptr_id as event_id, SUM(tkt.s) as gross_revenue
FROM
membershipworks_eventext as eventext,
JSON_TABLE(eventext.details, '$.usr[*]' COLUMNS (
s DOUBLE PATH '$.sum' DEFAULT 0 ON EMPTY
)) as tkt
GROUP BY event_id
"""
    class Meta:
        # The backing object is a view, so Django does not manage a table.
        managed = False
class EventAttendee(DBView):
    """Database view with one row per attendee in an event's details JSON.

    Rows come from the ``usr`` array of ``EventExt.details``.
    """

    event = models.ForeignKey(
        EventExt, on_delete=models.CASCADE, related_name="attendees"
    )
    # NOTE(review): a ForeignKey named `uid` maps to column `uid_id` unless
    # `db_column` is given, while the view below exposes the column as
    # `uid`; the view also selects no `id` column. Confirm how this model
    # is queried before relying on either.
    uid = models.ForeignKey(Member, on_delete=models.DO_NOTHING)
    name = models.CharField(max_length=256)
    email = models.CharField(max_length=256)
    sum = models.FloatField()
    view_definition = """
SELECT eventext.event_ptr_id as event_id, tkt.uid, tkt.name, tkt.email, tkt.sum
FROM
membershipworks_eventext as eventext,
JSON_TABLE(eventext.details, '$.usr[*]' COLUMNS (
uid VARCHAR(24) PATH '$.uid',
name VARCHAR(256) PATH '$.nam',
email VARCHAR(256) PATH '$.eml',
sum DOUBLE PATH '$.sum'
)) as tkt
"""
    class Meta:
        # The backing object is a view, so Django does not manage a table.
        managed = False