Skip to content

Commit

Permalink
Network scanning interface (#55)
Browse files Browse the repository at this point in the history
* Implement network scanning

* Bump minimum package versions
  • Loading branch information
puddly authored Jan 23, 2025
1 parent 2f7eec1 commit 22d2ccb
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 1 deletion.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ dependencies = [
"click",
"coloredlogs",
"scapy",
"zigpy>=0.55.0",
"zigpy>=0.75.0",
"bellows>=0.43.0",
"zigpy-deconz>=0.21.0",
"zigpy-xbee>=0.18.0",
Expand Down
63 changes: 63 additions & 0 deletions zigpy_cli/radio.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,3 +316,66 @@ async def channel_hopper():
+ b"\n"
)
output.flush()


@radio.command()
@click.pass_obj
@click.option("-r", "--randomize", is_flag=True, type=bool, default=False)
@click.option("-u", "--unfiltered", is_flag=True, type=bool, default=False)
@click.option("-n", "--num-scans", type=int, default=-1)
@click.option("-d", "--duration-exponent", type=int, default=4)
@click.option(
"-c",
"--channels",
type=CHANNELS_LIST,
default=zigpy.types.Channels.ALL_CHANNELS,
)
@click_coroutine
async def network_scan(
app, randomize, unfiltered, num_scans, duration_exponent, channels
):
if app._network_scan is ControllerApplication._network_scan:
raise click.ClickException("Network scan is not supported by this radio")

await app.startup()
LOGGER.info("Running scan...")

seen_beacons = set()

if randomize:

def channels_iter_func():
while True:
yield random.choice(tuple(channels))

channels_iter = channels_iter_func()
else:
channels_iter = itertools.cycle(channels)

for scan in itertools.count():
channel = next(channels_iter)

if num_scans != -1 and scan > num_scans:
break

print(f"Scanning channel {channel}\r", end="", flush=True)

async for network in app.network_scan(
channels=zigpy.types.Channels.from_channel_list([channel]),
duration_exp=duration_exponent,
):
key = network.replace(lqi=None, rssi=None)

if key in seen_beacons:
continue

seen_beacons.add(key)

print(
f"channel: {network.channel}, "
f"network: {network.pan_id} ({network.extended_pan_id}), "
f"permitting joins: {int(network.permit_joining)}, "
f"nwk update id: {network.nwk_update_id}, "
f"lqi: {network.lqi:>4}, "
f"rssi: {network.rssi:>3}"
)

0 comments on commit 22d2ccb

Please sign in to comment.