From 2f28e2717463a423b952dd36d030317ea827aaf5 Mon Sep 17 00:00:00 2001 From: dayshah Date: Tue, 17 Dec 2024 14:52:28 -0800 Subject: [PATCH 1/3] Rework memory store signal checking in CPP instead of cython Signed-off-by: dayshah --- .../experimental/test_accelerated_dag.py | 6 ---- src/ray/core_worker/core_worker.cc | 1 - .../memory_store/memory_store.cc | 34 ++++++++++++------- .../memory_store/memory_store.h | 4 --- src/ray/core_worker/test/memory_store_test.cc | 9 ++--- 5 files changed, 24 insertions(+), 30 deletions(-) diff --git a/python/ray/dag/tests/experimental/test_accelerated_dag.py b/python/ray/dag/tests/experimental/test_accelerated_dag.py index e24241ff7b8f..5b3e5896ec51 100644 --- a/python/ray/dag/tests/experimental/test_accelerated_dag.py +++ b/python/ray/dag/tests/experimental/test_accelerated_dag.py @@ -1110,12 +1110,6 @@ def test_dag_exception_chained(ray_start_regular, capsys): # Can use the DAG after exceptions are thrown. assert ray.get(compiled_dag.execute(1)) == 2 - # Note: somehow the auto triggered teardown() from ray.shutdown() - # does not finish in time for this test, leading to a segfault - # of the following test (likely due to a dangling monitor thread - # upon the new Ray init). - compiled_dag.teardown() - @pytest.mark.parametrize("single_fetch", [True, False]) def test_dag_exception_multi_output(ray_start_regular, single_fetch, capsys): diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 881f25462dda..bd89107b6bce 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -532,7 +532,6 @@ CoreWorker::CoreWorker(CoreWorkerOptions options, const WorkerID &worker_id) io_service_, reference_counter_.get(), local_raylet_client_, - options_.check_signals, [this](const RayObject &obj) { rpc::ErrorType error_type; if (obj.IsException(&error_type) && diff --git a/src/ray/core_worker/store_provider/memory_store/memory_store.cc b/src/ray/core_worker/store_provider/memory_store/memory_store.cc index 8669212f4ddc..43f83294d1e3 100644 --- a/src/ray/core_worker/store_provider/memory_store/memory_store.cc +++ b/src/ray/core_worker/store_provider/memory_store/memory_store.cc @@ -13,6 +13,7 @@ // limitations under the License. #include +#include #include #include "ray/common/ray_config.h" @@ -31,6 +32,16 @@ const int64_t kUnhandledErrorGracePeriodNanos = static_cast(5e9); // when there are too many local objects. const int kMaxUnhandledErrorScanItems = 1000; +namespace { + +Status signal_status = Status::OK(); + +void SignalHandler(int sigint) { + signal_status = Status::Interrupted("Interrupted by signal: " + std::to_string(sigint)); +} + +} // namespace + /// A class that represents a `Get` request. class GetRequest { public: @@ -153,14 +164,12 @@ CoreWorkerMemoryStore::CoreWorkerMemoryStore( instrumented_io_context &io_context, ReferenceCounter *counter, std::shared_ptr raylet_client, - std::function check_signals, std::function unhandled_exception_handler, std::function( const ray::RayObject &object, const ObjectID &object_id)> object_allocator) : io_context_(io_context), ref_counter_(counter), raylet_client_(std::move(raylet_client)), - check_signals_(std::move(check_signals)), unhandled_exception_handler_(std::move(unhandled_exception_handler)), object_allocator_(std::move(object_allocator)) {} @@ -366,7 +375,6 @@ Status CoreWorkerMemoryStore::GetImpl(const std::vector &object_ids, bool done = false; bool timed_out = false; - Status signal_status = Status::OK(); int64_t remaining_timeout = timeout_ms; int64_t iteration_timeout = std::min(timeout_ms, RayConfig::instance().get_timeout_milliseconds()); @@ -379,16 +387,16 @@ Status CoreWorkerMemoryStore::GetImpl(const std::vector &object_ids, // calls. If timeout_ms == -1, this should run forever until all objects are // ready or a signal is received. Else it should run repeatedly until that timeout // is reached. - while (!timed_out && signal_status.ok() && - !(done = get_request->Wait(iteration_timeout))) { - if (check_signals_) { - signal_status = check_signals_(); - } - - if (remaining_timeout >= 0) { - remaining_timeout -= iteration_timeout; - iteration_timeout = std::min(remaining_timeout, iteration_timeout); - timed_out = remaining_timeout <= 0; + { + std::signal(SIGINT, SignalHandler); + std::signal(SIGTERM, SignalHandler); + while (!timed_out && signal_status.ok() && + !(done = get_request->Wait(iteration_timeout))) { + if (remaining_timeout >= 0) { + remaining_timeout -= iteration_timeout; + iteration_timeout = std::min(remaining_timeout, iteration_timeout); + timed_out = remaining_timeout <= 0; + } } } diff --git a/src/ray/core_worker/store_provider/memory_store/memory_store.h b/src/ray/core_worker/store_provider/memory_store/memory_store.h index 64a82081ff7c..f779dc281923 100644 --- a/src/ray/core_worker/store_provider/memory_store/memory_store.h +++ b/src/ray/core_worker/store_provider/memory_store/memory_store.h @@ -51,7 +51,6 @@ class CoreWorkerMemoryStore { instrumented_io_context &io_context, ReferenceCounter *counter = nullptr, std::shared_ptr raylet_client = nullptr, - std::function check_signals = nullptr, std::function unhandled_exception_handler = nullptr, std::function(const RayObject &object, const ObjectID &object_id)> @@ -221,9 +220,6 @@ class CoreWorkerMemoryStore { std::vector)>>> object_async_get_requests_ ABSL_GUARDED_BY(mu_); - /// Function passed in to be called to check for signals (e.g., Ctrl-C). - std::function check_signals_; - /// Function called to report unhandled exceptions. std::function unhandled_exception_handler_; diff --git a/src/ray/core_worker/test/memory_store_test.cc b/src/ray/core_worker/test/memory_store_test.cc index b0beef6ecb97..ea98e34d124f 100644 --- a/src/ray/core_worker/test/memory_store_test.cc +++ b/src/ray/core_worker/test/memory_store_test.cc @@ -44,11 +44,9 @@ TEST(TestMemoryStore, TestReportUnhandledErrors) { std::shared_ptr provider = std::make_shared( - io_context.GetIoService(), - nullptr, - nullptr, - nullptr, - [&](const RayObject &obj) { unhandled_count++; }); + io_context.GetIoService(), nullptr, nullptr, [&](const RayObject &obj) { + unhandled_count++; + }); RayObject obj1(rpc::ErrorType::TASK_EXECUTION_EXCEPTION); RayObject obj2(rpc::ErrorType::TASK_EXECUTION_EXCEPTION); auto id1 = ObjectID::FromRandom(); @@ -203,7 +201,6 @@ TEST(TestMemoryStore, TestObjectAllocator) { nullptr, nullptr, nullptr, - nullptr, std::move(my_object_allocator)); const int32_t max_rounds = 1000; const std::string hello = "hello"; From 52d53f48c6c0e9b8b100a59787c785b4b48e223a Mon Sep 17 00:00:00 2001 From: dayshah Date: Tue, 17 Dec 2024 17:15:31 -0800 Subject: [PATCH 2/3] remove sigterm handler Signed-off-by: dayshah --- src/ray/core_worker/store_provider/memory_store/memory_store.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/src/ray/core_worker/store_provider/memory_store/memory_store.cc b/src/ray/core_worker/store_provider/memory_store/memory_store.cc index 43f83294d1e3..0385f7c276cc 100644 --- a/src/ray/core_worker/store_provider/memory_store/memory_store.cc +++ b/src/ray/core_worker/store_provider/memory_store/memory_store.cc @@ -389,7 +389,6 @@ Status CoreWorkerMemoryStore::GetImpl(const std::vector &object_ids, // is reached. { std::signal(SIGINT, SignalHandler); - std::signal(SIGTERM, SignalHandler); while (!timed_out && signal_status.ok() && !(done = get_request->Wait(iteration_timeout))) { if (remaining_timeout >= 0) { From ebb1ae89dd1714ca686d5c041e047263b7e7e1ee Mon Sep 17 00:00:00 2001 From: dayshah Date: Fri, 20 Dec 2024 17:54:27 -0800 Subject: [PATCH 3/3] add back sigterm Signed-off-by: dayshah --- .../store_provider/memory_store/memory_store.cc | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/ray/core_worker/store_provider/memory_store/memory_store.cc b/src/ray/core_worker/store_provider/memory_store/memory_store.cc index 0385f7c276cc..2e3edef72d53 100644 --- a/src/ray/core_worker/store_provider/memory_store/memory_store.cc +++ b/src/ray/core_worker/store_provider/memory_store/memory_store.cc @@ -34,11 +34,9 @@ const int kMaxUnhandledErrorScanItems = 1000; namespace { -Status signal_status = Status::OK(); +std::atomic signal_received = -1; -void SignalHandler(int sigint) { - signal_status = Status::Interrupted("Interrupted by signal: " + std::to_string(sigint)); -} +void SignalHandler(int signal) { signal_received = signal; } } // namespace @@ -389,7 +387,8 @@ Status CoreWorkerMemoryStore::GetImpl(const std::vector &object_ids, // is reached. { std::signal(SIGINT, SignalHandler); - while (!timed_out && signal_status.ok() && + std::signal(SIGTERM, SignalHandler); + while (!timed_out && signal_received == -1 && !(done = get_request->Wait(iteration_timeout))) { if (remaining_timeout >= 0) { remaining_timeout -= iteration_timeout; @@ -431,8 +430,9 @@ Status CoreWorkerMemoryStore::GetImpl(const std::vector &object_ids, } } - if (!signal_status.ok()) { - return signal_status; + if (signal_received != -1) { + return Status::Interrupted("Interrupted by signal: " + + std::to_string(signal_received)); } else if (done) { return Status::OK(); } else {