diff --git a/server/mvcc/kvstore.go b/server/mvcc/kvstore.go index fc48380b5fe..9b37e710450 100644 --- a/server/mvcc/kvstore.go +++ b/server/mvcc/kvstore.go @@ -227,7 +227,27 @@ func (s *store) updateCompactRev(rev int64) (<-chan struct{}, int64, error) { return nil, compactMainRev, nil } -func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-chan struct{}, error) { +// checkPrevCompactionCompleted checks whether the previous scheduled compaction is completed. +func (s *store) checkPrevCompactionCompleted() bool { + tx := s.b.ReadTx() + tx.Lock() + defer tx.Unlock() + _, scheduledCompactBytes := tx.UnsafeRange(buckets.Meta, scheduledCompactKeyName, nil, 0) + scheduledCompact := int64(0) + if len(scheduledCompactBytes) != 0 { + scheduledCompact = bytesToRev(scheduledCompactBytes[0]).main + } + + _, finishedCompactBytes := tx.UnsafeRange(buckets.Meta, finishedCompactKeyName, nil, 0) + finishedCompact := int64(0) + if len(finishedCompactBytes) != 0 { + finishedCompact = bytesToRev(finishedCompactBytes[0]).main + + } + return scheduledCompact == finishedCompact +} + +func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64, prevCompactionCompleted bool) (<-chan struct{}, error) { ch := make(chan struct{}) var j = func(ctx context.Context) { if ctx.Err() != nil { @@ -240,7 +260,13 @@ func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-ch s.compactBarrier(context.TODO(), ch) return } - s.hashes.Store(hash) + // Only store the hash value if the previous hash is completed, i.e. this compaction + // hashes every revision from last compaction. For more details, see #15919. + if prevCompactionCompleted { + s.hashes.Store(hash) + } else { + s.lg.Info("previous compaction was interrupted, skip storing compaction hash value") + } close(ch) } @@ -250,17 +276,19 @@ func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-ch } func (s *store) compactLockfree(rev int64) (<-chan struct{}, error) { + prevCompactionCompleted := s.checkPrevCompactionCompleted() ch, prevCompactRev, err := s.updateCompactRev(rev) if err != nil { return ch, err } - return s.compact(traceutil.TODO(), rev, prevCompactRev) + return s.compact(traceutil.TODO(), rev, prevCompactRev, prevCompactionCompleted) } func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) { s.mu.Lock() + prevCompactionCompleted := s.checkPrevCompactionCompleted() ch, prevCompactRev, err := s.updateCompactRev(rev) trace.Step("check and update compact revision") if err != nil { @@ -269,7 +297,7 @@ func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, err } s.mu.Unlock() - return s.compact(trace, rev, prevCompactRev) + return s.compact(trace, rev, prevCompactRev, prevCompactionCompleted) } func (s *store) Commit() { diff --git a/server/mvcc/kvstore_test.go b/server/mvcc/kvstore_test.go index 548e7ac2b90..af5293dc05f 100644 --- a/server/mvcc/kvstore_test.go +++ b/server/mvcc/kvstore_test.go @@ -333,6 +333,8 @@ func TestStoreCompact(t *testing.T) { fi.indexCompactRespc <- map[revision]struct{}{{1, 0}: {}} key1 := newTestKeyBytes(revision{1, 0}, false) key2 := newTestKeyBytes(revision{2, 0}, false) + b.tx.rangeRespc <- rangeResp{[][]byte{}, [][]byte{}} + b.tx.rangeRespc <- rangeResp{[][]byte{}, [][]byte{}} b.tx.rangeRespc <- rangeResp{[][]byte{key1, key2}, [][]byte{[]byte("alice"), []byte("bob")}} s.Compact(traceutil.TODO(), 3) @@ -344,6 +346,8 @@ func TestStoreCompact(t *testing.T) { end := make([]byte, 8) binary.BigEndian.PutUint64(end, uint64(4)) wact := []testutil.Action{ + {Name: "range", Params: []interface{}{buckets.Meta, scheduledCompactKeyName, []uint8(nil), int64(0)}}, + {Name: "range", Params: []interface{}{buckets.Meta, finishedCompactKeyName, []uint8(nil), int64(0)}}, {Name: "put", Params: []interface{}{buckets.Meta, scheduledCompactKeyName, newTestRevBytes(revision{3, 0})}}, {Name: "range", Params: []interface{}{buckets.Key, make([]byte, 17), end, int64(10000)}}, {Name: "delete", Params: []interface{}{buckets.Key, key2}}, diff --git a/tests/e2e/cluster_test.go b/tests/e2e/cluster_test.go index e63e9a5d88d..bc39a5ac48a 100644 --- a/tests/e2e/cluster_test.go +++ b/tests/e2e/cluster_test.go @@ -175,6 +175,7 @@ type etcdProcessClusterConfig struct { CompactHashCheckEnabled bool CompactHashCheckTime time.Duration WatchProcessNotifyInterval time.Duration + CompactionBatchLimit int } // newEtcdProcessCluster launches a new cluster from etcd processes, returning @@ -339,6 +340,9 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs(tb testing.TB) []* if cfg.WatchProcessNotifyInterval != 0 { args = append(args, "--experimental-watch-progress-notify-interval", cfg.WatchProcessNotifyInterval.String()) } + if cfg.CompactionBatchLimit != 0 { + args = append(args, "--experimental-compaction-batch-limit", fmt.Sprintf("%d", cfg.CompactionBatchLimit)) + } etcdCfgs[i] = &etcdServerProcessConfig{ lg: lg, diff --git a/tests/e2e/corrupt_test.go b/tests/e2e/corrupt_test.go index 2914b6a0f19..5eb9cafb66e 100644 --- a/tests/e2e/corrupt_test.go +++ b/tests/e2e/corrupt_test.go @@ -21,6 +21,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/server/v3/datadir" @@ -180,3 +181,61 @@ func TestCompactHashCheckDetectCorruption(t *testing.T) { assert.NoError(t, err, "error on alarm list") assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: 0}}, alarmResponse.Alarms) } + +func TestCompactHashCheckDetectCorruptionInterrupt(t *testing.T) { + checkTime := time.Second + BeforeTest(t) + + slowCompactionNodeIndex := 1 + + // Start a new cluster, with compact hash check enabled. + t.Log("creating a new cluster with 3 nodes...") + + epc, err := newEtcdProcessCluster(t, &etcdProcessClusterConfig{ + clusterSize: 3, + keepDataDir: true, + CompactHashCheckEnabled: true, + CompactHashCheckTime: checkTime, + logLevel: "info", + CompactionBatchLimit: 1, + }) + require.NoError(t, err) + t.Cleanup(func() { + if errC := epc.Close(); errC != nil { + t.Fatalf("error closing etcd processes (%v)", errC) + } + }) + + // Put 200 identical keys to the cluster, so that the compaction will drop some stale values. + // We need a relatively big number here to make the compaction takes a non-trivial time, and we can interrupt it. + t.Log("putting 200 values to the identical key...") + cc := NewEtcdctl(epc.EndpointsV3(), clientNonTLS, false, false) + + for i := 0; i < 200; i++ { + err = cc.Put("key", fmt.Sprint(i)) + require.NoError(t, err, "error on put") + } + + t.Log("compaction started...") + _, err = cc.Compact(200) + + t.Logf("restart proc %d to interrupt its compaction...", slowCompactionNodeIndex) + err = epc.procs[slowCompactionNodeIndex].Restart() + require.NoError(t, err) + + // Wait until the node finished compaction. + _, err = epc.procs[slowCompactionNodeIndex].Logs().Expect("finished scheduled compaction") + require.NoError(t, err, "can't get log indicating finished scheduled compaction") + + // Wait for compaction hash check + time.Sleep(checkTime * 5) + + alarmResponse, err := cc.AlarmList() + require.NoError(t, err, "error on alarm list") + for _, alarm := range alarmResponse.Alarms { + if alarm.Alarm == etcdserverpb.AlarmType_CORRUPT { + t.Fatal("there should be no corruption after resuming the compaction, but corruption detected") + } + } + t.Log("no corruption detected.") +}