Skip to content

Commit

Permalink
prune deps and add todo for projection schema
Browse files Browse the repository at this point in the history
  • Loading branch information
jordanrfrazier committed Oct 11, 2023
1 parent dad00ec commit 88843f3
Show file tree
Hide file tree
Showing 7 changed files with 11 additions and 48 deletions.
1 change: 0 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
{
"rust-analyzer.cargo.features": "all",
"editor.formatOnSave": true,
"editor.minimap.showSlider": "mouseover",
}
20 changes: 0 additions & 20 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion crates/sparrow-batch/src/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ impl Batch {

#[cfg(debug_assertions)]
validate(&data, up_to_time)?;
// TODO: Extract the columns I want

let time: &TimestampNanosecondArray = data.column(0).as_primitive();
let min_present_time = time.value(0);
Expand Down
3 changes: 1 addition & 2 deletions crates/sparrow-execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ mod tests {
async fn test_query() {
sparrow_testing::init_test_logging();

// let (input_tx, input_rx) = tokio::sync::mpsc::channel(10);
let (output_tx, mut output_rx) = tokio::sync::mpsc::channel(10);

let input_fields = Fields::from(vec![
Expand Down Expand Up @@ -124,7 +123,7 @@ mod tests {
let mut worker_pool = WorkerPool::start(query_id).change_context(Error::Creating)?;

// This sets up some fake stuff:
// - We don't have sources / sinks yet, so we use tokio channels.
// - We don't have sinks yet, so we use tokio channels.
// - We create a "hypothetical" scan step (0)
// - We create a hard-coded "project" step (1)
// - We output the results to the channel.
Expand Down
11 changes: 0 additions & 11 deletions crates/sparrow-interfaces/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,12 @@ Defines the common interfaces for the Sparrow compilation and runtime.
"""

[dependencies]
anyhow.workspace = true
arrow-array.workspace = true
hashbrown.workspace = true
arrow-schema.workspace = true
arrow-select.workspace = true
derive_more.workspace = true
error-stack.workspace = true
futures.workspace = true
itertools.workspace = true
smallvec.workspace = true
static_init.workspace = true
tempfile.workspace = true
tokio.workspace = true
sparrow-batch = { path = "../sparrow-batch" }
sparrow-core = { path = "../sparrow-core" }
tokio-stream.workspace = true
uuid.workspace = true

[dev-dependencies]

Expand Down
9 changes: 0 additions & 9 deletions crates/sparrow-sources/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,14 @@ Defines the input source implementations.
arrow-array.workspace = true
async-broadcast.workspace = true
async-stream.workspace = true
hashbrown.workspace = true
arrow-schema.workspace = true
arrow-select.workspace = true
derive_more.workspace = true
error-stack.workspace = true
futures.workspace = true
itertools.workspace = true
smallvec.workspace = true
static_init.workspace = true
tempfile.workspace = true
tokio.workspace = true
tokio-stream.workspace = true
sparrow-core = { path = "../sparrow-core" }
sparrow-merge = { path = "../sparrow-merge" }
sparrow-batch = { path = "../sparrow-batch" }
sparrow-interfaces = { path = "../sparrow-interfaces" }
uuid.workspace = true
tracing.workspace = true

[dev-dependencies]
Expand Down
14 changes: 10 additions & 4 deletions crates/sparrow-sources/src/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,22 @@ impl Source for InMemory {
&self,
read_config: ReadConfig,
) -> futures::stream::BoxStream<'_, error_stack::Result<Batch, sparrow_interfaces::Error>> {
// TODO: Projection
// TODO: Do we get any benefit from projecting the schema here?
// Or should the prepared schema just match the projected for in-memory sources?
let input_stream = if read_config.keep_open {
self.data
.subscribe()
.map_err(|e| {
e.change_context(sparrow_interfaces::Error::internal_msg("invalid input"))
})
.and_then(|batch| async move {
Batch::try_new_from_batch(batch)
.change_context(sparrow_interfaces::Error::internal_msg("invalid input"))
.and_then(move |batch| {
let projected_schema = read_config.projected_schema.clone();
async move {
debug_assert_eq!(batch.schema(), projected_schema, "expected projected schema to equal batch schema. Saw schema: {:?}, expected projected: {:?}", batch.schema(), projected_schema);
Batch::try_new_from_batch(batch).change_context(
sparrow_interfaces::Error::internal_msg("invalid input"),
)
}
})
.boxed()
} else if let Some(batch) = self.data.current() {
Expand Down

0 comments on commit 88843f3

Please sign in to comment.