Skip to content

Commit

Permalink
Minor: Clarify rationale for FlightDataEncoder API, add examples
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Oct 10, 2023
1 parent 4aabd2c commit 25668dc
Showing 1 changed file with 44 additions and 4 deletions.
48 changes: 44 additions & 4 deletions arrow-flight/src/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ use futures::{ready, stream::BoxStream, Stream, StreamExt};
/// This can be used to implement [`FlightService::do_get`] in an
/// Arrow Flight implementation;
///
/// This structure encodes a stream of `Result`s rather than `RecordBatch`es to
/// propagate errors from streaming execution, where the generation of the
/// `RecordBatch`es is incremental, and an error may occur even after
/// several have already been successfully produced.
///
/// # Caveats
/// 1. [`DictionaryArray`](arrow_array::array::DictionaryArray)s
/// are converted to their underlying types prior to transport, due to
Expand All @@ -41,14 +46,14 @@ use futures::{ready, stream::BoxStream, Stream, StreamExt};
/// # use arrow_array::{ArrayRef, RecordBatch, UInt32Array};
/// # async fn f() {
/// # let c1 = UInt32Array::from(vec![1, 2, 3, 4, 5, 6]);
/// # let record_batch = RecordBatch::try_from_iter(vec![
/// # let batch = RecordBatch::try_from_iter(vec![
/// # ("a", Arc::new(c1) as ArrayRef)
/// # ])
/// # .expect("cannot create record batch");
/// use arrow_flight::encode::FlightDataEncoderBuilder;
///
/// // Get an input stream of Result<RecordBatch, FlightError>
/// let input_stream = futures::stream::iter(vec![Ok(record_batch)]);
/// let input_stream = futures::stream::iter(vec![Ok(batch)]);
///
/// // Build a stream of `Result<FlightData>` (e.g. to return for do_get)
/// let flight_data_stream = FlightDataEncoderBuilder::new()
Expand All @@ -59,6 +64,39 @@ use futures::{ready, stream::BoxStream, Stream, StreamExt};
/// # }
/// ```
///
/// # Example: Sending `Vec<RecordBatch>`
///
/// You can create a [`Stream`] to pass to [`Self::build`] from an existing
/// `Vec` of `RecordBatch`es like this:
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::{ArrayRef, RecordBatch, UInt32Array};
/// # async fn f() {
/// # fn make_batches() -> Vec<RecordBatch> {
/// # let c1 = UInt32Array::from(vec![1, 2, 3, 4, 5, 6]);
/// # let batch = RecordBatch::try_from_iter(vec![
/// # ("a", Arc::new(c1) as ArrayRef)
/// # ])
/// # .expect("cannot create record batch");
/// # vec![batch.clone(), batch.clone()]
/// # }
/// use arrow_flight::encode::FlightDataEncoderBuilder;
///
/// // Get batches that you want to send via Flight
/// let batches: Vec<RecordBatch> = make_batches();
///
/// // Create an input stream of Result<RecordBatch, FlightError>
/// let input_stream = futures::stream::iter(
/// batches.into_iter().map(Ok)
/// );
///
/// // Build a stream of `Result<FlightData>` (e.g. to return for do_get)
/// let flight_data_stream = FlightDataEncoderBuilder::new()
/// .build(input_stream);
/// # }
/// ```
///
/// [`FlightService::do_get`]: crate::flight_service_server::FlightService::do_get
/// [`FlightError`]: crate::error::FlightError
#[derive(Debug)]
Expand Down Expand Up @@ -146,8 +184,10 @@ impl FlightDataEncoderBuilder {
self
}

/// Return a [`Stream`] of [`FlightData`],
/// consuming self. More details on [`FlightDataEncoder`]
/// Takes a [`Stream`] of [`Result<RecordBatch>`] and returns a [`Stream`]
/// of [`FlightData`], consuming self.
///
/// See example on [`Self`] and [`FlightDataEncoder`] for more details
pub fn build<S>(self, input: S) -> FlightDataEncoder
where
S: Stream<Item = Result<RecordBatch>> + Send + 'static,
Expand Down

0 comments on commit 25668dc

Please sign in to comment.