diff --git a/velox/dwio/parquet/reader/TimestampColumnReader.h b/velox/dwio/parquet/reader/TimestampColumnReader.h index 040b03e39964..7f5670e21fb3 100644 --- a/velox/dwio/parquet/reader/TimestampColumnReader.h +++ b/velox/dwio/parquet/reader/TimestampColumnReader.h @@ -20,6 +20,31 @@ #include "velox/dwio/parquet/reader/ParquetColumnReader.h" namespace facebook::velox::parquet { +namespace { + +// Range filter for Parquet Int96 Timestamp. +class ParquetInt96TimestampRange : public common::TimestampRange { + public: + // @param lower Lower end of the range, inclusive. + // @param upper Upper end of the range, inclusive. + // @param nullAllowed Null values are passing the filter if true. + ParquetInt96TimestampRange( + const Timestamp& lower, + const Timestamp& upper, + bool nullAllowed) + : TimestampRange(lower, upper, nullAllowed) {} + + // Int96 is read as int128_t value and converted to Timestamp by extracting + // days and nanos. + bool testInt128(int128_t value) const final override { + const int32_t days = static_cast(value >> 64); + const uint64_t nanos = value & ((((1ULL << 63) - 1ULL) << 1) + 1); + const auto ts = Timestamp::fromDaysAndNanos(days, nanos); + return ts >= this->lower() && ts <= this->upper(); + } +}; + +} // namespace class TimestampColumnReader : public IntegerColumnReader { public: @@ -71,6 +96,123 @@ class TimestampColumnReader : public IntegerColumnReader { } } + template < + typename Reader, + typename TFilter, + bool isDense, + typename ExtractValues> + void readHelperTimestamp( + velox::common::Filter* filter, + const RowSet& rows, + ExtractValues extractValues) { + if constexpr (std::is_same_v) { + // Convert TimestampRange to ParquetInt96TimestampRange. + auto* range = reinterpret_cast(filter); + ParquetInt96TimestampRange newRange = ParquetInt96TimestampRange( + range->lower(), range->upper(), range->nullAllowed()); + reinterpret_cast(this)->Reader::readWithVisitor( + rows, + dwio::common::ColumnVisitor< + int128_t, + ParquetInt96TimestampRange, + ExtractValues, + isDense>(newRange, this, rows, extractValues)); + } else { + reinterpret_cast(this)->Reader::readWithVisitor( + rows, + dwio::common:: + ColumnVisitor( + *reinterpret_cast(filter), + this, + rows, + extractValues)); + } + return; + } + + template < + typename Reader, + bool isDense, + bool kEncodingHasNulls, + typename ExtractValues> + void processTimestampFilter( + velox::common::Filter* filter, + ExtractValues extractValues, + const RowSet& rows) { + if (filter == nullptr) { + readHelperTimestamp( + &dwio::common::alwaysTrue(), rows, extractValues); + return; + } + + switch (filter->kind()) { + case velox::common::FilterKind::kAlwaysTrue: + readHelperTimestamp( + filter, rows, extractValues); + break; + case velox::common::FilterKind::kIsNull: + if constexpr (kEncodingHasNulls) { + filterNulls( + rows, + true, + !std:: + is_same_v); + } else { + readHelperTimestamp( + filter, rows, extractValues); + } + break; + case velox::common::FilterKind::kIsNotNull: + if constexpr ( + kEncodingHasNulls && + std::is_same_v) { + filterNulls(rows, false, false); + } else { + readHelperTimestamp( + filter, rows, extractValues); + } + break; + case velox::common::FilterKind::kTimestampRange: + readHelperTimestamp( + filter, rows, extractValues); + break; + default: + VELOX_UNREACHABLE(); + } + } + + template + void readTimestamp(const RowSet& rows) { + const bool isDense = rows.back() == rows.size() - 1; + velox::common::Filter* filter = + scanSpec_->filter() ? scanSpec_->filter() : &dwio::common::alwaysTrue(); + if (scanSpec_->keepValues()) { + if (scanSpec_->valueHook()) { + if (isDense) { + processValueHook(rows, scanSpec_->valueHook()); + } else { + processValueHook(rows, scanSpec_->valueHook()); + } + } else { + if (isDense) { + processTimestampFilter( + filter, dwio::common::ExtractToReader(this), rows); + } else { + processTimestampFilter( + filter, dwio::common::ExtractToReader(this), rows); + } + } + } else { + if (isDense) { + processTimestampFilter( + filter, dwio::common::DropValues(), rows); + } else { + processTimestampFilter( + filter, dwio::common::DropValues(), rows); + } + } + } + void read( int64_t offset, const RowSet& rows, @@ -78,7 +220,7 @@ class TimestampColumnReader : public IntegerColumnReader { auto& data = formatData_->as(); // Use int128_t as a workaround. Timestamp in Velox is of 16-byte length. prepareRead(offset, rows, nullptr); - readCommon(rows); + readTimestamp(rows); readOffset_ += rows.back() + 1; } diff --git a/velox/dwio/parquet/tests/examples/timestamp_int96_dictionary.parquet b/velox/dwio/parquet/tests/examples/timestamp_int96_dictionary.parquet deleted file mode 100644 index ea3a125aab60..000000000000 Binary files a/velox/dwio/parquet/tests/examples/timestamp_int96_dictionary.parquet and /dev/null differ diff --git a/velox/dwio/parquet/tests/examples/timestamp_int96_plain.parquet b/velox/dwio/parquet/tests/examples/timestamp_int96_plain.parquet deleted file mode 100644 index 8666703ea732..000000000000 Binary files a/velox/dwio/parquet/tests/examples/timestamp_int96_plain.parquet and /dev/null differ diff --git a/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp b/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp index e910022a6329..69d249232447 100644 --- a/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp +++ b/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp @@ -197,16 +197,12 @@ class ParquetTableScanTest : public HiveConnectorTestBase { } // Write data to a parquet file on specified path. - // @param writeInt96AsTimestamp Write timestamp as Int96 if enabled. void writeToParquetFile( const std::string& path, const std::vector& data, - bool writeInt96AsTimestamp) { + WriterOptions options) { VELOX_CHECK_GT(data.size(), 0); - WriterOptions options; - options.writeInt96AsTimestamp = writeInt96AsTimestamp; - auto writeFile = std::make_unique(path, true, false); auto sink = std::make_unique( std::move(writeFile), path); @@ -223,43 +219,43 @@ class ParquetTableScanTest : public HiveConnectorTestBase { writer->close(); } - void testInt96TimestampRead(const std::string& fileName) { - // Timestamp-int96.parquet holds one column (t: TIMESTAMP) and - // 10 rows in one row group. Data is in SNAPPY compressed format. - // The values are: - // |t | - // +-------------------+ - // |2015-06-01 19:34:56| - // |2015-06-02 19:34:56| - // |2001-02-03 03:34:06| - // |1998-03-01 08:01:06| - // |2022-12-23 03:56:01| - // |1980-01-24 00:23:07| - // |1999-12-08 13:39:26| - // |2023-04-21 09:09:34| - // |2000-09-12 22:36:29| - // |2007-12-12 04:27:56| - // +-------------------+ - auto vector = makeFlatVector( - {Timestamp(1433187296, 0), - Timestamp(1433273696, 0), - Timestamp(981171246, 0), - Timestamp(888739266, 0), - Timestamp(1671767761, 0), - Timestamp(317521387, 0), - Timestamp(944660366, 0), - Timestamp(1682068174, 0), - Timestamp(968798189, 0), - Timestamp(1197433676, 0)}); - - loadData( - getExampleFilePath(fileName), - ROW({"t"}, {TIMESTAMP()}), - makeRowVector( - {"t"}, - { - vector, - })); + void testInt96TimestampRead(const WriterOptions& options) { + auto stringToTimestamp = [](std::string_view view) { + return util::fromTimestampString( + view.data(), + view.size(), + util::TimestampParseMode::kPrestoCast) + .thenOrThrow(folly::identity, [&](const Status& status) { + VELOX_USER_FAIL("{}", status.message()); + }); + }; + std::vector views = { + "2015-06-01 19:34:56", + "2015-06-02 19:34:56", + "2001-02-03 03:34:06", + "1998-03-01 08:01:06", + "2022-12-23 03:56:01", + "1980-01-24 00:23:07", + "1999-12-08 13:39:26", + "2023-04-21 09:09:34", + "2000-09-12 22:36:29", + "2007-12-12 04:27:56", + }; + std::vector values; + values.reserve(views.size()); + for (auto view : views) { + values.emplace_back(stringToTimestamp(view)); + } + + auto vector = makeRowVector( + {"t"}, + { + makeFlatVector(values), + }); + auto schema = asRowType(vector->type()); + auto file = TempFilePath::create(); + writeToParquetFile(file->getPath(), {vector}, options); + loadData(file->getPath(), schema, vector); assertSelectWithFilter({"t"}, {}, "", "SELECT t from tmp"); assertSelectWithFilter( @@ -820,11 +816,17 @@ TEST_F(ParquetTableScanTest, sessionTimezone) { } TEST_F(ParquetTableScanTest, timestampInt96Dictionary) { - testInt96TimestampRead("timestamp_int96_dictionary.parquet"); + WriterOptions options; + options.writeInt96AsTimestamp = true; + options.enableDictionary = true; + testInt96TimestampRead(options); } TEST_F(ParquetTableScanTest, timestampInt96Plain) { - testInt96TimestampRead("timestamp_int96_plain.parquet"); + WriterOptions options; + options.writeInt96AsTimestamp = true; + options.enableDictionary = false; + testInt96TimestampRead(options); } TEST_F(ParquetTableScanTest, timestampPrecisionMicrosecond) { @@ -836,7 +838,9 @@ TEST_F(ParquetTableScanTest, timestampPrecisionMicrosecond) { }); auto schema = asRowType(vector->type()); auto file = TempFilePath::create(); - writeToParquetFile(file->getPath(), {vector}, true); + WriterOptions options; + options.writeInt96AsTimestamp = true; + writeToParquetFile(file->getPath(), {vector}, options); auto plan = PlanBuilder().tableScan(schema).planNode(); // Read timestamp data from parquet with microsecond precision. @@ -932,7 +936,9 @@ TEST_F(ParquetTableScanTest, readParquetColumnsByIndexMixedType) { const std::shared_ptr dataFileFolder = exec::test::TempDirectoryPath::create(); auto filePath = dataFileFolder->getPath() + "/" + "nested_data.parquet"; - writeToParquetFile(filePath, {dataFileVectors}, false); + WriterOptions options; + options.writeInt96AsTimestamp = false; + writeToParquetFile(filePath, {dataFileVectors}, options); // Create a row type with columns having different names than in the file. auto structType = ROW({"aa1", "bb1"}, {BIGINT(), BIGINT()}); @@ -1044,7 +1050,9 @@ TEST_F(ParquetTableScanTest, readParquetColumnsByIndex) { const std::shared_ptr dataFileFolder = exec::test::TempDirectoryPath::create(); auto filePath = dataFileFolder->getPath() + "/" + "data.parquet"; - writeToParquetFile(filePath, {dataFileVectors}, false); + WriterOptions options; + options.writeInt96AsTimestamp = false; + writeToParquetFile(filePath, {dataFileVectors}, options); auto rowType = ROW({"c2", "c3"}, {BIGINT(), BIGINT()}); diff --git a/velox/type/Filter.h b/velox/type/Filter.h index 89b95a31ec23..d1fc73e00b9c 100644 --- a/velox/type/Filter.h +++ b/velox/type/Filter.h @@ -1788,7 +1788,7 @@ class NegatedBytesRange final : public Filter { /// Open ranges can be implemented by using the value to the left /// or right of the end of the range, e.g. a < timestamp '2023-07-19 /// 17:00:00.777' is equivalent to a <= timestamp '2023-07-19 17:00:00.776'. -class TimestampRange final : public Filter { +class TimestampRange : public Filter { public: /// @param lower Lower end of the range, inclusive. /// @param upper Upper end of the range, inclusive. @@ -1824,14 +1824,6 @@ class TimestampRange final : public Filter { nullAllowed_ ? "with nulls" : "no nulls"); } - bool testInt128(int128_t value) const final { - // Convert int128_t to Timestamp by extracting days and nanos. - const int32_t days = static_cast(value >> 64); - const uint64_t nanos = value & ((((1ULL << 63) - 1ULL) << 1) + 1); - const auto ts = Timestamp::fromDaysAndNanos(days, nanos); - return ts >= lower_ && ts <= upper_; - } - bool testTimestamp(Timestamp value) const override { return value >= lower_ && value <= upper_; } @@ -1859,6 +1851,10 @@ class TimestampRange final : public Filter { return upper_; } + const bool nullAllowed() const { + return nullAllowed_; + } + bool testingEquals(const Filter& other) const final; private: