Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support parquet bloom filter length #4885

Merged
merged 2 commits into from
Oct 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 20 additions & 10 deletions parquet/src/bloom_filter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,12 @@ pub struct Sbbf(Vec<Block>);

const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;

/// given an initial offset, and a [ChunkReader], try to read out a bloom filter header and return
/// given an initial offset, and a byte buffer, try to read out a bloom filter header and return
/// both the header and the offset after it (for bitset).
fn chunk_read_bloom_filter_header_and_offset<R: ChunkReader>(
fn chunk_read_bloom_filter_header_and_offset(
offset: u64,
reader: Arc<R>,
buffer: Bytes,
) -> Result<(BloomFilterHeader, u64), ParquetError> {
let buffer = reader.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE)?;
let (header, length) = read_bloom_filter_header_and_length(buffer)?;
Ok((header, offset + length))
}
Expand Down Expand Up @@ -271,8 +270,13 @@ impl Sbbf {
return Ok(None);
};

let buffer = match column_metadata.bloom_filter_length() {
Some(length) => reader.get_bytes(offset, length as usize),
None => reader.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
}?;

let (header, bitset_offset) =
chunk_read_bloom_filter_header_and_offset(offset, reader.clone())?;
chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;

match header.algorithm {
BloomFilterAlgorithm::BLOCK(_) => {
Expand All @@ -289,11 +293,17 @@ impl Sbbf {
// this match exists to future proof the singleton hash enum
}
}
// length in bytes
let length: usize = header.num_bytes.try_into().map_err(|_| {
ParquetError::General("Bloom filter length is invalid".to_string())
})?;
Comment on lines -293 to -295
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We appear to have lost this check

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

let bitset = reader.get_bytes(bitset_offset, length)?;

let bitset = match column_metadata.bloom_filter_length() {
Some(_) => buffer.slice((bitset_offset - offset) as usize..),
None => {
let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
ParquetError::General("Bloom filter length is invalid".to_string())
})?;
reader.get_bytes(bitset_offset, bitset_length)?
}
};

Ok(Some(Self::new(&bitset)))
}

Expand Down
17 changes: 17 additions & 0 deletions parquet/src/file/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,7 @@ pub struct ColumnChunkMetaData {
statistics: Option<Statistics>,
encoding_stats: Option<Vec<PageEncodingStats>>,
bloom_filter_offset: Option<i64>,
bloom_filter_length: Option<i32>,
offset_index_offset: Option<i64>,
offset_index_length: Option<i32>,
column_index_offset: Option<i64>,
Expand Down Expand Up @@ -591,6 +592,11 @@ impl ColumnChunkMetaData {
self.bloom_filter_offset
}

/// Returns the length in bytes of the serialized bloom filter for this
/// column chunk, if present (`ColumnMetaData::bloom_filter_length` in the
/// Parquet Thrift metadata). `None` when the writer did not record it.
///
/// Note: the doc previously said "offset" — that was a copy-paste error
/// from [`Self::bloom_filter_offset`]; this accessor returns the length.
pub fn bloom_filter_length(&self) -> Option<i32> {
    self.bloom_filter_length
}

/// Returns the offset for the column index.
pub fn column_index_offset(&self) -> Option<i64> {
self.column_index_offset
Expand Down Expand Up @@ -657,6 +663,7 @@ impl ColumnChunkMetaData {
})
.transpose()?;
let bloom_filter_offset = col_metadata.bloom_filter_offset;
let bloom_filter_length = col_metadata.bloom_filter_length;
let offset_index_offset = cc.offset_index_offset;
let offset_index_length = cc.offset_index_length;
let column_index_offset = cc.column_index_offset;
Expand All @@ -677,6 +684,7 @@ impl ColumnChunkMetaData {
statistics,
encoding_stats,
bloom_filter_offset,
bloom_filter_length,
offset_index_offset,
offset_index_length,
column_index_offset,
Expand Down Expand Up @@ -722,6 +730,7 @@ impl ColumnChunkMetaData {
.as_ref()
.map(|vec| vec.iter().map(page_encoding_stats::to_thrift).collect()),
bloom_filter_offset: self.bloom_filter_offset,
bloom_filter_length: self.bloom_filter_length,
}
}

Expand Down Expand Up @@ -752,6 +761,7 @@ impl ColumnChunkMetaDataBuilder {
statistics: None,
encoding_stats: None,
bloom_filter_offset: None,
bloom_filter_length: None,
offset_index_offset: None,
offset_index_length: None,
column_index_offset: None,
Expand Down Expand Up @@ -837,6 +847,12 @@ impl ColumnChunkMetaDataBuilder {
self
}

/// Sets the optional bloom filter length in bytes
/// (`ColumnMetaData::bloom_filter_length`); pass `None` to leave it unset.
/// Consumes and returns the builder for chaining.
pub fn set_bloom_filter_length(mut self, value: Option<i32>) -> Self {
    self.0.bloom_filter_length = value;
    self
}

/// Sets optional offset index offset in bytes.
pub fn set_offset_index_offset(mut self, value: Option<i64>) -> Self {
self.0.offset_index_offset = value;
Expand Down Expand Up @@ -1053,6 +1069,7 @@ mod tests {
},
])
.set_bloom_filter_offset(Some(6000))
.set_bloom_filter_length(Some(25))
.set_offset_index_offset(Some(7000))
.set_offset_index_length(Some(25))
.set_column_index_offset(Some(8000))
Expand Down
9 changes: 6 additions & 3 deletions parquet/src/file/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,12 +267,15 @@ impl<W: Write + Send> SerializedFileWriter<W> {
Some(bloom_filter) => {
let start_offset = self.buf.bytes_written();
bloom_filter.write(&mut self.buf)?;
let end_offset = self.buf.bytes_written();
// set offset and index for bloom filter
column_chunk
let column_chunk_meta = column_chunk
.meta_data
.as_mut()
.expect("can't have bloom filter without column metadata")
.bloom_filter_offset = Some(start_offset as i64);
.expect("can't have bloom filter without column metadata");
column_chunk_meta.bloom_filter_offset = Some(start_offset as i64);
column_chunk_meta.bloom_filter_length =
Some((end_offset - start_offset) as i32);
}
None => {}
}
Expand Down
Loading