Skip to content

Commit

Permalink
Support complext schema changed
Browse files Browse the repository at this point in the history
  • Loading branch information
JkSelf committed Dec 6, 2023
1 parent 9a3889a commit 2df5416
Showing 1 changed file with 13 additions and 23 deletions.
36 changes: 13 additions & 23 deletions velox/dwio/parquet/writer/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#include <arrow/c/bridge.h>
#include <arrow/io/interfaces.h>
#include <arrow/table.h>
#include <iostream>
#include "velox/vector/arrow/Bridge.h"

#include "velox/dwio/parquet/writer/Writer.h"
Expand Down Expand Up @@ -123,6 +122,7 @@ Compression::type getArrowParquetCompression(
}

namespace {

std::shared_ptr<WriterProperties> getArrowParquetWriterOptions(
const parquet::WriterOptions& options,
const std::unique_ptr<DefaultFlushPolicy>& flushPolicy) {
Expand All @@ -140,9 +140,9 @@ std::shared_ptr<WriterProperties> getArrowParquetWriterOptions(
return properties->build();
}

void validateSchema(const RowTypePtr schema) {
void validateSchemaRecursive(const RowTypePtr schema) {
// Check the schema's field names is not empty and unique.
VELOX_USER_CHECK_NOT_NULL(schema, "File schema must not be null.");
VELOX_USER_CHECK_NOT_NULL(schema, "File schema must not be empty.");
const auto& fieldNames = schema->names();

folly::F14FastSet<std::string> uniqueNames;
Expand All @@ -159,7 +159,7 @@ void validateSchema(const RowTypePtr schema) {
if (auto childSchema =
std::dynamic_pointer_cast<const RowType>(schema->childAt(i))) {
// Perform validation recursively for child RowTypePtr
validateSchema(childSchema);
validateSchemaRecursive(childSchema);
}
}
}
Expand All @@ -179,7 +179,7 @@ Writer::Writer(
options.bufferGrowRatio)),
arrowContext_(std::make_shared<ArrowContext>()),
schema_(std::move(schema)) {
validateSchema(schema_);
validateSchemaRecursive(schema_);

if (options.flushPolicyFactory) {
flushPolicy_ = options.flushPolicyFactory();
Expand Down Expand Up @@ -263,24 +263,14 @@ void Writer::write(const VectorPtr& dataToWrite) {
auto rowVector =
std::dynamic_pointer_cast<facebook::velox::RowVector>(dataToWrite);
VELOX_CHECK_NULL(rowVector->nulls());

// Update the schema for the complex type.
const auto& children = rowVector->children();
const auto size = schema_->size();
std::vector<VectorPtr> newChildren = children;
for (auto i = 0; i < size; i++) {
auto columnType = schema_->childAt(i);
bool isComplexType = std::dynamic_pointer_cast<const RowType>(columnType) ||
std::dynamic_pointer_cast<const MapType>(columnType) ||
std::dynamic_pointer_cast<const ArrayType>(columnType);

if (isComplexType) {
newChildren[i]->setType(columnType);
}
}

VectorPtr data = std::make_shared<facebook::velox::RowVector>(
rowVector->pool(), schema_, nullptr, rowVector->size(), newChildren);
rowVector->setType(schema_);

auto data = std::make_shared<facebook::velox::RowVector>(
rowVector->pool(),
schema_,
nullptr,
rowVector->size(),
rowVector->children());

ArrowOptions options{.flattenDictionary = true, .flattenConstant = true};
ArrowArray array;
Expand Down

0 comments on commit 2df5416

Please sign in to comment.