
Some refactor to get sparrow-execution test passing
jordanrfrazier committed Oct 11, 2023
1 parent ba89ff4 commit 0d3f9b7
Showing 13 changed files with 18 additions and 19 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 0 additions & 2 deletions crates/sparrow-arrow/src/lib.rs
@@ -16,8 +16,6 @@ pub mod hash;
pub mod hasher;
pub mod scalar_value;
pub mod serde;
-#[cfg(any(test, feature = "testing"))]
-pub mod testing;
pub mod utils;

pub use concat_take::*;
8 changes: 2 additions & 6 deletions crates/sparrow-batch/src/batch.rs
@@ -1,14 +1,13 @@
use std::sync::Arc;

-use arrow::ipc::Timestamp;
use arrow_array::cast::AsArray;
use arrow_array::types::{TimestampNanosecondType, UInt64Type};
use arrow_array::{
Array, ArrayRef, ArrowPrimitiveType, RecordBatch, StructArray, TimestampNanosecondArray,
UInt64Array,
};
use arrow_schema::{Field, Fields, Schema};
-use error_stack::{IntoReport, IntoReportCompat, ResultExt};
+use error_stack::{IntoReport, ResultExt};
use itertools::Itertools;

use crate::{Error, RowTime};
@@ -335,9 +334,6 @@ impl Batch {
key_hash: impl Into<arrow_array::UInt64Array>,
up_to_time: i64,
) -> Self {
-use arrow_array::StructArray;
-use std::sync::Arc;

let time: TimestampNanosecondArray = time.into();
let subsort: UInt64Array = (0..(time.len() as u64)).collect_vec().into();
let key_hash: UInt64Array = key_hash.into();
@@ -597,10 +593,10 @@ static MINIMAL_SCHEMA: arrow_schema::SchemaRef = {
#[cfg(test)]
mod tests {

+use crate::testing::arb_arrays::arb_batch;
use crate::{Batch, RowTime};
use itertools::Itertools;
use proptest::prelude::*;
-use sparrow_arrow::testing::arb_arrays::arb_batch;

#[test]
fn test_split_batch() {
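Note: the inner `use` statements removed above are already covered by the imports at the top of batch.rs. For reference, a minimal standalone sketch of the array construction this helper performs (the input values below are made up for illustration):

    use arrow_array::{Array, TimestampNanosecondArray, UInt64Array};
    use itertools::Itertools;

    fn example() {
        // Illustrative inputs; the real helper takes `impl Into<...>` arguments.
        let time: TimestampNanosecondArray = vec![0i64, 1, 2].into();
        // The subsort column is a dense 0..n sequence over the batch length.
        let subsort: UInt64Array = (0..(time.len() as u64)).collect_vec().into();
        let key_hash: UInt64Array = vec![1u64, 2, 3].into();
        assert_eq!(subsort.len(), time.len());
        assert_eq!(key_hash.len(), time.len());
    }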
3 changes: 3 additions & 0 deletions crates/sparrow-batch/src/lib.rs
@@ -14,3 +14,6 @@ mod row_time;
pub use batch::*;
pub use error::Error;
pub use row_time::*;

+#[cfg(any(test, feature = "testing"))]
+pub mod testing;
File renamed without changes.
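Note: the `testing` module (including `arb_arrays::arb_batch`) now lives in sparrow-batch and is compiled only for this crate's own tests or when a dependent crate opts in via a `testing` feature. The feature declaration itself is not part of this diff; a typical setup might look like the following (assumed, not taken from the repository):

    # Assumed sketch; these entries are not shown in this diff.

    # crates/sparrow-batch/Cargo.toml
    [features]
    testing = []

    # A dependent crate's Cargo.toml, to use the helpers in its own tests:
    [dev-dependencies]
    sparrow-batch = { path = "../sparrow-batch", features = ["testing"] }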
6 changes: 3 additions & 3 deletions crates/sparrow-execution/src/lib.rs
@@ -20,7 +20,7 @@ mod tests {
use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef, TimeUnit};
use error_stack::{IntoReport, ResultExt};
use futures::stream::BoxStream;
-use futures::{StreamExt, TryStreamExt};
+use futures::TryStreamExt;
use index_vec::index_vec;
use parking_lot::Mutex;
use sparrow_arrow::scalar_value::ScalarValue;
@@ -117,7 +117,7 @@ mod tests {
pub async fn execute(
query_id: String,
input_type: DataType,
-input: BoxStream<'_, error_stack::Result<Batch, sparrow_interfaces::Error>>,
+mut input: BoxStream<'_, error_stack::Result<Batch, sparrow_interfaces::Error>>,
output_type: DataType,
output: tokio::sync::mpsc::Sender<Batch>,
) -> error_stack::Result<(), Error> {
@@ -199,7 +199,7 @@ mod tests {
let transform_pipeline_input = PipelineInput::new(transform_pipeline, 0);

let mut injector = worker_pool.injector().clone();
-while let Some(batch) = input.try_next().await {
+while let Some(batch) = input.try_next().await.unwrap() {
transform_pipeline_input
.add_input(0.into(), batch, &mut injector)
.change_context(Error::Executing)?;
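Note: `TryStreamExt::try_next` takes the stream by `&mut self`, which is why the parameter becomes `mut input`; each call resolves to `Ok(Some(batch))`, `Ok(None)` at end of stream, or `Err(..)`. A minimal standalone sketch of that consumption pattern (the `.unwrap()` mirrors the test code above; non-test code would propagate the error instead):

    use futures::stream::{self, BoxStream};
    use futures::TryStreamExt;

    #[tokio::main]
    async fn main() {
        // Stand-in for the stream of input batches; the error type is arbitrary here.
        let items: Vec<Result<i64, std::io::Error>> = vec![Ok(1), Ok(2), Ok(3)];
        let mut input: BoxStream<'static, Result<i64, std::io::Error>> =
            Box::pin(stream::iter(items));

        // `try_next` needs a mutable handle on the stream; `Ok(None)` ends the loop.
        while let Some(item) = input.try_next().await.unwrap() {
            println!("got {item}");
        }
    }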
2 changes: 0 additions & 2 deletions crates/sparrow-sources/src/in_memory.rs
@@ -57,14 +57,12 @@ impl Source for InMemory {
})
.and_then(|batch| async move {
Batch::try_new_from_batch(batch)
-.into_report()
.change_context(sparrow_interfaces::Error::internal_msg("invalid input"))
})
.boxed()
} else if let Some(batch) = self.data.current() {
futures::stream::once(async move {
Batch::try_new_from_batch(batch)
-.into_report()
.change_context(sparrow_interfaces::Error::internal_msg("invalid input"))
})
.boxed()
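Note: dropping `.into_report()` suggests `Batch::try_new_from_batch` now returns an error_stack result, so `change_context` applies directly. A hedged sketch of the distinction, with placeholder error types (the exact error_stack version and error types in the repository may differ):

    use error_stack::{IntoReport, ResultExt};
    use std::fmt;

    // Placeholder context, standing in for sparrow_interfaces::Error.
    #[derive(Debug)]
    struct InvalidInput;
    impl fmt::Display for InvalidInput {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.write_str("invalid input")
        }
    }
    impl std::error::Error for InvalidInput {}

    // A plain std Result needs `.into_report()` before `.change_context(..)`.
    fn plain() -> Result<(), std::io::Error> {
        Ok(())
    }

    // An error_stack Result takes `.change_context(..)` directly, which is why
    // `.into_report()` could be dropped above.
    fn reported() -> error_stack::Result<(), std::io::Error> {
        Ok(())
    }

    fn demo() -> error_stack::Result<(), InvalidInput> {
        plain().into_report().change_context(InvalidInput)?;
        reported().change_context(InvalidInput)?;
        Ok(())
    }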
1 change: 1 addition & 0 deletions crates/sparrow-transforms/Cargo.toml
@@ -18,6 +18,7 @@ error-stack.workspace = true
itertools.workspace = true
parking_lot.workspace = true
sparrow-arrow = { path = "../sparrow-arrow" }
+sparrow-batch = { path = "../sparrow-batch" }
sparrow-expressions = { path = "../sparrow-expressions" }
sparrow-physical = { path = "../sparrow-physical" }
sparrow-scheduler = { path = "../sparrow-scheduler" }
5 changes: 3 additions & 2 deletions crates/sparrow-transforms/src/project.rs
@@ -1,6 +1,6 @@
use arrow_schema::DataType;
use error_stack::ResultExt;
-use sparrow_arrow::Batch;
+use sparrow_batch::Batch;

use sparrow_expressions::ExpressionExecutor;
use sparrow_physical::Exprs;
@@ -50,7 +50,8 @@ mod tests {
use arrow_array::{Int64Array, StructArray, TimestampNanosecondArray, UInt64Array};
use arrow_schema::{Field, Fields};
use index_vec::index_vec;
-use sparrow_arrow::{scalar_value::ScalarValue, RowTime};
+use sparrow_arrow::scalar_value::ScalarValue;
+use sparrow_batch::RowTime;

fn test_batch() -> Batch {
let input_fields = Fields::from(vec![
5 changes: 3 additions & 2 deletions crates/sparrow-transforms/src/select.rs
@@ -1,7 +1,7 @@
use arrow_array::{cast::AsArray, BooleanArray};
use arrow_schema::DataType;
use error_stack::{IntoReport, ResultExt};
-use sparrow_arrow::Batch;
+use sparrow_batch::Batch;

use sparrow_expressions::ExpressionExecutor;
use sparrow_physical::Exprs;
@@ -99,7 +99,8 @@ mod tests {
use arrow_array::{Int64Array, StructArray, TimestampNanosecondArray, UInt64Array};
use arrow_schema::{Field, Fields};
use index_vec::index_vec;
-use sparrow_arrow::{scalar_value::ScalarValue, RowTime};
+use sparrow_arrow::scalar_value::ScalarValue;
+use sparrow_batch::RowTime;

fn test_batch() -> Batch {
let input_fields = Fields::from(vec![
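Note: the `BooleanArray` import suggests Select applies a boolean predicate to each batch. The implementation itself is not shown in this diff; the sketch below only illustrates the kind of filter-by-predicate operation involved, using the arrow crate's filter kernel:

    use std::sync::Arc;

    use arrow_array::{ArrayRef, BooleanArray, Int64Array, RecordBatch};
    use arrow_schema::{ArrowError, DataType, Field, Schema};

    fn example() -> Result<(), ArrowError> {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
        let columns = vec![Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef];
        let batch = RecordBatch::try_new(schema, columns)?;

        // Keep only the rows where the predicate is true.
        let predicate = BooleanArray::from(vec![true, false, true]);
        let filtered = arrow::compute::filter_record_batch(&batch, &predicate)?;
        assert_eq!(filtered.num_rows(), 2);
        Ok(())
    }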
2 changes: 1 addition & 1 deletion crates/sparrow-transforms/src/transform.rs
@@ -1,5 +1,5 @@
use arrow_schema::DataType;
-use sparrow_arrow::Batch;
+use sparrow_batch::Batch;

#[derive(derive_more::Display, Debug)]
pub enum Error {
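Note: the body of `Error` is not shown in this diff. The sketch below is purely illustrative of the `derive_more::Display` plus error_stack pattern seen here, with made-up variants:

    // Illustrative only: the real variants of `Error` are not shown in this diff.
    // Uses derive_more 0.99's `#[display(fmt = ..)]` attribute syntax.
    #[derive(derive_more::Display, Debug)]
    pub enum ExampleError {
        #[display(fmt = "invalid transform: {}", _0)]
        InvalidTransform(String),
        #[display(fmt = "internal error")]
        Internal,
    }

    // error_stack contexts typically get this marker impl so they can be used
    // with `Report` and `change_context`.
    impl error_stack::Context for ExampleError {}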
2 changes: 1 addition & 1 deletion crates/sparrow-transforms/src/transform_pipeline.rs
@@ -4,7 +4,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use error_stack::ResultExt;
use itertools::Itertools;
use parking_lot::Mutex;
-use sparrow_arrow::Batch;
+use sparrow_batch::Batch;
use sparrow_physical::{StepId, StepKind};
use sparrow_scheduler::{
Partition, Partitioned, Pipeline, PipelineError, PipelineInput, Scheduler, TaskRef,
