Skip to content

Commit

Permalink
we apply forbidden filter on default filter(all) (#1536)
Browse files Browse the repository at this point in the history
MON-144608
  • Loading branch information
jean-christophe81 committed Jul 25, 2024
1 parent 67f3c36 commit 0e93b19
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ class endpoint {
static bool loaded();

static multiplexing::muxer_filter parse_filters(
const std::set<std::string>& str_filters);
const std::set<std::string>& str_filters,
const multiplexing::muxer_filter& forbidden_filter);
};
} // namespace applier
} // namespace config
Expand Down
101 changes: 63 additions & 38 deletions broker/core/src/config/applier/endpoint.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,15 @@ endpoint::~endpoint() {
*/
void endpoint::apply(std::list<config::endpoint> const& endpoints) {
// Log messages.
_logger->info("endpoint applier: loading configuration");
SPDLOG_LOGGER_INFO(_logger, "endpoint applier: loading configuration");

{
if (_logger->level() <= spdlog::level::debug) {
std::vector<std::string> eps;
for (auto& ep : endpoints)
eps.push_back(ep.name);
_logger->debug("endpoint applier: {} endpoints to apply: {}",
endpoints.size(), fmt::format("{}", fmt::join(eps, ", ")));
SPDLOG_LOGGER_DEBUG(_logger, "endpoint applier: {} endpoints to apply: {}",
endpoints.size(),
fmt::format("{}", fmt::join(eps, ", ")));
}

// Copy endpoint configurations and apply eventual modifications.
Expand All @@ -129,8 +130,9 @@ void endpoint::apply(std::list<config::endpoint> const& endpoints) {
// resources that might be used by other endpoints.
auto it = _endpoints.find(ep);
if (it != _endpoints.end()) {
_logger->debug("endpoint applier: removing old endpoint {}",
it->first.name);
SPDLOG_LOGGER_DEBUG(_logger,
"endpoint applier: removing old endpoint {}",
it->first.name);
/* failover::exit() is called. */
it->second->exit();
delete it->second;
Expand All @@ -141,13 +143,14 @@ void endpoint::apply(std::list<config::endpoint> const& endpoints) {

// Update existing endpoints.
for (auto it = _endpoints.begin(), end = _endpoints.end(); it != end; ++it) {
_logger->debug("endpoint applier: updating endpoint {}", it->first.name);
SPDLOG_LOGGER_DEBUG(_logger, "endpoint applier: updating endpoint {}",
it->first.name);
it->second->update();
}

// Debug message.
_logger->debug("endpoint applier: {} endpoints to create",
endp_to_create.size());
SPDLOG_LOGGER_DEBUG(_logger, "endpoint applier: {} endpoints to create",
endp_to_create.size());

// Create new endpoints.
for (config::endpoint& ep : endp_to_create) {
Expand All @@ -156,7 +159,8 @@ void endpoint::apply(std::list<config::endpoint> const& endpoints) {
if (ep.name.empty() ||
std::find_if(endp_to_create.begin(), endp_to_create.end(),
name_match_failover(ep.name)) == endp_to_create.end()) {
_logger->debug("endpoint applier: creating endpoint {}", ep.name);
SPDLOG_LOGGER_DEBUG(_logger, "endpoint applier: creating endpoint {}",
ep.name);
bool is_acceptor;
std::shared_ptr<io::endpoint> e{_create_endpoint(ep, is_acceptor)};
std::unique_ptr<processing::endpoint> endp;
Expand All @@ -173,15 +177,18 @@ void endpoint::apply(std::list<config::endpoint> const& endpoints) {
* if broker sends data to map. This is needed because a failover needs
* its peer to ack events to release them (and a failover is also able
* to write data). */
multiplexing::muxer_filter r_filter = parse_filters(ep.read_filters);
multiplexing::muxer_filter w_filter = parse_filters(ep.write_filters);
multiplexing::muxer_filter r_filter =
parse_filters(ep.read_filters, e->get_stream_forbidden_filter());
multiplexing::muxer_filter w_filter =
parse_filters(ep.write_filters, e->get_stream_forbidden_filter());
if (is_acceptor) {
w_filter -= e->get_stream_forbidden_filter();
r_filter -= e->get_stream_forbidden_filter();
std::unique_ptr<processing::acceptor> acceptr(
std::make_unique<processing::acceptor>(e, ep.name, r_filter,
w_filter));
_logger->debug(
SPDLOG_LOGGER_DEBUG(
_logger,
"endpoint applier: acceptor '{}' configured with write filters: {} "
"and read filters: {}",
ep.name, w_filter.get_allowed_categories(),
Expand All @@ -193,15 +200,17 @@ void endpoint::apply(std::list<config::endpoint> const& endpoints) {
/* Are there missing events in the w_filter ? */
if (!e->get_stream_mandatory_filter().is_in(w_filter)) {
w_filter |= e->get_stream_mandatory_filter();
_logger->debug(
SPDLOG_LOGGER_DEBUG(
_logger,
"endpoint applier: The configured write filters for the endpoint "
"'{}' are too restrictive. Mandatory categories added to them",
ep.name);
}
/* Are there events in w_filter that are forbidden ? */
if (w_filter.contains_some_of(e->get_stream_forbidden_filter())) {
w_filter -= e->get_stream_forbidden_filter();
_logger->error(
SPDLOG_LOGGER_ERROR(
_logger,
"endpoint applier: The configured write filters for the endpoint "
"'{}' contain forbidden filters. These ones are removed",
ep.name);
Expand All @@ -210,13 +219,14 @@ void endpoint::apply(std::list<config::endpoint> const& endpoints) {
/* Are there events in r_filter that are forbidden ? */
if (r_filter.contains_some_of(e->get_stream_forbidden_filter())) {
r_filter -= e->get_stream_forbidden_filter();
_logger->error(
SPDLOG_LOGGER_ERROR(
_logger,
"endpoint applier: The configured read filters for the endpoint "
"'{}' contain forbidden filters. These ones are removed",
ep.name);
}
_logger->debug(
"endpoint applier: filters {} for endpoint '{}' applied.",
SPDLOG_LOGGER_DEBUG(
_logger, "endpoint applier: filters {} for endpoint '{}' applied.",
w_filter.get_allowed_categories(), ep.name);

auto mux = multiplexing::muxer::create(
Expand All @@ -230,7 +240,8 @@ void endpoint::apply(std::list<config::endpoint> const& endpoints) {
}

// Run thread.
_logger->debug(
SPDLOG_LOGGER_DEBUG(
_logger,
"endpoint applier: endpoint thread {} of '{}' is registered and "
"ready to run",
static_cast<void*>(endp.get()), ep.name);
Expand All @@ -245,22 +256,24 @@ void endpoint::apply(std::list<config::endpoint> const& endpoints) {
*/
void endpoint::_discard() {
_discarding = true;
_logger->debug("endpoint applier: destruction");
SPDLOG_LOGGER_DEBUG(_logger, "endpoint applier: destruction");

// wait for failover and feeder to push endloop event
::usleep(processing::idle_microsec_wait_idle_thread_delay + 100000);
// Exit threads.
{
_logger->debug("endpoint applier: requesting threads termination");
SPDLOG_LOGGER_DEBUG(_logger,
"endpoint applier: requesting threads termination");
std::unique_lock<std::timed_mutex> lock(_endpointsm);

// Send termination requests.
// We begin with feeders
for (auto it = _endpoints.begin(); it != _endpoints.end();) {
if (it->second->is_feeder()) {
it->second->wait_for_all_events_written(5000);
_logger->trace("endpoint applier: send exit signal to endpoint '{}'",
it->second->get_name());
SPDLOG_LOGGER_TRACE(
_logger, "endpoint applier: send exit signal to endpoint '{}'",
it->second->get_name());
delete it->second;
it = _endpoints.erase(it);
} else
Expand All @@ -270,19 +283,22 @@ void endpoint::_discard() {

// Exit threads.
{
_logger->debug("endpoint applier: requesting threads termination");
SPDLOG_LOGGER_DEBUG(_logger,
"endpoint applier: requesting threads termination");
std::unique_lock<std::timed_mutex> lock(_endpointsm);

// We continue with failovers
for (auto it = _endpoints.begin(); it != _endpoints.end();) {
it->second->wait_for_all_events_written(5000);
_logger->trace("endpoint applier: send exit signal on endpoint '{}'",
it->second->get_name());
SPDLOG_LOGGER_TRACE(_logger,
"endpoint applier: send exit signal on endpoint '{}'",
it->second->get_name());
delete it->second;
it = _endpoints.erase(it);
}

_logger->debug("endpoint applier: all threads are terminated");
SPDLOG_LOGGER_DEBUG(_logger,
"endpoint applier: all threads are terminated");
}

// Stop multiplexing: we must stop the engine after failovers otherwise
Expand Down Expand Up @@ -373,7 +389,8 @@ processing::failover* endpoint::_create_failover(
std::shared_ptr<io::endpoint> endp,
std::list<config::endpoint>& l) {
// Debug message.
_logger->info("endpoint applier: creating new failover '{}'", cfg.name);
SPDLOG_LOGGER_INFO(_logger, "endpoint applier: creating new failover '{}'",
cfg.name);

// Check that failover is configured.
std::shared_ptr<processing::failover> failovr;
Expand All @@ -382,7 +399,8 @@ processing::failover* endpoint::_create_failover(
std::list<config::endpoint>::iterator it =
std::find_if(l.begin(), l.end(), failover_match_name(front_failover));
if (it == l.end())
_logger->error(
SPDLOG_LOGGER_ERROR(
_logger,
"endpoint applier: could not find failover '{}' for endpoint '{}'",
front_failover, cfg.name);
else {
Expand Down Expand Up @@ -411,7 +429,8 @@ processing::failover* endpoint::_create_failover(
bool is_acceptor{false};
std::shared_ptr<io::endpoint> endp(_create_endpoint(*it, is_acceptor));
if (is_acceptor) {
_logger->error(
SPDLOG_LOGGER_ERROR(
_logger,
"endpoint applier: secondary failover '{}' is an acceptor and "
"cannot therefore be instantiated for endpoint '{}'",
*failover_it, cfg.name);
Expand Down Expand Up @@ -462,8 +481,8 @@ std::shared_ptr<io::endpoint> endpoint::_create_endpoint(config::endpoint& cfg,

endp = std::shared_ptr<io::endpoint>(
it->second.endpntfactry->new_endpoint(cfg, is_acceptor, cache));
_logger->info(" create endpoint {} for endpoint '{}'", it->first,
cfg.name);
SPDLOG_LOGGER_INFO(_logger, " create endpoint {} for endpoint '{}'",
it->first, cfg.name);
level = it->second.osi_to + 1;
break;
}
Expand All @@ -484,8 +503,8 @@ std::shared_ptr<io::endpoint> endpoint::_create_endpoint(config::endpoint& cfg,
(it->second.endpntfactry->has_endpoint(cfg, nullptr))) {
std::shared_ptr<io::endpoint> current(
it->second.endpntfactry->new_endpoint(cfg, is_acceptor));
_logger->info(" create endpoint {} for endpoint '{}'", it->first,
cfg.name);
SPDLOG_LOGGER_INFO(_logger, " create endpoint {} for endpoint '{}'",
it->first, cfg.name);
current->from(endp);
endp = current;
level = it->second.osi_to;
Expand Down Expand Up @@ -545,7 +564,8 @@ void endpoint::_diff_endpoints(
list_it = std::find_if(new_ep.begin(), new_ep.end(),
failover_match_name(failover));
if (list_it == new_ep.end())
_logger->error(
SPDLOG_LOGGER_ERROR(
_logger,
"endpoint applier: could not find failover '{}' for endpoint "
"'{}'",
failover, entry.name);
Expand All @@ -570,11 +590,14 @@ void endpoint::_diff_endpoints(
* Create filters from a set of categories.
*
* @param[in] cfg Endpoint configuration.
* @param[in] forbidden_filter forbidden filter applied in case of default
* filter config
*
* @return Filters.
*/
multiplexing::muxer_filter endpoint::parse_filters(
const std::set<std::string>& str_filters) {
const std::set<std::string>& str_filters,
const multiplexing::muxer_filter& forbidden_filter) {
auto logger = log_v2::instance().get(log_v2::CONFIG);
multiplexing::muxer_filter elements({});
std::forward_list<fmt::string_view> applied_filters;
Expand All @@ -595,6 +618,7 @@ multiplexing::muxer_filter endpoint::parse_filters(

if (str_filters.size() == 1 && *str_filters.begin() == "all") {
elements = multiplexing::muxer_filter();
elements -= forbidden_filter;
applied_filters.emplace_front("all");
} else {
for (auto& str : str_filters) {
Expand All @@ -610,10 +634,11 @@ multiplexing::muxer_filter endpoint::parse_filters(
}
if (applied_filters.empty() && !str_filters.empty()) {
fill_elements("all");
elements -= forbidden_filter;
applied_filters.emplace_front("all");
}
}
logger->info("Filters applied on endpoint:{}",
fmt::join(applied_filters, ", "));
SPDLOG_LOGGER_INFO(logger, "Filters applied on endpoint:{}",
fmt::join(applied_filters, ", "));
return elements;
}
Loading

3 comments on commit 0e93b19

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Robot Results

✅ Passed ❌ Failed ⏭️ Skipped Total Pass % ⏱️ Duration
0 4 0 4 0 0s

Failed Tests

Name Message ⏱️ Duration Suite
BEOTEL_TELEGRAF_CHECK_HOST "unencrypted server listening on 0.0.0.0:4317" should be available. 0.000 s Opentelemetry
BEOTEL_TELEGRAF_CHECK_SERVICE "unencrypted server listening on 0.0.0.0:4317" should be available. 0.000 s Opentelemetry
BEOTEL_SERVE_TELEGRAF_CONFIGURATION_CRYPTED "server listen on 0.0.0.0:1443" should be available. 0.000 s Opentelemetry
BEOTEL_SERVE_TELEGRAF_CONFIGURATION_NO_CRYPTED "server listen on 0.0.0.0:1443" should be available. 0.000 s Opentelemetry

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Robot Results

✅ Passed ❌ Failed ⏭️ Skipped Total Pass % ⏱️ Duration
0 4 0 4 0 0s

Failed Tests

Name Message ⏱️ Duration Suite
BEOTEL_TELEGRAF_CHECK_HOST "unencrypted server listening on 0.0.0.0:4317" should be available. 0.000 s Opentelemetry
BEOTEL_TELEGRAF_CHECK_SERVICE "unencrypted server listening on 0.0.0.0:4317" should be available. 0.000 s Opentelemetry
BEOTEL_SERVE_TELEGRAF_CONFIGURATION_CRYPTED "server listen on 0.0.0.0:1443" should be available. 0.000 s Opentelemetry
BEOTEL_SERVE_TELEGRAF_CONFIGURATION_NO_CRYPTED "server listen on 0.0.0.0:1443" should be available. 0.000 s Opentelemetry

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Robot Results

✅ Passed ❌ Failed ⏭️ Skipped Total Pass % ⏱️ Duration
0 4 0 4 0 0s

Failed Tests

Name Message ⏱️ Duration Suite
BEOTEL_TELEGRAF_CHECK_HOST "unencrypted server listening on 0.0.0.0:4317" should be available. 0.000 s Opentelemetry
BEOTEL_TELEGRAF_CHECK_SERVICE "unencrypted server listening on 0.0.0.0:4317" should be available. 0.000 s Opentelemetry
BEOTEL_SERVE_TELEGRAF_CONFIGURATION_CRYPTED "server listen on 0.0.0.0:1443" should be available. 0.000 s Opentelemetry
BEOTEL_SERVE_TELEGRAF_CONFIGURATION_NO_CRYPTED "server listen on 0.0.0.0:1443" should be available. 0.000 s Opentelemetry

Please sign in to comment.