From 13be46f875306ad6a9babc00e5a061cb78eb6e86 Mon Sep 17 00:00:00 2001 From: jordanrfrazier <122494242+jordanrfrazier@users.noreply.github.com> Date: Wed, 11 Oct 2023 20:42:08 -0700 Subject: [PATCH] feat: add source trait (#803) Adds the `source` trait for sources to implement. The idea being they'll be created as async functions and we can just call next on the returned `stream` -- no need to make them a whole `ReadPipeline` or anything we think. * Adds the `sparrow-sources` crate * Adds the `sparrow-interfaces` crate * Refactors `batch` into `sparrow-batch`. --------- Co-authored-by: Ben Chambers <35960+bjchambers@users.noreply.github.com> --- Cargo.lock | 57 +++ crates/sparrow-arrow/src/lib.rs | 6 - crates/sparrow-batch/Cargo.toml | 30 ++ .../src/batch.rs | 211 +++++++- crates/sparrow-batch/src/error.rs | 17 + crates/sparrow-batch/src/lib.rs | 19 + .../src/row_time.rs | 6 + .../src/testing.rs | 0 .../src/testing/arb_arrays.rs | 0 crates/sparrow-compiler/Cargo.toml | 1 + crates/sparrow-compiler/src/data_context.rs | 2 +- crates/sparrow-execution/Cargo.toml | 5 + crates/sparrow-execution/src/lib.rs | 2 - crates/sparrow-execution/src/tests.rs | 470 +++++++++--------- crates/sparrow-expressions/Cargo.toml | 1 + crates/sparrow-expressions/src/executor.rs | 2 +- crates/sparrow-expressions/src/work_area.rs | 2 +- crates/sparrow-interfaces/Cargo.toml | 24 + crates/sparrow-interfaces/src/lib.rs | 14 + crates/sparrow-interfaces/src/source.rs | 45 ++ crates/sparrow-interfaces/src/source_error.rs | 22 + crates/sparrow-merge/src/lib.rs | 3 - .../sparrow-runtime/src/key_hash_inverse.rs | 2 +- crates/sparrow-scheduler/Cargo.toml | 1 + crates/sparrow-scheduler/src/pipeline.rs | 2 +- crates/sparrow-scheduler/src/sink.rs | 2 +- crates/sparrow-session/Cargo.toml | 1 + crates/sparrow-session/src/table.rs | 2 +- crates/sparrow-sources/Cargo.toml | 30 ++ .../src/in_memory.rs} | 99 +++- crates/sparrow-sources/src/lib.rs | 12 + crates/sparrow-transforms/Cargo.toml | 1 + crates/sparrow-transforms/src/project.rs | 5 +- crates/sparrow-transforms/src/select.rs | 5 +- crates/sparrow-transforms/src/transform.rs | 2 +- .../src/transform_pipeline.rs | 2 +- 36 files changed, 812 insertions(+), 293 deletions(-) create mode 100644 crates/sparrow-batch/Cargo.toml rename crates/{sparrow-arrow => sparrow-batch}/src/batch.rs (73%) create mode 100644 crates/sparrow-batch/src/error.rs create mode 100644 crates/sparrow-batch/src/lib.rs rename crates/{sparrow-arrow => sparrow-batch}/src/row_time.rs (90%) rename crates/{sparrow-arrow => sparrow-batch}/src/testing.rs (100%) rename crates/{sparrow-arrow => sparrow-batch}/src/testing/arb_arrays.rs (100%) create mode 100644 crates/sparrow-interfaces/Cargo.toml create mode 100644 crates/sparrow-interfaces/src/lib.rs create mode 100644 crates/sparrow-interfaces/src/source.rs create mode 100644 crates/sparrow-interfaces/src/source_error.rs create mode 100644 crates/sparrow-sources/Cargo.toml rename crates/{sparrow-merge/src/in_memory_batches.rs => sparrow-sources/src/in_memory.rs} (59%) create mode 100644 crates/sparrow-sources/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index b44e0b50b..9bc0cfa18 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4279,6 +4279,23 @@ dependencies = [ "uuid 1.4.1", ] +[[package]] +name = "sparrow-batch" +version = "0.11.0" +dependencies = [ + "arrow", + "arrow-array", + "arrow-schema", + "arrow-select", + "derive_more", + "error-stack", + "itertools 0.11.0", + "proptest", + "sparrow-merge", + "sparrow-testing", + "static_init", +] + [[package]] name = 
"sparrow-catalog" version = "0.11.0" @@ -4342,6 +4359,7 @@ dependencies = [ "sparrow-core", "sparrow-instructions", "sparrow-merge", + "sparrow-sources", "sparrow-syntax", "static_init", "strum 0.25.0", @@ -4374,11 +4392,16 @@ dependencies = [ "arrow-schema", "derive_more", "error-stack", + "futures", "index_vec", "parking_lot 0.12.1", "sparrow-arrow", + "sparrow-batch", + "sparrow-core", + "sparrow-interfaces", "sparrow-physical", "sparrow-scheduler", + "sparrow-sources", "sparrow-testing", "sparrow-transforms", "tokio", @@ -4408,6 +4431,7 @@ dependencies = [ "num", "serde_json", "sparrow-arrow", + "sparrow-batch", "sparrow-physical", "static_init", "substring", @@ -4453,6 +4477,18 @@ dependencies = [ "uuid 1.4.1", ] +[[package]] +name = "sparrow-interfaces" +version = "0.11.0" +dependencies = [ + "arrow-schema", + "derive_more", + "error-stack", + "futures", + "sparrow-batch", + "sparrow-core", +] + [[package]] name = "sparrow-kernels" version = "0.11.0" @@ -4704,6 +4740,7 @@ dependencies = [ "loom", "serde", "sparrow-arrow", + "sparrow-batch", "tracing", "work-queue", ] @@ -4725,6 +4762,7 @@ dependencies = [ "sparrow-instructions", "sparrow-merge", "sparrow-runtime", + "sparrow-sources", "sparrow-syntax", "static_init", "tempfile", @@ -4733,6 +4771,24 @@ dependencies = [ "uuid 1.4.1", ] +[[package]] +name = "sparrow-sources" +version = "0.11.0" +dependencies = [ + "arrow-array", + "arrow-schema", + "async-broadcast", + "async-stream", + "derive_more", + "error-stack", + "futures", + "sparrow-batch", + "sparrow-core", + "sparrow-interfaces", + "sparrow-merge", + "tracing", +] + [[package]] name = "sparrow-syntax" version = "0.11.0" @@ -4788,6 +4844,7 @@ dependencies = [ "itertools 0.11.0", "parking_lot 0.12.1", "sparrow-arrow", + "sparrow-batch", "sparrow-expressions", "sparrow-physical", "sparrow-scheduler", diff --git a/crates/sparrow-arrow/src/lib.rs b/crates/sparrow-arrow/src/lib.rs index 65d2f40b4..932cc1afe 100644 --- a/crates/sparrow-arrow/src/lib.rs +++ b/crates/sparrow-arrow/src/lib.rs @@ -10,18 +10,12 @@ pub mod attachments; #[cfg(feature = "avro")] pub mod avro; -mod batch; mod concat_take; pub mod downcast; pub mod hash; pub mod hasher; -mod row_time; pub mod scalar_value; pub mod serde; -#[cfg(any(test, feature = "testing"))] -pub mod testing; pub mod utils; -pub use batch::*; pub use concat_take::*; -pub use row_time::*; diff --git a/crates/sparrow-batch/Cargo.toml b/crates/sparrow-batch/Cargo.toml new file mode 100644 index 000000000..b7350929d --- /dev/null +++ b/crates/sparrow-batch/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "sparrow-batch" +version.workspace = true +authors.workspace = true +edition.workspace = true +license.workspace = true +publish = false +description = """ +Defines the main struct for wrapping RecordBatches in execution. 
+""" + +[dependencies] +arrow.workspace = true +arrow-array.workspace = true +arrow-schema.workspace = true +arrow-select.workspace = true +derive_more.workspace = true +error-stack.workspace = true +itertools.workspace = true +static_init.workspace = true +proptest = { workspace = true, optional = true } + +[dev-dependencies] +sparrow-testing = { path = "../sparrow-testing" } +sparrow-merge = { path = "../sparrow-merge" } +proptest.workspace = true + +[lib] +bench = false +doctest = false diff --git a/crates/sparrow-arrow/src/batch.rs b/crates/sparrow-batch/src/batch.rs similarity index 73% rename from crates/sparrow-arrow/src/batch.rs rename to crates/sparrow-batch/src/batch.rs index 245da081c..8fc8dbdba 100644 --- a/crates/sparrow-arrow/src/batch.rs +++ b/crates/sparrow-batch/src/batch.rs @@ -1,17 +1,23 @@ +use std::sync::Arc; + use arrow_array::cast::AsArray; use arrow_array::types::{TimestampNanosecondType, UInt64Type}; -use arrow_array::{Array, ArrayRef, ArrowPrimitiveType, TimestampNanosecondArray, UInt64Array}; +use arrow_array::{ + Array, ArrayRef, ArrowPrimitiveType, RecordBatch, StructArray, TimestampNanosecondArray, + UInt64Array, +}; +use arrow_schema::{Fields, Schema}; use error_stack::{IntoReport, ResultExt}; use itertools::Itertools; -use crate::downcast::downcast_primitive_array; -use crate::RowTime; +use crate::{Error, RowTime}; /// A batch to be processed by the system. #[derive(Clone, PartialEq, Debug)] pub struct Batch { /// The data associated with the batch. pub(crate) data: Option, + /// An indication that the batch stream has completed up to the given time. /// Any rows in future batches on this stream must have a time strictly /// greater than this. @@ -26,6 +32,51 @@ impl Batch { } } + /// Construct a new batch, inferring the upper bound from the data. + /// + /// It is expected that that data is sorted. + pub fn try_new_from_batch(data: RecordBatch) -> error_stack::Result { + error_stack::ensure!( + data.num_rows() > 0, + Error::internal_msg("Unable to create batch from empty data".to_owned()) + ); + + let time_column: &TimestampNanosecondArray = data.column(0).as_primitive(); + let up_to_time = RowTime::from_timestamp_ns(time_column.value(time_column.len() - 1)); + + #[cfg(debug_assertions)] + validate(&data, up_to_time)?; + + let time: &TimestampNanosecondArray = data.column(0).as_primitive(); + let min_present_time = time.value(0); + let max_present_time = time.value(time.len() - 1); + + let time = data.column(0).clone(); + let subsort = data.column(1).clone(); + let key_hash = data.column(2).clone(); + + // TODO: This creates a `Fields` from the schema, dropping the key columns. + // Under the hood, this creates a new vec of fields. It would be better to + // do this outside of this try_new, then share that `Fields`. 
+ let schema = data.schema(); + let fields: Fields = schema.fields()[3..].into(); + let columns: Vec<ArrayRef> = data.columns()[3..].to_vec(); + + let data = Arc::new(StructArray::new(fields, columns, None)); + + Ok(Self { + data: Some(BatchInfo { + data, + time, + subsort, + key_hash, + min_present_time: min_present_time.into(), + max_present_time: max_present_time.into(), + }), + up_to_time, + }) + } + pub fn is_empty(&self) -> bool { self.data.is_none() } @@ -173,7 +224,7 @@ impl Batch { .collect(); let time = arrow_select::concat::concat(&times) .into_report() - .change_context(Error::Internal)?; + .change_context(Error::internal())?; let subsorts: Vec<_> = batches .iter() @@ -182,7 +233,7 @@ impl Batch { .collect(); let subsort = arrow_select::concat::concat(&subsorts) .into_report() - .change_context(Error::Internal)?; + .change_context(Error::internal())?; let key_hashes: Vec<_> = batches .iter() @@ -191,7 +242,7 @@ impl Batch { .collect(); let key_hash = arrow_select::concat::concat(&key_hashes) .into_report() - .change_context(Error::Internal)?; + .change_context(Error::internal())?; let batches: Vec<_> = batches .iter() @@ -200,7 +251,7 @@ impl Batch { .collect(); let data = arrow_select::concat::concat(&batches) .into_report() - .change_context(Error::Internal)?; + .change_context(Error::internal())?; Ok(Self { data: Some(BatchInfo { @@ -220,16 +271,16 @@ impl Batch { Some(info) => { let data = arrow_select::take::take(info.data.as_ref(), indices, None) .into_report() - .change_context(Error::Internal)?; + .change_context(Error::internal())?; let time = arrow_select::take::take(info.time.as_ref(), indices, None) .into_report() - .change_context(Error::Internal)?; + .change_context(Error::internal())?; let subsort = arrow_select::take::take(info.subsort.as_ref(), indices, None) .into_report() - .change_context(Error::Internal)?; + .change_context(Error::internal())?; let key_hash = arrow_select::take::take(info.key_hash.as_ref(), indices, None) .into_report() - .change_context(Error::Internal)?; + .change_context(Error::internal())?; let info = BatchInfo { data, time, @@ -283,10 +334,6 @@ impl Batch { key_hash: impl Into<UInt64Array>, up_to_time: i64, ) -> Self { - use std::sync::Arc; - - use arrow_array::StructArray; - let time: TimestampNanosecondArray = time.into(); let subsort: UInt64Array = (0..(time.len() as u64)).collect_vec().into(); let key_hash: UInt64Array = key_hash.into(); @@ -311,6 +358,126 @@ impl Batch { } } +#[cfg(debug_assertions)] +fn validate(data: &RecordBatch, up_to_time: RowTime) -> error_stack::Result<(), Error> { + validate_batch_schema(data.schema().as_ref())?; + + for key_column in 0..3 { + error_stack::ensure!( + data.column(key_column).null_count() == 0, + Error::internal_msg(format!( + "Key column '{}' should not contain null", + data.schema().field(key_column).name() + )) + ); + } + + validate_bounds(data.column(0), data.column(1), data.column(2), up_to_time) } + +#[cfg(debug_assertions)] +/// Validate that the result is sorted. +/// +/// Note: This only validates the up_to_time bound. +pub(crate) fn validate_bounds( + time: &ArrayRef, + subsort: &ArrayRef, + key_hash: &ArrayRef, + up_to_time: RowTime, +) -> error_stack::Result<(), Error> { + if time.len() == 0 { + // No more validation necessary for empty batches.
+ return Ok(()); + } + + let time: &TimestampNanosecondArray = time.as_primitive(); + let subsort: &UInt64Array = subsort.as_primitive(); + let key_hash: &UInt64Array = key_hash.as_primitive(); + + let mut prev_time = time.value(0); + let mut prev_subsort = subsort.value(0); + let mut prev_key_hash = key_hash.value(0); + + for i in 1..time.len() { + let curr_time = time.value(i); + let curr_subsort = subsort.value(i); + let curr_key_hash = key_hash.value(i); + + let order = prev_time + .cmp(&curr_time) + .then(prev_subsort.cmp(&curr_subsort)) + .then(prev_key_hash.cmp(&curr_key_hash)); + + error_stack::ensure!( + order.is_lt(), + Error::internal_msg(format!( + "Expected data[i - 1] < data[i], but ({}, {}, {}) >= ({}, {}, {})", + prev_time, prev_subsort, prev_key_hash, curr_time, curr_subsort, curr_key_hash + )) + ); + + prev_time = curr_time; + prev_subsort = curr_subsort; + prev_key_hash = curr_key_hash; + } + + let curr_time: i64 = up_to_time.into(); + let order = prev_time.cmp(&curr_time); + error_stack::ensure!( + order.is_le(), + Error::internal_msg(format!( + "Expected last data <= upper bound, but ({}) > ({})", + prev_time, curr_time + )) + ); + + Ok(()) +} + +fn validate_batch_schema(schema: &Schema) -> error_stack::Result<(), Error> { + use arrow::datatypes::{DataType, TimeUnit}; + + // Validate the three key columns are present, non-null and have the right type. + validate_key_column( + schema, + 0, + "_time", + &DataType::Timestamp(TimeUnit::Nanosecond, None), + )?; + validate_key_column(schema, 1, "_subsort", &DataType::UInt64)?; + validate_key_column(schema, 2, "_key_hash", &DataType::UInt64)?; + + Ok(()) +} + +fn validate_key_column( + schema: &Schema, + index: usize, + expected_name: &str, + expected_type: &arrow::datatypes::DataType, +) -> error_stack::Result<(), Error> { + error_stack::ensure!( + schema.field(index).name() == expected_name, + Error::internal_msg(format!( + "Expected column {} to be '{}' but was '{}'", + index, + expected_name, + schema.field(index).name() + )) + ); + error_stack::ensure!( + schema.field(index).data_type() == expected_type, + Error::internal_msg(format!( + "Key column '{}' should be '{:?}' but was '{:?}'", + expected_name, + schema.field(index).data_type(), + expected_type, + )) + ); + + Ok(()) +} + #[derive(Clone, Debug)] pub(crate) struct BatchInfo { pub(crate) data: ArrayRef, @@ -338,8 +505,7 @@ impl BatchInfo { debug_assert!(data.len() > 0); debug_assert_eq!(time.null_count(), 0); - let time_column: &TimestampNanosecondArray = - downcast_primitive_array(time.as_ref()).expect("time column to be timestamp"); + let time_column: &TimestampNanosecondArray = time.as_primitive(); // Debug assertion that the array is sorted by time. // Once `is_sorted` is stable (https://github.com/rust-lang/rust/issues/53485). @@ -366,8 +532,7 @@ impl BatchInfo { /// Returns the rows less than or equal to the given time and /// leaves the remaining rows in `self`. 
fn split_up_to(&mut self, time_inclusive: RowTime) -> Self { - let time_column: &TimestampNanosecondArray = - downcast_primitive_array(self.time.as_ref()).expect("time column to be timestamp"); + let time_column: &TimestampNanosecondArray = self.time.as_primitive(); // 0..slice_start (len = slice_start) = what we return // slice_start..len (len = len - slice_start) = what we leave in self @@ -409,14 +574,6 @@ impl BatchInfo { } } -#[derive(Debug, derive_more::Display)] -pub enum Error { - #[display(fmt = "internal error")] - Internal, -} - -impl error_stack::Context for Error {} - #[cfg(any(test, feature = "testing"))] #[static_init::dynamic] static MINIMAL_SCHEMA: arrow_schema::SchemaRef = { diff --git a/crates/sparrow-batch/src/error.rs b/crates/sparrow-batch/src/error.rs new file mode 100644 index 000000000..b4e5fc7a4 --- /dev/null +++ b/crates/sparrow-batch/src/error.rs @@ -0,0 +1,17 @@ +#[derive(derive_more::Display, Debug)] +pub enum Error { + #[display(fmt = "internal error: {}", _0)] + Internal(String), +} + +impl error_stack::Context for Error {} + +impl Error { + pub fn internal() -> Self { + Error::Internal("no additional context".to_owned()) + } + + pub fn internal_msg(msg: String) -> Self { + Error::Internal(msg) + } +} diff --git a/crates/sparrow-batch/src/lib.rs b/crates/sparrow-batch/src/lib.rs new file mode 100644 index 000000000..a5556c876 --- /dev/null +++ b/crates/sparrow-batch/src/lib.rs @@ -0,0 +1,19 @@ +#![warn( + rust_2018_idioms, + nonstandard_style, + future_incompatible, + clippy::mod_module_files, + clippy::print_stdout, + clippy::print_stderr +)] + +mod batch; +mod error; +mod row_time; + +pub use batch::*; +pub use error::Error; +pub use row_time::*; + +#[cfg(any(test, feature = "testing"))] +pub mod testing; diff --git a/crates/sparrow-arrow/src/row_time.rs b/crates/sparrow-batch/src/row_time.rs similarity index 90% rename from crates/sparrow-arrow/src/row_time.rs rename to crates/sparrow-batch/src/row_time.rs index 687e80a86..59c33ef73 100644 --- a/crates/sparrow-arrow/src/row_time.rs +++ b/crates/sparrow-batch/src/row_time.rs @@ -33,3 +33,9 @@ impl From for i64 { val.0 } } + +impl From for RowTime { + fn from(value: i64) -> Self { + RowTime(value) + } +} diff --git a/crates/sparrow-arrow/src/testing.rs b/crates/sparrow-batch/src/testing.rs similarity index 100% rename from crates/sparrow-arrow/src/testing.rs rename to crates/sparrow-batch/src/testing.rs diff --git a/crates/sparrow-arrow/src/testing/arb_arrays.rs b/crates/sparrow-batch/src/testing/arb_arrays.rs similarity index 100% rename from crates/sparrow-arrow/src/testing/arb_arrays.rs rename to crates/sparrow-batch/src/testing/arb_arrays.rs diff --git a/crates/sparrow-compiler/Cargo.toml b/crates/sparrow-compiler/Cargo.toml index 00787a6c4..5bfbd76b1 100644 --- a/crates/sparrow-compiler/Cargo.toml +++ b/crates/sparrow-compiler/Cargo.toml @@ -33,6 +33,7 @@ sparrow-arrow = { path = "../sparrow-arrow" } sparrow-core = { path = "../sparrow-core" } sparrow-instructions = { path = "../sparrow-instructions" } sparrow-merge = { path = "../sparrow-merge" } +sparrow-sources = { path = "../sparrow-sources" } sparrow-syntax = { path = "../sparrow-syntax" } static_init.workspace = true strum.workspace = true diff --git a/crates/sparrow-compiler/src/data_context.rs b/crates/sparrow-compiler/src/data_context.rs index afe62ff83..3d6948996 100644 --- a/crates/sparrow-compiler/src/data_context.rs +++ b/crates/sparrow-compiler/src/data_context.rs @@ -7,7 +7,7 @@ use sparrow_api::kaskada::v1alpha::slice_plan::Slice; use 
sparrow_api::kaskada::v1alpha::{self, compute_table, ComputeTable, PreparedFile, TableConfig}; use sparrow_core::context_code; use sparrow_instructions::{GroupId, TableId}; -use sparrow_merge::InMemoryBatches; +use sparrow_sources::InMemoryBatches; use sparrow_syntax::Location; use uuid::Uuid; diff --git a/crates/sparrow-execution/Cargo.toml b/crates/sparrow-execution/Cargo.toml index 09a673df3..3911cf2ac 100644 --- a/crates/sparrow-execution/Cargo.toml +++ b/crates/sparrow-execution/Cargo.toml @@ -13,10 +13,15 @@ Implementations of the pipelines to be executed. derive_more.workspace = true error-stack.workspace = true parking_lot.workspace = true +futures.workspace = true sparrow-arrow = { path = "../sparrow-arrow" } +sparrow-batch = { path = "../sparrow-batch" } +sparrow-core = { path = "../sparrow-core" } +sparrow-interfaces = { path = "../sparrow-interfaces" } sparrow-physical = { path = "../sparrow-physical" } sparrow-transforms = { path = "../sparrow-transforms" } sparrow-scheduler = { path = "../sparrow-scheduler" } +sparrow-sources = { path = "../sparrow-sources" } tokio.workspace = true uuid.workspace = true diff --git a/crates/sparrow-execution/src/lib.rs b/crates/sparrow-execution/src/lib.rs index b325d454d..1b1c435fb 100644 --- a/crates/sparrow-execution/src/lib.rs +++ b/crates/sparrow-execution/src/lib.rs @@ -9,6 +9,4 @@ )] //! Implementations of the pipelines to be executed. - -#[cfg(test)] mod tests; diff --git a/crates/sparrow-execution/src/tests.rs b/crates/sparrow-execution/src/tests.rs index 5a50d482c..cb239bc66 100644 --- a/crates/sparrow-execution/src/tests.rs +++ b/crates/sparrow-execution/src/tests.rs @@ -1,241 +1,263 @@ -use std::sync::Arc; - -use arrow_array::cast::AsArray; -use arrow_array::{Int64Array, StructArray, TimestampNanosecondArray, UInt64Array}; -use arrow_schema::{DataType, Field, Fields}; -use error_stack::{IntoReport, ResultExt}; -use index_vec::index_vec; -use parking_lot::Mutex; -use sparrow_arrow::scalar_value::ScalarValue; -use sparrow_arrow::{Batch, RowTime}; -use sparrow_scheduler::{Pipeline, PipelineError, PipelineInput, WorkerPool}; -use sparrow_transforms::TransformPipeline; - -#[derive(derive_more::Display, Debug)] -pub enum Error { - #[display(fmt = "error creating executor")] - Creating, - #[display(fmt = "error executing")] - Executing, -} +#[cfg(test)] +mod tests { -impl error_stack::Context for Error {} - -#[tokio::test] -#[ignore] -async fn test_query() { - sparrow_testing::init_test_logging(); - - let (input_tx, input_rx) = tokio::sync::mpsc::channel(10); - let (output_tx, mut output_rx) = tokio::sync::mpsc::channel(10); - - let input_fields = Fields::from(vec![ - Field::new("a", DataType::Int64, true), - Field::new("b", DataType::Int64, true), - Field::new("c", DataType::Int64, true), - ]); - - let output_fields = Fields::from(vec![ - Field::new("ab", DataType::Int64, true), - Field::new("abc", DataType::Int64, true), - ]); - - let input_data = Arc::new(StructArray::new( - input_fields.clone(), - vec![ - Arc::new(Int64Array::from(vec![0, 1, 2, 3])), - Arc::new(Int64Array::from(vec![4, 7, 10, 11])), - Arc::new(Int64Array::from(vec![Some(21), None, Some(387), Some(87)])), - ], - None, - )); - let time = Arc::new(TimestampNanosecondArray::from(vec![0, 1, 2, 3])); - let subsort = Arc::new(UInt64Array::from(vec![0, 1, 2, 3])); - let key_hash = Arc::new(UInt64Array::from(vec![0, 1, 2, 3])); - let input_batch = Batch::new_with_data( - input_data, - time, - subsort, - key_hash, - RowTime::from_timestamp_ns(3), - ); - 
input_tx.send(input_batch).await.unwrap(); - std::mem::drop(input_tx); - - execute( - "hello".to_owned(), - DataType::Struct(input_fields), - input_rx, - DataType::Struct(output_fields), - output_tx, - ) - .await - .unwrap(); - - let output = output_rx.recv().await.unwrap(); - let output = output.data().unwrap().as_struct(); - let ab = output.column_by_name("ab").unwrap(); - let abc = output.column_by_name("abc").unwrap(); - assert_eq!(ab.as_primitive(), &Int64Array::from(vec![4, 8, 12, 14])); - assert_eq!( - abc.as_primitive(), - &Int64Array::from(vec![Some(25), None, Some(399), Some(101)]) - ); -} + use std::sync::Arc; -/// Execute a physical plan. -pub async fn execute( - query_id: String, - input_type: DataType, - mut input: tokio::sync::mpsc::Receiver, - output_type: DataType, - output: tokio::sync::mpsc::Sender, -) -> error_stack::Result<(), Error> { - let mut worker_pool = WorkerPool::start(query_id).change_context(Error::Creating)?; - - // This sets up some fake stuff: - // - We don't have sources / sinks yet, so we use tokio channels. - // - We create a "hypothetical" scan step (0) - // - We create a hard-coded "project" step (1) - // - We output the results to the channel. - - let table_id = uuid::uuid!("67e55044-10b1-426f-9247-bb680e5fe0c8"); - - let scan = sparrow_physical::Step { - id: 0.into(), - kind: sparrow_physical::StepKind::Read { - source_id: table_id, - }, - inputs: vec![], - result_type: input_type, - exprs: sparrow_physical::Exprs::new(), - }; - - let project = sparrow_physical::Step { - id: 1.into(), - kind: sparrow_physical::StepKind::Project, - inputs: vec![0.into()], - result_type: output_type.clone(), - exprs: index_vec![ - sparrow_physical::Expr { - name: "column".into(), - literal_args: vec![ScalarValue::Utf8(Some("a".to_owned()))], - args: vec![], - result_type: DataType::Int64 - }, - sparrow_physical::Expr { - name: "column".into(), - literal_args: vec![ScalarValue::Utf8(Some("b".to_owned()))], - args: vec![], - result_type: DataType::Int64 - }, - sparrow_physical::Expr { - name: "add".into(), - literal_args: vec![], - args: vec![0.into(), 1.into()], - result_type: DataType::Int64 - }, - sparrow_physical::Expr { - name: "column".into(), - literal_args: vec![ScalarValue::Utf8(Some("c".to_owned()))], - args: vec![], - result_type: DataType::Int64 - }, - sparrow_physical::Expr { - name: "add".into(), - literal_args: vec![], - args: vec![2.into(), 3.into()], - result_type: DataType::Int64 - }, - sparrow_physical::Expr { - name: "record".into(), - literal_args: vec![], - args: vec![2.into(), 4.into()], - result_type: output_type - } - ], - }; - - let sink_pipeline = worker_pool.add_pipeline(1, WriteChannelPipeline::new(output)); - let transform_pipeline = worker_pool.add_pipeline( - 1, - TransformPipeline::try_new( - &scan, - [project].iter(), - PipelineInput::new(sink_pipeline, 0), - ) - .change_context(Error::Creating)?, - ); - let transform_pipeline_input = PipelineInput::new(transform_pipeline, 0); + use arrow_array::cast::AsArray; + use arrow_array::{ArrayRef, Int64Array, RecordBatch, TimestampNanosecondArray, UInt64Array}; + use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef, TimeUnit}; + use error_stack::{IntoReport, ResultExt}; + use futures::stream::BoxStream; + use futures::TryStreamExt; + use index_vec::index_vec; + use parking_lot::Mutex; + use sparrow_arrow::scalar_value::ScalarValue; + use sparrow_batch::Batch; + use sparrow_interfaces::ReadConfig; + use sparrow_interfaces::Source; + use sparrow_scheduler::{Pipeline, PipelineError, 
PipelineInput, WorkerPool}; + use sparrow_transforms::TransformPipeline; - let mut injector = worker_pool.injector().clone(); - while let Some(batch) = input.recv().await { - transform_pipeline_input - .add_input(0.into(), batch, &mut injector) - .change_context(Error::Executing)?; + #[derive(derive_more::Display, Debug)] + pub enum Error { + #[display(fmt = "error creating executor")] + Creating, + #[display(fmt = "error executing")] + Executing, } - transform_pipeline_input - .close_input(0.into(), &mut injector) - .change_context(Error::Executing)?; - worker_pool.stop().change_context(Error::Executing)?; - Ok(()) -} - -#[derive(Debug)] -struct WriteChannelPipeline(Mutex>>); + impl error_stack::Context for Error {} -impl WriteChannelPipeline { - fn new(channel: tokio::sync::mpsc::Sender) -> Self { - Self(Mutex::new(Some(channel))) + fn in_memory_source(schema: SchemaRef) -> sparrow_sources::InMemory { + let source = sparrow_sources::InMemory::new(true, schema.clone()).unwrap(); + source } -} -impl Pipeline for WriteChannelPipeline { - fn initialize(&mut self, _tasks: sparrow_scheduler::Partitioned) {} - - fn add_input( - &self, - input_partition: sparrow_scheduler::Partition, - input: usize, - batch: Batch, - _scheduler: &mut dyn sparrow_scheduler::Scheduler, - ) -> error_stack::Result<(), PipelineError> { - let channel = self.0.lock(); - channel - .as_ref() - .ok_or(PipelineError::InputClosed { - input, - input_partition, - })? - .blocking_send(batch) - .into_report() - .change_context(PipelineError::Execution) + #[tokio::test] + #[ignore] + async fn test_query() { + sparrow_testing::init_test_logging(); + + let (output_tx, mut output_rx) = tokio::sync::mpsc::channel(10); + + let input_fields = Fields::from(vec![ + Field::new( + "_time", + DataType::Timestamp(TimeUnit::Nanosecond, None), + false, + ), + Field::new("_subsort", DataType::UInt64, true), + Field::new("_key_hash", DataType::UInt64, false), + Field::new("a", DataType::Int64, true), + Field::new("b", DataType::Int64, true), + Field::new("c", DataType::Int64, true), + ]); + let projected_datatype = DataType::Struct(input_fields.clone()); + let schema = Arc::new(Schema::new(input_fields.clone())); + let read_config = ReadConfig { + keep_open: true, + start_time: None, + end_time: None, + }; + + let input_source = in_memory_source(schema.clone()); + let input_rx = input_source.read(&projected_datatype, read_config); + + let output_fields = Fields::from(vec![ + Field::new("ab", DataType::Int64, true), + Field::new("abc", DataType::Int64, true), + ]); + + let input_columns: Vec = vec![ + Arc::new(TimestampNanosecondArray::from(vec![0, 1, 2, 3])), // time + Arc::new(UInt64Array::from(vec![0, 1, 2, 3])), // subsort + Arc::new(UInt64Array::from(vec![0, 1, 2, 3])), // key_hash + Arc::new(Int64Array::from(vec![0, 1, 2, 3])), // a + Arc::new(Int64Array::from(vec![4, 7, 10, 11])), // b + Arc::new(Int64Array::from(vec![Some(21), None, Some(387), Some(87)])), // c + ]; + let input_batch = RecordBatch::try_new(schema.clone(), input_columns).unwrap(); + input_source.add_batch(input_batch).await.unwrap(); + + execute( + "hello".to_owned(), + DataType::Struct(input_fields), + input_rx, + DataType::Struct(output_fields), + output_tx, + ) + .await + .unwrap(); + + let output = output_rx.recv().await.unwrap(); + let output = output.data().unwrap().as_struct(); + let ab = output.column_by_name("ab").unwrap(); + let abc = output.column_by_name("abc").unwrap(); + assert_eq!(ab.as_primitive(), &Int64Array::from(vec![4, 8, 12, 14])); + assert_eq!( + 
abc.as_primitive(), + &Int64Array::from(vec![Some(25), None, Some(399), Some(101)]) + ); } - fn close_input( - &self, - input_partition: sparrow_scheduler::Partition, - input: usize, - _scheduler: &mut dyn sparrow_scheduler::Scheduler, - ) -> error_stack::Result<(), PipelineError> { - let mut channel = self.0.lock(); - error_stack::ensure!( - channel.is_some(), - PipelineError::InputClosed { - input, - input_partition, + /// Execute a physical plan. + pub async fn execute( + query_id: String, + input_type: DataType, + mut input: BoxStream<'_, error_stack::Result>, + output_type: DataType, + output: tokio::sync::mpsc::Sender, + ) -> error_stack::Result<(), Error> { + let mut worker_pool = WorkerPool::start(query_id).change_context(Error::Creating)?; + + // This sets up some fake stuff: + // - We don't have sinks yet, so we use tokio channels. + // - We create a "hypothetical" scan step (0) + // - We create a hard-coded "project" step (1) + // - We output the results to the channel. + + let table_id = uuid::uuid!("67e55044-10b1-426f-9247-bb680e5fe0c8"); + + let scan = sparrow_physical::Step { + id: 0.into(), + kind: sparrow_physical::StepKind::Read { + source_id: table_id, }, + inputs: vec![], + result_type: input_type, + exprs: sparrow_physical::Exprs::new(), + }; + + let project = sparrow_physical::Step { + id: 1.into(), + kind: sparrow_physical::StepKind::Project, + inputs: vec![0.into()], + result_type: output_type.clone(), + exprs: index_vec![ + sparrow_physical::Expr { + name: "column".into(), + literal_args: vec![ScalarValue::Utf8(Some("a".to_owned()))], + args: vec![], + result_type: DataType::Int64 + }, + sparrow_physical::Expr { + name: "column".into(), + literal_args: vec![ScalarValue::Utf8(Some("b".to_owned()))], + args: vec![], + result_type: DataType::Int64 + }, + sparrow_physical::Expr { + name: "add".into(), + literal_args: vec![], + args: vec![0.into(), 1.into()], + result_type: DataType::Int64 + }, + sparrow_physical::Expr { + name: "column".into(), + literal_args: vec![ScalarValue::Utf8(Some("c".to_owned()))], + args: vec![], + result_type: DataType::Int64 + }, + sparrow_physical::Expr { + name: "add".into(), + literal_args: vec![], + args: vec![2.into(), 3.into()], + result_type: DataType::Int64 + }, + sparrow_physical::Expr { + name: "record".into(), + literal_args: vec![], + args: vec![2.into(), 4.into()], + result_type: output_type + } + ], + }; + + let sink_pipeline = worker_pool.add_pipeline(1, WriteChannelPipeline::new(output)); + let transform_pipeline = worker_pool.add_pipeline( + 1, + TransformPipeline::try_new( + &scan, + [project].iter(), + PipelineInput::new(sink_pipeline, 0), + ) + .change_context(Error::Creating)?, ); - *channel = None; + let transform_pipeline_input = PipelineInput::new(transform_pipeline, 0); + + let mut injector = worker_pool.injector().clone(); + while let Some(batch) = input.try_next().await.unwrap() { + transform_pipeline_input + .add_input(0.into(), batch, &mut injector) + .change_context(Error::Executing)?; + } + transform_pipeline_input + .close_input(0.into(), &mut injector) + .change_context(Error::Executing)?; + worker_pool.stop().change_context(Error::Executing)?; + Ok(()) } - fn do_work( - &self, - _partition: sparrow_scheduler::Partition, - _scheduler: &mut dyn sparrow_scheduler::Scheduler, - ) -> error_stack::Result<(), PipelineError> { - Ok(()) + #[derive(Debug)] + struct WriteChannelPipeline(Mutex>>); + + impl WriteChannelPipeline { + fn new(channel: tokio::sync::mpsc::Sender) -> Self { + Self(Mutex::new(Some(channel))) + } 
+ } + + impl Pipeline for WriteChannelPipeline { + fn initialize( + &mut self, + _tasks: sparrow_scheduler::Partitioned, + ) { + } + + fn add_input( + &self, + input_partition: sparrow_scheduler::Partition, + input: usize, + batch: Batch, + _scheduler: &mut dyn sparrow_scheduler::Scheduler, + ) -> error_stack::Result<(), PipelineError> { + let channel = self.0.lock(); + channel + .as_ref() + .ok_or(PipelineError::InputClosed { + input, + input_partition, + })? + .blocking_send(batch) + .into_report() + .change_context(PipelineError::Execution) + } + + fn close_input( + &self, + input_partition: sparrow_scheduler::Partition, + input: usize, + _scheduler: &mut dyn sparrow_scheduler::Scheduler, + ) -> error_stack::Result<(), PipelineError> { + let mut channel = self.0.lock(); + error_stack::ensure!( + channel.is_some(), + PipelineError::InputClosed { + input, + input_partition, + }, + ); + *channel = None; + Ok(()) + } + + fn do_work( + &self, + _partition: sparrow_scheduler::Partition, + _scheduler: &mut dyn sparrow_scheduler::Scheduler, + ) -> error_stack::Result<(), PipelineError> { + Ok(()) + } } } diff --git a/crates/sparrow-expressions/Cargo.toml b/crates/sparrow-expressions/Cargo.toml index f91b4e52f..93438188c 100644 --- a/crates/sparrow-expressions/Cargo.toml +++ b/crates/sparrow-expressions/Cargo.toml @@ -28,6 +28,7 @@ itertools.workspace = true num.workspace = true serde_json.workspace = true sparrow-arrow = { path = "../sparrow-arrow" } +sparrow-batch = { path = "../sparrow-batch" } sparrow-physical = { path = "../sparrow-physical" } static_init.workspace = true substring.workspace = true diff --git a/crates/sparrow-expressions/src/executor.rs b/crates/sparrow-expressions/src/executor.rs index da323c312..67843b651 100644 --- a/crates/sparrow-expressions/src/executor.rs +++ b/crates/sparrow-expressions/src/executor.rs @@ -1,6 +1,6 @@ use arrow_array::ArrayRef; use arrow_schema::DataType; -use sparrow_arrow::Batch; +use sparrow_batch::Batch; use crate::evaluator::Evaluator; use crate::evaluators; diff --git a/crates/sparrow-expressions/src/work_area.rs b/crates/sparrow-expressions/src/work_area.rs index 366abc8c6..3c838bd0e 100644 --- a/crates/sparrow-expressions/src/work_area.rs +++ b/crates/sparrow-expressions/src/work_area.rs @@ -1,5 +1,5 @@ use arrow_array::ArrayRef; -use sparrow_arrow::Batch; +use sparrow_batch::Batch; use crate::values::WorkAreaValue; diff --git a/crates/sparrow-interfaces/Cargo.toml b/crates/sparrow-interfaces/Cargo.toml new file mode 100644 index 000000000..48d225263 --- /dev/null +++ b/crates/sparrow-interfaces/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "sparrow-interfaces" +version.workspace = true +authors.workspace = true +edition.workspace = true +license.workspace = true +publish = false +description = """ +Common interfaces for the Sparrow compilation and runtime. 
+""" + +[dependencies] +arrow-schema.workspace = true +derive_more.workspace = true +error-stack.workspace = true +futures.workspace = true +sparrow-batch = { path = "../sparrow-batch" } +sparrow-core = { path = "../sparrow-core" } + +[dev-dependencies] + +[lib] +bench = false +doctest = false diff --git a/crates/sparrow-interfaces/src/lib.rs b/crates/sparrow-interfaces/src/lib.rs new file mode 100644 index 000000000..6aab43cca --- /dev/null +++ b/crates/sparrow-interfaces/src/lib.rs @@ -0,0 +1,14 @@ +#![warn( + rust_2018_idioms, + nonstandard_style, + future_incompatible, + clippy::mod_module_files, + clippy::print_stdout, + clippy::print_stderr +)] + +mod source; +mod source_error; + +pub use source::*; +pub use source_error::SourceError; diff --git a/crates/sparrow-interfaces/src/source.rs b/crates/sparrow-interfaces/src/source.rs new file mode 100644 index 000000000..4b9b74127 --- /dev/null +++ b/crates/sparrow-interfaces/src/source.rs @@ -0,0 +1,45 @@ +use arrow_schema::{DataType, SchemaRef}; +use futures::stream::BoxStream; +use sparrow_batch::Batch; + +use crate::SourceError; + +/// Trait implemented by sources. +pub trait Source: Send + Sync { + fn prepared_schema(&self) -> SchemaRef; + /// Defines how a source provides data to the execution layer. + /// + /// Creates a stream which can be read in a loop to obtain batches + /// until some stopping condition is met (stream is exhausted, + /// a certain number of batches are read, a stop signal is received, + /// etc. + /// + /// Parameters: + /// * `projected_datatype`: The datatype of the data to produce. Note that + /// this may differ from the prepared type. + /// * `read_config`: Configuration for the read. + fn read( + &self, + projected_datatype: &DataType, + read_config: ReadConfig, + ) -> BoxStream<'_, error_stack::Result>; +} + +/// Defines the configuration for a read from a source. +#[derive(Debug)] +pub struct ReadConfig { + /// If true, the read will act as an unbounded source and continue reading + /// as new data is added. It is on the consumer to close the channel. + /// + /// If false, the read will act as a bounded source, and stop once the set + /// of data available at the time of the read has been processed. + pub keep_open: bool, + /// Optional timestamp in nanos at which to start reading. + /// + /// Defaults to the earliest available timestamp. + pub start_time: Option, + /// Optional timestamp in nanos at which to end reading. + /// + /// Defaults to reading until the source is closed. 
+ pub end_time: Option, +} diff --git a/crates/sparrow-interfaces/src/source_error.rs b/crates/sparrow-interfaces/src/source_error.rs new file mode 100644 index 000000000..815087a6f --- /dev/null +++ b/crates/sparrow-interfaces/src/source_error.rs @@ -0,0 +1,22 @@ +#[non_exhaustive] +#[derive(derive_more::Display, Debug)] +pub enum SourceError { + #[display(fmt = "internal error: {}", _0)] + Internal(&'static str), + #[display(fmt = "failed to add in-memory batch")] + Add, + #[display(fmt = "receiver lagged")] + ReceiverLagged, +} + +impl error_stack::Context for SourceError {} + +impl SourceError { + pub fn internal() -> Self { + SourceError::Internal("no additional context") + } + + pub fn internal_msg(msg: &'static str) -> Self { + SourceError::Internal(msg) + } +} diff --git a/crates/sparrow-merge/src/lib.rs b/crates/sparrow-merge/src/lib.rs index b13f73871..05c58b040 100644 --- a/crates/sparrow-merge/src/lib.rs +++ b/crates/sparrow-merge/src/lib.rs @@ -9,7 +9,4 @@ clippy::undocumented_unsafe_blocks )] -mod in_memory_batches; pub mod old; - -pub use in_memory_batches::*; diff --git a/crates/sparrow-runtime/src/key_hash_inverse.rs b/crates/sparrow-runtime/src/key_hash_inverse.rs index 4382885b1..dfd3c8004 100644 --- a/crates/sparrow-runtime/src/key_hash_inverse.rs +++ b/crates/sparrow-runtime/src/key_hash_inverse.rs @@ -138,7 +138,7 @@ impl KeyHashInverse { table .in_memory .as_ref() - .and_then(|in_memroy| in_memroy.current()) + .and_then(|in_memory| in_memory.current()) .map(|batch| { let keys = batch .column_by_name(&table.config().group_column_name) diff --git a/crates/sparrow-scheduler/Cargo.toml b/crates/sparrow-scheduler/Cargo.toml index 227db8568..dab7fab27 100644 --- a/crates/sparrow-scheduler/Cargo.toml +++ b/crates/sparrow-scheduler/Cargo.toml @@ -17,6 +17,7 @@ index_vec.workspace = true itertools.workspace = true serde.workspace = true sparrow-arrow = { path = "../sparrow-arrow" } +sparrow-batch = { path = "../sparrow-batch" } tracing.workspace = true work-queue = "0.1.4" diff --git a/crates/sparrow-scheduler/src/pipeline.rs b/crates/sparrow-scheduler/src/pipeline.rs index 9630828b6..6f96b20b3 100644 --- a/crates/sparrow-scheduler/src/pipeline.rs +++ b/crates/sparrow-scheduler/src/pipeline.rs @@ -1,6 +1,6 @@ use std::borrow::Cow; -use sparrow_arrow::Batch; +use sparrow_batch::Batch; use crate::{Partition, Partitioned, Scheduler, TaskRef}; diff --git a/crates/sparrow-scheduler/src/sink.rs b/crates/sparrow-scheduler/src/sink.rs index 6f7fe7c15..b99533eca 100644 --- a/crates/sparrow-scheduler/src/sink.rs +++ b/crates/sparrow-scheduler/src/sink.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use sparrow_arrow::Batch; +use sparrow_batch::Batch; use crate::{Partition, Pipeline, PipelineError, Scheduler}; diff --git a/crates/sparrow-session/Cargo.toml b/crates/sparrow-session/Cargo.toml index 7f1d78af8..42a51a779 100644 --- a/crates/sparrow-session/Cargo.toml +++ b/crates/sparrow-session/Cargo.toml @@ -22,6 +22,7 @@ sparrow-api = { path = "../sparrow-api" } sparrow-compiler = { path = "../sparrow-compiler" } sparrow-merge = { path = "../sparrow-merge" } sparrow-runtime = { path = "../sparrow-runtime" } +sparrow-sources = { path = "../sparrow-sources" } sparrow-syntax = { path = "../sparrow-syntax" } sparrow-instructions = { path = "../sparrow-instructions" } static_init.workspace = true diff --git a/crates/sparrow-session/src/table.rs b/crates/sparrow-session/src/table.rs index 097ee9249..e83d57405 100644 --- a/crates/sparrow-session/src/table.rs +++ b/crates/sparrow-session/src/table.rs 
@@ -9,11 +9,11 @@ use error_stack::ResultExt; use futures::TryStreamExt; use sparrow_api::kaskada::v1alpha; use sparrow_compiler::{FileSets, TableInfo}; -use sparrow_merge::InMemoryBatches; use sparrow_runtime::preparer::Preparer; use sparrow_runtime::stores::ObjectStoreUrl; use sparrow_runtime::ParquetFile; use sparrow_runtime::{key_hash_inverse::ThreadSafeKeyHashInverse, stores::ObjectStoreRegistry}; +use sparrow_sources::InMemoryBatches; use crate::{Error, Expr}; diff --git a/crates/sparrow-sources/Cargo.toml b/crates/sparrow-sources/Cargo.toml new file mode 100644 index 000000000..75414dbe3 --- /dev/null +++ b/crates/sparrow-sources/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "sparrow-sources" +version.workspace = true +authors.workspace = true +edition.workspace = true +license.workspace = true +publish = false +description = """ +Defines the input source implementations. +""" + +[dependencies] +arrow-array.workspace = true +async-broadcast.workspace = true +async-stream.workspace = true +arrow-schema.workspace = true +derive_more.workspace = true +error-stack.workspace = true +futures.workspace = true +sparrow-core = { path = "../sparrow-core" } +sparrow-merge = { path = "../sparrow-merge" } +sparrow-batch = { path = "../sparrow-batch" } +sparrow-interfaces = { path = "../sparrow-interfaces" } +tracing.workspace = true + +[dev-dependencies] + +[lib] +bench = false +doctest = false diff --git a/crates/sparrow-merge/src/in_memory_batches.rs b/crates/sparrow-sources/src/in_memory.rs similarity index 59% rename from crates/sparrow-merge/src/in_memory_batches.rs rename to crates/sparrow-sources/src/in_memory.rs index ac727c7d1..e6b951683 100644 --- a/crates/sparrow-merge/src/in_memory_batches.rs +++ b/crates/sparrow-sources/src/in_memory.rs @@ -1,23 +1,86 @@ -use std::sync::RwLock; +use std::sync::{Arc, RwLock}; use arrow_array::RecordBatch; -use arrow_schema::SchemaRef; +use arrow_schema::{DataType, SchemaRef}; use error_stack::{IntoReportCompat, ResultExt}; -use futures::Stream; +use futures::{Stream, StreamExt, TryStreamExt}; -use crate::old::homogeneous_merge; +use sparrow_batch::Batch; +use sparrow_interfaces::{ReadConfig, Source}; +use sparrow_merge::old::homogeneous_merge; -#[derive(derive_more::Display, Debug)] -pub enum Error { - #[display(fmt = "failed to add in-memory batch")] - Add, - #[display(fmt = "receiver lagged")] - ReceiverLagged, +use sparrow_interfaces::SourceError; + +/// A shared, synchronized container for in-memory batches. +pub struct InMemory { + /// The prepared schema. + /// + /// Note this is not the `projected_schema`, which is the schema + /// after applying column projections. + prepared_schema: SchemaRef, + /// The in-memory batches. + data: Arc<InMemoryBatches>, +} + +impl InMemory { + pub fn new(queryable: bool, schema: SchemaRef) -> error_stack::Result<Self, SourceError> { + let data = Arc::new(InMemoryBatches::new(queryable, schema.clone())); + let source = Self { + prepared_schema: schema, + data, + }; + Ok(source) + } + + /// Add a batch, publishing it to the subscribers.
+ pub async fn add_batch(&self, batch: RecordBatch) -> error_stack::Result<(), SourceError> { + self.data.add_batch(batch).await + } } -impl error_stack::Context for Error {} +impl Source for InMemory { + fn prepared_schema(&self) -> SchemaRef { + self.prepared_schema.clone() + } + + fn read( + &self, + projected_datatype: &DataType, + read_config: ReadConfig, + ) -> futures::stream::BoxStream<'_, error_stack::Result> { + assert_eq!( + &DataType::Struct(self.prepared_schema().fields().clone()), + projected_datatype, + "Projection not yet supported" + ); + + let input_stream = if read_config.keep_open { + self.data + .subscribe() + .map_err(|e| e.change_context(SourceError::internal_msg("invalid input"))) + .and_then(move |batch| async move { + Batch::try_new_from_batch(batch) + .change_context(SourceError::internal_msg("invalid input")) + }) + .boxed() + } else if let Some(batch) = self.data.current() { + futures::stream::once(async move { + Batch::try_new_from_batch(batch) + .change_context(SourceError::internal_msg("invalid input")) + }) + .boxed() + } else { + futures::stream::empty().boxed() + }; + + input_stream + } +} /// Struct for managing in-memory batches. +/// +/// Note: several items left pub for use in old code path, can remove +/// when that path is removed. #[derive(Debug)] pub struct InMemoryBatches { /// Whether rows added will be available for interactive queries. @@ -39,7 +102,7 @@ struct Current { } impl Current { - pub fn new(schema: SchemaRef) -> Self { + fn new(schema: SchemaRef) -> Self { let batch = RecordBatch::new_empty(schema.clone()); Self { schema, @@ -48,7 +111,7 @@ impl Current { } } - pub fn add_batch(&mut self, batch: &RecordBatch) -> error_stack::Result<(), Error> { + fn add_batch(&mut self, batch: &RecordBatch) -> error_stack::Result<(), SourceError> { if self.batch.num_rows() == 0 { self.batch = batch.clone(); } else { @@ -57,7 +120,7 @@ impl Current { // put it in an option, or allow `homogeneous_merge` to take `&RecordBatch`. self.batch = homogeneous_merge(&self.schema, vec![self.batch.clone(), batch.clone()]) .into_report() - .change_context(Error::Add)?; + .change_context(SourceError::Add)?; } Ok(()) } @@ -82,13 +145,13 @@ impl InMemoryBatches { /// Add a batch, merging it into the in-memory version. /// /// Publishes the new batch to the subscribers. - pub async fn add_batch(&self, batch: RecordBatch) -> error_stack::Result<(), Error> { + pub async fn add_batch(&self, batch: RecordBatch) -> error_stack::Result<(), SourceError> { if batch.num_rows() == 0 { return Ok(()); } let new_version = { - let mut write = self.current.write().map_err(|_| Error::Add)?; + let mut write = self.current.write().map_err(|_| SourceError::Add)?; if self.queryable { write.add_batch(&batch)?; } @@ -110,7 +173,7 @@ impl InMemoryBatches { /// added as they arrive. 
pub fn subscribe( &self, - ) -> impl Stream> + 'static { + ) -> impl Stream> + 'static { let (mut version, merged) = { let read = self.current.read().unwrap(); (read.version, read.batch.clone()) @@ -139,7 +202,7 @@ impl InMemoryBatches { break; }, Err(async_broadcast::RecvError::Overflowed(_)) => { - Err(Error::ReceiverLagged)?; + Err(SourceError::ReceiverLagged)?; } } } diff --git a/crates/sparrow-sources/src/lib.rs b/crates/sparrow-sources/src/lib.rs new file mode 100644 index 000000000..f58ecf100 --- /dev/null +++ b/crates/sparrow-sources/src/lib.rs @@ -0,0 +1,12 @@ +#![warn( + rust_2018_idioms, + nonstandard_style, + future_incompatible, + clippy::mod_module_files, + clippy::print_stdout, + clippy::print_stderr +)] + +mod in_memory; + +pub use in_memory::*; diff --git a/crates/sparrow-transforms/Cargo.toml b/crates/sparrow-transforms/Cargo.toml index db5d87d84..3b4913553 100644 --- a/crates/sparrow-transforms/Cargo.toml +++ b/crates/sparrow-transforms/Cargo.toml @@ -18,6 +18,7 @@ error-stack.workspace = true itertools.workspace = true parking_lot.workspace = true sparrow-arrow = { path = "../sparrow-arrow" } +sparrow-batch = { path = "../sparrow-batch" } sparrow-expressions = { path = "../sparrow-expressions" } sparrow-physical = { path = "../sparrow-physical" } sparrow-scheduler = { path = "../sparrow-scheduler" } diff --git a/crates/sparrow-transforms/src/project.rs b/crates/sparrow-transforms/src/project.rs index b46257a54..e1a358cad 100644 --- a/crates/sparrow-transforms/src/project.rs +++ b/crates/sparrow-transforms/src/project.rs @@ -1,6 +1,6 @@ use arrow_schema::DataType; use error_stack::ResultExt; -use sparrow_arrow::Batch; +use sparrow_batch::Batch; use sparrow_expressions::ExpressionExecutor; use sparrow_physical::Exprs; @@ -50,7 +50,8 @@ mod tests { use arrow_array::{Int64Array, StructArray, TimestampNanosecondArray, UInt64Array}; use arrow_schema::{Field, Fields}; use index_vec::index_vec; - use sparrow_arrow::{scalar_value::ScalarValue, RowTime}; + use sparrow_arrow::scalar_value::ScalarValue; + use sparrow_batch::RowTime; fn test_batch() -> Batch { let input_fields = Fields::from(vec![ diff --git a/crates/sparrow-transforms/src/select.rs b/crates/sparrow-transforms/src/select.rs index 32bb4ffc1..59f4c4c2f 100644 --- a/crates/sparrow-transforms/src/select.rs +++ b/crates/sparrow-transforms/src/select.rs @@ -1,7 +1,7 @@ use arrow_array::{cast::AsArray, BooleanArray}; use arrow_schema::DataType; use error_stack::{IntoReport, ResultExt}; -use sparrow_arrow::Batch; +use sparrow_batch::Batch; use sparrow_expressions::ExpressionExecutor; use sparrow_physical::Exprs; @@ -99,7 +99,8 @@ mod tests { use arrow_array::{Int64Array, StructArray, TimestampNanosecondArray, UInt64Array}; use arrow_schema::{Field, Fields}; use index_vec::index_vec; - use sparrow_arrow::{scalar_value::ScalarValue, RowTime}; + use sparrow_arrow::scalar_value::ScalarValue; + use sparrow_batch::RowTime; fn test_batch() -> Batch { let input_fields = Fields::from(vec![ diff --git a/crates/sparrow-transforms/src/transform.rs b/crates/sparrow-transforms/src/transform.rs index 18e91cc81..ecea88dff 100644 --- a/crates/sparrow-transforms/src/transform.rs +++ b/crates/sparrow-transforms/src/transform.rs @@ -1,5 +1,5 @@ use arrow_schema::DataType; -use sparrow_arrow::Batch; +use sparrow_batch::Batch; #[derive(derive_more::Display, Debug)] pub enum Error { diff --git a/crates/sparrow-transforms/src/transform_pipeline.rs b/crates/sparrow-transforms/src/transform_pipeline.rs index 1687ed64c..7332fbf60 100644 --- 
a/crates/sparrow-transforms/src/transform_pipeline.rs +++ b/crates/sparrow-transforms/src/transform_pipeline.rs @@ -4,7 +4,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use error_stack::ResultExt; use itertools::Itertools; use parking_lot::Mutex; -use sparrow_arrow::Batch; +use sparrow_batch::Batch; use sparrow_physical::{StepId, StepKind}; use sparrow_scheduler::{ Partition, Partitioned, Pipeline, PipelineError, PipelineInput, Scheduler, TaskRef,
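
To illustrate the intended usage of the new `Source` trait, here is a minimal sketch that feeds an `InMemory` source and reads it back as a stream of batches, mirroring the rewritten test in `sparrow-execution`. The schema, sample values, and the `#[tokio::main]` harness are illustrative assumptions; only the `Source`, `ReadConfig`, and `sparrow_sources::InMemory` APIs come from this patch.

use std::sync::Arc;

use arrow_array::{ArrayRef, Int64Array, RecordBatch, TimestampNanosecondArray, UInt64Array};
use arrow_schema::{DataType, Field, Schema, TimeUnit};
use futures::TryStreamExt;
use sparrow_interfaces::{ReadConfig, Source};

#[tokio::main]
async fn main() {
    // Prepared schema: the `_time`, `_subsort`, and `_key_hash` key columns are
    // required by `Batch::try_new_from_batch`, plus one data column.
    let schema = Arc::new(Schema::new(vec![
        Field::new("_time", DataType::Timestamp(TimeUnit::Nanosecond, None), false),
        Field::new("_subsort", DataType::UInt64, true),
        Field::new("_key_hash", DataType::UInt64, false),
        Field::new("a", DataType::Int64, true),
    ]));

    // A queryable in-memory source; added batches are merged and published to readers.
    let source = sparrow_sources::InMemory::new(true, schema.clone()).unwrap();

    let columns: Vec<ArrayRef> = vec![
        Arc::new(TimestampNanosecondArray::from(vec![0, 1, 2])), // _time (sorted)
        Arc::new(UInt64Array::from(vec![0, 1, 2])),              // _subsort
        Arc::new(UInt64Array::from(vec![0, 1, 2])),              // _key_hash
        Arc::new(Int64Array::from(vec![10, 20, 30])),            // a
    ];
    let batch = RecordBatch::try_new(schema.clone(), columns).unwrap();
    source.add_batch(batch).await.unwrap();

    // The source is just an async stream of `Batch`es: the caller polls it directly,
    // no `ReadPipeline` required. `keep_open: false` makes this a bounded read.
    let read_config = ReadConfig {
        keep_open: false,
        start_time: None,
        end_time: None,
    };
    let mut stream = source.read(&DataType::Struct(schema.fields().clone()), read_config);
    while let Some(batch) = stream.try_next().await.unwrap() {
        assert!(!batch.is_empty());
    }
}

With `keep_open: true` the same read would instead behave as an unbounded stream, continuing to yield batches as later `add_batch` calls publish them to subscribers.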