diff --git a/Cargo.lock b/Cargo.lock index 98b1b48c2..84002b468 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4302,9 +4302,24 @@ name = "sparrow-backend" version = "0.11.0" dependencies = [ "arrow-schema", + "bitvec", + "derive_more", + "egg", + "enum-as-inner", + "error-stack", + "hashbrown 0.14.0", "index_vec", + "insta", + "itertools 0.11.0", + "rand 0.8.5", + "smallvec", + "sparrow-arrow", "sparrow-core", + "sparrow-expressions", + "sparrow-logical", "sparrow-physical", + "static_init", + "tracing", "uuid 1.4.1", ] diff --git a/crates/sparrow-backend/Cargo.toml b/crates/sparrow-backend/Cargo.toml index 937967dab..fc2dbb190 100644 --- a/crates/sparrow-backend/Cargo.toml +++ b/crates/sparrow-backend/Cargo.toml @@ -11,12 +11,27 @@ Compilation backend for Kaskada queries. [dependencies] arrow-schema.workspace = true +bitvec.workspace = true +derive_more.workspace = true +egg.workspace = true +enum-as-inner.workspace = true +error-stack.workspace = true +hashbrown.workspace = true index_vec.workspace = true +itertools.workspace = true +rand.workspace = true +smallvec.workspace = true +sparrow-arrow = { path = "../sparrow-arrow" } sparrow-core = { path = "../sparrow-core" } +sparrow-expressions = { path = "../sparrow-expressions" } +sparrow-logical = { path = "../sparrow-logical" } sparrow-physical = { path = "../sparrow-physical" } uuid.workspace = true +static_init.workspace = true +tracing.workspace = true [dev-dependencies] +insta.workspace = true [lib] doctest = false diff --git a/crates/sparrow-backend/src/compile.rs b/crates/sparrow-backend/src/compile.rs new file mode 100644 index 000000000..957133e2d --- /dev/null +++ b/crates/sparrow-backend/src/compile.rs @@ -0,0 +1,23 @@ +use std::borrow::Cow; + +use crate::logical_to_physical::LogicalToPhysical; +use crate::Error; + +/// Options for compiling logical plans to physical plans. +#[derive(Clone, Debug, Default)] +pub struct CompileOptions {} + +/// Compile a logical plan to a physical execution plan. 
+pub fn compile( + root: &sparrow_logical::ExprRef, + options: Option<&CompileOptions>, +) -> error_stack::Result { + let _options = if let Some(options) = options { + Cow::Borrowed(options) + } else { + Cow::Owned(CompileOptions::default()) + }; + + let physical = LogicalToPhysical::new().apply(root)?; + Ok(physical) +} diff --git a/crates/sparrow-backend/src/error.rs b/crates/sparrow-backend/src/error.rs new file mode 100644 index 000000000..3eb682980 --- /dev/null +++ b/crates/sparrow-backend/src/error.rs @@ -0,0 +1,23 @@ +use std::borrow::Cow; + +#[derive(derive_more::Display, Debug)] +pub enum Error { + #[display(fmt = "no instruction named '{_0}'")] + NoSuchInstruction(String), + #[display(fmt = "invalid logical plan: {_0}")] + InvalidLogicalPlan(Cow<'static, str>), + #[display(fmt = "internal error: {_0}")] + Internal(Cow<'static, str>), +} + +impl Error { + pub fn invalid_logical_plan(message: impl Into>) -> Self { + Self::InvalidLogicalPlan(message.into()) + } + + pub fn internal(message: impl Into>) -> Self { + Self::Internal(message.into()) + } +} + +impl error_stack::Context for Error {} diff --git a/crates/sparrow-backend/src/exprs.rs b/crates/sparrow-backend/src/exprs.rs new file mode 100644 index 000000000..12dbc3724 --- /dev/null +++ b/crates/sparrow-backend/src/exprs.rs @@ -0,0 +1,6 @@ +mod expr_lang; +mod expr_pattern; +mod expr_vec; + +pub(crate) use expr_pattern::*; +pub(crate) use expr_vec::*; diff --git a/crates/sparrow-backend/src/exprs/expr_lang.rs b/crates/sparrow-backend/src/exprs/expr_lang.rs new file mode 100644 index 000000000..b1cbfb10a --- /dev/null +++ b/crates/sparrow-backend/src/exprs/expr_lang.rs @@ -0,0 +1,76 @@ +use arrow_schema::DataType; +use egg::Id; +use smallvec::SmallVec; +use sparrow_arrow::scalar_value::ScalarValue; + +use crate::Error; + +#[derive(Hash, PartialOrd, Ord, PartialEq, Eq, Clone, Debug)] +pub(crate) struct ExprLang { + /// The name of the instruction being applied by this expression. 
+ /// + /// Similar to an opcode or function. + /// + /// Generally, interning owned strings to the specific static strings is preferred. + pub name: &'static str, + /// Literal arguments to the expression. + pub literal_args: SmallVec<[ScalarValue; 2]>, + /// Arguments to the expression. + pub args: SmallVec<[egg::Id; 2]>, + // TODO: This includes the `DataType` in the enodes. + // This is necessary for ensuring that cast instructions to different types are treated + // as distinct, however it is potentially risky for writing simplifications, since the + // patterns won't have specific types. We may need to make this optional, so only the + // cast instruction has to specify it, and then rely on analysis to infer the types. + pub result_type: DataType, +} + +// It is weird that we need to implement `Display` for `ExprLang` to pretty print +// only the kind. But, this is a requirement of `egg`. +impl std::fmt::Display for ExprLang { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.name.fmt(f) + } +} + +impl egg::Language for ExprLang { + fn children(&self) -> &[egg::Id] { + &self.args + } + + fn children_mut(&mut self) -> &mut [egg::Id] { + &mut self.args + } + + fn matches(&self, other: &Self) -> bool { + // Note: As per + // https://egraphs-good.github.io/egg/egg/trait.Language.html#tymethod.matches, + // "This should only consider the operator, not the children `Id`s". + // + // egg itself will check whether the arguments are *equivalent*. + // + // Some instructions (especially `cast`) depend on the `result_type` to + // determine the operation being performed. 
+ + self.name == other.name + && self.literal_args == other.literal_args + && self.result_type == other.result_type + } +} + +impl egg::FromOp for ExprLang { + type Error = error_stack::Report; + + fn from_op(op: &str, children: Vec) -> Result { + let name = sparrow_expressions::intern_name(op) + .ok_or_else(|| Error::NoSuchInstruction(op.to_owned()))?; + + let args = SmallVec::from_vec(children); + Ok(Self { + name, + literal_args: smallvec::smallvec![], + args, + result_type: DataType::Null, + }) + } +} diff --git a/crates/sparrow-backend/src/exprs/expr_pattern.rs b/crates/sparrow-backend/src/exprs/expr_pattern.rs new file mode 100644 index 000000000..ba6fd25b0 --- /dev/null +++ b/crates/sparrow-backend/src/exprs/expr_pattern.rs @@ -0,0 +1,190 @@ +use std::str::FromStr; + +use smallvec::{smallvec, SmallVec}; +use sparrow_arrow::scalar_value::ScalarValue; + +use crate::exprs::expr_lang::ExprLang; +use crate::exprs::ExprVec; +use crate::Error; + +/// A representation of an expression with "holes" that may be instantiated. +/// +/// For instance, while `(add (source["uuid"]) (literal 1))` is an expression that adds +/// the literal `1` to the identified `source`, `(add ?input (literal 1))` is a pattern +/// with a hole (placeholder) named `?input` that adds 1 to whatever we substitute in for +/// `?input`. +#[derive(Debug, Default)] +pub(crate) struct ExprPattern { + pub(super) expr: egg::PatternAst, +} + +#[static_init::dynamic] +pub(crate) static INPUT_VAR: egg::Var = egg::Var::from_str("?input").unwrap(); + +impl ExprPattern { + /// Create a new `ExprPattern` which is an identity expression referencing the input. + pub fn new_input() -> error_stack::Result { + let mut exprs = ExprPattern::default(); + exprs.add_var(*INPUT_VAR)?; + Ok(exprs) + } + + /// Create an expr pattern containing the given arguments. + /// + /// Returns the resulting pattern as well as the IDs of each of the arguments. 
+ pub fn new_instruction( + name: &'static str, + literal_args: smallvec::SmallVec<[ScalarValue; 2]>, + args: Vec, + data_type: arrow_schema::DataType, + ) -> error_stack::Result { + // NOTE: This adds the pattern for each argument to an `EGraph` + // to add an instruction. This may be overkill, but does simplify + // (a) de-duplicating expressions that appear in multiple arguments + // (b) managing things like "all of these arguments should have a + // single input". + // + // If the use of the EGraph and extractor proves to be too expensive + // we could do this "combine while de-duplicating" ourselves. + let mut graph = egg::EGraph::, ()>::default(); + let mut arg_ids = SmallVec::with_capacity(args.len()); + for arg in args { + let id = graph.add_expr(&arg.expr); + arg_ids.push(id); + } + + // We can only extract a single expression, so we create one. + // This is why we need to know the instruction to create, rather than + // just returning the resulting `egg::Id` for each argument. + let output = graph.add(egg::ENodeOrVar::ENode(ExprLang { + name, + literal_args, + args: arg_ids, + result_type: data_type, + })); + + let cost_function = egg::AstSize; + let extractor = egg::Extractor::new(&graph, cost_function); + let (_best_cost, expr) = extractor.find_best(output); + + Ok(ExprPattern { expr }) + } + + /// Instantiate the pattern. + /// + /// Replaces `?input` with the `input` instruction. + pub fn instantiate( + &self, + input_type: arrow_schema::DataType, + ) -> error_stack::Result { + // Note: Instead of instantiating the pattern ourselves (replacing `?input` with the + // input expression) we instead make an `EGraph`, add the input expression, and then + // instantiate the pattern into that. + // + // This lets us extract the *best* (shortest) expression, rather than copying all of + // the pattern. One nice thing about this is that the `EGraph` will de-duplicate + // equivalent operations, etc. 
+ let mut graph = egg::EGraph::::default(); + + let input_id = graph.add(ExprLang { + name: "input", + literal_args: smallvec![], + args: smallvec![], + result_type: input_type, + }); + let mut subst = egg::Subst::with_capacity(1); + subst.insert(*INPUT_VAR, input_id); + + let result = graph.add_instantiation(&self.expr, &subst); + + let cost_function = egg::AstSize; + let extractor = egg::Extractor::new(&graph, cost_function); + let (_best_cost, expr) = extractor.find_best(result); + + Ok(ExprVec { expr }) + } + + pub fn len(&self) -> usize { + self.expr.as_ref().len() + } + + /// Return true if this pattern just returns `?input`. + /// + /// This is used to identify expression patterns that "just pass the value through". + /// For instance, a projection step with the `identity` pattern is a noop and can + /// be removed. + pub fn is_identity(&self) -> bool { + // TODO: We may want to make this more intelligent and detect cases where + // the expression is *equivalent* to the identity. But for now, we think + // we can treat that as an optimization performed by a later pass. + let instructions = self.expr.as_ref(); + instructions.len() == 1 && instructions[0] == egg::ENodeOrVar::Var(*INPUT_VAR) + } + + /// Return the `egg::Id` corresponding to the last expression. + pub fn last_value(&self) -> egg::Id { + egg::Id::from(self.expr.as_ref().len() - 1) + } + + pub fn last(&self) -> &egg::ENodeOrVar { + self.expr.as_ref().last().expect("non empty") + } + + /// Add a physical expression. + /// + /// Args: + /// - name: The name of the operation to apply. + /// - literal_args: Literal arguments to the physical expression. + /// - args: The actual arguments to use. + /// + /// Returns the `egg::Id` referencing the expression. 
+ pub fn add_instruction( + &mut self, + name: &'static str, + literal_args: smallvec::SmallVec<[ScalarValue; 2]>, + args: smallvec::SmallVec<[egg::Id; 2]>, + data_type: arrow_schema::DataType, + ) -> error_stack::Result { + let expr = self.expr.add(egg::ENodeOrVar::ENode(ExprLang { + name, + literal_args, + args, + result_type: data_type, + })); + + Ok(expr) + } + + /// Add a variable to the expression. + pub fn add_var(&mut self, var: egg::Var) -> error_stack::Result { + Ok(self.expr.add(egg::ENodeOrVar::Var(var))) + } + + /// Add the given pattern to this pattern, applying the substitution. + pub fn add_pattern( + &mut self, + pattern: &ExprPattern, + subst: &egg::Subst, + ) -> error_stack::Result { + let mut new_ids = Vec::with_capacity(pattern.len()); + for expr in pattern.expr.as_ref() { + let new_id = match expr { + egg::ENodeOrVar::Var(var) => match subst.get(*var) { + Some(existing_id) => *existing_id, + None => self.add_var(*var)?, + }, + egg::ENodeOrVar::ENode(node) => { + let mut node = node.clone(); + node.args = node + .args + .into_iter() + .map(|arg| new_ids[usize::from(arg)]) + .collect(); + self.expr.add(egg::ENodeOrVar::ENode(node)) + } + }; + new_ids.push(new_id); + } + Ok(*new_ids.last().unwrap()) + } +} diff --git a/crates/sparrow-backend/src/exprs/expr_vec.rs b/crates/sparrow-backend/src/exprs/expr_vec.rs new file mode 100644 index 000000000..292d0f035 --- /dev/null +++ b/crates/sparrow-backend/src/exprs/expr_vec.rs @@ -0,0 +1,48 @@ +use index_vec::IndexVec; +use sparrow_physical::ExprId; + +use crate::exprs::expr_lang::ExprLang; +use crate::Error; + +/// A linear representation of the expressions. +/// +/// Unlike `ExprGraph` (which is built on `egg::EGraph`) this does not +/// deduplicate the expressions internally. However, when added to an `ExprGraph` +/// or `EGraph` deduplication will be performed. 
+#[derive(Debug, PartialEq, Eq, Hash, Default)] +pub(crate) struct ExprVec { + pub(super) expr: egg::RecExpr, +} + +impl ExprVec { + pub fn empty() -> Self { + Self::default() + } + + pub fn len(&self) -> usize { + self.expr.as_ref().len() + } + + pub fn to_physical_exprs(&self) -> error_stack::Result { + let mut exprs = IndexVec::with_capacity(self.len()); + for expr in self.expr.as_ref() { + // let expr = self.graph[expr].clone(); + let args = expr + .args + .iter() + .map(|arg| { + let id: usize = (*arg).into(); + ExprId::from(id) + }) + .collect(); + + exprs.push(sparrow_physical::Expr { + name: std::borrow::Cow::Borrowed(expr.name), + literal_args: expr.literal_args.iter().cloned().collect(), + args, + result_type: expr.result_type.clone(), + }); + } + Ok(exprs) + } +} diff --git a/crates/sparrow-backend/src/lib.rs b/crates/sparrow-backend/src/lib.rs index c55a71004..9140112c6 100644 --- a/crates/sparrow-backend/src/lib.rs +++ b/crates/sparrow-backend/src/lib.rs @@ -14,6 +14,14 @@ //! It also performs optimizations on both the logical plans and the physical //! plans. +mod compile; +mod error; +mod exprs; +mod logical_to_physical; +// mod mutable_plan; +mod mutable_plan; mod pipeline_schedule; +pub use compile::*; +pub use error::*; pub use pipeline_schedule::*; diff --git a/crates/sparrow-backend/src/logical_to_physical.rs b/crates/sparrow-backend/src/logical_to_physical.rs new file mode 100644 index 000000000..e96647de4 --- /dev/null +++ b/crates/sparrow-backend/src/logical_to_physical.rs @@ -0,0 +1,355 @@ +//! Converts logical expressions to physical plans. +//! +//! The goal is to strike a balance between simplicity of the conversion +//! and simplicity of the initial physical plan. In general, this attempts +//! to minimize the number of physical steps produced at the potential cost +//! of duplicating the physical expressions executed within those steps. +//! +//! To do this, it uses a few strategies: +//! +//! 1. 
A hash-map from step-kind and inputs to step ID is used to ensure +//! redundant steps aren't created. Note this is a best effort heuristic: +//! `(merge a b)` and `(merge b a)` are seen as different steps for the +//! purposes of this "hashcons" map. +//! 2. Expressions within a step are first built up as a sequence of +//! expression parts (`PhysicalExpr`) before being added to a step. +//! This allows different steps to put the expressions in the right place. +//! For instance, a `select` step can evaluate the predicate expression +//! directly. +//! +//! # Improvements +//! +//! The expression heuristic could be improved in a variety of ways: +//! 1. We could count how many times each logical expression was referenced +//! in different steps, in order to determine whether it was worth +//! introducing a separate projection. +//! 2. We could estimate the cost of physical expressions, in order to +//! determine when it was worth introducing a separate projection. +//! For example, if the sequence of expressions exceeded a certain +//! threshold, or contained a UDF, we could introduce a projection. + +use arrow_schema::DataType; +use egg::ENodeOrVar; +use itertools::Itertools; +use smallvec::smallvec; +use sparrow_arrow::scalar_value::ScalarValue; + +use crate::exprs::{ExprPattern, ExprVec}; +use crate::mutable_plan::MutablePlan; +use crate::Error; + +/// Manages the conversion of logical expressions to physical plans. +/// +/// This doesn't produce optimized / final execution plans. Instead, it +/// produces a relatively naive physical plan that is then improved by +/// the remaining passes comprising the query compilation. +pub(super) struct LogicalToPhysical { + plan: MutablePlan, +} + +#[derive(Debug)] +struct Reference { + /// Reference to a specific step. + step_id: sparrow_physical::StepId, + /// Expression corresponding to the result of compilation. 
+ /// + /// Will be the "identity" pattern containing only `?input` if this + /// should just use the result of the step. + expr: ExprPattern, +} + +impl Reference { + /// Create a reference to the input to the given step. + fn step_input(step_id: sparrow_physical::StepId) -> error_stack::Result { + Ok(Self { + step_id, + expr: ExprPattern::new_input()?, + }) + } +} + +impl LogicalToPhysical { + pub(super) fn new() -> Self { + Self { + plan: MutablePlan::empty(), + } + } + + /// Resolve the arguments to specific expressions in a step. + /// + /// This will add a `merge` step, if needed, so that all arguments are available in the + /// same step. + fn resolve_args( + &mut self, + args: Vec, + ) -> error_stack::Result<(sparrow_physical::StepId, Vec), Error> { + match args + .iter() + .map(|reference| reference.step_id) + .unique() + .sorted() + .at_most_one() + { + Ok(None) => todo!("handle all-literal arguments"), + Ok(Some(step)) => { + // There is only one step, which means all of the arguments are already + // available in the same place. We can use all of those patterns within + // a single step to provide the arguments. + let exprs = args + .into_iter() + .map(|arg| { + // It shouldn't be possible to have an argument refer to the entire result of the + // projection step at this point (while we're building the physical plan). + debug_assert_eq!(arg.step_id, step); + arg.expr + }) + .collect(); + Ok((step, exprs)) + } + Err(must_merge) => { + // First, create a merge step. + // + // The physical plan we produce includes N-way merges. These are much easier to work + // with while producing the plan since we don't need to recursively generate binary + // merges and pull the input through each merge. Additionally, it allows a later pass + // to identify opportunities for sharing common "sub-merges". 
+ // + // Further, we can more efficiently implement a multi-way merge operation in terms + // of binary merges by choosing the order to perform the merges based on the number + // of rows. + let inputs: Vec<_> = must_merge.into_iter().collect(); + let merged_step = self.plan.get_or_create_step_id( + sparrow_physical::StepKind::Merge, + inputs.clone(), + ExprVec::empty(), + DataType::Null, + ); + let exprs = args + .into_iter() + .map(|arg| { + // Determine which input to the merge we want. + // Start with replacement containing `?input => (fieldref ?input "step_{step_id}")` + let mut exprs = ExprPattern::new_input()?; + + let data_type = self.reference_type(&arg)?.clone(); + exprs.add_instruction( + "fieldref", + // Note that `merge` should produce a record with a + // field for each merged step, identified by the + // step ID being merged. This may change depending + // on (a) optimizations that change steps and + // whether it is difficult to update these and (b) + // how we choose to implement merge. + // + // It may be more practical to use the "index of the + // step ID in the inputs" which would be more stable + // as we change step IDs. + smallvec![ScalarValue::Utf8(Some(format!("step_{}", arg.step_id)))], + smallvec![egg::Id::from(0)], + data_type, + )?; + + // Then add the actual expression, replacing the `?input` with the fieldref. + let mut subst = egg::Subst::with_capacity(1); + subst.insert(*crate::exprs::INPUT_VAR, egg::Id::from(1)); + exprs.add_pattern(&arg.expr, &subst)?; + Ok(exprs) + }) + .collect::>()?; + Ok((merged_step, exprs)) + } + } + } + + /// Recursive function visiting the nodes in a logical expression. + /// + /// The result is a `Reference` which indicates a specific step containing + /// the result as well as expressions to apply to that step's output in order + /// to produce the result. 
+ fn visit(&mut self, node: &sparrow_logical::ExprRef) -> error_stack::Result { + match node.name.as_ref() { + "read" => { + // A logical scan instruction should have a single literal argument + // containing the UUID of the table to scan. + let table_id = *node + .literal_args + .get(0) + .ok_or_else(|| Error::invalid_logical_plan("scan missing table ID"))? + .as_uuid() + .ok_or_else(|| Error::invalid_logical_plan("scan contains non-UUID"))?; + + let step_id = self.plan.get_or_create_step_id( + sparrow_physical::StepKind::Read { + source_id: table_id, + }, + vec![], + ExprVec::empty(), + node.result_type.clone(), + ); + Ok(Reference::step_input(step_id)?) + } + "fieldref" => { + // A field ref contains two arguments. The first is the input record and the second + // should be a literal corresponding to the field name. + // + // Note this doesn't use the literal arguments in logical plans. + let mut input = self.visit(&node.args[0])?; + let Some(field) = node.args[1].literal_str_opt() else { + error_stack::bail!(Error::invalid_logical_plan( + "fieldref field was not a literal" + )) + }; + let field = ScalarValue::Utf8(Some(field.to_owned())); + let base = input.expr.last_value(); + input.expr.add_instruction( + "fieldref", + smallvec![field], + smallvec![base], + node.result_type.clone(), + )?; + Ok(input) + } + other => { + let Some(instruction) = sparrow_expressions::intern_name(other) else { + error_stack::bail!(Error::invalid_logical_plan(format!( + "unknown instruction '{other}' in logical plan" + ))); + }; + + let args = node + .args + .iter() + .map(|arg| self.visit(arg)) + .collect::>()?; + let (step_id, args) = self.resolve_args(args)?; + + assert_eq!( + node.literal_args.len(), + 0, + "literal arguments in logical not yet supported" + ); + + let literal_args = smallvec![]; + let expr = ExprPattern::new_instruction( + instruction, + literal_args, + args, + node.result_type.clone(), + )?; + Ok(Reference { step_id, expr }) + } + } + } + + pub(super) fn 
apply( + mut self, + root: &sparrow_logical::ExprRef, + ) -> error_stack::Result { + let result = self.visit(root)?; + + // Make sure the resulting step is the last step. + assert!(result.step_id == self.plan.last_step_id()); + + debug_assert_eq!(self.reference_type(&result)?, root.result_type); + + let output_id = if result.expr.is_identity() { + // The result is the output of the step. + // Nothing to do. + + // We could rely on the fact that the "identity" transform can be pretty + // trivially instantiated within a projection, but we do this check here + // for the case of the expression being the result of a non-projection + // step, since it lets us avoid adding the projection step. + result.step_id + } else { + // Add a projection for the remaining expressions. + // + // We could attempt to add them to the step if it is already a projection, but we'd + // need to deal with mutating the expressions in the projection. For now, we think it + // is easier to treat steps as immutable once created, and then combine steps in a + // later optimization pass of the physical plan. 
+ let result_type = root.result_type.clone(); + self.plan.get_or_create_step_id( + sparrow_physical::StepKind::Project, + vec![result.step_id], + result.expr.instantiate(result_type)?, + root.result_type.clone(), + ) + }; + + let plan = self.plan.finish(output_id)?; + Ok(plan) + } + + fn reference_type(&self, reference: &Reference) -> error_stack::Result { + match reference.expr.last() { + ENodeOrVar::Var(var) if *var == *crate::exprs::INPUT_VAR => { + Ok(self.plan.step_result_type(reference.step_id).clone()) + } + ENodeOrVar::Var(other) => { + error_stack::bail!(Error::internal(format!( + "unrecognized variable {other:?} in argument to merge" + ))) + } + ENodeOrVar::ENode(expr) => Ok(expr.result_type.clone()), + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use super::*; + use arrow_schema::{DataType, Field}; + + #[test] + fn test_logical_to_physical_arithmetic() { + let struct_type = DataType::Struct( + vec![ + Field::new("x", DataType::Int64, false), + Field::new("y", DataType::Float64, false), + ] + .into(), + ); + + let uuid1 = uuid::uuid!("00000000-0000-0000-0000-000000000001"); + + let group = sparrow_logical::Grouping::new(0); + let source1 = Arc::new(sparrow_logical::Expr::new_uuid( + "read", + uuid1, + struct_type.clone(), + group, + )); + let x = Arc::new(sparrow_logical::Expr::new_literal_str("x")); + let y = Arc::new(sparrow_logical::Expr::new_literal_str("y")); + let x1 = Arc::new( + sparrow_logical::Expr::try_new("fieldref".into(), vec![source1.clone(), x]).unwrap(), + ); + let physical_x1 = LogicalToPhysical::new().apply(&x1).unwrap(); + insta::assert_yaml_snapshot!(physical_x1); + + let y1 = Arc::new( + sparrow_logical::Expr::try_new("fieldref".into(), vec![source1, y.clone()]).unwrap(), + ); + let add_x1_y1 = + Arc::new(sparrow_logical::Expr::try_new("add".into(), vec![x1.clone(), y1]).unwrap()); + let physical_add_x1_y1 = LogicalToPhysical::new().apply(&add_x1_y1).unwrap(); + insta::assert_yaml_snapshot!(physical_add_x1_y1); + + 
let uuid2 = uuid::uuid!("00000000-0000-0000-0000-000000000002"); + let source2 = Arc::new(sparrow_logical::Expr::new_uuid( + "read", + uuid2, + struct_type.clone(), + group, + )); + let y2 = + Arc::new(sparrow_logical::Expr::try_new("fieldref".into(), vec![source2, y]).unwrap()); + let add_x1_y2 = + Arc::new(sparrow_logical::Expr::try_new("add".into(), vec![x1, y2]).unwrap()); + let physical_add_x1_y2 = LogicalToPhysical::new().apply(&add_x1_y2).unwrap(); + insta::assert_yaml_snapshot!(physical_add_x1_y2); + } +} diff --git a/crates/sparrow-backend/src/mutable_plan.rs b/crates/sparrow-backend/src/mutable_plan.rs new file mode 100644 index 000000000..9977ff949 --- /dev/null +++ b/crates/sparrow-backend/src/mutable_plan.rs @@ -0,0 +1,129 @@ +use std::rc::Rc; + +use arrow_schema::DataType; +use bitvec::prelude::BitVec; +use hashbrown::hash_map::Entry; +use hashbrown::HashMap; +use index_vec::IndexVec; +use sparrow_physical::{StepId, StepKind}; + +use crate::exprs::ExprVec; +use crate::Error; + +/// Physical plan builder. +pub(crate) struct MutablePlan { + /// The mutable steps within the plan. + /// + /// Note that not all steps may end up being necessary in the final plan. + steps: IndexVec>, + + /// Map from "step key" to step IDs, to avoid creating steps with the + /// same kind and inputs. + /// + /// Note: This doesn't prevent all equivalent steps. For instance, + /// `(merge a (merge b c))` and `(merge b (merge a c))`. We could + /// use an `egraph` or more complicated logic to find the minimal + /// set of steps early. Instead, we plan to do so using a later + /// pass that attempts to eliminate unnecessary merges and steps. + step_hashcons: HashMap, StepId>, +} + +#[derive(Debug, Hash, Eq, PartialEq)] +pub(crate) struct Step { + /// The step kind. + pub kind: StepKind, + /// The IDs of steps used as input to this step builder. + pub inputs: Vec, + /// The expressions within this step. 
+ pub exprs: ExprVec, + pub result_type: DataType, +} + +impl MutablePlan { + /// Create a new empty plan. + pub fn empty() -> Self { + Self { + steps: IndexVec::new(), + step_hashcons: HashMap::new(), + } + } + + pub fn get_or_create_step_id( + &mut self, + kind: StepKind, + inputs: Vec, + exprs: ExprVec, + result_type: DataType, + ) -> StepId { + let step = Rc::new(Step { + kind, + inputs, + exprs, + result_type, + }); + match self.step_hashcons.entry(step) { + Entry::Occupied(entry) => *entry.get(), + Entry::Vacant(entry) => { + let id = self.steps.next_idx(); + self.steps.push(entry.key().clone()); + *entry.insert(id) + } + } + } + + pub fn last_step_id(&self) -> StepId { + self.steps.last_idx() + } + + /// Create the physical plan for the given output. + pub fn finish(self, output: StepId) -> error_stack::Result { + // First, determine which steps we need. + // Since we know they were created in topological order, we can just + // output them in the original order, but omitting any unnecessary + // steps. 
+ let mut needed: BitVec = BitVec::with_capacity(self.steps.len()); + needed.resize(self.steps.len(), false); + needed.set(output.index(), true); + for (step_id, step) in self.steps.iter_enumerated().rev() { + if needed[step_id.index()] { + for input in step.inputs.iter() { + needed.set(input.index(), true); + } + } + } + + let mut step_ids = index_vec::index_vec![None; self.steps.len()]; + let mut steps = IndexVec::with_capacity(needed.count_ones()); + for needed_step in needed.iter_ones() { + let step = &self.steps[needed_step]; + + let id = steps.next_idx(); + let kind = step.kind; + let inputs = step + .inputs + .iter() + .map(|input| step_ids[*input].expect("needed")) + .collect(); + let exprs = step.exprs.to_physical_exprs()?; + let result_type = step.result_type.clone(); + steps.push(sparrow_physical::Step { + id, + kind, + inputs, + exprs, + result_type, + }); + step_ids.insert(needed_step.into(), Some(id)); + } + + let plan = sparrow_physical::Plan { + steps, + pipelines: vec![], + }; + Ok(plan) + } + + pub(crate) fn step_result_type(&self, step_id: StepId) -> &DataType { + &self.steps[step_id].result_type + } +} diff --git a/crates/sparrow-backend/src/snapshots/sparrow_backend__logical_to_physical__tests__logical_to_physical_arithmetic-2.snap b/crates/sparrow-backend/src/snapshots/sparrow_backend__logical_to_physical__tests__logical_to_physical_arithmetic-2.snap new file mode 100644 index 000000000..71ed0c14a --- /dev/null +++ b/crates/sparrow-backend/src/snapshots/sparrow_backend__logical_to_physical__tests__logical_to_physical_arithmetic-2.snap @@ -0,0 +1,60 @@ +--- +source: crates/sparrow-backend/src/logical_to_physical.rs +expression: physical_add_x1_y1 +--- +steps: + - id: 0 + kind: + read: + source_id: 00000000-0000-0000-0000-000000000001 + inputs: [] + result_type: + Struct: + - name: x + data_type: Int64 + nullable: false + dict_id: 0 + dict_is_ordered: false + metadata: {} + - name: y + data_type: Float64 + nullable: false + dict_id: 0 + 
dict_is_ordered: false + metadata: {} + exprs: [] + - id: 1 + kind: project + inputs: + - 0 + result_type: Float64 + exprs: + - name: input + literal_args: [] + args: [] + result_type: Float64 + - name: fieldref + literal_args: + - Utf8: y + args: + - 0 + result_type: Float64 + - name: fieldref + literal_args: + - Utf8: x + args: + - 0 + result_type: Int64 + - name: cast + literal_args: [] + args: + - 2 + result_type: Float64 + - name: add + literal_args: [] + args: + - 3 + - 1 + result_type: Float64 +pipelines: [] + diff --git a/crates/sparrow-backend/src/snapshots/sparrow_backend__logical_to_physical__tests__logical_to_physical_arithmetic-3.snap b/crates/sparrow-backend/src/snapshots/sparrow_backend__logical_to_physical__tests__logical_to_physical_arithmetic-3.snap new file mode 100644 index 000000000..5199870d1 --- /dev/null +++ b/crates/sparrow-backend/src/snapshots/sparrow_backend__logical_to_physical__tests__logical_to_physical_arithmetic-3.snap @@ -0,0 +1,99 @@ +--- +source: crates/sparrow-backend/src/logical_to_physical.rs +expression: physical_add_x1_y2 +--- +steps: + - id: 0 + kind: + read: + source_id: 00000000-0000-0000-0000-000000000001 + inputs: [] + result_type: + Struct: + - name: x + data_type: Int64 + nullable: false + dict_id: 0 + dict_is_ordered: false + metadata: {} + - name: y + data_type: Float64 + nullable: false + dict_id: 0 + dict_is_ordered: false + metadata: {} + exprs: [] + - id: 1 + kind: + read: + source_id: 00000000-0000-0000-0000-000000000002 + inputs: [] + result_type: + Struct: + - name: x + data_type: Int64 + nullable: false + dict_id: 0 + dict_is_ordered: false + metadata: {} + - name: y + data_type: Float64 + nullable: false + dict_id: 0 + dict_is_ordered: false + metadata: {} + exprs: [] + - id: 2 + kind: merge + inputs: + - 0 + - 1 + result_type: "Null" + exprs: [] + - id: 3 + kind: project + inputs: + - 2 + result_type: Float64 + exprs: + - name: input + literal_args: [] + args: [] + result_type: Float64 + - name: fieldref + 
literal_args: + - Utf8: step_1 + args: + - 0 + result_type: Float64 + - name: fieldref + literal_args: + - Utf8: y + args: + - 1 + result_type: Float64 + - name: fieldref + literal_args: + - Utf8: step_0 + args: + - 0 + result_type: Float64 + - name: fieldref + literal_args: + - Utf8: x + args: + - 3 + result_type: Int64 + - name: cast + literal_args: [] + args: + - 4 + result_type: Float64 + - name: add + literal_args: [] + args: + - 5 + - 2 + result_type: Float64 +pipelines: [] + diff --git a/crates/sparrow-backend/src/snapshots/sparrow_backend__logical_to_physical__tests__logical_to_physical_arithmetic.snap b/crates/sparrow-backend/src/snapshots/sparrow_backend__logical_to_physical__tests__logical_to_physical_arithmetic.snap new file mode 100644 index 000000000..9ac219a84 --- /dev/null +++ b/crates/sparrow-backend/src/snapshots/sparrow_backend__logical_to_physical__tests__logical_to_physical_arithmetic.snap @@ -0,0 +1,43 @@ +--- +source: crates/sparrow-backend/src/logical_to_physical.rs +expression: physical_x1 +--- +steps: + - id: 0 + kind: + read: + source_id: 00000000-0000-0000-0000-000000000001 + inputs: [] + result_type: + Struct: + - name: x + data_type: Int64 + nullable: false + dict_id: 0 + dict_is_ordered: false + metadata: {} + - name: y + data_type: Float64 + nullable: false + dict_id: 0 + dict_is_ordered: false + metadata: {} + exprs: [] + - id: 1 + kind: project + inputs: + - 0 + result_type: Int64 + exprs: + - name: input + literal_args: [] + args: [] + result_type: Int64 + - name: fieldref + literal_args: + - Utf8: x + args: + - 0 + result_type: Int64 +pipelines: [] + diff --git a/crates/sparrow-physical/src/step.rs b/crates/sparrow-physical/src/step.rs index 435b1819f..559d00111 100644 --- a/crates/sparrow-physical/src/step.rs +++ b/crates/sparrow-physical/src/step.rs @@ -50,6 +50,7 @@ pub struct Step { /// The kinds of steps that can occur in the physical plan. #[derive( Clone, + Copy, Debug, serde::Serialize, serde::Deserialize,