From 576427ab648712e4a0e5de6d40511444fd08afc1 Mon Sep 17 00:00:00 2001 From: Jimmy Lu Date: Wed, 5 Jun 2024 14:41:02 -0700 Subject: [PATCH] Fix lazy evaluation causing incorrect results in remaining filter (#10072) Summary: Similar to https://github.com/facebookincubator/velox/pull/10045, we discovered the same issue also happens in remaining filter. Differential Revision: D58208097 --- velox/connectors/hive/HiveDataSource.cpp | 19 ++++++++---- velox/connectors/hive/HiveDataSource.h | 4 +++ velox/exec/tests/TableScanTest.cpp | 38 ++++++++++++++++++++++++ 3 files changed, 56 insertions(+), 5 deletions(-) diff --git a/velox/connectors/hive/HiveDataSource.cpp b/velox/connectors/hive/HiveDataSource.cpp index 47f2611325422..8c7a7ed027f56 100644 --- a/velox/connectors/hive/HiveDataSource.cpp +++ b/velox/connectors/hive/HiveDataSource.cpp @@ -118,10 +118,14 @@ HiveDataSource::HiveDataSource( if (remainingFilter) { remainingFilterExprSet_ = expressionEvaluator_->compile(remainingFilter); auto& remainingFilterExpr = remainingFilterExprSet_->expr(0); - folly::F14FastSet columnNames( - readerRowNames.begin(), readerRowNames.end()); + folly::F14FastMap columnNames; + for (int i = 0; i < readerRowNames.size(); ++i) { + columnNames[readerRowNames[i]] = i; + } for (auto& input : remainingFilterExpr->distinctFields()) { - if (columnNames.count(input->field()) > 0) { + auto it = columnNames.find(input->field()); + if (it != columnNames.end()) { + multiplyReferencedFields_.push_back(it->second); continue; } // Remaining filter may reference columns that are not used otherwise, @@ -352,9 +356,14 @@ int64_t HiveDataSource::estimatedRowSize() { } vector_size_t HiveDataSource::evaluateRemainingFilter(RowVectorPtr& rowVector) { - auto filterStartMicros = getCurrentTimeMicro(); filterRows_.resize(output_->size()); - + DecodedVector decoded; + SelectivityVector baseRows; + for (auto fieldIndex : multiplyReferencedFields_) { + LazyVector::ensureLoadedRows( + rowVector->childAt(fieldIndex), filterRows_, decoded, baseRows); + } + auto filterStartMicros = getCurrentTimeMicro(); expressionEvaluator_->evaluate( remainingFilterExprSet_.get(), filterRows_, *rowVector, filterResult_); auto res = exec::processFilterResults( diff --git a/velox/connectors/hive/HiveDataSource.h b/velox/connectors/hive/HiveDataSource.h index 335bdb43b157b..b70e481bfde0b 100644 --- a/velox/connectors/hive/HiveDataSource.h +++ b/velox/connectors/hive/HiveDataSource.h @@ -159,6 +159,10 @@ class HiveDataSource : public DataSource { exec::FilterEvalCtx filterEvalCtx_; std::shared_ptr randomSkip_; + // Field indices referenced in both remaining filter and output type. These + // columns need to be materialized eagerly to avoid missing values in output. + std::vector multiplyReferencedFields_; + // Remembers the WaveDataSource. Successive calls to toWaveDataSource() will // return the same. std::shared_ptr waveDataSource_; diff --git a/velox/exec/tests/TableScanTest.cpp b/velox/exec/tests/TableScanTest.cpp index dac13e5bcea16..5c0741beb656a 100644 --- a/velox/exec/tests/TableScanTest.cpp +++ b/velox/exec/tests/TableScanTest.cpp @@ -3003,6 +3003,44 @@ TEST_F(TableScanTest, remainingFilter) { "SELECT * FROM tmp WHERE not (c0 > 0 or c1 > c0)"); } +TEST_F(TableScanTest, remainingFilterLazyWithMultiReferences) { + constexpr int kSize = 10; + auto vector = makeRowVector({ + makeFlatVector(kSize, folly::identity), + makeFlatVector(kSize, folly::identity), + makeFlatVector(kSize, folly::identity), + }); + auto schema = asRowType(vector->type()); + auto file = TempFilePath::create(); + writeToFile(file->getPath(), {vector}); + CursorParameters params; + params.copyResult = false; + params.singleThreaded = true; + params.planNode = + PlanBuilder() + .tableScan(schema, {}, "NOT (c0 % 2 == 0 AND c2 % 3 == 0)") + .planNode(); + auto cursor = TaskCursor::create(params); + cursor->task()->addSplit( + "0", exec::Split(makeHiveConnectorSplit(file->getPath()))); + cursor->task()->noMoreSplits("0"); + int i = 0; + while (cursor->moveNext()) { + auto* result = cursor->current()->asUnchecked(); + ASSERT_FALSE(isLazyNotLoaded(*result->childAt(0))); + ASSERT_TRUE(isLazyNotLoaded(*result->childAt(1))); + ASSERT_FALSE(isLazyNotLoaded(*result->childAt(2))); + for (int j = 0; j < result->size(); ++i) { + ASSERT_LT(i, vector->size()); + if (i % 6 != 0) { + ASSERT_TRUE(result->loadedVector()->equalValueAt(vector.get(), j++, i)); + } + } + } + ASSERT_EQ(i, vector->size()); + ASSERT_TRUE(waitForTaskCompletion(cursor->task().get())); +} + TEST_F(TableScanTest, remainingFilterSkippedStrides) { auto rowType = ROW({{"c0", BIGINT()}, {"c1", BIGINT()}}); std::vector vectors(3);