diff --git a/velox/connectors/hive/HiveConfig.cpp b/velox/connectors/hive/HiveConfig.cpp index d99d1510ee1ac..ff965771464c2 100644 --- a/velox/connectors/hive/HiveConfig.cpp +++ b/velox/connectors/hive/HiveConfig.cpp @@ -181,6 +181,11 @@ uint64_t HiveConfig::sortWriterMaxOutputBytes(const Config* config) { return config->get(kSortWriterMaxOutputBytes, 10UL << 20); } +// static. +uint8_t HiveConfig::arrowBridgeTimestampUnit(const Config* config) { + return config->get(kArrowBridgeTimestampUnit, 3 /* nano */); +} + uint64_t HiveConfig::getOrcWriterMaxStripeSize( const Config* connectorQueryCtxConfig, const Config* connectorPropertiesConfig) { diff --git a/velox/connectors/hive/HiveConfig.h b/velox/connectors/hive/HiveConfig.h index 7e06c03b1de56..658334a2968d9 100644 --- a/velox/connectors/hive/HiveConfig.h +++ b/velox/connectors/hive/HiveConfig.h @@ -127,6 +127,10 @@ class HiveConfig { static constexpr const char* kSortWriterMaxOutputBytes = "sort_writer_max_output_bytes"; + // Timestamp unit used during Velox-Arrow conversion. + static constexpr const char* kArrowBridgeTimestampUnit = + "arrow_bridge_timestamp_unit"; + static InsertExistingPartitionsBehavior insertExistingPartitionsBehavior( const Config* config); @@ -176,6 +180,10 @@ class HiveConfig { static uint64_t sortWriterMaxOutputBytes(const Config* config); + /// Returns the timestamp unit used in Velox-Arrow conversion. + /// 0: second, 1: milli, 2: micro, 3: nano. + static uint8_t arrowBridgeTimestampUnit(const Config* config); + static uint64_t getOrcWriterMaxStripeSize( const Config* connectorQueryCtxConfig, const Config* connectorPropertiesConfig); diff --git a/velox/connectors/hive/HiveDataSink.cpp b/velox/connectors/hive/HiveDataSink.cpp index 69f9ea79485b2..7023a7b93f223 100644 --- a/velox/connectors/hive/HiveDataSink.cpp +++ b/velox/connectors/hive/HiveDataSink.cpp @@ -607,6 +607,8 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) { options.maxDictionaryMemory = std::optional(HiveConfig::getOrcWriterMaxDictionaryMemory( connectorQueryCtx_->config(), connectorProperties_.get())); + options.arrowBridgeTimestampUnit = + HiveConfig::arrowBridgeTimestampUnit(connectorQueryCtx_->config()); ioStats_.emplace_back(std::make_shared()); // Prevents the memory allocation during the writer creation. diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index 4edc69b607add..340fdd5412a39 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -338,6 +338,10 @@ class QueryConfig { static constexpr const char* kEnableExpressionEvaluationCache = "enable_expression_evaluation_cache"; + // Timestamp unit used during Velox-Arrow conversion. + static constexpr const char* kArrowBridgeTimestampUnit = + "arrow_bridge_timestamp_unit"; + uint64_t queryMaxMemoryPerNode() const { return toCapacity( get(kQueryMaxMemoryPerNode, "0B"), CapacityUnit::BYTE); @@ -549,6 +553,13 @@ class QueryConfig { return get(kSpillStartPartitionBit, kDefaultStartBit); } + /// Returns the timestamp unit used in Velox-Arrow conversion. + /// 0: second, 1: milli, 2: micro, 3: nano. + uint8_t arrowBridgeTimestampUnit() const { + constexpr uint8_t kDefaultUnit = 3; + return get(kArrowBridgeTimestampUnit, kDefaultUnit); + } + /// Returns the number of bits used to calculate the spilling partition /// number for hash join. The number of spilling partitions will be power of /// two. diff --git a/velox/dwio/common/Options.h b/velox/dwio/common/Options.h index 45be0268bd184..6b4f31175e22c 100644 --- a/velox/dwio/common/Options.h +++ b/velox/dwio/common/Options.h @@ -555,6 +555,7 @@ struct WriterOptions { std::optional maxStripeSize{std::nullopt}; std::optional maxDictionaryMemory{std::nullopt}; std::map serdeParameters; + std::optional arrowBridgeTimestampUnit; }; } // namespace facebook::velox::dwio::common diff --git a/velox/dwio/parquet/writer/Writer.cpp b/velox/dwio/parquet/writer/Writer.cpp index 54f875fce0eb8..84e417aecd878 100644 --- a/velox/dwio/parquet/writer/Writer.cpp +++ b/velox/dwio/parquet/writer/Writer.cpp @@ -14,8 +14,6 @@ * limitations under the License. */ -#include "velox/vector/arrow/Bridge.h" - #include #include #include @@ -155,6 +153,8 @@ Writer::Writer( } else { flushPolicy_ = std::make_unique(); } + options_.timestampUnit = + static_cast(options.arrowBridgeTimestampUnit); arrowContext_->properties = getArrowParquetWriterOptions(options, flushPolicy_); } @@ -226,8 +226,8 @@ dwio::common::StripeProgress getStripeProgress( void Writer::write(const VectorPtr& data) { ArrowArray array; ArrowSchema schema; - exportToArrow(data, array, generalPool_.get()); - exportToArrow(data, schema); + exportToArrow(data, options_, array, generalPool_.get()); + exportToArrow(data, options_, schema); PARQUET_ASSIGN_OR_THROW( auto recordBatch, ::arrow::ImportRecordBatch(&array, &schema)); if (!arrowContext_->schema) { @@ -287,6 +287,10 @@ parquet::WriterOptions getParquetOptions( if (options.compressionKind.has_value()) { parquetOptions.compression = options.compressionKind.value(); } + if (options.arrowBridgeTimestampUnit.has_value()) { + parquetOptions.arrowBridgeTimestampUnit = + options.arrowBridgeTimestampUnit.value(); + } return parquetOptions; } diff --git a/velox/dwio/parquet/writer/Writer.h b/velox/dwio/parquet/writer/Writer.h index e7d70b7a88f7e..03fef2a78c52c 100644 --- a/velox/dwio/parquet/writer/Writer.h +++ b/velox/dwio/parquet/writer/Writer.h @@ -25,6 +25,7 @@ #include "velox/dwio/common/WriterFactory.h" #include "velox/dwio/parquet/writer/arrow/util/Compression.h" #include "velox/vector/ComplexVector.h" +#include "velox/vector/arrow/Bridge.h" namespace facebook::velox::parquet { @@ -98,6 +99,7 @@ struct WriterOptions { // policy with the configs in its ctor. std::function()> flushPolicyFactory; std::shared_ptr codecOptions; + uint8_t arrowBridgeTimestampUnit = static_cast(TimestampUnit::kNano); }; // Writes Velox vectors into a DataSink using Arrow Parquet writer. @@ -146,6 +148,7 @@ class Writer : public dwio::common::Writer { std::shared_ptr arrowContext_; std::unique_ptr flushPolicy_; + BridgeOptions options_; }; class ParquetWriterFactory : public dwio::common::WriterFactory { diff --git a/velox/exec/ArrowStream.cpp b/velox/exec/ArrowStream.cpp index 863e43f8ba22f..d90734e842e40 100644 --- a/velox/exec/ArrowStream.cpp +++ b/velox/exec/ArrowStream.cpp @@ -27,6 +27,8 @@ ArrowStream::ArrowStream( operatorId, arrowStreamNode->id(), "ArrowStream") { + options_.timestampUnit = static_cast( + driverCtx->queryConfig().arrowBridgeTimestampUnit()); arrowStream_ = arrowStreamNode->arrowStream(); } @@ -66,7 +68,7 @@ RowVectorPtr ArrowStream::getOutput() { // Convert Arrow Array into RowVector and return. return std::dynamic_pointer_cast( - importFromArrowAsOwner(arrowSchema, arrowArray, pool())); + importFromArrowAsOwner(arrowSchema, arrowArray, options_, pool())); } bool ArrowStream::isFinished() { diff --git a/velox/exec/ArrowStream.h b/velox/exec/ArrowStream.h index c35894d0d2830..b8d1260f290ed 100644 --- a/velox/exec/ArrowStream.h +++ b/velox/exec/ArrowStream.h @@ -45,6 +45,7 @@ class ArrowStream : public SourceOperator { bool finished_ = false; std::shared_ptr arrowStream_; + BridgeOptions options_; }; } // namespace facebook::velox::exec diff --git a/velox/exec/tests/ArrowStreamTest.cpp b/velox/exec/tests/ArrowStreamTest.cpp index f0fe9b37e04e0..e04a1ddfacf45 100644 --- a/velox/exec/tests/ArrowStreamTest.cpp +++ b/velox/exec/tests/ArrowStreamTest.cpp @@ -45,7 +45,7 @@ class ArrowStreamTest : public OperatorTestBase { int getNext(struct ArrowArray* outArray) { if (vectorIndex_ < vectors_.size()) { - exportToArrow(vectors_[vectorIndex_], *outArray, pool_.get()); + exportToArrow(vectors_[vectorIndex_], options_, *outArray, pool_.get()); vectorIndex_ += 1; } else { // End of stream. Mark the array released. @@ -56,12 +56,13 @@ class ArrowStreamTest : public OperatorTestBase { } int getArrowSchema(ArrowSchema& out) { - exportToArrow(BaseVector::create(type_, 0, pool_.get()), out); + exportToArrow(BaseVector::create(type_, 0, pool_.get()), options_, out); return failGetSchema_ ? (int)ErrorCode::kGetSchemaFailed : (int)ErrorCode::kNoError; } private: + BridgeOptions options_; const std::shared_ptr pool_; const std::vector& vectors_; const TypePtr type_; diff --git a/velox/vector/arrow/Bridge.cpp b/velox/vector/arrow/Bridge.cpp index ed82878f2e5d4..a8c410591ec7c 100644 --- a/velox/vector/arrow/Bridge.cpp +++ b/velox/vector/arrow/Bridge.cpp @@ -213,6 +213,7 @@ static void releaseArrowSchema(ArrowSchema* arrowSchema) { // Returns the Arrow C data interface format type for a given Velox type. const char* exportArrowFormatStr( const TypePtr& type, + const BridgeOptions& options, std::string& formatBuffer) { if (type->isDecimal()) { // Decimal types encode the precision, scale values. @@ -246,9 +247,20 @@ const char* exportArrowFormatStr( return "u"; // utf-8 string case TypeKind::VARBINARY: return "z"; // binary - case TypeKind::TIMESTAMP: - return "ttn"; // time64 [nanoseconds] + switch (options.timestampUnit) { + case TimestampUnit::kSecond: + return "tss:"; + case TimestampUnit::kMilli: + return "tsm:"; + case TimestampUnit::kMicro: + return "tsu:"; + case TimestampUnit::kNano: + return "tsn:"; + default: + VELOX_USER_FAIL(fmt::format( + "Unsupported timestamp unit: {}.", options.timestampUnit)); + } // Complex/nested types. case TypeKind::ARRAY: static_assert(sizeof(vector_size_t) == 4); @@ -331,6 +343,7 @@ void gatherFromBuffer( const Type& type, const Buffer& buf, const Selection& rows, + const BridgeOptions& options, Buffer& out) { auto src = buf.as(); auto dst = out.asMutable(); @@ -348,8 +361,24 @@ void gatherFromBuffer( } else if (type.isTimestamp()) { auto srcTs = buf.as(); auto dstTs = out.asMutable(); - - rows.apply([&](vector_size_t i) { dstTs[j++] = srcTs[i].toNanos(); }); + switch (options.timestampUnit) { + case TimestampUnit::kSecond: + rows.apply( + [&](vector_size_t i) { dstTs[j++] = srcTs[i].getSeconds(); }); + break; + case TimestampUnit::kMilli: + rows.apply([&](vector_size_t i) { dstTs[j++] = srcTs[i].toMillis(); }); + break; + case TimestampUnit::kMicro: + rows.apply([&](vector_size_t i) { dstTs[j++] = srcTs[i].toMicros(); }); + break; + case TimestampUnit::kNano: + rows.apply([&](vector_size_t i) { dstTs[j++] = srcTs[i].toNanos(); }); + break; + default: + VELOX_USER_FAIL(fmt::format( + "Unsupported timestamp unit: {}.", options.timestampUnit)); + } } else { auto typeSize = type.cppSizeInBytes(); rows.apply([&](vector_size_t i) { @@ -379,9 +408,9 @@ struct BufferViewReleaser { const std::shared_ptr arrayReleaser_; }; -// Wraps a naked pointer using a Velox buffer view, without copying it. Adding a -// dummy releaser as the buffer lifetime is fully controled by the client of the -// API. +// Wraps a naked pointer using a Velox buffer view, without copying it. Adding +// a dummy releaser as the buffer lifetime is fully controled by the client of +// the API. BufferPtr wrapInBufferViewAsViewer(const void* buffer, size_t length) { static const BufferViewReleaser kViewerReleaser; return BufferView::create( @@ -468,11 +497,12 @@ VectorPtr createStringFlatVector( optionalNullCount(nullCount)); } -// This functions does two things: (a) sets the value of null_count, and (b) the -// validity buffer (if there is at least one null row). +// This functions does two things: (a) sets the value of null_count, and (b) +// the validity buffer (if there is at least one null row). void exportValidityBitmap( const BaseVector& vec, const Selection& rows, + const BridgeOptions& options, ArrowArray& out, memory::MemoryPool* pool, VeloxToArrowBridgeHolder& holder) { @@ -486,7 +516,7 @@ void exportValidityBitmap( // If we're only exporting a subset, create a new validity buffer. if (rows.changed()) { nulls = AlignedBuffer::allocate(out.length, pool); - gatherFromBuffer(*BOOLEAN(), *vec.nulls(), rows, *nulls); + gatherFromBuffer(*BOOLEAN(), *vec.nulls(), rows, options, *nulls); } // Set null counts. @@ -502,8 +532,8 @@ void exportValidityBitmap( } bool isFlatScalarZeroCopy(const TypePtr& type) { - // - Short decimals need to be converted to 128 bit values as they are mapped - // to Arrow Decimal128. + // - Short decimals need to be converted to 128 bit values as they are + // mapped to Arrow Decimal128. // - Velox's Timestamp representation (2x 64bit values) does not have an // equivalent in Arrow. return !type->isShortDecimal() && !type->isTimestamp(); @@ -524,6 +554,7 @@ size_t getArrowElementSize(const TypePtr& type) { void exportValues( const BaseVector& vec, const Selection& rows, + const BridgeOptions& options, ArrowArray& out, memory::MemoryPool* pool, VeloxToArrowBridgeHolder& holder) { @@ -541,7 +572,7 @@ void exportValues( ? AlignedBuffer::allocate(out.length, pool) : AlignedBuffer::allocate( checkedMultiply(out.length, size), pool); - gatherFromBuffer(*type, *vec.values(), rows, *values); + gatherFromBuffer(*type, *vec.values(), rows, options, *values); holder.setBuffer(1, values); } @@ -581,6 +612,7 @@ void exportStrings( void exportFlat( const BaseVector& vec, const Selection& rows, + const BridgeOptions& options, ArrowArray& out, memory::MemoryPool* pool, VeloxToArrowBridgeHolder& holder) { @@ -596,7 +628,7 @@ void exportFlat( case TypeKind::REAL: case TypeKind::DOUBLE: case TypeKind::TIMESTAMP: - exportValues(vec, rows, out, pool, holder); + exportValues(vec, rows, options, out, pool, holder); break; case TypeKind::VARCHAR: case TypeKind::VARBINARY: @@ -613,12 +645,14 @@ void exportFlat( void exportToArrowImpl( const BaseVector&, const Selection&, + const BridgeOptions& options, ArrowArray&, memory::MemoryPool*); void exportRows( const RowVector& vec, const Selection& rows, + const BridgeOptions& options, ArrowArray& out, memory::MemoryPool* pool, VeloxToArrowBridgeHolder& holder) { @@ -631,6 +665,7 @@ void exportRows( exportToArrowImpl( *vec.childAt(i)->loadedVector(), rows, + options, *holder.allocateChild(i), pool); } catch (const VeloxException&) { @@ -691,6 +726,7 @@ void exportOffsets( void exportArrays( const ArrayVector& vec, const Selection& rows, + const BridgeOptions& options, ArrowArray& out, memory::MemoryPool* pool, VeloxToArrowBridgeHolder& holder) { @@ -700,6 +736,7 @@ void exportArrays( exportToArrowImpl( *vec.elements()->loadedVector(), childRows, + options, *holder.allocateChild(0), pool); out.n_children = 1; @@ -709,6 +746,7 @@ void exportArrays( void exportMaps( const MapVector& vec, const Selection& rows, + const BridgeOptions& options, ArrowArray& out, memory::MemoryPool* pool, VeloxToArrowBridgeHolder& holder) { @@ -721,7 +759,7 @@ void exportMaps( Selection childRows(child.size()); exportOffsets(vec, rows, out, pool, holder, childRows); holder.resizeChildren(1); - exportToArrowImpl(child, childRows, *holder.allocateChild(0), pool); + exportToArrowImpl(child, childRows, options, *holder.allocateChild(0), pool); out.n_children = 1; out.children = holder.getChildrenArrays(); } @@ -729,6 +767,7 @@ void exportMaps( void exportDictionary( const BaseVector& vec, const Selection& rows, + const BridgeOptions& options, ArrowArray& out, memory::MemoryPool* pool, VeloxToArrowBridgeHolder& holder) { @@ -736,14 +775,15 @@ void exportDictionary( out.n_children = 0; if (rows.changed()) { auto indices = AlignedBuffer::allocate(out.length, pool); - gatherFromBuffer(*INTEGER(), *vec.wrapInfo(), rows, *indices); + gatherFromBuffer(*INTEGER(), *vec.wrapInfo(), rows, options, *indices); holder.setBuffer(1, indices); } else { holder.setBuffer(1, vec.wrapInfo()); } auto& values = *vec.valueVector()->loadedVector(); out.dictionary = holder.allocateDictionary(); - exportToArrowImpl(values, Selection(values.size()), *out.dictionary, pool); + exportToArrowImpl( + values, Selection(values.size()), options, *out.dictionary, pool); } // Set the array as using "Null Layout" - no buffers are allocated. @@ -761,6 +801,7 @@ void setNullArray(ArrowArray& array, size_t length) { void exportConstantValue( const BaseVector& vec, + const BridgeOptions& options, ArrowArray& out, memory::MemoryPool* pool) { VectorPtr valuesVector; @@ -790,7 +831,7 @@ void exportConstantValue( wrapInBufferViewAsViewer(vec.valuesAsVoid(), bufferSize), vec.mayHaveNulls() ? 1 : 0); } - exportToArrowImpl(*valuesVector, selection, out, pool); + exportToArrowImpl(*valuesVector, selection, options, out, pool); } // Velox constant vectors are exported as Arrow REE containing a single run @@ -798,6 +839,7 @@ void exportConstantValue( void exportConstant( const BaseVector& vec, const Selection& rows, + const BridgeOptions& options, ArrowArray& out, memory::MemoryPool* pool, VeloxToArrowBridgeHolder& holder) { @@ -809,7 +851,7 @@ void exportConstant( out.n_children = 2; holder.resizeChildren(2); out.children = holder.getChildrenArrays(); - exportConstantValue(vec, *holder.allocateChild(1), pool); + exportConstantValue(vec, options, *holder.allocateChild(1), pool); // Create the run ends child. auto* runEnds = holder.allocateChild(0); @@ -836,6 +878,7 @@ void exportConstant( void exportToArrowImpl( const BaseVector& vec, const Selection& rows, + const BridgeOptions& options, ArrowArray& out, memory::MemoryPool* pool) { auto holder = std::make_unique(); @@ -843,26 +886,29 @@ void exportToArrowImpl( out.length = rows.count(); out.offset = 0; out.dictionary = nullptr; - exportValidityBitmap(vec, rows, out, pool, *holder); + exportValidityBitmap(vec, rows, options, out, pool, *holder); switch (vec.encoding()) { case VectorEncoding::Simple::FLAT: - exportFlat(vec, rows, out, pool, *holder); + exportFlat(vec, rows, options, out, pool, *holder); break; case VectorEncoding::Simple::ROW: - exportRows(*vec.asUnchecked(), rows, out, pool, *holder); + exportRows( + *vec.asUnchecked(), rows, options, out, pool, *holder); break; case VectorEncoding::Simple::ARRAY: - exportArrays(*vec.asUnchecked(), rows, out, pool, *holder); + exportArrays( + *vec.asUnchecked(), rows, options, out, pool, *holder); break; case VectorEncoding::Simple::MAP: - exportMaps(*vec.asUnchecked(), rows, out, pool, *holder); + exportMaps( + *vec.asUnchecked(), rows, options, out, pool, *holder); break; case VectorEncoding::Simple::DICTIONARY: - exportDictionary(vec, rows, out, pool, *holder); + exportDictionary(vec, rows, options, out, pool, *holder); break; case VectorEncoding::Simple::CONSTANT: - exportConstant(vec, rows, out, pool, *holder); + exportConstant(vec, rows, options, out, pool, *holder); break; default: VELOX_NYI("{} cannot be exported to Arrow yet.", vec.encoding()); @@ -903,8 +949,7 @@ TypePtr importFromArrowImpl( return VARBINARY(); case 't': // temporal types. - // Mapping it to ttn for now. - if (format[1] == 't' && format[2] == 'n') { + if (format[1] == 's') { return TIMESTAMP(); } if (format[1] == 'd' && format[2] == 'D') { @@ -985,12 +1030,17 @@ TypePtr importFromArrowImpl( void exportToArrow( const VectorPtr& vector, + const BridgeOptions& options, ArrowArray& arrowArray, memory::MemoryPool* pool) { - exportToArrowImpl(*vector, Selection(vector->size()), arrowArray, pool); + exportToArrowImpl( + *vector, Selection(vector->size()), options, arrowArray, pool); } -void exportToArrow(const VectorPtr& vec, ArrowSchema& arrowSchema) { +void exportToArrow( + const VectorPtr& vec, + const BridgeOptions& options, + ArrowSchema& arrowSchema) { auto& type = vec->type(); arrowSchema.name = nullptr; @@ -1010,7 +1060,7 @@ void exportToArrow(const VectorPtr& vec, ArrowSchema& arrowSchema) { arrowSchema.format = "i"; bridgeHolder->dictionary = std::make_unique(); arrowSchema.dictionary = bridgeHolder->dictionary.get(); - exportToArrow(vec->valueVector(), *arrowSchema.dictionary); + exportToArrow(vec->valueVector(), options, *arrowSchema.dictionary); } else if (vec->encoding() == VectorEncoding::Simple::CONSTANT) { // Arrow REE spec available in @@ -1024,10 +1074,10 @@ void exportToArrow(const VectorPtr& vec, ArrowSchema& arrowSchema) { // Contants of complex types are stored in the `values` vector. if (valueVector != nullptr) { - exportToArrow(valueVector, *valuesChild); + exportToArrow(valueVector, options, *valuesChild); } else { valuesChild->format = - exportArrowFormatStr(type, bridgeHolder->formatBuffer); + exportArrowFormatStr(type, options, bridgeHolder->formatBuffer); } bridgeHolder->setChildAtIndex( @@ -1035,7 +1085,8 @@ void exportToArrow(const VectorPtr& vec, ArrowSchema& arrowSchema) { bridgeHolder->setChildAtIndex(1, std::move(valuesChild), arrowSchema); } else { - arrowSchema.format = exportArrowFormatStr(type, bridgeHolder->formatBuffer); + arrowSchema.format = + exportArrowFormatStr(type, options, bridgeHolder->formatBuffer); arrowSchema.dictionary = nullptr; if (type->kind() == TypeKind::MAP) { @@ -1050,14 +1101,14 @@ void exportToArrow(const VectorPtr& vec, ArrowSchema& arrowSchema) { 0, std::vector{maps.mapKeys(), maps.mapValues()}, maps.getNullCount()); - exportToArrow(rows, *child); + exportToArrow(rows, options, *child); child->name = "entries"; bridgeHolder->setChildAtIndex(0, std::move(child), arrowSchema); } else if (type->kind() == TypeKind::ARRAY) { auto child = std::make_unique(); auto& arrays = *vec->asUnchecked(); - exportToArrow(arrays.elements(), *child); + exportToArrow(arrays.elements(), options, *child); // Name is required, and "item" is the default name used in arrow itself. child->name = "item"; bridgeHolder->setChildAtIndex(0, std::move(child), arrowSchema); @@ -1090,7 +1141,7 @@ void exportToArrow(const VectorPtr& vec, ArrowSchema& arrowSchema) { try { auto& currentSchema = bridgeHolder->childrenOwned[i]; currentSchema = std::make_unique(); - exportToArrow(rows.childAt(i), *currentSchema); + exportToArrow(rows.childAt(i), options, *currentSchema); currentSchema->name = bridgeHolder->rowType->nameOf(i).data(); arrowSchema.children[i] = currentSchema.get(); } catch (const VeloxException& e) { @@ -1130,12 +1181,14 @@ TypePtr importFromArrow(const ArrowSchema& arrowSchema) { namespace { VectorPtr importFromArrowImpl( + const BridgeOptions& options, ArrowSchema& arrowSchema, ArrowArray& arrowArray, memory::MemoryPool* pool, bool isViewer); RowVectorPtr createRowVector( + const BridgeOptions& options, memory::MemoryPool* pool, const RowTypePtr& rowType, BufferPtr nulls, @@ -1150,7 +1203,11 @@ RowVectorPtr createRowVector( for (size_t i = 0; i < arrowArray.n_children; ++i) { childrenVector.push_back(importFromArrowImpl( - *arrowSchema.children[i], *arrowArray.children[i], pool, isViewer)); + options, + *arrowSchema.children[i], + *arrowArray.children[i], + pool, + isViewer)); } return std::make_shared( pool, @@ -1175,6 +1232,7 @@ BufferPtr computeSizes( } ArrayVectorPtr createArrayVector( + const BridgeOptions& options, memory::MemoryPool* pool, const TypePtr& type, BufferPtr nulls, @@ -1190,7 +1248,11 @@ ArrayVectorPtr createArrayVector( auto sizes = computeSizes(offsets->as(), arrowArray.length, pool); auto elements = importFromArrowImpl( - *arrowSchema.children[0], *arrowArray.children[0], pool, isViewer); + options, + *arrowSchema.children[0], + *arrowArray.children[0], + pool, + isViewer); return std::make_shared( pool, type, @@ -1203,6 +1265,7 @@ ArrayVectorPtr createArrayVector( } MapVectorPtr createMapVector( + const BridgeOptions& options, memory::MemoryPool* pool, const TypePtr& type, BufferPtr nulls, @@ -1218,7 +1281,11 @@ MapVectorPtr createMapVector( computeSizes(offsets->as(), arrowArray.length, pool); // Arrow wraps keys and values into a struct. auto entries = importFromArrowImpl( - *arrowSchema.children[0], *arrowArray.children[0], pool, isViewer); + options, + *arrowSchema.children[0], + *arrowArray.children[0], + pool, + isViewer); VELOX_CHECK(entries->type()->isRow()); const auto& rows = *entries->asUnchecked(); VELOX_CHECK_EQ(rows.childrenSize(), 2); @@ -1235,6 +1302,7 @@ MapVectorPtr createMapVector( } VectorPtr createDictionaryVector( + const BridgeOptions& options, memory::MemoryPool* pool, const TypePtr& indexType, BufferPtr nulls, @@ -1253,7 +1321,7 @@ VectorPtr createDictionaryVector( arrowArray.buffers[1], arrowArray.length * sizeof(vector_size_t)); auto type = importFromArrow(*arrowSchema.dictionary); auto wrapped = importFromArrowImpl( - *arrowSchema.dictionary, *arrowArray.dictionary, pool, isViewer); + options, *arrowSchema.dictionary, *arrowArray.dictionary, pool, isViewer); return BaseVector::wrapInDictionary( std::move(nulls), std::move(indices), @@ -1262,6 +1330,7 @@ VectorPtr createDictionaryVector( } VectorPtr createTimestampVector( + const BridgeOptions& options, memory::MemoryPool* pool, const TypePtr& type, BufferPtr nulls, @@ -1273,10 +1342,38 @@ VectorPtr createTimestampVector( const auto* rawNulls = nulls->as(); if (length > nullCount) { - for (size_t i = 0; i < length; ++i) { - if (!bits::isBitNull(rawNulls, i)) { - rawTimestamps[i] = Timestamp::fromNanos(input[i]); - } + switch (options.timestampUnit) { + case TimestampUnit::kSecond: + for (size_t i = 0; i < length; ++i) { + if (!bits::isBitNull(rawNulls, i)) { + rawTimestamps[i] = Timestamp(input[i], 0); + } + } + break; + case TimestampUnit::kMilli: + for (size_t i = 0; i < length; ++i) { + if (!bits::isBitNull(rawNulls, i)) { + rawTimestamps[i] = Timestamp::fromMillis(input[i]); + } + } + break; + case TimestampUnit::kMicro: + for (size_t i = 0; i < length; ++i) { + if (!bits::isBitNull(rawNulls, i)) { + rawTimestamps[i] = Timestamp::fromMicros(input[i]); + } + } + break; + case TimestampUnit::kNano: + for (size_t i = 0; i < length; ++i) { + if (!bits::isBitNull(rawNulls, i)) { + rawTimestamps[i] = Timestamp::fromNanos(input[i]); + } + } + break; + default: + VELOX_USER_FAIL(fmt::format( + "Unsupported timestamp unit: {}.", options.timestampUnit)); } } return std::make_shared>( @@ -1292,6 +1389,7 @@ VectorPtr createTimestampVector( } VectorPtr importFromArrowImpl( + const BridgeOptions& options, ArrowSchema& arrowSchema, ArrowArray& arrowArray, memory::MemoryPool* pool, @@ -1329,6 +1427,7 @@ VectorPtr importFromArrowImpl( if (arrowSchema.dictionary) { auto indexType = importFromArrowImpl(arrowSchema.format, arrowSchema); return createDictionaryVector( + options, pool, indexType, nulls, @@ -1355,6 +1454,7 @@ VectorPtr importFromArrowImpl( wrapInBufferView); } else if (type->isTimestamp()) { return createTimestampVector( + options, pool, type, nulls, @@ -1364,6 +1464,7 @@ VectorPtr importFromArrowImpl( } else if (type->isRow()) { // Row/structs. return createRowVector( + options, pool, std::dynamic_pointer_cast(type), nulls, @@ -1372,10 +1473,24 @@ VectorPtr importFromArrowImpl( isViewer); } else if (type->isArray()) { return createArrayVector( - pool, type, nulls, arrowSchema, arrowArray, isViewer, wrapInBufferView); + options, + pool, + type, + nulls, + arrowSchema, + arrowArray, + isViewer, + wrapInBufferView); } else if (type->isMap()) { return createMapVector( - pool, type, nulls, arrowSchema, arrowArray, isViewer, wrapInBufferView); + options, + pool, + type, + nulls, + arrowSchema, + arrowArray, + isViewer, + wrapInBufferView); } else if (type->isPrimitiveType()) { // Other primitive types. @@ -1403,13 +1518,19 @@ VectorPtr importFromArrowImpl( } VectorPtr importFromArrowImpl( + const BridgeOptions& options, ArrowSchema& arrowSchema, ArrowArray& arrowArray, memory::MemoryPool* pool, bool isViewer) { if (isViewer) { return importFromArrowImpl( - arrowSchema, arrowArray, pool, isViewer, wrapInBufferViewAsViewer); + options, + arrowSchema, + arrowArray, + pool, + isViewer, + wrapInBufferViewAsViewer); } // This Vector will take over the ownership of `arrowSchema` and `arrowArray` @@ -1436,6 +1557,7 @@ VectorPtr importFromArrowImpl( } }); VectorPtr imported = importFromArrowImpl( + options, arrowSchema, arrowArray, pool, @@ -1456,8 +1578,10 @@ VectorPtr importFromArrowImpl( VectorPtr importFromArrowAsViewer( const ArrowSchema& arrowSchema, const ArrowArray& arrowArray, + const BridgeOptions& options, memory::MemoryPool* pool) { return importFromArrowImpl( + options, const_cast(arrowSchema), const_cast(arrowArray), pool, @@ -1467,8 +1591,9 @@ VectorPtr importFromArrowAsViewer( VectorPtr importFromArrowAsOwner( ArrowSchema& arrowSchema, ArrowArray& arrowArray, + const BridgeOptions& options, memory::MemoryPool* pool) { - return importFromArrowImpl(arrowSchema, arrowArray, pool, false); + return importFromArrowImpl(options, arrowSchema, arrowArray, pool, false); } } // namespace facebook::velox diff --git a/velox/vector/arrow/Bridge.h b/velox/vector/arrow/Bridge.h index faeb571d38169..10f9d5efcfeb6 100644 --- a/velox/vector/arrow/Bridge.h +++ b/velox/vector/arrow/Bridge.h @@ -27,6 +27,12 @@ struct ArrowSchema; namespace facebook::velox { +enum class TimestampUnit { kSecond = 0, kMilli = 1, kMicro = 2, kNano = 3 }; + +struct BridgeOptions { + TimestampUnit timestampUnit = TimestampUnit::kNano; +}; + /// Export a generic Velox Vector to an ArrowArray, as defined by Arrow's C data /// interface: /// @@ -56,6 +62,7 @@ namespace facebook::velox { /// void exportToArrow( const VectorPtr& vector, + const BridgeOptions& options, ArrowArray& arrowArray, memory::MemoryPool* pool); @@ -78,7 +85,7 @@ void exportToArrow( /// /// NOTE: Since Arrow couples type and encoding, we need both Velox type and /// actual data (containing encoding) to create an ArrowSchema. -void exportToArrow(const VectorPtr&, ArrowSchema&); +void exportToArrow(const VectorPtr&, const BridgeOptions&, ArrowSchema&); /// Import an ArrowSchema into a Velox Type object. /// @@ -126,6 +133,7 @@ TypePtr importFromArrow(const ArrowSchema& arrowSchema); VectorPtr importFromArrowAsViewer( const ArrowSchema& arrowSchema, const ArrowArray& arrowArray, + const BridgeOptions& options, memory::MemoryPool* pool); /// Import an ArrowArray and ArrowSchema into a Velox vector, acquiring @@ -143,6 +151,7 @@ VectorPtr importFromArrowAsViewer( VectorPtr importFromArrowAsOwner( ArrowSchema& arrowSchema, ArrowArray& arrowArray, + const BridgeOptions& options, memory::MemoryPool* pool); } // namespace facebook::velox diff --git a/velox/vector/arrow/tests/ArrowBridgeArrayTest.cpp b/velox/vector/arrow/tests/ArrowBridgeArrayTest.cpp index e4cc8cf31ff53..0337927277bd5 100644 --- a/velox/vector/arrow/tests/ArrowBridgeArrayTest.cpp +++ b/velox/vector/arrow/tests/ArrowBridgeArrayTest.cpp @@ -50,7 +50,7 @@ class ArrowBridgeArrayExportTest : public testing::Test { const TypePtr& type = CppToType::create()) { auto flatVector = vectorMaker_.flatVectorNullable(inputData, type); ArrowArray arrowArray; - velox::exportToArrow(flatVector, arrowArray, pool_.get()); + velox::exportToArrow(flatVector, options_, arrowArray, pool_.get()); validateArray(inputData, arrowArray); @@ -63,7 +63,7 @@ class ArrowBridgeArrayExportTest : public testing::Test { void testArrayVector(const T& inputData) { auto arrayVector = vectorMaker_.arrayVectorNullable(inputData); ArrowArray arrowArray; - velox::exportToArrow(arrayVector, arrowArray, pool_.get()); + velox::exportToArrow(arrayVector, options_, arrowArray, pool_.get()); validateListArray(inputData, arrowArray); @@ -89,7 +89,7 @@ class ArrowBridgeArrayExportTest : public testing::Test { const VectorPtr& constantVector, const TInput& input) { ArrowArray arrowArray; - velox::exportToArrow(constantVector, arrowArray, pool_.get()); + velox::exportToArrow(constantVector, options_, arrowArray, pool_.get()); validateConstant( input, constantVector->size(), @@ -157,8 +157,33 @@ class ArrowBridgeArrayExportTest : public testing::Test { bits::isBitSet(reinterpret_cast(values), i)) << "mismatch at index " << i; } else if constexpr (std::is_same_v) { - EXPECT_EQ(inputData[i], Timestamp::fromNanos(values[i])) - << "mismatch at index " << i; + switch (options_.timestampUnit) { + case TimestampUnit::kSecond: + EXPECT_EQ( + Timestamp(inputData[i].value().getSeconds(), 0), + Timestamp(values[i], 0)) + << "mismatch at index " << i; + break; + case TimestampUnit::kMilli: + EXPECT_EQ( + Timestamp::fromMillis(inputData[i].value().toMillis()), + Timestamp::fromMillis(values[i])) + << "mismatch at index " << i; + break; + case TimestampUnit::kMicro: + EXPECT_EQ( + Timestamp::fromMicros(inputData[i].value().toMicros()), + Timestamp::fromMicros(values[i])) + << "mismatch at index " << i; + break; + case TimestampUnit::kNano: + EXPECT_EQ(inputData[i], Timestamp::fromNanos(values[i])) + << "mismatch at index " << i; + break; + default: + VELOX_USER_FAIL(fmt::format( + "Timestamp unit not supported: {}.", options_.timestampUnit)); + } } else { EXPECT_EQ(inputData[i], values[i]) << "mismatch at index " << i; } @@ -349,10 +374,12 @@ class ArrowBridgeArrayExportTest : public testing::Test { } void exportToArrow(const TypePtr& type, ArrowSchema& out) { - velox::exportToArrow(BaseVector::create(type, 0, pool_.get()), out); + velox::exportToArrow( + BaseVector::create(type, 0, pool_.get()), options_, out); } // Boiler plate structures required by vectorMaker. + BridgeOptions options_; std::shared_ptr queryCtx_{std::make_shared()}; std::shared_ptr pool_{memory::addDefaultLeafMemoryPool()}; core::ExecCtx execCtx_{pool_.get(), queryCtx_.get()}; @@ -366,7 +393,7 @@ TEST_F(ArrowBridgeArrayExportTest, flatNotNull) { // Make sure that ArrowArray is correctly acquiring ownership, even after // the initial vector shared_ptr is gone. auto flatVector = vectorMaker_.flatVector(inputData); - velox::exportToArrow(flatVector, arrowArray, pool_.get()); + velox::exportToArrow(flatVector, options_, arrowArray, pool_.get()); } EXPECT_EQ(inputData.size(), arrowArray.length); @@ -489,16 +516,20 @@ TEST_F(ArrowBridgeArrayExportTest, flatDate) { } TEST_F(ArrowBridgeArrayExportTest, flatTimestamp) { - testFlatVector( - { - Timestamp(0, 0), - std::nullopt, - Timestamp(1699300965, 12'349), - Timestamp(-2208960000, 0), // 1900-01-01 - Timestamp(3155788800, 999'999'999), - std::nullopt, - }, - TIMESTAMP()); + for (uint8_t unit = 0; unit <= static_cast(TimestampUnit::kNano); + ++unit) { + options_.timestampUnit = static_cast(unit); + testFlatVector( + { + Timestamp(0, 0), + std::nullopt, + Timestamp(1699300965, 12'349), + Timestamp(-2208960000, 0), // 1900-01-01 + Timestamp(3155788800, 999'999'999), + std::nullopt, + }, + TIMESTAMP()); + } // Out of range. If nanosecond precision is represented in Arrow, timestamps // starting around 2263-01-01 should overflow and throw a user exception. @@ -537,7 +568,7 @@ TEST_F(ArrowBridgeArrayExportTest, rowVector) { }); ArrowArray arrowArray; - velox::exportToArrow(vector, arrowArray, pool_.get()); + velox::exportToArrow(vector, options_, arrowArray, pool_.get()); EXPECT_EQ(col1.size(), arrowArray.length); EXPECT_EQ(0, arrowArray.null_count); @@ -574,7 +605,7 @@ TEST_F(ArrowBridgeArrayExportTest, rowVectorNullable) { vector->setNullCount(3); ArrowArray arrowArray; - velox::exportToArrow(vector, arrowArray, pool_.get()); + velox::exportToArrow(vector, options_, arrowArray, pool_.get()); EXPECT_EQ(col1.size(), arrowArray.length); EXPECT_EQ(3, arrowArray.null_count); @@ -602,7 +633,8 @@ TEST_F(ArrowBridgeArrayExportTest, rowVectorNullable) { TEST_F(ArrowBridgeArrayExportTest, rowVectorEmpty) { ArrowArray arrowArray; - velox::exportToArrow(vectorMaker_.rowVector({}), arrowArray, pool_.get()); + velox::exportToArrow( + vectorMaker_.rowVector({}), options_, arrowArray, pool_.get()); EXPECT_EQ(0, arrowArray.n_children); EXPECT_EQ(1, arrowArray.n_buffers); EXPECT_EQ(nullptr, arrowArray.children); @@ -612,11 +644,12 @@ TEST_F(ArrowBridgeArrayExportTest, rowVectorEmpty) { std::shared_ptr toArrow( const VectorPtr& vec, + const BridgeOptions options, memory::MemoryPool* pool) { ArrowSchema schema; ArrowArray array; - exportToArrow(vec, schema); - exportToArrow(vec, array, pool); + exportToArrow(vec, options, schema); + exportToArrow(vec, options, array, pool); EXPECT_OK_AND_ASSIGN(auto type, arrow::ImportType(&schema)); EXPECT_OK_AND_ASSIGN(auto ans, arrow::ImportArray(&array, type)); return ans; @@ -650,7 +683,7 @@ TEST_F(ArrowBridgeArrayExportTest, arraySimple) { TEST_F(ArrowBridgeArrayExportTest, arrayCrossValidate) { auto vec = vectorMaker_.arrayVector({{1, 2, 3}, {4, 5}}); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); EXPECT_EQ(array->null_count(), 0); ASSERT_EQ(*array->type(), *arrow::list(arrow::int64())); @@ -677,10 +710,10 @@ TEST_F(ArrowBridgeArrayExportTest, arrayDictionary) { ArrowSchema schema; ArrowArray data; - velox::exportToArrow(vec, schema); - velox::exportToArrow(vec, data, vec->pool()); + velox::exportToArrow(vec, options_, schema); + velox::exportToArrow(vec, options_, data, vec->pool()); - auto result = importFromArrowAsViewer(schema, data, vec->pool()); + auto result = importFromArrowAsViewer(schema, data, options_, vec->pool()); test::assertEqualVectors(result, vec); schema.release(&schema); data.release(&data); @@ -694,7 +727,7 @@ TEST_F(ArrowBridgeArrayExportTest, arrayGap) { auto sizes = makeBuffer({2, 2}); auto vec = std::make_shared( pool_.get(), ARRAY(BIGINT()), nullptr, 2, offsets, sizes, elements); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); EXPECT_EQ(array->null_count(), 0); ASSERT_EQ(*array->type(), *arrow::list(arrow::int64())); @@ -719,7 +752,7 @@ TEST_F(ArrowBridgeArrayExportTest, arrayReorder) { auto sizes = makeBuffer({2, 2}); auto vec = std::make_shared( pool_.get(), ARRAY(BIGINT()), nullptr, 2, offsets, sizes, elements); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); EXPECT_EQ(array->null_count(), 0); ASSERT_EQ(*array->type(), *arrow::list(arrow::int64())); @@ -749,7 +782,7 @@ TEST_F(ArrowBridgeArrayExportTest, arrayNested) { std::make_shared( pool_.get(), ARRAY(inner->type()), nullptr, 2, offsets, sizes, inner); }); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); ASSERT_EQ(*array->type(), *arrow::list(arrow::list(arrow::int64()))); auto& listArray = static_cast(*array); @@ -766,7 +799,7 @@ TEST_F(ArrowBridgeArrayExportTest, mapSimple) { auto allOnes = [](vector_size_t) { return 1; }; auto vec = vectorMaker_.mapVector(2, allOnes, allOnes, allOnes); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); EXPECT_EQ(array->null_count(), 0); ASSERT_EQ(*array->type(), *arrow::map(arrow::int64(), arrow::int64())); @@ -801,7 +834,7 @@ TEST_F(ArrowBridgeArrayExportTest, mapNested) { std::make_shared( pool_.get(), type, nullptr, 2, offsets, sizes, keys, inner); }); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); EXPECT_EQ(array->null_count(), 0); ASSERT_EQ( @@ -833,7 +866,7 @@ TEST_F(ArrowBridgeArrayExportTest, dictionarySimple) { allocateIndices(3, pool_.get()), 3, vectorMaker_.flatVector({1, 2, 3})); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); ASSERT_EQ(*array->type(), *arrow::dictionary(arrow::int32(), arrow::int64())); auto& dict = static_cast(*array); @@ -859,7 +892,7 @@ TEST_F(ArrowBridgeArrayExportTest, dictionaryNested) { std::make_shared( pool_.get(), ARRAY(inner->type()), nullptr, 2, offsets, sizes, inner); }); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); ASSERT_EQ( *array->type(), @@ -951,7 +984,23 @@ class ArrowBridgeArrayImportTest : public ArrowBridgeArrayExportTest { if constexpr (std::is_same_v) { bits::setBit(rawValues, i, *inputValues[i]); } else if constexpr (std::is_same_v) { - rawValues[i] = inputValues[i]->toNanos(); + switch (options_.timestampUnit) { + case TimestampUnit::kSecond: + rawValues[i] = inputValues[i]->getSeconds(); + break; + case TimestampUnit::kMilli: + rawValues[i] = inputValues[i]->toMillis(); + break; + case TimestampUnit::kMicro: + rawValues[i] = inputValues[i]->toMicros(); + break; + case TimestampUnit::kNano: + rawValues[i] = inputValues[i]->toNanos(); + break; + default: + VELOX_USER_FAIL(fmt::format( + "Timestamp unit not supported: {}.", options_.timestampUnit)); + } } else { rawValues[i] = *inputValues[i]; } @@ -1064,9 +1113,46 @@ class ArrowBridgeArrayImportTest : public ArrowBridgeArrayExportTest { // Assert new vector contents. for (vector_size_t i = 0; i < convertedVector->size(); ++i) { - ASSERT_TRUE(expected->equalValueAt(convertedVector.get(), i, i)) - << "at " << i << ": " << expected->toString(i) << " vs. " - << convertedVector->toString(i); + if constexpr (std::is_same_v) { + if (expected->isNullAt(i)) { + ASSERT_TRUE(convertedVector->isNullAt(i)) + << "at " << i << ": " << expected->toString(i) << " vs. " + << convertedVector->toString(i); + } else { + auto convertedFlat = convertedVector->asFlatVector(); + switch (options_.timestampUnit) { + case TimestampUnit::kSecond: + EXPECT_EQ( + Timestamp(expected->valueAt(i).getSeconds(), 0), + convertedFlat->valueAt(i)) + << "mismatch at index " << i; + break; + case TimestampUnit::kMilli: + EXPECT_EQ( + Timestamp::fromMillis(expected->valueAt(i).toMillis()), + convertedFlat->valueAt(i)) + << "mismatch at index " << i; + break; + case TimestampUnit::kMicro: + EXPECT_EQ( + Timestamp::fromMicros(expected->valueAt(i).toMicros()), + convertedFlat->valueAt(i)) + << "mismatch at index " << i; + break; + case TimestampUnit::kNano: + EXPECT_EQ(expected->valueAt(i), convertedFlat->valueAt(i)) + << "mismatch at index " << i; + break; + default: + VELOX_USER_FAIL(fmt::format( + "Timestamp unit not supported: {}.", options_.timestampUnit)); + } + } + } else { + ASSERT_TRUE(expected->equalValueAt(convertedVector.get(), i, i)) + << "at " << i << ": " << expected->toString(i) << " vs. " + << convertedVector->toString(i); + } } } @@ -1099,8 +1185,12 @@ class ArrowBridgeArrayImportTest : public ArrowBridgeArrayExportTest { testArrowImport("g", {-99.9, 4.3, 31.1, 129.11, -12}); testArrowImport("f", {-99.9, 4.3, 31.1, 129.11, -12}); - testArrowImport( - "ttn", {Timestamp(0, 0), std::nullopt, Timestamp(1699308257, 1234)}); + for (uint8_t unit = 0; unit <= static_cast(TimestampUnit::kNano); + ++unit) { + options_.timestampUnit = static_cast(unit); + testArrowImport( + "ts", {Timestamp(0, 0), std::nullopt, Timestamp(1699308257, 1234)}); + } } void testImportWithoutNullsBuffer() { @@ -1277,8 +1367,8 @@ class ArrowBridgeArrayImportTest : public ArrowBridgeArrayExportTest { EXPECT_FALSE(schema.release); EXPECT_FALSE(data.release); } - velox::exportToArrow(vec, schema); - velox::exportToArrow(vec, data, pool_.get()); + velox::exportToArrow(vec, options_, schema); + velox::exportToArrow(vec, options_, data, pool_.get()); ASSERT_OK_AND_ASSIGN(auto arrowType, arrow::ImportType(&schema)); ASSERT_OK_AND_ASSIGN(auto array2, arrow::ImportArray(&data, arrowType)); ASSERT_OK(array2->ValidateFull()); @@ -1400,6 +1490,7 @@ class ArrowBridgeArrayImportTest : public ArrowBridgeArrayExportTest { } std::shared_ptr pool_{memory::addDefaultLeafMemoryPool()}; + BridgeOptions options_; }; class ArrowBridgeArrayImportAsViewerTest : public ArrowBridgeArrayImportTest { @@ -1412,7 +1503,7 @@ class ArrowBridgeArrayImportAsViewerTest : public ArrowBridgeArrayImportTest { ArrowArray& arrowArray, memory::MemoryPool* pool) override { return facebook::velox::importFromArrowAsViewer( - arrowSchema, arrowArray, pool); + arrowSchema, arrowArray, options_, pool); } }; @@ -1459,7 +1550,7 @@ class ArrowBridgeArrayImportAsOwnerTest ArrowArray& arrowArray, memory::MemoryPool* pool) override { return facebook::velox::importFromArrowAsOwner( - arrowSchema, arrowArray, pool); + arrowSchema, arrowArray, options_, pool); } }; @@ -1502,7 +1593,8 @@ TEST_F(ArrowBridgeArrayImportAsOwnerTest, inputsMarkedReleased) { ArrowSchema arrowSchema = makeArrowSchema("i"); ArrowArray arrowArray = makeArrowArray(buffers, 2, 4, 0); - auto _ = importFromArrowAsOwner(arrowSchema, arrowArray, pool_.get()); + auto _ = + importFromArrowAsOwner(arrowSchema, arrowArray, options_, pool_.get()); EXPECT_EQ(arrowSchema.release, nullptr); EXPECT_EQ(arrowArray.release, nullptr); @@ -1535,7 +1627,10 @@ TEST_F(ArrowBridgeArrayImportAsOwnerTest, releaseCalled) { // Create a Velox Vector from Arrow and then destruct it to trigger the // release callback calling - { auto _ = importFromArrowAsOwner(arrowSchema, arrowArray, pool_.get()); } + { + auto _ = + importFromArrowAsOwner(arrowSchema, arrowArray, options_, pool_.get()); + } EXPECT_TRUE(TestReleaseCalled::schemaReleaseCalled); EXPECT_TRUE(TestReleaseCalled::arrayReleaseCalled); diff --git a/velox/vector/arrow/tests/ArrowBridgeSchemaTest.cpp b/velox/vector/arrow/tests/ArrowBridgeSchemaTest.cpp index 7277bd2e060db..83ee3c2ad80b6 100644 --- a/velox/vector/arrow/tests/ArrowBridgeSchemaTest.cpp +++ b/velox/vector/arrow/tests/ArrowBridgeSchemaTest.cpp @@ -111,7 +111,7 @@ class ArrowBridgeSchemaExportTest : public testing::Test { 3, // index to use for the constant BaseVector::create(type, 100, pool_.get())); - velox::exportToArrow(constantVector, arrowSchema); + velox::exportToArrow(constantVector, options_, arrowSchema); EXPECT_EQ("+r", std::string{arrowSchema.format}); EXPECT_EQ(nullptr, arrowSchema.name); @@ -146,7 +146,8 @@ class ArrowBridgeSchemaExportTest : public testing::Test { } void exportToArrow(const TypePtr& type, ArrowSchema& out) { - velox::exportToArrow(BaseVector::create(type, 0, pool_.get()), out); + velox::exportToArrow( + BaseVector::create(type, 0, pool_.get()), options_, out); } ArrowSchema makeArrowSchema(const char* format) { @@ -163,6 +164,7 @@ class ArrowBridgeSchemaExportTest : public testing::Test { }; } + BridgeOptions options_; std::shared_ptr pool_{memory::addDefaultLeafMemoryPool()}; }; @@ -180,7 +182,15 @@ TEST_F(ArrowBridgeSchemaExportTest, scalar) { testScalarType(VARCHAR(), "u"); testScalarType(VARBINARY(), "z"); - testScalarType(TIMESTAMP(), "ttn"); + options_.timestampUnit = TimestampUnit::kSecond; + testScalarType(TIMESTAMP(), "tss:"); + options_.timestampUnit = TimestampUnit::kMilli; + testScalarType(TIMESTAMP(), "tsm:"); + options_.timestampUnit = TimestampUnit::kMicro; + testScalarType(TIMESTAMP(), "tsu:"); + options_.timestampUnit = TimestampUnit::kNano; + testScalarType(TIMESTAMP(), "tsn:"); + testScalarType(DATE(), "tdD"); testScalarType(DECIMAL(10, 4), "d:10,4"); @@ -344,7 +354,7 @@ TEST_F(ArrowBridgeSchemaImportTest, scalar) { EXPECT_EQ(*VARBINARY(), *testSchemaImport("Z")); // Temporal. - EXPECT_EQ(*TIMESTAMP(), *testSchemaImport("ttn")); + EXPECT_EQ(*TIMESTAMP(), *testSchemaImport("tsu:")); EXPECT_EQ(*DATE(), *testSchemaImport("tdD")); EXPECT_EQ(*DECIMAL(10, 4), *testSchemaImport("d:10,4")); @@ -357,7 +367,7 @@ TEST_F(ArrowBridgeSchemaImportTest, scalar) { TEST_F(ArrowBridgeSchemaImportTest, complexTypes) { // Array. EXPECT_EQ(*ARRAY(BIGINT()), *testSchemaImportComplex("+l", {"l"})); - EXPECT_EQ(*ARRAY(TIMESTAMP()), *testSchemaImportComplex("+l", {"ttn"})); + EXPECT_EQ(*ARRAY(TIMESTAMP()), *testSchemaImportComplex("+l", {"tsn:"})); EXPECT_EQ(*ARRAY(DATE()), *testSchemaImportComplex("+l", {"tdD"})); EXPECT_EQ(*ARRAY(VARCHAR()), *testSchemaImportComplex("+l", {"U"})); @@ -419,9 +429,11 @@ class ArrowBridgeSchemaTest : public testing::Test { } void exportToArrow(const TypePtr& type, ArrowSchema& out) { - velox::exportToArrow(BaseVector::create(type, 0, pool_.get()), out); + velox::exportToArrow( + BaseVector::create(type, 0, pool_.get()), options_, out); } + BridgeOptions options_; std::shared_ptr pool_{memory::addDefaultLeafMemoryPool()}; }; @@ -497,7 +509,7 @@ TEST_F(ArrowBridgeSchemaImportTest, dictionaryTypeTest) { *testSchemaDictionaryImport( "i", makeComplexArrowSchema( - schemas, schemaPtrs, mapSchemas, mapSchemaPtrs, "+l", {"ttn"}))); + schemas, schemaPtrs, mapSchemas, mapSchemaPtrs, "+l", {"ts"}))); EXPECT_EQ( *ARRAY(DATE()), *testSchemaDictionaryImport(