Skip to content

Commit

Permalink
otel services work in passive mode (#1637)
Browse files Browse the repository at this point in the history
* otel services work in passive mode

update doc

* fix ut

* fix anomaly-detection
  • Loading branch information
jean-christophe81 authored Sep 2, 2024
1 parent 6ae97cf commit f91bec1
Show file tree
Hide file tree
Showing 40 changed files with 618 additions and 1,804 deletions.
3 changes: 3 additions & 0 deletions CMakeListsWindows.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
# For more information : [email protected]
#

#in order to make fmt compile
add_definitions("/utf-8")

# When we build from cache (CI), we don't use vcpkg cmaketool, so we tell to cmake where to find packages info
if (BUILD_FROM_CACHE)
LIST(APPEND CMAKE_PREFIX_PATH "build_windows/vcpkg_installed/x64-windows")
Expand Down
2 changes: 1 addition & 1 deletion common/src/rapidjson_helper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ const rapidjson::Value& rapidjson_helper::get_member(
*/
rapidjson::Document rapidjson_helper::read_from_file(
const std::string_view& path) {
FILE* to_close = fopen(path.data(), "r+b");
FILE* to_close = fopen(path.data(), "r");
if (!to_close) {
throw exceptions::msg_fmt("Fail to read file '{}' : {}", path,
strerror(errno));
Expand Down
38 changes: 23 additions & 15 deletions common/tests/process_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,29 +45,33 @@ class process_test : public ::testing::Test {
};

class process_wait : public process {
std::mutex _cond_m;
std::condition_variable _cond;
std::string _stdout;
std::string _stderr;
bool _stdout_eof = false;
bool _stderr_eof = false;
bool _process_ended = false;

void _notify() {
if (_stdout_eof && _stderr_eof && _process_ended) {
_cond.notify_one();
}
public:
void reset_end() {
std::lock_guard l(_cond_m);
_stdout_eof = false;
_stderr_eof = false;
_process_ended = false;
}

public:
void on_stdout_read(const boost::system::error_code& err,
size_t nb_read) override {
if (!err) {
std::string_view line(_stdout_read_buffer, nb_read);
_stdout += line;
SPDLOG_LOGGER_DEBUG(_logger, "read from stdout: {}", line);
} else if (err == asio::error::eof || err == asio::error::broken_pipe) {
std::unique_lock l(_cond_m);
_stdout_eof = true;
_notify();
l.unlock();
_cond.notify_one();
}
process::on_stdout_read(err, nb_read);
}
Expand All @@ -79,8 +83,10 @@ class process_wait : public process {
_stderr += line;
SPDLOG_LOGGER_DEBUG(_logger, "read from stderr: {}", line);
} else if (err == asio::error::eof || err == asio::error::broken_pipe) {
std::unique_lock l(_cond_m);
_stderr_eof = true;
_notify();
l.unlock();
_cond.notify_one();
}
process::on_stderr_read(err, nb_read);
}
Expand All @@ -89,8 +95,10 @@ class process_wait : public process {
int raw_exit_status) override {
process::on_process_end(err, raw_exit_status);
SPDLOG_LOGGER_DEBUG(_logger, "process end");
std::unique_lock l(_cond_m);
_process_ended = true;
_notify();
l.unlock();
_cond.notify_one();
}

template <typename string_type>
Expand All @@ -109,9 +117,9 @@ class process_wait : public process {
const std::string& get_stderr() const { return _stderr; }

void wait() {
std::mutex dummy;
std::unique_lock l(dummy);
_cond.wait(l);
std::unique_lock l(_cond_m);
_cond.wait(l,
[this] { return _process_ended && _stderr_eof && _stdout_eof; });
}
};

Expand Down Expand Up @@ -154,11 +162,11 @@ TEST_F(process_test, call_start_several_time) {
new process_wait(g_io_context, _logger, ECHO_PATH, {"hello"}));
std::string expected;
for (int ii = 0; ii < 10; ++ii) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
to_wait->reset_end();
to_wait->start_process(true);
to_wait->wait();
expected += "hello" END_OF_LINE;
}
to_wait->wait();
ASSERT_EQ(to_wait->get_exit_status(), 0);
ASSERT_EQ(to_wait->get_stdout(), expected);
ASSERT_EQ(to_wait->get_stderr(), "");
Expand All @@ -169,11 +177,11 @@ TEST_F(process_test, call_start_several_time_no_args) {
new process_wait(g_io_context, _logger, ECHO_PATH " hello"));
std::string expected;
for (int ii = 0; ii < 10; ++ii) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
to_wait->reset_end();
to_wait->start_process(true);
to_wait->wait();
expected += "hello" END_OF_LINE;
}
to_wait->wait();
std::this_thread::sleep_for(std::chrono::milliseconds(10));
ASSERT_EQ(to_wait->get_exit_status(), 0);
ASSERT_EQ(to_wait->get_stdout(), expected);
Expand Down
3 changes: 0 additions & 3 deletions engine/inc/com/centreon/engine/check_result.hh
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ class check_result {
return _object_check_type;
}
void set_object_check_type(enum check_source object_check_type);
uint64_t get_command_id() const { return _command_id; }
void set_command_id(uint64_t command_id) { _command_id = command_id; }

inline notifier* get_notifier() { return _notifier; }
void set_notifier(notifier* notifier);
Expand Down Expand Up @@ -81,7 +79,6 @@ class check_result {

private:
enum check_source _object_check_type; // is this a service or a host check?
uint64_t _command_id;
notifier* _notifier;
// was this an active or passive service check?
enum checkable::check_type _check_type;
Expand Down
21 changes: 17 additions & 4 deletions engine/inc/com/centreon/engine/checks/checker.hh
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@
#include "com/centreon/engine/anomalydetection.hh"
#include "com/centreon/engine/commands/command.hh"

namespace com::centreon::engine {
namespace com::centreon::engine::checks {

namespace checks {
/**
* @class checks check_result.hh
* @brief Run object and reap the result.
Expand Down Expand Up @@ -57,6 +56,9 @@ class checker : public commands::command_listener {

void wait_completion(e_completion_filter filter = e_completion_filter::all);

template <class queue_handler>
void inspect_reap_partial(queue_handler&& handler) const;

private:
checker(bool used_by_test);
checker(checker const& right);
Expand All @@ -66,7 +68,7 @@ class checker : public commands::command_listener {
host::host_state _execute_sync(host* hst);

/* A mutex to protect access on _waiting_check_result and _to_reap_partial */
std::mutex _mut_reap;
mutable std::mutex _mut_reap;
/*
* Here is the list of prepared check results but with a command being
* running. When the command will be finished, each check result is get back
Expand All @@ -92,8 +94,19 @@ class checker : public commands::command_listener {
std::condition_variable _finish_cond;
bool _finished;
};
} // namespace checks

/**
* @brief allow to inspect _to_reap_partial
*
* @tparam queue_handler
* @param handler must have () (const std::deque<check_result::pointer> &)
*/
template <class queue_handler>
void checker::inspect_reap_partial(queue_handler&& handler) const {
std::lock_guard<std::mutex> lock(_mut_reap);
handler(_to_reap_partial);
}

} // namespace com::centreon::engine::checks

#endif // !CCE_CHECKS_CHECKER_HH
21 changes: 15 additions & 6 deletions engine/inc/com/centreon/engine/commands/otel_connector.hh
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ namespace com::centreon::engine::commands {
* open telemetry request run command line configure converter who converts
* data_points to result
*/
class otel_connector : public command,
public std::enable_shared_from_this<otel_connector> {
class otel_connector : public command {
otel::host_serv_list::pointer _host_serv_list;

public:
Expand All @@ -43,16 +42,17 @@ class otel_connector : public command,
static otel_connector_container _commands;

std::shared_ptr<otel::host_serv_extractor> _extractor;
std::shared_ptr<otel::check_result_builder_config> _conv_conf;
std::shared_ptr<otel::otl_check_result_builder_base> _check_result_builder;

std::shared_ptr<spdlog::logger> _logger;

void init();

public:
static void create(const std::string& connector_name,
const std::string& cmd_line,
commands::command_listener* listener);
static std::shared_ptr<otel_connector> create(
const std::string& connector_name,
const std::string& cmd_line,
commands::command_listener* listener);

static bool remove(const std::string& connector_name);

Expand All @@ -62,6 +62,10 @@ class otel_connector : public command,
static std::shared_ptr<otel_connector> get_otel_connector(
const std::string& connector_name);

static std::shared_ptr<otel_connector> get_otel_connector_from_host_serv(
const std::string_view& host,
const std::string_view& serv);

static void clear();

static void init_all();
Expand All @@ -76,6 +80,11 @@ class otel_connector : public command,

void update(const std::string& cmd_line);

void process_data_pts(
const std::string_view& host,
const std::string_view& serv,
const modules::opentelemetry::metric_to_datapoints& data_pts);

virtual uint64_t run(const std::string& processed_cmd,
nagios_macros& macros,
uint32_t timeout,
Expand Down
51 changes: 34 additions & 17 deletions engine/inc/com/centreon/engine/commands/otel_interface.hh
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
#include "com/centreon/engine/commands/result.hh"
#include "com/centreon/engine/macros/defines.hh"

namespace com::centreon::engine::modules::opentelemetry {
class metric_to_datapoints;
}

namespace com::centreon::engine::commands::otel {

/**
Expand Down Expand Up @@ -66,14 +70,34 @@ class host_serv_list {
const std::string& service_description);
void remove(const std::string& host, const std::string& service_description);

bool contains(const std::string& host,
const std::string& service_description) const;
template <class string_type>
bool contains(const string_type& host,
const string_type& service_description) const;

template <typename host_set, typename service_set>
host_serv_metric match(const host_set& hosts,
const service_set& services) const;
};

/**
* @brief test if a host serv pair is contained in list
*
* @param host
* @param service_description
* @return true found
* @return false not found
*/
template <class string_type>
bool host_serv_list::contains(const string_type& host,
const string_type& service_description) const {
absl::ReaderMutexLock l(&_data_m);
auto host_search = _data.find(host);
if (host_search != _data.end()) {
return host_search->second.contains(service_description);
}
return false;
}

template <typename host_set, typename service_set>
host_serv_metric host_serv_list::match(const host_set& hosts,
const service_set& services) const {
Expand Down Expand Up @@ -111,13 +135,15 @@ class host_serv_extractor {
virtual ~host_serv_extractor() = default;
};

class check_result_builder_config {
class otl_check_result_builder_base {
public:
virtual ~check_result_builder_config() = default;
virtual ~otl_check_result_builder_base() = default;
virtual void process_data_pts(
const std::string_view& host,
const std::string_view& serv,
const modules::opentelemetry::metric_to_datapoints& data_pts) = 0;
};

using result_callback = std::function<void(const result&)>;

class open_telemetry_base;

/**
Expand All @@ -139,17 +165,8 @@ class open_telemetry_base
const std::string& cmdline,
const host_serv_list::pointer& host_serv_list) = 0;

virtual std::shared_ptr<check_result_builder_config>
create_check_result_builder_config(const std::string& cmd_line) = 0;

virtual bool check(
const std::string& processed_cmd,
const std::shared_ptr<check_result_builder_config>& conv_conf,
uint64_t command_id,
nagios_macros& macros,
uint32_t timeout,
commands::result& res,
result_callback&& handler) = 0;
virtual std::shared_ptr<otl_check_result_builder_base>
create_check_result_builder(const std::string& cmdline) = 0;
};

}; // namespace com::centreon::engine::commands::otel
Expand Down
53 changes: 48 additions & 5 deletions engine/inc/com/centreon/engine/service.hh
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,55 @@ class servicegroup;
class serviceescalation;
} // namespace com::centreon::engine

/**
* @brief pair with host_name in first and serv in second
*
*/
using host_serv_pair = std::pair<std::string /*host*/, std::string /*serv*/>;

/**
* @brief This struct is used to lookup in a host_serv_pair indexed container
* with a std::pair<std::string_view, std::string_view>
*
*/
struct host_serv_hash_eq {
using is_transparent = void;
using host_serv_string_view = std::pair<std::string_view, std::string_view>;

size_t operator()(const host_serv_pair& to_hash) const {
return absl::Hash<host_serv_pair>()(to_hash);
}
size_t operator()(const host_serv_string_view& to_hash) const {
return absl::Hash<host_serv_string_view>()(to_hash);
}

bool operator()(const host_serv_pair& left,
const host_serv_pair& right) const {
return left == right;
}
bool operator()(const host_serv_pair& left,
const host_serv_string_view& right) const {
return left.first == right.first && left.second == right.second;
}
bool operator()(const host_serv_string_view& left,
const host_serv_pair& right) const {
return left.first == right.first && left.second == right.second;
}
bool operator()(const host_serv_string_view& left,
const host_serv_string_view& right) const {
return left == right;
}
};

using service_map =
absl::flat_hash_map<std::pair<std::string, std::string>,
std::shared_ptr<com::centreon::engine::service>>;
using service_map_unsafe =
absl::flat_hash_map<std::pair<std::string, std::string>,
com::centreon::engine::service*>;
absl::flat_hash_map<host_serv_pair,
std::shared_ptr<com::centreon::engine::service>,
host_serv_hash_eq,
host_serv_hash_eq>;
using service_map_unsafe = absl::flat_hash_map<host_serv_pair,
com::centreon::engine::service*,
host_serv_hash_eq,
host_serv_hash_eq>;
using service_id_map =
absl::btree_map<std::pair<uint64_t, uint64_t>,
std::shared_ptr<com::centreon::engine::service>>;
Expand Down
Loading

0 comments on commit f91bec1

Please sign in to comment.