From 6ac9d94d672fcfb8bd5d4ee0119a7d14caa6d5cb Mon Sep 17 00:00:00 2001 From: caojiamingalan Date: Fri, 14 Jul 2023 19:22:38 -0500 Subject: [PATCH] etcdserver: backport check scheduledCompactKeyName and finishedCompactKeyName before writing hash to release-3.5. Fix #15919. Check ScheduledCompactKeyName and FinishedCompactKeyName before writing hash to hashstore. If they do not match, this compaction was interrupted at some point and its hash value is invalid. In such cases, we do not write the hash value to the hashstore, which avoids an incorrect corruption alarm. Signed-off-by: caojiamingalan --- server/mvcc/kvstore.go | 36 +++++++++++++++++++--- server/mvcc/kvstore_test.go | 4 +++ tests/e2e/cluster_test.go | 4 +++ tests/e2e/corrupt_test.go | 59 +++++++++++++++++++++++++++++++++++++ 4 files changed, 99 insertions(+), 4 deletions(-) diff --git a/server/mvcc/kvstore.go b/server/mvcc/kvstore.go index fc48380b5fe..9b37e710450 100644 --- a/server/mvcc/kvstore.go +++ b/server/mvcc/kvstore.go @@ -227,7 +227,27 @@ func (s *store) updateCompactRev(rev int64) (<-chan struct{}, int64, error) { return nil, compactMainRev, nil } -func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-chan struct{}, error) { +// checkPrevCompactionCompleted checks whether the previous scheduled compaction is completed. 
+func (s *store) checkPrevCompactionCompleted() bool { + tx := s.b.ReadTx() + tx.Lock() + defer tx.Unlock() + _, scheduledCompactBytes := tx.UnsafeRange(buckets.Meta, scheduledCompactKeyName, nil, 0) + scheduledCompact := int64(0) + if len(scheduledCompactBytes) != 0 { + scheduledCompact = bytesToRev(scheduledCompactBytes[0]).main + } + + _, finishedCompactBytes := tx.UnsafeRange(buckets.Meta, finishedCompactKeyName, nil, 0) + finishedCompact := int64(0) + if len(finishedCompactBytes) != 0 { + finishedCompact = bytesToRev(finishedCompactBytes[0]).main + + } + return scheduledCompact == finishedCompact +} + +func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64, prevCompactionCompleted bool) (<-chan struct{}, error) { ch := make(chan struct{}) var j = func(ctx context.Context) { if ctx.Err() != nil { @@ -240,7 +260,13 @@ func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-ch s.compactBarrier(context.TODO(), ch) return } - s.hashes.Store(hash) + // Only store the hash value if the previous hash is completed, i.e. this compaction + // hashes every revision from last compaction. For more details, see #15919. 
+ if prevCompactionCompleted { + s.hashes.Store(hash) + } else { + s.lg.Info("previous compaction was interrupted, skip storing compaction hash value") + } close(ch) } @@ -250,17 +276,19 @@ func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-ch } func (s *store) compactLockfree(rev int64) (<-chan struct{}, error) { + prevCompactionCompleted := s.checkPrevCompactionCompleted() ch, prevCompactRev, err := s.updateCompactRev(rev) if err != nil { return ch, err } - return s.compact(traceutil.TODO(), rev, prevCompactRev) + return s.compact(traceutil.TODO(), rev, prevCompactRev, prevCompactionCompleted) } func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) { s.mu.Lock() + prevCompactionCompleted := s.checkPrevCompactionCompleted() ch, prevCompactRev, err := s.updateCompactRev(rev) trace.Step("check and update compact revision") if err != nil { @@ -269,7 +297,7 @@ func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, err } s.mu.Unlock() - return s.compact(trace, rev, prevCompactRev) + return s.compact(trace, rev, prevCompactRev, prevCompactionCompleted) } func (s *store) Commit() { diff --git a/server/mvcc/kvstore_test.go b/server/mvcc/kvstore_test.go index 548e7ac2b90..af5293dc05f 100644 --- a/server/mvcc/kvstore_test.go +++ b/server/mvcc/kvstore_test.go @@ -333,6 +333,8 @@ func TestStoreCompact(t *testing.T) { fi.indexCompactRespc <- map[revision]struct{}{{1, 0}: {}} key1 := newTestKeyBytes(revision{1, 0}, false) key2 := newTestKeyBytes(revision{2, 0}, false) + b.tx.rangeRespc <- rangeResp{[][]byte{}, [][]byte{}} + b.tx.rangeRespc <- rangeResp{[][]byte{}, [][]byte{}} b.tx.rangeRespc <- rangeResp{[][]byte{key1, key2}, [][]byte{[]byte("alice"), []byte("bob")}} s.Compact(traceutil.TODO(), 3) @@ -344,6 +346,8 @@ func TestStoreCompact(t *testing.T) { end := make([]byte, 8) binary.BigEndian.PutUint64(end, uint64(4)) wact := []testutil.Action{ + {Name: "range", Params: []interface{}{buckets.Meta, 
scheduledCompactKeyName, []uint8(nil), int64(0)}}, + {Name: "range", Params: []interface{}{buckets.Meta, finishedCompactKeyName, []uint8(nil), int64(0)}}, {Name: "put", Params: []interface{}{buckets.Meta, scheduledCompactKeyName, newTestRevBytes(revision{3, 0})}}, {Name: "range", Params: []interface{}{buckets.Key, make([]byte, 17), end, int64(10000)}}, {Name: "delete", Params: []interface{}{buckets.Key, key2}}, diff --git a/tests/e2e/cluster_test.go b/tests/e2e/cluster_test.go index e63e9a5d88d..bc39a5ac48a 100644 --- a/tests/e2e/cluster_test.go +++ b/tests/e2e/cluster_test.go @@ -175,6 +175,7 @@ type etcdProcessClusterConfig struct { CompactHashCheckEnabled bool CompactHashCheckTime time.Duration WatchProcessNotifyInterval time.Duration + CompactionBatchLimit int } // newEtcdProcessCluster launches a new cluster from etcd processes, returning @@ -339,6 +340,9 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs(tb testing.TB) []* if cfg.WatchProcessNotifyInterval != 0 { args = append(args, "--experimental-watch-progress-notify-interval", cfg.WatchProcessNotifyInterval.String()) } + if cfg.CompactionBatchLimit != 0 { + args = append(args, "--experimental-compaction-batch-limit", fmt.Sprintf("%d", cfg.CompactionBatchLimit)) + } etcdCfgs[i] = &etcdServerProcessConfig{ lg: lg, diff --git a/tests/e2e/corrupt_test.go b/tests/e2e/corrupt_test.go index 2914b6a0f19..5eb9cafb66e 100644 --- a/tests/e2e/corrupt_test.go +++ b/tests/e2e/corrupt_test.go @@ -21,6 +21,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/server/v3/datadir" @@ -180,3 +181,61 @@ func TestCompactHashCheckDetectCorruption(t *testing.T) { assert.NoError(t, err, "error on alarm list") assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: 0}}, alarmResponse.Alarms) } + +func TestCompactHashCheckDetectCorruptionInterrupt(t 
*testing.T) { + checkTime := time.Second + BeforeTest(t) + + slowCompactionNodeIndex := 1 + + // Start a new cluster, with compact hash check enabled. + t.Log("creating a new cluster with 3 nodes...") + + epc, err := newEtcdProcessCluster(t, &etcdProcessClusterConfig{ + clusterSize: 3, + keepDataDir: true, + CompactHashCheckEnabled: true, + CompactHashCheckTime: checkTime, + logLevel: "info", + CompactionBatchLimit: 1, + }) + require.NoError(t, err) + t.Cleanup(func() { + if errC := epc.Close(); errC != nil { + t.Fatalf("error closing etcd processes (%v)", errC) + } + }) + + // Put 200 identical keys to the cluster, so that the compaction will drop some stale values. + // We need a relatively big number here to make the compaction takes a non-trivial time, and we can interrupt it. + t.Log("putting 200 values to the identical key...") + cc := NewEtcdctl(epc.EndpointsV3(), clientNonTLS, false, false) + + for i := 0; i < 200; i++ { + err = cc.Put("key", fmt.Sprint(i)) + require.NoError(t, err, "error on put") + } + + t.Log("compaction started...") + _, err = cc.Compact(200) + + t.Logf("restart proc %d to interrupt its compaction...", slowCompactionNodeIndex) + err = epc.procs[slowCompactionNodeIndex].Restart() + require.NoError(t, err) + + // Wait until the node finished compaction. + _, err = epc.procs[slowCompactionNodeIndex].Logs().Expect("finished scheduled compaction") + require.NoError(t, err, "can't get log indicating finished scheduled compaction") + + // Wait for compaction hash check + time.Sleep(checkTime * 5) + + alarmResponse, err := cc.AlarmList() + require.NoError(t, err, "error on alarm list") + for _, alarm := range alarmResponse.Alarms { + if alarm.Alarm == etcdserverpb.AlarmType_CORRUPT { + t.Fatal("there should be no corruption after resuming the compaction, but corruption detected") + } + } + t.Log("no corruption detected.") +}