Skip to content

Commit

Permalink
Filter out null map key entries by default
Browse files Browse the repository at this point in the history
Summary:
When reading from file, Presto Java filter out null maps by default, so
we need to do the same in Velox.

Differential Revision: D56522977
  • Loading branch information
Yuhta authored and facebook-github-bot committed Apr 25, 2024
1 parent a689fd4 commit c24bc58
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 8 deletions.
21 changes: 19 additions & 2 deletions velox/connectors/hive/HiveConnectorUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,19 @@ void checkColumnNameLowerCase(const core::TypedExprPtr& typeExpr) {
}
}

namespace {

void filterOutNullMapKeys(const Type& rootType, common::ScanSpec& rootSpec) {
rootSpec.visit(rootType, [](const Type& type, common::ScanSpec& spec) {
if (type.isMap()) {
spec.childByName(common::ScanSpec::kMapKeysFieldName)
->addFilter(common::IsNotNull());
}
});
}

} // namespace

std::shared_ptr<common::ScanSpec> makeScanSpec(
const RowTypePtr& rowType,
const folly::F14FastMap<std::string, std::vector<const common::Subfield*>>&
Expand Down Expand Up @@ -348,7 +361,8 @@ std::shared_ptr<common::ScanSpec> makeScanSpec(
auto& type = rowType->childAt(i);
auto it = outputSubfields.find(name);
if (it == outputSubfields.end()) {
spec->addFieldRecursively(name, *type, i);
auto* fieldSpec = spec->addFieldRecursively(name, *type, i);
filterOutNullMapKeys(*type, *fieldSpec);
filterSubfields.erase(name);
continue;
}
Expand All @@ -362,7 +376,9 @@ std::shared_ptr<common::ScanSpec> makeScanSpec(
}
filterSubfields.erase(it);
}
addSubfields(*type, subfieldSpecs, 1, pool, *spec->addField(name, i));
auto* fieldSpec = spec->addField(name, i);
addSubfields(*type, subfieldSpecs, 1, pool, *fieldSpec);
filterOutNullMapKeys(*type, *fieldSpec);
subfieldSpecs.clear();
}

Expand All @@ -376,6 +392,7 @@ std::shared_ptr<common::ScanSpec> makeScanSpec(
auto& type = dataColumns->findChild(fieldName);
auto* fieldSpec = spec->getOrCreateChild(common::Subfield(fieldName));
addSubfields(*type, subfieldSpecs, 1, pool, *fieldSpec);
filterOutNullMapKeys(*type, *fieldSpec);
subfieldSpecs.clear();
}
}
Expand Down
9 changes: 7 additions & 2 deletions velox/connectors/hive/tests/HiveConnectorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ groupSubfields(const std::vector<Subfield>& subfields) {
return grouped;
}

bool mapKeyIsNotNull(const ScanSpec& mapSpec) {
return dynamic_cast<IsNotNull*>(
mapSpec.childByName(ScanSpec::kMapKeysFieldName)->filter());
}

