Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed Jul 6, 2023
1 parent 1e612e0 commit 1d707b3
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 17 deletions.
31 changes: 22 additions & 9 deletions pkg/election/leadership.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,12 +201,21 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {

watcher := clientv3.NewWatcher(ls.client)
defer watcher.Close()
var watchChanCancel *context.CancelFunc
defer func() {
if watchChanCancel != nil {
(*watchChanCancel)()
}
}()
for {
failpoint.Inject("delayWatcher", nil)
if watchChanCancel != nil {
(*watchChanCancel)()
}
// In order to prevent a watch stream being stuck in a partitioned node,
// make sure to wrap context with "WithRequireLeader".
watchChanCtx, watchChanCancel := context.WithCancel(clientv3.WithRequireLeader(serverCtx))
defer watchChanCancel()
watchChanCtx, cancel := context.WithCancel(clientv3.WithRequireLeader(serverCtx))
watchChanCancel = &cancel

// When etcd is not available, the watcher.Watch will block,
// so we check the etcd availability first.
Expand All @@ -223,32 +232,35 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
// server closed, return
return
case <-ticker.C:
watchChanCancel()
// continue to check the etcd availability
continue
}
}

watchChan := watcher.Watch(watchChanCtx, ls.leaderKey, clientv3.WithRev(revision))
lastHealthyTime = time.Now()
WatchChan:
select {
case <-serverCtx.Done():
// server closed, return
return
case <-ticker.C:
if !etcdutil.IsHealthy(serverCtx, ls.client) {
watchChanCancel()
continue
if time.Since(lastHealthyTime) > watchLoopUnhealthyTimeout {
log.Error("the connect of leadership watcher is unhealthy",
zap.Int64("revision", revision),
zap.String("leader-key", ls.leaderKey),
zap.String("purpose", ls.purpose))
return
}
goto WatchChan
}
goto WatchChan // use goto to avoid creating a new watchChan
case wresp := <-watchChan:
// The required revision has been compacted; retry the watch from the compact revision.
if wresp.CompactRevision != 0 {
log.Warn("required revision has been compacted, use the compact revision",
zap.Int64("required-revision", revision),
zap.Int64("compact-revision", wresp.CompactRevision))
revision = wresp.CompactRevision
watchChanCancel()
continue
} else if wresp.Err() != nil { // wresp.Err() contains CompactRevision not equal to 0
log.Error("leadership watcher is canceled with",
Expand All @@ -269,8 +281,9 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
}
}
revision = wresp.Header.Revision + 1
goto WatchChan // use goto to avoid creating a new watchChan
}
lastHealthyTime = time.Now()
goto WatchChan // use goto to avoid creating a new watchChan
}
}

Expand Down
27 changes: 19 additions & 8 deletions pkg/utils/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,11 @@ func CreateEtcdClient(tlsConfig *tls.Config, acURLs []url.URL) (*clientv3.Client
select {
case <-client.Ctx().Done():
log.Info("[etcd client] etcd client is closed, exit health check goroutine")
checker.Range(func(key, value interface{}) bool {
client := value.(*healthyClient)
client.Close()
return true
})
return
case <-ticker.C:
usedEps := client.Endpoints()
Expand Down Expand Up @@ -371,9 +376,9 @@ func (checker *healthyChecker) update(eps []string) {
checker.Delete(ep)
}
if time.Since(lastHealthy) > etcdServerDisconnectedTimeout {
// try to update client to trigger reconnect
checker.addClient(ep, lastHealthy)
client.(*healthyClient).Close()
// try to reset client endpoint to trigger reconnect
client.(*healthyClient).Client.SetEndpoints([]string{}...)
client.(*healthyClient).Client.SetEndpoints(ep)
}
continue
}
Expand Down Expand Up @@ -672,12 +677,20 @@ func (lw *LoopWatcher) initFromEtcd(ctx context.Context) int64 {
func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision int64, err error) {
watcher := clientv3.NewWatcher(lw.client)
defer watcher.Close()

var watchChanCancel *context.CancelFunc
defer func() {
if watchChanCancel != nil {
(*watchChanCancel)()
}
}()
for {
if watchChanCancel != nil {
(*watchChanCancel)()
}
// In order to prevent a watch stream being stuck in a partitioned node,
// make sure to wrap context with "WithRequireLeader".
watchChanCtx, watchChanCancel := context.WithCancel(clientv3.WithRequireLeader(ctx))
defer watchChanCancel()
watchChanCtx, cancel := context.WithCancel(clientv3.WithRequireLeader(ctx))
watchChanCancel = &cancel
opts := append(lw.opts, clientv3.WithRev(revision))
watchChan := watcher.Watch(watchChanCtx, lw.key, opts...)
WatchChan:
Expand All @@ -690,15 +703,13 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision
log.Warn("force load key failed in watch loop", zap.String("name", lw.name),
zap.String("key", lw.key), zap.Error(err))
}
watchChanCancel()
continue
case wresp := <-watchChan:
if wresp.CompactRevision != 0 {
log.Warn("required revision has been compacted, use the compact revision in watch loop",
zap.Int64("required-revision", revision),
zap.Int64("compact-revision", wresp.CompactRevision))
revision = wresp.CompactRevision
watchChanCancel()
continue
} else if wresp.Err() != nil { // wresp.Err() contains CompactRevision not equal to 0
log.Error("watcher is canceled in watch loop",
Expand Down

0 comments on commit 1d707b3

Please sign in to comment.