diff --git a/velox/exec/NestedLoopJoinProbe.cpp b/velox/exec/NestedLoopJoinProbe.cpp index 30f01a3f8318..e0eb706381a0 100644 --- a/velox/exec/NestedLoopJoinProbe.cpp +++ b/velox/exec/NestedLoopJoinProbe.cpp @@ -244,14 +244,16 @@ RowVectorPtr NestedLoopJoinProbe::getMismatchedOutput( } VELOX_CHECK_GT(numUnmatched, 0); - auto output = - BaseVector::create(outputType_, numUnmatched, pool()); - projectChildren(output, data, projections, numUnmatched, unmatchedMapping); - for (auto projection : nullProjections) { - output->childAt(projection.outputChannel) = BaseVector::createNullConstant( - outputType_->childAt(projection.outputChannel), output->size(), pool()); - } - return output; + std::vector projectedChildren(outputType_->size()); + projectChildren( + projectedChildren, data, projections, numUnmatched, unmatchedMapping); + for (auto [_, outputChannel] : nullProjections) { + VELOX_CHECK_GT(projectedChildren.size(), outputChannel); + projectedChildren[outputChannel] = BaseVector::createNullConstant( + outputType_->childAt(outputChannel), numUnmatched, pool()); + } + return std::make_shared( + pool(), outputType_, nullptr, numUnmatched, std::move(projectedChildren)); } void NestedLoopJoinProbe::finishProbeInput() { @@ -359,8 +361,6 @@ RowVectorPtr NestedLoopJoinProbe::getCrossProduct( const auto numOutputRows = probeCnt * buildSize; const bool probeCntChanged = (probeCnt != numPrevProbedRows_); numPrevProbedRows_ = probeCnt; - auto output = - BaseVector::create(outputType, numOutputRows, pool()); auto rawProbeIndices = initializeRowNumberMapping(probeIndices_, numOutputRows, pool()); @@ -382,15 +382,22 @@ RowVectorPtr NestedLoopJoinProbe::getCrossProduct( } } + std::vector projectedChildren(outputType->size()); projectChildren( - output, input_, probeProjections, numOutputRows, probeIndices_); + projectedChildren, + input_, + probeProjections, + numOutputRows, + probeIndices_); projectChildren( - output, + projectedChildren, buildVectors_.value()[buildIndex_], buildProjections, numOutputRows, buildIndices_); - return output; + + return std::make_shared( + pool(), outputType, nullptr, numOutputRows, std::move(projectedChildren)); } bool NestedLoopJoinProbe::advanceProbeRows(vector_size_t probeCnt) { @@ -465,17 +472,27 @@ RowVectorPtr NestedLoopJoinProbe::doMatch(vector_size_t probeCnt) { if (numOutputRows == 0) { return nullptr; } - auto output = - BaseVector::create(outputType_, numOutputRows, pool()); + + std::vector projectedChildren(outputType_->size()); projectChildren( - output, input_, identityProjections_, numOutputRows, probeOutMapping_); + projectedChildren, + input_, + identityProjections_, + numOutputRows, + probeOutMapping_); projectChildren( - output, + projectedChildren, buildVectors_.value()[buildIndex_], buildProjections_, numOutputRows, buildOutMapping_); - return output; + + return std::make_shared( + pool(), + outputType_, + nullptr, + numOutputRows, + std::move(projectedChildren)); } } // namespace facebook::velox::exec diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp index 76eb162159bb..440e62e08737 100644 --- a/velox/exec/Operator.cpp +++ b/velox/exec/Operator.cpp @@ -206,26 +206,26 @@ RowVectorPtr Operator::fillOutput( wrapResults = false; } - auto output{std::make_shared( - operatorCtx_->pool(), - outputType_, - nullptr, - size, - std::vector(outputType_->size(), nullptr))}; - output->resize(size); + std::vector projectedChildren(outputType_->size()); projectChildren( - output, + projectedChildren, input_, identityProjections_, size, wrapResults ? mapping : nullptr); projectChildren( - output, + projectedChildren, results_, resultProjections_, size, wrapResults ? mapping : nullptr); - return output; + + return std::make_shared( + operatorCtx_->pool(), + outputType_, + nullptr, + size, + std::move(projectedChildren)); } OperatorStats Operator::stats(bool clear) { diff --git a/velox/exec/OperatorUtils.cpp b/velox/exec/OperatorUtils.cpp index bc59e4aa3ec1..9d5e70a78c68 100644 --- a/velox/exec/OperatorUtils.cpp +++ b/velox/exec/OperatorUtils.cpp @@ -397,23 +397,27 @@ folly::Range initializeRowNumberMapping( } void projectChildren( - const RowVectorPtr& dest, + std::vector& projectedChildren, const RowVectorPtr& src, const std::vector& projections, int32_t size, const BufferPtr& mapping) { - projectChildren(dest, src->children(), projections, size, mapping); + projectChildren( + projectedChildren, src->children(), projections, size, mapping); } void projectChildren( - const RowVectorPtr& dest, + std::vector& projectedChildren, const std::vector& src, const std::vector& projections, int32_t size, const BufferPtr& mapping) { - for (const auto& projection : projections) { - dest->childAt(projection.outputChannel) = - wrapChild(size, mapping, src[projection.inputChannel]); + for (auto [inputChannel, outputChannel] : projections) { + if (outputChannel >= projectedChildren.size()) { + projectedChildren.resize(outputChannel + 1); + } + projectedChildren[outputChannel] = + wrapChild(size, mapping, src[inputChannel]); } } } // namespace facebook::velox::exec diff --git a/velox/exec/OperatorUtils.h b/velox/exec/OperatorUtils.h index c9ec14e12b41..6828cdf61d78 100644 --- a/velox/exec/OperatorUtils.h +++ b/velox/exec/OperatorUtils.h @@ -136,11 +136,14 @@ folly::Range initializeRowNumberMapping( vector_size_t size, memory::MemoryPool* pool); -/// Projects children of 'src' row vector to 'dest' row vector according to -/// 'projections' and 'mapping'. 'size' specifies number of projected rows in -/// 'dest'. +/// Projects children of 'src' row vector according to 'projections'. Optionally +/// takes a 'mapping' and 'size' that represent the indices and size, +/// respectively, of a dictionary wrapping that should be applied to the +/// projections. The output param 'projectedChildren' will contain all the final +/// projections at the expected channel index. Indices not specified in +/// 'projections' will be left untouched in 'projectedChildren'. void projectChildren( - const RowVectorPtr& dest, + std::vector& projectedChildren, const RowVectorPtr& src, const std::vector& projections, int32_t size, @@ -149,7 +152,7 @@ void projectChildren( /// Overload of the above function that takes reference to const vector of /// VectorPtr as 'src' argument, instead of row vector. void projectChildren( - const RowVectorPtr& dest, + std::vector& projectedChildren, const std::vector& src, const std::vector& projections, int32_t size, diff --git a/velox/exec/tests/OperatorUtilsTest.cpp b/velox/exec/tests/OperatorUtilsTest.cpp index df40347dfe29..f635ce0e30c2 100644 --- a/velox/exec/tests/OperatorUtilsTest.cpp +++ b/velox/exec/tests/OperatorUtilsTest.cpp @@ -311,12 +311,15 @@ TEST_F(OperatorUtilsTest, projectChildren) { { std::vector emptyProjection; - auto destRowVector = - BaseVector::create(srcRowType, srcVectorSize, pool()); + std::vector projectedChildren(srcRowType->size()); projectChildren( - destRowVector, srcRowVector, emptyProjection, srcVectorSize, nullptr); - for (vector_size_t i = 0; i < destRowVector->childrenSize(); ++i) { - ASSERT_EQ(destRowVector->childAt(i)->size(), 0); + projectedChildren, + srcRowVector, + emptyProjection, + srcVectorSize, + nullptr); + for (vector_size_t i = 0; i < projectedChildren.size(); ++i) { + ASSERT_EQ(projectedChildren[i], nullptr); } } @@ -325,17 +328,16 @@ TEST_F(OperatorUtilsTest, projectChildren) { for (auto i = 0; i < srcRowType->size(); ++i) { identicalProjections.emplace_back(i, i); } - auto destRowVector = - BaseVector::create(srcRowType, srcVectorSize, pool()); + std::vector projectedChildren(srcRowType->size()); projectChildren( - destRowVector, + projectedChildren, srcRowVector, identicalProjections, srcVectorSize, nullptr); for (const auto& projection : identicalProjections) { ASSERT_EQ( - destRowVector->childAt(projection.outputChannel).get(), + projectedChildren[projection.outputChannel].get(), srcRowVector->childAt(projection.inputChannel).get()); } } @@ -348,13 +350,12 @@ TEST_F(OperatorUtilsTest, projectChildren) { std::vector projections{}; projections.emplace_back(2, 0); projections.emplace_back(0, 1); - auto destRowVector = - BaseVector::create(destRowType, srcVectorSize, pool()); + std::vector projectedChildren(srcRowType->size()); projectChildren( - destRowVector, srcRowVector, projections, srcVectorSize, nullptr); + projectedChildren, srcRowVector, projections, srcVectorSize, nullptr); for (const auto& projection : projections) { ASSERT_EQ( - destRowVector->childAt(projection.outputChannel).get(), + projectedChildren[projection.outputChannel].get(), srcRowVector->childAt(projection.inputChannel).get()); } } diff --git a/velox/vector/BaseVector.cpp b/velox/vector/BaseVector.cpp index cd959a1dc1a7..d1c5b1c11a42 100644 --- a/velox/vector/BaseVector.cpp +++ b/velox/vector/BaseVector.cpp @@ -284,10 +284,9 @@ VectorPtr BaseVector::createInternal( case TypeKind::ROW: { std::vector children; auto rowType = type->as(); - // Children are reserved the parent size but are set to 0 elements. + // Children are reserved the parent size and accessible for those rows. for (int32_t i = 0; i < rowType.size(); ++i) { children.push_back(create(rowType.childAt(i), size, pool)); - children.back()->resize(0); } return std::make_shared( pool, type, nullptr, size, std::move(children)); diff --git a/velox/vector/tests/VectorSaverTest.cpp b/velox/vector/tests/VectorSaverTest.cpp index f494ca51d1c4..036865114c3c 100644 --- a/velox/vector/tests/VectorSaverTest.cpp +++ b/velox/vector/tests/VectorSaverTest.cpp @@ -50,6 +50,11 @@ class VectorSaverTest : public testing::Test, public VectorTestBase { // are the same. switch (expected->encoding()) { case VectorEncoding::Simple::CONSTANT: + if (expected->isNullAt(0)) { + // No need to compare value vector as deserialized RowVector can have + // different values in its flat children of size 1. + break; + } case VectorEncoding::Simple::DICTIONARY: if (expected->valueVector()) { ASSERT_TRUE(actual->valueVector() != nullptr); diff --git a/velox/vector/tests/VectorTest.cpp b/velox/vector/tests/VectorTest.cpp index 9180897874cc..8dc1965b4fc5 100644 --- a/velox/vector/tests/VectorTest.cpp +++ b/velox/vector/tests/VectorTest.cpp @@ -1055,6 +1055,10 @@ TEST_F(VectorTest, row) { BaseVector::createNullConstant(baseRow->type(), 50, pool_.get()); testCopy(allNull, numIterations_); testSlices(allNull); + // created from BaseVector::Create() + baseRow = BaseVector::create(baseRow->type(), vectorSize_, pool()); + testCopy(baseRow, numIterations_); + testSlices(baseRow); } TEST_F(VectorTest, array) {