Skip to content

Commit

Permalink
[13/N][VirtualCluster] Add VirtualClusterManager to raylet
Browse files Browse the repository at this point in the history
  • Loading branch information
wumuzi520 committed Dec 20, 2024
1 parent 4fa1cab commit da1aa10
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 2 deletions.
6 changes: 4 additions & 2 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,8 @@ NodeManager::NodeManager(

mutable_object_provider_ = std::make_unique<core::experimental::MutableObjectProvider>(
*store_client_, absl::bind_front(&NodeManager::CreateRayletClient, this));

virtual_cluster_manager_ = std::make_shared<VirtualClusterManager>();
}

std::shared_ptr<raylet::RayletClient> NodeManager::CreateRayletClient(
Expand Down Expand Up @@ -505,8 +507,8 @@ ray::Status NodeManager::RegisterGcs() {
// Subscribe to all virtual clusrter update notification.
const auto virtual_cluster_update_notification_handler =
[this](const VirtualClusterID &virtual_cluster_id,
const rpc::VirtualClusterTableData &virtual_cluster_data) {
// TODO(Shanly): To be implemented.
rpc::VirtualClusterTableData &&virtual_cluster_data) {
virtual_cluster_manager_->UpdateVirtualCluster(std::move(virtual_cluster_data));
};
RAY_RETURN_NOT_OK(gcs_client_->VirtualCluster().AsyncSubscribeAll(
virtual_cluster_update_notification_handler, [](const ray::Status &status) {
Expand Down
4 changes: 4 additions & 0 deletions src/ray/raylet/node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include "ray/common/asio/instrumented_io_context.h"
#include "ray/common/bundle_spec.h"
#include "ray/raylet/placement_group_resource_manager.h"
#include "ray/raylet/virtual_cluster_manager.h"
#include "ray/raylet/worker_killing_policy.h"
#include "ray/core_worker/experimental_mutable_object_provider.h"
// clang-format on
Expand Down Expand Up @@ -894,6 +895,9 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
std::unique_ptr<MemoryMonitor> memory_monitor_;

std::unique_ptr<core::experimental::MutableObjectProvider> mutable_object_provider_;

/// The virtual cluster manager.
std::shared_ptr<VirtualClusterManager> virtual_cluster_manager_;
};

} // namespace raylet
Expand Down
49 changes: 49 additions & 0 deletions src/ray/raylet/virtual_cluster_manager.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License

#include "ray/raylet/virtual_cluster_manager.h"

namespace ray {

namespace raylet {

//////////////////////// VirtualClusterManager ////////////////////////
void VirtualClusterManager::UpdateVirtualCluster(
rpc::VirtualClusterTableData virtual_cluster_data) {
RAY_LOG(INFO) << "Virtual cluster updated: " << virtual_cluster_data.id();

const auto &virtual_cluster_id = virtual_cluster_data.id();
auto it = virtual_clusters_.find(virtual_cluster_id);
if (it == virtual_clusters_.end()) {
virtual_clusters_[virtual_cluster_id] = std::move(virtual_cluster_data);
} else {
it->second = std::move(virtual_cluster_data);
}
}

bool VirtualClusterManager::ContainsNodeInstance(const std::string &virtual_cluster_id,
const NodeID &node_id) const {
auto it = virtual_clusters_.find(virtual_cluster_id);
if (it == virtual_clusters_.end()) {
return false;
}
const auto &virtual_cluster_data = it->second;
RAY_CHECK(virtual_cluster_data.mode() == rpc::AllocationMode::MIXED);

const auto &node_instances = virtual_cluster_data.node_instances();
return node_instances.find(node_id.Binary()) != node_instances.end();
}

} // namespace raylet
} // namespace ray
38 changes: 38 additions & 0 deletions src/ray/raylet/virtual_cluster_manager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include "ray/common/id.h"
#include "src/ray/protobuf/gcs_service.pb.h"

namespace ray {

namespace raylet {

class VirtualClusterManager {
public:
VirtualClusterManager() = default;

void UpdateVirtualCluster(rpc::VirtualClusterTableData virtual_cluster_data);

bool ContainsNodeInstance(const std::string &virtual_cluster_id,
const NodeID &node_id) const;

private:
absl::flat_hash_map<std::string, rpc::VirtualClusterTableData> virtual_clusters_;
};

} // namespace raylet
} // end namespace ray

0 comments on commit da1aa10

Please sign in to comment.