diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index c749d4deeb16..10d92e8815cf 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -77,7 +77,6 @@
 use std::collections::VecDeque;
 use std::fmt::Formatter;
-
 use std::io::SeekFrom;
 use std::ops::Range;
 use std::pin::Pin;
@@ -88,7 +87,6 @@
 use bytes::{Buf, Bytes};
 use futures::future::{BoxFuture, FutureExt};
 use futures::ready;
 use futures::stream::Stream;
-
 use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
 
 use arrow_array::RecordBatch;
@@ -102,15 +100,16 @@ use crate::arrow::arrow_reader::{
 };
 use crate::arrow::ProjectionMask;
+use crate::bloom_filter::{
+    read_bloom_filter_header_and_length, Sbbf, SBBF_HEADER_SIZE_ESTIMATE,
+};
 use crate::column::page::{PageIterator, PageReader};
-
 use crate::errors::{ParquetError, Result};
 use crate::file::footer::{decode_footer, decode_metadata};
 use crate::file::metadata::{ParquetMetaData, RowGroupMetaData};
 use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
-use crate::format::PageLocation;
-
 use crate::file::FOOTER_SIZE;
+use crate::format::PageLocation;
 
 mod metadata;
 pub use metadata::*;
@@ -302,6 +301,46 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
         Self::new_builder(AsyncReader(input), metadata)
     }
 
+    /// Read the bloom filter for a column in a row group.
+    /// Returns `None` if the column does not have a bloom filter.
+    ///
+    /// This should be called after other forms of pruning, such as projection and predicate pushdown.
+    pub async fn get_row_group_column_bloom_filter(
+        &mut self,
+        row_group_idx: usize,
+        column_idx: usize,
+    ) -> Result<Option<Sbbf>> {
+        let metadata = self.metadata.row_group(row_group_idx);
+        let column_metadata = metadata.column(column_idx);
+        let offset: usize = if let Some(offset) = column_metadata.bloom_filter_offset() {
+            offset.try_into().map_err(|_| {
+                ParquetError::General("Bloom filter offset is invalid".to_string())
+            })?
+        } else {
+            return Ok(None);
+        };
+
+        // Read a chunk large enough to cover the thrift-encoded header, then decode it
+        let buffer = self
+            .input
+            .0
+            .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE)
+            .await?;
+        let (header, length) = read_bloom_filter_header_and_length(buffer)?;
+        let bitset_offset = offset + length as usize;
+
+        // bitset length in bytes
+        let length: usize = header.num_bytes.try_into().map_err(|_| {
+            ParquetError::General("Bloom filter length is invalid".to_string())
+        })?;
+        let bitset = self
+            .input
+            .0
+            .get_bytes(bitset_offset..bitset_offset + length)
+            .await?;
+        Ok(Some(Sbbf::new(&bitset)))
+    }
+
     /// Build a new [`ParquetRecordBatchStream`]
     pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
         let num_row_groups = self.metadata.row_groups().len();
diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index a3807eb37011..681abc400974 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -132,7 +132,7 @@ impl std::ops::IndexMut<usize> for Block {
 #[derive(Debug, Clone)]
 pub struct Sbbf(Vec<Block>);
 
-const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;
+pub(crate) const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;
 
 /// given an initial offset, and a byte buffer, try to read out a bloom filter header and return
 /// both the header and the offset after it (for bitset).
@@ -147,7 +147,7 @@ fn chunk_read_bloom_filter_header_and_offset(
 /// given a [Bytes] buffer, try to read out a bloom filter header and return both the header and
 /// length of the header.
 #[inline]
-fn read_bloom_filter_header_and_length(
+pub(crate) fn read_bloom_filter_header_and_length(
     buffer: Bytes,
 ) -> Result<(BloomFilterHeader, u64), ParquetError> {
     let total_length = buffer.len();
@@ -199,7 +199,7 @@ impl Sbbf {
         Self::new(&bitset)
     }
 
-    fn new(bitset: &[u8]) -> Self {
+    pub(crate) fn new(bitset: &[u8]) -> Self {
         let data = bitset
             .chunks_exact(4 * 8)
             .map(|chunk| {
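
For context, a minimal sketch of how the new API might be called. The file path, the row-group/column indices, and the probed value are hypothetical, and it assumes the `Sbbf::check` membership test from the same `bloom_filter` module; this is an illustration, not part of the patch.

use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet::errors::Result;
use tokio::fs::File;

// Sketch: decide whether row group 0 could contain `needle` in column 0
// before paying the cost of decoding it. `Sbbf::check` is assumed here;
// a bloom filter may return false positives but never false negatives.
async fn row_group_may_contain(path: &str, needle: i64) -> Result<bool> {
    let file = File::open(path).await?;
    let mut builder = ParquetRecordBatchStreamBuilder::new(file).await?;
    match builder.get_row_group_column_bloom_filter(0, 0).await? {
        // A negative answer from the filter proves the value is absent.
        Some(sbbf) => Ok(sbbf.check(&needle)),
        // No bloom filter on this column chunk: cannot rule the group out.
        None => Ok(true),
    }
}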