Skip to content

Commit

Permalink
Add basic implement for read BloomFilter with length
Browse files Browse the repository at this point in the history
  • Loading branch information
mapleFU committed Nov 23, 2023
1 parent 76f5976 commit b08ea45
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 9 deletions.
19 changes: 13 additions & 6 deletions cpp/src/parquet/bloom_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,23 @@ static ::arrow::Status ValidateBloomFilterHeader(
}

BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(
const ReaderProperties& properties, ArrowInputStream* input) {
// NOTE: we don't know the bloom filter header size upfront, and we can't rely on
// InputStream::Peek() which isn't always implemented. Therefore, we must first
// Read() with an upper bound estimate of the header size, then once we know
// the bloom filter data size, we can Read() the exact number of remaining data bytes.
const ReaderProperties& properties, ArrowInputStream* input, std::optional<int64_t> bloom_filter_length) {
ThriftDeserializer deserializer(properties);
format::BloomFilterHeader header;
int64_t bloom_filter_header_read_size = 0;
if (bloom_filter_length.has_value()) {
bloom_filter_header_read_size = bloom_filter_length.value();
} else {
// NOTE: we don't know the bloom filter header size upfront without bloom_filter_length,
// and we can't rely on InputStream::Peek() which isn't always implemented.
// Therefore, we must first Read() with an upper bound estimate of the header size,
// then once we know the bloom filter data size, we can Read() the exact number of
// remaining data bytes.
bloom_filter_header_read_size = kBloomFilterHeaderSizeGuess;
}

// Read and deserialize bloom filter header
PARQUET_ASSIGN_OR_THROW(auto header_buf, input->Read(kBloomFilterHeaderSizeGuess));
PARQUET_ASSIGN_OR_THROW(auto header_buf, input->Read(bloom_filter_header_read_size));
// This gets used, then set by DeserializeThriftMsg
uint32_t header_size = static_cast<uint32_t>(header_buf->size());
try {
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/parquet/bloom_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ class PARQUET_EXPORT BlockSplitBloomFilter : public BloomFilter {
/// @param input_stream The input stream from which to construct the Bloom filter.
/// @return The BlockSplitBloomFilter.
static BlockSplitBloomFilter Deserialize(const ReaderProperties& properties,
ArrowInputStream* input_stream);
ArrowInputStream* input_stream, std::optional<int64_t> bloom_filter_length);

private:
inline void InsertHashImpl(uint64_t hash);
Expand Down
6 changes: 5 additions & 1 deletion cpp/src/parquet/bloom_filter_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,13 @@ std::unique_ptr<BloomFilter> RowGroupBloomFilterReaderImpl::GetColumnBloomFilter
if (file_size <= *bloom_filter_offset) {
throw ParquetException("file size less or equal than bloom offset");
}
std::optional<int64_t> bloom_filter_length = col_chunk->bloom_filter_length();
if (bloom_filter_length.has_value() && *bloom_filter_length + *bloom_filter_offset > file_size) {
throw ParquetException("bloom filter length + bloom filter offset greater than file size");
}
auto stream = ::arrow::io::RandomAccessFile::GetStream(
input_, *bloom_filter_offset, file_size - *bloom_filter_offset);
auto bloom_filter = BlockSplitBloomFilter::Deserialize(properties_, stream->get());
auto bloom_filter = BlockSplitBloomFilter::Deserialize(properties_, stream->get(), bloom_filter_length);
return std::make_unique<BlockSplitBloomFilter>(std::move(bloom_filter));
}

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/parquet/encoding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1983,7 +1983,7 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl<ByteArrayType>,
int values_decoded = 0;

ArrowBinaryHelper<ByteArrayType> helper(out, num_values);
RETURN_NOT_OK(helper.Prepare(len_));
RETURN_NOT_OK(helper.Prepare());

auto dict_values = reinterpret_cast<const ByteArray*>(dictionary_->data());

Expand Down
11 changes: 11 additions & 0 deletions cpp/src/parquet/metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,13 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
return std::nullopt;
}

inline std::optional<int64_t> bloom_filter_length() const {
if (column_metadata_->__isset.bloom_filter_length) {
return column_metadata_->bloom_filter_length;
}
return std::nullopt;
}

inline bool has_dictionary_page() const {
return column_metadata_->__isset.dictionary_page_offset;
}
Expand Down Expand Up @@ -399,6 +406,10 @@ std::optional<int64_t> ColumnChunkMetaData::bloom_filter_offset() const {
return impl_->bloom_filter_offset();
}

std::optional<int64_t> ColumnChunkMetaData::bloom_filter_length() const {
return impl_->bloom_filter_length();
}

bool ColumnChunkMetaData::has_dictionary_page() const {
return impl_->has_dictionary_page();
}
Expand Down
1 change: 1 addition & 0 deletions cpp/src/parquet/metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ class PARQUET_EXPORT ColumnChunkMetaData {
const std::vector<Encoding::type>& encodings() const;
const std::vector<PageEncodingStats>& encoding_stats() const;
std::optional<int64_t> bloom_filter_offset() const;
std::optional<int64_t> bloom_filter_length() const;
bool has_dictionary_page() const;
int64_t dictionary_page_offset() const;
int64_t data_page_offset() const;
Expand Down
25 changes: 25 additions & 0 deletions cpp/src/parquet/printer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,31 @@ void ParquetFilePrinter::JSONPrint(std::ostream& stream, std::list<int> selected
<< "\"UncompressedSize\": \"" << column_chunk->total_uncompressed_size()
<< "\", \"CompressedSize\": \"" << column_chunk->total_compressed_size();

if (column_chunk->bloom_filter_offset()) {
// Output BloomFilter {offset, length}
stream << "\", BloomFilter {" << "\"offset\": \"" << column_chunk->bloom_filter_offset().value();
if (column_chunk->bloom_filter_length()) {
stream << "\", \"length\": \"" << column_chunk->bloom_filter_length().value();
}
stream << "\"}";
}

if (column_chunk->GetOffsetIndexLocation()) {
auto location = column_chunk->GetOffsetIndexLocation().value();
// Output OffsetIndex {offset, length}
stream << "\", OffsetIndex {" << "\"offset\": \"" << location.offset;
stream << "\", \"length\": \"" << location.length;
stream << "\"}";
}

if (column_chunk->GetColumnIndexLocation()) {
auto location = column_chunk->GetColumnIndexLocation().value();
// Output ColumnIndex {offset, length}
stream << "\", ColumnIndex {" << "\"offset\": \"" << location.offset;
stream << "\", \"length\": \"" << location.length;
stream << "\"}";
}

// end of a ColumnChunk
stream << "\" }";
c1++;
Expand Down

0 comments on commit b08ea45

Please sign in to comment.