feat: Initial partitioned execution (#528)
This introduces the key components of partitioned execution.

- `sparrow-scheduler` provides functionality for managing the separate
pipelines within the query plan and morsel-driven parallelism. It
manages a thread pool of workers pinned to specific CPUs that pull tasks
from local queues (see the sketch below).
- `sparrow-transforms` will provide implementations of the "transforms"
(project, select, etc.) and a pipeline for executing the transforms.
- `sparrow-execution` will pull everything together to provide
partitioned execution.

This is part of #409.
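
As a rough illustration of the pinned-worker idea, here is a minimal sketch built on the `core_affinity` and `parking_lot` dependencies added in the Cargo.toml diff below. It is not the `sparrow-scheduler` implementation; `Injector`, `Task`, and `spawn_pinned_workers` are invented names for illustration only.

```rust
// Sketch only: CPU-pinned workers with per-worker local queues and a shared
// injector queue. Not the actual sparrow-scheduler implementation.
use std::collections::VecDeque;
use std::sync::Arc;
use std::thread::JoinHandle;

use parking_lot::Mutex;

type Task = Box<dyn FnOnce() + Send>;

/// Shared queue that any thread may push work onto.
#[derive(Default)]
struct Injector {
    queue: Mutex<VecDeque<Task>>,
}

fn spawn_pinned_workers(injector: Arc<Injector>) -> Vec<JoinHandle<()>> {
    let cores = core_affinity::get_core_ids().unwrap_or_default();
    cores
        .into_iter()
        .map(|core| {
            let injector = Arc::clone(&injector);
            std::thread::spawn(move || {
                // Pin this worker to one CPU so its tasks stay cache-local.
                core_affinity::set_for_current(core);
                // In a real scheduler the local queue holds work this worker
                // spawned itself, so it keeps running on the same core.
                let mut local: VecDeque<Task> = VecDeque::new();
                loop {
                    let next = local
                        .pop_front()
                        .or_else(|| injector.queue.lock().pop_front());
                    match next {
                        Some(task) => task(),
                        // A real worker would park or steal work here.
                        None => break,
                    }
                }
            })
        })
        .collect()
}
```

A real morsel-driven scheduler would also let idle workers steal from each other's queues rather than exit when the injector is empty.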
bjchambers authored Jul 25, 2023
1 parent fafdf58 commit 71bfc2c
Showing 28 changed files with 1,957 additions and 359 deletions.
807 changes: 456 additions & 351 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions Cargo.toml
@@ -38,6 +38,7 @@ chronoutil = "0.2.3"
clap = { version = "4.2.0", features = ["derive", "env"] }
codespan-reporting = "0.11.1"
const_format = "0.2.30"
core_affinity = "0.8.0"
cpu-time = "1.0.0"
criterion = { version = "0.4.0", default-features = false, features = [
"async_tokio",
@@ -75,6 +76,7 @@ once_cell = "1.17.1"
opentelemetry = { version = "0.18.0", features = ["rt-tokio"] }
opentelemetry-otlp = "0.11.0"
owning_ref = "0.4.1"
parking_lot = { version = "0.12.1" }
parquet = { version = "43.0.0", features = ["async"] }
parse-display = "0.8.0"
pin-project = "1.0.12"
@@ -143,6 +145,7 @@ features = ["lz4"]
[profile.release]
lto = "thin"
debug = 0 # Set this to 1 or 2 to get more useful backtraces from debugger
codegen-units = 1

# Enable max optimizations for dependencies, but not for our code
[profile.dev.package."*"]
@@ -152,3 +155,4 @@ opt-level = 3
[profile.dev]
opt-level = 1
debug = 2
codegen-units = 1
9 changes: 9 additions & 0 deletions crates/sparrow-arrow/src/batch.rs
@@ -73,6 +73,15 @@ impl Batch {
}
}

/// Create a new `Batch` containing the given batch data.
///
/// `time`, `subsort` and `key_hash` are references to the key columns.
/// They may, but need not, be references to columns within the batch.
///
/// `up_to_time` is a [RowTime] such that:
/// (a) all rows so far are less than or equal to `up_to_time`
/// (b) all rows in this batch are less than or equal to `up_to_time`
/// (c) all future rows are greater than `up_to_time`.
pub fn new_with_data(
batch: RecordBatch,
time: ArrayRef,
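
For reference, the new constructor is exercised by the `sparrow-execution` test added later in this commit. Condensed from that test (values taken from it), this builds a batch whose rows run through time 3 and whose `up_to_time` is therefore 3; `example_batch` is an illustrative name, not part of the crate.

```rust
use std::sync::Arc;

use arrow_array::{Int64Array, RecordBatch, TimestampNanosecondArray, UInt64Array};
use arrow_schema::{DataType, Field, Schema};
use sparrow_arrow::{Batch, RowTime};

fn example_batch() -> Batch {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
    let data =
        RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![0, 1, 2, 3]))]).unwrap();

    // Every row's time (0..=3) is <= `up_to_time` (3); any later batch may
    // only contain rows with times strictly greater than 3.
    Batch::new_with_data(
        data,
        Arc::new(TimestampNanosecondArray::from(vec![0, 1, 2, 3])),
        Arc::new(UInt64Array::from(vec![0, 1, 2, 3])),
        Arc::new(UInt64Array::from(vec![0, 1, 2, 3])),
        RowTime::from_timestamp_ns(3),
    )
}
```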
6 changes: 6 additions & 0 deletions crates/sparrow-backend/src/pipeline_schedule.rs
@@ -94,6 +94,7 @@ mod tests {
let steps = index_vec::index_vec![
// 0: scan table1
Step {
id: 0.into(),
kind: StepKind::Scan {
table_name: "table1".to_owned(),
},
@@ -102,6 +103,7 @@ },
},
// 1: scan table2
Step {
id: 1.into(),
kind: StepKind::Scan {
table_name: "table2".to_owned(),
},
@@ -110,12 +112,14 @@ },
},
// 2: merge 0 and 1
Step {
id: 2.into(),
kind: StepKind::Merge,
inputs: vec![0.into(), 1.into()],
schema: schema.clone(),
},
// 3: project 0 -> separate pipeline since 0 has 2 consumers
Step {
id: 3.into(),
kind: StepKind::Project {
exprs: Exprs::empty(),
},
@@ -124,6 +128,7 @@ },
},
// 4: project 2 -> same pipeline since only consumer
Step {
id: 4.into(),
kind: StepKind::Project {
exprs: Exprs::empty(),
},
@@ -132,6 +137,7 @@ },
},
// 5: merge 3 and 4 -> new pipeline since merge
Step {
id: 5.into(),
kind: StepKind::Merge,
inputs: vec![3.into(), 4.into()],
schema,
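
The comments in this test spell out when a step starts a new pipeline: when it is a merge, or when the step it consumes has more than one consumer. A tiny, hypothetical sketch of that rule (illustrative only; `starts_new_pipeline` is not part of the `sparrow-backend` API):

```rust
/// Illustrative only: a step begins a new pipeline if it merges inputs, or if
/// the step feeding it has more than one consumer (its output must be shared).
fn starts_new_pipeline(is_merge: bool, input_consumer_count: usize) -> bool {
    is_merge || input_consumer_count > 1
}

fn main() {
    assert!(starts_new_pipeline(false, 2)); // step 3: its input (step 0) has 2 consumers
    assert!(!starts_new_pipeline(false, 1)); // step 4: sole consumer of step 2
    assert!(starts_new_pipeline(true, 1)); // step 5: merges steps 3 and 4
}
```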
29 changes: 29 additions & 0 deletions crates/sparrow-execution/Cargo.toml
@@ -0,0 +1,29 @@
[package]
name = "sparrow-execution"
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
publish = false
description = """
Implementations of the pipelines to be executed.
"""

[dependencies]
derive_more.workspace = true
error-stack.workspace = true
parking_lot.workspace = true
sparrow-arrow = { path = "../sparrow-arrow" }
sparrow-physical = { path = "../sparrow-physical" }
sparrow-transforms = { path = "../sparrow-transforms" }
sparrow-scheduler = { path = "../sparrow-scheduler" }
tokio.workspace = true

[dev-dependencies]
arrow-array.workspace = true
arrow-schema.workspace = true
index_vec.workspace = true
sparrow-testing = { path = "../sparrow-testing" }

[lib]
doctest = false
255 changes: 255 additions & 0 deletions crates/sparrow-execution/src/lib.rs
@@ -0,0 +1,255 @@
#![warn(
rust_2018_idioms,
nonstandard_style,
future_incompatible,
clippy::mod_module_files,
clippy::print_stdout,
clippy::print_stderr,
clippy::undocumented_unsafe_blocks
)]

//! Implementations of the pipelines to be executed.

#[cfg(test)]
mod tests {

use std::sync::Arc;

use arrow_array::cast::AsArray;
use arrow_array::{Int64Array, RecordBatch, TimestampNanosecondArray, UInt64Array};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use error_stack::{IntoReport, ResultExt};
use index_vec::index_vec;
use parking_lot::Mutex;
use sparrow_arrow::scalar_value::ScalarValue;
use sparrow_arrow::{Batch, RowTime};
use sparrow_scheduler::{Pipeline, PipelineError, PipelineInput, WorkerPool};
use sparrow_transforms::TransformPipeline;

#[derive(derive_more::Display, Debug)]
pub enum Error {
#[display(fmt = "error creating executor")]
Creating,
#[display(fmt = "error executing")]
Executing,
}

impl error_stack::Context for Error {}

#[tokio::test]
async fn test_query() {
sparrow_testing::init_test_logging();

let (input_tx, input_rx) = tokio::sync::mpsc::channel(10);
let (output_tx, mut output_rx) = tokio::sync::mpsc::channel(10);

let input_schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int64, true),
Field::new("b", DataType::Int64, true),
Field::new("c", DataType::Int64, true),
]));

let output_schema = Arc::new(Schema::new(vec![
Field::new("ab", DataType::Int64, true),
Field::new("abc", DataType::Int64, true),
]));

let input_batch = RecordBatch::try_new(
input_schema.clone(),
vec![
Arc::new(Int64Array::from(vec![0, 1, 2, 3])),
Arc::new(Int64Array::from(vec![4, 7, 10, 11])),
Arc::new(Int64Array::from(vec![Some(21), None, Some(387), Some(87)])),
],
)
.unwrap();
let time = Arc::new(TimestampNanosecondArray::from(vec![0, 1, 2, 3]));
let subsort = Arc::new(UInt64Array::from(vec![0, 1, 2, 3]));
let key_hash = Arc::new(UInt64Array::from(vec![0, 1, 2, 3]));
let input_batch = Batch::new_with_data(
input_batch,
time,
subsort,
key_hash,
RowTime::from_timestamp_ns(3),
);
input_tx.send(input_batch).await.unwrap();
std::mem::drop(input_tx);

execute(
"hello".to_owned(),
input_schema,
input_rx,
output_schema,
output_tx,
)
.await
.unwrap();

let output = output_rx.recv().await.unwrap();
let output = output.into_record_batch().unwrap();
let ab = output.column_by_name("ab").unwrap();
let abc = output.column_by_name("abc").unwrap();
assert_eq!(ab.as_primitive(), &Int64Array::from(vec![4, 8, 12, 14]));
assert_eq!(
abc.as_primitive(),
&Int64Array::from(vec![Some(25), None, Some(399), Some(101)])
);
}

/// Execute a physical plan.
pub async fn execute(
query_id: String,
input_schema: SchemaRef,
mut input: tokio::sync::mpsc::Receiver<Batch>,
output_schema: SchemaRef,
output: tokio::sync::mpsc::Sender<Batch>,
) -> error_stack::Result<(), Error> {
let mut worker_pool = WorkerPool::start(query_id).change_context(Error::Creating)?;

// This sets up some fake stuff:
// - We don't have sources / sinks yet, so we use tokio channels.
// - We create a "hypothetical" scan step (0)
// - We create a hard-coded "project" step (1)
// - We output the results to the channel.

let scan = sparrow_physical::Step {
id: 0.into(),
kind: sparrow_physical::StepKind::Scan {
table_name: "table".to_owned(),
},
inputs: vec![],
schema: input_schema,
};

let project = sparrow_physical::Step {
id: 1.into(),
kind: sparrow_physical::StepKind::Project {
exprs: sparrow_physical::Exprs {
exprs: index_vec![
sparrow_physical::Expr {
name: "column".into(),
literal_args: vec![ScalarValue::Utf8(Some("a".to_owned()))],
args: vec![],
result_type: DataType::Int64
},
sparrow_physical::Expr {
name: "column".into(),
literal_args: vec![ScalarValue::Utf8(Some("b".to_owned()))],
args: vec![],
result_type: DataType::Int64
},
sparrow_physical::Expr {
name: "add".into(),
literal_args: vec![],
args: vec![0.into(), 1.into()],
result_type: DataType::Int64
},
sparrow_physical::Expr {
name: "column".into(),
literal_args: vec![ScalarValue::Utf8(Some("c".to_owned()))],
args: vec![],
result_type: DataType::Int64
},
sparrow_physical::Expr {
name: "add".into(),
literal_args: vec![],
args: vec![2.into(), 3.into()],
result_type: DataType::Int64
},
],
outputs: vec![2.into(), 4.into()],
},
},
inputs: vec![0.into()],
schema: output_schema,
};

let sink_pipeline = worker_pool.add_pipeline(1, WriteChannelPipeline::new(output));
let transform_pipeline = worker_pool.add_pipeline(
1,
TransformPipeline::try_new(
&scan,
vec![project].iter(),
PipelineInput::new(sink_pipeline, 0),
)
.change_context(Error::Creating)?,
);
let transform_pipeline_input = PipelineInput::new(transform_pipeline, 0);

let mut injector = worker_pool.injector().clone();
while let Some(batch) = input.recv().await {
transform_pipeline_input
.add_input(0.into(), batch, &mut injector)
.change_context(Error::Executing)?;
}
transform_pipeline_input
.close_input(0.into(), &mut injector)
.change_context(Error::Executing)?;
worker_pool.stop().change_context(Error::Executing)?;

Ok(())
}

#[derive(Debug)]
struct WriteChannelPipeline(Mutex<Option<tokio::sync::mpsc::Sender<Batch>>>);

impl WriteChannelPipeline {
fn new(channel: tokio::sync::mpsc::Sender<Batch>) -> Self {
Self(Mutex::new(Some(channel)))
}
}

impl Pipeline for WriteChannelPipeline {
fn initialize(
&mut self,
_tasks: sparrow_scheduler::Partitioned<sparrow_scheduler::TaskRef>,
) {
}

fn add_input(
&self,
input_partition: sparrow_scheduler::Partition,
input: usize,
batch: Batch,
_scheduler: &mut dyn sparrow_scheduler::Scheduler,
) -> error_stack::Result<(), PipelineError> {
let channel = self.0.lock();
channel
.as_ref()
.ok_or(PipelineError::InputClosed {
input,
input_partition,
})?
.blocking_send(batch)
.into_report()
.change_context(PipelineError::Execution)
}

fn close_input(
&self,
input_partition: sparrow_scheduler::Partition,
input: usize,
_scheduler: &mut dyn sparrow_scheduler::Scheduler,
) -> error_stack::Result<(), PipelineError> {
let mut channel = self.0.lock();
error_stack::ensure!(
channel.is_some(),
PipelineError::InputClosed {
input,
input_partition,
},
);
*channel = None;
Ok(())
}

fn do_work(
&self,
_partition: sparrow_scheduler::Partition,
_scheduler: &mut dyn sparrow_scheduler::Scheduler,
) -> error_stack::Result<(), PipelineError> {
Ok(())
}
}
}
5 changes: 5 additions & 0 deletions crates/sparrow-expressions/src/error.rs
@@ -10,6 +10,11 @@ pub enum Error {
expected: DataType,
actual: DataType,
},
#[display(fmt = "invalid result type: expected {expected:?} but was {actual:?}")]
InvalidResultType {
expected: DataType,
actual: DataType,
},
#[display(fmt = "invalid argument type: expected struct but was {actual:?}")]
InvalidNonStructArgumentType { actual: DataType },
#[display(fmt = "invalid result type: expected struct but was {actual:?}")]