diff --git a/src/carnot/planner/logical_planner_test.cc b/src/carnot/planner/logical_planner_test.cc index 7203d3d4456..b15cf201484 100644 --- a/src/carnot/planner/logical_planner_test.cc +++ b/src/carnot/planner/logical_planner_test.cc @@ -272,6 +272,24 @@ TEST_F(LogicalPlannerTest, AppendSelfTest) { EXPECT_OK(plan->ToProto()); } +constexpr char kAgentStatusQuery[] = R"pxl( +import px + +# GetAgentStatus takes an include_kelvin argument. This defaults to True +# to preserve backwards compatibility. +px.display(px.GetAgentStatus()) +)pxl"; + +TEST_F(LogicalPlannerTest, UDTFDefaultArgumentTest) { + auto planner = LogicalPlanner::Create(info_).ConsumeValueOrDie(); + auto plan_or_s = planner->Plan( + MakeQueryRequest(testutils::CreateTwoPEMsOneKelvinPlannerState(testutils::kHttpEventsSchema), + kAgentStatusQuery)); + EXPECT_OK(plan_or_s); + auto plan = plan_or_s.ConsumeValueOrDie(); + EXPECT_OK(plan->ToProto()); +} + constexpr char kPlannerQueryError[] = R"pxl( import px diff --git a/src/carnot/udf/registry.cc b/src/carnot/udf/registry.cc index d3e15036484..8feebba2e88 100644 --- a/src/carnot/udf/registry.cc +++ b/src/carnot/udf/registry.cc @@ -92,21 +92,25 @@ void DefaultToScalarValue(const UDTFArg&, planpb::ScalarValue*) { template <> void DefaultToScalarValue(const UDTFArg& arg, planpb::ScalarValue* out) { out->set_bool_value(arg.GetDefaultValue().val); + out->set_data_type(types::BOOLEAN); } template <> void DefaultToScalarValue(const UDTFArg& arg, planpb::ScalarValue* out) { out->set_int64_value(arg.GetDefaultValue().val); + out->set_data_type(types::INT64); } template <> void DefaultToScalarValue(const UDTFArg& arg, planpb::ScalarValue* out) { out->set_time64_ns_value(arg.GetDefaultValue().val); + out->set_data_type(types::TIME64NS); } template <> void DefaultToScalarValue(const UDTFArg& arg, planpb::ScalarValue* out) { out->set_float64_value(arg.GetDefaultValue().val); + out->set_data_type(types::FLOAT64); } template <> @@ -116,11 +120,13 @@ void DefaultToScalarValue(const UDTFArg& arg, planpb::ScalarValu out_val->set_high(casted_arg.High64()); out_val->set_high(casted_arg.Low64()); + out->set_data_type(types::UINT128); } template <> void DefaultToScalarValue(const UDTFArg& arg, planpb::ScalarValue* out) { out->set_string_value(std::string(arg.GetDefaultValue())); + out->set_data_type(types::STRING); } } // namespace diff --git a/src/carnot/udf/registry_test.cc b/src/carnot/udf/registry_test.cc index adfe0d92f8b..41de2a82df1 100644 --- a/src/carnot/udf/registry_test.cc +++ b/src/carnot/udf/registry_test.cc @@ -421,6 +421,7 @@ udtfs { semantic_type: ST_NONE default_value { int64_value: 123 + data_type: INT64 } } args { diff --git a/src/vizier/funcs/md_udtfs/md_udtfs_impl.h b/src/vizier/funcs/md_udtfs/md_udtfs_impl.h index 7cb40966933..e48dd4ce790 100644 --- a/src/vizier/funcs/md_udtfs/md_udtfs_impl.h +++ b/src/vizier/funcs/md_udtfs/md_udtfs_impl.h @@ -304,9 +304,10 @@ class GetAgentStatus final : public carnot::udf::UDTF { kKernelHeadersInstalledDesc)); } - Status Init(FunctionContext*) { + Status Init(FunctionContext*, types::BoolValue include_kelvin) { px::vizier::services::metadata::AgentInfoRequest req; resp_ = std::make_unique(); + include_kelvin_ = include_kelvin.val; grpc::ClientContext ctx; add_context_authentication_func_(&ctx); @@ -317,6 +318,11 @@ class GetAgentStatus final : public carnot::udf::UDTF { return Status::OK(); } + static constexpr auto InitArgs() { + return MakeArray(UDTFArg::Make( + "include_kelvin", "Whether to include Kelvin agents in the output", true)); + } + bool NextRecord(FunctionContext*, RecordWriter* rw) { const auto& agent_metadata = resp_->info(idx_); const auto& agent_info = agent_metadata.agent(); @@ -329,15 +335,18 @@ class GetAgentStatus final : public carnot::udf::UDTF { } // TODO(zasgar): Figure out abort mechanism; - rw->Append(absl::MakeUint128(u.ab, u.cd)); - rw->Append(agent_info.asid()); - rw->Append(agent_info.info().host_info().hostname()); - rw->Append(agent_info.info().ip_address()); - rw->Append(StringValue(magic_enum::enum_name(agent_status.state()))); - rw->Append(agent_info.create_time_ns()); - rw->Append(agent_status.ns_since_last_heartbeat()); - rw->Append( - agent_info.info().host_info().kernel_headers_installed()); + auto host_info = agent_info.info().host_info(); + auto collects_data = agent_info.info().capabilities().collects_data(); + if (collects_data || include_kelvin_) { + rw->Append(absl::MakeUint128(u.ab, u.cd)); + rw->Append(agent_info.asid()); + rw->Append(host_info.hostname()); + rw->Append(agent_info.info().ip_address()); + rw->Append(StringValue(magic_enum::enum_name(agent_status.state()))); + rw->Append(agent_info.create_time_ns()); + rw->Append(agent_status.ns_since_last_heartbeat()); + rw->Append(host_info.kernel_headers_installed()); + } ++idx_; return idx_ < resp_->info_size(); @@ -345,6 +354,7 @@ class GetAgentStatus final : public carnot::udf::UDTF { private: int idx_ = 0; + bool include_kelvin_ = false; std::unique_ptr resp_; std::shared_ptr stub_; std::function add_context_authentication_func_; @@ -425,9 +435,10 @@ class GetLinuxHeadersStatus final : public carnot::udf::UDTF(); + include_kelvin_ = include_kelvin.val; grpc::ClientContext ctx; add_context_authentication_func_(&ctx); @@ -438,14 +449,23 @@ class GetLinuxHeadersStatus final : public carnot::udf::UDTF( + "include_kelvin", "Whether to include Kelvin agents in the output", true)); + } + bool NextRecord(FunctionContext*, RecordWriter* rw) { const auto& agent_metadata = resp_->info(idx_); const auto& agent_info = agent_metadata.agent(); const auto asid = agent_info.asid(); - const auto kernel_headers_installed = agent_info.info().host_info().kernel_headers_installed(); - rw->Append(asid); - rw->Append(kernel_headers_installed); + auto collects_data = agent_info.info().capabilities().collects_data(); + const auto host_info = agent_info.info().host_info(); + const auto kernel_headers_installed = host_info.kernel_headers_installed(); + if (collects_data || include_kelvin_) { + rw->Append(asid); + rw->Append(kernel_headers_installed); + } ++idx_; return idx_ < resp_->info_size(); @@ -453,6 +473,7 @@ class GetLinuxHeadersStatus final : public carnot::udf::UDTF resp_; std::shared_ptr stub_; std::function add_context_authentication_func_;