[Core] Expose NodeDeathInfo in ActorDiedError (#45497)
Signed-off-by: Rui Qiao <[email protected]>
ruisearch42 authored Jun 1, 2024
1 parent fe191e6 commit 7021b10
Showing 10 changed files with 114 additions and 50 deletions.
4 changes: 2 additions & 2 deletions dashboard/tests/test_dashboard.py
@@ -36,7 +36,7 @@
wait_until_server_available,
wait_until_succeeded_without_exception,
)
from ray.core.generated import gcs_pb2
from ray.core.generated import common_pb2
import ray.scripts.scripts as scripts
from ray.dashboard import dashboard
from ray.dashboard.head import DashboardHead
@@ -219,7 +219,7 @@ def test_raylet_and_agent_share_fate(shutdown_only):
node for node in ray.nodes() if node["NodeID"] == worker_node_id
][0]
assert not worker_node_info["Alive"]
assert worker_node_info["DeathReason"] == gcs_pb2.NodeDeathInfo.Reason.Value(
assert worker_node_info["DeathReason"] == common_pb2.NodeDeathInfo.Reason.Value(
"UNEXPECTED_TERMINATION"
)
assert (
14 changes: 9 additions & 5 deletions python/ray/exceptions.py
@@ -13,6 +13,7 @@
ActorDiedErrorContext,
Address,
Language,
NodeDeathInfo,
RayException,
)
from ray.util.annotations import DeveloperAPI, PublicAPI
@@ -339,7 +340,9 @@ class ActorDiedError(RayActorError):

BASE_ERROR_MSG = "The actor died unexpectedly before finishing this task."

def __init__(self, cause: Union[RayTaskError, ActorDiedErrorContext] = None):
def __init__(
self, cause: Optional[Union[RayTaskError, ActorDiedErrorContext]] = None
):
"""
Construct a RayActorError by building the arguments.
"""
@@ -380,11 +383,12 @@ def __init__(self, cause: Union[RayTaskError, ActorDiedErrorContext] = None):
error_msg_lines.append(
"The actor never ran - it was cancelled before it started running."
)
if cause.preempted:
if (
cause.node_death_info
and cause.node_death_info.reason
== NodeDeathInfo.AUTOSCALER_DRAIN_PREEMPTED
):
preempted = True
error_msg_lines.append(
"\tThe actor's node was killed by a spot preemption."
)
error_msg = "\n".join(error_msg_lines)
actor_id = ActorID(cause.actor_id).hex()
super().__init__(actor_id, error_msg, actor_init_failed, preempted)
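With this change, the ActorDiedError message carries the node death reason and its `preempted` flag is derived from `NodeDeathInfo.AUTOSCALER_DRAIN_PREEMPTED`. A minimal usage sketch, assuming an actor whose node has already died; the `Worker` actor and cluster setup are illustrative, not part of this commit:

```python
import ray
from ray.exceptions import ActorDiedError

@ray.remote
class Worker:  # illustrative actor, not from this commit
    def ping(self):
        return "ok"

actor = Worker.remote()
try:
    ray.get(actor.ping.remote())
except ActorDiedError as e:
    # `preempted` is True when the node died with AUTOSCALER_DRAIN_PREEMPTED.
    if e.preempted:
        print("Actor lost to node preemption; safe to retry elsewhere:", e)
    else:
        print("Actor died for a non-preemption reason:", e)
```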
2 changes: 1 addition & 1 deletion python/ray/tests/test_actor_failures.py
@@ -748,7 +748,7 @@ def create_actor(self):
cluster.remove_node(node_to_kill)
with pytest.raises(
ray.exceptions.RayActorError,
match="The actor is dead because its node has died.",
match="The actor died because its node has died.",
) as exc_info:
ray.get(a.check_alive.remote())
assert exc_info.value.actor_id == a._actor_id.hex()
18 changes: 11 additions & 7 deletions python/ray/tests/test_draining.py
@@ -4,7 +4,7 @@
import ray
import time
from ray._raylet import GcsClient
from ray.core.generated import autoscaler_pb2, gcs_pb2
from ray.core.generated import autoscaler_pb2, common_pb2
from ray._private.test_utils import wait_for_condition
from ray.util.scheduling_strategies import (
NodeAffinitySchedulingStrategy,
@@ -69,7 +69,7 @@ def drain_until_accept():
)

worker_node = [node for node in ray.nodes() if node["NodeID"] == worker_node_id][0]
assert worker_node["DeathReason"] == gcs_pb2.NodeDeathInfo.Reason.Value(
assert worker_node["DeathReason"] == common_pb2.NodeDeathInfo.Reason.Value(
"AUTOSCALER_DRAIN_IDLE"
)
assert worker_node["DeathReasonMessage"] == "idle for long enough"
@@ -137,7 +137,7 @@ def ping(self):
)

worker_node = [node for node in ray.nodes() if node["NodeID"] == worker_node_id][0]
assert worker_node["DeathReason"] == gcs_pb2.NodeDeathInfo.Reason.Value(
assert worker_node["DeathReason"] == common_pb2.NodeDeathInfo.Reason.Value(
"AUTOSCALER_DRAIN_PREEMPTED"
)
assert worker_node["DeathReasonMessage"] == "preemption"
@@ -195,7 +195,7 @@ def ping(self):
)

worker_node = [node for node in ray.nodes() if node["NodeID"] == worker_node_id][0]
assert worker_node["DeathReason"] == gcs_pb2.NodeDeathInfo.Reason.Value(
assert worker_node["DeathReason"] == common_pb2.NodeDeathInfo.Reason.Value(
"AUTOSCALER_DRAIN_PREEMPTED"
)
assert worker_node["DeathReasonMessage"] == "preemption"
@@ -251,7 +251,7 @@ def ping(self):
# Since worker node failure is detected to be before the draining deadline,
# this is considered as an unexpected termination.
worker_node = [node for node in ray.nodes() if node["NodeID"] == worker_node_id][0]
assert worker_node["DeathReason"] == gcs_pb2.NodeDeathInfo.Reason.Value(
assert worker_node["DeathReason"] == common_pb2.NodeDeathInfo.Reason.Value(
"UNEXPECTED_TERMINATION"
)
assert (
@@ -412,11 +412,12 @@ def ping(self):
actor = Actor.options(num_cpus=0, resources={"node2": 1}).remote()
ray.get(actor.ping.remote())

drain_reason_message = "testing node preemption."
# Preemption is always accepted.
is_accepted, _ = gcs_client.drain_node(
node2_id,
autoscaler_pb2.DrainNodeReason.Value("DRAIN_NODE_REASON_PREEMPTION"),
"preemption",
drain_reason_message,
1,
)
assert is_accepted
@@ -426,8 +427,11 @@ def ping(self):
try:
ray.get(actor.ping.remote())
raise
except ray.exceptions.RayActorError as e:
except ray.exceptions.ActorDiedError as e:
assert e.preempted
if graceful:
assert "The actor died because its node has died." in str(e)
assert "the actor's node was preempted: " + drain_reason_message in str(e)


if __name__ == "__main__":
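Outside the test harness, the same flow can be driven with the `drain_node` call used above. A hedged sketch; the GCS address and the choice of node to drain are placeholders, not values from this commit:

```python
import ray
from ray._raylet import GcsClient
from ray.core.generated import autoscaler_pb2

ray.init()
gcs_address = "127.0.0.1:6379"       # placeholder: your cluster's GCS address
node_id = ray.nodes()[0]["NodeID"]   # placeholder: the node to preempt

gcs_client = GcsClient(address=gcs_address)
# Preemption drains are always accepted; the message flows into
# NodeDeathInfo.reason_message and, from there, into ActorDiedError.
is_accepted, _ = gcs_client.drain_node(
    node_id,
    autoscaler_pb2.DrainNodeReason.Value("DRAIN_NODE_REASON_PREEMPTION"),
    "spot instance reclaimed",
    1,
)
assert is_accepted
```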
43 changes: 40 additions & 3 deletions python/ray/tests/test_node_death.py
@@ -4,7 +4,7 @@
import ray

from ray._private.test_utils import wait_for_condition
from ray.core.generated import gcs_pb2
from ray.core.generated import common_pb2


def test_normal_termination(ray_start_cluster):
@@ -14,17 +14,35 @@ def test_normal_termination(ray_start_cluster):
worker_node = cluster.add_node(resources={"worker": 1})
cluster.wait_for_nodes()
worker_node_id = worker_node.node_id

@ray.remote
class Actor:
def ping(self):
pass

actor = Actor.options(num_cpus=0, resources={"worker": 1}).remote()
ray.get(actor.ping.remote())

# normal node termination
cluster.remove_node(worker_node)

worker_node_info = [
node for node in ray.nodes() if node["NodeID"] == worker_node_id
][0]
assert not worker_node_info["Alive"]
assert worker_node_info["DeathReason"] == gcs_pb2.NodeDeathInfo.Reason.Value(
assert worker_node_info["DeathReason"] == common_pb2.NodeDeathInfo.Reason.Value(
"EXPECTED_TERMINATION"
)
assert worker_node_info["DeathReasonMessage"] == "received SIGTERM"

try:
ray.get(actor.ping.remote())
raise
except ray.exceptions.ActorDiedError as e:
assert not e.preempted
assert "The actor died because its node has died." in str(e)
assert "the actor's node was terminated expectedly: received SIGTERM" in str(e)


def test_abnormal_termination(monkeypatch, ray_start_cluster):
monkeypatch.setenv("RAY_health_check_failure_threshold", "3")
@@ -46,6 +64,14 @@ def test_abnormal_termination(monkeypatch, ray_start_cluster):
== {head_node_id, worker_node_id}
)

@ray.remote
class Actor:
def ping(self):
pass

actor = Actor.options(num_cpus=0, resources={"worker": 1}).remote()
ray.get(actor.ping.remote())

# Simulate the worker node crashes.
cluster.remove_node(worker_node, False)

@@ -55,14 +81,25 @@ def test_abnormal_termination(monkeypatch, ray_start_cluster):
)

worker_node = [node for node in ray.nodes() if node["NodeID"] == worker_node_id][0]
assert worker_node["DeathReason"] == gcs_pb2.NodeDeathInfo.Reason.Value(
assert worker_node["DeathReason"] == common_pb2.NodeDeathInfo.Reason.Value(
"UNEXPECTED_TERMINATION"
)
assert (
worker_node["DeathReasonMessage"]
== "health check failed due to missing too many heartbeats"
)

try:
ray.get(actor.ping.remote())
raise
except ray.exceptions.ActorDiedError as e:
assert not e.preempted
assert "The actor died because its node has died." in str(e)
assert (
"the actor's node was terminated unexpectedly: "
"health check failed due to missing too many heartbeats" in str(e)
)


if __name__ == "__main__":
sys.exit(pytest.main(["-sv", __file__]))
4 changes: 2 additions & 2 deletions python/ray/tests/test_runtime_env_agent.py
@@ -13,7 +13,7 @@
init_error_pubsub,
wait_for_condition,
)
from ray.core.generated import gcs_pb2
from ray.core.generated import common_pb2
from ray.runtime_env import RuntimeEnv
import psutil

@@ -160,7 +160,7 @@ def test_raylet_and_agent_share_fate(shutdown_only):
node for node in ray.nodes() if node["NodeID"] == worker_node_id
][0]
assert not worker_node_info["Alive"]
assert worker_node_info["DeathReason"] == gcs_pb2.NodeDeathInfo.Reason.Value(
assert worker_node_info["DeathReason"] == common_pb2.NodeDeathInfo.Reason.Value(
"UNEXPECTED_TERMINATION"
)
assert (
13 changes: 7 additions & 6 deletions src/ray/core_worker/transport/direct_actor_task_submitter.cc
@@ -347,12 +347,13 @@ void CoreWorkerDirectActorTaskSubmitter::FailTaskWithError(
// Special error for preempted actor. The task "timed out" because the actor may
// not have sent a notification to the gcs; regardless we already know it's
// preempted and it's dead.
rpc::ActorDeathCause &actor_death_cause = *error_info.mutable_actor_died_error();
actor_death_cause.mutable_actor_died_error_context()->set_actor_id(
task.task_spec.ActorId().Binary());
actor_death_cause.mutable_actor_died_error_context()->set_preempted(
task.actor_preempted);

auto actor_death_cause = error_info.mutable_actor_died_error();
auto actor_died_error_context = actor_death_cause->mutable_actor_died_error_context();
actor_died_error_context->set_actor_id(task.task_spec.ActorId().Binary());
auto node_death_info = actor_died_error_context->mutable_node_death_info();
node_death_info->set_reason(rpc::NodeDeathInfo::AUTOSCALER_DRAIN_PREEMPTED);
node_death_info->set_reason_message(
"the node was inferred to be dead due to draining.");
error_info.set_error_type(rpc::ErrorType::ACTOR_DIED);
error_info.set_error_message("Actor died by preemption.");
}
34 changes: 26 additions & 8 deletions src/ray/gcs/gcs_server/gcs_actor_manager.cc
@@ -205,17 +205,35 @@ const ray::rpc::ActorDeathCause GcsActorManager::GenNodeDiedCause(
const std::string ip_address,
std::shared_ptr<rpc::GcsNodeInfo> node) {
ray::rpc::ActorDeathCause death_cause;

auto actor_died_error_ctx = death_cause.mutable_actor_died_error_context();
AddActorInfo(actor, actor_died_error_ctx);
actor_died_error_ctx->set_error_message(
absl::StrCat("The actor is dead because its node has died. Node Id: ",
NodeID::FromBinary(node->node_id()).Hex()));

// TODO(vitsai): Publish this information as well
if (auto death_info = node->death_info();
death_info.reason() == rpc::NodeDeathInfo::AUTOSCALER_DRAIN_PREEMPTED) {
actor_died_error_ctx->set_preempted(true);
auto node_death_info = actor_died_error_ctx->mutable_node_death_info();
node_death_info->CopyFrom(node->death_info());

std::ostringstream oss;
oss << "The actor died because its node has died. Node Id: "
<< NodeID::FromBinary(node->node_id()).Hex() << "\n";
switch (node_death_info->reason()) {
case rpc::NodeDeathInfo::EXPECTED_TERMINATION:
oss << "\tthe actor's node was terminated expectedly: ";
break;
case rpc::NodeDeathInfo::UNEXPECTED_TERMINATION:
oss << "\tthe actor's node was terminated unexpectedly: ";
break;
case rpc::NodeDeathInfo::AUTOSCALER_DRAIN_PREEMPTED:
oss << "\tthe actor's node was preempted: ";
break;
default:
// Control should not reach here, but in case it happens in unexpected scenarios,
// log it and provide a generic message to the user.
RAY_LOG(ERROR) << "Actor death is not expected to be caused by "
<< rpc::NodeDeathInfo_Reason_Name(node_death_info->reason());
oss << "\tthe actor's node was terminated: ";
}
oss << node_death_info->reason_message();
actor_died_error_ctx->set_error_message(oss.str());

return death_cause;
}

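The switch above produces one of three tab-indented suffixes after the node-id line. A hedged helper for coarse classification of the resulting error text; only the literal message fragments come from this commit, the helper itself is illustrative:

```python
# Message fragments emitted by GcsActorManager::GenNodeDiedCause above.
EXPECTED = "the actor's node was terminated expectedly: "
UNEXPECTED = "the actor's node was terminated unexpectedly: "
PREEMPTED = "the actor's node was preempted: "

def classify_node_death(error_text: str) -> str:
    """Map an ActorDiedError message to a coarse node-death category."""
    if PREEMPTED in error_text:
        return "preempted"
    if UNEXPECTED in error_text:
        return "unexpected"
    if EXPECTED in error_text:
        return "expected"
    return "unknown"
```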
18 changes: 16 additions & 2 deletions src/ray/protobuf/common.proto
@@ -308,6 +308,20 @@ message RayException {
string formatted_exception_string = 3;
}

message NodeDeathInfo {
// TODO(sang): Update drain reason
enum Reason {
UNSPECIFIED = 0;
EXPECTED_TERMINATION = 1;
UNEXPECTED_TERMINATION = 2;
AUTOSCALER_DRAIN_PREEMPTED = 3;
AUTOSCALER_DRAIN_IDLE = 4;
}
Reason reason = 1;
// A message describing the reason for the node death.
string reason_message = 2;
}

message ActorDeathCause {
oneof context {
// Indicates that this actor is marked as DEAD due to actor creation task failure.
@@ -350,8 +364,8 @@ message ActorDiedErrorContext {
// Whether the actor had never started running before it died, i.e. it was cancelled
// before scheduling had completed.
bool never_started = 10;
// Whether the actor was on a preempted node.
bool preempted = 11;
// The node death info, if node death is the cause of actor death.
optional NodeDeathInfo node_death_info = 11;
}

// Context for task OOM.
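Since `NodeDeathInfo` now lives in common.proto, Python callers import it from `common_pb2`. A small sketch of decoding the death reason that `ray.nodes()` reports for dead nodes, assuming the `DeathReason`/`DeathReasonMessage` fields used in the tests above:

```python
import ray
from ray.core.generated import common_pb2

ray.init()
for node in ray.nodes():
    if not node["Alive"]:
        # DeathReason holds the integer value of common_pb2.NodeDeathInfo.Reason.
        reason = common_pb2.NodeDeathInfo.Reason.Name(node["DeathReason"])
        print(node["NodeID"], reason, node["DeathReasonMessage"])
```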
14 changes: 0 additions & 14 deletions src/ray/protobuf/gcs.proto
@@ -318,20 +318,6 @@ message NodeSnapshot {
repeated string node_activity = 3;
}

message NodeDeathInfo {
// TODO(sang): Update drain reason
enum Reason {
UNSPECIFIED = 0;
EXPECTED_TERMINATION = 1;
UNEXPECTED_TERMINATION = 2;
AUTOSCALER_DRAIN_PREEMPTED = 3;
AUTOSCALER_DRAIN_IDLE = 4;
}
Reason reason = 1;
// A message describing the reason for the node death.
string reason_message = 2;
}

message GcsNodeInfo {
// State of a node.
enum GcsNodeState {
