diff --git a/Cargo.lock b/Cargo.lock index 2a4b9c327..56c326f82 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4331,6 +4331,7 @@ dependencies = [ "anyhow", "arrow", "arrow-arith", + "arrow-array", "arrow-select", "async-once-cell", "async-stream", @@ -4546,6 +4547,7 @@ dependencies = [ "num", "serde_json", "sparrow-arrow", + "sparrow-batch", "sparrow-physical", "static_init", "substring", @@ -4754,7 +4756,6 @@ dependencies = [ "proptest", "smallvec", "sparrow-arrow", - "sparrow-batch", "sparrow-core", "sparrow-interfaces", "tokio", @@ -4913,6 +4914,7 @@ dependencies = [ "loom", "serde", "sparrow-arrow", + "sparrow-batch", "tracing", "work-queue", ] diff --git a/crates/sparrow-arrow/src/batch.rs b/crates/sparrow-arrow/src/batch.rs deleted file mode 100644 index 245da081c..000000000 --- a/crates/sparrow-arrow/src/batch.rs +++ /dev/null @@ -1,501 +0,0 @@ -use arrow_array::cast::AsArray; -use arrow_array::types::{TimestampNanosecondType, UInt64Type}; -use arrow_array::{Array, ArrayRef, ArrowPrimitiveType, TimestampNanosecondArray, UInt64Array}; -use error_stack::{IntoReport, ResultExt}; -use itertools::Itertools; - -use crate::downcast::downcast_primitive_array; -use crate::RowTime; - -/// A batch to be processed by the system. -#[derive(Clone, PartialEq, Debug)] -pub struct Batch { - /// The data associated with the batch. - pub(crate) data: Option, - /// An indication that the batch stream has completed up to the given time. - /// Any rows in future batches on this stream must have a time strictly - /// greater than this. - pub up_to_time: RowTime, -} - -impl Batch { - pub fn new_empty(up_to_time: RowTime) -> Self { - Self { - data: None, - up_to_time, - } - } - - pub fn is_empty(&self) -> bool { - self.data.is_none() - } - - pub fn num_rows(&self) -> usize { - match &self.data { - Some(data) => data.data.len(), - None => 0, - } - } - - pub fn time(&self) -> Option<&TimestampNanosecondArray> { - self.data.as_ref().map(|data| data.time()) - } - - pub fn subsort(&self) -> Option<&UInt64Array> { - self.data.as_ref().map(|data| data.subsort()) - } - - pub fn key_hash(&self) -> Option<&UInt64Array> { - self.data.as_ref().map(|data| data.key_hash()) - } - - /// Return the minimum time present in the batch. - pub fn min_present_time(&self) -> Option { - self.data.as_ref().map(|info| info.min_present_time) - } - - pub fn max_present_time(&self) -> Option { - self.data.as_ref().map(|info| info.max_present_time) - } - - pub fn data(&self) -> Option<&ArrayRef> { - self.data.as_ref().map(|info| &info.data) - } - - /// Create a new `Batch` containing the given batch data. - /// - /// `time`, `subsort` and `key_hash` are references to the key columns. - /// They may be references to columns within the batch or not. - /// - /// `up_to_time` is a [RowTime] such that: - /// (a) all rows so far are less than or equal to `up_to_time` - /// (b) all rows in this batch or less than or equal to `up_to_time` - /// (c) all future rows are greater than `up_to_time`. 
- pub fn new_with_data( - data: ArrayRef, - time: ArrayRef, - subsort: ArrayRef, - key_hash: ArrayRef, - up_to_time: RowTime, - ) -> Self { - debug_assert_eq!(data.len(), time.len()); - debug_assert_eq!(data.len(), subsort.len()); - debug_assert_eq!(data.len(), key_hash.len()); - debug_assert_eq!(time.data_type(), &TimestampNanosecondType::DATA_TYPE); - debug_assert_eq!(subsort.data_type(), &UInt64Type::DATA_TYPE); - debug_assert_eq!(key_hash.data_type(), &UInt64Type::DATA_TYPE); - let data = if data.len() == 0 { - None - } else { - Some(BatchInfo::new(data, time, subsort, key_hash)) - }; - - Self { data, up_to_time } - } - - /// Return a new `Batch` with the same time properties, but new data. - pub fn with_projection(&self, new_data: ArrayRef) -> Self { - assert_eq!(new_data.len(), self.num_rows()); - Self { - data: self.data.as_ref().map(|data| BatchInfo { - data: new_data, - time: data.time.clone(), - subsort: data.subsort.clone(), - key_hash: data.key_hash.clone(), - min_present_time: data.min_present_time, - max_present_time: data.max_present_time, - }), - up_to_time: self.up_to_time, - } - } - - /// Split off the rows less or equal to the given time. - /// - /// Returns the rows less than or equal to the given time and - /// leaves the remaining rows in `self`. - pub fn split_up_to(&mut self, time_inclusive: RowTime) -> Option { - let Some(data) = &mut self.data else { - return None; - }; - - if time_inclusive < data.min_present_time { - // time_inclusive < min_present <= max_present. - // none of the rows in the batch should be taken. - return None; - } - - if data.max_present_time <= time_inclusive { - // min_present <= max_present <= time_inclusive - // all rows should be taken - return Some(Batch { - data: self.data.take(), - // Even though we took all the rows, the batch - // we return is only as complete as the original - // batch. There may be other batches after this - // that have equal rows less than time inclusive. - up_to_time: self.up_to_time, - }); - } - - // If we reach this point, then we need to actually split the data. - debug_assert!(time_inclusive <= self.up_to_time); - Some(Batch { - data: Some(data.split_up_to(time_inclusive)), - // We can be complete up to time_inclusive because it is less - // than or equal to the time this batch was complete to. We put - // all of the rows this batch had up to that time in the result, - // and only left the batches after that time. - up_to_time: time_inclusive, - }) - } - - pub fn concat(batches: Vec, up_to_time: RowTime) -> error_stack::Result { - // TODO: Add debug assertions for batch ordering? 
- if batches.iter().all(|batch| batch.is_empty()) { - return Ok(Batch::new_empty(up_to_time)); - } - - let min_present_time = batches - .iter() - .find_map(|batch| batch.data.as_ref().map(|data| data.min_present_time)) - .expect("at least one non-empty batch"); - - let max_present_time = batches - .iter() - .rev() - .find_map(|batch| batch.data.as_ref().map(|data| data.max_present_time)) - .expect("at least one non-empty batch"); - - let times: Vec<_> = batches - .iter() - .flat_map(|batch| batch.time()) - .map(|batch| -> &dyn arrow_array::Array { batch }) - .collect(); - let time = arrow_select::concat::concat(×) - .into_report() - .change_context(Error::Internal)?; - - let subsorts: Vec<_> = batches - .iter() - .flat_map(|batch| batch.subsort()) - .map(|batch| -> &dyn arrow_array::Array { batch }) - .collect(); - let subsort = arrow_select::concat::concat(&subsorts) - .into_report() - .change_context(Error::Internal)?; - - let key_hashes: Vec<_> = batches - .iter() - .flat_map(|batch| batch.key_hash()) - .map(|batch| -> &dyn arrow_array::Array { batch }) - .collect(); - let key_hash = arrow_select::concat::concat(&key_hashes) - .into_report() - .change_context(Error::Internal)?; - - let batches: Vec<_> = batches - .iter() - .flat_map(|batch| batch.data()) - .map(|data| data.as_ref()) - .collect(); - let data = arrow_select::concat::concat(&batches) - .into_report() - .change_context(Error::Internal)?; - - Ok(Self { - data: Some(BatchInfo { - data, - time, - subsort, - key_hash, - min_present_time, - max_present_time, - }), - up_to_time, - }) - } - - pub fn take(&self, indices: &UInt64Array) -> error_stack::Result { - match &self.data { - Some(info) => { - let data = arrow_select::take::take(info.data.as_ref(), indices, None) - .into_report() - .change_context(Error::Internal)?; - let time = arrow_select::take::take(info.time.as_ref(), indices, None) - .into_report() - .change_context(Error::Internal)?; - let subsort = arrow_select::take::take(info.subsort.as_ref(), indices, None) - .into_report() - .change_context(Error::Internal)?; - let key_hash = arrow_select::take::take(info.key_hash.as_ref(), indices, None) - .into_report() - .change_context(Error::Internal)?; - let info = BatchInfo { - data, - time, - subsort, - key_hash, - // TODO: Should the `*_present_time` be updated to reflect actual contents of batch? - min_present_time: info.min_present_time, - max_present_time: info.max_present_time, - }; - - Ok(Self { - data: Some(info), - up_to_time: self.up_to_time, - }) - } - None => { - assert_eq!(indices.len() - indices.null_count(), 0); - Ok(self.clone()) - } - } - } - - pub fn slice(&self, offset: usize, length: usize) -> Self { - match &self.data { - Some(info) => { - let info = BatchInfo { - data: info.data.slice(offset, length), - time: info.time.slice(offset, length), - subsort: info.subsort.slice(offset, length), - key_hash: info.key_hash.slice(offset, length), - min_present_time: info.min_present_time, - max_present_time: info.max_present_time, - }; - Self { - data: Some(info), - up_to_time: self.up_to_time, - } - } - None => { - assert_eq!(offset, 0); - assert_eq!(length, 0); - self.clone() - } - } - } - - /// Creates a batch with the given times and key hashes. 
- #[cfg(any(test, feature = "testing"))] - pub fn minimal_from( - time: impl Into, - key_hash: impl Into, - up_to_time: i64, - ) -> Self { - use std::sync::Arc; - - use arrow_array::StructArray; - - let time: TimestampNanosecondArray = time.into(); - let subsort: UInt64Array = (0..(time.len() as u64)).collect_vec().into(); - let key_hash: UInt64Array = key_hash.into(); - - let time: ArrayRef = Arc::new(time); - let subsort: ArrayRef = Arc::new(subsort); - let key_hash: ArrayRef = Arc::new(key_hash); - - let data = Arc::new(StructArray::new( - MINIMAL_SCHEMA.fields().clone(), - vec![time.clone(), key_hash.clone()], - None, - )); - - Batch::new_with_data( - data, - time, - subsort, - key_hash, - RowTime::from_timestamp_ns(up_to_time), - ) - } -} - -#[derive(Clone, Debug)] -pub(crate) struct BatchInfo { - pub(crate) data: ArrayRef, - pub(crate) time: ArrayRef, - pub(crate) subsort: ArrayRef, - pub(crate) key_hash: ArrayRef, - min_present_time: RowTime, - max_present_time: RowTime, -} - -impl PartialEq for BatchInfo { - fn eq(&self, other: &Self) -> bool { - self.data.as_ref() == other.data.as_ref() - && self.time.as_ref() == other.time.as_ref() - && self.min_present_time == other.min_present_time - && self.max_present_time == other.max_present_time - } -} - -impl BatchInfo { - fn new(data: ArrayRef, time: ArrayRef, subsort: ArrayRef, key_hash: ArrayRef) -> Self { - debug_assert_eq!(data.len(), time.len()); - debug_assert_eq!(data.len(), subsort.len()); - debug_assert_eq!(data.len(), key_hash.len()); - debug_assert!(data.len() > 0); - debug_assert_eq!(time.null_count(), 0); - - let time_column: &TimestampNanosecondArray = - downcast_primitive_array(time.as_ref()).expect("time column to be timestamp"); - - // Debug assertion that the array is sorted by time. - // Once `is_sorted` is stable (https://github.com/rust-lang/rust/issues/53485). - // debug_assert!(time_column.iter().is_sorted()); - // - debug_assert!(time_column.iter().tuple_windows().all(|(a, b)| a <= b)); - - let min_present_time = RowTime::from_timestamp_ns(time_column.values()[0]); - let max_present_time = - RowTime::from_timestamp_ns(time_column.values()[time_column.len() - 1]); - - Self { - data, - time, - subsort, - key_hash, - min_present_time, - max_present_time, - } - } - - /// Split off the rows less or equal to the given time. - /// - /// Returns the rows less than or equal to the given time and - /// leaves the remaining rows in `self`. 
- fn split_up_to(&mut self, time_inclusive: RowTime) -> Self { - let time_column: &TimestampNanosecondArray = - downcast_primitive_array(self.time.as_ref()).expect("time column to be timestamp"); - - // 0..slice_start (len = slice_start) = what we return - // slice_start..len (len = len - slice_start) = what we leave in self - let slice_start = time_column - .values() - .partition_point(|time| RowTime::from_timestamp_ns(*time) <= time_inclusive); - let slice_len = self.data.len() - slice_start; - - let max_result_time = RowTime::from_timestamp_ns(time_column.values()[slice_start - 1]); - let min_self_time = RowTime::from_timestamp_ns(time_column.values()[slice_start]); - - let result = Self { - data: self.data.slice(0, slice_start), - time: self.time.slice(0, slice_start), - subsort: self.subsort.slice(0, slice_start), - key_hash: self.key_hash.slice(0, slice_start), - min_present_time: self.min_present_time, - max_present_time: max_result_time, - }; - - self.data = self.data.slice(slice_start, slice_len); - self.time = self.time.slice(slice_start, slice_len); - self.key_hash = self.key_hash.slice(slice_start, slice_len); - self.min_present_time = min_self_time; - - result - } - - pub(crate) fn time(&self) -> &TimestampNanosecondArray { - self.time.as_primitive() - } - - pub(crate) fn subsort(&self) -> &UInt64Array { - self.subsort.as_primitive() - } - - pub(crate) fn key_hash(&self) -> &UInt64Array { - self.key_hash.as_primitive() - } -} - -#[derive(Debug, derive_more::Display)] -pub enum Error { - #[display(fmt = "internal error")] - Internal, -} - -impl error_stack::Context for Error {} - -#[cfg(any(test, feature = "testing"))] -#[static_init::dynamic] -static MINIMAL_SCHEMA: arrow_schema::SchemaRef = { - use std::sync::Arc; - - use arrow_array::types::{TimestampNanosecondType, UInt64Type}; - use arrow_array::ArrowPrimitiveType; - use arrow_schema::{Field, Schema}; - - let schema = Schema::new(vec![ - Field::new("time", TimestampNanosecondType::DATA_TYPE, false), - Field::new("key_hash", UInt64Type::DATA_TYPE, false), - ]); - Arc::new(schema) -}; - -#[cfg(test)] -mod tests { - - use crate::testing::arb_arrays::arb_batch; - use crate::{Batch, RowTime}; - use itertools::Itertools; - use proptest::prelude::*; - - #[test] - fn test_split_batch() { - let mut batch = Batch::minimal_from(vec![0, 1], vec![0, 0], 1); - - let result = batch.split_up_to(RowTime::from_timestamp_ns(0)).unwrap(); - - assert_eq!(result, Batch::minimal_from(vec![0], vec![0], 0)); - assert_eq!(batch, Batch::minimal_from(vec![1], vec![0], 1)); - } - - proptest::proptest! { - #[test] - fn test_split_in_batch(original in arb_batch(2..100)) { - for time in original.time().unwrap().values().iter().dedup() { - let time = RowTime::from_timestamp_ns(*time); - - let mut remainder = original.clone(); - - if let Some(result) = remainder.split_up_to(time) { - // The result of the split is complete up to the requested time. - prop_assert_eq!(result.up_to_time, time); - // The remainder is complete up to the original time. 
- prop_assert_eq!(remainder.up_to_time, original.up_to_time); - - // create the concatenated result - let concatenated = match (result.data(), remainder.data()) { - (None, None) => unreachable!(), - (Some(a), None) => a.clone(), - (None, Some(b)) => b.clone(), - (Some(a), Some(b)) => arrow_select::concat::concat(&[a.as_ref(), b.as_ref()]).unwrap(), - }; - prop_assert_eq!(&concatenated, original.data().unwrap()); - - prop_assert!(result.data.is_some()); - let result = result.data.unwrap(); - - prop_assert_eq!(result.data.len(), result.time.len()); - prop_assert_eq!(result.time().values()[0], i64::from(result.min_present_time)); - prop_assert_eq!(result.time().values()[result.time.len() - 1], i64::from(result.max_present_time)); - - if time == original.max_present_time().unwrap() { - // If the time equalled the max present time, then we consume the batch. - prop_assert!(remainder.is_empty()); - } else { - prop_assert!(remainder.data.is_some()); - let remainder = remainder.data.unwrap(); - - prop_assert_eq!(remainder.data.len(), remainder.time.len()); - prop_assert_eq!(remainder.time().values()[0], i64::from(remainder.min_present_time)); - prop_assert_eq!(remainder.time().values()[remainder.time.len() - 1], i64::from(remainder.max_present_time)); - } - } else { - prop_assert!(false) - } - } - } - } -} diff --git a/crates/sparrow-arrow/src/lib.rs b/crates/sparrow-arrow/src/lib.rs index 65d2f40b4..f39d56cff 100644 --- a/crates/sparrow-arrow/src/lib.rs +++ b/crates/sparrow-arrow/src/lib.rs @@ -10,18 +10,14 @@ pub mod attachments; #[cfg(feature = "avro")] pub mod avro; -mod batch; mod concat_take; pub mod downcast; pub mod hash; pub mod hasher; -mod row_time; pub mod scalar_value; pub mod serde; #[cfg(any(test, feature = "testing"))] pub mod testing; pub mod utils; -pub use batch::*; pub use concat_take::*; -pub use row_time::*; diff --git a/crates/sparrow-arrow/src/row_time.rs b/crates/sparrow-arrow/src/row_time.rs deleted file mode 100644 index 687e80a86..000000000 --- a/crates/sparrow-arrow/src/row_time.rs +++ /dev/null @@ -1,35 +0,0 @@ -use arrow_array::temporal_conversions::timestamp_ns_to_datetime; - -/// Wrapper around the time of a row. -/// -/// This is used to allow for future changes to the representation -/// of time in a single place. -#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Default)] -#[repr(transparent)] -pub struct RowTime(i64); - -impl std::fmt::Display for RowTime { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let time = timestamp_ns_to_datetime(self.0).expect("expected date time"); - write!(f, "{time}") - } -} - -impl RowTime { - pub const ZERO: Self = Self::from_timestamp_ns(0); - pub const MAX: Self = Self::from_timestamp_ns(i64::MAX); - - pub const fn from_timestamp_ns(timestamp: i64) -> Self { - Self(timestamp) - } - - pub fn pred(&self) -> Self { - Self(self.0 - 1) - } -} - -impl From for i64 { - fn from(val: RowTime) -> Self { - val.0 - } -} diff --git a/crates/sparrow-batch/Cargo.toml b/crates/sparrow-batch/Cargo.toml index bbace2118..625501351 100644 --- a/crates/sparrow-batch/Cargo.toml +++ b/crates/sparrow-batch/Cargo.toml @@ -12,6 +12,7 @@ Defines the main struct for wrapping RecordBatches in execution. 
[dependencies] anyhow.workspace = true arrow.workspace = true +arrow-array.workspace = true arrow-arith.workspace = true arrow-select.workspace = true async-once-cell.workspace = true diff --git a/crates/sparrow-batch/src/batch.rs b/crates/sparrow-batch/src/batch.rs index f3c34dcea..66d5935d5 100644 --- a/crates/sparrow-batch/src/batch.rs +++ b/crates/sparrow-batch/src/batch.rs @@ -1,394 +1,498 @@ -use std::sync::Arc; +use arrow_array::cast::AsArray; +use arrow_array::types::{TimestampNanosecondType, UInt64Type}; +use arrow_array::{Array, ArrayRef, ArrowPrimitiveType, TimestampNanosecondArray, UInt64Array}; +use error_stack::{IntoReport, ResultExt}; +use itertools::Itertools; -use anyhow::Context; -use arrow::array::{ArrayRef, TimestampNanosecondArray, UInt64Array}; -use arrow::datatypes::{ - ArrowPrimitiveType, DataType, Field, Schema, SchemaRef, TimeUnit, TimestampNanosecondType, -}; -use arrow::record_batch::RecordBatch; -use chrono::NaiveDateTime; -use rand::Rng; -use sparrow_arrow::downcast::downcast_primitive_array; -use sparrow_core::{KeyTriple, KeyTriples}; - -/// Represents a batch to be processed. -/// -/// Each batch contains 0 or more rows of data and covers a range of time. -#[derive(Debug, Clone)] +use crate::RowTime; + +/// A batch to be processed by the system. +#[derive(Clone, PartialEq, Debug)] pub struct Batch { - /// The first key triple in the record batch. - /// - /// This represents the lower bound of the range of rows - /// this record batch contains. Although certain operations - /// may filter rows out, the bounds remain. This ensures we - /// continue to make progress on downstream operations. - /// The minimum row in the data may be past this key triple. - pub lower_bound: KeyTriple, - - /// The last key triple in the record batch. - /// - /// This represents the upper bound of the range of rows - /// this record batch contains. Although certain operations - /// may filter rows out, the bounds remain. This ensures we - /// continue to make progress on downstream operations. - /// The maximum row in the data may be before this key triple. - pub upper_bound: KeyTriple, - - /// The data contained in this batch. - /// - /// The data is sorted by the key triples. - pub data: RecordBatch, + /// The data associated with the batch. + pub(crate) data: Option, + /// An indication that the batch stream has completed up to the given time. + /// Any rows in future batches on this stream must have a time strictly + /// greater than this. + pub up_to_time: RowTime, } impl Batch { - /// Construct a new batch, inferring the lower and upper bound from the - /// data. - /// - /// It is expected that that data is sorted. - /// - /// This should be used only when the initial [Batch] is created while - /// reading a table. Otherwise, bounds should be preserved using - /// [Batch::with_data] or [Batch::try_new_bounds]. 
- pub fn try_new_from_batch(data: RecordBatch) -> anyhow::Result { - anyhow::ensure!( - data.num_rows() > 0, - "Unable to create batch from empty data" - ); - - let key_triples = KeyTriples::try_from(&data)?; - Self::try_new_with_bounds( - data, - key_triples.first().context("First key triple")?, - key_triples.last().context("Last key triple")?, - ) + pub fn new_empty(up_to_time: RowTime) -> Self { + Self { + data: None, + up_to_time, + } } - pub fn try_new_with_bounds( - data: RecordBatch, - lower_bound: KeyTriple, - upper_bound: KeyTriple, - ) -> anyhow::Result { - #[cfg(debug_assertions)] - validate(&data, &lower_bound, &upper_bound)?; - - Ok(Self { - lower_bound, - upper_bound, - data, - }) + pub fn is_empty(&self) -> bool { + self.data.is_none() } - pub fn schema(&self) -> SchemaRef { - self.data.schema() + pub fn num_rows(&self) -> usize { + match &self.data { + Some(data) => data.data.len(), + None => 0, + } } - pub fn data(&self) -> &RecordBatch { - &self.data + pub fn time(&self) -> Option<&TimestampNanosecondArray> { + self.data.as_ref().map(|data| data.time()) } - pub fn column(&self, i: usize) -> &ArrayRef { - self.data.column(i) + pub fn subsort(&self) -> Option<&UInt64Array> { + self.data.as_ref().map(|data| data.subsort()) } - pub fn columns(&self) -> &[ArrayRef] { - self.data.columns() + pub fn key_hash(&self) -> Option<&UInt64Array> { + self.data.as_ref().map(|data| data.key_hash()) } - pub fn lower_bound_as_date(&self) -> anyhow::Result { - arrow::temporal_conversions::timestamp_ns_to_datetime(self.lower_bound.time) - .context("convert lower bound to date") + /// Return the minimum time present in the batch. + pub fn min_present_time(&self) -> Option { + self.data.as_ref().map(|info| info.min_present_time) } - pub fn upper_bound_as_date(&self) -> anyhow::Result { - arrow::temporal_conversions::timestamp_ns_to_datetime(self.upper_bound.time) - .context("convert upper bound to date") + pub fn max_present_time(&self) -> Option { + self.data.as_ref().map(|info| info.max_present_time) } - /// Return the array of times within this. - pub fn times(&self) -> anyhow::Result<&[i64]> { - // TODO: Consider storing the downcast version of the column (possibly in an - // `ArcRef`)? - let times: &TimestampNanosecondArray = - downcast_primitive_array(self.data.column(0).as_ref())?; - Ok(times.values()) + pub fn data(&self) -> Option<&ArrayRef> { + self.data.as_ref().map(|info| &info.data) } - pub fn is_empty(&self) -> bool { - self.data.num_rows() == 0 - } - - pub fn num_rows(&self) -> usize { - self.data.num_rows() + /// Create a new `Batch` containing the given batch data. + /// + /// `time`, `subsort` and `key_hash` are references to the key columns. + /// They may be references to columns within the batch or not. + /// + /// `up_to_time` is a [RowTime] such that: + /// (a) all rows so far are less than or equal to `up_to_time` + /// (b) all rows in this batch or less than or equal to `up_to_time` + /// (c) all future rows are greater than `up_to_time`. 
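// Editor's sketch (not part of this patch): one way to construct a `Batch` that
// satisfies invariants (a)-(c) above. It assumes the `Batch` and `RowTime`
// re-exports that this diff adds to `sparrow-batch/src/lib.rs`; the struct-typed
// `data` column and the helper name `example_batch` are illustrative choices only.
use std::sync::Arc;

use arrow_array::{ArrayRef, StructArray, TimestampNanosecondArray, UInt64Array};
use arrow_schema::{DataType, Field, Fields, TimeUnit};
use sparrow_batch::{Batch, RowTime};

fn example_batch() -> Batch {
    // Three rows at 1ns, 2ns, 3ns for a single entity (key hash 7).
    let time: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![1i64, 2, 3]));
    let subsort: ArrayRef = Arc::new(UInt64Array::from(vec![0u64, 1, 2]));
    let key_hash: ArrayRef = Arc::new(UInt64Array::from(vec![7u64, 7, 7]));

    // Any array with the same row count works as `data`; reusing the key
    // columns keeps the sketch self-contained.
    let fields = Fields::from(vec![
        Field::new("time", DataType::Timestamp(TimeUnit::Nanosecond, None), false),
        Field::new("key_hash", DataType::UInt64, false),
    ]);
    let data: ArrayRef = Arc::new(StructArray::new(
        fields,
        vec![time.clone(), key_hash.clone()],
        None,
    ));

    // Every row is at or before 3ns, and any later batch must be strictly after it.
    Batch::new_with_data(data, time, subsort, key_hash, RowTime::from_timestamp_ns(3))
}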
+ pub fn new_with_data( + data: ArrayRef, + time: ArrayRef, + subsort: ArrayRef, + key_hash: ArrayRef, + up_to_time: RowTime, + ) -> Self { + debug_assert_eq!(data.len(), time.len()); + debug_assert_eq!(data.len(), subsort.len()); + debug_assert_eq!(data.len(), key_hash.len()); + debug_assert_eq!(time.data_type(), &TimestampNanosecondType::DATA_TYPE); + debug_assert_eq!(subsort.data_type(), &UInt64Type::DATA_TYPE); + debug_assert_eq!(key_hash.data_type(), &UInt64Type::DATA_TYPE); + let data = if data.len() == 0 { + None + } else { + Some(BatchInfo::new(data, time, subsort, key_hash)) + }; + + Self { data, up_to_time } } - /// Create a new batch with the given data and the same bounds. - pub fn with_data(&self, data: RecordBatch) -> Self { + /// Return a new `Batch` with the same time properties, but new data. + pub fn with_projection(&self, new_data: ArrayRef) -> Self { + assert_eq!(new_data.len(), self.num_rows()); Self { - data, - lower_bound: self.lower_bound, - upper_bound: self.upper_bound, + data: self.data.as_ref().map(|data| BatchInfo { + data: new_data, + time: data.time.clone(), + subsort: data.subsort.clone(), + key_hash: data.key_hash.clone(), + min_present_time: data.min_present_time, + max_present_time: data.max_present_time, + }), + up_to_time: self.up_to_time, } } - pub fn slice(&self, offset: usize, length: usize) -> Self { - let data = self.data.slice(offset, length); - Self::try_new_from_batch(data).expect("slicing batch produces valid batch") + /// Split off the rows less or equal to the given time. + /// + /// Returns the rows less than or equal to the given time and + /// leaves the remaining rows in `self`. + pub fn split_up_to(&mut self, time_inclusive: RowTime) -> Option { + let Some(data) = &mut self.data else { + return None; + }; + + if time_inclusive < data.min_present_time { + // time_inclusive < min_present <= max_present. + // none of the rows in the batch should be taken. + return None; + } + + if data.max_present_time <= time_inclusive { + // min_present <= max_present <= time_inclusive + // all rows should be taken + return Some(Batch { + data: self.data.take(), + // Even though we took all the rows, the batch + // we return is only as complete as the original + // batch. There may be other batches after this + // that have equal rows less than time inclusive. + up_to_time: self.up_to_time, + }); + } + + // If we reach this point, then we need to actually split the data. + debug_assert!(time_inclusive <= self.up_to_time); + Some(Batch { + data: Some(data.split_up_to(time_inclusive)), + // We can be complete up to time_inclusive because it is less + // than or equal to the time this batch was complete to. We put + // all of the rows this batch had up to that time in the result, + // and only left the batches after that time. + up_to_time: time_inclusive, + }) } - pub fn batch_from_nanos(start: i64, end: i64) -> Batch { - let times: Vec = (start..=end).collect(); - let time = TimestampNanosecondArray::from_iter_values(times.iter().copied()); - Self::test_batch_random_keys(time) + pub fn concat(batches: Vec, up_to_time: RowTime) -> error_stack::Result { + // TODO: Add debug assertions for batch ordering? 
+ if batches.iter().all(|batch| batch.is_empty()) { + return Ok(Batch::new_empty(up_to_time)); + } + + let min_present_time = batches + .iter() + .find_map(|batch| batch.data.as_ref().map(|data| data.min_present_time)) + .expect("at least one non-empty batch"); + + let max_present_time = batches + .iter() + .rev() + .find_map(|batch| batch.data.as_ref().map(|data| data.max_present_time)) + .expect("at least one non-empty batch"); + + let times: Vec<_> = batches + .iter() + .flat_map(|batch| batch.time()) + .map(|batch| -> &dyn arrow_array::Array { batch }) + .collect(); + let time = arrow_select::concat::concat(×) + .into_report() + .change_context(Error::Internal)?; + + let subsorts: Vec<_> = batches + .iter() + .flat_map(|batch| batch.subsort()) + .map(|batch| -> &dyn arrow_array::Array { batch }) + .collect(); + let subsort = arrow_select::concat::concat(&subsorts) + .into_report() + .change_context(Error::Internal)?; + + let key_hashes: Vec<_> = batches + .iter() + .flat_map(|batch| batch.key_hash()) + .map(|batch| -> &dyn arrow_array::Array { batch }) + .collect(); + let key_hash = arrow_select::concat::concat(&key_hashes) + .into_report() + .change_context(Error::Internal)?; + + let batches: Vec<_> = batches + .iter() + .flat_map(|batch| batch.data()) + .map(|data| data.as_ref()) + .collect(); + let data = arrow_select::concat::concat(&batches) + .into_report() + .change_context(Error::Internal)?; + + Ok(Self { + data: Some(BatchInfo { + data, + time, + subsort, + key_hash, + min_present_time, + max_present_time, + }), + up_to_time, + }) } - pub fn batch_from_dates(start: NaiveDateTime, end: NaiveDateTime) -> Batch { - let start = start - .timestamp_nanos_opt() - .expect("timestamp doesn't overflow"); - let end = end - .timestamp_nanos_opt() - .expect("timestamp doesn't overflow"); - let times: Vec = (start..=end).collect(); - let time = TimestampNanosecondArray::from_iter_values(times.iter().copied()); - Self::test_batch_random_keys(time) + pub fn take(&self, indices: &UInt64Array) -> error_stack::Result { + match &self.data { + Some(info) => { + let data = arrow_select::take::take(info.data.as_ref(), indices, None) + .into_report() + .change_context(Error::Internal)?; + let time = arrow_select::take::take(info.time.as_ref(), indices, None) + .into_report() + .change_context(Error::Internal)?; + let subsort = arrow_select::take::take(info.subsort.as_ref(), indices, None) + .into_report() + .change_context(Error::Internal)?; + let key_hash = arrow_select::take::take(info.key_hash.as_ref(), indices, None) + .into_report() + .change_context(Error::Internal)?; + let info = BatchInfo { + data, + time, + subsort, + key_hash, + // TODO: Should the `*_present_time` be updated to reflect actual contents of batch? + min_present_time: info.min_present_time, + max_present_time: info.max_present_time, + }; + + Ok(Self { + data: Some(info), + up_to_time: self.up_to_time, + }) + } + None => { + assert_eq!(indices.len() - indices.null_count(), 0); + Ok(self.clone()) + } + } } - // Creates a batch from the time array and the given subsort/key repeated. 
- pub fn test_batch(time: TimestampNanosecondArray, subsort: u64, key: u64) -> Batch { - let subsort = UInt64Array::from_iter_values(std::iter::repeat(subsort).take(time.len())); - let key_hash = UInt64Array::from_iter_values(std::iter::repeat(key).take(time.len())); - let schema = Schema::new(vec![ - Field::new("_time", TimestampNanosecondType::DATA_TYPE, false), - Field::new("_subsort", DataType::UInt64, false), - Field::new("_key_hash", DataType::UInt64, false), - ]); - let schema = Arc::new(schema); - let batch = RecordBatch::try_new( - schema, - vec![Arc::new(time), Arc::new(subsort), Arc::new(key_hash)], - ) - .unwrap(); - Batch::try_new_from_batch(batch).unwrap() + pub fn slice(&self, offset: usize, length: usize) -> Self { + match &self.data { + Some(info) => { + let info = BatchInfo { + data: info.data.slice(offset, length), + time: info.time.slice(offset, length), + subsort: info.subsort.slice(offset, length), + key_hash: info.key_hash.slice(offset, length), + min_present_time: info.min_present_time, + max_present_time: info.max_present_time, + }; + Self { + data: Some(info), + up_to_time: self.up_to_time, + } + } + None => { + assert_eq!(offset, 0); + assert_eq!(length, 0); + self.clone() + } + } } - // Creates a batch from the time array, with randomized subsort/keys. - // - // Note this isn't smart enough to ensure increasing random keys, so it assumes - // that the time is strictly increasing. - pub fn test_batch_random_keys(time: TimestampNanosecondArray) -> Batch { - let mut rng = rand::thread_rng(); - let vals: Vec = (0..time.len()) - .map(|_| rng.gen_range(0..10) as u64) - .collect(); + /// Creates a batch with the given times and key hashes. + #[cfg(any(test, feature = "testing"))] + pub fn minimal_from( + time: impl Into, + key_hash: impl Into, + up_to_time: i64, + ) -> Self { + use std::sync::Arc; + + use arrow_array::StructArray; + + let time: TimestampNanosecondArray = time.into(); + let subsort: UInt64Array = (0..(time.len() as u64)).collect_vec().into(); + let key_hash: UInt64Array = key_hash.into(); + + let time: ArrayRef = Arc::new(time); + let subsort: ArrayRef = Arc::new(subsort); + let key_hash: ArrayRef = Arc::new(key_hash); - let subsort = UInt64Array::from_iter_values(vals.clone()); - let key_hash = UInt64Array::from_iter_values(vals); - let schema = Schema::new(vec![ - Field::new("_time", TimestampNanosecondType::DATA_TYPE, false), - Field::new("_subsort", DataType::UInt64, false), - Field::new("_key_hash", DataType::UInt64, false), - ]); - let schema = Arc::new(schema); - let batch = RecordBatch::try_new( - schema, - vec![Arc::new(time), Arc::new(subsort), Arc::new(key_hash)], + let data = Arc::new(StructArray::new( + MINIMAL_SCHEMA.fields().clone(), + vec![time.clone(), key_hash.clone()], + None, + )); + + Batch::new_with_data( + data, + time, + subsort, + key_hash, + RowTime::from_timestamp_ns(up_to_time), ) - .unwrap(); - Batch::try_new_from_batch(batch).unwrap() } +} - #[allow(dead_code)] - pub fn as_json(&self) -> AsJson<'_> { - AsJson(self) - } +#[derive(Clone, Debug)] +pub(crate) struct BatchInfo { + pub(crate) data: ArrayRef, + pub(crate) time: ArrayRef, + pub(crate) subsort: ArrayRef, + pub(crate) key_hash: ArrayRef, + min_present_time: RowTime, + max_present_time: RowTime, } -pub fn validate_batch_schema(schema: &Schema) -> anyhow::Result<()> { - // Validate the three key columns are present, non-null and have the right type. 
- validate_key_column( - schema, - 0, - "_time", - &DataType::Timestamp(TimeUnit::Nanosecond, None), - )?; - validate_key_column(schema, 1, "_subsort", &DataType::UInt64)?; - validate_key_column(schema, 2, "_key_hash", &DataType::UInt64)?; - - Ok(()) +impl PartialEq for BatchInfo { + fn eq(&self, other: &Self) -> bool { + self.data.as_ref() == other.data.as_ref() + && self.time.as_ref() == other.time.as_ref() + && self.min_present_time == other.min_present_time + && self.max_present_time == other.max_present_time + } } -#[cfg(debug_assertions)] -/// Validate that the result is totally sorted and that the lower and upper -/// bound are correct. -pub fn validate_bounds( - time: &ArrayRef, - subsort: &ArrayRef, - key_hash: &ArrayRef, - lower_bound: &KeyTriple, - upper_bound: &KeyTriple, -) -> anyhow::Result<()> { - if time.len() == 0 { - // No more validation necessary for empty batches. - return Ok(()); +impl BatchInfo { + fn new(data: ArrayRef, time: ArrayRef, subsort: ArrayRef, key_hash: ArrayRef) -> Self { + debug_assert_eq!(data.len(), time.len()); + debug_assert_eq!(data.len(), subsort.len()); + debug_assert_eq!(data.len(), key_hash.len()); + debug_assert!(data.len() > 0); + debug_assert_eq!(time.null_count(), 0); + + let time_column: &TimestampNanosecondArray = time.as_primitive(); + + // Debug assertion that the array is sorted by time. + // Once `is_sorted` is stable (https://github.com/rust-lang/rust/issues/53485). + // debug_assert!(time_column.iter().is_sorted()); + // + debug_assert!(time_column.iter().tuple_windows().all(|(a, b)| a <= b)); + + let min_present_time = RowTime::from_timestamp_ns(time_column.values()[0]); + let max_present_time = + RowTime::from_timestamp_ns(time_column.values()[time_column.len() - 1]); + + Self { + data, + time, + subsort, + key_hash, + min_present_time, + max_present_time, + } } - let time: &TimestampNanosecondArray = downcast_primitive_array(time)?; - let subsort: &UInt64Array = downcast_primitive_array(subsort)?; - let key_hash: &UInt64Array = downcast_primitive_array(key_hash)?; - - let mut prev_time = lower_bound.time; - let mut prev_subsort = lower_bound.subsort; - let mut prev_key_hash = lower_bound.key_hash; - - let curr_time = time.value(0); - let curr_subsort = subsort.value(0); - let curr_key_hash = key_hash.value(0); - - let order = prev_time - .cmp(&curr_time) - .then(prev_subsort.cmp(&curr_subsort)) - .then(prev_key_hash.cmp(&curr_key_hash)); - - anyhow::ensure!( - order.is_le(), - "Expected lower_bound <= data[0], but ({}, {}, {}) > ({}, {}, {})", - prev_time, - prev_subsort, - prev_key_hash, - curr_time, - curr_subsort, - curr_key_hash - ); - - prev_time = curr_time; - prev_subsort = curr_subsort; - prev_key_hash = curr_key_hash; - - for i in 1..time.len() { - let curr_time = time.value(i); - let curr_subsort = subsort.value(i); - let curr_key_hash = key_hash.value(i); - - let order = prev_time - .cmp(&curr_time) - .then(prev_subsort.cmp(&curr_subsort)) - .then(prev_key_hash.cmp(&curr_key_hash)); - - anyhow::ensure!( - order.is_lt(), - "Expected data[i - 1] < data[i], but ({}, {}, {}) >= ({}, {}, {})", - prev_time, - prev_subsort, - prev_key_hash, - curr_time, - curr_subsort, - curr_key_hash - ); - - prev_time = curr_time; - prev_subsort = curr_subsort; - prev_key_hash = curr_key_hash; + /// Split off the rows less or equal to the given time. + /// + /// Returns the rows less than or equal to the given time and + /// leaves the remaining rows in `self`. 
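// Editor's sketch (not part of this patch): how the public `Batch::split_up_to`,
// which is built on this helper, behaves from a caller's point of view. It mirrors
// the `test_split_batch` test below; `Batch::minimal_from` is only available in
// this crate's tests or behind its `testing` feature.
use sparrow_batch::{Batch, RowTime};

fn split_example() {
    let mut batch = Batch::minimal_from(vec![0, 5, 10], vec![1, 1, 2], 10);

    // Rows at or before t=5ns are split off; the row at t=10ns stays in `batch`.
    let early = batch.split_up_to(RowTime::from_timestamp_ns(5)).unwrap();
    assert_eq!(early.num_rows(), 2);
    assert_eq!(early.up_to_time, RowTime::from_timestamp_ns(5));
    assert_eq!(batch.num_rows(), 1);
}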
+ fn split_up_to(&mut self, time_inclusive: RowTime) -> Self { + let time_column: &TimestampNanosecondArray = self.time.as_primitive(); + + // 0..slice_start (len = slice_start) = what we return + // slice_start..len (len = len - slice_start) = what we leave in self + let slice_start = time_column + .values() + .partition_point(|time| RowTime::from_timestamp_ns(*time) <= time_inclusive); + let slice_len = self.data.len() - slice_start; + + let max_result_time = RowTime::from_timestamp_ns(time_column.values()[slice_start - 1]); + let min_self_time = RowTime::from_timestamp_ns(time_column.values()[slice_start]); + + let result = Self { + data: self.data.slice(0, slice_start), + time: self.time.slice(0, slice_start), + subsort: self.subsort.slice(0, slice_start), + key_hash: self.key_hash.slice(0, slice_start), + min_present_time: self.min_present_time, + max_present_time: max_result_time, + }; + + self.data = self.data.slice(slice_start, slice_len); + self.time = self.time.slice(slice_start, slice_len); + self.key_hash = self.key_hash.slice(slice_start, slice_len); + self.min_present_time = min_self_time; + + result } - let curr_time = upper_bound.time; - let curr_subsort = upper_bound.subsort; - let curr_key_hash = upper_bound.key_hash; - - let order = prev_time - .cmp(&curr_time) - .then(prev_subsort.cmp(&curr_subsort)) - .then(prev_key_hash.cmp(&curr_key_hash)); - - anyhow::ensure!( - order.is_le(), - "Expected last data <= upper bound, but ({}, {}, {}) > ({}, {}, {})", - prev_time, - prev_subsort, - prev_key_hash, - curr_time, - curr_subsort, - curr_key_hash - ); - - Ok(()) -} + pub(crate) fn time(&self) -> &TimestampNanosecondArray { + self.time.as_primitive() + } -#[cfg(debug_assertions)] -fn validate( - data: &RecordBatch, - lower_bound: &KeyTriple, - upper_bound: &KeyTriple, -) -> anyhow::Result<()> { - validate_batch_schema(data.schema().as_ref())?; - - for key_column in 0..3 { - anyhow::ensure!( - data.column(key_column).null_count() == 0, - "Key column '{}' should not contain null", - data.schema().field(key_column).name() - ); + pub(crate) fn subsort(&self) -> &UInt64Array { + self.subsort.as_primitive() } - validate_bounds( - data.column(0), - data.column(1), - data.column(2), - lower_bound, - upper_bound, - ) + pub(crate) fn key_hash(&self) -> &UInt64Array { + self.key_hash.as_primitive() + } } -fn validate_key_column( - schema: &Schema, - index: usize, - expected_name: &str, - expected_type: &arrow::datatypes::DataType, -) -> anyhow::Result<()> { - anyhow::ensure!( - schema.field(index).name() == expected_name, - "Expected column {} to be '{}' but was '{}'", - index, - expected_name, - schema.field(index).name() - ); - anyhow::ensure!( - schema.field(index).data_type() == expected_type, - "Key column '{}' should be '{:?}' but was '{:?}'", - expected_name, - schema.field(index).data_type(), - expected_type, - ); - - Ok(()) +#[derive(Debug, derive_more::Display)] +pub enum Error { + #[display(fmt = "internal error")] + Internal, } -/// A channel for sending record batches. -pub type BatchSender = tokio::sync::mpsc::Sender; - -/// A channel for receiving record batches. 
-pub type BatchReceiver = tokio::sync::mpsc::Receiver; - -#[repr(transparent)] -pub struct AsJson<'a>(&'a Batch); - -impl<'a> std::fmt::Display for AsJson<'a> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let mut json_string = Vec::new(); - let mut writer = arrow::json::LineDelimitedWriter::new(&mut json_string); - - writer.write_batches(&[&self.0.data]).map_err(|e| { - tracing::error!("Error formatting batch: {}", e); - std::fmt::Error - })?; - writer.finish().map_err(|e| { - tracing::error!("Error formatting batch: {}", e); - std::fmt::Error - })?; - let json_string = String::from_utf8(json_string).map_err(|e| { - tracing::error!("Error formatting batch: {}", e); - std::fmt::Error - })?; - - write!(f, "{json_string}") +impl error_stack::Context for Error {} + +#[cfg(any(test, feature = "testing"))] +#[static_init::dynamic] +static MINIMAL_SCHEMA: arrow_schema::SchemaRef = { + use std::sync::Arc; + + use arrow_array::types::{TimestampNanosecondType, UInt64Type}; + use arrow_array::ArrowPrimitiveType; + use arrow_schema::{Field, Schema}; + + let schema = Schema::new(vec![ + Field::new("time", TimestampNanosecondType::DATA_TYPE, false), + Field::new("key_hash", UInt64Type::DATA_TYPE, false), + ]); + Arc::new(schema) +}; + +#[cfg(test)] +mod tests { + + use crate::testing::arb_arrays::arb_batch; + use crate::{Batch, RowTime}; + use itertools::Itertools; + use proptest::prelude::*; + + #[test] + fn test_split_batch() { + let mut batch = Batch::minimal_from(vec![0, 1], vec![0, 0], 1); + + let result = batch.split_up_to(RowTime::from_timestamp_ns(0)).unwrap(); + + assert_eq!(result, Batch::minimal_from(vec![0], vec![0], 0)); + assert_eq!(batch, Batch::minimal_from(vec![1], vec![0], 1)); + } + + proptest::proptest! { + #[test] + fn test_split_in_batch(original in arb_batch(2..100)) { + for time in original.time().unwrap().values().iter().dedup() { + let time = RowTime::from_timestamp_ns(*time); + + let mut remainder = original.clone(); + + if let Some(result) = remainder.split_up_to(time) { + // The result of the split is complete up to the requested time. + prop_assert_eq!(result.up_to_time, time); + // The remainder is complete up to the original time. + prop_assert_eq!(remainder.up_to_time, original.up_to_time); + + // create the concatenated result + let concatenated = match (result.data(), remainder.data()) { + (None, None) => unreachable!(), + (Some(a), None) => a.clone(), + (None, Some(b)) => b.clone(), + (Some(a), Some(b)) => arrow_select::concat::concat(&[a.as_ref(), b.as_ref()]).unwrap(), + }; + prop_assert_eq!(&concatenated, original.data().unwrap()); + + prop_assert!(result.data.is_some()); + let result = result.data.unwrap(); + + prop_assert_eq!(result.data.len(), result.time.len()); + prop_assert_eq!(result.time().values()[0], i64::from(result.min_present_time)); + prop_assert_eq!(result.time().values()[result.time.len() - 1], i64::from(result.max_present_time)); + + if time == original.max_present_time().unwrap() { + // If the time equalled the max present time, then we consume the batch. 
+ prop_assert!(remainder.is_empty()); + } else { + prop_assert!(remainder.data.is_some()); + let remainder = remainder.data.unwrap(); + + prop_assert_eq!(remainder.data.len(), remainder.time.len()); + prop_assert_eq!(remainder.time().values()[0], i64::from(remainder.min_present_time)); + prop_assert_eq!(remainder.time().values()[remainder.time.len() - 1], i64::from(remainder.max_present_time)); + } + } else { + prop_assert!(false) + } + } + } } } diff --git a/crates/sparrow-batch/src/lib.rs b/crates/sparrow-batch/src/lib.rs index 660ed512e..d6d374423 100644 --- a/crates/sparrow-batch/src/lib.rs +++ b/crates/sparrow-batch/src/lib.rs @@ -9,6 +9,8 @@ mod batch; mod error; +mod row_time; pub use batch::*; pub use error::Error; +pub use row_time::*; diff --git a/crates/sparrow-execution/src/lib.rs b/crates/sparrow-execution/src/lib.rs index 403d3ad87..c03959e17 100644 --- a/crates/sparrow-execution/src/lib.rs +++ b/crates/sparrow-execution/src/lib.rs @@ -25,7 +25,6 @@ mod tests { use parking_lot::Mutex; use sparrow_arrow::scalar_value::ScalarValue; use sparrow_batch::Batch; - // use sparrow_arrow::{Batch, RowTime}; use sparrow_core::ReadConfig; use sparrow_interfaces::Source; use sparrow_scheduler::{Pipeline, PipelineError, PipelineInput, WorkerPool}; diff --git a/crates/sparrow-expressions/Cargo.toml b/crates/sparrow-expressions/Cargo.toml index f91b4e52f..93438188c 100644 --- a/crates/sparrow-expressions/Cargo.toml +++ b/crates/sparrow-expressions/Cargo.toml @@ -28,6 +28,7 @@ itertools.workspace = true num.workspace = true serde_json.workspace = true sparrow-arrow = { path = "../sparrow-arrow" } +sparrow-batch = { path = "../sparrow-batch" } sparrow-physical = { path = "../sparrow-physical" } static_init.workspace = true substring.workspace = true diff --git a/crates/sparrow-merge/Cargo.toml b/crates/sparrow-merge/Cargo.toml index 37085de78..c83af0294 100644 --- a/crates/sparrow-merge/Cargo.toml +++ b/crates/sparrow-merge/Cargo.toml @@ -30,7 +30,6 @@ itertools.workspace = true proptest = { workspace = true, optional = true } smallvec.workspace = true sparrow-arrow = { path = "../sparrow-arrow" } -sparrow-batch = { path = "../sparrow-batch" } sparrow-core = { path = "../sparrow-core" } sparrow-interfaces = { path = "../sparrow-interfaces" } tokio.workspace = true diff --git a/crates/sparrow-merge/src/old/input.rs b/crates/sparrow-merge/src/old/input.rs index 3b08f0b69..26aafbc96 100644 --- a/crates/sparrow-merge/src/old/input.rs +++ b/crates/sparrow-merge/src/old/input.rs @@ -1,5 +1,4 @@ use smallvec::SmallVec; -use sparrow_batch::Batch; /// An `InputItem` is is a splittable container of time-ordered data. 
/// diff --git a/crates/sparrow-runtime/Cargo.toml b/crates/sparrow-runtime/Cargo.toml index 253b79cea..6d2ae68b1 100644 --- a/crates/sparrow-runtime/Cargo.toml +++ b/crates/sparrow-runtime/Cargo.toml @@ -60,7 +60,6 @@ sha2.workspace = true smallvec.workspace = true sparrow-api = { path = "../sparrow-api" } sparrow-arrow = { path = "../sparrow-arrow" } -sparrow-batch = { path = "../sparrow-batch" } sparrow-compiler = { path = "../sparrow-compiler" } sparrow-core = { path = "../sparrow-core" } sparrow-instructions = { path = "../sparrow-instructions" } diff --git a/crates/sparrow-runtime/src/execute/compute_executor.rs b/crates/sparrow-runtime/src/execute/compute_executor.rs index 16929b4cf..96757336b 100644 --- a/crates/sparrow-runtime/src/execute/compute_executor.rs +++ b/crates/sparrow-runtime/src/execute/compute_executor.rs @@ -26,7 +26,6 @@ use crate::execute::Error::Internal; use crate::stores::ObjectStoreRegistry; use crate::util::JoinTask; use crate::RuntimeOptions; -use sparrow_batch::Batch; pub(crate) struct ComputeExecutor { object_stores: Arc, diff --git a/crates/sparrow-runtime/src/execute/operation.rs b/crates/sparrow-runtime/src/execute/operation.rs index 9ef2d4698..f3406ddc3 100644 --- a/crates/sparrow-runtime/src/execute/operation.rs +++ b/crates/sparrow-runtime/src/execute/operation.rs @@ -67,7 +67,6 @@ use crate::execute::operation::shift_until::ShiftUntilOperation; use crate::execute::Error; use crate::key_hash_inverse::ThreadSafeKeyHashInverse; use crate::stores::ObjectStoreRegistry; -use sparrow_batch::Batch; /// Information used while creating operations. /// diff --git a/crates/sparrow-runtime/src/lib.rs b/crates/sparrow-runtime/src/lib.rs index beefa74fa..706fbeb75 100644 --- a/crates/sparrow-runtime/src/lib.rs +++ b/crates/sparrow-runtime/src/lib.rs @@ -22,6 +22,7 @@ clippy::undocumented_unsafe_blocks )] +mod batch; pub mod execute; mod key_hash_index; pub mod key_hash_inverse; @@ -35,6 +36,7 @@ mod util; use std::path::PathBuf; +pub use batch::*; pub use metadata::*; pub use prepare::preparer; pub use read::*; diff --git a/crates/sparrow-scheduler/Cargo.toml b/crates/sparrow-scheduler/Cargo.toml index 1f6db8e9c..67e2f737c 100644 --- a/crates/sparrow-scheduler/Cargo.toml +++ b/crates/sparrow-scheduler/Cargo.toml @@ -17,6 +17,7 @@ index_vec.workspace = true itertools.workspace = true serde.workspace = true sparrow-arrow = { path = "../sparrow-arrow" } +sparrow-batch = { path = "../sparrow-batch" } tracing.workspace = true work-queue = "0.1.4" diff --git a/crates/sparrow-scheduler/src/pipeline.rs b/crates/sparrow-scheduler/src/pipeline.rs index 9630828b6..6f96b20b3 100644 --- a/crates/sparrow-scheduler/src/pipeline.rs +++ b/crates/sparrow-scheduler/src/pipeline.rs @@ -1,6 +1,6 @@ use std::borrow::Cow; -use sparrow_arrow::Batch; +use sparrow_batch::Batch; use crate::{Partition, Partitioned, Scheduler, TaskRef}; diff --git a/crates/sparrow-scheduler/src/sink.rs b/crates/sparrow-scheduler/src/sink.rs index 6f7fe7c15..b99533eca 100644 --- a/crates/sparrow-scheduler/src/sink.rs +++ b/crates/sparrow-scheduler/src/sink.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use sparrow_arrow::Batch; +use sparrow_batch::Batch; use crate::{Partition, Pipeline, PipelineError, Scheduler};
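// Editor's note (not part of this patch): after this change, downstream crates such
// as `sparrow-scheduler` import `Batch` from `sparrow-batch` rather than
// `sparrow-arrow`, as the last two hunks show. A minimal consumer of the moved types
// looks like the following; `empty_up_to` is a hypothetical helper, shown only to
// illustrate the new import path.
use sparrow_batch::{Batch, RowTime};

/// Hypothetical helper: an empty batch that still advances the stream's
/// completion watermark to the given nanosecond timestamp.
fn empty_up_to(nanos: i64) -> Batch {
    Batch::new_empty(RowTime::from_timestamp_ns(nanos))
}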