Skip to content

Commit

Permalink
close watcher
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed Aug 18, 2023
1 parent df8186d commit 05c7477
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 6 deletions.
14 changes: 11 additions & 3 deletions pkg/election/leadership.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,21 +196,29 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
defer ticker.Stop()
lastReceivedResponseTime := time.Now()

watcher := clientv3.NewWatcher(ls.client)
defer watcher.Close()
var watchChanCancel context.CancelFunc
var (
watcher clientv3.Watcher
watchChanCancel context.CancelFunc
)
defer func() {
if watchChanCancel != nil {
watchChanCancel()
}
if watcher != nil {
watcher.Close()
}
}()
for {
failpoint.Inject("delayWatcher", nil)
if watchChanCancel != nil {
watchChanCancel()
}
if watcher != nil {
watcher.Close()
}
// In order to prevent a watch stream being stuck in a partitioned node,
// make sure to wrap context with "WithRequireLeader".
watcher = clientv3.NewWatcher(ls.client)
watchChanCtx, cancel := context.WithCancel(clientv3.WithRequireLeader(serverCtx))
watchChanCancel = cancel

Expand Down
11 changes: 8 additions & 3 deletions pkg/utils/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,21 +687,26 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision
defer ticker.Stop()
lastReceivedResponseTime := time.Now()

watcher := clientv3.NewWatcher(lw.client)
defer watcher.Close()
var watcher clientv3.Watcher
var watchChanCancel context.CancelFunc
defer func() {
if watchChanCancel != nil {
watchChanCancel()
}
if watcher != nil {
watcher.Close()
}
}()

for {
if watchChanCancel != nil {
watchChanCancel()
}
if watcher != nil {
watcher.Close()
}
// In order to prevent a watch stream being stuck in a partitioned node,
// make sure to wrap context with "WithRequireLeader".
watcher = clientv3.NewWatcher(lw.client)
watchChanCtx, cancel := context.WithCancel(clientv3.WithRequireLeader(ctx))
watchChanCancel = cancel
opts := append(lw.opts, clientv3.WithRev(revision))
Expand Down

0 comments on commit 05c7477

Please sign in to comment.