diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index fe57987a5c7c..828be6e99f11 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -58,7 +58,6 @@ use crate::executor::QueryExecutor; use crate::logical_optimizer::LogicalOptimizer; use crate::metrics::{OnDone, QUERY_STAGE_ELAPSED}; use crate::physical_optimizer::PhysicalOptimizer; -use crate::physical_planner::PhysicalPlanner; use crate::physical_wrapper::PhysicalPlanWrapperRef; use crate::plan::LogicalPlan; use crate::planner::{DfLogicalPlanner, LogicalPlanner}; @@ -256,6 +255,61 @@ impl DatafusionQueryEngine { .context(CatalogSnafu)? .with_context(|| TableNotFoundSnafu { table: table_name }) } + + #[tracing::instrument(skip_all)] + async fn create_physical_plan( + &self, + ctx: &mut QueryEngineContext, + logical_plan: &LogicalPlan, + ) -> Result> { + let _timer = metrics::CREATE_PHYSICAL_ELAPSED.start_timer(); + match logical_plan { + LogicalPlan::DfPlan(df_plan) => { + let state = ctx.state(); + + // special handle EXPLAIN plan + if matches!(df_plan, DfLogicalPlan::Explain(_)) { + return state + .create_physical_plan(df_plan) + .await + .context(error::DatafusionSnafu) + .map_err(BoxedError::new) + .context(QueryExecutionSnafu); + } + + // analyze first + let analyzed_plan = state + .analyzer() + .execute_and_check(df_plan.clone(), state.config_options(), |_, _| {}) + .context(error::DatafusionSnafu) + .map_err(BoxedError::new) + .context(QueryExecutionSnafu)?; + // skip optimize for MergeScan + let optimized_plan = if let DfLogicalPlan::Extension(ext) = &analyzed_plan + && ext.node.name() == MergeScanLogicalPlan::name() + { + analyzed_plan.clone() + } else { + state + .optimizer() + .optimize(analyzed_plan, state, |_, _| {}) + .context(error::DatafusionSnafu) + .map_err(BoxedError::new) + .context(QueryExecutionSnafu)? + }; + + let physical_plan = state + .query_planner() + .create_physical_plan(&optimized_plan, state) + .await + .context(error::DatafusionSnafu) + .map_err(BoxedError::new) + .context(QueryExecutionSnafu)?; + + Ok(physical_plan) + } + } + } } #[async_trait] @@ -362,64 +416,6 @@ impl LogicalOptimizer for DatafusionQueryEngine { } } -#[async_trait::async_trait] -impl PhysicalPlanner for DatafusionQueryEngine { - #[tracing::instrument(skip_all)] - async fn create_physical_plan( - &self, - ctx: &mut QueryEngineContext, - logical_plan: &LogicalPlan, - ) -> Result> { - let _timer = metrics::CREATE_PHYSICAL_ELAPSED.start_timer(); - match logical_plan { - LogicalPlan::DfPlan(df_plan) => { - let state = ctx.state(); - - // special handle EXPLAIN plan - if matches!(df_plan, DfLogicalPlan::Explain(_)) { - return state - .create_physical_plan(df_plan) - .await - .context(error::DatafusionSnafu) - .map_err(BoxedError::new) - .context(QueryExecutionSnafu); - } - - // analyze first - let analyzed_plan = state - .analyzer() - .execute_and_check(df_plan.clone(), state.config_options(), |_, _| {}) - .context(error::DatafusionSnafu) - .map_err(BoxedError::new) - .context(QueryExecutionSnafu)?; - // skip optimize for MergeScan - let optimized_plan = if let DfLogicalPlan::Extension(ext) = &analyzed_plan - && ext.node.name() == MergeScanLogicalPlan::name() - { - analyzed_plan.clone() - } else { - state - .optimizer() - .optimize(analyzed_plan, state, |_, _| {}) - .context(error::DatafusionSnafu) - .map_err(BoxedError::new) - .context(QueryExecutionSnafu)? - }; - - let physical_plan = state - .query_planner() - .create_physical_plan(&optimized_plan, state) - .await - .context(error::DatafusionSnafu) - .map_err(BoxedError::new) - .context(QueryExecutionSnafu)?; - - Ok(physical_plan) - } - } - } -} - impl PhysicalOptimizer for DatafusionQueryEngine { #[tracing::instrument(skip_all)] fn optimize_physical_plan( diff --git a/src/query/src/lib.rs b/src/query/src/lib.rs index 0d4553c94456..d4e9dbae66b2 100644 --- a/src/query/src/lib.rs +++ b/src/query/src/lib.rs @@ -29,7 +29,6 @@ pub mod metrics; mod optimizer; pub mod parser; pub mod physical_optimizer; -pub mod physical_planner; pub mod physical_wrapper; pub mod plan; pub mod planner; diff --git a/src/query/src/physical_planner.rs b/src/query/src/physical_planner.rs deleted file mode 100644 index b527b8e72448..000000000000 --- a/src/query/src/physical_planner.rs +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::Arc; - -use datafusion::physical_plan::ExecutionPlan; - -use crate::error::Result; -use crate::plan::LogicalPlan; -use crate::query_engine::QueryEngineContext; - -/// Physical query planner that converts a `LogicalPlan` to an -/// `ExecutionPlan` suitable for execution. -#[async_trait::async_trait] -pub trait PhysicalPlanner { - /// Create a physical plan from a logical plan - async fn create_physical_plan( - &self, - ctx: &mut QueryEngineContext, - logical_plan: &LogicalPlan, - ) -> Result>; -}