diff --git a/src/common/meta/src/cache_invalidator.rs b/src/common/meta/src/cache_invalidator.rs index 0713807cf7f1..2e7afd37efc7 100644 --- a/src/common/meta/src/cache_invalidator.rs +++ b/src/common/meta/src/cache_invalidator.rs @@ -18,6 +18,7 @@ use tokio::sync::RwLock; use crate::error::Result; use crate::instruction::CacheIdent; +use crate::key::schema_name::SchemaNameKey; use crate::key::table_info::TableInfoKey; use crate::key::table_name::TableNameKey; use crate::key::table_route::TableRouteKey; @@ -107,6 +108,10 @@ where let key: TableNameKey = (&table_name).into(); self.invalidate_key(&key.as_raw_key()).await } + CacheIdent::SchemaName(schema_name) => { + let key: SchemaNameKey = (&schema_name).into(); + self.invalidate_key(&key.as_raw_key()).await; + } } } Ok(()) diff --git a/src/common/meta/src/ddl/drop_database/metadata.rs b/src/common/meta/src/ddl/drop_database/metadata.rs index b6e25197ea8b..f06c51963a78 100644 --- a/src/common/meta/src/ddl/drop_database/metadata.rs +++ b/src/common/meta/src/ddl/drop_database/metadata.rs @@ -18,10 +18,12 @@ use common_procedure::Status; use serde::{Deserialize, Serialize}; use super::end::DropDatabaseEnd; +use crate::cache_invalidator::Context; use crate::ddl::drop_database::{DropDatabaseContext, State}; use crate::ddl::DdlContext; use crate::error::Result; -use crate::key::schema_name::SchemaNameKey; +use crate::instruction::CacheIdent; +use crate::key::schema_name::{SchemaName, SchemaNameKey}; #[derive(Debug, Serialize, Deserialize)] pub(crate) struct DropDatabaseRemoveMetadata; @@ -40,7 +42,53 @@ impl State for DropDatabaseRemoveMetadata { .delete(SchemaNameKey::new(&ctx.catalog, &ctx.schema)) .await?; - return Ok((Box::new(DropDatabaseEnd), Status::done())); + return Ok((Box::new(DropMetadataBroadcast), Status::executing(true))); + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub(crate) struct DropMetadataBroadcast; + +impl DropMetadataBroadcast { + /// Invalidates frontend caches + async fn invalidate_schema_cache( + &self, + ddl_ctx: &DdlContext, + db_ctx: &mut DropDatabaseContext, + ) -> Result<()> { + let cache_invalidator = &ddl_ctx.cache_invalidator; + let ctx = Context { + subject: Some("Invalidate schema cache by dropping database".to_string()), + }; + + cache_invalidator + .invalidate( + &ctx, + vec![CacheIdent::SchemaName(SchemaName { + catalog_name: db_ctx.catalog.clone(), + schema_name: db_ctx.schema.clone(), + })], + ) + .await?; + + Ok(()) + } +} + +#[async_trait::async_trait] +#[typetag::serde] +impl State for DropMetadataBroadcast { + async fn next( + &mut self, + ddl_ctx: &DdlContext, + ctx: &mut DropDatabaseContext, + ) -> Result<(Box, Status)> { + self.invalidate_schema_cache(ddl_ctx, ctx).await?; + Ok((Box::new(DropDatabaseEnd), Status::done())) } fn as_any(&self) -> &dyn Any { @@ -53,7 +101,7 @@ mod tests { use std::sync::Arc; use crate::ddl::drop_database::end::DropDatabaseEnd; - use crate::ddl::drop_database::metadata::DropDatabaseRemoveMetadata; + use crate::ddl::drop_database::metadata::{DropDatabaseRemoveMetadata, DropMetadataBroadcast}; use crate::ddl::drop_database::{DropDatabaseContext, State}; use crate::key::schema_name::SchemaNameKey; use crate::test_util::{new_ddl_context, MockDatanodeManager}; @@ -76,14 +124,23 @@ mod tests { tables: None, }; let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap(); - state.as_any().downcast_ref::().unwrap(); - assert!(status.is_done()); + state + .as_any() + .downcast_ref::() + .unwrap(); + assert!(!status.is_done()); assert!(!ddl_context .table_metadata_manager .schema_manager() .exists(SchemaNameKey::new("foo", "bar")) .await .unwrap()); + + let mut state = DropMetadataBroadcast; + let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap(); + state.as_any().downcast_ref::().unwrap(); + assert!(status.is_done()); + // Schema not exists let mut state = DropDatabaseRemoveMetadata; let mut ctx = DropDatabaseContext { @@ -93,7 +150,10 @@ mod tests { tables: None, }; let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap(); - state.as_any().downcast_ref::().unwrap(); - assert!(status.is_done()); + state + .as_any() + .downcast_ref::() + .unwrap(); + assert!(!status.is_done()); } } diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs index caefe2eb221e..7981f2449d35 100644 --- a/src/common/meta/src/instruction.rs +++ b/src/common/meta/src/instruction.rs @@ -21,6 +21,7 @@ use store_api::storage::{RegionId, RegionNumber}; use strum::Display; use table::metadata::TableId; +use crate::key::schema_name::SchemaName; use crate::table_name::TableName; use crate::{ClusterId, DatanodeId}; @@ -156,6 +157,7 @@ pub struct UpgradeRegion { pub enum CacheIdent { TableId(TableId), TableName(TableName), + SchemaName(SchemaName), } #[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)] diff --git a/src/common/meta/src/key/schema_name.rs b/src/common/meta/src/key/schema_name.rs index 8b391ca9b3d7..17fdb6a4c220 100644 --- a/src/common/meta/src/key/schema_name.rs +++ b/src/common/meta/src/key/schema_name.rs @@ -197,6 +197,21 @@ impl SchemaManager { } } +#[derive(Debug, Clone, Hash, Eq, PartialEq, Deserialize, Serialize)] +pub struct SchemaName { + pub catalog_name: String, + pub schema_name: String, +} + +impl<'a> From<&'a SchemaName> for SchemaNameKey<'a> { + fn from(value: &'a SchemaName) -> Self { + Self { + catalog: &value.catalog_name, + schema: &value.schema_name, + } + } +} + #[cfg(test)] mod tests { diff --git a/src/frontend/src/heartbeat/handler/tests.rs b/src/frontend/src/heartbeat/handler/tests.rs index 1b6885ddb777..c5b5b4ecde81 100644 --- a/src/frontend/src/heartbeat/handler/tests.rs +++ b/src/frontend/src/heartbeat/handler/tests.rs @@ -22,6 +22,7 @@ use common_meta::heartbeat::handler::{ }; use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta}; use common_meta::instruction::{CacheIdent, Instruction}; +use common_meta::key::schema_name::{SchemaName, SchemaNameKey}; use common_meta::key::table_info::TableInfoKey; use common_meta::key::TableMetaKey; use partition::manager::TableRouteCacheInvalidator; @@ -53,6 +54,27 @@ impl TableRouteCacheInvalidator for MockTableRouteCacheInvalidator { } } +pub fn test_message_meta(id: u64, subject: &str, to: &str, from: &str) -> MessageMeta { + MessageMeta { + id, + subject: subject.to_string(), + to: to.to_string(), + from: from.to_string(), + } +} + +async fn handle_instruction( + executor: Arc, + mailbox: Arc, + instruction: Instruction, +) { + let response = HeartbeatResponse::default(); + let mut ctx: HeartbeatResponseHandlerContext = + HeartbeatResponseHandlerContext::new(mailbox, response); + ctx.incoming_message = Some((test_message_meta(1, "hi", "foo", "bar"), instruction)); + executor.handle(ctx).await.unwrap(); +} + #[tokio::test] async fn test_invalidate_table_cache_handler() { let table_id = 1; @@ -92,23 +114,45 @@ async fn test_invalidate_table_cache_handler() { .await; } -pub fn test_message_meta(id: u64, subject: &str, to: &str, from: &str) -> MessageMeta { - MessageMeta { - id, - subject: subject.to_string(), - to: to.to_string(), - from: from.to_string(), - } -} +#[tokio::test] +async fn test_invalidate_schema_key_handler() { + let (catalog, schema) = ("foo", "bar"); + let schema_key = SchemaNameKey { catalog, schema }; + let inner = HashMap::from([(schema_key.as_raw_key(), 1)]); + let backend = Arc::new(MockKvCacheInvalidator { + inner: Mutex::new(inner), + }); -async fn handle_instruction( - executor: Arc, - mailbox: Arc, - instruction: Instruction, -) { - let response = HeartbeatResponse::default(); - let mut ctx: HeartbeatResponseHandlerContext = - HeartbeatResponseHandlerContext::new(mailbox, response); - ctx.incoming_message = Some((test_message_meta(1, "hi", "foo", "bar"), instruction)); - executor.handle(ctx).await.unwrap(); + let executor = Arc::new(HandlerGroupExecutor::new(vec![Arc::new( + InvalidateTableCacheHandler::new(backend.clone()), + )])); + + let (tx, _) = mpsc::channel(8); + let mailbox = Arc::new(HeartbeatMailbox::new(tx)); + + // removes a valid key + let valid_key = SchemaName { + catalog_name: catalog.to_string(), + schema_name: schema.to_string(), + }; + handle_instruction( + executor.clone(), + mailbox.clone(), + Instruction::InvalidateCaches(vec![CacheIdent::SchemaName(valid_key.clone())]), + ) + .await; + + assert!(!backend + .inner + .lock() + .unwrap() + .contains_key(&schema_key.as_raw_key())); + + // removes a invalid key + handle_instruction( + executor, + mailbox, + Instruction::InvalidateCaches(vec![CacheIdent::SchemaName(valid_key)]), + ) + .await; }