Skip to content

Commit

Permalink
fair_queue: make the fair_group token grabbing discipline more fair
Browse files Browse the repository at this point in the history
The current design of `fair_group` isn't fair enough to shards.
During contention, the group will be -- aproximately -- taking requests
from shards one-by-one, in round robin.

This guarantees that each contender will dispatch an equal *number* of
requests. This is some kind of fairness, but it's not the kind we want,
probably ever.

A better kind of fairness is that under contention, each shard should
be guaranteed `1/nr_shards` of the disk's IOPS and/or `1/nr_shards` of
byte-bandwidth, whichever dimension it pressures more.
This is needed so that each shard can be relied on to sustain a certain
rate of requests -- the lower bound of the slowest shard's throughput
usually dictates the throughput of the entire cluster.

But those two kinds of fairness are only the same if all IO requests have
the same size and direction. Otherwise they can be drastically different.

With the current design it's easy to create a situation where a shard receives
an arbitrarily small fraction of both IOPS and bandwidth, despite being
IO-bound. (Example: a node with X shards, where one shard spams only very small
requests and other shards spams only big requests).

This is a problem in practice. In ScyllaDB, we observed IO starvation of
some shards during realistic workloads. While they require some workload
asymmetry to occur, even small asymmetries can cause serious unfairness to occur.
(For example, a shard which receives 6% more of database queries than other
shards can be starved to less than 50% of its fair share of IOPS and/or
bandwidth -- because each of those 1 kiB queries is "fairly" matched with
16x costlier 128 kiB low-priority batch IO requests on other shards).

To improve this, `fair_group` needs a different queueing discipline.
There are many possible ways, but this patch chooses the one which is relatively
the most similar to the current one. The main idea is that we still rely on the
"approximate round robin" of token queue as the basis for fairness, but we reserve
a fixed-size batch of tokens at a time, rather than a fixed-size (i.e. 1) batch
of _requests_ at a time. This turns the discipline from
approximately-request-fair to approximately-token-fair, which is what we want.

The implementation details are non-trivial, though, and should be carefully reviewed.
  • Loading branch information
michoecho committed Jan 21, 2025
1 parent 7781c4c commit 2330929
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 60 deletions.
50 changes: 31 additions & 19 deletions include/seastar/core/fair_queue.hh
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ public:
capacity_t per_tick_grab_threshold() const noexcept { return _per_tick_threshold; }
capacity_t grab_capacity(capacity_t cap) noexcept;
clock_type::time_point replenished_ts() const noexcept { return _token_bucket.replenished_ts(); }
void refund_tokens(capacity_t) noexcept;
void replenish_capacity(clock_type::time_point now) noexcept;
void maybe_replenish_capacity(clock_type::time_point& local_ts) noexcept;

Expand Down Expand Up @@ -331,25 +332,25 @@ private:
size_t _nr_classes = 0;
capacity_t _last_accumulated = 0;

/*
* When the shared capacity os over the local queue delays
* further dispatching untill better times
*
* \head -- the value group head rover is expected to cross
* \cap -- the capacity that's accounted on the group
*
* The last field is needed to "rearm" the wait in case
* queue decides that it wants to dispatch another capacity
* in the middle of the waiting
*/
// _pending represents a reservation of tokens from the bucket.
//
// In the "dispatch timeline" defined by the growing bucket head of the group,
// tokens in the range [_pending.head - cap, _pending.head) belong
// to this queue.
//
// For example, if:
// _group._token_bucket.head == 300
// _pending.head == 700
// _pending.cap == 500
// then the reservation is [200, 700), 100 tokens are ready to be dispatched by this queue,
// and another 400 tokens are going to be appear soon. (And after that, this queue
// will be able to make its next reservation).
struct pending {
capacity_t head;
capacity_t cap;

pending(capacity_t t, capacity_t c) noexcept : head(t), cap(c) {}
capacity_t head = 0;
capacity_t cap = 0;
};
pending _pending;

std::optional<pending> _pending;
// Total capacity of all requests waiting in the queue.
capacity_t _queued_capacity = 0;

Expand All @@ -359,9 +360,20 @@ private:
void plug_priority_class(priority_class_data& pc) noexcept;
void unplug_priority_class(priority_class_data& pc) noexcept;

