etcdutil: refine the etcd client healthy checker code (tikv#7727)
ref tikv#7499

Refine the etcd client healthy checker code.

Signed-off-by: JmPotato <[email protected]>
JmPotato committed Jan 17, 2024
1 parent aa9c83c commit 8f4f81f
Showing 1 changed file with 107 additions and 76 deletions.
183 changes: 107 additions & 76 deletions pkg/utils/etcdutil/etcdutil.go
@@ -269,97 +269,129 @@ func CreateEtcdClient(tlsConfig *tls.Config, acURLs []url.URL) (*clientv3.Client
failpoint.Inject("closeTick", func() {
failpoint.Return(client, err)
})
initHealthyChecker(tickerInterval, tlsConfig, client)

checker := &healthyChecker{
tlsConfig: tlsConfig,
return client, err
}

// healthyClient wraps an etcd client and records its last health time.
// The etcd client inside only maintains one connection to the etcd server,
// so each healthyClient can be used to check the health of a single
// etcd endpoint without involving the etcd client's load balancer.
type healthyClient struct {
*clientv3.Client
lastHealth time.Time
}
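
The comment above is the key design point: each healthyClient talks to exactly one endpoint. The helper that builds such a client is outside the visible hunks; a minimal sketch of how it could look (the function name, dial timeout, and error handling are illustrative assumptions, not the repository's actual code):

// newSingleEndpointClient is a hypothetical helper: it dials a single endpoint so that
// health probes through the returned client always hit that endpoint, bypassing the
// load balancer. The 3s dial timeout is an assumed value for illustration.
func newSingleEndpointClient(ep string, tlsConfig *tls.Config) (*healthyClient, error) {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{ep}, // exactly one endpoint, so no balancing happens
		TLS:         tlsConfig,
		DialTimeout: 3 * time.Second,
	})
	if err != nil {
		return nil, err
	}
	return &healthyClient{Client: cli, lastHealth: time.Now()}, nil
}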

// healthyChecker is used to check the health of etcd endpoints. Inside the checker,
// we maintain a map from each available etcd endpoint to its healthyClient.
type healthyChecker struct {
tickerInterval time.Duration
tlsConfig *tls.Config

sync.Map // map[string]*healthyClient
// client is the etcd client the healthy checker is guarding; its endpoints will be
// updated with the checked healthy endpoints dynamically and periodically.
client *clientv3.Client
}
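
update() below calls removeClient, whose implementation is collapsed out of this diff. As a hedged sketch of how the embedded sync.Map is typically used here (the body and logging are assumptions based on the calls visible in this hunk):

// removeClient is a sketch of the helper implied by the checker.removeClient(ep) call in
// update(); the sync.Map is keyed by endpoint address and holds *healthyClient values.
func (checker *healthyChecker) removeClient(ep string) {
	if cli, ok := checker.LoadAndDelete(ep); ok {
		// Close the wrapped etcd client to release its connection to this endpoint.
		if err := cli.(*healthyClient).Close(); err != nil {
			log.Error("failed to close the etcd healthy client", zap.Error(err))
		}
	}
}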

// initHealthyChecker initializes the healthy checker for the etcd client.
func initHealthyChecker(tickerInterval time.Duration, tlsConfig *tls.Config, client *clientv3.Client) {
healthyChecker := &healthyChecker{
tickerInterval: tickerInterval,
tlsConfig: tlsConfig,
client: client,
}
eps := syncUrls(client)
checker.update(eps)
// The healthy checker has the same lifetime as the given etcd client.
ctx := client.Ctx()
// Sync etcd endpoints and check the last health time of each endpoint periodically.
go healthyChecker.syncer(ctx)
// Inspect the health of each endpoint by reading the health key periodically.
go healthyChecker.inspector(ctx)
}

// Create a goroutine to check the health of etcd endpoints periodically.
go func(client *clientv3.Client) {
defer logutil.LogPanic()
ticker := time.NewTicker(tickerInterval)
defer ticker.Stop()
lastAvailable := time.Now()
for {
select {
case <-client.Ctx().Done():
log.Info("etcd client is closed, exit health check goroutine")
checker.Range(func(key, value interface{}) bool {
client := value.(*healthyClient)
client.Close()
return true
})
return
case <-ticker.C:
usedEps := client.Endpoints()
healthyEps := checker.patrol(client.Ctx())
if len(healthyEps) == 0 {
// When all endpoints are unhealthy, try to reset the endpoints to refresh the connections
// rather than deleting them, so the client is never left without any endpoint.
// Note: resetting the endpoints closes the subconns and then triggers a reconnect;
// otherwise, the subconns keep retrying in the gRPC layer with exponential backoff
// and cannot recover as soon as possible.
if time.Since(lastAvailable) > etcdServerDisconnectedTimeout {
log.Info("no available endpoint, try to reset endpoints", zap.Strings("last-endpoints", usedEps))
client.SetEndpoints([]string{}...)
client.SetEndpoints(usedEps...)
}
} else {
if !typeutil.AreStringSlicesEquivalent(healthyEps, usedEps) {
client.SetEndpoints(healthyEps...)
change := fmt.Sprintf("%d->%d", len(usedEps), len(healthyEps))
etcdStateGauge.WithLabelValues("endpoints").Set(float64(len(healthyEps)))
log.Info("update endpoints", zap.String("num-change", change),
zap.Strings("last-endpoints", usedEps), zap.Strings("endpoints", client.Endpoints()))
}
lastAvailable = time.Now()
}
}
func (checker *healthyChecker) syncer(ctx context.Context) {
defer logutil.LogPanic()
checker.update()
ticker := time.NewTicker(checker.tickerInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
log.Info("etcd client is closed, exit update endpoint goroutine")
return
case <-ticker.C:
checker.update()
}
}(client)
}
}

// Notes: use another goroutine to update endpoints to avoid blocking health check in the first goroutine.
go func(client *clientv3.Client) {
defer logutil.LogPanic()
ticker := time.NewTicker(tickerInterval)
defer ticker.Stop()
for {
select {
case <-client.Ctx().Done():
log.Info("etcd client is closed, exit update endpoint goroutine")
return
case <-ticker.C:
eps := syncUrls(client)
checker.update(eps)
func (checker *healthyChecker) inspector(ctx context.Context) {
defer logutil.LogPanic()
ticker := time.NewTicker(checker.tickerInterval)
defer ticker.Stop()
lastAvailable := time.Now()
for {
select {
case <-ctx.Done():
log.Info("etcd client is closed, exit health check goroutine")
checker.close()
return
case <-ticker.C:
lastEps := checker.client.Endpoints()
healthyEps := checker.patrol(ctx)
if len(healthyEps) == 0 {
// When all endpoints are unhealthy, try to reset the endpoints to refresh the connections
// rather than deleting them, so the client is never left without any endpoint.
// Note: resetting the endpoints closes the subconns and then triggers a reconnect;
// otherwise, the subconns keep retrying in the gRPC layer with exponential backoff
// and cannot recover as soon as possible.
if time.Since(lastAvailable) > etcdServerDisconnectedTimeout {
log.Info("no available endpoint, try to reset endpoints",
zap.Strings("last-endpoints", lastEps))
resetClientEndpoints(checker.client, lastEps...)
}
} else {
if !typeutil.AreStringSlicesEquivalent(healthyEps, lastEps) {
checker.client.SetEndpoints(healthyEps...)
etcdStateGauge.WithLabelValues("endpoints").Set(float64(len(healthyEps)))
log.Info("update endpoints",
zap.String("num-change", fmt.Sprintf("%d->%d", len(lastEps), len(healthyEps))),
zap.Strings("last-endpoints", lastEps),
zap.Strings("endpoints", checker.client.Endpoints()))
}
lastAvailable = time.Now()
}
}
}(client)

return client, err
}
}

type healthyClient struct {
*clientv3.Client
lastHealth time.Time
func (checker *healthyChecker) close() {
checker.Range(func(key, value interface{}) bool {
client := value.(*healthyClient)
client.Close()
return true
})
}

type healthyChecker struct {
sync.Map // map[string]*healthyClient
tlsConfig *tls.Config
// resetClientEndpoints resets the etcd client's endpoints to trigger a reconnect.
func resetClientEndpoints(client *clientv3.Client, endpoints ...string) {
client.SetEndpoints()
client.SetEndpoints(endpoints...)
}

func (checker *healthyChecker) patrol(ctx context.Context) []string {
// See https://github.com/etcd-io/etcd/blob/85b640cee793e25f3837c47200089d14a8392dc7/etcdctl/ctlv3/command/ep_command.go#L105-L145
var wg sync.WaitGroup
count := 0
checker.Range(func(key, value interface{}) bool {
count++
return true
})
hch := make(chan string, count)
healthyList := make([]string, 0, count)
var (
wg sync.WaitGroup
hch = make(chan string, count)
healthyList = make([]string, 0, count)
)
checker.Range(func(key, value interface{}) bool {
wg.Add(1)
go func(key, value interface{}) {
@@ -386,8 +418,9 @@ func (checker *healthyChecker) patrol(ctx context.Context) []string {
return healthyList
}
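
The per-endpoint probe that patrol fans out to each healthyClient is collapsed out of the visible hunks. Based on the etcdctl endpoint health check linked in the comment above, it is roughly equivalent to the following sketch (the function name and timeout parameter are assumptions; rpctypes is go.etcd.io/etcd/api/v3/v3rpc/rpctypes):

// probeEndpoint reports whether a single endpoint answers a linearizable read in time.
// Fetching any key works; a permission-denied error still counts as healthy because the
// proposal had to go through consensus to be rejected.
func probeEndpoint(ctx context.Context, cli *clientv3.Client, timeout time.Duration) bool {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	_, err := cli.Get(ctx, "health")
	return err == nil || err == rpctypes.ErrPermissionDenied
}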

func (checker *healthyChecker) update(eps []string) {
epMap := make(map[string]struct{})
func (checker *healthyChecker) update() {
eps := syncUrls(checker.client)
epMap := make(map[string]struct{}, len(eps))
for _, ep := range eps {
epMap[ep] = struct{}{}
}
@@ -401,9 +434,7 @@ func (checker *healthyChecker) update(eps []string) {
checker.removeClient(ep)
}
if time.Since(lastHealthy) > etcdServerDisconnectedTimeout {
// try to reset client endpoint to trigger reconnect
client.(*healthyClient).Client.SetEndpoints([]string{}...)
client.(*healthyClient).Client.SetEndpoints(ep)
resetClientEndpoints(client.(*healthyClient).Client, ep)
}
continue
}
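
From a caller's point of view the refactor is transparent: CreateEtcdClient still returns a ready-to-use client, and the health checking runs in the background until the client is closed. A minimal usage sketch (the URL and key are placeholders):

func exampleUsage() error {
	u, err := url.Parse("http://127.0.0.1:2379")
	if err != nil {
		return err
	}
	// Passing a nil *tls.Config assumes an insecure setup; real callers supply their own.
	client, err := CreateEtcdClient(nil, []url.URL{*u})
	if err != nil {
		return err
	}
	defer client.Close()
	// The health checker goroutines stop automatically once the client is closed,
	// because they watch client.Ctx().
	_, err = client.Get(client.Ctx(), "foo")
	return err
}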
