From 7f5abcea52da5c58976858d8db1b6d6c6d7acb91 Mon Sep 17 00:00:00 2001
From: tison
Date: Sat, 16 Mar 2024 11:58:01 +0800
Subject: [PATCH 1/6] feat(metasrv): implement maintenance

Signed-off-by: tison
---
 src/common/meta/src/key.rs                    |  4 +-
 src/meta-srv/src/error.rs                     |  9 ++
 .../src/handler/failure_handler/runner.rs     | 31 ++++--
 src/meta-srv/src/metasrv.rs                   | 12 ++-
 src/meta-srv/src/procedure/region_failover.rs |  6 +-
 src/meta-srv/src/service/admin.rs             | 11 ++-
 src/meta-srv/src/service/admin/maintenance.rs | 99 +++++++++++++++++++
 7 files changed, 156 insertions(+), 16 deletions(-)
 create mode 100644 src/meta-srv/src/service/admin/maintenance.rs

diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs
index ea3f4c1eb571..bef951ded638 100644
--- a/src/common/meta/src/key.rs
+++ b/src/common/meta/src/key.rs
@@ -90,13 +90,13 @@ use crate::kv_backend::KvBackendRef;
 use crate::rpc::router::{region_distribution, RegionRoute, RegionStatus};
 use crate::DatanodeId;
 
-pub const REMOVED_PREFIX: &str = "__removed";
-
 pub const NAME_PATTERN: &str = r"[a-zA-Z_:-][a-zA-Z0-9_:\-\.]*";
+pub const MAINTENANCE_KEY: &str = "maintenance";
 
 const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table";
 const TABLE_REGION_KEY_PREFIX: &str = "__table_region";
 
+pub const REMOVED_PREFIX: &str = "__removed";
 pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info";
 pub const TABLE_NAME_KEY_PREFIX: &str = "__table_name";
 pub const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name";
diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs
index fb56bca3bbee..ced24419f3ed 100644
--- a/src/meta-srv/src/error.rs
+++ b/src/meta-srv/src/error.rs
@@ -301,6 +301,14 @@ pub enum Error {
         location: Location,
     },
 
+    #[snafu(display("Failed to parse bool: {}", err_msg))]
+    ParseBool {
+        err_msg: String,
+        #[snafu(source)]
+        error: std::str::ParseBoolError,
+        location: Location,
+    },
+
     #[snafu(display("Invalid arguments: {}", err_msg))]
     InvalidArguments { err_msg: String, location: Location },
 
@@ -709,6 +717,7 @@ impl ErrorExt for Error {
             | Error::InvalidStatKey { .. }
             | Error::InvalidInactiveRegionKey { .. }
             | Error::ParseNum { .. }
+            | Error::ParseBool { .. }
             | Error::ParseAddr { .. }
             | Error::ParseDuration { .. }
             | Error::UnsupportedSelectorType { .. }
diff --git a/src/meta-srv/src/handler/failure_handler/runner.rs b/src/meta-srv/src/handler/failure_handler/runner.rs
index 4d9ad3676441..f27e6b54ae99 100644
--- a/src/meta-srv/src/handler/failure_handler/runner.rs
+++ b/src/meta-srv/src/handler/failure_handler/runner.rs
@@ -162,16 +162,27 @@ impl FailureDetectRunner {
                         })
                         .collect::<Vec<_>>();
 
-                    for r in failed_regions {
-                        if let Err(e) = region_failover_manager.do_region_failover(&r).await {
-                            error!(e; "Failed to do region failover for {r}");
-                        } else {
-                            // Now that we know the region is starting to do failover, remove it
-                            // from the failure detectors, avoiding the failover procedure to be
-                            // triggered again.
-                            // If the region is back alive (the failover procedure runs successfully),
-                            // it will be added back to the failure detectors again.
-                            failure_detectors.remove(&r);
+                    match region_failover_manager.is_maintenance_mode().await {
+                        Ok(false) => {
+                            for r in failed_regions {
+                                if let Err(e) = region_failover_manager.do_region_failover(&r).await
+                                {
+                                    error!(e; "Failed to do region failover for {r}");
+                                } else {
+                                    // Now that we know the region is starting to do failover, remove it
+                                    // from the failure detectors, avoiding the failover procedure to be
+                                    // triggered again.
+                                    // If the region is back alive (the failover procedure runs successfully),
+                                    // it will be added back to the failure detectors again.
+                                    failure_detectors.remove(&r);
+                                }
+                            }
+                        }
+                        Ok(true) => {
+                            info!("Maintenance mode is enabled, skip failover");
+                        }
+                        Err(err) => {
+                            error!("Failed to check maintenance mode: {}", err);
                         }
                     }
                 }
diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs
index be126d218f0d..355862a38a0e 100644
--- a/src/meta-srv/src/metasrv.rs
+++ b/src/meta-srv/src/metasrv.rs
@@ -22,10 +22,11 @@ use common_base::Plugins;
 use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
 use common_grpc::channel_manager;
 use common_meta::ddl::ProcedureExecutorRef;
-use common_meta::key::TableMetadataManagerRef;
+use common_meta::key::{TableMetadataManagerRef, MAINTENANCE_KEY};
 use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef};
 use common_meta::peer::Peer;
 use common_meta::region_keeper::MemoryRegionKeeperRef;
+use common_meta::rpc::store::PutRequest;
 use common_meta::wal_options_allocator::WalOptionsAllocatorRef;
 use common_meta::{distributed_time_constants, ClusterId};
 use common_procedure::options::ProcedureConfig;
@@ -364,6 +365,15 @@ impl MetaSrv {
                 .context(StartProcedureManagerSnafu)?;
         }
 
+        if self.kv_backend.exists(MAINTENANCE_KEY.as_bytes()).await? {
+            let req = PutRequest {
+                key: Vec::from(MAINTENANCE_KEY),
+                value: vec![],
+                prev_kv: false,
+            };
+            self.in_memory.put(req).await?;
+        }
+
         info!("MetaSrv started");
         Ok(())
     }
diff --git a/src/meta-srv/src/procedure/region_failover.rs b/src/meta-srv/src/procedure/region_failover.rs
index ac0e4ecb7bed..50f31d039068 100644
--- a/src/meta-srv/src/procedure/region_failover.rs
+++ b/src/meta-srv/src/procedure/region_failover.rs
@@ -26,7 +26,7 @@ use std::time::Duration;
 
 use async_trait::async_trait;
 use common_meta::key::datanode_table::DatanodeTableKey;
-use common_meta::key::TableMetadataManagerRef;
+use common_meta::key::{TableMetadataManagerRef, MAINTENANCE_KEY};
 use common_meta::kv_backend::ResettableKvBackendRef;
 use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock, TableLock};
 use common_meta::table_name::TableName;
@@ -159,6 +159,10 @@ impl RegionFailoverManager {
         }
     }
 
+    pub(crate) async fn is_maintenance_mode(&self) -> Result<bool> {
+        self.in_memory.exists(MAINTENANCE_KEY.as_bytes()).await
+    }
+
     pub(crate) async fn do_region_failover(&self, failed_region: &RegionIdent) -> Result<()> {
         let Some(guard) = self.insert_running_procedures(failed_region) else {
             warn!("Region failover procedure for region {failed_region} is already running!");
diff --git a/src/meta-srv/src/service/admin.rs b/src/meta-srv/src/service/admin.rs
index fa1443de32d9..d285e1fabe9d 100644
--- a/src/meta-srv/src/service/admin.rs
+++ b/src/meta-srv/src/service/admin.rs
@@ -15,10 +15,9 @@
 mod health;
 mod heartbeat;
 mod leader;
+mod maintenance;
 mod meta;
-// TODO(weny): removes it.
 mod node_lease;
-#[allow(dead_code)]
 mod region_migration;
 mod route;
 mod util;
@@ -99,6 +98,14 @@ pub fn make_admin_service(meta_srv: MetaSrv) -> Admin {
     };
     let router = router.route("/region-migration", handler);
 
+    let handler = maintenance::MaintenanceHandler {
+        kv_backend: meta_srv.kv_backend().clone(),
+        in_memory: meta_srv.in_memory().clone(),
+    };
+    let router = router
+        .route("/maintenance", handler.clone())
+        .route("/maintenance/set", handler);
+
     let router = Router::nest("/admin", router);
 
     Admin::new(router)
diff --git a/src/meta-srv/src/service/admin/maintenance.rs b/src/meta-srv/src/service/admin/maintenance.rs
new file mode 100644
index 000000000000..b555945144f9
--- /dev/null
+++ b/src/meta-srv/src/service/admin/maintenance.rs
@@ -0,0 +1,99 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashMap;
+
+use common_meta::key::MAINTENANCE_KEY;
+use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
+use common_meta::rpc::store::PutRequest;
+use snafu::{OptionExt, ResultExt};
+use tonic::codegen::http;
+use tonic::codegen::http::Response;
+
+use crate::error::{InvalidHttpBodySnafu, MissingRequiredParameterSnafu, ParseBoolSnafu};
+use crate::service::admin::HttpHandler;
+
+#[derive(Clone)]
+pub struct MaintenanceHandler {
+    pub kv_backend: KvBackendRef,
+    pub in_memory: ResettableKvBackendRef,
+}
+
+impl MaintenanceHandler {
+    async fn get_maintenance(&self) -> crate::Result<Response<String>> {
+        let enabled = self.in_memory.exists(MAINTENANCE_KEY.as_bytes()).await?;
+        let response = if enabled {
+            "Maintenance mode is enabled"
+        } else {
+            "Maintenance mode is disabled"
+        };
+        http::Response::builder()
+            .status(http::StatusCode::OK)
+            .body(response.into())
+            .context(InvalidHttpBodySnafu)
+    }
+
+    async fn set_maintenance(
+        &self,
+        params: &HashMap<String, String>,
+    ) -> crate::Result<Response<String>> {
+        let enable = params
+            .get("enable")
+            .map(|v| v.parse::<bool>())
+            .context(MissingRequiredParameterSnafu { param: "enable" })?
+            .context(ParseBoolSnafu {
+                err_msg: "'enable' must be 'true' or 'false'",
+            })?;
+
+        let req = PutRequest {
+            key: Vec::from(MAINTENANCE_KEY),
+            value: vec![],
+            prev_kv: false,
+        };
+
+        let response = if enable {
+            self.kv_backend.put(req.clone()).await?;
+            self.in_memory.put(req).await?;
+            "Maintenance mode enabled"
+        } else {
+            self.kv_backend
+                .delete(MAINTENANCE_KEY.as_bytes(), false)
+                .await?;
+            self.in_memory
+                .delete(MAINTENANCE_KEY.as_bytes(), false)
+                .await?;
+            "Maintenance mode disabled"
+        };
+
+        http::Response::builder()
+            .status(http::StatusCode::OK)
+            .body(response.into())
+            .context(InvalidHttpBodySnafu)
+    }
+}
+
+#[async_trait::async_trait]
+impl HttpHandler for MaintenanceHandler {
+    async fn handle(
+        &self,
+        path: &str,
+        params: &HashMap<String, String>,
+    ) -> crate::Result<Response<String>> {
+        if path.ends_with("/set") {
+            self.set_maintenance(params).await
+        } else {
+            self.get_maintenance().await
+        }
+    }
+}

From c34d543078c65d1e3eff0b5403119aafc9294fe0 Mon Sep 17 00:00:00 2001
From: tison
Date: Sat, 16 Mar 2024 12:16:16 +0800
Subject: [PATCH 2/6] fixup and test

Signed-off-by: tison
---
 src/meta-srv/src/handler/failure_handler.rs   | 36 +++++++++++++++++++
 src/meta-srv/src/metasrv.rs                   | 13 ++++---
 src/meta-srv/src/procedure/region_failover.rs |  9 +++--
 src/meta-srv/src/service/admin/maintenance.rs | 23 ++++++++----
 4 files changed, 69 insertions(+), 12 deletions(-)

diff --git a/src/meta-srv/src/handler/failure_handler.rs b/src/meta-srv/src/handler/failure_handler.rs
index d9cbdee0a1a5..2f4bf6d491cb 100644
--- a/src/meta-srv/src/handler/failure_handler.rs
+++ b/src/meta-srv/src/handler/failure_handler.rs
@@ -107,6 +107,9 @@ impl HeartbeatHandler for RegionFailureHandler {
 
 #[cfg(test)]
 mod tests {
+    use std::assert_matches::assert_matches;
+
+    use common_meta::key::MAINTENANCE_KEY;
     use store_api::region_engine::RegionRole;
     use store_api::storage::RegionId;
 
@@ -163,4 +166,37 @@ mod tests {
         let dump = handler.failure_detect_runner.dump().await;
         assert_eq!(dump.iter().collect::<Vec<_>>().len(), 0);
     }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn test_maintenance_mode() {
+        let region_failover_manager = create_region_failover_manager();
+        let in_memory = region_failover_manager.create_context().in_memory.clone();
+        let _handler = RegionFailureHandler::try_new(
+            None,
+            region_failover_manager.clone(),
+            PhiAccrualFailureDetectorOptions::default(),
+        )
+        .await
+        .unwrap();
+
+        let kv_req = common_meta::rpc::store::PutRequest {
+            key: Vec::from(MAINTENANCE_KEY),
+            value: vec![],
+            prev_kv: false,
+        };
+        let _ = in_memory.put(kv_req.clone()).await.unwrap();
+        assert_matches!(
+            region_failover_manager.is_maintenance_mode().await,
+            Ok(true)
+        );
+
+        let _ = in_memory
+            .delete(MAINTENANCE_KEY.as_bytes(), false)
+            .await
+            .unwrap();
+        assert_matches!(
+            region_failover_manager.is_maintenance_mode().await,
+            Ok(false)
+        );
+    }
 }
diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs
index 355862a38a0e..c0f40087eb9d 100644
--- a/src/meta-srv/src/metasrv.rs
+++ b/src/meta-srv/src/metasrv.rs
@@ -44,7 +44,7 @@ use tokio::sync::broadcast::error::RecvError;
 use crate::cluster::MetaPeerClientRef;
 use crate::election::{Election, LeaderChangeMessage};
 use crate::error::{
-    self, InitMetadataSnafu, Result, StartProcedureManagerSnafu, StartTelemetryTaskSnafu,
+    InitMetadataSnafu, KvBackendSnafu, Result, StartProcedureManagerSnafu, StartTelemetryTaskSnafu,
     StopProcedureManagerSnafu,
 };
 use crate::failure_detector::PhiAccrualFailureDetectorOptions;
@@ -358,20 +358,25 @@ impl MetaSrv {
             self.leader_cached_kv_backend
                 .load()
                 .await
-                .context(error::KvBackendSnafu)?;
+                .context(KvBackendSnafu)?;
             self.procedure_manager
                 .start()
                 .await
                 .context(StartProcedureManagerSnafu)?;
         }
 
-        if self.kv_backend.exists(MAINTENANCE_KEY.as_bytes()).await? {
+        if self
+            .kv_backend
+            .exists(MAINTENANCE_KEY.as_bytes())
+            .await
+            .context(KvBackendSnafu)?
+        {
             let req = PutRequest {
                 key: Vec::from(MAINTENANCE_KEY),
                 value: vec![],
                 prev_kv: false,
             };
-            self.in_memory.put(req).await?;
+            self.in_memory.put(req).await.context(KvBackendSnafu)?;
         }
 
         info!("MetaSrv started");
diff --git a/src/meta-srv/src/procedure/region_failover.rs b/src/meta-srv/src/procedure/region_failover.rs
index 50f31d039068..c376ff632574 100644
--- a/src/meta-srv/src/procedure/region_failover.rs
+++ b/src/meta-srv/src/procedure/region_failover.rs
@@ -45,7 +45,9 @@ use snafu::ResultExt;
 use store_api::storage::{RegionId, RegionNumber};
 use table::metadata::TableId;
 
-use crate::error::{self, RegisterProcedureLoaderSnafu, Result, TableMetadataManagerSnafu};
+use crate::error::{
+    self, KvBackendSnafu, RegisterProcedureLoaderSnafu, Result, TableMetadataManagerSnafu,
+};
 use crate::lock::DistLockRef;
 use crate::metasrv::{SelectorContext, SelectorRef};
 use crate::service::mailbox::MailboxRef;
@@ -160,7 +162,10 @@ impl RegionFailoverManager {
     }
 
     pub(crate) async fn is_maintenance_mode(&self) -> Result<bool> {
-        self.in_memory.exists(MAINTENANCE_KEY.as_bytes()).await
+        self.in_memory
+            .exists(MAINTENANCE_KEY.as_bytes())
+            .await
+            .context(KvBackendSnafu)
     }
 
     pub(crate) async fn do_region_failover(&self, failed_region: &RegionIdent) -> Result<()> {
diff --git a/src/meta-srv/src/service/admin/maintenance.rs b/src/meta-srv/src/service/admin/maintenance.rs
index b555945144f9..0a15bb1770c8 100644
--- a/src/meta-srv/src/service/admin/maintenance.rs
+++ b/src/meta-srv/src/service/admin/maintenance.rs
@@ -21,7 +21,9 @@ use snafu::{OptionExt, ResultExt};
 use tonic::codegen::http;
 use tonic::codegen::http::Response;
 
-use crate::error::{InvalidHttpBodySnafu, MissingRequiredParameterSnafu, ParseBoolSnafu};
+use crate::error::{
+    InvalidHttpBodySnafu, KvBackendSnafu, MissingRequiredParameterSnafu, ParseBoolSnafu,
+};
 use crate::service::admin::HttpHandler;
 
 #[derive(Clone)]
@@ -32,7 +34,11 @@ pub struct MaintenanceHandler {
 impl MaintenanceHandler {
     async fn get_maintenance(&self) -> crate::Result<Response<String>> {
-        let enabled = self.in_memory.exists(MAINTENANCE_KEY.as_bytes()).await?;
+        let enabled = self
+            .in_memory
+            .exists(MAINTENANCE_KEY.as_bytes())
+            .await
+            .context(KvBackendSnafu)?;
         let response = if enabled {
             "Maintenance mode is enabled"
         } else {
             "Maintenance mode is disabled"
@@ -63,16 +69,21 @@ impl MaintenanceHandler {
         };
 
         let response = if enable {
-            self.kv_backend.put(req.clone()).await?;
-            self.in_memory.put(req).await?;
+            self.kv_backend
+                .put(req.clone())
+                .await
+                .context(KvBackendSnafu)?;
+            self.in_memory.put(req).await.context(KvBackendSnafu)?;
             "Maintenance mode enabled"
         } else {
             self.kv_backend
                 .delete(MAINTENANCE_KEY.as_bytes(), false)
-                .await?;
+                .await
+                .context(KvBackendSnafu)?;
             self.in_memory
                 .delete(MAINTENANCE_KEY.as_bytes(), false)
-                .await?;
+                .await
+                .context(KvBackendSnafu)?;
             "Maintenance mode disabled"
         };

From a0265183c7c9ebaffcbdb50f47bc50f39b8fe298 Mon Sep 17 00:00:00 2001
From: tison
Date: Sat, 16 Mar 2024 12:17:24 +0800
Subject: [PATCH 3/6] Add coauthors

Co-authored-by: Yingwen
Co-authored-by: xifyang <595482900@qq.com>

From 50f2e85b9ffa06337fececa1f34b16ee1451df93 Mon Sep 17 00:00:00 2001
From: tison
Date: Sat, 16 Mar 2024 12:26:56 +0800
Subject: [PATCH 4/6] tidy code

Signed-off-by: tison
---
 .../src/handler/failure_handler/runner.rs     | 88 ++++++++++---------
 1 file changed, 48 insertions(+), 40 deletions(-)

diff --git a/src/meta-srv/src/handler/failure_handler/runner.rs b/src/meta-srv/src/handler/failure_handler/runner.rs
index f27e6b54ae99..677256ba7c02 100644
--- a/src/meta-srv/src/handler/failure_handler/runner.rs
+++ b/src/meta-srv/src/handler/failure_handler/runner.rs
@@ -140,51 +140,59 @@ impl FailureDetectRunner {
         let election = self.election.clone();
         let region_failover_manager = self.region_failover_manager.clone();
         let runner_handle = common_runtime::spawn_bg(async move {
+            async fn maybe_region_failover(
+                failure_detectors: &Arc<FailureDetectorContainer>,
+                region_failover_manager: &Arc<RegionFailoverManager>,
+            ) {
+                match region_failover_manager.is_maintenance_mode().await {
+                    Ok(false) => {}
+                    Ok(true) => {
+                        info!("Maintenance mode is enabled, skip failover");
+                        return;
+                    }
+                    Err(err) => {
+                        error!("Failed to check maintenance mode: {}", err);
+                        return;
+                    }
+                }
+
+                let failed_regions = failure_detectors
+                    .iter()
+                    .filter_map(|e| {
+                        // Intentionally not place `current_time_millis()` out of the iteration.
+                        // The failure detection determination should be happened "just in time",
+                        // i.e., failed or not has to be compared with the most recent "now".
+                        // Besides, it might reduce the false positive of failure detection,
+                        // because during the iteration, heartbeats are coming in as usual,
+                        // and the `phi`s are still updating.
+                        if !e.failure_detector().is_available(current_time_millis()) {
+                            Some(e.region_ident().clone())
+                        } else {
+                            None
+                        }
+                    })
+                    .collect::<Vec<_>>();
+
+                for r in failed_regions {
+                    if let Err(e) = region_failover_manager.do_region_failover(&r).await {
+                        error!(e; "Failed to do region failover for {r}");
+                    } else {
+                        // Now that we know the region is starting to do failover, remove it
+                        // from the failure detectors, avoiding the failover procedure to be
+                        // triggered again.
+                        // If the region is back alive (the failover procedure runs successfully),
+                        // it will be added back to the failure detectors again.
+                        failure_detectors.remove(&r);
+                    }
+                }
+            }
+
             loop {
                 let start = Instant::now();
 
                 let is_leader = election.as_ref().map(|x| x.is_leader()).unwrap_or(true);
                 if is_leader {
-                    let failed_regions = failure_detectors
-                        .iter()
-                        .filter_map(|e| {
-                            // Intentionally not place `current_time_millis()` out of the iteration.
-                            // The failure detection determination should be happened "just in time",
-                            // i.e., failed or not has to be compared with the most recent "now".
-                            // Besides, it might reduce the false positive of failure detection,
-                            // because during the iteration, heartbeats are coming in as usual,
-                            // and the `phi`s are still updating.
-                            if !e.failure_detector().is_available(current_time_millis()) {
-                                Some(e.region_ident().clone())
-                            } else {
-                                None
-                            }
-                        })
-                        .collect::<Vec<_>>();
-
-                    match region_failover_manager.is_maintenance_mode().await {
-                        Ok(false) => {
-                            for r in failed_regions {
-                                if let Err(e) = region_failover_manager.do_region_failover(&r).await
-                                {
-                                    error!(e; "Failed to do region failover for {r}");
-                                } else {
-                                    // Now that we know the region is starting to do failover, remove it
-                                    // from the failure detectors, avoiding the failover procedure to be
-                                    // triggered again.
-                                    // If the region is back alive (the failover procedure runs successfully),
-                                    // it will be added back to the failure detectors again.
-                                    failure_detectors.remove(&r);
-                                }
-                            }
-                        }
-                        Ok(true) => {
-                            info!("Maintenance mode is enabled, skip failover");
-                        }
-                        Err(err) => {
-                            error!("Failed to check maintenance mode: {}", err);
-                        }
-                    }
+                    maybe_region_failover(&failure_detectors, &region_failover_manager).await;
                 }
 
                 let elapsed = Instant::now().duration_since(start);

From 72dac1f25123d7c171eaaeea11468ded86144758 Mon Sep 17 00:00:00 2001
From: tison
Date: Mon, 18 Mar 2024 10:59:26 +0800
Subject: [PATCH 5/6] Apply suggestions from code review

Co-authored-by: LFC <990479+MichaelScofield@users.noreply.github.com>
---
 src/meta-srv/src/handler/failure_handler/runner.rs |  2 +-
 src/meta-srv/src/service/admin/maintenance.rs      | 11 +++++------
 2 files changed, 6 insertions(+), 7 deletions(-)

diff --git a/src/meta-srv/src/handler/failure_handler/runner.rs b/src/meta-srv/src/handler/failure_handler/runner.rs
index 677256ba7c02..39eb2665c113 100644
--- a/src/meta-srv/src/handler/failure_handler/runner.rs
+++ b/src/meta-srv/src/handler/failure_handler/runner.rs
@@ -151,7 +151,7 @@ impl FailureDetectRunner {
                         return;
                     }
                     Err(err) => {
-                        error!("Failed to check maintenance mode: {}", err);
+                        error!(err; "Failed to check maintenance mode");
                         return;
                     }
                 }
diff --git a/src/meta-srv/src/service/admin/maintenance.rs b/src/meta-srv/src/service/admin/maintenance.rs
index 0a15bb1770c8..9e7237156cba 100644
--- a/src/meta-srv/src/service/admin/maintenance.rs
+++ b/src/meta-srv/src/service/admin/maintenance.rs
@@ -62,13 +62,12 @@ impl MaintenanceHandler {
             err_msg: "'enable' must be 'true' or 'false'",
         })?;
 
-        let req = PutRequest {
-            key: Vec::from(MAINTENANCE_KEY),
-            value: vec![],
-            prev_kv: false,
-        };
-
         let response = if enable {
+            let req = PutRequest {
+                key: Vec::from(MAINTENANCE_KEY),
+                value: vec![],
+                prev_kv: false,
+            };
             self.kv_backend
                 .put(req.clone())
                 .await

From 7b407fcb67648aa750229d75c407ef3aa756cdda Mon Sep 17 00:00:00 2001
From: tison
Date: Mon, 18 Mar 2024 16:52:07 +0800
Subject: [PATCH 6/6] always read kv_backend maintenance state

Signed-off-by: tison
---
 src/meta-srv/src/handler/failure_handler.rs   |  6 +++---
 src/meta-srv/src/metasrv.rs                   | 17 +----------------
 src/meta-srv/src/metasrv/builder.rs           |  1 +
 src/meta-srv/src/procedure/region_failover.rs | 11 +++++++++--
 src/meta-srv/src/service/admin.rs             |  1 -
 src/meta-srv/src/service/admin/maintenance.rs | 10 ++--------
 src/meta-srv/src/test_util.rs                 |  1 +
 tests-integration/tests/region_failover.rs    |  1 +
 8 files changed, 18 insertions(+), 30 deletions(-)

diff --git a/src/meta-srv/src/handler/failure_handler.rs b/src/meta-srv/src/handler/failure_handler.rs
index 2f4bf6d491cb..9748737fae30 100644
--- a/src/meta-srv/src/handler/failure_handler.rs
+++ b/src/meta-srv/src/handler/failure_handler.rs
@@ -170,7 +170,7 @@ mod tests {
     #[tokio::test(flavor = "multi_thread")]
     async fn test_maintenance_mode() {
         let region_failover_manager = create_region_failover_manager();
-        let in_memory = region_failover_manager.create_context().in_memory.clone();
+        let kv_backend = region_failover_manager.create_context().kv_backend.clone();
         let _handler = RegionFailureHandler::try_new(
             None,
             region_failover_manager.clone(),
@@ -184,13 +184,13 @@ mod tests {
             value: vec![],
             prev_kv: false,
         };
-        let _ = in_memory.put(kv_req.clone()).await.unwrap();
+        let _ = kv_backend.put(kv_req.clone()).await.unwrap();
         assert_matches!(
             region_failover_manager.is_maintenance_mode().await,
             Ok(true)
         );
 
-        let _ = in_memory
+        let _ = kv_backend
             .delete(MAINTENANCE_KEY.as_bytes(), false)
             .await
             .unwrap();
diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs
index c0f40087eb9d..68b3579298db 100644
--- a/src/meta-srv/src/metasrv.rs
+++ b/src/meta-srv/src/metasrv.rs
@@ -22,11 +22,10 @@ use common_base::Plugins;
 use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
 use common_grpc::channel_manager;
 use common_meta::ddl::ProcedureExecutorRef;
-use common_meta::key::{TableMetadataManagerRef, MAINTENANCE_KEY};
+use common_meta::key::TableMetadataManagerRef;
 use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef};
 use common_meta::peer::Peer;
 use common_meta::region_keeper::MemoryRegionKeeperRef;
-use common_meta::rpc::store::PutRequest;
 use common_meta::wal_options_allocator::WalOptionsAllocatorRef;
 use common_meta::{distributed_time_constants, ClusterId};
 use common_procedure::options::ProcedureConfig;
@@ -365,20 +364,6 @@ impl MetaSrv {
             .context(StartProcedureManagerSnafu)?;
         }
 
-        if self
-            .kv_backend
-            .exists(MAINTENANCE_KEY.as_bytes())
-            .await
-            .context(KvBackendSnafu)?
-        {
-            let req = PutRequest {
-                key: Vec::from(MAINTENANCE_KEY),
-                value: vec![],
-                prev_kv: false,
-            };
-            self.in_memory.put(req).await.context(KvBackendSnafu)?;
-        }
-
         info!("MetaSrv started");
         Ok(())
     }
diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs
index 7447fdc67b1a..81c83816eed1 100644
--- a/src/meta-srv/src/metasrv/builder.rs
+++ b/src/meta-srv/src/metasrv/builder.rs
@@ -260,6 +260,7 @@ impl MetaSrvBuilder {
             let region_failover_manager = Arc::new(RegionFailoverManager::new(
                 distributed_time_constants::REGION_LEASE_SECS,
                 in_memory.clone(),
+                kv_backend.clone(),
                 mailbox.clone(),
                 procedure_manager.clone(),
                 (selector.clone(), selector_ctx.clone()),
diff --git a/src/meta-srv/src/procedure/region_failover.rs b/src/meta-srv/src/procedure/region_failover.rs
index c376ff632574..7d82ad36d520 100644
--- a/src/meta-srv/src/procedure/region_failover.rs
+++ b/src/meta-srv/src/procedure/region_failover.rs
@@ -27,7 +27,7 @@ use std::time::Duration;
 use async_trait::async_trait;
 use common_meta::key::datanode_table::DatanodeTableKey;
 use common_meta::key::{TableMetadataManagerRef, MAINTENANCE_KEY};
-use common_meta::kv_backend::ResettableKvBackendRef;
+use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
 use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock, TableLock};
 use common_meta::table_name::TableName;
 use common_meta::{ClusterId, RegionIdent};
@@ -75,6 +75,7 @@ impl From<RegionIdent> for RegionFailoverKey {
 pub(crate) struct RegionFailoverManager {
     region_lease_secs: u64,
     in_memory: ResettableKvBackendRef,
+    kv_backend: KvBackendRef,
     mailbox: MailboxRef,
     procedure_manager: ProcedureManagerRef,
     selector: SelectorRef,
@@ -96,9 +97,11 @@ impl Drop for FailoverProcedureGuard {
     }
 }
 
 impl RegionFailoverManager {
+    #[allow(clippy::too_many_arguments)]
     pub(crate) fn new(
         region_lease_secs: u64,
         in_memory: ResettableKvBackendRef,
+        kv_backend: KvBackendRef,
         mailbox: MailboxRef,
         procedure_manager: ProcedureManagerRef,
         (selector, selector_ctx): (SelectorRef, SelectorContext),
@@ -108,6 +111,7 @@ impl RegionFailoverManager {
         Self {
             region_lease_secs,
             in_memory,
+            kv_backend,
             mailbox,
             procedure_manager,
             selector,
@@ -122,6 +126,7 @@ impl RegionFailoverManager {
         RegionFailoverContext {
             region_lease_secs: self.region_lease_secs,
             in_memory: self.in_memory.clone(),
+            kv_backend: self.kv_backend.clone(),
             mailbox: self.mailbox.clone(),
             selector: self.selector.clone(),
             selector_ctx: self.selector_ctx.clone(),
@@ -162,7 +167,7 @@ impl RegionFailoverManager {
     }
 
     pub(crate) async fn is_maintenance_mode(&self) -> Result<bool> {
-        self.in_memory
+        self.kv_backend
             .exists(MAINTENANCE_KEY.as_bytes())
             .await
             .context(KvBackendSnafu)
@@ -273,6 +278,7 @@ struct Node {
 pub struct RegionFailoverContext {
     pub region_lease_secs: u64,
     pub in_memory: ResettableKvBackendRef,
+    pub kv_backend: KvBackendRef,
     pub mailbox: MailboxRef,
     pub selector: SelectorRef,
     pub selector_ctx: SelectorContext,
@@ -578,6 +584,7 @@ mod tests {
             context: RegionFailoverContext {
                 region_lease_secs: 10,
                 in_memory,
+                kv_backend,
                 mailbox,
                 selector,
                 selector_ctx,
diff --git a/src/meta-srv/src/service/admin.rs b/src/meta-srv/src/service/admin.rs
index d285e1fabe9d..7bf0d04640de 100644
--- a/src/meta-srv/src/service/admin.rs
+++ b/src/meta-srv/src/service/admin.rs
@@ -100,7 +100,6 @@ pub fn make_admin_service(meta_srv: MetaSrv) -> Admin {
 
     let handler = maintenance::MaintenanceHandler {
         kv_backend: meta_srv.kv_backend().clone(),
-        in_memory: meta_srv.in_memory().clone(),
     };
     let router = router
         .route("/maintenance", handler.clone())
diff --git a/src/meta-srv/src/service/admin/maintenance.rs b/src/meta-srv/src/service/admin/maintenance.rs
index 9e7237156cba..01e62aece6ef 100644
--- a/src/meta-srv/src/service/admin/maintenance.rs
+++ b/src/meta-srv/src/service/admin/maintenance.rs
@@ -15,7 +15,7 @@
 use std::collections::HashMap;
 
 use common_meta::key::MAINTENANCE_KEY;
-use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
+use common_meta::kv_backend::KvBackendRef;
 use common_meta::rpc::store::PutRequest;
 use snafu::{OptionExt, ResultExt};
 use tonic::codegen::http;
@@ -29,13 +29,12 @@ use crate::service::admin::HttpHandler;
 #[derive(Clone)]
 pub struct MaintenanceHandler {
     pub kv_backend: KvBackendRef,
-    pub in_memory: ResettableKvBackendRef,
 }
 
 impl MaintenanceHandler {
     async fn get_maintenance(&self) -> crate::Result<Response<String>> {
         let enabled = self
-            .in_memory
+            .kv_backend
             .exists(MAINTENANCE_KEY.as_bytes())
             .await
             .context(KvBackendSnafu)?;
@@ -72,17 +71,12 @@ impl MaintenanceHandler {
                 .put(req.clone())
                 .await
                 .context(KvBackendSnafu)?;
-            self.in_memory.put(req).await.context(KvBackendSnafu)?;
             "Maintenance mode enabled"
         } else {
             self.kv_backend
                 .delete(MAINTENANCE_KEY.as_bytes(), false)
                 .await
                 .context(KvBackendSnafu)?;
-            self.in_memory
-                .delete(MAINTENANCE_KEY.as_bytes(), false)
-                .await
-                .context(KvBackendSnafu)?;
             "Maintenance mode disabled"
         };
diff --git a/src/meta-srv/src/test_util.rs b/src/meta-srv/src/test_util.rs
index 4d021fae97fa..b6fa285311f6 100644
--- a/src/meta-srv/src/test_util.rs
+++ b/src/meta-srv/src/test_util.rs
@@ -86,6 +86,7 @@ pub(crate) fn create_region_failover_manager() -> Arc<RegionFailoverManager> {
     Arc::new(RegionFailoverManager::new(
         10,
         in_memory,
+        kv_backend.clone(),
         mailbox,
         procedure_manager,
         (selector, selector_ctx),
diff --git a/tests-integration/tests/region_failover.rs b/tests-integration/tests/region_failover.rs
index 74289149c066..e0d82f658704 100644
--- a/tests-integration/tests/region_failover.rs
+++ b/tests-integration/tests/region_failover.rs
@@ -352,6 +352,7 @@ async fn run_region_failover_procedure(
     RegionFailoverContext {
         region_lease_secs: 10,
         in_memory: meta_srv.in_memory().clone(),
+        kv_backend: meta_srv.kv_backend().clone(),
         mailbox: meta_srv.mailbox().clone(),
         selector,
         selector_ctx: SelectorContext {
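
A quick way to exercise the endpoints this series adds, once a metasrv is running: the /admin/maintenance routes and the `enable` query parameter come from the diffs above, while the address (127.0.0.1:3002) and the use of the `reqwest` crate are assumptions of this sketch, not part of the patch.

    // Smoke test for the maintenance admin API. Hypothetical metasrv address;
    // adjust to your deployment. Requires in Cargo.toml:
    //     reqwest = { version = "0.11", features = ["blocking"] }
    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let base = "http://127.0.0.1:3002/admin";

        // GET /admin/maintenance reports the current state as plain text.
        let state = reqwest::blocking::get(format!("{base}/maintenance"))?.text()?;
        println!("{state}"); // "Maintenance mode is enabled" or "... is disabled"

        // GET /admin/maintenance/set?enable=true turns the flag on; a value that
        // does not parse as a bool is rejected with the ParseBool error above.
        let set = reqwest::blocking::get(format!("{base}/maintenance/set?enable=true"))?.text()?;
        println!("{set}"); // "Maintenance mode enabled"

        Ok(())
    }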