diff --git a/pkg/election/leadership.go b/pkg/election/leadership.go index e71a8dd974b..c5d4d80dc6e 100644 --- a/pkg/election/leadership.go +++ b/pkg/election/leadership.go @@ -230,6 +230,7 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) { watchChan := watcher.Watch(watchChanCtx, ls.leaderKey, clientv3.WithRev(revision)) lastHealthyTime = time.Now() + WatchChan: select { case <-serverCtx.Done(): // server closed, return @@ -239,6 +240,7 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) { watchChanCancel() continue } + goto WatchChan // use goto to avoid to create a new watchChan case wresp := <-watchChan: // meet compacted error, use the compact revision. if wresp.CompactRevision != 0 { @@ -267,8 +269,8 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) { } } revision = wresp.Header.Revision + 1 + goto WatchChan // use goto to avoid to create a new watchChan } - watchChanCancel() } } diff --git a/pkg/election/leadership_test.go b/pkg/election/leadership_test.go index 5e98f7a5d36..5a394de56e1 100644 --- a/pkg/election/leadership_test.go +++ b/pkg/election/leadership_test.go @@ -124,6 +124,7 @@ func TestExitWatch(t *testing.T) { re := require.New(t) leaderKey := "/test_leader" re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/election/fastTick", "return(true)")) + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick", "return(true)")) // Case1: close the client before the watch loop starts checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) { re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayWatcher", `pause`)) @@ -155,6 +156,7 @@ func TestExitWatch(t *testing.T) { server.Close() }) re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/election/fastTick")) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick")) } func checkExitWatch(t *testing.T, leaderKey string, injectFunc func(server *embed.Etcd, client *clientv3.Client)) { diff --git a/pkg/utils/etcdutil/etcdutil.go b/pkg/utils/etcdutil/etcdutil.go index 9592d953310..dc1df7e4ca8 100644 --- a/pkg/utils/etcdutil/etcdutil.go +++ b/pkg/utils/etcdutil/etcdutil.go @@ -675,13 +675,13 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision defer watcher.Close() for { - WatchChan: // In order to prevent a watch stream being stuck in a partitioned node, // make sure to wrap context with "WithRequireLeader". watchChanCtx, watchChanCancel := context.WithCancel(clientv3.WithRequireLeader(ctx)) defer watchChanCancel() opts := append(lw.opts, clientv3.WithRev(revision)) watchChan := watcher.Watch(watchChanCtx, lw.key, opts...) + WatchChan: select { case <-ctx.Done(): return revision, nil @@ -692,7 +692,7 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision zap.String("key", lw.key), zap.Error(err)) } watchChanCancel() - goto WatchChan + continue case wresp := <-watchChan: if wresp.CompactRevision != 0 { log.Warn("required revision has been compacted, use the compact revision in watch loop", @@ -700,7 +700,7 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision zap.Int64("compact-revision", wresp.CompactRevision)) revision = wresp.CompactRevision watchChanCancel() - goto WatchChan + continue } else if wresp.Err() != nil { // wresp.Err() contains CompactRevision not equal to 0 log.Error("watcher is canceled in watch loop", zap.Int64("revision", revision), @@ -733,8 +733,8 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision zap.String("key", lw.key), zap.Error(err)) } revision = wresp.Header.Revision + 1 + goto WatchChan // use goto to avoid to create a new watchChan } - watchChanCancel() } }