Skip to content

Commit

Permalink
Handle empty batch merges
Browse files Browse the repository at this point in the history
  • Loading branch information
jordanrfrazier committed Oct 26, 2023
1 parent 3aa7f2b commit ca803d3
Show file tree
Hide file tree
Showing 8 changed files with 217 additions and 108 deletions.
29 changes: 29 additions & 0 deletions crates/sparrow-batch/src/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,35 @@ impl Batch {
})
}

/// Updates the up_to_time.
pub fn with_up_to_time(self, up_to_time: RowTime) -> Self {
Batch {
data: self.data,
up_to_time,
}
}

/// Replaces the batch's data with the given data.
///
/// Panics if the [BatchInfo] is empty.
pub fn with_data(self, data: ArrayRef) -> Self {
if let Some(info) = self.data {
Self {
data: Some(BatchInfo {
data,
time: info.time,
subsort: info.subsort,
key_hash: info.key_hash,
min_present_time: info.min_present_time,
max_present_time: info.max_present_time,
}),
up_to_time: self.up_to_time,
}
} else {
panic!("Cannot replace data of empty batch");
}
}

// NOTE: The current execution logic expects `RecordBatch` outputs (as well as some existing
// testing functionality that is compatible with the old non-partitioned execution path. In the
// future, we should standardize on `Batch` which makes it easier to carry a primitive value out.
Expand Down
2 changes: 1 addition & 1 deletion crates/sparrow-main/src/serve/compute_service.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
ese std::sync::Arc;
use std::sync::Arc;

use dashmap::DashMap;
use error_stack::{IntoReport, ResultExt};
Expand Down
17 changes: 10 additions & 7 deletions crates/sparrow-merge/src/gather.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use arrow_array::{Array, TimestampNanosecondArray};
use sparrow_batch::{Batch, RowTime};
use std::{collections::BinaryHeap, sync::Arc};
use std::collections::BinaryHeap;

use error_stack::ResultExt;

use crate::merge::Error;

