From 439d3071c9849d785e18d7e095470d86ee1e7257 Mon Sep 17 00:00:00 2001 From: dentiny Date: Sat, 18 Jan 2025 05:54:35 +0000 Subject: [PATCH 1/5] cleanup utils folder bazel dependency Signed-off-by: dentiny --- BUILD.bazel | 2 + src/ray/common/asio/asio_util.h | 1 + src/ray/common/id.h | 1 + src/ray/core_worker/task_manager.cc | 2 +- src/ray/gcs/redis_context.h | 1 + src/ray/object_manager/plasma/store_runner.cc | 3 +- src/ray/raylet/agent_manager.cc | 1 + src/ray/rpc/client_call.h | 1 + src/ray/rpc/grpc_server.cc | 1 + src/ray/util/BUILD | 223 ++++++++++++++++-- src/ray/util/event.cc | 3 + src/ray/util/event.h | 1 - src/ray/util/exponential_backoff.cc | 6 +- src/ray/util/exponential_backoff.h | 45 +++- src/ray/util/filesystem.h | 4 +- src/ray/util/logging.cc | 2 +- src/ray/util/memory.h | 2 +- src/ray/util/ordered_set.h | 82 ------- src/ray/util/pipe_logger.cc | 1 + src/ray/util/process.cc | 1 - src/ray/util/random.h | 30 +++ src/ray/util/string_utils.h | 33 +++ src/ray/util/tests/event_test.cc | 7 +- .../util/tests/exponential_backoff_test.cc | 41 +++- src/ray/util/tests/util_test.cc | 17 -- src/ray/util/thread_utils.h | 111 +++++++++ src/ray/util/timestamp_utils.h | 27 +++ src/ray/util/util.h | 141 ----------- 28 files changed, 493 insertions(+), 297 deletions(-) delete mode 100644 src/ray/util/ordered_set.h create mode 100644 src/ray/util/random.h create mode 100644 src/ray/util/string_utils.h create mode 100644 src/ray/util/thread_utils.h create mode 100644 src/ray/util/timestamp_utils.h diff --git a/BUILD.bazel b/BUILD.bazel index 3cc60e7c04c8d..d3944f4c69f47 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -589,6 +589,7 @@ ray_cc_library( ], deps = [ "//src/ray/util", + "//src/ray/util:size_literals", "@com_github_jupp0r_prometheus_cpp//pull", "@com_google_absl//absl/base:core_headers", "@com_google_absl//absl/container:flat_hash_map", @@ -786,6 +787,7 @@ ray_cc_library( ":worker_rpc", "//src/ray/protobuf:worker_cc_proto", "//src/ray/util", + 
"//src/ray/util:shared_lru", "@boost//:circular_buffer", "@boost//:fiber", "@com_google_absl//absl/cleanup", diff --git a/src/ray/common/asio/asio_util.h b/src/ray/common/asio/asio_util.h index a22110502d242..2614bf09f779e 100644 --- a/src/ray/common/asio/asio_util.h +++ b/src/ray/common/asio/asio_util.h @@ -23,6 +23,7 @@ #include "ray/common/asio/instrumented_io_context.h" #include "ray/util/array.h" +#include "ray/util/thread_utils.h" #include "ray/util/util.h" template diff --git a/src/ray/common/id.h b/src/ray/common/id.h index ef349a75f607e..35bb5affdbcf8 100644 --- a/src/ray/common/id.h +++ b/src/ray/common/id.h @@ -26,6 +26,7 @@ #include "ray/common/constants.h" #include "ray/util/logging.h" +#include "ray/util/random.h" #include "ray/util/util.h" #include "ray/util/visibility.h" diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index 79c559a15415f..2a4ca4c1967f9 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -1015,7 +1015,7 @@ bool TaskManager::RetryTaskIfPossible(const TaskID &task_id, // TODO(clarng): clean up and remove task_retry_delay_ms that is relied // on by some tests. int32_t delay_ms = task_failed_due_to_oom - ? ExponentialBackoff::GetBackoffMs( + ? 
ExponentialBackOff::GetBackoffMs( spec.AttemptNumber(), RayConfig::instance().task_oom_retry_delay_base_ms()) : RayConfig::instance().task_retry_delay_ms(); diff --git a/src/ray/gcs/redis_context.h b/src/ray/gcs/redis_context.h index 5fa105790e38b..0f2060882702a 100644 --- a/src/ray/gcs/redis_context.h +++ b/src/ray/gcs/redis_context.h @@ -22,6 +22,7 @@ #include "ray/common/asio/instrumented_io_context.h" #include "ray/common/status.h" #include "ray/gcs/redis_async_context.h" +#include "ray/util/exponential_backoff.h" #include "ray/util/logging.h" #include "ray/util/util.h" #include "src/ray/protobuf/gcs.pb.h" diff --git a/src/ray/object_manager/plasma/store_runner.cc b/src/ray/object_manager/plasma/store_runner.cc index 44570d0daae41..432c4242da0f8 100644 --- a/src/ray/object_manager/plasma/store_runner.cc +++ b/src/ray/object_manager/plasma/store_runner.cc @@ -9,11 +9,12 @@ #include "ray/common/ray_config.h" #include "ray/object_manager/plasma/plasma_allocator.h" +#include "ray/util/thread_utils.h" namespace plasma { namespace internal { void SetMallocGranularity(int value); -} +} // namespace internal PlasmaStoreRunner::PlasmaStoreRunner(std::string socket_name, int64_t system_memory, diff --git a/src/ray/raylet/agent_manager.cc b/src/ray/raylet/agent_manager.cc index 49f40ddcead40..e90d1e85aeec4 100644 --- a/src/ray/raylet/agent_manager.cc +++ b/src/ray/raylet/agent_manager.cc @@ -21,6 +21,7 @@ #include "ray/util/event_label.h" #include "ray/util/logging.h" #include "ray/util/process.h" +#include "ray/util/thread_utils.h" #include "ray/util/util.h" namespace ray { diff --git a/src/ray/rpc/client_call.h b/src/ray/rpc/client_call.h index 948f36c950e44..4af71ec8ef465 100644 --- a/src/ray/rpc/client_call.h +++ b/src/ray/rpc/client_call.h @@ -26,6 +26,7 @@ #include "ray/common/grpc_util.h" #include "ray/common/id.h" #include "ray/common/status.h" +#include "ray/util/thread_utils.h" #include "ray/util/util.h" namespace ray { diff --git a/src/ray/rpc/grpc_server.cc 
b/src/ray/rpc/grpc_server.cc index bed7ec2467aba..abc932064302d 100644 --- a/src/ray/rpc/grpc_server.cc +++ b/src/ray/rpc/grpc_server.cc @@ -23,6 +23,7 @@ #include "ray/common/ray_config.h" #include "ray/rpc/common.h" #include "ray/stats/metric.h" +#include "ray/util/thread_utils.h" #include "ray/util/util.h" namespace ray { diff --git a/src/ray/util/BUILD b/src/ray/util/BUILD index 7fbba0dd89edb..6e230c190e285 100644 --- a/src/ray/util/BUILD +++ b/src/ray/util/BUILD @@ -1,44 +1,213 @@ load("//bazel:ray.bzl", "ray_cc_library") ray_cc_library( - name = "util", - srcs = glob( - [ - "*.cc", - ], - exclude = [ - "*_test.cc", - ], - ), - hdrs = glob([ - "*.h", - ]), - linkopts = select({ - "@platforms//os:windows": [], - "//conditions:default": ["-lpthread"], - }), + name = "visibility", + hdrs = ["visibility.h"], +) + +ray_cc_library( + name = "macros", + hdrs = ["macros.h"], +) + +ray_cc_library( + name = "event_label", + hdrs = ["event_label.h"], +) + +ray_cc_library( + name = "array", + hdrs = ["array.h"], +) + +ray_cc_library( + name = "thread_utils", + hdrs = ["thread_utils.h"], deps = [ ":thread_checker", - "//:aligned_alloc", - "//:sha256", + ], +) + +ray_cc_library( + name = "exponential_backoff", + hdrs = ["exponential_backoff.h"], + srcs = ["exponential_backoff.cc"], + deps = [ + ":logging", + ], +) + +# TODO(hjiang): filesystem and logging has interdependency, we should split them into three targets: filesystem, logging, ray_check_macros. 
+ray_cc_library( + name = "logging", + hdrs = [ + "filesystem.h", + "logging.h", + ], + srcs = [ + "filesystem.cc", + "logging.cc", + ], + deps = [ + ":event_label", + ":macros", + ":thread_utils", + "@com_github_spdlog//:spdlog", + "@com_google_absl//absl/debugging:failure_signal_handler", + "@com_google_absl//absl/strings:str_format", + "@com_google_googletest//:gtest_main", + "@nlohmann_json", + ], +) + +ray_cc_library( + name = "container_util", + hdrs = ["container_util.h"], + deps = [ + ":logging", + "@com_google_absl//absl/container:flat_hash_map", + "@com_google_absl//absl/container:flat_hash_set", + ], +) + +ray_cc_library( + name = "process", + hdrs = [ + "process.h", + "subreaper.h", + ], + srcs = [ + "process.cc", + "subreaper.cc", + ], + deps = [ + ":logging", + ":macros", + "@boost//:asio", + "@com_google_absl//absl/container:flat_hash_set", + "@com_google_absl//absl/synchronization", + ], +) + +ray_cc_library( + name = "function_traits", + hdrs = ["function_traits.h"], + deps = [ + "@boost//:functional", + ], +) + +ray_cc_library( + name = "counter_map", + hdrs = ["counter_map.h"], + deps = [ + ":logging", + ], +) + +ray_cc_library( + name = "event", + hdrs = ["event.h"], + srcs = ["event.cc"], + deps = [ + ":logging", + ":random", + ":string_utils", + ":timestamp_utils", "//src/ray/protobuf:event_cc_proto", "//src/ray/protobuf:export_event_cc_proto", "@boost//:asio", "@com_github_spdlog//:spdlog", "@com_google_absl//absl/container:flat_hash_map", - "@com_google_absl//absl/container:flat_hash_set", - "@com_google_absl//absl/debugging:failure_signal_handler", - "@com_google_absl//absl/debugging:stacktrace", - "@com_google_absl//absl/debugging:symbolize", - "@com_google_absl//absl/random", - "@com_google_absl//absl/synchronization", - "@com_google_absl//absl/time", - "@com_google_googletest//:gtest_main", + "@com_google_googletest//:gtest_prod", "@com_google_protobuf//:protobuf", "@nlohmann_json", ], ) +ray_cc_library( + name = "timestamp_utils", + 
hdrs = ["timestamp_utils.h"], +) + +ray_cc_library( + name = "random", + hdrs = ["random.h"], + deps = [ + "@com_google_absl//absl/random", + ], +) + +ray_cc_library( + name = "string_utils", + hdrs = ["string_utils.h"], +) + +ray_cc_library( + name = "memory", + hdrs = ["memory.h"], + srcs = ["memory.cc"], +) + +ray_cc_library( + name = "type_traits", + hdrs = ["type_traits.h"], +) + +ray_cc_library( + name = "throttler", + hdrs = ["throttler.h"], + deps = [ + "@com_google_absl//absl/time", + ], +) + +ray_cc_library( + name = "sequencer", + hdrs = ["sequencer.h"], + deps = [ + "@com_google_absl//absl/container:flat_hash_map", + "@com_google_absl//absl/synchronization", + ], +) + +ray_cc_library( + name = "sample", + hdrs = ["sample.h"], +) + +# A giant 'util' target is split since PR (TODO: fill in PR number), here we keep the 'util' target for API compatibility. +# +# TODO(hjiang): We include a bunch of misc util function/class inside of the class, should split into multiple files and build targets. 
+ray_cc_library( + name = "util", + hdrs = ["util.h"], + srcs = ["util.cc"], + deps = [ + ":array", + ":container_util", + ":counter_map", + ":event", + ":event_label", + ":exponential_backoff", + ":function_traits", + ":logging", + ":macros", + ":memory", + ":process", + ":random", + ":sample", + ":sequencer", + ":string_utils", + ":timestamp_utils", + ":throttler", + ":thread_utils", + ":type_traits", + ":visibility", + "//:sha256", + ], +) + ray_cc_library( name = "size_literals", hdrs = ["size_literals.h"], @@ -82,6 +251,8 @@ ray_cc_library( srcs = ["pipe_logger.cc"], deps = [ ":compat", + ":stream_redirection_options", + ":thread_utils", ":util", "@com_github_spdlog//:spdlog", "@com_google_absl//absl/strings", diff --git a/src/ray/util/event.cc b/src/ray/util/event.cc index e18d4addae9a9..ee22e93cde71a 100644 --- a/src/ray/util/event.cc +++ b/src/ray/util/event.cc @@ -21,6 +21,9 @@ #include "absl/base/call_once.h" #include "absl/time/time.h" +#include "ray/util/random.h" +#include "ray/util/string_utils.h" +#include "ray/util/timestamp_utils.h" using json = nlohmann::json; diff --git a/src/ray/util/event.h b/src/ray/util/event.h index d6a1652c2597a..bcbabfdbf0e0c 100644 --- a/src/ray/util/event.h +++ b/src/ray/util/event.h @@ -29,7 +29,6 @@ #include "absl/container/flat_hash_map.h" #include "nlohmann/json.hpp" #include "ray/util/logging.h" -#include "ray/util/util.h" #include "spdlog/sinks/basic_file_sink.h" #include "spdlog/sinks/rotating_file_sink.h" #include "spdlog/spdlog.h" diff --git a/src/ray/util/exponential_backoff.cc b/src/ray/util/exponential_backoff.cc index 5f050e8d5a9dd..1685136088de2 100644 --- a/src/ray/util/exponential_backoff.cc +++ b/src/ray/util/exponential_backoff.cc @@ -14,13 +14,11 @@ #include "ray/util/exponential_backoff.h" -#include - -#include "ray/util/logging.h" +#include namespace ray { -uint64_t ExponentialBackoff::GetBackoffMs(uint64_t attempt, +uint64_t ExponentialBackOff::GetBackoffMs(uint64_t attempt, uint64_t base_ms, 
uint64_t max_backoff_ms) { uint64_t delay = static_cast(pow(2, attempt)); diff --git a/src/ray/util/exponential_backoff.h b/src/ray/util/exponential_backoff.h index f3465bb6cf90b..bf57eedcb4655 100644 --- a/src/ray/util/exponential_backoff.h +++ b/src/ray/util/exponential_backoff.h @@ -14,15 +14,33 @@ #pragma once -#include -#include +#include +#include + +#include "ray/util/logging.h" namespace ray { /// Provides the exponential backoff algorithm that is typically used /// for throttling. -class ExponentialBackoff { +class ExponentialBackOff { public: + /// Construct an exponential back off counter. + /// + /// \param[in] initial_value The start value for this counter + /// \param[in] multiplier The multiplier for this counter. + /// \param[in] max_value The maximum value for this counter. By default it's + /// infinite double. + ExponentialBackOff(uint64_t initial_value, + double multiplier, + uint64_t max_value = std::numeric_limits::max()) + : curr_value_(initial_value), + initial_value_(initial_value), + max_value_(max_value), + multiplier_(multiplier) { + RAY_CHECK(multiplier > 0.0) << "Multiplier must be greater than 0"; + } + /// Computes the backoff delay using the exponential backoff algorithm, /// using the formula /// @@ -33,11 +51,28 @@ class ExponentialBackoff { /// @return the delay in ms based on the formula static uint64_t GetBackoffMs(uint64_t attempt, uint64_t base_ms, - uint64_t max_backoff_ms = kMaxBackoffMs); + uint64_t max_backoff_ms = kDefaultMaxBackoffMs); + + uint64_t Next() { + auto ret = curr_value_; + curr_value_ = curr_value_ * multiplier_; + curr_value_ = std::min(curr_value_, max_value_); + return ret; + } + + uint64_t Current() { return curr_value_; } + + void Reset() { curr_value_ = initial_value_; } private: + private: + uint64_t curr_value_; + uint64_t initial_value_; + uint64_t max_value_; + double multiplier_; + // The default cap on the backoff delay. 
- static constexpr uint64_t kMaxBackoffMs = 1 * 60 * 1000; + static constexpr uint64_t kDefaultMaxBackoffMs = 1 * 60 * 1000; }; } // namespace ray diff --git a/src/ray/util/filesystem.h b/src/ray/util/filesystem.h index 5f15d055368ed..390c67f3f3eb7 100644 --- a/src/ray/util/filesystem.h +++ b/src/ray/util/filesystem.h @@ -25,7 +25,7 @@ namespace ray { /// \return The portable directory separator (slash on all OSes). -static inline char GetAltDirSep() { return '/'; } +inline char GetAltDirSep() { return '/'; } /// Equivalent to Python's os.path.basename() for file system paths. std::string GetFileName(const std::string &path); @@ -34,7 +34,7 @@ std::string GetFileName(const std::string &path); std::string GetUserTempDir(); /// \return Whether or not the given character is a directory separator on this platform. -static inline bool IsDirSep(char ch) { +inline bool IsDirSep(char ch) { bool result = ch == std::filesystem::path::preferred_separator; #ifdef _WIN32 result |= ch == GetAltDirSep(); diff --git a/src/ray/util/logging.cc b/src/ray/util/logging.cc index 3e40277751437..96835c5912a55 100644 --- a/src/ray/util/logging.cc +++ b/src/ray/util/logging.cc @@ -42,7 +42,7 @@ #include "nlohmann/json.hpp" #include "ray/util/event_label.h" #include "ray/util/filesystem.h" -#include "ray/util/util.h" +#include "ray/util/thread_utils.h" #include "spdlog/sinks/basic_file_sink.h" #include "spdlog/sinks/rotating_file_sink.h" #include "spdlog/sinks/stdout_color_sinks.h" diff --git a/src/ray/util/memory.h b/src/ray/util/memory.h index 9eebe5448ccfb..0eb1c8955cd0f 100644 --- a/src/ray/util/memory.h +++ b/src/ray/util/memory.h @@ -14,7 +14,7 @@ #pragma once -#include +#include namespace ray { diff --git a/src/ray/util/ordered_set.h b/src/ray/util/ordered_set.h deleted file mode 100644 index 3f4563d5c5d8f..0000000000000 --- a/src/ray/util/ordered_set.h +++ /dev/null @@ -1,82 +0,0 @@ -// Copyright 2017 The Ray Authors. 
-// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include - -#include "absl/container/flat_hash_map.h" -/// \class ordered_set -/// -/// This container has properties of both a deque and a set. It is like a deque -/// in the sense that it maintains the insertion order and allows you to -/// push_back elements and pop_front elements. It is like a set in the sense -/// that it does not allow duplicate entries. Looking up and erasing elements is -/// quick. -template -class ordered_set { - private: - using elements_type = std::list; - using positions_type = absl::flat_hash_map; - using iterator = typename elements_type::iterator; - using const_iterator = typename elements_type::const_iterator; - - public: - ordered_set() {} - - ordered_set(const ordered_set &other) = delete; - - ordered_set &operator=(const ordered_set &other) = delete; - - void push_back(const T &value) { - RAY_CHECK(positions_.find(value) == positions_.end()); - auto list_iterator = elements_.insert(elements_.end(), value); - positions_[value] = list_iterator; - } - - size_t count(const T &k) const { return positions_.count(k); } - - void pop_front() { - positions_.erase(elements_.front()); - elements_.pop_front(); - } - - const T &front() const { return elements_.front(); } - - size_t size() const noexcept { return positions_.size(); } - - size_t erase(const T &k) { - auto it = positions_.find(k); - RAY_CHECK(it != positions_.end()); - elements_.erase(it->second); - return 
positions_.erase(k); - } - - iterator erase(const iterator position) { - positions_.erase(*position); - return elements_.erase(position); - } - - iterator begin() noexcept { return elements_.begin(); } - - const_iterator begin() const noexcept { return elements_.begin(); } - - iterator end() noexcept { return elements_.end(); } - - const_iterator end() const noexcept { return elements_.end(); } - - private: - elements_type elements_; - positions_type positions_; -}; diff --git a/src/ray/util/pipe_logger.cc b/src/ray/util/pipe_logger.cc index 991d91fcac7f1..75a775108f64c 100644 --- a/src/ray/util/pipe_logger.cc +++ b/src/ray/util/pipe_logger.cc @@ -24,6 +24,7 @@ #include #include "absl/strings/str_split.h" +#include "ray/util/thread_utils.h" #include "spdlog/sinks/basic_file_sink.h" #include "spdlog/sinks/rotating_file_sink.h" #include "spdlog/sinks/stdout_color_sinks.h" diff --git a/src/ray/util/process.cc b/src/ray/util/process.cc index ddeac390671a4..fa3d9147306da 100644 --- a/src/ray/util/process.cc +++ b/src/ray/util/process.cc @@ -42,7 +42,6 @@ #include "ray/util/logging.h" #include "ray/util/macros.h" #include "ray/util/subreaper.h" -#include "ray/util/util.h" #ifdef __APPLE__ extern char **environ; diff --git a/src/ray/util/random.h b/src/ray/util/random.h new file mode 100644 index 0000000000000..8e0573b82e769 --- /dev/null +++ b/src/ray/util/random.h @@ -0,0 +1,30 @@ +// Copyright 2025 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include "absl/random/random.h" + +/// A helper function to fill random bytes into the `data`. +/// Warning: this is not fork-safe, we need to re-seed after that. +template +void FillRandom(T *data) { + RAY_CHECK(data != nullptr); + + thread_local absl::BitGen generator; + for (size_t i = 0; i < data->size(); i++) { + (*data)[i] = static_cast( + absl::Uniform(generator, 0, std::numeric_limits::max())); + } +} diff --git a/src/ray/util/string_utils.h b/src/ray/util/string_utils.h new file mode 100644 index 0000000000000..cffe833f6addb --- /dev/null +++ b/src/ray/util/string_utils.h @@ -0,0 +1,33 @@ +// Copyright 2025 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +namespace ray { + +// Transfer the string to the Hex format. 
It can be more readable than the ANSI mode +inline std::string StringToHex(const std::string &str) { + constexpr char hex[] = "0123456789abcdef"; + std::string result; + for (size_t i = 0; i < str.size(); i++) { + unsigned char val = str[i]; + result.push_back(hex[val >> 4]); + result.push_back(hex[val & 0xf]); + } + return result; +} + +} // namespace ray diff --git a/src/ray/util/tests/event_test.cc b/src/ray/util/tests/event_test.cc index 60d375d803606..f29b4ca809a53 100644 --- a/src/ray/util/tests/event_test.cc +++ b/src/ray/util/tests/event_test.cc @@ -14,6 +14,9 @@ #include "ray/util/event.h" +#include +#include + #include #include #include @@ -21,9 +24,9 @@ #include #include -#include "gmock/gmock.h" -#include "gtest/gtest.h" #include "ray/util/event_label.h" +#include "ray/util/random.h" +#include "ray/util/string_utils.h" #include "src/ray/protobuf/gcs.pb.h" using json = nlohmann::json; diff --git a/src/ray/util/tests/exponential_backoff_test.cc b/src/ray/util/tests/exponential_backoff_test.cc index 3853a113c1fb3..21568c010d120 100644 --- a/src/ray/util/tests/exponential_backoff_test.cc +++ b/src/ray/util/tests/exponential_backoff_test.cc @@ -20,26 +20,26 @@ namespace ray { -TEST(ExponentialBackoffTest, TestExponentialIncrease) { - ASSERT_EQ(ExponentialBackoff::GetBackoffMs(0, 157), 157 * 1); - ASSERT_EQ(ExponentialBackoff::GetBackoffMs(1, 157), 157 * 2); - ASSERT_EQ(ExponentialBackoff::GetBackoffMs(2, 157), 157 * 4); - ASSERT_EQ(ExponentialBackoff::GetBackoffMs(3, 157), 157 * 8); - - ASSERT_EQ(ExponentialBackoff::GetBackoffMs(10, 0), 0); - ASSERT_EQ(ExponentialBackoff::GetBackoffMs(11, 0), 0); +TEST(ExponentialBackOffTest, TestExponentialIncrease) { + ASSERT_EQ(ExponentialBackOff::GetBackoffMs(0, 157), 157 * 1); + ASSERT_EQ(ExponentialBackOff::GetBackoffMs(1, 157), 157 * 2); + ASSERT_EQ(ExponentialBackOff::GetBackoffMs(2, 157), 157 * 4); + ASSERT_EQ(ExponentialBackOff::GetBackoffMs(3, 157), 157 * 8); + + ASSERT_EQ(ExponentialBackOff::GetBackoffMs(10, 0), 
0); + ASSERT_EQ(ExponentialBackOff::GetBackoffMs(11, 0), 0); } -TEST(ExponentialBackoffTest, TestExceedMaxBackoffReturnsMaxBackoff) { - auto backoff = ExponentialBackoff::GetBackoffMs( +TEST(ExponentialBackOffTest, TestExceedMaxBackoffReturnsMaxBackoff) { + auto backoff = ExponentialBackOff::GetBackoffMs( /*attempt*/ 10, /*base_ms*/ 1, /*max_backoff_ms*/ 5); ASSERT_EQ(backoff, 5); } -TEST(ExponentialBackoffTest, TestOverflowReturnsMaxBackoff) { +TEST(ExponentialBackOffTest, TestOverflowReturnsMaxBackoff) { // 2 ^ 64+ will overflow. for (int i = 64; i < 10000; i++) { - auto backoff = ExponentialBackoff::GetBackoffMs( + auto backoff = ExponentialBackOff::GetBackoffMs( /*attempt*/ i, /*base_ms*/ 1, /*max_backoff_ms*/ 1234); @@ -47,6 +47,23 @@ TEST(ExponentialBackoffTest, TestOverflowReturnsMaxBackoff) { } } +TEST(ExponentialBackOffTest, GetNext) { + auto exp = ExponentialBackOff{1, 2, 9}; + ASSERT_EQ(1, exp.Next()); + ASSERT_EQ(2, exp.Next()); + ASSERT_EQ(4, exp.Next()); + ASSERT_EQ(8, exp.Next()); + ASSERT_EQ(9, exp.Next()); + ASSERT_EQ(9, exp.Next()); + exp.Reset(); + ASSERT_EQ(1, exp.Next()); + ASSERT_EQ(2, exp.Next()); + ASSERT_EQ(4, exp.Next()); + ASSERT_EQ(8, exp.Next()); + ASSERT_EQ(9, exp.Next()); + ASSERT_EQ(9, exp.Next()); +} + } // namespace ray int main(int argc, char **argv) { diff --git a/src/ray/util/tests/util_test.cc b/src/ray/util/tests/util_test.cc index 9be4bf110c95d..133da93a16168 100644 --- a/src/ray/util/tests/util_test.cc +++ b/src/ray/util/tests/util_test.cc @@ -109,23 +109,6 @@ TEST(UtilTest, ParseCommandLineTest) { ASSERT_EQ(ParseCommandLine(R"(x' a \b')", win32), ArgList({R"(x')", R"(a)", R"(\b')"})); } -TEST(UtilTest, ExponentialBackOffTest) { - auto exp = ExponentialBackOff(1, 2, 9); - ASSERT_EQ(1, exp.Next()); - ASSERT_EQ(2, exp.Next()); - ASSERT_EQ(4, exp.Next()); - ASSERT_EQ(8, exp.Next()); - ASSERT_EQ(9, exp.Next()); - ASSERT_EQ(9, exp.Next()); - exp.Reset(); - ASSERT_EQ(1, exp.Next()); - ASSERT_EQ(2, exp.Next()); - ASSERT_EQ(4, 
exp.Next()); - ASSERT_EQ(8, exp.Next()); - ASSERT_EQ(9, exp.Next()); - ASSERT_EQ(9, exp.Next()); -} - TEST(UtilTest, ParseURLTest) { const std::string url = "http://abc?num_objects=9&offset=8388878&size=8388878"; auto parsed_url = *ParseURL(url); diff --git a/src/ray/util/thread_utils.h b/src/ray/util/thread_utils.h new file mode 100644 index 0000000000000..dbb1d44b955a4 --- /dev/null +++ b/src/ray/util/thread_utils.h @@ -0,0 +1,111 @@ +// Copyright 2025 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#ifdef __APPLE__ +#include +#endif + +#ifdef __linux__ +#include +#endif + +#ifdef _WIN32 +#ifndef _WINDOWS_ +#ifndef WIN32_LEAN_AND_MEAN // Sorry for the inconvenience. Please include any related + // headers you need manually. + // (https://stackoverflow.com/a/8294669) +#define WIN32_LEAN_AND_MEAN // Prevent inclusion of WinSock2.h +#endif +#include // Force inclusion of WinGDI here to resolve name conflict +#endif +#endif + +#include "ray/util/thread_checker.h" + +// Returns the TID of the calling thread. 
+#ifdef __APPLE__ +inline uint64_t GetTid() { + uint64_t tid; + RAY_CHECK_EQ(pthread_threadid_np(NULL, &tid), 0); + return tid; +} +#elif defined(_WIN32) +inline DWORD GetTid() { return GetCurrentThreadId(); } +#else +inline pid_t GetTid() { return syscall(__NR_gettid); } +#endif + +inline std::string GetThreadName() { +#if defined(__linux__) || defined(__APPLE__) + char name[128]; + auto rc = pthread_getname_np(pthread_self(), name, sizeof(name)); + if (rc != 0) { + return "ERROR"; + } else { + return name; + } +#else + return "UNKNOWN"; +#endif +} + +// Set [thread_name] to current thread; if it fails, error will be logged. +// NOTICE: It only works for macos and linux. +inline void SetThreadName(const std::string &thread_name) { + int ret = 0; +#if defined(__APPLE__) + ret = pthread_setname_np(thread_name.c_str()); +#elif defined(__linux__) + ret = pthread_setname_np(pthread_self(), thread_name.substr(0, 15).c_str()); +#endif + if (ret < 0) { + RAY_LOG(ERROR) << "Fails to set thread name to " << thread_name << " since " + << strerror(errno); + } +} + +namespace ray { +template +class ThreadPrivate { + public: + template + explicit ThreadPrivate(Ts &&...ts) : t_(std::forward(ts)...) {} + + T &operator*() { + RAY_CHECK(thread_checker_.IsOnSameThread()); + return t_; + } + + T *operator->() { + RAY_CHECK(thread_checker_.IsOnSameThread()); + return &t_; + } + + const T &operator*() const { + RAY_CHECK(thread_checker_.IsOnSameThread()); + return t_; + } + + const T *operator->() const { + RAY_CHECK(thread_checker_.IsOnSameThread()); + return &t_; + } + + private: + T t_; + mutable ThreadChecker thread_checker_; +}; +} // namespace ray diff --git a/src/ray/util/timestamp_utils.h b/src/ray/util/timestamp_utils.h new file mode 100644 index 0000000000000..69d034cb9cebe --- /dev/null +++ b/src/ray/util/timestamp_utils.h @@ -0,0 +1,27 @@ +// Copyright 2025 The Ray Authors. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +namespace ray { + +inline int64_t current_sys_time_s() { + std::chrono::seconds s_since_epoch = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()); + return s_since_epoch.count(); +} + +} // namespace ray diff --git a/src/ray/util/util.h b/src/ray/util/util.h index c93a0a0414299..ad8a6811e4034 100644 --- a/src/ray/util/util.h +++ b/src/ray/util/util.h @@ -45,11 +45,9 @@ #include #include "absl/container/flat_hash_map.h" -#include "absl/random/random.h" #include "ray/util/logging.h" #include "ray/util/macros.h" #include "ray/util/process.h" -#include "ray/util/thread_checker.h" #ifdef _WIN32 #include // to ensure getpid() on Windows @@ -73,18 +71,6 @@ class stream_protocol; enum class CommandLineSyntax { System, POSIX, Windows }; -// Transfer the string to the Hex format. It can be more readable than the ANSI mode -inline std::string StringToHex(const std::string &str) { - constexpr char hex[] = "0123456789abcdef"; - std::string result; - for (size_t i = 0; i < str.size(); i++) { - unsigned char val = str[i]; - result.push_back(hex[val >> 4]); - result.push_back(hex[val & 0xf]); - } - return result; -} - // Append append_str to the begining of each line of str. 
inline std::string AppendToEachLine(const std::string &str, const std::string &append_str) { @@ -99,19 +85,6 @@ inline std::string AppendToEachLine(const std::string &str, return ss.str(); } -// Returns the TID of the calling thread. -#ifdef __APPLE__ -inline uint64_t GetTid() { - uint64_t tid; - RAY_CHECK_EQ(pthread_threadid_np(NULL, &tid), 0); - return tid; -} -#elif defined(_WIN32) -inline DWORD GetTid() { return GetCurrentThreadId(); } -#else -inline pid_t GetTid() { return syscall(__NR_gettid); } -#endif - inline int64_t current_sys_time_s() { std::chrono::seconds s_since_epoch = std::chrono::duration_cast( std::chrono::system_clock::now().time_since_epoch()); @@ -230,19 +203,6 @@ struct EnumClassHash { template using EnumUnorderedMap = absl::flat_hash_map; -/// A helper function to fill random bytes into the `data`. -/// Warning: this is not fork-safe, we need to re-seed after that. -template -void FillRandom(T *data) { - RAY_CHECK(data != nullptr); - - thread_local absl::BitGen generator; - for (size_t i = 0; i < data->size(); i++) { - (*data)[i] = static_cast( - absl::Uniform(generator, 0, std::numeric_limits::max())); - } -} - inline void setEnv(const std::string &name, const std::string &value) { #ifdef _WIN32 std::string env = name + "=" + value; @@ -264,108 +224,7 @@ inline void unsetEnv(const std::string &name) { RAY_CHECK_EQ(ret, 0) << "Failed to unset env var " << name; } -// Set [thread_name] to current thread; if it fails, error will be logged. -// NOTICE: It only works for macos and linux. 
-inline void SetThreadName(const std::string &thread_name) { - int ret = 0; -#if defined(__APPLE__) - ret = pthread_setname_np(thread_name.c_str()); -#elif defined(__linux__) - ret = pthread_setname_np(pthread_self(), thread_name.substr(0, 15).c_str()); -#endif - if (ret < 0) { - RAY_LOG(ERROR) << "Fails to set thread name to " << thread_name << " since " - << strerror(errno); - } -} - -inline std::string GetThreadName() { -#if defined(__linux__) || defined(__APPLE__) - char name[128]; - auto rc = pthread_getname_np(pthread_self(), name, sizeof(name)); - if (rc != 0) { - return "ERROR"; - } else { - return name; - } -#else - return "UNKNOWN"; -#endif -} - namespace ray { -template -class ThreadPrivate { - public: - template - explicit ThreadPrivate(Ts &&...ts) : t_(std::forward(ts)...) {} - - T &operator*() { - RAY_CHECK(thread_checker_.IsOnSameThread()); - return t_; - } - - T *operator->() { - RAY_CHECK(thread_checker_.IsOnSameThread()); - return &t_; - } - - const T &operator*() const { - RAY_CHECK(thread_checker_.IsOnSameThread()); - return t_; - } - - const T *operator->() const { - RAY_CHECK(thread_checker_.IsOnSameThread()); - return &t_; - } - - private: - T t_; - mutable ThreadChecker thread_checker_; -}; - -class ExponentialBackOff { - public: - ExponentialBackOff() = default; - ExponentialBackOff(const ExponentialBackOff &) = default; - ExponentialBackOff(ExponentialBackOff &&) = default; - ExponentialBackOff &operator=(const ExponentialBackOff &) = default; - ExponentialBackOff &operator=(ExponentialBackOff &&) = default; - - /// Construct an exponential back off counter. - /// - /// \param[in] initial_value The start value for this counter - /// \param[in] multiplier The multiplier for this counter. - /// \param[in] max_value The maximum value for this counter. By default it's - /// infinite double. 
- ExponentialBackOff(uint64_t initial_value, - double multiplier, - uint64_t max_value = std::numeric_limits::max()) - : curr_value_(initial_value), - initial_value_(initial_value), - max_value_(max_value), - multiplier_(multiplier) { - RAY_CHECK(multiplier > 0.0) << "Multiplier must be greater than 0"; - } - - uint64_t Next() { - auto ret = curr_value_; - curr_value_ = curr_value_ * multiplier_; - curr_value_ = std::min(curr_value_, max_value_); - return ret; - } - - uint64_t Current() { return curr_value_; } - - void Reset() { curr_value_ = initial_value_; } - - private: - uint64_t curr_value_; - uint64_t initial_value_; - uint64_t max_value_; - double multiplier_; -}; /// Return true if the raylet is failed. This util function is only meant to be used by /// core worker modules. From e794acddbd76cb4a4dcf1a1c5ff96b3075995a59 Mon Sep 17 00:00:00 2001 From: dentiny Date: Sat, 18 Jan 2025 20:43:22 +0000 Subject: [PATCH 2/5] fix build and revert static Signed-off-by: dentiny --- .../export_api/gcs_node_manager_export_event_test.cc | 9 ++++++--- src/ray/util/filesystem.h | 4 ++-- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/ray/gcs/gcs_server/test/export_api/gcs_node_manager_export_event_test.cc b/src/ray/gcs/gcs_server/test/export_api/gcs_node_manager_export_event_test.cc index b2ea5e25ad394..c906209e31ffd 100644 --- a/src/ray/gcs/gcs_server/test/export_api/gcs_node_manager_export_event_test.cc +++ b/src/ray/gcs/gcs_server/test/export_api/gcs_node_manager_export_event_test.cc @@ -12,18 +12,21 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+#include + #include #include #include -// clang-format off -#include "gtest/gtest.h" #include "ray/gcs/gcs_server/test/gcs_server_test_util.h" #include "ray/gcs/test/gcs_test_util.h" +#include "ray/util/event.h" +#include "ray/util/string_utils.h" + +// clang-format off #include "ray/rpc/node_manager/node_manager_client.h" #include "ray/rpc/node_manager/node_manager_client_pool.h" #include "mock/ray/pubsub/publisher.h" -#include "ray/util/event.h" // clang-format on using json = nlohmann::json; diff --git a/src/ray/util/filesystem.h b/src/ray/util/filesystem.h index 390c67f3f3eb7..5f15d055368ed 100644 --- a/src/ray/util/filesystem.h +++ b/src/ray/util/filesystem.h @@ -25,7 +25,7 @@ namespace ray { /// \return The portable directory separator (slash on all OSes). -inline char GetAltDirSep() { return '/'; } +static inline char GetAltDirSep() { return '/'; } /// Equivalent to Python's os.path.basename() for file system paths. std::string GetFileName(const std::string &path); @@ -34,7 +34,7 @@ std::string GetFileName(const std::string &path); std::string GetUserTempDir(); /// \return Whether or not the given character is a directory separator on this platform. -inline bool IsDirSep(char ch) { +static inline bool IsDirSep(char ch) { bool result = ch == std::filesystem::path::preferred_separator; #ifdef _WIN32 result |= ch == GetAltDirSep(); From 8ba3ffa4c814645d9820d515b7d69c547be240fe Mon Sep 17 00:00:00 2001 From: dentiny Date: Sun, 19 Jan 2025 02:10:13 +0000 Subject: [PATCH 3/5] add pr reference Signed-off-by: dentiny --- src/ray/util/BUILD | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/util/BUILD b/src/ray/util/BUILD index 6e230c190e285..9320729a95be3 100644 --- a/src/ray/util/BUILD +++ b/src/ray/util/BUILD @@ -176,7 +176,7 @@ ray_cc_library( hdrs = ["sample.h"], ) -# A giant 'util' target is split since PR (TODO: fill in PR number), here we keep the 'util' target for API compatibility. 
+# The giant 'util' target was split up in PR https://github.com/ray-project/ray/pull/49938; the 'util' target is kept here for API compatibility. # # TODO(hjiang): We include a bunch of misc util function/class inside of the class, should split into multiple files and build targets. ray_cc_library( From 8755a5f33ab5f47009679a67e40000b2e5fa8cd6 Mon Sep 17 00:00:00 2001 From: dentiny Date: Mon, 20 Jan 2025 03:28:05 +0000 Subject: [PATCH 4/5] backoff Signed-off-by: dentiny --- src/ray/core_worker/task_manager.cc | 2 +- src/ray/util/exponential_backoff.cc | 2 +- .../util/tests/exponential_backoff_test.cc | 22 +++++++++---------- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index 2a4ca4c1967f9..79c559a15415f 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -1015,7 +1015,7 @@ bool TaskManager::RetryTaskIfPossible(const TaskID &task_id, // TODO(clarng): clean up and remove task_retry_delay_ms that is relied // on by some tests. int32_t delay_ms = task_failed_due_to_oom - ? ExponentialBackOff::GetBackoffMs( + ? 
ExponentialBackoff::GetBackoffMs( spec.AttemptNumber(), RayConfig::instance().task_oom_retry_delay_base_ms()) : RayConfig::instance().task_retry_delay_ms(); diff --git a/src/ray/util/exponential_backoff.cc b/src/ray/util/exponential_backoff.cc index 1685136088de2..f94452f5c1917 100644 --- a/src/ray/util/exponential_backoff.cc +++ b/src/ray/util/exponential_backoff.cc @@ -18,7 +18,7 @@ namespace ray { -uint64_t ExponentialBackOff::GetBackoffMs(uint64_t attempt, +uint64_t ExponentialBackoff::GetBackoffMs(uint64_t attempt, uint64_t base_ms, uint64_t max_backoff_ms) { uint64_t delay = static_cast(pow(2, attempt)); diff --git a/src/ray/util/tests/exponential_backoff_test.cc b/src/ray/util/tests/exponential_backoff_test.cc index 6d0190e9f3b4b..c8c274f36e8d5 100644 --- a/src/ray/util/tests/exponential_backoff_test.cc +++ b/src/ray/util/tests/exponential_backoff_test.cc @@ -20,26 +20,26 @@ namespace ray { -TEST(ExponentialBackOffTest, TestExponentialIncrease) { - ASSERT_EQ(ExponentialBackOff::GetBackoffMs(0, 157), 157 * 1); - ASSERT_EQ(ExponentialBackOff::GetBackoffMs(1, 157), 157 * 2); - ASSERT_EQ(ExponentialBackOff::GetBackoffMs(2, 157), 157 * 4); - ASSERT_EQ(ExponentialBackOff::GetBackoffMs(3, 157), 157 * 8); +TEST(ExponentialBackoffTest, TestExponentialIncrease) { + ASSERT_EQ(ExponentialBackoff::GetBackoffMs(0, 157), 157 * 1); + ASSERT_EQ(ExponentialBackoff::GetBackoffMs(1, 157), 157 * 2); + ASSERT_EQ(ExponentialBackoff::GetBackoffMs(2, 157), 157 * 4); + ASSERT_EQ(ExponentialBackoff::GetBackoffMs(3, 157), 157 * 8); - ASSERT_EQ(ExponentialBackOff::GetBackoffMs(10, 0), 0); - ASSERT_EQ(ExponentialBackOff::GetBackoffMs(11, 0), 0); + ASSERT_EQ(ExponentialBackoff::GetBackoffMs(10, 0), 0); + ASSERT_EQ(ExponentialBackoff::GetBackoffMs(11, 0), 0); } -TEST(ExponentialBackOffTest, TestExceedMaxBackoffReturnsMaxBackoff) { - auto backoff = ExponentialBackOff::GetBackoffMs( +TEST(ExponentialBackoffTest, TestExceedMaxBackoffReturnsMaxBackoff) { + auto backoff = 
ExponentialBackoff::GetBackoffMs( /*attempt*/ 10, /*base_ms*/ 1, /*max_backoff_ms*/ 5); ASSERT_EQ(backoff, 5); } -TEST(ExponentialBackOffTest, TestOverflowReturnsMaxBackoff) { +TEST(ExponentialBackoffTest, TestOverflowReturnsMaxBackoff) { // 2 ^ 64+ will overflow. for (int i = 64; i < 10000; i++) { - auto backoff = ExponentialBackOff::GetBackoffMs( + auto backoff = ExponentialBackoff::GetBackoffMs( /*attempt*/ i, /*base_ms*/ 1, /*max_backoff_ms*/ 1234); From 10ab8b9bede93be49afa57c506cbc9817042530c Mon Sep 17 00:00:00 2001 From: dentiny Date: Mon, 20 Jan 2025 22:57:47 +0000 Subject: [PATCH 5/5] fix git conflict Signed-off-by: dentiny --- src/ray/util/tests/util_test.cc | 20 -------------------- 1 file changed, 20 deletions(-) diff --git a/src/ray/util/tests/util_test.cc b/src/ray/util/tests/util_test.cc index 20fb072a20603..133da93a16168 100644 --- a/src/ray/util/tests/util_test.cc +++ b/src/ray/util/tests/util_test.cc @@ -109,26 +109,6 @@ TEST(UtilTest, ParseCommandLineTest) { ASSERT_EQ(ParseCommandLine(R"(x' a \b')", win32), ArgList({R"(x')", R"(a)", R"(\b')"})); } -<<<<<<< HEAD -======= -TEST(UtilTest, ExponentialBackoffTest) { - auto exp = ExponentialBackoff(1, 2, 9); - ASSERT_EQ(1, exp.Next()); - ASSERT_EQ(2, exp.Next()); - ASSERT_EQ(4, exp.Next()); - ASSERT_EQ(8, exp.Next()); - ASSERT_EQ(9, exp.Next()); - ASSERT_EQ(9, exp.Next()); - exp.Reset(); - ASSERT_EQ(1, exp.Next()); - ASSERT_EQ(2, exp.Next()); - ASSERT_EQ(4, exp.Next()); - ASSERT_EQ(8, exp.Next()); - ASSERT_EQ(9, exp.Next()); - ASSERT_EQ(9, exp.Next()); -} - ->>>>>>> master TEST(UtilTest, ParseURLTest) { const std::string url = "http://abc?num_objects=9&offset=8388878&size=8388878"; auto parsed_url = *ParseURL(url);