diff --git a/pkg/election/leadership.go b/pkg/election/leadership.go index ca51066411a..e71a8dd974b 100644 --- a/pkg/election/leadership.go +++ b/pkg/election/leadership.go @@ -210,7 +210,7 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) { // When etcd is not available, the watcher.Watch will block, // so we check the etcd availability first. - if _, err := etcdutil.EtcdKVGet(ls.client, ls.leaderKey); err != nil { + if !etcdutil.IsHealthy(serverCtx, ls.client) { if time.Since(lastHealthyTime) > timeout { log.Error("the connect of leadership watcher is unhealthy", zap.Int64("revision", revision), @@ -235,7 +235,7 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) { // server closed, return return case <-ticker.C: - if _, err := etcdutil.EtcdKVGet(ls.client, ls.leaderKey); err != nil { + if !etcdutil.IsHealthy(serverCtx, ls.client) { watchChanCancel() continue } diff --git a/pkg/election/leadership_test.go b/pkg/election/leadership_test.go index 1be5cb6e113..5e98f7a5d36 100644 --- a/pkg/election/leadership_test.go +++ b/pkg/election/leadership_test.go @@ -124,7 +124,6 @@ func TestExitWatch(t *testing.T) { re := require.New(t) leaderKey := "/test_leader" re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/election/fastTick", "return(true)")) - re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/lessTimeout", "return(true)")) // Case1: close the client before the watch loop starts checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) { re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayWatcher", `pause`)) @@ -156,7 +155,6 @@ func TestExitWatch(t *testing.T) { server.Close() }) re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/election/fastTick")) - re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/lessTimeout")) } func checkExitWatch(t *testing.T, leaderKey string, injectFunc func(server *embed.Etcd, client *clientv3.Client)) { diff --git 
a/pkg/utils/etcdutil/etcdutil.go b/pkg/utils/etcdutil/etcdutil.go index 15ca1786e03..9592d953310 100644 --- a/pkg/utils/etcdutil/etcdutil.go +++ b/pkg/utils/etcdutil/etcdutil.go @@ -127,11 +127,7 @@ func RemoveEtcdMember(client *clientv3.Client, id uint64) (*clientv3.MemberRemov // EtcdKVGet returns the etcd GetResponse by given key or key prefix func EtcdKVGet(c *clientv3.Client, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) { - timeout := DefaultRequestTimeout - failpoint.Inject("lessTimeout", func() { - timeout = 1 * time.Second - }) - ctx, cancel := context.WithTimeout(c.Ctx(), timeout) + ctx, cancel := context.WithTimeout(c.Ctx(), DefaultRequestTimeout) defer cancel() start := time.Now() @@ -148,6 +144,20 @@ func EtcdKVGet(c *clientv3.Client, key string, opts ...clientv3.OpOption) (*clie return resp, nil } +// IsHealthy reports whether a leader-required Get of the "health" key succeeds, or fails only with permission denied, before the request timeout expires. +func IsHealthy(ctx context.Context, client *clientv3.Client) bool { + timeout := DefaultRequestTimeout + failpoint.Inject("fastTick", func() { + timeout = 100 * time.Millisecond + }) + ctx, cancel := context.WithTimeout(clientv3.WithRequireLeader(ctx), timeout) + defer cancel() + _, err := client.Get(ctx, "health") + // Permission denied still counts as healthy, since the proposal goes through consensus to get that answer. + // See: https://github.com/etcd-io/etcd/blob/85b640cee793e25f3837c47200089d14a8392dc7/etcdctl/ctlv3/command/ep_command.go#L124 + return err == nil || err == rpctypes.ErrPermissionDenied +} + // GetValue gets value with key from etcd. func GetValue(c *clientv3.Client, key string, opts ...clientv3.OpOption) ([]byte, error) { resp, err := get(c, key, opts...) 
@@ -328,13 +338,9 @@ func (checker *healthyChecker) patrol(ctx context.Context) []string { go func(key, value interface{}) { defer wg.Done() defer logutil.LogPanic() - ctx, cancel := context.WithTimeout(clientv3.WithRequireLeader(ctx), DefaultRequestTimeout) - defer cancel() ep := key.(string) client := value.(*healthyClient) - _, err := client.Get(ctx, "health") - // permission denied is OK since proposal goes through consensus to get it - if err == nil || err == rpctypes.ErrPermissionDenied { + if IsHealthy(ctx, client.Client) { hch <- ep checker.Store(ep, &healthyClient{ Client: client.Client, diff --git a/pkg/utils/etcdutil/etcdutil_test.go b/pkg/utils/etcdutil/etcdutil_test.go index 4caa0f46d1a..5c7cc2ecaf4 100644 --- a/pkg/utils/etcdutil/etcdutil_test.go +++ b/pkg/utils/etcdutil/etcdutil_test.go @@ -19,6 +19,7 @@ import ( "crypto/tls" "fmt" "io" + "math/rand" "net" "strings" "sync" @@ -244,10 +245,7 @@ func TestEtcdClientSync(t *testing.T) { re.NoError(err) // Create a etcd client with etcd1 as endpoint. - ep1 := cfg1.LCUrls[0].String() - urls, err := types.NewURLs([]string{ep1}) - re.NoError(err) - client1, err := CreateEtcdClient(nil, urls) + client1, err := CreateEtcdClient(nil, cfg1.LCUrls) defer func() { client1.Close() }() @@ -258,7 +256,10 @@ func TestEtcdClientSync(t *testing.T) { etcd2 := checkAddEtcdMember(t, cfg1, client1) defer etcd2.Close() checkMembers(re, client1, []*embed.Etcd{etcd1, etcd2}) - time.Sleep(200 * time.Millisecond) // wait for etcd client sync endpoints and client will be connected to etcd2 + testutil.Eventually(re, func() bool { + // wait for etcd client sync endpoints + return len(client1.Endpoints()) == 2 + }) // Remove the first member and close the etcd1. _, err = RemoveEtcdMember(client1, uint64(etcd1.Server.ID())) @@ -267,29 +268,13 @@ func TestEtcdClientSync(t *testing.T) { // Check the client can get the new member with the new endpoints. 
testutil.Eventually(re, func() bool { - listResp, err := ListEtcdMembers(client1) - return err == nil && len(listResp.Members) == 1 && listResp.Members[0].ID == uint64(etcd2.Server.ID()) + // wait for etcd client sync endpoints + return len(client1.Endpoints()) == 1 }) re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick")) } -func TestEtcdWithHangLeaderEnableCheck(t *testing.T) { - re := require.New(t) - var err error - // Test with enable check. - re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick", "return(true)")) - err = checkEtcdWithHangLeader(t) - re.NoError(err) - re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick")) - - // Test with disable check. - re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/closeTick", "return(true)")) - err = checkEtcdWithHangLeader(t) - re.Error(err) - re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/closeTick")) -} - func TestEtcdScaleInAndOut(t *testing.T) { re := require.New(t) // Start a etcd server. @@ -299,18 +284,15 @@ func TestEtcdScaleInAndOut(t *testing.T) { etcd1.Close() }() re.NoError(err) - ep1 := cfg1.LCUrls[0].String() <-etcd1.Server.ReadyNotify() // Create two etcd clients with etcd1 as endpoint. 
- urls, err := types.NewURLs([]string{ep1}) - re.NoError(err) - client1, err := CreateEtcdClient(nil, urls) // execute member change operation with this client + client1, err := CreateEtcdClient(nil, cfg1.LCUrls) // execute member change operation with this client defer func() { client1.Close() }() re.NoError(err) - client2, err := CreateEtcdClient(nil, urls) // check member change with this client + client2, err := CreateEtcdClient(nil, cfg1.LCUrls) // check member change with this client defer func() { client2.Close() }() @@ -329,6 +311,71 @@ func TestEtcdScaleInAndOut(t *testing.T) { checkMembers(re, client2, []*embed.Etcd{etcd2}) } +func TestRandomKillEtcd(t *testing.T) { + re := require.New(t) + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick", "return(true)")) + // Start an etcd server; etcd2 and etcd3 are then added through client1. + cfg1 := NewTestSingleConfig(t) + etcd1, err := embed.StartEtcd(cfg1) + re.NoError(err) + <-etcd1.Server.ReadyNotify() + client1, err := CreateEtcdClient(nil, cfg1.LCUrls) + re.NoError(err) + defer func() { + client1.Close() + }() + + etcd2 := checkAddEtcdMember(t, cfg1, client1) + cfg2 := etcd2.Config() + <-etcd2.Server.ReadyNotify() + + etcd3 := checkAddEtcdMember(t, &cfg2, client1) + <-etcd3.Server.ReadyNotify() + + // Poll until the client has synced all three endpoints; a fixed sleep followed by a hard assertion is timing-dependent. + testutil.Eventually(re, func() bool { return len(client1.Endpoints()) == 3 }) + + // Ten rounds: close a random member, wait for IsHealthy, restart that member from its saved config, and wait for IsHealthy again. + etcds := []*embed.Etcd{etcd1, etcd2, etcd3} + cfgs := []embed.Config{etcd1.Config(), etcd2.Config(), etcd3.Config()} + for i := 0; i < 10; i++ { + killIndex := rand.Intn(len(etcds)) + etcds[killIndex].Close() + testutil.Eventually(re, func() bool { + return IsHealthy(context.Background(), client1) + }) + etcd, err := embed.StartEtcd(&cfgs[killIndex]) + re.NoError(err) + <-etcd.Server.ReadyNotify() + etcds[killIndex] = etcd + testutil.Eventually(re, func() bool { + return IsHealthy(context.Background(), client1) + }) + } + for _, etcd := range etcds { + if etcd != nil { + etcd.Close() + } + } + 
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick")) +} + +func TestEtcdWithHangLeaderEnableCheck(t *testing.T) { + re := require.New(t) + var err error + // Test with enable check. + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick", "return(true)")) + err = checkEtcdWithHangLeader(t) + re.NoError(err) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick")) + + // Test with disable check. + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/closeTick", "return(true)")) + err = checkEtcdWithHangLeader(t) + re.Error(err) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/closeTick")) +} + func checkEtcdWithHangLeader(t *testing.T) error { re := require.New(t) // Start a etcd server. @@ -355,7 +402,7 @@ func checkEtcdWithHangLeader(t *testing.T) error { }() re.NoError(err) - // Add a new member and set the client endpoints to etcd1 and etcd2. + // Add a new member etcd2 := checkAddEtcdMember(t, cfg1, client1) defer etcd2.Close() checkMembers(re, client1, []*embed.Etcd{etcd1, etcd2}) @@ -363,7 +410,7 @@ func checkEtcdWithHangLeader(t *testing.T) error { // Hang the etcd1 and wait for the client to connect to etcd2. enableDiscard.Store(true) - time.Sleep(defaultDialKeepAliveTime + defaultDialKeepAliveTimeout*2) + time.Sleep(time.Second) _, err = EtcdKVGet(client1, "test/key1") return err } @@ -473,16 +520,14 @@ func TestLoopWatcherTestSuite(t *testing.T) { } func (suite *loopWatcherTestSuite) SetupSuite() { + var err error t := suite.T() suite.ctx, suite.cancel = context.WithCancel(context.Background()) suite.cleans = make([]func(), 0) // Start a etcd server and create a client with etcd1 as endpoint. 
suite.config = NewTestSingleConfig(t) suite.startEtcd() - ep1 := suite.config.LCUrls[0].String() - urls, err := types.NewURLs([]string{ep1}) - suite.NoError(err) - suite.client, err = CreateEtcdClient(nil, urls) + suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls) suite.NoError(err) suite.cleans = append(suite.cleans, func() { suite.client.Close()