From 518a808acd03054cb67c5d9a48ac027440364d62 Mon Sep 17 00:00:00 2001 From: Bikramjeet Vig Date: Thu, 2 May 2024 11:44:42 -0700 Subject: [PATCH] Enhance lazy vector consistency check (#9687) Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/9687 A consistency check in lazy vectors ensures that an unloaded lazy vector cannot be wrapped by two different top level vectors. We recently encountered a bug where this check was triggered in HashProbe. An input lazy vector was temporarily encapsulated before applying a filter, and the same vector was then encapsulated again at the same level while generating output, which triggered a failure in this check. This change adds support for this use case to the consistency check and ensures that HashProbe promptly destroys the temporary vector before encapsulating it again during output generation. Reviewed By: mbasmanova Differential Revision: D56842170 fbshipit-source-id: 65c5f2ad670898a39439351cbed5128c48a1ea2f --- velox/exec/HashProbe.cpp | 14 ++++--- velox/exec/HashProbe.h | 14 +++---- velox/exec/tests/HashJoinTest.cpp | 65 +++++++++++++++++++++++++++++++ velox/vector/ConstantVector.h | 6 ++- velox/vector/DictionaryVector.h | 5 ++- velox/vector/tests/VectorTest.cpp | 7 ++++ 6 files changed, 95 insertions(+), 16 deletions(-) diff --git a/velox/exec/HashProbe.cpp b/velox/exec/HashProbe.cpp index 66cc6b7801e9..3ff7e6c5d2e5 100644 --- a/velox/exec/HashProbe.cpp +++ b/velox/exec/HashProbe.cpp @@ -1053,7 +1053,7 @@ bool HashProbe::maybeReadSpillOutput() { return true; } -void HashProbe::fillFilterInput(vector_size_t size) { +RowVectorPtr HashProbe::createFilterInput(vector_size_t size) { std::vector filterColumns(filterInputType_->size()); for (auto projection : filterInputProjections_) { ensureLoadedIfNotAtEnd(projection.inputChannel); @@ -1069,11 +1069,12 @@ void HashProbe::fillFilterInput(vector_size_t size) { filterInputType_->children(), filterColumns); - filterInput_ = std::make_shared( + return 
std::make_shared( pool(), filterInputType_, nullptr, size, std::move(filterColumns)); } void HashProbe::prepareFilterRowsForNullAwareJoin( + RowVectorPtr& filterInput, vector_size_t numRows, bool filterPropagateNulls) { VELOX_CHECK_LE(numRows, kBatchSize); @@ -1087,7 +1088,7 @@ void HashProbe::prepareFilterRowsForNullAwareJoin( auto* rawNullRows = nullFilterInputRows_.asMutableRange().bits(); for (auto& projection : filterInputProjections_) { filterInputColumnDecodedVector_.decode( - *filterInput_->childAt(projection.outputChannel), filterInputRows_); + *filterInput->childAt(projection.outputChannel), filterInputRows_); if (filterInputColumnDecodedVector_.mayHaveNulls()) { SelectivityVector nullsInActiveRows(numRows); memcpy( @@ -1286,13 +1287,14 @@ int32_t HashProbe::evalFilter(int32_t numRows) { filterInputRows_.updateBounds(); } - fillFilterInput(numRows); + RowVectorPtr filterInput = createFilterInput(numRows); if (nullAware_) { - prepareFilterRowsForNullAwareJoin(numRows, filterPropagateNulls); + prepareFilterRowsForNullAwareJoin( + filterInput, numRows, filterPropagateNulls); } - EvalCtx evalCtx(operatorCtx_->execCtx(), filter_.get(), filterInput_.get()); + EvalCtx evalCtx(operatorCtx_->execCtx(), filter_.get(), filterInput.get()); filter_->eval(0, 1, true, filterInputRows_, evalCtx, filterResult_); decodedFilterResult_.decode(*filterResult_[0], filterInputRows_); diff --git a/velox/exec/HashProbe.h b/velox/exec/HashProbe.h index 997847fbb057..3b9af4fd3d8e 100644 --- a/velox/exec/HashProbe.h +++ b/velox/exec/HashProbe.h @@ -128,14 +128,17 @@ class HashProbe : public Operator { decodedFilterResult_.valueAt(row); } - // Populate filter input columns. - void fillFilterInput(vector_size_t size); + // Create a temporary input vector to be passed to the filter. This ensures it + // gets destroyed in case it's wrapping an unloaded vector which eventually + // needs to be wrapped in fillOutput(). 
+ RowVectorPtr createFilterInput(vector_size_t size); // Prepare filter row selectivity for null-aware join. 'numRows' // specifies the number of rows in 'filterInputRows_' to process. If // 'filterPropagateNulls' is true, the probe input row which has null in any // probe filter column can't pass the filter. void prepareFilterRowsForNullAwareJoin( + RowVectorPtr& filterInput, vector_size_t numRows, bool filterPropagateNulls); @@ -372,7 +375,7 @@ class HashProbe : public Operator { // side. Used by right semi project join. bool probeSideHasNullKeys_{false}; - // Rows in 'filterInput_' to apply 'filter_' to. + // Rows in the filter columns to apply 'filter_' to. SelectivityVector filterInputRows_; // Join filter. @@ -390,11 +393,6 @@ class HashProbe : public Operator { // Maps from column index in hash table to channel in 'filterInputType_'. std::vector filterTableProjections_; - // Temporary projection from probe and build for evaluating - // 'filter_'. This can always be reused since this does not escape - // this operator. - RowVectorPtr filterInput_; - // The following six fields are used in null-aware anti join filter // processing. diff --git a/velox/exec/tests/HashJoinTest.cpp b/velox/exec/tests/HashJoinTest.cpp index 9eb9b2354ea6..7bf3b87496ef 100644 --- a/velox/exec/tests/HashJoinTest.cpp +++ b/velox/exec/tests/HashJoinTest.cpp @@ -4041,6 +4041,71 @@ TEST_F(HashJoinTest, lazyVectors) { } } +TEST_F(HashJoinTest, lazyVectorNotLoadedInFilter) { + // Ensure that if lazy vectors are temporarily wrapped during a filter's + // execution and remain unloaded, the temporary wrap is promptly + // discarded. This precaution prevents the generation of the probe's output + // from wrapping an unloaded vector while the temporary wrap is + // still alive. + // This is done by generating a sufficiently small batch to allow the lazy + // vector to remain unloaded, as it doesn't need to be split between batches. 
+ // Then we use a filter that skips the execution of the expression containing + // the lazy vector, thereby avoiding its loading. + const vector_size_t vectorSize = 1'000; + auto probeVectors = makeBatches(1, [&](int32_t /*unused*/) { + return makeRowVector( + {makeFlatVector(vectorSize, folly::identity), + makeFlatVector(vectorSize, [](auto row) { return row % 23; }), + makeFlatVector( + vectorSize, [](auto row) { return row % 31; })}); + }); + + std::vector buildVectors = + makeBatches(1, [&](int32_t /*unused*/) { + return makeRowVector({makeFlatVector( + vectorSize, [](auto row) { return row * 3; })}); + }); + + std::shared_ptr probeFile = TempFilePath::create(); + writeToFile(probeFile->getPath(), probeVectors); + + std::shared_ptr buildFile = TempFilePath::create(); + writeToFile(buildFile->getPath(), buildVectors); + + createDuckDbTable("t", probeVectors); + createDuckDbTable("u", buildVectors); + + // Lazy vector is part of the filter but never gets loaded. + auto planNodeIdGenerator = std::make_shared(); + core::PlanNodeId probeScanId; + core::PlanNodeId buildScanId; + auto op = PlanBuilder(planNodeIdGenerator) + .tableScan(asRowType(probeVectors[0]->type())) + .capturePlanNodeId(probeScanId) + .hashJoin( + {"c0"}, + {"c0"}, + PlanBuilder(planNodeIdGenerator) + .tableScan(asRowType(buildVectors[0]->type())) + .capturePlanNodeId(buildScanId) + .planNode(), + "c1 >= 0 OR c2 > 0", + {"c1", "c2"}) + .planNode(); + SplitInput splitInput = { + {probeScanId, + {exec::Split(makeHiveConnectorSplit(probeFile->getPath()))}}, + {buildScanId, + {exec::Split(makeHiveConnectorSplit(buildFile->getPath()))}}, + }; + HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get()) + .planNode(std::move(op)) + .inputSplits(splitInput) + .checkSpillStats(false) + .referenceQuery("SELECT t.c1, t.c2 FROM t, u WHERE t.c0 = u.c0") + .run(); +} + TEST_F(HashJoinTest, dynamicFilters) { const int32_t numSplits = 10; const int32_t numRowsProbe = 333; diff --git 
a/velox/vector/ConstantVector.h b/velox/vector/ConstantVector.h index 816c88a0cffc..2a0e47e44ade 100644 --- a/velox/vector/ConstantVector.h +++ b/velox/vector/ConstantVector.h @@ -129,7 +129,11 @@ class ConstantVector final : public SimpleVector { setInternalState(); } - virtual ~ConstantVector() override = default; + virtual ~ConstantVector() override { + if (valueVector_) { + valueVector_->clearContainingLazyAndWrapped(); + } + } bool isNullAt(vector_size_t /*idx*/) const override { VELOX_DCHECK(initialized_); diff --git a/velox/vector/DictionaryVector.h b/velox/vector/DictionaryVector.h index c7e422490ecc..97b4d346e4a9 100644 --- a/velox/vector/DictionaryVector.h +++ b/velox/vector/DictionaryVector.h @@ -64,7 +64,9 @@ class DictionaryVector : public SimpleVector { std::optional representedBytes = std::nullopt, std::optional storageByteCount = std::nullopt); - virtual ~DictionaryVector() override = default; + virtual ~DictionaryVector() override { + dictionaryValues_->clearContainingLazyAndWrapped(); + } bool mayHaveNulls() const override { VELOX_DCHECK(initialized_); @@ -196,6 +198,7 @@ class DictionaryVector : public SimpleVector { } void setDictionaryValues(VectorPtr dictionaryValues) { + dictionaryValues_->clearContainingLazyAndWrapped(); dictionaryValues_ = dictionaryValues; initialized_ = false; setInternalState(); diff --git a/velox/vector/tests/VectorTest.cpp b/velox/vector/tests/VectorTest.cpp index eb992afe7d3d..6711347ed8ba 100644 --- a/velox/vector/tests/VectorTest.cpp +++ b/velox/vector/tests/VectorTest.cpp @@ -2248,6 +2248,13 @@ TEST_F(VectorTest, nestedLazy) { "An unloaded lazy vector cannot be wrapped by two different top level" " vectors."); + // Verify that if the original dictionary layer is destroyed without loading + // the underlying vector then the lazy vector can be wrapped in a new encoding + // layer. 
+ dict.reset(); + dict = BaseVector::wrapInDictionary( + nullptr, makeIndices(size, indexAt), size, lazy); + // Verify that the unloaded dictionary can be nested as long as it has one top // level vector. EXPECT_NO_THROW(BaseVector::wrapInDictionary(