diff --git a/crates/sparrow-batch/src/batch.rs b/crates/sparrow-batch/src/batch.rs index 38591b49f..e5e6a2b3b 100644 --- a/crates/sparrow-batch/src/batch.rs +++ b/crates/sparrow-batch/src/batch.rs @@ -78,6 +78,35 @@ impl Batch { }) } + /// Updates the up_to_time. + pub fn with_up_to_time(self, up_to_time: RowTime) -> Self { + Batch { + data: self.data, + up_to_time, + } + } + + /// Replaces the batch's data with the given data. + /// + /// Panics if the [BatchInfo] is empty. + pub fn with_data(self, data: ArrayRef) -> Self { + if let Some(info) = self.data { + Self { + data: Some(BatchInfo { + data, + time: info.time, + subsort: info.subsort, + key_hash: info.key_hash, + min_present_time: info.min_present_time, + max_present_time: info.max_present_time, + }), + up_to_time: self.up_to_time, + } + } else { + panic!("Cannot replace data of empty batch"); + } + } + // NOTE: The current execution logic expects `RecordBatch` outputs (as well as some existing // testing functionality that is compatible with the old non-partitioned execution path. In the // future, we should standardize on `Batch` which makes it easier to carry a primitive value out. diff --git a/crates/sparrow-main/src/serve/compute_service.rs b/crates/sparrow-main/src/serve/compute_service.rs index cf0224fd3..2b56605fc 100644 --- a/crates/sparrow-main/src/serve/compute_service.rs +++ b/crates/sparrow-main/src/serve/compute_service.rs @@ -1,4 +1,4 @@ -ese std::sync::Arc; +use std::sync::Arc; use dashmap::DashMap; use error_stack::{IntoReport, ResultExt}; diff --git a/crates/sparrow-merge/src/gather.rs b/crates/sparrow-merge/src/gather.rs index b87852369..92c2c4213 100644 --- a/crates/sparrow-merge/src/gather.rs +++ b/crates/sparrow-merge/src/gather.rs @@ -1,6 +1,9 @@ -use arrow_array::{Array, TimestampNanosecondArray}; use sparrow_batch::{Batch, RowTime}; -use std::{collections::BinaryHeap, sync::Arc}; +use std::collections::BinaryHeap; + +use error_stack::ResultExt; + +use crate::merge::Error; /// Gathers batches from multiple inputs, keeping track of the /// minimum `up_to_time` across all inputs, which acts as a @@ -298,14 +301,14 @@ pub struct GatheredBatches { impl GatheredBatches { /// For each input, concats the gathered batches together. - pub fn concat(self) -> Vec { + pub fn concat(self) -> error_stack::Result, Error> { self.batches .iter() .map(|batches| { - let time: &TimestampNanosecondArray = batches[batches.len() - 1].time().unwrap(); - let up_to_time = time.value(time.len() - 1); - let batches = batches.into_iter().cloned().collect(); - Batch::concat(batches, up_to_time.into()).unwrap() + let up_to_time = batches[batches.len() - 1].up_to_time; + let batches = batches.to_vec(); + Batch::concat(batches, up_to_time) + .change_context(Error::Internal("failed to concat batches")) }) .collect() } diff --git a/crates/sparrow-merge/src/lib.rs b/crates/sparrow-merge/src/lib.rs index 664820074..f803201d3 100644 --- a/crates/sparrow-merge/src/lib.rs +++ b/crates/sparrow-merge/src/lib.rs @@ -17,4 +17,3 @@ mod spread; pub(crate) use merge::*; pub use merge_pipeline::MergePipeline; -pub(crate) use spread::*; diff --git a/crates/sparrow-merge/src/merge/binary_merge.rs b/crates/sparrow-merge/src/merge/binary_merge.rs index f134538fc..9cd836620 100644 --- a/crates/sparrow-merge/src/merge/binary_merge.rs +++ b/crates/sparrow-merge/src/merge/binary_merge.rs @@ -8,9 +8,11 @@ use std::cmp::Ordering; use arrow_array::builder::{TimestampNanosecondBuilder, UInt64Builder}; -use arrow_array::{ArrayRef, TimestampNanosecondArray, UInt64Array}; +use arrow_array::{TimestampNanosecondArray, UInt64Array}; use arrow_schema::ArrowError; -use sparrow_batch::Batch; + +#[cfg(test)] +use arrow_array::ArrayRef; /// Struct describing one of the two inputs to the [binary_merge] /// function. diff --git a/crates/sparrow-merge/src/merge/heterogeneous_merge.rs b/crates/sparrow-merge/src/merge/heterogeneous_merge.rs index 0679d696d..12b2818b8 100644 --- a/crates/sparrow-merge/src/merge/heterogeneous_merge.rs +++ b/crates/sparrow-merge/src/merge/heterogeneous_merge.rs @@ -1,10 +1,10 @@ use std::sync::Arc; -use arrow_array::ArrayRef; +use arrow_array::{new_null_array, ArrayRef, StructArray}; use arrow_schema::{DataType, FieldRef}; use binary_merge::BinaryMergeInput; use error_stack::{IntoReport, IntoReportCompat, ResultExt}; -use sparrow_batch::Batch; +use sparrow_batch::{Batch, RowTime}; use sparrow_instructions::GroupingIndices; use crate::{gather::Gatherer, spread::Spread}; @@ -14,9 +14,10 @@ use super::binary_merge; /// Manages the merge of two heterogeneous inputs. pub struct HeterogeneousMerge { result_type: DataType, + left_data_type: DataType, + right_data_type: DataType, spread_left: Spread, spread_right: Spread, - /// Gathers batches from both sides and produces [GatheredBatches] /// up to a valid watermark. gatherer: Gatherer, @@ -39,6 +40,8 @@ impl HeterogeneousMerge { pub fn new(result_type: &DataType, datatype_l: &DataType, datatype_r: &DataType) -> Self { Self { result_type: result_type.clone(), + left_data_type: datatype_l.clone(), + right_data_type: datatype_r.clone(), spread_left: Spread::try_new(false, datatype_l).expect("spread"), spread_right: Spread::try_new(false, datatype_r).expect("spread"), gatherer: Gatherer::new(2), @@ -69,91 +72,168 @@ impl HeterogeneousMerge { pub fn merge(&mut self) -> error_stack::Result { let gathered_batches = self.gatherer.next_batch(); if let Some(gathered_batches) = gathered_batches { - let concat_batches = gathered_batches.concat(); - let left: &Batch = &concat_batches[0]; - let right: &Batch = &concat_batches[1]; - - // TODO: Assumes batch data is non-empty. - let left_merge_input = BinaryMergeInput::new( - left.time().expect("time"), - left.subsort().expect("subsort"), - left.key_hash().expect("key_hash"), - ); - let right_merge_input = BinaryMergeInput::new( - right.time().expect("time"), - right.subsort().expect("subsort"), - right.key_hash().expect("key_hash"), - ); - let merged_result = crate::binary_merge(left_merge_input, right_merge_input) - .into_report() - .change_context(Error::Internal("TODO"))?; - - let left_spread_bits = arrow::compute::is_not_null(&merged_result.take_a) - .into_report() - .change_context(Error::Internal("TODO"))?; - let right_spread_bits = arrow::compute::is_not_null(&merged_result.take_b) - .into_report() - .change_context(Error::Internal("TODO"))?; + let mut concat_batches = gathered_batches.concat()?; + let right: Batch = concat_batches.remove(1); + let left: Batch = concat_batches.remove(0); + + let result = match (left.time(), right.time()) { + (None, None) => self.handle_empty_merge(left.up_to_time, right.up_to_time)?, + (Some(_), None) => self.handle_single_merge(0, left, right.up_to_time)?, + (None, Some(_)) => self.handle_single_merge(1, right, left.up_to_time)?, + (Some(_), Some(_)) => self.handle_merge(left, right)?, + }; + Ok(result) + } else { + error_stack::bail!(Error::Internal("expected batch -- ")) + } + } - let merged_time = Arc::new(merged_result.time); - let merged_subsort = Arc::new(merged_result.subsort); - let merged_key_hash = Arc::new(merged_result.key_hash); + fn handle_empty_merge( + &mut self, + left: RowTime, + right: RowTime, + ) -> error_stack::Result { + let up_to_time = left.max(right); + Ok(Batch::new_empty(up_to_time)) + } - // TODO: Grouping - let grouping = GroupingIndices::new_empty(); - // TODO: Handle empty batches - let spread_left = self + /// Handles merging where one batch is empty. + /// + /// The `input` indicates from which input the `batch` is from. + fn handle_single_merge( + &mut self, + input: usize, + batch: Batch, + other_up_to: RowTime, + ) -> error_stack::Result { + let up_to_time = batch.up_to_time.max(other_up_to); + + // TODO: Grouping + let grouping = GroupingIndices::new_empty(); + + // SAFETY: Verified batch is non-empty + if input == 0 { + let left_data = self .spread_left - .spread_signaled(&grouping, left.data().expect("data"), &left_spread_bits) + .spread_true(&grouping, batch.data().expect("data")) .into_report() .change_context(Error::Internal("TODO"))?; - let spread_right = self + let num_rows = left_data.len(); + assert_eq!(num_rows, batch.time().expect("time").len()); + + let right_data = new_null_array(&self.right_data_type, num_rows); + let merged_data = self.create_merged_data(num_rows, left_data, right_data)?; + + let batch = batch.with_data(merged_data); + let batch = batch.with_up_to_time(up_to_time); + Ok(batch) + } else if input == 1 { + let right_data = self .spread_right - .spread_signaled(&grouping, right.data().expect("data"), &right_spread_bits) + .spread_true(&grouping, batch.data().expect("data")) .into_report() .change_context(Error::Internal("TODO"))?; + let num_rows = right_data.len(); + assert_eq!(num_rows, batch.time().expect("time").len()); - // println!("Spread left: {:?}", spread_left); - // println!("Spread right: {:?}", spread_right); - assert_eq!(spread_left.len(), spread_right.len()); - let num_rows = spread_left.len(); - - // The result type of the merge does not flatten the structs. - // - // e.g. have X.a + Y.a then we have left: { a: i64 } and right: { a: i64 }. - // If we flatten that to {a: i64} we can no longer perform the arithmetic. - // Instead, we want {left: {a: i64}, right: {a: i64 }} so we can do - // merged.left.a + merged.right. - let fields: Vec<(FieldRef, ArrayRef)> = match &self.result_type { - DataType::Struct(fields) => { - // The result type should always have two fields -- the left and the right. - assert_eq!(fields.len(), 2); - vec![ - (fields[0].clone(), spread_left), - (fields[1].clone(), spread_right), - ] - } - other => { - tracing::error!("expected struct, got {:?}", other); - error_stack::bail!(Error::Internal("merge result type should be a struct")) - } - }; - let merged_data = Arc::new(sparrow_arrow::utils::make_struct_array(num_rows, fields)); - // Since we're merging batches, the up_to_time is the last time in the merged batch. - let up_to_time = merged_time.value(merged_time.len() - 1); - - Ok(Batch::new_with_data( - merged_data, - merged_time, - merged_subsort, - merged_key_hash, - up_to_time.into(), - )) + let left_data = new_null_array(&self.left_data_type, num_rows); + let merged_data = self.create_merged_data(num_rows, left_data, right_data)?; + + let batch = batch.with_data(merged_data); + let batch = batch.with_up_to_time(up_to_time); + Ok(batch) } else { - error_stack::bail!(Error::Internal("expected batch -- ")) + panic!("unexpected input: {}", input); } } + fn handle_merge(&mut self, left: Batch, right: Batch) -> error_stack::Result { + let up_to_time = left.up_to_time.max(right.up_to_time); + + // SAFETY: Verified non-empty arrays + let left_merge_input = BinaryMergeInput::new( + left.time().expect("time"), + left.subsort().expect("subsort"), + left.key_hash().expect("key_hash"), + ); + let right_merge_input = BinaryMergeInput::new( + right.time().expect("time"), + right.subsort().expect("subsort"), + right.key_hash().expect("key_hash"), + ); + let merged_result = crate::binary_merge(left_merge_input, right_merge_input) + .into_report() + .change_context(Error::Internal("TODO"))?; + + let left_spread_bits = arrow::compute::is_not_null(&merged_result.take_a) + .into_report() + .change_context(Error::Internal("TODO"))?; + let right_spread_bits = arrow::compute::is_not_null(&merged_result.take_b) + .into_report() + .change_context(Error::Internal("TODO"))?; + + let merged_time = Arc::new(merged_result.time); + let merged_subsort = Arc::new(merged_result.subsort); + let merged_key_hash = Arc::new(merged_result.key_hash); + + // TODO: Grouping + let grouping = GroupingIndices::new_empty(); + let spread_left = self + .spread_left + .spread_signaled(&grouping, left.data().expect("data"), &left_spread_bits) + .into_report() + .change_context(Error::Internal("TODO"))?; + let spread_right = self + .spread_right + .spread_signaled(&grouping, right.data().expect("data"), &right_spread_bits) + .into_report() + .change_context(Error::Internal("TODO"))?; + + assert_eq!(spread_left.len(), spread_right.len()); + let num_rows = spread_left.len(); + + let merged_data = self.create_merged_data(num_rows, spread_left, spread_right)?; + + Ok(Batch::new_with_data( + merged_data, + merged_time, + merged_subsort, + merged_key_hash, + up_to_time, + )) + } + + fn create_merged_data( + &self, + num_rows: usize, + left_data: ArrayRef, + right_data: ArrayRef, + ) -> error_stack::Result, Error> { + // The result type of the merge does not flatten the structs. + // + // e.g. have X.a + Y.a then we have left: { a: i64 } and right: { a: i64 }. + // If we flatten that to {a: i64} we can no longer perform the arithmetic. + // Instead, we want {left: {a: i64}, right: {a: i64 }} so we can do + // merged.left.a + merged.right. + let fields: Vec<(FieldRef, ArrayRef)> = match &self.result_type { + DataType::Struct(fields) => { + // The result type should always have two fields -- the left and the right. + assert_eq!(fields.len(), 2); + vec![ + (fields[0].clone(), left_data), + (fields[1].clone(), right_data), + ] + } + other => { + tracing::error!("expected struct, got {:?}", other); + error_stack::bail!(Error::Internal("merge result type should be a struct")) + } + }; + Ok(Arc::new(sparrow_arrow::utils::make_struct_array( + num_rows, fields, + ))) + } + /// Closes the given side of the input. /// /// This allows the remaining side to progress unbounded. @@ -163,16 +243,13 @@ impl HeterogeneousMerge { } #[cfg(test)] mod tests { - + use super::*; use arrow_array::{ - types::{ArrowPrimitiveType, TimestampNanosecondType, UInt32Type, UInt64Type}, - Int64Array, RecordBatch, StructArray, TimestampNanosecondArray, UInt32Array, UInt64Array, - UInt8Array, + types::{ArrowPrimitiveType, TimestampNanosecondType, UInt64Type}, + Int64Array, StructArray, TimestampNanosecondArray, UInt32Array, UInt64Array, }; use arrow_schema::{Field, Fields}; - use super::*; - fn minimal_struct_type() -> DataType { DataType::Struct(Fields::from(vec![ Field::new("time", TimestampNanosecondType::DATA_TYPE, true), @@ -354,5 +431,7 @@ mod tests { } #[test] - fn test_merge_empty_batches() {} + fn test_merge_empty_batches() { + // merge in empty batches with high up_to_times + } } diff --git a/crates/sparrow-merge/src/merge_pipeline.rs b/crates/sparrow-merge/src/merge_pipeline.rs index 1c7c8e6e5..ce8e8ed66 100644 --- a/crates/sparrow-merge/src/merge_pipeline.rs +++ b/crates/sparrow-merge/src/merge_pipeline.rs @@ -1,21 +1,15 @@ -use std::collections::VecDeque; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; - use arrow_schema::DataType; -use error_stack::{IntoReport, IntoReportCompat, ResultExt}; -use itertools::Itertools; +use error_stack::{IntoReport, ResultExt}; use parking_lot::Mutex; use sparrow_batch::Batch; use sparrow_physical::StepId; use sparrow_scheduler::{ InputHandles, Partition, Partitioned, Pipeline, PipelineError, Scheduler, TaskRef, }; +use std::sync::atomic::{AtomicBool, Ordering}; use tokio::sync::mpsc::error::TryRecvError; -use crate::gather::Gatherer; -use crate::merge::{BinaryMergeInput, HeterogeneousMerge}; -use crate::spread::Spread; +use crate::merge::HeterogeneousMerge; /// Runs a merge operation. pub struct MergePipeline { @@ -39,7 +33,8 @@ impl std::fmt::Debug for MergePipeline { struct Input { /// The input id for this input - input: StepId, + #[allow(unused)] + id: StepId, /// The data type for this input datatype: DataType, /// Whether this side is closed @@ -106,7 +101,7 @@ impl MergePipeline { /// /// Args: /// consumers: The `InputHandles` to output the result of the transform to. - pub fn try_new<'a>( + pub fn try_new( input_l: StepId, input_r: StepId, datatype_l: &DataType, @@ -115,12 +110,12 @@ impl MergePipeline { consumers: InputHandles, ) -> error_stack::Result { let left = Input { - input: input_l, + id: input_l, datatype: datatype_l.clone(), is_closed: Mutex::new(false), }; let right = Input { - input: input_r, + id: input_r, datatype: datatype_r.clone(), is_closed: Mutex::new(false), }; @@ -136,7 +131,7 @@ impl MergePipeline { impl Pipeline for MergePipeline { fn initialize(&mut self, tasks: Partitioned) { - // TODO: FRAZ - need to have the channels here. + // TODO: FRAZ - need to create the channels here. self.partitions = tasks .into_iter() .map(|task| { @@ -148,7 +143,7 @@ impl Pipeline for MergePipeline { // TODO: Interpolation // Current impl uses unlatched spread (`Null` interpolation), meaning discrete behavior. handler: Mutex::new(MergePartitionHandler { - rxs: Vec::new(), + rxs: Vec::new(), // TODO: create channels merger: HeterogeneousMerge::new( &self.result_type, &self.left.datatype, diff --git a/crates/sparrow-merge/src/spread.rs b/crates/sparrow-merge/src/spread.rs index 9e4d89061..c196c8f9e 100644 --- a/crates/sparrow-merge/src/spread.rs +++ b/crates/sparrow-merge/src/spread.rs @@ -143,6 +143,7 @@ impl Spread { } /// Special case of spread when `signal` is all `true`. + #[allow(unused)] pub fn spread_true( &mut self, grouping: &GroupingIndices, @@ -152,6 +153,7 @@ impl Spread { } /// Special case of `spread` when `signal` is all `false`. + #[allow(unused)] pub fn spread_false( &mut self, grouping: &GroupingIndices,