From 7674875dd7de12c1e2c3d597ebbe4739fe60a57c Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Thu, 22 Aug 2024 16:59:57 +0800 Subject: [PATCH] join, etcdutil: enable linearizable in MemberList Signed-off-by: lhy1024 --- pkg/utils/etcdutil/etcdutil.go | 9 +++++++-- pkg/utils/etcdutil/etcdutil_test.go | 2 ++ pkg/utils/etcdutil/testutil.go | 10 +++++----- server/join/join.go | 12 ++++++++++-- tests/server/join/join_test.go | 12 ++++++++++++ 5 files changed, 36 insertions(+), 9 deletions(-) diff --git a/pkg/utils/etcdutil/etcdutil.go b/pkg/utils/etcdutil/etcdutil.go index 3eb1afabeac..04491fc7109 100644 --- a/pkg/utils/etcdutil/etcdutil.go +++ b/pkg/utils/etcdutil/etcdutil.go @@ -114,7 +114,7 @@ func AddEtcdMember(client *clientv3.Client, urls []string) (*clientv3.MemberAddR } // ListEtcdMembers returns a list of internal etcd members. -func ListEtcdMembers(ctx context.Context, client *clientv3.Client) (*clientv3.MemberListResponse, error) { +func ListEtcdMembers(ctx context.Context, client *clientv3.Client, linearizables ...bool) (*clientv3.MemberListResponse, error) { failpoint.Inject("SlowEtcdMemberList", func(val failpoint.Value) { d := val.(int) time.Sleep(time.Duration(d) * time.Second) @@ -125,7 +125,12 @@ func ListEtcdMembers(ctx context.Context, client *clientv3.Client) (*clientv3.Me // If Linearizable is set to false, the member list will be returned with server's local data. // If Linearizable is set to true, it is served with linearizable guarantee. If the server is disconnected from quorum, `MemberList` call will fail. c := clientv3.RetryClusterClient(client) - resp, err := c.MemberList(newCtx, &etcdserverpb.MemberListRequest{Linearizable: false}) + linearizable := true + if len(linearizables) > 0 { + linearizable = linearizables[0] + } + + resp, err := c.MemberList(newCtx, &etcdserverpb.MemberListRequest{Linearizable: linearizable}) cancel() if err != nil { return (*clientv3.MemberListResponse)(resp), errs.ErrEtcdMemberList.Wrap(err).GenWithStackByCause() diff --git a/pkg/utils/etcdutil/etcdutil_test.go b/pkg/utils/etcdutil/etcdutil_test.go index 99f71ffde05..f4a650709ea 100644 --- a/pkg/utils/etcdutil/etcdutil_test.go +++ b/pkg/utils/etcdutil/etcdutil_test.go @@ -227,6 +227,7 @@ func checkEtcdClientHealth(re *require.Assertions, client *clientv3.Client) { func TestEtcdScaleInAndOut(t *testing.T) { re := require.New(t) + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick", "return(true)")) // Start a etcd server. servers, _, clean := NewTestEtcdCluster(t, 1) defer clean() @@ -249,6 +250,7 @@ func TestEtcdScaleInAndOut(t *testing.T) { _, err = RemoveEtcdMember(client1, uint64(etcd1.Server.ID())) re.NoError(err) checkMembers(re, client2, []*embed.Etcd{etcd2}) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick")) } func TestRandomKillEtcd(t *testing.T) { diff --git a/pkg/utils/etcdutil/testutil.go b/pkg/utils/etcdutil/testutil.go index e67ae415736..3b1f90a2ac2 100644 --- a/pkg/utils/etcdutil/testutil.go +++ b/pkg/utils/etcdutil/testutil.go @@ -113,17 +113,17 @@ func MustAddEtcdMember(t *testing.T, cfg1 *embed.Config, client *clientv3.Client peerURL := cfg2.ListenPeerUrls[0].String() addResp, err := AddEtcdMember(client, []string{peerURL}) re.NoError(err) + // Start the new etcd member. + etcd2, err := embed.StartEtcd(cfg2) + re.NoError(err) + re.Equal(uint64(etcd2.Server.ID()), addResp.Member.ID) + <-etcd2.Server.ReadyNotify() // Check the client can get the new member. testutil.Eventually(re, func() bool { members, err := ListEtcdMembers(client.Ctx(), client) re.NoError(err) return len(addResp.Members) == len(members.Members) }) - // Start the new etcd member. - etcd2, err := embed.StartEtcd(cfg2) - re.NoError(err) - re.Equal(uint64(etcd2.Server.ID()), addResp.Member.ID) - <-etcd2.Server.ReadyNotify() return etcd2 } diff --git a/server/join/join.go b/server/join/join.go index bdc2704cd47..bec25df52a7 100644 --- a/server/join/join.go +++ b/server/join/join.go @@ -133,8 +133,9 @@ func PrepareJoinCluster(cfg *config.Config) error { return err } + originMembers := listResp.Members existed := false - for _, m := range listResp.Members { + for _, m := range originMembers { if len(m.Name) == 0 { log.Error("there is an abnormal joined member in the current member list", zap.Uint64("id", m.ID), @@ -174,8 +175,15 @@ func PrepareJoinCluster(cfg *config.Config) error { listSucc bool ) + // If there is only one member in the cluster, the cluster maybe lost leader + // since quorum is not active after adding the new member. + linearizable := true + if len(originMembers) == 1 { + linearizable = false + } + for i := 0; i < listMemberRetryTimes; i++ { - listResp, err = etcdutil.ListEtcdMembers(client.Ctx(), client) + listResp, err = etcdutil.ListEtcdMembers(client.Ctx(), client, linearizable) if err != nil { return err } diff --git a/tests/server/join/join_test.go b/tests/server/join/join_test.go index 5d087caf5e4..03a0c05e665 100644 --- a/tests/server/join/join_test.go +++ b/tests/server/join/join_test.go @@ -113,6 +113,18 @@ func TestFailedAndDeletedPDJoinsPreviousCluster(t *testing.T) { members, err := etcdutil.ListEtcdMembers(ctx, client) re.NoError(err) re.Len(members.Members, 2) + + // Join another PD. + pd4, err := cluster.Join(ctx) + re.NoError(err) + err = pd4.Run() + re.NoError(err) + _, err = os.Stat(filepath.Join(pd4.GetConfig().DataDir, "join")) + re.False(os.IsNotExist(err)) + re.NotEmpty(cluster.WaitLeader()) + members, err = etcdutil.ListEtcdMembers(ctx, client) + re.NoError(err) + re.Len(members.Members, 3) } // A deleted PD joins the previous cluster.