TEST_F(HiveConnectorTest, hiveConfig) {
ASSERT_EQ(
HiveConfig::insertExistingPartitionsBehaviorString(
Expand Down Expand Up @@ -210,7 +215,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_allSubscripts) {
pool_.get());
auto* c0 = scanSpec->childByName("c0");
ASSERT_TRUE(c0->flatMapFeatureSelection().empty());
ASSERT_FALSE(c0->childByName(ScanSpec::kMapKeysFieldName)->filter());
ASSERT_TRUE(mapKeyIsNotNull(*c0));
auto* values = c0->childByName(ScanSpec::kMapValuesFieldName);
ASSERT_EQ(
values->maxArrayElementsCount(),
Expand All @@ -229,7 +234,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_allSubscripts) {
{},
pool_.get());
auto* c0 = scanSpec->childByName("c0");
ASSERT_FALSE(c0->childByName(ScanSpec::kMapKeysFieldName)->filter());
ASSERT_TRUE(mapKeyIsNotNull(*c0));
auto* values = c0->childByName(ScanSpec::kMapValuesFieldName);
ASSERT_EQ(
values->maxArrayElementsCount(),
Expand Down
29 changes: 29 additions & 0 deletions velox/dwio/common/ScanSpec.h
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,10 @@ class ScanSpec {
flatMapFeatureSelection_ = std::move(features);
}

/// Invoke the function provided on each node of the ScanSpec tree.
template <typename F>
void visit(const Type& type, F&& f);

private:
void reorder();

Expand Down Expand Up @@ -403,6 +407,31 @@ class ScanSpec {
std::vector<std::string> flatMapFeatureSelection_;
};

template <typename F>
void ScanSpec::visit(const Type& type, F&& f) {
f(type, *this);
switch (type.kind()) {
case TypeKind::ROW:
for (auto& child : children_) {
VELOX_CHECK_NE(child->channel(), kNoChannel);
child->visit(*type.childAt(child->channel()), std::forward<F>(f));
}
break;
case TypeKind::MAP:
childByName(kMapKeysFieldName)
->visit(*type.childAt(0), std::forward<F>(f));
childByName(kMapValuesFieldName)
->visit(*type.childAt(1), std::forward<F>(f));
break;
case TypeKind::ARRAY:
childByName(kArrayElementsFieldName)
->visit(*type.childAt(0), std::forward<F>(f));
break;
default:
break;
}
}

// Returns false if no value from a range defined by stats can pass the
// filter. True, otherwise.
bool testFilter(
Expand Down
17 changes: 17 additions & 0 deletions velox/exec/tests/TableScanTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -975,6 +975,23 @@ TEST_F(TableScanTest, subfieldPruningArrayType) {
}
}

TEST_F(TableScanTest, skipNullMapKeys) {
auto vector = makeRowVector({makeMapVector(
{0, 2},
makeNullableFlatVector<int64_t>({std::nullopt, 2}),
makeFlatVector<int64_t>({1, 2}))});
auto rowType = asRowType(vector->type());
auto filePath = TempFilePath::create();
writeToFile(filePath->getPath(), {vector});
auto plan = PlanBuilder().tableScan(rowType).planNode();
auto split = makeHiveConnectorSplit(filePath->getPath());
auto expected = makeRowVector({makeMapVector(
{0, 1},
makeNullableFlatVector(std::vector<std::optional<int64_t>>(1, 2)),
makeFlatVector(std::vector<int64_t>(1, 2)))});
AssertQueryBuilder(plan).split(split).assertResults(expected);
}

// Test reading files written before schema change, e.g. missing newly added
// columns.
TEST_F(TableScanTest, missingColumns) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ TEST_F(MaxSizeForStatsTest, complexRecursiveGlobalAggregate) {
createMapOfArraysVector<int8_t, int64_t>({
{{1, std::nullopt}},
{{2, {{4, 5, std::nullopt}}}},
{{std::nullopt, {{7, 8, 9}}}},
{{3, {{7, 8, 9}}}},
}),
}),
})};
Expand Down Expand Up @@ -261,7 +261,7 @@ TEST_F(MaxSizeForStatsTest, dictionaryEncodingTest) {
createMapOfArraysVector<int8_t, int64_t>({
{{1, std::nullopt}},
{{2, {{4, 5, std::nullopt}}}},
{{std::nullopt, {{7, 8, 9}}}},
{{3, {{7, 8, 9}}}},
}),
});
vector_size_t size = 3;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ TEST_F(SumDataSizeForStatsTest, complexRecursiveGlobalAggregate) {
createMapOfArraysVector<int8_t, int64_t>({
{{1, std::nullopt}},
{{2, {{4, 5, std::nullopt}}}},
{{std::nullopt, {{7, 8, 9}}}},
{{3, {{7, 8, 9}}}},
}),
}),
})};
Expand Down Expand Up @@ -256,7 +256,7 @@ TEST_F(SumDataSizeForStatsTest, dictionaryEncodingTest) {
createMapOfArraysVector<int8_t, int64_t>({
{{1, std::nullopt}},
{{2, {{4, 5, std::nullopt}}}},
{{std::nullopt, {{7, 8, 9}}}},
{{3, {{7, 8, 9}}}},
}),
});
vector_size_t size = 3;
Expand Down

0 comments on commit c24bc58

Please sign in to comment.