diff --git a/DevicesModules/custom_zlinky.py b/DevicesModules/custom_zlinky.py index ac01c0ae3..605e5bb6d 100644 --- a/DevicesModules/custom_zlinky.py +++ b/DevicesModules/custom_zlinky.py @@ -9,7 +9,6 @@ # Initial authors: zaraki673 & pipiche38 # # SPDX-License-Identifier: GPL-3.0 license -# import binascii @@ -24,42 +23,26 @@ def zlinky_clusters(self, Devices, nwkid, ep, cluster, attribut, value): - """ - Handles the processing of different ZLinky clusters based on the cluster type. - - Parameters: - Devices: The list of devices to process. - nwkid: The network ID of the device. - ep: The endpoint identifier. - cluster: The cluster type (e.g., "0b01", "0702"). - attribut: The attribute name being processed. - value: The value associated with the attribute. - """ - self.log.logging( - "ZLinky", - "Debug", - f"zlinky_clusters {cluster} - {nwkid}/{ep} Attribute: {attribut} Value: {value}", - nwkid - ) - - # Mapping clusters to their corresponding functions - cluster_handlers = { - "0b01": zlinky_meter_identification, - "0702": zlinky_cluster_metering, - "0b04": zlinky_cluster_electrical_measurement, - "ff66": zlinky_cluster_lixee_private - } - - # Call the appropriate handler function if the cluster is found in the mapping - handler = cluster_handlers.get(cluster) - if handler: - handler(self, Devices, nwkid, ep, cluster, attribut, value) + self.log.logging( "ZLinky", "Debug", "zlinky_clusters %s - %s/%s Attribute: %s Value: %s" % ( + cluster, nwkid, ep, attribut, value), nwkid, ) + + if cluster == "0b01": + zlinky_meter_identification(self, Devices, nwkid, ep, cluster, attribut, value) + + elif cluster == "0702": + zlinky_cluster_metering(self, Devices, nwkid, ep, cluster, attribut, value) + + elif cluster == "0b04": + zlinky_cluster_electrical_measurement(self, Devices, nwkid, ep, cluster, attribut, value) + + elif cluster == "ff66": + zlinky_cluster_lixee_private(self, Devices, nwkid, ep, cluster, attribut, value) def zlinky_meter_identification(self, Devices, nwkid, ep, cluster, attribut, value): self.log.logging( "ZLinky", "Debug", "zlinky_meter_identification %s - %s/%s Attribute: %s Value: %s" % ( cluster, nwkid, ep, attribut, value), nwkid, ) - + checkAndStoreAttributeValue( self, nwkid, ep, cluster, attribut, value, ) if attribut == "000d": # Looks like in standard mode PREF is in VA while in historique mode ISOUSC is in A @@ -73,14 +56,14 @@ def zlinky_meter_identification(self, Devices, nwkid, ep, cluster, attribut, val # Mode standard store_ZLinky_infos( self, nwkid, 'PREF', value) store_ZLinky_infos( self, nwkid, 'ISOUSC', convert_kva_to_ampere(value) ) - + elif attribut == "000a": store_ZLinky_infos( self, nwkid, 'VTIC', value) - + elif attribut == "000e": store_ZLinky_infos( self, nwkid, 'PCOUP', value) - + def zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value): """ Sets the color based on the counter and tariff information for the given device. @@ -98,12 +81,12 @@ def zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribu # Exit if the tariff is BASE or not one of the supported types if op_tarifiare == "BASE" or op_tarifiare not in ("TEMPO", "HC.."): return - + # Get previous value to ensure there's a change previous_value = getAttributeValue(self, nwkid, ep, cluster, attribut) if value == 0 or previous_value == value: return - + previous_color = get_tarif_color(self, nwkid) # Set value based on attribute and tariff type @@ -119,7 +102,7 @@ def zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribu new_color = "HP.." elif op_tarifiare == "TEMPO": new_color = "BHP" - + # If the tariff is not TEMPO, proceed with updating the device if op_tarifiare != "TEMPO": if new_color != previous_color: @@ -143,8 +126,8 @@ def zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribu self.log.logging( "ZLinky", "Status", "Updating ZLinky color based on Counter", nwkid) MajDomoDevice(self, Devices, nwkid, ep, "0009", new_color, Attribute_="0020") zlinky_color_tarif(self, nwkid, new_color) - - + + def zlinky_cluster_metering(self, Devices, nwkid, ep, cluster, attribut, value): # Smart Energy Metering @@ -180,11 +163,11 @@ def zlinky_cluster_metering(self, Devices, nwkid, ep, cluster, attribut, value): checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) zlinky_totalisateur(self, nwkid, attribut, value) MajDomoDevice(self, Devices, nwkid, ep, cluster, str(value), Attribute_=attribut) + zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) store_ZLinky_infos( self, nwkid, 'EASF01', value) store_ZLinky_infos( self, nwkid, 'HCHC', value) store_ZLinky_infos( self, nwkid, 'EJPHN', value) store_ZLinky_infos( self, nwkid, 'BBRHCJB', value) - zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) elif attribut == "0102": # HP or BBRHPJB @@ -194,11 +177,11 @@ def zlinky_cluster_metering(self, Devices, nwkid, ep, cluster, attribut, value): checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) zlinky_totalisateur(self, nwkid, attribut, value) MajDomoDevice(self, Devices, nwkid, ep, cluster, str(value), Attribute_=attribut) + zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) store_ZLinky_infos( self, nwkid, 'EASF02', value) store_ZLinky_infos( self, nwkid, 'HCHP', value) store_ZLinky_infos( self, nwkid, 'EJPHPM', value) store_ZLinky_infos( self, nwkid, 'BBRHCJW', value) - zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) elif attribut == "0104": if value == 0: @@ -206,9 +189,9 @@ def zlinky_cluster_metering(self, Devices, nwkid, ep, cluster, attribut, value): checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) zlinky_totalisateur(self, nwkid, attribut, value) MajDomoDevice(self, Devices, nwkid, "f2", cluster, str(value), Attribute_=attribut) + zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) store_ZLinky_infos( self, nwkid, 'EASF03', value) store_ZLinky_infos( self, nwkid, 'BBRHCJW', value) - zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) elif attribut == "0106": if value == 0: @@ -216,9 +199,9 @@ def zlinky_cluster_metering(self, Devices, nwkid, ep, cluster, attribut, value): checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) zlinky_totalisateur(self, nwkid, attribut, value) MajDomoDevice(self, Devices, nwkid, "f2", cluster, str(value), Attribute_=attribut) + zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) store_ZLinky_infos( self, nwkid, 'EASF04', value) store_ZLinky_infos( self, nwkid, 'BBRHPJW', value) - zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) elif attribut == "0108": if value == 0: @@ -226,9 +209,9 @@ def zlinky_cluster_metering(self, Devices, nwkid, ep, cluster, attribut, value): checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) zlinky_totalisateur(self, nwkid, attribut, value) MajDomoDevice(self, Devices, nwkid, "f3", cluster, str(value), Attribute_=attribut) + zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) store_ZLinky_infos( self, nwkid, 'EASF05', value) store_ZLinky_infos( self, nwkid, 'BBRHCJR', value) - zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) elif attribut == "010a": if value == 0: @@ -236,9 +219,9 @@ def zlinky_cluster_metering(self, Devices, nwkid, ep, cluster, attribut, value): checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) zlinky_totalisateur(self, nwkid, attribut, value) MajDomoDevice(self, Devices, nwkid, "f3", cluster, str(value), Attribute_=attribut) + zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) store_ZLinky_infos( self, nwkid, 'EASF06', value) store_ZLinky_infos( self, nwkid, 'BBRHPJR', value) - zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value) elif attribut == "010c": if value == 0: @@ -267,10 +250,9 @@ def zlinky_cluster_metering(self, Devices, nwkid, ep, cluster, attribut, value): elif attribut == "0307": # PRM store_ZLinky_infos( self, nwkid, 'PRM', value) - + elif attribut == "0308": # Serial Number self.log.logging( "ZLinky", "Debug", "Cluster0702 - 0x0308 - Serial Number %s" % (value), nwkid, ) - checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) store_ZLinky_infos( self, nwkid, 'ADC0', value) store_ZLinky_infos( self, nwkid, 'ADSC', value) @@ -286,15 +268,14 @@ def zlinky_cluster_electrical_measurement(self, Devices, nwkid, ep, cluster, att elif attribut == "050e": store_ZLinky_infos( self, nwkid, 'ERQ2', value) - + elif attribut == "090e": store_ZLinky_infos( self, nwkid, 'ERQ3', value) elif attribut == "0a0e": store_ZLinky_infos( self, nwkid, 'ERQ4', value) - + elif attribut == "050b": # Active Power - self.log.logging("Cluster", "Debug", "zlinky_cluster_electrical_measurement %s - %s/%s Power %s" % (cluster, nwkid, ep, value)) checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) MajDomoDevice(self, Devices, nwkid, ep, cluster, str(value)) @@ -302,7 +283,7 @@ def zlinky_cluster_electrical_measurement(self, Devices, nwkid, ep, cluster, att elif attribut == "090b": store_ZLinky_infos( self, nwkid, 'CCASN-1',value) - + elif attribut in ("0505", "0905", "0a05"): # RMS Voltage self.log.logging("Cluster", "Debug", "zlinky_cluster_electrical_measurement %s - %s/%s Voltage %s" % (cluster, nwkid, ep, value)) if value == 0xFFFF: @@ -354,13 +335,13 @@ def zlinky_cluster_electrical_measurement(self, Devices, nwkid, ep, cluster, att if value == 0x8000: return checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) - + _linkyMode = linky_mode( self, nwkid, protocol=True ) - + if _linkyMode in ( 0, 2,) and attribut == "050d": # Historique Tri store_ZLinky_infos( self, nwkid, 'PMAX', value) - + elif _linkyMode in ( 1, 5, ) and attribut == "050d": # Historic Mono store_ZLinky_infos( self, nwkid, 'SMAXN', value) @@ -402,18 +383,17 @@ def zlinky_cluster_electrical_measurement(self, Devices, nwkid, ep, cluster, att self.log.logging( "ZLinky", "Error", "=====> zlinky_cluster_electrical_measurement %s - %s/%s Apparent Power %s out of range !!!" % (cluster, nwkid, ep, value), nwkid, ) return checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) - + self.log.logging( "ZLinky", "Debug", "=====> zlinky_cluster_electrical_measurement %s - %s/%s Apparent Power %s" % (cluster, nwkid, ep, value), nwkid, ) # ApparentPower (Represents the single phase or Phase A, current demand of apparent (Square root of active and reactive power) power, in VA.) self.log.logging( "ZLinky", "Debug", "=====> zlinky_cluster_electrical_measurement %s - %s/%s Apparent Power %s" % (cluster, nwkid, ep, value), nwkid, ) - _linkyMode = linky_mode( self, nwkid, protocol=True ) - + if _linkyMode in ( 0, 2,) and attribut == "050f": # Historique Tri store_ZLinky_infos( self, nwkid, 'PAPP', value) - + elif _linkyMode in ( 1, 5, ) and attribut == "050f": # Historic Mono store_ZLinky_infos( self, nwkid, 'SINSTS', value) @@ -431,7 +411,7 @@ def zlinky_cluster_electrical_measurement(self, Devices, nwkid, ep, cluster, att self.log.logging( "ZLinky", "Error", "=====> zlinky_cluster_electrical_measurement %s - %s/%s Unexpected %s/%s linkyMode: %s" % ( cluster, nwkid, ep, attribut, value, _linkyMode ), nwkid, ) return - + tarif_color = None if "ZLinky" in self.ListOfDevices[nwkid] and "Color" in self.ListOfDevices[nwkid]["ZLinky"]: tarif_color = self.ListOfDevices[nwkid]["ZLinky"]["Color"] @@ -460,7 +440,7 @@ def zlinky_cluster_electrical_measurement(self, Devices, nwkid, ep, cluster, att elif attribut in ( "0a0f", ): store_ZLinky_infos( self, nwkid, 'SINSTS3', value) - + elif attribut in ("0908", "0a08"): # Current Phase 2 and Current Phase 3 # from random import randrange # value = randrange( 0x0, 0x3c) @@ -478,7 +458,7 @@ def zlinky_cluster_electrical_measurement(self, Devices, nwkid, ep, cluster, att self.log.logging("Cluster", "Debug", "zlinky_cluster_electrical_measurement %s - %s/%s %s Current L3 %s" % (cluster, nwkid, ep, attribut, value), nwkid) MajDomoDevice( self, Devices, nwkid, "f3", "0009", zlinky_check_alarm(self, Devices, nwkid, ep, value), Attribute_="0005", ) store_ZLinky_infos( self, nwkid, 'IRMS3', value) - + checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) elif attribut == "0511": @@ -489,7 +469,7 @@ def zlinky_cluster_electrical_measurement(self, Devices, nwkid, ep, cluster, att elif attribut == "0a11": store_ZLinky_infos( self, nwkid, 'UMOY3', value) - + def zlinky_cluster_lixee_private(self, Devices, nwkid, ep, cluster, attribut, value): if nwkid not in self.ListOfDevices: diff --git a/Modules/zlinky.py b/Modules/zlinky.py index 94084cc56..e0751aa94 100644 --- a/Modules/zlinky.py +++ b/Modules/zlinky.py @@ -52,7 +52,6 @@ "ZLinky_TIC-standard-tri": (), "ZLinky_TIC-standard-tri-prod": (), } - ZLinky_TIC_COMMAND = { # Mode Historique "0000": "OPTARIF", @@ -100,28 +99,19 @@ "0300": "PROTOCOL Linky" } - -def convert_kva_to_ampere(kva: float) -> float: - """ - Converts kilovolt-amperes (kVA) to amperes (A) assuming a voltage of 200V. - - Parameters: - kva (float): The value in kilovolt-amperes to convert. - - Returns: - float: The equivalent value in amperes. - """ - VOLTAGE = 200 # Assumed voltage in volts - return (kva * 1000) / VOLTAGE - +def convert_kva_to_ampere( kva ): + return ( kva * 1000) / 200 def zlinky_color_tarif(self, MsgSrcAddr, color): - self.ListOfDevices.setdefault(MsgSrcAddr, {}).setdefault("ZLinky", {})["Color"] = color - + if "ZLinky" not in self.ListOfDevices[MsgSrcAddr]: + self.ListOfDevices[MsgSrcAddr]["ZLinky"] = {} + self.ListOfDevices[MsgSrcAddr]["ZLinky"]["Color"] = color -def store_ZLinky_infos(self, nwkid, command_tic, value): - self.ListOfDevices.setdefault(nwkid, {}).setdefault('ZLinky', {})[command_tic] = value +def store_ZLinky_infos( self, nwkid, command_tic, value): + if 'ZLinky' not in self.ListOfDevices[ nwkid ]: + self.ListOfDevices[ nwkid ][ 'ZLinky' ] = {} + self.ListOfDevices[ nwkid ][ 'ZLinky' ][ command_tic ] = value def get_ISOUSC( self, nwkid ): @@ -155,53 +145,23 @@ def get_ISOUSC( self, nwkid ): return 0 +def get_OPTARIF( self, nwkid): -def get_OPTARIF(self, nwkid): - """ - Retrieves the OPTARIF value for a given network ID (nwkid). - If not found, defaults to "BASE". - - Parameters: - nwkid: The network ID to retrieve the value for. - - Returns: - The OPTARIF value as a string or "BASE" if not found. - """ - return self.ListOfDevices.get(nwkid, {}).get("ZLinky", {}).get("OPTARIF", "BASE") + if ( + "ZLinky" in self.ListOfDevices[nwkid] + and "OPTARIF" in self.ListOfDevices[nwkid]["ZLinky"] + ): + return self.ListOfDevices[nwkid]["ZLinky"]["OPTARIF"] + return "BASE" -def get_instant_power(self, nwkid): - """ - Retrieves the instant power value for a given network ID (nwkid). - If not available, returns 0. - - Parameters: - nwkid: The network ID to retrieve the value for. - - Returns: - The instant power value as a rounded float, or 0 if not found. - """ - try: - value = self.ListOfDevices[nwkid]["Ep"]["01"]["0b04"]["050f"] - return round(float(value), 2) - except (KeyError, TypeError, ValueError): - return 0 +def get_instant_power( self, nwkid ): + return round(float(self.ListOfDevices[nwkid]["Ep"]["01"]["0b04"]["050f"]), 2) if "0b04" in self.ListOfDevices[nwkid]["Ep"]["01"] and "050f" in self.ListOfDevices[nwkid]["Ep"]["01"]["0b04"] else 0 +def get_tarif_color( self, nwkid ): + return self.ListOfDevices[nwkid]["ZLinky"]["Color"] if "ZLinky" in self.ListOfDevices[nwkid] and "Color" in self.ListOfDevices[nwkid]["ZLinky"] else None -def get_tarif_color(self, nwkid): - """ - Retrieves the tarif color for a given network ID (nwkid). - If not found, returns None. - Parameters: - nwkid: The network ID to retrieve the color for. - - Returns: - The color value, or None if not found. - """ - return self.ListOfDevices.get(nwkid, {}).get("ZLinky", {}).get("Color") - - def zlinky_check_alarm(self, Devices, MsgSrcAddr, MsgSrcEp, value): if value == 0: @@ -229,93 +189,55 @@ def zlinky_check_alarm(self, Devices, MsgSrcAddr, MsgSrcEp, value): return "00|Normal" -def linky_mode(self, nwkid, protocol=False): - """ - Retrieves the Linky mode for a given network ID (nwkid). - If not available, returns None. If `protocol` is True, returns the protocol value instead. - - Parameters: - nwkid: The network ID to retrieve the mode for. - protocol: A boolean flag to return the protocol value instead of the mode (default is False). - - Returns: - The Linky mode or protocol, or None if not found. - """ - device = self.ListOfDevices.get(nwkid, {}) - zlinky = device.get('ZLinky', {}) + +def linky_mode( self, nwkid , protocol=False): - # Return the protocol value if requested - if protocol: - return zlinky.get('PROTOCOL Linky') + if 'ZLinky' not in self.ListOfDevices[ nwkid ]: + return None - # Return mode if the protocol is in ZLINKY_MODE - protocol_linky = zlinky.get('PROTOCOL Linky') - if protocol_linky in ZLINKY_MODE: - return ZLINKY_MODE[protocol_linky].get("Mode") + if 'PROTOCOL Linky' not in self.ListOfDevices[ nwkid ]['ZLinky']: + return get_linky_mode_from_ep(self, nwkid ) - # Fallback to getting mode from Ep - return get_linky_mode_from_ep(self, nwkid) if protocol_linky is None else None + if self.ListOfDevices[ nwkid ]['ZLinky']['PROTOCOL Linky'] in ZLINKY_MODE and not protocol: + return ZLINKY_MODE[ self.ListOfDevices[ nwkid ]['ZLinky']['PROTOCOL Linky'] ]["Mode"] + elif protocol: + return self.ListOfDevices[ nwkid ]['ZLinky']['PROTOCOL Linky'] + + return None def get_linky_mode_from_ep(self, nwkid): - """ - Retrieves the Linky mode from the "Ep" section of a given network ID (nwkid). - If the mode is found in ZLINKY_MODE, it returns the mode. Otherwise, returns None. - - Parameters: - nwkid: The network ID to retrieve the mode from. - - Returns: - The Linky mode if found in ZLINKY_MODE, or None if not found. - """ ep = self.ListOfDevices.get(nwkid, {}).get("Ep", {}).get("01", {}).get("ff66", {}).get("0300") - return ZLINKY_MODE.get(ep) if ep in ZLINKY_MODE else None + return ep if ep in ZLINKY_MODE else None def linky_device_conf(self, nwkid): - """ - Retrieves the configuration for a Linky device based on its protocol. - If the protocol is not found, it attempts to get the mode from the Ep section. - - Parameters: - nwkid: The network ID of the device. - - Returns: - The configuration value for the Linky device or "ZLinky_TIC" if not found. - """ device = self.ListOfDevices.get(nwkid, {}) zlinky_info = device.get('ZLinky', {}) protocol_linky = zlinky_info.get('PROTOCOL Linky') - # If protocol_linky is not present, try to retrieve the mode from the Ep section if not protocol_linky: mode = get_linky_mode_from_ep(self, nwkid) if mode: self.log.logging("Cluster", "Status", f"linky_device_conf {nwkid} found 0xff66/0x0300: {mode}") zlinky_info['PROTOCOL Linky'] = mode - return ZLINKY_MODE.get(mode, {}).get("Conf", "ZLinky_TIC") + return ZLINKY_MODE[mode]["Conf"] + else: + return "ZLinky_TIC" - # If the protocol is in ZLINKY_MODE, return its configuration - if protocol_linky in ZLINKY_MODE: - self.log.logging("Cluster", "Debug", f"linky_device_conf {nwkid} found Protocol Linky: {protocol_linky}") - return ZLINKY_MODE[protocol_linky].get("Conf", "ZLinky_TIC") - - return "ZLinky_TIC" + if protocol_linky not in ZLINKY_MODE: + return "ZLinky_TIC" + + self.log.logging("Cluster", "Debug", f"linky_device_conf {nwkid} found Protocol Linky: {protocol_linky}") + return ZLINKY_MODE[protocol_linky]["Conf"] -def linky_upgrade_authorized(current_model, new_model): - """ - Checks if an upgrade from the current model to the new model is authorized. - - Parameters: - current_model: The model currently in use. - new_model: The model to upgrade to. - - Returns: - True if the upgrade is authorized, otherwise False. - """ - return ZLINKY_UPGRADE_PATHS.get(current_model, {}).get(new_model, False) +def linky_upgrade_authorized( current_model, new_model ): + return ( + current_model in ZLINKY_UPGRADE_PATHS + and new_model in ZLINKY_UPGRADE_PATHS[current_model] + ) def update_zlinky_device_model_if_needed( self, nwkid ): @@ -356,7 +278,6 @@ def update_zlinky_device_model_if_needed( self, nwkid ): if "Heartbeat" in self.ListOfDevices[nwkid]: self.ListOfDevices[nwkid]["Heartbeat"] = "-1" - CONTACT_SEC = { 0: "fermé", 1: "ouvert"