Skip to content

Commit

Permalink
[1/N] add virtual cluster grpc service and handler (#410)
Browse files Browse the repository at this point in the history
Co-authored-by: 黑驰 <[email protected]>
  • Loading branch information
wumuzi520 and wumuzi520 authored Dec 10, 2024
1 parent d6128f5 commit e5fc758
Show file tree
Hide file tree
Showing 8 changed files with 275 additions and 0 deletions.
32 changes: 32 additions & 0 deletions src/ray/gcs/gcs_server/gcs_server.ant.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "ray/gcs/gcs_server/gcs_server.h"

#include "ray/gcs/gcs_server/gcs_virtual_cluster_manager.h"

namespace ray {
namespace gcs {
void GcsServer::InitGcsVirtualClusterManager(const GcsInitData &gcs_init_data) {
RAY_CHECK(gcs_table_storage_ && gcs_publisher_);
gcs_virtual_cluster_manager_ = std::make_shared<gcs::GcsVirtualClusterManager>();
// Initialize by gcs tables data.
gcs_virtual_cluster_manager_->Initialize(gcs_init_data);
// Register service.
gcs_virtual_cluster_service_.reset(new rpc::VirtualClusterInfoGrpcService(
io_context_provider_.GetDefaultIOContext(), *gcs_virtual_cluster_manager_));
rpc_server_.RegisterService(*gcs_virtual_cluster_service_);
}
} // namespace gcs
} // namespace ray
3 changes: 3 additions & 0 deletions src/ray/gcs/gcs_server/gcs_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ void GcsServer::DoStart(const GcsInitData &gcs_init_data) {
// Init cluster task manager.
InitClusterTaskManager();

// Init gcs virtual cluster manager.
InitGcsVirtualClusterManager(gcs_init_data);

// Init gcs resource manager.
InitGcsResourceManager(gcs_init_data);

Expand Down
7 changes: 7 additions & 0 deletions src/ray/gcs/gcs_server/gcs_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ struct GcsServerConfig {
};

class GcsNodeManager;
class GcsVirtualClusterManager;
class GcsActorManager;
class GcsJobManager;
class GcsWorkerManager;
Expand Down Expand Up @@ -155,6 +156,9 @@ class GcsServer {
/// Initialize gcs actor manager.
void InitGcsActorManager(const GcsInitData &gcs_init_data);

/// Initialize gcs virtual cluster manager.
void InitGcsVirtualClusterManager(const GcsInitData &gcs_init_data);

/// Initialize gcs placement group manager.
void InitGcsPlacementGroupManager(const GcsInitData &gcs_init_data);

Expand Down Expand Up @@ -245,6 +249,9 @@ class GcsServer {
std::unique_ptr<GcsNodeManager> gcs_node_manager_;
/// The health check manager.
std::unique_ptr<GcsHealthCheckManager> gcs_healthcheck_manager_;
/// The gcs virtual cluster handler and service.
std::shared_ptr<GcsVirtualClusterManager> gcs_virtual_cluster_manager_;
std::unique_ptr<rpc::VirtualClusterInfoGrpcService> gcs_virtual_cluster_service_;
/// The gcs redis failure detector.
std::unique_ptr<GcsRedisFailureDetector> gcs_redis_failure_detector_;
/// The gcs placement group manager.
Expand Down
51 changes: 51 additions & 0 deletions src/ray/gcs/gcs_server/gcs_virtual_cluster_manager.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "ray/gcs/gcs_server/gcs_virtual_cluster_manager.h"

namespace ray {
namespace gcs {

// Initialize the manager from the GCS tables data passed in `gcs_init_data`.
// Currently a stub: the body is empty, so `gcs_init_data` is not read yet.
void GcsVirtualClusterManager::Initialize(const GcsInitData &gcs_init_data) {
// TODO(Shanly): To be implemented.
}

// Handle a `CreateOrUpdateVirtualCluster` request.
//
// The create/update logic is not implemented yet. The request is logged and then
// answered with a `NotImplemented` status: without a reply the RPC would never
// complete and the caller would be left waiting.
//
// \param request The request; only its `virtual_cluster_id` is read (for logging).
// \param reply The reply whose `status` field is filled in.
// \param send_reply_callback The callback invoked to send `reply` to the caller.
void GcsVirtualClusterManager::HandleCreateOrUpdateVirtualCluster(
    rpc::CreateOrUpdateVirtualClusterRequest request,
    rpc::CreateOrUpdateVirtualClusterReply *reply,
    rpc::SendReplyCallback send_reply_callback) {
  const auto &virtual_cluster_id = request.virtual_cluster_id();
  RAY_LOG(INFO) << "Start creating or updating virtual cluster " << virtual_cluster_id;
  // TODO(Shanly): To be implemented.
  // Kept in a named local because GCS_RPC_SEND_REPLY expands its status argument
  // more than once.
  const Status status =
      Status::NotImplemented("CreateOrUpdateVirtualCluster is not implemented yet.");
  GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
}

// Handle a `RemoveVirtualCluster` request.
//
// The removal logic is not implemented yet. The request is logged and then
// answered with a `NotImplemented` status: without a reply the RPC would never
// complete and the caller would be left waiting.
//
// \param request The request; only its `virtual_cluster_id` is read (for logging).
// \param reply The reply whose `status` field is filled in.
// \param send_reply_callback The callback invoked to send `reply` to the caller.
void GcsVirtualClusterManager::HandleRemoveVirtualCluster(
    rpc::RemoveVirtualClusterRequest request,
    rpc::RemoveVirtualClusterReply *reply,
    rpc::SendReplyCallback send_reply_callback) {
  const auto &virtual_cluster_id = request.virtual_cluster_id();
  RAY_LOG(INFO) << "Start removing virtual cluster " << virtual_cluster_id;
  // TODO(Shanly): To be implemented.
  // Kept in a named local because GCS_RPC_SEND_REPLY expands its status argument
  // more than once.
  const Status status =
      Status::NotImplemented("RemoveVirtualCluster is not implemented yet.");
  GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
}

// Handle a `GetAllVirtualClusters` request.
//
// The query logic is not implemented yet. The request is logged and then answered
// with a `NotImplemented` status and an empty cluster list: without a reply the
// RPC would never complete and the caller would be left waiting.
//
// \param request The (empty) request; it is not read.
// \param reply The reply whose `status` field is filled in.
// \param send_reply_callback The callback invoked to send `reply` to the caller.
void GcsVirtualClusterManager::HandleGetAllVirtualClusters(
    rpc::GetAllVirtualClustersRequest request,
    rpc::GetAllVirtualClustersReply *reply,
    rpc::SendReplyCallback send_reply_callback) {
  RAY_LOG(DEBUG) << "Getting all virtual clusters.";
  // TODO(Shanly): To be implemented.
  // Kept in a named local because GCS_RPC_SEND_REPLY expands its status argument
  // more than once.
  const Status status =
      Status::NotImplemented("GetAllVirtualClusters is not implemented yet.");
  GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
}

} // namespace gcs
} // namespace ray
48 changes: 48 additions & 0 deletions src/ray/gcs/gcs_server/gcs_virtual_cluster_manager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include "ray/gcs/gcs_server/gcs_init_data.h"
#include "ray/rpc/gcs_server/gcs_rpc_server.h"

namespace ray {
namespace gcs {

/// The implementation class of `rpc::VirtualClusterInfoHandler`, i.e. the handler
/// of the virtual cluster RPCs.
class GcsVirtualClusterManager : public rpc::VirtualClusterInfoHandler {
 public:
  /// Initialize synchronously with the gcs tables data.
  /// This should be called when the GCS server restarts after a failure.
  ///
  /// \param gcs_init_data The gcs tables data to initialize from.
  void Initialize(const GcsInitData &gcs_init_data);

 protected:
  /// Handle a `CreateOrUpdateVirtualCluster` RPC.
  ///
  /// \param request The request received from the caller.
  /// \param reply The reply to fill in.
  /// \param send_reply_callback The callback used to send the reply.
  void HandleCreateOrUpdateVirtualCluster(
      rpc::CreateOrUpdateVirtualClusterRequest request,
      rpc::CreateOrUpdateVirtualClusterReply *reply,
      rpc::SendReplyCallback send_reply_callback) override;

  /// Handle a `RemoveVirtualCluster` RPC.
  ///
  /// \param request The request received from the caller.
  /// \param reply The reply to fill in.
  /// \param send_reply_callback The callback used to send the reply.
  void HandleRemoveVirtualCluster(
      rpc::RemoveVirtualClusterRequest request,
      rpc::RemoveVirtualClusterReply *reply,
      rpc::SendReplyCallback send_reply_callback) override;

  /// Handle a `GetAllVirtualClusters` RPC.
  ///
  /// \param request The request received from the caller.
  /// \param reply The reply to fill in.
  /// \param send_reply_callback The callback used to send the reply.
  void HandleGetAllVirtualClusters(
      rpc::GetAllVirtualClustersRequest request,
      rpc::GetAllVirtualClustersReply *reply,
      rpc::SendReplyCallback send_reply_callback) override;
};

} // namespace gcs
} // namespace ray
29 changes: 29 additions & 0 deletions src/ray/protobuf/gcs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -705,3 +705,32 @@ message JobTableData {
Address driver_address = 12;
}
///////////////////////////////////////////////////////////////////////////////

// The mode of jobs executed in a virtual cluster, i.e. whether a single node may
// carry tasks of more than one job.
enum JobExecMode {
// A single node can carry tasks for multiple jobs.
Mixed = 0;
// A single node can only carry tasks for one job.
Exclusive = 1;
}

// A node instance. Used as the value type of the `node_instances` maps, which are
// keyed by node id.
message NodeInstance {
// The hostname of the node instance.
string hostname = 1;
// The template id of the node instance.
string template_id = 2;
// Whether the node instance is dead.
bool is_dead = 3;
}

// The data describing one virtual cluster.
message VirtualClusterTableData {
// The virtual cluster id.
string id = 1;
// The virtual cluster name.
string name = 2;
// The mode of jobs executed in this virtual cluster.
JobExecMode mode = 3;
// Mapping from node id to its instance.
map<string, NodeInstance> node_instances = 4;
// Version number of the last modification to the virtual cluster.
uint64 revision = 5;
}
55 changes: 55 additions & 0 deletions src/ray/protobuf/gcs_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -817,3 +817,58 @@ service TaskInfoGcsService {
}

///////////////////////////////////////////////////////////////////////////////

// A group of replica instances that share one template.
message ReplicaSet {
// The number of replica instances.
int32 replicas = 1;
// The template id for the replica instances.
string template_id = 2;
}

// Request of the `CreateOrUpdateVirtualCluster` RPC.
message CreateOrUpdateVirtualClusterRequest {
// The virtual cluster id.
string virtual_cluster_id = 1;
// The virtual cluster name.
string virtual_cluster_name = 2;
// The mode of jobs executed in this virtual cluster.
JobExecMode mode = 3;
// The replica set list of the virtual cluster.
repeated ReplicaSet replica_set_list = 4;
// Version number of the last modification to the virtual cluster.
// NOTE(review): presumably the revision last seen by the caller — confirm how the
// server is expected to compare it.
uint64 revision = 5;
}

// Reply of the `CreateOrUpdateVirtualCluster` RPC.
message CreateOrUpdateVirtualClusterReply {
// The status (code and message) of handling the request.
GcsStatus status = 1;
// Version number of the last modification to the virtual cluster.
uint64 revision = 2;
// Mapping from node id to its instance.
map<string, NodeInstance> node_instances = 3;
}

// Request of the `RemoveVirtualCluster` RPC.
message RemoveVirtualClusterRequest {
// ID of the virtual cluster to be removed.
string virtual_cluster_id = 1;
}

// Reply of the `RemoveVirtualCluster` RPC.
message RemoveVirtualClusterReply {
// The status (code and message) of handling the request.
GcsStatus status = 1;
}

// Request of the `GetAllVirtualClusters` RPC. It carries no fields.
message GetAllVirtualClustersRequest {
}

// Reply of the `GetAllVirtualClusters` RPC.
message GetAllVirtualClustersReply {
// The status (code and message) of handling the request.
GcsStatus status = 1;
// The data of the virtual clusters, one entry per virtual cluster.
repeated VirtualClusterTableData virtual_cluster_data_list = 2;
}

// Service for creating/updating, removing and listing virtual clusters.
service VirtualClusterInfoGcsService {
// Create or update a virtual cluster.
rpc CreateOrUpdateVirtualCluster(CreateOrUpdateVirtualClusterRequest)
returns (CreateOrUpdateVirtualClusterReply);
// Remove a virtual cluster.
rpc RemoveVirtualCluster(RemoveVirtualClusterRequest) returns (RemoveVirtualClusterReply);
// Get all the virtual clusters.
rpc GetAllVirtualClusters(GetAllVirtualClustersRequest) returns (GetAllVirtualClustersReply);
}
50 changes: 50 additions & 0 deletions src/ray/rpc/gcs_server/gcs_rpc_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ namespace rpc {
#define INTERNAL_PUBSUB_SERVICE_RPC_HANDLER(HANDLER) \
RPC_SERVICE_HANDLER(InternalPubSubGcsService, HANDLER, -1)

// Forwards HANDLER to RPC_SERVICE_HANDLER for the VirtualClusterInfoGcsService
// service; HANDLER is the RPC name, e.g. `CreateOrUpdateVirtualCluster`.
// NOTE(review): -1 is the same value INTERNAL_PUBSUB_SERVICE_RPC_HANDLER passes;
// presumably it means "no limit on concurrent calls" — confirm against
// RPC_SERVICE_HANDLER.
#define VIRTUAL_CLUSTER_SERVICE_RPC_HANDLER(HANDLER) \
RPC_SERVICE_HANDLER(VirtualClusterInfoGcsService, HANDLER, -1)

#define GCS_RPC_SEND_REPLY(send_reply_callback, reply, status) \
reply->mutable_status()->set_code((int)status.code()); \
reply->mutable_status()->set_message(status.message()); \
Expand Down Expand Up @@ -714,6 +717,52 @@ class InternalPubSubGrpcService : public GrpcService {
InternalPubSubGcsServiceHandler &service_handler_;
};

/// The handler interface of `VirtualClusterInfoGcsService`: one pure virtual
/// method per RPC of the service.
///
/// Every method receives the request, a pointer to the reply to fill in and the
/// callback used to send the reply.
class VirtualClusterInfoGcsServiceHandler {
 public:
  virtual ~VirtualClusterInfoGcsServiceHandler() = default;

  /// Handle a `CreateOrUpdateVirtualCluster` request.
  virtual void HandleCreateOrUpdateVirtualCluster(
      CreateOrUpdateVirtualClusterRequest request,
      CreateOrUpdateVirtualClusterReply *reply,
      SendReplyCallback send_reply_callback) = 0;

  /// Handle a `RemoveVirtualCluster` request.
  virtual void HandleRemoveVirtualCluster(
      RemoveVirtualClusterRequest request,
      RemoveVirtualClusterReply *reply,
      SendReplyCallback send_reply_callback) = 0;

  /// Handle a `GetAllVirtualClusters` request.
  virtual void HandleGetAllVirtualClusters(
      GetAllVirtualClustersRequest request,
      GetAllVirtualClustersReply *reply,
      SendReplyCallback send_reply_callback) = 0;
};

/// The `GrpcService` for `VirtualClusterInfoGcsService`.
class VirtualClusterInfoGrpcService : public GrpcService {
 public:
  /// Constructor.
  ///
  /// \param[in] io_service The io service passed on to the `GrpcService` base class.
  /// \param[in] handler The service handler that actually handles the requests.
  explicit VirtualClusterInfoGrpcService(instrumented_io_context &io_service,
                                         VirtualClusterInfoGcsServiceHandler &handler)
      : GrpcService(io_service), service_handler_(handler) {}

 protected:
  /// Return the grpc async service object owned by this class.
  grpc::Service &GetGrpcService() override { return service_; }

  /// Set up the three RPCs of the service (create-or-update, remove, get-all).
  ///
  /// The parameter and member names below are kept as they are because the
  /// macro expansion may refer to them.
  void InitServerCallFactories(
      const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
      std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories,
      const ClusterID &cluster_id) override {
    VIRTUAL_CLUSTER_SERVICE_RPC_HANDLER(CreateOrUpdateVirtualCluster);
    VIRTUAL_CLUSTER_SERVICE_RPC_HANDLER(RemoveVirtualCluster);
    VIRTUAL_CLUSTER_SERVICE_RPC_HANDLER(GetAllVirtualClusters);
  }

 private:
  /// The grpc async service object.
  VirtualClusterInfoGcsService::AsyncService service_;
  /// The service handler that actually handles the requests.
  VirtualClusterInfoGcsServiceHandler &service_handler_;
};

using JobInfoHandler = JobInfoGcsServiceHandler;
using ActorInfoHandler = ActorInfoGcsServiceHandler;
using NodeInfoHandler = NodeInfoGcsServiceHandler;
Expand All @@ -724,6 +773,7 @@ using InternalKVHandler = InternalKVGcsServiceHandler;
using InternalPubSubHandler = InternalPubSubGcsServiceHandler;
using RuntimeEnvHandler = RuntimeEnvGcsServiceHandler;
using TaskInfoHandler = TaskInfoGcsServiceHandler;
using VirtualClusterInfoHandler = VirtualClusterInfoGcsServiceHandler;

} // namespace rpc
} // namespace ray

0 comments on commit e5fc758

Please sign in to comment.