Skip to content

Commit

Permalink
[16/N][VirtualCluster] Add virtual cluster failover logic
Browse files Browse the repository at this point in the history
  • Loading branch information
wumuzi520 committed Dec 24, 2024
1 parent bd65b46 commit 7aa0c3a
Show file tree
Hide file tree
Showing 8 changed files with 516 additions and 66 deletions.
2 changes: 1 addition & 1 deletion src/ray/common/virtual_cluster_id.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class VirtualClusterID : public SimpleID<VirtualClusterID> {
return id_.find(kJobClusterIDSeperator) != std::string::npos;
}

VirtualClusterID OwnerID() const {
VirtualClusterID ParentID() const {
auto pos = id_.find(kJobClusterIDSeperator);
return pos == std::string::npos ? Nil()
: VirtualClusterID::FromBinary(id_.substr(0, pos));
Expand Down
18 changes: 17 additions & 1 deletion src/ray/gcs/gcs_server/gcs_init_data.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace ray {
namespace gcs {
void GcsInitData::AsyncLoad(const EmptyCallback &on_done) {
// There are 5 kinds of table data need to be loaded.
auto count_down = std::make_shared<int>(5);
auto count_down = std::make_shared<int>(6);
auto on_load_finished = [count_down, on_done] {
if (--(*count_down) == 0) {
if (on_done) {
Expand All @@ -36,6 +36,8 @@ void GcsInitData::AsyncLoad(const EmptyCallback &on_done) {
AsyncLoadActorTaskSpecTableData(on_load_finished);

AsyncLoadPlacementGroupTableData(on_load_finished);

AsyncLoadVirtualClusterTableData(on_load_finished);
}

void GcsInitData::AsyncLoadJobTableData(const EmptyCallback &on_done) {
Expand Down Expand Up @@ -102,5 +104,19 @@ void GcsInitData::AsyncLoadActorTaskSpecTableData(const EmptyCallback &on_done)
load_actor_task_spec_table_data_callback));
}

void GcsInitData::AsyncLoadVirtualClusterTableData(const EmptyCallback &on_done) {
RAY_LOG(INFO) << "Loading virtual cluster table data.";
auto load_virtual_cluster_table_data_callback =
[this,
on_done](absl::flat_hash_map<VirtualClusterID, VirtualClusterTableData> &&result) {
virtual_cluster_table_data_ = std::move(result);
RAY_LOG(INFO) << "Finished loading virtual cluster table data, size = "
<< virtual_cluster_table_data_.size();
on_done();
};
RAY_CHECK_OK(gcs_table_storage_.VirtualClusterTable().GetAll(
load_virtual_cluster_table_data_callback));
}

} // namespace gcs
} // namespace ray
15 changes: 15 additions & 0 deletions src/ray/gcs/gcs_server/gcs_init_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ class GcsInitData {
return placement_group_table_data_;
}

/// Get virtual cluster metadata.
const absl::flat_hash_map<VirtualClusterID, rpc::VirtualClusterTableData>
&VirtualClusters() const {
return virtual_cluster_table_data_;
}

private:
/// Load job metadata from the store into memory asynchronously.
///
Expand All @@ -87,6 +93,11 @@ class GcsInitData {

void AsyncLoadActorTaskSpecTableData(const EmptyCallback &on_done);

/// Load virtual cluster metadata from the store into memory asynchronously.
///
/// \param on_done The callback when virtual cluster metadata is loaded successfully.
void AsyncLoadVirtualClusterTableData(const EmptyCallback &on_done);

protected:
/// The gcs table storage.
gcs::GcsTableStorage &gcs_table_storage_;
Expand All @@ -105,6 +116,10 @@ class GcsInitData {
absl::flat_hash_map<ActorID, rpc::ActorTableData> actor_table_data_;

absl::flat_hash_map<ActorID, rpc::TaskSpec> actor_task_spec_table_data_;

/// Virtual cluster metadata.
absl::flat_hash_map<VirtualClusterID, rpc::VirtualClusterTableData>
virtual_cluster_table_data_;
};

} // namespace gcs
Expand Down
162 changes: 133 additions & 29 deletions src/ray/gcs/gcs_server/gcs_virtual_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,12 @@ void VirtualCluster::RemoveNodeInstances(ReplicaInstances replica_instances) {
RAY_CHECK(job_cluster_iter != template_iter->second.end());

for (auto &[id, _] : node_instances) {
RAY_CHECK(job_cluster_iter->second.erase(id) == 1);
RAY_CHECK(--(replica_set_iter->second) >= 0);
if (job_cluster_iter->second.erase(id) == 0) {
RAY_LOG(WARNING) << "The node instance " << id << " is not found in cluster "
<< GetID();
} else {
RAY_CHECK(--(replica_set_iter->second) >= 0);
}
}

if (job_cluster_iter->second.empty()) {
Expand Down Expand Up @@ -178,7 +182,6 @@ std::shared_ptr<rpc::VirtualClusterTableData> VirtualCluster::ToProto() const {
data->set_id(GetID());
data->set_mode(GetMode());
data->set_revision(GetRevision());
data->mutable_replica_sets()->insert(replica_sets_.begin(), replica_sets_.end());
for (auto &[template_id, job_node_instances] : visible_node_instances_) {
for (auto &[job_cluster_id, node_instances] : job_node_instances) {
for (auto &[id, node_instance] : node_instances) {
Expand All @@ -205,19 +208,26 @@ std::string VirtualCluster::DebugString() const {
}

///////////////////////// ExclusiveCluster /////////////////////////
Status ExclusiveCluster::CreateJobCluster(const std::string &job_name,
void ExclusiveCluster::LoadJobCluster(const rpc::VirtualClusterTableData &data) {
RAY_CHECK(GetMode() == rpc::AllocationMode::EXCLUSIVE);

const auto &job_cluster_id = data.id();
RAY_CHECK(VirtualClusterID::FromBinary(job_cluster_id).IsJobClusterID());
RAY_CHECK(job_clusters_.find(job_cluster_id) == job_clusters_.end());

DoCreateJobCluster(data.id(), toReplicaInstances(data.node_instances()));
}

Status ExclusiveCluster::CreateJobCluster(const std::string &job_cluster_id,
ReplicaSets replica_sets,
CreateOrUpdateVirtualClusterCallback callback) {
if (GetMode() != rpc::AllocationMode::EXCLUSIVE) {
std::ostringstream ostr;
ostr << "The job cluster can only be created in exclusive mode, virtual_cluster_id: "
<< GetID() << ", job_name: " << job_name;
<< GetID();
return Status::InvalidArgument(ostr.str());
}

auto job_cluster_id =
VirtualClusterID::FromBinary(GetID()).BuildJobClusterID(job_name).Binary();

auto iter = job_clusters_.find(job_cluster_id);
if (iter != job_clusters_.end()) {
std::ostringstream ostr;
Expand All @@ -236,6 +246,14 @@ Status ExclusiveCluster::CreateJobCluster(const std::string &job_name,
return Status::OutOfResource(ostr.str());
}

auto job_cluster =
DoCreateJobCluster(job_cluster_id, std::move(replica_instances_to_add));
// Flush and publish the job cluster data.
return async_data_flusher_(job_cluster->ToProto(), std::move(callback));
}

std::shared_ptr<JobCluster> ExclusiveCluster::DoCreateJobCluster(
const std::string &job_cluster_id, ReplicaInstances replica_instances_to_add) {
auto replica_instances_to_remove_from_current_cluster = replica_instances_to_add;
auto replica_instances_to_add_to_current_cluster = replica_instances_to_add;
for (auto &[template_id, job_node_instances] :
Expand All @@ -247,27 +265,22 @@ Status ExclusiveCluster::CreateJobCluster(const std::string &job_name,
UpdateNodeInstances(std::move(replica_instances_to_add_to_current_cluster),
std::move(replica_instances_to_remove_from_current_cluster));

// Create a job cluster.
auto job_cluster = std::make_shared<JobCluster>(job_cluster_id);
job_cluster->UpdateNodeInstances(std::move(replica_instances_to_add),
ReplicaInstances());
RAY_CHECK(job_clusters_.emplace(job_cluster_id, job_cluster).second);

// Flush and publish the job cluster data.
return async_data_flusher_(job_cluster->ToProto(), std::move(callback));
return job_cluster;
}

Status ExclusiveCluster::RemoveJobCluster(const std::string &job_name,
Status ExclusiveCluster::RemoveJobCluster(const std::string &job_cluster_id,
RemoveVirtualClusterCallback callback) {
if (GetMode() != rpc::AllocationMode::EXCLUSIVE) {
std::ostringstream ostr;
ostr << "The job cluster can only be removed in exclusive mode, virtual_cluster_id: "
<< GetID() << ", job_name: " << job_name;
<< GetID();
return Status::InvalidArgument(ostr.str());
}

auto job_cluster_id =
VirtualClusterID::FromBinary(GetID()).BuildJobClusterID(job_name).Binary();
auto iter = job_clusters_.find(job_cluster_id);
if (iter == job_clusters_.end()) {
return Status::NotFound("The job cluster " + job_cluster_id + " does not exist.");
Expand Down Expand Up @@ -300,9 +313,7 @@ Status ExclusiveCluster::RemoveJobCluster(const std::string &job_name,
}

std::shared_ptr<JobCluster> ExclusiveCluster::GetJobCluster(
const std::string &job_id) const {
auto job_cluster_id =
VirtualClusterID::FromBinary(GetID()).BuildJobClusterID(job_id).Binary();
const std::string &job_cluster_id) const {
auto iter = job_clusters_.find(job_cluster_id);
return iter != job_clusters_.end() ? iter->second : nullptr;
}
Expand All @@ -316,13 +327,12 @@ bool ExclusiveCluster::IsIdleNodeInstance(const std::string &job_cluster_id,
}

void ExclusiveCluster::ForeachJobCluster(
const std::function<void(const std::string &, const std::shared_ptr<JobCluster> &)>
&fn) const {
const std::function<void(const std::shared_ptr<JobCluster> &)> &fn) const {
if (fn == nullptr) {
return;
}
for (const auto &[job_cluster_id, job_cluster] : job_clusters_) {
fn(job_cluster_id, job_cluster);
for (const auto &[_, job_cluster] : job_clusters_) {
fn(job_cluster);
}
}

Expand All @@ -341,12 +351,104 @@ bool MixedCluster::InUse() const {
}

///////////////////////// PrimaryCluster /////////////////////////
void PrimaryCluster::Initialize(const GcsInitData &gcs_init_data) {
const auto &nodes = gcs_init_data.Nodes();
absl::flat_hash_map<std::string, std::string> dead_node_instances;
absl::flat_hash_map<VirtualClusterID, const rpc::VirtualClusterTableData *>
logical_clusters_data;
absl::flat_hash_map<VirtualClusterID, const rpc::VirtualClusterTableData *>
job_clusters_data;
for (const auto &[virtual_cluster_id, virtual_cluster_data] :
gcs_init_data.VirtualClusters()) {
if (virtual_cluster_id.IsJobClusterID()) {
job_clusters_data[virtual_cluster_id] = &virtual_cluster_data;
} else {
logical_clusters_data[virtual_cluster_id] = &virtual_cluster_data;
}
for (auto &[id, node_instance] : virtual_cluster_data.node_instances()) {
auto node_id = NodeID::FromHex(id);
if (!nodes.contains(node_id) ||
nodes.at(node_id).state() == rpc::GcsNodeInfo::DEAD) {
dead_node_instances.emplace(id, node_instance.template_id());
}
}
}

for (const auto &[_, node] : nodes) {
if (node.state() == rpc::GcsNodeInfo::ALIVE) {
OnNodeAdd(node);
}
}

for (const auto &[_, virtual_cluster_data] : logical_clusters_data) {
LoadLogicalCluster(*virtual_cluster_data);
}

for (auto &[job_cluster_id, virtual_cluster_data] : job_clusters_data) {
auto parent_cluster_id = job_cluster_id.ParentID().Binary();
if (parent_cluster_id == kPrimaryClusterID) {
LoadJobCluster(*virtual_cluster_data);
} else {
auto logical_cluster = std::dynamic_pointer_cast<ExclusiveCluster>(
GetLogicalCluster(parent_cluster_id));
RAY_CHECK(logical_cluster != nullptr &&
logical_cluster->GetMode() == rpc::AllocationMode::EXCLUSIVE);
logical_cluster->LoadJobCluster(*virtual_cluster_data);
}
}

for (const auto &[id, node_type_name] : dead_node_instances) {
OnNodeInstanceDead(id, node_type_name);
}
}

std::shared_ptr<VirtualCluster> PrimaryCluster::GetLogicalCluster(
const std::string &logical_cluster_id) const {
auto iter = logical_clusters_.find(logical_cluster_id);
return iter != logical_clusters_.end() ? iter->second : nullptr;
}

void PrimaryCluster::ForeachVirtualCluster(
const std::function<void(const std::shared_ptr<VirtualCluster> &)> &fn) const {
if (fn == nullptr) {
return;
}
for (const auto &[_, logical_cluster] : logical_clusters_) {
fn(logical_cluster);
if (logical_cluster->GetMode() == rpc::AllocationMode::EXCLUSIVE) {
auto exclusive_cluster =
std::dynamic_pointer_cast<ExclusiveCluster>(logical_cluster);
exclusive_cluster->ForeachJobCluster(fn);
}
}
ForeachJobCluster(fn);
}

std::shared_ptr<VirtualCluster> PrimaryCluster::LoadLogicalCluster(
const rpc::VirtualClusterTableData &data) {
const auto &logical_cluster_id = data.id();
std::shared_ptr<VirtualCluster> logical_cluster;
if (data.mode() == rpc::AllocationMode::EXCLUSIVE) {
logical_cluster =
std::make_shared<ExclusiveCluster>(logical_cluster_id, async_data_flusher_);
} else {
logical_cluster = std::make_shared<MixedCluster>(logical_cluster_id);
}
RAY_CHECK(logical_clusters_.emplace(logical_cluster_id, logical_cluster).second);

auto replica_instances_to_add_to_logical_cluster =
toReplicaInstances(data.node_instances());
auto replica_instances_to_remove_from_primary_cluster =
replica_instances_to_add_to_logical_cluster;
UpdateNodeInstances(ReplicaInstances(),
std::move(replica_instances_to_remove_from_primary_cluster));

// Update the virtual cluster replica sets and node instances.
logical_cluster->UpdateNodeInstances(
std::move(replica_instances_to_add_to_logical_cluster), ReplicaInstances());
return logical_cluster;
}

Status PrimaryCluster::CreateOrUpdateVirtualCluster(
rpc::CreateOrUpdateVirtualClusterRequest request,
CreateOrUpdateVirtualClusterCallback callback) {
Expand Down Expand Up @@ -438,16 +540,20 @@ void PrimaryCluster::OnNodeAdd(const rpc::GcsNodeInfo &node) {
}

void PrimaryCluster::OnNodeDead(const rpc::GcsNodeInfo &node) {
const auto &template_id = node.node_type_name();
const auto &node_type_name = node.node_type_name();
auto node_instance_id = NodeID::FromBinary(node.node_id()).Hex();
OnNodeInstanceDead(node_instance_id, node_type_name);
}

void PrimaryCluster::OnNodeInstanceDead(const std::string &node_instance_id,
const std::string &node_type_name) {
// TODO(Shanly): Build an index from node instance id to cluster id.
if (MarkNodeInstanceAsDead(template_id, node_instance_id)) {
if (MarkNodeInstanceAsDead(node_type_name, node_instance_id)) {
return;
}

for (const auto &[_, logical_cluster] : logical_clusters_) {
if (logical_cluster->MarkNodeInstanceAsDead(template_id, node_instance_id)) {
if (logical_cluster->MarkNodeInstanceAsDead(node_type_name, node_instance_id)) {
return;
}
}
Expand Down Expand Up @@ -525,9 +631,7 @@ void PrimaryCluster::GetVirtualClustersData(rpc::GetVirtualClustersRequest reque
if (include_job_clusters && cluster->GetMode() == rpc::AllocationMode::EXCLUSIVE) {
auto exclusive_cluster = dynamic_cast<const ExclusiveCluster *>(cluster);
exclusive_cluster->ForeachJobCluster(
[&](const std::string &_, const auto &job_cluster) {
callback(job_cluster->ToProto());
});
[&](const auto &job_cluster) { callback(job_cluster->ToProto()); });
}
if (only_include_mixed_cluster &&
cluster->GetMode() == rpc::AllocationMode::EXCLUSIVE) {
Expand Down
Loading

0 comments on commit 7aa0c3a

Please sign in to comment.