From 98308143adc593fabbbf23f1b9a12c02d462fed6 Mon Sep 17 00:00:00 2001 From: Christian Zentgraf Date: Tue, 5 Dec 2023 08:58:54 -0800 Subject: [PATCH] Fix writing dictionary data to parquet (#7025) Summary: In a chunked approach the partitioned data is sliced into dictionary vectors where the partition vectors are shared across chunks. A previous chunk row position is changed and not applicable to the prior chunk when the dictionary content is resolved. This results in incorrect data being written to the partitioned files. In addition, Arrow does not support NULL values in a dictionary and throws an exception. The exception is NotImplemented: Writing DictionaryArray with null encoded in dictionary type not yet supported See https://github.com/apache/arrow/blob/73589ddd60e4cbcd860102871692541989ea38c6/cpp/src/parquet/arrow/path_internal.cc#L752 To solve both issues, the dictionary vector representing the partitioning is flattened into a FlatVector. As a result the data is copied to persist it across chunks. To make the constant vectors in the TableWriterTest work they are also flattened into flat vectors with the same logic the dictionary vectors are. The ArrowBridge has a new optional option structure to indicate if dictionary and constant vectors should be flattened. Resolves https://github.com/facebookincubator/velox/issues/5560 Pull Request resolved: https://github.com/facebookincubator/velox/pull/7025 Reviewed By: mbasmanova Differential Revision: D51760838 Pulled By: Yuhta fbshipit-source-id: 1ef7d57af199ea0e0089200a06477759ac31a90a --- velox/dwio/parquet/writer/Writer.cpp | 5 +- velox/exec/tests/TableWriteTest.cpp | 16 +++- velox/vector/arrow/Bridge.cpp | 132 +++++++++++++++++++++------ velox/vector/arrow/Bridge.h | 13 ++- 4 files changed, 129 insertions(+), 37 deletions(-) diff --git a/velox/dwio/parquet/writer/Writer.cpp b/velox/dwio/parquet/writer/Writer.cpp index 54f875fce0eb..002e2c42ce1c 100644 --- a/velox/dwio/parquet/writer/Writer.cpp +++ b/velox/dwio/parquet/writer/Writer.cpp @@ -224,10 +224,11 @@ dwio::common::StripeProgress getStripeProgress( * This method assumes each input `ColumnarBatch` have same schema. */ void Writer::write(const VectorPtr& data) { + ArrowOptions options{.flattenDictionary = true, .flattenConstant = true}; ArrowArray array; ArrowSchema schema; - exportToArrow(data, array, generalPool_.get()); - exportToArrow(data, schema); + exportToArrow(data, array, generalPool_.get(), options); + exportToArrow(data, schema, options); PARQUET_ASSIGN_OR_THROW( auto recordBatch, ::arrow::ImportRecordBatch(&array, &schema)); if (!arrowContext_->schema) { diff --git a/velox/exec/tests/TableWriteTest.cpp b/velox/exec/tests/TableWriteTest.cpp index 8848330dbecc..1b818b7821f9 100644 --- a/velox/exec/tests/TableWriteTest.cpp +++ b/velox/exec/tests/TableWriteTest.cpp @@ -1073,8 +1073,10 @@ class PartitionedTableWriterTest static std::vector getTestParams() { std::vector testParams; const std::vector multiDriverOptions = {false, true}; - // Add Parquet with https://github.com/facebookincubator/velox/issues/5560 std::vector fileFormats = {FileFormat::DWRF}; + if (hasWriterFactory(FileFormat::PARQUET)) { + fileFormats.push_back(FileFormat::PARQUET); + } for (bool multiDrivers : multiDriverOptions) { for (FileFormat fileFormat : fileFormats) { testParams.push_back(TestParam{ @@ -1185,8 +1187,10 @@ class BucketedTableOnlyWriteTest static std::vector getTestParams() { std::vector testParams; const std::vector multiDriverOptions = {false, true}; - // Add Parquet with https://github.com/facebookincubator/velox/issues/5560 std::vector fileFormats = {FileFormat::DWRF}; + if (hasWriterFactory(FileFormat::PARQUET)) { + fileFormats.push_back(FileFormat::PARQUET); + } for (bool multiDrivers : multiDriverOptions) { for (FileFormat fileFormat : fileFormats) { testParams.push_back(TestParam{ @@ -1313,8 +1317,10 @@ class PartitionedWithoutBucketTableWriterTest static std::vector getTestParams() { std::vector testParams; const std::vector multiDriverOptions = {false, true}; - // Add Parquet with https://github.com/facebookincubator/velox/issues/5560 std::vector fileFormats = {FileFormat::DWRF}; + if (hasWriterFactory(FileFormat::PARQUET)) { + fileFormats.push_back(FileFormat::PARQUET); + } for (bool multiDrivers : multiDriverOptions) { for (FileFormat fileFormat : fileFormats) { testParams.push_back(TestParam{ @@ -1349,8 +1355,10 @@ class AllTableWriterTest : public TableWriteTest, static std::vector getTestParams() { std::vector testParams; const std::vector multiDriverOptions = {false, true}; - // Add Parquet with https://github.com/facebookincubator/velox/issues/5560 std::vector fileFormats = {FileFormat::DWRF}; + if (hasWriterFactory(FileFormat::PARQUET)) { + fileFormats.push_back(FileFormat::PARQUET); + } for (bool multiDrivers : multiDriverOptions) { for (FileFormat fileFormat : fileFormats) { testParams.push_back(TestParam{ diff --git a/velox/vector/arrow/Bridge.cpp b/velox/vector/arrow/Bridge.cpp index 39ed59fa91ae..180bc720de95 100644 --- a/velox/vector/arrow/Bridge.cpp +++ b/velox/vector/arrow/Bridge.cpp @@ -21,7 +21,9 @@ #include "velox/common/base/CheckedArithmetic.h" #include "velox/common/base/Exceptions.h" #include "velox/vector/ComplexVector.h" +#include "velox/vector/DictionaryVector.h" #include "velox/vector/FlatVector.h" +#include "velox/vector/VectorTypeUtils.h" #include "velox/vector/arrow/Abi.h" namespace facebook::velox { @@ -614,14 +616,16 @@ void exportToArrowImpl( const BaseVector&, const Selection&, ArrowArray&, - memory::MemoryPool*); + memory::MemoryPool*, + const ArrowOptions& options); void exportRows( const RowVector& vec, const Selection& rows, ArrowArray& out, memory::MemoryPool* pool, - VeloxToArrowBridgeHolder& holder) { + VeloxToArrowBridgeHolder& holder, + const ArrowOptions& options) { out.n_buffers = 1; holder.resizeChildren(vec.childrenSize()); out.n_children = vec.childrenSize(); @@ -632,7 +636,8 @@ void exportRows( *vec.childAt(i)->loadedVector(), rows, *holder.allocateChild(i), - pool); + pool, + options); } catch (const VeloxException&) { for (column_index_t j = 0; j < i; ++j) { // When exception is thrown, i th child is guaranteed unset. @@ -693,7 +698,8 @@ void exportArrays( const Selection& rows, ArrowArray& out, memory::MemoryPool* pool, - VeloxToArrowBridgeHolder& holder) { + VeloxToArrowBridgeHolder& holder, + const ArrowOptions& options) { Selection childRows(vec.elements()->size()); exportOffsets(vec, rows, out, pool, holder, childRows); holder.resizeChildren(1); @@ -701,7 +707,8 @@ void exportArrays( *vec.elements()->loadedVector(), childRows, *holder.allocateChild(0), - pool); + pool, + options); out.n_children = 1; out.children = holder.getChildrenArrays(); } @@ -711,7 +718,8 @@ void exportMaps( const Selection& rows, ArrowArray& out, memory::MemoryPool* pool, - VeloxToArrowBridgeHolder& holder) { + VeloxToArrowBridgeHolder& holder, + const ArrowOptions& options) { RowVector child( pool, ROW({"key", "value"}, {vec.mapKeys()->type(), vec.mapValues()->type()}), @@ -721,11 +729,42 @@ void exportMaps( Selection childRows(child.size()); exportOffsets(vec, rows, out, pool, holder, childRows); holder.resizeChildren(1); - exportToArrowImpl(child, childRows, *holder.allocateChild(0), pool); + exportToArrowImpl(child, childRows, *holder.allocateChild(0), pool, options); out.n_children = 1; out.children = holder.getChildrenArrays(); } +template +void flattenAndExport( + const BaseVector& vec, + const Selection& rows, + ArrowArray& out, + memory::MemoryPool* pool, + VeloxToArrowBridgeHolder& holder) { + using NativeType = typename velox::TypeTraits::NativeType; + SelectivityVector allRows(vec.size()); + DecodedVector decoded(vec, allRows); + auto flatVector = BaseVector::create>( + vec.type(), decoded.size(), pool); + + if (decoded.mayHaveNulls()) { + allRows.applyToSelected([&](vector_size_t row) { + if (decoded.isNullAt(row)) { + flatVector->setNull(row, true); + } else { + flatVector->set(row, decoded.valueAt(row)); + } + }); + exportValidityBitmap(*flatVector, rows, out, pool, holder); + exportFlat(*flatVector, rows, out, pool, holder); + } else { + allRows.applyToSelected([&](vector_size_t row) { + flatVector->set(row, decoded.valueAt(row)); + }); + exportFlat(*flatVector, rows, out, pool, holder); + } +} + void exportDictionary( const BaseVector& vec, const Selection& rows, @@ -743,7 +782,22 @@ void exportDictionary( } auto& values = *vec.valueVector()->loadedVector(); out.dictionary = holder.allocateDictionary(); - exportToArrowImpl(values, Selection(values.size()), *out.dictionary, pool); + exportToArrowImpl( + values, Selection(values.size()), *out.dictionary, pool, ArrowOptions()); +} + +void exportFlattenedVector( + const BaseVector& vec, + const Selection& rows, + ArrowArray& out, + memory::MemoryPool* pool, + VeloxToArrowBridgeHolder& holder) { + VELOX_CHECK( + vec.valueVector() == nullptr || vec.wrappedVector()->isFlatEncoding(), + "An unsupported nested encoding was found."); + VELOX_CHECK(vec.isScalar(), "Flattening is only supported for scalar types."); + VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH( + flattenAndExport, vec.typeKind(), vec, rows, out, pool, holder); } // Set the array as using "Null Layout" - no buffers are allocated. @@ -790,7 +844,7 @@ void exportConstantValue( wrapInBufferViewAsViewer(vec.valuesAsVoid(), bufferSize), vec.mayHaveNulls() ? 1 : 0); } - exportToArrowImpl(*valuesVector, selection, out, pool); + exportToArrowImpl(*valuesVector, selection, out, pool, ArrowOptions()); } // Velox constant vectors are exported as Arrow REE containing a single run @@ -837,7 +891,8 @@ void exportToArrowImpl( const BaseVector& vec, const Selection& rows, ArrowArray& out, - memory::MemoryPool* pool) { + memory::MemoryPool* pool, + const ArrowOptions& options) { auto holder = std::make_unique(); out.buffers = holder->getArrowBuffers(); out.length = rows.count(); @@ -850,19 +905,26 @@ void exportToArrowImpl( exportFlat(vec, rows, out, pool, *holder); break; case VectorEncoding::Simple::ROW: - exportRows(*vec.asUnchecked(), rows, out, pool, *holder); + exportRows( + *vec.asUnchecked(), rows, out, pool, *holder, options); break; case VectorEncoding::Simple::ARRAY: - exportArrays(*vec.asUnchecked(), rows, out, pool, *holder); + exportArrays( + *vec.asUnchecked(), rows, out, pool, *holder, options); break; case VectorEncoding::Simple::MAP: - exportMaps(*vec.asUnchecked(), rows, out, pool, *holder); + exportMaps( + *vec.asUnchecked(), rows, out, pool, *holder, options); break; case VectorEncoding::Simple::DICTIONARY: - exportDictionary(vec, rows, out, pool, *holder); + options.flattenDictionary + ? exportFlattenedVector(vec, rows, out, pool, *holder) + : exportDictionary(vec, rows, out, pool, *holder); break; case VectorEncoding::Simple::CONSTANT: - exportConstant(vec, rows, out, pool, *holder); + options.flattenConstant + ? exportFlattenedVector(vec, rows, out, pool, *holder) + : exportConstant(vec, rows, out, pool, *holder); break; default: VELOX_NYI("{} cannot be exported to Arrow yet.", vec.encoding()); @@ -986,11 +1048,16 @@ TypePtr importFromArrowImpl( void exportToArrow( const VectorPtr& vector, ArrowArray& arrowArray, - memory::MemoryPool* pool) { - exportToArrowImpl(*vector, Selection(vector->size()), arrowArray, pool); + memory::MemoryPool* pool, + const ArrowOptions& options) { + exportToArrowImpl( + *vector, Selection(vector->size()), arrowArray, pool, options); } -void exportToArrow(const VectorPtr& vec, ArrowSchema& arrowSchema) { +void exportToArrow( + const VectorPtr& vec, + ArrowSchema& arrowSchema, + const ArrowOptions& options) { auto& type = vec->type(); arrowSchema.name = nullptr; @@ -1007,12 +1074,20 @@ void exportToArrow(const VectorPtr& vec, ArrowSchema& arrowSchema) { if (vec->encoding() == VectorEncoding::Simple::DICTIONARY) { arrowSchema.n_children = 0; arrowSchema.children = nullptr; - arrowSchema.format = "i"; - bridgeHolder->dictionary = std::make_unique(); - arrowSchema.dictionary = bridgeHolder->dictionary.get(); - exportToArrow(vec->valueVector(), *arrowSchema.dictionary); - - } else if (vec->encoding() == VectorEncoding::Simple::CONSTANT) { + if (options.flattenDictionary) { + // Dictionary data is flattened. Set the underlying data types. + arrowSchema.dictionary = nullptr; + arrowSchema.format = + exportArrowFormatStr(type, bridgeHolder->formatBuffer); + } else { + arrowSchema.format = "i"; + bridgeHolder->dictionary = std::make_unique(); + arrowSchema.dictionary = bridgeHolder->dictionary.get(); + exportToArrow(vec->valueVector(), *arrowSchema.dictionary); + } + } else if ( + vec->encoding() == VectorEncoding::Simple::CONSTANT && + !options.flattenConstant) { // Arrow REE spec available in // https://arrow.apache.org/docs/format/Columnar.html#run-end-encoded-layout arrowSchema.format = "+r"; @@ -1024,7 +1099,7 @@ void exportToArrow(const VectorPtr& vec, ArrowSchema& arrowSchema) { // Contants of complex types are stored in the `values` vector. if (valueVector != nullptr) { - exportToArrow(valueVector, *valuesChild); + exportToArrow(valueVector, *valuesChild, options); } else { valuesChild->format = exportArrowFormatStr(type, bridgeHolder->formatBuffer); @@ -1034,7 +1109,6 @@ void exportToArrow(const VectorPtr& vec, ArrowSchema& arrowSchema) { bridgeHolder->setChildAtIndex( 0, newArrowSchema("i", "run_ends"), arrowSchema); bridgeHolder->setChildAtIndex(1, std::move(valuesChild), arrowSchema); - } else { arrowSchema.format = exportArrowFormatStr(type, bridgeHolder->formatBuffer); arrowSchema.dictionary = nullptr; @@ -1051,14 +1125,14 @@ void exportToArrow(const VectorPtr& vec, ArrowSchema& arrowSchema) { 0, std::vector{maps.mapKeys(), maps.mapValues()}, maps.getNullCount()); - exportToArrow(rows, *child); + exportToArrow(rows, *child, options); child->name = "entries"; bridgeHolder->setChildAtIndex(0, std::move(child), arrowSchema); } else if (type->kind() == TypeKind::ARRAY) { auto child = std::make_unique(); auto& arrays = *vec->asUnchecked(); - exportToArrow(arrays.elements(), *child); + exportToArrow(arrays.elements(), *child, options); // Name is required, and "item" is the default name used in arrow itself. child->name = "item"; bridgeHolder->setChildAtIndex(0, std::move(child), arrowSchema); @@ -1091,7 +1165,7 @@ void exportToArrow(const VectorPtr& vec, ArrowSchema& arrowSchema) { try { auto& currentSchema = bridgeHolder->childrenOwned[i]; currentSchema = std::make_unique(); - exportToArrow(rows.childAt(i), *currentSchema); + exportToArrow(rows.childAt(i), *currentSchema, options); currentSchema->name = bridgeHolder->rowType->nameOf(i).data(); arrowSchema.children[i] = currentSchema.get(); } catch (const VeloxException& e) { diff --git a/velox/vector/arrow/Bridge.h b/velox/vector/arrow/Bridge.h index faeb571d3816..ba30068f8957 100644 --- a/velox/vector/arrow/Bridge.h +++ b/velox/vector/arrow/Bridge.h @@ -25,6 +25,11 @@ struct ArrowArray; struct ArrowSchema; +struct ArrowOptions { + bool flattenDictionary{false}; + bool flattenConstant{false}; +}; + namespace facebook::velox { /// Export a generic Velox Vector to an ArrowArray, as defined by Arrow's C data @@ -57,7 +62,8 @@ namespace facebook::velox { void exportToArrow( const VectorPtr& vector, ArrowArray& arrowArray, - memory::MemoryPool* pool); + memory::MemoryPool* pool, + const ArrowOptions& options = ArrowOptions{}); /// Export the type of a Velox vector to an ArrowSchema. /// @@ -78,7 +84,10 @@ void exportToArrow( /// /// NOTE: Since Arrow couples type and encoding, we need both Velox type and /// actual data (containing encoding) to create an ArrowSchema. -void exportToArrow(const VectorPtr&, ArrowSchema&); +void exportToArrow( + const VectorPtr&, + ArrowSchema&, + const ArrowOptions& = ArrowOptions{}); /// Import an ArrowSchema into a Velox Type object. ///