Skip to content

Commit

Permalink
etcdutil, leadership: make more high availability
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed Jun 21, 2023
1 parent ac31f87 commit 2ef12f1
Show file tree
Hide file tree
Showing 6 changed files with 313 additions and 100 deletions.
54 changes: 37 additions & 17 deletions pkg/election/leadership.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/pdpb"
Expand Down Expand Up @@ -182,26 +183,49 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
if ls == nil {
return
}
lastHealthyTime := time.Now()
watcher := clientv3.NewWatcher(ls.client)
defer watcher.Close()
ctx, cancel := context.WithCancel(serverCtx)
defer cancel()
// The revision is the revision of last modification on this key.
// If the revision is compacted, will meet required revision has been compacted error.
// In this case, use the compact revision to re-watch the key.
for {
failpoint.Inject("delayWatcher", nil)
rch := watcher.Watch(ctx, ls.leaderKey, clientv3.WithRev(revision))
for wresp := range rch {
// In order to prevent a watch stream being stuck in a partitioned node,
// make sure to wrap context with "WithRequireLeader".
watchChanCtx, watchChanCancel := context.WithCancel(clientv3.WithRequireLeader(serverCtx))
defer watchChanCancel()

// when etcd is not available, the watcher.Watch will block.
// so we check the etcd availability first.
if _, err := etcdutil.EtcdKVGet(ls.client, ls.leaderKey); err != nil {
// If the watcher is unhealthy for more than 60 seconds, we should exit.
if time.Since(lastHealthyTime) > 60*time.Second {
log.Error("leadership watcher is unhealthy",
zap.Int64("revision", revision),
zap.String("leader-key", ls.leaderKey),
zap.String("purpose", ls.purpose))
return
}
// If the watcher is unhealthy, we should cancel the watchChan and retry.
// Because the etcdutil.EtcdKVGet has a timeout, we don't need to sleep here.
watchChanCancel()
continue
}
lastHealthyTime = time.Now()
watchChan := watcher.Watch(watchChanCtx, ls.leaderKey, clientv3.WithRev(revision))

select {
case <-serverCtx.Done():
// server closed, return
return
case wresp := <-watchChan:
// meet compacted error, use the compact revision.
if wresp.CompactRevision != 0 {
log.Warn("required revision has been compacted, use the compact revision",
zap.Int64("required-revision", revision),
zap.Int64("compact-revision", wresp.CompactRevision))
revision = wresp.CompactRevision
break
}
if wresp.Canceled {
watchChanCancel()
continue
} else if wresp.Err() != nil { // wresp.Err() contains CompactRevision not equal to 0
log.Error("leadership watcher is canceled with",
zap.Int64("revision", revision),
zap.String("leader-key", ls.leaderKey),
Expand All @@ -213,19 +237,15 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
for _, ev := range wresp.Events {
if ev.Type == mvccpb.DELETE {
log.Info("current leadership is deleted",
zap.Int64("revision", wresp.Header.Revision),
zap.String("leader-key", ls.leaderKey),
zap.String("purpose", ls.purpose))
return
}
}
revision = wresp.Header.Revision + 1
}

select {
case <-ctx.Done():
// server closed, return
return
default:
}
watchChanCancel()
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/resourcemanager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func (s *Server) initClient() error {
if err != nil {
return err
}
s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, []url.URL(u)[0])
s.etcdClient, s.httpClient, err = etcdutil.CreateClients(s.ctx, &s.serverLoopWg, tlsConfig, []url.URL(u))
return err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ func (s *Server) initClient() error {
if err != nil {
return err
}
s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, s.backendUrls[0])
s.etcdClient, s.httpClient, err = etcdutil.CreateClients(s.ctx, &s.serverLoopWg, tlsConfig, s.backendUrls)
return err
}

Expand Down
Loading

0 comments on commit 2ef12f1

Please sign in to comment.