diff --git a/Classes/WebServer/WebServer.py b/Classes/WebServer/WebServer.py index eaf3d4efd..38ed9c0d2 100644 --- a/Classes/WebServer/WebServer.py +++ b/Classes/WebServer/WebServer.py @@ -1082,8 +1082,8 @@ def rest_zDevice(self, verb, data, parameters): if attribut == "Battery" and attribut in self.ListOfDevices[item]: if self.ListOfDevices[item]["Battery"] in ( {}, ) and "IASBattery" in self.ListOfDevices[item]: device[attribut] = str(self.ListOfDevices[item][ "IASBattery" ]) - elif isinstance( self.ListOfDevices[item]["Battery"], int): - device[attribut] = self.ListOfDevices[item]["Battery"] + elif isinstance( self.ListOfDevices[item]["Battery"], (int,float)): + device[attribut] = int(self.ListOfDevices[item]["Battery"]) device["BatteryInside"] = True elif item == "CheckParam": @@ -1530,43 +1530,53 @@ def rest_zigate_mode(self, verb, data, parameters): _response["Data"] = json.dumps("ZiGate mode: %s requested" % mode) return _response + def rest_battery_state(self, verb, data, parameters): _response = prepResponseMessage(self, setupHeadersResponse()) _response["Headers"]["Content-Type"] = "application/json; charset=utf-8" if verb == "GET": _battEnv = {"Battery":{"<30%":{}, "<50%": {}, ">50%" : {}},"Update Time":{ "Unknown": {}, "< 1 week": {}, "> 1 week": {}}} for x in self.ListOfDevices: + self.logging("Debug", f"rest_battery_state - {x}") if x == "0000": - continue + continue - if self.ListOfDevices[x]["ZDeviceName"] == "": - _deviceName = x - else: - _deviceName = self.ListOfDevices[x]["ZDeviceName"] + battery = self.ListOfDevices[x].get("Battery") - if "Battery" in self.ListOfDevices[x] and isinstance(self.ListOfDevices[x]["Battery"], int): - if self.ListOfDevices[x]["Battery"] > 50: - _battEnv["Battery"][">50%"][_deviceName] = {"Battery": self.ListOfDevices[x]["Battery"]} + if battery is None: + continue + self.logging("Debug", f"rest_battery_state - {x} Battery found") - elif self.ListOfDevices[x]["Battery"] > 30: - _battEnv["Battery"]["<50%"][_deviceName] = {"Battery": self.ListOfDevices[x]["Battery"]} + _device_name = self.ListOfDevices[x].get("ZDeviceName", x ) - else: - _battEnv["Battery"]["<30%"][_deviceName] = {"Battery": self.ListOfDevices[x]["Battery"]} + if not isinstance( battery, (int, float)): + self.logging("Debug", f"rest_battery_state - {x} Battery found, but not int !! {type(battery)}") + continue + battery = int(battery) - if "BatteryUpdateTime" in self.ListOfDevices[x]: - if (int(time.time()) - self.ListOfDevices[x]["BatteryUpdateTime"]) > 604800: # one week in seconds - _battEnv["Update Time"]["> 1 week"][_deviceName] = {"BatteryUpdateTime": self.ListOfDevices[x]["BatteryUpdateTime"]} + if self.ListOfDevices[x]["Battery"] > 50: + _battEnv["Battery"][">50%"][_device_name] = {"Battery": battery} - else: - _battEnv["Update Time"]["< 1 week"][_deviceName] = {"BatteryUpdateTime": self.ListOfDevices[x]["BatteryUpdateTime"]} + elif self.ListOfDevices[x]["Battery"] > 30: + _battEnv["Battery"]["<50%"][_device_name] = {"Battery": battery} + + else: + _battEnv["Battery"]["<30%"][_device_name] = {"Battery": battery} + + if "BatteryUpdateTime" in self.ListOfDevices[x]: + if (int(time.time()) - self.ListOfDevices[x]["BatteryUpdateTime"]) > 604800: # one week in seconds + _battEnv["Update Time"]["> 1 week"][_device_name] = {"BatteryUpdateTime": self.ListOfDevices[x]["BatteryUpdateTime"]} else: - _battEnv["Update Time"]["Unknown"][_deviceName] = "Unknown" + _battEnv["Update Time"]["< 1 week"][_device_name] = {"BatteryUpdateTime": self.ListOfDevices[x]["BatteryUpdateTime"]} + else: + _battEnv["Update Time"]["Unknown"][_device_name] = "Unknown" + + self.logging("Debug", f"rest_battery_state - {_battEnv}") _response["Data"] = json.dumps(_battEnv, sort_keys=True) return _response - + def logging(self, logType, message): self.log.logging("WebServer", logType, message) diff --git a/Classes/WebServer/com.py b/Classes/WebServer/com.py index 952aa2ee6..2bea81a7b 100644 --- a/Classes/WebServer/com.py +++ b/Classes/WebServer/com.py @@ -352,6 +352,8 @@ def run_server(self, host='0.0.0.0', port=9440): # nosec self.logging( "Log", f"++ WebUI - SSL Private key {server_private_key}") context = check_cert_and_key(self, server_certificate, server_private_key) + if context: + context.minimum_version = ssl.TLSVersion.TLSv1_2 if context: self.server = context.wrap_socket( self.server, server_side=True, ) diff --git a/Classes/ZigpyTransport/AppGeneric.py b/Classes/ZigpyTransport/AppGeneric.py index c0fc501cc..b3eba417e 100644 --- a/Classes/ZigpyTransport/AppGeneric.py +++ b/Classes/ZigpyTransport/AppGeneric.py @@ -163,7 +163,7 @@ def connection_lost(self, exc: Exception) -> None: super(type(self),self).connection_lost(exc) - if not self.shutting_down and not self.restarting and isinstance( exc, serial.serialutil.SerialException): + if not self.shutting_down and not self.restarting and isinstance( exc, (serial.serialutil.SerialException, TimeoutError)): LOGGER.error( "++++++++++++++++++++++ Connection to coordinator failed on Serial, let's restart the plugin") LOGGER.warning( f"--> : self.shutting_down: {self.shutting_down}, {self.restarting}") diff --git a/DevicesModules/custom_zlinky.py b/DevicesModules/custom_zlinky.py index a3abf4485..c14004a3a 100644 --- a/DevicesModules/custom_zlinky.py +++ b/DevicesModules/custom_zlinky.py @@ -1,35 +1,56 @@ -import binascii +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Implementation of Zigbee for Domoticz plugin. +# +# This file is part of Zigbee for Domoticz plugin. https://github.com/zigbeefordomoticz/Domoticz-Zigbee +# (C) 2015-2024 +# +# Initial authors: zaraki673 & pipiche38 +# +# SPDX-License-Identifier: GPL-3.0 license + +import binascii from Modules.domoMaj import MajDomoDevice -from Modules.tools import checkAndStoreAttributeValue +from Modules.readAttributes import (ReadAttributeReq_Scheduled_ZLinky, + ReadAttributeRequest_ff66) +from Modules.tools import checkAndStoreAttributeValue, getAttributeValue from Modules.zlinky import (ZLINK_CONF_MODEL, ZLinky_TIC_COMMAND, - convert_kva_to_ampere, decode_STEG, linky_mode, + convert_kva_to_ampere, decode_STEG, + get_linky_mode_from_ep, get_ltarf, get_OPTARIF, + get_ptec, get_tarif_color, linky_mode, store_ZLinky_infos, update_zlinky_device_model_if_needed, zlinky_check_alarm, zlinky_color_tarif, zlinky_totalisateur) -def zlinky_clusters(self, Devices, nwkid, ep, cluster, attribut, value): - self.log.logging( "ZLinky", "Debug", "zlinky_clusters %s - %s/%s Attribute: %s Value: %s" % ( +def zlinky_clusters(self, domoticz_devices, nwkid, ep, cluster, attribut, value): + self.log.logging( "ZLinky", "Debug", "zlinky_clusters %s - %s/%s Attribute: %s Value: >%s<" % ( cluster, nwkid, ep, attribut, value), nwkid, ) + if value == "": + self.log.logging( "ZLinky", "Debug", "zlinky_clusters - empty value, do not go further", nwkid) + return + if cluster == "0b01": - zlinky_meter_identification(self, Devices, nwkid, ep, cluster, attribut, value) - + zlinky_meter_identification(self, nwkid, ep, cluster, attribut, value) + elif cluster == "0702": - zlinky_cluster_metering(self, Devices, nwkid, ep, cluster, attribut, value) - + zlinky_cluster_metering(self, domoticz_devices, nwkid, ep, cluster, attribut, value) + elif cluster == "0b04": - zlinky_cluster_electrical_measurement(self, Devices, nwkid, ep, cluster, attribut, value) - + zlinky_cluster_electrical_measurement(self, domoticz_devices, nwkid, ep, cluster, attribut, value) + elif cluster == "ff66": - zlinky_cluster_lixee_private(self, Devices, nwkid, ep, cluster, attribut, value) - -def zlinky_meter_identification(self, Devices, nwkid, ep, cluster, attribut, value): + zlinky_cluster_lixee_private(self, domoticz_devices, nwkid, ep, cluster, attribut, value) + + +def zlinky_meter_identification(self, nwkid, ep, cluster, attribut, value): self.log.logging( "ZLinky", "Debug", "zlinky_meter_identification %s - %s/%s Attribute: %s Value: %s" % ( cluster, nwkid, ep, attribut, value), nwkid, ) - + checkAndStoreAttributeValue( self, nwkid, ep, cluster, attribut, value, ) if attribut == "000d": # Looks like in standard mode PREF is in VA while in historique mode ISOUSC is in A @@ -43,175 +64,229 @@ def zlinky_meter_identification(self, Devices, nwkid, ep, cluster, attribut, val # Mode standard store_ZLinky_infos( self, nwkid, 'PREF', value) store_ZLinky_infos( self, nwkid, 'ISOUSC', convert_kva_to_ampere(value) ) - + elif attribut == "000a": store_ZLinky_infos( self, nwkid, 'VTIC', value) - + elif attribut == "000e": store_ZLinky_infos( self, nwkid, 'PCOUP', value) -def zlinky_cluster_metering(self, Devices, nwkid, ep, cluster, attribut, value): - # Smart Energy Metering - - self.log.logging( "ZLinky", "Debug", "zlinky_cluster_metering - %s - %s/%s attribut: %s value: %s" % ( - cluster, nwkid, ep, attribut, value), nwkid, ) - - if attribut == "0000": # CurrentSummationDelivered - # HP or Base - self.log.logging( "ZLinky", "Debug", "Cluster0702 - 0x0000 ZLinky_TIC Value: %s" % (value), nwkid, ) - checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) - MajDomoDevice(self, Devices, nwkid, ep, cluster, str(value), Attribute_=attribut) - store_ZLinky_infos( self, nwkid, 'BASE', value) - store_ZLinky_infos( self, nwkid, 'EAST', value) - - elif attribut == "0001": # CURRENT_SUMMATION_RECEIVED - self.log.logging("Cluster", "Debug", "Cluster0702 - CURRENT_SUMMATION_RECEIVED %s " % (value), nwkid) - checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) - store_ZLinky_infos( self, nwkid, 'EAIT', value) - - - elif attribut == "0020": - if value == 0: - return - checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) - MajDomoDevice(self, Devices, nwkid, ep, "0009", str(value), Attribute_="0020") - zlinky_color_tarif(self, nwkid, str(value)) - store_ZLinky_infos( self, nwkid, 'PTEC', value) - - elif attribut == "0100": - # HC or Base or BBRHCJB - if value == "": - return - self.log.logging( "ZLinky", "Debug", "Cluster0702 - 0x0100 ZLinky_TIC Conso: %s " % (value), nwkid, ) - checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) - zlinky_totalisateur(self, nwkid, attribut, value) - MajDomoDevice(self, Devices, nwkid, ep, cluster, str(value), Attribute_=attribut) - store_ZLinky_infos( self, nwkid, 'EASF01', value) - store_ZLinky_infos( self, nwkid, 'HCHC', value) - store_ZLinky_infos( self, nwkid, 'EJPHN', value) - store_ZLinky_infos( self, nwkid, 'BBRHCJB', value) - - elif attribut == "0102": - # HP or BBRHPJB - if value == 0: - return - self.log.logging( "ZLinky", "Debug", "Cluster0702 - 0x0100 ZLinky_TIC Conso: %s " % (value), nwkid, ) - checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) - zlinky_totalisateur(self, nwkid, attribut, value) - MajDomoDevice(self, Devices, nwkid, ep, cluster, str(value), Attribute_=attribut) - store_ZLinky_infos( self, nwkid, 'EASF02', value) - store_ZLinky_infos( self, nwkid, 'HCHP', value) - store_ZLinky_infos( self, nwkid, 'EJPHPM', value) - store_ZLinky_infos( self, nwkid, 'BBRHCJW', value) - - elif attribut == "0104": - if value == 0: +def zlinky_set_color_based_on_counter(self, domoticz_devices, nwkid, ep, cluster, attribut, value): + """ + Sets the color based on the counter and tariff information for the given device. + + Parameters: + domoticz_devices: The list of domoticz_devices to process. + nwkid: The network ID of the device. + ep: The endpoint identifier. + cluster: The cluster type (e.g., "0b01"). + attribut: The attribute being processed. + value: The current value of the attribute. + """ + + def _normalize_tempo_color(color): + if "HP" in color: + prefix = "HP" + elif "HC" in color: + prefix = "HC" + else: + return color # Return the original color if neither "HP" nor "HC" is found + + if "ROUGE" in color: + return "R" + prefix + if "BLANC" in color: + return "W" + prefix + if "BLEU" in color: + return "B" + prefix + return color # Return the original color if no matching color is found + + + def _zlinky_update_color(nwkid, previous_color, new_color): + """Update the device color, if it has changed request a Read Attribute to get the Color""" + + if get_linky_mode_from_ep(self, nwkid) in ( 0, 2): + # Historique mode, we can rely on PTEC + ptect_value = get_ptec(self, nwkid) + self.log.logging("ZLinky", "Debug", f"_zlinky_update_color - PTEC {ptect_value}", nwkid) + + if ptect_value and ptect_value != new_color: + # Looks like the PTEC info is not aligned with the current color ! + self.log.logging("ZLinky", "Status", f"Requesting PTEC as not inline {ptect_value} to {previous_color}/{new_color}", nwkid) + ReadAttributeReq_Scheduled_ZLinky(self, nwkid) + zlinky_color_tarif(self, nwkid, new_color) return - checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) - zlinky_totalisateur(self, nwkid, attribut, value) - MajDomoDevice(self, Devices, nwkid, "f2", cluster, str(value), Attribute_=attribut) - store_ZLinky_infos( self, nwkid, 'EASF03', value) - store_ZLinky_infos( self, nwkid, 'BBRHCJW', value) - - elif attribut == "0106": - if value == 0: - return - checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) - zlinky_totalisateur(self, nwkid, attribut, value) - MajDomoDevice(self, Devices, nwkid, "f2", cluster, str(value), Attribute_=attribut) - store_ZLinky_infos( self, nwkid, 'EASF04', value) - store_ZLinky_infos( self, nwkid, 'BBRHPJW', value) + # Standard mode, we rely on LTARF ( Libellé tarif fournisseur en cours) + ltarf_value = _normalize_tempo_color( get_ltarf(self, nwkid) ) + self.log.logging("ZLinky", "Debug", f"_zlinky_update_color - LTARF >{ltarf_value}<", nwkid) + + if ltarf_value and ltarf_value != new_color: + self.log.logging("ZLinky", "Status", f"Requesting LTARF (0xff66) as not inline {ltarf_value} to {previous_color}/{new_color}", nwkid) + ReadAttributeRequest_ff66(self, nwkid) + zlinky_color_tarif(self, nwkid, new_color) + + + def get_corresponding_color(attribut, op_tarifiare): + """Determine the new color based on the attribute and tariff type.""" + color_map = { + "HC..": { + "0100": "HC..", + "0102": "HP.."}, + "TEMPO": { + "0100": "BHC", + "0102": "BHP", + "0104": "WHC", + "0106": "WHP", + "0108": "RHC", + "010a": "RHP"}, + "EJP": { + "0100": "EJPHN", + "0102": "EJPHPM"} + } + self.log.logging("ZLinky", "Debug", f"get_corresponding_color: >{op_tarifiare}< >{attribut}<", nwkid) + return color_map.get(op_tarifiare, {}).get(attribut) + + self.log.logging("ZLinky", "Debug", f"Cluster: {cluster}, Attribute: {attribut}, Value: {value}", nwkid) + + # Fetch current tariff + op_tarifiare = get_OPTARIF(self, nwkid) + self.log.logging("ZLinky", "Debug", f"OPTARIF: {op_tarifiare}", nwkid) + + # Exit early for unsupported tariffs + if op_tarifiare == "BASE" or op_tarifiare not in {"TEMPO", "HC.."}: + return - elif attribut == "0108": - if value == 0: - return - checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) - zlinky_totalisateur(self, nwkid, attribut, value) - MajDomoDevice(self, Devices, nwkid, "f3", cluster, str(value), Attribute_=attribut) - store_ZLinky_infos( self, nwkid, 'EASF05', value) - store_ZLinky_infos( self, nwkid, 'BBRHCJR', value) + # Get previous values + previous_value = getAttributeValue(self, nwkid, ep, cluster, attribut) - elif attribut == "010a": - if value == 0: - return - checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) - zlinky_totalisateur(self, nwkid, attribut, value) - MajDomoDevice(self, Devices, nwkid, "f3", cluster, str(value), Attribute_=attribut) - store_ZLinky_infos( self, nwkid, 'EASF06', value) - store_ZLinky_infos( self, nwkid, 'BBRHPJR', value) + # Exit if value is zero or hasn't changed + if value == 0 or previous_value == value: + return - elif attribut == "010c": - if value == 0: - return - checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) - store_ZLinky_infos( self, nwkid, 'EASF07', value) + # Get previous Color + previous_color_value = getAttributeValue(self, nwkid, ep, "0702", "0020") + previous_color = get_tarif_color(self, nwkid) + self.log.logging("ZLinky", "Debug", f"PrevValue: {previous_value}, PrevValueAttributColor: {previous_color_value} PrevColor: {previous_color}", nwkid) - elif attribut == "010e": - if value == 0: - return - checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) - store_ZLinky_infos( self, nwkid, 'EASF08', value) + # Determine the current color + new_color = get_corresponding_color(attribut, op_tarifiare) + if not new_color: + return - elif attribut == "0110": - if value == 0: - return - checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) - store_ZLinky_infos( self, nwkid, 'EASF09', value) + # Handle updates for non-TEMPO tariffs + if op_tarifiare != "TEMPO": + self.log.logging("ZLinky", "Debug", f"Non-TEMPO: PrevColor: {previous_color}, NewColor: {new_color}", nwkid) + _zlinky_update_color(nwkid, previous_color, new_color) + return - elif attribut == "0112": - if value == 0: + # Handle updates for TEMPO-specific tariffs + self.log.logging("ZLinky", "Debug", f"TEMPO: PrevColor: {previous_color}, NewColor: {new_color}", nwkid) + _zlinky_update_color(nwkid, previous_color, new_color) + + +def zlinky_cluster_metering(self, domoticz_devices, nwkid, ep, cluster, attribut, value): + """ + Handles Smart Energy Metering cluster attributes and updates domoticz_devices accordingly. + + Parameters: + domoticz_devices: The list of domoticz_devices to process. + nwkid: The network ID of the device. + ep: The endpoint identifier. + cluster: The cluster type. + attribut: The attribute being processed. + value: The current value of the attribute. + """ + def handle_attribut_value(attribut, store_keys=None, update_color=False, totalize=False, maj_ep=None): + """Helper function to handle attribute values.""" + if not value: return - - checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) - store_ZLinky_infos( self, nwkid, 'EASF10', value) - - elif attribut == "0307": # PRM - store_ZLinky_infos( self, nwkid, 'PRM', value) - - elif attribut == "0308": # Serial Number - self.log.logging( "ZLinky", "Debug", "Cluster0702 - 0x0308 - Serial Number %s" % (value), nwkid, ) - - checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) - store_ZLinky_infos( self, nwkid, 'ADC0', value) - store_ZLinky_infos( self, nwkid, 'ADSC', value) - - -def zlinky_cluster_electrical_measurement(self, Devices, nwkid, ep, cluster, attribut, value): + self.log.logging("ZLinky", "Debug", f"Cluster0702 - {attribut} ZLinky_TIC Value: {value}", nwkid) + maj_ep = maj_ep or ep + MajDomoDevice(self, domoticz_devices, nwkid, maj_ep, cluster, str(value), Attribute_=attribut) + + if attribut == "0020": + MajDomoDevice(self, domoticz_devices, nwkid, "01", "0009", value, Attribute_="0020") + zlinky_color_tarif(self, nwkid, str(value)) + + if update_color: + zlinky_set_color_based_on_counter(self, domoticz_devices, nwkid, ep, cluster, attribut, value) + + if totalize: + zlinky_totalisateur(self, nwkid, attribut, value) + + if store_keys: + for key in store_keys: + store_ZLinky_infos(self, nwkid, key, value) + + checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) + + self.log.logging( + "ZLinky", "Debug", + f"zlinky_cluster_metering - {cluster} - {nwkid}/{ep} attribut: {attribut} value: {value}", + nwkid, + ) + + # Define attribute handlers + attribute_handlers = { + "0000": lambda: handle_attribut_value("0000", ["BASE", "EAST"]), + "0001": lambda: handle_attribut_value("0001", ["EAIT"]), + "0020": lambda: handle_attribut_value("0020", ["PTEC"]), + "0100": lambda: handle_attribut_value("0100", ["EASF01", "HCHC", "EJPHN", "BBRHCJB"], update_color=True, totalize=True), + "0102": lambda: handle_attribut_value("0102", ["EASF02", "HCHP", "EJPHPM", "BBRHCJW"], update_color=True, totalize=True), + "0104": lambda: handle_attribut_value("0104", ["EASF03", "BBRHCJW"], update_color=True, totalize=True, maj_ep="f2"), + "0106": lambda: handle_attribut_value("0106", ["EASF04", "BBRHPJW"], update_color=True, totalize=True, maj_ep="f2"), + "0108": lambda: handle_attribut_value("0108", ["EASF05", "BBRHCJR"], update_color=True, totalize=True, maj_ep="f3"), + "010a": lambda: handle_attribut_value("010a", ["EASF06", "BBRHPJR"], update_color=True, totalize=True, maj_ep="f3"), + "010c": lambda: handle_attribut_value("010c", ["EASF07"]), + "010e": lambda: handle_attribut_value("010e", ["EASF08"]), + "0110": lambda: handle_attribut_value("0110", ["EASF09"]), + "0112": lambda: handle_attribut_value("0112", ["EASF10"]), + "0307": lambda: store_ZLinky_infos(self, nwkid, "PRM", value), + "0308": lambda: handle_attribut_value("0308", ["ADC0", "ADSC"]), + } + + # Process attribute using handler + if attribut in attribute_handlers: + attribute_handlers[attribut]() + else: + self.log.logging("ZLinky", "Warning", f"Unhandled attribute: {attribut}", nwkid) + + +def zlinky_cluster_electrical_measurement(self, domoticz_devices, nwkid, ep, cluster, attribut, value): self.log.logging( "ZLinky", "Debug", "zlinky_cluster_electrical_measurement - %s - %s/%s attribut: %s value: %s" % ( cluster, nwkid, ep, attribut, value), nwkid, ) + if attribut == "0305": store_ZLinky_infos( self, nwkid, 'ERQ1', value) elif attribut == "050e": store_ZLinky_infos( self, nwkid, 'ERQ2', value) - + elif attribut == "090e": store_ZLinky_infos( self, nwkid, 'ERQ3', value) elif attribut == "0a0e": store_ZLinky_infos( self, nwkid, 'ERQ4', value) - + elif attribut == "050b": # Active Power - - self.log.logging("Cluster", "Debug", "zlinky_cluster_electrical_measurement %s - %s/%s Power %s" % (cluster, nwkid, ep, value)) + self.log.logging([ "ZLinky", "Cluster"], "Debug", "zlinky_cluster_electrical_measurement %s - %s/%s Power %s" % (cluster, nwkid, ep, value)) checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) - MajDomoDevice(self, Devices, nwkid, ep, cluster, str(value)) + MajDomoDevice(self, domoticz_devices, nwkid, ep, cluster, str(value)) store_ZLinky_infos( self, nwkid, 'CCASN', value) elif attribut == "090b": store_ZLinky_infos( self, nwkid, 'CCASN-1',value) - + elif attribut in ("0505", "0905", "0a05"): # RMS Voltage - self.log.logging("Cluster", "Debug", "zlinky_cluster_electrical_measurement %s - %s/%s Voltage %s" % (cluster, nwkid, ep, value)) + self.log.logging([ "ZLinky", "Cluster"], "Debug", "zlinky_cluster_electrical_measurement %s - %s/%s Voltage %s" % (cluster, nwkid, ep, value)) if value == 0xFFFF: return checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) if attribut == "0505": - MajDomoDevice(self, Devices, nwkid, ep, "0001", str(value)) + MajDomoDevice(self, domoticz_devices, nwkid, ep, "0001", str(value)) if "Model" in self.ListOfDevices[nwkid] and self.ListOfDevices[nwkid]["Model"] in ZLINK_CONF_MODEL: store_ZLinky_infos( self, nwkid, 'URMS1', value) elif attribut == "0905": @@ -234,10 +309,10 @@ def zlinky_cluster_electrical_measurement(self, Devices, nwkid, ep, cluster, att store_ZLinky_infos( self, nwkid, 'IRMS1', value) checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) - MajDomoDevice(self, Devices, nwkid, ep, cluster, str(value), Attribute_=attribut) + MajDomoDevice(self, domoticz_devices, nwkid, ep, cluster, str(value), Attribute_=attribut) # Check if Intensity is below subscription level - MajDomoDevice( self, Devices, nwkid, ep, "0009", zlinky_check_alarm(self, Devices, nwkid, ep, value), Attribute_="0005", ) + MajDomoDevice( self, domoticz_devices, nwkid, ep, "0009", zlinky_check_alarm(self, domoticz_devices, nwkid, ep, value), Attribute_="0005", ) elif attribut in ("050a", "090a", "0a0a"): # Max Current if value == 0xFFFF: @@ -256,13 +331,13 @@ def zlinky_cluster_electrical_measurement(self, Devices, nwkid, ep, cluster, att if value == 0x8000: return checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) - + _linkyMode = linky_mode( self, nwkid, protocol=True ) - + if _linkyMode in ( 0, 2,) and attribut == "050d": # Historique Tri store_ZLinky_infos( self, nwkid, 'PMAX', value) - + elif _linkyMode in ( 1, 5, ) and attribut == "050d": # Historic Mono store_ZLinky_infos( self, nwkid, 'SMAXN', value) @@ -304,18 +379,17 @@ def zlinky_cluster_electrical_measurement(self, Devices, nwkid, ep, cluster, att self.log.logging( "ZLinky", "Error", "=====> zlinky_cluster_electrical_measurement %s - %s/%s Apparent Power %s out of range !!!" % (cluster, nwkid, ep, value), nwkid, ) return checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) - + self.log.logging( "ZLinky", "Debug", "=====> zlinky_cluster_electrical_measurement %s - %s/%s Apparent Power %s" % (cluster, nwkid, ep, value), nwkid, ) # ApparentPower (Represents the single phase or Phase A, current demand of apparent (Square root of active and reactive power) power, in VA.) self.log.logging( "ZLinky", "Debug", "=====> zlinky_cluster_electrical_measurement %s - %s/%s Apparent Power %s" % (cluster, nwkid, ep, value), nwkid, ) - _linkyMode = linky_mode( self, nwkid, protocol=True ) - + if _linkyMode in ( 0, 2,) and attribut == "050f": # Historique Tri store_ZLinky_infos( self, nwkid, 'PAPP', value) - + elif _linkyMode in ( 1, 5, ) and attribut == "050f": # Historic Mono store_ZLinky_infos( self, nwkid, 'SINSTS', value) @@ -333,27 +407,27 @@ def zlinky_cluster_electrical_measurement(self, Devices, nwkid, ep, cluster, att self.log.logging( "ZLinky", "Error", "=====> zlinky_cluster_electrical_measurement %s - %s/%s Unexpected %s/%s linkyMode: %s" % ( cluster, nwkid, ep, attribut, value, _linkyMode ), nwkid, ) return - + tarif_color = None if "ZLinky" in self.ListOfDevices[nwkid] and "Color" in self.ListOfDevices[nwkid]["ZLinky"]: tarif_color = self.ListOfDevices[nwkid]["ZLinky"]["Color"] if tarif_color == "White": - MajDomoDevice(self, Devices, nwkid, "01", cluster, str(0), Attribute_=attribut) - MajDomoDevice(self, Devices, nwkid, "f2", cluster, str(value), Attribute_=attribut) - MajDomoDevice(self, Devices, nwkid, "f3", cluster, str(0), Attribute_=attribut) + MajDomoDevice(self, domoticz_devices, nwkid, "01", cluster, str(0), Attribute_=attribut) + MajDomoDevice(self, domoticz_devices, nwkid, "f2", cluster, str(value), Attribute_=attribut) + MajDomoDevice(self, domoticz_devices, nwkid, "f3", cluster, str(0), Attribute_=attribut) elif tarif_color == "Red": - MajDomoDevice(self, Devices, nwkid, "01", cluster, str(0), Attribute_=attribut) - MajDomoDevice(self, Devices, nwkid, "f2", cluster, str(0), Attribute_=attribut) - MajDomoDevice(self, Devices, nwkid, "f3", cluster, str(value), Attribute_=attribut) + MajDomoDevice(self, domoticz_devices, nwkid, "01", cluster, str(0), Attribute_=attribut) + MajDomoDevice(self, domoticz_devices, nwkid, "f2", cluster, str(0), Attribute_=attribut) + MajDomoDevice(self, domoticz_devices, nwkid, "f3", cluster, str(value), Attribute_=attribut) else: # All others - MajDomoDevice(self, Devices, nwkid, "01", cluster, str(value), Attribute_=attribut) - MajDomoDevice(self, Devices, nwkid, "f2", cluster, str(0), Attribute_=attribut) - MajDomoDevice(self, Devices, nwkid, "f3", cluster, str(0), Attribute_=attribut) + MajDomoDevice(self, domoticz_devices, nwkid, "01", cluster, str(value), Attribute_=attribut) + MajDomoDevice(self, domoticz_devices, nwkid, "f2", cluster, str(0), Attribute_=attribut) + MajDomoDevice(self, domoticz_devices, nwkid, "f3", cluster, str(0), Attribute_=attribut) else: - MajDomoDevice(self, Devices, nwkid, "01", cluster, str(value), Attribute_=attribut) + MajDomoDevice(self, domoticz_devices, nwkid, "01", cluster, str(value), Attribute_=attribut) self.log.logging( "ZLinky", "Debug", "zlinky_cluster_electrical_measurement %s - %s/%s Apparent Power %s" % (cluster, nwkid, ep, value), nwkid, ) @@ -362,25 +436,25 @@ def zlinky_cluster_electrical_measurement(self, Devices, nwkid, ep, cluster, att elif attribut in ( "0a0f", ): store_ZLinky_infos( self, nwkid, 'SINSTS3', value) - + elif attribut in ("0908", "0a08"): # Current Phase 2 and Current Phase 3 # from random import randrange # value = randrange( 0x0, 0x3c) if value == 0xFFFF: return - MajDomoDevice(self, Devices, nwkid, ep, cluster, str(value), Attribute_=attribut) + MajDomoDevice(self, domoticz_devices, nwkid, ep, cluster, str(value), Attribute_=attribut) # Check if Intensity is below subscription level if attribut == "0908": - self.log.logging("Cluster", "Debug", "zlinky_cluster_electrical_measurement %s - %s/%s %s Current L2 %s" % (cluster, nwkid, ep, attribut, value), nwkid) - MajDomoDevice( self, Devices, nwkid, "f2", "0009", zlinky_check_alarm(self, Devices, nwkid, ep, value), Attribute_="0005", ) + self.log.logging( [ "ZLinky", "Cluster"], "Debug", "zlinky_cluster_electrical_measurement %s - %s/%s %s Current L2 %s" % (cluster, nwkid, ep, attribut, value), nwkid) + MajDomoDevice( self, domoticz_devices, nwkid, "f2", "0009", zlinky_check_alarm(self, domoticz_devices, nwkid, ep, value), Attribute_="0005", ) store_ZLinky_infos( self, nwkid, 'IRMS2', value) elif attribut == "0a08": - self.log.logging("Cluster", "Debug", "zlinky_cluster_electrical_measurement %s - %s/%s %s Current L3 %s" % (cluster, nwkid, ep, attribut, value), nwkid) - MajDomoDevice( self, Devices, nwkid, "f3", "0009", zlinky_check_alarm(self, Devices, nwkid, ep, value), Attribute_="0005", ) + self.log.logging([ "ZLinky", "Cluster"], "Debug", "zlinky_cluster_electrical_measurement %s - %s/%s %s Current L3 %s" % (cluster, nwkid, ep, attribut, value), nwkid) + MajDomoDevice( self, domoticz_devices, nwkid, "f3", "0009", zlinky_check_alarm(self, domoticz_devices, nwkid, ep, value), Attribute_="0005", ) store_ZLinky_infos( self, nwkid, 'IRMS3', value) - + checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) elif attribut == "0511": @@ -391,9 +465,12 @@ def zlinky_cluster_electrical_measurement(self, Devices, nwkid, ep, cluster, att elif attribut == "0a11": store_ZLinky_infos( self, nwkid, 'UMOY3', value) + - -def zlinky_cluster_lixee_private(self, Devices, nwkid, ep, cluster, attribut, value): +def zlinky_cluster_lixee_private(self, domoticz_devices, nwkid, ep, cluster, attribut, value): + + self.log.logging([ "ZLinky", "Cluster"], "Debug", f"zlinky_cluster_lixee_private ({attribut}) value: {value}", nwkid) + if nwkid not in self.ListOfDevices: return if "Ep" not in self.ListOfDevices[nwkid]: @@ -410,7 +487,6 @@ def zlinky_cluster_lixee_private(self, Devices, nwkid, ep, cluster, attribut, va checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) store_ZLinky_infos( self, nwkid, ZLinky_TIC_COMMAND[ attribut ], value) - if attribut == "0000": # Option tarifaire value = ''.join(map(lambda x: x if ord(x) in range(128) else ' ', value)) @@ -431,14 +507,16 @@ def zlinky_cluster_lixee_private(self, Devices, nwkid, ep, cluster, attribut, va return # Couleur du Lendemain DEMAIN Trigger Alarm + self.log.logging([ "ZLinky", "Cluster"], "Debug", f"zlinky_cluster_lixee_private ({attribut}) DEMAIN {value}", nwkid) + if value == "BLAN": - MajDomoDevice(self, Devices, nwkid, ep, "0009", "20|Tomorrow WHITE day", Attribute_="0001") + MajDomoDevice(self, domoticz_devices, nwkid, ep, "0009", "20|Tomorrow WHITE day", Attribute_="0001") elif value == "BLEU": - MajDomoDevice(self, Devices, nwkid, ep, "0009", "10|Tomorrow BLUE day", Attribute_="0001") + MajDomoDevice(self, domoticz_devices, nwkid, ep, "0009", "10|Tomorrow BLUE day", Attribute_="0001") elif value == "ROUG": - MajDomoDevice(self, Devices, nwkid, ep, "0009", "40|Tomorrow RED day", Attribute_="0001") + MajDomoDevice(self, domoticz_devices, nwkid, ep, "0009", "40|Tomorrow RED day", Attribute_="0001") else: - MajDomoDevice(self, Devices, nwkid, ep, "0009", "00|No information", Attribute_="0001") + MajDomoDevice(self, domoticz_devices, nwkid, ep, "0009", "00|No information", Attribute_="0001") checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) @@ -467,9 +545,9 @@ def zlinky_cluster_lixee_private(self, Devices, nwkid, ep, cluster, attribut, va value = int(value) if value == 0: - MajDomoDevice(self, Devices, nwkid, ep, "0009", "00|No information", Attribute_="0001") + MajDomoDevice(self, domoticz_devices, nwkid, ep, "0009", "00|No information", Attribute_="0001") else: - MajDomoDevice( self, Devices, nwkid, ep, "0009", "40|Mobile peak preannoncement: %s" % value, Attribute_="0001", ) + MajDomoDevice( self, domoticz_devices, nwkid, ep, "0009", "40|Mobile peak preannoncement: %s" % value, Attribute_="0001", ) checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) @@ -492,44 +570,44 @@ def zlinky_cluster_lixee_private(self, Devices, nwkid, ep, cluster, attribut, va _tmpattr = "0a08" if value == 0: - MajDomoDevice(self, Devices, nwkid, _tmpep, "0009", "00|Normal", Attribute_="0005") + MajDomoDevice(self, domoticz_devices, nwkid, _tmpep, "0009", "00|Normal", Attribute_="0005") return # value is equal to the Amper over the souscription # Issue critical alarm - MajDomoDevice(self, Devices, nwkid, _tmpep, "0009", "04|Critical", Attribute_="0005") + MajDomoDevice(self, domoticz_devices, nwkid, _tmpep, "0009", "04|Critical", Attribute_="0005") # Isse Current on the corresponding Ampere - MajDomoDevice(self, Devices, nwkid, ep, "0b04", str(value), Attribute_=_tmpattr) + MajDomoDevice(self, domoticz_devices, nwkid, ep, "0b04", str(value), Attribute_=_tmpattr) elif attribut == "0201": - # Standard : NTARF + # Standard : NTARF / Numéro de l’index tarifaire en cours checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) - store_ZLinky_infos( self, nwkid, 'LTARF', value) + store_ZLinky_infos( self, nwkid, 'NTARF', value) elif attribut in ( "0200", ): - # Standard : LTARF - s_tarif = "" - if "BLEU" in value: - # HC BLUE - s_tarif = "B" - elif "BLAN" in value: - # HC BLANC - s_tarif = "W" - elif "ROUG" in value: - # HC ROUGE - s_tarif = "R" + # Standard : LTARF / Libellé tarif fournisseur en cours + tarif_mapping = { + "BLEU": "B", + "BLAN": "W", + "ROUG": "R" + } + s_tarif = next((tarif_mapping[key] for key in tarif_mapping if key in value), "") + if "HP" in value: s_tarif += "HP" elif "HC" in value: s_tarif += "HC" - MajDomoDevice(self, Devices, nwkid, ep, "0009", s_tarif, Attribute_="0020") + if len(s_tarif) == 3: + self.log.logging([ "ZLinky", "Cluster"], "Debug", f"zlinky_cluster_lixee_private ({attribut}) LTARF {value}", nwkid) + MajDomoDevice(self, domoticz_devices, nwkid, ep, "0009", s_tarif, Attribute_="0020") + checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) - store_ZLinky_infos( self, nwkid, 'NTARF', value) - + store_ZLinky_infos( self, nwkid, 'LTARF', value) + elif attribut in ( "0202", ): - # Standard : DATE + # Standard : DATE / Date et heure courant checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) store_ZLinky_infos( self, nwkid, 'DATE', value) diff --git a/Modules/actuators.py b/Modules/actuators.py index 4fd182545..2e8621d85 100644 --- a/Modules/actuators.py +++ b/Modules/actuators.py @@ -257,7 +257,7 @@ def actuator_setcolor(self, nwkid, EPout, value, Color): if ( "Param" in self.ListOfDevices[nwkid] and "moveToLevel" in self.ListOfDevices[nwkid]["Param"] ): transitionMoveLevel = "%04x" % int(self.ListOfDevices[nwkid]["Param"]["moveToLevel"]) - force_color_command = get_deviceconf_parameter_value(self, self.ListOfDevices[nwkid]["Model"], "FORCE_COLOR_COMMAND", return_default=None) + force_color_command = is_tuya_hue_saturation_needed(self, nwkid) ColorCapabilitiesList = device_color_capabilities( self, nwkid, EPout) self.log.logging("Command", "Debug", "actuator_setcolor force_color_command %s" % force_color_command, nwkid) @@ -266,7 +266,7 @@ def actuator_setcolor(self, nwkid, EPout, value, Color): handle_color_mode_2(self, nwkid, EPout, Hue_List) actuator_setlevel(self, nwkid, EPout, value, "Light", transitionMoveLevel) - elif Hue_List["m"] == 3 and force_color_command == "TuyaMovetoHueandSaturation": + elif Hue_List["m"] == 3 and force_color_command: handle_color_mode_tuya( self, nwkid, EPout, Hue_List, value) elif Hue_List["m"] == 3: @@ -308,7 +308,7 @@ def handle_color_mode_3(self, nwkid, EPout, Hue_List): #strxy = Hex_Format(4, x) + Hex_Format(4, y) self.log.logging("Command", "Debug", "handle_color_mode_3 Set Temp X: %s Y: %s" % (x, y), nwkid) transitionMoveLevel , transitionRGB , transitionMoveLevel , transitionHue , transitionTemp = get_all_transition_mode( self, nwkid) - if get_deviceconf_parameter_value(self, self.ListOfDevices[nwkid]["Model"], "TUYAColorControlRgbMode", return_default=None): + if is_tuya_rgb_mode_needed(self, nwkid): tuya_color_control_rgbMode( self, nwkid, "01") zcl_move_to_colour(self, nwkid, EPout, Hex_Format(4, x), Hex_Format(4, y), transitionRGB) @@ -326,7 +326,7 @@ def handle_color_mode_4(self, nwkid, EPout, Hue_List ): TempMired = 1000000 // TempKelvin self.log.logging( "Command", "Log", "handle_color_mode_4 Set Temp Kelvin: %s-%s" % ( TempMired, Hex_Format(4, TempMired)), nwkid ) - if get_deviceconf_parameter_value(self, self.ListOfDevices[nwkid]["Model"], "TUYAColorControlRgbMode", return_default=None): + if is_tuya_rgb_mode_needed(self, nwkid): tuya_color_control_rgbMode( self, nwkid, "01") zcl_move_to_colour_temperature( self, nwkid, EPout, Hex_Format(4, TempMired), transitionTemp) @@ -339,7 +339,7 @@ def handle_color_mode_4(self, nwkid, EPout, Hue_List ): self.log.logging("Command", "Log", "handle_color_mode_4 Set Hue X: %s Saturation: %s" % ( hue, saturation), nwkid) - if get_deviceconf_parameter_value(self, self.ListOfDevices[nwkid]["Model"], "TUYAColorControlRgbMode", return_default=None): + if is_tuya_rgb_mode_needed(self, nwkid): tuya_color_control_rgbMode( self, nwkid, "01") zcl_move_hue_and_saturation(self, nwkid, EPout, Hex_Format(2, hue), Hex_Format(2, saturation), transitionRGB) @@ -353,10 +353,10 @@ def handle_color_mode_9998( self, nwkid, EPout, Hue_List, value): hue = int(hue * 254 // 360) self.log.logging("Command", "Debug", "handle_color_mode_9998 Set Hue X: %s Saturation: %s" % (hue, saturation), nwkid) - if get_deviceconf_parameter_value(self, self.ListOfDevices[nwkid]["Model"], "TUYAColorControlRgbMode", return_default=None): + if is_tuya_rgb_mode_needed(self, nwkid): tuya_color_control_rgbMode( self, nwkid, "01") - if get_deviceconf_parameter_value(self, self.ListOfDevices[nwkid]["Model"], "FORCE_COLOR_COMMAND", return_default=None) == "TuyaMovetoHueandSaturation": + if is_tuya_hue_saturation_needed(self, nwkid): tuya_Move_To_Hue_Saturation( self, nwkid, hue, saturation, value) else: zcl_move_hue_and_saturation(self, nwkid, EPout, Hex_Format(2, hue), Hex_Format(2, saturation), transitionRGB) @@ -365,6 +365,22 @@ def handle_color_mode_9998( self, nwkid, EPout, Hue_List, value): actuator_setlevel(self, nwkid, EPout, value, "Light", transitionMoveLevel) +def is_tuya_hue_saturation_needed(self, nwkid): + model = self.ListOfDevices[nwkid].get("Model","") + tuya_move_to_hue_and_saturation = get_deviceconf_parameter_value(self, model, "FORCE_COLOR_COMMAND", return_default=None) or get_device_config_param(self, nwkid, "FORCE_COLOR_COMMAND") + return tuya_move_to_hue_and_saturation == "TuyaMovetoHueandSaturation" + + +def is_tuya_rgb_mode_needed(self, nwkid): + model = self.ListOfDevices[nwkid].get("Model","") + return get_deviceconf_parameter_value( + self, + model, + "TUYAColorControlRgbMode", + return_default=None, + ) or get_device_config_param(self, nwkid, "TUYAColorControlRgbMode") + + def handle_color_mode_tuya( self, nwkid, EPout, Hue_List, value): self.log.logging("Command", "Debug", "handle_color_mode_tuya Hue_list: %s Value: %s" % ( @@ -379,7 +395,7 @@ def handle_color_mode_tuya( self, nwkid, EPout, Hue_List, value): self.log.logging("Command", "Log", "handle_color_mode_tuya Set Hue X: %s Saturation: %s Value: %s" % ( hue, saturation, value), nwkid) - if get_deviceconf_parameter_value(self, self.ListOfDevices[nwkid]["Model"], "TUYAColorControlRgbMode", return_default=None): + if is_tuya_rgb_mode_needed(self, nwkid): tuya_color_control_rgbMode( self, nwkid, "01") tuya_Move_To_Hue_Saturation( self, nwkid, EPout, hue, saturation, transitionHue, value ) diff --git a/Modules/checkingUpdate.py b/Modules/checkingUpdate.py index d67c305f8..621a89aed 100644 --- a/Modules/checkingUpdate.py +++ b/Modules/checkingUpdate.py @@ -23,6 +23,8 @@ ZIGATEV1OPTIPDM_TXT_RECORD = "zigatev1optipdm.pipiche.net" ZIGATEV2_FIRMWARE_TXT_RECORD = "zigatev2.pipiche.net" +DNS_REQ_TIMEOUT = 2 + ZIGATE_DNS_RECORDS = { "03": ZIGATEV1_FIRMWARE_TXT_RECORD, "04": ZIGATEV1OPTIPDM_TXT_RECORD, @@ -65,12 +67,12 @@ def check_plugin_version_against_dns(self, zigbee_communication, branch, zigate_ return (0, 0, 0) -def _get_dns_txt_record(self, record, timeout=1): +def _get_dns_txt_record(self, record, timeout=DNS_REQ_TIMEOUT): if not self.internet_available: return None try: - result = dns.resolver.resolve(record, "TXT", tcp=True, lifetime=1).response.answer[0] + result = dns.resolver.resolve(record, "TXT", tcp=True, lifetime=timeout).response.answer[0] return str(result[0]).strip('"') except dns.resolver.Timeout: @@ -131,7 +133,7 @@ def is_zigate_firmware_available(self, currentMajorVersion, currentFirmwareVersi def is_internet_available(): try: - response = requests.get("http://www.google.com", timeout=3) + response = requests.get("https://www.google.com", timeout=3) # Check if the status code is a success code (2xx) return response.status_code == 200 except requests.ConnectionError: diff --git a/Modules/heartbeat.py b/Modules/heartbeat.py index 065a3ef80..9a5d10a77 100755 --- a/Modules/heartbeat.py +++ b/Modules/heartbeat.py @@ -47,6 +47,7 @@ ReadAttributeRequest_0402, ReadAttributeRequest_0405, ReadAttributeRequest_0702_0000, + ReadAttributeRequest_0702_0017, ReadAttributeRequest_0702_PC321, ReadAttributeRequest_0702_ZLinky_TIC, ReadAttributeRequest_ff66, @@ -329,9 +330,6 @@ def pollingManufSpecificDevices(self, NwkId, HB): FUNC_MANUF = { "TuyaTRV5Polling": tuya_trv5_polling, - "ZLinkyPolling0702": ReadAttributeRequest_0702_ZLinky_TIC, - "ZLinkyPollingGlobal": ReadAttributeReq_ZLinky, - "PollingCusterff66": ReadAttributeRequest_ff66, "OnOffPollingFreq": ManufSpecOnOffPolling, "PowerPollingFreq": ReadAttributeRequest_0b04_050b_0505_0508, "MeterPollingFreq": ReadAttributeRequest_0702_0000, @@ -344,7 +342,11 @@ def pollingManufSpecificDevices(self, NwkId, HB): "HumiPollingFreq": ReadAttributeRequest_0405, "BattPollingFreq": ReadAttributeRequest_0001, "ZLinkyIndexes": ReadAttributeReq_Scheduled_ZLinky, # Based on a specific time - "ZLinkyPollingPTEC": ReadAttributeReq_Scheduled_ZLinky # Every 15' by default + "ZLinkyPollingPTEC": ReadAttributeReq_Scheduled_ZLinky, # Every 15' by default + "ZLinkyPolling0702": ReadAttributeRequest_0702_ZLinky_TIC, + "ZLinkyPollingGlobal": ReadAttributeReq_ZLinky, + "PollingCusterff66": ReadAttributeRequest_ff66, + "InletTempPolling": ReadAttributeRequest_0702_0017, # Retreive Inlet Temperature } if "Param" not in self.ListOfDevices[NwkId]: @@ -369,6 +371,7 @@ def pollingManufSpecificDevices(self, NwkId, HB): if _current_time == _target_time and "ScheduledZLinkyRead" not in self.ListOfDevices[ NwkId ]: self.ListOfDevices[ NwkId ][ "ScheduledZLinkyRead" ] = True ReadAttributeReq_Scheduled_ZLinky( self, NwkId) + ReadAttributeRequest_ff66( self, NwkId) elif _current_time != _target_time and "ScheduledZLinkyRead" in self.ListOfDevices[ NwkId ]: del self.ListOfDevices[ NwkId ][ "ScheduledZLinkyRead" ] diff --git a/Modules/readAttributes.py b/Modules/readAttributes.py index 9920351bb..68916fa26 100644 --- a/Modules/readAttributes.py +++ b/Modules/readAttributes.py @@ -1333,6 +1333,34 @@ def ReadAttributeRequest_0702_0000(self, key): self.log.logging("ReadAttributes", "Debug", "Request Summation on 0x0702 cluster: " + key + " EPout = " + EPout, nwkid=key) ReadAttributeReq(self, key, ZIGATE_EP, EPout, "0702", listAttributes, ackIsDisabled=is_ack_tobe_disabled(self, key)) + +def ReadAttributeRequest_0702_0017(self, key): + """ + Reads the InletTemperature (Attribute 0x0017) from the Metering cluster (0x0702). + + Parameters: + self: The instance of the class. + key: The network identifier for the device. + """ + + # Define the cluster and attribute + cluster_id = "0702" + attributes_list = [0x0017,] + + # Get the list of endpoints for the cluster + endpoints = getListOfEpForCluster(self, key, cluster_id) + for endpoint in endpoints: + self.log.logging( + "ReadAttributes", + "Debug", + f"Requesting InletTemperature (Attribute {attributes_list}) on cluster {cluster_id} for key {key}, EP = {endpoint}.", + nwkid=key + ) + + # Send the read attribute request + ReadAttributeReq(self, key, ZIGATE_EP, endpoint, cluster_id, attributes_list, ackIsDisabled=is_ack_tobe_disabled(self, key)) + + def ReadAttributeRequest_0702_multiplier_divisor(self, key): ListOfEp = getListOfEpForCluster(self, key, "0702") for EPout in ListOfEp: @@ -1346,10 +1374,11 @@ def ReadAttributeReq_ZLinky(self, nwkid): self.log.logging("ReadAttributes", "Debug", "ReadAttributeReq_ZLinky: " + nwkid + " EPout = " + EPout, nwkid=nwkid) for cluster in ( "0702", "0b01", "0b04" ): - self.log.logging("ZLinky", "Debug", "ReadAttributeReq_ZLinky: " + nwkid + " EPout = " + EPout + " Cluster = " + cluster, nwkid=nwkid) + self.log.logging(["ReadAttributes", "ZLinky"], "Debug", "ReadAttributeReq_ZLinky: " + nwkid + " EPout = " + EPout + " Cluster = " + cluster, nwkid=nwkid) listAttributes = retreive_ListOfAttributesByCluster(self, nwkid, EPout, cluster) ReadAttributeReq(self, nwkid, ZIGATE_EP, EPout, cluster, listAttributes, ackIsDisabled=False) + def ReadAttributeReq_Scheduled_ZLinky(self, nwkid): # - La couleur du jour est déterminée au fur et à mesure de l'année et est diffusée par Edf la veille vers 12h. # - Une journée Tempo commence à 6h du matin jusqu'au lendemain même heure. @@ -1370,7 +1399,7 @@ def ReadAttributeReq_Scheduled_ZLinky(self, nwkid): EPout = "01" for cluster in WORK_TO_BE_DONE: - self.log.logging("ZLinky", "Log", "ReadAttributeReq_Scheduled_ZLinky: %s cluster %s attribute: %s" %( + self.log.logging(["ReadAttributes", "ZLinky"], "Log", "ReadAttributeReq_Scheduled_ZLinky: %s cluster %s attribute: %s" %( nwkid, cluster, WORK_TO_BE_DONE[ cluster ]), nwkid=nwkid) ReadAttributeReq(self, nwkid, ZIGATE_EP, EPout, cluster, WORK_TO_BE_DONE[ cluster ], ackIsDisabled=False) @@ -1394,9 +1423,10 @@ def ReadAttributeRequest_0702_ZLinky_TIC(self, key): else: listAttributes = [0x0020, 0x0100, 0x0102, 0x0104, 0x0106, 0x0108, 0x10A] - self.log.logging("ReadAttributes", "ZLinky", "Request ZLinky infos on 0x0702 cluster: " + key + " EPout = " + EPout, nwkid=key) + self.log.logging(["ReadAttributes", "ZLinky"], "ZLinky", "Request ZLinky infos on 0x0702 cluster: " + key + " EPout = " + EPout, nwkid=key) ReadAttributeReq(self, key, ZIGATE_EP, EPout, "0702", listAttributes, ackIsDisabled=False) + def ReadAttribute_ZLinkyIndex( self, nwkid ): # This can be used as a backup if the reporting do not work @@ -1414,8 +1444,8 @@ def ReadAttribute_ZLinkyIndex( self, nwkid ): nwkid, "0702", INDEX_ATTRIBUTES[ optarif ], optarif), nwkid=nwkid) if optarif in INDEX_ATTRIBUTES: ReadAttributeReq(self, nwkid, ZIGATE_EP, EPout, "0702", INDEX_ATTRIBUTES[ optarif ], ackIsDisabled=False) - - + + def ReadAttributeRequest_0702_PC321(self, key): # Cluster 0x0702 Metering / Specific 0x0000 @@ -1699,12 +1729,12 @@ def ReadAttributeRequest_ff66(self, key): # Cluster ZLinky self.log.logging("ReadAttributes", "Debug", "ReadAttributeRequest_ff66 - Key: %s " % key, nwkid=key) - EPout = "01" - listAttributes = retreive_ListOfAttributesByCluster(self, key, EPout, "ff66") - self.log.logging("ReadAttributes", "Debug", "ReadAttributeRequest_ff66 - Key: %s request %s" % ( - key, listAttributes), nwkid=key) + ep_out = "01" + attribute_list = retreive_ListOfAttributesByCluster(self, key, ep_out, "ff66") + self.log.logging(["ReadAttributes", "ZLinky"], "Debug", "ReadAttributeRequest_ff66 - Key: %s request %s" % ( + key, attribute_list), nwkid=key) - ReadAttributeReq(self, key, ZIGATE_EP, EPout, "ff66", listAttributes, ackIsDisabled=is_ack_tobe_disabled(self, key)) + ReadAttributeReq(self, key, ZIGATE_EP, ep_out, "ff66", attribute_list, ackIsDisabled=is_ack_tobe_disabled(self, key)) def ReadAttributeRequest_fc7d(self, key): # Cluster IKEA diff --git a/Modules/tools.py b/Modules/tools.py index aea21b899..4ced7cdde 100644 --- a/Modules/tools.py +++ b/Modules/tools.py @@ -108,6 +108,10 @@ def getListOfEpForCluster(self, NwkId, SearchCluster): oldFashion = ( "ClusterType" in self.ListOfDevices[NwkId] and self.ListOfDevices[NwkId]["ClusterType"] not in ({}, "") ) for Ep in list(self.ListOfDevices[NwkId]["Ep"].keys()): + # check that is not a Fake Ep + if is_fake_ep(self, NwkId, Ep): + continue + if SearchCluster not in self.ListOfDevices[NwkId]["Ep"][Ep]: continue diff --git a/Modules/zlinky.py b/Modules/zlinky.py index e0751aa94..fd96ecd57 100644 --- a/Modules/zlinky.py +++ b/Modules/zlinky.py @@ -102,17 +102,17 @@ def convert_kva_to_ampere( kva ): return ( kva * 1000) / 200 + def zlinky_color_tarif(self, MsgSrcAddr, color): - if "ZLinky" not in self.ListOfDevices[MsgSrcAddr]: - self.ListOfDevices[MsgSrcAddr]["ZLinky"] = {} - self.ListOfDevices[MsgSrcAddr]["ZLinky"]["Color"] = color + self.ListOfDevices.setdefault(MsgSrcAddr, {}).setdefault("ZLinky", {})["Color"] = color -def store_ZLinky_infos( self, nwkid, command_tic, value): +def store_ZLinky_infos( self, nwkid, command_tic, value): if 'ZLinky' not in self.ListOfDevices[ nwkid ]: self.ListOfDevices[ nwkid ][ 'ZLinky' ] = {} self.ListOfDevices[ nwkid ][ 'ZLinky' ][ command_tic ] = value + def get_ISOUSC( self, nwkid ): if ( @@ -145,23 +145,71 @@ def get_ISOUSC( self, nwkid ): return 0 -def get_OPTARIF( self, nwkid): +def get_OPTARIF(self, nwkid): + """ + Retrieves the 'OPTARIF' value for a given network ID (nwkid) from the 'ZLinky' device data. - if ( - "ZLinky" in self.ListOfDevices[nwkid] - and "OPTARIF" in self.ListOfDevices[nwkid]["ZLinky"] - ): - return self.ListOfDevices[nwkid]["ZLinky"]["OPTARIF"] + If the 'OPTARIF' value is found and is a byte string, it decodes it to a regular string + and removes any null byte characters. If 'OPTARIF' is not found or if it's not a byte + string, the method returns the default value "BASE". - return "BASE" + Args: + nwkid (str): The network ID used to access the device data in ListOfDevices. -def get_instant_power( self, nwkid ): - return round(float(self.ListOfDevices[nwkid]["Ep"]["01"]["0b04"]["050f"]), 2) if "0b04" in self.ListOfDevices[nwkid]["Ep"]["01"] and "050f" in self.ListOfDevices[nwkid]["Ep"]["01"]["0b04"] else 0 + Returns: + str: The cleaned 'OPTARIF' value, or "BASE" if not found. + """ + zlinky = self.ListOfDevices.get(nwkid, {}).get("ZLinky", {}) + + # Get the raw value of "OPTARIF", or default to "BASE" + optarif_value = zlinky.get("OPTARIF", "BASE") + + # If the value is a byte string, decode and clean up + if isinstance(optarif_value, bytes): + # Decode the byte string to UTF-8, ignoring errors, and remove null bytes + optarif_value = optarif_value.decode('utf-8', errors='ignore').strip('\x00') + + # Remove null characters and strip whitespace + if isinstance(optarif_value, str): + optarif_value = optarif_value.replace('\u0000', '').replace('\x00', '').strip() + + return optarif_value + + +def get_instant_power(self, nwkid): + try: + device = self.ListOfDevices.get(nwkid, {}) + ep = device.get("Ep", {}).get("01", {}) + cluster = ep.get("0b04", {}) + power = cluster.get("050f") + return round(float(power), 2) if power is not None else 0 + except (ValueError, TypeError): + return 0 + + +def get_tarif_color(self, nwkid): + return self.ListOfDevices.get(nwkid, {}).get("ZLinky", {}).get("Color") + + +def get_ptec(self, nwkid): + """ Retreive Current Tarif. (Historic)""" + return self.ListOfDevices.get(nwkid, {}).get("ZLinky", {}).get("PTEC") + +def get_ltarf(self, nwkid): + """ Retreive Current Tarif. (Standard)""" + + _ltarf = self.ListOfDevices.get(nwkid, {}).get("ZLinky", {}).get("LTARF") + # If the value is a byte string, decode and clean up + if isinstance(_ltarf, bytes): + # Decode the byte string to UTF-8, ignoring errors, and remove null bytes + _ltarf = _ltarf.decode('utf-8', errors='ignore').strip('\x00') + + # Remove null characters and strip whitespace + if isinstance(_ltarf, str): + _ltarf = _ltarf.replace('\u0000', '').replace('\x00', '').strip() + + return _ltarf -def get_tarif_color( self, nwkid ): - return self.ListOfDevices[nwkid]["ZLinky"]["Color"] if "ZLinky" in self.ListOfDevices[nwkid] and "Color" in self.ListOfDevices[nwkid]["ZLinky"] else None - - def zlinky_check_alarm(self, Devices, MsgSrcAddr, MsgSrcEp, value): if value == 0: @@ -324,74 +372,133 @@ def update_zlinky_device_model_if_needed( self, nwkid ): } +#def decode_STEG(stge): +# """ decoding of STGE Linky frame""" +# # Contact Sec : bit 0 +# # Organe de coupure: bits 1 à 3 +# # Etat du cache-bornes distributeur: bit 4 +# # Surtension sur une des phases: bit 6 +# # Dépassement de la puissance de référence bit 7 +# # Fonctionnement produ/conso: bit 8 +# # Sens de l'énégerie active: bit 9 +# # Tarif en cours contrat fourniture: bit 10 à 13 +# # Tarif en cours contrat distributeur: bit 14 et 15 +# # Mode dégradée de l'horloge: bit 16 +# # Etat de sortie tic: bit 17 +# # Etat de sortie Euridis: bit 19 et 20 +# # Statut du CPL: bit 21 et 22 +# # Synchro CPL: bit 23 +# # Couleur du jour: bit 24 et 25 +# # Couleur du lendemain: bit 26 et 27 +# # Préavis points mobiles: bit 28 à 29 +# # Pointe mobile: bit 30 et 31 +# +# try: +# stge = int(stge, 16) +# except ValueError: +# return {} +# +# STEG_ATTRIBUTES = { +# 'contact_sec': stge & 0x00000001, +# 'organe_coupure': (stge & 0x0000000E) >> 1, +# 'etat_cache_bornes': (stge & 0x00000010) >> 4, +# 'sur_tension': (stge & 0x00000040) >> 6, +# 'depassement_puissance': (stge & 0x00000080) >> 7, +# 'mode_fonctionnement': (stge & 0x00000100) >> 8, +# 'sens_energie': (stge & 0x00000200) >> 9, +# 'tarif_fourniture': (stge & 0x0001F000) >> 12, +# 'tarif_distributeur': (stge & 0x00060000) >> 14, +# 'Mode_horloge': (stge & 0x00100000) >> 16, +# 'sortie_tic': (stge & 0x00200000) >> 17, +# 'sortie_euridis': (stge & 0x00C00000) >> 19, +# 'status_cpl': (stge & 0x03000000) >> 21, +# 'synchro_cpl': (stge & 0x08000000) >> 23, +# 'couleur_jour': (stge & 0x30000000) >> 24, +# 'couleur_demain': (stge & 0xC0000000) >> 26, +# 'preavis_point_mobile': (stge & 0x30000000) >> 28, +# 'pointe_mobile': (stge & 0xC0000000) >> 30, +# } +# +# # Decode mapped values +# STEG_ATTRIBUTES_MAPPING = { +# 'contact_sec': CONTACT_SEC, +# 'etat_cache_bornes': ETAT_CACHE_BORNES, +# 'mode_fonctionnement': FONCTION_PROD_CONSO, +# 'sens_energie': SENS_ENERGIE, +# 'Mode_horloge': HORLOGE, +# 'sortie_tic': SORTIE_TIC, +# 'sortie_euridis': SORTIE_EURIDIS, +# 'status_cpl': STATUT_CPL, +# 'synchro_cpl': SYNCHRO_CPL, +# 'couleur_jour': COULEUR, +# 'couleur_demain': COULEUR, +# } +# +# # Decode mapped values for applicable attributes +# for attr, mapping in STEG_ATTRIBUTES_MAPPING.items(): +# if attr in STEG_ATTRIBUTES and STEG_ATTRIBUTES[attr] in mapping: +# STEG_ATTRIBUTES[attr] = mapping[STEG_ATTRIBUTES[attr]] +# +# return STEG_ATTRIBUTES + def decode_STEG(stge): - """ decoding of STGE Linky frame""" - # Contact Sec : bit 0 - # Organe de coupure: bits 1 à 3 - # Etat du cache-bornes distributeur: bit 4 - # Surtension sur une des phases: bit 6 - # Dépassement de la puissance de référence bit 7 - # Fonctionnement produ/conso: bit 8 - # Sens de l'énégerie active: bit 9 - # Tarif en cours contrat fourniture: bit 10 à 13 - # Tarif en cours contrat distributeur: bit 14 et 15 - # Mode dégradée de l'horloge: bit 16 - # Etat de sortie tic: bit 17 - # Etat de sortie Euridis: bit 19 et 20 - # Statut du CPL: bit 21 et 22 - # Synchro CPL: bit 23 - # Couleur du jour: bit 24 et 25 - # Couleur du lendemain: bit 26 et 27 - # Préavis points mobiles: bit 28 à 29 - # Pointe mobile: bit 30 et 31 + """ Decoding of STGE Linky frame """ + # Attempt to convert the input into an integer, return an empty dictionary on failure. try: stge = int(stge, 16) except ValueError: return {} - STEG_ATTRIBUTES = { - 'contact_sec': stge & 0x00000001, - 'organe_coupure': (stge & 0x0000000E) >> 1, - 'etat_cache_bornes': (stge & 0x00000010) >> 4, - 'sur_tension': (stge & 0x00000040) >> 6, - 'depassement_puissance': (stge & 0x00000080) >> 7, - 'mode_fonctionnement': (stge & 0x00000100) >> 8, - 'sens_energie': (stge & 0x00000200) >> 9, - 'tarif_fourniture': (stge & 0x0001F000) >> 12, - 'tarif_distributeur': (stge & 0x00060000) >> 14, - 'Mode_horloge': (stge & 0x00100000) >> 16, - 'sortie_tic': (stge & 0x00200000) >> 17, - 'sortie_euridis': (stge & 0x00C00000) >> 19, - 'status_cpl': (stge & 0x03000000) >> 21, - 'synchro_cpl': (stge & 0x08000000) >> 23, - 'couleur_jour': (stge & 0x30000000) >> 24, - 'couleur_demain': (stge & 0xC0000000) >> 26, - 'preavis_point_mobile': (stge & 0x30000000) >> 28, - 'pointe_mobile': (stge & 0xC0000000) >> 30, + # Define bit masks and corresponding shifts for each attribute + ATTRIBUTE_DEFINITIONS = [ + ('contact_sec', 0x00000001, 0), # bit 0 + ('organe_coupure', 0x0000000E, 1), # bits 1-3 + ('etat_cache_bornes', 0x00000010, 4), # bit 4 + ('sur_tension', 0x00000040, 6), # bit 6 + ('depassement_puissance', 0x00000080, 7), # bit 7 + ('mode_fonctionnement', 0x00000100, 8), # bit 8 + ('sens_energie', 0x00000200, 9), # bit 9 + ('tarif_fourniture', 0x0001F000, 12), # bits 10-13 + ('tarif_distributeur', 0x00060000, 14), # bits 14-15 + ('Mode_horloge', 0x00100000, 16), # bit 16 + ('sortie_tic', 0x00200000, 17), # bit 17 + ('sortie_euridis', 0x00C00000, 19), # bits 19-20 + ('status_cpl', 0x03000000, 21), # bits 21-22 + ('synchro_cpl', 0x08000000, 23), # bit 23 + ('couleur_jour', 0x30000000, 24), # bits 24-25 + ('couleur_demain', 0xC0000000, 26), # bits 26-27 + ('preavis_point_mobile', 0x30000000, 28), # bits 28-29 + ('pointe_mobile', 0xC0000000, 30), # bits 30-31 + ] + + # Define the mappings for each attribute + MAPPINGS = { + 'contact_sec': {0: "fermé", 1: "ouvert"}, + 'etat_cache_bornes': {0: "fermé", 1: "ouvert"}, + 'mode_fonctionnement': {0: "consommateur", 1: "producteur"}, + 'sens_energie': {0: "énergie active positive", 1: "énergie active négative"}, + 'Mode_horloge': {0: "horloge correcte", 1: "horloge en mode dégradée"}, + 'sortie_tic': {0: "mode historique", 1: "mode standard"}, + 'sortie_euridis': {0: "désactivée", 1: "activée sans sécurité", 3: "activée avec sécurité"}, + 'status_cpl': {0: "New/Unlock", 1: "New/Lock", 2: "Registered"}, + 'synchro_cpl': {0: "compteur non synchronisé", 1: "compteur synchronisé"}, + 'couleur_jour': {0: "Pas d'annonce", 1: "Bleu", 2: "Blanc", 3: "Rouge"}, + 'couleur_demain': {0: "Pas d'annonce", 1: "Bleu", 2: "Blanc", 3: "Rouge"}, } - # Decode mapped values - STEG_ATTRIBUTES_MAPPING = { - 'contact_sec': CONTACT_SEC, - 'etat_cache_bornes': ETAT_CACHE_BORNES, - 'mode_fonctionnement': FONCTION_PROD_CONSO, - 'sens_energie': SENS_ENERGIE, - 'Mode_horloge': HORLOGE, - 'sortie_tic': SORTIE_TIC, - 'sortie_euridis': SORTIE_EURIDIS, - 'status_cpl': STATUT_CPL, - 'synchro_cpl': SYNCHRO_CPL, - 'couleur_jour': COULEUR, - 'couleur_demain': COULEUR, - } + # Initialize the result dictionary + result = {} + + # Extract and map each attribute based on the definitions + for attr, mask, shift in ATTRIBUTE_DEFINITIONS: + # Extract the value by applying the mask and shifting + value = (stge & mask) >> shift - # Decode mapped values for applicable attributes - for attr, mapping in STEG_ATTRIBUTES_MAPPING.items(): - if attr in STEG_ATTRIBUTES and STEG_ATTRIBUTES[attr] in mapping: - STEG_ATTRIBUTES[attr] = mapping[STEG_ATTRIBUTES[attr]] + # Apply mapping if it exists, otherwise keep the raw value + result[attr] = MAPPINGS.get(attr, {}).get(value, value) - return STEG_ATTRIBUTES + return result def zlinky_sum_all_indexes(self, nwkid): diff --git a/Zigbee/zclDecoders.py b/Zigbee/zclDecoders.py index 430055492..ae529bddc 100644 --- a/Zigbee/zclDecoders.py +++ b/Zigbee/zclDecoders.py @@ -6,7 +6,6 @@ import struct -from distutils.command.build import build from os import stat from Modules.tools import (is_direction_to_client, is_direction_to_server, diff --git a/sonar-project.properties b/sonar-project.properties new file mode 100644 index 000000000..b3ddfcdb1 --- /dev/null +++ b/sonar-project.properties @@ -0,0 +1,3 @@ +sonar.issue.ignore.multicriteria=e1 +sonar.issue.ignore.multicriteria.e1.ruleKey=python:S5332 +sonar.issue.ignore.multicriteria.e1.resourceKey=**