Skip to content

Commit

Permalink
Fix lazy evaluation causing incorrect results in remaining filter (fa…
Browse files Browse the repository at this point in the history
…cebookincubator#10072)

Summary:

Similar to facebookincubator#10045, we
discovered the same issue also happens in remaining filter.

Differential Revision: D58208097
  • Loading branch information
Yuhta authored and facebook-github-bot committed Jun 5, 2024
1 parent 93846c5 commit 576427a
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 5 deletions.
19 changes: 14 additions & 5 deletions velox/connectors/hive/HiveDataSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,14 @@ HiveDataSource::HiveDataSource(
if (remainingFilter) {
remainingFilterExprSet_ = expressionEvaluator_->compile(remainingFilter);
auto& remainingFilterExpr = remainingFilterExprSet_->expr(0);
folly::F14FastSet<std::string> columnNames(
readerRowNames.begin(), readerRowNames.end());
folly::F14FastMap<std::string, column_index_t> columnNames;
for (int i = 0; i < readerRowNames.size(); ++i) {
columnNames[readerRowNames[i]] = i;
}
for (auto& input : remainingFilterExpr->distinctFields()) {
if (columnNames.count(input->field()) > 0) {
auto it = columnNames.find(input->field());
if (it != columnNames.end()) {
multiplyReferencedFields_.push_back(it->second);
continue;
}
// Remaining filter may reference columns that are not used otherwise,
Expand Down Expand Up @@ -352,9 +356,14 @@ int64_t HiveDataSource::estimatedRowSize() {
}

vector_size_t HiveDataSource::evaluateRemainingFilter(RowVectorPtr& rowVector) {
auto filterStartMicros = getCurrentTimeMicro();
filterRows_.resize(output_->size());

DecodedVector decoded;
SelectivityVector baseRows;
for (auto fieldIndex : multiplyReferencedFields_) {
LazyVector::ensureLoadedRows(
rowVector->childAt(fieldIndex), filterRows_, decoded, baseRows);
}
auto filterStartMicros = getCurrentTimeMicro();
expressionEvaluator_->evaluate(
remainingFilterExprSet_.get(), filterRows_, *rowVector, filterResult_);
auto res = exec::processFilterResults(
Expand Down
4 changes: 4 additions & 0 deletions velox/connectors/hive/HiveDataSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ class HiveDataSource : public DataSource {
exec::FilterEvalCtx filterEvalCtx_;
std::shared_ptr<random::RandomSkipTracker> randomSkip_;

// Field indices referenced in both remaining filter and output type. These
// columns need to be materialized eagerly to avoid missing values in output.
std::vector<column_index_t> multiplyReferencedFields_;

// Remembers the WaveDataSource. Successive calls to toWaveDataSource() will
// return the same.
std::shared_ptr<wave::WaveDataSource> waveDataSource_;
Expand Down
38 changes: 38 additions & 0 deletions velox/exec/tests/TableScanTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3003,6 +3003,44 @@ TEST_F(TableScanTest, remainingFilter) {
"SELECT * FROM tmp WHERE not (c0 > 0 or c1 > c0)");
}

TEST_F(TableScanTest, remainingFilterLazyWithMultiReferences) {
constexpr int kSize = 10;
auto vector = makeRowVector({
makeFlatVector<int64_t>(kSize, folly::identity),
makeFlatVector<int64_t>(kSize, folly::identity),
makeFlatVector<int64_t>(kSize, folly::identity),
});
auto schema = asRowType(vector->type());
auto file = TempFilePath::create();
writeToFile(file->getPath(), {vector});
CursorParameters params;
params.copyResult = false;
params.singleThreaded = true;
params.planNode =
PlanBuilder()
.tableScan(schema, {}, "NOT (c0 % 2 == 0 AND c2 % 3 == 0)")
.planNode();
auto cursor = TaskCursor::create(params);
cursor->task()->addSplit(
"0", exec::Split(makeHiveConnectorSplit(file->getPath())));
cursor->task()->noMoreSplits("0");
int i = 0;
while (cursor->moveNext()) {
auto* result = cursor->current()->asUnchecked<RowVector>();
ASSERT_FALSE(isLazyNotLoaded(*result->childAt(0)));
ASSERT_TRUE(isLazyNotLoaded(*result->childAt(1)));
ASSERT_FALSE(isLazyNotLoaded(*result->childAt(2)));
for (int j = 0; j < result->size(); ++i) {
ASSERT_LT(i, vector->size());
if (i % 6 != 0) {
ASSERT_TRUE(result->loadedVector()->equalValueAt(vector.get(), j++, i));
}
}
}
ASSERT_EQ(i, vector->size());
ASSERT_TRUE(waitForTaskCompletion(cursor->task().get()));
}

TEST_F(TableScanTest, remainingFilterSkippedStrides) {
auto rowType = ROW({{"c0", BIGINT()}, {"c1", BIGINT()}});
std::vector<RowVectorPtr> vectors(3);
Expand Down

0 comments on commit 576427a

Please sign in to comment.