Skip to content

Commit

Permalink
handle case where gathered batches are empty
Browse files Browse the repository at this point in the history
  • Loading branch information
jordanrfrazier committed Oct 26, 2023
1 parent ca803d3 commit b55c04f
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 44 deletions.
21 changes: 14 additions & 7 deletions crates/sparrow-merge/src/gather.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ pub struct GatheredBatches {
/// This may be multiple batches for each input stream -- for instance, the
/// slice of a previous batch that wasn't emitted or a few batches that were
/// collected while waiting for a different input stream to receive a batch
/// and "unblock" gathering up to a certain tiime.
/// and "unblock" gathering up to a certain time.
///
/// We don't concatenate the input batches because that would lead to
/// allocating and copying data into the concatenated chunk. Instead, we
Expand All @@ -301,14 +301,21 @@ pub struct GatheredBatches {

impl GatheredBatches {
/// For each input, concats the gathered batches together.
pub fn concat(self) -> error_stack::Result<Vec<Batch>, Error> {
///
/// If the inner vec is empty, the result will be None.
pub fn concat(self) -> error_stack::Result<Vec<Option<Batch>>, Error> {
self.batches
.iter()
.map(|batches| {
let up_to_time = batches[batches.len() - 1].up_to_time;
let batches = batches.to_vec();
Batch::concat(batches, up_to_time)
.change_context(Error::Internal("failed to concat batches"))
.map(|batches| -> error_stack::Result<Option<Batch>, Error> {
if batches.len() > 0 {
let up_to_time = batches[batches.len() - 1].up_to_time;
let batches = batches.to_vec();
Ok(Some(Batch::concat(batches, up_to_time).change_context(
Error::Internal("failed to concat batches"),
)?))
} else {
Ok(None)
}
})
.collect()
}
Expand Down
143 changes: 109 additions & 34 deletions crates/sparrow-merge/src/merge/heterogeneous_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,34 @@ impl HeterogeneousMerge {
let gathered_batches = self.gatherer.next_batch();
if let Some(gathered_batches) = gathered_batches {
let mut concat_batches = gathered_batches.concat()?;
let right: Batch = concat_batches.remove(1);
let left: Batch = concat_batches.remove(0);

let result = match (left.time(), right.time()) {
(None, None) => self.handle_empty_merge(left.up_to_time, right.up_to_time)?,
(Some(_), None) => self.handle_single_merge(0, left, right.up_to_time)?,
(None, Some(_)) => self.handle_single_merge(1, right, left.up_to_time)?,
(Some(_), Some(_)) => self.handle_merge(left, right)?,
};
Ok(result)
let right: Option<Batch> = concat_batches.remove(1);
let left: Option<Batch> = concat_batches.remove(0);

// There are two layers here:
// 1) A batch does not exist, which occurs when the gatherer does not have any batches
// for a particular side. This is expected, and we can just return the other side.
// 2) A batch exists, but is empty. It still has an up_to_time, which needs to be accounted for.
match (left, right) {
(Some(left), Some(right)) => match (left.time(), right.time()) {
(None, None) => self.handle_empty_merge(left.up_to_time, right.up_to_time),
(Some(_), None) => self.handle_single_merge(0, left, right.up_to_time),
(None, Some(_)) => self.handle_single_merge(1, right, left.up_to_time),
(Some(_), Some(_)) => self.handle_merge(left, right),
},
(Some(left), None) => {
let up_to_time = left.up_to_time;
self.handle_single_merge(0, left, up_to_time)
}
(None, Some(right)) => {
let up_to_time = right.up_to_time;
self.handle_single_merge(1, right, up_to_time)
}
(None, None) => {
// This is an unexpected state -- if we call merge, we should be getting at least
// one batch, even if it's empty.
error_stack::bail!(Error::Internal("at least one batch"))
}
}
} else {
error_stack::bail!(Error::Internal("expected batch -- "))
}
Expand Down Expand Up @@ -241,6 +259,7 @@ impl HeterogeneousMerge {
self.gatherer.close(index);
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -330,6 +349,77 @@ mod tests {
assert_eq!(merge.blocking_input(), Some(0));
}

// Merges two non-struct inputs (Int64 on input 0, UInt32 on input 1) where both
// sides carry rows, and checks the merged time/key/subsort columns, the merged
// batch itself, and which input the merge reports as blocking afterwards.
#[test]
fn test_merge_non_structs() {
let left = DataType::Int64;
let right = DataType::UInt32;
let mut merge = merge(left.clone(), right.clone());

// Left input: three rows at times 0, 1, 2, all with key 0.
// The trailing `2.into()` is presumably the batch's up_to_time -- TODO confirm
// against `Batch::new_with_data`.
let left_time = TimestampNanosecondArray::from(vec![0, 1, 2]);
let left_subsort = UInt64Array::from(vec![0, 1, 2]);
let left_key = UInt64Array::from(vec![0, 0, 0]);
let left_data = Int64Array::from(vec![10, 4, 22]);
let left_batch = Batch::new_with_data(
Arc::new(left_data),
Arc::new(left_time),
Arc::new(left_subsort),
Arc::new(left_key),
2.into(),
);

// Right input: three rows at times 0, 1, 4 with keys 0, 1, 0.
let right_time = TimestampNanosecondArray::from(vec![0, 1, 4]);
let right_subsort = UInt64Array::from(vec![0, 1, 2]);
let right_key = UInt64Array::from(vec![0, 1, 0]);
let right_data = UInt32Array::from(vec![100, 101, 102]);
let right_batch = Batch::new_with_data(
Arc::new(right_data),
Arc::new(right_time),
Arc::new(right_subsort),
Arc::new(right_key),
4.into(),
);

// Before any input is added the merge reports input 1 as blocking, so the right
// batch is fed first; on its own it is not enough to produce output.
assert_eq!(merge.blocking_input(), Some(1));
let can_produce = merge.add_batch(1, right_batch);
assert!(!can_produce);

// Once the left batch is added as well, the merge reports it can produce.
let can_produce = merge.add_batch(0, left_batch);
assert!(can_produce);

let merged = merge.merge().unwrap();
// The expected output is a struct with one nullable field per input ("step_0",
// "step_1"). A field is null on rows where that input has no value for the
// (time, key) pair: row 2 has no left value, row 1 has no right value.
let expected_fields: Vec<(FieldRef, ArrayRef)> = vec![
(
Arc::new(Field::new("step_0", left.clone(), true)),
Arc::new(Int64Array::from(vec![Some(10), Some(4), None])),
),
(
Arc::new(Field::new("step_1", right.clone(), true)),
Arc::new(UInt32Array::from(vec![Some(100), None, Some(101)])),
),
];
// Only three rows are expected, at times 0, 1, 1; the left row at time 2 and the
// right row at time 4 are not part of the expected output.
let expected_data = Arc::new(sparrow_arrow::utils::make_struct_array(3, expected_fields));
let expected_time = Arc::new(TimestampNanosecondArray::from(vec![0, 1, 1]));
let expected_subsort = Arc::new(UInt64Array::from(vec![0, 1, 1]));
let expected_key = Arc::new(UInt64Array::from(vec![0, 0, 1]));

let expected = Batch::new_with_data(
expected_data,
expected_time,
expected_subsort,
expected_key,
1.into(),
);

// Compare the individual columns first so a mismatch points at a specific column
// before the whole-batch comparison below.
assert_eq!(merged.time(), expected.time());
assert_eq!(merged.key_hash(), expected.key_hash());
assert_eq!(merged.subsort(), expected.subsort());
// After producing output, input 0 is reported as the blocking input.
assert_eq!(merge.blocking_input(), Some(0));

// TODO: struct equality failing?
assert_eq!(merged, expected);
// NOTE(review): this repeats the blocking_input assertion made just above.
assert_eq!(merge.blocking_input(), Some(0));
}

#[test]
#[should_panic]
fn test_fails_if_not_working_on_active_input() {
Expand Down Expand Up @@ -360,7 +450,7 @@ mod tests {
}

#[test]
fn test_merge_non_structs() {
fn test_merge_empty_batches() {
let left = DataType::Int64;
let right = DataType::UInt32;
let mut merge = merge(left.clone(), right.clone());
Expand All @@ -377,17 +467,7 @@ mod tests {
2.into(),
);

let right_time = TimestampNanosecondArray::from(vec![0, 1, 4]);
let right_subsort = UInt64Array::from(vec![0, 1, 2]);
let right_key = UInt64Array::from(vec![0, 1, 0]);
let right_data = UInt32Array::from(vec![100, 101, 102]);
let right_batch = Batch::new_with_data(
Arc::new(right_data),
Arc::new(right_time),
Arc::new(right_subsort),
Arc::new(right_key),
4.into(),
);
let right_batch = Batch::new_empty(4.into());

assert_eq!(merge.blocking_input(), Some(1));
let can_produce = merge.add_batch(1, right_batch);
Expand All @@ -400,24 +480,24 @@ mod tests {
let expected_fields: Vec<(FieldRef, ArrayRef)> = vec![
(
Arc::new(Field::new("step_0", left.clone(), true)),
Arc::new(Int64Array::from(vec![Some(10), Some(4), None])),
Arc::new(Int64Array::from(vec![Some(10), Some(4)])),
),
(
Arc::new(Field::new("step_1", right.clone(), true)),
Arc::new(UInt32Array::from(vec![Some(100), None, Some(101)])),
Arc::new(UInt32Array::from(vec![None, None])),
),
];
let expected_data = Arc::new(sparrow_arrow::utils::make_struct_array(3, expected_fields));
let expected_time = Arc::new(TimestampNanosecondArray::from(vec![0, 1, 1]));
let expected_subsort = Arc::new(UInt64Array::from(vec![0, 1, 1]));
let expected_key = Arc::new(UInt64Array::from(vec![0, 0, 1]));
let expected_data = Arc::new(sparrow_arrow::utils::make_struct_array(2, expected_fields));
let expected_time = Arc::new(TimestampNanosecondArray::from(vec![0, 1]));
let expected_subsort = Arc::new(UInt64Array::from(vec![0, 1]));
let expected_key = Arc::new(UInt64Array::from(vec![0, 0]));

let expected = Batch::new_with_data(
expected_data,
expected_time,
expected_subsort,
expected_key,
2.into(),
1.into(),
);

assert_eq!(merged.time(), expected.time());
Expand All @@ -429,9 +509,4 @@ mod tests {
assert_eq!(merged, expected);
assert_eq!(merge.blocking_input(), Some(0));
}

#[test]
fn test_merge_empty_batches() {
// merge in empty batches with high up_to_times
}
}
9 changes: 6 additions & 3 deletions crates/sparrow-merge/src/spread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ impl SpreadImpl for UnlatchedSpread {
values: &ArrayRef,
signal: &BooleanArray,
) -> anyhow::Result<ArrayRef> {
// TODO: Grouping unsupported, but shouldn't affect results
let mut indices = Int32Builder::with_capacity(grouping.len());
let mut next_index = 0;
for signal in signal.iter() {
Expand All @@ -247,10 +248,11 @@ impl SpreadImpl for UnlatchedSpread {

fn spread_true(
&mut self,
grouping: &GroupingIndices,
_grouping: &GroupingIndices,
values: &ArrayRef,
) -> anyhow::Result<ArrayRef> {
anyhow::ensure!(grouping.len() == values.len());
// TODO: Grouping unsupported:
// anyhow::ensure!(grouping.len() == values.len());
Ok(values.clone())
}

Expand All @@ -259,7 +261,8 @@ impl SpreadImpl for UnlatchedSpread {
grouping: &GroupingIndices,
value_type: &DataType,
) -> anyhow::Result<ArrayRef> {
Ok(new_null_array(value_type, grouping.len()))
todo!("unimplemented - requires grouping")
// Ok(new_null_array(value_type, grouping.len()))
}
}

Expand Down

0 comments on commit b55c04f

Please sign in to comment.