Merge #130370
130370: kv: synchronize between leader lease acquisition and leadership transfers r=nvanbenschoten a=nvanbenschoten

Fixes #129807.

This commit adds synchronization between leader lease acquisition and leadership transfers, ensuring that a raft leader holding a leader lease will never transfer leadership away. Doing so could lead to a lease expiration regression, as the leadership term would end before lead support had expired.

This property is built on top of two new restrictions:
1. a raft leader will never begin to acquire (or promote to) a leader lease if it is in the process of transferring away raft leadership.
2. a raft leader will never begin to transfer away raft leadership if it is in the process of acquiring a lease.
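
A minimal sketch of how the two restrictions compose. The names below are illustrative stand-ins, not the actual Replica API; see the diffs in this commit for the real implementation.

// Package leasesketch is a hypothetical sketch, not CockroachDB code.
package leasesketch

// LeaseType stands in for roachpb.LeaseType.
type LeaseType int

const (
	LeaseExpiration LeaseType = iota
	LeaseLeader
)

// nextLeaseType captures restriction 1: a replica only builds a leader lease
// if it is the raft leader and is not transferring leadership away; otherwise
// it falls back to a safe, upgradable expiration-based lease.
func nextLeaseType(isLeader, leadershipTransferInFlight bool) LeaseType {
	if isLeader && !leadershipTransferInFlight {
		return LeaseLeader
	}
	return LeaseExpiration
}

// canTransferLeadership captures restriction 2: a raft leader does not begin
// a leadership transfer while a lease acquisition is in flight.
func canTransferLeadership(leaseAcquisitionPending bool) bool {
	return !leaseAcquisitionPending
}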

Release note: None

Co-authored-by: Nathan VanBenschoten <[email protected]>
craig[bot] and nvanbenschoten committed Sep 25, 2024
2 parents d926d5e + 8d1ae3b commit b92a8ea
Showing 5 changed files with 176 additions and 54 deletions.
30 changes: 21 additions & 9 deletions pkg/kv/kvserver/leases/build.go
@@ -370,15 +370,27 @@ func leaseType(st Settings, i BuildInput) roachpb.LeaseType {
// construct an epoch-based lease.
return roachpb.LeaseEpoch
}
if i.RaftStatus.RaftState != raft.StateLeader {
// If this range wants to use a leader lease, but it is not currently the
// raft leader, we construct an expiration-based lease. It is highly likely
// that the lease acquisition will be rejected before being proposed by the
// lease safety checks in verifyAcquisition. If not (e.g. because the
// kv.lease.reject_on_leader_unknown.enabled setting is set to a non-default
// value of false), we may end up with an expiration-based lease, which is
// safe and can be upgraded to a leader lease when the range becomes the
// leader.
if i.RaftStatus.RaftState != raft.StateLeader || i.RaftStatus.LeadTransferee != raft.None {
// If this range wants to use a leader lease, but the local replica is not
// currently the raft leader, we construct an expiration-based lease. It is
// highly likely that the lease acquisition will be rejected before being
// proposed by the lease safety checks in verifyAcquisition. If not (e.g.
// because the kv.lease.reject_on_leader_unknown.enabled setting is set to
// the default value of false), the local replica may end up with an
// expiration-based lease, which is safe and can be upgraded to a leader
// lease when the replica becomes the leader.
//
// Similarly, if the replica is the raft leader but it is in the process of
// transferring away its leadership, we construct an expiration-based lease
// instead of a leader lease, as a precaution. This ensures that a poorly
// timed leader lease acquisition does not race with a leadership transfer
// and cause a leader lease to be prematurely invalidated when the leader
// transfer completes and leadership is stolen away, before leader support
// expires. The race cannot occur in the other direction (lease acquisition
// in-progress, then leadership transfer initiated) because a raft leader
// will only initiate a leadership transfer if it does not currently hold
// the lease and is not in the process of acquiring it. The two synchronize
// on the replica mutex.
return roachpb.LeaseExpiration
}
// We're the leader and we prefer leader leases, so we construct a leader
73 changes: 73 additions & 0 deletions pkg/kv/kvserver/leases/build_test.go
@@ -222,6 +222,12 @@ func raftStatusLeader(replicaID roachpb.ReplicaID) *raft.Status {
return s
}

func raftStatusLeaderDuringTransfer(replicaID roachpb.ReplicaID) *raft.Status {
s := raftStatusLeader(replicaID)
s.LeadTransferee = raftpb.PeerID(repl2.ReplicaID)
return s
}

// mockNodeLiveness implements the NodeLiveness interface.
type mockNodeLiveness struct {
record liveness.Record
@@ -468,6 +474,28 @@ func TestBuild(t *testing.T) {
},
},
},
{
name: "acquire leader lease, as raft leader, during leadership transfer",
st: useLeaderSettings(),
input: func() BuildInput {
i := defaultInput
i.RaftStatus = raftStatusLeaderDuringTransfer(repl1.ReplicaID)
return i
}(),
// The replica is a leader that is transferring leadership away, so it
// gets an expiration-based lease.
expOutput: Output{
NextLease: roachpb.Lease{
Replica: repl1,
Start: cts20,
ProposedTS: cts20,
Expiration: &ts40,
DeprecatedStartStasis: &ts40,
Sequence: 8,
AcquisitionType: roachpb.LeaseAcquisitionType_Request,
},
},
},
{
name: "replace expiration, acquire leader lease, as raft follower",
st: useLeaderSettings(),
@@ -490,6 +518,29 @@
},
},
},
{
name: "replace expiration, acquire leader lease, as raft leader, during leadership transfer",
st: useLeaderSettings(),
input: func() BuildInput {
i := expirationInput
i.RaftStatus = raftStatusLeaderDuringTransfer(repl1.ReplicaID)
return i
}(),
// The replica is a leader that is transferring leadership away, so it
// gets an expiration-based lease.
expOutput: Output{
NextLease: roachpb.Lease{
Replica: repl1,
// Start time backdated to the expiration of the previous lease.
Start: hlc.ClockTimestamp{WallTime: 10, Logical: 1},
ProposedTS: cts20,
Expiration: &ts40,
DeprecatedStartStasis: &ts40,
Sequence: 8,
AcquisitionType: roachpb.LeaseAcquisitionType_Request,
},
},
},
{
name: "missing node liveness",
nl: missingNodeLiveness(),
@@ -687,6 +738,28 @@ func TestBuild(t *testing.T) {
},
},
},
{
name: "promote expiration to leader lease, as raft leader, during leadership transfer",
st: useLeaderSettings(),
input: func() BuildInput {
i := expirationInput
i.RaftStatus = raftStatusLeaderDuringTransfer(repl1.ReplicaID)
return i
}(),
// The replica is a leader that is transferring leadership away, so it
// gets an (extended) expiration-based lease.
expOutput: Output{
NextLease: roachpb.Lease{
Replica: repl1,
Start: cts10,
ProposedTS: cts20,
Expiration: &ts40,
DeprecatedStartStasis: &ts40,
Sequence: 7, // sequence not changed
AcquisitionType: roachpb.LeaseAcquisitionType_Request,
},
},
},
{
name: "extend expiration",
st: useExpirationSettings(),
29 changes: 28 additions & 1 deletion pkg/kv/kvserver/replica_raft.go
@@ -2551,8 +2551,10 @@ func (r *Replica) maybeTransferRaftLeadershipToLeaseholderLocked(
if r.store.TestingKnobs().DisableLeaderFollowsLeaseholder {
return
}
raftStatus := r.mu.internalRaftGroup.SparseStatus()
leaseAcquisitionPending := r.mu.pendingLeaseRequest.AcquisitionInProgress()
ok := shouldTransferRaftLeadershipToLeaseholderLocked(
r.mu.internalRaftGroup.SparseStatus(), leaseStatus, r.StoreID(), r.store.IsDraining())
raftStatus, leaseStatus, leaseAcquisitionPending, r.StoreID(), r.store.IsDraining())
if ok {
lhReplicaID := raftpb.PeerID(leaseStatus.Lease.Replica.ReplicaID)
log.VEventf(ctx, 1, "transferring raft leadership to replica ID %v", lhReplicaID)
@@ -2564,6 +2566,7 @@
func shouldTransferRaftLeadershipToLeaseholderLocked(
raftStatus raft.SparseStatus,
leaseStatus kvserverpb.LeaseStatus,
leaseAcquisitionPending bool,
storeID roachpb.StoreID,
draining bool,
) bool {
@@ -2578,6 +2581,30 @@ func shouldTransferRaftLeadershipToLeaseholderLocked(
return false
}

// If there is an attempt to acquire the lease in progress, we don't want to
// transfer leadership away. This is more than just an optimization. If we
// were to transfer away leadership while a lease request was in progress, we
// may end up acquiring a leader lease after leadership has been transferred
// away. Or worse, the leader lease acquisition may succeed and then at some
// later point, the leadership transfer could succeed, leading to leadership
// being stolen out from under the leader lease. This second case could lead
// to a lease expiration regression, as the leadership term would end before
// lead support had expired.
//
// This same form of race is not possible if the lease is transferred to us as
// raft leader, because lease transfers always send targets expiration-based
// leases and never leader leases.
//
// NOTE: this check may be redundant with the lease validity check above, as a
// replica will not attempt to acquire a valid lease. We include it anyway for
// defense-in-depth and so that the proper synchronization between leader
// leases and leadership transfer makes fewer assumptions. A leader holding
// a leader lease must never transfer leadership away before transferring the
// lease away first.
if leaseAcquisitionPending {
return false
}

// If we're draining, begin the transfer regardless of the leaseholder's raft
// progress. The leadership transfer itself will still need to wait for the
// target replica to catch up on its log before it can tell the target to
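
Both the leaseAcquisitionPending check above and the LeadTransferee check in build.go rely on the two paths running under the replica mutex. A sketch of that mutual exclusion, with assumed field and method names (the real kvserver.Replica layout differs):

package leasesketch

import "sync"

// replica is a stand-in for kvserver.Replica; the flag fields are assumptions
// that summarize pendingLeaseRequest.AcquisitionInProgress and the raft
// leadership-transfer state for illustration.
type replica struct {
	mu struct {
		sync.Mutex
		leaseAcquisitionPending   bool
		leadershipTransferPending bool
	}
}

// maybeTransferLeadership mirrors restriction 2: with the mutex held, it
// refuses to start a leadership transfer while a lease acquisition is pending.
func (r *replica) maybeTransferLeadership() {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.mu.leaseAcquisitionPending {
		return
	}
	r.mu.leadershipTransferPending = true
	// ... ask raft to transfer leadership ...
}

// maybeAcquireLeaderLease mirrors restriction 1: with the mutex held, it
// observes an in-flight leadership transfer and backs off (the real code
// builds an expiration-based lease instead of a leader lease).
func (r *replica) maybeAcquireLeaderLease() {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.mu.leadershipTransferPending {
		return
	}
	r.mu.leaseAcquisitionPending = true
	// ... propose the lease acquisition ...
}

Because both decisions read and write the in-flight flags under the same lock, a leader lease acquisition can never start while a transfer is pending and vice versa, which is the invariant the comments in this commit describe.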
70 changes: 40 additions & 30 deletions pkg/kv/kvserver/replica_range_lease.go
@@ -228,8 +228,8 @@ type pendingLeaseRequest struct {
// All accesses require repl.mu to be exclusively locked.
llHandles map[*leaseRequestHandle]struct{}
// nextLease is the pending RequestLease request, if any. It can be used to
// figure out if we're in the process of extending our own lease, or
// transferring it to another replica.
// figure out if we're in the process of acquiring our own lease, extending
// our own lease, or transferring it to another replica.
nextLease roachpb.Lease
}

@@ -240,14 +240,49 @@ func makePendingLeaseRequest(repl *Replica) pendingLeaseRequest {
}
}

// RequestPending returns the pending Lease, if one is in progress.
// The second return val is true if a lease request is pending.
// RequestPending returns the pending Lease, if one is in the process of being
// acquired, extended, or transferred. The second return val is true if a lease
// request is pending.
//
// Requires repl.mu is read locked.
func (p *pendingLeaseRequest) RequestPending() (roachpb.Lease, bool) {
return p.nextLease, p.nextLease != roachpb.Lease{}
}

// AcquisitionInProgress returns whether the replica is in the process of
// acquiring a range lease for itself. Lease extensions do not count as
// acquisitions.
//
// Requires repl.mu is read locked.
func (p *pendingLeaseRequest) AcquisitionInProgress() bool {
if nextLease, ok := p.RequestPending(); ok {
// Is the lease being acquired? (as opposed to extended or transferred)
prevLocal := p.repl.ReplicaID() == p.repl.mu.state.Lease.Replica.ReplicaID
nextLocal := p.repl.ReplicaID() == nextLease.Replica.ReplicaID
return !prevLocal && nextLocal
}
return false
}

// TransferInProgress returns whether the replica is in the process of
// transferring away its range lease. Note that the return values are
// best-effort and shouldn't be relied upon for correctness: if a previous
// transfer has returned an error, TransferInProgress will return `false`, but
// that doesn't necessarily mean that the transfer cannot still apply (see
// replica.mu.minLeaseProposedTS).
//
// It is assumed that the replica owning this pendingLeaseRequest owns the
// lease.
//
// Requires repl.mu is read locked.
func (p *pendingLeaseRequest) TransferInProgress() bool {
if nextLease, ok := p.RequestPending(); ok {
// Is the lease being transferred? (as opposed to just extended)
return p.repl.ReplicaID() != nextLease.Replica.ReplicaID
}
return false
}

// InitOrJoinRequest executes a RequestLease command asynchronously and returns a
// handle on which the result will be posted. If there's already a request in
// progress, we join in waiting for the results of that request.
@@ -624,27 +659,6 @@ func (p *pendingLeaseRequest) JoinRequest() *leaseRequestHandle {
return llHandle
}

// TransferInProgress returns whether the replica is in the process of
// transferring away its range lease. Note that the return values are
// best-effort and shouldn't be relied upon for correctness: if a previous
// transfer has returned an error, TransferInProgress will return `false`, but
// that doesn't necessarily mean that the transfer cannot still apply (see
// replica.mu.minLeaseProposedTS).
//
// It is assumed that the replica owning this pendingLeaseRequest owns the
// LeaderLease.
//
// replicaID is the ID of the parent replica.
//
// Requires repl.mu is read locked.
func (p *pendingLeaseRequest) TransferInProgress(replicaID roachpb.ReplicaID) bool {
if nextLease, ok := p.RequestPending(); ok {
// Is the lease being transferred? (as opposed to just extended)
return replicaID != nextLease.Replica.ReplicaID
}
return false
}

// newHandle creates a new leaseRequestHandle referencing the pending lease
// request.
func (p *pendingLeaseRequest) newHandle() *leaseRequestHandle {
@@ -1228,11 +1242,7 @@ func (r *Replica) redirectOnOrAcquireLeaseForRequest(
// commands - see comments on AdminTransferLease and TransferLease.
// So wait on the lease transfer to complete either successfully or
// unsuccessfully before redirecting or retrying.
repDesc, err := r.getReplicaDescriptorRLocked()
if err != nil {
return nil, kvserverpb.LeaseStatus{}, false, kvpb.NewError(err)
}
if ok := r.mu.pendingLeaseRequest.TransferInProgress(repDesc.ReplicaID); ok {
if ok := r.mu.pendingLeaseRequest.TransferInProgress(); ok {
return r.mu.pendingLeaseRequest.JoinRequest(), kvserverpb.LeaseStatus{}, true /* transfer */, nil
}

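
The new AcquisitionInProgress helper and the relocated TransferInProgress classify the same pending lease by comparing the previous and next leaseholders. A compact restatement of that classification, using hypothetical integer IDs in place of roachpb.ReplicaID:

package leasesketch

// leaseRequestKind mirrors how a pending lease request is classified.
type leaseRequestKind int

const (
	kindAcquisition leaseRequestKind = iota // previous holder is remote, next is local
	kindExtension                           // previous and next holders are both local
	kindTransfer                            // next holder is remote
)

// classifyPendingLease reproduces the prevLocal/nextLocal comparison used by
// AcquisitionInProgress and TransferInProgress. TransferInProgress assumes the
// local replica currently holds the lease, so prevLocal is true whenever it
// is consulted.
func classifyPendingLease(selfID, prevHolderID, nextHolderID int) leaseRequestKind {
	prevLocal := selfID == prevHolderID
	nextLocal := selfID == nextHolderID
	switch {
	case !prevLocal && nextLocal:
		return kindAcquisition
	case prevLocal && nextLocal:
		return kindExtension
	default:
		return kindTransfer
	}
}

For example, with selfID = 1, a pending lease naming replica 1 while replica 2 holds the current lease classifies as an acquisition, which is the case the new leaseAcquisitionPending check in replica_raft.go guards against.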
28 changes: 14 additions & 14 deletions pkg/kv/kvserver/replica_test.go
@@ -715,12 +715,7 @@ func TestBehaviorDuringLeaseTransfer(t *testing.T) {
<-transferSem
// Check that a transfer is indeed on-going.
tc.repl.mu.Lock()
repDesc, err := tc.repl.getReplicaDescriptorRLocked()
if err != nil {
tc.repl.mu.Unlock()
t.Fatal(err)
}
pending := tc.repl.mu.pendingLeaseRequest.TransferInProgress(repDesc.ReplicaID)
pending := tc.repl.mu.pendingLeaseRequest.TransferInProgress()
tc.repl.mu.Unlock()
if !pending {
t.Fatalf("expected transfer to be in progress, and it wasn't")
@@ -761,7 +756,7 @@ func TestBehaviorDuringLeaseTransfer(t *testing.T) {
testutils.SucceedsSoon(t, func() error {
tc.repl.mu.Lock()
defer tc.repl.mu.Unlock()
pending := tc.repl.mu.pendingLeaseRequest.TransferInProgress(repDesc.ReplicaID)
pending := tc.repl.mu.pendingLeaseRequest.TransferInProgress()
if pending {
return errors.New("transfer pending")
}
@@ -11802,10 +11797,11 @@ func TestReplicaShouldTransferRaftLeadershipToLeaseholder(t *testing.T) {
defer log.Scope(t).Close(t)

type params struct {
raftStatus raft.SparseStatus
leaseStatus kvserverpb.LeaseStatus
storeID roachpb.StoreID
draining bool
raftStatus raft.SparseStatus
leaseStatus kvserverpb.LeaseStatus
leaseAcquisitionPending bool
storeID roachpb.StoreID
draining bool
}

// Set up a base state that we can vary, representing this node n1 being a
@@ -11834,8 +11830,9 @@ func TestReplicaShouldTransferRaftLeadershipToLeaseholder(t *testing.T) {
}},
State: kvserverpb.LeaseState_VALID,
},
storeID: localID,
draining: false,
leaseAcquisitionPending: false,
storeID: localID,
draining: false,
}

testcases := map[string]struct {
@@ -11863,6 +11860,9 @@ func TestReplicaShouldTransferRaftLeadershipToLeaseholder(t *testing.T) {
"local lease": {false, func(p *params) {
p.leaseStatus.Lease.Replica.ReplicaID = localID
}},
"lease request pending": {false, func(p *params) {
p.leaseAcquisitionPending = true
}},
"no progress": {false, func(p *params) {
p.raftStatus.Progress = map[raftpb.PeerID]tracker.Progress{}
}},
@@ -11884,7 +11884,7 @@ func TestReplicaShouldTransferRaftLeadershipToLeaseholder(t *testing.T) {
p := base
tc.modify(&p)
require.Equal(t, tc.expect, shouldTransferRaftLeadershipToLeaseholderLocked(
p.raftStatus, p.leaseStatus, p.storeID, p.draining))
p.raftStatus, p.leaseStatus, p.leaseAcquisitionPending, p.storeID, p.draining))
})
}
}
