From 879e89791d0f37d9132f66f5f8f62f02dfe2177e Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Tue, 28 Nov 2023 16:07:11 +0800 Subject: [PATCH 1/2] Revert "Enable spilling for partial aggregation (7558)" This reverts commit 45117f0f1eff6b41c115684cab8d51822b6f6759. --- velox/core/PlanNode.cpp | 3 +- velox/core/tests/PlanFragmentTest.cpp | 8 +- velox/exec/GroupingSet.cpp | 8 +- velox/exec/tests/AggregationTest.cpp | 126 ------------------ .../aggregates/ApproxPercentileAggregate.cpp | 33 ++--- 5 files changed, 18 insertions(+), 160 deletions(-) diff --git a/velox/core/PlanNode.cpp b/velox/core/PlanNode.cpp index bb3be12f2983..9adbb398bac1 100644 --- a/velox/core/PlanNode.cpp +++ b/velox/core/PlanNode.cpp @@ -237,7 +237,8 @@ bool AggregationNode::canSpill(const QueryConfig& queryConfig) const { } // TODO: add spilling for pre-grouped aggregation later: // https://github.com/facebookincubator/velox/issues/3264 - return preGroupedKeys().empty() && queryConfig.aggregationSpillEnabled(); + return (isFinal() || isSingle()) && preGroupedKeys().empty() && + queryConfig.aggregationSpillEnabled(); } void AggregationNode::addDetails(std::stringstream& stream) const { diff --git a/velox/core/tests/PlanFragmentTest.cpp b/velox/core/tests/PlanFragmentTest.cpp index 4c17e3f44db7..daf88bb8a483 100644 --- a/velox/core/tests/PlanFragmentTest.cpp +++ b/velox/core/tests/PlanFragmentTest.cpp @@ -159,14 +159,14 @@ TEST_F(PlanFragmentTest, aggregationCanSpill) { {AggregationNode::Step::kSingle, true, true, false, false, true}, {AggregationNode::Step::kIntermediate, false, true, false, false, false}, {AggregationNode::Step::kIntermediate, true, false, false, false, false}, - {AggregationNode::Step::kIntermediate, true, true, true, false, true}, + {AggregationNode::Step::kIntermediate, true, true, true, false, false}, {AggregationNode::Step::kIntermediate, true, true, false, true, false}, - {AggregationNode::Step::kIntermediate, true, true, false, false, true}, + {AggregationNode::Step::kIntermediate, true, true, false, false, false}, {AggregationNode::Step::kPartial, false, true, false, false, false}, {AggregationNode::Step::kPartial, true, false, false, false, false}, - {AggregationNode::Step::kPartial, true, true, true, false, true}, + {AggregationNode::Step::kPartial, true, true, true, false, false}, {AggregationNode::Step::kPartial, true, true, false, true, false}, - {AggregationNode::Step::kPartial, true, true, false, false, true}, + {AggregationNode::Step::kPartial, true, true, false, false, false}, {AggregationNode::Step::kFinal, false, true, false, false, false}, {AggregationNode::Step::kFinal, true, false, false, false, false}, {AggregationNode::Step::kFinal, true, true, true, false, true}, diff --git a/velox/exec/GroupingSet.cpp b/velox/exec/GroupingSet.cpp index d279c216bfd6..9ecf91f79a50 100644 --- a/velox/exec/GroupingSet.cpp +++ b/velox/exec/GroupingSet.cpp @@ -826,7 +826,7 @@ const HashLookup& GroupingSet::hashLookup() const { void GroupingSet::ensureInputFits(const RowVectorPtr& input) { // Spilling is considered if this is a final or single aggregation and // spillPath is set. - if (spillConfig_ == nullptr) { + if (isPartial_ || spillConfig_ == nullptr) { return; } @@ -909,7 +909,7 @@ void GroupingSet::ensureOutputFits() { // to reserve memory for the output as we can't reclaim much memory from this // operator itself. The output processing can reclaim memory from the other // operator or query through memory arbitration. - if (spillConfig_ == nullptr || hasSpilled()) { + if (isPartial_ || spillConfig_ == nullptr || hasSpilled()) { return; } @@ -956,10 +956,6 @@ void GroupingSet::spill() { return; } - if (hasSpilled() && spiller_->finalized()) { - return; - } - if (!hasSpilled()) { auto rows = table_->rows(); VELOX_DCHECK(pool_.trackUsage()); diff --git a/velox/exec/tests/AggregationTest.cpp b/velox/exec/tests/AggregationTest.cpp index 441326e610b7..d91c5c6dea77 100644 --- a/velox/exec/tests/AggregationTest.cpp +++ b/velox/exec/tests/AggregationTest.cpp @@ -20,7 +20,6 @@ #include "folly/experimental/EventCount.h" #include "velox/common/base/tests/GTestUtils.h" #include "velox/common/file/FileSystems.h" -#include "velox/common/memory/Memory.h" #include "velox/common/testutil/TestValue.h" #include "velox/dwio/common/tests/utils/BatchMaker.h" #include "velox/exec/Aggregate.h" @@ -398,33 +397,6 @@ class AggregationTest : public OperatorTestBase { VARCHAR()})}; folly::Random::DefaultGenerator rng_; memory::MemoryReclaimer::Stats reclaimerStats_; - - std::shared_ptr newQueryCtx( - int64_t memoryCapacity = memory::kMaxMemory) { - std::unordered_map> configs; - std::shared_ptr pool = memoryManager_->addRootPool( - "", memoryCapacity, MemoryReclaimer::create()); - auto queryCtx = std::make_shared( - executor_.get(), - core::QueryConfig({}), - configs, - cache::AsyncDataCache::getInstance(), - std::move(pool)); - return queryCtx; - } - - void setupMemory() { - memory::MemoryManagerOptions options; - options.arbitratorKind = "SHARED"; - options.checkUsageLeak = true; - memoryAllocator_ = memory::MemoryAllocator::createDefaultInstance(); - options.allocator = memoryAllocator_.get(); - memoryManager_ = std::make_unique(options); - } - - private: - std::shared_ptr memoryAllocator_; - std::unique_ptr memoryManager_; }; template <> @@ -875,104 +847,6 @@ TEST_F(AggregationTest, partialAggregationMemoryLimit) { .customStats.count("flushRowCount")); } -// TODO move to arbitrator test -TEST_F(AggregationTest, partialAggregationSpill) { - VectorFuzzer::Options fuzzerOpts; - fuzzerOpts.vectorSize = 128; - RowTypePtr rowType = ROW( - {{"c0", INTEGER()}, - {"c1", INTEGER()}, - {"c2", INTEGER()}, - {"c3", INTEGER()}, - {"c4", INTEGER()}, - {"c5", INTEGER()}, - {"c6", INTEGER()}, - {"c7", INTEGER()}, - {"c8", INTEGER()}, - {"c9", INTEGER()}, - {"c10", INTEGER()}}); - VectorFuzzer fuzzer(std::move(fuzzerOpts), pool()); - - std::vector vectors; - - const int32_t numVectors = 2000; - for (int i = 0; i < numVectors; i++) { - vectors.push_back(fuzzer.fuzzRow(rowType)); - } - - createDuckDbTable(vectors); - - setupMemory(); - - core::PlanNodeId partialAggNodeId; - core::PlanNodeId finalAggNodeId; - // Set an artificially low limit on the amount of data to accumulate in - // the partial aggregation. - - // Distinct aggregation. - auto spillDirectory1 = exec::test::TempDirectoryPath::create(); - auto task = AssertQueryBuilder(duckDbQueryRunner_) - .queryCtx(newQueryCtx(10LL << 10 << 10)) - .spillDirectory(spillDirectory1->path) - .config(QueryConfig::kSpillEnabled, "true") - .config(QueryConfig::kAggregationSpillEnabled, "true") - .config( - QueryConfig::kAggregationSpillMemoryThreshold, - std::to_string(0)) // always spill on final agg - .plan(PlanBuilder() - .values(vectors) - .partialAggregation({"c0"}, {}) - .capturePlanNodeId(partialAggNodeId) - .finalAggregation() - .capturePlanNodeId(finalAggNodeId) - .planNode()) - .assertResults("SELECT distinct c0 FROM tmp"); - - checkSpillStats(toPlanStats(task->taskStats()).at(partialAggNodeId), true); - checkSpillStats(toPlanStats(task->taskStats()).at(finalAggNodeId), true); - - // Count aggregation. - auto spillDirectory2 = exec::test::TempDirectoryPath::create(); - task = AssertQueryBuilder(duckDbQueryRunner_) - .queryCtx(newQueryCtx(10LL << 10 << 10)) - .spillDirectory(spillDirectory2->path) - .config(QueryConfig::kSpillEnabled, "true") - .config(QueryConfig::kAggregationSpillEnabled, "true") - .config( - QueryConfig::kAggregationSpillMemoryThreshold, - std::to_string(0)) // always spill on final agg - .plan(PlanBuilder() - .values(vectors) - .partialAggregation({"c0"}, {"count(1)"}) - .capturePlanNodeId(partialAggNodeId) - .finalAggregation() - .capturePlanNodeId(finalAggNodeId) - .planNode()) - .assertResults("SELECT c0, count(1) FROM tmp GROUP BY 1"); - - checkSpillStats(toPlanStats(task->taskStats()).at(partialAggNodeId), true); - checkSpillStats(toPlanStats(task->taskStats()).at(finalAggNodeId), true); - - // Global aggregation. - task = AssertQueryBuilder(duckDbQueryRunner_) - .queryCtx(newQueryCtx(10LL << 10 << 10)) - .plan(PlanBuilder() - .values(vectors) - .partialAggregation({}, {"sum(c0)"}) - .capturePlanNodeId(partialAggNodeId) - .finalAggregation() - .capturePlanNodeId(finalAggNodeId) - .planNode()) - .assertResults("SELECT sum(c0) FROM tmp"); - EXPECT_EQ( - 0, - toPlanStats(task->taskStats()) - .at(partialAggNodeId) - .customStats.count("flushRowCount")); - checkSpillStats(toPlanStats(task->taskStats()).at(partialAggNodeId), false); - checkSpillStats(toPlanStats(task->taskStats()).at(finalAggNodeId), false); -} - TEST_F(AggregationTest, partialDistinctWithAbandon) { auto vectors = { // 1st batch will produce 100 distinct groups from 10 rows. diff --git a/velox/functions/prestosql/aggregates/ApproxPercentileAggregate.cpp b/velox/functions/prestosql/aggregates/ApproxPercentileAggregate.cpp index 528786dbc9a5..dc24b2095381 100644 --- a/velox/functions/prestosql/aggregates/ApproxPercentileAggregate.cpp +++ b/velox/functions/prestosql/aggregates/ApproxPercentileAggregate.cpp @@ -639,19 +639,6 @@ class ApproxPercentileAggregate : public exec::Aggregate { DecodedVector decodedDigest_; private: - bool isConstantVector(const VectorPtr& vec) { - if (vec->isConstantEncoding()) { - return true; - } - VELOX_USER_CHECK(vec->size() > 0); - for (vector_size_t i = 1; i < vec->size(); ++i) { - if (!vec->equalValueAt(vec.get(), i, 0)) { - return false; - } - } - return true; - } - template void addIntermediateImpl( std::conditional_t group, @@ -663,8 +650,7 @@ class ApproxPercentileAggregate : public exec::Aggregate { if constexpr (checkIntermediateInputs) { VELOX_USER_CHECK(rowVec); for (int i = kPercentiles; i <= kAccuracy; ++i) { - VELOX_USER_CHECK(isConstantVector( - rowVec->childAt(i))); // spilling flats constant encoding + VELOX_USER_CHECK(rowVec->childAt(i)->isConstantEncoding()); } for (int i = kK; i <= kMaxValue; ++i) { VELOX_USER_CHECK(rowVec->childAt(i)->isFlatEncoding()); @@ -691,9 +677,10 @@ class ApproxPercentileAggregate : public exec::Aggregate { } DecodedVector percentiles(*rowVec->childAt(kPercentiles), *baseRows); - DecodedVector percentileIsArray( - *rowVec->childAt(kPercentilesIsArray), *baseRows); - DecodedVector accuracy(*rowVec->childAt(kAccuracy), *baseRows); + auto percentileIsArray = + rowVec->childAt(kPercentilesIsArray)->asUnchecked>(); + auto accuracy = + rowVec->childAt(kAccuracy)->asUnchecked>(); auto k = rowVec->childAt(kK)->asUnchecked>(); auto n = rowVec->childAt(kN)->asUnchecked>(); auto minValue = rowVec->childAt(kMinValue)->asUnchecked>(); @@ -723,7 +710,7 @@ class ApproxPercentileAggregate : public exec::Aggregate { return; } int i = decoded.index(row); - if (percentileIsArray.isNullAt(i)) { + if (percentileIsArray->isNullAt(i)) { return; } if (!accumulator) { @@ -733,10 +720,10 @@ class ApproxPercentileAggregate : public exec::Aggregate { percentilesBase->elements()->asFlatVector(); if constexpr (checkIntermediateInputs) { VELOX_USER_CHECK(percentileBaseElements); - VELOX_USER_CHECK(!percentiles.isNullAt(indexInBaseVector)); + VELOX_USER_CHECK(!percentilesBase->isNullAt(indexInBaseVector)); } - bool isArray = percentileIsArray.valueAt(i); + bool isArray = percentileIsArray->valueAt(i); const double* data; vector_size_t len; std::vector isNull; @@ -744,8 +731,8 @@ class ApproxPercentileAggregate : public exec::Aggregate { percentilesBase, indexInBaseVector, data, len, isNull); checkSetPercentile(isArray, data, len, isNull); - if (!accuracy.isNullAt(i)) { - checkSetAccuracy(accuracy.valueAt(i)); + if (!accuracy->isNullAt(i)) { + checkSetAccuracy(accuracy->valueAt(i)); } } if constexpr (kSingleGroup) { From 44d2214d285a07d5c5b674a5ebc834d6741954e9 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Mon, 13 Nov 2023 16:36:43 +0800 Subject: [PATCH 2/2] fixup fixup fixup fixup fixup fixup fixup fixup fixup fixup fixup fixup fixup fixup Revert "fixup" This reverts commit ecf530e5e2c889c24e8f52ad3c277577fcea4174. Revert "fixup" This reverts commit 9bd48defff73a0fc3b987f6a72cf7d0ae3995590. fixup fixup fixup fixup fixup fixup fixup fixup Revert "fixup" This reverts commit f9efc1a2e3e7f76cff8000a7ad7530aaf47eb361. fixup fixup fixup fixup test This reverts commit 73bf319b76d74a794d2fcffa3b992f581d69f6a1. --- velox/core/PlanNode.cpp | 12 ++++- velox/core/PlanNode.h | 8 ++++ velox/core/QueryConfig.h | 13 +++++- velox/exec/GroupingSet.cpp | 20 +++++++-- velox/exec/tests/SharedArbitratorTest.cpp | 44 +++++++++++++++++++ .../aggregates/ApproxPercentileAggregate.cpp | 33 +++++++++----- 6 files changed, 113 insertions(+), 17 deletions(-) diff --git a/velox/core/PlanNode.cpp b/velox/core/PlanNode.cpp index 9adbb398bac1..87f91d8c7182 100644 --- a/velox/core/PlanNode.cpp +++ b/velox/core/PlanNode.cpp @@ -237,8 +237,16 @@ bool AggregationNode::canSpill(const QueryConfig& queryConfig) const { } // TODO: add spilling for pre-grouped aggregation later: // https://github.com/facebookincubator/velox/issues/3264 - return (isFinal() || isSingle()) && preGroupedKeys().empty() && - queryConfig.aggregationSpillEnabled(); + if ((isFinal() || isSingle()) && queryConfig.aggregationSpillEnabled()) { + return preGroupedKeys().empty(); + } + + if ((isIntermediate() || isPartial()) && + queryConfig.partialAggregationSpillEnabled()) { + return preGroupedKeys().empty(); + } + + return false; } void AggregationNode::addDetails(std::stringstream& stream) const { diff --git a/velox/core/PlanNode.h b/velox/core/PlanNode.h index a041cdc9aee6..99073e4380e9 100644 --- a/velox/core/PlanNode.h +++ b/velox/core/PlanNode.h @@ -607,6 +607,14 @@ class AggregationNode : public PlanNode { return step_ == Step::kSingle; } + bool isIntermediate() const { + return step_ == Step::kIntermediate; + } + + bool isPartial() const { + return step_ == Step::kPartial; + } + folly::dynamic serialize() const override; static PlanNodePtr create(const folly::dynamic& obj, void* context); diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index 340fdd5412a3..2fc6a903d06e 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -198,6 +198,11 @@ class QueryConfig { static constexpr const char* kAggregationSpillEnabled = "aggregation_spill_enabled"; + /// Partial aggregation spilling flag, only applies if "spill_enabled" flag is + /// set. + static constexpr const char* kPartialAggregationSpillEnabled = + "partial_aggregation_spill_enabled"; + /// Join spilling flag, only applies if "spill_enabled" flag is set. static constexpr const char* kJoinSpillEnabled = "join_spill_enabled"; @@ -493,11 +498,17 @@ class QueryConfig { } /// Returns 'is aggregation spilling enabled' flag. Must also check the - /// spillEnabled()!g + /// spillEnabled()! bool aggregationSpillEnabled() const { return get(kAggregationSpillEnabled, true); } + /// Returns 'is partial aggregation spilling enabled' flag. Must also check + /// the spillEnabled()! + bool partialAggregationSpillEnabled() const { + return get(kPartialAggregationSpillEnabled, false); + } + /// Returns 'is join spilling enabled' flag. Must also check the /// spillEnabled()! bool joinSpillEnabled() const { diff --git a/velox/exec/GroupingSet.cpp b/velox/exec/GroupingSet.cpp index 9ecf91f79a50..636e8a9170a3 100644 --- a/velox/exec/GroupingSet.cpp +++ b/velox/exec/GroupingSet.cpp @@ -725,6 +725,7 @@ bool GroupingSet::getOutput( } if (hasSpilled()) { + spill(); return getOutputWithSpill(maxOutputRows, maxOutputBytes, result); } VELOX_CHECK(!isDistinct()); @@ -826,7 +827,7 @@ const HashLookup& GroupingSet::hashLookup() const { void GroupingSet::ensureInputFits(const RowVectorPtr& input) { // Spilling is considered if this is a final or single aggregation and // spillPath is set. - if (isPartial_ || spillConfig_ == nullptr) { + if (spillConfig_ == nullptr) { return; } @@ -909,7 +910,7 @@ void GroupingSet::ensureOutputFits() { // to reserve memory for the output as we can't reclaim much memory from this // operator itself. The output processing can reclaim memory from the other // operator or query through memory arbitration. - if (isPartial_ || spillConfig_ == nullptr || hasSpilled()) { + if (spillConfig_ == nullptr || hasSpilled()) { return; } @@ -929,6 +930,9 @@ void GroupingSet::ensureOutputFits() { return; } } + if (hasSpilled()) { + return; + } spill(RowContainerIterator{}); } @@ -955,7 +959,6 @@ void GroupingSet::spill() { if (table_ == nullptr || table_->numDistinct() == 0) { return; } - if (!hasSpilled()) { auto rows = table_->rows(); VELOX_DCHECK(pool_.trackUsage()); @@ -1045,7 +1048,16 @@ bool GroupingSet::getOutputWithSpill( if (merge_ == nullptr) { return false; } - return mergeNext(maxOutputRows, maxOutputBytes, result); + bool hasData = mergeNext(maxOutputRows, maxOutputBytes, result); + if (!hasData) { + // If spill has been finalized, reset merge stream and spiller. This would + // help partial aggregation replay the spilling procedure once needed again. + merge_ = nullptr; + mergeRows_ = nullptr; + mergeArgs_.clear(); + spiller_ = nullptr; + } + return hasData; } bool GroupingSet::mergeNext( diff --git a/velox/exec/tests/SharedArbitratorTest.cpp b/velox/exec/tests/SharedArbitratorTest.cpp index 6577e7d0e70b..3d1e9350c395 100644 --- a/velox/exec/tests/SharedArbitratorTest.cpp +++ b/velox/exec/tests/SharedArbitratorTest.cpp @@ -1053,6 +1053,50 @@ TEST_F(SharedArbitrationTest, reclaimFromDistinctAggregation) { waitForAllTasksToBeDeleted(); } +TEST_F(SharedArbitrationTest, reclaimFromPartialAggregation) { + const uint64_t maxQueryCapacity = 20L << 20; + std::vector vectors = newVectors(1024, maxQueryCapacity * 2); + createDuckDbTable(vectors); + const auto spillDirectory = exec::test::TempDirectoryPath::create(); + core::PlanNodeId partialAggNodeId; + core::PlanNodeId finalAggNodeId; + std::shared_ptr queryCtx = newQueryCtx(maxQueryCapacity); + auto task = + AssertQueryBuilder(duckDbQueryRunner_) + .spillDirectory(spillDirectory->path) + .config(core::QueryConfig::kSpillEnabled, "true") + .config(core::QueryConfig::kPartialAggregationSpillEnabled, "true") + .config(core::QueryConfig::kAggregationSpillEnabled, "true") + .config( + core::QueryConfig::kMaxPartialAggregationMemory, + std::to_string(1LL << 30)) // disable flush + .config( + core::QueryConfig::kMaxExtendedPartialAggregationMemory, + std::to_string(1LL << 30)) // disable flush + .config( + core::QueryConfig::kAbandonPartialAggregationMinPct, + "200") // avoid abandoning + .config( + core::QueryConfig::kAbandonPartialAggregationMinRows, + std::to_string(1LL << 30)) // avoid abandoning + .queryCtx(queryCtx) + .plan(PlanBuilder() + .values(vectors) + .partialAggregation({"c0"}, {"count(1)"}) + .capturePlanNodeId(partialAggNodeId) + .finalAggregation() + .capturePlanNodeId(finalAggNodeId) + .planNode()) + .assertResults("SELECT c0, count(1) FROM tmp GROUP BY c0"); + auto taskStats = exec::toPlanStats(task->taskStats()); + auto& partialStats = taskStats.at(partialAggNodeId); + auto& finalStats = taskStats.at(finalAggNodeId); + ASSERT_GT(partialStats.spilledBytes, 0); + ASSERT_GT(finalStats.spilledBytes, 0); + task.reset(); + waitForAllTasksToBeDeleted(); +} + DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimFromAggregationOnNoMoreInput) { const int numVectors = 32; std::vector vectors; diff --git a/velox/functions/prestosql/aggregates/ApproxPercentileAggregate.cpp b/velox/functions/prestosql/aggregates/ApproxPercentileAggregate.cpp index dc24b2095381..528786dbc9a5 100644 --- a/velox/functions/prestosql/aggregates/ApproxPercentileAggregate.cpp +++ b/velox/functions/prestosql/aggregates/ApproxPercentileAggregate.cpp @@ -639,6 +639,19 @@ class ApproxPercentileAggregate : public exec::Aggregate { DecodedVector decodedDigest_; private: + bool isConstantVector(const VectorPtr& vec) { + if (vec->isConstantEncoding()) { + return true; + } + VELOX_USER_CHECK(vec->size() > 0); + for (vector_size_t i = 1; i < vec->size(); ++i) { + if (!vec->equalValueAt(vec.get(), i, 0)) { + return false; + } + } + return true; + } + template void addIntermediateImpl( std::conditional_t group, @@ -650,7 +663,8 @@ class ApproxPercentileAggregate : public exec::Aggregate { if constexpr (checkIntermediateInputs) { VELOX_USER_CHECK(rowVec); for (int i = kPercentiles; i <= kAccuracy; ++i) { - VELOX_USER_CHECK(rowVec->childAt(i)->isConstantEncoding()); + VELOX_USER_CHECK(isConstantVector( + rowVec->childAt(i))); // spilling flats constant encoding } for (int i = kK; i <= kMaxValue; ++i) { VELOX_USER_CHECK(rowVec->childAt(i)->isFlatEncoding()); @@ -677,10 +691,9 @@ class ApproxPercentileAggregate : public exec::Aggregate { } DecodedVector percentiles(*rowVec->childAt(kPercentiles), *baseRows); - auto percentileIsArray = - rowVec->childAt(kPercentilesIsArray)->asUnchecked>(); - auto accuracy = - rowVec->childAt(kAccuracy)->asUnchecked>(); + DecodedVector percentileIsArray( + *rowVec->childAt(kPercentilesIsArray), *baseRows); + DecodedVector accuracy(*rowVec->childAt(kAccuracy), *baseRows); auto k = rowVec->childAt(kK)->asUnchecked>(); auto n = rowVec->childAt(kN)->asUnchecked>(); auto minValue = rowVec->childAt(kMinValue)->asUnchecked>(); @@ -710,7 +723,7 @@ class ApproxPercentileAggregate : public exec::Aggregate { return; } int i = decoded.index(row); - if (percentileIsArray->isNullAt(i)) { + if (percentileIsArray.isNullAt(i)) { return; } if (!accumulator) { @@ -720,10 +733,10 @@ class ApproxPercentileAggregate : public exec::Aggregate { percentilesBase->elements()->asFlatVector(); if constexpr (checkIntermediateInputs) { VELOX_USER_CHECK(percentileBaseElements); - VELOX_USER_CHECK(!percentilesBase->isNullAt(indexInBaseVector)); + VELOX_USER_CHECK(!percentiles.isNullAt(indexInBaseVector)); } - bool isArray = percentileIsArray->valueAt(i); + bool isArray = percentileIsArray.valueAt(i); const double* data; vector_size_t len; std::vector isNull; @@ -731,8 +744,8 @@ class ApproxPercentileAggregate : public exec::Aggregate { percentilesBase, indexInBaseVector, data, len, isNull); checkSetPercentile(isArray, data, len, isNull); - if (!accuracy->isNullAt(i)) { - checkSetAccuracy(accuracy->valueAt(i)); + if (!accuracy.isNullAt(i)) { + checkSetAccuracy(accuracy.valueAt(i)); } } if constexpr (kSingleGroup) {