Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] Retryable grpc client #47981

Open
wants to merge 30 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ ray_cc_library(
"src/ray/rpc/client_call.h",
"src/ray/rpc/common.h",
"src/ray/rpc/grpc_client.h",
"src/ray/rpc/retryable_grpc_client.h",
"src/ray/rpc/grpc_server.h",
"src/ray/rpc/metrics_agent_client.h",
"src/ray/rpc/server_call.h",
Expand Down
2 changes: 2 additions & 0 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,8 @@ class ObjectRefGenerator:
return False
else:
return True
else:
return False

"""
Private APIs
Expand Down
4 changes: 4 additions & 0 deletions python/ray/tests/test_streaming_generator_4.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ def test_ray_datasetlike_mini_stress_test(
"RAY_testing_asio_delay_us",
"CoreWorkerService.grpc_server.ReportGeneratorItemReturns=10000:1000000",
)
m.setenv(
"RAY_testing_rpc_failure",
"CoreWorkerService.grpc_client.ReportGeneratorItemReturns=5",
)
cluster = ray_start_cluster
cluster.add_node(
num_cpus=1,
Expand Down
4 changes: 3 additions & 1 deletion src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ RAY_CONFIG(int32_t, gcs_grpc_initial_reconnect_backoff_ms, 100)
RAY_CONFIG(uint64_t, gcs_grpc_max_request_queued_max_bytes, 1024UL * 1024 * 1024 * 5)

/// The duration between two checks for grpc status.
RAY_CONFIG(int32_t, gcs_client_check_connection_status_interval_milliseconds, 1000)
RAY_CONFIG(int32_t, grpc_client_check_connection_status_interval_milliseconds, 1000)

/// Due to the protocol drawback, raylet needs to refresh the message if
/// no message is received for a while.
Expand Down Expand Up @@ -693,6 +693,8 @@ RAY_CONFIG(int64_t, timeout_ms_task_wait_for_death_info, 1000)
/// report the loads to raylet.
RAY_CONFIG(int64_t, core_worker_internal_heartbeat_ms, 1000)

RAY_CONFIG(int32_t, core_worker_rpc_server_reconnect_timeout_s, 60)

/// Maximum amount of memory that will be used by running tasks' args.
RAY_CONFIG(float, max_task_args_memory_fraction, 0.7)

Expand Down
42 changes: 39 additions & 3 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,43 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
}

core_worker_client_pool_ =
std::make_shared<rpc::CoreWorkerClientPool>(*client_call_manager_);
std::make_shared<rpc::CoreWorkerClientPool>([&](const rpc::Address &addr) {
return std::make_shared<rpc::CoreWorkerClient>(
addr, *client_call_manager_, [this, addr]() {
const NodeID node_id = NodeID::FromBinary(addr.raylet_id());
const WorkerID worker_id = WorkerID::FromBinary(addr.worker_id());
const rpc::GcsNodeInfo *node_info =
gcs_client_->Nodes().Get(node_id, /*filter_dead_nodes=*/true);
if (node_info == nullptr) {
RAY_LOG(INFO).WithField(worker_id).WithField(node_id)
<< "Disconnect core worker client since its node is dead";
io_service_.post(
[this, worker_id]() {
core_worker_client_pool_->Disconnect(worker_id);
},
"CoreWorkerClientPool.Disconnect");

return;
}

std::shared_ptr<raylet::RayletClient> raylet_client =
std::make_shared<raylet::RayletClient>(
rpc::NodeManagerWorkerClient::make(
node_info->node_manager_address(),
node_info->node_manager_port(),
*client_call_manager_));
raylet_client->IsLocalWorkerDead(
worker_id,
[this, worker_id](const Status &status,
rpc::IsLocalWorkerDeadReply &&reply) {
if (status.ok() && reply.is_dead()) {
RAY_LOG(INFO).WithField(worker_id)
<< "Disconnect core worker client since it is dead";
core_worker_client_pool_->Disconnect(worker_id);
}
});
});
});

object_info_publisher_ = std::make_unique<pubsub::Publisher>(
/*channels=*/std::vector<
Expand Down Expand Up @@ -3306,13 +3342,13 @@ Status CoreWorker::ReportGeneratorItemReturns(
if (status.ok()) {
num_objects_consumed = reply.total_num_object_consumed();
} else {
// TODO(sang): Handle network error more gracefully.
// If the request fails, we should just resume until task finishes without
// backpressure.
num_objects_consumed = waiter->TotalObjectGenerated();
RAY_LOG(WARNING).WithField(return_id)
<< "Failed to report streaming generator return "
"to the caller. The yield'ed ObjectRef may not be usable.";
"to the caller. The yield'ed ObjectRef may not be usable. "
<< status;
}
waiter->HandleObjectReported(num_objects_consumed);
});
Expand Down
2 changes: 1 addition & 1 deletion src/ray/core_worker/transport/normal_task_submitter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ void NormalTaskSubmitter::RequestNewWorkerIfNeeded(const SchedulingKey &scheduli

void NormalTaskSubmitter::PushNormalTask(
const rpc::Address &addr,
shared_ptr<rpc::CoreWorkerClientInterface> client,
std::shared_ptr<rpc::CoreWorkerClientInterface> client,
const SchedulingKey &scheduling_key,
const TaskSpecification &task_spec,
const google::protobuf::RepeatedPtrField<rpc::ResourceMapEntry> &assigned_resources) {
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_server/gcs_actor_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ GcsActorManager::GcsActorManager(
RuntimeEnvManager &runtime_env_manager,
GcsFunctionManager &function_manager,
std::function<void(const ActorID &)> destroy_owned_placement_group_if_needed,
const rpc::ClientFactoryFn &worker_client_factory)
const rpc::CoreWorkerClientFactoryFn &worker_client_factory)
: gcs_actor_scheduler_(std::move(scheduler)),
gcs_table_storage_(std::move(gcs_table_storage)),
gcs_publisher_(std::move(gcs_publisher)),
Expand Down
4 changes: 2 additions & 2 deletions src/ray/gcs/gcs_server/gcs_actor_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ class GcsActorManager : public rpc::ActorInfoHandler {
RuntimeEnvManager &runtime_env_manager,
GcsFunctionManager &function_manager,
std::function<void(const ActorID &)> destroy_owned_placement_group_if_needed,
const rpc::ClientFactoryFn &worker_client_factory = nullptr);
const rpc::CoreWorkerClientFactoryFn &worker_client_factory = nullptr);

~GcsActorManager() = default;

Expand Down Expand Up @@ -692,7 +692,7 @@ class GcsActorManager : public rpc::ActorInfoHandler {
std::shared_ptr<GcsPublisher> gcs_publisher_;
/// Factory to produce clients to workers. This is used to communicate with
/// actors and their owners.
rpc::ClientFactoryFn worker_client_factory_;
rpc::CoreWorkerClientFactoryFn worker_client_factory_;
/// A callback that is used to destroy the placement group owned by the actor.
/// This method MUST BE IDEMPOTENT because it can be called multiple times during
/// actor destroy process.
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_server/gcs_actor_scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ GcsActorScheduler::GcsActorScheduler(
GcsActorSchedulerFailureCallback schedule_failure_handler,
GcsActorSchedulerSuccessCallback schedule_success_handler,
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool,
rpc::ClientFactoryFn client_factory,
rpc::CoreWorkerClientFactoryFn client_factory,
std::function<void(const NodeID &, const rpc::ResourcesData &)>
normal_task_resources_changed_callback)
: io_context_(io_context),
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_server/gcs_actor_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class GcsActorScheduler : public GcsActorSchedulerInterface {
GcsActorSchedulerFailureCallback schedule_failure_handler,
GcsActorSchedulerSuccessCallback schedule_success_handler,
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool,
rpc::ClientFactoryFn client_factory = nullptr,
rpc::CoreWorkerClientFactoryFn client_factory = nullptr,
std::function<void(const NodeID &, const rpc::ResourcesData &)>
normal_task_resources_changed_callback = nullptr);
virtual ~GcsActorScheduler() = default;
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_server/gcs_job_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class GcsJobManager : public rpc::JobInfoHandler {
RuntimeEnvManager &runtime_env_manager,
GcsFunctionManager &function_manager,
InternalKVInterface &internal_kv,
rpc::ClientFactoryFn client_factory = nullptr)
rpc::CoreWorkerClientFactoryFn client_factory = nullptr)
: gcs_table_storage_(std::move(gcs_table_storage)),
gcs_publisher_(std::move(gcs_publisher)),
runtime_env_manager_(runtime_env_manager),
Expand Down
13 changes: 10 additions & 3 deletions src/ray/gcs/gcs_server/gcs_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,9 @@ void GcsServer::InitClusterTaskManager() {

void GcsServer::InitGcsJobManager(const GcsInitData &gcs_init_data) {
auto client_factory = [this](const rpc::Address &address) {
return std::make_shared<rpc::CoreWorkerClient>(address, client_call_manager_);
return std::make_shared<rpc::CoreWorkerClient>(address, client_call_manager_, []() {
// Keep retrying
});
};
RAY_CHECK(gcs_table_storage_ && gcs_publisher_);
gcs_job_manager_ = std::make_unique<GcsJobManager>(gcs_table_storage_,
Expand Down Expand Up @@ -447,7 +449,9 @@ void GcsServer::InitGcsActorManager(const GcsInitData &gcs_init_data) {
gcs_actor_manager_->OnActorCreationSuccess(std::move(actor), reply);
};
auto client_factory = [this](const rpc::Address &address) {
return std::make_shared<rpc::CoreWorkerClient>(address, client_call_manager_);
return std::make_shared<rpc::CoreWorkerClient>(address, client_call_manager_, []() {
// Keep retrying
});
};

RAY_CHECK(gcs_resource_manager_ && cluster_task_manager_);
Expand All @@ -474,7 +478,10 @@ void GcsServer::InitGcsActorManager(const GcsInitData &gcs_init_data) {
gcs_placement_group_manager_->CleanPlacementGroupIfNeededWhenActorDead(actor_id);
},
[this](const rpc::Address &address) {
return std::make_shared<rpc::CoreWorkerClient>(address, client_call_manager_);
return std::make_shared<rpc::CoreWorkerClient>(
address, client_call_manager_, []() {
// Keep retrying.
});
});

// Initialize by gcs tables data.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class GcsJobManagerTest : public ::testing::Test {
std::unique_ptr<gcs::GcsFunctionManager> function_manager_;
std::unique_ptr<gcs::MockInternalKVInterface> kv_;
std::unique_ptr<gcs::FakeInternalKVInterface> fake_kv_;
rpc::ClientFactoryFn client_factory_;
rpc::CoreWorkerClientFactoryFn client_factory_;
RuntimeEnvManager runtime_env_manager_;
const std::chrono::milliseconds timeout_ms_{5000};
std::string log_dir_;
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_server/test/gcs_job_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class GcsJobManagerTest : public ::testing::Test {
std::unique_ptr<gcs::GcsFunctionManager> function_manager_;
std::unique_ptr<gcs::MockInternalKVInterface> kv_;
std::unique_ptr<gcs::FakeInternalKVInterface> fake_kv_;
rpc::ClientFactoryFn client_factory_;
rpc::CoreWorkerClientFactoryFn client_factory_;
RuntimeEnvManager runtime_env_manager_;
const std::chrono::milliseconds timeout_ms_{5000};
};
Expand Down
9 changes: 9 additions & 0 deletions src/ray/protobuf/node_manager.proto
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,14 @@ message PushMutableObjectReply {
bool done = 1;
}

// Request asking a raylet whether a given worker is dead.
message IsLocalWorkerDeadRequest {
// Binary ID of the worker to check.
bytes worker_id = 1;
}

// Reply to IsLocalWorkerDeadRequest.
message IsLocalWorkerDeadReply {
// True when the raylet has no registered worker with the requested ID.
bool is_dead = 1;
}

// Service for inter-node-manager communication.
service NodeManagerService {
// Handle the case when GCS restarted.
Expand Down Expand Up @@ -440,4 +448,5 @@ service NodeManagerService {
rpc RegisterMutableObject(RegisterMutableObjectRequest)
returns (RegisterMutableObjectReply);
rpc PushMutableObject(PushMutableObjectRequest) returns (PushMutableObjectReply);
rpc IsLocalWorkerDead(IsLocalWorkerDeadRequest) returns (IsLocalWorkerDeadReply);
}
2 changes: 1 addition & 1 deletion src/ray/raylet/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ int main(int argc, char *argv[]) {
auto signal_handler = [&raylet, shutdown_raylet_gracefully_internal](
const boost::system::error_code &error, int signal_number) {
ray::rpc::NodeDeathInfo node_death_info;
optional<ray::rpc::DrainRayletRequest> drain_request =
std::optional<ray::rpc::DrainRayletRequest> drain_request =
raylet->node_manager().GetLocalDrainRequest();
RAY_LOG(INFO) << "received SIGTERM. Existing local drain request = "
<< (drain_request.has_value() ? drain_request->DebugString() : "None");
Expand Down
14 changes: 13 additions & 1 deletion src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,11 @@ NodeManager::NodeManager(
config.ray_debugger_external,
/*get_time=*/[]() { return absl::GetCurrentTimeNanos() / 1e6; }),
client_call_manager_(io_service),
worker_rpc_pool_(client_call_manager_),
worker_rpc_pool_([&](const rpc::Address &addr) {
return std::make_shared<rpc::CoreWorkerClient>(addr, client_call_manager_, []() {
// Keep retrying.
});
}),
core_worker_subscriber_(std::make_unique<pubsub::Subscriber>(
self_node_id_,
/*channels=*/
Expand Down Expand Up @@ -1988,6 +1992,14 @@ void NodeManager::HandleReturnWorker(rpc::ReturnWorkerRequest request,
send_reply_callback(status, nullptr, nullptr);
}

/// Handle an `IsLocalWorkerDead` request.
///
/// Looks the requested worker ID up in the local worker pool and reports the
/// worker as dead when no registered worker with that ID is found. The reply
/// is always sent with an OK status.
void NodeManager::HandleIsLocalWorkerDead(rpc::IsLocalWorkerDeadRequest request,
                                          rpc::IsLocalWorkerDeadReply *reply,
                                          rpc::SendReplyCallback send_reply_callback) {
  const WorkerID worker_id = WorkerID::FromBinary(request.worker_id());
  // A worker that is absent from the pool's registered workers counts as dead.
  const bool is_registered = worker_pool_.GetRegisteredWorker(worker_id) != nullptr;
  reply->set_is_dead(!is_registered);
  send_reply_callback(Status::OK(), nullptr, nullptr);
}

void NodeManager::HandleDrainRaylet(rpc::DrainRayletRequest request,
rpc::DrainRayletReply *reply,
rpc::SendReplyCallback send_reply_callback) {
Expand Down
6 changes: 5 additions & 1 deletion src/ray/raylet/node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
}

/// Get the local drain request.
optional<rpc::DrainRayletRequest> GetLocalDrainRequest() const {
std::optional<rpc::DrainRayletRequest> GetLocalDrainRequest() const {
return cluster_resource_scheduler_->GetLocalResourceManager().GetLocalDrainRequest();
}

Expand Down Expand Up @@ -552,6 +552,10 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
rpc::DrainRayletReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

/// Handle an `IsLocalWorkerDead` request.
void HandleIsLocalWorkerDead(rpc::IsLocalWorkerDeadRequest request,
rpc::IsLocalWorkerDeadReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

/// Handle a `CancelWorkerLease` request.
void HandleCancelWorkerLease(rpc::CancelWorkerLeaseRequest request,
rpc::CancelWorkerLeaseReply *reply,
Expand Down
4 changes: 3 additions & 1 deletion src/ray/raylet/worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ void Worker::Connect(int port) {
rpc::Address addr;
addr.set_ip_address(ip_address_);
addr.set_port(port_);
rpc_client_ = std::make_unique<rpc::CoreWorkerClient>(addr, client_call_manager_);
rpc_client_ = std::make_unique<rpc::CoreWorkerClient>(addr, client_call_manager_, []() {
// Keep retrying.
});
Connect(rpc_client_);
}

Expand Down
14 changes: 14 additions & 0 deletions src/ray/raylet/worker_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -870,6 +870,20 @@ Status WorkerPool::RegisterDriver(const std::shared_ptr<WorkerInterface> &driver
return Status::OK();
}

/// Find the registered worker whose ID equals `worker_id`.
///
/// Every per-language state is scanned linearly; the first match is returned.
///
/// NOTE(review): only `registered_workers` is searched. Drivers appear to be
/// registered through a separate path (RegisterDriver), so a driver's ID would
/// presumably yield nullptr here -- confirm callers account for that.
///
/// \param worker_id The ID of the worker to look up.
/// \return The matching worker, or nullptr if no registered worker has this ID.
std::shared_ptr<WorkerInterface> WorkerPool::GetRegisteredWorker(
    const WorkerID &worker_id) const {
  for (const auto &entry : states_by_lang_) {
    const auto &state = entry.second;
    for (const auto &worker : state.registered_workers) {
      if (worker->WorkerId() == worker_id) {
        return worker;
      }
    }
  }
  return nullptr;
}

std::shared_ptr<WorkerInterface> WorkerPool::GetRegisteredWorker(
const std::shared_ptr<ClientConnection> &connection) const {
for (const auto &entry : states_by_lang_) {
Expand Down
2 changes: 2 additions & 0 deletions src/ray/raylet/worker_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,8 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
std::shared_ptr<WorkerInterface> GetRegisteredWorker(
const std::shared_ptr<ClientConnection> &connection) const;

/// Get the registered worker with the given worker ID.
///
/// \param worker_id The ID of the worker to look up.
/// \return The registered worker, or nullptr if no registered worker has this ID.
std::shared_ptr<WorkerInterface> GetRegisteredWorker(const WorkerID &worker_id) const;

/// Get the client connection's registered driver.
///
/// \param The client connection owned by a registered driver.
Expand Down
8 changes: 8 additions & 0 deletions src/ray/raylet_client/raylet_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,14 @@ void raylet::RayletClient::DrainRaylet(
grpc_client_->DrainRaylet(request, callback);
}

/// Send an `IsLocalWorkerDead` RPC for `worker_id` through the gRPC client.
///
/// \param worker_id The ID of the worker to check; sent in its binary form.
/// \param callback Forwarded unchanged to the gRPC client call.
void raylet::RayletClient::IsLocalWorkerDead(
    const WorkerID &worker_id,
    const rpc::ClientCallback<rpc::IsLocalWorkerDeadReply> &callback) {
  rpc::IsLocalWorkerDeadRequest is_dead_request;
  is_dead_request.set_worker_id(worker_id.Binary());
  grpc_client_->IsLocalWorkerDead(is_dead_request, callback);
}

void raylet::RayletClient::GlobalGC(
const rpc::ClientCallback<rpc::GlobalGCReply> &callback) {
rpc::GlobalGCRequest request;
Expand Down
8 changes: 8 additions & 0 deletions src/ray/raylet_client/raylet_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,10 @@ class RayletClientInterface : public PinObjectsInterface,
int64_t deadline_timestamp_ms,
const rpc::ClientCallback<rpc::DrainRayletReply> &callback) = 0;

/// Ask the raylet whether the worker with the given ID is dead.
///
/// \param worker_id The ID of the worker to check.
/// \param callback Callback that receives the reply to the request.
virtual void IsLocalWorkerDead(
const WorkerID &worker_id,
const rpc::ClientCallback<rpc::IsLocalWorkerDeadReply> &callback) = 0;

virtual std::shared_ptr<grpc::Channel> GetChannel() const = 0;
};

Expand Down Expand Up @@ -523,6 +527,10 @@ class RayletClient : public RayletClientInterface {
int64_t deadline_timestamp_ms,
const rpc::ClientCallback<rpc::DrainRayletReply> &callback) override;

void IsLocalWorkerDead(
const WorkerID &worker_id,
const rpc::ClientCallback<rpc::IsLocalWorkerDeadReply> &callback) override;

void GetSystemConfig(
const rpc::ClientCallback<rpc::GetSystemConfigReply> &callback) override;

Expand Down
Loading