Skip to content

Commit

Permalink
Don't return Option in ArrayBatchDecoder::next_batch
Browse files Browse the repository at this point in the history
  • Loading branch information
Jefffrey committed Mar 5, 2024
1 parent 74a864f commit 1489ebf
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 77 deletions.
12 changes: 4 additions & 8 deletions src/arrow_reader/decoder/list.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, ListArray};
use arrow::array::{ArrayRef, ListArray};
use arrow::buffer::{NullBuffer, OffsetBuffer};
use arrow::datatypes::{Field, FieldRef};
use snafu::ResultExt;
Expand Down Expand Up @@ -51,7 +51,7 @@ impl ArrayBatchDecoder for ListArrayDecoder {
&mut self,
batch_size: usize,
parent_present: Option<&[bool]>,
) -> Result<Option<ArrayRef>> {
) -> Result<ArrayRef> {
let present = self.present.by_ref().take(batch_size);
let present = if let Some(parent_present) = parent_present {
debug_assert_eq!(
Expand All @@ -77,7 +77,7 @@ impl ArrayBatchDecoder for ListArrayDecoder {
);
let total_length: u64 = lengths.iter().sum();
// Fetch child array as one Array with total_length elements
let child_array = self.inner.next_batch(total_length as usize, None)?.unwrap();
let child_array = self.inner.next_batch(total_length as usize, None)?;
// Fix the lengths to account for nulls (represented as 0 length)
let mut lengths_with_nulls = Vec::with_capacity(batch_size);
let mut lengths = lengths.iter();
Expand All @@ -95,10 +95,6 @@ impl ArrayBatchDecoder for ListArrayDecoder {
let array = ListArray::try_new(self.field.clone(), offsets, child_array, Some(null_buffer))
.context(ArrowSnafu)?;
let array = Arc::new(array);
if array.is_empty() {
Ok(None)
} else {
Ok(Some(array))
}
Ok(array)
}
}
17 changes: 5 additions & 12 deletions src/arrow_reader/decoder/map.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, MapArray, StructArray};
use arrow::array::{ArrayRef, MapArray, StructArray};
use arrow::buffer::{NullBuffer, OffsetBuffer};
use arrow::datatypes::{Field, Fields};
use snafu::ResultExt;
Expand Down Expand Up @@ -64,7 +64,7 @@ impl ArrayBatchDecoder for MapArrayDecoder {
&mut self,
batch_size: usize,
parent_present: Option<&[bool]>,
) -> Result<Option<ArrayRef>> {
) -> Result<ArrayRef> {
let present = self.present.by_ref().take(batch_size);
let present = if let Some(parent_present) = parent_present {
debug_assert_eq!(
Expand All @@ -91,11 +91,8 @@ impl ArrayBatchDecoder for MapArrayDecoder {
let total_length: u64 = lengths.iter().sum();
// Fetch key and value arrays, each with total_length elements
// Fetch child array as one Array with total_length elements
let keys_array = self.keys.next_batch(total_length as usize, None)?.unwrap();
let values_array = self
.values
.next_batch(total_length as usize, None)?
.unwrap();
let keys_array = self.keys.next_batch(total_length as usize, None)?;
let values_array = self.values.next_batch(total_length as usize, None)?;
// Compose the keys + values array into a StructArray with two entries
let entries =
StructArray::try_new(self.fields.clone(), vec![keys_array, values_array], None)
Expand All @@ -118,10 +115,6 @@ impl ArrayBatchDecoder for MapArrayDecoder {
let array = MapArray::try_new(field, offsets, entries, Some(null_buffer), false)
.context(ArrowSnafu)?;
let array = Arc::new(array);
if array.is_empty() {
Ok(None)
} else {
Ok(Some(array))
}
Ok(array)
}
}
34 changes: 12 additions & 22 deletions src/arrow_reader/decoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl<T: ArrowPrimitiveType> PrimitiveArrayDecoder<T> {
&mut self,
batch_size: usize,
parent_present: Option<&[bool]>,
) -> Result<Option<PrimitiveArray<T>>> {
) -> Result<PrimitiveArray<T>> {
let mut builder = PrimitiveBuilder::<T>::with_capacity(batch_size);

let mut iter = self.inner.by_ref().take(batch_size);
Expand Down Expand Up @@ -75,11 +75,7 @@ impl<T: ArrowPrimitiveType> PrimitiveArrayDecoder<T> {
};

let array = builder.finish();
if array.is_empty() {
Ok(None)
} else {
Ok(Some(array))
}
Ok(array)
}
}

Expand All @@ -88,9 +84,9 @@ impl<T: ArrowPrimitiveType> ArrayBatchDecoder for PrimitiveArrayDecoder<T> {
&mut self,
batch_size: usize,
parent_present: Option<&[bool]>,
) -> Result<Option<ArrayRef>> {
) -> Result<ArrayRef> {
let array = self.next_primitive_batch(batch_size, parent_present)?;
let array = array.map(|a| Arc::new(a) as ArrayRef);
let array = Arc::new(array) as ArrayRef;
Ok(array)
}
}
Expand Down Expand Up @@ -120,7 +116,7 @@ impl ArrayBatchDecoder for BooleanArrayDecoder {
&mut self,
batch_size: usize,
parent_present: Option<&[bool]>,
) -> Result<Option<ArrayRef>> {
) -> Result<ArrayRef> {
let mut builder = BooleanBuilder::with_capacity(batch_size);

let mut iter = self.inner.by_ref().take(batch_size);
Expand Down Expand Up @@ -151,11 +147,7 @@ impl ArrayBatchDecoder for BooleanArrayDecoder {
};

let array = Arc::new(builder.finish());
if array.is_empty() {
Ok(None)
} else {
Ok(Some(array))
}
Ok(array)
}
}

Expand Down Expand Up @@ -206,19 +198,15 @@ pub trait ArrayBatchDecoder: Send {
/// Used as base for decoding ORC columns into Arrow arrays. Provide an input `batch_size`
/// which specifies the upper limit of the number of values returned in the output array.
///
/// Will return `None` if no more elements to decode (aka return 0 values).
///
/// If parent nested type (e.g. Struct) indicates a null in it's PRESENT stream,
/// then the child doesn't have a value (similar to other nullability). So we need
/// to take care to insert these null values as Arrow requires the child to hold
/// data in the null slot of the child.
// TODO: reconsider returning Option - array already encodes emptiness, this causes
// more boilerplate?
fn next_batch(
&mut self,
batch_size: usize,
parent_present: Option<&[bool]>,
) -> Result<Option<ArrayRef>>;
) -> Result<ArrayRef>;
}

