diff --git a/dashboard/tests/test_dashboard.py b/dashboard/tests/test_dashboard.py
index a5254179d948..1fc3ad1bb630 100644
--- a/dashboard/tests/test_dashboard.py
+++ b/dashboard/tests/test_dashboard.py
@@ -36,7 +36,7 @@
     wait_until_server_available,
     wait_until_succeeded_without_exception,
 )
-from ray.core.generated import gcs_pb2
+from ray.core.generated import common_pb2
 import ray.scripts.scripts as scripts
 from ray.dashboard import dashboard
 from ray.dashboard.head import DashboardHead
@@ -219,7 +219,7 @@ def test_raylet_and_agent_share_fate(shutdown_only):
         node for node in ray.nodes() if node["NodeID"] == worker_node_id
     ][0]
     assert not worker_node_info["Alive"]
-    assert worker_node_info["DeathReason"] == gcs_pb2.NodeDeathInfo.Reason.Value(
+    assert worker_node_info["DeathReason"] == common_pb2.NodeDeathInfo.Reason.Value(
         "UNEXPECTED_TERMINATION"
     )
     assert (
diff --git a/python/ray/exceptions.py b/python/ray/exceptions.py
index e35eee260d95..14c308e20eac 100644
--- a/python/ray/exceptions.py
+++ b/python/ray/exceptions.py
@@ -13,6 +13,7 @@
     ActorDiedErrorContext,
     Address,
     Language,
+    NodeDeathInfo,
     RayException,
 )
 from ray.util.annotations import DeveloperAPI, PublicAPI
@@ -339,7 +340,9 @@ class ActorDiedError(RayActorError):
 
     BASE_ERROR_MSG = "The actor died unexpectedly before finishing this task."
 
-    def __init__(self, cause: Union[RayTaskError, ActorDiedErrorContext] = None):
+    def __init__(
+        self, cause: Optional[Union[RayTaskError, ActorDiedErrorContext]] = None
+    ):
         """
         Construct a RayActorError by building the arguments.
         """
@@ -380,11 +383,12 @@ def __init__(self, cause: Union[RayTaskError, ActorDiedErrorContext] = None):
                 error_msg_lines.append(
                     "The actor never ran - it was cancelled before it started running."
                 )
-            if cause.preempted:
+            if (
+                cause.node_death_info
+                and cause.node_death_info.reason
+                == NodeDeathInfo.AUTOSCALER_DRAIN_PREEMPTED
+            ):
                 preempted = True
-                error_msg_lines.append(
-                    "\tThe actor's node was killed by a spot preemption."
-                )
             error_msg = "\n".join(error_msg_lines)
             actor_id = ActorID(cause.actor_id).hex()
             super().__init__(actor_id, error_msg, actor_init_failed, preempted)
diff --git a/python/ray/tests/test_actor_failures.py b/python/ray/tests/test_actor_failures.py
index 86e14eaee994..975b942d5f9f 100644
--- a/python/ray/tests/test_actor_failures.py
+++ b/python/ray/tests/test_actor_failures.py
@@ -748,7 +748,7 @@ def create_actor(self):
     cluster.remove_node(node_to_kill)
     with pytest.raises(
         ray.exceptions.RayActorError,
-        match="The actor is dead because its node has died.",
+        match="The actor died because its node has died.",
     ) as exc_info:
         ray.get(a.check_alive.remote())
     assert exc_info.value.actor_id == a._actor_id.hex()
diff --git a/python/ray/tests/test_draining.py b/python/ray/tests/test_draining.py
index 360e5d5d34ad..ee942f4c13bf 100644
--- a/python/ray/tests/test_draining.py
+++ b/python/ray/tests/test_draining.py
@@ -4,7 +4,7 @@
 import ray
 import time
 from ray._raylet import GcsClient
-from ray.core.generated import autoscaler_pb2, gcs_pb2
+from ray.core.generated import autoscaler_pb2, common_pb2
 from ray._private.test_utils import wait_for_condition
 from ray.util.scheduling_strategies import (
     NodeAffinitySchedulingStrategy,
@@ -69,7 +69,7 @@ def drain_until_accept():
     )
 
     worker_node = [node for node in ray.nodes() if node["NodeID"] == worker_node_id][0]
-    assert worker_node["DeathReason"] == gcs_pb2.NodeDeathInfo.Reason.Value(
+    assert worker_node["DeathReason"] == common_pb2.NodeDeathInfo.Reason.Value(
         "AUTOSCALER_DRAIN_IDLE"
     )
     assert worker_node["DeathReasonMessage"] == "idle for long enough"
@@ -137,7 +137,7 @@ def ping(self):
     )
 
     worker_node = [node for node in ray.nodes() if node["NodeID"] == worker_node_id][0]
-    assert worker_node["DeathReason"] == gcs_pb2.NodeDeathInfo.Reason.Value(
+    assert worker_node["DeathReason"] == common_pb2.NodeDeathInfo.Reason.Value(
         "AUTOSCALER_DRAIN_PREEMPTED"
     )
     assert worker_node["DeathReasonMessage"] == "preemption"
@@ -195,7 +195,7 @@ def ping(self):
     )
 
     worker_node = [node for node in ray.nodes() if node["NodeID"] == worker_node_id][0]
-    assert worker_node["DeathReason"] == gcs_pb2.NodeDeathInfo.Reason.Value(
+    assert worker_node["DeathReason"] == common_pb2.NodeDeathInfo.Reason.Value(
         "AUTOSCALER_DRAIN_PREEMPTED"
     )
     assert worker_node["DeathReasonMessage"] == "preemption"
@@ -251,7 +251,7 @@ def ping(self):
     # Since worker node failure is detected to be before the draining deadline,
     # this is considered as an unexpected termination.
     worker_node = [node for node in ray.nodes() if node["NodeID"] == worker_node_id][0]
-    assert worker_node["DeathReason"] == gcs_pb2.NodeDeathInfo.Reason.Value(
+    assert worker_node["DeathReason"] == common_pb2.NodeDeathInfo.Reason.Value(
         "UNEXPECTED_TERMINATION"
     )
     assert (
@@ -412,11 +412,12 @@ def ping(self):
     actor = Actor.options(num_cpus=0, resources={"node2": 1}).remote()
     ray.get(actor.ping.remote())
 
+    drain_reason_message = "testing node preemption."
     # Preemption is always accepted.
     is_accepted, _ = gcs_client.drain_node(
         node2_id,
         autoscaler_pb2.DrainNodeReason.Value("DRAIN_NODE_REASON_PREEMPTION"),
-        "preemption",
+        drain_reason_message,
         1,
     )
     assert is_accepted
@@ -426,8 +427,11 @@ def ping(self):
     try:
         ray.get(actor.ping.remote())
         raise
-    except ray.exceptions.RayActorError as e:
+    except ray.exceptions.ActorDiedError as e:
         assert e.preempted
+        if graceful:
+            assert "The actor died because its node has died." in str(e)
+            assert "the actor's node was preempted: " + drain_reason_message in str(e)
 
 
 if __name__ == "__main__":
diff --git a/python/ray/tests/test_node_death.py b/python/ray/tests/test_node_death.py
index de36091c4341..cc2ef747102b 100644
--- a/python/ray/tests/test_node_death.py
+++ b/python/ray/tests/test_node_death.py
@@ -4,7 +4,7 @@
 
 import ray
 from ray._private.test_utils import wait_for_condition
-from ray.core.generated import gcs_pb2
+from ray.core.generated import common_pb2
 
 
 def test_normal_termination(ray_start_cluster):
@@ -14,17 +14,35 @@
     worker_node = cluster.add_node(resources={"worker": 1})
     cluster.wait_for_nodes()
     worker_node_id = worker_node.node_id
+
+    @ray.remote
+    class Actor:
+        def ping(self):
+            pass
+
+    actor = Actor.options(num_cpus=0, resources={"worker": 1}).remote()
+    ray.get(actor.ping.remote())
+
+    # normal node termination
     cluster.remove_node(worker_node)
 
     worker_node_info = [
         node for node in ray.nodes() if node["NodeID"] == worker_node_id
     ][0]
     assert not worker_node_info["Alive"]
-    assert worker_node_info["DeathReason"] == gcs_pb2.NodeDeathInfo.Reason.Value(
+    assert worker_node_info["DeathReason"] == common_pb2.NodeDeathInfo.Reason.Value(
         "EXPECTED_TERMINATION"
     )
     assert worker_node_info["DeathReasonMessage"] == "received SIGTERM"
 
+    try:
+        ray.get(actor.ping.remote())
+        raise
+    except ray.exceptions.ActorDiedError as e:
+        assert not e.preempted
+        assert "The actor died because its node has died." in str(e)
+        assert "the actor's node was terminated expectedly: received SIGTERM" in str(e)
+
 
 def test_abnormal_termination(monkeypatch, ray_start_cluster):
     monkeypatch.setenv("RAY_health_check_failure_threshold", "3")
@@ -46,6 +64,14 @@ def test_abnormal_termination(monkeypatch, ray_start_cluster):
         == {head_node_id, worker_node_id}
     )
 
+    @ray.remote
+    class Actor:
+        def ping(self):
+            pass
+
+    actor = Actor.options(num_cpus=0, resources={"worker": 1}).remote()
+    ray.get(actor.ping.remote())
+
     # Simulate the worker node crashes.
     cluster.remove_node(worker_node, False)
 
@@ -55,7 +81,7 @@
     )
 
     worker_node = [node for node in ray.nodes() if node["NodeID"] == worker_node_id][0]
-    assert worker_node["DeathReason"] == gcs_pb2.NodeDeathInfo.Reason.Value(
+    assert worker_node["DeathReason"] == common_pb2.NodeDeathInfo.Reason.Value(
         "UNEXPECTED_TERMINATION"
     )
     assert (
@@ -63,6 +89,17 @@
         == "health check failed due to missing too many heartbeats"
     )
 
+    try:
+        ray.get(actor.ping.remote())
+        raise
+    except ray.exceptions.ActorDiedError as e:
+        assert not e.preempted
+        assert "The actor died because its node has died." in str(e)
+        assert (
+            "the actor's node was terminated unexpectedly: "
+            "health check failed due to missing too many heartbeats" in str(e)
+        )
+
 
 if __name__ == "__main__":
     sys.exit(pytest.main(["-sv", __file__]))
diff --git a/python/ray/tests/test_runtime_env_agent.py b/python/ray/tests/test_runtime_env_agent.py
index 4af9165754a3..0320bcba8b94 100644
--- a/python/ray/tests/test_runtime_env_agent.py
+++ b/python/ray/tests/test_runtime_env_agent.py
@@ -13,7 +13,7 @@
     init_error_pubsub,
     wait_for_condition,
 )
-from ray.core.generated import gcs_pb2
+from ray.core.generated import common_pb2
 from ray.runtime_env import RuntimeEnv
 
 import psutil
@@ -160,7 +160,7 @@ def test_raylet_and_agent_share_fate(shutdown_only):
         node for node in ray.nodes() if node["NodeID"] == worker_node_id
     ][0]
     assert not worker_node_info["Alive"]
-    assert worker_node_info["DeathReason"] == gcs_pb2.NodeDeathInfo.Reason.Value(
+    assert worker_node_info["DeathReason"] == common_pb2.NodeDeathInfo.Reason.Value(
         "UNEXPECTED_TERMINATION"
     )
     assert (
diff --git a/src/ray/core_worker/transport/direct_actor_task_submitter.cc b/src/ray/core_worker/transport/direct_actor_task_submitter.cc
index 7c740f4cea70..0bda386195cc 100644
--- a/src/ray/core_worker/transport/direct_actor_task_submitter.cc
+++ b/src/ray/core_worker/transport/direct_actor_task_submitter.cc
@@ -347,12 +347,13 @@ void CoreWorkerDirectActorTaskSubmitter::FailTaskWithError(
   // Special error for preempted actor. The task "timed out" because the actor may
   // not have sent a notification to the gcs; regardless we already know it's
   // preempted and it's dead.
-  rpc::ActorDeathCause &actor_death_cause = *error_info.mutable_actor_died_error();
-  actor_death_cause.mutable_actor_died_error_context()->set_actor_id(
-      task.task_spec.ActorId().Binary());
-  actor_death_cause.mutable_actor_died_error_context()->set_preempted(
-      task.actor_preempted);
-
+  auto actor_death_cause = error_info.mutable_actor_died_error();
+  auto actor_died_error_context = actor_death_cause->mutable_actor_died_error_context();
+  actor_died_error_context->set_actor_id(task.task_spec.ActorId().Binary());
+  auto node_death_info = actor_died_error_context->mutable_node_death_info();
+  node_death_info->set_reason(rpc::NodeDeathInfo::AUTOSCALER_DRAIN_PREEMPTED);
+  node_death_info->set_reason_message(
+      "the node was inferred to be dead due to draining.");
   error_info.set_error_type(rpc::ErrorType::ACTOR_DIED);
   error_info.set_error_message("Actor died by preemption.");
 }
diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.cc b/src/ray/gcs/gcs_server/gcs_actor_manager.cc
index 008b6c13e3a2..fb89e092f256 100644
--- a/src/ray/gcs/gcs_server/gcs_actor_manager.cc
+++ b/src/ray/gcs/gcs_server/gcs_actor_manager.cc
@@ -205,17 +205,35 @@ const ray::rpc::ActorDeathCause GcsActorManager::GenNodeDiedCause(
     const std::string ip_address,
     std::shared_ptr<rpc::GcsNodeInfo> node) {
   ray::rpc::ActorDeathCause death_cause;
+
   auto actor_died_error_ctx = death_cause.mutable_actor_died_error_context();
   AddActorInfo(actor, actor_died_error_ctx);
 
-  actor_died_error_ctx->set_error_message(
-      absl::StrCat("The actor is dead because its node has died. Node Id: ",
-                   NodeID::FromBinary(node->node_id()).Hex()));
-
-  // TODO(vitsai): Publish this information as well
-  if (auto death_info = node->death_info();
-      death_info.reason() == rpc::NodeDeathInfo::AUTOSCALER_DRAIN_PREEMPTED) {
-    actor_died_error_ctx->set_preempted(true);
+  auto node_death_info = actor_died_error_ctx->mutable_node_death_info();
+  node_death_info->CopyFrom(node->death_info());
+
+  std::ostringstream oss;
+  oss << "The actor died because its node has died. Node Id: "
+      << NodeID::FromBinary(node->node_id()).Hex() << "\n";
+  switch (node_death_info->reason()) {
+  case rpc::NodeDeathInfo::EXPECTED_TERMINATION:
+    oss << "\tthe actor's node was terminated expectedly: ";
+    break;
+  case rpc::NodeDeathInfo::UNEXPECTED_TERMINATION:
+    oss << "\tthe actor's node was terminated unexpectedly: ";
+    break;
+  case rpc::NodeDeathInfo::AUTOSCALER_DRAIN_PREEMPTED:
+    oss << "\tthe actor's node was preempted: ";
+    break;
+  default:
+    // Control should not reach here, but in case it happens in unexpected scenarios,
+    // log it and provide a generic message to the user.
+    RAY_LOG(ERROR) << "Actor death is not expected to be caused by "
+                   << rpc::NodeDeathInfo_Reason_Name(node_death_info->reason());
+    oss << "\tthe actor's node was terminated: ";
   }
+  oss << node_death_info->reason_message();
+  actor_died_error_ctx->set_error_message(oss.str());
+
   return death_cause;
 }
diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto
index 05c148b3fb2b..8f6d9c27cdee 100644
--- a/src/ray/protobuf/common.proto
+++ b/src/ray/protobuf/common.proto
@@ -308,6 +308,20 @@
   string formatted_exception_string = 3;
 }
 
+message NodeDeathInfo {
+  // TODO(sang): Update drain reason
+  enum Reason {
+    UNSPECIFIED = 0;
+    EXPECTED_TERMINATION = 1;
+    UNEXPECTED_TERMINATION = 2;
+    AUTOSCALER_DRAIN_PREEMPTED = 3;
+    AUTOSCALER_DRAIN_IDLE = 4;
+  }
+  Reason reason = 1;
+  // A message describing the reason for the node death.
+  string reason_message = 2;
+}
+
 message ActorDeathCause {
   oneof context {
     // Indicates that this actor is marked as DEAD due to actor creation task failure.
@@ -350,8 +364,8 @@ message ActorDiedErrorContext {
   // Whether the actor had never started running before it died, i.e. it was cancelled
   // before scheduling had completed.
   bool never_started = 10;
-  // Whether the actor was on a preempted node.
-  bool preempted = 11;
+  // The node death info, if node death is the cause of actor death.
+  optional NodeDeathInfo node_death_info = 11;
 }
 
 // Context for task OOM.
diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto
index 652310c3b0bf..fd8310b9f566 100644
--- a/src/ray/protobuf/gcs.proto
+++ b/src/ray/protobuf/gcs.proto
@@ -318,20 +318,6 @@ message NodeSnapshot {
   repeated string node_activity = 3;
 }
 
-message NodeDeathInfo {
-  // TODO(sang): Update drain reason
-  enum Reason {
-    UNSPECIFIED = 0;
-    EXPECTED_TERMINATION = 1;
-    UNEXPECTED_TERMINATION = 2;
-    AUTOSCALER_DRAIN_PREEMPTED = 3;
-    AUTOSCALER_DRAIN_IDLE = 4;
-  }
-  Reason reason = 1;
-  // A message describing the reason for the node death.
-  string reason_message = 2;
-}
-
 message GcsNodeInfo {
   // State of a node.
   enum GcsNodeState {