From b0aa8c6e086f34c033fd84cadee9cf8ba3617bac Mon Sep 17 00:00:00 2001 From: Kapil Singh Date: Mon, 29 Apr 2024 17:51:06 +0530 Subject: [PATCH] PR comments --- velox/dwio/parquet/reader/PageReader.cpp | 28 +++++------ velox/dwio/parquet/reader/PageReader.h | 4 +- velox/dwio/parquet/reader/ParquetReader.cpp | 28 +++++------ .../parquet/reader/TimestampColumnReader.h | 4 +- .../reader}/TimestampDecoder.h | 24 +++++---- .../tests/reader/ParquetTableScanTest.cpp | 50 +++++++++++++++++-- velox/type/TimestampConversion.h | 8 --- 7 files changed, 93 insertions(+), 53 deletions(-) rename velox/dwio/{common => parquet/reader}/TimestampDecoder.h (80%) diff --git a/velox/dwio/parquet/reader/PageReader.cpp b/velox/dwio/parquet/reader/PageReader.cpp index 96145eb6a4e2a..a798614c1b991 100644 --- a/velox/dwio/parquet/reader/PageReader.cpp +++ b/velox/dwio/parquet/reader/PageReader.cpp @@ -18,7 +18,6 @@ #include "velox/dwio/common/BufferUtil.h" #include "velox/dwio/common/ColumnVisitors.h" -#include "velox/dwio/common/TimestampDecoder.h" #include "velox/dwio/parquet/thrift/ThriftTransport.h" #include "velox/vector/FlatVector.h" @@ -371,9 +370,9 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) { VELOX_CHECK(type_->logicalType_.has_value()); auto logicalType = type_->logicalType_.value(); if (logicalType.__isset.TIMESTAMP) { - VELOX_CHECK( - logicalType.TIMESTAMP.isAdjustedToUTC, - "Only UTC adjusted Timestamp is supported."); + if (!logicalType.TIMESTAMP.isAdjustedToUTC) { + VELOX_NYI("Only UTC adjusted Timestamp is supported."); + } auto values = dictionary_.values->asMutable(); auto parquetValues = dictionary_.values->asMutable(); int64_t units; @@ -381,12 +380,12 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) { if (logicalType.TIMESTAMP.unit.__isset.MICROS) { for (auto i = dictionary_.numValues - 1; i >= 0; --i) { memcpy(&units, parquetValues + i * typeSize, typeSize); - values[i] = util::fromUTCMicros(units); + values[i] = 
Timestamp::fromMicros(units); } } else if (logicalType.TIMESTAMP.unit.__isset.MILLIS) { for (auto i = dictionary_.numValues - 1; i >= 0; --i) { memcpy(&units, parquetValues + i * typeSize, typeSize); - values[i] = util::fromUTCMillis(units); + values[i] = Timestamp::fromMillis(units); } } else { VELOX_NYI("Nano Timestamp unit is not supported."); @@ -668,15 +667,14 @@ void PageReader::makeDecoder() { "Nano Timestamp unit not supported."); auto precisionUnit = logicalType.TIMESTAMP.unit.__isset.MICROS - ? dwio::common::TimestampPrecision::kMicros - : dwio::common::TimestampPrecision::kMillis; - timestampDecoder_ = - std::make_unique( - precisionUnit, - std::make_unique( - pageData_, encodedDataSize_), - false, - parquetTypeBytes(type_->parquetType_.value())); + ? TimestampPrecision::kMicros + : TimestampPrecision::kMillis; + timestampDecoder_ = std::make_unique( + precisionUnit, + std::make_unique( + pageData_, encodedDataSize_), + false, + parquetTypeBytes(type_->parquetType_.value())); } else { directDecoder_ = std::make_unique>( diff --git a/velox/dwio/parquet/reader/PageReader.h b/velox/dwio/parquet/reader/PageReader.h index c593bfc4e536b..3631e059411fa 100644 --- a/velox/dwio/parquet/reader/PageReader.h +++ b/velox/dwio/parquet/reader/PageReader.h @@ -20,13 +20,13 @@ #include "velox/dwio/common/BitConcatenation.h" #include "velox/dwio/common/DirectDecoder.h" #include "velox/dwio/common/SelectiveColumnReader.h" -#include "velox/dwio/common/TimestampDecoder.h" #include "velox/dwio/common/compression/Compression.h" #include "velox/dwio/parquet/reader/BooleanDecoder.h" #include "velox/dwio/parquet/reader/DeltaBpDecoder.h" #include "velox/dwio/parquet/reader/ParquetTypeWithId.h" #include "velox/dwio/parquet/reader/RleBpDataDecoder.h" #include "velox/dwio/parquet/reader/StringDecoder.h" +#include "velox/dwio/parquet/reader/TimestampDecoder.h" #include @@ -490,7 +490,7 @@ class PageReader { std::unique_ptr stringDecoder_; std::unique_ptr booleanDecoder_; 
std::unique_ptr deltaBpDecoder_; - std::unique_ptr timestampDecoder_; + std::unique_ptr timestampDecoder_; // Add decoders for other encodings here. }; diff --git a/velox/dwio/parquet/reader/ParquetReader.cpp b/velox/dwio/parquet/reader/ParquetReader.cpp index 7f8ee8058be97..6364a4712f7b3 100644 --- a/velox/dwio/parquet/reader/ParquetReader.cpp +++ b/velox/dwio/parquet/reader/ParquetReader.cpp @@ -397,7 +397,7 @@ std::unique_ptr ReaderBase::getParquetColumnInfo( int32_t precision = schemaElement.__isset.precision ? schemaElement.precision : 0; int32_t scale = schemaElement.__isset.scale ? schemaElement.scale : 0; - int32_t type_length = + int32_t typeLength = schemaElement.__isset.type_length ? schemaElement.type_length : 0; std::vector> children; std::optional logicalType_ = @@ -435,19 +435,19 @@ std::unique_ptr ReaderBase::getParquetColumnInfo( } auto leafTypePtr = std::make_unique( - veloxType, - std::move(children), - curSchemaIdx, - maxSchemaElementIdx, - columnIdx++, - name, - schemaElement.type, - logicalType_, - maxRepeat, - maxDefine, - precision, - scale, - type_length); + veloxType, + std::move(children), + curSchemaIdx, + maxSchemaElementIdx, + columnIdx++, + name, + schemaElement.type, + logicalType_, + maxRepeat, + maxDefine, + precision, + scale, + typeLength); if (schemaElement.repetition_type == thrift::FieldRepetitionType::REPEATED) { diff --git a/velox/dwio/parquet/reader/TimestampColumnReader.h b/velox/dwio/parquet/reader/TimestampColumnReader.h index 4c534b4bfcee1..61c1632bd86bd 100644 --- a/velox/dwio/parquet/reader/TimestampColumnReader.h +++ b/velox/dwio/parquet/reader/TimestampColumnReader.h @@ -39,7 +39,9 @@ class TimestampColumnReader : public IntegerColumnReader { RowSet rows, const uint64_t* /*incomingNulls*/) override { auto& data = formatData_->as(); - // Use int128_t as a workaroud. Timestamp in Velox is of 16-byte length. + // Use int128_t as a workaround. 
Timestamp type in Velox is comprised of an + // int64_t seconds_ field and a uint64_t nanos_ field, a total of 16-byte + // length prepareRead(offset, rows, nullptr); readCommon(rows); readOffset_ += rows.back() + 1; diff --git a/velox/dwio/common/TimestampDecoder.h b/velox/dwio/parquet/reader/TimestampDecoder.h similarity index 80% rename from velox/dwio/common/TimestampDecoder.h rename to velox/dwio/parquet/reader/TimestampDecoder.h index 2abf39bbc65a9..f2b355e5caf8f 100644 --- a/velox/dwio/common/TimestampDecoder.h +++ b/velox/dwio/parquet/reader/TimestampDecoder.h @@ -18,11 +18,11 @@ #include "velox/dwio/common/DirectDecoder.h" -namespace facebook::velox::dwio::common { +namespace facebook::velox::parquet { enum TimestampPrecision { kMillis, kMicros }; -class TimestampDecoder : public DirectDecoder { +class TimestampDecoder : public dwio::common::DirectDecoder { public: TimestampDecoder( TimestampPrecision precision, @@ -60,17 +60,21 @@ class TimestampDecoder : public DirectDecoder { } } } - if constexpr (std::is_same_v) { + { auto units = IntDecoder::template readInt(); - Timestamp timestamp = precision_ == TimestampPrecision::kMillis - ? util::fromUTCMillis(units) - : util::fromUTCMicros(units); + Timestamp timestamp; + if (precision_ == TimestampPrecision::kMillis) { + timestamp = Timestamp::fromMillis(units); + } else if (precision_ == TimestampPrecision::kMicros) { + timestamp = Timestamp::fromMicros(units); + } else { + VELOX_NYI( + "Unsupported timestamp unit. 
Only kMillis and kMicros supported."); + } + int128_t value; memcpy(&value, &timestamp, sizeof(int128_t)); toSkip = visitor.process(value, atEnd); - } else { - toSkip = visitor.process( - IntDecoder::template readInt(), atEnd); } skip: ++current; @@ -87,4 +91,4 @@ class TimestampDecoder : public DirectDecoder { private: TimestampPrecision precision_; }; -} // namespace facebook::velox::dwio::common +} // namespace facebook::velox::parquet diff --git a/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp b/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp index 55406b7b65392..b9e065db2cad0 100644 --- a/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp +++ b/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp @@ -443,9 +443,26 @@ TEST_F(ParquetTableScanTest, readAsLowerCase) { result.second, {makeRowVector({"a"}, {makeFlatVector({0, 1})})}); } -TEST_F(ParquetTableScanTest, timestampINT64) { - auto a = makeFlatVector( - 12, [](auto row) { return Timestamp(row % 4, 0); }); +TEST_F(ParquetTableScanTest, timestampINT64millis) { + std::vector rawData = { + Timestamp(0, 0), + Timestamp(0, 1000000), + Timestamp(-1, 999000000), + Timestamp(1, 0), + Timestamp(-1, 0), + Timestamp(1, 1000000), + Timestamp(-2, 999000000), + Timestamp(0, 999000000), + Timestamp(-1, 1000000), + Timestamp(1000, 0), + Timestamp(-1000, 0), + Timestamp(1000, 1000000), + Timestamp(-1001, 999000000), + Timestamp(99, 999000000), + Timestamp(-100, 1000000)}; + + auto a = + makeFlatVector(60, [&](auto row) { return rawData[row / 4]; }); auto expected = makeRowVector({"time"}, {a}); createDuckDbTable("expected", {expected}); @@ -471,6 +488,33 @@ TEST_F(ParquetTableScanTest, timestampINT64) { vector, })); assertSelect({"time"}, "SELECT time from expected"); +} + +TEST_F(ParquetTableScanTest, timestampINT64micros) { + std::vector rawData = { + Timestamp(0, 0), + Timestamp(0, 1000), + Timestamp(-1, 999999000), + Timestamp(0, 1000000), + Timestamp(-1, 999000000), + Timestamp(0, 1001000), + 
Timestamp(-1, 998999000), + Timestamp(0, 999000), + Timestamp(-1, 999001000), + Timestamp(1, 0), + Timestamp(-1, 0), + Timestamp(1, 1000), + Timestamp(-2, 999999000), + Timestamp(0, 99999000), + Timestamp(-1, 900001000)}; + + auto a = + makeFlatVector(60, [&](auto row) { return rawData[row / 4]; }); + + auto expected = makeRowVector({"time"}, {a}); + createDuckDbTable("expected", {expected}); + + auto vector = makeArrayVector({{}}); loadData( getExampleFilePath("int64_micros_dictionary.parquet"), diff --git a/velox/type/TimestampConversion.h b/velox/type/TimestampConversion.h index 9c81666a0ffd5..eb41d3d150577 100644 --- a/velox/type/TimestampConversion.h +++ b/velox/type/TimestampConversion.h @@ -203,12 +203,4 @@ inline auto fromTimestampWithTimezoneString(const StringView& str) { Timestamp fromDatetime(int64_t daysSinceEpoch, int64_t microsSinceMidnight); -inline Timestamp fromUTCMillis(int64_t millis) { - return Timestamp(millis / 1000, (millis % 1000) * 1000000); -} - -inline Timestamp fromUTCMicros(int64_t micros) { - return Timestamp(micros / 1000000, (micros % 1000000) * 1000); -} - } // namespace facebook::velox::util