From 1575b36151b0fc5701db1cbdb3346dd7663b715e Mon Sep 17 00:00:00 2001 From: Jean Christophe Roques Date: Mon, 8 Jul 2024 12:00:24 +0200 Subject: [PATCH] MON-63843-agent-linux-streaming --- agent/CMakeLists.txt | 7 +- agent/inc/com/centreon/agent/bireactor.hh | 84 ++++ .../com/centreon/agent/streaming_client.hh | 109 +++++ .../com/centreon/agent/streaming_server.hh | 73 ++++ agent/src/bireactor.cc | 207 +++++++++ agent/src/main.cc | 27 +- agent/src/streaming_client.cc | 230 ++++++++++ agent/src/streaming_server.cc | 237 +++++++++++ agent/test/scheduler_test.cc | 4 +- broker/core/multiplexing/src/muxer.cc | 29 +- engine/tests/CMakeLists.txt | 7 +- .../opentelemetry/agent_to_engine_test.cc | 327 ++++++++++++++ packaging/centreon-monitoring-agent.yaml | 10 + tests/broker-engine/opentelemetry.robot | 402 ++++++++++++++++++ tests/resources/Agent.py | 71 ++++ tests/resources/resources.resource | 34 ++ 16 files changed, 1837 insertions(+), 21 deletions(-) create mode 100644 agent/inc/com/centreon/agent/bireactor.hh create mode 100644 agent/inc/com/centreon/agent/streaming_client.hh create mode 100644 agent/inc/com/centreon/agent/streaming_server.hh create mode 100644 agent/src/bireactor.cc create mode 100644 agent/src/streaming_client.cc create mode 100644 agent/src/streaming_server.cc create mode 100644 engine/tests/opentelemetry/agent_to_engine_test.cc create mode 100644 tests/resources/Agent.py diff --git a/agent/CMakeLists.txt b/agent/CMakeLists.txt index 55293a28bf0..c64801646c4 100644 --- a/agent/CMakeLists.txt +++ b/agent/CMakeLists.txt @@ -101,6 +101,7 @@ add_custom_command( add_library(centagent_lib STATIC ${SRC_DIR}/agent.grpc.pb.cc ${SRC_DIR}/agent.pb.cc + ${SRC_DIR}/bireactor.cc ${SRC_DIR}/check.cc ${SRC_DIR}/check_exec.cc ${SRC_DIR}/opentelemetry/proto/collector/metrics/v1/metrics_service.grpc.pb.cc @@ -110,6 +111,8 @@ add_library(centagent_lib STATIC ${SRC_DIR}/opentelemetry/proto/resource/v1/resource.pb.cc ${SRC_DIR}/config.cc ${SRC_DIR}/scheduler.cc + ${SRC_DIR}/streaming_client.cc + ${SRC_DIR}/streaming_server.cc ) include_directories( @@ -136,7 +139,9 @@ target_link_libraries( centreon_process -L${Boost_LIBRARY_DIR_RELEASE} boost_program_options - fmt::fmt) + fmt::fmt + stdc++fs + ) target_precompile_headers(${CENTREON_AGENT} REUSE_FROM centagent_lib) diff --git a/agent/inc/com/centreon/agent/bireactor.hh b/agent/inc/com/centreon/agent/bireactor.hh new file mode 100644 index 00000000000..77c0123b411 --- /dev/null +++ b/agent/inc/com/centreon/agent/bireactor.hh @@ -0,0 +1,84 @@ +/** + * Copyright 2024 Centreon + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + * For more information : contact@centreon.com + */ + +#ifndef CENTREON_AGENT_BIREACTOR_HH +#define CENTREON_AGENT_BIREACTOR_HH + +#include "agent.grpc.pb.h" + +namespace com::centreon::agent { + +template +class bireactor + : public bireactor_class, + public std::enable_shared_from_this> { + private: + static std::set> _instances; + static std::mutex _instances_m; + + bool _write_pending; + std::deque> _write_queue; + std::shared_ptr _read_current; + + const std::string_view _class_name; + + const std::string _peer; + + protected: + std::shared_ptr _io_context; + std::shared_ptr _logger; + + bool _alive; + mutable std::mutex _protect; + + public: + bireactor(const std::shared_ptr& io_context, + const std::shared_ptr& logger, + const std::string_view& class_name, + const std::string& peer); + + virtual ~bireactor(); + + static void register_stream(const std::shared_ptr& strm); + + void start_read(); + + void start_write(); + void write(const std::shared_ptr& request); + + // bireactor part + void OnReadDone(bool ok) override; + + virtual void on_incomming_request( + const std::shared_ptr& request) = 0; + + virtual void on_error() = 0; + + void OnWriteDone(bool ok) override; + + // server version + void OnDone(); + // client version + void OnDone(const ::grpc::Status& /*s*/); + + virtual void shutdown(); +}; + +} // namespace com::centreon::agent + +#endif diff --git a/agent/inc/com/centreon/agent/streaming_client.hh b/agent/inc/com/centreon/agent/streaming_client.hh new file mode 100644 index 00000000000..28b5ff636fc --- /dev/null +++ b/agent/inc/com/centreon/agent/streaming_client.hh @@ -0,0 +1,109 @@ +/** + * Copyright 2024 Centreon + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + * For more information : contact@centreon.com + */ + +#ifndef CENTREON_AGENT_STREAMING_CLIENT_HH +#define CENTREON_AGENT_STREAMING_CLIENT_HH + +#include "com/centreon/common/grpc/grpc_client.hh" + +#include "bireactor.hh" +#include "scheduler.hh" + +namespace com::centreon::agent { + +class streaming_client; + +class client_reactor + : public bireactor< + ::grpc::ClientBidiReactor> { + std::weak_ptr _parent; + ::grpc::ClientContext _context; + + public: + client_reactor(const std::shared_ptr& io_context, + const std::shared_ptr& logger, + const std::shared_ptr& parent, + const std::string& peer); + + std::shared_ptr shared_from_this() { + return std::static_pointer_cast( + bireactor<::grpc::ClientBidiReactor>:: + shared_from_this()); + } + + ::grpc::ClientContext& get_context() { return _context; } + + void on_incomming_request( + const std::shared_ptr& request) override; + + void on_error() override; + + void shutdown() override; +}; + +/** + * @brief this object not only manages connection to engine, but also embed + * check scheduler + * + */ +class streaming_client : public common::grpc::grpc_client_base, + public std::enable_shared_from_this { + std::shared_ptr _io_context; + std::shared_ptr _logger; + std::string _supervised_host; + + std::unique_ptr _stub; + + std::shared_ptr _reactor; + std::shared_ptr _sched; + + std::mutex _protect; + + void _create_reactor(); + + void _start(); + + void _send(const std::shared_ptr& request); + + public: + streaming_client(const std::shared_ptr& io_context, + const std::shared_ptr& logger, + const std::shared_ptr& conf, + const std::string& supervised_host); + + static std::shared_ptr load( + const std::shared_ptr& io_context, + const std::shared_ptr& logger, + const std::shared_ptr& conf, + const std::string& supervised_host); + + void on_incomming_request(const std::shared_ptr& caller, + const std::shared_ptr& request); + void on_error(const std::shared_ptr& caller); + + void shutdown(); + + // use only for tests + engine_to_agent_request_ptr get_last_message_to_agent() const { + return _sched->get_last_message_to_agent(); + } +}; + +} // namespace com::centreon::agent + +#endif \ No newline at end of file diff --git a/agent/inc/com/centreon/agent/streaming_server.hh b/agent/inc/com/centreon/agent/streaming_server.hh new file mode 100644 index 00000000000..eba43886a3d --- /dev/null +++ b/agent/inc/com/centreon/agent/streaming_server.hh @@ -0,0 +1,73 @@ +/** + * Copyright 2024 Centreon + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + * For more information : contact@centreon.com + */ + +#ifndef CENTREON_AGENT_STREAMING_SERVER_HH +#define CENTREON_AGENT_STREAMING_SERVER_HH + +#include "com/centreon/common/grpc/grpc_server.hh" + +#include "bireactor.hh" +#include "scheduler.hh" + +namespace com::centreon::agent { + +class server_reactor; + +/** + * @brief grpc engine to agent server (reverse connection) + * It accept only one connection at a time + * If another connection occurs, previous connection is shutdown + * This object is both grpc server and grpc service + */ +class streaming_server : public common::grpc::grpc_server_base, + public std::enable_shared_from_this, + public ReversedAgentService::Service { + std::shared_ptr _io_context; + std::shared_ptr _logger; + std::string _supervised_host; + + /** active engine to agent connection*/ + std::shared_ptr _incoming; + + mutable std::mutex _protect; + + void _start(); + + public: + streaming_server(const std::shared_ptr& io_context, + const std::shared_ptr& logger, + const std::shared_ptr& conf, + const std::string& supervised_host); + + ~streaming_server(); + + static std::shared_ptr load( + const std::shared_ptr& io_context, + const std::shared_ptr& logger, + const std::shared_ptr& conf, + const std::string& supervised_host); + + ::grpc::ServerBidiReactor* Import( + ::grpc::CallbackServerContext* context); + + void shutdown(); +}; + +} // namespace com::centreon::agent + +#endif diff --git a/agent/src/bireactor.cc b/agent/src/bireactor.cc new file mode 100644 index 00000000000..8a5673aa38f --- /dev/null +++ b/agent/src/bireactor.cc @@ -0,0 +1,207 @@ +/** + * Copyright 2024 Centreon + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * For more information : contact@centreon.com + */ + +#include "bireactor.hh" + +using namespace com::centreon::agent; + +/** + * @brief when BiReactor::OnDone is called by grpc layers, we should delete + * this. But this object is even used by others. 
+ * So it's stored in this container and just removed from this container when + * OnDone is called + * + * @tparam bireactor_class + */ +template +std::set>> + bireactor::_instances; + +template +std::mutex bireactor::_instances_m; + +template +bireactor::bireactor( + const std::shared_ptr& io_context, + const std::shared_ptr& logger, + const std::string_view& class_name, + const std::string& peer) + : _write_pending(false), + _alive(true), + _class_name(class_name), + _peer(peer), + _io_context(io_context), + _logger(logger) { + SPDLOG_LOGGER_DEBUG(_logger, "create {} this={:p} peer:{}", _class_name, + static_cast(this), _peer); +} + +template +bireactor::~bireactor() { + SPDLOG_LOGGER_DEBUG(_logger, "delete {} this={:p} peer:{}", _class_name, + static_cast(this), _peer); +} + +template +void bireactor::register_stream( + const std::shared_ptr& strm) { + std::lock_guard l(_instances_m); + _instances.insert(strm); +} + +template +void bireactor::start_read() { + std::lock_guard l(_protect); + if (!_alive) { + return; + } + std::shared_ptr to_read; + if (_read_current) { + return; + } + to_read = _read_current = std::make_shared(); + bireactor_class::StartRead(to_read.get()); +} + +template +void bireactor::OnReadDone(bool ok) { + if (ok) { + std::shared_ptr readden; + { + std::lock_guard l(_protect); + SPDLOG_LOGGER_TRACE(_logger, "{:p} {} peer {} receive: {}", + static_cast(this), _class_name, _peer, + _read_current->ShortDebugString()); + readden = _read_current; + _read_current.reset(); + } + start_read(); + if (readden->has_config()) { + on_incomming_request(readden); + } + } else { + SPDLOG_LOGGER_ERROR(_logger, "{:p} {} peer:{} fail read from stream", + static_cast(this), _class_name, _peer); + on_error(); + shutdown(); + } +} + +template +void bireactor::write( + const std::shared_ptr& request) { + { + std::lock_guard l(_protect); + if (!_alive) { + return; + } + _write_queue.push_back(request); + } + start_write(); +} + +template +void bireactor::start_write() { + std::shared_ptr to_send; + { + std::lock_guard l(_protect); + if (!_alive || _write_pending || _write_queue.empty()) { + return; + } + to_send = _write_queue.front(); + _write_pending = true; + } + bireactor_class::StartWrite(to_send.get()); +} + +template +void bireactor::OnWriteDone(bool ok) { + if (ok) { + { + std::lock_guard l(_protect); + _write_pending = false; + SPDLOG_LOGGER_TRACE(_logger, "{:p} {} {} sent", + static_cast(this), _class_name, + (*_write_queue.begin())->ShortDebugString()); + _write_queue.pop_front(); + } + start_write(); + } else { + SPDLOG_LOGGER_ERROR(_logger, "{:p} {} peer {} fail write to stream", + static_cast(this), _class_name, _peer); + on_error(); + shutdown(); + } +} + +template +void bireactor::OnDone() { + /**grpc has a bug, sometimes if we delete this class in this handler as it is + * described in examples, it also deletes used channel and does a pthread_join + * of the current thread witch go to a EDEADLOCK error and call grpc::Crash. 
+ * So we uses asio thread to do the job + */ + _io_context->post([me = std::enable_shared_from_this< + bireactor>::shared_from_this(), + &peer = _peer, logger = _logger]() { + std::lock_guard l(_instances_m); + SPDLOG_LOGGER_DEBUG(logger, "{:p} server::OnDone() to {}", + static_cast(me.get()), peer); + _instances.erase(std::static_pointer_cast>(me)); + }); +} + +template +void bireactor::OnDone(const ::grpc::Status& status) { + /**grpc has a bug, sometimes if we delete this class in this handler as it is + * described in examples, it also deletes used channel and does a + * pthread_join of the current thread witch go to a EDEADLOCK error and call + * grpc::Crash. So we uses asio thread to do the job + */ + _io_context->post([me = std::enable_shared_from_this< + bireactor>::shared_from_this(), + status, &peer = _peer, logger = _logger]() { + std::lock_guard l(_instances_m); + if (status.ok()) { + SPDLOG_LOGGER_DEBUG(logger, "{:p} peer: {} client::OnDone({}) {}", + static_cast(me.get()), peer, + status.error_message(), status.error_details()); + } else { + SPDLOG_LOGGER_ERROR(logger, "{:p} peer:{} client::OnDone({}) {}", + static_cast(me.get()), peer, + status.error_message(), status.error_details()); + } + _instances.erase(std::static_pointer_cast>(me)); + }); +} + +template +void bireactor::shutdown() { + SPDLOG_LOGGER_DEBUG(_logger, "{:p} {}::shutdown", static_cast(this), + _class_name); +} + +namespace com::centreon::agent { + +template class bireactor< + ::grpc::ClientBidiReactor>; + +template class bireactor< + ::grpc::ServerBidiReactor>; + +} // namespace com::centreon::agent \ No newline at end of file diff --git a/agent/src/main.cc b/agent/src/main.cc index 562a1f05e46..e613d749d2f 100644 --- a/agent/src/main.cc +++ b/agent/src/main.cc @@ -21,6 +21,8 @@ #include #include "config.hh" +#include "streaming_client.hh" +#include "streaming_server.hh" using namespace com::centreon::agent; @@ -28,6 +30,9 @@ std::shared_ptr g_io_context = std::make_shared(); std::shared_ptr g_logger; +static std::shared_ptr _streaming_client; + +static std::shared_ptr _streaming_server; static asio::signal_set _signals(*g_io_context, SIGTERM, SIGUSR1, SIGUSR2); @@ -36,9 +41,16 @@ static void signal_handler(const boost::system::error_code& error, if (!error) { switch (signal_number) { case SIGTERM: - SPDLOG_LOGGER_INFO(g_logger, "SIGTERM received"); - g_io_context->stop(); - break; + case SIGINT: + SPDLOG_LOGGER_INFO(g_logger, "SIGTERM or SIGINT received"); + if (_streaming_client) { + _streaming_client->shutdown(); + } + if (_streaming_server) { + _streaming_server->shutdown(); + } + g_io_context->post([]() { g_io_context->stop(); }); + return; case SIGUSR2: SPDLOG_LOGGER_INFO(g_logger, "SIGUSR2 received"); if (g_logger->level()) { @@ -151,6 +163,7 @@ int main(int argc, char* argv[]) { try { // ignored but mandatory because of forks _signals.add(SIGPIPE); + _signals.add(SIGINT); _signals.async_wait(signal_handler); @@ -166,6 +179,14 @@ int main(int argc, char* argv[]) { return -1; } + if (conf->use_reverse_connection()) { + _streaming_server = streaming_server::load(g_io_context, g_logger, + grpc_conf, conf->get_host()); + } else { + _streaming_client = streaming_client::load(g_io_context, g_logger, + grpc_conf, conf->get_host()); + } + try { g_io_context->run(); } catch (const std::exception& e) { diff --git a/agent/src/streaming_client.cc b/agent/src/streaming_client.cc new file mode 100644 index 00000000000..5fa122c83cd --- /dev/null +++ b/agent/src/streaming_client.cc @@ -0,0 +1,230 @@ +/** + * 
Copyright 2024 Centreon + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * For more information : contact@centreon.com + */ + +#include "streaming_client.hh" +#include "check_exec.hh" +#include "com/centreon/clib/version.hh" +#include "com/centreon/common/defer.hh" + +using namespace com::centreon::agent; + +/** + * @brief Construct a new client reactor::client reactor object + * + * @param io_context + * @param parent we will keep a weak_ptr on streaming_client object + */ +client_reactor::client_reactor( + const std::shared_ptr& io_context, + const std::shared_ptr& logger, + + const std::shared_ptr& parent, + const std::string& peer) + : bireactor<::grpc::ClientBidiReactor>( + io_context, + logger, + "client", + peer), + _parent(parent) {} + +/** + * @brief pass request to streaming_client parent + * + * @param request + */ +void client_reactor::on_incomming_request( + const std::shared_ptr& request) { + std::shared_ptr parent = _parent.lock(); + if (!parent) { + shutdown(); + } else { + parent->on_incomming_request(shared_from_this(), request); + } +} + +/** + * @brief called whe OnReadDone or OnWriteDone ok parameter is false + * + */ +void client_reactor::on_error() { + std::shared_ptr parent = _parent.lock(); + if (parent) { + parent->on_error(shared_from_this()); + } +} + +/** + * @brief shutdown connection to engine if not yet done + * + */ +void client_reactor::shutdown() { + std::lock_guard l(_protect); + if (_alive) { + _alive = false; + bireactor<::grpc::ClientBidiReactor>::shutdown(); + RemoveHold(); + _context.TryCancel(); + } +} + +/** + * @brief Construct a new streaming client::streaming client object + * not use it, use load instead + * + * @param io_context + * @param conf + * @param supervised_hosts + */ +streaming_client::streaming_client( + const std::shared_ptr& io_context, + const std::shared_ptr& logger, + const std::shared_ptr& conf, + const std::string& supervised_host) + : com::centreon::common::grpc::grpc_client_base(conf, logger), + _io_context(io_context), + _logger(logger), + _supervised_host(supervised_host) { + _stub = std::move(AgentService::NewStub(_channel)); +} + +/** + * @brief to call after construction + * + */ +void streaming_client::_start() { + std::weak_ptr weak_this = shared_from_this(); + + _sched = scheduler::load( + _io_context, _logger, _supervised_host, scheduler::default_config(), + [sender = std::move(weak_this)]( + const std::shared_ptr& request) { + auto parent = sender.lock(); + if (parent) { + parent->_send(request); + } + }, + check_exec::load); + _create_reactor(); +} + +/** + * @brief create reactor on current grpc channel + * and send agent infos (hostname, supervised hosts, collect version) + * + */ +void streaming_client::_create_reactor() { + std::lock_guard l(_protect); + if (_reactor) { + _reactor->shutdown(); + } + _reactor = std::make_shared( + _io_context, _logger, shared_from_this(), get_conf()->get_hostport()); + client_reactor::register_stream(_reactor); + _stub->async()->Export(&_reactor->get_context(), 
_reactor.get()); + _reactor->start_read(); + _reactor->AddHold(); + _reactor->StartCall(); + + // identifies to engine + std::shared_ptr who_i_am = + std::make_shared(); + auto infos = who_i_am->mutable_init(); + + infos->mutable_centreon_version()->set_major( + com::centreon::clib::version::major); + infos->mutable_centreon_version()->set_minor( + com::centreon::clib::version::minor); + infos->mutable_centreon_version()->set_patch( + com::centreon::clib::version::patch); + + infos->set_host(_supervised_host); + + _reactor->write(who_i_am); +} + +/** + * @brief construct a new streaming_client + * + * @param io_context + * @param conf + * @param supervised_hosts list of host to supervise (match to engine config) + * @return std::shared_ptr + */ +std::shared_ptr streaming_client::load( + const std::shared_ptr& io_context, + const std::shared_ptr& logger, + const std::shared_ptr& conf, + const std::string& supervised_host) { + std::shared_ptr ret = std::make_shared( + io_context, logger, conf, supervised_host); + ret->_start(); + return ret; +} + +/** + * @brief send a request to engine + * + * @param request + */ +void streaming_client::_send(const std::shared_ptr& request) { + std::lock_guard l(_protect); + if (_reactor) + _reactor->write(request); +} + +/** + * @brief + * + * @param caller + * @param request + */ +void streaming_client::on_incomming_request( + const std::shared_ptr& caller, + const std::shared_ptr& request) { + // incoming request is used in main thread + _io_context->post([request, sched = _sched]() { sched->update(request); }); +} + +/** + * @brief called by _reactor when something was wrong + * Then we wait 10s to reconnect to engine + * + * @param caller + */ +void streaming_client::on_error(const std::shared_ptr& caller) { + std::lock_guard l(_protect); + if (caller == _reactor) { + _reactor.reset(); + common::defer(_io_context, std::chrono::seconds(10), + [me = shared_from_this()] { me->_create_reactor(); }); + } +} + +/** + * @brief stop and shutdown scheduler and connection + * After, this object is dead and must be deleted + * + */ +void streaming_client::shutdown() { + std::lock_guard l(_protect); + _sched->stop(); + if (_reactor) { + _reactor->shutdown(); + } +} diff --git a/agent/src/streaming_server.cc b/agent/src/streaming_server.cc new file mode 100644 index 00000000000..cfc23fabb11 --- /dev/null +++ b/agent/src/streaming_server.cc @@ -0,0 +1,237 @@ +/** + * Copyright 2024 Centreon + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + * For more information : contact@centreon.com + */ + +#include "streaming_server.hh" +#include "check_exec.hh" +#include "com/centreon/clib/version.hh" +#include "scheduler.hh" + +using namespace com::centreon::agent; + +namespace com::centreon::agent { + +class server_reactor + : public bireactor< + ::grpc::ServerBidiReactor> { + std::shared_ptr _sched; + std::string _supervised_host; + + void _start(); + + public: + server_reactor(const std::shared_ptr& io_context, + const std::shared_ptr& logger, + const std::string& supervised_hosts, + const std::string& peer); + + static std::shared_ptr load( + const std::shared_ptr& io_context, + const std::shared_ptr& logger, + const std::string& supervised_hosts, + const std::string& peer); + + std::shared_ptr shared_from_this() { + return std::static_pointer_cast( + bireactor<::grpc::ServerBidiReactor>:: + shared_from_this()); + } + + void on_incomming_request( + const std::shared_ptr& request) override; + + void on_error() override; + + void shutdown() override; +}; + +server_reactor::server_reactor( + const std::shared_ptr& io_context, + const std::shared_ptr& logger, + const std::string& supervised_host, + const std::string& peer) + : bireactor<::grpc::ServerBidiReactor>( + io_context, + logger, + "server", + peer), + _supervised_host(supervised_host) {} + +void server_reactor::_start() { + std::weak_ptr weak_this(shared_from_this()); + + _sched = scheduler::load( + _io_context, _logger, _supervised_host, scheduler::default_config(), + [sender = std::move(weak_this)]( + const std::shared_ptr& request) { + auto parent = sender.lock(); + if (parent) { + parent->write(request); + } + }, + check_exec::load); + + // identifies to engine + std::shared_ptr who_i_am = + std::make_shared(); + auto infos = who_i_am->mutable_init(); + + infos->mutable_centreon_version()->set_major( + com::centreon::clib::version::major); + infos->mutable_centreon_version()->set_minor( + com::centreon::clib::version::minor); + infos->mutable_centreon_version()->set_patch( + com::centreon::clib::version::patch); + infos->set_host(_supervised_host); + + write(who_i_am); +} + +std::shared_ptr server_reactor::load( + const std::shared_ptr& io_context, + const std::shared_ptr& logger, + const std::string& supervised_host, + const std::string& peer) { + std::shared_ptr ret = std::make_shared( + io_context, logger, supervised_host, peer); + ret->_start(); + return ret; +} + +void server_reactor::on_incomming_request( + const std::shared_ptr& request) { + _io_context->post([sched = _sched, request]() { sched->update(request); }); +} + +void server_reactor::on_error() { + shutdown(); +} + +void server_reactor::shutdown() { + std::lock_guard l(_protect); + if (_alive) { + _alive = false; + _sched->stop(); + bireactor<::grpc::ServerBidiReactor>::shutdown(); + Finish(::grpc::Status::CANCELLED); + } +} + +} // namespace com::centreon::agent + +/** + * @brief Construct a new streaming server::streaming server object + * Not use it, use load instead + * @param io_context + * @param conf + * @param supervised_hosts list of supervised hosts that will be sent to engine + * in order to have checks configuration + */ +streaming_server::streaming_server( + const std::shared_ptr& io_context, + const std::shared_ptr& logger, + const std::shared_ptr& conf, + const std::string& supervised_host) + : com::centreon::common::grpc::grpc_server_base(conf, logger), + _io_context(io_context), + _logger(logger), + _supervised_host(supervised_host) { + SPDLOG_LOGGER_INFO(_logger, "create grpc server 
listening on {}", + conf->get_hostport()); +} + +streaming_server::~streaming_server() { + SPDLOG_LOGGER_INFO(_logger, "delete grpc server listening on {}", + get_conf()->get_hostport()); +} + +/** + * @brief register service and start grpc server + * + */ +void streaming_server::_start() { + ::grpc::Service::MarkMethodCallback( + 0, new ::grpc::internal::CallbackBidiHandler< + ::com::centreon::agent::MessageToAgent, + ::com::centreon::agent::MessageFromAgent>( + [me = shared_from_this()](::grpc::CallbackServerContext* context) { + return me->Import(context); + })); + + _init([this](::grpc::ServerBuilder& builder) { + builder.RegisterService(this); + }); +} + +/** + * @brief construct and start a new streaming_server + * + * @param io_context + * @param conf + * @param supervised_hosts list of supervised hosts that will be sent to engine + * in order to have checks configuration + * @return std::shared_ptr + */ +std::shared_ptr streaming_server::load( + const std::shared_ptr& io_context, + const std::shared_ptr& logger, + const std::shared_ptr& conf, + const std::string& supervised_host) { + std::shared_ptr ret = std::make_shared( + io_context, logger, conf, supervised_host); + ret->_start(); + return ret; +} + +/** + * @brief shutdown server and incoming connection + * + */ +void streaming_server::shutdown() { + SPDLOG_LOGGER_INFO(_logger, "shutdown grpc server listening on {}", + get_conf()->get_hostport()); + { + std::lock_guard l(_protect); + if (_incoming) { + _incoming->shutdown(); + _incoming.reset(); + } + } + common::grpc::grpc_server_base::shutdown(std::chrono::seconds(10)); +} + +/** + * @brief callback called on incoming connection + * + * @param context + * @return ::grpc::ServerBidiReactor* = + * _incoming + */ +::grpc::ServerBidiReactor* +streaming_server::Import(::grpc::CallbackServerContext* context) { + SPDLOG_LOGGER_INFO(_logger, "incoming connection from {}", context->peer()); + std::lock_guard l(_protect); + if (_incoming) { + _incoming->shutdown(); + } + _incoming = server_reactor::load(_io_context, _logger, _supervised_host, + context->peer()); + server_reactor::register_stream(_incoming); + _incoming->start_read(); + return _incoming.get(); +} diff --git a/agent/test/scheduler_test.cc b/agent/test/scheduler_test.cc index c741f0c7aac..a7ac335382e 100644 --- a/agent/test/scheduler_test.cc +++ b/agent/test/scheduler_test.cc @@ -316,7 +316,7 @@ TEST_F(scheduler_test, correct_output_examplar) { ASSERT_TRUE(exported_request); - SPDLOG_INFO("export:{}", exported_request->otel_request().DebugString()); + SPDLOG_INFO("export:{}", exported_request->otel_request().ShortDebugString()); ASSERT_EQ(exported_request->otel_request().resource_metrics_size(), 2); const ::opentelemetry::proto::metrics::v1::ResourceMetrics& res = @@ -457,4 +457,4 @@ TEST_F(scheduler_test, max_concurent) { ASSERT_EQ(concurent_check::checked.size(), 200); sched->stop(); -} \ No newline at end of file +} diff --git a/broker/core/multiplexing/src/muxer.cc b/broker/core/multiplexing/src/muxer.cc index c81a955206e..231edd0a5a3 100644 --- a/broker/core/multiplexing/src/muxer.cc +++ b/broker/core/multiplexing/src/muxer.cc @@ -317,20 +317,23 @@ void muxer::_execute_reader_if_needed() { bool expected = false; if (_reader_running.compare_exchange_strong(expected, true)) { com::centreon::common::pool::io_context_ptr()->post( - [me = shared_from_this()] { - std::vector> to_fill; - to_fill.reserve(me->_events_size); - bool still_events_to_read = me->read(to_fill, me->_events_size); - uint32_t written = 
me->_data_handler(to_fill); - if (written > 0) - me->ack_events(written); - if (written != to_fill.size()) { - me->_logger->error( - "Unable to handle all the incoming events in muxer '{}'", - me->_name); - me->clear_action_on_new_data(); + [me = shared_from_this(), this] { + absl::MutexLock lck(&_events_m); + if (_data_handler) { + std::vector> to_fill; + to_fill.reserve(_events_size); + bool still_events_to_read = read(to_fill, _events_size); + uint32_t written = _data_handler(to_fill); + if (written > 0) + ack_events(written); + if (written != to_fill.size()) { + _logger->error( + "Unable to handle all the incoming events in muxer '{}'", + _name); + clear_action_on_new_data(); + } + _reader_running.store(false); } - me->_reader_running.store(false); }); } } diff --git a/engine/tests/CMakeLists.txt b/engine/tests/CMakeLists.txt index 1dbadcf16ef..ff06101e3b6 100755 --- a/engine/tests/CMakeLists.txt +++ b/engine/tests/CMakeLists.txt @@ -113,6 +113,7 @@ if(WITH_TESTING) "${TESTS_DIR}/notifications/service_downtime_notification_test.cc" "${TESTS_DIR}/opentelemetry/agent_check_result_builder_test.cc" "${TESTS_DIR}/opentelemetry/agent_reverse_client_test.cc" + "${TESTS_DIR}/opentelemetry/agent_to_engine_test.cc" "${TESTS_DIR}/opentelemetry/grpc_config_test.cc" "${TESTS_DIR}/opentelemetry/host_serv_extractor_test.cc" "${TESTS_DIR}/opentelemetry/otl_server_test.cc" @@ -196,12 +197,14 @@ if(WITH_TESTING) cce_core log_v2 opentelemetry + centagent_lib "-Wl,-no-whole-archive" pb_open_telemetry_lib centreon_grpc centreon_http - -L${Boost_LIBRARY_DIR_RELEASE} - boost_url + centreon_process + -L${Boost_LIBRARY_DIR_RELEASE} + boost_url boost_program_options pthread ${GCOV} diff --git a/engine/tests/opentelemetry/agent_to_engine_test.cc b/engine/tests/opentelemetry/agent_to_engine_test.cc new file mode 100644 index 00000000000..91679611c36 --- /dev/null +++ b/engine/tests/opentelemetry/agent_to_engine_test.cc @@ -0,0 +1,327 @@ +/** + * Copyright 2024 Centreon + * + * This file is part of Centreon Engine. + * + * Centreon Engine is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License version 2 + * as published by the Free Software Foundation. + * + * Centreon Engine is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Centreon Engine. If not, see + * . 
+ */ + +#include +#include + +#include +#include + +#include + +#include "opentelemetry/proto/collector/metrics/v1/metrics_service.grpc.pb.h" +#include "opentelemetry/proto/metrics/v1/metrics.pb.h" + +#include "com/centreon/engine/contact.hh" +#include "com/centreon/engine/host.hh" +#include "com/centreon/engine/service.hh" + +#include "com/centreon/engine/command_manager.hh" +#include "com/centreon/engine/configuration/applier/connector.hh" +#include "com/centreon/engine/configuration/applier/contact.hh" +#include "com/centreon/engine/configuration/applier/host.hh" +#include "com/centreon/engine/configuration/applier/service.hh" + +#include "com/centreon/agent/streaming_client.hh" +#include "com/centreon/engine/modules/opentelemetry/otl_fmt.hh" +#include "com/centreon/engine/modules/opentelemetry/otl_server.hh" + +#include "../test_engine.hh" +#include "helper.hh" + +using namespace com::centreon::engine; +using namespace com::centreon::agent; +// using namespace com::centreon::engine::configuration; +// using namespace com::centreon::engine::configuration::applier; +using namespace com::centreon::engine::modules::opentelemetry; +using namespace ::opentelemetry::proto::collector::metrics::v1; + +class agent_to_engine_test : public TestEngine { + protected: + std::shared_ptr _server; + + // agent code is mono-thread so it runs on his own io_context run by only one + // thread + std::shared_ptr _agent_io_context; + + asio::executor_work_guard _worker; + std::thread _agent_io_ctx_thread; + + public: + agent_to_engine_test() + : _agent_io_context(std::make_shared()), + _worker{asio::make_work_guard(*_agent_io_context)}, + _agent_io_ctx_thread([this] { _agent_io_context->run(); }) {} + + ~agent_to_engine_test() { + _agent_io_context->stop(); + _agent_io_ctx_thread.join(); + } + + void SetUp() override { + spdlog::default_logger()->set_level(spdlog::level::trace); + ::fmt::formatter< ::opentelemetry::proto::collector::metrics::v1:: + ExportMetricsServiceRequest>::json_grpc_format = true; + timeperiod::timeperiods.clear(); + contact::contacts.clear(); + host::hosts.clear(); + host::hosts_by_id.clear(); + service::services.clear(); + service::services_by_id.clear(); + + init_config_state(); + + configuration::applier::connector conn_aply; + configuration::connector cnn("agent"); + cnn.parse("connector_line", + "opentelemetry " + "--processor=nagios_telegraf --extractor=attributes " + "--host_path=resource_metrics.scope_metrics.data.data_points." + "attributes.host " + "--service_path=resource_metrics.scope_metrics.data.data_points." 
+ "attributes.service"); + conn_aply.add_object(cnn); + configuration::error_cnt err; + + configuration::applier::contact ct_aply; + configuration::contact ctct{new_configuration_contact("admin", true)}; + ct_aply.add_object(ctct); + ct_aply.expand_objects(*config); + ct_aply.resolve_object(ctct, err); + + configuration::host hst = + new_configuration_host("test_host", "admin", 1, "agent"); + + configuration::applier::host hst_aply; + hst_aply.add_object(hst); + + configuration::service svc{new_configuration_service( + "test_host", "test_svc", "admin", 1, "agent")}; + configuration::service svc2{new_configuration_service( + "test_host", "test_svc_2", "admin", 2, "agent")}; + configuration::service svc_no_otel{ + new_configuration_service("test_host", "test_svc_2", "admin", 3)}; + configuration::applier::service svc_aply; + svc_aply.add_object(svc); + svc_aply.add_object(svc2); + svc_aply.add_object(svc_no_otel); + + hst_aply.resolve_object(hst, err); + svc_aply.resolve_object(svc, err); + svc_aply.resolve_object(svc2, err); + svc_aply.resolve_object(svc_no_otel, err); + } + + void TearDown() override { + if (_server) { + _server->shutdown(std::chrono::seconds(15)); + _server.reset(); + } + deinit_config_state(); + } + + template + void start_server(const grpc_config::pointer& listen_endpoint, + const centreon_agent::agent_config::pointer& agent_conf, + const metric_handler_type& handler) { + _server = otl_server::load(_agent_io_context, listen_endpoint, agent_conf, + handler, spdlog::default_logger()); + } +}; + +bool compare_to_expected_host_metric( + const opentelemetry::proto::metrics::v1::ResourceMetrics& metric) { + bool host_found = false, serv_found = false; + for (const auto& attrib : metric.resource().attributes()) { + if (attrib.key() == "host.name") { + if (attrib.value().string_value() != "test_host") { + return false; + } + host_found = true; + } + if (attrib.key() == "service.name") { + if (!attrib.value().string_value().empty()) { + return false; + } + serv_found = true; + } + } + if (!host_found || !serv_found) { + return false; + } + const auto& scope_metric = metric.scope_metrics(); + if (scope_metric.size() != 1) + return false; + const auto& metrics = scope_metric.begin()->metrics(); + if (metrics.empty()) + return false; + const auto& status_metric = *metrics.begin(); + if (status_metric.name() != "status") + return false; + if (!status_metric.has_gauge()) + return false; + if (status_metric.gauge().data_points().empty()) + return false; + return status_metric.gauge().data_points().begin()->as_int() == 0; +} + +bool test_exemplars( + const google::protobuf::RepeatedPtrField< + ::opentelemetry::proto::metrics::v1::Exemplar>& examplars, + const std::map& expected) { + std::set matches; + + for (const auto& ex : examplars) { + if (ex.filtered_attributes().empty()) + continue; + auto search = expected.find(ex.filtered_attributes().begin()->key()); + if (search == expected.end()) + return false; + + if (search->second != ex.as_double()) + return false; + matches.insert(search->first); + } + return matches.size() == expected.size(); +} + +bool compare_to_expected_serv_metric( + const opentelemetry::proto::metrics::v1::ResourceMetrics& metric, + const std::string_view& serv_name) { + bool host_found = false, serv_found = false; + for (const auto& attrib : metric.resource().attributes()) { + if (attrib.key() == "host.name") { + if (attrib.value().string_value() != "test_host") { + return false; + } + host_found = true; + } + if (attrib.key() == "service.name") { + if 
(attrib.value().string_value() != serv_name) { + return false; + } + serv_found = true; + } + } + if (!host_found || !serv_found) { + return false; + } + const auto& scope_metric = metric.scope_metrics(); + if (scope_metric.size() != 1) + return false; + const auto& metrics = scope_metric.begin()->metrics(); + if (metrics.empty()) + return false; + + for (const auto& met : metrics) { + if (!met.has_gauge()) + return false; + if (met.name() == "metric") { + if (met.gauge().data_points().empty()) + return false; + if (met.gauge().data_points().begin()->as_double() != 12) + return false; + if (!test_exemplars(met.gauge().data_points().begin()->exemplars(), + {{"crit_gt", 75.0}, + {"crit_lt", 0.0}, + {"warn_gt", 50.0}, + {"warn_lt", 0.0}})) + return false; + } else if (met.name() == "metric2") { + if (met.gauge().data_points().empty()) + return false; + if (met.gauge().data_points().begin()->as_double() != 30) + return false; + if (!test_exemplars(met.gauge().data_points().begin()->exemplars(), + {{"crit_gt", 80.0}, + {"crit_lt", 75.0}, + {"warn_gt", 75.0}, + {"warn_lt", 50.0}, + {"min", 0.0}, + {"max", 100.0}})) + return false; + + } else if (met.name() == "status") { + if (met.gauge().data_points().begin()->as_int() != 0) + return false; + } else + return false; + } + + return true; +} + +TEST_F(agent_to_engine_test, server_send_conf_to_agent_and_receive_metrics) { + grpc_config::pointer listen_endpoint = + std::make_shared("127.0.0.1:4623", false); + + absl::Mutex mut; + std::vector received; + std::vector + resource_metrics; + + auto agent_conf = std::make_shared(1, 10, 1, 5); + + start_server(listen_endpoint, agent_conf, + [&](const metric_request_ptr& metric) { + absl::MutexLock l(&mut); + received.push_back(metric); + for (const opentelemetry::proto::metrics::v1::ResourceMetrics& + res_metric : metric->resource_metrics()) { + resource_metrics.push_back(&res_metric); + } + }); + + auto agent_client = + streaming_client::load(_agent_io_context, spdlog::default_logger(), + listen_endpoint, "test_host"); + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + command_manager::instance().execute(); + + auto metric_received = [&]() { return resource_metrics.size() >= 3; }; + + mut.LockWhen(absl::Condition(&metric_received)); + mut.Unlock(); + + agent_client->shutdown(); + + _server->shutdown(std::chrono::seconds(15)); + + bool host_metric_found = true; + bool serv_1_found = false; + bool serv_2_found = false; + + for (const opentelemetry::proto::metrics::v1::ResourceMetrics* to_compare : + resource_metrics) { + if (compare_to_expected_serv_metric(*to_compare, "test_svc")) { + serv_1_found = true; + } else if (compare_to_expected_serv_metric(*to_compare, "test_svc_2")) { + serv_2_found = true; + } else if (compare_to_expected_host_metric(*to_compare)) { + host_metric_found = true; + } else { + SPDLOG_ERROR("bad resource metric: {}", to_compare->DebugString()); + ASSERT_TRUE(false); + } + } + ASSERT_TRUE(host_metric_found); + ASSERT_TRUE(serv_1_found); + ASSERT_TRUE(serv_2_found); +} \ No newline at end of file diff --git a/packaging/centreon-monitoring-agent.yaml b/packaging/centreon-monitoring-agent.yaml index 83bba81f424..2bff6437a93 100644 --- a/packaging/centreon-monitoring-agent.yaml +++ b/packaging/centreon-monitoring-agent.yaml @@ -51,6 +51,16 @@ contents: owner: centreon-monitoring-agent group: centreon-monitoring-agent +overrides: + rpm: + depends: + - openssl-libs >= 3 + - zlib + deb: + depends: + - libssl3 + - zlib1g + scripts: preinstall: 
./scripts/centreon-monitoring-agent-preinstall.sh postinstall: ./scripts/centreon-monitoring-agent-postinstall.sh diff --git a/tests/broker-engine/opentelemetry.robot b/tests/broker-engine/opentelemetry.robot index 728e624a924..2397f9dbdca 100644 --- a/tests/broker-engine/opentelemetry.robot +++ b/tests/broker-engine/opentelemetry.robot @@ -2,6 +2,7 @@ Documentation Engine/Broker tests on opentelemetry engine server Resource ../resources/import.resource +Library ../resources/Agent.py Suite Setup Ctn Clean Before Suite Suite Teardown Ctn Clean After Suite @@ -142,6 +143,20 @@ BEOTEL_TELEGRAF_CHECK_HOST ${result} Ctn Check Host Output Resource Status With Timeout host_1 30 ${start} 0 HARD OK Should Be True ${result} hosts table not updated + + Log To Console export metrics + Ctn Send Otl To Engine 4317 ${resources_list} + + Sleep 5 + + + # feed and check + ${start} Ctn Get Round Current Date + Ctn Schedule Forced Host Check host_1 + + ${result} Ctn Check Host Check Status With Timeout host_1 30 ${start} 0 OK + Should Be True ${result} hosts table not updated + # check then feed, three times to modify hard state ${start} Ctn Get Round Current Date Ctn Schedule Forced Host Check host_1 @@ -224,6 +239,18 @@ BEOTEL_TELEGRAF_CHECK_SERVICE ${result} Ctn Check Service Output Resource Status With Timeout host_1 service_1 30 ${start} 0 HARD OK Should Be True ${result} services table not updated + Log To Console export metrics + Ctn Send Otl To Engine 4317 ${resources_list} + + Sleep 5 + + # feed and check + ${start} Ctn Get Round Current Date + Ctn Schedule Forced Svc Check host_1 service_1 + + ${result} Ctn Check Service Check Status With Timeout host_1 service_1 30 ${start} 0 OK + Should Be True ${result} services table not updated + # check then feed, three times to modify hard state ${start} Ctn Get Round Current Date ${resources_list} Ctn Create Otl Request ${2} host_1 service_1 @@ -392,6 +419,381 @@ BEOTEL_SERVE_TELEGRAF_CONFIGURATION_NO_CRYPTED ... unexpected telegraf server response: ${telegraf_conf_response.text} +BEOTEL_CENTREON_AGENT_CHECK_HOST + [Documentation] agent check host and we expect to get it in check result + [Tags] broker engine opentelemetry MON-63843 + Ctn Config Engine ${1} ${2} ${2} + Ctn Add Otl ServerModule + ... 0 + ... {"otel_server":{"host": "0.0.0.0","port": 4317},"max_length_grpc_log":0, "centreon_agent":{"check_interval":10, "export_period":10}} + Ctn Config Add Otl Connector + ... 0 + ... OTEL connector + ... opentelemetry --processor=centreon_agent --extractor=attributes --host_path=resource_metrics.resource.attributes.host.name --service_path=resource_metrics.resource.attributes.service.name + Ctn Engine Config Replace Value In Hosts ${0} host_1 check_command otel_check_icmp + Ctn Engine Config Add Command + ... ${0} + ... otel_check_icmp + ... /bin/echo "OK - 127.0.0.1: rta 0,010ms, lost 0%|rta=0,010ms;200,000;500,000;0; pl=0%;40;80;; rtmax=0,035ms;;;; rtmin=0,003ms;;;;" + ... 
OTEL connector + + Ctn Engine Config Set Value 0 log_level_checks trace + + Ctn Config Broker central + Ctn Config Broker module + Ctn Config Broker rrd + Ctn Config Centreon Agent + Ctn Broker Config Log central sql trace + + Ctn ConfigBBDO3 1 + Ctn Clear Retention + + ${start} Get Current Date + Ctn Start Broker + Ctn Start Engine + Ctn Start Agent + + # Let's wait for the otel server start + ${content} Create List unencrypted server listening on 0.0.0.0:4317 + ${result} Ctn Find In Log With Timeout ${engineLog0} ${start} ${content} 10 + Should Be True ${result} "unencrypted server listening on 0.0.0.0:4317" should be available. + Sleep 1 + + ${start} Ctn Get Round Current Date + Ctn Schedule Forced Host Check host_1 + + ${result} Ctn Check Host Check Status With Timeout host_1 30 ${start} 0 OK - 127.0.0.1 + Should Be True ${result} hosts table not updated + + Ctn Engine Config Replace Value In Hosts ${0} host_1 check_command otel_check_icmp_2 + Ctn Engine Config Add Command + ... ${0} + ... otel_check_icmp_2 + ... /bin/echo "OK check2 - 127.0.0.1: rta 0,010ms, lost 0%|rta=0,010ms;200,000;500,000;0; pl=0%;40;80;; rtmax=0,035ms;;;; rtmin=0,003ms;;;;" + ... OTEL connector + + #update conf engine, it must be taken into account by agent + Log To Console modify engine conf and reload engine + Ctn Reload Engine + + #wait for new data from agent + ${start} Ctn Get Round Current Date + ${content} Create List description: \"OK check2 + ${result} Ctn Find In Log With Timeout ${engineLog0} ${start} ${content} 22 + Should Be True ${result} "description: "OK check2" should be available. + + ${start} Ctn Get Round Current Date + Ctn Schedule Forced Host Check host_1 + + ${result} Ctn Check Host Check Status With Timeout host_1 30 ${start} 0 OK check2 - 127.0.0.1: rta 0,010ms, lost 0% + Should Be True ${result} hosts table not updated + + +BEOTEL_CENTREON_AGENT_CHECK_SERVICE + [Documentation] agent check service and we expect to get it in check result + [Tags] broker engine opentelemetry MON-63843 + Ctn Config Engine ${1} ${2} ${2} + Ctn Add Otl ServerModule + ... 0 + ... {"otel_server":{"host": "0.0.0.0","port": 4317},"max_length_grpc_log":0,"centreon_agent":{"check_interval":10, "export_period":15}} + Ctn Config Add Otl Connector + ... 0 + ... OTEL connector + ... opentelemetry --processor=centreon_agent --extractor=attributes --host_path=resource_metrics.resource.attributes.host.name --service_path=resource_metrics.resource.attributes.service.name + Ctn Engine Config Replace Value In Services ${0} service_1 check_command otel_check + Ctn Engine Config Add Command + ... ${0} + ... otel_check + ... /tmp/var/lib/centreon-engine/check.pl --id 456 + ... OTEL connector + + Ctn Engine Config Set Value 0 log_level_checks trace + + #service_1 check fail CRITICAL + Ctn Set Command Status 456 ${2} + + Ctn Config Broker central + Ctn Config Broker module + Ctn Config Broker rrd + Ctn Config Centreon Agent + Ctn Broker Config Log central sql trace + + Ctn ConfigBBDO3 1 + Ctn Clear Retention + + ${start} Ctn Get Round Current Date + Ctn Start Broker + Ctn Start Engine + Ctn Start Agent + + # Let's wait for the otel server start + ${content} Create List unencrypted server listening on 0.0.0.0:4317 + ${result} Ctn Find In Log With Timeout ${engineLog0} ${start} ${content} 10 + Should Be True ${result} "unencrypted server listening on 0.0.0.0:4317" should be available. 
+ + ${content} Create List fifos:{"host_1,service_1" + ${result} Ctn Find In Log With Timeout ${engineLog0} ${start} ${content} 30 + Should Be True ${result} fifos not found in logs + + Ctn Schedule Forced Svc Check host_1 service_1 + + ${result} Ctn Check Service Check Status With Timeout host_1 service_1 60 ${start} 2 Test check 456 + Should Be True ${result} services table not updated + + ${start} Ctn Get Round Current Date + #service_1 check ok + Ctn Set Command Status 456 ${0} + + ${content} Create List as_int: 0 + ${result} Ctn Find In Log With Timeout ${engineLog0} ${start} ${content} 30 + Should Be True ${result} status 0 not found in logs + + Ctn Schedule Forced Svc Check host_1 service_1 + + ${result} Ctn Check Service Check Status With Timeout host_1 service_1 60 ${start} 0 Test check 456 + Should Be True ${result} services table not updated + + +BEOTEL_REVERSE_CENTREON_AGENT_CHECK_HOST + [Documentation] agent check host with reversed connection and we expect to get it in check result + [Tags] broker engine opentelemetry MON-63843 + Ctn Config Engine ${1} ${2} ${2} + Ctn Add Otl ServerModule + ... 0 + ... {"max_length_grpc_log":0,"centreon_agent":{"check_interval":10, "export_period":15, "reverse_connections":[{"host": "127.0.0.1","port": 4317}]}} + Ctn Config Add Otl Connector + ... 0 + ... OTEL connector + ... opentelemetry --processor=centreon_agent --extractor=attributes --host_path=resource_metrics.resource.attributes.host.name --service_path=resource_metrics.resource.attributes.service.name + Ctn Engine Config Replace Value In Hosts ${0} host_1 check_command otel_check_icmp + Ctn Engine Config Add Command + ... ${0} + ... otel_check_icmp + ... /bin/echo "OK - 127.0.0.1: rta 0,010ms, lost 0%|rta=0,010ms;200,000;500,000;0; pl=0%;40;80;; rtmax=0,035ms;;;; rtmin=0,003ms;;;;" + ... OTEL connector + + Ctn Engine Config Set Value 0 log_level_checks trace + + Ctn Config Broker central + Ctn Config Broker module + Ctn Config Broker rrd + Ctn Config Reverse Centreon Agent + Ctn Broker Config Log central sql trace + + Ctn ConfigBBDO3 1 + Ctn Clear Retention + + ${start} Get Current Date + Ctn Start Broker + Ctn Start Engine + Ctn Start Agent + + # Let's wait for engine to connect to agent + ${content} Create List init from [.\\s]*127.0.0.1:4317 + ${result} Ctn Find Regex In Log With Timeout ${engineLog0} ${start} ${content} 10 + Should Be True ${result} "init from localhost:4317" not found in log + Sleep 1 + + ${start} Ctn Get Round Current Date + Ctn Schedule Forced Host Check host_1 + + ${result} Ctn Check Host Check Status With Timeout host_1 30 ${start} 0 OK - 127.0.0.1 + Should Be True ${result} hosts table not updated + + Ctn Engine Config Replace Value In Hosts ${0} host_1 check_command otel_check_icmp_2 + Ctn Engine Config Add Command + ... ${0} + ... otel_check_icmp_2 + ... /bin/echo "OK check2 - 127.0.0.1: rta 0,010ms, lost 0%|rta=0,010ms;200,000;500,000;0; pl=0%;40;80;; rtmax=0,035ms;;;; rtmin=0,003ms;;;;" + ... OTEL connector + + #update conf engine, it must be taken into account by agent + Log To Console modify engine conf and reload engine + Ctn Reload Engine + + #wait for new data from agent + ${start} Ctn Get Round Current Date + ${content} Create List description: \"OK check2 + ${result} Ctn Find In Log With Timeout ${engineLog0} ${start} ${content} 30 + Should Be True ${result} "description: "OK check2" should be available. 
+ + ${start} Ctn Get Round Current Date + Ctn Schedule Forced Host Check host_1 + + ${result} Ctn Check Host Check Status With Timeout host_1 30 ${start} 0 OK check2 - 127.0.0.1: rta 0,010ms, lost 0% + Should Be True ${result} hosts table not updated + + +BEOTEL_REVERSE_CENTREON_AGENT_CHECK_SERVICE + [Documentation] agent check service with reversed connection and we expect to get it in check result + [Tags] broker engine opentelemetry MON-63843 + Ctn Config Engine ${1} ${2} ${2} + Ctn Add Otl ServerModule + ... 0 + ... {"max_length_grpc_log":0,"centreon_agent":{"check_interval":10, "export_period":15, "reverse_connections":[{"host": "127.0.0.1","port": 4317}]}} + Ctn Config Add Otl Connector + ... 0 + ... OTEL connector + ... opentelemetry --processor=centreon_agent --extractor=attributes --host_path=resource_metrics.resource.attributes.host.name --service_path=resource_metrics.resource.attributes.service.name + Ctn Engine Config Replace Value In Services ${0} service_1 check_command otel_check + Ctn Engine Config Add Command + ... ${0} + ... otel_check + ... /tmp/var/lib/centreon-engine/check.pl --id 456 + ... OTEL connector + + Ctn Engine Config Set Value 0 log_level_checks trace + + #service_1 check fail CRITICAL + Ctn Set Command Status 456 ${2} + + Ctn Config Broker central + Ctn Config Broker module + Ctn Config Broker rrd + Ctn Config Reverse Centreon Agent + Ctn Broker Config Log central sql trace + + Ctn ConfigBBDO3 1 + Ctn Clear Retention + + ${start} Ctn Get Round Current Date + Ctn Start Broker + Ctn Start Engine + Ctn Start Agent + + # Let's wait for engine to connect to agent + ${content} Create List init from [.\\s]*127.0.0.1:4317 + ${result} Ctn Find Regex In Log With Timeout ${engineLog0} ${start} ${content} 10 + Should Be True ${result} "init from 127.0.0.1:4317" not found in log + + + ${content} Create List fifos:{"host_1,service_1" + ${result} Ctn Find In Log With Timeout ${engineLog0} ${start} ${content} 30 + Should Be True ${result} fifos not found in logs + + Ctn Schedule Forced Svc Check host_1 service_1 + + ${result} Ctn Check Service Check Status With Timeout host_1 service_1 60 ${start} 2 Test check 456 + Should Be True ${result} services table not updated + + ${start} Ctn Get Round Current Date + #service_1 check ok + Ctn Set Command Status 456 ${0} + + ${content} Create List as_int: 0 + ${result} Ctn Find In Log With Timeout ${engineLog0} ${start} ${content} 30 + Should Be True ${result} status 0 not found in logs + + Ctn Schedule Forced Svc Check host_1 service_1 + + ${result} Ctn Check Service Check Status With Timeout host_1 service_1 60 ${start} 0 Test check 456 + Should Be True ${result} services table not updated + +BEOTEL_CENTREON_AGENT_CHECK_HOST_CRYPTED + [Documentation] agent check host with encrypted connection and we expect to get it in check result + [Tags] broker engine opentelemetry MON-63843 + Ctn Config Engine ${1} ${2} ${2} + Copy File ../broker/grpc/test/grpc_test_keys/ca_1234.crt /tmp/ + Copy File ../broker/grpc/test/grpc_test_keys/server_1234.key /tmp/ + Copy File ../broker/grpc/test/grpc_test_keys/server_1234.crt /tmp/ + Ctn Add Otl ServerModule + ... 0 + ... {"otel_server":{"host": "0.0.0.0","port": 4317, "encryption": true, "public_cert": "/tmp/server_1234.crt", "private_key": "/tmp/server_1234.key", "ca_certificate": "/tmp/ca_1234.crt"},"max_length_grpc_log":0} + Ctn Config Add Otl Connector + ... 0 + ... OTEL connector + ... 
opentelemetry --processor=centreon_agent --extractor=attributes --host_path=resource_metrics.resource.attributes.host.name --service_path=resource_metrics.resource.attributes.service.name + Ctn Engine Config Replace Value In Hosts ${0} host_1 check_command otel_check_icmp + Ctn Engine Config Add Command + ... ${0} + ... otel_check_icmp + ... /bin/echo "OK - 127.0.0.1: rta 0,010ms, lost 0%|rta=0,010ms;200,000;500,000;0; pl=0%;40;80;; rtmax=0,035ms;;;; rtmin=0,003ms;;;;" + ... OTEL connector + + Ctn Engine Config Set Value 0 log_level_checks trace + + Ctn Config Broker central + Ctn Config Broker module + Ctn Config Broker rrd + Ctn Config Centreon Agent ${None} ${None} /tmp/ca_1234.crt + Ctn Broker Config Log central sql trace + + Ctn ConfigBBDO3 1 + Ctn Clear Retention + + ${start} Get Current Date + Ctn Start Broker + Ctn Start Engine + Ctn Start Agent + + # Let's wait for the otel server start + ${content} Create List encrypted server listening on 0.0.0.0:4317 + ${result} Ctn Find In Log With Timeout ${engineLog0} ${start} ${content} 10 + Should Be True ${result} "encrypted server listening on 0.0.0.0:4317" should be available. + Sleep 1 + + ${start} Ctn Get Round Current Date + Ctn Schedule Forced Host Check host_1 + + ${result} Ctn Check Host Check Status With Timeout host_1 30 ${start} 0 OK - 127.0.0.1 + Should Be True ${result} hosts table not updated + + + +BEOTEL_REVERSE_CENTREON_AGENT_CHECK_HOST_CRYPTED + [Documentation] agent check host with encrypted reversed connection and we expect to get it in check result + [Tags] broker engine opentelemetry MON-63843 + Ctn Config Engine ${1} ${2} ${2} + Copy File ../broker/grpc/test/grpc_test_keys/ca_1234.crt /tmp/ + Copy File ../broker/grpc/test/grpc_test_keys/server_1234.key /tmp/ + Copy File ../broker/grpc/test/grpc_test_keys/server_1234.crt /tmp/ + + Ctn Add Otl ServerModule + ... 0 + ... {"max_length_grpc_log":0,"centreon_agent":{"check_interval":10, "export_period":15, "reverse_connections":[{"host": "localhost","port": 4317, "encryption": true, "ca_certificate": "/tmp/ca_1234.crt"}]}} + + Ctn Config Add Otl Connector + ... 0 + ... OTEL connector + ... opentelemetry --processor=centreon_agent --extractor=attributes --host_path=resource_metrics.resource.attributes.host.name --service_path=resource_metrics.resource.attributes.service.name + Ctn Engine Config Replace Value In Hosts ${0} host_1 check_command otel_check_icmp + Ctn Engine Config Add Command + ... ${0} + ... otel_check_icmp + ... /bin/echo "OK - 127.0.0.1: rta 0,010ms, lost 0%|rta=0,010ms;200,000;500,000;0; pl=0%;40;80;; rtmax=0,035ms;;;; rtmin=0,003ms;;;;" + ... 
OTEL connector + + Ctn Engine Config Set Value 0 log_level_checks trace + + Ctn Config Broker central + Ctn Config Broker module + Ctn Config Broker rrd + Ctn Config Reverse Centreon Agent /tmp/server_1234.key /tmp/server_1234.crt /tmp/ca_1234.crt + Ctn Broker Config Log central sql trace + + Ctn ConfigBBDO3 1 + Ctn Clear Retention + + ${start} Get Current Date + Ctn Start Broker + Ctn Start Engine + Ctn Start Agent + + # Let's wait for engine to connect to agent + ${content} Create List init from localhost:4317 + ${result} Ctn Find In Log With Timeout ${engineLog0} ${start} ${content} 10 + Should Be True ${result} "init from localhost:4317" not found in log + Sleep 1 + + ${start} Ctn Get Round Current Date + Ctn Schedule Forced Host Check host_1 + + ${result} Ctn Check Host Check Status With Timeout host_1 30 ${start} 0 OK - 127.0.0.1 + Should Be True ${result} hosts table not updated + + + + *** Keywords *** Ctn Create Otl Request [Documentation] create an otl request with nagios telegraf style diff --git a/tests/resources/Agent.py b/tests/resources/Agent.py new file mode 100644 index 00000000000..4497a4453f3 --- /dev/null +++ b/tests/resources/Agent.py @@ -0,0 +1,71 @@ +#!/usr/bin/python3 +# +# Copyright 2023-2024 Centreon +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+#
+# For more information : contact@centreon.com
+#
+
+from os import makedirs
+from robot.libraries.BuiltIn import BuiltIn
+
+ETC_ROOT = BuiltIn().get_variable_value("${EtcRoot}")
+CONF_DIR = ETC_ROOT + "/centreon-engine"
+
+
+# Common part of centagent.json; the helpers below append the encryption and
+# reverse-connection fields before closing the JSON object.
+agent_config = """
+{
+  "log_level":"trace",
+  "endpoint":"localhost:4317",
+  "host":"host_1",
+  "log_type":"file",
+  "log_file":"/tmp/var/log/centreon-engine/centreon-agent.log" """
+
+
+def ctn_config_centreon_agent(key_path: str = None, cert_path: str = None, ca_path: str = None):
+    """ctn_config_centreon_agent
+    Creates a default centreon agent config (no reverse connection); encryption is enabled only when a key, certificate or CA path is given
+    """
+    makedirs(CONF_DIR, mode=0o777, exist_ok=True)
+    with open(f"{CONF_DIR}/centagent.json", "w") as ff:
+        ff.write(agent_config)
+        if key_path is not None or cert_path is not None or ca_path is not None:
+            ff.write(",\n  \"encryption\":true")
+            if key_path is not None:
+                ff.write(f",\n  \"private_key\":\"{key_path}\"")
+            if cert_path is not None:
+                ff.write(f",\n  \"public_cert\":\"{cert_path}\"")
+            if ca_path is not None:
+                ff.write(f",\n  \"ca_certificate\":\"{ca_path}\"")
+        ff.write("\n}\n")
+
+
+def ctn_config_reverse_centreon_agent(key_path: str = None, cert_path: str = None, ca_path: str = None):
+    """ctn_config_reverse_centreon_agent
+    Creates a reversed centreon agent config (the agent listens on 0.0.0.0:4317); encryption is enabled only when a key, certificate or CA path is given
+    """
+    makedirs(CONF_DIR, mode=0o777, exist_ok=True)
+    with open(f"{CONF_DIR}/centagent.json", "w") as ff:
+        ff.write(agent_config)
+        ff.write(",\n  \"reverse_connection\":true")
+        if key_path is not None or cert_path is not None or ca_path is not None:
+            ff.write(",\n  \"encryption\":true")
+            if key_path is not None:
+                ff.write(f",\n  \"private_key\":\"{key_path}\"")
+            if cert_path is not None:
+                ff.write(f",\n  \"public_cert\":\"{cert_path}\"")
+            if ca_path is not None:
+                ff.write(f",\n  \"ca_certificate\":\"{ca_path}\"")
+        ff.write("\n}\n")
diff --git a/tests/resources/resources.resource b/tests/resources/resources.resource
index f53d7f6ff08..8d4b37ed7da 100644
--- a/tests/resources/resources.resource
+++ b/tests/resources/resources.resource
@@ -233,6 +233,11 @@ Ctn Stop Engine Broker And Save Logs
     EXCEPT
         Log    Can't kindly stop Broker
     END
+    TRY
+        Ctn Kindly Stop Agent
+    EXCEPT
+        Log    Can't kindly stop Agent
+    END
 
     Ctn Save Logs If Failed
     Ctn Get Engine Pid
@@ -283,7 +288,9 @@ Ctn Save Logs
     Copy Files    ${rrdLog}    ${failDir}
     Copy Files    ${moduleLog0}    ${failDir}
     Copy Files    ${engineLog0}    ${failDir}
+    Copy Files    ${ENGINE_LOG}/*.log    ${failDir}
     Copy Files    ${EtcRoot}/centreon-engine/config0/*.cfg    ${failDir}/etc/centreon-engine/config0
+    Copy Files    ${EtcRoot}/centreon-engine/*.json    ${failDir}/etc/centreon-engine
     Copy Files    ${EtcRoot}/centreon-broker/*.json    ${failDir}/etc/centreon-broker
     Move Files    /tmp/lua*.log    ${failDir}
 
@@ -384,3 +391,30 @@ Ctn Wait For Engine To Be Ready
         ...    ${result}
         ...    A message telling check_for_external_commands() should be available in config${i}/centengine.log.
     END
+
+
+Ctn Start Agent
+    Start Process    /usr/bin/centagent    ${EtcRoot}/centreon-engine/centagent.json    alias=centreon_agent
+
+Ctn Kindly Stop Agent
+    # In most cases centreon_agent is not started.
+    ${centreon_agent_process}    Get Process Object    centreon_agent
+
+    IF    ${{$centreon_agent_process is None}}    RETURN
+
+    Send Signal To Process    SIGTERM    centreon_agent
+    ${result}    Wait For Process    centreon_agent    timeout=60s
+    # The agent did not stop within the timeout: force-kill it and fail.
+    IF    "${result}" == "${None}"
+        Log To Console    failed to stop centreon_agent
+        Ctn Save Logs
+        Ctn Dump Process    centreon_agent    /usr/bin/centagent    centreon_agent
+        Send Signal To Process    SIGKILL    centreon_agent
+        Fail    centreon_agent not correctly stopped (coredump generated)
+    ELSE
+        IF    ${result.rc} != 0
+            Ctn Save Logs
+            Ctn Coredump Info    centreon_agent    /usr/bin/centagent    centreon_agent
+            Should Be Equal As Integers    ${result.rc}    0    centreon_agent not correctly stopped
+        END
+    END
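
Example (a sketch reconstructed from the agent_config template and the conditional writes in Agent.py; whitespace normalized, field order follows the helper): with the arguments used in BEOTEL_REVERSE_CENTREON_AGENT_CHECK_HOST_CRYPTED, Ctn Config Reverse Centreon Agent should write a centagent.json roughly like this:

    {
      "log_level":"trace",
      "endpoint":"localhost:4317",
      "host":"host_1",
      "log_type":"file",
      "log_file":"/tmp/var/log/centreon-engine/centreon-agent.log",
      "reverse_connection":true,
      "encryption":true,
      "private_key":"/tmp/server_1234.key",
      "public_cert":"/tmp/server_1234.crt",
      "ca_certificate":"/tmp/ca_1234.crt"
    }

The non-reversed helper omits "reverse_connection", and a call such as Ctn Config Centreon Agent    ${None}    ${None}    /tmp/ca_1234.crt still sets "encryption":true but writes only the "ca_certificate" field.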