Skip to content

Commit

Permalink
chore: apply suggestions from CR
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Sep 2, 2023
1 parent a05b3d7 commit 89d13c8
Showing 1 changed file with 32 additions and 24 deletions.
56 changes: 32 additions & 24 deletions src/meta-srv/src/selector/load_based.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,11 @@
// limitations under the License.

use api::v1::meta::Peer;
use common_catalog::format_full_table_name;
use common_meta::key::table_name::TableNameKey;
use common_meta::key::TableMetadataManager;
use common_meta::rpc::router::find_leaders;
use common_telemetry::warn;
use snafu::{OptionExt, ResultExt};
use snafu::ResultExt;

use crate::error::{self, Result};
use crate::keys::{LeaseKey, LeaseValue, StatKey};
Expand All @@ -31,6 +30,36 @@ const MAX_REGION_NUMBER: u64 = u64::MAX;

pub struct LoadBasedSelector;

async fn get_leader_peer_ids(
table_metadata_manager: &TableMetadataManager,
catalog: &str,
schema: &str,
table: &str,
) -> Result<Vec<u64>> {
let table_name = table_metadata_manager
.table_name_manager()
.get(TableNameKey::new(catalog, schema, table))
.await
.context(error::TableMetadataManagerSnafu)?;

Ok(if let Some(table_name) = table_name {
table_metadata_manager
.table_route_manager()
.get(table_name.table_id())
.await
.context(error::TableMetadataManagerSnafu)?
.map(|route| {
find_leaders(&route.region_routes)
.into_iter()
.map(|peer| peer.id)
.collect()
})
.unwrap_or_default()
} else {
Vec::new()
})
}

#[async_trait::async_trait]
impl Selector for LoadBasedSelector {
type Context = SelectorContext;
Expand All @@ -53,28 +82,7 @@ impl Selector for LoadBasedSelector {
let table_metadata_manager =
TableMetadataManager::new(KvBackendAdapter::wrap(ctx.kv_store.clone()));

let table_name = table_metadata_manager
.table_name_manager()
.get(TableNameKey::new(catalog, schema, table))
.await
.context(error::TableMetadataManagerSnafu)?;

if let Some(table_name) = table_name {
let route = table_metadata_manager
.table_route_manager()
.get(table_name.table_id())
.await
.context(error::TableMetadataManagerSnafu)?
.with_context(|| error::TableRouteNotFoundSnafu {
table_name: format_full_table_name(catalog, schema, table),
})?;
find_leaders(&route.region_routes)
.into_iter()
.map(|peer| peer.id)
.collect()
} else {
Vec::new()
}
get_leader_peer_ids(&table_metadata_manager, catalog, schema, table).await?
} else {
Vec::new()
};
Expand Down

0 comments on commit 89d13c8

Please sign in to comment.