diff --git a/Z4D_decoders/z4d_decoder_IEEE_addr_req.py b/Z4D_decoders/z4d_decoder_IEEE_addr_req.py index abc8ed351..69121aba0 100644 --- a/Z4D_decoders/z4d_decoder_IEEE_addr_req.py +++ b/Z4D_decoders/z4d_decoder_IEEE_addr_req.py @@ -1,34 +1,62 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Implementation of Zigbee for Domoticz plugin. +# +# This file is part of Zigbee for Domoticz plugin. https://github.com/zigbeefordomoticz/Domoticz-Zigbee +# (C) 2015-2024 +# +# Initial authors: zaraki673 & pipiche38 +# +# SPDX-License-Identifier: GPL-3.0 license + import struct from Modules.sendZigateCommand import raw_APS_request def Decode0041(self, Devices, MsgData, MsgLQI): - self.log.logging('Input', 'Debug', 'Decode0041 - IEEE_addr_req: %s %s %s' % (MsgData, self.ControllerNWKID, self.ControllerIEEE)) + self.log.logging('Input', 'Debug', f"Decode0041 - IEEE_addr_req: {MsgData} {self.ControllerNWKID} {self.ControllerIEEE}") + + if not self.ControllerIEEE or self.ControllerNWKID in (None, "ffff"): + return + + # Parse message data + sqn, srcNwkId, srcEp, nwkid, reqType, startIndex = ( MsgData[:2], MsgData[2:6], MsgData[6:8], MsgData[8:12], MsgData[12:14], MsgData[14:16] ) + + # Log parsed details + self.log.logging('Input', 'Debug', f" source req SrcNwkId: {srcNwkId} NwkId: {nwkid} Type: {reqType} Idx: {startIndex}") - sqn = MsgData[:2] - srcNwkId = MsgData[2:6] - srcEp = MsgData[6:8] - nwkid = MsgData[8:12] - reqType = MsgData[12:14] - startIndex = MsgData[14:16] - self.log.logging('Input', 'Debug', ' source req nwkid: %s' % srcNwkId) - self.log.logging('Input', 'Debug', ' request NwkId : %s' % nwkid) - self.log.logging('Input', 'Debug', ' request Type : %s' % reqType) - self.log.logging('Input', 'Debug', ' request Idx : %s' % startIndex) Cluster = '8001' + status, payload = _generate_response_payload(self, nwkid, sqn) + + self.log.logging('Input', 'Debug', f"Decode0041 - response payload: {payload}") + raw_APS_request(self, srcNwkId, '00', Cluster, '0000', payload, zigpyzqn=sqn, zigate_ep='00') + + +def _generate_response_payload(self, nwkid, sqn): + """Generate the response payload based on the requested nwkid.""" if nwkid == self.ControllerNWKID: status = '00' - controller_ieee = '%016x' % struct.unpack('Q', struct.pack('>Q', int(self.ControllerIEEE, 16)))[0] - controller_nwkid = '%04x' % struct.unpack('H', struct.pack('>H', int(self.ControllerNWKID, 16)))[0] - payload = sqn + status + controller_ieee + controller_nwkid + '00' + ieee = _format_ieee(self, self.ControllerIEEE) + nwk_id = _format_nwkid(self, self.ControllerNWKID) + payload = sqn + status + ieee + nwk_id + '00' elif nwkid in self.ListOfDevices: status = '00' - device_ieee = '%016x' % struct.unpack('Q', struct.pack('>Q', int(self.ListOfDevices[nwkid]['IEEE'], 16)))[0] - device_nwkid = '%04x' % struct.unpack('H', struct.pack('>H', int(self.ControllerNWKID, 16)))[0] - payload = sqn + status + device_ieee + device_nwkid + '00' + ieee = _format_ieee(self, self.ListOfDevices[nwkid]['IEEE']) + nwk_id = _format_nwkid(self, self.ControllerNWKID) + payload = sqn + status + ieee + nwk_id + '00' else: status = '81' payload = sqn + status + nwkid - self.log.logging('Input', 'Debug', 'Decode0041 - response payload: %s' % payload) - raw_APS_request(self, srcNwkId, '00', Cluster, '0000', payload, zigpyzqn=sqn, zigate_ep='00') \ No newline at end of file + return status, payload + + +def _format_ieee(self, ieee): + """Format the IEEE address to 16-character hex string.""" + return f"{int(ieee, 16):016x}" + + +def _format_nwkid(self, nwkid): + """Format the NWK ID to 4-character hex string.""" + return f"{int(nwkid, 16):04x}"