Skip to content

Commit

Permalink
token bucket: new atomic token bucket implementation (envoyproxy#34789)
Browse files Browse the repository at this point in the history
Commit Message: token bucket: new atomic token bucket implementation
Additional Description:

The main target of this new token bucket is to replace the timer based
token buckets in the local rate limit filter. **The timer based one
depends on the main thread to do the refilling periodically. If the main
thread is occupied by other tasks, the token buckets won't be refreshed
in timely and will effects users' traffics directly.**

See envoyproxy#34774.

Actually, I don't know why we design the token bucket with the timer at
the initial local rate limit filter. We had the `SharedTokenBucketImpl`
as the thread safe token buckets. So, I assume may be we don't want the
lock in the local rate limit filter?

Risk Level: low, only new implementation of token buckets but haven't
been used.
Testing: unit.
Docs Changes: n/a.
Release Notes: n/a.
Platform Specific Features: n/a.

---------

Signed-off-by: wbpcode <[email protected]>
Signed-off-by: wbpcode <[email protected]>
Co-authored-by: wbpcode <[email protected]>
  • Loading branch information
code and wbpcode authored Jul 14, 2024
1 parent bea314b commit 23c5d98
Show file tree
Hide file tree
Showing 4 changed files with 300 additions and 0 deletions.
42 changes: 42 additions & 0 deletions source/common/common/token_bucket_impl.cc
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
#include "source/common/common/token_bucket_impl.h"

#include <atomic>
#include <chrono>

namespace Envoy {

namespace {
// The minimal fill rate will be one second every year.
constexpr double kMinFillRate = 1.0 / (365 * 24 * 60 * 60);

} // namespace

TokenBucketImpl::TokenBucketImpl(uint64_t max_tokens, TimeSource& time_source, double fill_rate)
Expand Down Expand Up @@ -56,4 +58,44 @@ void TokenBucketImpl::maybeReset(uint64_t num_tokens) {
last_fill_ = time_source_.monotonicTime();
}

AtomicTokenBucketImpl::AtomicTokenBucketImpl(uint64_t max_tokens, TimeSource& time_source,
double fill_rate, bool init_fill)
: max_tokens_(max_tokens), fill_rate_(std::max(std::abs(fill_rate), kMinFillRate)),
time_source_(time_source) {
auto time_in_seconds = timeNowInSeconds();
if (init_fill) {
time_in_seconds -= max_tokens_ / fill_rate_;
}
time_in_seconds_.store(time_in_seconds, std::memory_order_relaxed);
}

bool AtomicTokenBucketImpl::consume() {
constexpr auto consumed_cb = [](double total_tokens) -> double {
return total_tokens >= 1 ? 1 : 0;
};
return consume(consumed_cb) == 1;
}

uint64_t AtomicTokenBucketImpl::consume(uint64_t tokens, bool allow_partial) {
const auto consumed_cb = [tokens, allow_partial](double total_tokens) {
const auto consumed = static_cast<double>(tokens);
if (total_tokens >= consumed) {
return consumed; // There are enough tokens to consume.
}
// If allow_partial is true, consume all available tokens.
return allow_partial ? std::max<double>(0, std::floor(total_tokens)) : 0;
};
return static_cast<uint64_t>(consume(consumed_cb));
}

double AtomicTokenBucketImpl::remainingTokens() const {
const double time_now = timeNowInSeconds();
const double time_old = time_in_seconds_.load(std::memory_order_relaxed);
return std::min(max_tokens_, (time_now - time_old) * fill_rate_);
}

double AtomicTokenBucketImpl::timeNowInSeconds() const {
return std::chrono::duration<double>(time_source_.monotonicTime().time_since_epoch()).count();
}

} // namespace Envoy
88 changes: 88 additions & 0 deletions source/common/common/token_bucket_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,92 @@ class TokenBucketImpl : public TokenBucket {
TimeSource& time_source_;
};

/**
* Atomic token bucket. This class is thread-safe.
*/
class AtomicTokenBucketImpl {
public:
/**
* @param max_tokens supplies the maximum number of tokens in the bucket.
* @param time_source supplies the time source.
* @param fill_rate supplies the number of tokens that will return to the bucket on each second.
* The default is 1.
* @param init_fill supplies whether the bucket should be initialized with max_tokens.
*/
explicit AtomicTokenBucketImpl(uint64_t max_tokens, TimeSource& time_source,
double fill_rate = 1.0, bool init_fill = true);

// This reference https://github.com/facebook/folly/blob/main/folly/TokenBucket.h.
template <class GetConsumedTokens> double consume(const GetConsumedTokens& cb) {
const double time_now = timeNowInSeconds();

double time_old = time_in_seconds_.load(std::memory_order_relaxed);
double time_new{};
double consumed{};
do {
const double total_tokens = std::min(max_tokens_, (time_now - time_old) * fill_rate_);
if (consumed = cb(total_tokens); consumed == 0) {
return 0;
}

// There are two special cases that should rarely happen in practice but we will not
// prevent them in this common template method:
// The consumed is negative. It means the token is added back to the bucket.
// The consumed is larger than total_tokens. It means the bucket is overflowed and future
// tokens are consumed.

// Move the time_in_seconds_ forward by the number of tokens consumed.
const double total_tokens_new = total_tokens - consumed;
time_new = time_now - (total_tokens_new / fill_rate_);
} while (
!time_in_seconds_.compare_exchange_weak(time_old, time_new, std::memory_order_relaxed));

return consumed;
}

/**
* Consumes one tokens from the bucket.
* @return true if the token is consumed, false otherwise.
*/
bool consume();

/**
* Consumes multiple tokens from the bucket.
* @param tokens the number of tokens to consume.
* @param allow_partial whether to allow partial consumption.
* @return the number of tokens consumed.
*/
uint64_t consume(uint64_t tokens, bool allow_partial);

/**
* Get the maximum number of tokens in the bucket. The actual maximum number of tokens in the
* bucket may be changed with the factor.
* @return the maximum number of tokens in the bucket.
*/
double maxTokens() const { return max_tokens_; }

/**
* Get the fill rate of the bucket. This is a constant for the lifetime of the bucket. But note
* the actual used fill rate will multiply the dynamic factor.
* @return the fill rate of the bucket.
*/
double fillRate() const { return fill_rate_; }

/**
* Get the remaining number of tokens in the bucket. This is a snapshot and may change after the
* call.
* @return the remaining number of tokens in the bucket.
*/
double remainingTokens() const;

private:
double timeNowInSeconds() const;

const double max_tokens_;
const double fill_rate_;

std::atomic<double> time_in_seconds_{};
TimeSource& time_source_;
};

} // namespace Envoy
1 change: 1 addition & 0 deletions test/common/common/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ envoy_cc_test(
deps = [
"//source/common/common:token_bucket_impl_lib",
"//test/test_common:simulated_time_system_lib",
"//test/test_common:test_time_lib",
"//test/test_common:utility_lib",
],
)
Expand Down
169 changes: 169 additions & 0 deletions test/common/common/token_bucket_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "source/common/common/token_bucket_impl.h"

