feat: add method for async read bloomfilter
hengfeiyang committed Oct 11, 2023
1 parent 4aabd2c commit 3aa3da4
Showing 2 changed files with 47 additions and 8 deletions.
49 changes: 44 additions & 5 deletions parquet/src/arrow/async_reader/mod.rs
@@ -77,7 +77,6 @@
use std::collections::VecDeque;
use std::fmt::Formatter;

use std::io::SeekFrom;
use std::ops::Range;
use std::pin::Pin;
@@ -88,7 +87,6 @@
use bytes::{Buf, Bytes};
use futures::future::{BoxFuture, FutureExt};
use futures::ready;
use futures::stream::Stream;

use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};

use arrow_array::RecordBatch;
@@ -102,15 +100,16 @@
use crate::arrow::arrow_reader::{
};
use crate::arrow::ProjectionMask;

use crate::bloom_filter::{
read_bloom_filter_header_and_length, Sbbf, SBBF_HEADER_SIZE_ESTIMATE,
};
use crate::column::page::{PageIterator, PageReader};

use crate::errors::{ParquetError, Result};
use crate::file::footer::{decode_footer, decode_metadata};
use crate::file::metadata::{ParquetMetaData, RowGroupMetaData};
use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
use crate::format::PageLocation;

use crate::file::FOOTER_SIZE;
use crate::format::PageLocation;

mod metadata;
pub use metadata::*;
@@ -302,6 +301,46 @@
impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
Self::new_builder(AsyncReader(input), metadata)
}

    /// Read the bloom filter for a column in a row group
    /// Returns `None` if the column does not have a bloom filter
    ///
    /// We should call this function after other forms of pruning, such as projection and predicate pushdown.
pub async fn get_row_group_column_bloom_filter(
&mut self,
row_group_idx: usize,
column_idx: usize,
) -> Result<Option<Sbbf>> {
let metadata = self.metadata.row_group(row_group_idx);
let column_metadata = metadata.column(column_idx);
let offset: usize = if let Some(offset) = column_metadata.bloom_filter_offset() {
offset.try_into().map_err(|_| {
ParquetError::General("Bloom filter offset is invalid".to_string())
})?
} else {
return Ok(None);
};

        // Read enough bytes to cover the largest possible bloom filter header
        let buffer = self
            .input
            .0
            .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE)
            .await?;
        let (header, header_len) = read_bloom_filter_header_and_length(buffer)?;
        // The bitset is stored immediately after the header
        let bitset_offset = offset + header_len as usize;

        // Bitset length in bytes
        let length: usize = header.num_bytes.try_into().map_err(|_| {
            ParquetError::General("Bloom filter length is invalid".to_string())
        })?;
        let bitset = self
            .input
            .0
            .get_bytes(bitset_offset..bitset_offset + length)
            .await?;
        Ok(Some(Sbbf::new(&bitset)))
}

/// Build a new [`ParquetRecordBatchStream`]
pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
let num_row_groups = self.metadata.row_groups().len();
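For reference, a minimal usage sketch of the new API (not part of this commit): it assumes the parquet crate's `async` feature, tokio with the `fs` and `macros` features, and a local file `data.parquet` whose column 0 is a string column — the file name and probed value are illustrative only.

use parquet::arrow::ParquetRecordBatchStreamBuilder;

#[tokio::main]
async fn main() -> parquet::errors::Result<()> {
    let file = tokio::fs::File::open("data.parquet").await?;
    let mut builder = ParquetRecordBatchStreamBuilder::new(file).await?;

    // Row group 0, column 0; yields Ok(None) when no bloom filter was written.
    if let Some(sbbf) = builder.get_row_group_column_bloom_filter(0, 0).await? {
        // A bloom filter can return false positives but never false negatives:
        // `false` here proves the value is absent from the row group.
        if !sbbf.check(&"needle") {
            println!("row group 0 cannot contain \"needle\"; skip reading it");
        }
    }
    Ok(())
}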
6 changes: 3 additions & 3 deletions parquet/src/bloom_filter/mod.rs
@@ -132,7 +132,7 @@
impl std::ops::IndexMut<usize> for Block {
#[derive(Debug, Clone)]
pub struct Sbbf(Vec<Block>);

const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;
pub(crate) const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;

/// given an initial offset, and a byte buffer, try to read out a bloom filter header and return
/// both the header and the offset after it (for bitset).
@@ -147,7 +147,7 @@
fn chunk_read_bloom_filter_header_and_offset(
/// given a [Bytes] buffer, try to read out a bloom filter header and return both the header and
/// length of the header.
#[inline]
fn read_bloom_filter_header_and_length(
pub(crate) fn read_bloom_filter_header_and_length(
buffer: Bytes,
) -> Result<(BloomFilterHeader, u64), ParquetError> {
let total_length = buffer.len();
Expand Down Expand Up @@ -199,7 +199,7 @@ impl Sbbf {
Self::new(&bitset)
}

fn new(bitset: &[u8]) -> Self {
pub(crate) fn new(bitset: &[u8]) -> Self {
let data = bitset
.chunks_exact(4 * 8)
.map(|chunk| {
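A note on the `Sbbf::new` body shown (truncated) above: a split block bloom filter stores its bitset as a sequence of 32-byte blocks, each holding eight little-endian u32 words, which is why the constructor walks the bitset in `4 * 8`-byte chunks. A standalone sketch of that conversion — the `Block` alias and function name here are illustrative, not the crate's actual internals:

/// One split-block bloom filter block: eight 32-bit words, i.e. 32 bytes.
type Block = [u32; 8];

/// Mirror of the 32-byte chunking in `Sbbf::new`: every 4-byte slice of a
/// chunk becomes one little-endian u32 word of the block.
fn blocks_from_bitset(bitset: &[u8]) -> Vec<Block> {
    bitset
        .chunks_exact(4 * 8)
        .map(|chunk| {
            let mut block = [0u32; 8];
            for (word, bytes) in block.iter_mut().zip(chunk.chunks_exact(4)) {
                *word = u32::from_le_bytes(bytes.try_into().unwrap());
            }
            block
        })
        .collect()
}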
