diff --git a/velox/connectors/hive/HiveConnectorUtil.cpp b/velox/connectors/hive/HiveConnectorUtil.cpp index de66b16ebfd8..38d3c19c4982 100644 --- a/velox/connectors/hive/HiveConnectorUtil.cpp +++ b/velox/connectors/hive/HiveConnectorUtil.cpp @@ -327,14 +327,25 @@ void checkColumnNameLowerCase(const core::TypedExprPtr& typeExpr) { namespace { -void filterOutNullMapKeys(const Type& rootType, common::ScanSpec& rootSpec) { - rootSpec.visit(rootType, [](const Type& type, common::ScanSpec& spec) { +void processFieldSpec( + const RowTypePtr& dataColumns, + const TypePtr& outputType, + common::ScanSpec& fieldSpec) { + fieldSpec.visit(*outputType, [](const Type& type, common::ScanSpec& spec) { if (type.isMap() && !spec.isConstant()) { auto* keys = spec.childByName(common::ScanSpec::kMapKeysFieldName); VELOX_CHECK_NOT_NULL(keys); keys->addFilter(common::IsNotNull()); } }); + if (dataColumns) { + auto i = dataColumns->getChildIdxIfExists(fieldSpec.fieldName()); + if (i.has_value()) { + if (dataColumns->childAt(*i)->isMap() && outputType->isRow()) { + fieldSpec.setFlatMapAsStruct(true); + } + } + } } } // namespace @@ -374,7 +385,7 @@ std::shared_ptr makeScanSpec( auto it = outputSubfields.find(name); if (it == outputSubfields.end()) { auto* fieldSpec = spec->addFieldRecursively(name, *type, i); - filterOutNullMapKeys(*type, *fieldSpec); + processFieldSpec(dataColumns, type, *fieldSpec); filterSubfields.erase(name); continue; } @@ -390,7 +401,7 @@ std::shared_ptr makeScanSpec( } auto* fieldSpec = spec->addField(name, i); addSubfields(*type, subfieldSpecs, 1, pool, *fieldSpec); - filterOutNullMapKeys(*type, *fieldSpec); + processFieldSpec(dataColumns, type, *fieldSpec); subfieldSpecs.clear(); } @@ -404,7 +415,7 @@ std::shared_ptr makeScanSpec( auto& type = dataColumns->findChild(fieldName); auto* fieldSpec = spec->getOrCreateChild(common::Subfield(fieldName)); addSubfields(*type, subfieldSpecs, 1, pool, *fieldSpec); - filterOutNullMapKeys(*type, *fieldSpec); + processFieldSpec(dataColumns, type, *fieldSpec); subfieldSpecs.clear(); } } diff --git a/velox/connectors/hive/SplitReader.cpp b/velox/connectors/hive/SplitReader.cpp index 43e00cabd287..625658c9fddb 100644 --- a/velox/connectors/hive/SplitReader.cpp +++ b/velox/connectors/hive/SplitReader.cpp @@ -380,10 +380,16 @@ std::vector SplitReader::adaptColumns( childSpec->setConstantValue(nullptr); auto outputTypeIdx = readerOutputType_->getChildIdxIfExists(fieldName); if (outputTypeIdx.has_value()) { - // We know the fieldName exists in the file, make the type at that - // position match what we expect in the output. - columnTypes[fileTypeIdx.value()] = - readerOutputType_->childAt(*outputTypeIdx); + auto& outputType = readerOutputType_->childAt(*outputTypeIdx); + auto& columnType = columnTypes[*fileTypeIdx]; + if (childSpec->isFlatMapAsStruct()) { + // Flat map column read as struct. Leave the schema type as MAP. + VELOX_CHECK(outputType->isRow() && columnType->isMap()); + } else { + // We know the fieldName exists in the file, make the type at that + // position match what we expect in the output. + columnType = outputType; + } } } } diff --git a/velox/dwio/common/ScanSpec.h b/velox/dwio/common/ScanSpec.h index ebde3478aec5..0e298c532975 100644 --- a/velox/dwio/common/ScanSpec.h +++ b/velox/dwio/common/ScanSpec.h @@ -328,6 +328,14 @@ class ScanSpec { template void visit(const Type& type, F&& f); + bool isFlatMapAsStruct() const { + return isFlatMapAsStruct_; + } + + void setFlatMapAsStruct(bool value) { + isFlatMapAsStruct_ = value; + } + private: void reorder(); @@ -407,6 +415,10 @@ class ScanSpec { // Used only for bulk reader to project flat map features. std::vector flatMapFeatureSelection_; + + // This node represents a flat map column that need to be read as struct, + // i.e. in table schema it is a MAP, but in result vector it is ROW. + bool isFlatMapAsStruct_ = false; }; template diff --git a/velox/dwio/common/SelectiveRepeatedColumnReader.cpp b/velox/dwio/common/SelectiveRepeatedColumnReader.cpp index 8d269fc6813c..7b164245f0f9 100644 --- a/velox/dwio/common/SelectiveRepeatedColumnReader.cpp +++ b/velox/dwio/common/SelectiveRepeatedColumnReader.cpp @@ -322,6 +322,10 @@ void SelectiveMapColumnReader::read( void SelectiveMapColumnReader::getValues(RowSet rows, VectorPtr* result) { VELOX_DCHECK_NOT_NULL(result); + VELOX_CHECK( + !result->get() || result->get()->type()->isMap(), + "Expect MAP result vector, got {}", + result->get()->type()->toString()); prepareResult(*result, requestedType_->type(), rows.size(), &memoryPool_); auto* resultMap = result->get()->asUnchecked(); makeOffsetsAndSizes(rows, *resultMap); diff --git a/velox/dwio/dwrf/reader/SelectiveFlatMapColumnReader.cpp b/velox/dwio/dwrf/reader/SelectiveFlatMapColumnReader.cpp index 2a48f8ca25f8..f565ffa70f54 100644 --- a/velox/dwio/dwrf/reader/SelectiveFlatMapColumnReader.cpp +++ b/velox/dwio/dwrf/reader/SelectiveFlatMapColumnReader.cpp @@ -186,8 +186,7 @@ class SelectiveFlatMapAsStructReader : public SelectiveStructColumnReaderBase { const std::shared_ptr& requestedType, const std::shared_ptr& fileType, DwrfParams& params, - common::ScanSpec& scanSpec, - const std::vector& /*keys*/) + common::ScanSpec& scanSpec) : SelectiveStructColumnReaderBase( requestedType, fileType, @@ -246,12 +245,9 @@ std::unique_ptr createReader( const std::shared_ptr& fileType, DwrfParams& params, common::ScanSpec& scanSpec) { - auto& mapColumnIdAsStruct = - params.stripeStreams().getRowReaderOptions().getMapColumnIdAsStruct(); - auto it = mapColumnIdAsStruct.find(requestedType->id()); - if (it != mapColumnIdAsStruct.end()) { + if (scanSpec.isFlatMapAsStruct()) { return std::make_unique>( - requestedType, fileType, params, scanSpec, it->second); + requestedType, fileType, params, scanSpec); } else { return std::make_unique>( requestedType, fileType, params, scanSpec); diff --git a/velox/dwio/dwrf/test/E2EFilterTest.cpp b/velox/dwio/dwrf/test/E2EFilterTest.cpp index 5f2646af2f7d..43b67e91e550 100644 --- a/velox/dwio/dwrf/test/E2EFilterTest.cpp +++ b/velox/dwio/dwrf/test/E2EFilterTest.cpp @@ -93,8 +93,8 @@ class E2EFilterTest : public E2EFilterTestBase { dwio::common::RowReaderOptions& opts, const std::shared_ptr& spec) override { E2EFilterTestBase::setUpRowReaderOptions(opts, spec); - if (!flatmapNodeIdsAsStruct_.empty()) { - opts.setFlatmapNodeIdsAsStruct(flatmapNodeIdsAsStruct_); + for (auto& field : flatMapAsStructFields_) { + spec->childByName(field)->setFlatMapAsStruct(true); } } @@ -127,6 +127,7 @@ class E2EFilterTest : public E2EFilterTestBase { mapFlatColsStructKeys.back().push_back(name); } columnTypes[i] = MAP(VARCHAR(), columnTypes[i]->childAt(0)); + flatMapAsStructFields_.push_back(rowType.nameOf(i)); } writerSchema = ROW( std::vector(rowType.names()), std::move(columnTypes)); @@ -137,10 +138,6 @@ class E2EFilterTest : public E2EFilterTestBase { } auto& child = schemaWithId->childAt(i); mapFlatCols.push_back(child->column()); - if (!rowType.childAt(i)->isRow()) { - continue; - } - flatmapNodeIdsAsStruct_[child->id()] = mapFlatColsStructKeys[i]; } config->set(dwrf::Config::FLATTEN_MAP, true); config->set(dwrf::Config::MAP_FLAT_DISABLE_DICT_ENCODING, false); @@ -158,8 +155,7 @@ class E2EFilterTest : public E2EFilterTestBase { } std::unique_ptr writer_; - std::unordered_map> - flatmapNodeIdsAsStruct_; + std::vector flatMapAsStructFields_; }; TEST_F(E2EFilterTest, integerDirect) { diff --git a/velox/exec/tests/TableScanTest.cpp b/velox/exec/tests/TableScanTest.cpp index 7b82e0475b97..5d5296024843 100644 --- a/velox/exec/tests/TableScanTest.cpp +++ b/velox/exec/tests/TableScanTest.cpp @@ -4142,6 +4142,31 @@ TEST_F(TableScanTest, partitionKeyNotMatchPartitionKeysHandle) { assertQuery(op, split, "SELECT c0 FROM tmp"); } +TEST_F(TableScanTest, readFlatMapAsStruct) { + constexpr int kSize = 10; + std::vector keys = {"1", "2", "3"}; + auto vector = makeRowVector({makeRowVector( + keys, + { + makeFlatVector(kSize, folly::identity), + makeFlatVector(kSize, folly::identity, nullEvery(5)), + makeFlatVector(kSize, folly::identity, nullEvery(7)), + })}); + auto config = std::make_shared(); + config->set(dwrf::Config::FLATTEN_MAP, true); + config->set>(dwrf::Config::MAP_FLAT_COLS, {0}); + config->set>>( + dwrf::Config::MAP_FLAT_COLS_STRUCT_KEYS, {keys}); + auto file = TempFilePath::create(); + auto writeSchema = ROW({"c0"}, {MAP(INTEGER(), BIGINT())}); + writeToFile(file->getPath(), {vector}, config, writeSchema); + auto readSchema = asRowType(vector->type()); + auto plan = + PlanBuilder().tableScan(readSchema, {}, "", writeSchema).planNode(); + auto split = makeHiveConnectorSplit(file->getPath()); + AssertQueryBuilder(plan).split(split).assertResults(vector); +} + // TODO: re-enable this test once we add back driver suspension support for // table scan. TEST_F(TableScanTest, DISABLED_memoryArbitrationWithSlowTableScan) { diff --git a/velox/exec/tests/utils/HiveConnectorTestBase.cpp b/velox/exec/tests/utils/HiveConnectorTestBase.cpp index 224b379da1a5..ed1882f77a35 100644 --- a/velox/exec/tests/utils/HiveConnectorTestBase.cpp +++ b/velox/exec/tests/utils/HiveConnectorTestBase.cpp @@ -71,9 +71,17 @@ void HiveConnectorTestBase::writeToFile( const std::string& filePath, const std::vector& vectors, std::shared_ptr config) { + writeToFile(filePath, vectors, std::move(config), vectors[0]->type()); +} + +void HiveConnectorTestBase::writeToFile( + const std::string& filePath, + const std::vector& vectors, + std::shared_ptr config, + const TypePtr& schema) { velox::dwrf::WriterOptions options; options.config = config; - options.schema = vectors[0]->type(); + options.schema = schema; auto localWriteFile = std::make_unique(filePath, true, false); auto sink = std::make_unique( std::move(localWriteFile), filePath); diff --git a/velox/exec/tests/utils/HiveConnectorTestBase.h b/velox/exec/tests/utils/HiveConnectorTestBase.h index d35cfbeb1cdd..ebc4437fbe2a 100644 --- a/velox/exec/tests/utils/HiveConnectorTestBase.h +++ b/velox/exec/tests/utils/HiveConnectorTestBase.h @@ -49,6 +49,12 @@ class HiveConnectorTestBase : public OperatorTestBase { std::shared_ptr config = std::make_shared()); + void writeToFile( + const std::string& filePath, + const std::vector& vectors, + std::shared_ptr config, + const TypePtr& schema); + std::vector makeVectors( const RowTypePtr& rowType, int32_t numVectors,