From 5bfd4a173d39faf6f66019b3c049ae7c4aad9966 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 30 Aug 2023 12:40:45 +0800 Subject: [PATCH 1/7] refactor etcd test server Signed-off-by: lhy1024 --- pkg/election/leadership_test.go | 96 ++--------- pkg/election/lease_test.go | 33 +--- pkg/encryption/key_manager_test.go | 30 +--- pkg/id/id_test.go | 20 +-- pkg/mcs/discovery/discover_test.go | 36 +--- pkg/mcs/discovery/register_test.go | 16 +- pkg/storage/kv/kv_test.go | 13 +- pkg/storage/storage_tso_test.go | 44 +---- pkg/tso/keyspace_group_manager_test.go | 4 +- pkg/tso/testutil.go | 25 --- pkg/utils/etcdutil/etcdutil_test.go | 215 ++++-------------------- pkg/utils/etcdutil/testutil.go | 80 ++++++++- tools/pd-backup/pdbackup/backup_test.go | 31 +--- 13 files changed, 164 insertions(+), 479 deletions(-) diff --git a/pkg/election/leadership_test.go b/pkg/election/leadership_test.go index 422f583575f..372486634ae 100644 --- a/pkg/election/leadership_test.go +++ b/pkg/election/leadership_test.go @@ -16,7 +16,6 @@ package election import ( "context" - "fmt" "os" "strings" "testing" @@ -35,27 +34,15 @@ const defaultLeaseTimeout = 1 func TestLeadership(t *testing.T) { re := require.New(t) - cfg := etcdutil.NewTestSingleConfig(t) - etcd, err := embed.StartEtcd(cfg) - defer func() { - etcd.Close() - }() - re.NoError(err) - - ep := cfg.LCUrls[0].String() - client, err := clientv3.New(clientv3.Config{ - Endpoints: []string{ep}, - }) - re.NoError(err) - - <-etcd.Server.ReadyNotify() + _, client, clean := etcdutil.NewTestEtcdCluster(t, 1) + defer clean() // Campaign the same leadership leadership1 := NewLeadership(client, "/test_leader", "test_leader_1") leadership2 := NewLeadership(client, "/test_leader", "test_leader_2") // leadership1 starts first and get the leadership - err = leadership1.Campaign(defaultLeaseTimeout, "test_leader_1") + err := leadership1.Campaign(defaultLeaseTimeout, "test_leader_1") re.NoError(err) // leadership2 starts then and can not get the leadership err = leadership2.Campaign(defaultLeaseTimeout, "test_leader_2") @@ -168,23 +155,14 @@ func TestExitWatch(t *testing.T) { // Case6: transfer leader without client reconnection. checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) func() { cfg1 := server.Config() - cfg2 := etcdutil.NewTestSingleConfig(t) - cfg2.InitialCluster = cfg1.InitialCluster + fmt.Sprintf(",%s=%s", cfg2.Name, &cfg2.LPUrls[0]) - cfg2.ClusterState = embed.ClusterStateFlagExisting - peerURL := cfg2.LPUrls[0].String() - addResp, err := etcdutil.AddEtcdMember(client, []string{peerURL}) - re.NoError(err) - etcd2, err := embed.StartEtcd(cfg2) - re.NoError(err) - re.Equal(uint64(etcd2.Server.ID()), addResp.Member.ID) - <-etcd2.Server.ReadyNotify() - ep := cfg2.LCUrls[0].String() + etcd2 := etcdutil.MustAddEtcdMember(t, &cfg1, client) client1, err := clientv3.New(clientv3.Config{ - Endpoints: []string{ep}, + Endpoints: []string{etcd2.Config().LCUrls[0].String()}, }) re.NoError(err) - + // close the original leader server.Server.HardStop() + // delete the leader key with the new client client1.Delete(context.Background(), leaderKey) return func() { etcd2.Close() @@ -201,27 +179,9 @@ func TestExitWatch(t *testing.T) { log.ReplaceGlobals(lg, p) cfg1 := server.Config() - cfg2 := etcdutil.NewTestSingleConfig(t) - cfg2.InitialCluster = cfg1.InitialCluster + fmt.Sprintf(",%s=%s", cfg2.Name, &cfg2.LPUrls[0]) - cfg2.ClusterState = embed.ClusterStateFlagExisting - peerURL := cfg2.LPUrls[0].String() - addResp, err := etcdutil.AddEtcdMember(client, []string{peerURL}) - re.NoError(err) - etcd2, err := embed.StartEtcd(cfg2) - re.NoError(err) - re.Equal(uint64(etcd2.Server.ID()), addResp.Member.ID) - <-etcd2.Server.ReadyNotify() - - cfg3 := etcdutil.NewTestSingleConfig(t) - cfg3.InitialCluster = cfg2.InitialCluster + fmt.Sprintf(",%s=%s", cfg3.Name, &cfg3.LPUrls[0]) - cfg3.ClusterState = embed.ClusterStateFlagExisting - peerURL = cfg3.LPUrls[0].String() - addResp, err = etcdutil.AddEtcdMember(client, []string{peerURL}) - re.NoError(err) - etcd3, err := embed.StartEtcd(cfg3) - re.NoError(err) - re.Equal(uint64(etcd3.Server.ID()), addResp.Member.ID) - <-etcd3.Server.ReadyNotify() + etcd2 := etcdutil.MustAddEtcdMember(t, &cfg1, client) + cfg2 := etcd2.Config() + etcd3 := etcdutil.MustAddEtcdMember(t, &cfg2, client) resp2, err := client.MemberList(context.Background()) re.NoError(err) @@ -237,25 +197,14 @@ func TestExitWatch(t *testing.T) { func checkExitWatch(t *testing.T, leaderKey string, injectFunc func(server *embed.Etcd, client *clientv3.Client) func()) { re := require.New(t) - cfg := etcdutil.NewTestSingleConfig(t) - etcd, err := embed.StartEtcd(cfg) - defer func() { - etcd.Close() - }() - re.NoError(err) + servers, client1, clean := etcdutil.NewTestEtcdCluster(t, 1) + defer clean() - ep := cfg.LCUrls[0].String() - client1, err := clientv3.New(clientv3.Config{ - Endpoints: []string{ep}, - }) - re.NoError(err) client2, err := clientv3.New(clientv3.Config{ - Endpoints: []string{ep}, + Endpoints: []string{servers[0].Config().LCUrls[0].String()}, }) re.NoError(err) - <-etcd.Server.ReadyNotify() - leadership1 := NewLeadership(client1, leaderKey, "test_leader_1") leadership2 := NewLeadership(client2, leaderKey, "test_leader_2") err = leadership1.Campaign(defaultLeaseTimeout, "test_leader_1") @@ -268,7 +217,7 @@ func checkExitWatch(t *testing.T, leaderKey string, injectFunc func(server *embe done <- struct{}{} }() - cleanFunc := injectFunc(etcd, client2) + cleanFunc := injectFunc(servers[0], client2) defer cleanFunc() testutil.Eventually(re, func() bool { @@ -292,25 +241,14 @@ func TestRequestProgress(t *testing.T) { log.ReplaceGlobals(lg, p) re := require.New(t) - cfg := etcdutil.NewTestSingleConfig(t) - etcd, err := embed.StartEtcd(cfg) - defer func() { - etcd.Close() - }() - re.NoError(err) + servers, client1, clean := etcdutil.NewTestEtcdCluster(t, 1) + defer clean() - ep := cfg.LCUrls[0].String() - client1, err := clientv3.New(clientv3.Config{ - Endpoints: []string{ep}, - }) - re.NoError(err) client2, err := clientv3.New(clientv3.Config{ - Endpoints: []string{ep}, + Endpoints: []string{servers[0].Config().LCUrls[0].String()}, }) re.NoError(err) - <-etcd.Server.ReadyNotify() - leaderKey := "/test_leader" leadership1 := NewLeadership(client1, leaderKey, "test_leader_1") leadership2 := NewLeadership(client2, leaderKey, "test_leader_2") diff --git a/pkg/election/lease_test.go b/pkg/election/lease_test.go index 70f55230293..3d8515eadb2 100644 --- a/pkg/election/lease_test.go +++ b/pkg/election/lease_test.go @@ -22,25 +22,12 @@ import ( "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/utils/etcdutil" "go.etcd.io/etcd/clientv3" - "go.etcd.io/etcd/embed" ) func TestLease(t *testing.T) { re := require.New(t) - cfg := etcdutil.NewTestSingleConfig(t) - etcd, err := embed.StartEtcd(cfg) - defer func() { - etcd.Close() - }() - re.NoError(err) - - ep := cfg.LCUrls[0].String() - client, err := clientv3.New(clientv3.Config{ - Endpoints: []string{ep}, - }) - re.NoError(err) - - <-etcd.Server.ReadyNotify() + _, client, clean := etcdutil.NewTestEtcdCluster(t, 1) + defer clean() // Create the lease. lease1 := &lease{ @@ -104,20 +91,8 @@ func TestLease(t *testing.T) { func TestLeaseKeepAlive(t *testing.T) { re := require.New(t) - cfg := etcdutil.NewTestSingleConfig(t) - etcd, err := embed.StartEtcd(cfg) - defer func() { - etcd.Close() - }() - re.NoError(err) - - ep := cfg.LCUrls[0].String() - client, err := clientv3.New(clientv3.Config{ - Endpoints: []string{ep}, - }) - re.NoError(err) - - <-etcd.Server.ReadyNotify() + _, client, clean := etcdutil.NewTestEtcdCluster(t, 1) + defer clean() // Create the lease. lease := &lease{ diff --git a/pkg/encryption/key_manager_test.go b/pkg/encryption/key_manager_test.go index a8c2493adc7..ac68fb94cc2 100644 --- a/pkg/encryption/key_manager_test.go +++ b/pkg/encryption/key_manager_test.go @@ -17,8 +17,6 @@ package encryption import ( "context" "encoding/hex" - "fmt" - "net/url" "os" "path/filepath" "sync/atomic" @@ -31,10 +29,8 @@ import ( "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/election" "github.com/tikv/pd/pkg/utils/etcdutil" - "github.com/tikv/pd/pkg/utils/tempurl" "github.com/tikv/pd/pkg/utils/typeutil" "go.etcd.io/etcd/clientv3" - "go.etcd.io/etcd/embed" ) const ( @@ -50,32 +46,10 @@ func getTestDataKey() []byte { } func newTestEtcd(t *testing.T, re *require.Assertions) (client *clientv3.Client) { - cfg := embed.NewConfig() - cfg.Name = "test_etcd" - cfg.Dir = t.TempDir() - cfg.Logger = "zap" - pu, err := url.Parse(tempurl.Alloc()) - re.NoError(err) - cfg.LPUrls = []url.URL{*pu} - cfg.APUrls = cfg.LPUrls - cu, err := url.Parse(tempurl.Alloc()) - re.NoError(err) - cfg.LCUrls = []url.URL{*cu} - cfg.ACUrls = cfg.LCUrls - cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, &cfg.LPUrls[0]) - cfg.ClusterState = embed.ClusterStateFlagNew - server, err := embed.StartEtcd(cfg) - re.NoError(err) - <-server.Server.ReadyNotify() - - client, err = clientv3.New(clientv3.Config{ - Endpoints: []string{cfg.LCUrls[0].String()}, - }) - re.NoError(err) + _, client, clean := etcdutil.NewTestEtcdCluster(t, 1) t.Cleanup(func() { - client.Close() - server.Close() + clean() }) return client diff --git a/pkg/id/id_test.go b/pkg/id/id_test.go index 1b1632cc763..94f0670b979 100644 --- a/pkg/id/id_test.go +++ b/pkg/id/id_test.go @@ -22,8 +22,6 @@ import ( "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/utils/etcdutil" - "go.etcd.io/etcd/clientv3" - "go.etcd.io/etcd/embed" ) const ( @@ -39,23 +37,11 @@ const ( // share rootPath and member val update their ids concurrently. func TestMultipleAllocator(t *testing.T) { re := require.New(t) - cfg := etcdutil.NewTestSingleConfig(t) - etcd, err := embed.StartEtcd(cfg) - defer func() { - etcd.Close() - }() - re.NoError(err) - - ep := cfg.LCUrls[0].String() - client, err := clientv3.New(clientv3.Config{ - Endpoints: []string{ep}, - }) - re.NoError(err) - - <-etcd.Server.ReadyNotify() + _, client, clean := etcdutil.NewTestEtcdCluster(t, 1) + defer clean() // Put memberValue to leaderPath to simulate an election success. - _, err = client.Put(context.Background(), leaderPath, memberVal) + _, err := client.Put(context.Background(), leaderPath, memberVal) re.NoError(err) wg := sync.WaitGroup{} diff --git a/pkg/mcs/discovery/discover_test.go b/pkg/mcs/discovery/discover_test.go index fed1d7844a0..2894dfa8d2d 100644 --- a/pkg/mcs/discovery/discover_test.go +++ b/pkg/mcs/discovery/discover_test.go @@ -21,28 +21,14 @@ import ( "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/utils/etcdutil" - "go.etcd.io/etcd/clientv3" - "go.etcd.io/etcd/embed" ) func TestDiscover(t *testing.T) { re := require.New(t) - cfg := etcdutil.NewTestSingleConfig(t) - etcd, err := embed.StartEtcd(cfg) - defer func() { - etcd.Close() - }() - re.NoError(err) - - ep := cfg.LCUrls[0].String() - re.NoError(err) - - client, err := clientv3.NewFromURL(ep) - re.NoError(err) - - <-etcd.Server.ReadyNotify() + _, client, clean := etcdutil.NewTestEtcdCluster(t, 1) + defer clean() sr1 := NewServiceRegister(context.Background(), client, "12345", "test_service", "127.0.0.1:1", "127.0.0.1:1", 1) - err = sr1.Register() + err := sr1.Register() re.NoError(err) sr2 := NewServiceRegister(context.Background(), client, "12345", "test_service", "127.0.0.1:2", "127.0.0.1:2", 1) err = sr2.Register() @@ -64,20 +50,8 @@ func TestDiscover(t *testing.T) { func TestServiceRegistryEntry(t *testing.T) { re := require.New(t) - cfg := etcdutil.NewTestSingleConfig(t) - etcd, err := embed.StartEtcd(cfg) - defer func() { - etcd.Close() - }() - re.NoError(err) - - ep := cfg.LCUrls[0].String() - re.NoError(err) - - client, err := clientv3.NewFromURL(ep) - re.NoError(err) - - <-etcd.Server.ReadyNotify() + _, client, clean := etcdutil.NewTestEtcdCluster(t, 1) + defer clean() entry1 := &ServiceRegistryEntry{ServiceAddr: "127.0.0.1:1"} s1, err := entry1.Serialize() re.NoError(err) diff --git a/pkg/mcs/discovery/register_test.go b/pkg/mcs/discovery/register_test.go index f27f3160afe..032b0558a79 100644 --- a/pkg/mcs/discovery/register_test.go +++ b/pkg/mcs/discovery/register_test.go @@ -28,18 +28,13 @@ import ( func TestRegister(t *testing.T) { re := require.New(t) - cfg := etcdutil.NewTestSingleConfig(t) - etcd, err := embed.StartEtcd(cfg) - re.NoError(err) - ep := cfg.LCUrls[0].String() - client, err := clientv3.NewFromURL(ep) - re.NoError(err) - <-etcd.Server.ReadyNotify() + servers, client, clean := etcdutil.NewTestEtcdCluster(t, 1) + defer clean() + etcd, cfg := servers[0], servers[0].Config() // Test register with http prefix. sr := NewServiceRegister(context.Background(), client, "12345", "test_service", "http://127.0.0.1:1", "http://127.0.0.1:1", 10) - re.NoError(err) - err = sr.Register() + err := sr.Register() re.NoError(err) re.Equal("/ms/12345/test_service/registry/http://127.0.0.1:1", sr.key) resp, err := client.Get(context.Background(), sr.key) @@ -69,14 +64,13 @@ func TestRegister(t *testing.T) { etcd.Server.HardStop() // close the etcd to make the keepalive failed time.Sleep(etcdutil.DefaultDialTimeout) // ensure that the request is timeout etcd.Close() - etcd, err = embed.StartEtcd(cfg) + etcd, err = embed.StartEtcd(&cfg) re.NoError(err) <-etcd.Server.ReadyNotify() testutil.Eventually(re, func() bool { return getKeyAfterLeaseExpired(re, client, sr.key) == "127.0.0.1:2" }) } - etcd.Close() } func getKeyAfterLeaseExpired(re *require.Assertions, client *clientv3.Client, key string) string { diff --git a/pkg/storage/kv/kv_test.go b/pkg/storage/kv/kv_test.go index d2db558a748..93359934da1 100644 --- a/pkg/storage/kv/kv_test.go +++ b/pkg/storage/kv/kv_test.go @@ -24,21 +24,12 @@ import ( "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/utils/etcdutil" "go.etcd.io/etcd/clientv3" - "go.etcd.io/etcd/embed" ) func TestEtcd(t *testing.T) { re := require.New(t) - cfg := etcdutil.NewTestSingleConfig(t) - etcd, err := embed.StartEtcd(cfg) - re.NoError(err) - defer etcd.Close() - - ep := cfg.LCUrls[0].String() - client, err := clientv3.New(clientv3.Config{ - Endpoints: []string{ep}, - }) - re.NoError(err) + _, client, clean := etcdutil.NewTestEtcdCluster(t, 1) + defer clean() rootPath := path.Join("/pd", strconv.FormatUint(100, 10)) kv := NewEtcdKVBase(client, rootPath) diff --git a/pkg/storage/storage_tso_test.go b/pkg/storage/storage_tso_test.go index 1dbba289512..dd8c02d8497 100644 --- a/pkg/storage/storage_tso_test.go +++ b/pkg/storage/storage_tso_test.go @@ -23,28 +23,18 @@ import ( "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/etcdutil" - "go.etcd.io/etcd/clientv3" - "go.etcd.io/etcd/embed" ) func TestSaveLoadTimestamp(t *testing.T) { re := require.New(t) - cfg := etcdutil.NewTestSingleConfig(t) - etcd, err := embed.StartEtcd(cfg) - re.NoError(err) - defer etcd.Close() - - ep := cfg.LCUrls[0].String() - client, err := clientv3.New(clientv3.Config{ - Endpoints: []string{ep}, - }) - re.NoError(err) + _, client, clean := etcdutil.NewTestEtcdCluster(t, 1) + defer clean() rootPath := path.Join("/pd", strconv.FormatUint(100, 10)) storage := NewStorageWithEtcdBackend(client, rootPath) expectedTS := time.Now().Round(0) - err = storage.SaveTimestamp(endpoint.TimestampKey, expectedTS) + err := storage.SaveTimestamp(endpoint.TimestampKey, expectedTS) re.NoError(err) ts, err := storage.LoadTimestamp("") re.NoError(err) @@ -54,16 +44,8 @@ func TestSaveLoadTimestamp(t *testing.T) { func TestGlobalLocalTimestamp(t *testing.T) { re := require.New(t) - cfg := etcdutil.NewTestSingleConfig(t) - etcd, err := embed.StartEtcd(cfg) - re.NoError(err) - defer etcd.Close() - - ep := cfg.LCUrls[0].String() - client, err := clientv3.New(clientv3.Config{ - Endpoints: []string{ep}, - }) - re.NoError(err) + _, client, clean := etcdutil.NewTestEtcdCluster(t, 1) + defer clean() rootPath := path.Join("/pd", strconv.FormatUint(100, 10)) storage := NewStorageWithEtcdBackend(client, rootPath) @@ -73,7 +55,7 @@ func TestGlobalLocalTimestamp(t *testing.T) { l1 := path.Join(ltaKey, dc1LocationKey, endpoint.TimestampKey) l2 := path.Join(ltaKey, dc2LocationKey, endpoint.TimestampKey) - err = storage.SaveTimestamp(l1, localTS1) + err := storage.SaveTimestamp(l1, localTS1) re.NoError(err) globalTS := time.Now().Round(0) err = storage.SaveTimestamp(endpoint.TimestampKey, globalTS) @@ -94,21 +76,13 @@ func TestGlobalLocalTimestamp(t *testing.T) { func TestTimestampTxn(t *testing.T) { re := require.New(t) - cfg := etcdutil.NewTestSingleConfig(t) - etcd, err := embed.StartEtcd(cfg) - re.NoError(err) - defer etcd.Close() - - ep := cfg.LCUrls[0].String() - client, err := clientv3.New(clientv3.Config{ - Endpoints: []string{ep}, - }) - re.NoError(err) + _, client, clean := etcdutil.NewTestEtcdCluster(t, 1) + defer clean() rootPath := path.Join("/pd", strconv.FormatUint(100, 10)) storage := NewStorageWithEtcdBackend(client, rootPath) globalTS1 := time.Now().Round(0) - err = storage.SaveTimestamp(endpoint.TimestampKey, globalTS1) + err := storage.SaveTimestamp(endpoint.TimestampKey, globalTS1) re.NoError(err) globalTS2 := globalTS1.Add(-time.Millisecond).Round(0) diff --git a/pkg/tso/keyspace_group_manager_test.go b/pkg/tso/keyspace_group_manager_test.go index 2ffa802b4f3..47e2dcb0ebc 100644 --- a/pkg/tso/keyspace_group_manager_test.go +++ b/pkg/tso/keyspace_group_manager_test.go @@ -35,6 +35,7 @@ import ( "github.com/tikv/pd/pkg/mcs/discovery" mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/utils/etcdutil" "github.com/tikv/pd/pkg/utils/tempurl" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/pkg/utils/tsoutil" @@ -67,7 +68,8 @@ func (suite *keyspaceGroupManagerTestSuite) SetupSuite() { t := suite.T() suite.ctx, suite.cancel = context.WithCancel(context.Background()) suite.ClusterID = rand.Uint64() - suite.backendEndpoints, suite.etcdClient, suite.clean = startEmbeddedEtcd(t) + servers, client, clean := etcdutil.NewTestEtcdCluster(t, 1) + suite.backendEndpoints, suite.etcdClient, suite.clean = servers[0].Config().LCUrls[0].String(), client, clean suite.cfg = suite.createConfig() } diff --git a/pkg/tso/testutil.go b/pkg/tso/testutil.go index 9225b21dfac..6fc1ccc98a7 100644 --- a/pkg/tso/testutil.go +++ b/pkg/tso/testutil.go @@ -15,14 +15,9 @@ package tso import ( - "testing" "time" - "github.com/stretchr/testify/require" - "github.com/tikv/pd/pkg/utils/etcdutil" "github.com/tikv/pd/pkg/utils/grpcutil" - "go.etcd.io/etcd/clientv3" - "go.etcd.io/etcd/embed" ) var _ ServiceConfig = (*TestServiceConfig)(nil) @@ -90,23 +85,3 @@ func (c *TestServiceConfig) GetMaxResetTSGap() time.Duration { func (c *TestServiceConfig) GetTLSConfig() *grpcutil.TLSConfig { return c.TLSConfig } - -func startEmbeddedEtcd(t *testing.T) (backendEndpoint string, etcdClient *clientv3.Client, clean func()) { - re := require.New(t) - cfg := etcdutil.NewTestSingleConfig(t) - etcd, err := embed.StartEtcd(cfg) - re.NoError(err) - clean = func() { - etcd.Close() - } - - backendEndpoint = cfg.LCUrls[0].String() - re.NoError(err) - - etcdClient, err = clientv3.NewFromURL(backendEndpoint) - re.NoError(err) - - <-etcd.Server.ReadyNotify() - - return -} diff --git a/pkg/utils/etcdutil/etcdutil_test.go b/pkg/utils/etcdutil/etcdutil_test.go index e9731de2329..e4719568f3e 100644 --- a/pkg/utils/etcdutil/etcdutil_test.go +++ b/pkg/utils/etcdutil/etcdutil_test.go @@ -48,44 +48,11 @@ func TestMain(m *testing.M) { func TestMemberHelpers(t *testing.T) { re := require.New(t) - cfg1 := NewTestSingleConfig(t) - etcd1, err := embed.StartEtcd(cfg1) - defer func() { - etcd1.Close() - }() - re.NoError(err) - - ep1 := cfg1.LCUrls[0].String() - client1, err := clientv3.New(clientv3.Config{ - Endpoints: []string{ep1}, - }) - defer func() { - client1.Close() - }() - re.NoError(err) + servers, client1, clean := NewTestEtcdCluster(t, 2) + defer clean() - <-etcd1.Server.ReadyNotify() - - // Test ListEtcdMembers - listResp1, err := ListEtcdMembers(client1) - re.NoError(err) - re.Len(listResp1.Members, 1) - // types.ID is an alias of uint64. - re.Equal(uint64(etcd1.Server.ID()), listResp1.Members[0].ID) - - // Test AddEtcdMember - etcd2 := checkAddEtcdMember(t, cfg1, client1) - cfg2 := etcd2.Config() - defer etcd2.Close() - ep2 := cfg2.LCUrls[0].String() - client2, err := clientv3.New(clientv3.Config{ - Endpoints: []string{ep2}, - }) - defer func() { - client2.Close() - }() - re.NoError(err) - checkMembers(re, client2, []*embed.Etcd{etcd1, etcd2}) + etcd1, etcd2 := servers[0], servers[1] + _, cfg2 := servers[0].Config(), servers[1].Config() // Test CheckClusterID urlsMap, err := types.NewURLsMap(cfg2.InitialCluster) @@ -105,30 +72,15 @@ func TestMemberHelpers(t *testing.T) { func TestEtcdKVGet(t *testing.T) { re := require.New(t) - cfg := NewTestSingleConfig(t) - etcd, err := embed.StartEtcd(cfg) - defer func() { - etcd.Close() - }() - re.NoError(err) - - ep := cfg.LCUrls[0].String() - client, err := clientv3.New(clientv3.Config{ - Endpoints: []string{ep}, - }) - defer func() { - client.Close() - }() - re.NoError(err) - - <-etcd.Server.ReadyNotify() + _, client, clean := NewTestEtcdCluster(t, 1) + defer clean() keys := []string{"test/key1", "test/key2", "test/key3", "test/key4", "test/key5"} vals := []string{"val1", "val2", "val3", "val4", "val5"} kv := clientv3.NewKV(client) for i := range keys { - _, err = kv.Put(context.TODO(), keys[i], vals[i]) + _, err := kv.Put(context.TODO(), keys[i], vals[i]) re.NoError(err) } @@ -158,25 +110,10 @@ func TestEtcdKVGet(t *testing.T) { func TestEtcdKVPutWithTTL(t *testing.T) { re := require.New(t) - cfg := NewTestSingleConfig(t) - etcd, err := embed.StartEtcd(cfg) - defer func() { - etcd.Close() - }() - re.NoError(err) + _, client, clean := NewTestEtcdCluster(t, 1) + defer clean() - ep := cfg.LCUrls[0].String() - client, err := clientv3.New(clientv3.Config{ - Endpoints: []string{ep}, - }) - defer func() { - client.Close() - }() - re.NoError(err) - - <-etcd.Server.ReadyNotify() - - _, err = EtcdKVPutWithTTL(context.TODO(), client, "test/ttl1", "val1", 2) + _, err := EtcdKVPutWithTTL(context.TODO(), client, "test/ttl1", "val1", 2) re.NoError(err) _, err = EtcdKVPutWithTTL(context.TODO(), client, "test/ttl2", "val2", 4) re.NoError(err) @@ -201,24 +138,8 @@ func TestEtcdKVPutWithTTL(t *testing.T) { func TestInitClusterID(t *testing.T) { re := require.New(t) - cfg := NewTestSingleConfig(t) - etcd, err := embed.StartEtcd(cfg) - defer func() { - etcd.Close() - }() - re.NoError(err) - - ep := cfg.LCUrls[0].String() - client, err := clientv3.New(clientv3.Config{ - Endpoints: []string{ep}, - }) - defer func() { - client.Close() - }() - re.NoError(err) - - <-etcd.Server.ReadyNotify() - + _, client, clean := NewTestEtcdCluster(t, 1) + defer clean() pdClusterIDPath := "test/TestInitClusterID/pd/cluster_id" // Get any cluster key to parse the cluster ID. resp, err := EtcdKVGet(client, pdClusterIDPath) @@ -238,24 +159,12 @@ func TestEtcdClientSync(t *testing.T) { re := require.New(t) re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick", "return(true)")) - // Start a etcd server. - cfg1 := NewTestSingleConfig(t) - etcd1, err := embed.StartEtcd(cfg1) - defer func() { - etcd1.Close() - }() - re.NoError(err) - - // Create a etcd client with etcd1 as endpoint. - client1, err := CreateEtcdClient(nil, cfg1.LCUrls) - defer func() { - client1.Close() - }() - re.NoError(err) - <-etcd1.Server.ReadyNotify() + servers, client1, clean := NewTestEtcdCluster(t, 1) + defer clean() + etcd1, cfg1 := servers[0], servers[0].Config() // Add a new member. - etcd2 := checkAddEtcdMember(t, cfg1, client1) + etcd2 := MustAddEtcdMember(t, &cfg1, client1) defer etcd2.Close() checkMembers(re, client1, []*embed.Etcd{etcd1, etcd2}) testutil.Eventually(re, func() bool { @@ -264,7 +173,7 @@ func TestEtcdClientSync(t *testing.T) { }) // Remove the first member and close the etcd1. - _, err = RemoveEtcdMember(client1, uint64(etcd1.Server.ID())) + _, err := RemoveEtcdMember(client1, uint64(etcd1.Server.ID())) re.NoError(err) etcd1.Close() @@ -280,13 +189,9 @@ func TestEtcdClientSync(t *testing.T) { func TestEtcdScaleInAndOut(t *testing.T) { re := require.New(t) // Start a etcd server. - cfg1 := NewTestSingleConfig(t) - etcd1, err := embed.StartEtcd(cfg1) - defer func() { - etcd1.Close() - }() - re.NoError(err) - <-etcd1.Server.ReadyNotify() + servers, _, clean := NewTestEtcdCluster(t, 1) + defer clean() + etcd1, cfg1 := servers[0], servers[0].Config() // Create two etcd clients with etcd1 as endpoint. client1, err := CreateEtcdClient(nil, cfg1.LCUrls) // execute member change operation with this client @@ -301,7 +206,7 @@ func TestEtcdScaleInAndOut(t *testing.T) { re.NoError(err) // Add a new member and check members - etcd2 := checkAddEtcdMember(t, cfg1, client1) + etcd2 := MustAddEtcdMember(t, &cfg1, client1) defer func() { etcd2.Close() }() @@ -317,29 +222,14 @@ func TestRandomKillEtcd(t *testing.T) { re := require.New(t) re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick", "return(true)")) // Start a etcd server. - cfg1 := NewTestSingleConfig(t) - etcd1, err := embed.StartEtcd(cfg1) - re.NoError(err) - <-etcd1.Server.ReadyNotify() - client1, err := CreateEtcdClient(nil, cfg1.LCUrls) - re.NoError(err) - defer func() { - client1.Close() - }() - - etcd2 := checkAddEtcdMember(t, cfg1, client1) - cfg2 := etcd2.Config() - <-etcd2.Server.ReadyNotify() - - etcd3 := checkAddEtcdMember(t, &cfg2, client1) - <-etcd3.Server.ReadyNotify() - - time.Sleep(1 * time.Second) - re.Len(client1.Endpoints(), 3) + etcds, client1, clean := NewTestEtcdCluster(t, 3) + defer clean() + testutil.Eventually(re, func() bool { + return len(client1.Endpoints()) == 3 + }) // Randomly kill an etcd server and restart it - etcds := []*embed.Etcd{etcd1, etcd2, etcd3} - cfgs := []embed.Config{etcd1.Config(), etcd2.Config(), etcd3.Config()} + cfgs := []embed.Config{etcds[0].Config(), etcds[1].Config(), etcds[2].Config()} for i := 0; i < 10; i++ { killIndex := rand.Intn(len(etcds)) etcds[killIndex].Close() @@ -381,19 +271,14 @@ func TestEtcdWithHangLeaderEnableCheck(t *testing.T) { func checkEtcdWithHangLeader(t *testing.T) error { re := require.New(t) // Start a etcd server. - cfg1 := NewTestSingleConfig(t) - etcd1, err := embed.StartEtcd(cfg1) - defer func() { - etcd1.Close() - }() - re.NoError(err) - ep1 := cfg1.LCUrls[0].String() - <-etcd1.Server.ReadyNotify() + servers, _, clean := NewTestEtcdCluster(t, 1) + defer clean() + etcd1, cfg1 := servers[0], servers[0].Config() // Create a proxy to etcd1. proxyAddr := tempurl.Alloc() var enableDiscard atomic.Bool - go proxyWithDiscard(re, ep1, proxyAddr, &enableDiscard) + go proxyWithDiscard(re, cfg1.LCUrls[0].String(), proxyAddr, &enableDiscard) // Create a etcd client with etcd1 as endpoint. urls, err := types.NewURLs([]string{proxyAddr}) @@ -405,7 +290,7 @@ func checkEtcdWithHangLeader(t *testing.T) error { re.NoError(err) // Add a new member - etcd2 := checkAddEtcdMember(t, cfg1, client1) + etcd2 := MustAddEtcdMember(t, &cfg1, client1) defer etcd2.Close() checkMembers(re, client1, []*embed.Etcd{etcd1, etcd2}) time.Sleep(1 * time.Second) // wait for etcd client sync endpoints @@ -417,40 +302,6 @@ func checkEtcdWithHangLeader(t *testing.T) error { return err } -func checkAddEtcdMember(t *testing.T, cfg1 *embed.Config, client *clientv3.Client) *embed.Etcd { - re := require.New(t) - cfg2 := NewTestSingleConfig(t) - cfg2.Name = genRandName() - cfg2.InitialCluster = cfg1.InitialCluster + fmt.Sprintf(",%s=%s", cfg2.Name, &cfg2.LPUrls[0]) - cfg2.ClusterState = embed.ClusterStateFlagExisting - peerURL := cfg2.LPUrls[0].String() - addResp, err := AddEtcdMember(client, []string{peerURL}) - re.NoError(err) - etcd2, err := embed.StartEtcd(cfg2) - re.NoError(err) - re.Equal(uint64(etcd2.Server.ID()), addResp.Member.ID) - <-etcd2.Server.ReadyNotify() - return etcd2 -} - -func checkMembers(re *require.Assertions, client *clientv3.Client, etcds []*embed.Etcd) { - // Check the client can get the new member. - listResp, err := ListEtcdMembers(client) - re.NoError(err) - re.Len(listResp.Members, len(etcds)) - inList := func(m *etcdserverpb.Member) bool { - for _, etcd := range etcds { - if m.ID == uint64(etcd.Server.ID()) { - return true - } - } - return false - } - for _, m := range listResp.Members { - re.True(inList(m)) - } -} - func proxyWithDiscard(re *require.Assertions, server, proxy string, enableDiscard *atomic.Bool) { server = strings.TrimPrefix(server, "http://") proxy = strings.TrimPrefix(proxy, "http://") @@ -527,7 +378,7 @@ func (suite *loopWatcherTestSuite) SetupSuite() { suite.ctx, suite.cancel = context.WithCancel(context.Background()) suite.cleans = make([]func(), 0) // Start a etcd server and create a client with etcd1 as endpoint. - suite.config = NewTestSingleConfig(t) + suite.config = newTestSingleConfig(t) suite.startEtcd() suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls) suite.NoError(err) diff --git a/pkg/utils/etcdutil/testutil.go b/pkg/utils/etcdutil/testutil.go index 971e93e1ed6..e1ab0937b1b 100644 --- a/pkg/utils/etcdutil/testutil.go +++ b/pkg/utils/etcdutil/testutil.go @@ -21,12 +21,15 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/utils/tempurl" + "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/embed" + "go.etcd.io/etcd/etcdserver/etcdserverpb" ) -// NewTestSingleConfig is used to create a etcd config for the unit test purpose. -func NewTestSingleConfig(t *testing.T) *embed.Config { +// newTestSingleConfig is used to create a etcd config for the unit test purpose. +func newTestSingleConfig(t *testing.T) *embed.Config { cfg := embed.NewConfig() cfg.Name = genRandName() cfg.Dir = t.TempDir() @@ -50,3 +53,76 @@ func NewTestSingleConfig(t *testing.T) *embed.Config { func genRandName() string { return "test_etcd_" + strconv.FormatInt(time.Now().UnixNano()%10000, 10) } + +// NewTestEtcdCluster is used to create a etcd cluster for the unit test purpose. +func NewTestEtcdCluster(t *testing.T, count int) (servers []*embed.Etcd, etcdClient *clientv3.Client, clean func()) { + re := require.New(t) + servers = make([]*embed.Etcd, 0, count) + + cfg := newTestSingleConfig(t) + etcd, err := embed.StartEtcd(cfg) + re.NoError(err) + etcdClient, err = CreateEtcdClient(nil, cfg.LCUrls) + re.NoError(err) + <-etcd.Server.ReadyNotify() + servers = append(servers, etcd) + + for i := 1; i < count; i++ { + listResp, err := ListEtcdMembers(etcdClient) + re.NoError(err) + re.Len(listResp.Members, i) + etcd2 := MustAddEtcdMember(t, cfg, etcdClient) + cfg2 := etcd2.Config() + cfg = &cfg2 + <-etcd2.Server.ReadyNotify() + servers = append(servers, etcd2) + } + + checkMembers(re, etcdClient, servers) + + clean = func() { + etcdClient.Close() + for _, server := range servers { + if server != nil { + server.Close() + } + } + } + + return +} + +// MustAddEtcdMember is used to add a new etcd member to the cluster. +func MustAddEtcdMember(t *testing.T, cfg1 *embed.Config, client *clientv3.Client) *embed.Etcd { + re := require.New(t) + cfg2 := newTestSingleConfig(t) + cfg2.Name = genRandName() + cfg2.InitialCluster = cfg1.InitialCluster + fmt.Sprintf(",%s=%s", cfg2.Name, &cfg2.LPUrls[0]) + cfg2.ClusterState = embed.ClusterStateFlagExisting + peerURL := cfg2.LPUrls[0].String() + addResp, err := AddEtcdMember(client, []string{peerURL}) + re.NoError(err) + etcd2, err := embed.StartEtcd(cfg2) + re.NoError(err) + re.Equal(uint64(etcd2.Server.ID()), addResp.Member.ID) + <-etcd2.Server.ReadyNotify() + return etcd2 +} + +func checkMembers(re *require.Assertions, client *clientv3.Client, etcds []*embed.Etcd) { + // Check the client can get the new member. + listResp, err := ListEtcdMembers(client) + re.NoError(err) + re.Len(listResp.Members, len(etcds)) + inList := func(m *etcdserverpb.Member) bool { + for _, etcd := range etcds { + if m.ID == uint64(etcd.Server.ID()) { + return true + } + } + return false + } + for _, m := range listResp.Members { + re.True(inList(m)) + } +} diff --git a/tools/pd-backup/pdbackup/backup_test.go b/tools/pd-backup/pdbackup/backup_test.go index d93fd77a336..c6814e47160 100644 --- a/tools/pd-backup/pdbackup/backup_test.go +++ b/tools/pd-backup/pdbackup/backup_test.go @@ -14,7 +14,6 @@ import ( "testing" "time" - "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" sc "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/storage/endpoint" @@ -48,14 +47,12 @@ type backupTestSuite struct { } func TestBackupTestSuite(t *testing.T) { - re := require.New(t) - - etcd, etcdClient, err := setupEtcd(t) - re.NoError(err) + servers, etcdClient, clean := etcdutil.NewTestEtcdCluster(t, 1) + defer clean() server, serverConfig := setupServer() testSuite := &backupTestSuite{ - etcd: etcd, + etcd: servers[0], etcdClient: etcdClient, server: server, serverConfig: serverConfig, @@ -64,24 +61,6 @@ func TestBackupTestSuite(t *testing.T) { suite.Run(t, testSuite) } -func setupEtcd(t *testing.T) (*embed.Etcd, *clientv3.Client, error) { - etcdCfg := etcdutil.NewTestSingleConfig(t) - etcd, err := embed.StartEtcd(etcdCfg) - if err != nil { - return nil, nil, err - } - - ep := etcdCfg.LCUrls[0].String() - client, err := clientv3.New(clientv3.Config{ - Endpoints: []string{ep}, - }) - if err != nil { - return nil, nil, err - } - - return etcd, client, nil -} - func setupServer() (*httptest.Server, *config.Config) { serverConfig := &config.Config{ ClientUrls: "example.com:2379", @@ -120,10 +99,6 @@ func setupServer() (*httptest.Server, *config.Config) { } func (s *backupTestSuite) BeforeTest(suiteName, testName string) { - // the etcd server is set up in TestBackupTestSuite() before the test suite - // runs - <-s.etcd.Server.ReadyNotify() - ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() From 60e5f07ed1c6bf7a457627ed617c319ff5eb67c8 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 30 Aug 2023 12:51:04 +0800 Subject: [PATCH 2/7] refactor InitLogger Signed-off-by: lhy1024 --- pkg/audit/audit_test.go | 18 +++--------------- pkg/election/leadership_test.go | 22 ++++------------------ pkg/utils/etcdutil/etcdutil_test.go | 14 ++++---------- pkg/utils/testutil/testutil.go | 13 +++++++++++++ tests/server/api/api_test.go | 23 ++++++----------------- 5 files changed, 30 insertions(+), 60 deletions(-) diff --git a/pkg/audit/audit_test.go b/pkg/audit/audit_test.go index 3cb43ceead0..019db1cbf99 100644 --- a/pkg/audit/audit_test.go +++ b/pkg/audit/audit_test.go @@ -24,11 +24,11 @@ import ( "testing" "time" - "github.com/pingcap/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/utils/requestutil" + "github.com/tikv/pd/pkg/utils/testutil" ) func TestLabelMatcher(t *testing.T) { @@ -93,7 +93,7 @@ func TestLocalLogBackendUsingFile(t *testing.T) { t.Parallel() re := require.New(t) backend := NewLocalLogBackend(true) - fname := initLog() + fname := testutil.InitLog("info") defer os.Remove(fname) req, _ := http.NewRequest(http.MethodGet, "http://127.0.0.1:2379/test?test=test", strings.NewReader("testBody")) re.False(backend.ProcessHTTPRequest(req)) @@ -125,7 +125,7 @@ func BenchmarkLocalLogAuditUsingTerminal(b *testing.B) { func BenchmarkLocalLogAuditUsingFile(b *testing.B) { b.StopTimer() backend := NewLocalLogBackend(true) - fname := initLog() + fname := testutil.InitLog("info") defer os.Remove(fname) req, _ := http.NewRequest(http.MethodGet, "http://127.0.0.1:2379/test?test=test", strings.NewReader("testBody")) b.StartTimer() @@ -135,15 +135,3 @@ func BenchmarkLocalLogAuditUsingFile(b *testing.B) { backend.ProcessHTTPRequest(req) } } - -func initLog() string { - cfg := &log.Config{} - f, _ := os.CreateTemp("/tmp", "pd_tests") - fname := f.Name() - f.Close() - cfg.File.Filename = fname - cfg.Level = "info" - lg, p, _ := log.InitLogger(cfg) - log.ReplaceGlobals(lg, p) - return fname -} diff --git a/pkg/election/leadership_test.go b/pkg/election/leadership_test.go index 372486634ae..1897586f8b5 100644 --- a/pkg/election/leadership_test.go +++ b/pkg/election/leadership_test.go @@ -22,7 +22,6 @@ import ( "time" "github.com/pingcap/failpoint" - "github.com/pingcap/log" "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/utils/etcdutil" "github.com/tikv/pd/pkg/utils/testutil" @@ -170,14 +169,6 @@ func TestExitWatch(t *testing.T) { }) // Case7: loss the quorum when the watch loop is running checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) func() { - tempStdoutFile, _ := os.CreateTemp("/tmp", "pd_tests") - defer os.Remove(tempStdoutFile.Name()) - logCfg := &log.Config{} - logCfg.File.Filename = tempStdoutFile.Name() - logCfg.Level = "info" - lg, p, _ := log.InitLogger(logCfg) - log.ReplaceGlobals(lg, p) - cfg1 := server.Config() etcd2 := etcdutil.MustAddEtcdMember(t, &cfg1, client) cfg2 := etcd2.Config() @@ -232,13 +223,8 @@ func checkExitWatch(t *testing.T, leaderKey string, injectFunc func(server *embe func TestRequestProgress(t *testing.T) { checkWatcherRequestProgress := func(injectWatchChanBlock bool) { - tempStdoutFile, _ := os.CreateTemp("/tmp", "pd_tests") - defer os.Remove(tempStdoutFile.Name()) - logCfg := &log.Config{} - logCfg.File.Filename = tempStdoutFile.Name() - logCfg.Level = "debug" - lg, p, _ := log.InitLogger(logCfg) - log.ReplaceGlobals(lg, p) + fname := testutil.InitLog("debug") + defer os.Remove(fname) re := require.New(t) servers, client1, clean := etcdutil.NewTestEtcdCluster(t, 1) @@ -266,14 +252,14 @@ func TestRequestProgress(t *testing.T) { if injectWatchChanBlock { failpoint.Enable("github.com/tikv/pd/pkg/election/watchChanBlock", "return(true)") testutil.Eventually(re, func() bool { - b, _ := os.ReadFile(tempStdoutFile.Name()) + b, _ := os.ReadFile(fname) l := string(b) return strings.Contains(l, "watch channel is blocked for a long time") }) failpoint.Disable("github.com/tikv/pd/pkg/election/watchChanBlock") } else { testutil.Eventually(re, func() bool { - b, _ := os.ReadFile(tempStdoutFile.Name()) + b, _ := os.ReadFile(fname) l := string(b) return strings.Contains(l, "watcher receives progress notify in watch loop") }) diff --git a/pkg/utils/etcdutil/etcdutil_test.go b/pkg/utils/etcdutil/etcdutil_test.go index e4719568f3e..f3178ac5e09 100644 --- a/pkg/utils/etcdutil/etcdutil_test.go +++ b/pkg/utils/etcdutil/etcdutil_test.go @@ -29,7 +29,6 @@ import ( "time" "github.com/pingcap/failpoint" - "github.com/pingcap/log" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/utils/tempurl" @@ -622,13 +621,8 @@ func (suite *loopWatcherTestSuite) TestWatcherBreak() { func (suite *loopWatcherTestSuite) TestWatcherRequestProgress() { checkWatcherRequestProgress := func(injectWatchChanBlock bool) { - tempStdoutFile, _ := os.CreateTemp("/tmp", "pd_tests") - defer os.Remove(tempStdoutFile.Name()) - cfg := &log.Config{} - cfg.File.Filename = tempStdoutFile.Name() - cfg.Level = "debug" - lg, p, _ := log.InitLogger(cfg) - log.ReplaceGlobals(lg, p) + fname := testutil.InitLog("debug") + defer os.Remove(fname) watcher := NewLoopWatcher( suite.ctx, @@ -650,14 +644,14 @@ func (suite *loopWatcherTestSuite) TestWatcherRequestProgress() { if injectWatchChanBlock { failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/watchChanBlock", "return(true)") testutil.Eventually(suite.Require(), func() bool { - b, _ := os.ReadFile(tempStdoutFile.Name()) + b, _ := os.ReadFile(fname) l := string(b) return strings.Contains(l, "watch channel is blocked for a long time") }) failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/watchChanBlock") } else { testutil.Eventually(suite.Require(), func() bool { - b, _ := os.ReadFile(tempStdoutFile.Name()) + b, _ := os.ReadFile(fname) l := string(b) return strings.Contains(l, "watcher receives progress notify in watch loop") }) diff --git a/pkg/utils/testutil/testutil.go b/pkg/utils/testutil/testutil.go index 7d31f2263c6..512b8a32e2c 100644 --- a/pkg/utils/testutil/testutil.go +++ b/pkg/utils/testutil/testutil.go @@ -20,6 +20,7 @@ import ( "time" "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/log" "github.com/stretchr/testify/require" "google.golang.org/grpc" ) @@ -86,3 +87,15 @@ func CleanServer(dataDir string) { // Clean data directory os.RemoveAll(dataDir) } + +func InitLog(level string) string { + cfg := &log.Config{} + f, _ := os.CreateTemp("/tmp", "pd_tests") + fname := f.Name() + f.Close() + cfg.File.Filename = fname + cfg.Level = level + lg, p, _ := log.InitLogger(cfg) + log.ReplaceGlobals(lg, p) + return fname +} diff --git a/tests/server/api/api_test.go b/tests/server/api/api_test.go index 375a0cf7c80..a75b2e28b85 100644 --- a/tests/server/api/api_test.go +++ b/tests/server/api/api_test.go @@ -30,7 +30,6 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" - "github.com/pingcap/log" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/core" @@ -453,13 +452,8 @@ func (suite *middlewareTestSuite) TestAuditPrometheusBackend() { } func (suite *middlewareTestSuite) TestAuditLocalLogBackend() { - tempStdoutFile, _ := os.CreateTemp("/tmp", "pd_tests") - defer os.Remove(tempStdoutFile.Name()) - cfg := &log.Config{} - cfg.File.Filename = tempStdoutFile.Name() - cfg.Level = "info" - lg, p, _ := log.InitLogger(cfg) - log.ReplaceGlobals(lg, p) + fname := testutil.InitLog("info") + defer os.Remove(fname) leader := suite.cluster.GetServer(suite.cluster.GetLeader()) input := map[string]interface{}{ "enable-audit": "true", @@ -477,7 +471,7 @@ func (suite *middlewareTestSuite) TestAuditLocalLogBackend() { suite.NoError(err) _, err = io.ReadAll(resp.Body) resp.Body.Close() - b, _ := os.ReadFile(tempStdoutFile.Name()) + b, _ := os.ReadFile(fname) suite.Contains(string(b), "audit log") suite.NoError(err) suite.Equal(http.StatusOK, resp.StatusCode) @@ -667,13 +661,8 @@ func (suite *redirectorTestSuite) TestNotLeader() { func (suite *redirectorTestSuite) TestXForwardedFor() { leader := suite.cluster.GetServer(suite.cluster.GetLeader()) suite.NoError(leader.BootstrapCluster()) - tempStdoutFile, _ := os.CreateTemp("/tmp", "pd_tests") - defer os.Remove(tempStdoutFile.Name()) - cfg := &log.Config{} - cfg.File.Filename = tempStdoutFile.Name() - cfg.Level = "info" - lg, p, _ := log.InitLogger(cfg) - log.ReplaceGlobals(lg, p) + fname := testutil.InitLog("info") + defer os.Remove(fname) follower := suite.cluster.GetServer(suite.cluster.GetFollower()) addr := follower.GetAddr() + "/pd/api/v1/regions" @@ -684,7 +673,7 @@ func (suite *redirectorTestSuite) TestXForwardedFor() { defer resp.Body.Close() suite.Equal(http.StatusOK, resp.StatusCode) time.Sleep(1 * time.Second) - b, _ := os.ReadFile(tempStdoutFile.Name()) + b, _ := os.ReadFile(fname) l := string(b) suite.Contains(l, "/pd/api/v1/regions") suite.NotContains(l, suite.cluster.GetConfig().GetClientURLs()) From b22a84ea59c46d01dae71a6f76c510867f1f78ce Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 30 Aug 2023 13:05:22 +0800 Subject: [PATCH 3/7] fix lint Signed-off-by: lhy1024 --- pkg/audit/audit_test.go | 4 +-- pkg/election/leadership_test.go | 24 +++++++----------- pkg/encryption/key_manager_test.go | 38 ++++++++++++++--------------- pkg/utils/etcdutil/etcdutil_test.go | 2 +- pkg/utils/testutil/testutil.go | 5 ++-- tests/server/api/api_test.go | 4 +-- 6 files changed, 35 insertions(+), 42 deletions(-) diff --git a/pkg/audit/audit_test.go b/pkg/audit/audit_test.go index 019db1cbf99..d72c67969b4 100644 --- a/pkg/audit/audit_test.go +++ b/pkg/audit/audit_test.go @@ -93,7 +93,7 @@ func TestLocalLogBackendUsingFile(t *testing.T) { t.Parallel() re := require.New(t) backend := NewLocalLogBackend(true) - fname := testutil.InitLog("info") + fname := testutil.InitTempFileLogger("info") defer os.Remove(fname) req, _ := http.NewRequest(http.MethodGet, "http://127.0.0.1:2379/test?test=test", strings.NewReader("testBody")) re.False(backend.ProcessHTTPRequest(req)) @@ -125,7 +125,7 @@ func BenchmarkLocalLogAuditUsingTerminal(b *testing.B) { func BenchmarkLocalLogAuditUsingFile(b *testing.B) { b.StopTimer() backend := NewLocalLogBackend(true) - fname := testutil.InitLog("info") + fname := testutil.InitTempFileLogger("info") defer os.Remove(fname) req, _ := http.NewRequest(http.MethodGet, "http://127.0.0.1:2379/test?test=test", strings.NewReader("testBody")) b.StartTimer() diff --git a/pkg/election/leadership_test.go b/pkg/election/leadership_test.go index 1897586f8b5..048969cfb32 100644 --- a/pkg/election/leadership_test.go +++ b/pkg/election/leadership_test.go @@ -155,16 +155,15 @@ func TestExitWatch(t *testing.T) { checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) func() { cfg1 := server.Config() etcd2 := etcdutil.MustAddEtcdMember(t, &cfg1, client) - client1, err := clientv3.New(clientv3.Config{ - Endpoints: []string{etcd2.Config().LCUrls[0].String()}, - }) + client2, err := etcdutil.CreateEtcdClient(nil, etcd2.Config().LCUrls) re.NoError(err) // close the original leader server.Server.HardStop() // delete the leader key with the new client - client1.Delete(context.Background(), leaderKey) + client2.Delete(context.Background(), leaderKey) return func() { etcd2.Close() + client2.Close() } }) // Case7: loss the quorum when the watch loop is running @@ -190,11 +189,9 @@ func checkExitWatch(t *testing.T, leaderKey string, injectFunc func(server *embe re := require.New(t) servers, client1, clean := etcdutil.NewTestEtcdCluster(t, 1) defer clean() - - client2, err := clientv3.New(clientv3.Config{ - Endpoints: []string{servers[0].Config().LCUrls[0].String()}, - }) + client2, err := etcdutil.CreateEtcdClient(nil, servers[0].Config().LCUrls) re.NoError(err) + defer client2.Close() leadership1 := NewLeadership(client1, leaderKey, "test_leader_1") leadership2 := NewLeadership(client2, leaderKey, "test_leader_2") @@ -223,17 +220,14 @@ func checkExitWatch(t *testing.T, leaderKey string, injectFunc func(server *embe func TestRequestProgress(t *testing.T) { checkWatcherRequestProgress := func(injectWatchChanBlock bool) { - fname := testutil.InitLog("debug") - defer os.Remove(fname) - re := require.New(t) + fname := testutil.InitTempFileLogger("debug") + defer os.Remove(fname) servers, client1, clean := etcdutil.NewTestEtcdCluster(t, 1) defer clean() - - client2, err := clientv3.New(clientv3.Config{ - Endpoints: []string{servers[0].Config().LCUrls[0].String()}, - }) + client2, err := etcdutil.CreateEtcdClient(nil, servers[0].Config().LCUrls) re.NoError(err) + defer client2.Close() leaderKey := "/test_leader" leadership1 := NewLeadership(client1, leaderKey, "test_leader_1") diff --git a/pkg/encryption/key_manager_test.go b/pkg/encryption/key_manager_test.go index ac68fb94cc2..d33f6f47b13 100644 --- a/pkg/encryption/key_manager_test.go +++ b/pkg/encryption/key_manager_test.go @@ -45,13 +45,11 @@ func getTestDataKey() []byte { return key } -func newTestEtcd(t *testing.T, re *require.Assertions) (client *clientv3.Client) { +func newTestEtcd(t *testing.T) (client *clientv3.Client) { _, client, clean := etcdutil.NewTestEtcdCluster(t, 1) - t.Cleanup(func() { clean() }) - return client } @@ -87,7 +85,7 @@ func checkMasterKeyMeta(re *require.Assertions, value []byte, meta *encryptionpb func TestNewKeyManagerBasic(t *testing.T) { re := require.New(t) // Initialize. - client := newTestEtcd(t, re) + client := newTestEtcd(t) // Use default config. config := &Config{} err := config.Adjust() @@ -109,7 +107,7 @@ func TestNewKeyManagerBasic(t *testing.T) { func TestNewKeyManagerWithCustomConfig(t *testing.T) { re := require.New(t) // Initialize. - client := newTestEtcd(t, re) + client := newTestEtcd(t) keyFile := newTestKeyFile(t, re) // Custom config rotatePeriod, err := time.ParseDuration("100h") @@ -147,7 +145,7 @@ func TestNewKeyManagerWithCustomConfig(t *testing.T) { func TestNewKeyManagerLoadKeys(t *testing.T) { re := require.New(t) // Initialize. - client := newTestEtcd(t, re) + client := newTestEtcd(t) keyFile := newTestKeyFile(t, re) leadership := newTestLeader(re, client) // Use default config. @@ -188,7 +186,7 @@ func TestNewKeyManagerLoadKeys(t *testing.T) { func TestGetCurrentKey(t *testing.T) { re := require.New(t) // Initialize. - client := newTestEtcd(t, re) + client := newTestEtcd(t) // Use default config. config := &Config{} err := config.Adjust() @@ -231,7 +229,7 @@ func TestGetCurrentKey(t *testing.T) { func TestGetKey(t *testing.T) { re := require.New(t) // Initialize. - client := newTestEtcd(t, re) + client := newTestEtcd(t) keyFile := newTestKeyFile(t, re) leadership := newTestLeader(re, client) // Store initial keys in etcd. @@ -286,7 +284,7 @@ func TestGetKey(t *testing.T) { func TestLoadKeyEmpty(t *testing.T) { re := require.New(t) // Initialize. - client := newTestEtcd(t, re) + client := newTestEtcd(t) keyFile := newTestKeyFile(t, re) leadership := newTestLeader(re, client) // Store initial keys in etcd. @@ -322,7 +320,7 @@ func TestWatcher(t *testing.T) { // Initialize. ctx, cancel := context.WithCancel(context.Background()) defer cancel() - client := newTestEtcd(t, re) + client := newTestEtcd(t) keyFile := newTestKeyFile(t, re) leadership := newTestLeader(re, client) // Setup helper @@ -398,7 +396,7 @@ func TestWatcher(t *testing.T) { func TestSetLeadershipWithEncryptionOff(t *testing.T) { re := require.New(t) // Initialize. - client := newTestEtcd(t, re) + client := newTestEtcd(t) // Use default config. config := &Config{} err := config.Adjust() @@ -423,7 +421,7 @@ func TestSetLeadershipWithEncryptionEnabling(t *testing.T) { // Initialize. ctx, cancel := context.WithCancel(context.Background()) defer cancel() - client := newTestEtcd(t, re) + client := newTestEtcd(t) keyFile := newTestKeyFile(t, re) leadership := newTestLeader(re, client) // Setup helper @@ -476,7 +474,7 @@ func TestSetLeadershipWithEncryptionMethodChanged(t *testing.T) { // Initialize. ctx, cancel := context.WithCancel(context.Background()) defer cancel() - client := newTestEtcd(t, re) + client := newTestEtcd(t) keyFile := newTestKeyFile(t, re) leadership := newTestLeader(re, client) // Setup helper @@ -552,7 +550,7 @@ func TestSetLeadershipWithCurrentKeyExposed(t *testing.T) { // Initialize. ctx, cancel := context.WithCancel(context.Background()) defer cancel() - client := newTestEtcd(t, re) + client := newTestEtcd(t) keyFile := newTestKeyFile(t, re) leadership := newTestLeader(re, client) // Setup helper @@ -623,7 +621,7 @@ func TestSetLeadershipWithCurrentKeyExpired(t *testing.T) { // Initialize. ctx, cancel := context.WithCancel(context.Background()) defer cancel() - client := newTestEtcd(t, re) + client := newTestEtcd(t) keyFile := newTestKeyFile(t, re) leadership := newTestLeader(re, client) // Setup helper @@ -698,7 +696,7 @@ func TestSetLeadershipWithMasterKeyChanged(t *testing.T) { // Initialize. ctx, cancel := context.WithCancel(context.Background()) defer cancel() - client := newTestEtcd(t, re) + client := newTestEtcd(t) keyFile := newTestKeyFile(t, re) keyFile2 := newTestKeyFile(t, re, testMasterKey2) leadership := newTestLeader(re, client) @@ -763,7 +761,7 @@ func TestSetLeadershipWithMasterKeyChanged(t *testing.T) { func TestSetLeadershipMasterKeyWithCiphertextKey(t *testing.T) { re := require.New(t) // Initialize. - client := newTestEtcd(t, re) + client := newTestEtcd(t) keyFile := newTestKeyFile(t, re) leadership := newTestLeader(re, client) // Setup helper @@ -841,7 +839,7 @@ func TestSetLeadershipWithEncryptionDisabling(t *testing.T) { // Initialize. ctx, cancel := context.WithCancel(context.Background()) defer cancel() - client := newTestEtcd(t, re) + client := newTestEtcd(t) keyFile := newTestKeyFile(t, re) leadership := newTestLeader(re, client) // Setup helper @@ -897,7 +895,7 @@ func TestKeyRotation(t *testing.T) { // Initialize. ctx, cancel := context.WithCancel(context.Background()) defer cancel() - client := newTestEtcd(t, re) + client := newTestEtcd(t) keyFile := newTestKeyFile(t, re) leadership := newTestLeader(re, client) // Setup helper @@ -993,7 +991,7 @@ func TestKeyRotationConflict(t *testing.T) { // Initialize. ctx, cancel := context.WithCancel(context.Background()) defer cancel() - client := newTestEtcd(t, re) + client := newTestEtcd(t) keyFile := newTestKeyFile(t, re) leadership := newTestLeader(re, client) // Setup helper diff --git a/pkg/utils/etcdutil/etcdutil_test.go b/pkg/utils/etcdutil/etcdutil_test.go index f3178ac5e09..b28c4365b0e 100644 --- a/pkg/utils/etcdutil/etcdutil_test.go +++ b/pkg/utils/etcdutil/etcdutil_test.go @@ -621,7 +621,7 @@ func (suite *loopWatcherTestSuite) TestWatcherBreak() { func (suite *loopWatcherTestSuite) TestWatcherRequestProgress() { checkWatcherRequestProgress := func(injectWatchChanBlock bool) { - fname := testutil.InitLog("debug") + fname := testutil.InitTempFileLogger("debug") defer os.Remove(fname) watcher := NewLoopWatcher( diff --git a/pkg/utils/testutil/testutil.go b/pkg/utils/testutil/testutil.go index 512b8a32e2c..a48db0bd60f 100644 --- a/pkg/utils/testutil/testutil.go +++ b/pkg/utils/testutil/testutil.go @@ -88,10 +88,11 @@ func CleanServer(dataDir string) { os.RemoveAll(dataDir) } -func InitLog(level string) string { +// InitTempFileLogger initializes the logger and redirects the log output to a temporary file. +func InitTempFileLogger(level string) (fname string) { cfg := &log.Config{} f, _ := os.CreateTemp("/tmp", "pd_tests") - fname := f.Name() + fname = f.Name() f.Close() cfg.File.Filename = fname cfg.Level = level diff --git a/tests/server/api/api_test.go b/tests/server/api/api_test.go index a75b2e28b85..9c7d9c82deb 100644 --- a/tests/server/api/api_test.go +++ b/tests/server/api/api_test.go @@ -452,7 +452,7 @@ func (suite *middlewareTestSuite) TestAuditPrometheusBackend() { } func (suite *middlewareTestSuite) TestAuditLocalLogBackend() { - fname := testutil.InitLog("info") + fname := testutil.InitTempFileLogger("info") defer os.Remove(fname) leader := suite.cluster.GetServer(suite.cluster.GetLeader()) input := map[string]interface{}{ @@ -661,7 +661,7 @@ func (suite *redirectorTestSuite) TestNotLeader() { func (suite *redirectorTestSuite) TestXForwardedFor() { leader := suite.cluster.GetServer(suite.cluster.GetLeader()) suite.NoError(leader.BootstrapCluster()) - fname := testutil.InitLog("info") + fname := testutil.InitTempFileLogger("info") defer os.Remove(fname) follower := suite.cluster.GetServer(suite.cluster.GetFollower()) From 11f0a90c27523919b9f436b4bbea73a699385897 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 30 Aug 2023 14:16:54 +0800 Subject: [PATCH 4/7] fix #6890 Signed-off-by: lhy1024 --- pkg/storage/storage_tso_test.go | 28 ++++++++++------------------ pkg/utils/etcdutil/etcdutil_test.go | 16 ++++------------ pkg/utils/etcdutil/testutil.go | 19 ++++++++++++++++++- 3 files changed, 32 insertions(+), 31 deletions(-) diff --git a/pkg/storage/storage_tso_test.go b/pkg/storage/storage_tso_test.go index dd8c02d8497..1666abc4003 100644 --- a/pkg/storage/storage_tso_test.go +++ b/pkg/storage/storage_tso_test.go @@ -27,12 +27,7 @@ import ( func TestSaveLoadTimestamp(t *testing.T) { re := require.New(t) - - _, client, clean := etcdutil.NewTestEtcdCluster(t, 1) - defer clean() - rootPath := path.Join("/pd", strconv.FormatUint(100, 10)) - storage := NewStorageWithEtcdBackend(client, rootPath) - + storage := newTestStorage(t) expectedTS := time.Now().Round(0) err := storage.SaveTimestamp(endpoint.TimestampKey, expectedTS) re.NoError(err) @@ -43,12 +38,7 @@ func TestSaveLoadTimestamp(t *testing.T) { func TestGlobalLocalTimestamp(t *testing.T) { re := require.New(t) - - _, client, clean := etcdutil.NewTestEtcdCluster(t, 1) - defer clean() - rootPath := path.Join("/pd", strconv.FormatUint(100, 10)) - storage := NewStorageWithEtcdBackend(client, rootPath) - + storage := newTestStorage(t) ltaKey := "lta" dc1LocationKey, dc2LocationKey := "dc1", "dc2" localTS1 := time.Now().Round(0) @@ -75,12 +65,7 @@ func TestGlobalLocalTimestamp(t *testing.T) { func TestTimestampTxn(t *testing.T) { re := require.New(t) - - _, client, clean := etcdutil.NewTestEtcdCluster(t, 1) - defer clean() - rootPath := path.Join("/pd", strconv.FormatUint(100, 10)) - storage := NewStorageWithEtcdBackend(client, rootPath) - + storage := newTestStorage(t) globalTS1 := time.Now().Round(0) err := storage.SaveTimestamp(endpoint.TimestampKey, globalTS1) re.NoError(err) @@ -93,3 +78,10 @@ func TestTimestampTxn(t *testing.T) { re.NoError(err) re.Equal(globalTS1, ts) } + +func newTestStorage(t *testing.T) Storage { + _, client, clean := etcdutil.NewTestEtcdCluster(t, 1) + defer clean() + rootPath := path.Join("/pd", strconv.FormatUint(100, 10)) + return NewStorageWithEtcdBackend(client, rootPath) +} diff --git a/pkg/utils/etcdutil/etcdutil_test.go b/pkg/utils/etcdutil/etcdutil_test.go index b28c4365b0e..827216379f0 100644 --- a/pkg/utils/etcdutil/etcdutil_test.go +++ b/pkg/utils/etcdutil/etcdutil_test.go @@ -194,21 +194,15 @@ func TestEtcdScaleInAndOut(t *testing.T) { // Create two etcd clients with etcd1 as endpoint. client1, err := CreateEtcdClient(nil, cfg1.LCUrls) // execute member change operation with this client - defer func() { - client1.Close() - }() re.NoError(err) + defer client1.Close() client2, err := CreateEtcdClient(nil, cfg1.LCUrls) // check member change with this client - defer func() { - client2.Close() - }() re.NoError(err) + defer client2.Close() // Add a new member and check members etcd2 := MustAddEtcdMember(t, &cfg1, client1) - defer func() { - etcd2.Close() - }() + defer etcd2.Close() checkMembers(re, client2, []*embed.Etcd{etcd1, etcd2}) // scale in etcd1 @@ -283,10 +277,8 @@ func checkEtcdWithHangLeader(t *testing.T) error { urls, err := types.NewURLs([]string{proxyAddr}) re.NoError(err) client1, err := CreateEtcdClient(nil, urls) - defer func() { - client1.Close() - }() re.NoError(err) + defer client1.Close() // Add a new member etcd2 := MustAddEtcdMember(t, &cfg1, client1) diff --git a/pkg/utils/etcdutil/testutil.go b/pkg/utils/etcdutil/testutil.go index e1ab0937b1b..76db21e5c77 100644 --- a/pkg/utils/etcdutil/testutil.go +++ b/pkg/utils/etcdutil/testutil.go @@ -68,9 +68,11 @@ func NewTestEtcdCluster(t *testing.T, count int) (servers []*embed.Etcd, etcdCli servers = append(servers, etcd) for i := 1; i < count; i++ { + // Check the client can get the new member. listResp, err := ListEtcdMembers(etcdClient) re.NoError(err) re.Len(listResp.Members, i) + // Add a new member. etcd2 := MustAddEtcdMember(t, cfg, etcdClient) cfg2 := etcd2.Config() cfg = &cfg2 @@ -94,6 +96,10 @@ func NewTestEtcdCluster(t *testing.T, count int) (servers []*embed.Etcd, etcdCli // MustAddEtcdMember is used to add a new etcd member to the cluster. func MustAddEtcdMember(t *testing.T, cfg1 *embed.Config, client *clientv3.Client) *embed.Etcd { + return addEtcdMemberWithRetry(t, cfg1, client, 3) +} + +func addEtcdMemberWithRetry(t *testing.T, cfg1 *embed.Config, client *clientv3.Client, retry int) *embed.Etcd { re := require.New(t) cfg2 := newTestSingleConfig(t) cfg2.Name = genRandName() @@ -102,8 +108,19 @@ func MustAddEtcdMember(t *testing.T, cfg1 *embed.Config, client *clientv3.Client peerURL := cfg2.LPUrls[0].String() addResp, err := AddEtcdMember(client, []string{peerURL}) re.NoError(err) - etcd2, err := embed.StartEtcd(cfg2) + // Check the client can get the new member. + members, err := ListEtcdMembers(client) re.NoError(err) + re.Len(addResp.Members, len(members.Members)) + // Start the new etcd member. + etcd2, err := embed.StartEtcd(cfg2) + if err != nil { + re.Contains(err.Error(), "error validating peerURLs") + if retry > 0 { + return addEtcdMemberWithRetry(t, cfg1, client, retry-1) + } + } + re.NoError(err, "addEtcdMemberWithRetry failed after retry") re.Equal(uint64(etcd2.Server.ID()), addResp.Member.ID) <-etcd2.Server.ReadyNotify() return etcd2 From 4ccfaf66eb7a5bba472eca1469a5681fa65e883a Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 30 Aug 2023 14:30:46 +0800 Subject: [PATCH 5/7] fix test Signed-off-by: lhy1024 --- pkg/storage/storage_tso_test.go | 14 ++++++++------ pkg/utils/etcdutil/testutil.go | 2 ++ 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/pkg/storage/storage_tso_test.go b/pkg/storage/storage_tso_test.go index 1666abc4003..9718565d78f 100644 --- a/pkg/storage/storage_tso_test.go +++ b/pkg/storage/storage_tso_test.go @@ -27,7 +27,8 @@ import ( func TestSaveLoadTimestamp(t *testing.T) { re := require.New(t) - storage := newTestStorage(t) + storage, clean := newTestStorage(t) + defer clean() expectedTS := time.Now().Round(0) err := storage.SaveTimestamp(endpoint.TimestampKey, expectedTS) re.NoError(err) @@ -38,7 +39,8 @@ func TestSaveLoadTimestamp(t *testing.T) { func TestGlobalLocalTimestamp(t *testing.T) { re := require.New(t) - storage := newTestStorage(t) + storage, clean := newTestStorage(t) + defer clean() ltaKey := "lta" dc1LocationKey, dc2LocationKey := "dc1", "dc2" localTS1 := time.Now().Round(0) @@ -65,7 +67,8 @@ func TestGlobalLocalTimestamp(t *testing.T) { func TestTimestampTxn(t *testing.T) { re := require.New(t) - storage := newTestStorage(t) + storage, clean := newTestStorage(t) + defer clean() globalTS1 := time.Now().Round(0) err := storage.SaveTimestamp(endpoint.TimestampKey, globalTS1) re.NoError(err) @@ -79,9 +82,8 @@ func TestTimestampTxn(t *testing.T) { re.Equal(globalTS1, ts) } -func newTestStorage(t *testing.T) Storage { +func newTestStorage(t *testing.T) (Storage, func()) { _, client, clean := etcdutil.NewTestEtcdCluster(t, 1) - defer clean() rootPath := path.Join("/pd", strconv.FormatUint(100, 10)) - return NewStorageWithEtcdBackend(client, rootPath) + return NewStorageWithEtcdBackend(client, rootPath), clean } diff --git a/pkg/utils/etcdutil/testutil.go b/pkg/utils/etcdutil/testutil.go index 76db21e5c77..b2a1497f2be 100644 --- a/pkg/utils/etcdutil/testutil.go +++ b/pkg/utils/etcdutil/testutil.go @@ -117,6 +117,8 @@ func addEtcdMemberWithRetry(t *testing.T, cfg1 *embed.Config, client *clientv3.C if err != nil { re.Contains(err.Error(), "error validating peerURLs") if retry > 0 { + _, err := RemoveEtcdMember(client, addResp.Member.ID) + re.NoError(err) return addEtcdMemberWithRetry(t, cfg1, client, retry-1) } } From 4af12f32fb15475147e16e5adb5e141ab34af0e4 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 30 Aug 2023 14:55:41 +0800 Subject: [PATCH 6/7] address comments Signed-off-by: lhy1024 --- pkg/utils/etcdutil/etcdutil_test.go | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/pkg/utils/etcdutil/etcdutil_test.go b/pkg/utils/etcdutil/etcdutil_test.go index 827216379f0..6c8aa9f0d9c 100644 --- a/pkg/utils/etcdutil/etcdutil_test.go +++ b/pkg/utils/etcdutil/etcdutil_test.go @@ -47,14 +47,24 @@ func TestMain(m *testing.M) { func TestMemberHelpers(t *testing.T) { re := require.New(t) - servers, client1, clean := NewTestEtcdCluster(t, 2) + servers, client1, clean := NewTestEtcdCluster(t, 1) defer clean() + etcd1, cfg1 := servers[0], servers[0].Config() + + // Test ListEtcdMembers + listResp1, err := ListEtcdMembers(client1) + re.NoError(err) + re.Len(listResp1.Members, 1) + // types.ID is an alias of uint64. + re.Equal(uint64(etcd1.Server.ID()), listResp1.Members[0].ID) - etcd1, etcd2 := servers[0], servers[1] - _, cfg2 := servers[0].Config(), servers[1].Config() + // Test AddEtcdMember + etcd2 := MustAddEtcdMember(t, &cfg1, client1) + defer etcd2.Close() + checkMembers(re, client1, []*embed.Etcd{etcd1, etcd2}) // Test CheckClusterID - urlsMap, err := types.NewURLsMap(cfg2.InitialCluster) + urlsMap, err := types.NewURLsMap(etcd2.Config().InitialCluster) re.NoError(err) err = CheckClusterID(etcd1.Server.Cluster().ID(), urlsMap, &tls.Config{MinVersion: tls.VersionTLS12}) re.NoError(err) From b68b701c0e281a142cd0a2c42126ec5cc5036dbb Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Fri, 1 Sep 2023 19:04:53 +0800 Subject: [PATCH 7/7] remove recursion Signed-off-by: lhy1024 --- pkg/utils/etcdutil/testutil.go | 25 ++++++++----------------- 1 file changed, 8 insertions(+), 17 deletions(-) diff --git a/pkg/utils/etcdutil/testutil.go b/pkg/utils/etcdutil/testutil.go index b2a1497f2be..54ba38b93b6 100644 --- a/pkg/utils/etcdutil/testutil.go +++ b/pkg/utils/etcdutil/testutil.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/utils/tempurl" + "github.com/tikv/pd/pkg/utils/testutil" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/embed" "go.etcd.io/etcd/etcdserver/etcdserverpb" @@ -94,12 +95,8 @@ func NewTestEtcdCluster(t *testing.T, count int) (servers []*embed.Etcd, etcdCli return } -// MustAddEtcdMember is used to add a new etcd member to the cluster. +// MustAddEtcdMember is used to add a new etcd member to the cluster for test. func MustAddEtcdMember(t *testing.T, cfg1 *embed.Config, client *clientv3.Client) *embed.Etcd { - return addEtcdMemberWithRetry(t, cfg1, client, 3) -} - -func addEtcdMemberWithRetry(t *testing.T, cfg1 *embed.Config, client *clientv3.Client, retry int) *embed.Etcd { re := require.New(t) cfg2 := newTestSingleConfig(t) cfg2.Name = genRandName() @@ -109,20 +106,14 @@ func addEtcdMemberWithRetry(t *testing.T, cfg1 *embed.Config, client *clientv3.C addResp, err := AddEtcdMember(client, []string{peerURL}) re.NoError(err) // Check the client can get the new member. - members, err := ListEtcdMembers(client) - re.NoError(err) - re.Len(addResp.Members, len(members.Members)) + testutil.Eventually(re, func() bool { + members, err := ListEtcdMembers(client) + re.NoError(err) + return len(addResp.Members) == len(members.Members) + }) // Start the new etcd member. etcd2, err := embed.StartEtcd(cfg2) - if err != nil { - re.Contains(err.Error(), "error validating peerURLs") - if retry > 0 { - _, err := RemoveEtcdMember(client, addResp.Member.ID) - re.NoError(err) - return addEtcdMemberWithRetry(t, cfg1, client, retry-1) - } - } - re.NoError(err, "addEtcdMemberWithRetry failed after retry") + re.NoError(err) re.Equal(uint64(etcd2.Server.ID()), addResp.Member.ID) <-etcd2.Server.ReadyNotify() return etcd2