Skip to content

Commit

Permalink
feat(meta): add a rpc to fetch meta store endpoint (#19594)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored Nov 27, 2024
1 parent ac5cb40 commit 200ed05
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 12 deletions.
7 changes: 7 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -413,13 +413,20 @@ message GetClusterRecoveryStatusResponse {
RecoveryStatus status = 1;
}

message GetMetaStoreInfoRequest {}

message GetMetaStoreInfoResponse {
string meta_store_endpoint = 1;
}

service ClusterService {
rpc AddWorkerNode(AddWorkerNodeRequest) returns (AddWorkerNodeResponse);
rpc ActivateWorkerNode(ActivateWorkerNodeRequest) returns (ActivateWorkerNodeResponse);
rpc DeleteWorkerNode(DeleteWorkerNodeRequest) returns (DeleteWorkerNodeResponse);
rpc UpdateWorkerNodeSchedulability(UpdateWorkerNodeSchedulabilityRequest) returns (UpdateWorkerNodeSchedulabilityResponse);
rpc ListAllNodes(ListAllNodesRequest) returns (ListAllNodesResponse);
rpc GetClusterRecoveryStatus(GetClusterRecoveryStatusRequest) returns (GetClusterRecoveryStatusResponse);
rpc GetMetaStoreInfo(GetMetaStoreInfoRequest) returns (GetMetaStoreInfoResponse);
}

enum SubscribeType {
Expand Down
6 changes: 3 additions & 3 deletions src/meta/node/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ pub async fn rpc_serve(
));
let conn = sea_orm::Database::connect(IN_MEMORY_STORE).await?;
rpc_serve_with_store(
SqlMetaStore::new(conn),
SqlMetaStore::new(conn, IN_MEMORY_STORE.to_string()),
dummy_election_client,
address_info,
max_cluster_heartbeat_interval,
Expand All @@ -149,7 +149,7 @@ pub async fn rpc_serve(
}
MetaStoreBackend::Sql { endpoint, config } => {
let is_sqlite = DbBackend::Sqlite.is_prefix_of(&endpoint);
let mut options = sea_orm::ConnectOptions::new(endpoint);
let mut options = sea_orm::ConnectOptions::new(endpoint.clone());
options
.max_connections(config.max_connections)
.min_connections(config.min_connections)
Expand All @@ -164,7 +164,7 @@ pub async fn rpc_serve(
}

let conn = sea_orm::Database::connect(options).await?;
let meta_store_sql = SqlMetaStore::new(conn);
let meta_store_sql = SqlMetaStore::new(conn, endpoint);

// Init election client.
let id = address_info.advertise_addr.clone();
Expand Down
18 changes: 15 additions & 3 deletions src/meta/service/src/cluster_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ use risingwave_pb::meta::cluster_service_server::ClusterService;
use risingwave_pb::meta::{
ActivateWorkerNodeRequest, ActivateWorkerNodeResponse, AddWorkerNodeRequest,
AddWorkerNodeResponse, DeleteWorkerNodeRequest, DeleteWorkerNodeResponse,
GetClusterRecoveryStatusRequest, GetClusterRecoveryStatusResponse, ListAllNodesRequest,
ListAllNodesResponse, UpdateWorkerNodeSchedulabilityRequest,
UpdateWorkerNodeSchedulabilityResponse,
GetClusterRecoveryStatusRequest, GetClusterRecoveryStatusResponse, GetMetaStoreInfoRequest,
GetMetaStoreInfoResponse, ListAllNodesRequest, ListAllNodesResponse,
UpdateWorkerNodeSchedulabilityRequest, UpdateWorkerNodeSchedulabilityResponse,
};
use tonic::{Request, Response, Status};

Expand Down Expand Up @@ -167,4 +167,16 @@ impl ClusterService for ClusterServiceImpl {
status: self.barrier_manager.get_recovery_status() as _,
}))
}

