From 2115663116cb7aad5dbb6dcba16f6d5af287be43 Mon Sep 17 00:00:00 2001 From: Sean Rees Date: Fri, 25 Nov 2022 10:09:49 +0000 Subject: [PATCH 1/2] Fix reconnect bug and add watchdog timer We called disconnect() twice on the underlying libdyson object, which resulted in an unhandled exception and the process hanging. Unfortunately addressing this does not reliably result in reconnects; so also add a default 5 minute watchdog timer. If the timer expires, the process aborts so systemd can restart it. --- BUILD | 2 +- connect.py | 117 ++++++++++++++++++++++++++++++++++++++++------------- main.py | 55 ++++++++++++++++--------- 3 files changed, 126 insertions(+), 48 deletions(-) diff --git a/BUILD b/BUILD index e591cd3..4befa73 100644 --- a/BUILD +++ b/BUILD @@ -124,5 +124,5 @@ pkg_deb( package = "prometheus-dyson", postrm = "debian/postrm", prerm = "debian/prerm", - version = "0.4.0", + version = "0.4.1", ) diff --git a/connect.py b/connect.py index 4a4716c..8d8ef8b 100644 --- a/connect.py +++ b/connect.py @@ -1,7 +1,9 @@ """Wraps libdyson's connections with support for config & retries.""" +import time import functools import logging +import os import threading from typing import Callable, Dict, List, Optional @@ -31,8 +33,8 @@ class DeviceWrapper: def __init__(self, device: config.Device, environment_refresh_secs=30): self._config_device = device self._environment_refresh_secs = environment_refresh_secs - self._environment_timer : Optional[threading.Timer] = None - self._timeout_timer : Optional[threading.Timer] = None + self._environment_timer: Optional[threading.Timer] = None + self._timeout_timer: Optional[threading.Timer] = None self.libdyson = self._create_libdyson_device() @property @@ -61,16 +63,19 @@ def connect(self, host: str, retry_on_timeout_secs: int = 30): if self.is_connected: logger.info( - 'Already connected to %s (%s); no need to reconnect.', host, self.serial) + "Already connected to %s (%s); no need to reconnect.", host, self.serial + ) else: try: self.libdyson.connect(host) self._refresh_timer() except libdyson.exceptions.DysonConnectTimeout: logger.error( - 'Timeout connecting to %s (%s); will retry', host, self.serial) + "Timeout connecting to %s (%s); will retry", host, self.serial + ) self._timeout_timer = threading.Timer( - retry_on_timeout_secs, self.connect, args=[host]) + retry_on_timeout_secs, self.connect, args=[host] + ) self._timeout_timer.start() def disconnect(self): @@ -80,30 +85,36 @@ def disconnect(self): if self._timeout_timer: self._timeout_timer.cancel() - self.libdyson.disconnect() + # libdyson will handle disconnects on its own and will raise if you + # try to disconnect a second time. + if self.libdyson.is_connected: + self.libdyson.disconnect() def _refresh_timer(self): - self._environment_timer = threading.Timer(self._environment_refresh_secs, - self._timer_callback) + self._environment_timer = threading.Timer( + self._environment_refresh_secs, self._timer_callback + ) self._environment_timer.start() def _timer_callback(self): self._environment_timer = None if self.is_connected: - logger.debug( - 'Requesting updated environmental data from %s', self.serial) + logger.debug("Requesting updated environmental data from %s", self.serial) try: - self.libdyson.request_environmental_data() + self.libdyson.request_environmental_data() except AttributeError: - logger.error('Race with a disconnect? Skipping an iteration.') + logger.error("Race with a disconnect? Skipping an iteration.") self._refresh_timer() else: - logger.debug('Device %s is disconnected.', self.serial) + logger.debug("Device %s is disconnected.", self.serial) def _create_libdyson_device(self): - return libdyson.get_device(self.serial, self._config_device.credentials, - self._config_device.product_type) + return libdyson.get_device( + self.serial, + self._config_device.credentials, + self._config_device.product_type, + ) class ConnectionManager: @@ -113,16 +124,30 @@ class ConnectionManager: update_fn: A callable taking a name, serial, devices: a list of config.Device entities hosts: a dict of serial -> IP address, for direct (non-zeroconf) connections. + reconnect: True if we should automatically reconnect, False otherwise + watchdog_secs: Number of seconds to wait before terminating the process (0 for no watchdog) """ - def __init__(self, update_fn: Callable[[str, str, bool, bool], None], - devices: List[config.Device], hosts: Dict[str, str], reconnect: bool = True): + def __init__( + self, + update_fn: Callable[[str, str, bool, bool], None], + devices: List[config.Device], + hosts: Dict[str, str], + reconnect: bool = True, + watchdog_secs: int = 300, + ): self._update_fn = update_fn self._hosts = hosts self._reconnect = reconnect self._devices = [DeviceWrapper(d) for d in devices] - logger.info('Starting discovery...') + self._last_update_time = int(time.time()) + self._watchdog_secs = watchdog_secs + if watchdog_secs: + logger.info("Starting process watchdog with %d sec timeout", watchdog_secs) + self._start_watchdog() + + logger.info("Starting discovery...") self._discovery = libdyson.discovery.DysonDiscovery() self._discovery.start_discovery() @@ -133,10 +158,34 @@ def shutdown(self) -> None: """Disconnects from all devices.""" self._discovery.stop_discovery() + if self._watchdog_timer: + self._watchdog_timer.cancel() + for device in self._devices: - logger.info('Disconnecting from %s (%s)', device.name, device.serial) + logger.info("Disconnecting from %s (%s)", device.name, device.serial) device.disconnect() + def _start_watchdog(self): + self._watchdog_timer = threading.Timer( + self._watchdog_secs, self._watchdog_callback + ) + self._watchdog_timer.start() + + def _watchdog_callback(self): + now = int(time.time()) + delta = now - self._last_update_time + if delta > self._watchdog_secs: + logger.error( + "Watchdog process abort: last update was %d seconds ago -- process hung?", + delta, + ) + + # Use os.abort() here to force a crash. sys.exit will raise a SystemExit exception and + # walk the exception handlers, which might take a while. + os.abort() + else: + self._start_watchdog() + def _add_device(self, device: DeviceWrapper, add_listener=True): """Adds and connects to a device. @@ -154,12 +203,19 @@ def _add_device(self, device: DeviceWrapper, add_listener=True): manual_ip = self._hosts.get(device.serial.upper()) if manual_ip: - logger.info('Attempting connection to device "%s" (serial=%s) via configured IP %s', - device.name, device.serial, manual_ip) + logger.info( + 'Attempting connection to device "%s" (serial=%s) via configured IP %s', + device.name, + device.serial, + manual_ip, + ) device.connect(manual_ip) else: - logger.info('Attempting to discover device "%s" (serial=%s) via zeroconf', - device.name, device.serial) + logger.info( + 'Attempting to discover device "%s" (serial=%s) via zeroconf', + device.name, + device.serial, + ) callback_fn = functools.partial(self._discovery_callback, device) self._discovery.register_device(device.libdyson, callback_fn) @@ -170,14 +226,16 @@ def _discovery_callback(cls, device: DeviceWrapper, address: str): # When we call connect() on libpurecool or libdyson, that code spawns # a new thread for MQTT and returns. In other words: we don't need to # worry about connect() blocking zeroconf here. - logger.info('Discovered %s on %s', device.serial, address) + logger.info("Discovered %s on %s", device.serial, address) device.connect(address) def _device_callback(self, device, message): - logger.debug('Received update from %s: %s', device.serial, message) + logger.debug("Received update from %s: %s", device.serial, message) if not device.is_connected and self._reconnect: logger.info( - 'Device %s is now disconnected, clearing it and re-adding', device.serial) + "Device %s is now disconnected, clearing it and re-adding", + device.serial, + ) device.disconnect() self._discovery.stop_discovery() self._discovery.start_discovery() @@ -186,6 +244,7 @@ def _device_callback(self, device, message): is_state = message == libdyson.MessageType.STATE is_environ = message == libdyson.MessageType.ENVIRONMENTAL - self._update_fn(device.name, device.libdyson, is_state=is_state, - is_environmental=is_environ) - + self._last_update_time = int(time.time()) + self._update_fn( + device.name, device.libdyson, is_state=is_state, is_environmental=is_environ + ) diff --git a/main.py b/main.py index 4617569..55fe677 100755 --- a/main.py +++ b/main.py @@ -28,54 +28,73 @@ def _sleep_forever() -> None: def main(argv): """Main body of the program.""" parser = argparse.ArgumentParser(prog=argv[0]) - parser.add_argument('--port', help='HTTP server port', - type=int, default=8091) + parser.add_argument("--port", help="HTTP server port", type=int, default=8091) parser.add_argument( - '--config', help='Configuration file (INI file)', default='config.ini') + "--config", help="Configuration file (INI file)", default="config.ini" + ) parser.add_argument( - '--log_level', help='Logging level (DEBUG, INFO, WARNING, ERROR)', type=str, default='INFO') + "--log_level", + help="Logging level (DEBUG, INFO, WARNING, ERROR)", + type=str, + default="INFO", + ) parser.add_argument( - '--include_inactive_devices', - help='Do not use; this flag has no effect and remains for compatibility only', - action='store_true') + "--watchdog_timeout_seconds", + help="Timeout to abort the process if exceeded (0 for no watchdog)", + type=int, + default=300, + ) + parser.add_argument( + "--include_inactive_devices", + help="Do not use; this flag has no effect and remains for compatibility only", + action="store_true", + ) + args = parser.parse_args() try: level = getattr(logging, args.log_level) except AttributeError: - print(f'Invalid --log_level: {args.log_level}') + print(f"Invalid --log_level: {args.log_level}") sys.exit(-1) args = parser.parse_args() logging.basicConfig( - format='%(asctime)s [%(name)24s %(thread)d] %(levelname)10s %(message)s', - datefmt='%Y/%m/%d %H:%M:%S', - level=level) + format="%(asctime)s [%(name)24s %(thread)d] %(levelname)10s %(message)s", + datefmt="%Y/%m/%d %H:%M:%S", + level=level, + ) - logger.info('Starting up on port=%s', args.port) + logger.info("Starting up on port=%s", args.port) if args.include_inactive_devices: logger.warning( - '--include_inactive_devices is now inoperative and will be removed in a future release') + "--include_inactive_devices is now inoperative and will be removed in a future release" + ) try: cfg = config.Config(args.config) except: - logger.exception('Could not load configuration: %s', args.config) + logger.exception("Could not load configuration: %s", args.config) sys.exit(-1) devices = cfg.devices if len(devices) == 0: logger.fatal( - 'No devices configured; please re-run this program with --create_device_cache.') + "No devices configured; please re-run this program with --create_device_cache." + ) sys.exit(-2) prometheus_client.start_http_server(args.port) - - connect.ConnectionManager(metrics.Metrics().update, devices, cfg.hosts) + connect.ConnectionManager( + metrics.Metrics().update, + devices, + cfg.hosts, + watchdog_secs=args.watchdog_timeout_seconds, + ) _sleep_forever() -if __name__ == '__main__': +if __name__ == "__main__": main(sys.argv) From d1d66b4155ad834bb894965b34499c0ca7a713a1 Mon Sep 17 00:00:00 2001 From: Sean Rees Date: Fri, 25 Nov 2022 10:12:56 +0000 Subject: [PATCH 2/2] Add default tasks --- .vscode/tasks.json | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) create mode 100644 .vscode/tasks.json diff --git a/.vscode/tasks.json b/.vscode/tasks.json new file mode 100644 index 0000000..73dbb06 --- /dev/null +++ b/.vscode/tasks.json @@ -0,0 +1,29 @@ +{ + // See https://go.microsoft.com/fwlink/?LinkId=733558 + // for the documentation about the tasks.json format + "version": "2.0.0", + "tasks": [ + { + "label": "Build", + "type": "shell", + "command": "bazel build ...", + "group": { + "kind": "build", + "isDefault": true + } + }, + { + "label": "Test", + "type": "shell", + "command": "bazel test ...", + "group": { + "kind": "test", + } + }, + { + "label": "Package", + "type": "shell", + "command": "bazel build :main-deb", + }, + ] +} \ No newline at end of file