Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce node label #19153

Merged
merged 12 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.risingwave.proto.Catalog.Table;
import com.risingwave.proto.ClusterServiceGrpc.ClusterServiceBlockingStub;
import com.risingwave.proto.Common.HostAddress;
import com.risingwave.proto.Common.WorkerNode.Property;
import com.risingwave.proto.Common.WorkerType;
import com.risingwave.proto.DdlServiceGrpc.DdlServiceBlockingStub;
import com.risingwave.proto.DdlServiceOuterClass.GetTableRequest;
Expand All @@ -29,7 +30,6 @@
import com.risingwave.proto.Hummock.UnpinVersionBeforeRequest;
import com.risingwave.proto.HummockManagerServiceGrpc.HummockManagerServiceBlockingStub;
import com.risingwave.proto.Meta.AddWorkerNodeRequest;
import com.risingwave.proto.Meta.AddWorkerNodeRequest.Property;
import com.risingwave.proto.Meta.AddWorkerNodeResponse;
import com.risingwave.proto.Meta.HeartbeatRequest;
import io.grpc.Grpc;
Expand Down Expand Up @@ -100,7 +100,6 @@ public MetaClient(String metaAddr, ScheduledExecutorService scheduler) {
Property.newBuilder()
.setIsStreaming(false)
.setIsServing(false)
.setWorkerNodeParallelism(0)
.build())
.build();
AddWorkerNodeResponse resp = clusterStub.addWorkerNode(req);
Expand Down
13 changes: 8 additions & 5 deletions proto/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
uint32 worker_node_id = 2;
}

