Skip to content

Commit

Permalink
feat: Update throttler to support network signal (facebookincubator#1…
Browse files Browse the repository at this point in the history
…1611)

Summary:

Storage systems may return a network signal telling us that the backbone has transient issues. We should respect this signal and back off.

Reviewed By: arhimondr

Differential Revision: D66272632
  • Loading branch information
yuandagits authored and facebook-github-bot committed Nov 21, 2024
1 parent af3f638 commit 8cc4f48
Show file tree
Hide file tree
Showing 5 changed files with 305 additions and 93 deletions.
3 changes: 3 additions & 0 deletions velox/common/base/Counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -357,4 +357,7 @@ constexpr folly::StringPiece kMetricStorageLocalThrottled{

constexpr folly::StringPiece kMetricStorageGlobalThrottled{
"velox.storage_global_throttled_count"};

/// Metric key recorded each time a storage IO is throttled in response to a
/// network throttle signal.
constexpr folly::StringPiece kMetricStorageNetworkThrottled{
"velox.storage_network_throttled_count"};
} // namespace facebook::velox
4 changes: 3 additions & 1 deletion velox/docs/monitoring/metrics.rst
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,9 @@ Storage
* - storage_global_throttled_count
- Count
- The number of times that storage IOs get throttled in a storage cluster.

* - storage_network_throttled_count
- Count
- The number of times that storage IOs get throttled in a storage cluster because of a network throttle signal.
Spilling
--------

