From ad63e903ea4c8f58145eddc1be8f2cb0af2f913f Mon Sep 17 00:00:00 2001 From: Jimmy Lu Date: Wed, 24 Apr 2024 10:05:19 -0700 Subject: [PATCH] Filter out null map key entries by default Summary: When reading from file, Presto Java filter out null maps by default, so we need to do the same in Velox. Differential Revision: D56522977 --- velox/connectors/hive/HiveConnectorUtil.cpp | 12 ++++++++- velox/connectors/hive/HiveConnectorUtil.h | 3 ++- velox/dwio/common/ScanSpec.h | 28 +++++++++++++++++++++ velox/exec/tests/TableScanTest.cpp | 17 +++++++++++++ 4 files changed, 58 insertions(+), 2 deletions(-) diff --git a/velox/connectors/hive/HiveConnectorUtil.cpp b/velox/connectors/hive/HiveConnectorUtil.cpp index 8189a92b812f2..42007e1fc90fa 100644 --- a/velox/connectors/hive/HiveConnectorUtil.cpp +++ b/velox/connectors/hive/HiveConnectorUtil.cpp @@ -329,7 +329,8 @@ std::shared_ptr makeScanSpec( partitionKeys, const std::unordered_map>& infoColumns, - memory::MemoryPool* pool) { + memory::MemoryPool* pool, + bool mapNullKeysEnabled) { auto spec = std::make_shared("root"); folly::F14FastMap> filterSubfields; @@ -397,6 +398,15 @@ std::shared_ptr makeScanSpec( fieldSpec->addFilter(*pair.second); } + if (!mapNullKeysEnabled) { + spec->visit(*rowType, [](const Type& type, common::ScanSpec& node) { + if (type.isMap()) { + node.childByName(common::ScanSpec::kMapKeysFieldName) + ->addFilter(common::IsNotNull()); + } + }); + } + return spec; } diff --git a/velox/connectors/hive/HiveConnectorUtil.h b/velox/connectors/hive/HiveConnectorUtil.h index 2742866d516bb..1793acbaa98ae 100644 --- a/velox/connectors/hive/HiveConnectorUtil.h +++ b/velox/connectors/hive/HiveConnectorUtil.h @@ -57,7 +57,8 @@ std::shared_ptr makeScanSpec( partitionKeys, const std::unordered_map>& infoColumns, - memory::MemoryPool* pool); + memory::MemoryPool* pool, + bool mapNullKeysEnabled = false); void configureReaderOptions( dwio::common::ReaderOptions& readerOptions, diff --git a/velox/dwio/common/ScanSpec.h b/velox/dwio/common/ScanSpec.h index e980baaf90209..600e5eaa252c5 100644 --- a/velox/dwio/common/ScanSpec.h +++ b/velox/dwio/common/ScanSpec.h @@ -322,6 +322,10 @@ class ScanSpec { flatMapFeatureSelection_ = std::move(features); } + /// Invoke the function provided on each node of the ScanSpec tree. + template + void visit(const Type& type, F&& f); + private: void reorder(); @@ -403,6 +407,30 @@ class ScanSpec { std::vector flatMapFeatureSelection_; }; +template +void ScanSpec::visit(const Type& type, F&& f) { + f(type, *this); + switch (type.kind()) { + case TypeKind::ROW: + for (auto& child : children_) { + child->visit(*type.childAt(child->channel()), std::forward(f)); + } + break; + case TypeKind::MAP: + childByName(kMapKeysFieldName) + ->visit(*type.childAt(0), std::forward(f)); + childByName(kMapValuesFieldName) + ->visit(*type.childAt(1), std::forward(f)); + break; + case TypeKind::ARRAY: + childByName(kArrayElementsFieldName) + ->visit(*type.childAt(0), std::forward(f)); + break; + default: + break; + } +} + // Returns false if no value from a range defined by stats can pass the // filter. True, otherwise. bool testFilter( diff --git a/velox/exec/tests/TableScanTest.cpp b/velox/exec/tests/TableScanTest.cpp index b79c75751c06d..624d18619ec26 100644 --- a/velox/exec/tests/TableScanTest.cpp +++ b/velox/exec/tests/TableScanTest.cpp @@ -975,6 +975,23 @@ TEST_F(TableScanTest, subfieldPruningArrayType) { } } +TEST_F(TableScanTest, skipNullMapKeys) { + auto vector = makeRowVector({makeMapVector( + {0, 2}, + makeNullableFlatVector({std::nullopt, 2}), + makeFlatVector({1, 2}))}); + auto rowType = asRowType(vector->type()); + auto filePath = TempFilePath::create(); + writeToFile(filePath->getPath(), {vector}); + auto plan = PlanBuilder().tableScan(rowType).planNode(); + auto split = makeHiveConnectorSplit(filePath->getPath()); + auto expected = makeRowVector({makeMapVector( + {0, 1}, + makeNullableFlatVector(std::vector>(1, 2)), + makeFlatVector(std::vector(1, 2)))}); + AssertQueryBuilder(plan).split(split).assertResults(expected); +} + // Test reading files written before schema change, e.g. missing newly added // columns. TEST_F(TableScanTest, missingColumns) {