Skip to content

Commit

Permalink
fix(table-writer): Allow structs to be written as flat maps (#11909)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #11909

Fixing table writer type mapping to allow output and input types to
differ to enable plans that write structs as flat maps. Also fixing
configurations passing in dwrf not to ignore writerOption serdeParameters.

Reviewed By: Yuhta

Differential Revision: D67415905

fbshipit-source-id: de6aea27549ffa78992211f5aeb2106314422f7c
  • Loading branch information
pedroerp authored and facebook-github-bot committed Dec 19, 2024
1 parent 51a5214 commit 3a9089a
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 9 deletions.
2 changes: 1 addition & 1 deletion velox/connectors/hive/HiveConnectorUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -975,7 +975,7 @@ void updateDWRFWriterOptions(
std::dynamic_pointer_cast<dwrf::WriterOptions>(writerOptions);
VELOX_CHECK_NOT_NULL(
dwrfWriterOptions, "DWRF writer expected a DWRF WriterOptions object.");
std::map<std::string, std::string> configs;
std::map<std::string, std::string> configs = writerOptions->serdeParameters;

if (writerOptions->compressionKind.has_value()) {
configs.emplace(
Expand Down
27 changes: 20 additions & 7 deletions velox/exec/TableWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,18 +63,31 @@ TableWriter::TableWriter(
planNodeId(),
connectorPool_,
spillConfig_.has_value() ? &(spillConfig_.value()) : nullptr);
setTypeMappings(tableWriteNode);
}

auto names = tableWriteNode->columnNames();
auto types = tableWriteNode->columns()->children();
void TableWriter::setTypeMappings(
const std::shared_ptr<const core::TableWriteNode>& tableWriteNode) {
auto outputNames = tableWriteNode->columnNames();
auto outputTypes = tableWriteNode->columns()->children();

const auto& inputType = tableWriteNode->sources()[0]->outputType();

inputMapping_.reserve(types.size());
// Ids that map input to output columns.
inputMapping_.reserve(outputTypes.size());
std::vector<TypePtr> inputTypes;

// Generate mappings between input and output types. Note that column names
// must match, but in some case the types won't, for example, when writing a
// struct (ROW) as a flat map (MAP).
for (const auto& name : tableWriteNode->columns()->names()) {
inputMapping_.emplace_back(inputType->getChildIdx(name));
auto idx = inputType->getChildIdx(name);
inputMapping_.emplace_back(idx);
inputTypes.emplace_back(inputType->childAt(idx));
}

mappedType_ = ROW(std::move(names), std::move(types));
mappedOutputType_ = ROW(folly::copy(outputNames), std::move(outputTypes));
mappedInputType_ = ROW(std::move(outputNames), std::move(inputTypes));
}

void TableWriter::initialize() {
Expand All @@ -88,7 +101,7 @@ void TableWriter::initialize() {

void TableWriter::createDataSink() {
dataSink_ = connector_->createDataSink(
mappedType_,
mappedOutputType_,
insertTableHandle_,
connectorQueryCtx_.get(),
commitStrategy_);
Expand Down Expand Up @@ -135,7 +148,7 @@ void TableWriter::addInput(RowVectorPtr input) {

const auto mappedInput = std::make_shared<RowVector>(
input->pool(),
mappedType_,
mappedInputType_,
input->nulls(),
input->size(),
mappedChildren,
Expand Down
15 changes: 14 additions & 1 deletion velox/exec/TableWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,11 @@ class TableWriter : public Operator {

void updateStats(const connector::DataSink::Stats& stats);

// Sets type mappings in `inputMapping_`, `mappedInputType_`, and
// `mappedOutputType_`.
void setTypeMappings(
const std::shared_ptr<const core::TableWriteNode>& tableWriteNode);

std::string createTableCommitContext(bool lastOutput);

void setConnectorMemoryReclaimer();
Expand All @@ -213,8 +218,16 @@ class TableWriter : public Operator {
std::shared_ptr<connector::Connector> connector_;
std::shared_ptr<connector::ConnectorQueryCtx> connectorQueryCtx_;
std::unique_ptr<connector::DataSink> dataSink_;

// Contains the mappings between input and output columns.
std::vector<column_index_t> inputMapping_;
std::shared_ptr<const RowType> mappedType_;

// Stores the mapped input and output types. Note that input types must have
// the same types as the types receing in addInput(), but they may be in a
// different order. Output type may have different types to allow the writer
// to convert them (for example, when writing structs as flap maps).
std::shared_ptr<const RowType> mappedInputType_;
std::shared_ptr<const RowType> mappedOutputType_;

// The blocking future might be set when finish data sink.
ContinueFuture blockingFuture_{ContinueFuture::makeEmpty()};
Expand Down
62 changes: 62 additions & 0 deletions velox/exec/tests/TableWriteTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1271,6 +1271,68 @@ TEST_F(BasicTableWriteTest, roundTrip) {
assertEqualResults({data}, {copy});
}

// Generates a struct (row), write it as a flap map, and check that it is read
// back as a map.
TEST_F(BasicTableWriteTest, structAsMap) {
// Input struct type.
vector_size_t size = 1'000;
auto data = makeRowVector(
{"col1"},
{
makeRowVector(
// Struct field names are the feature/map keys.
{"1", "2"},
{
makeFlatVector<int32_t>(size, [](auto row) { return row; }),
makeFlatVector<int32_t>(size, [](auto row) { return row; }),
}),
});

// Write it as a flat map.
auto outputType = ROW({"col1"}, {MAP(INTEGER(), INTEGER())});
auto targetDirectoryPath = TempDirectoryPath::create();
std::string fileName = "output_file";

auto plan = PlanBuilder()
.values({data})
.tableWrite(
targetDirectoryPath->getPath(),
{},
0,
{},
{},
dwio::common::FileFormat::DWRF,
{},
PlanBuilder::kHiveDefaultConnectorId,
{
{"orc.flatten.map", "true"},
{"orc.map.flat.cols", "0"},
{"orc.map.flat.cols.struct.keys", "[[\"1\", \"2\"]]"},
},
nullptr,
fileName,
common::CompressionKind_NONE,
outputType)
.planNode();
auto writerResults = AssertQueryBuilder(plan).copyResults(pool());

// Check we get the expected map after reading.
auto expected = makeRowVector(
{"col1"},
{
makeMapVector<int32_t, int32_t>(
size,
[](auto /*row*/) { return 2; },
[](auto row) { return row % 2 == 0 ? 2 : 1; },
[](auto row) { return row / 2; }),
});
plan = PlanBuilder().tableScan(outputType).planNode();
AssertQueryBuilder(plan)
.split(makeHiveConnectorSplit(
targetDirectoryPath->getPath() + "/" + fileName))
.assertResults(expected);
}

TEST_F(BasicTableWriteTest, targetFileName) {
constexpr const char* kFileName = "test.dwrf";
auto data = makeRowVector({makeFlatVector<int64_t>(10, folly::identity)});
Expand Down

0 comments on commit 3a9089a

Please sign in to comment.