Merge #131432
131432: rac2: exclude replicas with send-queue from quorum r=kvoli a=sumeerbhola

Also, replicas not in StateReplicate are excluded from the quorum,
as outlined in
https://docs.google.com/document/d/1ROE1lpRVhfLxP39rs8J5mKoUfgsoiJyEkUkoNPhBJUk/edit#heading=h.7c7mxg7g2gsy

Informs #130433

Epic: CRDB-37515

Release note: None

Co-authored-by: sumeerbhola <[email protected]>
craig[bot] and sumeerbhola committed Sep 27, 2024
2 parents 0ea0320 + 7bc1086 commit 467fd26
Showing 7 changed files with 403 additions and 30 deletions.
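
The core rule this commit introduces: a voter counts toward the eval-token quorum only if it is in StateReplicate, has no send-queue, and has positive eval tokens. The following is a minimal standalone Go sketch of that counting rule; the voter struct and quorumSatisfied helper are illustrative stand-ins, not the CockroachDB types used in the diff below.

package main

import "fmt"

// voter is an illustrative stand-in for the per-voter state that
// WaitForEval consults.
type voter struct {
	isStateReplicate bool // follower is in StateReplicate
	hasSendQ         bool // follower has a send-queue of unsent entries
	hasEvalTokens    bool // positive eval tokens for the work class
}

// quorumSatisfied counts only voters that are replicating, have no
// send-queue, and have eval tokens, mirroring the rule introduced here.
func quorumSatisfied(voters []voter) bool {
	counted := 0
	for _, v := range voters {
		if v.isStateReplicate && !v.hasSendQ && v.hasEvalTokens {
			counted++
		}
	}
	return counted >= len(voters)/2+1
}

func main() {
	vs := []voter{
		{isStateReplicate: true, hasEvalTokens: true},                  // counts
		{isStateReplicate: true, hasSendQ: true, hasEvalTokens: true},  // excluded: send-queue
		{isStateReplicate: false, hasSendQ: true, hasEvalTokens: true}, // excluded: not replicating
	}
	fmt.Println(quorumSatisfied(vs)) // false: only 1 of 3 voters counts
}
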
106 changes: 84 additions & 22 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
Expand Up @@ -40,6 +40,13 @@ import (
//
// None of the methods are called with Replica.mu held. The caller and callee
// should order their mutexes before Replica.mu.
//
// RangeController dynamically switches between push and pull mode based on
// RaftEvent handling. In general, the code here is oblivious to the fact that
// WaitForEval in push mode will only be called for elastic work. However,
// there are emergent behaviors that rely on this behavior (which are noted in
// comments). Unit tests can run RangeController in push mode and call
// WaitForEval for regular work.
type RangeController interface {
// WaitForEval seeks admission to evaluate a request at the given priority.
// This blocks until there are positive tokens available for the request to
Expand Down Expand Up @@ -358,6 +365,8 @@ type rangeController struct {
nextRaftIndex uint64

mu struct {
// All the fields in this struct are modified while holding raftMu and
// this mutex. So readers can hold either mutex.
syncutil.RWMutex

// State for waiters. When anything in voterSets or nonVoterSets changes,
Expand All @@ -378,12 +387,23 @@ type voterStateForWaiters struct {
stateForWaiters
isLeader bool
isLeaseHolder bool
// When hasSendQ is true, the voter is not included as part of the quorum.
hasSendQ bool
}

// stateForWaiters informs whether WaitForEval is required to wait for
// eval-tokens for a replica.
type stateForWaiters struct {
replicaID roachpb.ReplicaID
replicaID roachpb.ReplicaID
// !isStateReplicate replicas are not required to be waited on for
// evaluating elastic work.
//
// For voters, we ensure the following invariant: !isStateReplicate =>
// hasSendQ. Since hasSendQ voters are not included in the quorum, this
// ensures that !isStateReplicate voters are not in the quorum. This is done since
// voters that are down will tend to have all the eval tokens returned by
// their streams, so will have positive eval tokens. We don't want to
// erroneously think that these are actually part of the quorum.
isStateReplicate bool
evalTokenCounter *tokenCounter
}
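
The invariant documented above (!isStateReplicate implies hasSendQ) can be expressed as a tiny assertion. This is an illustrative sketch under that stated invariant, not code from the patch:

package sketch

import "fmt"

// checkVoterWaiterInvariant returns an error if a voter that is not in
// StateReplicate is not also marked as having a send-queue. The invariant
// guarantees such voters can never be counted toward the quorum, even if
// their streams have returned all eval tokens.
func checkVoterWaiterInvariant(isStateReplicate, hasSendQ bool) error {
	if !isStateReplicate && !hasSendQ {
		return fmt.Errorf("invariant violated: !isStateReplicate must imply hasSendQ")
	}
	return nil
}
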
Expand All @@ -404,7 +424,7 @@ func NewRangeController(
}
rc.mu.waiterSetRefreshCh = make(chan struct{})
rc.updateReplicaSet(ctx, init.ReplicaSet)
rc.updateWaiterSets()
rc.updateWaiterSetsRaftMuLocked()
return rc
}

Expand Down Expand Up @@ -448,17 +468,17 @@ retry:
// First check the voter set, which participate in quorum.
for _, v := range vs {
available, handle := v.evalTokenCounter.TokensAvailable(wc)
if available {
if available && !v.hasSendQ {
votersHaveEvalTokensCount++
continue
}

// Don't have eval tokens, and have a handle.
// Don't have eval tokens, and have a handle OR have a send-queue and no handle.
handleInfo := tokenWaitingHandleInfo{
handle: handle,
requiredWait: v.isLeader || v.isLeaseHolder ||
(waitForAllReplicateHandles && v.isStateReplicate),
partOfQuorum: true,
partOfQuorum: !v.hasSendQ,
}
handles = append(handles, handleInfo)
if !requiredWait && handleInfo.requiredWait {
Expand Down Expand Up @@ -499,6 +519,11 @@ retry:
}
if remainingForQuorum > 0 || requiredWait {
var state WaitEndState
// We may call WaitForEval with a remainingForQuorum count higher than
// the number of handles that have partOfQuorum set to true. This can
// happen when not enough replicas are in StateReplicate, or not enough
// have no send-queue. This is acceptable in that the callee will end up
// waiting on the refreshCh.
state, scratch = WaitForEval(ctx, refreshCh, handles, remainingForQuorum, scratch)
switch state {
case WaitSuccess:
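
The comment above notes that remainingForQuorum can exceed the number of handles marked partOfQuorum; in that case the waiter cannot be satisfied by tokens alone and falls back to the refresh channel. An illustrative Go sketch of that fallback follows; the function name and signature are hypothetical, not the rac2 API.

package sketch

import "context"

// waitQuorumOrRefresh sketches the fallback: if fewer handles can count
// toward the quorum than remainingForQuorum requires, block until either
// the waiter sets are refreshed or the caller's context is canceled.
func waitQuorumOrRefresh(
	ctx context.Context, refreshCh <-chan struct{}, quorumHandles, remainingForQuorum int,
) (refreshed bool, err error) {
	if quorumHandles >= remainingForQuorum {
		// Enough quorum-eligible handles exist; waiting on tokens can succeed.
		return false, nil
	}
	select {
	case <-refreshCh:
		// Replica state changed (e.g. a send-queue emptied); the caller
		// retries with rebuilt voter sets.
		return true, nil
	case <-ctx.Done():
		return false, ctx.Err()
	}
}
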
Expand Down Expand Up @@ -732,12 +757,12 @@ func (rc *rangeController) HandleRaftEventRaftMuLocked(ctx context.Context, e Ra
}
shouldWaitChange = rs.handleReadyState(
ctx, info, eventForReplica.nextRaftIndex, eventForReplica.recreateSendStream) || shouldWaitChange
rs.handleReadyEntries(ctx, eventForReplica)
shouldWaitChange = rs.handleReadyEntries(ctx, eventForReplica) || shouldWaitChange
}
// If there was a quorum change, update the voter sets, triggering the
// refresh channel for any requests waiting for eval tokens.
if shouldWaitChange {
rc.updateWaiterSets()
rc.updateWaiterSetsRaftMuLocked()
}
return nil
}
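
The hunk above accumulates a single shouldWaitChange flag across all replicas and refreshes the waiter sets at most once, rather than per replica. A schematic version of that pattern, with an illustrative replicaHandler interface standing in for the real per-replica state:

package sketch

// replicaHandler is an illustrative stand-in for per-replica handling; each
// method reports whether quorum-relevant state (send-queue, connected state)
// changed.
type replicaHandler interface {
	handleReadyState() bool
	handleReadyEntries() bool
}

// handleRaftEvent processes every replica, then refreshes the waiter sets
// once if anything changed, which wakes requests blocked in WaitForEval.
// Note the "call || flag" ordering, so the calls are never short-circuited.
func handleRaftEvent(replicas []replicaHandler, refreshWaiterSets func()) {
	shouldWaitChange := false
	for _, r := range replicas {
		shouldWaitChange = r.handleReadyState() || shouldWaitChange
		shouldWaitChange = r.handleReadyEntries() || shouldWaitChange
	}
	if shouldWaitChange {
		refreshWaiterSets()
	}
}
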
Expand Down Expand Up @@ -781,7 +806,7 @@ func (rc *rangeController) MaybeSendPingsRaftMuLocked() {
// Requires replica.raftMu to be held.
func (rc *rangeController) SetReplicasRaftMuLocked(ctx context.Context, replicas ReplicaSet) error {
rc.updateReplicaSet(ctx, replicas)
rc.updateWaiterSets()
rc.updateWaiterSetsRaftMuLocked()
return nil
}

Expand All @@ -796,7 +821,7 @@ func (rc *rangeController) SetLeaseholderRaftMuLocked(
}
log.VInfof(ctx, 1, "r%v setting range leaseholder replica_id=%v", rc.opts.RangeID, replica)
rc.leaseholder = replica
rc.updateWaiterSets()
rc.updateWaiterSetsRaftMuLocked()
}

// CloseRaftMuLocked closes the range controller.
Expand Down Expand Up @@ -882,7 +907,7 @@ func (rc *rangeController) updateReplicaSet(ctx context.Context, newSet ReplicaS
rc.replicaSet = newSet
}

func (rc *rangeController) updateWaiterSets() {
func (rc *rangeController) updateWaiterSetsRaftMuLocked() {
rc.mu.Lock()
defer rc.mu.Unlock()

Expand All @@ -908,9 +933,10 @@ func (rc *rangeController) updateWaiterSets() {
isNew := r.IsVoterNewConfig()

rs := rc.replicaMap[r.ReplicaID]
isStateReplicate, hasSendQ := rs.isStateReplicateAndSendQ()
waiterState := stateForWaiters{
replicaID: r.ReplicaID,
isStateReplicate: rs.isStateReplicate(),
isStateReplicate: isStateReplicate,
evalTokenCounter: rs.evalTokenCounter,
}

Expand All @@ -927,6 +953,7 @@ func (rc *rangeController) updateWaiterSets() {
stateForWaiters: waiterState,
isLeader: r.ReplicaID == rc.opts.LocalReplicaID,
isLeaseHolder: r.ReplicaID == rc.leaseholder,
hasSendQ: hasSendQ,
}
if isOld {
voterSets[0] = append(voterSets[0], vsfw)
Expand Down Expand Up @@ -1129,9 +1156,24 @@ func (rs *replicaState) createReplicaSendStream(

}

func (rs *replicaState) isStateReplicate() bool {
// probeRecentlyReplicate is also included in this state.
return rs.sendStream != nil
func (rs *replicaState) isStateReplicateAndSendQ() (isStateReplicate, hasSendQ bool) {
if rs.sendStream == nil {
return false, true
}
rs.sendStream.mu.Lock()
defer rs.sendStream.mu.Unlock()
isStateReplicate = rs.sendStream.mu.connectedState == replicate
if isStateReplicate {
hasSendQ = !rs.sendStream.isEmptySendQueueLocked()
} else {
// For WaitForEval, we treat probeRecentlyNoSendQ as having a send-queue
// and not part of the quorum. We don't want to keep evaluating and pile
// up work. Note that this is the exact opposite of how
// probeRecentlyNoSendQ behaves wrt contributing to the quorum when
// deciding to force-flush.
hasSendQ = true
}
return isStateReplicate, hasSendQ
}

type entryFCState struct {
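
isStateReplicateAndSendQ above collapses the stream's connected state and send-queue emptiness into the two booleans WaitForEval needs, treating probing (or absent) streams as having a send-queue. A simplified, illustrative version of that classification, with connState and its values as stand-ins for the real types:

package sketch

// connState is an illustrative stand-in for the stream's connected state.
type connState int

const (
	stateReplicate connState = iota
	stateProbeRecently
)

// classifyForWaiters mirrors the decision above: only a replicating stream
// with an empty send-queue is quorum-eligible; probing streams are treated
// as having a send-queue so evaluations do not keep piling up work.
func classifyForWaiters(state connState, emptySendQueue bool) (isStateReplicate, hasSendQ bool) {
	if state != stateReplicate {
		return false, true
	}
	return true, !emptySendQueue
}
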
Expand Down Expand Up @@ -1168,7 +1210,7 @@ func getEntryFCStateOrFatal(ctx context.Context, entry raftpb.Entry) entryFCStat

func (rs *replicaState) handleReadyEntries(
ctx context.Context, eventForReplica raftEventForReplica,
) {
) (transitionedSendQStateAsVoter bool) {
if rs.sendStream == nil {
return
}
Expand All @@ -1178,7 +1220,7 @@ func (rs *replicaState) handleReadyEntries(
if rs.sendStream.mu.connectedState != replicate {
return
}
rs.sendStream.handleReadyEntriesLocked(ctx, eventForReplica)
return rs.sendStream.handleReadyEntriesLocked(ctx, eventForReplica)
}

// handleReadyState handles state management for the replica based on the
Expand All @@ -1194,7 +1236,7 @@ func (rs *replicaState) handleReadyState(
// We have already closed the stream, nothing to do.
return false
}
if shouldClose := func() (should bool) {
if shouldClose := func() (shouldClose bool) {
now := rs.parent.opts.Clock.PhysicalTime()
rs.sendStream.mu.Lock()
defer rs.sendStream.mu.Unlock()
Expand All @@ -1204,22 +1246,23 @@ func (rs *replicaState) handleReadyState(
// The replica has been in StateProbe for at least
// probeRecentlyReplicateDuration (default 1s) second, close the
// stream.
should = true
shouldClose = true
} else if state != probeRecentlyReplicate {
rs.sendStream.changeToProbeLocked(ctx, now)
// probeRecentlyReplicate is considered to have a send-queue, so
// waiting may need to change.
shouldWaitChange = true
}
return should
return shouldClose
}(); shouldClose {
rs.closeSendStream(ctx)
shouldWaitChange = true
}

case tracker.StateReplicate:
if rs.sendStream == nil {
if !recreateSendStream {
panic(errors.AssertionFailedf("in StateReplica, but recreateSendStream is false"))
}
shouldWaitChange = true
}
if rs.sendStream != nil && recreateSendStream {
// This includes both (a) inconsistencies, and (b) transition from
Expand All @@ -1228,6 +1271,8 @@ func (rs *replicaState) handleReadyState(
}
if rs.sendStream == nil {
rs.createReplicaSendStream(ctx, info.Next, nextRaftIndex)
// Have stale send-queue state.
shouldWaitChange = true
}

case tracker.StateSnapshot:
Expand Down Expand Up @@ -1262,7 +1307,8 @@ func (rss *replicaSendStream) closeLocked(ctx context.Context) {

func (rss *replicaSendStream) handleReadyEntriesLocked(
ctx context.Context, event raftEventForReplica,
) {
) (transitionedSendQStateAsVoter bool) {
wasEmptySendQ := rss.isEmptySendQueueLocked()
if n := len(event.sendingEntries); n > 0 {
if event.sendingEntries[0].index != rss.mu.sendQueue.indexToSend {
panic(errors.AssertionFailedf("first send entry %d does not match indexToSend %d",
Expand Down Expand Up @@ -1338,6 +1384,20 @@ func (rss *replicaSendStream) handleReadyEntriesLocked(
rss.mu.eval.tokensDeducted[wc] += tokens
}
}
// NB: we don't special case to an empty send-queue in push mode, where Raft
// is responsible for causing this send-queue. Raft does not keep track of
// whether the send-queues are causing a loss of quorum, so in the worst
// case we could stop evaluating because of a majority of voters having a
// send-queue. But in push mode only elastic work will be subject to
// replication admission control, and regular work will not call
// WaitForEval, so we accept this behavior.
transitionedSendQStateAsVoter =
rss.parent.desc.IsAnyVoter() && (wasEmptySendQ != rss.isEmptySendQueueLocked())
return transitionedSendQStateAsVoter
}

func (rss *replicaSendStream) isEmptySendQueueLocked() bool {
return rss.mu.sendQueue.indexToSend == rss.mu.sendQueue.nextRaftIndex
}

func (rss *replicaSendStream) changeToProbeLocked(ctx context.Context, now time.Time) {
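
isEmptySendQueueLocked above works because the send-queue is the half-open interval of raft indices [indexToSend, nextRaftIndex). A self-contained illustration of that bookkeeping; the sendQueue type here is a sketch, not the rac2 struct:

package sketch

// sendQueue sketches the bookkeeping used above: entries in
// [indexToSend, nextRaftIndex) have been appended to the raft log but not
// yet sent to this follower.
type sendQueue struct {
	indexToSend   uint64 // lowest index not yet sent to the follower
	nextRaftIndex uint64 // next index raft will append
}

// isEmpty is true exactly when the two cursors coincide.
func (q sendQueue) isEmpty() bool {
	return q.indexToSend == q.nextRaftIndex
}

// length is the number of queued-but-unsent entries.
func (q sendQueue) length() uint64 {
	return q.nextRaftIndex - q.indexToSend
}
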
Expand Down Expand Up @@ -1444,6 +1504,8 @@ type connectedState uint32
//
// Initial states: replicate
// State transitions: replicate <=> probeRecentlyReplicate
//
// TODO(sumeer): replace probeRecentlyReplicate with probeRecentlyNoSendQ.
const (
replicate connectedState = iota
probeRecentlyReplicate
Expand Up @@ -451,7 +451,7 @@ func scanRanges(t *testing.T, input string) []testingRange {
parts[3] = strings.TrimSpace(parts[3])
require.True(t, strings.HasPrefix(parts[3], "next_raft_index="))
parts[3] = strings.TrimPrefix(strings.TrimSpace(parts[3]), "next_raft_index=")
nextRaftIndex, err = strconv.Atoi(parts[2])
nextRaftIndex, err = strconv.Atoi(parts[3])
require.NoError(t, err)

replicas = append(replicas, testingRange{
@@ -1,4 +1,4 @@
# Intialize a range with voters on s1,s2 and s3. The local replica and
# Initialize a range with voters on s1,s2 and s3. The local replica and
# leaseholder will be s1. The leaseholder is denoted by the '*' suffix. Also
# set all streams to initially have 0 tokens and a limit of 1 token to simplify
# the test, as evaluation requests only wait for positive tokens.
@@ -1,4 +1,4 @@
# This test excercises calling WaitForEval with a variety of replica sets which
# This test exercises calling WaitForEval with a variety of replica sets which
# include non-voters in various states.
#
# Initialize the test state with a single range, holding two voters and one
Expand Down Expand Up @@ -105,7 +105,8 @@ t1/s6: eval reg=+1 B/+1 B ela=+0 B/+1 B
# Start a low priority evaluation 'c'. This should not complete until:
# (1) The leader has elastic tokens (already true)
# (2) The leaseholder has elastic tokens (already true)
# (3) A quorum of voting replicas have elastic tokens e.g., s1 + (s2|s3)
# (3) A quorum of voting replicas have elastic tokens and are in
# StateReplicate e.g., s1 + (s2|s3)
# (4) All non-voting replicas in StateReplicate have elastic tokens
wait_for_eval name=c range_id=1 pri=LowPri
----
Expand Down Expand Up @@ -302,9 +303,8 @@ range_id=1 tenant_id={1} local_replica_id=1
name=c pri=low-pri done=true waited=true err=<nil>
name=d pri=high-pri done=false waited=false err=<nil>

# Lastly, add tokens back the voting replica in StateSnapshot. Despite being in
# StateSnapshot, the replica stream having tokens is sufficient to complete the
# evaluation 'd', as it completes a quorum (s1+s3/3).
# Add tokens back to the voting replica in StateSnapshot. Being in StateSnapshot,
# it is not allowed to be part of the quorum.
adjust_tokens
store_id=3 pri=HighPri tokens=1
----
Expand All @@ -321,6 +321,32 @@ t1/s5: eval reg=+1 B/+1 B ela=+1 B/+1 B
t1/s6: eval reg=+1 B/+1 B ela=+1 B/+1 B
send reg=+1 B/+1 B ela=+1 B/+1 B

check_state
----
range_id=1 tenant_id={1} local_replica_id=1
name=a pri=low-pri done=true waited=true err=<nil>
name=b pri=low-pri done=true waited=true err=<nil>
name=c pri=low-pri done=true waited=true err=<nil>
name=d pri=high-pri done=false waited=false err=<nil>

# Add tokens back to the voting replica in StateReplicate. This is sufficient
# to complete evaluation 'd', as it completes a quorum (s1+s2/3).
adjust_tokens
store_id=2 pri=HighPri tokens=1
----
t1/s1: eval reg=+1 B/+1 B ela=+1 B/+1 B
send reg=+1 B/+1 B ela=+1 B/+1 B
t1/s2: eval reg=+1 B/+1 B ela=+1 B/+1 B
send reg=+1 B/+1 B ela=+1 B/+1 B
t1/s3: eval reg=+1 B/+1 B ela=+1 B/+1 B
send reg=+1 B/+1 B ela=+1 B/+1 B
t1/s4: eval reg=+1 B/+1 B ela=+1 B/+1 B
send reg=+1 B/+1 B ela=+1 B/+1 B
t1/s5: eval reg=+1 B/+1 B ela=+1 B/+1 B
send reg=+1 B/+1 B ela=+1 B/+1 B
t1/s6: eval reg=+1 B/+1 B ela=+1 B/+1 B
send reg=+1 B/+1 B ela=+1 B/+1 B

check_state
----
range_id=1 tenant_id={1} local_replica_id=1
