From 6a2038bda2e9308af1bfbc186d40fb73935722a7 Mon Sep 17 00:00:00 2001
From: Thomas LEVEIL
Date: Thu, 13 Apr 2017 16:15:23 +0200
Subject: [PATCH] support monitoring multiple clusters

example configuration (one <Module> block per cluster to monitor):

    <Plugin python>
        ModulePath "/usr/lib/collectd/"
        Import "elasticsearch_collectd"

        <Module elasticsearch_collectd>
            Port 9200
        </Module>
        <Module elasticsearch_collectd>
            Port 10200
        </Module>
    </Plugin>
---
 elasticsearch_collectd.py | 841 ++++++++++++++++++--------------------
 1 file changed, 408 insertions(+), 433 deletions(-)

diff --git a/elasticsearch_collectd.py b/elasticsearch_collectd.py
index facfc5b..2f076c7 100755
--- a/elasticsearch_collectd.py
+++ b/elasticsearch_collectd.py
@@ -22,47 +22,7 @@ import ssl
 
 PREFIX = "elasticsearch"
 
-ES_HOST = "localhost"
-ES_PORT = 9200
-ES_URL_SCHEME = "http"
-ES_CLUSTER = None
-ES_USERNAME = ""
-ES_PASSWORD = ""
-ES_VERSION = None
-ES_MASTER_ELIGIBLE = None
-
-ENABLE_INDEX_STATS = True
-ENABLE_CLUSTER_STATS = True
-
-ES_NODE_URL = ""
-ES_CLUSTER_URL = ""
-ES_INDEX_URL = ""
-ES_INDEX = []
-
-Stat = collections.namedtuple('Stat', ('type', 'path'))
-
-NODE_STATS_CUR = {}
-INDEX_STATS_CUR = {}
-CLUSTER_STATS_CUR = {}
-
-COLLECTION_INTERVAL = 10
-
-INDEX_INTERVAL = 300
-
-INDEX_SKIP = 0
-
-SKIP_COUNT = 0
-
-CLUSTER_STATUS = {'green': 0, 'yellow': 1, 'red': 2}
-
-DETAILED_METRICS = True
-INDEX_SUMMARY_ONLY = False
-THREAD_POOLS = []
-CONFIGURED_THREAD_POOLS = set()
-
-ES_CURRENT_MASTER = False
-MASTER_ONLY = False
-NODE_ID = None
+CLUSTERS = []
 
 DEFAULTS = set([
     # AUTOMATICALLY GENERATED METRIC NAMES
@@ -114,6 +74,10 @@
     "indices.store.throttle-time",
 ])
 
+CLUSTER_STATUS = {'green': 0, 'yellow': 1, 'red': 2}
+
+Stat = collections.namedtuple('Stat', ('type', 'path'))
+
 # DICT: ElasticSearch 1.0.0
 NODE_STATS = {
     # STORE
@@ -624,7 +588,8 @@ def read_callback():
     If this method throws, the plugin will be skipped for an increasing
     amount of time until it returns normally again"""
     log.info('Read callback called')
-    fetch_stats()
+    for c in CLUSTERS:
+        c.fetch_stats()
 
 
 def str_to_bool(value):
@@ -645,136 +610,86 @@ def str_to_bool(value):
 
 def configure_callback(conf):
    """called by collectd to configure the plugin. 
This is called only once""" - global ES_HOST, ES_PORT, ES_NODE_URL, ES_URL_SCHEME, ES_VERSION, \ - ES_CLUSTER, ES_INDEX, ENABLE_INDEX_STATS, ENABLE_CLUSTER_STATS, \ - DETAILED_METRICS, COLLECTION_INTERVAL, INDEX_INTERVAL, \ - CONFIGURED_THREAD_POOLS, DEFAULTS, ES_USERNAME, ES_PASSWORD, \ - MASTER_ONLY, INDEX_SUMMARY_ONLY + c = Cluster() for node in conf.children: if node.key == 'Host': - ES_HOST = node.values[0] + c.es_host = node.values[0] elif node.key == 'Port': - ES_PORT = int(node.values[0]) + c.es_port = int(node.values[0]) elif node.key == 'Protocol': - ES_URL_SCHEME = node.values[0] + c.es_url_scheme = node.values[0] log.notice( - 'overriding elasticsearch url scheme to %s' % ES_URL_SCHEME) + 'overriding elasticsearch url scheme to %s' % c.es_url_scheme) elif node.key == 'Username': - ES_USERNAME = node.values[0] + c.es_username = node.values[0] elif node.key == 'Password': - ES_PASSWORD = node.values[0] + c.es_password = node.values[0] elif node.key == 'Verbose': handle.verbose = str_to_bool(node.values[0]) elif node.key == 'Cluster': - ES_CLUSTER = node.values[0] + c.es_cluster = node.values[0] log.notice( - 'overriding elasticsearch cluster name to %s' % ES_CLUSTER) + 'overriding elasticsearch cluster name to %s' % c.es_cluster) elif node.key == 'Version': - ES_VERSION = node.values[0] + c.es_version = node.values[0] log.notice( - 'overriding elasticsearch version number to %s' % ES_VERSION) + 'overriding elasticsearch version number to %s' % c.es_version) elif node.key == 'Indexes': - ES_INDEX = node.values + c.es_index = node.values elif node.key == 'EnableIndexStats': - ENABLE_INDEX_STATS = str_to_bool(node.values[0]) + c.enable_index_stats = str_to_bool(node.values[0]) elif node.key == 'EnableClusterHealth': - ENABLE_CLUSTER_STATS = str_to_bool(node.values[0]) + c.enable_cluster_stats = str_to_bool(node.values[0]) elif node.key == 'Interval': - COLLECTION_INTERVAL = int(node.values[0]) + c.collection_interval = int(node.values[0]) elif node.key == 'IndexInterval': - INDEX_INTERVAL = int(node.values[0]) + c.index_interval = int(node.values[0]) elif node.key == "DetailedMetrics": - DETAILED_METRICS = str_to_bool(node.values[0]) + c.detailed_metrics = str_to_bool(node.values[0]) elif node.key == "IndexSummaryOnly": - INDEX_SUMMARY_ONLY = str_to_bool(node.values[0]) + c.index_summary_only = str_to_bool(node.values[0]) elif node.key == "ThreadPools": for thread_pool in node.values: - CONFIGURED_THREAD_POOLS.add(thread_pool) + c.configured_thread_pools.add(thread_pool) # Include required thread pools (search and index) - CONFIGURED_THREAD_POOLS.add('search') - CONFIGURED_THREAD_POOLS.add('index') + c.configured_thread_pools.add('search') + c.configured_thread_pools.add('index') elif node.key == "AdditionalMetrics": for metric_name in node.values: - DEFAULTS.add(metric_name) + c.defaults.add(metric_name) elif node.key == "IndexStatsMasterOnly": - MASTER_ONLY = str_to_bool(node.values[0]) + c.master_only = str_to_bool(node.values[0]) else: log.warning('Unknown config key: %s.' 
% node.key)
 
-    log.info('HOST: %s' % ES_HOST)
-    log.info('PORT: %s' % ES_PORT)
-    log.info('ES_INDEX: %s' % ES_INDEX)
-    log.info('ENABLE_INDEX_STATS: %s' % ENABLE_INDEX_STATS)
-    log.info('ENABLE_CLUSTER_STATS: %s' % ENABLE_CLUSTER_STATS)
-    log.info('COLLECTION_INTERVAL: %s' % COLLECTION_INTERVAL)
-    log.info('INDEX_INTERVAL: %s' % INDEX_INTERVAL)
-    log.info('DETAILED_METRICS: %s' % DETAILED_METRICS)
-    log.info('INDEX_SUMMARY_ONLY: %s' % INDEX_SUMMARY_ONLY)
-    log.info('CONFIGURED_THREAD_POOLS: %s' % CONFIGURED_THREAD_POOLS)
-    log.info('METRICS TO COLLECT: %s' % DEFAULTS)
-    log.info('MASTER_ONLY: %s' % MASTER_ONLY)
+    log.info('host: %s' % c.es_host)
+    log.info('port: %s' % c.es_port)
+    log.info('es_index: %s' % c.es_index)
+    log.info('enable_index_stats: %s' % c.enable_index_stats)
+    log.info('enable_cluster_stats: %s' % c.enable_cluster_stats)
+    log.info('collection_interval: %s' % c.collection_interval)
+    log.info('index_interval: %s' % c.index_interval)
+    log.info('detailed_metrics: %s' % c.detailed_metrics)
+    log.info('index_summary_only: %s' % c.index_summary_only)
+    log.info('configured_thread_pools: %s' % c.configured_thread_pools)
+    log.info('metrics to collect: %s' % c.defaults)
+    log.info('master_only: %s' % c.master_only)
 
     # determine node information
-    load_es_info()
+    c.load_es_info()
 
     # initialize stats map based on ES version
-    init_stats()
+    c.init_stats()
+
+    # add the cluster config to the list of clusters to monitor
+    CLUSTERS.append(c)
 
     # register the read callback now that we have the complete config
-    collectd.register_read(read_callback, interval=COLLECTION_INTERVAL)
+    # register one uniquely named read callback per cluster: collectd
+    # rejects a second registration under the same default name, and a
+    # per-cluster callback lets each cluster be polled at its own interval
+    collectd.register_read(c.fetch_stats, interval=c.collection_interval,
+                           name='%s-%s:%s' % (PREFIX, c.es_host, c.es_port))
 
     log.notice(
         'started elasticsearch plugin with interval = %d seconds' %
-        COLLECTION_INTERVAL)
-
-
-def sanitize_intervals():
-    """Sanitizes the index interval to be greater or equal to and divisible by
-    the collection interval
-    """
-    global INDEX_INTERVAL, COLLECTION_INTERVAL, INDEX_SKIP, SKIP_COUNT
-    # Sanitize the COLLECTION_INTERVAL and INDEX_INTERVAL
-    # ? INDEX_INTERVAL > COLLECTION_INTERVAL:
-    #   check if INDEX_INTERVAL is divisible by COLLECTION_INTERVAL
-    if INDEX_INTERVAL > COLLECTION_INTERVAL:
-        # ? INDEX_INTERVAL % COLLECTION_INTERVAL > 0:
-        #   round the INDEX_INTERVAL up to a compatible value
-        if INDEX_INTERVAL % COLLECTION_INTERVAL > 0:
-            INDEX_INTERVAL = INDEX_INTERVAL + COLLECTION_INTERVAL - \
-                             (INDEX_INTERVAL % COLLECTION_INTERVAL)
-            log.warning('The Elasticsearch Index Interval must be \
-greater or equal to than and divisible by the collection Interval. The \
-Elasticsearch Index Interval has been rounded to: %s' % INDEX_INTERVAL)
-
-    # ? INDEX_INTERVAL < COLLECTION_INTERVAL :
-    #   Set INDEX_INTERVAL = COLLECTION_INTERVAL
-    elif INDEX_INTERVAL < COLLECTION_INTERVAL:
-        INDEX_INTERVAL = COLLECTION_INTERVAL
-        log.warning('WARN: The Elasticsearch Index Interval must be greater \
-or equal to than and divisible by the collection Interval. 
The Elasticsearch \ -Index Interval has been rounded to: %s' % INDEX_INTERVAL) - - # INDEX_SKIP = INDEX_INTERVAL / COLLECTION_INTERVAL - INDEX_SKIP = (INDEX_INTERVAL / COLLECTION_INTERVAL) - - # ENSURE INDEX IS COLLECTED ON THE FIRST COLLECTION - SKIP_COUNT = INDEX_SKIP - - -def remove_deprecated_node_stats(): - """Remove deprecated node stats from the list of stats to collect""" - global DEPRECATED_NODE_STATS, ES_VERSION, NODE_STATS_CUR - NODE_STATS_CUR = remove_deprecated_elements(DEPRECATED_NODE_STATS, - NODE_STATS_CUR, - ES_VERSION) - - -def remove_deprecated_threads(): - """Remove deprecated thread_pools from the list of stats to collect""" - global DEPRECATED_THREAD_POOLS, ES_VERSION, THREAD_POOLS - THREAD_POOLS = remove_deprecated_elements(DEPRECATED_THREAD_POOLS, - THREAD_POOLS, - ES_VERSION) + c.collection_interval) def remove_deprecated_elements(deprecated, elements, version): @@ -789,9 +704,9 @@ def remove_deprecated_elements(deprecated, elements, version): # prior to the current version for dep in deprecated: if (major >= dep['major']) \ - or (major == dep['major'] and minor >= dep['minor']) \ - or (major == dep['major'] and minor == dep['minor'] and - revision >= dep['revision']): + or (major == dep['major'] and minor >= dep['minor']) \ + or (major == dep['major'] and minor == dep['minor'] and + revision >= dep['revision']): if type(elements) is list: for key in dep['keys']: if key in elements: @@ -803,271 +718,363 @@ def remove_deprecated_elements(deprecated, elements, version): return elements -# helper methods -def init_stats(): - global ES_HOST, ES_PORT, ES_NODE_URL, ES_URL_SCHEME, ES_CLUSTER_URL, \ - ES_INDEX_URL, ES_VERSION, NODE_STATS_CUR, INDEX_STATS_CUR, \ - CLUSTER_STATS_CUR, ENABLE_INDEX_STATS, ENABLE_CLUSTER_STATS, \ - INDEX_INTERVAL, INDEX_SKIP, COLLECTION_INTERVAL, SKIP_COUNT, \ - DEPRECATED_NODE_STATS, THREAD_POOLS, CONFIGURED_THREAD_POOLS - - sanitize_intervals() - - ES_NODE_URL = ES_URL_SCHEME + "://" + ES_HOST + ":" + str(ES_PORT) + \ - "/_nodes/_local/stats/transport,http,process,jvm,indices,thread_pool" - NODE_STATS_CUR = dict(NODE_STATS.items()) - INDEX_STATS_CUR = dict(INDEX_STATS.items()) - if not ES_VERSION.startswith("1."): - NODE_STATS_CUR.update(NODE_STATS_ES_2) +class Cluster(object): - remove_deprecated_node_stats() - - if ES_VERSION.startswith("1.1") or ES_VERSION.startswith("1.2"): - INDEX_STATS_CUR.update(INDEX_STATS_ES_1_1) - else: - # 1.3 and higher - INDEX_STATS_CUR.update(INDEX_STATS_ES_1_1) - INDEX_STATS_CUR.update(INDEX_STATS_ES_1_3) - - # version agnostic settings - if not ES_INDEX: - # get all index stats - ES_INDEX_URL = ES_URL_SCHEME + "://" + ES_HOST + \ - ":" + str(ES_PORT) + "/_all/_stats" - else: - ES_INDEX_URL = ES_URL_SCHEME + "://" + ES_HOST + ":" + \ - str(ES_PORT) + "/" + ",".join(ES_INDEX) + "/_stats" - - # common thread pools for all ES versions - thread_pools = ['generic', 'index', 'get', 'snapshot', 'bulk', 'warmer', - 'flush', 'search', 'refresh'] - - # Add the 1.0 metrics - if not ES_VERSION.startswith("0."): - thread_pools.extend(['merge', 'optimize']) - - # Add the 2.0 metrics - if not ES_VERSION.startswith("1."): - thread_pools.extend(['suggest', 'percolate', 'management', 'listener', - 'fetch_shard_store', 'fetch_shard_started']) + def __init__(self): + self.collection_interval = 10 + self.es_host = "localhost" + self.es_port = 9200 + self.es_url_scheme = "http" + self.es_username = "" + self.es_password = "" + self.es_cluster = None + self.es_version = None + self.es_index = [] + self.enable_index_stats = True + 
self.enable_cluster_stats = True
+        self.index_interval = 300
+        self.detailed_metrics = True
+        self.configured_thread_pools = set()
+        # copy the module-level set so that AdditionalMetrics entries for
+        # one cluster do not leak into every other cluster
+        self.defaults = set(DEFAULTS)
+        self.master_only = False
+        self.index_summary_only = False
+
+        self.node_stats_cur = {}
+        self.index_stats_cur = {}
+        self.cluster_stats_cur = {}
+
+        self.index_skip = 0
+        self.skip_count = 0
+
+        self.es_master_eligible = None
+
+        self.es_node_url = ""
+        self.es_cluster_url = ""
+        self.es_index_url = ""
+
+        self.thread_pools = []
+
+        self.es_current_master = False
+        self.node_id = None
+
+    def sanitize_intervals(self):
+        """Sanitizes the index interval to be greater than or equal to,
+        and divisible by, the collection interval
+        """
+        # if the index interval is larger but not evenly divisible, round
+        # it up to the next multiple of the collection interval
+        if self.index_interval > self.collection_interval:
+            if self.index_interval % self.collection_interval > 0:
+                self.index_interval = self.index_interval + \
+                    self.collection_interval - \
+                    (self.index_interval % self.collection_interval)
+                log.warning('The Elasticsearch Index Interval must be '
+                            'greater than or equal to, and divisible by, '
+                            'the collection interval. The Elasticsearch '
+                            'Index Interval has been rounded to: %s'
+                            % self.index_interval)
+
+        # if the index interval is smaller, raise it to the collection
+        # interval
+        elif self.index_interval < self.collection_interval:
+            self.index_interval = self.collection_interval
+            log.warning('The Elasticsearch Index Interval must be '
+                        'greater than or equal to, and divisible by, '
+                        'the collection interval. The Elasticsearch '
+                        'Index Interval has been rounded to: %s'
+                        % self.index_interval)
+
+        self.index_skip = self.index_interval / self.collection_interval
+
+        # ensure index stats are collected on the first read
+        self.skip_count = self.index_skip
+
+    def remove_deprecated_node_stats(self):
+        """Remove deprecated node stats from the list of stats to collect"""
+        self.node_stats_cur = remove_deprecated_elements(DEPRECATED_NODE_STATS,
+                                                         self.node_stats_cur,
+                                                         self.es_version)
+
+    def remove_deprecated_threads(self):
+        """Remove deprecated thread_pools from the list of stats to collect"""
+        self.thread_pools = remove_deprecated_elements(DEPRECATED_THREAD_POOLS,
+                                                       self.thread_pools,
+                                                       self.es_version)
+
+    # helper methods
+    def init_stats(self):
+        self.sanitize_intervals()
+
+        self.es_node_url = self.es_url_scheme + "://" + self.es_host + ":" + str(self.es_port) + \
+            "/_nodes/_local/stats/transport,http,process,jvm,indices,thread_pool"
+        self.node_stats_cur = dict(NODE_STATS.items())
+        self.index_stats_cur = dict(INDEX_STATS.items())
+        if not self.es_version.startswith("1."):
+            self.node_stats_cur.update(NODE_STATS_ES_2)
+
+        self.remove_deprecated_node_stats()
+
+        if self.es_version.startswith("1.1") or self.es_version.startswith("1.2"):
+            self.index_stats_cur.update(INDEX_STATS_ES_1_1)
+        else:
+            # 1.3 and higher
+            self.index_stats_cur.update(INDEX_STATS_ES_1_1)
+            self.index_stats_cur.update(INDEX_STATS_ES_1_3)
+
+        # version agnostic settings
+        if not self.es_index:
+            # get all index stats
+            self.es_index_url = self.es_url_scheme + "://" + self.es_host + \
+                                ":" + str(self.es_port) + "/_all/_stats"
+        else:
+            self.es_index_url = self.es_url_scheme + "://" + self.es_host + ":" + \
+                                str(self.es_port) + "/" + ",".join(self.es_index) + "/_stats"
 
-    # Add the 2.1 metrics
-    if not ES_VERSION.startswith("1.") and not ES_VERSION.startswith("2.0"):
-        thread_pools.extend(['force_merge'])
+        # common thread pools for all ES versions
+        thread_pools = ['generic', 'index', 'get', 'snapshot', 'bulk', 'warmer',
+                        'flush', 'search', 'refresh']
 
-    # Legacy support for old configurations without Thread Pools configuration
-    if len(CONFIGURED_THREAD_POOLS) == 0:
-        THREAD_POOLS = list(CONFIGURED_THREAD_POOLS)
-    else:
-        # Filter out the thread pools that aren't specified by user
-        THREAD_POOLS = filter(lambda pool: pool in CONFIGURED_THREAD_POOLS,
-                              thread_pools)
+        # Add the 1.0 metrics
+        if not self.es_version.startswith("0."):
+            thread_pools.extend(['merge', 'optimize'])
 
-    remove_deprecated_threads()
+        # Add the 2.0 metrics
+        if not self.es_version.startswith("1."):
+            thread_pools.extend(['suggest', 'percolate', 'management', 'listener',
+                                 'fetch_shard_store', 'fetch_shard_started'])
 
-    ES_CLUSTER_URL = ES_URL_SCHEME + "://" + ES_HOST + \
-                     ":" + str(ES_PORT) + "/_cluster/health"
+        # Add the 2.1 metrics
+        if not self.es_version.startswith("1.") and not self.es_version.startswith("2.0"):
+            thread_pools.extend(['force_merge'])
 
-    log.notice('Initialized with version=%s, host=%s, port=%s, url=%s' %
-               (ES_VERSION, ES_HOST, ES_PORT, ES_NODE_URL))
+        # Legacy support for old configurations without Thread Pools configuration
+        if len(self.configured_thread_pools) == 0:
+            self.thread_pools = list(self.configured_thread_pools)
+        else:
+            # Filter out the thread pools that aren't specified by user
+            self.thread_pools = filter(lambda pool: pool in self.configured_thread_pools,
+                                       thread_pools)
+        self.remove_deprecated_threads()
 
-# FUNCTION: Collect node stats 
from JSON result -def lookup_node_stat(stat, json): - node = json['nodes'].keys()[0] - val = dig_it_up(json, NODE_STATS_CUR[stat].path % node) + self.es_cluster_url = self.es_url_scheme + "://" + self.es_host + \ + ":" + str(self.es_port) + "/_cluster/health" - # Check to make sure we have a valid result - # dig_it_up returns False if no match found - if not isinstance(val, bool): - return int(val) - else: - return None + log.notice('Initialized with version=%s, host=%s, port=%s, url=%s' % + (self.es_version, self.es_host, self.es_port, self.es_node_url)) + # FUNCTION: Collect node stats from JSON result + def lookup_node_stat(self, stat, json): + node = json['nodes'].keys()[0] + val = dig_it_up(json, self.node_stats_cur[stat].path % node) -def fetch_stats(): - """ - fetches all required stats from ElasticSearch. This method also sets - ES_CLUSTER - """ - global ES_CLUSTER, SKIP_COUNT, INDEX_SKIP, THREAD_POOLS - - node_json_stats = fetch_url(ES_NODE_URL) - if node_json_stats: - if ES_CLUSTER is None: - ES_CLUSTER = node_json_stats['cluster_name'] + # Check to make sure we have a valid result + # dig_it_up returns False if no match found + if not isinstance(val, bool): + return int(val) else: - log.info('Configured with cluster_json_stats=%s' % ES_CLUSTER) - log.info('Parsing node_json_stats') - parse_node_stats(node_json_stats, NODE_STATS_CUR) - log.info('Parsing thread pool stats') - parse_thread_pool_stats(node_json_stats, THREAD_POOLS) - - # check the current master - detect_es_master() - - # load cluster and index stats only on master eligible nodes, this - # avoids collecting too many metrics if the cluster has a lot of nodes - if ENABLE_CLUSTER_STATS and ES_MASTER_ELIGIBLE: - cluster_json_stats = fetch_url(ES_CLUSTER_URL) - log.info('Parsing cluster stats') - parse_cluster_stats(cluster_json_stats, CLUSTER_STATS) - - if (ENABLE_INDEX_STATS and ES_MASTER_ELIGIBLE and - SKIP_COUNT >= INDEX_SKIP) \ - and ((MASTER_ONLY and ES_CURRENT_MASTER) - or (not MASTER_ONLY)): - # Reset skip count - SKIP_COUNT = 0 - indices = fetch_url(ES_INDEX_URL) - if indices: - if INDEX_SUMMARY_ONLY: - log.info('Parsing index stats for _all summary') - parse_index_stats(indices['_all'], '_all') - else: - indexes_json_stats = indices['indices'] - for index_name in indexes_json_stats.keys(): - log.info('Parsing index stats for index: %s' % index_name) - parse_index_stats(indexes_json_stats[index_name], - index_name) - # Increment skip count - SKIP_COUNT += 1 + return None + def fetch_stats(self): + """ + fetches all required stats from ElasticSearch. 
This method also sets + self.es_cluster + """ -def fetch_url(url): - response = None - try: - log.info('Fetching api information from: %s' % url) - request = urllib2.Request(url) - if ES_USERNAME: - authheader = base64.encodestring('%s:%s' % - (ES_USERNAME, ES_PASSWORD) - ).replace('\n', '') - request.add_header("Authorization", "Basic %s" % authheader) - ctx = None - if ES_URL_SCHEME == "https": - ctx = ssl._create_unverified_context() - response = urllib2.urlopen(request, context=ctx, timeout=10) - log.info('Raw api response: %s' % response) - return json.load(response) - except (urllib2.URLError, urllib2.HTTPError), e: - log.error('Error connecting to %s - %r : %s' % - (url, e, e)) - return None - finally: - if response is not None: - response.close() - - -def load_es_info(): - global ES_VERSION, ES_CLUSTER, ES_MASTER_ELIGIBLE, NODE_ID - - json = fetch_url(ES_URL_SCHEME + "://" + ES_HOST + ":" + str(ES_PORT) + - "/_nodes/_local") - if json is None: - # assume some sane defaults - if ES_VERSION is None: - ES_VERSION = "1.0.0" - if ES_CLUSTER is None: - ES_CLUSTER = "elasticsearch" - ES_MASTER_ELIGIBLE = True - log.warning('Unable to determine node \ -information, defaulting to version %s, cluster %s and master %s' % - (ES_VERSION, ES_CLUSTER, ES_MASTER_ELIGIBLE)) - return - - # Identify the current node - NODE_ID = json['nodes'].keys()[0] - log.notice('current node id: %s' % NODE_ID) - - cluster_name = json['cluster_name'] - # we should have only one entry with the current node information - node_info = json['nodes'].itervalues().next() - version = node_info['version'] - # a node is master eligible by default unless it's configured otherwise - master_eligible = True - if 'node' in node_info['settings'] and \ - 'master' in node_info['settings']['node']: - master_eligible = node_info['settings']['node']['master'] == 'true' - - # update global settings - ES_MASTER_ELIGIBLE = master_eligible - if ES_VERSION is None: - ES_VERSION = version - if ES_CLUSTER is None: - ES_CLUSTER = cluster_name - - log.notice('version: %s, cluster: %s, master eligible: %s' % - (ES_VERSION, ES_CLUSTER, ES_MASTER_ELIGIBLE)) - - -def detect_es_master(): - """Determines if this is the current master. This method sets - ES_CURRENT_MASTER""" - global ES_CURRENT_MASTER - # determine current master - cluster_state = fetch_url(ES_URL_SCHEME + "://" + ES_HOST + ":" - + str(ES_PORT) + "/_cluster/state/master_node") - if ES_CURRENT_MASTER is False and cluster_state['master_node'] == NODE_ID: - ES_CURRENT_MASTER = True - log.notice('current master: %s' % ES_CURRENT_MASTER) - elif ES_CURRENT_MASTER is True and cluster_state['master_node'] != NODE_ID: - ES_CURRENT_MASTER = False - log.notice('current master: %s' % ES_CURRENT_MASTER) - else: - log.debug('current master: %s' % ES_CURRENT_MASTER) - - -def parse_node_stats(json, stats): - """Parse node stats response from ElasticSearch""" - for name, key in stats.iteritems(): - if DETAILED_METRICS is True or name in DEFAULTS: - result = lookup_node_stat(name, json) - dispatch_stat(result, name, key) - - -def parse_thread_pool_stats(json, stats): - """Parse thread pool stats response from ElasticSearch""" - for pool in THREAD_POOLS: - for metric_type, value in THREAD_POOL_METRICS.iteritems(): - for attr in value: - name = 'thread_pool.{0}'.format(attr) - key = Stat(metric_type, 'nodes.%s.thread_pool.{0}.{1}'. 
- format(pool, attr)) - if DETAILED_METRICS is True or name in DEFAULTS: - node = json['nodes'].keys()[0] - result = dig_it_up(json, key.path % node) - # Check to make sure we have a valid result - # dig_it_up returns False if no match found - if not isinstance(result, bool): - result = int(result) - else: - result = None - - dispatch_stat(result, name, key, {'thread_pool': pool}) - - -def parse_cluster_stats(json, stats): - """Parse cluster stats response from ElasticSearch""" - # convert the status color into a number - json['status'] = CLUSTER_STATUS[json['status']] - for name, key in stats.iteritems(): - if DETAILED_METRICS is True or name in DEFAULTS: - result = dig_it_up(json, key.path) - dispatch_stat(result, name, key) - - -def parse_index_stats(json, index_name): - """Parse index stats response from ElasticSearch""" - for name, key in INDEX_STATS_CUR.iteritems(): - # filter default metrics - if DETAILED_METRICS is True or \ - name.replace("[index={index_name}]", "") in DEFAULTS: - result = dig_it_up(json, key.path) - # update the index name in the type_instance to include - # the index as a dimensions - name = name.format(index_name=sanitize_type_instance(index_name)) - dispatch_stat(result, name, key) + node_json_stats = self.fetch_url(self.es_node_url) + if node_json_stats: + if self.es_cluster is None: + self.es_cluster = node_json_stats['cluster_name'] + else: + log.info('Configured with cluster_json_stats=%s' % self.es_cluster) + log.info('Parsing node_json_stats') + self.parse_node_stats(node_json_stats, self.node_stats_cur) + log.info('Parsing thread pool stats') + self.parse_thread_pool_stats(node_json_stats, self.thread_pools) + + # check the current master + self.detect_es_master() + + # load cluster and index stats only on master eligible nodes, this + # avoids collecting too many metrics if the cluster has a lot of nodes + if self.enable_cluster_stats and self.es_master_eligible: + cluster_json_stats = self.fetch_url(self.es_cluster_url) + log.info('Parsing cluster stats') + self.parse_cluster_stats(cluster_json_stats, CLUSTER_STATS) + + if (self.enable_index_stats and self.es_master_eligible and + self.skip_count >= self.index_skip) \ + and ((self.master_only and self.es_current_master) + or (not self.master_only)): + # Reset skip count + self.skip_count = 0 + indices = self.fetch_url(self.es_index_url) + if indices: + if self.index_summary_only: + log.info('Parsing index stats for _all summary') + self.parse_index_stats(indices['_all'], '_all') + else: + indexes_json_stats = indices['indices'] + for index_name in indexes_json_stats.keys(): + log.info('Parsing index stats for index: %s' % index_name) + self.parse_index_stats(indexes_json_stats[index_name], index_name) + # Increment skip count + self.skip_count += 1 + + def fetch_url(self, url): + response = None + try: + log.info('Fetching api information from: %s' % url) + request = urllib2.Request(url) + if self.es_username: + authheader = base64.encodestring('%s:%s' % + (self.es_username, self.es_password) + ).replace('\n', '') + request.add_header("Authorization", "Basic %s" % authheader) + ctx = None + if self.es_url_scheme == "https": + ctx = ssl._create_unverified_context() + response = urllib2.urlopen(request, context=ctx, timeout=10) + log.info('Raw api response: %s' % response) + return json.load(response) + except (urllib2.URLError, urllib2.HTTPError), e: + log.error('Error connecting to %s - %r : %s' % + (url, e, e)) + return None + finally: + if response is not None: + response.close() + + def 
load_es_info(self):
+        json = self.fetch_url(self.es_url_scheme + "://" + self.es_host + ":" +
+                              str(self.es_port) + "/_nodes/_local")
+        if json is None:
+            # assume some sane defaults
+            if self.es_version is None:
+                self.es_version = "1.0.0"
+            if self.es_cluster is None:
+                self.es_cluster = "elasticsearch"
+            self.es_master_eligible = True
+            log.warning('Unable to determine node information, defaulting '
+                        'to version %s, cluster %s and master %s' %
+                        (self.es_version, self.es_cluster,
+                         self.es_master_eligible))
+            return
+
+        # Identify the current node
+        self.node_id = json['nodes'].keys()[0]
+        log.notice('current node id: %s' % self.node_id)
+
+        cluster_name = json['cluster_name']
+        # we should have only one entry with the current node information
+        node_info = json['nodes'].itervalues().next()
+        version = node_info['version']
+        # a node is master eligible by default unless it's configured otherwise
+        master_eligible = True
+        if 'node' in node_info['settings'] and \
+           'master' in node_info['settings']['node']:
+            master_eligible = node_info['settings']['node']['master'] == 'true'
+
+        # update settings
+        self.es_master_eligible = master_eligible
+        if self.es_version is None:
+            self.es_version = version
+        if self.es_cluster is None:
+            self.es_cluster = cluster_name
+
+        log.notice('version: %s, cluster: %s, master eligible: %s' %
+                   (self.es_version, self.es_cluster, self.es_master_eligible))
+
+    def detect_es_master(self):
+        """Determines if this is the current master. This method sets
+        self.es_current_master"""
+        # determine current master
+        cluster_state = self.fetch_url(self.es_url_scheme + "://" + self.es_host + ":" +
+                                       str(self.es_port) + "/_cluster/state/master_node")
+        if self.es_current_master is False and cluster_state['master_node'] == self.node_id:
+            self.es_current_master = True
+            log.notice('current master: %s' % self.es_current_master)
+        elif self.es_current_master is True and cluster_state['master_node'] != self.node_id:
+            self.es_current_master = False
+            log.notice('current master: %s' % self.es_current_master)
+        else:
+            log.debug('current master: %s' % self.es_current_master)
+
+    def parse_node_stats(self, json, stats):
+        """Parse node stats response from ElasticSearch"""
+        for name, key in stats.iteritems():
+            if self.detailed_metrics is True or name in self.defaults:
+                result = self.lookup_node_stat(name, json)
+                self.dispatch_stat(result, name, key)
+
+    def parse_thread_pool_stats(self, json, stats):
+        """Parse thread pool stats response from ElasticSearch"""
+        for pool in self.thread_pools:
+            for metric_type, value in THREAD_POOL_METRICS.iteritems():
+                for attr in value:
+                    name = 'thread_pool.{0}'.format(attr)
+                    key = Stat(metric_type, 'nodes.%s.thread_pool.{0}.{1}'. 
+ format(pool, attr)) + if self.detailed_metrics is True or name in self.defaults: + node = json['nodes'].keys()[0] + result = dig_it_up(json, key.path % node) + # Check to make sure we have a valid result + # dig_it_up returns False if no match found + if not isinstance(result, bool): + result = int(result) + else: + result = None + + self.dispatch_stat(result, name, key, {'thread_pool': pool}) + + def parse_cluster_stats(self, json, stats): + """Parse cluster stats response from ElasticSearch""" + # convert the status color into a number + json['status'] = CLUSTER_STATUS[json['status']] + for name, key in stats.iteritems(): + if self.detailed_metrics is True or name in self.defaults: + result = dig_it_up(json, key.path) + self.dispatch_stat(result, name, key) + + def parse_index_stats(self, json, index_name): + """Parse index stats response from ElasticSearch""" + for name, key in self.index_stats_cur.iteritems(): + # filter default metrics + if self.detailed_metrics is True or \ + name.replace("[index={index_name}]", "") in self.defaults: + result = dig_it_up(json, key.path) + # update the index name in the type_instance to include + # the index as a dimensions + name = name.format(index_name=sanitize_type_instance(index_name)) + self.dispatch_stat(result, name, key) + + def dispatch_stat(self, result, name, key, dimensions=None): + """Read a key from info response data and dispatch a value""" + log.info(('Parameters to be emitted:\n name: {n}\n key: {k}' + '\n dimensions: {d}\n result: {r}').format(n=name, + k=key, + d=dimensions, + r=result)) + if result is None: + log.warning('Value not found for %s' % name) + return + estype = key.type + value = int(result) + log.info('Sending value[%s]: %s=%s' % (estype, name, value)) + + val = collectd.Values(plugin='elasticsearch') + val.plugin_instance = self.es_cluster + + # If dimensions are provided, format them and append + # them to the plugin_instance + if dimensions: + val.plugin_instance += '[{dims}]'.format(dims=','.join(['='.join(d) + for d in dimensions.items()])) + + val.type = estype + val.type_instance = name + val.values = [value] + val.meta = {'0': True} + log.info('Emitting value: %s' % val) + val.dispatch() def sanitize_type_instance(index_name): @@ -1081,37 +1088,6 @@ def sanitize_type_instance(index_name): return ascii_index_name.replace('/', '_') -def dispatch_stat(result, name, key, dimensions=None): - """Read a key from info response data and dispatch a value""" - log.info(('Parameters to be emitted:\n name: {n}\n key: {k}' - '\n dimensions: {d}\n result: {r}').format(n=name, - k=key, - d=dimensions, - r=result)) - if result is None: - log.warning('Value not found for %s' % name) - return - estype = key.type - value = int(result) - log.info('Sending value[%s]: %s=%s' % (estype, name, value)) - - val = collectd.Values(plugin='elasticsearch') - val.plugin_instance = ES_CLUSTER - - # If dimensions are provided, format them and append - # them to the plugin_instance - if dimensions: - val.plugin_instance += '[{dims}]'.format(dims=','.join(['='.join(d) - for d in dimensions.items()])) - - val.type = estype - val.type_instance = name - val.values = [value] - val.meta = {'0': True} - log.info('Emitting value: %s' % val) - val.dispatch() - - def dig_it_up(obj, path): try: if type(path) in (str, unicode): @@ -1249,36 +1225,35 @@ def notice(self, msg): log.addHandler(handle) -def configure_test(): +def configure_test(cluster): """Configure the plugin for testing""" - global CONFIGURED_THREAD_POOLS, DETAILED_METRICS, INDEX_INTERVAL, 
\ - ENABLE_INDEX_STATS, ENABLE_CLUSTER_STATS, ES_MASTER_ELIGIBLE # Ensure all possible threadpools are eligible for collection - CONFIGURED_THREAD_POOLS = set(['generic', 'index', 'get', 'snapshot', + cluster.configured_thread_pools = set(['generic', 'index', 'get', 'snapshot', 'bulk', 'warmer', 'flush', 'search', 'refresh', 'suggest', 'percolate', 'management', 'listener', 'fetch_shard_store', 'fetch_shard_started', 'force_merge', 'merge', 'optimize', ]) - DETAILED_METRICS = True - INDEX_INTERVAL = 10 - ENABLE_INDEX_STATS = True - ENABLE_CLUSTER_STATS = True - ES_MASTER_ELIGIBLE = True + cluster.detailed_metrics = True + cluster.index_interval = 10 + cluster.enable_index_stats = True + cluster.enable_cluster_stats = True + cluster.es_master_eligible = True if __name__ == '__main__': import sys + c = Cluster() # allow user to override ES host name for easier testing if len(sys.argv) > 1: - ES_HOST = sys.argv[1] + c.es_host = sys.argv[1] handle.verbose = True + configure_test(c) collectd = CollectdMock() - configure_test() - load_es_info() - init_stats() - fetch_stats() + c.load_es_info() + c.init_stats() + c.fetch_stats() else: import collectd collectd.register_config(configure_callback)
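
Note on testing: the __main__ block above exercises a single Cluster against
one node. The same path can be smoke-tested for several clusters outside of
collectd. The snippet below is a minimal sketch, not part of this patch; it
assumes Python 2, nodes listening on localhost:9200 and localhost:10200, and
reuses the CollectdMock stub that __main__ already uses. The smoke_test.py
name and the collectd import stub are illustrative only:

    # smoke_test.py -- hypothetical standalone check, not part of this patch
    import sys
    import types

    # stub out "import collectd" so the plugin can be imported outside collectd
    stub = types.ModuleType('collectd')
    stub.register_config = lambda *args, **kwargs: None
    sys.modules['collectd'] = stub

    import elasticsearch_collectd as esc

    # swap in the mock that the plugin's own __main__ block uses
    esc.collectd = esc.CollectdMock()

    for port in (9200, 10200):
        c = esc.Cluster()
        c.es_port = port        # es_host keeps its "localhost" default
        esc.configure_test(c)   # enable all stats, as __main__ does
        c.load_es_info()        # read version and cluster name from the node
        c.init_stats()          # build this cluster's stat tables and URLs
        c.fetch_stats()         # poll once, dispatching through the mock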