Skip to content

Commit

Permalink
fix: fix RegionAliveKeeper does not find the table after restarting (G…
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu authored and paomian committed Oct 19, 2023
1 parent a377bc8 commit a0e3701
Showing 1 changed file with 37 additions and 20 deletions.
57 changes: 37 additions & 20 deletions src/catalog/src/remote/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,15 @@ impl RemoteCatalogManager {
let engine_manager = self.engine_manager.clone();
let memory_catalog_manager = self.memory_catalog_manager.clone();
let table_metadata_manager = self.table_metadata_manager.clone();
let region_alive_keepers = self.region_alive_keepers.clone();
common_runtime::spawn_bg(async move {
let table_id = datanode_table_value.table_id;
if let Err(e) = open_and_register_table(
engine_manager,
datanode_table_value,
memory_catalog_manager,
table_metadata_manager,
region_alive_keepers,
)
.await
{
Expand All @@ -116,6 +118,7 @@ async fn open_and_register_table(
datanode_table_value: DatanodeTableValue,
memory_catalog_manager: Arc<MemoryCatalogManager>,
table_metadata_manager: TableMetadataManagerRef,
region_alive_keepers: Arc<RegionAliveKeepers>,
) -> Result<()> {
let context = EngineContext {};

Expand Down Expand Up @@ -192,7 +195,8 @@ async fn open_and_register_table(
table_id,
table,
};
let registered = memory_catalog_manager.register_table_sync(request)?;
let registered =
register_table(&memory_catalog_manager, &region_alive_keepers, request).await?;
ensure!(
registered,
TableExistsSnafu {
Expand All @@ -203,6 +207,32 @@ async fn open_and_register_table(
Ok(())
}

async fn register_table(
memory_catalog_manager: &Arc<MemoryCatalogManager>,
region_alive_keepers: &Arc<RegionAliveKeepers>,
request: RegisterTableRequest,
) -> Result<bool> {
let table = request.table.clone();

let registered = memory_catalog_manager.register_table_sync(request)?;

if registered {
let table_info = table.table_info();
let table_ident = TableIdent {
catalog: table_info.catalog_name.clone(),
schema: table_info.schema_name.clone(),
table: table_info.name.clone(),
table_id: table_info.table_id(),
engine: table_info.meta.engine.clone(),
};
region_alive_keepers
.register_table(table_ident, table)
.await?;
}

Ok(registered)
}

#[async_trait]
impl CatalogManager for RemoteCatalogManager {
async fn start(&self) -> Result<()> {
Expand All @@ -221,25 +251,12 @@ impl CatalogManager for RemoteCatalogManager {
}

async fn register_table(&self, request: RegisterTableRequest) -> Result<bool> {
let table = request.table.clone();

let registered = self.memory_catalog_manager.register_table_sync(request)?;

if registered {
let table_info = table.table_info();
let table_ident = TableIdent {
catalog: table_info.catalog_name.clone(),
schema: table_info.schema_name.clone(),
table: table_info.name.clone(),
table_id: table_info.table_id(),
engine: table_info.meta.engine.clone(),
};
self.region_alive_keepers
.register_table(table_ident, table)
.await?;
}

Ok(registered)
register_table(
&self.memory_catalog_manager,
&self.region_alive_keepers,
request,
)
.await
}

async fn deregister_table(&self, request: DeregisterTableRequest) -> Result<()> {
Expand Down

0 comments on commit a0e3701

Please sign in to comment.