Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
pipiche38 committed Nov 25, 2023
1 parent 1f85b79 commit 6852052
Showing 1 changed file with 93 additions and 19 deletions.
112 changes: 93 additions & 19 deletions Classes/OTA.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@
from os.path import exists, isfile, join
from pathlib import Path

from Classes.AdminWidgets import AdminWidgets
from Classes.LoggingManagement import LoggingManagement
from Modules.sendZigateCommand import sendZigateCmd
from Modules.tools import get_device_nickname
from Modules.zigateConsts import ADDRESS_MODE, ZIGATE_EP
Expand Down Expand Up @@ -71,6 +69,59 @@


class OTAManagement(object):
"""
OTAManagement class for managing Over-The-Air (OTA) firmware updates.
Args:
zigbee_communitation: The Zigbee communication object.
PluginConf: The plugin configuration object.
DeviceConf: The device configuration object.
adminWidgets: The admin widgets object.
ZigateComm: The Zigate communication object.
HomeDirectory: The home directory path.
hardwareID: The hardware ID.
Devices: The list of Domoticz devices.
ListOfDevices: The global list of devices.
IEEE2NWK: The list of IEEE to NWKID mappings.
log: The logging object.
PluginHealth: The plugin health object.
readZclClusters: The ZCL clusters reader object.
Attributes:
zigbee_communication: The Zigbee communication object.
HB: The heartbeat value.
ListOfDevices: The global list of devices.
IEEE2NWK: The list of IEEE to NWKID mappings.
Devices: The list of Domoticz devices.
DeviceConf: The device configuration object.
adminWidgets: The admin widgets object.
ControllerLink: The Zigate communication object.
pluginconf: The plugin configuration object.
homeDirectory: The home directory path.
log: The logging object.
PluginHealth: The plugin health object.
readZclClusters: The ZCL clusters reader object.
ListOfImages: The list of available firmware loaded at plugin startup.
ImageLoaded: The dictionary containing information about the loaded firmware image.
ListInUpdate: The dictionary containing information about the firmware update in progress.
AuthorizedForDowngrade: The dictionary containing information about devices authorized for downgrade.
zigbee_ota_index: The Zigbee OTA index.
zigbee_ota_found_in_index: The list of Zigbee OTA firmware found in the index.
once: Flag indicating if the OTA process has started.
Methods:
_reset_ota_state: Reset the OTA update state.
cancel_current_firmware_update: Cancel the current firmware update.
ota_image_block_request: Handle the OTA image block request.
ota_image_page_request: Handle the OTA image page request.
ota_upgrade_end_request: Handle the OTA upgrade end request.
heartbeat: Perform the OTA heartbeat.
restapi_list_of_firmware: Get the list of available firmware.
restapi_firmware_update: Perform the firmware update.
query_next_image_request: Handle the OTA query next image request.
"""


def __init__(
self,
zigbee_communitation,
Expand All @@ -86,7 +137,9 @@ def __init__(
log,
PluginHealth,
readZclClusters
):
):

