Skip to content

Commit

Permalink
feat: support explain fragments for update (databendlabs#14227)
Browse files Browse the repository at this point in the history
* feat: support explain fragments for update

* fix

* fix

* fix

* fix
  • Loading branch information
SkyFan2002 authored Jan 5, 2024
1 parent bac11aa commit 3759207
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 24 deletions.
25 changes: 25 additions & 0 deletions src/query/service/src/interpreters/interpreter_explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use databend_common_profile::QueryProfileManager;
use databend_common_profile::SharedProcessorProfiles;
use databend_common_sql::executor::ProfileHelper;
use databend_common_sql::optimizer::ColumnSet;
use databend_common_sql::plans::UpdatePlan;
use databend_common_sql::BindContext;
use databend_common_sql::InsertInputSource;
use databend_common_sql::MetadataRef;
Expand All @@ -34,6 +35,7 @@ use databend_common_storages_result_cache::ResultCacheReader;
use databend_common_users::UserApiProvider;

use super::InterpreterFactory;
use super::UpdateInterpreter;
use crate::interpreters::Interpreter;
use crate::pipelines::executor::ExecutorSettings;
use crate::pipelines::executor::PipelineCompleteExecutor;
Expand Down Expand Up @@ -185,6 +187,7 @@ impl Interpreter for ExplainInterpreter {
)
.await?
}
Plan::Update(update) => self.explain_update_fragments(update.as_ref()).await?,
_ => {
return Err(ErrorCode::Unimplemented("Unsupported EXPLAIN statement"));
}
Expand Down Expand Up @@ -322,6 +325,28 @@ impl ExplainInterpreter {
Ok(vec![DataBlock::new_from_columns(vec![formatted_plan])])
}

#[async_backtrace::framed]
async fn explain_update_fragments(&self, update: &UpdatePlan) -> Result<Vec<DataBlock>> {
let interpreter = UpdateInterpreter::try_create(self.ctx.clone(), update.clone())?;
let display_string = if let Some(plan) = interpreter.get_physical_plan().await? {
let root_fragment = Fragmenter::try_create(self.ctx.clone())?.build_fragment(&plan)?;

let mut fragments_actions = QueryFragmentsActions::create(self.ctx.clone(), false);
root_fragment.get_actions(self.ctx.clone(), &mut fragments_actions)?;

let ident = fragments_actions.display_indent(&update.metadata);
ident.to_string()
} else {
"Nothing to update".to_string()
};
let line_split_result = display_string
.lines()
.map(|s| s.as_bytes().to_vec())
.collect::<Vec<_>>();
let formatted_plan = StringType::from_data(line_split_result);
Ok(vec![DataBlock::new_from_columns(vec![formatted_plan])])
}

#[async_backtrace::framed]
async fn explain_analyze(
&self,
Expand Down
63 changes: 39 additions & 24 deletions src/query/service/src/interpreters/interpreter_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ impl Interpreter for UpdateInterpreter {

let catalog_name = self.plan.catalog.as_str();
let catalog = self.ctx.get_catalog(catalog_name).await?;
let catalog_info = catalog.info();

let db_name = self.plan.database.as_str();
let tbl_name = self.plan.table.as_str();
Expand All @@ -103,9 +102,45 @@ impl Interpreter for UpdateInterpreter {
let table_lock = LockManager::create_table_lock(tbl.get_table_info().clone())?;
let lock_guard = table_lock.try_lock(self.ctx.clone()).await?;

// build physical plan.
let physical_plan = self.get_physical_plan().await?;

// build pipeline.
let mut build_res = PipelineBuildResult::create();
if let Some(physical_plan) = physical_plan {
build_res =
build_query_pipeline_without_render_result_set(&self.ctx, &physical_plan, false)
.await?;
// generate virtual columns if `enable_refresh_virtual_column_after_write` on.
{
let refresh_desc = RefreshDesc {
catalog: catalog_name.to_string(),
database: db_name.to_string(),
table: tbl_name.to_string(),
};

hook_refresh(self.ctx.clone(), &mut build_res.main_pipeline, refresh_desc).await;
}
}

build_res.main_pipeline.add_lock_guard(lock_guard);
Ok(build_res)
}
}

impl UpdateInterpreter {
pub async fn get_physical_plan(&self) -> Result<Option<PhysicalPlan>> {
let catalog_name = self.plan.catalog.as_str();
let catalog = self.ctx.get_catalog(catalog_name).await?;
let catalog_info = catalog.info();

let db_name = self.plan.database.as_str();
let tbl_name = self.plan.table.as_str();
let tbl = catalog
.get_table(self.ctx.get_tenant().as_str(), db_name, tbl_name)
.await?;
// refresh table.
let tbl = tbl.refresh(self.ctx.as_ref()).await?;

// check mutability
tbl.check_mutable()?;

Expand Down Expand Up @@ -203,7 +238,6 @@ impl Interpreter for UpdateInterpreter {
))
})?;

let mut build_res = PipelineBuildResult::create();
let query_row_id_col = !self.plan.subquery_desc.is_empty();
if let Some(snapshot) = fuse_table
.fast_update(
Expand Down Expand Up @@ -239,29 +273,10 @@ impl Interpreter for UpdateInterpreter {
is_distributed,
self.ctx.clone(),
)?;

build_res =
build_query_pipeline_without_render_result_set(&self.ctx, &physical_plan, false)
.await?;

// generate virtual columns if `enable_refresh_virtual_column_after_write` on.
{
let refresh_desc = RefreshDesc {
catalog: catalog_name.to_string(),
database: db_name.to_string(),
table: tbl_name.to_string(),
};

hook_refresh(self.ctx.clone(), &mut build_res.main_pipeline, refresh_desc).await;
}
return Ok(Some(physical_plan));
}

build_res.main_pipeline.add_lock_guard(lock_guard);
Ok(build_res)
Ok(None)
}
}

impl UpdateInterpreter {
#[allow(clippy::too_many_arguments)]
pub fn build_physical_plan(
filters: Option<Filters>,
Expand Down
54 changes: 54 additions & 0 deletions tests/sqllogictests/suites/mode/cluster/update.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
statement ok
DROP DATABASE IF EXISTS db1

statement ok
CREATE DATABASE db1

statement ok
USE db1

statement ok
CREATE TABLE IF NOT EXISTS t1(a Int, b Date)

statement ok
INSERT INTO t1 VALUES(1, '2022-12-30')

statement ok
INSERT INTO t1 VALUES(2, '2023-01-01')

query IT
SELECT * FROM t1 ORDER BY b
----
1 2022-12-30
2 2023-01-01

query T
explain fragments UPDATE t1 SET a = 3 WHERE b > '2022-12-31'
----
Fragment 0:
DataExchange: Merge
ExchangeSink
├── output columns: []
├── destination fragment: [1]
└── UpdateSource
(empty)
(empty)
Fragment 1:
CommitSink
└── ExchangeSource
├── output columns: []
└── source fragment: [0]
(empty)

query IT
SELECT * FROM t1 ORDER BY b
----
1 2022-12-30
2 2023-01-01

query T
explain fragments UPDATE t1 SET a = 3 WHERE false
----
Nothing to update


0 comments on commit 3759207

Please sign in to comment.