#include "test/test_common/simulated_time_system.h"
#include "test/test_common/test_time.h"

#include "gtest/gtest.h"

Expand Down Expand Up @@ -126,4 +127,172 @@ TEST_F(TokenBucketImplTest, YearlyMinRefillRate) {
EXPECT_EQ(1, token_bucket.consume(1, false));
}

class AtomicTokenBucketImplTest : public testing::Test {
protected:
Event::SimulatedTimeSystem time_system_;
};

// Verifies TokenBucket initialization.
TEST_F(AtomicTokenBucketImplTest, Initialization) {
AtomicTokenBucketImpl token_bucket{1, time_system_, -1.0};

EXPECT_EQ(1, token_bucket.fillRate());
EXPECT_EQ(1, token_bucket.maxTokens());
EXPECT_EQ(1, token_bucket.remainingTokens());

EXPECT_EQ(1, token_bucket.consume(1, false));
EXPECT_EQ(0, token_bucket.consume(1, false));
EXPECT_EQ(false, token_bucket.consume());
}

// Verifies TokenBucket's maximum capacity.
TEST_F(AtomicTokenBucketImplTest, MaxBucketSize) {
AtomicTokenBucketImpl token_bucket{3, time_system_, 1};

EXPECT_EQ(1, token_bucket.fillRate());
EXPECT_EQ(3, token_bucket.maxTokens());
EXPECT_EQ(3, token_bucket.remainingTokens());

EXPECT_EQ(3, token_bucket.consume(3, false));
time_system_.setMonotonicTime(std::chrono::seconds(10));
EXPECT_EQ(0, token_bucket.consume(4, false));
EXPECT_EQ(3, token_bucket.consume(3, false));
}

// Verifies that TokenBucket can consume tokens.
TEST_F(AtomicTokenBucketImplTest, Consume) {
AtomicTokenBucketImpl token_bucket{10, time_system_, 1};

EXPECT_EQ(0, token_bucket.consume(20, false));
EXPECT_EQ(9, token_bucket.consume(9, false));

// consume() == consume(1, false)
EXPECT_EQ(true, token_bucket.consume());

time_system_.setMonotonicTime(std::chrono::milliseconds(999));
EXPECT_EQ(0, token_bucket.consume(1, false));

time_system_.setMonotonicTime(std::chrono::milliseconds(5999));
EXPECT_EQ(0, token_bucket.consume(6, false));

time_system_.setMonotonicTime(std::chrono::milliseconds(6000));
EXPECT_EQ(6, token_bucket.consume(6, false));
EXPECT_EQ(0, token_bucket.consume(1, false));
}

