diff --git a/velox/dwio/common/IntDecoder.h b/velox/dwio/common/IntDecoder.h index 016e8b1f8244..913692b03eb9 100644 --- a/velox/dwio/common/IntDecoder.h +++ b/velox/dwio/common/IntDecoder.h @@ -150,6 +150,9 @@ class IntDecoder { template T readInt(); + // Reads Int96 timestamp composed of days and nanos as int128_t. + int128_t readInt96(); + template T readVInt(); @@ -438,12 +441,26 @@ inline T IntDecoder::readInt() { return readLittleEndianFromBigEndian(); } else { if constexpr (std::is_same_v) { + if (numBytes_ == 12) { + VELOX_DCHECK(!useVInts_, "Int96 should not be VInt encoded."); + return readInt96(); + } VELOX_NYI(); } return readLongLE(); } } +template +inline int128_t IntDecoder::readInt96() { + int128_t result = 0; + for (int i = 0; i < 12; ++i) { + auto ch = readByte(); + result |= static_cast(ch & BASE_256_MASK) << (i * 8); + } + return result; +} + template template inline T IntDecoder::readVInt() { diff --git a/velox/dwio/common/SelectiveIntegerColumnReader.h b/velox/dwio/common/SelectiveIntegerColumnReader.h index 2716f800bba8..c94b5a60d789 100644 --- a/velox/dwio/common/SelectiveIntegerColumnReader.h +++ b/velox/dwio/common/SelectiveIntegerColumnReader.h @@ -127,23 +127,26 @@ void SelectiveIntegerColumnReader::processFilter( ExtractValues extractValues, const RowSet& rows) { if (filter == nullptr) { - readHelper( - &dwio::common::alwaysTrue(), rows, extractValues); + static_cast(this) + ->template readHelper( + &dwio::common::alwaysTrue(), rows, extractValues); return; } switch (filter->kind()) { case velox::common::FilterKind::kAlwaysTrue: - readHelper( - filter, rows, extractValues); + static_cast(this) + ->template readHelper( + filter, rows, extractValues); break; case velox::common::FilterKind::kIsNull: if constexpr (kEncodingHasNulls) { filterNulls( rows, true, !std::is_same_v); } else { - readHelper( - filter, rows, extractValues); + static_cast(this) + ->template readHelper( + filter, rows, extractValues); } break; case velox::common::FilterKind::kIsNotNull: @@ -152,41 +155,55 @@ void SelectiveIntegerColumnReader::processFilter( std::is_same_v) { filterNulls(rows, false, false); } else { - readHelper( - filter, rows, extractValues); + static_cast(this) + ->template readHelper( + filter, rows, extractValues); } break; case velox::common::FilterKind::kBigintRange: - readHelper( - filter, rows, extractValues); + static_cast(this) + ->template readHelper( + filter, rows, extractValues); break; case velox::common::FilterKind::kNegatedBigintRange: - readHelper( - filter, rows, extractValues); + static_cast(this) + ->template readHelper< + Reader, + velox::common::NegatedBigintRange, + isDense>(filter, rows, extractValues); break; case velox::common::FilterKind::kBigintValuesUsingHashTable: - readHelper( - filter, rows, extractValues); + static_cast(this) + ->template readHelper< + Reader, + velox::common::BigintValuesUsingHashTable, + isDense>(filter, rows, extractValues); break; case velox::common::FilterKind::kBigintValuesUsingBitmask: - readHelper( - filter, rows, extractValues); + static_cast(this) + ->template readHelper< + Reader, + velox::common::BigintValuesUsingBitmask, + isDense>(filter, rows, extractValues); break; case velox::common::FilterKind::kNegatedBigintValuesUsingHashTable: - readHelper< - Reader, - velox::common::NegatedBigintValuesUsingHashTable, - isDense>(filter, rows, extractValues); + static_cast(this) + ->template readHelper< + Reader, + velox::common::NegatedBigintValuesUsingHashTable, + isDense>(filter, rows, extractValues); break; case velox::common::FilterKind::kNegatedBigintValuesUsingBitmask: - readHelper< - Reader, - velox::common::NegatedBigintValuesUsingBitmask, - isDense>(filter, rows, extractValues); + static_cast(this) + ->template readHelper< + Reader, + velox::common::NegatedBigintValuesUsingBitmask, + isDense>(filter, rows, extractValues); break; default: - readHelper( - filter, rows, extractValues); + static_cast(this) + ->template readHelper( + filter, rows, extractValues); break; } } diff --git a/velox/dwio/parquet/reader/PageReader.cpp b/velox/dwio/parquet/reader/PageReader.cpp index 71f0fce11622..78948fd69f57 100644 --- a/velox/dwio/parquet/reader/PageReader.cpp +++ b/velox/dwio/parquet/reader/PageReader.cpp @@ -377,7 +377,7 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) { break; } case thrift::Type::INT96: { - auto numVeloxBytes = dictionary_.numValues * sizeof(Timestamp); + auto numVeloxBytes = dictionary_.numValues * sizeof(int128_t); dictionary_.values = AlignedBuffer::allocate(numVeloxBytes, &pool_); auto numBytes = dictionary_.numValues * sizeof(Int96Timestamp); if (pageData_) { @@ -392,23 +392,16 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) { } // Expand the Parquet type length values to Velox type length. // We start from the end to allow in-place expansion. - auto values = dictionary_.values->asMutable(); + auto values = dictionary_.values->asMutable(); auto parquetValues = dictionary_.values->asMutable(); for (auto i = dictionary_.numValues - 1; i >= 0; --i) { - // Convert the timestamp into seconds and nanos since the Unix epoch, - // 00:00:00.000000 on 1 January 1970. - int64_t nanos; + int128_t result = 0; memcpy( - &nanos, + &result, parquetValues + i * sizeof(Int96Timestamp), - sizeof(int64_t)); - int32_t days; - memcpy( - &days, - parquetValues + i * sizeof(Int96Timestamp) + sizeof(int64_t), - sizeof(int32_t)); - values[i] = Timestamp::fromDaysAndNanos(days, nanos); + sizeof(Int96Timestamp)); + values[i] = result; } break; } diff --git a/velox/dwio/parquet/reader/TimestampColumnReader.h b/velox/dwio/parquet/reader/TimestampColumnReader.h index 0e1045774535..308743ecea4f 100644 --- a/velox/dwio/parquet/reader/TimestampColumnReader.h +++ b/velox/dwio/parquet/reader/TimestampColumnReader.h @@ -20,6 +20,31 @@ #include "velox/dwio/parquet/reader/ParquetColumnReader.h" namespace facebook::velox::parquet { +namespace { + +// Range filter for Parquet Int96 Timestamp. +class ParquetInt96TimestampRange : public common::TimestampRange { + public: + // @param lower Lower end of the range, inclusive. + // @param upper Upper end of the range, inclusive. + // @param nullAllowed Null values are passing the filter if true. + ParquetInt96TimestampRange( + const Timestamp& lower, + const Timestamp& upper, + bool nullAllowed) + : TimestampRange(lower, upper, nullAllowed) {} + + // Int96 is read as int128_t value and converted to Timestamp by extracting + // days and nanos. + bool testInt128(int128_t value) const final override { + const int32_t days = static_cast(value >> 64); + const uint64_t nanos = value & ((((1ULL << 63) - 1ULL) << 1) + 1); + const auto ts = Timestamp::fromDaysAndNanos(days, nanos); + return ts >= this->lower() && ts <= this->upper(); + } +}; + +} // namespace class TimestampColumnReader : public IntegerColumnReader { public: @@ -49,8 +74,14 @@ class TimestampColumnReader : public IntegerColumnReader { if (resultVector->isNullAt(i)) { continue; } - const auto timestamp = rawValues[i]; - uint64_t nanos = timestamp.getNanos(); + + // Convert int128_t to Timestamp by extracting days and nanos. + const int128_t encoded = reinterpret_cast(rawValues[i]); + const int32_t days = static_cast(encoded >> 64); + uint64_t nanos = encoded & ((((1ULL << 63) - 1ULL) << 1) + 1); + const auto timestamp = Timestamp::fromDaysAndNanos(days, nanos); + + nanos = timestamp.getNanos(); switch (timestampPrecision_) { case TimestampPrecision::kMilliseconds: nanos = nanos / 1'000'000 * 1'000'000; @@ -65,14 +96,47 @@ class TimestampColumnReader : public IntegerColumnReader { } } + template < + typename Reader, + typename TFilter, + bool isDense, + typename ExtractValues> + void readHelper( + velox::common::Filter* filter, + const RowSet& rows, + ExtractValues extractValues) { + if (auto* range = dynamic_cast(filter)) { + // Convert TimestampRange to ParquetInt96TimestampRange. + ParquetInt96TimestampRange newRange = ParquetInt96TimestampRange( + range->lower(), range->upper(), range->nullAllowed()); + this->readWithVisitor( + rows, + dwio::common::ColumnVisitor< + int128_t, + ParquetInt96TimestampRange, + ExtractValues, + isDense>(newRange, this, rows, extractValues)); + } else { + this->readWithVisitor( + rows, + dwio::common:: + ColumnVisitor( + *reinterpret_cast(filter), + this, + rows, + extractValues)); + } + return; + } + void read( int64_t offset, const RowSet& rows, const uint64_t* /*incomingNulls*/) override { auto& data = formatData_->as(); - // Use int128_t as a workaroud. Timestamp in Velox is of 16-byte length. + // Use int128_t as a workaround. Timestamp in Velox is of 16-byte length. prepareRead(offset, rows, nullptr); - readCommon(rows); + readCommon(rows); readOffset_ += rows.back() + 1; } diff --git a/velox/dwio/parquet/tests/examples/timestamp_int96.parquet b/velox/dwio/parquet/tests/examples/timestamp_int96.parquet deleted file mode 100644 index ea3a125aab60..000000000000 Binary files a/velox/dwio/parquet/tests/examples/timestamp_int96.parquet and /dev/null differ diff --git a/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp b/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp index 85853ae350c0..8450b01523b4 100644 --- a/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp +++ b/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp @@ -256,6 +256,20 @@ TEST_F(E2EFilterTest, integerDictionary) { 20); } +TEST_F(E2EFilterTest, timestampDirect) { + options_.enableDictionary = false; + options_.dataPageSize = 4 * 1024; + options_.writeInt96AsTimestamp = true; + + testWithTypes( + "timestamp_val_0:timestamp," + "timestamp_val_1:timestamp", + [&]() {}, + true, + {"timestamp_val_0", "timestamp_val_1"}, + 20); +} + TEST_F(E2EFilterTest, timestampDictionary) { options_.dataPageSize = 4 * 1024; options_.writeInt96AsTimestamp = true; diff --git a/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp b/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp index 58f7bfdb8040..69d249232447 100644 --- a/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp +++ b/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp @@ -197,16 +197,12 @@ class ParquetTableScanTest : public HiveConnectorTestBase { } // Write data to a parquet file on specified path. - // @param writeInt96AsTimestamp Write timestamp as Int96 if enabled. void writeToParquetFile( const std::string& path, const std::vector& data, - bool writeInt96AsTimestamp) { + WriterOptions options) { VELOX_CHECK_GT(data.size(), 0); - WriterOptions options; - options.writeInt96AsTimestamp = writeInt96AsTimestamp; - auto writeFile = std::make_unique(path, true, false); auto sink = std::make_unique( std::move(writeFile), path); @@ -223,6 +219,72 @@ class ParquetTableScanTest : public HiveConnectorTestBase { writer->close(); } + void testInt96TimestampRead(const WriterOptions& options) { + auto stringToTimestamp = [](std::string_view view) { + return util::fromTimestampString( + view.data(), + view.size(), + util::TimestampParseMode::kPrestoCast) + .thenOrThrow(folly::identity, [&](const Status& status) { + VELOX_USER_FAIL("{}", status.message()); + }); + }; + std::vector views = { + "2015-06-01 19:34:56", + "2015-06-02 19:34:56", + "2001-02-03 03:34:06", + "1998-03-01 08:01:06", + "2022-12-23 03:56:01", + "1980-01-24 00:23:07", + "1999-12-08 13:39:26", + "2023-04-21 09:09:34", + "2000-09-12 22:36:29", + "2007-12-12 04:27:56", + }; + std::vector values; + values.reserve(views.size()); + for (auto view : views) { + values.emplace_back(stringToTimestamp(view)); + } + + auto vector = makeRowVector( + {"t"}, + { + makeFlatVector(values), + }); + auto schema = asRowType(vector->type()); + auto file = TempFilePath::create(); + writeToParquetFile(file->getPath(), {vector}, options); + loadData(file->getPath(), schema, vector); + + assertSelectWithFilter({"t"}, {}, "", "SELECT t from tmp"); + assertSelectWithFilter( + {"t"}, + {}, + "t < TIMESTAMP '2000-09-12 22:36:29'", + "SELECT t from tmp where t < TIMESTAMP '2000-09-12 22:36:29'"); + assertSelectWithFilter( + {"t"}, + {}, + "t <= TIMESTAMP '2000-09-12 22:36:29'", + "SELECT t from tmp where t <= TIMESTAMP '2000-09-12 22:36:29'"); + assertSelectWithFilter( + {"t"}, + {}, + "t > TIMESTAMP '1980-01-24 00:23:07'", + "SELECT t from tmp where t > TIMESTAMP '1980-01-24 00:23:07'"); + assertSelectWithFilter( + {"t"}, + {}, + "t >= TIMESTAMP '1980-01-24 00:23:07'", + "SELECT t from tmp where t >= TIMESTAMP '1980-01-24 00:23:07'"); + assertSelectWithFilter( + {"t"}, + {}, + "t == TIMESTAMP '2022-12-23 03:56:01'", + "SELECT t from tmp where t == TIMESTAMP '2022-12-23 03:56:01'"); + } + private: RowTypePtr getRowType(std::vector&& outputColumnNames) const { std::vector types; @@ -753,70 +815,18 @@ TEST_F(ParquetTableScanTest, sessionTimezone) { assertSelectWithTimezone({"a"}, "SELECT a FROM tmp", "Asia/Shanghai"); } -TEST_F(ParquetTableScanTest, timestampFilter) { - // Timestamp-int96.parquet holds one column (t: TIMESTAMP) and - // 10 rows in one row group. Data is in SNAPPY compressed format. - // The values are: - // |t | - // +-------------------+ - // |2015-06-01 19:34:56| - // |2015-06-02 19:34:56| - // |2001-02-03 03:34:06| - // |1998-03-01 08:01:06| - // |2022-12-23 03:56:01| - // |1980-01-24 00:23:07| - // |1999-12-08 13:39:26| - // |2023-04-21 09:09:34| - // |2000-09-12 22:36:29| - // |2007-12-12 04:27:56| - // +-------------------+ - auto vector = makeFlatVector( - {Timestamp(1433187296, 0), - Timestamp(1433273696, 0), - Timestamp(981171246, 0), - Timestamp(888739266, 0), - Timestamp(1671767761, 0), - Timestamp(317521387, 0), - Timestamp(944660366, 0), - Timestamp(1682068174, 0), - Timestamp(968798189, 0), - Timestamp(1197433676, 0)}); - - loadData( - getExampleFilePath("timestamp_int96.parquet"), - ROW({"t"}, {TIMESTAMP()}), - makeRowVector( - {"t"}, - { - vector, - })); +TEST_F(ParquetTableScanTest, timestampInt96Dictionary) { + WriterOptions options; + options.writeInt96AsTimestamp = true; + options.enableDictionary = true; + testInt96TimestampRead(options); +} - assertSelectWithFilter({"t"}, {}, "", "SELECT t from tmp"); - assertSelectWithFilter( - {"t"}, - {}, - "t < TIMESTAMP '2000-09-12 22:36:29'", - "SELECT t from tmp where t < TIMESTAMP '2000-09-12 22:36:29'"); - assertSelectWithFilter( - {"t"}, - {}, - "t <= TIMESTAMP '2000-09-12 22:36:29'", - "SELECT t from tmp where t <= TIMESTAMP '2000-09-12 22:36:29'"); - assertSelectWithFilter( - {"t"}, - {}, - "t > TIMESTAMP '1980-01-24 00:23:07'", - "SELECT t from tmp where t > TIMESTAMP '1980-01-24 00:23:07'"); - assertSelectWithFilter( - {"t"}, - {}, - "t >= TIMESTAMP '1980-01-24 00:23:07'", - "SELECT t from tmp where t >= TIMESTAMP '1980-01-24 00:23:07'"); - assertSelectWithFilter( - {"t"}, - {}, - "t == TIMESTAMP '2022-12-23 03:56:01'", - "SELECT t from tmp where t == TIMESTAMP '2022-12-23 03:56:01'"); +TEST_F(ParquetTableScanTest, timestampInt96Plain) { + WriterOptions options; + options.writeInt96AsTimestamp = true; + options.enableDictionary = false; + testInt96TimestampRead(options); } TEST_F(ParquetTableScanTest, timestampPrecisionMicrosecond) { @@ -828,7 +838,9 @@ TEST_F(ParquetTableScanTest, timestampPrecisionMicrosecond) { }); auto schema = asRowType(vector->type()); auto file = TempFilePath::create(); - writeToParquetFile(file->getPath(), {vector}, true); + WriterOptions options; + options.writeInt96AsTimestamp = true; + writeToParquetFile(file->getPath(), {vector}, options); auto plan = PlanBuilder().tableScan(schema).planNode(); // Read timestamp data from parquet with microsecond precision. @@ -924,7 +936,9 @@ TEST_F(ParquetTableScanTest, readParquetColumnsByIndexMixedType) { const std::shared_ptr dataFileFolder = exec::test::TempDirectoryPath::create(); auto filePath = dataFileFolder->getPath() + "/" + "nested_data.parquet"; - writeToParquetFile(filePath, {dataFileVectors}, false); + WriterOptions options; + options.writeInt96AsTimestamp = false; + writeToParquetFile(filePath, {dataFileVectors}, options); // Create a row type with columns having different names than in the file. auto structType = ROW({"aa1", "bb1"}, {BIGINT(), BIGINT()}); @@ -1036,7 +1050,9 @@ TEST_F(ParquetTableScanTest, readParquetColumnsByIndex) { const std::shared_ptr dataFileFolder = exec::test::TempDirectoryPath::create(); auto filePath = dataFileFolder->getPath() + "/" + "data.parquet"; - writeToParquetFile(filePath, {dataFileVectors}, false); + WriterOptions options; + options.writeInt96AsTimestamp = false; + writeToParquetFile(filePath, {dataFileVectors}, options); auto rowType = ROW({"c2", "c3"}, {BIGINT(), BIGINT()}); diff --git a/velox/type/Filter.h b/velox/type/Filter.h index 4a84992f71d0..d1fc73e00b9c 100644 --- a/velox/type/Filter.h +++ b/velox/type/Filter.h @@ -1788,7 +1788,7 @@ class NegatedBytesRange final : public Filter { /// Open ranges can be implemented by using the value to the left /// or right of the end of the range, e.g. a < timestamp '2023-07-19 /// 17:00:00.777' is equivalent to a <= timestamp '2023-07-19 17:00:00.776'. -class TimestampRange final : public Filter { +class TimestampRange : public Filter { public: /// @param lower Lower end of the range, inclusive. /// @param upper Upper end of the range, inclusive. @@ -1824,11 +1824,6 @@ class TimestampRange final : public Filter { nullAllowed_ ? "with nulls" : "no nulls"); } - bool testInt128(int128_t value) const final { - const auto& ts = reinterpret_cast(value); - return ts >= lower_ && ts <= upper_; - } - bool testTimestamp(Timestamp value) const override { return value >= lower_ && value <= upper_; } @@ -1856,6 +1851,10 @@ class TimestampRange final : public Filter { return upper_; } + const bool nullAllowed() const { + return nullAllowed_; + } + bool testingEquals(const Filter& other) const final; private: