diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs
index ea3f4c1eb571..bef951ded638 100644
--- a/src/common/meta/src/key.rs
+++ b/src/common/meta/src/key.rs
@@ -90,13 +90,13 @@ use crate::kv_backend::KvBackendRef;
 use crate::rpc::router::{region_distribution, RegionRoute, RegionStatus};
 use crate::DatanodeId;
 
-pub const REMOVED_PREFIX: &str = "__removed";
-
 pub const NAME_PATTERN: &str = r"[a-zA-Z_:-][a-zA-Z0-9_:\-\.]*";
+pub const MAINTENANCE_KEY: &str = "maintenance";
 
 const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table";
 const TABLE_REGION_KEY_PREFIX: &str = "__table_region";
+pub const REMOVED_PREFIX: &str = "__removed";
 
 pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info";
 pub const TABLE_NAME_KEY_PREFIX: &str = "__table_name";
 pub const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name";
diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs
index fb56bca3bbee..ced24419f3ed 100644
--- a/src/meta-srv/src/error.rs
+++ b/src/meta-srv/src/error.rs
@@ -301,6 +301,14 @@ pub enum Error {
         location: Location,
     },
 
+    #[snafu(display("Failed to parse bool: {}", err_msg))]
+    ParseBool {
+        err_msg: String,
+        #[snafu(source)]
+        error: std::str::ParseBoolError,
+        location: Location,
+    },
+
     #[snafu(display("Invalid arguments: {}", err_msg))]
     InvalidArguments { err_msg: String, location: Location },
 
@@ -709,6 +717,7 @@ impl ErrorExt for Error {
             | Error::InvalidStatKey { .. }
             | Error::InvalidInactiveRegionKey { .. }
             | Error::ParseNum { .. }
+            | Error::ParseBool { .. }
             | Error::ParseAddr { .. }
             | Error::ParseDuration { .. }
            | Error::UnsupportedSelectorType { .. }
diff --git a/src/meta-srv/src/handler/failure_handler.rs b/src/meta-srv/src/handler/failure_handler.rs
index d9cbdee0a1a5..9748737fae30 100644
--- a/src/meta-srv/src/handler/failure_handler.rs
+++ b/src/meta-srv/src/handler/failure_handler.rs
@@ -107,6 +107,9 @@ impl HeartbeatHandler for RegionFailureHandler {
 
 #[cfg(test)]
 mod tests {
+    use std::assert_matches::assert_matches;
+
+    use common_meta::key::MAINTENANCE_KEY;
     use store_api::region_engine::RegionRole;
     use store_api::storage::RegionId;
 
@@ -163,4 +166,37 @@ mod tests {
         let dump = handler.failure_detect_runner.dump().await;
         assert_eq!(dump.iter().collect::<Vec<_>>().len(), 0);
     }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn test_maintenance_mode() {
+        let region_failover_manager = create_region_failover_manager();
+        let kv_backend = region_failover_manager.create_context().kv_backend.clone();
+        let _handler = RegionFailureHandler::try_new(
+            None,
+            region_failover_manager.clone(),
+            PhiAccrualFailureDetectorOptions::default(),
+        )
+        .await
+        .unwrap();
+
+        let kv_req = common_meta::rpc::store::PutRequest {
+            key: Vec::from(MAINTENANCE_KEY),
+            value: vec![],
+            prev_kv: false,
+        };
+        let _ = kv_backend.put(kv_req.clone()).await.unwrap();
+        assert_matches!(
+            region_failover_manager.is_maintenance_mode().await,
+            Ok(true)
+        );
+
+        let _ = kv_backend
+            .delete(MAINTENANCE_KEY.as_bytes(), false)
+            .await
+            .unwrap();
+        assert_matches!(
+            region_failover_manager.is_maintenance_mode().await,
+            Ok(false)
+        );
+    }
 }
diff --git a/src/meta-srv/src/handler/failure_handler/runner.rs b/src/meta-srv/src/handler/failure_handler/runner.rs
index 4d9ad3676441..39eb2665c113 100644
--- a/src/meta-srv/src/handler/failure_handler/runner.rs
+++ b/src/meta-srv/src/handler/failure_handler/runner.rs
@@ -140,40 +140,59 @@ impl FailureDetectRunner {
         let election = self.election.clone();
         let region_failover_manager = self.region_failover_manager.clone();
         let runner_handle = common_runtime::spawn_bg(async move {
+            async fn maybe_region_failover(
+                failure_detectors: &Arc<FailureDetectorContainer>,
+                region_failover_manager: &Arc<RegionFailoverManager>,
+            ) {
+                match region_failover_manager.is_maintenance_mode().await {
+                    Ok(false) => {}
+                    Ok(true) => {
+                        info!("Maintenance mode is enabled, skip failover");
+                        return;
+                    }
+                    Err(err) => {
+                        error!(err; "Failed to check maintenance mode");
+                        return;
+                    }
+                }
+
+                let failed_regions = failure_detectors
+                    .iter()
+                    .filter_map(|e| {
+                        // Intentionally not place `current_time_millis()` out of the iteration.
+                        // The failure detection determination should be happened "just in time",
+                        // i.e., failed or not has to be compared with the most recent "now".
+                        // Besides, it might reduce the false positive of failure detection,
+                        // because during the iteration, heartbeats are coming in as usual,
+                        // and the `phi`s are still updating.
+                        if !e.failure_detector().is_available(current_time_millis()) {
+                            Some(e.region_ident().clone())
+                        } else {
+                            None
+                        }
+                    })
+                    .collect::<Vec<RegionIdent>>();
+
+                for r in failed_regions {
+                    if let Err(e) = region_failover_manager.do_region_failover(&r).await {
+                        error!(e; "Failed to do region failover for {r}");
+                    } else {
+                        // Now that we know the region is starting to do failover, remove it
+                        // from the failure detectors, avoiding the failover procedure to be
+                        // triggered again.
+                        // If the region is back alive (the failover procedure runs successfully),
+                        // it will be added back to the failure detectors again.
+                        failure_detectors.remove(&r);
+                    }
+                }
+            }
+
             loop {
                 let start = Instant::now();
 
                 let is_leader = election.as_ref().map(|x| x.is_leader()).unwrap_or(true);
                 if is_leader {
-                    let failed_regions = failure_detectors
-                        .iter()
-                        .filter_map(|e| {
-                            // Intentionally not place `current_time_millis()` out of the iteration.
-                            // The failure detection determination should be happened "just in time",
-                            // i.e., failed or not has to be compared with the most recent "now".
-                            // Besides, it might reduce the false positive of failure detection,
-                            // because during the iteration, heartbeats are coming in as usual,
-                            // and the `phi`s are still updating.
-                            if !e.failure_detector().is_available(current_time_millis()) {
-                                Some(e.region_ident().clone())
-                            } else {
-                                None
-                            }
-                        })
-                        .collect::<Vec<RegionIdent>>();
-
-                    for r in failed_regions {
-                        if let Err(e) = region_failover_manager.do_region_failover(&r).await {
-                            error!(e; "Failed to do region failover for {r}");
-                        } else {
-                            // Now that we know the region is starting to do failover, remove it
-                            // from the failure detectors, avoiding the failover procedure to be
-                            // triggered again.
-                            // If the region is back alive (the failover procedure runs successfully),
-                            // it will be added back to the failure detectors again.
-                            failure_detectors.remove(&r);
-                        }
-                    }
+                    maybe_region_failover(&failure_detectors, &region_failover_manager).await;
                 }
 
                 let elapsed = Instant::now().duration_since(start);
diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs
index be126d218f0d..68b3579298db 100644
--- a/src/meta-srv/src/metasrv.rs
+++ b/src/meta-srv/src/metasrv.rs
@@ -43,7 +43,7 @@ use tokio::sync::broadcast::error::RecvError;
 use crate::cluster::MetaPeerClientRef;
 use crate::election::{Election, LeaderChangeMessage};
 use crate::error::{
-    self, InitMetadataSnafu, Result, StartProcedureManagerSnafu, StartTelemetryTaskSnafu,
+    InitMetadataSnafu, KvBackendSnafu, Result, StartProcedureManagerSnafu, StartTelemetryTaskSnafu,
     StopProcedureManagerSnafu,
 };
 use crate::failure_detector::PhiAccrualFailureDetectorOptions;
@@ -357,7 +357,7 @@ impl MetaSrv {
         self.leader_cached_kv_backend
             .load()
             .await
-            .context(error::KvBackendSnafu)?;
+            .context(KvBackendSnafu)?;
         self.procedure_manager
             .start()
             .await
diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs
index 7447fdc67b1a..81c83816eed1 100644
--- a/src/meta-srv/src/metasrv/builder.rs
+++ b/src/meta-srv/src/metasrv/builder.rs
@@ -260,6 +260,7 @@ impl MetaSrvBuilder {
         let region_failover_manager = Arc::new(RegionFailoverManager::new(
             distributed_time_constants::REGION_LEASE_SECS,
             in_memory.clone(),
+            kv_backend.clone(),
             mailbox.clone(),
             procedure_manager.clone(),
             (selector.clone(), selector_ctx.clone()),
diff --git a/src/meta-srv/src/procedure/region_failover.rs b/src/meta-srv/src/procedure/region_failover.rs
index ac0e4ecb7bed..7d82ad36d520 100644
--- a/src/meta-srv/src/procedure/region_failover.rs
+++ b/src/meta-srv/src/procedure/region_failover.rs
@@ -26,8 +26,8 @@ use std::time::Duration;
 
 use async_trait::async_trait;
 use common_meta::key::datanode_table::DatanodeTableKey;
-use common_meta::key::TableMetadataManagerRef;
-use common_meta::kv_backend::ResettableKvBackendRef;
+use common_meta::key::{TableMetadataManagerRef, MAINTENANCE_KEY};
+use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
 use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock, TableLock};
 use common_meta::table_name::TableName;
 use common_meta::{ClusterId, RegionIdent};
@@ -45,7 +45,9 @@ use snafu::ResultExt;
 use store_api::storage::{RegionId, RegionNumber};
 use table::metadata::TableId;
 
-use crate::error::{self, RegisterProcedureLoaderSnafu, Result, TableMetadataManagerSnafu};
+use crate::error::{
+    self, KvBackendSnafu, RegisterProcedureLoaderSnafu, Result, TableMetadataManagerSnafu,
+};
 use crate::lock::DistLockRef;
 use crate::metasrv::{SelectorContext, SelectorRef};
 use crate::service::mailbox::MailboxRef;
@@ -73,6 +75,7 @@ impl From<RegionIdent> for RegionFailoverKey {
 pub(crate) struct RegionFailoverManager {
     region_lease_secs: u64,
     in_memory: ResettableKvBackendRef,
+    kv_backend: KvBackendRef,
     mailbox: MailboxRef,
     procedure_manager: ProcedureManagerRef,
     selector: SelectorRef,
@@ -94,9 +97,11 @@ impl Drop for FailoverProcedureGuard {
 }
 
 impl RegionFailoverManager {
+    #[allow(clippy::too_many_arguments)]
     pub(crate) fn new(
         region_lease_secs: u64,
         in_memory: ResettableKvBackendRef,
+        kv_backend: KvBackendRef,
         mailbox: MailboxRef,
         procedure_manager: ProcedureManagerRef,
         (selector, selector_ctx): (SelectorRef, SelectorContext),
@@ -106,6 +111,7 @@ impl RegionFailoverManager {
         Self {
             region_lease_secs,
             in_memory,
+            kv_backend,
             mailbox,
             procedure_manager,
             selector,
@@ -120,6 +126,7 @@ impl RegionFailoverManager {
         RegionFailoverContext {
             region_lease_secs: self.region_lease_secs,
             in_memory: self.in_memory.clone(),
+            kv_backend: self.kv_backend.clone(),
             mailbox: self.mailbox.clone(),
             selector: self.selector.clone(),
             selector_ctx: self.selector_ctx.clone(),
@@ -159,6 +166,13 @@ impl RegionFailoverManager {
         }
     }
 
+    pub(crate) async fn is_maintenance_mode(&self) -> Result<bool> {
+        self.kv_backend
+            .exists(MAINTENANCE_KEY.as_bytes())
+            .await
+            .context(KvBackendSnafu)
+    }
+
     pub(crate) async fn do_region_failover(&self, failed_region: &RegionIdent) -> Result<()> {
         let Some(guard) = self.insert_running_procedures(failed_region) else {
             warn!("Region failover procedure for region {failed_region} is already running!");
@@ -264,6 +278,7 @@ struct Node {
 pub struct RegionFailoverContext {
     pub region_lease_secs: u64,
     pub in_memory: ResettableKvBackendRef,
+    pub kv_backend: KvBackendRef,
     pub mailbox: MailboxRef,
     pub selector: SelectorRef,
     pub selector_ctx: SelectorContext,
@@ -569,6 +584,7 @@ mod tests {
             context: RegionFailoverContext {
                 region_lease_secs: 10,
                 in_memory,
+                kv_backend,
                 mailbox,
                 selector,
                 selector_ctx,
diff --git a/src/meta-srv/src/service/admin.rs b/src/meta-srv/src/service/admin.rs
index fa1443de32d9..7bf0d04640de 100644
--- a/src/meta-srv/src/service/admin.rs
+++ b/src/meta-srv/src/service/admin.rs
@@ -15,10 +15,9 @@
 mod health;
 mod heartbeat;
 mod leader;
+mod maintenance;
 mod meta;
-// TODO(weny): removes it.
 mod node_lease;
-#[allow(dead_code)]
 mod region_migration;
 mod route;
 mod util;
@@ -99,6 +98,13 @@ pub fn make_admin_service(meta_srv: MetaSrv) -> Admin {
     };
     let router = router.route("/region-migration", handler);
 
+    let handler = maintenance::MaintenanceHandler {
+        kv_backend: meta_srv.kv_backend().clone(),
+    };
+    let router = router
+        .route("/maintenance", handler.clone())
+        .route("/maintenance/set", handler);
+
     let router = Router::nest("/admin", router);
 
     Admin::new(router)
diff --git a/src/meta-srv/src/service/admin/maintenance.rs b/src/meta-srv/src/service/admin/maintenance.rs
new file mode 100644
index 000000000000..01e62aece6ef
--- /dev/null
+++ b/src/meta-srv/src/service/admin/maintenance.rs
@@ -0,0 +1,103 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashMap;
+
+use common_meta::key::MAINTENANCE_KEY;
+use common_meta::kv_backend::KvBackendRef;
+use common_meta::rpc::store::PutRequest;
+use snafu::{OptionExt, ResultExt};
+use tonic::codegen::http;
+use tonic::codegen::http::Response;
+
+use crate::error::{
+    InvalidHttpBodySnafu, KvBackendSnafu, MissingRequiredParameterSnafu, ParseBoolSnafu,
+};
+use crate::service::admin::HttpHandler;
+
+#[derive(Clone)]
+pub struct MaintenanceHandler {
+    pub kv_backend: KvBackendRef,
+}
+
+impl MaintenanceHandler {
+    async fn get_maintenance(&self) -> crate::Result<Response<String>> {
+        let enabled = self
+            .kv_backend
+            .exists(MAINTENANCE_KEY.as_bytes())
+            .await
+            .context(KvBackendSnafu)?;
+        let response = if enabled {
+            "Maintenance mode is enabled"
+        } else {
+            "Maintenance mode is disabled"
+        };
+        http::Response::builder()
+            .status(http::StatusCode::OK)
+            .body(response.into())
+            .context(InvalidHttpBodySnafu)
+    }
+
+    async fn set_maintenance(
+        &self,
+        params: &HashMap<String, String>,
+    ) -> crate::Result<Response<String>> {
+        let enable = params
+            .get("enable")
+            .map(|v| v.parse::<bool>())
+            .context(MissingRequiredParameterSnafu { param: "enable" })?
+            .context(ParseBoolSnafu {
+                err_msg: "'enable' must be 'true' or 'false'",
+            })?;
+
+        let response = if enable {
+            let req = PutRequest {
+                key: Vec::from(MAINTENANCE_KEY),
+                value: vec![],
+                prev_kv: false,
+            };
+            self.kv_backend
+                .put(req.clone())
+                .await
+                .context(KvBackendSnafu)?;
+            "Maintenance mode enabled"
+        } else {
+            self.kv_backend
+                .delete(MAINTENANCE_KEY.as_bytes(), false)
+                .await
+                .context(KvBackendSnafu)?;
+            "Maintenance mode disabled"
+        };
+
+        http::Response::builder()
+            .status(http::StatusCode::OK)
+            .body(response.into())
+            .context(InvalidHttpBodySnafu)
+    }
+}
+
+#[async_trait::async_trait]
+impl HttpHandler for MaintenanceHandler {
+    async fn handle(
+        &self,
+        path: &str,
+        params: &HashMap<String, String>,
+    ) -> crate::Result<Response<String>> {
+        if path.ends_with("/set") {
+            self.set_maintenance(params).await
+        } else {
+            self.get_maintenance().await
+        }
+    }
+}
diff --git a/src/meta-srv/src/test_util.rs b/src/meta-srv/src/test_util.rs
index 4d021fae97fa..b6fa285311f6 100644
--- a/src/meta-srv/src/test_util.rs
+++ b/src/meta-srv/src/test_util.rs
@@ -86,6 +86,7 @@ pub(crate) fn create_region_failover_manager() -> Arc<RegionFailoverManager> {
     Arc::new(RegionFailoverManager::new(
         10,
         in_memory,
+        kv_backend.clone(),
         mailbox,
         procedure_manager,
         (selector, selector_ctx),
diff --git a/tests-integration/tests/region_failover.rs b/tests-integration/tests/region_failover.rs
index 74289149c066..e0d82f658704 100644
--- a/tests-integration/tests/region_failover.rs
+++ b/tests-integration/tests/region_failover.rs
@@ -352,6 +352,7 @@ async fn run_region_failover_procedure(
     RegionFailoverContext {
         region_lease_secs: 10,
         in_memory: meta_srv.in_memory().clone(),
+        kv_backend: meta_srv.kv_backend().clone(),
         mailbox: meta_srv.mailbox().clone(),
         selector,
         selector_ctx: SelectorContext {
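Taken together, the patch reduces maintenance mode to a small protocol: the mode is "on" exactly when the `maintenance` key (`MAINTENANCE_KEY`) exists in the metasrv kv backend. The new `MaintenanceHandler` writes or deletes that key, and `RegionFailoverManager::is_maintenance_mode` checks the key's existence before the failure detect runner triggers any region failover. The sketch below is not part of this patch; it only replays that key lifecycle against an in-memory backend. The `MemoryKvBackend` construction and the `tokio::main` wrapper are assumptions made for illustration; the put/delete/exists calls mirror those that appear in the diff above.

// Illustrative sketch only (not part of this change). Assumes `MemoryKvBackend`
// from `common_meta::kv_backend::memory` can stand in for the metasrv kv backend
// (etcd in a real deployment).
use std::sync::Arc;

use common_meta::error::Result;
use common_meta::key::MAINTENANCE_KEY;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;
use common_meta::rpc::store::PutRequest;

/// Enabling maintenance mode is just "put the key"; the value does not matter.
async fn set_maintenance(kv_backend: &KvBackendRef, enable: bool) -> Result<()> {
    if enable {
        let _ = kv_backend
            .put(PutRequest {
                key: Vec::from(MAINTENANCE_KEY),
                value: vec![],
                prev_kv: false,
            })
            .await?;
    } else {
        // Disabling it is "delete the key".
        let _ = kv_backend.delete(MAINTENANCE_KEY.as_bytes(), false).await?;
    }
    Ok(())
}

/// The same existence check `RegionFailoverManager::is_maintenance_mode` performs.
async fn is_maintenance_mode(kv_backend: &KvBackendRef) -> Result<bool> {
    kv_backend.exists(MAINTENANCE_KEY.as_bytes()).await
}

#[tokio::main]
async fn main() -> Result<()> {
    // Hypothetical stand-in for the metasrv kv backend.
    let kv_backend: KvBackendRef = Arc::new(MemoryKvBackend::<common_meta::error::Error>::new());

    set_maintenance(&kv_backend, true).await?;
    assert!(is_maintenance_mode(&kv_backend).await?); // failovers are skipped

    set_maintenance(&kv_backend, false).await?;
    assert!(!is_maintenance_mode(&kv_backend).await?); // failovers resume
    Ok(())
}

Operationally, the same toggle goes through the new admin routes nested under /admin: hit /admin/maintenance/set?enable=true before taking datanodes down, /admin/maintenance/set?enable=false afterwards, and /admin/maintenance to inspect the current state.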