From 64adf5868ba8242225fd12ae357f3e758927bcbb Mon Sep 17 00:00:00 2001 From: rui-mo Date: Fri, 26 Jan 2024 10:52:20 +0800 Subject: [PATCH] Support different timestamp units in arrow bridge (7625) --- velox/connectors/hive/HiveConfig.cpp | 5 + velox/connectors/hive/HiveConfig.h | 8 + velox/connectors/hive/HiveDataSink.cpp | 2 + velox/core/QueryConfig.h | 11 + velox/dwio/common/Options.h | 1 + velox/dwio/parquet/writer/Writer.cpp | 12 +- velox/dwio/parquet/writer/Writer.h | 4 + velox/exec/ArrowStream.cpp | 4 +- velox/exec/ArrowStream.h | 1 + velox/exec/tests/ArrowStreamTest.cpp | 5 +- velox/vector/arrow/Bridge.cpp | 243 +++++++++++++----- velox/vector/arrow/Bridge.h | 13 +- .../arrow/tests/ArrowBridgeArrayTest.cpp | 191 ++++++++++---- .../arrow/tests/ArrowBridgeSchemaTest.cpp | 26 +- 14 files changed, 395 insertions(+), 131 deletions(-) diff --git a/velox/connectors/hive/HiveConfig.cpp b/velox/connectors/hive/HiveConfig.cpp index e6048f0d6aed..b624c65bc7b0 100644 --- a/velox/connectors/hive/HiveConfig.cpp +++ b/velox/connectors/hive/HiveConfig.cpp @@ -206,4 +206,9 @@ bool HiveConfig::s3UseProxyFromEnv() const { return config_->get(kS3UseProxyFromEnv, false); } +// static. +uint8_t HiveConfig::arrowBridgeTimestampUnit(const Config* session) const { + return session->get(kArrowBridgeTimestampUnit, 9 /* nano */); +} + } // namespace facebook::velox::connector::hive diff --git a/velox/connectors/hive/HiveConfig.h b/velox/connectors/hive/HiveConfig.h index e50b41157036..494e0ad05cab 100644 --- a/velox/connectors/hive/HiveConfig.h +++ b/velox/connectors/hive/HiveConfig.h @@ -178,6 +178,10 @@ class HiveConfig { static constexpr const char* kS3UseProxyFromEnv = "hive.s3.use-proxy-from-env"; + // Timestamp unit used during Velox-Arrow conversion. + static constexpr const char* kArrowBridgeTimestampUnit = + "arrow_bridge_timestamp_unit"; + InsertExistingPartitionsBehavior insertExistingPartitionsBehavior( const Config* session) const; @@ -247,6 +251,10 @@ class HiveConfig { bool s3UseProxyFromEnv() const; + /// Returns the timestamp unit used in Velox-Arrow conversion. + /// 0: second, 3: milli, 6: micro, 9: nano. + uint8_t arrowBridgeTimestampUnit(const Config* session) const; + HiveConfig(std::shared_ptr config) { VELOX_CHECK_NOT_NULL( config, "Config is null for HiveConfig initialization"); diff --git a/velox/connectors/hive/HiveDataSink.cpp b/velox/connectors/hive/HiveDataSink.cpp index d627119b9862..80fd2ff754b1 100644 --- a/velox/connectors/hive/HiveDataSink.cpp +++ b/velox/connectors/hive/HiveDataSink.cpp @@ -678,6 +678,8 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) { options.serdeParameters = std::map( insertTableHandle_->serdeParameters().begin(), insertTableHandle_->serdeParameters().end()); + options.arrowBridgeTimestampUnit = + hiveConfig_->arrowBridgeTimestampUnit(connectorSessionProperties); ioStats_.emplace_back(std::make_shared()); // Prevents the memory allocation during the writer creation. diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index 39bf52cfdab9..7f2b110331d9 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -357,6 +357,10 @@ class QueryConfig { static constexpr const char* kDriverCpuTimeSliceLimitMs = "driver_cpu_time_slice_limit_ms"; + // Timestamp unit used during Velox-Arrow conversion. + static constexpr const char* kArrowBridgeTimestampUnit = + "arrow_bridge_timestamp_unit"; + uint64_t queryMaxMemoryPerNode() const { return toCapacity( get(kQueryMaxMemoryPerNode, "0B"), CapacityUnit::BYTE); @@ -582,6 +586,13 @@ class QueryConfig { return get(kSpillStartPartitionBit, kDefaultStartBit); } + /// Returns the timestamp unit used in Velox-Arrow conversion. + /// 0: second, 3: milli, 6: micro, 9: nano. + uint8_t arrowBridgeTimestampUnit() const { + constexpr uint8_t kDefaultUnit = 9; + return get(kArrowBridgeTimestampUnit, kDefaultUnit); + } + /// Returns the number of bits used to calculate the spilling partition /// number for hash join. The number of spilling partitions will be power of /// two. diff --git a/velox/dwio/common/Options.h b/velox/dwio/common/Options.h index 92674f5a8fa2..200ef0efeb8c 100644 --- a/velox/dwio/common/Options.h +++ b/velox/dwio/common/Options.h @@ -583,6 +583,7 @@ struct WriterOptions { std::optional maxStripeSize{std::nullopt}; std::optional maxDictionaryMemory{std::nullopt}; std::map serdeParameters; + std::optional arrowBridgeTimestampUnit; }; } // namespace facebook::velox::dwio::common diff --git a/velox/dwio/parquet/writer/Writer.cpp b/velox/dwio/parquet/writer/Writer.cpp index b6479189251d..52dda538d0d9 100644 --- a/velox/dwio/parquet/writer/Writer.cpp +++ b/velox/dwio/parquet/writer/Writer.cpp @@ -21,7 +21,6 @@ #include "velox/dwio/parquet/writer/arrow/Properties.h" #include "velox/dwio/parquet/writer/arrow/Writer.h" #include "velox/exec/MemoryReclaimer.h" -#include "velox/vector/arrow/Bridge.h" namespace facebook::velox::parquet { @@ -234,6 +233,8 @@ Writer::Writer( } else { flushPolicy_ = std::make_unique(); } + options_.timestampUnit = + static_cast(options.arrowBridgeTimestampUnit); arrowContext_->properties = getArrowParquetWriterOptions(options, flushPolicy_); setMemoryReclaimers(); @@ -310,11 +311,10 @@ void Writer::write(const VectorPtr& data) { data->type()->equivalent(*schema_), "The file schema type should be equal with the input rowvector type."); - ArrowOptions options{.flattenDictionary = true, .flattenConstant = true}; ArrowArray array; ArrowSchema schema; - exportToArrow(data, array, generalPool_.get(), options); - exportToArrow(data, schema, options); + exportToArrow(data, array, generalPool_.get(), options_); + exportToArrow(data, schema, options_); // Convert the arrow schema to Schema and then update the column names based // on schema_. @@ -386,6 +386,10 @@ parquet::WriterOptions getParquetOptions( if (options.compressionKind.has_value()) { parquetOptions.compression = options.compressionKind.value(); } + if (options.arrowBridgeTimestampUnit.has_value()) { + parquetOptions.arrowBridgeTimestampUnit = + options.arrowBridgeTimestampUnit.value(); + } return parquetOptions; } diff --git a/velox/dwio/parquet/writer/Writer.h b/velox/dwio/parquet/writer/Writer.h index 6065366bce6b..4384c2720268 100644 --- a/velox/dwio/parquet/writer/Writer.h +++ b/velox/dwio/parquet/writer/Writer.h @@ -26,6 +26,7 @@ #include "velox/dwio/parquet/writer/arrow/Types.h" #include "velox/dwio/parquet/writer/arrow/util/Compression.h" #include "velox/vector/ComplexVector.h" +#include "velox/vector/arrow/Bridge.h" namespace facebook::velox::parquet { @@ -102,6 +103,7 @@ struct WriterOptions { std::shared_ptr codecOptions; std::unordered_map columnCompressionsMap; + uint8_t arrowBridgeTimestampUnit = static_cast(TimestampUnit::kNano); }; // Writes Velox vectors into a DataSink using Arrow Parquet writer. @@ -158,6 +160,8 @@ class Writer : public dwio::common::Writer { std::unique_ptr flushPolicy_; const RowTypePtr schema_; + + ArrowOptions options_{.flattenDictionary = true, .flattenConstant = true}; }; class ParquetWriterFactory : public dwio::common::WriterFactory { diff --git a/velox/exec/ArrowStream.cpp b/velox/exec/ArrowStream.cpp index 863e43f8ba22..d90734e842e4 100644 --- a/velox/exec/ArrowStream.cpp +++ b/velox/exec/ArrowStream.cpp @@ -27,6 +27,8 @@ ArrowStream::ArrowStream( operatorId, arrowStreamNode->id(), "ArrowStream") { + options_.timestampUnit = static_cast( + driverCtx->queryConfig().arrowBridgeTimestampUnit()); arrowStream_ = arrowStreamNode->arrowStream(); } @@ -66,7 +68,7 @@ RowVectorPtr ArrowStream::getOutput() { // Convert Arrow Array into RowVector and return. return std::dynamic_pointer_cast( - importFromArrowAsOwner(arrowSchema, arrowArray, pool())); + importFromArrowAsOwner(arrowSchema, arrowArray, options_, pool())); } bool ArrowStream::isFinished() { diff --git a/velox/exec/ArrowStream.h b/velox/exec/ArrowStream.h index c35894d0d283..34225f5f44c5 100644 --- a/velox/exec/ArrowStream.h +++ b/velox/exec/ArrowStream.h @@ -45,6 +45,7 @@ class ArrowStream : public SourceOperator { bool finished_ = false; std::shared_ptr arrowStream_; + ArrowOptions options_; }; } // namespace facebook::velox::exec diff --git a/velox/exec/tests/ArrowStreamTest.cpp b/velox/exec/tests/ArrowStreamTest.cpp index f0fe9b37e04e..1b450e7200a3 100644 --- a/velox/exec/tests/ArrowStreamTest.cpp +++ b/velox/exec/tests/ArrowStreamTest.cpp @@ -45,7 +45,7 @@ class ArrowStreamTest : public OperatorTestBase { int getNext(struct ArrowArray* outArray) { if (vectorIndex_ < vectors_.size()) { - exportToArrow(vectors_[vectorIndex_], *outArray, pool_.get()); + exportToArrow(vectors_[vectorIndex_], *outArray, pool_.get(), options_); vectorIndex_ += 1; } else { // End of stream. Mark the array released. @@ -56,12 +56,13 @@ class ArrowStreamTest : public OperatorTestBase { } int getArrowSchema(ArrowSchema& out) { - exportToArrow(BaseVector::create(type_, 0, pool_.get()), out); + exportToArrow(BaseVector::create(type_, 0, pool_.get()), out, options_); return failGetSchema_ ? (int)ErrorCode::kGetSchemaFailed : (int)ErrorCode::kNoError; } private: + ArrowOptions options_; const std::shared_ptr pool_; const std::vector& vectors_; const TypePtr type_; diff --git a/velox/vector/arrow/Bridge.cpp b/velox/vector/arrow/Bridge.cpp index dd563315131a..b4703c11810f 100644 --- a/velox/vector/arrow/Bridge.cpp +++ b/velox/vector/arrow/Bridge.cpp @@ -215,6 +215,7 @@ static void releaseArrowSchema(ArrowSchema* arrowSchema) { // Returns the Arrow C data interface format type for a given Velox type. const char* exportArrowFormatStr( const TypePtr& type, + const ArrowOptions& options, std::string& formatBuffer) { if (type->isDecimal()) { // Decimal types encode the precision, scale values. @@ -250,9 +251,19 @@ const char* exportArrowFormatStr( return "z"; // binary case TypeKind::UNKNOWN: return "n"; // NullType - case TypeKind::TIMESTAMP: - return "ttn"; // time64 [nanoseconds] + switch (options.timestampUnit) { + case TimestampUnit::kSecond: + return "tss:"; + case TimestampUnit::kMilli: + return "tsm:"; + case TimestampUnit::kMicro: + return "tsu:"; + case TimestampUnit::kNano: + return "tsn:"; + default: + VELOX_UNREACHABLE(); + } // Complex/nested types. case TypeKind::ARRAY: static_assert(sizeof(vector_size_t) == 4); @@ -335,6 +346,7 @@ void gatherFromBuffer( const Type& type, const Buffer& buf, const Selection& rows, + const ArrowOptions& options, Buffer& out) { auto src = buf.as(); auto dst = out.asMutable(); @@ -352,8 +364,23 @@ void gatherFromBuffer( } else if (type.isTimestamp()) { auto srcTs = buf.as(); auto dstTs = out.asMutable(); - - rows.apply([&](vector_size_t i) { dstTs[j++] = srcTs[i].toNanos(); }); + switch (options.timestampUnit) { + case TimestampUnit::kSecond: + rows.apply( + [&](vector_size_t i) { dstTs[j++] = srcTs[i].getSeconds(); }); + break; + case TimestampUnit::kMilli: + rows.apply([&](vector_size_t i) { dstTs[j++] = srcTs[i].toMillis(); }); + break; + case TimestampUnit::kMicro: + rows.apply([&](vector_size_t i) { dstTs[j++] = srcTs[i].toMicros(); }); + break; + case TimestampUnit::kNano: + rows.apply([&](vector_size_t i) { dstTs[j++] = srcTs[i].toNanos(); }); + break; + default: + VELOX_UNREACHABLE(); + } } else { auto typeSize = type.cppSizeInBytes(); rows.apply([&](vector_size_t i) { @@ -383,9 +410,9 @@ struct BufferViewReleaser { const std::shared_ptr arrayReleaser_; }; -// Wraps a naked pointer using a Velox buffer view, without copying it. Adding a -// dummy releaser as the buffer lifetime is fully controled by the client of the -// API. +// Wraps a naked pointer using a Velox buffer view, without copying it. Adding +// a dummy releaser as the buffer lifetime is fully controled by the client of +// the API. BufferPtr wrapInBufferViewAsViewer(const void* buffer, size_t length) { static const BufferViewReleaser kViewerReleaser; return BufferView::create( @@ -472,11 +499,12 @@ VectorPtr createStringFlatVector( optionalNullCount(nullCount)); } -// This functions does two things: (a) sets the value of null_count, and (b) the -// validity buffer (if there is at least one null row). +// This functions does two things: (a) sets the value of null_count, and (b) +// the validity buffer (if there is at least one null row). void exportValidityBitmap( const BaseVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, VeloxToArrowBridgeHolder& holder) { @@ -490,7 +518,7 @@ void exportValidityBitmap( // If we're only exporting a subset, create a new validity buffer. if (rows.changed()) { nulls = AlignedBuffer::allocate(out.length, pool); - gatherFromBuffer(*BOOLEAN(), *vec.nulls(), rows, *nulls); + gatherFromBuffer(*BOOLEAN(), *vec.nulls(), rows, options, *nulls); } // Set null counts. @@ -506,8 +534,8 @@ void exportValidityBitmap( } bool isFlatScalarZeroCopy(const TypePtr& type) { - // - Short decimals need to be converted to 128 bit values as they are mapped - // to Arrow Decimal128. + // - Short decimals need to be converted to 128 bit values as they are + // mapped to Arrow Decimal128. // - Velox's Timestamp representation (2x 64bit values) does not have an // equivalent in Arrow. return !type->isShortDecimal() && !type->isTimestamp(); @@ -528,6 +556,7 @@ size_t getArrowElementSize(const TypePtr& type) { void exportValues( const BaseVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, VeloxToArrowBridgeHolder& holder) { @@ -545,7 +574,7 @@ void exportValues( ? AlignedBuffer::allocate(out.length, pool) : AlignedBuffer::allocate( checkedMultiply(out.length, size), pool); - gatherFromBuffer(*type, *vec.values(), rows, *values); + gatherFromBuffer(*type, *vec.values(), rows, options, *values); holder.setBuffer(1, values); } @@ -585,6 +614,7 @@ void exportStrings( void exportFlat( const BaseVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, VeloxToArrowBridgeHolder& holder) { @@ -601,7 +631,7 @@ void exportFlat( case TypeKind::DOUBLE: case TypeKind::TIMESTAMP: case TypeKind::UNKNOWN: - exportValues(vec, rows, out, pool, holder); + exportValues(vec, rows, options, out, pool, holder); break; case TypeKind::VARCHAR: case TypeKind::VARBINARY: @@ -618,17 +648,17 @@ void exportFlat( void exportToArrowImpl( const BaseVector&, const Selection&, + const ArrowOptions& options, ArrowArray&, - memory::MemoryPool*, - const ArrowOptions& options); + memory::MemoryPool*); void exportRows( const RowVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, - VeloxToArrowBridgeHolder& holder, - const ArrowOptions& options) { + VeloxToArrowBridgeHolder& holder) { out.n_buffers = 1; holder.resizeChildren(vec.childrenSize()); out.n_children = vec.childrenSize(); @@ -638,9 +668,9 @@ void exportRows( exportToArrowImpl( *vec.childAt(i)->loadedVector(), rows, + options, *holder.allocateChild(i), - pool, - options); + pool); } catch (const VeloxException&) { for (column_index_t j = 0; j < i; ++j) { // When exception is thrown, i th child is guaranteed unset. @@ -699,19 +729,19 @@ void exportOffsets( void exportArrays( const ArrayVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, - VeloxToArrowBridgeHolder& holder, - const ArrowOptions& options) { + VeloxToArrowBridgeHolder& holder) { Selection childRows(vec.elements()->size()); exportOffsets(vec, rows, out, pool, holder, childRows); holder.resizeChildren(1); exportToArrowImpl( *vec.elements()->loadedVector(), childRows, + options, *holder.allocateChild(0), - pool, - options); + pool); out.n_children = 1; out.children = holder.getChildrenArrays(); } @@ -719,10 +749,10 @@ void exportArrays( void exportMaps( const MapVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, - VeloxToArrowBridgeHolder& holder, - const ArrowOptions& options) { + VeloxToArrowBridgeHolder& holder) { RowVector child( pool, ROW({"key", "value"}, {vec.mapKeys()->type(), vec.mapValues()->type()}), @@ -732,7 +762,7 @@ void exportMaps( Selection childRows(child.size()); exportOffsets(vec, rows, out, pool, holder, childRows); holder.resizeChildren(1); - exportToArrowImpl(child, childRows, *holder.allocateChild(0), pool, options); + exportToArrowImpl(child, childRows, options, *holder.allocateChild(0), pool); out.n_children = 1; out.children = holder.getChildrenArrays(); } @@ -741,6 +771,7 @@ template void flattenAndExport( const BaseVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, VeloxToArrowBridgeHolder& holder) { @@ -758,19 +789,20 @@ void flattenAndExport( flatVector->set(row, decoded.valueAt(row)); } }); - exportValidityBitmap(*flatVector, rows, out, pool, holder); - exportFlat(*flatVector, rows, out, pool, holder); + exportValidityBitmap(*flatVector, rows, options, out, pool, holder); + exportFlat(*flatVector, rows, options, out, pool, holder); } else { allRows.applyToSelected([&](vector_size_t row) { flatVector->set(row, decoded.valueAt(row)); }); - exportFlat(*flatVector, rows, out, pool, holder); + exportFlat(*flatVector, rows, options, out, pool, holder); } } void exportDictionary( const BaseVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, VeloxToArrowBridgeHolder& holder) { @@ -778,7 +810,7 @@ void exportDictionary( out.n_children = 0; if (rows.changed()) { auto indices = AlignedBuffer::allocate(out.length, pool); - gatherFromBuffer(*INTEGER(), *vec.wrapInfo(), rows, *indices); + gatherFromBuffer(*INTEGER(), *vec.wrapInfo(), rows, options, *indices); holder.setBuffer(1, indices); } else { holder.setBuffer(1, vec.wrapInfo()); @@ -786,12 +818,13 @@ void exportDictionary( auto& values = *vec.valueVector()->loadedVector(); out.dictionary = holder.allocateDictionary(); exportToArrowImpl( - values, Selection(values.size()), *out.dictionary, pool, ArrowOptions()); + values, Selection(values.size()), options, *out.dictionary, pool); } void exportFlattenedVector( const BaseVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, VeloxToArrowBridgeHolder& holder) { @@ -800,11 +833,12 @@ void exportFlattenedVector( "An unsupported nested encoding was found."); VELOX_CHECK(vec.isScalar(), "Flattening is only supported for scalar types."); VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH( - flattenAndExport, vec.typeKind(), vec, rows, out, pool, holder); + flattenAndExport, vec.typeKind(), vec, rows, options, out, pool, holder); } void exportConstantValue( const BaseVector& vec, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool) { VectorPtr valuesVector; @@ -834,7 +868,7 @@ void exportConstantValue( wrapInBufferViewAsViewer(vec.valuesAsVoid(), bufferSize), vec.mayHaveNulls() ? 1 : 0); } - exportToArrowImpl(*valuesVector, selection, out, pool, ArrowOptions()); + exportToArrowImpl(*valuesVector, selection, options, out, pool); } // Velox constant vectors are exported as Arrow REE containing a single run @@ -842,6 +876,7 @@ void exportConstantValue( void exportConstant( const BaseVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, VeloxToArrowBridgeHolder& holder) { @@ -853,7 +888,7 @@ void exportConstant( out.n_children = 2; holder.resizeChildren(2); out.children = holder.getChildrenArrays(); - exportConstantValue(vec, *holder.allocateChild(1), pool); + exportConstantValue(vec, options, *holder.allocateChild(1), pool); // Create the run ends child. auto* runEnds = holder.allocateChild(0); @@ -880,41 +915,41 @@ void exportConstant( void exportToArrowImpl( const BaseVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, - memory::MemoryPool* pool, - const ArrowOptions& options) { + memory::MemoryPool* pool) { auto holder = std::make_unique(); out.buffers = holder->getArrowBuffers(); out.length = rows.count(); out.offset = 0; out.dictionary = nullptr; - exportValidityBitmap(vec, rows, out, pool, *holder); + exportValidityBitmap(vec, rows, options, out, pool, *holder); switch (vec.encoding()) { case VectorEncoding::Simple::FLAT: - exportFlat(vec, rows, out, pool, *holder); + exportFlat(vec, rows, options, out, pool, *holder); break; case VectorEncoding::Simple::ROW: exportRows( - *vec.asUnchecked(), rows, out, pool, *holder, options); + *vec.asUnchecked(), rows, options, out, pool, *holder); break; case VectorEncoding::Simple::ARRAY: exportArrays( - *vec.asUnchecked(), rows, out, pool, *holder, options); + *vec.asUnchecked(), rows, options, out, pool, *holder); break; case VectorEncoding::Simple::MAP: exportMaps( - *vec.asUnchecked(), rows, out, pool, *holder, options); + *vec.asUnchecked(), rows, options, out, pool, *holder); break; case VectorEncoding::Simple::DICTIONARY: options.flattenDictionary - ? exportFlattenedVector(vec, rows, out, pool, *holder) - : exportDictionary(vec, rows, out, pool, *holder); + ? exportFlattenedVector(vec, rows, options, out, pool, *holder) + : exportDictionary(vec, rows, options, out, pool, *holder); break; case VectorEncoding::Simple::CONSTANT: options.flattenConstant - ? exportFlattenedVector(vec, rows, out, pool, *holder) - : exportConstant(vec, rows, out, pool, *holder); + ? exportFlattenedVector(vec, rows, options, out, pool, *holder) + : exportConstant(vec, rows, options, out, pool, *holder); break; default: VELOX_NYI("{} cannot be exported to Arrow yet.", vec.encoding()); @@ -957,8 +992,7 @@ TypePtr importFromArrowImpl( return VARBINARY(); case 't': // temporal types. - // Mapping it to ttn for now. - if (format[1] == 't' && format[2] == 'n') { + if (format[1] == 's') { return TIMESTAMP(); } if (format[1] == 'd' && format[2] == 'D') { @@ -1050,7 +1084,7 @@ void exportToArrow( memory::MemoryPool* pool, const ArrowOptions& options) { exportToArrowImpl( - *vector, Selection(vector->size()), arrowArray, pool, options); + *vector, Selection(vector->size()), options, arrowArray, pool); } void exportToArrow( @@ -1077,12 +1111,12 @@ void exportToArrow( // Dictionary data is flattened. Set the underlying data types. arrowSchema.dictionary = nullptr; arrowSchema.format = - exportArrowFormatStr(type, bridgeHolder->formatBuffer); + exportArrowFormatStr(type, options, bridgeHolder->formatBuffer); } else { arrowSchema.format = "i"; bridgeHolder->dictionary = std::make_unique(); arrowSchema.dictionary = bridgeHolder->dictionary.get(); - exportToArrow(vec->valueVector(), *arrowSchema.dictionary); + exportToArrow(vec->valueVector(), *arrowSchema.dictionary, options); } } else if ( vec->encoding() == VectorEncoding::Simple::CONSTANT && @@ -1101,7 +1135,7 @@ void exportToArrow( exportToArrow(valueVector, *valuesChild, options); } else { valuesChild->format = - exportArrowFormatStr(type, bridgeHolder->formatBuffer); + exportArrowFormatStr(type, options, bridgeHolder->formatBuffer); } valuesChild->name = "values"; @@ -1109,7 +1143,8 @@ void exportToArrow( 0, newArrowSchema("i", "run_ends"), arrowSchema); bridgeHolder->setChildAtIndex(1, std::move(valuesChild), arrowSchema); } else { - arrowSchema.format = exportArrowFormatStr(type, bridgeHolder->formatBuffer); + arrowSchema.format = + exportArrowFormatStr(type, options, bridgeHolder->formatBuffer); arrowSchema.dictionary = nullptr; if (type->kind() == TypeKind::MAP) { @@ -1204,12 +1239,14 @@ TypePtr importFromArrow(const ArrowSchema& arrowSchema) { namespace { VectorPtr importFromArrowImpl( + const ArrowOptions& options, ArrowSchema& arrowSchema, ArrowArray& arrowArray, memory::MemoryPool* pool, bool isViewer); RowVectorPtr createRowVector( + const ArrowOptions& options, memory::MemoryPool* pool, const RowTypePtr& rowType, BufferPtr nulls, @@ -1224,7 +1261,11 @@ RowVectorPtr createRowVector( for (size_t i = 0; i < arrowArray.n_children; ++i) { childrenVector.push_back(importFromArrowImpl( - *arrowSchema.children[i], *arrowArray.children[i], pool, isViewer)); + options, + *arrowSchema.children[i], + *arrowArray.children[i], + pool, + isViewer)); } return std::make_shared( pool, @@ -1249,6 +1290,7 @@ BufferPtr computeSizes( } ArrayVectorPtr createArrayVector( + const ArrowOptions& options, memory::MemoryPool* pool, const TypePtr& type, BufferPtr nulls, @@ -1264,7 +1306,11 @@ ArrayVectorPtr createArrayVector( auto sizes = computeSizes(offsets->as(), arrowArray.length, pool); auto elements = importFromArrowImpl( - *arrowSchema.children[0], *arrowArray.children[0], pool, isViewer); + options, + *arrowSchema.children[0], + *arrowArray.children[0], + pool, + isViewer); return std::make_shared( pool, type, @@ -1277,6 +1323,7 @@ ArrayVectorPtr createArrayVector( } MapVectorPtr createMapVector( + const ArrowOptions& options, memory::MemoryPool* pool, const TypePtr& type, BufferPtr nulls, @@ -1292,7 +1339,11 @@ MapVectorPtr createMapVector( computeSizes(offsets->as(), arrowArray.length, pool); // Arrow wraps keys and values into a struct. auto entries = importFromArrowImpl( - *arrowSchema.children[0], *arrowArray.children[0], pool, isViewer); + options, + *arrowSchema.children[0], + *arrowArray.children[0], + pool, + isViewer); VELOX_CHECK(entries->type()->isRow()); const auto& rows = *entries->asUnchecked(); VELOX_CHECK_EQ(rows.childrenSize(), 2); @@ -1309,6 +1360,7 @@ MapVectorPtr createMapVector( } VectorPtr createDictionaryVector( + const ArrowOptions& options, memory::MemoryPool* pool, const TypePtr& indexType, BufferPtr nulls, @@ -1327,7 +1379,7 @@ VectorPtr createDictionaryVector( arrowArray.buffers[1], arrowArray.length * sizeof(vector_size_t)); auto type = importFromArrow(*arrowSchema.dictionary); auto wrapped = importFromArrowImpl( - *arrowSchema.dictionary, *arrowArray.dictionary, pool, isViewer); + options, *arrowSchema.dictionary, *arrowArray.dictionary, pool, isViewer); return BaseVector::wrapInDictionary( std::move(nulls), std::move(indices), @@ -1336,6 +1388,7 @@ VectorPtr createDictionaryVector( } VectorPtr createVectorFromReeArray( + const ArrowOptions& options, memory::MemoryPool* pool, const ArrowSchema& arrowSchema, const ArrowArray& arrowArray, @@ -1347,7 +1400,7 @@ VectorPtr createVectorFromReeArray( VELOX_CHECK_EQ(arrowArray.null_count, 0); auto values = importFromArrowImpl( - *arrowSchema.children[1], *arrowArray.children[1], pool, isViewer); + options, *arrowSchema.children[1], *arrowArray.children[1], pool, isViewer); const auto& runEndSchema = *arrowSchema.children[0]; auto runEndType = importFromArrowImpl(runEndSchema.format, runEndSchema); @@ -1388,6 +1441,7 @@ VectorPtr createVectorFromReeArray( } VectorPtr createTimestampVector( + const ArrowOptions& options, memory::MemoryPool* pool, const TypePtr& type, BufferPtr nulls, @@ -1399,10 +1453,37 @@ VectorPtr createTimestampVector( const auto* rawNulls = nulls->as(); if (length > nullCount) { - for (size_t i = 0; i < length; ++i) { - if (!bits::isBitNull(rawNulls, i)) { - rawTimestamps[i] = Timestamp::fromNanos(input[i]); - } + switch (options.timestampUnit) { + case TimestampUnit::kSecond: + for (size_t i = 0; i < length; ++i) { + if (!bits::isBitNull(rawNulls, i)) { + rawTimestamps[i] = Timestamp(input[i], 0); + } + } + break; + case TimestampUnit::kMilli: + for (size_t i = 0; i < length; ++i) { + if (!bits::isBitNull(rawNulls, i)) { + rawTimestamps[i] = Timestamp::fromMillis(input[i]); + } + } + break; + case TimestampUnit::kMicro: + for (size_t i = 0; i < length; ++i) { + if (!bits::isBitNull(rawNulls, i)) { + rawTimestamps[i] = Timestamp::fromMicros(input[i]); + } + } + break; + case TimestampUnit::kNano: + for (size_t i = 0; i < length; ++i) { + if (!bits::isBitNull(rawNulls, i)) { + rawTimestamps[i] = Timestamp::fromNanos(input[i]); + } + } + break; + default: + VELOX_UNREACHABLE(); } } return std::make_shared>( @@ -1422,6 +1503,7 @@ bool isREE(const ArrowSchema& arrowSchema) { } VectorPtr importFromArrowImpl( + const ArrowOptions& options, ArrowSchema& arrowSchema, ArrowArray& arrowArray, memory::MemoryPool* pool, @@ -1459,6 +1541,7 @@ VectorPtr importFromArrowImpl( if (arrowSchema.dictionary) { auto indexType = importFromArrowImpl(arrowSchema.format, arrowSchema); return createDictionaryVector( + options, pool, indexType, nulls, @@ -1469,7 +1552,7 @@ VectorPtr importFromArrowImpl( } if (isREE(arrowSchema)) { - return createVectorFromReeArray(pool, arrowSchema, arrowArray, isViewer); + return createVectorFromReeArray(options, pool, arrowSchema, arrowArray, isViewer); } // String data types (VARCHAR and VARBINARY). @@ -1489,6 +1572,7 @@ VectorPtr importFromArrowImpl( wrapInBufferView); } else if (type->isTimestamp()) { return createTimestampVector( + options, pool, type, nulls, @@ -1498,6 +1582,7 @@ VectorPtr importFromArrowImpl( } else if (type->isRow()) { // Row/structs. return createRowVector( + options, pool, std::dynamic_pointer_cast(type), nulls, @@ -1506,10 +1591,24 @@ VectorPtr importFromArrowImpl( isViewer); } else if (type->isArray()) { return createArrayVector( - pool, type, nulls, arrowSchema, arrowArray, isViewer, wrapInBufferView); + options, + pool, + type, + nulls, + arrowSchema, + arrowArray, + isViewer, + wrapInBufferView); } else if (type->isMap()) { return createMapVector( - pool, type, nulls, arrowSchema, arrowArray, isViewer, wrapInBufferView); + options, + pool, + type, + nulls, + arrowSchema, + arrowArray, + isViewer, + wrapInBufferView); } else if (type->isPrimitiveType()) { // Other primitive types. @@ -1537,13 +1636,19 @@ VectorPtr importFromArrowImpl( } VectorPtr importFromArrowImpl( + const ArrowOptions& options, ArrowSchema& arrowSchema, ArrowArray& arrowArray, memory::MemoryPool* pool, bool isViewer) { if (isViewer) { return importFromArrowImpl( - arrowSchema, arrowArray, pool, isViewer, wrapInBufferViewAsViewer); + options, + arrowSchema, + arrowArray, + pool, + isViewer, + wrapInBufferViewAsViewer); } // This Vector will take over the ownership of `arrowSchema` and `arrowArray` @@ -1570,6 +1675,7 @@ VectorPtr importFromArrowImpl( } }); VectorPtr imported = importFromArrowImpl( + options, arrowSchema, arrowArray, pool, @@ -1590,8 +1696,10 @@ VectorPtr importFromArrowImpl( VectorPtr importFromArrowAsViewer( const ArrowSchema& arrowSchema, const ArrowArray& arrowArray, + const ArrowOptions& options, memory::MemoryPool* pool) { return importFromArrowImpl( + options, const_cast(arrowSchema), const_cast(arrowArray), pool, @@ -1601,8 +1709,9 @@ VectorPtr importFromArrowAsViewer( VectorPtr importFromArrowAsOwner( ArrowSchema& arrowSchema, ArrowArray& arrowArray, + const ArrowOptions& options, memory::MemoryPool* pool) { - return importFromArrowImpl(arrowSchema, arrowArray, pool, false); + return importFromArrowImpl(options, arrowSchema, arrowArray, pool, false); } } // namespace facebook::velox diff --git a/velox/vector/arrow/Bridge.h b/velox/vector/arrow/Bridge.h index ba30068f8957..d50fae740cb2 100644 --- a/velox/vector/arrow/Bridge.h +++ b/velox/vector/arrow/Bridge.h @@ -25,13 +25,15 @@ struct ArrowArray; struct ArrowSchema; +enum class TimestampUnit { kSecond = 0, kMilli = 3, kMicro = 6, kNano = 9 }; + struct ArrowOptions { bool flattenDictionary{false}; bool flattenConstant{false}; + TimestampUnit timestampUnit = TimestampUnit::kNano; }; namespace facebook::velox { - /// Export a generic Velox Vector to an ArrowArray, as defined by Arrow's C data /// interface: /// @@ -63,7 +65,7 @@ void exportToArrow( const VectorPtr& vector, ArrowArray& arrowArray, memory::MemoryPool* pool, - const ArrowOptions& options = ArrowOptions{}); + const ArrowOptions& options); /// Export the type of a Velox vector to an ArrowSchema. /// @@ -84,10 +86,7 @@ void exportToArrow( /// /// NOTE: Since Arrow couples type and encoding, we need both Velox type and /// actual data (containing encoding) to create an ArrowSchema. -void exportToArrow( - const VectorPtr&, - ArrowSchema&, - const ArrowOptions& = ArrowOptions{}); +void exportToArrow(const VectorPtr&, ArrowSchema&, const ArrowOptions&); /// Import an ArrowSchema into a Velox Type object. /// @@ -135,6 +134,7 @@ TypePtr importFromArrow(const ArrowSchema& arrowSchema); VectorPtr importFromArrowAsViewer( const ArrowSchema& arrowSchema, const ArrowArray& arrowArray, + const ArrowOptions& options, memory::MemoryPool* pool); /// Import an ArrowArray and ArrowSchema into a Velox vector, acquiring @@ -152,6 +152,7 @@ VectorPtr importFromArrowAsViewer( VectorPtr importFromArrowAsOwner( ArrowSchema& arrowSchema, ArrowArray& arrowArray, + const ArrowOptions& options, memory::MemoryPool* pool); } // namespace facebook::velox diff --git a/velox/vector/arrow/tests/ArrowBridgeArrayTest.cpp b/velox/vector/arrow/tests/ArrowBridgeArrayTest.cpp index afde498c82c2..77f605be9eb5 100644 --- a/velox/vector/arrow/tests/ArrowBridgeArrayTest.cpp +++ b/velox/vector/arrow/tests/ArrowBridgeArrayTest.cpp @@ -54,7 +54,7 @@ class ArrowBridgeArrayExportTest : public testing::Test { const TypePtr& type = CppToType::create()) { auto flatVector = vectorMaker_.flatVectorNullable(inputData, type); ArrowArray arrowArray; - velox::exportToArrow(flatVector, arrowArray, pool_.get()); + velox::exportToArrow(flatVector, arrowArray, pool_.get(), options_); validateArray(inputData, arrowArray); @@ -67,7 +67,7 @@ class ArrowBridgeArrayExportTest : public testing::Test { void testArrayVector(const T& inputData) { auto arrayVector = vectorMaker_.arrayVectorNullable(inputData); ArrowArray arrowArray; - velox::exportToArrow(arrayVector, arrowArray, pool_.get()); + velox::exportToArrow(arrayVector, arrowArray, pool_.get(), options_); validateListArray(inputData, arrowArray); @@ -93,7 +93,7 @@ class ArrowBridgeArrayExportTest : public testing::Test { const VectorPtr& constantVector, const TInput& input) { ArrowArray arrowArray; - velox::exportToArrow(constantVector, arrowArray, pool_.get()); + velox::exportToArrow(constantVector, arrowArray, pool_.get(), options_); validateConstant( input, constantVector->size(), @@ -161,8 +161,33 @@ class ArrowBridgeArrayExportTest : public testing::Test { bits::isBitSet(reinterpret_cast(values), i)) << "mismatch at index " << i; } else if constexpr (std::is_same_v) { - EXPECT_EQ(inputData[i], Timestamp::fromNanos(values[i])) - << "mismatch at index " << i; + switch (options_.timestampUnit) { + case TimestampUnit::kSecond: + EXPECT_EQ( + Timestamp(inputData[i].value().getSeconds(), 0), + Timestamp(values[i], 0)) + << "mismatch at index " << i; + break; + case TimestampUnit::kMilli: + EXPECT_EQ( + Timestamp::fromMillis(inputData[i].value().toMillis()), + Timestamp::fromMillis(values[i])) + << "mismatch at index " << i; + break; + case TimestampUnit::kMicro: + EXPECT_EQ( + Timestamp::fromMicros(inputData[i].value().toMicros()), + Timestamp::fromMicros(values[i])) + << "mismatch at index " << i; + break; + case TimestampUnit::kNano: + EXPECT_EQ(inputData[i], Timestamp::fromNanos(values[i])) + << "mismatch at index " << i; + break; + default: + VELOX_USER_FAIL(fmt::format( + "Timestamp unit not supported: {}.", options_.timestampUnit)); + } } else { EXPECT_EQ(inputData[i], values[i]) << "mismatch at index " << i; } @@ -353,10 +378,12 @@ class ArrowBridgeArrayExportTest : public testing::Test { } void exportToArrow(const TypePtr& type, ArrowSchema& out) { - velox::exportToArrow(BaseVector::create(type, 0, pool_.get()), out); + velox::exportToArrow( + BaseVector::create(type, 0, pool_.get()), out, options_); } // Boiler plate structures required by vectorMaker. + ArrowOptions options_; std::shared_ptr queryCtx_{std::make_shared()}; std::shared_ptr pool_{ memory::memoryManager()->addLeafPool()}; @@ -371,7 +398,7 @@ TEST_F(ArrowBridgeArrayExportTest, flatNotNull) { // Make sure that ArrowArray is correctly acquiring ownership, even after // the initial vector shared_ptr is gone. auto flatVector = vectorMaker_.flatVector(inputData); - velox::exportToArrow(flatVector, arrowArray, pool_.get()); + velox::exportToArrow(flatVector, arrowArray, pool_.get(), options_); } EXPECT_EQ(inputData.size(), arrowArray.length); @@ -494,16 +521,23 @@ TEST_F(ArrowBridgeArrayExportTest, flatDate) { } TEST_F(ArrowBridgeArrayExportTest, flatTimestamp) { - testFlatVector( - { - Timestamp(0, 0), - std::nullopt, - Timestamp(1699300965, 12'349), - Timestamp(-2208960000, 0), // 1900-01-01 - Timestamp(3155788800, 999'999'999), - std::nullopt, - }, - TIMESTAMP()); + for (uint8_t unit : + {(uint8_t)TimestampUnit::kSecond, + (uint8_t)TimestampUnit::kMilli, + (uint8_t)TimestampUnit::kMicro, + (uint8_t)TimestampUnit::kNano}) { + options_.timestampUnit = static_cast(unit); + testFlatVector( + { + Timestamp(0, 0), + std::nullopt, + Timestamp(1699300965, 12'349), + Timestamp(-2208960000, 0), // 1900-01-01 + Timestamp(3155788800, 999'999'999), + std::nullopt, + }, + TIMESTAMP()); + } // Out of range. If nanosecond precision is represented in Arrow, timestamps // starting around 2263-01-01 should overflow and throw a user exception. @@ -542,7 +576,7 @@ TEST_F(ArrowBridgeArrayExportTest, rowVector) { }); ArrowArray arrowArray; - velox::exportToArrow(vector, arrowArray, pool_.get()); + velox::exportToArrow(vector, arrowArray, pool_.get(), options_); EXPECT_EQ(col1.size(), arrowArray.length); EXPECT_EQ(0, arrowArray.null_count); @@ -579,7 +613,7 @@ TEST_F(ArrowBridgeArrayExportTest, rowVectorNullable) { vector->setNullCount(3); ArrowArray arrowArray; - velox::exportToArrow(vector, arrowArray, pool_.get()); + velox::exportToArrow(vector, arrowArray, pool_.get(), options_); EXPECT_EQ(col1.size(), arrowArray.length); EXPECT_EQ(3, arrowArray.null_count); @@ -607,7 +641,8 @@ TEST_F(ArrowBridgeArrayExportTest, rowVectorNullable) { TEST_F(ArrowBridgeArrayExportTest, rowVectorEmpty) { ArrowArray arrowArray; - velox::exportToArrow(vectorMaker_.rowVector({}), arrowArray, pool_.get()); + velox::exportToArrow( + vectorMaker_.rowVector({}), arrowArray, pool_.get(), options_); EXPECT_EQ(0, arrowArray.n_children); EXPECT_EQ(1, arrowArray.n_buffers); EXPECT_EQ(nullptr, arrowArray.children); @@ -617,11 +652,12 @@ TEST_F(ArrowBridgeArrayExportTest, rowVectorEmpty) { std::shared_ptr toArrow( const VectorPtr& vec, + const ArrowOptions& options, memory::MemoryPool* pool) { ArrowSchema schema; ArrowArray array; - exportToArrow(vec, schema); - exportToArrow(vec, array, pool); + exportToArrow(vec, schema, options); + exportToArrow(vec, array, pool, options); EXPECT_OK_AND_ASSIGN(auto type, arrow::ImportType(&schema)); EXPECT_OK_AND_ASSIGN(auto ans, arrow::ImportArray(&array, type)); return ans; @@ -655,7 +691,7 @@ TEST_F(ArrowBridgeArrayExportTest, arraySimple) { TEST_F(ArrowBridgeArrayExportTest, arrayCrossValidate) { auto vec = vectorMaker_.arrayVector({{1, 2, 3}, {4, 5}}); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); EXPECT_EQ(array->null_count(), 0); ASSERT_EQ(*array->type(), *arrow::list(arrow::int64())); @@ -682,10 +718,10 @@ TEST_F(ArrowBridgeArrayExportTest, arrayDictionary) { ArrowSchema schema; ArrowArray data; - velox::exportToArrow(vec, schema); - velox::exportToArrow(vec, data, vec->pool()); + velox::exportToArrow(vec, schema, options_); + velox::exportToArrow(vec, data, vec->pool(), options_); - auto result = importFromArrowAsViewer(schema, data, vec->pool()); + auto result = importFromArrowAsViewer(schema, data, options_, vec->pool()); test::assertEqualVectors(result, vec); schema.release(&schema); data.release(&data); @@ -699,7 +735,7 @@ TEST_F(ArrowBridgeArrayExportTest, arrayGap) { auto sizes = makeBuffer({2, 2}); auto vec = std::make_shared( pool_.get(), ARRAY(BIGINT()), nullptr, 2, offsets, sizes, elements); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); EXPECT_EQ(array->null_count(), 0); ASSERT_EQ(*array->type(), *arrow::list(arrow::int64())); @@ -724,7 +760,7 @@ TEST_F(ArrowBridgeArrayExportTest, arrayReorder) { auto sizes = makeBuffer({2, 2}); auto vec = std::make_shared( pool_.get(), ARRAY(BIGINT()), nullptr, 2, offsets, sizes, elements); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); EXPECT_EQ(array->null_count(), 0); ASSERT_EQ(*array->type(), *arrow::list(arrow::int64())); @@ -754,7 +790,7 @@ TEST_F(ArrowBridgeArrayExportTest, arrayNested) { std::make_shared( pool_.get(), ARRAY(inner->type()), nullptr, 2, offsets, sizes, inner); }); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); ASSERT_EQ(*array->type(), *arrow::list(arrow::list(arrow::int64()))); auto& listArray = static_cast(*array); @@ -771,7 +807,7 @@ TEST_F(ArrowBridgeArrayExportTest, mapSimple) { auto allOnes = [](vector_size_t) { return 1; }; auto vec = vectorMaker_.mapVector(2, allOnes, allOnes, allOnes); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); EXPECT_EQ(array->null_count(), 0); ASSERT_EQ(*array->type(), *arrow::map(arrow::int64(), arrow::int64())); @@ -806,7 +842,7 @@ TEST_F(ArrowBridgeArrayExportTest, mapNested) { std::make_shared( pool_.get(), type, nullptr, 2, offsets, sizes, keys, inner); }); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); EXPECT_EQ(array->null_count(), 0); ASSERT_EQ( @@ -838,7 +874,7 @@ TEST_F(ArrowBridgeArrayExportTest, dictionarySimple) { allocateIndices(3, pool_.get()), 3, vectorMaker_.flatVector({1, 2, 3})); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); ASSERT_EQ(*array->type(), *arrow::dictionary(arrow::int32(), arrow::int64())); auto& dict = static_cast(*array); @@ -864,7 +900,7 @@ TEST_F(ArrowBridgeArrayExportTest, dictionaryNested) { std::make_shared( pool_.get(), ARRAY(inner->type()), nullptr, 2, offsets, sizes, inner); }); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); ASSERT_EQ( *array->type(), @@ -914,7 +950,7 @@ TEST_F(ArrowBridgeArrayExportTest, constantComplex) { TEST_F(ArrowBridgeArrayExportTest, constantCrossValidate) { auto vector = BaseVector::createConstant(VARCHAR(), "hello", 100, pool_.get()); - auto array = toArrow(vector, pool_.get()); + auto array = toArrow(vector, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); EXPECT_EQ(array->null_count(), 0); @@ -983,7 +1019,23 @@ class ArrowBridgeArrayImportTest : public ArrowBridgeArrayExportTest { if constexpr (std::is_same_v) { bits::setBit(rawValues, i, *inputValues[i]); } else if constexpr (std::is_same_v) { - rawValues[i] = inputValues[i]->toNanos(); + switch (options_.timestampUnit) { + case TimestampUnit::kSecond: + rawValues[i] = inputValues[i]->getSeconds(); + break; + case TimestampUnit::kMilli: + rawValues[i] = inputValues[i]->toMillis(); + break; + case TimestampUnit::kMicro: + rawValues[i] = inputValues[i]->toMicros(); + break; + case TimestampUnit::kNano: + rawValues[i] = inputValues[i]->toNanos(); + break; + default: + VELOX_USER_FAIL(fmt::format( + "Timestamp unit not supported: {}.", options_.timestampUnit)); + } } else { rawValues[i] = *inputValues[i]; } @@ -1096,9 +1148,46 @@ class ArrowBridgeArrayImportTest : public ArrowBridgeArrayExportTest { // Assert new vector contents. for (vector_size_t i = 0; i < convertedVector->size(); ++i) { - ASSERT_TRUE(expected->equalValueAt(convertedVector.get(), i, i)) - << "at " << i << ": " << expected->toString(i) << " vs. " - << convertedVector->toString(i); + if constexpr (std::is_same_v) { + if (expected->isNullAt(i)) { + ASSERT_TRUE(convertedVector->isNullAt(i)) + << "at " << i << ": " << expected->toString(i) << " vs. " + << convertedVector->toString(i); + } else { + auto convertedFlat = convertedVector->asFlatVector(); + switch (options_.timestampUnit) { + case TimestampUnit::kSecond: + EXPECT_EQ( + Timestamp(expected->valueAt(i).getSeconds(), 0), + convertedFlat->valueAt(i)) + << "mismatch at index " << i; + break; + case TimestampUnit::kMilli: + EXPECT_EQ( + Timestamp::fromMillis(expected->valueAt(i).toMillis()), + convertedFlat->valueAt(i)) + << "mismatch at index " << i; + break; + case TimestampUnit::kMicro: + EXPECT_EQ( + Timestamp::fromMicros(expected->valueAt(i).toMicros()), + convertedFlat->valueAt(i)) + << "mismatch at index " << i; + break; + case TimestampUnit::kNano: + EXPECT_EQ(expected->valueAt(i), convertedFlat->valueAt(i)) + << "mismatch at index " << i; + break; + default: + VELOX_USER_FAIL(fmt::format( + "Timestamp unit not supported: {}.", options_.timestampUnit)); + } + } + } else { + ASSERT_TRUE(expected->equalValueAt(convertedVector.get(), i, i)) + << "at " << i << ": " << expected->toString(i) << " vs. " + << convertedVector->toString(i); + } } } @@ -1131,8 +1220,15 @@ class ArrowBridgeArrayImportTest : public ArrowBridgeArrayExportTest { testArrowImport("g", {-99.9, 4.3, 31.1, 129.11, -12}); testArrowImport("f", {-99.9, 4.3, 31.1, 129.11, -12}); - testArrowImport( - "ttn", {Timestamp(0, 0), std::nullopt, Timestamp(1699308257, 1234)}); + for (uint8_t unit : + {(uint8_t)TimestampUnit::kSecond, + (uint8_t)TimestampUnit::kMilli, + (uint8_t)TimestampUnit::kMicro, + (uint8_t)TimestampUnit::kNano}) { + options_.timestampUnit = static_cast(unit); + testArrowImport( + "ts", {Timestamp(0, 0), std::nullopt, Timestamp(1699308257, 1234)}); + } } void testImportWithoutNullsBuffer() { @@ -1319,6 +1415,8 @@ class ArrowBridgeArrayImportTest : public ArrowBridgeArrayExportTest { ArrowArray data; velox::exportToArrow(vec, schema); velox::exportToArrow(vec, data, pool_.get()); + velox::exportToArrow(vec, schema, options_); + velox::exportToArrow(vec, data, pool_.get(), options_); ASSERT_OK_AND_ASSIGN(auto arrowType, arrow::ImportType(&schema)); ASSERT_OK_AND_ASSIGN(auto array2, arrow::ImportArray(&data, arrowType)); ASSERT_OK(array2->ValidateFull()); @@ -1557,6 +1655,7 @@ class ArrowBridgeArrayImportTest : public ArrowBridgeArrayExportTest { EXPECT_NO_THROW(importFromArrow(arrowSchema, arrowArray, pool_.get())); } + ArrowOptions options_; std::shared_ptr pool_{ memory::memoryManager()->addLeafPool()}; }; @@ -1571,7 +1670,7 @@ class ArrowBridgeArrayImportAsViewerTest : public ArrowBridgeArrayImportTest { ArrowArray& arrowArray, memory::MemoryPool* pool) override { return facebook::velox::importFromArrowAsViewer( - arrowSchema, arrowArray, pool); + arrowSchema, arrowArray, options_, pool); } }; @@ -1622,7 +1721,7 @@ class ArrowBridgeArrayImportAsOwnerTest ArrowArray& arrowArray, memory::MemoryPool* pool) override { return facebook::velox::importFromArrowAsOwner( - arrowSchema, arrowArray, pool); + arrowSchema, arrowArray, options_, pool); } }; @@ -1669,7 +1768,8 @@ TEST_F(ArrowBridgeArrayImportAsOwnerTest, inputsMarkedReleased) { ArrowSchema arrowSchema = makeArrowSchema("i"); ArrowArray arrowArray = makeArrowArray(buffers, 2, 4, 0); - auto _ = importFromArrowAsOwner(arrowSchema, arrowArray, pool_.get()); + auto _ = + importFromArrowAsOwner(arrowSchema, arrowArray, options_, pool_.get()); EXPECT_EQ(arrowSchema.release, nullptr); EXPECT_EQ(arrowArray.release, nullptr); @@ -1702,7 +1802,10 @@ TEST_F(ArrowBridgeArrayImportAsOwnerTest, releaseCalled) { // Create a Velox Vector from Arrow and then destruct it to trigger the // release callback calling - { auto _ = importFromArrowAsOwner(arrowSchema, arrowArray, pool_.get()); } + { + auto _ = + importFromArrowAsOwner(arrowSchema, arrowArray, options_, pool_.get()); + } EXPECT_TRUE(TestReleaseCalled::schemaReleaseCalled); EXPECT_TRUE(TestReleaseCalled::arrayReleaseCalled); diff --git a/velox/vector/arrow/tests/ArrowBridgeSchemaTest.cpp b/velox/vector/arrow/tests/ArrowBridgeSchemaTest.cpp index a02e4cc08732..7b6788f129f2 100644 --- a/velox/vector/arrow/tests/ArrowBridgeSchemaTest.cpp +++ b/velox/vector/arrow/tests/ArrowBridgeSchemaTest.cpp @@ -120,7 +120,7 @@ class ArrowBridgeSchemaExportTest : public testing::Test { 3, // index to use for the constant BaseVector::create(type, 100, pool_.get())); - velox::exportToArrow(constantVector, arrowSchema); + velox::exportToArrow(constantVector, arrowSchema, options_); EXPECT_STREQ("+r", arrowSchema.format); EXPECT_EQ(nullptr, arrowSchema.name); @@ -155,7 +155,8 @@ class ArrowBridgeSchemaExportTest : public testing::Test { } void exportToArrow(const TypePtr& type, ArrowSchema& out) { - velox::exportToArrow(BaseVector::create(type, 0, pool_.get()), out); + velox::exportToArrow( + BaseVector::create(type, 0, pool_.get()), out, options_); } ArrowSchema makeArrowSchema(const char* format) { @@ -172,6 +173,7 @@ class ArrowBridgeSchemaExportTest : public testing::Test { }; } + ArrowOptions options_; std::shared_ptr pool_{ memory::memoryManager()->addLeafPool()}; }; @@ -190,7 +192,15 @@ TEST_F(ArrowBridgeSchemaExportTest, scalar) { testScalarType(VARCHAR(), "u"); testScalarType(VARBINARY(), "z"); - testScalarType(TIMESTAMP(), "ttn"); + options_.timestampUnit = TimestampUnit::kSecond; + testScalarType(TIMESTAMP(), "tss:"); + options_.timestampUnit = TimestampUnit::kMilli; + testScalarType(TIMESTAMP(), "tsm:"); + options_.timestampUnit = TimestampUnit::kMicro; + testScalarType(TIMESTAMP(), "tsu:"); + options_.timestampUnit = TimestampUnit::kNano; + testScalarType(TIMESTAMP(), "tsn:"); + testScalarType(DATE(), "tdD"); testScalarType(DECIMAL(10, 4), "d:10,4"); @@ -360,7 +370,7 @@ TEST_F(ArrowBridgeSchemaImportTest, scalar) { EXPECT_EQ(*VARBINARY(), *testSchemaImport("Z")); // Temporal. - EXPECT_EQ(*TIMESTAMP(), *testSchemaImport("ttn")); + EXPECT_EQ(*TIMESTAMP(), *testSchemaImport("tsu:")); EXPECT_EQ(*DATE(), *testSchemaImport("tdD")); EXPECT_EQ(*DECIMAL(10, 4), *testSchemaImport("d:10,4")); @@ -373,7 +383,7 @@ TEST_F(ArrowBridgeSchemaImportTest, scalar) { TEST_F(ArrowBridgeSchemaImportTest, complexTypes) { // Array. EXPECT_EQ(*ARRAY(BIGINT()), *testSchemaImportComplex("+l", {"l"})); - EXPECT_EQ(*ARRAY(TIMESTAMP()), *testSchemaImportComplex("+l", {"ttn"})); + EXPECT_EQ(*ARRAY(TIMESTAMP()), *testSchemaImportComplex("+l", {"tsn:"})); EXPECT_EQ(*ARRAY(DATE()), *testSchemaImportComplex("+l", {"tdD"})); EXPECT_EQ(*ARRAY(VARCHAR()), *testSchemaImportComplex("+l", {"U"})); @@ -438,9 +448,11 @@ class ArrowBridgeSchemaTest : public testing::Test { } void exportToArrow(const TypePtr& type, ArrowSchema& out) { - velox::exportToArrow(BaseVector::create(type, 0, pool_.get()), out); + velox::exportToArrow( + BaseVector::create(type, 0, pool_.get()), out, options_); } + ArrowOptions options_; std::shared_ptr pool_{ memory::memoryManager()->addLeafPool()}; }; @@ -517,7 +529,7 @@ TEST_F(ArrowBridgeSchemaImportTest, dictionaryTypeTest) { *testSchemaDictionaryImport( "i", makeComplexArrowSchema( - schemas, schemaPtrs, mapSchemas, mapSchemaPtrs, "+l", {"ttn"}))); + schemas, schemaPtrs, mapSchemas, mapSchemaPtrs, "+l", {"ts"}))); EXPECT_EQ( *ARRAY(DATE()), *testSchemaDictionaryImport(