From 8cc4f4849b694087998f0b03e8fb568c807251f1 Mon Sep 17 00:00:00 2001 From: Yenda Li Date: Thu, 21 Nov 2024 14:07:14 -0800 Subject: [PATCH] feat: Update throttler to support network signal (#11611) Summary: Storage systems may return a network signal telling us that the backbone has transient issues. We should respect this signal and backoff Reviewed By: arhimondr Differential Revision: D66272632 --- velox/common/base/Counters.h | 3 + velox/docs/monitoring/metrics.rst | 4 +- velox/dwio/common/Throttler.cpp | 161 +++++++++++------ velox/dwio/common/Throttler.h | 31 +++- velox/dwio/common/tests/ThrottlerTest.cpp | 199 ++++++++++++++++++---- 5 files changed, 305 insertions(+), 93 deletions(-) diff --git a/velox/common/base/Counters.h b/velox/common/base/Counters.h index b72060c6c99b1..8fba41378f9fa 100644 --- a/velox/common/base/Counters.h +++ b/velox/common/base/Counters.h @@ -357,4 +357,7 @@ constexpr folly::StringPiece kMetricStorageLocalThrottled{ constexpr folly::StringPiece kMetricStorageGlobalThrottled{ "velox.storage_global_throttled_count"}; + +constexpr folly::StringPiece kMetricStorageNetworkThrottled{ + "velox.storage_network_throttled_count"}; } // namespace facebook::velox diff --git a/velox/docs/monitoring/metrics.rst b/velox/docs/monitoring/metrics.rst index 4d80a25229ac0..6fea60a7f03f4 100644 --- a/velox/docs/monitoring/metrics.rst +++ b/velox/docs/monitoring/metrics.rst @@ -435,7 +435,9 @@ Storage * - storage_global_throttled_count - Count - The number of times that storage IOs get throttled in a storage cluster. - + * - storage_network_throttled_count + - Count + - The number of times that storage IOs get throttled in a storage cluster because of network. Spilling -------- diff --git a/velox/dwio/common/Throttler.cpp b/velox/dwio/common/Throttler.cpp index 55dce7003166d..398a98d196050 100644 --- a/velox/dwio/common/Throttler.cpp +++ b/velox/dwio/common/Throttler.cpp @@ -40,6 +40,7 @@ Throttler::Config::Config( double _backoffScaleFactor, uint32_t _minLocalThrottledSignals, uint32_t _minGlobalThrottledSignals, + uint32_t _minNetworkThrottledSignals, uint32_t _maxCacheEntries, uint32_t _cacheTTLMs) : throttleEnabled(_throttleEnabled), @@ -48,18 +49,20 @@ Throttler::Config::Config( backoffScaleFactor(_backoffScaleFactor), minLocalThrottledSignals(_minLocalThrottledSignals), minGlobalThrottledSignals(_minGlobalThrottledSignals), + minNetworkThrottledSignals(_minNetworkThrottledSignals), maxCacheEntries(_maxCacheEntries), cacheTTLMs(_cacheTTLMs) {} std::string Throttler::Config::toString() const { return fmt::format( - "throttleEnabled:{} minThrottleBackoffMs:{} maxThrottleBackoffMs:{} backoffScaleFactor:{} minLocalThrottledSignals:{} minGlobalThrottledSignals:{} maxCacheEntries:{} cacheTTLMs:{}", + "throttleEnabled:{} minThrottleBackoffMs:{} maxThrottleBackoffMs:{} backoffScaleFactor:{} minLocalThrottledSignals:{} minGlobalThrottledSignals:{} minNetworkThrottledSignals:{} maxCacheEntries:{} cacheTTLMs:{}", throttleEnabled, succinctMillis(minThrottleBackoffMs), succinctMillis(maxThrottleBackoffMs), backoffScaleFactor, minLocalThrottledSignals, minGlobalThrottledSignals, + minNetworkThrottledSignals, maxCacheEntries, succinctMillis(cacheTTLMs)); }; @@ -72,6 +75,8 @@ std::string Throttler::signalTypeName(SignalType type) { return "Local"; case SignalType::kGlobal: return "Global"; + case SignalType::kNetwork: + return "Network"; default: return fmt::format("Unknown Signal Type: {}", static_cast(type)); } @@ -103,24 +108,21 @@ Throttler::Throttler(const Config& config) minThrottleBackoffDurationMs_(config.minThrottleBackoffMs), maxThrottleBackoffDurationMs_(config.maxThrottleBackoffMs), backoffScaleFactor_(config.backoffScaleFactor), - minLocalThrottledSignalsToBackoff_(config.minLocalThrottledSignals), - minGlobalThrottledSignalsToBackoff_(config.minGlobalThrottledSignals), - localThrottleCache_( - !throttleEnabled_ - ? nullptr - : new ThrottleSignalFactory{std::make_unique>( - config.maxCacheEntries, - config.cacheTTLMs), - std::unique_ptr{ - new ThrottleSignalGenerator{}}}), - globalThrottleCache_( - !throttleEnabled_ - ? nullptr - : new ThrottleSignalFactory{std::make_unique>( - config.maxCacheEntries, - config.cacheTTLMs), - std::unique_ptr{ - new ThrottleSignalGenerator{}}}) { + localThrottleCache_(maybeMakeThrottleSignalCache( + config.throttleEnabled, + config.minLocalThrottledSignals, + config.maxCacheEntries, + config.cacheTTLMs)), + globalThrottleCache_(maybeMakeThrottleSignalCache( + config.throttleEnabled, + config.minGlobalThrottledSignals, + config.maxCacheEntries, + config.cacheTTLMs)), + networkThrottleCache_(maybeMakeThrottleSignalCache( + config.throttleEnabled, + config.minNetworkThrottledSignals, + config.maxCacheEntries, + config.cacheTTLMs)) { LOG(INFO) << "IO throttler config: " << config.toString(); } @@ -149,12 +151,22 @@ void Throttler::updateThrottleStats(SignalType type, uint64_t backoffDelayMs) { stats_.backOffDelay.increment(backoffDelayMs); RECORD_HISTOGRAM_METRIC_VALUE( kMetricStorageThrottledDurationMs, backoffDelayMs); - if (type == SignalType::kLocal) { - ++stats_.localThrottled; - RECORD_METRIC_VALUE(kMetricStorageLocalThrottled); - } else { - ++stats_.globalThrottled; - RECORD_METRIC_VALUE(kMetricStorageGlobalThrottled); + + switch (type) { + case SignalType::kLocal: + ++stats_.localThrottled; + RECORD_METRIC_VALUE(kMetricStorageLocalThrottled); + break; + case SignalType::kGlobal: + ++stats_.globalThrottled; + RECORD_METRIC_VALUE(kMetricStorageGlobalThrottled); + break; + case SignalType::kNetwork: + ++stats_.networkThrottled; + RECORD_METRIC_VALUE(kMetricStorageNetworkThrottled); + break; + default: + break; } } @@ -163,48 +175,78 @@ void Throttler::updateThrottleCacheLocked( const std::string& cluster, const std::string& directory, CachedThrottleSignalPtr& localSignal, - CachedThrottleSignalPtr& globalSignal) { + CachedThrottleSignalPtr& globalSignal, + CachedThrottleSignalPtr& networkSignal) { VELOX_CHECK(throttleEnabled()); - - if (type == SignalType::kLocal) { - if (localSignal.get() == nullptr) { - localThrottleCache_->generate(localThrottleCacheKey(cluster, directory)); - } else { - ++localSignal->count; - } - } else { - if (globalSignal.get() == nullptr) { - globalThrottleCache_->generate(cluster); - } else { - ++globalSignal->count; - } - } + switch (type) { + case SignalType::kLocal: + if (localSignal.get() == nullptr) { + localThrottleCache_.throttleCache->generate( + localThrottleCacheKey(cluster, directory)); + } else { + ++localSignal->count; + } + return; + case SignalType::kGlobal: + if (globalSignal.get() == nullptr) { + globalThrottleCache_.throttleCache->generate(cluster); + } else { + ++globalSignal->count; + } + return; + case SignalType::kNetwork: + if (networkSignal.get() == nullptr) { + networkThrottleCache_.throttleCache->generate(cluster); + } else { + ++networkSignal->count; + } + return; + default: + VELOX_UNREACHABLE("Invalid type provided: {}", signalTypeName(type)); + }; } uint64_t Throttler::calculateBackoffDurationAndUpdateThrottleCache( SignalType type, const std::string& cluster, - const std::string& directoy) { + const std::string& directory) { std::lock_guard l(mu_); - // Gets maximum count of local and global throttle signals in Cache. - auto localThrottleCachePtr = - localThrottleCache_->get(localThrottleCacheKey(cluster, directoy)); - int64_t localThrottleCount = + // Gets maximum count of local, global, and network throttle signals in Cache. + auto localThrottleCachePtr = localThrottleCache_.throttleCache->get( + localThrottleCacheKey(cluster, directory)); + const int64_t localThrottleCount = (localThrottleCachePtr.get() != nullptr ? localThrottleCachePtr->count : 0) + - (type == SignalType::kLocal ? 1 : 0) - minLocalThrottledSignalsToBackoff_; - auto globalThrottleCachePtr = globalThrottleCache_->get(cluster); + (type == SignalType::kLocal ? 1 : 0) - + localThrottleCache_.minThrottledSignalsToBackOff; + + auto globalThrottleCachePtr = + globalThrottleCache_.throttleCache->get(cluster); const int64_t globalThrottleCount = (globalThrottleCachePtr.get() != nullptr ? globalThrottleCachePtr->count : 0) + (type == SignalType::kGlobal ? 1 : 0) - - minGlobalThrottledSignalsToBackoff_; + globalThrottleCache_.minThrottledSignalsToBackOff; + + auto networkThrottleCachePtr = + networkThrottleCache_.throttleCache->get(cluster); + const int64_t networkThrottleCount = + (networkThrottleCachePtr.get() != nullptr ? networkThrottleCachePtr->count + : 0) + + (type == SignalType::kNetwork ? 1 : 0) - + networkThrottleCache_.minThrottledSignalsToBackOff; + // Update throttling signal cache. updateThrottleCacheLocked( - type, cluster, directoy, localThrottleCachePtr, globalThrottleCachePtr); + type, + cluster, + directory, + localThrottleCachePtr, + globalThrottleCachePtr, + networkThrottleCachePtr); - const int64_t throttleAttempts = - std::max(localThrottleCount, globalThrottleCount); + const int64_t throttleAttempts = std::max( + networkThrottleCount, std::max(localThrottleCount, globalThrottleCount)); // Calculates the delay with exponential backoff if (throttleAttempts <= 0) { @@ -229,4 +271,21 @@ Throttler::ThrottleSignalGenerator::operator()( const void* /*unused*/) { return std::unique_ptr(new ThrottleSignal{1}); } + +/* static */ +Throttler::ThrottleSignalCache Throttler::maybeMakeThrottleSignalCache( + bool enabled, + uint32_t minThrottledSignals, + uint32_t maxCacheEntries, + uint32_t cacheTTLMs) { + return { + .throttleCache = !enabled + ? nullptr + : std::make_unique( + std::make_unique>( + maxCacheEntries, cacheTTLMs), + std::make_unique()), + .minThrottledSignalsToBackOff = minThrottledSignals, + }; +} } // namespace facebook::velox::dwio::common diff --git a/velox/dwio/common/Throttler.h b/velox/dwio/common/Throttler.h index eb5667a59287a..0ebf1e0882057 100644 --- a/velox/dwio/common/Throttler.h +++ b/velox/dwio/common/Throttler.h @@ -51,6 +51,10 @@ class Throttler { /// backoff. uint32_t minGlobalThrottledSignals; + /// The minimum number of received network throttled signals before starting + /// backoff. + uint32_t minNetworkThrottledSignals; + /// The maximum number of entries in the throttled signal cache. There is /// one cache for each throttle signal type. For local throttle signal /// cache, each cache entry corresponds to a unqiue file direcotry in a @@ -68,6 +72,7 @@ class Throttler { static constexpr double kBackoffScaleFactorDefault{2.0}; static constexpr uint32_t kMinLocalThrottledSignalsDefault{1'000}; static constexpr uint32_t kMinGlobalThrottledSignalsDefault{100'000}; + static constexpr uint32_t kMinNetworkThrottledSignal{1'000}; static constexpr uint32_t kMaxCacheEntriesDefault{10'000}; static constexpr uint32_t kCacheTTLMsDefault{3 * 60 * 1'000}; @@ -78,6 +83,7 @@ class Throttler { double backoffScaleFactor = kBackoffScaleFactorDefault, uint32_t minLocalThrottledSignals = kMinLocalThrottledSignalsDefault, uint32_t minGlobalThrottledSignals = kMinGlobalThrottledSignalsDefault, + uint32_t minNetworkThrottledSignals = kMinNetworkThrottledSignal, uint32_t maxCacheEntries = kMaxCacheEntriesDefault, uint32_t cacheTTLMs = kCacheTTLMsDefault); @@ -88,6 +94,7 @@ class Throttler { struct Stats { std::atomic_uint64_t localThrottled{0}; std::atomic_uint64_t globalThrottled{0}; + std::atomic_uint64_t networkThrottled{0}; /// Counts the backoff delay in milliseconds. io::IoCounter backOffDelay; }; @@ -104,6 +111,8 @@ class Throttler { kLocal, /// A cluster-wise throttled signal. kGlobal, + /// Network throttled signal. + kNetwork, }; static std::string signalTypeName(SignalType type); @@ -174,25 +183,37 @@ class Throttler { using ThrottleSignalFactory = facebook::velox:: CachedFactory; + struct ThrottleSignalCache { + std::unique_ptr throttleCache; + uint32_t minThrottledSignalsToBackOff; + }; + void updateThrottleCacheLocked( SignalType type, const std::string& cluster, const std::string& directory, CachedThrottleSignalPtr& localSignal, - CachedThrottleSignalPtr& globalSignal); + CachedThrottleSignalPtr& globalSignal, + CachedThrottleSignalPtr& networkSignal); void updateThrottleStats(SignalType type, uint64_t backoffDelayMs); + static ThrottleSignalCache maybeMakeThrottleSignalCache( + bool enabled, + uint32_t minThrottledSignals, + uint32_t maxCacheEntries, + uint32_t cacheTTLMs); + static const uint64_t kNoBackOffMs_{0}; const bool throttleEnabled_; const uint64_t minThrottleBackoffDurationMs_; const uint64_t maxThrottleBackoffDurationMs_; const double backoffScaleFactor_; - const uint32_t minLocalThrottledSignalsToBackoff_; - const uint32_t minGlobalThrottledSignalsToBackoff_; - const std::unique_ptr localThrottleCache_; - const std::unique_ptr globalThrottleCache_; + + const ThrottleSignalCache localThrottleCache_; + const ThrottleSignalCache globalThrottleCache_; + const ThrottleSignalCache networkThrottleCache_; mutable std::mutex mu_; diff --git a/velox/dwio/common/tests/ThrottlerTest.cpp b/velox/dwio/common/tests/ThrottlerTest.cpp index 6a3b62e1f5103..773e8d0dd70eb 100644 --- a/velox/dwio/common/tests/ThrottlerTest.cpp +++ b/velox/dwio/common/tests/ThrottlerTest.cpp @@ -27,25 +27,31 @@ namespace { class ThrottlerTest : public testing::Test { protected: static Throttler::Config throttleConfig(uint32_t cacheTTLMs = 3'600 * 1'000) { - return Throttler::Config(true, 1, 4, 2.0, 10, 40, 4, cacheTTLMs); + return Throttler::Config(true, 1, 4, 2.0, 10, 40, 10, 4, cacheTTLMs); } void SetUp() override { Throttler::testingReset(); } + static constexpr std::array kSignalTypes{ + Throttler::SignalType::kLocal, + Throttler::SignalType::kGlobal, + Throttler::SignalType::kNetwork}; }; TEST_F(ThrottlerTest, config) { const auto config = throttleConfig(); ASSERT_EQ( config.toString(), - "throttleEnabled:true minThrottleBackoffMs:1ms maxThrottleBackoffMs:4ms backoffScaleFactor:2 minLocalThrottledSignals:10 minGlobalThrottledSignals:40 maxCacheEntries:4 cacheTTLMs:1h 0m 0s"); + "throttleEnabled:true minThrottleBackoffMs:1ms maxThrottleBackoffMs:4ms backoffScaleFactor:2 minLocalThrottledSignals:10 minGlobalThrottledSignals:40 minNetworkThrottledSignals:10 maxCacheEntries:4 cacheTTLMs:1h 0m 0s"); } TEST_F(ThrottlerTest, signalType) { ASSERT_EQ(Throttler::signalTypeName(Throttler::SignalType::kLocal), "Local"); ASSERT_EQ( Throttler::signalTypeName(Throttler::SignalType::kGlobal), "Global"); + ASSERT_EQ( + Throttler::signalTypeName(Throttler::SignalType::kNetwork), "Network"); ASSERT_EQ(Throttler::signalTypeName(Throttler::SignalType::kNone), "None"); ASSERT_EQ( Throttler::signalTypeName(static_cast(100)), @@ -70,21 +76,17 @@ TEST_F(ThrottlerTest, throttleDisabled) { auto* instance = Throttler::instance(); for (int i = 1; i <= 100; ++i) { ASSERT_EQ( - instance->throttleBackoff( - i % 2 ? Throttler::SignalType::kLocal - : Throttler::SignalType::kGlobal, - cluster, - directory), - 0); + instance->throttleBackoff(kSignalTypes[i % 3], cluster, directory), 0); } const auto& stats = instance->stats(); ASSERT_EQ(stats.localThrottled, 0); ASSERT_EQ(stats.globalThrottled, 0); + ASSERT_EQ(stats.networkThrottled, 0); ASSERT_EQ(stats.backOffDelay.count(), 0); } TEST_F(ThrottlerTest, noThrottlerSignal) { - Throttler::init(Throttler::Config(true, 100, 200, 2.0, 10, 1'000)); + Throttler::init(Throttler::Config(true, 100, 200, 2.0, 10, 1'000, 10)); const std::string cluster{"noThrottlerSignal"}; const std::string directory{"noThrottlerSignal"}; auto* instance = Throttler::instance(); @@ -97,14 +99,15 @@ TEST_F(ThrottlerTest, noThrottlerSignal) { const auto& stats = instance->stats(); ASSERT_EQ(stats.localThrottled, 0); ASSERT_EQ(stats.globalThrottled, 0); + ASSERT_EQ(stats.networkThrottled, 0); ASSERT_EQ(stats.backOffDelay.count(), 0); } TEST_F(ThrottlerTest, throttle) { const uint64_t minThrottleBackoffMs = 1'000; const uint64_t maxThrottleBackoffMs = 2'000; - for (const bool global : {true, false}) { - SCOPED_TRACE(fmt::format("global {}", global)); + for (const auto signal : kSignalTypes) { + SCOPED_TRACE(fmt::format("signal: {}", Throttler::signalTypeName(signal))); Throttler::testingReset(); Throttler::init(Throttler::Config( @@ -112,37 +115,44 @@ TEST_F(ThrottlerTest, throttle) { minThrottleBackoffMs, maxThrottleBackoffMs, 2.0, - global ? 1'0000 : 2, - global ? 2 : 1'0000)); + signal == Throttler::SignalType::kLocal ? 2 : 1'000, + signal == Throttler::SignalType::kGlobal ? 2 : 1'000, + signal == Throttler::SignalType::kNetwork ? 2 : 1'000)); auto* instance = Throttler::instance(); const auto& stats = instance->stats(); ASSERT_EQ(stats.localThrottled, 0); ASSERT_EQ(stats.globalThrottled, 0); + ASSERT_EQ(stats.networkThrottled, 0); ASSERT_EQ(stats.backOffDelay.count(), 0); - const Throttler::SignalType type = - global ? Throttler::SignalType::kGlobal : Throttler::SignalType::kLocal; const std::string cluster{"throttle"}; const std::string directory{"throttle"}; - ASSERT_EQ(instance->throttleBackoff(type, cluster, directory), 0); - ASSERT_EQ(instance->throttleBackoff(type, cluster, directory), 0); + ASSERT_EQ(instance->throttleBackoff(signal, cluster, directory), 0); + ASSERT_EQ(instance->throttleBackoff(signal, cluster, directory), 0); ASSERT_EQ(stats.localThrottled, 0); ASSERT_EQ(stats.globalThrottled, 0); + ASSERT_EQ(stats.networkThrottled, 0); ASSERT_EQ(stats.backOffDelay.count(), 0); uint64_t measuredBackOffMs{0}; uint64_t firstBackoffMs{0}; { MicrosecondTimer timer(&measuredBackOffMs); - firstBackoffMs = instance->throttleBackoff(type, cluster, directory); + firstBackoffMs = instance->throttleBackoff(signal, cluster, directory); } ASSERT_LE(firstBackoffMs, maxThrottleBackoffMs); ASSERT_GE(firstBackoffMs, minThrottleBackoffMs); ASSERT_GE(measuredBackOffMs, firstBackoffMs); - ASSERT_EQ(stats.localThrottled, global ? 0 : 1); - ASSERT_EQ(stats.globalThrottled, global ? 1 : 0); + ASSERT_EQ( + stats.localThrottled, signal == Throttler::SignalType::kLocal ? 1 : 0); + ASSERT_EQ( + stats.globalThrottled, + signal == Throttler::SignalType::kGlobal ? 1 : 0); + ASSERT_EQ( + stats.networkThrottled, + signal == Throttler::SignalType::kNetwork ? 1 : 0); ASSERT_EQ(stats.backOffDelay.count(), 1); ASSERT_EQ(stats.backOffDelay.sum(), firstBackoffMs); @@ -150,15 +160,21 @@ TEST_F(ThrottlerTest, throttle) { uint64_t secondBackoffMs{0}; { MicrosecondTimer timer(&measuredBackOffMs); - secondBackoffMs = instance->throttleBackoff(type, cluster, directory); + secondBackoffMs = instance->throttleBackoff(signal, cluster, directory); } ASSERT_LE(secondBackoffMs, maxThrottleBackoffMs); ASSERT_GE(secondBackoffMs, minThrottleBackoffMs); ASSERT_GE(measuredBackOffMs, secondBackoffMs); ASSERT_LT(firstBackoffMs, secondBackoffMs); - ASSERT_EQ(stats.localThrottled, global ? 0 : 2); - ASSERT_EQ(stats.globalThrottled, global ? 2 : 0); + ASSERT_EQ( + stats.localThrottled, signal == Throttler::SignalType::kLocal ? 2 : 0); + ASSERT_EQ( + stats.globalThrottled, + signal == Throttler::SignalType::kGlobal ? 2 : 0); + ASSERT_EQ( + stats.networkThrottled, + signal == Throttler::SignalType::kNetwork ? 2 : 0); ASSERT_EQ(stats.backOffDelay.count(), 2); ASSERT_EQ(stats.backOffDelay.sum(), firstBackoffMs + secondBackoffMs); } @@ -167,42 +183,44 @@ TEST_F(ThrottlerTest, throttle) { TEST_F(ThrottlerTest, expire) { const uint64_t minThrottleBackoffMs = 1'00; const uint64_t maxThrottleBackoffMs = 2'00; - for (const bool global : {true, false}) { - SCOPED_TRACE(fmt::format("global {}", global)); + for (const auto signal : kSignalTypes) { + SCOPED_TRACE(fmt::format("signal: {}", Throttler::signalTypeName(signal))); Throttler::testingReset(); Throttler::init(Throttler::Config( true, minThrottleBackoffMs, maxThrottleBackoffMs, 2.0, - global ? 1'0000 : 2, - global ? 2 : 1'0000, + signal == Throttler::SignalType::kLocal ? 2 : 1'000, + signal == Throttler::SignalType::kGlobal ? 2 : 1'000, + signal == Throttler::SignalType::kNetwork ? 2 : 1'000, 1'000, 1'000)); auto* instance = Throttler::instance(); const auto& stats = instance->stats(); ASSERT_EQ(stats.localThrottled, 0); ASSERT_EQ(stats.globalThrottled, 0); + ASSERT_EQ(stats.networkThrottled, 0); ASSERT_EQ(stats.backOffDelay.count(), 0); - const Throttler::SignalType type = - global ? Throttler::SignalType::kGlobal : Throttler::SignalType::kLocal; const std::string cluster{"expire"}; const std::string directory{"expire"}; - ASSERT_EQ(instance->throttleBackoff(type, cluster, directory), 0); - ASSERT_EQ(instance->throttleBackoff(type, cluster, directory), 0); + ASSERT_EQ(instance->throttleBackoff(signal, cluster, directory), 0); + ASSERT_EQ(instance->throttleBackoff(signal, cluster, directory), 0); ASSERT_EQ(stats.localThrottled, 0); ASSERT_EQ(stats.globalThrottled, 0); + ASSERT_EQ(stats.networkThrottled, 0); ASSERT_EQ(stats.backOffDelay.count(), 0); std::this_thread::sleep_for(std::chrono::seconds(2)); // NOLINT - ASSERT_EQ(instance->throttleBackoff(type, cluster, directory), 0); - ASSERT_EQ(instance->throttleBackoff(type, cluster, directory), 0); + ASSERT_EQ(instance->throttleBackoff(signal, cluster, directory), 0); + ASSERT_EQ(instance->throttleBackoff(signal, cluster, directory), 0); ASSERT_EQ(stats.localThrottled, 0); ASSERT_EQ(stats.globalThrottled, 0); + ASSERT_EQ(stats.networkThrottled, 0); ASSERT_EQ(stats.backOffDelay.count(), 0); } } @@ -216,6 +234,7 @@ TEST_F(ThrottlerTest, differentLocals) { const auto& stats = instance->stats(); ASSERT_EQ(stats.localThrottled, 0); ASSERT_EQ(stats.globalThrottled, 0); + ASSERT_EQ(stats.networkThrottled, 0); ASSERT_EQ(stats.backOffDelay.count(), 0); const std::string cluster1{"differentLocals1"}; @@ -231,6 +250,7 @@ TEST_F(ThrottlerTest, differentLocals) { ASSERT_EQ(stats.localThrottled, 0); ASSERT_EQ(stats.globalThrottled, 0); + ASSERT_EQ(stats.networkThrottled, 0); ASSERT_EQ(stats.backOffDelay.count(), 0); const std::string directory2{"differentLocals2"}; @@ -245,6 +265,7 @@ TEST_F(ThrottlerTest, differentLocals) { ASSERT_EQ(stats.localThrottled, 0); ASSERT_EQ(stats.globalThrottled, 0); + ASSERT_EQ(stats.networkThrottled, 0); ASSERT_EQ(stats.backOffDelay.count(), 0); const auto path1firstBackoffMs = instance->throttleBackoff( @@ -254,6 +275,7 @@ TEST_F(ThrottlerTest, differentLocals) { ASSERT_EQ(stats.localThrottled, 1); ASSERT_EQ(stats.globalThrottled, 0); + ASSERT_EQ(stats.networkThrottled, 0); ASSERT_EQ(stats.backOffDelay.count(), 1); ASSERT_EQ(stats.backOffDelay.sum(), path1firstBackoffMs); @@ -264,6 +286,7 @@ TEST_F(ThrottlerTest, differentLocals) { ASSERT_EQ(stats.localThrottled, 2); ASSERT_EQ(stats.globalThrottled, 0); + ASSERT_EQ(stats.networkThrottled, 0); ASSERT_EQ(stats.backOffDelay.count(), 2); ASSERT_EQ( stats.backOffDelay.sum(), path1firstBackoffMs + path2firstBackoffMs); @@ -274,6 +297,7 @@ TEST_F(ThrottlerTest, differentLocals) { ASSERT_EQ(stats.localThrottled, 3); ASSERT_EQ(stats.globalThrottled, 0); + ASSERT_EQ(stats.networkThrottled, 0); ASSERT_EQ(stats.backOffDelay.count(), 3); ASSERT_EQ( stats.backOffDelay.sum(), @@ -285,6 +309,7 @@ TEST_F(ThrottlerTest, differentLocals) { ASSERT_EQ(stats.localThrottled, 4); ASSERT_EQ(stats.globalThrottled, 0); + ASSERT_EQ(stats.networkThrottled, 0); ASSERT_EQ(stats.backOffDelay.count(), 4); ASSERT_EQ( stats.backOffDelay.sum(), @@ -301,6 +326,7 @@ TEST_F(ThrottlerTest, differentGlobals) { const auto& stats = instance->stats(); ASSERT_EQ(stats.localThrottled, 0); ASSERT_EQ(stats.globalThrottled, 0); + ASSERT_EQ(stats.networkThrottled, 0); ASSERT_EQ(stats.backOffDelay.count(), 0); const std::string cluster1{"differentGlobals1"}; @@ -316,6 +342,7 @@ TEST_F(ThrottlerTest, differentGlobals) { ASSERT_EQ(stats.localThrottled, 0); ASSERT_EQ(stats.globalThrottled, 0); + ASSERT_EQ(stats.networkThrottled, 0); ASSERT_EQ(stats.backOffDelay.count(), 0); const std::string cluster2{"differentGlobals2"}; @@ -331,6 +358,7 @@ TEST_F(ThrottlerTest, differentGlobals) { ASSERT_EQ(stats.localThrottled, 0); ASSERT_EQ(stats.globalThrottled, 0); + ASSERT_EQ(stats.networkThrottled, 0); ASSERT_EQ(stats.backOffDelay.count(), 0); const auto path1firstBackoffMs = instance->throttleBackoff( @@ -378,6 +406,105 @@ TEST_F(ThrottlerTest, differentGlobals) { path2SecondBackoffMs); } +TEST_F(ThrottlerTest, differentNetworks) { + const uint64_t minThrottleBackoffMs = 1'000; + const uint64_t maxThrottleBackoffMs = 2'000; + Throttler::init(Throttler::Config( + true, + minThrottleBackoffMs, + maxThrottleBackoffMs, + 2.0, + 1'0000, + 1'0000, + 2)); + auto* instance = Throttler::instance(); + const auto& stats = instance->stats(); + ASSERT_EQ(stats.localThrottled, 0); + ASSERT_EQ(stats.globalThrottled, 0); + ASSERT_EQ(stats.networkThrottled, 0); + ASSERT_EQ(stats.backOffDelay.count(), 0); + + const std::string cluster1{"differentNetworks1"}; + const std::string directory1{"differentNetworks1"}; + ASSERT_EQ( + instance->throttleBackoff( + Throttler::SignalType::kNetwork, cluster1, directory1), + 0); + ASSERT_EQ( + instance->throttleBackoff( + Throttler::SignalType::kNetwork, cluster1, directory1), + 0); + + ASSERT_EQ(stats.localThrottled, 0); + ASSERT_EQ(stats.globalThrottled, 0); + ASSERT_EQ(stats.networkThrottled, 0); + ASSERT_EQ(stats.backOffDelay.count(), 0); + + const std::string cluster2{"differentNetworks2"}; + const std::string directory2{"differentGlobals1"}; + ASSERT_EQ( + instance->throttleBackoff( + Throttler::SignalType::kNetwork, cluster2, directory2), + 0); + ASSERT_EQ( + instance->throttleBackoff( + Throttler::SignalType::kNetwork, cluster2, directory2), + 0); + + ASSERT_EQ(stats.localThrottled, 0); + ASSERT_EQ(stats.globalThrottled, 0); + ASSERT_EQ(stats.networkThrottled, 0); + ASSERT_EQ(stats.backOffDelay.count(), 0); + + const auto path1firstBackoffMs = instance->throttleBackoff( + Throttler::SignalType::kNetwork, cluster1, directory1); + ASSERT_GT(path1firstBackoffMs, 0); + ASSERT_LT(path1firstBackoffMs, maxThrottleBackoffMs); + + ASSERT_EQ(stats.localThrottled, 0); + ASSERT_EQ(stats.globalThrottled, 0); + ASSERT_EQ(stats.networkThrottled, 1); + ASSERT_EQ(stats.backOffDelay.count(), 1); + ASSERT_EQ(stats.backOffDelay.sum(), path1firstBackoffMs); + + const auto path2firstBackoffMs = instance->throttleBackoff( + Throttler::SignalType::kNetwork, cluster2, directory2); + ASSERT_GT(path2firstBackoffMs, 0); + ASSERT_LT(path2firstBackoffMs, maxThrottleBackoffMs); + + ASSERT_EQ(stats.localThrottled, 0); + ASSERT_EQ(stats.globalThrottled, 0); + ASSERT_EQ(stats.networkThrottled, 2); + ASSERT_EQ(stats.backOffDelay.count(), 2); + ASSERT_EQ( + stats.backOffDelay.sum(), path1firstBackoffMs + path2firstBackoffMs); + + const auto path1SecondBackoffMs = instance->throttleBackoff( + Throttler::SignalType::kNetwork, cluster1, directory1); + ASSERT_EQ(path1SecondBackoffMs, maxThrottleBackoffMs); + + ASSERT_EQ(stats.localThrottled, 0); + ASSERT_EQ(stats.globalThrottled, 0); + ASSERT_EQ(stats.networkThrottled, 3); + ASSERT_EQ(stats.backOffDelay.count(), 3); + ASSERT_EQ( + stats.backOffDelay.sum(), + path1firstBackoffMs + path2firstBackoffMs + path1SecondBackoffMs); + + const auto path2SecondBackoffMs = instance->throttleBackoff( + Throttler::SignalType::kNetwork, cluster2, directory2); + ASSERT_EQ(path2SecondBackoffMs, maxThrottleBackoffMs); + + ASSERT_EQ(stats.localThrottled, 0); + ASSERT_EQ(stats.globalThrottled, 0); + ASSERT_EQ(stats.networkThrottled, 4); + ASSERT_EQ(stats.backOffDelay.count(), 4); + ASSERT_EQ( + stats.backOffDelay.sum(), + path1firstBackoffMs + path2firstBackoffMs + path1SecondBackoffMs + + path2SecondBackoffMs); +} + TEST_F(ThrottlerTest, maxOfGlobalAndLocal) { const uint64_t minThrottleBackoffMs = 1'000; const uint64_t maxThrottleBackoffMs = 2'000; @@ -496,6 +623,7 @@ TEST_F(ThrottlerTest, fuzz) { const double backoffScaleFactor = 2.0; const uint32_t minLocalThrottledSignals = 10; const uint32_t minGlobalThrottledSignals = 20; + const uint32_t minNetworkThrottledSignals = 10; const uint32_t maxCacheEntries = 64; const uint32_t cacheTTLMs = 10; Throttler::testingReset(); @@ -506,6 +634,7 @@ TEST_F(ThrottlerTest, fuzz) { backoffScaleFactor, minLocalThrottledSignals, minGlobalThrottledSignals, + minNetworkThrottledSignals, maxCacheEntries, cacheTTLMs)); auto* instance = Throttler::instance(); @@ -535,9 +664,7 @@ TEST_F(ThrottlerTest, fuzz) { threads.emplace_back([&]() { folly::Random::DefaultGenerator rng(seed); while (!stopped) { - const Throttler::SignalType type = folly::Random::oneIn(3) - ? Throttler::SignalType::kGlobal - : Throttler::SignalType::kLocal; + const auto type = kSignalTypes[folly::Random::rand32(0, 3)]; const int directoryIndex = folly::Random::rand32(rng) % numDirectories; const int clusterIndex = directoryIndex % numClusters; instance->throttleBackoff(