Skip to content

Commit

Permalink
Merge pull request #21 from seanrees/watchdog
Browse files Browse the repository at this point in the history
Fix reconnect bug and add watchdog timer and bump to 0.4.1
  • Loading branch information
seanrees authored Nov 25, 2022
2 parents 2d8bc3a + d1d66b4 commit bac75b8
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 48 deletions.
29 changes: 29 additions & 0 deletions .vscode/tasks.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
{
// See https://go.microsoft.com/fwlink/?LinkId=733558
// for the documentation about the tasks.json format
"version": "2.0.0",
"tasks": [
{
"label": "Build",
"type": "shell",
"command": "bazel build ...",
"group": {
"kind": "build",
"isDefault": true
}
},
{
"label": "Test",
"type": "shell",
"command": "bazel test ...",
"group": {
"kind": "test",
}
},
{
"label": "Package",
"type": "shell",
"command": "bazel build :main-deb",
},
]
}
2 changes: 1 addition & 1 deletion BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -124,5 +124,5 @@ pkg_deb(
package = "prometheus-dyson",
postrm = "debian/postrm",
prerm = "debian/prerm",
version = "0.4.0",
version = "0.4.1",
)
117 changes: 88 additions & 29 deletions connect.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
"""Wraps libdyson's connections with support for config & retries."""

import time
import functools
import logging
import os
import threading

from typing import Callable, Dict, List, Optional
Expand Down Expand Up @@ -31,8 +33,8 @@ class DeviceWrapper:
def __init__(self, device: config.Device, environment_refresh_secs=30):
    """Wrap one configured device.

    Args:
      device: the config.Device entry this wrapper represents.
      environment_refresh_secs: interval in seconds stored for the
        environment refresh timer.
    """
    self._config_device = device
    self._environment_refresh_secs = environment_refresh_secs
    # Neither timer is created here; both slots start out empty.
    self._environment_timer: Optional[threading.Timer] = None
    self._timeout_timer: Optional[threading.Timer] = None
    self.libdyson = self._create_libdyson_device()

@property
Expand Down Expand Up @@ -61,16 +63,19 @@ def connect(self, host: str, retry_on_timeout_secs: int = 30):

if self.is_connected:
logger.info(
'Already connected to %s (%s); no need to reconnect.', host, self.serial)
"Already connected to %s (%s); no need to reconnect.", host, self.serial
)
else:
try:
self.libdyson.connect(host)
self._refresh_timer()
except libdyson.exceptions.DysonConnectTimeout:
logger.error(
'Timeout connecting to %s (%s); will retry', host, self.serial)
"Timeout connecting to %s (%s); will retry", host, self.serial
)
self._timeout_timer = threading.Timer(
retry_on_timeout_secs, self.connect, args=[host])
retry_on_timeout_secs, self.connect, args=[host]
)
self._timeout_timer.start()

def disconnect(self):
Expand All @@ -80,30 +85,36 @@ def disconnect(self):
if self._timeout_timer:
self._timeout_timer.cancel()

self.libdyson.disconnect()
# libdyson will handle disconnects on its own and will raise if you
# try to disconnect a second time.
if self.libdyson.is_connected:
self.libdyson.disconnect()

def _refresh_timer(self):
self._environment_timer = threading.Timer(self._environment_refresh_secs,
self._timer_callback)
self._environment_timer = threading.Timer(
self._environment_refresh_secs, self._timer_callback
)
self._environment_timer.start()

def _timer_callback(self):
    """Timer body: request fresh environmental data, then re-arm.

    Clears the stored timer first. When the device is connected, one
    environmental-data request is issued and the timer is re-armed; when
    it is not, the callback only logs and no new timer is scheduled.
    """
    self._environment_timer = None

    if self.is_connected:
        logger.debug("Requesting updated environmental data from %s", self.serial)
        try:
            self.libdyson.request_environmental_data()
        except AttributeError:
            logger.error("Race with a disconnect? Skipping an iteration.")
        # NOTE(review): nesting was reconstructed from a flattened listing;
        # re-arming is assumed to happen after the try/except (the
        # "skipping an iteration" message implies later iterations) --
        # confirm against the repository.
        self._refresh_timer()
    else:
        logger.debug("Device %s is disconnected.", self.serial)