pub fn array_decoder_factory(
Expand Down Expand Up @@ -284,9 +272,11 @@ impl NaiveStripeDecoder {
let mut fields = Vec::with_capacity(self.stripe.columns.len());

for decoder in &mut self.decoders {
match decoder.next_batch(chunk, None)? {
Some(array) => fields.push(array),
None => break,
let array = decoder.next_batch(chunk, None)?;
if array.is_empty() {
break;
} else {
fields.push(array);
}
}

Expand Down
28 changes: 9 additions & 19 deletions src/arrow_reader/decoder/string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::io::Read;
use std::marker::PhantomData;
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, DictionaryArray, GenericByteArray, StringArray};
use arrow::array::{ArrayRef, DictionaryArray, GenericByteArray, StringArray};
use arrow::buffer::{Buffer, NullBuffer, OffsetBuffer};
use arrow::datatypes::{ByteArrayType, GenericBinaryType, GenericStringType};
use snafu::ResultExt;
Expand Down Expand Up @@ -62,8 +62,7 @@ pub fn new_string_decoder(column: &Column, stripe: &Stripe) -> Result<Box<dyn Ar
debug_assert!(dictionary_size > 0, "dictionary cannot be empty");
// We assume here we have fetched all the dictionary strings (according to size above)
let dictionary_strings = DirectStringArrayDecoder::new(bytes, lengths, None)
.next_byte_batch(dictionary_size, None)?
.unwrap();
.next_byte_batch(dictionary_size, None)?;
let dictionary_strings = Arc::new(dictionary_strings);

let indexes = stripe.stream_map.get(column, Kind::Data)?;
Expand Down Expand Up @@ -111,7 +110,7 @@ impl<T: ByteArrayType> GenericByteArrayDecoder<T> {
&mut self,
batch_size: usize,
parent_present: Option<&[bool]>,
) -> Result<Option<GenericByteArray<T>>> {
) -> Result<GenericByteArray<T>> {
let present = match (&mut self.present, parent_present) {
(Some(present), Some(parent_present)) => {
let present = present.by_ref().take(batch_size);
Expand Down Expand Up @@ -172,11 +171,7 @@ impl<T: ByteArrayType> GenericByteArrayDecoder<T> {

let array =
GenericByteArray::<T>::try_new(offsets, bytes, null_buffer).context(ArrowSnafu)?;
if array.is_empty() {
Ok(None)
} else {
Ok(Some(array))
}
Ok(array)
}
}

Expand All @@ -185,9 +180,9 @@ impl<T: ByteArrayType> ArrayBatchDecoder for GenericByteArrayDecoder<T> {
&mut self,
batch_size: usize,
parent_present: Option<&[bool]>,
) -> Result<Option<ArrayRef>> {
) -> Result<ArrayRef> {
let array = self.next_byte_batch(batch_size, parent_present)?;
let array = array.map(|a| Arc::new(a) as ArrayRef);
let array = Arc::new(array) as ArrayRef;
Ok(array)
}
}
Expand All @@ -211,18 +206,13 @@ impl ArrayBatchDecoder for DictionaryStringArrayDecoder {
&mut self,
batch_size: usize,
parent_present: Option<&[bool]>,
) -> Result<Option<ArrayRef>> {
) -> Result<ArrayRef> {
let keys = self
.indexes
.next_primitive_batch(batch_size, parent_present)?
.unwrap();
.next_primitive_batch(batch_size, parent_present)?;
let array = DictionaryArray::try_new(keys, self.dictionary.clone()).context(ArrowSnafu)?;

let array = Arc::new(array);
if array.is_empty() {
Ok(None)
} else {
Ok(Some(array))
}
Ok(array)
}
}
22 changes: 6 additions & 16 deletions src/arrow_reader/decoder/struct_decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl ArrayBatchDecoder for StructArrayDecoder {
&mut self,
batch_size: usize,
parent_present: Option<&[bool]>,
) -> Result<Option<ArrayRef>> {
) -> Result<ArrayRef> {
let present = self.present.by_ref().take(batch_size);
let present = if let Some(parent_present) = parent_present {
debug_assert_eq!(
Expand All @@ -73,20 +73,10 @@ impl ArrayBatchDecoder for StructArrayDecoder {
.map(|child| child.next_batch(batch_size, Some(&present)))
.collect::<Result<Vec<_>>>()?;

// TODO: more strict, this should either be all Some or all None, not in-between
// throw error if in-between case
if child_arrays.contains(&None) {
Ok(None)
} else {
let child_arrays = child_arrays
.into_iter()
.map(|opt| opt.unwrap())
.collect::<Vec<_>>();
let null_buffer = NullBuffer::from(present);
let array = StructArray::try_new(self.fields.clone(), child_arrays, Some(null_buffer))
.context(ArrowSnafu)?;
let array = Arc::new(array);
Ok(Some(array))
}
let null_buffer = NullBuffer::from(present);
let array = StructArray::try_new(self.fields.clone(), child_arrays, Some(null_buffer))
.context(ArrowSnafu)?;
let array = Arc::new(array);
Ok(array)
}
}

0 comments on commit 1489ebf

Please sign in to comment.