message WorkerNode {

Check failure on line 46 in proto/common.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "11" with name "node_label" on message "WorkerNode" was deleted without reserving the name "node_label".

Check failure on line 46 in proto/common.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "11" with name "node_label" on message "WorkerNode" was deleted without reserving the number "11".
enum State {
UNSPECIFIED = 0;
STARTING = 1;
Expand All @@ -56,6 +56,11 @@
bool is_unschedulable = 3;
// This is used for frontend node to register its rpc address
string internal_rpc_host_addr = 4;
// Meta may assign labels to worker nodes to partition workload by label.
// This is used for serverless backfilling of materialized views.
optional string node_label = 5;

uint32 parallelism = 6;
}
message Resource {
string rw_version = 1;
Expand Down Expand Up @@ -83,11 +88,9 @@
// It's not persistent in meta store.
optional uint64 started_at = 9;

uint32 parallelism = 10;

// Meta may assign labels to worker nodes to partition workload by label.
// This is used for serverless backfilling of materialized views.
string node_label = 11;
// Moved to `Property` message.
reserved 10;
reserved "parallelism";
}

message Buffer {
Expand Down
12 changes: 2 additions & 10 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -342,21 +342,13 @@
}

// Below for cluster service.

message AddWorkerNodeRequest {

Check failure on line 345 in proto/meta.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "4" with name "property" on message "AddWorkerNodeRequest" was deleted without reserving the name "property".
message Property {
uint64 worker_node_parallelism = 1;
bool is_streaming = 2;
bool is_serving = 3;
bool is_unschedulable = 4;
// This is used for frontend node to register its rpc address
string internal_rpc_host_addr = 5;
}
common.WorkerType worker_type = 1;
common.HostAddress host = 2;
reserved 3;
Property property = 4;
reserved 4;
common.WorkerNode.Resource resource = 5;
common.WorkerNode.Property property = 6;
}

message AddWorkerNodeResponse {
Expand Down
6 changes: 2 additions & 4 deletions src/batch/src/worker_manager/worker_node_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,12 +412,11 @@ mod tests {
r#type: WorkerType::ComputeNode as i32,
host: Some(HostAddr::try_from("127.0.0.1:1234").unwrap().to_protobuf()),
state: worker_node::State::Running as i32,
parallelism: 0,
property: Some(Property {
is_unschedulable: false,
is_serving: true,
is_streaming: true,
internal_rpc_host_addr: "".to_string(),
..Default::default()
}),
transactional_id: Some(1),
..Default::default()
Expand All @@ -427,12 +426,11 @@ mod tests {
r#type: WorkerType::ComputeNode as i32,
host: Some(HostAddr::try_from("127.0.0.1:1235").unwrap().to_protobuf()),
state: worker_node::State::Running as i32,
parallelism: 0,
property: Some(Property {
is_unschedulable: false,
is_serving: true,
is_streaming: false,
internal_rpc_host_addr: "".to_string(),
..Default::default()
}),
transactional_id: Some(2),
..Default::default()
Expand Down
2 changes: 2 additions & 0 deletions src/common/src/util/worker_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,5 @@
// limitations under the License.

pub type WorkerNodeId = u32;

pub const DEFAULT_COMPUTE_NODE_LABEL: &str = "default";
22 changes: 14 additions & 8 deletions src/common/src/vnode_mapping/vnode_placement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ mod tests {

use risingwave_common::hash::WorkerSlotMapping;
use risingwave_pb::common::worker_node::Property;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::common::{WorkerNode, WorkerType};

use crate::hash::VirtualNode;

Expand All @@ -232,7 +232,7 @@ mod tests {
is_unschedulable: false,
is_serving: true,
is_streaming: false,
internal_rpc_host_addr: "".to_string(),
..Default::default()
};

let count_same_vnode_mapping = |wm1: &WorkerSlotMapping, wm2: &WorkerSlotMapping| {
Expand All @@ -248,10 +248,12 @@ mod tests {
count
};

let mut property = serving_property.clone();
property.parallelism = 1;
let worker_1 = WorkerNode {
id: 1,
parallelism: 1,
property: Some(serving_property.clone()),
r#type: WorkerType::ComputeNode.into(),
property: Some(property),
..Default::default()
};

Expand All @@ -263,10 +265,12 @@ mod tests {
let re_worker_mapping_2 = place_vnode(None, &[worker_1.clone()], None).unwrap();
assert_eq!(re_worker_mapping_2.iter_unique().count(), 1);

let mut property = serving_property.clone();
property.parallelism = 50;
let worker_2 = WorkerNode {
id: 2,
parallelism: 50,
property: Some(serving_property.clone()),
property: Some(property),
r#type: WorkerType::ComputeNode.into(),
..Default::default()
};

Expand All @@ -282,10 +286,12 @@ mod tests {
let score = count_same_vnode_mapping(&re_worker_mapping_2, &re_worker_mapping);
assert!(score >= 5);

let mut property = serving_property.clone();
property.parallelism = 60;
let worker_3 = WorkerNode {
id: 3,
parallelism: 60,
property: Some(serving_property.clone()),
r#type: WorkerType::ComputeNode.into(),
property: Some(property),
..Default::default()
};
let re_pu_mapping_2 = place_vnode(
Expand Down
9 changes: 9 additions & 0 deletions src/compute/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use risingwave_common::util::meta_addr::MetaAddressStrategy;
use risingwave_common::util::resource_util::cpu::total_cpu_available;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use risingwave_common::util::tokio_util::sync::CancellationToken;
use risingwave_common::util::worker_util::DEFAULT_COMPUTE_NODE_LABEL;
use serde::{Deserialize, Serialize};

/// If `total_memory_bytes` is not specified, the default memory limit will be set to
Expand Down Expand Up @@ -104,6 +105,10 @@ pub struct ComputeNodeOpts {
#[override_opts(if_absent, path = streaming.actor_runtime_worker_threads_num)]
pub parallelism: usize,

/// The parallelism that the compute node will register to the scheduler of the meta service.
#[clap(long, env = "RW_NODE_LABEL", default_value_t = default_node_label())]
pub node_label: String,

/// Decides whether the compute node can be used for streaming and serving.
#[clap(long, env = "RW_COMPUTE_NODE_ROLE", value_enum, default_value_t = default_role())]
pub role: Role,
Expand Down Expand Up @@ -249,6 +254,10 @@ pub fn default_parallelism() -> usize {
total_cpu_available().ceil() as usize
}

pub fn default_node_label() -> String {
DEFAULT_COMPUTE_NODE_LABEL.to_string()
}

pub fn default_role() -> Role {
Role::Both
}
5 changes: 3 additions & 2 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ use risingwave_common_heap_profiling::HeapProfiler;
use risingwave_common_service::{MetricsManager, ObserverManager, TracingExtractLayer};
use risingwave_connector::source::monitor::GLOBAL_SOURCE_METRICS;
use risingwave_dml::dml_manager::DmlManager;
use risingwave_pb::common::worker_node::Property;
use risingwave_pb::common::WorkerType;
use risingwave_pb::compute::config_service_server::ConfigServiceServer;
use risingwave_pb::health::health_server::HealthServer;
use risingwave_pb::meta::add_worker_node_request::Property;
use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer;
use risingwave_pb::stream_service::stream_service_server::StreamServiceServer;
use risingwave_pb::task_service::exchange_service_server::ExchangeServiceServer;
Expand Down Expand Up @@ -124,11 +124,12 @@ pub async fn compute_node_serve(
WorkerType::ComputeNode,
&advertise_addr,
Property {
worker_node_parallelism: opts.parallelism as u64,
parallelism: opts.parallelism as u32,
is_streaming: opts.role.for_streaming(),
is_serving: opts.role.for_serving(),
is_unschedulable: false,
internal_rpc_host_addr: "".to_string(),
node_label: Some(opts.node_label.clone()),
},
&config.meta,
)
Expand Down
2 changes: 1 addition & 1 deletion src/ctl/src/common/meta_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use std::env;
use anyhow::{bail, Result};
use risingwave_common::config::MetaConfig;
use risingwave_common::util::addr::HostAddr;
use risingwave_pb::common::worker_node::Property;
use risingwave_pb::common::WorkerType;
use risingwave_pb::meta::add_worker_node_request::Property;
use risingwave_rpc_client::MetaClient;

pub struct MetaServiceOpts {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ struct RwWorkerNode {
system_total_memory_bytes: Option<i64>,
system_total_cpu_cores: Option<i64>,
started_at: Option<Timestamptz>,
label: Option<String>,
}

#[system_catalog(table, "rw_catalog.rw_worker_nodes")]
Expand All @@ -58,7 +59,11 @@ async fn read_rw_worker_nodes_info(reader: &SysCatalogReaderImpl) -> Result<Vec<
port: host.map(|h| h.port.to_string()),
r#type: worker.get_type().unwrap().as_str_name().into(),
state: worker.get_state().unwrap().as_str_name().into(),
parallelism: worker.parallelism() as i32,
parallelism: if is_compute {
worker.parallelism() as i32
} else {
0
},
is_streaming: if is_compute {
property.map(|p| p.is_streaming)
} else {
Expand All @@ -81,6 +86,11 @@ async fn read_rw_worker_nodes_info(reader: &SysCatalogReaderImpl) -> Result<Vec<
started_at: worker
.started_at
.map(|ts| Timestamptz::from_secs(ts as i64).unwrap()),
label: if is_compute {
property.and_then(|p| p.node_label.clone())
} else {
None
},
}
})
.collect())
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/show.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ pub async fn handle_show_object(
addr: addr.to_string(),
r#type: worker.get_type().unwrap().as_str_name().into(),
state: worker.get_state().unwrap().as_str_name().to_string(),
parallelism: worker.get_parallelism() as _,
parallelism: worker.parallelism() as _,
is_streaming: property.map(|p| p.is_streaming),
is_serving: property.map(|p| p.is_serving),
is_unschedulable: property.map(|p| p.is_unschedulable),
Expand Down
12 changes: 6 additions & 6 deletions src/frontend/src/scheduler/distributed/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -669,12 +669,12 @@ pub(crate) mod tests {
port: 5687,
}),
state: risingwave_pb::common::worker_node::State::Running as i32,
parallelism: 8,
property: Some(Property {
parallelism: 8,
is_unschedulable: false,
is_serving: true,
is_streaming: true,
internal_rpc_host_addr: "".to_string(),
..Default::default()
}),
transactional_id: Some(0),
..Default::default()
Expand All @@ -687,12 +687,12 @@ pub(crate) mod tests {
port: 5688,
}),
state: risingwave_pb::common::worker_node::State::Running as i32,
parallelism: 8,
property: Some(Property {
parallelism: 8,
is_unschedulable: false,
is_serving: true,
is_streaming: true,
internal_rpc_host_addr: "".to_string(),
..Default::default()
}),
transactional_id: Some(1),
..Default::default()
Expand All @@ -705,12 +705,12 @@ pub(crate) mod tests {
port: 5689,
}),
state: risingwave_pb::common::worker_node::State::Running as i32,
parallelism: 8,
property: Some(Property {
parallelism: 8,
is_unschedulable: false,
is_serving: true,
is_streaming: true,
internal_rpc_host_addr: "".to_string(),
..Default::default()
}),
transactional_id: Some(2),
..Default::default()
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ use risingwave_common::{GIT_SHA, RW_VERSION};
use risingwave_common_heap_profiling::HeapProfiler;
use risingwave_common_service::{MetricsManager, ObserverManager};
use risingwave_connector::source::monitor::{SourceMetrics, GLOBAL_SOURCE_METRICS};
use risingwave_pb::common::worker_node::Property as AddWorkerNodeProperty;
use risingwave_pb::common::WorkerType;
use risingwave_pb::frontend_service::frontend_service_server::FrontendServiceServer;
use risingwave_pb::health::health_server::HealthServer;
use risingwave_pb::meta::add_worker_node_request::Property as AddWorkerNodeProperty;
use risingwave_pb::user::auth_info::EncryptionType;
use risingwave_pb::user::grant_privilege::Object;
use risingwave_rpc_client::{ComputeClientPool, ComputeClientPoolRef, MetaClient};
Expand Down
2 changes: 2 additions & 0 deletions src/meta/model/migration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ mod m20240806_143329_add_rate_limit_to_source_catalog;
mod m20240820_081248_add_time_travel_per_table_epoch;
mod m20240911_083152_variable_vnode_count;
mod m20241016_065621_hummock_gc_history;
mod m20241022_072553_node_label;
mod m20241025_062548_singleton_vnode_count;
mod m20241115_085007_remove_function_type;
mod m20241120_182555_hummock_add_time_travel_sst_index;
Expand Down Expand Up @@ -90,6 +91,7 @@ impl MigratorTrait for Migrator {
Box::new(m20241025_062548_singleton_vnode_count::Migration),
Box::new(m20241115_085007_remove_function_type::Migration),
Box::new(m20241120_182555_hummock_add_time_travel_sst_index::Migration),
Box::new(m20241022_072553_node_label::Migration),
]
}
}
35 changes: 35 additions & 0 deletions src/meta/model/migration/src/m20241022_072553_node_label.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use sea_orm_migration::prelude::*;

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
Table::alter()
.table(WorkerProperty::Table)
.add_column(ColumnDef::new(WorkerProperty::Label).string())
.to_owned(),
)
.await
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
Table::alter()
.table(WorkerProperty::Table)
.drop_column(WorkerProperty::Label)
.to_owned(),
)
.await
}
}

#[derive(DeriveIden)]
enum WorkerProperty {
Table,
Label,
}
1 change: 1 addition & 0 deletions src/meta/model/src/worker_property.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub struct Model {
pub is_serving: bool,
pub is_unschedulable: bool,
pub internal_rpc_host_addr: Option<String>,
pub label: Option<String>,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
Expand Down
Loading
Loading