Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[18/N][VirtualCluster] Handle job finished event in gcs virtual cluster manager #436

Merged
merged 2 commits into from
Dec 26, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
@@ -924,8 +924,13 @@ void CoreWorker::ConnectToRayletInternal() {
// NOTE: This also marks the worker as available in Raylet. We do this at the
// very end in case there is a problem during construction.
if (options_.worker_type == WorkerType::DRIVER) {
// Get virtual cluster id from worker env to put it into job table data
std::string virtual_cluster_id = std::getenv(kEnvVarKeyVirtualClusterID)
? std::getenv(kEnvVarKeyVirtualClusterID)
: "";

Status status = local_raylet_client_->AnnounceWorkerPortForDriver(
core_worker_server_->GetPort(), options_.entrypoint);
core_worker_server_->GetPort(), options_.entrypoint, virtual_cluster_id);
RAY_CHECK(status.ok()) << "Failed to announce driver's port to raylet and GCS: "
<< status;
} else {
1 change: 1 addition & 0 deletions src/ray/gcs/gcs_server/gcs_server.cc
Original file line number Diff line number Diff line change
@@ -774,6 +774,7 @@ void GcsServer::InstallEventListeners() {
const auto job_id = JobID::FromBinary(job_data.job_id());
gcs_task_manager_->OnJobFinished(job_id, job_data.end_time());
gcs_placement_group_manager_->CleanPlacementGroupIfNeededWhenJobDead(job_id);
gcs_virtual_cluster_manager_->OnJobFinished(job_data);
});

// Install scheduling event listeners.
53 changes: 51 additions & 2 deletions src/ray/gcs/gcs_server/gcs_virtual_cluster_manager.cc
Original file line number Diff line number Diff line change
@@ -31,6 +31,56 @@ void GcsVirtualClusterManager::OnNodeDead(const rpc::GcsNodeInfo &node) {
primary_cluster_->OnNodeDead(node);
}

/// Handle the job finished event.
///
/// If the finished job carries a virtual cluster id that is a job cluster id,
/// look up the parent cluster of that id and ask it to remove the job cluster.
/// Jobs without a virtual cluster id, or whose id is not a job cluster id, are
/// ignored. Every failure path is logged as a warning and never aborts.
///
/// \param job_data The job that is finished.
void GcsVirtualClusterManager::OnJobFinished(const rpc::JobTableData &job_data) {
  // Exit early when the job has no virtual cluster id.
  const auto &virtual_cluster_id = job_data.virtual_cluster_id();
  if (virtual_cluster_id.empty()) {
    return;
  }

  const auto job_cluster_id = VirtualClusterID::FromBinary(virtual_cluster_id);
  if (!job_cluster_id.IsJobClusterID()) {
    // Exit early when this job is submitted in a mixed cluster.
    return;
  }

  // The job cluster is removed through its parent cluster.
  const std::string exclusive_cluster_id = job_cluster_id.ParentID().Binary();
  auto virtual_cluster = GetVirtualCluster(exclusive_cluster_id);
  if (virtual_cluster == nullptr) {
    RAY_LOG(WARNING) << "Failed to remove job cluster " << job_cluster_id.Binary()
                     << " when handling job finished event, parent cluster not exists.";
    return;
  }

  // The parent of a job cluster is expected to be exclusive. This is not
  // expected to happen, but if it does, leave a trace instead of returning
  // silently so that a leaked job cluster can be diagnosed.
  if (virtual_cluster->GetMode() != rpc::AllocationMode::EXCLUSIVE) {
    RAY_LOG(WARNING) << "Failed to remove job cluster " << job_cluster_id.Binary()
                     << " when handling job finished event, parent cluster is not "
                        "exclusive.";
    return;
  }

  // dynamic_cast on a pointer yields nullptr on a type mismatch; guard it
  // rather than dereferencing an unchecked result.
  auto *exclusive_cluster = dynamic_cast<ExclusiveCluster *>(virtual_cluster.get());
  if (exclusive_cluster == nullptr) {
    RAY_LOG(WARNING) << "Failed to remove job cluster " << job_cluster_id.Binary()
                     << " when handling job finished event, parent cluster is not an "
                        "ExclusiveCluster.";
    return;
  }

  // The callback only needs the id for logging, so `this` is not captured.
  auto status = exclusive_cluster->RemoveJobCluster(
      virtual_cluster_id,
      [job_cluster_id](const Status &status,
                       std::shared_ptr<rpc::VirtualClusterTableData> data) {
        // `data` is checked before use so a null payload is reported as a
        // failure instead of being dereferenced.
        if (!status.ok() || data == nullptr || !data->is_removed()) {
          RAY_LOG(WARNING) << "Failed to remove job cluster " << job_cluster_id.Binary()
                           << " when handling job finished event. status: "
                           << status.message();
        } else {
          RAY_LOG(INFO) << "Successfully removed job cluster " << job_cluster_id.Binary()
                        << " after handling job finished event.";
        }
      });
  if (!status.ok()) {
    RAY_LOG(WARNING) << "Failed to remove job cluster " << job_cluster_id.Binary()
                     << " when handling job finished event. status: " << status.message();
  }
}

