diff --git a/velox/connectors/hive/HiveConnectorUtil.cpp b/velox/connectors/hive/HiveConnectorUtil.cpp index 7ed63620ee3e..5910de1bbe5f 100644 --- a/velox/connectors/hive/HiveConnectorUtil.cpp +++ b/velox/connectors/hive/HiveConnectorUtil.cpp @@ -975,7 +975,7 @@ void updateDWRFWriterOptions( std::dynamic_pointer_cast(writerOptions); VELOX_CHECK_NOT_NULL( dwrfWriterOptions, "DWRF writer expected a DWRF WriterOptions object."); - std::map configs; + std::map configs = writerOptions->serdeParameters; if (writerOptions->compressionKind.has_value()) { configs.emplace( diff --git a/velox/exec/TableWriter.cpp b/velox/exec/TableWriter.cpp index 604a2ec00472..1e8e8df3c415 100644 --- a/velox/exec/TableWriter.cpp +++ b/velox/exec/TableWriter.cpp @@ -63,18 +63,31 @@ TableWriter::TableWriter( planNodeId(), connectorPool_, spillConfig_.has_value() ? &(spillConfig_.value()) : nullptr); + setTypeMappings(tableWriteNode); +} - auto names = tableWriteNode->columnNames(); - auto types = tableWriteNode->columns()->children(); +void TableWriter::setTypeMappings( + const std::shared_ptr& tableWriteNode) { + auto outputNames = tableWriteNode->columnNames(); + auto outputTypes = tableWriteNode->columns()->children(); const auto& inputType = tableWriteNode->sources()[0]->outputType(); - inputMapping_.reserve(types.size()); + // Ids that map input to output columns. + inputMapping_.reserve(outputTypes.size()); + std::vector inputTypes; + + // Generate mappings between input and output types. Note that column names + // must match, but in some cases the types won't, for example, when writing a + // struct (ROW) as a flat map (MAP). 
for (const auto& name : tableWriteNode->columns()->names()) { - inputMapping_.emplace_back(inputType->getChildIdx(name)); + auto idx = inputType->getChildIdx(name); + inputMapping_.emplace_back(idx); + inputTypes.emplace_back(inputType->childAt(idx)); } - mappedType_ = ROW(std::move(names), std::move(types)); + mappedOutputType_ = ROW(folly::copy(outputNames), std::move(outputTypes)); + mappedInputType_ = ROW(std::move(outputNames), std::move(inputTypes)); } void TableWriter::initialize() { @@ -88,7 +101,7 @@ void TableWriter::initialize() { void TableWriter::createDataSink() { dataSink_ = connector_->createDataSink( - mappedType_, + mappedOutputType_, insertTableHandle_, connectorQueryCtx_.get(), commitStrategy_); @@ -135,7 +148,7 @@ void TableWriter::addInput(RowVectorPtr input) { const auto mappedInput = std::make_shared( input->pool(), - mappedType_, + mappedInputType_, input->nulls(), input->size(), mappedChildren, diff --git a/velox/exec/TableWriter.h b/velox/exec/TableWriter.h index b71f24c14588..0edc36e9d38c 100644 --- a/velox/exec/TableWriter.h +++ b/velox/exec/TableWriter.h @@ -199,6 +199,11 @@ class TableWriter : public Operator { void updateStats(const connector::DataSink::Stats& stats); + // Sets type mappings in `inputMapping_`, `mappedInputType_`, and + // `mappedOutputType_`. + void setTypeMappings( + const std::shared_ptr& tableWriteNode); + std::string createTableCommitContext(bool lastOutput); void setConnectorMemoryReclaimer(); @@ -213,8 +218,16 @@ class TableWriter : public Operator { std::shared_ptr connector_; std::shared_ptr connectorQueryCtx_; std::unique_ptr dataSink_; + + // Contains the mappings between input and output columns. std::vector inputMapping_; - std::shared_ptr mappedType_; + + // Stores the mapped input and output types. Note that input types must have + // the same types as the types received in addInput(), but they may be in a + // different order. 
Output type may have different types to allow the writer + // to convert them (for example, when writing structs as flat maps). + std::shared_ptr mappedInputType_; + std::shared_ptr mappedOutputType_; // The blocking future might be set when finish data sink. ContinueFuture blockingFuture_{ContinueFuture::makeEmpty()}; diff --git a/velox/exec/tests/TableWriteTest.cpp b/velox/exec/tests/TableWriteTest.cpp index d0c420d7c37c..a94bfb94ce2c 100644 --- a/velox/exec/tests/TableWriteTest.cpp +++ b/velox/exec/tests/TableWriteTest.cpp @@ -1271,6 +1271,68 @@ TEST_F(BasicTableWriteTest, roundTrip) { assertEqualResults({data}, {copy}); } +// Generates a struct (row), write it as a flat map, and check that it is read +// back as a map. +TEST_F(BasicTableWriteTest, structAsMap) { + // Input struct type. + vector_size_t size = 1'000; + auto data = makeRowVector( + {"col1"}, + { + makeRowVector( + // Struct field names are the feature/map keys. + {"1", "2"}, + { + makeFlatVector(size, [](auto row) { return row; }), + makeFlatVector(size, [](auto row) { return row; }), + }), + }); + + // Write it as a flat map. + auto outputType = ROW({"col1"}, {MAP(INTEGER(), INTEGER())}); + auto targetDirectoryPath = TempDirectoryPath::create(); + std::string fileName = "output_file"; + + auto plan = PlanBuilder() + .values({data}) + .tableWrite( + targetDirectoryPath->getPath(), + {}, + 0, + {}, + {}, + dwio::common::FileFormat::DWRF, + {}, + PlanBuilder::kHiveDefaultConnectorId, + { + {"orc.flatten.map", "true"}, + {"orc.map.flat.cols", "0"}, + {"orc.map.flat.cols.struct.keys", "[[\"1\", \"2\"]]"}, + }, + nullptr, + fileName, + common::CompressionKind_NONE, + outputType) + .planNode(); + auto writerResults = AssertQueryBuilder(plan).copyResults(pool()); + + // Check we get the expected map after reading. + auto expected = makeRowVector( + {"col1"}, + { + makeMapVector( + size, + [](auto /*row*/) { return 2; }, + [](auto row) { return row % 2 == 0 ? 
2 : 1; }, + [](auto row) { return row / 2; }), + }); + plan = PlanBuilder().tableScan(outputType).planNode(); + AssertQueryBuilder(plan) + .split(makeHiveConnectorSplit( + targetDirectoryPath->getPath() + "/" + fileName)) + .assertResults(expected); +} + TEST_F(BasicTableWriteTest, targetFileName) { constexpr const char* kFileName = "test.dwrf"; auto data = makeRowVector({makeFlatVector(10, folly::identity)});