From 2ef12f1c3a7e227bb40628e64bf5e73caefb2fbe Mon Sep 17 00:00:00 2001
From: lhy1024
Date: Wed, 21 Jun 2023 20:27:30 +0800
Subject: [PATCH] etcdutil, leadership: improve high availability

Signed-off-by: lhy1024
---
 pkg/election/leadership.go               |  54 +++--
 pkg/mcs/resourcemanager/server/server.go |   2 +-
 pkg/mcs/tso/server/server.go             |   2 +-
 pkg/utils/etcdutil/etcdutil.go           | 285 ++++++++++++++++++-----
 pkg/utils/etcdutil/etcdutil_test.go      |  50 ++--
 server/server.go                         |  20 +-
 6 files changed, 313 insertions(+), 100 deletions(-)

diff --git a/pkg/election/leadership.go b/pkg/election/leadership.go
index e10650704e39..499eb117b1f6 100644
--- a/pkg/election/leadership.go
+++ b/pkg/election/leadership.go
@@ -18,6 +18,7 @@ import (
 	"context"
 	"sync"
 	"sync/atomic"
+	"time"
 
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/pdpb"
@@ -182,26 +183,49 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
 	if ls == nil {
 		return
 	}
+	lastHealthyTime := time.Now()
 	watcher := clientv3.NewWatcher(ls.client)
 	defer watcher.Close()
-	ctx, cancel := context.WithCancel(serverCtx)
-	defer cancel()
-	// The revision is the revision of last modification on this key.
-	// If the revision is compacted, will meet required revision has been compacted error.
-	// In this case, use the compact revision to re-watch the key.
 	for {
 		failpoint.Inject("delayWatcher", nil)
-		rch := watcher.Watch(ctx, ls.leaderKey, clientv3.WithRev(revision))
-		for wresp := range rch {
+		// To prevent the watch stream from getting stuck on a partitioned node,
+		// make sure to wrap the context with "WithRequireLeader".
+		// Note: watchChanCancel is called explicitly on every exit path instead of
+		// being deferred inside the loop, since deferred calls would pile up until
+		// Watch returns.
+		watchChanCtx, watchChanCancel := context.WithCancel(clientv3.WithRequireLeader(serverCtx))
+
+		// When etcd is unavailable, watcher.Watch blocks, so check etcd
+		// availability first.
+		if _, err := etcdutil.EtcdKVGet(ls.client, ls.leaderKey); err != nil {
+			// If the watcher has been unhealthy for more than 60 seconds, exit.
+			if time.Since(lastHealthyTime) > 60*time.Second {
+				log.Error("leadership watcher is unhealthy",
+					zap.Int64("revision", revision),
+					zap.String("leader-key", ls.leaderKey),
+					zap.String("purpose", ls.purpose))
+				watchChanCancel()
+				return
+			}
+			// The watcher is unhealthy: cancel the watch channel and retry.
+			// etcdutil.EtcdKVGet has its own timeout, so there is no need to sleep here.
+			watchChanCancel()
+			continue
+		}
+		lastHealthyTime = time.Now()
+		watchChan := watcher.Watch(watchChanCtx, ls.leaderKey, clientv3.WithRev(revision))
+
+		select {
+		case <-serverCtx.Done():
+			// server closed, return
+			watchChanCancel()
+			return
+		case wresp := <-watchChan:
+			// If we meet a compacted error, use the compact revision.
 			if wresp.CompactRevision != 0 {
 				log.Warn("required revision has been compacted, use the compact revision",
 					zap.Int64("required-revision", revision),
 					zap.Int64("compact-revision", wresp.CompactRevision))
 				revision = wresp.CompactRevision
-				break
-			}
-			if wresp.Canceled {
+				watchChanCancel()
+				continue
+			} else if wresp.Err() != nil { // wresp.Err() is also non-nil on compaction, so check CompactRevision first
+				watchChanCancel()
 				log.Error("leadership watcher is canceled with",
 					zap.Int64("revision", revision),
 					zap.String("leader-key", ls.leaderKey),
@@ -213,19 +237,15 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
 			for _, ev := range wresp.Events {
 				if ev.Type == mvccpb.DELETE {
 					log.Info("current leadership is deleted",
+						zap.Int64("revision", wresp.Header.Revision),
 						zap.String("leader-key", ls.leaderKey),
 						zap.String("purpose", ls.purpose))
+					watchChanCancel()
 					return
 				}
 			}
+			revision = wresp.Header.Revision + 1
 		}
-
-		select {
-		case <-ctx.Done():
-			// server closed, return
-			return
-		default:
-		}
+		watchChanCancel()
 	}
 }
 
diff --git a/pkg/mcs/resourcemanager/server/server.go b/pkg/mcs/resourcemanager/server/server.go
index 6705c4b1da9e..59b32aaf506a 100644
--- a/pkg/mcs/resourcemanager/server/server.go
+++ b/pkg/mcs/resourcemanager/server/server.go
@@ -264,7 +264,7 @@ func (s *Server) initClient() error {
 	if err != nil {
 		return err
 	}
-	s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, []url.URL(u)[0])
+	s.etcdClient, s.httpClient, err = etcdutil.CreateClients(s.ctx, &s.serverLoopWg, tlsConfig, []url.URL(u))
 	return err
 }
 
diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go
index 1b4b5c363746..9f8aa3f7ff12 100644
--- a/pkg/mcs/tso/server/server.go
+++ b/pkg/mcs/tso/server/server.go
@@ -386,7 +386,7 @@ func (s *Server) initClient() error {
 	if err != nil {
 		return err
 	}
-	s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, s.backendUrls[0])
+	s.etcdClient, s.httpClient, err = etcdutil.CreateClients(s.ctx, &s.serverLoopWg, tlsConfig, s.backendUrls)
 	return err
 }
 
diff --git a/pkg/utils/etcdutil/etcdutil.go b/pkg/utils/etcdutil/etcdutil.go
index 4b1f1d7fa151..6ea809ef70f9 100644
--- a/pkg/utils/etcdutil/etcdutil.go
+++ b/pkg/utils/etcdutil/etcdutil.go
@@ -17,9 +17,11 @@ package etcdutil
 import (
 	"context"
 	"crypto/tls"
+	"fmt"
 	"math/rand"
 	"net/http"
 	"net/url"
+	"sort"
 	"sync"
 	"time"
 
@@ -27,11 +29,13 @@ import (
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/log"
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/tikv/pd/pkg/errs"
 	"github.com/tikv/pd/pkg/utils/logutil"
 	"github.com/tikv/pd/pkg/utils/typeutil"
 	"go.etcd.io/etcd/clientv3"
 	"go.etcd.io/etcd/etcdserver"
+	"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
 	"go.etcd.io/etcd/mvcc/mvccpb"
 	"go.etcd.io/etcd/pkg/types"
 	"go.uber.org/zap"
@@ -41,9 +45,6 @@ const (
 	// defaultEtcdClientTimeout is the default timeout for etcd client.
 	defaultEtcdClientTimeout = 3 * time.Second
 
-	// defaultAutoSyncInterval is the interval to sync etcd cluster.
-	defaultAutoSyncInterval = 60 * time.Second
-
 	// defaultDialKeepAliveTime is the time after which client pings the server to see if transport is alive.
 	defaultDialKeepAliveTime = 10 * time.Second
 
@@ -196,72 +197,252 @@ func EtcdKVPutWithTTL(ctx context.Context, c *clientv3.Client, key string, value
 	return kv.Put(ctx, key, value, clientv3.WithLease(grantResp.ID))
 }
 
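For reference, the re-watch pattern that the leadership.go hunks above introduce can be distilled into a minimal standalone sketch. The endpoint and key below are placeholders, not part of this patch, and it assumes a reachable local etcd; it uses the same clientv3 v3.4 APIs the repository imports:

package main

import (
	"context"
	"fmt"
	"time"

	"go.etcd.io/etcd/clientv3"
	"go.etcd.io/etcd/mvcc/mvccpb"
)

// watchKey watches key starting at rev, re-creating the watch stream for every
// response so a stuck stream can never block progress for long.
func watchKey(ctx context.Context, client *clientv3.Client, key string, rev int64) {
	watcher := clientv3.NewWatcher(client)
	defer watcher.Close()
	for {
		// WithRequireLeader makes a partitioned member fail the stream
		// instead of silently serving stale data.
		wctx, cancel := context.WithCancel(clientv3.WithRequireLeader(ctx))
		wch := watcher.Watch(wctx, key, clientv3.WithRev(rev))
		select {
		case <-ctx.Done():
			cancel()
			return
		case wresp := <-wch:
			if wresp.CompactRevision != 0 {
				rev = wresp.CompactRevision // resume from the compact revision
			} else if wresp.Err() != nil {
				cancel()
				return
			} else {
				for _, ev := range wresp.Events {
					if ev.Type == mvccpb.DELETE {
						fmt.Println("key deleted")
						cancel()
						return
					}
				}
				rev = wresp.Header.Revision + 1
			}
		}
		cancel()
	}
}

func main() {
	client, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"}, // assumed local etcd
		DialTimeout: 3 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer client.Close()
	watchKey(context.Background(), client, "/my/leader/key", 1)
}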
-// CreateClients creates etcd v3 client and http client.
-func CreateClients(tlsConfig *tls.Config, acUrls url.URL) (*clientv3.Client, *http.Client, error) {
-	client, err := CreateEtcdClient(tlsConfig, acUrls)
-	if err != nil {
-		return nil, nil, errs.ErrNewEtcdClient.Wrap(err).GenWithStackByCause()
-	}
-	httpClient := createHTTPClient(tlsConfig)
-	return client, httpClient, nil
-}
+var etcdStateGauge = prometheus.NewGaugeVec(
+	prometheus.GaugeOpts{
+		Namespace: "pd",
+		Subsystem: "server",
+		Name:      "etcd_client",
+		Help:      "Etcd raft states.",
+	}, []string{"type"})
+
+func init() {
+	prometheus.MustRegister(etcdStateGauge)
+}
 
-// createEtcdClientWithMultiEndpoint creates etcd v3 client.
-// Note: it will be used by micro service server and support multi etcd endpoints.
-// FIXME: But it cannot switch etcd endpoints as soon as possible when one of endpoints is with io hang.
-func createEtcdClientWithMultiEndpoint(tlsConfig *tls.Config, acUrls []url.URL) (*clientv3.Client, error) {
-	if len(acUrls) == 0 {
-		return nil, errs.ErrNewEtcdClient.FastGenByArgs("no available etcd address")
-	}
-	endpoints := make([]string, 0, len(acUrls))
-	for _, u := range acUrls {
-		endpoints = append(endpoints, u.String())
-	}
-	lgc := zap.NewProductionConfig()
-	lgc.Encoding = log.ZapEncodingName
-	autoSyncInterval := defaultAutoSyncInterval
-	dialKeepAliveTime := defaultDialKeepAliveTime
-	dialKeepAliveTimeout := defaultDialKeepAliveTimeout
-	failpoint.Inject("autoSyncInterval", func() {
-		autoSyncInterval = 10 * time.Millisecond
-	})
-	failpoint.Inject("closeKeepAliveCheck", func() {
-		autoSyncInterval = 0
-		dialKeepAliveTime = 0
-		dialKeepAliveTimeout = 0
-	})
-	client, err := clientv3.New(clientv3.Config{
-		Endpoints:            endpoints,
-		DialTimeout:          defaultEtcdClientTimeout,
-		AutoSyncInterval:     autoSyncInterval,
-		TLS:                  tlsConfig,
-		LogConfig:            &lgc,
-		DialKeepAliveTime:    dialKeepAliveTime,
-		DialKeepAliveTimeout: dialKeepAliveTimeout,
-	})
-	if err == nil {
-		log.Info("create etcd v3 client", zap.Strings("endpoints", endpoints))
-	}
-	return client, err
-}
+func newClient(tlsConfig *tls.Config, acURL ...string) (*clientv3.Client, error) {
+	lgc := zap.NewProductionConfig()
+	lgc.Encoding = log.ZapEncodingName
+	client, err := clientv3.New(clientv3.Config{
+		Endpoints:            acURL,
+		DialTimeout:          defaultEtcdClientTimeout,
+		TLS:                  tlsConfig,
+		LogConfig:            &lgc,
+		DialKeepAliveTime:    defaultDialKeepAliveTime,
+		DialKeepAliveTimeout: defaultDialKeepAliveTimeout,
+	})
+	return client, err
+}
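The function added next, CreateEtcdClient, starts two background goroutines tied to the caller's context and WaitGroup. A minimal sketch of that lifecycle contract, using only the standard library (the helper name startBackground is illustrative, not part of this patch):

package main

import (
	"context"
	"sync"
	"time"
)

// startBackground launches a ticker-driven loop that exits when ctx is done
// and reports completion through wg, mirroring the shape of the goroutines below.
func startBackground(ctx context.Context, wg *sync.WaitGroup, interval time.Duration, tick func()) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				tick()
			}
		}
	}()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	startBackground(ctx, &wg, 100*time.Millisecond, func() { /* health check here */ })
	time.Sleep(300 * time.Millisecond)
	cancel()  // server shutdown path
	wg.Wait() // guarantees no goroutine outlives the server
}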
+
+// CreateEtcdClient creates an etcd v3 client and starts background goroutines
+// that keep its endpoint list in sync with the healthy members of the cluster.
+func CreateEtcdClient(ctx context.Context, wg *sync.WaitGroup, tlsConfig *tls.Config, acURLs []url.URL) (*clientv3.Client, error) {
+	urls := make([]string, 0, len(acURLs))
+	for _, u := range acURLs {
+		urls = append(urls, u.String())
+	}
+	client, err := newClient(tlsConfig, urls...)
+	if err != nil {
+		return nil, err
+	}
+	checker := &healthyChecker{
+		tlsConfig: tlsConfig,
+	}
+	eps := syncUrls(ctx, client)
+	checker.update(eps)
+	tickerInterval := defaultDialKeepAliveTime
+	failpoint.Inject("fastTick", func() {
+		tickerInterval = 100 * time.Millisecond
+	})
+	failpoint.Inject("closeTick", func() {
+		tickerInterval = 0
+	})
+	if tickerInterval == 0 {
+		return client, err
+	}
+
+	wg.Add(2)
+	go func(ctx context.Context, client *clientv3.Client) {
+		defer logutil.LogPanic()
+		defer wg.Done()
+		ticker := time.NewTicker(tickerInterval)
+		defer ticker.Stop()
+		lastReset := time.Now()
+		lastAvailable := time.Now()
+		for {
+			select {
+			case <-client.Ctx().Done():
+				return
+			case <-ctx.Done():
+				return
+			case <-ticker.C:
+				usedEps := client.Endpoints()
+				healthyEps := checker.patrol(ctx)
+				if len(healthyEps) == 0 {
+					// When all endpoints are unhealthy, reset the endpoints rather than
+					// delete them, so the client is never left with no endpoint at all.
+					// Reset only after 60s without a healthy endpoint, at most once every 10s.
+					if time.Since(lastAvailable) > 60*time.Second && time.Since(lastReset) > 10*time.Second {
+						log.Info("[etcd client] no available endpoint, try to reset endpoints", zap.Strings("last-endpoints", usedEps))
+						client.SetEndpoints(healthyEps...)
+						client.SetEndpoints(usedEps...)
+						lastReset = time.Now()
+					}
+				} else {
+					if !isEqual(healthyEps, usedEps) {
+						client.SetEndpoints(healthyEps...)
+						change := fmt.Sprintf("%d->%d", len(usedEps), len(healthyEps))
+						etcdStateGauge.WithLabelValues("ep").Set(float64(len(healthyEps)))
+						log.Info("[etcd client] update endpoints", zap.String("num-change", change),
+							zap.Strings("last-endpoints", usedEps), zap.Strings("endpoints", client.Endpoints()))
+					}
+					lastAvailable = time.Now()
+				}
+			}
+		}
+	}(ctx, client)
+
+	// Note: a second goroutine refreshes the member list, so that a slow
+	// MemberList call cannot block the health check in the first goroutine.
+	go func(ctx context.Context, client *clientv3.Client) {
+		defer logutil.LogPanic()
+		defer wg.Done()
+		ticker := time.NewTicker(tickerInterval)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-client.Ctx().Done():
+				return
+			case <-ctx.Done():
+				return
+			case <-ticker.C:
+				eps := syncUrls(ctx, client)
+				checker.update(eps)
+			}
+		}
+	}(ctx, client)
+
+	return client, err
+}
 
-// CreateEtcdClient creates etcd v3 client.
-// Note: it will be used by legacy pd-server, and only connect to leader only.
-func CreateEtcdClient(tlsConfig *tls.Config, acURL url.URL) (*clientv3.Client, error) {
-	lgc := zap.NewProductionConfig()
-	lgc.Encoding = log.ZapEncodingName
-	client, err := clientv3.New(clientv3.Config{
-		Endpoints:   []string{acURL.String()},
-		DialTimeout: defaultEtcdClientTimeout,
-		TLS:         tlsConfig,
-		LogConfig:   &lgc,
-	})
-	if err == nil {
-		log.Info("create etcd v3 client", zap.String("endpoints", acURL.String()))
-	}
-	return client, err
-}
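The healthyChecker added next probes each endpoint through its own single-endpoint client, the same check etcdctl's "endpoint health" performs. A hedged standalone sketch of one such probe, assuming a reachable local etcd (the function name probeEndpoint and the timeouts are illustrative):

package main

import (
	"context"
	"fmt"
	"time"

	"go.etcd.io/etcd/clientv3"
	"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
)

// probeEndpoint dials a single endpoint with a dedicated client and reads the
// "health" key; a dedicated client ensures one hung member cannot hide behind
// the shared client's endpoint rotation.
func probeEndpoint(ctx context.Context, ep string) bool {
	client, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{ep},
		DialTimeout: 3 * time.Second,
	})
	if err != nil {
		return false
	}
	defer client.Close()
	ctx, cancel := context.WithTimeout(clientv3.WithRequireLeader(ctx), 10*time.Second)
	defer cancel()
	_, err = client.Get(ctx, "health")
	// Permission denied still proves the member serves proposals through consensus.
	return err == nil || err == rpctypes.ErrPermissionDenied
}

func main() {
	fmt.Println(probeEndpoint(context.Background(), "http://127.0.0.1:2379")) // assumed local etcd
}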
+// offlineTimeout is how long an endpoint may stay unhealthy before the
+// healthy checker removes it from the candidate set.
+const offlineTimeout = 120 * time.Minute
+
+// disconnectedTimeout is how long an endpoint may stay unhealthy before the
+// healthy checker recreates its probe client to force a reconnect.
+const disconnectedTimeout = 1 * time.Minute
+
+type healthyClient struct {
+	*clientv3.Client
+	lastHealth time.Time
+}
+
+type healthyChecker struct {
+	sync.Map // map[string]*healthyClient
+	tlsConfig *tls.Config
+}
+
+func (checker *healthyChecker) patrol(ctx context.Context) []string {
+	// See https://github.com/etcd-io/etcd/blob/85b640cee793e25f3837c47200089d14a8392dc7/etcdctl/ctlv3/command/ep_command.go#L105
+	var wg sync.WaitGroup
+	count := 0
+	checker.Range(func(key, value interface{}) bool {
+		count++
+		return true
+	})
+	hch := make(chan string, count)
+	healthyList := make([]string, 0, count)
+	checker.Range(func(key, value interface{}) bool {
+		wg.Add(1)
+		go func(key, value interface{}) {
+			defer wg.Done()
+			defer logutil.LogPanic()
+			ctx, cancel := context.WithTimeout(clientv3.WithRequireLeader(ctx), DefaultRequestTimeout)
+			defer cancel()
+			ep := key.(string)
+			client := value.(*healthyClient)
+			_, err := client.Get(ctx, "health")
+			// Permission denied is OK, since the proposal still goes through consensus to get it.
+			if err == nil || err == rpctypes.ErrPermissionDenied {
+				hch <- ep
+				checker.Store(ep, &healthyClient{
+					Client:     client.Client,
+					lastHealth: time.Now(),
+				})
+				return
+			}
+		}(key, value)
+		return true
+	})
+	wg.Wait()
+	close(hch)
+	for h := range hch {
+		healthyList = append(healthyList, h)
+	}
+	return healthyList
+}
+
+func (checker *healthyChecker) update(eps []string) {
+	for _, ep := range eps {
+		// If a probe client already exists for this endpoint, check how long it
+		// has been unhealthy; otherwise create one.
+		if client, ok := checker.Load(ep); ok {
+			lastHealthy := client.(*healthyClient).lastHealth
+			if time.Since(lastHealthy) > offlineTimeout {
+				log.Info("[etcd client] some endpoint may be offline", zap.String("endpoint", ep))
+				checker.Delete(ep)
+			}
+			if time.Since(lastHealthy) > disconnectedTimeout {
+				// Recreate the probe client to force a reconnect.
+				checker.addClient(ep, lastHealthy)
+			}
+			continue
+		}
+		checker.addClient(ep, time.Now())
+	}
+}
+
+func (checker *healthyChecker) addClient(ep string, lastHealth time.Time) {
+	client, err := newClient(checker.tlsConfig, ep)
+	if err != nil {
+		log.Error("[etcd client] failed to create etcd healthy client", zap.Error(err))
+		return
+	}
+	checker.Store(ep, &healthyClient{
+		Client:     client,
+		lastHealth: lastHealth,
+	})
+}
+
+func syncUrls(ctx context.Context, client *clientv3.Client) []string {
+	// See https://github.com/etcd-io/etcd/blob/85b640cee793e25f3837c47200089d14a8392dc7/clientv3/client.go#L170
+	ctx, cancel := context.WithTimeout(clientv3.WithRequireLeader(ctx), DefaultRequestTimeout)
+	defer cancel()
+	now := time.Now()
+	mresp, err := client.MemberList(ctx)
+	if err != nil {
+		log.Error("[etcd client] failed to list members", errs.ZapError(err))
+		return []string{}
+	}
+	if time.Since(now) > defaultEtcdClientTimeout {
+		log.Warn("[etcd client] syncing etcd members is slow", zap.Duration("cost", time.Since(now)), zap.Int("members", len(mresp.Members)))
+	}
+	var eps []string
+	for _, m := range mresp.Members {
+		if len(m.Name) != 0 && !m.IsLearner {
+			eps = append(eps, m.ClientURLs...)
+		}
+	}
+	return eps
+}
+
+// isEqual reports whether a and b contain the same endpoints; it sorts copies
+// so the callers' slices are left untouched.
+func isEqual(a, b []string) bool {
+	if len(a) != len(b) {
+		return false
+	}
+	a = append([]string(nil), a...)
+	b = append([]string(nil), b...)
+	sort.Strings(a)
+	sort.Strings(b)
+	for i, v := range a {
+		if v != b[i] {
+			return false
+		}
+	}
+	return true
+}
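The test changes below replace fixed sleeps with testutil.Eventually-style polling, which is what makes the endpoint-switching tests robust. A minimal sketch of that polling shape, assuming nothing beyond the standard library (eventually is an illustrative helper, not the repository's actual testutil API):

package main

import (
	"fmt"
	"time"
)

// eventually polls cond until it returns true or the deadline passes,
// the same shape as the Eventually helper the revised tests rely on.
func eventually(timeout, interval time.Duration, cond func() bool) bool {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if cond() {
			return true
		}
		time.Sleep(interval)
	}
	return false
}

func main() {
	start := time.Now()
	ok := eventually(time.Second, 10*time.Millisecond, func() bool {
		return time.Since(start) > 100*time.Millisecond // stand-in for "members converged"
	})
	fmt.Println("converged:", ok)
}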
+// CreateClients creates an etcd v3 client and an HTTP client.
+func CreateClients(ctx context.Context, wg *sync.WaitGroup, tlsConfig *tls.Config, acUrls []url.URL) (*clientv3.Client, *http.Client, error) {
+	client, err := CreateEtcdClient(ctx, wg, tlsConfig, acUrls)
+	if err != nil {
+		return nil, nil, errs.ErrNewEtcdClient.Wrap(err).GenWithStackByCause()
+	}
+	httpClient := createHTTPClient(tlsConfig)
+	return client, httpClient, nil
+}
 
+// createHTTPClient creates an HTTP client with the given TLS config.
 func createHTTPClient(tlsConfig *tls.Config) *http.Client {
 	return &http.Client{
 		Transport: &http.Transport{
diff --git a/pkg/utils/etcdutil/etcdutil_test.go b/pkg/utils/etcdutil/etcdutil_test.go
index 4b7e2d9957be..3a0a9237d5bc 100644
--- a/pkg/utils/etcdutil/etcdutil_test.go
+++ b/pkg/utils/etcdutil/etcdutil_test.go
@@ -233,7 +233,9 @@ func TestInitClusterID(t *testing.T) {
 
 func TestEtcdClientSync(t *testing.T) {
 	re := require.New(t)
-	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/autoSyncInterval", "return(true)"))
+	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick", "return(true)"))
+	ctx, cancel := context.WithCancel(context.Background())
+	wg := sync.WaitGroup{}
 
 	// Start a etcd server.
 	cfg1 := NewTestSingleConfig(t)
@@ -247,7 +249,7 @@ func TestEtcdClientSync(t *testing.T) {
 	ep1 := cfg1.LCUrls[0].String()
 	urls, err := types.NewURLs([]string{ep1})
 	re.NoError(err)
-	client1, err := createEtcdClientWithMultiEndpoint(nil, urls)
+	client1, err := CreateEtcdClient(ctx, &wg, nil, urls)
 	defer func() {
 		client1.Close()
 	}()
@@ -258,40 +260,44 @@ func TestEtcdClientSync(t *testing.T) {
 	etcd2 := checkAddEtcdMember(t, cfg1, client1)
 	defer etcd2.Close()
 	checkMembers(re, client1, []*embed.Etcd{etcd1, etcd2})
+	time.Sleep(200 * time.Millisecond) // wait for the client to sync endpoints and connect to etcd2
 
 	// Remove the first member and close the etcd1.
 	_, err = RemoveEtcdMember(client1, uint64(etcd1.Server.ID()))
 	re.NoError(err)
-	time.Sleep(20 * time.Millisecond) // wait for etcd client sync endpoints and client will be connected to etcd2
 	etcd1.Close()
 
 	// Check the client can get the new member with the new endpoints.
-	listResp3, err := ListEtcdMembers(client1)
-	re.NoError(err)
-	re.Len(listResp3.Members, 1)
-	re.Equal(uint64(etcd2.Server.ID()), listResp3.Members[0].ID)
+	testutil.Eventually(re, func() bool {
+		listResp, err := ListEtcdMembers(client1)
+		return err == nil && len(listResp.Members) == 1 && listResp.Members[0].ID == uint64(etcd2.Server.ID())
+	})
 
-	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/autoSyncInterval"))
+	cancel()
+	wg.Wait()
+	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick"))
 }
 
 func TestEtcdWithHangLeaderEnableCheck(t *testing.T) {
 	re := require.New(t)
 	var err error
 	// Test with enable check.
-	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/autoSyncInterval", "return(true)"))
+	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick", "return(true)"))
 	err = checkEtcdWithHangLeader(t)
 	re.NoError(err)
-	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/autoSyncInterval"))
+	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick"))
 
 	// Test with disable check.
- re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/closeKeepAliveCheck", "return(true)")) + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/closeTick", "return(true)")) err = checkEtcdWithHangLeader(t) re.Error(err) - re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/closeKeepAliveCheck")) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/closeTick")) } func TestEtcdScaleInAndOutWithoutMultiPoint(t *testing.T) { re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + wg := sync.WaitGroup{} // Start a etcd server. cfg1 := NewTestSingleConfig(t) etcd1, err := embed.StartEtcd(cfg1) @@ -305,12 +311,12 @@ func TestEtcdScaleInAndOutWithoutMultiPoint(t *testing.T) { // Create two etcd clients with etcd1 as endpoint. urls, err := types.NewURLs([]string{ep1}) re.NoError(err) - client1, err := CreateEtcdClient(nil, urls[0]) // execute member change operation with this client + client1, err := CreateEtcdClient(ctx, &wg, nil, urls) // execute member change operation with this client defer func() { client1.Close() }() re.NoError(err) - client2, err := CreateEtcdClient(nil, urls[0]) // check member change with this client + client2, err := CreateEtcdClient(ctx, &wg, nil, urls) // check member change with this client defer func() { client2.Close() }() @@ -327,10 +333,14 @@ func TestEtcdScaleInAndOutWithoutMultiPoint(t *testing.T) { _, err = RemoveEtcdMember(client1, uint64(etcd1.Server.ID())) re.NoError(err) checkMembers(re, client2, []*embed.Etcd{etcd2}) + cancel() + wg.Wait() } func checkEtcdWithHangLeader(t *testing.T) error { re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + wg := sync.WaitGroup{} // Start a etcd server. cfg1 := NewTestSingleConfig(t) etcd1, err := embed.StartEtcd(cfg1) @@ -349,7 +359,7 @@ func checkEtcdWithHangLeader(t *testing.T) error { // Create a etcd client with etcd1 as endpoint. 
urls, err := types.NewURLs([]string{proxyAddr}) re.NoError(err) - client1, err := createEtcdClientWithMultiEndpoint(nil, urls) + client1, err := CreateEtcdClient(ctx, &wg, nil, urls) defer func() { client1.Close() }() @@ -365,6 +375,8 @@ func checkEtcdWithHangLeader(t *testing.T) error { enableDiscard.Store(true) time.Sleep(defaultDialKeepAliveTime + defaultDialKeepAliveTimeout*2) _, err = EtcdKVGet(client1, "test/key1") + cancel() + wg.Wait() return err } @@ -482,7 +494,7 @@ func (suite *loopWatcherTestSuite) SetupSuite() { ep1 := suite.config.LCUrls[0].String() urls, err := types.NewURLs([]string{ep1}) suite.NoError(err) - suite.client, err = CreateEtcdClient(nil, urls[0]) + suite.client, err = CreateEtcdClient(suite.ctx, &suite.wg, nil, urls) suite.NoError(err) suite.cleans = append(suite.cleans, func() { suite.client.Close() @@ -685,7 +697,7 @@ func (suite *loopWatcherTestSuite) TestWatcherBreak() { // Case2: close the etcd client and put a new value after watcher restarts suite.client.Close() - suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls[0]) + suite.client, err = CreateEtcdClient(suite.ctx, &suite.wg, nil, suite.config.LCUrls) suite.NoError(err) watcher.updateClientCh <- suite.client suite.put("TestWatcherBreak", "2") @@ -693,7 +705,7 @@ func (suite *loopWatcherTestSuite) TestWatcherBreak() { // Case3: close the etcd client and put a new value before watcher restarts suite.client.Close() - suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls[0]) + suite.client, err = CreateEtcdClient(suite.ctx, &suite.wg, nil, suite.config.LCUrls) suite.NoError(err) suite.put("TestWatcherBreak", "3") watcher.updateClientCh <- suite.client @@ -701,7 +713,7 @@ func (suite *loopWatcherTestSuite) TestWatcherBreak() { // Case4: close the etcd client and put a new value with compact suite.client.Close() - suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls[0]) + suite.client, err = CreateEtcdClient(suite.ctx, &suite.wg, nil, suite.config.LCUrls) suite.NoError(err) suite.put("TestWatcherBreak", "4") resp, err := EtcdKVGet(suite.client, "TestWatcherBreak") diff --git a/server/server.go b/server/server.go index 40f5f4d59bd1..e2c068fbf818 100644 --- a/server/server.go +++ b/server/server.go @@ -338,12 +338,12 @@ func (s *Server) startEtcd(ctx context.Context) error { } // start client - s.client, s.httpClient, err = startClient(s.cfg) + s.client, s.httpClient, err = s.startClient() if err != nil { return err } - s.electionClient, err = startElectionClient(s.cfg) + s.electionClient, err = s.startElectionClient() if err != nil { return err } @@ -370,29 +370,29 @@ func (s *Server) startEtcd(ctx context.Context) error { return nil } -func startClient(cfg *config.Config) (*clientv3.Client, *http.Client, error) { - tlsConfig, err := cfg.Security.ToTLSConfig() +func (s *Server) startClient() (*clientv3.Client, *http.Client, error) { + tlsConfig, err := s.cfg.Security.ToTLSConfig() if err != nil { return nil, nil, err } - etcdCfg, err := cfg.GenEmbedEtcdConfig() + etcdCfg, err := s.cfg.GenEmbedEtcdConfig() if err != nil { return nil, nil, err } - return etcdutil.CreateClients(tlsConfig, etcdCfg.ACUrls[0]) + return etcdutil.CreateClients(s.ctx, &s.serverLoopWg, tlsConfig, etcdCfg.ACUrls) } -func startElectionClient(cfg *config.Config) (*clientv3.Client, error) { - tlsConfig, err := cfg.Security.ToTLSConfig() +func (s *Server) startElectionClient() (*clientv3.Client, error) { + tlsConfig, err := s.cfg.Security.ToTLSConfig() if err != nil { return nil, err } - etcdCfg, err := 
cfg.GenEmbedEtcdConfig() + etcdCfg, err := s.cfg.GenEmbedEtcdConfig() if err != nil { return nil, err } - return etcdutil.CreateEtcdClient(tlsConfig, etcdCfg.ACUrls[0]) + return etcdutil.CreateEtcdClient(s.ctx, &s.serverLoopWg, tlsConfig, etcdCfg.ACUrls) } // AddStartCallback adds a callback in the startServer phase.
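Finally, the new signatures make every caller responsible for the background goroutines' lifetime. A hedged usage sketch of the wiring this patch expects from a caller (the endpoint is a placeholder; etcdutil is github.com/tikv/pd/pkg/utils/etcdutil, and nil TLS assumes a plaintext cluster):

package main

import (
	"context"
	"crypto/tls"
	"net/url"
	"sync"

	"github.com/tikv/pd/pkg/utils/etcdutil"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	var serverLoopWg sync.WaitGroup
	var tlsConfig *tls.Config // nil for plaintext clusters
	u, err := url.Parse("http://127.0.0.1:2379") // assumed local etcd
	if err != nil {
		panic(err)
	}
	client, httpClient, err := etcdutil.CreateClients(ctx, &serverLoopWg, tlsConfig, []url.URL{*u})
	if err != nil {
		panic(err)
	}
	defer httpClient.CloseIdleConnections()
	defer client.Close()
	// ... serve ...
	cancel()            // stops the health-check and member-sync goroutines
	serverLoopWg.Wait() // waits for them before the process exits
}

Passing the server's own context and serverLoopWg, as the pd-server and microservice call sites above do, keeps shutdown deterministic: cancel() stops the loops and Wait() guarantees they have exited.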