Add option to FlightDataEncoder to always resend batch dictionaries #4896

Merged · 4 commits · Oct 12, 2023
197 changes: 163 additions & 34 deletions arrow-flight/src/encode.rs
@@ -31,9 +31,11 @@ use futures::{ready, stream::BoxStream, Stream, StreamExt};
/// Arrow Flight implementation;
///
/// # Caveats
/// 1. [`DictionaryArray`](arrow_array::array::DictionaryArray)s
/// are converted to their underlying types prior to transport, due to
/// <https://github.com/apache/arrow-rs/issues/3389>.
/// 1. When [`DictionaryHandling`] is [`DictionaryHandling::Hydrate`], [`DictionaryArray`](arrow_array::array::DictionaryArray)s
/// are converted to their underlying types prior to transport.
/// When [`DictionaryHandling`] is [`DictionaryHandling::Resend`], Dictionary [`FlightData`] is sent with every
/// [`RecordBatch`] that contains a [`DictionaryArray`](arrow_array::array::DictionaryArray).
/// See <https://github.com/apache/arrow-rs/issues/3389>.
///
/// # Example
/// ```no_run
@@ -74,6 +76,9 @@ pub struct FlightDataEncoderBuilder {
schema: Option<SchemaRef>,
/// Optional flight descriptor, if known before data.
descriptor: Option<FlightDescriptor>,
/// Determines how `DictionaryArray`s are encoded for transport.
/// See [`DictionaryHandling`] for more information.
dictionary_handling: DictionaryHandling,
}

/// Default target size for encoded [`FlightData`].
@@ -90,6 +95,7 @@ impl Default for FlightDataEncoderBuilder {
app_metadata: Bytes::new(),
schema: None,
descriptor: None,
dictionary_handling: DictionaryHandling::Hydrate,
}
}
}
@@ -114,6 +120,15 @@ impl FlightDataEncoderBuilder {
self
}

/// Set the [`DictionaryHandling`] for the encoder
pub fn with_dictionary_handling(
mut self,
dictionary_handling: DictionaryHandling,
) -> Self {
self.dictionary_handling = dictionary_handling;
self
}
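
For reference, a minimal usage sketch of the new option (adapted from the tests added below; `batch` stands in for any existing `RecordBatch`):

```rust
use arrow_array::RecordBatch;
use arrow_flight::encode::{DictionaryHandling, FlightDataEncoder, FlightDataEncoderBuilder};

/// Build an encoder that re-sends dictionary data with every batch
/// instead of hydrating dictionary columns to their value types.
fn encode_with_resend(batch: RecordBatch) -> FlightDataEncoder {
    FlightDataEncoderBuilder::default()
        .with_dictionary_handling(DictionaryHandling::Resend)
        .build(futures::stream::once(async { Ok(batch) }))
}
```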

/// Specify application specific metadata included in the
/// [`FlightData::app_metadata`] field of the first Schema
/// message
@@ -158,6 +173,7 @@ impl FlightDataEncoderBuilder {
app_metadata,
schema,
descriptor,
dictionary_handling,
} = self;

FlightDataEncoder::new(
@@ -167,6 +183,7 @@
options,
app_metadata,
descriptor,
dictionary_handling,
)
}
}
@@ -192,6 +209,9 @@ pub struct FlightDataEncoder {
done: bool,
/// cleared after the first FlightData message is sent
descriptor: Option<FlightDescriptor>,
/// Determines how `DictionaryArray`s are encoded for transport.
/// See [`DictionaryHandling`] for more information.
dictionary_handling: DictionaryHandling,
}

impl FlightDataEncoder {
@@ -202,16 +222,21 @@ impl FlightDataEncoder {
options: IpcWriteOptions,
app_metadata: Bytes,
descriptor: Option<FlightDescriptor>,
dictionary_handling: DictionaryHandling,
) -> Self {
let mut encoder = Self {
inner,
schema: None,
max_flight_data_size,
encoder: FlightIpcEncoder::new(options),
encoder: FlightIpcEncoder::new(
options,
dictionary_handling != DictionaryHandling::Resend,
),
app_metadata: Some(app_metadata),
queue: VecDeque::new(),
done: false,
descriptor,
dictionary_handling,
};

// If schema is known up front, enqueue it immediately
@@ -242,7 +267,8 @@ impl FlightDataEncoder {
fn encode_schema(&mut self, schema: &SchemaRef) -> SchemaRef {
// The first message is the schema message, and all
// batches have the same schema
let schema = Arc::new(prepare_schema_for_flight(schema));
let send_dictionaries = self.dictionary_handling == DictionaryHandling::Resend;
let schema = Arc::new(prepare_schema_for_flight(schema, send_dictionaries));
let mut schema_flight_data = self.encoder.encode_schema(&schema);

// attach any metadata requested
@@ -264,7 +290,8 @@
};

// encode the batch
let batch = prepare_batch_for_flight(&batch, schema)?;
let send_dictionaries = self.dictionary_handling == DictionaryHandling::Resend;
let batch = prepare_batch_for_flight(&batch, schema, send_dictionaries)?;

for batch in split_batch_for_grpc_response(batch, self.max_flight_data_size) {
let (flight_dictionaries, flight_batch) =
@@ -325,17 +352,46 @@ impl Stream for FlightDataEncoder {
}
}

/// Defines how a [`FlightDataEncoder`] encodes [`DictionaryArray`]s
///
/// [`DictionaryArray`]: arrow_array::DictionaryArray
#[derive(Debug, PartialEq)]
pub enum DictionaryHandling {
/// Expands to the underlying type (default). This likely sends more data
/// over the network but requires less memory (dictionaries are not tracked)
/// and is more compatible with other arrow flight client implementations
/// that may not support `DictionaryEncoding`
///
/// An IPC response, streaming or otherwise, defines its schema up front,
/// which establishes the mapping from dictionary IDs. It then sends these
/// dictionaries over the wire.
///
/// This requires identifying the different dictionaries in use, assigning
/// them IDs, and sending new dictionaries, delta or otherwise, when needed
///
/// See also:
/// * <https://github.com/apache/arrow-rs/issues/1206>
Hydrate,
/// Send dictionary FlightData with every RecordBatch that contains a
/// [`DictionaryArray`]. See [`Self::Hydrate`] for more tradeoffs. No
/// attempt is made to skip sending the same (logical) dictionary values
/// twice.
///
/// [`DictionaryArray`]: arrow_array::DictionaryArray
Resend,
}
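
To make the tradeoff concrete, here is a small sketch (mirroring the tests below) of a dictionary-encoded batch and what each mode implies for receivers:

```rust
use std::sync::Arc;
use arrow_array::{types::UInt16Type, DictionaryArray, RecordBatch};
use arrow_schema::{DataType, Field, Schema};

fn dictionary_batch() -> RecordBatch {
    // Three logical values backed by two distinct dictionary entries.
    let arr: DictionaryArray<UInt16Type> = vec!["a", "a", "b"].into_iter().collect();
    let schema = Arc::new(Schema::new(vec![Field::new_dictionary(
        "dict",
        DataType::UInt16,
        DataType::Utf8,
        false,
    )]));
    // With `Hydrate`, receivers see a plain `Utf8` column (more bytes on the
    // wire, no dictionary state to track). With `Resend`, they see the original
    // `Dictionary(UInt16, Utf8)` column, with its dictionary values re-sent
    // alongside every batch.
    RecordBatch::try_new(schema, vec![Arc::new(arr)]).unwrap()
}
```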

/// Prepare an arrow Schema for transport over the Arrow Flight protocol
///
/// Converts dictionary types to their underlying value types unless
/// `send_dictionaries` is true
///
/// See `hydrate_dictionary` for more information
fn prepare_schema_for_flight(schema: &Schema) -> Schema {
fn prepare_schema_for_flight(schema: &Schema, send_dictionaries: bool) -> Schema {
let fields: Fields = schema
.fields()
.iter()
.map(|field| match field.data_type() {
DataType::Dictionary(_, value_type) => Field::new(
DataType::Dictionary(_, value_type) if !send_dictionaries => Field::new(
field.name(),
value_type.as_ref().clone(),
field.is_nullable(),
@@ -394,8 +450,7 @@ struct FlightIpcEncoder {
}

impl FlightIpcEncoder {
fn new(options: IpcWriteOptions) -> Self {
let error_on_replacement = true;
fn new(options: IpcWriteOptions, error_on_replacement: bool) -> Self {
Self {
options,
data_gen: IpcDataGenerator::default(),
@@ -438,48 +493,43 @@ impl FlightIpcEncoder {
fn prepare_batch_for_flight(
batch: &RecordBatch,
schema: SchemaRef,
send_dictionaries: bool,
) -> Result<RecordBatch> {
let columns = batch
.columns()
.iter()
.map(hydrate_dictionary)
.map(|c| hydrate_dictionary(c, send_dictionaries))
.collect::<Result<Vec<_>>>()?;

let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));

Ok(RecordBatch::try_new_with_options(
schema, columns, &options,
)?)
}

/// Hydrates a dictionary to its underlying type
///
/// An IPC response, streaming or otherwise, defines its schema up front
/// which defines the mapping from dictionary IDs. It then sends these
/// dictionaries over the wire.
///
/// This requires identifying the different dictionaries in use, assigning
/// them IDs, and sending new dictionaries, delta or otherwise, when needed
///
/// See also:
/// * <https://github.com/apache/arrow-rs/issues/1206>
///
/// For now we just hydrate the dictionaries to their underlying type
fn hydrate_dictionary(array: &ArrayRef) -> Result<ArrayRef> {
let arr = if let DataType::Dictionary(_, value) = array.data_type() {
arrow_cast::cast(array, value)?
} else {
Arc::clone(array)
/// Hydrates a dictionary to its underlying type if `send_dictionaries` is false.
///
/// If `send_dictionaries` is true, the array is passed through unchanged and its
/// dictionary is re-sent with every batch; this is less efficient on the wire than
/// the tracking scheme described in [`DictionaryHandling::Hydrate`], but it does
/// enable sending `DictionaryArray`s via Flight.
fn hydrate_dictionary(array: &ArrayRef, send_dictionaries: bool) -> Result<ArrayRef> {
let arr = match array.data_type() {
DataType::Dictionary(_, value) if !send_dictionaries => {
arrow_cast::cast(array, value)?
}
_ => Arc::clone(array),
};
Ok(arr)
}
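
Hydration itself is just a cast to the value type. A standalone sketch of the equivalent operation, using only public `arrow` APIs:

```rust
use std::sync::Arc;
use arrow_array::{
    cast::downcast_array, types::UInt16Type, ArrayRef, DictionaryArray, StringArray,
};
use arrow_schema::DataType;

fn demo_hydration() {
    let dict: DictionaryArray<UInt16Type> = vec!["a", "a", "b"].into_iter().collect();
    let array: ArrayRef = Arc::new(dict);

    // The same cast `hydrate_dictionary` performs when `send_dictionaries`
    // is false: dictionary keys are expanded into the underlying value type.
    let hydrated = arrow_cast::cast(&array, &DataType::Utf8).unwrap();
    let actual: StringArray = downcast_array(hydrated.as_ref());
    assert_eq!(actual, StringArray::from(vec!["a", "a", "b"]));
}
```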

#[cfg(test)]
mod tests {
use arrow_array::types::*;
use arrow_array::*;
use arrow_array::{cast::downcast_array, types::*};
use arrow_cast::pretty::pretty_format_batches;
use std::collections::HashMap;

use crate::decode::{DecodedPayload, FlightDataDecoder};

use super::*;

#[test]
@@ -497,7 +547,7 @@ mod tests {

let big_batch = batch.slice(0, batch.num_rows() - 1);
let optimized_big_batch =
prepare_batch_for_flight(&big_batch, Arc::clone(&schema))
prepare_batch_for_flight(&big_batch, Arc::clone(&schema), false)
.expect("failed to optimize");
let (_, optimized_big_flight_batch) =
make_flight_data(&optimized_big_batch, &options);
@@ -509,7 +559,7 @@

let small_batch = batch.slice(0, 1);
let optimized_small_batch =
prepare_batch_for_flight(&small_batch, Arc::clone(&schema))
prepare_batch_for_flight(&small_batch, Arc::clone(&schema), false)
.expect("failed to optimize");
let (_, optimized_small_flight_batch) =
make_flight_data(&optimized_small_batch, &options);
@@ -520,14 +570,92 @@ mod tests {
);
}

#[tokio::test]
async fn test_dictionary_hydration() {
let arr: DictionaryArray<UInt16Type> = vec!["a", "a", "b"].into_iter().collect();
let schema = Arc::new(Schema::new(vec![Field::new_dictionary(
"dict",
DataType::UInt16,
DataType::Utf8,
false,
)]));
let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)]).unwrap();
let encoder = FlightDataEncoderBuilder::default()
.build(futures::stream::once(async { Ok(batch) }));
let mut decoder = FlightDataDecoder::new(encoder);
let expected_schema =
Schema::new(vec![Field::new("dict", DataType::Utf8, false)]);
let expected_schema = Arc::new(expected_schema);
while let Some(decoded) = decoder.next().await {
let decoded = decoded.unwrap();
match decoded.payload {
DecodedPayload::None => {}
DecodedPayload::Schema(s) => assert_eq!(s, expected_schema),
DecodedPayload::RecordBatch(b) => {
assert_eq!(b.schema(), expected_schema);
let expected_array = StringArray::from(vec!["a", "a", "b"]);
let actual_array = b.column_by_name("dict").unwrap();
let actual_array = downcast_array::<StringArray>(actual_array);

assert_eq!(actual_array, expected_array);
}
}
}
}

#[tokio::test]
async fn test_send_dictionaries() {
let schema = Arc::new(Schema::new(vec![Field::new_dictionary(
"dict",
DataType::UInt16,
DataType::Utf8,
false,
)]));

let arr_one: Arc<DictionaryArray<UInt16Type>> =
Arc::new(vec!["a", "a", "b"].into_iter().collect());
let arr_two: Arc<DictionaryArray<UInt16Type>> =
Arc::new(vec!["b", "a", "c"].into_iter().collect());
let batch_one =
RecordBatch::try_new(schema.clone(), vec![arr_one.clone()]).unwrap();
let batch_two =
RecordBatch::try_new(schema.clone(), vec![arr_two.clone()]).unwrap();

let encoder = FlightDataEncoderBuilder::default()
.with_dictionary_handling(DictionaryHandling::Resend)
.build(futures::stream::iter(vec![Ok(batch_one), Ok(batch_two)]));

let mut decoder = FlightDataDecoder::new(encoder);
let mut expected_array = arr_one;
while let Some(decoded) = decoder.next().await {
let decoded = decoded.unwrap();
match decoded.payload {
DecodedPayload::None => {}
DecodedPayload::Schema(s) => assert_eq!(s, schema),
DecodedPayload::RecordBatch(b) => {
assert_eq!(b.schema(), schema);

let actual_array =
Arc::new(downcast_array::<DictionaryArray<UInt16Type>>(
b.column_by_name("dict").unwrap(),
));

assert_eq!(actual_array, expected_array);

expected_array = arr_two.clone();
}
}
}
}

#[test]
fn test_schema_metadata_encoded() {
let schema =
Schema::new(vec![Field::new("data", DataType::Int32, false)]).with_metadata(
HashMap::from([("some_key".to_owned(), "some_value".to_owned())]),
);

let got = prepare_schema_for_flight(&schema);
let got = prepare_schema_for_flight(&schema, false);
assert!(got.metadata().contains_key("some_key"));
}

@@ -540,7 +668,8 @@ mod tests {
)
.expect("cannot create record batch");

prepare_batch_for_flight(&batch, batch.schema()).expect("failed to optimize");
prepare_batch_for_flight(&batch, batch.schema(), false)
.expect("failed to optimize");
}

pub fn make_flight_data(