cmsmanage/membershipworks/models.py
Adam Goldsmith 43f992e2c3 paperwork: Remove suffixes from names in shopleads mailing list
Mailman3 doesn't have display names per list anymore, so this is
somewhat confusing
2024-02-07 13:40:04 -05:00

717 lines
26 KiB
Python

from datetime import datetime
from typing import Optional
import django.core.mail.message
from django.conf import settings
from django.db import models
from django.db.models import (
Case,
Count,
Exists,
ExpressionWrapper,
F,
Func,
OuterRef,
Q,
Subquery,
Sum,
When,
)
from django.db.models.functions import Coalesce
from django.utils import timezone
from django_db_views.db_view import DBView
class BaseModel(models.Model):
_api_names_override = {}
_date_fields = {}
_allowed_missing_fields = []
class Meta:
abstract = True
@classmethod
def _remap_headers(cls, data):
for field in cls._meta.get_fields():
# TODO: more robust filtering of fields that don't have a column
if (
field.auto_created
or field.many_to_many
or not field.concrete
or field.generated
):
continue
field_name, api_name = field.get_attname_column()
if field_name in cls._api_names_override:
api_name = cls._api_names_override[field_name]
yield field_name, data[api_name]
@classmethod
def from_api_dict(cls, data):
data = data.copy()
for field in cls._allowed_missing_fields:
if field not in data:
data[field] = None
# parse date fields to datetime objects
for field, fmt in cls._date_fields.items():
if data[field]:
if fmt is None:
# can't use '%s' format string, have to use the special function
data[field] = datetime.fromtimestamp(
data[field], tz=timezone.get_current_timezone()
)
else:
data[field] = datetime.strptime(str(data[field]), fmt)
else:
# convert empty string to None to make NULL in SQL
data[field] = None
return cls(**dict(cls._remap_headers(data)))
class Flag(BaseModel):
id = models.CharField(max_length=24, primary_key=True)
name = models.TextField(null=True, blank=True)
type = models.CharField(max_length=6)
def __str__(self):
return f"{self.name} ({self.type})"
class Meta:
db_table = "flag"
ordering = ("name",)
class MemberQuerySet(models.QuerySet):
# TODO: maybe rename to reflect EXISTS?
@staticmethod
def has_flag(flag_type: str, flag_name: str):
return Exists(
Flag.objects.filter(type=flag_type, name=flag_name, members=OuterRef("pk"))
)
# TODO: it should be fairly easy to reduce the number of EXISTS by
# merging the ORed flags
def with_is_active(self):
return self.annotate(
is_active=(
self.has_flag("folder", "Members")
| self.has_flag("folder", "CMS Staff")
)
& ~(
self.has_flag("label", "Account On Hold")
| self.has_flag("level", "CMS Membership on hold")
| self.has_flag("folder", "Former Members")
)
)
# TODO: is this still a temporal table?
class Member(BaseModel):
objects = MemberQuerySet.as_manager()
uid = models.CharField(max_length=24, primary_key=True)
year_of_birth = models.TextField(db_column="Year of Birth", null=True, blank=True)
account_name = models.TextField(db_column="Account Name", null=True, blank=True)
first_name = models.TextField(db_column="First Name", null=True, blank=True)
last_name = models.TextField(db_column="Last Name", null=True, blank=True)
phone = models.TextField(db_column="Phone", null=True, blank=True)
email = models.TextField(db_column="Email", null=True, blank=True)
volunteer_email = models.TextField(
db_column="Volunteer Email", null=True, blank=True
)
address_street = models.TextField(
db_column="Address (Street)", null=True, blank=True
)
address_city = models.TextField(db_column="Address (City)", null=True, blank=True)
address_state_province = models.TextField(
db_column="Address (State/Province)", null=True, blank=True
)
address_postal_code = models.TextField(
db_column="Address (Postal Code)", null=True, blank=True
)
address_country = models.TextField(
db_column="Address (Country)", null=True, blank=True
)
profile_description = models.TextField(
db_column="Profile description", null=True, blank=True
)
website = models.TextField(db_column="Website", null=True, blank=True)
fax = models.TextField(db_column="Fax", null=True, blank=True)
contact_person = models.TextField(db_column="Contact Person", null=True, blank=True)
password = models.TextField(db_column="Password", null=True, blank=True)
position_relation = models.TextField(
db_column="Position/relation", null=True, blank=True
)
parent_account_id = models.TextField(
db_column="Parent Account ID", null=True, blank=True
)
closet_storage = models.TextField(
db_column="Closet Storage #", null=True, blank=True
)
storage_shelf = models.TextField(db_column="Storage Shelf #", null=True, blank=True)
personal_studio_space = models.TextField(
db_column="Personal Studio Space #", null=True, blank=True
)
access_permitted_shops_during_extended_hours = models.BooleanField(
db_column="Access Permitted Shops During Extended Hours?"
)
access_front_door_and_studio_space_during_extended_hours = models.BooleanField(
db_column="Access Front Door and Studio Space During Extended Hours?"
)
access_wood_shop = models.BooleanField(db_column="Access Wood Shop?")
access_metal_shop = models.BooleanField(db_column="Access Metal Shop?")
access_storage_closet = models.BooleanField(db_column="Access Storage Closet?")
access_studio_space = models.BooleanField(db_column="Access Studio Space?")
access_front_door = models.BooleanField(db_column="Access Front Door?")
access_card_number = models.TextField(
db_column="Access Card Number", null=True, blank=True
)
access_card_facility_code = models.TextField(
db_column="Access Card Facility Code", null=True, blank=True
)
auto_billing_id = models.TextField(
db_column="Auto Billing ID", null=True, blank=True
)
billing_method = models.TextField(db_column="Billing Method", null=True, blank=True)
renewal_date = models.DateField(db_column="Renewal Date", null=True, blank=True)
join_date = models.DateField(db_column="Join Date", null=True, blank=True)
admin_note = models.TextField(db_column="Admin note", null=True, blank=True)
profile_gallery_image_url = models.TextField(
db_column="Profile gallery image URL", null=True, blank=True
)
business_card_image_url = models.TextField(
db_column="Business card image URL", null=True, blank=True
)
instagram = models.TextField(db_column="Instagram", null=True, blank=True)
pinterest = models.TextField(db_column="Pinterest", null=True, blank=True)
youtube = models.TextField(db_column="Youtube", null=True, blank=True)
yelp = models.TextField(db_column="Yelp", null=True, blank=True)
google = models.TextField(db_column="Google+", null=True, blank=True)
bbb = models.TextField(db_column="BBB", null=True, blank=True)
twitter = models.TextField(db_column="Twitter", null=True, blank=True)
facebook = models.TextField(db_column="Facebook", null=True, blank=True)
linked_in = models.TextField(db_column="LinkedIn", null=True, blank=True)
do_not_show_street_address_in_profile = models.TextField(
db_column="Do not show street address in profile", null=True, blank=True
)
do_not_list_in_directory = models.TextField(
db_column="Do not list in directory", null=True, blank=True
)
how_did_you_hear = models.TextField(
db_column="HowDidYouHear", null=True, blank=True
)
authorize_charge = models.TextField(
db_column="authorizeCharge", null=True, blank=True
)
policy_agreement = models.TextField(
db_column="policyAgreement", null=True, blank=True
)
waiver_form_signed_and_on_file_date = models.DateField(
db_column="Waiver form signed and on file date.", null=True, blank=True
)
membership_agreement_signed_and_on_file_date = models.DateField(
db_column="Membership Agreement signed and on file date.", null=True, blank=True
)
ip_address = models.TextField(db_column="IP Address", null=True, blank=True)
audit_date = models.DateField(db_column="Audit Date", null=True, blank=True)
agreement_version = models.TextField(
db_column="Agreement Version", null=True, blank=True
)
paperwork_status = models.TextField(
db_column="Paperwork status", null=True, blank=True
)
membership_agreement_dated = models.BooleanField(
db_column="Membership agreement dated"
)
membership_agreement_acknowledgement_page_filled_out = models.BooleanField(
db_column="Membership Agreement Acknowledgement Page Filled Out"
)
membership_agreement_signed = models.BooleanField(
db_column="Membership Agreement Signed"
)
liability_form_filled_out = models.BooleanField(
db_column="Liability Form Filled Out"
)
flags = models.ManyToManyField(Flag, through="MemberFlag", related_name="members")
_api_names_override = {
"uid": "Account ID",
"how_did_you_hear": "Please tell us how you heard about the Claremont MakerSpace and what tools or shops you are most excited to start using:",
"authorize_charge": "Yes - I authorize TwinState MakerSpaces, Inc. to charge my credit card for the membership and other options that I have selected.",
"policy_agreement": "I have read the Claremont MakerSpace Membership Agreement & Policies, and agree to all terms stated therein.",
}
_date_fields = {
"Join Date": "%b %d, %Y",
"Renewal Date": "%b %d, %Y",
"Audit Date": "%m/%d/%Y",
"Membership Agreement signed and on file date.": "%m/%d/%Y",
"Waiver form signed and on file date.": "%m/%d/%Y",
}
def __str__(self):
return f"{self.account_name}"
class Meta:
db_table = "members"
ordering = ("first_name", "last_name")
indexes = [
models.Index(fields=["account_name"], name="account_name_idx"),
models.Index(fields=["first_name"], name="first_name_idx"),
models.Index(fields=["last_name"], name="last_name_idx"),
]
@classmethod
def from_user(cls, user) -> Optional["Member"]:
if hasattr(user, "ldap_user"):
return cls.objects.get(uid=user.ldap_user.attrs["employeeNumber"][0])
def sanitized_mailbox(self, use_volunteer=False) -> str:
if use_volunteer and self.volunteer_email:
email = self.volunteer_email
elif self.email:
email = self.email
else:
raise Exception(f"No Email Address for user: {self.uid}")
if not self.account_name:
return email
return django.core.mail.message.sanitize_address(
(self.account_name, email), settings.DEFAULT_CHARSET
)
class MemberFlag(BaseModel):
member = models.ForeignKey(
Member, on_delete=models.PROTECT, db_column="uid", db_constraint=False
)
flag = models.ForeignKey(Flag, on_delete=models.PROTECT)
def __str__(self):
return f"{self.member} - {self.flag}"
class Meta:
db_table = "memberflag"
constraints = [
models.UniqueConstraint(
fields=["member", "flag"], name="unique_member_flag"
)
]
class Transaction(BaseModel):
sid = models.CharField(max_length=256, null=True, blank=True)
member = models.ForeignKey(
Member,
on_delete=models.PROTECT,
db_column="uid",
db_constraint=False,
related_name="transactions",
null=True,
blank=True,
)
timestamp = models.DateTimeField()
type = models.TextField(null=True, blank=True)
sum = models.DecimalField(max_digits=13, decimal_places=4, null=True, blank=True)
fee = models.DecimalField(max_digits=13, decimal_places=4, null=True, blank=True)
event_id = models.TextField(null=True, blank=True)
for_what = models.TextField(db_column="For", null=True, blank=True)
items = models.TextField(db_column="Items", null=True, blank=True)
discount_code = models.TextField(db_column="Discount Code", null=True, blank=True)
note = models.TextField(db_column="Note", null=True, blank=True)
name = models.TextField(db_column="Name", null=True, blank=True)
contact_person = models.TextField(db_column="Contact Person", null=True, blank=True)
full_address = models.TextField(db_column="Full Address", null=True, blank=True)
street = models.TextField(db_column="Street", null=True, blank=True)
city = models.TextField(db_column="City", null=True, blank=True)
state_province = models.TextField(db_column="State/Province", null=True, blank=True)
postal_code = models.TextField(db_column="Postal Code", null=True, blank=True)
country = models.TextField(db_column="Country", null=True, blank=True)
phone = models.TextField(db_column="Phone", null=True, blank=True)
email = models.TextField(db_column="Email", null=True, blank=True)
_allowed_missing_fields = [
"sid",
"uid",
"eid",
"fee",
"sum",
]
_api_names_override = {
"event_id": "eid",
"timestamp": "_dp",
"type": "Transaction Type",
"for_what": "Reference",
}
_date_fields = {
"_dp": None,
}
def __str__(self):
return f"{self.type} [{self.member if self.member else self.name}] {self.timestamp}"
class Meta:
db_table = "transactions"
class EventCategory(models.Model):
id = models.IntegerField(primary_key=True)
title = models.TextField()
@classmethod
def from_api_dict(cls, id: int, data):
return cls(id=id, title=data["ttl"])
def __str__(self):
return self.title
class Event(BaseModel):
class EventCalendar(models.IntegerChoices):
HIDDEN = 0
GREEN = 1
RED = 2
YELLOW = 3
BLUE = 4
PURPLE = 5
MAGENTA = 6
GREY = 7
TEAL = 8
eid = models.CharField(max_length=255, primary_key=True)
url = models.TextField()
title = models.TextField()
start = models.DateTimeField()
end = models.DateTimeField(null=True, blank=True)
cap = models.IntegerField(null=True, blank=True)
count = models.IntegerField()
category = models.ForeignKey(EventCategory, on_delete=models.PROTECT)
calendar = models.IntegerField(choices=EventCalendar)
venue = models.TextField(null=True, blank=True)
occurred = models.GeneratedField(
expression=~(Q(cap=0) | Q(count=0) | Q(calendar=EventCalendar.HIDDEN)),
output_field=models.BooleanField(),
db_persist=False,
)
# TODO:
# "lgo": {
# "l": "https://d1tif55lvfk8gc.cloudfront.net/656e3842ae3975908b05e304.jpg?1673405126",
# "s": "https://d1tif55lvfk8gc.cloudfront.net/656e3842ae3975908b05e304s.jpg?1673405126"
# },
_api_names_override = {
"title": "ttl",
"category_id": "grp",
"start": "sdp",
"end": "edp",
"count": "cnt",
"calendar": "cal",
"venue": "adn",
}
_date_fields = {
"sdp": None,
"edp": None,
}
_allowed_missing_fields = ["cap", "edp", "adn"]
def __str__(self):
return self.title
class EventInstructor(models.Model):
name = models.TextField(blank=True)
member = models.OneToOneField(
Member, on_delete=models.PROTECT, null=True, blank=True, db_constraint=False
)
def __str__(self):
return str(self.member) if self.member else self.name
class EventExtQuerySet(models.QuerySet["EventExt"]):
def summarize(self, aggregate: bool = False):
method = self.aggregate if aggregate else self.annotate
return method(
count__sum=Sum("count", filter=F("occurred")),
instructor__count=Count("instructor", distinct=True, filter=F("occurred")),
meetings__sum=Sum("meetings", filter=F("occurred")),
duration__sum=Sum("duration", filter=F("occurred")),
person_hours__sum=Sum("person_hours", filter=F("occurred")),
event_count=Count("eid", filter=F("occurred")),
canceled_event_count=Count("eid", filter=~F("occurred")),
gross_revenue__sum=Sum("gross_revenue", filter=F("occurred")),
total_due_to_instructor__sum=Sum(
"total_due_to_instructor", filter=F("occurred")
),
net_revenue__sum=Sum("net_revenue", filter=F("occurred")),
)
def with_financials(self):
return self.annotate(
**{
field: Subquery(
EventTicketType.objects.filter(event=OuterRef("pk"))
.values("event__pk")
.annotate(d=Sum(field))
.values("d"),
output_field=models.DecimalField(),
)
for field in [
"quantity",
"amount",
"materials",
"amount_without_materials",
"instructor_fee",
"instructor_amount",
]
},
total_due_to_instructor=(
F("instructor_amount") + F("instructor_flat_rate")
),
gross_revenue=Coalesce(F("attendee_stats__gross_revenue"), 0.0),
net_revenue=ExpressionWrapper(
F("gross_revenue") - F("total_due_to_instructor"),
output_field=models.DecimalField(),
),
)
class EventExtManager(models.Manager["EventExt"]):
def get_queryset(self) -> models.QuerySet["EventExt"]:
return (
super()
.get_queryset()
.annotate(
meetings=Subquery(
EventMeetingTime.objects.filter(event=OuterRef("pk"))
.values("event__pk")
.annotate(d=Count("pk"))
.values("d")[:1],
output_field=models.IntegerField(),
),
duration=Subquery(
EventMeetingTime.objects.filter(event=OuterRef("pk"))
.values("event__pk")
.annotate(d=Sum("duration"))
.values("d")[:1],
output_field=models.DurationField(),
),
person_hours=ExpressionWrapper(
ExpressionWrapper(F("duration"), models.IntegerField())
* F("count"),
models.DurationField(),
),
# TODO: this could be a GeneratedField, but that
# currently breaks saving when the primary key is
# provided (Django 5.0.1)
details_timestamp=Func(
Func(F("details___ts"), function="FROM_UNIXTIME"),
template="CONVERT_TZ(%(expressions)s, @@session.time_zone, 'UTC')",
output_field=models.DateTimeField(),
),
)
)
class EventExt(Event):
"""Extension of `Event` to capture some fields not supported in MembershipWorks"""
objects = EventExtManager.from_queryset(EventExtQuerySet)()
instructor = models.ForeignKey(
EventInstructor, on_delete=models.PROTECT, null=True, blank=True
)
materials_fee = models.DecimalField(
max_digits=13, decimal_places=4, null=True, blank=True
)
materials_fee_included_in_price = models.BooleanField(null=True)
instructor_percentage = models.DecimalField(
max_digits=5, decimal_places=4, default=0.5
)
instructor_flat_rate = models.DecimalField(
max_digits=13, decimal_places=4, default=0
)
details = models.JSONField(null=True, blank=True)
class Meta:
verbose_name = "event"
ordering = ["-start"]
class EventMeetingTime(models.Model):
event = models.ForeignKey(
EventExt, on_delete=models.CASCADE, related_name="meeting_times"
)
start = models.DateTimeField()
end = models.DateTimeField()
duration = models.GeneratedField(
expression=F("end") - F("start"),
output_field=models.DurationField(),
db_persist=False,
)
class Meta:
constraints = [
models.UniqueConstraint(
fields=["event", "start", "end"], name="unique_event_start_end"
)
]
class EventTicketTypeManager(models.Manager["EventTicketType"]):
def get_queryset(self) -> models.QuerySet["EventTicketType"]:
members_folder = Subquery(
Flag.objects.filter(name="Members", type="folder").values("id")[:1]
)
qs = super().get_queryset()
return qs.annotate(
actual_price=Case(
When(
Q(restrict_to=members_folder) | Q(restrict_to__isnull=True),
"list_price",
),
# Use Members ticket price for any restricted ticket
# which is not the Members ticket
default=Subquery(
qs.filter(
event=OuterRef("event"), restrict_to=members_folder
).values("list_price"),
output_field=models.FloatField(),
),
),
is_members_ticket=(Q(restrict_to__isnull=False)),
materials=Case(
When(
(
Q(event__materials_fee_included_in_price=True)
| Q(event__materials_fee=0)
& Q(event__materials_fee__isnull=False)
),
ExpressionWrapper(
F("event__materials_fee") * F("quantity"),
output_field=models.DecimalField(),
),
),
When(Q(event__materials_fee_included_in_price__isnull=True), None),
default=0,
output_field=models.DecimalField(),
),
amount=ExpressionWrapper(
F("actual_price") * F("quantity"),
output_field=models.DecimalField(),
),
amount_without_materials=ExpressionWrapper(
F("amount") - F("materials"), output_field=models.DecimalField()
),
instructor_fee=ExpressionWrapper(
F("amount_without_materials") * F("event__instructor_percentage"),
output_field=models.DecimalField(),
),
instructor_amount=ExpressionWrapper(
F("instructor_fee") + F("materials"), output_field=models.DecimalField()
),
)
class EventTicketType(DBView):
objects = EventTicketTypeManager()
event = models.ForeignKey(
EventExt, on_delete=models.CASCADE, related_name="ticket_types"
)
label = models.TextField()
restrict_to = models.TextField(null=True, blank=True)
list_price = models.FloatField()
quantity = models.IntegerField()
# Due to the presence of JSON_TABLE, this view must (as of MariaDB
# 11.2.2) be created as the root user. See
# https://jira.mariadb.org/browse/MDEV-27898
# nested path/group_concat to workaround inability to create JSON columns using
# JSON_TABLE in views
view_definition = """
SELECT
row_number() over () as id,
eventext.event_ptr_id AS event_id,
tkt.label,
tkt.list_price,
tkt.quantity,
GROUP_CONCAT(tkt.restrict_to SEPARATOR ',') as restrict_to
FROM
membershipworks_eventext AS eventext,
JSON_TABLE (eventext.details, '$.tkt[*]' COLUMNS (
id FOR ORDINALITY,
label VARCHAR(256) PATH '$.lbl' ERROR ON ERROR,
list_price DOUBLE PATH '$.amt' ERROR ON ERROR,
quantity INTEGER PATH '$.cnt' DEFAULT 0 ON EMPTY ERROR ON ERROR,
NESTED PATH '$.dsp[*]' COLUMNS (
restrict_to VARCHAR(100) PATH '$' ERROR ON ERROR
)
)) AS tkt
GROUP BY event_id, id
"""
def __str__(self) -> str:
return f"{self.label}: {self.quantity} * {self.list_price}"
class Meta:
managed = False
base_manager_name = "objects"
class EventAttendeeStats(DBView):
event = models.ForeignKey(
EventExt, on_delete=models.CASCADE, related_name="attendee_stats"
)
gross_revenue = models.FloatField()
view_definition = """
SELECT eventext.event_ptr_id as event_id, SUM(tkt.s) as gross_revenue
FROM
membershipworks_eventext as eventext,
JSON_TABLE(eventext.details, '$.usr[*]' COLUMNS (
s DOUBLE PATH '$.sum' DEFAULT 0 ON EMPTY
)) as tkt
GROUP BY event_id
"""
class Meta:
managed = False
class EventAttendee(DBView):
event = models.ForeignKey(
EventExt, on_delete=models.CASCADE, related_name="attendees"
)
uid = models.ForeignKey(Member, on_delete=models.DO_NOTHING)
name = models.CharField(max_length=256)
email = models.CharField(max_length=256)
sum = models.FloatField()
view_definition = """
SELECT eventext.event_ptr_id as event_id, tkt.uid, tkt.name, tkt.email, tkt.sum
FROM
membershipworks_eventext as eventext,
JSON_TABLE(eventext.details, '$.usr[*]' COLUMNS (
uid VARCHAR(24) PATH '$.uid',
name VARCHAR(256) PATH '$.nam',
email VARCHAR(256) PATH '$.eml',
sum DOUBLE PATH '$.sum'
)) as tkt
"""
class Meta:
managed = False