diff --git a/cpp/src/arrow/util/hashing.h b/cpp/src/arrow/util/hashing.h index 4ead1a7283d81..52525a83aa2ea 100644 --- a/cpp/src/arrow/util/hashing.h +++ b/cpp/src/arrow/util/hashing.h @@ -843,6 +843,14 @@ class BinaryMemoTable : public MemoTable { } } + // Visit the stored value at a specific index in insertion order. + // The visitor function should have the signature `void(std::string_view)` + // or `void(const std::string_view&)`. + template + void VisitValue(int32_t idx, VisitFunc&& visit) const { + visit(binary_builder_.GetView(idx)); + } + protected: struct Payload { int32_t memo_index; diff --git a/cpp/src/parquet/CMakeLists.txt b/cpp/src/parquet/CMakeLists.txt index 9c28b749e4319..0a9f92cebbbc4 100644 --- a/cpp/src/parquet/CMakeLists.txt +++ b/cpp/src/parquet/CMakeLists.txt @@ -181,6 +181,7 @@ set(PARQUET_SRCS printer.cc properties.cc schema.cc + size_statistics.cc statistics.cc stream_reader.cc stream_writer.cc @@ -373,6 +374,7 @@ add_parquet_test(internals-test metadata_test.cc page_index_test.cc public_api_test.cc + size_statistics_test.cc types_test.cc) set_source_files_properties(public_api_test.cc PROPERTIES SKIP_PRECOMPILE_HEADERS ON diff --git a/cpp/src/parquet/column_page.h b/cpp/src/parquet/column_page.h index b389ffd98e6c7..111265a842ee7 100644 --- a/cpp/src/parquet/column_page.h +++ b/cpp/src/parquet/column_page.h @@ -26,6 +26,7 @@ #include #include +#include "parquet/size_statistics.h" #include "parquet/statistics.h" #include "parquet/types.h" @@ -69,20 +70,22 @@ class DataPage : public Page { /// Currently it is only present from data pages created by ColumnWriter in order /// to collect page index. 
std::optional first_row_index() const { return first_row_index_; } + const SizeStatistics& size_statistics() const { return size_statistics_; } virtual ~DataPage() = default; protected: DataPage(PageType::type type, const std::shared_ptr& buffer, int32_t num_values, Encoding::type encoding, int64_t uncompressed_size, - EncodedStatistics statistics = EncodedStatistics(), - std::optional first_row_index = std::nullopt) + EncodedStatistics statistics, std::optional first_row_index, + SizeStatistics size_statistics) : Page(buffer, type), num_values_(num_values), encoding_(encoding), uncompressed_size_(uncompressed_size), statistics_(std::move(statistics)), - first_row_index_(std::move(first_row_index)) {} + first_row_index_(std::move(first_row_index)), + size_statistics_(std::move(size_statistics)) {} int32_t num_values_; Encoding::type encoding_; @@ -90,6 +93,7 @@ class DataPage : public Page { EncodedStatistics statistics_; /// Row ordinal within the row group to the first row in the data page. 
std::optional first_row_index_; + SizeStatistics size_statistics_; }; class DataPageV1 : public DataPage { @@ -98,9 +102,11 @@ class DataPageV1 : public DataPage { Encoding::type encoding, Encoding::type definition_level_encoding, Encoding::type repetition_level_encoding, int64_t uncompressed_size, EncodedStatistics statistics = EncodedStatistics(), - std::optional first_row_index = std::nullopt) + std::optional first_row_index = std::nullopt, + SizeStatistics size_statistics = SizeStatistics()) : DataPage(PageType::DATA_PAGE, buffer, num_values, encoding, uncompressed_size, - std::move(statistics), std::move(first_row_index)), + std::move(statistics), std::move(first_row_index), + std::move(size_statistics)), definition_level_encoding_(definition_level_encoding), repetition_level_encoding_(repetition_level_encoding) {} @@ -120,9 +126,11 @@ class DataPageV2 : public DataPage { int32_t definition_levels_byte_length, int32_t repetition_levels_byte_length, int64_t uncompressed_size, bool is_compressed = false, EncodedStatistics statistics = EncodedStatistics(), - std::optional first_row_index = std::nullopt) + std::optional first_row_index = std::nullopt, + SizeStatistics size_statistics = SizeStatistics()) : DataPage(PageType::DATA_PAGE_V2, buffer, num_values, encoding, uncompressed_size, - std::move(statistics), std::move(first_row_index)), + std::move(statistics), std::move(first_row_index), + std::move(size_statistics)), num_nulls_(num_nulls), num_rows_(num_rows), definition_levels_byte_length_(definition_levels_byte_length), diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index d3e0fdfa811c0..12cbcf20affa4 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -55,6 +55,7 @@ #include "parquet/platform.h" #include "parquet/properties.h" #include "parquet/schema.h" +#include "parquet/size_statistics.h" #include "parquet/statistics.h" #include "parquet/thrift_internal.h" #include "parquet/types.h" @@ 
-437,7 +438,7 @@ class SerializedPageWriter : public PageWriter { /// Collect page index if (column_index_builder_ != nullptr) { - column_index_builder_->AddPage(page.statistics()); + column_index_builder_->AddPage(page.statistics(), page.size_statistics()); } if (offset_index_builder_ != nullptr) { const int64_t compressed_size = output_data_len + header_size; @@ -451,8 +452,9 @@ class SerializedPageWriter : public PageWriter { /// start_pos is a relative offset in the buffered mode. It should be /// adjusted via OffsetIndexBuilder::Finish() after BufferedPageWriter /// has flushed all data pages. - offset_index_builder_->AddPage(start_pos, static_cast(compressed_size), - *page.first_row_index()); + offset_index_builder_->AddPage( + start_pos, static_cast(compressed_size), *page.first_row_index(), + page.size_statistics().unencoded_byte_array_data_bytes); } total_uncompressed_size_ += uncompressed_size + header_size; @@ -774,11 +776,17 @@ class ColumnWriterImpl { // Serializes Dictionary Page if enabled virtual void WriteDictionaryPage() = 0; + // A convenience struct to combine the encoded statistics and size statistics + struct StatisticsPair { + EncodedStatistics encoded_stats; + SizeStatistics size_stats; + }; + // Plain-encoded statistics of the current page - virtual EncodedStatistics GetPageStatistics() = 0; + virtual StatisticsPair GetPageStatistics() = 0; // Plain-encoded statistics of the whole chunk - virtual EncodedStatistics GetChunkStatistics() = 0; + virtual StatisticsPair GetChunkStatistics() = 0; // Merges page statistics into chunk statistics, then resets the values virtual void ResetPageStatistics() = 0; @@ -981,8 +989,7 @@ void ColumnWriterImpl::BuildDataPageV1(int64_t definition_levels_rle_size, PARQUET_THROW_NOT_OK(uncompressed_data_->Resize(uncompressed_size, false)); ConcatenateBuffers(definition_levels_rle_size, repetition_levels_rle_size, values, uncompressed_data_->mutable_data()); - - EncodedStatistics page_stats = GetPageStatistics(); 
+ auto [page_stats, page_size_stats] = GetPageStatistics(); page_stats.ApplyStatSizeLimits(properties_->max_statistics_size(descr_->path())); page_stats.set_is_signed(SortOrder::SIGNED == descr_->sort_order()); ResetPageStatistics(); @@ -1006,13 +1013,15 @@ void ColumnWriterImpl::BuildDataPageV1(int64_t definition_levels_rle_size, compressed_data->CopySlice(0, compressed_data->size(), allocator_)); std::unique_ptr page_ptr = std::make_unique( compressed_data_copy, num_values, encoding_, Encoding::RLE, Encoding::RLE, - uncompressed_size, std::move(page_stats), first_row_index); + uncompressed_size, std::move(page_stats), first_row_index, + std::move(page_size_stats)); total_compressed_bytes_ += page_ptr->size() + sizeof(format::PageHeader); data_pages_.push_back(std::move(page_ptr)); } else { // Eagerly write pages DataPageV1 page(compressed_data, num_values, encoding_, Encoding::RLE, Encoding::RLE, - uncompressed_size, std::move(page_stats), first_row_index); + uncompressed_size, std::move(page_stats), first_row_index, + std::move(page_size_stats)); WriteDataPage(page); } } @@ -1039,7 +1048,7 @@ void ColumnWriterImpl::BuildDataPageV2(int64_t definition_levels_rle_size, ConcatenateBuffers(definition_levels_rle_size, repetition_levels_rle_size, compressed_values, combined->mutable_data()); - EncodedStatistics page_stats = GetPageStatistics(); + auto [page_stats, page_size_stats] = GetPageStatistics(); page_stats.ApplyStatSizeLimits(properties_->max_statistics_size(descr_->path())); page_stats.set_is_signed(SortOrder::SIGNED == descr_->sort_order()); ResetPageStatistics(); @@ -1062,14 +1071,15 @@ void ColumnWriterImpl::BuildDataPageV2(int64_t definition_levels_rle_size, combined->CopySlice(0, combined->size(), allocator_)); std::unique_ptr page_ptr = std::make_unique( combined, num_values, null_count, num_rows, encoding_, def_levels_byte_length, - rep_levels_byte_length, uncompressed_size, pager_->has_compressor(), page_stats, - first_row_index); + 
rep_levels_byte_length, uncompressed_size, pager_->has_compressor(), + std::move(page_stats), first_row_index, std::move(page_size_stats)); total_compressed_bytes_ += page_ptr->size() + sizeof(format::PageHeader); data_pages_.push_back(std::move(page_ptr)); } else { DataPageV2 page(combined, num_values, null_count, num_rows, encoding_, def_levels_byte_length, rep_levels_byte_length, uncompressed_size, - pager_->has_compressor(), page_stats, first_row_index); + pager_->has_compressor(), std::move(page_stats), first_row_index, + std::move(page_size_stats)); WriteDataPage(page); } } @@ -1083,7 +1093,7 @@ int64_t ColumnWriterImpl::Close() { FlushBufferedDataPages(); - EncodedStatistics chunk_statistics = GetChunkStatistics(); + auto [chunk_statistics, chunk_size_statistics] = GetChunkStatistics(); chunk_statistics.ApplyStatSizeLimits( properties_->max_statistics_size(descr_->path())); chunk_statistics.set_is_signed(SortOrder::SIGNED == descr_->sort_order()); @@ -1092,6 +1102,9 @@ int64_t ColumnWriterImpl::Close() { if (rows_written_ > 0 && chunk_statistics.is_set()) { metadata_->SetStatistics(chunk_statistics); } + if (rows_written_ > 0 && chunk_size_statistics.is_set()) { + metadata_->SetSizeStatistics(chunk_size_statistics); + } metadata_->SetKeyValueMetadata(key_value_metadata_); pager_->Close(has_dictionary_, fallback_); } @@ -1217,6 +1230,11 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< page_statistics_ = MakeStatistics(descr_, allocator_); chunk_statistics_ = MakeStatistics(descr_, allocator_); } + if (properties->size_statistics_level() == SizeStatisticsLevel::ColumnChunk || + properties->size_statistics_level() == SizeStatisticsLevel::PageAndColumnChunk) { + page_size_statistics_ = SizeStatistics::Make(descr_); + chunk_size_statistics_ = SizeStatistics::Make(descr_); + } pages_change_on_record_boundaries_ = properties->data_page_version() == ParquetDataPageVersion::V2 || properties->page_index_enabled(descr_->path()); @@ 
-1355,15 +1373,26 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< total_bytes_written_ += pager_->WriteDictionaryPage(page); } - EncodedStatistics GetPageStatistics() override { - EncodedStatistics result; - if (page_statistics_) result = page_statistics_->Encode(); + StatisticsPair GetPageStatistics() override { + StatisticsPair result; + if (page_statistics_) { + result.encoded_stats = page_statistics_->Encode(); + } + if (properties_->size_statistics_level() == SizeStatisticsLevel::PageAndColumnChunk) { + ARROW_DCHECK(page_size_statistics_ != nullptr); + result.size_stats = *page_size_statistics_; + } return result; } - EncodedStatistics GetChunkStatistics() override { - EncodedStatistics result; - if (chunk_statistics_) result = chunk_statistics_->Encode(); + StatisticsPair GetChunkStatistics() override { + StatisticsPair result; + if (chunk_statistics_) { + result.encoded_stats = chunk_statistics_->Encode(); + } + if (chunk_size_statistics_) { + result.size_stats = *chunk_size_statistics_; + } return result; } @@ -1372,6 +1401,10 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< chunk_statistics_->Merge(*page_statistics_); page_statistics_->Reset(); } + if (page_size_statistics_ != nullptr) { + chunk_size_statistics_->Merge(*page_size_statistics_); + page_size_statistics_->Reset(); + } } Type::type type() const override { return descr_->physical_type(); } @@ -1425,6 +1458,8 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< DictEncoder* current_dict_encoder_; std::shared_ptr page_statistics_; std::shared_ptr chunk_statistics_; + std::unique_ptr page_size_statistics_; + std::shared_ptr chunk_size_statistics_; bool pages_change_on_record_boundaries_; // If writing a sequence of ::arrow::DictionaryArray to the writer, we keep the @@ -1467,6 +1502,8 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< rows_written_ += num_values; 
num_buffered_rows_ += num_values; } + + UpdateLevelHistogram(num_values, def_levels, rep_levels); return values_to_write; } @@ -1558,6 +1595,47 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< rows_written_ += num_levels; num_buffered_rows_ += num_levels; } + + UpdateLevelHistogram(num_levels, def_levels, rep_levels); + } + + void UpdateLevelHistogram(int64_t num_levels, const int16_t* def_levels, + const int16_t* rep_levels) const { + if (page_size_statistics_ == nullptr) { + return; + } + + auto add_levels = [](std::vector& level_histogram, + ::arrow::util::span levels) { + for (int16_t level : levels) { + ARROW_DCHECK_LT(level, static_cast(level_histogram.size())); + ++level_histogram[level]; + } + }; + + if (descr_->max_definition_level() > 0) { + add_levels(page_size_statistics_->definition_level_histogram, + {def_levels, static_cast(num_levels)}); + } else { + page_size_statistics_->definition_level_histogram[0] += num_levels; + } + + if (descr_->max_repetition_level() > 0) { + add_levels(page_size_statistics_->repetition_level_histogram, + {rep_levels, static_cast(num_levels)}); + } else { + page_size_statistics_->repetition_level_histogram[0] += num_levels; + } + } + + // Update the unencoded data bytes for ByteArray only per the specification. + void UpdateUnencodedDataBytes() const { + if constexpr (std::is_same_v) { + if (page_size_statistics_ != nullptr) { + page_size_statistics_->IncrementUnencodedByteArrayDataBytes( + current_encoder_->ReportUnencodedDataBytes()); + } + } } void CommitWriteAndCheckPageLimit(int64_t num_levels, int64_t num_values, @@ -1611,6 +1689,7 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< if (page_statistics_ != nullptr) { page_statistics_->Update(values, num_values, num_nulls); } + UpdateUnencodedDataBytes(); } /// \brief Write values with spaces and update page statistics accordingly. 
@@ -1639,6 +1718,7 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< page_statistics_->UpdateSpaced(values, valid_bits, valid_bits_offset, num_spaced_values, num_values, num_nulls); } + UpdateUnencodedDataBytes(); } }; @@ -1739,6 +1819,8 @@ Status TypedColumnWriterImpl::WriteArrowDictionary( writeable_indices, MaybeReplaceValidity(writeable_indices, null_count, ctx->memory_pool)); dict_encoder->PutIndices(*writeable_indices); + // Update unencoded byte array data size to size statistics + UpdateUnencodedDataBytes(); CommitWriteAndCheckPageLimit(batch_size, batch_num_values, null_count, check_page); value_offset += batch_num_spaced_values; }; @@ -2219,6 +2301,7 @@ Status TypedColumnWriterImpl::WriteArrowDense( page_statistics_->IncrementNullCount(batch_size - non_null); page_statistics_->IncrementNumValues(non_null); } + UpdateUnencodedDataBytes(); CommitWriteAndCheckPageLimit(batch_size, batch_num_values, batch_size - non_null, check_page); CheckDictionarySizeLimit(); diff --git a/cpp/src/parquet/encoder.cc b/cpp/src/parquet/encoder.cc index 89d5d44c5219c..f41eb9a19123c 100644 --- a/cpp/src/parquet/encoder.cc +++ b/cpp/src/parquet/encoder.cc @@ -79,6 +79,15 @@ class EncoderImpl : virtual public Encoder { MemoryPool* memory_pool() const override { return pool_; } + int64_t ReportUnencodedDataBytes() override { + if (descr_->physical_type() != Type::BYTE_ARRAY) { + throw ParquetException("ReportUnencodedDataBytes is only supported for BYTE_ARRAY"); + } + int64_t bytes = unencoded_byte_array_data_bytes_; + unencoded_byte_array_data_bytes_ = 0; + return bytes; + } + protected: // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY const ColumnDescriptor* descr_; @@ -87,6 +96,8 @@ class EncoderImpl : virtual public Encoder { /// Type length from descr const int type_length_; + /// Number of unencoded bytes written to the encoder. Used for ByteArray type only. 
+ int64_t unencoded_byte_array_data_bytes_ = 0; }; // ---------------------------------------------------------------------- @@ -132,6 +143,7 @@ class PlainEncoder : public EncoderImpl, virtual public TypedEncoder { DCHECK(length == 0 || data != nullptr) << "Value ptr cannot be NULL"; sink_.UnsafeAppend(&length, sizeof(uint32_t)); sink_.UnsafeAppend(data, static_cast(length)); + unencoded_byte_array_data_bytes_ += length; } void Put(const ByteArray& val) { @@ -513,6 +525,18 @@ class DictEncoderImpl : public EncoderImpl, virtual public DictEncoder { static_cast(values[i + position]); } }); + + // Track unencoded bytes based on dictionary value type + if constexpr (std::is_same_v) { + // For ByteArray, need to look up actual lengths from dictionary + for (size_t idx = + buffer_position - static_cast(data.length() - data.null_count()); + idx < buffer_position; ++idx) { + memo_table_.VisitValue(buffered_indices_[idx], [&](std::string_view value) { + unencoded_byte_array_data_bytes_ += value.length(); + }); + } + } } void PutIndices(const ::arrow::Array& data) override { @@ -656,6 +680,7 @@ inline void DictEncoderImpl::PutByteArray(const void* ptr, PARQUET_THROW_NOT_OK( memo_table_.GetOrInsert(ptr, length, on_found, on_not_found, &memo_index)); buffered_indices_.push_back(memo_index); + unencoded_byte_array_data_bytes_ += length; } template <> @@ -1268,6 +1293,7 @@ class DeltaLengthByteArrayEncoder : public EncoderImpl, } length_encoder_.Put({static_cast(view.length())}, 1); PARQUET_THROW_NOT_OK(sink_.Append(view.data(), view.length())); + unencoded_byte_array_data_bytes_ += view.size(); return Status::OK(); }, []() { return Status::OK(); })); @@ -1313,6 +1339,7 @@ void DeltaLengthByteArrayEncoder::Put(const T* src, int num_values) { for (int idx = 0; idx < num_values; idx++) { sink_.UnsafeAppend(src[idx].ptr, src[idx].len); } + unencoded_byte_array_data_bytes_ += total_increment_size; } void DeltaLengthByteArrayEncoder::PutSpaced(const T* src, int num_values, @@ 
-1444,6 +1471,8 @@ class DeltaByteArrayEncoder : public EncoderImpl, virtual public TypedEncoder
(); type_length_ = descr_->type_length(); + unencoded_byte_array_data_bytes_ = 0; allocator_ = default_memory_pool(); } @@ -197,6 +198,8 @@ class TestEncodingBase : public ::testing::Test { draws_[nvalues * j + i] = draws_[i]; } } + + InitUnencodedByteArrayDataBytes(); } virtual void CheckRoundtrip() = 0; @@ -222,6 +225,16 @@ class TestEncodingBase : public ::testing::Test { } } + void InitUnencodedByteArrayDataBytes() { + // Calculate expected unencoded bytes based on type + if constexpr (std::is_same_v) { + unencoded_byte_array_data_bytes_ = 0; + for (int i = 0; i < num_values_; i++) { + unencoded_byte_array_data_bytes_ += draws_[i].len; + } + } + } + protected: MemoryPool* allocator_; @@ -235,6 +248,7 @@ class TestEncodingBase : public ::testing::Test { std::shared_ptr encode_buffer_; std::shared_ptr descr_; + int64_t unencoded_byte_array_data_bytes_; // unencoded data size for dense values }; // Member variables are not visible to templated subclasses. Possibly figure @@ -261,6 +275,10 @@ class TestPlainEncoding : public TestEncodingBase { auto decoder = MakeTypedDecoder(Encoding::PLAIN, descr_.get()); encoder->Put(draws_, num_values_); encode_buffer_ = encoder->FlushValues(); + if constexpr (std::is_same_v) { + ASSERT_EQ(encoder->ReportUnencodedDataBytes(), + this->unencoded_byte_array_data_bytes_); + } decoder->SetData(num_values_, encode_buffer_->data(), static_cast(encode_buffer_->size())); @@ -346,6 +364,10 @@ class TestDictionaryEncoding : public TestEncodingBase { AllocateBuffer(default_memory_pool(), dict_traits->dict_encoded_size()); dict_traits->WriteDict(dict_buffer_->mutable_data()); std::shared_ptr indices = encoder->FlushValues(); + if constexpr (std::is_same_v) { + ASSERT_EQ(encoder->ReportUnencodedDataBytes(), + this->unencoded_byte_array_data_bytes_); + } auto base_spaced_encoder = MakeEncoder(Type::type_num, Encoding::PLAIN, true, descr_.get()); @@ -1992,6 +2014,10 @@ class TestDeltaLengthByteArrayEncoding : public TestEncodingBase { 
encoder->Put(draws_, num_values_); encode_buffer_ = encoder->FlushValues(); + if constexpr (std::is_same_v) { + ASSERT_EQ(encoder->ReportUnencodedDataBytes(), + this->unencoded_byte_array_data_bytes_); + } decoder->SetData(num_values_, encode_buffer_->data(), static_cast(encode_buffer_->size())); @@ -2296,6 +2322,8 @@ class TestDeltaByteArrayEncoding : public TestDeltaLengthByteArrayEncoding draws_[nvalues * j + i] = draws_[i]; } } + + TestEncodingBase::InitUnencodedByteArrayDataBytes(); } Encoding::type GetEncoding() override { return Encoding::DELTA_BYTE_ARRAY; } diff --git a/cpp/src/parquet/metadata.cc b/cpp/src/parquet/metadata.cc index 8f577be45b96d..f47c61421936c 100644 --- a/cpp/src/parquet/metadata.cc +++ b/cpp/src/parquet/metadata.cc @@ -37,6 +37,7 @@ #include "parquet/exception.h" #include "parquet/schema.h" #include "parquet/schema_internal.h" +#include "parquet/size_statistics.h" #include "parquet/thrift_internal.h" namespace parquet { @@ -265,6 +266,11 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl { LoadEnumSafe(&encoding_stats.encoding), encoding_stats.count}); } + if (column_metadata_->__isset.size_statistics) { + size_statistics_ = + std::make_shared(FromThrift(column_metadata_->size_statistics)); + size_statistics_->Validate(descr_); + } possible_stats_ = nullptr; InitKeyValueMetadata(); } @@ -308,6 +314,10 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl { return is_stats_set() ? 
possible_stats_ : nullptr; } + inline std::shared_ptr size_statistics() const { + return size_statistics_; + } + inline Compression::type compression() const { return LoadEnumSafe(&column_metadata_->codec); } @@ -396,6 +406,7 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl { const ReaderProperties properties_; const ApplicationVersion* writer_version_; std::shared_ptr key_value_metadata_; + std::shared_ptr size_statistics_; }; std::unique_ptr ColumnChunkMetaData::Make( @@ -439,6 +450,10 @@ std::shared_ptr ColumnChunkMetaData::statistics() const { bool ColumnChunkMetaData::is_stats_set() const { return impl_->is_stats_set(); } +std::shared_ptr ColumnChunkMetaData::size_statistics() const { + return impl_->size_statistics(); +} + std::optional ColumnChunkMetaData::bloom_filter_offset() const { return impl_->bloom_filter_offset(); } @@ -1543,6 +1558,10 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl { column_chunk_->meta_data.__set_statistics(ToThrift(val)); } + void SetSizeStatistics(const SizeStatistics& size_stats) { + column_chunk_->meta_data.__set_size_statistics(ToThrift(size_stats)); + } + void Finish(int64_t num_values, int64_t dictionary_page_offset, int64_t index_page_offset, int64_t data_page_offset, int64_t compressed_size, int64_t uncompressed_size, bool has_dictionary, @@ -1752,6 +1771,10 @@ void ColumnChunkMetaDataBuilder::SetStatistics(const EncodedStatistics& result) impl_->SetStatistics(result); } +void ColumnChunkMetaDataBuilder::SetSizeStatistics(const SizeStatistics& size_stats) { + impl_->SetSizeStatistics(size_stats); +} + void ColumnChunkMetaDataBuilder::SetKeyValueMetadata( std::shared_ptr key_value_metadata) { impl_->SetKeyValueMetadata(std::move(key_value_metadata)); diff --git a/cpp/src/parquet/metadata.h b/cpp/src/parquet/metadata.h index dc97d816daa74..9a3964f7d6574 100644 --- a/cpp/src/parquet/metadata.h +++ b/cpp/src/parquet/metadata.h @@ -28,23 +28,9 @@ #include "parquet/encryption/type_fwd.h" #include 
"parquet/platform.h" #include "parquet/properties.h" -#include "parquet/schema.h" -#include "parquet/types.h" namespace parquet { -class ColumnDescriptor; -class EncodedStatistics; -class FileCryptoMetaData; -class Statistics; -class SchemaDescriptor; - -namespace schema { - -class ColumnPath; - -} // namespace schema - using KeyValueMetadata = ::arrow::KeyValueMetadata; class PARQUET_EXPORT ApplicationVersion { @@ -156,6 +142,7 @@ class PARQUET_EXPORT ColumnChunkMetaData { std::shared_ptr path_in_schema() const; bool is_stats_set() const; std::shared_ptr statistics() const; + std::shared_ptr size_statistics() const; Compression::type compression() const; // Indicate if the ColumnChunk compression is supported by the current @@ -451,6 +438,7 @@ class PARQUET_EXPORT ColumnChunkMetaDataBuilder { // column metadata void SetStatistics(const EncodedStatistics& stats); + void SetSizeStatistics(const SizeStatistics& size_stats); void SetKeyValueMetadata(std::shared_ptr key_value_metadata); diff --git a/cpp/src/parquet/page_index.cc b/cpp/src/parquet/page_index.cc index afda4c6064b36..8cc819f10cacd 100644 --- a/cpp/src/parquet/page_index.cc +++ b/cpp/src/parquet/page_index.cc @@ -159,6 +159,22 @@ class TypedColumnIndexImpl : public TypedColumnIndex { const std::vector& max_values() const override { return max_values_; } + bool has_definition_level_histograms() const override { + return column_index_.__isset.definition_level_histograms; + } + + bool has_repetition_level_histograms() const override { + return column_index_.__isset.repetition_level_histograms; + } + + const std::vector& definition_level_histograms() const override { + return column_index_.definition_level_histograms; + } + + const std::vector& repetition_level_histograms() const override { + return column_index_.repetition_level_histograms; + } + private: /// Wrapped thrift column index. 
const format::ColumnIndex column_index_; @@ -178,14 +194,22 @@ class OffsetIndexImpl : public OffsetIndex { page_location.compressed_page_size, page_location.first_row_index}); } + if (offset_index.__isset.unencoded_byte_array_data_bytes) { + unencoded_byte_array_data_bytes_ = offset_index.unencoded_byte_array_data_bytes; + } } const std::vector& page_locations() const override { return page_locations_; } + const std::vector& unencoded_byte_array_data_bytes() const override { + return unencoded_byte_array_data_bytes_; + } + private: std::vector page_locations_; + std::vector unencoded_byte_array_data_bytes_; }; class RowGroupPageIndexReaderImpl : public RowGroupPageIndexReader { @@ -460,7 +484,8 @@ class ColumnIndexBuilderImpl final : public ColumnIndexBuilder { column_index_.boundary_order = format::BoundaryOrder::UNORDERED; } - void AddPage(const EncodedStatistics& stats) override { + void AddPage(const EncodedStatistics& stats, + const SizeStatistics& size_stats) override { if (state_ == BuilderState::kFinished) { throw ParquetException("Cannot add page to finished ColumnIndexBuilder."); } else if (state_ == BuilderState::kDiscarded) { @@ -493,6 +518,17 @@ class ColumnIndexBuilderImpl final : public ColumnIndexBuilder { column_index_.__isset.null_counts = false; column_index_.null_counts.clear(); } + + if (size_stats.is_set()) { + const auto& page_def_level_hist = size_stats.definition_level_histogram; + const auto& page_ref_level_hist = size_stats.repetition_level_histogram; + column_index_.definition_level_histograms.insert( + column_index_.definition_level_histograms.end(), page_def_level_hist.cbegin(), + page_def_level_hist.cend()); + column_index_.repetition_level_histograms.insert( + column_index_.repetition_level_histograms.end(), page_ref_level_hist.cbegin(), + page_ref_level_hist.cend()); + } } void Finish() override { @@ -533,6 +569,29 @@ class ColumnIndexBuilderImpl final : public ColumnIndexBuilder { /// Decide the boundary order from decoded min/max 
values. auto boundary_order = DetermineBoundaryOrder(min_values, max_values); column_index_.__set_boundary_order(ToThrift(boundary_order)); + + // Finalize level histogram. + const int64_t num_pages = column_index_.null_pages.size(); + const int64_t def_level_hist_size = column_index_.definition_level_histograms.size(); + const int64_t rep_level_hist_size = column_index_.repetition_level_histograms.size(); + if (def_level_hist_size != 0 && + def_level_hist_size != (descr_->max_definition_level() + 1) * num_pages) { + std::stringstream ss; + ss << "Invalid definition level histogram size: " << def_level_hist_size + << ", expected: " << (descr_->max_definition_level() + 1) * num_pages; + throw ParquetException(ss.str()); + } + if (rep_level_hist_size != 0 && + rep_level_hist_size != (descr_->max_repetition_level() + 1) * num_pages) { + std::stringstream ss; + ss << "Invalid repetition level histogram size: " << rep_level_hist_size + << ", expected: " << (descr_->max_repetition_level() + 1) * num_pages; + throw ParquetException(ss.str()); + } + column_index_.__isset.definition_level_histograms = + !column_index_.definition_level_histograms.empty(); + column_index_.__isset.repetition_level_histograms = + !column_index_.repetition_level_histograms.empty(); } void WriteTo(::arrow::io::OutputStream* sink, Encryptor* encryptor) const override { @@ -604,8 +663,8 @@ class OffsetIndexBuilderImpl final : public OffsetIndexBuilder { public: OffsetIndexBuilderImpl() = default; - void AddPage(int64_t offset, int32_t compressed_page_size, - int64_t first_row_index) override { + void AddPage(int64_t offset, int32_t compressed_page_size, int64_t first_row_index, + std::optional unencoded_byte_array_length) override { if (state_ == BuilderState::kFinished) { throw ParquetException("Cannot add page to finished OffsetIndexBuilder."); } else if (state_ == BuilderState::kDiscarded) { @@ -620,6 +679,10 @@ class OffsetIndexBuilderImpl final : public OffsetIndexBuilder { 
page_location.__set_compressed_page_size(compressed_page_size); page_location.__set_first_row_index(first_row_index); offset_index_.page_locations.emplace_back(std::move(page_location)); + if (unencoded_byte_array_length.has_value()) { + offset_index_.unencoded_byte_array_data_bytes.emplace_back( + unencoded_byte_array_length.value()); + } } void Finish(int64_t final_position) override { @@ -636,6 +699,19 @@ class OffsetIndexBuilderImpl final : public OffsetIndexBuilder { page_location.__set_offset(page_location.offset + final_position); } } + + // Finalize unencoded_byte_array_data_bytes and make sure page sizes match. + if (offset_index_.page_locations.size() == + offset_index_.unencoded_byte_array_data_bytes.size()) { + offset_index_.__isset.unencoded_byte_array_data_bytes = true; + } else if (!offset_index_.unencoded_byte_array_data_bytes.empty()) { + std::stringstream ss; + ss << "Invalid count of unencoded BYTE_ARRAY data bytes: " + << offset_index_.unencoded_byte_array_data_bytes.size() + << ", expected page count: " << offset_index_.page_locations.size(); + throw ParquetException(ss.str()); + } + state_ = BuilderState::kFinished; break; } @@ -813,6 +889,14 @@ class PageIndexBuilderImpl final : public PageIndexBuilder { } // namespace +void OffsetIndexBuilder::AddPage(const PageLocation& page_location, + const SizeStatistics& size_stats) { + this->AddPage( + page_location.offset, page_location.compressed_page_size, + page_location.first_row_index, + size_stats.is_set() ? 
size_stats.unencoded_byte_array_data_bytes : std::nullopt); +} + RowGroupIndexReadRange PageIndexReader::DeterminePageIndexRangesInRowGroup( const RowGroupMetaData& row_group_metadata, const std::vector& columns) { int64_t ci_start = std::numeric_limits::max(); diff --git a/cpp/src/parquet/page_index.h b/cpp/src/parquet/page_index.h index d45c59cab223f..3083159783ba7 100644 --- a/cpp/src/parquet/page_index.h +++ b/cpp/src/parquet/page_index.h @@ -19,6 +19,7 @@ #include "arrow/io/interfaces.h" #include "parquet/encryption/type_fwd.h" +#include "parquet/type_fwd.h" #include "parquet/types.h" #include @@ -26,9 +27,6 @@ namespace parquet { -class EncodedStatistics; -struct PageIndexLocation; - /// \brief ColumnIndex is a proxy around format::ColumnIndex. class PARQUET_EXPORT ColumnIndex { public: @@ -76,6 +74,18 @@ class PARQUET_EXPORT ColumnIndex { /// \brief A vector of page indices for non-null pages. virtual const std::vector& non_null_page_indices() const = 0; + + /// \brief Whether definition level histogram is available. + virtual bool has_definition_level_histograms() const = 0; + + /// \brief Whether repetition level histogram is available. + virtual bool has_repetition_level_histograms() const = 0; + + /// \brief List of definition level histograms for each page concatenated together. + virtual const std::vector& definition_level_histograms() const = 0; + + /// \brief List of repetition level histograms for each page concatenated together. + virtual const std::vector& repetition_level_histograms() const = 0; }; /// \brief Typed implementation of ColumnIndex. @@ -129,6 +139,10 @@ class PARQUET_EXPORT OffsetIndex { /// \brief A vector of locations for each data page in this column. virtual const std::vector& page_locations() const = 0; + + /// \brief A vector of unencoded/uncompressed size of each page for BYTE_ARRAY types, + /// or empty for other types. 
+ virtual const std::vector& unencoded_byte_array_data_bytes() const = 0; }; /// \brief Interface for reading the page index for a Parquet row group. @@ -266,7 +280,9 @@ class PARQUET_EXPORT ColumnIndexBuilder { /// not update statistics anymore. /// /// \param stats Page statistics in the encoded form. - virtual void AddPage(const EncodedStatistics& stats) = 0; + /// \param size_stats Size statistics of the page if available. + virtual void AddPage(const EncodedStatistics& stats, + const SizeStatistics& size_stats) = 0; /// \brief Complete the column index. /// @@ -299,15 +315,13 @@ class PARQUET_EXPORT OffsetIndexBuilder { virtual ~OffsetIndexBuilder() = default; - /// \brief Add page location of a data page. + /// \brief Add page location and size stats of a data page. virtual void AddPage(int64_t offset, int32_t compressed_page_size, - int64_t first_row_index) = 0; + int64_t first_row_index, + std::optional unencoded_byte_array_length = {}) = 0; - /// \brief Add page location of a data page. - void AddPage(const PageLocation& page_location) { - AddPage(page_location.offset, page_location.compressed_page_size, - page_location.first_row_index); - } + /// \brief Add page location and size stats of a data page. + void AddPage(const PageLocation& page_location, const SizeStatistics& size_stats); /// \brief Complete the offset index. 
/// diff --git a/cpp/src/parquet/page_index_benchmark.cc b/cpp/src/parquet/page_index_benchmark.cc index 5631034105056..e94fa0365d189 100644 --- a/cpp/src/parquet/page_index_benchmark.cc +++ b/cpp/src/parquet/page_index_benchmark.cc @@ -82,7 +82,7 @@ void BM_ReadColumnIndex(::benchmark::State& state) { GenerateBenchmarkData(values_per_page, /*seed=*/0, values.data(), &heap, kDataStringLength); stats->Update(values.data(), values_per_page, /*null_count=*/0); - builder->AddPage(stats->Encode()); + builder->AddPage(stats->Encode(), /*size_stats=*/{}); } builder->Finish(); diff --git a/cpp/src/parquet/page_index_test.cc b/cpp/src/parquet/page_index_test.cc index 4db49b4267415..916e28f8cea8e 100644 --- a/cpp/src/parquet/page_index_test.cc +++ b/cpp/src/parquet/page_index_test.cc @@ -419,15 +419,20 @@ TEST(PageIndex, DeterminePageIndexRangesInRowGroupWithMissingPageIndex) { -1); } -TEST(PageIndex, WriteOffsetIndex) { +void TestWriteOffsetIndex(bool write_size_stats) { /// Create offset index via the OffsetIndexBuilder interface. auto builder = OffsetIndexBuilder::Make(); const size_t num_pages = 5; const std::vector offsets = {100, 200, 300, 400, 500}; const std::vector page_sizes = {1024, 2048, 3072, 4096, 8192}; const std::vector first_row_indices = {0, 10000, 20000, 30000, 40000}; + const std::vector unencoded_byte_array_lengths = {1111, 2222, 0, 3333, 4444}; for (size_t i = 0; i < num_pages; ++i) { - builder->AddPage(offsets[i], page_sizes[i], first_row_indices[i]); + auto unencoded_byte_array_length = + write_size_stats ? std::make_optional(unencoded_byte_array_lengths[i]) + : std::nullopt; + builder->AddPage(offsets[i], page_sizes[i], first_row_indices[i], + unencoded_byte_array_length); } const int64_t final_position = 4096; builder->Finish(final_position); @@ -446,23 +451,73 @@ TEST(PageIndex, WriteOffsetIndex) { /// Verify the data of the offset index. 
for (const auto& offset_index : offset_indexes) { ASSERT_EQ(num_pages, offset_index->page_locations().size()); + if (write_size_stats) { + ASSERT_EQ(num_pages, offset_index->unencoded_byte_array_data_bytes().size()); + } else { + ASSERT_TRUE(offset_index->unencoded_byte_array_data_bytes().empty()); + } for (size_t i = 0; i < num_pages; ++i) { const auto& page_location = offset_index->page_locations().at(i); ASSERT_EQ(offsets[i] + final_position, page_location.offset); ASSERT_EQ(page_sizes[i], page_location.compressed_page_size); ASSERT_EQ(first_row_indices[i], page_location.first_row_index); + if (write_size_stats) { + ASSERT_EQ(unencoded_byte_array_lengths[i], + offset_index->unencoded_byte_array_data_bytes()[i]); + } } } } +TEST(PageIndex, WriteOffsetIndexWithoutSizeStats) { + TestWriteOffsetIndex(/*write_size_stats=*/false); +} + +TEST(PageIndex, WriteOffsetIndexWithSizeStats) { + TestWriteOffsetIndex(/*write_size_stats=*/true); +} + +struct PageLevelHistogram { + std::vector def_levels; + std::vector rep_levels; +}; + +std::unique_ptr ConstructFakeSizeStatistics( + const ColumnDescriptor* descr, const PageLevelHistogram& page_level_histogram) { + auto stats = SizeStatistics::Make(descr); + stats->definition_level_histogram = page_level_histogram.def_levels; + stats->repetition_level_histogram = page_level_histogram.rep_levels; + return stats; +} + +void VerifyPageLevelHistogram(size_t page_id, + const std::vector& expected_page_levels, + const std::vector& all_page_levels) { + const size_t max_level = expected_page_levels.size() - 1; + const size_t offset = page_id * (max_level + 1); + for (size_t level = 0; level <= max_level; ++level) { + ASSERT_EQ(expected_page_levels[level], all_page_levels[offset + level]); + } +} + void TestWriteTypedColumnIndex(schema::NodePtr node, const std::vector& page_stats, - BoundaryOrder::type boundary_order, bool has_null_counts) { - auto descr = std::make_unique(node, /*max_definition_level=*/1, 0); - + BoundaryOrder::type 
boundary_order, bool has_null_counts, + int16_t max_definition_level = 1, + int16_t max_repetition_level = 0, + const std::vector& page_levels = {}) { + const bool build_size_stats = !page_levels.empty(); + if (build_size_stats) { + ASSERT_EQ(page_levels.size(), page_stats.size()); + } + auto descr = std::make_unique(node, max_definition_level, + max_repetition_level); auto builder = ColumnIndexBuilder::Make(descr.get()); - for (const auto& stats : page_stats) { - builder->AddPage(stats); + for (size_t i = 0; i < page_stats.size(); ++i) { + auto size_stats = build_size_stats + ? ConstructFakeSizeStatistics(descr.get(), page_levels[i]) + : std::make_unique(); + builder->AddPage(page_stats[i], *size_stats); } ASSERT_NO_THROW(builder->Finish()); @@ -482,6 +537,13 @@ void TestWriteTypedColumnIndex(schema::NodePtr node, ASSERT_EQ(boundary_order, column_index->boundary_order()); ASSERT_EQ(has_null_counts, column_index->has_null_counts()); const size_t num_pages = column_index->null_pages().size(); + if (build_size_stats) { + ASSERT_EQ(num_pages * (max_repetition_level + 1), + column_index->repetition_level_histograms().size()); + ASSERT_EQ(num_pages * (max_definition_level + 1), + column_index->definition_level_histograms().size()); + } + for (size_t i = 0; i < num_pages; ++i) { ASSERT_EQ(page_stats[i].all_null_value, column_index->null_pages()[i]); ASSERT_EQ(page_stats[i].min(), column_index->encoded_min_values()[i]); @@ -489,6 +551,12 @@ void TestWriteTypedColumnIndex(schema::NodePtr node, if (has_null_counts) { ASSERT_EQ(page_stats[i].null_count, column_index->null_counts()[i]); } + if (build_size_stats) { + ASSERT_NO_FATAL_FAILURE(VerifyPageLevelHistogram( + i, page_levels[i].def_levels, column_index->definition_level_histograms())); + ASSERT_NO_FATAL_FAILURE(VerifyPageLevelHistogram( + i, page_levels[i].rep_levels, column_index->repetition_level_histograms())); + } } } } @@ -640,7 +708,7 @@ TEST(PageIndex, WriteColumnIndexWithCorruptedStats) { ColumnDescriptor 
descr(schema::Int32("c1"), /*max_definition_level=*/1, 0); auto builder = ColumnIndexBuilder::Make(&descr); for (const auto& stats : page_stats) { - builder->AddPage(stats); + builder->AddPage(stats, SizeStatistics()); } ASSERT_NO_THROW(builder->Finish()); ASSERT_EQ(nullptr, builder->Build()); @@ -651,6 +719,31 @@ TEST(PageIndex, WriteColumnIndexWithCorruptedStats) { EXPECT_EQ(0, buffer->size()); } +TEST(PageIndex, WriteInt64ColumnIndexWithSizeStats) { + auto encode = [=](int64_t value) { + return std::string(reinterpret_cast(&value), sizeof(int64_t)); + }; + + // Integer values in the descending order. + std::vector page_stats(3); + page_stats.at(0).set_null_count(4).set_min(encode(-1)).set_max(encode(-2)); + page_stats.at(1).set_null_count(0).set_min(encode(-2)).set_max(encode(-3)); + page_stats.at(2).set_null_count(4).set_min(encode(-3)).set_max(encode(-4)); + + // Page level histograms. + std::vector page_levels; + page_levels.push_back( + PageLevelHistogram{/*def_levels=*/{2, 4, 6, 8}, /*rep_levels=*/{10, 5, 5}}); + page_levels.push_back( + PageLevelHistogram{/*def_levels=*/{1, 3, 5, 7}, /*rep_levels=*/{4, 8, 4}}); + page_levels.push_back( + PageLevelHistogram{/*def_levels=*/{0, 2, 4, 6}, /*rep_levels=*/{3, 4, 5}}); + + TestWriteTypedColumnIndex(schema::Int64("c1"), page_stats, BoundaryOrder::Descending, + /*has_null_counts=*/true, /*max_definition_level=*/3, + /*max_repetition_level=*/2, page_levels); +} + TEST(PageIndex, TestPageIndexBuilderWithZeroRowGroup) { schema::NodeVector fields = {schema::Int32("c1"), schema::ByteArray("c2")}; schema::NodePtr root = schema::GroupNode::Make("schema", Repetition::REPEATED, fields); @@ -689,14 +782,15 @@ class PageIndexBuilderTest : public ::testing::Test { for (int column = 0; column < num_columns; ++column) { if (static_cast(column) < page_stats[row_group].size()) { auto column_index_builder = builder->GetColumnIndexBuilder(column); - ASSERT_NO_THROW(column_index_builder->AddPage(page_stats[row_group][column])); + 
ASSERT_NO_THROW( + column_index_builder->AddPage(page_stats[row_group][column], {})); ASSERT_NO_THROW(column_index_builder->Finish()); } if (static_cast(column) < page_locations[row_group].size()) { auto offset_index_builder = builder->GetOffsetIndexBuilder(column); - ASSERT_NO_THROW( - offset_index_builder->AddPage(page_locations[row_group][column])); + ASSERT_NO_THROW(offset_index_builder->AddPage(page_locations[row_group][column], + /*size_stats=*/{})); ASSERT_NO_THROW(offset_index_builder->Finish(final_position)); } } diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index a8e4430a03d82..c942010396826 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -47,6 +47,16 @@ namespace parquet { /// DataPageV2 at all. enum class ParquetDataPageVersion { V1, V2 }; +/// Controls the level of size statistics that are written to the file. +enum class SizeStatisticsLevel : uint8_t { + // No size statistics are written. + None = 0, + // Only column chunk size statistics are written. + ColumnChunk, + // Both size statistics in the column chunk and page index are written. + PageAndColumnChunk +}; + /// Align the default buffer size to a small multiple of a page size. constexpr int64_t kDefaultBufferSize = 4096 * 4; @@ -247,7 +257,8 @@ class PARQUET_EXPORT WriterProperties { data_page_version_(ParquetDataPageVersion::V1), created_by_(DEFAULT_CREATED_BY), store_decimal_as_integer_(false), - page_checksum_enabled_(false) {} + page_checksum_enabled_(false), + size_statistics_level_(SizeStatisticsLevel::None) {} explicit Builder(const WriterProperties& properties) : pool_(properties.memory_pool()), @@ -649,6 +660,16 @@ class PARQUET_EXPORT WriterProperties { return this->disable_write_page_index(path->ToDotString()); } + /// \brief Set the level to write size statistics for all columns. Default is None. + /// + /// \param level The level to write size statistics. 
Note that if page index is not + /// enabled, page level size statistics will not be written even if the level + /// is set to PageAndColumnChunk. + Builder* set_size_statistics_level(SizeStatisticsLevel level) { + size_statistics_level_ = level; + return this; + } + /// \brief Build the WriterProperties with the builder parameters. /// \return The WriterProperties defined by the builder. std::shared_ptr build() { @@ -675,9 +696,9 @@ class PARQUET_EXPORT WriterProperties { return std::shared_ptr(new WriterProperties( pool_, dictionary_pagesize_limit_, write_batch_size_, max_row_group_length_, pagesize_, version_, created_by_, page_checksum_enabled_, - std::move(file_encryption_properties_), default_column_properties_, - column_properties, data_page_version_, store_decimal_as_integer_, - std::move(sorting_columns_))); + size_statistics_level_, std::move(file_encryption_properties_), + default_column_properties_, column_properties, data_page_version_, + store_decimal_as_integer_, std::move(sorting_columns_))); } private: @@ -691,6 +712,7 @@ class PARQUET_EXPORT WriterProperties { std::string created_by_; bool store_decimal_as_integer_; bool page_checksum_enabled_; + SizeStatisticsLevel size_statistics_level_; std::shared_ptr file_encryption_properties_; @@ -729,6 +751,10 @@ class PARQUET_EXPORT WriterProperties { inline bool page_checksum_enabled() const { return page_checksum_enabled_; } + inline SizeStatisticsLevel size_statistics_level() const { + return size_statistics_level_; + } + inline Encoding::type dictionary_index_encoding() const { if (parquet_version_ == ParquetVersion::PARQUET_1_0) { return Encoding::PLAIN_DICTIONARY; @@ -822,6 +848,7 @@ class PARQUET_EXPORT WriterProperties { MemoryPool* pool, int64_t dictionary_pagesize_limit, int64_t write_batch_size, int64_t max_row_group_length, int64_t pagesize, ParquetVersion::type version, const std::string& created_by, bool page_write_checksum_enabled, + SizeStatisticsLevel size_statistics_level, 
std::shared_ptr file_encryption_properties, const ColumnProperties& default_column_properties, const std::unordered_map& column_properties, @@ -837,6 +864,7 @@ class PARQUET_EXPORT WriterProperties { parquet_created_by_(created_by), store_decimal_as_integer_(store_short_decimal_as_integer), page_checksum_enabled_(page_write_checksum_enabled), + size_statistics_level_(size_statistics_level), file_encryption_properties_(file_encryption_properties), sorting_columns_(std::move(sorting_columns)), default_column_properties_(default_column_properties), @@ -852,6 +880,7 @@ class PARQUET_EXPORT WriterProperties { std::string parquet_created_by_; bool store_decimal_as_integer_; bool page_checksum_enabled_; + SizeStatisticsLevel size_statistics_level_; std::shared_ptr file_encryption_properties_; diff --git a/cpp/src/parquet/size_statistics.cc b/cpp/src/parquet/size_statistics.cc new file mode 100644 index 0000000000000..a02cef7aba46f --- /dev/null +++ b/cpp/src/parquet/size_statistics.cc @@ -0,0 +1,94 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +#include "parquet/size_statistics.h" + +#include + +#include "arrow/util/logging.h" +#include "parquet/exception.h" +#include "parquet/schema.h" + +namespace parquet { + +void SizeStatistics::Merge(const SizeStatistics& other) { + if (repetition_level_histogram.size() != other.repetition_level_histogram.size()) { + throw ParquetException("Repetition level histogram size mismatch"); + } + if (definition_level_histogram.size() != other.definition_level_histogram.size()) { + throw ParquetException("Definition level histogram size mismatch"); + } + if (unencoded_byte_array_data_bytes.has_value() != + other.unencoded_byte_array_data_bytes.has_value()) { + throw ParquetException("Unencoded byte array data bytes are not consistent"); + } + std::transform(repetition_level_histogram.begin(), repetition_level_histogram.end(), + other.repetition_level_histogram.begin(), + repetition_level_histogram.begin(), std::plus<>()); + std::transform(definition_level_histogram.begin(), definition_level_histogram.end(), + other.definition_level_histogram.begin(), + definition_level_histogram.begin(), std::plus<>()); + if (unencoded_byte_array_data_bytes.has_value()) { + unencoded_byte_array_data_bytes = unencoded_byte_array_data_bytes.value() + + other.unencoded_byte_array_data_bytes.value(); + } +} + +void SizeStatistics::IncrementUnencodedByteArrayDataBytes(int64_t value) { + ARROW_CHECK(unencoded_byte_array_data_bytes.has_value()); + unencoded_byte_array_data_bytes = unencoded_byte_array_data_bytes.value() + value; +} + +void SizeStatistics::Validate(const ColumnDescriptor* descr) const { + if (repetition_level_histogram.size() != + static_cast(descr->max_repetition_level() + 1)) { + throw ParquetException("Repetition level histogram size mismatch"); + } + if (definition_level_histogram.size() != + static_cast(descr->max_definition_level() + 1)) { + throw ParquetException("Definition level histogram size mismatch"); + } + if (unencoded_byte_array_data_bytes.has_value() && + 
descr->physical_type() != Type::BYTE_ARRAY) { + throw ParquetException("Unencoded byte array data bytes does not support " + + TypeToString(descr->physical_type())); + } + if (!unencoded_byte_array_data_bytes.has_value() && + descr->physical_type() == Type::BYTE_ARRAY) { + throw ParquetException("Missing unencoded byte array data bytes"); + } +} + +void SizeStatistics::Reset() { + repetition_level_histogram.assign(repetition_level_histogram.size(), 0); + definition_level_histogram.assign(definition_level_histogram.size(), 0); + if (unencoded_byte_array_data_bytes.has_value()) { + unencoded_byte_array_data_bytes = 0; + } +} + +std::unique_ptr SizeStatistics::Make(const ColumnDescriptor* descr) { + auto size_stats = std::make_unique(); + size_stats->repetition_level_histogram.resize(descr->max_repetition_level() + 1, 0); + size_stats->definition_level_histogram.resize(descr->max_definition_level() + 1, 0); + if (descr->physical_type() == Type::BYTE_ARRAY) { + size_stats->unencoded_byte_array_data_bytes = 0; + } + return size_stats; +} + +} // namespace parquet diff --git a/cpp/src/parquet/size_statistics.h b/cpp/src/parquet/size_statistics.h new file mode 100644 index 0000000000000..c25e70ee36d8a --- /dev/null +++ b/cpp/src/parquet/size_statistics.h @@ -0,0 +1,92 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +#include "parquet/platform.h" +#include "parquet/type_fwd.h" + +namespace parquet { + +/// A structure for capturing metadata for estimating the unencoded, +/// uncompressed size of data written. This is useful for readers to estimate +/// how much memory is needed to reconstruct data in their memory model and for +/// fine-grained filter push down on nested structures (the histograms contained +/// in this structure can help determine the number of nulls at a particular +/// nesting level and maximum length of lists). +struct PARQUET_EXPORT SizeStatistics { + /// When present, there is expected to be one element corresponding to each + /// definition (i.e. size=max definition+1) where each element + /// represents the number of times the definition level was observed in the + /// data. + /// + /// This field may be omitted (a.k.a. zero-length vector) if max_definition_level + /// is 0 without loss of information. + std::vector definition_level_histogram; + + /// Same as definition_level_histogram except for repetition levels. + /// + /// This field may be omitted (a.k.a. zero-length vector) if max_repetition_level + /// is 0 without loss of information. + std::vector repetition_level_histogram; + + /// The number of physical bytes stored for BYTE_ARRAY data values assuming + /// no encoding. This is exclusive of the bytes needed to store the length of + /// each byte array. In other words, this field is equivalent to the `(size + /// of PLAIN-ENCODING the byte array values) - (4 bytes * number of values + /// written)`. To determine unencoded sizes of other types readers can use + /// schema information multiplied by the number of non-null and null values. + /// The number of null/non-null values can be inferred from the histograms + /// below. 
+ /// + /// For example, if a column chunk is dictionary-encoded with dictionary + /// ["a", "bc", "cde"], and a data page contains the indices [0, 0, 1, 2], + /// then this value for that data page should be 7 (1 + 1 + 2 + 3). + /// + /// This field should only be set for types that use BYTE_ARRAY as their + /// physical type. + std::optional unencoded_byte_array_data_bytes; + + /// \brief Check if the SizeStatistics is set. + bool is_set() const { + return !repetition_level_histogram.empty() || !definition_level_histogram.empty() || + unencoded_byte_array_data_bytes.has_value(); + } + + /// \brief Increment the unencoded byte array data bytes. + void IncrementUnencodedByteArrayDataBytes(int64_t value); + + /// \brief Merge two SizeStatistics. + /// \throws ParquetException if SizeStatistics to merge is not compatible. + void Merge(const SizeStatistics& other); + + /// \brief Validate the SizeStatistics + /// \throws ParquetException if the histograms don't have the right length, + /// or if unencoded_byte_array_data_bytes is present for a non-BYTE_ARRAY column. + void Validate(const ColumnDescriptor* descr) const; + + /// \brief Reset the SizeStatistics to be empty. + void Reset(); + + /// \brief Make an empty SizeStatistics object for specific type. + static std::unique_ptr Make(const ColumnDescriptor* descr); +}; + +} // namespace parquet diff --git a/cpp/src/parquet/size_statistics_test.cc b/cpp/src/parquet/size_statistics_test.cc new file mode 100644 index 0000000000000..cefd31dce285d --- /dev/null +++ b/cpp/src/parquet/size_statistics_test.cc @@ -0,0 +1,279 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +#include +#include + +#include "arrow/buffer.h" +#include "arrow/table.h" +#include "arrow/testing/builder.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/util/bit_util.h" +#include "arrow/util/span.h" +#include "parquet/arrow/reader.h" +#include "parquet/arrow/reader_internal.h" +#include "parquet/arrow/schema.h" +#include "parquet/arrow/writer.h" +#include "parquet/column_writer.h" +#include "parquet/file_writer.h" +#include "parquet/page_index.h" +#include "parquet/schema.h" +#include "parquet/size_statistics.h" +#include "parquet/test_util.h" +#include "parquet/thrift_internal.h" +#include "parquet/types.h" + +namespace parquet { + +TEST(SizeStatistics, ThriftSerDe) { + const std::vector kDefLevels = {128, 64, 32, 16}; + const std::vector kRepLevels = {100, 80, 60, 40, 20}; + constexpr int64_t kUnencodedByteArrayDataBytes = 1234; + + for (const auto& descr : + {std::make_unique(schema::Int32("a"), /*max_def_level=*/3, + /*max_rep_level=*/4), + std::make_unique(schema::ByteArray("a"), /*max_def_level=*/3, + /*max_rep_level=*/4)}) { + auto size_statistics = SizeStatistics::Make(descr.get()); + size_statistics->repetition_level_histogram = kRepLevels; + size_statistics->definition_level_histogram = kDefLevels; + if (descr->physical_type() == Type::BYTE_ARRAY) { + size_statistics->IncrementUnencodedByteArrayDataBytes(kUnencodedByteArrayDataBytes); + } + auto thrift_statistics = ToThrift(*size_statistics); + auto restored_statistics = FromThrift(thrift_statistics); + 
EXPECT_EQ(restored_statistics.definition_level_histogram, kDefLevels); + EXPECT_EQ(restored_statistics.repetition_level_histogram, kRepLevels); + if (descr->physical_type() == Type::BYTE_ARRAY) { + EXPECT_TRUE(restored_statistics.unencoded_byte_array_data_bytes.has_value()); + EXPECT_EQ(restored_statistics.unencoded_byte_array_data_bytes.value(), + kUnencodedByteArrayDataBytes); + } else { + EXPECT_FALSE(restored_statistics.unencoded_byte_array_data_bytes.has_value()); + } + } +} + +bool operator==(const SizeStatistics& lhs, const SizeStatistics& rhs) { + return lhs.repetition_level_histogram == rhs.repetition_level_histogram && + lhs.definition_level_histogram == rhs.definition_level_histogram && + lhs.unencoded_byte_array_data_bytes == rhs.unencoded_byte_array_data_bytes; +} + +struct PageSizeStatistics { + std::vector def_levels; + std::vector rep_levels; + std::vector byte_array_bytes; + bool operator==(const PageSizeStatistics& other) const { + return def_levels == other.def_levels && rep_levels == other.rep_levels && + byte_array_bytes == other.byte_array_bytes; + } +}; + +class SizeStatisticsRoundTripTest : public ::testing::Test { + public: + void WriteFile(SizeStatisticsLevel level, + const std::shared_ptr<::arrow::Table>& table) { + auto writer_properties = WriterProperties::Builder() + .max_row_group_length(2) /* every row group has 2 rows */ + ->data_pagesize(1) /* every page has 1 row */ + ->enable_write_page_index() + ->enable_statistics() + ->set_size_statistics_level(level) + ->build(); + + // Get schema from table. + auto schema = table->schema(); + std::shared_ptr parquet_schema; + auto arrow_writer_properties = default_arrow_writer_properties(); + ASSERT_OK_NO_THROW(arrow::ToParquetSchema(schema.get(), *writer_properties, + *arrow_writer_properties, &parquet_schema)); + auto schema_node = + std::static_pointer_cast(parquet_schema->schema_root()); + + // Write table to buffer. 
+ auto sink = CreateOutputStream(); + auto pool = ::arrow::default_memory_pool(); + auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties); + std::unique_ptr arrow_writer; + ASSERT_OK(arrow::FileWriter::Make(pool, std::move(writer), schema, + arrow_writer_properties, &arrow_writer)); + ASSERT_OK_NO_THROW(arrow_writer->WriteTable(*table)); + ASSERT_OK_NO_THROW(arrow_writer->Close()); + ASSERT_OK_AND_ASSIGN(buffer_, sink->Finish()); + } + + void ReadSizeStatistics() { + auto read_properties = default_arrow_reader_properties(); + auto reader = + ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer_)); + + // Read row group size statistics in order. + auto metadata = reader->metadata(); + for (int i = 0; i < metadata->num_row_groups(); ++i) { + auto row_group_metadata = metadata->RowGroup(i); + for (int j = 0; j < metadata->num_columns(); ++j) { + auto column_metadata = row_group_metadata->ColumnChunk(j); + auto size_stats = column_metadata->size_statistics(); + row_group_stats_.push_back(size_stats ? *size_stats : SizeStatistics{}); + } + } + + // Read page size statistics in order. 
+ auto page_index_reader = reader->GetPageIndexReader(); + ASSERT_NE(page_index_reader, nullptr); + + for (int i = 0; i < metadata->num_row_groups(); ++i) { + auto row_group_index_reader = page_index_reader->RowGroup(i); + ASSERT_NE(row_group_index_reader, nullptr); + + for (int j = 0; j < metadata->num_columns(); ++j) { + PageSizeStatistics page_stats; + + auto column_index = row_group_index_reader->GetColumnIndex(j); + if (column_index != nullptr) { + if (column_index->has_definition_level_histograms()) { + page_stats.def_levels = column_index->definition_level_histograms(); + } + if (column_index->has_repetition_level_histograms()) { + page_stats.rep_levels = column_index->repetition_level_histograms(); + } + } + + auto offset_index = row_group_index_reader->GetOffsetIndex(j); + if (offset_index != nullptr) { + page_stats.byte_array_bytes = offset_index->unencoded_byte_array_data_bytes(); + } + + page_stats_.emplace_back(std::move(page_stats)); + } + } + } + + void Reset() { + buffer_.reset(); + row_group_stats_.clear(); + page_stats_.clear(); + } + + protected: + std::shared_ptr buffer_; + std::vector row_group_stats_; + std::vector page_stats_; + inline static const SizeStatistics kEmptyRowGroupStats{}; + inline static const PageSizeStatistics kEmptyPageStats{}; +}; + +TEST_F(SizeStatisticsRoundTripTest, EnableSizeStats) { + auto schema = ::arrow::schema({ + ::arrow::field("a", ::arrow::list(::arrow::list(::arrow::int32()))), + ::arrow::field("b", ::arrow::list(::arrow::list(::arrow::utf8()))), + }); + // First two rows are in one row group, and the other two rows are in another row group. 
+ auto table = ::arrow::TableFromJSON(schema, {R"([ + [ [[1],[1,1],[1,1,1]], [["a"],["a","a"],["a","a","a"]] ], + [ [[0,1,null]], [["foo","bar",null]] ], + [ [], [] ], + [ [[],[null],null], [[],[null],null] ] + ])"}); + + for (auto size_stats_level : + {SizeStatisticsLevel::None, SizeStatisticsLevel::ColumnChunk, + SizeStatisticsLevel::PageAndColumnChunk}) { + WriteFile(size_stats_level, table); + ReadSizeStatistics(); + + if (size_stats_level == SizeStatisticsLevel::None) { + EXPECT_THAT(row_group_stats_, + ::testing::ElementsAre(kEmptyRowGroupStats, kEmptyRowGroupStats, + kEmptyRowGroupStats, kEmptyRowGroupStats)); + } else { + EXPECT_THAT(row_group_stats_, ::testing::ElementsAre( + SizeStatistics{/*def_levels=*/{0, 0, 0, 0, 1, 8}, + /*rep_levels=*/{2, 2, 5}, + /*byte_array_bytes=*/std::nullopt}, + SizeStatistics{/*def_levels=*/{0, 0, 0, 0, 1, 8}, + /*rep_levels=*/{2, 2, 5}, + /*byte_array_bytes=*/12}, + SizeStatistics{/*def_levels=*/{0, 1, 1, 1, 1, 0}, + /*rep_levels=*/{2, 2, 0}, + /*byte_array_bytes=*/std::nullopt}, + SizeStatistics{/*def_levels=*/{0, 1, 1, 1, 1, 0}, + /*rep_levels=*/{2, 2, 0}, + /*byte_array_bytes=*/0})); + } + + if (size_stats_level == SizeStatisticsLevel::PageAndColumnChunk) { + EXPECT_THAT( + page_stats_, + ::testing::ElementsAre( + PageSizeStatistics{/*def_levels=*/{0, 0, 0, 0, 0, 6, 0, 0, 0, 0, 1, 2}, + /*rep_levels=*/{1, 2, 3, 1, 0, 2}, + /*byte_array_bytes=*/{}}, + PageSizeStatistics{/*def_levels=*/{0, 0, 0, 0, 0, 6, 0, 0, 0, 0, 1, 2}, + /*rep_levels=*/{1, 2, 3, 1, 0, 2}, + /*byte_array_bytes=*/{6, 6}}, + PageSizeStatistics{/*def_levels=*/{0, 1, 0, 0, 0, 0, 0, 0, 1, 1, 1, 0}, + /*rep_levels=*/{1, 0, 0, 1, 2, 0}, + /*byte_array_bytes=*/{}}, + PageSizeStatistics{/*def_levels=*/{0, 1, 0, 0, 0, 0, 0, 0, 1, 1, 1, 0}, + /*rep_levels=*/{1, 0, 0, 1, 2, 0}, + /*byte_array_bytes=*/{0, 0}})); + } else { + EXPECT_THAT(page_stats_, ::testing::ElementsAre(kEmptyPageStats, kEmptyPageStats, + kEmptyPageStats, kEmptyPageStats)); + } + + Reset(); + } +} 
+ +TEST_F(SizeStatisticsRoundTripTest, WriteDictionaryArray) { + auto schema = ::arrow::schema( + {::arrow::field("a", ::arrow::dictionary(::arrow::int16(), ::arrow::utf8()))}); + WriteFile( + SizeStatisticsLevel::PageAndColumnChunk, + ::arrow::TableFromJSON(schema, {R"([["aa"],["aaa"],[null],["a"],["aaa"],["a"]])"})); + + ReadSizeStatistics(); + EXPECT_THAT(row_group_stats_, + ::testing::ElementsAre(SizeStatistics{/*def_levels=*/{0, 2}, + /*rep_levels=*/{2}, + /*byte_array_bytes=*/5}, + SizeStatistics{/*def_levels=*/{1, 1}, + /*rep_levels=*/{2}, + /*byte_array_bytes=*/1}, + SizeStatistics{/*def_levels=*/{0, 2}, + /*rep_levels=*/{2}, + /*byte_array_bytes=*/4})); + EXPECT_THAT(page_stats_, + ::testing::ElementsAre(PageSizeStatistics{/*def_levels=*/{0, 2}, + /*rep_levels=*/{2}, + /*byte_array_bytes=*/{5}}, + PageSizeStatistics{/*def_levels=*/{1, 1}, + /*rep_levels=*/{2}, + /*byte_array_bytes=*/{1}}, + PageSizeStatistics{/*def_levels=*/{0, 2}, + /*rep_levels=*/{2}, + /*byte_array_bytes=*/{4}})); +} + +} // namespace parquet diff --git a/cpp/src/parquet/thrift_internal.h b/cpp/src/parquet/thrift_internal.h index e7bfd434c81a8..744af743118e2 100644 --- a/cpp/src/parquet/thrift_internal.h +++ b/cpp/src/parquet/thrift_internal.h @@ -43,6 +43,7 @@ #include "parquet/exception.h" #include "parquet/platform.h" #include "parquet/properties.h" +#include "parquet/size_statistics.h" #include "parquet/statistics.h" #include "parquet/types.h" @@ -254,6 +255,14 @@ static inline SortingColumn FromThrift(format::SortingColumn thrift_sorting_colu return sorting_column; } +static inline SizeStatistics FromThrift(const format::SizeStatistics& size_stats) { + return SizeStatistics{ + size_stats.definition_level_histogram, size_stats.repetition_level_histogram, + size_stats.__isset.unencoded_byte_array_data_bytes + ? 
std::make_optional(size_stats.unencoded_byte_array_data_bytes) + : std::nullopt}; +} + // ---------------------------------------------------------------------- // Convert Thrift enums from Parquet enums @@ -383,6 +392,17 @@ static inline format::EncryptionAlgorithm ToThrift(EncryptionAlgorithm encryptio return encryption_algorithm; } +static inline format::SizeStatistics ToThrift(const SizeStatistics& size_stats) { + format::SizeStatistics size_statistics; + size_statistics.__set_definition_level_histogram(size_stats.definition_level_histogram); + size_statistics.__set_repetition_level_histogram(size_stats.repetition_level_histogram); + if (size_stats.unencoded_byte_array_data_bytes.has_value()) { + size_statistics.__set_unencoded_byte_array_data_bytes( + size_stats.unencoded_byte_array_data_bytes.value()); + } + return size_statistics; +} + // ---------------------------------------------------------------------- // Thrift struct serialization / deserialization utilities diff --git a/cpp/src/parquet/type_fwd.h b/cpp/src/parquet/type_fwd.h index da0d0f7bdee96..cda0dc5a77e1f 100644 --- a/cpp/src/parquet/type_fwd.h +++ b/cpp/src/parquet/type_fwd.h @@ -68,7 +68,10 @@ struct ParquetVersion { }; }; +struct PageIndexLocation; + class FileMetaData; +class FileCryptoMetaData; class RowGroupMetaData; class ColumnDescriptor; @@ -82,10 +85,22 @@ class WriterPropertiesBuilder; class ArrowWriterProperties; class ArrowWriterPropertiesBuilder; +class EncodedStatistics; +class Statistics; +struct SizeStatistics; + +class ColumnIndex; +class OffsetIndex; + namespace arrow { class FileWriter; class FileReader; } // namespace arrow + +namespace schema { +class ColumnPath; +} // namespace schema + } // namespace parquet