Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[18/N][VirtualCluster] Handle job finished event in gcs virtual cluster manager #436

Merged
merged 2 commits into from
Dec 26, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -924,8 +924,13 @@ void CoreWorker::ConnectToRayletInternal() {
// NOTE: This also marks the worker as available in Raylet. We do this at the
// very end in case there is a problem during construction.
if (options_.worker_type == WorkerType::DRIVER) {
// Get virtual cluster id from worker env to put it into job table data
std::string virtual_cluster_id = std::getenv(kEnvVarKeyVirtualClusterID)
? std::getenv(kEnvVarKeyVirtualClusterID)
: "";

Status status = local_raylet_client_->AnnounceWorkerPortForDriver(
core_worker_server_->GetPort(), options_.entrypoint);
core_worker_server_->GetPort(), options_.entrypoint, virtual_cluster_id);
RAY_CHECK(status.ok()) << "Failed to announce driver's port to raylet and GCS: "
<< status;
} else {
Expand Down
1 change: 1 addition & 0 deletions src/ray/gcs/gcs_server/gcs_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,7 @@ void GcsServer::InstallEventListeners() {
const auto job_id = JobID::FromBinary(job_data.job_id());
gcs_task_manager_->OnJobFinished(job_id, job_data.end_time());
gcs_placement_group_manager_->CleanPlacementGroupIfNeededWhenJobDead(job_id);
gcs_virtual_cluster_manager_->OnJobFinished(job_data);
});

// Install scheduling event listeners.
Expand Down
54 changes: 52 additions & 2 deletions src/ray/gcs/gcs_server/gcs_virtual_cluster_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,57 @@ void GcsVirtualClusterManager::OnNodeDead(const rpc::GcsNodeInfo &node) {
primary_cluster_->OnNodeDead(node);
}

/// Handle the job finished event.
///
/// If the finished job ran inside a job cluster, ask the parent exclusive
/// cluster to remove that job cluster. Jobs that carry no virtual cluster id,
/// or whose virtual cluster id is not a job cluster id, are ignored.
/// Failures are logged and never propagated to the caller.
///
/// \param job_data The job that is finished.
void GcsVirtualClusterManager::OnJobFinished(const rpc::JobTableData &job_data) {
  // Exit early when the job has no virtual cluster id.
  const auto &virtual_cluster_id = job_data.virtual_cluster_id();
  if (virtual_cluster_id.empty()) {
    return;
  }

  auto job_cluster_id = VirtualClusterID::FromBinary(virtual_cluster_id);
  if (!job_cluster_id.IsJobClusterID()) {
    // Exit early when this job is submitted in a mixed cluster.
    return;
  }

  // The job cluster is owned by its parent cluster, so look the parent up.
  const std::string exclusive_cluster_id = job_cluster_id.ParentID().Binary();
  auto virtual_cluster = GetVirtualCluster(exclusive_cluster_id);
  if (virtual_cluster == nullptr) {
    RAY_LOG(ERROR) << "Remove job cluster on job finished failed for job cluster "
                   << virtual_cluster_id << ", parent virtual cluster not exists";
    return;
  }

  if (virtual_cluster->GetMode() != rpc::AllocationMode::EXCLUSIVE) {
    // This should not happen: the parent is expected to be exclusive. Log it
    // instead of returning silently so the inconsistency is visible.
    RAY_LOG(ERROR) << "Remove job cluster on job finished failed for job cluster "
                   << virtual_cluster_id << ", parent virtual cluster is not exclusive";
    return;
  }

  // dynamic_cast yields nullptr on a type mismatch; never dereference it blindly.
  auto *exclusive_cluster = dynamic_cast<ExclusiveCluster *>(virtual_cluster.get());
  if (exclusive_cluster == nullptr) {
    RAY_LOG(ERROR) << "Remove job cluster on job finished failed for job cluster "
                   << virtual_cluster_id
                   << ", parent virtual cluster is not an ExclusiveCluster";
    return;
  }

  // The id is captured by value because the callback may run after this
  // function (and `job_data`) has gone out of scope. `this` is not needed.
  auto status = exclusive_cluster->RemoveJobCluster(
      virtual_cluster_id,
      [virtual_cluster_id](const Status &status,
                           std::shared_ptr<rpc::VirtualClusterTableData> data) {
        if (!status.ok() || data == nullptr || !data->is_removed()) {
          RAY_LOG(WARNING) << "Remove job cluster on job finished failed for job cluster "
                           << virtual_cluster_id
                           << ", error message: " << status.message();
        } else {
          RAY_LOG(INFO)
              << "Remove job cluster on job finished successfully for job cluster "
              << virtual_cluster_id;
        }
      });
  if (!status.ok()) {
    RAY_LOG(WARNING) << "Remove job cluster on job finished failed for job cluster "
                     << job_cluster_id << ", error message: " << status.message();
  }
}

