Skip to content

Commit

Permalink
fix several issues
Browse files Browse the repository at this point in the history
  • Loading branch information
pipiche38 committed Jan 6, 2025
1 parent 0e2cc88 commit 2100d26
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 188 deletions.
106 changes: 43 additions & 63 deletions DevicesModules/custom_zlinky.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
# Initial authors: zaraki673 & pipiche38
#
# SPDX-License-Identifier: GPL-3.0 license
#

import binascii

Expand All @@ -24,42 +23,26 @@


def zlinky_clusters(self, Devices, nwkid, ep, cluster, attribut, value):
"""
Handles the processing of different ZLinky clusters based on the cluster type.
Parameters:
Devices: The list of devices to process.
nwkid: The network ID of the device.
ep: The endpoint identifier.
cluster: The cluster type (e.g., "0b01", "0702").
attribut: The attribute name being processed.
value: The value associated with the attribute.
"""
self.log.logging(
"ZLinky",
"Debug",
f"zlinky_clusters {cluster} - {nwkid}/{ep} Attribute: {attribut} Value: {value}",
nwkid
)

# Mapping clusters to their corresponding functions
cluster_handlers = {
"0b01": zlinky_meter_identification,
"0702": zlinky_cluster_metering,
"0b04": zlinky_cluster_electrical_measurement,
"ff66": zlinky_cluster_lixee_private
}

# Call the appropriate handler function if the cluster is found in the mapping
handler = cluster_handlers.get(cluster)
if handler:
handler(self, Devices, nwkid, ep, cluster, attribut, value)
self.log.logging( "ZLinky", "Debug", "zlinky_clusters %s - %s/%s Attribute: %s Value: %s" % (
cluster, nwkid, ep, attribut, value), nwkid, )

if cluster == "0b01":
zlinky_meter_identification(self, Devices, nwkid, ep, cluster, attribut, value)

elif cluster == "0702":
zlinky_cluster_metering(self, Devices, nwkid, ep, cluster, attribut, value)

elif cluster == "0b04":
zlinky_cluster_electrical_measurement(self, Devices, nwkid, ep, cluster, attribut, value)

elif cluster == "ff66":
zlinky_cluster_lixee_private(self, Devices, nwkid, ep, cluster, attribut, value)


def zlinky_meter_identification(self, Devices, nwkid, ep, cluster, attribut, value):
self.log.logging( "ZLinky", "Debug", "zlinky_meter_identification %s - %s/%s Attribute: %s Value: %s" % (
cluster, nwkid, ep, attribut, value), nwkid, )

