From dc2c66b3e28f51b5f1d978c08eeacb36ca08b232 Mon Sep 17 00:00:00 2001 From: yan ma Date: Sun, 21 Apr 2024 18:11:46 +0800 Subject: [PATCH] max-by --- velox/exec/Aggregate.h | 2 + velox/exec/Driver.cpp | 3 + velox/exec/FilterProject.cpp | 19 +- velox/exec/GroupingSet.cpp | 11 + velox/exec/HashAggregation.cpp | 2 + velox/exec/Operator.cpp | 11 +- velox/exec/tests/CMakeLists.txt | 55 +---- velox/exec/tests/FilterProjectTest.cpp | 216 ------------------ velox/expression/Expr.cpp | 7 +- .../lib/aggregates/MinMaxByAggregatesBase.h | 182 +++++++++++++-- velox/functions/sparksql/CMakeLists.txt | 1 - .../tests/MinMaxByAggregationTest.cpp | 14 +- 12 files changed, 223 insertions(+), 300 deletions(-) diff --git a/velox/exec/Aggregate.h b/velox/exec/Aggregate.h index d6bc12aefcde..30980daad3a2 100644 --- a/velox/exec/Aggregate.h +++ b/velox/exec/Aggregate.h @@ -15,6 +15,7 @@ */ #pragma once +#include #include #include "velox/common/memory/HashStringAllocator.h" @@ -384,6 +385,7 @@ class Aggregate { if (mask & nullMask_) { group[nullByte_] = mask & ~nullMask_; --numNulls_; + std::cout << "null in group is cleared." << std::endl; return true; } } diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp index f6e466cb5a5a..8368c4c1a2da 100644 --- a/velox/exec/Driver.cpp +++ b/velox/exec/Driver.cpp @@ -14,6 +14,7 @@ * limitations under the License. */ +#include #include "Driver.h" #include #include @@ -596,6 +597,7 @@ StopReason Driver::runInternal( curOperatorId_, kOpMethodGetOutput); if (intermediateResult) { + std::cout << "driver" <toString(0) << std::endl; VELOX_CHECK( intermediateResult->size() > 0, "Operator::getOutput() must return nullptr or " @@ -703,6 +705,7 @@ StopReason Driver::runInternal( curOperatorId_, kOpMethodGetOutput); if (result) { + std::cout << "final result " << result->toString(0) << std::endl; VELOX_CHECK( result->size() > 0, "Operator::getOutput() must return nullptr or " diff --git a/velox/exec/FilterProject.cpp b/velox/exec/FilterProject.cpp index 055846b4ad18..fd091c8e3e9e 100644 --- a/velox/exec/FilterProject.cpp +++ b/velox/exec/FilterProject.cpp @@ -14,6 +14,7 @@ * limitations under the License. */ #include "velox/exec/FilterProject.h" +#include #include "velox/core/Expressions.h" #include "velox/expression/Expr.h" #include "velox/expression/FieldReference.h" @@ -122,10 +123,16 @@ bool FilterProject::isFinished() { } RowVectorPtr FilterProject::getOutput() { + // for (vector_size_t i = 0; i < input_->size(); i++) { + // std::cout << "FilterProject input:" << input_->toString(i) << std::endl; + // } + if (allInputProcessed()) { return nullptr; } - + for (vector_size_t i = 0; i < input_->size(); i++) { + std::cout << "FilterProject input:" << input_->toString(i) << std::endl; + } vector_size_t size = input_->size(); LocalSelectivityVector localRows(*operatorCtx_->execCtx(), size); auto* rows = localRows.get(); @@ -140,10 +147,14 @@ RowVectorPtr FilterProject::getOutput() { } if (!hasFilter_) { + numProcessedInputRows_ = size; VELOX_CHECK(!isIdentityProjection_); auto results = project(*rows, evalCtx); - + for (vector_size_t i = 0; i < results.size(); i++) { + std::cout << "FilterProject fillOutput:" << results.at(i)->toString() + << std::endl; + } return fillOutput(size, nullptr, results); } @@ -165,6 +176,10 @@ RowVectorPtr FilterProject::getOutput() { } results = project(*rows, evalCtx); } + for (vector_size_t i = 0; i < results.size(); i++) { + std::cout << "FilterProject input:" << results.at(i)->toString() + << std::endl; + } return fillOutput( numOut, diff --git a/velox/exec/GroupingSet.cpp b/velox/exec/GroupingSet.cpp index 9ffc0fb529bd..90c493a66c2d 100644 --- a/velox/exec/GroupingSet.cpp +++ b/velox/exec/GroupingSet.cpp @@ -520,6 +520,7 @@ void GroupingSet::initializeGlobalAggregation() { void GroupingSet::addGlobalAggregationInput( const RowVectorPtr& input, bool mayPushdown) { + std::cout << "GlobalAggregationInput " << input->toString(0) << std::endl; initializeGlobalAggregation(); auto numRows = input->size(); @@ -584,8 +585,13 @@ bool GroupingSet::getGlobalAggregationOutput( auto& resultVector = result->childAt(aggregates_[i].output); if (isPartial_) { function->extractAccumulators(groups, 1, &resultVector); + std::cout << "extractAccumulator result" << resultVector->toString(0) + << std::endl; + } else { function->extractValues(groups, 1, &resultVector); + std::cout << "extractValue result" << resultVector->toString(0) + << std::endl; } } @@ -763,8 +769,13 @@ void GroupingSet::extractGroups( if (isPartial_) { function->extractAccumulators( groups.data(), groups.size(), &aggregateVector); + std::cout << "extractAccumulator result" << aggregateVector->toString() + << std::endl; + } else { function->extractValues(groups.data(), groups.size(), &aggregateVector); + std::cout << "extractValue result" << aggregateVector->toString() + << std::endl; } } diff --git a/velox/exec/HashAggregation.cpp b/velox/exec/HashAggregation.cpp index 56489e705cb6..0b4d9522cf68 100644 --- a/velox/exec/HashAggregation.cpp +++ b/velox/exec/HashAggregation.cpp @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#include #include "velox/exec/HashAggregation.h" #include #include "velox/exec/Task.h" @@ -317,6 +318,7 @@ RowVectorPtr HashAggregation::getOutput() { return nullptr; } numOutputRows_ += output_->size(); + std::cout << "hashAgg " << output_->toString(0) << std::endl; return output_; } diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp index 45c83098b561..6fc6dc5e732c 100644 --- a/velox/exec/Operator.cpp +++ b/velox/exec/Operator.cpp @@ -14,6 +14,7 @@ * limitations under the License. */ #include "velox/exec/Operator.h" +#include #include "velox/common/base/Counters.h" #include "velox/common/base/StatsReporter.h" #include "velox/common/base/SuccinctPrinter.h" @@ -207,6 +208,9 @@ RowVectorPtr Operator::fillOutput( if (size == input_->size() && (!mapping || isSequence(mapping->as(), 0, size))) { if (isIdentityProjection_) { + for (vector_size_t i = 0; i < input_->size(); i++) { + std::cout << "Identity projection:" << input_->toString(i) << std::endl; + } return std::move(input_); } wrapResults = false; @@ -226,12 +230,17 @@ RowVectorPtr Operator::fillOutput( size, wrapResults ? mapping : nullptr); - return std::make_shared( + auto res = std::make_shared( operatorCtx_->pool(), outputType_, nullptr, size, std::move(projectedChildren)); + for (vector_size_t i = 0; i < res->size(); i++) { + std::cout << "fill Output end: " << res->toString(i) << std::endl; + } + + return res; } RowVectorPtr Operator::fillOutput( diff --git a/velox/exec/tests/CMakeLists.txt b/velox/exec/tests/CMakeLists.txt index ddfb25743d23..fb469f7e7955 100644 --- a/velox/exec/tests/CMakeLists.txt +++ b/velox/exec/tests/CMakeLists.txt @@ -28,62 +28,9 @@ target_link_libraries(aggregate_companion_functions_test velox_exec add_executable( velox_exec_test - AddressableNonNullValueListTest.cpp - AggregationTest.cpp - AggregateFunctionRegistryTest.cpp - ArrowStreamTest.cpp - AssignUniqueIdTest.cpp - AsyncConnectorTest.cpp - ContainerRowSerdeTest.cpp - CustomJoinTest.cpp - EnforceSingleRowTest.cpp - ExchangeClientTest.cpp - ExpandTest.cpp FilterProjectTest.cpp - FunctionResolutionTest.cpp - HashBitRangeTest.cpp - HashJoinBridgeTest.cpp - HashJoinTest.cpp - HashPartitionFunctionTest.cpp - HashTableTest.cpp - LimitTest.cpp - LocalPartitionTest.cpp Main.cpp - MarkDistinctTest.cpp - MemoryReclaimerTest.cpp - MergeJoinTest.cpp - MergeTest.cpp - MultiFragmentTest.cpp - NestedLoopJoinTest.cpp - OrderByTest.cpp - OutputBufferManagerTest.cpp - PartitionedOutputTest.cpp - PlanNodeSerdeTest.cpp - PlanNodeToStringTest.cpp - PrefixSortTest.cpp - PrintPlanWithStatsTest.cpp - ProbeOperatorStateTest.cpp - RoundRobinPartitionFunctionTest.cpp - RowContainerTest.cpp - RowNumberTest.cpp - SortBufferTest.cpp - SpillerTest.cpp - SpillTest.cpp - SplitToStringTest.cpp - SqlTest.cpp - StreamingAggregationTest.cpp - TableScanTest.cpp - TableWriteTest.cpp - TaskListenerTest.cpp - ThreadDebugInfoTest.cpp - TopNRowNumberTest.cpp - TopNTest.cpp - UnnestTest.cpp - UnorderedStreamReaderTest.cpp - ValuesTest.cpp - VectorHasherTest.cpp - WindowFunctionRegistryTest.cpp - WindowTest.cpp) +) add_executable( velox_exec_infra_test diff --git a/velox/exec/tests/FilterProjectTest.cpp b/velox/exec/tests/FilterProjectTest.cpp index 57e8d8280be1..49e450176ebf 100644 --- a/velox/exec/tests/FilterProjectTest.cpp +++ b/velox/exec/tests/FilterProjectTest.cpp @@ -58,69 +58,6 @@ class FilterProjectTest : public OperatorTestBase { {BIGINT(), INTEGER(), SMALLINT(), DOUBLE()})}; }; -TEST_F(FilterProjectTest, filter) { - std::vector vectors; - for (int32_t i = 0; i < 10; ++i) { - auto vector = std::dynamic_pointer_cast( - BatchMaker::createBatch(rowType_, 100, *pool_)); - vectors.push_back(vector); - } - createDuckDbTable(vectors); - - assertFilter(vectors); -} - -TEST_F(FilterProjectTest, filterOverDictionary) { - std::vector vectors; - for (int32_t i = 0; i < 10; ++i) { - auto vector = std::dynamic_pointer_cast( - BatchMaker::createBatch(rowType_, 100, *pool_)); - - auto indices = - AlignedBuffer::allocate(2 * vector->size(), pool_.get()); - auto indicesPtr = indices->asMutable(); - for (int32_t j = 0; j < vector->size() / 2; j++) { - indicesPtr[2 * j] = j; - indicesPtr[2 * j + 1] = j; - } - std::vector newChildren = vector->children(); - newChildren[1] = BaseVector::wrapInDictionary( - BufferPtr(nullptr), indices, vector->size(), vector->childAt(1)); - vectors.push_back(std::make_shared( - pool_.get(), - rowType_, - BufferPtr(nullptr), - vector->size(), - newChildren, - 0 /*nullCount*/)); - } - createDuckDbTable(vectors); - - assertFilter(vectors); -} - -TEST_F(FilterProjectTest, filterOverConstant) { - std::vector vectors; - for (int32_t i = 0; i < 10; ++i) { - auto vector = std::dynamic_pointer_cast( - BatchMaker::createBatch(rowType_, 100, *pool_)); - - std::vector newChildren = vector->children(); - newChildren[1] = - BaseVector::wrapInConstant(vector->size(), 7, vector->childAt(1)); - vectors.push_back(std::make_shared( - pool_.get(), - rowType_, - BufferPtr(nullptr), - vector->size(), - newChildren, - 0 /*nullCount*/)); - } - createDuckDbTable(vectors); - - assertFilter(vectors); -} - TEST_F(FilterProjectTest, project) { std::vector vectors; for (int32_t i = 0; i < 10; ++i) { @@ -210,156 +147,3 @@ TEST_F(FilterProjectTest, projectOverLazy) { .planNode(); assertQuery(plan, "SELECT c0 > 0 AND c1 > 0, c1 + 5.2 FROM tmp"); } - -TEST_F(FilterProjectTest, filterProject) { - std::vector vectors; - for (int32_t i = 0; i < 10; ++i) { - auto vector = std::dynamic_pointer_cast( - BatchMaker::createBatch(rowType_, 100, *pool_)); - vectors.push_back(vector); - } - createDuckDbTable(vectors); - - auto plan = PlanBuilder() - .values(vectors) - .filter("c1 % 10 > 0") - .project({"c0", "c1", "c0 + c1"}) - .planNode(); - - assertQuery(plan, "SELECT c0, c1, c0 + c1 FROM tmp WHERE c1 % 10 > 0"); -} - -TEST_F(FilterProjectTest, dereference) { - std::vector vectors; - for (int32_t i = 0; i < 10; ++i) { - auto vector = std::dynamic_pointer_cast( - BatchMaker::createBatch(rowType_, 100, *pool_)); - vectors.push_back(vector); - } - createDuckDbTable(vectors); - - auto plan = PlanBuilder() - .values(vectors) - .project({"row_constructor(c1, c2) AS c1_c2"}) - .project({"c1_c2.c1", "c1_c2.c2"}) - .planNode(); - assertQuery(plan, "SELECT c1, c2 FROM tmp"); - - plan = PlanBuilder() - .values(vectors) - .project({"row_constructor(c1, c2) AS c1_c2"}) - .filter("c1_c2.c1 % 10 = 5") - .project({"c1_c2.c1", "c1_c2.c2"}) - .planNode(); - assertQuery(plan, "SELECT c1, c2 FROM tmp WHERE c1 % 10 = 5"); -} - -TEST_F(FilterProjectTest, allFailedOrPassed) { - std::vector vectors; - for (int32_t i = 0; i < 10; ++i) { - // We alternate between a batch where all pass and a batch where - // no row passes. c0 is flat vector. c1 is constant vector. - const int32_t value = i % 2 == 0 ? 0 : 1; - - vectors.push_back(makeRowVector({ - makeFlatVector(100, [&](auto row) { return value; }), - makeConstant(value, 100), - })); - } - createDuckDbTable(vectors); - - // filter over flat vector - assertFilter(std::move(vectors), "c0 = 0"); - - // filter over constant vector - assertFilter(std::move(vectors), "c1 = 0"); -} - -// Tests fusing of consecutive filters and projects. -TEST_F(FilterProjectTest, filterProjectFused) { - std::vector vectors; - for (int32_t i = 0; i < 10; ++i) { - auto vector = std::dynamic_pointer_cast( - BatchMaker::createBatch(rowType_, 100, *pool_)); - vectors.push_back(vector); - } - createDuckDbTable(vectors); - - auto plan = PlanBuilder() - .values(vectors) - .filter("c0 % 10 < 9") - .project({"c0", "c1", "c0 % 100 + c1 % 50 AS e1"}) - .filter("c0 % 10 < 8") - .project({"c0", "c1", "e1", "c0 % 100 AS e2"}) - .filter("c0 % 10 < 5") - .project({"c0", "c1", "e1", "e2"}) - .planNode(); - - assertQuery( - plan, - "SELECT c0, c1, c0 %100 + c1 % 50, c0 % 100 FROM tmp WHERE c0 % 10 < 5"); -} - -TEST_F(FilterProjectTest, projectAndIdentityOverLazy) { - // Verify that a lazy column which is a part of both an identity projection - // and a regular projection is loaded correctly. This is done by running a - // projection that only operates over a subset of the rows of the lazy vector. - vector_size_t size = 20; - auto valueAt = [](auto row) -> int32_t { return row; }; - auto lazyVectors = makeRowVector({ - makeFlatVector(size, valueAt), - vectorMaker_.lazyFlatVector(size, valueAt), - }); - - auto vectors = makeRowVector({ - makeFlatVector(size, valueAt), - makeFlatVector(size, valueAt), - }); - - createDuckDbTable({vectors}); - - auto plan = PlanBuilder() - .values({lazyVectors}) - .project({"c0 < 10 AND c1 < 10", "c1"}) - .planNode(); - assertQuery(plan, "SELECT c0 < 10 AND c1 < 10, c1 FROM tmp"); -} - -// Verify the optimization of avoiding copy in null propagation does not break -// the case when the field is shared between multiple parents. -TEST_F(FilterProjectTest, nestedFieldReferenceSharedChild) { - auto shared = makeFlatVector(10, folly::identity); - auto vector = makeRowVector({ - makeRowVector({ - makeRowVector({shared}, nullEvery(2)), - makeRowVector({shared}, nullEvery(3)), - }), - }); - auto plan = - PlanBuilder() - .values({vector}) - .project({"coalesce((c0).c0.c0, 0) + coalesce((c0).c1.c0, 0)"}) - .planNode(); - auto expected = makeFlatVector(10); - for (int i = 0; i < 10; ++i) { - expected->set(i, (i % 2 == 0 ? 0 : i) + (i % 3 == 0 ? 0 : i)); - } - AssertQueryBuilder(plan).assertResults(makeRowVector({expected})); -} - -TEST_F(FilterProjectTest, numSilentThrow) { - auto row = makeRowVector( - {makeFlatVector(100, [&](auto row) { return row; })}); - - core::PlanNodeId filterId; - // Change the plan when /0 error is fixed not to throw. - auto plan = PlanBuilder() - .values({row}) - .filter("try (c0 / 0) = 1") - .capturePlanNodeId(filterId) - .planNode(); - - auto task = AssertQueryBuilder(plan).assertEmptyResults(); - auto planStats = toPlanStats(task->taskStats()); - ASSERT_EQ(100, planStats.at(filterId).customStats.at("numSilentThrow").sum); -} diff --git a/velox/expression/Expr.cpp b/velox/expression/Expr.cpp index 1d458c7f55d3..e67777796555 100644 --- a/velox/expression/Expr.cpp +++ b/velox/expression/Expr.cpp @@ -689,9 +689,8 @@ class ExprExceptionContext { /// attempted again on subsequent calls. std::string onTopLevelException(VeloxException::Type exceptionType, void* arg) { auto* context = static_cast(arg); - - const char* basePath = - FLAGS_velox_save_input_on_expression_any_failure_path.c_str(); + std::string path = "/tmp/logs"; + const char* basePath = path.c_str(); if (strlen(basePath) == 0 && exceptionType == VeloxException::Type::kSystem) { basePath = FLAGS_velox_save_input_on_expression_system_failure_path.c_str(); } @@ -1906,7 +1905,7 @@ void ExprSet::eval( context.ensureFieldLoaded(field->index(context), rows); } - if (FLAGS_velox_experimental_save_input_on_fatal_signal) { + if (true) { auto other = process::GetThreadDebugInfo(); process::ThreadDebugInfo debugInfo; if (other) { diff --git a/velox/functions/lib/aggregates/MinMaxByAggregatesBase.h b/velox/functions/lib/aggregates/MinMaxByAggregatesBase.h index 82cf11507e6a..1024380648a6 100644 --- a/velox/functions/lib/aggregates/MinMaxByAggregatesBase.h +++ b/velox/functions/lib/aggregates/MinMaxByAggregatesBase.h @@ -14,6 +14,7 @@ * limitations under the License. */ #pragma once +#include #include "velox/exec/Aggregate.h" #include "velox/exec/ContainerRowSerde.h" @@ -179,9 +180,11 @@ class MinMaxByAggregateBase : public exec::Aggregate { void extractValues(char** groups, int32_t numGroups, VectorPtr* result) override { + std::cout << "extract values" << std::endl; VELOX_CHECK(result); (*result)->resize(numGroups); - uint64_t* rawNulls = getRawNulls(result->get()); + uint64_t* rawNulls = + (*result)->mutableNulls(numGroups)->asMutable(); T* rawValues = nullptr; uint64_t* rawBoolValues = nullptr; @@ -194,27 +197,37 @@ class MinMaxByAggregateBase : public exec::Aggregate { rawValues = vector->mutableRawValues(); } } - + // uint64_t* rawValueNulls = + // vector->mutableRawNulls(rowVector->size())->asMutable(); for (int32_t i = 0; i < numGroups; ++i) { char* group = groups[i]; - if (isNull(group) || valueIsNull(group)) { + if (isNull(group)) { (*result)->setNull(i, true); + std::cout << "extract group null" << std::endl; } else { - clearNull(rawNulls, i); - extract( - value(group), *result, i, rawValues, rawBoolValues); + // clearNull(rawNulls, i); + (*result)->setNull(i, false); + std::cout << "extract group not null" << std::endl; + const bool isValueNull = valueIsNull(group); + // bits::setNull(rawNulls, i, isValueNull); + if (LIKELY(!isValueNull)) { + extract( + value(group), *result, i, rawValues, rawBoolValues); + } } } } void extractAccumulators(char** groups, int32_t numGroups, VectorPtr* result) override { + std::cout << "extract accumulators" << std::endl; auto rowVector = (*result)->as(); rowVector->resize(numGroups); auto valueVector = rowVector->childAt(0); auto comparisonVector = rowVector->childAt(1); - uint64_t* rawNulls = getRawNulls(rowVector); + uint64_t* rawNulls = + rowVector->mutableNulls(rowVector->size())->asMutable(); T* rawValues = nullptr; uint64_t* rawBoolValues = nullptr; @@ -245,8 +258,11 @@ class MinMaxByAggregateBase : public exec::Aggregate { char* group = groups[i]; if (isNull(group)) { rowVector->setNull(i, true); + std::cout << "extract group null" << std::endl; } else { - clearNull(rawNulls, i); + // clearNull(rawNulls, i); + rowVector->setNull(i, false); + std::cout << "extract group not null" << std::endl; const bool isValueNull = valueIsNull(group); bits::setNull(rawValueNulls, i, isValueNull); if (LIKELY(!isValueNull)) { @@ -285,6 +301,18 @@ class MinMaxByAggregateBase : public exec::Aggregate { const auto* indices = decodedComparison_.indices(); if (decodedValue_.mayHaveNulls() || decodedComparison_.mayHaveNulls()) { rows.applyToSelected([&](vector_size_t i) { + if (decodedValue_.isNullAt(i)) { + std::cout << "RAW input value is null" << std::endl; + if (!decodedComparison_.isNullAt(i)) { + std::cout << "RAW input comparison is " + << decodedComparison_.valueAt(i) << std::endl; + } + } else { + std::cout << "RAW input value is " + << decodedValue_.valueAt(i) << std::endl; + std::cout << "RAW input comparison is " + << decodedComparison_.valueAt(i) << std::endl; + } if (checkNestedNulls( decodedComparison_, indices, i, throwOnNestedNulls_)) { return; @@ -299,6 +327,18 @@ class MinMaxByAggregateBase : public exec::Aggregate { }); } else { rows.applyToSelected([&](vector_size_t i) { + if (decodedValue_.isNullAt(i)) { + std::cout << "RAW input value is null" << std::endl; + if (!decodedComparison_.isNullAt(i)) { + std::cout << "RAW input comparison is " + << decodedComparison_.valueAt(i) << std::endl; + } + } else { + std::cout << "RAW input value is " + << decodedValue_.valueAt(i) << std::endl; + std::cout << "RAW input comparison is " + << decodedComparison_.valueAt(i) << std::endl; + } if (throwOnNestedNulls_) { checkNestedNulls(decodedComparison_, indices, i, throwOnNestedNulls_); } @@ -323,24 +363,72 @@ class MinMaxByAggregateBase : public exec::Aggregate { if (decodedIntermediateResult_.isConstantMapping() && decodedIntermediateResult_.isNullAt(0)) { + if (!decodedComparison_.isNullAt(0) || !decodedValue_.isNullAt(0)) { + std::cout + << "decodedIntermediate result is null, but value or comparison is not " + << std::endl; + } return; } if (decodedIntermediateResult_.mayHaveNulls()) { - rows.applyToSelected([&](vector_size_t i) { - if (decodedIntermediateResult_.isNullAt(i)) { - return; - } - const auto decodedIndex = decodedIntermediateResult_.index(i); - updateValues( - groups[i], - decodedValue_, - decodedComparison_, - decodedIndex, - decodedValue_.isNullAt(decodedIndex), - mayUpdate); - }); + rows.applyToSelected( + [&](vector_size_t i) { + if (decodedIntermediateResult_.isNullAt(i)) { + if (decodedValue_.isNullAt(i)) { + std::cout << "addIntermediateResults is null, value is null" + << std::endl; + if (!decodedComparison_.isNullAt(i)) { + std::cout + << "addIntermediateResults is null, input comparison is " + << decodedComparison_.valueAt(i) << std::endl; + } + } else { + std::cout << "addIntermediateResults is null, input value is " + << decodedValue_.valueAt(i) << std::endl; + std::cout << "addIntermediateResults is null, input comparison is " + << decodedComparison_.valueAt(i) + << std::endl; + } + return; + } + if (decodedValue_.isNullAt(i)) { + std::cout << "addIntermediateResults input value is null" + << std::endl; + if (!decodedComparison_.isNullAt(i)) { + std::cout << "addIntermediateResults input comparison is " + << decodedComparison_.valueAt(i) + << std::endl; + } + } else { + std::cout << "addIntermediateResults input value is " + << decodedValue_.valueAt(i) << std::endl; + std::cout << "addIntermediateResults input comparison is " + << decodedComparison_.valueAt(i) << std::endl; + } + const auto decodedIndex = decodedIntermediateResult_.index(i); + updateValues( + groups[i], + decodedValue_, + decodedComparison_, + decodedIndex, + decodedValue_.isNullAt(decodedIndex), + mayUpdate); + }); } else { rows.applyToSelected([&](vector_size_t i) { + if (decodedValue_.isNullAt(i)) { + std::cout << "addIntermediateResults input value is null" + << std::endl; + if (!decodedComparison_.isNullAt(i)) { + std::cout << "addIntermediateResults input comparison is " + << decodedComparison_.valueAt(i) << std::endl; + } + } else { + std::cout << "addIntermediateResults input value is " + << decodedValue_.valueAt(i) << std::endl; + std::cout << "addIntermediateResults input comparison is " + << decodedComparison_.valueAt(i) << std::endl; + } const auto decodedIndex = decodedIntermediateResult_.index(i); updateValues( groups[i], @@ -382,6 +470,18 @@ class MinMaxByAggregateBase : public exec::Aggregate { } else if ( decodedValue_.mayHaveNulls() || decodedComparison_.mayHaveNulls()) { rows.applyToSelected([&](vector_size_t i) { + if (decodedValue_.isNullAt(i)) { + std::cout << "addSingleGroupRaw input value is null" << std::endl; + if (!decodedComparison_.isNullAt(i)) { + std::cout << "addSingleGroupRaw input comparison is " + << decodedComparison_.valueAt(i) << std::endl; + } + } else { + std::cout << "addSingleGroupRaw input value is " + << decodedValue_.valueAt(i) << std::endl; + std::cout << "addSingleGroupRaw input comparison is " + << decodedComparison_.valueAt(i) << std::endl; + } if (checkNestedNulls( decodedComparison_, indices, i, throwOnNestedNulls_)) { return; @@ -396,6 +496,18 @@ class MinMaxByAggregateBase : public exec::Aggregate { }); } else { rows.applyToSelected([&](vector_size_t i) { + if (decodedValue_.isNullAt(i)) { + std::cout << "addSingleGroupRaw input value is null" << std::endl; + if (!decodedComparison_.isNullAt(i)) { + std::cout << "addSingleGroupRaw input comparison is " + << decodedComparison_.valueAt(i) << std::endl; + } + } else { + std::cout << "addSingleGroupRaw input value is " + << decodedValue_.valueAt(i) << std::endl; + std::cout << "addSingleGroupRaw input comparison is " + << decodedComparison_.valueAt(i) << std::endl; + } if (throwOnNestedNulls_) { checkNestedNulls(decodedComparison_, indices, i, throwOnNestedNulls_); } @@ -437,6 +549,20 @@ class MinMaxByAggregateBase : public exec::Aggregate { } else if (decodedIntermediateResult_.mayHaveNulls()) { rows.applyToSelected([&](vector_size_t i) { if (decodedIntermediateResult_.isNullAt(i)) { + if (decodedValue_.isNullAt(i)) { + std::cout + << "addSingleGroupIntermediateResults is null, value is null" + << std::endl; + if (!decodedComparison_.isNullAt(i)) { + std::cout << "addSingleGroupIntermediateResults is null, input comparison is " + << decodedComparison_.valueAt(i) << std::endl; + } + } else { + std::cout << "addSingleGroupIntermediateResults is null, input value is " + << decodedValue_.valueAt(i) << std::endl; + std::cout << "addSingleGroupIntermediateResults is null, input comparison is " + << decodedComparison_.valueAt(i) << std::endl; + } return; } const auto decodedIndex = decodedIntermediateResult_.index(i); @@ -450,6 +576,20 @@ class MinMaxByAggregateBase : public exec::Aggregate { }); } else { rows.applyToSelected([&](vector_size_t i) { + if (decodedValue_.isNullAt(i)) { + std::cout << "addSingleGroupIntermediateResults input value is null " + << std::endl; + if (!decodedComparison_.isNullAt(i)) { + std::cout + << "addSingleGroupIntermediateResults input comparison is " + << decodedComparison_.valueAt(i) << std::endl; + } + } else { + std::cout << "addSingleGroupIntermediateResults input value is " + << decodedValue_.valueAt(i) << std::endl; + std::cout << "addSingleGroupIntermediateResults input comparison is " + << decodedComparison_.valueAt(i) << std::endl; + } const auto decodedIndex = decodedIntermediateResult_.index(i); updateValues( group, diff --git a/velox/functions/sparksql/CMakeLists.txt b/velox/functions/sparksql/CMakeLists.txt index b747e013d410..5ce509aff85e 100644 --- a/velox/functions/sparksql/CMakeLists.txt +++ b/velox/functions/sparksql/CMakeLists.txt @@ -60,7 +60,6 @@ add_subdirectory(window) if(${VELOX_ENABLE_AGGREGATES}) add_subdirectory(aggregates) endif() - if(${VELOX_BUILD_TESTING}) add_subdirectory(tests) add_subdirectory(coverage) diff --git a/velox/functions/sparksql/aggregates/tests/MinMaxByAggregationTest.cpp b/velox/functions/sparksql/aggregates/tests/MinMaxByAggregationTest.cpp index e15b14b51547..5c694cdf03b5 100644 --- a/velox/functions/sparksql/aggregates/tests/MinMaxByAggregationTest.cpp +++ b/velox/functions/sparksql/aggregates/tests/MinMaxByAggregationTest.cpp @@ -32,7 +32,19 @@ class MinMaxByAggregateTest : public AggregationTestBase { registerAggregateFunctions("spark_"); } }; +TEST_F(MinMaxByAggregateTest, maxByNull) { + auto vectors = {makeRowVector({ + makeNullableFlatVector({1, 2, std::nullopt}), + makeFlatVector({11, 12, 13}), + })}; + + auto expected = {makeRowVector({ + makeNullableFlatVector({std::nullopt}), + })}; + testAggregations(vectors, {}, {"spark_max_by(c0, c1)"}, expected); +} +/* TEST_F(MinMaxByAggregateTest, maxBy) { auto vectors = {makeRowVector({ makeFlatVector({1, 2, 3}), @@ -214,6 +226,6 @@ TEST_F(MinMaxByAggregateTest, rowCompare) { testAggregations( {data}, {}, {"spark_min_by(c0, c1)", "spark_max_by(c0, c1)"}, {expected}); } - +*/ } // namespace } // namespace facebook::velox::functions::aggregate::sparksql::test