Skip to content

Commit

Permalink
Merge pull request #18608 from srivastav-abhishek/periodic-compaction…
Browse files Browse the repository at this point in the history
…-flake-fix

Fixed periodic compaction tests
  • Loading branch information
ahrtr authored Sep 22, 2024
2 parents 6ea81c1 + b8cb654 commit 2f9532b
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 17 deletions.
5 changes: 4 additions & 1 deletion client/pkg/testutil/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,10 @@ func (r *recorderStream) Chan() <-chan Action {

func (r *recorderStream) Wait(n int) ([]Action, error) {
acts := make([]Action, n)
timeoutC := time.After(r.waitTimeout)
var timeoutC <-chan time.Time
if r.waitTimeout != 0 {
timeoutC = time.After(r.waitTimeout)
}
for i := 0; i < n; i++ {
select {
case acts[i] = <-r.ch:
Expand Down
38 changes: 22 additions & 16 deletions server/etcdserver/api/v3compactor/periodic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestPeriodicHourly(t *testing.T) {

fc := clockwork.NewFakeClock()
// TODO: Do not depand or real time (Recorder.Wait) in unit tests.
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0}
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(0), 0}
compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable)

Expand All @@ -43,8 +43,8 @@ func TestPeriodicHourly(t *testing.T) {
initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10

// compaction doesn't happen til 2 hours elapse
for i := 0; i < initialIntervals; i++ {
rg.Wait(1)
for i := 0; i < initialIntervals-1; i++ {
waitOneAction(t, rg)
fc.Advance(tb.getRetryInterval())
}

Expand All @@ -63,7 +63,7 @@ func TestPeriodicHourly(t *testing.T) {
for i := 0; i < 3; i++ {
// advance one hour, one revision for each interval
for j := 0; j < intervalsPerPeriod; j++ {
rg.Wait(1)
waitOneAction(t, rg)
fc.Advance(tb.getRetryInterval())
}

Expand All @@ -84,7 +84,7 @@ func TestPeriodicMinutes(t *testing.T) {
retentionDuration := time.Duration(retentionMinutes) * time.Minute

fc := clockwork.NewFakeClock()
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0}
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(0), 0}
compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable)

Expand All @@ -94,8 +94,8 @@ func TestPeriodicMinutes(t *testing.T) {
initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10

// compaction doesn't happen til 5 minutes elapse
for i := 0; i < initialIntervals; i++ {
rg.Wait(1)
for i := 0; i < initialIntervals-1; i++ {
waitOneAction(t, rg)
fc.Advance(tb.getRetryInterval())
}

Expand All @@ -113,7 +113,7 @@ func TestPeriodicMinutes(t *testing.T) {
for i := 0; i < 5; i++ {
// advance 5-minute, one revision for each interval
for j := 0; j < intervalsPerPeriod; j++ {
rg.Wait(1)
waitOneAction(t, rg)
fc.Advance(tb.getRetryInterval())
}

Expand All @@ -132,7 +132,7 @@ func TestPeriodicMinutes(t *testing.T) {
func TestPeriodicPause(t *testing.T) {
fc := clockwork.NewFakeClock()
retentionDuration := time.Hour
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0}
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(0), 0}
compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable)

Expand All @@ -143,7 +143,7 @@ func TestPeriodicPause(t *testing.T) {

// tb will collect 3 hours of revisions but not compact since paused
for i := 0; i < n*3; i++ {
rg.Wait(1)
waitOneAction(t, rg)
fc.Advance(tb.getRetryInterval())
}
// t.revs = [21 22 23 24 25 26 27 28 29 30]
Expand All @@ -156,7 +156,7 @@ func TestPeriodicPause(t *testing.T) {

// tb resumes to being blocked on the clock
tb.Resume()
rg.Wait(1)
waitOneAction(t, rg)

// unblock clock, will kick off a compaction at T=3h6m by retry
fc.Advance(tb.getRetryInterval())
Expand All @@ -179,7 +179,7 @@ func TestPeriodicSkipRevNotChange(t *testing.T) {
retentionDuration := time.Duration(retentionMinutes) * time.Minute

fc := clockwork.NewFakeClock()
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0}
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(0), 0}
compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable)

Expand All @@ -189,10 +189,10 @@ func TestPeriodicSkipRevNotChange(t *testing.T) {
initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10

// first compaction happens til 5 minutes elapsed
for i := 0; i < initialIntervals; i++ {
for i := 0; i < initialIntervals-1; i++ {
// every time set the same revision with 100
rg.SetRev(int64(100))
rg.Wait(1)
waitOneAction(t, rg)
fc.Advance(tb.getRetryInterval())
}

Expand All @@ -212,7 +212,7 @@ func TestPeriodicSkipRevNotChange(t *testing.T) {
for i := 0; i < 5; i++ {
for j := 0; j < intervalsPerPeriod; j++ {
rg.SetRev(int64(100))
rg.Wait(1)
waitOneAction(t, rg)
fc.Advance(tb.getRetryInterval())
}

Expand All @@ -224,7 +224,7 @@ func TestPeriodicSkipRevNotChange(t *testing.T) {

// when revision changed, compaction is normally
for i := 0; i < initialIntervals; i++ {
rg.Wait(1)
waitOneAction(t, rg)
fc.Advance(tb.getRetryInterval())
}

Expand All @@ -238,3 +238,9 @@ func TestPeriodicSkipRevNotChange(t *testing.T) {
t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
}
}

func waitOneAction(t *testing.T, r testutil.Recorder) {
if actions, _ := r.Wait(1); len(actions) != 1 {
t.Errorf("expect 1 action, got %v instead", len(actions))
}
}

0 comments on commit 2f9532b

Please sign in to comment.