diff --git a/src/catalog/src/remote/manager.rs b/src/catalog/src/remote/manager.rs index eed7986d325e..bc9e296d671f 100644 --- a/src/catalog/src/remote/manager.rs +++ b/src/catalog/src/remote/manager.rs @@ -85,6 +85,7 @@ impl RemoteCatalogManager { let engine_manager = self.engine_manager.clone(); let memory_catalog_manager = self.memory_catalog_manager.clone(); let table_metadata_manager = self.table_metadata_manager.clone(); + let region_alive_keepers = self.region_alive_keepers.clone(); common_runtime::spawn_bg(async move { let table_id = datanode_table_value.table_id; if let Err(e) = open_and_register_table( @@ -92,6 +93,7 @@ impl RemoteCatalogManager { datanode_table_value, memory_catalog_manager, table_metadata_manager, + region_alive_keepers, ) .await { @@ -116,6 +118,7 @@ async fn open_and_register_table( datanode_table_value: DatanodeTableValue, memory_catalog_manager: Arc, table_metadata_manager: TableMetadataManagerRef, + region_alive_keepers: Arc, ) -> Result<()> { let context = EngineContext {}; @@ -192,7 +195,8 @@ async fn open_and_register_table( table_id, table, }; - let registered = memory_catalog_manager.register_table_sync(request)?; + let registered = + register_table(&memory_catalog_manager, ®ion_alive_keepers, request).await?; ensure!( registered, TableExistsSnafu { @@ -203,6 +207,32 @@ async fn open_and_register_table( Ok(()) } +async fn register_table( + memory_catalog_manager: &Arc, + region_alive_keepers: &Arc, + request: RegisterTableRequest, +) -> Result { + let table = request.table.clone(); + + let registered = memory_catalog_manager.register_table_sync(request)?; + + if registered { + let table_info = table.table_info(); + let table_ident = TableIdent { + catalog: table_info.catalog_name.clone(), + schema: table_info.schema_name.clone(), + table: table_info.name.clone(), + table_id: table_info.table_id(), + engine: table_info.meta.engine.clone(), + }; + region_alive_keepers + .register_table(table_ident, table) + .await?; + } + + Ok(registered) +} + #[async_trait] impl CatalogManager for RemoteCatalogManager { async fn start(&self) -> Result<()> { @@ -221,25 +251,12 @@ impl CatalogManager for RemoteCatalogManager { } async fn register_table(&self, request: RegisterTableRequest) -> Result { - let table = request.table.clone(); - - let registered = self.memory_catalog_manager.register_table_sync(request)?; - - if registered { - let table_info = table.table_info(); - let table_ident = TableIdent { - catalog: table_info.catalog_name.clone(), - schema: table_info.schema_name.clone(), - table: table_info.name.clone(), - table_id: table_info.table_id(), - engine: table_info.meta.engine.clone(), - }; - self.region_alive_keepers - .register_table(table_ident, table) - .await?; - } - - Ok(registered) + register_table( + &self.memory_catalog_manager, + &self.region_alive_keepers, + request, + ) + .await } async fn deregister_table(&self, request: DeregisterTableRequest) -> Result<()> {