From 376876cf83ae55d15bd90c700a0a5da94a3983df Mon Sep 17 00:00:00 2001 From: zuyu Date: Wed, 13 Nov 2024 12:22:20 -0800 Subject: [PATCH] feat(Parquet): Add Int64 Timestamp support in reader --- velox/dwio/common/IntDecoder.h | 3 + velox/dwio/parquet/reader/PageReader.cpp | 12 ++ .../parquet/reader/ParquetColumnReader.cpp | 20 +++- velox/dwio/parquet/reader/ParquetReader.cpp | 5 + .../parquet/reader/TimestampColumnReader.h | 110 ++++++++++++++++-- .../parquet/tests/reader/E2EFilterTest.cpp | 25 ++++ .../tests/reader/ParquetTableScanTest.cpp | 16 +++ 7 files changed, 176 insertions(+), 15 deletions(-) diff --git a/velox/dwio/common/IntDecoder.h b/velox/dwio/common/IntDecoder.h index aeba2e2d01d86..79f7bb8d89e52 100644 --- a/velox/dwio/common/IntDecoder.h +++ b/velox/dwio/common/IntDecoder.h @@ -441,6 +441,9 @@ inline T IntDecoder::readInt() { return readLittleEndianFromBigEndian(); } else { if constexpr (std::is_same_v) { + if (numBytes_ == 8) { + return readLongLE(); + } if (numBytes_ == 12) { VELOX_DCHECK(!useVInts_, "Int96 should not be VInt encoded."); return readInt96(); diff --git a/velox/dwio/parquet/reader/PageReader.cpp b/velox/dwio/parquet/reader/PageReader.cpp index 81f429c59a5a0..010c618370ad9 100644 --- a/velox/dwio/parquet/reader/PageReader.cpp +++ b/velox/dwio/parquet/reader/PageReader.cpp @@ -352,6 +352,10 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) { auto numVeloxBytes = dictionary_.numValues * veloxTypeLength; dictionary_.values = AlignedBuffer::allocate(numVeloxBytes, &pool_); + } else if (type_->type()->isTimestamp()) { + const auto numVeloxBytes = dictionary_.numValues * sizeof(int128_t); + dictionary_.values = + AlignedBuffer::allocate(numVeloxBytes, &pool_); } else { dictionary_.values = AlignedBuffer::allocate(numBytes, &pool_); } @@ -374,6 +378,14 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) { // We start from the end to allow in-place expansion. values[i] = parquetValues[i]; } + } else if (type_->type()->isTimestamp()) { + auto values = dictionary_.values->asMutable(); + auto parquetValues = dictionary_.values->asMutable(); + for (auto i = dictionary_.numValues - 1; i >= 0; --i) { + // Expand the Parquet type length values to Velox type length. + // We start from the end to allow in-place expansion. + values[i] = parquetValues[i]; + } } break; } diff --git a/velox/dwio/parquet/reader/ParquetColumnReader.cpp b/velox/dwio/parquet/reader/ParquetColumnReader.cpp index a87a295a787cd..8a2d505419433 100644 --- a/velox/dwio/parquet/reader/ParquetColumnReader.cpp +++ b/velox/dwio/parquet/reader/ParquetColumnReader.cpp @@ -28,6 +28,7 @@ #include "velox/dwio/parquet/reader/StringColumnReader.h" #include "velox/dwio/parquet/reader/StructColumnReader.h" #include "velox/dwio/parquet/reader/TimestampColumnReader.h" +#include "velox/dwio/parquet/thrift/ParquetThriftTypes.h" namespace facebook::velox::parquet { @@ -75,9 +76,22 @@ std::unique_ptr ParquetColumnReader::build( return std::make_unique( requestedType, fileType, params, scanSpec); - case TypeKind::TIMESTAMP: - return std::make_unique( - requestedType, fileType, params, scanSpec); + case TypeKind::TIMESTAMP: { + const auto parquetType = + std::static_pointer_cast(fileType) + ->parquetType_; + VELOX_CHECK(parquetType); + switch (parquetType.value()) { + case thrift::Type::INT64: + return std::make_unique>( + requestedType, fileType, params, scanSpec); + case thrift::Type::INT96: + return std::make_unique>( + requestedType, fileType, params, scanSpec); + default: + VELOX_UNREACHABLE(); + } + } default: VELOX_FAIL( diff --git a/velox/dwio/parquet/reader/ParquetReader.cpp b/velox/dwio/parquet/reader/ParquetReader.cpp index 05091f4f0e14c..f742933b49c8b 100644 --- a/velox/dwio/parquet/reader/ParquetReader.cpp +++ b/velox/dwio/parquet/reader/ParquetReader.cpp @@ -813,6 +813,11 @@ TypePtr ReaderBase::convertType( case thrift::Type::type::INT32: return INTEGER(); case thrift::Type::type::INT64: + // For Int64 Timestamp in nano precision + if (schemaElement.__isset.logicalType && + schemaElement.logicalType.__isset.TIMESTAMP) { + return TIMESTAMP(); + } return BIGINT(); case thrift::Type::type::INT96: return TIMESTAMP(); // INT96 only maps to a timestamp diff --git a/velox/dwio/parquet/reader/TimestampColumnReader.h b/velox/dwio/parquet/reader/TimestampColumnReader.h index b5e69fad218c3..727f81851f65e 100644 --- a/velox/dwio/parquet/reader/TimestampColumnReader.h +++ b/velox/dwio/parquet/reader/TimestampColumnReader.h @@ -18,10 +18,43 @@ #include "velox/dwio/parquet/reader/IntegerColumnReader.h" #include "velox/dwio/parquet/reader/ParquetColumnReader.h" +#include "velox/dwio/parquet/thrift/ParquetThriftTypes.h" namespace facebook::velox::parquet { namespace { +Timestamp toInt64Timestamp(const int64_t value, const thrift::TimeUnit& unit) { + int64_t seconds = value; + int64_t nanos = 0; + if (unit.__isset.MILLIS) { + seconds /= 1'000; + nanos = value - seconds * 1'000; + if (nanos < 0) { + --seconds; + nanos += 1'000; + } + nanos *= 1'000'000; + } else if (unit.__isset.MICROS) { + seconds /= 1'000'000; + nanos = value - seconds * 1'000'000; + if (nanos < 0) { + --seconds; + nanos += 1'000'000; + } + nanos *= 1'000; + } else if (unit.__isset.NANOS) { + seconds /= 1'000'000'000; + nanos = value - seconds * 1'000'000'000; + if (nanos < 0) { + --seconds; + nanos += 1'000'000'000; + } + } else { + VELOX_UNREACHABLE(); + } + return {seconds, static_cast(nanos)}; +} + Timestamp toInt96Timestamp(const int128_t& value) { // Convert int128_t to Int96 Timestamp by extracting days and nanos. const int32_t days = static_cast(value >> 64); @@ -29,35 +62,77 @@ Timestamp toInt96Timestamp(const int128_t& value) { return Timestamp::fromDaysAndNanos(days, nanos); } -// Range filter for Parquet Int96 Timestamp. -class ParquetInt96TimestampRange final : public common::TimestampRange { +// Range filter for Parquet Timestamp. +template +class ParquetTimestampRange final : public common::TimestampRange { public: + // Use int128_t for Int96 + static_assert(std::is_same_v || std::is_same_v); + // @param lower Lower end of the range, inclusive. // @param upper Upper end of the range, inclusive. // @param nullAllowed Null values are passing the filter if true. - ParquetInt96TimestampRange( + // @param timestampPrecision Precision of the Timestamp. + ParquetTimestampRange( const Timestamp& lower, const Timestamp& upper, - bool nullAllowed) - : TimestampRange(lower, upper, nullAllowed) {} + bool nullAllowed, + const thrift::TimeUnit& timestampUnit) + : TimestampRange(lower, upper, nullAllowed), + timestampUnit_(timestampUnit) {} bool testInt128(const int128_t& value) const final { - const auto ts = toInt96Timestamp(value); + Timestamp ts; + if constexpr (std::is_same_v) { + ts = toInt64Timestamp(value, timestampUnit_); + } else if constexpr (std::is_same_v) { + ts = toInt96Timestamp(value); + } return ts >= this->lower() && ts <= this->upper(); } + + private: + // Only used when T is int64_t. + const thrift::TimeUnit timestampUnit_; }; } // namespace +template class TimestampColumnReader : public IntegerColumnReader { public: + // Use int128_t for Int96 + static_assert(std::is_same_v || std::is_same_v); + TimestampColumnReader( const TypePtr& requestedType, std::shared_ptr fileType, ParquetParams& params, common::ScanSpec& scanSpec) : IntegerColumnReader(requestedType, fileType, params, scanSpec), - timestampPrecision_(params.timestampPrecision()) {} + timestampPrecision_(params.timestampPrecision()) { + if constexpr (std::is_same_v) { + const auto logicalType = + std::static_pointer_cast(fileType_) + ->logicalType_; + VELOX_CHECK(logicalType); + VELOX_CHECK(logicalType->__isset.TIMESTAMP); + timestampUnit_ = logicalType->TIMESTAMP.unit; + + if (timestampUnit_.__isset.MILLIS) { + needsConversion_ = + timestampPrecision_ != TimestampPrecision::kMilliseconds; + } else if (timestampUnit_.__isset.MICROS) { + needsConversion_ = + timestampPrecision_ != TimestampPrecision::kMicroseconds; + } else if (timestampUnit_.__isset.NANOS) { + needsConversion_ = + timestampPrecision_ != TimestampPrecision::kNanoseconds; + } else { + VELOX_UNREACHABLE(); + } + } + } bool hasBulkPath() const override { return false; @@ -79,7 +154,15 @@ class TimestampColumnReader : public IntegerColumnReader { } const int128_t encoded = reinterpret_cast(rawValues[i]); - rawValues[i] = toInt96Timestamp(encoded).toPrecision(timestampPrecision_); + if constexpr (std::is_same_v) { + rawValues[i] = toInt64Timestamp(encoded, timestampUnit_); + if (needsConversion_) { + rawValues[i] = rawValues[i].toPrecision(timestampPrecision_); + } + } else if constexpr (std::is_same_v) { + rawValues[i] = + toInt96Timestamp(encoded).toPrecision(timestampPrecision_); + } } } @@ -93,14 +176,13 @@ class TimestampColumnReader : public IntegerColumnReader { const RowSet& rows, ExtractValues extractValues) { if (auto* range = dynamic_cast(filter)) { - // Convert TimestampRange to ParquetInt96TimestampRange. - ParquetInt96TimestampRange newRange = ParquetInt96TimestampRange( - range->lower(), range->upper(), range->nullAllowed()); + ParquetTimestampRange newRange{ + range->lower(), range->upper(), range->nullAllowed(), timestampUnit_}; this->readWithVisitor( rows, dwio::common::ColumnVisitor< int128_t, - ParquetInt96TimestampRange, + common::TimestampRange, ExtractValues, isDense>(newRange, this, rows, extractValues)); } else { @@ -129,6 +211,10 @@ class TimestampColumnReader : public IntegerColumnReader { // The requested precision can be specified from HiveConfig to read timestamp // from Parquet. const TimestampPrecision timestampPrecision_; + + // Only set when T is int64_t. + thrift::TimeUnit timestampUnit_; + bool needsConversion_ = false; }; } // namespace facebook::velox::parquet diff --git a/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp b/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp index 7f2dc8facb4b9..6cb2be529c414 100644 --- a/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp +++ b/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp @@ -258,6 +258,31 @@ TEST_F(E2EFilterTest, integerDictionary) { 20); } +TEST_F(E2EFilterTest, timestampInt64Direct) { + options_.enableDictionary = false; + options_.dataPageSize = 4 * 1024; + + testWithTypes( + "timestamp_val_0:timestamp," + "timestamp_val_1:timestamp", + [&]() {}, + true, + {"timestamp_val_0", "timestamp_val_1"}, + 20); +} + +TEST_F(E2EFilterTest, timestampInt64Dictionary) { + options_.dataPageSize = 4 * 1024; + + testWithTypes( + "timestamp_val_0:timestamp," + "timestamp_val_1:timestamp", + [&]() {}, + true, + {"timestamp_val_0", "timestamp_val_1"}, + 20); +} + TEST_F(E2EFilterTest, timestampInt96Direct) { options_.enableDictionary = false; options_.dataPageSize = 4 * 1024; diff --git a/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp b/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp index 20826137cba1a..439c7b11b387a 100644 --- a/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp +++ b/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp @@ -827,6 +827,22 @@ TEST_F(ParquetTableScanTest, sessionTimezone) { assertSelectWithTimezone({"a"}, "SELECT a FROM tmp", "Asia/Shanghai"); } +TEST_F(ParquetTableScanTest, timestampInt64Dictionary) { + WriterOptions options; + options.writeInt96AsTimestamp = false; + options.enableDictionary = true; + options.parquetWriteTimestampUnit = TimestampPrecision::kMicroseconds; + testTimestampRead(options); +} + +TEST_F(ParquetTableScanTest, timestampInt64Plain) { + WriterOptions options; + options.writeInt96AsTimestamp = false; + options.enableDictionary = false; + options.parquetWriteTimestampUnit = TimestampPrecision::kMicroseconds; + testTimestampRead(options); +} + TEST_F(ParquetTableScanTest, timestampInt96Dictionary) { WriterOptions options; options.writeInt96AsTimestamp = true;