diff --git a/broker/unified_sql/inc/com/centreon/broker/unified_sql/stream.hh b/broker/unified_sql/inc/com/centreon/broker/unified_sql/stream.hh index 00594da1a64..02b5667ab43 100644 --- a/broker/unified_sql/inc/com/centreon/broker/unified_sql/stream.hh +++ b/broker/unified_sql/inc/com/centreon/broker/unified_sql/stream.hh @@ -1,20 +1,21 @@ -/* -** Copyright 2019-2022 Centreon -** -** Licensed under the Apache License, Version 2.0 (the "License"); -** you may not use this file except in compliance with the License. -** You may obtain a copy of the License at -** -** http://www.apache.org/licenses/LICENSE-2.0 -** -** Unless required by applicable law or agreed to in writing, software -** distributed under the License is distributed on an "AS IS" BASIS, -** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -** See the License for the specific language governing permissions and -** limitations under the License. -** -** For more information : contact@centreon.com -*/ +/** + * Copyright 2019-2024 Centreon + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * For more information : contact@centreon.com + */ + #ifndef CCB_UNIFIED_SQL_STREAM_HH #define CCB_UNIFIED_SQL_STREAM_HH #include @@ -324,8 +325,8 @@ class stream : public io::stream { database::mysql_stmt _flapping_status_insupdate; database::mysql_stmt _host_check_update; database::mysql_stmt _pb_host_check_update; - database::mysql_stmt _host_dependency_insupdate; - database::mysql_stmt _pb_host_dependency_insupdate; + database::mysql_stmt _host_exe_dependency_insupdate; + database::mysql_stmt _host_notif_dependency_insupdate; database::mysql_stmt _host_group_insupdate; database::mysql_stmt _pb_host_group_insupdate; database::mysql_stmt _host_group_member_delete; diff --git a/broker/unified_sql/src/stream.cc b/broker/unified_sql/src/stream.cc index e64583faf13..333d7240323 100644 --- a/broker/unified_sql/src/stream.cc +++ b/broker/unified_sql/src/stream.cc @@ -1317,6 +1317,22 @@ void stream::_init_statements() { "last_check=?," // 9: last_check "output=? " // 10: output "WHERE id=? AND parent_id=?"); // 11, 12: service_id and host_id + const std::string host_exe_dep_query( + "INSERT INTO hosts_hosts_dependencies (dependent_host_id, host_id, " + "dependency_period, execution_failure_options, inherits_parent) " + "VALUES(?,?,?,?,?) ON DUPLICATE KEY UPDATE " + "dependency_period=VALUES(dependency_period), " + "execution_failure_options=VALUES(execution_failure_options), " + "inherits_parent=VALUES(inherits_parent)"); + _host_exe_dependency_insupdate = _mysql.prepare_query(host_exe_dep_query); + const std::string host_notif_dep_query( + "INSERT INTO hosts_hosts_dependencies (dependent_host_id, host_id, " + "dependency_period, notification_failure_options, inherits_parent) " + "VALUES(?,?,?,?,?) ON DUPLICATE KEY UPDATE " + "dependency_period=VALUES(dependency_period), " + "notification_failure_options=VALUES(notification_failure_options), " + "inherits_parent=VALUES(inherits_parent)"); + _host_notif_dependency_insupdate = _mysql.prepare_query(host_notif_dep_query); if (_store_in_hosts_services) { if (_bulk_prepared_statement) { auto hu = std::make_unique(hscr_query); diff --git a/broker/unified_sql/src/stream_sql.cc b/broker/unified_sql/src/stream_sql.cc index 29bdf945024..df57f6c3944 100644 --- a/broker/unified_sql/src/stream_sql.cc +++ b/broker/unified_sql/src/stream_sql.cc @@ -1212,19 +1212,29 @@ void stream::_process_host_dependency(const std::shared_ptr& d) { hd.execution_failure_options, hd.notification_failure_options); - // Prepare queries. - if (!_host_dependency_insupdate.prepared()) { - query_preparator::event_unique unique; - unique.insert("host_id"); - unique.insert("dependent_host_id"); - query_preparator qp(neb::host_dependency::static_type(), unique); - _host_dependency_insupdate = qp.prepare_insert_or_update(_mysql); - } - // Process object. - _host_dependency_insupdate << hd; - _mysql.run_statement(_host_dependency_insupdate, - database::mysql_error::store_host_dependency, conn); + if (!hd.execution_failure_options.empty()) { + _host_exe_dependency_insupdate.bind_value_as_i32(0, hd.dependent_host_id); + _host_exe_dependency_insupdate.bind_value_as_i32(1, hd.host_id); + _host_exe_dependency_insupdate.bind_value_as_str(2, hd.dependency_period); + _host_exe_dependency_insupdate.bind_value_as_str( + 3, hd.execution_failure_options); + _host_exe_dependency_insupdate.bind_value_as_tiny(4, hd.inherits_parent); + _mysql.run_statement(_host_exe_dependency_insupdate, + database::mysql_error::store_host_dependency, conn); + } else if (!hd.notification_failure_options.empty()) { + _host_notif_dependency_insupdate.bind_value_as_i32(0, + hd.dependent_host_id); + _host_notif_dependency_insupdate.bind_value_as_i32(1, hd.host_id); + _host_notif_dependency_insupdate.bind_value_as_str(2, + hd.dependency_period); + _host_notif_dependency_insupdate.bind_value_as_str( + 3, hd.notification_failure_options); + _host_notif_dependency_insupdate.bind_value_as_tiny(4, + hd.inherits_parent); + _mysql.run_statement(_host_notif_dependency_insupdate, + database::mysql_error::store_host_dependency, conn); + } _add_action(conn, actions::host_dependencies); } // Delete. @@ -1269,33 +1279,32 @@ void stream::_process_pb_host_dependency(const std::shared_ptr& d) { hd.dependent_host_id(), hd.host_id(), hd.execution_failure_options(), hd.notification_failure_options()); - // Prepare queries. - if (!_pb_host_dependency_insupdate.prepared()) { - query_preparator::event_pb_unique unique{ - {6, "host_id", io::protobuf_base::invalid_on_zero, 0}, - {3, "dependent_host_id", io::protobuf_base::invalid_on_zero, 0}}; - query_preparator qp(neb::pb_host_dependency::static_type(), unique); - _pb_host_dependency_insupdate = qp.prepare_insert_or_update_table( - _mysql, "hosts_hosts_dependencies ", /*space is mandatory to avoid - conflict with _process_host_dependency*/ - {{3, "dependent_host_id", io::protobuf_base::invalid_on_zero, 0}, - {6, "host_id", io::protobuf_base::invalid_on_zero, 0}, - {2, "dependency_period", 0, - get_hosts_hosts_dependencies_col_size( - hosts_hosts_dependencies_dependency_period)}, - {5, "execution_failure_options", 0, - get_hosts_hosts_dependencies_col_size( - hosts_hosts_dependencies_execution_failure_options)}, - {7, "inherits_parent", 0, 0}, - {8, "notification_failure_options", 0, - get_hosts_hosts_dependencies_col_size( - hosts_hosts_dependencies_notification_failure_options)}}); - } - // Process object. - _pb_host_dependency_insupdate << hd_protobuf; - _mysql.run_statement(_pb_host_dependency_insupdate, - database::mysql_error::store_host_dependency, conn); + if (!hd.execution_failure_options().empty()) { + _host_exe_dependency_insupdate.bind_value_as_i32(0, + hd.dependent_host_id()); + _host_exe_dependency_insupdate.bind_value_as_i32(1, hd.host_id()); + _host_exe_dependency_insupdate.bind_value_as_str(2, + hd.dependency_period()); + _host_exe_dependency_insupdate.bind_value_as_str( + 3, hd.execution_failure_options()); + _host_exe_dependency_insupdate.bind_value_as_tiny(4, + hd.inherits_parent()); + _mysql.run_statement(_host_exe_dependency_insupdate, + database::mysql_error::store_host_dependency, conn); + } else if (!hd.notification_failure_options().empty()) { + _host_notif_dependency_insupdate.bind_value_as_i32( + 0, hd.dependent_host_id()); + _host_notif_dependency_insupdate.bind_value_as_i32(1, hd.host_id()); + _host_notif_dependency_insupdate.bind_value_as_str( + 2, hd.dependency_period()); + _host_notif_dependency_insupdate.bind_value_as_str( + 3, hd.notification_failure_options()); + _host_notif_dependency_insupdate.bind_value_as_tiny(4, + hd.inherits_parent()); + _mysql.run_statement(_host_notif_dependency_insupdate, + database::mysql_error::store_host_dependency, conn); + } _add_action(conn, actions::host_dependencies); } // Delete. diff --git a/tests/bam/boolean_rules.robot b/tests/bam/boolean_rules.robot index 73d7c452f2f..b407c8fdc87 100644 --- a/tests/bam/boolean_rules.robot +++ b/tests/bam/boolean_rules.robot @@ -394,7 +394,7 @@ BABOOCOMPL_RESTART FOR ${i} IN RANGE ${15} ${21} ${2} Remove Files /tmp/ba${id_ba__sid[0]}_*.dot ${result} Check Ba Status With Timeout boolean-ba 2 30 - Dump Ba 51001 ${id_ba__sid[0]} /tmp/ba${id_ba__sid[0]}_1.dot + Broker Get Ba 51001 ${id_ba__sid[0]} /tmp/ba${id_ba__sid[0]}_1.dot Should Be True ${result} Step${i}: The 'boolean-ba' BA is not CRITICAL as expected ${start} Get Current Date @@ -404,7 +404,7 @@ BABOOCOMPL_RESTART ${result} Find In Log With Timeout ${centralLog} ${start} ${content} 60 Should Be True ${result} It seems that no cache has been restored into BAM. - Dump Ba 51001 ${id_ba__sid[0]} /tmp/ba${id_ba__sid[0]}_2.dot + Broker Get Ba 51001 ${id_ba__sid[0]} /tmp/ba${id_ba__sid[0]}_2.dot Wait Until Created /tmp/ba${id_ba__sid[0]}_2.dot ${result} Compare Dot Files /tmp/ba${id_ba__sid[0]}_1.dot /tmp/ba${id_ba__sid[0]}_2.dot @@ -488,7 +488,7 @@ BABOOCOMPL_RELOAD FOR ${i} IN RANGE ${15} ${21} ${2} Remove Files /tmp/ba${id_ba__sid[0]}_*.dot ${result} Check Ba Status With Timeout boolean-ba 2 30 - Dump Ba 51001 ${id_ba__sid[0]} /tmp/ba${id_ba__sid[0]}_1.dot + Broker Get Ba 51001 ${id_ba__sid[0]} /tmp/ba${id_ba__sid[0]}_1.dot Should Be True ${result} Step${i}: The 'boolean-ba' BA is not CRITICAL as expected ${start} Get Current Date @@ -498,7 +498,7 @@ BABOOCOMPL_RELOAD ${result} Find In Log With Timeout ${centralLog} ${start} ${content} 60 Should Be True ${result} It seems that no cache has been restored into BAM. - Dump Ba 51001 ${id_ba__sid[0]} /tmp/ba${id_ba__sid[0]}_2.dot + Broker Get Ba 51001 ${id_ba__sid[0]} /tmp/ba${id_ba__sid[0]}_2.dot Wait Until Created /tmp/ba${id_ba__sid[0]}_2.dot ${result} Compare Dot Files /tmp/ba${id_ba__sid[0]}_1.dot /tmp/ba${id_ba__sid[0]}_2.dot diff --git a/tests/broker-engine/bbdo-protobuf.robot b/tests/broker-engine/bbdo-protobuf.robot index 1c6f77b5e6c..17b51ed2152 100644 --- a/tests/broker-engine/bbdo-protobuf.robot +++ b/tests/broker-engine/bbdo-protobuf.robot @@ -176,7 +176,7 @@ BEPBCVS [Teardown] Stop Engine Broker And Save Logs True BEPB_HOST_DEPENDENCY - [Documentation] bbdo_version 3 communication of host dependencies. + [Documentation] BBDO 3 communication of host dependencies. [Tags] broker engine protobuf bbdo Config Engine ${1} Config Engine Add Cfg File 0 dependencies.cfg @@ -191,7 +191,7 @@ BEPB_HOST_DEPENDENCY Start Broker True Start Engine - ${result} Common.Check Host Dependencies 2 1 24x7 1 ou ${EMPTY} 30 + ${result} Common.Check Host Dependencies 2 1 24x7 1 ou dp 30 Should Be True ${result} No notification dependency from 2 to 1 with timeperiod 24x7 on 'ou' Config Engine ${1} diff --git a/tests/broker/log.robot b/tests/broker/log.robot index 48fa3408b4a..d79a24d264a 100644 --- a/tests/broker/log.robot +++ b/tests/broker/log.robot @@ -36,10 +36,10 @@ BLEC1 Broker Config Log central sql debug ${start} Get Current Date Start Broker - ${result} Get Broker Log Level 51001 central core + ${result} Get Broker Log Level 51001 core Should Be Equal ${result} trace - Set Broker Log Level 51001 central core debug - ${result} Get Broker Log Level 51001 central core + Set Broker Log Level 51001 core debug + ${result} Get Broker Log Level 51001 core Should Be Equal ${result} debug BLEC2 @@ -51,9 +51,9 @@ BLEC2 Broker Config Log central sql debug ${start} Get Current Date Start Broker - ${result} Get Broker Log Level 51001 central core + ${result} Get Broker Log Level 51001 core Should Be Equal ${result} trace - ${result} Set Broker Log Level 51001 central core foo + ${result} Set Broker Log Level 51001 core foo Should Be Equal ${result} Enum LogLevelEnum has no value defined for name 'FOO' BLEC3 @@ -65,5 +65,5 @@ BLEC3 Broker Config Log central sql debug ${start} Get Current Date Start Broker - ${result} Set Broker Log Level 51001 central foo trace + ${result} Set Broker Log Level 51001 foo trace Should Be Equal ${result} The 'foo' logger does not exist diff --git a/tests/resources/Broker.py b/tests/resources/Broker.py index 7f727272df2..12f1143316b 100755 --- a/tests/resources/Broker.py +++ b/tests/resources/Broker.py @@ -1,16 +1,12 @@ import signal from os import setsid from os import makedirs -from os.path import exists, dirname +from os.path import exists import pymysql.cursors import time import re import shutil import psutil -import socket -import sys -import time -from datetime import datetime from subprocess import getoutput import subprocess as subp from robot.api import logger @@ -22,12 +18,10 @@ import broker_pb2_grpc from google.protobuf import empty_pb2 from google.protobuf.json_format import MessageToJson -from robot.libraries.BuiltIn import BuiltIn from Common import DB_NAME_STORAGE, DB_NAME_CONF, DB_USER, DB_PASS, DB_HOST, DB_PORT, VAR_ROOT, ETC_ROOT, TESTS_PARAMS TIMEOUT = 30 - config = { "central": """{{ "centreonBroker": {{ @@ -267,7 +261,7 @@ "json_fifo": "{7}/lib/centreon-broker/central-rrd-master-stats.json" }} ], - "grpc": {{ + "grpc": {{ "port": 51002 }} }} @@ -402,17 +396,22 @@ def _apply_conf(name, callback): else: filename = "central-rrd.json" - f = open(ETC_ROOT + "/centreon-broker/{}".format(filename), "r") - buf = f.read() - f.close() + with open(f"{ETC_ROOT}/centreon-broker/{filename}", "r") as f: + buf = f.read() conf = json.loads(buf) callback(conf) - f = open(ETC_ROOT + "/centreon-broker/{}".format(filename), "w") - f.write(json.dumps(conf, indent=2)) - f.close() + with open(f"{ETC_ROOT}/centreon-broker/{filename}", "w") as f: + f.write(json.dumps(conf, indent=2)) def config_broker(name: str, poller_inst: int = 1): + """ + Configure a broker instance for test. Write the configuration files. + + Args: + name (str): name of the conf broker wanted + poller_inst (int, optional): Defaults to 1. + """ makedirs(ETC_ROOT, mode=0o777, exist_ok=True) makedirs(VAR_ROOT, mode=0o777, exist_ok=True) makedirs(ETC_ROOT + "/centreon-broker", mode=0o777, exist_ok=True) @@ -454,9 +453,8 @@ def config_broker(name: str, poller_inst: int = 1): conf["centreonBroker"]["poller_name"] = f"Poller{i}" conf["centreonBroker"]["poller_id"] = i + 1 - f = open(broker_name, "w") - f.write(json.dumps(conf, indent=2)) - f.close() + with open(broker_name, "w") as f: + f.write(json.dumps(conf, indent=2)) if default_bbdo_version is not None: broker_config_add_item( f"{name}{i}", "bbdo_version", default_bbdo_version) @@ -465,10 +463,9 @@ def config_broker(name: str, poller_inst: int = 1): f"{name}{i}", "bbdo_client", "5669", "grpc", "localhost") else: - f = open(f"{ETC_ROOT}/centreon-broker/{filename}", "w") - f.write(config[name].format(broker_id, broker_name, - DB_HOST, DB_PORT, DB_USER, DB_PASS, DB_NAME_STORAGE, VAR_ROOT)) - f.close() + with open(f"{ETC_ROOT}/centreon-broker/{filename}", "w") as f: + f.write(config[name].format(broker_id, broker_name, + DB_HOST, DB_PORT, DB_USER, DB_PASS, DB_NAME_STORAGE, VAR_ROOT)) if default_bbdo_version is not None: if default_bbdo_version >= "3.0.0" and (name == "central" or name == "central_map"): config_broker_sql_output(name, 'unified_sql') @@ -486,6 +483,12 @@ def config_broker(name: str, poller_inst: int = 1): def change_broker_tcp_output_to_grpc(name: str): + """ + Update broker configuration to use a gRPC output instead of a TCP one. + + Args: + name (str): name of the conf broker wanted to be changed + """ def output_to_grpc(conf): output_dict = conf["centreonBroker"]["output"] for i, v in enumerate(output_dict): @@ -493,19 +496,36 @@ def output_to_grpc(conf): v["type"] = "grpc" if "transport_protocol" in v: v["transport_protocol"] = "grpc" + _apply_conf(name, output_to_grpc) def add_path_to_rrd_output(name: str, path: str): + """ + Set the path for the rrd output. If no rrd output is defined, this function + does nothing. + + Args: + name (str): The broker instance name among central, rrd and module%d. + path (str): path to the rrd output. + """ def rrd_output(conf): output_dict = conf["centreonBroker"]["output"] for i, v in enumerate(output_dict): if v["type"] == "rrd": v["path"] = path + _apply_conf(name, rrd_output) def change_broker_tcp_input_to_grpc(name: str): + """ + Update the broker configuration to use gRPC input instead of a TCP one. + If no tcp input is found, no replacement is done. + + Args: + name: The broker instance name among central, rrd and module%d. + """ def input_to_grpc(conf): input_dict = conf["centreonBroker"]["input"] for i, v in enumerate(input_dict): @@ -513,91 +533,208 @@ def input_to_grpc(conf): v["type"] = "grpc" if "transport_protocol" in v: v["transport_protocol"] = "grpc" + _apply_conf(name, input_to_grpc) -def add_broker_crypto(json_dict, add_cert: bool, only_ca_cert: bool): +def _add_broker_crypto(json_dict, add_cert: bool, only_ca_cert: bool): json_dict["encryption"] = "yes" if (add_cert): json_dict["ca_certificate"] = "/tmp/ca_1234.crt" - if (only_ca_cert == False): + if only_ca_cert is False: json_dict["public_cert"] = "/tmp/server_1234.crt" json_dict["private_key"] = "/tmp/server_1234.key" def add_broker_tcp_input_grpc_crypto(name: str, add_cert: bool, reversed: bool): - def crypto_modifier(conf): + """ + Add some crypto to broker gRPC input. + + Args: + name: The broker instance name among central, rrd and module%d. + add_cert (bool): True to add a certificate, False otherwise. + reversed (bool): True if only a CA certificate is provided. + + *Example:* + + | Add Broker Tcp Input Grpc Crypto | central | ${True} | ${False} | + """ + def _crypto_modifier(conf): input_dict = conf["centreonBroker"]["input"] for i, v in enumerate(input_dict): if v["type"] == "grpc": - add_broker_crypto(v, add_cert, reversed) - _apply_conf(name, crypto_modifier) + _add_broker_crypto(v, add_cert, reversed) + + _apply_conf(name, _crypto_modifier) def add_broker_tcp_output_grpc_crypto(name: str, add_cert: bool, reversed: bool): - def crypto_modifier(conf): + """ + Add grpc crypto to broker tcp output + + Args: + name: The broker instance name among central, rrd and module%d. + add_cert (bool): True to add a certificate, False otherwise. + reversed (bool): False if only a we just want a CA certificate. + + *Example:* + + | Add Broker Tcp Output Grpc Crypto | module0 | ${True} | ${False} | + """ + def _crypto_modifier(conf): input_dict = conf["centreonBroker"]["output"] for i, v in enumerate(input_dict): if v["type"] == "grpc": - add_broker_crypto(v, add_cert, not reversed) - _apply_conf(name, crypto_modifier) + _add_broker_crypto(v, add_cert, not reversed) + + _apply_conf(name, _crypto_modifier) def add_host_to_broker_output(name: str, output_name: str, host_ip: str): + """ + Add a host to some broker output. This is useful for a grpc or tcp client + where we want where to connect to. + + Args: + name (str): The broker instance name among central, rrd and module%d. + output_name (str): The name of the output to modify. + host_ip (str): the host address to set. + + *Example:* + + | Add Host To Broker Output | module0 | central-module-master-output | localhost | + """ def modifier(conf): input_dict = conf["centreonBroker"]["output"] for i, v in enumerate(input_dict): if (v["name"] == output_name): v["host"] = host_ip + _apply_conf(name, modifier) def add_host_to_broker_input(name: str, input_name: str, host_ip: str): + """ + Add host to some broker input. This is useful for a grpc or tcp client + where we want to set where to connect to. + + Args: + name: The broker instance name among central, rrd and module%d. + input_name (str): the name of the input to modify. + host_ip (str): the host address to set. + + *Example:* + + | Add Host To Broker Input | central | central-broker-master-input | localhost | + """ def modifier(conf): input_dict = conf["centreonBroker"]["input"] for i, v in enumerate(input_dict): if (v["name"] == input_name): v["host"] = host_ip + _apply_conf(name, modifier) def remove_host_from_broker_output(name: str, output_name: str): + """ + Remove the host entry from a broker output given by its name. + + Args: + name: The broker instance name among central, rrd and module%d. + output_name (str): The name of the output containing a host entry. + + *Example:* + + | Remove Host From Broker Output | module0 | central-module-master-output | + """ + def modifier(conf): input_dict = conf["centreonBroker"]["output"] for i, v in enumerate(input_dict): if (v["name"] == output_name): v.pop("host") + _apply_conf(name, modifier) def remove_host_from_broker_input(name: str, input_name: str): + """ + Remove the host entry from a broker input given by its name. + + Args: + name: The broker instance name among central, rrd and module%d. + input_name (str): The name of the input containing a host entry. + + *Example:* + + | Remove Host From Broker Input | central | central-broker-master-input | + """ + def modifier(conf): input_dict = conf["centreonBroker"]["input"] for i, v in enumerate(input_dict): if (v["name"] == input_name): v.pop("host") + _apply_conf(name, modifier) def change_broker_compression_output(config_name: str, output_name: str, compression_value: str): + """ + Change the compression option of a broker output. + + Args: + config_name (str): The broker instance name among central, rrd and module%d. + output_name (str): The output name to modify. + compression_value (str): The compression value. "yes/no", "1/0" or "true/false". + + *Example:* + + | Change Broker Compression Output | module0 | central-module-master-output | yes | + """ def compression_modifier(conf): output_dict = conf["centreonBroker"]["output"] for i, v in enumerate(output_dict): if (v["name"] == output_name): v["compression"] = compression_value + _apply_conf(config_name, compression_modifier) def change_broker_compression_input(config_name: str, input_name: str, compression_value: str): + """ + Change the compression option of a broker input. + + Args: + config_name (str): The broker instance name among central, rrd and module%d. + input_name (str): The input name to modify. + compression_value (str): The compression value: "yes/no", "1/0" or "true/false". + + *Example:* + + | Change Broker Compression Input | central | central-broker-master-input | yes | + """ def compression_modifier(conf): input_dict = conf["centreonBroker"]["input"] for i, v in enumerate(input_dict): if (v["name"] == input_name): v["compression"] = compression_value + _apply_conf(config_name, compression_modifier) def config_broker_remove_rrd_output(name): + """ + Remove rrd output from a broker configuration + + Args: + name: The broker instance name among central, rrd and module%d. + + *Example:* + + | Config Broker Remove Rrd Output | central | + """ if name == 'central': filename = "central-broker.json" elif name.startswith('module'): @@ -619,6 +756,23 @@ def config_broker_remove_rrd_output(name): def config_broker_bbdo_input(name, stream, port, proto, host=None): + """ + Configure Broker BBDO input. It can be a client or a server. We provide a + port number and a protocol that is grpc or tcp. + + Args: + name: The broker instance name among central, rrd and module%d. + stream: The type of stream among [bbdo_server, bbdo_client]. + port: A port number. + proto: grpc or tcp. + host (str, optional): Defaults to None. Used to provide a host, needed + in the case of bbdo_client. + + *Example:* + + | Config Broker Bbdo Input | central | bbdo_server | 5669 | grpc | | + | Config Broker Bbdo Input | rrd | bbdo_client | 5670 | tcp | localhost | + """ if stream != "bbdo_server" and stream != "bbdo_client": raise Exception( "config_broker_bbdo_input_output() function only accepts stream in ('bbdo_server', 'bbdo_client')") @@ -629,18 +783,18 @@ def config_broker_bbdo_input(name, stream, port, proto, host=None): if name == 'central': filename = "central-broker.json" elif name.startswith('module'): - filename = "central-{}.json".format(name) + filename = f"central-{name}.json" else: filename = "central-rrd.json" input_name = "central-rrd-master-input" - f = open(ETC_ROOT + "/centreon-broker/{}".format(filename), "r") - buf = f.read() - f.close() + with open(f"{ETC_ROOT}/centreon-broker/{filename}", "r") as f: + buf = f.read() conf = json.loads(buf) io_dict = conf["centreonBroker"]["input"] # Cleanup for i, v in enumerate(io_dict): - if (v["type"] == "ipv4" or v["type"] == "grpc" or v["type"] == "bbdo_client" or v["type"] == "bbdo_server") and v["port"] == port: + if (v["type"] == "ipv4" or v["type"] == "grpc" or v["type"] == "bbdo_client" or v["type"] == "bbdo_server") and \ + v["port"] == port: io_dict.pop(i) stream = { "name": input_name, @@ -651,12 +805,27 @@ def config_broker_bbdo_input(name, stream, port, proto, host=None): if host is not None: stream["host"] = host io_dict.append(stream) - f = open(ETC_ROOT + "/centreon-broker/{}".format(filename), "w") - f.write(json.dumps(conf, indent=2)) - f.close() + with open(f"{ETC_ROOT}/centreon-broker/{filename}", "w") as f: + f.write(json.dumps(conf, indent=2)) def config_broker_bbdo_output(name, stream, port, proto, host=None): + """ + Configure Broker BBDO output. It can be a client or a server. We provide a + port number and a protocol that is grpc or tcp. + + Args: + name: The broker instance name among central, rrd and module%d. + stream (str): The type of stream among [bbdo_server, bbdo_client]. + port (int): A port number. + proto (str): grpc or tcp. + host (str, optional): Defaults to None. Used to provide a host to connect, + needed in the case of bbdo_client. + + *Example:* + + | Config Broker Bbdo Output | central | bbdo_client | 5670 | tcp | localhost | + """ if stream != "bbdo_server" and stream != "bbdo_client": raise Exception( "config_broker_bbdo_output() function only accepts stream in ('bbdo_server', 'bbdo_client')") @@ -672,14 +841,14 @@ def config_broker_bbdo_output(name, stream, port, proto, host=None): output_name = 'central-module-master-output' else: filename = "central-rrd.json" - f = open(ETC_ROOT + "/centreon-broker/{}".format(filename), "r") - buf = f.read() - f.close() + with open(f"{ETC_ROOT}/centreon-broker/{filename}", "r") as f: + buf = f.read() conf = json.loads(buf) io_dict = conf["centreonBroker"]["output"] # Cleanup for i, v in enumerate(io_dict): - if (v["type"] == "ipv4" or v["type"] == "grpc" or v["type"] == "bbdo_client" or v["type"] == "bbdo_server") and v["port"] == port: + if (v["type"] == "ipv4" or v["type"] == "grpc" or v["type"] == "bbdo_client" or v["type"] == "bbdo_server") and \ + v["port"] == port: io_dict.pop(i) stream = { "name": f"{output_name}", @@ -690,12 +859,19 @@ def config_broker_bbdo_output(name, stream, port, proto, host=None): if host is not None: stream["host"] = host io_dict.append(stream) - f = open(ETC_ROOT + "/centreon-broker/{}".format(filename), "w") - f.write(json.dumps(conf, indent=2)) - f.close() + with open(f"{ETC_ROOT}/centreon-broker/{filename}", "w") as f: + f.write(json.dumps(conf, indent=2)) def config_broker_sql_output(name, output, queries_per_transaction: int = 20000): + """ + Configure the broker sql output. + + Args: + name (str): The broker instance name among central, rrd and module%d. + output (str): One string among "unified_sql" and "sql/perfdata". + queries_per_transaction (int, optional): Defaults to 20000. + """ if name == 'central': filename = "central-broker.json" elif name.startswith('module'): @@ -703,9 +879,8 @@ def config_broker_sql_output(name, output, queries_per_transaction: int = 20000) else: filename = "central-rrd.json" - f = open(ETC_ROOT + "/centreon-broker/{}".format(filename), "r") - buf = f.read() - f.close() + with open(f"{ETC_ROOT}/centreon-broker/{filename}", "r") as f: + buf = f.read() conf = json.loads(buf) output_dict = conf["centreonBroker"]["output"] for i, v in enumerate(output_dict): @@ -769,12 +944,23 @@ def config_broker_sql_output(name, output, queries_per_transaction: int = 20000) "insert_in_index_data": "1", "type": "storage" }) - f = open(ETC_ROOT + "/centreon-broker/{}".format(filename), "w") - f.write(json.dumps(conf, indent=2)) - f.close() + with open(f"{ETC_ROOT}/centreon-broker/{filename}", "w") as f: + f.write(json.dumps(conf, indent=2)) def broker_config_clear_outputs_except(name, ex: list): + """ + Remove all the outputs of the broker configuration except those of types given + in the ex list. + + Args: + name: The broker instance name among central, rrd and module%d. + ex (list): A list of type. + + *Example:* + + | Broker Config Clear Outputs Except | central | ["sql", "storage"] | + """ if name == 'central': filename = "central-broker.json" elif name.startswith('module'): @@ -782,7 +968,7 @@ def broker_config_clear_outputs_except(name, ex: list): else: filename = "central-rrd.json" - with open(ETC_ROOT + "/centreon-broker/{}".format(filename), "r") as f: + with open(f"{ETC_ROOT}/centreon-broker/{filename}", "r") as f: buf = f.read() conf = json.loads(buf) output_dict = conf["centreonBroker"]["output"] @@ -790,14 +976,22 @@ def broker_config_clear_outputs_except(name, ex: list): if v["type"] not in ex: output_dict.pop(i) - with open(ETC_ROOT + "/centreon-broker/{}".format(filename), "w") as f: + with open(f"{ETC_ROOT}/centreon-broker/{filename}", "w") as f: f.write(json.dumps(conf, indent=2)) def config_broker_victoria_output(): + """ + Configure broker to add a Victoria output. If some old VictoriaMetrics + outputs exist, they are removed. + + *Example:* + + | Config Broker Victoria Output | + """ filename = "central-broker.json" - with open(ETC_ROOT + "/centreon-broker/{}".format(filename), "r") as f: + with open(f"{ETC_ROOT}/centreon-broker/{filename}", "r") as f: buf = f.read() conf = json.loads(buf) output_dict = conf["centreonBroker"]["output"] @@ -813,29 +1007,50 @@ def config_broker_victoria_output(): "db_password": "titi", "queries_per_transaction": "1", }) - with open(ETC_ROOT + "/centreon-broker/{}".format(filename), "w") as f: + with open(f"{ETC_ROOT}/centreon-broker/{filename}", "w") as f: f.write(json.dumps(conf, indent=2)) def broker_config_add_item(name, key, value): + """ + Add an item to the broker configuration + + Args: + name (str): Which broker instance: central, rrd or module%d. + key (str): The key to add directly in the configuration first level. + value: The value. + + *Example:* + + | Broker Config Add Item | module0 | bbdo_version | 3.0.1 | + """ if name == 'central': filename = "central-broker.json" elif name == 'rrd': filename = "central-rrd.json" elif name.startswith('module'): - filename = "central-{}.json".format(name) + filename = f"central-{name}.json" - f = open(ETC_ROOT + "/centreon-broker/{}".format(filename), "r") - buf = f.read() - f.close() + with open(f"{ETC_ROOT}/centreon-broker/{filename}", "r") as f: + buf = f.read() conf = json.loads(buf) conf["centreonBroker"][key] = value - f = open(ETC_ROOT + "/centreon-broker/{}".format(filename), "w") - f.write(json.dumps(conf, indent=2)) - f.close() + with open(f"{ETC_ROOT}/centreon-broker/{filename}", "w") as f: + f.write(json.dumps(conf, indent=2)) def broker_config_remove_item(name, key): + """ + Remove an item from the broker configuration + + Args: + name: The broker instance name among central, rrd and module%d + key: The key to remove. It must be defined at the first level of the configuration. + + *Example:* + + | Broker Config Remove Item | module0 | bbdo_version | + """ if name == 'central': filename = "central-broker.json" elif name == 'rrd': @@ -843,17 +1058,27 @@ def broker_config_remove_item(name, key): elif name.startswith('module'): filename = "central-{}.json".format(name) - f = open(ETC_ROOT + "/centreon-broker/{}".format(filename), "r") - buf = f.read() - f.close() + with open(f"{ETC_ROOT}/centreon-broker/{filename}", "r") as f: + buf = f.read() conf = json.loads(buf) conf["centreonBroker"].pop(key) - f = open(ETC_ROOT + "/centreon-broker/{}".format(filename), "w") - f.write(json.dumps(conf, indent=2)) - f.close() + with open(f"{ETC_ROOT}/centreon-broker/{filename}", "w") as f: + f.write(json.dumps(conf, indent=2)) def broker_config_add_lua_output(name, output, luafile): + """ + Add a lua output to the broker configuration. + + Args: + name (str): The broker instance name among central, rrd, module%d + output (str): The name of the Lua output. + luafile (str): The full name of the Lua script. + + *Example:* + + | Broker Config Add Lua Output | central | test-protobuf | /tmp/lua.lua | + """ if name == 'central': filename = "central-broker.json" elif name.startswith('module'): @@ -861,9 +1086,8 @@ def broker_config_add_lua_output(name, output, luafile): else: filename = "central-rrd.json" - f = open(ETC_ROOT + "/centreon-broker/{}".format(filename), "r") - buf = f.read() - f.close() + with open(f"{ETC_ROOT}/centreon-broker/{filename}", "r") as f: + buf = f.read() conf = json.loads(buf) output_dict = conf["centreonBroker"]["output"] output_dict.append({ @@ -871,21 +1095,32 @@ def broker_config_add_lua_output(name, output, luafile): "path": luafile, "type": "lua" }) - f = open(ETC_ROOT + "/centreon-broker/{}".format(filename), "w") - f.write(json.dumps(conf, indent=2)) - f.close() + with open(f"{ETC_ROOT}/centreon-broker/{filename}", "w") as f: + f.write(json.dumps(conf, indent=2)) def broker_config_output_set(name, output, key, value): + """ + Set an attribute value in a broker output. + + Args: + name (str): The broker instance among central, rrd, module%d. + output (str): The output to work with. + key (str): The key whose value is to modify. + value (str): The new value to set. + + *Example:* + + | Broker Config Output Set | central | central-broker-master-sql | host | localhost | + """ if name == 'central': filename = "central-broker.json" elif name.startswith('module'): - filename = "central-{}.json".format(name) + filename = f"central-{name}.json" else: filename = "central-rrd.json" - f = open(f"{ETC_ROOT}/centreon-broker/{filename}", "r") - buf = f.read() - f.close() + with open(f"{ETC_ROOT}/centreon-broker/{filename}", "r") as f: + buf = f.read() conf = json.loads(buf) output_dict = [elem for i, elem in enumerate( conf["centreonBroker"]["output"]) if elem["name"] == output][0] @@ -895,26 +1130,49 @@ def broker_config_output_set(name, output, key, value): def broker_config_output_set_json(name, output, key, value): + """ + Set an attribute value in a broker output. The value is given as a json string. + + Args: + name (str): The broker instance among central, rrd, module%d. + output (str): The output to work with. + key (str): The key whose value is to modify. + value (str): The new value to set. + + *Example:* + + | Broker Config Output Set Json | central | central-broker-master-sql | filters | {"category": ["neb", "foo", "bar"]} | + """ if name == 'central': filename = "central-broker.json" elif name.startswith('module'): filename = "central-{}.json".format(name) else: filename = "central-rrd.json" - f = open(ETC_ROOT + "/centreon-broker/{}".format(filename), "r") - buf = f.read() - f.close() + with open(f"{ETC_ROOT}/centreon-broker/{filename}", "r") as f: + buf = f.read() conf = json.loads(buf) output_dict = [elem for i, elem in enumerate( conf["centreonBroker"]["output"]) if elem["name"] == output][0] j = json.loads(value) output_dict[key] = j - f = open(ETC_ROOT + "/centreon-broker/{}".format(filename), "w") - f.write(json.dumps(conf, indent=2)) - f.close() + with open(f"{ETC_ROOT}/centreon-broker/{filename}", "w") as f: + f.write(json.dumps(conf, indent=2)) def broker_config_output_remove(name, output, key): + """ + Remove a key from an output of the broker configuration. + + Args: + name: The broker instance among central, rrd, module%d. + output: The output to work with. + key: The key to remove. + + *Example:* + + | Broker Config Output Remove | central | centreon-broker-master-rrd | host | + """ if name == 'central': filename = "central-broker.json" elif name.startswith('module'): @@ -936,6 +1194,19 @@ def broker_config_output_remove(name, output, key): def broker_config_input_set(name, inp, key, value): + """ + Set an attribute in an input of a broker configuration. + + Args: + name (str): The broker instance among central, rrd, module%d. + inp (str): The input to work with. + key (str): The key whose value is to modify. + value (str): The new value to set. + + *Example:* + + | Broker Config Input Set | rrd | rrd-broker-master-input | encryption | yes | + """ if name == 'central': filename = "central-broker.json" elif name.startswith('module'): @@ -956,80 +1227,131 @@ def broker_config_input_set(name, inp, key, value): def broker_config_input_remove(name, inp, key): + """ + Remove a key from an input of the broker configuration. + + Args: + name: The broker instance among central, rrd, module%d. + inp: The input to work with. + key: The key to remove. + """ if name == 'central': filename = "central-broker.json" elif name.startswith('module'): filename = "central-{}.json".format(name) else: filename = "central-rrd.json" - f = open(ETC_ROOT + "/centreon-broker/{}".format(filename), "r") - buf = f.read() - f.close() + with open(f"{ETC_ROOT}/centreon-broker/{filename}", "r") as f: + buf = f.read() conf = json.loads(buf) input_dict = [elem for i, elem in enumerate( conf["centreonBroker"]["input"]) if elem["name"] == inp][0] if key in input_dict: input_dict.pop(key) - f = open(ETC_ROOT + "/centreon-broker/{}".format(filename), "w") - f.write(json.dumps(conf, indent=2)) - f.close() + with open(f"{ETC_ROOT}/centreon-broker/{filename}", "w") as f: + f.write(json.dumps(conf, indent=2)) def broker_config_log(name, key, value): + """ + Configure broker log level. + + Args: + name (str): The broker instance among central, rrd, module%d. + key (str): The logger name to modify. + value (str): The level among error, trace, info, debug, etc... + + *Example:* + + | Broker Config Log | central | bam | trace | + """ if name == 'central': filename = "central-broker.json" elif name.startswith('module'): filename = "central-{}.json".format(name) else: filename = "central-rrd.json" - f = open(ETC_ROOT + "/centreon-broker/{}".format(filename), "r") - buf = f.read() - f.close() + with open(f"{ETC_ROOT}/centreon-broker/{filename}", "r") as f: + buf = f.read() conf = json.loads(buf) loggers = conf["centreonBroker"]["log"]["loggers"] loggers[key] = value - f = open(ETC_ROOT + "/centreon-broker/{}".format(filename), "w") - f.write(json.dumps(conf, indent=2)) - f.close() + with open(f"{ETC_ROOT}/centreon-broker/{filename}", "w") as f: + f.write(json.dumps(conf, indent=2)) def broker_config_flush_log(name, value): + """ + Configure the flush interval of the broker loggers. This value is in seconds, with 0, every logs are flushed. + + Args: + name (str): the broker instance among central, rrd, module%d. + value (int): The value in seconds. + + *Example:* + + | Broker Config Flush Log | central | 1 | + """ if name == 'central': filename = "central-broker.json" elif name.startswith('module'): filename = "central-{}.json".format(name) else: filename = "central-rrd.json" - f = open(ETC_ROOT + "/centreon-broker/{}".format(filename), "r") - buf = f.read() - f.close() + with open(f"{ETC_ROOT}/centreon-broker/{filename}", "r") as f: + buf = f.read() conf = json.loads(buf) log = conf["centreonBroker"]["log"] log["flush_period"] = value - f = open(ETC_ROOT + "/centreon-broker/{}".format(filename), "w") - f.write(json.dumps(conf, indent=2)) - f.close() + with open(f"{ETC_ROOT}/centreon-broker/{filename}", "w") as f: + f.write(json.dumps(conf, indent=2)) def broker_config_source_log(name, value): + """ + Configure if logs should contain the source file and its line number. + + Args: + name: The broker instance name among central, rrd and module%d. + value: A boolean that can be "true/false", "1/0" or "yes/no". + + *Example:* + + | Broker Config Source Log | central | 1 | + """ if name == 'central': filename = "central-broker.json" elif name.startswith('module'): filename = "central-{}.json".format(name) else: filename = "central-rrd.json" - f = open(ETC_ROOT + "/centreon-broker/{}".format(filename), "r") - buf = f.read() - f.close() + with open(f"{ETC_ROOT}/centreon-broker/{filename}", "r") as f: + buf = f.read() conf = json.loads(buf) log = conf["centreonBroker"]["log"] log["log_source"] = value - f = open(ETC_ROOT + "/centreon-broker/{}".format(filename), "w") - f.write(json.dumps(conf, indent=2)) - f.close() + with open(f"{ETC_ROOT}/centreon-broker/{filename}", "w") as f: + f.write(json.dumps(conf, indent=2)) def check_broker_stats_exist(name, key1, key2, timeout=TIMEOUT): + """ + Return True if the Broker stats file contain keys pair (key1,key2). key2 must + be a daughter key of key1. + + Should be true if the poller is connected to the central broker. + + Args: + name: The broker instance name among central, rrd and module%d. + key1 (str): A key at first level. + key2 (str): A key under the key1 key. + timeout (int, optional): . Defaults to TIMEOUT. + + *Example:* + + | ${exist} | Check Broker Stats Exist | mysql manager | poller | waiting tasks in connection 0 | + | Should Be True | ${exist} | + """ limit = time.time() + timeout while time.time() < limit: if name == 'central': @@ -1056,6 +1378,18 @@ def check_broker_stats_exist(name, key1, key2, timeout=TIMEOUT): def get_broker_stats_size(name, key, timeout=TIMEOUT): + """ + Return the number of items under the given key in the stats file. + + Args: + name: The broker instance name among central, rrd and module%d. + key: The key to work with. + timeout (int, optional): Defaults to TIMEOUT = 30s. + + *Example:* + + | ${size} | Get Broker Stats Size | central | poller | # 2 | + """ limit = time.time() + timeout retval = 0 while time.time() < limit: @@ -1086,25 +1420,30 @@ def get_broker_stats_size(name, key, timeout=TIMEOUT): time.sleep(5) return retval -def get_broker_stats(name: str, expected:str, timeout: int, *keys): - """! - read a value from broker stats - @param name central, module or rrd - @param expected: value expected (regexp) - @timeout delay to find key in stats - @param keys keys in json stats output - @return True if value found and matches expected + +def get_broker_stats(name: str, expected: str, timeout: int, *keys): + """ + Read a value from the broker stats file following the given keys. If the value is the expected one, return True. + + Args: + name The broker instance to work with among central, module%d or rrd + expected: value expected (regexp) + timeout: duration in seconds after what the check fails. + keys: keys in json stats output + + Returns: True if the expected value was found, otherwise it returns False. """ def json_get(json_dict, keys: tuple, index: int): try: key = keys[index] - if index == len(keys) -1: + if index == len(keys) - 1: return json_dict[key] else: return json_get(json_dict[key], keys, index + 1) except: return None + limit = time.time() + timeout if name == 'central': filename = "central-broker-master-stats.json" @@ -1134,14 +1473,21 @@ def json_get(json_dict, keys: tuple, index: int): return False -## -# @brief Gets count indexes that does not exist in index_data. -# -# @param count:int The number of indexes to get. -# -# @return a list of index ids. -# def get_not_existing_indexes(count: int): + """ + Gets count indexes that does not exist in the centreon_storage.index_data table. + + Args: + count (int): The number of indexes to get. + + *Example:* + + | @{indexes} | Get Not Existing Indexes | 10 | + | Log To Console | @{indexes} | + + Returns: + a list of index IDs. + """ # Connect to the database connection = pymysql.connect(host=DB_HOST, user=DB_USER, @@ -1172,14 +1518,20 @@ def get_not_existing_indexes(count: int): return ids_db -## -# @brief Gets count indexes from available ones. -# -# @param count:int The number of indexes to get. -# -# @return a list of index ids. -# def get_indexes_to_delete(count: int): + """ + Gets count indexes from centreon_storage.index_data that really exist. + + Args: + count (int): The number of indexes to get. + + *Example:* + + | @{indexes} | Get Not Existing Indexes | 10 | + + Returns: + A list of index IDs. + """ files = [os.path.basename(x) for x in glob.glob( VAR_ROOT + "/lib/centreon/metrics/[0-9]*.rrd")] ids = [int(f.split(".")[0]) for f in files] @@ -1213,23 +1565,30 @@ def get_indexes_to_delete(count: int): def delete_all_rrd_metrics(): - """! remove all rrd metrics files """ - with os.scandir(VAR_ROOT + "/lib/centreon/metrics/") as it: + Remove all rrd metrics files. + """ + with os.scandir(f"{VAR_ROOT}/lib/centreon/metrics/") as it: for entry in it: if entry.is_file(): os.remove(entry.path) def check_rrd_info(metric_id: int, key: str, value, timeout: int = 60): - """! execute rrdtool info and check one value of the returned informations - @param metric_id - @param key key to search in the rrdtool info result - @param value value to search in the rrdtool info result fot key - @param timeout timeout for metric file creation - @return True if key = value found """ + Execute rrdtool info and check one value of the returned informations + + Args: + metric_id (int): A metric ID. + key (str): The key whose value is to check. + value: The expected value. + timeout (int, optional): Defaults to 60. + + *Example:* + | ${result} | Check Rrd Info | 1 | step | 60 | + | Should Be True | ${result} | + """ limit = time.time() + timeout while time.time() < limit: res = getoutput( @@ -1245,11 +1604,23 @@ def check_rrd_info(metric_id: int, key: str, value, timeout: int = 60): def get_metrics_for_service(service_id: int, metric_name: str = "%", timeout: int = 60): - """! scan data base every 5s to extract metric ids for a service + """ + Try to get the metric IDs of a service. + + Warning: + A service is identified by a host ID and a service ID. This function should be used with caution. + + Args: + service_id (int): The ID of the service. + metric_name (str, optional): Defaults to "%". + timeout (int, optional): Defaults to 60. + + Returns: + A list of metric IDs or None if no metric found. - @param service_id id of the service - @param timeout timeout in second - @return array of metric ids + *Example:* + + | ${metrics} | Get Metrics For Service | 1 | % | """ limit = time.time() + timeout @@ -1276,14 +1647,20 @@ def get_metrics_for_service(service_id: int, metric_name: str = "%", timeout: in return None -## -# @brief Gets count metrics that does not exist. -# -# @param count:int The number of metrics to get. -# -# @return a list of metric ids. -# def get_not_existing_metrics(count: int): + """ + Return a list of metrics that does not exist. + + Args: + count (int): How many metric IDs do we want. + + Returns: + A list of IDs. + + *Example:* + + | @{metrics} | Get Not Existing Metrics | 10 | + """ files = [os.path.basename(x) for x in glob.glob( VAR_ROOT + "/lib/centreon/metrics/[0-9]*.rrd")] ids = [int(f.split(".")[0]) for f in files] @@ -1308,20 +1685,22 @@ def get_not_existing_metrics(count: int): index = 1 retval = [] while len(retval) < count: - if not index in inter: + if index not in inter: retval.append(index) index += 1 return retval -## -# @brief Gets count metrics from available ones. -# -# @param count:int The number of metrics to get. -# -# @return a list of metric ids. -# def get_metrics_to_delete(count: int): + """ + Get count metrics from availables ones. + + Args: + count (int): The number of metrics to get. + + Returns: + A list of metric IDs. + """ files = [os.path.basename(x) for x in glob.glob( VAR_ROOT + "/lib/centreon/metrics/[0-9]*.rrd")] ids = [int(f.split(".")[0]) for f in files] @@ -1346,12 +1725,13 @@ def get_metrics_to_delete(count: int): return inter[:count] -## -# @brief creat metrics from available ones. -# -# @param count:int The number of metrics to create. -# def create_metrics(count: int): + """ + Create count metrics from available ones. + + Args: + count (int): How many metrics to create. + """ files = [os.path.basename(x) for x in glob.glob( VAR_ROOT + "/lib/centreon/metrics/[0-9]*.rrd")] ids = [int(f.split(".")[0]) for f in files] @@ -1395,7 +1775,8 @@ def create_metrics(count: int): def run_reverse_bam(duration, interval): - """Launch a map_client.py script that emulates map. + """ + Launch the map_client.py script that simulates map. Args: duration: The duration in seconds before to stop map_client.py @@ -1409,17 +1790,36 @@ def run_reverse_bam(duration, interval): def start_map(): + """ + Launch the map_client_types.py script that simulates map. + """ global map_process map_process = subp.Popen("broker/map_client_types.py", shell=True, stdout=subp.DEVNULL, stdin=subp.DEVNULL) def clear_map_logs(): + """ + Reset the content of the /tmp/map-output.log file. + """ with open('/tmp/map-output.log', 'w') as f: f.write("") def check_map_output(categories_str, expected_events, timeout: int = TIMEOUT): + """ + Check the content of the /tmp/map-output.log file. This file contains informations on categories/elements of each + received event. A list of categories and event types are given to this function, so it can check if the file + contain these types of events. If it contains them, True is returned. + + Args: + categories_str: A list of categories, for example ["1", "2"]. + expected_events: A list of event types. + timeout (int, optional): A number of seconds, the default value is TIMEOUT. + + Returns: + True on success, otherwise False. + """ retval = False limit = time.time() + timeout while time.time() < limit: @@ -1466,11 +1866,20 @@ def check_map_output(categories_str, expected_events, timeout: int = TIMEOUT): def get_map_output(): + """ + The map_client_types.py script writes on STDOUT. This function allows to get this output. + + Returns: + A string containing the output. + """ global map_process return map_process.communicate()[0] def stop_map(): + """ + Stop the script simulating map. Works with map_client_type. + """ for proc in psutil.process_iter(): if 'map_client_type' in proc.name(): logger.console( @@ -1492,13 +1901,17 @@ def stop_map(): logger.console("map_client_type stopped") -## -# @brief Get count indexes that are available to rebuild them. -# -# @param count is the number of indexes to get. -# -# @return a list of indexes def get_indexes_to_rebuild(count: int, nb_day=180): + """ + Get count indexes that are available to rebuild. + + Args: + count (int): The number of indexes to get. + nb_day (int, optional): Defaults to 180. + + Returns: + A list of indexes. + """ files = [os.path.basename(x) for x in glob.glob( VAR_ROOT + "/lib/centreon/metrics/[0-9]*.rrd")] ids = [int(f.split(".")[0]) for f in files] @@ -1523,7 +1936,7 @@ def get_indexes_to_rebuild(count: int, nb_day=180): logger.console( "building data for metric {} index_id {}".format(r['metric_id'], index_id)) # We go back to 180 days with steps of 5 mn - start = int(time.time()/86400)*86400 - \ + start = int(time.time() / 86400) * 86400 - \ 24 * 60 * 60 * nb_day value = int(r['metric_id']) // 2 status_value = index_id % 3 @@ -1531,8 +1944,9 @@ def get_indexes_to_rebuild(count: int, nb_day=180): r['metric_id'], start)) # We set the value to a constant on 180 days for i in range(0, 24 * 60 * 60 * nb_day, 60 * 5): - cursor.execute("INSERT INTO data_bin (id_metric, ctime, value, status) VALUES ({},{},{},'{}')".format( - r['metric_id'], start + i, value, status_value)) + cursor.execute( + "INSERT INTO data_bin (id_metric, ctime, value, status) VALUES ({},{},{},'{}')".format( + r['metric_id'], start + i, value, status_value)) connection.commit() retval.append(index_id) @@ -1544,12 +1958,13 @@ def get_indexes_to_rebuild(count: int, nb_day=180): return retval -## -# @brief add a value at the mid of the first day of each metric -# -# -# @return a list of indexes of pair