From efd015896c3c1d9fa5ba39ce3c4b9de26b45fc39 Mon Sep 17 00:00:00 2001 From: rui-mo Date: Tue, 19 Mar 2024 13:09:13 -0700 Subject: [PATCH] Support different timestamp units in arrow bridge (#7625) Summary: Arrow bridge supports different timestamp units, including second, milli, micro and nano. This PR adds `TimestampUnit` in `ArrowOptions` to support these units in the process of exportToArrow. For importFromArrow, the unit extracted from arrow schema is followed. By default, the conversion unit is nano, and in Gluten, micro is configured to align with Spark. https://github.com/facebookincubator/velox/pull/4680#discussion_r1378867890 Arrow Reference: https://github.com/apache/arrow/blob/main/cpp/src/arrow/c/bridge.cc#L402-L421. Pull Request resolved: https://github.com/facebookincubator/velox/pull/7625 Reviewed By: mbasmanova Differential Revision: D54852534 Pulled By: Yuhta fbshipit-source-id: 0494102fedc73f7068424bc09e972e7deb297a6e --- velox/connectors/hive/HiveConfig.cpp | 11 + velox/connectors/hive/HiveConfig.h | 10 + velox/connectors/hive/HiveDataSink.cpp | 2 + velox/docs/configs.rst | 6 + velox/dwio/common/Options.h | 1 + .../tests/writer/ParquetWriterTest.cpp | 100 ++++++ velox/dwio/parquet/writer/Writer.cpp | 15 +- velox/dwio/parquet/writer/Writer.h | 5 + velox/exec/tests/ArrowStreamTest.cpp | 11 +- velox/vector/arrow/Bridge.cpp | 310 ++++++++++++++---- velox/vector/arrow/Bridge.h | 8 + .../arrow/tests/ArrowBridgeArrayTest.cpp | 163 ++++++--- .../arrow/tests/ArrowBridgeSchemaTest.cpp | 26 +- 13 files changed, 541 insertions(+), 127 deletions(-) diff --git a/velox/connectors/hive/HiveConfig.cpp b/velox/connectors/hive/HiveConfig.cpp index e6048f0d6aedb..ae88c700f8034 100644 --- a/velox/connectors/hive/HiveConfig.cpp +++ b/velox/connectors/hive/HiveConfig.cpp @@ -206,4 +206,15 @@ bool HiveConfig::s3UseProxyFromEnv() const { return config_->get(kS3UseProxyFromEnv, false); } +uint8_t HiveConfig::parquetWriteTimestampUnit(const Config* session) const { + const auto unit = session->get( + kParquetWriteTimestampUnitSession, + config_->get(kParquetWriteTimestampUnit, 9 /*nano*/)); + VELOX_CHECK( + unit == 0 /*second*/ || unit == 3 /*milli*/ || unit == 6 /*micro*/ || + unit == 9, + "Invalid timestamp unit."); + return unit; +} + } // namespace facebook::velox::connector::hive diff --git a/velox/connectors/hive/HiveConfig.h b/velox/connectors/hive/HiveConfig.h index e50b411570363..e1b0a3d2a19c0 100644 --- a/velox/connectors/hive/HiveConfig.h +++ b/velox/connectors/hive/HiveConfig.h @@ -178,6 +178,12 @@ class HiveConfig { static constexpr const char* kS3UseProxyFromEnv = "hive.s3.use-proxy-from-env"; + /// Timestamp unit for Parquet write through Arrow bridge. + static constexpr const char* kParquetWriteTimestampUnit = + "hive.parquet.writer.timestamp-unit"; + static constexpr const char* kParquetWriteTimestampUnitSession = + "hive.parquet.writer.timestamp_unit"; + InsertExistingPartitionsBehavior insertExistingPartitionsBehavior( const Config* session) const; @@ -247,6 +253,10 @@ class HiveConfig { bool s3UseProxyFromEnv() const; + /// Returns the timestamp unit used when writing timestamps into Parquet + /// through Arrow bridge. 0: second, 3: milli, 6: micro, 9: nano. + uint8_t parquetWriteTimestampUnit(const Config* session) const; + HiveConfig(std::shared_ptr config) { VELOX_CHECK_NOT_NULL( config, "Config is null for HiveConfig initialization"); diff --git a/velox/connectors/hive/HiveDataSink.cpp b/velox/connectors/hive/HiveDataSink.cpp index 3e770d973ed1e..89436ce81640c 100644 --- a/velox/connectors/hive/HiveDataSink.cpp +++ b/velox/connectors/hive/HiveDataSink.cpp @@ -675,6 +675,8 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) { hiveConfig_->orcWriterMaxStripeSize(connectorSessionProperties)); options.maxDictionaryMemory = std::optional( hiveConfig_->orcWriterMaxDictionaryMemory(connectorSessionProperties)); + options.parquetWriteTimestampUnit = + hiveConfig_->parquetWriteTimestampUnit(connectorSessionProperties); options.serdeParameters = std::map( insertTableHandle_->serdeParameters().begin(), insertTableHandle_->serdeParameters().end()); diff --git a/velox/docs/configs.rst b/velox/docs/configs.rst index af66d2ea9f810..e336f2b02e9c7 100644 --- a/velox/docs/configs.rst +++ b/velox/docs/configs.rst @@ -488,6 +488,12 @@ Each query can override the config by setting corresponding query session proper - string - 16M - Maximum dictionary memory that can be used in orc writer. + * - hive.parquet.writer.timestamp-unit + - hive.parquet.writer.timestamp_unit + - tinyint + - 9 + - Timestamp unit used when writing timestamps into Parquet through Arrow bridge. + Valid values are 0 (second), 3 (millisecond), 6 (microsecond), 9 (nanosecond). ``Amazon S3 Configuration`` ^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/velox/dwio/common/Options.h b/velox/dwio/common/Options.h index acb852a6b06fa..86eb3fda64635 100644 --- a/velox/dwio/common/Options.h +++ b/velox/dwio/common/Options.h @@ -593,6 +593,7 @@ struct WriterOptions { std::optional maxStripeSize{std::nullopt}; std::optional maxDictionaryMemory{std::nullopt}; std::map serdeParameters; + std::optional parquetWriteTimestampUnit; }; } // namespace facebook::velox::dwio::common diff --git a/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp b/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp index a9582e49fd07e..6fed5d3da28a9 100644 --- a/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp +++ b/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp @@ -14,16 +14,38 @@ * limitations under the License. */ +#include +#include + #include "velox/common/base/tests/GTestUtils.h" +#include "velox/common/testutil/TestValue.h" +#include "velox/connectors/hive/HiveConnector.h" +#include "velox/core/QueryCtx.h" #include "velox/dwio/parquet/tests/ParquetTestBase.h" +#include "velox/exec/tests/utils/Cursor.h" +#include "velox/exec/tests/utils/PlanBuilder.h" +#include "velox/exec/tests/utils/QueryAssertions.h" +#include "velox/exec/tests/utils/TempDirectoryPath.h" using namespace facebook::velox; using namespace facebook::velox::common; using namespace facebook::velox::dwio::common; +using namespace facebook::velox::exec::test; using namespace facebook::velox::parquet; class ParquetWriterTest : public ParquetTestBase { protected: + static void SetUpTestCase() { + memory::MemoryManager::testingSetInstance({}); + testutil::TestValue::enable(); + auto hiveConnector = + connector::getConnectorFactory( + connector::hive::HiveConnectorFactory::kHiveConnectorName) + ->newConnector( + kHiveConnectorId, std::make_shared()); + connector::registerConnector(hiveConnector); + } + std::unique_ptr createRowReaderWithSchema( const std::unique_ptr reader, const RowTypePtr& rowType) { @@ -43,6 +65,8 @@ class ParquetWriterTest : public ParquetTestBase { std::make_shared(data), opts.getMemoryPool()), opts); }; + + inline static const std::string kHiveConnectorId = "test-hive"; }; std::vector params = { @@ -110,3 +134,79 @@ TEST_F(ParquetWriterTest, compression) { auto rowReader = createRowReaderWithSchema(std::move(reader), schema); assertReadWithReaderAndExpected(schema, *rowReader, data, *leafPool_); }; + +DEBUG_ONLY_TEST_F(ParquetWriterTest, unitFromWriterOptions) { + SCOPED_TESTVALUE_SET( + "facebook::velox::parquet::Writer::write", + std::function( + ([&](const ::arrow::Schema* arrowSchema) { + const auto tsType = + std::dynamic_pointer_cast<::arrow::TimestampType>( + arrowSchema->field(0)->type()); + ASSERT_EQ(tsType->unit(), ::arrow::TimeUnit::MICRO); + }))); + + const auto data = makeRowVector({makeFlatVector( + 10'000, [](auto row) { return Timestamp(row, row); })}); + parquet::WriterOptions writerOptions; + writerOptions.memoryPool = leafPool_.get(); + writerOptions.parquetWriteTimestampUnit = + static_cast(TimestampUnit::kMicro); + + // Create an in-memory writer. + auto sink = std::make_unique( + 200 * 1024 * 1024, + dwio::common::FileSink::Options{.pool = leafPool_.get()}); + auto writer = std::make_unique( + std::move(sink), writerOptions, rootPool_, ROW({"c0"}, {TIMESTAMP()})); + writer->write(data); + writer->close(); +}; + +#ifdef VELOX_ENABLE_PARQUET +DEBUG_ONLY_TEST_F(ParquetWriterTest, unitFromHiveConfig) { + SCOPED_TESTVALUE_SET( + "facebook::velox::parquet::Writer::write", + std::function( + ([&](const ::arrow::Schema* arrowSchema) { + const auto tsType = + std::dynamic_pointer_cast<::arrow::TimestampType>( + arrowSchema->field(0)->type()); + ASSERT_EQ(tsType->unit(), ::arrow::TimeUnit::MICRO); + }))); + + const auto data = makeRowVector({makeFlatVector( + 10'000, [](auto row) { return Timestamp(row, row); })}); + const auto outputDirectory = TempDirectoryPath::create(); + const auto plan = + PlanBuilder() + .values({data}) + .tableWrite(outputDirectory->path, dwio::common::FileFormat::PARQUET) + .planNode(); + + CursorParameters params; + std::shared_ptr executor = + std::make_shared( + std::thread::hardware_concurrency()); + std::shared_ptr queryCtx = + std::make_shared(executor.get()); + std::unordered_map session = { + {std::string( + connector::hive::HiveConfig::kParquetWriteTimestampUnitSession), + "6" /*kMicro*/}}; + queryCtx->setConnectorSessionOverridesUnsafe( + kHiveConnectorId, std::move(session)); + params.queryCtx = queryCtx; + params.planNode = plan; + + auto addSplits = [&](exec::Task* task) {}; + auto result = readCursor(params, addSplits); + ASSERT_TRUE(waitForTaskCompletion(result.first->task().get())); +} +#endif + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + folly::Init init{&argc, &argv, false}; + return RUN_ALL_TESTS(); +} diff --git a/velox/dwio/parquet/writer/Writer.cpp b/velox/dwio/parquet/writer/Writer.cpp index b6479189251d0..49c0a84f3def6 100644 --- a/velox/dwio/parquet/writer/Writer.cpp +++ b/velox/dwio/parquet/writer/Writer.cpp @@ -18,10 +18,10 @@ #include #include #include +#include "velox/common/testutil/TestValue.h" #include "velox/dwio/parquet/writer/arrow/Properties.h" #include "velox/dwio/parquet/writer/arrow/Writer.h" #include "velox/exec/MemoryReclaimer.h" -#include "velox/vector/arrow/Bridge.h" namespace facebook::velox::parquet { @@ -234,6 +234,8 @@ Writer::Writer( } else { flushPolicy_ = std::make_unique(); } + options_.timestampUnit = + static_cast(options.parquetWriteTimestampUnit); arrowContext_->properties = getArrowParquetWriterOptions(options, flushPolicy_); setMemoryReclaimers(); @@ -310,15 +312,16 @@ void Writer::write(const VectorPtr& data) { data->type()->equivalent(*schema_), "The file schema type should be equal with the input rowvector type."); - ArrowOptions options{.flattenDictionary = true, .flattenConstant = true}; ArrowArray array; ArrowSchema schema; - exportToArrow(data, array, generalPool_.get(), options); - exportToArrow(data, schema, options); + exportToArrow(data, array, generalPool_.get(), options_); + exportToArrow(data, schema, options_); // Convert the arrow schema to Schema and then update the column names based // on schema_. auto arrowSchema = ::arrow::ImportSchema(&schema).ValueOrDie(); + common::testutil::TestValue::adjust( + "facebook::velox::parquet::Writer::write", arrowSchema.get()); std::vector> newFields; auto childSize = schema_->size(); for (auto i = 0; i < childSize; i++) { @@ -386,6 +389,10 @@ parquet::WriterOptions getParquetOptions( if (options.compressionKind.has_value()) { parquetOptions.compression = options.compressionKind.value(); } + if (options.parquetWriteTimestampUnit.has_value()) { + parquetOptions.parquetWriteTimestampUnit = + options.parquetWriteTimestampUnit.value(); + } return parquetOptions; } diff --git a/velox/dwio/parquet/writer/Writer.h b/velox/dwio/parquet/writer/Writer.h index 6065366bce6b5..7f1886708a2bd 100644 --- a/velox/dwio/parquet/writer/Writer.h +++ b/velox/dwio/parquet/writer/Writer.h @@ -26,6 +26,7 @@ #include "velox/dwio/parquet/writer/arrow/Types.h" #include "velox/dwio/parquet/writer/arrow/util/Compression.h" #include "velox/vector/ComplexVector.h" +#include "velox/vector/arrow/Bridge.h" namespace facebook::velox::parquet { @@ -102,6 +103,8 @@ struct WriterOptions { std::shared_ptr codecOptions; std::unordered_map columnCompressionsMap; + uint8_t parquetWriteTimestampUnit = + static_cast(TimestampUnit::kNano); }; // Writes Velox vectors into a DataSink using Arrow Parquet writer. @@ -158,6 +161,8 @@ class Writer : public dwio::common::Writer { std::unique_ptr flushPolicy_; const RowTypePtr schema_; + + ArrowOptions options_{.flattenDictionary = true, .flattenConstant = true}; }; class ParquetWriterFactory : public dwio::common::WriterFactory { diff --git a/velox/exec/tests/ArrowStreamTest.cpp b/velox/exec/tests/ArrowStreamTest.cpp index f0fe9b37e04e0..6846c5d7f09c0 100644 --- a/velox/exec/tests/ArrowStreamTest.cpp +++ b/velox/exec/tests/ArrowStreamTest.cpp @@ -45,7 +45,7 @@ class ArrowStreamTest : public OperatorTestBase { int getNext(struct ArrowArray* outArray) { if (vectorIndex_ < vectors_.size()) { - exportToArrow(vectors_[vectorIndex_], *outArray, pool_.get()); + exportToArrow(vectors_[vectorIndex_], *outArray, pool_.get(), options_); vectorIndex_ += 1; } else { // End of stream. Mark the array released. @@ -56,12 +56,13 @@ class ArrowStreamTest : public OperatorTestBase { } int getArrowSchema(ArrowSchema& out) { - exportToArrow(BaseVector::create(type_, 0, pool_.get()), out); + exportToArrow(BaseVector::create(type_, 0, pool_.get()), out, options_); return failGetSchema_ ? (int)ErrorCode::kGetSchemaFailed : (int)ErrorCode::kNoError; } private: + ArrowOptions options_; const std::shared_ptr pool_; const std::vector& vectors_; const TypePtr type_; @@ -184,7 +185,11 @@ TEST_F(ArrowStreamTest, basic) { return StringView::makeInline( std::to_string(100000000000 + (uint64_t)(row % 100))); }, - nullEvery(7))})); + nullEvery(7)), + makeFlatVector( + size, + [&](vector_size_t row) { return Timestamp(row, row * 1000); }, + nullEvery(5))})); } createDuckDbTable(vectors); auto type = asRowType(vectors[0]->type()); diff --git a/velox/vector/arrow/Bridge.cpp b/velox/vector/arrow/Bridge.cpp index c90ca6640dd68..04b18dc976d06 100644 --- a/velox/vector/arrow/Bridge.cpp +++ b/velox/vector/arrow/Bridge.cpp @@ -215,6 +215,7 @@ static void releaseArrowSchema(ArrowSchema* arrowSchema) { // Returns the Arrow C data interface format type for a given Velox type. const char* exportArrowFormatStr( const TypePtr& type, + const ArrowOptions& options, std::string& formatBuffer) { if (type->isDecimal()) { // Decimal types encode the precision, scale values. @@ -253,9 +254,19 @@ const char* exportArrowFormatStr( return "z"; // binary case TypeKind::UNKNOWN: return "n"; // NullType - case TypeKind::TIMESTAMP: - return "ttn"; // time64 [nanoseconds] + switch (options.timestampUnit) { + case TimestampUnit::kSecond: + return "tss:"; + case TimestampUnit::kMilli: + return "tsm:"; + case TimestampUnit::kMicro: + return "tsu:"; + case TimestampUnit::kNano: + return "tsn:"; + default: + VELOX_UNREACHABLE(); + } // Complex/nested types. case TypeKind::ARRAY: static_assert(sizeof(vector_size_t) == 4); @@ -334,10 +345,72 @@ struct Selection { vector_size_t total_; }; +// Gather values from timestamp buffer. Nulls are skipped. +void gatherFromTimestampBuffer( + const BaseVector& vec, + const Selection& rows, + TimestampUnit unit, + Buffer& out) { + auto src = (*vec.values()).as(); + auto dst = out.asMutable(); + if (!vec.mayHaveNulls() || vec.getNullCount() == 0) { + switch (unit) { + case TimestampUnit::kSecond: + rows.apply([&](vector_size_t i) { dst[i] = src[i].getSeconds(); }); + break; + case TimestampUnit::kMilli: + rows.apply([&](vector_size_t i) { dst[i] = src[i].toMillis(); }); + break; + case TimestampUnit::kMicro: + rows.apply([&](vector_size_t i) { dst[i] = src[i].toMicros(); }); + break; + case TimestampUnit::kNano: + rows.apply([&](vector_size_t i) { dst[i] = src[i].toNanos(); }); + break; + default: + VELOX_UNREACHABLE(); + } + return; + } + switch (unit) { + case TimestampUnit::kSecond: + rows.apply([&](vector_size_t i) { + if (!vec.isNullAt(i)) { + dst[i] = src[i].getSeconds(); + } + }); + break; + case TimestampUnit::kMilli: + rows.apply([&](vector_size_t i) { + if (!vec.isNullAt(i)) { + dst[i] = src[i].toMillis(); + } + }); + break; + case TimestampUnit::kMicro: + rows.apply([&](vector_size_t i) { + if (!vec.isNullAt(i)) { + dst[i] = src[i].toMicros(); + }; + }); + break; + case TimestampUnit::kNano: + rows.apply([&](vector_size_t i) { + if (!vec.isNullAt(i)) { + dst[i] = src[i].toNanos(); + } + }); + break; + default: + VELOX_UNREACHABLE(); + } +} + void gatherFromBuffer( const Type& type, const Buffer& buf, const Selection& rows, + const ArrowOptions& options, Buffer& out) { auto src = buf.as(); auto dst = out.asMutable(); @@ -352,11 +425,6 @@ void gatherFromBuffer( int128_t value = buf.as()[i]; memcpy(dst + (j++) * sizeof(int128_t), &value, sizeof(int128_t)); }); - } else if (type.isTimestamp()) { - auto srcTs = buf.as(); - auto dstTs = out.asMutable(); - - rows.apply([&](vector_size_t i) { dstTs[j++] = srcTs[i].toNanos(); }); } else { auto typeSize = type.cppSizeInBytes(); rows.apply([&](vector_size_t i) { @@ -387,8 +455,8 @@ struct BufferViewReleaser { }; // Wraps a naked pointer using a Velox buffer view, without copying it. Adding a -// dummy releaser as the buffer lifetime is fully controled by the client of the -// API. +// dummy releaser as the buffer lifetime is fully controlled by the client of +// the API. BufferPtr wrapInBufferViewAsViewer(const void* buffer, size_t length) { static const BufferViewReleaser kViewerReleaser; return BufferView::create( @@ -475,11 +543,12 @@ VectorPtr createStringFlatVector( optionalNullCount(nullCount)); } -// This functions does two things: (a) sets the value of null_count, and (b) the -// validity buffer (if there is at least one null row). +// This functions does two things: (a) sets the value of null_count, and (b) +// the validity buffer (if there is at least one null row). void exportValidityBitmap( const BaseVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, VeloxToArrowBridgeHolder& holder) { @@ -493,7 +562,7 @@ void exportValidityBitmap( // If we're only exporting a subset, create a new validity buffer. if (rows.changed()) { nulls = AlignedBuffer::allocate(out.length, pool); - gatherFromBuffer(*BOOLEAN(), *vec.nulls(), rows, *nulls); + gatherFromBuffer(*BOOLEAN(), *vec.nulls(), rows, options, *nulls); } // Set null counts. @@ -509,8 +578,8 @@ void exportValidityBitmap( } bool isFlatScalarZeroCopy(const TypePtr& type) { - // - Short decimals need to be converted to 128 bit values as they are mapped - // to Arrow Decimal128. + // - Short decimals need to be converted to 128 bit values as they are + // mapped to Arrow Decimal128. // - Velox's Timestamp representation (2x 64bit values) does not have an // equivalent in Arrow. return !type->isShortDecimal() && !type->isTimestamp(); @@ -531,6 +600,7 @@ size_t getArrowElementSize(const TypePtr& type) { void exportValues( const BaseVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, VeloxToArrowBridgeHolder& holder) { @@ -548,7 +618,11 @@ void exportValues( ? AlignedBuffer::allocate(out.length, pool) : AlignedBuffer::allocate( checkedMultiply(out.length, size), pool); - gatherFromBuffer(*type, *vec.values(), rows, *values); + if (type->kind() == TypeKind::TIMESTAMP) { + gatherFromTimestampBuffer(vec, rows, options.timestampUnit, *values); + } else { + gatherFromBuffer(*type, *vec.values(), rows, options, *values); + } holder.setBuffer(1, values); } @@ -588,6 +662,7 @@ void exportStrings( void exportFlat( const BaseVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, VeloxToArrowBridgeHolder& holder) { @@ -604,7 +679,7 @@ void exportFlat( case TypeKind::DOUBLE: case TypeKind::TIMESTAMP: case TypeKind::UNKNOWN: - exportValues(vec, rows, out, pool, holder); + exportValues(vec, rows, options, out, pool, holder); break; case TypeKind::VARCHAR: case TypeKind::VARBINARY: @@ -621,17 +696,17 @@ void exportFlat( void exportToArrowImpl( const BaseVector&, const Selection&, + const ArrowOptions& options, ArrowArray&, - memory::MemoryPool*, - const ArrowOptions& options); + memory::MemoryPool*); void exportRows( const RowVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, - VeloxToArrowBridgeHolder& holder, - const ArrowOptions& options) { + VeloxToArrowBridgeHolder& holder) { out.n_buffers = 1; holder.resizeChildren(vec.childrenSize()); out.n_children = vec.childrenSize(); @@ -641,9 +716,9 @@ void exportRows( exportToArrowImpl( *vec.childAt(i)->loadedVector(), rows, + options, *holder.allocateChild(i), - pool, - options); + pool); } catch (const VeloxException&) { for (column_index_t j = 0; j < i; ++j) { // When exception is thrown, i th child is guaranteed unset. @@ -702,19 +777,19 @@ void exportOffsets( void exportArrays( const ArrayVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, - VeloxToArrowBridgeHolder& holder, - const ArrowOptions& options) { + VeloxToArrowBridgeHolder& holder) { Selection childRows(vec.elements()->size()); exportOffsets(vec, rows, out, pool, holder, childRows); holder.resizeChildren(1); exportToArrowImpl( *vec.elements()->loadedVector(), childRows, + options, *holder.allocateChild(0), - pool, - options); + pool); out.n_children = 1; out.children = holder.getChildrenArrays(); } @@ -722,10 +797,10 @@ void exportArrays( void exportMaps( const MapVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, - VeloxToArrowBridgeHolder& holder, - const ArrowOptions& options) { + VeloxToArrowBridgeHolder& holder) { RowVector child( pool, ROW({"key", "value"}, {vec.mapKeys()->type(), vec.mapValues()->type()}), @@ -735,7 +810,7 @@ void exportMaps( Selection childRows(child.size()); exportOffsets(vec, rows, out, pool, holder, childRows); holder.resizeChildren(1); - exportToArrowImpl(child, childRows, *holder.allocateChild(0), pool, options); + exportToArrowImpl(child, childRows, options, *holder.allocateChild(0), pool); out.n_children = 1; out.children = holder.getChildrenArrays(); } @@ -744,6 +819,7 @@ template void flattenAndExport( const BaseVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, VeloxToArrowBridgeHolder& holder) { @@ -761,19 +837,20 @@ void flattenAndExport( flatVector->set(row, decoded.valueAt(row)); } }); - exportValidityBitmap(*flatVector, rows, out, pool, holder); - exportFlat(*flatVector, rows, out, pool, holder); + exportValidityBitmap(*flatVector, rows, options, out, pool, holder); + exportFlat(*flatVector, rows, options, out, pool, holder); } else { allRows.applyToSelected([&](vector_size_t row) { flatVector->set(row, decoded.valueAt(row)); }); - exportFlat(*flatVector, rows, out, pool, holder); + exportFlat(*flatVector, rows, options, out, pool, holder); } } void exportDictionary( const BaseVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, VeloxToArrowBridgeHolder& holder) { @@ -781,7 +858,7 @@ void exportDictionary( out.n_children = 0; if (rows.changed()) { auto indices = AlignedBuffer::allocate(out.length, pool); - gatherFromBuffer(*INTEGER(), *vec.wrapInfo(), rows, *indices); + gatherFromBuffer(*INTEGER(), *vec.wrapInfo(), rows, options, *indices); holder.setBuffer(1, indices); } else { holder.setBuffer(1, vec.wrapInfo()); @@ -789,12 +866,13 @@ void exportDictionary( auto& values = *vec.valueVector()->loadedVector(); out.dictionary = holder.allocateDictionary(); exportToArrowImpl( - values, Selection(values.size()), *out.dictionary, pool, ArrowOptions()); + values, Selection(values.size()), options, *out.dictionary, pool); } void exportFlattenedVector( const BaseVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, VeloxToArrowBridgeHolder& holder) { @@ -803,11 +881,12 @@ void exportFlattenedVector( "An unsupported nested encoding was found."); VELOX_CHECK(vec.isScalar(), "Flattening is only supported for scalar types."); VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH( - flattenAndExport, vec.typeKind(), vec, rows, out, pool, holder); + flattenAndExport, vec.typeKind(), vec, rows, options, out, pool, holder); } void exportConstantValue( const BaseVector& vec, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool) { VectorPtr valuesVector; @@ -837,7 +916,7 @@ void exportConstantValue( wrapInBufferViewAsViewer(vec.valuesAsVoid(), bufferSize), vec.mayHaveNulls() ? 1 : 0); } - exportToArrowImpl(*valuesVector, selection, out, pool, ArrowOptions()); + exportToArrowImpl(*valuesVector, selection, options, out, pool); } // Velox constant vectors are exported as Arrow REE containing a single run @@ -845,6 +924,7 @@ void exportConstantValue( void exportConstant( const BaseVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, VeloxToArrowBridgeHolder& holder) { @@ -856,7 +936,7 @@ void exportConstant( out.n_children = 2; holder.resizeChildren(2); out.children = holder.getChildrenArrays(); - exportConstantValue(vec, *holder.allocateChild(1), pool); + exportConstantValue(vec, options, *holder.allocateChild(1), pool); // Create the run ends child. auto* runEnds = holder.allocateChild(0); @@ -883,41 +963,41 @@ void exportConstant( void exportToArrowImpl( const BaseVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, - memory::MemoryPool* pool, - const ArrowOptions& options) { + memory::MemoryPool* pool) { auto holder = std::make_unique(); out.buffers = holder->getArrowBuffers(); out.length = rows.count(); out.offset = 0; out.dictionary = nullptr; - exportValidityBitmap(vec, rows, out, pool, *holder); + exportValidityBitmap(vec, rows, options, out, pool, *holder); switch (vec.encoding()) { case VectorEncoding::Simple::FLAT: - exportFlat(vec, rows, out, pool, *holder); + exportFlat(vec, rows, options, out, pool, *holder); break; case VectorEncoding::Simple::ROW: exportRows( - *vec.asUnchecked(), rows, out, pool, *holder, options); + *vec.asUnchecked(), rows, options, out, pool, *holder); break; case VectorEncoding::Simple::ARRAY: exportArrays( - *vec.asUnchecked(), rows, out, pool, *holder, options); + *vec.asUnchecked(), rows, options, out, pool, *holder); break; case VectorEncoding::Simple::MAP: exportMaps( - *vec.asUnchecked(), rows, out, pool, *holder, options); + *vec.asUnchecked(), rows, options, out, pool, *holder); break; case VectorEncoding::Simple::DICTIONARY: options.flattenDictionary - ? exportFlattenedVector(vec, rows, out, pool, *holder) - : exportDictionary(vec, rows, out, pool, *holder); + ? exportFlattenedVector(vec, rows, options, out, pool, *holder) + : exportDictionary(vec, rows, options, out, pool, *holder); break; case VectorEncoding::Simple::CONSTANT: options.flattenConstant - ? exportFlattenedVector(vec, rows, out, pool, *holder) - : exportConstant(vec, rows, out, pool, *holder); + ? exportFlattenedVector(vec, rows, options, out, pool, *holder) + : exportConstant(vec, rows, options, out, pool, *holder); break; default: VELOX_NYI("{} cannot be exported to Arrow yet.", vec.encoding()); @@ -960,8 +1040,7 @@ TypePtr importFromArrowImpl( return VARBINARY(); case 't': // temporal types. - // Mapping it to ttn for now. - if (format[1] == 't' && format[2] == 'n') { + if (format[1] == 's') { return TIMESTAMP(); } if (format[1] == 'd' && format[2] == 'D') { @@ -1056,7 +1135,7 @@ void exportToArrow( memory::MemoryPool* pool, const ArrowOptions& options) { exportToArrowImpl( - *vector, Selection(vector->size()), arrowArray, pool, options); + *vector, Selection(vector->size()), options, arrowArray, pool); } void exportToArrow( @@ -1083,12 +1162,12 @@ void exportToArrow( // Dictionary data is flattened. Set the underlying data types. arrowSchema.dictionary = nullptr; arrowSchema.format = - exportArrowFormatStr(type, bridgeHolder->formatBuffer); + exportArrowFormatStr(type, options, bridgeHolder->formatBuffer); } else { arrowSchema.format = "i"; bridgeHolder->dictionary = std::make_unique(); arrowSchema.dictionary = bridgeHolder->dictionary.get(); - exportToArrow(vec->valueVector(), *arrowSchema.dictionary); + exportToArrow(vec->valueVector(), *arrowSchema.dictionary, options); } } else if ( vec->encoding() == VectorEncoding::Simple::CONSTANT && @@ -1107,7 +1186,7 @@ void exportToArrow( exportToArrow(valueVector, *valuesChild, options); } else { valuesChild->format = - exportArrowFormatStr(type, bridgeHolder->formatBuffer); + exportArrowFormatStr(type, options, bridgeHolder->formatBuffer); } valuesChild->name = "values"; @@ -1115,7 +1194,8 @@ void exportToArrow( 0, newArrowSchema("i", "run_ends"), arrowSchema); bridgeHolder->setChildAtIndex(1, std::move(valuesChild), arrowSchema); } else { - arrowSchema.format = exportArrowFormatStr(type, bridgeHolder->formatBuffer); + arrowSchema.format = + exportArrowFormatStr(type, options, bridgeHolder->formatBuffer); arrowSchema.dictionary = nullptr; if (type->kind() == TypeKind::MAP) { @@ -1209,6 +1289,30 @@ TypePtr importFromArrow(const ArrowSchema& arrowSchema) { namespace { +TimestampUnit getTimestampUnit(const ArrowSchema& arrowSchema) { + const char* format = arrowSchema.dictionary ? arrowSchema.dictionary->format + : arrowSchema.format; + VELOX_USER_CHECK_NOT_NULL(format); + VELOX_USER_CHECK_GE( + strlen(format), + 3, + "The arrow format string of timestamp should contain 'ts' and unit char."); + VELOX_USER_CHECK_EQ(format[0], 't', "The first character should be 't'."); + VELOX_USER_CHECK_EQ(format[1], 's', "The second character should be 's'."); + switch (format[2]) { + case 's': + return TimestampUnit::kSecond; + case 'm': + return TimestampUnit::kMilli; + case 'u': + return TimestampUnit::kMicro; + case 'n': + return TimestampUnit::kNano; + default: + VELOX_UNREACHABLE(); + } +} + VectorPtr importFromArrowImpl( ArrowSchema& arrowSchema, ArrowArray& arrowArray, @@ -1393,9 +1497,93 @@ VectorPtr createVectorFromReeArray( } } +// Set valid timestamp values according to the input and timestamp unit. +void setTimestamps( + const int64_t* input, + int64_t length, + TimestampUnit unit, + Timestamp* rawTimestamps) { + switch (unit) { + case TimestampUnit::kSecond: { + for (int64_t i = 0; i < length; ++i) { + rawTimestamps[i] = Timestamp(input[i], 0); + } + break; + } + case TimestampUnit::kMilli: { + for (int64_t i = 0; i < length; ++i) { + rawTimestamps[i] = Timestamp::fromMillis(input[i]); + } + break; + } + case TimestampUnit::kMicro: { + for (int64_t i = 0; i < length; ++i) { + rawTimestamps[i] = Timestamp::fromMicros(input[i]); + } + break; + } + case TimestampUnit::kNano: { + for (int64_t i = 0; i < length; ++i) { + rawTimestamps[i] = Timestamp::fromNanos(input[i]); + } + break; + } + default: + VELOX_UNREACHABLE(); + } +} + +// Set valid timestamp values according to the input and timestamp unit. Nulls +// are skipped. +void setTimestamps( + const int64_t* input, + BufferPtr nulls, + int64_t length, + TimestampUnit unit, + Timestamp* rawTimestamps) { + const auto* rawNulls = nulls->as(); + switch (unit) { + case TimestampUnit::kSecond: { + for (size_t i = 0; i < length; ++i) { + if (!bits::isBitNull(rawNulls, i)) { + rawTimestamps[i] = Timestamp(input[i], 0); + } + } + break; + } + case TimestampUnit::kMilli: { + for (size_t i = 0; i < length; ++i) { + if (!bits::isBitNull(rawNulls, i)) { + rawTimestamps[i] = Timestamp::fromMillis(input[i]); + } + } + break; + } + case TimestampUnit::kMicro: { + for (size_t i = 0; i < length; ++i) { + if (!bits::isBitNull(rawNulls, i)) { + rawTimestamps[i] = Timestamp::fromMicros(input[i]); + } + } + break; + } + case TimestampUnit::kNano: { + for (size_t i = 0; i < length; ++i) { + if (!bits::isBitNull(rawNulls, i)) { + rawTimestamps[i] = Timestamp::fromNanos(input[i]); + } + } + break; + } + default: + VELOX_UNREACHABLE(); + } +} + VectorPtr createTimestampVector( memory::MemoryPool* pool, const TypePtr& type, + TimestampUnit unit, BufferPtr nulls, const int64_t* input, size_t length, @@ -1403,16 +1591,9 @@ VectorPtr createTimestampVector( BufferPtr timestamps = AlignedBuffer::allocate(length, pool); auto* rawTimestamps = timestamps->asMutable(); if (nulls == nullptr) { - for (size_t i = 0; i < length; ++i) { - rawTimestamps[i] = Timestamp::fromNanos(input[i]); - } + setTimestamps(input, length, unit, rawTimestamps); } else if (length > nullCount) { - const auto* rawNulls = nulls->as(); - for (size_t i = 0; i < length; ++i) { - if (!bits::isBitNull(rawNulls, i)) { - rawTimestamps[i] = Timestamp::fromNanos(input[i]); - } - } + setTimestamps(input, nulls, length, unit, rawTimestamps); } return std::make_shared>( pool, @@ -1500,6 +1681,7 @@ VectorPtr importFromArrowImpl( return createTimestampVector( pool, type, + getTimestampUnit(arrowSchema), nulls, static_cast(arrowArray.buffers[1]), arrowArray.length, diff --git a/velox/vector/arrow/Bridge.h b/velox/vector/arrow/Bridge.h index ba30068f8957c..7d93a809881f3 100644 --- a/velox/vector/arrow/Bridge.h +++ b/velox/vector/arrow/Bridge.h @@ -25,9 +25,17 @@ struct ArrowArray; struct ArrowSchema; +enum class TimestampUnit : uint8_t { + kSecond = 0 /*10^0 second is equal to 1 second*/, + kMilli = 3 /*10^3 milliseconds are equal to 1 second*/, + kMicro = 6 /*10^6 microseconds are equal to 1 second*/, + kNano = 9 /*10^9 nanoseconds are equal to 1 second*/ +}; + struct ArrowOptions { bool flattenDictionary{false}; bool flattenConstant{false}; + TimestampUnit timestampUnit = TimestampUnit::kNano; }; namespace facebook::velox { diff --git a/velox/vector/arrow/tests/ArrowBridgeArrayTest.cpp b/velox/vector/arrow/tests/ArrowBridgeArrayTest.cpp index 04c33ec68cfc5..6fb8af3fb3392 100644 --- a/velox/vector/arrow/tests/ArrowBridgeArrayTest.cpp +++ b/velox/vector/arrow/tests/ArrowBridgeArrayTest.cpp @@ -54,7 +54,7 @@ class ArrowBridgeArrayExportTest : public testing::Test { const TypePtr& type = CppToType::create()) { auto flatVector = vectorMaker_.flatVectorNullable(inputData, type); ArrowArray arrowArray; - velox::exportToArrow(flatVector, arrowArray, pool_.get()); + velox::exportToArrow(flatVector, arrowArray, pool_.get(), options_); validateArray(inputData, arrowArray); @@ -67,7 +67,7 @@ class ArrowBridgeArrayExportTest : public testing::Test { void testArrayVector(const T& inputData) { auto arrayVector = vectorMaker_.arrayVectorNullable(inputData); ArrowArray arrowArray; - velox::exportToArrow(arrayVector, arrowArray, pool_.get()); + velox::exportToArrow(arrayVector, arrowArray, pool_.get(), options_); validateListArray(inputData, arrowArray); @@ -93,7 +93,7 @@ class ArrowBridgeArrayExportTest : public testing::Test { const VectorPtr& constantVector, const TInput& input) { ArrowArray arrowArray; - velox::exportToArrow(constantVector, arrowArray, pool_.get()); + velox::exportToArrow(constantVector, arrowArray, pool_.get(), options_); validateConstant( input, constantVector->size(), @@ -161,7 +161,8 @@ class ArrowBridgeArrayExportTest : public testing::Test { bits::isBitSet(reinterpret_cast(values), i)) << "mismatch at index " << i; } else if constexpr (std::is_same_v) { - EXPECT_EQ(inputData[i], Timestamp::fromNanos(values[i])) + EXPECT_TRUE(validateTimestamp( + inputData[i].value(), options_.timestampUnit, values[i])) << "mismatch at index " << i; } else { EXPECT_EQ(inputData[i], values[i]) << "mismatch at index " << i; @@ -353,15 +354,42 @@ class ArrowBridgeArrayExportTest : public testing::Test { } void exportToArrow(const TypePtr& type, ArrowSchema& out) { - velox::exportToArrow(BaseVector::create(type, 0, pool_.get()), out); + velox::exportToArrow( + BaseVector::create(type, 0, pool_.get()), out, options_); } // Boiler plate structures required by vectorMaker. + ArrowOptions options_; std::shared_ptr queryCtx_{std::make_shared()}; std::shared_ptr pool_{ memory::memoryManager()->addLeafPool()}; core::ExecCtx execCtx_{pool_.get(), queryCtx_.get()}; facebook::velox::test::VectorMaker vectorMaker_{execCtx_.pool()}; + + private: + // Converts timestamp as bigint according to unit, and compares it with the + // actual value. + bool + validateTimestamp(Timestamp ts, TimestampUnit unit, int64_t actualValue) { + int64_t expectedValue; + switch (unit) { + case TimestampUnit::kSecond: + expectedValue = ts.getSeconds(); + break; + case TimestampUnit::kMilli: + expectedValue = ts.toMillis(); + break; + case TimestampUnit::kMicro: + expectedValue = ts.toMicros(); + break; + case TimestampUnit::kNano: + expectedValue = ts.toNanos(); + break; + default: + VELOX_UNREACHABLE(); + } + return expectedValue == actualValue; + } }; TEST_F(ArrowBridgeArrayExportTest, flatNotNull) { @@ -371,7 +399,7 @@ TEST_F(ArrowBridgeArrayExportTest, flatNotNull) { // Make sure that ArrowArray is correctly acquiring ownership, even after // the initial vector shared_ptr is gone. auto flatVector = vectorMaker_.flatVector(inputData); - velox::exportToArrow(flatVector, arrowArray, pool_.get()); + velox::exportToArrow(flatVector, arrowArray, pool_.get(), options_); } EXPECT_EQ(inputData.size(), arrowArray.length); @@ -494,16 +522,23 @@ TEST_F(ArrowBridgeArrayExportTest, flatDate) { } TEST_F(ArrowBridgeArrayExportTest, flatTimestamp) { - testFlatVector( - { - Timestamp(0, 0), - std::nullopt, - Timestamp(1699300965, 12'349), - Timestamp(-2208960000, 0), // 1900-01-01 - Timestamp(3155788800, 999'999'999), - std::nullopt, - }, - TIMESTAMP()); + for (TimestampUnit unit : + {TimestampUnit::kSecond, + TimestampUnit::kMilli, + TimestampUnit::kMicro, + TimestampUnit::kNano}) { + options_.timestampUnit = unit; + testFlatVector( + { + Timestamp(0, 0), + std::nullopt, + Timestamp(1699300965, 12'349), + Timestamp(-2208960000, 0), // 1900-01-01 + Timestamp(3155788800, 999'999'999), + std::nullopt, + }, + TIMESTAMP()); + } // Out of range. If nanosecond precision is represented in Arrow, timestamps // starting around 2263-01-01 should overflow and throw a user exception. @@ -542,7 +577,7 @@ TEST_F(ArrowBridgeArrayExportTest, rowVector) { }); ArrowArray arrowArray; - velox::exportToArrow(vector, arrowArray, pool_.get()); + velox::exportToArrow(vector, arrowArray, pool_.get(), options_); EXPECT_EQ(col1.size(), arrowArray.length); EXPECT_EQ(0, arrowArray.null_count); @@ -579,7 +614,7 @@ TEST_F(ArrowBridgeArrayExportTest, rowVectorNullable) { vector->setNullCount(3); ArrowArray arrowArray; - velox::exportToArrow(vector, arrowArray, pool_.get()); + velox::exportToArrow(vector, arrowArray, pool_.get(), options_); EXPECT_EQ(col1.size(), arrowArray.length); EXPECT_EQ(3, arrowArray.null_count); @@ -607,7 +642,8 @@ TEST_F(ArrowBridgeArrayExportTest, rowVectorNullable) { TEST_F(ArrowBridgeArrayExportTest, rowVectorEmpty) { ArrowArray arrowArray; - velox::exportToArrow(vectorMaker_.rowVector({}), arrowArray, pool_.get()); + velox::exportToArrow( + vectorMaker_.rowVector({}), arrowArray, pool_.get(), options_); EXPECT_EQ(0, arrowArray.n_children); EXPECT_EQ(1, arrowArray.n_buffers); EXPECT_EQ(nullptr, arrowArray.children); @@ -617,11 +653,12 @@ TEST_F(ArrowBridgeArrayExportTest, rowVectorEmpty) { std::shared_ptr toArrow( const VectorPtr& vec, + const ArrowOptions& options, memory::MemoryPool* pool) { ArrowSchema schema; ArrowArray array; - exportToArrow(vec, schema); - exportToArrow(vec, array, pool); + exportToArrow(vec, schema, options); + exportToArrow(vec, array, pool, options); EXPECT_OK_AND_ASSIGN(auto type, arrow::ImportType(&schema)); EXPECT_OK_AND_ASSIGN(auto ans, arrow::ImportArray(&array, type)); return ans; @@ -655,7 +692,7 @@ TEST_F(ArrowBridgeArrayExportTest, arraySimple) { TEST_F(ArrowBridgeArrayExportTest, arrayCrossValidate) { auto vec = vectorMaker_.arrayVector({{1, 2, 3}, {4, 5}}); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); EXPECT_EQ(array->null_count(), 0); ASSERT_EQ(*array->type(), *arrow::list(arrow::int64())); @@ -682,8 +719,8 @@ TEST_F(ArrowBridgeArrayExportTest, arrayDictionary) { ArrowSchema schema; ArrowArray data; - velox::exportToArrow(vec, schema); - velox::exportToArrow(vec, data, vec->pool()); + velox::exportToArrow(vec, schema, options_); + velox::exportToArrow(vec, data, vec->pool(), options_); auto result = importFromArrowAsViewer(schema, data, vec->pool()); test::assertEqualVectors(result, vec); @@ -699,7 +736,7 @@ TEST_F(ArrowBridgeArrayExportTest, arrayGap) { auto sizes = makeBuffer({2, 2}); auto vec = std::make_shared( pool_.get(), ARRAY(BIGINT()), nullptr, 2, offsets, sizes, elements); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); EXPECT_EQ(array->null_count(), 0); ASSERT_EQ(*array->type(), *arrow::list(arrow::int64())); @@ -724,7 +761,7 @@ TEST_F(ArrowBridgeArrayExportTest, arrayReorder) { auto sizes = makeBuffer({2, 2}); auto vec = std::make_shared( pool_.get(), ARRAY(BIGINT()), nullptr, 2, offsets, sizes, elements); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); EXPECT_EQ(array->null_count(), 0); ASSERT_EQ(*array->type(), *arrow::list(arrow::int64())); @@ -754,7 +791,7 @@ TEST_F(ArrowBridgeArrayExportTest, arrayNested) { std::make_shared( pool_.get(), ARRAY(inner->type()), nullptr, 2, offsets, sizes, inner); }); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); ASSERT_EQ(*array->type(), *arrow::list(arrow::list(arrow::int64()))); auto& listArray = static_cast(*array); @@ -771,7 +808,7 @@ TEST_F(ArrowBridgeArrayExportTest, mapSimple) { auto allOnes = [](vector_size_t) { return 1; }; auto vec = vectorMaker_.mapVector(2, allOnes, allOnes, allOnes); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); EXPECT_EQ(array->null_count(), 0); ASSERT_EQ(*array->type(), *arrow::map(arrow::int64(), arrow::int64())); @@ -806,7 +843,7 @@ TEST_F(ArrowBridgeArrayExportTest, mapNested) { std::make_shared( pool_.get(), type, nullptr, 2, offsets, sizes, keys, inner); }); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); EXPECT_EQ(array->null_count(), 0); ASSERT_EQ( @@ -838,7 +875,7 @@ TEST_F(ArrowBridgeArrayExportTest, dictionarySimple) { allocateIndices(3, pool_.get()), 3, vectorMaker_.flatVector({1, 2, 3})); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); ASSERT_EQ(*array->type(), *arrow::dictionary(arrow::int32(), arrow::int64())); auto& dict = static_cast(*array); @@ -864,7 +901,7 @@ TEST_F(ArrowBridgeArrayExportTest, dictionaryNested) { std::make_shared( pool_.get(), ARRAY(inner->type()), nullptr, 2, offsets, sizes, inner); }); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); ASSERT_EQ( *array->type(), @@ -914,7 +951,7 @@ TEST_F(ArrowBridgeArrayExportTest, constantComplex) { TEST_F(ArrowBridgeArrayExportTest, constantCrossValidate) { auto vector = BaseVector::createConstant(VARCHAR(), "hello", 100, pool_.get()); - auto array = toArrow(vector, pool_.get()); + auto array = toArrow(vector, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); EXPECT_EQ(array->null_count(), 0); @@ -982,8 +1019,6 @@ class ArrowBridgeArrayImportTest : public ArrowBridgeArrayExportTest { bits::clearNull(rawNulls, i); if constexpr (std::is_same_v) { bits::setBit(rawValues, i, *inputValues[i]); - } else if constexpr (std::is_same_v) { - rawValues[i] = inputValues[i]->toNanos(); } else { rawValues[i] = *inputValues[i]; } @@ -1045,21 +1080,27 @@ class ArrowBridgeArrayImportTest : public ArrowBridgeArrayExportTest { // Takes a vector with input data, generates an input ArrowArray and Velox // Vector (using vector maker). Then converts ArrowArray into Velox vector and // assert that both Velox vectors are semantically the same. - template + template void testArrowImport( const char* format, - const std::vector>& inputValues) { + const std::vector>& inputValues) { ArrowContextHolder holder; auto arrowArray = fillArrowArray(inputValues, holder); auto arrowSchema = makeArrowSchema(format); auto output = importFromArrow(arrowSchema, arrowArray, pool_.get()); - assertVectorContent(inputValues, output, arrowArray.null_count); + if constexpr ( + std::is_same_v && std::is_same_v) { + assertTimestampVectorContent( + inputValues, output, arrowArray.null_count, format); + } else { + assertVectorContent(inputValues, output, arrowArray.null_count); + } // Buffer views are not reusable. Strings might need to create an additional // buffer, depending on the string sizes, in which case the buffers could be // reusable. So we don't check them in here. - if constexpr (!std::is_same_v) { + if constexpr (!std::is_same_v) { EXPECT_FALSE(BaseVector::isVectorWritable(output)); } else { size_t totalLength = 0; @@ -1131,8 +1172,10 @@ class ArrowBridgeArrayImportTest : public ArrowBridgeArrayExportTest { testArrowImport("g", {-99.9, 4.3, 31.1, 129.11, -12}); testArrowImport("f", {-99.9, 4.3, 31.1, 129.11, -12}); - testArrowImport( - "ttn", {Timestamp(0, 0), std::nullopt, Timestamp(1699308257, 1234)}); + for (const std::string& tsString : {"tss:", "tsm:", "tsu:", "tsn:"}) { + testArrowImport( + tsString.data(), {0, std::nullopt, Timestamp::kMaxSeconds}); + } } template @@ -1157,7 +1200,8 @@ class ArrowBridgeArrayImportTest : public ArrowBridgeArrayExportTest { auto output = importFromArrow(arrowSchema1, arrowArray1, pool_.get()); if constexpr ( std::is_same_v && std::is_same_v) { - assertTimestampVectorContent(inputValues, output, arrowArray1.null_count); + assertTimestampVectorContent( + inputValues, output, arrowArray1.null_count, format); } else { assertVectorContent(inputValues, output, arrowArray1.null_count); } @@ -1217,14 +1261,34 @@ class ArrowBridgeArrayImportTest : public ArrowBridgeArrayExportTest { void assertTimestampVectorContent( const std::vector>& expectedValues, const VectorPtr& actual, - size_t nullCount) { + size_t nullCount, + const char* format) { + VELOX_USER_CHECK_GE( + strlen(format), 3, "At least three characters are expected."); std::vector> tsValues; tsValues.reserve(expectedValues.size()); for (const auto& value : expectedValues) { - if (value.has_value()) { - tsValues.emplace_back(Timestamp::fromNanos(value.value())); - } else { + if (!value.has_value()) { tsValues.emplace_back(std::nullopt); + } else { + Timestamp ts; + switch (format[2]) { + case 's': + ts = Timestamp(value.value(), 0); + break; + case 'm': + ts = Timestamp::fromMillis(value.value()); + break; + case 'u': + ts = Timestamp::fromMicros(value.value()); + break; + case 'n': + ts = Timestamp::fromNanos(value.value()); + break; + default: + VELOX_UNREACHABLE(); + } + tsValues.emplace_back(ts); } } assertVectorContent(tsValues, actual, nullCount); @@ -1342,8 +1406,8 @@ class ArrowBridgeArrayImportTest : public ArrowBridgeArrayExportTest { ArrowSchema schema; ArrowArray data; - velox::exportToArrow(vec, schema); - velox::exportToArrow(vec, data, pool_.get()); + velox::exportToArrow(vec, schema, options_); + velox::exportToArrow(vec, data, pool_.get(), options_); ASSERT_OK_AND_ASSIGN(auto arrowType, arrow::ImportType(&schema)); ASSERT_OK_AND_ASSIGN(auto array2, arrow::ImportArray(&data, arrowType)); ASSERT_OK(array2->ValidateFull()); @@ -1582,6 +1646,7 @@ class ArrowBridgeArrayImportTest : public ArrowBridgeArrayExportTest { EXPECT_NO_THROW(importFromArrow(arrowSchema, arrowArray, pool_.get())); } + ArrowOptions options_; std::shared_ptr pool_{ memory::memoryManager()->addLeafPool()}; }; @@ -1607,7 +1672,7 @@ TEST_F(ArrowBridgeArrayImportAsViewerTest, scalar) { TEST_F(ArrowBridgeArrayImportAsViewerTest, without_nulls_buffer) { std::vector> inputValues = {1, 2, 3, 4, 5}; testImportWithoutNullsBuffer(inputValues, "l"); - testImportWithoutNullsBuffer(inputValues, "ttn"); + testImportWithoutNullsBuffer(inputValues, "tsn:"); } TEST_F(ArrowBridgeArrayImportAsViewerTest, string) { @@ -1660,7 +1725,7 @@ TEST_F(ArrowBridgeArrayImportAsOwnerTest, scalar) { TEST_F(ArrowBridgeArrayImportAsOwnerTest, without_nulls_buffer) { std::vector> inputValues = {1, 2, 3, 4, 5}; testImportWithoutNullsBuffer(inputValues, "l"); - testImportWithoutNullsBuffer(inputValues, "ttn"); + testImportWithoutNullsBuffer(inputValues, "tsn:"); } TEST_F(ArrowBridgeArrayImportAsOwnerTest, string) { diff --git a/velox/vector/arrow/tests/ArrowBridgeSchemaTest.cpp b/velox/vector/arrow/tests/ArrowBridgeSchemaTest.cpp index 997a37c4b2137..7b2c94d962d0d 100644 --- a/velox/vector/arrow/tests/ArrowBridgeSchemaTest.cpp +++ b/velox/vector/arrow/tests/ArrowBridgeSchemaTest.cpp @@ -120,7 +120,7 @@ class ArrowBridgeSchemaExportTest : public testing::Test { 3, // index to use for the constant BaseVector::create(type, 100, pool_.get())); - velox::exportToArrow(constantVector, arrowSchema); + velox::exportToArrow(constantVector, arrowSchema, options_); EXPECT_STREQ("+r", arrowSchema.format); EXPECT_EQ(nullptr, arrowSchema.name); @@ -155,7 +155,8 @@ class ArrowBridgeSchemaExportTest : public testing::Test { } void exportToArrow(const TypePtr& type, ArrowSchema& out) { - velox::exportToArrow(BaseVector::create(type, 0, pool_.get()), out); + velox::exportToArrow( + BaseVector::create(type, 0, pool_.get()), out, options_); } ArrowSchema makeArrowSchema(const char* format) { @@ -172,6 +173,7 @@ class ArrowBridgeSchemaExportTest : public testing::Test { }; } + ArrowOptions options_; std::shared_ptr pool_{ memory::memoryManager()->addLeafPool()}; }; @@ -190,7 +192,15 @@ TEST_F(ArrowBridgeSchemaExportTest, scalar) { testScalarType(VARCHAR(), "u"); testScalarType(VARBINARY(), "z"); - testScalarType(TIMESTAMP(), "ttn"); + options_.timestampUnit = TimestampUnit::kSecond; + testScalarType(TIMESTAMP(), "tss:"); + options_.timestampUnit = TimestampUnit::kMilli; + testScalarType(TIMESTAMP(), "tsm:"); + options_.timestampUnit = TimestampUnit::kMicro; + testScalarType(TIMESTAMP(), "tsu:"); + options_.timestampUnit = TimestampUnit::kNano; + testScalarType(TIMESTAMP(), "tsn:"); + testScalarType(DATE(), "tdD"); testScalarType(INTERVAL_YEAR_MONTH(), "tiM"); @@ -362,7 +372,7 @@ TEST_F(ArrowBridgeSchemaImportTest, scalar) { EXPECT_EQ(*VARBINARY(), *testSchemaImport("Z")); // Temporal. - EXPECT_EQ(*TIMESTAMP(), *testSchemaImport("ttn")); + EXPECT_EQ(*TIMESTAMP(), *testSchemaImport("tsn:")); EXPECT_EQ(*DATE(), *testSchemaImport("tdD")); EXPECT_EQ(*INTERVAL_YEAR_MONTH(), *testSchemaImport("tiM")); @@ -376,7 +386,7 @@ TEST_F(ArrowBridgeSchemaImportTest, scalar) { TEST_F(ArrowBridgeSchemaImportTest, complexTypes) { // Array. EXPECT_EQ(*ARRAY(BIGINT()), *testSchemaImportComplex("+l", {"l"})); - EXPECT_EQ(*ARRAY(TIMESTAMP()), *testSchemaImportComplex("+l", {"ttn"})); + EXPECT_EQ(*ARRAY(TIMESTAMP()), *testSchemaImportComplex("+l", {"tsn:"})); EXPECT_EQ(*ARRAY(DATE()), *testSchemaImportComplex("+l", {"tdD"})); EXPECT_EQ( *ARRAY(INTERVAL_YEAR_MONTH()), *testSchemaImportComplex("+l", {"tiM"})); @@ -442,9 +452,11 @@ class ArrowBridgeSchemaTest : public testing::Test { } void exportToArrow(const TypePtr& type, ArrowSchema& out) { - velox::exportToArrow(BaseVector::create(type, 0, pool_.get()), out); + velox::exportToArrow( + BaseVector::create(type, 0, pool_.get()), out, options_); } + ArrowOptions options_; std::shared_ptr pool_{ memory::memoryManager()->addLeafPool()}; }; @@ -521,7 +533,7 @@ TEST_F(ArrowBridgeSchemaImportTest, dictionaryTypeTest) { *testSchemaDictionaryImport( "i", makeComplexArrowSchema( - schemas, schemaPtrs, mapSchemas, mapSchemaPtrs, "+l", {"ttn"}))); + schemas, schemaPtrs, mapSchemas, mapSchemaPtrs, "+l", {"tsn:"}))); EXPECT_EQ( *ARRAY(DATE()), *testSchemaDictionaryImport(