Skip to content

Commit

Permalink
use goto to avoid to create a new watchChan
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed Jun 28, 2023
1 parent 12d592e commit 78fe096
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 5 deletions.
4 changes: 3 additions & 1 deletion pkg/election/leadership.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {

watchChan := watcher.Watch(watchChanCtx, ls.leaderKey, clientv3.WithRev(revision))
lastHealthyTime = time.Now()
WatchChan:
select {
case <-serverCtx.Done():
// server closed, return
Expand All @@ -239,6 +240,7 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
watchChanCancel()
continue
}
goto WatchChan // use goto to avoid to create a new watchChan
case wresp := <-watchChan:
// meet compacted error, use the compact revision.
if wresp.CompactRevision != 0 {
Expand Down Expand Up @@ -267,8 +269,8 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
}
}
revision = wresp.Header.Revision + 1
goto WatchChan // use goto to avoid to create a new watchChan
}
watchChanCancel()
}
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/election/leadership_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ func TestExitWatch(t *testing.T) {
re := require.New(t)
leaderKey := "/test_leader"
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/election/fastTick", "return(true)"))
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick", "return(true)"))
// Case1: close the client before the watch loop starts
checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) {
re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayWatcher", `pause`))
Expand Down Expand Up @@ -155,6 +156,7 @@ func TestExitWatch(t *testing.T) {
server.Close()
})
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/election/fastTick"))
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick"))
}

func checkExitWatch(t *testing.T, leaderKey string, injectFunc func(server *embed.Etcd, client *clientv3.Client)) {
Expand Down
8 changes: 4 additions & 4 deletions pkg/utils/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,13 +675,13 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision
defer watcher.Close()

for {
WatchChan:
// In order to prevent a watch stream being stuck in a partitioned node,
// make sure to wrap context with "WithRequireLeader".
watchChanCtx, watchChanCancel := context.WithCancel(clientv3.WithRequireLeader(ctx))
defer watchChanCancel()
opts := append(lw.opts, clientv3.WithRev(revision))
watchChan := watcher.Watch(watchChanCtx, lw.key, opts...)
WatchChan:
select {
case <-ctx.Done():
return revision, nil
Expand All @@ -692,15 +692,15 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision
zap.String("key", lw.key), zap.Error(err))
}
watchChanCancel()
goto WatchChan
continue
case wresp := <-watchChan:
if wresp.CompactRevision != 0 {
log.Warn("required revision has been compacted, use the compact revision in watch loop",
zap.Int64("required-revision", revision),
zap.Int64("compact-revision", wresp.CompactRevision))
revision = wresp.CompactRevision
watchChanCancel()
goto WatchChan
continue
} else if wresp.Err() != nil { // wresp.Err() contains CompactRevision not equal to 0
log.Error("watcher is canceled in watch loop",
zap.Int64("revision", revision),
Expand Down Expand Up @@ -733,8 +733,8 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision
zap.String("key", lw.key), zap.Error(err))
}
revision = wresp.Header.Revision + 1
goto WatchChan // use goto to avoid to create a new watchChan
}
watchChanCancel()
}
}

Expand Down

0 comments on commit 78fe096

Please sign in to comment.