(stream) add InfluxDB 2 support #38
Comments
Hi, yes why not. Do you have any pointer to the v2 InfluxDB documentation please? |
Hello. Using the libraries (sc_events) and an event (category=3 and element=1), I'm able to retrieve the information, but I'm pretty sure that I need to do something else. Here is a sample of an event:
{ ["sc_logger"] = { ["logfile"] = /var/log/centreon-broker/influxdb-metrics-apiv2.log,["severity"] = 3,} ,["params"] = { ["host_severity_operator"] = >=,["accepted_authors"] = ,["__internal_ts_last_flush"] = 1624450359,["accepted_servicegroups"] = ,["max_buffer_size"] = 1,["ack_service_status"] = 0,1,2,3,["influxdb2_config"] = { } ,["skip_anon_events"] = 1,["state_type_mapping"] = { [0] = SOFT,[1] = HARD,} ,["service_status"] = 0,1,2,3,["hard_only"] = 1,["category_mapping"] = { ["correlation"] = 4,["bbdo"] = 2,["extcmd"] = 7,["bam"] = 6,["neb"] = 1,["dumper"] = 5,["storage"] = 3,} ,["local_time_diff_from_utc"] = 3600.0,["host_status"] = 0,1,2,["accepted_bvs"] = ,["element_mapping"] = { [1] = { ["log_entry"] = 17,["host_dependency"] = 9,["host"] = 12,["host_check"] = 8,["instance_configuration"] = 25,["service_group"] = 21,["flapping_status"] = 7,["comment"] = 2,["host_parent"] = 13,["service_status"] = 24,["acknowledgement"] = 1,["service"] = 23,["service_group_member"] = 22,["service_dependency"] = 20,["event_handler"] = 6,["service_check"] = 19,["custom_variable_status"] = 4,["host_group_member"] = 11,["custom_variable"] = 3,["host_status"] = 14,["module"] = 18,["host_group"] = 10,["instance"] = 15,["instance_status"] = 16,["downtime"] = 5,} ,[6] = { ["dimension_ba_timeperiod_relation"] = 14,["meta_service_status"] = 3,["dimension_timeperiod"] = 13,["ba_status"] = 1,["dimension_bv_event"] = 10,["dimension_truncate_table_signal"] = 11,["bam_rebuild"] = 12,["ba_event"] = 4,["inherited_downtime"] = 17,["dimension_timeperiod_exclusion"] = 16,["dimension_ba_event"] = 7,["dimension_kpi_event"] = 8,["kpi_status"] = 2,["ba_duration_event"] = 6,["dimension_timeperiod_exception"] = 15,["kpi_event"] = 5,["dimension_ba_bv_relation_event"] = 9,} ,[3] = { ["remove_graph"] = 3,["metric"] = 1,["rebuild"] = 2,["metric_mapping"] = 6,["status"] = 4,["index_mapping"] = 5,} ,} ,["timestamp_conversion_format"] = %Y-%m-%d %X,["influx_org"] = Serca,["ba_status"] = 0,1,2,["dt_host_status"] = 0,1,2,["acknowledged"] = 0,["influx_token"] = EXRksyBOp4HrpPqekVleVNPZTk80MaEN2gwe2sED2vzIwoYiJNVr5jcdSl8H11YEEUo0p8dQdVrSZZ0EqFb7mg==,["influx_bucket"] = Initial_Bucket,["skip_nil_id"] = 1,["accepted_pollers"] = ,["validatedEvents"] = { } ,["status_mapping"] = { [1] = { [24] = { [0] = OK,[1] = WARNING,[2] = CRITICAL,[3] = UNKNOWN,} ,[5] = { [2] = { [0] = UP,[1] = DOWN,[2] = UNREACHABLE,} ,[1] = { [0] = OK,[1] = WARNING,[2] = CRITICAL,[3] = UNKNOWN,} ,} ,[14] = { [0] = UP,[1] = DOWN,[2] = UNREACHABLE,} ,} ,[6] = { [1] = { [0] = OK,[1] = WARNING,[2] = CRITICAL,} ,} ,[3] = { } ,} ,["max_buffer_age"] = 5,["ack_host_status"] = 0,1,2,["enable_service_status_dedup"] = 0,["enable_host_status_dedup"] = 0,["influx_host"] = localhost,["accepted_categories"] = storage,["accepted_elements"] = metric,["in_downtime"] = 0,["service_severity_operator"] = >=,["max_stored_events"] = 10,["accepted_hostgroups"] = ,["dt_service_status"] = 0,1,2,3,} ,["sc_broker"] = { ["logger"] = { ["logfile"] = /var/log/centreon-broker/influxdb-metrics-apiv2.log,["severity"] = 3,} ,} ,["event"] = { ["rrd_len"] = 15552000,["interval"] = 120,["ctime"] = 1624450376,["element"] = 1,["category"] = 3,["service_id"] = 202,["value_type"] = 0,["name"] = Failures,["host_id"] = 27,["cache"] = { } ,["is_for_rebuild"] = false,["value"] = 0.0,["_type"] = 196609,["metric_id"] = 752,} ,["sc_common"] = { ["logger"] = { ["logfile"] = /var/log/centreon-broker/influxdb-metrics-apiv2.log,["severity"] = 3,} ,} ,} |
Hello, here is some information you need to know. You are talking about category 3 and element 1, which means that you want to send a metric event from the storage category. Here is the bad news: this is deprecated, as the library code itself states:
--- is_valid_storage: DEPRECATED method, use NEB category to get metric data instead
-- @return true (boolean)
function ScEvent:is_valid_storage_event()
  return true
end
If you want to get your metrics, you'll need to use category 1, element 24 (the service_status event from the neb category). This neb event contains the perfdata information, along with everything else that a service_status event carries. Knowing that, you can use the documented broker.parse_perfdata() method:
local perf, err_str = broker.parse_perfdata("pl=45%;40;80;0;100", true)
if perf then
  print("Content of 'pl'")
  for i,v in pairs(perf['pl']) do
    print(i .. " => " .. tostring(v))
  end
else
  print("parse_perfdata error: " .. err_str)
end
will output
Content of 'pl'
value => 45
uom => %
min => 0
max => 100
warning_low => 0
warning_high => 40
warning_mode => false
critical_low => 0
critical_high => 80
critical_mode => false
Based on all the above information, you can now create your stream connector the same way as an event stream connector. You just need to format your data so that it sends the metric data instead of the status information. Have a nice day |
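To make the "format your data" step more concrete, here is a minimal sketch of turning parsed perfdata into an InfluxDB line protocol point. It assumes the input table has the same shape as the broker.parse_perfdata(..., true) output shown above; perfdata_to_fields and build_point are hypothetical helper names, not part of the Centreon library, and the measurement name centreon_metrics is only an example. Tag values and metric names are assumed to contain no spaces, commas or equals signs here.

```lua
-- Hypothetical helper: build the field set ("name=value,...") from a table
-- shaped like the result of broker.parse_perfdata(perfdata, true).
local function perfdata_to_fields(perf)
  local names = {}
  for name, metric in pairs(perf) do
    -- keep only metrics that carry a numeric value
    if type(metric) == "table" and type(metric.value) == "number" then
      names[#names + 1] = name
    end
  end
  -- sort the names so the output is deterministic
  table.sort(names)
  local fields = {}
  for _, name in ipairs(names) do
    fields[#fields + 1] = name .. "=" .. string.format("%.14g", perf[name].value)
  end
  return table.concat(fields, ",")
end

-- Hypothetical helper: assemble "measurement,tags fields timestamp".
local function build_point(host, description, perf, timestamp)
  return "centreon_metrics,host=" .. host .. ",description=" .. description
    .. " " .. perfdata_to_fields(perf) .. " " .. tostring(timestamp)
end

-- Example input, written by hand to mimic the parse_perfdata output
local sample = {
  pl = { value = 45, uom = "%", min = 0, max = 100 },
  rta = { value = 0.5, uom = "ms" },
}
print(build_point("srv1", "Ping", sample, 1624450376))
```

Sending all metrics of one service as fields of a single point keeps the number of HTTP requests low; whether that fits your dashboards better than one point per metric is a design choice.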
Working Lua script:
#!/usr/bin/lua
local sc_common = require("centreon-stream-connectors-lib.sc_common")
local sc_logger = require("centreon-stream-connectors-lib.sc_logger")
local sc_broker = require("centreon-stream-connectors-lib.sc_broker")
local sc_event = require("centreon-stream-connectors-lib.sc_event")
local sc_params = require("centreon-stream-connectors-lib.sc_params")
local EventQueue = {}
-- debugging helper: serialize a value (recursively for tables) into a string
local function dump(o)
  if type(o) == 'table' then
    local s = '{ '
    for k,v in pairs(o) do
      if type(k) ~= 'number' then k = '"'..k..'"' end
      s = s .. '['..k..'] = ' .. dump(v) .. ','
    end
    return s .. '} '
  else
    return tostring(o)
  end
end
function EventQueue.new(params)
  local self = {}
  -- initiate EventQueue variables
  self.events = {}
  self.fail = false
  -- set up log configuration
  local logfile = params.logfile or "/var/log/centreon-broker/influxdb-metrics-apiv2.log"
  local log_level = params.log_level or 3
  -- initiate mandatory objects
  self.sc_logger = sc_logger.new(logfile, log_level)
  self.sc_common = sc_common.new(self.sc_logger)
  self.sc_broker = sc_broker.new(self.sc_logger)
  self.sc_params = sc_params.new(self.sc_common, self.sc_logger)
  -- initiate parameters dedicated to this stream connector
  self.sc_params.params.influx_host = params.influx_host
  self.sc_params.params.influx_org = params.influx_org
  self.sc_params.params.influx_bucket = params.influx_bucket
  self.sc_params.params.influx_token = params.influx_token
  -- overriding default parameters for this stream connector
  params.accepted_categories = "neb"
  params.accepted_elements = "service_status"
  -- checking mandatory parameters and setting a fail flag
  if not params.influx_host then
    self.sc_logger:error("influx_host is a mandatory parameter")
    self.fail = true
  end
  if not params.influx_org then
    self.sc_logger:error("influx_org is a mandatory parameter")
    self.fail = true
  end
  if not params.influx_bucket then
    self.sc_logger:error("influx_bucket is a mandatory parameter")
    self.fail = true
  end
  if not params.influx_token then
    self.sc_logger:error("influx_token is a mandatory parameter")
    self.fail = true
  end
  -- apply users params and check syntax of standard ones
  self.sc_params:param_override(params)
  self.sc_params:check_params()
  -- return EventQueue object
  setmetatable(self, { __index = EventQueue })
  return self
end
--------------------------------------------------------------------------------
-- EventQueue:send_data, send data to external tool
-- @return (boolean)
--------------------------------------------------------------------------------
function EventQueue:send_data ()
  local http = require("socket.http")
  local ltn12 = require("ltn12")
  local params = self.sc_params.params
  local data = self.sc_event.event.formated_event
  local success = true
  local influx_url = "http://" .. params.influx_host .. ":8086/api/v2/write?org="
  influx_url = influx_url .. params.influx_org .. "&bucket=" .. params.influx_bucket
  -- InfluxDB 2 expects the API token in the Authorization header
  local auth = "Token " .. params.influx_token
  -- commas, equals signs and spaces must be backslash-escaped in tag values and field keys,
  -- otherwise InfluxDB fails to insert the point
  local host = data.my_host:gsub("([,= ])", "\\%1")
  local description = data.my_description:gsub("([,= ])", "\\%1")
  local reqbody = "centreon_metrics,host=" .. host .. ",description=" .. description
  self.sc_logger:debug("EventQueue:send_data: reqbody " .. tostring(reqbody))
  -- send one point per metric found in the perfdata
  for k,v in pairs(data.my_perf) do
    if type(v) == "table" and v.value ~= nil then
      local field = k:gsub("([,= ])", "\\%1")
      -- timestamp in nanoseconds, the default precision of the write API
      local temp_reqbody = reqbody .. " " .. field .. "=" .. v.value .. " " .. os.time() .. "000000000"
      local respbody = {}
      self.sc_logger:debug("EventQueue:send_data: perf " .. tostring(k) .. " " .. tostring(temp_reqbody))
      local result, respcode = http.request {
        method = "POST",
        url = influx_url,
        source = ltn12.source.string(temp_reqbody),
        headers = {
          ["Authorization"] = auth,
          ["Content-Type"] = "text/plain; charset=utf-8",
          ["content-length"] = tostring(#temp_reqbody)
        },
        sink = ltn12.sink.table(respbody)
      }
      -- InfluxDB answers 204 (No Content) when the write is accepted
      if result == nil or respcode ~= 204 then
        self.sc_logger:error("EventQueue:send_data: write failed (" .. tostring(respcode) .. ") " .. table.concat(respbody))
        success = false
      else
        self.sc_logger:debug("EventQueue:send_data: url result " .. tostring(respcode))
      end
    end
  end
  return success
end
--------------------------------------------------------------------------------
-- EventQueue:flush, flush stored events
-- Called when the max number of events or the max age are reached
-- @return (boolean)
--------------------------------------------------------------------------------
function EventQueue:flush ()
  self.sc_logger:debug("EventQueue:flush: sending stored events")
  -- send stored events
  local retval = self:send_data()
  -- reset stored events list
  self.events = {}
  -- and update the timestamp
  self.sc_params.params.__internal_ts_last_flush = os.time()
  return retval
end
--------------------------------------------------------------------------------
-- EventQueue:format_event, build your own table with the desired information
-- @return true (boolean)
--------------------------------------------------------------------------------
function EventQueue:format_event()
  -- starting to handle shared information between host and service
  self.sc_event.event.formated_event = {
    -- name of host has been stored in a cache table when calling is_valid_event()
    my_host = self.sc_event.event.cache.host.name,
    my_description = self.sc_event.event.cache.service.description,
    my_perf = {}
  }
  local perf, err_str = broker.parse_perfdata(self.sc_event.event.perfdata, true)
  if perf then
    self.sc_event.event.formated_event.my_perf = perf
  else
    self.sc_logger:error("EventQueue:format_event: parse_perfdata error: " .. tostring(err_str))
  end
  self:add()
  return true
end
--------------------------------------------------------------------------------
-- EventQueue:add, add an event to the sending queue
--------------------------------------------------------------------------------
function EventQueue:add ()
  -- store event in self.events list
  self.events[#self.events + 1] = self.sc_event.event.formated_event
end
local queue
function init(params)
  queue = EventQueue.new(params)
end
function write(event)
  -- skip event if a mandatory parameter is missing
  if queue.fail then
    queue.sc_logger:error("Skipping event because a mandatory parameter is not set")
    return true
  end
  -- initiate event object
  queue.sc_event = sc_event.new(event, queue.sc_params.params, queue.sc_common, queue.sc_logger, queue.sc_broker)
  -- drop event if wrong category
  if not queue.sc_event:is_valid_category() then
    return true
  end
  -- drop event if wrong element
  if not queue.sc_event:is_valid_element() then
    return true
  end
  -- First, are there some old events waiting in the flush queue ?
  if (#queue.events > 0 and os.time() - queue.sc_params.params.__internal_ts_last_flush > queue.sc_params.params.max_buffer_age) then
    queue.sc_logger:debug("write: Queue max age (" .. os.time() - queue.sc_params.params.__internal_ts_last_flush .. "/" .. queue.sc_params.params.max_buffer_age .. ") is reached, flushing data")
    queue:flush()
  end
  -- Then we check that the event queue is not already full
  if (#queue.events >= queue.sc_params.params.max_buffer_size) then
    queue.sc_logger:debug("write: Queue max size (" .. #queue.events .. "/" .. queue.sc_params.params.max_buffer_size .. ") is reached BEFORE APPENDING AN EVENT, trying to flush data before appending more events, after 1 second pause.")
    os.execute("sleep 1")
    queue:flush()
  end
  -- drop event if it is not validated
  if queue.sc_event:is_valid_event() then
    queue:format_event()
  else
    return true
  end
  -- Then we check whether it is time to send the events to the receiver and flush
  if (#queue.events >= queue.sc_params.params.max_buffer_size) then
    queue.sc_logger:debug("write: Queue max size (" .. #queue.events .. "/" .. queue.sc_params.params.max_buffer_size .. ") is reached, flushing data")
    queue:flush()
  end
  return true
end
|
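The script's own comment warns that a description containing a space must be escaped before it is sent to InfluxDB. As a minimal sketch of that idea: in line protocol, commas, equals signs and spaces inside tag keys, tag values and field keys are escaped with a backslash. The helper name escape_tag is hypothetical and is not part of the stream connector library.

```lua
-- Hypothetical helper: backslash-escape the characters that are special
-- inside a line protocol tag key, tag value or field key.
local function escape_tag(value)
  -- the extra parentheses drop gsub's second return value (the match count)
  return (tostring(value):gsub("([,= ])", "\\%1"))
end

print(escape_tag("Disk usage"))   -- Disk\ usage
print(escape_tag("a,b=c"))        -- a\,b\=c
print("centreon_metrics,host=" .. escape_tag("srv 1")
  .. ",description=" .. escape_tag("Disk usage"))
```

Field values follow different rules (string values are double-quoted, for example), so this helper should only be applied to tags and field keys.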
Hello, thanks for that. I've edited your comment to put it in a code section and I've removed the token that was in the code. In the coming days or months I'll put it in our official repo, with some changes to make it use our latest features. Is there a company that must be credited in addition to you? |
:) No, put uakm2201 and I'll be happy |
Here is the 2 (new) |
As stated elsewhere, would be nice to keep the various upcoming InfluxDB Stream Connectors compatible with the current one. |
The current one does not work with InfluxDB v2, which is why I pushed this one ... |
I've pushed, sorry |
Thanks for the files.
Thanks in advance. |
Hi,
Is there a plan to support InfluxDB 2?
API v1 still works, but InfluxDB asks users to switch to v2.
Thanks.
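For context on what changes between the two APIs: the v1 write endpoint is /write with a db parameter, while v2 uses /api/v2/write with org and bucket parameters and a "Authorization: Token ..." header. Here is a minimal sketch of building the v2 write URL. The helper names url_encode and build_write_url are hypothetical, port 8086 and plain http are assumptions taken from the script in this thread, and the organization name in the example is made up.

```lua
-- Hypothetical helper: percent-encode everything except unreserved characters.
local function url_encode(s)
  return (tostring(s):gsub("[^%w%-_%.~]", function(c)
    return string.format("%%%02X", string.byte(c))
  end))
end

-- Hypothetical helper: build the InfluxDB 2 write URL.
-- precision is one of "ns", "us", "ms" or "s"; "ns" is the API default.
local function build_write_url(host, org, bucket, precision)
  return "http://" .. host .. ":8086/api/v2/write"
    .. "?org=" .. url_encode(org)
    .. "&bucket=" .. url_encode(bucket)
    .. "&precision=" .. (precision or "ns")
end

print(build_write_url("localhost", "My Org", "Initial_Bucket", "s"))
```

With precision=s, the value of os.time() can be used directly as the point timestamp instead of appending nine zeros to it.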