Skip to content

Commit

Permalink
refactor(query): Remove PhysicalPlanner trait (#4412)
Browse files Browse the repository at this point in the history
  • Loading branch information
leaf-potato authored Jul 24, 2024
1 parent 547730a commit f787265
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 93 deletions.
114 changes: 55 additions & 59 deletions src/query/src/datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ use crate::executor::QueryExecutor;
use crate::logical_optimizer::LogicalOptimizer;
use crate::metrics::{OnDone, QUERY_STAGE_ELAPSED};
use crate::physical_optimizer::PhysicalOptimizer;
use crate::physical_planner::PhysicalPlanner;
use crate::physical_wrapper::PhysicalPlanWrapperRef;
use crate::plan::LogicalPlan;
use crate::planner::{DfLogicalPlanner, LogicalPlanner};
Expand Down Expand Up @@ -256,6 +255,61 @@ impl DatafusionQueryEngine {
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu { table: table_name })
}

#[tracing::instrument(skip_all)]
async fn create_physical_plan(
&self,
ctx: &mut QueryEngineContext,
logical_plan: &LogicalPlan,
) -> Result<Arc<dyn ExecutionPlan>> {
let _timer = metrics::CREATE_PHYSICAL_ELAPSED.start_timer();
match logical_plan {
LogicalPlan::DfPlan(df_plan) => {
let state = ctx.state();

// special handle EXPLAIN plan
if matches!(df_plan, DfLogicalPlan::Explain(_)) {
return state
.create_physical_plan(df_plan)
.await
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu);
}

// analyze first
let analyzed_plan = state
.analyzer()
.execute_and_check(df_plan.clone(), state.config_options(), |_, _| {})
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
// skip optimize for MergeScan
let optimized_plan = if let DfLogicalPlan::Extension(ext) = &analyzed_plan
&& ext.node.name() == MergeScanLogicalPlan::name()
{
analyzed_plan.clone()
} else {
state
.optimizer()
.optimize(analyzed_plan, state, |_, _| {})
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?
};

let physical_plan = state
.query_planner()
.create_physical_plan(&optimized_plan, state)
.await
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;

Ok(physical_plan)
}
}
}
}

#[async_trait]
Expand Down Expand Up @@ -362,64 +416,6 @@ impl LogicalOptimizer for DatafusionQueryEngine {
}
}

#[async_trait::async_trait]
impl PhysicalPlanner for DatafusionQueryEngine {
#[tracing::instrument(skip_all)]
async fn create_physical_plan(
&self,
ctx: &mut QueryEngineContext,
logical_plan: &LogicalPlan,
) -> Result<Arc<dyn ExecutionPlan>> {
let _timer = metrics::CREATE_PHYSICAL_ELAPSED.start_timer();
match logical_plan {
LogicalPlan::DfPlan(df_plan) => {
let state = ctx.state();

// special handle EXPLAIN plan
if matches!(df_plan, DfLogicalPlan::Explain(_)) {
return state
.create_physical_plan(df_plan)
.await
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu);
}

// analyze first
let analyzed_plan = state
.analyzer()
.execute_and_check(df_plan.clone(), state.config_options(), |_, _| {})
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
// skip optimize for MergeScan
let optimized_plan = if let DfLogicalPlan::Extension(ext) = &analyzed_plan
&& ext.node.name() == MergeScanLogicalPlan::name()
{
analyzed_plan.clone()
} else {
state
.optimizer()
.optimize(analyzed_plan, state, |_, _| {})
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?
};

let physical_plan = state
.query_planner()
.create_physical_plan(&optimized_plan, state)
.await
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;

Ok(physical_plan)
}
}
}
}

impl PhysicalOptimizer for DatafusionQueryEngine {
#[tracing::instrument(skip_all)]
fn optimize_physical_plan(
Expand Down
1 change: 0 additions & 1 deletion src/query/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ pub mod metrics;
mod optimizer;
pub mod parser;
pub mod physical_optimizer;
pub mod physical_planner;
pub mod physical_wrapper;
pub mod plan;
pub mod planner;
Expand Down
33 changes: 0 additions & 33 deletions src/query/src/physical_planner.rs

This file was deleted.

0 comments on commit f787265

Please sign in to comment.