Skip to content

Commit

Permalink
tests: reduce etcdutil test time (#7945)
Browse files Browse the repository at this point in the history
ref #7930

Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 authored Mar 19, 2024
1 parent 3fe806a commit b0c49fa
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 2 deletions.
5 changes: 4 additions & 1 deletion pkg/utils/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,8 @@ type LoopWatcher struct {
// updateClientCh is used to update the etcd client.
// It's only used for testing.
updateClientCh chan *clientv3.Client
// watchChTimeoutDuration is the timeout duration for a watchChan.
watchChTimeoutDuration time.Duration
}

// NewLoopWatcher creates a new LoopWatcher.
Expand Down Expand Up @@ -448,6 +450,7 @@ func NewLoopWatcher(
loadRetryTimes: defaultLoadFromEtcdRetryTimes,
loadBatchSize: maxLoadBatchSize,
watchChangeRetryInterval: defaultEtcdRetryInterval,
watchChTimeoutDuration: WatchChTimeoutDuration,
}
}

Expand Down Expand Up @@ -597,7 +600,7 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision
cancel()
// If no message comes from an etcd watchChan for WatchChTimeoutDuration,
// create a new one and need not to reset lastReceivedResponseTime.
if time.Since(lastReceivedResponseTime) >= WatchChTimeoutDuration {
if time.Since(lastReceivedResponseTime) >= lw.watchChTimeoutDuration {
log.Warn("watch channel is blocked for a long time, recreating a new one in watch loop",
zap.Duration("timeout", time.Since(lastReceivedResponseTime)),
zap.Int64("revision", revision), zap.String("name", lw.name), zap.String("key", lw.key))
Expand Down
19 changes: 18 additions & 1 deletion pkg/utils/etcdutil/etcdutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,9 +583,25 @@ func (suite *loopWatcherTestSuite) TestWatcherLoadLargeKey() {
count := 65536
ctx, cancel := context.WithCancel(suite.ctx)
defer cancel()

// create data
var wg sync.WaitGroup
tasks := make(chan int, count)
for w := 0; w < 16; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for i := range tasks {
suite.put(re, fmt.Sprintf("TestWatcherLoadLargeKey/test-%d", i), "")
}
}()
}
for i := 0; i < count; i++ {
suite.put(re, fmt.Sprintf("TestWatcherLoadLargeKey/test-%d", i), "")
tasks <- i
}
close(tasks)
wg.Wait()

cache := make([]string, 0)
watcher := NewLoopWatcher(
ctx,
Expand Down Expand Up @@ -724,6 +740,7 @@ func (suite *loopWatcherTestSuite) TestWatcherRequestProgress() {
func([]*clientv3.Event) error { return nil },
false, /* withPrefix */
)
watcher.watchChTimeoutDuration = 2 * RequestProgressInterval

suite.wg.Add(1)
go func() {
Expand Down

0 comments on commit b0c49fa

Please sign in to comment.