diff --git a/.vscode/settings.json b/.vscode/settings.json index d3432e6f2..9de2826e6 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,4 +1,5 @@ { "rust-analyzer.cargo.features": "all", "editor.formatOnSave": true, + "editor.minimap.showSlider": "always", } \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index a9605cd67..b2ae84442 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4607,6 +4607,7 @@ dependencies = [ "itertools 0.11.0", "smallvec", "sparrow-batch", + "sparrow-core", "static_init", "tempfile", "tokio", @@ -4821,29 +4822,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "sparrow-read" -version = "0.11.0" -dependencies = [ - "arrow-array", - "arrow-schema", - "arrow-select", - "async-trait", - "derive_more", - "error-stack", - "futures", - "futures-lite", - "index_vec", - "itertools 0.11.0", - "parking_lot 0.12.1", - "sparrow-arrow", - "sparrow-expressions", - "sparrow-physical", - "sparrow-scheduler", - "tracing", - "uuid 1.4.1", -] - [[package]] name = "sparrow-runtime" version = "0.11.0" @@ -4977,6 +4955,7 @@ dependencies = [ "itertools 0.11.0", "smallvec", "sparrow-batch", + "sparrow-core", "sparrow-interfaces", "sparrow-merge", "static_init", diff --git a/crates/sparrow-core/src/lib.rs b/crates/sparrow-core/src/lib.rs index 86347078a..06d3d706f 100644 --- a/crates/sparrow-core/src/lib.rs +++ b/crates/sparrow-core/src/lib.rs @@ -11,12 +11,14 @@ use std::pin::Pin; mod context_code; mod key_triple; +mod read_config; mod table_schema; use arrow::record_batch::RecordBatch; pub use context_code::*; use futures::Stream; pub use key_triple::{KeyTriple, KeyTriples}; +pub use read_config::ReadConfig; pub use table_schema::TableSchema; // A stream of record batches (wrapped in anyhow::Result, boxed and pinned). diff --git a/crates/sparrow-interfaces/Cargo.toml b/crates/sparrow-interfaces/Cargo.toml index 73bea4a7f..e5652dce7 100644 --- a/crates/sparrow-interfaces/Cargo.toml +++ b/crates/sparrow-interfaces/Cargo.toml @@ -24,6 +24,7 @@ static_init.workspace = true tempfile.workspace = true tokio.workspace = true sparrow-batch = { path = "../sparrow-batch" } +sparrow-core = { path = "../sparrow-core" } tokio-stream.workspace = true uuid.workspace = true diff --git a/crates/sparrow-interfaces/src/error.rs b/crates/sparrow-interfaces/src/error.rs index b4e5fc7a4..ab1dd8844 100644 --- a/crates/sparrow-interfaces/src/error.rs +++ b/crates/sparrow-interfaces/src/error.rs @@ -1,17 +1,17 @@ #[derive(derive_more::Display, Debug)] pub enum Error { #[display(fmt = "internal error: {}", _0)] - Internal(String), + Internal(&'static str), } impl error_stack::Context for Error {} impl Error { pub fn internal() -> Self { - Error::Internal("no additional context".to_owned()) + Error::Internal("no additional context") } - pub fn internal_msg(msg: String) -> Self { + pub fn internal_msg(msg: &'static str) -> Self { Error::Internal(msg) } } diff --git a/crates/sparrow-interfaces/src/source.rs b/crates/sparrow-interfaces/src/source.rs index 9c7ff673e..1df3e1c66 100644 --- a/crates/sparrow-interfaces/src/source.rs +++ b/crates/sparrow-interfaces/src/source.rs @@ -1,11 +1,12 @@ use arrow_schema::SchemaRef; use futures::stream::BoxStream; use sparrow_batch::Batch; +use sparrow_core::ReadConfig; use crate::Error; /// Defines how a source provides data to the execution layer. pub trait Source: Send + Sync { fn schema(&self) -> SchemaRef; - fn read(&self, projection: &[usize]) -> BoxStream<'_, error_stack::Result>; + fn read(&self, read_config: ReadConfig) -> BoxStream<'_, error_stack::Result>; } diff --git a/crates/sparrow-sources/Cargo.toml b/crates/sparrow-sources/Cargo.toml index 6789247ae..f7c5022d0 100644 --- a/crates/sparrow-sources/Cargo.toml +++ b/crates/sparrow-sources/Cargo.toml @@ -25,6 +25,7 @@ static_init.workspace = true tempfile.workspace = true tokio.workspace = true tokio-stream.workspace = true +sparrow-core = { path = "../sparrow-core" } sparrow-merge = { path = "../sparrow-merge" } sparrow-batch = { path = "../sparrow-batch" } sparrow-interfaces = { path = "../sparrow-interfaces" } diff --git a/crates/sparrow-sources/src/error.rs b/crates/sparrow-sources/src/error.rs index 70d93b7aa..f96b0c80f 100644 --- a/crates/sparrow-sources/src/error.rs +++ b/crates/sparrow-sources/src/error.rs @@ -1,7 +1,7 @@ #[derive(derive_more::Display, Debug)] pub enum Error { #[display(fmt = "internal error: {}", _0)] - Internal(String), + Internal(&'static str), #[display(fmt = "failed to add in-memory batch")] Add, #[display(fmt = "receiver lagged")] @@ -12,10 +12,10 @@ impl error_stack::Context for Error {} impl Error { pub fn internal() -> Self { - Error::Internal("no additional context".to_owned()) + Error::Internal("no additional context") } - pub fn internal_msg(msg: String) -> Self { + pub fn internal_msg(msg: &'static str) -> Self { Error::Internal(msg) } } diff --git a/crates/sparrow-sources/src/in_memory.rs b/crates/sparrow-sources/src/in_memory.rs index a917c7dcb..30d57a01d 100644 --- a/crates/sparrow-sources/src/in_memory.rs +++ b/crates/sparrow-sources/src/in_memory.rs @@ -3,9 +3,10 @@ use std::sync::{Arc, RwLock}; use arrow_array::RecordBatch; use arrow_schema::SchemaRef; use error_stack::{IntoReportCompat, ResultExt}; -use futures::Stream; +use futures::{Stream, StreamExt, TryStreamExt}; use sparrow_batch::Batch; +use sparrow_core::ReadConfig; use sparrow_interfaces::Source; use sparrow_merge::old::homogeneous_merge; @@ -13,19 +14,45 @@ use crate::Error; /// Wraps the mutable in-memory batch container. pub struct InMemory { + /// The projected schema of the batches this source will produce. + schema: SchemaRef, + /// The in-memory batches. data: Arc, } impl Source for InMemory { fn schema(&self) -> SchemaRef { - todo!() + self.schema.clone() } fn read( &self, - projection: &[usize], + read_config: ReadConfig, ) -> futures::stream::BoxStream<'_, error_stack::Result> { - todo!() + let input_stream = if read_config.keep_open { + self.data + .subscribe() + .map_err(|e| { + e.change_context(sparrow_interfaces::Error::internal_msg("invalid input")) + }) + .and_then(|batch| async move { + Batch::try_new_from_batch(batch) + .into_report() + .change_context(sparrow_interfaces::Error::internal_msg("invalid input")) + }) + .boxed() + } else if let Some(batch) = self.data.current() { + futures::stream::once(async move { + Batch::try_new_from_batch(batch) + .into_report() + .change_context(sparrow_interfaces::Error::internal_msg("invalid input")) + }) + .boxed() + } else { + futures::stream::empty().boxed() + }; + + input_stream } }