Skip to content

Commit

Permalink
Support parquet bloom filter length
Browse files Browse the repository at this point in the history
Signed-off-by: Letian Jiang <[email protected]>
  • Loading branch information
letian-jiang committed Oct 1, 2023
1 parent 587250c commit 351e1a9
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 373 deletions.
25 changes: 15 additions & 10 deletions parquet/src/bloom_filter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,12 @@ pub struct Sbbf(Vec<Block>);

const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;

/// given an initial offset, and a [ChunkReader], try to read out a bloom filter header and return
/// given an initial offset, and a byte buffer, try to read out a bloom filter header and return
/// both the header and the offset after it (for bitset).
fn chunk_read_bloom_filter_header_and_offset<R: ChunkReader>(
fn chunk_read_bloom_filter_header_and_offset(
offset: u64,
reader: Arc<R>,
buffer: Bytes,
) -> Result<(BloomFilterHeader, u64), ParquetError> {
let buffer = reader.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE)?;
let (header, length) = read_bloom_filter_header_and_length(buffer)?;
Ok((header, offset + length))
}
Expand Down Expand Up @@ -271,8 +270,13 @@ impl Sbbf {
return Ok(None);
};

let buffer = match column_metadata.bloom_filter_length() {
Some(length) => reader.get_bytes(offset, length as usize),
None => reader.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
}?;

let (header, bitset_offset) =
chunk_read_bloom_filter_header_and_offset(offset, reader.clone())?;
chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;

match header.algorithm {
BloomFilterAlgorithm::BLOCK(_) => {
Expand All @@ -289,11 +293,12 @@ impl Sbbf {
// this match exists to future proof the singleton hash enum
}
}
// length in bytes
let length: usize = header.num_bytes.try_into().map_err(|_| {
ParquetError::General("Bloom filter length is invalid".to_string())
})?;
let bitset = reader.get_bytes(bitset_offset, length)?;

let bitset = match column_metadata.bloom_filter_length() {
Some(_) => buffer.slice((bitset_offset - offset) as usize..),
None => reader.get_bytes(bitset_offset, header.num_bytes as usize)?,
};

Ok(Some(Self::new(&bitset)))
}

Expand Down
17 changes: 17 additions & 0 deletions parquet/src/file/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,7 @@ pub struct ColumnChunkMetaData {
statistics: Option<Statistics>,
encoding_stats: Option<Vec<PageEncodingStats>>,
bloom_filter_offset: Option<i64>,
bloom_filter_length: Option<i32>,
offset_index_offset: Option<i64>,
offset_index_length: Option<i32>,
column_index_offset: Option<i64>,
Expand Down Expand Up @@ -591,6 +592,11 @@ impl ColumnChunkMetaData {
self.bloom_filter_offset
}

/// Returns the length in bytes of the bloom filter, if present.
pub fn bloom_filter_length(&self) -> Option<i32> {
self.bloom_filter_length
}

/// Returns the offset for the column index.
pub fn column_index_offset(&self) -> Option<i64> {
self.column_index_offset
Expand Down Expand Up @@ -657,6 +663,7 @@ impl ColumnChunkMetaData {
})
.transpose()?;
let bloom_filter_offset = col_metadata.bloom_filter_offset;
let bloom_filter_length = col_metadata.bloom_filter_length;
let offset_index_offset = cc.offset_index_offset;
let offset_index_length = cc.offset_index_length;
let column_index_offset = cc.column_index_offset;
Expand All @@ -677,6 +684,7 @@ impl ColumnChunkMetaData {
statistics,
encoding_stats,
bloom_filter_offset,
bloom_filter_length,
offset_index_offset,
offset_index_length,
column_index_offset,
Expand Down Expand Up @@ -722,6 +730,7 @@ impl ColumnChunkMetaData {
.as_ref()
.map(|vec| vec.iter().map(page_encoding_stats::to_thrift).collect()),
bloom_filter_offset: self.bloom_filter_offset,
bloom_filter_length: self.bloom_filter_length,
}
}

Expand Down Expand Up @@ -752,6 +761,7 @@ impl ColumnChunkMetaDataBuilder {
statistics: None,
encoding_stats: None,
bloom_filter_offset: None,
bloom_filter_length: None,
offset_index_offset: None,
offset_index_length: None,
column_index_offset: None,
Expand Down Expand Up @@ -837,6 +847,12 @@ impl ColumnChunkMetaDataBuilder {
self
}

/// Sets optional bloom filter length in bytes.
pub fn set_bloom_filter_length(mut self, value: Option<i32>) -> Self {
self.0.bloom_filter_length = value;
self
}

/// Sets optional offset index offset in bytes.
pub fn set_offset_index_offset(mut self, value: Option<i64>) -> Self {
self.0.offset_index_offset = value;
Expand Down Expand Up @@ -1053,6 +1069,7 @@ mod tests {
},
])
.set_bloom_filter_offset(Some(6000))
.set_bloom_filter_length(Some(25))
.set_offset_index_offset(Some(7000))
.set_offset_index_length(Some(25))
.set_column_index_offset(Some(8000))
Expand Down
7 changes: 7 additions & 0 deletions parquet/src/file/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,12 +267,19 @@ impl<W: Write + Send> SerializedFileWriter<W> {
Some(bloom_filter) => {
let start_offset = self.buf.bytes_written();
bloom_filter.write(&mut self.buf)?;
let end_offset = self.buf.bytes_written();
// set offset and length for bloom filter
column_chunk
.meta_data
.as_mut()
.expect("can't have bloom filter without column metadata")
.bloom_filter_offset = Some(start_offset as i64);
column_chunk
.meta_data
.as_mut()
.expect("can't have bloom filter without column metadata")
.bloom_filter_length =
Some((end_offset - start_offset) as i32)
}
None => {}
}
Expand Down
Loading

0 comments on commit 351e1a9

Please sign in to comment.