diff --git a/Cargo.lock b/Cargo.lock index b258acb4f902..ac72a1219a47 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2885,7 +2885,6 @@ dependencies = [ "storages-common-cache", "storages-common-cache-manager", "storages-common-index", - "storages-common-locks", "storages-common-pruner", "storages-common-table-meta", "streaming-decompression", @@ -4013,6 +4012,7 @@ dependencies = [ "async-stream", "async-trait-fn", "background-service", + "backoff", "base64 0.21.0", "bumpalo", "byte-unit", @@ -4127,7 +4127,6 @@ dependencies = [ "storages-common-cache", "storages-common-cache-manager", "storages-common-index", - "storages-common-locks", "storages-common-table-meta", "stream-handler", "strength_reduce", @@ -11844,29 +11843,6 @@ dependencies = [ "xorfilter-rs", ] -[[package]] -name = "storages-common-locks" -version = "0.1.0" -dependencies = [ - "async-backtrace", - "async-trait-fn", - "backoff", - "common-base", - "common-catalog", - "common-exception", - "common-meta-app", - "common-meta-kvapi", - "common-meta-types", - "common-metrics", - "common-pipeline-core", - "common-users", - "futures", - "futures-util", - "log", - "parking_lot 0.12.1", - "rand 0.8.5", -] - [[package]] name = "storages-common-pruner" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 7d4a3f277002..1e978899cc6b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,7 +53,6 @@ members = [ "src/query/storages/common/cache", "src/query/storages/common/cache_manager", "src/query/storages/common/index", - "src/query/storages/common/locks", "src/query/storages/common/pruner", "src/query/storages/common/table_meta", "src/query/storages/delta", diff --git a/src/common/metrics/src/lib.rs b/src/common/metrics/src/lib.rs index dd16bc4abda4..06271d2f8f10 100644 --- a/src/common/metrics/src/lib.rs +++ b/src/common/metrics/src/lib.rs @@ -53,6 +53,7 @@ pub use crate::metrics::cluster; /// Metrics. pub use crate::metrics::http; pub use crate::metrics::interpreter; +pub use crate::metrics::lock; pub use crate::metrics::mysql; pub use crate::metrics::openai; pub use crate::metrics::session; diff --git a/src/query/storages/common/locks/src/lock_metrics.rs b/src/common/metrics/src/metrics/lock.rs similarity index 80% rename from src/query/storages/common/locks/src/lock_metrics.rs rename to src/common/metrics/src/metrics/lock.rs index 9934115d170e..f92947833fcb 100644 --- a/src/query/storages/common/locks/src/lock_metrics.rs +++ b/src/common/metrics/src/metrics/lock.rs @@ -14,12 +14,11 @@ use std::sync::LazyLock; -use common_meta_app::schema::LockType; -use common_metrics::register_counter; -use common_metrics::register_counter_family; -use common_metrics::Counter; -use common_metrics::Family; -use common_metrics::VecLabels; +use crate::register_counter; +use crate::register_counter_family; +use crate::Counter; +use crate::Family; +use crate::VecLabels; const METRIC_CREATED_LOCK_NUMS: &str = "created_lock_nums"; const METRIC_ACQUIRED_LOCK_NUMS: &str = "acquired_lock_nums"; @@ -38,17 +37,17 @@ static SHUTDOWN_LOCK_HOLDER_NUMS: LazyLock = const LABEL_TYPE: &str = "type"; const LABEL_TABLE_ID: &str = "table_id"; -pub fn record_created_lock_nums(lock_type: LockType, table_id: u64, num: u64) { +pub fn record_created_lock_nums(lock_type: String, table_id: u64, num: u64) { let labels = &vec![ - (LABEL_TYPE, lock_type.to_string()), + (LABEL_TYPE, lock_type), (LABEL_TABLE_ID, table_id.to_string()), ]; CREATED_LOCK_NUMS.get_or_create(labels).inc_by(num); } -pub fn record_acquired_lock_nums(lock_type: LockType, table_id: u64, num: u64) { +pub fn record_acquired_lock_nums(lock_type: String, table_id: u64, num: u64) { let labels = &vec![ - (LABEL_TYPE, lock_type.to_string()), + (LABEL_TYPE, lock_type), (LABEL_TABLE_ID, table_id.to_string()), ]; ACQUIRED_LOCK_NUMS.get_or_create(labels).inc_by(num); diff --git a/src/common/metrics/src/metrics/mod.rs b/src/common/metrics/src/metrics/mod.rs index 94548aee5ae0..e2f594f10e12 100644 --- a/src/common/metrics/src/metrics/mod.rs +++ b/src/common/metrics/src/metrics/mod.rs @@ -16,6 +16,7 @@ pub mod cache; pub mod cluster; pub mod http; pub mod interpreter; +pub mod lock; pub mod mysql; pub mod openai; pub mod session; diff --git a/src/query/catalog/src/table.rs b/src/query/catalog/src/table.rs index ceddc67a3cff..f33cfaa289ec 100644 --- a/src/query/catalog/src/table.rs +++ b/src/query/catalog/src/table.rs @@ -38,6 +38,7 @@ use common_storage::StorageMetrics; use storages_common_table_meta::meta::SnapshotId; use storages_common_table_meta::meta::TableSnapshot; +use crate::lock::Lock; use crate::plan::DataSourceInfo; use crate::plan::DataSourcePlan; use crate::plan::PartStatistics; @@ -304,9 +305,10 @@ pub trait Table: Sync + Send { async fn compact_segments( &self, ctx: Arc, + lock: Arc, limit: Option, ) -> Result<()> { - let (_, _) = (ctx, limit); + let (_, _, _) = (ctx, lock, limit); Err(ErrorCode::Unimplemented(format!( "table {}, of engine type {}, does not support compact segments", diff --git a/src/query/service/Cargo.toml b/src/query/service/Cargo.toml index 8f8caae68160..e05727783405 100644 --- a/src/query/service/Cargo.toml +++ b/src/query/service/Cargo.toml @@ -90,7 +90,6 @@ storages-common-blocks = { path = "../storages/common/blocks" } storages-common-cache = { path = "../storages/common/cache" } storages-common-cache-manager = { path = "../storages/common/cache_manager" } storages-common-index = { path = "../storages/common/index" } -storages-common-locks = { path = "../storages/common/locks" } storages-common-table-meta = { path = "../storages/common/table_meta" } stream-handler = { path = "../ee_features/stream_handler" } vacuum-handler = { path = "../ee_features/vacuum_handler" } @@ -107,6 +106,7 @@ async-backtrace = { workspace = true } async-channel = "1.7.1" async-stream = "0.3.3" async-trait = { workspace = true } +backoff = { version = "0.4.0", features = ["futures", "tokio"] } base64 = "0.21.0" bumpalo = { workspace = true } byte-unit = "4.0.19" diff --git a/src/query/service/src/global_services.rs b/src/query/service/src/global_services.rs index 7ea0d362169c..b4f504b24d7f 100644 --- a/src/query/service/src/global_services.rs +++ b/src/query/service/src/global_services.rs @@ -35,12 +35,12 @@ use common_tracing::GlobalLogger; use common_users::RoleCacheManager; use common_users::UserApiProvider; use storages_common_cache_manager::CacheManager; -use storages_common_locks::LockManager; use crate::api::DataExchangeManager; use crate::auth::AuthMgr; use crate::catalogs::DatabaseCatalog; use crate::clusters::ClusterDiscovery; +use crate::locks::LockManager; use crate::servers::http::v1::HttpQueryManager; use crate::sessions::SessionManager; diff --git a/src/query/service/src/interpreters/interpreter_delete.rs b/src/query/service/src/interpreters/interpreter_delete.rs index 17fa5ce66d75..69f17108955b 100644 --- a/src/query/service/src/interpreters/interpreter_delete.rs +++ b/src/query/service/src/interpreters/interpreter_delete.rs @@ -16,7 +16,6 @@ use std::collections::HashSet; use std::collections::VecDeque; use std::sync::Arc; -use common_catalog::lock::Lock; use common_catalog::plan::Filters; use common_catalog::plan::Partitions; use common_catalog::table::TableExt; @@ -58,12 +57,12 @@ use common_storages_factory::Table; use common_storages_fuse::FuseTable; use futures_util::TryStreamExt; use log::debug; -use storages_common_locks::LockManager; use storages_common_table_meta::meta::TableSnapshot; use crate::interpreters::common::create_push_down_filters; use crate::interpreters::Interpreter; use crate::interpreters::SelectInterpreter; +use crate::locks::LockManager; use crate::pipelines::executor::ExecutorSettings; use crate::pipelines::executor::PipelinePullingExecutor; use crate::pipelines::PipelineBuildResult; @@ -100,26 +99,27 @@ impl Interpreter for DeleteInterpreter { debug!("ctx.id" = self.ctx.get_id().as_str(); "delete_interpreter_execute"); let is_distributed = !self.ctx.get_cluster().is_empty(); - let catalog_name = self.plan.catalog_name.as_str(); + let catalog_name = self.plan.catalog_name.as_str(); let catalog = self.ctx.get_catalog(catalog_name).await?; let catalog_info = catalog.info(); let db_name = self.plan.database_name.as_str(); let tbl_name = self.plan.table_name.as_str(); - - // refresh table. let tbl = catalog .get_table(self.ctx.get_tenant().as_str(), db_name, tbl_name) .await?; - // check mutability - tbl.check_mutable()?; - // Add table lock. let table_lock = LockManager::create_table_lock(tbl.get_table_info().clone())?; let lock_guard = table_lock.try_lock(self.ctx.clone()).await?; + // refresh table. + let tbl = tbl.refresh(self.ctx.as_ref()).await?; + + // check mutability + tbl.check_mutable()?; + let selection = if !self.plan.subquery_desc.is_empty() { let support_row_id = tbl.support_row_id_column(); if !support_row_id { diff --git a/src/query/service/src/interpreters/interpreter_table_modify_column.rs b/src/query/service/src/interpreters/interpreter_table_modify_column.rs index 539dce496147..181aaff100dc 100644 --- a/src/query/service/src/interpreters/interpreter_table_modify_column.rs +++ b/src/query/service/src/interpreters/interpreter_table_modify_column.rs @@ -15,7 +15,6 @@ use std::sync::Arc; use common_catalog::catalog::Catalog; -use common_catalog::lock::Lock; use common_catalog::table::Table; use common_catalog::table::TableExt; use common_exception::ErrorCode; @@ -49,11 +48,11 @@ use common_storages_view::view_table::VIEW_ENGINE; use common_users::UserApiProvider; use data_mask_feature::get_datamask_handler; use storages_common_index::BloomIndex; -use storages_common_locks::LockManager; use storages_common_table_meta::table::OPT_KEY_BLOOM_INDEX_COLUMNS; use super::common::check_referenced_computed_columns; use crate::interpreters::Interpreter; +use crate::locks::LockManager; use crate::pipelines::PipelineBuildResult; use crate::schedulers::build_query_pipeline_without_render_result_set; use crate::sessions::QueryContext; @@ -190,6 +189,12 @@ impl ModifyTableColumnInterpreter { table: &Arc, field_and_comments: &[(TableField, String)], ) -> Result { + // Add table lock. + let table_lock = LockManager::create_table_lock(table.get_table_info().clone())?; + let lock_guard = table_lock.try_lock(self.ctx.clone()).await?; + // refresh table. + let table = table.refresh(self.ctx.as_ref()).await?; + let schema = table.schema().as_ref().clone(); let table_info = table.get_table_info(); let mut new_schema = schema.clone(); @@ -271,10 +276,6 @@ impl ModifyTableColumnInterpreter { return Ok(PipelineBuildResult::create()); } - // Add table lock. - let table_lock = LockManager::create_table_lock(table_info.clone())?; - let lock_guard = table_lock.try_lock(self.ctx.clone()).await?; - // 1. construct sql for selecting data from old table let mut sql = "select".to_string(); schema diff --git a/src/query/service/src/interpreters/interpreter_table_optimize.rs b/src/query/service/src/interpreters/interpreter_table_optimize.rs index c0563acb269d..de6367cd32e5 100644 --- a/src/query/service/src/interpreters/interpreter_table_optimize.rs +++ b/src/query/service/src/interpreters/interpreter_table_optimize.rs @@ -37,12 +37,12 @@ use common_sql::plans::OptimizeTableAction; use common_sql::plans::OptimizeTablePlan; use common_storages_factory::NavigationPoint; use common_storages_fuse::FuseTable; -use storages_common_locks::LockManager; use storages_common_table_meta::meta::TableSnapshot; use crate::interpreters::interpreter_table_recluster::build_recluster_physical_plan; use crate::interpreters::Interpreter; use crate::interpreters::InterpreterClusteringHistory; +use crate::locks::LockManager; use crate::pipelines::executor::ExecutorSettings; use crate::pipelines::executor::PipelineCompleteExecutor; use crate::pipelines::PipelineBuildResult; @@ -162,7 +162,7 @@ impl OptimizeTableInterpreter { if matches!(target, CompactTarget::Segments) { table - .compact_segments(self.ctx.clone(), self.plan.limit) + .compact_segments(self.ctx.clone(), table_lock, self.plan.limit) .await?; return Ok(PipelineBuildResult::create()); } diff --git a/src/query/service/src/interpreters/interpreter_table_recluster.rs b/src/query/service/src/interpreters/interpreter_table_recluster.rs index 34865f95f108..6baba80a29b7 100644 --- a/src/query/service/src/interpreters/interpreter_table_recluster.rs +++ b/src/query/service/src/interpreters/interpreter_table_recluster.rs @@ -36,13 +36,13 @@ use common_storages_fuse::FuseTable; use log::error; use log::info; use log::warn; -use storages_common_locks::LockManager; use storages_common_table_meta::meta::BlockMeta; use storages_common_table_meta::meta::Statistics; use storages_common_table_meta::meta::TableSnapshot; use crate::interpreters::Interpreter; use crate::interpreters::InterpreterClusteringHistory; +use crate::locks::LockManager; use crate::pipelines::executor::ExecutorSettings; use crate::pipelines::executor::PipelineCompleteExecutor; use crate::pipelines::PipelineBuildResult; diff --git a/src/query/service/src/interpreters/interpreter_update.rs b/src/query/service/src/interpreters/interpreter_update.rs index dffaeb31eaae..6078965b7f2b 100644 --- a/src/query/service/src/interpreters/interpreter_update.rs +++ b/src/query/service/src/interpreters/interpreter_update.rs @@ -17,7 +17,6 @@ use std::collections::HashSet; use std::collections::VecDeque; use std::sync::Arc; -use common_catalog::lock::Lock; use common_catalog::plan::Filters; use common_catalog::plan::Partitions; use common_catalog::table::TableExt; @@ -42,7 +41,6 @@ use common_sql::Visibility; use common_storages_factory::Table; use common_storages_fuse::FuseTable; use log::debug; -use storages_common_locks::LockManager; use storages_common_table_meta::meta::TableSnapshot; use crate::interpreters::common::check_deduplicate_label; @@ -52,6 +50,7 @@ use crate::interpreters::common::RefreshAggIndexDesc; use crate::interpreters::interpreter_delete::replace_subquery; use crate::interpreters::interpreter_delete::subquery_filter; use crate::interpreters::Interpreter; +use crate::locks::LockManager; use crate::pipelines::PipelineBuildResult; use crate::schedulers::build_query_pipeline_without_render_result_set; use crate::sessions::QueryContext; @@ -88,22 +87,25 @@ impl Interpreter for UpdateInterpreter { } let catalog_name = self.plan.catalog.as_str(); - let db_name = self.plan.database.as_str(); - let tbl_name = self.plan.table.as_str(); let catalog = self.ctx.get_catalog(catalog_name).await?; let catalog_info = catalog.info(); - // refresh table. + + let db_name = self.plan.database.as_str(); + let tbl_name = self.plan.table.as_str(); let tbl = catalog .get_table(self.ctx.get_tenant().as_str(), db_name, tbl_name) .await?; - // check mutability - tbl.check_mutable()?; - // Add table lock. let table_lock = LockManager::create_table_lock(tbl.get_table_info().clone())?; let lock_guard = table_lock.try_lock(self.ctx.clone()).await?; + // refresh table. + let tbl = tbl.refresh(self.ctx.as_ref()).await?; + + // check mutability + tbl.check_mutable()?; + let selection = if !self.plan.subquery_desc.is_empty() { let support_row_id = tbl.support_row_id_column(); if !support_row_id { diff --git a/src/query/service/src/lib.rs b/src/query/service/src/lib.rs index b073f8bb15b6..be9f12e1560d 100644 --- a/src/query/service/src/lib.rs +++ b/src/query/service/src/lib.rs @@ -47,6 +47,7 @@ pub mod clusters; pub mod databases; pub mod interpreters; pub mod local; +pub mod locks; pub mod metrics; pub mod pipelines; pub mod schedulers; diff --git a/src/query/storages/common/locks/src/lock_holder.rs b/src/query/service/src/locks/lock_holder.rs similarity index 90% rename from src/query/storages/common/locks/src/lock_holder.rs rename to src/query/service/src/locks/lock_holder.rs index b0a8e637bb07..93432f67da15 100644 --- a/src/query/storages/common/locks/src/lock_holder.rs +++ b/src/query/service/src/locks/lock_holder.rs @@ -30,12 +30,13 @@ use common_exception::ErrorCode; use common_exception::Result; use common_meta_app::schema::DeleteLockRevReq; use common_meta_app::schema::ExtendLockRevReq; +use common_storages_fuse::operations::set_backoff; use futures::future::select; use futures::future::Either; use rand::thread_rng; use rand::Rng; -use crate::set_backoff; +use crate::sessions::SessionManager; #[derive(Default)] pub struct LockHolder { @@ -63,7 +64,7 @@ impl LockHolder { ) .await?; - GlobalIORuntime::instance().spawn(query_id, { + GlobalIORuntime::instance().spawn(query_id.clone(), { let self_clone = self.clone(); async move { let mut notified = Box::pin(self_clone.shutdown_notify.notified()); @@ -80,13 +81,22 @@ impl LockHolder { } Either::Right((_, new_notified)) => { notified = new_notified; - self_clone + if let Err(e) = self_clone .try_extend_lock( catalog.clone(), extend_table_lock_req.clone(), Some(Duration::from_millis(expire_secs * 1000 - mills)), ) - .await?; + .await + { + // Force kill the query if extend lock failure. + if let Some(session) = + SessionManager::instance().get_session_by_id(&query_id) + { + session.force_kill_query(e.clone()); + } + return Err(e); + } } } } @@ -105,7 +115,7 @@ impl LockHolder { pub fn shutdown(&self) { self.shutdown_flag.store(true, Ordering::SeqCst); - self.shutdown_notify.notify_waiters(); + self.shutdown_notify.notify_one(); } } diff --git a/src/query/storages/common/locks/src/lock_manager.rs b/src/query/service/src/locks/lock_manager.rs similarity index 92% rename from src/query/storages/common/locks/src/lock_manager.rs rename to src/query/service/src/locks/lock_manager.rs index 1ba571a023dd..39a384e99f6c 100644 --- a/src/query/storages/common/locks/src/lock_manager.rs +++ b/src/query/service/src/locks/lock_manager.rs @@ -30,18 +30,18 @@ use common_exception::Result; use common_meta_app::schema::TableInfo; use common_meta_types::protobuf::watch_request::FilterType; use common_meta_types::protobuf::WatchRequest; +use common_metrics::lock::metrics_inc_shutdown_lock_holder_nums; +use common_metrics::lock::metrics_inc_start_lock_holder_nums; +use common_metrics::lock::record_acquired_lock_nums; +use common_metrics::lock::record_created_lock_nums; use common_pipeline_core::LockGuard; use common_pipeline_core::UnlockApi; use common_users::UserApiProvider; use futures_util::StreamExt; use parking_lot::RwLock; -use crate::metrics_inc_shutdown_lock_holder_nums; -use crate::metrics_inc_start_lock_holder_nums; -use crate::record_acquired_lock_nums; -use crate::record_created_lock_nums; -use crate::table_lock::TableLock; -use crate::LockHolder; +use crate::locks::lock_holder::LockHolder; +use crate::locks::table_lock::TableLock; pub struct LockManager { active_locks: Arc>>>, @@ -72,7 +72,7 @@ impl LockManager { GlobalInstance::get() } - pub fn create_table_lock(table_info: TableInfo) -> Result { + pub fn create_table_lock(table_info: TableInfo) -> Result> { let lock_mgr = LockManager::instance(); Ok(TableLock::create(lock_mgr, table_info)) } @@ -102,7 +102,7 @@ impl LockManager { .await?; let revision = res.revision; // metrics. - record_created_lock_nums(lock.lock_type(), lock.get_table_id(), 1); + record_created_lock_nums(lock.lock_type().to_string(), lock.get_table_id(), 1); let lock_holder = Arc::new(LockHolder::default()); lock_holder @@ -132,7 +132,7 @@ impl LockManager { let extend_table_lock_req = lock.gen_extend_lock_req(revision, expire_secs, true); catalog.extend_lock_revision(extend_table_lock_req).await?; // metrics. - record_acquired_lock_nums(lock.lock_type(), lock.get_table_id(), 1); + record_acquired_lock_nums(lock.lock_type().to_string(), lock.get_table_id(), 1); break; } diff --git a/src/query/storages/common/locks/src/lib.rs b/src/query/service/src/locks/mod.rs similarity index 79% rename from src/query/storages/common/locks/src/lib.rs rename to src/query/service/src/locks/mod.rs index 7fcb6acb74d5..81718aac0f51 100644 --- a/src/query/storages/common/locks/src/lib.rs +++ b/src/query/service/src/locks/mod.rs @@ -12,16 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![feature(lazy_cell)] - mod lock_holder; mod lock_manager; -mod lock_metrics; mod table_lock; -mod utils; -pub use lock_holder::LockHolder; pub use lock_manager::LockManager; -pub use lock_metrics::*; -pub use table_lock::TableLock; -pub use utils::set_backoff; diff --git a/src/query/storages/common/locks/src/table_lock/mod.rs b/src/query/service/src/locks/table_lock/mod.rs similarity index 95% rename from src/query/storages/common/locks/src/table_lock/mod.rs rename to src/query/service/src/locks/table_lock/mod.rs index 83372c0a2449..4434f8bbea47 100644 --- a/src/query/storages/common/locks/src/table_lock/mod.rs +++ b/src/query/service/src/locks/table_lock/mod.rs @@ -24,7 +24,7 @@ use common_meta_app::schema::TableLockKey; use common_meta_kvapi::kvapi::Key; use common_pipeline_core::LockGuard; -use crate::LockManager; +use crate::locks::LockManager; pub struct TableLock { lock_mgr: Arc, @@ -32,11 +32,11 @@ pub struct TableLock { } impl TableLock { - pub fn create(lock_mgr: Arc, table_info: TableInfo) -> Self { - TableLock { + pub fn create(lock_mgr: Arc, table_info: TableInfo) -> Arc { + Arc::new(TableLock { lock_mgr, table_info, - } + }) } } diff --git a/src/query/service/src/pipelines/builders/builder_commit.rs b/src/query/service/src/pipelines/builders/builder_commit.rs index c91c91c182d5..8795c400dfb1 100644 --- a/src/query/service/src/pipelines/builders/builder_commit.rs +++ b/src/query/service/src/pipelines/builders/builder_commit.rs @@ -24,6 +24,7 @@ use common_storages_fuse::operations::TableMutationAggregator; use common_storages_fuse::operations::TransformMergeCommitMeta; use common_storages_fuse::FuseTable; +use crate::locks::LockManager; use crate::pipelines::PipelineBuilder; impl PipelineBuilder { @@ -65,6 +66,11 @@ impl PipelineBuilder { } let snapshot_gen = MutationGenerator::new(plan.snapshot.clone()); + let lock = if plan.need_lock { + Some(LockManager::create_table_lock(plan.table_info.clone())?) + } else { + None + }; self.main_pipeline.add_sink(|input| { CommitSink::try_create( table, @@ -74,7 +80,7 @@ impl PipelineBuilder { snapshot_gen.clone(), input, None, - plan.need_lock, + lock.clone(), None, ) }) diff --git a/src/query/service/src/pipelines/builders/builder_recluster.rs b/src/query/service/src/pipelines/builders/builder_recluster.rs index 936e3902013c..b2643c6bb3c4 100644 --- a/src/query/service/src/pipelines/builders/builder_recluster.rs +++ b/src/query/service/src/pipelines/builders/builder_recluster.rs @@ -39,7 +39,8 @@ use common_storages_fuse::operations::TransformSerializeBlock; use common_storages_fuse::FuseTable; use common_storages_fuse::TableContext; -use super::builder_sort::SortPipelineBuilder; +use crate::locks::LockManager; +use crate::pipelines::builders::SortPipelineBuilder; use crate::pipelines::processors::TransformAddStreamColumns; use crate::pipelines::PipelineBuilder; @@ -222,6 +223,7 @@ impl PipelineBuilder { })?; let snapshot_gen = MutationGenerator::new(recluster_sink.snapshot.clone()); + let lock = LockManager::create_table_lock(recluster_sink.table_info.clone())?; self.main_pipeline.add_sink(|input| { CommitSink::try_create( table, @@ -231,7 +233,7 @@ impl PipelineBuilder { snapshot_gen.clone(), input, None, - true, + Some(lock.clone()), None, ) }) diff --git a/src/query/service/src/pipelines/builders/mod.rs b/src/query/service/src/pipelines/builders/mod.rs index d661ab1456e0..0677001b9d45 100644 --- a/src/query/service/src/pipelines/builders/mod.rs +++ b/src/query/service/src/pipelines/builders/mod.rs @@ -39,3 +39,4 @@ mod builder_update; mod builder_window; pub use builder_replace_into::ValueSource; +pub use builder_sort::SortPipelineBuilder; diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs index 0dd82757c67b..a51fd68e4ab2 100644 --- a/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs +++ b/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs @@ -50,6 +50,7 @@ use common_storages_fuse::statistics::sort_by_cluster_stats; use common_storages_fuse::statistics::StatisticsAccumulator; use common_storages_fuse::FuseStorageFormat; use common_storages_fuse::FuseTable; +use databend_query::locks::LockManager; use databend_query::sessions::QueryContext; use databend_query::sessions::TableContext; use databend_query::test_kits::*; @@ -263,8 +264,10 @@ async fn build_mutator( limit, }; + let table_lock = LockManager::create_table_lock(tbl.get_table_info().clone())?; let mut segment_mutator = SegmentCompactMutator::try_create( ctx.clone(), + table_lock, compact_params, tbl.meta_location_generator().clone(), tbl.get_operator(), diff --git a/src/query/storages/common/locks/Cargo.toml b/src/query/storages/common/locks/Cargo.toml deleted file mode 100644 index 446ba4a66bb4..000000000000 --- a/src/query/storages/common/locks/Cargo.toml +++ /dev/null @@ -1,31 +0,0 @@ -[package] -name = "storages-common-locks" -version = { workspace = true } -authors = { workspace = true } -license = { workspace = true } -publish = { workspace = true } -edition = { workspace = true } - -[lib] -doctest = false -test = false - -[dependencies] -common-base = { path = "../../../../common/base" } -common-catalog = { path = "../../../catalog" } -common-exception = { path = "../../../../common/exception" } -common-meta-app = { path = "../../../../meta/app" } -common-meta-kvapi = { path = "../../../../meta/kvapi" } -common-meta-types = { path = "../../../../meta/types" } -common-metrics = { path = "../../../../common/metrics" } -common-pipeline-core = { path = "../../../pipeline/core" } -common-users = { path = "../../../users" } - -async-backtrace = { workspace = true } -async-trait = { workspace = true } -backoff = { version = "0.4.0", features = ["futures", "tokio"] } -futures = { workspace = true } -futures-util = { workspace = true } -log = { workspace = true } -parking_lot = { workspace = true } -rand = { workspace = true } diff --git a/src/query/storages/common/locks/src/utils.rs b/src/query/storages/common/locks/src/utils.rs deleted file mode 100644 index 81865b8b0ed7..000000000000 --- a/src/query/storages/common/locks/src/utils.rs +++ /dev/null @@ -1,51 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::time::Duration; - -use backoff::ExponentialBackoff; -use backoff::ExponentialBackoffBuilder; - -const OCC_DEFAULT_BACKOFF_INIT_DELAY_MS: Duration = Duration::from_millis(5); -const OCC_DEFAULT_BACKOFF_MAX_DELAY_MS: Duration = Duration::from_millis(20 * 1000); -const OCC_DEFAULT_BACKOFF_MAX_ELAPSED_MS: Duration = Duration::from_millis(120 * 1000); - -#[inline] -pub fn set_backoff( - init_retry_delay: Option, - max_retry_delay: Option, - max_retry_elapsed: Option, -) -> ExponentialBackoff { - // The initial retry delay in millisecond. By default, it is 5 ms. - let init_delay = init_retry_delay.unwrap_or(OCC_DEFAULT_BACKOFF_INIT_DELAY_MS); - - // The maximum back off delay in millisecond, once the retry interval reaches this value, it stops increasing. - // By default, it is 20 seconds. - let max_delay = max_retry_delay.unwrap_or(OCC_DEFAULT_BACKOFF_MAX_DELAY_MS); - - // The maximum elapsed time after the occ starts, beyond which there will be no more retries. - // By default, it is 2 minutes - let max_elapsed = max_retry_elapsed.unwrap_or(OCC_DEFAULT_BACKOFF_MAX_ELAPSED_MS); - - // TODO(xuanwo): move to backon instead. - // - // To simplify the settings, using fixed common values for randomization_factor and multiplier - ExponentialBackoffBuilder::new() - .with_initial_interval(init_delay) - .with_max_interval(max_delay) - .with_randomization_factor(0.5) - .with_multiplier(2.0) - .with_max_elapsed_time(Some(max_elapsed)) - .build() -} diff --git a/src/query/storages/fuse/Cargo.toml b/src/query/storages/fuse/Cargo.toml index 645b02859453..c787809fb652 100644 --- a/src/query/storages/fuse/Cargo.toml +++ b/src/query/storages/fuse/Cargo.toml @@ -36,7 +36,6 @@ storages-common-blocks = { path = "../common/blocks" } storages-common-cache = { path = "../common/cache" } storages-common-cache-manager = { path = "../common/cache_manager" } storages-common-index = { path = "../common/index" } -storages-common-locks = { path = "../common/locks" } storages-common-pruner = { path = "../common/pruner" } storages-common-table-meta = { path = "../common/table_meta" } diff --git a/src/query/storages/fuse/src/fuse_table.rs b/src/query/storages/fuse/src/fuse_table.rs index 4cf38b63cb97..1fd2ff7b9107 100644 --- a/src/query/storages/fuse/src/fuse_table.rs +++ b/src/query/storages/fuse/src/fuse_table.rs @@ -19,6 +19,7 @@ use std::str::FromStr; use std::sync::Arc; use common_catalog::catalog::StorageDescription; +use common_catalog::lock::Lock; use common_catalog::plan::DataSourcePlan; use common_catalog::plan::PartStatistics; use common_catalog::plan::Partitions; @@ -776,9 +777,10 @@ impl Table for FuseTable { async fn compact_segments( &self, ctx: Arc, + lock: Arc, limit: Option, ) -> Result<()> { - self.do_compact_segments(ctx, limit).await + self.do_compact_segments(ctx, lock, limit).await } #[async_backtrace::framed] diff --git a/src/query/storages/fuse/src/operations/commit.rs b/src/query/storages/fuse/src/operations/commit.rs index 2c813746a3f6..22d6e42f935f 100644 --- a/src/query/storages/fuse/src/operations/commit.rs +++ b/src/query/storages/fuse/src/operations/commit.rs @@ -41,7 +41,6 @@ use log::warn; use opendal::Operator; use storages_common_cache::CacheAccessor; use storages_common_cache_manager::CachedObject; -use storages_common_locks::set_backoff; use storages_common_table_meta::meta::Location; use storages_common_table_meta::meta::SegmentInfo; use storages_common_table_meta::meta::SnapshotId; @@ -61,6 +60,7 @@ use crate::operations::common::CommitSink; use crate::operations::common::ConflictResolveContext; use crate::operations::common::TableMutationAggregator; use crate::operations::common::TransformSerializeSegment; +use crate::operations::set_backoff; use crate::statistics::merge_statistics; use crate::FuseTable; @@ -103,7 +103,7 @@ impl FuseTable { snapshot_gen.clone(), input, None, - false, + None, prev_snapshot_id, ) })?; diff --git a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs index 74d0786f3581..71e7de27c15a 100644 --- a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs +++ b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs @@ -40,8 +40,6 @@ use log::error; use log::info; use log::warn; use opendal::Operator; -use storages_common_locks::set_backoff; -use storages_common_locks::LockManager; use storages_common_table_meta::meta::ClusterKey; use storages_common_table_meta::meta::SegmentInfo; use storages_common_table_meta::meta::SnapshotId; @@ -52,6 +50,7 @@ use crate::io::TableMetaLocationGenerator; use crate::operations::common::AbortOperation; use crate::operations::common::CommitMeta; use crate::operations::common::SnapshotGenerator; +use crate::operations::set_backoff; use crate::FuseTable; enum State { @@ -93,7 +92,7 @@ pub struct CommitSink { abort_operation: AbortOperation, lock_guard: Option, - need_lock: bool, + lock: Option>, start_time: Instant, prev_snapshot_id: Option, @@ -113,7 +112,7 @@ where F: SnapshotGenerator + Send + 'static snapshot_gen: F, input: Arc, max_retry_elapsed: Option, - need_lock: bool, + lock: Option>, prev_snapshot_id: Option, ) -> Result { Ok(ProcessorPtr::create(Box::new(CommitSink { @@ -131,7 +130,7 @@ where F: SnapshotGenerator + Send + 'static retries: 0, max_retry_elapsed, input, - need_lock, + lock, start_time: Instant::now(), prev_snapshot_id, change_tracking: table.change_tracking_enabled(), @@ -173,7 +172,7 @@ where F: SnapshotGenerator + Send + 'static self.snapshot_gen .set_conflict_resolve_context(meta.conflict_resolve_context); - if self.need_lock { + if self.lock.is_some() { self.state = State::TryLock; } else { self.state = State::FillDefault; @@ -302,23 +301,19 @@ where F: SnapshotGenerator + Send + 'static }; } } - State::TryLock => { - let table_lock = - LockManager::create_table_lock(self.table.get_table_info().clone())?; - match table_lock.try_lock(self.ctx.clone()).await { - Ok(guard) => { - self.lock_guard = guard; - self.state = State::FillDefault; - } - Err(e) => { - error!( - "commit mutation failed cause get lock failed, error: {:?}", - e - ); - self.state = State::AbortOperation; - } + State::TryLock => match self.lock.as_ref().unwrap().try_lock(self.ctx.clone()).await { + Ok(guard) => { + self.lock_guard = guard; + self.state = State::FillDefault; } - } + Err(e) => { + error!( + "commit mutation failed cause get lock failed, error: {:?}", + e + ); + self.state = State::AbortOperation; + } + }, State::TryCommit { data, snapshot, diff --git a/src/query/storages/fuse/src/operations/compact.rs b/src/query/storages/fuse/src/operations/compact.rs index 42970370119e..c2a0ee0556a3 100644 --- a/src/query/storages/fuse/src/operations/compact.rs +++ b/src/query/storages/fuse/src/operations/compact.rs @@ -16,6 +16,7 @@ use std::collections::HashSet; use std::sync::Arc; use common_base::runtime::Runtime; +use common_catalog::lock::Lock; use common_catalog::plan::Partitions; use common_catalog::plan::PartitionsShuffleKind; use common_catalog::plan::Projection; @@ -54,6 +55,7 @@ impl FuseTable { pub(crate) async fn do_compact_segments( &self, ctx: Arc, + lock: Arc, limit: Option, ) -> Result<()> { let compact_options = if let Some(v) = self.compact_options(limit).await? { @@ -64,6 +66,7 @@ impl FuseTable { let mut segment_mutator = SegmentCompactMutator::try_create( ctx.clone(), + lock, compact_options, self.meta_location_generator().clone(), self.operator.clone(), diff --git a/src/query/storages/fuse/src/operations/mod.rs b/src/query/storages/fuse/src/operations/mod.rs index dbee6b9a4585..2cb5d826fd14 100644 --- a/src/query/storages/fuse/src/operations/mod.rs +++ b/src/query/storages/fuse/src/operations/mod.rs @@ -45,3 +45,4 @@ pub use replace_into::*; pub use util::acquire_task_permit; pub use util::column_parquet_metas; pub use util::read_block; +pub use util::set_backoff; diff --git a/src/query/storages/fuse/src/operations/mutation/compact/segment_compact_mutator.rs b/src/query/storages/fuse/src/operations/mutation/compact/segment_compact_mutator.rs index 30363ecd8be7..cf73574ba5ca 100644 --- a/src/query/storages/fuse/src/operations/mutation/compact/segment_compact_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/compact/segment_compact_mutator.rs @@ -21,7 +21,6 @@ use common_exception::Result; use log::info; use metrics::gauge; use opendal::Operator; -use storages_common_locks::LockManager; use storages_common_table_meta::meta::Location; use storages_common_table_meta::meta::SegmentInfo; use storages_common_table_meta::meta::Statistics; @@ -48,6 +47,7 @@ pub struct SegmentCompactionState { pub struct SegmentCompactMutator { ctx: Arc, + lock: Arc, compact_params: CompactOptions, data_accessor: Operator, location_generator: TableMetaLocationGenerator, @@ -58,6 +58,7 @@ pub struct SegmentCompactMutator { impl SegmentCompactMutator { pub fn try_create( ctx: Arc, + lock: Arc, compact_params: CompactOptions, location_generator: TableMetaLocationGenerator, operator: Operator, @@ -65,6 +66,7 @@ impl SegmentCompactMutator { ) -> Result { Ok(Self { ctx, + lock, compact_params, data_accessor: operator, location_generator, @@ -139,8 +141,7 @@ impl SegmentCompactMutator { let statistics = self.compact_params.base_snapshot.summary.clone(); let fuse_table = FuseTable::try_from_table(table.as_ref())?; - let table_lock = LockManager::create_table_lock(fuse_table.table_info.clone())?; - let _guard = table_lock.try_lock(self.ctx.clone()).await?; + let _guard = self.lock.try_lock(self.ctx.clone()).await?; fuse_table .commit_mutation( diff --git a/src/query/storages/fuse/src/operations/util.rs b/src/query/storages/fuse/src/operations/util.rs index 48f73154e3c0..3e97a99faf8d 100644 --- a/src/query/storages/fuse/src/operations/util.rs +++ b/src/query/storages/fuse/src/operations/util.rs @@ -14,7 +14,10 @@ use std::collections::HashMap; use std::sync::Arc; +use std::time::Duration; +use backoff::ExponentialBackoff; +use backoff::ExponentialBackoffBuilder; use common_arrow::parquet::metadata::ThriftFileMetaData; use common_base::base::tokio::sync::OwnedSemaphorePermit; use common_base::base::tokio::sync::Semaphore; @@ -32,6 +35,39 @@ use crate::io::BlockReader; use crate::io::ReadSettings; use crate::FuseStorageFormat; +const OCC_DEFAULT_BACKOFF_INIT_DELAY_MS: Duration = Duration::from_millis(5); +const OCC_DEFAULT_BACKOFF_MAX_DELAY_MS: Duration = Duration::from_millis(20 * 1000); +const OCC_DEFAULT_BACKOFF_MAX_ELAPSED_MS: Duration = Duration::from_millis(120 * 1000); + +#[inline] +pub fn set_backoff( + init_retry_delay: Option, + max_retry_delay: Option, + max_retry_elapsed: Option, +) -> ExponentialBackoff { + // The initial retry delay in millisecond. By default, it is 5 ms. + let init_delay = init_retry_delay.unwrap_or(OCC_DEFAULT_BACKOFF_INIT_DELAY_MS); + + // The maximum back off delay in millisecond, once the retry interval reaches this value, it stops increasing. + // By default, it is 20 seconds. + let max_delay = max_retry_delay.unwrap_or(OCC_DEFAULT_BACKOFF_MAX_DELAY_MS); + + // The maximum elapsed time after the occ starts, beyond which there will be no more retries. + // By default, it is 2 minutes + let max_elapsed = max_retry_elapsed.unwrap_or(OCC_DEFAULT_BACKOFF_MAX_ELAPSED_MS); + + // TODO(xuanwo): move to backon instead. + // + // To simplify the settings, using fixed common values for randomization_factor and multiplier + ExponentialBackoffBuilder::new() + .with_initial_interval(init_delay) + .with_max_interval(max_delay) + .with_randomization_factor(0.5) + .with_multiplier(2.0) + .with_max_elapsed_time(Some(max_elapsed)) + .build() +} + pub fn column_parquet_metas( file_meta: &ThriftFileMetaData, schema: &TableSchemaRef, diff --git a/tests/suites/0_stateless/03_dml/03_0016_update_with_lock.result b/tests/suites/0_stateless/03_dml/03_0016_update_with_lock.result new file mode 100644 index 000000000000..b47a727a28e6 --- /dev/null +++ b/tests/suites/0_stateless/03_dml/03_0016_update_with_lock.result @@ -0,0 +1,3 @@ +10 +Test table lock for update +10 diff --git a/tests/suites/0_stateless/03_dml/03_0016_update_with_lock.sh b/tests/suites/0_stateless/03_dml/03_0016_update_with_lock.sh new file mode 100755 index 000000000000..eb6cc61e72c3 --- /dev/null +++ b/tests/suites/0_stateless/03_dml/03_0016_update_with_lock.sh @@ -0,0 +1,34 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +. "$CURDIR"/../../../shell_env.sh + +echo "drop database if exists test_update" | $BENDSQL_CLIENT_CONNECT + +echo "CREATE DATABASE test_update" | $BENDSQL_CLIENT_CONNECT +echo "create table test_update.t(a int, b int)" | $BENDSQL_CLIENT_CONNECT +echo "set global enable_table_lock = 1" | $BENDSQL_CLIENT_CONNECT + +for i in $(seq 1 10);do + ( + j=$(($i+1)) + echo "insert into test_update.t values($i, $j)" | $BENDSQL_CLIENT_CONNECT + )& +done +wait + +echo "optimize table test_update.t compact" | $BENDSQL_CLIENT_CONNECT +echo "select count() from test_update.t where a + 1 = b" | $BENDSQL_CLIENT_CONNECT + +echo "Test table lock for update" +for i in $(seq 1 10);do + ( + echo "update test_update.t set b = $i where a = $i" | $BENDSQL_CLIENT_CONNECT + )& +done +wait + +echo "select count() from test_update.t where a = b" | $BENDSQL_CLIENT_CONNECT + +echo "drop table test_update.t all" | $BENDSQL_CLIENT_CONNECT +echo "drop database test_update" | $BENDSQL_CLIENT_CONNECT