forked from seanrees/prometheus-dyson
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
executable file
·234 lines (188 loc) · 8.2 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
#!/usr/bin/python3
"""Exports Dyson Pure Hot+Cool (DysonLink) statistics as Prometheus metrics."""
import argparse
import functools
import logging
import sys
import time
import threading
from typing import Callable, Dict, List
import prometheus_client
import libdyson
import libdyson.dyson_device
import libdyson.exceptions
import config
import metrics
class DeviceWrapper:
"""Wrapper for a config.Device.
This class has two main purposes:
1) To associate a device name & libdyson.DysonFanDevice together
2) To start background thread that asks the DysonFanDevice for updated
environmental data on a periodic basis.
Args:
device: a config.Device to wrap
environment_refresh_secs: how frequently to refresh environmental data
"""
def __init__(self, device: config.Device, environment_refresh_secs=30):
self._config_device = device
self._environment_refresh_secs = environment_refresh_secs
self.libdyson = self._create_libdyson_device()
@property
def name(self) -> str:
"""Returns device name, e.g; 'Living Room'."""
return self._config_device.name
@property
def serial(self) -> str:
"""Returns device serial number, e.g; AB1-XX-1234ABCD."""
return self._config_device.serial
@property
def is_connected(self) -> bool:
"""True if we're connected to the Dyson device."""
return self.libdyson.is_connected
def connect(self, host: str, retry_on_timeout_secs: int = 30):
"""Connect to the device and start the environmental monitoring timer.
Args:
host: ip or hostname of Dyson device
retry_on_timeout_secs: number of seconds to wait in between retries. this will block the running thread.
"""
if self.is_connected:
logging.info(
'Already connected to %s (%s); no need to reconnect.', host, self.serial)
else:
try:
self.libdyson.connect(host)
self._refresh_timer()
except libdyson.exceptions.DysonConnectTimeout:
logging.error(
'Timeout connecting to %s (%s); will retry', host, self.serial)
threading.Timer(retry_on_timeout_secs,
self.connect, args=[host]).start()
def disconnect(self):
"""Disconnect from the Dyson device."""
self.libdyson.disconnect()
def _refresh_timer(self):
timer = threading.Timer(self._environment_refresh_secs,
self._timer_callback)
timer.start()
def _timer_callback(self):
if self.is_connected:
logging.debug(
'Requesting updated environmental data from %s', self.serial)
self.libdyson.request_environmental_data()
self._refresh_timer()
else:
logging.debug('Device %s is disconnected.', self.serial)
def _create_libdyson_device(self):
return libdyson.get_device(self.serial, self._config_device.credentials,
self._config_device.product_type)
class ConnectionManager:
"""Manages connections via manual IP or via libdyson Discovery.
Args:
update_fn: A callable taking a name, serial,
devices: a list of config.Device entities
hosts: a dict of serial -> IP address, for direct (non-zeroconf) connections.
"""
def __init__(self, update_fn: Callable[[str, str, bool, bool], None],
devices: List[config.Device], hosts: Dict[str, str]):
self._update_fn = update_fn
self._hosts = hosts
logging.info('Starting discovery...')
self._discovery = libdyson.discovery.DysonDiscovery()
self._discovery.start_discovery()
for device in devices:
self._add_device(DeviceWrapper(device))
def _add_device(self, device: DeviceWrapper, add_listener=True):
"""Adds and connects to a device.
This will connect directly if the host is specified in hosts at
initialisation, otherwise we will attempt discovery via zeroconf.
Args:
device: a config.Device to add
add_listener: if True, will add callback listeners. Set to False if
add_device() has been called on this device already.
"""
if add_listener:
callback_fn = functools.partial(self._device_callback, device)
device.libdyson.add_message_listener(callback_fn)
manual_ip = self._hosts.get(device.serial.upper())
if manual_ip:
logging.info('Attempting connection to device "%s" (serial=%s) via configured IP %s',
device.name, device.serial, manual_ip)
device.connect(manual_ip)
else:
logging.info('Attempting to discover device "%s" (serial=%s) via zeroconf',
device.name, device.serial)
callback_fn = functools.partial(self._discovery_callback, device)
self._discovery.register_device(device.libdyson, callback_fn)
@classmethod
def _discovery_callback(cls, device: DeviceWrapper, address: str):
# A note on concurrency: used with DysonDiscovery, this will be called
# back in a separate thread created by the underlying zeroconf library.
# When we call connect() on libpurecool or libdyson, that code spawns
# a new thread for MQTT and returns. In other words: we don't need to
# worry about connect() blocking zeroconf here.
logging.info('Discovered %s on %s', device.serial, address)
device.connect(address)
def _device_callback(self, device, message):
logging.debug('Received update from %s: %s', device.serial, message)
if not device.is_connected:
logging.info(
'Device %s is now disconnected, clearing it and re-adding', device.serial)
device.disconnect()
self._discovery.stop_discovery()
self._discovery.start_discovery()
self._add_device(device, add_listener=False)
return
is_state = message == libdyson.MessageType.STATE
is_environ = message == libdyson.MessageType.ENVIRONMENTAL
self._update_fn(device.name, device.libdyson, is_state=is_state,
is_environmental=is_environ)
def _sleep_forever() -> None:
"""Sleeps the calling thread until a keyboard interrupt occurs."""
while True:
try:
time.sleep(1)
except KeyboardInterrupt:
break
def main(argv):
"""Main body of the program."""
parser = argparse.ArgumentParser(prog=argv[0])
parser.add_argument('--port', help='HTTP server port',
type=int, default=8091)
parser.add_argument(
'--config', help='Configuration file (INI file)', default='config.ini')
parser.add_argument(
'--log_level', help='Logging level (DEBUG, INFO, WARNING, ERROR)', type=str, default='INFO')
parser.add_argument(
'--include_inactive_devices',
help='Do not use; this flag has no effect and remains for compatibility only',
action='store_true')
args = parser.parse_args()
try:
level = getattr(logging, args.log_level)
except AttributeError:
print(f'Invalid --log_level: {args.log_level}')
sys.exit(-1)
args = parser.parse_args()
logging.basicConfig(
format='%(asctime)s [%(thread)d] %(levelname)10s %(message)s',
datefmt='%Y/%m/%d %H:%M:%S',
level=level)
logging.info('Starting up on port=%s', args.port)
if args.include_inactive_devices:
logging.warning(
'--include_inactive_devices is now inoperative and will be removed in a future release')
try:
cfg = config.Config(args.config)
except:
logging.exception('Could not load configuration: %s', args.config)
sys.exit(-1)
devices = cfg.devices
if len(devices) == 0:
logging.fatal(
'No devices configured; please re-run this program with --create_device_cache.')
sys.exit(-2)
prometheus_client.start_http_server(args.port)
ConnectionManager(metrics.Metrics().update, devices, cfg.hosts)
_sleep_forever()
if __name__ == '__main__':
main(sys.argv)