cmsmanage/doorcontrol/tasks/update_unifi_access.py

186 lines
6.7 KiB
Python
Raw Permalink Normal View History

import logging
from collections.abc import Mapping
from django.conf import settings
from django.db.models import Q
from unifi_access import AccessClient
from unifi_access.schemas import AccessPolicy, DoorResource, UserStatus
from unifi_access.schemas import User as AccessUser
from cmsmanage.django_q2_helper import q_task_group
from doorcontrol.models import (
AbstractScheduleRule,
ActiveEventInstructorRule,
AttributeScheduleRule,
Door,
FlagScheduleRule,
Schedule,
)
from membershipworks.models import Member
logger = logging.getLogger(__name__)
def update_user_groups(
    unifi_access: AccessClient,
    access_users_by_employee_number: Mapping[str, AccessUser],
) -> None:
    """Sync UniFi Access user groups and access policies from the schedule rules.

    For every ``FlagScheduleRule``, ``AttributeScheduleRule`` and
    ``ActiveEventInstructorRule``, a user group named after the rule is looked
    up (or created) among the children of the top-level group, with one child
    group per door of the rule. Each door group is assigned the access policy
    named ``"<schedule name> | <door name>"`` and the Access users that
    correspond to the rule's matching members.

    Args:
        unifi_access: Client used for all UniFi Access API calls.
        access_users_by_employee_number: Access users keyed by employee
            number; indexed here with each matching member's ``uid``.

    Raises:
        StopIteration: If no fetched user group has an empty ``up_id``.
        KeyError: If a rule's schedule name is not among the fetched Access
            schedules, or a matching member's ``uid`` is missing from
            ``access_users_by_employee_number``.
    """
    groups = unifi_access.fetch_all_user_groups()
    # The top-level group is the one without a parent.
    top_group = next(g for g in groups if g.up_id == "")
    rule_groups = {g.name: g for g in groups if g.up_id == top_group.id}
    access_policies_by_name = {
        p.name: p for p in unifi_access.fetch_all_access_policies()
    }
    schedules_by_name = {s.name: s for s in unifi_access.fetch_all_schedules()}
    doors_by_name = {d.name: d for d in unifi_access.fetch_all_doors()}

    def sync_access_policy(schedule: Schedule, door: Door) -> AccessPolicy:
        """Create or update the access policy for ``schedule`` on ``door``.

        Returns the up-to-date policy and records it in
        ``access_policies_by_name``.
        """
        # Build the name from the `schedule` argument. Relying on a `rule`
        # variable captured from the enclosing scope only works while every
        # caller happens to pass `rule.schedule`.
        access_policy_name = f"{schedule.name} | {door.name}"
        expected_access_policy = {
            "name": access_policy_name,
            # A door that is not present in Access yields a policy with no
            # resources.
            "resources": (
                [DoorResource(id=doors_by_name[door.name].id)]
                if door.name in doors_by_name
                else []
            ),
            "schedule_id": schedules_by_name[schedule.name].id,
        }
        if access_policy := access_policies_by_name.get(access_policy_name):
            # Only send the fields that differ from what Access already has.
            changes = {
                k: v
                for k, v in expected_access_policy.items()
                if getattr(access_policy, k) != v
            }
            if changes:
                logger.debug(
                    " - updating access policy %s: %s", access_policy_name, changes
                )
                access_policy = unifi_access.update_access_policy(
                    access_policy.id, **changes
                )
                access_policies_by_name[access_policy_name] = access_policy
        else:
            logger.debug(
                " - creating access policy %s: %s",
                access_policy_name,
                expected_access_policy,
            )
            access_policy = unifi_access.create_access_policy(**expected_access_policy)
            access_policies_by_name[access_policy_name] = access_policy
        return access_policy

    def update_groups_for_rule(rule: AbstractScheduleRule) -> None:
        """Ensure the group hierarchy, policy and membership for one rule."""
        assert rule.name
        logger.info("Syncing user group '%s'", rule.name)
        rule_group = rule_groups.get(rule.name)
        if not rule_group:
            logger.debug(" - creating top level group for rule")
            rule_group = unifi_access.create_user_group(rule.name)
            # might be None if it already existed, but we already checked for that earlier
            assert rule_group
        # Existing per-door child groups of this rule's group, from the
        # snapshot fetched at the start (empty for a just-created group).
        door_groups = {g.name: g for g in groups if g.up_id == rule_group.id}
        for door in rule.doors.all():
            door_rule_group = door_groups.get(door.name)
            if not door_rule_group:
                logger.debug(" - creating child group for door %s", door.name)
                door_rule_group = unifi_access.create_user_group(
                    door.name, up_id=rule_group.id
                )
                # might be None if it already existed, but we already checked for that earlier
                assert door_rule_group
            access_policy = sync_access_policy(rule.schedule, door)
            unifi_access.assign_access_policy_to_user_group(
                door_rule_group.id, [access_policy.id]
            )
            # Members matching the rule who have access to this door and are
            # either active or carry the "Misc. Access" folder flag.
            expected_group_members = (
                rule.get_matching_members()
                .with_is_active()
                .filter(
                    Q(**{door.access_field: True})
                    & (
                        Q(is_active=True)
                        | Member.objects.has_flag("folder", "Misc. Access")
                    )
                )
            )
            # all members should exist in Access by this point
            expected_group_member_ids = [
                access_users_by_employee_number[member.uid].id
                for member in expected_group_members
            ]
            if expected_group_member_ids:
                unifi_access.assign_user_to_user_group(
                    door_rule_group.id, expected_group_member_ids
                )

    for rule in FlagScheduleRule.objects.all():
        update_groups_for_rule(rule)
    for rule in AttributeScheduleRule.objects.all():
        update_groups_for_rule(rule)
    # TODO: this could probably be done better by creating temporary
    # schedules for active events
    for rule in ActiveEventInstructorRule.objects.all():
        update_groups_for_rule(rule)
