-
-
Notifications
You must be signed in to change notification settings - Fork 43
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Check that ControllerIEEE and ControllerNWKID are correctly initializ…
…ed, otherwise drop the request (#1801)
- Loading branch information
Showing
1 changed file
with
47 additions
and
19 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,34 +1,62 @@ | ||
#!/usr/bin/env python3 | ||
# -*- coding: utf-8 -*- | ||
# | ||
# Implementation of Zigbee for Domoticz plugin. | ||
# | ||
# This file is part of Zigbee for Domoticz plugin. https://github.com/zigbeefordomoticz/Domoticz-Zigbee | ||
# (C) 2015-2024 | ||
# | ||
# Initial authors: zaraki673 & pipiche38 | ||
# | ||
# SPDX-License-Identifier: GPL-3.0 license | ||
|
||
import struct | ||
|
||
from Modules.sendZigateCommand import raw_APS_request | ||
|
||
|
||
def Decode0041(self, Devices, MsgData, MsgLQI): | ||
self.log.logging('Input', 'Debug', 'Decode0041 - IEEE_addr_req: %s %s %s' % (MsgData, self.ControllerNWKID, self.ControllerIEEE)) | ||
self.log.logging('Input', 'Debug', f"Decode0041 - IEEE_addr_req: {MsgData} {self.ControllerNWKID} {self.ControllerIEEE}") | ||
|
||
if not self.ControllerIEEE or self.ControllerNWKID in (None, "ffff"): | ||
return | ||
|
||
# Parse message data | ||
sqn, srcNwkId, srcEp, nwkid, reqType, startIndex = ( MsgData[:2], MsgData[2:6], MsgData[6:8], MsgData[8:12], MsgData[12:14], MsgData[14:16] ) | ||
|
||
# Log parsed details | ||
self.log.logging('Input', 'Debug', f" source req SrcNwkId: {srcNwkId} NwkId: {nwkid} Type: {reqType} Idx: {startIndex}") | ||
|
||
sqn = MsgData[:2] | ||
srcNwkId = MsgData[2:6] | ||
srcEp = MsgData[6:8] | ||
nwkid = MsgData[8:12] | ||
reqType = MsgData[12:14] | ||
startIndex = MsgData[14:16] | ||
self.log.logging('Input', 'Debug', ' source req nwkid: %s' % srcNwkId) | ||
self.log.logging('Input', 'Debug', ' request NwkId : %s' % nwkid) | ||
self.log.logging('Input', 'Debug', ' request Type : %s' % reqType) | ||
self.log.logging('Input', 'Debug', ' request Idx : %s' % startIndex) | ||
Cluster = '8001' | ||
status, payload = _generate_response_payload(self, nwkid, sqn) | ||
|
||
self.log.logging('Input', 'Debug', f"Decode0041 - response payload: {payload}") | ||
raw_APS_request(self, srcNwkId, '00', Cluster, '0000', payload, zigpyzqn=sqn, zigate_ep='00') | ||
|
||
|
||
def _generate_response_payload(self, nwkid, sqn): | ||
"""Generate the response payload based on the requested nwkid.""" | ||
if nwkid == self.ControllerNWKID: | ||
status = '00' | ||
controller_ieee = '%016x' % struct.unpack('Q', struct.pack('>Q', int(self.ControllerIEEE, 16)))[0] | ||
controller_nwkid = '%04x' % struct.unpack('H', struct.pack('>H', int(self.ControllerNWKID, 16)))[0] | ||
payload = sqn + status + controller_ieee + controller_nwkid + '00' | ||
ieee = _format_ieee(self, self.ControllerIEEE) | ||
nwk_id = _format_nwkid(self, self.ControllerNWKID) | ||
payload = sqn + status + ieee + nwk_id + '00' | ||
elif nwkid in self.ListOfDevices: | ||
status = '00' | ||
device_ieee = '%016x' % struct.unpack('Q', struct.pack('>Q', int(self.ListOfDevices[nwkid]['IEEE'], 16)))[0] | ||
device_nwkid = '%04x' % struct.unpack('H', struct.pack('>H', int(self.ControllerNWKID, 16)))[0] | ||
payload = sqn + status + device_ieee + device_nwkid + '00' | ||
ieee = _format_ieee(self, self.ListOfDevices[nwkid]['IEEE']) | ||
nwk_id = _format_nwkid(self, self.ControllerNWKID) | ||
payload = sqn + status + ieee + nwk_id + '00' | ||
else: | ||
status = '81' | ||
payload = sqn + status + nwkid | ||
self.log.logging('Input', 'Debug', 'Decode0041 - response payload: %s' % payload) | ||
raw_APS_request(self, srcNwkId, '00', Cluster, '0000', payload, zigpyzqn=sqn, zigate_ep='00') | ||
return status, payload | ||
|
||
|
||
def _format_ieee(self, ieee): | ||
"""Format the IEEE address to 16-character hex string.""" | ||
return f"{int(ieee, 16):016x}" | ||
|
||
|
||
def _format_nwkid(self, nwkid): | ||
"""Format the NWK ID to 4-character hex string.""" | ||
return f"{int(nwkid, 16):04x}" |