Skip to content

Commit

Permalink
Sync with zigpy 0.60.0 changes (#595)
Browse files Browse the repository at this point in the history
* Replace `permit_with_key` with `permit_with_link_key`

* Use zigpy `probe`

* Use zigpy device schema

* Use zigpy watchdog

* Fix existing unit tests

* Load board info into node state object

* Bump minimum zigpy version to 0.60.0

* Add a unit test for missing token value

* Remove dead code
  • Loading branch information
puddly authored Nov 16, 2023
1 parent 29dec53 commit 6b85794
Show file tree
Hide file tree
Showing 11 changed files with 162 additions and 434 deletions.
9 changes: 4 additions & 5 deletions bellows/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@

import click
import click_log

from bellows.config import CONF_DEVICE, CONF_DEVICE_BAUDRATE, CONF_FLOW_CONTROL
import zigpy.config

from . import opts

Expand All @@ -16,9 +15,9 @@
@click.pass_context
def main(ctx, device, baudrate, flow_control):
ctx.obj = {
CONF_DEVICE: device,
CONF_DEVICE_BAUDRATE: baudrate,
CONF_FLOW_CONTROL: flow_control,
zigpy.config.CONF_DEVICE_PATH: device,
zigpy.config.CONF_DEVICE_BAUDRATE: baudrate,
zigpy.config.CONF_DEVICE_FLOW_CONTROL: flow_control,
}
click_log.basic_config()

Expand Down
7 changes: 3 additions & 4 deletions bellows/cli/opts.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import os

import click

from bellows.config import CONF_FLOW_CONTROL_DEFAULT
from zigpy.config import CONF_DEVICE_FLOW_CONTROL

from . import util

Expand Down Expand Up @@ -61,8 +60,8 @@

flow_control = click.option(
"--flow-control",
default=CONF_FLOW_CONTROL_DEFAULT,
type=click.Choice((CONF_FLOW_CONTROL_DEFAULT, "hardware")),
default="software",
type=click.Choice(CONF_DEVICE_FLOW_CONTROL),
envvar="EZSP_FLOW_CONTROL",
help="use hardware flow control",
)
Expand Down
14 changes: 0 additions & 14 deletions bellows/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,16 @@
CONF_NWK_TC_LINK_KEY,
CONF_NWK_UPDATE_ID,
CONFIG_SCHEMA,
SCHEMA_DEVICE,
cv_boolean,
)

CONF_DEVICE_BAUDRATE = "baudrate"
CONF_USE_THREAD = "use_thread"
CONF_EZSP_CONFIG = "ezsp_config"
CONF_EZSP_POLICIES = "ezsp_policies"
CONF_PARAM_MAX_WATCHDOG_FAILURES = "max_watchdog_failures"
CONF_FLOW_CONTROL = "flow_control"
CONF_FLOW_CONTROL_DEFAULT = "software"

SCHEMA_DEVICE = SCHEMA_DEVICE.extend(
{
vol.Optional(CONF_DEVICE_BAUDRATE, default=57600): int,
vol.Optional(CONF_FLOW_CONTROL, default=CONF_FLOW_CONTROL_DEFAULT): vol.In(
("hardware", "software")
),
},
)

