From 1e1e7def394edea885267483e9f2c6957774ac86 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 21 Aug 2023 23:32:04 +0800 Subject: [PATCH] add watcher close Signed-off-by: lhy1024 --- pkg/election/leadership.go | 13 +++++++++++-- pkg/utils/etcdutil/etcdutil.go | 15 ++++++++++++--- 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/pkg/election/leadership.go b/pkg/election/leadership.go index 866689ad45b..f810734375b 100644 --- a/pkg/election/leadership.go +++ b/pkg/election/leadership.go @@ -188,11 +188,17 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) { return } - var watcherCancel context.CancelFunc + var ( + watcher clientv3.Watcher + watcherCancel context.CancelFunc + ) defer func() { if watcherCancel != nil { watcherCancel() } + if watcher != nil { + watcher.Close() + } }() unhealthyTimeout := watchLoopUnhealthyTimeout failpoint.Inject("fastTick", func() { @@ -207,9 +213,12 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) { if watcherCancel != nil { watcherCancel() } + if watcher != nil { + watcher.Close() + } + watcher = clientv3.NewWatcher(ls.client) // In order to prevent a watch stream being stuck in a partitioned node, // make sure to wrap context with "WithRequireLeader". - watcher := clientv3.NewWatcher(ls.client) watcherCtx, cancel := context.WithCancel(clientv3.WithRequireLeader(serverCtx)) watcherCancel = cancel diff --git a/pkg/utils/etcdutil/etcdutil.go b/pkg/utils/etcdutil/etcdutil.go index 8bab02b83cf..98e8c7353bf 100644 --- a/pkg/utils/etcdutil/etcdutil.go +++ b/pkg/utils/etcdutil/etcdutil.go @@ -683,11 +683,17 @@ func (lw *LoopWatcher) initFromEtcd(ctx context.Context) int64 { } func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision int64, err error) { - var watcherCancel context.CancelFunc + var ( + watcher clientv3.Watcher + watcherCancel context.CancelFunc + ) defer func() { if watcherCancel != nil { watcherCancel() } + if watcher != nil { + watcher.Close() + } }() ticker := time.NewTicker(RequestProgressInterval) defer ticker.Stop() @@ -697,12 +703,15 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision if watcherCancel != nil { watcherCancel() } + if watcher != nil { + watcher.Close() + } + watcher = clientv3.NewWatcher(lw.client) // In order to prevent a watch stream being stuck in a partitioned node, // make sure to wrap context with "WithRequireLeader". - watcher := clientv3.NewWatcher(lw.client) watcherCtx, cancel := context.WithCancel(clientv3.WithRequireLeader(ctx)) watcherCancel = cancel - opts := append(lw.opts, clientv3.WithRev(revision)) + opts := append(lw.opts, clientv3.WithRev(revision), clientv3.WithProgressNotify()) watchChan := watcher.Watch(watcherCtx, lw.key, opts...) WatchChanLoop: select {