diff --git a/velox/dwio/common/IntDecoder.h b/velox/dwio/common/IntDecoder.h index 016e8b1f8244d..913692b03eb90 100644 --- a/velox/dwio/common/IntDecoder.h +++ b/velox/dwio/common/IntDecoder.h @@ -150,6 +150,9 @@ class IntDecoder { template T readInt(); + // Reads Int96 timestamp composed of days and nanos as int128_t. + int128_t readInt96(); + template T readVInt(); @@ -438,12 +441,26 @@ inline T IntDecoder::readInt() { return readLittleEndianFromBigEndian(); } else { if constexpr (std::is_same_v) { + if (numBytes_ == 12) { + VELOX_DCHECK(!useVInts_, "Int96 should not be VInt encoded."); + return readInt96(); + } VELOX_NYI(); } return readLongLE(); } } +template +inline int128_t IntDecoder::readInt96() { + int128_t result = 0; + for (int i = 0; i < 12; ++i) { + auto ch = readByte(); + result |= static_cast(ch & BASE_256_MASK) << (i * 8); + } + return result; +} + template template inline T IntDecoder::readVInt() { diff --git a/velox/dwio/parquet/reader/PageReader.cpp b/velox/dwio/parquet/reader/PageReader.cpp index 71f0fce116226..78948fd69f57b 100644 --- a/velox/dwio/parquet/reader/PageReader.cpp +++ b/velox/dwio/parquet/reader/PageReader.cpp @@ -377,7 +377,7 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) { break; } case thrift::Type::INT96: { - auto numVeloxBytes = dictionary_.numValues * sizeof(Timestamp); + auto numVeloxBytes = dictionary_.numValues * sizeof(int128_t); dictionary_.values = AlignedBuffer::allocate(numVeloxBytes, &pool_); auto numBytes = dictionary_.numValues * sizeof(Int96Timestamp); if (pageData_) { @@ -392,23 +392,16 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) { } // Expand the Parquet type length values to Velox type length. // We start from the end to allow in-place expansion. - auto values = dictionary_.values->asMutable(); + auto values = dictionary_.values->asMutable(); auto parquetValues = dictionary_.values->asMutable(); for (auto i = dictionary_.numValues - 1; i >= 0; --i) { - // Convert the timestamp into seconds and nanos since the Unix epoch, - // 00:00:00.000000 on 1 January 1970. - int64_t nanos; + int128_t result = 0; memcpy( - &nanos, + &result, parquetValues + i * sizeof(Int96Timestamp), - sizeof(int64_t)); - int32_t days; - memcpy( - &days, - parquetValues + i * sizeof(Int96Timestamp) + sizeof(int64_t), - sizeof(int32_t)); - values[i] = Timestamp::fromDaysAndNanos(days, nanos); + sizeof(Int96Timestamp)); + values[i] = result; } break; } diff --git a/velox/dwio/parquet/reader/TimestampColumnReader.h b/velox/dwio/parquet/reader/TimestampColumnReader.h index 0e1045774535e..040b03e39964c 100644 --- a/velox/dwio/parquet/reader/TimestampColumnReader.h +++ b/velox/dwio/parquet/reader/TimestampColumnReader.h @@ -49,8 +49,14 @@ class TimestampColumnReader : public IntegerColumnReader { if (resultVector->isNullAt(i)) { continue; } - const auto timestamp = rawValues[i]; - uint64_t nanos = timestamp.getNanos(); + + // Convert int128_t to Timestamp by extracting days and nanos. + const int128_t encoded = reinterpret_cast(rawValues[i]); + const int32_t days = static_cast(encoded >> 64); + uint64_t nanos = encoded & ((((1ULL << 63) - 1ULL) << 1) + 1); + const auto timestamp = Timestamp::fromDaysAndNanos(days, nanos); + + nanos = timestamp.getNanos(); switch (timestampPrecision_) { case TimestampPrecision::kMilliseconds: nanos = nanos / 1'000'000 * 1'000'000; @@ -70,7 +76,7 @@ class TimestampColumnReader : public IntegerColumnReader { const RowSet& rows, const uint64_t* /*incomingNulls*/) override { auto& data = formatData_->as(); - // Use int128_t as a workaroud. Timestamp in Velox is of 16-byte length. + // Use int128_t as a workaround. Timestamp in Velox is of 16-byte length. prepareRead(offset, rows, nullptr); readCommon(rows); readOffset_ += rows.back() + 1; diff --git a/velox/dwio/parquet/tests/examples/timestamp_int96.parquet b/velox/dwio/parquet/tests/examples/timestamp_int96_dictionary.parquet similarity index 100% rename from velox/dwio/parquet/tests/examples/timestamp_int96.parquet rename to velox/dwio/parquet/tests/examples/timestamp_int96_dictionary.parquet diff --git a/velox/dwio/parquet/tests/examples/timestamp_int96_plain.parquet b/velox/dwio/parquet/tests/examples/timestamp_int96_plain.parquet new file mode 100644 index 0000000000000..8666703ea732f Binary files /dev/null and b/velox/dwio/parquet/tests/examples/timestamp_int96_plain.parquet differ diff --git a/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp b/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp index 85853ae350c0c..8450b01523b4e 100644 --- a/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp +++ b/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp @@ -256,6 +256,20 @@ TEST_F(E2EFilterTest, integerDictionary) { 20); } +TEST_F(E2EFilterTest, timestampDirect) { + options_.enableDictionary = false; + options_.dataPageSize = 4 * 1024; + options_.writeInt96AsTimestamp = true; + + testWithTypes( + "timestamp_val_0:timestamp," + "timestamp_val_1:timestamp", + [&]() {}, + true, + {"timestamp_val_0", "timestamp_val_1"}, + 20); +} + TEST_F(E2EFilterTest, timestampDictionary) { options_.dataPageSize = 4 * 1024; options_.writeInt96AsTimestamp = true; diff --git a/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp b/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp index 58f7bfdb80400..e910022a6329c 100644 --- a/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp +++ b/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp @@ -223,6 +223,72 @@ class ParquetTableScanTest : public HiveConnectorTestBase { writer->close(); } + void testInt96TimestampRead(const std::string& fileName) { + // Timestamp-int96.parquet holds one column (t: TIMESTAMP) and + // 10 rows in one row group. Data is in SNAPPY compressed format. + // The values are: + // |t | + // +-------------------+ + // |2015-06-01 19:34:56| + // |2015-06-02 19:34:56| + // |2001-02-03 03:34:06| + // |1998-03-01 08:01:06| + // |2022-12-23 03:56:01| + // |1980-01-24 00:23:07| + // |1999-12-08 13:39:26| + // |2023-04-21 09:09:34| + // |2000-09-12 22:36:29| + // |2007-12-12 04:27:56| + // +-------------------+ + auto vector = makeFlatVector( + {Timestamp(1433187296, 0), + Timestamp(1433273696, 0), + Timestamp(981171246, 0), + Timestamp(888739266, 0), + Timestamp(1671767761, 0), + Timestamp(317521387, 0), + Timestamp(944660366, 0), + Timestamp(1682068174, 0), + Timestamp(968798189, 0), + Timestamp(1197433676, 0)}); + + loadData( + getExampleFilePath(fileName), + ROW({"t"}, {TIMESTAMP()}), + makeRowVector( + {"t"}, + { + vector, + })); + + assertSelectWithFilter({"t"}, {}, "", "SELECT t from tmp"); + assertSelectWithFilter( + {"t"}, + {}, + "t < TIMESTAMP '2000-09-12 22:36:29'", + "SELECT t from tmp where t < TIMESTAMP '2000-09-12 22:36:29'"); + assertSelectWithFilter( + {"t"}, + {}, + "t <= TIMESTAMP '2000-09-12 22:36:29'", + "SELECT t from tmp where t <= TIMESTAMP '2000-09-12 22:36:29'"); + assertSelectWithFilter( + {"t"}, + {}, + "t > TIMESTAMP '1980-01-24 00:23:07'", + "SELECT t from tmp where t > TIMESTAMP '1980-01-24 00:23:07'"); + assertSelectWithFilter( + {"t"}, + {}, + "t >= TIMESTAMP '1980-01-24 00:23:07'", + "SELECT t from tmp where t >= TIMESTAMP '1980-01-24 00:23:07'"); + assertSelectWithFilter( + {"t"}, + {}, + "t == TIMESTAMP '2022-12-23 03:56:01'", + "SELECT t from tmp where t == TIMESTAMP '2022-12-23 03:56:01'"); + } + private: RowTypePtr getRowType(std::vector&& outputColumnNames) const { std::vector types; @@ -753,70 +819,12 @@ TEST_F(ParquetTableScanTest, sessionTimezone) { assertSelectWithTimezone({"a"}, "SELECT a FROM tmp", "Asia/Shanghai"); } -TEST_F(ParquetTableScanTest, timestampFilter) { - // Timestamp-int96.parquet holds one column (t: TIMESTAMP) and - // 10 rows in one row group. Data is in SNAPPY compressed format. - // The values are: - // |t | - // +-------------------+ - // |2015-06-01 19:34:56| - // |2015-06-02 19:34:56| - // |2001-02-03 03:34:06| - // |1998-03-01 08:01:06| - // |2022-12-23 03:56:01| - // |1980-01-24 00:23:07| - // |1999-12-08 13:39:26| - // |2023-04-21 09:09:34| - // |2000-09-12 22:36:29| - // |2007-12-12 04:27:56| - // +-------------------+ - auto vector = makeFlatVector( - {Timestamp(1433187296, 0), - Timestamp(1433273696, 0), - Timestamp(981171246, 0), - Timestamp(888739266, 0), - Timestamp(1671767761, 0), - Timestamp(317521387, 0), - Timestamp(944660366, 0), - Timestamp(1682068174, 0), - Timestamp(968798189, 0), - Timestamp(1197433676, 0)}); - - loadData( - getExampleFilePath("timestamp_int96.parquet"), - ROW({"t"}, {TIMESTAMP()}), - makeRowVector( - {"t"}, - { - vector, - })); +TEST_F(ParquetTableScanTest, timestampInt96Dictionary) { + testInt96TimestampRead("timestamp_int96_dictionary.parquet"); +} - assertSelectWithFilter({"t"}, {}, "", "SELECT t from tmp"); - assertSelectWithFilter( - {"t"}, - {}, - "t < TIMESTAMP '2000-09-12 22:36:29'", - "SELECT t from tmp where t < TIMESTAMP '2000-09-12 22:36:29'"); - assertSelectWithFilter( - {"t"}, - {}, - "t <= TIMESTAMP '2000-09-12 22:36:29'", - "SELECT t from tmp where t <= TIMESTAMP '2000-09-12 22:36:29'"); - assertSelectWithFilter( - {"t"}, - {}, - "t > TIMESTAMP '1980-01-24 00:23:07'", - "SELECT t from tmp where t > TIMESTAMP '1980-01-24 00:23:07'"); - assertSelectWithFilter( - {"t"}, - {}, - "t >= TIMESTAMP '1980-01-24 00:23:07'", - "SELECT t from tmp where t >= TIMESTAMP '1980-01-24 00:23:07'"); - assertSelectWithFilter( - {"t"}, - {}, - "t == TIMESTAMP '2022-12-23 03:56:01'", - "SELECT t from tmp where t == TIMESTAMP '2022-12-23 03:56:01'"); +TEST_F(ParquetTableScanTest, timestampInt96Plain) { + testInt96TimestampRead("timestamp_int96_plain.parquet"); } TEST_F(ParquetTableScanTest, timestampPrecisionMicrosecond) { diff --git a/velox/type/Filter.h b/velox/type/Filter.h index 4a84992f71d04..89b95a31ec230 100644 --- a/velox/type/Filter.h +++ b/velox/type/Filter.h @@ -1825,7 +1825,10 @@ class TimestampRange final : public Filter { } bool testInt128(int128_t value) const final { - const auto& ts = reinterpret_cast(value); + // Convert int128_t to Timestamp by extracting days and nanos. + const int32_t days = static_cast(value >> 64); + const uint64_t nanos = value & ((((1ULL << 63) - 1ULL) << 1) + 1); + const auto ts = Timestamp::fromDaysAndNanos(days, nanos); return ts >= lower_ && ts <= upper_; }