Skip to content

Commit

Permalink
ref: Move source to a public module in interfaces (#816)
Browse files Browse the repository at this point in the history
  • Loading branch information
bjchambers authored Oct 17, 2023
1 parent ae68437 commit b5709cf
Show file tree
Hide file tree
Showing 8 changed files with 33 additions and 34 deletions.
3 changes: 2 additions & 1 deletion crates/sparrow-execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit};
use hashbrown::HashMap;
use sparrow_interfaces::{ExecutionOptions, Source};
use sparrow_interfaces::source::Source;
use sparrow_interfaces::ExecutionOptions;
use sparrow_physical::StepId;
use sparrow_transforms::TransformPipeline;
use std::sync::Arc;
Expand Down
2 changes: 1 addition & 1 deletion crates/sparrow-execution/src/source_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use error_stack::ResultExt;
use futures::stream::{BoxStream, FuturesUnordered};
use futures::{StreamExt, TryStreamExt};
use sparrow_batch::Batch;
use sparrow_interfaces::SourceError;
use sparrow_interfaces::source::SourceError;
use sparrow_scheduler::{Injector, InputHandles};
use tracing::Instrument;
use uuid::Uuid;
Expand Down
2 changes: 1 addition & 1 deletion crates/sparrow-execution/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use arrow_array::cast::AsArray;
use arrow_array::{Int64Array, RecordBatch, TimestampNanosecondArray, UInt64Array};
use sparrow_interfaces::{Source, SourceExt};
use sparrow_interfaces::source::{Source, SourceExt};
use sparrow_logical::ExprRef;
use sparrow_session::partitioned::Session;
use sparrow_sources::InMemory;
Expand Down
5 changes: 1 addition & 4 deletions crates/sparrow-interfaces/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@
)]

mod execution_options;
mod source;
mod source_error;
pub mod source;

pub use execution_options::*;
pub use source::*;
pub use source_error::SourceError;
25 changes: 24 additions & 1 deletion crates/sparrow-interfaces/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use arrow_schema::{DataType, SchemaRef};
use futures::stream::BoxStream;
use sparrow_batch::Batch;

use crate::{ExecutionOptions, SourceError};
use crate::ExecutionOptions;

/// Trait implemented by sources.
pub trait Source: Send + Sync {
Expand All @@ -30,6 +30,29 @@ pub trait Source: Send + Sync {
fn as_any(&self) -> &dyn std::any::Any;
}

#[non_exhaustive]
#[derive(derive_more::Display, Debug)]
pub enum SourceError {
#[display(fmt = "internal error: {}", _0)]
Internal(&'static str),
#[display(fmt = "failed to add in-memory batch")]
Add,
#[display(fmt = "receiver lagged")]
ReceiverLagged,
}

impl error_stack::Context for SourceError {}

impl SourceError {
pub fn internal() -> Self {
SourceError::Internal("no additional context")
}

pub fn internal_msg(msg: &'static str) -> Self {
SourceError::Internal(msg)
}
}

pub trait SourceExt {
fn downcast_source_opt<T: Source + 'static>(&self) -> Option<&T>;
fn downcast_source<T: Source + 'static>(&self) -> &T {
Expand Down
22 changes: 0 additions & 22 deletions crates/sparrow-interfaces/src/source_error.rs

This file was deleted.

3 changes: 2 additions & 1 deletion crates/sparrow-session/src/partitioned/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use error_stack::ResultExt;
use futures::FutureExt;
use hashbrown::HashMap;
use sparrow_compiler::NearestMatches;
use sparrow_interfaces::{ExecutionOptions, Source};
use sparrow_interfaces::source::Source;
use sparrow_interfaces::ExecutionOptions;
use sparrow_logical::{ExprRef, Literal};
use uuid::Uuid;

Expand Down
5 changes: 2 additions & 3 deletions crates/sparrow-sources/src/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ use error_stack::{IntoReportCompat, ResultExt};
use futures::{Stream, StreamExt, TryStreamExt};

use sparrow_batch::Batch;
use sparrow_interfaces::{ExecutionOptions, Source};
use sparrow_interfaces::source::{Source, SourceError};
use sparrow_interfaces::ExecutionOptions;
use sparrow_merge::old::homogeneous_merge;

use sparrow_interfaces::SourceError;

/// A shared, synchronized container for in-memory batches.
pub struct InMemory {
/// The prepared schema.
Expand Down

0 comments on commit b5709cf

Please sign in to comment.