From ebd7e47481f0e977931ca37b693e38c617157ef9 Mon Sep 17 00:00:00 2001 From: Eran Kutner <5628151+ekutner@users.noreply.github.com> Date: Thu, 8 Feb 2024 22:47:49 +0200 Subject: [PATCH] Refactor health monitoring to allow multiple instances --- home_connect_async/__init__.py | 2 +- home_connect_async/api.py | 9 ++-- home_connect_async/common.py | 77 +++++++++++++++---------------- home_connect_async/homeconnect.py | 29 +++++++----- setup.py | 2 +- 5 files changed, 61 insertions(+), 58 deletions(-) diff --git a/home_connect_async/__init__.py b/home_connect_async/__init__.py index 0739cc1..a3624bf 100644 --- a/home_connect_async/__init__.py +++ b/home_connect_async/__init__.py @@ -1,6 +1,6 @@ from .homeconnect import HomeConnect from .appliance import Appliance from .auth import AuthManager, AbstractAuth -from .common import GlobalStatus, HomeConnectError, ConditionalLogger +from .common import HealthStatus, HomeConnectError, ConditionalLogger from .const import Events diff --git a/home_connect_async/api.py b/home_connect_async/api.py index a92af36..a7a718c 100644 --- a/home_connect_async/api.py +++ b/home_connect_async/api.py @@ -6,7 +6,7 @@ from aiohttp import ClientResponse from .auth import AbstractAuth -from .common import ConditionalLogger, HomeConnectError, GlobalStatus +from .common import ConditionalLogger, HomeConnectError, HealthStatus _LOGGER = logging.getLogger(__name__) @@ -43,9 +43,10 @@ def error_description(self) -> str | None: return None - def __init__(self, auth:AbstractAuth, lang:str=None): + def __init__(self, auth:AbstractAuth, lang:str, health:HealthStatus): self._auth = auth self._lang = lang + self._health = health self._call_counter = 0 async def _async_request(self, method:str, endpoint:str, data=None) -> ApiResponse: @@ -84,9 +85,9 @@ async def _async_request(self, method:str, endpoint:str, data=None) -> ApiRespon if response.status == 429: # Too Many Requests wait_time = response.headers.get('Retry-After') _LOGGER.debug('HTTP Error 429 - Too Many Requests. Sleeping for %s seconds and will retry', wait_time) - GlobalStatus.set_status(GlobalStatus.Status.BLOCKED, int(wait_time)) + self._health.set_status(self._health.Status.BLOCKED, int(wait_time)) await asyncio.sleep(int(wait_time)+1) - GlobalStatus.unset_status(GlobalStatus.Status.BLOCKED) + self._health.unset_status(self._health.Status.BLOCKED) elif method in ["PUT", "DELETE"] and response.status == 204: result = self.ApiResponse(response, None) return result diff --git a/home_connect_async/common.py b/home_connect_async/common.py index 3abff3a..7903be7 100644 --- a/home_connect_async/common.py +++ b/home_connect_async/common.py @@ -17,22 +17,22 @@ class LogMode(IntFlag): _log_flags:LogMode = None @classmethod - def mode(cls, log_flags:LogMode=None) -> LogMode: + def mode(self, log_flags:LogMode=None) -> LogMode: """ Gets or Sets the log flags for conditional logging """ if log_flags: - cls._log_flags = log_flags - return cls._log_flags + self._log_flags = log_flags + return self._log_flags @classmethod - def ismode(cls, logmode:LogMode) -> bool: + def ismode(self, logmode:LogMode) -> bool: """ Check if the specified logmode is enabled """ - return cls._log_flags & logmode + return self._log_flags & logmode @classmethod - def debug(cls, logger:Logger, logmode:LogMode, *args, **kwargs ) -> None: + def debug(self, logger:Logger, logmode:LogMode, *args, **kwargs ) -> None: """ Conditional debug log """ - if cls._log_flags & logmode: + if self._log_flags & logmode: logger.debug(*args, **kwargs) @@ -59,8 +59,8 @@ class Synchronization(): selected_program_lock = asyncio.Lock() -class GlobalStatus: - """ Store a global status for the library """ +class HealthStatus: + """ Store the Home Connect connection health status """ class Status(IntFlag): """ Enum for the current status of the Home Connect data loading process """ INIT = 0 @@ -72,50 +72,45 @@ class Status(IntFlag): LOADING_FAILED = 8 BLOCKED = 16 - _status:Status = Status.INIT - _blocked_until:datetime = None + def __init__(self) -> None: + self._status:self.Status = self.Status.INIT + self._blocked_until:datetime = None - @classmethod - def set_status(cls, status:Status, delay:int=None) -> None: + def set_status(self, status:Status, delay:int=None) -> None: """ Set the status """ - cls._status |= status + self._status |= status if delay: - cls._blocked_until = datetime.now() + timedelta(seconds=delay) + self._blocked_until = datetime.now() + timedelta(seconds=delay) - @classmethod - def unset_status(cls, status:Status) -> None: + def unset_status(self, status:Status) -> None: """ Set the status """ - cls._status &= ~status - if status == cls.Status.BLOCKED: - cls._blocked_until = None + self._status &= ~status + if status == self.Status.BLOCKED: + self._blocked_until = None - @classmethod - def get_status(cls) -> Status: + def get_status(self) -> Status: """ Get the status """ - if cls._status & cls.Status.BLOCKED: - return cls.Status.BLOCKED - elif cls._status & cls.Status.LOADING_FAILED: - return cls.Status.LOADING_FAILED - return cls._status + if self._status & self.Status.BLOCKED: + return self.Status.BLOCKED + elif self._status & self.Status.LOADING_FAILED: + return self.Status.LOADING_FAILED + return self._status - @classmethod - def get_status_str(cls) -> str: + def get_status_str(self) -> str: """ Return the status as a formatted string""" - if cls._blocked_until: - return f"Blocked for {cls.get_block_time_str()}" - elif cls._status & cls.Status.LOADING_FAILED: - return cls.Status.LOADING_FAILED.name + if self._blocked_until: + return f"Blocked for {self.get_block_time_str()}" + elif self._status & self.Status.LOADING_FAILED: + return self.Status.LOADING_FAILED.name else: - return cls._status.name + return self._status.name - @classmethod - def get_blocked_until(cls): - return cls._blocked_until + def get_blocked_until(self): + return self._blocked_until - @classmethod - def get_block_time_str(cls): - if cls._blocked_until: - delta = (cls._blocked_until - datetime.now()).seconds + def get_block_time_str(self): + if self._blocked_until: + delta = (self._blocked_until - datetime.now()).seconds if delta < 60: return f"{delta}s" else: diff --git a/home_connect_async/homeconnect.py b/home_connect_async/homeconnect.py index eb94abe..bc76bc1 100644 --- a/home_connect_async/homeconnect.py +++ b/home_connect_async/homeconnect.py @@ -14,7 +14,7 @@ from aiohttp_sse_client.client import MessageEvent from .const import Events -from .common import ConditionalLogger, HomeConnectError, GlobalStatus +from .common import ConditionalLogger, HomeConnectError, HealthStatus from .callback_registery import CallbackRegistry from .appliance import Appliance from .auth import AuthManager @@ -55,6 +55,7 @@ class RefreshMode(Enum): _api:Optional[HomeConnectApi] = field(default=None, metadata=config(encoder=lambda val: None, exclude=lambda val: True)) _updates_task:Optional[Task] = field(default=None, metadata=config(encoder=lambda val: None, exclude=lambda val: True)) _load_task:Optional[Task] = field(default=None, metadata=config(encoder=lambda val: None, exclude=lambda val: True)) + _health:Optional[HealthStatus] = field(default=None, metadata=config(encoder=lambda val: None, exclude=lambda val: True)) _callbacks:Optional[CallbackRegistry] = field(default_factory=lambda: CallbackRegistry(), metadata=config(encoder=lambda val: None, exclude=lambda val: True)) _sse_timeout:Optional[int] = field(default=None) @@ -82,7 +83,8 @@ async def async_create(cls, If auto_update is set to False then subscribe_for_updates() should be called to receive real-time updates to the data """ - api = HomeConnectApi(am, lang) + health = HealthStatus() + api = HomeConnectApi(am, lang, health) hc:HomeConnect = None if json_data: try: @@ -99,6 +101,7 @@ async def async_create(cls, hc = HomeConnect() hc._api = api + hc._health = health hc._refresh_mode = refresh hc._disabled_appliances = disabled_appliances hc._sse_timeout = sse_timeout @@ -135,8 +138,8 @@ async def async_load_data(self, ) -> None: """ Loads or just refreshes the data model from the cloud service """ #self.status |= self.HomeConnectStatus.LOADING - GlobalStatus.set_status(GlobalStatus.Status.RUNNING) - GlobalStatus.unset_status(GlobalStatus.Status.LOADING_FAILED) + self._health.set_status(self._health.Status.RUNNING) + self._health.unset_status(self._health.Status.LOADING_FAILED) try: if refresh == self.RefreshMode.NOTHING: @@ -182,11 +185,11 @@ async def async_load_data(self, del self.appliances[haId] #self.status |= self.HomeConnectStatus.LOADED - GlobalStatus.set_status(GlobalStatus.Status.LOADED) + self._health.set_status(self._health.Status.LOADED) except Exception as ex: _LOGGER.warning("Failed to load data from Home Connect (%s)", str(ex), exc_info=ex) #self.status = self.HomeConnectStatus.LOADING_FAILED - GlobalStatus.set_status(GlobalStatus.Status.LOADING_FAILED) + self._health.set_status(self._health.Status.LOADING_FAILED) if on_error: if inspect.iscoroutinefunction(on_error): await on_error(self, ex) @@ -235,6 +238,10 @@ def __getitem__(self, haId) -> Appliance: """ Supports simple access to an appliance based on its haId """ return self.appliances.get(haId) + @property + def health(self): + return self._health + #region - Event stream and updates @@ -258,7 +265,7 @@ def parse_sse_error(error:str) -> int: event_source = await self._api.async_get_event_stream('/api/homeappliances/events', self._sse_timeout) await event_source.connect() #self.status |= self.HomeConnectStatus.UPDATES - GlobalStatus.set_status(GlobalStatus.Status.UPDATES) + self._health.set_status(self._health.Status.UPDATES) async for event in event_source: _LOGGER.debug("Received event from SSE stream: %s", str(event)) @@ -271,11 +278,11 @@ def parse_sse_error(error:str) -> int: break except ConnectionRefusedError as ex: #self.status &= self.HomeConnectStatus.NOUPDATES - GlobalStatus.unset_status(GlobalStatus.Status.UPDATES) + self._health.unset_status(self._health.Status.UPDATES) _LOGGER.debug('ConnectionRefusedError in SSE connection refused. Will try again', exc_info=ex) except ConnectionError as ex: #self.status &= self.HomeConnectStatus.NOUPDATES - GlobalStatus.unset_status(GlobalStatus.Status.UPDATES) + self._health.unset_status(self._health.Status.UPDATES) error_code = parse_sse_error(ex.args[0]) if error_code == 429: backoff *= 2 @@ -294,7 +301,7 @@ def parse_sse_error(error:str) -> int: _LOGGER.debug("The SSE connection timeed-out, will renew and retry") except Exception as ex: #self.status &= self.HomeConnectStatus.NOUPDATES - GlobalStatus.unset_status(GlobalStatus.Status.UPDATES) + self._health.unset_status(self._health.Status.UPDATES) _LOGGER.debug('Exception in SSE event stream. Will wait for %d seconds and retry ', backoff, exc_info=ex) await asyncio.sleep(backoff) backoff *= 2 @@ -306,7 +313,7 @@ def parse_sse_error(error:str) -> int: event_source = None #self.status &= self.HomeConnectStatus.NOUPDATES - GlobalStatus.unset_status(GlobalStatus.Status.UPDATES) + self._health.unset_status(self._health.Status.UPDATES) _LOGGER.debug("Exiting SSE event stream") diff --git a/setup.py b/setup.py index 721e82f..cae7ded 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ setup( name = 'home-connect-async', packages = ['home_connect_async'], - version = '0.7.16', + version = '0.8.0', license='MIT', description = 'Async SDK for BSH Home Connect API', author = 'Eran Kutner',