Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjust buffer size instead of cpu-per-core #1706

Merged
merged 3 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 40 additions & 29 deletions collector/lib/CollectorConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ void CollectorConfig::HandleSinspEnvVars() {

if ((envvar = std::getenv("ROX_COLLECTOR_SINSP_BUFFER_SIZE")) != NULL) {
try {
sinsp_buffer_size_ = std::stoi(envvar);
sinsp_buffer_size_ = std::stoll(envvar);
CLOG(INFO) << "Sinsp buffer size: " << sinsp_buffer_size_;
} catch (...) {
CLOG(ERROR) << "Invalid buffer size value: '" << envvar << "'";
Expand All @@ -312,7 +312,7 @@ void CollectorConfig::HandleSinspEnvVars() {

if ((envvar = std::getenv("ROX_COLLECTOR_SINSP_TOTAL_BUFFER_SIZE")) != NULL) {
try {
sinsp_total_buffer_size_ = std::stoi(envvar);
sinsp_total_buffer_size_ = std::stoll(envvar);
CLOG(INFO) << "Sinsp total buffer size: " << sinsp_buffer_size_;
} catch (...) {
CLOG(ERROR) << "Invalid total buffer size value: '" << envvar << "'";
Expand All @@ -321,7 +321,7 @@ void CollectorConfig::HandleSinspEnvVars() {

if ((envvar = std::getenv("ROX_COLLECTOR_SINSP_THREAD_CACHE_SIZE")) != NULL) {
try {
sinsp_thread_cache_size_ = std::stoi(envvar);
sinsp_thread_cache_size_ = std::stoll(envvar);
CLOG(INFO) << "Sinsp thread cache size: " << sinsp_thread_cache_size_;
} catch (...) {
CLOG(ERROR) << "Invalid thread cache size value: '" << envvar << "'";
Expand Down Expand Up @@ -382,54 +382,65 @@ std::ostream& operator<<(std::ostream& os, const CollectorConfig& c) {
<< ", enable_external_ips:" << c.EnableExternalIPs();
}

// Returns number of CPUs per buffer for the purpose of ring buffer allocation.
// Returns size of ring buffers to be allocated.
// The value is adjusted based on the allowed total limit, e.g.:
//
// * total limit for ring buffers is 1Gi
// * one buffer takes 8Mi
// * maximum allowed number of buffers is 128
// * there is one buffer per cpu
// * total number of CPU cores is 150
//
// In this scenario, n_buffers = 150 / 1 > 128, thus the adjusted value will be
// returned: ceil(150 / 128) = 2. Allocating one buffer per 2 CPU cores will
// allow us to fit into the total limit: (150 / 2) * 8Mi < 1Gi.
// In this scenario, buffers_total = (150 / cpu_per_buffer) * 8Mi > 1Gi. Thus
// the adjusted value will be returned: ceil(1Gi / 150) = 6Mi. Allocating one
// buffer of size 6Mi per CPU core will allow us to fit into the total limit.
//
unsigned int CollectorConfig::GetSinspCpuPerBuffer() const {
unsigned int n_buffers, max_n_buffers;

if (sinsp_buffer_size_ == 0) {
CLOG(WARNING) << "Trying to calculate cpu-per-buffer without"
"initialized buffer size. Return unmodified "
<< sinsp_cpu_per_buffer_ << " CPUs per buffer.";
return sinsp_cpu_per_buffer_;
unsigned int CollectorConfig::GetSinspBufferSize() const {
unsigned int max_buffer_size, effective_buffer_size, n_buffers;

if (sinsp_total_buffer_size_ == 0) {
CLOG(WARNING) << "Trying to calculate buffer size without "
"requested total buffer size. Return unmodified "
<< sinsp_buffer_size_ << " bytes.";
return sinsp_buffer_size_;
}

if (sinsp_cpu_per_buffer_ == 0) {
CLOG(WARNING) << "Trying to calculate cpu-per-buffer without "
CLOG(WARNING) << "Trying to calculate buffer size without "
"requested cpu-per-buffer. Return unmodified "
<< sinsp_cpu_per_buffer_ << " CPUs per buffer.";
return sinsp_cpu_per_buffer_;
<< sinsp_buffer_size_ << " bytes.";
return sinsp_buffer_size_;
}

if (host_config_.GetNumPossibleCPUs() == 0) {
CLOG(WARNING) << "Trying to calculate cpu-per-buffer without"
CLOG(WARNING) << "Trying to calculate buffer size without"
"number of possible CPUs. Return unmodified "
<< sinsp_cpu_per_buffer_ << " CPUs per buffer.";
return sinsp_cpu_per_buffer_;
<< sinsp_buffer_size_ << " bytes.";
return sinsp_buffer_size_;
}

// Round to the larger value, since one buffer will be allocated even if the
// last group of CPUs is less than sinsp_cpu_per_buffer_
n_buffers = std::ceil(((float)host_config_.GetNumPossibleCPUs() / (float)sinsp_cpu_per_buffer_));
max_n_buffers = std::ceil((float)sinsp_total_buffer_size_ / (float)sinsp_buffer_size_);
n_buffers = std::ceil((float)host_config_.GetNumPossibleCPUs() / (float)sinsp_cpu_per_buffer_);

// Baseline case, nothing to adjust
if (n_buffers <= max_n_buffers) {
return sinsp_cpu_per_buffer_;
max_buffer_size = std::ceil((float)sinsp_total_buffer_size_ / (float)n_buffers);

// It would be awkward to have a buffer size of e.g. 3.14159..., align by the
// highest level, e.g. MiB, KiB.
int magnitude = 0;
while (max_buffer_size >> 10) {
max_buffer_size = max_buffer_size >> 10;
magnitude++;
}

effective_buffer_size = std::min(sinsp_buffer_size_,
(max_buffer_size << magnitude * 10));

if (effective_buffer_size != sinsp_buffer_size_) {
CLOG(INFO) << "Use modified ringbuf size of "
<< effective_buffer_size << " bytes.";
}

// Otherwise reduce sinsp_cpu_per_buffer_ to fit into the total limit
return std::ceil((float)host_config_.GetNumPossibleCPUs() / (float)max_n_buffers);
return effective_buffer_size;
}

void CollectorConfig::SetSinspBufferSize(unsigned int buffer_size) {
Expand Down
4 changes: 2 additions & 2 deletions collector/lib/CollectorConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ class CollectorConfig {
const std::vector<double>& GetConnectionStatsQuantiles() const { return connection_stats_quantiles_; }
double GetConnectionStatsError() const { return connection_stats_error_; }
unsigned int GetConnectionStatsWindow() const { return connection_stats_window_; }
unsigned int GetSinspBufferSize() const { return sinsp_buffer_size_; }
unsigned int GetSinspCpuPerBuffer() const;
unsigned int GetSinspCpuPerBuffer() const { return sinsp_cpu_per_buffer_; }
unsigned int GetSinspBufferSize() const;
unsigned int GetSinspTotalBufferSize() const { return sinsp_total_buffer_size_; }
unsigned int GetSinspThreadCacheSize() const { return sinsp_thread_cache_size_; }

Expand Down
34 changes: 24 additions & 10 deletions collector/test/CollectorConfigTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,33 @@ class MockCollectorConfig : public CollectorConfig {
}
};

TEST(CollectorConfigTest, TestSinspCpuPerBufferRaw) {
// Test that unmodified value is returned, when some dependency values are
// missing
TEST(CollectorConfigTest, TestSinspBufferSizeReturnUnmodified) {
using namespace collector;

MockCollectorConfig config;
HostConfig hconfig;
hconfig.SetNumPossibleCPUs(0);
config.MockSetSinspBufferSize(0);
config.MockSetSinspCpuPerBuffer(1);
config.MockSetSinspCpuPerBuffer(0);
config.MockSetSinspTotalBufferSize(0);
config.MockSetSinspBufferSize(1);
config.MockSetHostConfig(&hconfig);

// Buffer size is not initialized
EXPECT_EQ(1, config.GetSinspCpuPerBuffer());
// CPU-per-buffer is not initialized
EXPECT_EQ(1, config.GetSinspBufferSize());

// Number of CPUs is not initialized
config.MockSetSinspBufferSize(1024);
EXPECT_EQ(1, config.GetSinspCpuPerBuffer());
config.MockSetSinspCpuPerBuffer(1);
EXPECT_EQ(1, config.GetSinspBufferSize());

// Total buffers size is not initialized
hconfig.SetNumPossibleCPUs(1);
config.MockSetHostConfig(&hconfig);
EXPECT_EQ(1, config.GetSinspBufferSize());
}

// Test adjusted value
TEST(CollectorConfigTest, TestSinspCpuPerBufferAdjusted) {
using namespace collector;

Expand All @@ -58,12 +67,17 @@ TEST(CollectorConfigTest, TestSinspCpuPerBufferAdjusted) {
// Low number of CPUs, raw value
hconfig.SetNumPossibleCPUs(16);
config.MockSetHostConfig(&hconfig);
EXPECT_EQ(1, config.GetSinspCpuPerBuffer());
EXPECT_EQ(8 * 1024 * 1024, config.GetSinspBufferSize());

// High number of CPUs, adjusted value
// High number of CPUs, adjusted value, aligned to MiB
hconfig.SetNumPossibleCPUs(150);
config.MockSetHostConfig(&hconfig);
EXPECT_EQ(3, config.GetSinspCpuPerBuffer());
EXPECT_EQ(3 * 1024 * 1024, config.GetSinspBufferSize());

// Extreme number of CPUs, adjusted value, aligned to KiB
hconfig.SetNumPossibleCPUs(1024);
config.MockSetHostConfig(&hconfig);
EXPECT_EQ(512 * 1024, config.GetSinspBufferSize());
}

} // namespace collector
4 changes: 2 additions & 2 deletions docs/references.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ bytes. The default value is 16 MB.

* `ROX_COLLECTOR_SINSP_TOTAL_BUFFER_SIZE`: Specifies the allowed total size of
all sinsp buffer in bytes. If the actual value will be larger than that due to
number of available CPUs, `ROX_COLLECTOR_SINSP_CPU_PER_BUFFER` will be adjusted
to match the limit. The default value is 512 MB and based on the default memory
number of available CPUs, `ROX_COLLECTOR_SINSP_BUFFER_SIZE` will be adjusted to
match the limit. The default value is 512 MB and based on the default memory
limit specified for Collector DaemonSet in ACS.

* `ROX_COLLECTOR_SINSP_THREAD_CACHE_SIZE`: Puts upper limit on how many
Expand Down
Loading