std::shared_ptr<VirtualCluster> GcsVirtualClusterManager::GetVirtualCluster(
const std::string &virtual_cluster_id) {
if (virtual_cluster_id.empty()) {
@@ -150,8 +200,7 @@ void GcsVirtualClusterManager::HandleCreateJobCluster(
ReplicaSets replica_sets(request.replica_sets().begin(), request.replica_sets().end());

auto exclusive_cluster = dynamic_cast<ExclusiveCluster *>(virtual_cluster.get());
const std::string &job_cluster_id =
exclusive_cluster->BuildJobClusterID(request.job_id());
std::string job_cluster_id = exclusive_cluster->BuildJobClusterID(request.job_id());

exclusive_cluster->CreateJobCluster(
job_cluster_id,
5 changes: 5 additions & 0 deletions src/ray/gcs/gcs_server/gcs_virtual_cluster_manager.h
Original file line number Diff line number Diff line change
@@ -51,6 +51,11 @@ class GcsVirtualClusterManager : public rpc::VirtualClusterInfoHandler {
/// \param node The node that is dead.
void OnNodeDead(const rpc::GcsNodeInfo &node);

/// Handle the job finished event.
///
/// \param job_data The job that is finished.
void OnJobFinished(const rpc::JobTableData &job_data);

/// Get virtual cluster by virtual cluster id
///
/// \param virtual_cluster_id The id of virtual cluster
2 changes: 2 additions & 0 deletions src/ray/protobuf/gcs.proto
Original file line number Diff line number Diff line change
@@ -706,6 +706,8 @@ message JobTableData {
optional bool is_running_tasks = 11;
// Address of the driver that started this job.
Address driver_address = 12;
// The virtual cluster this job belongs to.
string virtual_cluster_id = 13;
}
///////////////////////////////////////////////////////////////////////////////

2 changes: 2 additions & 0 deletions src/ray/raylet/format/node_manager.fbs
Original file line number Diff line number Diff line change
@@ -149,6 +149,8 @@ table AnnounceWorkerPort {
port: int;
// The entrypoint of the job. Only populated if the worker is a driver.
entrypoint: string;
// The virtual cluster this job belongs to.
virtual_cluster_id: string;
}

table AnnounceWorkerPortReply {
3 changes: 3 additions & 0 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
@@ -1451,6 +1451,9 @@ void NodeManager::ProcessAnnounceWorkerPortMessage(
string_from_flatbuf(*message->entrypoint()),
*job_config);

job_data_ptr->set_virtual_cluster_id(
string_from_flatbuf(*message->virtual_cluster_id()));

RAY_CHECK_OK(
gcs_client_->Jobs().AsyncAdd(job_data_ptr, [this, client](Status status) {
if (!status.ok()) {
8 changes: 4 additions & 4 deletions src/ray/raylet_client/raylet_client.cc
Original file line number Diff line number Diff line change
@@ -205,11 +205,11 @@ Status raylet::RayletClient::AnnounceWorkerPortForWorker(int port) {
return conn_->WriteMessage(MessageType::AnnounceWorkerPort, &fbb);
}

Status raylet::RayletClient::AnnounceWorkerPortForDriver(int port,
const std::string &entrypoint) {
Status raylet::RayletClient::AnnounceWorkerPortForDriver(
int port, const std::string &entrypoint, const std::string &virtual_cluster_id) {
flatbuffers::FlatBufferBuilder fbb;
auto message =
protocol::CreateAnnounceWorkerPort(fbb, port, fbb.CreateString(entrypoint));
auto message = protocol::CreateAnnounceWorkerPort(
fbb, port, fbb.CreateString(entrypoint), fbb.CreateString(virtual_cluster_id));
fbb.Finish(message);
std::vector<uint8_t> reply;
RAY_RETURN_NOT_OK(conn_->AtomicRequestReply(MessageType::AnnounceWorkerPort,
4 changes: 3 additions & 1 deletion src/ray/raylet_client/raylet_client.h
Original file line number Diff line number Diff line change
@@ -358,7 +358,9 @@ class RayletClient : public RayletClientInterface {
/// \param port The port.
/// \param entrypoint The entrypoint of the driver's job.
/// \return ray::Status.
Status AnnounceWorkerPortForDriver(int port, const std::string &entrypoint);
Status AnnounceWorkerPortForDriver(int port,
const std::string &entrypoint,
const std::string &virtual_cluster_id);

/// Tell the raylet that the client has finished executing a task.
///