From 3759207bf9e724f7b29f38065aa93131ff2615c9 Mon Sep 17 00:00:00 2001
From: Sky Fan <3374614481@qq.com>
Date: Fri, 5 Jan 2024 11:13:28 +0800
Subject: [PATCH] feat: support explain fragments for update (#14227)

* feat: support explain fragments for update

* fix

* fix

* fix

* fix
---
 .../src/interpreters/interpreter_explain.rs | 25 ++++++++
 .../src/interpreters/interpreter_update.rs  | 63 ++++++++++++-------
 .../suites/mode/cluster/update.sql          | 54 ++++++++++++++++
 3 files changed, 118 insertions(+), 24 deletions(-)
 create mode 100644 tests/sqllogictests/suites/mode/cluster/update.sql

diff --git a/src/query/service/src/interpreters/interpreter_explain.rs b/src/query/service/src/interpreters/interpreter_explain.rs
index 8d83d1cc34cf9..1e2662543f210 100644
--- a/src/query/service/src/interpreters/interpreter_explain.rs
+++ b/src/query/service/src/interpreters/interpreter_explain.rs
@@ -26,6 +26,7 @@ use databend_common_profile::QueryProfileManager;
 use databend_common_profile::SharedProcessorProfiles;
 use databend_common_sql::executor::ProfileHelper;
 use databend_common_sql::optimizer::ColumnSet;
+use databend_common_sql::plans::UpdatePlan;
 use databend_common_sql::BindContext;
 use databend_common_sql::InsertInputSource;
 use databend_common_sql::MetadataRef;
@@ -34,6 +35,7 @@ use databend_common_storages_result_cache::ResultCacheReader;
 use databend_common_users::UserApiProvider;
 
 use super::InterpreterFactory;
+use super::UpdateInterpreter;
 use crate::interpreters::Interpreter;
 use crate::pipelines::executor::ExecutorSettings;
 use crate::pipelines::executor::PipelineCompleteExecutor;
@@ -185,6 +187,7 @@ impl Interpreter for ExplainInterpreter {
                 )
                 .await?
             }
+            Plan::Update(update) => self.explain_update_fragments(update.as_ref()).await?,
             _ => {
                 return Err(ErrorCode::Unimplemented("Unsupported EXPLAIN statement"));
             }
@@ -322,6 +325,28 @@ impl ExplainInterpreter {
         Ok(vec![DataBlock::new_from_columns(vec![formatted_plan])])
     }
 
+    #[async_backtrace::framed]
+    async fn explain_update_fragments(&self, update: &UpdatePlan) -> Result<Vec<DataBlock>> {
+        let interpreter = UpdateInterpreter::try_create(self.ctx.clone(), update.clone())?;
+        let display_string = if let Some(plan) = interpreter.get_physical_plan().await? {
+            let root_fragment = Fragmenter::try_create(self.ctx.clone())?.build_fragment(&plan)?;
+
+            let mut fragments_actions = QueryFragmentsActions::create(self.ctx.clone(), false);
+            root_fragment.get_actions(self.ctx.clone(), &mut fragments_actions)?;
+
+            let ident = fragments_actions.display_indent(&update.metadata);
+            ident.to_string()
+        } else {
+            "Nothing to update".to_string()
+        };
+        let line_split_result = display_string
+            .lines()
+            .map(|s| s.as_bytes().to_vec())
+            .collect::<Vec<_>>();
+        let formatted_plan = StringType::from_data(line_split_result);
+        Ok(vec![DataBlock::new_from_columns(vec![formatted_plan])])
+    }
+
     #[async_backtrace::framed]
     async fn explain_analyze(
         &self,
diff --git a/src/query/service/src/interpreters/interpreter_update.rs b/src/query/service/src/interpreters/interpreter_update.rs
index a1c42ccfd2bf6..8aa8539591666 100644
--- a/src/query/service/src/interpreters/interpreter_update.rs
+++ b/src/query/service/src/interpreters/interpreter_update.rs
@@ -91,7 +91,6 @@ impl Interpreter for UpdateInterpreter {
 
         let catalog_name = self.plan.catalog.as_str();
         let catalog = self.ctx.get_catalog(catalog_name).await?;
-        let catalog_info = catalog.info();
 
         let db_name = self.plan.database.as_str();
         let tbl_name = self.plan.table.as_str();
@@ -103,9 +102,45 @@ impl Interpreter for UpdateInterpreter {
 
         let table_lock = LockManager::create_table_lock(tbl.get_table_info().clone())?;
         let lock_guard = table_lock.try_lock(self.ctx.clone()).await?;
 
+        // build physical plan.
+        let physical_plan = self.get_physical_plan().await?;
+
+        // build pipeline.
+        let mut build_res = PipelineBuildResult::create();
+        if let Some(physical_plan) = physical_plan {
+            build_res =
+                build_query_pipeline_without_render_result_set(&self.ctx, &physical_plan, false)
+                    .await?;
+            // generate virtual columns if `enable_refresh_virtual_column_after_write` on.
+            {
+                let refresh_desc = RefreshDesc {
+                    catalog: catalog_name.to_string(),
+                    database: db_name.to_string(),
+                    table: tbl_name.to_string(),
+                };
+
+                hook_refresh(self.ctx.clone(), &mut build_res.main_pipeline, refresh_desc).await;
+            }
+        }
+
+        build_res.main_pipeline.add_lock_guard(lock_guard);
+        Ok(build_res)
+    }
+}
+
+impl UpdateInterpreter {
+    pub async fn get_physical_plan(&self) -> Result<Option<PhysicalPlan>> {
+        let catalog_name = self.plan.catalog.as_str();
+        let catalog = self.ctx.get_catalog(catalog_name).await?;
+        let catalog_info = catalog.info();
+
+        let db_name = self.plan.database.as_str();
+        let tbl_name = self.plan.table.as_str();
+        let tbl = catalog
+            .get_table(self.ctx.get_tenant().as_str(), db_name, tbl_name)
+            .await?;
         // refresh table.
         let tbl = tbl.refresh(self.ctx.as_ref()).await?;
-
         // check mutability
         tbl.check_mutable()?;
@@ -203,7 +238,6 @@
                 ))
             })?;
 
-        let mut build_res = PipelineBuildResult::create();
         let query_row_id_col = !self.plan.subquery_desc.is_empty();
         if let Some(snapshot) = fuse_table
             .fast_update(
@@ -239,29 +273,10 @@
                 is_distributed,
                 self.ctx.clone(),
             )?;
-
-            build_res =
-                build_query_pipeline_without_render_result_set(&self.ctx, &physical_plan, false)
-                    .await?;
-
-            // generate virtual columns if `enable_refresh_virtual_column_after_write` on.
-            {
-                let refresh_desc = RefreshDesc {
-                    catalog: catalog_name.to_string(),
-                    database: db_name.to_string(),
-                    table: tbl_name.to_string(),
-                };
-
-                hook_refresh(self.ctx.clone(), &mut build_res.main_pipeline, refresh_desc).await;
-            }
+            return Ok(Some(physical_plan));
         }
-
-        build_res.main_pipeline.add_lock_guard(lock_guard);
-        Ok(build_res)
+        Ok(None)
     }
-}
-
-impl UpdateInterpreter {
     #[allow(clippy::too_many_arguments)]
     pub fn build_physical_plan(
         filters: Option<Filters>,
diff --git a/tests/sqllogictests/suites/mode/cluster/update.sql b/tests/sqllogictests/suites/mode/cluster/update.sql
new file mode 100644
index 0000000000000..e7419927b707d
--- /dev/null
+++ b/tests/sqllogictests/suites/mode/cluster/update.sql
@@ -0,0 +1,54 @@
+statement ok
+DROP DATABASE IF EXISTS db1
+
+statement ok
+CREATE DATABASE db1
+
+statement ok
+USE db1
+
+statement ok
+CREATE TABLE IF NOT EXISTS t1(a Int, b Date)
+
+statement ok
+INSERT INTO t1 VALUES(1, '2022-12-30')
+
+statement ok
+INSERT INTO t1 VALUES(2, '2023-01-01')
+
+query IT
+SELECT * FROM t1 ORDER BY b
+----
+1 2022-12-30
+2 2023-01-01
+
+query T
+explain fragments UPDATE t1 SET a = 3 WHERE b > '2022-12-31'
+----
+Fragment 0:
+  DataExchange: Merge
+    ExchangeSink
+    ├── output columns: []
+    ├── destination fragment: [1]
+    └── UpdateSource
+(empty)
+(empty)
+Fragment 1:
+  CommitSink
+  └── ExchangeSource
+      ├── output columns: []
+      └── source fragment: [0]
+(empty)
+
+query IT
+SELECT * FROM t1 ORDER BY b
+----
+1 2022-12-30
+2 2023-01-01
+
+query T
+explain fragments UPDATE t1 SET a = 3 WHERE false
+----
+Nothing to update
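
The core of this change is splitting plan construction out of `UpdateInterpreter::execute2` into a reusable `get_physical_plan` returning `Result<Option<PhysicalPlan>>`: `execute2` builds a pipeline from the plan, while `EXPLAIN FRAGMENTS` renders the very same plan and degrades to "Nothing to update" when no plan is produced (e.g. `WHERE false`). The sketch below illustrates only that shape; it is a minimal stand-in, and every name in it (`MockPlan`, `MockUpdateInterpreter`, `explain_fragments`, `has_work`) is hypothetical, not the Databend API.

// Minimal sketch of the refactoring pattern applied by this patch: one
// plan-building method shared by execution and EXPLAIN, where `None`
// means "nothing to do". All names are hypothetical stand-ins.

struct MockPlan {
    fragments: Vec<String>,
}

struct MockUpdateInterpreter {
    // Stands in for "the UPDATE predicate matches at least one row".
    has_work: bool,
}

impl MockUpdateInterpreter {
    // Mirrors `get_physical_plan() -> Result<Option<PhysicalPlan>>`:
    // the single source of truth for what would actually run.
    fn get_physical_plan(&self) -> Option<MockPlan> {
        if self.has_work {
            Some(MockPlan {
                fragments: vec![
                    "Fragment 0: UpdateSource".to_string(),
                    "Fragment 1: CommitSink".to_string(),
                ],
            })
        } else {
            None
        }
    }

    // The EXPLAIN path renders the same plan instead of executing it,
    // falling back to a plain message when no plan exists.
    fn explain_fragments(&self) -> String {
        match self.get_physical_plan() {
            Some(plan) => plan.fragments.join("\n"),
            None => "Nothing to update".to_string(),
        }
    }
}

fn main() {
    let live = MockUpdateInterpreter { has_work: true };
    println!("{}", live.explain_fragments());

    let noop = MockUpdateInterpreter { has_work: false };
    println!("{}", noop.explain_fragments()); // prints "Nothing to update"
}

Keeping a single plan-building entry point is what guarantees the new sqllogictest output above stays in sync with execution: EXPLAIN cannot drift from the pipeline because both consume the same `Option`-returning method.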