diff --git a/pkg/utils/etcdutil/etcdutil.go b/pkg/utils/etcdutil/etcdutil.go index d56d0e662e3..6bb3dba1609 100644 --- a/pkg/utils/etcdutil/etcdutil.go +++ b/pkg/utils/etcdutil/etcdutil.go @@ -418,6 +418,8 @@ type LoopWatcher struct { // updateClientCh is used to update the etcd client. // It's only used for testing. updateClientCh chan *clientv3.Client + // watchChTimeoutDuration is the timeout duration for a watchChan. + watchChTimeoutDuration time.Duration } // NewLoopWatcher creates a new LoopWatcher. @@ -448,6 +450,7 @@ func NewLoopWatcher( loadRetryTimes: defaultLoadFromEtcdRetryTimes, loadBatchSize: maxLoadBatchSize, watchChangeRetryInterval: defaultEtcdRetryInterval, + watchChTimeoutDuration: WatchChTimeoutDuration, } } @@ -597,7 +600,7 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision cancel() // If no message comes from an etcd watchChan for WatchChTimeoutDuration, // create a new one and need not to reset lastReceivedResponseTime. - if time.Since(lastReceivedResponseTime) >= WatchChTimeoutDuration { + if time.Since(lastReceivedResponseTime) >= lw.watchChTimeoutDuration { log.Warn("watch channel is blocked for a long time, recreating a new one in watch loop", zap.Duration("timeout", time.Since(lastReceivedResponseTime)), zap.Int64("revision", revision), zap.String("name", lw.name), zap.String("key", lw.key)) diff --git a/pkg/utils/etcdutil/etcdutil_test.go b/pkg/utils/etcdutil/etcdutil_test.go index 63fa50fd800..4fb96895942 100644 --- a/pkg/utils/etcdutil/etcdutil_test.go +++ b/pkg/utils/etcdutil/etcdutil_test.go @@ -583,9 +583,25 @@ func (suite *loopWatcherTestSuite) TestWatcherLoadLargeKey() { count := 65536 ctx, cancel := context.WithCancel(suite.ctx) defer cancel() + + // create data + var wg sync.WaitGroup + tasks := make(chan int, count) + for w := 0; w < 16; w++ { + wg.Add(1) + go func() { + defer wg.Done() + for i := range tasks { + suite.put(re, fmt.Sprintf("TestWatcherLoadLargeKey/test-%d", i), "") + } + }() + } for i := 0; i < count; i++ { - suite.put(re, fmt.Sprintf("TestWatcherLoadLargeKey/test-%d", i), "") + tasks <- i } + close(tasks) + wg.Wait() + cache := make([]string, 0) watcher := NewLoopWatcher( ctx, @@ -724,6 +740,7 @@ func (suite *loopWatcherTestSuite) TestWatcherRequestProgress() { func([]*clientv3.Event) error { return nil }, false, /* withPrefix */ ) + watcher.watchChTimeoutDuration = 2 * RequestProgressInterval suite.wg.Add(1) go func() {