[FEAT] Fix projection pushdowns in actor pool project (#2680)
Adds some fixes for projection pushdowns performed by ActorPoolProject.
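For context, here is a minimal sketch of the plan shape this rule targets, assuming the crate-internal test helpers (`dummy_scan_operator`, `dummy_scan_node`) and a pre-built mock stateful UDF expression (`mock_stateful_udf`) like the ones constructed in the tests added in this diff, inside a `DaftResult`-returning test:

```rust
// Scan -> ActorPoolProject -> Project, before optimization.
let scan = dummy_scan_node(dummy_scan_operator(vec![
    Field::new("a", DataType::Int64),
    Field::new("b", DataType::Boolean),
]))
.build();

// The ActorPoolProject produces `a`, `b`, and one stateful-UDF column.
let actor_pool_project = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new(
    scan,
    vec![col("a"), col("b"), mock_stateful_udf.alias("udf_results")],
)?)
.arced();

// The downstream Projection keeps only the UDF column, so after pushdown the
// columns unused by the UDF should be pruned from both the ActorPoolProject
// and the scan.
let plan = LogicalPlan::Project(Project::try_new(
    actor_pool_project,
    vec![col("udf_results")],
)?)
.arced();
```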

---------

Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
jaychia and Jay Chia authored Aug 24, 2024
1 parent bf5c853 commit 20ffed4
Showing 3 changed files with 230 additions and 15 deletions.
29 changes: 28 additions & 1 deletion src/daft-plan/src/logical_ops/actor_pool_project.rs
@@ -1,5 +1,6 @@
use std::sync::Arc;

use common_error::DaftError;
use common_resource_request::ResourceRequest;
use common_treenode::TreeNode;
use daft_core::schema::{Schema, SchemaRef};
@@ -14,7 +15,7 @@ use itertools::Itertools;
use snafu::ResultExt;

use crate::{
logical_plan::{CreationSnafu, Result},
logical_plan::{CreationSnafu, Error, Result},
LogicalPlan,
};

@@ -30,7 +31,33 @@ impl ActorPoolProject {
pub(crate) fn try_new(input: Arc<LogicalPlan>, projection: Vec<ExprRef>) -> Result<Self> {
let (projection, fields) =
resolve_exprs(projection, input.schema().as_ref()).context(CreationSnafu)?;

let num_stateful_udf_exprs: usize = projection
.iter()
.map(|expr| {
let mut num_stateful_udfs = 0;
expr.apply(|e| {
if matches!(
e.as_ref(),
Expr::Function {
func: FunctionExpr::Python(PythonUDF::Stateful(_)),
..
}
) {
num_stateful_udfs += 1;
}
Ok(common_treenode::TreeNodeRecursion::Continue)
})
.unwrap();
num_stateful_udfs
})
.sum();
if num_stateful_udf_exprs != 1 {
return Err(Error::CreationError { source: DaftError::InternalError(format!("Expected ActorPoolProject to have exactly 1 stateful UDF expression but found: {num_stateful_udf_exprs}")) });
}

let projected_schema = Schema::new(fields).context(CreationSnafu)?.into();

Ok(ActorPoolProject {
input,
projection,
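The check added above enforces a new invariant at construction time: an ActorPoolProject must contain exactly one stateful UDF expression. A minimal sketch of the behaviour, assuming an `input: Arc<LogicalPlan>` whose schema has columns `a` and `b`, and the same mock stateful UDF expression used in this diff's tests:

```rust
// Exactly one stateful UDF in the projection: construction succeeds.
let ok = ActorPoolProject::try_new(
    input.clone(),
    vec![col("a"), mock_stateful_udf.alias("udf_results")],
);
assert!(ok.is_ok());

// No stateful UDF at all: try_new now returns a CreationError.
let err = ActorPoolProject::try_new(input.clone(), vec![col("a"), col("b")]);
assert!(err.is_err());
```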
203 changes: 199 additions & 4 deletions src/daft-plan/src/logical_optimization/rules/push_down_projection.rs
@@ -4,8 +4,14 @@ use common_error::DaftResult;

use common_treenode::TreeNode;
use daft_core::{schema::Schema, JoinType};
use daft_dsl::{col, optimization::replace_columns_with_expressions, Expr, ExprRef};
use daft_dsl::{
col,
functions::{python::PythonUDF, FunctionExpr},
optimization::{get_required_columns, replace_columns_with_expressions, requires_computation},
Expr, ExprRef,
};
use indexmap::IndexSet;
use itertools::Itertools;

use crate::{
logical_ops::{ActorPoolProject, Aggregate, Join, Pivot, Project, Source},
@@ -230,6 +236,81 @@ impl PushDownProjection {
}
}
LogicalPlan::ActorPoolProject(upstream_actor_pool_projection) => {
// Attempt to merge the current Projection into the upstream ActorPoolProject
// if there aren't any actual computations being performed in the Projection, and
// if each upstream column is used only once (no common subtrees)
if projection
.projection
.iter()
.all(|e| !requires_computation(e))
{
// Only perform this optimization if all required column names are distinct
let required_column_names = projection
.projection
.iter()
.flat_map(get_required_columns)
.collect_vec();
let mut all_required_column_names_distinct = true;
let mut distinct_required_column_names = IndexSet::new();
for required_col_name in required_column_names {
if distinct_required_column_names.contains(&required_col_name) {
all_required_column_names_distinct = false;
break;
} else {
distinct_required_column_names.insert(required_col_name);
}
}

if all_required_column_names_distinct {
let actor_pool_projection_map = upstream_actor_pool_projection
.projection
.iter()
.map(|e| (e.name().to_string(), e.clone()))
.collect::<HashMap<String, ExprRef>>();
let new_actor_pool_projections = projection
.projection
.iter()
.map(|p| {
replace_columns_with_expressions(
p.clone(),
&actor_pool_projection_map,
)
})
.collect_vec();

// Construct either a new ActorPoolProject or Project, depending on whether the pruned projection still has StatefulUDFs
let new_plan = if new_actor_pool_projections.iter().any(|e| {
e.exists(|e| {
matches!(
e.as_ref(),
Expr::Function {
func: FunctionExpr::Python(PythonUDF::Stateful(_)),
..
}
)
})
}) {
LogicalPlan::ActorPoolProject(ActorPoolProject::try_new(
upstream_actor_pool_projection.input.clone(),
new_actor_pool_projections,
)?)
.arced()
} else {
LogicalPlan::Project(Project::try_new(
upstream_actor_pool_projection.input.clone(),
new_actor_pool_projections,
)?)
.arced()
};

// Retry optimization now that the node is different.
let new_plan = self
.try_optimize(new_plan.clone())?
.or(Transformed::Yes(new_plan));
return Ok(new_plan);
}
}

// Prune columns from the child ActorPoolProjection that are not used in this projection.
let required_columns = &plan.required_columns()[0];
if required_columns.len() < upstream_schema.names().len() {
@@ -437,6 +518,34 @@ impl PushDownProjection {
}
}

fn try_optimize_actor_pool_project(
&self,
actor_pool_project: &ActorPoolProject,
plan: Arc<LogicalPlan>,
) -> DaftResult<Transformed<Arc<LogicalPlan>>> {
// If this ActorPoolProject prunes columns from its upstream,
// then explicitly create a projection to do so.
let upstream_plan = &actor_pool_project.input;
let upstream_schema = upstream_plan.schema();

let actor_pool_project_required_cols = &plan.required_columns()[0];
if actor_pool_project_required_cols.len() < upstream_schema.names().len() {
let new_subprojection: LogicalPlan = {
let pushdown_column_exprs = actor_pool_project_required_cols
.iter()
.map(|s| col(s.as_str()))
.collect::<Vec<_>>();

Project::try_new(upstream_plan.clone(), pushdown_column_exprs)?.into()
};

let new_actor_pool_project = plan.with_new_children(&[new_subprojection.into()]);
Ok(Transformed::Yes(new_actor_pool_project.into()))
} else {
Ok(Transformed::No(plan))
}
}

fn try_optimize_aggregation(
&self,
aggregation: &Aggregate,
@@ -543,6 +652,10 @@ impl OptimizerRule for PushDownProjection {
fn try_optimize(&self, plan: Arc<LogicalPlan>) -> DaftResult<Transformed<Arc<LogicalPlan>>> {
match plan.as_ref() {
LogicalPlan::Project(projection) => self.try_optimize_project(projection, plan.clone()),
// ActorPoolProjects also do column projection
LogicalPlan::ActorPoolProject(actor_pool_project) => {
self.try_optimize_actor_pool_project(actor_pool_project, plan.clone())
}
// Aggregations also do column projection
LogicalPlan::Aggregate(aggregation) => {
self.try_optimize_aggregation(aggregation, plan.clone())
@@ -810,7 +923,7 @@ mod tests {
Field::new("b", DataType::Boolean),
Field::new("c", DataType::Int64),
]);
let scan_node = dummy_scan_node(scan_op).build();
let scan_node = dummy_scan_node(scan_op.clone());
let mock_stateful_udf = Expr::Function {
func: FunctionExpr::Python(PythonUDF::Stateful(StatefulPythonUDF {
name: Arc::new("my-udf".to_string()),
@@ -826,7 +939,7 @@

// Select the `udf_results` column, so the ActorPoolProject should apply column pruning to the other columns
let actor_pool_project = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new(
scan_node.clone(),
scan_node.build(),
vec![col("a"), col("b"), mock_stateful_udf.alias("udf_results")],
)?)
.arced();
@@ -837,7 +950,11 @@
.arced();

let expected_actor_pool_project = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new(
scan_node.clone(),
dummy_scan_node_with_pushdowns(
scan_op,
Pushdowns::default().with_columns(Some(Arc::new(vec!["c".to_string()]))),
)
.build(),
vec![mock_stateful_udf.alias("udf_results")],
)?)
.arced();
@@ -846,6 +963,84 @@
Ok(())
}

/// Projection<-ActorPoolProject<-ActorPoolProject prunes columns from both ActorPoolProjects
#[test]
fn test_projection_pushdown_into_double_actorpoolproject() -> DaftResult<()> {
use crate::logical_ops::ActorPoolProject;
use crate::logical_ops::Project;
use common_resource_request::ResourceRequest;
use daft_dsl::functions::python::{PythonUDF, StatefulPythonUDF};
use daft_dsl::functions::FunctionExpr;
use daft_dsl::Expr;

let scan_op = dummy_scan_operator(vec![
Field::new("a", DataType::Int64),
Field::new("b", DataType::Boolean),
Field::new("c", DataType::Int64),
]);
let scan_node = dummy_scan_node(scan_op.clone()).build();
let mock_stateful_udf = Expr::Function {
func: FunctionExpr::Python(PythonUDF::Stateful(StatefulPythonUDF {
name: Arc::new("my-udf".to_string()),
num_expressions: 1,
return_dtype: DataType::Utf8,
resource_request: Some(ResourceRequest::default_cpu()),
batch_size: None,
concurrency: Some(8),
})),
inputs: vec![col("a")],
}
.arced();

// Select only the UDF result columns, so both ActorPoolProjects should apply column pruning to the other columns
let plan = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new(
scan_node.clone(),
vec![col("a"), col("b"), mock_stateful_udf.alias("udf_results_0")],
)?)
.arced();
let plan = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new(
plan,
vec![
col("a"),
col("b"),
col("udf_results_0"),
mock_stateful_udf.alias("udf_results_1"),
],
)?)
.arced();
let plan = LogicalPlan::Project(Project::try_new(
plan,
vec![
col("udf_results_0").alias("udf_results_0_alias"),
col("udf_results_1"),
],
)?)
.arced();

let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new(
dummy_scan_node_with_pushdowns(
scan_op,
Pushdowns::default().with_columns(Some(Arc::new(vec!["a".to_string()]))),
)
.build(),
// col("b") is pruned
vec![mock_stateful_udf.alias("udf_results_0"), col("a")],
)?)
.arced();
let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new(
expected.clone(),
vec![
// Absorbed a non-computational expression (alias) from the Projection
col("udf_results_0").alias("udf_results_0_alias"),
mock_stateful_udf.alias("udf_results_1"),
],
)?)
.arced();

assert_optimized_plan_eq(plan, expected)?;
Ok(())
}

/// Projection<-ActorPoolProject prunes ActorPoolProject entirely if the stateful projection column is pruned
#[test]
fn test_projection_pushdown_into_actorpoolproject_completely_removed() -> DaftResult<()> {
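One subtlety in the merge path added above: if the downstream Projection references the same ActorPoolProject output column more than once, substituting the upstream expressions into it would duplicate the stateful UDF expression (and break the one-stateful-UDF-per-ActorPoolProject invariant), so the optimizer skips the merge and only takes the column-pruning path. A hedged sketch of such a plan, reusing the `actor_pool_project` from the sketch in the commit description:

```rust
// `udf_results` is referenced twice, so the required column names are not
// distinct and the Projection is NOT merged into the ActorPoolProject.
let plan = LogicalPlan::Project(Project::try_new(
    actor_pool_project.clone(),
    vec![
        col("udf_results").alias("x"),
        col("udf_results").alias("y"),
    ],
)?)
.arced();
```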
@@ -834,18 +834,13 @@
let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new(
expected,
vec![
col("a"),
create_stateful_udf(vec![col("a")])
.clone()
.alias(intermediate_name),
col("a"),
],
)?)
.arced();
let expected = LogicalPlan::Project(Project::try_new(
expected,
vec![col(intermediate_name), col("a")],
)?)
.arced();
let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new(
expected,
vec![
@@ -933,11 +928,10 @@
let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new(
expected,
vec![
col("a"), // TODO: This should be able to be pruned as well, but it seems Projection Pushdown isn't working as intended
col("b"),
create_stateful_udf(vec![col("a")])
.clone()
.alias(intermediate_name_0),
col("b"),
],
)?)
.arced();
@@ -1049,11 +1043,10 @@
let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new(
expected,
vec![
col("a"), // TODO: This should be pruned by Projection Pushdown, but isn't for some reason
col("b"),
create_stateful_udf(vec![col("a")])
.clone()
.alias(intermediate_name_0),
col("b"),
],
)?)
.arced();
