diff --git a/pkg/mcs/resourcemanager/server/server.go b/pkg/mcs/resourcemanager/server/server.go
index 6705c4b1da9..459664c3aaf 100644
--- a/pkg/mcs/resourcemanager/server/server.go
+++ b/pkg/mcs/resourcemanager/server/server.go
@@ -264,7 +264,7 @@ func (s *Server) initClient() error {
 	if err != nil {
 		return err
 	}
-	s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, []url.URL(u)[0])
+	s.etcdClient, s.httpClient, err = etcdutil.CreateClients(s.ctx, tlsConfig, []url.URL(u)[0])
 	return err
 }
 
diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go
index 1b4b5c36374..0b73098549c 100644
--- a/pkg/mcs/tso/server/server.go
+++ b/pkg/mcs/tso/server/server.go
@@ -386,7 +386,7 @@ func (s *Server) initClient() error {
 	if err != nil {
 		return err
 	}
-	s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, s.backendUrls[0])
+	s.etcdClient, s.httpClient, err = etcdutil.CreateClients(s.ctx, tlsConfig, s.backendUrls[0])
 	return err
 }
 
diff --git a/pkg/utils/etcdutil/etcdutil.go b/pkg/utils/etcdutil/etcdutil.go
index 4b1f1d7fa15..54ddcf4f8e1 100644
--- a/pkg/utils/etcdutil/etcdutil.go
+++ b/pkg/utils/etcdutil/etcdutil.go
@@ -17,9 +17,11 @@ package etcdutil
 import (
 	"context"
 	"crypto/tls"
+	"fmt"
 	"math/rand"
 	"net/http"
 	"net/url"
+	"sort"
 	"sync"
 	"time"
 
@@ -27,9 +29,11 @@ import (
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/log"
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/tikv/pd/pkg/errs"
 	"github.com/tikv/pd/pkg/utils/logutil"
 	"github.com/tikv/pd/pkg/utils/typeutil"
 	"go.etcd.io/etcd/clientv3"
 	"go.etcd.io/etcd/etcdserver"
+	"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
 	"go.etcd.io/etcd/mvcc/mvccpb"
@@ -197,8 +201,8 @@ func EtcdKVPutWithTTL(ctx context.Context, c *clientv3.Client, key string, value
 }
 
 // CreateClients creates etcd v3 client and http client.
-func CreateClients(tlsConfig *tls.Config, acUrls url.URL) (*clientv3.Client, *http.Client, error) {
-	client, err := CreateEtcdClient(tlsConfig, acUrls)
+func CreateClients(ctx context.Context, tlsConfig *tls.Config, acUrls url.URL) (*clientv3.Client, *http.Client, error) {
+	client, err := CreateEtcdClient(ctx, tlsConfig, acUrls)
 	if err != nil {
 		return nil, nil, errs.ErrNewEtcdClient.Wrap(err).GenWithStackByCause()
 	}
@@ -245,23 +249,238 @@ func createEtcdClientWithMultiEndpoint(tlsConfig *tls.Config, acUrls []url.URL)
 	return client, err
 }
 
-// CreateEtcdClient creates etcd v3 client.
-// Note: it will be used by legacy pd-server, and only connect to leader only.
-func CreateEtcdClient(tlsConfig *tls.Config, acURL url.URL) (*clientv3.Client, error) {
+// etcdStateGauge exports the etcd client states. The "ep" series records the
+// number of healthy endpoints applied by the last endpoint switch.
+var etcdStateGauge = prometheus.NewGaugeVec(
+	prometheus.GaugeOpts{
+		Namespace: "pd",
+		Subsystem: "server",
+		Name:      "etcd_client",
+		Help:      "Etcd client states.",
+	}, []string{"type"})
+
+func init() {
+	prometheus.MustRegister(etcdStateGauge)
+}
+
+// newClient creates an etcd v3 client whose only endpoint is acURL.
+func newClient(tlsConfig *tls.Config, acURL string) (*clientv3.Client, error) {
 	lgc := zap.NewProductionConfig()
 	lgc.Encoding = log.ZapEncodingName
 	client, err := clientv3.New(clientv3.Config{
-		Endpoints:   []string{acURL.String()},
-		DialTimeout: defaultEtcdClientTimeout,
-		TLS:         tlsConfig,
-		LogConfig:   &lgc,
+		Endpoints:            []string{acURL},
+		DialTimeout:          defaultEtcdClientTimeout,
+		TLS:                  tlsConfig,
+		LogConfig:            &lgc,
+		DialKeepAliveTime:    defaultDialKeepAliveTime,
+		DialKeepAliveTimeout: defaultDialKeepAliveTimeout,
 	})
-	if err == nil {
-		log.Info("create etcd v3 client", zap.String("endpoints", acURL.String()))
+	return client, err
+}
+
+// CreateEtcdClient creates etcd v3 client.
+// Note: it will be used by legacy pd-server. The client starts with acURL as its
+// only endpoint; two background goroutines then keep its endpoints in sync with
+// the healthy members until ctx is done or the client is closed.
+func CreateEtcdClient(ctx context.Context, tlsConfig *tls.Config, acURL url.URL) (*clientv3.Client, error) {
+	client, err := newClient(tlsConfig, acURL.String())
+	if err != nil {
+		return nil, err
 	}
+	checker := &healthyChecker{
+		tlsConfig: tlsConfig,
+	}
+	eps := syncUrls(ctx, client)
+	checker.update(eps)
+
+	// TODO: remove these goroutines after etcd client support balancer by custom.
+	go func(ctx context.Context, client *clientv3.Client) {
+		defer logutil.LogPanic()
+		// Release the per-endpoint clients once the health check stops.
+		defer checker.close()
+		ticker := time.NewTicker(3 * time.Second)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-client.Ctx().Done():
+				// The client was closed by its owner, nothing left to maintain.
+				return
+			case <-ticker.C:
+				healthyEps := checker.patrol(ctx)
+				if len(healthyEps) == 0 {
+					log.Error("[etcd client] no available endpoint")
+				} else {
+					usedEps := client.Endpoints()
+					if !isEqual(healthyEps, usedEps) {
+						client.SetEndpoints(healthyEps...)
+						change := fmt.Sprintf("%d->%d", len(usedEps), len(healthyEps))
+						etcdStateGauge.WithLabelValues("ep").Set(float64(len(healthyEps)))
+						log.Info("[etcd client] update endpoints", zap.String("num-change", change),
+							zap.Strings("last-endpoints", usedEps), zap.Strings("endpoints", client.Endpoints()))
+					}
+				}
+			}
+		}
+	}(ctx, client)
+
+	// Notes: use another goroutine to update endpoints to avoid blocking health check in the first goroutine.
+	go func(ctx context.Context, client *clientv3.Client) {
+		defer logutil.LogPanic()
+		ticker := time.NewTicker(3 * time.Second)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-client.Ctx().Done():
+				return
+			case <-ticker.C:
+				eps := syncUrls(ctx, client)
+				checker.update(eps)
+			}
+		}
+	}(ctx, client)
+
 	return client, err
 }
 
+// offlineTimeout is the timeout for an unhealthy etcd endpoint to be offline from healthy checker.
+const offlineTimeout = 30 * time.Minute
+
+// healthyClient is a single-endpoint etcd client together with the last time
+// its endpoint passed the health check.
+type healthyClient struct {
+	*clientv3.Client
+	lastHealth time.Time
+}
+
+// healthyChecker keeps one healthyClient per known endpoint.
+type healthyChecker struct {
+	sync.Map  // map[string]*healthyClient
+	tlsConfig *tls.Config
+}
+
+// patrol checks every known endpoint concurrently and returns the healthy ones.
+func (checker *healthyChecker) patrol(ctx context.Context) []string {
+	// See https://github.com/etcd-io/etcd/blob/85b640cee793e25f3837c47200089d14a8392dc7/etcdctl/ctlv3/command/ep_command.go#L105
+	var (
+		wg          sync.WaitGroup
+		mu          sync.Mutex
+		healthyList []string
+	)
+	checker.Range(func(key, value interface{}) bool {
+		wg.Add(1)
+		go func(key, value interface{}) {
+			defer wg.Done()
+			defer logutil.LogPanic()
+			ctx, cancel := context.WithTimeout(clientv3.WithRequireLeader(ctx), DefaultRequestTimeout)
+			defer cancel()
+			ep := key.(string)
+			client := value.(*healthyClient)
+			_, err := client.Get(ctx, "health")
+			// permission denied is OK since proposal goes through consensus to get it
+			if err == nil || err == rpctypes.ErrPermissionDenied {
+				// Collect under a mutex rather than through a channel sized by a
+				// previous Range: update may add endpoints in between, and a full
+				// channel would block this goroutine and wg.Wait forever.
+				mu.Lock()
+				healthyList = append(healthyList, ep)
+				mu.Unlock()
+				checker.Store(ep, &healthyClient{
+					Client:     client.Client,
+					lastHealth: time.Now(),
+				})
+			}
+		}(key, value)
+		return true
+	})
+	wg.Wait()
+	return healthyList
+}
+
+// update creates a health-check client for every endpoint in eps that has none
+// yet, and evicts the client of an endpoint unhealthy for longer than offlineTimeout.
+func (checker *healthyChecker) update(eps []string) {
+	for _, ep := range eps {
+		// check if client exists, if not, create one, if exists, check if it's offline
+		if client, ok := checker.Load(ep); ok {
+			if time.Since(client.(*healthyClient).lastHealth) > offlineTimeout {
+				log.Info("[etcd client] some endpoint maybe offline", zap.String("endpoint", ep))
+				checker.Delete(ep)
+				// Close the evicted client, otherwise its connection leaks.
+				if err := client.(*healthyClient).Close(); err != nil {
+					log.Warn("[etcd client] failed to close etcd healthy client", zap.String("endpoint", ep), zap.Error(err))
+				}
+			}
+			continue
+		}
+		client, err := newClient(checker.tlsConfig, ep)
+		if err != nil {
+			log.Error("[etcd client] failed to create etcd healthy client", zap.Error(err))
+			continue
+		}
+		checker.Store(ep, &healthyClient{
+			Client:     client,
+			lastHealth: time.Now(),
+		})
+	}
+}
+
+// close evicts and closes every client held by the checker.
+func (checker *healthyChecker) close() {
+	checker.Range(func(key, value interface{}) bool {
+		checker.Delete(key)
+		if err := value.(*healthyClient).Close(); err != nil {
+			log.Warn("[etcd client] failed to close etcd healthy client", zap.String("endpoint", key.(string)), zap.Error(err))
+		}
+		return true
+	})
+}
+
+// syncUrls returns the client URLs of all named, non-learner members reported
+// by the cluster, or an empty slice if the member list cannot be fetched.
+func syncUrls(ctx context.Context, client *clientv3.Client) []string {
+	// See https://github.com/etcd-io/etcd/blob/85b640cee793e25f3837c47200089d14a8392dc7/clientv3/client.go#L170
+	ctx, cancel := context.WithTimeout(clientv3.WithRequireLeader(ctx), DefaultRequestTimeout)
+	defer cancel()
+	now := time.Now()
+	mresp, err := client.MemberList(ctx)
+	if err != nil {
+		log.Error("[etcd client] failed to list members", errs.ZapError(err))
+		return []string{}
+	}
+	if time.Since(now) > defaultEtcdClientTimeout {
+		log.Warn("[etcd client] sync etcd members slow", zap.Duration("cost", time.Since(now)), zap.Int("members", len(mresp.Members)))
+	}
+	var eps []string
+	for _, m := range mresp.Members {
+		if len(m.Name) != 0 && !m.IsLearner {
+			eps = append(eps, m.ClientURLs...)
+		}
+	}
+	return eps
+}
+
+// isEqual reports whether a and b hold the same strings regardless of order.
+// It compares sorted copies, so neither argument is modified.
+func isEqual(a, b []string) bool {
+	if len(a) != len(b) {
+		return false
+	}
+	sa := append([]string(nil), a...)
+	sb := append([]string(nil), b...)
+	sort.Strings(sa)
+	sort.Strings(sb)
+	for i, v := range sa {
+		if v != sb[i] {
+			return false
+		}
+	}
+	return true
+}
+
 func createHTTPClient(tlsConfig *tls.Config) *http.Client {
 	return &http.Client{
 		Transport: &http.Transport{
diff --git a/pkg/utils/etcdutil/etcdutil_test.go b/pkg/utils/etcdutil/etcdutil_test.go
index 4b7e2d9957b..8ca88866568 100644
--- a/pkg/utils/etcdutil/etcdutil_test.go
+++ b/pkg/utils/etcdutil/etcdutil_test.go
@@ -274,6 +274,44 @@ func TestEtcdClientSync(t *testing.T) {
 	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/autoSyncInterval"))
 }
 
+func TestEtcdClientSync2(t *testing.T) {
+	re := require.New(t)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	// Start an etcd server.
+	cfg1 := NewTestSingleConfig(t)
+	etcd1, err := embed.StartEtcd(cfg1)
+	re.NoError(err)
+	defer func() {
+		etcd1.Close()
+	}()
+
+	// Create an etcd client with etcd1 as endpoint.
+	ep1 := cfg1.LCUrls[0].String()
+	urls, err := types.NewURLs([]string{ep1})
+	re.NoError(err)
+	client1, err := CreateEtcdClient(ctx, nil, urls[0])
+	re.NoError(err)
+	defer func() {
+		client1.Close()
+	}()
+	<-etcd1.Server.ReadyNotify()
+	// Add a new member.
+	etcd2 := checkAddEtcdMember(t, cfg1, client1)
+	defer etcd2.Close()
+	checkMembers(re, client1, []*embed.Etcd{etcd1, etcd2})
+	time.Sleep(5 * time.Second)
+	// Remove the first member and close etcd1.
+	_, err = RemoveEtcdMember(client1, uint64(etcd1.Server.ID()))
+	re.NoError(err)
+	etcd1.Close()
+	time.Sleep(5 * time.Second)
+	// Check the client can get the new member with the new endpoints.
+	checkMembers(re, client1, []*embed.Etcd{etcd2})
+	time.Sleep(5 * time.Second)
+}
+
 func TestEtcdWithHangLeaderEnableCheck(t *testing.T) {
 	re := require.New(t)
 	var err error
@@ -305,12 +343,12 @@ func TestEtcdScaleInAndOutWithoutMultiPoint(t *testing.T) {
 	// Create two etcd clients with etcd1 as endpoint.
 	urls, err := types.NewURLs([]string{ep1})
 	re.NoError(err)
-	client1, err := CreateEtcdClient(nil, urls[0]) // execute member change operation with this client
+	client1, err := CreateEtcdClient(context.Background(), nil, urls[0]) // execute member change operation with this client
 	defer func() {
 		client1.Close()
 	}()
 	re.NoError(err)
-	client2, err := CreateEtcdClient(nil, urls[0]) // check member change with this client
+	client2, err := CreateEtcdClient(context.Background(), nil, urls[0]) // check member change with this client
 	defer func() {
 		client2.Close()
 	}()
@@ -482,7 +520,7 @@ func (suite *loopWatcherTestSuite) SetupSuite() {
 	ep1 := suite.config.LCUrls[0].String()
 	urls, err := types.NewURLs([]string{ep1})
 	suite.NoError(err)
-	suite.client, err = CreateEtcdClient(nil, urls[0])
+	suite.client, err = CreateEtcdClient(context.Background(), nil, urls[0])
 	suite.NoError(err)
 	suite.cleans = append(suite.cleans, func() {
 		suite.client.Close()
@@ -685,7 +723,7 @@ func (suite *loopWatcherTestSuite) TestWatcherBreak() {
 
 	// Case2: close the etcd client and put a new value after watcher restarts
 	suite.client.Close()
-	suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls[0])
+	suite.client, err = CreateEtcdClient(context.Background(), nil, suite.config.LCUrls[0])
 	suite.NoError(err)
 	watcher.updateClientCh <- suite.client
 	suite.put("TestWatcherBreak", "2")
@@ -693,7 +731,7 @@ func (suite *loopWatcherTestSuite) TestWatcherBreak() {
 
 	// Case3: close the etcd client and put a new value before watcher restarts
 	suite.client.Close()
-	suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls[0])
+	suite.client, err = CreateEtcdClient(context.Background(), nil, suite.config.LCUrls[0])
 	suite.NoError(err)
 	suite.put("TestWatcherBreak", "3")
 	watcher.updateClientCh <- suite.client
@@ -701,7 +739,7 @@ func (suite *loopWatcherTestSuite) TestWatcherBreak() {
 
 	// Case4: close the etcd client and put a new value with compact
 	suite.client.Close()
-	suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls[0])
+	suite.client, err = CreateEtcdClient(context.Background(), nil, suite.config.LCUrls[0])
 	suite.NoError(err)
 	suite.put("TestWatcherBreak", "4")
 	resp, err := EtcdKVGet(suite.client, "TestWatcherBreak")
diff --git a/server/server.go b/server/server.go
index 40f5f4d59bd..04b9c84e9d8 100644
--- a/server/server.go
+++ b/server/server.go
@@ -338,12 +338,12 @@ func (s *Server) startEtcd(ctx context.Context) error {
 	}
 
 	// start client
-	s.client, s.httpClient, err = startClient(s.cfg)
+	s.client, s.httpClient, err = s.startClient()
 	if err != nil {
 		return err
 	}
 
-	s.electionClient, err = startElectionClient(s.cfg)
+	s.electionClient, err = s.startElectionClient()
 	if err != nil {
 		return err
 	}
@@ -370,29 +370,29 @@ func (s *Server) startEtcd(ctx context.Context) error {
 	return nil
 }
 
-func startClient(cfg *config.Config) (*clientv3.Client, *http.Client, error) {
-	tlsConfig, err := cfg.Security.ToTLSConfig()
+func (s *Server) startClient() (*clientv3.Client, *http.Client, error) {
+	tlsConfig, err := s.cfg.Security.ToTLSConfig()
 	if err != nil {
 		return nil, nil, err
 	}
-	etcdCfg, err := cfg.GenEmbedEtcdConfig()
+	etcdCfg, err := s.cfg.GenEmbedEtcdConfig()
 	if err != nil {
 		return nil, nil, err
 	}
-	return etcdutil.CreateClients(tlsConfig, etcdCfg.ACUrls[0])
+	return etcdutil.CreateClients(s.ctx, tlsConfig, etcdCfg.ACUrls[0])
 }
 
-func startElectionClient(cfg *config.Config) (*clientv3.Client, error) {
-	tlsConfig, err := cfg.Security.ToTLSConfig()
+func (s *Server) startElectionClient() (*clientv3.Client, error) {
+	tlsConfig, err := s.cfg.Security.ToTLSConfig()
 	if err != nil {
 		return nil, err
 	}
-	etcdCfg, err := cfg.GenEmbedEtcdConfig()
+	etcdCfg, err := s.cfg.GenEmbedEtcdConfig()
 	if err != nil {
 		return nil, err
 	}
 
-	return etcdutil.CreateEtcdClient(tlsConfig, etcdCfg.ACUrls[0])
+	return etcdutil.CreateEtcdClient(s.ctx, tlsConfig, etcdCfg.ACUrls[0])
 }
 
 // AddStartCallback adds a callback in the startServer phase.