# Pointers to external objects
self.zigbee_communication = zigbee_communitation
self.HB = 0
self.ListOfDevices = ListOfDevices # Point to the Global ListOfDevices
Expand All @@ -101,9 +154,10 @@ def __init__(
self.PluginHealth = PluginHealth
self.readZclClusters = readZclClusters

# Properties for firmware/image management
self.ListOfImages = {} # List of available firmware loaded at plugin startup

self.ImageLoaded = { # Indicates information of the current Firmware/Image loaded on ZiGate
self.ImageLoaded = {
"ImageVersion": None,
"image_type": None,
"manufacturer_code": None,
Expand All @@ -126,14 +180,18 @@ def __init__(
"AuthorizedForUpdate": [],
"Retry": 0,
}

self.AuthorizedForDowngrade = {}
self.zigbee_ota_index = None
self.zigbee_ota_found_in_index = []
self.once = True

# Load Zigbee OTA index and scan the folder
loading_zigbee_ota_index( self )
logging(self, "Debug", "zigbee_ota_index: %s" %self.zigbee_ota_index)
ota_scan_folder(self)


def cancel_current_firmware_update(self):
self.ListInUpdate["NwkId"] = None
self.ListInUpdate["Status"] = None
Expand All @@ -143,10 +201,10 @@ def cancel_current_firmware_update(self):
self.ImageLoaded["LoadedTimeStamp"] = 0
self.ListInUpdate["Process"] = None


def ota_image_block_request(self, MsgData): # OK 13/10
# ota_image_block_request(self, Devices, MsgData, MsgLQI): # OTA image block request
# BLOCK_REQUEST 0x8501 ZiGate will receive this command when device asks OTA firmware
logging(self, "Debug", "ota_image_block_request - Request Firmware Block (%s) %s" % (len(MsgData), MsgData))

if len(MsgData) not in ( 60 , 62):
logging(self, "Debug", "ota_image_block_request - Incorrect lenght (%s) %s" % (len(MsgData), MsgData))
Expand Down Expand Up @@ -181,7 +239,6 @@ def ota_image_block_request(self, MsgData): # OK 13/10
prepare_and_send_block(self, MsgSrcAddr, MsgEP, MsgFileOffset, intMsgImageVersion, intMsgImageType, intMsgManufCode, MsgBlockRequestDelay, MsgMaxDataSize, intMsgFieldControl, MsgSQN, )



def ota_image_page_request( self, MsgData ):
MsgSQN = MsgData[:2]
MsgEP = MsgData[2:4]
Expand Down Expand Up @@ -338,7 +395,6 @@ def heartbeat(self):
_handle_timeout(self)



def restapi_list_of_firmware(self):
brand = {}
for x in self.ListOfImages["Brands"]:
Expand All @@ -357,6 +413,7 @@ def restapi_list_of_firmware(self):
brand[x].append(image)
return [brand]


def restapi_firmware_update(self, data): #

if len(data) > 1:
Expand All @@ -372,6 +429,7 @@ def restapi_firmware_update(self, data): #
if force_update:
self.AuthorizedForDowngrade[ target_nwkid ] = True


def query_next_image_request(self, srcnwkid, srcep, Sqn, Data):
# This is a Client -> Server (direction set to 0x00)
# The server takes the client’s information in the command and determines whether it has a suitable image for the particular client.
Expand Down Expand Up @@ -429,12 +487,13 @@ def query_next_image_request(self, srcnwkid, srcep, Sqn, Data):
zcl_raw_ota_query_next_image_response(self, Sqn, srcnwkid, ZIGATE_EP, srcep, '98')


# Routines sending Data
# Local Routines and other helpers
def _handle_ota_timeout(self):
logging(self, "Error", "Ota timed out on NwkId: %s for block: %s" % (
self.ListInUpdate["NwkId"], self.ListInUpdate["intFileOffset"]))
_reset_ota_state(self)


def _retry_notification(self):
self.ListInUpdate["Retry"] += 1
logging(self, "Log", "Ota retries notifying device %s" % self.ListInUpdate["NwkId"])
Expand All @@ -444,10 +503,12 @@ def _retry_notification(self):
self.ImageLoaded["image_type"],
self.ImageLoaded["manufacturer_code"])


def _handle_timeout(self):
logging(self, "Error", "Ota detects Timeout while notifying device %s" % self.ListInUpdate["NwkId"])
_reset_ota_state(self)


def _reset_ota_state(self):
if self.ListInUpdate["NwkId"] in self.ListInUpdate["AuthorizedForUpdate"]:
self.ListInUpdate["AuthorizedForUpdate"].remove(self.ListInUpdate["NwkId"])
Expand All @@ -459,6 +520,7 @@ def _reset_ota_state(self):
self.ImageLoaded["NotifiedTimeStamp"] = 0
self.ListInUpdate["Process"] = None


def ota_load_image_to_zigate(self, image_type, force_version=None):
# Load the image headers into Zigate

Expand All @@ -485,12 +547,15 @@ def ota_load_image_to_zigate(self, image_type, force_version=None):

_update_image_loaded_info(self, decoded_header, force_version)


def _log_debug_unknown_image_type(self, image_type):
logging(self, "Debug", f"ota_load_image_to_zigate - Unknown Image {image_type} in {list(self.ListOfImages['ImageType'].keys())}")


def _log_debug_image_not_found(self, image_type, brand):
logging(self, "Debug", f"ota_load_image_to_zigate - Image {image_type} not found in {list(self.ListOfImages['Brands'][brand].keys())}")


def _format_image_data(self, decoded_header, force_version):
return (
f"{ADDRESS_MODE['short']:02x}0000"
Expand All @@ -500,9 +565,11 @@ def _format_image_data(self, decoded_header, force_version):
f"{decoded_header['security_cred_version']} {decoded_header['upgrade_file_dest']} {decoded_header['min_hw_version']} {decoded_header['max_hw_version']}"
)


def _is_controller_in_raw_mode(self):
return "ControllerInRawMode" in self.pluginconf.pluginConf and self.pluginconf.pluginConf["ControllerInRawMode"]


def _update_image_loaded_info(self, decoded_header, force_version):
self.ImageLoaded["ImageVersion"] = force_version or decoded_header['image_version']
self.ImageLoaded["image_type"] = decoded_header['image_type']
Expand All @@ -518,21 +585,23 @@ def build_ota_data_block(self, block_request, max_data_size):

return sequence, offset, length, raw_ota_data


def build_ota_message(self, dest_addr, dest_ep, sequence, status, offset, image_version, image_type, manufacturer_code, length, raw_ota_data):
data = "02" + dest_addr + ZIGATE_EP + dest_ep
data += f"{sequence:02x}{status:02x}{offset:08x}{image_version}{image_type}{manufacturer_code}{length:02x}"
data += "".join(f"{i:02x}" for i in raw_ota_data)

return data


def update_list_in_update(self, offset, length):
self.ListInUpdate["TimeStamps"] = time.time()
self.ListInUpdate["Status"] = "Transfer Progress"
self.ListInUpdate["Received"] = offset
self.ListInUpdate["Sent"] = offset + length


def ota_send_block(self, dest_addr, dest_ep, image_type, msg_image_version, block_request, disable_ack=False):
logging(self, "Debug", f"ota_send_block - Addr: {dest_addr}/{dest_ep} Type: 0x{image_type}")

if image_type not in self.ListOfImages["ImageType"]:
logging(self, "Error", f"ota_send_block - unknown image_type {image_type}")
Expand Down Expand Up @@ -565,6 +634,7 @@ def ota_send_block(self, dest_addr, dest_ep, image_type, msg_image_version, bloc

self.ControllerLink.sendData("0502", data, ackIsDisabled=False, NwkId=dest_addr)


def ota_image_advertize(self, dest_addr, dest_ep, image_version, image_type=0xFFFF, manufacturer_code=0xFFFF):
# 'IMAGE_NOTIFY 0x0505 Notify desired device that ota is available. After loading headers use this.'
# The 'query jitter' mechanism can be used to prevent a flood of replies to an Image Notify broadcast
Expand Down Expand Up @@ -696,8 +766,6 @@ def ota_management(self, MsgSrcAddr, MsgEP, delay=500):
self.ControllerLink.sendData("0506", datas, ackIsDisabled=False, NwkId=MsgSrcAddr)


################
# Local routines
def cleanup_after_completed_upgrade(self, NwkId, Status):
# Cleanup
logging(self, "Debug", "cleanup_after_completed_upgrade - Cleanup and house keeping %s %s" % (NwkId, Status))
Expand Down Expand Up @@ -948,6 +1016,7 @@ def ota_extract_image_headers(self, subfolder, image): # OK 13/10

return headers["image_type"], headers, ota_image


def _open_image_file(self, filename): # OK 13/10
try:
with open(filename, "rb") as file:
Expand All @@ -960,6 +1029,7 @@ def _open_image_file(self, filename): # OK 13/10
return None
return ota_image


def offset_start_firmware(self, ota_image): # OK 13/10
# Search for the OTA Upgrade File Identifier ( “0x0BEEF11E” )
offset = None
Expand All @@ -972,6 +1042,7 @@ def offset_start_firmware(self, ota_image): # OK 13/10
None,
)


def unpack_headers(self, ota_image): # OK 13/10
try:
header_data = list(struct.unpack("<LHHHHHLH32BLBQHH", ota_image[:69]))
Expand Down Expand Up @@ -1003,6 +1074,7 @@ def unpack_headers(self, ota_image): # OK 13/10

return dict(zip(header_headers, header_data_compact))


def prepare_and_send_block(self, MsgSrcAddr, MsgEP, MsgFileOffset, intMsgImageVersion, intMsgImageType, intMsgManufCode, MsgBlockRequestDelay, MsgMaxDataSize, intMsgFieldControl, MsgSQN, disableACK=False):
self.ListInUpdate["Retry"] = 0

Expand All @@ -1013,15 +1085,15 @@ def prepare_and_send_block(self, MsgSrcAddr, MsgEP, MsgFileOffset, intMsgImageVe

if intMsgImageType not in self.ListOfImages["ImageType"]:
# Image Type unknown or not loaded
logging( self, "Error", "ota_image_block_request %s/%s - 0x%04x image not found" % (MsgSrcAddr, MsgEP, intMsgImageType), )
logging( self, "Error", "prepare_and_send_block %s/%s - 0x%04x image not found" % (MsgSrcAddr, MsgEP, intMsgImageType), )
return

if self.ListInUpdate["NwkId"] and intMsgImageType != self.ListInUpdate["intImageType"] and MsgSrcAddr != self.ListInUpdate["NwkId"]:
# Request which do not belongs to the current upgrade
logging( self, "Error", "ota_image_block_request %s/%s - request update while an other is in progress %s " % (MsgSrcAddr, MsgEP, self.ListInUpdate["NwkId"]), )
logging( self, "Error", "prepare_and_send_block %s/%s - request update while an other is in progress %s " % (MsgSrcAddr, MsgEP, self.ListInUpdate["NwkId"]), )
return

logging( self, "Debug", "ota_image_block_request - [%3s] OTA image Block request - %s/%s Offset: %s version: 0x%08X Type: 0%04X Code: 0x%04X Delay: %s MaxSize: %s Control: 0x%02X" % (
logging( self, "Debug", "prepare_and_send_block - [%3s] request - %s/%s Offset: %s version: 0x%08X Type: 0%04X Code: 0x%04X Delay: %s MaxSize: %s Control: 0x%02X" % (
int(MsgSQN, 16), MsgSrcAddr, MsgEP, int(MsgFileOffset, 16), intMsgImageVersion, intMsgImageType, intMsgManufCode, MsgBlockRequestDelay, MsgMaxDataSize, intMsgFieldControl, ),)

if self.ListInUpdate["Process"] is None:
Expand All @@ -1030,18 +1102,15 @@ def prepare_and_send_block(self, MsgSrcAddr, MsgEP, MsgFileOffset, intMsgImageVe
else:
self.ListInUpdate["Process"] = "OnGoing"

display_percentage_progress(self, MsgSrcAddr, MsgEP, intMsgImageType, MsgFileOffset)

self.ListInUpdate["Status"] = "Block requested"
self.ListInUpdate["intFileOffset"] = int(MsgFileOffset, 16)
self.ListInUpdate["LastBlockSent"] = time.time()

# self. ota_management( MsgSrcAddr, MsgEP )

logging( self, "Debug", "ota_image_block_request - Block Request for %s/%s Image Type: 0x%04X Image Version: %08X Seq: %s Offset: %s Size: %s FieldCtrl: 0x%02X" % (
logging( self, "Debug", "prepare_and_send_block - Block Request for %s/%s Image Type: 0x%04X Image Version: %08X Seq: %s Offset: %s Size: %s FieldCtrl: 0x%02X" % (
MsgSrcAddr, block_request["ReqEp"], block_request["ImageType"], block_request["ImageVersion"], MsgSQN, block_request["Offset"], block_request["MaxDataSize"], block_request["FieldControl"], ),)

ota_send_block(self, MsgSrcAddr, MsgEP, intMsgImageType, intMsgImageVersion, block_request, disable_ack=disableACK)
display_percentage_progress(self, MsgSrcAddr, MsgEP, intMsgImageType, MsgFileOffset)


def initialize_block_request(self, MsgSrcAddr, MsgEP, MsgFileOffset, intMsgImageVersion, intMsgImageType, intMsgManufCode, MsgBlockRequestDelay, MsgMaxDataSize, intMsgFieldControl, MsgSQN):
Expand Down Expand Up @@ -1190,11 +1259,13 @@ def notify_upgrade_end(

self.adminWidgets.updateNotificationWidget(self.Devices, _textmsg)


def convert_time(seconds):
hours, remainder = divmod(seconds, 3600)
minutes, seconds = divmod(remainder, 60)
return hours, minutes, seconds


def _logging_headers(self, headers): # OK 13/10

if not self.pluginconf.pluginConf.get("debugOTA", False):
Expand Down Expand Up @@ -1243,8 +1314,10 @@ def display_percentage_progress(self, MsgSrcAddr, MsgEP, intMsgImageType, MsgFil
_completion = round((int(MsgFileOffset, 16) / _size) * 100, 1)

if _completion % 5 == 0:
logging(self, "Status", f"Firmware transfer for {MsgSrcAddr}/{MsgEP} - Progress: {_completion:4.1f} %")
update_firmware_health(self, MsgSrcAddr, _completion)


def update_firmware_health(self, MsgSrcAddr, completion):
firmware_update_health = self.PluginHealth.setdefault("Firmware Update", {})

Expand Down Expand Up @@ -1304,6 +1377,7 @@ def start_upgrade_infos(self, MsgSrcAddr, intMsgImageType, intMsgManufCode, MsgF
)
self.adminWidgets.updateNotificationWidget(self.Devices, _textmsg)


def loading_zigbee_ota_index( self ):

self.zigbee_ota_index = []
Expand Down

0 comments on commit 6852052

Please sign in to comment.