From 6a73d82b2144bba58ef1ab5ef40f4eacd28f67b4 Mon Sep 17 00:00:00 2001 From: mwish Date: Thu, 23 Nov 2023 20:56:11 +0800 Subject: [PATCH] Add basic implementation for reading BloomFilter with length --- cpp/src/parquet/bloom_filter.cc | 20 ++++++++++++------ cpp/src/parquet/bloom_filter.h | 3 ++- cpp/src/parquet/bloom_filter_reader.cc | 9 ++++++++- cpp/src/parquet/metadata.cc | 11 ++++++++++ cpp/src/parquet/metadata.h | 1 + cpp/src/parquet/printer.cc | 28 ++++++++++++++++++++++++++ 6 files changed, 64 insertions(+), 8 deletions(-) diff --git a/cpp/src/parquet/bloom_filter.cc b/cpp/src/parquet/bloom_filter.cc index 427e73b9e6428..fb543ea086455 100644 --- a/cpp/src/parquet/bloom_filter.cc +++ b/cpp/src/parquet/bloom_filter.cc @@ -105,16 +105,24 @@ static ::arrow::Status ValidateBloomFilterHeader( } BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize( - const ReaderProperties& properties, ArrowInputStream* input) { - // NOTE: we don't know the bloom filter header size upfront, and we can't rely on - // InputStream::Peek() which isn't always implemented. Therefore, we must first - // Read() with an upper bound estimate of the header size, then once we know - // the bloom filter data size, we can Read() the exact number of remaining data bytes. + const ReaderProperties& properties, ArrowInputStream* input, + std::optional bloom_filter_length) { ThriftDeserializer deserializer(properties); format::BloomFilterHeader header; + int64_t bloom_filter_header_read_size = 0; + if (bloom_filter_length.has_value()) { + bloom_filter_header_read_size = bloom_filter_length.value(); + } else { + // NOTE: we don't know the bloom filter header size upfront without + // bloom_filter_length, and we can't rely on InputStream::Peek() which isn't always + // implemented. Therefore, we must first Read() with an upper bound estimate of the + // header size, then once we know the bloom filter data size, we can Read() the exact + // number of remaining data bytes. 
+ bloom_filter_header_read_size = kBloomFilterHeaderSizeGuess; + } // Read and deserialize bloom filter header - PARQUET_ASSIGN_OR_THROW(auto header_buf, input->Read(kBloomFilterHeaderSizeGuess)); + PARQUET_ASSIGN_OR_THROW(auto header_buf, input->Read(bloom_filter_header_read_size)); // This gets used, then set by DeserializeThriftMsg uint32_t header_size = static_cast(header_buf->size()); try { diff --git a/cpp/src/parquet/bloom_filter.h b/cpp/src/parquet/bloom_filter.h index e8ef5c0bd60db..2571ff7dedb57 100644 --- a/cpp/src/parquet/bloom_filter.h +++ b/cpp/src/parquet/bloom_filter.h @@ -313,7 +313,8 @@ class PARQUET_EXPORT BlockSplitBloomFilter : public BloomFilter { /// @param input_stream The input stream from which to construct the Bloom filter. /// @return The BlockSplitBloomFilter. static BlockSplitBloomFilter Deserialize(const ReaderProperties& properties, - ArrowInputStream* input_stream); + ArrowInputStream* input_stream, + std::optional bloom_filter_length); private: inline void InsertHashImpl(uint64_t hash); diff --git a/cpp/src/parquet/bloom_filter_reader.cc b/cpp/src/parquet/bloom_filter_reader.cc index 4e27a940c2f5e..49c229029356b 100644 --- a/cpp/src/parquet/bloom_filter_reader.cc +++ b/cpp/src/parquet/bloom_filter_reader.cc @@ -63,9 +63,16 @@ std::unique_ptr RowGroupBloomFilterReaderImpl::GetColumnBloomFilter if (file_size <= *bloom_filter_offset) { throw ParquetException("file size less or equal than bloom offset"); } + std::optional bloom_filter_length = col_chunk->bloom_filter_length(); + if (bloom_filter_length.has_value() && + *bloom_filter_length + *bloom_filter_offset > file_size) { + throw ParquetException( + "bloom filter length + bloom filter offset greater than file size"); + } auto stream = ::arrow::io::RandomAccessFile::GetStream( input_, *bloom_filter_offset, file_size - *bloom_filter_offset); - auto bloom_filter = BlockSplitBloomFilter::Deserialize(properties_, stream->get()); + auto bloom_filter = + 
BlockSplitBloomFilter::Deserialize(properties_, stream->get(), bloom_filter_length); return std::make_unique(std::move(bloom_filter)); } diff --git a/cpp/src/parquet/metadata.cc b/cpp/src/parquet/metadata.cc index f43187c2dd4e5..d651ea5db0f18 100644 --- a/cpp/src/parquet/metadata.cc +++ b/cpp/src/parquet/metadata.cc @@ -286,6 +286,13 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl { return std::nullopt; } + inline std::optional bloom_filter_length() const { + if (column_metadata_->__isset.bloom_filter_length) { + return column_metadata_->bloom_filter_length; + } + return std::nullopt; + } + inline bool has_dictionary_page() const { return column_metadata_->__isset.dictionary_page_offset; } @@ -399,6 +406,10 @@ std::optional ColumnChunkMetaData::bloom_filter_offset() const { return impl_->bloom_filter_offset(); } +std::optional ColumnChunkMetaData::bloom_filter_length() const { + return impl_->bloom_filter_length(); +} + bool ColumnChunkMetaData::has_dictionary_page() const { return impl_->has_dictionary_page(); } diff --git a/cpp/src/parquet/metadata.h b/cpp/src/parquet/metadata.h index 6609cff48bac2..e47c45ff0492a 100644 --- a/cpp/src/parquet/metadata.h +++ b/cpp/src/parquet/metadata.h @@ -168,6 +168,7 @@ class PARQUET_EXPORT ColumnChunkMetaData { const std::vector& encodings() const; const std::vector& encoding_stats() const; std::optional bloom_filter_offset() const; + std::optional bloom_filter_length() const; bool has_dictionary_page() const; int64_t dictionary_page_offset() const; int64_t data_page_offset() const; diff --git a/cpp/src/parquet/printer.cc b/cpp/src/parquet/printer.cc index 2c81abb9eee79..46085d19e4ad1 100644 --- a/cpp/src/parquet/printer.cc +++ b/cpp/src/parquet/printer.cc @@ -314,6 +314,34 @@ void ParquetFilePrinter::JSONPrint(std::ostream& stream, std::list selected << "\"UncompressedSize\": \"" << column_chunk->total_uncompressed_size() << "\", \"CompressedSize\": \"" << column_chunk->total_compressed_size(); + if 
(column_chunk->bloom_filter_offset()) { + // Output BloomFilter {offset, length} + stream << "\", BloomFilter {" + << "\"offset\": \"" << column_chunk->bloom_filter_offset().value(); + if (column_chunk->bloom_filter_length()) { + stream << "\", \"length\": \"" << column_chunk->bloom_filter_length().value(); + } + stream << "\"}"; + } + + if (column_chunk->GetOffsetIndexLocation()) { + auto location = column_chunk->GetOffsetIndexLocation().value(); + // Output OffsetIndex {offset, length} + stream << "\", OffsetIndex {" + << "\"offset\": \"" << location.offset; + stream << "\", \"length\": \"" << location.length; + stream << "\"}"; + } + + if (column_chunk->GetColumnIndexLocation()) { + auto location = column_chunk->GetColumnIndexLocation().value(); + // Output ColumnIndex {offset, length} + stream << "\", ColumnIndex {" + << "\"offset\": \"" << location.offset; + stream << "\", \"length\": \"" << location.length; + stream << "\"}"; + } + // end of a ColumnChunk stream << "\" }"; c1++;