def _create_libdyson_device(self):
    """Return the result of libdyson.get_device for this wrapper.

    The call is given this wrapper's serial plus the credentials and
    product type taken from the wrapped config device.
    """
    return libdyson.get_device(
        self.serial,
        self._config_device.credentials,
        self._config_device.product_type,
    )


class ConnectionManager:
Expand All @@ -113,16 +124,30 @@ class ConnectionManager:
update_fn: A callable taking a name, a libdyson device, and the is_state / is_environmental keyword flags
devices: a list of config.Device entities
hosts: a dict of serial -> IP address, for direct (non-zeroconf) connections.
reconnect: True if we should automatically reconnect, False otherwise
watchdog_secs: Number of seconds to wait before terminating the process (0 for no watchdog)
"""

def __init__(self, update_fn: Callable[[str, str, bool, bool], None],
devices: List[config.Device], hosts: Dict[str, str], reconnect: bool = True):
def __init__(
self,
update_fn: Callable[[str, str, bool, bool], None],
devices: List[config.Device],
hosts: Dict[str, str],
reconnect: bool = True,
watchdog_secs: int = 300,
):
self._update_fn = update_fn
self._hosts = hosts
self._reconnect = reconnect
self._devices = [DeviceWrapper(d) for d in devices]

logger.info('Starting discovery...')
self._last_update_time = int(time.time())
self._watchdog_secs = watchdog_secs
if watchdog_secs:
logger.info("Starting process watchdog with %d sec timeout", watchdog_secs)
self._start_watchdog()

logger.info("Starting discovery...")
self._discovery = libdyson.discovery.DysonDiscovery()
self._discovery.start_discovery()

Expand All @@ -133,10 +158,34 @@ def shutdown(self) -> None:
"""Disconnects from all devices."""
self._discovery.stop_discovery()

if self._watchdog_timer:
self._watchdog_timer.cancel()

for device in self._devices:
logger.info('Disconnecting from %s (%s)', device.name, device.serial)
logger.info("Disconnecting from %s (%s)", device.name, device.serial)
device.disconnect()

def _start_watchdog(self):
self._watchdog_timer = threading.Timer(
self._watchdog_secs, self._watchdog_callback
)
self._watchdog_timer.start()

def _watchdog_callback(self):
now = int(time.time())
delta = now - self._last_update_time
if delta > self._watchdog_secs:
logger.error(
"Watchdog process abort: last update was %d seconds ago -- process hung?",
delta,
)

# Use os.abort() here to force a crash. sys.exit will raise a SystemExit exception and
# walk the exception handlers, which might take a while.
os.abort()
else:
self._start_watchdog()

def _add_device(self, device: DeviceWrapper, add_listener=True):
"""Adds and connects to a device.
Expand All @@ -154,12 +203,19 @@ def _add_device(self, device: DeviceWrapper, add_listener=True):

manual_ip = self._hosts.get(device.serial.upper())
if manual_ip:
logger.info('Attempting connection to device "%s" (serial=%s) via configured IP %s',
device.name, device.serial, manual_ip)
logger.info(
'Attempting connection to device "%s" (serial=%s) via configured IP %s',
device.name,
device.serial,
manual_ip,
)
device.connect(manual_ip)
else:
logger.info('Attempting to discover device "%s" (serial=%s) via zeroconf',
device.name, device.serial)
logger.info(
'Attempting to discover device "%s" (serial=%s) via zeroconf',
device.name,
device.serial,
)
callback_fn = functools.partial(self._discovery_callback, device)
self._discovery.register_device(device.libdyson, callback_fn)

Expand All @@ -170,14 +226,16 @@ def _discovery_callback(cls, device: DeviceWrapper, address: str):
# When we call connect() on libpurecool or libdyson, that code spawns
# a new thread for MQTT and returns. In other words: we don't need to
# worry about connect() blocking zeroconf here.
logger.info('Discovered %s on %s', device.serial, address)
logger.info("Discovered %s on %s", device.serial, address)
device.connect(address)

