fix: compatible for bloom filter length
hengfeiyang committed Oct 11, 2023
1 parent 3aa3da4 commit de1b46e
Showing 2 changed files with 48 additions and 21 deletions.
parquet/src/arrow/async_reader/mod.rs (47 additions & 20 deletions)
@@ -101,15 +101,17 @@ use crate::arrow::arrow_reader::{
 use crate::arrow::ProjectionMask;
 
 use crate::bloom_filter::{
-    read_bloom_filter_header_and_length, Sbbf, SBBF_HEADER_SIZE_ESTIMATE,
+    chunk_read_bloom_filter_header_and_offset, Sbbf, SBBF_HEADER_SIZE_ESTIMATE,
 };
 use crate::column::page::{PageIterator, PageReader};
 use crate::errors::{ParquetError, Result};
 use crate::file::footer::{decode_footer, decode_metadata};
 use crate::file::metadata::{ParquetMetaData, RowGroupMetaData};
 use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
 use crate::file::FOOTER_SIZE;
-use crate::format::PageLocation;
+use crate::format::{
+    BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, PageLocation,
+};
 
 mod metadata;
 pub use metadata::*;
@@ -312,6 +314,7 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
     ) -> Result<Option<Sbbf>> {
         let metadata = self.metadata.row_group(row_group_idx);
         let column_metadata = metadata.column(column_idx);
+
         let offset: usize = if let Some(offset) = column_metadata.bloom_filter_offset() {
             offset.try_into().map_err(|_| {
                 ParquetError::General("Bloom filter offset is invalid".to_string())
@@ -320,24 +323,48 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
             return Ok(None);
         };
 
-        let buffer = self
-            .input
-            .0
-            .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE)
-            .await
-            .unwrap();
-        let (header, length) = read_bloom_filter_header_and_length(buffer)?;
-        let (header, bitset_offset) = (header, offset + length as usize);
-
-        // length in bytes
-        let length: usize = header.num_bytes.try_into().map_err(|_| {
-            ParquetError::General("Bloom filter length is invalid".to_string())
-        })?;
-        let bitset = self
-            .input
-            .0
-            .get_bytes(bitset_offset..bitset_offset + length)
-            .await?;
+        let buffer = match column_metadata.bloom_filter_length() {
+            Some(length) => self.input.0.get_bytes(offset..offset + length as usize),
+            None => self
+                .input
+                .0
+                .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE),
+        }
+        .await?;
+
+        let (header, bitset_offset) =
+            chunk_read_bloom_filter_header_and_offset(offset as u64, buffer.clone())?;
+
+        match header.algorithm {
+            BloomFilterAlgorithm::BLOCK(_) => {
+                // this match exists to future proof the singleton algorithm enum
+            }
+        }
+        match header.compression {
+            BloomFilterCompression::UNCOMPRESSED(_) => {
+                // this match exists to future proof the singleton compression enum
+            }
+        }
+        match header.hash {
+            BloomFilterHash::XXHASH(_) => {
+                // this match exists to future proof the singleton hash enum
+            }
+        }
+
+        let bitset = match column_metadata.bloom_filter_length() {
+            Some(_) => buffer.slice((bitset_offset as usize - offset)..),
+            None => {
+                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
+                    ParquetError::General("Bloom filter length is invalid".to_string())
+                })?;
+                self.input
+                    .0
+                    .get_bytes(
+                        bitset_offset as usize..bitset_offset as usize + bitset_length,
+                    )
+                    .await?
+            }
+        };
         Ok(Some(Sbbf::new(&bitset)))
     }

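The core of the change is in the hunk above: when the writer recorded the new `bloom_filter_length` field in the column metadata, the reader fetches the bloom filter header and bitset with a single exact-length read; for older files without the field, it keeps the previous two-step approach (read an estimated header prefix, then issue a second read for the bitset once `header.num_bytes` is known). Below is a minimal, self-contained sketch of that range selection; `initial_fetch_range` is a hypothetical helper written for illustration, not part of the crate:

```rust
use std::ops::Range;

// The same 20-byte header estimate the crate uses when the length is unknown.
const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;

/// Pick the byte range for the first read, mirroring the patched logic:
/// a recorded filter length lets one read cover the header and the bitset.
fn initial_fetch_range(offset: usize, bloom_filter_length: Option<usize>) -> Range<usize> {
    match bloom_filter_length {
        Some(length) => offset..offset + length,
        None => offset..offset + SBBF_HEADER_SIZE_ESTIMATE,
    }
}

fn main() {
    // New files: one ranged request covers the whole 4096-byte filter.
    assert_eq!(initial_fetch_range(100, Some(4096)), 100..4196);
    // Legacy files: fetch the estimated header, then the bitset separately.
    assert_eq!(initial_fetch_range(100, None), 100..120);
}
```

This is also why the bitset extraction in the new code differs by case: with a known length the bitset is just the tail of the already-fetched buffer (`buffer.slice(...)`), while without it a second `get_bytes` round trip is unavoidable.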
parquet/src/bloom_filter/mod.rs (1 addition & 1 deletion)
@@ -136,7 +136,7 @@ pub(crate) const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;
 
 /// given an initial offset, and a byte buffer, try to read out a bloom filter header and return
 /// both the header and the offset after it (for bitset).
-fn chunk_read_bloom_filter_header_and_offset(
+pub(crate) fn chunk_read_bloom_filter_header_and_offset(
     offset: u64,
     buffer: Bytes,
 ) -> Result<(BloomFilterHeader, u64), ParquetError> {
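To see how this code path is exercised end to end, here is a hedged usage sketch. The method patched in the first file is the async builder's bloom filter accessor (named `get_row_group_column_bloom_filter` in the parquet crate's public API around this release; the exact name and signature may vary by version), and with `bloom_filter_length` present it now costs a single ranged read. The sketch assumes the `async` feature of `parquet`, tokio with the `fs`, `macros`, and `rt` features, and a placeholder file name:

```rust
use parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;
use tokio::fs::File;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open("data.parquet").await?; // placeholder path
    let mut builder = ParquetRecordBatchStreamBuilder::new(file).await?;

    // Probe the bloom filter written for column 0 of row group 0, if any.
    if let Some(sbbf) = builder.get_row_group_column_bloom_filter(0, 0).await? {
        // Bloom filters may report false positives but never false negatives,
        // so a negative check lets us skip the row group entirely.
        if !sbbf.check(&"needle") {
            println!("row group 0 definitely does not contain \"needle\"");
        }
    }
    Ok(())
}
```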
