Skip to content

Commit

Permalink
Implement network_scan
Browse files Browse the repository at this point in the history
  • Loading branch information
puddly committed Aug 19, 2024
1 parent 41684b9 commit d97058f
Showing 1 changed file with 38 additions and 0 deletions.
38 changes: 38 additions & 0 deletions bellows/zigbee/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import os
import statistics
import sys
from typing import AsyncGenerator

if sys.version_info[:2] < (3, 11):
from async_timeout import timeout as asyncio_timeout # pragma: no cover
Expand Down Expand Up @@ -698,6 +699,43 @@ async def energy_scan(
for channel in list(channels)
}

async def network_scan(
self, channels: t.Channels, duration: int
) -> AsyncGenerator[zigpy.types.NetworkBeacon]:
"""Scans for networks and yields network beacons."""
queue = asyncio.Queue()

Check warning on line 706 in bellows/zigbee/application.py

View check run for this annotation

Codecov / codecov/patch

bellows/zigbee/application.py#L706

Added line #L706 was not covered by tests

with self._ezsp.callback_for_commands(

Check warning on line 708 in bellows/zigbee/application.py

View check run for this annotation

Codecov / codecov/patch

bellows/zigbee/application.py#L708

Added line #L708 was not covered by tests
{"networkFoundHandler", "scanCompleteHandler"},
callback=lambda command, response: queue.put_nowait((command, response)),
):
# XXX: replace with normal command invocation once overload is removed
(status,) = await self._ezsp._command(

Check warning on line 713 in bellows/zigbee/application.py

View check run for this annotation

Codecov / codecov/patch

bellows/zigbee/application.py#L713

Added line #L713 was not covered by tests
"startScan",
scanType=t.EzspNetworkScanType.ACTIVE_SCAN,
channelMask=channels,
duration=duration,
)

while True:
command, response = await queue.get()

Check warning on line 721 in bellows/zigbee/application.py

View check run for this annotation

Codecov / codecov/patch

bellows/zigbee/application.py#L721

Added line #L721 was not covered by tests

if command == "scanCompleteHandler":
break

Check warning on line 724 in bellows/zigbee/application.py

View check run for this annotation

Codecov / codecov/patch

bellows/zigbee/application.py#L723-L724

Added lines #L723 - L724 were not covered by tests

(networkFound, lastHopLqi, lastHopRssi) = response

Check warning on line 726 in bellows/zigbee/application.py

View check run for this annotation

Codecov / codecov/patch

bellows/zigbee/application.py#L726

Added line #L726 was not covered by tests

yield zigpy.types.NetworkBeacon(

Check warning on line 728 in bellows/zigbee/application.py

View check run for this annotation

Codecov / codecov/patch

bellows/zigbee/application.py#L728

Added line #L728 was not covered by tests
pan_id=networkFound.panId,
extended_pan_id=networkFound.extendedPanId,
channel=networkFound.channel,
nwk_update_id=networkFound.nwkUpdateId,
permit_joining=bool(networkFound.allowingJoin),
stack_profile=networkFound.stackProfile,
lqi=lastHopLqi,
rssi=lastHopRssi,
)

async def send_packet(self, packet: zigpy.types.ZigbeePacket) -> None:
if not self.is_controller_running:
raise ControllerError("ApplicationController is not running")
Expand Down

0 comments on commit d97058f

Please sign in to comment.