From bb74060624adc8751e3917ce5585639cf52f9f79 Mon Sep 17 00:00:00 2001 From: rui-mo Date: Fri, 17 Nov 2023 10:44:32 +0800 Subject: [PATCH] Support timestamp units in arrow bridge --- velox/connectors/hive/HiveConfig.cpp | 11 + velox/connectors/hive/HiveConfig.h | 10 + velox/connectors/hive/HiveDataSink.cpp | 2 + velox/docs/configs.rst | 6 + velox/dwio/common/Options.h | 1 + .../tests/writer/ParquetWriterTest.cpp | 98 ++++++ velox/dwio/parquet/writer/Writer.cpp | 15 +- velox/dwio/parquet/writer/Writer.h | 5 + velox/exec/tests/ArrowStreamTest.cpp | 11 +- velox/vector/arrow/Bridge.cpp | 310 ++++++++++++++---- velox/vector/arrow/Bridge.h | 8 + .../arrow/tests/ArrowBridgeArrayTest.cpp | 163 ++++++--- .../arrow/tests/ArrowBridgeSchemaTest.cpp | 26 +- 13 files changed, 539 insertions(+), 127 deletions(-) diff --git a/velox/connectors/hive/HiveConfig.cpp b/velox/connectors/hive/HiveConfig.cpp index e6048f0d6aed..ae88c700f803 100644 --- a/velox/connectors/hive/HiveConfig.cpp +++ b/velox/connectors/hive/HiveConfig.cpp @@ -206,4 +206,15 @@ bool HiveConfig::s3UseProxyFromEnv() const { return config_->get(kS3UseProxyFromEnv, false); } +uint8_t HiveConfig::parquetWriteTimestampUnit(const Config* session) const { + const auto unit = session->get( + kParquetWriteTimestampUnitSession, + config_->get(kParquetWriteTimestampUnit, 9 /*nano*/)); + VELOX_CHECK( + unit == 0 /*second*/ || unit == 3 /*milli*/ || unit == 6 /*micro*/ || + unit == 9, + "Invalid timestamp unit."); + return unit; +} + } // namespace facebook::velox::connector::hive diff --git a/velox/connectors/hive/HiveConfig.h b/velox/connectors/hive/HiveConfig.h index e50b41157036..e1b0a3d2a19c 100644 --- a/velox/connectors/hive/HiveConfig.h +++ b/velox/connectors/hive/HiveConfig.h @@ -178,6 +178,12 @@ class HiveConfig { static constexpr const char* kS3UseProxyFromEnv = "hive.s3.use-proxy-from-env"; + /// Timestamp unit for Parquet write through Arrow bridge. + static constexpr const char* kParquetWriteTimestampUnit = + "hive.parquet.writer.timestamp-unit"; + static constexpr const char* kParquetWriteTimestampUnitSession = + "hive.parquet.writer.timestamp_unit"; + InsertExistingPartitionsBehavior insertExistingPartitionsBehavior( const Config* session) const; @@ -247,6 +253,10 @@ class HiveConfig { bool s3UseProxyFromEnv() const; + /// Returns the timestamp unit used when writing timestamps into Parquet + /// through Arrow bridge. 0: second, 3: milli, 6: micro, 9: nano. + uint8_t parquetWriteTimestampUnit(const Config* session) const; + HiveConfig(std::shared_ptr config) { VELOX_CHECK_NOT_NULL( config, "Config is null for HiveConfig initialization"); diff --git a/velox/connectors/hive/HiveDataSink.cpp b/velox/connectors/hive/HiveDataSink.cpp index 3e770d973ed1..89436ce81640 100644 --- a/velox/connectors/hive/HiveDataSink.cpp +++ b/velox/connectors/hive/HiveDataSink.cpp @@ -675,6 +675,8 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) { hiveConfig_->orcWriterMaxStripeSize(connectorSessionProperties)); options.maxDictionaryMemory = std::optional( hiveConfig_->orcWriterMaxDictionaryMemory(connectorSessionProperties)); + options.parquetWriteTimestampUnit = + hiveConfig_->parquetWriteTimestampUnit(connectorSessionProperties); options.serdeParameters = std::map( insertTableHandle_->serdeParameters().begin(), insertTableHandle_->serdeParameters().end()); diff --git a/velox/docs/configs.rst b/velox/docs/configs.rst index af66d2ea9f81..e336f2b02e9c 100644 --- a/velox/docs/configs.rst +++ b/velox/docs/configs.rst @@ -488,6 +488,12 @@ Each query can override the config by setting corresponding query session proper - string - 16M - Maximum dictionary memory that can be used in orc writer. + * - hive.parquet.writer.timestamp-unit + - hive.parquet.writer.timestamp_unit + - tinyint + - 9 + - Timestamp unit used when writing timestamps into Parquet through Arrow bridge. + Valid values are 0 (second), 3 (millisecond), 6 (microsecond), 9 (nanosecond). ``Amazon S3 Configuration`` ^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/velox/dwio/common/Options.h b/velox/dwio/common/Options.h index acb852a6b06f..86eb3fda6463 100644 --- a/velox/dwio/common/Options.h +++ b/velox/dwio/common/Options.h @@ -593,6 +593,7 @@ struct WriterOptions { std::optional maxStripeSize{std::nullopt}; std::optional maxDictionaryMemory{std::nullopt}; std::map serdeParameters; + std::optional parquetWriteTimestampUnit; }; } // namespace facebook::velox::dwio::common diff --git a/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp b/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp index a9582e49fd07..32d1e78f5401 100644 --- a/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp +++ b/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp @@ -14,16 +14,38 @@ * limitations under the License. */ +#include +#include + #include "velox/common/base/tests/GTestUtils.h" +#include "velox/common/testutil/TestValue.h" +#include "velox/connectors/hive/HiveConnector.h" +#include "velox/core/QueryCtx.h" #include "velox/dwio/parquet/tests/ParquetTestBase.h" +#include "velox/exec/tests/utils/Cursor.h" +#include "velox/exec/tests/utils/PlanBuilder.h" +#include "velox/exec/tests/utils/QueryAssertions.h" +#include "velox/exec/tests/utils/TempDirectoryPath.h" using namespace facebook::velox; using namespace facebook::velox::common; using namespace facebook::velox::dwio::common; +using namespace facebook::velox::exec::test; using namespace facebook::velox::parquet; class ParquetWriterTest : public ParquetTestBase { protected: + static void SetUpTestCase() { + memory::MemoryManager::testingSetInstance({}); + testutil::TestValue::enable(); + auto hiveConnector = + connector::getConnectorFactory( + connector::hive::HiveConnectorFactory::kHiveConnectorName) + ->newConnector( + kHiveConnectorId, std::make_shared()); + connector::registerConnector(hiveConnector); + } + std::unique_ptr createRowReaderWithSchema( const std::unique_ptr reader, const RowTypePtr& rowType) { @@ -43,6 +65,8 @@ class ParquetWriterTest : public ParquetTestBase { std::make_shared(data), opts.getMemoryPool()), opts); }; + + inline static const std::string kHiveConnectorId = "test-hive"; }; std::vector params = { @@ -110,3 +134,77 @@ TEST_F(ParquetWriterTest, compression) { auto rowReader = createRowReaderWithSchema(std::move(reader), schema); assertReadWithReaderAndExpected(schema, *rowReader, data, *leafPool_); }; + +DEBUG_ONLY_TEST_F(ParquetWriterTest, unitFromWriterOptions) { + SCOPED_TESTVALUE_SET( + "facebook::velox::parquet::Writer::write", + std::function( + ([&](const ::arrow::Schema* arrowSchema) { + const auto tsType = + std::dynamic_pointer_cast<::arrow::TimestampType>( + arrowSchema->field(0)->type()); + ASSERT_EQ(tsType->unit(), ::arrow::TimeUnit::MICRO); + }))); + + const auto data = makeRowVector({makeFlatVector( + 10'000, [](auto row) { return Timestamp(row, row); })}); + parquet::WriterOptions writerOptions; + writerOptions.memoryPool = leafPool_.get(); + writerOptions.parquetWriteTimestampUnit = + static_cast(TimestampUnit::kMicro); + + // Create an in-memory writer. + auto sink = std::make_unique( + 200 * 1024 * 1024, + dwio::common::FileSink::Options{.pool = leafPool_.get()}); + auto writer = std::make_unique( + std::move(sink), writerOptions, rootPool_, ROW({"c0"}, {TIMESTAMP()})); + writer->write(data); + writer->close(); +}; + +DEBUG_ONLY_TEST_F(ParquetWriterTest, unitFromHiveConfig) { + SCOPED_TESTVALUE_SET( + "facebook::velox::parquet::Writer::write", + std::function( + ([&](const ::arrow::Schema* arrowSchema) { + const auto tsType = + std::dynamic_pointer_cast<::arrow::TimestampType>( + arrowSchema->field(0)->type()); + ASSERT_EQ(tsType->unit(), ::arrow::TimeUnit::MICRO); + }))); + + const auto data = makeRowVector({makeFlatVector( + 10'000, [](auto row) { return Timestamp(row, row); })}); + const auto outputDirectory = TempDirectoryPath::create(); + const auto plan = + PlanBuilder() + .values({data}) + .tableWrite(outputDirectory->path, dwio::common::FileFormat::PARQUET) + .planNode(); + + CursorParameters params; + std::shared_ptr executor = + std::make_shared( + std::thread::hardware_concurrency()); + std::shared_ptr queryCtx = + std::make_shared(executor.get()); + std::unordered_map session = { + {std::string( + connector::hive::HiveConfig::kParquetWriteTimestampUnitSession), + "6" /*kMicro*/}}; + queryCtx->setConnectorSessionOverridesUnsafe( + kHiveConnectorId, std::move(session)); + params.queryCtx = queryCtx; + params.planNode = plan; + + auto addSplits = [&](exec::Task* task) {}; + auto result = readCursor(params, addSplits); + ASSERT_TRUE(waitForTaskCompletion(result.first->task().get())); +} + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + folly::Init init{&argc, &argv, false}; + return RUN_ALL_TESTS(); +} diff --git a/velox/dwio/parquet/writer/Writer.cpp b/velox/dwio/parquet/writer/Writer.cpp index b6479189251d..49c0a84f3def 100644 --- a/velox/dwio/parquet/writer/Writer.cpp +++ b/velox/dwio/parquet/writer/Writer.cpp @@ -18,10 +18,10 @@ #include #include #include +#include "velox/common/testutil/TestValue.h" #include "velox/dwio/parquet/writer/arrow/Properties.h" #include "velox/dwio/parquet/writer/arrow/Writer.h" #include "velox/exec/MemoryReclaimer.h" -#include "velox/vector/arrow/Bridge.h" namespace facebook::velox::parquet { @@ -234,6 +234,8 @@ Writer::Writer( } else { flushPolicy_ = std::make_unique(); } + options_.timestampUnit = + static_cast(options.parquetWriteTimestampUnit); arrowContext_->properties = getArrowParquetWriterOptions(options, flushPolicy_); setMemoryReclaimers(); @@ -310,15 +312,16 @@ void Writer::write(const VectorPtr& data) { data->type()->equivalent(*schema_), "The file schema type should be equal with the input rowvector type."); - ArrowOptions options{.flattenDictionary = true, .flattenConstant = true}; ArrowArray array; ArrowSchema schema; - exportToArrow(data, array, generalPool_.get(), options); - exportToArrow(data, schema, options); + exportToArrow(data, array, generalPool_.get(), options_); + exportToArrow(data, schema, options_); // Convert the arrow schema to Schema and then update the column names based // on schema_. auto arrowSchema = ::arrow::ImportSchema(&schema).ValueOrDie(); + common::testutil::TestValue::adjust( + "facebook::velox::parquet::Writer::write", arrowSchema.get()); std::vector> newFields; auto childSize = schema_->size(); for (auto i = 0; i < childSize; i++) { @@ -386,6 +389,10 @@ parquet::WriterOptions getParquetOptions( if (options.compressionKind.has_value()) { parquetOptions.compression = options.compressionKind.value(); } + if (options.parquetWriteTimestampUnit.has_value()) { + parquetOptions.parquetWriteTimestampUnit = + options.parquetWriteTimestampUnit.value(); + } return parquetOptions; } diff --git a/velox/dwio/parquet/writer/Writer.h b/velox/dwio/parquet/writer/Writer.h index 6065366bce6b..7f1886708a2b 100644 --- a/velox/dwio/parquet/writer/Writer.h +++ b/velox/dwio/parquet/writer/Writer.h @@ -26,6 +26,7 @@ #include "velox/dwio/parquet/writer/arrow/Types.h" #include "velox/dwio/parquet/writer/arrow/util/Compression.h" #include "velox/vector/ComplexVector.h" +#include "velox/vector/arrow/Bridge.h" namespace facebook::velox::parquet { @@ -102,6 +103,8 @@ struct WriterOptions { std::shared_ptr codecOptions; std::unordered_map columnCompressionsMap; + uint8_t parquetWriteTimestampUnit = + static_cast(TimestampUnit::kNano); }; // Writes Velox vectors into a DataSink using Arrow Parquet writer. @@ -158,6 +161,8 @@ class Writer : public dwio::common::Writer { std::unique_ptr flushPolicy_; const RowTypePtr schema_; + + ArrowOptions options_{.flattenDictionary = true, .flattenConstant = true}; }; class ParquetWriterFactory : public dwio::common::WriterFactory { diff --git a/velox/exec/tests/ArrowStreamTest.cpp b/velox/exec/tests/ArrowStreamTest.cpp index f0fe9b37e04e..6846c5d7f09c 100644 --- a/velox/exec/tests/ArrowStreamTest.cpp +++ b/velox/exec/tests/ArrowStreamTest.cpp @@ -45,7 +45,7 @@ class ArrowStreamTest : public OperatorTestBase { int getNext(struct ArrowArray* outArray) { if (vectorIndex_ < vectors_.size()) { - exportToArrow(vectors_[vectorIndex_], *outArray, pool_.get()); + exportToArrow(vectors_[vectorIndex_], *outArray, pool_.get(), options_); vectorIndex_ += 1; } else { // End of stream. Mark the array released. @@ -56,12 +56,13 @@ class ArrowStreamTest : public OperatorTestBase { } int getArrowSchema(ArrowSchema& out) { - exportToArrow(BaseVector::create(type_, 0, pool_.get()), out); + exportToArrow(BaseVector::create(type_, 0, pool_.get()), out, options_); return failGetSchema_ ? (int)ErrorCode::kGetSchemaFailed : (int)ErrorCode::kNoError; } private: + ArrowOptions options_; const std::shared_ptr pool_; const std::vector& vectors_; const TypePtr type_; @@ -184,7 +185,11 @@ TEST_F(ArrowStreamTest, basic) { return StringView::makeInline( std::to_string(100000000000 + (uint64_t)(row % 100))); }, - nullEvery(7))})); + nullEvery(7)), + makeFlatVector( + size, + [&](vector_size_t row) { return Timestamp(row, row * 1000); }, + nullEvery(5))})); } createDuckDbTable(vectors); auto type = asRowType(vectors[0]->type()); diff --git a/velox/vector/arrow/Bridge.cpp b/velox/vector/arrow/Bridge.cpp index c90ca6640dd6..04b18dc976d0 100644 --- a/velox/vector/arrow/Bridge.cpp +++ b/velox/vector/arrow/Bridge.cpp @@ -215,6 +215,7 @@ static void releaseArrowSchema(ArrowSchema* arrowSchema) { // Returns the Arrow C data interface format type for a given Velox type. const char* exportArrowFormatStr( const TypePtr& type, + const ArrowOptions& options, std::string& formatBuffer) { if (type->isDecimal()) { // Decimal types encode the precision, scale values. @@ -253,9 +254,19 @@ const char* exportArrowFormatStr( return "z"; // binary case TypeKind::UNKNOWN: return "n"; // NullType - case TypeKind::TIMESTAMP: - return "ttn"; // time64 [nanoseconds] + switch (options.timestampUnit) { + case TimestampUnit::kSecond: + return "tss:"; + case TimestampUnit::kMilli: + return "tsm:"; + case TimestampUnit::kMicro: + return "tsu:"; + case TimestampUnit::kNano: + return "tsn:"; + default: + VELOX_UNREACHABLE(); + } // Complex/nested types. case TypeKind::ARRAY: static_assert(sizeof(vector_size_t) == 4); @@ -334,10 +345,72 @@ struct Selection { vector_size_t total_; }; +// Gather values from timestamp buffer. Nulls are skipped. +void gatherFromTimestampBuffer( + const BaseVector& vec, + const Selection& rows, + TimestampUnit unit, + Buffer& out) { + auto src = (*vec.values()).as(); + auto dst = out.asMutable(); + if (!vec.mayHaveNulls() || vec.getNullCount() == 0) { + switch (unit) { + case TimestampUnit::kSecond: + rows.apply([&](vector_size_t i) { dst[i] = src[i].getSeconds(); }); + break; + case TimestampUnit::kMilli: + rows.apply([&](vector_size_t i) { dst[i] = src[i].toMillis(); }); + break; + case TimestampUnit::kMicro: + rows.apply([&](vector_size_t i) { dst[i] = src[i].toMicros(); }); + break; + case TimestampUnit::kNano: + rows.apply([&](vector_size_t i) { dst[i] = src[i].toNanos(); }); + break; + default: + VELOX_UNREACHABLE(); + } + return; + } + switch (unit) { + case TimestampUnit::kSecond: + rows.apply([&](vector_size_t i) { + if (!vec.isNullAt(i)) { + dst[i] = src[i].getSeconds(); + } + }); + break; + case TimestampUnit::kMilli: + rows.apply([&](vector_size_t i) { + if (!vec.isNullAt(i)) { + dst[i] = src[i].toMillis(); + } + }); + break; + case TimestampUnit::kMicro: + rows.apply([&](vector_size_t i) { + if (!vec.isNullAt(i)) { + dst[i] = src[i].toMicros(); + }; + }); + break; + case TimestampUnit::kNano: + rows.apply([&](vector_size_t i) { + if (!vec.isNullAt(i)) { + dst[i] = src[i].toNanos(); + } + }); + break; + default: + VELOX_UNREACHABLE(); + } +} + void gatherFromBuffer( const Type& type, const Buffer& buf, const Selection& rows, + const ArrowOptions& options, Buffer& out) { auto src = buf.as(); auto dst = out.asMutable(); @@ -352,11 +425,6 @@ void gatherFromBuffer( int128_t value = buf.as()[i]; memcpy(dst + (j++) * sizeof(int128_t), &value, sizeof(int128_t)); }); - } else if (type.isTimestamp()) { - auto srcTs = buf.as(); - auto dstTs = out.asMutable(); - - rows.apply([&](vector_size_t i) { dstTs[j++] = srcTs[i].toNanos(); }); } else { auto typeSize = type.cppSizeInBytes(); rows.apply([&](vector_size_t i) { @@ -387,8 +455,8 @@ struct BufferViewReleaser { }; // Wraps a naked pointer using a Velox buffer view, without copying it. Adding a -// dummy releaser as the buffer lifetime is fully controled by the client of the -// API. +// dummy releaser as the buffer lifetime is fully controlled by the client of +// the API. BufferPtr wrapInBufferViewAsViewer(const void* buffer, size_t length) { static const BufferViewReleaser kViewerReleaser; return BufferView::create( @@ -475,11 +543,12 @@ VectorPtr createStringFlatVector( optionalNullCount(nullCount)); } -// This functions does two things: (a) sets the value of null_count, and (b) the -// validity buffer (if there is at least one null row). +// This functions does two things: (a) sets the value of null_count, and (b) +// the validity buffer (if there is at least one null row). void exportValidityBitmap( const BaseVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, VeloxToArrowBridgeHolder& holder) { @@ -493,7 +562,7 @@ void exportValidityBitmap( // If we're only exporting a subset, create a new validity buffer. if (rows.changed()) { nulls = AlignedBuffer::allocate(out.length, pool); - gatherFromBuffer(*BOOLEAN(), *vec.nulls(), rows, *nulls); + gatherFromBuffer(*BOOLEAN(), *vec.nulls(), rows, options, *nulls); } // Set null counts. @@ -509,8 +578,8 @@ void exportValidityBitmap( } bool isFlatScalarZeroCopy(const TypePtr& type) { - // - Short decimals need to be converted to 128 bit values as they are mapped - // to Arrow Decimal128. + // - Short decimals need to be converted to 128 bit values as they are + // mapped to Arrow Decimal128. // - Velox's Timestamp representation (2x 64bit values) does not have an // equivalent in Arrow. return !type->isShortDecimal() && !type->isTimestamp(); @@ -531,6 +600,7 @@ size_t getArrowElementSize(const TypePtr& type) { void exportValues( const BaseVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, VeloxToArrowBridgeHolder& holder) { @@ -548,7 +618,11 @@ void exportValues( ? AlignedBuffer::allocate(out.length, pool) : AlignedBuffer::allocate( checkedMultiply(out.length, size), pool); - gatherFromBuffer(*type, *vec.values(), rows, *values); + if (type->kind() == TypeKind::TIMESTAMP) { + gatherFromTimestampBuffer(vec, rows, options.timestampUnit, *values); + } else { + gatherFromBuffer(*type, *vec.values(), rows, options, *values); + } holder.setBuffer(1, values); } @@ -588,6 +662,7 @@ void exportStrings( void exportFlat( const BaseVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, VeloxToArrowBridgeHolder& holder) { @@ -604,7 +679,7 @@ void exportFlat( case TypeKind::DOUBLE: case TypeKind::TIMESTAMP: case TypeKind::UNKNOWN: - exportValues(vec, rows, out, pool, holder); + exportValues(vec, rows, options, out, pool, holder); break; case TypeKind::VARCHAR: case TypeKind::VARBINARY: @@ -621,17 +696,17 @@ void exportFlat( void exportToArrowImpl( const BaseVector&, const Selection&, + const ArrowOptions& options, ArrowArray&, - memory::MemoryPool*, - const ArrowOptions& options); + memory::MemoryPool*); void exportRows( const RowVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, - VeloxToArrowBridgeHolder& holder, - const ArrowOptions& options) { + VeloxToArrowBridgeHolder& holder) { out.n_buffers = 1; holder.resizeChildren(vec.childrenSize()); out.n_children = vec.childrenSize(); @@ -641,9 +716,9 @@ void exportRows( exportToArrowImpl( *vec.childAt(i)->loadedVector(), rows, + options, *holder.allocateChild(i), - pool, - options); + pool); } catch (const VeloxException&) { for (column_index_t j = 0; j < i; ++j) { // When exception is thrown, i th child is guaranteed unset. @@ -702,19 +777,19 @@ void exportOffsets( void exportArrays( const ArrayVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, - VeloxToArrowBridgeHolder& holder, - const ArrowOptions& options) { + VeloxToArrowBridgeHolder& holder) { Selection childRows(vec.elements()->size()); exportOffsets(vec, rows, out, pool, holder, childRows); holder.resizeChildren(1); exportToArrowImpl( *vec.elements()->loadedVector(), childRows, + options, *holder.allocateChild(0), - pool, - options); + pool); out.n_children = 1; out.children = holder.getChildrenArrays(); } @@ -722,10 +797,10 @@ void exportArrays( void exportMaps( const MapVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, - VeloxToArrowBridgeHolder& holder, - const ArrowOptions& options) { + VeloxToArrowBridgeHolder& holder) { RowVector child( pool, ROW({"key", "value"}, {vec.mapKeys()->type(), vec.mapValues()->type()}), @@ -735,7 +810,7 @@ void exportMaps( Selection childRows(child.size()); exportOffsets(vec, rows, out, pool, holder, childRows); holder.resizeChildren(1); - exportToArrowImpl(child, childRows, *holder.allocateChild(0), pool, options); + exportToArrowImpl(child, childRows, options, *holder.allocateChild(0), pool); out.n_children = 1; out.children = holder.getChildrenArrays(); } @@ -744,6 +819,7 @@ template void flattenAndExport( const BaseVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, VeloxToArrowBridgeHolder& holder) { @@ -761,19 +837,20 @@ void flattenAndExport( flatVector->set(row, decoded.valueAt(row)); } }); - exportValidityBitmap(*flatVector, rows, out, pool, holder); - exportFlat(*flatVector, rows, out, pool, holder); + exportValidityBitmap(*flatVector, rows, options, out, pool, holder); + exportFlat(*flatVector, rows, options, out, pool, holder); } else { allRows.applyToSelected([&](vector_size_t row) { flatVector->set(row, decoded.valueAt(row)); }); - exportFlat(*flatVector, rows, out, pool, holder); + exportFlat(*flatVector, rows, options, out, pool, holder); } } void exportDictionary( const BaseVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, VeloxToArrowBridgeHolder& holder) { @@ -781,7 +858,7 @@ void exportDictionary( out.n_children = 0; if (rows.changed()) { auto indices = AlignedBuffer::allocate(out.length, pool); - gatherFromBuffer(*INTEGER(), *vec.wrapInfo(), rows, *indices); + gatherFromBuffer(*INTEGER(), *vec.wrapInfo(), rows, options, *indices); holder.setBuffer(1, indices); } else { holder.setBuffer(1, vec.wrapInfo()); @@ -789,12 +866,13 @@ void exportDictionary( auto& values = *vec.valueVector()->loadedVector(); out.dictionary = holder.allocateDictionary(); exportToArrowImpl( - values, Selection(values.size()), *out.dictionary, pool, ArrowOptions()); + values, Selection(values.size()), options, *out.dictionary, pool); } void exportFlattenedVector( const BaseVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, VeloxToArrowBridgeHolder& holder) { @@ -803,11 +881,12 @@ void exportFlattenedVector( "An unsupported nested encoding was found."); VELOX_CHECK(vec.isScalar(), "Flattening is only supported for scalar types."); VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH( - flattenAndExport, vec.typeKind(), vec, rows, out, pool, holder); + flattenAndExport, vec.typeKind(), vec, rows, options, out, pool, holder); } void exportConstantValue( const BaseVector& vec, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool) { VectorPtr valuesVector; @@ -837,7 +916,7 @@ void exportConstantValue( wrapInBufferViewAsViewer(vec.valuesAsVoid(), bufferSize), vec.mayHaveNulls() ? 1 : 0); } - exportToArrowImpl(*valuesVector, selection, out, pool, ArrowOptions()); + exportToArrowImpl(*valuesVector, selection, options, out, pool); } // Velox constant vectors are exported as Arrow REE containing a single run @@ -845,6 +924,7 @@ void exportConstantValue( void exportConstant( const BaseVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, memory::MemoryPool* pool, VeloxToArrowBridgeHolder& holder) { @@ -856,7 +936,7 @@ void exportConstant( out.n_children = 2; holder.resizeChildren(2); out.children = holder.getChildrenArrays(); - exportConstantValue(vec, *holder.allocateChild(1), pool); + exportConstantValue(vec, options, *holder.allocateChild(1), pool); // Create the run ends child. auto* runEnds = holder.allocateChild(0); @@ -883,41 +963,41 @@ void exportConstant( void exportToArrowImpl( const BaseVector& vec, const Selection& rows, + const ArrowOptions& options, ArrowArray& out, - memory::MemoryPool* pool, - const ArrowOptions& options) { + memory::MemoryPool* pool) { auto holder = std::make_unique(); out.buffers = holder->getArrowBuffers(); out.length = rows.count(); out.offset = 0; out.dictionary = nullptr; - exportValidityBitmap(vec, rows, out, pool, *holder); + exportValidityBitmap(vec, rows, options, out, pool, *holder); switch (vec.encoding()) { case VectorEncoding::Simple::FLAT: - exportFlat(vec, rows, out, pool, *holder); + exportFlat(vec, rows, options, out, pool, *holder); break; case VectorEncoding::Simple::ROW: exportRows( - *vec.asUnchecked(), rows, out, pool, *holder, options); + *vec.asUnchecked(), rows, options, out, pool, *holder); break; case VectorEncoding::Simple::ARRAY: exportArrays( - *vec.asUnchecked(), rows, out, pool, *holder, options); + *vec.asUnchecked(), rows, options, out, pool, *holder); break; case VectorEncoding::Simple::MAP: exportMaps( - *vec.asUnchecked(), rows, out, pool, *holder, options); + *vec.asUnchecked(), rows, options, out, pool, *holder); break; case VectorEncoding::Simple::DICTIONARY: options.flattenDictionary - ? exportFlattenedVector(vec, rows, out, pool, *holder) - : exportDictionary(vec, rows, out, pool, *holder); + ? exportFlattenedVector(vec, rows, options, out, pool, *holder) + : exportDictionary(vec, rows, options, out, pool, *holder); break; case VectorEncoding::Simple::CONSTANT: options.flattenConstant - ? exportFlattenedVector(vec, rows, out, pool, *holder) - : exportConstant(vec, rows, out, pool, *holder); + ? exportFlattenedVector(vec, rows, options, out, pool, *holder) + : exportConstant(vec, rows, options, out, pool, *holder); break; default: VELOX_NYI("{} cannot be exported to Arrow yet.", vec.encoding()); @@ -960,8 +1040,7 @@ TypePtr importFromArrowImpl( return VARBINARY(); case 't': // temporal types. - // Mapping it to ttn for now. - if (format[1] == 't' && format[2] == 'n') { + if (format[1] == 's') { return TIMESTAMP(); } if (format[1] == 'd' && format[2] == 'D') { @@ -1056,7 +1135,7 @@ void exportToArrow( memory::MemoryPool* pool, const ArrowOptions& options) { exportToArrowImpl( - *vector, Selection(vector->size()), arrowArray, pool, options); + *vector, Selection(vector->size()), options, arrowArray, pool); } void exportToArrow( @@ -1083,12 +1162,12 @@ void exportToArrow( // Dictionary data is flattened. Set the underlying data types. arrowSchema.dictionary = nullptr; arrowSchema.format = - exportArrowFormatStr(type, bridgeHolder->formatBuffer); + exportArrowFormatStr(type, options, bridgeHolder->formatBuffer); } else { arrowSchema.format = "i"; bridgeHolder->dictionary = std::make_unique(); arrowSchema.dictionary = bridgeHolder->dictionary.get(); - exportToArrow(vec->valueVector(), *arrowSchema.dictionary); + exportToArrow(vec->valueVector(), *arrowSchema.dictionary, options); } } else if ( vec->encoding() == VectorEncoding::Simple::CONSTANT && @@ -1107,7 +1186,7 @@ void exportToArrow( exportToArrow(valueVector, *valuesChild, options); } else { valuesChild->format = - exportArrowFormatStr(type, bridgeHolder->formatBuffer); + exportArrowFormatStr(type, options, bridgeHolder->formatBuffer); } valuesChild->name = "values"; @@ -1115,7 +1194,8 @@ void exportToArrow( 0, newArrowSchema("i", "run_ends"), arrowSchema); bridgeHolder->setChildAtIndex(1, std::move(valuesChild), arrowSchema); } else { - arrowSchema.format = exportArrowFormatStr(type, bridgeHolder->formatBuffer); + arrowSchema.format = + exportArrowFormatStr(type, options, bridgeHolder->formatBuffer); arrowSchema.dictionary = nullptr; if (type->kind() == TypeKind::MAP) { @@ -1209,6 +1289,30 @@ TypePtr importFromArrow(const ArrowSchema& arrowSchema) { namespace { +TimestampUnit getTimestampUnit(const ArrowSchema& arrowSchema) { + const char* format = arrowSchema.dictionary ? arrowSchema.dictionary->format + : arrowSchema.format; + VELOX_USER_CHECK_NOT_NULL(format); + VELOX_USER_CHECK_GE( + strlen(format), + 3, + "The arrow format string of timestamp should contain 'ts' and unit char."); + VELOX_USER_CHECK_EQ(format[0], 't', "The first character should be 't'."); + VELOX_USER_CHECK_EQ(format[1], 's', "The second character should be 's'."); + switch (format[2]) { + case 's': + return TimestampUnit::kSecond; + case 'm': + return TimestampUnit::kMilli; + case 'u': + return TimestampUnit::kMicro; + case 'n': + return TimestampUnit::kNano; + default: + VELOX_UNREACHABLE(); + } +} + VectorPtr importFromArrowImpl( ArrowSchema& arrowSchema, ArrowArray& arrowArray, @@ -1393,9 +1497,93 @@ VectorPtr createVectorFromReeArray( } } +// Set valid timestamp values according to the input and timestamp unit. +void setTimestamps( + const int64_t* input, + int64_t length, + TimestampUnit unit, + Timestamp* rawTimestamps) { + switch (unit) { + case TimestampUnit::kSecond: { + for (int64_t i = 0; i < length; ++i) { + rawTimestamps[i] = Timestamp(input[i], 0); + } + break; + } + case TimestampUnit::kMilli: { + for (int64_t i = 0; i < length; ++i) { + rawTimestamps[i] = Timestamp::fromMillis(input[i]); + } + break; + } + case TimestampUnit::kMicro: { + for (int64_t i = 0; i < length; ++i) { + rawTimestamps[i] = Timestamp::fromMicros(input[i]); + } + break; + } + case TimestampUnit::kNano: { + for (int64_t i = 0; i < length; ++i) { + rawTimestamps[i] = Timestamp::fromNanos(input[i]); + } + break; + } + default: + VELOX_UNREACHABLE(); + } +} + +// Set valid timestamp values according to the input and timestamp unit. Nulls +// are skipped. +void setTimestamps( + const int64_t* input, + BufferPtr nulls, + int64_t length, + TimestampUnit unit, + Timestamp* rawTimestamps) { + const auto* rawNulls = nulls->as(); + switch (unit) { + case TimestampUnit::kSecond: { + for (size_t i = 0; i < length; ++i) { + if (!bits::isBitNull(rawNulls, i)) { + rawTimestamps[i] = Timestamp(input[i], 0); + } + } + break; + } + case TimestampUnit::kMilli: { + for (size_t i = 0; i < length; ++i) { + if (!bits::isBitNull(rawNulls, i)) { + rawTimestamps[i] = Timestamp::fromMillis(input[i]); + } + } + break; + } + case TimestampUnit::kMicro: { + for (size_t i = 0; i < length; ++i) { + if (!bits::isBitNull(rawNulls, i)) { + rawTimestamps[i] = Timestamp::fromMicros(input[i]); + } + } + break; + } + case TimestampUnit::kNano: { + for (size_t i = 0; i < length; ++i) { + if (!bits::isBitNull(rawNulls, i)) { + rawTimestamps[i] = Timestamp::fromNanos(input[i]); + } + } + break; + } + default: + VELOX_UNREACHABLE(); + } +} + VectorPtr createTimestampVector( memory::MemoryPool* pool, const TypePtr& type, + TimestampUnit unit, BufferPtr nulls, const int64_t* input, size_t length, @@ -1403,16 +1591,9 @@ VectorPtr createTimestampVector( BufferPtr timestamps = AlignedBuffer::allocate(length, pool); auto* rawTimestamps = timestamps->asMutable(); if (nulls == nullptr) { - for (size_t i = 0; i < length; ++i) { - rawTimestamps[i] = Timestamp::fromNanos(input[i]); - } + setTimestamps(input, length, unit, rawTimestamps); } else if (length > nullCount) { - const auto* rawNulls = nulls->as(); - for (size_t i = 0; i < length; ++i) { - if (!bits::isBitNull(rawNulls, i)) { - rawTimestamps[i] = Timestamp::fromNanos(input[i]); - } - } + setTimestamps(input, nulls, length, unit, rawTimestamps); } return std::make_shared>( pool, @@ -1500,6 +1681,7 @@ VectorPtr importFromArrowImpl( return createTimestampVector( pool, type, + getTimestampUnit(arrowSchema), nulls, static_cast(arrowArray.buffers[1]), arrowArray.length, diff --git a/velox/vector/arrow/Bridge.h b/velox/vector/arrow/Bridge.h index ba30068f8957..7d93a809881f 100644 --- a/velox/vector/arrow/Bridge.h +++ b/velox/vector/arrow/Bridge.h @@ -25,9 +25,17 @@ struct ArrowArray; struct ArrowSchema; +enum class TimestampUnit : uint8_t { + kSecond = 0 /*10^0 second is equal to 1 second*/, + kMilli = 3 /*10^3 milliseconds are equal to 1 second*/, + kMicro = 6 /*10^6 microseconds are equal to 1 second*/, + kNano = 9 /*10^9 nanoseconds are equal to 1 second*/ +}; + struct ArrowOptions { bool flattenDictionary{false}; bool flattenConstant{false}; + TimestampUnit timestampUnit = TimestampUnit::kNano; }; namespace facebook::velox { diff --git a/velox/vector/arrow/tests/ArrowBridgeArrayTest.cpp b/velox/vector/arrow/tests/ArrowBridgeArrayTest.cpp index 04c33ec68cfc..6fb8af3fb339 100644 --- a/velox/vector/arrow/tests/ArrowBridgeArrayTest.cpp +++ b/velox/vector/arrow/tests/ArrowBridgeArrayTest.cpp @@ -54,7 +54,7 @@ class ArrowBridgeArrayExportTest : public testing::Test { const TypePtr& type = CppToType::create()) { auto flatVector = vectorMaker_.flatVectorNullable(inputData, type); ArrowArray arrowArray; - velox::exportToArrow(flatVector, arrowArray, pool_.get()); + velox::exportToArrow(flatVector, arrowArray, pool_.get(), options_); validateArray(inputData, arrowArray); @@ -67,7 +67,7 @@ class ArrowBridgeArrayExportTest : public testing::Test { void testArrayVector(const T& inputData) { auto arrayVector = vectorMaker_.arrayVectorNullable(inputData); ArrowArray arrowArray; - velox::exportToArrow(arrayVector, arrowArray, pool_.get()); + velox::exportToArrow(arrayVector, arrowArray, pool_.get(), options_); validateListArray(inputData, arrowArray); @@ -93,7 +93,7 @@ class ArrowBridgeArrayExportTest : public testing::Test { const VectorPtr& constantVector, const TInput& input) { ArrowArray arrowArray; - velox::exportToArrow(constantVector, arrowArray, pool_.get()); + velox::exportToArrow(constantVector, arrowArray, pool_.get(), options_); validateConstant( input, constantVector->size(), @@ -161,7 +161,8 @@ class ArrowBridgeArrayExportTest : public testing::Test { bits::isBitSet(reinterpret_cast(values), i)) << "mismatch at index " << i; } else if constexpr (std::is_same_v) { - EXPECT_EQ(inputData[i], Timestamp::fromNanos(values[i])) + EXPECT_TRUE(validateTimestamp( + inputData[i].value(), options_.timestampUnit, values[i])) << "mismatch at index " << i; } else { EXPECT_EQ(inputData[i], values[i]) << "mismatch at index " << i; @@ -353,15 +354,42 @@ class ArrowBridgeArrayExportTest : public testing::Test { } void exportToArrow(const TypePtr& type, ArrowSchema& out) { - velox::exportToArrow(BaseVector::create(type, 0, pool_.get()), out); + velox::exportToArrow( + BaseVector::create(type, 0, pool_.get()), out, options_); } // Boiler plate structures required by vectorMaker. + ArrowOptions options_; std::shared_ptr queryCtx_{std::make_shared()}; std::shared_ptr pool_{ memory::memoryManager()->addLeafPool()}; core::ExecCtx execCtx_{pool_.get(), queryCtx_.get()}; facebook::velox::test::VectorMaker vectorMaker_{execCtx_.pool()}; + + private: + // Converts timestamp as bigint according to unit, and compares it with the + // actual value. + bool + validateTimestamp(Timestamp ts, TimestampUnit unit, int64_t actualValue) { + int64_t expectedValue; + switch (unit) { + case TimestampUnit::kSecond: + expectedValue = ts.getSeconds(); + break; + case TimestampUnit::kMilli: + expectedValue = ts.toMillis(); + break; + case TimestampUnit::kMicro: + expectedValue = ts.toMicros(); + break; + case TimestampUnit::kNano: + expectedValue = ts.toNanos(); + break; + default: + VELOX_UNREACHABLE(); + } + return expectedValue == actualValue; + } }; TEST_F(ArrowBridgeArrayExportTest, flatNotNull) { @@ -371,7 +399,7 @@ TEST_F(ArrowBridgeArrayExportTest, flatNotNull) { // Make sure that ArrowArray is correctly acquiring ownership, even after // the initial vector shared_ptr is gone. auto flatVector = vectorMaker_.flatVector(inputData); - velox::exportToArrow(flatVector, arrowArray, pool_.get()); + velox::exportToArrow(flatVector, arrowArray, pool_.get(), options_); } EXPECT_EQ(inputData.size(), arrowArray.length); @@ -494,16 +522,23 @@ TEST_F(ArrowBridgeArrayExportTest, flatDate) { } TEST_F(ArrowBridgeArrayExportTest, flatTimestamp) { - testFlatVector( - { - Timestamp(0, 0), - std::nullopt, - Timestamp(1699300965, 12'349), - Timestamp(-2208960000, 0), // 1900-01-01 - Timestamp(3155788800, 999'999'999), - std::nullopt, - }, - TIMESTAMP()); + for (TimestampUnit unit : + {TimestampUnit::kSecond, + TimestampUnit::kMilli, + TimestampUnit::kMicro, + TimestampUnit::kNano}) { + options_.timestampUnit = unit; + testFlatVector( + { + Timestamp(0, 0), + std::nullopt, + Timestamp(1699300965, 12'349), + Timestamp(-2208960000, 0), // 1900-01-01 + Timestamp(3155788800, 999'999'999), + std::nullopt, + }, + TIMESTAMP()); + } // Out of range. If nanosecond precision is represented in Arrow, timestamps // starting around 2263-01-01 should overflow and throw a user exception. @@ -542,7 +577,7 @@ TEST_F(ArrowBridgeArrayExportTest, rowVector) { }); ArrowArray arrowArray; - velox::exportToArrow(vector, arrowArray, pool_.get()); + velox::exportToArrow(vector, arrowArray, pool_.get(), options_); EXPECT_EQ(col1.size(), arrowArray.length); EXPECT_EQ(0, arrowArray.null_count); @@ -579,7 +614,7 @@ TEST_F(ArrowBridgeArrayExportTest, rowVectorNullable) { vector->setNullCount(3); ArrowArray arrowArray; - velox::exportToArrow(vector, arrowArray, pool_.get()); + velox::exportToArrow(vector, arrowArray, pool_.get(), options_); EXPECT_EQ(col1.size(), arrowArray.length); EXPECT_EQ(3, arrowArray.null_count); @@ -607,7 +642,8 @@ TEST_F(ArrowBridgeArrayExportTest, rowVectorNullable) { TEST_F(ArrowBridgeArrayExportTest, rowVectorEmpty) { ArrowArray arrowArray; - velox::exportToArrow(vectorMaker_.rowVector({}), arrowArray, pool_.get()); + velox::exportToArrow( + vectorMaker_.rowVector({}), arrowArray, pool_.get(), options_); EXPECT_EQ(0, arrowArray.n_children); EXPECT_EQ(1, arrowArray.n_buffers); EXPECT_EQ(nullptr, arrowArray.children); @@ -617,11 +653,12 @@ TEST_F(ArrowBridgeArrayExportTest, rowVectorEmpty) { std::shared_ptr toArrow( const VectorPtr& vec, + const ArrowOptions& options, memory::MemoryPool* pool) { ArrowSchema schema; ArrowArray array; - exportToArrow(vec, schema); - exportToArrow(vec, array, pool); + exportToArrow(vec, schema, options); + exportToArrow(vec, array, pool, options); EXPECT_OK_AND_ASSIGN(auto type, arrow::ImportType(&schema)); EXPECT_OK_AND_ASSIGN(auto ans, arrow::ImportArray(&array, type)); return ans; @@ -655,7 +692,7 @@ TEST_F(ArrowBridgeArrayExportTest, arraySimple) { TEST_F(ArrowBridgeArrayExportTest, arrayCrossValidate) { auto vec = vectorMaker_.arrayVector({{1, 2, 3}, {4, 5}}); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); EXPECT_EQ(array->null_count(), 0); ASSERT_EQ(*array->type(), *arrow::list(arrow::int64())); @@ -682,8 +719,8 @@ TEST_F(ArrowBridgeArrayExportTest, arrayDictionary) { ArrowSchema schema; ArrowArray data; - velox::exportToArrow(vec, schema); - velox::exportToArrow(vec, data, vec->pool()); + velox::exportToArrow(vec, schema, options_); + velox::exportToArrow(vec, data, vec->pool(), options_); auto result = importFromArrowAsViewer(schema, data, vec->pool()); test::assertEqualVectors(result, vec); @@ -699,7 +736,7 @@ TEST_F(ArrowBridgeArrayExportTest, arrayGap) { auto sizes = makeBuffer({2, 2}); auto vec = std::make_shared( pool_.get(), ARRAY(BIGINT()), nullptr, 2, offsets, sizes, elements); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); EXPECT_EQ(array->null_count(), 0); ASSERT_EQ(*array->type(), *arrow::list(arrow::int64())); @@ -724,7 +761,7 @@ TEST_F(ArrowBridgeArrayExportTest, arrayReorder) { auto sizes = makeBuffer({2, 2}); auto vec = std::make_shared( pool_.get(), ARRAY(BIGINT()), nullptr, 2, offsets, sizes, elements); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); EXPECT_EQ(array->null_count(), 0); ASSERT_EQ(*array->type(), *arrow::list(arrow::int64())); @@ -754,7 +791,7 @@ TEST_F(ArrowBridgeArrayExportTest, arrayNested) { std::make_shared( pool_.get(), ARRAY(inner->type()), nullptr, 2, offsets, sizes, inner); }); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); ASSERT_EQ(*array->type(), *arrow::list(arrow::list(arrow::int64()))); auto& listArray = static_cast(*array); @@ -771,7 +808,7 @@ TEST_F(ArrowBridgeArrayExportTest, mapSimple) { auto allOnes = [](vector_size_t) { return 1; }; auto vec = vectorMaker_.mapVector(2, allOnes, allOnes, allOnes); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); EXPECT_EQ(array->null_count(), 0); ASSERT_EQ(*array->type(), *arrow::map(arrow::int64(), arrow::int64())); @@ -806,7 +843,7 @@ TEST_F(ArrowBridgeArrayExportTest, mapNested) { std::make_shared( pool_.get(), type, nullptr, 2, offsets, sizes, keys, inner); }); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); EXPECT_EQ(array->null_count(), 0); ASSERT_EQ( @@ -838,7 +875,7 @@ TEST_F(ArrowBridgeArrayExportTest, dictionarySimple) { allocateIndices(3, pool_.get()), 3, vectorMaker_.flatVector({1, 2, 3})); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); ASSERT_EQ(*array->type(), *arrow::dictionary(arrow::int32(), arrow::int64())); auto& dict = static_cast(*array); @@ -864,7 +901,7 @@ TEST_F(ArrowBridgeArrayExportTest, dictionaryNested) { std::make_shared( pool_.get(), ARRAY(inner->type()), nullptr, 2, offsets, sizes, inner); }); - auto array = toArrow(vec, pool_.get()); + auto array = toArrow(vec, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); ASSERT_EQ( *array->type(), @@ -914,7 +951,7 @@ TEST_F(ArrowBridgeArrayExportTest, constantComplex) { TEST_F(ArrowBridgeArrayExportTest, constantCrossValidate) { auto vector = BaseVector::createConstant(VARCHAR(), "hello", 100, pool_.get()); - auto array = toArrow(vector, pool_.get()); + auto array = toArrow(vector, options_, pool_.get()); ASSERT_OK(array->ValidateFull()); EXPECT_EQ(array->null_count(), 0); @@ -982,8 +1019,6 @@ class ArrowBridgeArrayImportTest : public ArrowBridgeArrayExportTest { bits::clearNull(rawNulls, i); if constexpr (std::is_same_v) { bits::setBit(rawValues, i, *inputValues[i]); - } else if constexpr (std::is_same_v) { - rawValues[i] = inputValues[i]->toNanos(); } else { rawValues[i] = *inputValues[i]; } @@ -1045,21 +1080,27 @@ class ArrowBridgeArrayImportTest : public ArrowBridgeArrayExportTest { // Takes a vector with input data, generates an input ArrowArray and Velox // Vector (using vector maker). Then converts ArrowArray into Velox vector and // assert that both Velox vectors are semantically the same. - template + template void testArrowImport( const char* format, - const std::vector>& inputValues) { + const std::vector>& inputValues) { ArrowContextHolder holder; auto arrowArray = fillArrowArray(inputValues, holder); auto arrowSchema = makeArrowSchema(format); auto output = importFromArrow(arrowSchema, arrowArray, pool_.get()); - assertVectorContent(inputValues, output, arrowArray.null_count); + if constexpr ( + std::is_same_v && std::is_same_v) { + assertTimestampVectorContent( + inputValues, output, arrowArray.null_count, format); + } else { + assertVectorContent(inputValues, output, arrowArray.null_count); + } // Buffer views are not reusable. Strings might need to create an additional // buffer, depending on the string sizes, in which case the buffers could be // reusable. So we don't check them in here. - if constexpr (!std::is_same_v) { + if constexpr (!std::is_same_v) { EXPECT_FALSE(BaseVector::isVectorWritable(output)); } else { size_t totalLength = 0; @@ -1131,8 +1172,10 @@ class ArrowBridgeArrayImportTest : public ArrowBridgeArrayExportTest { testArrowImport("g", {-99.9, 4.3, 31.1, 129.11, -12}); testArrowImport("f", {-99.9, 4.3, 31.1, 129.11, -12}); - testArrowImport( - "ttn", {Timestamp(0, 0), std::nullopt, Timestamp(1699308257, 1234)}); + for (const std::string& tsString : {"tss:", "tsm:", "tsu:", "tsn:"}) { + testArrowImport( + tsString.data(), {0, std::nullopt, Timestamp::kMaxSeconds}); + } } template @@ -1157,7 +1200,8 @@ class ArrowBridgeArrayImportTest : public ArrowBridgeArrayExportTest { auto output = importFromArrow(arrowSchema1, arrowArray1, pool_.get()); if constexpr ( std::is_same_v && std::is_same_v) { - assertTimestampVectorContent(inputValues, output, arrowArray1.null_count); + assertTimestampVectorContent( + inputValues, output, arrowArray1.null_count, format); } else { assertVectorContent(inputValues, output, arrowArray1.null_count); } @@ -1217,14 +1261,34 @@ class ArrowBridgeArrayImportTest : public ArrowBridgeArrayExportTest { void assertTimestampVectorContent( const std::vector>& expectedValues, const VectorPtr& actual, - size_t nullCount) { + size_t nullCount, + const char* format) { + VELOX_USER_CHECK_GE( + strlen(format), 3, "At least three characters are expected."); std::vector> tsValues; tsValues.reserve(expectedValues.size()); for (const auto& value : expectedValues) { - if (value.has_value()) { - tsValues.emplace_back(Timestamp::fromNanos(value.value())); - } else { + if (!value.has_value()) { tsValues.emplace_back(std::nullopt); + } else { + Timestamp ts; + switch (format[2]) { + case 's': + ts = Timestamp(value.value(), 0); + break; + case 'm': + ts = Timestamp::fromMillis(value.value()); + break; + case 'u': + ts = Timestamp::fromMicros(value.value()); + break; + case 'n': + ts = Timestamp::fromNanos(value.value()); + break; + default: + VELOX_UNREACHABLE(); + } + tsValues.emplace_back(ts); } } assertVectorContent(tsValues, actual, nullCount); @@ -1342,8 +1406,8 @@ class ArrowBridgeArrayImportTest : public ArrowBridgeArrayExportTest { ArrowSchema schema; ArrowArray data; - velox::exportToArrow(vec, schema); - velox::exportToArrow(vec, data, pool_.get()); + velox::exportToArrow(vec, schema, options_); + velox::exportToArrow(vec, data, pool_.get(), options_); ASSERT_OK_AND_ASSIGN(auto arrowType, arrow::ImportType(&schema)); ASSERT_OK_AND_ASSIGN(auto array2, arrow::ImportArray(&data, arrowType)); ASSERT_OK(array2->ValidateFull()); @@ -1582,6 +1646,7 @@ class ArrowBridgeArrayImportTest : public ArrowBridgeArrayExportTest { EXPECT_NO_THROW(importFromArrow(arrowSchema, arrowArray, pool_.get())); } + ArrowOptions options_; std::shared_ptr pool_{ memory::memoryManager()->addLeafPool()}; }; @@ -1607,7 +1672,7 @@ TEST_F(ArrowBridgeArrayImportAsViewerTest, scalar) { TEST_F(ArrowBridgeArrayImportAsViewerTest, without_nulls_buffer) { std::vector> inputValues = {1, 2, 3, 4, 5}; testImportWithoutNullsBuffer(inputValues, "l"); - testImportWithoutNullsBuffer(inputValues, "ttn"); + testImportWithoutNullsBuffer(inputValues, "tsn:"); } TEST_F(ArrowBridgeArrayImportAsViewerTest, string) { @@ -1660,7 +1725,7 @@ TEST_F(ArrowBridgeArrayImportAsOwnerTest, scalar) { TEST_F(ArrowBridgeArrayImportAsOwnerTest, without_nulls_buffer) { std::vector> inputValues = {1, 2, 3, 4, 5}; testImportWithoutNullsBuffer(inputValues, "l"); - testImportWithoutNullsBuffer(inputValues, "ttn"); + testImportWithoutNullsBuffer(inputValues, "tsn:"); } TEST_F(ArrowBridgeArrayImportAsOwnerTest, string) { diff --git a/velox/vector/arrow/tests/ArrowBridgeSchemaTest.cpp b/velox/vector/arrow/tests/ArrowBridgeSchemaTest.cpp index 997a37c4b213..7b2c94d962d0 100644 --- a/velox/vector/arrow/tests/ArrowBridgeSchemaTest.cpp +++ b/velox/vector/arrow/tests/ArrowBridgeSchemaTest.cpp @@ -120,7 +120,7 @@ class ArrowBridgeSchemaExportTest : public testing::Test { 3, // index to use for the constant BaseVector::create(type, 100, pool_.get())); - velox::exportToArrow(constantVector, arrowSchema); + velox::exportToArrow(constantVector, arrowSchema, options_); EXPECT_STREQ("+r", arrowSchema.format); EXPECT_EQ(nullptr, arrowSchema.name); @@ -155,7 +155,8 @@ class ArrowBridgeSchemaExportTest : public testing::Test { } void exportToArrow(const TypePtr& type, ArrowSchema& out) { - velox::exportToArrow(BaseVector::create(type, 0, pool_.get()), out); + velox::exportToArrow( + BaseVector::create(type, 0, pool_.get()), out, options_); } ArrowSchema makeArrowSchema(const char* format) { @@ -172,6 +173,7 @@ class ArrowBridgeSchemaExportTest : public testing::Test { }; } + ArrowOptions options_; std::shared_ptr pool_{ memory::memoryManager()->addLeafPool()}; }; @@ -190,7 +192,15 @@ TEST_F(ArrowBridgeSchemaExportTest, scalar) { testScalarType(VARCHAR(), "u"); testScalarType(VARBINARY(), "z"); - testScalarType(TIMESTAMP(), "ttn"); + options_.timestampUnit = TimestampUnit::kSecond; + testScalarType(TIMESTAMP(), "tss:"); + options_.timestampUnit = TimestampUnit::kMilli; + testScalarType(TIMESTAMP(), "tsm:"); + options_.timestampUnit = TimestampUnit::kMicro; + testScalarType(TIMESTAMP(), "tsu:"); + options_.timestampUnit = TimestampUnit::kNano; + testScalarType(TIMESTAMP(), "tsn:"); + testScalarType(DATE(), "tdD"); testScalarType(INTERVAL_YEAR_MONTH(), "tiM"); @@ -362,7 +372,7 @@ TEST_F(ArrowBridgeSchemaImportTest, scalar) { EXPECT_EQ(*VARBINARY(), *testSchemaImport("Z")); // Temporal. - EXPECT_EQ(*TIMESTAMP(), *testSchemaImport("ttn")); + EXPECT_EQ(*TIMESTAMP(), *testSchemaImport("tsn:")); EXPECT_EQ(*DATE(), *testSchemaImport("tdD")); EXPECT_EQ(*INTERVAL_YEAR_MONTH(), *testSchemaImport("tiM")); @@ -376,7 +386,7 @@ TEST_F(ArrowBridgeSchemaImportTest, scalar) { TEST_F(ArrowBridgeSchemaImportTest, complexTypes) { // Array. EXPECT_EQ(*ARRAY(BIGINT()), *testSchemaImportComplex("+l", {"l"})); - EXPECT_EQ(*ARRAY(TIMESTAMP()), *testSchemaImportComplex("+l", {"ttn"})); + EXPECT_EQ(*ARRAY(TIMESTAMP()), *testSchemaImportComplex("+l", {"tsn:"})); EXPECT_EQ(*ARRAY(DATE()), *testSchemaImportComplex("+l", {"tdD"})); EXPECT_EQ( *ARRAY(INTERVAL_YEAR_MONTH()), *testSchemaImportComplex("+l", {"tiM"})); @@ -442,9 +452,11 @@ class ArrowBridgeSchemaTest : public testing::Test { } void exportToArrow(const TypePtr& type, ArrowSchema& out) { - velox::exportToArrow(BaseVector::create(type, 0, pool_.get()), out); + velox::exportToArrow( + BaseVector::create(type, 0, pool_.get()), out, options_); } + ArrowOptions options_; std::shared_ptr pool_{ memory::memoryManager()->addLeafPool()}; }; @@ -521,7 +533,7 @@ TEST_F(ArrowBridgeSchemaImportTest, dictionaryTypeTest) { *testSchemaDictionaryImport( "i", makeComplexArrowSchema( - schemas, schemaPtrs, mapSchemas, mapSchemaPtrs, "+l", {"ttn"}))); + schemas, schemaPtrs, mapSchemas, mapSchemaPtrs, "+l", {"tsn:"}))); EXPECT_EQ( *ARRAY(DATE()), *testSchemaDictionaryImport(