Skip to content

Commit

Permalink
Merge pull request etcd-io#17555 from chaochn47/fix-watch-event-loss
Browse files Browse the repository at this point in the history
Fix watch event loss
  • Loading branch information
ahrtr committed Mar 16, 2024
2 parents 63e394d + 405862e commit fca3e8a
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 2 deletions.
1 change: 1 addition & 0 deletions server/etcdserver/api/v3rpc/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,7 @@ func (sws *serverWatchStream) sendLoop() {
sws.mu.RUnlock()

var serr error
// gofail: var beforeSendWatchResponse struct{}
if !fragmented && !ok {
serr = sws.gRPCStream.Send(wr)
} else {
Expand Down
7 changes: 7 additions & 0 deletions server/storage/mvcc/watchable_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ var (
maxWatchersPerSync = 512
)

func ChanBufLen() int { return chanBufLen }

type watchable interface {
watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc)
progress(w *watcher)
Expand Down Expand Up @@ -370,6 +372,11 @@ func (s *watchableStore) syncWatchers() int {
victims := make(watcherBatch)
wb := newWatcherBatch(wg, evs)
for w := range wg.watchers {
if w.minRev < compactionRev {
// Skip the watcher that failed to send compacted watch response due to w.ch is full.
// Next retry of syncWatchers would try to resend the compacted watch response to w.ch
continue
}
w.minRev = curRev + 1

eb, ok := wb[w]
Expand Down
58 changes: 58 additions & 0 deletions server/storage/mvcc/watchable_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"

"go.etcd.io/etcd/api/v3/mvccpb"
Expand Down Expand Up @@ -250,6 +251,63 @@ func TestWatchCompacted(t *testing.T) {
}
}

func TestWatchNoEventLossOnCompact(t *testing.T) {
oldChanBufLen, oldMaxWatchersPerSync := chanBufLen, maxWatchersPerSync

b, _ := betesting.NewDefaultTmpBackend(t)
lg := zaptest.NewLogger(t)
s := newWatchableStore(lg, b, &lease.FakeLessor{}, StoreConfig{})

defer func() {
cleanup(s, b)
chanBufLen, maxWatchersPerSync = oldChanBufLen, oldMaxWatchersPerSync
}()

chanBufLen, maxWatchersPerSync = 1, 4
testKey, testValue := []byte("foo"), []byte("bar")

maxRev := 10
compactRev := int64(5)
for i := 0; i < maxRev; i++ {
s.Put(testKey, testValue, lease.NoLease)
}
_, err := s.Compact(traceutil.TODO(), compactRev)
require.NoErrorf(t, err, "failed to compact kv (%v)", err)

w := s.NewWatchStream()
defer w.Close()

watchers := map[WatchID]int64{
0: 1,
1: 1, // create unsyncd watchers with startRev < compactRev
2: 6, // create unsyncd watchers with compactRev < startRev < currentRev
}
for id, startRev := range watchers {
_, err := w.Watch(id, testKey, nil, startRev)
require.NoError(t, err)
}
// fill up w.Chan() with 1 buf via 2 compacted watch response
s.syncWatchers()

for len(watchers) > 0 {
resp := <-w.Chan()
if resp.CompactRevision != 0 {
require.Equal(t, resp.CompactRevision, compactRev)
require.Contains(t, watchers, resp.WatchID)
delete(watchers, resp.WatchID)
continue
}
nextRev := watchers[resp.WatchID]
for _, ev := range resp.Events {
require.Equalf(t, nextRev, ev.Kv.ModRevision, "got event revision %d but want %d for watcher with watch ID %d", ev.Kv.ModRevision, nextRev, resp.WatchID)
nextRev++
}
if nextRev == s.rev()+1 {
delete(watchers, resp.WatchID)
}
}
}

func TestWatchFutureRev(t *testing.T) {
b, _ := betesting.NewDefaultTmpBackend(t)
s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
Expand Down
58 changes: 58 additions & 0 deletions tests/integration/v3_watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,25 @@ package integration
import (
"bytes"
"context"
"errors"
"fmt"
"reflect"
"sort"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/etcdserver/api/v3rpc"
"go.etcd.io/etcd/server/v3/storage/mvcc"
"go.etcd.io/etcd/tests/v3/framework/integration"
gofail "go.etcd.io/gofail/runtime"
)

// TestV3WatchFromCurrentRevision tests Watch APIs from current revision.
Expand Down Expand Up @@ -1512,3 +1517,56 @@ func TestV3WatchProgressWaitsForSyncNoEvents(t *testing.T) {
}
require.True(t, gotProgressNotification, "Expected to get progress notification")
}

// TestV3NoEventsLostOnCompact verifies that slow watchers exit with compacted watch response
// if its next revision of events are compacted and no lost events sent to client.
func TestV3NoEventsLostOnCompact(t *testing.T) {
if integration.ThroughProxy {
t.Skip("grpc proxy currently does not support requesting progress notifications")
}
integration.BeforeTest(t)
if len(gofail.List()) == 0 {
t.Skip("please run 'make gofail-enable' before running the test")
}
clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)

client := clus.RandClient()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// sendLoop throughput is rate-limited to 1 event per second
require.NoError(t, gofail.Enable("beforeSendWatchResponse", `sleep("1s")`))
wch := client.Watch(ctx, "foo")

var rev int64
writeCount := mvcc.ChanBufLen() * 11 / 10
for i := 0; i < writeCount; i++ {
resp, err := client.Put(ctx, "foo", "bar")
require.NoError(t, err)
rev = resp.Header.Revision
}
_, err := client.Compact(ctx, rev)
require.NoError(t, err)

time.Sleep(time.Second)
require.NoError(t, gofail.Disable("beforeSendWatchResponse"))

eventCount := 0
compacted := false
for resp := range wch {
err = resp.Err()
if err != nil {
if !errors.Is(err, rpctypes.ErrCompacted) {
t.Fatalf("want watch response err %v but got %v", rpctypes.ErrCompacted, err)
}
compacted = true
break
}
eventCount += len(resp.Events)
if eventCount == writeCount {
break
}
}
assert.Truef(t, compacted, "Expected stream to get compacted, instead we got %d events out of %d events", eventCount, writeCount)
}
4 changes: 2 additions & 2 deletions tests/robustness/makefile.mk
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ GOFAIL_VERSION = $(shell cd tools/mod && go list -m -f {{.Version}} go.etcd.io/g

.PHONY: gofail-enable
gofail-enable: install-gofail
gofail enable server/etcdserver/ server/storage/backend/ server/storage/mvcc/ server/storage/wal/
gofail enable server/etcdserver/ server/storage/backend/ server/storage/mvcc/ server/storage/wal/ server/etcdserver/api/v3rpc/
cd ./server && go get go.etcd.io/gofail@${GOFAIL_VERSION}
cd ./etcdutl && go get go.etcd.io/gofail@${GOFAIL_VERSION}
cd ./etcdctl && go get go.etcd.io/gofail@${GOFAIL_VERSION}
cd ./tests && go get go.etcd.io/gofail@${GOFAIL_VERSION}

.PHONY: gofail-disable
gofail-disable: install-gofail
gofail disable server/etcdserver/ server/storage/backend/ server/storage/mvcc/ server/storage/wal/
gofail disable server/etcdserver/ server/storage/backend/ server/storage/mvcc/ server/storage/wal/ server/etcdserver/api/v3rpc/
cd ./server && go mod tidy
cd ./etcdutl && go mod tidy
cd ./etcdctl && go mod tidy
Expand Down

0 comments on commit fca3e8a

Please sign in to comment.