cmsmanage/membershipworks/tasks/event_survey_emails.py
Adam Goldsmith eeb83388e1
All checks were successful
Ruff / ruff (push) Successful in 29s
Test / test (push) Successful in 3m55s
Directly use q_task_group name in django-q2 ensure_scheduled helper
2024-05-23 23:23:35 -04:00

75 lines
2.6 KiB
Python

import logging
from collections.abc import Iterable
from urllib.parse import quote, urlencode
from django.conf import settings
from django.core import mail
from django.db.models.functions import Now
from cmsmanage.django_q2_helper import q_task_group
from cmsmanage.email import TemplatedMultipartEmail
from membershipworks.models import EventExt
logger = logging.getLogger(__name__)
class EventSurveyEmail(TemplatedMultipartEmail):
# TODO: better wording
subject = (
"[Claremont MakerSpace] Please fill out a survey for your recent CMS class!"
)
from_email = "CMS Classes <classes@claremontmakerspace.org>"
template = "membershipworks/email/event_survey.dj.html"
@classmethod
def render_for_event(cls, event: EventExt) -> Iterable[mail.EmailMessage]:
# not using event.attendees because that makes getting ticket count harder
for attendee in event.details["usr"]:
# skip users with no tickets, as they were Voided
if sum(attendee["tkt"]) == 0:
continue
name = attendee["nam"]
email = attendee["eml"]
sanitized_email = mail.message.sanitize_address(
(name, email), settings.DEFAULT_CHARSET
)
survey_url = (
"https://claremontmakerspace.org/class-evaluation-form?"
+ urlencode(
{
"event_id": event.eid,
"instructor_name": (
str(event.instructor) if event.instructor else ""
),
"event_name": event.title,
"event_date": event.start.strftime("%Y-%m-%d %H:%M:%S"),
"participant_name": name,
"participant_email": email,
},
quote_via=quote,
)
)
yield cls.render(
{"event": event, "attendee_name": name, "survey_url": survey_url},
to=[sanitized_email],
)
@q_task_group("Send Event Survey Emails")
def send_survey_emails():
with mail.get_connection() as conn:
for event in EventExt.objects.filter(
occurred=True, should_survey=True, survey_email_sent=False, end__lt=Now()
):
logger.info("Sending survey messages for event: %s", event)
# mark as sent even if we don't finish, to prevent sending duplicates
event.survey_email_sent = True
event.save()
conn.send_messages(list(EventSurveyEmail.render_for_event(event)))