def sync_members(access_client: AccessClient) -> None:
    """Mirror all members into UniFi Access users, then sync user groups.

    Each member is matched to an Access user by employee number
    (``member.uid``). Existing users have their name, employee number and
    status updated when they differ; members without an Access user are
    registered. Finally ``update_user_groups`` is called with the users keyed
    by employee number.

    Args:
        access_client: Client used for all UniFi Access API calls.
    """

    def fetch_users_by_employee_number() -> dict[str, AccessUser]:
        # Users without an employee number cannot be matched to a member.
        return {
            user.employee_number: user
            for user in access_client.fetch_all_users__unpaged()
            if user.employee_number
        }

    access_users_by_employee_number = fetch_users_by_employee_number()
    registered_new_users = False

    for member in Member.objects.with_is_active().all():
        logger.info("Syncing member %s", member)
        expected_user = {
            "first_name": member.first_name,
            "last_name": member.last_name,
            # TODO: omitted to avoid spamming members for now
            # "user_email": member.email,
            "employee_number": member.uid,
        }
        if access_user := access_users_by_employee_number.get(member.uid):
            # Status is only reconciled for users that already exist.
            expected_user["status"] = (
                UserStatus.ACTIVE
                if (
                    member.is_active
                    or member.flags.filter(name="Misc. Access", type="folder").exists()
                )
                else UserStatus.DEACTIVATED
            )
            # Only send the fields that differ from what Access already has.
            changes = {
                k: v for k, v in expected_user.items() if getattr(access_user, k) != v
            }
            if changes:
                logger.debug(" - updating, changes: %s", changes)
                access_client.update_user(
                    **changes,
                    user_id=access_user.id,
                )
        else:
            logger.debug(" - creating user: %s", expected_user)
            access_client.register_user(**expected_user)
            registered_new_users = True

    if registered_new_users:
        # The map above was built before registration, so it lacks the users
        # just created. update_user_groups indexes it by member uid, so refresh
        # it to avoid a KeyError for brand-new members that match a rule.
        # NOTE(review): assumes newly registered users are returned by the
        # next fetch -- confirm against the Access API.
        access_users_by_employee_number = fetch_users_by_employee_number()

    update_user_groups(access_client, access_users_by_employee_number)
@q_task_group("Update UniFi Access Data")
def update_access():
    """Task entry point: connect to UniFi Access and sync all members.

    Connection details are read from the Django settings
    ``UNIFI_ACCESS_HOST`` and ``UNIFI_ACCESS_API_TOKEN``.
    """
    connection_options = {
        "host": settings.UNIFI_ACCESS_HOST,
        "api_token": settings.UNIFI_ACCESS_API_TOKEN,
        # TODO: fix SSL cert
        "verify": False,
    }
    sync_members(AccessClient(**connection_options))