def _device_callback(self, device, message):
logger.debug('Received update from %s: %s', device.serial, message)
logger.debug("Received update from %s: %s", device.serial, message)
if not device.is_connected and self._reconnect:
logger.info(
'Device %s is now disconnected, clearing it and re-adding', device.serial)
"Device %s is now disconnected, clearing it and re-adding",
device.serial,
)
device.disconnect()
self._discovery.stop_discovery()
self._discovery.start_discovery()
Expand All @@ -186,6 +244,7 @@ def _device_callback(self, device, message):

is_state = message == libdyson.MessageType.STATE
is_environ = message == libdyson.MessageType.ENVIRONMENTAL
self._update_fn(device.name, device.libdyson, is_state=is_state,
is_environmental=is_environ)

self._last_update_time = int(time.time())
self._update_fn(
device.name, device.libdyson, is_state=is_state, is_environmental=is_environ
)
55 changes: 37 additions & 18 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,54 +28,73 @@ def _sleep_forever() -> None:
def main(argv):
    """Main body of the program.

    Parses flags, configures logging, loads the device configuration,
    starts the Prometheus HTTP server and the connection manager, then
    sleeps forever.

    Args:
      argv: full argument vector; argv[0] is used as the program name and
        the remaining entries are parsed as flags.

    Exits with -1 on an invalid --log_level or an unloadable configuration
    file, and with -2 when the configuration lists no devices.
    """
    parser = argparse.ArgumentParser(prog=argv[0])
    parser.add_argument("--port", help="HTTP server port", type=int, default=8091)
    parser.add_argument(
        "--config", help="Configuration file (INI file)", default="config.ini"
    )
    parser.add_argument(
        "--log_level",
        help="Logging level (DEBUG, INFO, WARNING, ERROR)",
        type=str,
        default="INFO",
    )
    parser.add_argument(
        "--watchdog_timeout_seconds",
        help="Timeout to abort the process if exceeded (0 for no watchdog)",
        type=int,
        default=300,
    )
    parser.add_argument(
        "--include_inactive_devices",
        help="Do not use; this flag has no effect and remains for compatibility only",
        action="store_true",
    )

    # Parse the vector we were handed rather than implicitly re-reading
    # sys.argv, so the argv parameter is actually honoured.
    args = parser.parse_args(argv[1:])

    # A plain getattr() would also accept any other attribute of the
    # logging module (functions, classes, ...); only integer level
    # constants are valid here.
    level = getattr(logging, args.log_level.upper(), None)
    if not isinstance(level, int):
        print(f"Invalid --log_level: {args.log_level}")
        sys.exit(-1)

    logging.basicConfig(
        format="%(asctime)s [%(name)24s %(thread)d] %(levelname)10s %(message)s",
        datefmt="%Y/%m/%d %H:%M:%S",
        level=level,
    )

    logger.info("Starting up on port=%s", args.port)

    if args.include_inactive_devices:
        logger.warning(
            "--include_inactive_devices is now inoperative and will be removed in a future release"
        )

    try:
        cfg = config.Config(args.config)
    except Exception:
        # Not a bare except: SystemExit and KeyboardInterrupt must not be
        # reported as configuration failures.
        logger.exception("Could not load configuration: %s", args.config)
        sys.exit(-1)

    devices = cfg.devices
    if not devices:
        logger.fatal(
            "No devices configured; please re-run this program with --create_device_cache."
        )
        sys.exit(-2)

    prometheus_client.start_http_server(args.port)

    connect.ConnectionManager(
        metrics.Metrics().update,
        devices,
        cfg.hosts,
        watchdog_secs=args.watchdog_timeout_seconds,
    )

    _sleep_forever()


# Script entry point: hand the full argument vector to main().
if __name__ == "__main__":
    main(sys.argv)

0 comments on commit bac75b8

Please sign in to comment.