From fb045b02ea7b6cc27d23ba984cbf5d80561d1256 Mon Sep 17 00:00:00 2001 From: Masha Basmanova Date: Wed, 6 Dec 2023 18:27:51 -0800 Subject: [PATCH] Fix PartitionedOutput with constant keys (#7906) Summary: PartitionedOutput operator used to fail if some of the partitioning keys are constant. ``` VeloxRuntimeError: index < childrenSize_ (4294967295 vs. 1) Trying to access non-existing child in RowVector: [ROW ROW: 418 elements, no nulls] ... at Unknown.# 2 facebook::velox::RowVector::childAt(unsigned int)(Unknown Source) at Unknown.# 3 facebook::velox::exec::PartitionedOutput::collectNullRows()(Unknown Source) ``` Pull Request resolved: https://github.com/facebookincubator/velox/pull/7906 Reviewed By: xiaoxmeng Differential Revision: D51915939 Pulled By: mbasmanova fbshipit-source-id: 38fd37e20ae3ea6461d3a4af2225b23b73885dc8 --- velox/exec/PartitionedOutput.cpp | 3 ++ velox/exec/tests/MultiFragmentTest.cpp | 61 ++++++++++++++++++++++ velox/exec/tests/utils/PlanBuilder.cpp | 72 +++++++++++++++++++------- velox/exec/tests/utils/PlanBuilder.h | 4 +- 4 files changed, 120 insertions(+), 20 deletions(-) diff --git a/velox/exec/PartitionedOutput.cpp b/velox/exec/PartitionedOutput.cpp index d2a7b5f2e83f..307c708dbcfb 100644 --- a/velox/exec/PartitionedOutput.cpp +++ b/velox/exec/PartitionedOutput.cpp @@ -288,6 +288,9 @@ void PartitionedOutput::collectNullRows() { decodedVectors_.resize(keyChannels_.size()); for (auto i : keyChannels_) { + if (i == kConstantChannel) { + continue; + } auto& keyVector = input_->childAt(i); if (keyVector->mayHaveNulls()) { decodedVectors_[i].decode(*keyVector, rows_); diff --git a/velox/exec/tests/MultiFragmentTest.cpp b/velox/exec/tests/MultiFragmentTest.cpp index 6eec2781df09..add3f70d4b01 100644 --- a/velox/exec/tests/MultiFragmentTest.cpp +++ b/velox/exec/tests/MultiFragmentTest.cpp @@ -741,6 +741,67 @@ TEST_F(MultiFragmentTest, roundRobinPartition) { } } +// Test PartitionedOutput operator with constant partitioning keys. +TEST_F(MultiFragmentTest, constantKeys) { + auto data = makeRowVector({ + makeFlatVector( + 1'000, [](auto row) { return row; }, nullEvery(7)), + }); + + std::vector> tasks; + auto addTask = [&](std::shared_ptr task, + const std::vector& remoteTaskIds) { + tasks.emplace_back(task); + task->start(1); + if (!remoteTaskIds.empty()) { + addRemoteSplits(task, remoteTaskIds); + } + }; + + // Make leaf task: Values -> Repartitioning (3-way) + auto leafTaskId = makeTaskId("leaf", 0); + auto leafPlan = PlanBuilder() + .values({data}) + .partitionedOutput({"c0", "123"}, 3, true, {"c0"}) + .planNode(); + auto leafTask = makeTask(leafTaskId, leafPlan, 0); + addTask(leafTask, {}); + + // Make next stage tasks to count nulls. + core::PlanNodePtr finalAggPlan; + std::vector finalAggTaskIds; + for (int i = 0; i < 3; i++) { + finalAggPlan = + PlanBuilder() + .exchange(leafPlan->outputType()) + .project({"c0 is null AS co_is_null"}) + .partialAggregation({}, {"count_if(co_is_null)", "count(1)"}) + .partitionedOutput({}, 1) + .planNode(); + + finalAggTaskIds.push_back(makeTaskId("final-agg", i)); + auto task = makeTask(finalAggTaskIds.back(), finalAggPlan, i); + addTask(task, {leafTaskId}); + } + + // Collect results and verify number of nulls is 3 times larger than in the + // original data. + auto op = PlanBuilder() + .exchange(finalAggPlan->outputType()) + .finalAggregation( + {}, {"sum(a0)", "sum(a1)"}, {{BIGINT()}, {BIGINT()}}) + .planNode(); + + assertQuery( + op, + finalAggTaskIds, + "SELECT 3 * ceil(1000.0 / 7) /* number of null rows */, 1000 + 2 * ceil(1000.0 / 7) /* total number of rows */"); + + for (auto& task : tasks) { + ASSERT_TRUE(waitForTaskCompletion(task.get())) << task->taskId(); + } +} + TEST_F(MultiFragmentTest, replicateNullsAndAny) { auto data = makeRowVector({makeFlatVector( 1'000, [](auto row) { return row; }, nullEvery(7))}); diff --git a/velox/exec/tests/utils/PlanBuilder.cpp b/velox/exec/tests/utils/PlanBuilder.cpp index b5c0c1b9e29d..d76c3ee69087 100644 --- a/velox/exec/tests/utils/PlanBuilder.cpp +++ b/velox/exec/tests/utils/PlanBuilder.cpp @@ -1047,17 +1047,33 @@ PlanBuilder& PlanBuilder::assignUniqueId( namespace { core::PartitionFunctionSpecPtr createPartitionFunctionSpec( const RowTypePtr& inputType, - const std::vector& keys) { + const std::vector& keys, + memory::MemoryPool* pool) { if (keys.empty()) { return std::make_shared(); } else { std::vector keyIndices; keyIndices.reserve(keys.size()); + + std::vector constValues; + constValues.reserve(keys.size()); + for (const auto& key : keys) { - keyIndices.push_back(inputType->getChildIdx(key)); + if (auto field = + std::dynamic_pointer_cast( + key)) { + keyIndices.push_back(inputType->getChildIdx(field->name())); + } else if ( + auto constant = + std::dynamic_pointer_cast(key)) { + keyIndices.push_back(kConstantChannel); + constValues.push_back(constant->toConstantVector(pool)); + } else { + VELOX_UNREACHABLE(); + } } return std::make_shared( - inputType, std::move(keyIndices)); + inputType, std::move(keyIndices), std::move(constValues)); } } @@ -1097,10 +1113,11 @@ RowTypePtr rename( core::PlanNodePtr createLocalPartitionNode( const core::PlanNodeId& planNodeId, - const std::vector& keys, - const std::vector& sources) { + const std::vector& keys, + const std::vector& sources, + memory::MemoryPool* pool) { auto partitionFunctionFactory = - createPartitionFunctionSpec(sources[0]->outputType(), keys); + createPartitionFunctionSpec(sources[0]->outputType(), keys, pool); return std::make_shared( planNodeId, keys.empty() ? core::LocalPartitionNode::Type::kGather @@ -1124,11 +1141,13 @@ PlanBuilder& PlanBuilder::partitionedOutput( const std::vector& outputLayout) { VELOX_CHECK_NOT_NULL( planNode_, "PartitionedOutput cannot be the source node"); + + auto keyExprs = exprs(keys, planNode_->outputType()); return partitionedOutput( keys, numPartitions, replicateNullsAndAny, - createPartitionFunctionSpec(planNode_->outputType(), keys), + createPartitionFunctionSpec(planNode_->outputType(), keyExprs, pool_), outputLayout); } @@ -1146,7 +1165,7 @@ PlanBuilder& PlanBuilder::partitionedOutput( planNode_ = std::make_shared( nextPlanNodeId(), core::PartitionedOutputNode::Kind::kPartitioned, - exprs(keys), + exprs(keys, planNode_->outputType()), numPartitions, replicateNullsAndAny, std::move(partitionFunctionSpec), @@ -1183,12 +1202,17 @@ PlanBuilder& PlanBuilder::localPartition( const std::vector& keys, const std::vector& sources) { VELOX_CHECK_NULL(planNode_, "localPartition() must be the first call"); - planNode_ = createLocalPartitionNode(nextPlanNodeId(), keys, sources); + planNode_ = createLocalPartitionNode( + nextPlanNodeId(), exprs(keys, sources[0]->outputType()), sources, pool_); return *this; } PlanBuilder& PlanBuilder::localPartition(const std::vector& keys) { - planNode_ = createLocalPartitionNode(nextPlanNodeId(), keys, {planNode_}); + planNode_ = createLocalPartitionNode( + nextPlanNodeId(), + exprs(keys, planNode_->outputType()), + {planNode_}, + pool_); return *this; } @@ -1866,16 +1890,26 @@ PlanBuilder::fields(const std::vector& indices) { } std::vector PlanBuilder::exprs( - const std::vector& names) { - VELOX_CHECK_NOT_NULL(planNode_); - auto flds = fields(planNode_->outputType(), names); - std::vector expressions; - expressions.reserve(flds.size()); - for (const auto& fld : flds) { - expressions.emplace_back( - std::dynamic_pointer_cast(fld)); + const std::vector& expressions, + const RowTypePtr& inputType) { + std::vector typedExpressions; + for (auto& expr : expressions) { + auto typedExpression = core::Expressions::inferTypes( + parse::parseExpr(expr, options_), inputType, pool_); + + if (auto field = dynamic_cast( + typedExpression.get())) { + typedExpressions.push_back(typedExpression); + } else if ( + auto constant = dynamic_cast( + typedExpression.get())) { + typedExpressions.push_back(typedExpression); + } else { + VELOX_FAIL("Expected field name or constant: {}", expr); + } } - return expressions; + + return typedExpressions; } core::TypedExprPtr PlanBuilder::inferTypes( diff --git a/velox/exec/tests/utils/PlanBuilder.h b/velox/exec/tests/utils/PlanBuilder.h index d7a46e2ebb1a..bf151970732b 100644 --- a/velox/exec/tests/utils/PlanBuilder.h +++ b/velox/exec/tests/utils/PlanBuilder.h @@ -1006,7 +1006,9 @@ class PlanBuilder { std::shared_ptr field( const std::string& name); - std::vector exprs(const std::vector& names); + std::vector exprs( + const std::vector& expressions, + const RowTypePtr& inputType); std::vector> fields( const std::vector& names);