diff --git a/velox/connectors/hive/HiveConnectorUtil.cpp b/velox/connectors/hive/HiveConnectorUtil.cpp
index 8189a92b812f2..ecdf6d2c48c29 100644
--- a/velox/connectors/hive/HiveConnectorUtil.cpp
+++ b/velox/connectors/hive/HiveConnectorUtil.cpp
@@ -319,6 +319,21 @@ void checkColumnNameLowerCase(const core::TypedExprPtr& typeExpr) {
   }
 }
 
+namespace {
+
+// Walks the scan spec tree rooted at 'rootSpec' and adds an IS NOT NULL filter
+// to the keys child of every node whose corresponding type is a MAP.
+void filterOutNullMapKeys(const Type& rootType, common::ScanSpec& rootSpec) {
+  rootSpec.visit(rootType, [](const Type& type, common::ScanSpec& spec) {
+    if (type.isMap()) {
+      spec.childByName(common::ScanSpec::kMapKeysFieldName)
+          ->addFilter(common::IsNotNull());
+    }
+  });
+}
+
+} // namespace
+
 std::shared_ptr<common::ScanSpec> makeScanSpec(
     const RowTypePtr& rowType,
     const folly::F14FastMap<std::string, std::vector<const common::Subfield*>>&
@@ -348,7 +363,8 @@ std::shared_ptr<common::ScanSpec> makeScanSpec(
     auto& type = rowType->childAt(i);
     auto it = outputSubfields.find(name);
     if (it == outputSubfields.end()) {
-      spec->addFieldRecursively(name, *type, i);
+      auto* fieldSpec = spec->addFieldRecursively(name, *type, i);
+      filterOutNullMapKeys(*type, *fieldSpec);
       filterSubfields.erase(name);
       continue;
     }
@@ -362,7 +378,9 @@ std::shared_ptr<common::ScanSpec> makeScanSpec(
       }
       filterSubfields.erase(it);
     }
-    addSubfields(*type, subfieldSpecs, 1, pool, *spec->addField(name, i));
+    auto* fieldSpec = spec->addField(name, i);
+    addSubfields(*type, subfieldSpecs, 1, pool, *fieldSpec);
+    filterOutNullMapKeys(*type, *fieldSpec);
     subfieldSpecs.clear();
   }
 
@@ -376,6 +394,7 @@ std::shared_ptr<common::ScanSpec> makeScanSpec(
       auto& type = dataColumns->findChild(fieldName);
       auto* fieldSpec = spec->getOrCreateChild(common::Subfield(fieldName));
       addSubfields(*type, subfieldSpecs, 1, pool, *fieldSpec);
+      filterOutNullMapKeys(*type, *fieldSpec);
       subfieldSpecs.clear();
     }
   }
diff --git a/velox/dwio/common/ScanSpec.h b/velox/dwio/common/ScanSpec.h
index e980baaf90209..9d998cce896af 100644
--- a/velox/dwio/common/ScanSpec.h
+++ b/velox/dwio/common/ScanSpec.h
@@ -322,6 +322,12 @@ class ScanSpec {
     flatMapFeatureSelection_ = std::move(features);
   }
 
+  /// Invoke the function provided on each node of the ScanSpec tree, visiting
+  /// a node before its children. 'f' is called as f(type, spec); 'type' must
+  /// be the type that corresponds to this node.
+  template <typename F>
+  void visit(const Type& type, F&& f);
+
  private:
   void reorder();
 
@@ -403,6 +409,32 @@ class ScanSpec {
   std::vector<std::string> flatMapFeatureSelection_;
 };
 
+template <typename F>
+void ScanSpec::visit(const Type& type, F&& f) {
+  f(type, *this);
+  switch (type.kind()) {
+    case TypeKind::ROW:
+      for (auto& child : children_) {
+        // Row children are paired with their types through the channel.
+        VELOX_CHECK_NE(child->channel(), kNoChannel);
+        child->visit(*type.childAt(child->channel()), std::forward<F>(f));
+      }
+      break;
+    case TypeKind::MAP:
+      childByName(kMapKeysFieldName)
+          ->visit(*type.childAt(0), std::forward<F>(f));
+      childByName(kMapValuesFieldName)
+          ->visit(*type.childAt(1), std::forward<F>(f));
+      break;
+    case TypeKind::ARRAY:
+      childByName(kArrayElementsFieldName)
+          ->visit(*type.childAt(0), std::forward<F>(f));
+      break;
+    default:
+      break;
+  }
+}
+
 // Returns false if no value from a range defined by stats can pass the
 // filter. True, otherwise.
 bool testFilter(
diff --git a/velox/exec/tests/TableScanTest.cpp b/velox/exec/tests/TableScanTest.cpp
index b79c75751c06d..624d18619ec26 100644
--- a/velox/exec/tests/TableScanTest.cpp
+++ b/velox/exec/tests/TableScanTest.cpp
@@ -975,6 +975,26 @@ TEST_F(TableScanTest, subfieldPruningArrayType) {
   }
 }
 
+TEST_F(TableScanTest, skipNullMapKeys) {
+  // The first row has two entries, one of them with a null key; the second row
+  // is empty. Only the entry with the non-null key is expected in the result.
+  // NOTE(review): int64_t element types are reconstructed here; confirm.
+  auto vector = makeRowVector({makeMapVector(
+      {0, 2},
+      makeNullableFlatVector<int64_t>({std::nullopt, 2}),
+      makeFlatVector<int64_t>({1, 2}))});
+  auto rowType = asRowType(vector->type());
+  auto filePath = TempFilePath::create();
+  writeToFile(filePath->getPath(), {vector});
+  auto plan = PlanBuilder().tableScan(rowType).planNode();
+  auto split = makeHiveConnectorSplit(filePath->getPath());
+  auto expected = makeRowVector({makeMapVector(
+      {0, 1},
+      makeNullableFlatVector(std::vector<std::optional<int64_t>>(1, 2)),
+      makeFlatVector(std::vector<int64_t>(1, 2)))});
+  AssertQueryBuilder(plan).split(split).assertResults(expected);
+}
+
 // Test reading files written before schema change, e.g. missing newly added
 // columns.
 TEST_F(TableScanTest, missingColumns) {