Skip to content

Commit

Permalink
Enhance lazy vector consistency check (facebookincubator#9687)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: facebookincubator#9687

A consistency check in lazy vectors to ensure an unloaded lazy vectors
cannot be wrapped by two different top level vectors. We recently
encountered a bug where this check was triggered in HashProbe. An
input lazy vector was temporarily encapsulated before applying a
filter, and the same vector was then encapsulated again at the same
level while generating output, which triggered a failure in this
check. This change adds support for this use case to the consistency
check and ensures that HashProbe promptly destroys the temporary
vector before encapsulating it again during output generation.

Reviewed By: mbasmanova

Differential Revision: D56842170

fbshipit-source-id: 65c5f2ad670898a39439351cbed5128c48a1ea2f
  • Loading branch information
Bikramjeet Vig authored and facebook-github-bot committed May 2, 2024
1 parent abb94a3 commit 518a808
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 16 deletions.
14 changes: 8 additions & 6 deletions velox/exec/HashProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1053,7 +1053,7 @@ bool HashProbe::maybeReadSpillOutput() {
return true;
}

void HashProbe::fillFilterInput(vector_size_t size) {
RowVectorPtr HashProbe::createFilterInput(vector_size_t size) {
std::vector<VectorPtr> filterColumns(filterInputType_->size());
for (auto projection : filterInputProjections_) {
ensureLoadedIfNotAtEnd(projection.inputChannel);
Expand All @@ -1069,11 +1069,12 @@ void HashProbe::fillFilterInput(vector_size_t size) {
filterInputType_->children(),
filterColumns);

filterInput_ = std::make_shared<RowVector>(
return std::make_shared<RowVector>(
pool(), filterInputType_, nullptr, size, std::move(filterColumns));
}

void HashProbe::prepareFilterRowsForNullAwareJoin(
RowVectorPtr& filterInput,
vector_size_t numRows,
bool filterPropagateNulls) {
VELOX_CHECK_LE(numRows, kBatchSize);
Expand All @@ -1087,7 +1088,7 @@ void HashProbe::prepareFilterRowsForNullAwareJoin(
auto* rawNullRows = nullFilterInputRows_.asMutableRange().bits();
for (auto& projection : filterInputProjections_) {
filterInputColumnDecodedVector_.decode(
*filterInput_->childAt(projection.outputChannel), filterInputRows_);
*filterInput->childAt(projection.outputChannel), filterInputRows_);
if (filterInputColumnDecodedVector_.mayHaveNulls()) {
SelectivityVector nullsInActiveRows(numRows);
memcpy(
Expand Down Expand Up @@ -1286,13 +1287,14 @@ int32_t HashProbe::evalFilter(int32_t numRows) {
filterInputRows_.updateBounds();
}

fillFilterInput(numRows);
RowVectorPtr filterInput = createFilterInput(numRows);

if (nullAware_) {
prepareFilterRowsForNullAwareJoin(numRows, filterPropagateNulls);
prepareFilterRowsForNullAwareJoin(
filterInput, numRows, filterPropagateNulls);
}

EvalCtx evalCtx(operatorCtx_->execCtx(), filter_.get(), filterInput_.get());
EvalCtx evalCtx(operatorCtx_->execCtx(), filter_.get(), filterInput.get());
filter_->eval(0, 1, true, filterInputRows_, evalCtx, filterResult_);

decodedFilterResult_.decode(*filterResult_[0], filterInputRows_);
Expand Down
14 changes: 6 additions & 8 deletions velox/exec/HashProbe.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,17 @@ class HashProbe : public Operator {
decodedFilterResult_.valueAt<bool>(row);
}

// Populate filter input columns.
void fillFilterInput(vector_size_t size);
// Create a temporary input vector to be passed to the filter. This ensures it
// gets destroyed in case its wrapping an unloaded vector which eventually
// needs to be wrapped in fillOutput().
RowVectorPtr createFilterInput(vector_size_t size);

// Prepare filter row selectivity for null-aware join. 'numRows'
// specifies the number of rows in 'filterInputRows_' to process. If
// 'filterPropagateNulls' is true, the probe input row which has null in any
// probe filter column can't pass the filter.
void prepareFilterRowsForNullAwareJoin(
RowVectorPtr& filterInput,
vector_size_t numRows,
bool filterPropagateNulls);

Expand Down Expand Up @@ -372,7 +375,7 @@ class HashProbe : public Operator {
// side. Used by right semi project join.
bool probeSideHasNullKeys_{false};

// Rows in 'filterInput_' to apply 'filter_' to.
// Rows in the filter columns to apply 'filter_' to.
SelectivityVector filterInputRows_;

// Join filter.
Expand All @@ -390,11 +393,6 @@ class HashProbe : public Operator {
// Maps from column index in hash table to channel in 'filterInputType_'.
std::vector<IdentityProjection> filterTableProjections_;

// Temporary projection from probe and build for evaluating
// 'filter_'. This can always be reused since this does not escape
// this operator.
RowVectorPtr filterInput_;

// The following six fields are used in null-aware anti join filter
// processing.

Expand Down
65 changes: 65 additions & 0 deletions velox/exec/tests/HashJoinTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4041,6 +4041,71 @@ TEST_F(HashJoinTest, lazyVectors) {
}
}

TEST_F(HashJoinTest, lazyVectorNotLoadedInFilter) {
// Ensure that if lazy vectors are temporarily wrapped during a filter's
// execution and remain unloaded, the temporary wrap is promptly
// discarded. This precaution prevents the generation of the probe's output
// from wrapping an unloaded vector while the temporary wrap is
// still alive.
// This is done by generating a sufficiently small batch to allow the lazy
// vector to remain unloaded, as it doesn't need to be split between batches.
// Then we use a filter that skips the execution of the expression containing
// the lazy vector, thereby avoiding its loading.
const vector_size_t vectorSize = 1'000;
auto probeVectors = makeBatches(1, [&](int32_t /*unused*/) {
return makeRowVector(
{makeFlatVector<int32_t>(vectorSize, folly::identity),
makeFlatVector<int64_t>(vectorSize, [](auto row) { return row % 23; }),
makeFlatVector<int32_t>(
vectorSize, [](auto row) { return row % 31; })});
});

std::vector<RowVectorPtr> buildVectors =
makeBatches(1, [&](int32_t /*unused*/) {
return makeRowVector({makeFlatVector<int32_t>(
vectorSize, [](auto row) { return row * 3; })});
});

std::shared_ptr<TempFilePath> probeFile = TempFilePath::create();
writeToFile(probeFile->getPath(), probeVectors);

std::shared_ptr<TempFilePath> buildFile = TempFilePath::create();
writeToFile(buildFile->getPath(), buildVectors);

createDuckDbTable("t", probeVectors);
createDuckDbTable("u", buildVectors);

// Lazy vector is part of the filter but never gets loaded.
auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
core::PlanNodeId probeScanId;
core::PlanNodeId buildScanId;
auto op = PlanBuilder(planNodeIdGenerator)
.tableScan(asRowType(probeVectors[0]->type()))
.capturePlanNodeId(probeScanId)
.hashJoin(
{"c0"},
{"c0"},
PlanBuilder(planNodeIdGenerator)
.tableScan(asRowType(buildVectors[0]->type()))
.capturePlanNodeId(buildScanId)
.planNode(),
"c1 >= 0 OR c2 > 0",
{"c1", "c2"})
.planNode();
SplitInput splitInput = {
{probeScanId,
{exec::Split(makeHiveConnectorSplit(probeFile->getPath()))}},
{buildScanId,
{exec::Split(makeHiveConnectorSplit(buildFile->getPath()))}},
};
HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get())
.planNode(std::move(op))
.inputSplits(splitInput)
.checkSpillStats(false)
.referenceQuery("SELECT t.c1, t.c2 FROM t, u WHERE t.c0 = u.c0")
.run();
}

TEST_F(HashJoinTest, dynamicFilters) {
const int32_t numSplits = 10;
const int32_t numRowsProbe = 333;
Expand Down
6 changes: 5 additions & 1 deletion velox/vector/ConstantVector.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,11 @@ class ConstantVector final : public SimpleVector<T> {
setInternalState();
}

virtual ~ConstantVector() override = default;
virtual ~ConstantVector() override {
if (valueVector_) {
valueVector_->clearContainingLazyAndWrapped();
}
}

bool isNullAt(vector_size_t /*idx*/) const override {
VELOX_DCHECK(initialized_);
Expand Down
5 changes: 4 additions & 1 deletion velox/vector/DictionaryVector.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ class DictionaryVector : public SimpleVector<T> {
std::optional<ByteCount> representedBytes = std::nullopt,
std::optional<ByteCount> storageByteCount = std::nullopt);

virtual ~DictionaryVector() override = default;
virtual ~DictionaryVector() override {
dictionaryValues_->clearContainingLazyAndWrapped();
}

bool mayHaveNulls() const override {
VELOX_DCHECK(initialized_);
Expand Down Expand Up @@ -196,6 +198,7 @@ class DictionaryVector : public SimpleVector<T> {
}

void setDictionaryValues(VectorPtr dictionaryValues) {
dictionaryValues_->clearContainingLazyAndWrapped();
dictionaryValues_ = dictionaryValues;
initialized_ = false;
setInternalState();
Expand Down
7 changes: 7 additions & 0 deletions velox/vector/tests/VectorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2248,6 +2248,13 @@ TEST_F(VectorTest, nestedLazy) {
"An unloaded lazy vector cannot be wrapped by two different top level"
" vectors.");

// Verify that if the original dictionary layer is destroyed without loading
// the underlying vector then the lazy vector can be wrapped in a new encoding
// layer.
dict.reset();
dict = BaseVector::wrapInDictionary(
nullptr, makeIndices(size, indexAt), size, lazy);

// Verify that the unloaded dictionary can be nested as long as it has one top
// level vector.
EXPECT_NO_THROW(BaseVector::wrapInDictionary(
Expand Down

0 comments on commit 518a808

Please sign in to comment.