Simplify return types from api methods, always raise exceptions

This commit is contained in:
Adam Goldsmith 2024-11-16 01:38:07 -05:00
parent 5b0c72dbef
commit 6cc8915bf1
12 changed files with 331 additions and 444 deletions

View File

@ -7,9 +7,9 @@ import requests
from pydantic import (
ConfigDict,
Field,
RootModel,
TypeAdapter,
)
from typing_extensions import TypeAliasType
from unifi_access.schemas import (
AccessPolicy,
@ -196,8 +196,7 @@ class ResponsePagination(ForbidExtraBaseModel):
total: int
# TODO: this has nicer syntax in Python 3.12, but not currently
# supported in Pydantic
# TODO: this has nicer syntax in Python 3.12, but not currently supported in Pydantic
ResponseDataType = TypeVar("ResponseDataType")
@ -214,12 +213,10 @@ class SuccessResponse(ForbidExtraBaseModel, Generic[ResponseDataType]):
return self
# TODO: this is natively supported by the `type` keyword in 3.12
Response = TypeAliasType(
"Response",
SuccessResponse[ResponseDataType] | ErrorResponse,
type_params=(ResponseDataType,),
)
class Response(RootModel[SuccessResponse[ResponseDataType] | ErrorResponse]):
@classmethod
def validate_and_unwrap(cls, r: requests.Response) -> ResponseDataType:
return cls.model_validate_json(r.content).root.success_or_raise().data
class RequestPagination(ForbidExtraBaseModel):
@ -258,7 +255,7 @@ class AccessClient:
user_email: str | None = None,
employee_number: str | None = None,
onboard_time: UnixTimestampDateTime | None = None,
) -> Response[User]:
) -> User:
"""3.2 User Registration"""
class RegisterUserRequest(ForbidExtraBaseModel):
@ -278,7 +275,7 @@ class AccessClient:
r = self._session.post(f"{self._base_url}/users", json=body)
# TODO: not the complete User
return TypeAdapter(Response[User]).validate_json(r.content)
return Response[User].validate_and_unwrap(r)
def update_user( # noqa: PLR0913
self,
@ -289,7 +286,7 @@ class AccessClient:
employee_number: str | None = None,
onboard_time: UnixTimestampDateTime | None = None,
status: UserStatus | None = None,
) -> Response[None]:
) -> None:
"""3.3 Update User"""
class UpdateUserRequest(ForbidExtraBaseModel):
@ -311,23 +308,23 @@ class AccessClient:
r = self._session.put(f"{self._base_url}/users/{user_id}", json=body)
# TODO: maybe not the complete User
return TypeAdapter(Response[None]).validate_json(r.content)
return Response[None].validate_and_unwrap(r)
def fetch_user(
self, user_id: UserId, expand_access_policies: bool = False
) -> Response[FullUser]:
) -> FullUser:
"""3.4 Fetch User"""
params = {"expand[]": "access_policy"} if expand_access_policies else {}
r = self._session.get(f"{self._base_url}/users/{user_id}", params=params)
return TypeAdapter(Response[FullUser]).validate_json(r.content)
return Response[FullUser].validate_and_unwrap(r)
def fetch_all_users(
self,
expand_access_policies: bool = False,
page_num: int | None = None,
page_size: int | None = None,
) -> Response[list[FullUser]]:
) -> SuccessResponse[list[FullUser]]:
"""3.5 Fetch All Users"""
class FetchAllUsersParams(RequestPagination):
@ -341,22 +338,24 @@ class AccessClient:
).model_dump(exclude_none=True, by_alias=True)
r = self._session.get(f"{self._base_url}/users", params=params)
return TypeAdapter(Response[list[FullUser]]).validate_json(r.content)
return (
Response[list[FullUser]].model_validate_json(r.content).root
).success_or_raise()
def assign_access_policy_to_user(
self, user_id: UserId, access_policy_ids: list[AccessPolicyId]
) -> Response[None]:
) -> None:
"""3.6 Assign Access Policy to User"""
body = {"access_policy_ids": access_policy_ids}
r = self._session.put(
f"{self._base_url}/users/{user_id}/access_policies", json=body
)
return TypeAdapter(Response[None]).validate_json(r.content)
return Response[None].validate_and_unwrap(r)
def assign_nfc_card_to_user(
self, user_id: UserId, token: NfcCardToken, force_add: bool | None = None
) -> Response[None]:
) -> None:
"""3.7 Assign NFC Card to User"""
body = AssignNfcCardRequest(token=token, force_add=force_add).model_dump(
@ -364,37 +363,35 @@ class AccessClient:
)
r = self._session.put(f"{self._base_url}/users/{user_id}/nfc_cards", json=body)
return TypeAdapter(Response[None]).validate_json(r.content)
return Response[None].validate_and_unwrap(r)
def unassign_nfc_card_from_user(
self, user_id: UserId, token: NfcCardToken
) -> Response[None]:
def unassign_nfc_card_from_user(self, user_id: UserId, token: NfcCardToken) -> None:
"""3.8 Unassign NFC Card from User"""
body = {"token": token}
r = self._session.put(
f"{self._base_url}/users/{user_id}/nfc_cards/delete", json=body
)
return TypeAdapter(Response[None]).validate_json(r.content)
return Response[None].validate_and_unwrap(r)
def assign_pin_code_to_user(self, user_id: UserId, pin_code: str) -> Response[None]:
def assign_pin_code_to_user(self, user_id: UserId, pin_code: str) -> None:
"""3.9 Assign PIN Code to User"""
body = {"pin_code": pin_code}
r = self._session.put(f"{self._base_url}/users/{user_id}/pin_codes", json=body)
return TypeAdapter(Response[None]).validate_json(r.content)
return Response[None].validate_and_unwrap(r)
def unassign_pin_code_from_user(self, user_id: UserId) -> Response[None]:
def unassign_pin_code_from_user(self, user_id: UserId) -> None:
"""3.10 Unassign PIN Code from User"""
r = self._session.delete(f"{self._base_url}/users/{user_id}/pin_codes")
return TypeAdapter(Response[None]).validate_json(r.content)
return Response[None].validate_and_unwrap(r)
def create_user_group(
self,
name: str,
up_id: UserGroupId | None = None,
) -> Response[UserGroup | None]:
) -> UserGroup | None:
"""3.11 Create User Group
!!! note "The docs do not describe this as returning any data"
Returns None if the user group already existed
@ -409,29 +406,29 @@ class AccessClient:
)
r = self._session.post(f"{self._base_url}/user_groups", json=body)
return TypeAdapter(Response[UserGroup | None]).validate_json(r.content)
return Response[UserGroup | None].validate_and_unwrap(r)
def fetch_all_user_groups(self) -> Response[list[UserGroup]]:
def fetch_all_user_groups(self) -> list[UserGroup]:
"""3.12 Fetch All User Groupes"""
r = self._session.get(f"{self._base_url}/user_groups")
return TypeAdapter(Response[list[UserGroup]]).validate_json(r.content)
return Response[list[UserGroup]].validate_and_unwrap(r)
def fetch_user_group(
self,
user_group_id: UserGroupId,
) -> Response[UserGroup]:
) -> UserGroup:
"""3.13 Fetch User Group"""
r = self._session.get(f"{self._base_url}/user_groups/{user_group_id}")
return TypeAdapter(Response[UserGroup]).validate_json(r.content)
return Response[UserGroup].validate_and_unwrap(r)
def update_user_group(
self,
user_group_id: UserGroupId,
name: str,
up_id: UserGroupId | None = None,
) -> Response[None]:
) -> None:
"""3.14 Update User Group"""
class UpdateUserGroupRequest(ForbidExtraBaseModel):
@ -445,72 +442,70 @@ class AccessClient:
r = self._session.put(
f"{self._base_url}/user_groups/{user_group_id}", json=body
)
return TypeAdapter(Response[None]).validate_json(r.content)
return Response[None].validate_and_unwrap(r)
def delete_user_group(
self,
user_group_id: UserGroupId,
) -> Response[None]:
) -> None:
"""3.15 Delete User Group"""
r = self._session.delete(f"{self._base_url}/user_groups/{user_group_id}")
return TypeAdapter(Response[None]).validate_json(r.content)
return Response[None].validate_and_unwrap(r)
def assign_user_to_user_group(
self,
user_group_id: UserGroupId,
users: list[UserId],
) -> Response[None]:
) -> None:
"""3.16 Assign User to User Group"""
r = self._session.post(
f"{self._base_url}/user_groups/{user_group_id}/users", json=users
)
return TypeAdapter(Response[None]).validate_json(r.content)
return Response[None].validate_and_unwrap(r)
def unassign_user_from_user_group(
self,
user_group_id: UserGroupId,
users: list[UserId],
) -> Response[None]:
) -> None:
"""3.17 Unassign User from User Group"""
r = self._session.post(
f"{self._base_url}/user_groups/{user_group_id}/users/delete", json=users
)
return TypeAdapter(Response[None]).validate_json(r.content)
return Response[None].validate_and_unwrap(r)
def fetch_users_in_a_user_group(
self,
user_group_id: UserGroupId,
) -> Response[list[User]]:
) -> list[User]:
"""3.18 Fetch Users in a User Group"""
r = self._session.get(f"{self._base_url}/user_groups/{user_group_id}/users")
return TypeAdapter(Response[list[User]]).validate_json(r.content)
return Response[list[User]].validate_and_unwrap(r)
def fetch_all_users_in_a_user_group(
self, user_group_id: UserGroupId
) -> Response[list[User]]:
def fetch_all_users_in_a_user_group(self, user_group_id: UserGroupId) -> list[User]:
"""3.19 Fetch All Users in a User Group"""
r = self._session.get(f"{self._base_url}/user_groups/{user_group_id}/users/all")
return TypeAdapter(Response[list[User]]).validate_json(r.content)
return Response[list[User]].validate_and_unwrap(r)
def fetch_the_access_policies_assigned_to_a_user(
self, user_id: UserId, only_user_policies: bool | None = None
) -> Response[list[AccessPolicy]]:
) -> list[AccessPolicy]:
"""3.20 Fetch the Access Policies Assigned to a User"""
params = {"only_user_policies": "true" if only_user_policies else "false"}
r = self._session.get(
f"{self._base_url}/users/{user_id}/access_policies", params=params
)
return TypeAdapter(Response[list[AccessPolicy]]).validate_json(r.content)
return Response[list[AccessPolicy]].validate_and_unwrap(r)
def assign_access_policy_to_user_group(
self, user_group_id: UserGroupId, access_policy_ids: list[AccessPolicyId]
) -> Response[None]:
) -> None:
"""3.21 Assign Access Policy to User Group"""
body = {"access_policy_ids": access_policy_ids}
@ -518,18 +513,18 @@ class AccessClient:
r = self._session.put(
f"{self._base_url}/user_groups/{user_group_id}/access_policies", json=body
)
return TypeAdapter(Response[None]).validate_json(r.content)
return Response[None].validate_and_unwrap(r)
def fetch_the_access_policies_assigned_to_a_user_group(
self,
user_group_id: UserGroupId,
) -> Response[list[AccessPolicy]]:
) -> list[AccessPolicy]:
"""3.22 Fetch the Access Policies Assigned to a User Group"""
r = self._session.get(
f"{self._base_url}/user_groups/{user_group_id}/access_policies"
)
return TypeAdapter(Response[list[AccessPolicy]]).validate_json(r.content)
return Response[list[AccessPolicy]].validate_and_unwrap(r)
def create_visitor( # noqa: PLR0913
self,
@ -544,7 +539,7 @@ class AccessClient:
visitor_company: str | None = None,
resources: Sequence[Resource] | None = None,
week_schedule: WeekSchedule | None = None,
) -> Response[Visitor]:
) -> Visitor:
"""4.2 Create Visitor"""
class CreateVisitorRequest(ForbidExtraBaseModel):
@ -587,16 +582,16 @@ class AccessClient:
).model_dump(exclude_none=True)
r = self._session.post(f"{self._base_url}/visitors", json=body)
return TypeAdapter(Response[Visitor]).validate_json(r.content)
return Response[Visitor].validate_and_unwrap(r)
def fetch_visitor(
self,
visitor_id: VisitorId,
) -> Response[Visitor]:
) -> Visitor:
"""4.3 Fetch Visitor"""
r = self._session.get(f"{self._base_url}/visitors/{visitor_id}")
return TypeAdapter(Response[Visitor]).validate_json(r.content)
return Response[Visitor].validate_and_unwrap(r)
def fetch_all_visitors(
self,
@ -605,7 +600,7 @@ class AccessClient:
expand: list[FetchAllVisitorsExpansion] | None = None,
page_num: int | None = None,
page_size: int | None = None,
) -> Response[list[Visitor]]:
) -> SuccessResponse[list[Visitor]]:
"""4.4 Fetch All Visitors"""
class FetchAllVisitorsRequest(RequestPagination):
@ -634,7 +629,9 @@ class AccessClient:
).model_dump(exclude_none=True, by_alias=True)
r = self._session.get(f"{self._base_url}/visitors", params=params)
return TypeAdapter(Response[list[Visitor]]).validate_json(r.content)
return (
Response[list[Visitor]].model_validate_json(r.content).root
).success_or_raise()
def update_visitor( # noqa: PLR0913
self,
@ -650,7 +647,7 @@ class AccessClient:
visitor_company: str | None = None,
resources: Sequence[Resource] | None = None,
week_schedule: WeekSchedule | None = None,
) -> Response[Visitor]:
) -> Visitor:
"""4.5 Update Visitor"""
class UpdateVisitorRequest(ForbidExtraBaseModel):
@ -693,22 +690,22 @@ class AccessClient:
).model_dump(exclude_none=True)
r = self._session.put(f"{self._base_url}/visitors/{visitor_id}", json=body)
return TypeAdapter(Response[Visitor]).validate_json(r.content)
return Response[Visitor].validate_and_unwrap(r)
def delete_visitor(
self, visitor_id: VisitorId, is_force: bool | None = None
) -> Response[None]:
) -> None:
"""4.6 Delete Visitor"""
params = {"is_force": "true" if is_force else "false"}
r = self._session.delete(
f"{self._base_url}/visitors/{visitor_id}", params=params
)
return TypeAdapter(Response[None]).validate_json(r.content)
return Response[None].validate_and_unwrap(r)
def assign_nfc_card_to_visitor(
self, visitor_id: VisitorId, token: NfcCardToken, force_add: bool | None = None
) -> Response[None]:
) -> None:
"""4.7 Assign NFC Card to Visitor"""
body = AssignNfcCardRequest(token=token, force_add=force_add).model_dump(
exclude_none=True
@ -717,42 +714,40 @@ class AccessClient:
r = self._session.put(
f"{self._base_url}/visitors/{visitor_id}/nfc_cards", json=body
)
return TypeAdapter(Response[None]).validate_json(r.content)
return Response[None].validate_and_unwrap(r)
def unassign_nfc_card_from_visitor(
self, visitor_id: VisitorId, token: NfcCardToken
) -> Response[None]:
) -> None:
"""4.8 Unassign NFC Card from Visitor"""
body = {"token": token}
r = self._session.put(
f"{self._base_url}/visitors/{visitor_id}/nfc_cards/delete", json=body
)
return TypeAdapter(Response[None]).validate_json(r.content)
return Response[None].validate_and_unwrap(r)
def assign_pin_code_to_visitor(
self, visitor_id: VisitorId, pin_code: str
) -> Response[None]:
def assign_pin_code_to_visitor(self, visitor_id: VisitorId, pin_code: str) -> None:
"""4.9 Assign PIN Code to Visitor"""
body = {"pin_code": pin_code}
r = self._session.put(
f"{self._base_url}/visitors/{visitor_id}/pin_codes", json=body
)
return TypeAdapter(Response[None]).validate_json(r.content)
return Response[None].validate_and_unwrap(r)
def unassign_pin_code_from_visitor(self, visitor_id: VisitorId) -> Response[None]:
def unassign_pin_code_from_visitor(self, visitor_id: VisitorId) -> None:
"""4.10 Unassign PIN Code from Visitor"""
r = self._session.delete(f"{self._base_url}/visitors/{visitor_id}/pin_codes")
return TypeAdapter(Response[None]).validate_json(r.content)
return Response[None].validate_and_unwrap(r)
def create_access_policy(
self,
name: str,
schedule_id: ScheduleId,
resources: Sequence[Resource] | None = None,
) -> Response[AccessPolicy]:
) -> AccessPolicy:
"""5.2 Create Access Policy"""
class CreateAccessPolicyRequest(ForbidExtraBaseModel):
@ -765,7 +760,7 @@ class AccessClient:
).model_dump(exclude_none=True)
r = self._session.post(f"{self._base_url}/access_policies", json=body)
return TypeAdapter(Response[AccessPolicy]).validate_json(r.content)
return Response[AccessPolicy].validate_and_unwrap(r)
def update_access_policy(
self,
@ -773,7 +768,7 @@ class AccessClient:
name: str | None = None,
resources: Sequence[Resource] | None = None,
schedule_id: ScheduleId | None = None,
) -> Response[AccessPolicy]:
) -> AccessPolicy:
"""5.3 Update Access Policy"""
class UpdateAccessPolicyRequest(ForbidExtraBaseModel):
@ -788,36 +783,34 @@ class AccessClient:
r = self._session.put(
f"{self._base_url}/access_policies/{access_policy_id}", json=body
)
return TypeAdapter(Response[AccessPolicy]).validate_json(r.content)
return Response[AccessPolicy].validate_and_unwrap(r)
def delete_access_policy(
self, access_policy_id: AccessPolicyId
) -> Response[Literal["success"]]:
) -> Literal["success"]:
"""5.4 Delete Access Policy"""
r = self._session.delete(f"{self._base_url}/access_policies/{access_policy_id}")
return TypeAdapter(Response[Literal["success"]]).validate_json(r.content)
return Response[Literal["success"]].validate_and_unwrap(r)
def fetch_access_policy(
self, access_policy_id: AccessPolicyId
) -> Response[AccessPolicy]:
def fetch_access_policy(self, access_policy_id: AccessPolicyId) -> AccessPolicy:
"""5.5 Fetch Access Policy"""
r = self._session.get(f"{self._base_url}/access_policies/{access_policy_id}")
return TypeAdapter(Response[AccessPolicy]).validate_json(r.content)
return Response[AccessPolicy].validate_and_unwrap(r)
def fetch_all_access_policies(self) -> Response[list[AccessPolicy]]:
def fetch_all_access_policies(self) -> list[AccessPolicy]:
"""5.6 Fetch All Access Policies"""
r = self._session.get(f"{self._base_url}/access_policies")
return TypeAdapter(Response[list[AccessPolicy]]).validate_json(r.content)
return Response[list[AccessPolicy]].validate_and_unwrap(r)
def create_holiday_group(
self,
name: str,
description: str | None = None,
holidays: list[PartialHoliday] | None = None,
) -> Response[HolidayGroup]:
) -> HolidayGroup:
"""5.8 Create Holiday Group"""
class CreateHolidayGroupRequest(ForbidExtraBaseModel):
@ -832,7 +825,7 @@ class AccessClient:
r = self._session.post(
f"{self._base_url}/access_policies/holiday_groups", json=body
)
return TypeAdapter(Response[HolidayGroup]).validate_json(r.content)
return Response[HolidayGroup].validate_and_unwrap(r)
def update_holiday_group(
self,
@ -841,7 +834,7 @@ class AccessClient:
name: str | None = None,
description: str | None = None,
holidays: list[PartialHoliday] | None = None,
) -> Response[HolidayGroup]:
) -> HolidayGroup:
"""5.9 Update Holiday Group"""
class UpdateHolidayGroupRequest(ForbidExtraBaseModel):
@ -857,35 +850,31 @@ class AccessClient:
f"{self._base_url}/access_policies/holiday_groups/{holiday_group_id}",
json=body,
)
return TypeAdapter(Response[HolidayGroup]).validate_json(r.content)
return Response[HolidayGroup].validate_and_unwrap(r)
def delete_holiday_group(
self, holiday_group_id: HolidayGroupId
) -> Response[Literal["success"]]:
) -> Literal["success"]:
"""5.10 Delete Holiday Group"""
r = self._session.delete(
f"{self._base_url}/access_policies/holiday_groups/{holiday_group_id}"
)
return TypeAdapter(Response[Literal["success"]]).validate_json(r.content)
return Response[Literal["success"]].validate_and_unwrap(r)
def fetch_holiday_group(
self, holiday_group_id: HolidayGroupId
) -> Response[HolidayGroup]:
def fetch_holiday_group(self, holiday_group_id: HolidayGroupId) -> HolidayGroup:
"""5.11 Fetch Holiday Group"""
r = self._session.get(
f"{self._base_url}/access_policies/holiday_groups/{holiday_group_id}"
)
return TypeAdapter(Response[HolidayGroup]).validate_json(r.content)
return Response[HolidayGroup].validate_and_unwrap(r)
def fetch_all_holiday_groups(self) -> Response[list[FetchAllHolidayGroupsResponse]]:
def fetch_all_holiday_groups(self) -> list[FetchAllHolidayGroupsResponse]:
"""5.12 Fetch All Holiday Groups"""
r = self._session.get(f"{self._base_url}/access_policies/holiday_groups")
return TypeAdapter(Response[list[FetchAllHolidayGroupsResponse]]).validate_json(
r.content
)
return Response[list[FetchAllHolidayGroupsResponse]].validate_and_unwrap(r)
def create_schedule(
self,
@ -894,7 +883,7 @@ class AccessClient:
week_schedule: WeekSchedule,
holiday_group_id: HolidayGroupId | None = None,
holiday_schedule: list[TimePeriod] | None = None,
) -> Response[Schedule]:
) -> Schedule:
"""5.14 Create Schedule"""
class CreateScheduleRequest(ForbidExtraBaseModel):
@ -911,7 +900,7 @@ class AccessClient:
).model_dump(exclude_none=True)
r = self._session.post(f"{self._base_url}/access_policies/schedules", json=body)
return TypeAdapter(Response[Schedule]).validate_json(r.content)
return Response[Schedule].validate_and_unwrap(r)
def update_schedule(
self,
@ -922,7 +911,7 @@ class AccessClient:
week_schedule: WeekSchedule | None = None,
holiday_group_id: HolidayGroupId | None = None,
holiday_schedule: list[TimePeriod] | None = None,
) -> Response[dict[Any, Any]]: # TODO: this should really return dict[Never, Never]
) -> dict[Any, Any]: # TODO: this should really return dict[Never, Never]
"""5.15 Update Schedule"""
class UpdateScheduleRequest(ForbidExtraBaseModel):
@ -941,40 +930,38 @@ class AccessClient:
r = self._session.put(
f"{self._base_url}/access_policies/schedules/{schedule_id}", json=body
)
return TypeAdapter(Response[dict[Any, Any]]).validate_json(r.content)
return Response[dict[Any, Any]].validate_and_unwrap(r)
# NOTE: the API docs incorrectly define the response as a subset of `Schedule`
def fetch_schedule(self, schedule_id: ScheduleId) -> Response[Schedule]:
def fetch_schedule(self, schedule_id: ScheduleId) -> Schedule:
"""5.16 Fetch Schedule"""
r = self._session.get(
f"{self._base_url}/access_policies/schedules/{schedule_id}"
)
return TypeAdapter(Response[Schedule]).validate_json(r.content)
return Response[Schedule].validate_and_unwrap(r)
def fetch_all_schedules(self) -> Response[list[FetchAllSchedulesResponse]]:
def fetch_all_schedules(self) -> list[FetchAllSchedulesResponse]:
"""5.17 Fetch All Schedules"""
r = self._session.get(f"{self._base_url}/access_policies/schedules")
return TypeAdapter(Response[list[FetchAllSchedulesResponse]]).validate_json(
r.content
)
return Response[list[FetchAllSchedulesResponse]].validate_and_unwrap(r)
def delete_schedule(self, schedule_id: ScheduleId) -> Response[Literal["success"]]:
def delete_schedule(self, schedule_id: ScheduleId) -> Literal["success"]:
"""5.18 Delete Schedule"""
r = self._session.delete(
f"{self._base_url}/access_policies/schedules/{schedule_id}"
)
return TypeAdapter(Response[Literal["success"]]).validate_json(r.content)
return Response[Literal["success"]].validate_and_unwrap(r)
def generate_pin_code(self) -> Response[str]:
def generate_pin_code(self) -> str:
r = self._session.post(f"{self._base_url}/credentials/pin_codes")
return TypeAdapter(Response["str"]).validate_json(r.content)
return Response["str"].validate_and_unwrap(r)
def begin_enroll_card(
self, device_id: DeviceId, reset_ua_card: bool | None = None
) -> Response[EnrollNfcCardResponse]:
) -> EnrollNfcCardResponse:
"""6.2 Enroll NFC Card"""
class EnrollNFCCardRequest(ForbidExtraBaseModel):
@ -988,81 +975,79 @@ class AccessClient:
r = self._session.post(
f"{self._base_url}/credentials/nfc_cards/sessions", json=body
)
return TypeAdapter(Response[EnrollNfcCardResponse]).validate_json(r.content)
return Response[EnrollNfcCardResponse].validate_and_unwrap(r)
def fetch_enroll_card_status(
self,
session_id: NfcCardEnrollmentSessionId,
) -> Response[NfcCardEnrollmentStatus]:
) -> NfcCardEnrollmentStatus:
"""6.3 Fetch NFC Card Enrollment Status"""
r = self._session.get(
f"{self._base_url}/credentials/nfc_cards/sessions/{session_id}"
)
return TypeAdapter(Response[NfcCardEnrollmentStatus]).validate_json(r.content)
return Response[NfcCardEnrollmentStatus].validate_and_unwrap(r)
def remove_enrollment_session(
self,
session_id: NfcCardEnrollmentSessionId,
) -> Response[Literal["success"]]:
) -> Literal["success"]:
"""6.4 Remove a Session Created for NFC Card Enrollment"""
r = self._session.delete(
f"{self._base_url}/credentials/nfc_cards/sessions/{session_id}"
)
return TypeAdapter(Response[str]).validate_json(r.content)
return Response[Literal["success"]].validate_and_unwrap(r)
def fetch_nfc_card(self, nfc_card_token: NfcCardToken) -> Response[NfcCard]:
def fetch_nfc_card(self, nfc_card_token: NfcCardToken) -> NfcCard:
"""6.7 Fetch NFC Card"""
r = self._session.get(
f"{self._base_url}/credentials/nfc_cards/tokens/{nfc_card_token}"
)
return TypeAdapter(Response[NfcCard]).validate_json(r.content)
return Response[NfcCard].validate_and_unwrap(r)
def fetch_all_nfc_cards(
self, page_num: int | None = None, page_size: int | None = None
) -> Response[list[NfcCard]]:
) -> SuccessResponse[list[NfcCard]]:
"""6.8 Fetch NFC Cards"""
params = RequestPagination(page_num=page_num, page_size=page_size)
r = self._session.get(
f"{self._base_url}/credentials/nfc_cards/tokens", params=params
)
return TypeAdapter(Response[list[NfcCard]]).validate_json(r.content)
return (
Response[list[NfcCard]].model_validate_json(r.content).root
).success_or_raise()
def delete_nfc_card(
self, nfc_card_token: NfcCardToken
) -> Response[Literal["success"]]:
def delete_nfc_card(self, nfc_card_token: NfcCardToken) -> Literal["success"]:
"""6.7 Fetch NFC Card"""
r = self._session.delete(
f"{self._base_url}/credentials/nfc_cards/tokens/{nfc_card_token}"
)
return TypeAdapter(Response[Literal["success"]]).validate_json(r.content)
return Response[Literal["success"]].validate_and_unwrap(r)
def fetch_door_group_topology(self) -> Response[DoorGroupTopology]:
def fetch_door_group_topology(self) -> list[DoorGroupTopology]:
"""7.1 Fetch Door Group Topology"""
r = self._session.get(f"{self._base_url}/door_groups/topology")
return TypeAdapter(Response[list[DoorGroupTopology]]).validate_json(r.content)
return Response[list[DoorGroupTopology]].validate_and_unwrap(r)
def create_door_group(
self, group_name: str, resources: list[ResourceId]
) -> Response[DoorGroup[Resource]]:
) -> DoorGroup[Resource]:
"""7.2 Create Door Group"""
body = {"group_name": group_name, "resources": resources}
r = self._session.post(f"{self._base_url}/door_groups", json=body)
return TypeAdapter(Response[DoorGroup[Resource]]).validate_json(r.content)
return Response[DoorGroup[Resource]].validate_and_unwrap(r)
def fetch_door_group(
self, door_group_id: DoorGroupId
) -> Response[DoorGroup[NamedResource]]:
def fetch_door_group(self, door_group_id: DoorGroupId) -> DoorGroup[NamedResource]:
"""7.3 Fetch Door Group"""
r = self._session.get(f"{self._base_url}/door_groups/{door_group_id}")
return TypeAdapter(Response[DoorGroup[NamedResource]]).validate_json(r.content)
return Response[DoorGroup[NamedResource]].validate_and_unwrap(r)
def update_door_group(
self,
door_group_id: DoorGroupId,
group_name: str | None = None,
resources: list[ResourceId] | None = None,
) -> Response[DoorGroup[Resource]]:
) -> DoorGroup[Resource]:
"""7.4 Update Door Group"""
class UpdateDoorGroupRequest(ForbidExtraBaseModel):
@ -1076,39 +1061,37 @@ class AccessClient:
r = self._session.put(
f"{self._base_url}/door_groups/{door_group_id}", json=body
)
return TypeAdapter(Response[DoorGroup[Resource]]).validate_json(r.content)
return Response[DoorGroup[Resource]].validate_and_unwrap(r)
def fetch_all_door_groups(self) -> Response[list[DoorGroup[Resource]]]:
def fetch_all_door_groups(self) -> list[DoorGroup[Resource]]:
"""7.5 Fetch All Door Groups"""
r = self._session.get(f"{self._base_url}/door_groups")
return TypeAdapter(Response[list[DoorGroup[Resource]]]).validate_json(r.content)
return Response[list[DoorGroup[Resource]]].validate_and_unwrap(r)
def delete_door_group(
self, door_group_id: DoorGroupId
) -> Response[Literal["success"]]:
def delete_door_group(self, door_group_id: DoorGroupId) -> Literal["success"]:
"""7.6 Delete Door Group"""
r = self._session.get(f"{self._base_url}/door_groups/{door_group_id}")
return TypeAdapter(Response[Literal["success"]]).validate_json(r.content)
return Response[Literal["success"]].validate_and_unwrap(r)
def fetch_door(self, door_id: DoorId) -> Response[Door]:
def fetch_door(self, door_id: DoorId) -> Door:
"""7.7 Fetch Door"""
r = self._session.get(f"{self._base_url}/doors/{door_id}")
return TypeAdapter(Response[Door]).validate_json(r.content)
return Response[Door].validate_and_unwrap(r)
def fetch_all_doors(self) -> Response[list[Door]]:
def fetch_all_doors(self) -> list[Door]:
"""7.8 Fetch All Doors"""
r = self._session.get(f"{self._base_url}/doors")
return TypeAdapter(Response[list[Door]]).validate_json(r.content)
return Response[list[Door]].validate_and_unwrap(r)
def unlock_door(self, door_id: DoorId) -> Response[Literal["success"]]:
def unlock_door(self, door_id: DoorId) -> Literal["success"]:
"""7.9 Remote Door Unlocking"""
r = self._session.put(f"{self._base_url}/doors/{door_id}/unlock")
return TypeAdapter(Response[Literal["success"]]).validate_json(r.content)
return Response[Literal["success"]].validate_and_unwrap(r)
# XXX: could do a better job of typing for interval
def set_temporary_door_locking_rule(
self, door_id: DoorId, type_: DoorLockingRuleType, interval: int | None = None
) -> Response[Literal["success"]]:
) -> Literal["success"]:
"""7.10 Set Temporary Door Locking Rule"""
class SetTemporaryDoorLockingRuleRequest(ForbidExtraBaseModel):
@ -1119,36 +1102,36 @@ class AccessClient:
type=type_, interval=interval
).model_dump(exclude_none=True)
r = self._session.put(f"{self._base_url}/doors/{door_id}/lock_rule", json=body)
return TypeAdapter(Response[Literal["success"]]).validate_json(r.content)
return Response[Literal["success"]].validate_and_unwrap(r)
def fetch_door_locking_rule(self, door_id: DoorId) -> Response[DoorLockingRule]:
def fetch_door_locking_rule(self, door_id: DoorId) -> DoorLockingRule:
"""7.11 Fetch Door Locking Rule"""
r = self._session.get(f"{self._base_url}/doors/{door_id}/lock_rule")
return TypeAdapter(Response[DoorLockingRule]).validate_json(r.content)
return Response[DoorLockingRule].validate_and_unwrap(r)
# TODO: why is this two bools instead of an enum? Could probably be improved.
def set_door_emergency_status(
self,
lockdown: bool = False,
evacuation: bool = False,
) -> Response[Literal["success"]]:
) -> Literal["success"]:
"""7.12 Set Door Emergency Status"""
body = DoorEmergencyStatus(lockdown=lockdown, evacuation=evacuation).model_dump(
exclude_none=True
)
r = self._session.put(f"{self._base_url}/doors/settings/emergency", json=body)
return TypeAdapter(Response[Literal["success"]]).validate_json(r.content)
return Response[Literal["success"]].validate_and_unwrap(r)
def fetch_door_emergency_status(self) -> Response[DoorEmergencyStatus]:
def fetch_door_emergency_status(self) -> DoorEmergencyStatus:
"""7.13 Fetch Door Emergency Status"""
r = self._session.get(f"{self._base_url}/doors/settings/emergency")
return TypeAdapter(Response[DoorEmergencyStatus]).validate_json(r.content)
return Response[DoorEmergencyStatus].validate_and_unwrap(r)
def fetch_devices(self) -> Response[list[Device]]:
def fetch_devices(self) -> list[Device]:
"""8.1 Fetch Devices"""
r = self._session.get(f"{self._base_url}/devices")
return TypeAdapter(Response[list[Device]]).validate_json(r.content)
return Response[list[Device]].validate_and_unwrap(r)
def fetch_system_logs( # noqa: PLR0913
self,
@ -1158,7 +1141,7 @@ class AccessClient:
actor_id: ActorId | None = None,
page_num: int | None = None,
page_size: int | None = None,
) -> Response[FetchSystemLogsResponse]:
) -> SuccessResponse[FetchSystemLogsResponse]:
"""9.2 Fetch System Logs"""
params = RequestPagination(page_num=page_num, page_size=page_size).model_dump(
exclude_none=True
@ -1180,7 +1163,9 @@ class AccessClient:
r = self._session.post(
f"{self._base_url}/system/logs", params=params, json=body
)
return TypeAdapter(Response[FetchSystemLogsResponse]).validate_json(r.content)
return (
Response[FetchSystemLogsResponse].model_validate_json(r.content)
).root.success_or_raise()
def export_system_logs(
self,
@ -1212,7 +1197,7 @@ class AccessClient:
def send_unifi_identity_invitations(
self, users: list[IdentityInvitationUser]
) -> Response[list[IdentityInvitationEmailFailure]]:
) -> list[IdentityInvitationEmailFailure]:
"""10.1 Send UniFi Identity Invitations"""
body = TypeAdapter(list[IdentityInvitationUser]).dump_python(
users, exclude_none=True
@ -1221,13 +1206,11 @@ class AccessClient:
r = self._session.post(
f"{self._base_url}/users/identity/invitations", json=body
)
return TypeAdapter(
Response[list[IdentityInvitationEmailFailure]]
).validate_json(r.content)
return Response[list[IdentityInvitationEmailFailure]].validate_and_unwrap(r)
def fetch_available_resources(
self, resource_type: Iterable[IdentityResourceType] | None = None
) -> Response[dict[IdentityResourceType, list[IdentityResource]]]:
) -> dict[IdentityResourceType, list[IdentityResource]]:
"""10.2 Fetch Available Resources"""
params = {"resource_type": ",".join(resource_type)} if resource_type else None
@ -1235,41 +1218,41 @@ class AccessClient:
r = self._session.get(
f"{self._base_url}/users/identity/assignments", params=params
)
return TypeAdapter(
Response[dict[IdentityResourceType, list[IdentityResource]]]
).validate_json(r.content)
return Response[
dict[IdentityResourceType, list[IdentityResource]]
].validate_and_unwrap(r)
def assign_resources_to_user(
self,
user_id: UserId,
resource_type: IdentityResourceType,
resource_ids: list[IdentityResourceId],
) -> Response[None]:
) -> None:
"""10.3 Assign Resources to Users"""
body = {"resource_type": resource_type, "resource_ids": resource_ids}
r = self._session.post(
f"{self._base_url}/users/{user_id}/identity/assignments", json=body
)
return TypeAdapter(Response[None]).validate_json(r.content)
return Response[None].validate_and_unwrap(r)
def fetch_resources_assigned_to_user(
self,
user_id: UserId,
) -> Response[dict[IdentityResourceType, list[IdentityResource]]]:
) -> dict[IdentityResourceType, list[IdentityResource]]:
"""10.4 Fetch Resources Assigned to Users"""
r = self._session.get(f"{self._base_url}/users/{user_id}/identity/assignments")
return TypeAdapter(
Response[dict[IdentityResourceType, list[IdentityResource]]]
).validate_json(r.content)
return Response[
dict[IdentityResourceType, list[IdentityResource]]
].validate_and_unwrap(r)
def assign_resources_to_user_group(
self,
user_group_id: UserGroupId,
resource_type: IdentityResourceType,
resource_ids: list[IdentityResourceId],
) -> Response[None]:
) -> None:
"""10.5 Assign Resources to User Groups"""
body = {"resource_type": resource_type, "resource_ids": resource_ids}
@ -1277,17 +1260,17 @@ class AccessClient:
f"{self._base_url}/user_groups/{user_group_id}/identity/assignments",
json=body,
)
return TypeAdapter(Response[None]).validate_json(r.content)
return Response[None].validate_and_unwrap(r)
def fetch_resources_assigned_to_user_group(
self,
user_group_id: UserGroupId,
) -> Response[dict[IdentityResourceType, list[IdentityResource]]]:
) -> dict[IdentityResourceType, list[IdentityResource]]:
"""10.6 Fetch Resources Assigned to User Groups"""
r = self._session.get(
f"{self._base_url}/user_groups/{user_group_id}/identity/assignments"
)
return TypeAdapter(
Response[dict[IdentityResourceType, list[IdentityResource]]]
).validate_json(r.content)
return Response[
dict[IdentityResourceType, list[IdentityResource]]
].validate_and_unwrap(r)

View File

@ -9,11 +9,7 @@ from unifi_access.schemas._base import DoorResource
@pytest.fixture
def holiday_group(live_access_client: AccessClient) -> Generator[HolidayGroup]:
_holiday_group = (
live_access_client.create_holiday_group("Test Holiday Group")
.success_or_raise()
.data
)
_holiday_group = live_access_client.create_holiday_group("Test Holiday Group")
yield _holiday_group
live_access_client.delete_holiday_group(_holiday_group.id)
@ -22,22 +18,18 @@ def holiday_group(live_access_client: AccessClient) -> Generator[HolidayGroup]:
def schedule(
live_access_client: AccessClient, holiday_group: HolidayGroup
) -> Generator[Schedule]:
_schedule = (
live_access_client.create_schedule(
"Test Schedule",
week_schedule=WeekSchedule(
sunday=[],
monday=[],
tuesday=[],
wednesday=[],
thursday=[],
friday=[],
saturday=[],
),
holiday_group_id=holiday_group.id,
)
.success_or_raise()
.data
_schedule = live_access_client.create_schedule(
"Test Schedule",
week_schedule=WeekSchedule(
sunday=[],
monday=[],
tuesday=[],
wednesday=[],
thursday=[],
friday=[],
saturday=[],
),
holiday_group_id=holiday_group.id,
)
yield _schedule
live_access_client.delete_schedule(_schedule.id)
@ -47,10 +39,8 @@ def schedule(
def access_policy(
live_access_client: AccessClient, schedule: Schedule
) -> Generator[AccessPolicy]:
_access_policy = (
live_access_client.create_access_policy("Test Access Policy", schedule.id)
.success_or_raise()
.data
_access_policy = live_access_client.create_access_policy(
"Test Access Policy", schedule.id
)
yield _access_policy
live_access_client.delete_access_policy(_access_policy.id)
@ -63,25 +53,17 @@ def test_access_policy_lifecycle(
access_policy: AccessPolicy,
) -> None:
assert access_policy
doors = live_access_client.fetch_all_doors().success_or_raise().data
doors = live_access_client.fetch_all_doors()
resources = [DoorResource(id=door.id) for door in doors]
updated_access_policy = (
live_access_client.update_access_policy(access_policy.id, resources=resources)
.success_or_raise()
.data
updated_access_policy = live_access_client.update_access_policy(
access_policy.id, resources=resources
)
assert updated_access_policy.resources == resources
updated_holiday_group = (
live_access_client.update_holiday_group(
holiday_group.id, description="Test Description"
)
.success_or_raise()
.data
updated_holiday_group = live_access_client.update_holiday_group(
holiday_group.id, description="Test Description"
)
assert updated_holiday_group.description == "Test Description"
holiday_groups = (
live_access_client.fetch_all_holiday_groups().success_or_raise().data
)
holiday_groups = live_access_client.fetch_all_holiday_groups()
assert any(g.id == updated_holiday_group.id for g in holiday_groups)

View File

@ -14,7 +14,7 @@ def user(live_access_client: AccessClient) -> User:
# unique on the server, and I can't delete users. Maybe
# should fetch user if it already exists?
# TODO: add an onboard time?
return live_access_client.register_user("Test", "User").success_or_raise().data
return live_access_client.register_user("Test", "User")
def test_user_lifecycle(live_access_client: AccessClient, user: User) -> None:
@ -24,7 +24,7 @@ def test_user_lifecycle(live_access_client: AccessClient, user: User) -> None:
assert user.status == UserStatus.ACTIVE
# Fetch user
fetched_user = live_access_client.fetch_user(user.id).success_or_raise().data
fetched_user = live_access_client.fetch_user(user.id)
assert fetched_user.id == user.id
assert fetched_user.first_name == "Test"
assert fetched_user.last_name == "User"
@ -33,10 +33,10 @@ def test_user_lifecycle(live_access_client: AccessClient, user: User) -> None:
# Update user
live_access_client.update_user(
user.id, last_name="Updated User", status=UserStatus.DEACTIVATED
).success_or_raise()
)
# Fetch again, and check changes
fetched_user = live_access_client.fetch_user(user.id).success_or_raise().data
fetched_user = live_access_client.fetch_user(user.id)
assert fetched_user.id == user.id
assert fetched_user.first_name == "Test"
assert fetched_user.last_name == "Updated User"
@ -44,7 +44,7 @@ def test_user_lifecycle(live_access_client: AccessClient, user: User) -> None:
# Check for the user in full user list
# TODO: test pagination
all_users = live_access_client.fetch_all_users().success_or_raise().data
all_users = live_access_client.fetch_all_users().data
matching_user = next(u for u in all_users if u.id == user.id)
assert matching_user.id == user.id
assert matching_user.first_name == "Test"
@ -69,9 +69,7 @@ def make_user_group(
name: str,
up_id: UserGroupId | None = None,
) -> UserGroup | None:
group = (
live_access_client.create_user_group(name, up_id).success_or_raise().data
)
group = live_access_client.create_user_group(name, up_id)
created_groups.append(group)
return group
@ -92,13 +90,11 @@ def test_user_groups_lifecycle(
assert group, "group 'Test Group' already existed, please delete"
# Fetch group
fetched_group = (
live_access_client.fetch_user_group(group.id).success_or_raise().data
)
fetched_group = live_access_client.fetch_user_group(group.id)
assert fetched_group.name == "Test Group"
# Check for group in in full group list
groups = live_access_client.fetch_all_user_groups().success_or_raise().data
groups = live_access_client.fetch_all_user_groups()
matching_group = next(g for g in groups if g.id == group.id)
assert matching_group
@ -112,17 +108,11 @@ def test_user_groups_lifecycle(
live_access_client.assign_user_to_user_group(group.id, [user.id])
# Fetch users in the group
users_in_group = (
live_access_client.fetch_users_in_a_user_group(group.id).success_or_raise().data
)
users_in_group = live_access_client.fetch_users_in_a_user_group(group.id)
assert len(users_in_group) == 1
assert users_in_group[0].id == user.id
# Fetch all users in the group
all_users_in_group = (
live_access_client.fetch_all_users_in_a_user_group(group.id)
.success_or_raise()
.data
)
all_users_in_group = live_access_client.fetch_all_users_in_a_user_group(group.id)
assert len(all_users_in_group) == 1
assert all_users_in_group[0].id == user.id

View File

@ -1,5 +1,4 @@
import datetime
import functools
from collections.abc import Callable, Generator
import pytest
@ -12,11 +11,8 @@ from unifi_access.schemas import FetchAllVisitorsExpansion, Visitor, VisitReason
def make_visitor(live_access_client: AccessClient) -> Generator[Callable[..., Visitor]]:
created_visitors: list[Visitor] = []
@functools.wraps(live_access_client.create_visitor)
def _make_visitor(*args, **kwargs) -> Visitor:
visitor = (
live_access_client.create_visitor(*args, **kwargs).success_or_raise().data
)
def _make_visitor(*args, **kwargs) -> Visitor: # type: ignore
visitor = live_access_client.create_visitor(*args, **kwargs)
created_visitors.append(visitor)
return visitor
@ -39,63 +35,47 @@ def test_visitor_lifecycle(
)
assert visitor.first_name == "Test"
fetched_visitor = (
live_access_client.fetch_visitor(visitor.id).success_or_raise().data
)
fetched_visitor = live_access_client.fetch_visitor(visitor.id)
assert fetched_visitor.first_name == "Test"
assert fetched_visitor.pin_code is None
# pin code assign/unassign
pin_code = live_access_client.generate_pin_code().success_or_raise().data
live_access_client.assign_pin_code_to_visitor(
visitor.id, pin_code
).success_or_raise()
fetched_visitor = (
live_access_client.fetch_visitor(visitor.id).success_or_raise().data
)
pin_code = live_access_client.generate_pin_code()
live_access_client.assign_pin_code_to_visitor(visitor.id, pin_code)
fetched_visitor = live_access_client.fetch_visitor(visitor.id)
assert fetched_visitor.pin_code
assert fetched_visitor.pin_code.token
live_access_client.unassign_pin_code_from_visitor(visitor.id).success_or_raise()
fetched_visitor = (
live_access_client.fetch_visitor(visitor.id).success_or_raise().data
)
live_access_client.unassign_pin_code_from_visitor(visitor.id)
fetched_visitor = live_access_client.fetch_visitor(visitor.id)
assert fetched_visitor.pin_code is None
updated_visitor = (
live_access_client.update_visitor(visitor.id, first_name="Updated Test")
.success_or_raise()
.data
updated_visitor = live_access_client.update_visitor(
visitor.id, first_name="Updated Test"
)
assert updated_visitor.first_name == "Updated Test"
all_visitors = live_access_client.fetch_all_visitors().success_or_raise().data
all_visitors = live_access_client.fetch_all_visitors().data
matched_visitor = next(v for v in all_visitors if v.id == visitor.id)
assert matched_visitor.first_name == "Updated Test"
expanded_all_visitors = (
live_access_client.fetch_all_visitors(
expand=[
FetchAllVisitorsExpansion.ACCESS_POLICY,
FetchAllVisitorsExpansion.RESOURCE,
FetchAllVisitorsExpansion.SCHEDULE,
FetchAllVisitorsExpansion.NFC_CARD,
FetchAllVisitorsExpansion.PIN_CODE,
]
)
.success_or_raise()
.data
)
expanded_all_visitors = live_access_client.fetch_all_visitors(
expand=[
FetchAllVisitorsExpansion.ACCESS_POLICY,
FetchAllVisitorsExpansion.RESOURCE,
FetchAllVisitorsExpansion.SCHEDULE,
FetchAllVisitorsExpansion.NFC_CARD,
FetchAllVisitorsExpansion.PIN_CODE,
]
).data
expanded_matched_visitor = next(
v for v in expanded_all_visitors if v.id == visitor.id
)
assert expanded_matched_visitor.first_name == "Updated Test"
# TODO: test expanded contents
non_expanded_all_visitors = (
live_access_client.fetch_all_visitors(expand=[FetchAllVisitorsExpansion.NONE])
.success_or_raise()
.data
)
non_expanded_all_visitors = live_access_client.fetch_all_visitors(
expand=[FetchAllVisitorsExpansion.NONE]
).data
non_expanded_matched_visitor = next(
v for v in non_expanded_all_visitors if v.id == visitor.id
)

View File

@ -85,8 +85,8 @@ class AccessPolicyTests(UnifiAccessTests):
),
],
schedule_id=ScheduleId("4e108aeb-ec9a-4822-bf86-170ea986f934"),
).success_or_raise()
assert resp.data.id == "bb5eb965-42dc-4206-9654-88a2d1c3aaa5"
)
assert resp.id == "bb5eb965-42dc-4206-9654-88a2d1c3aaa5"
@responses.activate
def test_update_access_policy(self) -> None:
@ -155,8 +155,8 @@ class AccessPolicyTests(UnifiAccessTests):
),
],
schedule_id=ScheduleId("4e108aeb-ec9a-4822-bf86-170ea986f934"),
).success_or_raise()
assert resp.data.id == "bb5eb965-42dc-4206-9654-88a2d1c3aaa5"
)
assert resp.id == "bb5eb965-42dc-4206-9654-88a2d1c3aaa5"
@responses.activate
def test_delete_access_policy(self) -> None:
@ -168,11 +168,9 @@ class AccessPolicyTests(UnifiAccessTests):
json={"code": "SUCCESS", "msg": "success", "data": "success"},
)
resp = self.client.delete_access_policy(
access_policy_id=access_policy_id
).success_or_raise()
resp = self.client.delete_access_policy(access_policy_id=access_policy_id)
assert resp.data == "success"
assert resp == "success"
@responses.activate
def test_fetch_access_policy(self) -> None:
@ -203,11 +201,9 @@ class AccessPolicyTests(UnifiAccessTests):
},
)
resp = self.client.fetch_access_policy(
access_policy_id=access_policy_id
).success_or_raise()
resp = self.client.fetch_access_policy(access_policy_id=access_policy_id)
assert resp.data.id == "ed09985f-cf52-486e-bc33-377b6ed7bbf2"
assert resp.id == "ed09985f-cf52-486e-bc33-377b6ed7bbf2"
@responses.activate
def test_fetch_all_access_policies(self) -> None:
@ -272,8 +268,8 @@ class AccessPolicyTests(UnifiAccessTests):
},
)
resp = self.client.fetch_all_access_policies().success_or_raise()
assert resp.data[0].id == "73f15cab-c725-4a76-a419-a4026d131e96"
resp = self.client.fetch_all_access_policies()
assert resp[0].id == "73f15cab-c725-4a76-a419-a4026d131e96"
@responses.activate
def test_create_holiday_group(self) -> None:
@ -354,8 +350,8 @@ class AccessPolicyTests(UnifiAccessTests):
end_time=datetime.datetime(2023, 8, 27, tzinfo=datetime.UTC),
),
],
).success_or_raise()
assert resp.data.id == "7be7a7a0-818f-4f76-98c3-1c38957f4dca"
)
assert resp.id == "7be7a7a0-818f-4f76-98c3-1c38957f4dca"
@responses.activate
def test_update_holiday_group(self) -> None:
@ -443,8 +439,8 @@ class AccessPolicyTests(UnifiAccessTests):
end_time=datetime.datetime(2023, 8, 27, tzinfo=datetime.UTC),
),
],
).success_or_raise()
assert resp.data.id == "7be7a7a0-818f-4f76-98c3-1c38957f4dca"
)
assert resp.id == "7be7a7a0-818f-4f76-98c3-1c38957f4dca"
@responses.activate
def test_delete_holiday_group(self) -> None:
@ -457,11 +453,9 @@ class AccessPolicyTests(UnifiAccessTests):
json={"code": "SUCCESS", "msg": "success", "data": "success"},
)
resp = self.client.delete_holiday_group(
holiday_group_id=holiday_group_id
).success_or_raise()
resp = self.client.delete_holiday_group(holiday_group_id=holiday_group_id)
assert resp.data == "success"
assert resp == "success"
@responses.activate
def test_fetch_holiday_group(self) -> None:
@ -501,10 +495,8 @@ class AccessPolicyTests(UnifiAccessTests):
},
)
resp = self.client.fetch_holiday_group(
holiday_group_id=holiday_group_id
).success_or_raise()
assert resp.data.id == "7be7a7a0-818f-4f76-98c3-1c38957f4dca"
resp = self.client.fetch_holiday_group(holiday_group_id=holiday_group_id)
assert resp.id == "7be7a7a0-818f-4f76-98c3-1c38957f4dca"
@responses.activate
def test_fetch_all_holiday_groups(self) -> None:
@ -534,8 +526,8 @@ class AccessPolicyTests(UnifiAccessTests):
},
)
resp = self.client.fetch_all_holiday_groups().success_or_raise()
assert resp.data[0].id == "8cc22b49-a7f4-49a6-9f04-044444992d6c"
resp = self.client.fetch_all_holiday_groups()
assert resp[0].id == "8cc22b49-a7f4-49a6-9f04-044444992d6c"
@responses.activate
def test_create_schedule(self) -> None:
@ -669,7 +661,7 @@ class AccessPolicyTests(UnifiAccessTests):
end_time=datetime.time(19, 0, 59),
),
],
).success_or_raise()
)
@responses.activate
def test_update_schedule(self) -> None:
@ -755,7 +747,7 @@ class AccessPolicyTests(UnifiAccessTests):
end_time=datetime.time(11, 45, 59),
)
],
).success_or_raise()
)
@responses.activate
def test_fetch_schedule(self) -> None:
@ -821,7 +813,7 @@ class AccessPolicyTests(UnifiAccessTests):
},
)
resp = self.client.fetch_schedule(schedule_id=schedule_id).success_or_raise()
resp = self.client.fetch_schedule(schedule_id=schedule_id)
@responses.activate
def test_fetch_all_schedules(self) -> None:
@ -853,7 +845,7 @@ class AccessPolicyTests(UnifiAccessTests):
},
)
resp = self.client.fetch_all_schedules().success_or_raise()
resp = self.client.fetch_all_schedules()
@responses.activate
def test_delete_schedule(self) -> None:
@ -866,4 +858,4 @@ class AccessPolicyTests(UnifiAccessTests):
json={"code": "SUCCESS", "msg": "success", "data": "success"},
)
resp = self.client.delete_schedule(schedule_id=schedule_id).success_or_raise()
resp = self.client.delete_schedule(schedule_id=schedule_id)

View File

@ -15,8 +15,8 @@ class CredentialTests(UnifiAccessTests):
match=[matchers.header_matcher(self.common_headers)],
json={"code": "SUCCESS", "data": "67203419", "msg": "success"},
)
resp = self.client.generate_pin_code().success_or_raise()
assert resp.data == "67203419"
resp = self.client.generate_pin_code()
assert resp == "67203419"
@responses.activate
def test_begin_enroll_card(self) -> None:
@ -37,8 +37,8 @@ class CredentialTests(UnifiAccessTests):
)
resp = self.client.begin_enroll_card(
device_id=DeviceId("0418d6a2bb7a"), reset_ua_card=False
).success_or_raise()
assert resp.data.session_id == "e8a97c52-6676-4c48-8589-bd518afc4094"
)
assert resp.session_id == "e8a97c52-6676-4c48-8589-bd518afc4094"
@responses.activate
def test_fetch_enroll_card_status(self) -> None:
@ -59,12 +59,10 @@ class CredentialTests(UnifiAccessTests):
},
},
)
resp = self.client.fetch_enroll_card_status(
session_id=session_id
).success_or_raise()
assert resp.data.id == "014A3151"
resp = self.client.fetch_enroll_card_status(session_id=session_id)
assert resp.id == "014A3151"
assert (
resp.data.token
resp.token
== "821f90b262e90c5c0fbcddf3d6d2f3b94cc015d6e8104ab4fb96e4c8b8e90cb7"
)
@ -79,9 +77,7 @@ class CredentialTests(UnifiAccessTests):
],
json={"code": "SUCCESS", "msg": "success", "data": "success"},
)
resp = self.client.remove_enrollment_session(
session_id=session_id
).success_or_raise()
resp = self.client.remove_enrollment_session(session_id=session_id)
@responses.activate
def test_fetch_nfc_card(self) -> None:
@ -115,7 +111,7 @@ class CredentialTests(UnifiAccessTests):
},
)
resp = self.client.fetch_nfc_card(nfc_card_token).success_or_raise()
resp = self.client.fetch_nfc_card(nfc_card_token)
@responses.activate
def test_fetch_all_nfc_cards(self) -> None:
@ -169,9 +165,7 @@ class CredentialTests(UnifiAccessTests):
},
)
resp = self.client.fetch_all_nfc_cards(
page_num=1, page_size=25
).success_or_raise()
resp = self.client.fetch_all_nfc_cards(page_num=1, page_size=25)
@responses.activate
def test_delete_nfc_card(self) -> None:
@ -185,5 +179,5 @@ class CredentialTests(UnifiAccessTests):
json={"code": "SUCCESS", "data": "success", "msg": "success"},
)
resp = self.client.delete_nfc_card(nfc_card_token).success_or_raise()
assert resp.data == "success"
resp = self.client.delete_nfc_card(nfc_card_token)
assert resp == "success"

View File

@ -38,5 +38,5 @@ class CredentialTests(UnifiAccessTests):
"msg": "success",
},
)
resp = self.client.fetch_devices().success_or_raise()
assert resp.data[0].name == "UA-HUB-3855"
resp = self.client.fetch_devices()
assert resp[0].name == "UA-HUB-3855"

View File

@ -61,7 +61,7 @@ class SpaceTests(UnifiAccessTests):
},
)
resp = self.client.fetch_door_group_topology().success_or_raise()
resp = self.client.fetch_door_group_topology()
@responses.activate
def test_create_door_group(self) -> None:
@ -95,7 +95,7 @@ class SpaceTests(UnifiAccessTests):
resp = self.client.create_door_group(
group_name="Test",
resources=[DoorId("6ff875d2-af87-470b-9cb5-774c6596afc8")],
).success_or_raise()
)
@responses.activate
def test_fetch_door_group_building(self) -> None:
@ -133,9 +133,7 @@ class SpaceTests(UnifiAccessTests):
},
)
resp = self.client.fetch_door_group(
door_group_id=door_group_id
).success_or_raise()
resp = self.client.fetch_door_group(door_group_id=door_group_id)
@responses.activate
def test_fetch_door_group_customized_groups(self) -> None:
@ -163,9 +161,7 @@ class SpaceTests(UnifiAccessTests):
},
)
resp = self.client.fetch_door_group(
door_group_id=door_group_id
).success_or_raise()
resp = self.client.fetch_door_group(door_group_id=door_group_id)
@responses.activate
def test_update_door_group(self) -> None:
@ -207,7 +203,7 @@ class SpaceTests(UnifiAccessTests):
DoorId("5a2c3d4e-1f6b-4c8d-9e0f-2a3b4c5d6e7f"),
DoorGroupId("2p3q4r5s-6t7u-8v9w-x0y1-z2a3b4c5d6e"),
],
).success_or_raise()
)
@responses.activate
def test_update_door_group_delete_resources(self) -> None:
@ -235,7 +231,7 @@ class SpaceTests(UnifiAccessTests):
resp = self.client.update_door_group(
door_group_id=door_group_id,
resources=[],
).success_or_raise()
)
@responses.activate
def test_fetch_all_door_groups(self) -> None:
@ -273,7 +269,7 @@ class SpaceTests(UnifiAccessTests):
},
)
resp = self.client.fetch_all_door_groups().success_or_raise()
resp = self.client.fetch_all_door_groups()
@responses.activate
def test_delete_door_groups(self) -> None:
@ -285,9 +281,7 @@ class SpaceTests(UnifiAccessTests):
json={"code": "SUCCESS", "data": "success", "msg": "success"},
)
resp = self.client.delete_door_group(
door_group_id=door_group_id
).success_or_raise()
resp = self.client.delete_door_group(door_group_id=door_group_id)
@responses.activate
def test_fetch_door(self) -> None:
@ -312,7 +306,7 @@ class SpaceTests(UnifiAccessTests):
},
)
resp = self.client.fetch_door(door_id=door_id).success_or_raise()
resp = self.client.fetch_door(door_id=door_id)
@responses.activate
def test_fetch_all_doors(self) -> None:
@ -348,7 +342,7 @@ class SpaceTests(UnifiAccessTests):
},
)
resp = self.client.fetch_all_doors().success_or_raise()
resp = self.client.fetch_all_doors()
@responses.activate
def test_unlock_door(self) -> None:
@ -360,7 +354,7 @@ class SpaceTests(UnifiAccessTests):
json={"code": "SUCCESS", "data": "success", "msg": "success"},
)
resp = self.client.unlock_door(door_id=door_id).success_or_raise()
resp = self.client.unlock_door(door_id=door_id)
@responses.activate
def test_set_temporary_door_locking_rule(self) -> None:
@ -378,7 +372,7 @@ class SpaceTests(UnifiAccessTests):
# Customized 10-minute unlocked
resp = self.client.set_temporary_door_locking_rule(
door_id=door_id, type_=DoorLockingRuleType.CUSTOM, interval=10
).success_or_raise()
)
# XXX: TODO: test the other examples?
@ -397,7 +391,7 @@ class SpaceTests(UnifiAccessTests):
},
)
resp = self.client.fetch_door_locking_rule(door_id=door_id).success_or_raise()
resp = self.client.fetch_door_locking_rule(door_id=door_id)
# XXX: TODO: test the other examples?
@responses.activate
@ -413,9 +407,7 @@ class SpaceTests(UnifiAccessTests):
)
# Keep it locked
resp = self.client.set_door_emergency_status(
lockdown=True, evacuation=False
).success_or_raise()
resp = self.client.set_door_emergency_status(lockdown=True, evacuation=False)
# XXX: TODO: test the other examples?
@responses.activate
@ -431,4 +423,4 @@ class SpaceTests(UnifiAccessTests):
},
)
resp = self.client.fetch_door_emergency_status().success_or_raise()
resp = self.client.fetch_door_emergency_status()

View File

@ -81,7 +81,7 @@ class SystemLogTests(UnifiAccessTests):
since=datetime.datetime.fromtimestamp(1690770546, tz=datetime.UTC),
until=datetime.datetime.fromtimestamp(1690771546, tz=datetime.UTC),
actor_id=UserId("3e1f196e-c97b-4748-aecb-eab5e9c251b2"),
).success_or_raise()
)
@responses.activate
def test_export_system_logs(self) -> None:

View File

@ -38,7 +38,7 @@ class UnifiIdentityTests(UnifiAccessTests):
email="example@*.com",
)
]
).success_or_raise()
)
@responses.activate
def test_send_unifi_identity_invitations_email_failure(self) -> None:
@ -78,7 +78,7 @@ class UnifiIdentityTests(UnifiAccessTests):
email="example@*.com",
)
]
).success_or_raise()
)
@responses.activate
def test_fetch_available_resources(self) -> None:
@ -122,7 +122,7 @@ class UnifiIdentityTests(UnifiAccessTests):
IdentityResourceType.WIFI,
IdentityResourceType.VPN,
]
).success_or_raise()
)
@responses.activate
def test_assign_resources_to_user(self) -> None:
@ -142,11 +142,11 @@ class UnifiIdentityTests(UnifiAccessTests):
json={"code": "SUCCESS", "data": None, "msg": "success"},
)
resp = self.client.assign_resources_to_user(
self.client.assign_resources_to_user(
user_id=user_id,
resource_type=IdentityResourceType.WIFI,
resource_ids=[IdentityResourceId("65dff9a8c188cb71cfac8e9a")],
).success_or_raise()
)
@responses.activate
def test_fetch_resources_assigned_to_user(self) -> None:
@ -182,9 +182,7 @@ class UnifiIdentityTests(UnifiAccessTests):
},
)
resp = self.client.fetch_resources_assigned_to_user(
user_id=user_id
).success_or_raise()
resp = self.client.fetch_resources_assigned_to_user(user_id=user_id)
@responses.activate
def test_assign_resources_to_user_group(self) -> None:
@ -204,11 +202,11 @@ class UnifiIdentityTests(UnifiAccessTests):
json={"code": "SUCCESS", "data": None, "msg": "success"},
)
resp = self.client.assign_resources_to_user_group(
self.client.assign_resources_to_user_group(
user_group_id=user_group_id,
resource_type=IdentityResourceType.WIFI,
resource_ids=[IdentityResourceId("65dff9a8c188cb71cfac8e9a")],
).success_or_raise()
)
@responses.activate
def test_fetch_resources_assigned_to_user_group(self) -> None:
@ -246,4 +244,4 @@ class UnifiIdentityTests(UnifiAccessTests):
resp = self.client.fetch_resources_assigned_to_user_group(
user_group_id=user_group_id
).success_or_raise()
)

View File

@ -61,8 +61,8 @@ class UserTests(UnifiAccessTests):
employee_number="100000",
onboard_time=datetime.datetime.fromtimestamp(1689150139, tz=datetime.UTC),
user_email="example@*.com",
).success_or_raise()
assert resp.data.id == "37f2b996-c2c5-487b-aa22-8b453ff14a4b"
)
assert resp.id == "37f2b996-c2c5-487b-aa22-8b453ff14a4b"
@responses.activate
def test_update_user(self) -> None:
@ -88,7 +88,7 @@ class UserTests(UnifiAccessTests):
json={"code": "SUCCESS", "msg": "success", "data": None},
)
resp = self.client.update_user(
self.client.update_user(
user_id=user_id,
first_name="H",
last_name="L",
@ -96,8 +96,7 @@ class UserTests(UnifiAccessTests):
onboard_time=datetime.datetime.fromtimestamp(1689150139, tz=datetime.UTC),
user_email="example@*.com",
status=UserStatus.ACTIVE,
).success_or_raise()
assert resp.data is None
)
@responses.activate
def test_fetch_user(self) -> None:
@ -164,9 +163,7 @@ class UserTests(UnifiAccessTests):
},
)
resp = self.client.fetch_user(
user_id=user_id, expand_access_policies=True
).success_or_raise()
resp = self.client.fetch_user(user_id=user_id, expand_access_policies=True)
# TODO: verify correctness of data?
@responses.activate
@ -261,7 +258,7 @@ class UserTests(UnifiAccessTests):
resp = self.client.fetch_all_users(
expand_access_policies=True, page_num=1, page_size=25
).success_or_raise()
)
assert resp.pagination
# TODO: verify correctness of data?
@ -284,14 +281,13 @@ class UserTests(UnifiAccessTests):
],
json={"code": "SUCCESS", "msg": "success"},
)
resp = self.client.assign_access_policy_to_user(
self.client.assign_access_policy_to_user(
user_id=user_id,
access_policy_ids=[
AccessPolicyId("03895c7f-9f53-4334-812b-5db9c122c109"),
AccessPolicyId("3b6bcb0c-7498-44cf-8615-00a96d824cbe"),
],
).success_or_raise()
assert resp.data is None
)
@responses.activate
def test_assign_nfc_card_to_user(self) -> None:
@ -310,14 +306,13 @@ class UserTests(UnifiAccessTests):
],
json={"code": "SUCCESS", "msg": "success"},
)
resp = self.client.assign_nfc_card_to_user(
self.client.assign_nfc_card_to_user(
user_id=user_id,
token=NfcCardToken(
"d27822fc682b478dc637c6db01813e465174df6e54ca515d8427db623cfda1d0"
),
force_add=True,
).success_or_raise()
assert resp.data is None
)
@responses.activate
def test_unassign_nfc_card_from_user(self) -> None:
@ -335,13 +330,12 @@ class UserTests(UnifiAccessTests):
],
json={"code": "SUCCESS", "msg": "success"},
)
resp = self.client.unassign_nfc_card_from_user(
self.client.unassign_nfc_card_from_user(
user_id=user_id,
token=NfcCardToken(
"d27822fc682b478dc637c6db01813e465174df6e54ca515d8427db623cfda1d0"
),
).success_or_raise()
assert resp.data is None
)
@responses.activate
def test_assign_pin_code_to_user(self) -> None:
@ -355,10 +349,7 @@ class UserTests(UnifiAccessTests):
],
json={"code": "SUCCESS", "msg": "success"},
)
resp = self.client.assign_pin_code_to_user(
user_id=user_id, pin_code="57301208"
).success_or_raise()
assert resp.data is None
self.client.assign_pin_code_to_user(user_id=user_id, pin_code="57301208")
@responses.activate
def test_unassign_pin_code_from_user(self) -> None:
@ -371,10 +362,9 @@ class UserTests(UnifiAccessTests):
],
json={"code": "SUCCESS", "msg": "success"},
)
resp = self.client.unassign_pin_code_from_user(
self.client.unassign_pin_code_from_user(
user_id=user_id,
).success_or_raise()
assert resp.data is None
)
@responses.activate
def test_create_user_group(self) -> None:
@ -406,7 +396,7 @@ class UserTests(UnifiAccessTests):
resp = self.client.create_user_group(
name="Group Name", up_id=UserGroupId("013d05d3-7262-4908-ba69-badbbbf8f5a6")
).success_or_raise()
)
@responses.activate
def test_fetch_all_user_groups(self) -> None:
@ -429,7 +419,7 @@ class UserTests(UnifiAccessTests):
},
)
resp = self.client.fetch_all_user_groups().success_or_raise()
resp = self.client.fetch_all_user_groups()
@responses.activate
def test_fetch_user_group(self) -> None:
@ -452,9 +442,7 @@ class UserTests(UnifiAccessTests):
},
)
resp = self.client.fetch_user_group(
user_group_id=user_group_id
).success_or_raise()
resp = self.client.fetch_user_group(user_group_id=user_group_id)
@responses.activate
def test_update_user_group(self) -> None:
@ -474,11 +462,11 @@ class UserTests(UnifiAccessTests):
json={"code": "SUCCESS", "msg": "success"},
)
resp = self.client.update_user_group(
self.client.update_user_group(
user_group_id=user_group_id,
name="Group Name",
up_id=UserGroupId("013d05d3-7262-4908-ba69-badbbbf8f5a6"),
).success_or_raise()
)
@responses.activate
def test_delete_user_group(self) -> None:
@ -491,9 +479,7 @@ class UserTests(UnifiAccessTests):
json={"code": "SUCCESS", "msg": "success"},
)
resp = self.client.delete_user_group(
user_group_id=user_group_id
).success_or_raise()
self.client.delete_user_group(user_group_id=user_group_id)
@responses.activate
def test_assign_user_to_user_group(self) -> None:
@ -514,13 +500,13 @@ class UserTests(UnifiAccessTests):
json={"code": "SUCCESS", "msg": "success"},
)
resp = self.client.assign_user_to_user_group(
self.client.assign_user_to_user_group(
user_group_id=user_group_id,
users=[
UserId("7c6e9102-acb7-4b89-8ed4-7561e6fb706c"),
UserId("fd63bc4c-52e0-4dbf-a699-e1233339c73b"),
],
).success_or_raise()
)
@responses.activate
def test_unassign_user_from_user_group(self) -> None:
@ -541,13 +527,13 @@ class UserTests(UnifiAccessTests):
json={"code": "SUCCESS", "msg": "success"},
)
resp = self.client.unassign_user_from_user_group(
self.client.unassign_user_from_user_group(
user_group_id=user_group_id,
users=[
UserId("7c6e9102-acb7-4b89-8ed4-7561e6fb706c"),
UserId("fd63bc4c-52e0-4dbf-a699-e1233339c73b"),
],
).success_or_raise()
)
@responses.activate
def test_fetch_users_in_a_user_group(self) -> None:
@ -583,7 +569,7 @@ class UserTests(UnifiAccessTests):
resp = self.client.fetch_users_in_a_user_group(
user_group_id=user_group_id,
).success_or_raise()
)
@responses.activate
def test_fetch_all_users_in_a_user_group(self) -> None:
@ -619,7 +605,7 @@ class UserTests(UnifiAccessTests):
resp = self.client.fetch_all_users_in_a_user_group(
user_group_id=user_group_id,
).success_or_raise()
)
@responses.activate
def test_fetch_the_access_policies_assigned_to_a_user(self) -> None:
@ -659,7 +645,7 @@ class UserTests(UnifiAccessTests):
resp = self.client.fetch_the_access_policies_assigned_to_a_user(
user_id=user_id, only_user_policies=False
).success_or_raise()
)
@responses.activate
def test_assign_access_policy_to_user_group(self) -> None:
@ -677,10 +663,10 @@ class UserTests(UnifiAccessTests):
json={"code": "SUCCESS", "data": None, "msg": "success"},
)
resp = self.client.assign_access_policy_to_user_group(
self.client.assign_access_policy_to_user_group(
user_group_id=user_group_id,
access_policy_ids=[AccessPolicyId("bbe48a65-2ac1-4bf6-bd65-bc8f9ee7fb75")],
).success_or_raise()
)
@responses.activate
def test_fetch_the_access_policies_assigned_to_a_user_group(self) -> None:
@ -717,4 +703,4 @@ class UserTests(UnifiAccessTests):
resp = self.client.fetch_the_access_policies_assigned_to_a_user_group(
user_group_id=user_group_id,
).success_or_raise()
)

View File

@ -177,8 +177,8 @@ class VisitorTests(UnifiAccessTests):
id=DoorGroupId("d5573467-d6b3-4e8f-8e48-8a322b91664a")
),
],
).success_or_raise()
assert resp.data.id == "fbe8d920-47d3-4cfd-bda7-bf4b0e26f73c"
)
assert resp.id == "fbe8d920-47d3-4cfd-bda7-bf4b0e26f73c"
@responses.activate
def test_fetch_visitor(self) -> None:
@ -226,7 +226,7 @@ class VisitorTests(UnifiAccessTests):
},
)
resp = self.client.fetch_visitor(visitor_id=visitor_id).success_or_raise()
resp = self.client.fetch_visitor(visitor_id=visitor_id)
# TODO: verify correctness of data?
@responses.activate
@ -279,9 +279,7 @@ class VisitorTests(UnifiAccessTests):
},
)
resp = self.client.fetch_all_visitors(
page_num=1, page_size=25
).success_or_raise()
resp = self.client.fetch_all_visitors(page_num=1, page_size=25)
assert resp.pagination
# TODO: verify correctness of data?
@ -335,7 +333,7 @@ class VisitorTests(UnifiAccessTests):
},
)
resp = self.client.fetch_all_visitors(keyword="H").success_or_raise()
resp = self.client.fetch_all_visitors(keyword="H")
assert resp.pagination
# TODO: verify correctness of data?
@ -407,7 +405,7 @@ class VisitorTests(UnifiAccessTests):
FetchAllVisitorsExpansion.NFC_CARD,
FetchAllVisitorsExpansion.PIN_CODE,
]
).success_or_raise()
)
assert resp.pagination
# TODO: verify correctness of data?
@ -452,9 +450,7 @@ class VisitorTests(UnifiAccessTests):
},
)
resp = self.client.fetch_all_visitors(
expand=[FetchAllVisitorsExpansion.NONE]
).success_or_raise()
resp = self.client.fetch_all_visitors(expand=[FetchAllVisitorsExpansion.NONE])
assert resp.pagination
# TODO: verify correctness of data?
@ -618,7 +614,7 @@ class VisitorTests(UnifiAccessTests):
id=DoorGroupId("d5573467-d6b3-4e8f-8e48-8a322b91664a")
),
],
).success_or_raise()
)
@responses.activate
def test_delete_visitor(self) -> None:
@ -633,11 +629,7 @@ class VisitorTests(UnifiAccessTests):
json={"code": "SUCCESS", "data": None, "msg": "success"},
)
resp = self.client.delete_visitor(
visitor_id=visitor_id,
is_force=True,
).success_or_raise()
# TODO: verify correctness of data?
self.client.delete_visitor(visitor_id=visitor_id, is_force=True)
@responses.activate
def test_assign_nfc_card_to_visitor(self) -> None:
@ -657,13 +649,13 @@ class VisitorTests(UnifiAccessTests):
json={"code": "SUCCESS", "msg": "success"},
)
resp = self.client.assign_nfc_card_to_visitor(
self.client.assign_nfc_card_to_visitor(
visitor_id=visitor_id,
token=NfcCardToken(
"d27822fc682b478dc637c6db01813e465174df6e54ca515d8427db623cfda1d0"
),
force_add=True,
).success_or_raise()
)
@responses.activate
def test_unassign_nfc_card_from_visitor(self) -> None:
@ -682,12 +674,12 @@ class VisitorTests(UnifiAccessTests):
json={"code": "SUCCESS", "msg": "success"},
)
resp = self.client.unassign_nfc_card_from_visitor(
self.client.unassign_nfc_card_from_visitor(
visitor_id=visitor_id,
token=NfcCardToken(
"d27822fc682b478dc637c6db01813e465174df6e54ca515d8427db623cfda1d0"
),
).success_or_raise()
)
@responses.activate
def test_assign_pin_code_to_visitor(self) -> None:
@ -702,10 +694,10 @@ class VisitorTests(UnifiAccessTests):
json={"code": "SUCCESS", "msg": "success"},
)
resp = self.client.assign_pin_code_to_visitor(
self.client.assign_pin_code_to_visitor(
visitor_id=visitor_id,
pin_code="57301208",
).success_or_raise()
)
@responses.activate
def test_unassign_pin_code_from_visitor(self) -> None:
@ -719,6 +711,4 @@ class VisitorTests(UnifiAccessTests):
json={"code": "SUCCESS", "msg": "success"},
)
resp = self.client.unassign_pin_code_from_visitor(
visitor_id=visitor_id
).success_or_raise()
self.client.unassign_pin_code_from_visitor(visitor_id=visitor_id)