Skip to content

Commit

Permalink
Add basic in-memory read impl
Browse files: browse the repository at this point in the history
  • Loading branch information
jordanrfrazier committed Oct 10, 2023
1 parent 0b15e70 commit 137bd20
Show file tree
Hide file tree
Showing 9 changed files with 46 additions and 34 deletions.
1 change: 1 addition & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"rust-analyzer.cargo.features": "all",
"editor.formatOnSave": true,
"editor.minimap.showSlider": "always",
}
25 changes: 2 additions & 23 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/sparrow-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ use std::pin::Pin;

mod context_code;
mod key_triple;
mod read_config;
mod table_schema;

use arrow::record_batch::RecordBatch;
pub use context_code::*;
use futures::Stream;
pub use key_triple::{KeyTriple, KeyTriples};
pub use read_config::ReadConfig;
pub use table_schema::TableSchema;

// A stream of record batches (wrapped in anyhow::Result, boxed and pinned).
Expand Down
1 change: 1 addition & 0 deletions crates/sparrow-interfaces/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ static_init.workspace = true
tempfile.workspace = true
tokio.workspace = true
sparrow-batch = { path = "../sparrow-batch" }
sparrow-core = { path = "../sparrow-core" }
tokio-stream.workspace = true
uuid.workspace = true

Expand Down
6 changes: 3 additions & 3 deletions crates/sparrow-interfaces/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
#[derive(derive_more::Display, Debug)]
pub enum Error {
#[display(fmt = "internal error: {}", _0)]
Internal(String),
Internal(&'static str),
}

impl error_stack::Context for Error {}

impl Error {
pub fn internal() -> Self {
Error::Internal("no additional context".to_owned())
Error::Internal("no additional context")
}

pub fn internal_msg(msg: String) -> Self {
pub fn internal_msg(msg: &'static str) -> Self {
Error::Internal(msg)
}
}
3 changes: 2 additions & 1 deletion crates/sparrow-interfaces/src/source.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use arrow_schema::SchemaRef;
use futures::stream::BoxStream;
use sparrow_batch::Batch;
use sparrow_core::ReadConfig;

use crate::Error;

/// Defines how a source provides data to the execution layer.
pub trait Source: Send + Sync {
fn schema(&self) -> SchemaRef;
fn read(&self, projection: &[usize]) -> BoxStream<'_, error_stack::Result<Batch, Error>>;
fn read(&self, read_config: ReadConfig) -> BoxStream<'_, error_stack::Result<Batch, Error>>;
}
1 change: 1 addition & 0 deletions crates/sparrow-sources/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ static_init.workspace = true
tempfile.workspace = true
tokio.workspace = true
tokio-stream.workspace = true
sparrow-core = { path = "../sparrow-core" }
sparrow-merge = { path = "../sparrow-merge" }
sparrow-batch = { path = "../sparrow-batch" }
sparrow-interfaces = { path = "../sparrow-interfaces" }
Expand Down
6 changes: 3 additions & 3 deletions crates/sparrow-sources/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#[derive(derive_more::Display, Debug)]
pub enum Error {
#[display(fmt = "internal error: {}", _0)]
Internal(String),
Internal(&'static str),
#[display(fmt = "failed to add in-memory batch")]
Add,
#[display(fmt = "receiver lagged")]
Expand All @@ -12,10 +12,10 @@ impl error_stack::Context for Error {}

impl Error {
pub fn internal() -> Self {
Error::Internal("no additional context".to_owned())
Error::Internal("no additional context")
}

pub fn internal_msg(msg: String) -> Self {
pub fn internal_msg(msg: &'static str) -> Self {
Error::Internal(msg)
}
}
35 changes: 31 additions & 4 deletions crates/sparrow-sources/src/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,56 @@ use std::sync::{Arc, RwLock};
use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use error_stack::{IntoReportCompat, ResultExt};
use futures::Stream;
use futures::{Stream, StreamExt, TryStreamExt};

use sparrow_batch::Batch;
use sparrow_core::ReadConfig;
use sparrow_interfaces::Source;
use sparrow_merge::old::homogeneous_merge;

use crate::Error;

/// Wraps the mutable in-memory batch container.
///
/// Implements `Source`: `schema()` returns a clone of `schema`, and `read()`
/// serves batches out of `data`. When `ReadConfig::keep_open` is set, `read()`
/// streams from `data.subscribe()`; otherwise it yields at most one batch,
/// taken from `data.current()`, and an empty stream when that is `None`.
pub struct InMemory {
/// The projected schema of the batches this source will produce.
schema: SchemaRef,
/// The in-memory batches.
///
/// NOTE(review): held behind an `Arc`, presumably so the container can be
/// shared with whoever adds batches — confirm against `InMemoryBatches`.
data: Arc<InMemoryBatches>,
}

impl Source for InMemory {
fn schema(&self) -> SchemaRef {
todo!()
self.schema.clone()
}

fn read(
&self,
projection: &[usize],
read_config: ReadConfig,
) -> futures::stream::BoxStream<'_, error_stack::Result<Batch, sparrow_interfaces::Error>> {
todo!()
let input_stream = if read_config.keep_open {
self.data
.subscribe()
.map_err(|e| {
e.change_context(sparrow_interfaces::Error::internal_msg("invalid input"))
})
.and_then(|batch| async move {
Batch::try_new_from_batch(batch)
.into_report()
.change_context(sparrow_interfaces::Error::internal_msg("invalid input"))
})
.boxed()
} else if let Some(batch) = self.data.current() {
futures::stream::once(async move {
Batch::try_new_from_batch(batch)
.into_report()
.change_context(sparrow_interfaces::Error::internal_msg("invalid input"))
})
.boxed()
} else {
futures::stream::empty().boxed()
};

input_stream
}
}

Expand Down

0 comments on commit 137bd20

Please sign in to comment.