diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go index d83149539f03..a423d70291ed 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go @@ -4764,7 +4764,13 @@ func TestPartialPartition(t *testing.T) { require.NoError(t, tc.WaitForFullReplication()) desc := tc.LookupRangeOrFatal(t, scratchKey) - tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(1)) + err := tc.TransferRangeLease(desc, tc.Target(1)) + if leaseType != roachpb.LeaseLeader { + // In leader leases, the leader won't campaign if it's not supported + // by a majority. We will keep trying to transfer the lease until it + // succeeds below. + require.NoError(t, err) + } // TODO(baptist): This test should work without this block. // After the lease is transferred, the lease might still be on @@ -4778,6 +4784,16 @@ func TestPartialPartition(t *testing.T) { // DistSender we will never succeed once we partition. Remove // this block once #118943 is fixed. testutils.SucceedsSoon(t, func() error { + if leaseType == roachpb.LeaseLeader { + // In leader leases, the leader won't campaign if it's not supported + // by a majority of voters. This causes a flake in this test + // where the new leader is not elected if it doesn't yet have + // support from a majority. We need to keep trying to transfer the + // lease until the new leader is elected. + if err := tc.TransferRangeLease(desc, tc.Target(1)); err != nil { + return err + } + } sl := tc.StorageLayer(1) store, err := sl.GetStores().(*kvserver.Stores).GetStore(sl.GetFirstStoreID()) require.NoError(t, err) @@ -4795,7 +4811,7 @@ func TestPartialPartition(t *testing.T) { // to fail faster. cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() - err := txn.Put(cancelCtx, scratchKey, "abc") + err = txn.Put(cancelCtx, scratchKey, "abc") if test.useProxy { require.NoError(t, err) require.NoError(t, txn.Commit(cancelCtx)) diff --git a/pkg/kv/kvserver/client_raft_helpers_test.go b/pkg/kv/kvserver/client_raft_helpers_test.go index 3a7e60e5459c..5895e9cff137 100644 --- a/pkg/kv/kvserver/client_raft_helpers_test.go +++ b/pkg/kv/kvserver/client_raft_helpers_test.go @@ -472,6 +472,8 @@ func dropRaftMessagesFrom( drop := shouldDropFromStore(msg.From.StoreID) if drop { t.Logf("dropping msg %s from store %d: to %d", msg.Type, msg.From.StoreID, msg.To.StoreID) + } else { + t.Logf("allowing msg %s from store %d: to %d", msg.Type, msg.From.StoreID, msg.To.StoreID) } return drop }, diff --git a/pkg/kv/kvserver/client_raft_test.go b/pkg/kv/kvserver/client_raft_test.go index 471b60e72a70..9dad8d41091d 100644 --- a/pkg/kv/kvserver/client_raft_test.go +++ b/pkg/kv/kvserver/client_raft_test.go @@ -1132,11 +1132,12 @@ func TestRequestsOnLaggingReplica(t *testing.T) { // handler on the other two stores only filters out messages from the // partitioned store. The configuration looks like: // - // [0] - // x x - // / \ - // x x - // [1]<---->[2] + // [symmetric=false] [symmetric=true] + // [0] [0] + // ^ ^ x x + // / \ / \ + // x x x x + // [1]<---->[2] [1]<----->[2] // log.Infof(ctx, "test: partitioning node") const partitionNodeIdx = 0 @@ -1161,8 +1162,8 @@ func TestRequestsOnLaggingReplica(t *testing.T) { // partitioned one. 
log.Infof(ctx, "test: waiting for leadership to fail over") testutils.SucceedsSoon(t, func() error { - if partRepl.RaftStatus().Lead != raft.None { - return errors.New("partitioned replica should stepped down") + if partRepl.RaftStatus().RaftState == raftpb.StateLeader { + return errors.New("partitioned replica should have stepped down") } lead := otherRepl.RaftStatus().Lead if lead == raft.None { @@ -1239,7 +1240,16 @@ func TestRequestsOnLaggingReplica(t *testing.T) { require.Error(t, pErr.GoError(), "unexpected success") nlhe := &kvpb.NotLeaseHolderError{} require.ErrorAs(t, pErr.GetDetail(), &nlhe, pErr) - require.True(t, nlhe.Lease.Empty()) + + if symmetric { + // In symmetric=true, we expect the partitioned replica to not know + // about the leader. As a result, it returns a NotLeaseHolderError without + // a speculative lease. + require.True(t, nlhe.Lease.Empty(), "expected empty lease, got %v", nlhe.Lease) + } else { + require.False(t, nlhe.Lease.Empty()) + require.Equal(t, leaderReplicaID, nlhe.Lease.Replica.ReplicaID) + } // Resolve the partition, but continue blocking snapshots destined for the // previously-partitioned replica. The point of blocking the snapshots is to @@ -1270,7 +1280,7 @@ func TestRequestsOnLaggingReplica(t *testing.T) { continue } store := tc.GetFirstStoreFromServer(t, i) - store.Transport().ListenIncomingRaftMessages(store.Ident.StoreID, store) + store.Transport().ListenIncomingRaftMessages(store.StoreID(), store) store.StoreLivenessTransport().ListenMessages( store.StoreID(), store.TestingStoreLivenessMessageHandler(), ) @@ -6536,16 +6546,13 @@ func TestRaftCheckQuorum(t *testing.T) { require.False(t, repl1.IsQuiescent()) t.Logf("n1 not quiesced") - // Wait for the leader to become a candidate. + // Wait for the leader to step down. require.Eventually(t, func() bool { status := repl1.RaftStatus() logStatus(status) - // TODO(ibrahim): once we start checking StoreLiveness before - // transitioning to a pre-candidate, we'll need to switch this (and the - // conditional below) to handle this. - return status.RaftState == raftpb.StatePreCandidate + return status.RaftState != raftpb.StateLeader }, 10*time.Second, 500*time.Millisecond) - t.Logf("n1 became pre-candidate") + t.Logf("n1 stepped down as a leader") // n2 or n3 should elect a new leader. var leaderStatus *raft.Status @@ -6561,23 +6568,17 @@ func TestRaftCheckQuorum(t *testing.T) { }, 10*time.Second, 500*time.Millisecond) t.Logf("n%d became leader", leaderStatus.ID) - // n1 should remain pre-candidate, since it doesn't hear about the new - // leader. + // n1 shouldn't become a leader. require.Never(t, func() bool { status := repl1.RaftStatus() logStatus(status) - // TODO(ibrahim): uncomment this once we start checking StoreLiveness - // before transitioning to a pre-candidate. - //expState := status.RaftState == raftpb.StateFollower && status.Lead == raft.None - //return !expState // require.Never - return status.RaftState != raftpb.StatePreCandidate + expState := status.RaftState != raftpb.StateLeader && status.Lead == raft.None + return !expState // require.Never }, 3*time.Second, 500*time.Millisecond) - t.Logf("n1 remains pre-candidate") + t.Logf("n1 remains not leader") - // The existing leader shouldn't have been affected by n1's prevotes. - // TODO(ibrahim): This portion of the test can be removed entirely once we - // don't even transition to a pre-candidate because StoreLiveness doesn't - // let us. + // The existing leader shouldn't have been affected by n1's possible + // prevotes. 
var finalStatus *raft.Status for _, status := range []*raft.Status{repl2.RaftStatus(), repl3.RaftStatus()} { logStatus(status) diff --git a/pkg/raft/raft.go b/pkg/raft/raft.go index afe3de0f746f..2d6222a2a43f 100644 --- a/pkg/raft/raft.go +++ b/pkg/raft/raft.go @@ -1356,6 +1356,14 @@ func (r *raft) hup(t CampaignType) { r.logger.Debugf("%x ignoring MsgHup due to leader fortification", r.id) return } + + // We shouldn't campaign if we don't have quorum support in store liveness. + if r.fortificationTracker.RequireQuorumSupportOnCampaign() && + !r.fortificationTracker.QuorumSupported() { + r.logger.Debugf("%x cannot campaign since it's not supported by a quorum in store liveness", r.id) + return + } + if r.hasUnappliedConfChanges() { r.logger.Warningf("%x cannot campaign at term %d since there are still pending configuration changes to apply", r.id, r.Term) return diff --git a/pkg/raft/testdata/de_fortification_checkquorum.txt b/pkg/raft/testdata/de_fortification_checkquorum.txt index 2a868d6d2a22..4b8744fc03cc 100644 --- a/pkg/raft/testdata/de_fortification_checkquorum.txt +++ b/pkg/raft/testdata/de_fortification_checkquorum.txt @@ -46,7 +46,6 @@ withdraw-support 3 1 2 x 1 1 3 x 1 1 - tick-election 1 ---- ok @@ -77,20 +76,16 @@ set-randomized-election-timeout 1 timeout=3 ---- ok -# Even though the leader doesn't have store liveness support, it'll still call -# a pre-vote election until we start checking store liveness before doing so. +# Since we don't have store liveness support, we can't campaign. # However, on this ticker, it'll also broadcast a de-fortify to all peers -- # which is what we're interested in for this test. tick-election 1 ---- -INFO 1 is starting a new election at term 1 -INFO 1 became pre-candidate at term 1 -INFO 1 [logterm: 1, index: 11] sent MsgPreVote request to 2 at term 1 -INFO 1 [logterm: 1, index: 11] sent MsgPreVote request to 3 at term 1 +DEBUG 1 cannot campaign since it's not supported by a quorum in store liveness raft-state 1 ---- -1: StatePreCandidate (Voter) Term:1 Lead:0 LeadEpoch:0 +1: StateFollower (Voter) Term:1 Lead:0 LeadEpoch:0 2: StateFollower (Voter) Term:1 Lead:1 LeadEpoch:1 3: StateFollower (Voter) Term:1 Lead:1 LeadEpoch:1 @@ -99,8 +94,8 @@ stabilize ---- > 1 handling Ready Ready MustSync=true: - State:StatePreCandidate - HardState Term:1 Vote:1 Commit:11 Lead:0 LeadEpoch:0 + State:StateFollower + HardState Term:1 Vote:1 Commit:11 Lead:1 LeadEpoch:0 Messages: 1->2 MsgDeFortifyLeader Term:1 Log:0/0 1->3 MsgDeFortifyLeader Term:1 Log:0/0 @@ -108,26 +103,18 @@ stabilize 1->3 MsgDeFortifyLeader Term:1 Log:0/0 1->2 MsgDeFortifyLeader Term:1 Log:0/0 1->3 MsgDeFortifyLeader Term:1 Log:0/0 - 1->2 MsgPreVote Term:2 Log:1/11 - 1->3 MsgPreVote Term:2 Log:1/11 - INFO 1 received MsgPreVoteResp from 1 at term 1 - INFO 1 has received 1 MsgPreVoteResp votes and 0 vote rejections > 2 receiving messages 1->2 MsgDeFortifyLeader Term:1 Log:0/0 1->2 MsgDeFortifyLeader Term:1 Log:0/0 DEBUG 2 is not fortifying 1; de-fortification is a no-op 1->2 MsgDeFortifyLeader Term:1 Log:0/0 DEBUG 2 is not fortifying 1; de-fortification is a no-op - 1->2 MsgPreVote Term:2 Log:1/11 - INFO 2 [logterm: 1, index: 11, vote: 1] ignored MsgPreVote from 1 [logterm: 1, index: 11] at term 1: recently received communication from leader (remaining ticks: 3) > 3 receiving messages 1->3 MsgDeFortifyLeader Term:1 Log:0/0 1->3 MsgDeFortifyLeader Term:1 Log:0/0 DEBUG 3 is not fortifying 1; de-fortification is a no-op 1->3 MsgDeFortifyLeader Term:1 Log:0/0 DEBUG 3 is not fortifying 1; 
de-fortification is a no-op - 1->3 MsgPreVote Term:2 Log:1/11 - INFO 3 [logterm: 1, index: 11, vote: 1] ignored MsgPreVote from 1 [logterm: 1, index: 11] at term 1: recently received communication from leader (remaining ticks: 3) > 2 handling Ready Ready MustSync=true: HardState Term:1 Vote:1 Commit:11 Lead:1 LeadEpoch:0 @@ -138,7 +125,7 @@ stabilize # All peers have been de-fortified successfully. raft-state ---- -1: StatePreCandidate (Voter) Term:1 Lead:0 LeadEpoch:0 +1: StateFollower (Voter) Term:1 Lead:0 LeadEpoch:0 2: StateFollower (Voter) Term:1 Lead:1 LeadEpoch:0 3: StateFollower (Voter) Term:1 Lead:1 LeadEpoch:0 diff --git a/pkg/raft/testdata/forget_leader_prevote_checkquorum.txt b/pkg/raft/testdata/forget_leader_prevote_checkquorum.txt index 770f040a98fb..17e6fda41daf 100644 --- a/pkg/raft/testdata/forget_leader_prevote_checkquorum.txt +++ b/pkg/raft/testdata/forget_leader_prevote_checkquorum.txt @@ -238,6 +238,18 @@ withdraw-support 1 3 2 x 1 x 3 x 1 1 +# At this point we can't campaign because we are not supported by a quorum. +campaign 1 +---- +DEBUG 1 cannot campaign since it's not supported by a quorum in store liveness + +grant-support 3 1 +---- + 1 2 3 +1 3 1 x +2 x 1 x +3 3 1 1 + campaign 1 ---- INFO 1 is starting a new election at term 2 diff --git a/pkg/raft/testdata/fortification_support_tracking.txt b/pkg/raft/testdata/fortification_support_tracking.txt index f59c5222226a..668f314d27d9 100644 --- a/pkg/raft/testdata/fortification_support_tracking.txt +++ b/pkg/raft/testdata/fortification_support_tracking.txt @@ -23,13 +23,6 @@ withdraw-support 2 1 2 x 1 1 3 1 1 1 -withdraw-support 3 1 ----- - 1 2 3 -1 1 1 1 -2 x 1 1 -3 x 1 1 - campaign 1 ---- INFO 1 is starting a new election at term 0 @@ -81,11 +74,13 @@ stabilize Entries: 1/11 EntryNormal "" Messages: + 1->3 MsgFortifyLeader Term:1 Log:0/0 1->2 MsgApp Term:1 Log:1/10 Commit:10 Entries:[1/11 EntryNormal ""] 1->3 MsgApp Term:1 Log:1/10 Commit:10 Entries:[1/11 EntryNormal ""] > 2 receiving messages 1->2 MsgApp Term:1 Log:1/10 Commit:10 Entries:[1/11 EntryNormal ""] > 3 receiving messages + 1->3 MsgFortifyLeader Term:1 Log:0/0 1->3 MsgApp Term:1 Log:1/10 Commit:10 Entries:[1/11 EntryNormal ""] > 2 handling Ready Ready MustSync=true: @@ -96,13 +91,15 @@ stabilize 2->1 MsgAppResp Term:1 Log:0/11 Commit:10 > 3 handling Ready Ready MustSync=true: - HardState Term:1 Vote:1 Commit:10 Lead:1 LeadEpoch:0 + HardState Term:1 Vote:1 Commit:10 Lead:1 LeadEpoch:1 Entries: 1/11 EntryNormal "" Messages: + 3->1 MsgFortifyLeaderResp Term:1 Log:0/0 LeadEpoch:1 3->1 MsgAppResp Term:1 Log:0/11 Commit:10 > 1 receiving messages 2->1 MsgAppResp Term:1 Log:0/11 Commit:10 + 3->1 MsgFortifyLeaderResp Term:1 Log:0/0 LeadEpoch:1 3->1 MsgAppResp Term:1 Log:0/11 Commit:10 > 1 handling Ready Ready MustSync=true: @@ -111,10 +108,12 @@ stabilize 1/11 EntryNormal "" Messages: 1->2 MsgApp Term:1 Log:1/11 Commit:11 + 1->3 MsgApp Term:1 Log:1/10 Commit:11 Entries:[1/11 EntryNormal ""] 1->3 MsgApp Term:1 Log:1/11 Commit:11 > 2 receiving messages 1->2 MsgApp Term:1 Log:1/11 Commit:11 > 3 receiving messages + 1->3 MsgApp Term:1 Log:1/10 Commit:11 Entries:[1/11 EntryNormal ""] 1->3 MsgApp Term:1 Log:1/11 Commit:11 > 2 handling Ready Ready MustSync=true: @@ -125,18 +124,28 @@ stabilize 2->1 MsgAppResp Term:1 Log:0/11 Commit:11 > 3 handling Ready Ready MustSync=true: - HardState Term:1 Vote:1 Commit:11 Lead:1 LeadEpoch:0 + HardState Term:1 Vote:1 Commit:11 Lead:1 LeadEpoch:1 CommittedEntries: 1/11 EntryNormal "" Messages: 3->1 MsgAppResp Term:1 Log:0/11 Commit:11 
+ 3->1 MsgAppResp Term:1 Log:0/11 Commit:11 > 1 receiving messages 2->1 MsgAppResp Term:1 Log:0/11 Commit:11 3->1 MsgAppResp Term:1 Log:0/11 Commit:11 + 3->1 MsgAppResp Term:1 Log:0/11 Commit:11 print-fortification-state 1 ---- 1 : 1 +3 : 1 + +withdraw-support 3 1 +---- + 1 2 3 +1 1 1 1 +2 x 1 1 +3 x 1 1 bump-epoch 2 ---- diff --git a/pkg/raft/testdata/prevote_checkquorum.txt b/pkg/raft/testdata/prevote_checkquorum.txt index 2a6286fe9629..3af8ac0e373e 100644 --- a/pkg/raft/testdata/prevote_checkquorum.txt +++ b/pkg/raft/testdata/prevote_checkquorum.txt @@ -228,6 +228,18 @@ withdraw-support 1 3 2 x 1 1 3 x 1 1 +# At this point we can't campaign because we are not supported by a quorum. +campaign 1 +---- +DEBUG 1 cannot campaign since it's not supported by a quorum in store liveness + +grant-support 3 1 +---- + 1 2 3 +1 2 1 x +2 x 1 1 +3 2 1 1 + # Node 3 is now the leader. Even though the leader is active, nodes 1 and 2 can # still win a prevote and election if they both explicitly campaign, since the # PreVote+CheckQuorum recent leader condition only applies to follower voters. @@ -264,9 +276,9 @@ stabilize withdraw-support 2 3 ---- 1 2 3 -1 1 1 x +1 2 1 x 2 x 1 x -3 x 1 1 +3 2 1 1 campaign 2 ---- diff --git a/pkg/raft/tracker/fortificationtracker.go b/pkg/raft/tracker/fortificationtracker.go index b1625005a712..56012e386e36 100644 --- a/pkg/raft/tracker/fortificationtracker.go +++ b/pkg/raft/tracker/fortificationtracker.go @@ -51,6 +51,10 @@ type FortificationTracker struct { // epochs that they have supported the leader in. fortification map[pb.PeerID]pb.Epoch + // votersSupport is a map that hangs off the fortificationTracker to prevent + // allocations on every call to QuorumSupported. + votersSupport map[pb.PeerID]bool + // leaderMaxSupported is the maximum LeadSupportUntil that the leader has // ever claimed to support. Tracking this ensures that LeadSupportUntil // never regresses for a raft group. Naively, without any tracking, this @@ -82,6 +86,7 @@ func NewFortificationTracker( config: config, storeLiveness: storeLiveness, fortification: map[pb.PeerID]pb.Epoch{}, + votersSupport: map[pb.PeerID]bool{}, logger: logger, } return &st @@ -320,6 +325,25 @@ func (ft *FortificationTracker) QuorumActive() bool { return !ft.storeLiveness.SupportExpired(ft.LeadSupportUntil(pb.StateLeader)) } +// RequireQuorumSupportOnCampaign returns true if quorum support before +// campaigning is required. +func (ft *FortificationTracker) RequireQuorumSupportOnCampaign() bool { + return ft.storeLiveness.SupportFromEnabled() +} + +// QuorumSupported returns whether this peer is currently supported by a quorum +// or not. +func (ft *FortificationTracker) QuorumSupported() bool { + clear(ft.votersSupport) + + ft.config.Voters.Visit(func(id pb.PeerID) { + _, isSupported := ft.IsFortifiedBy(id) + ft.votersSupport[id] = isSupported + }) + + return ft.config.Voters.VoteResult(ft.votersSupport) == quorum.VoteWon +} + // Term returns the leadership term for which the tracker is/was tracking // fortification state. func (ft *FortificationTracker) Term() uint64 { diff --git a/pkg/raft/tracker/fortificationtracker_test.go b/pkg/raft/tracker/fortificationtracker_test.go index a8b2fb595ce6..75982231026f 100644 --- a/pkg/raft/tracker/fortificationtracker_test.go +++ b/pkg/raft/tracker/fortificationtracker_test.go @@ -399,6 +399,163 @@ func TestQuorumActive(t *testing.T) { } } +// TestQuorumSupported ensures that we correctly determine whether a leader's +// quorum is supported or not. 
+func TestQuorumSupported(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ts := func(ts int64) hlc.Timestamp { + return hlc.Timestamp{ + WallTime: ts, + } + } + + createJointQuorum := func(c0 []pb.PeerID, c1 []pb.PeerID) quorum.JointConfig { + jointConfig := quorum.JointConfig{} + + // Populate the first joint config entry. + if len(c0) > 0 { + jointConfig[0] = make(quorum.MajorityConfig, len(c0)) + } + for _, id := range c0 { + jointConfig[0][id] = struct{}{} + } + + // Populate the second joint config entry. + if len(c1) > 0 { + jointConfig[1] = make(quorum.MajorityConfig, len(c1)) + } + for _, id := range c1 { + jointConfig[1][id] = struct{}{} + } + return jointConfig + } + + testCases := []struct { + curTS hlc.Timestamp + voters quorum.JointConfig + storeLiveness mockStoreLiveness + expQuorumSupported bool + }{ + { + curTS: ts(10), + voters: createJointQuorum([]pb.PeerID{1, 2, 3}, []pb.PeerID{}), + storeLiveness: makeMockStoreLiveness( + // No support recorded. + map[pb.PeerID]mockLivenessEntry{}, + ), + expQuorumSupported: false, + }, + { + curTS: ts(10), + voters: createJointQuorum([]pb.PeerID{1, 2, 3}, []pb.PeerID{}), + storeLiveness: makeMockStoreLiveness( + map[pb.PeerID]mockLivenessEntry{ + 1: makeMockLivenessEntry(10, ts(10)), + // Missing peer 2. + 3: makeMockLivenessEntry(30, ts(20)), + }, + ), + expQuorumSupported: true, + }, + { + curTS: ts(10), + voters: createJointQuorum([]pb.PeerID{1, 2, 3}, []pb.PeerID{}), + storeLiveness: makeMockStoreLiveness( + map[pb.PeerID]mockLivenessEntry{ + 1: makeMockLivenessEntry(10, ts(10)), + // Missing peers 2 and 3. + }, + ), + expQuorumSupported: false, + }, + { + curTS: ts(10), + voters: createJointQuorum([]pb.PeerID{1, 2, 3}, []pb.PeerID{}), + storeLiveness: makeMockStoreLiveness( + map[pb.PeerID]mockLivenessEntry{ + // Expired support for peer 1. + 1: makeMockLivenessEntry(10, ts(5)), + 2: makeMockLivenessEntry(20, ts(15)), + 3: makeMockLivenessEntry(30, ts(20)), + }, + ), + expQuorumSupported: true, + }, + { + curTS: ts(10), + voters: createJointQuorum([]pb.PeerID{1, 2, 3}, []pb.PeerID{}), + storeLiveness: makeMockStoreLiveness( + map[pb.PeerID]mockLivenessEntry{ + // Expired support for peers 1 and 2. + 1: makeMockLivenessEntry(10, ts(5)), + 2: makeMockLivenessEntry(20, ts(5)), + 3: makeMockLivenessEntry(30, ts(20)), + }, + ), + expQuorumSupported: false, + }, + { + curTS: ts(10), + voters: createJointQuorum([]pb.PeerID{1, 2, 3}, []pb.PeerID{}), + storeLiveness: makeMockStoreLiveness( + map[pb.PeerID]mockLivenessEntry{ + 1: makeMockLivenessEntry(10, ts(10)), + 2: makeMockLivenessEntry(20, ts(15)), + 3: makeMockLivenessEntry(30, ts(20)), + }, + ), + expQuorumSupported: true, + }, + { + curTS: ts(10), + // Simulate a joint quorum when adding two more nodes. + voters: createJointQuorum([]pb.PeerID{1, 2, 3}, []pb.PeerID{1, 2, 3, 4, 5}), + storeLiveness: makeMockStoreLiveness( + map[pb.PeerID]mockLivenessEntry{ + // Expired support from 1 and 2. + 1: makeMockLivenessEntry(10, ts(5)), + 2: makeMockLivenessEntry(20, ts(5)), + 3: makeMockLivenessEntry(20, ts(15)), + 4: makeMockLivenessEntry(20, ts(15)), + 5: makeMockLivenessEntry(10, ts(15)), + }, + ), + // Expect the quorum to NOT be supported since one of the majorities + // doesn't provide support. + expQuorumSupported: false, + }, + { + curTS: ts(10), + // Simulate a joint quorum when adding two more nodes. 
+ voters: createJointQuorum([]pb.PeerID{1, 2, 3}, []pb.PeerID{1, 2, 3, 4, 5}), + storeLiveness: makeMockStoreLiveness( + map[pb.PeerID]mockLivenessEntry{ + // Expired support from 1 and 5. + 1: makeMockLivenessEntry(10, ts(5)), + 2: makeMockLivenessEntry(20, ts(15)), + 3: makeMockLivenessEntry(10, ts(15)), + 4: makeMockLivenessEntry(20, ts(15)), + 5: makeMockLivenessEntry(10, ts(5)), + }, + ), + // Expect the quorum to be supported since both majorities provide + // support. + expQuorumSupported: true, + }, + } + + for _, tc := range testCases { + tc.storeLiveness.curTS = tc.curTS + cfg := quorum.MakeEmptyConfig() + cfg.Voters = tc.voters + fortificationTracker := NewFortificationTracker(&cfg, tc.storeLiveness, + raftlogger.DiscardLogger) + require.Equal(t, tc.expQuorumSupported, fortificationTracker.QuorumSupported()) + } +} + // TestCanDefortify tests whether a leader can safely de-fortify or not based // on some tracked state. func TestCanDefortify(t *testing.T) { diff --git a/pkg/sql/run_control_test.go b/pkg/sql/run_control_test.go index a2cae8aa4929..5388460f5b8c 100644 --- a/pkg/sql/run_control_test.go +++ b/pkg/sql/run_control_test.go @@ -1016,6 +1016,10 @@ func TestStatementTimeoutForSchemaChangeCommit(t *testing.T) { require.NoError(t, err) // Test implicit transactions first. blockSchemaChange.Swap(true) + defer func() { + close(waitForTimeout) + blockSchemaChange.Swap(false) + }() if implicitTxn { _, err := conn.DB.ExecContext(ctx, "ALTER TABLE t1 ADD COLUMN j INT DEFAULT 32") require.ErrorContains(t, err, sqlerrors.QueryTimeoutError.Error()) @@ -1030,8 +1034,6 @@ func TestStatementTimeoutForSchemaChangeCommit(t *testing.T) { err = txn.Commit() require.NoError(t, err) } - close(waitForTimeout) - blockSchemaChange.Swap(false) }) } } diff --git a/pkg/workload/tpcc/BUILD.bazel b/pkg/workload/tpcc/BUILD.bazel index 4faece03107f..e4157466c886 100644 --- a/pkg/workload/tpcc/BUILD.bazel +++ b/pkg/workload/tpcc/BUILD.bazel @@ -30,6 +30,7 @@ go_library( "//pkg/util/bufalloc", "//pkg/util/ctxgroup", "//pkg/util/log", + "//pkg/util/quotapool", "//pkg/util/retry", "//pkg/util/syncutil", "//pkg/util/timeutil", diff --git a/pkg/workload/tpcc/tpcc.go b/pkg/workload/tpcc/tpcc.go index 2600d2b0ff0c..7cc11c13862b 100644 --- a/pkg/workload/tpcc/tpcc.go +++ b/pkg/workload/tpcc/tpcc.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/parser/statements", "github.com/cockroachdb/cockroach/pkg/util/ctxgroup", "github.com/cockroachdb/cockroach/pkg/util/log", + "github.com/cockroachdb/cockroach/pkg/util/quotapool", "github.com/cockroachdb/cockroach/pkg/util/syncutil", "github.com/cockroachdb/cockroach/pkg/util/timeutil", "github.com/cockroachdb/cockroach/pkg/workload", @@ -78,6 +79,7 @@ type tpcc struct { onTxnStartFns []func(ctx context.Context, tx pgx.Tx) error workers int + activeWorkers int fks bool separateColumnFamilies bool // deprecatedFKIndexes adds in foreign key indexes that are no longer needed @@ -247,6 +249,7 @@ var tpccMeta = workload.Meta{ `partition-strategy`: {RuntimeOnly: true}, `zones`: {RuntimeOnly: true}, `active-warehouses`: {RuntimeOnly: true}, + `active-workers`: {RuntimeOnly: true}, `scatter`: {RuntimeOnly: true}, `split`: {RuntimeOnly: true}, `wait`: {RuntimeOnly: true}, @@ -294,6 +297,7 @@ var tpccMeta = workload.Meta{ g.flags.StringSliceVar(&g.multiRegionCfg.regions, "regions", []string{}, "Regions to use for multi-region partitioning. The first region is the PRIMARY REGION.
Does not work with --zones.") g.flags.Var(&g.multiRegionCfg.survivalGoal, "survival-goal", "Survival goal to use for multi-region setups. Allowed values: [zone, region].") g.flags.IntVar(&g.activeWarehouses, `active-warehouses`, 0, `Run the load generator against a specific number of warehouses. Defaults to --warehouses'`) + g.flags.IntVar(&g.activeWorkers, `active-workers`, 0, `Number of workers that can execute at any given time. Defaults to --workers'`) g.flags.BoolVar(&g.scatter, `scatter`, false, `Scatter ranges`) g.flags.BoolVar(&g.split, `split`, false, `Split tables`) g.flags.BoolVar(&g.expensiveChecks, `expensive-checks`, false, `Run expensive checks`) @@ -416,6 +420,12 @@ func (w *tpcc) Hooks() workload.Hooks { w.workers = w.activeWarehouses * NumWorkersPerWarehouse } + if w.activeWorkers > w.workers { + return errors.Errorf(`--active-workers needs to be less than or equal to workers (%d)`, w.workers) + } else if w.activeWorkers == 0 { + w.activeWorkers = w.workers + } + if w.numConns == 0 { // If we're not waiting, open up a connection for each worker. If we are // waiting, we only use up to a set number of connections per warehouse. @@ -428,8 +438,8 @@ func (w *tpcc) Hooks() workload.Hooks { } } - if w.waitFraction > 0 && w.workers != w.activeWarehouses*NumWorkersPerWarehouse { - return errors.Errorf(`--wait > 0 and --warehouses=%d requires --workers=%d`, + if w.waitFraction > 0 && w.activeWorkers != w.activeWarehouses*NumWorkersPerWarehouse { + return errors.Errorf(`--wait > 0 and --warehouses=%d requires --active-workers=%d`, w.activeWarehouses, w.warehouses*NumWorkersPerWarehouse) } @@ -991,6 +1001,9 @@ func (w *tpcc) Ops( // If nothing is mine, then everything is mine. return len(w.affinityPartitions) == 0 } + + workersSem := quotapool.NewIntPool("workers", uint64(w.activeWorkers)) + // Limit the amount of workers we initialize in parallel, to avoid running out // of memory (#36897). 
sem := make(chan struct{}, 100) @@ -1019,7 +1032,7 @@ func (w *tpcc) Ops( idx := len(ql.WorkerFns) - 1 sem <- struct{}{} group.Go(func() error { - worker, err := newWorker(ctx, w, db, reg.GetHandle(), w.txCounters, warehouse) + worker, err := newWorker(ctx, w, db, reg.GetHandle(), w.txCounters, warehouse, workersSem) if err == nil { ql.WorkerFns[idx] = worker.run } diff --git a/pkg/workload/tpcc/worker.go b/pkg/workload/tpcc/worker.go index 9a6442e5b320..e984f3b63f08 100644 --- a/pkg/workload/tpcc/worker.go +++ b/pkg/workload/tpcc/worker.go @@ -13,6 +13,7 @@ import ( "strings" "time" + "github.com/cockroachdb/cockroach/pkg/util/quotapool" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/workload" "github.com/cockroachdb/cockroach/pkg/workload/histogram" @@ -165,6 +166,8 @@ type worker struct { permIdx int counters txCounters + + workerSem *quotapool.IntPool } func newWorker( @@ -174,6 +177,7 @@ func newWorker( hists *histogram.Histograms, counters txCounters, warehouse int, + workerSem *quotapool.IntPool, ) (*worker, error) { w := &worker{ config: config, @@ -183,6 +187,7 @@ func newWorker( deckPerm: append([]int(nil), config.deck...), permIdx: len(config.deck), counters: counters, + workerSem: workerSem, } for i := range w.txs { var err error @@ -195,6 +200,11 @@ func newWorker( } func (w *worker) run(ctx context.Context) error { + sem, err := w.workerSem.Acquire(ctx, 1) + if err != nil { + return err + } + defer sem.Release() // 5.2.4.2: the required mix is achieved by selecting each new transaction // uniformly at random from a deck whose content guarantees the required // transaction mix. Each pass through a deck must be made in a different