Skip to content

Commit

Permalink
Update GetAgentStatus and kernel header UDTF to allow kelvin filter…
Browse files Browse the repository at this point in the history
…ing (#2061)

Summary: Update `GetAgentStatus` and kernel header UDTF to allow kelvin
filtering

In order to leverage the `GetAgentStatus`'s `kernel_headers_installed`
column for #2051, it would be convenient for the the UDTF to provide the
ability to filter kelvins out -- they don't have access to kernel
headers since they don't have the host filesystem volume mounted. This
change introduces an `include_kelvin` init argument to the UDTFs with a
default of `true` to preserve the existing behavior.

This change also fixes a bug with UDTF's init arg default values, which
didn't work prior to this change. Please review commit by commit to see
the default arg bug fix followed by the UDTF changes.

Relevant Issues: #2051

Type of change: /kind bug

Test Plan: New logical planner test no longer fails with the following
error
```
$ bazel test -c opt src/carnot/planner:logical_planner_test --test_output=all

[ RUN      ] LogicalPlannerTest.one_pems_one_kelvin
src/carnot/planner/logical_planner_test.cc:64: Failure
Value of: IsOK(::px::StatusAdapter(__status_or_value__64))
  Actual: false (Invalid Argument : DATA_TYPE_UNKNOWN not handled as a default value)
Expected: true
```
  • Loading branch information
ddelnano authored Dec 16, 2024
1 parent a95d661 commit 1b14e8c
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 14 deletions.
18 changes: 18 additions & 0 deletions src/carnot/planner/logical_planner_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,24 @@ TEST_F(LogicalPlannerTest, AppendSelfTest) {
EXPECT_OK(plan->ToProto());
}

constexpr char kAgentStatusQuery[] = R"pxl(
import px
# GetAgentStatus takes an include_kelvin argument. This defaults to True
# to preserve backwards compatibility.
px.display(px.GetAgentStatus())
)pxl";

TEST_F(LogicalPlannerTest, UDTFDefaultArgumentTest) {
auto planner = LogicalPlanner::Create(info_).ConsumeValueOrDie();
auto plan_or_s = planner->Plan(
MakeQueryRequest(testutils::CreateTwoPEMsOneKelvinPlannerState(testutils::kHttpEventsSchema),
kAgentStatusQuery));
EXPECT_OK(plan_or_s);
auto plan = plan_or_s.ConsumeValueOrDie();
EXPECT_OK(plan->ToProto());
}

constexpr char kPlannerQueryError[] = R"pxl(
import px
Expand Down
6 changes: 6 additions & 0 deletions src/carnot/udf/registry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,21 +92,25 @@ void DefaultToScalarValue(const UDTFArg&, planpb::ScalarValue*) {
template <>
void DefaultToScalarValue<types::BOOLEAN>(const UDTFArg& arg, planpb::ScalarValue* out) {
out->set_bool_value(arg.GetDefaultValue<types::BOOLEAN>().val);
out->set_data_type(types::BOOLEAN);
}

template <>
void DefaultToScalarValue<types::INT64>(const UDTFArg& arg, planpb::ScalarValue* out) {
out->set_int64_value(arg.GetDefaultValue<types::INT64>().val);
out->set_data_type(types::INT64);
}

template <>
void DefaultToScalarValue<types::TIME64NS>(const UDTFArg& arg, planpb::ScalarValue* out) {
out->set_time64_ns_value(arg.GetDefaultValue<types::TIME64NS>().val);
out->set_data_type(types::TIME64NS);
}

template <>
void DefaultToScalarValue<types::FLOAT64>(const UDTFArg& arg, planpb::ScalarValue* out) {
out->set_float64_value(arg.GetDefaultValue<types::FLOAT64>().val);
out->set_data_type(types::FLOAT64);
}

template <>
Expand All @@ -116,11 +120,13 @@ void DefaultToScalarValue<types::UINT128>(const UDTFArg& arg, planpb::ScalarValu

out_val->set_high(casted_arg.High64());
out_val->set_high(casted_arg.Low64());
out->set_data_type(types::UINT128);
}

template <>
void DefaultToScalarValue<types::STRING>(const UDTFArg& arg, planpb::ScalarValue* out) {
out->set_string_value(std::string(arg.GetDefaultValue<types::STRING>()));
out->set_data_type(types::STRING);
}
} // namespace

Expand Down
1 change: 1 addition & 0 deletions src/carnot/udf/registry_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ udtfs {
semantic_type: ST_NONE
default_value {
int64_value: 123
data_type: INT64
}
}
args {
Expand Down
49 changes: 35 additions & 14 deletions src/vizier/funcs/md_udtfs/md_udtfs_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -304,9 +304,10 @@ class GetAgentStatus final : public carnot::udf::UDTF<GetAgentStatus> {
kKernelHeadersInstalledDesc));
}

