Skip to content

Commit

Permalink
Migrate device_joined (test_device)
Browse files Browse the repository at this point in the history
  • Loading branch information
puddly committed Oct 2, 2024
1 parent 84b5972 commit 64593be
Showing 1 changed file with 41 additions and 41 deletions.
82 changes: 41 additions & 41 deletions tests/test_device.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Test ZHA device switch."""

import asyncio
from collections.abc import Awaitable, Callable
from collections.abc import Callable
import logging
import time
from unittest import mock
Expand All @@ -16,6 +16,7 @@
from zigpy.zcl.foundation import Status, WriteAttributesResponse
import zigpy.zdo.types as zdo_t

from tests.common import join_zigpy_device
from tests.conftest import SIG_EP_INPUT, SIG_EP_OUTPUT, SIG_EP_TYPE
from zha.application import Platform
from zha.application.const import (
Expand Down Expand Up @@ -118,8 +119,8 @@ def device_without_basic_cluster_handler(

@pytest.fixture
async def ota_zha_device(
device_joined: Callable[[ZigpyDevice], Awaitable[Device]],
zigpy_device_mock: Callable[..., ZigpyDevice],
zha_gateway: Gateway,
) -> Device:
"""ZHA device with OTA cluster fixture."""
zigpy_dev = zigpy_device_mock(
Expand All @@ -135,7 +136,7 @@ async def ota_zha_device(
"test model",
)

zha_device = await device_joined(zigpy_dev)
zha_device = await join_zigpy_device(zha_gateway, zigpy_dev)
return zha_device


Expand All @@ -153,11 +154,10 @@ async def _send_time_changed(zha_gateway: Gateway, seconds: int):
async def test_check_available_success(
zha_gateway: Gateway,
device_with_basic_cluster_handler: ZigpyDevice, # pylint: disable=redefined-outer-name
device_joined: Callable[[ZigpyDevice], Awaitable[Device]],
caplog: pytest.LogCaptureFixture,
) -> None:
"""Check device availability success on 1st try."""
zha_device = await device_joined(device_with_basic_cluster_handler)
zha_device = await join_zigpy_device(zha_gateway, device_with_basic_cluster_handler)
basic_ch = device_with_basic_cluster_handler.endpoints[3].basic

assert not zha_device.is_coordinator
Expand Down Expand Up @@ -249,11 +249,10 @@ def _update_last_seen(*args, **kwargs): # pylint: disable=unused-argument
async def test_check_available_unsuccessful(
zha_gateway: Gateway,
device_with_basic_cluster_handler: ZigpyDevice, # pylint: disable=redefined-outer-name
device_joined: Callable[[ZigpyDevice], Awaitable[Device]],
) -> None:
"""Check device availability all tries fail."""

zha_device = await device_joined(device_with_basic_cluster_handler)
zha_device = await join_zigpy_device(zha_gateway, device_with_basic_cluster_handler)
basic_ch = device_with_basic_cluster_handler.endpoints[3].basic

assert zha_device.available is True
Expand Down Expand Up @@ -320,13 +319,14 @@ async def test_check_available_unsuccessful(
async def test_check_available_no_basic_cluster_handler(
zha_gateway: Gateway,
device_without_basic_cluster_handler: ZigpyDevice, # pylint: disable=redefined-outer-name
device_joined: Callable[[ZigpyDevice], Awaitable[Device]],
caplog: pytest.LogCaptureFixture,
) -> None:
"""Check device availability for a device without basic cluster."""
caplog.set_level(logging.DEBUG, logger="homeassistant.components.zha")

zha_device = await device_joined(device_without_basic_cluster_handler)
zha_device = await join_zigpy_device(
zha_gateway, device_without_basic_cluster_handler
)

assert zha_device.available is True

Expand All @@ -344,8 +344,8 @@ async def test_check_available_no_basic_cluster_handler(


async def test_device_is_active_coordinator(
device_joined: Callable[[ZigpyDevice], Awaitable[Device]],
zigpy_device: Callable[..., ZigpyDevice], # pylint: disable=redefined-outer-name
zha_gateway: Gateway,
) -> None:
"""Test that the current coordinator is uniquely detected."""

Expand All @@ -362,8 +362,8 @@ async def test_device_is_active_coordinator(
# The two coordinators have different IEEE addresses
assert current_coord_dev.ieee != old_coord_dev.ieee

current_coordinator = await device_joined(current_coord_dev)
stale_coordinator = await device_joined(old_coord_dev)
current_coordinator = await join_zigpy_device(zha_gateway, current_coord_dev)
stale_coordinator = await join_zigpy_device(zha_gateway, old_coord_dev)

# Ensure the current ApplicationController's IEEE matches our coordinator's
current_coordinator.gateway.application_controller.state.node_info.ieee = (
Expand All @@ -375,8 +375,8 @@ async def test_device_is_active_coordinator(


async def test_coordinator_info_uses_node_info(
device_joined: Callable[[ZigpyDevice], Awaitable[Device]],
zigpy_device: Callable[..., ZigpyDevice], # pylint: disable=redefined-outer-name
zha_gateway: Gateway,
) -> None:
"""Test that the current coordinator uses strings from `node_info`."""

Expand All @@ -390,16 +390,16 @@ async def test_coordinator_info_uses_node_info(
app.state.node_info.model = "Real Coordinator Model"
app.state.node_info.manufacturer = "Real Coordinator Manufacturer"

current_coordinator = await device_joined(current_coord_dev)
current_coordinator = await join_zigpy_device(zha_gateway, current_coord_dev)
assert current_coordinator.is_active_coordinator

assert current_coordinator.model == "Real Coordinator Model"
assert current_coordinator.manufacturer == "Real Coordinator Manufacturer"


async def test_coordinator_info_generic_name(
device_joined: Callable[[ZigpyDevice], Awaitable[Device]],
zigpy_device: Callable[..., ZigpyDevice], # pylint: disable=redefined-outer-name
zha_gateway: Gateway,
) -> None:
"""Test that the current coordinator uses strings from `node_info`."""

Expand All @@ -413,20 +413,20 @@ async def test_coordinator_info_generic_name(
app.state.node_info.model = None
app.state.node_info.manufacturer = None

current_coordinator = await device_joined(current_coord_dev)
current_coordinator = await join_zigpy_device(zha_gateway, current_coord_dev)
assert current_coordinator.is_active_coordinator

assert current_coordinator.model == "Generic Zigbee Coordinator (EZSP)"
assert current_coordinator.manufacturer == ""


async def test_async_get_clusters(
device_joined: Callable[[ZigpyDevice], Awaitable[Device]],
zigpy_device: Callable[..., ZigpyDevice], # pylint: disable=redefined-outer-name
zha_gateway: Gateway,
) -> None:
"""Test async_get_clusters method."""
zigpy_dev = zigpy_device(with_basic_cluster_handler=True)
zha_device = await device_joined(zigpy_dev)
zha_device = await join_zigpy_device(zha_gateway, zigpy_dev)

assert zha_device.async_get_clusters() == {
3: {
Expand All @@ -444,25 +444,25 @@ async def test_async_get_clusters(


async def test_async_get_groupable_endpoints(
device_joined: Callable[[ZigpyDevice], Awaitable[Device]],
zigpy_device: Callable[..., ZigpyDevice], # pylint: disable=redefined-outer-name
zha_gateway: Gateway,
) -> None:
"""Test async_get_groupable_endpoints method."""
zigpy_dev = zigpy_device(with_basic_cluster_handler=True)
zigpy_dev.endpoints[3].add_input_cluster(general.Groups.cluster_id)
zha_device = await device_joined(zigpy_dev)
zha_device = await join_zigpy_device(zha_gateway, zigpy_dev)

assert zha_device.async_get_groupable_endpoints() == [3]


async def test_async_get_std_clusters(
device_joined: Callable[[ZigpyDevice], Awaitable[Device]],
zigpy_device: Callable[..., ZigpyDevice], # pylint: disable=redefined-outer-name
zha_gateway: Gateway,
) -> None:
"""Test async_get_std_clusters method."""
zigpy_dev = zigpy_device(with_basic_cluster_handler=True)
zigpy_dev.endpoints[3].profile_id = zigpy.profiles.zha.PROFILE_ID
zha_device = await device_joined(zigpy_dev)
zha_device = await join_zigpy_device(zha_gateway, zigpy_dev)

assert zha_device.async_get_std_clusters() == {
3: {
Expand All @@ -480,25 +480,25 @@ async def test_async_get_std_clusters(


async def test_async_get_cluster(
device_joined: Callable[[ZigpyDevice], Awaitable[Device]],
zigpy_device: Callable[..., ZigpyDevice], # pylint: disable=redefined-outer-name
zha_gateway: Gateway,
) -> None:
"""Test async_get_cluster method."""
zigpy_dev = zigpy_device(with_basic_cluster_handler=True)
zha_device = await device_joined(zigpy_dev)
zha_device = await join_zigpy_device(zha_gateway, zigpy_dev)

assert zha_device.async_get_cluster(3, general.OnOff.cluster_id) == (
zigpy_dev.endpoints[3].on_off
)


async def test_async_get_cluster_attributes(
device_joined: Callable[[ZigpyDevice], Awaitable[Device]],
zigpy_device: Callable[..., ZigpyDevice], # pylint: disable=redefined-outer-name
zha_gateway: Gateway,
) -> None:
"""Test async_get_cluster_attributes method."""
zigpy_dev = zigpy_device(with_basic_cluster_handler=True)
zha_device = await device_joined(zigpy_dev)
zha_device = await join_zigpy_device(zha_gateway, zigpy_dev)

assert (
zha_device.async_get_cluster_attributes(3, general.OnOff.cluster_id)
Expand All @@ -513,12 +513,12 @@ async def test_async_get_cluster_attributes(


async def test_async_get_cluster_commands(
device_joined: Callable[[ZigpyDevice], Awaitable[Device]],
zigpy_device: Callable[..., ZigpyDevice], # pylint: disable=redefined-outer-name
zha_gateway: Gateway,
) -> None:
"""Test async_get_cluster_commands method."""
zigpy_dev = zigpy_device(with_basic_cluster_handler=True)
zha_device = await device_joined(zigpy_dev)
zha_device = await join_zigpy_device(zha_gateway, zigpy_dev)

assert zha_device.async_get_cluster_commands(3, general.OnOff.cluster_id) == {
CLUSTER_COMMANDS_CLIENT: zigpy_dev.endpoints[3].on_off.client_commands,
Expand All @@ -527,12 +527,12 @@ async def test_async_get_cluster_commands(


async def test_write_zigbee_attribute(
device_joined: Callable[[ZigpyDevice], Awaitable[Device]],
zigpy_device: Callable[..., ZigpyDevice], # pylint: disable=redefined-outer-name
zha_gateway: Gateway,
) -> None:
"""Test write_zigbee_attribute method."""
zigpy_dev = zigpy_device(with_basic_cluster_handler=True)
zha_device = await device_joined(zigpy_dev)
zha_device = await join_zigpy_device(zha_gateway, zigpy_dev)

with pytest.raises(
ValueError,
Expand Down Expand Up @@ -588,12 +588,12 @@ async def test_write_zigbee_attribute(


async def test_issue_cluster_command(
device_joined: Callable[[ZigpyDevice], Awaitable[Device]],
zigpy_device: Callable[..., ZigpyDevice], # pylint: disable=redefined-outer-name
zha_gateway: Gateway,
) -> None:
"""Test issue_cluster_command method."""
zigpy_dev = zigpy_device(with_basic_cluster_handler=True)
zha_device = await device_joined(zigpy_dev)
zha_device = await join_zigpy_device(zha_gateway, zigpy_dev)

with pytest.raises(
ValueError,
Expand Down Expand Up @@ -624,14 +624,14 @@ async def test_issue_cluster_command(


async def test_async_add_to_group_remove_from_group(
device_joined: Callable[[ZigpyDevice], Awaitable[Device]],
zigpy_device: Callable[..., ZigpyDevice], # pylint: disable=redefined-outer-name
zha_gateway: Gateway,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test async_add_to_group and async_remove_from_group methods."""
zigpy_dev = zigpy_device(with_basic_cluster_handler=True)
zigpy_dev.endpoints[3].add_input_cluster(general.Groups.cluster_id)
zha_device = await device_joined(zigpy_dev)
zha_device = await join_zigpy_device(zha_gateway, zigpy_dev)

