From 96ace89decdc0b5e0a050aa17ba4356057ec3b88 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Thu, 21 Sep 2023 17:12:15 +0800 Subject: [PATCH] tests: refactor and make pd-ctl helper support mcs (#7120) ref tikv/pd#5839 Signed-off-by: lhy1024 Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- tests/autoscaling/autoscaling_test.go | 2 +- tests/cluster.go | 20 ++++++ tests/compatibility/version_upgrade_test.go | 6 +- tests/dashboard/service_test.go | 2 +- tests/integrations/client/client_test.go | 12 ++-- .../mcs/keyspace/tso_keyspace_group_test.go | 2 +- .../resourcemanager/resource_manager_test.go | 4 +- tests/integrations/mcs/scheduling/api_test.go | 2 +- .../mcs/tso/keyspace_group_manager_test.go | 4 +- tests/integrations/tso/client_test.go | 2 +- tests/pdctl/cluster/cluster_test.go | 2 +- tests/pdctl/config/config_test.go | 36 ++++------ tests/pdctl/health/health_test.go | 2 +- tests/pdctl/helper.go | 64 ----------------- tests/pdctl/hot/hot_test.go | 36 +++++----- tests/pdctl/keyspace/keyspace_group_test.go | 16 ++--- tests/pdctl/keyspace/keyspace_test.go | 2 +- tests/pdctl/label/label_test.go | 4 +- tests/pdctl/log/log_test.go | 9 +-- tests/pdctl/member/member_test.go | 2 +- tests/pdctl/operator/operator_test.go | 10 +-- tests/pdctl/region/region_test.go | 18 ++--- tests/pdctl/scheduler/scheduler_test.go | 16 ++--- tests/pdctl/store/store_test.go | 14 ++-- tests/pdctl/unsafe/unsafe_operation_test.go | 2 +- tests/registry/registry_test.go | 4 +- tests/server/api/api_test.go | 61 ++++++++-------- tests/server/apiv2/handlers/keyspace_test.go | 2 +- .../apiv2/handlers/tso_keyspace_group_test.go | 2 +- tests/server/cluster/cluster_test.go | 42 +++++------ tests/server/cluster/cluster_work_test.go | 6 +- tests/server/config/config_test.go | 4 +- tests/server/id/id_test.go | 12 ++-- tests/server/keyspace/keyspace_test.go | 2 +- tests/server/member/member_test.go | 2 +- .../region_syncer/region_syncer_test.go | 10 +-- .../server/storage/hot_region_storage_test.go | 29 ++++---- tests/server/tso/consistency_test.go | 10 +-- tests/server/tso/global_tso_test.go | 4 +- tests/server/tso/tso_test.go | 4 +- tests/server/watch/leader_watch_test.go | 4 +- tests/testutil.go | 72 +++++++++++++++++++ 42 files changed, 289 insertions(+), 270 deletions(-) diff --git a/tests/autoscaling/autoscaling_test.go b/tests/autoscaling/autoscaling_test.go index 55e29297dbd..663bc92f562 100644 --- a/tests/autoscaling/autoscaling_test.go +++ b/tests/autoscaling/autoscaling_test.go @@ -42,7 +42,7 @@ func TestAPI(t *testing.T) { re.NoError(err) cluster.WaitLeader() - leaderServer := cluster.GetServer(cluster.GetLeader()) + leaderServer := cluster.GetLeaderServer() re.NoError(leaderServer.BootstrapCluster()) var jsonStr = []byte(` diff --git a/tests/cluster.go b/tests/cluster.go index ce8293531cd..c49f3cd982d 100644 --- a/tests/cluster.go +++ b/tests/cluster.go @@ -33,6 +33,7 @@ import ( "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/id" "github.com/tikv/pd/pkg/keyspace" + scheduling "github.com/tikv/pd/pkg/mcs/scheduling/server" "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/schedule/schedulers" "github.com/tikv/pd/pkg/swaggerserver" @@ -447,6 +448,7 @@ type TestCluster struct { sync.Mutex pool map[uint64]struct{} } + schedulingCluster *TestSchedulingCluster } // ConfigOption is used to define customize settings in test. @@ -629,6 +631,11 @@ func (c *TestCluster) GetFollower() string { return "" } +// GetLeaderServer returns the leader server of all servers +func (c *TestCluster) GetLeaderServer() *TestServer { + return c.GetServer(c.GetLeader()) +} + // WaitLeader is used to get leader. // If it exceeds the maximum number of loops, it will return an empty string. func (c *TestCluster) WaitLeader(ops ...WaitOption) string { @@ -853,6 +860,19 @@ func (c *TestCluster) CheckTSOUnique(ts uint64) bool { return true } +// GetSchedulingPrimaryServer returns the scheduling primary server. +func (c *TestCluster) GetSchedulingPrimaryServer() *scheduling.Server { + if c.schedulingCluster == nil { + return nil + } + return c.schedulingCluster.GetPrimaryServer() +} + +// SetSchedulingCluster sets the scheduling cluster. +func (c *TestCluster) SetSchedulingCluster(cluster *TestSchedulingCluster) { + c.schedulingCluster = cluster +} + // WaitOp represent the wait configuration type WaitOp struct { retryTimes int diff --git a/tests/compatibility/version_upgrade_test.go b/tests/compatibility/version_upgrade_test.go index 11573e6da2f..8979d85c9bb 100644 --- a/tests/compatibility/version_upgrade_test.go +++ b/tests/compatibility/version_upgrade_test.go @@ -38,7 +38,7 @@ func TestStoreRegister(t *testing.T) { err = cluster.RunInitialServers() re.NoError(err) cluster.WaitLeader() - leaderServer := cluster.GetServer(cluster.GetLeader()) + leaderServer := cluster.GetLeaderServer() re.NoError(leaderServer.BootstrapCluster()) putStoreRequest := &pdpb.PutStoreRequest{ @@ -63,7 +63,7 @@ func TestStoreRegister(t *testing.T) { re.NoError(err) cluster.WaitLeader() - leaderServer = cluster.GetServer(cluster.GetLeader()) + leaderServer = cluster.GetLeaderServer() re.NotNil(leaderServer) newVersion := leaderServer.GetClusterVersion() re.Equal(version, newVersion) @@ -92,7 +92,7 @@ func TestRollingUpgrade(t *testing.T) { err = cluster.RunInitialServers() re.NoError(err) cluster.WaitLeader() - leaderServer := cluster.GetServer(cluster.GetLeader()) + leaderServer := cluster.GetLeaderServer() re.NoError(leaderServer.BootstrapCluster()) stores := []*pdpb.PutStoreRequest{ diff --git a/tests/dashboard/service_test.go b/tests/dashboard/service_test.go index f75e047d8f1..ab3a2c431cb 100644 --- a/tests/dashboard/service_test.go +++ b/tests/dashboard/service_test.go @@ -134,7 +134,7 @@ func (suite *dashboardTestSuite) testDashboard(internalProxy bool) { cluster.WaitLeader() servers := cluster.GetServers() - leader := cluster.GetServer(cluster.GetLeader()) + leader := cluster.GetLeaderServer() leaderAddr := leader.GetAddr() // auto select node diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index d669e17af90..9cabbb03090 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -347,7 +347,7 @@ func TestUnavailableTimeAfterLeaderIsReady(t *testing.T) { go getTsoFunc() go func() { defer wg.Done() - leader := cluster.GetServer(cluster.GetLeader()) + leader := cluster.GetLeaderServer() leader.Stop() re.NotEmpty(cluster.WaitLeader()) leaderReadyTime = time.Now() @@ -362,7 +362,7 @@ func TestUnavailableTimeAfterLeaderIsReady(t *testing.T) { go getTsoFunc() go func() { defer wg.Done() - leader := cluster.GetServer(cluster.GetLeader()) + leader := cluster.GetLeaderServer() re.NoError(failpoint.Enable("github.com/tikv/pd/client/unreachableNetwork", "return(true)")) leader.Stop() re.NotEmpty(cluster.WaitLeader()) @@ -596,7 +596,7 @@ func TestGetTsoFromFollowerClient2(t *testing.T) { }) lastTS = checkTS(re, cli, lastTS) - re.NoError(cluster.GetServer(cluster.GetLeader()).ResignLeader()) + re.NoError(cluster.GetLeaderServer().ResignLeader()) re.NotEmpty(cluster.WaitLeader()) lastTS = checkTS(re, cli, lastTS) @@ -622,7 +622,7 @@ func runServer(re *require.Assertions, cluster *tests.TestCluster) []string { err := cluster.RunInitialServers() re.NoError(err) re.NotEmpty(cluster.WaitLeader()) - leaderServer := cluster.GetServer(cluster.GetLeader()) + leaderServer := cluster.GetLeaderServer() re.NoError(leaderServer.BootstrapCluster()) testServers := cluster.GetServers() @@ -1439,7 +1439,7 @@ func TestPutGet(t *testing.T) { getResp, err = client.Get(context.Background(), key) re.NoError(err) re.Equal([]byte("2"), getResp.GetKvs()[0].Value) - s := cluster.GetServer(cluster.GetLeader()) + s := cluster.GetLeaderServer() // use etcd client delete the key _, err = s.GetEtcdClient().Delete(context.Background(), string(key)) re.NoError(err) @@ -1459,7 +1459,7 @@ func TestClientWatchWithRevision(t *testing.T) { endpoints := runServer(re, cluster) client := setupCli(re, ctx, endpoints) defer client.Close() - s := cluster.GetServer(cluster.GetLeader()) + s := cluster.GetLeaderServer() watchPrefix := "watch_test" defer func() { _, err := s.GetEtcdClient().Delete(context.Background(), watchPrefix+"test") diff --git a/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go b/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go index 59aabb260ae..af7b31553b3 100644 --- a/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go +++ b/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go @@ -62,7 +62,7 @@ func (suite *keyspaceGroupTestSuite) SetupTest() { suite.NoError(err) suite.NoError(cluster.RunInitialServers()) suite.NotEmpty(cluster.WaitLeader()) - suite.server = cluster.GetServer(cluster.GetLeader()) + suite.server = cluster.GetLeaderServer() suite.NoError(suite.server.BootstrapCluster()) suite.backendEndpoints = suite.server.GetAddr() suite.dialClient = &http.Client{ diff --git a/tests/integrations/mcs/resourcemanager/resource_manager_test.go b/tests/integrations/mcs/resourcemanager/resource_manager_test.go index 546339bee0f..0be18d1bbd3 100644 --- a/tests/integrations/mcs/resourcemanager/resource_manager_test.go +++ b/tests/integrations/mcs/resourcemanager/resource_manager_test.go @@ -903,7 +903,7 @@ func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() { // Test Resource Group CURD via HTTP finalNum = 1 getAddr := func(i int) string { - server := suite.cluster.GetServer(suite.cluster.GetLeader()) + server := suite.cluster.GetLeaderServer() if i%2 == 1 { server = suite.cluster.GetServer(suite.cluster.GetFollower()) } @@ -1298,7 +1298,7 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupControllerConfigCh } getAddr := func() string { - server := suite.cluster.GetServer(suite.cluster.GetLeader()) + server := suite.cluster.GetLeaderServer() if rand.Intn(100)%2 == 1 { server = suite.cluster.GetServer(suite.cluster.GetFollower()) } diff --git a/tests/integrations/mcs/scheduling/api_test.go b/tests/integrations/mcs/scheduling/api_test.go index 04671d84798..311c8a3fbed 100644 --- a/tests/integrations/mcs/scheduling/api_test.go +++ b/tests/integrations/mcs/scheduling/api_test.go @@ -45,7 +45,7 @@ func (suite *apiTestSuite) SetupSuite() { suite.NoError(err) suite.NoError(cluster.RunInitialServers()) suite.NotEmpty(cluster.WaitLeader()) - suite.server = cluster.GetServer(cluster.GetLeader()) + suite.server = cluster.GetLeaderServer() suite.NoError(suite.server.BootstrapCluster()) suite.backendEndpoints = suite.server.GetAddr() suite.dialClient = &http.Client{ diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index 3d3fe25b372..d1a4cf35db4 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -517,7 +517,7 @@ func TestTwiceSplitKeyspaceGroup(t *testing.T) { re.NoError(err) defer tc.Destroy() tc.WaitLeader() - leaderServer := tc.GetServer(tc.GetLeader()) + leaderServer := tc.GetLeaderServer() re.NoError(leaderServer.BootstrapCluster()) tsoCluster, err := tests.NewTestTSOCluster(ctx, 2, pdAddr) @@ -711,7 +711,7 @@ func TestGetTSOImmediately(t *testing.T) { re.NoError(err) defer tc.Destroy() tc.WaitLeader() - leaderServer := tc.GetServer(tc.GetLeader()) + leaderServer := tc.GetLeaderServer() re.NoError(leaderServer.BootstrapCluster()) tsoCluster, err := tests.NewTestTSOCluster(ctx, 2, pdAddr) diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go index 1d2f437e849..63243214e81 100644 --- a/tests/integrations/tso/client_test.go +++ b/tests/integrations/tso/client_test.go @@ -389,7 +389,7 @@ func (suite *tsoClientTestSuite) TestRandomShutdown() { if !suite.legacy { suite.tsoCluster.WaitForDefaultPrimaryServing(re).Close() } else { - suite.cluster.GetServer(suite.cluster.GetLeader()).GetServer().Close() + suite.cluster.GetLeaderServer().GetServer().Close() } time.Sleep(time.Duration(n) * time.Second) } diff --git a/tests/pdctl/cluster/cluster_test.go b/tests/pdctl/cluster/cluster_test.go index 2b8b8bc8f59..cd4ec6e1391 100644 --- a/tests/pdctl/cluster/cluster_test.go +++ b/tests/pdctl/cluster/cluster_test.go @@ -39,7 +39,7 @@ func TestClusterAndPing(t *testing.T) { err = cluster.RunInitialServers() re.NoError(err) cluster.WaitLeader() - err = cluster.GetServer(cluster.GetLeader()).BootstrapCluster() + err = cluster.GetLeaderServer().BootstrapCluster() re.NoError(err) pdAddr := cluster.GetConfig().GetClientURL() i := strings.Index(pdAddr, "//") diff --git a/tests/pdctl/config/config_test.go b/tests/pdctl/config/config_test.go index f43a964b50c..6ed0841bf74 100644 --- a/tests/pdctl/config/config_test.go +++ b/tests/pdctl/config/config_test.go @@ -64,10 +64,10 @@ func TestConfig(t *testing.T) { Id: 1, State: metapb.StoreState_Up, } - leaderServer := cluster.GetServer(cluster.GetLeader()) + leaderServer := cluster.GetLeaderServer() re.NoError(leaderServer.BootstrapCluster()) svr := leaderServer.GetServer() - pdctl.MustPutStore(re, svr, store) + tests.MustPutStore(re, cluster, store) defer cluster.Destroy() // config show @@ -300,10 +300,9 @@ func TestPlacementRules(t *testing.T) { State: metapb.StoreState_Up, LastHeartbeat: time.Now().UnixNano(), } - leaderServer := cluster.GetServer(cluster.GetLeader()) + leaderServer := cluster.GetLeaderServer() re.NoError(leaderServer.BootstrapCluster()) - svr := leaderServer.GetServer() - pdctl.MustPutStore(re, svr, store) + tests.MustPutStore(re, cluster, store) defer cluster.Destroy() output, err := pdctl.ExecuteCommand(cmd, "-u", pdAddr, "config", "placement-rules", "enable") @@ -358,7 +357,7 @@ func TestPlacementRules(t *testing.T) { re.Equal([2]string{"pd", "test1"}, rules2[1].Key()) // test rule region detail - pdctl.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b")) + tests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b")) fit := &placement.RegionFit{} // need clear up args, so create new a cobra.Command. Otherwise gourp still exists. cmd2 := pdctlCmd.GetRootCmd() @@ -398,10 +397,9 @@ func TestPlacementRuleGroups(t *testing.T) { State: metapb.StoreState_Up, LastHeartbeat: time.Now().UnixNano(), } - leaderServer := cluster.GetServer(cluster.GetLeader()) + leaderServer := cluster.GetLeaderServer() re.NoError(leaderServer.BootstrapCluster()) - svr := leaderServer.GetServer() - pdctl.MustPutStore(re, svr, store) + tests.MustPutStore(re, cluster, store) defer cluster.Destroy() output, err := pdctl.ExecuteCommand(cmd, "-u", pdAddr, "config", "placement-rules", "enable") @@ -473,10 +471,9 @@ func TestPlacementRuleBundle(t *testing.T) { State: metapb.StoreState_Up, LastHeartbeat: time.Now().UnixNano(), } - leaderServer := cluster.GetServer(cluster.GetLeader()) + leaderServer := cluster.GetLeaderServer() re.NoError(leaderServer.BootstrapCluster()) - svr := leaderServer.GetServer() - pdctl.MustPutStore(re, svr, store) + tests.MustPutStore(re, cluster, store) defer cluster.Destroy() output, err := pdctl.ExecuteCommand(cmd, "-u", pdAddr, "config", "placement-rules", "enable") @@ -609,10 +606,9 @@ func TestReplicationMode(t *testing.T) { State: metapb.StoreState_Up, LastHeartbeat: time.Now().UnixNano(), } - leaderServer := cluster.GetServer(cluster.GetLeader()) + leaderServer := cluster.GetLeaderServer() re.NoError(leaderServer.BootstrapCluster()) - svr := leaderServer.GetServer() - pdctl.MustPutStore(re, svr, store) + tests.MustPutStore(re, cluster, store) defer cluster.Destroy() conf := config.ReplicationModeConfig{ @@ -668,10 +664,9 @@ func TestUpdateDefaultReplicaConfig(t *testing.T) { Id: 1, State: metapb.StoreState_Up, } - leaderServer := cluster.GetServer(cluster.GetLeader()) + leaderServer := cluster.GetLeaderServer() re.NoError(leaderServer.BootstrapCluster()) - svr := leaderServer.GetServer() - pdctl.MustPutStore(re, svr, store) + tests.MustPutStore(re, cluster, store) defer cluster.Destroy() checkMaxReplicas := func(expect uint64) { @@ -813,10 +808,9 @@ func TestPDServerConfig(t *testing.T) { State: metapb.StoreState_Up, LastHeartbeat: time.Now().UnixNano(), } - leaderServer := cluster.GetServer(cluster.GetLeader()) + leaderServer := cluster.GetLeaderServer() re.NoError(leaderServer.BootstrapCluster()) - svr := leaderServer.GetServer() - pdctl.MustPutStore(re, svr, store) + tests.MustPutStore(re, cluster, store) defer cluster.Destroy() output, err := pdctl.ExecuteCommand(cmd, "-u", pdAddr, "config", "show", "server") diff --git a/tests/pdctl/health/health_test.go b/tests/pdctl/health/health_test.go index bc808a36750..748250babe4 100644 --- a/tests/pdctl/health/health_test.go +++ b/tests/pdctl/health/health_test.go @@ -36,7 +36,7 @@ func TestHealth(t *testing.T) { err = tc.RunInitialServers() re.NoError(err) tc.WaitLeader() - leaderServer := tc.GetServer(tc.GetLeader()) + leaderServer := tc.GetLeaderServer() re.NoError(leaderServer.BootstrapCluster()) pdAddr := tc.GetConfig().GetClientURL() cmd := pdctlCmd.GetRootCmd() diff --git a/tests/pdctl/helper.go b/tests/pdctl/helper.go index d7d6a858497..3912cdfef7c 100644 --- a/tests/pdctl/helper.go +++ b/tests/pdctl/helper.go @@ -16,21 +16,13 @@ package pdctl import ( "bytes" - "context" - "fmt" "sort" - "github.com/docker/go-units" - "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/kvproto/pkg/pdpb" "github.com/spf13/cobra" "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/utils/typeutil" - "github.com/tikv/pd/pkg/versioninfo" - "github.com/tikv/pd/server" "github.com/tikv/pd/server/api" - "github.com/tikv/pd/tests" ) // ExecuteCommand is used for test purpose. @@ -89,59 +81,3 @@ func CheckRegionsInfo(re *require.Assertions, output *api.RegionsInfo, expected CheckRegionInfo(re, &got[i], region) } } - -// MustPutStore is used for test purpose. -func MustPutStore(re *require.Assertions, svr *server.Server, store *metapb.Store) { - store.Address = fmt.Sprintf("tikv%d", store.GetId()) - if len(store.Version) == 0 { - store.Version = versioninfo.MinSupportedVersion(versioninfo.Version2_0).String() - } - grpcServer := &server.GrpcServer{Server: svr} - _, err := grpcServer.PutStore(context.Background(), &pdpb.PutStoreRequest{ - Header: &pdpb.RequestHeader{ClusterId: svr.ClusterID()}, - Store: store, - }) - re.NoError(err) - - storeInfo := grpcServer.GetRaftCluster().GetStore(store.GetId()) - newStore := storeInfo.Clone(core.SetStoreStats(&pdpb.StoreStats{ - Capacity: uint64(10 * units.GiB), - UsedSize: uint64(9 * units.GiB), - Available: uint64(1 * units.GiB), - })) - grpcServer.GetRaftCluster().GetBasicCluster().PutStore(newStore) -} - -// MustPutRegion is used for test purpose. -func MustPutRegion(re *require.Assertions, cluster *tests.TestCluster, regionID, storeID uint64, start, end []byte, opts ...core.RegionCreateOption) *core.RegionInfo { - leader := &metapb.Peer{ - Id: regionID, - StoreId: storeID, - } - metaRegion := &metapb.Region{ - Id: regionID, - StartKey: start, - EndKey: end, - Peers: []*metapb.Peer{leader}, - RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 1}, - } - r := core.NewRegionInfo(metaRegion, leader, opts...) - err := cluster.HandleRegionHeartbeat(r) - re.NoError(err) - return r -} - -// MustReportBuckets is used for test purpose. -func MustReportBuckets(re *require.Assertions, cluster *tests.TestCluster, regionID uint64, start, end []byte, stats *metapb.BucketStats) *metapb.Buckets { - buckets := &metapb.Buckets{ - RegionId: regionID, - Version: 1, - Keys: [][]byte{start, end}, - Stats: stats, - // report buckets interval is 10s - PeriodInMs: 10000, - } - err := cluster.HandleReportBuckets(buckets) - re.NoError(err) - return buckets -} diff --git a/tests/pdctl/hot/hot_test.go b/tests/pdctl/hot/hot_test.go index 352b891c092..359d89199c9 100644 --- a/tests/pdctl/hot/hot_test.go +++ b/tests/pdctl/hot/hot_test.go @@ -63,10 +63,10 @@ func TestHot(t *testing.T) { Labels: []*metapb.StoreLabel{{Key: "engine", Value: "tiflash"}}, } - leaderServer := cluster.GetServer(cluster.GetLeader()) + leaderServer := cluster.GetLeaderServer() re.NoError(leaderServer.BootstrapCluster()) - pdctl.MustPutStore(re, leaderServer.GetServer(), store1) - pdctl.MustPutStore(re, leaderServer.GetServer(), store2) + tests.MustPutStore(re, cluster, store1) + tests.MustPutStore(re, cluster, store2) defer cluster.Destroy() // test hot store @@ -159,7 +159,7 @@ func TestHot(t *testing.T) { } testHot(hotRegionID, hotStoreID, "read") case "write": - pdctl.MustPutRegion( + tests.MustPutRegion( re, cluster, hotRegionID, hotStoreID, []byte("c"), []byte("d"), @@ -222,16 +222,16 @@ func TestHotWithStoreID(t *testing.T) { }, } - leaderServer := cluster.GetServer(cluster.GetLeader()) + leaderServer := cluster.GetLeaderServer() re.NoError(leaderServer.BootstrapCluster()) for _, store := range stores { - pdctl.MustPutStore(re, leaderServer.GetServer(), store) + tests.MustPutStore(re, cluster, store) } defer cluster.Destroy() - pdctl.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"), core.SetWrittenBytes(3000000000), core.SetReportInterval(0, utils.RegionHeartBeatReportInterval)) - pdctl.MustPutRegion(re, cluster, 2, 2, []byte("c"), []byte("d"), core.SetWrittenBytes(6000000000), core.SetReportInterval(0, utils.RegionHeartBeatReportInterval)) - pdctl.MustPutRegion(re, cluster, 3, 1, []byte("e"), []byte("f"), core.SetWrittenBytes(9000000000), core.SetReportInterval(0, utils.RegionHeartBeatReportInterval)) + tests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"), core.SetWrittenBytes(3000000000), core.SetReportInterval(0, utils.RegionHeartBeatReportInterval)) + tests.MustPutRegion(re, cluster, 2, 2, []byte("c"), []byte("d"), core.SetWrittenBytes(6000000000), core.SetReportInterval(0, utils.RegionHeartBeatReportInterval)) + tests.MustPutRegion(re, cluster, 3, 1, []byte("e"), []byte("f"), core.SetWrittenBytes(9000000000), core.SetReportInterval(0, utils.RegionHeartBeatReportInterval)) // wait hot scheduler starts rc := leaderServer.GetRaftCluster() testutil.Eventually(re, func() bool { @@ -267,7 +267,7 @@ func TestHotWithStoreID(t *testing.T) { WriteBytes: []uint64{13 * units.MiB}, WriteQps: []uint64{0}, } - buckets := pdctl.MustReportBuckets(re, cluster, 1, []byte("a"), []byte("b"), stats) + buckets := tests.MustReportBuckets(re, cluster, 1, []byte("a"), []byte("b"), stats) args = []string{"-u", pdAddr, "hot", "buckets", "1"} output, err = pdctl.ExecuteCommand(cmd, args...) re.NoError(err) @@ -330,20 +330,20 @@ func TestHistoryHotRegions(t *testing.T) { }, } - leaderServer := cluster.GetServer(cluster.GetLeader()) + leaderServer := cluster.GetLeaderServer() re.NoError(leaderServer.BootstrapCluster()) for _, store := range stores { - pdctl.MustPutStore(re, leaderServer.GetServer(), store) + tests.MustPutStore(re, cluster, store) } defer cluster.Destroy() startTime := time.Now().Unix() - pdctl.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"), core.SetWrittenBytes(3000000000), + tests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"), core.SetWrittenBytes(3000000000), core.SetReportInterval(uint64(startTime-utils.RegionHeartBeatReportInterval), uint64(startTime))) - pdctl.MustPutRegion(re, cluster, 2, 2, []byte("c"), []byte("d"), core.SetWrittenBytes(6000000000), + tests.MustPutRegion(re, cluster, 2, 2, []byte("c"), []byte("d"), core.SetWrittenBytes(6000000000), core.SetReportInterval(uint64(startTime-utils.RegionHeartBeatReportInterval), uint64(startTime))) - pdctl.MustPutRegion(re, cluster, 3, 1, []byte("e"), []byte("f"), core.SetWrittenBytes(9000000000), + tests.MustPutRegion(re, cluster, 3, 1, []byte("e"), []byte("f"), core.SetWrittenBytes(9000000000), core.SetReportInterval(uint64(startTime-utils.RegionHeartBeatReportInterval), uint64(startTime))) - pdctl.MustPutRegion(re, cluster, 4, 3, []byte("g"), []byte("h"), core.SetWrittenBytes(9000000000), + tests.MustPutRegion(re, cluster, 4, 3, []byte("g"), []byte("h"), core.SetWrittenBytes(9000000000), core.SetReportInterval(uint64(startTime-utils.RegionHeartBeatReportInterval), uint64(startTime))) // wait hot scheduler starts testutil.Eventually(re, func() bool { @@ -440,10 +440,10 @@ func TestHotWithoutHotPeer(t *testing.T) { }, } - leaderServer := cluster.GetServer(cluster.GetLeader()) + leaderServer := cluster.GetLeaderServer() re.NoError(leaderServer.BootstrapCluster()) for _, store := range stores { - pdctl.MustPutStore(re, leaderServer.GetServer(), store) + tests.MustPutStore(re, cluster, store) } timestamp := uint64(time.Now().UnixNano()) load := 1024.0 diff --git a/tests/pdctl/keyspace/keyspace_group_test.go b/tests/pdctl/keyspace/keyspace_group_test.go index 105e860ad17..0b09550d967 100644 --- a/tests/pdctl/keyspace/keyspace_group_test.go +++ b/tests/pdctl/keyspace/keyspace_group_test.go @@ -44,7 +44,7 @@ func TestKeyspaceGroup(t *testing.T) { err = tc.RunInitialServers() re.NoError(err) tc.WaitLeader() - leaderServer := tc.GetServer(tc.GetLeader()) + leaderServer := tc.GetLeaderServer() re.NoError(leaderServer.BootstrapCluster()) pdAddr := tc.GetConfig().GetClientURL() cmd := pdctlCmd.GetRootCmd() @@ -113,7 +113,7 @@ func TestSplitKeyspaceGroup(t *testing.T) { cmd := pdctlCmd.GetRootCmd() tc.WaitLeader() - leaderServer := tc.GetServer(tc.GetLeader()) + leaderServer := tc.GetLeaderServer() re.NoError(leaderServer.BootstrapCluster()) // split keyspace group. @@ -164,7 +164,7 @@ func TestExternalAllocNodeWhenStart(t *testing.T) { cmd := pdctlCmd.GetRootCmd() tc.WaitLeader() - leaderServer := tc.GetServer(tc.GetLeader()) + leaderServer := tc.GetLeaderServer() re.NoError(leaderServer.BootstrapCluster()) // check keyspace group information. @@ -207,7 +207,7 @@ func TestSetNodeAndPriorityKeyspaceGroup(t *testing.T) { cmd := pdctlCmd.GetRootCmd() tc.WaitLeader() - leaderServer := tc.GetServer(tc.GetLeader()) + leaderServer := tc.GetLeaderServer() re.NoError(leaderServer.BootstrapCluster()) // set-node keyspace group. @@ -309,7 +309,7 @@ func TestMergeKeyspaceGroup(t *testing.T) { cmd := pdctlCmd.GetRootCmd() tc.WaitLeader() - leaderServer := tc.GetServer(tc.GetLeader()) + leaderServer := tc.GetLeaderServer() re.NoError(leaderServer.BootstrapCluster()) // split keyspace group. @@ -427,7 +427,7 @@ func TestKeyspaceGroupState(t *testing.T) { cmd := pdctlCmd.GetRootCmd() tc.WaitLeader() - leaderServer := tc.GetServer(tc.GetLeader()) + leaderServer := tc.GetLeaderServer() re.NoError(leaderServer.BootstrapCluster()) // split keyspace group. @@ -518,7 +518,7 @@ func TestShowKeyspaceGroupPrimary(t *testing.T) { cmd := pdctlCmd.GetRootCmd() tc.WaitLeader() - leaderServer := tc.GetServer(tc.GetLeader()) + leaderServer := tc.GetLeaderServer() re.NoError(leaderServer.BootstrapCluster()) defaultKeyspaceGroupID := fmt.Sprintf("%d", utils.DefaultKeyspaceGroupID) @@ -600,7 +600,7 @@ func TestInPDMode(t *testing.T) { pdAddr := tc.GetConfig().GetClientURL() cmd := pdctlCmd.GetRootCmd() tc.WaitLeader() - leaderServer := tc.GetServer(tc.GetLeader()) + leaderServer := tc.GetLeaderServer() re.NoError(leaderServer.BootstrapCluster()) argsList := [][]string{ diff --git a/tests/pdctl/keyspace/keyspace_test.go b/tests/pdctl/keyspace/keyspace_test.go index a0bab4114df..57acdc86c70 100644 --- a/tests/pdctl/keyspace/keyspace_test.go +++ b/tests/pdctl/keyspace/keyspace_test.go @@ -58,7 +58,7 @@ func TestKeyspace(t *testing.T) { cmd := pdctlCmd.GetRootCmd() tc.WaitLeader() - leaderServer := tc.GetServer(tc.GetLeader()) + leaderServer := tc.GetLeaderServer() re.NoError(leaderServer.BootstrapCluster()) defaultKeyspaceGroupID := fmt.Sprintf("%d", utils.DefaultKeyspaceGroupID) diff --git a/tests/pdctl/label/label_test.go b/tests/pdctl/label/label_test.go index ba31b1fb1d1..9c64933a127 100644 --- a/tests/pdctl/label/label_test.go +++ b/tests/pdctl/label/label_test.go @@ -92,11 +92,11 @@ func TestLabel(t *testing.T) { }, }, } - leaderServer := cluster.GetServer(cluster.GetLeader()) + leaderServer := cluster.GetLeaderServer() re.NoError(leaderServer.BootstrapCluster()) for _, store := range stores { - pdctl.MustPutStore(re, leaderServer.GetServer(), store.Store.Store) + tests.MustPutStore(re, cluster, store.Store.Store) } defer cluster.Destroy() diff --git a/tests/pdctl/log/log_test.go b/tests/pdctl/log/log_test.go index 7f2e4f20584..e6995231329 100644 --- a/tests/pdctl/log/log_test.go +++ b/tests/pdctl/log/log_test.go @@ -21,7 +21,6 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/stretchr/testify/suite" - "github.com/tikv/pd/server" "github.com/tikv/pd/tests" "github.com/tikv/pd/tests/pdctl" pdctlCmd "github.com/tikv/pd/tools/pd-ctl/pdctl" @@ -32,7 +31,6 @@ type logTestSuite struct { ctx context.Context cancel context.CancelFunc cluster *tests.TestCluster - svr *server.Server pdAddrs []string } @@ -54,10 +52,9 @@ func (suite *logTestSuite) SetupSuite() { State: metapb.StoreState_Up, LastHeartbeat: time.Now().UnixNano(), } - leaderServer := suite.cluster.GetServer(suite.cluster.GetLeader()) + leaderServer := suite.cluster.GetLeaderServer() suite.NoError(leaderServer.BootstrapCluster()) - suite.svr = leaderServer.GetServer() - pdctl.MustPutStore(suite.Require(), suite.svr, store) + tests.MustPutStore(suite.Require(), suite.cluster, store) } func (suite *logTestSuite) TearDownSuite() { @@ -97,7 +94,7 @@ func (suite *logTestSuite) TestLog() { for _, testCase := range testCases { _, err := pdctl.ExecuteCommand(cmd, testCase.cmd...) suite.NoError(err) - suite.Equal(testCase.expect, suite.svr.GetConfig().Log.Level) + suite.Equal(testCase.expect, suite.cluster.GetLeaderServer().GetConfig().Log.Level) } } diff --git a/tests/pdctl/member/member_test.go b/tests/pdctl/member/member_test.go index 9c787499253..af3ee771e82 100644 --- a/tests/pdctl/member/member_test.go +++ b/tests/pdctl/member/member_test.go @@ -38,7 +38,7 @@ func TestMember(t *testing.T) { err = cluster.RunInitialServers() re.NoError(err) cluster.WaitLeader() - leaderServer := cluster.GetServer(cluster.GetLeader()) + leaderServer := cluster.GetLeaderServer() re.NoError(leaderServer.BootstrapCluster()) pdAddr := cluster.GetConfig().GetClientURL() re.NoError(err) diff --git a/tests/pdctl/operator/operator_test.go b/tests/pdctl/operator/operator_test.go index 148cbc9e081..a95c620adcf 100644 --- a/tests/pdctl/operator/operator_test.go +++ b/tests/pdctl/operator/operator_test.go @@ -79,17 +79,17 @@ func TestOperator(t *testing.T) { }, } - leaderServer := cluster.GetServer(cluster.GetLeader()) + leaderServer := cluster.GetLeaderServer() re.NoError(leaderServer.BootstrapCluster()) for _, store := range stores { - pdctl.MustPutStore(re, leaderServer.GetServer(), store) + tests.MustPutStore(re, cluster, store) } - pdctl.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"), core.SetPeers([]*metapb.Peer{ + tests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"), core.SetPeers([]*metapb.Peer{ {Id: 1, StoreId: 1}, {Id: 2, StoreId: 2}, })) - pdctl.MustPutRegion(re, cluster, 3, 2, []byte("b"), []byte("d"), core.SetPeers([]*metapb.Peer{ + tests.MustPutRegion(re, cluster, 3, 2, []byte("b"), []byte("d"), core.SetPeers([]*metapb.Peer{ {Id: 3, StoreId: 1}, {Id: 4, StoreId: 2}, })) @@ -261,7 +261,7 @@ func TestForwardOperatorRequest(t *testing.T) { re.NoError(err) re.NoError(cluster.RunInitialServers()) re.NotEmpty(cluster.WaitLeader()) - server := cluster.GetServer(cluster.GetLeader()) + server := cluster.GetLeaderServer() re.NoError(server.BootstrapCluster()) backendEndpoints := server.GetAddr() tc, err := tests.NewTestSchedulingCluster(ctx, 2, backendEndpoints) diff --git a/tests/pdctl/region/region_test.go b/tests/pdctl/region/region_test.go index d56463d728d..b913f1b0923 100644 --- a/tests/pdctl/region/region_test.go +++ b/tests/pdctl/region/region_test.go @@ -45,9 +45,9 @@ func TestRegionKeyFormat(t *testing.T) { State: metapb.StoreState_Up, LastHeartbeat: time.Now().UnixNano(), } - leaderServer := cluster.GetServer(cluster.GetLeader()) + leaderServer := cluster.GetLeaderServer() re.NoError(leaderServer.BootstrapCluster()) - pdctl.MustPutStore(re, leaderServer.GetServer(), store) + tests.MustPutStore(re, cluster, store) cmd := pdctlCmd.GetRootCmd() output, err := pdctl.ExecuteCommand(cmd, "-u", url, "region", "key", "--format=raw", " ") @@ -72,12 +72,12 @@ func TestRegion(t *testing.T) { State: metapb.StoreState_Up, LastHeartbeat: time.Now().UnixNano(), } - leaderServer := cluster.GetServer(cluster.GetLeader()) + leaderServer := cluster.GetLeaderServer() re.NoError(leaderServer.BootstrapCluster()) - pdctl.MustPutStore(re, leaderServer.GetServer(), store) + tests.MustPutStore(re, cluster, store) downPeer := &metapb.Peer{Id: 8, StoreId: 3} - r1 := pdctl.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"), + r1 := tests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"), core.SetWrittenBytes(1000), core.SetReadBytes(1000), core.SetRegionConfVer(1), core.SetRegionVersion(1), core.SetApproximateSize(1), core.SetApproximateKeys(100), core.SetPeers([]*metapb.Peer{ @@ -86,16 +86,16 @@ func TestRegion(t *testing.T) { {Id: 6, StoreId: 3}, {Id: 7, StoreId: 4}, })) - r2 := pdctl.MustPutRegion(re, cluster, 2, 1, []byte("b"), []byte("c"), + r2 := tests.MustPutRegion(re, cluster, 2, 1, []byte("b"), []byte("c"), core.SetWrittenBytes(2000), core.SetReadBytes(0), core.SetRegionConfVer(2), core.SetRegionVersion(3), core.SetApproximateSize(144), core.SetApproximateKeys(14400), ) - r3 := pdctl.MustPutRegion(re, cluster, 3, 1, []byte("c"), []byte("d"), + r3 := tests.MustPutRegion(re, cluster, 3, 1, []byte("c"), []byte("d"), core.SetWrittenBytes(500), core.SetReadBytes(800), core.SetRegionConfVer(3), core.SetRegionVersion(2), core.SetApproximateSize(30), core.SetApproximateKeys(3000), core.WithDownPeers([]*pdpb.PeerStats{{Peer: downPeer, DownSeconds: 3600}}), core.WithPendingPeers([]*metapb.Peer{downPeer}), core.WithLearners([]*metapb.Peer{{Id: 3, StoreId: 1}})) - r4 := pdctl.MustPutRegion(re, cluster, 4, 1, []byte("d"), []byte("e"), + r4 := tests.MustPutRegion(re, cluster, 4, 1, []byte("d"), []byte("e"), core.SetWrittenBytes(100), core.SetReadBytes(100), core.SetRegionConfVer(1), core.SetRegionVersion(1), core.SetApproximateSize(10), core.SetApproximateKeys(1000), ) @@ -197,7 +197,7 @@ func TestRegion(t *testing.T) { } // Test region range-holes. - r5 := pdctl.MustPutRegion(re, cluster, 5, 1, []byte("x"), []byte("z")) + r5 := tests.MustPutRegion(re, cluster, 5, 1, []byte("x"), []byte("z")) output, err := pdctl.ExecuteCommand(cmd, []string{"-u", pdAddr, "region", "range-holes"}...) re.NoError(err) rangeHoles := new([][]string) diff --git a/tests/pdctl/scheduler/scheduler_test.go b/tests/pdctl/scheduler/scheduler_test.go index 31e6270aa3b..f2d44a589a4 100644 --- a/tests/pdctl/scheduler/scheduler_test.go +++ b/tests/pdctl/scheduler/scheduler_test.go @@ -94,14 +94,14 @@ func TestScheduler(t *testing.T) { re.Equal(expectedConfig, configInfo) } - leaderServer := cluster.GetServer(cluster.GetLeader()) + leaderServer := cluster.GetLeaderServer() re.NoError(leaderServer.BootstrapCluster()) for _, store := range stores { - pdctl.MustPutStore(re, leaderServer.GetServer(), store) + tests.MustPutStore(re, cluster, store) } // note: because pdqsort is a unstable sort algorithm, set ApproximateSize for this region. - pdctl.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"), core.SetApproximateSize(10)) + tests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"), core.SetApproximateSize(10)) time.Sleep(3 * time.Second) // scheduler show command @@ -363,7 +363,7 @@ func TestScheduler(t *testing.T) { for _, store := range stores { version := versioninfo.HotScheduleWithQuery store.Version = versioninfo.MinSupportedVersion(version).String() - pdctl.MustPutStore(re, leaderServer.GetServer(), store) + tests.MustPutStore(re, cluster, store) } re.Equal("5.2.0", leaderServer.GetClusterVersion().String()) // After upgrading, we should not use query. @@ -488,14 +488,14 @@ func TestSchedulerDiagnostic(t *testing.T) { LastHeartbeat: time.Now().UnixNano(), }, } - leaderServer := cluster.GetServer(cluster.GetLeader()) + leaderServer := cluster.GetLeaderServer() re.NoError(leaderServer.BootstrapCluster()) for _, store := range stores { - pdctl.MustPutStore(re, leaderServer.GetServer(), store) + tests.MustPutStore(re, cluster, store) } // note: because pdqsort is a unstable sort algorithm, set ApproximateSize for this region. - pdctl.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"), core.SetApproximateSize(10)) + tests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"), core.SetApproximateSize(10)) time.Sleep(3 * time.Second) echo := mustExec(re, cmd, []string{"-u", pdAddr, "config", "set", "enable-diagnostic", "true"}, nil) @@ -539,7 +539,7 @@ func TestForwardSchedulerRequest(t *testing.T) { re.NoError(err) re.NoError(cluster.RunInitialServers()) re.NotEmpty(cluster.WaitLeader()) - server := cluster.GetServer(cluster.GetLeader()) + server := cluster.GetLeaderServer() re.NoError(server.BootstrapCluster()) backendEndpoints := server.GetAddr() tc, err := tests.NewTestSchedulingCluster(ctx, 2, backendEndpoints) diff --git a/tests/pdctl/store/store_test.go b/tests/pdctl/store/store_test.go index 0ac68e35d98..13c7350bb6f 100644 --- a/tests/pdctl/store/store_test.go +++ b/tests/pdctl/store/store_test.go @@ -79,11 +79,11 @@ func TestStore(t *testing.T) { }, } - leaderServer := cluster.GetServer(cluster.GetLeader()) + leaderServer := cluster.GetLeaderServer() re.NoError(leaderServer.BootstrapCluster()) for _, store := range stores { - pdctl.MustPutStore(re, leaderServer.GetServer(), store.Store.Store) + tests.MustPutStore(re, cluster, store.Store.Store) } defer cluster.Destroy() @@ -293,7 +293,7 @@ func TestStore(t *testing.T) { NodeState: metapb.NodeState_Serving, LastHeartbeat: time.Now().UnixNano(), } - pdctl.MustPutStore(re, leaderServer.GetServer(), store2) + tests.MustPutStore(re, cluster, store2) } // store delete command @@ -506,15 +506,15 @@ func TestTombstoneStore(t *testing.T) { }, } - leaderServer := cluster.GetServer(cluster.GetLeader()) + leaderServer := cluster.GetLeaderServer() re.NoError(leaderServer.BootstrapCluster()) for _, store := range stores { - pdctl.MustPutStore(re, leaderServer.GetServer(), store.Store.Store) + tests.MustPutStore(re, cluster, store.Store.Store) } defer cluster.Destroy() - pdctl.MustPutRegion(re, cluster, 1, 2, []byte("a"), []byte("b"), core.SetWrittenBytes(3000000000), core.SetReportInterval(0, utils.RegionHeartBeatReportInterval)) - pdctl.MustPutRegion(re, cluster, 2, 3, []byte("b"), []byte("c"), core.SetWrittenBytes(3000000000), core.SetReportInterval(0, utils.RegionHeartBeatReportInterval)) + tests.MustPutRegion(re, cluster, 1, 2, []byte("a"), []byte("b"), core.SetWrittenBytes(3000000000), core.SetReportInterval(0, utils.RegionHeartBeatReportInterval)) + tests.MustPutRegion(re, cluster, 2, 3, []byte("b"), []byte("c"), core.SetWrittenBytes(3000000000), core.SetReportInterval(0, utils.RegionHeartBeatReportInterval)) // store remove-tombstone args := []string{"-u", pdAddr, "store", "remove-tombstone"} output, err := pdctl.ExecuteCommand(cmd, args...) diff --git a/tests/pdctl/unsafe/unsafe_operation_test.go b/tests/pdctl/unsafe/unsafe_operation_test.go index 1e4e3468225..e0fdb983591 100644 --- a/tests/pdctl/unsafe/unsafe_operation_test.go +++ b/tests/pdctl/unsafe/unsafe_operation_test.go @@ -33,7 +33,7 @@ func TestRemoveFailedStores(t *testing.T) { err = cluster.RunInitialServers() re.NoError(err) cluster.WaitLeader() - err = cluster.GetServer(cluster.GetLeader()).BootstrapCluster() + err = cluster.GetLeaderServer().BootstrapCluster() re.NoError(err) pdAddr := cluster.GetConfig().GetClientURL() cmd := pdctlCmd.GetRootCmd() diff --git a/tests/registry/registry_test.go b/tests/registry/registry_test.go index da68bddd354..a3aff76a1cf 100644 --- a/tests/registry/registry_test.go +++ b/tests/registry/registry_test.go @@ -76,8 +76,8 @@ func TestRegistryService(t *testing.T) { err = cluster.RunInitialServers() re.NoError(err) - leaderName := cluster.WaitLeader() - leader := cluster.GetServer(leaderName) + re.NotEmpty(cluster.WaitLeader()) + leader := cluster.GetLeaderServer() // Test registered GRPC Service cc, err := grpc.DialContext(ctx, strings.TrimPrefix(leader.GetAddr(), "http://"), grpc.WithInsecure()) diff --git a/tests/server/api/api_test.go b/tests/server/api/api_test.go index cc35d9eaab3..ff430f1b848 100644 --- a/tests/server/api/api_test.go +++ b/tests/server/api/api_test.go @@ -40,7 +40,6 @@ import ( "github.com/tikv/pd/server/api" "github.com/tikv/pd/server/config" "github.com/tikv/pd/tests" - "github.com/tikv/pd/tests/pdctl" "go.uber.org/goleak" ) @@ -64,6 +63,7 @@ func TestReconnect(t *testing.T) { // Make connections to followers. // Make sure they proxy requests to the leader. leader := cluster.WaitLeader() + re.NotEmpty(leader) for name, s := range cluster.GetServers() { if name != leader { res, err := http.Get(s.GetConfig().AdvertiseClientUrls + "/pd/api/v1/version") @@ -136,7 +136,7 @@ func (suite *middlewareTestSuite) TearDownSuite() { func (suite *middlewareTestSuite) TestRequestInfoMiddleware() { suite.NoError(failpoint.Enable("github.com/tikv/pd/server/api/addRequestInfoMiddleware", "return(true)")) - leader := suite.cluster.GetServer(suite.cluster.GetLeader()) + leader := suite.cluster.GetLeaderServer() suite.NotNil(leader) input := map[string]interface{}{ @@ -190,7 +190,7 @@ func BenchmarkDoRequestWithServiceMiddleware(b *testing.B) { cluster, _ := tests.NewTestCluster(ctx, 1) cluster.RunInitialServers() cluster.WaitLeader() - leader := cluster.GetServer(cluster.GetLeader()) + leader := cluster.GetLeaderServer() input := map[string]interface{}{ "enable-audit": "true", } @@ -207,7 +207,7 @@ func BenchmarkDoRequestWithServiceMiddleware(b *testing.B) { } func (suite *middlewareTestSuite) TestRateLimitMiddleware() { - leader := suite.cluster.GetServer(suite.cluster.GetLeader()) + leader := suite.cluster.GetLeaderServer() suite.NotNil(leader) input := map[string]interface{}{ "enable-rate-limit": "true", @@ -296,7 +296,7 @@ func (suite *middlewareTestSuite) TestRateLimitMiddleware() { servers = append(servers, s.GetServer()) } server.MustWaitLeader(suite.Require(), servers) - leader = suite.cluster.GetServer(suite.cluster.GetLeader()) + leader = suite.cluster.GetLeaderServer() suite.Equal(leader.GetServer().GetServiceMiddlewarePersistOptions().IsRateLimitEnabled(), true) cfg, ok := leader.GetServer().GetRateLimitConfig().LimiterConfig["SetLogLevel"] suite.Equal(ok, true) @@ -372,7 +372,7 @@ func (suite *middlewareTestSuite) TestRateLimitMiddleware() { } func (suite *middlewareTestSuite) TestSwaggerUrl() { - leader := suite.cluster.GetServer(suite.cluster.GetLeader()) + leader := suite.cluster.GetLeaderServer() suite.NotNil(leader) req, _ := http.NewRequest(http.MethodGet, leader.GetAddr()+"/swagger/ui/index", nil) resp, err := dialClient.Do(req) @@ -382,7 +382,7 @@ func (suite *middlewareTestSuite) TestSwaggerUrl() { } func (suite *middlewareTestSuite) TestAuditPrometheusBackend() { - leader := suite.cluster.GetServer(suite.cluster.GetLeader()) + leader := suite.cluster.GetLeaderServer() suite.NotNil(leader) input := map[string]interface{}{ "enable-audit": "true", @@ -418,7 +418,7 @@ func (suite *middlewareTestSuite) TestAuditPrometheusBackend() { servers = append(servers, s.GetServer()) } server.MustWaitLeader(suite.Require(), servers) - leader = suite.cluster.GetServer(suite.cluster.GetLeader()) + leader = suite.cluster.GetLeaderServer() timeUnix = time.Now().Unix() - 20 req, _ = http.NewRequest(http.MethodGet, fmt.Sprintf("%s/pd/api/v1/trend?from=%d", leader.GetAddr(), timeUnix), nil) @@ -451,7 +451,7 @@ func (suite *middlewareTestSuite) TestAuditPrometheusBackend() { func (suite *middlewareTestSuite) TestAuditLocalLogBackend() { fname := testutil.InitTempFileLogger("info") defer os.RemoveAll(fname) - leader := suite.cluster.GetServer(suite.cluster.GetLeader()) + leader := suite.cluster.GetLeaderServer() suite.NotNil(leader) input := map[string]interface{}{ "enable-audit": "true", @@ -481,7 +481,7 @@ func BenchmarkDoRequestWithLocalLogAudit(b *testing.B) { cluster, _ := tests.NewTestCluster(ctx, 1) cluster.RunInitialServers() cluster.WaitLeader() - leader := cluster.GetServer(cluster.GetLeader()) + leader := cluster.GetLeaderServer() input := map[string]interface{}{ "enable-audit": "true", } @@ -503,7 +503,7 @@ func BenchmarkDoRequestWithPrometheusAudit(b *testing.B) { cluster, _ := tests.NewTestCluster(ctx, 1) cluster.RunInitialServers() cluster.WaitLeader() - leader := cluster.GetServer(cluster.GetLeader()) + leader := cluster.GetLeaderServer() input := map[string]interface{}{ "enable-audit": "true", } @@ -525,7 +525,7 @@ func BenchmarkDoRequestWithoutServiceMiddleware(b *testing.B) { cluster, _ := tests.NewTestCluster(ctx, 1) cluster.RunInitialServers() cluster.WaitLeader() - leader := cluster.GetServer(cluster.GetLeader()) + leader := cluster.GetLeaderServer() input := map[string]interface{}{ "enable-audit": "false", } @@ -586,7 +586,7 @@ func (suite *redirectorTestSuite) TearDownSuite() { func (suite *redirectorTestSuite) TestRedirect() { re := suite.Require() - leader := suite.cluster.GetServer(suite.cluster.GetLeader()) + leader := suite.cluster.GetLeaderServer() suite.NotNil(leader) header := mustRequestSuccess(re, leader.GetServer()) header.Del("Date") @@ -602,7 +602,7 @@ func (suite *redirectorTestSuite) TestRedirect() { func (suite *redirectorTestSuite) TestAllowFollowerHandle() { // Find a follower. var follower *server.Server - leader := suite.cluster.GetServer(suite.cluster.GetLeader()) + leader := suite.cluster.GetLeaderServer() for _, svr := range suite.cluster.GetServers() { if svr != leader { follower = svr.GetServer() @@ -626,7 +626,7 @@ func (suite *redirectorTestSuite) TestAllowFollowerHandle() { func (suite *redirectorTestSuite) TestNotLeader() { // Find a follower. var follower *server.Server - leader := suite.cluster.GetServer(suite.cluster.GetLeader()) + leader := suite.cluster.GetLeaderServer() for _, svr := range suite.cluster.GetServers() { if svr != leader { follower = svr.GetServer() @@ -657,7 +657,7 @@ func (suite *redirectorTestSuite) TestNotLeader() { } func (suite *redirectorTestSuite) TestXForwardedFor() { - leader := suite.cluster.GetServer(suite.cluster.GetLeader()) + leader := suite.cluster.GetLeaderServer() suite.NoError(leader.BootstrapCluster()) fname := testutil.InitTempFileLogger("info") defer os.RemoveAll(fname) @@ -702,7 +702,7 @@ func TestRemovingProgress(t *testing.T) { re.NoError(err) cluster.WaitLeader() - leader := cluster.GetServer(cluster.GetLeader()) + leader := cluster.GetLeaderServer() grpcPDClient := testutil.MustNewGrpcClient(re, leader.GetAddr()) clusterID := leader.GetClusterID() req := &pdpb.BootstrapRequest{ @@ -735,12 +735,12 @@ func TestRemovingProgress(t *testing.T) { } for _, store := range stores { - pdctl.MustPutStore(re, leader.GetServer(), store) + tests.MustPutStore(re, cluster, store) } - pdctl.MustPutRegion(re, cluster, 1000, 1, []byte("a"), []byte("b"), core.SetApproximateSize(60)) - pdctl.MustPutRegion(re, cluster, 1001, 2, []byte("c"), []byte("d"), core.SetApproximateSize(30)) - pdctl.MustPutRegion(re, cluster, 1002, 1, []byte("e"), []byte("f"), core.SetApproximateSize(50)) - pdctl.MustPutRegion(re, cluster, 1003, 2, []byte("g"), []byte("h"), core.SetApproximateSize(40)) + tests.MustPutRegion(re, cluster, 1000, 1, []byte("a"), []byte("b"), core.SetApproximateSize(60)) + tests.MustPutRegion(re, cluster, 1001, 2, []byte("c"), []byte("d"), core.SetApproximateSize(30)) + tests.MustPutRegion(re, cluster, 1002, 1, []byte("e"), []byte("f"), core.SetApproximateSize(50)) + tests.MustPutRegion(re, cluster, 1003, 2, []byte("g"), []byte("h"), core.SetApproximateSize(40)) // no store removing output := sendRequest(re, leader.GetAddr()+"/pd/api/v1/stores/progress?action=removing", http.MethodGet, http.StatusNotFound) @@ -762,8 +762,8 @@ func TestRemovingProgress(t *testing.T) { re.Equal(math.MaxFloat64, p.LeftSeconds) // update size - pdctl.MustPutRegion(re, cluster, 1000, 1, []byte("a"), []byte("b"), core.SetApproximateSize(20)) - pdctl.MustPutRegion(re, cluster, 1001, 2, []byte("c"), []byte("d"), core.SetApproximateSize(10)) + tests.MustPutRegion(re, cluster, 1000, 1, []byte("a"), []byte("b"), core.SetApproximateSize(20)) + tests.MustPutRegion(re, cluster, 1001, 2, []byte("c"), []byte("d"), core.SetApproximateSize(10)) // is not prepared time.Sleep(2 * time.Second) @@ -817,7 +817,8 @@ func TestSendApiWhenRestartRaftCluster(t *testing.T) { err = cluster.RunInitialServers() re.NoError(err) - leader := cluster.GetServer(cluster.WaitLeader()) + re.NotEmpty(cluster.WaitLeader()) + leader := cluster.GetLeaderServer() grpcPDClient := testutil.MustNewGrpcClient(re, leader.GetAddr()) clusterID := leader.GetClusterID() @@ -860,7 +861,7 @@ func TestPreparingProgress(t *testing.T) { re.NoError(err) cluster.WaitLeader() - leader := cluster.GetServer(cluster.GetLeader()) + leader := cluster.GetLeaderServer() grpcPDClient := testutil.MustNewGrpcClient(re, leader.GetAddr()) clusterID := leader.GetClusterID() req := &pdpb.BootstrapRequest{ @@ -910,10 +911,10 @@ func TestPreparingProgress(t *testing.T) { } for _, store := range stores { - pdctl.MustPutStore(re, leader.GetServer(), store) + tests.MustPutStore(re, cluster, store) } for i := 0; i < 100; i++ { - pdctl.MustPutRegion(re, cluster, uint64(i+1), uint64(i)%3+1, []byte(fmt.Sprintf("p%d", i)), []byte(fmt.Sprintf("%d", i+1)), core.SetApproximateSize(10)) + tests.MustPutRegion(re, cluster, uint64(i+1), uint64(i)%3+1, []byte(fmt.Sprintf("p%d", i)), []byte(fmt.Sprintf("%d", i+1)), core.SetApproximateSize(10)) } // no store preparing output := sendRequest(re, leader.GetAddr()+"/pd/api/v1/stores/progress?action=preparing", http.MethodGet, http.StatusNotFound) @@ -940,8 +941,8 @@ func TestPreparingProgress(t *testing.T) { re.Equal(math.MaxFloat64, p.LeftSeconds) // update size - pdctl.MustPutRegion(re, cluster, 1000, 4, []byte(fmt.Sprintf("%d", 1000)), []byte(fmt.Sprintf("%d", 1001)), core.SetApproximateSize(10)) - pdctl.MustPutRegion(re, cluster, 1001, 5, []byte(fmt.Sprintf("%d", 1001)), []byte(fmt.Sprintf("%d", 1002)), core.SetApproximateSize(40)) + tests.MustPutRegion(re, cluster, 1000, 4, []byte(fmt.Sprintf("%d", 1000)), []byte(fmt.Sprintf("%d", 1001)), core.SetApproximateSize(10)) + tests.MustPutRegion(re, cluster, 1001, 5, []byte(fmt.Sprintf("%d", 1001)), []byte(fmt.Sprintf("%d", 1002)), core.SetApproximateSize(40)) time.Sleep(2 * time.Second) output = sendRequest(re, leader.GetAddr()+"/pd/api/v1/stores/progress?action=preparing", http.MethodGet, http.StatusOK) re.NoError(json.Unmarshal(output, &p)) diff --git a/tests/server/apiv2/handlers/keyspace_test.go b/tests/server/apiv2/handlers/keyspace_test.go index 7fd8de013f7..f7b43ab194d 100644 --- a/tests/server/apiv2/handlers/keyspace_test.go +++ b/tests/server/apiv2/handlers/keyspace_test.go @@ -53,7 +53,7 @@ func (suite *keyspaceTestSuite) SetupTest() { suite.NoError(err) suite.NoError(cluster.RunInitialServers()) suite.NotEmpty(cluster.WaitLeader()) - suite.server = cluster.GetServer(cluster.GetLeader()) + suite.server = cluster.GetLeaderServer() suite.NoError(suite.server.BootstrapCluster()) suite.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/skipSplitRegion", "return(true)")) } diff --git a/tests/server/apiv2/handlers/tso_keyspace_group_test.go b/tests/server/apiv2/handlers/tso_keyspace_group_test.go index 1f0189c532f..214de6e95ef 100644 --- a/tests/server/apiv2/handlers/tso_keyspace_group_test.go +++ b/tests/server/apiv2/handlers/tso_keyspace_group_test.go @@ -45,7 +45,7 @@ func (suite *keyspaceGroupTestSuite) SetupTest() { suite.NoError(err) suite.NoError(cluster.RunInitialServers()) suite.NotEmpty(cluster.WaitLeader()) - suite.server = cluster.GetServer(cluster.GetLeader()) + suite.server = cluster.GetLeaderServer() suite.NoError(suite.server.BootstrapCluster()) } diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index f22a754b8bf..e1b04c4ebc1 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -71,7 +71,7 @@ func TestBootstrap(t *testing.T) { re.NoError(err) tc.WaitLeader() - leaderServer := tc.GetServer(tc.GetLeader()) + leaderServer := tc.GetLeaderServer() grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr()) clusterID := leaderServer.GetClusterID() @@ -111,7 +111,7 @@ func TestDamagedRegion(t *testing.T) { re.NoError(err) tc.WaitLeader() - leaderServer := tc.GetServer(tc.GetLeader()) + leaderServer := tc.GetLeaderServer() grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr()) clusterID := leaderServer.GetClusterID() bootstrapCluster(re, clusterID, grpcPDClient) @@ -191,7 +191,7 @@ func TestStaleRegion(t *testing.T) { re.NoError(err) tc.WaitLeader() - leaderServer := tc.GetServer(tc.GetLeader()) + leaderServer := tc.GetLeaderServer() grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr()) clusterID := leaderServer.GetClusterID() bootstrapCluster(re, clusterID, grpcPDClient) @@ -236,7 +236,7 @@ func TestGetPutConfig(t *testing.T) { re.NoError(err) tc.WaitLeader() - leaderServer := tc.GetServer(tc.GetLeader()) + leaderServer := tc.GetLeaderServer() grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr()) clusterID := leaderServer.GetClusterID() bootstrapCluster(re, clusterID, grpcPDClient) @@ -465,7 +465,7 @@ func TestRaftClusterRestart(t *testing.T) { re.NoError(err) tc.WaitLeader() - leaderServer := tc.GetServer(tc.GetLeader()) + leaderServer := tc.GetLeaderServer() grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr()) clusterID := leaderServer.GetClusterID() bootstrapCluster(re, clusterID, grpcPDClient) @@ -495,7 +495,7 @@ func TestRaftClusterMultipleRestart(t *testing.T) { re.NoError(err) tc.WaitLeader() - leaderServer := tc.GetServer(tc.GetLeader()) + leaderServer := tc.GetLeaderServer() grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr()) clusterID := leaderServer.GetClusterID() bootstrapCluster(re, clusterID, grpcPDClient) @@ -538,7 +538,7 @@ func TestGetPDMembers(t *testing.T) { re.NoError(err) tc.WaitLeader() - leaderServer := tc.GetServer(tc.GetLeader()) + leaderServer := tc.GetLeaderServer() grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr()) clusterID := leaderServer.GetClusterID() req := &pdpb.GetMembersRequest{Header: testutil.NewRequestHeader(clusterID)} @@ -582,7 +582,7 @@ func TestStoreVersionChange(t *testing.T) { re.NoError(err) tc.WaitLeader() - leaderServer := tc.GetServer(tc.GetLeader()) + leaderServer := tc.GetLeaderServer() grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr()) clusterID := leaderServer.GetClusterID() bootstrapCluster(re, clusterID, grpcPDClient) @@ -620,7 +620,7 @@ func TestConcurrentHandleRegion(t *testing.T) { err = tc.RunInitialServers() re.NoError(err) tc.WaitLeader() - leaderServer := tc.GetServer(tc.GetLeader()) + leaderServer := tc.GetLeaderServer() grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr()) clusterID := leaderServer.GetClusterID() bootstrapCluster(re, clusterID, grpcPDClient) @@ -737,7 +737,7 @@ func TestSetScheduleOpt(t *testing.T) { re.NoError(err) tc.WaitLeader() - leaderServer := tc.GetServer(tc.GetLeader()) + leaderServer := tc.GetLeaderServer() grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr()) clusterID := leaderServer.GetClusterID() bootstrapCluster(re, clusterID, grpcPDClient) @@ -808,7 +808,7 @@ func TestLoadClusterInfo(t *testing.T) { re.NoError(err) tc.WaitLeader() - leaderServer := tc.GetServer(tc.GetLeader()) + leaderServer := tc.GetLeaderServer() svr := leaderServer.GetServer() rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient()) @@ -896,7 +896,7 @@ func TestTiFlashWithPlacementRules(t *testing.T) { err = tc.RunInitialServers() re.NoError(err) tc.WaitLeader() - leaderServer := tc.GetServer(tc.GetLeader()) + leaderServer := tc.GetLeaderServer() grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr()) clusterID := leaderServer.GetClusterID() bootstrapCluster(re, clusterID, grpcPDClient) @@ -949,7 +949,7 @@ func TestReplicationModeStatus(t *testing.T) { err = tc.RunInitialServers() re.NoError(err) tc.WaitLeader() - leaderServer := tc.GetServer(tc.GetLeader()) + leaderServer := tc.GetLeaderServer() grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr()) clusterID := leaderServer.GetClusterID() req := newBootstrapRequest(clusterID) @@ -1049,7 +1049,7 @@ func TestOfflineStoreLimit(t *testing.T) { err = tc.RunInitialServers() re.NoError(err) tc.WaitLeader() - leaderServer := tc.GetServer(tc.GetLeader()) + leaderServer := tc.GetLeaderServer() grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr()) clusterID := leaderServer.GetClusterID() bootstrapCluster(re, clusterID, grpcPDClient) @@ -1141,7 +1141,7 @@ func TestUpgradeStoreLimit(t *testing.T) { err = tc.RunInitialServers() re.NoError(err) tc.WaitLeader() - leaderServer := tc.GetServer(tc.GetLeader()) + leaderServer := tc.GetLeaderServer() grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr()) clusterID := leaderServer.GetClusterID() bootstrapCluster(re, clusterID, grpcPDClient) @@ -1199,7 +1199,7 @@ func TestStaleTermHeartbeat(t *testing.T) { err = tc.RunInitialServers() re.NoError(err) tc.WaitLeader() - leaderServer := tc.GetServer(tc.GetLeader()) + leaderServer := tc.GetLeaderServer() grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr()) clusterID := leaderServer.GetClusterID() bootstrapCluster(re, clusterID, grpcPDClient) @@ -1334,7 +1334,7 @@ func TestMinResolvedTS(t *testing.T) { err = tc.RunInitialServers() re.NoError(err) tc.WaitLeader() - leaderServer := tc.GetServer(tc.GetLeader()) + leaderServer := tc.GetLeaderServer() id := leaderServer.GetAllocator() grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr()) clusterID := leaderServer.GetClusterID() @@ -1443,7 +1443,7 @@ func TestTransferLeaderBack(t *testing.T) { err = tc.RunInitialServers() re.NoError(err) tc.WaitLeader() - leaderServer := tc.GetServer(tc.GetLeader()) + leaderServer := tc.GetLeaderServer() svr := leaderServer.GetServer() rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient()) rc.InitCluster(svr.GetAllocator(), svr.GetPersistOptions(), svr.GetStorage(), svr.GetBasicCluster(), svr.GetKeyspaceGroupManager()) @@ -1470,7 +1470,7 @@ func TestTransferLeaderBack(t *testing.T) { // transfer PD leader to another PD tc.ResignLeader() tc.WaitLeader() - leaderServer = tc.GetServer(tc.GetLeader()) + leaderServer = tc.GetLeaderServer() svr1 := leaderServer.GetServer() rc1 := svr1.GetRaftCluster() re.NoError(err) @@ -1483,7 +1483,7 @@ func TestTransferLeaderBack(t *testing.T) { // transfer PD leader back to the previous PD tc.ResignLeader() tc.WaitLeader() - leaderServer = tc.GetServer(tc.GetLeader()) + leaderServer = tc.GetLeaderServer() svr = leaderServer.GetServer() rc = svr.GetRaftCluster() re.NotNil(rc) @@ -1503,7 +1503,7 @@ func TestExternalTimestamp(t *testing.T) { err = tc.RunInitialServers() re.NoError(err) tc.WaitLeader() - leaderServer := tc.GetServer(tc.GetLeader()) + leaderServer := tc.GetLeaderServer() grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr()) clusterID := leaderServer.GetClusterID() bootstrapCluster(re, clusterID, grpcPDClient) diff --git a/tests/server/cluster/cluster_work_test.go b/tests/server/cluster/cluster_work_test.go index f0f24ca6777..ef09e522305 100644 --- a/tests/server/cluster/cluster_work_test.go +++ b/tests/server/cluster/cluster_work_test.go @@ -42,7 +42,7 @@ func TestValidRequestRegion(t *testing.T) { re.NoError(err) cluster.WaitLeader() - leaderServer := cluster.GetServer(cluster.GetLeader()) + leaderServer := cluster.GetLeaderServer() grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr()) clusterID := leaderServer.GetClusterID() bootstrapCluster(re, clusterID, grpcPDClient) @@ -86,7 +86,7 @@ func TestAskSplit(t *testing.T) { re.NoError(err) cluster.WaitLeader() - leaderServer := cluster.GetServer(cluster.GetLeader()) + leaderServer := cluster.GetLeaderServer() grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr()) clusterID := leaderServer.GetClusterID() bootstrapCluster(re, clusterID, grpcPDClient) @@ -143,7 +143,7 @@ func TestSuspectRegions(t *testing.T) { re.NoError(err) cluster.WaitLeader() - leaderServer := cluster.GetServer(cluster.GetLeader()) + leaderServer := cluster.GetLeaderServer() grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr()) clusterID := leaderServer.GetClusterID() bootstrapCluster(re, clusterID, grpcPDClient) diff --git a/tests/server/config/config_test.go b/tests/server/config/config_test.go index b9a746b8bed..1b2178bde33 100644 --- a/tests/server/config/config_test.go +++ b/tests/server/config/config_test.go @@ -43,7 +43,7 @@ func TestRateLimitConfigReload(t *testing.T) { defer cluster.Destroy() re.NoError(cluster.RunInitialServers()) re.NotEmpty(cluster.WaitLeader()) - leader := cluster.GetServer(cluster.GetLeader()) + leader := cluster.GetLeaderServer() re.NotNil(leader) re.Empty(leader.GetServer().GetServiceMiddlewareConfig().RateLimitConfig.LimiterConfig) limitCfg := make(map[string]ratelimit.DimensionConfig) @@ -69,7 +69,7 @@ func TestRateLimitConfigReload(t *testing.T) { servers = append(servers, s.GetServer()) } server.MustWaitLeader(re, servers) - leader = cluster.GetServer(cluster.GetLeader()) + leader = cluster.GetLeaderServer() re.NotNil(leader) re.True(leader.GetServer().GetServiceMiddlewarePersistOptions().IsRateLimitEnabled()) re.Len(leader.GetServer().GetServiceMiddlewarePersistOptions().GetRateLimitConfig().LimiterConfig, 1) diff --git a/tests/server/id/id_test.go b/tests/server/id/id_test.go index c4e1c8bb5de..737aa4deac2 100644 --- a/tests/server/id/id_test.go +++ b/tests/server/id/id_test.go @@ -44,7 +44,7 @@ func TestID(t *testing.T) { re.NoError(err) cluster.WaitLeader() - leaderServer := cluster.GetServer(cluster.GetLeader()) + leaderServer := cluster.GetLeaderServer() var last uint64 for i := uint64(0); i < allocStep; i++ { id, err := leaderServer.GetAllocator().Alloc() @@ -90,7 +90,7 @@ func TestCommand(t *testing.T) { re.NoError(err) cluster.WaitLeader() - leaderServer := cluster.GetServer(cluster.GetLeader()) + leaderServer := cluster.GetLeaderServer() req := &pdpb.AllocIDRequest{Header: testutil.NewRequestHeader(leaderServer.GetClusterID())} grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr()) @@ -116,7 +116,7 @@ func TestMonotonicID(t *testing.T) { re.NoError(err) cluster.WaitLeader() - leaderServer := cluster.GetServer(cluster.GetLeader()) + leaderServer := cluster.GetLeaderServer() var last1 uint64 for i := uint64(0); i < 10; i++ { id, err := leaderServer.GetAllocator().Alloc() @@ -127,7 +127,7 @@ func TestMonotonicID(t *testing.T) { err = cluster.ResignLeader() re.NoError(err) cluster.WaitLeader() - leaderServer = cluster.GetServer(cluster.GetLeader()) + leaderServer = cluster.GetLeaderServer() var last2 uint64 for i := uint64(0); i < 10; i++ { id, err := leaderServer.GetAllocator().Alloc() @@ -138,7 +138,7 @@ func TestMonotonicID(t *testing.T) { err = cluster.ResignLeader() re.NoError(err) cluster.WaitLeader() - leaderServer = cluster.GetServer(cluster.GetLeader()) + leaderServer = cluster.GetLeaderServer() id, err := leaderServer.GetAllocator().Alloc() re.NoError(err) re.Greater(id, last2) @@ -162,7 +162,7 @@ func TestPDRestart(t *testing.T) { err = cluster.RunInitialServers() re.NoError(err) cluster.WaitLeader() - leaderServer := cluster.GetServer(cluster.GetLeader()) + leaderServer := cluster.GetLeaderServer() var last uint64 for i := uint64(0); i < 10; i++ { diff --git a/tests/server/keyspace/keyspace_test.go b/tests/server/keyspace/keyspace_test.go index a36a7379550..86b8f6fd37c 100644 --- a/tests/server/keyspace/keyspace_test.go +++ b/tests/server/keyspace/keyspace_test.go @@ -59,7 +59,7 @@ func (suite *keyspaceTestSuite) SetupTest() { suite.NoError(err) suite.NoError(cluster.RunInitialServers()) suite.NotEmpty(cluster.WaitLeader()) - suite.server = cluster.GetServer(cluster.GetLeader()) + suite.server = cluster.GetLeaderServer() suite.manager = suite.server.GetKeyspaceManager() suite.NoError(suite.server.BootstrapCluster()) } diff --git a/tests/server/member/member_test.go b/tests/server/member/member_test.go index ca89e66a041..26d4fa2a904 100644 --- a/tests/server/member/member_test.go +++ b/tests/server/member/member_test.go @@ -63,7 +63,7 @@ func TestMemberDelete(t *testing.T) { re.NoError(err) leaderName := cluster.WaitLeader() re.NotEmpty(leaderName) - leader := cluster.GetServer(leaderName) + leader := cluster.GetLeaderServer() var members []*tests.TestServer for _, s := range cluster.GetConfig().InitialServers { if s.Name != leaderName { diff --git a/tests/server/region_syncer/region_syncer_test.go b/tests/server/region_syncer/region_syncer_test.go index afa5c87cdcc..f672f82f1f6 100644 --- a/tests/server/region_syncer/region_syncer_test.go +++ b/tests/server/region_syncer/region_syncer_test.go @@ -57,7 +57,7 @@ func TestRegionSyncer(t *testing.T) { re.NoError(cluster.RunInitialServers()) cluster.WaitLeader() - leaderServer := cluster.GetServer(cluster.GetLeader()) + leaderServer := cluster.GetLeaderServer() re.NoError(leaderServer.BootstrapCluster()) rc := leaderServer.GetServer().GetRaftCluster() re.NotNil(rc) @@ -140,7 +140,7 @@ func TestRegionSyncer(t *testing.T) { err = leaderServer.Stop() re.NoError(err) cluster.WaitLeader() - leaderServer = cluster.GetServer(cluster.GetLeader()) + leaderServer = cluster.GetLeaderServer() re.NotNil(leaderServer) loadRegions := leaderServer.GetServer().GetRaftCluster().GetRegions() re.Len(loadRegions, regionLen) @@ -166,7 +166,7 @@ func TestFullSyncWithAddMember(t *testing.T) { err = cluster.RunInitialServers() re.NoError(err) cluster.WaitLeader() - leaderServer := cluster.GetServer(cluster.GetLeader()) + leaderServer := cluster.GetLeaderServer() re.NoError(leaderServer.BootstrapCluster()) rc := leaderServer.GetServer().GetRaftCluster() re.NotNil(rc) @@ -210,7 +210,7 @@ func TestPrepareChecker(t *testing.T) { err = cluster.RunInitialServers() re.NoError(err) cluster.WaitLeader() - leaderServer := cluster.GetServer(cluster.GetLeader()) + leaderServer := cluster.GetLeaderServer() re.NoError(leaderServer.BootstrapCluster()) rc := leaderServer.GetServer().GetRaftCluster() re.NotNil(rc) @@ -235,7 +235,7 @@ func TestPrepareChecker(t *testing.T) { err = cluster.ResignLeader() re.NoError(err) re.Equal("pd2", cluster.WaitLeader()) - leaderServer = cluster.GetServer(cluster.GetLeader()) + leaderServer = cluster.GetLeaderServer() rc = leaderServer.GetServer().GetRaftCluster() for _, region := range regions { err = rc.HandleRegionHeartbeat(region) diff --git a/tests/server/storage/hot_region_storage_test.go b/tests/server/storage/hot_region_storage_test.go index 21881802d7d..00d0244a790 100644 --- a/tests/server/storage/hot_region_storage_test.go +++ b/tests/server/storage/hot_region_storage_test.go @@ -29,7 +29,6 @@ import ( "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/server/config" "github.com/tikv/pd/tests" - "github.com/tikv/pd/tests/pdctl" ) func TestHotRegionStorage(t *testing.T) { @@ -61,20 +60,20 @@ func TestHotRegionStorage(t *testing.T) { }, } - leaderServer := cluster.GetServer(cluster.GetLeader()) + leaderServer := cluster.GetLeaderServer() re.NoError(leaderServer.BootstrapCluster()) for _, store := range stores { - pdctl.MustPutStore(re, leaderServer.GetServer(), store) + tests.MustPutStore(re, cluster, store) } defer cluster.Destroy() startTime := time.Now().Unix() - pdctl.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"), core.SetWrittenBytes(3000000000), + tests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"), core.SetWrittenBytes(3000000000), core.SetReportInterval(uint64(startTime-utils.RegionHeartBeatReportInterval), uint64(startTime))) - pdctl.MustPutRegion(re, cluster, 2, 2, []byte("c"), []byte("d"), core.SetWrittenBytes(6000000000), + tests.MustPutRegion(re, cluster, 2, 2, []byte("c"), []byte("d"), core.SetWrittenBytes(6000000000), core.SetReportInterval(uint64(startTime-utils.RegionHeartBeatReportInterval), uint64(startTime))) - pdctl.MustPutRegion(re, cluster, 3, 1, []byte("e"), []byte("f"), + tests.MustPutRegion(re, cluster, 3, 1, []byte("e"), []byte("f"), core.SetReportInterval(uint64(startTime-utils.RegionHeartBeatReportInterval), uint64(startTime))) - pdctl.MustPutRegion(re, cluster, 4, 2, []byte("g"), []byte("h"), + tests.MustPutRegion(re, cluster, 4, 2, []byte("g"), []byte("h"), core.SetReportInterval(uint64(startTime-utils.RegionHeartBeatReportInterval), uint64(startTime))) storeStats := []*pdpb.StoreStats{ { @@ -169,14 +168,14 @@ func TestHotRegionStorageReservedDayConfigChange(t *testing.T) { }, } - leaderServer := cluster.GetServer(cluster.GetLeader()) + leaderServer := cluster.GetLeaderServer() re.NoError(leaderServer.BootstrapCluster()) for _, store := range stores { - pdctl.MustPutStore(re, leaderServer.GetServer(), store) + tests.MustPutStore(re, cluster, store) } defer cluster.Destroy() startTime := time.Now().Unix() - pdctl.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"), core.SetWrittenBytes(3000000000), + tests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"), core.SetWrittenBytes(3000000000), core.SetReportInterval(uint64(startTime-utils.RegionHeartBeatReportInterval), uint64(startTime))) var iter storage.HotRegionStorageIterator var next *storage.HistoryHotRegion @@ -197,7 +196,7 @@ func TestHotRegionStorageReservedDayConfigChange(t *testing.T) { schedule.HotRegionsReservedDays = 0 leaderServer.GetServer().SetScheduleConfig(schedule) time.Sleep(3 * interval) - pdctl.MustPutRegion(re, cluster, 2, 2, []byte("c"), []byte("d"), core.SetWrittenBytes(6000000000), + tests.MustPutRegion(re, cluster, 2, 2, []byte("c"), []byte("d"), core.SetWrittenBytes(6000000000), core.SetReportInterval(uint64(time.Now().Unix()-utils.RegionHeartBeatReportInterval), uint64(time.Now().Unix()))) time.Sleep(10 * interval) hotRegionStorage := leaderServer.GetServer().GetHistoryHotRegionStorage() @@ -261,14 +260,14 @@ func TestHotRegionStorageWriteIntervalConfigChange(t *testing.T) { }, } - leaderServer := cluster.GetServer(cluster.GetLeader()) + leaderServer := cluster.GetLeaderServer() re.NoError(leaderServer.BootstrapCluster()) for _, store := range stores { - pdctl.MustPutStore(re, leaderServer.GetServer(), store) + tests.MustPutStore(re, cluster, store) } defer cluster.Destroy() startTime := time.Now().Unix() - pdctl.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"), + tests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"), core.SetWrittenBytes(3000000000), core.SetReportInterval(uint64(startTime-utils.RegionHeartBeatReportInterval), uint64(startTime))) var iter storage.HotRegionStorageIterator @@ -290,7 +289,7 @@ func TestHotRegionStorageWriteIntervalConfigChange(t *testing.T) { schedule.HotRegionsWriteInterval.Duration = 20 * interval leaderServer.GetServer().SetScheduleConfig(schedule) time.Sleep(3 * interval) - pdctl.MustPutRegion(re, cluster, 2, 2, []byte("c"), []byte("d"), core.SetWrittenBytes(6000000000), + tests.MustPutRegion(re, cluster, 2, 2, []byte("c"), []byte("d"), core.SetWrittenBytes(6000000000), core.SetReportInterval(uint64(time.Now().Unix()-utils.RegionHeartBeatReportInterval), uint64(time.Now().Unix()))) time.Sleep(10 * interval) // it cant get new hot region because wait time smaller than hot region write interval diff --git a/tests/server/tso/consistency_test.go b/tests/server/tso/consistency_test.go index db6e2135d2b..9cfadbf5ba3 100644 --- a/tests/server/tso/consistency_test.go +++ b/tests/server/tso/consistency_test.go @@ -79,7 +79,7 @@ func (suite *tsoConsistencyTestSuite) TestSynchronizedGlobalTSO() { re := suite.Require() cluster.WaitAllLeaders(re, dcLocationConfig) - suite.leaderServer = cluster.GetServer(cluster.GetLeader()) + suite.leaderServer = cluster.GetLeaderServer() suite.NotNil(suite.leaderServer) suite.dcClientMap[tso.GlobalDCLocation] = testutil.MustNewGrpcClient(re, suite.leaderServer.GetAddr()) for _, dcLocation := range dcLocationConfig { @@ -154,7 +154,7 @@ func (suite *tsoConsistencyTestSuite) TestSynchronizedGlobalTSOOverflow() { re := suite.Require() cluster.WaitAllLeaders(re, dcLocationConfig) - suite.leaderServer = cluster.GetServer(cluster.GetLeader()) + suite.leaderServer = cluster.GetLeaderServer() suite.NotNil(suite.leaderServer) suite.dcClientMap[tso.GlobalDCLocation] = testutil.MustNewGrpcClient(re, suite.leaderServer.GetAddr()) for _, dcLocation := range dcLocationConfig { @@ -186,7 +186,7 @@ func (suite *tsoConsistencyTestSuite) TestLocalAllocatorLeaderChange() { re := suite.Require() cluster.WaitAllLeaders(re, dcLocationConfig) - suite.leaderServer = cluster.GetServer(cluster.GetLeader()) + suite.leaderServer = cluster.GetLeaderServer() suite.NotNil(suite.leaderServer) suite.dcClientMap[tso.GlobalDCLocation] = testutil.MustNewGrpcClient(re, suite.leaderServer.GetAddr()) for _, dcLocation := range dcLocationConfig { @@ -248,7 +248,7 @@ func (suite *tsoConsistencyTestSuite) TestLocalTSOAfterMemberChanged() { re := suite.Require() cluster.WaitAllLeaders(re, dcLocationConfig) - leaderServer := cluster.GetServer(cluster.GetLeader()) + leaderServer := cluster.GetLeaderServer() leaderCli := testutil.MustNewGrpcClient(re, leaderServer.GetAddr()) req := &pdpb.TsoRequest{ Header: testutil.NewRequestHeader(cluster.GetCluster().GetId()), @@ -286,7 +286,7 @@ func (suite *tsoConsistencyTestSuite) TestLocalTSOAfterMemberChanged() { func (suite *tsoConsistencyTestSuite) testTSO(cluster *tests.TestCluster, dcLocationConfig map[string]string, previousTS *pdpb.Timestamp) { re := suite.Require() - leaderServer := cluster.GetServer(cluster.GetLeader()) + leaderServer := cluster.GetLeaderServer() dcClientMap := make(map[string]pdpb.PDClient) for _, dcLocation := range dcLocationConfig { pdName := leaderServer.GetAllocatorLeader(dcLocation).GetName() diff --git a/tests/server/tso/global_tso_test.go b/tests/server/tso/global_tso_test.go index a6340e2671c..5ae2e6e0f67 100644 --- a/tests/server/tso/global_tso_test.go +++ b/tests/server/tso/global_tso_test.go @@ -97,7 +97,7 @@ func TestDelaySyncTimestamp(t *testing.T) { cluster.WaitLeader() var leaderServer, nextLeaderServer *tests.TestServer - leaderServer = cluster.GetServer(cluster.GetLeader()) + leaderServer = cluster.GetLeaderServer() re.NotNil(leaderServer) for _, s := range cluster.GetServers() { if s.GetConfig().Name != cluster.GetLeader() { @@ -145,7 +145,7 @@ func TestLogicalOverflow(t *testing.T) { re.NoError(cluster.RunInitialServers()) cluster.WaitLeader() - leaderServer := cluster.GetServer(cluster.GetLeader()) + leaderServer := cluster.GetLeaderServer() grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr()) clusterID := leaderServer.GetClusterID() diff --git a/tests/server/tso/tso_test.go b/tests/server/tso/tso_test.go index 48df02a6c27..9eff1192e57 100644 --- a/tests/server/tso/tso_test.go +++ b/tests/server/tso/tso_test.go @@ -76,7 +76,7 @@ func TestLoadTimestamp(t *testing.T) { func requestLocalTSOs(re *require.Assertions, cluster *tests.TestCluster, dcLocationConfig map[string]string) map[string]*pdpb.Timestamp { dcClientMap := make(map[string]pdpb.PDClient) tsMap := make(map[string]*pdpb.Timestamp) - leaderServer := cluster.GetServer(cluster.GetLeader()) + leaderServer := cluster.GetLeaderServer() for _, dcLocation := range dcLocationConfig { pdName := leaderServer.GetAllocatorLeader(dcLocation).GetName() dcClientMap[dcLocation] = testutil.MustNewGrpcClient(re, cluster.GetServer(pdName).GetAddr()) @@ -125,7 +125,7 @@ func TestDisableLocalTSOAfterEnabling(t *testing.T) { cluster.WaitLeader() // Re-request the global TSOs. - leaderServer := cluster.GetServer(cluster.GetLeader()) + leaderServer := cluster.GetLeaderServer() grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr()) clusterID := leaderServer.GetClusterID() req := &pdpb.TsoRequest{ diff --git a/tests/server/watch/leader_watch_test.go b/tests/server/watch/leader_watch_test.go index 049486ba068..f7765297023 100644 --- a/tests/server/watch/leader_watch_test.go +++ b/tests/server/watch/leader_watch_test.go @@ -42,7 +42,7 @@ func TestWatcher(t *testing.T) { err = cluster.RunInitialServers() re.NoError(err) cluster.WaitLeader() - pd1 := cluster.GetServer(cluster.GetLeader()) + pd1 := cluster.GetLeaderServer() re.NotNil(pd1) pd2, err := cluster.Join(ctx) @@ -80,7 +80,7 @@ func TestWatcherCompacted(t *testing.T) { err = cluster.RunInitialServers() re.NoError(err) cluster.WaitLeader() - pd1 := cluster.GetServer(cluster.GetLeader()) + pd1 := cluster.GetLeaderServer() re.NotNil(pd1) client := pd1.GetEtcdClient() _, err = client.Put(context.Background(), "test", "v") diff --git a/tests/testutil.go b/tests/testutil.go index 53efcff7658..3fd8e9dca35 100644 --- a/tests/testutil.go +++ b/tests/testutil.go @@ -16,19 +16,26 @@ package tests import ( "context" + "fmt" "os" "sync" "time" + "github.com/docker/go-units" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" "github.com/stretchr/testify/require" bs "github.com/tikv/pd/pkg/basicserver" + "github.com/tikv/pd/pkg/core" rm "github.com/tikv/pd/pkg/mcs/resourcemanager/server" scheduling "github.com/tikv/pd/pkg/mcs/scheduling/server" sc "github.com/tikv/pd/pkg/mcs/scheduling/server/config" tso "github.com/tikv/pd/pkg/mcs/tso/server" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/testutil" + "github.com/tikv/pd/pkg/versioninfo" + "github.com/tikv/pd/server" "go.uber.org/zap" ) @@ -148,3 +155,68 @@ func WaitForPrimaryServing(re *require.Assertions, serverMap map[string]bs.Serve return primary } + +// MustPutStore is used for test purpose. +func MustPutStore(re *require.Assertions, cluster *TestCluster, store *metapb.Store) { + store.Address = fmt.Sprintf("tikv%d", store.GetId()) + if len(store.Version) == 0 { + store.Version = versioninfo.MinSupportedVersion(versioninfo.Version2_0).String() + } + svr := cluster.GetLeaderServer().GetServer() + grpcServer := &server.GrpcServer{Server: svr} + _, err := grpcServer.PutStore(context.Background(), &pdpb.PutStoreRequest{ + Header: &pdpb.RequestHeader{ClusterId: svr.ClusterID()}, + Store: store, + }) + re.NoError(err) + + storeInfo := grpcServer.GetRaftCluster().GetStore(store.GetId()) + newStore := storeInfo.Clone(core.SetStoreStats(&pdpb.StoreStats{ + Capacity: uint64(10 * units.GiB), + UsedSize: uint64(9 * units.GiB), + Available: uint64(1 * units.GiB), + })) + grpcServer.GetRaftCluster().GetBasicCluster().PutStore(newStore) + if cluster.GetSchedulingPrimaryServer() != nil { + cluster.GetSchedulingPrimaryServer().GetCluster().PutStore(newStore) + } +} + +// MustPutRegion is used for test purpose. +func MustPutRegion(re *require.Assertions, cluster *TestCluster, regionID, storeID uint64, start, end []byte, opts ...core.RegionCreateOption) *core.RegionInfo { + leader := &metapb.Peer{ + Id: regionID, + StoreId: storeID, + } + metaRegion := &metapb.Region{ + Id: regionID, + StartKey: start, + EndKey: end, + Peers: []*metapb.Peer{leader}, + RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 1}, + } + r := core.NewRegionInfo(metaRegion, leader, opts...) + err := cluster.HandleRegionHeartbeat(r) + re.NoError(err) + if cluster.GetSchedulingPrimaryServer() != nil { + err = cluster.GetSchedulingPrimaryServer().GetCluster().HandleRegionHeartbeat(r) + re.NoError(err) + } + return r +} + +// MustReportBuckets is used for test purpose. +func MustReportBuckets(re *require.Assertions, cluster *TestCluster, regionID uint64, start, end []byte, stats *metapb.BucketStats) *metapb.Buckets { + buckets := &metapb.Buckets{ + RegionId: regionID, + Version: 1, + Keys: [][]byte{start, end}, + Stats: stats, + // report buckets interval is 10s + PeriodInMs: 10000, + } + err := cluster.HandleReportBuckets(buckets) + re.NoError(err) + // TODO: forwards to scheduling server after it supports buckets + return buckets +}