Skip to content

Commit

Permalink
[18/N][VirtualCluster] Handle job finished event in gcs virtual clust…
Browse files Browse the repository at this point in the history
…er manager

Signed-off-by: sule <[email protected]>
  • Loading branch information
xsuler committed Dec 24, 2024
1 parent f924c52 commit 41ec5cd
Show file tree
Hide file tree
Showing 9 changed files with 78 additions and 8 deletions.
7 changes: 6 additions & 1 deletion src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -924,8 +924,13 @@ void CoreWorker::ConnectToRayletInternal() {
// NOTE: This also marks the worker as available in Raylet. We do this at the
// very end in case there is a problem during construction.
if (options_.worker_type == WorkerType::DRIVER) {
// Get virtual cluster id from worker env to put it into job table data
std::string virtual_cluster_id = std::getenv(kEnvVarKeyVirtualClusterID)
? std::getenv(kEnvVarKeyVirtualClusterID)
: "";

Status status = local_raylet_client_->AnnounceWorkerPortForDriver(
core_worker_server_->GetPort(), options_.entrypoint);
core_worker_server_->GetPort(), options_.entrypoint, virtual_cluster_id);
RAY_CHECK(status.ok()) << "Failed to announce driver's port to raylet and GCS: "
<< status;
} else {
Expand Down
1 change: 1 addition & 0 deletions src/ray/gcs/gcs_server/gcs_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,7 @@ void GcsServer::InstallEventListeners() {
const auto job_id = JobID::FromBinary(job_data.job_id());
gcs_task_manager_->OnJobFinished(job_id, job_data.end_time());
gcs_placement_group_manager_->CleanPlacementGroupIfNeededWhenJobDead(job_id);
gcs_virtual_cluster_manager_->OnJobFinished(job_data);
});

// Install scheduling event listeners.
Expand Down
54 changes: 52 additions & 2 deletions src/ray/gcs/gcs_server/gcs_virtual_cluster_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,57 @@ void GcsVirtualClusterManager::OnNodeDead(const rpc::GcsNodeInfo &node) {
primary_cluster_->OnNodeDead(node);
}

// Handle the job finished event.
//
// When the finished job carries a virtual cluster id that identifies a job
// cluster, ask the parent (exclusive) virtual cluster to remove that job
// cluster. Jobs without a virtual cluster id, or whose id is not a job cluster
// id, are ignored. Failures are logged; nothing is returned to the caller.
//
// \param job_data The job table data of the finished job.
void GcsVirtualClusterManager::OnJobFinished(const rpc::JobTableData &job_data) {
  // Exit early when the job has no virtual cluster id.
  const auto &virtual_cluster_id = job_data.virtual_cluster_id();
  if (virtual_cluster_id.empty()) {
    return;
  }

  auto job_cluster_id = VirtualClusterID::FromBinary(virtual_cluster_id);
  if (!job_cluster_id.IsJobClusterID()) {
    // Exit early when this job is submitted in a mixed cluster.
    return;
  }

  // The job cluster is owned by its parent cluster, so look the parent up.
  const std::string exclusive_cluster_id = job_cluster_id.ParentID().Binary();
  auto virtual_cluster = GetVirtualCluster(exclusive_cluster_id);
  if (virtual_cluster == nullptr) {
    RAY_LOG(ERROR) << "Remove job cluster on job finished failed for job cluster "
                   << virtual_cluster_id << ", parent virtual cluster not exists";
    return;
  }

  if (virtual_cluster->GetMode() != rpc::AllocationMode::EXCLUSIVE) {
    // This should not happen: the parent of a job cluster should be exclusive.
    // Log it instead of returning silently so the inconsistency is visible.
    RAY_LOG(ERROR) << "Remove job cluster on job finished failed for job cluster "
                   << virtual_cluster_id
                   << ", parent virtual cluster is not in exclusive mode";
    return;
  }

  // dynamic_cast yields nullptr when the object is not an ExclusiveCluster;
  // check it before dereferencing rather than trusting the mode alone.
  auto *exclusive_cluster = dynamic_cast<ExclusiveCluster *>(virtual_cluster.get());
  if (exclusive_cluster == nullptr) {
    RAY_LOG(ERROR) << "Remove job cluster on job finished failed for job cluster "
                   << virtual_cluster_id
                   << ", parent virtual cluster is not an exclusive cluster";
    return;
  }

  // The callback only needs the id (captured by copy because `job_data` may not
  // outlive the callback); it does not touch `this`.
  const auto remove_status = exclusive_cluster->RemoveJobCluster(
      virtual_cluster_id,
      [virtual_cluster_id](const Status &status,
                           std::shared_ptr<rpc::VirtualClusterTableData> data) {
        if (!status.ok() || data == nullptr || !data->is_removed()) {
          RAY_LOG(WARNING) << "Remove job cluster on job finished failed for job cluster "
                           << virtual_cluster_id
                           << ", error message: " << status.message();
        } else {
          RAY_LOG(INFO)
              << "Remove job cluster on job finished successfully for job cluster "
              << virtual_cluster_id;
        }
      });
  if (!remove_status.ok()) {
    RAY_LOG(WARNING) << "Remove job cluster on job finished failed for job cluster "
                     << job_cluster_id << ", error message: " << remove_status.message();
  }
}

