diff --git a/python/ray/dag/tests/experimental/test_accelerated_dag.py b/python/ray/dag/tests/experimental/test_accelerated_dag.py index 57d45ca5dae82..53ca89a43ad56 100644 --- a/python/ray/dag/tests/experimental/test_accelerated_dag.py +++ b/python/ray/dag/tests/experimental/test_accelerated_dag.py @@ -1110,12 +1110,6 @@ def test_dag_exception_chained(ray_start_regular, capsys): # Can use the DAG after exceptions are thrown. assert ray.get(compiled_dag.execute(1)) == 2 - # Note: somehow the auto triggered teardown() from ray.shutdown() - # does not finish in time for this test, leading to a segfault - # of the following test (likely due to a dangling monitor thread - # upon the new Ray init). - compiled_dag.teardown() - @pytest.mark.parametrize("single_fetch", [True, False]) def test_dag_exception_multi_output(ray_start_regular, single_fetch, capsys): diff --git a/src/ray/core_worker/store_provider/memory_store/memory_store.cc b/src/ray/core_worker/store_provider/memory_store/memory_store.cc index 1a80a39fbfdae..b52beab992b8b 100644 --- a/src/ray/core_worker/store_provider/memory_store/memory_store.cc +++ b/src/ray/core_worker/store_provider/memory_store/memory_store.cc @@ -13,6 +13,7 @@ // limitations under the License. #include +#include #include #include "ray/common/ray_config.h" @@ -31,6 +32,14 @@ constexpr int64_t kUnhandledErrorGracePeriodNanos = static_cast(5e9); // when there are too many local objects. constexpr int kMaxUnhandledErrorScanItems = 1000; +namespace { + +std::atomic signal_received = -1; + +void SignalHandler(int signal) { signal_received = signal; } + +} // namespace + /// A class that represents a `Get` request. class GetRequest { public: @@ -345,29 +354,45 @@ Status CoreWorkerMemoryStore::GetImpl(const std::vector &object_ids, bool done = false; bool timed_out = false; - Status signal_status = Status::OK(); int64_t remaining_timeout = timeout_ms; int64_t iteration_timeout = timeout_ms == -1 ? RayConfig::instance().get_timeout_milliseconds() : std::min(timeout_ms, RayConfig::instance().get_timeout_milliseconds()); + // We need to register a custom signal handler because check_signals cannot check + // signals if they're not on the main python thread, so we use both to check for SIGINT + // and SIGTERM and for python SystemExit. +#if defined(__APPLE__) || defined(__linux__) + struct sigaction new_action {}; + struct sigaction old_sigint {}; + struct sigaction old_sigterm {}; + sigemptyset(&new_action.sa_mask); + new_action.sa_flags = 0; + new_action.sa_handler = SignalHandler; + sigaction(SIGINT, &new_action, &old_sigint); + sigaction(SIGTERM, &new_action, &old_sigterm); +#endif + auto signal_status = Status::OK(); // Repeatedly call Wait() on a shorter timeout so we can check for signals between // calls. If timeout_ms == -1, this should run forever until all objects are // ready or a signal is received. Else it should run repeatedly until that timeout // is reached. - while (!timed_out && signal_status.ok() && - !(done = get_request->Wait(iteration_timeout))) { - if (check_signals_) { + while (!timed_out && !(done = get_request->Wait(iteration_timeout)) && + (signal_status.ok() && signal_received == -1)) { + if (check_signals_ != nullptr) { signal_status = check_signals_(); } - if (remaining_timeout >= 0) { remaining_timeout -= iteration_timeout; iteration_timeout = std::min(remaining_timeout, iteration_timeout); timed_out = remaining_timeout <= 0; } } +#if defined(__APPLE__) || defined(__linux__) + sigaction(SIGINT, &old_sigint, nullptr); + sigaction(SIGTERM, &old_sigterm, nullptr); +#endif if (should_notify_raylet) { RAY_CHECK_OK(raylet_client_->NotifyDirectCallTaskUnblocked()); @@ -401,6 +426,9 @@ Status CoreWorkerMemoryStore::GetImpl(const std::vector &object_ids, if (!signal_status.ok()) { return signal_status; + } else if (signal_received != -1) { + return Status::Interrupted("Interrupted by signal: " + + std::to_string(signal_received)); } else if (done) { return Status::OK(); } else {