Status Init(FunctionContext*) {
Status Init(FunctionContext*, types::BoolValue include_kelvin) {
px::vizier::services::metadata::AgentInfoRequest req;
resp_ = std::make_unique<px::vizier::services::metadata::AgentInfoResponse>();
include_kelvin_ = include_kelvin.val;

grpc::ClientContext ctx;
add_context_authentication_func_(&ctx);
Expand All @@ -317,6 +318,11 @@ class GetAgentStatus final : public carnot::udf::UDTF<GetAgentStatus> {
return Status::OK();
}

static constexpr auto InitArgs() {
return MakeArray(UDTFArg::Make<types::BOOLEAN>(
"include_kelvin", "Whether to include Kelvin agents in the output", true));
}

bool NextRecord(FunctionContext*, RecordWriter* rw) {
const auto& agent_metadata = resp_->info(idx_);
const auto& agent_info = agent_metadata.agent();
Expand All @@ -329,22 +335,26 @@ class GetAgentStatus final : public carnot::udf::UDTF<GetAgentStatus> {
}
// TODO(zasgar): Figure out abort mechanism;

rw->Append<IndexOf("agent_id")>(absl::MakeUint128(u.ab, u.cd));
rw->Append<IndexOf("asid")>(agent_info.asid());
rw->Append<IndexOf("hostname")>(agent_info.info().host_info().hostname());
rw->Append<IndexOf("ip_address")>(agent_info.info().ip_address());
rw->Append<IndexOf("agent_state")>(StringValue(magic_enum::enum_name(agent_status.state())));
rw->Append<IndexOf("create_time")>(agent_info.create_time_ns());
rw->Append<IndexOf("last_heartbeat_ns")>(agent_status.ns_since_last_heartbeat());
rw->Append<IndexOf("kernel_headers_installed")>(
agent_info.info().host_info().kernel_headers_installed());
auto host_info = agent_info.info().host_info();
auto collects_data = agent_info.info().capabilities().collects_data();
if (collects_data || include_kelvin_) {
rw->Append<IndexOf("agent_id")>(absl::MakeUint128(u.ab, u.cd));
rw->Append<IndexOf("asid")>(agent_info.asid());
rw->Append<IndexOf("hostname")>(host_info.hostname());
rw->Append<IndexOf("ip_address")>(agent_info.info().ip_address());
rw->Append<IndexOf("agent_state")>(StringValue(magic_enum::enum_name(agent_status.state())));
rw->Append<IndexOf("create_time")>(agent_info.create_time_ns());
rw->Append<IndexOf("last_heartbeat_ns")>(agent_status.ns_since_last_heartbeat());
rw->Append<IndexOf("kernel_headers_installed")>(host_info.kernel_headers_installed());
}

++idx_;
return idx_ < resp_->info_size();
}

private:
int idx_ = 0;
bool include_kelvin_ = false;
std::unique_ptr<px::vizier::services::metadata::AgentInfoResponse> resp_;
std::shared_ptr<MDSStub> stub_;
std::function<void(grpc::ClientContext*)> add_context_authentication_func_;
Expand Down Expand Up @@ -425,9 +435,10 @@ class GetLinuxHeadersStatus final : public carnot::udf::UDTF<GetLinuxHeadersStat
kKernelHeadersInstalledDesc));
}

Status Init(FunctionContext*) {
Status Init(FunctionContext*, BoolValue include_kelvin) {
px::vizier::services::metadata::AgentInfoRequest req;
resp_ = std::make_unique<px::vizier::services::metadata::AgentInfoResponse>();
include_kelvin_ = include_kelvin.val;

grpc::ClientContext ctx;
add_context_authentication_func_(&ctx);
Expand All @@ -438,21 +449,31 @@ class GetLinuxHeadersStatus final : public carnot::udf::UDTF<GetLinuxHeadersStat
return Status::OK();
}

static constexpr auto InitArgs() {
return MakeArray(UDTFArg::Make<types::BOOLEAN>(
"include_kelvin", "Whether to include Kelvin agents in the output", true));
}

bool NextRecord(FunctionContext*, RecordWriter* rw) {
const auto& agent_metadata = resp_->info(idx_);
const auto& agent_info = agent_metadata.agent();

const auto asid = agent_info.asid();
const auto kernel_headers_installed = agent_info.info().host_info().kernel_headers_installed();
rw->Append<IndexOf("asid")>(asid);
rw->Append<IndexOf("kernel_headers_installed")>(kernel_headers_installed);
auto collects_data = agent_info.info().capabilities().collects_data();
const auto host_info = agent_info.info().host_info();
const auto kernel_headers_installed = host_info.kernel_headers_installed();
if (collects_data || include_kelvin_) {
rw->Append<IndexOf("asid")>(asid);
rw->Append<IndexOf("kernel_headers_installed")>(kernel_headers_installed);
}

++idx_;
return idx_ < resp_->info_size();
}

private:
int idx_ = 0;
bool include_kelvin_ = false;
std::unique_ptr<px::vizier::services::metadata::AgentInfoResponse> resp_;
std::shared_ptr<MDSStub> stub_;
std::function<void(grpc::ClientContext*)> add_context_authentication_func_;
Expand Down

0 comments on commit 1b14e8c

Please sign in to comment.