// NOTE(review): this file is a unified diff covering three files, not a C++
// translation unit. In this copy the line breaks and indentation are collapsed,
// and template argument lists look stripped (e.g. `folly::F14FastMap columnNames`,
// `std::vector multiReferencedFields_`, `makeFlatVector(kSize, ...)`,
// `asUnchecked()`) -- confirm against the upstream patch before applying.
//
// Summary of the change, as visible in the hunks below:
//  * HiveDataSource.cpp, constructor: the set of reader column names is replaced
//    by a map from column name to its position in readerRowNames. For every
//    distinct field of the remaining filter that is found in that map, the
//    position is appended to multiReferencedFields_ before the existing
//    `continue`.
//  * HiveDataSource.cpp, evaluateRemainingFilter(): after filterRows_ is
//    resized, LazyVector::ensureLoadedRows() is called on each child of
//    rowVector listed in multiReferencedFields_; only afterwards is
//    filterStartMicros sampled and the filter expression evaluated.
//    NOTE(review): the timestamp now sits below the loading loop, so presumably
//    the load cost is no longer part of the measured filter time -- confirm
//    that this is intended.
//  * HiveDataSource.h: declares multiReferencedFields_, filterLazyDecoded_ and
//    filterLazyBaseRows_, and moves the randomSkip_ declaration above the
//    "Reusable memory for remaining filter evaluation" group.
//  * TableScanTest.cpp: adds TableScanTest.remainingFilterLazyWithMultiReferences.
//    It writes a kSize (10) row, three-column vector to a temp file, scans it
//    with the remaining filter "NOT (c0 % 2 == 0 AND c2 % 3 == 0)", asserts
//    isLazyNotLoaded() is false for output children 0 and 2 and true for child
//    1, and compares output row j with input row i for every i where
//    i % 6 != 0. The inner loop advances i in the for-header and j inside the
//    body; this appears intentional (i walks input rows, j walks surviving
//    output rows), and the final ASSERT_EQ checks i reached vector->size().
//
// NOTE(review): `for (int i = 0; i < readerRowNames.size(); ++i)` compares a
// signed int with the result of size(); likewise ASSERT_LT(i, vector->size())
// in the test. Consider matching index types -- verify against project
// warning flags.
diff --git a/velox/connectors/hive/HiveDataSource.cpp b/velox/connectors/hive/HiveDataSource.cpp index 47f2611325422..db6a8fc5d0caf 100644 --- a/velox/connectors/hive/HiveDataSource.cpp +++ b/velox/connectors/hive/HiveDataSource.cpp @@ -118,10 +118,14 @@ HiveDataSource::HiveDataSource( if (remainingFilter) { remainingFilterExprSet_ = expressionEvaluator_->compile(remainingFilter); auto& remainingFilterExpr = remainingFilterExprSet_->expr(0); - folly::F14FastSet columnNames( - readerRowNames.begin(), readerRowNames.end()); + folly::F14FastMap columnNames; + for (int i = 0; i < readerRowNames.size(); ++i) { + columnNames[readerRowNames[i]] = i; + } for (auto& input : remainingFilterExpr->distinctFields()) { - if (columnNames.count(input->field()) > 0) { + auto it = columnNames.find(input->field()); + if (it != columnNames.end()) { + multiReferencedFields_.push_back(it->second); continue; } // Remaining filter may reference columns that are not used otherwise, @@ -352,9 +356,15 @@ int64_t HiveDataSource::estimatedRowSize() { } vector_size_t HiveDataSource::evaluateRemainingFilter(RowVectorPtr& rowVector) { - auto filterStartMicros = getCurrentTimeMicro(); filterRows_.resize(output_->size()); - + for (auto fieldIndex : multiReferencedFields_) { + LazyVector::ensureLoadedRows( + rowVector->childAt(fieldIndex), + filterRows_, + filterLazyDecoded_, + filterLazyBaseRows_); + } + auto filterStartMicros = getCurrentTimeMicro(); expressionEvaluator_->evaluate( remainingFilterExprSet_.get(), filterRows_, *rowVector, filterResult_); auto res = exec::processFilterResults( diff --git a/velox/connectors/hive/HiveDataSource.h b/velox/connectors/hive/HiveDataSource.h index 335bdb43b157b..9b0d14ebe3d95 100644 --- a/velox/connectors/hive/HiveDataSource.h +++ b/velox/connectors/hive/HiveDataSource.h @@ -153,11 +153,18 @@ class HiveDataSource : public DataSource { std::atomic totalRemainingFilterTime_{0}; uint64_t completedRows_ = 0; + // Field indices referenced in both remaining 
filter and output type. These + // columns need to be materialized eagerly to avoid missing values in output. + std::vector multiReferencedFields_; + + std::shared_ptr randomSkip_; + // Reusable memory for remaining filter evaluation. VectorPtr filterResult_; SelectivityVector filterRows_; + DecodedVector filterLazyDecoded_; + SelectivityVector filterLazyBaseRows_; exec::FilterEvalCtx filterEvalCtx_; - std::shared_ptr randomSkip_; // Remembers the WaveDataSource. Successive calls to toWaveDataSource() will // return the same. diff --git a/velox/exec/tests/TableScanTest.cpp b/velox/exec/tests/TableScanTest.cpp index dac13e5bcea16..5c0741beb656a 100644 --- a/velox/exec/tests/TableScanTest.cpp +++ b/velox/exec/tests/TableScanTest.cpp @@ -3003,6 +3003,44 @@ TEST_F(TableScanTest, remainingFilter) { "SELECT * FROM tmp WHERE not (c0 > 0 or c1 > c0)"); } +TEST_F(TableScanTest, remainingFilterLazyWithMultiReferences) { + constexpr int kSize = 10; + auto vector = makeRowVector({ + makeFlatVector(kSize, folly::identity), + makeFlatVector(kSize, folly::identity), + makeFlatVector(kSize, folly::identity), + }); + auto schema = asRowType(vector->type()); + auto file = TempFilePath::create(); + writeToFile(file->getPath(), {vector}); + CursorParameters params; + params.copyResult = false; + params.singleThreaded = true; + params.planNode = + PlanBuilder() + .tableScan(schema, {}, "NOT (c0 % 2 == 0 AND c2 % 3 == 0)") + .planNode(); + auto cursor = TaskCursor::create(params); + cursor->task()->addSplit( + "0", exec::Split(makeHiveConnectorSplit(file->getPath()))); + cursor->task()->noMoreSplits("0"); + int i = 0; + while (cursor->moveNext()) { + auto* result = cursor->current()->asUnchecked(); + ASSERT_FALSE(isLazyNotLoaded(*result->childAt(0))); + ASSERT_TRUE(isLazyNotLoaded(*result->childAt(1))); + ASSERT_FALSE(isLazyNotLoaded(*result->childAt(2))); + for (int j = 0; j < result->size(); ++i) { + ASSERT_LT(i, vector->size()); + if (i % 6 != 0) { + 
ASSERT_TRUE(result->loadedVector()->equalValueAt(vector.get(), j++, i)); + } + } + } + ASSERT_EQ(i, vector->size()); + ASSERT_TRUE(waitForTaskCompletion(cursor->task().get())); +} + TEST_F(TableScanTest, remainingFilterSkippedStrides) { auto rowType = ROW({{"c0", BIGINT()}, {"c1", BIGINT()}}); std::vector vectors(3);