From 08cd4ced63f99c95873fee75ce492095d9abb229 Mon Sep 17 00:00:00 2001 From: mwish Date: Fri, 6 Oct 2023 20:21:44 +0800 Subject: [PATCH 1/3] Fix Overlap column chunk ranges for pre-buffer --- cpp/src/parquet/file_reader.cc | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc index 5247b9d4b543d..f9c6c065284d8 100644 --- a/cpp/src/parquet/file_reader.cc +++ b/cpp/src/parquet/file_reader.cc @@ -139,8 +139,10 @@ const RowGroupMetaData* RowGroupReader::metadata() const { return contents_->met ::arrow::io::ReadRange ComputeColumnChunkRange(FileMetaData* file_metadata, int64_t source_size, int row_group_index, int column_index) { - auto row_group_metadata = file_metadata->RowGroup(row_group_index); - auto column_metadata = row_group_metadata->ColumnChunk(column_index); + std::unique_ptr row_group_metadata = + file_metadata->RowGroup(row_group_index); + std::unique_ptr column_metadata = + row_group_metadata->ColumnChunk(column_index); int64_t col_start = column_metadata->data_page_offset(); if (column_metadata->has_dictionary_page() && @@ -365,6 +367,20 @@ class SerializedFile : public ParquetFileReader::Contents { ComputeColumnChunkRange(file_metadata_.get(), source_size_, row, col)); } } +#ifndef NDEBUG + if (!ranges.empty()) { + auto copied_ranges = ranges; + std::sort(copied_ranges.begin(), copied_ranges.end(), + [](const ::arrow::io::ReadRange& a, const ::arrow::io::ReadRange& b) { + return a.offset < b.offset; + }); + for (size_t i = 1; i < copied_ranges.size(); ++i) { + if (ranges[i].offset < ranges[i - 1].offset + ranges[i - 1].length) { + throw ParquetException("Overlapping column chunk ranges for prebuffer"); + } + } + } +#endif PARQUET_THROW_NOT_OK(cached_source_->Cache(ranges)); } From 9bc42b67ab39fc9817deabef4d4a343c5598771f Mon Sep 17 00:00:00 2001 From: mwish Date: Fri, 6 Oct 2023 20:56:44 +0800 Subject: [PATCH 2/3] tiny fixing --- cpp/src/parquet/file_reader.cc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc index f9c6c065284d8..2a18da98527cb 100644 --- a/cpp/src/parquet/file_reader.cc +++ b/cpp/src/parquet/file_reader.cc @@ -375,8 +375,9 @@ class SerializedFile : public ParquetFileReader::Contents { return a.offset < b.offset; }); for (size_t i = 1; i < copied_ranges.size(); ++i) { - if (ranges[i].offset < ranges[i - 1].offset + ranges[i - 1].length) { - throw ParquetException("Overlapping column chunk ranges for prebuffer"); + if (copied_ranges[i].offset < + copied_ranges[i - 1].offset + copied_ranges[i - 1].length) { + throw ParquetException("Overlapping column chunk ranges for pre-buffer"); } } } From f36532d058f5b3f29adf263935a4394e6c77f952 Mon Sep 17 00:00:00 2001 From: mwish Date: Fri, 6 Oct 2023 22:48:03 +0800 Subject: [PATCH 3/3] resolve comment to use Result in CoalesceReadRanges --- cpp/src/arrow/io/caching.cc | 5 +++-- cpp/src/arrow/io/interfaces.cc | 12 +++++++----- cpp/src/arrow/io/util_internal.h | 6 +++--- cpp/src/parquet/file_reader.cc | 15 --------------- 4 files changed, 13 insertions(+), 25 deletions(-) diff --git a/cpp/src/arrow/io/caching.cc b/cpp/src/arrow/io/caching.cc index 307b933d16ee3..bd61c40693a27 100644 --- a/cpp/src/arrow/io/caching.cc +++ b/cpp/src/arrow/io/caching.cc @@ -174,8 +174,9 @@ struct ReadRangeCache::Impl { // Add the given ranges to the cache, coalescing them where possible virtual Status Cache(std::vector ranges) { - ranges = internal::CoalesceReadRanges(std::move(ranges), options.hole_size_limit, - options.range_size_limit); + ARROW_ASSIGN_OR_RAISE( + ranges, internal::CoalesceReadRanges(std::move(ranges), options.hole_size_limit, + options.range_size_limit)); std::vector new_entries = MakeCacheEntries(ranges); // Add new entries, themselves ordered by offset if (entries.size() > 0) { diff --git a/cpp/src/arrow/io/interfaces.cc b/cpp/src/arrow/io/interfaces.cc index d3229fd067cbe..1d35549cc4345 100644 --- a/cpp/src/arrow/io/interfaces.cc +++ b/cpp/src/arrow/io/interfaces.cc @@ -425,7 +425,7 @@ ThreadPool* GetIOThreadPool() { namespace { struct ReadRangeCombiner { - std::vector Coalesce(std::vector ranges) { + Result> Coalesce(std::vector ranges) { if (ranges.empty()) { return ranges; } @@ -454,7 +454,9 @@ struct ReadRangeCombiner { const auto& left = ranges[i]; const auto& right = ranges[i + 1]; DCHECK_LE(left.offset, right.offset); - DCHECK_LE(left.offset + left.length, right.offset) << "Some read ranges overlap"; + if (left.offset + left.length > right.offset) { + return Status::IOError("Some read ranges overlap"); + } } #endif @@ -509,9 +511,9 @@ struct ReadRangeCombiner { }; // namespace -std::vector CoalesceReadRanges(std::vector ranges, - int64_t hole_size_limit, - int64_t range_size_limit) { +Result> CoalesceReadRanges(std::vector ranges, + int64_t hole_size_limit, + int64_t range_size_limit) { DCHECK_GT(range_size_limit, hole_size_limit); ReadRangeCombiner combiner{hole_size_limit, range_size_limit}; diff --git a/cpp/src/arrow/io/util_internal.h b/cpp/src/arrow/io/util_internal.h index b1d75d1d0bd1f..2015f6a211292 100644 --- a/cpp/src/arrow/io/util_internal.h +++ b/cpp/src/arrow/io/util_internal.h @@ -45,9 +45,9 @@ ARROW_EXPORT Status ValidateWriteRange(int64_t offset, int64_t size, int64_t fil ARROW_EXPORT Status ValidateRange(int64_t offset, int64_t size); ARROW_EXPORT -std::vector CoalesceReadRanges(std::vector ranges, - int64_t hole_size_limit, - int64_t range_size_limit); +Result> CoalesceReadRanges(std::vector ranges, + int64_t hole_size_limit, + int64_t range_size_limit); ARROW_EXPORT ::arrow::internal::ThreadPool* GetIOThreadPool(); diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc index 2a18da98527cb..edf5b5ed7570b 100644 --- a/cpp/src/parquet/file_reader.cc +++ b/cpp/src/parquet/file_reader.cc @@ -367,21 +367,6 @@ class SerializedFile : public ParquetFileReader::Contents { ComputeColumnChunkRange(file_metadata_.get(), source_size_, row, col)); } } -#ifndef NDEBUG - if (!ranges.empty()) { - auto copied_ranges = ranges; - std::sort(copied_ranges.begin(), copied_ranges.end(), - [](const ::arrow::io::ReadRange& a, const ::arrow::io::ReadRange& b) { - return a.offset < b.offset; - }); - for (size_t i = 1; i < copied_ranges.size(); ++i) { - if (copied_ranges[i].offset < - copied_ranges[i - 1].offset + copied_ranges[i - 1].length) { - throw ParquetException("Overlapping column chunk ranges for pre-buffer"); - } - } - } -#endif PARQUET_THROW_NOT_OK(cached_source_->Cache(ranges)); }