From 0e93b19e361e1e9d4818e9cfc511b743acfa5e4d Mon Sep 17 00:00:00 2001 From: jean-christophe81 <98889244+jean-christophe81@users.noreply.github.com> Date: Mon, 22 Jul 2024 11:16:56 +0200 Subject: [PATCH] we apply forbidden filter on default filter(all) (#1536) MON-144608 --- .../broker/config/applier/endpoint.hh | 3 +- broker/core/src/config/applier/endpoint.cc | 101 +++++++++++------- tests/broker-engine/muxer_filter.robot | 57 +++++----- 3 files changed, 93 insertions(+), 68 deletions(-) diff --git a/broker/core/inc/com/centreon/broker/config/applier/endpoint.hh b/broker/core/inc/com/centreon/broker/config/applier/endpoint.hh index 30099ecac64..1e44b023ba8 100644 --- a/broker/core/inc/com/centreon/broker/config/applier/endpoint.hh +++ b/broker/core/inc/com/centreon/broker/config/applier/endpoint.hh @@ -84,7 +84,8 @@ class endpoint { static bool loaded(); static multiplexing::muxer_filter parse_filters( - const std::set& str_filters); + const std::set& str_filters, + const multiplexing::muxer_filter& forbidden_filter); }; } // namespace applier } // namespace config diff --git a/broker/core/src/config/applier/endpoint.cc b/broker/core/src/config/applier/endpoint.cc index 60edff28267..02c456e3ae7 100644 --- a/broker/core/src/config/applier/endpoint.cc +++ b/broker/core/src/config/applier/endpoint.cc @@ -102,14 +102,15 @@ endpoint::~endpoint() { */ void endpoint::apply(std::list const& endpoints) { // Log messages. - _logger->info("endpoint applier: loading configuration"); + SPDLOG_LOGGER_INFO(_logger, "endpoint applier: loading configuration"); - { + if (_logger->level() <= spdlog::level::debug) { std::vector eps; for (auto& ep : endpoints) eps.push_back(ep.name); - _logger->debug("endpoint applier: {} endpoints to apply: {}", - endpoints.size(), fmt::format("{}", fmt::join(eps, ", "))); + SPDLOG_LOGGER_DEBUG(_logger, "endpoint applier: {} endpoints to apply: {}", + endpoints.size(), + fmt::format("{}", fmt::join(eps, ", "))); } // Copy endpoint configurations and apply eventual modifications. @@ -129,8 +130,9 @@ void endpoint::apply(std::list const& endpoints) { // resources that might be used by other endpoints. auto it = _endpoints.find(ep); if (it != _endpoints.end()) { - _logger->debug("endpoint applier: removing old endpoint {}", - it->first.name); + SPDLOG_LOGGER_DEBUG(_logger, + "endpoint applier: removing old endpoint {}", + it->first.name); /* failover::exit() is called. */ it->second->exit(); delete it->second; @@ -141,13 +143,14 @@ void endpoint::apply(std::list const& endpoints) { // Update existing endpoints. for (auto it = _endpoints.begin(), end = _endpoints.end(); it != end; ++it) { - _logger->debug("endpoint applier: updating endpoint {}", it->first.name); + SPDLOG_LOGGER_DEBUG(_logger, "endpoint applier: updating endpoint {}", + it->first.name); it->second->update(); } // Debug message. - _logger->debug("endpoint applier: {} endpoints to create", - endp_to_create.size()); + SPDLOG_LOGGER_DEBUG(_logger, "endpoint applier: {} endpoints to create", + endp_to_create.size()); // Create new endpoints. for (config::endpoint& ep : endp_to_create) { @@ -156,7 +159,8 @@ void endpoint::apply(std::list const& endpoints) { if (ep.name.empty() || std::find_if(endp_to_create.begin(), endp_to_create.end(), name_match_failover(ep.name)) == endp_to_create.end()) { - _logger->debug("endpoint applier: creating endpoint {}", ep.name); + SPDLOG_LOGGER_DEBUG(_logger, "endpoint applier: creating endpoint {}", + ep.name); bool is_acceptor; std::shared_ptr e{_create_endpoint(ep, is_acceptor)}; std::unique_ptr endp; @@ -173,15 +177,18 @@ void endpoint::apply(std::list const& endpoints) { * if broker sends data to map. This is needed because a failover needs * its peer to ack events to release them (and a failover is also able * to write data). */ - multiplexing::muxer_filter r_filter = parse_filters(ep.read_filters); - multiplexing::muxer_filter w_filter = parse_filters(ep.write_filters); + multiplexing::muxer_filter r_filter = + parse_filters(ep.read_filters, e->get_stream_forbidden_filter()); + multiplexing::muxer_filter w_filter = + parse_filters(ep.write_filters, e->get_stream_forbidden_filter()); if (is_acceptor) { w_filter -= e->get_stream_forbidden_filter(); r_filter -= e->get_stream_forbidden_filter(); std::unique_ptr acceptr( std::make_unique(e, ep.name, r_filter, w_filter)); - _logger->debug( + SPDLOG_LOGGER_DEBUG( + _logger, "endpoint applier: acceptor '{}' configured with write filters: {} " "and read filters: {}", ep.name, w_filter.get_allowed_categories(), @@ -193,7 +200,8 @@ void endpoint::apply(std::list const& endpoints) { /* Are there missing events in the w_filter ? */ if (!e->get_stream_mandatory_filter().is_in(w_filter)) { w_filter |= e->get_stream_mandatory_filter(); - _logger->debug( + SPDLOG_LOGGER_DEBUG( + _logger, "endpoint applier: The configured write filters for the endpoint " "'{}' are too restrictive. Mandatory categories added to them", ep.name); @@ -201,7 +209,8 @@ void endpoint::apply(std::list const& endpoints) { /* Are there events in w_filter that are forbidden ? */ if (w_filter.contains_some_of(e->get_stream_forbidden_filter())) { w_filter -= e->get_stream_forbidden_filter(); - _logger->error( + SPDLOG_LOGGER_ERROR( + _logger, "endpoint applier: The configured write filters for the endpoint " "'{}' contain forbidden filters. These ones are removed", ep.name); @@ -210,13 +219,14 @@ void endpoint::apply(std::list const& endpoints) { /* Are there events in r_filter that are forbidden ? */ if (r_filter.contains_some_of(e->get_stream_forbidden_filter())) { r_filter -= e->get_stream_forbidden_filter(); - _logger->error( + SPDLOG_LOGGER_ERROR( + _logger, "endpoint applier: The configured read filters for the endpoint " "'{}' contain forbidden filters. These ones are removed", ep.name); } - _logger->debug( - "endpoint applier: filters {} for endpoint '{}' applied.", + SPDLOG_LOGGER_DEBUG( + _logger, "endpoint applier: filters {} for endpoint '{}' applied.", w_filter.get_allowed_categories(), ep.name); auto mux = multiplexing::muxer::create( @@ -230,7 +240,8 @@ void endpoint::apply(std::list const& endpoints) { } // Run thread. - _logger->debug( + SPDLOG_LOGGER_DEBUG( + _logger, "endpoint applier: endpoint thread {} of '{}' is registered and " "ready to run", static_cast(endp.get()), ep.name); @@ -245,13 +256,14 @@ void endpoint::apply(std::list const& endpoints) { */ void endpoint::_discard() { _discarding = true; - _logger->debug("endpoint applier: destruction"); + SPDLOG_LOGGER_DEBUG(_logger, "endpoint applier: destruction"); // wait for failover and feeder to push endloop event ::usleep(processing::idle_microsec_wait_idle_thread_delay + 100000); // Exit threads. { - _logger->debug("endpoint applier: requesting threads termination"); + SPDLOG_LOGGER_DEBUG(_logger, + "endpoint applier: requesting threads termination"); std::unique_lock lock(_endpointsm); // Send termination requests. @@ -259,8 +271,9 @@ void endpoint::_discard() { for (auto it = _endpoints.begin(); it != _endpoints.end();) { if (it->second->is_feeder()) { it->second->wait_for_all_events_written(5000); - _logger->trace("endpoint applier: send exit signal to endpoint '{}'", - it->second->get_name()); + SPDLOG_LOGGER_TRACE( + _logger, "endpoint applier: send exit signal to endpoint '{}'", + it->second->get_name()); delete it->second; it = _endpoints.erase(it); } else @@ -270,19 +283,22 @@ void endpoint::_discard() { // Exit threads. { - _logger->debug("endpoint applier: requesting threads termination"); + SPDLOG_LOGGER_DEBUG(_logger, + "endpoint applier: requesting threads termination"); std::unique_lock lock(_endpointsm); // We continue with failovers for (auto it = _endpoints.begin(); it != _endpoints.end();) { it->second->wait_for_all_events_written(5000); - _logger->trace("endpoint applier: send exit signal on endpoint '{}'", - it->second->get_name()); + SPDLOG_LOGGER_TRACE(_logger, + "endpoint applier: send exit signal on endpoint '{}'", + it->second->get_name()); delete it->second; it = _endpoints.erase(it); } - _logger->debug("endpoint applier: all threads are terminated"); + SPDLOG_LOGGER_DEBUG(_logger, + "endpoint applier: all threads are terminated"); } // Stop multiplexing: we must stop the engine after failovers otherwise @@ -373,7 +389,8 @@ processing::failover* endpoint::_create_failover( std::shared_ptr endp, std::list& l) { // Debug message. - _logger->info("endpoint applier: creating new failover '{}'", cfg.name); + SPDLOG_LOGGER_INFO(_logger, "endpoint applier: creating new failover '{}'", + cfg.name); // Check that failover is configured. std::shared_ptr failovr; @@ -382,7 +399,8 @@ processing::failover* endpoint::_create_failover( std::list::iterator it = std::find_if(l.begin(), l.end(), failover_match_name(front_failover)); if (it == l.end()) - _logger->error( + SPDLOG_LOGGER_ERROR( + _logger, "endpoint applier: could not find failover '{}' for endpoint '{}'", front_failover, cfg.name); else { @@ -411,7 +429,8 @@ processing::failover* endpoint::_create_failover( bool is_acceptor{false}; std::shared_ptr endp(_create_endpoint(*it, is_acceptor)); if (is_acceptor) { - _logger->error( + SPDLOG_LOGGER_ERROR( + _logger, "endpoint applier: secondary failover '{}' is an acceptor and " "cannot therefore be instantiated for endpoint '{}'", *failover_it, cfg.name); @@ -462,8 +481,8 @@ std::shared_ptr endpoint::_create_endpoint(config::endpoint& cfg, endp = std::shared_ptr( it->second.endpntfactry->new_endpoint(cfg, is_acceptor, cache)); - _logger->info(" create endpoint {} for endpoint '{}'", it->first, - cfg.name); + SPDLOG_LOGGER_INFO(_logger, " create endpoint {} for endpoint '{}'", + it->first, cfg.name); level = it->second.osi_to + 1; break; } @@ -484,8 +503,8 @@ std::shared_ptr endpoint::_create_endpoint(config::endpoint& cfg, (it->second.endpntfactry->has_endpoint(cfg, nullptr))) { std::shared_ptr current( it->second.endpntfactry->new_endpoint(cfg, is_acceptor)); - _logger->info(" create endpoint {} for endpoint '{}'", it->first, - cfg.name); + SPDLOG_LOGGER_INFO(_logger, " create endpoint {} for endpoint '{}'", + it->first, cfg.name); current->from(endp); endp = current; level = it->second.osi_to; @@ -545,7 +564,8 @@ void endpoint::_diff_endpoints( list_it = std::find_if(new_ep.begin(), new_ep.end(), failover_match_name(failover)); if (list_it == new_ep.end()) - _logger->error( + SPDLOG_LOGGER_ERROR( + _logger, "endpoint applier: could not find failover '{}' for endpoint " "'{}'", failover, entry.name); @@ -570,11 +590,14 @@ void endpoint::_diff_endpoints( * Create filters from a set of categories. * * @param[in] cfg Endpoint configuration. + * @param[in] forbidden_filter forbidden filter applied in case of default + * filter config * * @return Filters. */ multiplexing::muxer_filter endpoint::parse_filters( - const std::set& str_filters) { + const std::set& str_filters, + const multiplexing::muxer_filter& forbidden_filter) { auto logger = log_v2::instance().get(log_v2::CONFIG); multiplexing::muxer_filter elements({}); std::forward_list applied_filters; @@ -595,6 +618,7 @@ multiplexing::muxer_filter endpoint::parse_filters( if (str_filters.size() == 1 && *str_filters.begin() == "all") { elements = multiplexing::muxer_filter(); + elements -= forbidden_filter; applied_filters.emplace_front("all"); } else { for (auto& str : str_filters) { @@ -610,10 +634,11 @@ multiplexing::muxer_filter endpoint::parse_filters( } if (applied_filters.empty() && !str_filters.empty()) { fill_elements("all"); + elements -= forbidden_filter; applied_filters.emplace_front("all"); } } - logger->info("Filters applied on endpoint:{}", - fmt::join(applied_filters, ", ")); + SPDLOG_LOGGER_INFO(logger, "Filters applied on endpoint:{}", + fmt::join(applied_filters, ", ")); return elements; } diff --git a/tests/broker-engine/muxer_filter.robot b/tests/broker-engine/muxer_filter.robot index cb00f4e333e..8ed6ad8174e 100644 --- a/tests/broker-engine/muxer_filter.robot +++ b/tests/broker-engine/muxer_filter.robot @@ -6,10 +6,32 @@ Resource ../resources/import.resource Suite Setup Ctn Clean Before Suite Suite Teardown Ctn Clean After Suite Test Setup Ctn Stop Processes -Test Teardown Ctn Save Logs If Failed +Test Teardown Ctn Stop Engine Broker And Save Logs True *** Test Cases *** +NO_FILTER_NO_ERROR + [Documentation] no filter configured => no filter error. + [Tags] broker engine filter + Ctn Config Engine ${1} ${50} ${20} + Ctn Config Broker central + Ctn Config Broker module ${1} + Ctn Config Broker rrd + Ctn Broker Config Log central sql debug + Ctn Config Broker Sql Output central unified_sql + Ctn Config BBDO3 1 + Ctn Clear Broker Logs + + ${start} Get Current Date + Ctn Start Broker True + Ctn Start engine + + ${content} Create List + ... are too restrictive contain forbidden filters + ${result} Ctn Find In Log With Timeout ${centralLog} ${start} ${content} 15 + Should Not Be True ${result} An message of filter error has been found + + STUPID_FILTER [Documentation] Unified SQL is configured with only the bbdo category as filter. An error is raised by broker and broker should run correctly. [Tags] broker engine filter @@ -28,13 +50,10 @@ STUPID_FILTER Ctn Start engine ${content} Create List - ... The configured write filters for the endpoint 'central-broker-unified-sql' contain forbidden filters. These ones are removed The configured read filters for the endpoint 'central-broker-unified-sql' contain forbidden filters. These ones are removed + ... The configured write filters for the endpoint 'central-broker-unified-sql' contain forbidden filters. These ones are removed ${result} Ctn Find In Log With Timeout ${centralLog} ${start} ${content} 60 Should Be True ${result} A message telling bad filter should be available. - Ctn Stop Engine - Ctn Kindly Stop Broker True - STORAGE_ON_LUA [Documentation] The category 'storage' is applied on the stream connector. Only events of this category should be sent to this stream. [Tags] broker engine filter @@ -62,9 +81,6 @@ STORAGE_ON_LUA ${grep_res} Grep File /tmp/all_lua_event.log "category":[^3] regexp=True Should Be Empty ${grep_res} Events of category different than 'storage' found. - Ctn Stop Engine - Ctn Kindly Stop Broker True - FILTER_ON_LUA_EVENT [Documentation] stream connector with a bad configured filter generate a log error message [Tags] broker engine filter @@ -109,9 +125,6 @@ FILTER_ON_LUA_EVENT ... All the lines in all_lua_event.log should contain "_type":196620 END - Ctn Stop Engine - Ctn Kindly Stop Broker True - BAM_STREAM_FILTER [Documentation] With bbdo version 3.0.1, a BA of type 'worst' with one service is ... configured. The BA is in critical state, because of its service. we watch its events @@ -203,9 +216,6 @@ BAM_STREAM_FILTER ... centreon-bam-reporting event neb:.* rejected by write filter regexp=True Should Not Be Empty ${grep_res} We should reject events of Neb category. They are not rejected. - Ctn Stop Engine - Ctn Kindly Stop Broker True - UNIFIED_SQL_FILTER [Documentation] With bbdo version 3.0.1, we watch events written or rejected in unified_sql [Tags] broker engine bam filter @@ -240,9 +250,6 @@ UNIFIED_SQL_FILTER Should Not Be Empty ${grep_res} END - Ctn Stop Engine - Ctn Kindly Stop Broker True - CBD_RELOAD_AND_FILTERS [Documentation] We start engine/broker with a classical configuration. All is up and running. Some filters are added to the rrd output and cbd is reloaded. All is still up and running but some events are rejected. Then all is newly set as filter and all events are sent to rrd broker. [Tags] broker engine filter @@ -271,8 +278,8 @@ CBD_RELOAD_AND_FILTERS # We check that output filters to rrd are set to "all" ${content} Create List ... endpoint applier: The configured write filters for the endpoint 'centreon-broker-master-rrd' contain forbidden filters. These ones are removed - ${result} Ctn Find In Log With Timeout ${centralLog} ${start} ${content} 60 - Should Be True ${result} No message about the output filters to rrd broker. + ${result} Ctn Find In Log With Timeout ${centralLog} ${start} ${content} 15 + Should Not Be True ${result} No message about the output filters to rrd broker. # New configuration Ctn Broker Config Output Set Json central centreon-broker-master-rrd filters {"category": [ "storage"]} @@ -320,8 +327,8 @@ CBD_RELOAD_AND_FILTERS # We check that output filters to rrd are set to "all" ${content} Create List ... endpoint applier: The configured write filters for the endpoint 'centreon-broker-master-rrd' contain forbidden filters. These ones are removed - ${result} Ctn Find In Log With Timeout ${centralLog} ${start} ${content} 60 - Should Be True ${result} No message about the output filters to rrd broker. + ${result} Ctn Find In Log With Timeout ${centralLog} ${start} ${content} 15 + Should Not Be True ${result} No message about the output filters to rrd broker. ${start} Get Current Date # Let's wait for storage data written into rrd files @@ -337,9 +344,6 @@ CBD_RELOAD_AND_FILTERS ... False ... Some events are rejected by the rrd output whereas all categories are enabled. - Ctn Stop Engine - Ctn Kindly Stop Broker True - CBD_RELOAD_AND_FILTERS_WITH_OPR [Documentation] We start engine/broker with an almost classical configuration, just the connection between cbd central and cbd rrd is reversed with one peer retention. All is up and running. Some filters are added to the rrd output and cbd is reloaded. All is still up and running but some events are rejected. Then all is newly set as filter and all events are sent to rrd broker. [Tags] broker engine filter @@ -435,9 +439,6 @@ CBD_RELOAD_AND_FILTERS_WITH_OPR ... False ... Some events are rejected by the rrd output whereas all categories are enabled. - Ctn Stop Engine - Ctn Kindly Stop Broker True - SEVERAL_FILTERS_ON_LUA_EVENT [Documentation] Two stream connectors with different filters are configured. [Tags] broker engine filter @@ -507,5 +508,3 @@ SEVERAL_FILTERS_ON_LUA_EVENT ... "_type":65565 ... All the lines in all_lua_event-bis.log should contain "_type":65565 END - Ctn Stop Engine - Ctn Kindly Stop Broker True