Expand Down
161 changes: 110 additions & 51 deletions velox/dwio/common/Throttler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ Throttler::Config::Config(
double _backoffScaleFactor,
uint32_t _minLocalThrottledSignals,
uint32_t _minGlobalThrottledSignals,
uint32_t _minNetworkThrottledSignals,
uint32_t _maxCacheEntries,
uint32_t _cacheTTLMs)
: throttleEnabled(_throttleEnabled),
Expand All @@ -48,18 +49,20 @@ Throttler::Config::Config(
backoffScaleFactor(_backoffScaleFactor),
minLocalThrottledSignals(_minLocalThrottledSignals),
minGlobalThrottledSignals(_minGlobalThrottledSignals),
minNetworkThrottledSignals(_minNetworkThrottledSignals),
maxCacheEntries(_maxCacheEntries),
cacheTTLMs(_cacheTTLMs) {}

std::string Throttler::Config::toString() const {
return fmt::format(
"throttleEnabled:{} minThrottleBackoffMs:{} maxThrottleBackoffMs:{} backoffScaleFactor:{} minLocalThrottledSignals:{} minGlobalThrottledSignals:{} maxCacheEntries:{} cacheTTLMs:{}",
"throttleEnabled:{} minThrottleBackoffMs:{} maxThrottleBackoffMs:{} backoffScaleFactor:{} minLocalThrottledSignals:{} minGlobalThrottledSignals:{} minNetworkThrottledSignals:{} maxCacheEntries:{} cacheTTLMs:{}",
throttleEnabled,
succinctMillis(minThrottleBackoffMs),
succinctMillis(maxThrottleBackoffMs),
backoffScaleFactor,
minLocalThrottledSignals,
minGlobalThrottledSignals,
minNetworkThrottledSignals,
maxCacheEntries,
succinctMillis(cacheTTLMs));
};
Expand All @@ -72,6 +75,8 @@ std::string Throttler::signalTypeName(SignalType type) {
return "Local";
case SignalType::kGlobal:
return "Global";
case SignalType::kNetwork:
return "Network";
default:
return fmt::format("Unknown Signal Type: {}", static_cast<int>(type));
}
Expand Down Expand Up @@ -103,24 +108,21 @@ Throttler::Throttler(const Config& config)
minThrottleBackoffDurationMs_(config.minThrottleBackoffMs),
maxThrottleBackoffDurationMs_(config.maxThrottleBackoffMs),
backoffScaleFactor_(config.backoffScaleFactor),
minLocalThrottledSignalsToBackoff_(config.minLocalThrottledSignals),
minGlobalThrottledSignalsToBackoff_(config.minGlobalThrottledSignals),
localThrottleCache_(
!throttleEnabled_
? nullptr
: new ThrottleSignalFactory{std::make_unique<SimpleLRUCache<std::string, ThrottleSignal>>(
config.maxCacheEntries,
config.cacheTTLMs),
std::unique_ptr<ThrottleSignalGenerator>{
new ThrottleSignalGenerator{}}}),
globalThrottleCache_(
!throttleEnabled_
? nullptr
: new ThrottleSignalFactory{std::make_unique<SimpleLRUCache<std::string, ThrottleSignal>>(
config.maxCacheEntries,
config.cacheTTLMs),
std::unique_ptr<ThrottleSignalGenerator>{
new ThrottleSignalGenerator{}}}) {
localThrottleCache_(maybeMakeThrottleSignalCache(
config.throttleEnabled,
config.minLocalThrottledSignals,
config.maxCacheEntries,
config.cacheTTLMs)),
globalThrottleCache_(maybeMakeThrottleSignalCache(
config.throttleEnabled,
config.minGlobalThrottledSignals,
config.maxCacheEntries,
config.cacheTTLMs)),
networkThrottleCache_(maybeMakeThrottleSignalCache(
config.throttleEnabled,
config.minNetworkThrottledSignals,
config.maxCacheEntries,
config.cacheTTLMs)) {
LOG(INFO) << "IO throttler config: " << config.toString();
}

Expand Down Expand Up @@ -149,12 +151,22 @@ void Throttler::updateThrottleStats(SignalType type, uint64_t backoffDelayMs) {
stats_.backOffDelay.increment(backoffDelayMs);
RECORD_HISTOGRAM_METRIC_VALUE(
kMetricStorageThrottledDurationMs, backoffDelayMs);
if (type == SignalType::kLocal) {
++stats_.localThrottled;
RECORD_METRIC_VALUE(kMetricStorageLocalThrottled);
} else {
++stats_.globalThrottled;
RECORD_METRIC_VALUE(kMetricStorageGlobalThrottled);

switch (type) {
case SignalType::kLocal:
++stats_.localThrottled;
RECORD_METRIC_VALUE(kMetricStorageLocalThrottled);
break;
case SignalType::kGlobal:
++stats_.globalThrottled;
RECORD_METRIC_VALUE(kMetricStorageGlobalThrottled);
break;
case SignalType::kNetwork:
++stats_.networkThrottled;
RECORD_METRIC_VALUE(kMetricStorageNetworkThrottled);
break;
default:
break;
}
}

Expand All @@ -163,48 +175,78 @@ void Throttler::updateThrottleCacheLocked(
const std::string& cluster,
const std::string& directory,
CachedThrottleSignalPtr& localSignal,
CachedThrottleSignalPtr& globalSignal) {
CachedThrottleSignalPtr& globalSignal,
CachedThrottleSignalPtr& networkSignal) {
VELOX_CHECK(throttleEnabled());

if (type == SignalType::kLocal) {
if (localSignal.get() == nullptr) {
localThrottleCache_->generate(localThrottleCacheKey(cluster, directory));
} else {
++localSignal->count;
}
} else {
if (globalSignal.get() == nullptr) {
globalThrottleCache_->generate(cluster);
} else {
++globalSignal->count;
}
}
switch (type) {
case SignalType::kLocal:
if (localSignal.get() == nullptr) {
localThrottleCache_.throttleCache->generate(
localThrottleCacheKey(cluster, directory));
} else {
++localSignal->count;
}
return;
case SignalType::kGlobal:
if (globalSignal.get() == nullptr) {
globalThrottleCache_.throttleCache->generate(cluster);
} else {
++globalSignal->count;
}
return;
case SignalType::kNetwork:
if (networkSignal.get() == nullptr) {
networkThrottleCache_.throttleCache->generate(cluster);
} else {
++networkSignal->count;
}
return;
default:
VELOX_UNREACHABLE("Invalid type provided: {}", signalTypeName(type));
};
}

uint64_t Throttler::calculateBackoffDurationAndUpdateThrottleCache(
SignalType type,
const std::string& cluster,
const std::string& directoy) {
const std::string& directory) {
std::lock_guard<std::mutex> l(mu_);
// Gets maximum count of local and global throttle signals in Cache.
auto localThrottleCachePtr =
localThrottleCache_->get(localThrottleCacheKey(cluster, directoy));
int64_t localThrottleCount =
// Gets maximum count of local, global, and network throttle signals in Cache.
auto localThrottleCachePtr = localThrottleCache_.throttleCache->get(
localThrottleCacheKey(cluster, directory));
const int64_t localThrottleCount =
(localThrottleCachePtr.get() != nullptr ? localThrottleCachePtr->count
: 0) +
(type == SignalType::kLocal ? 1 : 0) - minLocalThrottledSignalsToBackoff_;
auto globalThrottleCachePtr = globalThrottleCache_->get(cluster);
(type == SignalType::kLocal ? 1 : 0) -
localThrottleCache_.minThrottledSignalsToBackOff;

auto globalThrottleCachePtr =
globalThrottleCache_.throttleCache->get(cluster);
const int64_t globalThrottleCount =
(globalThrottleCachePtr.get() != nullptr ? globalThrottleCachePtr->count
: 0) +
(type == SignalType::kGlobal ? 1 : 0) -
minGlobalThrottledSignalsToBackoff_;
globalThrottleCache_.minThrottledSignalsToBackOff;

auto networkThrottleCachePtr =
networkThrottleCache_.throttleCache->get(cluster);
const int64_t networkThrottleCount =
(networkThrottleCachePtr.get() != nullptr ? networkThrottleCachePtr->count
: 0) +
(type == SignalType::kNetwork ? 1 : 0) -
networkThrottleCache_.minThrottledSignalsToBackOff;

// Update throttling signal cache.
updateThrottleCacheLocked(
type, cluster, directoy, localThrottleCachePtr, globalThrottleCachePtr);
type,
cluster,
directory,
localThrottleCachePtr,
globalThrottleCachePtr,
networkThrottleCachePtr);

const int64_t throttleAttempts =
std::max(localThrottleCount, globalThrottleCount);
const int64_t throttleAttempts = std::max(
networkThrottleCount, std::max(localThrottleCount, globalThrottleCount));

// Calculates the delay with exponential backoff
if (throttleAttempts <= 0) {
Expand All @@ -229,4 +271,21 @@ Throttler::ThrottleSignalGenerator::operator()(
const void* /*unused*/) {
return std::unique_ptr<ThrottleSignal>(new ThrottleSignal{1});
}

/// Builds the cache bundle for one throttle signal type.
///
/// When 'enabled' is false the returned bundle carries a null signal factory;
/// otherwise it owns a factory backed by an LRU cache sized by
/// 'maxCacheEntries' and 'cacheTTLMs'. 'minThrottledSignals' is stored in the
/// bundle unchanged in either case.
/* static */
Throttler::ThrottleSignalCache Throttler::maybeMakeThrottleSignalCache(
    bool enabled,
    uint32_t minThrottledSignals,
    uint32_t maxCacheEntries,
    uint32_t cacheTTLMs) {
  // Stays null when throttling is disabled.
  std::unique_ptr<ThrottleSignalFactory> signalFactory;
  if (enabled) {
    auto lruCache =
        std::make_unique<SimpleLRUCache<std::string, ThrottleSignal>>(
            maxCacheEntries, cacheTTLMs);
    signalFactory = std::make_unique<ThrottleSignalFactory>(
        std::move(lruCache), std::make_unique<ThrottleSignalGenerator>());
  }
  // Positional aggregate initialization: the factory first, then the
  // threshold, matching the member order of ThrottleSignalCache.
  return ThrottleSignalCache{std::move(signalFactory), minThrottledSignals};
}
} // namespace facebook::velox::dwio::common
31 changes: 26 additions & 5 deletions velox/dwio/common/Throttler.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ class Throttler {
/// backoff.
uint32_t minGlobalThrottledSignals;

/// The minimum number of received network throttled signals before starting
/// backoff.
uint32_t minNetworkThrottledSignals;

/// The maximum number of entries in the throttled signal cache. There is
/// one cache for each throttle signal type. For local throttle signal
/// cache, each cache entry corresponds to a unique file directory in a
Expand All @@ -68,6 +72,7 @@ class Throttler {
static constexpr double kBackoffScaleFactorDefault{2.0};
static constexpr uint32_t kMinLocalThrottledSignalsDefault{1'000};
static constexpr uint32_t kMinGlobalThrottledSignalsDefault{100'000};
static constexpr uint32_t kMinNetworkThrottledSignal{1'000};
static constexpr uint32_t kMaxCacheEntriesDefault{10'000};
static constexpr uint32_t kCacheTTLMsDefault{3 * 60 * 1'000};

Expand All @@ -78,6 +83,7 @@ class Throttler {
double backoffScaleFactor = kBackoffScaleFactorDefault,
uint32_t minLocalThrottledSignals = kMinLocalThrottledSignalsDefault,
uint32_t minGlobalThrottledSignals = kMinGlobalThrottledSignalsDefault,
uint32_t minNetworkThrottledSignals = kMinNetworkThrottledSignal,
uint32_t maxCacheEntries = kMaxCacheEntriesDefault,
uint32_t cacheTTLMs = kCacheTTLMsDefault);

Expand All @@ -88,6 +94,7 @@ class Throttler {
struct Stats {
std::atomic_uint64_t localThrottled{0};
std::atomic_uint64_t globalThrottled{0};
std::atomic_uint64_t networkThrottled{0};
/// Counts the backoff delay in milliseconds.
io::IoCounter backOffDelay;
};
Expand All @@ -104,6 +111,8 @@ class Throttler {
kLocal,
/// A cluster-wise throttled signal.
kGlobal,
/// Network throttled signal.
kNetwork,
};
static std::string signalTypeName(SignalType type);

Expand Down Expand Up @@ -174,25 +183,37 @@ class Throttler {
using ThrottleSignalFactory = facebook::velox::
CachedFactory<std::string, ThrottleSignal, ThrottleSignalGenerator>;

/// Groups the signal cache kept for one throttle signal type with the
/// backoff threshold configured for that type.
struct ThrottleSignalCache {
/// Cache of received throttle signals. Null when throttling is disabled.
std::unique_ptr<ThrottleSignalFactory> throttleCache;
/// The minimum number of received throttled signals of this type before
/// starting backoff.
uint32_t minThrottledSignalsToBackOff;
};

void updateThrottleCacheLocked(
SignalType type,
const std::string& cluster,
const std::string& directory,
CachedThrottleSignalPtr& localSignal,
CachedThrottleSignalPtr& globalSignal);
CachedThrottleSignalPtr& globalSignal,
CachedThrottleSignalPtr& networkSignal);

void updateThrottleStats(SignalType type, uint64_t backoffDelayMs);

static ThrottleSignalCache maybeMakeThrottleSignalCache(
bool enabled,
uint32_t minThrottledSignals,
uint32_t maxCacheEntries,
uint32_t cacheTTLMs);

static const uint64_t kNoBackOffMs_{0};

const bool throttleEnabled_;
const uint64_t minThrottleBackoffDurationMs_;
const uint64_t maxThrottleBackoffDurationMs_;
const double backoffScaleFactor_;
const uint32_t minLocalThrottledSignalsToBackoff_;
const uint32_t minGlobalThrottledSignalsToBackoff_;
const std::unique_ptr<ThrottleSignalFactory> localThrottleCache_;
const std::unique_ptr<ThrottleSignalFactory> globalThrottleCache_;

const ThrottleSignalCache localThrottleCache_;
const ThrottleSignalCache globalThrottleCache_;
const ThrottleSignalCache networkThrottleCache_;

mutable std::mutex mu_;

Expand Down
Loading

0 comments on commit 8cc4f48

Please sign in to comment.