// Verifies that TokenBucket can refill tokens.
TEST_F(AtomicTokenBucketImplTest, Refill) {
AtomicTokenBucketImpl token_bucket{1, time_system_, 0.5};
EXPECT_EQ(1, token_bucket.consume(1, false));

time_system_.setMonotonicTime(std::chrono::milliseconds(500));
EXPECT_EQ(0, token_bucket.consume(1, false));
time_system_.setMonotonicTime(std::chrono::milliseconds(1500));
EXPECT_EQ(0, token_bucket.consume(1, false));
time_system_.setMonotonicTime(std::chrono::milliseconds(2000));
EXPECT_EQ(1, token_bucket.consume(1, false));
}

// Test partial consumption of tokens.
TEST_F(AtomicTokenBucketImplTest, PartialConsumption) {
AtomicTokenBucketImpl token_bucket{16, time_system_, 16};
EXPECT_EQ(16, token_bucket.consume(18, true));
time_system_.advanceTimeWait(std::chrono::milliseconds(62));
EXPECT_EQ(0, token_bucket.consume(1, true));
time_system_.advanceTimeWait(std::chrono::milliseconds(1));
EXPECT_EQ(1, token_bucket.consume(2, true));
}

// Validate that a minimal refresh time is 1 year.
TEST_F(AtomicTokenBucketImplTest, YearlyMinRefillRate) {
constexpr uint64_t seconds_per_year = 365 * 24 * 60 * 60;
// Set the fill rate to be 2 years.
AtomicTokenBucketImpl token_bucket{1, time_system_, 1.0 / (seconds_per_year * 2)};

// Consume first token.
EXPECT_EQ(1, token_bucket.consume(1, false));

// Less than a year should still have no tokens.
time_system_.setMonotonicTime(std::chrono::seconds(seconds_per_year - 1));
EXPECT_EQ(0, token_bucket.consume(1, false));
time_system_.setMonotonicTime(std::chrono::seconds(seconds_per_year));
EXPECT_EQ(1, token_bucket.consume(1, false));
}

TEST_F(AtomicTokenBucketImplTest, ConsumeNegativeTokens) {
AtomicTokenBucketImpl token_bucket{10, time_system_, 1};

EXPECT_EQ(3, token_bucket.consume([](double) { return 3; }));
EXPECT_EQ(7, token_bucket.remainingTokens());
EXPECT_EQ(-3, token_bucket.consume([](double) { return -3; }));
EXPECT_EQ(10, token_bucket.remainingTokens());
}

TEST_F(AtomicTokenBucketImplTest, ConsumeSuperLargeTokens) {
AtomicTokenBucketImpl token_bucket{10, time_system_, 1};

EXPECT_EQ(100, token_bucket.consume([](double) { return 100; }));
EXPECT_EQ(-90, token_bucket.remainingTokens());
}

TEST_F(AtomicTokenBucketImplTest, MultipleThreadsConsume) {
// Real time source to ensure we will not fall into endless loop.
Event::TestRealTimeSystem real_time_source;

AtomicTokenBucketImpl token_bucket{1200, time_system_, 1.0};

// Exhaust all tokens.
EXPECT_EQ(1200, token_bucket.consume(1200, false));
EXPECT_EQ(0, token_bucket.consume(1, false));

std::vector<std::thread> threads;
auto timeout_point = real_time_source.monotonicTime() + std::chrono::seconds(30);

size_t thread_1_token = 0;
threads.push_back(std::thread([&] {
while (thread_1_token < 300 && real_time_source.monotonicTime() < timeout_point) {
thread_1_token += token_bucket.consume(1, false);
}
}));

size_t thread_2_token = 0;
threads.push_back(std::thread([&] {
while (thread_2_token < 300 && real_time_source.monotonicTime() < timeout_point) {
thread_2_token += token_bucket.consume(1, false);
}
}));

size_t thread_3_token = 0;
threads.push_back(std::thread([&] {
while (thread_3_token < 300 && real_time_source.monotonicTime() < timeout_point) {
const size_t left = 300 - thread_3_token;
thread_3_token += token_bucket.consume(std::min<size_t>(left, 2), true);
}
}));

size_t thread_4_token = 0;
threads.push_back(std::thread([&] {
while (thread_4_token < 300 && real_time_source.monotonicTime() < timeout_point) {
const size_t left = 300 - thread_4_token;
thread_4_token += token_bucket.consume(std::min<size_t>(left, 3), true);
}
}));

// Fill the buckets by changing the time.
for (size_t i = 0; i < 200; i++) {
time_system_.advanceTimeWait(std::chrono::seconds(1));
}
for (size_t i = 0; i < 100; i++) {
time_system_.advanceTimeWait(std::chrono::seconds(10));
}

for (auto& thread : threads) {
thread.join();
}

EXPECT_EQ(1200, thread_1_token + thread_2_token + thread_3_token + thread_4_token);

EXPECT_EQ(0, token_bucket.consume(1, false));
}

} // namespace Envoy

0 comments on commit 23c5d98

Please sign in to comment.