async fn get_meta_store_info(
&self,
_request: Request<GetMetaStoreInfoRequest>,
) -> Result<Response<GetMetaStoreInfoResponse>, Status> {
Ok(Response::new(GetMetaStoreInfoResponse {
meta_store_endpoint: self
.metadata_manager
.cluster_controller
.meta_store_endpoint(),
}))
}
}
6 changes: 3 additions & 3 deletions src/meta/src/backup_restore/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub async fn get_meta_store(opts: RestoreOpts) -> BackupResult<SqlMetaStore> {
match meta_store_backend {
MetaStoreBackend::Mem => {
let conn = sea_orm::Database::connect(IN_MEMORY_STORE).await.unwrap();
Ok(SqlMetaStore::new(conn))
Ok(SqlMetaStore::new(conn, IN_MEMORY_STORE.to_string()))
}
MetaStoreBackend::Sql { endpoint, config } => {
let max_connection = if DbBackend::Sqlite.is_prefix_of(&endpoint) {
Expand All @@ -66,7 +66,7 @@ pub async fn get_meta_store(opts: RestoreOpts) -> BackupResult<SqlMetaStore> {
} else {
config.max_connections
};
let mut options = sea_orm::ConnectOptions::new(endpoint);
let mut options = sea_orm::ConnectOptions::new(endpoint.clone());
options
.max_connections(max_connection)
.min_connections(config.min_connections)
Expand All @@ -76,7 +76,7 @@ pub async fn get_meta_store(opts: RestoreOpts) -> BackupResult<SqlMetaStore> {
let conn = sea_orm::Database::connect(options)
.await
.map_err(|e| BackupError::MetaStorage(e.into()))?;
Ok(SqlMetaStore::new(conn))
Ok(SqlMetaStore::new(conn, endpoint))
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/meta/src/controller/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,10 @@ impl ClusterController {
pub fn cluster_id(&self) -> &ClusterId {
self.env.cluster_id()
}

pub fn meta_store_endpoint(&self) -> String {
self.env.meta_store_ref().endpoint.clone()
}
}

/// The cluster info used for scheduling a streaming job.
Expand Down
10 changes: 7 additions & 3 deletions src/meta/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,24 @@ impl From<sea_orm::DbErr> for MetaError {
#[derive(Clone)]
pub struct SqlMetaStore {
pub conn: DatabaseConnection,
pub endpoint: String,
}

pub const IN_MEMORY_STORE: &str = "sqlite::memory:";

impl SqlMetaStore {
pub fn new(conn: DatabaseConnection) -> Self {
Self { conn }
pub fn new(conn: DatabaseConnection, endpoint: String) -> Self {
Self { conn, endpoint }
}

#[cfg(any(test, feature = "test"))]
pub async fn for_test() -> Self {
let conn = sea_orm::Database::connect(IN_MEMORY_STORE).await.unwrap();
Migrator::up(&conn, None).await.unwrap();
Self { conn }
Self {
conn,
endpoint: IN_MEMORY_STORE.to_string(),
}
}

/// Check whether the cluster, which uses SQL as the backend, is a new cluster.
Expand Down
7 changes: 7 additions & 0 deletions src/rpc_client/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1252,6 +1252,12 @@ impl MetaClient {
Ok(resp.params.unwrap().into())
}

pub async fn get_meta_store_endpoint(&self) -> Result<String> {
let req = GetMetaStoreInfoRequest {};
let resp = self.inner.get_meta_store_info(req).await?;
Ok(resp.meta_store_endpoint)
}

pub async fn set_system_param(
&self,
param: String,
Expand Down Expand Up @@ -2066,6 +2072,7 @@ macro_rules! for_all_meta_rpc {
,{ cluster_client, update_worker_node_schedulability, UpdateWorkerNodeSchedulabilityRequest, UpdateWorkerNodeSchedulabilityResponse }
,{ cluster_client, list_all_nodes, ListAllNodesRequest, ListAllNodesResponse }
,{ cluster_client, get_cluster_recovery_status, GetClusterRecoveryStatusRequest, GetClusterRecoveryStatusResponse }
,{ cluster_client, get_meta_store_info, GetMetaStoreInfoRequest, GetMetaStoreInfoResponse }
,{ heartbeat_client, heartbeat, HeartbeatRequest, HeartbeatResponse }
,{ stream_client, flush, FlushRequest, FlushResponse }
,{ stream_client, pause, PauseRequest, PauseResponse }
Expand Down

0 comments on commit 200ed05

Please sign in to comment.