Skip to content

Commit

Permalink
use sparrow source error
Browse files: browse the repository at this point in the history
  • Loading branch information
jordanrfrazier committed Oct 11, 2023
1 parent ae14ae1 commit 6755a53
Show file tree
Hide file tree
Showing 7 changed files with 21 additions and 62 deletions.
7 changes: 4 additions & 3 deletions crates/sparrow-execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ mod tests {
use parking_lot::Mutex;
use sparrow_arrow::scalar_value::ScalarValue;
use sparrow_batch::Batch;
use sparrow_core::ReadConfig;
use sparrow_interfaces::ReadConfig;
use sparrow_interfaces::Source;
use sparrow_scheduler::{Pipeline, PipelineError, PipelineInput, WorkerPool};
use sparrow_transforms::TransformPipeline;
Expand Down Expand Up @@ -64,6 +64,7 @@ mod tests {
Field::new("b", DataType::Int64, true),
Field::new("c", DataType::Int64, true),
]);
let projected_datatype = DataType::Struct(input_fields.clone());
let schema = Arc::new(Schema::new(input_fields.clone()));
let read_config = ReadConfig {
keep_open: true,
Expand All @@ -72,7 +73,7 @@ mod tests {
};

let input_source = in_memory_source(schema.clone());
let input_rx = input_source.read(projected_datatype, read_config);
let input_rx = input_source.read(&projected_datatype, read_config);

let output_fields = Fields::from(vec![
Field::new("ab", DataType::Int64, true),
Expand Down Expand Up @@ -115,7 +116,7 @@ mod tests {
pub async fn execute(
query_id: String,
input_type: DataType,
mut input: BoxStream<'_, error_stack::Result<Batch, sparrow_interfaces::Error>>,
mut input: BoxStream<'_, error_stack::Result<Batch, sparrow_interfaces::SourceError>>,
output_type: DataType,
output: tokio::sync::mpsc::Sender<Batch>,
) -> error_stack::Result<(), Error> {
Expand Down
17 changes: 0 additions & 17 deletions crates/sparrow-interfaces/src/error.rs

This file was deleted.

4 changes: 2 additions & 2 deletions crates/sparrow-interfaces/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
clippy::print_stderr
)]

mod error;
mod source;
mod source_error;

pub use error::Error;
pub use source::*;
pub use source_error::SourceError;
4 changes: 2 additions & 2 deletions crates/sparrow-interfaces/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use arrow_schema::{DataType, SchemaRef};
use futures::stream::BoxStream;
use sparrow_batch::Batch;

use crate::Error;
use crate::SourceError;

/// Trait implemented by sources.
pub trait Source: Send + Sync {
Expand All @@ -19,7 +19,7 @@ pub trait Source: Send + Sync {
&self,
projected_datatype: &DataType,
read_config: ReadConfig,
) -> BoxStream<'_, error_stack::Result<Batch, Error>>;
) -> BoxStream<'_, error_stack::Result<Batch, SourceError>>;
}

/// Defines the configuration for a read from a source.
Expand Down
21 changes: 0 additions & 21 deletions crates/sparrow-sources/src/error.rs

This file was deleted.

28 changes: 13 additions & 15 deletions crates/sparrow-sources/src/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use sparrow_batch::Batch;
use sparrow_interfaces::{ReadConfig, Source};
use sparrow_merge::old::homogeneous_merge;

use crate::Error;
use sparrow_interfaces::SourceError;

/// A shared, synchronized container for in-memory batches.
pub struct InMemory {
Expand All @@ -23,7 +23,7 @@ pub struct InMemory {
}

impl InMemory {
pub fn new(queryable: bool, schema: SchemaRef) -> error_stack::Result<Self, Error> {
pub fn new(queryable: bool, schema: SchemaRef) -> error_stack::Result<Self, SourceError> {
let data = Arc::new(InMemoryBatches::new(queryable, schema.clone()));
let source = Self {
prepared_schema: schema,
Expand All @@ -33,7 +33,7 @@ impl InMemory {
}

/// Add a batch, publishing it to the subscribers.
pub async fn add_batch(&self, batch: RecordBatch) -> error_stack::Result<(), Error> {
pub async fn add_batch(&self, batch: RecordBatch) -> error_stack::Result<(), SourceError> {
self.data.add_batch(batch).await
}
}
Expand All @@ -47,7 +47,7 @@ impl Source for InMemory {
&self,
projected_datatype: &DataType,
read_config: ReadConfig,
) -> futures::stream::BoxStream<'_, error_stack::Result<Batch, sparrow_interfaces::Error>> {
) -> futures::stream::BoxStream<'_, error_stack::Result<Batch, SourceError>> {
assert_eq!(
&DataType::Struct(self.prepared_schema().fields().clone()),
projected_datatype,
Expand All @@ -57,18 +57,16 @@ impl Source for InMemory {
let input_stream = if read_config.keep_open {
self.data
.subscribe()
.map_err(|e| {
e.change_context(sparrow_interfaces::Error::internal_msg("invalid input"))
})
.map_err(|e| e.change_context(SourceError::internal_msg("invalid input")))
.and_then(move |batch| async move {
Batch::try_new_from_batch(batch)
.change_context(sparrow_interfaces::Error::internal_msg("invalid input"))
.change_context(SourceError::internal_msg("invalid input"))
})
.boxed()
} else if let Some(batch) = self.data.current() {
futures::stream::once(async move {
Batch::try_new_from_batch(batch)
.change_context(sparrow_interfaces::Error::internal_msg("invalid input"))
.change_context(SourceError::internal_msg("invalid input"))
})
.boxed()
} else {
Expand Down Expand Up @@ -113,7 +111,7 @@ impl Current {
}
}

fn add_batch(&mut self, batch: &RecordBatch) -> error_stack::Result<(), Error> {
fn add_batch(&mut self, batch: &RecordBatch) -> error_stack::Result<(), SourceError> {
if self.batch.num_rows() == 0 {
self.batch = batch.clone();
} else {
Expand All @@ -122,7 +120,7 @@ impl Current {
// put it in an option, or allow `homogeneous_merge` to take `&RecordBatch`.
self.batch = homogeneous_merge(&self.schema, vec![self.batch.clone(), batch.clone()])
.into_report()
.change_context(Error::Add)?;
.change_context(SourceError::Add)?;
}
Ok(())
}
Expand All @@ -147,13 +145,13 @@ impl InMemoryBatches {
/// Add a batch, merging it into the in-memory version.
///
/// Publishes the new batch to the subscribers.
pub async fn add_batch(&self, batch: RecordBatch) -> error_stack::Result<(), Error> {
pub async fn add_batch(&self, batch: RecordBatch) -> error_stack::Result<(), SourceError> {
if batch.num_rows() == 0 {
return Ok(());
}

let new_version = {
let mut write = self.current.write().map_err(|_| Error::Add)?;
let mut write = self.current.write().map_err(|_| SourceError::Add)?;
if self.queryable {
write.add_batch(&batch)?;
}
Expand All @@ -175,7 +173,7 @@ impl InMemoryBatches {
/// added as they arrive.
pub fn subscribe(
&self,
) -> impl Stream<Item = error_stack::Result<RecordBatch, Error>> + 'static {
) -> impl Stream<Item = error_stack::Result<RecordBatch, SourceError>> + 'static {
let (mut version, merged) = {
let read = self.current.read().unwrap();
(read.version, read.batch.clone())
Expand Down Expand Up @@ -204,7 +202,7 @@ impl InMemoryBatches {
break;
},
Err(async_broadcast::RecvError::Overflowed(_)) => {
Err(Error::ReceiverLagged)?;
Err(SourceError::ReceiverLagged)?;
}
}
}
Expand Down
2 changes: 0 additions & 2 deletions crates/sparrow-sources/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
clippy::print_stderr
)]

mod error;
mod in_memory;

pub use error::Error;
pub use in_memory::*;

0 comments on commit 6755a53

Please sign in to comment.