Refine ParquetRecordBatchReaderBuilder docs (#5774)
* Refine ParquetRecordBatchReaderBuilder docs

* fix link

* Suggest using new(), add example
alamb authored May 21, 2024
1 parent b07fd5d commit d65240c
Showing 2 changed files with 51 additions and 12 deletions.
12 changes: 6 additions & 6 deletions parquet/src/arrow/arrow_reader/mod.rs
@@ -44,14 +44,14 @@ use crate::file::page_index::index_reader;
 pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
 pub use selection::{RowSelection, RowSelector};
 
-/// A generic builder for constructing sync or async arrow parquet readers. This is not intended
-/// to be used directly, instead you should use the specialization for the type of reader
-/// you wish to use
+/// Builder for constructing parquet readers into arrow.
 ///
-/// * For a synchronous API - [`ParquetRecordBatchReaderBuilder`]
-/// * For an asynchronous API - [`ParquetRecordBatchStreamBuilder`]
+/// Most users should use one of the following specializations:
 ///
-/// [`ParquetRecordBatchStreamBuilder`]: crate::arrow::async_reader::ParquetRecordBatchStreamBuilder
+/// * synchronous API: [`ParquetRecordBatchReaderBuilder::try_new`]
+/// * `async` API: [`ParquetRecordBatchStreamBuilder::new`]
+///
+/// [`ParquetRecordBatchStreamBuilder::new`]: crate::arrow::async_reader::ParquetRecordBatchStreamBuilder::new
 pub struct ArrowReaderBuilder<T> {
     pub(crate) input: T,
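For reference, the synchronous specialization named above can be exercised end to end with `ParquetRecordBatchReaderBuilder::try_new`. A minimal sketch, not part of this commit, assuming the `tempfile` crate for scratch-file setup (mirroring the doctests below):

```rust
use std::sync::Arc;
use arrow_array::{Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter};

fn main() {
    // write a small parquet file to read back (illustration only)
    let file = tempfile::tempfile().unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
    let mut writer = ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), None).unwrap();
    let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    // synchronous API: build a reader from anything implementing ChunkReader
    let reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap().build().unwrap();
    // ParquetRecordBatchReader is an Iterator of Result<RecordBatch, _>
    for batch in reader {
        println!("read {} rows", batch.unwrap().num_rows());
    }
}
```

Unlike the `async` builder, `try_new` reads the footer synchronously from any `ChunkReader` implementation, such as `File` or `Bytes`.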
51 changes: 45 additions & 6 deletions parquet/src/arrow/async_reader/mod.rs
@@ -228,7 +228,7 @@ impl ArrowReaderMetadata {
 /// breaking the pre-existing ParquetRecordBatchStreamBuilder API
 pub struct AsyncReader<T>(T);
 
-/// A builder used to construct a [`ParquetRecordBatchStream`] for a parquet file
+/// A builder used to construct a [`ParquetRecordBatchStream`] for `async` reading of a parquet file
 ///
 /// In particular, this handles reading the parquet file metadata, allowing consumers
 /// to use this information to select what specific columns, row groups, etc...
@@ -239,6 +239,37 @@ pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>

 impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
     /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided parquet file
+    ///
+    /// # Example
+    ///
+    /// ```
+    /// # use std::fs::metadata;
+    /// # use std::sync::Arc;
+    /// # use bytes::Bytes;
+    /// # use arrow_array::{Int32Array, RecordBatch};
+    /// # use arrow_schema::{DataType, Field, Schema};
+    /// # use parquet::arrow::arrow_reader::ArrowReaderMetadata;
+    /// # use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
+    /// # use tempfile::tempfile;
+    /// # use futures::StreamExt;
+    /// # #[tokio::main(flavor="current_thread")]
+    /// # async fn main() {
+    /// #
+    /// # let mut file = tempfile().unwrap();
+    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
+    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
+    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
+    /// # writer.write(&batch).unwrap();
+    /// # writer.close().unwrap();
+    /// // Open async file containing parquet data
+    /// let mut file = tokio::fs::File::from_std(file);
+    /// // construct the reader
+    /// let mut reader = ParquetRecordBatchStreamBuilder::new(file)
+    ///     .await.unwrap().build().unwrap();
+    /// // Read batch
+    /// let batch: RecordBatch = reader.next().await.unwrap().unwrap();
+    /// # }
+    /// ```
     pub async fn new(input: T) -> Result<Self> {
         Self::new_with_options(input, Default::default()).await
     }
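The builder returned by `new` can also be configured before `build` to limit what is decoded. A minimal sketch using the existing `ProjectionMask`, `with_projection`, `with_row_groups`, and `with_batch_size` APIs; the column index, row-group choice, and batch size below are illustrative assumptions:

```rust
use futures::StreamExt;
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};

async fn read_first_column(file: tokio::fs::File) {
    let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
    // project to the first root column only
    let mask = ProjectionMask::roots(builder.parquet_schema(), [0]);
    let mut stream = builder
        .with_projection(mask)
        .with_row_groups(vec![0]) // read only the first row group
        .with_batch_size(1024)
        .build()
        .unwrap();
    while let Some(batch) = stream.next().await {
        println!("read {} rows", batch.unwrap().num_rows());
    }
}
```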
@@ -253,7 +284,9 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
     /// Create a [`ParquetRecordBatchStreamBuilder`] from the provided [`ArrowReaderMetadata`]
     ///
     /// This allows loading metadata once and using it to create multiple builders with
-    /// potentially different settings
+    /// potentially different settings, which can then be read in parallel.
+    ///
+    /// # Example of reading from multiple streams in parallel
     ///
     /// ```
     /// # use std::fs::metadata;
@@ -268,23 +301,29 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
     /// # #[tokio::main(flavor="current_thread")]
     /// # async fn main() {
     /// #
-    /// let mut file = tempfile().unwrap();
+    /// # let mut file = tempfile().unwrap();
     /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
     /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
     /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
     /// # writer.write(&batch).unwrap();
     /// # writer.close().unwrap();
     /// #
+    /// // open file with parquet data
     /// let mut file = tokio::fs::File::from_std(file);
+    /// // load metadata once
     /// let meta = ArrowReaderMetadata::load_async(&mut file, Default::default()).await.unwrap();
+    /// // create two readers, a and b, from the same underlying file
+    /// // without reading the metadata again
     /// let mut a = ParquetRecordBatchStreamBuilder::new_with_metadata(
     ///     file.try_clone().await.unwrap(),
     ///     meta.clone()
     /// ).build().unwrap();
     /// let mut b = ParquetRecordBatchStreamBuilder::new_with_metadata(file, meta).build().unwrap();
     ///
-    /// // Should be able to read from both in parallel
-    /// assert_eq!(a.next().await.unwrap().unwrap(), b.next().await.unwrap().unwrap());
+    /// // Can read batches from both readers in parallel
+    /// assert_eq!(
+    ///     a.next().await.unwrap().unwrap(),
+    ///     b.next().await.unwrap().unwrap(),
+    /// );
     /// # }
     /// ```
     pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
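The doctest above awaits `a` and `b` one after the other; to actually overlap their I/O on a single task, the two `next()` futures can be joined. A minimal sketch, assuming streams built over `tokio::fs::File` as in the example:

```rust
use futures::{join, StreamExt};
use parquet::arrow::async_reader::ParquetRecordBatchStream;

async fn read_both(
    mut a: ParquetRecordBatchStream<tokio::fs::File>,
    mut b: ParquetRecordBatchStream<tokio::fs::File>,
) {
    // join! polls both `next()` futures concurrently, interleaving their I/O
    let (batch_a, batch_b) = join!(a.next(), b.next());
    assert_eq!(batch_a.unwrap().unwrap(), batch_b.unwrap().unwrap());
}
```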
