Skip to content

Commit

Permalink
Parse EZSP readCounters laxly (#644)
Browse files Browse the repository at this point in the history
* Parse EZSP counters laxly

* Move them up to the EZSP protocol abstraction

* Fix unit tests
  • Loading branch information
puddly authored Aug 9, 2024
1 parent 15f5f36 commit da8a0c3
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 8 deletions.
8 changes: 8 additions & 0 deletions bellows/ezsp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,3 +244,11 @@ async def send_broadcast(
@abc.abstractmethod
async def set_source_route(self, nwk: t.NWK, relays: list[t.NWK]) -> t.sl_Status:
raise NotImplementedError

@abc.abstractmethod
async def read_counters(self) -> dict[t.EmberCounterType, int]:
raise NotImplementedError

@abc.abstractmethod
async def read_and_clear_counters(self) -> dict[t.EmberCounterType, int]:
raise NotImplementedError
8 changes: 8 additions & 0 deletions bellows/ezsp/v4/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,3 +185,11 @@ async def send_broadcast(
async def set_source_route(self, nwk: t.NWK, relays: list[t.NWK]) -> t.sl_Status:
(res,) = await self.setSourceRoute(destination=nwk, relayList=relays)
return t.sl_Status.from_ember_status(res)

async def read_counters(self) -> dict[t.EmberCounterType, t.uint16_t]:
(res,) = await self.readCounters()
return dict(zip(t.EmberCounterType, res))

async def read_and_clear_counters(self) -> dict[t.EmberCounterType, t.uint16_t]:
(res,) = await self.readAndClearCounters()
return dict(zip(t.EmberCounterType, res))
4 changes: 2 additions & 2 deletions bellows/ezsp/v4/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,14 +275,14 @@
0x65,
{},
{
"values": t.FixedList[t.uint16_t, len(t.EmberCounterType)],
"values": t.List[t.uint16_t],
},
),
"readCounters": (
0xF1,
{},
{
"values": t.FixedList[t.uint16_t, len(t.EmberCounterType)],
"values": t.List[t.uint16_t],
},
),
"counterRolloverHandler": (
Expand Down
6 changes: 3 additions & 3 deletions bellows/zigbee/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -890,11 +890,11 @@ async def _watchdog_feed(self):
)

if remainder > 0:
(res,) = await self._ezsp.readCounters()
current_counters = await self._ezsp.read_counters()
else:
(res,) = await self._ezsp.readAndClearCounters()
current_counters = await self._ezsp.read_and_clear_counters()

for cnt_type, value in zip(t.EmberCounterType, res):
for cnt_type, value in current_counters.items():
counters[cnt_type.name[8:]].update(value)

if remainder == 0:
Expand Down
6 changes: 3 additions & 3 deletions tests/test_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -1210,10 +1210,10 @@ async def counters_mock():
if nop_success % 2:
raise EzspError
else:
return ([0, 1, 2, 3],)
return {t.EmberCounterType(i): v for i, v in enumerate([0, 1, 2, 3])}
raise asyncio.TimeoutError

app._ezsp.readCounters = AsyncMock(side_effect=counters_mock)
app._ezsp.read_counters = AsyncMock(side_effect=counters_mock)
app._ezsp.nop = AsyncMock(side_effect=EzspError)
app._ezsp.getValue = AsyncMock(
return_value=(t.EzspStatus.ERROR_OUT_OF_MEMORY, b"\x20")
Expand All @@ -1222,7 +1222,7 @@ async def counters_mock():
app._ctrl_event.set()

await app._watchdog_feed()
assert app._ezsp.readCounters.await_count != 0
assert app._ezsp.read_counters.await_count != 0
assert app._ezsp.nop.await_count == 0

cnt = t.EmberCounterType
Expand Down
16 changes: 16 additions & 0 deletions tests/test_ezsp_v4.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,3 +363,19 @@ async def test_add_transient_link_key(ezsp_f) -> None:
key=t.KeyData("ZigBeeAlliance09"),
)
assert status == t.sl_Status.OK


@pytest.mark.parametrize("length", [40, 41])
async def test_read_counters(ezsp_f, length: int) -> None:
"""Test parsing of a `readCounters` response, including truncation."""
ezsp_f.readCounters.return_value = (list(range(length)),)
ezsp_f.readAndClearCounters.return_value = (list(range(length)),)
counters1 = await ezsp_f.read_counters()
counters2 = await ezsp_f.read_and_clear_counters()
assert (
ezsp_f.readCounters.mock_calls
== ezsp_f.readAndClearCounters.mock_calls
== [call()]
)

assert counters1 == counters2 == {t.EmberCounterType(i): i for i in range(length)}
1 change: 1 addition & 0 deletions tests/test_ezsp_v7.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import pytest

from bellows.ash import DataFrame
import bellows.ezsp.v7
import bellows.types as t

Expand Down

0 comments on commit da8a0c3

Please sign in to comment.