Stream connectors
Centreon Broker provides a stream connector. If no suitable connector is found among those provided, then this stream connector is sure to fulfill your needs. Its principle is to expose a part of the Centreon Broker API through the Lua interpreter, and the user simply inputs their requirements.
The exposed Centreon Broker
Here is the minimal acceptable Lua code for operation as stream connector:
function init(conf)
end
function write(d)
return true
end
We recommend placing Lua scripts in the /usr/share/centreon-broker/lua
directory. If this directory does not exist, you can create it. Just make sure the
directory is accessible to the centreon-broker user. If a stream connector
is composed of several files (a main script and a module, for example), you can
put them in that directory. If a dynamic library (\*.so
file) is used by
a Lua script, put it in the /usr/share/centreon-broker/lua/lib
directory.
When Centreon Broker starts, it initializes all the configured connectors.
For the stream connector, it loads the Lua script, checks its syntax and
verifies that the init()
and write()
functions exist.
Centreon Broker also checks if a filter(category, element)
function exists.
Let's focus on those functions. The init
function is called when the
connector is initialized. The argument provided to this function is a Lua
table containing information given by the user in the Centreon web output broker
configuration interface. For example, if an IP address is provided with the
name address and the value 192.168.1.18, then this information will be
accessible through conf["address"]
.
The write()
function is called each time an event is received from a poller
through the broker. This event is configured to be sent to this connector.
This function needs one argument, which is the event translated as a Lua table.
The write()
function must return a boolean that is true if the events are processed
and false otherwise.
If this function does not return a boolean, Broker will raise an error.
The Broker Lua SDK
To simplify the Lua developer's job, several objects are proposed and made directly available to the script.
The broker_log object
broker_log:set_parameters(level, filename)
allows the user to set a log level and a file name. The level is an integer from 1 to 3, from the most important to least important. The file name must contain the full path. In addition, the file must be accessible to centreon-broker. If this method is not called, then logs will be written in the centreon broker logs.broker_log:info(level, content)
writes an information log if the given level is less than or equal to the one configured. The content is the text to write in the logs.broker_log:warning(level, content)
works likelog_info
but writes a warning.broker_log:error(level, content)
works likelog_info
but writes an error.
Here is an example:
function init(conf)
-- Here, we allow all logs with level 3 and set the output to
-- /tmp/test.log.
broker_log:set_parameters(3, "/tmp/test.log")
end
function write(d)
-- For each entry, we write the key and the value.
for k,v in pairs(d) do
broker_log:info(3, k .. " => " .. tostring(v))
end
return true
end
Here, when the init
function is executed, the broker_log object is
configured with a max level of 3 and an output file /tmp/test.log.
Then on each write()
call, events received are logged as
info. We get a result like this:
Fri Jan 26 08:31:49 2018: INFO: service_id => 21
Fri Jan 26 08:31:49 2018: INFO: type => 0
Fri Jan 26 08:31:49 2018: INFO: host_id => 13
Fri Jan 26 08:31:49 2018: INFO: element => 1
Fri Jan 26 08:31:49 2018: INFO: comment_data =>
To use a method in Lua, the separator between the object and the method is
:
. broker_log is an object since it contains information such as the max level or the destination file.
The TCP broker socket
A basic TCP socket is available in the Broker SDK. A simple example is the following:
local socket = broker_tcp_socket.new()
socket:connect("10.40.1.2", 9002)
socket:write(message)
answer = socket:read()
socket:close()
- In this example, the first call is broker_tcp_socket.new(). It creates a new socket by calling the function new() stored in the table broker_tcp_socket.
- The next step is a call to the connect method, which needs two arguments: a string given an ip or a host name to connect to and the port to use.
- The next step is a call to the write method, which writes a string into the socket. The method call is finished when the entire message is sent.
- The read method gets a string as its result.
- When the user has finished using the socket, they can close it with the close() method.
This socket object also provides a get_state() method that returns a string:
- unconnected: the socket is not connected
- hostLookup: the socket is performing a host name lookup
- connecting: the socket has started establishing a connection
- connected: a connection is established
- closing: the socket is about to close (data may still be waiting to be written).
The broker table
Several functions are available in this table. These functions are not methods; broker is just a table containing them. It includes the following:
json_encode(object)
, which converts a Lua object into json. The json is returned as a string by the function.json_decode(json)
, which converts a json string into Lua. The object is directly returned by the method. A second value is also returned. it is only defined when an error occurred and contains a string describing the error.parse_perfdata(str)
, which takes a string containing perfdata as argument. A second boolean argument is available. If it is true, the returned table is larger and gives all the details on the metrics as well as the warning and critical thresholds. On success it returns a table containing the values retrieved from the perfdata. On failure it returns a nil object and an error description string.- ``url_encode(text)
, which
converts the string text into an url encoded string. stat(filename)
, which calls the systemstat
function on the file. On success we get a table containing various information about the file (see example below). Otherwise, this table isnil
and a second return value is given containing an error message.md5(str)
, which computes the md5 of the stringstr
and returns it as a string.bbdo_version()
, which returns the BBDO version configured in Centreon Broker.
local obj = {
a = 1,
b = 2,
c = { 'aa', 'bb', 'cc' }
}
local json = broker.json_encode(obj)
print(json)
returns
{ "a": 1, "b": 2, "c": [ 'aa', 'bb', 'cc' ] }
A second example is the following:
local json = { "a": 1, "b": 2, "c": [ 'aa', 'bb', 'cc' ] }
local obj, err = broker.json_decode(json)
for i,v in pairs(obj) do
print(i .. " => " .. tostring(v))
end
should return something like this:
a => 1
b => 2
c => table: 0x12ef67b5
In this case (no error), err
is nil
.
It is also easy to access each field of the object; for example:
obj['a']
gives 1, or obj['c'][1]
gives aa, or obj.b
gives 2.
Here is an example showing the possibilities of the parse_perfdata
function.
local perf, err_str = broker.parse_perfdata(" 'one value'=2s;3;5;0;9 'a b c'=3.14KB;0.8;1;0;10")
if perf then
for i,v in pairs(perf) do
print(i .. " => " .. tostring(v))
end
else
print("parse_perfdata error: " .. err_str)
end
should return something like this:
a b c => 3.14
one value => 2
If now, we call the same function with true as second argument:
local perf, err_str = broker.parse_perfdata("pl=45%;40;80;0;100", true)
if perf then
print("Content of 'pl'")
for i,v in pairs(perf['pl']) do
print(i .. " => " .. tostring(v))
end
else
print("parse_perfdata error: " .. err_str)
end
should return something like this:
Content of 'pl'
value => 45
uom => %
min => 0
max => 100
warning_low => 0
warning_high => 40
warning_mode => false
critical_low => 0
critical_high => 80
critical_mode => false
Be careful: keys are not sorted in hash tables, so if you make such a program, you will probably not have data in the same order.
local str = broker.url_encode("La leçon du château de l'araignée")
print(str)
should return something like this:
La%20le%C3%A7on%20du%20ch%C3%A2teau%20de%20l%27araign%C3%A9e
local s, err = broker.stat("filename")
for i,v in pairs(perf['pl']) do
print(i .. " => " .. tostring(v))
end
should return something like this:
uid=>1000
gif=>1000
size=>279
ctime=>1587641144
mtime=>1587641144
atime=>1587641144
If an error occurs, s
is nil
whereas err
contains a string
containing an error message.
local info = broker.md5('Hello World!')
print(info)
should return a string of the form:
ed076287532e86365e841e92bfc50d8c
local v = broker.bbdo_version()
print(v)
should return a string of the form:
2.0.0
The broker_cache object
This object provides several methods to access the cache. Among data, we can get hostnames, etc...
The functions described here need the cache to be filled. It is important for that to enable the NEB events, otherwise those functions will just return
nil
. The cache is filled when an engine restarts.
The available methods are:
get_ba(ba_id)
, which gets ba information from its id. This function returns a table if found or nil otherwise.- get_bv(bv_id)`, which gets bv informations from its id. This function returns a table if found or nil otherwise.
get_bvs(ba_id)
, which gets all the bv containing the ba of id ba_id. This function returns an array of bv ids, potentially empty if no bv are found.get_check_command(host_id[, service_id])
, which gets the check command configured in the host or service. The service_id is optional; if given we want the check command from a service, otherwise we want the check command from a host. If nothing is found in the cache, nil is returned.get_hostgroup_name(id)
, which gets from the cache the host group name of the given id. This function returns a string or nil otherwise.get_hostgroups(host_id)
, which gets the list of host groups containing the host corresponding to host_id. The return value is an array of objects, each one containing two fields, group_id and group_name.get_hostname(id)
, which gets from the cache the host name corresponding to the given host id. This function returns a string with the host name or nil otherwise.get_index_mapping(index_id)
, which gets from the cache the index mapping object of the given index id. The result is a table containing three keys,index_id
,host_id
andservice_id
.get_instance_name(instance_id)
, which gets from the cache the instance name corresponding to the instance id.get_metric_mapping(metric_id)
, which gets from the cache the metric mapping object of the given metric id. The result is a table containing two keys:metric_id
andindex_id
.get_service_description(host_id,service_id)
, which gets from the cache the service description of the given pair host_id / service_id. This function returns a string or nil otherwise.get_servicegroup_name(id)
, which gets from the cache the service group name* of the given id. This function returns a string or nil otherwise.get_servicegroups(host_id, service_id)
, which gets the list of service groups containing the service corresponding to the pair host_id / service_id. The return value is an array of objects, each one containing two fields: group_id and group_name.get_notes(host_id[,service_id])
, which gets the notes configured in the host or service. The service_id is optional. If given, we want notes from a service, otherwise we want notes from a host. If the object is not found in the cache, nil is returned.get_notes_url(host_id[, service_id])
, which gets the notes url configured in the host or service. The service_id is optional. If given, we want notes url from a service, otherwise we want it from a host. If the object is not found in the cache, nil is returned.get_action_url(host_id)
, which gets the action url configured in the host or service. The service_id is optional. If given, we want action url from a service, otherwise we want it from a host. If the object is not found in the cache, nil is returned.get_severity(host_id[,service_id])
, which gets the severity of a host or a service. If you only provide the host_id, we suppose you want to get a host severity. If a host or a service does not have any severity, the function returns a nil value.
The init() function
This function must not be defined as local
, otherwise it will not be
detected by centreon broker.
Imagine we have made a configuration like this:
with two custom entries:
- an elastic-address string with
172.17.0.1
as content. - an elastic-port number with 9200 as content.
Then, the init()
function has access to them like this:
function init(conf)
local my_address = conf['elastic-address']
local port = conf['elastic-port']
end
The write() function
This function must not be defined as local
, otherwise it will not be
seen by the broker.
The only argument given to the write()
function is an event. It is given
with the same data as the ones we can see in Centreon Broker.
To classify the event, we have two data, which are category
and element
.
Those two pieces of information are integers. If we concatenate those two numbers,
we obtain a longer integer equal to the event type
also available in the
event as _type
.
int | short | short |
---|---|---|
_type = | category | elem |
Sometimes, you might want the hostname corresponding to an event, but you only get
the host_id
. It is possible to get the hostname using the
broker_cache:get_hostname(id)
method.
For example:
function write(d)
if d.category == 3 and d.element == 1 then
local hostname = broker_cache:get_hostname(d.host_id)
if hostname then
broker_log:info("host name "
.. hostname .. " corresponds to host id " .. d.host_id)
else
-- Unable to get the name, we wait for the next time, wishing
-- the cache will contain the name. And so return 0
return true
end
end
-- Here, we can add the event processing...
return true
end
The write
function return value is a boolean. While this value is false,
Broker keeps the sent events in memory and if needed in retention. When you
are sure all events are processed, the idea is that write
returns true
and then Broker frees the events stack.
Behind this, it is possible to avoid committing events one by one.
The write
function can store them in a stack and return false, and when
a given limit is reached, it can send all of them to their destination and
return true.
The filter() function
The function must not be defined as local
, otherwise it will not be
detected by Centreon Broker.
It takes two parameters into account: category
and element
. We have
already seen both in the previous section. The category is an integer from 1 to 7,
or the value 65535. The element
parameter gives details of the event, for example,
for the category NEB, elements are Acknowledgement, Comment, etc.
given as integers.
The flush() function
When the Broker queue size reaches the maximum allowed size, it continues to fill this queue into a file and does not send any more events to streams. While events in the queue are not acknowledged, streams will not receive any more events.
In several cases, this can lead to issues. The idea is that the stream has kept
events in memory waiting for other events to send them to a database. But
Broker queue is full and Broker does not call the stream's write
function
anymore, since it writes events directly to its retention files waiting for an
acknowledgement from the stream that will not arrive because write
is not called.
The solution to fix this lock in Broker is a flush
function called regularly
by Broker that just asks the stream to flush its data. This function returns
a boolean that is true if the stream managed to flush its queue. Once Broker
receives information of a successful flush, it can newly call the stream
write
function.
function write(d)
if d.category ~= 3 or d.element ~= 1 then
return false
end
-- We don't send data each time. We fill a queue and when it is full we
-- send its content with the flush() function.
queue[#queue + 1] = serialize(d)
local retval = false
if #queue > max_queue_size then
retval = flush()
end
-- Here, we return retval that is false except when flush has sent the
-- queue content.
return retval
end
function flush()
-- if the queue is empty, nothing to do and we can tell Broker to empty its
-- retention
if #queue == 0 then
return true
end
-- Here, we should write some code to send events stored in the queue.
--
-- ...
if success then
return true
else
return false
end
end
For details on types, categories and their id, see The BBDO protocol