Skip to content

Commit

Permalink
Fix PartitionedOutput with constant keys (#7906)
Browse files Browse the repository at this point in the history
Summary:
The PartitionedOutput operator used to fail if some of the partitioning keys were constant.

```
VeloxRuntimeError: index < childrenSize_ (4294967295 vs. 1) Trying to access non-existing child in RowVector: [ROW ROW<foo_id:BIGINT>: 418 elements, no nulls]
...
	at Unknown.# 2  facebook::velox::RowVector::childAt(unsigned int)(Unknown Source)
	at Unknown.# 3  facebook::velox::exec::PartitionedOutput::collectNullRows()(Unknown Source)
```

Pull Request resolved: #7906

Reviewed By: xiaoxmeng

Differential Revision: D51915939

Pulled By: mbasmanova

fbshipit-source-id: 38fd37e20ae3ea6461d3a4af2225b23b73885dc8
  • Loading branch information
mbasmanova authored and facebook-github-bot committed Dec 7, 2023
1 parent 0c46847 commit fb045b0
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 20 deletions.
3 changes: 3 additions & 0 deletions velox/exec/PartitionedOutput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,9 @@ void PartitionedOutput::collectNullRows() {
decodedVectors_.resize(keyChannels_.size());

for (auto i : keyChannels_) {
if (i == kConstantChannel) {
continue;
}
auto& keyVector = input_->childAt(i);
if (keyVector->mayHaveNulls()) {
decodedVectors_[i].decode(*keyVector, rows_);
Expand Down
61 changes: 61 additions & 0 deletions velox/exec/tests/MultiFragmentTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,67 @@ TEST_F(MultiFragmentTest, roundRobinPartition) {
}
}

// Exercises the PartitionedOutput operator when the partitioning keys mix a
// regular column ("c0") with a constant expression ("123").
//
// Plan shape:
//   leaf task:       Values -> PartitionedOutput (3-way, replicateNullsAndAny)
//   final-agg tasks: Exchange -> Project -> PartialAggregation
//                    -> PartitionedOutput (single partition)
//   verification:    Exchange -> FinalAggregation, compared against SQL.
TEST_F(MultiFragmentTest, constantKeys) {
  constexpr int kNumPartitions = 3;

  // Single INTEGER column with 1'000 rows; every 7th row is null.
  auto c0 = makeFlatVector<int32_t>(
      1'000, [](auto row) { return row; }, nullEvery(7));
  auto data = makeRowVector({c0});

  // Every task started by this test is remembered here so that completion can
  // be awaited once the results have been verified.
  std::vector<std::shared_ptr<Task>> tasks;
  auto addTask = [&](std::shared_ptr<Task> task,
                     const std::vector<std::string>& remoteTaskIds) {
    tasks.emplace_back(task);
    task->start(1);
    if (remoteTaskIds.empty()) {
      return;
    }
    addRemoteSplits(task, remoteTaskIds);
  };

  // Leaf stage: repartition the input on {"c0", constant 123}.
  const auto leafTaskId = makeTaskId("leaf", 0);
  auto leafPlan =
      PlanBuilder()
          .values({data})
          .partitionedOutput({"c0", "123"}, kNumPartitions, true, {"c0"})
          .planNode();
  addTask(makeTask(leafTaskId, leafPlan, 0), {});

  // Downstream stage: one task per partition, each counting null rows and
  // total rows received from the leaf task.
  core::PlanNodePtr finalAggPlan;
  std::vector<std::string> finalAggTaskIds;
  for (int partition = 0; partition < kNumPartitions; ++partition) {
    finalAggPlan =
        PlanBuilder()
            .exchange(leafPlan->outputType())
            .project({"c0 is null AS co_is_null"})
            .partialAggregation({}, {"count_if(co_is_null)", "count(1)"})
            .partitionedOutput({}, 1)
            .planNode();

    auto taskId = makeTaskId("final-agg", partition);
    finalAggTaskIds.push_back(taskId);
    addTask(makeTask(taskId, finalAggPlan, partition), {leafTaskId});
  }

  // Sum the partial counts from all downstream tasks. The expected query
  // encodes that null rows show up 3 times (once per partition), so the total
  // row count grows by 2 extra copies of every null row.
  auto verificationPlan =
      PlanBuilder()
          .exchange(finalAggPlan->outputType())
          .finalAggregation(
              {}, {"sum(a0)", "sum(a1)"}, {{BIGINT()}, {BIGINT()}})
          .planNode();

  assertQuery(
      verificationPlan,
      finalAggTaskIds,
      "SELECT 3 * ceil(1000.0 / 7) /* number of null rows */, 1000 + 2 * ceil(1000.0 / 7) /* total number of rows */");

  // Make sure every task started above has finished.
  for (const auto& startedTask : tasks) {
    ASSERT_TRUE(waitForTaskCompletion(startedTask.get()))
        << startedTask->taskId();
  }
}

TEST_F(MultiFragmentTest, replicateNullsAndAny) {
auto data = makeRowVector({makeFlatVector<int32_t>(
1'000, [](auto row) { return row; }, nullEvery(7))});
Expand Down
72 changes: 53 additions & 19 deletions velox/exec/tests/utils/PlanBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1047,17 +1047,33 @@ PlanBuilder& PlanBuilder::assignUniqueId(
namespace {
core::PartitionFunctionSpecPtr createPartitionFunctionSpec(
const RowTypePtr& inputType,
const std::vector<std::string>& keys) {
const std::vector<core::TypedExprPtr>& keys,
memory::MemoryPool* pool) {
if (keys.empty()) {
return std::make_shared<core::GatherPartitionFunctionSpec>();
} else {
std::vector<column_index_t> keyIndices;
keyIndices.reserve(keys.size());

std::vector<VectorPtr> constValues;
constValues.reserve(keys.size());

for (const auto& key : keys) {
keyIndices.push_back(inputType->getChildIdx(key));
if (auto field =
std::dynamic_pointer_cast<const core::FieldAccessTypedExpr>(
key)) {
keyIndices.push_back(inputType->getChildIdx(field->name()));
} else if (
auto constant =
std::dynamic_pointer_cast<const core::ConstantTypedExpr>(key)) {
keyIndices.push_back(kConstantChannel);
constValues.push_back(constant->toConstantVector(pool));
} else {
VELOX_UNREACHABLE();
}
}
return std::make_shared<HashPartitionFunctionSpec>(
inputType, std::move(keyIndices));
inputType, std::move(keyIndices), std::move(constValues));
}
}

Expand Down Expand Up @@ -1097,10 +1113,11 @@ RowTypePtr rename(

core::PlanNodePtr createLocalPartitionNode(
const core::PlanNodeId& planNodeId,
const std::vector<std::string>& keys,
const std::vector<core::PlanNodePtr>& sources) {
const std::vector<core::TypedExprPtr>& keys,
const std::vector<core::PlanNodePtr>& sources,
memory::MemoryPool* pool) {
auto partitionFunctionFactory =
createPartitionFunctionSpec(sources[0]->outputType(), keys);
createPartitionFunctionSpec(sources[0]->outputType(), keys, pool);
return std::make_shared<core::LocalPartitionNode>(
planNodeId,
keys.empty() ? core::LocalPartitionNode::Type::kGather
Expand All @@ -1124,11 +1141,13 @@ PlanBuilder& PlanBuilder::partitionedOutput(
const std::vector<std::string>& outputLayout) {
VELOX_CHECK_NOT_NULL(
planNode_, "PartitionedOutput cannot be the source node");

auto keyExprs = exprs(keys, planNode_->outputType());
return partitionedOutput(
keys,
numPartitions,
replicateNullsAndAny,
createPartitionFunctionSpec(planNode_->outputType(), keys),
createPartitionFunctionSpec(planNode_->outputType(), keyExprs, pool_),
outputLayout);
}

Expand All @@ -1146,7 +1165,7 @@ PlanBuilder& PlanBuilder::partitionedOutput(
planNode_ = std::make_shared<core::PartitionedOutputNode>(
nextPlanNodeId(),
core::PartitionedOutputNode::Kind::kPartitioned,
exprs(keys),
exprs(keys, planNode_->outputType()),
numPartitions,
replicateNullsAndAny,
std::move(partitionFunctionSpec),
Expand Down Expand Up @@ -1183,12 +1202,17 @@ PlanBuilder& PlanBuilder::localPartition(
const std::vector<std::string>& keys,
const std::vector<core::PlanNodePtr>& sources) {
VELOX_CHECK_NULL(planNode_, "localPartition() must be the first call");
planNode_ = createLocalPartitionNode(nextPlanNodeId(), keys, sources);
planNode_ = createLocalPartitionNode(
nextPlanNodeId(), exprs(keys, sources[0]->outputType()), sources, pool_);
return *this;
}

PlanBuilder& PlanBuilder::localPartition(const std::vector<std::string>& keys) {
planNode_ = createLocalPartitionNode(nextPlanNodeId(), keys, {planNode_});
planNode_ = createLocalPartitionNode(
nextPlanNodeId(),
exprs(keys, planNode_->outputType()),
{planNode_},
pool_);
return *this;
}

Expand Down Expand Up @@ -1866,16 +1890,26 @@ PlanBuilder::fields(const std::vector<column_index_t>& indices) {
}

std::vector<core::TypedExprPtr> PlanBuilder::exprs(
const std::vector<std::string>& names) {
VELOX_CHECK_NOT_NULL(planNode_);
auto flds = fields(planNode_->outputType(), names);
std::vector<core::TypedExprPtr> expressions;
expressions.reserve(flds.size());
for (const auto& fld : flds) {
expressions.emplace_back(
std::dynamic_pointer_cast<const core::FieldAccessTypedExpr>(fld));
const std::vector<std::string>& expressions,
const RowTypePtr& inputType) {
std::vector<core::TypedExprPtr> typedExpressions;
for (auto& expr : expressions) {
auto typedExpression = core::Expressions::inferTypes(
parse::parseExpr(expr, options_), inputType, pool_);

if (auto field = dynamic_cast<const core::FieldAccessTypedExpr*>(
typedExpression.get())) {
typedExpressions.push_back(typedExpression);
} else if (
auto constant = dynamic_cast<const core::ConstantTypedExpr*>(
typedExpression.get())) {
typedExpressions.push_back(typedExpression);
} else {
VELOX_FAIL("Expected field name or constant: {}", expr);
}
}
return expressions;

return typedExpressions;
}

core::TypedExprPtr PlanBuilder::inferTypes(
Expand Down
4 changes: 3 additions & 1 deletion velox/exec/tests/utils/PlanBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -1006,7 +1006,9 @@ class PlanBuilder {
std::shared_ptr<const core::FieldAccessTypedExpr> field(
const std::string& name);

std::vector<core::TypedExprPtr> exprs(const std::vector<std::string>& names);
std::vector<core::TypedExprPtr> exprs(
const std::vector<std::string>& expressions,
const RowTypePtr& inputType);

std::vector<std::shared_ptr<const core::FieldAccessTypedExpr>> fields(
const std::vector<std::string>& names);
Expand Down

0 comments on commit fb045b0

Please sign in to comment.