Skip to content

Commit

Permalink
[14/N][VirtualCluster] Raylet only subscribe mixed cluster data (#431)
Browse files Browse the repository at this point in the history
Signed-off-by: 黑驰 <[email protected]>
wumuzi520 authored Dec 23, 2024

Verified

This commit was signed with the committer’s verified signature.
BerndKue Bernd
1 parent 105f018 commit bd65b46
Showing 7 changed files with 37 additions and 4 deletions.
4 changes: 3 additions & 1 deletion src/mock/ray/gcs/gcs_client/accessor.h
Original file line number Diff line number Diff line change
@@ -365,7 +365,9 @@ class MockVirtualClusterInfoAccessor : public VirtualClusterInfoAccessor {
(override));
MOCK_METHOD(Status,
AsyncGetAll,
(const MultiItemCallback<rpc::VirtualClusterTableData> &callback),
(bool include_job_clusters,
bool only_include_mixed_clusters,
(const MultiItemCallback<rpc::VirtualClusterTableData> &callback)),
(override));
MOCK_METHOD(Status,
AsyncSubscribeAll,
8 changes: 7 additions & 1 deletion src/ray/gcs/gcs_client/accessor.ant.cc
Original file line number Diff line number Diff line change
@@ -47,10 +47,13 @@ Status VirtualClusterInfoAccessor::AsyncGet(
}

Status VirtualClusterInfoAccessor::AsyncGetAll(
bool include_job_clusters,
bool only_include_mixed_clusters,
const MultiItemCallback<rpc::VirtualClusterTableData> &callback) {
RAY_LOG(DEBUG) << "Getting all virtual cluster info.";
rpc::GetVirtualClustersRequest request;
request.set_include_job_clusters(true);
request.set_only_include_mixed_clusters(true);
client_impl_->GetGcsRpcClient().GetVirtualClusters(
request, [callback](const Status &status, rpc::GetVirtualClustersReply &&reply) {
callback(
@@ -79,7 +82,10 @@ Status VirtualClusterInfoAccessor::AsyncSubscribeAll(
done(status);
}
};
RAY_CHECK_OK(AsyncGetAll(callback));
RAY_CHECK_OK(AsyncGetAll(
/*include_job_clusters=*/true,
/*only_include_mixed_clusters=*/true,
callback));
};
subscribe_operation_ = [this, subscribe](const StatusCallback &done) {
return client_impl_->GetGcsSubscriber().SubscribeAllVirtualClusters(subscribe, done);
2 changes: 2 additions & 0 deletions src/ray/gcs/gcs_client/accessor.h
Original file line number Diff line number Diff line change
@@ -1016,6 +1016,8 @@ class VirtualClusterInfoAccessor {
/// \param callback Callback that will be called after lookup finished.
/// \return Status
virtual Status AsyncGetAll(
bool include_job_clusters,
bool only_include_mixed_clusters,
const MultiItemCallback<rpc::VirtualClusterTableData> &callback);

/// Subscribe to virtual cluster updates.
5 changes: 5 additions & 0 deletions src/ray/gcs/gcs_server/gcs_virtual_cluster.cc
Original file line number Diff line number Diff line change
@@ -519,6 +519,7 @@ void PrimaryCluster::GetVirtualClustersData(rpc::GetVirtualClustersRequest reque
std::vector<std::shared_ptr<rpc::VirtualClusterTableData>> virtual_cluster_data_list;
auto virtual_cluster_id = request.virtual_cluster_id();
bool include_job_clusters = request.include_job_clusters();
bool only_include_mixed_cluster = request.only_include_mixed_clusters();

auto visit_proto_data = [&](const VirtualCluster *cluster) {
if (include_job_clusters && cluster->GetMode() == rpc::AllocationMode::EXCLUSIVE) {
@@ -528,6 +529,10 @@ void PrimaryCluster::GetVirtualClustersData(rpc::GetVirtualClustersRequest reque
callback(job_cluster->ToProto());
});
}
if (only_include_mixed_cluster &&
cluster->GetMode() == rpc::AllocationMode::EXCLUSIVE) {
return;
}
if (cluster->GetID() != kPrimaryClusterID) {
// Skip the primary cluster's proto data.
callback(cluster->ToProto());
6 changes: 6 additions & 0 deletions src/ray/gcs/gcs_server/gcs_virtual_cluster_manager.cc
Original file line number Diff line number Diff line change
@@ -216,6 +216,12 @@ Status GcsVirtualClusterManager::FlushAndPublish(
auto on_done = [this, data, callback = std::move(callback)](const Status &status) {
// The backend storage is supposed to be reliable, so the status must be ok.
RAY_CHECK_OK(status);
if (data->mode() != rpc::AllocationMode::MIXED) {
// Tasks can only be scheduled on nodes in mixed clusters, so only mixed cluster data
// needs to be published.
return;
}

RAY_CHECK_OK(gcs_publisher_.PublishVirtualCluster(
VirtualClusterID::FromBinary(data->id()), *data, nullptr));
if (callback) {
14 changes: 12 additions & 2 deletions src/ray/gcs/gcs_server/test/gcs_virtual_cluster_manager_test.cc
Original file line number Diff line number Diff line change
@@ -58,7 +58,7 @@ class VirtualClusterTest : public ::testing::Test {
for (size_t i = 0; i < node_count; ++i) {
auto node = Mocker::GenNodeInfo();
auto template_id = std::to_string(i % template_count);
node->set_template_id(template_id);
node->set_node_type_name(template_id);
primary_cluster->OnNodeAdd(*node);
if (template_id_to_nodes != nullptr) {
(*template_id_to_nodes)[template_id].emplace(NodeID::FromBinary(node->node_id()),
@@ -628,7 +628,6 @@ TEST_F(PrimaryClusterTest, GetVirtualClusters) {
virtual_clusters_data_map;
primary_cluster->GetVirtualClustersData(
request, [this, &virtual_clusters_data_map](auto data) {
RAY_LOG(INFO) << "xxx: " << data->id();
virtual_clusters_data_map.emplace(data->id(), data);
});
ASSERT_EQ(virtual_clusters_data_map.size(), 1);
@@ -646,6 +645,17 @@ TEST_F(PrimaryClusterTest, GetVirtualClusters) {
auto job_cluster = virtual_cluster_0->GetJobCluster("job_1");
ASSERT_TRUE(job_cluster != nullptr);
ASSERT_TRUE(virtual_clusters_data_map.contains(job_cluster->GetID()));

virtual_clusters_data_map.clear();
request.set_include_job_clusters(true);
request.set_only_include_mixed_clusters(true);
primary_cluster->GetVirtualClustersData(
request, [this, &virtual_clusters_data_map](auto data) {
virtual_clusters_data_map.emplace(data->id(), data);
});
ASSERT_EQ(virtual_clusters_data_map.size(), 1);
ASSERT_FALSE(virtual_clusters_data_map.contains(virtual_cluster_id_0));
ASSERT_TRUE(virtual_clusters_data_map.contains(job_cluster->GetID()));
}
}

2 changes: 2 additions & 0 deletions src/ray/protobuf/gcs_service.proto
Original file line number Diff line number Diff line change
@@ -852,6 +852,8 @@ message GetVirtualClustersRequest {
string virtual_cluster_id = 1;
// Whether to include job clusters.
bool include_job_clusters = 2;
// If true, the reply only includes mixed clusters.
bool only_include_mixed_clusters = 3;
}

message GetVirtualClustersReply {

0 comments on commit bd65b46

Please sign in to comment.