GH-40592: [C++][Parquet] Implement SizeStatistics
wgtmac committed Jul 9, 2024
1 parent d25ec6a commit 3dac2b1
Showing 12 changed files with 825 additions and 48 deletions.
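
For context before the file-by-file diff: SizeStatistics is defined in the Parquet format specification. Per column chunk, and optionally per page, it records the total unencoded size of BYTE_ARRAY values together with histograms of definition and repetition levels, so that readers can budget memory and prune nested data more precisely. A minimal sketch of the data it carries, using field names that mirror parquet.thrift rather than the new C++ class added by this commit:

#include <cstdint>
#include <optional>
#include <vector>

// Sketch only: field names follow the SizeStatistics definition in
// parquet.thrift; the actual class added in cpp/src/parquet/size_statistics.h
// may differ in layout and API.
struct SizeStatisticsSketch {
  // Total bytes of BYTE_ARRAY values before encoding and compression,
  // excluding any length prefixes; unset for other physical types.
  std::optional<int64_t> unencoded_byte_array_data_bytes;
  // histogram[i] counts the levels equal to i, for i in [0, max_level].
  std::vector<int64_t> definition_level_histogram;
  std::vector<int64_t> repetition_level_histogram;
};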
2 changes: 2 additions & 0 deletions cpp/src/parquet/CMakeLists.txt
@@ -183,6 +183,7 @@ set(PARQUET_SRCS
printer.cc
properties.cc
schema.cc
size_statistics.cc
statistics.cc
stream_reader.cc
stream_writer.cc
@@ -376,6 +377,7 @@ add_parquet_test(internals-test
metadata_test.cc
page_index_test.cc
public_api_test.cc
size_statistics_test.cc
types_test.cc)

set_source_files_properties(public_api_test.cc PROPERTIES SKIP_PRECOMPILE_HEADERS ON
24 changes: 17 additions & 7 deletions cpp/src/parquet/column_page.h
@@ -26,6 +26,7 @@
#include <optional>
#include <string>

#include "parquet/size_statistics.h"
#include "parquet/statistics.h"
#include "parquet/types.h"

@@ -69,27 +70,32 @@ class DataPage : public Page {
/// Currently it is only present from data pages created by ColumnWriter in order
/// to collect page index.
std::optional<int64_t> first_row_index() const { return first_row_index_; }
const std::shared_ptr<SizeStatistics>& size_statistics() const {
return size_statistics_;
}

virtual ~DataPage() = default;

protected:
DataPage(PageType::type type, const std::shared_ptr<Buffer>& buffer, int32_t num_values,
Encoding::type encoding, int64_t uncompressed_size,
EncodedStatistics statistics = EncodedStatistics(),
std::optional<int64_t> first_row_index = std::nullopt)
EncodedStatistics statistics, std::optional<int64_t> first_row_index,
std::shared_ptr<SizeStatistics> size_statistics)
: Page(buffer, type),
num_values_(num_values),
encoding_(encoding),
uncompressed_size_(uncompressed_size),
statistics_(std::move(statistics)),
first_row_index_(std::move(first_row_index)) {}
first_row_index_(std::move(first_row_index)),
size_statistics_(std::move(size_statistics)) {}

int32_t num_values_;
Encoding::type encoding_;
int64_t uncompressed_size_;
EncodedStatistics statistics_;
/// Row ordinal within the row group to the first row in the data page.
std::optional<int64_t> first_row_index_;
std::shared_ptr<SizeStatistics> size_statistics_;
};

class DataPageV1 : public DataPage {
@@ -98,9 +104,11 @@ class DataPageV1 : public DataPage {
Encoding::type encoding, Encoding::type definition_level_encoding,
Encoding::type repetition_level_encoding, int64_t uncompressed_size,
EncodedStatistics statistics = EncodedStatistics(),
std::optional<int64_t> first_row_index = std::nullopt)
std::optional<int64_t> first_row_index = std::nullopt,
std::shared_ptr<SizeStatistics> size_statistics = NULLPTR)
: DataPage(PageType::DATA_PAGE, buffer, num_values, encoding, uncompressed_size,
std::move(statistics), std::move(first_row_index)),
std::move(statistics), std::move(first_row_index),
std::move(size_statistics)),
definition_level_encoding_(definition_level_encoding),
repetition_level_encoding_(repetition_level_encoding) {}

@@ -120,9 +128,11 @@ class DataPageV2 : public DataPage {
int32_t definition_levels_byte_length, int32_t repetition_levels_byte_length,
int64_t uncompressed_size, bool is_compressed = false,
EncodedStatistics statistics = EncodedStatistics(),
std::optional<int64_t> first_row_index = std::nullopt)
std::optional<int64_t> first_row_index = std::nullopt,
std::shared_ptr<SizeStatistics> size_statistics = NULLPTR)
: DataPage(PageType::DATA_PAGE_V2, buffer, num_values, encoding, uncompressed_size,
std::move(statistics), std::move(first_row_index)),
std::move(statistics), std::move(first_row_index),
std::move(size_statistics)),
num_nulls_(num_nulls),
num_rows_(num_rows),
definition_levels_byte_length_(definition_levels_byte_length),
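
The net effect of the column_page.h changes above: a DataPage can now carry an optional std::shared_ptr<SizeStatistics> next to its EncodedStatistics, and the V1/V2 constructors gained a trailing size_statistics argument defaulting to NULLPTR. The sketch below shows how a page with size statistics could be assembled with the SizeStatisticsBuilder introduced by this commit; the helper function and its parameter list are illustrative only, since the column writer normally does this internally.

#include <cstdint>
#include <memory>
#include <optional>

#include "arrow/buffer.h"
#include "parquet/column_page.h"
#include "parquet/schema.h"
#include "parquet/size_statistics.h"

// Illustrative helper, not part of the commit: collect per-page SizeStatistics
// from the page's levels and attach them via the widened DataPageV2 constructor.
std::unique_ptr<parquet::DataPageV2> MakePageWithSizeStats(
    const parquet::ColumnDescriptor* descr,
    const std::shared_ptr<::arrow::Buffer>& buffer, int32_t num_values,
    int32_t num_nulls, int32_t num_rows, parquet::Encoding::type encoding,
    int32_t def_levels_byte_length, int32_t rep_levels_byte_length,
    int64_t uncompressed_size, int64_t num_levels, const int16_t* def_levels,
    const int16_t* rep_levels) {
  auto builder = parquet::SizeStatisticsBuilder::Make(descr);
  builder->WriteDefinitionLevels(num_levels, def_levels);
  builder->WriteRepetitionLevels(num_levels, rep_levels);
  return std::make_unique<parquet::DataPageV2>(
      buffer, num_values, num_nulls, num_rows, encoding, def_levels_byte_length,
      rep_levels_byte_length, uncompressed_size, /*is_compressed=*/false,
      parquet::EncodedStatistics(), /*first_row_index=*/std::nullopt,
      builder->Build());
}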
134 changes: 109 additions & 25 deletions cpp/src/parquet/column_writer.cc
@@ -54,6 +54,7 @@
#include "parquet/platform.h"
#include "parquet/properties.h"
#include "parquet/schema.h"
#include "parquet/size_statistics.h"
#include "parquet/statistics.h"
#include "parquet/thrift_internal.h"
#include "parquet/types.h"
@@ -434,10 +435,11 @@ class SerializedPageWriter : public PageWriter {
const int64_t header_size =
thrift_serializer_->Serialize(&page_header, sink_.get(), meta_encryptor_.get());
PARQUET_THROW_NOT_OK(sink_->Write(output_data_buffer, output_data_len));
const auto& page_size_stats = page.size_statistics();

/// Collect page index
if (column_index_builder_ != nullptr) {
column_index_builder_->AddPage(page.statistics());
column_index_builder_->AddPage(page.statistics(), page_size_stats.get());
}
if (offset_index_builder_ != nullptr) {
const int64_t compressed_size = output_data_len + header_size;
@@ -451,8 +453,10 @@
/// start_pos is a relative offset in the buffered mode. It should be
/// adjusted via OffsetIndexBuilder::Finish() after BufferedPageWriter
/// has flushed all data pages.
offset_index_builder_->AddPage(start_pos, static_cast<int32_t>(compressed_size),
*page.first_row_index());
offset_index_builder_->AddPage(
start_pos, static_cast<int32_t>(compressed_size), *page.first_row_index(),
page_size_stats ? page_size_stats->unencoded_byte_array_data_bytes()
: std::nullopt);
}

total_uncompressed_size_ += uncompressed_size + header_size;
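
With the two calls above, per-page size statistics fan out into both halves of the page index: the ColumnIndex builder receives the level histograms and the OffsetIndex builder receives the per-page unencoded BYTE_ARRAY byte count. For orientation, a sketch of the resulting structures, restricted to the fields relevant here and using the field names from the format specification rather than the C++ builder classes:

#include <cstdint>
#include <vector>

// Sketch of the page-index structures the builders populate; other fields
// (min/max values, null counts, ...) are omitted.
struct OffsetIndexSketch {
  struct PageLocation {
    int64_t offset;                // file offset of the data page
    int32_t compressed_page_size;  // header plus compressed data
    int64_t first_row_index;       // row ordinal within the row group
  };
  std::vector<PageLocation> page_locations;
  // Optional, parallel to page_locations: unencoded BYTE_ARRAY bytes per page.
  std::vector<int64_t> unencoded_byte_array_data_bytes;
};

struct ColumnIndexSketch {
  // Optional, concatenated per-page histograms, each of length max_level + 1.
  std::vector<int64_t> definition_level_histograms;
  std::vector<int64_t> repetition_level_histograms;
};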
@@ -789,11 +793,13 @@ class ColumnWriterImpl {
// Serializes Dictionary Page if enabled
virtual void WriteDictionaryPage() = 0;

using StatisticsPair = std::pair<EncodedStatistics, std::shared_ptr<SizeStatistics>>;

// Plain-encoded statistics of the current page
virtual EncodedStatistics GetPageStatistics() = 0;
virtual StatisticsPair GetPageStatistics() = 0;

// Plain-encoded statistics of the whole chunk
virtual EncodedStatistics GetChunkStatistics() = 0;
virtual StatisticsPair GetChunkStatistics() = 0;

// Merges page statistics into chunk statistics, then resets the values
virtual void ResetPageStatistics() = 0;
@@ -994,7 +1000,9 @@ void ColumnWriterImpl::BuildDataPageV1(int64_t definition_levels_rle_size,
ConcatenateBuffers(definition_levels_rle_size, repetition_levels_rle_size, values,
uncompressed_data_->mutable_data());

EncodedStatistics page_stats = GetPageStatistics();
EncodedStatistics page_stats;
std::shared_ptr<SizeStatistics> page_size_stats;
std::tie(page_stats, page_size_stats) = GetPageStatistics();
page_stats.ApplyStatSizeLimits(properties_->max_statistics_size(descr_->path()));
page_stats.set_is_signed(SortOrder::SIGNED == descr_->sort_order());
ResetPageStatistics();
@@ -1018,13 +1026,15 @@
compressed_data->CopySlice(0, compressed_data->size(), allocator_));
std::unique_ptr<DataPage> page_ptr = std::make_unique<DataPageV1>(
compressed_data_copy, num_values, encoding_, Encoding::RLE, Encoding::RLE,
uncompressed_size, std::move(page_stats), first_row_index);
uncompressed_size, std::move(page_stats), first_row_index,
std::move(page_size_stats));
total_compressed_bytes_ += page_ptr->size() + sizeof(format::PageHeader);

data_pages_.push_back(std::move(page_ptr));
} else { // Eagerly write pages
DataPageV1 page(compressed_data, num_values, encoding_, Encoding::RLE, Encoding::RLE,
uncompressed_size, std::move(page_stats), first_row_index);
uncompressed_size, std::move(page_stats), first_row_index,
std::move(page_size_stats));
WriteDataPage(page);
}
}
@@ -1051,7 +1061,9 @@ void ColumnWriterImpl::BuildDataPageV2(int64_t definition_levels_rle_size,
ConcatenateBuffers(definition_levels_rle_size, repetition_levels_rle_size,
compressed_values, combined->mutable_data());

EncodedStatistics page_stats = GetPageStatistics();
EncodedStatistics page_stats;
std::shared_ptr<SizeStatistics> page_size_stats;
std::tie(page_stats, page_size_stats) = GetPageStatistics();
page_stats.ApplyStatSizeLimits(properties_->max_statistics_size(descr_->path()));
page_stats.set_is_signed(SortOrder::SIGNED == descr_->sort_order());
ResetPageStatistics();
@@ -1075,13 +1087,14 @@
std::unique_ptr<DataPage> page_ptr = std::make_unique<DataPageV2>(
combined, num_values, null_count, num_rows, encoding_, def_levels_byte_length,
rep_levels_byte_length, uncompressed_size, pager_->has_compressor(), page_stats,
first_row_index);
first_row_index, std::move(page_size_stats));
total_compressed_bytes_ += page_ptr->size() + sizeof(format::PageHeader);
data_pages_.push_back(std::move(page_ptr));
} else {
DataPageV2 page(combined, num_values, null_count, num_rows, encoding_,
def_levels_byte_length, rep_levels_byte_length, uncompressed_size,
pager_->has_compressor(), page_stats, first_row_index);
pager_->has_compressor(), page_stats, first_row_index,
std::move(page_size_stats));
WriteDataPage(page);
}
}
@@ -1095,7 +1108,9 @@ int64_t ColumnWriterImpl::Close() {

FlushBufferedDataPages();

EncodedStatistics chunk_statistics = GetChunkStatistics();
EncodedStatistics chunk_statistics;
std::shared_ptr<SizeStatistics> chunk_size_stats;
std::tie(chunk_statistics, chunk_size_stats) = GetChunkStatistics();
chunk_statistics.ApplyStatSizeLimits(
properties_->max_statistics_size(descr_->path()));
chunk_statistics.set_is_signed(SortOrder::SIGNED == descr_->sort_order());
@@ -1104,6 +1119,9 @@
if (rows_written_ > 0 && chunk_statistics.is_set()) {
metadata_->SetStatistics(chunk_statistics);
}
if (rows_written_ > 0 && chunk_size_stats) {
metadata_->SetSizeStatistics(*chunk_size_stats);
}
pager_->Close(has_dictionary_, fallback_);
}
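
Once Close() runs, the merged chunk-level statistics land in the column chunk metadata via SetSizeStatistics. A hedged reader-side sketch of how they could later be consulted; the size_statistics() accessor on ColumnChunkMetaData is assumed here and is not shown in this diff, though the unencoded_byte_array_data_bytes() getter does appear above.

#include <iostream>
#include <memory>

#include "parquet/metadata.h"
#include "parquet/size_statistics.h"

// Hypothetical reader-side use; treat size_statistics() as an assumed API.
void PrintUnencodedBytes(const parquet::FileMetaData& metadata) {
  for (int rg = 0; rg < metadata.num_row_groups(); ++rg) {
    auto row_group = metadata.RowGroup(rg);
    for (int col = 0; col < row_group->num_columns(); ++col) {
      auto size_stats = row_group->ColumnChunk(col)->size_statistics();  // assumed
      if (size_stats && size_stats->unencoded_byte_array_data_bytes().has_value()) {
        std::cout << "row group " << rg << ", column " << col << ": "
                  << *size_stats->unencoded_byte_array_data_bytes()
                  << " unencoded byte array bytes\n";
      }
    }
  }
}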

@@ -1231,6 +1249,11 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
pages_change_on_record_boundaries_ =
properties->data_page_version() == ParquetDataPageVersion::V2 ||
properties->page_index_enabled(descr_->path());

if (properties->size_statistics_level() != SizeStatisticsLevel::NONE) {
page_size_stats_builder_ = SizeStatisticsBuilder::Make(descr_);
chunk_size_stats_ = page_size_stats_builder_->Build();
}
}

int64_t Close() override { return ColumnWriterImpl::Close(); }
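
Collection is gated on the new SizeStatisticsLevel writer property: NONE skips the builder entirely, while the other levels allocate one SizeStatisticsBuilder per column writer. A hedged sketch of enabling it from user code; the size_statistics_level() getter appears in this diff, but the builder setter name below is an assumption:

#include <memory>

#include "parquet/properties.h"

// Assumed builder method name (set_size_statistics_level); only the
// corresponding getter on WriterProperties is visible in this diff.
std::shared_ptr<parquet::WriterProperties> MakeWriterProperties() {
  return parquet::WriterProperties::Builder()
      .enable_write_page_index()  // size statistics also feed the page index
      ->set_size_statistics_level(parquet::SizeStatisticsLevel::PAGE)  // assumed
      ->build();
}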
@@ -1362,15 +1385,19 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
total_bytes_written_ += pager_->WriteDictionaryPage(page);
}

EncodedStatistics GetPageStatistics() override {
EncodedStatistics result;
if (page_statistics_) result = page_statistics_->Encode();
StatisticsPair GetPageStatistics() override {
StatisticsPair result;
if (page_statistics_) result.first = page_statistics_->Encode();
if (properties_->size_statistics_level() == SizeStatisticsLevel::PAGE) {
result.second = page_size_stats_builder_->Build();
}
return result;
}

EncodedStatistics GetChunkStatistics() override {
EncodedStatistics result;
if (chunk_statistics_) result = chunk_statistics_->Encode();
StatisticsPair GetChunkStatistics() override {
StatisticsPair result;
if (chunk_statistics_) result.first = chunk_statistics_->Encode();
if (chunk_size_stats_) result.second = chunk_size_stats_;
return result;
}

@@ -1379,6 +1406,11 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
chunk_statistics_->Merge(*page_statistics_);
page_statistics_->Reset();
}
if (page_size_stats_builder_ != nullptr) {
auto page_size_stats = page_size_stats_builder_->Build();
chunk_size_stats_->Merge(*page_size_stats);
page_size_stats_builder_->Reset();
}
}

Type::type type() const override { return descr_->physical_type(); }
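
ResetPageStatistics relies on SizeStatistics::Merge to fold each finished page into the running chunk totals. The merge is presumably element-wise: histograms add bucket by bucket and the optional byte counts sum. A sketch of that assumed behaviour, not the implementation in size_statistics.cc:

#include <cstdint>
#include <optional>
#include <vector>

// Assumed merge semantics for SizeStatistics.
void MergeHistogram(std::vector<int64_t>* into, const std::vector<int64_t>& from) {
  if (into->size() < from.size()) into->resize(from.size(), 0);
  for (size_t i = 0; i < from.size(); ++i) (*into)[i] += from[i];
}

void MergeUnencodedBytes(std::optional<int64_t>* into,
                         const std::optional<int64_t>& from) {
  if (from.has_value()) *into = into->value_or(0) + *from;
}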
@@ -1421,6 +1453,10 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
// which case we call back to the dense write path)
std::shared_ptr<::arrow::Array> preserved_dictionary_;

// Utility to collect and store SizeStatistics of page and chunk.
std::unique_ptr<SizeStatisticsBuilder> page_size_stats_builder_;
std::shared_ptr<SizeStatistics> chunk_size_stats_;

int64_t WriteLevels(int64_t num_values, const int16_t* def_levels,
const int16_t* rep_levels) {
int64_t values_to_write = 0;
@@ -1455,6 +1491,8 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
rows_written_ += num_values;
num_buffered_rows_ += num_values;
}

CollectLevelHistogram(num_values, def_levels, rep_levels);
return values_to_write;
}

@@ -1546,6 +1584,27 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
rows_written_ += num_levels;
num_buffered_rows_ += num_levels;
}

CollectLevelHistogram(num_levels, def_levels, rep_levels);
}

void CollectLevelHistogram(int64_t num_levels, const int16_t* def_levels,
const int16_t* rep_levels) {
if (page_size_stats_builder_ == nullptr) {
return;
}

if (descr_->max_definition_level() > 0) {
page_size_stats_builder_->WriteDefinitionLevels(num_levels, def_levels);
} else {
page_size_stats_builder_->WriteDefinitionLevel(num_levels, /*def_level=*/0);
}

if (descr_->max_repetition_level() > 0) {
page_size_stats_builder_->WriteRepetitionLevels(num_levels, rep_levels);
} else {
page_size_stats_builder_->WriteRepetitionLevel(num_levels, /*rep_level=*/0);
}
}
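
CollectLevelHistogram runs on both the spaced and unspaced write paths; when a column has no definition or repetition levels it still records num_levels occurrences of level 0 so the histograms stay complete. The per-level update the builder is assumed to perform is a plain bucket count:

#include <cstdint>
#include <vector>

// Assumed behaviour behind WriteDefinitionLevels / WriteRepetitionLevels:
// histogram is pre-sized to max_level + 1 buckets and bucket i counts how
// many levels equal i.
void UpdateLevelHistogram(int64_t num_levels, const int16_t* levels,
                          std::vector<int64_t>* histogram) {
  for (int64_t i = 0; i < num_levels; ++i) {
    (*histogram)[levels[i]] += 1;
  }
}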

void CommitWriteAndCheckPageLimit(int64_t num_levels, int64_t num_values,
Expand Down Expand Up @@ -1599,6 +1658,11 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
if (page_statistics_ != nullptr) {
page_statistics_->Update(values, num_values, num_nulls);
}
if constexpr (std::is_same_v<T, ByteArray>) {
if (page_size_stats_builder_ != nullptr) {
page_size_stats_builder_->WriteValues(values, num_values);
}
}
}

/// \brief Write values with spaces and update page statistics accordingly.
@@ -1627,6 +1691,12 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
page_statistics_->UpdateSpaced(values, valid_bits, valid_bits_offset,
num_spaced_values, num_values, num_nulls);
}
if constexpr (std::is_same_v<T, ByteArray>) {
if (page_size_stats_builder_ != nullptr) {
page_size_stats_builder_->WriteValuesSpaced(values, valid_bits, valid_bits_offset,
num_spaced_values);
}
}
}
};
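
For BYTE_ARRAY columns the builder also receives the values themselves, from which it accumulates unencoded_byte_array_data_bytes. That quantity is simply the sum of raw value lengths before any encoding or compression, as in this sketch of the assumed accumulation:

#include <cstdint>

#include "parquet/types.h"

// Assumed accumulation behind SizeStatisticsBuilder::WriteValues for
// BYTE_ARRAY: sum of value lengths, with no length prefixes and no
// encoding or compression applied.
int64_t SumByteArrayBytes(const parquet::ByteArray* values, int64_t num_values) {
  int64_t total = 0;
  for (int64_t i = 0; i < num_values; ++i) {
    total += values[i].len;
  }
  return total;
}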

@@ -1685,8 +1755,14 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(
exec_ctx.set_use_threads(false);

std::shared_ptr<::arrow::Array> referenced_dictionary;
PARQUET_ASSIGN_OR_THROW(::arrow::Datum referenced_indices,
::arrow::compute::Unique(*chunk_indices, &exec_ctx));
::arrow::Datum referenced_indices;
if (page_size_stats_builder_) {
// SizeStatistics need to compute total bytes, so we cannot extract unique values.
referenced_indices = *chunk_indices;
} else {
PARQUET_ASSIGN_OR_THROW(referenced_indices,
::arrow::compute::Unique(*chunk_indices, &exec_ctx));
}

// On first run, we might be able to re-use the existing dictionary
if (referenced_indices.length() == dictionary->length()) {
@@ -1700,10 +1776,15 @@
referenced_dictionary = referenced_dictionary_datum.make_array();
}

int64_t non_null_count = chunk_indices->length() - chunk_indices->null_count();
page_statistics_->IncrementNullCount(num_chunk_levels - non_null_count);
page_statistics_->IncrementNumValues(non_null_count);
page_statistics_->Update(*referenced_dictionary, /*update_counts=*/false);
if (page_statistics_) {
int64_t non_null_count = chunk_indices->length() - chunk_indices->null_count();
page_statistics_->IncrementNullCount(num_chunk_levels - non_null_count);
page_statistics_->IncrementNumValues(non_null_count);
page_statistics_->Update(*referenced_dictionary, /*update_counts=*/false);
}
if (page_size_stats_builder_) {
page_size_stats_builder_->WriteValues(*referenced_dictionary);
}
};

int64_t value_offset = 0;
@@ -1720,7 +1801,7 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(
AddIfNotNull(rep_levels, offset));
std::shared_ptr<Array> writeable_indices =
indices->Slice(value_offset, batch_num_spaced_values);
if (page_statistics_) {
if (page_statistics_ || page_size_stats_builder_) {
update_stats(/*num_chunk_levels=*/batch_size, writeable_indices);
}
PARQUET_ASSIGN_OR_THROW(
@@ -2207,6 +2288,9 @@ Status TypedColumnWriterImpl<ByteArrayType>::WriteArrowDense(
page_statistics_->IncrementNullCount(batch_size - non_null);
page_statistics_->IncrementNumValues(non_null);
}
if (page_size_stats_builder_ != nullptr) {
page_size_stats_builder_->WriteValues(*data_slice);
}
CommitWriteAndCheckPageLimit(batch_size, batch_num_values, batch_size - non_null,
check_page);
CheckDictionarySizeLimit();
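
One subtlety in the dictionary write path above: the unencoded byte count depends on how many times each dictionary entry is referenced, not only on which entries occur, so the writer can no longer reduce the chunk's indices to their unique values before updating statistics. A toy illustration with assumed values:

#include <cstdint>
#include <string>
#include <vector>

// Toy example, assumed values: counting bytes over every index versus over
// the unique indices gives different totals.
int64_t DictionaryEncodedValueBytes(const std::vector<std::string>& dictionary,
                                    const std::vector<int32_t>& indices) {
  int64_t total = 0;
  for (int32_t index : indices) {
    total += static_cast<int64_t>(dictionary[index].size());
  }
  return total;
}

// DictionaryEncodedValueBytes({"aa", "bbbb"}, {0, 0, 1}) == 8, whereas summing
// over only the unique indices {0, 1} would give 6.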
Diff for the remaining 9 changed files not shown.