checkAndStoreAttributeValue( self, nwkid, ep, cluster, attribut, value, )
if attribut == "000d":
# Looks like in standard mode PREF is in VA while in historique mode ISOUSC is in A
Expand All @@ -73,14 +56,14 @@ def zlinky_meter_identification(self, Devices, nwkid, ep, cluster, attribut, val
# Mode standard
store_ZLinky_infos( self, nwkid, 'PREF', value)
store_ZLinky_infos( self, nwkid, 'ISOUSC', convert_kva_to_ampere(value) )

elif attribut == "000a":
store_ZLinky_infos( self, nwkid, 'VTIC', value)

elif attribut == "000e":
store_ZLinky_infos( self, nwkid, 'PCOUP', value)


def zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value):
"""
Sets the color based on the counter and tariff information for the given device.
Expand All @@ -98,12 +81,12 @@ def zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribu
# Exit if the tariff is BASE or not one of the supported types
if op_tarifiare == "BASE" or op_tarifiare not in ("TEMPO", "HC.."):
return

# Get previous value to ensure there's a change
previous_value = getAttributeValue(self, nwkid, ep, cluster, attribut)
if value == 0 or previous_value == value:
return

previous_color = get_tarif_color(self, nwkid)

# Set value based on attribute and tariff type
Expand All @@ -119,7 +102,7 @@ def zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribu
new_color = "HP.."
elif op_tarifiare == "TEMPO":
new_color = "BHP"

# If the tariff is not TEMPO, proceed with updating the device
if op_tarifiare != "TEMPO":
if new_color != previous_color:
Expand All @@ -143,8 +126,8 @@ def zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribu
self.log.logging( "ZLinky", "Status", "Updating ZLinky color based on Counter", nwkid)
MajDomoDevice(self, Devices, nwkid, ep, "0009", new_color, Attribute_="0020")
zlinky_color_tarif(self, nwkid, new_color)


def zlinky_cluster_metering(self, Devices, nwkid, ep, cluster, attribut, value):
# Smart Energy Metering

Expand Down Expand Up @@ -180,11 +163,11 @@ def zlinky_cluster_metering(self, Devices, nwkid, ep, cluster, attribut, value):
checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value)
zlinky_totalisateur(self, nwkid, attribut, value)
MajDomoDevice(self, Devices, nwkid, ep, cluster, str(value), Attribute_=attribut)
zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value)
store_ZLinky_infos( self, nwkid, 'EASF01', value)
store_ZLinky_infos( self, nwkid, 'HCHC', value)
store_ZLinky_infos( self, nwkid, 'EJPHN', value)
store_ZLinky_infos( self, nwkid, 'BBRHCJB', value)
zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value)

elif attribut == "0102":
# HP or BBRHPJB
Expand All @@ -194,51 +177,51 @@ def zlinky_cluster_metering(self, Devices, nwkid, ep, cluster, attribut, value):
checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value)
zlinky_totalisateur(self, nwkid, attribut, value)
MajDomoDevice(self, Devices, nwkid, ep, cluster, str(value), Attribute_=attribut)
zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value)
store_ZLinky_infos( self, nwkid, 'EASF02', value)
store_ZLinky_infos( self, nwkid, 'HCHP', value)
store_ZLinky_infos( self, nwkid, 'EJPHPM', value)
store_ZLinky_infos( self, nwkid, 'BBRHCJW', value)
zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value)

elif attribut == "0104":
if value == 0:
return
checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value)
zlinky_totalisateur(self, nwkid, attribut, value)
MajDomoDevice(self, Devices, nwkid, "f2", cluster, str(value), Attribute_=attribut)
zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value)
store_ZLinky_infos( self, nwkid, 'EASF03', value)
store_ZLinky_infos( self, nwkid, 'BBRHCJW', value)
zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value)

elif attribut == "0106":
if value == 0:
return
checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value)
zlinky_totalisateur(self, nwkid, attribut, value)
MajDomoDevice(self, Devices, nwkid, "f2", cluster, str(value), Attribute_=attribut)
zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value)
store_ZLinky_infos( self, nwkid, 'EASF04', value)
store_ZLinky_infos( self, nwkid, 'BBRHPJW', value)
zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value)

elif attribut == "0108":
if value == 0:
return
checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value)
zlinky_totalisateur(self, nwkid, attribut, value)
MajDomoDevice(self, Devices, nwkid, "f3", cluster, str(value), Attribute_=attribut)
zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value)
store_ZLinky_infos( self, nwkid, 'EASF05', value)
store_ZLinky_infos( self, nwkid, 'BBRHCJR', value)
zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value)

elif attribut == "010a":
if value == 0:
return
checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value)
zlinky_totalisateur(self, nwkid, attribut, value)
MajDomoDevice(self, Devices, nwkid, "f3", cluster, str(value), Attribute_=attribut)
zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value)
store_ZLinky_infos( self, nwkid, 'EASF06', value)
store_ZLinky_infos( self, nwkid, 'BBRHPJR', value)
zlinky_set_color_based_on_counter(self, Devices, nwkid, ep, cluster, attribut, value)

elif attribut == "010c":
if value == 0:
Expand Down Expand Up @@ -267,10 +250,9 @@ def zlinky_cluster_metering(self, Devices, nwkid, ep, cluster, attribut, value):

elif attribut == "0307": # PRM
store_ZLinky_infos( self, nwkid, 'PRM', value)

elif attribut == "0308": # Serial Number
self.log.logging( "ZLinky", "Debug", "Cluster0702 - 0x0308 - Serial Number %s" % (value), nwkid, )

checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value)
store_ZLinky_infos( self, nwkid, 'ADC0', value)
store_ZLinky_infos( self, nwkid, 'ADSC', value)
Expand All @@ -286,23 +268,22 @@ def zlinky_cluster_electrical_measurement(self, Devices, nwkid, ep, cluster, att

elif attribut == "050e":
store_ZLinky_infos( self, nwkid, 'ERQ2', value)

elif attribut == "090e":
store_ZLinky_infos( self, nwkid, 'ERQ3', value)

elif attribut == "0a0e":
store_ZLinky_infos( self, nwkid, 'ERQ4', value)

elif attribut == "050b": # Active Power

self.log.logging("Cluster", "Debug", "zlinky_cluster_electrical_measurement %s - %s/%s Power %s" % (cluster, nwkid, ep, value))
checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value)
MajDomoDevice(self, Devices, nwkid, ep, cluster, str(value))
store_ZLinky_infos( self, nwkid, 'CCASN', value)

elif attribut == "090b":
store_ZLinky_infos( self, nwkid, 'CCASN-1',value)

elif attribut in ("0505", "0905", "0a05"): # RMS Voltage
self.log.logging("Cluster", "Debug", "zlinky_cluster_electrical_measurement %s - %s/%s Voltage %s" % (cluster, nwkid, ep, value))
if value == 0xFFFF:
Expand Down Expand Up @@ -354,13 +335,13 @@ def zlinky_cluster_electrical_measurement(self, Devices, nwkid, ep, cluster, att
if value == 0x8000:
return
checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value)

_linkyMode = linky_mode( self, nwkid, protocol=True )

if _linkyMode in ( 0, 2,) and attribut == "050d":
# Historique Tri
store_ZLinky_infos( self, nwkid, 'PMAX', value)

elif _linkyMode in ( 1, 5, ) and attribut == "050d":
# Historic Mono
store_ZLinky_infos( self, nwkid, 'SMAXN', value)
Expand Down Expand Up @@ -402,18 +383,17 @@ def zlinky_cluster_electrical_measurement(self, Devices, nwkid, ep, cluster, att
self.log.logging( "ZLinky", "Error", "=====> zlinky_cluster_electrical_measurement %s - %s/%s Apparent Power %s out of range !!!" % (cluster, nwkid, ep, value), nwkid, )
return
checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value)

self.log.logging( "ZLinky", "Debug", "=====> zlinky_cluster_electrical_measurement %s - %s/%s Apparent Power %s" % (cluster, nwkid, ep, value), nwkid, )
# ApparentPower (Represents the single phase or Phase A, current demand of apparent (Square root of active and reactive power) power, in VA.)

self.log.logging( "ZLinky", "Debug", "=====> zlinky_cluster_electrical_measurement %s - %s/%s Apparent Power %s" % (cluster, nwkid, ep, value), nwkid, )

_linkyMode = linky_mode( self, nwkid, protocol=True )

if _linkyMode in ( 0, 2,) and attribut == "050f":
# Historique Tri
store_ZLinky_infos( self, nwkid, 'PAPP', value)

elif _linkyMode in ( 1, 5, ) and attribut == "050f":
# Historic Mono
store_ZLinky_infos( self, nwkid, 'SINSTS', value)
Expand All @@ -431,7 +411,7 @@ def zlinky_cluster_electrical_measurement(self, Devices, nwkid, ep, cluster, att
self.log.logging( "ZLinky", "Error", "=====> zlinky_cluster_electrical_measurement %s - %s/%s Unexpected %s/%s linkyMode: %s" % (
cluster, nwkid, ep, attribut, value, _linkyMode ), nwkid, )
return

tarif_color = None
if "ZLinky" in self.ListOfDevices[nwkid] and "Color" in self.ListOfDevices[nwkid]["ZLinky"]:
tarif_color = self.ListOfDevices[nwkid]["ZLinky"]["Color"]
Expand Down Expand Up @@ -460,7 +440,7 @@ def zlinky_cluster_electrical_measurement(self, Devices, nwkid, ep, cluster, att

elif attribut in ( "0a0f", ):
store_ZLinky_infos( self, nwkid, 'SINSTS3', value)

elif attribut in ("0908", "0a08"): # Current Phase 2 and Current Phase 3
# from random import randrange
# value = randrange( 0x0, 0x3c)
Expand All @@ -478,7 +458,7 @@ def zlinky_cluster_electrical_measurement(self, Devices, nwkid, ep, cluster, att
self.log.logging("Cluster", "Debug", "zlinky_cluster_electrical_measurement %s - %s/%s %s Current L3 %s" % (cluster, nwkid, ep, attribut, value), nwkid)
MajDomoDevice( self, Devices, nwkid, "f3", "0009", zlinky_check_alarm(self, Devices, nwkid, ep, value), Attribute_="0005", )
store_ZLinky_infos( self, nwkid, 'IRMS3', value)

checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value)

elif attribut == "0511":
Expand All @@ -489,7 +469,7 @@ def zlinky_cluster_electrical_measurement(self, Devices, nwkid, ep, cluster, att

elif attribut == "0a11":
store_ZLinky_infos( self, nwkid, 'UMOY3', value)


def zlinky_cluster_lixee_private(self, Devices, nwkid, ep, cluster, attribut, value):
if nwkid not in self.ListOfDevices:
Expand Down
Loading

0 comments on commit 2100d26

Please sign in to comment.