Skip to content

Commit

Permalink
etcdutil, leadership: use RequestProgress
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed Aug 16, 2023
1 parent 7dc9d17 commit 385310c
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 25 deletions.
38 changes: 14 additions & 24 deletions pkg/election/leadership.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,6 @@ import (
"go.uber.org/zap"
)

const (
watchLoopUnhealthyTimeout = 60 * time.Second
detectHealthyInterval = 10 * time.Second
)

// GetLeader gets the corresponding leader from etcd by given leaderPath (as the key).
func GetLeader(c *clientv3.Client, leaderPath string) (*pdpb.Member, int64, error) {
leader := &pdpb.Member{}
Expand Down Expand Up @@ -189,15 +184,9 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
return
}

interval := detectHealthyInterval
unhealthyTimeout := watchLoopUnhealthyTimeout
failpoint.Inject("fastTick", func() {
unhealthyTimeout = 5 * time.Second
interval = 1 * time.Second
})
ticker := time.NewTicker(interval)
ticker := time.NewTicker(etcdutil.RequestProgressInterval)
defer ticker.Stop()
lastHealthyTime := time.Now()
lastReceivedResponseTime := time.Now()

watcher := clientv3.NewWatcher(ls.client)
defer watcher.Close()
Expand All @@ -220,7 +209,7 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
// When etcd is not available, the watcher.Watch will block,
// so we check the etcd availability first.
if !etcdutil.IsHealthy(serverCtx, ls.client) {
if time.Since(lastHealthyTime) > unhealthyTimeout {
if time.Since(lastReceivedResponseTime) > etcdutil.WatchChTimeoutDuration {
log.Error("the connect of leadership watcher is unhealthy",
zap.Int64("revision", revision),
zap.String("leader-key", ls.leaderKey),
Expand All @@ -239,6 +228,7 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
}
}

lastReceivedResponseTime = time.Now()
watchChan := watcher.Watch(watchChanCtx, ls.leaderKey, clientv3.WithRev(revision))
WatchChanLoop:
select {
Expand All @@ -249,23 +239,24 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
return
case <-ticker.C:
if !etcdutil.IsHealthy(serverCtx, ls.client) {
if time.Since(lastHealthyTime) > unhealthyTimeout {
log.Error("the connect of leadership watcher is unhealthy",
zap.Int64("revision", revision),
zap.String("leader-key", ls.leaderKey),
zap.String("purpose", ls.purpose))
return
}
goto WatchChanLoop
// When etcd is not available, the watcher.RequestProgress will block,
// so we check the etcd availability first.
continue
}
if err := watcher.RequestProgress(serverCtx); err != nil {
log.Warn("failed to request progress in watch loop", zap.Error(err))
}
if time.Since(lastReceivedResponseTime) >= etcdutil.WatchChTimeoutDuration {
continue
}
case wresp := <-watchChan:
lastReceivedResponseTime = time.Now()
// meet compacted error, use the compact revision.
if wresp.CompactRevision != 0 {
log.Warn("required revision has been compacted, use the compact revision",
zap.Int64("required-revision", revision),
zap.Int64("compact-revision", wresp.CompactRevision))
revision = wresp.CompactRevision
lastHealthyTime = time.Now()
continue
} else if wresp.Err() != nil { // wresp.Err() contains CompactRevision not equal to 0
log.Error("leadership watcher is canceled with",
Expand All @@ -287,7 +278,6 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
}
revision = wresp.Header.Revision + 1
}
lastHealthyTime = time.Now()
goto WatchChanLoop // use goto to avoid to create a new watchChan
}
}
Expand Down
23 changes: 22 additions & 1 deletion pkg/utils/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,13 @@ const (
defaultLoadBatchSize = 400
defaultWatchChangeRetryInterval = 1 * time.Second
defaultForceLoadMinimalInterval = 200 * time.Millisecond

// If no msg comes from an etcd watchCh for RequestProgressInterval long,
// we should call RequestProgress of etcd client
RequestProgressInterval = 1 * time.Second
// If no msg comes from an etcd watchCh for WatchChTimeoutDuration long,
// we should cancel the watchCh and request a new watchCh from etcd client
WatchChTimeoutDuration = DefaultRequestTimeout
)

// LoopWatcher loads data from etcd and sets a watcher for it.
Expand Down Expand Up @@ -678,6 +685,9 @@ func (lw *LoopWatcher) initFromEtcd(ctx context.Context) int64 {
}

func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision int64, err error) {
ticker := time.NewTicker(RequestProgressInterval)
defer ticker.Stop()

watcher := clientv3.NewWatcher(lw.client)
defer watcher.Close()
var watchChanCancel context.CancelFunc
Expand All @@ -696,10 +706,18 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision
watchChanCancel = cancel
opts := append(lw.opts, clientv3.WithRev(revision))
watchChan := watcher.Watch(watchChanCtx, lw.key, opts...)
lastReceivedResponseTime := time.Now()
WatchChanLoop:
select {
case <-ctx.Done():
return revision, nil
case <-ticker.C:
if err := watcher.RequestProgress(ctx); err != nil {
log.Warn("failed to request progress in watch loop", zap.Error(err))
}
if time.Since(lastReceivedResponseTime) >= WatchChTimeoutDuration {
continue
}
case <-lw.forceLoadCh:
revision, err = lw.load(ctx)
if err != nil {
Expand All @@ -708,6 +726,7 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision
}
continue
case wresp := <-watchChan:
lastReceivedResponseTime = time.Now()
if wresp.CompactRevision != 0 {
log.Warn("required revision has been compacted, use the compact revision in watch loop",
zap.Int64("required-revision", revision),
Expand All @@ -719,6 +738,8 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision
zap.Int64("revision", revision),
errs.ZapError(errs.ErrEtcdWatcherCancel, wresp.Err()))
return revision, wresp.Err()
} else if wresp.IsProgressNotify() {
goto WatchChanLoop
}
for _, event := range wresp.Events {
switch event.Type {
Expand Down Expand Up @@ -746,8 +767,8 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision
zap.String("key", lw.key), zap.Error(err))
}
revision = wresp.Header.Revision + 1
goto WatchChanLoop // use goto to avoid to create a new watchChan
}
goto WatchChanLoop // use goto to avoid to create a new watchChan
}
}

Expand Down

0 comments on commit 385310c

Please sign in to comment.