diff --git a/pkg/election/leadership.go b/pkg/election/leadership.go
index e10650704e39..ad77859e63a7 100644
--- a/pkg/election/leadership.go
+++ b/pkg/election/leadership.go
@@ -18,6 +18,7 @@ import (
 	"context"
 	"sync"
 	"sync/atomic"
+	"time"
 
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/pdpb"
@@ -30,6 +31,11 @@ import (
 	"go.uber.org/zap"
 )
 
+const (
+	watchLoopUnhealthyTimeout = 60 * time.Second
+	detectHealthyInterval     = 10 * time.Second
+)
+
 // GetLeader gets the corresponding leader from etcd by given leaderPath (as the key).
 func GetLeader(c *clientv3.Client, leaderPath string) (*pdpb.Member, int64, error) {
 	leader := &pdpb.Member{}
@@ -182,26 +188,68 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
 	if ls == nil {
 		return
 	}
+
+	interval := detectHealthyInterval
+	timeout := watchLoopUnhealthyTimeout
+	failpoint.Inject("fastTick", func() {
+		timeout = 5 * time.Second
+		interval = 1 * time.Second
+	})
+	ticker := time.NewTicker(interval)
+	defer ticker.Stop()
+	lastHealthyTime := time.Now()
+
 	watcher := clientv3.NewWatcher(ls.client)
 	defer watcher.Close()
-	ctx, cancel := context.WithCancel(serverCtx)
-	defer cancel()
-	// The revision is the revision of last modification on this key.
-	// If the revision is compacted, will meet required revision has been compacted error.
-	// In this case, use the compact revision to re-watch the key.
 	for {
 		failpoint.Inject("delayWatcher", nil)
-		rch := watcher.Watch(ctx, ls.leaderKey, clientv3.WithRev(revision))
-		for wresp := range rch {
+		// In order to prevent a watch stream from getting stuck on a partitioned node,
+		// make sure to wrap the context with "WithRequireLeader".
+		watchChanCtx, watchChanCancel := context.WithCancel(clientv3.WithRequireLeader(serverCtx))
+		defer watchChanCancel()
+
+		// When etcd is not available, watcher.Watch will block,
+		// so check the etcd availability first.
+		if !etcdutil.IsHealthy(serverCtx, ls.client) {
+			if time.Since(lastHealthyTime) > timeout {
+				log.Error("the connection of the leadership watcher is unhealthy",
+					zap.Int64("revision", revision),
+					zap.String("leader-key", ls.leaderKey),
+					zap.String("purpose", ls.purpose))
+				return
+			}
+			select {
+			case <-serverCtx.Done():
+				// server closed, return
+				return
+			case <-ticker.C:
+				watchChanCancel()
+				continue
+			}
+		}
+
+		watchChan := watcher.Watch(watchChanCtx, ls.leaderKey, clientv3.WithRev(revision))
+		lastHealthyTime = time.Now()
+	WatchChan:
+		select {
+		case <-serverCtx.Done():
+			// server closed, return
+			return
+		case <-ticker.C:
+			if !etcdutil.IsHealthy(serverCtx, ls.client) {
+				watchChanCancel()
+				continue
+			}
+		case wresp := <-watchChan:
 			// meet compacted error, use the compact revision.
 			if wresp.CompactRevision != 0 {
 				log.Warn("required revision has been compacted, use the compact revision",
 					zap.Int64("required-revision", revision),
 					zap.Int64("compact-revision", wresp.CompactRevision))
 				revision = wresp.CompactRevision
-				break
-			}
-			if wresp.Canceled {
+				watchChanCancel()
+				continue
+			} else if wresp.Err() != nil { // wresp.Err() is also non-nil when CompactRevision != 0, hence the else
 				log.Error("leadership watcher is canceled with",
 					zap.Int64("revision", revision),
 					zap.String("leader-key", ls.leaderKey),
@@ -213,19 +261,15 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
 			for _, ev := range wresp.Events {
 				if ev.Type == mvccpb.DELETE {
 					log.Info("current leadership is deleted",
+						zap.Int64("revision", wresp.Header.Revision),
 						zap.String("leader-key", ls.leaderKey),
 						zap.String("purpose", ls.purpose))
 					return
 				}
 			}
+			revision = wresp.Header.Revision + 1
 		}
-
-		select {
-		case <-ctx.Done():
-			// server closed, return
-			return
-		default:
-		}
+		goto WatchChan // use goto to avoid creating a new watchChan
 	}
 }
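
The guarded watch loop above reduces to one reusable shape: probe etcd before opening a stream (a Watch call against an unreachable etcd blocks indefinitely), bound how long the loop may stay unhealthy, and re-enter the select on the same channel after an ordinary event. A condensed, self-contained sketch of that shape follows; isHealthy and watch are illustrative stand-ins for etcdutil.IsHealthy and watcher.Watch, not part of the patch:

package sketch

import (
	"context"
	"time"
)

// watchLoop mirrors Leadership.Watch above in miniature.
func watchLoop(ctx context.Context, isHealthy func() bool, watch func() <-chan struct{}) {
	ticker := time.NewTicker(10 * time.Second) // detectHealthyInterval in the patch
	defer ticker.Stop()
	lastHealthy := time.Now()
	for {
		// Probe first: watching an unavailable etcd would block forever.
		if !isHealthy() {
			if time.Since(lastHealthy) > time.Minute { // watchLoopUnhealthyTimeout
				return // exit after a bounded unhealthy period instead of hanging
			}
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				continue // re-probe on the next tick
			}
		}
		ch := watch()
		lastHealthy = time.Now()
	recv:
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if !isHealthy() {
				continue // drop the stream and go back to probing
			}
			goto recv
		case _, ok := <-ch:
			if !ok {
				return // stream closed
			}
			// handle the event, then keep reading from the same stream
			goto recv
		}
	}
}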
diff --git a/pkg/election/leadership_test.go b/pkg/election/leadership_test.go
index 63b25378518e..1021462a9027 100644
--- a/pkg/election/leadership_test.go
+++ b/pkg/election/leadership_test.go
@@ -19,8 +19,10 @@ import (
 	"testing"
 	"time"
 
+	"github.com/pingcap/failpoint"
 	"github.com/stretchr/testify/require"
 	"github.com/tikv/pd/pkg/utils/etcdutil"
+	"github.com/tikv/pd/pkg/utils/testutil"
 	"go.etcd.io/etcd/clientv3"
 	"go.etcd.io/etcd/embed"
 )
@@ -118,3 +120,87 @@ func TestLeadership(t *testing.T) {
 	re.NoError(lease1.Close())
 	re.NoError(lease2.Close())
 }
+
+func TestExitWatch(t *testing.T) {
+	re := require.New(t)
+	leaderKey := "/test_leader"
+	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/election/fastTick", "return(true)"))
+	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick", "return(true)"))
+	// Case1: close the client before the watch loop starts
+	checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) {
+		re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayWatcher", `pause`))
+		client.Close()
+		re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayWatcher"))
+	})
+	// Case2: close the client while the watch loop is running
+	checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) {
+		// Wait for the watch loop to start
+		time.Sleep(500 * time.Millisecond)
+		client.Close()
+	})
+	// Case3: delete the leader key
+	checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) {
+		leaderKey := leaderKey
+		_, err := client.Delete(context.Background(), leaderKey)
+		re.NoError(err)
+	})
+	// Case4: close the server before the watch loop starts
+	checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) {
+		re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayWatcher", `pause`))
+		server.Close()
+		re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayWatcher"))
+	})
+	// Case5: close the server while the watch loop is running
+	checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) {
+		// Wait for the watch loop to start
+		time.Sleep(500 * time.Millisecond)
+		server.Close()
+	})
+	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/election/fastTick"))
+	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick"))
+}
+
+func checkExitWatch(t *testing.T, leaderKey string, injectFunc func(server *embed.Etcd, client *clientv3.Client)) {
+	re := require.New(t)
+	cfg := etcdutil.NewTestSingleConfig(t)
+	etcd, err := embed.StartEtcd(cfg)
+	defer func() {
+		etcd.Close()
+	}()
+	re.NoError(err)
+
+	ep := cfg.LCUrls[0].String()
+	client1, err := clientv3.New(clientv3.Config{
+		Endpoints: []string{ep},
+	})
+	re.NoError(err)
+	client2, err := clientv3.New(clientv3.Config{
+		Endpoints: []string{ep},
+	})
+	re.NoError(err)
+
+	<-etcd.Server.ReadyNotify()
+
+	leadership1 := NewLeadership(client1, leaderKey, "test_leader_1")
+	leadership2 := NewLeadership(client2, leaderKey, "test_leader_2")
+	err = leadership1.Campaign(defaultLeaseTimeout, "test_leader_1")
+	re.NoError(err)
+	resp, err := client2.Get(context.Background(), leaderKey)
+	re.NoError(err)
+	done := make(chan struct{})
+	go func() {
+		leadership2.Watch(context.Background(), resp.Header.Revision)
+		done <- struct{}{}
+	}()
+
+	injectFunc(etcd, client2)
+
+	testutil.Eventually(re, func() bool {
+		select {
+		case <-done:
+			return true
+		default:
+			return false
+		}
+	})
+}
diff --git a/pkg/mcs/resourcemanager/server/server.go b/pkg/mcs/resourcemanager/server/server.go
index 6705c4b1da9e..de3edd5fe5a8 100644
--- a/pkg/mcs/resourcemanager/server/server.go
+++ b/pkg/mcs/resourcemanager/server/server.go
@@ -264,7 +264,7 @@ func (s *Server) initClient() error {
 	if err != nil {
 		return err
 	}
-	s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, []url.URL(u)[0])
+	s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, []url.URL(u))
 	return err
 }
diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go
index 1b4b5c363746..02a5dd943106 100644
--- a/pkg/mcs/tso/server/server.go
+++ b/pkg/mcs/tso/server/server.go
@@ -386,7 +386,7 @@ func (s *Server) initClient() error {
 	if err != nil {
 		return err
 	}
-	s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, s.backendUrls[0])
+	s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, s.backendUrls)
 	return err
 }
diff --git a/pkg/utils/etcdutil/etcdutil.go b/pkg/utils/etcdutil/etcdutil.go
index 4b1f1d7fa151..10c8006ece7b 100644
--- a/pkg/utils/etcdutil/etcdutil.go
+++ b/pkg/utils/etcdutil/etcdutil.go
@@ -17,9 +17,11 @@ package etcdutil
 import (
 	"context"
 	"crypto/tls"
+	"fmt"
 	"math/rand"
 	"net/http"
 	"net/url"
+	"sort"
 	"sync"
 	"time"
 
@@ -32,6 +34,7 @@ import (
 	"github.com/tikv/pd/pkg/utils/typeutil"
 	"go.etcd.io/etcd/clientv3"
 	"go.etcd.io/etcd/etcdserver"
+	"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
 	"go.etcd.io/etcd/mvcc/mvccpb"
 	"go.etcd.io/etcd/pkg/types"
 	"go.uber.org/zap"
@@ -41,9 +44,6 @@ const (
 	// defaultEtcdClientTimeout is the default timeout for etcd client.
 	defaultEtcdClientTimeout = 3 * time.Second
 
-	// defaultAutoSyncInterval is the interval to sync etcd cluster.
-	defaultAutoSyncInterval = 60 * time.Second
-
 	// defaultDialKeepAliveTime is the time after which client pings the server to see if transport is alive.
 	defaultDialKeepAliveTime = 10 * time.Second
 
@@ -144,6 +144,20 @@ func EtcdKVGet(c *clientv3.Client, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
 	return resp, nil
 }
 
+// IsHealthy checks if etcd is healthy.
+func IsHealthy(ctx context.Context, client *clientv3.Client) bool {
+	timeout := DefaultRequestTimeout
+	failpoint.Inject("fastTick", func() {
+		timeout = 100 * time.Millisecond
+	})
+	ctx, cancel := context.WithTimeout(clientv3.WithRequireLeader(ctx), timeout)
+	defer cancel()
+	_, err := client.Get(ctx, "health")
+	// Permission denied is OK, since the proposal still goes through consensus to get it.
+	// See: https://github.com/etcd-io/etcd/blob/85b640cee793e25f3837c47200089d14a8392dc7/etcdctl/ctlv3/command/ep_command.go#L124
+	return err == nil || err == rpctypes.ErrPermissionDenied
+}
+
 // GetValue gets value with key from etcd.
 func GetValue(c *clientv3.Client, key string, opts ...clientv3.OpOption) ([]byte, error) {
 	resp, err := get(c, key, opts...)
@@ -196,72 +210,231 @@ func EtcdKVPutWithTTL(ctx context.Context, c *clientv3.Client, key string, value string, ttlSeconds int64) (*clientv3.PutResponse, error) {
 	return kv.Put(ctx, key, value, clientv3.WithLease(grantResp.ID))
 }
 
-// CreateClients creates etcd v3 client and http client.
-func CreateClients(tlsConfig *tls.Config, acUrls url.URL) (*clientv3.Client, *http.Client, error) {
-	client, err := CreateEtcdClient(tlsConfig, acUrls)
-	if err != nil {
-		return nil, nil, errs.ErrNewEtcdClient.Wrap(err).GenWithStackByCause()
-	}
-	httpClient := createHTTPClient(tlsConfig)
-	return client, httpClient, nil
-}
+const (
+	// etcdServerOfflineTimeout is how long an endpoint may stay unhealthy before the healthy checker removes it as offline.
+	etcdServerOfflineTimeout = 30 * time.Minute
+	// etcdServerDisconnectedTimeout is how long an endpoint may stay unhealthy before the healthy checker treats it as disconnected.
+	etcdServerDisconnectedTimeout = 1 * time.Minute
+)
 
-// createEtcdClientWithMultiEndpoint creates etcd v3 client.
-// Note: it will be used by micro service server and support multi etcd endpoints.
-// FIXME: But it cannot switch etcd endpoints as soon as possible when one of endpoints is with io hang.
-func createEtcdClientWithMultiEndpoint(tlsConfig *tls.Config, acUrls []url.URL) (*clientv3.Client, error) {
-	if len(acUrls) == 0 {
-		return nil, errs.ErrNewEtcdClient.FastGenByArgs("no available etcd address")
-	}
-	endpoints := make([]string, 0, len(acUrls))
-	for _, u := range acUrls {
-		endpoints = append(endpoints, u.String())
-	}
+func newClient(tlsConfig *tls.Config, endpoints ...string) (*clientv3.Client, error) {
 	lgc := zap.NewProductionConfig()
 	lgc.Encoding = log.ZapEncodingName
-	autoSyncInterval := defaultAutoSyncInterval
-	dialKeepAliveTime := defaultDialKeepAliveTime
-	dialKeepAliveTimeout := defaultDialKeepAliveTimeout
-	failpoint.Inject("autoSyncInterval", func() {
-		autoSyncInterval = 10 * time.Millisecond
-	})
-	failpoint.Inject("closeKeepAliveCheck", func() {
-		autoSyncInterval = 0
-		dialKeepAliveTime = 0
-		dialKeepAliveTimeout = 0
-	})
 	client, err := clientv3.New(clientv3.Config{
 		Endpoints:            endpoints,
 		DialTimeout:          defaultEtcdClientTimeout,
-		AutoSyncInterval:     autoSyncInterval,
 		TLS:                  tlsConfig,
 		LogConfig:            &lgc,
-		DialKeepAliveTime:    dialKeepAliveTime,
-		DialKeepAliveTimeout: dialKeepAliveTimeout,
+		DialKeepAliveTime:    defaultDialKeepAliveTime,
+		DialKeepAliveTimeout: defaultDialKeepAliveTimeout,
 	})
-	if err == nil {
-		log.Info("create etcd v3 client", zap.Strings("endpoints", endpoints))
-	}
+	return client, err
+}
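
IsHealthy above mirrors etcdctl's endpoint health probe: a linearizable Get issued under WithRequireLeader only succeeds if the endpoint can reach a quorum, which is why a permission-denied reply still counts as healthy. One way such a probe is typically consumed is to poll it until the cluster becomes reachable; a minimal sketch, assuming probe behaves like IsHealthy (waitHealthy itself is hypothetical, not part of the patch):

package sketch

import (
	"context"
	"errors"
	"time"
)

// waitHealthy polls probe until it succeeds or the deadline passes.
func waitHealthy(ctx context.Context, probe func(context.Context) bool, interval, deadline time.Duration) error {
	ctx, cancel := context.WithTimeout(ctx, deadline)
	defer cancel()
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		if probe(ctx) {
			return nil
		}
		select {
		case <-ctx.Done():
			return errors.New("etcd did not become healthy before the deadline")
		case <-ticker.C:
			// try again on the next tick
		}
	}
}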
+
+// CreateEtcdClient creates an etcd v3 client that keeps detecting healthy endpoints.
+func CreateEtcdClient(tlsConfig *tls.Config, acURLs []url.URL) (*clientv3.Client, error) {
+	urls := make([]string, 0, len(acURLs))
+	for _, u := range acURLs {
+		urls = append(urls, u.String())
+	}
+	client, err := newClient(tlsConfig, urls...)
+	if err != nil {
+		return nil, err
+	}
+
+	tickerInterval := defaultDialKeepAliveTime
+	failpoint.Inject("fastTick", func() {
+		tickerInterval = 100 * time.Millisecond
+	})
+	failpoint.Inject("closeTick", func() {
+		tickerInterval = 0
+	})
+	if tickerInterval == 0 {
+		return client, err
+	}
+
+	checker := &healthyChecker{
+		tlsConfig: tlsConfig,
+	}
+	eps := syncUrls(client)
+	checker.update(eps)
+
+	// Create a goroutine to check the health of the etcd endpoints periodically.
+	go func(client *clientv3.Client) {
+		defer logutil.LogPanic()
+		ticker := time.NewTicker(tickerInterval)
+		defer ticker.Stop()
+		lastAvailable := time.Now()
+		for {
+			select {
+			case <-client.Ctx().Done():
+				log.Info("[etcd client] etcd client is closed, exit health check goroutine")
+				return
+			case <-ticker.C:
+				usedEps := client.Endpoints()
+				healthyEps := checker.patrol(client.Ctx())
+				if len(healthyEps) == 0 {
+					// When all endpoints are unhealthy, reset the endpoints to refresh the
+					// connection rather than deleting them, so the client is never left
+					// without any endpoint.
+					if time.Since(lastAvailable) > etcdServerDisconnectedTimeout {
+						log.Info("[etcd client] no available endpoint, try to reset endpoints", zap.Strings("last-endpoints", usedEps))
+						client.SetEndpoints([]string{}...)
+						client.SetEndpoints(usedEps...)
+					}
+				} else {
+					if !isEqual(healthyEps, usedEps) {
+						client.SetEndpoints(healthyEps...)
+						change := fmt.Sprintf("%d->%d", len(usedEps), len(healthyEps))
+						etcdStateGauge.WithLabelValues("endpoints").Set(float64(len(healthyEps)))
+						log.Info("[etcd client] update endpoints", zap.String("num-change", change),
+							zap.Strings("last-endpoints", usedEps), zap.Strings("endpoints", client.Endpoints()))
+					}
+					lastAvailable = time.Now()
+				}
+			}
+		}
+	}(client)
+
+	// Note: use another goroutine to update the endpoints, so the health check in
+	// the first goroutine is never blocked.
+	go func(client *clientv3.Client) {
+		defer logutil.LogPanic()
+		ticker := time.NewTicker(tickerInterval)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-client.Ctx().Done():
+				log.Info("[etcd client] etcd client is closed, exit update endpoint goroutine")
+				return
+			case <-ticker.C:
+				eps := syncUrls(client)
+				checker.update(eps)
+			}
+		}
+	}(client)
+
+	return client, err
+}
-// CreateEtcdClient creates etcd v3 client.
-// Note: it will be used by legacy pd-server, and only connect to leader only.
-func CreateEtcdClient(tlsConfig *tls.Config, acURL url.URL) (*clientv3.Client, error) {
-	lgc := zap.NewProductionConfig()
-	lgc.Encoding = log.ZapEncodingName
-	client, err := clientv3.New(clientv3.Config{
-		Endpoints:   []string{acURL.String()},
-		DialTimeout: defaultEtcdClientTimeout,
-		TLS:         tlsConfig,
-		LogConfig:   &lgc,
+type healthyClient struct {
+	*clientv3.Client
+	lastHealth time.Time
+}
+
+type healthyChecker struct {
+	sync.Map  // map[string]*healthyClient
+	tlsConfig *tls.Config
+}
+
+func (checker *healthyChecker) patrol(ctx context.Context) []string {
+	// See https://github.com/etcd-io/etcd/blob/85b640cee793e25f3837c47200089d14a8392dc7/etcdctl/ctlv3/command/ep_command.go#L105-L145
+	var wg sync.WaitGroup
+	count := 0
+	checker.Range(func(key, value interface{}) bool {
+		count++
+		return true
+	})
+	hch := make(chan string, count)
+	healthyList := make([]string, 0, count)
+	checker.Range(func(key, value interface{}) bool {
+		wg.Add(1)
+		go func(key, value interface{}) {
+			defer wg.Done()
+			defer logutil.LogPanic()
+			ep := key.(string)
+			client := value.(*healthyClient)
+			if IsHealthy(ctx, client.Client) {
+				hch <- ep
+				checker.Store(ep, &healthyClient{
+					Client:     client.Client,
+					lastHealth: time.Now(),
+				})
+				return
+			}
+		}(key, value)
+		return true
 	})
-	if err == nil {
-		log.Info("create etcd v3 client", zap.String("endpoints", acURL.String()))
+	wg.Wait()
+	close(hch)
+	for h := range hch {
+		healthyList = append(healthyList, h)
 	}
-	return client, err
+	return healthyList
 }
+
+func (checker *healthyChecker) update(eps []string) {
+	for _, ep := range eps {
+		// If the client for this endpoint doesn't exist yet, create one;
+		// otherwise check whether it has been offline or disconnected for too long.
+		if client, ok := checker.Load(ep); ok {
+			lastHealthy := client.(*healthyClient).lastHealth
+			if time.Since(lastHealthy) > etcdServerOfflineTimeout {
+				log.Info("[etcd client] some etcd server may be offline", zap.String("endpoint", ep))
+				checker.Delete(ep)
+			}
+			if time.Since(lastHealthy) > etcdServerDisconnectedTimeout {
+				// try to update the client to trigger a reconnect
+				client.(*healthyClient).Client.SetEndpoints(ep)
+			}
+			continue
+		}
+		checker.addClient(ep, time.Now())
+	}
+}
+
+func (checker *healthyChecker) addClient(ep string, lastHealth time.Time) {
+	client, err := newClient(checker.tlsConfig, ep)
+	if err != nil {
+		log.Error("[etcd client] failed to create etcd healthy client", zap.Error(err))
+		return
+	}
+	checker.Store(ep, &healthyClient{
+		Client:     client,
+		lastHealth: lastHealth,
+	})
+}
+
+func syncUrls(client *clientv3.Client) []string {
+	// See https://github.com/etcd-io/etcd/blob/85b640cee793e25f3837c47200089d14a8392dc7/clientv3/client.go#L170-L183
+	ctx, cancel := context.WithTimeout(clientv3.WithRequireLeader(client.Ctx()), DefaultRequestTimeout)
+	defer cancel()
+	mresp, err := client.MemberList(ctx)
+	if err != nil {
+		log.Error("[etcd client] failed to list members", errs.ZapError(err))
+		return []string{}
+	}
+	var eps []string
+	for _, m := range mresp.Members {
+		if len(m.Name) != 0 && !m.IsLearner {
+			eps = append(eps, m.ClientURLs...)
+		}
+	}
+	return eps
+}
+
+func isEqual(a, b []string) bool {
+	if len(a) != len(b) {
+		return false
+	}
+	sort.Strings(a)
+	sort.Strings(b)
+	for i, v := range a {
+		if v != b[i] {
+			return false
+		}
+	}
+	return true
+}
+
+// CreateClients creates the etcd v3 client and the HTTP client.
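
patrol above fans out one goroutine per cached endpoint client and collects the healthy endpoints over a channel buffered to the member count, so no sender can block and close(hch) is safe once the WaitGroup drains. The same fan-out/collect shape in isolation, with a plain probe function standing in for the per-endpoint clients (illustrative, not part of the patch):

package sketch

import (
	"context"
	"sync"
)

// patrolEndpoints probes every endpoint concurrently and returns the healthy ones.
func patrolEndpoints(ctx context.Context, endpoints []string, probe func(context.Context, string) bool) []string {
	var wg sync.WaitGroup
	hch := make(chan string, len(endpoints)) // buffered: senders never block
	for _, ep := range endpoints {
		wg.Add(1)
		go func(ep string) {
			defer wg.Done()
			if probe(ctx, ep) {
				hch <- ep
			}
		}(ep)
	}
	wg.Wait()
	close(hch) // safe: every sender has returned
	healthy := make([]string, 0, len(endpoints))
	for ep := range hch {
		healthy = append(healthy, ep)
	}
	return healthy
}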
+func CreateClients(tlsConfig *tls.Config, acUrls []url.URL) (*clientv3.Client, *http.Client, error) {
+	client, err := CreateEtcdClient(tlsConfig, acUrls)
+	if err != nil {
+		return nil, nil, errs.ErrNewEtcdClient.Wrap(err).GenWithStackByCause()
+	}
+	httpClient := createHTTPClient(tlsConfig)
+	return client, httpClient, nil
+}
+
+// createHTTPClient creates an HTTP client with the given TLS config.
 func createHTTPClient(tlsConfig *tls.Config) *http.Client {
 	return &http.Client{
 		Transport: &http.Transport{
@@ -500,13 +673,13 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision int64, err error) {
 	defer watcher.Close()
 
 	for {
-	WatchChan:
 		// In order to prevent a watch stream being stuck in a partitioned node,
 		// make sure to wrap context with "WithRequireLeader".
 		watchChanCtx, watchChanCancel := context.WithCancel(clientv3.WithRequireLeader(ctx))
 		defer watchChanCancel()
 		opts := append(lw.opts, clientv3.WithRev(revision))
 		watchChan := watcher.Watch(watchChanCtx, lw.key, opts...)
+	WatchChan:
 		select {
 		case <-ctx.Done():
 			return revision, nil
@@ -517,7 +690,7 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision int64, err error) {
 					zap.String("key", lw.key), zap.Error(err))
 			}
 			watchChanCancel()
-			goto WatchChan
+			continue
 		case wresp := <-watchChan:
 			if wresp.CompactRevision != 0 {
 				log.Warn("required revision has been compacted, use the compact revision in watch loop",
 					zap.Int64("required-revision", revision),
@@ -525,7 +698,7 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision int64, err error) {
 					zap.Int64("compact-revision", wresp.CompactRevision))
 				revision = wresp.CompactRevision
 				watchChanCancel()
-				goto WatchChan
+				continue
 			} else if wresp.Err() != nil { // wresp.Err() is also non-nil when CompactRevision != 0, hence the else
 				log.Error("watcher is canceled in watch loop",
 					zap.Int64("revision", revision),
@@ -558,8 +731,8 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision int64, err error) {
 					zap.String("key", lw.key), zap.Error(err))
 			}
 			revision = wresp.Header.Revision + 1
+			goto WatchChan // use goto to avoid creating a new watchChan
 		}
-		watchChanCancel()
 	}
 }
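
The LoopWatcher change above is the same label trick as in leadership.go: with WatchChan placed after watcher.Watch, `goto WatchChan` re-enters the select on the existing stream after a processed event, while `continue` (used on error and compaction) restarts the loop and dials a fresh stream. Note also that the per-iteration `defer watchChanCancel()` only runs when the function returns; the explicit watchChanCancel() calls before each `continue` are what release each iteration's context. The control flow in miniature (newStream and handle are illustrative stand-ins, not PD APIs):

package sketch

// loop rebuilds the stream via `continue` and reuses it via `goto Recv`.
func loop(newStream func() <-chan int, handle func(int)) {
	for {
		ch := newStream() // reached via `continue`: the stream is rebuilt
	Recv:
		select {
		case v, ok := <-ch:
			if !ok {
				continue // stream broken or canceled: rebuild it
			}
			handle(v)
			goto Recv // ordinary event: keep reading from the same stream
		}
	}
}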
diff --git a/pkg/utils/etcdutil/etcdutil_test.go b/pkg/utils/etcdutil/etcdutil_test.go
index 4b7e2d9957be..5c7cc2ecaf4a 100644
--- a/pkg/utils/etcdutil/etcdutil_test.go
+++ b/pkg/utils/etcdutil/etcdutil_test.go
@@ -19,6 +19,7 @@ import (
 	"crypto/tls"
 	"fmt"
 	"io"
+	"math/rand"
 	"net"
 	"strings"
 	"sync"
@@ -233,7 +234,7 @@ func TestInitClusterID(t *testing.T) {
 
 func TestEtcdClientSync(t *testing.T) {
 	re := require.New(t)
-	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/autoSyncInterval", "return(true)"))
+	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick", "return(true)"))
 
 	// Start a etcd server.
 	cfg1 := NewTestSingleConfig(t)
@@ -244,10 +245,7 @@ func TestEtcdClientSync(t *testing.T) {
 	re.NoError(err)
 
 	// Create a etcd client with etcd1 as endpoint.
-	ep1 := cfg1.LCUrls[0].String()
-	urls, err := types.NewURLs([]string{ep1})
-	re.NoError(err)
-	client1, err := createEtcdClientWithMultiEndpoint(nil, urls)
+	client1, err := CreateEtcdClient(nil, cfg1.LCUrls)
 	defer func() {
 		client1.Close()
 	}()
@@ -258,39 +256,26 @@ func TestEtcdClientSync(t *testing.T) {
 	etcd2 := checkAddEtcdMember(t, cfg1, client1)
 	defer etcd2.Close()
 	checkMembers(re, client1, []*embed.Etcd{etcd1, etcd2})
+	testutil.Eventually(re, func() bool {
+		// wait for the etcd client to sync the endpoints
+		return len(client1.Endpoints()) == 2
+	})
 
 	// Remove the first member and close the etcd1.
 	_, err = RemoveEtcdMember(client1, uint64(etcd1.Server.ID()))
 	re.NoError(err)
-	time.Sleep(20 * time.Millisecond) // wait for etcd client sync endpoints and client will be connected to etcd2
 	etcd1.Close()
 
 	// Check the client can get the new member with the new endpoints.
-	listResp3, err := ListEtcdMembers(client1)
-	re.NoError(err)
-	re.Len(listResp3.Members, 1)
-	re.Equal(uint64(etcd2.Server.ID()), listResp3.Members[0].ID)
-
-	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/autoSyncInterval"))
-}
-
-func TestEtcdWithHangLeaderEnableCheck(t *testing.T) {
-	re := require.New(t)
-	var err error
-	// Test with enable check.
-	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/autoSyncInterval", "return(true)"))
-	err = checkEtcdWithHangLeader(t)
-	re.NoError(err)
-	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/autoSyncInterval"))
+	testutil.Eventually(re, func() bool {
+		// wait for the etcd client to sync the endpoints
+		return len(client1.Endpoints()) == 1
+	})
 
-	// Test with disable check.
-	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/closeKeepAliveCheck", "return(true)"))
-	err = checkEtcdWithHangLeader(t)
-	re.Error(err)
-	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/closeKeepAliveCheck"))
+	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick"))
 }
 
-func TestEtcdScaleInAndOutWithoutMultiPoint(t *testing.T) {
+func TestEtcdScaleInAndOut(t *testing.T) {
 	re := require.New(t)
 	// Start a etcd server.
 	cfg1 := NewTestSingleConfig(t)
@@ -299,18 +284,15 @@ func TestEtcdScaleInAndOut(t *testing.T) {
 		etcd1.Close()
 	}()
 	re.NoError(err)
-	ep1 := cfg1.LCUrls[0].String()
 	<-etcd1.Server.ReadyNotify()
 
 	// Create two etcd clients with etcd1 as endpoint.
-	urls, err := types.NewURLs([]string{ep1})
-	re.NoError(err)
-	client1, err := CreateEtcdClient(nil, urls[0]) // execute member change operation with this client
+	client1, err := CreateEtcdClient(nil, cfg1.LCUrls) // execute member change operation with this client
 	defer func() {
 		client1.Close()
 	}()
 	re.NoError(err)
-	client2, err := CreateEtcdClient(nil, urls[0]) // check member change with this client
+	client2, err := CreateEtcdClient(nil, cfg1.LCUrls) // check member change with this client
 	defer func() {
 		client2.Close()
 	}()
@@ -329,6 +311,71 @@ func TestEtcdScaleInAndOut(t *testing.T) {
 	checkMembers(re, client2, []*embed.Etcd{etcd2})
 }
 
+func TestRandomKillEtcd(t *testing.T) {
+	re := require.New(t)
+	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick", "return(true)"))
+	// Start an etcd server.
+	cfg1 := NewTestSingleConfig(t)
+	etcd1, err := embed.StartEtcd(cfg1)
+	re.NoError(err)
+	<-etcd1.Server.ReadyNotify()
+	client1, err := CreateEtcdClient(nil, cfg1.LCUrls)
+	re.NoError(err)
+	defer func() {
+		client1.Close()
+	}()
+
+	etcd2 := checkAddEtcdMember(t, cfg1, client1)
+	cfg2 := etcd2.Config()
+	<-etcd2.Server.ReadyNotify()
+
+	etcd3 := checkAddEtcdMember(t, &cfg2, client1)
+	<-etcd3.Server.ReadyNotify()
+
+	time.Sleep(1 * time.Second)
+	re.Len(client1.Endpoints(), 3)
+
+	// Randomly kill an etcd server and restart it.
+	etcds := []*embed.Etcd{etcd1, etcd2, etcd3}
+	cfgs := []embed.Config{etcd1.Config(), etcd2.Config(), etcd3.Config()}
+	for i := 0; i < 10; i++ {
+		killIndex := rand.Intn(len(etcds))
+		etcds[killIndex].Close()
+		testutil.Eventually(re, func() bool {
+			return IsHealthy(context.Background(), client1)
+		})
+		etcd, err := embed.StartEtcd(&cfgs[killIndex])
+		re.NoError(err)
+		<-etcd.Server.ReadyNotify()
+		etcds[killIndex] = etcd
+		testutil.Eventually(re, func() bool {
+			return IsHealthy(context.Background(), client1)
+		})
+	}
+	for _, etcd := range etcds {
+		if etcd != nil {
+			etcd.Close()
+		}
+	}
+	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick"))
+}
+
+func TestEtcdWithHangLeaderEnableCheck(t *testing.T) {
+	re := require.New(t)
+	var err error
+	// Test with the check enabled.
+	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick", "return(true)"))
+	err = checkEtcdWithHangLeader(t)
+	re.NoError(err)
+	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick"))
+
+	// Test with the check disabled.
+	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/closeTick", "return(true)"))
+	err = checkEtcdWithHangLeader(t)
+	re.Error(err)
+	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/closeTick"))
+}
+
 func checkEtcdWithHangLeader(t *testing.T) error {
 	re := require.New(t)
 	// Start a etcd server.
@@ -349,13 +396,13 @@ func checkEtcdWithHangLeader(t *testing.T) error {
 	// Create a etcd client with etcd1 as endpoint.
 	urls, err := types.NewURLs([]string{proxyAddr})
 	re.NoError(err)
-	client1, err := createEtcdClientWithMultiEndpoint(nil, urls)
+	client1, err := CreateEtcdClient(nil, urls)
 	defer func() {
 		client1.Close()
 	}()
 	re.NoError(err)
 
-	// Add a new member and set the client endpoints to etcd1 and etcd2.
+	// Add a new member
 	etcd2 := checkAddEtcdMember(t, cfg1, client1)
 	defer etcd2.Close()
 	checkMembers(re, client1, []*embed.Etcd{etcd1, etcd2})
@@ -363,7 +410,7 @@ func checkEtcdWithHangLeader(t *testing.T) error {
 
 	// Hang the etcd1 and wait for the client to connect to etcd2.
 	enableDiscard.Store(true)
-	time.Sleep(defaultDialKeepAliveTime + defaultDialKeepAliveTimeout*2)
+	time.Sleep(time.Second)
 	_, err = EtcdKVGet(client1, "test/key1")
 	return err
 }
@@ -473,16 +520,14 @@ func TestLoopWatcherTestSuite(t *testing.T) {
 }
 
 func (suite *loopWatcherTestSuite) SetupSuite() {
+	var err error
 	t := suite.T()
 	suite.ctx, suite.cancel = context.WithCancel(context.Background())
 	suite.cleans = make([]func(), 0)
 	// Start a etcd server and create a client with etcd1 as endpoint.
 	suite.config = NewTestSingleConfig(t)
 	suite.startEtcd()
-	ep1 := suite.config.LCUrls[0].String()
-	urls, err := types.NewURLs([]string{ep1})
-	suite.NoError(err)
-	suite.client, err = CreateEtcdClient(nil, urls[0])
+	suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls)
 	suite.NoError(err)
 	suite.cleans = append(suite.cleans, func() {
 		suite.client.Close()
@@ -685,7 +730,7 @@ func (suite *loopWatcherTestSuite) TestWatcherBreak() {
 
 	// Case2: close the etcd client and put a new value after watcher restarts
 	suite.client.Close()
-	suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls[0])
+	suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls)
 	suite.NoError(err)
 	watcher.updateClientCh <- suite.client
 	suite.put("TestWatcherBreak", "2")
@@ -693,7 +738,7 @@ func (suite *loopWatcherTestSuite) TestWatcherBreak() {
 
 	// Case3: close the etcd client and put a new value before watcher restarts
 	suite.client.Close()
-	suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls[0])
+	suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls)
 	suite.NoError(err)
 	suite.put("TestWatcherBreak", "3")
 	watcher.updateClientCh <- suite.client
@@ -701,7 +746,7 @@ func (suite *loopWatcherTestSuite) TestWatcherBreak() {
 
 	// Case4: close the etcd client and put a new value with compact
 	suite.client.Close()
-	suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls[0])
+	suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls)
 	suite.NoError(err)
 	suite.put("TestWatcherBreak", "4")
 	resp, err := EtcdKVGet(suite.client, "TestWatcherBreak")
diff --git a/pkg/utils/etcdutil/metrics.go b/pkg/utils/etcdutil/metrics.go
new file mode 100644
index 000000000000..f78e0864ba2c
--- /dev/null
+++ b/pkg/utils/etcdutil/metrics.go
@@ -0,0 +1,29 @@
+// Copyright 2023 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package etcdutil
+
+import "github.com/prometheus/client_golang/prometheus"
+
+var etcdStateGauge = prometheus.NewGaugeVec(
+	prometheus.GaugeOpts{
+		Namespace: "pd",
+		Subsystem: "server",
+		Name:      "etcd_client",
+		Help:      "Etcd client states.",
+	}, []string{"type"})
+
+func init() {
+	prometheus.MustRegister(etcdStateGauge)
+}
diff --git a/server/server.go b/server/server.go
index 40f5f4d59bd1..b60fa423fc1a 100644
--- a/server/server.go
+++ b/server/server.go
@@ -338,12 +338,12 @@ func (s *Server) startEtcd(ctx context.Context) error {
 	}
 
 	// start client
-	s.client, s.httpClient, err = startClient(s.cfg)
+	s.client, s.httpClient, err = s.startClient()
 	if err != nil {
 		return err
 	}
 
-	s.electionClient, err = startElectionClient(s.cfg)
+	s.electionClient, err = s.startElectionClient()
 	if err != nil {
 		return err
 	}
@@ -370,29 +370,29 @@ func (s *Server) startEtcd(ctx context.Context) error {
 	return nil
 }
 
-func startClient(cfg *config.Config) (*clientv3.Client, *http.Client, error) {
-	tlsConfig, err := cfg.Security.ToTLSConfig()
+func (s *Server) startClient() (*clientv3.Client, *http.Client, error) {
+	tlsConfig, err := s.cfg.Security.ToTLSConfig()
 	if err != nil {
 		return nil, nil, err
 	}
-	etcdCfg, err := cfg.GenEmbedEtcdConfig()
+	etcdCfg, err := s.cfg.GenEmbedEtcdConfig()
 	if err != nil {
 		return nil, nil, err
 	}
-	return etcdutil.CreateClients(tlsConfig, etcdCfg.ACUrls[0])
+	return etcdutil.CreateClients(tlsConfig, etcdCfg.ACUrls)
 }
 
-func startElectionClient(cfg *config.Config) (*clientv3.Client, error) {
-	tlsConfig, err := cfg.Security.ToTLSConfig()
+func (s *Server) startElectionClient() (*clientv3.Client, error) {
+	tlsConfig, err := s.cfg.Security.ToTLSConfig()
 	if err != nil {
 		return nil, err
 	}
-	etcdCfg, err := cfg.GenEmbedEtcdConfig()
+	etcdCfg, err := s.cfg.GenEmbedEtcdConfig()
 	if err != nil {
 		return nil, err
 	}
-	return etcdutil.CreateEtcdClient(tlsConfig, etcdCfg.ACUrls[0])
+	return etcdutil.CreateEtcdClient(tlsConfig, etcdCfg.ACUrls)
 }
 
 // AddStartCallback adds a callback in the startServer phase.
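
For reference, the gauge registered in metrics.go above composes its series name from Namespace, Subsystem, and Name, so the health-check goroutine's update surfaces as pd_server_etcd_client{type="endpoints"} carrying the current healthy-endpoint count. A minimal standalone sketch of the same reporting pattern (the names mirror the patch but live outside it):

package sketch

import "github.com/prometheus/client_golang/prometheus"

// stateGauge mirrors etcdStateGauge above.
var stateGauge = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "pd",
		Subsystem: "server",
		Name:      "etcd_client",
		Help:      "Etcd client states.",
	}, []string{"type"})

func init() {
	prometheus.MustRegister(stateGauge)
}

// recordHealthyEndpoints is how the patrol loop reports the healthy count.
func recordHealthyEndpoints(n int) {
	stateGauge.WithLabelValues("endpoints").Set(float64(n))
}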