Skip to content

Commit

Permalink
[core] [easy] [no-op] Unify exponential backoff (ray-project#49957)
Browse files Browse the repository at this point in the history
We have two exponential backoff class, one with `Off` in uppercase,
another one with `off` in lowercase, which is confusion; and I think it
doesn't make sense to have two classes serving strongly relavent
feature.

This PR is originally part of the refactor PR, suggested by @jjyao to
make a separate one.
Refactor PR for reference: ray-project#49938
Jiajun's comment:
ray-project#49938 (comment)

---------

Signed-off-by: dentiny <[email protected]>
Signed-off-by: Jimmy Xie <[email protected]>
  • Loading branch information
dentiny authored and jimmyxie-figma committed Jan 20, 2025
1 parent a334999 commit 9e27b95
Show file tree
Hide file tree
Showing 10 changed files with 70 additions and 66 deletions.
8 changes: 4 additions & 4 deletions src/ray/gcs/gcs_server/gcs_placement_group_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ namespace gcs {

namespace {

ExponentialBackOff CreateDefaultBackoff() {
ExponentialBackoff CreateDefaultBackoff() {
// std::chrono conversions are unwieldy but safer.
// ms -> ns
using std::chrono::duration_cast;
Expand All @@ -44,7 +44,7 @@ ExponentialBackOff CreateDefaultBackoff() {
milliseconds(
RayConfig::instance().gcs_create_placement_group_retry_max_interval_ms()))
.count();
return ExponentialBackOff(
return ExponentialBackoff(
initial_delay_ns,
RayConfig::instance().gcs_create_placement_group_retry_multiplier(),
max_delay_ns);
Expand Down Expand Up @@ -307,7 +307,7 @@ PlacementGroupID GcsPlacementGroupManager::GetPlacementGroupIDByName(

void GcsPlacementGroupManager::OnPlacementGroupCreationFailed(
std::shared_ptr<GcsPlacementGroup> placement_group,
ExponentialBackOff backoff,
ExponentialBackoff backoff,
bool is_feasible) {
RAY_LOG(DEBUG).WithField(placement_group->GetPlacementGroupID())
<< "Failed to create placement group " << placement_group->GetName()
Expand Down Expand Up @@ -758,7 +758,7 @@ void GcsPlacementGroupManager::WaitPlacementGroup(
void GcsPlacementGroupManager::AddToPendingQueue(
std::shared_ptr<GcsPlacementGroup> pg,
std::optional<int64_t> rank,
std::optional<ExponentialBackOff> exp_backer) {
std::optional<ExponentialBackoff> exp_backer) {
if (!rank) {
rank = absl::GetCurrentTimeNanos();
}
Expand Down
6 changes: 3 additions & 3 deletions src/ray/gcs/gcs_server/gcs_placement_group_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler {
/// \param placement_group The placement_group whose creation task is infeasible.
/// \param is_feasible whether the scheduler can be retry or not currently.
void OnPlacementGroupCreationFailed(std::shared_ptr<GcsPlacementGroup> placement_group,
ExponentialBackOff backoff,
ExponentialBackoff backoff,
bool is_feasible);

/// Handle placement_group creation task success. This should be called when the
Expand Down Expand Up @@ -408,7 +408,7 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler {
/// it's not set. This will be used to generate the deferred time for this pg.
void AddToPendingQueue(std::shared_ptr<GcsPlacementGroup> pg,
std::optional<int64_t> rank = std::nullopt,
std::optional<ExponentialBackOff> exp_backer = std::nullopt);
std::optional<ExponentialBackoff> exp_backer = std::nullopt);
void RemoveFromPendingQueue(const PlacementGroupID &pg_id);

/// Try to create placement group after a short time.
Expand Down Expand Up @@ -471,7 +471,7 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler {
/// need to post retry job to io context. And when schedule pending placement
/// group, we always start with the one with the smallest key.
absl::btree_multimap<int64_t,
std::pair<ExponentialBackOff, std::shared_ptr<GcsPlacementGroup>>>
std::pair<ExponentialBackoff, std::shared_ptr<GcsPlacementGroup>>>
pending_placement_groups_;

/// The infeasible placement_groups that can't be scheduled currently.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ TEST_F(GcsPlacementGroupManagerMockTest, PendingQueuePriorityFailed) {
pg->UpdateState(rpc::PlacementGroupTableData::PENDING);
now = absl::GetCurrentTimeNanos();
request.failure_callback(pg, true);
auto exp_backer = ExponentialBackOff(
auto exp_backer = ExponentialBackoff(
1000000 * RayConfig::instance().gcs_create_placement_group_retry_min_interval_ms(),
RayConfig::instance().gcs_create_placement_group_retry_multiplier(),
1000000 * RayConfig::instance().gcs_create_placement_group_retry_max_interval_ms());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ class GcsPlacementGroupManagerTest : public ::testing::Test {

void RunIOService() { io_service_.poll(); }

ExponentialBackOff GetExpBackOff() { return ExponentialBackOff(0, 1); }
ExponentialBackoff GetExpBackOff() { return ExponentialBackoff(0, 1); }

std::shared_ptr<MockPlacementGroupScheduler> mock_placement_group_scheduler_;
std::unique_ptr<gcs::GcsPlacementGroupManager> gcs_placement_group_manager_;
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/redis_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ struct RedisRequestContext {
void Run();

private:
ExponentialBackOff exp_back_off_;
ExponentialBackoff exp_back_off_;
instrumented_io_context &io_service_;
RedisAsyncContext *redis_context_;
size_t pending_retries_;
Expand Down
4 changes: 1 addition & 3 deletions src/ray/util/exponential_backoff.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@

#include "ray/util/exponential_backoff.h"

#include <math.h>

#include "ray/util/logging.h"
#include <cmath>

namespace ray {

Expand Down
43 changes: 39 additions & 4 deletions src/ray/util/exponential_backoff.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,33 @@

#pragma once

#include <gtest/gtest_prod.h>
#include <stdint.h>
#include <cstdint>
#include <limits>

#include "ray/util/logging.h"

namespace ray {

/// Provides the exponential backoff algorithm that is typically used
/// for throttling.
class ExponentialBackoff {
public:
/// Construct an exponential back off counter.
///
/// \param[in] initial_value The start value for this counter
/// \param[in] multiplier The multiplier for this counter.
/// \param[in] max_value The maximum value for this counter. By default it's
/// infinite double.
ExponentialBackoff(uint64_t initial_value,
double multiplier,
uint64_t max_value = std::numeric_limits<uint64_t>::max())
: curr_value_(initial_value),
initial_value_(initial_value),
max_value_(max_value),
multiplier_(multiplier) {
RAY_CHECK(multiplier > 0.0) << "Multiplier must be greater than 0";
}

/// Computes the backoff delay using the exponential backoff algorithm,
/// using the formula
///
Expand All @@ -33,11 +51,28 @@ class ExponentialBackoff {
/// @return the delay in ms based on the formula
static uint64_t GetBackoffMs(uint64_t attempt,
uint64_t base_ms,
uint64_t max_backoff_ms = kMaxBackoffMs);
uint64_t max_backoff_ms = kDefaultMaxBackoffMs);

uint64_t Next() {
auto ret = curr_value_;
curr_value_ = curr_value_ * multiplier_;
curr_value_ = std::min(curr_value_, max_value_);
return ret;
}

uint64_t Current() { return curr_value_; }

void Reset() { curr_value_ = initial_value_; }

private:
private:
uint64_t curr_value_;
uint64_t initial_value_;
uint64_t max_value_;
double multiplier_;

// The default cap on the backoff delay.
static constexpr uint64_t kMaxBackoffMs = 1 * 60 * 1000;
static constexpr uint64_t kDefaultMaxBackoffMs = 1 * 60 * 1000;
};

} // namespace ray
22 changes: 17 additions & 5 deletions src/ray/util/tests/exponential_backoff_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,21 @@ TEST(ExponentialBackoffTest, TestOverflowReturnsMaxBackoff) {
}
}

} // namespace ray

int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
TEST(ExponentialBackoffTest, GetNext) {
auto exp = ExponentialBackoff{1, 2, 9};
ASSERT_EQ(1, exp.Next());
ASSERT_EQ(2, exp.Next());
ASSERT_EQ(4, exp.Next());
ASSERT_EQ(8, exp.Next());
ASSERT_EQ(9, exp.Next());
ASSERT_EQ(9, exp.Next());
exp.Reset();
ASSERT_EQ(1, exp.Next());
ASSERT_EQ(2, exp.Next());
ASSERT_EQ(4, exp.Next());
ASSERT_EQ(8, exp.Next());
ASSERT_EQ(9, exp.Next());
ASSERT_EQ(9, exp.Next());
}

} // namespace ray
4 changes: 2 additions & 2 deletions src/ray/util/tests/util_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ TEST(UtilTest, ParseCommandLineTest) {
ASSERT_EQ(ParseCommandLine(R"(x' a \b')", win32), ArgList({R"(x')", R"(a)", R"(\b')"}));
}

TEST(UtilTest, ExponentialBackOffTest) {
auto exp = ExponentialBackOff(1, 2, 9);
TEST(UtilTest, ExponentialBackoffTest) {
auto exp = ExponentialBackoff(1, 2, 9);
ASSERT_EQ(1, exp.Next());
ASSERT_EQ(2, exp.Next());
ASSERT_EQ(4, exp.Next());
Expand Down
43 changes: 1 addition & 42 deletions src/ray/util/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@

#include "absl/container/flat_hash_map.h"
#include "absl/random/random.h"
#include "ray/util/exponential_backoff.h"
#include "ray/util/logging.h"
#include "ray/util/macros.h"
#include "ray/util/process.h"
Expand Down Expand Up @@ -325,48 +326,6 @@ class ThreadPrivate {
mutable ThreadChecker thread_checker_;
};

class ExponentialBackOff {
public:
ExponentialBackOff() = default;
ExponentialBackOff(const ExponentialBackOff &) = default;
ExponentialBackOff(ExponentialBackOff &&) = default;
ExponentialBackOff &operator=(const ExponentialBackOff &) = default;
ExponentialBackOff &operator=(ExponentialBackOff &&) = default;

/// Construct an exponential back off counter.
///
/// \param[in] initial_value The start value for this counter
/// \param[in] multiplier The multiplier for this counter.
/// \param[in] max_value The maximum value for this counter. By default it's
/// infinite double.
ExponentialBackOff(uint64_t initial_value,
double multiplier,
uint64_t max_value = std::numeric_limits<uint64_t>::max())
: curr_value_(initial_value),
initial_value_(initial_value),
max_value_(max_value),
multiplier_(multiplier) {
RAY_CHECK(multiplier > 0.0) << "Multiplier must be greater than 0";
}

uint64_t Next() {
auto ret = curr_value_;
curr_value_ = curr_value_ * multiplier_;
curr_value_ = std::min(curr_value_, max_value_);
return ret;
}

uint64_t Current() { return curr_value_; }

void Reset() { curr_value_ = initial_value_; }

private:
uint64_t curr_value_;
uint64_t initial_value_;
uint64_t max_value_;
double multiplier_;
};

/// Return true if the raylet is failed. This util function is only meant to be used by
/// core worker modules.
bool IsRayletFailed(const std::string &raylet_pid);
Expand Down

0 comments on commit 9e27b95

Please sign in to comment.