diff --git a/pkg/utils/etcdutil/etcdutil.go b/pkg/utils/etcdutil/etcdutil.go
index 111a3701367..360371db364 100644
--- a/pkg/utils/etcdutil/etcdutil.go
+++ b/pkg/utils/etcdutil/etcdutil.go
@@ -269,97 +269,129 @@ func CreateEtcdClient(tlsConfig *tls.Config, acURLs []url.URL) (*clientv3.Client
 	failpoint.Inject("closeTick", func() {
 		failpoint.Return(client, err)
 	})
+	initHealthyChecker(tickerInterval, tlsConfig, client)
-	checker := &healthyChecker{
-		tlsConfig: tlsConfig,
+	return client, err
+}
+
+// healthyClient will wrap an etcd client and record its last health time.
+// The etcd client inside will only maintain one connection to the etcd server
+// to make sure each healthyClient can be used to check the health of a certain
+// etcd endpoint without involving the load balancer of the etcd client.
+type healthyClient struct {
+	*clientv3.Client
+	lastHealth time.Time
+}
+
+// healthyChecker is used to check the health of etcd endpoints. Inside the checker,
+// we will maintain a map from each available etcd endpoint to its healthyClient.
+type healthyChecker struct {
+	tickerInterval time.Duration
+	tlsConfig      *tls.Config
+
+	sync.Map // map[string]*healthyClient
+	// client is the etcd client the healthy checker is guarding; it will be set with
+	// the checked healthy endpoints dynamically and periodically.
+	client *clientv3.Client
+}
+
+// initHealthyChecker initializes the healthy checker for the given etcd client.
+func initHealthyChecker(tickerInterval time.Duration, tlsConfig *tls.Config, client *clientv3.Client) {
+	healthyChecker := &healthyChecker{
+		tickerInterval: tickerInterval,
+		tlsConfig:      tlsConfig,
+		client:         client,
 	}
-	eps := syncUrls(client)
-	checker.update(eps)
+	// The healthy checker has the same lifetime as the given etcd client.
+	ctx := client.Ctx()
+	// Sync etcd endpoints and check the last health time of each endpoint periodically.
+	go healthyChecker.syncer(ctx)
+	// Inspect the health of each endpoint by reading the health key periodically.
+	go healthyChecker.inspector(ctx)
+}
 
-	// Create a goroutine to check the health of etcd endpoints periodically.
-	go func(client *clientv3.Client) {
-		defer logutil.LogPanic()
-		ticker := time.NewTicker(tickerInterval)
-		defer ticker.Stop()
-		lastAvailable := time.Now()
-		for {
-			select {
-			case <-client.Ctx().Done():
-				log.Info("etcd client is closed, exit health check goroutine")
-				checker.Range(func(key, value interface{}) bool {
-					client := value.(*healthyClient)
-					client.Close()
-					return true
-				})
-				return
-			case <-ticker.C:
-				usedEps := client.Endpoints()
-				healthyEps := checker.patrol(client.Ctx())
-				if len(healthyEps) == 0 {
-					// when all endpoints are unhealthy, try to reset endpoints to update connect
-					// rather than delete them to avoid there is no any endpoint in client.
-					// Note: reset endpoints will trigger subconn closed, and then trigger reconnect.
-					// otherwise, the subconn will be retrying in grpc layer and use exponential backoff,
-					// and it cannot recover as soon as possible.
-					if time.Since(lastAvailable) > etcdServerDisconnectedTimeout {
-						log.Info("no available endpoint, try to reset endpoints", zap.Strings("last-endpoints", usedEps))
-						client.SetEndpoints([]string{}...)
-						client.SetEndpoints(usedEps...)
-					}
-				} else {
-					if !typeutil.AreStringSlicesEquivalent(healthyEps, usedEps) {
-						client.SetEndpoints(healthyEps...)
-						change := fmt.Sprintf("%d->%d", len(usedEps), len(healthyEps))
-						etcdStateGauge.WithLabelValues("endpoints").Set(float64(len(healthyEps)))
-						log.Info("update endpoints", zap.String("num-change", change),
-							zap.Strings("last-endpoints", usedEps), zap.Strings("endpoints", client.Endpoints()))
-					}
-					lastAvailable = time.Now()
-				}
-			}
+func (checker *healthyChecker) syncer(ctx context.Context) {
+	defer logutil.LogPanic()
+	checker.update()
+	ticker := time.NewTicker(checker.tickerInterval)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			log.Info("etcd client is closed, exit update endpoint goroutine")
+			return
+		case <-ticker.C:
+			checker.update()
 		}
-	}(client)
+	}
+}
 
-	// Notes: use another goroutine to update endpoints to avoid blocking health check in the first goroutine.
-	go func(client *clientv3.Client) {
-		defer logutil.LogPanic()
-		ticker := time.NewTicker(tickerInterval)
-		defer ticker.Stop()
-		for {
-			select {
-			case <-client.Ctx().Done():
-				log.Info("etcd client is closed, exit update endpoint goroutine")
-				return
-			case <-ticker.C:
-				eps := syncUrls(client)
-				checker.update(eps)
+func (checker *healthyChecker) inspector(ctx context.Context) {
+	defer logutil.LogPanic()
+	ticker := time.NewTicker(checker.tickerInterval)
+	defer ticker.Stop()
+	lastAvailable := time.Now()
+	for {
+		select {
+		case <-ctx.Done():
+			log.Info("etcd client is closed, exit health check goroutine")
+			checker.close()
+			return
+		case <-ticker.C:
+			lastEps := checker.client.Endpoints()
+			healthyEps := checker.patrol(ctx)
+			if len(healthyEps) == 0 {
+				// When all endpoints are unhealthy, try to reset the endpoints to refresh the connection
+				// rather than delete them, so that the client is never left without any endpoint.
+				// Note: resetting the endpoints closes the subConns and triggers a reconnect;
+				// otherwise, the subConns keep retrying in the gRPC layer with exponential backoff
+				// and cannot recover as soon as possible.
+				if time.Since(lastAvailable) > etcdServerDisconnectedTimeout {
+					log.Info("no available endpoint, try to reset endpoints",
+						zap.Strings("last-endpoints", lastEps))
+					resetClientEndpoints(checker.client, lastEps...)
+				}
+			} else {
+				if !typeutil.AreStringSlicesEquivalent(healthyEps, lastEps) {
+					checker.client.SetEndpoints(healthyEps...)
+					etcdStateGauge.WithLabelValues("endpoints").Set(float64(len(healthyEps)))
+					log.Info("update endpoints",
+						zap.String("num-change", fmt.Sprintf("%d->%d", len(lastEps), len(healthyEps))),
+						zap.Strings("last-endpoints", lastEps),
+						zap.Strings("endpoints", checker.client.Endpoints()))
+				}
+				lastAvailable = time.Now()
 			}
 		}
-	}(client)
-
-	return client, err
+	}
 }
 
-type healthyClient struct {
-	*clientv3.Client
-	lastHealth time.Time
+func (checker *healthyChecker) close() {
+	checker.Range(func(key, value interface{}) bool {
+		client := value.(*healthyClient)
+		client.Close()
+		return true
+	})
 }
 
-type healthyChecker struct {
-	sync.Map // map[string]*healthyClient
-	tlsConfig *tls.Config
+// Reset the etcd client endpoints to trigger a reconnect.
+func resetClientEndpoints(client *clientv3.Client, endpoints ...string) {
+	client.SetEndpoints()
+	client.SetEndpoints(endpoints...)
 }
 
 func (checker *healthyChecker) patrol(ctx context.Context) []string {
 	// See https://github.com/etcd-io/etcd/blob/85b640cee793e25f3837c47200089d14a8392dc7/etcdctl/ctlv3/command/ep_command.go#L105-L145
-	var wg sync.WaitGroup
 	count := 0
 	checker.Range(func(key, value interface{}) bool {
 		count++
 		return true
 	})
-	hch := make(chan string, count)
-	healthyList := make([]string, 0, count)
+	var (
+		wg          sync.WaitGroup
+		hch         = make(chan string, count)
+		healthyList = make([]string, 0, count)
+	)
 	checker.Range(func(key, value interface{}) bool {
 		wg.Add(1)
 		go func(key, value interface{}) {
@@ -386,8 +418,9 @@ func (checker *healthyChecker) patrol(ctx context.Context) []string {
 	return healthyList
 }
 
-func (checker *healthyChecker) update(eps []string) {
-	epMap := make(map[string]struct{})
+func (checker *healthyChecker) update() {
+	eps := syncUrls(checker.client)
+	epMap := make(map[string]struct{}, len(eps))
 	for _, ep := range eps {
 		epMap[ep] = struct{}{}
 	}
@@ -401,9 +434,7 @@ func (checker *healthyChecker) update(eps []string) {
 			checker.removeClient(ep)
 		}
 		if time.Since(lastHealthy) > etcdServerDisconnectedTimeout {
-			// try to reset client endpoint to trigger reconnect
-			client.(*healthyClient).Client.SetEndpoints([]string{}...)
-			client.(*healthyClient).Client.SetEndpoints(ep)
+			resetClientEndpoints(client.(*healthyClient).Client, ep)
 		}
 		continue
 	}
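
Below is a minimal, self-contained sketch of the endpoint-reset trick that this patch factors into resetClientEndpoints: clearing the endpoint list tears down the existing subConns, and setting the list again forces gRPC to dial from scratch instead of waiting out its exponential backoff. The package main wrapper, the placeholder endpoint URL, and the import path are illustrative assumptions, not part of the patch; adjust them to the etcd client version the project vendors.

package main

import (
	"log"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3" // assumed import path; pd may vendor a different etcd client version
)

func main() {
	// Placeholder endpoint for illustration only.
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"http://127.0.0.1:2379"},
		DialTimeout: 3 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// Mirror resetClientEndpoints: drop all endpoints to close the subConns,
	// then restore them to trigger an immediate reconnect rather than waiting
	// on the gRPC layer's exponential backoff.
	eps := cli.Endpoints()
	cli.SetEndpoints()
	cli.SetEndpoints(eps...)
}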