From 64593be998dbf192d08b17a06ee781935c1be10e Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Wed, 2 Oct 2024 16:32:56 -0400 Subject: [PATCH] Migrate `device_joined` (`test_device`) --- tests/test_device.py | 82 ++++++++++++++++++++++---------------------- 1 file changed, 41 insertions(+), 41 deletions(-) diff --git a/tests/test_device.py b/tests/test_device.py index 10461dbe..182f31ba 100644 --- a/tests/test_device.py +++ b/tests/test_device.py @@ -1,7 +1,7 @@ """Test ZHA device switch.""" import asyncio -from collections.abc import Awaitable, Callable +from collections.abc import Callable import logging import time from unittest import mock @@ -16,6 +16,7 @@ from zigpy.zcl.foundation import Status, WriteAttributesResponse import zigpy.zdo.types as zdo_t +from tests.common import join_zigpy_device from tests.conftest import SIG_EP_INPUT, SIG_EP_OUTPUT, SIG_EP_TYPE from zha.application import Platform from zha.application.const import ( @@ -118,8 +119,8 @@ def device_without_basic_cluster_handler( @pytest.fixture async def ota_zha_device( - device_joined: Callable[[ZigpyDevice], Awaitable[Device]], zigpy_device_mock: Callable[..., ZigpyDevice], + zha_gateway: Gateway, ) -> Device: """ZHA device with OTA cluster fixture.""" zigpy_dev = zigpy_device_mock( @@ -135,7 +136,7 @@ async def ota_zha_device( "test model", ) - zha_device = await device_joined(zigpy_dev) + zha_device = await join_zigpy_device(zha_gateway, zigpy_dev) return zha_device @@ -153,11 +154,10 @@ async def _send_time_changed(zha_gateway: Gateway, seconds: int): async def test_check_available_success( zha_gateway: Gateway, device_with_basic_cluster_handler: ZigpyDevice, # pylint: disable=redefined-outer-name - device_joined: Callable[[ZigpyDevice], Awaitable[Device]], caplog: pytest.LogCaptureFixture, ) -> None: """Check device availability success on 1st try.""" - zha_device = await device_joined(device_with_basic_cluster_handler) + zha_device = await join_zigpy_device(zha_gateway, device_with_basic_cluster_handler) basic_ch = device_with_basic_cluster_handler.endpoints[3].basic assert not zha_device.is_coordinator @@ -249,11 +249,10 @@ def _update_last_seen(*args, **kwargs): # pylint: disable=unused-argument async def test_check_available_unsuccessful( zha_gateway: Gateway, device_with_basic_cluster_handler: ZigpyDevice, # pylint: disable=redefined-outer-name - device_joined: Callable[[ZigpyDevice], Awaitable[Device]], ) -> None: """Check device availability all tries fail.""" - zha_device = await device_joined(device_with_basic_cluster_handler) + zha_device = await join_zigpy_device(zha_gateway, device_with_basic_cluster_handler) basic_ch = device_with_basic_cluster_handler.endpoints[3].basic assert zha_device.available is True @@ -320,13 +319,14 @@ async def test_check_available_unsuccessful( async def test_check_available_no_basic_cluster_handler( zha_gateway: Gateway, device_without_basic_cluster_handler: ZigpyDevice, # pylint: disable=redefined-outer-name - device_joined: Callable[[ZigpyDevice], Awaitable[Device]], caplog: pytest.LogCaptureFixture, ) -> None: """Check device availability for a device without basic cluster.""" caplog.set_level(logging.DEBUG, logger="homeassistant.components.zha") - zha_device = await device_joined(device_without_basic_cluster_handler) + zha_device = await join_zigpy_device( + zha_gateway, device_without_basic_cluster_handler + ) assert zha_device.available is True @@ -344,8 +344,8 @@ async def test_check_available_no_basic_cluster_handler( async def test_device_is_active_coordinator( - device_joined: Callable[[ZigpyDevice], Awaitable[Device]], zigpy_device: Callable[..., ZigpyDevice], # pylint: disable=redefined-outer-name + zha_gateway: Gateway, ) -> None: """Test that the current coordinator is uniquely detected.""" @@ -362,8 +362,8 @@ async def test_device_is_active_coordinator( # The two coordinators have different IEEE addresses assert current_coord_dev.ieee != old_coord_dev.ieee - current_coordinator = await device_joined(current_coord_dev) - stale_coordinator = await device_joined(old_coord_dev) + current_coordinator = await join_zigpy_device(zha_gateway, current_coord_dev) + stale_coordinator = await join_zigpy_device(zha_gateway, old_coord_dev) # Ensure the current ApplicationController's IEEE matches our coordinator's current_coordinator.gateway.application_controller.state.node_info.ieee = ( @@ -375,8 +375,8 @@ async def test_device_is_active_coordinator( async def test_coordinator_info_uses_node_info( - device_joined: Callable[[ZigpyDevice], Awaitable[Device]], zigpy_device: Callable[..., ZigpyDevice], # pylint: disable=redefined-outer-name + zha_gateway: Gateway, ) -> None: """Test that the current coordinator uses strings from `node_info`.""" @@ -390,7 +390,7 @@ async def test_coordinator_info_uses_node_info( app.state.node_info.model = "Real Coordinator Model" app.state.node_info.manufacturer = "Real Coordinator Manufacturer" - current_coordinator = await device_joined(current_coord_dev) + current_coordinator = await join_zigpy_device(zha_gateway, current_coord_dev) assert current_coordinator.is_active_coordinator assert current_coordinator.model == "Real Coordinator Model" @@ -398,8 +398,8 @@ async def test_coordinator_info_uses_node_info( async def test_coordinator_info_generic_name( - device_joined: Callable[[ZigpyDevice], Awaitable[Device]], zigpy_device: Callable[..., ZigpyDevice], # pylint: disable=redefined-outer-name + zha_gateway: Gateway, ) -> None: """Test that the current coordinator uses strings from `node_info`.""" @@ -413,7 +413,7 @@ async def test_coordinator_info_generic_name( app.state.node_info.model = None app.state.node_info.manufacturer = None - current_coordinator = await device_joined(current_coord_dev) + current_coordinator = await join_zigpy_device(zha_gateway, current_coord_dev) assert current_coordinator.is_active_coordinator assert current_coordinator.model == "Generic Zigbee Coordinator (EZSP)" @@ -421,12 +421,12 @@ async def test_coordinator_info_generic_name( async def test_async_get_clusters( - device_joined: Callable[[ZigpyDevice], Awaitable[Device]], zigpy_device: Callable[..., ZigpyDevice], # pylint: disable=redefined-outer-name + zha_gateway: Gateway, ) -> None: """Test async_get_clusters method.""" zigpy_dev = zigpy_device(with_basic_cluster_handler=True) - zha_device = await device_joined(zigpy_dev) + zha_device = await join_zigpy_device(zha_gateway, zigpy_dev) assert zha_device.async_get_clusters() == { 3: { @@ -444,25 +444,25 @@ async def test_async_get_clusters( async def test_async_get_groupable_endpoints( - device_joined: Callable[[ZigpyDevice], Awaitable[Device]], zigpy_device: Callable[..., ZigpyDevice], # pylint: disable=redefined-outer-name + zha_gateway: Gateway, ) -> None: """Test async_get_groupable_endpoints method.""" zigpy_dev = zigpy_device(with_basic_cluster_handler=True) zigpy_dev.endpoints[3].add_input_cluster(general.Groups.cluster_id) - zha_device = await device_joined(zigpy_dev) + zha_device = await join_zigpy_device(zha_gateway, zigpy_dev) assert zha_device.async_get_groupable_endpoints() == [3] async def test_async_get_std_clusters( - device_joined: Callable[[ZigpyDevice], Awaitable[Device]], zigpy_device: Callable[..., ZigpyDevice], # pylint: disable=redefined-outer-name + zha_gateway: Gateway, ) -> None: """Test async_get_std_clusters method.""" zigpy_dev = zigpy_device(with_basic_cluster_handler=True) zigpy_dev.endpoints[3].profile_id = zigpy.profiles.zha.PROFILE_ID - zha_device = await device_joined(zigpy_dev) + zha_device = await join_zigpy_device(zha_gateway, zigpy_dev) assert zha_device.async_get_std_clusters() == { 3: { @@ -480,12 +480,12 @@ async def test_async_get_std_clusters( async def test_async_get_cluster( - device_joined: Callable[[ZigpyDevice], Awaitable[Device]], zigpy_device: Callable[..., ZigpyDevice], # pylint: disable=redefined-outer-name + zha_gateway: Gateway, ) -> None: """Test async_get_cluster method.""" zigpy_dev = zigpy_device(with_basic_cluster_handler=True) - zha_device = await device_joined(zigpy_dev) + zha_device = await join_zigpy_device(zha_gateway, zigpy_dev) assert zha_device.async_get_cluster(3, general.OnOff.cluster_id) == ( zigpy_dev.endpoints[3].on_off @@ -493,12 +493,12 @@ async def test_async_get_cluster( async def test_async_get_cluster_attributes( - device_joined: Callable[[ZigpyDevice], Awaitable[Device]], zigpy_device: Callable[..., ZigpyDevice], # pylint: disable=redefined-outer-name + zha_gateway: Gateway, ) -> None: """Test async_get_cluster_attributes method.""" zigpy_dev = zigpy_device(with_basic_cluster_handler=True) - zha_device = await device_joined(zigpy_dev) + zha_device = await join_zigpy_device(zha_gateway, zigpy_dev) assert ( zha_device.async_get_cluster_attributes(3, general.OnOff.cluster_id) @@ -513,12 +513,12 @@ async def test_async_get_cluster_attributes( async def test_async_get_cluster_commands( - device_joined: Callable[[ZigpyDevice], Awaitable[Device]], zigpy_device: Callable[..., ZigpyDevice], # pylint: disable=redefined-outer-name + zha_gateway: Gateway, ) -> None: """Test async_get_cluster_commands method.""" zigpy_dev = zigpy_device(with_basic_cluster_handler=True) - zha_device = await device_joined(zigpy_dev) + zha_device = await join_zigpy_device(zha_gateway, zigpy_dev) assert zha_device.async_get_cluster_commands(3, general.OnOff.cluster_id) == { CLUSTER_COMMANDS_CLIENT: zigpy_dev.endpoints[3].on_off.client_commands, @@ -527,12 +527,12 @@ async def test_async_get_cluster_commands( async def test_write_zigbee_attribute( - device_joined: Callable[[ZigpyDevice], Awaitable[Device]], zigpy_device: Callable[..., ZigpyDevice], # pylint: disable=redefined-outer-name + zha_gateway: Gateway, ) -> None: """Test write_zigbee_attribute method.""" zigpy_dev = zigpy_device(with_basic_cluster_handler=True) - zha_device = await device_joined(zigpy_dev) + zha_device = await join_zigpy_device(zha_gateway, zigpy_dev) with pytest.raises( ValueError, @@ -588,12 +588,12 @@ async def test_write_zigbee_attribute( async def test_issue_cluster_command( - device_joined: Callable[[ZigpyDevice], Awaitable[Device]], zigpy_device: Callable[..., ZigpyDevice], # pylint: disable=redefined-outer-name + zha_gateway: Gateway, ) -> None: """Test issue_cluster_command method.""" zigpy_dev = zigpy_device(with_basic_cluster_handler=True) - zha_device = await device_joined(zigpy_dev) + zha_device = await join_zigpy_device(zha_gateway, zigpy_dev) with pytest.raises( ValueError, @@ -624,14 +624,14 @@ async def test_issue_cluster_command( async def test_async_add_to_group_remove_from_group( - device_joined: Callable[[ZigpyDevice], Awaitable[Device]], zigpy_device: Callable[..., ZigpyDevice], # pylint: disable=redefined-outer-name + zha_gateway: Gateway, caplog: pytest.LogCaptureFixture, ) -> None: """Test async_add_to_group and async_remove_from_group methods.""" zigpy_dev = zigpy_device(with_basic_cluster_handler=True) zigpy_dev.endpoints[3].add_input_cluster(general.Groups.cluster_id) - zha_device = await device_joined(zigpy_dev) + zha_device = await join_zigpy_device(zha_gateway, zigpy_dev) group: Group = zha_device.gateway.groups[0x1001] @@ -694,19 +694,19 @@ async def test_async_add_to_group_remove_from_group( async def test_async_bind_to_group( - device_joined: Callable[[ZigpyDevice], Awaitable[Device]], zigpy_device: Callable[..., ZigpyDevice], # pylint: disable=redefined-outer-name + zha_gateway: Gateway, caplog: pytest.LogCaptureFixture, ) -> None: """Test async_bind_to_group method.""" zigpy_dev = zigpy_device(with_basic_cluster_handler=True) zigpy_dev.endpoints[3].add_input_cluster(general.Groups.cluster_id) - zha_device = await device_joined(zigpy_dev) + zha_device = await join_zigpy_device(zha_gateway, zigpy_dev) zigpy_dev_remote = zigpy_device(with_basic_cluster_handler=True) zigpy_dev_remote._ieee = zigpy.types.EUI64.convert("00:0d:7f:00:0a:90:69:e8") zigpy_dev_remote.endpoints[3].add_output_cluster(general.OnOff.cluster_id) - zha_device_remote = await device_joined(zigpy_dev_remote) + zha_device_remote = await join_zigpy_device(zha_gateway, zigpy_dev_remote) assert zha_device_remote is not None group: Group = zha_device.gateway.groups[0x1001] @@ -735,12 +735,12 @@ async def test_async_bind_to_group( async def test_device_automation_triggers( - device_joined: Callable[[ZigpyDevice], Awaitable[Device]], zigpy_device: Callable[..., ZigpyDevice], # pylint: disable=redefined-outer-name + zha_gateway: Gateway, ) -> None: """Test device automation triggers.""" zigpy_dev = zigpy_device(with_basic_cluster_handler=True) - zha_device = await device_joined(zigpy_dev) + zha_device = await join_zigpy_device(zha_gateway, zigpy_dev) assert get_device_automation_triggers(zha_device) == { ("device_offline", "device_offline"): {"device_event_type": "device_offline"} @@ -753,12 +753,12 @@ async def test_device_automation_triggers( async def test_device_properties( - device_joined: Callable[[ZigpyDevice], Awaitable[Device]], zigpy_device: Callable[..., ZigpyDevice], # pylint: disable=redefined-outer-name + zha_gateway: Gateway, ) -> None: """Test device properties.""" zigpy_dev = zigpy_device(with_basic_cluster_handler=True) - zha_device = await device_joined(zigpy_dev) + zha_device = await join_zigpy_device(zha_gateway, zigpy_dev) assert zha_device.is_mains_powered is False assert zha_device.is_end_device is True