Skip to content

Commit

Permalink
Filter out null map key entries by default
Browse files Browse the repository at this point in the history
Summary:
When reading from file, Presto Java filter out null maps by default, so
we need to do the same in Velox.

Differential Revision: D56522977
  • Loading branch information
Yuhta authored and facebook-github-bot committed Apr 24, 2024
1 parent 0643fa5 commit 2e54a2f
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 4 deletions.
28 changes: 25 additions & 3 deletions velox/connectors/hive/HiveConnectorUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,17 @@ void checkColumnNameLowerCase(const core::TypedExprPtr& typeExpr) {
}
}

namespace {
void filterOutNullMapKeys(const Type& rootType, common::ScanSpec& rootSpec) {
rootSpec.visit(rootType, [](const Type& type, common::ScanSpec& spec) {
if (type.isMap()) {
spec.childByName(common::ScanSpec::kMapKeysFieldName)
->addFilter(common::IsNotNull());
}
});
}
} // namespace

std::shared_ptr<common::ScanSpec> makeScanSpec(
const RowTypePtr& rowType,
const folly::F14FastMap<std::string, std::vector<const common::Subfield*>>&
Expand All @@ -329,7 +340,8 @@ std::shared_ptr<common::ScanSpec> makeScanSpec(
partitionKeys,
const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
infoColumns,
memory::MemoryPool* pool) {
memory::MemoryPool* pool,
bool mapNullKeysEnabled) {
auto spec = std::make_shared<common::ScanSpec>("root");
folly::F14FastMap<std::string, std::vector<const common::Subfield*>>
filterSubfields;
Expand All @@ -348,7 +360,10 @@ std::shared_ptr<common::ScanSpec> makeScanSpec(
auto& type = rowType->childAt(i);
auto it = outputSubfields.find(name);
if (it == outputSubfields.end()) {
spec->addFieldRecursively(name, *type, i);
auto* fieldSpec = spec->addFieldRecursively(name, *type, i);
if (!mapNullKeysEnabled) {
filterOutNullMapKeys(*type, *fieldSpec);
}
filterSubfields.erase(name);
continue;
}
Expand All @@ -362,7 +377,11 @@ std::shared_ptr<common::ScanSpec> makeScanSpec(
}
filterSubfields.erase(it);
}
addSubfields(*type, subfieldSpecs, 1, pool, *spec->addField(name, i));
auto* fieldSpec = spec->addField(name, i);
addSubfields(*type, subfieldSpecs, 1, pool, *fieldSpec);
if (!mapNullKeysEnabled) {
filterOutNullMapKeys(*type, *fieldSpec);
}
subfieldSpecs.clear();
}

Expand All @@ -376,6 +395,9 @@ std::shared_ptr<common::ScanSpec> makeScanSpec(
auto& type = dataColumns->findChild(fieldName);
auto* fieldSpec = spec->getOrCreateChild(common::Subfield(fieldName));
addSubfields(*type, subfieldSpecs, 1, pool, *fieldSpec);
if (!mapNullKeysEnabled) {
filterOutNullMapKeys(*type, *fieldSpec);
}
subfieldSpecs.clear();
}
}
Expand Down
3 changes: 2 additions & 1 deletion velox/connectors/hive/HiveConnectorUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ std::shared_ptr<common::ScanSpec> makeScanSpec(
partitionKeys,
const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
infoColumns,
memory::MemoryPool* pool);
memory::MemoryPool* pool,
bool mapNullKeysEnabled = false);

void configureReaderOptions(
dwio::common::ReaderOptions& readerOptions,
Expand Down
29 changes: 29 additions & 0 deletions velox/dwio/common/ScanSpec.h
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,10 @@ class ScanSpec {
flatMapFeatureSelection_ = std::move(features);
}

/// Invoke the function provided on each node of the ScanSpec tree.
template <typename F>
void visit(const Type& type, F&& f);

private:
void reorder();

Expand Down Expand Up @@ -403,6 +407,31 @@ class ScanSpec {
std::vector<std::string> flatMapFeatureSelection_;
};

template <typename F>
void ScanSpec::visit(const Type& type, F&& f) {
f(type, *this);
switch (type.kind()) {
case TypeKind::ROW:
for (auto& child : children_) {
VELOX_CHECK_NE(child->channel(), kNoChannel);
child->visit(*type.childAt(child->channel()), std::forward<F>(f));
}
break;
case TypeKind::MAP:
childByName(kMapKeysFieldName)
->visit(*type.childAt(0), std::forward<F>(f));
childByName(kMapValuesFieldName)
->visit(*type.childAt(1), std::forward<F>(f));
break;
case TypeKind::ARRAY:
childByName(kArrayElementsFieldName)
->visit(*type.childAt(0), std::forward<F>(f));
break;
default:
break;
}
}

// Returns false if no value from a range defined by stats can pass the
// filter. True, otherwise.
bool testFilter(
Expand Down
17 changes: 17 additions & 0 deletions velox/exec/tests/TableScanTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -975,6 +975,23 @@ TEST_F(TableScanTest, subfieldPruningArrayType) {
}
}

TEST_F(TableScanTest, skipNullMapKeys) {
auto vector = makeRowVector({makeMapVector(
{0, 2},
makeNullableFlatVector<int64_t>({std::nullopt, 2}),
makeFlatVector<int64_t>({1, 2}))});
auto rowType = asRowType(vector->type());
auto filePath = TempFilePath::create();
writeToFile(filePath->getPath(), {vector});
auto plan = PlanBuilder().tableScan(rowType).planNode();
auto split = makeHiveConnectorSplit(filePath->getPath());
auto expected = makeRowVector({makeMapVector(
{0, 1},
makeNullableFlatVector(std::vector<std::optional<int64_t>>(1, 2)),
makeFlatVector(std::vector<int64_t>(1, 2)))});
AssertQueryBuilder(plan).split(split).assertResults(expected);
}

// Test reading files written before schema change, e.g. missing newly added
// columns.
TEST_F(TableScanTest, missingColumns) {
Expand Down

0 comments on commit 2e54a2f

Please sign in to comment.