From 5691b2a83abe13ee75a316b019c4d90c6cc419e3 Mon Sep 17 00:00:00 2001 From: David Mulcahey Date: Thu, 21 Mar 2024 22:28:17 -0400 Subject: [PATCH] coverage --- tests/test_switch.py | 567 +++++++++++++++++++++++- zha/zigbee/cluster_handlers/__init__.py | 1 - 2 files changed, 566 insertions(+), 2 deletions(-) diff --git a/tests/test_switch.py b/tests/test_switch.py index 97636227..9229743e 100644 --- a/tests/test_switch.py +++ b/tests/test_switch.py @@ -7,9 +7,21 @@ import pytest from slugify import slugify +from zhaquirks.const import ( + DEVICE_TYPE, + ENDPOINTS, + INPUT_CLUSTERS, + OUTPUT_CLUSTERS, + PROFILE_ID, +) from zigpy.device import Device as ZigpyDevice +from zigpy.exceptions import ZigbeeException from zigpy.profiles import zha -from zigpy.zcl.clusters import general +from zigpy.quirks import _DEVICE_REGISTRY, CustomCluster, CustomDevice +from zigpy.quirks.v2 import CustomDeviceV2, add_to_registry_v2 +import zigpy.types as t +from zigpy.zcl.clusters import closures, general +from zigpy.zcl.clusters.manufacturer_specific import ManufacturerSpecificCluster import zigpy.zcl.foundation as zcl_f from zha.application import Platform @@ -48,6 +60,24 @@ def zigpy_device(zigpy_device_mock: Callable[..., ZigpyDevice]) -> ZigpyDevice: return zigpy_device_mock(endpoints) +@pytest.fixture +def zigpy_cover_device(zigpy_device_mock): + """Zigpy cover device.""" + + endpoints = { + 1: { + SIG_EP_PROFILE: zha.PROFILE_ID, + SIG_EP_TYPE: zha.DeviceType.WINDOW_COVERING_DEVICE, + SIG_EP_INPUT: [ + general.Basic.cluster_id, + closures.WindowCovering.cluster_id, + ], + SIG_EP_OUTPUT: [], + } + } + return zigpy_device_mock(endpoints) + + @pytest.fixture async def device_switch_1( zigpy_device_mock: Callable[..., ZigpyDevice], @@ -325,3 +355,538 @@ def get_group_entity(group: Group, entity_id: str) -> Optional[GroupEntity]: } return entities.get(entity_id) + + +class WindowDetectionFunctionQuirk(CustomDevice): + """Quirk with window detection function attribute.""" + + class TuyaManufCluster(CustomCluster, ManufacturerSpecificCluster): + """Tuya manufacturer specific cluster.""" + + cluster_id = 0xEF00 + ep_attribute = "tuya_manufacturer" + + attributes = { + 0xEF01: ("window_detection_function", t.Bool), + 0xEF02: ("window_detection_function_inverter", t.Bool), + } + + def __init__(self, *args, **kwargs): + """Initialize with task.""" + super().__init__(*args, **kwargs) + self._attr_cache.update( + {0xEF01: False} + ) # entity won't be created without this + + replacement = { + ENDPOINTS: { + 1: { + PROFILE_ID: zha.PROFILE_ID, + DEVICE_TYPE: zha.DeviceType.ON_OFF_SWITCH, + INPUT_CLUSTERS: [general.Basic.cluster_id, TuyaManufCluster], + OUTPUT_CLUSTERS: [], + }, + } + } + + +@pytest.fixture +async def zigpy_device_tuya(zha_gateway: ZHAGateway, zigpy_device_mock, device_joined): + """Device tracker zigpy tuya device.""" + + zigpy_dev = zigpy_device_mock( + { + 1: { + SIG_EP_INPUT: [general.Basic.cluster_id], + SIG_EP_OUTPUT: [], + SIG_EP_TYPE: zha.DeviceType.ON_OFF_SWITCH, + } + }, + manufacturer="_TZE200_b6wax7g0", + quirk=WindowDetectionFunctionQuirk, + ) + + zha_device = await device_joined(zigpy_dev) + zha_device.available = True + return zigpy_dev + + +async def test_switch_configurable( + zha_gateway: ZHAGateway, device_joined, zigpy_device_tuya +) -> None: + """Test ZHA configurable switch platform.""" + + zha_device = await device_joined(zigpy_device_tuya) + cluster = zigpy_device_tuya.endpoints[1].tuya_manufacturer + entity_id = find_entity_id(Platform.SWITCH, zha_device) + assert entity_id is not None + entity = get_entity(zha_device, entity_id) + assert entity is not None + + # test that the state has changed from unavailable to off + assert bool(entity.get_state()["state"]) is False + + # turn on at switch + await send_attributes_report( + zha_gateway, cluster, {"window_detection_function": True} + ) + assert bool(entity.get_state()["state"]) is True + + # turn off at switch + await send_attributes_report( + zha_gateway, cluster, {"window_detection_function": False} + ) + assert bool(entity.get_state()["state"]) is False + + # turn on from HA + with patch( + "zigpy.zcl.Cluster.write_attributes", + return_value=[zcl_f.Status.SUCCESS, zcl_f.Status.SUCCESS], + ): + # turn on via UI + await entity.async_turn_on() + await zha_gateway.async_block_till_done() + assert cluster.write_attributes.mock_calls == [ + call({"window_detection_function": True}, manufacturer=None) + ] + + cluster.write_attributes.reset_mock() + + # turn off from HA + with patch( + "zigpy.zcl.Cluster.write_attributes", + return_value=[zcl_f.Status.SUCCESS, zcl_f.Status.SUCCESS], + ): + # turn off via UI + await entity.async_turn_off() + await zha_gateway.async_block_till_done() + assert cluster.write_attributes.mock_calls == [ + call({"window_detection_function": False}, manufacturer=None) + ] + + cluster.read_attributes.reset_mock() + await entity.async_update() + await zha_gateway.async_block_till_done() + # the mocking doesn't update the attr cache so this flips back to initial value + assert cluster.read_attributes.call_count == 2 + assert [ + call( + [ + "window_detection_function", + ], + allow_cache=False, + only_cache=False, + manufacturer=None, + ), + call( + [ + "window_detection_function_inverter", + ], + allow_cache=False, + only_cache=False, + manufacturer=None, + ), + ] == cluster.read_attributes.call_args_list + + cluster.write_attributes.reset_mock() + cluster.write_attributes.side_effect = ZigbeeException + + with pytest.raises(ZHAException): + await entity.async_turn_off() + await zha_gateway.async_block_till_done() + + assert cluster.write_attributes.mock_calls == [ + call({"window_detection_function": False}, manufacturer=None), + call({"window_detection_function": False}, manufacturer=None), + call({"window_detection_function": False}, manufacturer=None), + ] + + cluster.write_attributes.side_effect = None + + # test inverter + cluster.write_attributes.reset_mock() + cluster._attr_cache.update({0xEF02: True}) + + await entity.async_turn_off() + await zha_gateway.async_block_till_done() + assert cluster.write_attributes.mock_calls == [ + call({"window_detection_function": True}, manufacturer=None) + ] + + cluster.write_attributes.reset_mock() + await entity.async_turn_on() + await zha_gateway.async_block_till_done() + assert cluster.write_attributes.mock_calls == [ + call({"window_detection_function": False}, manufacturer=None) + ] + + +async def test_switch_configurable_custom_on_off_values( + zha_gateway: ZHAGateway, device_joined, zigpy_device_mock +) -> None: + """Test ZHA configurable switch platform.""" + + zigpy_dev = zigpy_device_mock( + { + 1: { + SIG_EP_INPUT: [general.Basic.cluster_id], + SIG_EP_OUTPUT: [], + SIG_EP_TYPE: zha.DeviceType.ON_OFF_SWITCH, + } + }, + manufacturer="manufacturer", + model="model", + ) + + ( + add_to_registry_v2(zigpy_dev.manufacturer, zigpy_dev.model) + .adds(WindowDetectionFunctionQuirk.TuyaManufCluster) + .switch( + "window_detection_function", + WindowDetectionFunctionQuirk.TuyaManufCluster.cluster_id, + on_value=3, + off_value=5, + ) + ) + + zigpy_device_ = _DEVICE_REGISTRY.get_device(zigpy_dev) + + assert isinstance(zigpy_device_, CustomDeviceV2) + cluster = zigpy_device_.endpoints[1].tuya_manufacturer + cluster.PLUGGED_ATTR_READS = {"window_detection_function": 5} + update_attribute_cache(cluster) + + zha_device = await device_joined(zigpy_device_) + + entity_id = find_entity_id(Platform.SWITCH, zha_device) + assert entity_id is not None + entity = get_entity(zha_device, entity_id) + assert entity is not None + + assert bool(entity.get_state()["state"]) is False + + # turn on at switch + await send_attributes_report(zha_gateway, cluster, {"window_detection_function": 3}) + assert bool(entity.get_state()["state"]) is True + + # turn off at switch + await send_attributes_report(zha_gateway, cluster, {"window_detection_function": 5}) + assert bool(entity.get_state()["state"]) is False + + # turn on from HA + with patch( + "zigpy.zcl.Cluster.write_attributes", + return_value=[zcl_f.WriteAttributesResponse.deserialize(b"\x00")[0]], + ): + # turn on via UI + await entity.async_turn_on() + await zha_gateway.async_block_till_done() + assert cluster.write_attributes.mock_calls == [ + call({"window_detection_function": 3}, manufacturer=None) + ] + cluster.write_attributes.reset_mock() + + # turn off from HA + with patch( + "zigpy.zcl.Cluster.write_attributes", + return_value=[zcl_f.WriteAttributesResponse.deserialize(b"\x00")[0]], + ): + # turn off via UI + await entity.async_turn_off() + await zha_gateway.async_block_till_done() + assert cluster.write_attributes.mock_calls == [ + call({"window_detection_function": 5}, manufacturer=None) + ] + + +async def test_switch_configurable_custom_on_off_values_force_inverted( + zha_gateway: ZHAGateway, device_joined, zigpy_device_mock +) -> None: + """Test ZHA configurable switch platform.""" + + zigpy_dev = zigpy_device_mock( + { + 1: { + SIG_EP_INPUT: [general.Basic.cluster_id], + SIG_EP_OUTPUT: [], + SIG_EP_TYPE: zha.DeviceType.ON_OFF_SWITCH, + } + }, + manufacturer="manufacturer2", + model="model2", + ) + + ( + add_to_registry_v2(zigpy_dev.manufacturer, zigpy_dev.model) + .adds(WindowDetectionFunctionQuirk.TuyaManufCluster) + .switch( + "window_detection_function", + WindowDetectionFunctionQuirk.TuyaManufCluster.cluster_id, + on_value=3, + off_value=5, + force_inverted=True, + ) + ) + + zigpy_device_ = _DEVICE_REGISTRY.get_device(zigpy_dev) + + assert isinstance(zigpy_device_, CustomDeviceV2) + cluster = zigpy_device_.endpoints[1].tuya_manufacturer + cluster.PLUGGED_ATTR_READS = {"window_detection_function": 5} + update_attribute_cache(cluster) + + zha_device = await device_joined(zigpy_device_) + + entity_id = find_entity_id(Platform.SWITCH, zha_device) + assert entity_id is not None + entity = get_entity(zha_device, entity_id) + assert entity is not None + + assert bool(entity.get_state()["state"]) is True + + # turn on at switch + await send_attributes_report(zha_gateway, cluster, {"window_detection_function": 3}) + assert bool(entity.get_state()["state"]) is False + + # turn off at switch + await send_attributes_report(zha_gateway, cluster, {"window_detection_function": 5}) + assert bool(entity.get_state()["state"]) is True + + # turn on from HA + with patch( + "zigpy.zcl.Cluster.write_attributes", + return_value=[zcl_f.WriteAttributesResponse.deserialize(b"\x00")[0]], + ): + # turn on via UI + await entity.async_turn_on() + await zha_gateway.async_block_till_done() + assert cluster.write_attributes.mock_calls == [ + call({"window_detection_function": 5}, manufacturer=None) + ] + cluster.write_attributes.reset_mock() + + # turn off from HA + with patch( + "zigpy.zcl.Cluster.write_attributes", + return_value=[zcl_f.WriteAttributesResponse.deserialize(b"\x00")[0]], + ): + # turn off via UI + await entity.async_turn_off() + await zha_gateway.async_block_till_done() + assert cluster.write_attributes.mock_calls == [ + call({"window_detection_function": 3}, manufacturer=None) + ] + + +async def test_switch_configurable_custom_on_off_values_inverter_attribute( + zha_gateway: ZHAGateway, device_joined, zigpy_device_mock +) -> None: + """Test ZHA configurable switch platform.""" + + zigpy_dev = zigpy_device_mock( + { + 1: { + SIG_EP_INPUT: [general.Basic.cluster_id], + SIG_EP_OUTPUT: [], + SIG_EP_TYPE: zha.DeviceType.ON_OFF_SWITCH, + } + }, + manufacturer="manufacturer3", + model="model3", + ) + + ( + add_to_registry_v2(zigpy_dev.manufacturer, zigpy_dev.model) + .adds(WindowDetectionFunctionQuirk.TuyaManufCluster) + .switch( + "window_detection_function", + WindowDetectionFunctionQuirk.TuyaManufCluster.cluster_id, + on_value=3, + off_value=5, + invert_attribute_name="window_detection_function_inverter", + ) + ) + + zigpy_device_ = _DEVICE_REGISTRY.get_device(zigpy_dev) + + assert isinstance(zigpy_device_, CustomDeviceV2) + cluster = zigpy_device_.endpoints[1].tuya_manufacturer + cluster.PLUGGED_ATTR_READS = { + "window_detection_function": 5, + "window_detection_function_inverter": t.Bool(True), + } + update_attribute_cache(cluster) + + zha_device = await device_joined(zigpy_device_) + + entity_id = find_entity_id(Platform.SWITCH, zha_device) + assert entity_id is not None + entity = get_entity(zha_device, entity_id) + assert entity is not None + + assert bool(entity.get_state()["state"]) is True + + # turn on at switch + await send_attributes_report(zha_gateway, cluster, {"window_detection_function": 3}) + assert bool(entity.get_state()["state"]) is False + + # turn off at switch + await send_attributes_report(zha_gateway, cluster, {"window_detection_function": 5}) + assert bool(entity.get_state()["state"]) is True + + # turn on from HA + with patch( + "zigpy.zcl.Cluster.write_attributes", + return_value=[zcl_f.WriteAttributesResponse.deserialize(b"\x00")[0]], + ): + # turn on via UI + await entity.async_turn_on() + await zha_gateway.async_block_till_done() + assert cluster.write_attributes.mock_calls == [ + call({"window_detection_function": 5}, manufacturer=None) + ] + cluster.write_attributes.reset_mock() + + # turn off from HA + with patch( + "zigpy.zcl.Cluster.write_attributes", + return_value=[zcl_f.WriteAttributesResponse.deserialize(b"\x00")[0]], + ): + # turn off via UI + await entity.async_turn_off() + await zha_gateway.async_block_till_done() + assert cluster.write_attributes.mock_calls == [ + call({"window_detection_function": 3}, manufacturer=None) + ] + + +WCAttrs = closures.WindowCovering.AttributeDefs +WCT = closures.WindowCovering.WindowCoveringType +WCCS = closures.WindowCovering.ConfigStatus +WCM = closures.WindowCovering.WindowCoveringMode + + +async def test_cover_inversion_switch( + zha_gateway: ZHAGateway, device_joined, zigpy_cover_device +) -> None: + """Test ZHA cover platform.""" + + # load up cover domain + cluster = zigpy_cover_device.endpoints[1].window_covering + cluster.PLUGGED_ATTR_READS = { + WCAttrs.current_position_lift_percentage.name: 65, + WCAttrs.current_position_tilt_percentage.name: 42, + WCAttrs.window_covering_type.name: WCT.Tilt_blind_tilt_and_lift, + WCAttrs.config_status.name: WCCS(~WCCS.Open_up_commands_reversed), + WCAttrs.window_covering_mode.name: WCM(WCM.LEDs_display_feedback), + } + update_attribute_cache(cluster) + zha_device = await device_joined(zigpy_cover_device) + assert ( + not zha_device.endpoints[1] + .all_cluster_handlers[f"1:0x{cluster.cluster_id:04x}"] + .inverted + ) + assert cluster.read_attributes.call_count == 3 + assert ( + WCAttrs.current_position_lift_percentage.name + in cluster.read_attributes.call_args[0][0] + ) + assert ( + WCAttrs.current_position_tilt_percentage.name + in cluster.read_attributes.call_args[0][0] + ) + + entity_id = find_entity_id(Platform.SWITCH, zha_device) + assert entity_id is not None + entity = get_entity(zha_device, entity_id) + assert entity is not None + + # test update + prev_call_count = cluster.read_attributes.call_count + await entity.async_update() + await zha_gateway.async_block_till_done() + assert cluster.read_attributes.call_count == prev_call_count + 1 + assert bool(entity.get_state()["state"]) is False + + # test to see the state remains after tilting to 0% + await send_attributes_report( + zha_gateway, cluster, {WCAttrs.current_position_tilt_percentage.id: 0} + ) + assert bool(entity.get_state()["state"]) is False + + with patch( + "zigpy.zcl.Cluster.write_attributes", return_value=[0x1, zcl_f.Status.SUCCESS] + ): + cluster.PLUGGED_ATTR_READS = { + WCAttrs.config_status.name: WCCS.Operational + | WCCS.Open_up_commands_reversed, + } + # turn on from UI + await entity.async_turn_on() + await zha_gateway.async_block_till_done() + assert cluster.write_attributes.call_count == 1 + assert cluster.write_attributes.call_args_list[0] == call( + { + WCAttrs.window_covering_mode.name: WCM.Motor_direction_reversed + | WCM.LEDs_display_feedback + }, + manufacturer=None, + ) + + assert bool(entity.get_state()["state"]) is True + + cluster.write_attributes.reset_mock() + + # turn off from UI + cluster.PLUGGED_ATTR_READS = { + WCAttrs.config_status.name: WCCS.Operational, + } + await entity.async_turn_off() + await zha_gateway.async_block_till_done() + assert cluster.write_attributes.call_count == 1 + assert cluster.write_attributes.call_args_list[0] == call( + {WCAttrs.window_covering_mode.name: WCM.LEDs_display_feedback}, + manufacturer=None, + ) + + assert bool(entity.get_state()["state"]) is False + + cluster.write_attributes.reset_mock() + + # test that sending the command again does not result in a write + await entity.async_turn_off() + await zha_gateway.async_block_till_done() + assert cluster.write_attributes.call_count == 0 + + assert bool(entity.get_state()["state"]) is False + + +async def test_cover_inversion_switch_not_created( + zha_gateway: ZHAGateway, device_joined, zigpy_cover_device +) -> None: + """Test ZHA cover platform.""" + + # load up cover domain + cluster = zigpy_cover_device.endpoints[1].window_covering + cluster.PLUGGED_ATTR_READS = { + WCAttrs.current_position_lift_percentage.name: 65, + WCAttrs.current_position_tilt_percentage.name: 42, + WCAttrs.config_status.name: WCCS(~WCCS.Open_up_commands_reversed), + } + update_attribute_cache(cluster) + zha_device = await device_joined(zigpy_cover_device) + + assert cluster.read_attributes.call_count == 3 + assert ( + WCAttrs.current_position_lift_percentage.name + in cluster.read_attributes.call_args[0][0] + ) + assert ( + WCAttrs.current_position_tilt_percentage.name + in cluster.read_attributes.call_args[0][0] + ) + + # entity should not be created when mode or config status aren't present + entity_id = find_entity_id(Platform.SWITCH, zha_device) + assert entity_id is None diff --git a/zha/zigbee/cluster_handlers/__init__.py b/zha/zigbee/cluster_handlers/__init__.py index be02e69a..9a857449 100644 --- a/zha/zigbee/cluster_handlers/__init__.py +++ b/zha/zigbee/cluster_handlers/__init__.py @@ -555,7 +555,6 @@ async def write_attributes_safe( """Wrap `write_attributes` to throw an exception on attribute write failure.""" res = await self.write_attributes(attributes, manufacturer=manufacturer) - for record in res[0]: if record.status != Status.SUCCESS: try: