Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref: split up the expression crate #819

Merged
merged 4 commits into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
{
"rust-analyzer.cargo.features": "all",
"editor.formatOnSave": true,
"rust-analyzer.linkedProjects": [
bjchambers marked this conversation as resolved.
Show resolved Hide resolved
"./crates/sparrow-interfaces/Cargo.toml"
],
}
41 changes: 41 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/sparrow-backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ index_vec.workspace = true
itertools.workspace = true
smallvec.workspace = true
sparrow-arrow = { path = "../sparrow-arrow" }
sparrow-expressions = { path = "../sparrow-expressions" }
sparrow-interfaces = { path = "../sparrow-interfaces" }
sparrow-logical = { path = "../sparrow-logical" }
sparrow-physical = { path = "../sparrow-physical" }
uuid.workspace = true
Expand All @@ -29,6 +29,7 @@ tracing.workspace = true

[dev-dependencies]
insta.workspace = true
sparrow-expressions = { path = "../sparrow-expressions" }

[lib]
doctest = false
2 changes: 1 addition & 1 deletion crates/sparrow-backend/src/exprs/expr_lang.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl egg::FromOp for ExprLang {
type Error = error_stack::Report<Error>;

fn from_op(op: &str, children: Vec<Id>) -> Result<Self, Self::Error> {
let name = sparrow_expressions::intern_name(op)
let name = sparrow_interfaces::expression::intern_name(op)
.ok_or_else(|| Error::NoSuchInstruction(op.to_owned()))?;

let args = SmallVec::from_vec(children);
Expand Down
4 changes: 3 additions & 1 deletion crates/sparrow-backend/src/logical_to_physical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ impl LogicalToPhysical {
Ok(input)
}
other => {
let Some(instruction) = sparrow_expressions::intern_name(other) else {
let Some(instruction) = sparrow_interfaces::expression::intern_name(other) else {
error_stack::bail!(Error::invalid_logical_plan(format!(
"unknown instruction '{other}' in logical plan"
)));
Expand Down Expand Up @@ -402,6 +402,8 @@ mod tests {

#[test]
fn test_logical_to_physical_arithmetic() {
sparrow_expressions::ensure_registered();

let struct_type = DataType::Struct(
vec![
Field::new("x", DataType::Int64, false),
Expand Down
42 changes: 42 additions & 0 deletions crates/sparrow-expr-execution/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
[package]
name = "sparrow-expr-execution"
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
publish = false
description = """
Execution of expressions over record batches.
"""

[dependencies]
arrow-arith.workspace = true
arrow-array.workspace = true
bjchambers marked this conversation as resolved.
Show resolved Hide resolved
arrow-buffer.workspace = true
arrow-cast.workspace = true
arrow-data.workspace = true
arrow-ord.workspace = true
arrow-schema.workspace = true
arrow-select.workspace = true
arrow-string.workspace = true
derive_more.workspace = true
error-stack.workspace = true
hashbrown.workspace = true
index_vec.workspace = true
inventory.workspace = true
itertools.workspace = true
num.workspace = true
serde_json.workspace = true
sparrow-arrow = { path = "../sparrow-arrow" }
sparrow-batch = { path = "../sparrow-batch" }
sparrow-interfaces = { path = "../sparrow-interfaces" }
sparrow-physical = { path = "../sparrow-physical" }
static_init.workspace = true
substring.workspace = true

[dev-dependencies]
approx.workspace = true
sparrow-expressions = { path = "../sparrow-expressions" }

[lib]
doctest = false
103 changes: 103 additions & 0 deletions crates/sparrow-expr-execution/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
#![warn(
rust_2018_idioms,
nonstandard_style,
future_incompatible,
clippy::mod_module_files,
clippy::print_stdout,
clippy::print_stderr,
clippy::undocumented_unsafe_blocks
)]

//! Execution of expressions.
//!
//! Creating an [ExpressionExecutor] takes a sequence of
//! [expressions][sparrow_physical::Expr] and applies them to an input batch to
//! produce a sequence of computed columns. Expressions may reference columns
//! from the input to the expressions (typically the input to the step executing
//! the expressions), create a column from a literal value, or apply an
//! expression to columns computed by earlier expressions.

use arrow_array::ArrayRef;
use arrow_schema::DataType;
use index_vec::IndexVec;
use sparrow_batch::Batch;

use sparrow_interfaces::expression::{Error, Evaluator, StaticArg, StaticInfo, WorkArea};

/// Executes the expressions within an operation.
///
/// Each operation produces a stream of inputs (`BoxedInputBatch`).
/// Expressions create columns from the input and by evaluating instructions
/// against existing columns.
pub struct ExpressionExecutor {
evaluators: Vec<Box<dyn Evaluator>>,
output_type: DataType,
}

impl ExpressionExecutor {
/// Create an `ExpressionExecutor` for the given expressions.
pub fn try_new(exprs: &[sparrow_physical::Expr]) -> error_stack::Result<Self, Error> {
let evaluators = create_evaluators(exprs)?;
let output_type = exprs
.last()
.expect("at least one expression")
.result_type
.clone();
Ok(Self {
evaluators,
output_type,
})
}

/// Execute the expressions on the given input batch.
///
/// The result is a vector containing the results of each expression.
pub fn execute(&self, input: &Batch) -> error_stack::Result<ArrayRef, Error> {
let mut work_area = WorkArea::with_capacity(input, self.evaluators.len());
for evaluator in self.evaluators.iter() {
let output = evaluator.evaluate(&work_area)?;
work_area.expressions.push(output);
}
Ok(work_area.expressions.pop().unwrap())
}

pub fn output_type(&self) -> &DataType {
&self.output_type
}
}

/// Create the evaluators for the given expressions.
fn create_evaluators(
exprs: &[sparrow_physical::Expr],
) -> error_stack::Result<Vec<Box<dyn Evaluator>>, Error> {
// Static information (index in expressions, type, etc.) for each expression in `exprs`.
// This is used to locate the information about arguments to the remaining expressions.
//
// It is only needed while instantiating the evaluators.
let mut expressions = IndexVec::with_capacity(exprs.len());
let mut evaluators = Vec::with_capacity(exprs.len());
for (index, expr) in exprs.iter().enumerate() {
let args = expr.args.iter().map(|index| &expressions[*index]).collect();
let info = StaticInfo {
name: &expr.name,
literal_args: &expr.literal_args,
args,
result_type: &expr.result_type,
};

evaluators.push(sparrow_interfaces::expression::create_evaluator(info)?);
expressions.push(StaticArg {
index,
data_type: &expr.result_type,
});
}
Ok(evaluators)
}

#[cfg(test)]
mod tests {
#[test]
fn test_expressions_registered() {
sparrow_expressions::ensure_registered();
}
}
3 changes: 2 additions & 1 deletion crates/sparrow-expressions/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ edition.workspace = true
license.workspace = true
publish = false
description = """
Execution of expressions over record batches.
Implementation of stateless expressions over batches.
bjchambers marked this conversation as resolved.
Show resolved Hide resolved
"""

[dependencies]
Expand All @@ -29,6 +29,7 @@ num.workspace = true
serde_json.workspace = true
sparrow-arrow = { path = "../sparrow-arrow" }
sparrow-batch = { path = "../sparrow-batch" }
sparrow-interfaces = { path = "../sparrow-interfaces" }
sparrow-physical = { path = "../sparrow-physical" }
static_init.workspace = true
substring.workspace = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,10 @@ use error_stack::{IntoReport, IntoReportCompat, ResultExt};
use sparrow_arrow::downcast::downcast_primitive_array;
use sparrow_arrow::utils::make_null_array;

use crate::evaluator::Evaluator;
use crate::evaluators::time::i64_to_two_i32;
use crate::evaluators::{EvaluatorFactory, StaticInfo};
use crate::values::ArrayRefValue;
use crate::work_area::WorkArea;
use crate::Error;
use crate::time::i64_to_two_i32;
use sparrow_interfaces::expression::{
ArrayRefValue, Error, Evaluator, EvaluatorFactory, StaticInfo, WorkArea,
};

inventory::submit!(EvaluatorFactory {
name: "cast",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
use arrow_array::{ArrayRef, BooleanArray};
use error_stack::{IntoReport, ResultExt};

use crate::evaluator::Evaluator;
use crate::evaluators::StaticInfo;
use crate::values::ArrayRefValue;
use crate::Error;
use sparrow_interfaces::expression::{ArrayRefValue, Error, Evaluator, StaticInfo};

inventory::submit!(crate::evaluators::EvaluatorFactory {
inventory::submit!(sparrow_interfaces::expression::EvaluatorFactory {
name: "coalesce",
create: &create
});
Expand All @@ -19,7 +16,7 @@ struct CoalesceEvaluator {
impl Evaluator for CoalesceEvaluator {
fn evaluate(
&self,
work_area: &crate::work_area::WorkArea<'_>,
work_area: &sparrow_interfaces::expression::WorkArea<'_>,
) -> error_stack::Result<ArrayRef, Error> {
let mut inputs = self.inputs.iter().copied();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,11 @@ use std::sync::Arc;
use arrow_array::{ArrayRef, ArrowNumericType};
use error_stack::{IntoReport, ResultExt};

use crate::evaluator::Evaluator;
use crate::evaluators::StaticInfo;
use crate::values::PrimitiveValue;
use crate::work_area::WorkArea;
use crate::Error;
use sparrow_interfaces::expression::{Error, Evaluator, PrimitiveValue, StaticInfo, WorkArea};

inventory::submit!(crate::evaluators::EvaluatorFactory {
inventory::submit!(sparrow_interfaces::expression::EvaluatorFactory {
name: "gt_primitive",
create: &crate::evaluators::macros::create_primitive_evaluator!(0, create, ordered)
create: &crate::macros::create_primitive_evaluator!(0, create, ordered)
});

/// Evaluator for the `gt_primitive` (greater than) instruction.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,11 @@ use std::sync::Arc;
use arrow_array::{ArrayRef, ArrowNumericType};
use error_stack::{IntoReport, ResultExt};

use crate::evaluator::Evaluator;
use crate::evaluators::StaticInfo;
use crate::values::PrimitiveValue;
use crate::work_area::WorkArea;
use crate::Error;
use sparrow_interfaces::expression::{Error, Evaluator, PrimitiveValue, StaticInfo, WorkArea};

inventory::submit!(crate::evaluators::EvaluatorFactory {
inventory::submit!(sparrow_interfaces::expression::EvaluatorFactory {
name: "gte_primitive",
create: &crate::evaluators::macros::create_primitive_evaluator!(0, create, ordered)
create: &crate::macros::create_primitive_evaluator!(0, create, ordered)
});

/// Evaluator for the `gte_primitive` (greater than or equal) instruction.
Expand Down
Loading
Loading