diff --git a/server/mvcc/hash.go b/server/mvcc/hash.go index 11232bf5677..8af5d90a46f 100644 --- a/server/mvcc/hash.go +++ b/server/mvcc/hash.go @@ -62,6 +62,9 @@ func (h *kvHasher) WriteKeyValue(k, v []byte) { if !upper.GreaterThan(kr) { return } + + isTombstoneRev := isTombstone(k) + lower := revision{main: h.compactRevision + 1} // skip revisions that are scheduled for deletion // due to compacting; don't skip if there isn't one. @@ -70,6 +73,17 @@ func (h *kvHasher) WriteKeyValue(k, v []byte) { return } } + + // When performing compaction, if the compacted revision is a + // tombstone, older versions (<= 3.5.15 or <= 3.4.33) will delete + // the tombstone. But newer versions (> 3.5.15 or > 3.4.33) won't + // delete it. So we should skip the tombstone in such cases when + // computing the hash to ensure that both older and newer versions + // can always generate the same hash values. + if kr.main == h.compactRevision && isTombstoneRev { + return + } + h.hash.Write(k) h.hash.Write(v) } diff --git a/server/mvcc/index_test.go b/server/mvcc/index_test.go index 87c31dd905f..c2749276cbd 100644 --- a/server/mvcc/index_test.go +++ b/server/mvcc/index_test.go @@ -18,8 +18,9 @@ import ( "reflect" "testing" - "github.com/google/btree" + "github.com/stretchr/testify/require" "go.uber.org/zap" + "go.uber.org/zap/zaptest" ) func TestIndexGet(t *testing.T) { @@ -196,98 +197,445 @@ func TestIndexRangeSince(t *testing.T) { func TestIndexCompactAndKeep(t *testing.T) { maxRev := int64(20) - tests := []struct { - key []byte - remove bool - rev revision - created revision - ver int64 + + // key: "foo" + // modified: 10 + // generations: + // {{10, 0}} + // {{1, 0}, {5, 0}, {9, 0}(t)} + // + // key: "foo1" + // modified: 10, 1 + // generations: + // {{10, 1}} + // {{2, 0}, {6, 0}, {7, 0}(t)} + // + // key: "foo2" + // modified: 8 + // generations: + // {empty} + // {{3, 0}, {4, 0}, {8, 0}(t)} + // + buildTreeIndex := func() index { + ti := newTreeIndex(zaptest.NewLogger(t)) + + ti.Put([]byte("foo"), revision{main: 1}) + ti.Put([]byte("foo1"), revision{main: 2}) + ti.Put([]byte("foo2"), revision{main: 3}) + ti.Put([]byte("foo2"), revision{main: 4}) + ti.Put([]byte("foo"), revision{main: 5}) + ti.Put([]byte("foo1"), revision{main: 6}) + require.NoError(t, ti.Tombstone([]byte("foo1"), revision{main: 7})) + require.NoError(t, ti.Tombstone([]byte("foo2"), revision{main: 8})) + require.NoError(t, ti.Tombstone([]byte("foo"), revision{main: 9})) + ti.Put([]byte("foo"), revision{main: 10}) + ti.Put([]byte("foo1"), revision{main: 10, sub: 1}) + return ti + } + + afterCompacts := []struct { + atRev int + keyIndexes []keyIndex + keep map[revision]struct{} + compacted map[revision]struct{} }{ - {[]byte("foo"), false, revision{main: 1}, revision{main: 1}, 1}, - {[]byte("foo1"), false, revision{main: 2}, revision{main: 2}, 1}, - {[]byte("foo2"), false, revision{main: 3}, revision{main: 3}, 1}, - {[]byte("foo2"), false, revision{main: 4}, revision{main: 3}, 2}, - {[]byte("foo"), false, revision{main: 5}, revision{main: 1}, 2}, - {[]byte("foo1"), false, revision{main: 6}, revision{main: 2}, 2}, - {[]byte("foo1"), true, revision{main: 7}, revision{}, 0}, - {[]byte("foo2"), true, revision{main: 8}, revision{}, 0}, - {[]byte("foo"), true, revision{main: 9}, revision{}, 0}, - {[]byte("foo"), false, revision{10, 0}, revision{10, 0}, 1}, - {[]byte("foo1"), false, revision{10, 1}, revision{10, 1}, 1}, + { + atRev: 1, + keyIndexes: []keyIndex{ + { + key: []byte("foo"), + modified: revision{main: 10}, + generations: []generation{ + 
{ver: 3, created: revision{main: 1}, revs: []revision{revision{main: 1}, revision{main: 5}, revision{main: 9}}}, + {ver: 1, created: revision{main: 10}, revs: []revision{revision{main: 10}}}, + }, + }, + { + key: []byte("foo1"), + modified: revision{main: 10, sub: 1}, + generations: []generation{ + {ver: 3, created: revision{main: 2}, revs: []revision{revision{main: 2}, revision{main: 6}, revision{main: 7}}}, + {ver: 1, created: revision{main: 10, sub: 1}, revs: []revision{revision{main: 10, sub: 1}}}, + }, + }, + { + key: []byte("foo2"), + modified: revision{main: 8}, + generations: []generation{ + {ver: 3, created: revision{main: 3}, revs: []revision{revision{main: 3}, revision{main: 4}, revision{main: 8}}}, + {}, + }, + }, + }, + keep: map[revision]struct{}{ + revision{main: 1}: {}, + }, + compacted: map[revision]struct{}{ + revision{main: 1}: {}, + }, + }, + { + atRev: 2, + keyIndexes: []keyIndex{ + { + key: []byte("foo"), + modified: revision{main: 10}, + generations: []generation{ + {ver: 3, created: revision{main: 1}, revs: []revision{revision{main: 1}, revision{main: 5}, revision{main: 9}}}, + {ver: 1, created: revision{main: 10}, revs: []revision{revision{main: 10}}}, + }, + }, + { + key: []byte("foo1"), + modified: revision{main: 10, sub: 1}, + generations: []generation{ + {ver: 3, created: revision{main: 2}, revs: []revision{revision{main: 2}, revision{main: 6}, revision{main: 7}}}, + {ver: 1, created: revision{main: 10, sub: 1}, revs: []revision{revision{main: 10, sub: 1}}}, + }, + }, + { + key: []byte("foo2"), + modified: revision{main: 8}, + generations: []generation{ + {ver: 3, created: revision{main: 3}, revs: []revision{revision{main: 3}, revision{main: 4}, revision{main: 8}}}, + {}, + }, + }, + }, + keep: map[revision]struct{}{ + revision{main: 1}: {}, + revision{main: 2}: {}, + }, + compacted: map[revision]struct{}{ + revision{main: 1}: {}, + revision{main: 2}: {}, + }, + }, + { + atRev: 3, + keyIndexes: []keyIndex{ + { + key: []byte("foo"), + modified: revision{main: 10}, + generations: []generation{ + {ver: 3, created: revision{main: 1}, revs: []revision{revision{main: 1}, revision{main: 5}, revision{main: 9}}}, + {ver: 1, created: revision{main: 10}, revs: []revision{revision{main: 10}}}, + }, + }, + { + key: []byte("foo1"), + modified: revision{main: 10, sub: 1}, + generations: []generation{ + {ver: 3, created: revision{main: 2}, revs: []revision{revision{main: 2}, revision{main: 6}, revision{main: 7}}}, + {ver: 1, created: revision{main: 10, sub: 1}, revs: []revision{revision{main: 10, sub: 1}}}, + }, + }, + { + key: []byte("foo2"), + modified: revision{main: 8}, + generations: []generation{ + {ver: 3, created: revision{main: 3}, revs: []revision{revision{main: 3}, revision{main: 4}, revision{main: 8}}}, + {}, + }, + }, + }, + keep: map[revision]struct{}{ + revision{main: 1}: {}, + revision{main: 2}: {}, + revision{main: 3}: {}, + }, + compacted: map[revision]struct{}{ + revision{main: 1}: {}, + revision{main: 2}: {}, + revision{main: 3}: {}, + }, + }, + { + atRev: 4, + keyIndexes: []keyIndex{ + { + key: []byte("foo"), + modified: revision{main: 10}, + generations: []generation{ + {ver: 3, created: revision{main: 1}, revs: []revision{revision{main: 1}, revision{main: 5}, revision{main: 9}}}, + {ver: 1, created: revision{main: 10}, revs: []revision{revision{main: 10}}}, + }, + }, + { + key: []byte("foo1"), + modified: revision{main: 10, sub: 1}, + generations: []generation{ + {ver: 3, created: revision{main: 2}, revs: []revision{revision{main: 2}, revision{main: 6}, 
revision{main: 7}}}, + {ver: 1, created: revision{main: 10, sub: 1}, revs: []revision{revision{main: 10, sub: 1}}}, + }, + }, + { + key: []byte("foo2"), + modified: revision{main: 8}, + generations: []generation{ + {ver: 3, created: revision{main: 3}, revs: []revision{revision{main: 4}, revision{main: 8}}}, + {}, + }, + }, + }, + keep: map[revision]struct{}{ + revision{main: 1}: {}, + revision{main: 2}: {}, + revision{main: 4}: {}, + }, + compacted: map[revision]struct{}{ + revision{main: 1}: {}, + revision{main: 2}: {}, + revision{main: 4}: {}, + }, + }, + { + atRev: 5, + keyIndexes: []keyIndex{ + { + key: []byte("foo"), + modified: revision{main: 10}, + generations: []generation{ + {ver: 3, created: revision{main: 1}, revs: []revision{revision{main: 5}, revision{main: 9}}}, + {ver: 1, created: revision{main: 10}, revs: []revision{revision{main: 10}}}, + }, + }, + { + key: []byte("foo1"), + modified: revision{main: 10, sub: 1}, + generations: []generation{ + {ver: 3, created: revision{main: 2}, revs: []revision{revision{main: 2}, revision{main: 6}, revision{main: 7}}}, + {ver: 1, created: revision{main: 10, sub: 1}, revs: []revision{revision{main: 10, sub: 1}}}, + }, + }, + { + key: []byte("foo2"), + modified: revision{main: 8}, + generations: []generation{ + {ver: 3, created: revision{main: 3}, revs: []revision{revision{main: 4}, revision{main: 8}}}, + {}, + }, + }, + }, + keep: map[revision]struct{}{ + revision{main: 2}: {}, + revision{main: 4}: {}, + revision{main: 5}: {}, + }, + compacted: map[revision]struct{}{ + revision{main: 2}: {}, + revision{main: 4}: {}, + revision{main: 5}: {}, + }, + }, + { + atRev: 6, + keyIndexes: []keyIndex{ + { + key: []byte("foo"), + modified: revision{main: 10}, + generations: []generation{ + {ver: 3, created: revision{main: 1}, revs: []revision{revision{main: 5}, revision{main: 9}}}, + {ver: 1, created: revision{main: 10}, revs: []revision{revision{main: 10}}}, + }, + }, + { + key: []byte("foo1"), + modified: revision{main: 10, sub: 1}, + generations: []generation{ + {ver: 3, created: revision{main: 2}, revs: []revision{revision{main: 6}, revision{main: 7}}}, + {ver: 1, created: revision{main: 10, sub: 1}, revs: []revision{revision{main: 10, sub: 1}}}, + }, + }, + { + key: []byte("foo2"), + modified: revision{main: 8}, + generations: []generation{ + {ver: 3, created: revision{main: 3}, revs: []revision{revision{main: 4}, revision{main: 8}}}, + {}, + }, + }, + }, + keep: map[revision]struct{}{ + revision{main: 6}: {}, + revision{main: 4}: {}, + revision{main: 5}: {}, + }, + compacted: map[revision]struct{}{ + revision{main: 6}: {}, + revision{main: 4}: {}, + revision{main: 5}: {}, + }, + }, + { + atRev: 7, + keyIndexes: []keyIndex{ + { + key: []byte("foo"), + modified: revision{main: 10}, + generations: []generation{ + {ver: 3, created: revision{main: 1}, revs: []revision{revision{main: 5}, revision{main: 9}}}, + {ver: 1, created: revision{main: 10}, revs: []revision{revision{main: 10}}}, + }, + }, + { + key: []byte("foo1"), + modified: revision{main: 10, sub: 1}, + generations: []generation{ + {ver: 3, created: revision{main: 2}, revs: []revision{revision{main: 7}}}, + {ver: 1, created: revision{main: 10, sub: 1}, revs: []revision{revision{main: 10, sub: 1}}}, + }, + }, + { + key: []byte("foo2"), + modified: revision{main: 8}, + generations: []generation{ + {ver: 3, created: revision{main: 3}, revs: []revision{revision{main: 4}, revision{main: 8}}}, + {}, + }, + }, + }, + keep: map[revision]struct{}{ + revision{main: 4}: {}, + revision{main: 5}: {}, + 
}, + compacted: map[revision]struct{}{ + revision{main: 7}: {}, + revision{main: 4}: {}, + revision{main: 5}: {}, + }, + }, + { + atRev: 8, + keyIndexes: []keyIndex{ + { + key: []byte("foo"), + modified: revision{main: 10}, + generations: []generation{ + {ver: 3, created: revision{main: 1}, revs: []revision{revision{main: 5}, revision{main: 9}}}, + {ver: 1, created: revision{main: 10}, revs: []revision{revision{main: 10}}}, + }, + }, + { + key: []byte("foo1"), + modified: revision{main: 10, sub: 1}, + generations: []generation{ + {ver: 1, created: revision{main: 10, sub: 1}, revs: []revision{revision{main: 10, sub: 1}}}, + }, + }, + { + key: []byte("foo2"), + modified: revision{main: 8}, + generations: []generation{ + {ver: 3, created: revision{main: 3}, revs: []revision{revision{main: 8}}}, + {}, + }, + }, + }, + keep: map[revision]struct{}{ + revision{main: 5}: {}, + }, + compacted: map[revision]struct{}{ + revision{main: 8}: {}, + revision{main: 5}: {}, + }, + }, + { + atRev: 9, + keyIndexes: []keyIndex{ + { + key: []byte("foo"), + modified: revision{main: 10}, + generations: []generation{ + {ver: 3, created: revision{main: 1}, revs: []revision{revision{main: 9}}}, + {ver: 1, created: revision{main: 10}, revs: []revision{revision{main: 10}}}, + }, + }, + { + key: []byte("foo1"), + modified: revision{main: 10, sub: 1}, + generations: []generation{ + {ver: 1, created: revision{main: 10, sub: 1}, revs: []revision{revision{main: 10, sub: 1}}}, + }, + }, + }, + keep: map[revision]struct{}{}, + compacted: map[revision]struct{}{ + revision{main: 9}: {}, + }, + }, + { + atRev: 10, + keyIndexes: []keyIndex{ + { + key: []byte("foo"), + modified: revision{main: 10}, + generations: []generation{ + {ver: 1, created: revision{main: 10}, revs: []revision{revision{main: 10}}}, + }, + }, + { + key: []byte("foo1"), + modified: revision{main: 10, sub: 1}, + generations: []generation{ + {ver: 1, created: revision{main: 10, sub: 1}, revs: []revision{revision{main: 10, sub: 1}}}, + }, + }, + }, + keep: map[revision]struct{}{ + revision{main: 10}: {}, + revision{main: 10, sub: 1}: {}, + }, + compacted: map[revision]struct{}{ + revision{main: 10}: {}, + revision{main: 10, sub: 1}: {}, + }, + }, } + ti := buildTreeIndex() // Continuous Compact and Keep - ti := newTreeIndex(zap.NewExample()) - for _, tt := range tests { - if tt.remove { - ti.Tombstone(tt.key, tt.rev) - } else { - ti.Put(tt.key, tt.rev) - } - } for i := int64(1); i < maxRev; i++ { + j := i - 1 + if i >= int64(len(afterCompacts)) { + j = int64(len(afterCompacts)) - 1 + } + am := ti.Compact(i) + require.Equal(t, afterCompacts[j].compacted, am, "#%d: compact(%d) != expected", i, i) + keep := ti.Keep(i) - if !(reflect.DeepEqual(am, keep)) { - t.Errorf("#%d: compact keep %v != Keep keep %v", i, am, keep) - } - wti := &treeIndex{tree: btree.New(32)} - for _, tt := range tests { - if _, ok := am[tt.rev]; ok || tt.rev.GreaterThan(revision{main: i}) { - if tt.remove { - wti.Tombstone(tt.key, tt.rev) - } else { - restore(wti, tt.key, tt.created, tt.rev, tt.ver) - } - } - } - if !ti.Equal(wti) { - t.Errorf("#%d: not equal ti", i) + require.Equal(t, afterCompacts[j].keep, keep, "#%d: keep(%d) != expected", i, i) + + nti := newTreeIndex(zaptest.NewLogger(t)).(*treeIndex) + for k := range afterCompacts[j].keyIndexes { + ki := afterCompacts[j].keyIndexes[k] + nti.tree.ReplaceOrInsert(&ki) } + require.True(t, ti.Equal(nti), "#%d: not equal ti", i) } // Once Compact and Keep for i := int64(1); i < maxRev; i++ { - ti := newTreeIndex(zap.NewExample()) - for _, tt 
:= range tests { - if tt.remove { - ti.Tombstone(tt.key, tt.rev) - } else { - ti.Put(tt.key, tt.rev) - } + ti := buildTreeIndex() + + j := i - 1 + if i >= int64(len(afterCompacts)) { + j = int64(len(afterCompacts)) - 1 } + + am := ti.Compact(i) + require.Equal(t, afterCompacts[j].compacted, am, "#%d: compact(%d) != expected", i, i) + keep := ti.Keep(i) - if !(reflect.DeepEqual(am, keep)) { - t.Errorf("#%d: compact keep %v != Keep keep %v", i, am, keep) - } - wti := &treeIndex{tree: btree.New(32)} - for _, tt := range tests { - if _, ok := am[tt.rev]; ok || tt.rev.GreaterThan(revision{main: i}) { - if tt.remove { - wti.Tombstone(tt.key, tt.rev) - } else { - restore(wti, tt.key, tt.created, tt.rev, tt.ver) - } - } - } - if !ti.Equal(wti) { - t.Errorf("#%d: not equal ti", i) - } - } -} + require.Equal(t, afterCompacts[j].keep, keep, "#%d: keep(%d) != expected", i, i) -func restore(ti *treeIndex, key []byte, created, modified revision, ver int64) { - keyi := &keyIndex{key: key} + nti := newTreeIndex(zaptest.NewLogger(t)).(*treeIndex) + for k := range afterCompacts[j].keyIndexes { + ki := afterCompacts[j].keyIndexes[k] + nti.tree.ReplaceOrInsert(&ki) + } - ti.Lock() - defer ti.Unlock() - item := ti.tree.Get(keyi) - if item == nil { - keyi.restore(ti.lg, created, modified, ver) - ti.tree.ReplaceOrInsert(keyi) - return + require.True(t, ti.Equal(nti), "#%d: not equal ti", i) } - okeyi := item.(*keyIndex) - okeyi.put(ti.lg, modified.main, modified.sub) } diff --git a/server/mvcc/key_index.go b/server/mvcc/key_index.go index daec9782586..d38b0933d92 100644 --- a/server/mvcc/key_index.go +++ b/server/mvcc/key_index.go @@ -66,7 +66,8 @@ var ( // compact(5): // generations: // -// {empty} -> key SHOULD be removed. +// {empty} +// {5.0(t)} // // compact(6): // generations: @@ -203,8 +204,7 @@ func (ki *keyIndex) since(lg *zap.Logger, rev int64) []revision { } // compact compacts a keyIndex by removing the versions with smaller or equal -// revision than the given atRev except the largest one (If the largest one is -// a tombstone, it will not be kept). +// revision than the given atRev except the largest one. // If a generation becomes empty during compaction, it will be removed. func (ki *keyIndex) compact(lg *zap.Logger, atRev int64, available map[revision]struct{}) { if ki.isEmpty() { @@ -222,11 +222,6 @@ func (ki *keyIndex) compact(lg *zap.Logger, atRev int64, available map[revision] if revIndex != -1 { g.revs = g.revs[revIndex:] } - // remove any tombstone - if len(g.revs) == 1 && genIdx != len(ki.generations)-1 { - delete(available, g.revs[0]) - genIdx++ - } } // remove the previous generations. @@ -242,7 +237,12 @@ func (ki *keyIndex) keep(atRev int64, available map[revision]struct{}) { genIdx, revIndex := ki.doCompact(atRev, available) g := &ki.generations[genIdx] if !g.isEmpty() { - // remove any tombstone + // If the given `atRev` is a tombstone, we need to skip it. + // + // Note that this is different from the `compact` function, which + // keeps the tombstone in such a case. We need to stay consistent with + // existing versions, ensuring they always generate the same hash + // values.
if revIndex == len(g.revs)-1 && genIdx != len(ki.generations)-1 { delete(available, g.revs[revIndex]) } @@ -263,7 +263,7 @@ func (ki *keyIndex) doCompact(atRev int64, available map[revision]struct{}) (gen genIdx, g := 0, &ki.generations[0] // find first generation includes atRev or created after atRev for genIdx < len(ki.generations)-1 { - if tomb := g.revs[len(g.revs)-1].main; tomb > atRev { + if tomb := g.revs[len(g.revs)-1].main; tomb >= atRev { break } genIdx++ diff --git a/server/mvcc/key_index_test.go b/server/mvcc/key_index_test.go index 6404752ed36..814e252fd4a 100644 --- a/server/mvcc/key_index_test.go +++ b/server/mvcc/key_index_test.go @@ -18,6 +18,7 @@ import ( "reflect" "testing" + "github.com/stretchr/testify/assert" "go.uber.org/zap" ) @@ -298,12 +299,15 @@ func TestKeyIndexCompactAndKeep(t *testing.T) { key: []byte("foo"), modified: revision{16, 0}, generations: []generation{ + {created: revision{main: 2}, ver: 3, revs: []revision{revision{main: 6}}}, {created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}}, {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 15, sub: 1}, {main: 16}}}, {}, }, }, - map[revision]struct{}{}, + map[revision]struct{}{ + {main: 6}: {}, + }, }, { 7, @@ -384,11 +388,14 @@ func TestKeyIndexCompactAndKeep(t *testing.T) { key: []byte("foo"), modified: revision{16, 0}, generations: []generation{ + {created: revision{main: 8}, ver: 3, revs: []revision{revision{main: 12}}}, {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 15, sub: 1}, {main: 16}}}, {}, }, }, - map[revision]struct{}{}, + map[revision]struct{}{ + {main: 12}: {}, + }, }, { 13, @@ -434,7 +441,21 @@ func TestKeyIndexCompactAndKeep(t *testing.T) { 16, &keyIndex{ key: []byte("foo"), - modified: revision{16, 0}, + modified: revision{main: 16}, + generations: []generation{ + {created: revision{main: 14}, ver: 3, revs: []revision{revision{main: 16}}}, + {}, + }, + }, + map[revision]struct{}{ + revision{main: 16}: {}, + }, + }, + { + 17, + &keyIndex{ + key: []byte("foo"), + modified: revision{main: 16}, generations: []generation{ {}, }, @@ -443,18 +464,36 @@ func TestKeyIndexCompactAndKeep(t *testing.T) { }, } + isTombstoneRevFn := func(ki *keyIndex, rev int64) bool { + for i := 0; i < len(ki.generations)-1; i++ { + g := ki.generations[i] + + if l := len(g.revs); l > 0 && g.revs[l-1].main == rev { + return true + } + } + return false + } + // Continuous Compaction and finding Keep ki := newTestKeyIndex() for i, tt := range tests { + isTombstone := isTombstoneRevFn(ki, tt.compact) + am := make(map[revision]struct{}) kiclone := cloneKeyIndex(ki) ki.keep(tt.compact, am) if !reflect.DeepEqual(ki, kiclone) { t.Errorf("#%d: ki = %+v, want %+v", i, ki, kiclone) } - if !reflect.DeepEqual(am, tt.wam) { - t.Errorf("#%d: am = %+v, want %+v", i, am, tt.wam) + + if isTombstone { + assert.Equal(t, 0, len(am), "#%d: ki = %d, keep result wants empty because tombstone", i, ki) + } else { + assert.Equal(t, tt.wam, am, + "#%d: ki = %d, compact keep should be equal to keep keep if it's not tombstone", i, ki) } + am = make(map[revision]struct{}) ki.compact(zap.NewExample(), tt.compact, am) if !reflect.DeepEqual(ki, tt.wki) { @@ -468,7 +507,7 @@ func TestKeyIndexCompactAndKeep(t *testing.T) { // Jump Compaction and finding Keep ki = newTestKeyIndex() for i, tt := range tests { - if (i%2 == 0 && i < 6) || (i%2 == 1 && i > 6) { + if !isTombstoneRevFn(ki, tt.compact) { am := make(map[revision]struct{}) kiclone := cloneKeyIndex(ki) ki.keep(tt.compact, 
am) @@ -498,9 +537,14 @@ func TestKeyIndexCompactAndKeep(t *testing.T) { if !reflect.DeepEqual(ki, kiClone) { t.Errorf("#%d: ki = %+v, want %+v", i, ki, kiClone) } - if !reflect.DeepEqual(am, tt.wam) { - t.Errorf("#%d: am = %+v, want %+v", i, am, tt.wam) + + if isTombstoneRevFn(ki, tt.compact) { + assert.Equal(t, 0, len(am), "#%d: ki = %d, keep result wants empty because tombstone", i, ki) + } else { + assert.Equal(t, tt.wam, am, + "#%d: ki = %d, compact keep should be equal to keep keep if it's not tombstone", i, ki) } + am = make(map[revision]struct{}) ki.compact(zap.NewExample(), tt.compact, am) if !reflect.DeepEqual(ki, tt.wki) { diff --git a/tests/e2e/watch_test.go b/tests/e2e/watch_test.go index d911736eced..d09347f5bdd 100644 --- a/tests/e2e/watch_test.go +++ b/tests/e2e/watch_test.go @@ -25,8 +25,12 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/mvccpb" + v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/tests/v3/framework/e2e" "golang.org/x/sync/errgroup" @@ -245,8 +249,7 @@ func continuouslyExecuteGetAll(ctx context.Context, t *testing.T, g *errgroup.Gr // - COMPACT r5 // - WATCH rev=r5 // -// We should get the DELETE event (r5) followed by the PUT event (r6). However, currently we only -// get the PUT event with returned revision of r6 (key=k, val=v6). +// We should get the DELETE event (r5) followed by the PUT event (r6). func TestDeleteEventDrop_Issue18089(t *testing.T) { e2e.BeforeTest(t) @@ -299,24 +302,183 @@ func TestDeleteEventDrop_Issue18089(t *testing.T) { watchChan := c.Watch(ctx, key, clientv3.WithRev(deleteResp.Header.Revision)) select { case watchResp := <-watchChan: - // TODO(MadhavJivrajani): update conditions once https://github.com/etcd-io/etcd/issues/18089 - // is resolved. The existing conditions do not mimic the desired behaviour and are there to - // test and reproduce etcd-io/etcd#18089. - if len(watchResp.Events) != 1 { - t.Fatalf("expected exactly one event in response, got: %d", len(watchResp.Events)) - } - if watchResp.Events[0].Type != mvccpb.PUT { - t.Fatalf("unexpected event type, expected: %s, got: %s", mvccpb.PUT, watchResp.Events[0].Type) - } - if string(watchResp.Events[0].Kv.Key) != key { - t.Fatalf("unexpected key, expected: %s, got: %s", key, string(watchResp.Events[0].Kv.Key)) - } - if string(watchResp.Events[0].Kv.Value) != v6 { - t.Fatalf("unexpected valye, expected: %s, got: %s", v6, string(watchResp.Events[0].Kv.Value)) - } + require.Len(t, watchResp.Events, 2) + + require.Equal(t, mvccpb.DELETE, watchResp.Events[0].Type) + deletedKey := string(watchResp.Events[0].Kv.Key) + require.Equal(t, key, deletedKey) + + require.Equal(t, mvccpb.PUT, watchResp.Events[1].Type) + + updatedKey := string(watchResp.Events[1].Kv.Key) + require.Equal(t, key, updatedKey) + + require.Equal(t, v6, string(watchResp.Events[1].Kv.Value)) case <-time.After(100 * time.Millisecond): // we care only about the first response, but have an // escape hatch in case the watch response is delayed. 
t.Fatal("timed out getting watch response") } } + +func TestStartWatcherFromCompactedRevision(t *testing.T) { + t.Run("compaction on tombstone revision", func(t *testing.T) { + testStartWatcherFromCompactedRevision(t, true) + }) + t.Run("compaction on normal revision", func(t *testing.T) { + testStartWatcherFromCompactedRevision(t, false) + }) +} + +func testStartWatcherFromCompactedRevision(t *testing.T, performCompactOnTombstone bool) { + e2e.BeforeTest(t) + + cfg := e2e.EtcdProcessClusterConfig{ + ClusterSize: 1, + IsClientAutoTLS: true, + ClientTLS: e2e.ClientTLS, + } + + clus, err := e2e.NewEtcdProcessCluster(t, &cfg) + require.NoError(t, err) + defer clus.Close() + + c := newClient(t, clus.EndpointsGRPC(), cfg.ClientTLS, cfg.IsClientAutoTLS) + defer c.Close() + + ctx := context.Background() + key := "foo" + totalRev := 100 + + type valueEvent struct { + value string + typ mvccpb.Event_EventType + } + + var ( + // requestedValues records all requested change + requestedValues = make([]valueEvent, 0) + // revisionChan sends each compacted revision via this channel + compactionRevChan = make(chan int64) + // compactionStep means that client performs a compaction on every 7 operations + compactionStep = 7 + ) + + // This goroutine will submit changes on $key $totalRev times. It will + // perform compaction after every $compactedAfterChanges changes. + // Except for first time, the watcher always receives the compacted + // revision as start. + go func() { + defer close(compactionRevChan) + + lastRevision := int64(1) + + compactionRevChan <- lastRevision + for vi := 1; vi <= totalRev; vi++ { + var respHeader *etcdserverpb.ResponseHeader + + if vi%compactionStep == 0 && performCompactOnTombstone { + t.Logf("DELETE key=%s", key) + + resp, derr := c.KV.Delete(ctx, key) + require.NoError(t, derr) + respHeader = resp.Header + + requestedValues = append(requestedValues, valueEvent{value: "", typ: mvccpb.DELETE}) + } else { + value := fmt.Sprintf("%d", vi) + + t.Logf("PUT key=%s, val=%s", key, value) + resp, perr := c.KV.Put(ctx, key, value) + require.NoError(t, perr) + respHeader = resp.Header + + requestedValues = append(requestedValues, valueEvent{value: value, typ: mvccpb.PUT}) + } + + lastRevision = respHeader.Revision + + if vi%compactionStep == 0 { + compactionRevChan <- lastRevision + + t.Logf("COMPACT rev=%d", lastRevision) + _, err = c.KV.Compact(ctx, lastRevision, clientv3.WithCompactPhysical()) + require.NoError(t, err) + } + } + }() + + receivedEvents := make([]*clientv3.Event, 0) + + fromCompactedRev := false + for fromRev := range compactionRevChan { + watchChan := c.Watch(ctx, key, clientv3.WithRev(fromRev)) + + prevEventCount := len(receivedEvents) + + // firstReceived represents this is first watch response. + // Just in case that ETCD sends event one by one. + firstReceived := true + + t.Logf("Start to watch key %s starting from revision %d", key, fromRev) + watchLoop: + for { + currentEventCount := len(receivedEvents) + if currentEventCount-prevEventCount == compactionStep || currentEventCount == totalRev { + break + } + + select { + case watchResp := <-watchChan: + t.Logf("Receive the number of events: %d", len(watchResp.Events)) + for i := range watchResp.Events { + ev := watchResp.Events[i] + + // If the $fromRev is the compacted revision, + // the first event should be the same as the last event receives in last watch response. 
+ if firstReceived && fromCompactedRev { + firstReceived = false + + last := receivedEvents[prevEventCount-1] + + assert.Equal(t, last.Type, ev.Type, + "last received event type %s, but got event type %s", last.Type, ev.Type) + assert.Equal(t, string(last.Kv.Key), string(ev.Kv.Key), + "last received event key %s, but got event key %s", string(last.Kv.Key), string(ev.Kv.Key)) + assert.Equal(t, string(last.Kv.Value), string(ev.Kv.Value), + "last received event value %s, but got event value %s", string(last.Kv.Value), string(ev.Kv.Value)) + continue + } + receivedEvents = append(receivedEvents, ev) + } + + if len(watchResp.Events) == 0 { + require.Equal(t, v3rpc.ErrCompacted, watchResp.Err()) + break watchLoop + } + + case <-time.After(10 * time.Second): + t.Fatal("timed out getting watch response") + } + } + + fromCompactedRev = true + } + + t.Logf("Received total number of events: %d", len(receivedEvents)) + require.Len(t, requestedValues, totalRev) + require.Len(t, receivedEvents, totalRev, "should receive %d events", totalRev) + for idx, expected := range requestedValues { + ev := receivedEvents[idx] + + require.Equal(t, expected.typ, ev.Type, "#%d expected event %s", idx, expected.typ) + + updatedKey := string(ev.Kv.Key) + + require.Equal(t, key, updatedKey) + if expected.typ == mvccpb.PUT { + updatedValue := string(ev.Kv.Value) + require.Equal(t, expected.value, updatedValue) + } + } +}
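
Note (not part of the patch): a minimal client-side sketch of the behaviour the e2e tests above verify: after compacting exactly at a tombstone revision, a watcher started from that revision should still receive the DELETE event followed by the later PUT event. The endpoint, key name, and error handling below are illustrative assumptions, not taken from this change.

package main

import (
	"context"
	"fmt"
	"log"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	// Assumed local endpoint; adjust for your environment.
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	ctx := context.Background()

	// Write a few versions of the key, delete it (creating a tombstone),
	// then write it again.
	for i := 1; i <= 5; i++ {
		if _, err := cli.Put(ctx, "k", fmt.Sprintf("v%d", i)); err != nil {
			log.Fatal(err)
		}
	}
	delResp, err := cli.Delete(ctx, "k")
	if err != nil {
		log.Fatal(err)
	}
	tombstoneRev := delResp.Header.Revision
	if _, err := cli.Put(ctx, "k", "v6"); err != nil {
		log.Fatal(err)
	}

	// Compact exactly at the tombstone revision.
	if _, err := cli.Compact(ctx, tombstoneRev, clientv3.WithCompactPhysical()); err != nil {
		log.Fatal(err)
	}

	// Watch from the compacted (tombstone) revision. With this change the
	// DELETE event at tombstoneRev is delivered, followed by the PUT of
	// "v6"; previously the DELETE event was dropped. Events may arrive
	// split across several watch responses.
	for resp := range cli.Watch(ctx, "k", clientv3.WithRev(tombstoneRev)) {
		for _, ev := range resp.Events {
			fmt.Printf("%s %s rev=%d\n", ev.Type, ev.Kv.Key, ev.Kv.ModRevision)
		}
		if len(resp.Events) > 0 && resp.Events[len(resp.Events)-1].Kv.ModRevision > tombstoneRev {
			break
		}
	}
}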