Skip to content

Commit

Permalink
add watcher close
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed Aug 21, 2023
1 parent 1402111 commit 1e1e7de
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 5 deletions.
13 changes: 11 additions & 2 deletions pkg/election/leadership.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,17 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
return
}

var watcherCancel context.CancelFunc
var (
watcher clientv3.Watcher
watcherCancel context.CancelFunc
)
defer func() {
if watcherCancel != nil {
watcherCancel()
}
if watcher != nil {
watcher.Close()
}
}()
unhealthyTimeout := watchLoopUnhealthyTimeout
failpoint.Inject("fastTick", func() {
Expand All @@ -207,9 +213,12 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
if watcherCancel != nil {
watcherCancel()
}
if watcher != nil {
watcher.Close()
}
watcher = clientv3.NewWatcher(ls.client)
// In order to prevent a watch stream being stuck in a partitioned node,
// make sure to wrap context with "WithRequireLeader".
watcher := clientv3.NewWatcher(ls.client)
watcherCtx, cancel := context.WithCancel(clientv3.WithRequireLeader(serverCtx))
watcherCancel = cancel

Expand Down
15 changes: 12 additions & 3 deletions pkg/utils/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -683,11 +683,17 @@ func (lw *LoopWatcher) initFromEtcd(ctx context.Context) int64 {
}

func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision int64, err error) {
var watcherCancel context.CancelFunc
var (
watcher clientv3.Watcher
watcherCancel context.CancelFunc
)
defer func() {
if watcherCancel != nil {
watcherCancel()
}
if watcher != nil {
watcher.Close()
}
}()
ticker := time.NewTicker(RequestProgressInterval)
defer ticker.Stop()
Expand All @@ -697,12 +703,15 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision
if watcherCancel != nil {
watcherCancel()
}
if watcher != nil {
watcher.Close()
}
watcher = clientv3.NewWatcher(lw.client)
// In order to prevent a watch stream being stuck in a partitioned node,
// make sure to wrap context with "WithRequireLeader".
watcher := clientv3.NewWatcher(lw.client)
watcherCtx, cancel := context.WithCancel(clientv3.WithRequireLeader(ctx))
watcherCancel = cancel
opts := append(lw.opts, clientv3.WithRev(revision))
opts := append(lw.opts, clientv3.WithRev(revision), clientv3.WithProgressNotify())
watchChan := watcher.Watch(watcherCtx, lw.key, opts...)
WatchChanLoop:
select {
Expand Down

0 comments on commit 1e1e7de

Please sign in to comment.