group: Group = zha_device.gateway.groups[0x1001]

Expand Down Expand Up @@ -694,19 +694,19 @@ async def test_async_add_to_group_remove_from_group(


async def test_async_bind_to_group(
device_joined: Callable[[ZigpyDevice], Awaitable[Device]],
zigpy_device: Callable[..., ZigpyDevice], # pylint: disable=redefined-outer-name
zha_gateway: Gateway,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test async_bind_to_group method."""
zigpy_dev = zigpy_device(with_basic_cluster_handler=True)
zigpy_dev.endpoints[3].add_input_cluster(general.Groups.cluster_id)
zha_device = await device_joined(zigpy_dev)
zha_device = await join_zigpy_device(zha_gateway, zigpy_dev)

zigpy_dev_remote = zigpy_device(with_basic_cluster_handler=True)
zigpy_dev_remote._ieee = zigpy.types.EUI64.convert("00:0d:7f:00:0a:90:69:e8")
zigpy_dev_remote.endpoints[3].add_output_cluster(general.OnOff.cluster_id)
zha_device_remote = await device_joined(zigpy_dev_remote)
zha_device_remote = await join_zigpy_device(zha_gateway, zigpy_dev_remote)
assert zha_device_remote is not None

group: Group = zha_device.gateway.groups[0x1001]
Expand Down Expand Up @@ -735,12 +735,12 @@ async def test_async_bind_to_group(


async def test_device_automation_triggers(
device_joined: Callable[[ZigpyDevice], Awaitable[Device]],
zigpy_device: Callable[..., ZigpyDevice], # pylint: disable=redefined-outer-name
zha_gateway: Gateway,
) -> None:
"""Test device automation triggers."""
zigpy_dev = zigpy_device(with_basic_cluster_handler=True)
zha_device = await device_joined(zigpy_dev)
zha_device = await join_zigpy_device(zha_gateway, zigpy_dev)

assert get_device_automation_triggers(zha_device) == {
("device_offline", "device_offline"): {"device_event_type": "device_offline"}
Expand All @@ -753,12 +753,12 @@ async def test_device_automation_triggers(


async def test_device_properties(
device_joined: Callable[[ZigpyDevice], Awaitable[Device]],
zigpy_device: Callable[..., ZigpyDevice], # pylint: disable=redefined-outer-name
zha_gateway: Gateway,
) -> None:
"""Test device properties."""
zigpy_dev = zigpy_device(with_basic_cluster_handler=True)
zha_device = await device_joined(zigpy_dev)
zha_device = await join_zigpy_device(zha_gateway, zigpy_dev)

assert zha_device.is_mains_powered is False
assert zha_device.is_end_device is True
Expand Down

0 comments on commit 64593be

Please sign in to comment.