Skip to content

Commit

Permalink
Fix writing dictionary data to parquet (#7025)
Browse files Browse the repository at this point in the history
Summary:
In the chunked approach, the partitioned data is sliced into
dictionary vectors whose partition vectors are shared
across chunks. By the time the dictionary content of an earlier
chunk is resolved, its row positions have already been changed
and no longer apply to that chunk. This results in incorrect
data being written to the partitioned files.

In addition, Arrow does not support NULL values in a
dictionary and throws an exception.
The exception is
NotImplemented: Writing DictionaryArray with null encoded in dictionary type not yet supported
See https://github.com/apache/arrow/blob/73589ddd60e4cbcd860102871692541989ea38c6/cpp/src/parquet/arrow/path_internal.cc#L752

To solve both issues, the dictionary vector representing
the partitioning is flattened into a FlatVector.
As a result the data is copied to persist it
across chunks.

To make the constant vectors in the TableWriterTest work,
they are also flattened into flat vectors, using the same
logic as for the dictionary vectors.

The ArrowBridge export functions take a new optional options structure
(ArrowOptions) that indicates whether dictionary and constant vectors
should be flattened.

Resolves #5560

Pull Request resolved: #7025

Reviewed By: mbasmanova

Differential Revision: D51760838

Pulled By: Yuhta

fbshipit-source-id: 1ef7d57af199ea0e0089200a06477759ac31a90a
  • Loading branch information
czentgr authored and facebook-github-bot committed Dec 5, 2023
1 parent 975ca3a commit 9830814
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 37 deletions.
5 changes: 3 additions & 2 deletions velox/dwio/parquet/writer/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,10 +224,11 @@ dwio::common::StripeProgress getStripeProgress(
* This method assumes each input `ColumnarBatch` has the same schema.
*/
void Writer::write(const VectorPtr& data) {
ArrowOptions options{.flattenDictionary = true, .flattenConstant = true};
ArrowArray array;
ArrowSchema schema;
exportToArrow(data, array, generalPool_.get());
exportToArrow(data, schema);
exportToArrow(data, array, generalPool_.get(), options);
exportToArrow(data, schema, options);
PARQUET_ASSIGN_OR_THROW(
auto recordBatch, ::arrow::ImportRecordBatch(&array, &schema));
if (!arrowContext_->schema) {
Expand Down
16 changes: 12 additions & 4 deletions velox/exec/tests/TableWriteTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1073,8 +1073,10 @@ class PartitionedTableWriterTest
static std::vector<uint64_t> getTestParams() {
std::vector<uint64_t> testParams;
const std::vector<bool> multiDriverOptions = {false, true};
// Add Parquet with https://github.com/facebookincubator/velox/issues/5560
std::vector<FileFormat> fileFormats = {FileFormat::DWRF};
if (hasWriterFactory(FileFormat::PARQUET)) {
fileFormats.push_back(FileFormat::PARQUET);
}
for (bool multiDrivers : multiDriverOptions) {
for (FileFormat fileFormat : fileFormats) {
testParams.push_back(TestParam{
Expand Down Expand Up @@ -1185,8 +1187,10 @@ class BucketedTableOnlyWriteTest
static std::vector<uint64_t> getTestParams() {
std::vector<uint64_t> testParams;
const std::vector<bool> multiDriverOptions = {false, true};
// Add Parquet with https://github.com/facebookincubator/velox/issues/5560
std::vector<FileFormat> fileFormats = {FileFormat::DWRF};
if (hasWriterFactory(FileFormat::PARQUET)) {
fileFormats.push_back(FileFormat::PARQUET);
}
for (bool multiDrivers : multiDriverOptions) {
for (FileFormat fileFormat : fileFormats) {
testParams.push_back(TestParam{
Expand Down Expand Up @@ -1313,8 +1317,10 @@ class PartitionedWithoutBucketTableWriterTest
static std::vector<uint64_t> getTestParams() {
std::vector<uint64_t> testParams;
const std::vector<bool> multiDriverOptions = {false, true};
// Add Parquet with https://github.com/facebookincubator/velox/issues/5560
std::vector<FileFormat> fileFormats = {FileFormat::DWRF};
if (hasWriterFactory(FileFormat::PARQUET)) {
fileFormats.push_back(FileFormat::PARQUET);
}
for (bool multiDrivers : multiDriverOptions) {
for (FileFormat fileFormat : fileFormats) {
testParams.push_back(TestParam{
Expand Down Expand Up @@ -1349,8 +1355,10 @@ class AllTableWriterTest : public TableWriteTest,
static std::vector<uint64_t> getTestParams() {
std::vector<uint64_t> testParams;
const std::vector<bool> multiDriverOptions = {false, true};
// Add Parquet with https://github.com/facebookincubator/velox/issues/5560
std::vector<FileFormat> fileFormats = {FileFormat::DWRF};
if (hasWriterFactory(FileFormat::PARQUET)) {
fileFormats.push_back(FileFormat::PARQUET);
}
for (bool multiDrivers : multiDriverOptions) {
for (FileFormat fileFormat : fileFormats) {
testParams.push_back(TestParam{
Expand Down
132 changes: 103 additions & 29 deletions velox/vector/arrow/Bridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
#include "velox/common/base/CheckedArithmetic.h"
#include "velox/common/base/Exceptions.h"
#include "velox/vector/ComplexVector.h"
#include "velox/vector/DictionaryVector.h"
#include "velox/vector/FlatVector.h"
#include "velox/vector/VectorTypeUtils.h"
#include "velox/vector/arrow/Abi.h"

namespace facebook::velox {
Expand Down Expand Up @@ -614,14 +616,16 @@ void exportToArrowImpl(
const BaseVector&,
const Selection&,
ArrowArray&,
memory::MemoryPool*);
memory::MemoryPool*,
const ArrowOptions& options);

void exportRows(
const RowVector& vec,
const Selection& rows,
ArrowArray& out,
memory::MemoryPool* pool,
VeloxToArrowBridgeHolder& holder) {
VeloxToArrowBridgeHolder& holder,
const ArrowOptions& options) {
out.n_buffers = 1;
holder.resizeChildren(vec.childrenSize());
out.n_children = vec.childrenSize();
Expand All @@ -632,7 +636,8 @@ void exportRows(
*vec.childAt(i)->loadedVector(),
rows,
*holder.allocateChild(i),
pool);
pool,
options);
} catch (const VeloxException&) {
for (column_index_t j = 0; j < i; ++j) {
// When exception is thrown, i th child is guaranteed unset.
Expand Down Expand Up @@ -693,15 +698,17 @@ void exportArrays(
const Selection& rows,
ArrowArray& out,
memory::MemoryPool* pool,
VeloxToArrowBridgeHolder& holder) {
VeloxToArrowBridgeHolder& holder,
const ArrowOptions& options) {
Selection childRows(vec.elements()->size());
exportOffsets(vec, rows, out, pool, holder, childRows);
holder.resizeChildren(1);
exportToArrowImpl(
*vec.elements()->loadedVector(),
childRows,
*holder.allocateChild(0),
pool);
pool,
options);
out.n_children = 1;
out.children = holder.getChildrenArrays();
}
Expand All @@ -711,7 +718,8 @@ void exportMaps(
const Selection& rows,
ArrowArray& out,
memory::MemoryPool* pool,
VeloxToArrowBridgeHolder& holder) {
VeloxToArrowBridgeHolder& holder,
const ArrowOptions& options) {
RowVector child(
pool,
ROW({"key", "value"}, {vec.mapKeys()->type(), vec.mapValues()->type()}),
Expand All @@ -721,11 +729,42 @@ void exportMaps(
Selection childRows(child.size());
exportOffsets(vec, rows, out, pool, holder, childRows);
holder.resizeChildren(1);
exportToArrowImpl(child, childRows, *holder.allocateChild(0), pool);
exportToArrowImpl(child, childRows, *holder.allocateChild(0), pool, options);
out.n_children = 1;
out.children = holder.getChildrenArrays();
}

template <TypeKind kind>
void flattenAndExport(
const BaseVector& vec,
const Selection& rows,
ArrowArray& out,
memory::MemoryPool* pool,
VeloxToArrowBridgeHolder& holder) {
using NativeType = typename velox::TypeTraits<kind>::NativeType;
SelectivityVector allRows(vec.size());
DecodedVector decoded(vec, allRows);
auto flatVector = BaseVector::create<FlatVector<NativeType>>(
vec.type(), decoded.size(), pool);

if (decoded.mayHaveNulls()) {
allRows.applyToSelected([&](vector_size_t row) {
if (decoded.isNullAt(row)) {
flatVector->setNull(row, true);
} else {
flatVector->set(row, decoded.valueAt<NativeType>(row));
}
});
exportValidityBitmap(*flatVector, rows, out, pool, holder);
exportFlat(*flatVector, rows, out, pool, holder);
} else {
allRows.applyToSelected([&](vector_size_t row) {
flatVector->set(row, decoded.valueAt<NativeType>(row));
});
exportFlat(*flatVector, rows, out, pool, holder);
}
}

void exportDictionary(
const BaseVector& vec,
const Selection& rows,
Expand All @@ -743,7 +782,22 @@ void exportDictionary(
}
auto& values = *vec.valueVector()->loadedVector();
out.dictionary = holder.allocateDictionary();
exportToArrowImpl(values, Selection(values.size()), *out.dictionary, pool);
exportToArrowImpl(
values, Selection(values.size()), *out.dictionary, pool, ArrowOptions());
}

// Exports `vec` into `out` as flattened data by dispatching on the vector's
// type kind to flattenAndExport<kind>. exportToArrowImpl calls this for
// DICTIONARY vectors when options.flattenDictionary is set and for CONSTANT
// vectors when options.flattenConstant is set.
//
// Both preconditions below are enforced with VELOX_CHECK.
void exportFlattenedVector(
const BaseVector& vec,
const Selection& rows,
ArrowArray& out,
memory::MemoryPool* pool,
VeloxToArrowBridgeHolder& holder) {
// Accept the vector only if it has no value vector, or if the vector it
// ultimately wraps is flat-encoded.
VELOX_CHECK(
vec.valueVector() == nullptr || vec.wrappedVector()->isFlatEncoding(),
"An unsupported nested encoding was found.");
// Only scalar types can be flattened here.
VELOX_CHECK(vec.isScalar(), "Flattening is only supported for scalar types.");
// Select the flattenAndExport instantiation matching the vector's type kind.
VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH(
flattenAndExport, vec.typeKind(), vec, rows, out, pool, holder);
}

// Set the array as using "Null Layout" - no buffers are allocated.
Expand Down Expand Up @@ -790,7 +844,7 @@ void exportConstantValue(
wrapInBufferViewAsViewer(vec.valuesAsVoid(), bufferSize),
vec.mayHaveNulls() ? 1 : 0);
}
exportToArrowImpl(*valuesVector, selection, out, pool);
exportToArrowImpl(*valuesVector, selection, out, pool, ArrowOptions());
}

// Velox constant vectors are exported as Arrow REE containing a single run
Expand Down Expand Up @@ -837,7 +891,8 @@ void exportToArrowImpl(
const BaseVector& vec,
const Selection& rows,
ArrowArray& out,
memory::MemoryPool* pool) {
memory::MemoryPool* pool,
const ArrowOptions& options) {
auto holder = std::make_unique<VeloxToArrowBridgeHolder>();
out.buffers = holder->getArrowBuffers();
out.length = rows.count();
Expand All @@ -850,19 +905,26 @@ void exportToArrowImpl(
exportFlat(vec, rows, out, pool, *holder);
break;
case VectorEncoding::Simple::ROW:
exportRows(*vec.asUnchecked<RowVector>(), rows, out, pool, *holder);
exportRows(
*vec.asUnchecked<RowVector>(), rows, out, pool, *holder, options);
break;
case VectorEncoding::Simple::ARRAY:
exportArrays(*vec.asUnchecked<ArrayVector>(), rows, out, pool, *holder);
exportArrays(
*vec.asUnchecked<ArrayVector>(), rows, out, pool, *holder, options);
break;
case VectorEncoding::Simple::MAP:
exportMaps(*vec.asUnchecked<MapVector>(), rows, out, pool, *holder);
exportMaps(
*vec.asUnchecked<MapVector>(), rows, out, pool, *holder, options);
break;
case VectorEncoding::Simple::DICTIONARY:
exportDictionary(vec, rows, out, pool, *holder);
options.flattenDictionary
? exportFlattenedVector(vec, rows, out, pool, *holder)
: exportDictionary(vec, rows, out, pool, *holder);
break;
case VectorEncoding::Simple::CONSTANT:
exportConstant(vec, rows, out, pool, *holder);
options.flattenConstant
? exportFlattenedVector(vec, rows, out, pool, *holder)
: exportConstant(vec, rows, out, pool, *holder);
break;
default:
VELOX_NYI("{} cannot be exported to Arrow yet.", vec.encoding());
Expand Down Expand Up @@ -986,11 +1048,16 @@ TypePtr importFromArrowImpl(
void exportToArrow(
const VectorPtr& vector,
ArrowArray& arrowArray,
memory::MemoryPool* pool) {
exportToArrowImpl(*vector, Selection(vector->size()), arrowArray, pool);
memory::MemoryPool* pool,
const ArrowOptions& options) {
exportToArrowImpl(
*vector, Selection(vector->size()), arrowArray, pool, options);
}

void exportToArrow(const VectorPtr& vec, ArrowSchema& arrowSchema) {
void exportToArrow(
const VectorPtr& vec,
ArrowSchema& arrowSchema,
const ArrowOptions& options) {
auto& type = vec->type();

arrowSchema.name = nullptr;
Expand All @@ -1007,12 +1074,20 @@ void exportToArrow(const VectorPtr& vec, ArrowSchema& arrowSchema) {
if (vec->encoding() == VectorEncoding::Simple::DICTIONARY) {
arrowSchema.n_children = 0;
arrowSchema.children = nullptr;
arrowSchema.format = "i";
bridgeHolder->dictionary = std::make_unique<ArrowSchema>();
arrowSchema.dictionary = bridgeHolder->dictionary.get();
exportToArrow(vec->valueVector(), *arrowSchema.dictionary);

} else if (vec->encoding() == VectorEncoding::Simple::CONSTANT) {
if (options.flattenDictionary) {
// Dictionary data is flattened. Set the underlying data types.
arrowSchema.dictionary = nullptr;
arrowSchema.format =
exportArrowFormatStr(type, bridgeHolder->formatBuffer);
} else {
arrowSchema.format = "i";
bridgeHolder->dictionary = std::make_unique<ArrowSchema>();
arrowSchema.dictionary = bridgeHolder->dictionary.get();
exportToArrow(vec->valueVector(), *arrowSchema.dictionary);
}
} else if (
vec->encoding() == VectorEncoding::Simple::CONSTANT &&
!options.flattenConstant) {
// Arrow REE spec available in
// https://arrow.apache.org/docs/format/Columnar.html#run-end-encoded-layout
arrowSchema.format = "+r";
Expand All @@ -1024,7 +1099,7 @@ void exportToArrow(const VectorPtr& vec, ArrowSchema& arrowSchema) {

// Constants of complex types are stored in the `values` vector.
if (valueVector != nullptr) {
exportToArrow(valueVector, *valuesChild);
exportToArrow(valueVector, *valuesChild, options);
} else {
valuesChild->format =
exportArrowFormatStr(type, bridgeHolder->formatBuffer);
Expand All @@ -1034,7 +1109,6 @@ void exportToArrow(const VectorPtr& vec, ArrowSchema& arrowSchema) {
bridgeHolder->setChildAtIndex(
0, newArrowSchema("i", "run_ends"), arrowSchema);
bridgeHolder->setChildAtIndex(1, std::move(valuesChild), arrowSchema);

} else {
arrowSchema.format = exportArrowFormatStr(type, bridgeHolder->formatBuffer);
arrowSchema.dictionary = nullptr;
Expand All @@ -1051,14 +1125,14 @@ void exportToArrow(const VectorPtr& vec, ArrowSchema& arrowSchema) {
0,
std::vector<VectorPtr>{maps.mapKeys(), maps.mapValues()},
maps.getNullCount());
exportToArrow(rows, *child);
exportToArrow(rows, *child, options);
child->name = "entries";
bridgeHolder->setChildAtIndex(0, std::move(child), arrowSchema);

} else if (type->kind() == TypeKind::ARRAY) {
auto child = std::make_unique<ArrowSchema>();
auto& arrays = *vec->asUnchecked<ArrayVector>();
exportToArrow(arrays.elements(), *child);
exportToArrow(arrays.elements(), *child, options);
// Name is required, and "item" is the default name used in arrow itself.
child->name = "item";
bridgeHolder->setChildAtIndex(0, std::move(child), arrowSchema);
Expand Down Expand Up @@ -1091,7 +1165,7 @@ void exportToArrow(const VectorPtr& vec, ArrowSchema& arrowSchema) {
try {
auto& currentSchema = bridgeHolder->childrenOwned[i];
currentSchema = std::make_unique<ArrowSchema>();
exportToArrow(rows.childAt(i), *currentSchema);
exportToArrow(rows.childAt(i), *currentSchema, options);
currentSchema->name = bridgeHolder->rowType->nameOf(i).data();
arrowSchema.children[i] = currentSchema.get();
} catch (const VeloxException& e) {
Expand Down
13 changes: 11 additions & 2 deletions velox/vector/arrow/Bridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@
struct ArrowArray;
struct ArrowSchema;

/// Options that control how Velox vectors are exported through the Arrow
/// bridge. Both flags default to false, so a default-constructed ArrowOptions
/// leaves dictionary and constant encodings in place.
struct ArrowOptions {
  /// When true, DICTIONARY-encoded vectors are exported as flattened data
  /// instead of as an Arrow dictionary.
  bool flattenDictionary = false;

  /// When true, CONSTANT-encoded vectors are exported as flattened data
  /// instead of as a run-end encoded (REE) array.
  bool flattenConstant = false;
};

namespace facebook::velox {

/// Export a generic Velox Vector to an ArrowArray, as defined by Arrow's C data
Expand Down Expand Up @@ -57,7 +62,8 @@ namespace facebook::velox {
void exportToArrow(
const VectorPtr& vector,
ArrowArray& arrowArray,
memory::MemoryPool* pool);
memory::MemoryPool* pool,
const ArrowOptions& options = ArrowOptions{});

/// Export the type of a Velox vector to an ArrowSchema.
///
Expand All @@ -78,7 +84,10 @@ void exportToArrow(
///
/// NOTE: Since Arrow couples type and encoding, we need both Velox type and
/// actual data (containing encoding) to create an ArrowSchema.
void exportToArrow(const VectorPtr&, ArrowSchema&);
void exportToArrow(
const VectorPtr&,
ArrowSchema&,
const ArrowOptions& = ArrowOptions{});

/// Import an ArrowSchema into a Velox Type object.
///
Expand Down

0 comments on commit 9830814

Please sign in to comment.