From bf54a27bfb1fa8731320b052b9c709f412fbdb7a Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Mon, 8 Apr 2024 11:36:21 -0700 Subject: [PATCH 1/5] Revert "sql: fix leak in memory accounting around TxnFingerprintIDCache" This reverts commit 780ddf49ab00dddc7a2bfff0ce7024c3c2b0d4c0. --- pkg/sql/conn_executor.go | 6 ++++- pkg/sql/conn_executor_exec.go | 10 ++++++-- pkg/sql/txn_fingerprint_id_cache.go | 31 +++++++++++++++++++++--- pkg/sql/txn_fingerprint_id_cache_test.go | 6 +++-- 4 files changed, 44 insertions(+), 9 deletions(-) diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 39a4325883e1..17d47c0652f4 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -1059,6 +1059,7 @@ func (s *Server) newConnExecutor( MaxHist: memMetrics.TxnMaxBytesHist, Settings: s.cfg.Settings, }) + txnFingerprintIDCacheAcc := sessionMon.MakeBoundAccount() nodeIDOrZero, _ := s.cfg.NodeInfo.NodeID.OptionalNodeID() ex := &connExecutor{ @@ -1100,7 +1101,8 @@ func (s *Server) newConnExecutor( indexUsageStats: s.indexUsageStats, txnIDCacheWriter: s.txnIDCache, totalActiveTimeStopWatch: timeutil.NewStopWatch(), - txnFingerprintIDCache: NewTxnFingerprintIDCache(s.cfg.Settings), + txnFingerprintIDCache: NewTxnFingerprintIDCache(ctx, s.cfg.Settings, &txnFingerprintIDCacheAcc), + txnFingerprintIDAcc: &txnFingerprintIDCacheAcc, } ex.state.txnAbortCount = ex.metrics.EngineMetrics.TxnAbortCount @@ -1329,6 +1331,7 @@ func (ex *connExecutor) close(ctx context.Context, closeType closeType) { ex.mu.IdleInSessionTimeout.Stop() ex.mu.IdleInTransactionSessionTimeout.Stop() + ex.txnFingerprintIDAcc.Close(ctx) if closeType != panicClose { ex.state.mon.Stop(ctx) ex.sessionPreparedMon.Stop(ctx) @@ -1744,6 +1747,7 @@ type connExecutor struct { // txnFingerprintIDCache is used to track the most recent // txnFingerprintIDs executed in this session. 
txnFingerprintIDCache *TxnFingerprintIDCache + txnFingerprintIDAcc *mon.BoundAccount // totalActiveTimeStopWatch tracks the total active time of the session. // This is defined as the time spent executing transactions and statements. diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 23f6d19a549e..1a53f435797e 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -3213,7 +3213,13 @@ func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent, txnErr err ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionEndExecTransaction, timeutil.Now()) transactionFingerprintID := appstatspb.TransactionFingerprintID(ex.extraTxnState.transactionStatementsHash.Sum()) - ex.txnFingerprintIDCache.Add(transactionFingerprintID) + + err := ex.txnFingerprintIDCache.Add(ctx, transactionFingerprintID) + if err != nil { + if log.V(1) { + log.Warningf(ctx, "failed to enqueue transactionFingerprintID = %d: %s", transactionFingerprintID, err) + } + } ex.statsCollector.EndTransaction( ctx, @@ -3229,7 +3235,7 @@ func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent, txnErr err ) } - err := ex.recordTransactionFinish(ctx, transactionFingerprintID, ev, implicit, txnStart, txnErr) + err = ex.recordTransactionFinish(ctx, transactionFingerprintID, ev, implicit, txnStart, txnErr) if err != nil { if log.V(1) { log.Warningf(ctx, "failed to record transaction stats: %s", err) diff --git a/pkg/sql/txn_fingerprint_id_cache.go b/pkg/sql/txn_fingerprint_id_cache.go index b97700660d55..b8fdf7dbd768 100644 --- a/pkg/sql/txn_fingerprint_id_cache.go +++ b/pkg/sql/txn_fingerprint_id_cache.go @@ -11,10 +11,14 @@ package sql import ( + "context" + "unsafe" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/appstatspb" "github.com/cockroachdb/cockroach/pkg/util/cache" + "github.com/cockroachdb/cockroach/pkg/util/mon" 
"github.com/cockroachdb/cockroach/pkg/util/syncutil" ) @@ -36,13 +40,22 @@ type TxnFingerprintIDCache struct { mu struct { syncutil.RWMutex + acc *mon.BoundAccount cache *cache.UnorderedCache } } +const ( + cacheEntrySize = int64(unsafe.Sizeof(cache.Entry{})) + txnFingerprintIDSize = int64(unsafe.Sizeof(appstatspb.TransactionFingerprintID(0))) +) + // NewTxnFingerprintIDCache returns a new TxnFingerprintIDCache. -func NewTxnFingerprintIDCache(st *cluster.Settings) *TxnFingerprintIDCache { +func NewTxnFingerprintIDCache( + ctx context.Context, st *cluster.Settings, acc *mon.BoundAccount, +) *TxnFingerprintIDCache { b := &TxnFingerprintIDCache{st: st} + b.mu.acc = acc b.mu.cache = cache.NewUnorderedCache(cache.Config{ Policy: cache.CacheFIFO, ShouldEvict: func(size int, _, _ interface{}) bool { @@ -53,18 +66,28 @@ func NewTxnFingerprintIDCache(st *cluster.Settings) *TxnFingerprintIDCache { capacity := TxnFingerprintIDCacheCapacity.Get(&st.SV) return int64(size) > capacity }, + OnEvictedEntry: func(entry *cache.Entry) { + // We must be holding the mutex already because this callback is + // executed during Cache.Add which we surround with the lock. + b.mu.AssertHeld() + b.mu.acc.Shrink(ctx, cacheEntrySize+txnFingerprintIDSize) + }, }) return b } // Add adds a TxnFingerprintID to the cache, truncating the cache to the cache's // capacity if necessary. -func (b *TxnFingerprintIDCache) Add(id appstatspb.TransactionFingerprintID) { +func (b *TxnFingerprintIDCache) Add( + ctx context.Context, id appstatspb.TransactionFingerprintID, +) error { b.mu.Lock() defer b.mu.Unlock() - // TODO(yuzefovich): we should perform memory accounting for this. Note that - // it can be quite tricky to get right - see #121844. + if err := b.mu.acc.Grow(ctx, cacheEntrySize+txnFingerprintIDSize); err != nil { + return err + } b.mu.cache.Add(id, nil /* value */) + return nil } // GetAllTxnFingerprintIDs returns a slice of all TxnFingerprintIDs in the cache. 
diff --git a/pkg/sql/txn_fingerprint_id_cache_test.go b/pkg/sql/txn_fingerprint_id_cache_test.go index 6756b6c89d61..775af7ff166d 100644 --- a/pkg/sql/txn_fingerprint_id_cache_test.go +++ b/pkg/sql/txn_fingerprint_id_cache_test.go @@ -45,7 +45,7 @@ func TestTxnFingerprintIDCacheDataDriven(t *testing.T) { d.ScanArgs(t, "capacity", &capacity) st := cluster.MakeTestingClusterSettings() - txnFingerprintIDCache = NewTxnFingerprintIDCache(st) + txnFingerprintIDCache = NewTxnFingerprintIDCache(ctx, st, nil /* acc */) TxnFingerprintIDCacheCapacity.Override(ctx, &st.SV, int64(capacity)) @@ -66,7 +66,9 @@ func TestTxnFingerprintIDCacheDataDriven(t *testing.T) { require.NoError(t, err) txnFingerprintID := appstatspb.TransactionFingerprintID(id) - txnFingerprintIDCache.Add(txnFingerprintID) + err = txnFingerprintIDCache.Add(ctx, txnFingerprintID) + require.NoError(t, err) + return fmt.Sprintf("size: %d", txnFingerprintIDCache.size()) case "show": From f70a1c950b5cdb51e4a9ef21529ace8f85f77306 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Mon, 8 Apr 2024 11:47:49 -0700 Subject: [PATCH 2/5] sql: fix accounting for entries in Txn Fingerprint ID cache This commit fixes a bug that could previously result in the memory accounting leak that was exposed by 88ebd70d7ca24d8cebe1acdcb711d4f0e840c619. Namely, the problem is that we previously grew the memory account in `Add` unconditionally, even though an ID that is already present in the cache isn't inserted again. As a result, we'd only shrink the account once but might have grown it any number of times for a particular ID. Now we check whether the ID is present in the cache and only grow the account if not.
Epic: None Release note: None --- pkg/sql/testdata/txn_fingerprint_id_cache | 36 +++++++++++++++++++++++ pkg/sql/txn_fingerprint_id_cache.go | 4 +++ pkg/sql/txn_fingerprint_id_cache_test.go | 16 ++++++---- 3 files changed, 51 insertions(+), 5 deletions(-) diff --git a/pkg/sql/testdata/txn_fingerprint_id_cache b/pkg/sql/testdata/txn_fingerprint_id_cache index 533be5d3e1bb..76015684a92c 100644 --- a/pkg/sql/testdata/txn_fingerprint_id_cache +++ b/pkg/sql/testdata/txn_fingerprint_id_cache @@ -25,6 +25,30 @@ show ---- [8 7 6 5] +# Each cache entry takes up 56 bytes. +accounting +---- +224 bytes + +# Attempt to add ids that are already present in the cache. Neither the cache +# nor the accounting should change. + +enqueue id=05 +---- +size: 4 + +enqueue id=07 +---- +size: 4 + +show +---- +[8 7 6 5] + +accounting +---- +224 bytes + enqueue id=09 ---- size: 5 @@ -37,6 +61,10 @@ show ---- [10 9 8 7 6 5] +accounting +---- +336 bytes + # Decrease the TxnFingerprintIDCacheCapacity cluster setting to below current size. override capacity=3 ---- @@ -52,6 +80,10 @@ show ---- [11 10 9] +accounting +---- +168 bytes + # Check that retrieving IDs also properly truncates the cache when the capacity has # been changed. # Increase capacity back up to 5, insert some values, then decrease capacity to 2 and @@ -75,3 +107,7 @@ TxnFingerprintIDCacheCapacity: 2 show ---- [13 12] + +accounting +---- +112 bytes diff --git a/pkg/sql/txn_fingerprint_id_cache.go b/pkg/sql/txn_fingerprint_id_cache.go index b8fdf7dbd768..8560f969faac 100644 --- a/pkg/sql/txn_fingerprint_id_cache.go +++ b/pkg/sql/txn_fingerprint_id_cache.go @@ -83,6 +83,10 @@ func (b *TxnFingerprintIDCache) Add( ) error { b.mu.Lock() defer b.mu.Unlock() + if _, ok := b.mu.cache.StealthyGet(id); ok { + // The value is already in the cache - do nothing. 
+ return nil + } if err := b.mu.acc.Grow(ctx, cacheEntrySize+txnFingerprintIDSize); err != nil { return err } diff --git a/pkg/sql/txn_fingerprint_id_cache_test.go b/pkg/sql/txn_fingerprint_id_cache_test.go index 775af7ff166d..7d5b1072c9ba 100644 --- a/pkg/sql/txn_fingerprint_id_cache_test.go +++ b/pkg/sql/txn_fingerprint_id_cache_test.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/appstatspb" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" @@ -34,18 +35,21 @@ import ( func TestTxnFingerprintIDCacheDataDriven(t *testing.T) { defer leaktest.AfterTest(t)() + + ctx := context.Background() + st := cluster.MakeTestingClusterSettings() + memMonitor := execinfra.NewTestMemMonitor(ctx, st) + memAccount := memMonitor.MakeBoundAccount() var txnFingerprintIDCache *TxnFingerprintIDCache datadriven.Walk(t, datapathutils.TestDataPath(t, "txn_fingerprint_id_cache"), func(t *testing.T, path string) { datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { - ctx := context.Background() switch d.Cmd { case "init": var capacity int d.ScanArgs(t, "capacity", &capacity) - st := cluster.MakeTestingClusterSettings() - txnFingerprintIDCache = NewTxnFingerprintIDCache(ctx, st, nil /* acc */) + txnFingerprintIDCache = NewTxnFingerprintIDCache(ctx, st, &memAccount) TxnFingerprintIDCacheCapacity.Override(ctx, &st.SV, int64(capacity)) @@ -74,10 +78,12 @@ func TestTxnFingerprintIDCacheDataDriven(t *testing.T) { case "show": return printTxnFingerprintIDCache(txnFingerprintIDCache) + case "accounting": + return fmt.Sprintf("%d bytes", memAccount.Used()) + default: + return "" } - return "" - }) }) } From 95fc66107447c810569cc555710356d38ffe7d76 Mon Sep 17 
00:00:00 2001 From: Michael Butler Date: Tue, 9 Apr 2024 10:02:22 -0400 Subject: [PATCH 3/5] backupccl: skip TestBackupAndRestoreJobDescription under stress race Informs #121927 Release note: none --- pkg/ccl/backupccl/backup_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index e79d7417d3cd..9438dde6176c 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -733,6 +733,8 @@ func TestBackupAndRestoreJobDescription(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + skip.UnderStressRace(t, "this test is heavyweight and is not expected to reveal any direct bugs under stress race") + const numAccounts = 1 _, sqlDB, tmpDir, cleanupFn := backupRestoreTestSetup(t, multiNode, numAccounts, InitManualReplication) defer cleanupFn() From eb7a7c63611ee5f48f6f592e8b8629e257a55c7f Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Sun, 17 Mar 2024 15:50:20 +0000 Subject: [PATCH 4/5] raft: remove unused read-only requests Epic: none Release note: none --- pkg/kv/kvserver/replica_raft.go | 11 +- pkg/raft/BUILD.bazel | 1 - pkg/raft/node.go | 21 -- pkg/raft/node_test.go | 45 ---- pkg/raft/raft.go | 185 +------------- pkg/raft/raft_test.go | 239 ------------------ pkg/raft/raftpb/raft.proto | 4 +- .../interaction_env_handler_add_nodes.go | 9 - pkg/raft/rawnode.go | 17 -- pkg/raft/rawnode_test.go | 54 +--- pkg/raft/read_only.go | 124 --------- .../forget_leader_read_only_lease_based.txt | 30 --- pkg/raft/util.go | 4 - pkg/raft/util_test.go | 4 - 14 files changed, 13 insertions(+), 735 deletions(-) delete mode 100644 pkg/raft/read_only.go delete mode 100644 pkg/raft/testdata/forget_leader_read_only_lease_based.txt diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index c693b1747631..8a408a0fd1ad 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -1207,12 +1207,6 @@ type asyncReady struct { //
It is not required to consume or store SoftState. *raft.SoftState - // ReadStates can be used for node to serve linearizable read requests locally - // when its applied index is greater than the index in ReadState. - // Note that the readState will be returned when raft receives msgReadIndex. - // The returned is only valid for the request that requested to read. - ReadStates []raft.ReadState - // Messages specifies outbound messages to other peers and to local storage // threads. These messages can be sent in any order. // @@ -1224,9 +1218,8 @@ type asyncReady struct { // makeAsyncReady constructs an asyncReady from the provided Ready. func makeAsyncReady(rd raft.Ready) asyncReady { return asyncReady{ - SoftState: rd.SoftState, - ReadStates: rd.ReadStates, - Messages: rd.Messages, + SoftState: rd.SoftState, + Messages: rd.Messages, } } diff --git a/pkg/raft/BUILD.bazel b/pkg/raft/BUILD.bazel index d4cdc81d1711..cd4b2e2ed41d 100644 --- a/pkg/raft/BUILD.bazel +++ b/pkg/raft/BUILD.bazel @@ -11,7 +11,6 @@ go_library( "node.go", "raft.go", "rawnode.go", - "read_only.go", "status.go", "storage.go", "types.go", diff --git a/pkg/raft/node.go b/pkg/raft/node.go index c6a4d90773cb..9bd6de72cbe5 100644 --- a/pkg/raft/node.go +++ b/pkg/raft/node.go @@ -68,12 +68,6 @@ type Ready struct { // Messages slice. pb.HardState - // ReadStates can be used for node to serve linearizable read requests locally - // when its applied index is greater than the index in ReadState. - // Note that the readState will be returned when raft receives msgReadIndex. - // The returned is only valid for the request that requested to read. - ReadStates []ReadState - // Entries specifies entries to be saved to stable storage BEFORE // Messages are sent. // @@ -213,19 +207,8 @@ type Node interface { // from the leader recently). 
However, 3 can not campaign unilaterally, a // quorum have to agree that the leader is dead, which avoids disrupting the // leader if individual nodes are wrong about it being dead. - // - // This does nothing with ReadOnlyLeaseBased, since it would allow a new - // leader to be elected without the old leader knowing. ForgetLeader(ctx context.Context) error - // ReadIndex request a read state. The read state will be set in the ready. - // Read state has a read index. Once the application advances further than the read - // index, any linearizable read requests issued before the read request can be - // processed safely. The read state will have the same rctx attached. - // Note that request can be lost without notice, therefore it is user's job - // to ensure read index retries. - ReadIndex(ctx context.Context, rctx []byte) error - // Status returns the current status of the raft state machine. Status() Status // ReportUnreachable reports the given node is not reachable for the last send. @@ -607,7 +590,3 @@ func (n *node) TransferLeadership(ctx context.Context, lead, transferee uint64) func (n *node) ForgetLeader(ctx context.Context) error { return n.step(ctx, pb.Message{Type: pb.MsgForgetLeader}) } - -func (n *node) ReadIndex(ctx context.Context, rctx []byte) error { - return n.step(ctx, pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}}) -} diff --git a/pkg/raft/node_test.go b/pkg/raft/node_test.go index 4d20fa424f8c..d17bf49d7003 100644 --- a/pkg/raft/node_test.go +++ b/pkg/raft/node_test.go @@ -193,51 +193,6 @@ func TestDisableProposalForwarding(t *testing.T) { require.Empty(t, r3.msgs) } -// TestNodeReadIndexToOldLeader ensures that raftpb.MsgReadIndex to old leader -// gets forwarded to the new leader and 'send' method does not attach its term. 
-func TestNodeReadIndexToOldLeader(t *testing.T) { - r1 := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3))) - r2 := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3))) - r3 := newTestRaft(3, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3))) - - nt := newNetwork(r1, r2, r3) - - // elect r1 as leader - nt.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgHup}) - - var testEntries = []raftpb.Entry{{Data: []byte("testdata")}} - - // send readindex request to r2(follower) - r2.Step(raftpb.Message{From: 2, To: 2, Type: raftpb.MsgReadIndex, Entries: testEntries}) - - // verify r2(follower) forwards this message to r1(leader) with term not set - require.Len(t, r2.msgs, 1) - readIndxMsg1 := raftpb.Message{From: 2, To: 1, Type: raftpb.MsgReadIndex, Entries: testEntries} - require.Equal(t, readIndxMsg1, r2.msgs[0]) - - // send readindex request to r3(follower) - r3.Step(raftpb.Message{From: 3, To: 3, Type: raftpb.MsgReadIndex, Entries: testEntries}) - - // verify r3(follower) forwards this message to r1(leader) with term not set as well. - require.Len(t, r3.msgs, 1) - readIndxMsg2 := raftpb.Message{From: 3, To: 1, Type: raftpb.MsgReadIndex, Entries: testEntries} - require.Equal(t, readIndxMsg2, r3.msgs[0]) - - // now elect r3 as leader - nt.send(raftpb.Message{From: 3, To: 3, Type: raftpb.MsgHup}) - - // let r1 steps the two messages previously we got from r2, r3 - r1.Step(readIndxMsg1) - r1.Step(readIndxMsg2) - - // verify r1(follower) forwards these messages again to r3(new leader) - require.Len(t, r1.msgs, 2) - readIndxMsg3 := raftpb.Message{From: 2, To: 3, Type: raftpb.MsgReadIndex, Entries: testEntries} - require.Equal(t, readIndxMsg3, r1.msgs[0]) - readIndxMsg3 = raftpb.Message{From: 3, To: 3, Type: raftpb.MsgReadIndex, Entries: testEntries} - require.Equal(t, readIndxMsg3, r1.msgs[1]) -} - // TestNodeProposeConfig ensures that node.ProposeConfChange sends the given configuration proposal // to the underlying raft. 
func TestNodeProposeConfig(t *testing.T) { diff --git a/pkg/raft/raft.go b/pkg/raft/raft.go index 5c33949f5213..ee69e17848fd 100644 --- a/pkg/raft/raft.go +++ b/pkg/raft/raft.go @@ -56,20 +56,6 @@ const ( numStates ) -type ReadOnlyOption int - -const ( - // ReadOnlySafe guarantees the linearizability of the read only request by - // communicating with the quorum. It is the default and suggested option. - ReadOnlySafe ReadOnlyOption = iota - // ReadOnlyLeaseBased ensures linearizability of the read only request by - // relying on the leader lease. It can be affected by clock drift. - // If the clock drift is unbounded, leader might keep the lease longer than it - // should (clock can move backward/pause without any bound). ReadIndex is not safe - // in that case. - ReadOnlyLeaseBased -) - // Possible values for CampaignType const ( // campaignPreElection represents the first phase of a normal election when @@ -231,19 +217,6 @@ type Config struct { // rejoins the cluster. PreVote bool - // ReadOnlyOption specifies how the read only request is processed. - // - // ReadOnlySafe guarantees the linearizability of the read only request by - // communicating with the quorum. It is the default and suggested option. - // - // ReadOnlyLeaseBased ensures linearizability of the read only request by - // relying on the leader lease. It can be affected by clock drift. - // If the clock drift is unbounded, leader might keep the lease longer than it - // should (clock can move backward/pause without any bound). ReadIndex is not safe - // in that case. - // CheckQuorum MUST be enabled if ReadOnlyOption is ReadOnlyLeaseBased. - ReadOnlyOption ReadOnlyOption - // Logger is the logger used for raft log. 
For multinode which can host // multiple raft group, each raft group can have its own logger Logger Logger @@ -331,10 +304,6 @@ func (c *Config) validate() error { c.Logger = getLogger() } - if c.ReadOnlyOption == ReadOnlyLeaseBased && !c.CheckQuorum { - return errors.New("CheckQuorum must be enabled when ReadOnlyOption is ReadOnlyLeaseBased") - } - return nil } @@ -344,8 +313,6 @@ type raft struct { Term uint64 Vote uint64 - readStates []ReadState - // the log raftLog *raftLog @@ -396,8 +363,6 @@ type raft struct { // term changes. uncommittedSize entryPayloadSize - readOnly *readOnly - // number of ticks since it reached last electionTimeout when it is leader // or candidate. // number of ticks since it reached last electionTimeout or received a @@ -424,12 +389,6 @@ type raft struct { step stepFunc logger Logger - - // pendingReadIndexMessages is used to store messages of type MsgReadIndex - // that can't be answered as new leader didn't committed any log in - // current term. Those will be handled as fast as first log is committed in - // current term. - pendingReadIndexMessages []pb.Message } func newRaft(c *Config) *raft { @@ -455,7 +414,6 @@ func newRaft(c *Config) *raft { logger: c.Logger, checkQuorum: c.CheckQuorum, preVote: c.PreVote, - readOnly: newReadOnly(c.ReadOnlyOption), disableProposalForwarding: c.DisableProposalForwarding, disableConfChangeValidation: c.DisableConfChangeValidation, stepDownOnRemoval: c.StepDownOnRemoval, @@ -528,11 +486,9 @@ func (r *raft) send(m pb.Message) { if m.Term != 0 { r.logger.Panicf("term should not be set when sending %s (was %d)", m.Type, m.Term) } - // do not attach term to MsgProp, MsgReadIndex - // proposals are a way to forward to the leader and - // should be treated as local message. - // MsgReadIndex is also forwarded to leader. - if m.Type != pb.MsgProp && m.Type != pb.MsgReadIndex { + // Do not attach term to MsgProp. Proposals are a way to forward to the + // leader, and should be treated as local message. 
+ if m.Type != pb.MsgProp { m.Term = r.Term } } @@ -682,7 +638,7 @@ func (r *raft) maybeSendSnapshot(to uint64, pr *tracker.Progress) bool { } // sendHeartbeat sends a heartbeat RPC to the given peer. -func (r *raft) sendHeartbeat(to uint64, ctx []byte) { +func (r *raft) sendHeartbeat(to uint64) { pr := r.trk.Progress[to] // Attach the commit as min(to.matched, r.committed). // When the leader sends out heartbeat message, @@ -692,10 +648,9 @@ func (r *raft) sendHeartbeat(to uint64, ctx []byte) { // an unmatched index. commit := min(pr.Match, r.raftLog.committed) r.send(pb.Message{ - To: to, - Type: pb.MsgHeartbeat, - Commit: commit, - Context: ctx, + To: to, + Type: pb.MsgHeartbeat, + Commit: commit, }) pr.SentCommit(commit) } @@ -713,20 +668,11 @@ func (r *raft) bcastAppend() { // bcastHeartbeat sends RPC, without entries to all the peers. func (r *raft) bcastHeartbeat() { - lastCtx := r.readOnly.lastPendingRequestCtx() - if len(lastCtx) == 0 { - r.bcastHeartbeatWithCtx(nil) - } else { - r.bcastHeartbeatWithCtx([]byte(lastCtx)) - } -} - -func (r *raft) bcastHeartbeatWithCtx(ctx []byte) { r.trk.Visit(func(id uint64, _ *tracker.Progress) { if id == r.id { return } - r.sendHeartbeat(id, ctx) + r.sendHeartbeat(id) }) } @@ -800,7 +746,6 @@ func (r *raft) reset(term uint64) { r.pendingConfIndex = 0 r.uncommittedSize = 0 - r.readOnly = newReadOnly(r.readOnly.option) } func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) { @@ -1325,25 +1270,7 @@ func stepLeader(r *raft, m pb.Message) error { } r.bcastAppend() return nil - case pb.MsgReadIndex: - // only one voting member (the leader) in the cluster - if r.trk.IsSingleton() { - if resp := r.responseToReadIndexReq(m, r.raftLog.committed); resp.To != None { - r.send(resp) - } - return nil - } - - // Postpone read only request when this leader has not committed - // any log entry at its term. 
- if !r.committedEntryInCurrentTerm() { - r.pendingReadIndexMessages = append(r.pendingReadIndexMessages, m) - return nil - } - - sendMsgReadIndexResponse(r, m) - return nil case pb.MsgForgetLeader: return nil // noop on leader } @@ -1522,9 +1449,6 @@ func stepLeader(r *raft, m pb.Message) error { } if r.maybeCommit() { - // committed index has progressed for the term, so it is safe - // to respond to pending read index requests - releasePendingReadIndexMessages(r) r.bcastAppend() } else if r.id != m.From && pr.CanBumpCommit(r.raftLog.committed) { // This node may be missing the latest commit index, so send it. @@ -1571,20 +1495,6 @@ func stepLeader(r *raft, m pb.Message) error { r.sendAppend(m.From) } - if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 { - return nil - } - - if r.trk.Voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon { - return nil - } - - rss := r.readOnly.advance(m) - for _, rs := range rss { - if resp := r.responseToReadIndexReq(rs.req, rs.index); resp.To != None { - r.send(resp) - } - } case pb.MsgSnapStatus: if pr.State != tracker.StateSnapshot { return nil @@ -1724,10 +1634,6 @@ func stepFollower(r *raft, m pb.Message) error { m.To = r.lead r.send(m) case pb.MsgForgetLeader: - if r.readOnly.option == ReadOnlyLeaseBased { - r.logger.Error("ignoring MsgForgetLeader due to ReadOnlyLeaseBased") - return nil - } if r.lead != None { r.logger.Infof("%x forgetting leader %x at term %d", r.id, r.lead, r.Term) r.lead = None @@ -1738,19 +1644,6 @@ func stepFollower(r *raft, m pb.Message) error { // know we are not recovering from a partition so there is no need for the // extra round trip. 
r.hup(campaignTransfer) - case pb.MsgReadIndex: - if r.lead == None { - r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term) - return nil - } - m.To = r.lead - r.send(m) - case pb.MsgReadIndexResp: - if len(m.Entries) != 1 { - r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries)) - return nil - } - r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data}) } return nil } @@ -2038,31 +1931,6 @@ func (r *raft) abortLeaderTransfer() { r.leadTransferee = None } -// committedEntryInCurrentTerm return true if the peer has committed an entry in its term. -func (r *raft) committedEntryInCurrentTerm() bool { - // NB: r.Term is never 0 on a leader, so if zeroTermOnOutOfBounds returns 0, - // we won't see it as a match with r.Term. - return r.raftLog.zeroTermOnOutOfBounds(r.raftLog.term(r.raftLog.committed)) == r.Term -} - -// responseToReadIndexReq constructs a response for `req`. If `req` comes from the peer -// itself, a blank value will be returned. -func (r *raft) responseToReadIndexReq(req pb.Message, readIndex uint64) pb.Message { - if req.From == None || req.From == r.id { - r.readStates = append(r.readStates, ReadState{ - Index: readIndex, - RequestCtx: req.Entries[0].Data, - }) - return pb.Message{} - } - return pb.Message{ - Type: pb.MsgReadIndexResp, - To: req.From, - Index: readIndex, - Entries: req.Entries, - } -} - // increaseUncommittedSize computes the size of the proposed entries and // determines whether they would push leader over its maxUncommittedSize limit. // If the new entries would exceed the limit, the method returns false. 
If not, @@ -2099,40 +1967,3 @@ func (r *raft) reduceUncommittedSize(s entryPayloadSize) { r.uncommittedSize -= s } } - -func releasePendingReadIndexMessages(r *raft) { - if len(r.pendingReadIndexMessages) == 0 { - // Fast path for the common case to avoid a call to storage.LastIndex() - // via committedEntryInCurrentTerm. - return - } - if !r.committedEntryInCurrentTerm() { - r.logger.Error("pending MsgReadIndex should be released only after first commit in current term") - return - } - - msgs := r.pendingReadIndexMessages - r.pendingReadIndexMessages = nil - - for _, m := range msgs { - sendMsgReadIndexResponse(r, m) - } -} - -func sendMsgReadIndexResponse(r *raft, m pb.Message) { - // thinking: use an internally defined context instead of the user given context. - // We can express this in terms of the term and index instead of a user-supplied value. - // This would allow multiple reads to piggyback on the same message. - switch r.readOnly.option { - // If more than the local vote is needed, go through a full broadcast. - case ReadOnlySafe: - r.readOnly.addRequest(r.raftLog.committed, m) - // The local node automatically acks the request. - r.readOnly.recvAck(r.id, m.Entries[0].Data) - r.bcastHeartbeatWithCtx(m.Entries[0].Data) - case ReadOnlyLeaseBased: - if resp := r.responseToReadIndexReq(m, r.raftLog.committed); resp.To != None { - r.send(resp) - } - } -} diff --git a/pkg/raft/raft_test.go b/pkg/raft/raft_test.go index e02fe8719441..1c96b9191c52 100644 --- a/pkg/raft/raft_test.go +++ b/pkg/raft/raft_test.go @@ -1191,39 +1191,6 @@ func TestHandleHeartbeatResp(t *testing.T) { require.Empty(t, msgs) } -// TestRaftFreesReadOnlyMem ensures raft will free read request from -// readOnly readIndexQueue and pendingReadIndex map. 
-// related issue: https://github.com/etcd-io/etcd/issues/7571 -func TestRaftFreesReadOnlyMem(t *testing.T) { - sm := newTestRaft(1, 5, 1, newTestMemoryStorage(withPeers(1, 2))) - sm.becomeCandidate() - sm.becomeLeader() - sm.raftLog.commitTo(sm.raftLog.lastIndex()) - - ctx := []byte("ctx") - - // leader starts linearizable read request. - // more info: raft dissertation 6.4, step 2. - sm.Step(pb.Message{From: 2, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: ctx}}}) - msgs := sm.readMessages() - require.Len(t, msgs, 1) - require.Equal(t, pb.MsgHeartbeat, msgs[0].Type) - require.Equal(t, ctx, msgs[0].Context) - require.Len(t, sm.readOnly.readIndexQueue, 1) - require.Len(t, sm.readOnly.pendingReadIndex, 1) - _, ok := sm.readOnly.pendingReadIndex[string(ctx)] - require.True(t, ok) - - // heartbeat responses from majority of followers (1 in this case) - // acknowledge the authority of the leader. - // more info: raft dissertation 6.4, step 3. - sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp, Context: ctx}) - require.Empty(t, sm.readOnly.readIndexQueue) - require.Empty(t, sm.readOnly.pendingReadIndex) - _, ok = sm.readOnly.pendingReadIndex[string(ctx)] - require.False(t, ok) -} - // TestMsgAppRespWaitReset verifies the resume behavior of a leader // MsgAppResp. 
func TestMsgAppRespWaitReset(t *testing.T) { @@ -1906,212 +1873,6 @@ func TestDisruptiveFollowerPreVote(t *testing.T) { require.Equal(t, StateLeader, n1.state) } -func TestReadOnlyOptionSafe(t *testing.T) { - a := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3))) - b := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3))) - c := newTestRaft(3, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3))) - - nt := newNetwork(a, b, c) - setRandomizedElectionTimeout(b, b.electionTimeout+1) - - for i := 0; i < b.electionTimeout; i++ { - b.tick() - } - nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) - - require.Equal(t, StateLeader, a.state) - - tests := []struct { - sm *raft - proposals int - wri uint64 - wctx []byte - }{ - {a, 10, 11, []byte("ctx1")}, - {b, 10, 21, []byte("ctx2")}, - {c, 10, 31, []byte("ctx3")}, - {a, 10, 41, []byte("ctx4")}, - {b, 10, 51, []byte("ctx5")}, - {c, 10, 61, []byte("ctx6")}, - } - - for i, tt := range tests { - for j := 0; j < tt.proposals; j++ { - nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) - } - - nt.send(pb.Message{From: tt.sm.id, To: tt.sm.id, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: tt.wctx}}}) - - r := tt.sm - assert.NotEmpty(t, r.readStates, "#%d", i) - rs := r.readStates[0] - assert.Equal(t, tt.wri, rs.Index, "#%d", i) - assert.Equal(t, tt.wctx, rs.RequestCtx, "#%d", i) - r.readStates = nil - } -} - -func TestReadOnlyWithLearner(t *testing.T) { - s := newTestMemoryStorage(withPeers(1), withLearners(2)) - a := newTestLearnerRaft(1, 10, 1, s) - b := newTestLearnerRaft(2, 10, 1, newTestMemoryStorage(withPeers(1), withLearners(2))) - - nt := newNetwork(a, b) - setRandomizedElectionTimeout(b, b.electionTimeout+1) - - for i := 0; i < b.electionTimeout; i++ { - b.tick() - } - nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) - - require.Equal(t, StateLeader, a.state) - - tests := []struct { - sm *raft - proposals int - wri uint64 - wctx []byte - }{ - {a, 10, 11, 
[]byte("ctx1")}, - {b, 10, 21, []byte("ctx2")}, - {a, 10, 31, []byte("ctx3")}, - {b, 10, 41, []byte("ctx4")}, - } - - for i, tt := range tests { - for j := 0; j < tt.proposals; j++ { - nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) - nextEnts(a, s) // append the entries on the leader - } - - nt.send(pb.Message{From: tt.sm.id, To: tt.sm.id, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: tt.wctx}}}) - - r := tt.sm - require.NotEmpty(t, r.readStates, "#%d", i) - rs := r.readStates[0] - assert.Equal(t, tt.wri, rs.Index, "#%d", i) - assert.Equal(t, tt.wctx, rs.RequestCtx, "#%d", i) - r.readStates = nil - } -} - -func TestReadOnlyOptionLease(t *testing.T) { - a := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3))) - b := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3))) - c := newTestRaft(3, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3))) - a.readOnly.option = ReadOnlyLeaseBased - b.readOnly.option = ReadOnlyLeaseBased - c.readOnly.option = ReadOnlyLeaseBased - a.checkQuorum = true - b.checkQuorum = true - c.checkQuorum = true - - nt := newNetwork(a, b, c) - setRandomizedElectionTimeout(b, b.electionTimeout+1) - - for i := 0; i < b.electionTimeout; i++ { - b.tick() - } - nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) - - require.Equal(t, StateLeader, a.state) - - tests := []struct { - sm *raft - proposals int - wri uint64 - wctx []byte - }{ - {a, 10, 11, []byte("ctx1")}, - {b, 10, 21, []byte("ctx2")}, - {c, 10, 31, []byte("ctx3")}, - {a, 10, 41, []byte("ctx4")}, - {b, 10, 51, []byte("ctx5")}, - {c, 10, 61, []byte("ctx6")}, - } - - for i, tt := range tests { - for j := 0; j < tt.proposals; j++ { - nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) - } - - nt.send(pb.Message{From: tt.sm.id, To: tt.sm.id, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: tt.wctx}}}) - - r := tt.sm - rs := r.readStates[0] - assert.Equal(t, tt.wri, rs.Index, "#%d", i) - assert.Equal(t, tt.wctx, 
rs.RequestCtx, "#%d", i) - r.readStates = nil - } -} - -// TestReadOnlyForNewLeader ensures that a leader only accepts MsgReadIndex message -// when it commits at least one log entry at it term. -func TestReadOnlyForNewLeader(t *testing.T) { - nodeConfigs := []struct { - id uint64 - committed uint64 - applied uint64 - compactIndex uint64 - }{ - {1, 1, 1, 0}, - {2, 2, 2, 2}, - {3, 2, 2, 2}, - } - peers := make([]stateMachine, 0) - for _, c := range nodeConfigs { - storage := newTestMemoryStorage(withPeers(1, 2, 3)) - require.NoError(t, storage.Append(index(1).terms(1, 1))) - storage.SetHardState(pb.HardState{Term: 1, Commit: c.committed}) - if c.compactIndex != 0 { - storage.Compact(c.compactIndex) - } - cfg := newTestConfig(c.id, 10, 1, storage) - cfg.Applied = c.applied - raft := newRaft(cfg) - peers = append(peers, raft) - } - nt := newNetwork(peers...) - - // Drop MsgApp to forbid peer a to commit any log entry at its term after it becomes leader. - nt.ignore(pb.MsgApp) - // Force peer a to become leader. - nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) - - sm := nt.peers[1].(*raft) - require.Equal(t, StateLeader, sm.state) - - // Ensure peer a drops read only request. - var windex uint64 = 4 - wctx := []byte("ctx") - nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: wctx}}}) - require.Empty(t, sm.readStates) - - nt.recover() - - // Force peer a to commit a log entry at its term - for i := 0; i < sm.heartbeatTimeout; i++ { - sm.tick() - } - nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) - require.Equal(t, uint64(4), sm.raftLog.committed) - lastLogTerm := sm.raftLog.zeroTermOnOutOfBounds(sm.raftLog.term(sm.raftLog.committed)) - require.Equal(t, sm.Term, lastLogTerm) - - // Ensure peer a processed postponed read only request after it committed an entry at its term. 
- require.Len(t, sm.readStates, 1) - rs := sm.readStates[0] - require.Equal(t, windex, rs.Index) - require.Equal(t, wctx, rs.RequestCtx) - - // Ensure peer a accepts read only request after it committed an entry at its term. - nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: wctx}}}) - require.Len(t, sm.readStates, 2) - rs = sm.readStates[1] - require.Equal(t, windex, rs.Index) - require.Equal(t, wctx, rs.RequestCtx) -} - func TestLeaderAppResp(t *testing.T) { // initial progress: match = 0; next = 3 tests := []struct { diff --git a/pkg/raft/raftpb/raft.proto b/pkg/raft/raftpb/raft.proto index d37565e0cd72..d31b7bb39430 100644 --- a/pkg/raft/raftpb/raft.proto +++ b/pkg/raft/raftpb/raft.proto @@ -55,8 +55,6 @@ enum MessageType { MsgCheckQuorum = 12; MsgTransferLeader = 13; MsgTimeoutNow = 14; - MsgReadIndex = 15; - MsgReadIndexResp = 16; MsgPreVote = 17; MsgPreVoteResp = 18; MsgStorageAppend = 19; @@ -67,6 +65,8 @@ enum MessageType { // NOTE: when adding new message types, remember to update the isLocalMsg and // isResponseMsg arrays in raft/util.go and update the corresponding tests in // raft/util_test.go. 
+ + reserved 15, 16; // used to be MsgReadIndex(Resp) } message Message { diff --git a/pkg/raft/rafttest/interaction_env_handler_add_nodes.go b/pkg/raft/rafttest/interaction_env_handler_add_nodes.go index a8f2602f7dbb..395f59c2ff77 100644 --- a/pkg/raft/rafttest/interaction_env_handler_add_nodes.go +++ b/pkg/raft/rafttest/interaction_env_handler_add_nodes.go @@ -60,15 +60,6 @@ func (env *InteractionEnv) handleAddNodes(t *testing.T, d datadriven.TestData) e arg.Scan(t, i, &cfg.MaxCommittedSizePerReady) case "disable-conf-change-validation": arg.Scan(t, i, &cfg.DisableConfChangeValidation) - case "read-only": - switch arg.Vals[i] { - case "safe": - cfg.ReadOnlyOption = raft.ReadOnlySafe - case "lease-based": - cfg.ReadOnlyOption = raft.ReadOnlyLeaseBased - default: - return fmt.Errorf("invalid read-only option %q", arg.Vals[i]) - } case "step-down-on-removal": arg.Scan(t, i, &cfg.StepDownOnRemoval) } diff --git a/pkg/raft/rawnode.go b/pkg/raft/rawnode.go index 4a4279abac87..9a1bdd7d693e 100644 --- a/pkg/raft/rawnode.go +++ b/pkg/raft/rawnode.go @@ -160,9 +160,6 @@ func (rn *RawNode) readyWithoutAccept() Ready { if r.raftLog.hasNextUnstableSnapshot() { rd.Snapshot = *r.raftLog.nextUnstableSnapshot() } - if len(r.readStates) != 0 { - rd.ReadStates = r.readStates - } rd.MustSync = MustSync(r.hardState(), rn.prevHardSt, len(rd.Entries)) if rn.asyncStorageWrites { @@ -412,9 +409,6 @@ func (rn *RawNode) acceptReady(rd Ready) { if !IsEmptyHardState(rd.HardState) { rn.prevHardSt = rd.HardState } - if len(rd.ReadStates) != 0 { - rn.raft.readStates = nil - } if !rn.asyncStorageWrites { if len(rn.stepsOnAdvance) != 0 { rn.raft.logger.Panicf("two accepted Ready structs without call to Advance") @@ -469,9 +463,6 @@ func (rn *RawNode) HasReady() bool { if r.raftLog.hasNextUnstableEnts() || r.raftLog.hasNextCommittedEnts(rn.applyUnstableEntries()) { return true } - if len(r.readStates) != 0 { - return true - } return false } @@ -553,11 +544,3 @@ func (rn *RawNode) 
TransferLeader(transferee uint64) { func (rn *RawNode) ForgetLeader() error { return rn.raft.Step(pb.Message{Type: pb.MsgForgetLeader}) } - -// ReadIndex requests a read state. The read state will be set in ready. -// Read State has a read index. Once the application advances further than the read -// index, any linearizable read requests issued before the read request can be -// processed safely. The read state will have the same rctx attached. -func (rn *RawNode) ReadIndex(rctx []byte) { - _ = rn.raft.Step(pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}}) -} diff --git a/pkg/raft/rawnode_test.go b/pkg/raft/rawnode_test.go index 097d7770eaa8..43e1cba1fa22 100644 --- a/pkg/raft/rawnode_test.go +++ b/pkg/raft/rawnode_test.go @@ -62,12 +62,7 @@ func (a *rawNodeAdapter) Ready() <-chan Ready { return nil } // Node takes more contexts. Easy enough to fix. -func (a *rawNodeAdapter) Campaign(context.Context) error { return a.RawNode.Campaign() } -func (a *rawNodeAdapter) ReadIndex(_ context.Context, rctx []byte) error { - a.RawNode.ReadIndex(rctx) - // RawNode swallowed the error in ReadIndex, it probably should not do that. - return nil -} +func (a *rawNodeAdapter) Campaign(context.Context) error { return a.RawNode.Campaign() } func (a *rawNodeAdapter) Step(_ context.Context, m pb.Message) error { return a.RawNode.Step(m) } func (a *rawNodeAdapter) Propose(_ context.Context, data []byte) error { return a.RawNode.Propose(data) @@ -507,53 +502,6 @@ func TestRawNodeProposeAddDuplicateNode(t *testing.T) { assert.Equal(t, ccdata2, entries[2].Data) } -// TestRawNodeReadIndex ensures that Rawnode.ReadIndex sends the MsgReadIndex message -// to the underlying raft. It also ensures that ReadState can be read out. 
-func TestRawNodeReadIndex(t *testing.T) { - var msgs []pb.Message - appendStep := func(r *raft, m pb.Message) error { - msgs = append(msgs, m) - return nil - } - wrs := []ReadState{{Index: uint64(1), RequestCtx: []byte("somedata")}} - - s := newTestMemoryStorage(withPeers(1)) - c := newTestConfig(1, 10, 1, s) - rawNode, err := NewRawNode(c) - require.NoError(t, err) - - rawNode.raft.readStates = wrs - // ensure the ReadStates can be read out - assert.True(t, rawNode.HasReady()) - rd := rawNode.Ready() - assert.Equal(t, wrs, rd.ReadStates) - s.Append(rd.Entries) - rawNode.Advance(rd) - // ensure raft.readStates is reset after advance - assert.Nil(t, rawNode.raft.readStates) - - wrequestCtx := []byte("somedata2") - rawNode.Campaign() - for { - rd = rawNode.Ready() - s.Append(rd.Entries) - - if rd.SoftState.Lead == rawNode.raft.id { - rawNode.Advance(rd) - - // Once we are the leader, issue a ReadIndex request - rawNode.raft.step = appendStep - rawNode.ReadIndex(wrequestCtx) - break - } - rawNode.Advance(rd) - } - // ensure that MsgReadIndex message is sent to the underlying raft - require.Len(t, msgs, 1) - assert.Equal(t, pb.MsgReadIndex, msgs[0].Type) - assert.Equal(t, wrequestCtx, msgs[0].Entries[0].Data) -} - // TestBlockProposal from node_test.go has no equivalent in rawNode because there is // no leader check in RawNode. diff --git a/pkg/raft/read_only.go b/pkg/raft/read_only.go deleted file mode 100644 index 661138f64bc5..000000000000 --- a/pkg/raft/read_only.go +++ /dev/null @@ -1,124 +0,0 @@ -// This code has been modified from its original form by Cockroach Labs, Inc. -// All modifications are Copyright 2024 Cockroach Labs, Inc. -// -// Copyright 2016 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package raft - -import pb "github.com/cockroachdb/cockroach/pkg/raft/raftpb" - -// ReadState provides state for read only query. -// It's caller's responsibility to call ReadIndex first before getting -// this state from ready, it's also caller's duty to differentiate if this -// state is what it requests through RequestCtx, eg. given a unique id as -// RequestCtx -type ReadState struct { - Index uint64 - RequestCtx []byte -} - -type readIndexStatus struct { - req pb.Message - index uint64 - // NB: this never records 'false', but it's more convenient to use this - // instead of a map[uint64]struct{} due to the API of quorum.VoteResult. If - // this becomes performance sensitive enough (doubtful), quorum.VoteResult - // can change to an API that is closer to that of CommittedIndex. - acks map[uint64]bool -} - -type readOnly struct { - option ReadOnlyOption - pendingReadIndex map[string]*readIndexStatus - readIndexQueue []string -} - -func newReadOnly(option ReadOnlyOption) *readOnly { - return &readOnly{ - option: option, - pendingReadIndex: make(map[string]*readIndexStatus), - } -} - -// addRequest adds a read only request into readonly struct. -// `index` is the commit index of the raft state machine when it received -// the read only request. -// `m` is the original read only request message from the local or remote node. 
-func (ro *readOnly) addRequest(index uint64, m pb.Message) { - s := string(m.Entries[0].Data) - if _, ok := ro.pendingReadIndex[s]; ok { - return - } - ro.pendingReadIndex[s] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]bool)} - ro.readIndexQueue = append(ro.readIndexQueue, s) -} - -// recvAck notifies the readonly struct that the raft state machine received -// an acknowledgment of the heartbeat that attached with the read only request -// context. -func (ro *readOnly) recvAck(id uint64, context []byte) map[uint64]bool { - rs, ok := ro.pendingReadIndex[string(context)] - if !ok { - return nil - } - - rs.acks[id] = true - return rs.acks -} - -// advance advances the read only request queue kept by the readonly struct. -// It dequeues the requests until it finds the read only request that has -// the same context as the given `m`. -func (ro *readOnly) advance(m pb.Message) []*readIndexStatus { - var ( - i int - found bool - ) - - ctx := string(m.Context) - var rss []*readIndexStatus - - for _, okctx := range ro.readIndexQueue { - i++ - rs, ok := ro.pendingReadIndex[okctx] - if !ok { - panic("cannot find corresponding read state from pending map") - } - rss = append(rss, rs) - if okctx == ctx { - found = true - break - } - } - - if found { - ro.readIndexQueue = ro.readIndexQueue[i:] - for _, rs := range rss { - delete(ro.pendingReadIndex, string(rs.req.Entries[0].Data)) - } - return rss - } - - return nil -} - -// lastPendingRequestCtx returns the context of the last pending read only -// request in readonly struct. 
-func (ro *readOnly) lastPendingRequestCtx() string { - if len(ro.readIndexQueue) == 0 { - return "" - } - return ro.readIndexQueue[len(ro.readIndexQueue)-1] -} diff --git a/pkg/raft/testdata/forget_leader_read_only_lease_based.txt b/pkg/raft/testdata/forget_leader_read_only_lease_based.txt deleted file mode 100644 index 4d4bd1883605..000000000000 --- a/pkg/raft/testdata/forget_leader_read_only_lease_based.txt +++ /dev/null @@ -1,30 +0,0 @@ -log-level none ----- -ok - -add-nodes 3 voters=(1,2,3) index=10 checkquorum=true read-only=lease-based ----- -ok - -campaign 1 ----- -ok - -stabilize ----- -ok - -log-level debug ----- -ok - -# ForgetLeader fails with lease-based reads, as it's not safe. -forget-leader 2 ----- -ERROR ignoring MsgForgetLeader due to ReadOnlyLeaseBased - -raft-state ----- -1: StateLeader (Voter) Term:1 Lead:1 -2: StateFollower (Voter) Term:1 Lead:1 -3: StateFollower (Voter) Term:1 Lead:1 diff --git a/pkg/raft/util.go b/pkg/raft/util.go index d93778606f48..a1894eb24c13 100644 --- a/pkg/raft/util.go +++ b/pkg/raft/util.go @@ -46,7 +46,6 @@ var isResponseMsg = [...]bool{ pb.MsgVoteResp: true, pb.MsgHeartbeatResp: true, pb.MsgUnreachable: true, - pb.MsgReadIndexResp: true, pb.MsgPreVoteResp: true, pb.MsgStorageAppendResp: true, pb.MsgStorageApplyResp: true, @@ -117,9 +116,6 @@ func DescribeReady(rd Ready, f EntryFormatter) string { fmt.Fprintf(&buf, "HardState %s", DescribeHardState(rd.HardState)) buf.WriteByte('\n') } - if len(rd.ReadStates) > 0 { - fmt.Fprintf(&buf, "ReadStates %v\n", rd.ReadStates) - } if len(rd.Entries) > 0 { buf.WriteString("Entries:\n") fmt.Fprint(&buf, DescribeEntries(rd.Entries, f)) diff --git a/pkg/raft/util_test.go b/pkg/raft/util_test.go index 18a5b6054717..624a27928d06 100644 --- a/pkg/raft/util_test.go +++ b/pkg/raft/util_test.go @@ -90,8 +90,6 @@ func TestIsLocalMsg(t *testing.T) { {pb.MsgHeartbeat, false}, {pb.MsgHeartbeatResp, false}, {pb.MsgTimeoutNow, false}, - {pb.MsgReadIndex, false}, - {pb.MsgReadIndexResp, 
false}, {pb.MsgPreVote, false}, {pb.MsgPreVoteResp, false}, {pb.MsgStorageAppend, true}, @@ -127,8 +125,6 @@ func TestIsResponseMsg(t *testing.T) { {pb.MsgHeartbeat, false}, {pb.MsgHeartbeatResp, true}, {pb.MsgTimeoutNow, false}, - {pb.MsgReadIndex, false}, - {pb.MsgReadIndexResp, true}, {pb.MsgPreVote, false}, {pb.MsgPreVoteResp, true}, {pb.MsgStorageAppend, false}, From b4091ca2f976041f6cc80703f25af85c2fcdf8d7 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Sun, 17 Mar 2024 21:03:37 +0000 Subject: [PATCH 5/5] raft: don't echo Context on MsgHeartbeat Epic: none Release note: none --- pkg/raft/raft.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/raft/raft.go b/pkg/raft/raft.go index ee69e17848fd..61168a9ce8a5 100644 --- a/pkg/raft/raft.go +++ b/pkg/raft/raft.go @@ -1704,7 +1704,7 @@ func (r *raft) handleAppendEntries(m pb.Message) { func (r *raft) handleHeartbeat(m pb.Message) { r.raftLog.commitTo(m.Commit) - r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context}) + r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp}) } func (r *raft) handleSnapshot(m pb.Message) {