feat: acquire all locks in procedure (#3514)
* feat: acquire catalog and schema lock in region failover

* chore: remove unused code

* feat!: acquire catalog and schema lock in region migration

* feat: acquire catalog and schema lock in create table
WenyXu authored Mar 14, 2024
1 parent 8ca9e01 commit a29e7eb
Showing 9 changed files with 124 additions and 75 deletions.
11 changes: 9 additions & 2 deletions src/common/meta/src/ddl/create_logical_tables.rs
@@ -36,7 +36,7 @@ use crate::ddl::DdlContext;
use crate::error::{Result, TableAlreadyExistsSnafu};
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::lock_key::{TableLock, TableNameLock};
use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
use crate::peer::Peer;
use crate::rpc::ddl::CreateTableTask;
use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute};
@@ -307,8 +307,15 @@ impl Procedure for CreateLogicalTablesProcedure {
}

fn lock_key(&self) -> LockKey {
let mut lock_key = Vec::with_capacity(1 + self.creator.data.tasks.len());
// CatalogLock, SchemaLock,
// TableLock
// TableNameLock(s)
let mut lock_key = Vec::with_capacity(2 + 1 + self.creator.data.tasks.len());
let table_ref = self.creator.data.tasks[0].table_ref();
lock_key.push(CatalogLock::Read(table_ref.catalog).into());
lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into());
lock_key.push(TableLock::Write(self.creator.data.physical_table_id()).into());

for task in &self.creator.data.tasks {
lock_key.push(
TableNameLock::new(
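Note: the capacity hint `2 + 1 + self.creator.data.tasks.len()` matches the keys pushed above: two shared keys (catalog and schema), one exclusive key for the physical table, and one `TableNameLock` per logical table in the batch. Below is a minimal, self-contained sketch of that bookkeeping using plain string keys; the `Key` enum and key formats are illustrative stand-ins, not the repository's actual lock types, and the exclusivity of the real `TableNameLock` is not shown in this hunk.

```rust
#[derive(Debug)]
enum Key {
    Read(String),
    Write(String),
    Name(String),
}

/// Mirrors the key list assembled in CreateLogicalTablesProcedure::lock_key:
/// catalog + schema read keys, a physical-table write key, then one key per task.
fn logical_tables_lock_key(
    catalog: &str,
    schema: &str,
    physical_table_id: u32,
    logical_tables: &[&str],
) -> Vec<Key> {
    let mut keys = Vec::with_capacity(2 + 1 + logical_tables.len());
    keys.push(Key::Read(format!("__catalog/{catalog}")));
    keys.push(Key::Read(format!("__schema/{catalog}/{schema}")));
    keys.push(Key::Write(format!("__table/{physical_table_id}")));
    for table in logical_tables {
        keys.push(Key::Name(format!("__table_name/{catalog}/{schema}/{table}")));
    }
    keys
}

fn main() {
    let keys = logical_tables_lock_key("greptime", "public", 1024, &["t1", "t2", "t3"]);
    assert_eq!(keys.len(), 2 + 1 + 3);
    println!("{keys:#?}");
}
```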
12 changes: 6 additions & 6 deletions src/common/meta/src/ddl/create_table.rs
@@ -38,7 +38,7 @@ use crate::ddl::{DdlContext, TableMetadata, TableMetadataAllocatorContext};
use crate::error::{self, Result, TableRouteNotFoundSnafu};
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::lock_key::TableNameLock;
use crate::lock_key::{CatalogLock, SchemaLock, TableNameLock};
use crate::region_keeper::OperatingRegionGuard;
use crate::rpc::ddl::CreateTableTask;
use crate::rpc::router::{
@@ -343,11 +343,11 @@ impl Procedure for CreateTableProcedure {
fn lock_key(&self) -> LockKey {
let table_ref = &self.creator.data.table_ref();

LockKey::single(TableNameLock::new(
table_ref.catalog,
table_ref.schema,
table_ref.table,
))
LockKey::new(vec![
CatalogLock::Read(table_ref.catalog).into(),
SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
TableNameLock::new(table_ref.catalog, table_ref.schema, table_ref.table).into(),
])
}
}

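Note: because the catalog and schema keys are acquired as read locks while the table-name key stays per-table, two `CreateTable` procedures in the same schema should be able to proceed concurrently, yet either would still wait on anything holding the schema key exclusively. A small, self-contained illustration of those read/write semantics with `std::sync::RwLock`; the procedure framework's real lock manager is key-based, so this is only an analogy.

```rust
use std::sync::RwLock;
use std::thread;

fn main() {
    // Stand-in for the schema-level lock.
    let schema_lock = RwLock::new(());
    let lock = &schema_lock;

    // Two "create table" procedures take the schema lock in read mode from
    // separate threads; both can hold it concurrently.
    thread::scope(|s| {
        for name in ["create t1", "create t2"] {
            s.spawn(move || {
                let _guard = lock.read().unwrap();
                println!("{name}: holding schema read lock");
            });
        }
    });

    // A schema-level DDL would take the lock in write mode and exclude readers:
    // while the write guard is held, try_read() fails rather than succeeding.
    let drop_schema = schema_lock.write().unwrap();
    assert!(schema_lock.try_read().is_err());
    drop(drop_schema);
}
```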
68 changes: 49 additions & 19 deletions src/meta-srv/src/procedure/region_failover.rs
@@ -28,7 +28,8 @@ use async_trait::async_trait;
use common_meta::key::datanode_table::DatanodeTableKey;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::ResettableKvBackendRef;
use common_meta::lock_key::{RegionLock, TableLock};
use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock, TableLock};
use common_meta::table_name::TableName;
use common_meta::{ClusterId, RegionIdent};
use common_procedure::error::{
Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
@@ -44,7 +45,7 @@ use snafu::ResultExt;
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::TableId;

use crate::error::{RegisterProcedureLoaderSnafu, Result, TableMetadataManagerSnafu};
use crate::error::{self, RegisterProcedureLoaderSnafu, Result, TableMetadataManagerSnafu};
use crate::lock::DistLockRef;
use crate::metasrv::{SelectorContext, SelectorRef};
use crate::service::mailbox::MailboxRef;
@@ -164,7 +165,14 @@ impl RegionFailoverManager {
return Ok(());
};

if !self.table_exists(failed_region).await? {
let table_info = self
.table_metadata_manager
.table_info_manager()
.get(failed_region.table_id)
.await
.context(error::TableMetadataManagerSnafu)?;

if table_info.is_none() {
// The table could be dropped before the failure detector knows it. Then the region
// failover is not needed.
// Or the table could be renamed. But we will have a new region ident to detect failure.
@@ -178,7 +186,15 @@
}

let context = self.create_context();
let procedure = RegionFailoverProcedure::new(failed_region.clone(), context);
// Safety: checked above.
let table_info = table_info.unwrap();
let TableName {
catalog_name,
schema_name,
..
} = table_info.table_name();
let procedure =
RegionFailoverProcedure::new(catalog_name, schema_name, failed_region.clone(), context);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
let procedure_id = procedure_with_id.id;
info!("Starting region failover procedure {procedure_id} for region {failed_region:?}");
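Note: the manager now loads the table's metadata up front so the catalog and schema names can be handed to the procedure; the `is_none()` check and the later `// Safety` `unwrap()` straddle other code in the function. For comparison, a `let ... else` early return binds the value without the separate unwrap. A self-contained sketch with hypothetical stand-in types, not the actual manager API:

```rust
/// Hypothetical stand-in for the table metadata value.
struct TableInfo {
    catalog_name: String,
    schema_name: String,
}

fn lookup_table_info(table_id: u32) -> Option<TableInfo> {
    // Stand-in for table_info_manager().get(table_id); returns None if the
    // table was dropped before the failure detector noticed.
    (table_id == 1024).then(|| TableInfo {
        catalog_name: "greptime".to_string(),
        schema_name: "public".to_string(),
    })
}

fn start_failover(table_id: u32) -> Result<(), String> {
    // let-else keeps the happy path flat: bail out early when the table is
    // gone and bind the value without a later unwrap().
    let Some(table_info) = lookup_table_info(table_id) else {
        // The table could have been dropped; failover is no longer needed.
        return Ok(());
    };
    println!(
        "starting failover for {}.{}",
        table_info.catalog_name, table_info.schema_name
    );
    Ok(())
}

fn main() {
    start_failover(1024).unwrap();
    start_failover(1).unwrap();
}
```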
@@ -206,16 +222,6 @@ impl RegionFailoverManager {
Ok(())
}

async fn table_exists(&self, failed_region: &RegionIdent) -> Result<bool> {
Ok(self
.table_metadata_manager
.table_route_manager()
.get_region_distribution(failed_region.table_id)
.await
.context(TableMetadataManagerSnafu)?
.is_some())
}

async fn failed_region_exists(&self, failed_region: &RegionIdent) -> Result<bool> {
let table_id = failed_region.table_id;
let datanode_id = failed_region.datanode_id;
@@ -238,10 +244,17 @@
}
}

#[derive(Serialize, Deserialize, Debug)]
struct LockMeta {
catalog: String,
schema: String,
}

/// A "Node" in the state machine of region failover procedure.
/// Contains the current state and the data.
#[derive(Serialize, Deserialize, Debug)]
struct Node {
lock_meta: LockMeta,
failed_region: RegionIdent,
state: Box<dyn State>,
}
@@ -330,9 +343,15 @@ pub struct RegionFailoverProcedure {
impl RegionFailoverProcedure {
const TYPE_NAME: &'static str = "metasrv-procedure::RegionFailover";

pub fn new(failed_region: RegionIdent, context: RegionFailoverContext) -> Self {
pub fn new(
catalog: String,
schema: String,
failed_region: RegionIdent,
context: RegionFailoverContext,
) -> Self {
let state = RegionFailoverStart::new();
let node = Node {
lock_meta: LockMeta { catalog, schema },
failed_region,
state: Box::new(state),
};
@@ -372,8 +391,9 @@ impl Procedure for RegionFailoverProcedure {

fn lock_key(&self) -> LockKey {
let region_ident = &self.node.failed_region;
// TODO(weny): acquires the catalog, schema read locks.
let lock_key = vec![
CatalogLock::Read(&self.node.lock_meta.catalog).into(),
SchemaLock::read(&self.node.lock_meta.catalog, &self.node.lock_meta.schema).into(),
TableLock::Read(region_ident.table_id).into(),
RegionLock::Write(RegionId::new(
region_ident.table_id,
@@ -568,6 +588,8 @@ mod tests {
let failed_region = env.failed_region(1).await;

let mut procedure = Box::new(RegionFailoverProcedure::new(
"greptime".into(),
"public".into(),
failed_region.clone(),
env.context.clone(),
)) as BoxedProcedure;
@@ -671,7 +693,7 @@

assert_eq!(
procedure.dump().unwrap(),
r#"{"failed_region":{"cluster_id":0,"datanode_id":1,"table_id":1,"region_number":1,"engine":"mito2"},"state":{"region_failover_state":"RegionFailoverEnd"}}"#
r#"{"lock_meta":{"catalog":"greptime","schema":"public"},"failed_region":{"cluster_id":0,"datanode_id":1,"table_id":1,"region_number":1,"engine":"mito2"},"state":{"region_failover_state":"RegionFailoverEnd"}}"#
);

// Verifies that the failed region (region 1) is moved from failed datanode (datanode 1) to the candidate datanode.
@@ -700,6 +722,10 @@

let state = RegionFailoverStart::new();
let node = Node {
lock_meta: LockMeta {
catalog: "greptime".into(),
schema: "public".into(),
},
failed_region,
state: Box::new(state),
};
@@ -711,12 +737,12 @@
let s = procedure.dump().unwrap();
assert_eq!(
s,
r#"{"failed_region":{"cluster_id":0,"datanode_id":1,"table_id":1,"region_number":1,"engine":"mito2"},"state":{"region_failover_state":"RegionFailoverStart","failover_candidate":null}}"#
r#"{"lock_meta":{"catalog":"greptime","schema":"public"},"failed_region":{"cluster_id":0,"datanode_id":1,"table_id":1,"region_number":1,"engine":"mito2"},"state":{"region_failover_state":"RegionFailoverStart","failover_candidate":null}}"#,
);
let n: Node = serde_json::from_str(&s).unwrap();
assert_eq!(
format!("{n:?}"),
r#"Node { failed_region: RegionIdent { cluster_id: 0, datanode_id: 1, table_id: 1, region_number: 1, engine: "mito2" }, state: RegionFailoverStart { failover_candidate: None } }"#
r#"Node { lock_meta: LockMeta { catalog: "greptime", schema: "public" }, failed_region: RegionIdent { cluster_id: 0, datanode_id: 1, table_id: 1, region_number: 1, engine: "mito2" }, state: RegionFailoverStart { failover_candidate: None } }"#,
);
}

@@ -765,6 +791,10 @@

let state = RegionFailoverStart::new();
let node = Node {
lock_meta: LockMeta {
catalog: "greptime".into(),
schema: "public".into(),
},
failed_region,
state: Box::new(state),
};
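Note: the updated `dump()` expectations above show the new `lock_meta` object leading the JSON. With derived `Serialize`, struct fields are emitted in declaration order, so placing `lock_meta` first in `Node` is what produces that shape. A self-contained sketch with simplified stand-ins (the real `Node` also carries a `RegionIdent` and a boxed state; assumes the `serde` and `serde_json` crates):

```rust
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
struct LockMeta {
    catalog: String,
    schema: String,
}

// Simplified stand-in for the procedure's Node; the real struct also holds a
// RegionIdent and a boxed state-machine state.
#[derive(Serialize, Deserialize, Debug)]
struct Node {
    lock_meta: LockMeta,
    table_id: u32,
}

fn main() {
    let node = Node {
        lock_meta: LockMeta {
            catalog: "greptime".into(),
            schema: "public".into(),
        },
        table_id: 1,
    };
    let dumped = serde_json::to_string(&node).unwrap();
    // Derived Serialize emits fields in declaration order, so lock_meta leads.
    assert_eq!(
        dumped,
        r#"{"lock_meta":{"catalog":"greptime","schema":"public"},"table_id":1}"#
    );
    let restored: Node = serde_json::from_str(&dumped).unwrap();
    println!("{restored:?}");
}
```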
19 changes: 9 additions & 10 deletions src/meta-srv/src/procedure/region_migration.rs
@@ -13,8 +13,6 @@
// limitations under the License.

pub(crate) mod downgrade_leader_region;
// TODO(weny): remove it.
#[allow(dead_code)]
pub(crate) mod manager;
pub(crate) mod migration_abort;
pub(crate) mod migration_end;
@@ -36,7 +34,7 @@ use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue};
use common_meta::key::table_info::TableInfoValue;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use common_meta::lock_key::{RegionLock, TableLock};
use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock, TableLock};
use common_meta::peer::Peer;
use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard};
use common_meta::ClusterId;
@@ -61,6 +59,10 @@ use crate::service::mailbox::{BroadcastChannel, MailboxRef};
/// **Notes: Stores with too large data in the context might incur replication overhead.**
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PersistentContext {
/// The table catalog.
catalog: String,
/// The table schema.
schema: String,
/// The Id of the cluster.
cluster_id: ClusterId,
/// The [Peer] of migration source.
@@ -81,8 +83,9 @@
impl PersistentContext {
pub fn lock_key(&self) -> Vec<StringKey> {
let region_id = self.region_id;
// TODO(weny): acquires the catalog, schema read locks.
let lock_key = vec![
CatalogLock::Read(&self.catalog).into(),
SchemaLock::read(&self.catalog, &self.schema).into(),
TableLock::Read(region_id.table_id()).into(),
RegionLock::Write(region_id).into(),
];
@@ -185,8 +188,6 @@ impl ContextFactory for DefaultContextFactory {
}
}

// TODO(weny): remove it.
#[allow(dead_code)]
/// The context of procedure execution.
pub struct Context {
persistent_ctx: PersistentContext,
@@ -368,7 +369,6 @@ pub struct RegionMigrationProcedure {
context: Context,
}

// TODO(weny): remove it.
#[allow(dead_code)]
impl RegionMigrationProcedure {
const TYPE_NAME: &'static str = "metasrv-procedure::RegionMigration";
@@ -487,16 +487,15 @@ mod tests {
let procedure = RegionMigrationProcedure::new(persistent_context, context);

let serialized = procedure.dump().unwrap();

let expected = r#"{"persistent_ctx":{"cluster_id":0,"from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105,"replay_timeout":"1s"},"state":{"region_migration_state":"RegionMigrationStart"}}"#;
let expected = r#"{"persistent_ctx":{"catalog":"greptime","schema":"public","cluster_id":0,"from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105,"replay_timeout":"1s"},"state":{"region_migration_state":"RegionMigrationStart"}}"#;
assert_eq!(expected, serialized);
}

#[test]
fn test_backward_compatibility() {
let persistent_ctx = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
// NOTES: Changes it will break backward compatibility.
let serialized = r#"{"cluster_id":0,"from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105}"#;
let serialized = r#"{"catalog":"greptime","schema":"public","cluster_id":0,"from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105}"#;
let deserialized: PersistentContext = serde_json::from_str(serialized).unwrap();

assert_eq!(persistent_ctx, deserialized);
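Note: this is presumably why the region-migration change is marked `feat!` and why the `test_backward_compatibility` fixture had to gain the new fields: with derived `Deserialize` and no `#[serde(default)]`, a payload persisted by an older version (lacking `catalog`/`schema`) fails with a missing-field error. A self-contained sketch of that serde behavior, not the actual `PersistentContext`:

```rust
use serde::Deserialize;

// Simplified stand-in for PersistentContext after the change.
#[derive(Deserialize, Debug)]
struct PersistentContext {
    catalog: String,
    schema: String,
    cluster_id: u64,
}

fn main() {
    // A payload written by the new code deserializes fine.
    let new_payload = r#"{"catalog":"greptime","schema":"public","cluster_id":0}"#;
    let ctx: PersistentContext = serde_json::from_str(new_payload).unwrap();
    println!("{ctx:?}");

    // A payload persisted before the fields existed is rejected, which is why
    // the change is breaking for in-flight procedure state.
    let old_payload = r#"{"cluster_id":0}"#;
    let err = serde_json::from_str::<PersistentContext>(old_payload).unwrap_err();
    println!("old payload fails: {err}");
}
```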
@@ -226,6 +226,8 @@ mod tests {

fn new_persistent_context() -> PersistentContext {
PersistentContext {
catalog: "greptime".into(),
schema: "public".into(),
from_peer: Peer::empty(1),
to_peer: Peer::empty(2),
region_id: RegionId::new(1024, 1),