From 74e95e0871fabd60fe86cb4dda54fe8ee9568423 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 4 Sep 2023 15:26:13 +0800 Subject: [PATCH] test: reduce redundant code about etcd server and logger (#7007) close tikv/pd#6890 Signed-off-by: lhy1024 --- pkg/audit/audit_test.go | 18 +- pkg/election/leadership_test.go | 132 +++---------- pkg/election/lease_test.go | 36 ++-- pkg/encryption/key_manager_test.go | 68 ++----- pkg/id/id_test.go | 20 +- pkg/mcs/discovery/discover_test.go | 36 +--- pkg/mcs/discovery/register_test.go | 16 +- pkg/storage/kv/kv_test.go | 13 +- pkg/storage/storage_tso_test.go | 62 ++---- pkg/tso/keyspace_group_manager_test.go | 4 +- pkg/tso/testutil.go | 25 --- pkg/utils/etcdutil/etcdutil_test.go | 239 +++++------------------- pkg/utils/etcdutil/testutil.go | 90 ++++++++- pkg/utils/testutil/testutil.go | 14 ++ tests/server/api/api_test.go | 12 +- tools/pd-backup/pdbackup/backup_test.go | 31 +-- 16 files changed, 253 insertions(+), 563 deletions(-) diff --git a/pkg/audit/audit_test.go b/pkg/audit/audit_test.go index be9416fde13..42d742ed243 100644 --- a/pkg/audit/audit_test.go +++ b/pkg/audit/audit_test.go @@ -24,11 +24,11 @@ import ( "testing" "time" - "github.com/pingcap/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/utils/requestutil" + "github.com/tikv/pd/pkg/utils/testutil" ) func TestLabelMatcher(t *testing.T) { @@ -93,7 +93,7 @@ func TestLocalLogBackendUsingFile(t *testing.T) { t.Parallel() re := require.New(t) backend := NewLocalLogBackend(true) - fname := initLog() + fname := testutil.InitTempFileLogger("info") defer os.RemoveAll(fname) req, _ := http.NewRequest(http.MethodGet, "http://127.0.0.1:2379/test?test=test", strings.NewReader("testBody")) re.False(backend.ProcessHTTPRequest(req)) @@ -125,7 +125,7 @@ func BenchmarkLocalLogAuditUsingTerminal(b *testing.B) { func BenchmarkLocalLogAuditUsingFile(b *testing.B) { b.StopTimer() backend := NewLocalLogBackend(true) - fname := initLog() + fname := testutil.InitTempFileLogger("info") defer os.RemoveAll(fname) req, _ := http.NewRequest(http.MethodGet, "http://127.0.0.1:2379/test?test=test", strings.NewReader("testBody")) b.StartTimer() @@ -135,15 +135,3 @@ func BenchmarkLocalLogAuditUsingFile(b *testing.B) { backend.ProcessHTTPRequest(req) } } - -func initLog() string { - cfg := &log.Config{} - f, _ := os.CreateTemp("/tmp", "pd_tests") - fname := f.Name() - f.Close() - cfg.File.Filename = fname - cfg.Level = "info" - lg, p, _ := log.InitLogger(cfg) - log.ReplaceGlobals(lg, p) - return fname -} diff --git a/pkg/election/leadership_test.go b/pkg/election/leadership_test.go index eec1b0c0b01..0afe17eccf1 100644 --- a/pkg/election/leadership_test.go +++ b/pkg/election/leadership_test.go @@ -16,14 +16,12 @@ package election import ( "context" - "fmt" "os" "strings" "testing" "time" "github.com/pingcap/failpoint" - "github.com/pingcap/log" "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/utils/etcdutil" "github.com/tikv/pd/pkg/utils/testutil" @@ -35,27 +33,15 @@ const defaultLeaseTimeout = 1 func TestLeadership(t *testing.T) { re := require.New(t) - cfg := etcdutil.NewTestSingleConfig(t) - etcd, err := embed.StartEtcd(cfg) - defer func() { - etcd.Close() - }() - re.NoError(err) - - ep := cfg.LCUrls[0].String() - client, err := clientv3.New(clientv3.Config{ - Endpoints: []string{ep}, - }) - re.NoError(err) - - <-etcd.Server.ReadyNotify() + _, client, clean := etcdutil.NewTestEtcdCluster(t, 1) + defer clean() // Campaign the same leadership leadership1 := NewLeadership(client, "/test_leader", "test_leader_1") leadership2 := NewLeadership(client, "/test_leader", "test_leader_2") // leadership1 starts first and get the leadership - err = leadership1.Campaign(defaultLeaseTimeout, "test_leader_1") + err := leadership1.Campaign(defaultLeaseTimeout, "test_leader_1") re.NoError(err) // leadership2 starts then and can not get the leadership err = leadership2.Campaign(defaultLeaseTimeout, "test_leader_2") @@ -168,60 +154,24 @@ func TestExitWatch(t *testing.T) { // Case6: transfer leader without client reconnection. checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) func() { cfg1 := server.Config() - cfg2 := etcdutil.NewTestSingleConfig(t) - cfg2.InitialCluster = cfg1.InitialCluster + fmt.Sprintf(",%s=%s", cfg2.Name, &cfg2.LPUrls[0]) - cfg2.ClusterState = embed.ClusterStateFlagExisting - peerURL := cfg2.LPUrls[0].String() - addResp, err := etcdutil.AddEtcdMember(client, []string{peerURL}) - re.NoError(err) - etcd2, err := embed.StartEtcd(cfg2) + etcd2 := etcdutil.MustAddEtcdMember(t, &cfg1, client) + client2, err := etcdutil.CreateEtcdClient(nil, etcd2.Config().LCUrls) re.NoError(err) - re.Equal(uint64(etcd2.Server.ID()), addResp.Member.ID) - <-etcd2.Server.ReadyNotify() - ep := cfg2.LCUrls[0].String() - client1, err := clientv3.New(clientv3.Config{ - Endpoints: []string{ep}, - }) - re.NoError(err) - + // close the original leader server.Server.HardStop() - client1.Delete(context.Background(), leaderKey) + // delete the leader key with the new client + client2.Delete(context.Background(), leaderKey) return func() { etcd2.Close() + client2.Close() } }) // Case7: loss the quorum when the watch loop is running checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) func() { - tempStdoutFile, _ := os.CreateTemp("/tmp", "pd_tests") - defer os.RemoveAll(tempStdoutFile.Name()) - logCfg := &log.Config{} - logCfg.File.Filename = tempStdoutFile.Name() - logCfg.Level = "info" - lg, p, _ := log.InitLogger(logCfg) - log.ReplaceGlobals(lg, p) - cfg1 := server.Config() - cfg2 := etcdutil.NewTestSingleConfig(t) - cfg2.InitialCluster = cfg1.InitialCluster + fmt.Sprintf(",%s=%s", cfg2.Name, &cfg2.LPUrls[0]) - cfg2.ClusterState = embed.ClusterStateFlagExisting - peerURL := cfg2.LPUrls[0].String() - addResp, err := etcdutil.AddEtcdMember(client, []string{peerURL}) - re.NoError(err) - etcd2, err := embed.StartEtcd(cfg2) - re.NoError(err) - re.Equal(uint64(etcd2.Server.ID()), addResp.Member.ID) - <-etcd2.Server.ReadyNotify() - - cfg3 := etcdutil.NewTestSingleConfig(t) - cfg3.InitialCluster = cfg2.InitialCluster + fmt.Sprintf(",%s=%s", cfg3.Name, &cfg3.LPUrls[0]) - cfg3.ClusterState = embed.ClusterStateFlagExisting - peerURL = cfg3.LPUrls[0].String() - addResp, err = etcdutil.AddEtcdMember(client, []string{peerURL}) - re.NoError(err) - etcd3, err := embed.StartEtcd(cfg3) - re.NoError(err) - re.Equal(uint64(etcd3.Server.ID()), addResp.Member.ID) - <-etcd3.Server.ReadyNotify() + etcd2 := etcdutil.MustAddEtcdMember(t, &cfg1, client) + cfg2 := etcd2.Config() + etcd3 := etcdutil.MustAddEtcdMember(t, &cfg2, client) resp2, err := client.MemberList(context.Background()) re.NoError(err) @@ -237,24 +187,11 @@ func TestExitWatch(t *testing.T) { func checkExitWatch(t *testing.T, leaderKey string, injectFunc func(server *embed.Etcd, client *clientv3.Client) func()) { re := require.New(t) - cfg := etcdutil.NewTestSingleConfig(t) - etcd, err := embed.StartEtcd(cfg) - defer func() { - etcd.Close() - }() - re.NoError(err) - - ep := cfg.LCUrls[0].String() - client1, err := clientv3.New(clientv3.Config{ - Endpoints: []string{ep}, - }) - re.NoError(err) - client2, err := clientv3.New(clientv3.Config{ - Endpoints: []string{ep}, - }) + servers, client1, clean := etcdutil.NewTestEtcdCluster(t, 1) + defer clean() + client2, err := etcdutil.CreateEtcdClient(nil, servers[0].Config().LCUrls) re.NoError(err) - - <-etcd.Server.ReadyNotify() + defer client2.Close() leadership1 := NewLeadership(client1, leaderKey, "test_leader_1") leadership2 := NewLeadership(client2, leaderKey, "test_leader_2") @@ -268,7 +205,7 @@ func checkExitWatch(t *testing.T, leaderKey string, injectFunc func(server *embe done <- struct{}{} }() - cleanFunc := injectFunc(etcd, client2) + cleanFunc := injectFunc(servers[0], client2) defer cleanFunc() testutil.Eventually(re, func() bool { @@ -283,33 +220,14 @@ func checkExitWatch(t *testing.T, leaderKey string, injectFunc func(server *embe func TestRequestProgress(t *testing.T) { checkWatcherRequestProgress := func(injectWatchChanBlock bool) { - tempStdoutFile, _ := os.CreateTemp("/tmp", "pd_tests") - defer os.RemoveAll(tempStdoutFile.Name()) - logCfg := &log.Config{} - logCfg.File.Filename = tempStdoutFile.Name() - logCfg.Level = "debug" - lg, p, _ := log.InitLogger(logCfg) - log.ReplaceGlobals(lg, p) - re := require.New(t) - cfg := etcdutil.NewTestSingleConfig(t) - etcd, err := embed.StartEtcd(cfg) - defer func() { - etcd.Close() - }() + fname := testutil.InitTempFileLogger("debug") + defer os.RemoveAll(fname) + servers, client1, clean := etcdutil.NewTestEtcdCluster(t, 1) + defer clean() + client2, err := etcdutil.CreateEtcdClient(nil, servers[0].Config().LCUrls) re.NoError(err) - - ep := cfg.LCUrls[0].String() - client1, err := clientv3.New(clientv3.Config{ - Endpoints: []string{ep}, - }) - re.NoError(err) - client2, err := clientv3.New(clientv3.Config{ - Endpoints: []string{ep}, - }) - re.NoError(err) - - <-etcd.Server.ReadyNotify() + defer client2.Close() leaderKey := "/test_leader" leadership1 := NewLeadership(client1, leaderKey, "test_leader_1") @@ -328,14 +246,14 @@ func TestRequestProgress(t *testing.T) { if injectWatchChanBlock { failpoint.Enable("github.com/tikv/pd/pkg/election/watchChanBlock", "return(true)") testutil.Eventually(re, func() bool { - b, _ := os.ReadFile(tempStdoutFile.Name()) + b, _ := os.ReadFile(fname) l := string(b) return strings.Contains(l, "watch channel is blocked for a long time") }) failpoint.Disable("github.com/tikv/pd/pkg/election/watchChanBlock") } else { testutil.Eventually(re, func() bool { - b, _ := os.ReadFile(tempStdoutFile.Name()) + b, _ := os.ReadFile(fname) l := string(b) return strings.Contains(l, "watcher receives progress notify in watch loop") }) diff --git a/pkg/election/lease_test.go b/pkg/election/lease_test.go index dd10108277c..3d8515eadb2 100644 --- a/pkg/election/lease_test.go +++ b/pkg/election/lease_test.go @@ -22,25 +22,12 @@ import ( "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/utils/etcdutil" "go.etcd.io/etcd/clientv3" - "go.etcd.io/etcd/embed" ) func TestLease(t *testing.T) { re := require.New(t) - cfg := etcdutil.NewTestSingleConfig(t) - etcd, err := embed.StartEtcd(cfg) - defer func() { - etcd.Close() - }() - re.NoError(err) - - ep := cfg.LCUrls[0].String() - client, err := clientv3.New(clientv3.Config{ - Endpoints: []string{ep}, - }) - re.NoError(err) - - <-etcd.Server.ReadyNotify() + _, client, clean := etcdutil.NewTestEtcdCluster(t, 1) + defer clean() // Create the lease. lease1 := &lease{ @@ -101,3 +88,22 @@ func TestLease(t *testing.T) { time.Sleep((defaultLeaseTimeout + 1) * time.Second) re.True(lease1.IsExpired()) } + +func TestLeaseKeepAlive(t *testing.T) { + re := require.New(t) + _, client, clean := etcdutil.NewTestEtcdCluster(t, 1) + defer clean() + + // Create the lease. + lease := &lease{ + Purpose: "test_lease", + client: client, + lease: clientv3.NewLease(client), + } + + re.NoError(lease.Grant(defaultLeaseTimeout)) + ch := lease.keepAliveWorker(context.Background(), 2*time.Second) + time.Sleep(2 * time.Second) + <-ch + re.NoError(lease.Close()) +} diff --git a/pkg/encryption/key_manager_test.go b/pkg/encryption/key_manager_test.go index a8c2493adc7..d33f6f47b13 100644 --- a/pkg/encryption/key_manager_test.go +++ b/pkg/encryption/key_manager_test.go @@ -17,8 +17,6 @@ package encryption import ( "context" "encoding/hex" - "fmt" - "net/url" "os" "path/filepath" "sync/atomic" @@ -31,10 +29,8 @@ import ( "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/election" "github.com/tikv/pd/pkg/utils/etcdutil" - "github.com/tikv/pd/pkg/utils/tempurl" "github.com/tikv/pd/pkg/utils/typeutil" "go.etcd.io/etcd/clientv3" - "go.etcd.io/etcd/embed" ) const ( @@ -49,35 +45,11 @@ func getTestDataKey() []byte { return key } -func newTestEtcd(t *testing.T, re *require.Assertions) (client *clientv3.Client) { - cfg := embed.NewConfig() - cfg.Name = "test_etcd" - cfg.Dir = t.TempDir() - cfg.Logger = "zap" - pu, err := url.Parse(tempurl.Alloc()) - re.NoError(err) - cfg.LPUrls = []url.URL{*pu} - cfg.APUrls = cfg.LPUrls - cu, err := url.Parse(tempurl.Alloc()) - re.NoError(err) - cfg.LCUrls = []url.URL{*cu} - cfg.ACUrls = cfg.LCUrls - cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, &cfg.LPUrls[0]) - cfg.ClusterState = embed.ClusterStateFlagNew - server, err := embed.StartEtcd(cfg) - re.NoError(err) - <-server.Server.ReadyNotify() - - client, err = clientv3.New(clientv3.Config{ - Endpoints: []string{cfg.LCUrls[0].String()}, - }) - re.NoError(err) - +func newTestEtcd(t *testing.T) (client *clientv3.Client) { + _, client, clean := etcdutil.NewTestEtcdCluster(t, 1) t.Cleanup(func() { - client.Close() - server.Close() + clean() }) - return client } @@ -113,7 +85,7 @@ func checkMasterKeyMeta(re *require.Assertions, value []byte, meta *encryptionpb func TestNewKeyManagerBasic(t *testing.T) { re := require.New(t) // Initialize. - client := newTestEtcd(t, re) + client := newTestEtcd(t) // Use default config. config := &Config{} err := config.Adjust() @@ -135,7 +107,7 @@ func TestNewKeyManagerBasic(t *testing.T) { func TestNewKeyManagerWithCustomConfig(t *testing.T) { re := require.New(t) // Initialize. - client := newTestEtcd(t, re) + client := newTestEtcd(t) keyFile := newTestKeyFile(t, re) // Custom config rotatePeriod, err := time.ParseDuration("100h") @@ -173,7 +145,7 @@ func TestNewKeyManagerWithCustomConfig(t *testing.T) { func TestNewKeyManagerLoadKeys(t *testing.T) { re := require.New(t) // Initialize. - client := newTestEtcd(t, re) + client := newTestEtcd(t) keyFile := newTestKeyFile(t, re) leadership := newTestLeader(re, client) // Use default config. @@ -214,7 +186,7 @@ func TestNewKeyManagerLoadKeys(t *testing.T) { func TestGetCurrentKey(t *testing.T) { re := require.New(t) // Initialize. - client := newTestEtcd(t, re) + client := newTestEtcd(t) // Use default config. config := &Config{} err := config.Adjust() @@ -257,7 +229,7 @@ func TestGetCurrentKey(t *testing.T) { func TestGetKey(t *testing.T) { re := require.New(t) // Initialize. - client := newTestEtcd(t, re) + client := newTestEtcd(t) keyFile := newTestKeyFile(t, re) leadership := newTestLeader(re, client) // Store initial keys in etcd. @@ -312,7 +284,7 @@ func TestGetKey(t *testing.T) { func TestLoadKeyEmpty(t *testing.T) { re := require.New(t) // Initialize. - client := newTestEtcd(t, re) + client := newTestEtcd(t) keyFile := newTestKeyFile(t, re) leadership := newTestLeader(re, client) // Store initial keys in etcd. @@ -348,7 +320,7 @@ func TestWatcher(t *testing.T) { // Initialize. ctx, cancel := context.WithCancel(context.Background()) defer cancel() - client := newTestEtcd(t, re) + client := newTestEtcd(t) keyFile := newTestKeyFile(t, re) leadership := newTestLeader(re, client) // Setup helper @@ -424,7 +396,7 @@ func TestWatcher(t *testing.T) { func TestSetLeadershipWithEncryptionOff(t *testing.T) { re := require.New(t) // Initialize. - client := newTestEtcd(t, re) + client := newTestEtcd(t) // Use default config. config := &Config{} err := config.Adjust() @@ -449,7 +421,7 @@ func TestSetLeadershipWithEncryptionEnabling(t *testing.T) { // Initialize. ctx, cancel := context.WithCancel(context.Background()) defer cancel() - client := newTestEtcd(t, re) + client := newTestEtcd(t) keyFile := newTestKeyFile(t, re) leadership := newTestLeader(re, client) // Setup helper @@ -502,7 +474,7 @@ func TestSetLeadershipWithEncryptionMethodChanged(t *testing.T) { // Initialize. ctx, cancel := context.WithCancel(context.Background()) defer cancel() - client := newTestEtcd(t, re) + client := newTestEtcd(t) keyFile := newTestKeyFile(t, re) leadership := newTestLeader(re, client) // Setup helper @@ -578,7 +550,7 @@ func TestSetLeadershipWithCurrentKeyExposed(t *testing.T) { // Initialize. ctx, cancel := context.WithCancel(context.Background()) defer cancel() - client := newTestEtcd(t, re) + client := newTestEtcd(t) keyFile := newTestKeyFile(t, re) leadership := newTestLeader(re, client) // Setup helper @@ -649,7 +621,7 @@ func TestSetLeadershipWithCurrentKeyExpired(t *testing.T) { // Initialize. ctx, cancel := context.WithCancel(context.Background()) defer cancel() - client := newTestEtcd(t, re) + client := newTestEtcd(t) keyFile := newTestKeyFile(t, re) leadership := newTestLeader(re, client) // Setup helper @@ -724,7 +696,7 @@ func TestSetLeadershipWithMasterKeyChanged(t *testing.T) { // Initialize. ctx, cancel := context.WithCancel(context.Background()) defer cancel() - client := newTestEtcd(t, re) + client := newTestEtcd(t) keyFile := newTestKeyFile(t, re) keyFile2 := newTestKeyFile(t, re, testMasterKey2) leadership := newTestLeader(re, client) @@ -789,7 +761,7 @@ func TestSetLeadershipWithMasterKeyChanged(t *testing.T) { func TestSetLeadershipMasterKeyWithCiphertextKey(t *testing.T) { re := require.New(t) // Initialize. - client := newTestEtcd(t, re) + client := newTestEtcd(t) keyFile := newTestKeyFile(t, re) leadership := newTestLeader(re, client) // Setup helper @@ -867,7 +839,7 @@ func TestSetLeadershipWithEncryptionDisabling(t *testing.T) { // Initialize. ctx, cancel := context.WithCancel(context.Background()) defer cancel() - client := newTestEtcd(t, re) + client := newTestEtcd(t) keyFile := newTestKeyFile(t, re) leadership := newTestLeader(re, client) // Setup helper @@ -923,7 +895,7 @@ func TestKeyRotation(t *testing.T) { // Initialize. ctx, cancel := context.WithCancel(context.Background()) defer cancel() - client := newTestEtcd(t, re) + client := newTestEtcd(t) keyFile := newTestKeyFile(t, re) leadership := newTestLeader(re, client) // Setup helper @@ -1019,7 +991,7 @@ func TestKeyRotationConflict(t *testing.T) { // Initialize. ctx, cancel := context.WithCancel(context.Background()) defer cancel() - client := newTestEtcd(t, re) + client := newTestEtcd(t) keyFile := newTestKeyFile(t, re) leadership := newTestLeader(re, client) // Setup helper diff --git a/pkg/id/id_test.go b/pkg/id/id_test.go index 1b1632cc763..94f0670b979 100644 --- a/pkg/id/id_test.go +++ b/pkg/id/id_test.go @@ -22,8 +22,6 @@ import ( "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/utils/etcdutil" - "go.etcd.io/etcd/clientv3" - "go.etcd.io/etcd/embed" ) const ( @@ -39,23 +37,11 @@ const ( // share rootPath and member val update their ids concurrently. func TestMultipleAllocator(t *testing.T) { re := require.New(t) - cfg := etcdutil.NewTestSingleConfig(t) - etcd, err := embed.StartEtcd(cfg) - defer func() { - etcd.Close() - }() - re.NoError(err) - - ep := cfg.LCUrls[0].String() - client, err := clientv3.New(clientv3.Config{ - Endpoints: []string{ep}, - }) - re.NoError(err) - - <-etcd.Server.ReadyNotify() + _, client, clean := etcdutil.NewTestEtcdCluster(t, 1) + defer clean() // Put memberValue to leaderPath to simulate an election success. - _, err = client.Put(context.Background(), leaderPath, memberVal) + _, err := client.Put(context.Background(), leaderPath, memberVal) re.NoError(err) wg := sync.WaitGroup{} diff --git a/pkg/mcs/discovery/discover_test.go b/pkg/mcs/discovery/discover_test.go index fed1d7844a0..2894dfa8d2d 100644 --- a/pkg/mcs/discovery/discover_test.go +++ b/pkg/mcs/discovery/discover_test.go @@ -21,28 +21,14 @@ import ( "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/utils/etcdutil" - "go.etcd.io/etcd/clientv3" - "go.etcd.io/etcd/embed" ) func TestDiscover(t *testing.T) { re := require.New(t) - cfg := etcdutil.NewTestSingleConfig(t) - etcd, err := embed.StartEtcd(cfg) - defer func() { - etcd.Close() - }() - re.NoError(err) - - ep := cfg.LCUrls[0].String() - re.NoError(err) - - client, err := clientv3.NewFromURL(ep) - re.NoError(err) - - <-etcd.Server.ReadyNotify() + _, client, clean := etcdutil.NewTestEtcdCluster(t, 1) + defer clean() sr1 := NewServiceRegister(context.Background(), client, "12345", "test_service", "127.0.0.1:1", "127.0.0.1:1", 1) - err = sr1.Register() + err := sr1.Register() re.NoError(err) sr2 := NewServiceRegister(context.Background(), client, "12345", "test_service", "127.0.0.1:2", "127.0.0.1:2", 1) err = sr2.Register() @@ -64,20 +50,8 @@ func TestDiscover(t *testing.T) { func TestServiceRegistryEntry(t *testing.T) { re := require.New(t) - cfg := etcdutil.NewTestSingleConfig(t) - etcd, err := embed.StartEtcd(cfg) - defer func() { - etcd.Close() - }() - re.NoError(err) - - ep := cfg.LCUrls[0].String() - re.NoError(err) - - client, err := clientv3.NewFromURL(ep) - re.NoError(err) - - <-etcd.Server.ReadyNotify() + _, client, clean := etcdutil.NewTestEtcdCluster(t, 1) + defer clean() entry1 := &ServiceRegistryEntry{ServiceAddr: "127.0.0.1:1"} s1, err := entry1.Serialize() re.NoError(err) diff --git a/pkg/mcs/discovery/register_test.go b/pkg/mcs/discovery/register_test.go index f27f3160afe..032b0558a79 100644 --- a/pkg/mcs/discovery/register_test.go +++ b/pkg/mcs/discovery/register_test.go @@ -28,18 +28,13 @@ import ( func TestRegister(t *testing.T) { re := require.New(t) - cfg := etcdutil.NewTestSingleConfig(t) - etcd, err := embed.StartEtcd(cfg) - re.NoError(err) - ep := cfg.LCUrls[0].String() - client, err := clientv3.NewFromURL(ep) - re.NoError(err) - <-etcd.Server.ReadyNotify() + servers, client, clean := etcdutil.NewTestEtcdCluster(t, 1) + defer clean() + etcd, cfg := servers[0], servers[0].Config() // Test register with http prefix. sr := NewServiceRegister(context.Background(), client, "12345", "test_service", "http://127.0.0.1:1", "http://127.0.0.1:1", 10) - re.NoError(err) - err = sr.Register() + err := sr.Register() re.NoError(err) re.Equal("/ms/12345/test_service/registry/http://127.0.0.1:1", sr.key) resp, err := client.Get(context.Background(), sr.key) @@ -69,14 +64,13 @@ func TestRegister(t *testing.T) { etcd.Server.HardStop() // close the etcd to make the keepalive failed time.Sleep(etcdutil.DefaultDialTimeout) // ensure that the request is timeout etcd.Close() - etcd, err = embed.StartEtcd(cfg) + etcd, err = embed.StartEtcd(&cfg) re.NoError(err) <-etcd.Server.ReadyNotify() testutil.Eventually(re, func() bool { return getKeyAfterLeaseExpired(re, client, sr.key) == "127.0.0.1:2" }) } - etcd.Close() } func getKeyAfterLeaseExpired(re *require.Assertions, client *clientv3.Client, key string) string { diff --git a/pkg/storage/kv/kv_test.go b/pkg/storage/kv/kv_test.go index d2db558a748..93359934da1 100644 --- a/pkg/storage/kv/kv_test.go +++ b/pkg/storage/kv/kv_test.go @@ -24,21 +24,12 @@ import ( "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/utils/etcdutil" "go.etcd.io/etcd/clientv3" - "go.etcd.io/etcd/embed" ) func TestEtcd(t *testing.T) { re := require.New(t) - cfg := etcdutil.NewTestSingleConfig(t) - etcd, err := embed.StartEtcd(cfg) - re.NoError(err) - defer etcd.Close() - - ep := cfg.LCUrls[0].String() - client, err := clientv3.New(clientv3.Config{ - Endpoints: []string{ep}, - }) - re.NoError(err) + _, client, clean := etcdutil.NewTestEtcdCluster(t, 1) + defer clean() rootPath := path.Join("/pd", strconv.FormatUint(100, 10)) kv := NewEtcdKVBase(client, rootPath) diff --git a/pkg/storage/storage_tso_test.go b/pkg/storage/storage_tso_test.go index 1dbba289512..9718565d78f 100644 --- a/pkg/storage/storage_tso_test.go +++ b/pkg/storage/storage_tso_test.go @@ -23,28 +23,14 @@ import ( "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/etcdutil" - "go.etcd.io/etcd/clientv3" - "go.etcd.io/etcd/embed" ) func TestSaveLoadTimestamp(t *testing.T) { re := require.New(t) - - cfg := etcdutil.NewTestSingleConfig(t) - etcd, err := embed.StartEtcd(cfg) - re.NoError(err) - defer etcd.Close() - - ep := cfg.LCUrls[0].String() - client, err := clientv3.New(clientv3.Config{ - Endpoints: []string{ep}, - }) - re.NoError(err) - rootPath := path.Join("/pd", strconv.FormatUint(100, 10)) - storage := NewStorageWithEtcdBackend(client, rootPath) - + storage, clean := newTestStorage(t) + defer clean() expectedTS := time.Now().Round(0) - err = storage.SaveTimestamp(endpoint.TimestampKey, expectedTS) + err := storage.SaveTimestamp(endpoint.TimestampKey, expectedTS) re.NoError(err) ts, err := storage.LoadTimestamp("") re.NoError(err) @@ -53,27 +39,15 @@ func TestSaveLoadTimestamp(t *testing.T) { func TestGlobalLocalTimestamp(t *testing.T) { re := require.New(t) - - cfg := etcdutil.NewTestSingleConfig(t) - etcd, err := embed.StartEtcd(cfg) - re.NoError(err) - defer etcd.Close() - - ep := cfg.LCUrls[0].String() - client, err := clientv3.New(clientv3.Config{ - Endpoints: []string{ep}, - }) - re.NoError(err) - rootPath := path.Join("/pd", strconv.FormatUint(100, 10)) - storage := NewStorageWithEtcdBackend(client, rootPath) - + storage, clean := newTestStorage(t) + defer clean() ltaKey := "lta" dc1LocationKey, dc2LocationKey := "dc1", "dc2" localTS1 := time.Now().Round(0) l1 := path.Join(ltaKey, dc1LocationKey, endpoint.TimestampKey) l2 := path.Join(ltaKey, dc2LocationKey, endpoint.TimestampKey) - err = storage.SaveTimestamp(l1, localTS1) + err := storage.SaveTimestamp(l1, localTS1) re.NoError(err) globalTS := time.Now().Round(0) err = storage.SaveTimestamp(endpoint.TimestampKey, globalTS) @@ -93,22 +67,10 @@ func TestGlobalLocalTimestamp(t *testing.T) { func TestTimestampTxn(t *testing.T) { re := require.New(t) - - cfg := etcdutil.NewTestSingleConfig(t) - etcd, err := embed.StartEtcd(cfg) - re.NoError(err) - defer etcd.Close() - - ep := cfg.LCUrls[0].String() - client, err := clientv3.New(clientv3.Config{ - Endpoints: []string{ep}, - }) - re.NoError(err) - rootPath := path.Join("/pd", strconv.FormatUint(100, 10)) - storage := NewStorageWithEtcdBackend(client, rootPath) - + storage, clean := newTestStorage(t) + defer clean() globalTS1 := time.Now().Round(0) - err = storage.SaveTimestamp(endpoint.TimestampKey, globalTS1) + err := storage.SaveTimestamp(endpoint.TimestampKey, globalTS1) re.NoError(err) globalTS2 := globalTS1.Add(-time.Millisecond).Round(0) @@ -119,3 +81,9 @@ func TestTimestampTxn(t *testing.T) { re.NoError(err) re.Equal(globalTS1, ts) } + +func newTestStorage(t *testing.T) (Storage, func()) { + _, client, clean := etcdutil.NewTestEtcdCluster(t, 1) + rootPath := path.Join("/pd", strconv.FormatUint(100, 10)) + return NewStorageWithEtcdBackend(client, rootPath), clean +} diff --git a/pkg/tso/keyspace_group_manager_test.go b/pkg/tso/keyspace_group_manager_test.go index 1719b98c937..91c68a5a268 100644 --- a/pkg/tso/keyspace_group_manager_test.go +++ b/pkg/tso/keyspace_group_manager_test.go @@ -35,6 +35,7 @@ import ( "github.com/tikv/pd/pkg/mcs/discovery" mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/utils/etcdutil" "github.com/tikv/pd/pkg/utils/tempurl" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/pkg/utils/tsoutil" @@ -67,7 +68,8 @@ func (suite *keyspaceGroupManagerTestSuite) SetupSuite() { t := suite.T() suite.ctx, suite.cancel = context.WithCancel(context.Background()) suite.ClusterID = rand.Uint64() - suite.backendEndpoints, suite.etcdClient, suite.clean = startEmbeddedEtcd(t) + servers, client, clean := etcdutil.NewTestEtcdCluster(t, 1) + suite.backendEndpoints, suite.etcdClient, suite.clean = servers[0].Config().LCUrls[0].String(), client, clean suite.cfg = suite.createConfig() } diff --git a/pkg/tso/testutil.go b/pkg/tso/testutil.go index 9225b21dfac..6fc1ccc98a7 100644 --- a/pkg/tso/testutil.go +++ b/pkg/tso/testutil.go @@ -15,14 +15,9 @@ package tso import ( - "testing" "time" - "github.com/stretchr/testify/require" - "github.com/tikv/pd/pkg/utils/etcdutil" "github.com/tikv/pd/pkg/utils/grpcutil" - "go.etcd.io/etcd/clientv3" - "go.etcd.io/etcd/embed" ) var _ ServiceConfig = (*TestServiceConfig)(nil) @@ -90,23 +85,3 @@ func (c *TestServiceConfig) GetMaxResetTSGap() time.Duration { func (c *TestServiceConfig) GetTLSConfig() *grpcutil.TLSConfig { return c.TLSConfig } - -func startEmbeddedEtcd(t *testing.T) (backendEndpoint string, etcdClient *clientv3.Client, clean func()) { - re := require.New(t) - cfg := etcdutil.NewTestSingleConfig(t) - etcd, err := embed.StartEtcd(cfg) - re.NoError(err) - clean = func() { - etcd.Close() - } - - backendEndpoint = cfg.LCUrls[0].String() - re.NoError(err) - - etcdClient, err = clientv3.NewFromURL(backendEndpoint) - re.NoError(err) - - <-etcd.Server.ReadyNotify() - - return -} diff --git a/pkg/utils/etcdutil/etcdutil_test.go b/pkg/utils/etcdutil/etcdutil_test.go index bb006f50da8..80194a6287e 100644 --- a/pkg/utils/etcdutil/etcdutil_test.go +++ b/pkg/utils/etcdutil/etcdutil_test.go @@ -29,7 +29,6 @@ import ( "time" "github.com/pingcap/failpoint" - "github.com/pingcap/log" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/utils/tempurl" @@ -48,23 +47,9 @@ func TestMain(m *testing.M) { func TestMemberHelpers(t *testing.T) { re := require.New(t) - cfg1 := NewTestSingleConfig(t) - etcd1, err := embed.StartEtcd(cfg1) - defer func() { - etcd1.Close() - }() - re.NoError(err) - - ep1 := cfg1.LCUrls[0].String() - client1, err := clientv3.New(clientv3.Config{ - Endpoints: []string{ep1}, - }) - defer func() { - client1.Close() - }() - re.NoError(err) - - <-etcd1.Server.ReadyNotify() + servers, client1, clean := NewTestEtcdCluster(t, 1) + defer clean() + etcd1, cfg1 := servers[0], servers[0].Config() // Test ListEtcdMembers listResp1, err := ListEtcdMembers(client1) @@ -74,21 +59,12 @@ func TestMemberHelpers(t *testing.T) { re.Equal(uint64(etcd1.Server.ID()), listResp1.Members[0].ID) // Test AddEtcdMember - etcd2 := checkAddEtcdMember(t, cfg1, client1) - cfg2 := etcd2.Config() + etcd2 := MustAddEtcdMember(t, &cfg1, client1) defer etcd2.Close() - ep2 := cfg2.LCUrls[0].String() - client2, err := clientv3.New(clientv3.Config{ - Endpoints: []string{ep2}, - }) - defer func() { - client2.Close() - }() - re.NoError(err) - checkMembers(re, client2, []*embed.Etcd{etcd1, etcd2}) + checkMembers(re, client1, []*embed.Etcd{etcd1, etcd2}) // Test CheckClusterID - urlsMap, err := types.NewURLsMap(cfg2.InitialCluster) + urlsMap, err := types.NewURLsMap(etcd2.Config().InitialCluster) re.NoError(err) err = CheckClusterID(etcd1.Server.Cluster().ID(), urlsMap, &tls.Config{MinVersion: tls.VersionTLS12}) re.NoError(err) @@ -105,30 +81,15 @@ func TestMemberHelpers(t *testing.T) { func TestEtcdKVGet(t *testing.T) { re := require.New(t) - cfg := NewTestSingleConfig(t) - etcd, err := embed.StartEtcd(cfg) - defer func() { - etcd.Close() - }() - re.NoError(err) - - ep := cfg.LCUrls[0].String() - client, err := clientv3.New(clientv3.Config{ - Endpoints: []string{ep}, - }) - defer func() { - client.Close() - }() - re.NoError(err) - - <-etcd.Server.ReadyNotify() + _, client, clean := NewTestEtcdCluster(t, 1) + defer clean() keys := []string{"test/key1", "test/key2", "test/key3", "test/key4", "test/key5"} vals := []string{"val1", "val2", "val3", "val4", "val5"} kv := clientv3.NewKV(client) for i := range keys { - _, err = kv.Put(context.TODO(), keys[i], vals[i]) + _, err := kv.Put(context.TODO(), keys[i], vals[i]) re.NoError(err) } @@ -158,25 +119,10 @@ func TestEtcdKVGet(t *testing.T) { func TestEtcdKVPutWithTTL(t *testing.T) { re := require.New(t) - cfg := NewTestSingleConfig(t) - etcd, err := embed.StartEtcd(cfg) - defer func() { - etcd.Close() - }() - re.NoError(err) - - ep := cfg.LCUrls[0].String() - client, err := clientv3.New(clientv3.Config{ - Endpoints: []string{ep}, - }) - defer func() { - client.Close() - }() - re.NoError(err) - - <-etcd.Server.ReadyNotify() + _, client, clean := NewTestEtcdCluster(t, 1) + defer clean() - _, err = EtcdKVPutWithTTL(context.TODO(), client, "test/ttl1", "val1", 2) + _, err := EtcdKVPutWithTTL(context.TODO(), client, "test/ttl1", "val1", 2) re.NoError(err) _, err = EtcdKVPutWithTTL(context.TODO(), client, "test/ttl2", "val2", 4) re.NoError(err) @@ -201,24 +147,8 @@ func TestEtcdKVPutWithTTL(t *testing.T) { func TestInitClusterID(t *testing.T) { re := require.New(t) - cfg := NewTestSingleConfig(t) - etcd, err := embed.StartEtcd(cfg) - defer func() { - etcd.Close() - }() - re.NoError(err) - - ep := cfg.LCUrls[0].String() - client, err := clientv3.New(clientv3.Config{ - Endpoints: []string{ep}, - }) - defer func() { - client.Close() - }() - re.NoError(err) - - <-etcd.Server.ReadyNotify() - + _, client, clean := NewTestEtcdCluster(t, 1) + defer clean() pdClusterIDPath := "test/TestInitClusterID/pd/cluster_id" // Get any cluster key to parse the cluster ID. resp, err := EtcdKVGet(client, pdClusterIDPath) @@ -238,24 +168,12 @@ func TestEtcdClientSync(t *testing.T) { re := require.New(t) re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick", "return(true)")) - // Start a etcd server. - cfg1 := NewTestSingleConfig(t) - etcd1, err := embed.StartEtcd(cfg1) - defer func() { - etcd1.Close() - }() - re.NoError(err) - - // Create a etcd client with etcd1 as endpoint. - client1, err := CreateEtcdClient(nil, cfg1.LCUrls) - defer func() { - client1.Close() - }() - re.NoError(err) - <-etcd1.Server.ReadyNotify() + servers, client1, clean := NewTestEtcdCluster(t, 1) + defer clean() + etcd1, cfg1 := servers[0], servers[0].Config() // Add a new member. - etcd2 := checkAddEtcdMember(t, cfg1, client1) + etcd2 := MustAddEtcdMember(t, &cfg1, client1) defer etcd2.Close() checkMembers(re, client1, []*embed.Etcd{etcd1, etcd2}) testutil.Eventually(re, func() bool { @@ -264,7 +182,7 @@ func TestEtcdClientSync(t *testing.T) { }) // Remove the first member and close the etcd1. - _, err = RemoveEtcdMember(client1, uint64(etcd1.Server.ID())) + _, err := RemoveEtcdMember(client1, uint64(etcd1.Server.ID())) re.NoError(err) etcd1.Close() @@ -280,31 +198,21 @@ func TestEtcdClientSync(t *testing.T) { func TestEtcdScaleInAndOut(t *testing.T) { re := require.New(t) // Start a etcd server. - cfg1 := NewTestSingleConfig(t) - etcd1, err := embed.StartEtcd(cfg1) - defer func() { - etcd1.Close() - }() - re.NoError(err) - <-etcd1.Server.ReadyNotify() + servers, _, clean := NewTestEtcdCluster(t, 1) + defer clean() + etcd1, cfg1 := servers[0], servers[0].Config() // Create two etcd clients with etcd1 as endpoint. client1, err := CreateEtcdClient(nil, cfg1.LCUrls) // execute member change operation with this client - defer func() { - client1.Close() - }() re.NoError(err) + defer client1.Close() client2, err := CreateEtcdClient(nil, cfg1.LCUrls) // check member change with this client - defer func() { - client2.Close() - }() re.NoError(err) + defer client2.Close() // Add a new member and check members - etcd2 := checkAddEtcdMember(t, cfg1, client1) - defer func() { - etcd2.Close() - }() + etcd2 := MustAddEtcdMember(t, &cfg1, client1) + defer etcd2.Close() checkMembers(re, client2, []*embed.Etcd{etcd1, etcd2}) // scale in etcd1 @@ -317,29 +225,14 @@ func TestRandomKillEtcd(t *testing.T) { re := require.New(t) re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick", "return(true)")) // Start a etcd server. - cfg1 := NewTestSingleConfig(t) - etcd1, err := embed.StartEtcd(cfg1) - re.NoError(err) - <-etcd1.Server.ReadyNotify() - client1, err := CreateEtcdClient(nil, cfg1.LCUrls) - re.NoError(err) - defer func() { - client1.Close() - }() - - etcd2 := checkAddEtcdMember(t, cfg1, client1) - cfg2 := etcd2.Config() - <-etcd2.Server.ReadyNotify() - - etcd3 := checkAddEtcdMember(t, &cfg2, client1) - <-etcd3.Server.ReadyNotify() - - time.Sleep(1 * time.Second) - re.Len(client1.Endpoints(), 3) + etcds, client1, clean := NewTestEtcdCluster(t, 3) + defer clean() + testutil.Eventually(re, func() bool { + return len(client1.Endpoints()) == 3 + }) // Randomly kill an etcd server and restart it - etcds := []*embed.Etcd{etcd1, etcd2, etcd3} - cfgs := []embed.Config{etcd1.Config(), etcd2.Config(), etcd3.Config()} + cfgs := []embed.Config{etcds[0].Config(), etcds[1].Config(), etcds[2].Config()} for i := 0; i < 10; i++ { killIndex := rand.Intn(len(etcds)) etcds[killIndex].Close() @@ -381,31 +274,24 @@ func TestEtcdWithHangLeaderEnableCheck(t *testing.T) { func checkEtcdWithHangLeader(t *testing.T) error { re := require.New(t) // Start a etcd server. - cfg1 := NewTestSingleConfig(t) - etcd1, err := embed.StartEtcd(cfg1) - defer func() { - etcd1.Close() - }() - re.NoError(err) - ep1 := cfg1.LCUrls[0].String() - <-etcd1.Server.ReadyNotify() + servers, _, clean := NewTestEtcdCluster(t, 1) + defer clean() + etcd1, cfg1 := servers[0], servers[0].Config() // Create a proxy to etcd1. proxyAddr := tempurl.Alloc() var enableDiscard atomic.Bool - go proxyWithDiscard(re, ep1, proxyAddr, &enableDiscard) + go proxyWithDiscard(re, cfg1.LCUrls[0].String(), proxyAddr, &enableDiscard) // Create a etcd client with etcd1 as endpoint. urls, err := types.NewURLs([]string{proxyAddr}) re.NoError(err) client1, err := CreateEtcdClient(nil, urls) - defer func() { - client1.Close() - }() re.NoError(err) + defer client1.Close() // Add a new member - etcd2 := checkAddEtcdMember(t, cfg1, client1) + etcd2 := MustAddEtcdMember(t, &cfg1, client1) defer etcd2.Close() checkMembers(re, client1, []*embed.Etcd{etcd1, etcd2}) time.Sleep(1 * time.Second) // wait for etcd client sync endpoints @@ -417,40 +303,6 @@ func checkEtcdWithHangLeader(t *testing.T) error { return err } -func checkAddEtcdMember(t *testing.T, cfg1 *embed.Config, client *clientv3.Client) *embed.Etcd { - re := require.New(t) - cfg2 := NewTestSingleConfig(t) - cfg2.Name = genRandName() - cfg2.InitialCluster = cfg1.InitialCluster + fmt.Sprintf(",%s=%s", cfg2.Name, &cfg2.LPUrls[0]) - cfg2.ClusterState = embed.ClusterStateFlagExisting - peerURL := cfg2.LPUrls[0].String() - addResp, err := AddEtcdMember(client, []string{peerURL}) - re.NoError(err) - etcd2, err := embed.StartEtcd(cfg2) - re.NoError(err) - re.Equal(uint64(etcd2.Server.ID()), addResp.Member.ID) - <-etcd2.Server.ReadyNotify() - return etcd2 -} - -func checkMembers(re *require.Assertions, client *clientv3.Client, etcds []*embed.Etcd) { - // Check the client can get the new member. - listResp, err := ListEtcdMembers(client) - re.NoError(err) - re.Len(listResp.Members, len(etcds)) - inList := func(m *etcdserverpb.Member) bool { - for _, etcd := range etcds { - if m.ID == uint64(etcd.Server.ID()) { - return true - } - } - return false - } - for _, m := range listResp.Members { - re.True(inList(m)) - } -} - func proxyWithDiscard(re *require.Assertions, server, proxy string, enableDiscard *atomic.Bool) { server = strings.TrimPrefix(server, "http://") proxy = strings.TrimPrefix(proxy, "http://") @@ -527,7 +379,7 @@ func (suite *loopWatcherTestSuite) SetupSuite() { suite.ctx, suite.cancel = context.WithCancel(context.Background()) suite.cleans = make([]func(), 0) // Start a etcd server and create a client with etcd1 as endpoint. - suite.config = NewTestSingleConfig(t) + suite.config = newTestSingleConfig(t) suite.startEtcd() suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls) suite.NoError(err) @@ -771,13 +623,8 @@ func (suite *loopWatcherTestSuite) TestWatcherBreak() { func (suite *loopWatcherTestSuite) TestWatcherRequestProgress() { checkWatcherRequestProgress := func(injectWatchChanBlock bool) { - tempStdoutFile, _ := os.CreateTemp("/tmp", "pd_tests") - defer os.RemoveAll(tempStdoutFile.Name()) - cfg := &log.Config{} - cfg.File.Filename = tempStdoutFile.Name() - cfg.Level = "debug" - lg, p, _ := log.InitLogger(cfg) - log.ReplaceGlobals(lg, p) + fname := testutil.InitTempFileLogger("debug") + defer os.RemoveAll(fname) watcher := NewLoopWatcher( suite.ctx, @@ -799,14 +646,14 @@ func (suite *loopWatcherTestSuite) TestWatcherRequestProgress() { if injectWatchChanBlock { failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/watchChanBlock", "return(true)") testutil.Eventually(suite.Require(), func() bool { - b, _ := os.ReadFile(tempStdoutFile.Name()) + b, _ := os.ReadFile(fname) l := string(b) return strings.Contains(l, "watch channel is blocked for a long time") }) failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/watchChanBlock") } else { testutil.Eventually(suite.Require(), func() bool { - b, _ := os.ReadFile(tempStdoutFile.Name()) + b, _ := os.ReadFile(fname) l := string(b) return strings.Contains(l, "watcher receives progress notify in watch loop") }) diff --git a/pkg/utils/etcdutil/testutil.go b/pkg/utils/etcdutil/testutil.go index 971e93e1ed6..54ba38b93b6 100644 --- a/pkg/utils/etcdutil/testutil.go +++ b/pkg/utils/etcdutil/testutil.go @@ -21,12 +21,16 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/utils/tempurl" + "github.com/tikv/pd/pkg/utils/testutil" + "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/embed" + "go.etcd.io/etcd/etcdserver/etcdserverpb" ) -// NewTestSingleConfig is used to create a etcd config for the unit test purpose. -func NewTestSingleConfig(t *testing.T) *embed.Config { +// newTestSingleConfig is used to create a etcd config for the unit test purpose. +func newTestSingleConfig(t *testing.T) *embed.Config { cfg := embed.NewConfig() cfg.Name = genRandName() cfg.Dir = t.TempDir() @@ -50,3 +54,85 @@ func NewTestSingleConfig(t *testing.T) *embed.Config { func genRandName() string { return "test_etcd_" + strconv.FormatInt(time.Now().UnixNano()%10000, 10) } + +// NewTestEtcdCluster is used to create a etcd cluster for the unit test purpose. +func NewTestEtcdCluster(t *testing.T, count int) (servers []*embed.Etcd, etcdClient *clientv3.Client, clean func()) { + re := require.New(t) + servers = make([]*embed.Etcd, 0, count) + + cfg := newTestSingleConfig(t) + etcd, err := embed.StartEtcd(cfg) + re.NoError(err) + etcdClient, err = CreateEtcdClient(nil, cfg.LCUrls) + re.NoError(err) + <-etcd.Server.ReadyNotify() + servers = append(servers, etcd) + + for i := 1; i < count; i++ { + // Check the client can get the new member. + listResp, err := ListEtcdMembers(etcdClient) + re.NoError(err) + re.Len(listResp.Members, i) + // Add a new member. + etcd2 := MustAddEtcdMember(t, cfg, etcdClient) + cfg2 := etcd2.Config() + cfg = &cfg2 + <-etcd2.Server.ReadyNotify() + servers = append(servers, etcd2) + } + + checkMembers(re, etcdClient, servers) + + clean = func() { + etcdClient.Close() + for _, server := range servers { + if server != nil { + server.Close() + } + } + } + + return +} + +// MustAddEtcdMember is used to add a new etcd member to the cluster for test. +func MustAddEtcdMember(t *testing.T, cfg1 *embed.Config, client *clientv3.Client) *embed.Etcd { + re := require.New(t) + cfg2 := newTestSingleConfig(t) + cfg2.Name = genRandName() + cfg2.InitialCluster = cfg1.InitialCluster + fmt.Sprintf(",%s=%s", cfg2.Name, &cfg2.LPUrls[0]) + cfg2.ClusterState = embed.ClusterStateFlagExisting + peerURL := cfg2.LPUrls[0].String() + addResp, err := AddEtcdMember(client, []string{peerURL}) + re.NoError(err) + // Check the client can get the new member. + testutil.Eventually(re, func() bool { + members, err := ListEtcdMembers(client) + re.NoError(err) + return len(addResp.Members) == len(members.Members) + }) + // Start the new etcd member. + etcd2, err := embed.StartEtcd(cfg2) + re.NoError(err) + re.Equal(uint64(etcd2.Server.ID()), addResp.Member.ID) + <-etcd2.Server.ReadyNotify() + return etcd2 +} + +func checkMembers(re *require.Assertions, client *clientv3.Client, etcds []*embed.Etcd) { + // Check the client can get the new member. + listResp, err := ListEtcdMembers(client) + re.NoError(err) + re.Len(listResp.Members, len(etcds)) + inList := func(m *etcdserverpb.Member) bool { + for _, etcd := range etcds { + if m.ID == uint64(etcd.Server.ID()) { + return true + } + } + return false + } + for _, m := range listResp.Members { + re.True(inList(m)) + } +} diff --git a/pkg/utils/testutil/testutil.go b/pkg/utils/testutil/testutil.go index 7d31f2263c6..a48db0bd60f 100644 --- a/pkg/utils/testutil/testutil.go +++ b/pkg/utils/testutil/testutil.go @@ -20,6 +20,7 @@ import ( "time" "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/log" "github.com/stretchr/testify/require" "google.golang.org/grpc" ) @@ -86,3 +87,16 @@ func CleanServer(dataDir string) { // Clean data directory os.RemoveAll(dataDir) } + +// InitTempFileLogger initializes the logger and redirects the log output to a temporary file. +func InitTempFileLogger(level string) (fname string) { + cfg := &log.Config{} + f, _ := os.CreateTemp("/tmp", "pd_tests") + fname = f.Name() + f.Close() + cfg.File.Filename = fname + cfg.Level = level + lg, p, _ := log.InitLogger(cfg) + log.ReplaceGlobals(lg, p) + return fname +} diff --git a/tests/server/api/api_test.go b/tests/server/api/api_test.go index 54603192abe..c9c86a9a53f 100644 --- a/tests/server/api/api_test.go +++ b/tests/server/api/api_test.go @@ -30,7 +30,6 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" - "github.com/pingcap/log" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/core" @@ -453,13 +452,8 @@ func (suite *middlewareTestSuite) TestAuditPrometheusBackend() { } func (suite *middlewareTestSuite) TestAuditLocalLogBackend() { - tempStdoutFile, _ := os.CreateTemp("/tmp", "pd_tests") - defer os.RemoveAll(tempStdoutFile.Name()) - cfg := &log.Config{} - cfg.File.Filename = tempStdoutFile.Name() - cfg.Level = "info" - lg, p, _ := log.InitLogger(cfg) - log.ReplaceGlobals(lg, p) + fname := testutil.InitTempFileLogger("info") + defer os.RemoveAll(fname) leader := suite.cluster.GetServer(suite.cluster.GetLeader()) input := map[string]interface{}{ "enable-audit": "true", @@ -477,7 +471,7 @@ func (suite *middlewareTestSuite) TestAuditLocalLogBackend() { suite.NoError(err) _, err = io.ReadAll(resp.Body) resp.Body.Close() - b, _ := os.ReadFile(tempStdoutFile.Name()) + b, _ := os.ReadFile(fname) suite.Contains(string(b), "audit log") suite.NoError(err) suite.Equal(http.StatusOK, resp.StatusCode) diff --git a/tools/pd-backup/pdbackup/backup_test.go b/tools/pd-backup/pdbackup/backup_test.go index 7f77ebc104f..40e4190f5d4 100644 --- a/tools/pd-backup/pdbackup/backup_test.go +++ b/tools/pd-backup/pdbackup/backup_test.go @@ -14,7 +14,6 @@ import ( "testing" "time" - "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" sc "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/storage/endpoint" @@ -48,14 +47,12 @@ type backupTestSuite struct { } func TestBackupTestSuite(t *testing.T) { - re := require.New(t) - - etcd, etcdClient, err := setupEtcd(t) - re.NoError(err) + servers, etcdClient, clean := etcdutil.NewTestEtcdCluster(t, 1) + defer clean() server, serverConfig := setupServer() testSuite := &backupTestSuite{ - etcd: etcd, + etcd: servers[0], etcdClient: etcdClient, server: server, serverConfig: serverConfig, @@ -64,24 +61,6 @@ func TestBackupTestSuite(t *testing.T) { suite.Run(t, testSuite) } -func setupEtcd(t *testing.T) (*embed.Etcd, *clientv3.Client, error) { - etcdCfg := etcdutil.NewTestSingleConfig(t) - etcd, err := embed.StartEtcd(etcdCfg) - if err != nil { - return nil, nil, err - } - - ep := etcdCfg.LCUrls[0].String() - client, err := clientv3.New(clientv3.Config{ - Endpoints: []string{ep}, - }) - if err != nil { - return nil, nil, err - } - - return etcd, client, nil -} - func setupServer() (*httptest.Server, *config.Config) { serverConfig := &config.Config{ ClientUrls: "example.com:2379", @@ -120,10 +99,6 @@ func setupServer() (*httptest.Server, *config.Config) { } func (s *backupTestSuite) BeforeTest(suiteName, testName string) { - // the etcd server is set up in TestBackupTestSuite() before the test suite - // runs - <-s.etcd.Server.ReadyNotify() - ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel()