Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

zigpyThread resilience #1783

Merged
merged 1 commit into from
Nov 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions Classes/ZigpyTransport/Transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,12 @@
import zigpy.types as t

from Classes.ZigateTransport.sqnMgmt import sqn_init_stack
from Classes.ZigpyTransport.forwarderThread import (forwarder_thread,
start_forwarder_thread,
from Classes.ZigpyTransport.forwarderThread import (start_forwarder_thread,
stop_forwarder_thread)
from Classes.ZigpyTransport.instrumentation import (
instrument_log_command_open, instrument_sendData, open_capture_rx_frames)
from Classes.ZigpyTransport.zigpyThread import (start_zigpy_thread,
stop_zigpy_thread,
zigpy_thread)
stop_zigpy_thread)


class ZigpyTransport(object):
Expand Down
128 changes: 98 additions & 30 deletions Classes/ZigpyTransport/zigpyThread.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,17 @@

import asyncio
import asyncio.events
import binascii
import contextlib
import json
import queue
import random
import sys
import time
import traceback
from pathlib import Path
from threading import Thread
from typing import Any, Optional

import serial
import serial_asyncio as pyserial_asyncio
import zigpy.config
import zigpy.device
import zigpy.exceptions
Expand Down Expand Up @@ -73,36 +71,70 @@ def stop_zigpy_thread(self):


def start_zigpy_thread(self):
    """Start the Zigpy communication thread unless one is already alive.

    Logs a debug message, applies the selector event loop policy on Windows
    with Python 3.8.x, then delegates thread creation to
    ``setup_zigpy_thread``. When ``self.zigpy_thread`` already exists and is
    still alive, only a warning is logged so that a second thread is never
    started on top of the first one.
    """
    self.log.logging("TransportZigpy", "Debug", "start_zigpy_thread - Starting Zigpy thread")

    # Set appropriate event loop policy for Windows compatibility
    if sys.platform == "win32" and (3, 8, 0) <= sys.version_info < (3, 9, 0):
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

    # The attribute may be missing (first start), None, or a finished thread;
    # in all three cases a fresh thread has to be created.
    existing_thread = getattr(self, "zigpy_thread", None)
    if existing_thread and existing_thread.is_alive():
        self.log.logging("TransportZigpy", "Warning", "start_zigpy_thread - Zigpy thread is already running.")
        return

    setup_zigpy_thread(self)


def setup_zigpy_thread(self):
    """Create the Zigpy thread, store it on ``self.zigpy_thread`` and start it.

    The thread is named ``ZigpyCom_<hardwareid>``, runs
    ``zigpy_thread_function(self)`` and is flagged as a daemon thread.
    """
    self.log.logging("TransportZigpy", "Debug", "setup_zigpy_thread - Initializing Zigpy thread")

    # Create and start a new thread; the daemon flag is set at construction
    # time so it is in place before start() is called.
    self.zigpy_thread = Thread(
        name=f"ZigpyCom_{self.hardwareid}",
        target=zigpy_thread_function,
        args=(self,),
        daemon=True,
    )
    self.zigpy_thread.start()

    self.log.logging("TransportZigpy", "Debug", "setup_zigpy_thread - Zigpy thread started")

def zigpy_thread_function(self):
    """Initialize and run the Zigpy event loop for Zigbee communication.

    Thread entry point: sleeps for a random 0.5-3.5 s, creates a new asyncio
    event loop, installs it as the current loop of this thread and runs
    ``start_zigpy_task`` on it until that coroutine completes. Any error is
    logged rather than propagated, and the loop is always closed on exit.
    """
    self.log.logging("TransportZigpy", "Log", "zigpyThread starting with a random sleep")

    # Adding a random delay to stagger thread start times
    time.sleep(random.uniform(0.5, 3.5))

    # Create a new event loop for this thread
    zigpy_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(zigpy_loop)

    # NOTE(review): the previous implementation exposed the loop as
    # self.zigpy_loop. It is still published here in case code outside this
    # function reads that attribute - confirm against the callers.
    self.zigpy_loop = zigpy_loop

    # Enable debug mode if specified in configuration
    if self.pluginconf.pluginConf.get("EventLoopInstrumentation", False):
        zigpy_loop.set_debug(True)

    self.log.logging("TransportZigpy", "Log", f"zigpyThread EventLoop: {zigpy_loop}")

    try:
        # Run the Zigpy task until it completes
        zigpy_loop.run_until_complete(start_zigpy_task(self, channel=0, extended_pan_id=0))

    except asyncio.CancelledError:
        # Handle cancellation gracefully if the loop is stopped externally
        self.log.logging("TransportZigpy", "Warning", "zigpy_thread was cancelled.")

    except RuntimeError as e:
        # Handle cases like "Cannot run the event loop while another loop is running"
        self.log.logging("TransportZigpy", "Error", f"zigpy_thread encountered a runtime error: {e}")

    except Exception as e:
        # Thread boundary: log any other error instead of letting the thread die silently
        self.log.logging("TransportZigpy", "Error", f"zigpy_thread error when starting: {e}")

    finally:
        # Ensure the event loop is closed
        if not zigpy_loop.is_closed():
            zigpy_loop.close()
            self.log.logging("TransportZigpy", "Log", "Event loop closed successfully in zigpy_thread.")
        else:
            self.log.logging("TransportZigpy", "Log", "Event loop was already closed in zigpy_thread.")


