From c26be68e431112a474968507a17d7987a4d90575 Mon Sep 17 00:00:00 2001 From: Kapil Singh Date: Fri, 8 Dec 2023 12:13:48 +0530 Subject: [PATCH] Add support to read plain encoded INT96 timestamp from Parquet file --- velox/dwio/common/IntDecoder.h | 17 +++ velox/dwio/parquet/reader/PageReader.cpp | 19 +-- .../parquet/reader/TimestampColumnReader.h | 12 +- ...uet => timestamp_int96_dictionary.parquet} | Bin .../examples/timestamp_int96_plain.parquet | Bin 0 -> 518 bytes .../parquet/tests/reader/E2EFilterTest.cpp | 14 ++ .../tests/reader/ParquetTableScanTest.cpp | 134 ++++++++++-------- velox/type/Filter.h | 5 +- 8 files changed, 121 insertions(+), 80 deletions(-) rename velox/dwio/parquet/tests/examples/{timestamp_int96.parquet => timestamp_int96_dictionary.parquet} (100%) create mode 100644 velox/dwio/parquet/tests/examples/timestamp_int96_plain.parquet diff --git a/velox/dwio/common/IntDecoder.h b/velox/dwio/common/IntDecoder.h index 016e8b1f8244d..913692b03eb90 100644 --- a/velox/dwio/common/IntDecoder.h +++ b/velox/dwio/common/IntDecoder.h @@ -150,6 +150,9 @@ class IntDecoder { template T readInt(); + // Reads Int96 timestamp composed of days and nanos as int128_t. + int128_t readInt96(); + template T readVInt(); @@ -438,12 +441,26 @@ inline T IntDecoder::readInt() { return readLittleEndianFromBigEndian(); } else { if constexpr (std::is_same_v) { + if (numBytes_ == 12) { + VELOX_DCHECK(!useVInts_, "Int96 should not be VInt encoded."); + return readInt96(); + } VELOX_NYI(); } return readLongLE(); } } +template +inline int128_t IntDecoder::readInt96() { + int128_t result = 0; + for (int i = 0; i < 12; ++i) { + auto ch = readByte(); + result |= static_cast(ch & BASE_256_MASK) << (i * 8); + } + return result; +} + template template inline T IntDecoder::readVInt() { diff --git a/velox/dwio/parquet/reader/PageReader.cpp b/velox/dwio/parquet/reader/PageReader.cpp index 71f0fce116226..78948fd69f57b 100644 --- a/velox/dwio/parquet/reader/PageReader.cpp +++ b/velox/dwio/parquet/reader/PageReader.cpp @@ -377,7 +377,7 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) { break; } case thrift::Type::INT96: { - auto numVeloxBytes = dictionary_.numValues * sizeof(Timestamp); + auto numVeloxBytes = dictionary_.numValues * sizeof(int128_t); dictionary_.values = AlignedBuffer::allocate(numVeloxBytes, &pool_); auto numBytes = dictionary_.numValues * sizeof(Int96Timestamp); if (pageData_) { @@ -392,23 +392,16 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) { } // Expand the Parquet type length values to Velox type length. // We start from the end to allow in-place expansion. - auto values = dictionary_.values->asMutable(); + auto values = dictionary_.values->asMutable(); auto parquetValues = dictionary_.values->asMutable(); for (auto i = dictionary_.numValues - 1; i >= 0; --i) { - // Convert the timestamp into seconds and nanos since the Unix epoch, - // 00:00:00.000000 on 1 January 1970. - int64_t nanos; + int128_t result = 0; memcpy( - &nanos, + &result, parquetValues + i * sizeof(Int96Timestamp), - sizeof(int64_t)); - int32_t days; - memcpy( - &days, - parquetValues + i * sizeof(Int96Timestamp) + sizeof(int64_t), - sizeof(int32_t)); - values[i] = Timestamp::fromDaysAndNanos(days, nanos); + sizeof(Int96Timestamp)); + values[i] = result; } break; } diff --git a/velox/dwio/parquet/reader/TimestampColumnReader.h b/velox/dwio/parquet/reader/TimestampColumnReader.h index 0e1045774535e..040b03e39964c 100644 --- a/velox/dwio/parquet/reader/TimestampColumnReader.h +++ b/velox/dwio/parquet/reader/TimestampColumnReader.h @@ -49,8 +49,14 @@ class TimestampColumnReader : public IntegerColumnReader { if (resultVector->isNullAt(i)) { continue; } - const auto timestamp = rawValues[i]; - uint64_t nanos = timestamp.getNanos(); + + // Convert int128_t to Timestamp by extracting days and nanos. + const int128_t encoded = reinterpret_cast(rawValues[i]); + const int32_t days = static_cast(encoded >> 64); + uint64_t nanos = encoded & ((((1ULL << 63) - 1ULL) << 1) + 1); + const auto timestamp = Timestamp::fromDaysAndNanos(days, nanos); + + nanos = timestamp.getNanos(); switch (timestampPrecision_) { case TimestampPrecision::kMilliseconds: nanos = nanos / 1'000'000 * 1'000'000; @@ -70,7 +76,7 @@ class TimestampColumnReader : public IntegerColumnReader { const RowSet& rows, const uint64_t* /*incomingNulls*/) override { auto& data = formatData_->as(); - // Use int128_t as a workaroud. Timestamp in Velox is of 16-byte length. + // Use int128_t as a workaround. Timestamp in Velox is of 16-byte length. prepareRead(offset, rows, nullptr); readCommon(rows); readOffset_ += rows.back() + 1; diff --git a/velox/dwio/parquet/tests/examples/timestamp_int96.parquet b/velox/dwio/parquet/tests/examples/timestamp_int96_dictionary.parquet similarity index 100% rename from velox/dwio/parquet/tests/examples/timestamp_int96.parquet rename to velox/dwio/parquet/tests/examples/timestamp_int96_dictionary.parquet diff --git a/velox/dwio/parquet/tests/examples/timestamp_int96_plain.parquet b/velox/dwio/parquet/tests/examples/timestamp_int96_plain.parquet new file mode 100644 index 0000000000000000000000000000000000000000..8666703ea732f65636c1d87e47894e205139320b GIT binary patch literal 518 zcmZWn&ubGw82xsjVYQ1O>@2&ofjF38VI8_VZPEr0t>z%rL-As?N||gXc5ydJcPAAI z>0J=ddJzQa&3Y6(dC~ttZ{k7mqJp4@3VxAz@-jT$_vXDfKQ`B|h7{-*t6z~`ijisv8KP6519h`(%? zKUe^#?<4L%`+C6u_;eYu_4E460APIw@q>q3zsmrRj5BRAct5uR(4N}-Y8^c}0q~-z zg6BIXJs=LoQ{n}etYRqhJKIIPqcTYe_Nx-t(BRmf@A);;rzLM0N~VV-C64VJ66b)_ z%=2|XFFX(003b`9>FU@u6X!M#1J@nox49fj^yYtK_@2s(WYBl#1Q$F6ci}%pv72&i zv^!Fo=YzXEQ(AVVmhO!))AxpoHCdtaQLI_OdWlNA1#51NSzl(;(UhI3bdsq;%WQ~L zf0U-OlcEB*s(}47WzF$qa%%$a#w_w&mRDvSDclose(); } + void testInt96TimestampRead(const std::string& fileName) { + // Timestamp-int96.parquet holds one column (t: TIMESTAMP) and + // 10 rows in one row group. Data is in SNAPPY compressed format. + // The values are: + // |t | + // +-------------------+ + // |2015-06-01 19:34:56| + // |2015-06-02 19:34:56| + // |2001-02-03 03:34:06| + // |1998-03-01 08:01:06| + // |2022-12-23 03:56:01| + // |1980-01-24 00:23:07| + // |1999-12-08 13:39:26| + // |2023-04-21 09:09:34| + // |2000-09-12 22:36:29| + // |2007-12-12 04:27:56| + // +-------------------+ + auto vector = makeFlatVector( + {Timestamp(1433187296, 0), + Timestamp(1433273696, 0), + Timestamp(981171246, 0), + Timestamp(888739266, 0), + Timestamp(1671767761, 0), + Timestamp(317521387, 0), + Timestamp(944660366, 0), + Timestamp(1682068174, 0), + Timestamp(968798189, 0), + Timestamp(1197433676, 0)}); + + loadData( + getExampleFilePath(fileName), + ROW({"t"}, {TIMESTAMP()}), + makeRowVector( + {"t"}, + { + vector, + })); + + assertSelectWithFilter({"t"}, {}, "", "SELECT t from tmp"); + assertSelectWithFilter( + {"t"}, + {}, + "t < TIMESTAMP '2000-09-12 22:36:29'", + "SELECT t from tmp where t < TIMESTAMP '2000-09-12 22:36:29'"); + assertSelectWithFilter( + {"t"}, + {}, + "t <= TIMESTAMP '2000-09-12 22:36:29'", + "SELECT t from tmp where t <= TIMESTAMP '2000-09-12 22:36:29'"); + assertSelectWithFilter( + {"t"}, + {}, + "t > TIMESTAMP '1980-01-24 00:23:07'", + "SELECT t from tmp where t > TIMESTAMP '1980-01-24 00:23:07'"); + assertSelectWithFilter( + {"t"}, + {}, + "t >= TIMESTAMP '1980-01-24 00:23:07'", + "SELECT t from tmp where t >= TIMESTAMP '1980-01-24 00:23:07'"); + assertSelectWithFilter( + {"t"}, + {}, + "t == TIMESTAMP '2022-12-23 03:56:01'", + "SELECT t from tmp where t == TIMESTAMP '2022-12-23 03:56:01'"); + } + private: RowTypePtr getRowType(std::vector&& outputColumnNames) const { std::vector types; @@ -753,70 +819,12 @@ TEST_F(ParquetTableScanTest, sessionTimezone) { assertSelectWithTimezone({"a"}, "SELECT a FROM tmp", "Asia/Shanghai"); } -TEST_F(ParquetTableScanTest, timestampFilter) { - // Timestamp-int96.parquet holds one column (t: TIMESTAMP) and - // 10 rows in one row group. Data is in SNAPPY compressed format. - // The values are: - // |t | - // +-------------------+ - // |2015-06-01 19:34:56| - // |2015-06-02 19:34:56| - // |2001-02-03 03:34:06| - // |1998-03-01 08:01:06| - // |2022-12-23 03:56:01| - // |1980-01-24 00:23:07| - // |1999-12-08 13:39:26| - // |2023-04-21 09:09:34| - // |2000-09-12 22:36:29| - // |2007-12-12 04:27:56| - // +-------------------+ - auto vector = makeFlatVector( - {Timestamp(1433187296, 0), - Timestamp(1433273696, 0), - Timestamp(981171246, 0), - Timestamp(888739266, 0), - Timestamp(1671767761, 0), - Timestamp(317521387, 0), - Timestamp(944660366, 0), - Timestamp(1682068174, 0), - Timestamp(968798189, 0), - Timestamp(1197433676, 0)}); - - loadData( - getExampleFilePath("timestamp_int96.parquet"), - ROW({"t"}, {TIMESTAMP()}), - makeRowVector( - {"t"}, - { - vector, - })); +TEST_F(ParquetTableScanTest, timestampInt96Dictionary) { + testInt96TimestampRead("timestamp_int96_dictionary.parquet"); +} - assertSelectWithFilter({"t"}, {}, "", "SELECT t from tmp"); - assertSelectWithFilter( - {"t"}, - {}, - "t < TIMESTAMP '2000-09-12 22:36:29'", - "SELECT t from tmp where t < TIMESTAMP '2000-09-12 22:36:29'"); - assertSelectWithFilter( - {"t"}, - {}, - "t <= TIMESTAMP '2000-09-12 22:36:29'", - "SELECT t from tmp where t <= TIMESTAMP '2000-09-12 22:36:29'"); - assertSelectWithFilter( - {"t"}, - {}, - "t > TIMESTAMP '1980-01-24 00:23:07'", - "SELECT t from tmp where t > TIMESTAMP '1980-01-24 00:23:07'"); - assertSelectWithFilter( - {"t"}, - {}, - "t >= TIMESTAMP '1980-01-24 00:23:07'", - "SELECT t from tmp where t >= TIMESTAMP '1980-01-24 00:23:07'"); - assertSelectWithFilter( - {"t"}, - {}, - "t == TIMESTAMP '2022-12-23 03:56:01'", - "SELECT t from tmp where t == TIMESTAMP '2022-12-23 03:56:01'"); +TEST_F(ParquetTableScanTest, timestampInt96Plain) { + testInt96TimestampRead("timestamp_int96_plain.parquet"); } TEST_F(ParquetTableScanTest, timestampPrecisionMicrosecond) { diff --git a/velox/type/Filter.h b/velox/type/Filter.h index 4a84992f71d04..89b95a31ec230 100644 --- a/velox/type/Filter.h +++ b/velox/type/Filter.h @@ -1825,7 +1825,10 @@ class TimestampRange final : public Filter { } bool testInt128(int128_t value) const final { - const auto& ts = reinterpret_cast(value); + // Convert int128_t to Timestamp by extracting days and nanos. + const int32_t days = static_cast(value >> 64); + const uint64_t nanos = value & ((((1ULL << 63) - 1ULL) << 1) + 1); + const auto ts = Timestamp::fromDaysAndNanos(days, nanos); return ts >= lower_ && ts <= upper_; }