Skip to content

Commit

Permalink
Add expiration to remove stale metrics (#61), fixes #59
Browse files Browse the repository at this point in the history
Co-authored-by: Jeremy Figgins <[email protected]>
  • Loading branch information
Quantamm and Jeremy Figgins authored Mar 11, 2023
1 parent c7c95d2 commit e601a6d
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 12 deletions.
3 changes: 2 additions & 1 deletion exampleconf/metric_example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ metrics:
- name: 'mqtt_example' # Required(unique, if multiple, only last entry is kept)
help: 'MQTT example gauge' # Required
type: 'gauge' # Required ('gauge', 'counter', 'summary' or 'histogram')
expires: 60 # Optional time in seconds after last update to remove metric
#parameters: # Optional parameters for certain metrics
# buckets: # Optional (Passed as 'buckets' argument to Histogram)
# - .1
Expand All @@ -24,4 +25,4 @@ metrics:
regex: '(.*)' # Optional default '(.*)'
target_label: '__topic__' # Required (when label_configs is present and 'action' = 'replace')
replacement: '\1' # Optional default '\1'
action: 'replace' # Optional default 'replace'
action: 'replace' # Optional default 'replace'
56 changes: 45 additions & 11 deletions mqtt_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from yamlreader import yaml_load
import utils.prometheus_additions
import version
import threading

VERSION = version.__version__
SUFFIXES_PER_TYPE = {
Expand All @@ -27,6 +28,7 @@
"histogram": ['sum', 'count', 'bucket'],
"enum": [],
}
METRICS_LOCK = threading.Semaphore()


def _read_config(config_path):
Expand Down Expand Up @@ -308,16 +310,33 @@ def _update_metrics(metrics, msg):
_export_to_prometheus(
derived_metric['name'], derived_metric, derived_labels)

if metric.get('expires'):
if metric.get('expiration_timer'):
metric.get('expiration_timer').cancel()
logging.debug(f"_update_metric Canceled existing timer for {metric.get('name')}")

metric['expiration_timer'] = threading.Timer(metric.get('expires'), _clear_metric, args=(metric, derived_metric))
metric['expiration_timer'].start()
logging.debug(f"_update_metric Set a {metric.get('expires')} second expiration timer for {metric.get('name')}")


def _clear_metric(metric, derived_metric):
with METRICS_LOCK:
metric['prometheus_metric']['parent'].clear()
derived_metric['prometheus_metric']['parent'].clear()
logging.debug(f"_clear_metric cleared metric {metric.get('name')}")


# noinspection PyUnusedLocal
def _on_message(client, userdata, msg):
"""The callback for when a PUBLISH message is received from the server."""
logging.debug(
f'_on_message Msg received on topic: {msg.topic}, Value: {str(msg.payload)}')
with METRICS_LOCK:
"""The callback for when a PUBLISH message is received from the server."""
logging.debug(
f'_on_message Msg received on topic: {msg.topic}, Value: {str(msg.payload)}')

for topic in userdata.keys():
if _topic_matches(topic, msg.topic):
_update_metrics(userdata[topic], msg)
for topic in userdata.keys():
if _topic_matches(topic, msg.topic):
_update_metrics(userdata[topic], msg)


def _mqtt_init(mqtt_config, metrics):
Expand Down Expand Up @@ -358,7 +377,7 @@ def _export_to_prometheus(name, metric, labels):
valid_types = metric_wrappers.keys()
if metric['type'] not in valid_types:
logging.error(
f"Metric type: {metric['type']}, is not a valid metric type. Must be one of: {valid_types} - ingnoring"
f"Metric type: {metric['type']}, is not a valid metric type. Must be one of: {valid_types} - ignoring"
)
return

Expand Down Expand Up @@ -430,6 +449,10 @@ def __init__(self, name, help_text, label_names, *args, **kwargs) -> None:
self.metric = prometheus.Gauge(
name, help_text, list(label_names)
)

def clear(self):
self.metric.clear()

def update(self, label_values, value):
child = self.metric.labels(*label_values)
child.set(value)
Expand All @@ -446,15 +469,14 @@ def __init__(self, name, help_text, label_names, *args, **kwargs) -> None:
name, help_text, list(label_names)
)

def clear(self):
self.metric.clear()

def update(self, label_values, value):
child = self.metric.labels(*label_values)
child.inc(value)
return child

class HistogramWrapper():
"""
Wrapper to provide generic interface to Summary metric
"""

class CounterAbsoluteWrapper():
"""
Expand All @@ -466,6 +488,9 @@ def __init__(self, name, help_text, label_names, *args, **kwargs) -> None:
name, help_text, list(label_names)
)

def clear(self):
self.metric.clear()

def update(self, label_values, value):
child = self.metric.labels(*label_values)
child.set(value)
Expand All @@ -482,6 +507,9 @@ def __init__(self, name, help_text, label_names, *args, **kwargs) -> None:
name, help_text, list(label_names)
)

def clear(self):
self.metric.clear()

def update(self, label_values, value):
child = self.metric.labels(*label_values)
child.observe(value)
Expand All @@ -505,6 +533,9 @@ def __init__(self, name, help_text, label_names, *args, **kwargs) -> None:
name, help_text, list(label_names), **params
)

def clear(self):
self.metric.clear()

def update(self, label_values, value):
child = self.metric.labels(*label_values)
child.observe(value)
Expand All @@ -521,6 +552,9 @@ def __init__(self, name, help_text, label_names, *args, **kwargs) -> None:
name, help_text, list(label_names), **params
)

def clear(self):
self.metric.clear()

def update(self, label_values, value):
child = self.metric.labels(*label_values)
child.state(value)
Expand Down

0 comments on commit e601a6d

Please sign in to comment.