From 20ffed4ea4c04e0f7f4d9f6a13ddf75d6d5957e0 Mon Sep 17 00:00:00 2001 From: Jay Chia <17691182+jaychia@users.noreply.github.com> Date: Fri, 23 Aug 2024 19:04:21 -0700 Subject: [PATCH] [FEAT] Fix projection pushdowns in actor pool project (#2680) Adds some fixes for performing projection pushdowns performed by actor pool project --------- Co-authored-by: Jay Chia --- .../src/logical_ops/actor_pool_project.rs | 29 ++- .../rules/push_down_projection.rs | 203 +++++++++++++++++- .../rules/split_actor_pool_projects.rs | 13 +- 3 files changed, 230 insertions(+), 15 deletions(-) diff --git a/src/daft-plan/src/logical_ops/actor_pool_project.rs b/src/daft-plan/src/logical_ops/actor_pool_project.rs index 63457cbff5..e991431b94 100644 --- a/src/daft-plan/src/logical_ops/actor_pool_project.rs +++ b/src/daft-plan/src/logical_ops/actor_pool_project.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use common_error::DaftError; use common_resource_request::ResourceRequest; use common_treenode::TreeNode; use daft_core::schema::{Schema, SchemaRef}; @@ -14,7 +15,7 @@ use itertools::Itertools; use snafu::ResultExt; use crate::{ - logical_plan::{CreationSnafu, Result}, + logical_plan::{CreationSnafu, Error, Result}, LogicalPlan, }; @@ -30,7 +31,33 @@ impl ActorPoolProject { pub(crate) fn try_new(input: Arc, projection: Vec) -> Result { let (projection, fields) = resolve_exprs(projection, input.schema().as_ref()).context(CreationSnafu)?; + + let num_stateful_udf_exprs: usize = projection + .iter() + .map(|expr| { + let mut num_stateful_udfs = 0; + expr.apply(|e| { + if matches!( + e.as_ref(), + Expr::Function { + func: FunctionExpr::Python(PythonUDF::Stateful(_)), + .. 
+ } ) { num_stateful_udfs += 1; } Ok(common_treenode::TreeNodeRecursion::Continue) }) .unwrap(); num_stateful_udfs }) .sum(); if num_stateful_udf_exprs != 1 { return Err(Error::CreationError { source: DaftError::InternalError(format!("Expected ActorPoolProject to have exactly 1 stateful UDF expression but found: {num_stateful_udf_exprs}")) }); } let projected_schema = Schema::new(fields).context(CreationSnafu)?.into(); Ok(ActorPoolProject { input, projection, diff --git a/src/daft-plan/src/logical_optimization/rules/push_down_projection.rs b/src/daft-plan/src/logical_optimization/rules/push_down_projection.rs index b6c0e3e58f..db88c468c1 100644 --- a/src/daft-plan/src/logical_optimization/rules/push_down_projection.rs +++ b/src/daft-plan/src/logical_optimization/rules/push_down_projection.rs @@ -4,8 +4,14 @@ use common_error::DaftResult; use common_treenode::TreeNode; use daft_core::{schema::Schema, JoinType}; -use daft_dsl::{col, optimization::replace_columns_with_expressions, Expr, ExprRef}; +use daft_dsl::{ + col, + functions::{python::PythonUDF, FunctionExpr}, + optimization::{get_required_columns, replace_columns_with_expressions, requires_computation}, + Expr, ExprRef, +}; use indexmap::IndexSet; +use itertools::Itertools; use crate::{ logical_ops::{ActorPoolProject, Aggregate, Join, Pivot, Project, Source}, @@ -230,6 +236,81 @@ impl PushDownProjection { } } LogicalPlan::ActorPoolProject(upstream_actor_pool_projection) => { + // Attempt to merge the current Projection into the upstream ActorPoolProject + // if there aren't any actual computations being performed in the Projection, and + // if each upstream column is used only once (no common subtrees) + if projection + .projection + .iter() + .all(|e| !requires_computation(e)) + { + // Only perform this optimization if all required column names are distinct + let required_column_names = projection + .projection + .iter() + .flat_map(get_required_columns) + .collect_vec(); + let 
mut all_required_column_names_distinct = true; + let mut distinct_required_column_names = IndexSet::new(); + for required_col_name in required_column_names { + if distinct_required_column_names.contains(&required_col_name) { + all_required_column_names_distinct = false; + break; + } else { + distinct_required_column_names.insert(required_col_name); + } + } + + if all_required_column_names_distinct { + let actor_pool_projection_map = upstream_actor_pool_projection + .projection + .iter() + .map(|e| (e.name().to_string(), e.clone())) + .collect::>(); + let new_actor_pool_projections = projection + .projection + .iter() + .map(|p| { + replace_columns_with_expressions( + p.clone(), + &actor_pool_projection_map, + ) + }) + .collect_vec(); + + // Construct either a new ActorPoolProject or Project, depending on whether the pruned projection still has StatefulUDFs + let new_plan = if new_actor_pool_projections.iter().any(|e| { + e.exists(|e| { + matches!( + e.as_ref(), + Expr::Function { + func: FunctionExpr::Python(PythonUDF::Stateful(_)), + .. + } + ) + }) + }) { + LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( + upstream_actor_pool_projection.input.clone(), + new_actor_pool_projections, + )?) + .arced() + } else { + LogicalPlan::Project(Project::try_new( + upstream_actor_pool_projection.input.clone(), + new_actor_pool_projections, + )?) + .arced() + }; + + // Retry optimization now that the node is different. + let new_plan = self + .try_optimize(new_plan.clone())? + .or(Transformed::Yes(new_plan)); + return Ok(new_plan); + } + } + // Prune columns from the child ActorPoolProjection that are not used in this projection. 
let required_columns = &plan.required_columns()[0]; if required_columns.len() < upstream_schema.names().len() { @@ -437,6 +518,34 @@ impl PushDownProjection { } } + fn try_optimize_actor_pool_project( + &self, + actor_pool_project: &ActorPoolProject, + plan: Arc, + ) -> DaftResult>> { + // If this ActorPoolPorject prunes columns from its upstream, + // then explicitly create a projection to do so. + let upstream_plan = &actor_pool_project.input; + let upstream_schema = upstream_plan.schema(); + + let actor_pool_project_required_cols = &plan.required_columns()[0]; + if actor_pool_project_required_cols.len() < upstream_schema.names().len() { + let new_subprojection: LogicalPlan = { + let pushdown_column_exprs = actor_pool_project_required_cols + .iter() + .map(|s| col(s.as_str())) + .collect::>(); + + Project::try_new(upstream_plan.clone(), pushdown_column_exprs)?.into() + }; + + let new_actor_pool_project = plan.with_new_children(&[new_subprojection.into()]); + Ok(Transformed::Yes(new_actor_pool_project.into())) + } else { + Ok(Transformed::No(plan)) + } + } + fn try_optimize_aggregation( &self, aggregation: &Aggregate, @@ -543,6 +652,10 @@ impl OptimizerRule for PushDownProjection { fn try_optimize(&self, plan: Arc) -> DaftResult>> { match plan.as_ref() { LogicalPlan::Project(projection) => self.try_optimize_project(projection, plan.clone()), + // ActorPoolProjects also do column projection + LogicalPlan::ActorPoolProject(actor_pool_project) => { + self.try_optimize_actor_pool_project(actor_pool_project, plan.clone()) + } // Aggregations also do column projection LogicalPlan::Aggregate(aggregation) => { self.try_optimize_aggregation(aggregation, plan.clone()) @@ -810,7 +923,7 @@ mod tests { Field::new("b", DataType::Boolean), Field::new("c", DataType::Int64), ]); - let scan_node = dummy_scan_node(scan_op).build(); + let scan_node = dummy_scan_node(scan_op.clone()); let mock_stateful_udf = Expr::Function { func: 
FunctionExpr::Python(PythonUDF::Stateful(StatefulPythonUDF { name: Arc::new("my-udf".to_string()), @@ -826,7 +939,7 @@ mod tests { // Select the `udf_results` column, so the ActorPoolProject should apply column pruning to the other columns let actor_pool_project = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( - scan_node.clone(), + scan_node.build(), vec![col("a"), col("b"), mock_stateful_udf.alias("udf_results")], )?) .arced(); @@ -837,7 +950,11 @@ mod tests { .arced(); let expected_actor_pool_project = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( - scan_node.clone(), + dummy_scan_node_with_pushdowns( + scan_op, + Pushdowns::default().with_columns(Some(Arc::new(vec!["c".to_string()]))), + ) + .build(), vec![mock_stateful_udf.alias("udf_results")], )?) .arced(); @@ -846,6 +963,84 @@ mod tests { Ok(()) } + /// Projection<-ActorPoolProject<-ActorPoolProject prunes columns from both ActorPoolProjects + #[test] + fn test_projection_pushdown_into_double_actorpoolproject() -> DaftResult<()> { + use crate::logical_ops::ActorPoolProject; + use crate::logical_ops::Project; + use common_resource_request::ResourceRequest; + use daft_dsl::functions::python::{PythonUDF, StatefulPythonUDF}; + use daft_dsl::functions::FunctionExpr; + use daft_dsl::Expr; + + let scan_op = dummy_scan_operator(vec![ + Field::new("a", DataType::Int64), + Field::new("b", DataType::Boolean), + Field::new("c", DataType::Int64), + ]); + let scan_node = dummy_scan_node(scan_op.clone()).build(); + let mock_stateful_udf = Expr::Function { + func: FunctionExpr::Python(PythonUDF::Stateful(StatefulPythonUDF { + name: Arc::new("my-udf".to_string()), + num_expressions: 1, + return_dtype: DataType::Utf8, + resource_request: Some(ResourceRequest::default_cpu()), + batch_size: None, + concurrency: Some(8), + })), + inputs: vec![col("a")], + } + .arced(); + + // Select the `udf_results` column, so the ActorPoolProject should apply column pruning to the other columns + let plan = 
LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( + scan_node.clone(), + vec![col("a"), col("b"), mock_stateful_udf.alias("udf_results_0")], + )?) + .arced(); + let plan = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( + plan, + vec![ + col("a"), + col("b"), + col("udf_results_0"), + mock_stateful_udf.alias("udf_results_1"), + ], + )?) + .arced(); + let plan = LogicalPlan::Project(Project::try_new( + plan, + vec![ + col("udf_results_0").alias("udf_results_0_alias"), + col("udf_results_1"), + ], + )?) + .arced(); + + let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( + dummy_scan_node_with_pushdowns( + scan_op, + Pushdowns::default().with_columns(Some(Arc::new(vec!["a".to_string()]))), + ) + .build(), + // col("b") is pruned + vec![mock_stateful_udf.alias("udf_results_0"), col("a")], + )?) + .arced(); + let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( + expected.clone(), + vec![ + // Absorbed a non-computational expression (alias) from the Projection + col("udf_results_0").alias("udf_results_0_alias"), + mock_stateful_udf.alias("udf_results_1"), + ], + )?) + .arced(); + + assert_optimized_plan_eq(plan, expected)?; + Ok(()) + } + /// Projection<-ActorPoolProject prunes ActorPoolProject entirely if the stateful projection column is pruned #[test] fn test_projection_pushdown_into_actorpoolproject_completely_removed() -> DaftResult<()> { diff --git a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs index 568c511519..3b86f39f92 100644 --- a/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs +++ b/src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs @@ -834,18 +834,13 @@ mod tests { let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( expected, vec![ - col("a"), create_stateful_udf(vec![col("a")]) .clone() .alias(intermediate_name), + col("a"), ], )?) 
.arced(); - let expected = LogicalPlan::Project(Project::try_new( - expected, - vec![col(intermediate_name), col("a")], - )?) - .arced(); let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( expected, vec![ @@ -933,11 +928,10 @@ mod tests { let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( expected, vec![ - col("a"), // TODO: This should be able to be pruned as well, but it seems Projection Pushdown isn't working as intended - col("b"), create_stateful_udf(vec![col("a")]) .clone() .alias(intermediate_name_0), + col("b"), ], )?) .arced(); @@ -1049,11 +1043,10 @@ mod tests { let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new( expected, vec![ - col("a"), // TODO: This should be pruned by Projection Pushdown, but isn't for some reason - col("b"), create_stateful_udf(vec![col("a")]) .clone() .alias(intermediate_name_0), + col("b"), ], )?) .arced();