-
Notifications
You must be signed in to change notification settings - Fork 6.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[core] Rework memory store signal checking in C++ instead of cython #49319
Changes from 1 commit
2f28e27
52d53f4
ebb1ae8
89e4afc
ef4782f
fbe7ccb
b27239d
09d2879
8fc08bf
0e1c6c6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,6 +13,7 @@ | |
// limitations under the License. | ||
|
||
#include <condition_variable> | ||
#include <csignal> | ||
#include <utility> | ||
|
||
#include "ray/common/ray_config.h" | ||
|
@@ -31,6 +32,16 @@ const int64_t kUnhandledErrorGracePeriodNanos = static_cast<int64_t>(5e9); | |
// when there are too many local objects. | ||
const int kMaxUnhandledErrorScanItems = 1000; | ||
|
||
namespace { | ||
|
||
Status signal_status = Status::OK(); | ||
|
||
void SignalHandler(int sigint) { | ||
signal_status = Status::Interrupted("Interrupted by signal: " + std::to_string(sigint)); | ||
} | ||
|
||
} // namespace | ||
|
||
/// A class that represents a `Get` request. | ||
class GetRequest { | ||
public: | ||
|
@@ -153,14 +164,12 @@ CoreWorkerMemoryStore::CoreWorkerMemoryStore( | |
instrumented_io_context &io_context, | ||
ReferenceCounter *counter, | ||
std::shared_ptr<raylet::RayletClient> raylet_client, | ||
std::function<Status()> check_signals, | ||
std::function<void(const RayObject &)> unhandled_exception_handler, | ||
std::function<std::shared_ptr<ray::RayObject>( | ||
const ray::RayObject &object, const ObjectID &object_id)> object_allocator) | ||
: io_context_(io_context), | ||
ref_counter_(counter), | ||
raylet_client_(std::move(raylet_client)), | ||
check_signals_(std::move(check_signals)), | ||
unhandled_exception_handler_(std::move(unhandled_exception_handler)), | ||
object_allocator_(std::move(object_allocator)) {} | ||
|
||
|
@@ -366,7 +375,6 @@ Status CoreWorkerMemoryStore::GetImpl(const std::vector<ObjectID> &object_ids, | |
|
||
bool done = false; | ||
bool timed_out = false; | ||
Status signal_status = Status::OK(); | ||
int64_t remaining_timeout = timeout_ms; | ||
int64_t iteration_timeout = | ||
std::min(timeout_ms, RayConfig::instance().get_timeout_milliseconds()); | ||
|
@@ -379,16 +387,16 @@ Status CoreWorkerMemoryStore::GetImpl(const std::vector<ObjectID> &object_ids, | |
// calls. If timeout_ms == -1, this should run forever until all objects are | ||
// ready or a signal is received. Else it should run repeatedly until that timeout | ||
// is reached. | ||
while (!timed_out && signal_status.ok() && | ||
!(done = get_request->Wait(iteration_timeout))) { | ||
if (check_signals_) { | ||
signal_status = check_signals_(); | ||
} | ||
|
||
if (remaining_timeout >= 0) { | ||
remaining_timeout -= iteration_timeout; | ||
iteration_timeout = std::min(remaining_timeout, iteration_timeout); | ||
timed_out = remaining_timeout <= 0; | ||
{ | ||
std::signal(SIGINT, SignalHandler); | ||
std::signal(SIGTERM, SignalHandler); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are you sure you want to overwrite default SIGTERM handler? Which means exception thrown will not have any effect There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Trying to replicate the behavior of the check_signals mentioned in the pr description, I think sigterm maps to the SystemExit, will double check There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hmm possibly not, removed sigterm and only keeping sigint for now, we really just need to check for ctrl+c looking at the previous pr that introduced check_signals |
||
while (!timed_out && signal_status.ok() && | ||
!(done = get_request->Wait(iteration_timeout))) { | ||
if (remaining_timeout >= 0) { | ||
remaining_timeout -= iteration_timeout; | ||
iteration_timeout = std::min(remaining_timeout, iteration_timeout); | ||
timed_out = remaining_timeout <= 0; | ||
} | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this has the possibility of being called from multiple threads right, should we make this safe by making it thread_local or guarded by a mutex?