Skip to content

Commit

Permalink
Refactor RunSingle and RunSingleAfterglow into a single method (#1867)
Browse files Browse the repository at this point in the history
  • Loading branch information
Molter73 authored Sep 30, 2024
1 parent d81d96b commit b346e7a
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 90 deletions.
47 changes: 17 additions & 30 deletions collector/lib/CollectorConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ StringListEnvVar ignored_networks("ROX_IGNORE_NETWORKS", std::vector<std::string
// Connection endpoints matching a network prefix listed here will never be aggregated.
StringListEnvVar non_aggregated_networks("ROX_NON_AGGREGATED_NETWORKS", std::vector<std::string>());

BoolEnvVar set_enable_afterglow("ROX_ENABLE_AFTERGLOW", true);
BoolEnvVar enable_afterglow("ROX_ENABLE_AFTERGLOW", true);
FloatEnvVar afterglow_period("ROX_AFTERGLOW_PERIOD", 300.0);

BoolEnvVar set_enable_core_dump("ENABLE_CORE_DUMP", false);

Expand Down Expand Up @@ -287,41 +288,27 @@ void CollectorConfig::InitCollectorConfig(CollectorArgs* args) {
}

void CollectorConfig::HandleAfterglowEnvVars() {
if (!set_enable_afterglow) {
enable_afterglow_ = false;
}

if (const char* afterglow_period = std::getenv("ROX_AFTERGLOW_PERIOD")) {
afterglow_period_micros_ = static_cast<int64_t>(atof(afterglow_period) * 1000000);
}
constexpr int64_t SECOND = 1'000'000;
constexpr int64_t max_afterglow_period_micros = 300 * SECOND; // 5 minutes

const int64_t max_afterglow_period_micros = 300000000; // 5 minutes

if (afterglow_period_micros_ > max_afterglow_period_micros) {
CLOG(ERROR) << "User set afterglow period of " << afterglow_period_micros_ / 1000000
<< "s is greater than the maximum allowed afterglow period of " << max_afterglow_period_micros / 1000000 << "s";
CLOG(ERROR) << "Setting the afterglow period to " << max_afterglow_period_micros / 1000000 << "s";
afterglow_period_micros_ = max_afterglow_period_micros;
}

if (enable_afterglow_ && afterglow_period_micros_ > 0) {
CLOG(INFO) << "Afterglow is enabled";
return;
}

if (!enable_afterglow_) {
CLOG(INFO) << "Afterglow is disabled";
return;
}
afterglow_period_micros_ = static_cast<uint64_t>(afterglow_period.value() * SECOND);

if (afterglow_period_micros_ < 0) {
CLOG(ERROR) << "Invalid afterglow period " << afterglow_period_micros_ / 1000000 << ". ROX_AFTERGLOW_PERIOD must be positive.";
CLOG(ERROR) << "Invalid afterglow period " << afterglow_period_micros_ / SECOND << ". ROX_AFTERGLOW_PERIOD must be positive.";
} else if (afterglow_period_micros_ == 0) {
CLOG(ERROR) << "Afterglow period set to 0.";
} else {
CLOG(ERROR) << "Afterglow period set to 0";
if (afterglow_period_micros_ > max_afterglow_period_micros) {
CLOG(WARNING) << "User set afterglow period of " << afterglow_period_micros_ / SECOND
<< "s is greater than the maximum allowed afterglow period of " << max_afterglow_period_micros / SECOND << "s";
CLOG(WARNING) << "Setting the afterglow period to " << max_afterglow_period_micros / SECOND << "s";
afterglow_period_micros_ = max_afterglow_period_micros;
}

enable_afterglow_ = enable_afterglow.value();
}

enable_afterglow_ = false;
CLOG(INFO) << "Disabling afterglow";
CLOG(INFO) << "Afterglow is " << (enable_afterglow_ ? "enabled" : "disabled");
}

void CollectorConfig::HandleConnectionStatsEnvVars() {
Expand Down
4 changes: 2 additions & 2 deletions collector/lib/CollectorConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ class CollectorConfig {
std::vector<IPNet> non_aggregated_networks_;

HostConfig host_config_;
int64_t afterglow_period_micros_ = 300000000; // 5 minutes in microseconds
bool enable_afterglow_ = true;
int64_t afterglow_period_micros_ = 300'000'000; // 5 minutes in microseconds
bool enable_afterglow_ = false;
bool enable_core_dump_ = false;
bool enable_processes_listening_on_ports_;
bool import_users_;
Expand Down
8 changes: 8 additions & 0 deletions collector/lib/EnvVar.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,21 @@ struct ParsePath {
return true;
}
};

struct ParseFloat {
bool operator()(float* out, const std::string& str_val) {
*out = std::stof(str_val);
return true;
}
};
} // namespace internal

using BoolEnvVar = EnvVar<bool, internal::ParseBool>;
using StringListEnvVar = EnvVar<std::vector<std::string>, internal::ParseStringList>;
using StringEnvVar = EnvVar<std::string, internal::ParseString>;
using IntEnvVar = EnvVar<int, internal::ParseInt>;
using PathEnvVar = EnvVar<std::filesystem::path, internal::ParsePath>;
using FloatEnvVar = EnvVar<float, internal::ParseFloat>;

} // namespace collector

Expand Down
76 changes: 19 additions & 57 deletions collector/lib/NetworkStatusNotifier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,7 @@ void NetworkStatusNotifier::Run() {

auto client_writer = comm_->PushNetworkConnectionInfoOpenStream([this](const sensor::NetworkFlowsControlMessage* msg) { OnRecvControlMessage(msg); });

if (enable_afterglow_) {
RunSingleAfterglow(client_writer.get());
} else {
RunSingle(client_writer.get());
}
RunSingle(client_writer.get());
if (thread_.should_stop()) {
return;
}
Expand Down Expand Up @@ -222,88 +218,52 @@ bool NetworkStatusNotifier::UpdateAllConnsAndEndpoints() {
void NetworkStatusNotifier::RunSingle(IDuplexClientWriter<sensor::NetworkConnectionInfoMessage>* writer) {
WaitUntilWriterStarted(writer, 10);

ConnMap old_conn_state;
AdvertisedEndpointMap old_cep_state;
auto next_scrape = std::chrono::system_clock::now();

while (writer->Sleep(next_scrape)) {
next_scrape = std::chrono::system_clock::now() + std::chrono::seconds(scrape_interval_);

if (!UpdateAllConnsAndEndpoints()) {
continue;
}

ReportConnectionStats();

const sensor::NetworkConnectionInfoMessage* msg;
ConnMap new_conn_state;
AdvertisedEndpointMap new_cep_state;
WITH_TIMER(CollectorStats::net_fetch_state) {
new_conn_state = conn_tracker_->FetchConnState(true, true);
ConnectionTracker::ComputeDelta(new_conn_state, &old_conn_state);

new_cep_state = conn_tracker_->FetchEndpointState(true, true);
ConnectionTracker::ComputeDelta(new_cep_state, &old_cep_state);
}

WITH_TIMER(CollectorStats::net_create_message) {
msg = CreateInfoMessage(old_conn_state, old_cep_state);
old_conn_state = std::move(new_conn_state);
old_cep_state = std::move(new_cep_state);
}

if (!msg) {
continue;
}

WITH_TIMER(CollectorStats::net_write_message) {
if (!writer->Write(*msg, next_scrape)) {
CLOG(ERROR) << "Failed to write network connection info";
return;
}
}
}
}

void NetworkStatusNotifier::RunSingleAfterglow(IDuplexClientWriter<sensor::NetworkConnectionInfoMessage>* writer) {
WaitUntilWriterStarted(writer, 10);

ConnMap old_conn_state;
AdvertisedEndpointMap old_cep_state;
auto next_scrape = std::chrono::system_clock::now();
int64_t time_at_last_scrape = NowMicros();

while (writer->Sleep(next_scrape)) {
CLOG(DEBUG) << "Starting network status notification";
next_scrape = std::chrono::system_clock::now() + std::chrono::seconds(scrape_interval_);

if (!UpdateAllConnsAndEndpoints()) {
CLOG(DEBUG) << "No connection or endpoint to report";
continue;
}

ReportConnectionStats();

int64_t time_micros = NowMicros();
const sensor::NetworkConnectionInfoMessage* msg;
AdvertisedEndpointMap new_cep_state;
ConnMap new_conn_state, delta_conn;
AdvertisedEndpointMap new_cep_state;
WITH_TIMER(CollectorStats::net_fetch_state) {
new_conn_state = conn_tracker_->FetchConnState(true, true);
ConnectionTracker::ComputeDeltaAfterglow(new_conn_state, old_conn_state, delta_conn, time_micros, time_at_last_scrape, afterglow_period_micros_);
if (enable_afterglow_) {
ConnectionTracker::ComputeDeltaAfterglow(new_conn_state, old_conn_state, delta_conn, time_micros, time_at_last_scrape, afterglow_period_micros_);
} else {
ConnectionTracker::ComputeDelta(new_conn_state, &old_conn_state);
}

new_cep_state = conn_tracker_->FetchEndpointState(true, true);
ConnectionTracker::ComputeDelta(new_cep_state, &old_cep_state);
}

WITH_TIMER(CollectorStats::net_create_message) {
// Report the deltas
msg = CreateInfoMessage(delta_conn, old_cep_state);
// Add new connections to the old_state and remove inactive connections that are older than the afterglow period.
ConnectionTracker::UpdateOldState(&old_conn_state, new_conn_state, time_micros, afterglow_period_micros_);
if (enable_afterglow_) {
msg = CreateInfoMessage(delta_conn, old_cep_state);
ConnectionTracker::UpdateOldState(&old_conn_state, new_conn_state, time_micros, afterglow_period_micros_);
} else {
msg = CreateInfoMessage(old_conn_state, old_cep_state);
old_conn_state = std::move(new_conn_state);
}
old_cep_state = std::move(new_cep_state);
time_at_last_scrape = time_micros;
}

if (!msg) {
CLOG(DEBUG) << "No update to report";
continue;
}

Expand All @@ -313,6 +273,8 @@ void NetworkStatusNotifier::RunSingleAfterglow(IDuplexClientWriter<sensor::Netwo
return;
}
}

CLOG(DEBUG) << "Network status notification done";
}
}

Expand Down
1 change: 0 additions & 1 deletion collector/lib/NetworkStatusNotifier.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ class NetworkStatusNotifier : protected ProtoAllocator<sensor::NetworkConnection
void WaitUntilWriterStarted(IDuplexClientWriter<sensor::NetworkConnectionInfoMessage>* writer, int wait_time);
bool UpdateAllConnsAndEndpoints();
void RunSingle(IDuplexClientWriter<sensor::NetworkConnectionInfoMessage>* writer);
void RunSingleAfterglow(IDuplexClientWriter<sensor::NetworkConnectionInfoMessage>* writer);
void ReceivePublicIPs(const sensor::IPAddressList& public_ips);
void ReceiveIPNetworks(const sensor::IPNetworkList& networks);

Expand Down
2 changes: 2 additions & 0 deletions integration-tests/container/schedule-curls/schedule-curls.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#!/usr/bin/env sh

set -ex

num_meta_iter=$1
num_iter=$2
sleep_between_curl_time=$3
Expand Down

0 comments on commit b346e7a

Please sign in to comment.