diff --git a/velox/dwio/common/tests/utils/DataSetBuilder.cpp b/velox/dwio/common/tests/utils/DataSetBuilder.cpp index 07b6b245003f..d42d8033677e 100644 --- a/velox/dwio/common/tests/utils/DataSetBuilder.cpp +++ b/velox/dwio/common/tests/utils/DataSetBuilder.cpp @@ -316,6 +316,36 @@ DataSetBuilder& DataSetBuilder::makeMapStringValues( return *this; } +void DataSetBuilder::adjustTimestampToPrecision( + VectorPtr batch, + TimestampPrecision precision) { + auto type = batch->type(); + + if (type->kind() == TypeKind::TIMESTAMP) { + auto rawValues = + batch->asUnchecked>()->mutableRawValues(); + for (auto i = 0; i < batch->size(); ++i) { + if (batch->isNullAt(i)) { + continue; + } + + rawValues[i].toPrecision(precision); + } + } else if (type->kind() == TypeKind::ROW) { + for (auto& child : batch->as()->children()) { + adjustTimestampToPrecision(child, precision); + } + } +} + +DataSetBuilder& DataSetBuilder::adjustTimestampToPrecision( + TimestampPrecision precision) { + for (auto& batch : *batches_) { + adjustTimestampToPrecision(batch, precision); + } + return *this; +} + std::unique_ptr> DataSetBuilder::build() { return std::move(batches_); } diff --git a/velox/dwio/common/tests/utils/DataSetBuilder.h b/velox/dwio/common/tests/utils/DataSetBuilder.h index 4893c28336f6..0ffb7e017b32 100644 --- a/velox/dwio/common/tests/utils/DataSetBuilder.h +++ b/velox/dwio/common/tests/utils/DataSetBuilder.h @@ -50,6 +50,11 @@ class DataSetBuilder { // groups. Tests skipping row groups based on row group stats. DataSetBuilder& withRowGroupSpecificData(int32_t numRowsPerGroup); + DataSetBuilder& adjustTimestampToPrecision(TimestampPrecision precision); + void adjustTimestampToPrecision( + VectorPtr batch, + TimestampPrecision precision); + // Makes all data in 'batches_' after firstRow non-null. This finds a sampling // of non-null values from each column and replaces nulls in the column in // question with one of these. A column where only nulls are found in sampling diff --git a/velox/dwio/parquet/reader/ParquetColumnReader.cpp b/velox/dwio/parquet/reader/ParquetColumnReader.cpp index 0266ac568a5a..641c7147da25 100644 --- a/velox/dwio/parquet/reader/ParquetColumnReader.cpp +++ b/velox/dwio/parquet/reader/ParquetColumnReader.cpp @@ -81,10 +81,10 @@ std::unique_ptr ParquetColumnReader::build( if (logicalTypeOpt.has_value() && logicalTypeOpt.value().__isset.TIMESTAMP && parquetFileType.parquetType_ == thrift::Type::INT64) { - return std::make_unique( + return std::make_unique( requestedType, fileType, params, scanSpec); } else { - return std::make_unique( + return std::make_unique( requestedType, fileType, params, scanSpec); } } diff --git a/velox/dwio/parquet/reader/ParquetReader.cpp b/velox/dwio/parquet/reader/ParquetReader.cpp index 98cc38650e58..77f44a985896 100644 --- a/velox/dwio/parquet/reader/ParquetReader.cpp +++ b/velox/dwio/parquet/reader/ParquetReader.cpp @@ -105,6 +105,11 @@ class ReaderBase { TypePtr convertType( const thrift::SchemaElement& schemaElement, const TypePtr& requestedType) const; + + thrift::LogicalType getTimestampLogicalType( + thrift::ConvertedType::type type) const; + + TypePtr convertType(const thrift::SchemaElement& schemaElement) const; template static std::shared_ptr createRowType( @@ -554,38 +559,14 @@ std::unique_ptr ReaderBase::getParquetColumnInfo( int32_t typeLength = schemaElement.__isset.type_length ? schemaElement.type_length : 0; std::vector> children; - std::optional logicalType_ = + std::optional logicalType = schemaElement.__isset.logicalType ? std::optional(schemaElement.logicalType) : std::nullopt; if (veloxType->kind() == TypeKind::TIMESTAMP && - schemaElement.type == thrift::Type::INT64 && - !logicalType_.has_value()) { - // Construct logical type from deprecated converted type of Parquet. - thrift::TimestampType timestamp; - timestamp.__set_isAdjustedToUTC(true); - thrift::TimeUnit unit; - - if (schemaElement.converted_type == - thrift::ConvertedType::TIMESTAMP_MICROS) { - thrift::MicroSeconds micros; - unit.__set_MICROS(micros); - } else if ( - schemaElement.converted_type == - thrift::ConvertedType::TIMESTAMP_MILLIS) { - thrift::MilliSeconds millis; - unit.__set_MILLIS(millis); - } else { - VELOX_NYI( - "{} Timestamp unit not supported.", schemaElement.converted_type); - } - - timestamp.__set_unit(unit); - thrift::LogicalType newLogicalType; - newLogicalType.__set_TIMESTAMP(timestamp); - - logicalType_ = std::optional(newLogicalType); + schemaElement.type == thrift::Type::INT64 && !logicalType.has_value()) { + logicalType = getTimestampLogicalType(schemaElement.converted_type); } auto leafTypePtr = std::make_unique( @@ -596,7 +577,7 @@ std::unique_ptr ReaderBase::getParquetColumnInfo( columnIdx++, name, schemaElement.type, - logicalType_, + logicalType, maxRepeat, maxDefine, isOptional, @@ -632,6 +613,29 @@ std::unique_ptr ReaderBase::getParquetColumnInfo( return nullptr; } +thrift::LogicalType ReaderBase::getTimestampLogicalType( + thrift::ConvertedType::type convertedType) const { + thrift::TimestampType timestamp; + timestamp.__set_isAdjustedToUTC(true); + thrift::TimeUnit unit; + + if (convertedType == thrift::ConvertedType::TIMESTAMP_MICROS) { + thrift::MicroSeconds micros; + unit.__set_MICROS(micros); + } else if (convertedType == thrift::ConvertedType::TIMESTAMP_MILLIS) { + thrift::MilliSeconds millis; + unit.__set_MILLIS(millis); + } else { + VELOX_NYI("{} Timestamp unit not supported.", convertedType); + } + + timestamp.__set_unit(unit); + thrift::LogicalType logicalType; + logicalType.__set_TIMESTAMP(timestamp); + + return logicalType; +} + TypePtr ReaderBase::convertType( const thrift::SchemaElement& schemaElement, const TypePtr& requestedType) const { diff --git a/velox/dwio/parquet/reader/TimestampColumnReader.h b/velox/dwio/parquet/reader/TimestampColumnReader.h index 58f36ea73d35..80c7cf215b75 100644 --- a/velox/dwio/parquet/reader/TimestampColumnReader.h +++ b/velox/dwio/parquet/reader/TimestampColumnReader.h @@ -21,9 +21,9 @@ namespace facebook::velox::parquet { -class TimestampINT96ColumnReader : public IntegerColumnReader { +class TimestampInt96ColumnReader : public IntegerColumnReader { public: - TimestampINT96ColumnReader( + TimestampInt96ColumnReader( const TypePtr& requestedType, std::shared_ptr fileType, ParquetParams& params, @@ -49,19 +49,7 @@ class TimestampINT96ColumnReader : public IntegerColumnReader { if (resultVector->isNullAt(i)) { continue; } - const auto timestamp = rawValues[i]; - uint64_t nanos = timestamp.getNanos(); - switch (timestampPrecision_) { - case TimestampPrecision::kMilliseconds: - nanos = nanos / 1'000'000 * 1'000'000; - break; - case TimestampPrecision::kMicroseconds: - nanos = nanos / 1'000 * 1'000; - break; - case TimestampPrecision::kNanoseconds: - break; - } - rawValues[i] = Timestamp(timestamp.getSeconds(), nanos); + rawValues[i].toPrecision(timestampPrecision_); } } @@ -82,9 +70,9 @@ class TimestampINT96ColumnReader : public IntegerColumnReader { TimestampPrecision timestampPrecision_; }; -class TimestampINT64ColumnReader : public IntegerColumnReader { +class TimestampInt64ColumnReader : public IntegerColumnReader { public: - TimestampINT64ColumnReader( + TimestampInt64ColumnReader( const TypePtr& requestedType, std::shared_ptr fileType, ParquetParams& params, @@ -112,7 +100,7 @@ class TimestampINT64ColumnReader : public IntegerColumnReader { } bool hasBulkPath() const override { - return false; + return true; } void @@ -180,42 +168,23 @@ class TimestampINT64ColumnReader : public IntegerColumnReader { getFlatValues(rows, result, requestedType_); } - Timestamp adjustToPrecision( - const Timestamp& timestamp, - TimestampPrecision precision) { - auto nano = timestamp.getNanos(); - switch (precision) { - case TimestampPrecision::kMilliseconds: - nano = nano / 1'000'000 * 1'000'000; - break; - case TimestampPrecision::kMicroseconds: - nano = nano / 1'000 * 1'000; - break; - case TimestampPrecision::kNanoseconds: - break; - } - - return Timestamp(timestamp.getSeconds(), nano); - } - - void read( - vector_size_t offset, - RowSet rows, - const uint64_t* /*incomingNulls*/) override { - auto& data = formatData_->as(); - prepareRead(offset, rows, nullptr); - - // Remove filter so that we can do filtering here once data is represented - // as timestamp type - const auto filter = scanSpec_->filter()->clone(); - scanSpec_->setFilter(nullptr); - - readCommon(rows); + template + void readHelper(common::Filter* filter, RowSet rows) { + dwio::common::ExtractToReader extractValues(this); + common::AlwaysTrue alwaysTrue; + dwio::common::ColumnVisitor< + int64_t, + common::AlwaysTrue, + decltype(extractValues), + isDense> + visitor(alwaysTrue, this, rows, extractValues); + readWithVisitor(rows, visitor); auto tsValues = AlignedBuffer::allocate(numValues_, &memoryPool_); auto rawTs = tsValues->asMutable(); auto rawTsInt64 = values_->asMutable(); + const auto rawNulls = resultNulls() ? resultNulls()->as() : nullptr; @@ -227,12 +196,14 @@ class TimestampINT64ColumnReader : public IntegerColumnReader { } else { timestamp = Timestamp::fromMillis(rawTsInt64[i]); } - - rawTs[i] = adjustToPrecision(timestamp, timestampPrecision_); + rawTs[i] = timestamp; + rawTs[i].toPrecision(timestampPrecision_); } } + values_ = tsValues; rawValues_ = values_->asMutable(); + outputRows_.clear(); switch ( !filter || @@ -253,11 +224,26 @@ class TimestampINT64ColumnReader : public IntegerColumnReader { break; case common::FilterKind::kTimestampRange: case common::FilterKind::kMultiRange: - processFilter(filter.get(), rows, rawNulls); + processFilter(filter, rows, rawNulls); break; default: VELOX_UNSUPPORTED("Unsupported filter."); } + } + + void read( + vector_size_t offset, + RowSet rows, + const uint64_t* /*incomingNulls*/) override { + auto& data = formatData_->as(); + prepareRead(offset, rows, nullptr); + + bool isDense = rows.back() == rows.size() - 1; + if (isDense) { + readHelper(scanSpec_->filter(), rows); + } else { + readHelper(scanSpec_->filter(), rows); + } readOffset_ += rows.back() + 1; } diff --git a/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp b/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp index 0b27395f9a00..d248f1de6294 100644 --- a/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp +++ b/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp @@ -269,6 +269,82 @@ TEST_F(E2EFilterTest, timestampDictionary) { 20); } +TEST_F(E2EFilterTest, timestampINT64MillisDictionary) { + options_.enableDictionary = true; + options_.dataPageSize = 4 * 1024; + options_.parquetWriteTimestampUnit = + static_cast(TimestampUnit::kMilli); + options_.parquetWriteTimestampTimeZone = "utc"; + + testWithTypes( + "timestamp_val_0:timestamp," + "timestamp_val_1:timestamp", + [&]() { + dataSetBuilder_->adjustTimestampToPrecision( + TimestampPrecision::kMilliseconds); + }, + true, + {"timestamp_val_0", "timestamp_val_1"}, + 1); +} + +TEST_F(E2EFilterTest, timestampINT64MillisPlain) { + options_.enableDictionary = false; + options_.dataPageSize = 4 * 1024; + options_.parquetWriteTimestampUnit = + static_cast(TimestampUnit::kMilli); + options_.parquetWriteTimestampTimeZone = "utc"; + + testWithTypes( + "timestamp_val_0:timestamp," + "timestamp_val_1:timestamp", + [&]() { + dataSetBuilder_->adjustTimestampToPrecision( + TimestampPrecision::kMilliseconds); + }, + true, + {"timestamp_val_0", "timestamp_val_1"}, + 1); +} + +TEST_F(E2EFilterTest, timestampINT64MicrosDictionary) { + options_.enableDictionary = true; + options_.dataPageSize = 4 * 1024; + options_.parquetWriteTimestampUnit = + static_cast(TimestampUnit::kMicro); + options_.parquetWriteTimestampTimeZone = "utc"; + + testWithTypes( + "timestamp_val_0:timestamp," + "timestamp_val_1:timestamp", + [&]() { + dataSetBuilder_->adjustTimestampToPrecision( + TimestampPrecision::kMicroseconds); + }, + true, + {"timestamp_val_0", "timestamp_val_1"}, + 1); +} + +TEST_F(E2EFilterTest, timestampINT64MicrosPlain) { + options_.enableDictionary = false; + options_.dataPageSize = 4 * 1024; + options_.parquetWriteTimestampUnit = + static_cast(TimestampUnit::kMicro); + options_.parquetWriteTimestampTimeZone = "utc"; + + testWithTypes( + "timestamp_val_0:timestamp," + "timestamp_val_1:timestamp", + [&]() { + dataSetBuilder_->adjustTimestampToPrecision( + TimestampPrecision::kMicroseconds); + }, + true, + {"timestamp_val_0", "timestamp_val_1"}, + 1); +} + TEST_F(E2EFilterTest, floatAndDoubleDirect) { options_.enableDictionary = false; options_.dataPageSize = 4 * 1024; diff --git a/velox/type/Timestamp.h b/velox/type/Timestamp.h index 44117bc7886b..57cec6ca5f3b 100644 --- a/velox/type/Timestamp.h +++ b/velox/type/Timestamp.h @@ -195,6 +195,19 @@ struct Timestamp { } } + void toPrecision(const TimestampPrecision& precision) { + switch (precision) { + case TimestampPrecision::kMilliseconds: + nanos_ = nanos_ / 1'000'000 * 1'000'000; + break; + case TimestampPrecision::kMicroseconds: + nanos_ = nanos_ / 1'000 * 1'000; + break; + case TimestampPrecision::kNanoseconds: + break; + } + } + /// Exports the current timestamp as a std::chrono::time_point of millisecond /// precision. Note that the conversion may overflow since the internal /// `seconds_` value will need to be multiplied by 1000.