Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ROX-25719: Receive runtime-configuration in network channel #1870

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion collector/lib/CollectorConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ BoolEnvVar set_import_users("ROX_COLLECTOR_SET_IMPORT_USERS", false);

BoolEnvVar collect_connection_status("ROX_COLLECT_CONNECTION_STATUS", true);

BoolEnvVar enable_external_ips("ROX_ENABLE_EXTERNAL_IPS", false);
BoolEnvVar enable_external_ips("ROX_EXTERNAL_IPS", false);

BoolEnvVar enable_connection_stats("ROX_COLLECTOR_ENABLE_CONNECTION_STATS", true);

Expand Down
31 changes: 30 additions & 1 deletion collector/lib/CollectorConfig.h
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
#ifndef _COLLECTOR_CONFIG_H_
#define _COLLECTOR_CONFIG_H_

#include <optional>
#include <ostream>
#include <vector>

#include <json/json.h>

#include <grpcpp/channel.h>

#include <internalapi/sensor/collector.pb.h>

#include "CollectionMethod.h"
#include "HostConfig.h"
#include "NetworkConnection.h"
Expand Down Expand Up @@ -75,7 +78,19 @@ class CollectorConfig {
bool IsProcessesListeningOnPortsEnabled() const { return enable_processes_listening_on_ports_; }
bool ImportUsers() const { return import_users_; }
bool CollectConnectionStatus() const { return collect_connection_status_; }
bool EnableExternalIPs() const { return enable_external_ips_; }

// EnableExternalIPs will check for the existence
// of a runtime configuration, and defer to that value
// otherwise, we rely on the feature flag (env var)
bool EnableExternalIPs() const {
if (runtime_config_.has_value()) {
auto cfg = runtime_config_.value();
auto network_cfg = cfg.network_connection_config();
return network_cfg.enable_external_ips();
}
return enable_external_ips_;
}

bool EnableConnectionStats() const { return enable_connection_stats_; }
bool EnableDetailedMetrics() const { return enable_detailed_metrics_; }
bool EnableRuntimeConfig() const { return enable_runtime_config_; }
Expand All @@ -93,6 +108,14 @@ class CollectorConfig {

static std::pair<option::ArgStatus, std::string> CheckConfiguration(const char* config, Json::Value* root);

void SetRuntimeConfig(sensor::CollectorConfig runtime_config) {
runtime_config_ = std::move(runtime_config);
}

std::optional<sensor::CollectorConfig> GetRuntimeConfig() const {
return runtime_config_;
}

std::shared_ptr<grpc::Channel> grpc_channel;

protected:
Expand Down Expand Up @@ -146,6 +169,8 @@ class CollectorConfig {

std::optional<TlsConfig> tls_config_;

std::optional<sensor::CollectorConfig> runtime_config_;

void HandleAfterglowEnvVars();
void HandleConnectionStatsEnvVars();
void HandleSinspEnvVars();
Expand All @@ -155,6 +180,10 @@ class CollectorConfig {
void SetSinspTotalBufferSize(unsigned int total_buffer_size);
void SetSinspCpuPerBuffer(unsigned int buffer_size);
void SetHostConfig(HostConfig* config);

void SetEnableExternalIPs(bool value) {
enable_external_ips_ = value;
}
};

std::ostream& operator<<(std::ostream& os, const CollectorConfig& c);
Expand Down
28 changes: 14 additions & 14 deletions collector/lib/DuplexGRPC.h
Original file line number Diff line number Diff line change
Expand Up @@ -529,11 +529,11 @@ class DuplexClientWriter : public DuplexClient, public IDuplexClientWriter<W> {

// Write methods.

Result Write(const W& obj, const gpr_timespec& deadline) {
Result Write(const W& obj, const gpr_timespec& deadline) override {
return DoSync<const W&>(&DuplexClientWriter::WriteAsyncInternal, obj, deadline);
}

Result WriteAsync(const W& obj) {
Result WriteAsync(const W& obj) override {
return Result(WriteAsyncInternal(obj));
}

Expand Down Expand Up @@ -732,14 +732,14 @@ class StdoutDuplexClientWriter : public IDuplexClientWriter<W> {

// Write methods.

Result Write(const W& obj, const gpr_timespec& deadline) {
Result Write(const W& obj, const gpr_timespec& deadline) override {
std::string output;
google::protobuf::util::MessageToJsonString(obj, &output, google::protobuf::util::JsonPrintOptions{});
CLOG(DEBUG) << "GRPC: " << output;
return Result(Status::OK);
}

Result WriteAsync(const W& obj) {
Result WriteAsync(const W& obj) override {
std::string output;
google::protobuf::util::MessageToJsonString(obj, &output, google::protobuf::util::JsonPrintOptions{});
CLOG(DEBUG) << "GRPC: " << output;
Expand All @@ -751,41 +751,41 @@ class StdoutDuplexClientWriter : public IDuplexClientWriter<W> {
return Result(Status::OK);
}

bool Sleep(const gpr_timespec& deadline) {
bool Sleep(const gpr_timespec& deadline) override {
return true;
}

Result WritesDoneAsync() {
Result WritesDoneAsync() override {
SimonBaeumer marked this conversation as resolved.
Show resolved Hide resolved
return Result(Status::OK);
}

Result WritesDone(const gpr_timespec& deadline) {
Result WritesDone(const gpr_timespec& deadline) override {
return Result(Status::OK);
}

Result FinishAsync() {
Result FinishAsync() override {
return Result(Status::OK);
}

Result WaitUntilStarted(const gpr_timespec& deadline) {
Result WaitUntilStarted(const gpr_timespec& deadline) override {
return Result(Status::OK);
}

Result WaitUntilFinished(const gpr_timespec& deadline) {
Result WaitUntilFinished(const gpr_timespec& deadline) override {
return Result(Status::OK);
}

Result Finish(grpc::Status* status, const gpr_timespec& deadline) {
Result Finish(grpc::Status* status, const gpr_timespec& deadline) override {
return Result(Status::OK);
}

grpc::Status Finish(const gpr_timespec& deadline) {
grpc::Status Finish(const gpr_timespec& deadline) override {
return grpc::Status(grpc::StatusCode::OK, "Ok");
}

void TryCancel() {}
void TryCancel() override {}

Result Shutdown() {
Result Shutdown() override {
return Result(Status::OK);
}

Expand Down
4 changes: 2 additions & 2 deletions collector/lib/NetworkConnectionInfoServiceComm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,14 @@ void NetworkConnectionInfoServiceComm::TryCancel() {
}
}

std::unique_ptr<IDuplexClientWriter<sensor::NetworkConnectionInfoMessage>> NetworkConnectionInfoServiceComm::PushNetworkConnectionInfoOpenStream(std::function<void(const sensor::NetworkFlowsControlMessage*)> receive_func) {
std::unique_ptr<IDuplexClientWriter<sensor::NetworkConnectionInfoMessage>> NetworkConnectionInfoServiceComm::PushNetworkConnectionInfoOpenStream(std::function<void(const sensor::MsgToCollector*)> receive_func) {
if (!context_) {
ResetClientContext();
}

if (channel_) {
return DuplexClient::CreateWithReadCallback(
&sensor::NetworkConnectionInfoService::Stub::AsyncPushNetworkConnectionInfo,
&sensor::NetworkConnectionInfoService::Stub::AsyncCommunicate,
channel_, context_.get(), std::move(receive_func));
} else {
return MakeUnique<collector::grpc_duplex_impl::StdoutDuplexClientWriter<sensor::NetworkConnectionInfoMessage>>();
Expand Down
4 changes: 2 additions & 2 deletions collector/lib/NetworkConnectionInfoServiceComm.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class INetworkConnectionInfoServiceComm {

virtual sensor::NetworkConnectionInfoService::StubInterface* GetStub() = 0;

virtual std::unique_ptr<IDuplexClientWriter<sensor::NetworkConnectionInfoMessage>> PushNetworkConnectionInfoOpenStream(std::function<void(const sensor::NetworkFlowsControlMessage*)> receive_func) = 0;
virtual std::unique_ptr<IDuplexClientWriter<sensor::NetworkConnectionInfoMessage>> PushNetworkConnectionInfoOpenStream(std::function<void(const sensor::MsgToCollector*)> receive_func) = 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, I think it is dangerous to change the existing message, this will break installations which are using Collector and Sensor in different versions.
Is this something we have done before like this? What was the expierence? (e.g. support cases)

Preferably we find a solution which is backward compatible.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As Sensor and Collector are deployed using the same Daemonset, we usually consider that a version mismatch can only be temporary. Introducing a new method here will make Sensor reply with a "method not found" if there is any mismatch.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As Sensor and Collector are deployed using the same Daemonset

Are they? Sensor has its own Deployment.

Copy link
Member

@SimonBaeumer SimonBaeumer Sep 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@erthalion I think @ovalenti refers to the same Secured Cluster deployment, in it they share the same version. I think its better to continue the discussion in the slack thread to not loose context.

};

class NetworkConnectionInfoServiceComm : public INetworkConnectionInfoServiceComm {
Expand All @@ -43,7 +43,7 @@ class NetworkConnectionInfoServiceComm : public INetworkConnectionInfoServiceCom
return stub_.get();
}

std::unique_ptr<IDuplexClientWriter<sensor::NetworkConnectionInfoMessage>> PushNetworkConnectionInfoOpenStream(std::function<void(const sensor::NetworkFlowsControlMessage*)> receive_func) override;
std::unique_ptr<IDuplexClientWriter<sensor::NetworkConnectionInfoMessage>> PushNetworkConnectionInfoOpenStream(std::function<void(const sensor::MsgToCollector*)> receive_func) override;

private:
static constexpr char kHostnameMetadataKey[] = "rox-collector-hostname";
Expand Down
4 changes: 2 additions & 2 deletions collector/lib/NetworkStatusNotifier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ std::vector<IPNet> readNetworks(const std::string& networks, Address::Family fam
return ip_nets;
}

void NetworkStatusNotifier::OnRecvControlMessage(const sensor::NetworkFlowsControlMessage* msg) {
void NetworkStatusNotifier::OnRecvMessage(const sensor::MsgToCollector* msg) {
if (!msg) {
return;
}
Expand Down Expand Up @@ -118,7 +118,7 @@ void NetworkStatusNotifier::Run() {
break;
}

auto client_writer = comm_->PushNetworkConnectionInfoOpenStream([this](const sensor::NetworkFlowsControlMessage* msg) { OnRecvControlMessage(msg); });
auto client_writer = comm_->PushNetworkConnectionInfoOpenStream([this](const sensor::MsgToCollector* msg) { OnRecvMessage(msg); });

if (enable_afterglow_) {
RunSingleAfterglow(client_writer.get());
Expand Down
2 changes: 1 addition & 1 deletion collector/lib/NetworkStatusNotifier.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class NetworkStatusNotifier : protected ProtoAllocator<sensor::NetworkConnection
sensor::NetworkAddress* EndpointToProto(const Endpoint& endpoint);
storage::NetworkProcessUniqueKey* ProcessToProto(const collector::IProcess& process);

void OnRecvControlMessage(const sensor::NetworkFlowsControlMessage* msg);
void OnRecvMessage(const sensor::MsgToCollector* msg);

void Run();
void WaitUntilWriterStarted(IDuplexClientWriter<sensor::NetworkConnectionInfoMessage>* writer, int wait_time);
Expand Down
2 changes: 1 addition & 1 deletion collector/proto/third_party/stackrox
Submodule stackrox updated 2004 files
58 changes: 58 additions & 0 deletions collector/test/CollectorConfigTest.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#include <optional>

#include "CollectorArgs.h"
#include "CollectorConfig.h"
#include "gmock/gmock.h"
Expand Down Expand Up @@ -26,6 +28,10 @@ class MockCollectorConfig : public CollectorConfig {
void MockSetSinspCpuPerBuffer(unsigned int value) {
SetSinspCpuPerBuffer(value);
}

void MockSetEnableExternalIPs(bool value) {
SetEnableExternalIPs(value);
}
};

// Test that unmodified value is returned, when some dependency values are
Expand Down Expand Up @@ -87,4 +93,56 @@ TEST(CollectorConfigTest, TestSinspCpuPerBufferAdjusted) {
EXPECT_EQ(16384, config.GetSinspBufferSize());
}

TEST(CollectorConfigTest, TestSetRuntimeConfig) {
MockCollectorConfig config;

EXPECT_EQ(std::nullopt, config.GetRuntimeConfig());

sensor::CollectorConfig runtime_config;

config.SetRuntimeConfig(runtime_config);

EXPECT_NE(std::nullopt, config.GetRuntimeConfig());
}

TEST(CollectorConfigTest, TestEnableExternalIpsFeatureFlag) {
MockCollectorConfig config;

// without the presence of the runtime configuration
// the enable_external_ips_ flag should be used

config.MockSetEnableExternalIPs(false);

EXPECT_FALSE(config.EnableExternalIPs());

config.MockSetEnableExternalIPs(true);

EXPECT_TRUE(config.EnableExternalIPs());
}

TEST(CollectorConfigTest, TestEnableExternalIpsRuntimeConfig) {
MockCollectorConfig config;

// With the presence of runtime config, the feature
// flag should be ignored

config.MockSetEnableExternalIPs(true);

sensor::CollectorConfig runtime_config;
sensor::NetworkConnectionConfig* network_config = runtime_config.mutable_network_connection_config();

network_config->set_enable_external_ips(false);

config.SetRuntimeConfig(runtime_config);

EXPECT_FALSE(config.EnableExternalIPs());

config.MockSetEnableExternalIPs(false);

network_config->set_enable_external_ips(true);
config.SetRuntimeConfig(runtime_config);

EXPECT_TRUE(config.EnableExternalIPs());
}

} // namespace collector
10 changes: 5 additions & 5 deletions collector/test/NetworkStatusNotifierTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class MockNetworkConnectionInfoServiceComm : public INetworkConnectionInfoServic
MOCK_METHOD(bool, WaitForConnectionReady, (const std::function<bool()>& check_interrupted), (override));
MOCK_METHOD(void, TryCancel, (), (override));
MOCK_METHOD(sensor::NetworkConnectionInfoService::StubInterface*, GetStub, (), (override));
MOCK_METHOD(std::unique_ptr<IDuplexClientWriter<sensor::NetworkConnectionInfoMessage>>, PushNetworkConnectionInfoOpenStream, (std::function<void(const sensor::NetworkFlowsControlMessage*)> receive_func), (override));
MOCK_METHOD(std::unique_ptr<IDuplexClientWriter<sensor::NetworkConnectionInfoMessage>>, PushNetworkConnectionInfoOpenStream, (std::function<void(const sensor::MsgToCollector*)> receive_func), (override));
};

/* gRPC payload objects are not strictly the ones of our internal model.
Expand Down Expand Up @@ -201,7 +201,7 @@ TEST(NetworkStatusNotifier, SimpleStartStop) {
We return an object that will get called when connections and endpoints are reported */
EXPECT_CALL(*comm, PushNetworkConnectionInfoOpenStream)
.Times(1)
.WillOnce([&sem, &running](std::function<void(const sensor::NetworkFlowsControlMessage*)> receive_func) -> std::unique_ptr<IDuplexClientWriter<sensor::NetworkConnectionInfoMessage>> {
.WillOnce([&sem, &running](std::function<void(const sensor::MsgToCollector*)> receive_func) -> std::unique_ptr<IDuplexClientWriter<sensor::NetworkConnectionInfoMessage>> {
auto duplex_writer = MakeUnique<MockDuplexClientWriter>();

// the service is sending Sensor a message
Expand Down Expand Up @@ -250,7 +250,7 @@ TEST(NetworkStatusNotifier, UpdateIPnoAfterglow) {
std::shared_ptr<MockConnScraper> conn_scraper = std::make_shared<MockConnScraper>();
auto conn_tracker = std::make_shared<ConnectionTracker>();
auto comm = std::make_shared<MockNetworkConnectionInfoServiceComm>();
std::function<void(const sensor::NetworkFlowsControlMessage*)> network_flows_callback;
std::function<void(const sensor::MsgToCollector*)> network_flows_callback;
Semaphore sem(0); // to wait for the service to accomplish its job.

// the connection as scrapped (public)
Expand All @@ -274,7 +274,7 @@ TEST(NetworkStatusNotifier, UpdateIPnoAfterglow) {
&running,
&conn2,
&conn3,
&network_flows_callback](std::function<void(const sensor::NetworkFlowsControlMessage*)> receive_func) -> std::unique_ptr<IDuplexClientWriter<sensor::NetworkConnectionInfoMessage>> {
&network_flows_callback](std::function<void(const sensor::MsgToCollector*)> receive_func) -> std::unique_ptr<IDuplexClientWriter<sensor::NetworkConnectionInfoMessage>> {
auto duplex_writer = MakeUnique<MockDuplexClientWriter>();
network_flows_callback = receive_func;

Expand All @@ -301,7 +301,7 @@ TEST(NetworkStatusNotifier, UpdateIPnoAfterglow) {
.WillOnce(ReturnPointee(&running)) // first time, we let the scrapper do its job
.WillOnce([&running, &network_flows_callback](const gpr_timespec& deadline) {
// The connection is known now, let's declare a "known network"
sensor::NetworkFlowsControlMessage msg;
sensor::MsgToCollector msg;
unsigned char content[] = {139, 45, 0, 0, 16}; // address in network order, plus prefix length
std::string network((char*)content, sizeof(content));

Expand Down
2 changes: 1 addition & 1 deletion integration-tests/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
ARG TEST_ROOT="/tests"

FROM golang:1.21 as builder
FROM golang:1.22 as builder

ARG TEST_ROOT

Expand Down
Loading