async def start_zigpy_task(self, channel, extended_pan_id):
Expand All @@ -120,12 +152,29 @@ async def start_zigpy_task(self, channel, extended_pan_id):

self.log.logging( "TransportZigpy", "Debug", f"start_zigpy_task -extendedPANID {self.pluginconf.pluginConf['extendedPANID']} {extended_pan_id}", )

await radio_start(self, self.statistics, self.pluginconf, self.use_of_zigpy_persistent_db, self._radiomodule, self._serialPort, set_channel=channel, set_extendedPanId=extended_pan_id),
try:
await radio_start(self, self.statistics, self.pluginconf, self.use_of_zigpy_persistent_db, self._radiomodule, self._serialPort, set_channel=channel, set_extendedPanId=extended_pan_id),

except Exception as e:
self.log.logging("TransportZigpy", "Error", f"start_zigpy_task error in radio_start: {e}")

# Run forever
self.writer_queue = queue.Queue() # We MUST use queue and not asyncio.Queue, because it is not compatible with the Domoticz framework

await worker_loop(self)
try:
await worker_loop(self)

except asyncio.CancelledError:
# Handle cancellation gracefully if the loop is stopped externally
self.log.logging("TransportZigpy", "Warning", "start_zigpy_task worker_loop(self) was cancelled.")

except RuntimeError as e:
# Handle cases like "Cannot run the event loop while another loop is running"
self.log.logging("TransportZigpy", "Error", f"start_zigpy_task worker_loop(self) encountered a runtime error: {e}")

except Exception as e:
# Log any errors encountered during execution
self.log.logging("TransportZigpy", "Error", f"start_zigpy_task worker_loop(self) error: {e}")

# We exit the worker_loop, shutdown time
await self.app.shutdown()
Expand Down Expand Up @@ -157,6 +206,7 @@ async def radio_start(self, statistics, pluginconf, use_of_zigpy_persistent_db,
try:
if radiomodule == "ezsp":
import bellows.config as radio_specific_conf

from Classes.ZigpyTransport.AppBellows import App_bellows as App

config = ezsp_configuration_setup(self, radio_specific_conf, serialPort)
Expand Down Expand Up @@ -369,19 +419,37 @@ def display_network_infos(self):
async def worker_loop(self):
    """Consume commands until a ``"STOP"`` command arrives or the loop is stopped.

    While ``self.zigpy_running`` is true, the next command is awaited from
    ``get_next_command``. ``None`` is skipped, ``"STOP"`` clears
    ``self.zigpy_running`` and ends the loop, and anything else is handed to
    ``process_incoming_command``. Cancellation ends the loop; any other
    exception is logged and the loop carries on with the next command.
    """
    self.log.logging("TransportZigpy", "Debug", "worker_loop - ZigyTransport: worker_loop start.")

    try:
        while self.zigpy_running:
            try:
                # Fetch the next command to process
                command_to_send = await get_next_command(self)

                # Nothing was retrieved this round: go back and ask again
                if command_to_send is None:
                    continue

                # Handle the stop command and exit the loop gracefully
                if command_to_send == "STOP":
                    self.log.logging("TransportZigpy", "Debug", "worker_loop - Shutting down ... exit.")
                    self.zigpy_running = False
                    break

                # Process the received command
                await process_incoming_command(self, command_to_send)

            except asyncio.CancelledError:
                # Gracefully handle cancellation
                self.log.logging("TransportZigpy", "Debug", "worker_loop - Task was cancelled.")
                break

            except Exception as e:
                # Log unexpected exceptions but continue processing other commands
                self.log.logging("TransportZigpy", "Error", f"Unexpected error in worker_loop: {e}")

    finally:
        # Always trace the exit, whichever way the loop ended
        self.log.logging("TransportZigpy", "Debug", "worker_loop - Exiting loop and cleaning up resources.")


async def process_incoming_command(self, command_to_send):
Expand Down
Loading