From 23c5d98b88ef503e432e08a4fff9eec6135dbeea Mon Sep 17 00:00:00 2001 From: code Date: Sun, 14 Jul 2024 09:15:32 +0800 Subject: [PATCH] token bucket: new atomic token bucket implementation (#34789) Commit Message: token bucket: new atomic token bucket implementation Additional Description: The main target of this new token bucket is to replace the timer based token buckets in the local rate limit filter. **The timer based one depends on the main thread to do the refilling periodically. If the main thread is occupied by other tasks, the token buckets won't be refreshed in timely and will effects users' traffics directly.** See https://github.com/envoyproxy/envoy/issues/34774. Actually, I don't know why we design the token bucket with the timer at the initial local rate limit filter. We had the `SharedTokenBucketImpl` as the thread safe token buckets. So, I assume may be we don't want the lock in the local rate limit filter? Risk Level: low, only new implementation of token buckets but haven't been used. Testing: unit. Docs Changes: n/a. Release Notes: n/a. Platform Specific Features: n/a. --------- Signed-off-by: wbpcode Signed-off-by: wbpcode Co-authored-by: wbpcode --- source/common/common/token_bucket_impl.cc | 42 +++++ source/common/common/token_bucket_impl.h | 88 ++++++++++ test/common/common/BUILD | 1 + test/common/common/token_bucket_impl_test.cc | 169 +++++++++++++++++++ 4 files changed, 300 insertions(+) diff --git a/source/common/common/token_bucket_impl.cc b/source/common/common/token_bucket_impl.cc index f813d426d04f..b3d4f10e78bb 100644 --- a/source/common/common/token_bucket_impl.cc +++ b/source/common/common/token_bucket_impl.cc @@ -1,5 +1,6 @@ #include "source/common/common/token_bucket_impl.h" +#include #include namespace Envoy { @@ -7,6 +8,7 @@ namespace Envoy { namespace { // The minimal fill rate will be one second every year. constexpr double kMinFillRate = 1.0 / (365 * 24 * 60 * 60); + } // namespace TokenBucketImpl::TokenBucketImpl(uint64_t max_tokens, TimeSource& time_source, double fill_rate) @@ -56,4 +58,44 @@ void TokenBucketImpl::maybeReset(uint64_t num_tokens) { last_fill_ = time_source_.monotonicTime(); } +AtomicTokenBucketImpl::AtomicTokenBucketImpl(uint64_t max_tokens, TimeSource& time_source, + double fill_rate, bool init_fill) + : max_tokens_(max_tokens), fill_rate_(std::max(std::abs(fill_rate), kMinFillRate)), + time_source_(time_source) { + auto time_in_seconds = timeNowInSeconds(); + if (init_fill) { + time_in_seconds -= max_tokens_ / fill_rate_; + } + time_in_seconds_.store(time_in_seconds, std::memory_order_relaxed); +} + +bool AtomicTokenBucketImpl::consume() { + constexpr auto consumed_cb = [](double total_tokens) -> double { + return total_tokens >= 1 ? 1 : 0; + }; + return consume(consumed_cb) == 1; +} + +uint64_t AtomicTokenBucketImpl::consume(uint64_t tokens, bool allow_partial) { + const auto consumed_cb = [tokens, allow_partial](double total_tokens) { + const auto consumed = static_cast(tokens); + if (total_tokens >= consumed) { + return consumed; // There are enough tokens to consume. + } + // If allow_partial is true, consume all available tokens. + return allow_partial ? std::max(0, std::floor(total_tokens)) : 0; + }; + return static_cast(consume(consumed_cb)); +} + +double AtomicTokenBucketImpl::remainingTokens() const { + const double time_now = timeNowInSeconds(); + const double time_old = time_in_seconds_.load(std::memory_order_relaxed); + return std::min(max_tokens_, (time_now - time_old) * fill_rate_); +} + +double AtomicTokenBucketImpl::timeNowInSeconds() const { + return std::chrono::duration(time_source_.monotonicTime().time_since_epoch()).count(); +} + } // namespace Envoy diff --git a/source/common/common/token_bucket_impl.h b/source/common/common/token_bucket_impl.h index 96ac238e3778..673f0a74e2b3 100644 --- a/source/common/common/token_bucket_impl.h +++ b/source/common/common/token_bucket_impl.h @@ -35,4 +35,92 @@ class TokenBucketImpl : public TokenBucket { TimeSource& time_source_; }; +/** + * Atomic token bucket. This class is thread-safe. + */ +class AtomicTokenBucketImpl { +public: + /** + * @param max_tokens supplies the maximum number of tokens in the bucket. + * @param time_source supplies the time source. + * @param fill_rate supplies the number of tokens that will return to the bucket on each second. + * The default is 1. + * @param init_fill supplies whether the bucket should be initialized with max_tokens. + */ + explicit AtomicTokenBucketImpl(uint64_t max_tokens, TimeSource& time_source, + double fill_rate = 1.0, bool init_fill = true); + + // This reference https://github.com/facebook/folly/blob/main/folly/TokenBucket.h. + template double consume(const GetConsumedTokens& cb) { + const double time_now = timeNowInSeconds(); + + double time_old = time_in_seconds_.load(std::memory_order_relaxed); + double time_new{}; + double consumed{}; + do { + const double total_tokens = std::min(max_tokens_, (time_now - time_old) * fill_rate_); + if (consumed = cb(total_tokens); consumed == 0) { + return 0; + } + + // There are two special cases that should rarely happen in practice but we will not + // prevent them in this common template method: + // The consumed is negative. It means the token is added back to the bucket. + // The consumed is larger than total_tokens. It means the bucket is overflowed and future + // tokens are consumed. + + // Move the time_in_seconds_ forward by the number of tokens consumed. + const double total_tokens_new = total_tokens - consumed; + time_new = time_now - (total_tokens_new / fill_rate_); + } while ( + !time_in_seconds_.compare_exchange_weak(time_old, time_new, std::memory_order_relaxed)); + + return consumed; + } + + /** + * Consumes one tokens from the bucket. + * @return true if the token is consumed, false otherwise. + */ + bool consume(); + + /** + * Consumes multiple tokens from the bucket. + * @param tokens the number of tokens to consume. + * @param allow_partial whether to allow partial consumption. + * @return the number of tokens consumed. + */ + uint64_t consume(uint64_t tokens, bool allow_partial); + + /** + * Get the maximum number of tokens in the bucket. The actual maximum number of tokens in the + * bucket may be changed with the factor. + * @return the maximum number of tokens in the bucket. + */ + double maxTokens() const { return max_tokens_; } + + /** + * Get the fill rate of the bucket. This is a constant for the lifetime of the bucket. But note + * the actual used fill rate will multiply the dynamic factor. + * @return the fill rate of the bucket. + */ + double fillRate() const { return fill_rate_; } + + /** + * Get the remaining number of tokens in the bucket. This is a snapshot and may change after the + * call. + * @return the remaining number of tokens in the bucket. + */ + double remainingTokens() const; + +private: + double timeNowInSeconds() const; + + const double max_tokens_; + const double fill_rate_; + + std::atomic time_in_seconds_{}; + TimeSource& time_source_; +}; + } // namespace Envoy diff --git a/test/common/common/BUILD b/test/common/common/BUILD index 3bfa5d707f1a..23daa9263de6 100644 --- a/test/common/common/BUILD +++ b/test/common/common/BUILD @@ -346,6 +346,7 @@ envoy_cc_test( deps = [ "//source/common/common:token_bucket_impl_lib", "//test/test_common:simulated_time_system_lib", + "//test/test_common:test_time_lib", "//test/test_common:utility_lib", ], ) diff --git a/test/common/common/token_bucket_impl_test.cc b/test/common/common/token_bucket_impl_test.cc index 66308cfded3a..ce20de2cfb28 100644 --- a/test/common/common/token_bucket_impl_test.cc +++ b/test/common/common/token_bucket_impl_test.cc @@ -3,6 +3,7 @@ #include "source/common/common/token_bucket_impl.h" #include "test/test_common/simulated_time_system.h" +#include "test/test_common/test_time.h" #include "gtest/gtest.h" @@ -126,4 +127,172 @@ TEST_F(TokenBucketImplTest, YearlyMinRefillRate) { EXPECT_EQ(1, token_bucket.consume(1, false)); } +class AtomicTokenBucketImplTest : public testing::Test { +protected: + Event::SimulatedTimeSystem time_system_; +}; + +// Verifies TokenBucket initialization. +TEST_F(AtomicTokenBucketImplTest, Initialization) { + AtomicTokenBucketImpl token_bucket{1, time_system_, -1.0}; + + EXPECT_EQ(1, token_bucket.fillRate()); + EXPECT_EQ(1, token_bucket.maxTokens()); + EXPECT_EQ(1, token_bucket.remainingTokens()); + + EXPECT_EQ(1, token_bucket.consume(1, false)); + EXPECT_EQ(0, token_bucket.consume(1, false)); + EXPECT_EQ(false, token_bucket.consume()); +} + +// Verifies TokenBucket's maximum capacity. +TEST_F(AtomicTokenBucketImplTest, MaxBucketSize) { + AtomicTokenBucketImpl token_bucket{3, time_system_, 1}; + + EXPECT_EQ(1, token_bucket.fillRate()); + EXPECT_EQ(3, token_bucket.maxTokens()); + EXPECT_EQ(3, token_bucket.remainingTokens()); + + EXPECT_EQ(3, token_bucket.consume(3, false)); + time_system_.setMonotonicTime(std::chrono::seconds(10)); + EXPECT_EQ(0, token_bucket.consume(4, false)); + EXPECT_EQ(3, token_bucket.consume(3, false)); +} + +// Verifies that TokenBucket can consume tokens. +TEST_F(AtomicTokenBucketImplTest, Consume) { + AtomicTokenBucketImpl token_bucket{10, time_system_, 1}; + + EXPECT_EQ(0, token_bucket.consume(20, false)); + EXPECT_EQ(9, token_bucket.consume(9, false)); + + // consume() == consume(1, false) + EXPECT_EQ(true, token_bucket.consume()); + + time_system_.setMonotonicTime(std::chrono::milliseconds(999)); + EXPECT_EQ(0, token_bucket.consume(1, false)); + + time_system_.setMonotonicTime(std::chrono::milliseconds(5999)); + EXPECT_EQ(0, token_bucket.consume(6, false)); + + time_system_.setMonotonicTime(std::chrono::milliseconds(6000)); + EXPECT_EQ(6, token_bucket.consume(6, false)); + EXPECT_EQ(0, token_bucket.consume(1, false)); +} + +// Verifies that TokenBucket can refill tokens. +TEST_F(AtomicTokenBucketImplTest, Refill) { + AtomicTokenBucketImpl token_bucket{1, time_system_, 0.5}; + EXPECT_EQ(1, token_bucket.consume(1, false)); + + time_system_.setMonotonicTime(std::chrono::milliseconds(500)); + EXPECT_EQ(0, token_bucket.consume(1, false)); + time_system_.setMonotonicTime(std::chrono::milliseconds(1500)); + EXPECT_EQ(0, token_bucket.consume(1, false)); + time_system_.setMonotonicTime(std::chrono::milliseconds(2000)); + EXPECT_EQ(1, token_bucket.consume(1, false)); +} + +// Test partial consumption of tokens. +TEST_F(AtomicTokenBucketImplTest, PartialConsumption) { + AtomicTokenBucketImpl token_bucket{16, time_system_, 16}; + EXPECT_EQ(16, token_bucket.consume(18, true)); + time_system_.advanceTimeWait(std::chrono::milliseconds(62)); + EXPECT_EQ(0, token_bucket.consume(1, true)); + time_system_.advanceTimeWait(std::chrono::milliseconds(1)); + EXPECT_EQ(1, token_bucket.consume(2, true)); +} + +// Validate that a minimal refresh time is 1 year. +TEST_F(AtomicTokenBucketImplTest, YearlyMinRefillRate) { + constexpr uint64_t seconds_per_year = 365 * 24 * 60 * 60; + // Set the fill rate to be 2 years. + AtomicTokenBucketImpl token_bucket{1, time_system_, 1.0 / (seconds_per_year * 2)}; + + // Consume first token. + EXPECT_EQ(1, token_bucket.consume(1, false)); + + // Less than a year should still have no tokens. + time_system_.setMonotonicTime(std::chrono::seconds(seconds_per_year - 1)); + EXPECT_EQ(0, token_bucket.consume(1, false)); + time_system_.setMonotonicTime(std::chrono::seconds(seconds_per_year)); + EXPECT_EQ(1, token_bucket.consume(1, false)); +} + +TEST_F(AtomicTokenBucketImplTest, ConsumeNegativeTokens) { + AtomicTokenBucketImpl token_bucket{10, time_system_, 1}; + + EXPECT_EQ(3, token_bucket.consume([](double) { return 3; })); + EXPECT_EQ(7, token_bucket.remainingTokens()); + EXPECT_EQ(-3, token_bucket.consume([](double) { return -3; })); + EXPECT_EQ(10, token_bucket.remainingTokens()); +} + +TEST_F(AtomicTokenBucketImplTest, ConsumeSuperLargeTokens) { + AtomicTokenBucketImpl token_bucket{10, time_system_, 1}; + + EXPECT_EQ(100, token_bucket.consume([](double) { return 100; })); + EXPECT_EQ(-90, token_bucket.remainingTokens()); +} + +TEST_F(AtomicTokenBucketImplTest, MultipleThreadsConsume) { + // Real time source to ensure we will not fall into endless loop. + Event::TestRealTimeSystem real_time_source; + + AtomicTokenBucketImpl token_bucket{1200, time_system_, 1.0}; + + // Exhaust all tokens. + EXPECT_EQ(1200, token_bucket.consume(1200, false)); + EXPECT_EQ(0, token_bucket.consume(1, false)); + + std::vector threads; + auto timeout_point = real_time_source.monotonicTime() + std::chrono::seconds(30); + + size_t thread_1_token = 0; + threads.push_back(std::thread([&] { + while (thread_1_token < 300 && real_time_source.monotonicTime() < timeout_point) { + thread_1_token += token_bucket.consume(1, false); + } + })); + + size_t thread_2_token = 0; + threads.push_back(std::thread([&] { + while (thread_2_token < 300 && real_time_source.monotonicTime() < timeout_point) { + thread_2_token += token_bucket.consume(1, false); + } + })); + + size_t thread_3_token = 0; + threads.push_back(std::thread([&] { + while (thread_3_token < 300 && real_time_source.monotonicTime() < timeout_point) { + const size_t left = 300 - thread_3_token; + thread_3_token += token_bucket.consume(std::min(left, 2), true); + } + })); + + size_t thread_4_token = 0; + threads.push_back(std::thread([&] { + while (thread_4_token < 300 && real_time_source.monotonicTime() < timeout_point) { + const size_t left = 300 - thread_4_token; + thread_4_token += token_bucket.consume(std::min(left, 3), true); + } + })); + + // Fill the buckets by changing the time. + for (size_t i = 0; i < 200; i++) { + time_system_.advanceTimeWait(std::chrono::seconds(1)); + } + for (size_t i = 0; i < 100; i++) { + time_system_.advanceTimeWait(std::chrono::seconds(10)); + } + + for (auto& thread : threads) { + thread.join(); + } + + EXPECT_EQ(1200, thread_1_token + thread_2_token + thread_3_token + thread_4_token); + + EXPECT_EQ(0, token_bucket.consume(1, false)); +} + } // namespace Envoy