diff --git a/velox/dwio/parquet/writer/Writer.cpp b/velox/dwio/parquet/writer/Writer.cpp index 54f875fce0eb..002e2c42ce1c 100644 --- a/velox/dwio/parquet/writer/Writer.cpp +++ b/velox/dwio/parquet/writer/Writer.cpp @@ -224,10 +224,11 @@ dwio::common::StripeProgress getStripeProgress( * This method assumes each input `ColumnarBatch` have same schema. */ void Writer::write(const VectorPtr& data) { + ArrowOptions options{.flattenDictionary = true, .flattenConstant = true}; ArrowArray array; ArrowSchema schema; - exportToArrow(data, array, generalPool_.get()); - exportToArrow(data, schema); + exportToArrow(data, array, generalPool_.get(), options); + exportToArrow(data, schema, options); PARQUET_ASSIGN_OR_THROW( auto recordBatch, ::arrow::ImportRecordBatch(&array, &schema)); if (!arrowContext_->schema) { diff --git a/velox/exec/tests/TableWriteTest.cpp b/velox/exec/tests/TableWriteTest.cpp index 8848330dbecc..1b818b7821f9 100644 --- a/velox/exec/tests/TableWriteTest.cpp +++ b/velox/exec/tests/TableWriteTest.cpp @@ -1073,8 +1073,10 @@ class PartitionedTableWriterTest static std::vector getTestParams() { std::vector testParams; const std::vector multiDriverOptions = {false, true}; - // Add Parquet with https://github.com/facebookincubator/velox/issues/5560 std::vector fileFormats = {FileFormat::DWRF}; + if (hasWriterFactory(FileFormat::PARQUET)) { + fileFormats.push_back(FileFormat::PARQUET); + } for (bool multiDrivers : multiDriverOptions) { for (FileFormat fileFormat : fileFormats) { testParams.push_back(TestParam{ @@ -1185,8 +1187,10 @@ class BucketedTableOnlyWriteTest static std::vector getTestParams() { std::vector testParams; const std::vector multiDriverOptions = {false, true}; - // Add Parquet with https://github.com/facebookincubator/velox/issues/5560 std::vector fileFormats = {FileFormat::DWRF}; + if (hasWriterFactory(FileFormat::PARQUET)) { + fileFormats.push_back(FileFormat::PARQUET); + } for (bool multiDrivers : multiDriverOptions) { for (FileFormat fileFormat : fileFormats) { testParams.push_back(TestParam{ @@ -1313,8 +1317,10 @@ class PartitionedWithoutBucketTableWriterTest static std::vector getTestParams() { std::vector testParams; const std::vector multiDriverOptions = {false, true}; - // Add Parquet with https://github.com/facebookincubator/velox/issues/5560 std::vector fileFormats = {FileFormat::DWRF}; + if (hasWriterFactory(FileFormat::PARQUET)) { + fileFormats.push_back(FileFormat::PARQUET); + } for (bool multiDrivers : multiDriverOptions) { for (FileFormat fileFormat : fileFormats) { testParams.push_back(TestParam{ @@ -1349,8 +1355,10 @@ class AllTableWriterTest : public TableWriteTest, static std::vector getTestParams() { std::vector testParams; const std::vector multiDriverOptions = {false, true}; - // Add Parquet with https://github.com/facebookincubator/velox/issues/5560 std::vector fileFormats = {FileFormat::DWRF}; + if (hasWriterFactory(FileFormat::PARQUET)) { + fileFormats.push_back(FileFormat::PARQUET); + } for (bool multiDrivers : multiDriverOptions) { for (FileFormat fileFormat : fileFormats) { testParams.push_back(TestParam{ diff --git a/velox/vector/arrow/Bridge.cpp b/velox/vector/arrow/Bridge.cpp index 39ed59fa91ae..180bc720de95 100644 --- a/velox/vector/arrow/Bridge.cpp +++ b/velox/vector/arrow/Bridge.cpp @@ -21,7 +21,9 @@ #include "velox/common/base/CheckedArithmetic.h" #include "velox/common/base/Exceptions.h" #include "velox/vector/ComplexVector.h" +#include "velox/vector/DictionaryVector.h" #include "velox/vector/FlatVector.h" +#include "velox/vector/VectorTypeUtils.h" #include "velox/vector/arrow/Abi.h" namespace facebook::velox { @@ -614,14 +616,16 @@ void exportToArrowImpl( const BaseVector&, const Selection&, ArrowArray&, - memory::MemoryPool*); + memory::MemoryPool*, + const ArrowOptions& options); void exportRows( const RowVector& vec, const Selection& rows, ArrowArray& out, memory::MemoryPool* pool, - VeloxToArrowBridgeHolder& holder) { + VeloxToArrowBridgeHolder& holder, + const ArrowOptions& options) { out.n_buffers = 1; holder.resizeChildren(vec.childrenSize()); out.n_children = vec.childrenSize(); @@ -632,7 +636,8 @@ void exportRows( *vec.childAt(i)->loadedVector(), rows, *holder.allocateChild(i), - pool); + pool, + options); } catch (const VeloxException&) { for (column_index_t j = 0; j < i; ++j) { // When exception is thrown, i th child is guaranteed unset. @@ -693,7 +698,8 @@ void exportArrays( const Selection& rows, ArrowArray& out, memory::MemoryPool* pool, - VeloxToArrowBridgeHolder& holder) { + VeloxToArrowBridgeHolder& holder, + const ArrowOptions& options) { Selection childRows(vec.elements()->size()); exportOffsets(vec, rows, out, pool, holder, childRows); holder.resizeChildren(1); @@ -701,7 +707,8 @@ void exportArrays( *vec.elements()->loadedVector(), childRows, *holder.allocateChild(0), - pool); + pool, + options); out.n_children = 1; out.children = holder.getChildrenArrays(); } @@ -711,7 +718,8 @@ void exportMaps( const Selection& rows, ArrowArray& out, memory::MemoryPool* pool, - VeloxToArrowBridgeHolder& holder) { + VeloxToArrowBridgeHolder& holder, + const ArrowOptions& options) { RowVector child( pool, ROW({"key", "value"}, {vec.mapKeys()->type(), vec.mapValues()->type()}), @@ -721,11 +729,42 @@ void exportMaps( Selection childRows(child.size()); exportOffsets(vec, rows, out, pool, holder, childRows); holder.resizeChildren(1); - exportToArrowImpl(child, childRows, *holder.allocateChild(0), pool); + exportToArrowImpl(child, childRows, *holder.allocateChild(0), pool, options); out.n_children = 1; out.children = holder.getChildrenArrays(); } +template +void flattenAndExport( + const BaseVector& vec, + const Selection& rows, + ArrowArray& out, + memory::MemoryPool* pool, + VeloxToArrowBridgeHolder& holder) { + using NativeType = typename velox::TypeTraits::NativeType; + SelectivityVector allRows(vec.size()); + DecodedVector decoded(vec, allRows); + auto flatVector = BaseVector::create>( + vec.type(), decoded.size(), pool); + + if (decoded.mayHaveNulls()) { + allRows.applyToSelected([&](vector_size_t row) { + if (decoded.isNullAt(row)) { + flatVector->setNull(row, true); + } else { + flatVector->set(row, decoded.valueAt(row)); + } + }); + exportValidityBitmap(*flatVector, rows, out, pool, holder); + exportFlat(*flatVector, rows, out, pool, holder); + } else { + allRows.applyToSelected([&](vector_size_t row) { + flatVector->set(row, decoded.valueAt(row)); + }); + exportFlat(*flatVector, rows, out, pool, holder); + } +} + void exportDictionary( const BaseVector& vec, const Selection& rows, @@ -743,7 +782,22 @@ void exportDictionary( } auto& values = *vec.valueVector()->loadedVector(); out.dictionary = holder.allocateDictionary(); - exportToArrowImpl(values, Selection(values.size()), *out.dictionary, pool); + exportToArrowImpl( + values, Selection(values.size()), *out.dictionary, pool, ArrowOptions()); +} + +void exportFlattenedVector( + const BaseVector& vec, + const Selection& rows, + ArrowArray& out, + memory::MemoryPool* pool, + VeloxToArrowBridgeHolder& holder) { + VELOX_CHECK( + vec.valueVector() == nullptr || vec.wrappedVector()->isFlatEncoding(), + "An unsupported nested encoding was found."); + VELOX_CHECK(vec.isScalar(), "Flattening is only supported for scalar types."); + VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH( + flattenAndExport, vec.typeKind(), vec, rows, out, pool, holder); } // Set the array as using "Null Layout" - no buffers are allocated. @@ -790,7 +844,7 @@ void exportConstantValue( wrapInBufferViewAsViewer(vec.valuesAsVoid(), bufferSize), vec.mayHaveNulls() ? 1 : 0); } - exportToArrowImpl(*valuesVector, selection, out, pool); + exportToArrowImpl(*valuesVector, selection, out, pool, ArrowOptions()); } // Velox constant vectors are exported as Arrow REE containing a single run @@ -837,7 +891,8 @@ void exportToArrowImpl( const BaseVector& vec, const Selection& rows, ArrowArray& out, - memory::MemoryPool* pool) { + memory::MemoryPool* pool, + const ArrowOptions& options) { auto holder = std::make_unique(); out.buffers = holder->getArrowBuffers(); out.length = rows.count(); @@ -850,19 +905,26 @@ void exportToArrowImpl( exportFlat(vec, rows, out, pool, *holder); break; case VectorEncoding::Simple::ROW: - exportRows(*vec.asUnchecked(), rows, out, pool, *holder); + exportRows( + *vec.asUnchecked(), rows, out, pool, *holder, options); break; case VectorEncoding::Simple::ARRAY: - exportArrays(*vec.asUnchecked(), rows, out, pool, *holder); + exportArrays( + *vec.asUnchecked(), rows, out, pool, *holder, options); break; case VectorEncoding::Simple::MAP: - exportMaps(*vec.asUnchecked(), rows, out, pool, *holder); + exportMaps( + *vec.asUnchecked(), rows, out, pool, *holder, options); break; case VectorEncoding::Simple::DICTIONARY: - exportDictionary(vec, rows, out, pool, *holder); + options.flattenDictionary + ? exportFlattenedVector(vec, rows, out, pool, *holder) + : exportDictionary(vec, rows, out, pool, *holder); break; case VectorEncoding::Simple::CONSTANT: - exportConstant(vec, rows, out, pool, *holder); + options.flattenConstant + ? exportFlattenedVector(vec, rows, out, pool, *holder) + : exportConstant(vec, rows, out, pool, *holder); break; default: VELOX_NYI("{} cannot be exported to Arrow yet.", vec.encoding()); @@ -986,11 +1048,16 @@ TypePtr importFromArrowImpl( void exportToArrow( const VectorPtr& vector, ArrowArray& arrowArray, - memory::MemoryPool* pool) { - exportToArrowImpl(*vector, Selection(vector->size()), arrowArray, pool); + memory::MemoryPool* pool, + const ArrowOptions& options) { + exportToArrowImpl( + *vector, Selection(vector->size()), arrowArray, pool, options); } -void exportToArrow(const VectorPtr& vec, ArrowSchema& arrowSchema) { +void exportToArrow( + const VectorPtr& vec, + ArrowSchema& arrowSchema, + const ArrowOptions& options) { auto& type = vec->type(); arrowSchema.name = nullptr; @@ -1007,12 +1074,20 @@ void exportToArrow(const VectorPtr& vec, ArrowSchema& arrowSchema) { if (vec->encoding() == VectorEncoding::Simple::DICTIONARY) { arrowSchema.n_children = 0; arrowSchema.children = nullptr; - arrowSchema.format = "i"; - bridgeHolder->dictionary = std::make_unique(); - arrowSchema.dictionary = bridgeHolder->dictionary.get(); - exportToArrow(vec->valueVector(), *arrowSchema.dictionary); - - } else if (vec->encoding() == VectorEncoding::Simple::CONSTANT) { + if (options.flattenDictionary) { + // Dictionary data is flattened. Set the underlying data types. + arrowSchema.dictionary = nullptr; + arrowSchema.format = + exportArrowFormatStr(type, bridgeHolder->formatBuffer); + } else { + arrowSchema.format = "i"; + bridgeHolder->dictionary = std::make_unique(); + arrowSchema.dictionary = bridgeHolder->dictionary.get(); + exportToArrow(vec->valueVector(), *arrowSchema.dictionary); + } + } else if ( + vec->encoding() == VectorEncoding::Simple::CONSTANT && + !options.flattenConstant) { // Arrow REE spec available in // https://arrow.apache.org/docs/format/Columnar.html#run-end-encoded-layout arrowSchema.format = "+r"; @@ -1024,7 +1099,7 @@ void exportToArrow(const VectorPtr& vec, ArrowSchema& arrowSchema) { // Contants of complex types are stored in the `values` vector. if (valueVector != nullptr) { - exportToArrow(valueVector, *valuesChild); + exportToArrow(valueVector, *valuesChild, options); } else { valuesChild->format = exportArrowFormatStr(type, bridgeHolder->formatBuffer); @@ -1034,7 +1109,6 @@ void exportToArrow(const VectorPtr& vec, ArrowSchema& arrowSchema) { bridgeHolder->setChildAtIndex( 0, newArrowSchema("i", "run_ends"), arrowSchema); bridgeHolder->setChildAtIndex(1, std::move(valuesChild), arrowSchema); - } else { arrowSchema.format = exportArrowFormatStr(type, bridgeHolder->formatBuffer); arrowSchema.dictionary = nullptr; @@ -1051,14 +1125,14 @@ void exportToArrow(const VectorPtr& vec, ArrowSchema& arrowSchema) { 0, std::vector{maps.mapKeys(), maps.mapValues()}, maps.getNullCount()); - exportToArrow(rows, *child); + exportToArrow(rows, *child, options); child->name = "entries"; bridgeHolder->setChildAtIndex(0, std::move(child), arrowSchema); } else if (type->kind() == TypeKind::ARRAY) { auto child = std::make_unique(); auto& arrays = *vec->asUnchecked(); - exportToArrow(arrays.elements(), *child); + exportToArrow(arrays.elements(), *child, options); // Name is required, and "item" is the default name used in arrow itself. child->name = "item"; bridgeHolder->setChildAtIndex(0, std::move(child), arrowSchema); @@ -1091,7 +1165,7 @@ void exportToArrow(const VectorPtr& vec, ArrowSchema& arrowSchema) { try { auto& currentSchema = bridgeHolder->childrenOwned[i]; currentSchema = std::make_unique(); - exportToArrow(rows.childAt(i), *currentSchema); + exportToArrow(rows.childAt(i), *currentSchema, options); currentSchema->name = bridgeHolder->rowType->nameOf(i).data(); arrowSchema.children[i] = currentSchema.get(); } catch (const VeloxException& e) { diff --git a/velox/vector/arrow/Bridge.h b/velox/vector/arrow/Bridge.h index faeb571d3816..ba30068f8957 100644 --- a/velox/vector/arrow/Bridge.h +++ b/velox/vector/arrow/Bridge.h @@ -25,6 +25,11 @@ struct ArrowArray; struct ArrowSchema; +struct ArrowOptions { + bool flattenDictionary{false}; + bool flattenConstant{false}; +}; + namespace facebook::velox { /// Export a generic Velox Vector to an ArrowArray, as defined by Arrow's C data @@ -57,7 +62,8 @@ namespace facebook::velox { void exportToArrow( const VectorPtr& vector, ArrowArray& arrowArray, - memory::MemoryPool* pool); + memory::MemoryPool* pool, + const ArrowOptions& options = ArrowOptions{}); /// Export the type of a Velox vector to an ArrowSchema. /// @@ -78,7 +84,10 @@ void exportToArrow( /// /// NOTE: Since Arrow couples type and encoding, we need both Velox type and /// actual data (containing encoding) to create an ArrowSchema. -void exportToArrow(const VectorPtr&, ArrowSchema&); +void exportToArrow( + const VectorPtr&, + ArrowSchema&, + const ArrowOptions& = ArrowOptions{}); /// Import an ArrowSchema into a Velox Type object. ///