std::shared_ptr<VirtualCluster> GcsVirtualClusterManager::GetVirtualCluster(
const std::string &virtual_cluster_id) {
if (virtual_cluster_id.empty()) {
Expand Down Expand Up @@ -150,8 +201,7 @@ void GcsVirtualClusterManager::HandleCreateJobCluster(
ReplicaSets replica_sets(request.replica_sets().begin(), request.replica_sets().end());

auto exclusive_cluster = dynamic_cast<ExclusiveCluster *>(virtual_cluster.get());
const std::string &job_cluster_id =
exclusive_cluster->BuildJobClusterID(request.job_id());
std::string job_cluster_id = exclusive_cluster->BuildJobClusterID(request.job_id());

exclusive_cluster->CreateJobCluster(
job_cluster_id,
Expand Down
5 changes: 5 additions & 0 deletions src/ray/gcs/gcs_server/gcs_virtual_cluster_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ class GcsVirtualClusterManager : public rpc::VirtualClusterInfoHandler {
/// \param node The node that is dead.
void OnNodeDead(const rpc::GcsNodeInfo &node);

/// Handle the job finished event.
///
/// Does nothing unless the job's virtual cluster id is non-empty and is a job
/// cluster id. In that case the job cluster is removed from its parent
/// exclusive cluster; failures are logged rather than returned.
///
/// \param job_data The job that is finished.
void OnJobFinished(const rpc::JobTableData &job_data);

/// Get virtual cluster by virtual cluster id
///
/// \param virtual_cluster_id The id of virtual cluster
Expand Down
2 changes: 2 additions & 0 deletions src/ray/protobuf/gcs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,8 @@ message JobTableData {
optional bool is_running_tasks = 11;
// Address of the driver that started this job.
Address driver_address = 12;
// The virtual cluster this job belongs to.
string virtual_cluster_id = 13;
}
///////////////////////////////////////////////////////////////////////////////

Expand Down
2 changes: 2 additions & 0 deletions src/ray/raylet/format/node_manager.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ table AnnounceWorkerPort {
port: int;
// The entrypoint of the job. Only populated if the worker is a driver.
entrypoint: string;
// The virtual cluster this job belongs to.
virtual_cluster_id: string;
}

table AnnounceWorkerPortReply {
Expand Down
3 changes: 3 additions & 0 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1451,6 +1451,9 @@ void NodeManager::ProcessAnnounceWorkerPortMessage(
string_from_flatbuf(*message->entrypoint()),
*job_config);

job_data_ptr->set_virtual_cluster_id(
string_from_flatbuf(*message->virtual_cluster_id()));

RAY_CHECK_OK(
gcs_client_->Jobs().AsyncAdd(job_data_ptr, [this, client](Status status) {
if (!status.ok()) {
Expand Down
8 changes: 4 additions & 4 deletions src/ray/raylet_client/raylet_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,11 @@ Status raylet::RayletClient::AnnounceWorkerPortForWorker(int port) {
return conn_->WriteMessage(MessageType::AnnounceWorkerPort, &fbb);
}

Status raylet::RayletClient::AnnounceWorkerPortForDriver(int port,
const std::string &entrypoint) {
Status raylet::RayletClient::AnnounceWorkerPortForDriver(
int port, const std::string &entrypoint, const std::string &virtual_cluster_id) {
flatbuffers::FlatBufferBuilder fbb;
auto message =
protocol::CreateAnnounceWorkerPort(fbb, port, fbb.CreateString(entrypoint));
auto message = protocol::CreateAnnounceWorkerPort(
fbb, port, fbb.CreateString(entrypoint), fbb.CreateString(virtual_cluster_id));
fbb.Finish(message);
std::vector<uint8_t> reply;
RAY_RETURN_NOT_OK(conn_->AtomicRequestReply(MessageType::AnnounceWorkerPort,
Expand Down
4 changes: 3 additions & 1 deletion src/ray/raylet_client/raylet_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,9 @@ class RayletClient : public RayletClientInterface {
/// \param port The port.
/// \param entrypoint The entrypoint of the driver's job.
/// \return ray::Status.
Status AnnounceWorkerPortForDriver(int port, const std::string &entrypoint);
Status AnnounceWorkerPortForDriver(int port,
const std::string &entrypoint,
const std::string &virtual_cluster_id);

/// Tell the raylet that the client has finished executing a task.
///
Expand Down

0 comments on commit 41ec5cd

Please sign in to comment.