diff --git a/zigpy_znp/zigbee/application.py b/zigpy_znp/zigbee/application.py index 7510375..0293bec 100644 --- a/zigpy_znp/zigbee/application.py +++ b/zigpy_znp/zigbee/application.py @@ -3,6 +3,7 @@ import os import asyncio import logging +from typing import AsyncGenerator import zigpy.zcl import zigpy.zdo @@ -248,6 +249,51 @@ async def set_tx_power(self, dbm: int) -> None: "Requested TX power %d was adjusted to %d", dbm, rsp.StatusOrPower ) + async def network_scan( + self, channels: t.Channels, duration_exp: int + ) -> AsyncGenerator[zigpy.types.NetworkBeacon, None]: + # XXX: Network scanning only works if the stack is not running! + await self._znp.reset() + + async with self._znp.capture_responses( + [ + c.ZDO.BeaconNotifyInd.Callback(partial=True), + c.ZDO.NwkDiscoveryCnf.Callback(partial=True), + ] + ) as updates: + await self._znp.request( + c.ZDO.NetworkDiscoveryReq.Req( + Channels=channels, + ScanDuration=duration_exp, + ), + RspStatus=t.Status.SUCCESS, + ) + + while True: + update = await updates.get() + + if isinstance(update, c.ZDO.NwkDiscoveryCnf.Callback): + break + + for beacon in update.Beacons: + yield zigpy.types.NetworkBeacon( + pan_id=beacon.PanId, + extended_pan_id=beacon.ExtendedPanId, + channel=beacon.Channel, + permit_joining=beacon.PermitJoining, + stack_profile=beacon.StackProfile, + nwk_update_id=beacon.UpdateId, + lqi=beacon.LQI, + src=beacon.Src, + depth=beacon.Depth, + router_capacity=beacon.RouterCapacity, + device_capacity=beacon.DeviceCapacity, + protocol_version=beacon.ProtocolVersion, + # Only thing not supported is `rssi` + ) + + await self.start_network() + def get_dst_address(self, cluster: zigpy.zcl.Cluster) -> zdo_t.MultiAddress: """ Helper to get a dst address for bind/unbind operations.