std::shared_ptr<VirtualCluster> GcsVirtualClusterManager::GetVirtualCluster(
const std::string &virtual_cluster_id) {
if (virtual_cluster_id.empty()) {
Expand Down Expand Up @@ -150,8 +201,7 @@ void GcsVirtualClusterManager::HandleCreateJobCluster(
ReplicaSets replica_sets(request.replica_sets().begin(), request.replica_sets().end());

auto exclusive_cluster = dynamic_cast<ExclusiveCluster *>(virtual_cluster.get());
const std::string &job_cluster_id =
exclusive_cluster->BuildJobClusterID(request.job_id());
std::string job_cluster_id = exclusive_cluster->BuildJobClusterID(request.job_id());

exclusive_cluster->CreateJobCluster(
job_cluster_id,
Expand Down
5 changes: 5 additions & 0 deletions src/ray/gcs/gcs_server/gcs_virtual_cluster_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ class GcsVirtualClusterManager : public rpc::VirtualClusterInfoHandler {
/// \param node The node that is dead.
void OnNodeDead(const rpc::GcsNodeInfo &node);

/// Handle the job finished event.
///
/// Jobs with an empty virtual cluster id, or whose virtual cluster id is not a
/// job cluster id, are ignored. Otherwise the parent cluster of the job
/// cluster is looked up and, if it is an exclusive cluster, asked to remove
/// the job cluster. Failures are only logged; nothing is returned to the
/// caller.
///
/// \param job_data The job that is finished.
void OnJobFinished(const rpc::JobTableData &job_data);

/// Get virtual cluster by virtual cluster id
///
/// \param virtual_cluster_id The id of virtual cluster
Expand Down
2 changes: 2 additions & 0 deletions src/ray/protobuf/gcs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,8 @@ message JobTableData {
optional bool is_running_tasks = 11;
// Address of the driver that started this job.
Address driver_address = 12;
// The virtual cluster this job belongs to.
string virtual_cluster_id = 13;
}
///////////////////////////////////////////////////////////////////////////////

Expand Down
2 changes: 2 additions & 0 deletions src/ray/raylet/format/node_manager.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ table AnnounceWorkerPort {
port: int;
// The entrypoint of the job. Only populated if the worker is a driver.
entrypoint: string;
// The virtual cluster this job belongs to.
virtual_cluster_id: string;
}

table AnnounceWorkerPortReply {
Expand Down
3 changes: 3 additions & 0 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1451,6 +1451,9 @@ void NodeManager::ProcessAnnounceWorkerPortMessage(
string_from_flatbuf(*message->entrypoint()),
*job_config);

job_data_ptr->set_virtual_cluster_id(
string_from_flatbuf(*message->virtual_cluster_id()));

RAY_CHECK_OK(
gcs_client_->Jobs().AsyncAdd(job_data_ptr, [this, client](Status status) {
if (!status.ok()) {
Expand Down
8 changes: 4 additions & 4 deletions src/ray/raylet_client/raylet_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,11 @@ Status raylet::RayletClient::AnnounceWorkerPortForWorker(int port) {
return conn_->WriteMessage(MessageType::AnnounceWorkerPort, &fbb);
}

Status raylet::RayletClient::AnnounceWorkerPortForDriver(int port,
const std::string &entrypoint) {
Status raylet::RayletClient::AnnounceWorkerPortForDriver(
int port, const std::string &entrypoint, const std::string &virtual_cluster_id) {
flatbuffers::FlatBufferBuilder fbb;
auto message =
protocol::CreateAnnounceWorkerPort(fbb, port, fbb.CreateString(entrypoint));
auto message = protocol::CreateAnnounceWorkerPort(
fbb, port, fbb.CreateString(entrypoint), fbb.CreateString(virtual_cluster_id));
fbb.Finish(message);
std::vector<uint8_t> reply;
RAY_RETURN_NOT_OK(conn_->AtomicRequestReply(MessageType::AnnounceWorkerPort,
Expand Down
4 changes: 3 additions & 1 deletion src/ray/raylet_client/raylet_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,9 @@ class RayletClient : public RayletClientInterface {
/// \param port The port.
/// \param entrypoint The entrypoint of the driver's job.
/// \return ray::Status.
Status AnnounceWorkerPortForDriver(int port, const std::string &entrypoint);
Status AnnounceWorkerPortForDriver(int port,
const std::string &entrypoint,
const std::string &virtual_cluster_id);

/// Tell the raylet that the client has finished executing a task.
///
Expand Down