From 25668dc18cd06be06d8547fdc8ae9e26d96298f6 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 10 Oct 2023 15:30:41 -0400 Subject: [PATCH] Minor: Clarify rationale for FlightDataEncoder API, add examples --- arrow-flight/src/encode.rs | 48 ++++++++++++++++++++++++++++++++++---- 1 file changed, 44 insertions(+), 4 deletions(-) diff --git a/arrow-flight/src/encode.rs b/arrow-flight/src/encode.rs index cd2ee7c02b68..28c181c0d5fd 100644 --- a/arrow-flight/src/encode.rs +++ b/arrow-flight/src/encode.rs @@ -30,6 +30,11 @@ use futures::{ready, stream::BoxStream, Stream, StreamExt}; /// This can be used to implement [`FlightService::do_get`] in an /// Arrow Flight implementation; /// +/// This structure encodes a stream of `Result`s rather than `RecordBatch`es to +/// propagate errors from streaming execution, where the generation of the +/// `RecordBatch`es is incremental, and an error may occur even after +/// several have already been successfully produced. +/// /// # Caveats /// 1. [`DictionaryArray`](arrow_array::array::DictionaryArray)s /// are converted to their underlying types prior to transport, due to @@ -41,14 +46,14 @@ use futures::{ready, stream::BoxStream, Stream, StreamExt}; /// # use arrow_array::{ArrayRef, RecordBatch, UInt32Array}; /// # async fn f() { /// # let c1 = UInt32Array::from(vec![1, 2, 3, 4, 5, 6]); -/// # let record_batch = RecordBatch::try_from_iter(vec![ +/// # let batch = RecordBatch::try_from_iter(vec![ /// # ("a", Arc::new(c1) as ArrayRef) /// # ]) /// # .expect("cannot create record batch"); /// use arrow_flight::encode::FlightDataEncoderBuilder; /// /// // Get an input stream of Result -/// let input_stream = futures::stream::iter(vec![Ok(record_batch)]); +/// let input_stream = futures::stream::iter(vec![Ok(batch)]); /// /// // Build a stream of `Result` (e.g. to return for do_get) /// let flight_data_stream = FlightDataEncoderBuilder::new() @@ -59,6 +64,39 @@ use futures::{ready, stream::BoxStream, Stream, StreamExt}; /// # } /// ``` /// +/// # Example: Sending `Vec` +/// +/// You can create a [`Stream`] to pass to [`Self::build`] from an existing +/// `Vec` of `RecordBatch`es like this: +/// +/// ``` +/// # use std::sync::Arc; +/// # use arrow_array::{ArrayRef, RecordBatch, UInt32Array}; +/// # async fn f() { +/// # fn make_batches() -> Vec { +/// # let c1 = UInt32Array::from(vec![1, 2, 3, 4, 5, 6]); +/// # let batch = RecordBatch::try_from_iter(vec![ +/// # ("a", Arc::new(c1) as ArrayRef) +/// # ]) +/// # .expect("cannot create record batch"); +/// # vec![batch.clone(), batch.clone()] +/// # } +/// use arrow_flight::encode::FlightDataEncoderBuilder; +/// +/// // Get batches that you want to send via Flight +/// let batches: Vec = make_batches(); +/// +/// // Create an input stream of Result +/// let input_stream = futures::stream::iter( +/// batches.into_iter().map(Ok) +/// ); +/// +/// // Build a stream of `Result` (e.g. to return for do_get) +/// let flight_data_stream = FlightDataEncoderBuilder::new() +/// .build(input_stream); +/// # } +/// ``` +/// /// [`FlightService::do_get`]: crate::flight_service_server::FlightService::do_get /// [`FlightError`]: crate::error::FlightError #[derive(Debug)] @@ -146,8 +184,10 @@ impl FlightDataEncoderBuilder { self } - /// Return a [`Stream`] of [`FlightData`], - /// consuming self. More details on [`FlightDataEncoder`] + /// Takes a [`Stream`] of [`Result`] and returns a [`Stream`] + /// of [`FlightData`], consuming self. + /// + /// See example on [`Self`] and [`FlightDataEncoder`] for more details pub fn build(self, input: S) -> FlightDataEncoder where S: Stream> + Send + 'static,