diff --git a/Cargo.lock b/Cargo.lock index 79033deaa..33e030572 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5031,6 +5031,7 @@ dependencies = [ "itertools 0.11.0", "parking_lot 0.12.1", "sparrow-arrow", + "sparrow-batch", "sparrow-expressions", "sparrow-physical", "sparrow-scheduler", diff --git a/crates/sparrow-arrow/src/lib.rs b/crates/sparrow-arrow/src/lib.rs index f39d56cff..932cc1afe 100644 --- a/crates/sparrow-arrow/src/lib.rs +++ b/crates/sparrow-arrow/src/lib.rs @@ -16,8 +16,6 @@ pub mod hash; pub mod hasher; pub mod scalar_value; pub mod serde; -#[cfg(any(test, feature = "testing"))] -pub mod testing; pub mod utils; pub use concat_take::*; diff --git a/crates/sparrow-batch/src/batch.rs b/crates/sparrow-batch/src/batch.rs index 86a48b30d..419baa2ae 100644 --- a/crates/sparrow-batch/src/batch.rs +++ b/crates/sparrow-batch/src/batch.rs @@ -1,6 +1,5 @@ use std::sync::Arc; -use arrow::ipc::Timestamp; use arrow_array::cast::AsArray; use arrow_array::types::{TimestampNanosecondType, UInt64Type}; use arrow_array::{ @@ -8,7 +7,7 @@ use arrow_array::{ UInt64Array, }; use arrow_schema::{Field, Fields, Schema}; -use error_stack::{IntoReport, IntoReportCompat, ResultExt}; +use error_stack::{IntoReport, ResultExt}; use itertools::Itertools; use crate::{Error, RowTime}; @@ -335,9 +334,6 @@ impl Batch { key_hash: impl Into, up_to_time: i64, ) -> Self { - use arrow_array::StructArray; - use std::sync::Arc; - let time: TimestampNanosecondArray = time.into(); let subsort: UInt64Array = (0..(time.len() as u64)).collect_vec().into(); let key_hash: UInt64Array = key_hash.into(); @@ -597,10 +593,10 @@ static MINIMAL_SCHEMA: arrow_schema::SchemaRef = { #[cfg(test)] mod tests { + use crate::testing::arb_arrays::arb_batch; use crate::{Batch, RowTime}; use itertools::Itertools; use proptest::prelude::*; - use sparrow_arrow::testing::arb_arrays::arb_batch; #[test] fn test_split_batch() { diff --git a/crates/sparrow-batch/src/lib.rs b/crates/sparrow-batch/src/lib.rs index d6d374423..a5556c876 100644 --- a/crates/sparrow-batch/src/lib.rs +++ b/crates/sparrow-batch/src/lib.rs @@ -14,3 +14,6 @@ mod row_time; pub use batch::*; pub use error::Error; pub use row_time::*; + +#[cfg(any(test, feature = "testing"))] +pub mod testing; diff --git a/crates/sparrow-arrow/src/testing.rs b/crates/sparrow-batch/src/testing.rs similarity index 100% rename from crates/sparrow-arrow/src/testing.rs rename to crates/sparrow-batch/src/testing.rs diff --git a/crates/sparrow-arrow/src/testing/arb_arrays.rs b/crates/sparrow-batch/src/testing/arb_arrays.rs similarity index 100% rename from crates/sparrow-arrow/src/testing/arb_arrays.rs rename to crates/sparrow-batch/src/testing/arb_arrays.rs diff --git a/crates/sparrow-execution/src/lib.rs b/crates/sparrow-execution/src/lib.rs index c03959e17..8ed31d0b8 100644 --- a/crates/sparrow-execution/src/lib.rs +++ b/crates/sparrow-execution/src/lib.rs @@ -20,7 +20,7 @@ mod tests { use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef, TimeUnit}; use error_stack::{IntoReport, ResultExt}; use futures::stream::BoxStream; - use futures::{StreamExt, TryStreamExt}; + use futures::TryStreamExt; use index_vec::index_vec; use parking_lot::Mutex; use sparrow_arrow::scalar_value::ScalarValue; @@ -117,7 +117,7 @@ mod tests { pub async fn execute( query_id: String, input_type: DataType, - input: BoxStream<'_, error_stack::Result>, + mut input: BoxStream<'_, error_stack::Result>, output_type: DataType, output: tokio::sync::mpsc::Sender, ) -> error_stack::Result<(), Error> { @@ -199,7 +199,7 @@ mod tests { let transform_pipeline_input = PipelineInput::new(transform_pipeline, 0); let mut injector = worker_pool.injector().clone(); - while let Some(batch) = input.try_next().await { + while let Some(batch) = input.try_next().await.unwrap() { transform_pipeline_input .add_input(0.into(), batch, &mut injector) .change_context(Error::Executing)?; diff --git a/crates/sparrow-sources/src/in_memory.rs b/crates/sparrow-sources/src/in_memory.rs index 4c3e7dde9..7edff7760 100644 --- a/crates/sparrow-sources/src/in_memory.rs +++ b/crates/sparrow-sources/src/in_memory.rs @@ -57,14 +57,12 @@ impl Source for InMemory { }) .and_then(|batch| async move { Batch::try_new_from_batch(batch) - .into_report() .change_context(sparrow_interfaces::Error::internal_msg("invalid input")) }) .boxed() } else if let Some(batch) = self.data.current() { futures::stream::once(async move { Batch::try_new_from_batch(batch) - .into_report() .change_context(sparrow_interfaces::Error::internal_msg("invalid input")) }) .boxed() diff --git a/crates/sparrow-transforms/Cargo.toml b/crates/sparrow-transforms/Cargo.toml index db5d87d84..3b4913553 100644 --- a/crates/sparrow-transforms/Cargo.toml +++ b/crates/sparrow-transforms/Cargo.toml @@ -18,6 +18,7 @@ error-stack.workspace = true itertools.workspace = true parking_lot.workspace = true sparrow-arrow = { path = "../sparrow-arrow" } +sparrow-batch = { path = "../sparrow-batch" } sparrow-expressions = { path = "../sparrow-expressions" } sparrow-physical = { path = "../sparrow-physical" } sparrow-scheduler = { path = "../sparrow-scheduler" } diff --git a/crates/sparrow-transforms/src/project.rs b/crates/sparrow-transforms/src/project.rs index b46257a54..e1a358cad 100644 --- a/crates/sparrow-transforms/src/project.rs +++ b/crates/sparrow-transforms/src/project.rs @@ -1,6 +1,6 @@ use arrow_schema::DataType; use error_stack::ResultExt; -use sparrow_arrow::Batch; +use sparrow_batch::Batch; use sparrow_expressions::ExpressionExecutor; use sparrow_physical::Exprs; @@ -50,7 +50,8 @@ mod tests { use arrow_array::{Int64Array, StructArray, TimestampNanosecondArray, UInt64Array}; use arrow_schema::{Field, Fields}; use index_vec::index_vec; - use sparrow_arrow::{scalar_value::ScalarValue, RowTime}; + use sparrow_arrow::scalar_value::ScalarValue; + use sparrow_batch::RowTime; fn test_batch() -> Batch { let input_fields = Fields::from(vec![ diff --git a/crates/sparrow-transforms/src/select.rs b/crates/sparrow-transforms/src/select.rs index 32bb4ffc1..59f4c4c2f 100644 --- a/crates/sparrow-transforms/src/select.rs +++ b/crates/sparrow-transforms/src/select.rs @@ -1,7 +1,7 @@ use arrow_array::{cast::AsArray, BooleanArray}; use arrow_schema::DataType; use error_stack::{IntoReport, ResultExt}; -use sparrow_arrow::Batch; +use sparrow_batch::Batch; use sparrow_expressions::ExpressionExecutor; use sparrow_physical::Exprs; @@ -99,7 +99,8 @@ mod tests { use arrow_array::{Int64Array, StructArray, TimestampNanosecondArray, UInt64Array}; use arrow_schema::{Field, Fields}; use index_vec::index_vec; - use sparrow_arrow::{scalar_value::ScalarValue, RowTime}; + use sparrow_arrow::scalar_value::ScalarValue; + use sparrow_batch::RowTime; fn test_batch() -> Batch { let input_fields = Fields::from(vec![ diff --git a/crates/sparrow-transforms/src/transform.rs b/crates/sparrow-transforms/src/transform.rs index 18e91cc81..ecea88dff 100644 --- a/crates/sparrow-transforms/src/transform.rs +++ b/crates/sparrow-transforms/src/transform.rs @@ -1,5 +1,5 @@ use arrow_schema::DataType; -use sparrow_arrow::Batch; +use sparrow_batch::Batch; #[derive(derive_more::Display, Debug)] pub enum Error { diff --git a/crates/sparrow-transforms/src/transform_pipeline.rs b/crates/sparrow-transforms/src/transform_pipeline.rs index 1687ed64c..7332fbf60 100644 --- a/crates/sparrow-transforms/src/transform_pipeline.rs +++ b/crates/sparrow-transforms/src/transform_pipeline.rs @@ -4,7 +4,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use error_stack::ResultExt; use itertools::Itertools; use parking_lot::Mutex; -use sparrow_arrow::Batch; +use sparrow_batch::Batch; use sparrow_physical::{StepId, StepKind}; use sparrow_scheduler::{ Partition, Partitioned, Pipeline, PipelineError, PipelineInput, Scheduler, TaskRef,