CONFIG_SCHEMA = CONFIG_SCHEMA.extend(
{
vol.Required(CONF_DEVICE): SCHEMA_DEVICE,
vol.Optional(CONF_PARAM_MAX_WATCHDOG_FAILURES, default=4): int,
vol.Optional(CONF_EZSP_CONFIG, default={}): dict,
vol.Optional(CONF_EZSP_POLICIES, default={}): vol.Schema(
Expand Down
61 changes: 15 additions & 46 deletions bellows/ezsp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,46 +97,6 @@ def maybe_remove(_):
with contextlib.suppress(ValueError):
listeners.remove(future)

@classmethod
async def probe(cls, device_config: dict) -> bool | dict[str, int | str | bool]:
"""Probe port for the device presence."""
device_config = conf.SCHEMA_DEVICE(device_config)
probe_configs = [device_config]

# Try probing with 115200 baud rate first, as it is the most common
if device_config[conf.CONF_DEVICE_BAUDRATE] != 115200:
probe_configs.insert(
0, {**device_config, conf.CONF_DEVICE_BAUDRATE: 115200}
)

for config in probe_configs:
ezsp = cls(config)

try:
async with asyncio_timeout(
UART_PROBE_TIMEOUT
if not ezsp.is_tcp_serial_port
else NETWORK_PROBE_TIMEOUT
):
await ezsp._probe()

return config
except Exception as exc:
LOGGER.debug(
"Unsuccessful radio probe of '%s' port",
device_config[conf.CONF_DEVICE_PATH],
exc_info=exc,
)
finally:
ezsp.close()

return False

async def _probe(self) -> None:
"""Open port and try sending a command"""
await self.connect(use_thread=False)
await self.startup_reset()

@property
def is_tcp_serial_port(self) -> bool:
parsed_path = urllib.parse.urlparse(self._config[conf.CONF_DEVICE_PATH])
Expand Down Expand Up @@ -344,10 +304,12 @@ def frame_received(self, data: bytes) -> None:

self._protocol(data)

async def get_board_info(self) -> tuple[str, str, str]:
async def get_board_info(
self,
) -> tuple[str, str, str | None] | tuple[None, None, str | None]:
"""Return board info."""

tokens = []
tokens = {}

for token in (t.EzspMfgTokenId.MFG_STRING, t.EzspMfgTokenId.MFG_BOARD_NAME):
(value,) = await self.getMfgToken(token)
Expand All @@ -361,11 +323,15 @@ async def get_board_info(self) -> tuple[str, str, str]:
except UnicodeDecodeError:
result = "0x" + result.hex().upper()

tokens.append(result)
if not result:
result = None

tokens[token] = result

(status, ver_info_bytes) = await self.getValue(
self.types.EzspValueId.VALUE_VERSION_INFO
)
version = None

if status == t.EmberStatus.SUCCESS:
build, ver_info_bytes = t.uint16_t.deserialize(ver_info_bytes)
Expand All @@ -374,9 +340,12 @@ async def get_board_info(self) -> tuple[str, str, str]:
patch, ver_info_bytes = t.uint8_t.deserialize(ver_info_bytes)
special, ver_info_bytes = t.uint8_t.deserialize(ver_info_bytes)
version = f"{major}.{minor}.{patch}.{special} build {build}"
else:
version = "unknown stack version"
return tokens[0], tokens[1], version

return (
tokens[t.EzspMfgTokenId.MFG_STRING],
tokens[t.EzspMfgTokenId.MFG_BOARD_NAME],
version,
)

async def _get_nv3_restored_eui64_key(self) -> t.NV3KeyId | None:
"""Get the NV3 key for the device's restored EUI64, if one exists."""
Expand Down
13 changes: 4 additions & 9 deletions bellows/uart.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,9 @@
else:
from asyncio import timeout as asyncio_timeout # pragma: no cover

import zigpy.config
import zigpy.serial

from bellows.config import (
CONF_DEVICE_BAUDRATE,
CONF_DEVICE_PATH,
CONF_FLOW_CONTROL,
CONF_FLOW_CONTROL_DEFAULT,
)
from bellows.thread import EventLoopThread, ThreadsafeProxy
import bellows.types as t

Expand Down Expand Up @@ -376,16 +371,16 @@ async def _connect(config, application):
connection_done_future = loop.create_future()
protocol = Gateway(application, connection_future, connection_done_future)

if config[CONF_FLOW_CONTROL] == CONF_FLOW_CONTROL_DEFAULT:
if config[zigpy.config.CONF_DEVICE_FLOW_CONTROL] is None:
xon_xoff, rtscts = True, False
else:
xon_xoff, rtscts = False, True

transport, protocol = await zigpy.serial.create_serial_connection(
loop,
lambda: protocol,
url=config[CONF_DEVICE_PATH],
baudrate=config[CONF_DEVICE_BAUDRATE],
url=config[zigpy.config.CONF_DEVICE_PATH],
baudrate=config[zigpy.config.CONF_DEVICE_BAUDRATE],
xonxoff=xon_xoff,
rtscts=rtscts,
)
Expand Down
Loading

0 comments on commit 6b85794

Please sign in to comment.