/// Gathers batches from multiple inputs, keeping track of the
/// minimum `up_to_time` across all inputs, which acts as a
Expand Down Expand Up @@ -298,14 +301,14 @@ pub struct GatheredBatches {

impl GatheredBatches {
/// For each input, concats the gathered batches together.
pub fn concat(self) -> Vec<Batch> {
pub fn concat(self) -> error_stack::Result<Vec<Batch>, Error> {
self.batches
.iter()
.map(|batches| {
let time: &TimestampNanosecondArray = batches[batches.len() - 1].time().unwrap();
let up_to_time = time.value(time.len() - 1);
let batches = batches.into_iter().cloned().collect();
Batch::concat(batches, up_to_time.into()).unwrap()
let up_to_time = batches[batches.len() - 1].up_to_time;
let batches = batches.to_vec();
Batch::concat(batches, up_to_time)
.change_context(Error::Internal("failed to concat batches"))
})
.collect()
}
Expand Down
1 change: 0 additions & 1 deletion crates/sparrow-merge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,3 @@ mod spread;

pub(crate) use merge::*;
pub use merge_pipeline::MergePipeline;
pub(crate) use spread::*;
6 changes: 4 additions & 2 deletions crates/sparrow-merge/src/merge/binary_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
use std::cmp::Ordering;

use arrow_array::builder::{TimestampNanosecondBuilder, UInt64Builder};
use arrow_array::{ArrayRef, TimestampNanosecondArray, UInt64Array};
use arrow_array::{TimestampNanosecondArray, UInt64Array};
use arrow_schema::ArrowError;
use sparrow_batch::Batch;

#[cfg(test)]
use arrow_array::ArrayRef;

/// Struct describing one of the two inputs to the [binary_merge]
/// function.
Expand Down
243 changes: 161 additions & 82 deletions crates/sparrow-merge/src/merge/heterogeneous_merge.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::sync::Arc;

use arrow_array::ArrayRef;
use arrow_array::{new_null_array, ArrayRef, StructArray};
use arrow_schema::{DataType, FieldRef};
use binary_merge::BinaryMergeInput;
use error_stack::{IntoReport, IntoReportCompat, ResultExt};
use sparrow_batch::Batch;
use sparrow_batch::{Batch, RowTime};
use sparrow_instructions::GroupingIndices;

use crate::{gather::Gatherer, spread::Spread};
Expand All @@ -14,9 +14,10 @@ use super::binary_merge;
/// Manages the merge of two heterogeneous inputs.
pub struct HeterogeneousMerge {
result_type: DataType,
left_data_type: DataType,
right_data_type: DataType,
spread_left: Spread,
spread_right: Spread,

/// Gathers batches from both sides and produces [GatheredBatches]
/// up to a valid watermark.
gatherer: Gatherer,
Expand All @@ -39,6 +40,8 @@ impl HeterogeneousMerge {
pub fn new(result_type: &DataType, datatype_l: &DataType, datatype_r: &DataType) -> Self {
Self {
result_type: result_type.clone(),
left_data_type: datatype_l.clone(),
right_data_type: datatype_r.clone(),
spread_left: Spread::try_new(false, datatype_l).expect("spread"),
spread_right: Spread::try_new(false, datatype_r).expect("spread"),
gatherer: Gatherer::new(2),
Expand Down Expand Up @@ -69,91 +72,168 @@ impl HeterogeneousMerge {
pub fn merge(&mut self) -> error_stack::Result<Batch, Error> {
let gathered_batches = self.gatherer.next_batch();
if let Some(gathered_batches) = gathered_batches {
let concat_batches = gathered_batches.concat();
let left: &Batch = &concat_batches[0];
let right: &Batch = &concat_batches[1];

// TODO: Assumes batch data is non-empty.
let left_merge_input = BinaryMergeInput::new(
left.time().expect("time"),
left.subsort().expect("subsort"),
left.key_hash().expect("key_hash"),
);
let right_merge_input = BinaryMergeInput::new(
right.time().expect("time"),
right.subsort().expect("subsort"),
right.key_hash().expect("key_hash"),
);
let merged_result = crate::binary_merge(left_merge_input, right_merge_input)
.into_report()
.change_context(Error::Internal("TODO"))?;

let left_spread_bits = arrow::compute::is_not_null(&merged_result.take_a)
.into_report()
.change_context(Error::Internal("TODO"))?;
let right_spread_bits = arrow::compute::is_not_null(&merged_result.take_b)
.into_report()
.change_context(Error::Internal("TODO"))?;
let mut concat_batches = gathered_batches.concat()?;
let right: Batch = concat_batches.remove(1);
let left: Batch = concat_batches.remove(0);

let result = match (left.time(), right.time()) {
(None, None) => self.handle_empty_merge(left.up_to_time, right.up_to_time)?,
(Some(_), None) => self.handle_single_merge(0, left, right.up_to_time)?,
(None, Some(_)) => self.handle_single_merge(1, right, left.up_to_time)?,
(Some(_), Some(_)) => self.handle_merge(left, right)?,
};
Ok(result)
} else {
error_stack::bail!(Error::Internal("expected batch -- "))
}
}

let merged_time = Arc::new(merged_result.time);
let merged_subsort = Arc::new(merged_result.subsort);
let merged_key_hash = Arc::new(merged_result.key_hash);
fn handle_empty_merge(
&mut self,
left: RowTime,
right: RowTime,
) -> error_stack::Result<Batch, Error> {
let up_to_time = left.max(right);
Ok(Batch::new_empty(up_to_time))
}

// TODO: Grouping
let grouping = GroupingIndices::new_empty();
// TODO: Handle empty batches
let spread_left = self
/// Handles merging where one batch is empty.
///
/// The `input` indicates from which input the `batch` is from.
fn handle_single_merge(
&mut self,
input: usize,
batch: Batch,
other_up_to: RowTime,
) -> error_stack::Result<Batch, Error> {
let up_to_time = batch.up_to_time.max(other_up_to);

// TODO: Grouping
let grouping = GroupingIndices::new_empty();

// SAFETY: Verified batch is non-empty
if input == 0 {
let left_data = self
.spread_left
.spread_signaled(&grouping, left.data().expect("data"), &left_spread_bits)
.spread_true(&grouping, batch.data().expect("data"))
.into_report()
.change_context(Error::Internal("TODO"))?;
let spread_right = self
let num_rows = left_data.len();
assert_eq!(num_rows, batch.time().expect("time").len());

let right_data = new_null_array(&self.right_data_type, num_rows);
let merged_data = self.create_merged_data(num_rows, left_data, right_data)?;

let batch = batch.with_data(merged_data);
let batch = batch.with_up_to_time(up_to_time);
Ok(batch)
} else if input == 1 {
let right_data = self
.spread_right
.spread_signaled(&grouping, right.data().expect("data"), &right_spread_bits)
.spread_true(&grouping, batch.data().expect("data"))
.into_report()
.change_context(Error::Internal("TODO"))?;
let num_rows = right_data.len();
assert_eq!(num_rows, batch.time().expect("time").len());

// println!("Spread left: {:?}", spread_left);
// println!("Spread right: {:?}", spread_right);
assert_eq!(spread_left.len(), spread_right.len());
let num_rows = spread_left.len();

// The result type of the merge does not flatten the structs.
//
// e.g. have X.a + Y.a then we have left: { a: i64 } and right: { a: i64 }.
// If we flatten that to {a: i64} we can no longer perform the arithmetic.
// Instead, we want {left: {a: i64}, right: {a: i64 }} so we can do
// merged.left.a + merged.right.
let fields: Vec<(FieldRef, ArrayRef)> = match &self.result_type {
DataType::Struct(fields) => {
// The result type should always have two fields -- the left and the right.
assert_eq!(fields.len(), 2);
vec![
(fields[0].clone(), spread_left),
(fields[1].clone(), spread_right),
]
}
other => {
tracing::error!("expected struct, got {:?}", other);
error_stack::bail!(Error::Internal("merge result type should be a struct"))
}
};
let merged_data = Arc::new(sparrow_arrow::utils::make_struct_array(num_rows, fields));
// Since we're merging batches, the up_to_time is the last time in the merged batch.
let up_to_time = merged_time.value(merged_time.len() - 1);

Ok(Batch::new_with_data(
merged_data,
merged_time,
merged_subsort,
merged_key_hash,
up_to_time.into(),
))
let left_data = new_null_array(&self.left_data_type, num_rows);
let merged_data = self.create_merged_data(num_rows, left_data, right_data)?;

let batch = batch.with_data(merged_data);
let batch = batch.with_up_to_time(up_to_time);
Ok(batch)
} else {
error_stack::bail!(Error::Internal("expected batch -- "))
panic!("unexpected input: {}", input);
}
}

fn handle_merge(&mut self, left: Batch, right: Batch) -> error_stack::Result<Batch, Error> {
let up_to_time = left.up_to_time.max(right.up_to_time);

// SAFETY: Verified non-empty arrays
let left_merge_input = BinaryMergeInput::new(
left.time().expect("time"),
left.subsort().expect("subsort"),
left.key_hash().expect("key_hash"),
);
let right_merge_input = BinaryMergeInput::new(
right.time().expect("time"),
right.subsort().expect("subsort"),
right.key_hash().expect("key_hash"),
);
let merged_result = crate::binary_merge(left_merge_input, right_merge_input)
.into_report()
.change_context(Error::Internal("TODO"))?;

let left_spread_bits = arrow::compute::is_not_null(&merged_result.take_a)
.into_report()
.change_context(Error::Internal("TODO"))?;
let right_spread_bits = arrow::compute::is_not_null(&merged_result.take_b)
.into_report()
.change_context(Error::Internal("TODO"))?;

let merged_time = Arc::new(merged_result.time);
let merged_subsort = Arc::new(merged_result.subsort);
let merged_key_hash = Arc::new(merged_result.key_hash);

// TODO: Grouping
let grouping = GroupingIndices::new_empty();
let spread_left = self
.spread_left
.spread_signaled(&grouping, left.data().expect("data"), &left_spread_bits)
.into_report()
.change_context(Error::Internal("TODO"))?;
let spread_right = self
.spread_right
.spread_signaled(&grouping, right.data().expect("data"), &right_spread_bits)
.into_report()
.change_context(Error::Internal("TODO"))?;

assert_eq!(spread_left.len(), spread_right.len());
let num_rows = spread_left.len();

let merged_data = self.create_merged_data(num_rows, spread_left, spread_right)?;

Ok(Batch::new_with_data(
merged_data,
merged_time,
merged_subsort,
merged_key_hash,
up_to_time,
))
}

fn create_merged_data(
&self,
num_rows: usize,
left_data: ArrayRef,
right_data: ArrayRef,
) -> error_stack::Result<Arc<StructArray>, Error> {
// The result type of the merge does not flatten the structs.
//
// e.g. have X.a + Y.a then we have left: { a: i64 } and right: { a: i64 }.
// If we flatten that to {a: i64} we can no longer perform the arithmetic.
// Instead, we want {left: {a: i64}, right: {a: i64 }} so we can do
// merged.left.a + merged.right.
let fields: Vec<(FieldRef, ArrayRef)> = match &self.result_type {
DataType::Struct(fields) => {
// The result type should always have two fields -- the left and the right.
assert_eq!(fields.len(), 2);
vec![
(fields[0].clone(), left_data),
(fields[1].clone(), right_data),
]
}
other => {
tracing::error!("expected struct, got {:?}", other);
error_stack::bail!(Error::Internal("merge result type should be a struct"))
}
};
Ok(Arc::new(sparrow_arrow::utils::make_struct_array(
num_rows, fields,
)))
}

/// Closes the given side of the input.
///
/// This allows the remaining side to progress unbounded.
Expand All @@ -163,16 +243,13 @@ impl HeterogeneousMerge {
}
#[cfg(test)]
mod tests {

use super::*;
use arrow_array::{
types::{ArrowPrimitiveType, TimestampNanosecondType, UInt32Type, UInt64Type},
Int64Array, RecordBatch, StructArray, TimestampNanosecondArray, UInt32Array, UInt64Array,
UInt8Array,
types::{ArrowPrimitiveType, TimestampNanosecondType, UInt64Type},
Int64Array, StructArray, TimestampNanosecondArray, UInt32Array, UInt64Array,
};
use arrow_schema::{Field, Fields};

use super::*;

fn minimal_struct_type() -> DataType {
DataType::Struct(Fields::from(vec![
Field::new("time", TimestampNanosecondType::DATA_TYPE, true),
Expand Down Expand Up @@ -354,5 +431,7 @@ mod tests {
}

#[test]
fn test_merge_empty_batches() {}
fn test_merge_empty_batches() {
// merge in empty batches with high up_to_times
}
}
Loading

0 comments on commit ca803d3

Please sign in to comment.