enum class grab_result { grabbed, cant_preempt, pending };
grab_result grab_capacity(const fair_queue_entry& ent) noexcept;
grab_result grab_pending_capacity(const fair_queue_entry& ent) noexcept;
// Replaces _pending with a new reservation starting at the current
// group bucket tail.
void grab_capacity(capacity_t cap) noexcept;
// Shaves off the fulfilled frontal part from `_pending` (if any),
// and returns the fulfilled tokens in `ready_tokens`.
// Sets `our_turn_has_come` to the truth value of "`_pending` is empty or
// there are no unfulfilled reservations (from other shards) earlier than `_pending`".
//
// Assumes that `_group.maybe_replenish_capacity()` was called recently.
struct reap_result {
capacity_t ready_tokens;
bool our_turn_has_come;
};
reap_result reap_pending_capacity() noexcept;
public:
/// Constructs a fair queue with configuration parameters \c cfg.
///
Expand Down
4 changes: 4 additions & 0 deletions include/seastar/util/shared_token_bucket.hh
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ public:
_rovers.release(tokens);
}

void refund(T tokens) noexcept {
fetch_add(_rovers.head, tokens);
}

void replenish(typename Clock::time_point now) noexcept {
auto ts = _replenished.load(std::memory_order_relaxed);

Expand Down
132 changes: 91 additions & 41 deletions src/core/fair_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ void fair_group::replenish_capacity(clock_type::time_point now) noexcept {
_token_bucket.replenish(now);
}

void fair_group::refund_tokens(capacity_t cap) noexcept {
_token_bucket.refund(cap);
}

void fair_group::maybe_replenish_capacity(clock_type::time_point& local_ts) noexcept {
auto now = clock_type::now();
auto extra = _token_bucket.accumulated_in(now - local_ts);
Expand Down Expand Up @@ -223,35 +227,22 @@ void fair_queue::unplug_class(class_id cid) noexcept {
unplug_priority_class(*_priority_classes[cid]);
}

auto fair_queue::grab_pending_capacity(const fair_queue_entry& ent) noexcept -> grab_result {
_group.maybe_replenish_capacity(_group_replenish);

if (_group.capacity_deficiency(_pending->head)) {
return grab_result::pending;
}

capacity_t cap = ent._capacity;
if (cap > _pending->cap) {
return grab_result::cant_preempt;
auto fair_queue::reap_pending_capacity() noexcept -> reap_result {
auto result = reap_result{.ready_tokens = 0, .our_turn_has_come = true};
if (_pending.cap) {
capacity_t deficiency = _group.capacity_deficiency(_pending.head);
result.our_turn_has_come = deficiency <= _pending.cap;
if (result.our_turn_has_come) {
result.ready_tokens = _pending.cap - deficiency;
_pending.cap = deficiency;
}
}

_pending.reset();
return grab_result::grabbed;
return result;
}

auto fair_queue::grab_capacity(const fair_queue_entry& ent) noexcept -> grab_result {
if (_pending) {
return grab_pending_capacity(ent);
}

capacity_t cap = ent._capacity;
auto fair_queue::grab_capacity(capacity_t cap) noexcept -> void {
capacity_t want_head = _group.grab_capacity(cap);
if (_group.capacity_deficiency(want_head)) {
_pending.emplace(want_head, cap);
return grab_result::pending;
}

return grab_result::grabbed;
_pending = pending{want_head, cap};
}

void fair_queue::register_priority_class(class_id id, uint32_t shares) {
Expand Down Expand Up @@ -309,7 +300,7 @@ void fair_queue::notify_request_cancelled(fair_queue_entry& ent) noexcept {
}

fair_queue::clock_type::time_point fair_queue::next_pending_aio() const noexcept {
if (_pending) {
if (_pending.cap) {
/*
* We expect the disk to release the ticket within some time,
* but it's ... OK if it doesn't -- the pending wait still
Expand All @@ -320,37 +311,91 @@ fair_queue::clock_type::time_point fair_queue::next_pending_aio() const noexcept
* which's sub-optimal. The expectation is that we think disk
* works faster, than it really does.
*/
auto over = _group.capacity_deficiency(_pending->head);
auto over = _group.capacity_deficiency(_pending.head);
auto ticks = _group.capacity_duration(over);
return std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::microseconds>(ticks);
}

return std::chrono::steady_clock::time_point::max();
}

// This function is called by the shard on every poll.
// It picks up tokens granted by the group, spends available tokens on IO dispatches,
// and makes a reservation for more tokens, if needed.
//
// Reservations are done in batches of size `_group.per_tick_grab_threshold()`.
// During contention, in an average moment in time each contending shard can be expected to
// be holding a reservation of such size after the current head of the token bucket.
//
// A shard which is currently calling `dispatch_requests()` can expect a latency
// of at most `nr_contenders * (_group.per_tick_grab_threshold() + max_request_cap)` before its next reservation is fulfilled.
// If a shard calls `dispatch_requests()` at least once per X total tokens, it should receive bandwidth
// of at least `_group.per_tick_grab_threshold() / (X + nr_contenders * (_group.per_tick_grab_threshold() + max_request_cap))`.
//
// A shard which is polling continuously should be able to grab its fair share of the disk for itself.
//
// Given a task quota of 500us and IO latency goal of 750 us,
// a CPU-starved shard should still be able to grab at least ~30% of its fair share in the worst case.
// This is far from ideal, but it's something.
void fair_queue::dispatch_requests(std::function<void(fair_queue_entry&)> cb) {
capacity_t dispatched = 0;
boost::container::small_vector<priority_class_ptr, 2> preempt;
_group.maybe_replenish_capacity(_group_replenish);

const uint64_t max_unamortized_reservation = _group.per_tick_grab_threshold();
auto available = reap_pending_capacity();

while (!_handles.empty() && (dispatched < _group.per_tick_grab_threshold())) {
while (!_handles.empty()) {
priority_class_data& h = *_handles.top();
if (h._queue.empty() || !h._plugged) {
pop_priority_class(h);
continue;
}

auto& req = h._queue.front();
auto gr = grab_capacity(req);
if (gr == grab_result::pending) {
if (req._capacity <= available.ready_tokens) {
// We can dispatch the request immediately.
// We do that after the if-else.
} else if (req._capacity <= available.ready_tokens + _pending.cap || _pending.cap >= max_unamortized_reservation) {
// We can't dispatch the request yet, but we already have a pending reservation
// which will provide us with enough tokens for it eventually,
// or our reservation is already max-size and we can't reserve more tokens until we reap some.
// So we should just wait.
// We return any immediately-available tokens back to `_pending`
// and we bail. The next `dispatch_request` will again take those tokens
// (possibly joined by some newly-granted tokens) and retry.
_pending.cap += available.ready_tokens;
available.ready_tokens = 0;
break;
}

if (gr == grab_result::cant_preempt) {
pop_priority_class(h);
preempt.emplace_back(&h);
} else if (available.our_turn_has_come) {
// The current reservation isn't enough to fulfill the next request,
// and we can cancel it (because `our_turn_has_come == true`) and make a bigger one
// (because `_pending.cap < can_grab_this_tick`).
// So we cancel it and do a bigger one.

// We do token recycling here: we return the tokens which we have available, and the tokens we have reserved
// immediately after the group head, and we return them to the bucket, immediately grabbing the same amount from the tail.
// This is neutral to fairness. The bandwidth we consume is still influenced only by the
// `max_unarmortized_reservation` portions.
auto recycled = available.ready_tokens + _pending.cap;
capacity_t grab_amount = std::min<capacity_t>(recycled + max_unamortized_reservation, _queued_capacity);
// There's technically nothing wrong with grabbing more than `_group.maximum_capacity()`,
// but the token bucket has an assert for that, and its a reasonable expectation, so let's respect that limit.
// It shouldn't matter in practice.
grab_amount = std::min<capacity_t>(grab_amount, _group.maximum_capacity());
_group.refund_tokens(recycled);
grab_capacity(grab_amount);
available = reap_pending_capacity();
continue;
} else {
// We can already see that our current reservation is going to be insufficient
// for the highest-priority request as of now. But since group head didn't touch
// it yet, there's no good way to cancel it, so we have no choice but to wait
// until the touch time.
assert(available.ready_tokens == 0);
break;
}

available.ready_tokens -= req._capacity;

_last_accumulated = std::max(h._accumulated, _last_accumulated);
pop_priority_class(h);
h._queue.pop_front();
Expand All @@ -376,7 +421,6 @@ void fair_queue::dispatch_requests(std::function<void(fair_queue_entry&)> cb) {
}
h._accumulated += req_cost;
h._pure_accumulated += req_cap;
dispatched += req_cap;
_queued_capacity -= req_cap;

cb(req);
Expand All @@ -386,9 +430,15 @@ void fair_queue::dispatch_requests(std::function<void(fair_queue_entry&)> cb) {
}
}

for (auto&& h : preempt) {
push_priority_class(*h);
}
assert(_handles.empty() || available.ready_tokens == 0);

// Note: if IO cancellation happens, it's possible that we are still holding some tokens in `ready` here.
//
// We could refund them to the bucket, but permanently refunding tokens (as opposed to only
// "rotating" the bucket like the earlier refund() calls in this function do) is theoretically
// unpleasant (it can bloat the bucket beyond its size limit, and its hard to write a correct
// countermeasure for that), so we just discard the tokens. There's no harm in it, IO cancellation
// can't have resource-saving guarantees anyway.
}

std::vector<seastar::metrics::impl::metric_definition_impl> fair_queue::metrics(class_id c) {
Expand Down

0 comments on commit 2330929

Please sign in to comment.