From 6cc8915bf1cdfffef7fbeb3c5673638fadb4157d Mon Sep 17 00:00:00 2001 From: Adam Goldsmith Date: Sat, 16 Nov 2024 01:38:07 -0500 Subject: [PATCH] Simplify return types from api methods, always raise exceptions --- src/unifi_access/_client.py | 349 +++++++++++++-------------- tests/live/test_access_policy.py | 60 ++--- tests/live/test_user_lifecycle.py | 30 +-- tests/live/test_visitor_lifecycle.py | 66 ++--- tests/test_access_policy.py | 58 ++--- tests/test_credential.py | 30 +-- tests/test_device.py | 4 +- tests/test_space.py | 38 ++- tests/test_system_log.py | 2 +- tests/test_unifi_identity.py | 20 +- tests/test_user.py | 76 +++--- tests/test_visitor.py | 42 ++-- 12 files changed, 331 insertions(+), 444 deletions(-) diff --git a/src/unifi_access/_client.py b/src/unifi_access/_client.py index a6ebb07..713c5a3 100644 --- a/src/unifi_access/_client.py +++ b/src/unifi_access/_client.py @@ -7,9 +7,9 @@ import requests from pydantic import ( ConfigDict, Field, + RootModel, TypeAdapter, ) -from typing_extensions import TypeAliasType from unifi_access.schemas import ( AccessPolicy, @@ -196,8 +196,7 @@ class ResponsePagination(ForbidExtraBaseModel): total: int -# TODO: this has nicer syntax in Python 3.12, but not currently -# supported in Pydantic +# TODO: this has nicer syntax in Python 3.12, but not currently supported in Pydantic ResponseDataType = TypeVar("ResponseDataType") @@ -214,12 +213,10 @@ class SuccessResponse(ForbidExtraBaseModel, Generic[ResponseDataType]): return self -# TODO: this is natively supported by the `type` keyword in 3.12 -Response = TypeAliasType( - "Response", - SuccessResponse[ResponseDataType] | ErrorResponse, - type_params=(ResponseDataType,), -) +class Response(RootModel[SuccessResponse[ResponseDataType] | ErrorResponse]): + @classmethod + def validate_and_unwrap(cls, r: requests.Response) -> ResponseDataType: + return cls.model_validate_json(r.content).root.success_or_raise().data class RequestPagination(ForbidExtraBaseModel): @@ -258,7 +255,7 @@ class AccessClient: user_email: str | None = None, employee_number: str | None = None, onboard_time: UnixTimestampDateTime | None = None, - ) -> Response[User]: + ) -> User: """3.2 User Registration""" class RegisterUserRequest(ForbidExtraBaseModel): @@ -278,7 +275,7 @@ class AccessClient: r = self._session.post(f"{self._base_url}/users", json=body) # TODO: not the complete User - return TypeAdapter(Response[User]).validate_json(r.content) + return Response[User].validate_and_unwrap(r) def update_user( # noqa: PLR0913 self, @@ -289,7 +286,7 @@ class AccessClient: employee_number: str | None = None, onboard_time: UnixTimestampDateTime | None = None, status: UserStatus | None = None, - ) -> Response[None]: + ) -> None: """3.3 Update User""" class UpdateUserRequest(ForbidExtraBaseModel): @@ -311,23 +308,23 @@ class AccessClient: r = self._session.put(f"{self._base_url}/users/{user_id}", json=body) # TODO: maybe not the complete User - return TypeAdapter(Response[None]).validate_json(r.content) + return Response[None].validate_and_unwrap(r) def fetch_user( self, user_id: UserId, expand_access_policies: bool = False - ) -> Response[FullUser]: + ) -> FullUser: """3.4 Fetch User""" params = {"expand[]": "access_policy"} if expand_access_policies else {} r = self._session.get(f"{self._base_url}/users/{user_id}", params=params) - return TypeAdapter(Response[FullUser]).validate_json(r.content) + return Response[FullUser].validate_and_unwrap(r) def fetch_all_users( self, expand_access_policies: bool = False, page_num: int | None = None, page_size: int | None = None, - ) -> Response[list[FullUser]]: + ) -> SuccessResponse[list[FullUser]]: """3.5 Fetch All Users""" class FetchAllUsersParams(RequestPagination): @@ -341,22 +338,24 @@ class AccessClient: ).model_dump(exclude_none=True, by_alias=True) r = self._session.get(f"{self._base_url}/users", params=params) - return TypeAdapter(Response[list[FullUser]]).validate_json(r.content) + return ( + Response[list[FullUser]].model_validate_json(r.content).root + ).success_or_raise() def assign_access_policy_to_user( self, user_id: UserId, access_policy_ids: list[AccessPolicyId] - ) -> Response[None]: + ) -> None: """3.6 Assign Access Policy to User""" body = {"access_policy_ids": access_policy_ids} r = self._session.put( f"{self._base_url}/users/{user_id}/access_policies", json=body ) - return TypeAdapter(Response[None]).validate_json(r.content) + return Response[None].validate_and_unwrap(r) def assign_nfc_card_to_user( self, user_id: UserId, token: NfcCardToken, force_add: bool | None = None - ) -> Response[None]: + ) -> None: """3.7 Assign NFC Card to User""" body = AssignNfcCardRequest(token=token, force_add=force_add).model_dump( @@ -364,37 +363,35 @@ class AccessClient: ) r = self._session.put(f"{self._base_url}/users/{user_id}/nfc_cards", json=body) - return TypeAdapter(Response[None]).validate_json(r.content) + return Response[None].validate_and_unwrap(r) - def unassign_nfc_card_from_user( - self, user_id: UserId, token: NfcCardToken - ) -> Response[None]: + def unassign_nfc_card_from_user(self, user_id: UserId, token: NfcCardToken) -> None: """3.8 Unassign NFC Card from User""" body = {"token": token} r = self._session.put( f"{self._base_url}/users/{user_id}/nfc_cards/delete", json=body ) - return TypeAdapter(Response[None]).validate_json(r.content) + return Response[None].validate_and_unwrap(r) - def assign_pin_code_to_user(self, user_id: UserId, pin_code: str) -> Response[None]: + def assign_pin_code_to_user(self, user_id: UserId, pin_code: str) -> None: """3.9 Assign PIN Code to User""" body = {"pin_code": pin_code} r = self._session.put(f"{self._base_url}/users/{user_id}/pin_codes", json=body) - return TypeAdapter(Response[None]).validate_json(r.content) + return Response[None].validate_and_unwrap(r) - def unassign_pin_code_from_user(self, user_id: UserId) -> Response[None]: + def unassign_pin_code_from_user(self, user_id: UserId) -> None: """3.10 Unassign PIN Code from User""" r = self._session.delete(f"{self._base_url}/users/{user_id}/pin_codes") - return TypeAdapter(Response[None]).validate_json(r.content) + return Response[None].validate_and_unwrap(r) def create_user_group( self, name: str, up_id: UserGroupId | None = None, - ) -> Response[UserGroup | None]: + ) -> UserGroup | None: """3.11 Create User Group !!! note "The docs do not describe this as returning any data" Returns None if the user group already existed @@ -409,29 +406,29 @@ class AccessClient: ) r = self._session.post(f"{self._base_url}/user_groups", json=body) - return TypeAdapter(Response[UserGroup | None]).validate_json(r.content) + return Response[UserGroup | None].validate_and_unwrap(r) - def fetch_all_user_groups(self) -> Response[list[UserGroup]]: + def fetch_all_user_groups(self) -> list[UserGroup]: """3.12 Fetch All User Groupes""" r = self._session.get(f"{self._base_url}/user_groups") - return TypeAdapter(Response[list[UserGroup]]).validate_json(r.content) + return Response[list[UserGroup]].validate_and_unwrap(r) def fetch_user_group( self, user_group_id: UserGroupId, - ) -> Response[UserGroup]: + ) -> UserGroup: """3.13 Fetch User Group""" r = self._session.get(f"{self._base_url}/user_groups/{user_group_id}") - return TypeAdapter(Response[UserGroup]).validate_json(r.content) + return Response[UserGroup].validate_and_unwrap(r) def update_user_group( self, user_group_id: UserGroupId, name: str, up_id: UserGroupId | None = None, - ) -> Response[None]: + ) -> None: """3.14 Update User Group""" class UpdateUserGroupRequest(ForbidExtraBaseModel): @@ -445,72 +442,70 @@ class AccessClient: r = self._session.put( f"{self._base_url}/user_groups/{user_group_id}", json=body ) - return TypeAdapter(Response[None]).validate_json(r.content) + return Response[None].validate_and_unwrap(r) def delete_user_group( self, user_group_id: UserGroupId, - ) -> Response[None]: + ) -> None: """3.15 Delete User Group""" r = self._session.delete(f"{self._base_url}/user_groups/{user_group_id}") - return TypeAdapter(Response[None]).validate_json(r.content) + return Response[None].validate_and_unwrap(r) def assign_user_to_user_group( self, user_group_id: UserGroupId, users: list[UserId], - ) -> Response[None]: + ) -> None: """3.16 Assign User to User Group""" r = self._session.post( f"{self._base_url}/user_groups/{user_group_id}/users", json=users ) - return TypeAdapter(Response[None]).validate_json(r.content) + return Response[None].validate_and_unwrap(r) def unassign_user_from_user_group( self, user_group_id: UserGroupId, users: list[UserId], - ) -> Response[None]: + ) -> None: """3.17 Unassign User from User Group""" r = self._session.post( f"{self._base_url}/user_groups/{user_group_id}/users/delete", json=users ) - return TypeAdapter(Response[None]).validate_json(r.content) + return Response[None].validate_and_unwrap(r) def fetch_users_in_a_user_group( self, user_group_id: UserGroupId, - ) -> Response[list[User]]: + ) -> list[User]: """3.18 Fetch Users in a User Group""" r = self._session.get(f"{self._base_url}/user_groups/{user_group_id}/users") - return TypeAdapter(Response[list[User]]).validate_json(r.content) + return Response[list[User]].validate_and_unwrap(r) - def fetch_all_users_in_a_user_group( - self, user_group_id: UserGroupId - ) -> Response[list[User]]: + def fetch_all_users_in_a_user_group(self, user_group_id: UserGroupId) -> list[User]: """3.19 Fetch All Users in a User Group""" r = self._session.get(f"{self._base_url}/user_groups/{user_group_id}/users/all") - return TypeAdapter(Response[list[User]]).validate_json(r.content) + return Response[list[User]].validate_and_unwrap(r) def fetch_the_access_policies_assigned_to_a_user( self, user_id: UserId, only_user_policies: bool | None = None - ) -> Response[list[AccessPolicy]]: + ) -> list[AccessPolicy]: """3.20 Fetch the Access Policies Assigned to a User""" params = {"only_user_policies": "true" if only_user_policies else "false"} r = self._session.get( f"{self._base_url}/users/{user_id}/access_policies", params=params ) - return TypeAdapter(Response[list[AccessPolicy]]).validate_json(r.content) + return Response[list[AccessPolicy]].validate_and_unwrap(r) def assign_access_policy_to_user_group( self, user_group_id: UserGroupId, access_policy_ids: list[AccessPolicyId] - ) -> Response[None]: + ) -> None: """3.21 Assign Access Policy to User Group""" body = {"access_policy_ids": access_policy_ids} @@ -518,18 +513,18 @@ class AccessClient: r = self._session.put( f"{self._base_url}/user_groups/{user_group_id}/access_policies", json=body ) - return TypeAdapter(Response[None]).validate_json(r.content) + return Response[None].validate_and_unwrap(r) def fetch_the_access_policies_assigned_to_a_user_group( self, user_group_id: UserGroupId, - ) -> Response[list[AccessPolicy]]: + ) -> list[AccessPolicy]: """3.22 Fetch the Access Policies Assigned to a User Group""" r = self._session.get( f"{self._base_url}/user_groups/{user_group_id}/access_policies" ) - return TypeAdapter(Response[list[AccessPolicy]]).validate_json(r.content) + return Response[list[AccessPolicy]].validate_and_unwrap(r) def create_visitor( # noqa: PLR0913 self, @@ -544,7 +539,7 @@ class AccessClient: visitor_company: str | None = None, resources: Sequence[Resource] | None = None, week_schedule: WeekSchedule | None = None, - ) -> Response[Visitor]: + ) -> Visitor: """4.2 Create Visitor""" class CreateVisitorRequest(ForbidExtraBaseModel): @@ -587,16 +582,16 @@ class AccessClient: ).model_dump(exclude_none=True) r = self._session.post(f"{self._base_url}/visitors", json=body) - return TypeAdapter(Response[Visitor]).validate_json(r.content) + return Response[Visitor].validate_and_unwrap(r) def fetch_visitor( self, visitor_id: VisitorId, - ) -> Response[Visitor]: + ) -> Visitor: """4.3 Fetch Visitor""" r = self._session.get(f"{self._base_url}/visitors/{visitor_id}") - return TypeAdapter(Response[Visitor]).validate_json(r.content) + return Response[Visitor].validate_and_unwrap(r) def fetch_all_visitors( self, @@ -605,7 +600,7 @@ class AccessClient: expand: list[FetchAllVisitorsExpansion] | None = None, page_num: int | None = None, page_size: int | None = None, - ) -> Response[list[Visitor]]: + ) -> SuccessResponse[list[Visitor]]: """4.4 Fetch All Visitors""" class FetchAllVisitorsRequest(RequestPagination): @@ -634,7 +629,9 @@ class AccessClient: ).model_dump(exclude_none=True, by_alias=True) r = self._session.get(f"{self._base_url}/visitors", params=params) - return TypeAdapter(Response[list[Visitor]]).validate_json(r.content) + return ( + Response[list[Visitor]].model_validate_json(r.content).root + ).success_or_raise() def update_visitor( # noqa: PLR0913 self, @@ -650,7 +647,7 @@ class AccessClient: visitor_company: str | None = None, resources: Sequence[Resource] | None = None, week_schedule: WeekSchedule | None = None, - ) -> Response[Visitor]: + ) -> Visitor: """4.5 Update Visitor""" class UpdateVisitorRequest(ForbidExtraBaseModel): @@ -693,22 +690,22 @@ class AccessClient: ).model_dump(exclude_none=True) r = self._session.put(f"{self._base_url}/visitors/{visitor_id}", json=body) - return TypeAdapter(Response[Visitor]).validate_json(r.content) + return Response[Visitor].validate_and_unwrap(r) def delete_visitor( self, visitor_id: VisitorId, is_force: bool | None = None - ) -> Response[None]: + ) -> None: """4.6 Delete Visitor""" params = {"is_force": "true" if is_force else "false"} r = self._session.delete( f"{self._base_url}/visitors/{visitor_id}", params=params ) - return TypeAdapter(Response[None]).validate_json(r.content) + return Response[None].validate_and_unwrap(r) def assign_nfc_card_to_visitor( self, visitor_id: VisitorId, token: NfcCardToken, force_add: bool | None = None - ) -> Response[None]: + ) -> None: """4.7 Assign NFC Card to Visitor""" body = AssignNfcCardRequest(token=token, force_add=force_add).model_dump( exclude_none=True @@ -717,42 +714,40 @@ class AccessClient: r = self._session.put( f"{self._base_url}/visitors/{visitor_id}/nfc_cards", json=body ) - return TypeAdapter(Response[None]).validate_json(r.content) + return Response[None].validate_and_unwrap(r) def unassign_nfc_card_from_visitor( self, visitor_id: VisitorId, token: NfcCardToken - ) -> Response[None]: + ) -> None: """4.8 Unassign NFC Card from Visitor""" body = {"token": token} r = self._session.put( f"{self._base_url}/visitors/{visitor_id}/nfc_cards/delete", json=body ) - return TypeAdapter(Response[None]).validate_json(r.content) + return Response[None].validate_and_unwrap(r) - def assign_pin_code_to_visitor( - self, visitor_id: VisitorId, pin_code: str - ) -> Response[None]: + def assign_pin_code_to_visitor(self, visitor_id: VisitorId, pin_code: str) -> None: """4.9 Assign PIN Code to Visitor""" body = {"pin_code": pin_code} r = self._session.put( f"{self._base_url}/visitors/{visitor_id}/pin_codes", json=body ) - return TypeAdapter(Response[None]).validate_json(r.content) + return Response[None].validate_and_unwrap(r) - def unassign_pin_code_from_visitor(self, visitor_id: VisitorId) -> Response[None]: + def unassign_pin_code_from_visitor(self, visitor_id: VisitorId) -> None: """4.10 Unassign PIN Code from Visitor""" r = self._session.delete(f"{self._base_url}/visitors/{visitor_id}/pin_codes") - return TypeAdapter(Response[None]).validate_json(r.content) + return Response[None].validate_and_unwrap(r) def create_access_policy( self, name: str, schedule_id: ScheduleId, resources: Sequence[Resource] | None = None, - ) -> Response[AccessPolicy]: + ) -> AccessPolicy: """5.2 Create Access Policy""" class CreateAccessPolicyRequest(ForbidExtraBaseModel): @@ -765,7 +760,7 @@ class AccessClient: ).model_dump(exclude_none=True) r = self._session.post(f"{self._base_url}/access_policies", json=body) - return TypeAdapter(Response[AccessPolicy]).validate_json(r.content) + return Response[AccessPolicy].validate_and_unwrap(r) def update_access_policy( self, @@ -773,7 +768,7 @@ class AccessClient: name: str | None = None, resources: Sequence[Resource] | None = None, schedule_id: ScheduleId | None = None, - ) -> Response[AccessPolicy]: + ) -> AccessPolicy: """5.3 Update Access Policy""" class UpdateAccessPolicyRequest(ForbidExtraBaseModel): @@ -788,36 +783,34 @@ class AccessClient: r = self._session.put( f"{self._base_url}/access_policies/{access_policy_id}", json=body ) - return TypeAdapter(Response[AccessPolicy]).validate_json(r.content) + return Response[AccessPolicy].validate_and_unwrap(r) def delete_access_policy( self, access_policy_id: AccessPolicyId - ) -> Response[Literal["success"]]: + ) -> Literal["success"]: """5.4 Delete Access Policy""" r = self._session.delete(f"{self._base_url}/access_policies/{access_policy_id}") - return TypeAdapter(Response[Literal["success"]]).validate_json(r.content) + return Response[Literal["success"]].validate_and_unwrap(r) - def fetch_access_policy( - self, access_policy_id: AccessPolicyId - ) -> Response[AccessPolicy]: + def fetch_access_policy(self, access_policy_id: AccessPolicyId) -> AccessPolicy: """5.5 Fetch Access Policy""" r = self._session.get(f"{self._base_url}/access_policies/{access_policy_id}") - return TypeAdapter(Response[AccessPolicy]).validate_json(r.content) + return Response[AccessPolicy].validate_and_unwrap(r) - def fetch_all_access_policies(self) -> Response[list[AccessPolicy]]: + def fetch_all_access_policies(self) -> list[AccessPolicy]: """5.6 Fetch All Access Policies""" r = self._session.get(f"{self._base_url}/access_policies") - return TypeAdapter(Response[list[AccessPolicy]]).validate_json(r.content) + return Response[list[AccessPolicy]].validate_and_unwrap(r) def create_holiday_group( self, name: str, description: str | None = None, holidays: list[PartialHoliday] | None = None, - ) -> Response[HolidayGroup]: + ) -> HolidayGroup: """5.8 Create Holiday Group""" class CreateHolidayGroupRequest(ForbidExtraBaseModel): @@ -832,7 +825,7 @@ class AccessClient: r = self._session.post( f"{self._base_url}/access_policies/holiday_groups", json=body ) - return TypeAdapter(Response[HolidayGroup]).validate_json(r.content) + return Response[HolidayGroup].validate_and_unwrap(r) def update_holiday_group( self, @@ -841,7 +834,7 @@ class AccessClient: name: str | None = None, description: str | None = None, holidays: list[PartialHoliday] | None = None, - ) -> Response[HolidayGroup]: + ) -> HolidayGroup: """5.9 Update Holiday Group""" class UpdateHolidayGroupRequest(ForbidExtraBaseModel): @@ -857,35 +850,31 @@ class AccessClient: f"{self._base_url}/access_policies/holiday_groups/{holiday_group_id}", json=body, ) - return TypeAdapter(Response[HolidayGroup]).validate_json(r.content) + return Response[HolidayGroup].validate_and_unwrap(r) def delete_holiday_group( self, holiday_group_id: HolidayGroupId - ) -> Response[Literal["success"]]: + ) -> Literal["success"]: """5.10 Delete Holiday Group""" r = self._session.delete( f"{self._base_url}/access_policies/holiday_groups/{holiday_group_id}" ) - return TypeAdapter(Response[Literal["success"]]).validate_json(r.content) + return Response[Literal["success"]].validate_and_unwrap(r) - def fetch_holiday_group( - self, holiday_group_id: HolidayGroupId - ) -> Response[HolidayGroup]: + def fetch_holiday_group(self, holiday_group_id: HolidayGroupId) -> HolidayGroup: """5.11 Fetch Holiday Group""" r = self._session.get( f"{self._base_url}/access_policies/holiday_groups/{holiday_group_id}" ) - return TypeAdapter(Response[HolidayGroup]).validate_json(r.content) + return Response[HolidayGroup].validate_and_unwrap(r) - def fetch_all_holiday_groups(self) -> Response[list[FetchAllHolidayGroupsResponse]]: + def fetch_all_holiday_groups(self) -> list[FetchAllHolidayGroupsResponse]: """5.12 Fetch All Holiday Groups""" r = self._session.get(f"{self._base_url}/access_policies/holiday_groups") - return TypeAdapter(Response[list[FetchAllHolidayGroupsResponse]]).validate_json( - r.content - ) + return Response[list[FetchAllHolidayGroupsResponse]].validate_and_unwrap(r) def create_schedule( self, @@ -894,7 +883,7 @@ class AccessClient: week_schedule: WeekSchedule, holiday_group_id: HolidayGroupId | None = None, holiday_schedule: list[TimePeriod] | None = None, - ) -> Response[Schedule]: + ) -> Schedule: """5.14 Create Schedule""" class CreateScheduleRequest(ForbidExtraBaseModel): @@ -911,7 +900,7 @@ class AccessClient: ).model_dump(exclude_none=True) r = self._session.post(f"{self._base_url}/access_policies/schedules", json=body) - return TypeAdapter(Response[Schedule]).validate_json(r.content) + return Response[Schedule].validate_and_unwrap(r) def update_schedule( self, @@ -922,7 +911,7 @@ class AccessClient: week_schedule: WeekSchedule | None = None, holiday_group_id: HolidayGroupId | None = None, holiday_schedule: list[TimePeriod] | None = None, - ) -> Response[dict[Any, Any]]: # TODO: this should really return dict[Never, Never] + ) -> dict[Any, Any]: # TODO: this should really return dict[Never, Never] """5.15 Update Schedule""" class UpdateScheduleRequest(ForbidExtraBaseModel): @@ -941,40 +930,38 @@ class AccessClient: r = self._session.put( f"{self._base_url}/access_policies/schedules/{schedule_id}", json=body ) - return TypeAdapter(Response[dict[Any, Any]]).validate_json(r.content) + return Response[dict[Any, Any]].validate_and_unwrap(r) # NOTE: the API docs incorrectly define the response as a subset of `Schedule` - def fetch_schedule(self, schedule_id: ScheduleId) -> Response[Schedule]: + def fetch_schedule(self, schedule_id: ScheduleId) -> Schedule: """5.16 Fetch Schedule""" r = self._session.get( f"{self._base_url}/access_policies/schedules/{schedule_id}" ) - return TypeAdapter(Response[Schedule]).validate_json(r.content) + return Response[Schedule].validate_and_unwrap(r) - def fetch_all_schedules(self) -> Response[list[FetchAllSchedulesResponse]]: + def fetch_all_schedules(self) -> list[FetchAllSchedulesResponse]: """5.17 Fetch All Schedules""" r = self._session.get(f"{self._base_url}/access_policies/schedules") - return TypeAdapter(Response[list[FetchAllSchedulesResponse]]).validate_json( - r.content - ) + return Response[list[FetchAllSchedulesResponse]].validate_and_unwrap(r) - def delete_schedule(self, schedule_id: ScheduleId) -> Response[Literal["success"]]: + def delete_schedule(self, schedule_id: ScheduleId) -> Literal["success"]: """5.18 Delete Schedule""" r = self._session.delete( f"{self._base_url}/access_policies/schedules/{schedule_id}" ) - return TypeAdapter(Response[Literal["success"]]).validate_json(r.content) + return Response[Literal["success"]].validate_and_unwrap(r) - def generate_pin_code(self) -> Response[str]: + def generate_pin_code(self) -> str: r = self._session.post(f"{self._base_url}/credentials/pin_codes") - return TypeAdapter(Response["str"]).validate_json(r.content) + return Response["str"].validate_and_unwrap(r) def begin_enroll_card( self, device_id: DeviceId, reset_ua_card: bool | None = None - ) -> Response[EnrollNfcCardResponse]: + ) -> EnrollNfcCardResponse: """6.2 Enroll NFC Card""" class EnrollNFCCardRequest(ForbidExtraBaseModel): @@ -988,81 +975,79 @@ class AccessClient: r = self._session.post( f"{self._base_url}/credentials/nfc_cards/sessions", json=body ) - return TypeAdapter(Response[EnrollNfcCardResponse]).validate_json(r.content) + return Response[EnrollNfcCardResponse].validate_and_unwrap(r) def fetch_enroll_card_status( self, session_id: NfcCardEnrollmentSessionId, - ) -> Response[NfcCardEnrollmentStatus]: + ) -> NfcCardEnrollmentStatus: """6.3 Fetch NFC Card Enrollment Status""" r = self._session.get( f"{self._base_url}/credentials/nfc_cards/sessions/{session_id}" ) - return TypeAdapter(Response[NfcCardEnrollmentStatus]).validate_json(r.content) + return Response[NfcCardEnrollmentStatus].validate_and_unwrap(r) def remove_enrollment_session( self, session_id: NfcCardEnrollmentSessionId, - ) -> Response[Literal["success"]]: + ) -> Literal["success"]: """6.4 Remove a Session Created for NFC Card Enrollment""" r = self._session.delete( f"{self._base_url}/credentials/nfc_cards/sessions/{session_id}" ) - return TypeAdapter(Response[str]).validate_json(r.content) + return Response[Literal["success"]].validate_and_unwrap(r) - def fetch_nfc_card(self, nfc_card_token: NfcCardToken) -> Response[NfcCard]: + def fetch_nfc_card(self, nfc_card_token: NfcCardToken) -> NfcCard: """6.7 Fetch NFC Card""" r = self._session.get( f"{self._base_url}/credentials/nfc_cards/tokens/{nfc_card_token}" ) - return TypeAdapter(Response[NfcCard]).validate_json(r.content) + return Response[NfcCard].validate_and_unwrap(r) def fetch_all_nfc_cards( self, page_num: int | None = None, page_size: int | None = None - ) -> Response[list[NfcCard]]: + ) -> SuccessResponse[list[NfcCard]]: """6.8 Fetch NFC Cards""" params = RequestPagination(page_num=page_num, page_size=page_size) r = self._session.get( f"{self._base_url}/credentials/nfc_cards/tokens", params=params ) - return TypeAdapter(Response[list[NfcCard]]).validate_json(r.content) + return ( + Response[list[NfcCard]].model_validate_json(r.content).root + ).success_or_raise() - def delete_nfc_card( - self, nfc_card_token: NfcCardToken - ) -> Response[Literal["success"]]: + def delete_nfc_card(self, nfc_card_token: NfcCardToken) -> Literal["success"]: """6.7 Fetch NFC Card""" r = self._session.delete( f"{self._base_url}/credentials/nfc_cards/tokens/{nfc_card_token}" ) - return TypeAdapter(Response[Literal["success"]]).validate_json(r.content) + return Response[Literal["success"]].validate_and_unwrap(r) - def fetch_door_group_topology(self) -> Response[DoorGroupTopology]: + def fetch_door_group_topology(self) -> list[DoorGroupTopology]: """7.1 Fetch Door Group Topology""" r = self._session.get(f"{self._base_url}/door_groups/topology") - return TypeAdapter(Response[list[DoorGroupTopology]]).validate_json(r.content) + return Response[list[DoorGroupTopology]].validate_and_unwrap(r) def create_door_group( self, group_name: str, resources: list[ResourceId] - ) -> Response[DoorGroup[Resource]]: + ) -> DoorGroup[Resource]: """7.2 Create Door Group""" body = {"group_name": group_name, "resources": resources} r = self._session.post(f"{self._base_url}/door_groups", json=body) - return TypeAdapter(Response[DoorGroup[Resource]]).validate_json(r.content) + return Response[DoorGroup[Resource]].validate_and_unwrap(r) - def fetch_door_group( - self, door_group_id: DoorGroupId - ) -> Response[DoorGroup[NamedResource]]: + def fetch_door_group(self, door_group_id: DoorGroupId) -> DoorGroup[NamedResource]: """7.3 Fetch Door Group""" r = self._session.get(f"{self._base_url}/door_groups/{door_group_id}") - return TypeAdapter(Response[DoorGroup[NamedResource]]).validate_json(r.content) + return Response[DoorGroup[NamedResource]].validate_and_unwrap(r) def update_door_group( self, door_group_id: DoorGroupId, group_name: str | None = None, resources: list[ResourceId] | None = None, - ) -> Response[DoorGroup[Resource]]: + ) -> DoorGroup[Resource]: """7.4 Update Door Group""" class UpdateDoorGroupRequest(ForbidExtraBaseModel): @@ -1076,39 +1061,37 @@ class AccessClient: r = self._session.put( f"{self._base_url}/door_groups/{door_group_id}", json=body ) - return TypeAdapter(Response[DoorGroup[Resource]]).validate_json(r.content) + return Response[DoorGroup[Resource]].validate_and_unwrap(r) - def fetch_all_door_groups(self) -> Response[list[DoorGroup[Resource]]]: + def fetch_all_door_groups(self) -> list[DoorGroup[Resource]]: """7.5 Fetch All Door Groups""" r = self._session.get(f"{self._base_url}/door_groups") - return TypeAdapter(Response[list[DoorGroup[Resource]]]).validate_json(r.content) + return Response[list[DoorGroup[Resource]]].validate_and_unwrap(r) - def delete_door_group( - self, door_group_id: DoorGroupId - ) -> Response[Literal["success"]]: + def delete_door_group(self, door_group_id: DoorGroupId) -> Literal["success"]: """7.6 Delete Door Group""" r = self._session.get(f"{self._base_url}/door_groups/{door_group_id}") - return TypeAdapter(Response[Literal["success"]]).validate_json(r.content) + return Response[Literal["success"]].validate_and_unwrap(r) - def fetch_door(self, door_id: DoorId) -> Response[Door]: + def fetch_door(self, door_id: DoorId) -> Door: """7.7 Fetch Door""" r = self._session.get(f"{self._base_url}/doors/{door_id}") - return TypeAdapter(Response[Door]).validate_json(r.content) + return Response[Door].validate_and_unwrap(r) - def fetch_all_doors(self) -> Response[list[Door]]: + def fetch_all_doors(self) -> list[Door]: """7.8 Fetch All Doors""" r = self._session.get(f"{self._base_url}/doors") - return TypeAdapter(Response[list[Door]]).validate_json(r.content) + return Response[list[Door]].validate_and_unwrap(r) - def unlock_door(self, door_id: DoorId) -> Response[Literal["success"]]: + def unlock_door(self, door_id: DoorId) -> Literal["success"]: """7.9 Remote Door Unlocking""" r = self._session.put(f"{self._base_url}/doors/{door_id}/unlock") - return TypeAdapter(Response[Literal["success"]]).validate_json(r.content) + return Response[Literal["success"]].validate_and_unwrap(r) # XXX: could do a better job of typing for interval def set_temporary_door_locking_rule( self, door_id: DoorId, type_: DoorLockingRuleType, interval: int | None = None - ) -> Response[Literal["success"]]: + ) -> Literal["success"]: """7.10 Set Temporary Door Locking Rule""" class SetTemporaryDoorLockingRuleRequest(ForbidExtraBaseModel): @@ -1119,36 +1102,36 @@ class AccessClient: type=type_, interval=interval ).model_dump(exclude_none=True) r = self._session.put(f"{self._base_url}/doors/{door_id}/lock_rule", json=body) - return TypeAdapter(Response[Literal["success"]]).validate_json(r.content) + return Response[Literal["success"]].validate_and_unwrap(r) - def fetch_door_locking_rule(self, door_id: DoorId) -> Response[DoorLockingRule]: + def fetch_door_locking_rule(self, door_id: DoorId) -> DoorLockingRule: """7.11 Fetch Door Locking Rule""" r = self._session.get(f"{self._base_url}/doors/{door_id}/lock_rule") - return TypeAdapter(Response[DoorLockingRule]).validate_json(r.content) + return Response[DoorLockingRule].validate_and_unwrap(r) # TODO: why is this two bools instead of an enum? Could probably be improved. def set_door_emergency_status( self, lockdown: bool = False, evacuation: bool = False, - ) -> Response[Literal["success"]]: + ) -> Literal["success"]: """7.12 Set Door Emergency Status""" body = DoorEmergencyStatus(lockdown=lockdown, evacuation=evacuation).model_dump( exclude_none=True ) r = self._session.put(f"{self._base_url}/doors/settings/emergency", json=body) - return TypeAdapter(Response[Literal["success"]]).validate_json(r.content) + return Response[Literal["success"]].validate_and_unwrap(r) - def fetch_door_emergency_status(self) -> Response[DoorEmergencyStatus]: + def fetch_door_emergency_status(self) -> DoorEmergencyStatus: """7.13 Fetch Door Emergency Status""" r = self._session.get(f"{self._base_url}/doors/settings/emergency") - return TypeAdapter(Response[DoorEmergencyStatus]).validate_json(r.content) + return Response[DoorEmergencyStatus].validate_and_unwrap(r) - def fetch_devices(self) -> Response[list[Device]]: + def fetch_devices(self) -> list[Device]: """8.1 Fetch Devices""" r = self._session.get(f"{self._base_url}/devices") - return TypeAdapter(Response[list[Device]]).validate_json(r.content) + return Response[list[Device]].validate_and_unwrap(r) def fetch_system_logs( # noqa: PLR0913 self, @@ -1158,7 +1141,7 @@ class AccessClient: actor_id: ActorId | None = None, page_num: int | None = None, page_size: int | None = None, - ) -> Response[FetchSystemLogsResponse]: + ) -> SuccessResponse[FetchSystemLogsResponse]: """9.2 Fetch System Logs""" params = RequestPagination(page_num=page_num, page_size=page_size).model_dump( exclude_none=True @@ -1180,7 +1163,9 @@ class AccessClient: r = self._session.post( f"{self._base_url}/system/logs", params=params, json=body ) - return TypeAdapter(Response[FetchSystemLogsResponse]).validate_json(r.content) + return ( + Response[FetchSystemLogsResponse].model_validate_json(r.content) + ).root.success_or_raise() def export_system_logs( self, @@ -1212,7 +1197,7 @@ class AccessClient: def send_unifi_identity_invitations( self, users: list[IdentityInvitationUser] - ) -> Response[list[IdentityInvitationEmailFailure]]: + ) -> list[IdentityInvitationEmailFailure]: """10.1 Send UniFi Identity Invitations""" body = TypeAdapter(list[IdentityInvitationUser]).dump_python( users, exclude_none=True @@ -1221,13 +1206,11 @@ class AccessClient: r = self._session.post( f"{self._base_url}/users/identity/invitations", json=body ) - return TypeAdapter( - Response[list[IdentityInvitationEmailFailure]] - ).validate_json(r.content) + return Response[list[IdentityInvitationEmailFailure]].validate_and_unwrap(r) def fetch_available_resources( self, resource_type: Iterable[IdentityResourceType] | None = None - ) -> Response[dict[IdentityResourceType, list[IdentityResource]]]: + ) -> dict[IdentityResourceType, list[IdentityResource]]: """10.2 Fetch Available Resources""" params = {"resource_type": ",".join(resource_type)} if resource_type else None @@ -1235,41 +1218,41 @@ class AccessClient: r = self._session.get( f"{self._base_url}/users/identity/assignments", params=params ) - return TypeAdapter( - Response[dict[IdentityResourceType, list[IdentityResource]]] - ).validate_json(r.content) + return Response[ + dict[IdentityResourceType, list[IdentityResource]] + ].validate_and_unwrap(r) def assign_resources_to_user( self, user_id: UserId, resource_type: IdentityResourceType, resource_ids: list[IdentityResourceId], - ) -> Response[None]: + ) -> None: """10.3 Assign Resources to Users""" body = {"resource_type": resource_type, "resource_ids": resource_ids} r = self._session.post( f"{self._base_url}/users/{user_id}/identity/assignments", json=body ) - return TypeAdapter(Response[None]).validate_json(r.content) + return Response[None].validate_and_unwrap(r) def fetch_resources_assigned_to_user( self, user_id: UserId, - ) -> Response[dict[IdentityResourceType, list[IdentityResource]]]: + ) -> dict[IdentityResourceType, list[IdentityResource]]: """10.4 Fetch Resources Assigned to Users""" r = self._session.get(f"{self._base_url}/users/{user_id}/identity/assignments") - return TypeAdapter( - Response[dict[IdentityResourceType, list[IdentityResource]]] - ).validate_json(r.content) + return Response[ + dict[IdentityResourceType, list[IdentityResource]] + ].validate_and_unwrap(r) def assign_resources_to_user_group( self, user_group_id: UserGroupId, resource_type: IdentityResourceType, resource_ids: list[IdentityResourceId], - ) -> Response[None]: + ) -> None: """10.5 Assign Resources to User Groups""" body = {"resource_type": resource_type, "resource_ids": resource_ids} @@ -1277,17 +1260,17 @@ class AccessClient: f"{self._base_url}/user_groups/{user_group_id}/identity/assignments", json=body, ) - return TypeAdapter(Response[None]).validate_json(r.content) + return Response[None].validate_and_unwrap(r) def fetch_resources_assigned_to_user_group( self, user_group_id: UserGroupId, - ) -> Response[dict[IdentityResourceType, list[IdentityResource]]]: + ) -> dict[IdentityResourceType, list[IdentityResource]]: """10.6 Fetch Resources Assigned to User Groups""" r = self._session.get( f"{self._base_url}/user_groups/{user_group_id}/identity/assignments" ) - return TypeAdapter( - Response[dict[IdentityResourceType, list[IdentityResource]]] - ).validate_json(r.content) + return Response[ + dict[IdentityResourceType, list[IdentityResource]] + ].validate_and_unwrap(r) diff --git a/tests/live/test_access_policy.py b/tests/live/test_access_policy.py index 3901220..c1b9e17 100644 --- a/tests/live/test_access_policy.py +++ b/tests/live/test_access_policy.py @@ -9,11 +9,7 @@ from unifi_access.schemas._base import DoorResource @pytest.fixture def holiday_group(live_access_client: AccessClient) -> Generator[HolidayGroup]: - _holiday_group = ( - live_access_client.create_holiday_group("Test Holiday Group") - .success_or_raise() - .data - ) + _holiday_group = live_access_client.create_holiday_group("Test Holiday Group") yield _holiday_group live_access_client.delete_holiday_group(_holiday_group.id) @@ -22,22 +18,18 @@ def holiday_group(live_access_client: AccessClient) -> Generator[HolidayGroup]: def schedule( live_access_client: AccessClient, holiday_group: HolidayGroup ) -> Generator[Schedule]: - _schedule = ( - live_access_client.create_schedule( - "Test Schedule", - week_schedule=WeekSchedule( - sunday=[], - monday=[], - tuesday=[], - wednesday=[], - thursday=[], - friday=[], - saturday=[], - ), - holiday_group_id=holiday_group.id, - ) - .success_or_raise() - .data + _schedule = live_access_client.create_schedule( + "Test Schedule", + week_schedule=WeekSchedule( + sunday=[], + monday=[], + tuesday=[], + wednesday=[], + thursday=[], + friday=[], + saturday=[], + ), + holiday_group_id=holiday_group.id, ) yield _schedule live_access_client.delete_schedule(_schedule.id) @@ -47,10 +39,8 @@ def schedule( def access_policy( live_access_client: AccessClient, schedule: Schedule ) -> Generator[AccessPolicy]: - _access_policy = ( - live_access_client.create_access_policy("Test Access Policy", schedule.id) - .success_or_raise() - .data + _access_policy = live_access_client.create_access_policy( + "Test Access Policy", schedule.id ) yield _access_policy live_access_client.delete_access_policy(_access_policy.id) @@ -63,25 +53,17 @@ def test_access_policy_lifecycle( access_policy: AccessPolicy, ) -> None: assert access_policy - doors = live_access_client.fetch_all_doors().success_or_raise().data + doors = live_access_client.fetch_all_doors() resources = [DoorResource(id=door.id) for door in doors] - updated_access_policy = ( - live_access_client.update_access_policy(access_policy.id, resources=resources) - .success_or_raise() - .data + updated_access_policy = live_access_client.update_access_policy( + access_policy.id, resources=resources ) assert updated_access_policy.resources == resources - updated_holiday_group = ( - live_access_client.update_holiday_group( - holiday_group.id, description="Test Description" - ) - .success_or_raise() - .data + updated_holiday_group = live_access_client.update_holiday_group( + holiday_group.id, description="Test Description" ) assert updated_holiday_group.description == "Test Description" - holiday_groups = ( - live_access_client.fetch_all_holiday_groups().success_or_raise().data - ) + holiday_groups = live_access_client.fetch_all_holiday_groups() assert any(g.id == updated_holiday_group.id for g in holiday_groups) diff --git a/tests/live/test_user_lifecycle.py b/tests/live/test_user_lifecycle.py index f3ec300..f67615b 100644 --- a/tests/live/test_user_lifecycle.py +++ b/tests/live/test_user_lifecycle.py @@ -14,7 +14,7 @@ def user(live_access_client: AccessClient) -> User: # unique on the server, and I can't delete users. Maybe # should fetch user if it already exists? # TODO: add an onboard time? - return live_access_client.register_user("Test", "User").success_or_raise().data + return live_access_client.register_user("Test", "User") def test_user_lifecycle(live_access_client: AccessClient, user: User) -> None: @@ -24,7 +24,7 @@ def test_user_lifecycle(live_access_client: AccessClient, user: User) -> None: assert user.status == UserStatus.ACTIVE # Fetch user - fetched_user = live_access_client.fetch_user(user.id).success_or_raise().data + fetched_user = live_access_client.fetch_user(user.id) assert fetched_user.id == user.id assert fetched_user.first_name == "Test" assert fetched_user.last_name == "User" @@ -33,10 +33,10 @@ def test_user_lifecycle(live_access_client: AccessClient, user: User) -> None: # Update user live_access_client.update_user( user.id, last_name="Updated User", status=UserStatus.DEACTIVATED - ).success_or_raise() + ) # Fetch again, and check changes - fetched_user = live_access_client.fetch_user(user.id).success_or_raise().data + fetched_user = live_access_client.fetch_user(user.id) assert fetched_user.id == user.id assert fetched_user.first_name == "Test" assert fetched_user.last_name == "Updated User" @@ -44,7 +44,7 @@ def test_user_lifecycle(live_access_client: AccessClient, user: User) -> None: # Check for the user in full user list # TODO: test pagination - all_users = live_access_client.fetch_all_users().success_or_raise().data + all_users = live_access_client.fetch_all_users().data matching_user = next(u for u in all_users if u.id == user.id) assert matching_user.id == user.id assert matching_user.first_name == "Test" @@ -69,9 +69,7 @@ def make_user_group( name: str, up_id: UserGroupId | None = None, ) -> UserGroup | None: - group = ( - live_access_client.create_user_group(name, up_id).success_or_raise().data - ) + group = live_access_client.create_user_group(name, up_id) created_groups.append(group) return group @@ -92,13 +90,11 @@ def test_user_groups_lifecycle( assert group, "group 'Test Group' already existed, please delete" # Fetch group - fetched_group = ( - live_access_client.fetch_user_group(group.id).success_or_raise().data - ) + fetched_group = live_access_client.fetch_user_group(group.id) assert fetched_group.name == "Test Group" # Check for group in in full group list - groups = live_access_client.fetch_all_user_groups().success_or_raise().data + groups = live_access_client.fetch_all_user_groups() matching_group = next(g for g in groups if g.id == group.id) assert matching_group @@ -112,17 +108,11 @@ def test_user_groups_lifecycle( live_access_client.assign_user_to_user_group(group.id, [user.id]) # Fetch users in the group - users_in_group = ( - live_access_client.fetch_users_in_a_user_group(group.id).success_or_raise().data - ) + users_in_group = live_access_client.fetch_users_in_a_user_group(group.id) assert len(users_in_group) == 1 assert users_in_group[0].id == user.id # Fetch all users in the group - all_users_in_group = ( - live_access_client.fetch_all_users_in_a_user_group(group.id) - .success_or_raise() - .data - ) + all_users_in_group = live_access_client.fetch_all_users_in_a_user_group(group.id) assert len(all_users_in_group) == 1 assert all_users_in_group[0].id == user.id diff --git a/tests/live/test_visitor_lifecycle.py b/tests/live/test_visitor_lifecycle.py index 159a7ad..59babfa 100644 --- a/tests/live/test_visitor_lifecycle.py +++ b/tests/live/test_visitor_lifecycle.py @@ -1,5 +1,4 @@ import datetime -import functools from collections.abc import Callable, Generator import pytest @@ -12,11 +11,8 @@ from unifi_access.schemas import FetchAllVisitorsExpansion, Visitor, VisitReason def make_visitor(live_access_client: AccessClient) -> Generator[Callable[..., Visitor]]: created_visitors: list[Visitor] = [] - @functools.wraps(live_access_client.create_visitor) - def _make_visitor(*args, **kwargs) -> Visitor: - visitor = ( - live_access_client.create_visitor(*args, **kwargs).success_or_raise().data - ) + def _make_visitor(*args, **kwargs) -> Visitor: # type: ignore + visitor = live_access_client.create_visitor(*args, **kwargs) created_visitors.append(visitor) return visitor @@ -39,63 +35,47 @@ def test_visitor_lifecycle( ) assert visitor.first_name == "Test" - fetched_visitor = ( - live_access_client.fetch_visitor(visitor.id).success_or_raise().data - ) + fetched_visitor = live_access_client.fetch_visitor(visitor.id) assert fetched_visitor.first_name == "Test" assert fetched_visitor.pin_code is None # pin code assign/unassign - pin_code = live_access_client.generate_pin_code().success_or_raise().data - live_access_client.assign_pin_code_to_visitor( - visitor.id, pin_code - ).success_or_raise() - fetched_visitor = ( - live_access_client.fetch_visitor(visitor.id).success_or_raise().data - ) + pin_code = live_access_client.generate_pin_code() + live_access_client.assign_pin_code_to_visitor(visitor.id, pin_code) + fetched_visitor = live_access_client.fetch_visitor(visitor.id) assert fetched_visitor.pin_code assert fetched_visitor.pin_code.token - live_access_client.unassign_pin_code_from_visitor(visitor.id).success_or_raise() - fetched_visitor = ( - live_access_client.fetch_visitor(visitor.id).success_or_raise().data - ) + live_access_client.unassign_pin_code_from_visitor(visitor.id) + fetched_visitor = live_access_client.fetch_visitor(visitor.id) assert fetched_visitor.pin_code is None - updated_visitor = ( - live_access_client.update_visitor(visitor.id, first_name="Updated Test") - .success_or_raise() - .data + updated_visitor = live_access_client.update_visitor( + visitor.id, first_name="Updated Test" ) assert updated_visitor.first_name == "Updated Test" - all_visitors = live_access_client.fetch_all_visitors().success_or_raise().data + all_visitors = live_access_client.fetch_all_visitors().data matched_visitor = next(v for v in all_visitors if v.id == visitor.id) assert matched_visitor.first_name == "Updated Test" - expanded_all_visitors = ( - live_access_client.fetch_all_visitors( - expand=[ - FetchAllVisitorsExpansion.ACCESS_POLICY, - FetchAllVisitorsExpansion.RESOURCE, - FetchAllVisitorsExpansion.SCHEDULE, - FetchAllVisitorsExpansion.NFC_CARD, - FetchAllVisitorsExpansion.PIN_CODE, - ] - ) - .success_or_raise() - .data - ) + expanded_all_visitors = live_access_client.fetch_all_visitors( + expand=[ + FetchAllVisitorsExpansion.ACCESS_POLICY, + FetchAllVisitorsExpansion.RESOURCE, + FetchAllVisitorsExpansion.SCHEDULE, + FetchAllVisitorsExpansion.NFC_CARD, + FetchAllVisitorsExpansion.PIN_CODE, + ] + ).data expanded_matched_visitor = next( v for v in expanded_all_visitors if v.id == visitor.id ) assert expanded_matched_visitor.first_name == "Updated Test" # TODO: test expanded contents - non_expanded_all_visitors = ( - live_access_client.fetch_all_visitors(expand=[FetchAllVisitorsExpansion.NONE]) - .success_or_raise() - .data - ) + non_expanded_all_visitors = live_access_client.fetch_all_visitors( + expand=[FetchAllVisitorsExpansion.NONE] + ).data non_expanded_matched_visitor = next( v for v in non_expanded_all_visitors if v.id == visitor.id ) diff --git a/tests/test_access_policy.py b/tests/test_access_policy.py index c4593df..1435e8c 100644 --- a/tests/test_access_policy.py +++ b/tests/test_access_policy.py @@ -85,8 +85,8 @@ class AccessPolicyTests(UnifiAccessTests): ), ], schedule_id=ScheduleId("4e108aeb-ec9a-4822-bf86-170ea986f934"), - ).success_or_raise() - assert resp.data.id == "bb5eb965-42dc-4206-9654-88a2d1c3aaa5" + ) + assert resp.id == "bb5eb965-42dc-4206-9654-88a2d1c3aaa5" @responses.activate def test_update_access_policy(self) -> None: @@ -155,8 +155,8 @@ class AccessPolicyTests(UnifiAccessTests): ), ], schedule_id=ScheduleId("4e108aeb-ec9a-4822-bf86-170ea986f934"), - ).success_or_raise() - assert resp.data.id == "bb5eb965-42dc-4206-9654-88a2d1c3aaa5" + ) + assert resp.id == "bb5eb965-42dc-4206-9654-88a2d1c3aaa5" @responses.activate def test_delete_access_policy(self) -> None: @@ -168,11 +168,9 @@ class AccessPolicyTests(UnifiAccessTests): json={"code": "SUCCESS", "msg": "success", "data": "success"}, ) - resp = self.client.delete_access_policy( - access_policy_id=access_policy_id - ).success_or_raise() + resp = self.client.delete_access_policy(access_policy_id=access_policy_id) - assert resp.data == "success" + assert resp == "success" @responses.activate def test_fetch_access_policy(self) -> None: @@ -203,11 +201,9 @@ class AccessPolicyTests(UnifiAccessTests): }, ) - resp = self.client.fetch_access_policy( - access_policy_id=access_policy_id - ).success_or_raise() + resp = self.client.fetch_access_policy(access_policy_id=access_policy_id) - assert resp.data.id == "ed09985f-cf52-486e-bc33-377b6ed7bbf2" + assert resp.id == "ed09985f-cf52-486e-bc33-377b6ed7bbf2" @responses.activate def test_fetch_all_access_policies(self) -> None: @@ -272,8 +268,8 @@ class AccessPolicyTests(UnifiAccessTests): }, ) - resp = self.client.fetch_all_access_policies().success_or_raise() - assert resp.data[0].id == "73f15cab-c725-4a76-a419-a4026d131e96" + resp = self.client.fetch_all_access_policies() + assert resp[0].id == "73f15cab-c725-4a76-a419-a4026d131e96" @responses.activate def test_create_holiday_group(self) -> None: @@ -354,8 +350,8 @@ class AccessPolicyTests(UnifiAccessTests): end_time=datetime.datetime(2023, 8, 27, tzinfo=datetime.UTC), ), ], - ).success_or_raise() - assert resp.data.id == "7be7a7a0-818f-4f76-98c3-1c38957f4dca" + ) + assert resp.id == "7be7a7a0-818f-4f76-98c3-1c38957f4dca" @responses.activate def test_update_holiday_group(self) -> None: @@ -443,8 +439,8 @@ class AccessPolicyTests(UnifiAccessTests): end_time=datetime.datetime(2023, 8, 27, tzinfo=datetime.UTC), ), ], - ).success_or_raise() - assert resp.data.id == "7be7a7a0-818f-4f76-98c3-1c38957f4dca" + ) + assert resp.id == "7be7a7a0-818f-4f76-98c3-1c38957f4dca" @responses.activate def test_delete_holiday_group(self) -> None: @@ -457,11 +453,9 @@ class AccessPolicyTests(UnifiAccessTests): json={"code": "SUCCESS", "msg": "success", "data": "success"}, ) - resp = self.client.delete_holiday_group( - holiday_group_id=holiday_group_id - ).success_or_raise() + resp = self.client.delete_holiday_group(holiday_group_id=holiday_group_id) - assert resp.data == "success" + assert resp == "success" @responses.activate def test_fetch_holiday_group(self) -> None: @@ -501,10 +495,8 @@ class AccessPolicyTests(UnifiAccessTests): }, ) - resp = self.client.fetch_holiday_group( - holiday_group_id=holiday_group_id - ).success_or_raise() - assert resp.data.id == "7be7a7a0-818f-4f76-98c3-1c38957f4dca" + resp = self.client.fetch_holiday_group(holiday_group_id=holiday_group_id) + assert resp.id == "7be7a7a0-818f-4f76-98c3-1c38957f4dca" @responses.activate def test_fetch_all_holiday_groups(self) -> None: @@ -534,8 +526,8 @@ class AccessPolicyTests(UnifiAccessTests): }, ) - resp = self.client.fetch_all_holiday_groups().success_or_raise() - assert resp.data[0].id == "8cc22b49-a7f4-49a6-9f04-044444992d6c" + resp = self.client.fetch_all_holiday_groups() + assert resp[0].id == "8cc22b49-a7f4-49a6-9f04-044444992d6c" @responses.activate def test_create_schedule(self) -> None: @@ -669,7 +661,7 @@ class AccessPolicyTests(UnifiAccessTests): end_time=datetime.time(19, 0, 59), ), ], - ).success_or_raise() + ) @responses.activate def test_update_schedule(self) -> None: @@ -755,7 +747,7 @@ class AccessPolicyTests(UnifiAccessTests): end_time=datetime.time(11, 45, 59), ) ], - ).success_or_raise() + ) @responses.activate def test_fetch_schedule(self) -> None: @@ -821,7 +813,7 @@ class AccessPolicyTests(UnifiAccessTests): }, ) - resp = self.client.fetch_schedule(schedule_id=schedule_id).success_or_raise() + resp = self.client.fetch_schedule(schedule_id=schedule_id) @responses.activate def test_fetch_all_schedules(self) -> None: @@ -853,7 +845,7 @@ class AccessPolicyTests(UnifiAccessTests): }, ) - resp = self.client.fetch_all_schedules().success_or_raise() + resp = self.client.fetch_all_schedules() @responses.activate def test_delete_schedule(self) -> None: @@ -866,4 +858,4 @@ class AccessPolicyTests(UnifiAccessTests): json={"code": "SUCCESS", "msg": "success", "data": "success"}, ) - resp = self.client.delete_schedule(schedule_id=schedule_id).success_or_raise() + resp = self.client.delete_schedule(schedule_id=schedule_id) diff --git a/tests/test_credential.py b/tests/test_credential.py index 6c0eb78..e33755f 100644 --- a/tests/test_credential.py +++ b/tests/test_credential.py @@ -15,8 +15,8 @@ class CredentialTests(UnifiAccessTests): match=[matchers.header_matcher(self.common_headers)], json={"code": "SUCCESS", "data": "67203419", "msg": "success"}, ) - resp = self.client.generate_pin_code().success_or_raise() - assert resp.data == "67203419" + resp = self.client.generate_pin_code() + assert resp == "67203419" @responses.activate def test_begin_enroll_card(self) -> None: @@ -37,8 +37,8 @@ class CredentialTests(UnifiAccessTests): ) resp = self.client.begin_enroll_card( device_id=DeviceId("0418d6a2bb7a"), reset_ua_card=False - ).success_or_raise() - assert resp.data.session_id == "e8a97c52-6676-4c48-8589-bd518afc4094" + ) + assert resp.session_id == "e8a97c52-6676-4c48-8589-bd518afc4094" @responses.activate def test_fetch_enroll_card_status(self) -> None: @@ -59,12 +59,10 @@ class CredentialTests(UnifiAccessTests): }, }, ) - resp = self.client.fetch_enroll_card_status( - session_id=session_id - ).success_or_raise() - assert resp.data.id == "014A3151" + resp = self.client.fetch_enroll_card_status(session_id=session_id) + assert resp.id == "014A3151" assert ( - resp.data.token + resp.token == "821f90b262e90c5c0fbcddf3d6d2f3b94cc015d6e8104ab4fb96e4c8b8e90cb7" ) @@ -79,9 +77,7 @@ class CredentialTests(UnifiAccessTests): ], json={"code": "SUCCESS", "msg": "success", "data": "success"}, ) - resp = self.client.remove_enrollment_session( - session_id=session_id - ).success_or_raise() + resp = self.client.remove_enrollment_session(session_id=session_id) @responses.activate def test_fetch_nfc_card(self) -> None: @@ -115,7 +111,7 @@ class CredentialTests(UnifiAccessTests): }, ) - resp = self.client.fetch_nfc_card(nfc_card_token).success_or_raise() + resp = self.client.fetch_nfc_card(nfc_card_token) @responses.activate def test_fetch_all_nfc_cards(self) -> None: @@ -169,9 +165,7 @@ class CredentialTests(UnifiAccessTests): }, ) - resp = self.client.fetch_all_nfc_cards( - page_num=1, page_size=25 - ).success_or_raise() + resp = self.client.fetch_all_nfc_cards(page_num=1, page_size=25) @responses.activate def test_delete_nfc_card(self) -> None: @@ -185,5 +179,5 @@ class CredentialTests(UnifiAccessTests): json={"code": "SUCCESS", "data": "success", "msg": "success"}, ) - resp = self.client.delete_nfc_card(nfc_card_token).success_or_raise() - assert resp.data == "success" + resp = self.client.delete_nfc_card(nfc_card_token) + assert resp == "success" diff --git a/tests/test_device.py b/tests/test_device.py index cee4325..1e9b0a7 100644 --- a/tests/test_device.py +++ b/tests/test_device.py @@ -38,5 +38,5 @@ class CredentialTests(UnifiAccessTests): "msg": "success", }, ) - resp = self.client.fetch_devices().success_or_raise() - assert resp.data[0].name == "UA-HUB-3855" + resp = self.client.fetch_devices() + assert resp[0].name == "UA-HUB-3855" diff --git a/tests/test_space.py b/tests/test_space.py index d326368..44df8eb 100644 --- a/tests/test_space.py +++ b/tests/test_space.py @@ -61,7 +61,7 @@ class SpaceTests(UnifiAccessTests): }, ) - resp = self.client.fetch_door_group_topology().success_or_raise() + resp = self.client.fetch_door_group_topology() @responses.activate def test_create_door_group(self) -> None: @@ -95,7 +95,7 @@ class SpaceTests(UnifiAccessTests): resp = self.client.create_door_group( group_name="Test", resources=[DoorId("6ff875d2-af87-470b-9cb5-774c6596afc8")], - ).success_or_raise() + ) @responses.activate def test_fetch_door_group_building(self) -> None: @@ -133,9 +133,7 @@ class SpaceTests(UnifiAccessTests): }, ) - resp = self.client.fetch_door_group( - door_group_id=door_group_id - ).success_or_raise() + resp = self.client.fetch_door_group(door_group_id=door_group_id) @responses.activate def test_fetch_door_group_customized_groups(self) -> None: @@ -163,9 +161,7 @@ class SpaceTests(UnifiAccessTests): }, ) - resp = self.client.fetch_door_group( - door_group_id=door_group_id - ).success_or_raise() + resp = self.client.fetch_door_group(door_group_id=door_group_id) @responses.activate def test_update_door_group(self) -> None: @@ -207,7 +203,7 @@ class SpaceTests(UnifiAccessTests): DoorId("5a2c3d4e-1f6b-4c8d-9e0f-2a3b4c5d6e7f"), DoorGroupId("2p3q4r5s-6t7u-8v9w-x0y1-z2a3b4c5d6e"), ], - ).success_or_raise() + ) @responses.activate def test_update_door_group_delete_resources(self) -> None: @@ -235,7 +231,7 @@ class SpaceTests(UnifiAccessTests): resp = self.client.update_door_group( door_group_id=door_group_id, resources=[], - ).success_or_raise() + ) @responses.activate def test_fetch_all_door_groups(self) -> None: @@ -273,7 +269,7 @@ class SpaceTests(UnifiAccessTests): }, ) - resp = self.client.fetch_all_door_groups().success_or_raise() + resp = self.client.fetch_all_door_groups() @responses.activate def test_delete_door_groups(self) -> None: @@ -285,9 +281,7 @@ class SpaceTests(UnifiAccessTests): json={"code": "SUCCESS", "data": "success", "msg": "success"}, ) - resp = self.client.delete_door_group( - door_group_id=door_group_id - ).success_or_raise() + resp = self.client.delete_door_group(door_group_id=door_group_id) @responses.activate def test_fetch_door(self) -> None: @@ -312,7 +306,7 @@ class SpaceTests(UnifiAccessTests): }, ) - resp = self.client.fetch_door(door_id=door_id).success_or_raise() + resp = self.client.fetch_door(door_id=door_id) @responses.activate def test_fetch_all_doors(self) -> None: @@ -348,7 +342,7 @@ class SpaceTests(UnifiAccessTests): }, ) - resp = self.client.fetch_all_doors().success_or_raise() + resp = self.client.fetch_all_doors() @responses.activate def test_unlock_door(self) -> None: @@ -360,7 +354,7 @@ class SpaceTests(UnifiAccessTests): json={"code": "SUCCESS", "data": "success", "msg": "success"}, ) - resp = self.client.unlock_door(door_id=door_id).success_or_raise() + resp = self.client.unlock_door(door_id=door_id) @responses.activate def test_set_temporary_door_locking_rule(self) -> None: @@ -378,7 +372,7 @@ class SpaceTests(UnifiAccessTests): # Customized 10-minute unlocked resp = self.client.set_temporary_door_locking_rule( door_id=door_id, type_=DoorLockingRuleType.CUSTOM, interval=10 - ).success_or_raise() + ) # XXX: TODO: test the other examples? @@ -397,7 +391,7 @@ class SpaceTests(UnifiAccessTests): }, ) - resp = self.client.fetch_door_locking_rule(door_id=door_id).success_or_raise() + resp = self.client.fetch_door_locking_rule(door_id=door_id) # XXX: TODO: test the other examples? @responses.activate @@ -413,9 +407,7 @@ class SpaceTests(UnifiAccessTests): ) # Keep it locked - resp = self.client.set_door_emergency_status( - lockdown=True, evacuation=False - ).success_or_raise() + resp = self.client.set_door_emergency_status(lockdown=True, evacuation=False) # XXX: TODO: test the other examples? @responses.activate @@ -431,4 +423,4 @@ class SpaceTests(UnifiAccessTests): }, ) - resp = self.client.fetch_door_emergency_status().success_or_raise() + resp = self.client.fetch_door_emergency_status() diff --git a/tests/test_system_log.py b/tests/test_system_log.py index 8a20664..8725dde 100644 --- a/tests/test_system_log.py +++ b/tests/test_system_log.py @@ -81,7 +81,7 @@ class SystemLogTests(UnifiAccessTests): since=datetime.datetime.fromtimestamp(1690770546, tz=datetime.UTC), until=datetime.datetime.fromtimestamp(1690771546, tz=datetime.UTC), actor_id=UserId("3e1f196e-c97b-4748-aecb-eab5e9c251b2"), - ).success_or_raise() + ) @responses.activate def test_export_system_logs(self) -> None: diff --git a/tests/test_unifi_identity.py b/tests/test_unifi_identity.py index 075510d..53d299e 100644 --- a/tests/test_unifi_identity.py +++ b/tests/test_unifi_identity.py @@ -38,7 +38,7 @@ class UnifiIdentityTests(UnifiAccessTests): email="example@*.com", ) ] - ).success_or_raise() + ) @responses.activate def test_send_unifi_identity_invitations_email_failure(self) -> None: @@ -78,7 +78,7 @@ class UnifiIdentityTests(UnifiAccessTests): email="example@*.com", ) ] - ).success_or_raise() + ) @responses.activate def test_fetch_available_resources(self) -> None: @@ -122,7 +122,7 @@ class UnifiIdentityTests(UnifiAccessTests): IdentityResourceType.WIFI, IdentityResourceType.VPN, ] - ).success_or_raise() + ) @responses.activate def test_assign_resources_to_user(self) -> None: @@ -142,11 +142,11 @@ class UnifiIdentityTests(UnifiAccessTests): json={"code": "SUCCESS", "data": None, "msg": "success"}, ) - resp = self.client.assign_resources_to_user( + self.client.assign_resources_to_user( user_id=user_id, resource_type=IdentityResourceType.WIFI, resource_ids=[IdentityResourceId("65dff9a8c188cb71cfac8e9a")], - ).success_or_raise() + ) @responses.activate def test_fetch_resources_assigned_to_user(self) -> None: @@ -182,9 +182,7 @@ class UnifiIdentityTests(UnifiAccessTests): }, ) - resp = self.client.fetch_resources_assigned_to_user( - user_id=user_id - ).success_or_raise() + resp = self.client.fetch_resources_assigned_to_user(user_id=user_id) @responses.activate def test_assign_resources_to_user_group(self) -> None: @@ -204,11 +202,11 @@ class UnifiIdentityTests(UnifiAccessTests): json={"code": "SUCCESS", "data": None, "msg": "success"}, ) - resp = self.client.assign_resources_to_user_group( + self.client.assign_resources_to_user_group( user_group_id=user_group_id, resource_type=IdentityResourceType.WIFI, resource_ids=[IdentityResourceId("65dff9a8c188cb71cfac8e9a")], - ).success_or_raise() + ) @responses.activate def test_fetch_resources_assigned_to_user_group(self) -> None: @@ -246,4 +244,4 @@ class UnifiIdentityTests(UnifiAccessTests): resp = self.client.fetch_resources_assigned_to_user_group( user_group_id=user_group_id - ).success_or_raise() + ) diff --git a/tests/test_user.py b/tests/test_user.py index 297c27f..38df509 100644 --- a/tests/test_user.py +++ b/tests/test_user.py @@ -61,8 +61,8 @@ class UserTests(UnifiAccessTests): employee_number="100000", onboard_time=datetime.datetime.fromtimestamp(1689150139, tz=datetime.UTC), user_email="example@*.com", - ).success_or_raise() - assert resp.data.id == "37f2b996-c2c5-487b-aa22-8b453ff14a4b" + ) + assert resp.id == "37f2b996-c2c5-487b-aa22-8b453ff14a4b" @responses.activate def test_update_user(self) -> None: @@ -88,7 +88,7 @@ class UserTests(UnifiAccessTests): json={"code": "SUCCESS", "msg": "success", "data": None}, ) - resp = self.client.update_user( + self.client.update_user( user_id=user_id, first_name="H", last_name="L", @@ -96,8 +96,7 @@ class UserTests(UnifiAccessTests): onboard_time=datetime.datetime.fromtimestamp(1689150139, tz=datetime.UTC), user_email="example@*.com", status=UserStatus.ACTIVE, - ).success_or_raise() - assert resp.data is None + ) @responses.activate def test_fetch_user(self) -> None: @@ -164,9 +163,7 @@ class UserTests(UnifiAccessTests): }, ) - resp = self.client.fetch_user( - user_id=user_id, expand_access_policies=True - ).success_or_raise() + resp = self.client.fetch_user(user_id=user_id, expand_access_policies=True) # TODO: verify correctness of data? @responses.activate @@ -261,7 +258,7 @@ class UserTests(UnifiAccessTests): resp = self.client.fetch_all_users( expand_access_policies=True, page_num=1, page_size=25 - ).success_or_raise() + ) assert resp.pagination # TODO: verify correctness of data? @@ -284,14 +281,13 @@ class UserTests(UnifiAccessTests): ], json={"code": "SUCCESS", "msg": "success"}, ) - resp = self.client.assign_access_policy_to_user( + self.client.assign_access_policy_to_user( user_id=user_id, access_policy_ids=[ AccessPolicyId("03895c7f-9f53-4334-812b-5db9c122c109"), AccessPolicyId("3b6bcb0c-7498-44cf-8615-00a96d824cbe"), ], - ).success_or_raise() - assert resp.data is None + ) @responses.activate def test_assign_nfc_card_to_user(self) -> None: @@ -310,14 +306,13 @@ class UserTests(UnifiAccessTests): ], json={"code": "SUCCESS", "msg": "success"}, ) - resp = self.client.assign_nfc_card_to_user( + self.client.assign_nfc_card_to_user( user_id=user_id, token=NfcCardToken( "d27822fc682b478dc637c6db01813e465174df6e54ca515d8427db623cfda1d0" ), force_add=True, - ).success_or_raise() - assert resp.data is None + ) @responses.activate def test_unassign_nfc_card_from_user(self) -> None: @@ -335,13 +330,12 @@ class UserTests(UnifiAccessTests): ], json={"code": "SUCCESS", "msg": "success"}, ) - resp = self.client.unassign_nfc_card_from_user( + self.client.unassign_nfc_card_from_user( user_id=user_id, token=NfcCardToken( "d27822fc682b478dc637c6db01813e465174df6e54ca515d8427db623cfda1d0" ), - ).success_or_raise() - assert resp.data is None + ) @responses.activate def test_assign_pin_code_to_user(self) -> None: @@ -355,10 +349,7 @@ class UserTests(UnifiAccessTests): ], json={"code": "SUCCESS", "msg": "success"}, ) - resp = self.client.assign_pin_code_to_user( - user_id=user_id, pin_code="57301208" - ).success_or_raise() - assert resp.data is None + self.client.assign_pin_code_to_user(user_id=user_id, pin_code="57301208") @responses.activate def test_unassign_pin_code_from_user(self) -> None: @@ -371,10 +362,9 @@ class UserTests(UnifiAccessTests): ], json={"code": "SUCCESS", "msg": "success"}, ) - resp = self.client.unassign_pin_code_from_user( + self.client.unassign_pin_code_from_user( user_id=user_id, - ).success_or_raise() - assert resp.data is None + ) @responses.activate def test_create_user_group(self) -> None: @@ -406,7 +396,7 @@ class UserTests(UnifiAccessTests): resp = self.client.create_user_group( name="Group Name", up_id=UserGroupId("013d05d3-7262-4908-ba69-badbbbf8f5a6") - ).success_or_raise() + ) @responses.activate def test_fetch_all_user_groups(self) -> None: @@ -429,7 +419,7 @@ class UserTests(UnifiAccessTests): }, ) - resp = self.client.fetch_all_user_groups().success_or_raise() + resp = self.client.fetch_all_user_groups() @responses.activate def test_fetch_user_group(self) -> None: @@ -452,9 +442,7 @@ class UserTests(UnifiAccessTests): }, ) - resp = self.client.fetch_user_group( - user_group_id=user_group_id - ).success_or_raise() + resp = self.client.fetch_user_group(user_group_id=user_group_id) @responses.activate def test_update_user_group(self) -> None: @@ -474,11 +462,11 @@ class UserTests(UnifiAccessTests): json={"code": "SUCCESS", "msg": "success"}, ) - resp = self.client.update_user_group( + self.client.update_user_group( user_group_id=user_group_id, name="Group Name", up_id=UserGroupId("013d05d3-7262-4908-ba69-badbbbf8f5a6"), - ).success_or_raise() + ) @responses.activate def test_delete_user_group(self) -> None: @@ -491,9 +479,7 @@ class UserTests(UnifiAccessTests): json={"code": "SUCCESS", "msg": "success"}, ) - resp = self.client.delete_user_group( - user_group_id=user_group_id - ).success_or_raise() + self.client.delete_user_group(user_group_id=user_group_id) @responses.activate def test_assign_user_to_user_group(self) -> None: @@ -514,13 +500,13 @@ class UserTests(UnifiAccessTests): json={"code": "SUCCESS", "msg": "success"}, ) - resp = self.client.assign_user_to_user_group( + self.client.assign_user_to_user_group( user_group_id=user_group_id, users=[ UserId("7c6e9102-acb7-4b89-8ed4-7561e6fb706c"), UserId("fd63bc4c-52e0-4dbf-a699-e1233339c73b"), ], - ).success_or_raise() + ) @responses.activate def test_unassign_user_from_user_group(self) -> None: @@ -541,13 +527,13 @@ class UserTests(UnifiAccessTests): json={"code": "SUCCESS", "msg": "success"}, ) - resp = self.client.unassign_user_from_user_group( + self.client.unassign_user_from_user_group( user_group_id=user_group_id, users=[ UserId("7c6e9102-acb7-4b89-8ed4-7561e6fb706c"), UserId("fd63bc4c-52e0-4dbf-a699-e1233339c73b"), ], - ).success_or_raise() + ) @responses.activate def test_fetch_users_in_a_user_group(self) -> None: @@ -583,7 +569,7 @@ class UserTests(UnifiAccessTests): resp = self.client.fetch_users_in_a_user_group( user_group_id=user_group_id, - ).success_or_raise() + ) @responses.activate def test_fetch_all_users_in_a_user_group(self) -> None: @@ -619,7 +605,7 @@ class UserTests(UnifiAccessTests): resp = self.client.fetch_all_users_in_a_user_group( user_group_id=user_group_id, - ).success_or_raise() + ) @responses.activate def test_fetch_the_access_policies_assigned_to_a_user(self) -> None: @@ -659,7 +645,7 @@ class UserTests(UnifiAccessTests): resp = self.client.fetch_the_access_policies_assigned_to_a_user( user_id=user_id, only_user_policies=False - ).success_or_raise() + ) @responses.activate def test_assign_access_policy_to_user_group(self) -> None: @@ -677,10 +663,10 @@ class UserTests(UnifiAccessTests): json={"code": "SUCCESS", "data": None, "msg": "success"}, ) - resp = self.client.assign_access_policy_to_user_group( + self.client.assign_access_policy_to_user_group( user_group_id=user_group_id, access_policy_ids=[AccessPolicyId("bbe48a65-2ac1-4bf6-bd65-bc8f9ee7fb75")], - ).success_or_raise() + ) @responses.activate def test_fetch_the_access_policies_assigned_to_a_user_group(self) -> None: @@ -717,4 +703,4 @@ class UserTests(UnifiAccessTests): resp = self.client.fetch_the_access_policies_assigned_to_a_user_group( user_group_id=user_group_id, - ).success_or_raise() + ) diff --git a/tests/test_visitor.py b/tests/test_visitor.py index 03ea1e3..ec4b2ba 100644 --- a/tests/test_visitor.py +++ b/tests/test_visitor.py @@ -177,8 +177,8 @@ class VisitorTests(UnifiAccessTests): id=DoorGroupId("d5573467-d6b3-4e8f-8e48-8a322b91664a") ), ], - ).success_or_raise() - assert resp.data.id == "fbe8d920-47d3-4cfd-bda7-bf4b0e26f73c" + ) + assert resp.id == "fbe8d920-47d3-4cfd-bda7-bf4b0e26f73c" @responses.activate def test_fetch_visitor(self) -> None: @@ -226,7 +226,7 @@ class VisitorTests(UnifiAccessTests): }, ) - resp = self.client.fetch_visitor(visitor_id=visitor_id).success_or_raise() + resp = self.client.fetch_visitor(visitor_id=visitor_id) # TODO: verify correctness of data? @responses.activate @@ -279,9 +279,7 @@ class VisitorTests(UnifiAccessTests): }, ) - resp = self.client.fetch_all_visitors( - page_num=1, page_size=25 - ).success_or_raise() + resp = self.client.fetch_all_visitors(page_num=1, page_size=25) assert resp.pagination # TODO: verify correctness of data? @@ -335,7 +333,7 @@ class VisitorTests(UnifiAccessTests): }, ) - resp = self.client.fetch_all_visitors(keyword="H").success_or_raise() + resp = self.client.fetch_all_visitors(keyword="H") assert resp.pagination # TODO: verify correctness of data? @@ -407,7 +405,7 @@ class VisitorTests(UnifiAccessTests): FetchAllVisitorsExpansion.NFC_CARD, FetchAllVisitorsExpansion.PIN_CODE, ] - ).success_or_raise() + ) assert resp.pagination # TODO: verify correctness of data? @@ -452,9 +450,7 @@ class VisitorTests(UnifiAccessTests): }, ) - resp = self.client.fetch_all_visitors( - expand=[FetchAllVisitorsExpansion.NONE] - ).success_or_raise() + resp = self.client.fetch_all_visitors(expand=[FetchAllVisitorsExpansion.NONE]) assert resp.pagination # TODO: verify correctness of data? @@ -618,7 +614,7 @@ class VisitorTests(UnifiAccessTests): id=DoorGroupId("d5573467-d6b3-4e8f-8e48-8a322b91664a") ), ], - ).success_or_raise() + ) @responses.activate def test_delete_visitor(self) -> None: @@ -633,11 +629,7 @@ class VisitorTests(UnifiAccessTests): json={"code": "SUCCESS", "data": None, "msg": "success"}, ) - resp = self.client.delete_visitor( - visitor_id=visitor_id, - is_force=True, - ).success_or_raise() - # TODO: verify correctness of data? + self.client.delete_visitor(visitor_id=visitor_id, is_force=True) @responses.activate def test_assign_nfc_card_to_visitor(self) -> None: @@ -657,13 +649,13 @@ class VisitorTests(UnifiAccessTests): json={"code": "SUCCESS", "msg": "success"}, ) - resp = self.client.assign_nfc_card_to_visitor( + self.client.assign_nfc_card_to_visitor( visitor_id=visitor_id, token=NfcCardToken( "d27822fc682b478dc637c6db01813e465174df6e54ca515d8427db623cfda1d0" ), force_add=True, - ).success_or_raise() + ) @responses.activate def test_unassign_nfc_card_from_visitor(self) -> None: @@ -682,12 +674,12 @@ class VisitorTests(UnifiAccessTests): json={"code": "SUCCESS", "msg": "success"}, ) - resp = self.client.unassign_nfc_card_from_visitor( + self.client.unassign_nfc_card_from_visitor( visitor_id=visitor_id, token=NfcCardToken( "d27822fc682b478dc637c6db01813e465174df6e54ca515d8427db623cfda1d0" ), - ).success_or_raise() + ) @responses.activate def test_assign_pin_code_to_visitor(self) -> None: @@ -702,10 +694,10 @@ class VisitorTests(UnifiAccessTests): json={"code": "SUCCESS", "msg": "success"}, ) - resp = self.client.assign_pin_code_to_visitor( + self.client.assign_pin_code_to_visitor( visitor_id=visitor_id, pin_code="57301208", - ).success_or_raise() + ) @responses.activate def test_unassign_pin_code_from_visitor(self) -> None: @@ -719,6 +711,4 @@ class VisitorTests(UnifiAccessTests): json={"code": "SUCCESS", "msg": "success"}, ) - resp = self.client.unassign_pin_code_from_visitor( - visitor_id=visitor_id - ).success_or_raise() + self.client.unassign_pin_code_from_visitor(visitor_id=visitor_id)