From e1454c7d12cf4e7a0d6e44b9307e84c1ec7d9015 Mon Sep 17 00:00:00 2001 From: rui-mo Date: Fri, 17 Nov 2023 10:44:32 +0800 Subject: [PATCH] Support timestamp units in arrow bridge --- velox/connectors/hive/HiveConfig.cpp | 4 + velox/connectors/hive/HiveConfig.h | 8 + velox/connectors/hive/HiveDataSink.cpp | 2 + velox/dwio/common/Options.h | 1 + velox/dwio/parquet/writer/Writer.cpp | 12 +- velox/dwio/parquet/writer/Writer.h | 4 + velox/exec/tests/ArrowStreamTest.cpp | 5 +- velox/vector/arrow/Bridge.cpp | 261 ++++++++++++++---- velox/vector/arrow/Bridge.h | 10 +- .../arrow/tests/ArrowBridgeArrayTest.cpp | 186 ++++++++++--- .../arrow/tests/ArrowBridgeSchemaTest.cpp | 26 +- 11 files changed, 394 insertions(+), 125 deletions(-) diff --git a/velox/connectors/hive/HiveConfig.cpp b/velox/connectors/hive/HiveConfig.cpp index e6048f0d6aedb..3699c8fabba73 100644 --- a/velox/connectors/hive/HiveConfig.cpp +++ b/velox/connectors/hive/HiveConfig.cpp @@ -206,4 +206,8 @@ bool HiveConfig::s3UseProxyFromEnv() const { return config_->get(kS3UseProxyFromEnv, false); } +uint8_t HiveConfig::arrowBridgeTimestampUnit(const Config* session) const { + return session->get(kArrowBridgeTimestampUnit, 9 /* nano */); +} + } // namespace facebook::velox::connector::hive diff --git a/velox/connectors/hive/HiveConfig.h b/velox/connectors/hive/HiveConfig.h index e50b411570363..494e0ad05caba 100644 --- a/velox/connectors/hive/HiveConfig.h +++ b/velox/connectors/hive/HiveConfig.h @@ -178,6 +178,10 @@ class HiveConfig { static constexpr const char* kS3UseProxyFromEnv = "hive.s3.use-proxy-from-env"; + // Timestamp unit used during Velox-Arrow conversion. + static constexpr const char* kArrowBridgeTimestampUnit = + "arrow_bridge_timestamp_unit"; + InsertExistingPartitionsBehavior insertExistingPartitionsBehavior( const Config* session) const; @@ -247,6 +251,10 @@ class HiveConfig { bool s3UseProxyFromEnv() const; + /// Returns the timestamp unit used in Velox-Arrow conversion. 
+ /// 0: second, 3: milli, 6: micro, 9: nano. + uint8_t arrowBridgeTimestampUnit(const Config* session) const; + HiveConfig(std::shared_ptr config) { VELOX_CHECK_NOT_NULL( config, "Config is null for HiveConfig initialization"); diff --git a/velox/connectors/hive/HiveDataSink.cpp b/velox/connectors/hive/HiveDataSink.cpp index d627119b9862d..b7fd053d5ffcc 100644 --- a/velox/connectors/hive/HiveDataSink.cpp +++ b/velox/connectors/hive/HiveDataSink.cpp @@ -675,6 +675,8 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) { hiveConfig_->orcWriterMaxStripeSize(connectorSessionProperties)); options.maxDictionaryMemory = std::optional( hiveConfig_->orcWriterMaxDictionaryMemory(connectorSessionProperties)); + options.arrowBridgeTimestampUnit = + hiveConfig_->arrowBridgeTimestampUnit(connectorSessionProperties); options.serdeParameters = std::map( insertTableHandle_->serdeParameters().begin(), insertTableHandle_->serdeParameters().end()); diff --git a/velox/dwio/common/Options.h b/velox/dwio/common/Options.h index 92674f5a8fa20..200ef0efeb8c5 100644 --- a/velox/dwio/common/Options.h +++ b/velox/dwio/common/Options.h @@ -583,6 +583,7 @@ struct WriterOptions { std::optional maxStripeSize{std::nullopt}; std::optional maxDictionaryMemory{std::nullopt}; std::map serdeParameters; + std::optional arrowBridgeTimestampUnit; }; } // namespace facebook::velox::dwio::common diff --git a/velox/dwio/parquet/writer/Writer.cpp b/velox/dwio/parquet/writer/Writer.cpp index b6479189251d0..52dda538d0d95 100644 --- a/velox/dwio/parquet/writer/Writer.cpp +++ b/velox/dwio/parquet/writer/Writer.cpp @@ -21,7 +21,6 @@ #include "velox/dwio/parquet/writer/arrow/Properties.h" #include "velox/dwio/parquet/writer/arrow/Writer.h" #include "velox/exec/MemoryReclaimer.h" -#include "velox/vector/arrow/Bridge.h" namespace facebook::velox::parquet { @@ -234,6 +233,8 @@ Writer::Writer( } else { flushPolicy_ = std::make_unique(); } + options_.timestampUnit = + 
static_cast(options.arrowBridgeTimestampUnit); arrowContext_->properties = getArrowParquetWriterOptions(options, flushPolicy_); setMemoryReclaimers(); @@ -310,11 +311,10 @@ void Writer::write(const VectorPtr& data) { data->type()->equivalent(*schema_), "The file schema type should be equal with the input rowvector type."); - ArrowOptions options{.flattenDictionary = true, .flattenConstant = true}; ArrowArray array; ArrowSchema schema; - exportToArrow(data, array, generalPool_.get(), options); - exportToArrow(data, schema, options); + exportToArrow(data, array, generalPool_.get(), options_); + exportToArrow(data, schema, options_); // Convert the arrow schema to Schema and then update the column names based // on schema_. @@ -386,6 +386,10 @@ parquet::WriterOptions getParquetOptions( if (options.compressionKind.has_value()) { parquetOptions.compression = options.compressionKind.value(); } + if (options.arrowBridgeTimestampUnit.has_value()) { + parquetOptions.arrowBridgeTimestampUnit = + options.arrowBridgeTimestampUnit.value(); + } return parquetOptions; } diff --git a/velox/dwio/parquet/writer/Writer.h b/velox/dwio/parquet/writer/Writer.h index 6065366bce6b5..4384c2720268d 100644 --- a/velox/dwio/parquet/writer/Writer.h +++ b/velox/dwio/parquet/writer/Writer.h @@ -26,6 +26,7 @@ #include "velox/dwio/parquet/writer/arrow/Types.h" #include "velox/dwio/parquet/writer/arrow/util/Compression.h" #include "velox/vector/ComplexVector.h" +#include "velox/vector/arrow/Bridge.h" namespace facebook::velox::parquet { @@ -102,6 +103,7 @@ struct WriterOptions { std::shared_ptr codecOptions; std::unordered_map columnCompressionsMap; + uint8_t arrowBridgeTimestampUnit = static_cast(TimestampUnit::kNano); }; // Writes Velox vectors into a DataSink using Arrow Parquet writer. 
@@ -158,6 +160,8 @@ class Writer : public dwio::common::Writer { std::unique_ptr flushPolicy_; const RowTypePtr schema_; + + ArrowOptions options_{.flattenDictionary = true, .flattenConstant = true}; }; class ParquetWriterFactory : public dwio::common::WriterFactory { diff --git a/velox/exec/tests/ArrowStreamTest.cpp b/velox/exec/tests/ArrowStreamTest.cpp index f0fe9b37e04e0..1b450e7200a37 100644 --- a/velox/exec/tests/ArrowStreamTest.cpp +++ b/velox/exec/tests/ArrowStreamTest.cpp @@ -45,7 +45,7 @@ class ArrowStreamTest : public OperatorTestBase { int getNext(struct ArrowArray* outArray) { if (vectorIndex_ < vectors_.size()) { - exportToArrow(vectors_[vectorIndex_], *outArray, pool_.get()); + exportToArrow(vectors_[vectorIndex_], *outArray, pool_.get(), options_); vectorIndex_ += 1; } else { // End of stream. Mark the array released. @@ -56,12 +56,13 @@ class ArrowStreamTest : public OperatorTestBase { } int getArrowSchema(ArrowSchema& out) { - exportToArrow(BaseVector::create(type_, 0, pool_.get()), out); + exportToArrow(BaseVector::create(type_, 0, pool_.get()), out, options_); return failGetSchema_ ? (int)ErrorCode::kGetSchemaFailed : (int)ErrorCode::kNoError; } private: + ArrowOptions options_; const std::shared_ptr pool_; const std::vector& vectors_; const TypePtr type_; diff --git a/velox/vector/arrow/Bridge.cpp b/velox/vector/arrow/Bridge.cpp index dd563315131af..7128875ad326c 100644 --- a/velox/vector/arrow/Bridge.cpp +++ b/velox/vector/arrow/Bridge.cpp @@ -215,6 +215,7 @@ static void releaseArrowSchema(ArrowSchema* arrowSchema) { // Returns the Arrow C data interface format type for a given Velox type. const char* exportArrowFormatStr( const TypePtr& type, + const ArrowOptions& options, std::string& formatBuffer) { if (type->isDecimal()) { // Decimal types encode the precision, scale values. 
@@ -250,9 +251,19 @@ const char* exportArrowFormatStr( return "z"; // binary case TypeKind::UNKNOWN: return "n"; // NullType - case TypeKind::TIMESTAMP: - return "ttn"; // time64 [nanoseconds] + switch (options.timestampUnit) { + case TimestampUnit::kSecond: + return "tss:"; + case TimestampUnit::kMilli: + return "tsm:"; + case TimestampUnit::kMicro: + return "tsu:"; + case TimestampUnit::kNano: + return "tsn:"; + default: + VELOX_UNREACHABLE(); + } // Complex/nested types. case TypeKind::ARRAY: static_assert(sizeof(vector_size_t) == 4); @@ -335,6 +346,7 @@ void gatherFromBuffer( const Type& type, const Buffer& buf, const Selection& rows, + const ArrowOptions& options, Buffer& out) { auto src = buf.as(); auto dst = out.asMutable(); @@ -352,8 +364,23 @@ void gatherFromBuffer( } else if (type.isTimestamp()) { auto srcTs = buf.as(); auto dstTs = out.asMutable(); - - rows.apply([&](vector_size_t i) { dstTs[j++] = srcTs[i].toNanos(); }); + switch (options.timestampUnit) { + case TimestampUnit::kSecond: + rows.apply( + [&](vector_size_t i) { dstTs[j++] = srcTs[i].getSeconds(); }); + break; + case TimestampUnit::kMilli: + rows.apply([&](vector_size_t i) { dstTs[j++] = srcTs[i].toMillis(); }); + break; + case TimestampUnit::kMicro: + rows.apply([&](vector_size_t i) { dstTs[j++] = srcTs[i].toMicros(); }); + break; + case TimestampUnit::kNano: + rows.apply([&](vector_size_t i) { dstTs[j++] = srcTs[i].toNanos(); }); + break; + default: + VELOX_UNREACHABLE(); + } } else { auto typeSize = type.cppSizeInBytes(); rows.apply([&](vector_size_t i) { @@ -384,8 +411,8 @@ struct BufferViewReleaser { }; // Wraps a naked pointer using a Velox buffer view, without copying it. Adding a -// dummy releaser as the buffer lifetime is fully controled by the client of the -// API. +// dummy releaser as the buffer lifetime is fully controlled by the client of +// the API. 
BufferPtr wrapInBufferViewAsViewer(const void* buffer, size_t length) { static const BufferViewReleaser kViewerReleaser; return BufferView::create( @@ -472,11 +499,12 @@ VectorPtr createStringFlatVector( optionalNullCount(nullCount)); } -// This functions does two things: (a) sets the value of null_count, and (b) the -// validity buffer (if there is at least one null row). +// This function does two things: (a) sets the value of null_count, and (b) +// the validity buffer (if there is at least one null row). void exportValidityBitmap( const BaseVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, VeloxToArrowBridgeHolder& holder) { @@ -490,7 +518,7 @@ void exportValidityBitmap( // If we're only exporting a subset, create a new validity buffer. if (rows.changed()) { nulls = AlignedBuffer::allocate(out.length, pool); - gatherFromBuffer(*BOOLEAN(), *vec.nulls(), rows, *nulls); + gatherFromBuffer(*BOOLEAN(), *vec.nulls(), rows, options, *nulls); } // Set null counts. @@ -506,8 +534,8 @@ void exportValidityBitmap( } bool isFlatScalarZeroCopy(const TypePtr& type) { - // - Short decimals need to be converted to 128 bit values as they are mapped - // to Arrow Decimal128. + // - Short decimals need to be converted to 128 bit values as they are + // mapped to Arrow Decimal128. // - Velox's Timestamp representation (2x 64bit values) does not have an // equivalent in Arrow. return !type->isShortDecimal() && !type->isTimestamp(); @@ -528,6 +556,7 @@ size_t getArrowElementSize(const TypePtr& type) { void exportValues( const BaseVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, VeloxToArrowBridgeHolder& holder) { @@ -545,7 +574,7 @@ void exportValues( ?
AlignedBuffer::allocate(out.length, pool) : AlignedBuffer::allocate( checkedMultiply(out.length, size), pool); - gatherFromBuffer(*type, *vec.values(), rows, *values); + gatherFromBuffer(*type, *vec.values(), rows, options, *values); holder.setBuffer(1, values); } @@ -585,6 +614,7 @@ void exportStrings( void exportFlat( const BaseVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, VeloxToArrowBridgeHolder& holder) { @@ -601,7 +631,7 @@ void exportFlat( case TypeKind::DOUBLE: case TypeKind::TIMESTAMP: case TypeKind::UNKNOWN: - exportValues(vec, rows, out, pool, holder); + exportValues(vec, rows, options, out, pool, holder); break; case TypeKind::VARCHAR: case TypeKind::VARBINARY: @@ -618,17 +648,17 @@ void exportFlat( void exportToArrowImpl( const BaseVector&, const Selection&, + const ArrowOptions& options, ArrowArray&, - memory::MemoryPool*, - const ArrowOptions& options); + memory::MemoryPool*); void exportRows( const RowVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, - VeloxToArrowBridgeHolder& holder, - const ArrowOptions& options) { + VeloxToArrowBridgeHolder& holder) { out.n_buffers = 1; holder.resizeChildren(vec.childrenSize()); out.n_children = vec.childrenSize(); @@ -638,9 +668,9 @@ void exportRows( exportToArrowImpl( *vec.childAt(i)->loadedVector(), rows, + options, *holder.allocateChild(i), - pool, - options); + pool); } catch (const VeloxException&) { for (column_index_t j = 0; j < i; ++j) { // When exception is thrown, i th child is guaranteed unset. 
@@ -699,19 +729,19 @@ void exportOffsets( void exportArrays( const ArrayVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, - VeloxToArrowBridgeHolder& holder, - const ArrowOptions& options) { + VeloxToArrowBridgeHolder& holder) { Selection childRows(vec.elements()->size()); exportOffsets(vec, rows, out, pool, holder, childRows); holder.resizeChildren(1); exportToArrowImpl( *vec.elements()->loadedVector(), childRows, + options, *holder.allocateChild(0), - pool, - options); + pool); out.n_children = 1; out.children = holder.getChildrenArrays(); } @@ -719,10 +749,10 @@ void exportArrays( void exportMaps( const MapVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, - VeloxToArrowBridgeHolder& holder, - const ArrowOptions& options) { + VeloxToArrowBridgeHolder& holder) { RowVector child( pool, ROW({"key", "value"}, {vec.mapKeys()->type(), vec.mapValues()->type()}), @@ -732,7 +762,7 @@ void exportMaps( Selection childRows(child.size()); exportOffsets(vec, rows, out, pool, holder, childRows); holder.resizeChildren(1); - exportToArrowImpl(child, childRows, *holder.allocateChild(0), pool, options); + exportToArrowImpl(child, childRows, options, *holder.allocateChild(0), pool); out.n_children = 1; out.children = holder.getChildrenArrays(); } @@ -741,6 +771,7 @@ template void flattenAndExport( const BaseVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, VeloxToArrowBridgeHolder& holder) { @@ -758,19 +789,20 @@ void flattenAndExport( flatVector->set(row, decoded.valueAt(row)); } }); - exportValidityBitmap(*flatVector, rows, out, pool, holder); - exportFlat(*flatVector, rows, out, pool, holder); + exportValidityBitmap(*flatVector, rows, options, out, pool, holder); + exportFlat(*flatVector, rows, options, out, pool, holder); } else { allRows.applyToSelected([&](vector_size_t row) { flatVector->set(row, 
decoded.valueAt(row)); }); - exportFlat(*flatVector, rows, out, pool, holder); + exportFlat(*flatVector, rows, options, out, pool, holder); } } void exportDictionary( const BaseVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, VeloxToArrowBridgeHolder& holder) { @@ -778,7 +810,7 @@ void exportDictionary( out.n_children = 0; if (rows.changed()) { auto indices = AlignedBuffer::allocate(out.length, pool); - gatherFromBuffer(*INTEGER(), *vec.wrapInfo(), rows, *indices); + gatherFromBuffer(*INTEGER(), *vec.wrapInfo(), rows, options, *indices); holder.setBuffer(1, indices); } else { holder.setBuffer(1, vec.wrapInfo()); @@ -786,12 +818,13 @@ void exportDictionary( auto& values = *vec.valueVector()->loadedVector(); out.dictionary = holder.allocateDictionary(); exportToArrowImpl( - values, Selection(values.size()), *out.dictionary, pool, ArrowOptions()); + values, Selection(values.size()), options, *out.dictionary, pool); } void exportFlattenedVector( const BaseVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, VeloxToArrowBridgeHolder& holder) { @@ -800,11 +833,12 @@ void exportFlattenedVector( "An unsupported nested encoding was found."); VELOX_CHECK(vec.isScalar(), "Flattening is only supported for scalar types."); VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH( - flattenAndExport, vec.typeKind(), vec, rows, out, pool, holder); + flattenAndExport, vec.typeKind(), vec, rows, options, out, pool, holder); } void exportConstantValue( const BaseVector& vec, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool) { VectorPtr valuesVector; @@ -834,7 +868,7 @@ void exportConstantValue( wrapInBufferViewAsViewer(vec.valuesAsVoid(), bufferSize), vec.mayHaveNulls() ? 
1 : 0); } - exportToArrowImpl(*valuesVector, selection, out, pool, ArrowOptions()); + exportToArrowImpl(*valuesVector, selection, options, out, pool); } // Velox constant vectors are exported as Arrow REE containing a single run @@ -842,6 +876,7 @@ void exportConstantValue( void exportConstant( const BaseVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, VeloxToArrowBridgeHolder& holder) { @@ -853,7 +888,7 @@ void exportConstant( out.n_children = 2; holder.resizeChildren(2); out.children = holder.getChildrenArrays(); - exportConstantValue(vec, *holder.allocateChild(1), pool); + exportConstantValue(vec, options, *holder.allocateChild(1), pool); // Create the run ends child. auto* runEnds = holder.allocateChild(0); @@ -880,41 +915,41 @@ void exportConstant( void exportToArrowImpl( const BaseVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, - memory::MemoryPool* pool, - const ArrowOptions& options) { + memory::MemoryPool* pool) { auto holder = std::make_unique(); out.buffers = holder->getArrowBuffers(); out.length = rows.count(); out.offset = 0; out.dictionary = nullptr; - exportValidityBitmap(vec, rows, out, pool, *holder); + exportValidityBitmap(vec, rows, options, out, pool, *holder); switch (vec.encoding()) { case VectorEncoding::Simple::FLAT: - exportFlat(vec, rows, out, pool, *holder); + exportFlat(vec, rows, options, out, pool, *holder); break; case VectorEncoding::Simple::ROW: exportRows( - *vec.asUnchecked(), rows, out, pool, *holder, options); + *vec.asUnchecked(), rows, options, out, pool, *holder); break; case VectorEncoding::Simple::ARRAY: exportArrays( - *vec.asUnchecked(), rows, out, pool, *holder, options); + *vec.asUnchecked(), rows, options, out, pool, *holder); break; case VectorEncoding::Simple::MAP: exportMaps( - *vec.asUnchecked(), rows, out, pool, *holder, options); + *vec.asUnchecked(), rows, options, out, pool, *holder); break; case 
VectorEncoding::Simple::DICTIONARY: options.flattenDictionary - ? exportFlattenedVector(vec, rows, out, pool, *holder) - : exportDictionary(vec, rows, out, pool, *holder); + ? exportFlattenedVector(vec, rows, options, out, pool, *holder) + : exportDictionary(vec, rows, options, out, pool, *holder); break; case VectorEncoding::Simple::CONSTANT: options.flattenConstant - ? exportFlattenedVector(vec, rows, out, pool, *holder) - : exportConstant(vec, rows, out, pool, *holder); + ? exportFlattenedVector(vec, rows, options, out, pool, *holder) + : exportConstant(vec, rows, options, out, pool, *holder); break; default: VELOX_NYI("{} cannot be exported to Arrow yet.", vec.encoding()); @@ -957,8 +992,7 @@ TypePtr importFromArrowImpl( return VARBINARY(); case 't': // temporal types. - // Mapping it to ttn for now. - if (format[1] == 't' && format[2] == 'n') { + if (format[1] == 's') { return TIMESTAMP(); } if (format[1] == 'd' && format[2] == 'D') { @@ -1050,7 +1084,7 @@ void exportToArrow( memory::MemoryPool* pool, const ArrowOptions& options) { exportToArrowImpl( - *vector, Selection(vector->size()), arrowArray, pool, options); + *vector, Selection(vector->size()), options, arrowArray, pool); } void exportToArrow( @@ -1077,12 +1111,12 @@ void exportToArrow( // Dictionary data is flattened. Set the underlying data types. 
arrowSchema.dictionary = nullptr; arrowSchema.format = - exportArrowFormatStr(type, bridgeHolder->formatBuffer); + exportArrowFormatStr(type, options, bridgeHolder->formatBuffer); } else { arrowSchema.format = "i"; bridgeHolder->dictionary = std::make_unique(); arrowSchema.dictionary = bridgeHolder->dictionary.get(); - exportToArrow(vec->valueVector(), *arrowSchema.dictionary); + exportToArrow(vec->valueVector(), *arrowSchema.dictionary, options); } } else if ( vec->encoding() == VectorEncoding::Simple::CONSTANT && @@ -1101,7 +1135,7 @@ void exportToArrow( exportToArrow(valueVector, *valuesChild, options); } else { valuesChild->format = - exportArrowFormatStr(type, bridgeHolder->formatBuffer); + exportArrowFormatStr(type, options, bridgeHolder->formatBuffer); } valuesChild->name = "values"; @@ -1109,7 +1143,8 @@ void exportToArrow( 0, newArrowSchema("i", "run_ends"), arrowSchema); bridgeHolder->setChildAtIndex(1, std::move(valuesChild), arrowSchema); } else { - arrowSchema.format = exportArrowFormatStr(type, bridgeHolder->formatBuffer); + arrowSchema.format = + exportArrowFormatStr(type, options, bridgeHolder->formatBuffer); arrowSchema.dictionary = nullptr; if (type->kind() == TypeKind::MAP) { @@ -1203,6 +1238,33 @@ TypePtr importFromArrow(const ArrowSchema& arrowSchema) { namespace { +TimestampUnit getTimestampUnit(const ArrowSchema& arrowSchema) { + const char* format = arrowSchema.dictionary ? 
arrowSchema.dictionary->format + : arrowSchema.format; + VELOX_USER_CHECK_NOT_NULL(format); + VELOX_USER_CHECK_GE( + strlen(format), + 2, + "The arrow format string of timestamp type should contain 'ts'."); + VELOX_USER_CHECK_EQ(format[0], 't', "The first character should be 't'."); + VELOX_USER_CHECK_EQ(format[1], 's', "The second character should be 's'."); + if (strlen(format) > 2) { + switch (format[2]) { + case 's': + return TimestampUnit::kSecond; + case 'm': + return TimestampUnit::kMilli; + case 'u': + return TimestampUnit::kMicro; + case 'n': + return TimestampUnit::kNano; + default: + VELOX_UNREACHABLE(); + } + } + return TimestampUnit::kNano; +} + VectorPtr importFromArrowImpl( ArrowSchema& arrowSchema, ArrowArray& arrowArray, @@ -1387,23 +1449,100 @@ VectorPtr createVectorFromReeArray( } } +void setTimestamps( + const int64_t* input, + int64_t length, + TimestampUnit unit, + Timestamp* rawTimestamps) { + switch (unit) { + case TimestampUnit::kSecond: { + for (int64_t i = 0; i < length; ++i) { + rawTimestamps[i] = Timestamp(input[i], 0); + } + break; + } + case TimestampUnit::kMilli: { + for (int64_t i = 0; i < length; ++i) { + rawTimestamps[i] = Timestamp::fromMillis(input[i]); + } + break; + } + case TimestampUnit::kMicro: { + for (int64_t i = 0; i < length; ++i) { + rawTimestamps[i] = Timestamp::fromMicros(input[i]); + } + break; + } + case TimestampUnit::kNano: { + for (int64_t i = 0; i < length; ++i) { + rawTimestamps[i] = Timestamp::fromNanos(input[i]); + } + break; + } + default: + VELOX_UNREACHABLE(); + } +} + +void setTimestamps( + const int64_t* input, + BufferPtr nulls, + int64_t length, + TimestampUnit unit, + Timestamp* rawTimestamps) { + const auto* rawNulls = nulls->as(); + switch (unit) { + case TimestampUnit::kSecond: { + for (size_t i = 0; i < length; ++i) { + if (rawNulls && !bits::isBitNull(rawNulls, i)) { + rawTimestamps[i] = Timestamp(input[i], 0); + } + } + break; + } + case TimestampUnit::kMilli: { + for (size_t i = 0; i < length; 
++i) { + if (rawNulls && !bits::isBitNull(rawNulls, i)) { + rawTimestamps[i] = Timestamp::fromMillis(input[i]); + } + } + break; + } + case TimestampUnit::kMicro: { + for (size_t i = 0; i < length; ++i) { + if (rawNulls && !bits::isBitNull(rawNulls, i)) { + rawTimestamps[i] = Timestamp::fromMicros(input[i]); + } + } + break; + } + case TimestampUnit::kNano: { + for (size_t i = 0; i < length; ++i) { + if (rawNulls && !bits::isBitNull(rawNulls, i)) { + rawTimestamps[i] = Timestamp::fromNanos(input[i]); + } + } + break; + } + default: + VELOX_UNREACHABLE(); + } +} + VectorPtr createTimestampVector( memory::MemoryPool* pool, const TypePtr& type, + TimestampUnit unit, BufferPtr nulls, const int64_t* input, size_t length, int64_t nullCount) { BufferPtr timestamps = AlignedBuffer::allocate(length, pool); auto* rawTimestamps = timestamps->asMutable(); - const auto* rawNulls = nulls->as(); - - if (length > nullCount) { - for (size_t i = 0; i < length; ++i) { - if (!bits::isBitNull(rawNulls, i)) { - rawTimestamps[i] = Timestamp::fromNanos(input[i]); - } - } + if (nulls == nullptr) { + setTimestamps(input, length, unit, rawTimestamps); + } else if (length > nullCount) { + setTimestamps(input, nulls, length, unit, rawTimestamps); } return std::make_shared>( pool, @@ -1491,6 +1630,7 @@ VectorPtr importFromArrowImpl( return createTimestampVector( pool, type, + getTimestampUnit(arrowSchema), nulls, static_cast(arrowArray.buffers[1]), arrowArray.length, @@ -1584,7 +1724,6 @@ VectorPtr importFromArrowImpl( return imported; } - } // namespace VectorPtr importFromArrowAsViewer( diff --git a/velox/vector/arrow/Bridge.h b/velox/vector/arrow/Bridge.h index ba30068f8957c..d03f1d412a675 100644 --- a/velox/vector/arrow/Bridge.h +++ b/velox/vector/arrow/Bridge.h @@ -25,9 +25,12 @@ struct ArrowArray; struct ArrowSchema; +enum class TimestampUnit { kSecond = 0, kMilli = 3, kMicro = 6, kNano = 9 }; + struct ArrowOptions { bool flattenDictionary{false}; bool flattenConstant{false}; + 
TimestampUnit timestampUnit = TimestampUnit::kNano; }; namespace facebook::velox { @@ -63,7 +66,7 @@ void exportToArrow( const VectorPtr& vector, ArrowArray& arrowArray, memory::MemoryPool* pool, - const ArrowOptions& options = ArrowOptions{}); + const ArrowOptions& options); /// Export the type of a Velox vector to an ArrowSchema. /// @@ -84,10 +87,7 @@ void exportToArrow( /// /// NOTE: Since Arrow couples type and encoding, we need both Velox type and /// actual data (containing encoding) to create an ArrowSchema. -void exportToArrow( - const VectorPtr&, - ArrowSchema&, - const ArrowOptions& = ArrowOptions{}); +void exportToArrow(const VectorPtr&, ArrowSchema&, const ArrowOptions&); /// Import an ArrowSchema into a Velox Type object. /// diff --git a/velox/vector/arrow/tests/ArrowBridgeArrayTest.cpp b/velox/vector/arrow/tests/ArrowBridgeArrayTest.cpp index afde498c82c2d..259ce8604cbb7 100644 --- a/velox/vector/arrow/tests/ArrowBridgeArrayTest.cpp +++ b/velox/vector/arrow/tests/ArrowBridgeArrayTest.cpp @@ -21,6 +21,7 @@ #include #include "velox/common/base/Nulls.h" +#include "velox/common/base/tests/GTestUtils.h" #include "velox/core/QueryCtx.h" #include "velox/vector/arrow/Bridge.h" #include "velox/vector/tests/utils/VectorMaker.h" @@ -54,7 +55,7 @@ class ArrowBridgeArrayExportTest : public testing::Test { const TypePtr& type = CppToType::create()) { auto flatVector = vectorMaker_.flatVectorNullable(inputData, type); ArrowArray arrowArray; - velox::exportToArrow(flatVector, arrowArray, pool_.get()); + velox::exportToArrow(flatVector, arrowArray, pool_.get(), options_); validateArray(inputData, arrowArray); @@ -67,7 +68,7 @@ class ArrowBridgeArrayExportTest : public testing::Test { void testArrayVector(const T& inputData) { auto arrayVector = vectorMaker_.arrayVectorNullable(inputData); ArrowArray arrowArray; - velox::exportToArrow(arrayVector, arrowArray, pool_.get()); + velox::exportToArrow(arrayVector, arrowArray, pool_.get(), options_); 
validateListArray(inputData, arrowArray); @@ -93,7 +94,7 @@ class ArrowBridgeArrayExportTest : public testing::Test { const VectorPtr& constantVector, const TInput& input) { ArrowArray arrowArray; - velox::exportToArrow(constantVector, arrowArray, pool_.get()); + velox::exportToArrow(constantVector, arrowArray, pool_.get(), options_); validateConstant( input, constantVector->size(), @@ -161,8 +162,34 @@ class ArrowBridgeArrayExportTest : public testing::Test { bits::isBitSet(reinterpret_cast(values), i)) << "mismatch at index " << i; } else if constexpr (std::is_same_v) { - EXPECT_EQ(inputData[i], Timestamp::fromNanos(values[i])) - << "mismatch at index " << i; + switch (options_.timestampUnit) { + case TimestampUnit::kSecond: + EXPECT_EQ( + Timestamp(inputData[i].value().getSeconds(), 0), + Timestamp(values[i], 0)) + << "mismatch at index " << i; + break; + case TimestampUnit::kMilli: + EXPECT_EQ( + Timestamp::fromMillis(inputData[i].value().toMillis()), + Timestamp::fromMillis(values[i])) + << "mismatch at index " << i; + break; + case TimestampUnit::kMicro: + EXPECT_EQ( + Timestamp::fromMicros(inputData[i].value().toMicros()), + Timestamp::fromMicros(values[i])) + << "mismatch at index " << i; + break; + case TimestampUnit::kNano: + EXPECT_EQ(inputData[i], Timestamp::fromNanos(values[i])) + << "mismatch at index " << i; + break; + default: + VELOX_USER_FAIL(fmt::format( + "Timestamp unit not supported: {}.", + (int8_t)options_.timestampUnit)); + } } else { EXPECT_EQ(inputData[i], values[i]) << "mismatch at index " << i; } @@ -353,10 +380,12 @@ class ArrowBridgeArrayExportTest : public testing::Test { } void exportToArrow(const TypePtr& type, ArrowSchema& out) { - velox::exportToArrow(BaseVector::create(type, 0, pool_.get()), out); + velox::exportToArrow( + BaseVector::create(type, 0, pool_.get()), out, options_); } // Boiler plate structures required by vectorMaker. 
+ ArrowOptions options_; std::shared_ptr queryCtx_{std::make_shared()}; std::shared_ptr pool_{ memory::memoryManager()->addLeafPool()}; @@ -371,7 +400,7 @@ TEST_F(ArrowBridgeArrayExportTest, flatNotNull) { // Make sure that ArrowArray is correctly acquiring ownership, even after // the initial vector shared_ptr is gone. auto flatVector = vectorMaker_.flatVector(inputData); - velox::exportToArrow(flatVector, arrowArray, pool_.get()); + velox::exportToArrow(flatVector, arrowArray, pool_.get(), options_); } EXPECT_EQ(inputData.size(), arrowArray.length); @@ -494,16 +523,23 @@ TEST_F(ArrowBridgeArrayExportTest, flatDate) { } TEST_F(ArrowBridgeArrayExportTest, flatTimestamp) { - testFlatVector( - { - Timestamp(0, 0), - std::nullopt, - Timestamp(1699300965, 12'349), - Timestamp(-2208960000, 0), // 1900-01-01 - Timestamp(3155788800, 999'999'999), - std::nullopt, - }, - TIMESTAMP()); + for (TimestampUnit unit : + {TimestampUnit::kSecond, + TimestampUnit::kMilli, + TimestampUnit::kMicro, + TimestampUnit::kNano}) { + options_.timestampUnit = unit; + testFlatVector( + { + Timestamp(0, 0), + std::nullopt, + Timestamp(1699300965, 12'349), + Timestamp(-2208960000, 0), // 1900-01-01 + Timestamp(3155788800, 999'999'999), + std::nullopt, + }, + TIMESTAMP()); + } // Out of range. If nanosecond precision is represented in Arrow, timestamps // starting around 2263-01-01 should overflow and throw a user exception. 
@@ -542,7 +578,7 @@ TEST_F(ArrowBridgeArrayExportTest, rowVector) { }); ArrowArray arrowArray; - velox::exportToArrow(vector, arrowArray, pool_.get()); + velox::exportToArrow(vector, arrowArray, pool_.get(), options_); EXPECT_EQ(col1.size(), arrowArray.length); EXPECT_EQ(0, arrowArray.null_count); @@ -579,7 +615,7 @@ TEST_F(ArrowBridgeArrayExportTest, rowVectorNullable) { vector->setNullCount(3); ArrowArray arrowArray; - velox::exportToArrow(vector, arrowArray, pool_.get()); + velox::exportToArrow(vector, arrowArray, pool_.get(), options_); EXPECT_EQ(col1.size(), arrowArray.length); EXPECT_EQ(3, arrowArray.null_count); @@ -607,7 +643,8 @@ TEST_F(ArrowBridgeArrayExportTest, rowVectorNullable) { TEST_F(ArrowBridgeArrayExportTest, rowVectorEmpty) { ArrowArray arrowArray; - velox::exportToArrow(vectorMaker_.rowVector({}), arrowArray, pool_.get()); + velox::exportToArrow( + vectorMaker_.rowVector({}), arrowArray, pool_.get(), options_); EXPECT_EQ(0, arrowArray.n_children); EXPECT_EQ(1, arrowArray.n_buffers); EXPECT_EQ(nullptr, arrowArray.children); @@ -617,11 +654,12 @@ TEST_F(ArrowBridgeArrayExportTest, rowVectorEmpty) { std::shared_ptr toArrow( const VectorPtr& vec, + const ArrowOptions& options, memory::MemoryPool* pool) { ArrowSchema schema; ArrowArray array; - exportToArrow(vec, schema); - exportToArrow(vec, array, pool); + exportToArrow(vec, schema, options); + exportToArrow(vec, array, pool, options); EXPECT_OK_AND_ASSIGN(auto type, arrow::ImportType(&schema)); EXPECT_OK_AND_ASSIGN(auto ans, arrow::ImportArray(&array, type)); return ans; @@ -655,7 +693,7 @@ TEST_F(ArrowBridgeArrayExportTest, arraySimple) { TEST_F(ArrowBridgeArrayExportTest, arrayCrossValidate) { auto vec = vectorMaker_.arrayVector({{1, 2, 3}, {4, 5}}); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); EXPECT_EQ(array->null_count(), 0); ASSERT_EQ(*array->type(), *arrow::list(arrow::int64())); @@ -682,8 +720,8 @@ 
TEST_F(ArrowBridgeArrayExportTest, arrayDictionary) { ArrowSchema schema; ArrowArray data; - velox::exportToArrow(vec, schema); - velox::exportToArrow(vec, data, vec->pool()); + velox::exportToArrow(vec, schema, options_); + velox::exportToArrow(vec, data, vec->pool(), options_); auto result = importFromArrowAsViewer(schema, data, vec->pool()); test::assertEqualVectors(result, vec); @@ -699,7 +737,7 @@ TEST_F(ArrowBridgeArrayExportTest, arrayGap) { auto sizes = makeBuffer({2, 2}); auto vec = std::make_shared( pool_.get(), ARRAY(BIGINT()), nullptr, 2, offsets, sizes, elements); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); EXPECT_EQ(array->null_count(), 0); ASSERT_EQ(*array->type(), *arrow::list(arrow::int64())); @@ -724,7 +762,7 @@ TEST_F(ArrowBridgeArrayExportTest, arrayReorder) { auto sizes = makeBuffer({2, 2}); auto vec = std::make_shared( pool_.get(), ARRAY(BIGINT()), nullptr, 2, offsets, sizes, elements); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); EXPECT_EQ(array->null_count(), 0); ASSERT_EQ(*array->type(), *arrow::list(arrow::int64())); @@ -754,7 +792,7 @@ TEST_F(ArrowBridgeArrayExportTest, arrayNested) { std::make_shared( pool_.get(), ARRAY(inner->type()), nullptr, 2, offsets, sizes, inner); }); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); ASSERT_EQ(*array->type(), *arrow::list(arrow::list(arrow::int64()))); auto& listArray = static_cast(*array); @@ -771,7 +809,7 @@ TEST_F(ArrowBridgeArrayExportTest, mapSimple) { auto allOnes = [](vector_size_t) { return 1; }; auto vec = vectorMaker_.mapVector(2, allOnes, allOnes, allOnes); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); EXPECT_EQ(array->null_count(), 0); ASSERT_EQ(*array->type(), 
*arrow::map(arrow::int64(), arrow::int64())); @@ -806,7 +844,7 @@ TEST_F(ArrowBridgeArrayExportTest, mapNested) { std::make_shared( pool_.get(), type, nullptr, 2, offsets, sizes, keys, inner); }); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); EXPECT_EQ(array->null_count(), 0); ASSERT_EQ( @@ -838,7 +876,7 @@ TEST_F(ArrowBridgeArrayExportTest, dictionarySimple) { allocateIndices(3, pool_.get()), 3, vectorMaker_.flatVector({1, 2, 3})); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); ASSERT_EQ(*array->type(), *arrow::dictionary(arrow::int32(), arrow::int64())); auto& dict = static_cast(*array); @@ -864,7 +902,7 @@ TEST_F(ArrowBridgeArrayExportTest, dictionaryNested) { std::make_shared( pool_.get(), ARRAY(inner->type()), nullptr, 2, offsets, sizes, inner); }); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); ASSERT_EQ( *array->type(), @@ -914,7 +952,7 @@ TEST_F(ArrowBridgeArrayExportTest, constantComplex) { TEST_F(ArrowBridgeArrayExportTest, constantCrossValidate) { auto vector = BaseVector::createConstant(VARCHAR(), "hello", 100, pool_.get()); - auto array = toArrow(vector, pool_.get()); + auto array = toArrow(vector, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); EXPECT_EQ(array->null_count(), 0); @@ -963,7 +1001,8 @@ class ArrowBridgeArrayImportTest : public ArrowBridgeArrayExportTest { template ArrowArray fillArrowArray( const std::vector>& inputValues, - ArrowContextHolder& holder) { + ArrowContextHolder& holder, + const char* format = nullptr) { using TArrow = typename VeloxToArrowType::type; int64_t length = inputValues.size(); int64_t nullCount = 0; @@ -983,7 +1022,21 @@ class ArrowBridgeArrayImportTest : public ArrowBridgeArrayExportTest { if constexpr (std::is_same_v) { bits::setBit(rawValues, i, 
*inputValues[i]); } else if constexpr (std::is_same_v) { - rawValues[i] = inputValues[i]->toNanos(); + VELOX_USER_CHECK_NOT_NULL( + format, + "Format needs to be provided for timestamp result comparison."); + const auto formatStr = std::string(format); + if (formatStr == "tss:") { + rawValues[i] = inputValues[i]->getSeconds(); + } else if (formatStr == "tsm:") { + rawValues[i] = inputValues[i]->toMillis(); + } else if (formatStr == "tsu:") { + rawValues[i] = inputValues[i]->toMicros(); + } else if (formatStr == "tsn:") { + rawValues[i] = inputValues[i]->toNanos(); + } else { + VELOX_UNREACHABLE(); + } } else { rawValues[i] = *inputValues[i]; } @@ -997,7 +1050,8 @@ class ArrowBridgeArrayImportTest : public ArrowBridgeArrayExportTest { ArrowArray fillArrowArray( const std::vector>& inputValues, - ArrowContextHolder& holder) { + ArrowContextHolder& holder, + const char* format = nullptr) { int64_t length = inputValues.size(); int64_t nullCount = 0; @@ -1050,11 +1104,11 @@ class ArrowBridgeArrayImportTest : public ArrowBridgeArrayExportTest { const char* format, const std::vector>& inputValues) { ArrowContextHolder holder; - auto arrowArray = fillArrowArray(inputValues, holder); + auto arrowArray = fillArrowArray(inputValues, holder, format); auto arrowSchema = makeArrowSchema(format); auto output = importFromArrow(arrowSchema, arrowArray, pool_.get()); - assertVectorContent(inputValues, output, arrowArray.null_count); + assertVectorContent(inputValues, output, arrowArray.null_count, format); // Buffer views are not reusable. 
Strings might need to create an additional // buffer, depending on the string sizes, in which case the buffers could be @@ -1087,7 +1141,8 @@ class ArrowBridgeArrayImportTest : public ArrowBridgeArrayExportTest { void assertVectorContent( const std::vector>& inputValues, const VectorPtr& convertedVector, - size_t nullCount) { + size_t nullCount, + const char* format = nullptr) { EXPECT_EQ((nullCount > 0), convertedVector->mayHaveNulls()); EXPECT_EQ(nullCount, *convertedVector->getNullCount()); EXPECT_EQ(inputValues.size(), convertedVector->size()); @@ -1096,9 +1151,44 @@ class ArrowBridgeArrayImportTest : public ArrowBridgeArrayExportTest { // Assert new vector contents. for (vector_size_t i = 0; i < convertedVector->size(); ++i) { - ASSERT_TRUE(expected->equalValueAt(convertedVector.get(), i, i)) - << "at " << i << ": " << expected->toString(i) << " vs. " - << convertedVector->toString(i); + if constexpr (std::is_same_v) { + if (expected->isNullAt(i)) { + ASSERT_TRUE(convertedVector->isNullAt(i)) + << "at " << i << ": " << expected->toString(i) << " vs. 
" + << convertedVector->toString(i); + } else { + auto convertedFlat = convertedVector->asFlatVector(); + VELOX_USER_CHECK_NOT_NULL( + format, + "Format needs to be provided for timestamp result comparison."); + const auto formatStr = std::string(format); + if (formatStr == "tss:") { + EXPECT_EQ( + Timestamp(expected->valueAt(i).getSeconds(), 0), + convertedFlat->valueAt(i)) + << "mismatch at index " << i; + } else if (formatStr == "tsm:") { + EXPECT_EQ( + Timestamp::fromMillis(expected->valueAt(i).toMillis()), + convertedFlat->valueAt(i)) + << "mismatch at index " << i; + } else if (formatStr == "tsu:") { + EXPECT_EQ( + Timestamp::fromMicros(expected->valueAt(i).toMicros()), + convertedFlat->valueAt(i)) + << "mismatch at index " << i; + } else if (formatStr == "tsn:") { + EXPECT_EQ(expected->valueAt(i), convertedFlat->valueAt(i)) + << "mismatch at index " << i; + } else { + VELOX_UNREACHABLE(); + } + } + } else { + ASSERT_TRUE(expected->equalValueAt(convertedVector.get(), i, i)) + << "at " << i << ": " << expected->toString(i) << " vs. 
" + << convertedVector->toString(i); + } } } @@ -1131,8 +1221,11 @@ class ArrowBridgeArrayImportTest : public ArrowBridgeArrayExportTest { testArrowImport("g", {-99.9, 4.3, 31.1, 129.11, -12}); testArrowImport("f", {-99.9, 4.3, 31.1, 129.11, -12}); - testArrowImport( - "ttn", {Timestamp(0, 0), std::nullopt, Timestamp(1699308257, 1234)}); + for (const std::string& tsString : {"tss:", "tsm:", "tsu:", "tsn:"}) { + testArrowImport( + tsString.data(), + {Timestamp(0, 0), std::nullopt, Timestamp(1699308257, 1234)}); + } } void testImportWithoutNullsBuffer() { @@ -1317,8 +1410,8 @@ class ArrowBridgeArrayImportTest : public ArrowBridgeArrayExportTest { ArrowSchema schema; ArrowArray data; - velox::exportToArrow(vec, schema); - velox::exportToArrow(vec, data, pool_.get()); + velox::exportToArrow(vec, schema, options_); + velox::exportToArrow(vec, data, pool_.get(), options_); ASSERT_OK_AND_ASSIGN(auto arrowType, arrow::ImportType(&schema)); ASSERT_OK_AND_ASSIGN(auto array2, arrow::ImportArray(&data, arrowType)); ASSERT_OK(array2->ValidateFull()); @@ -1557,6 +1650,7 @@ class ArrowBridgeArrayImportTest : public ArrowBridgeArrayExportTest { EXPECT_NO_THROW(importFromArrow(arrowSchema, arrowArray, pool_.get())); } + ArrowOptions options_; std::shared_ptr pool_{ memory::memoryManager()->addLeafPool()}; }; diff --git a/velox/vector/arrow/tests/ArrowBridgeSchemaTest.cpp b/velox/vector/arrow/tests/ArrowBridgeSchemaTest.cpp index a02e4cc08732b..7b6788f129f23 100644 --- a/velox/vector/arrow/tests/ArrowBridgeSchemaTest.cpp +++ b/velox/vector/arrow/tests/ArrowBridgeSchemaTest.cpp @@ -120,7 +120,7 @@ class ArrowBridgeSchemaExportTest : public testing::Test { 3, // index to use for the constant BaseVector::create(type, 100, pool_.get())); - velox::exportToArrow(constantVector, arrowSchema); + velox::exportToArrow(constantVector, arrowSchema, options_); EXPECT_STREQ("+r", arrowSchema.format); EXPECT_EQ(nullptr, arrowSchema.name); @@ -155,7 +155,8 @@ class ArrowBridgeSchemaExportTest : 
public testing::Test { } void exportToArrow(const TypePtr& type, ArrowSchema& out) { - velox::exportToArrow(BaseVector::create(type, 0, pool_.get()), out); + velox::exportToArrow( + BaseVector::create(type, 0, pool_.get()), out, options_); } ArrowSchema makeArrowSchema(const char* format) { @@ -172,6 +173,7 @@ class ArrowBridgeSchemaExportTest : public testing::Test { }; } + ArrowOptions options_; std::shared_ptr pool_{ memory::memoryManager()->addLeafPool()}; }; @@ -190,7 +192,15 @@ TEST_F(ArrowBridgeSchemaExportTest, scalar) { testScalarType(VARCHAR(), "u"); testScalarType(VARBINARY(), "z"); - testScalarType(TIMESTAMP(), "ttn"); + options_.timestampUnit = TimestampUnit::kSecond; + testScalarType(TIMESTAMP(), "tss:"); + options_.timestampUnit = TimestampUnit::kMilli; + testScalarType(TIMESTAMP(), "tsm:"); + options_.timestampUnit = TimestampUnit::kMicro; + testScalarType(TIMESTAMP(), "tsu:"); + options_.timestampUnit = TimestampUnit::kNano; + testScalarType(TIMESTAMP(), "tsn:"); + testScalarType(DATE(), "tdD"); testScalarType(DECIMAL(10, 4), "d:10,4"); @@ -360,7 +370,7 @@ TEST_F(ArrowBridgeSchemaImportTest, scalar) { EXPECT_EQ(*VARBINARY(), *testSchemaImport("Z")); // Temporal. - EXPECT_EQ(*TIMESTAMP(), *testSchemaImport("ttn")); + EXPECT_EQ(*TIMESTAMP(), *testSchemaImport("tsu:")); EXPECT_EQ(*DATE(), *testSchemaImport("tdD")); EXPECT_EQ(*DECIMAL(10, 4), *testSchemaImport("d:10,4")); @@ -373,7 +383,7 @@ TEST_F(ArrowBridgeSchemaImportTest, scalar) { TEST_F(ArrowBridgeSchemaImportTest, complexTypes) { // Array. 
EXPECT_EQ(*ARRAY(BIGINT()), *testSchemaImportComplex("+l", {"l"})); - EXPECT_EQ(*ARRAY(TIMESTAMP()), *testSchemaImportComplex("+l", {"ttn"})); + EXPECT_EQ(*ARRAY(TIMESTAMP()), *testSchemaImportComplex("+l", {"tsn:"})); EXPECT_EQ(*ARRAY(DATE()), *testSchemaImportComplex("+l", {"tdD"})); EXPECT_EQ(*ARRAY(VARCHAR()), *testSchemaImportComplex("+l", {"U"})); @@ -438,9 +448,11 @@ class ArrowBridgeSchemaTest : public testing::Test { } void exportToArrow(const TypePtr& type, ArrowSchema& out) { - velox::exportToArrow(BaseVector::create(type, 0, pool_.get()), out); + velox::exportToArrow( + BaseVector::create(type, 0, pool_.get()), out, options_); } + ArrowOptions options_; std::shared_ptr pool_{ memory::memoryManager()->addLeafPool()}; }; @@ -517,7 +529,7 @@ TEST_F(ArrowBridgeSchemaImportTest, dictionaryTypeTest) { *testSchemaDictionaryImport( "i", makeComplexArrowSchema( - schemas, schemaPtrs, mapSchemas, mapSchemaPtrs, "+l", {"ttn"}))); + schemas, schemaPtrs, mapSchemas, mapSchemaPtrs, "+l", {"ts"}))); EXPECT_EQ( *ARRAY(DATE()), *testSchemaDictionaryImport(