Skip to content

Commit

Permalink
test: make TestOperatorTestSuite more stable
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed Oct 26, 2023
1 parent 89128f1 commit 084c739
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 18 deletions.
6 changes: 0 additions & 6 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,12 +504,6 @@ func (s *Server) stopWatcher() {
s.metaWatcher.Close()
}

// GetPersistConfig returns the server's persist config.
// It is intended for use in tests.
func (s *Server) GetPersistConfig() *config.PersistConfig {
return s.persistConfig
}

// CreateServer creates the Server
func CreateServer(ctx context.Context, cfg *config.Config) *Server {
svr := &Server{
Expand Down
8 changes: 8 additions & 0 deletions tests/pdctl/operator/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/stretchr/testify/suite"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/tests"
"github.com/tikv/pd/tests/pdctl"
Expand Down Expand Up @@ -221,6 +222,13 @@ func (suite *operatorTestSuite) checkOperator(cluster *tests.TestCluster) {

_, err = pdctl.ExecuteCommand(cmd, "config", "set", "enable-placement-rules", "true")
re.NoError(err)
if sche := cluster.GetSchedulingPrimaryServer(); sche != nil {
// wait for the scheduler server to update the config
testutil.Eventually(re, func() bool {
return sche.GetCluster().GetCheckerConfig().IsPlacementRulesEnabled()
})
}

output, err = pdctl.ExecuteCommand(cmd, "operator", "add", "transfer-region", "1", "2", "3")
re.NoError(err)
re.Contains(string(output), "not supported")
Expand Down
4 changes: 1 addition & 3 deletions tests/pdctl/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,10 +472,8 @@ func (suite *schedulerTestSuite) checkSchedulerDiagnostic(cluster *tests.TestClu
result := make(map[string]interface{})
testutil.Eventually(re, func() bool {
mightExec(re, cmd, []string{"-u", pdAddr, "scheduler", "describe", schedulerName}, &result)
return len(result) != 0
return len(result) != 0 && expectedStatus == result["status"] && expectedSummary == result["summary"]
}, testutil.WithTickInterval(50*time.Millisecond))
re.Equal(expectedStatus, result["status"])
re.Equal(expectedSummary, result["summary"])
}

stores := []*metapb.Store{
Expand Down
69 changes: 60 additions & 9 deletions tests/server/api/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package api

import (
"encoding/json"
"errors"
"fmt"
"net/http"
Expand Down Expand Up @@ -73,6 +74,18 @@ func (suite *operatorTestSuite) TestOperator() {

func (suite *operatorTestSuite) checkAddRemovePeer(cluster *tests.TestCluster) {
re := suite.Require()

// pause rule checker to avoid unexpected operator
checkerName := "rule"
addr := cluster.GetLeaderServer().GetAddr()
resp := make(map[string]interface{})
url := fmt.Sprintf("%s/pd/api/v1/checker/%s", addr, checkerName)
err := tu.CheckPostJSON(testDialClient, url, []byte(`{"delay":1000}`), tu.StatusOK(re))
re.NoError(err)
err = tu.ReadGetJSON(re, testDialClient, url, &resp)
re.NoError(err)
re.True(resp["paused"].(bool))

stores := []*metapb.Store{
{
Id: 1,
Expand Down Expand Up @@ -106,13 +119,15 @@ func (suite *operatorTestSuite) checkAddRemovePeer(cluster *tests.TestCluster) {
ConfVer: 1,
Version: 1,
},
StartKey: []byte("a"),
EndKey: []byte("b"),
}
regionInfo := core.NewRegionInfo(region, peer1)
tests.MustPutRegionInfo(re, cluster, regionInfo)

urlPrefix := fmt.Sprintf("%s/pd/api/v1", cluster.GetLeaderServer().GetAddr())
regionURL := fmt.Sprintf("%s/operators/%d", urlPrefix, region.GetId())
err := tu.CheckGetJSON(testDialClient, regionURL, nil,
err = tu.CheckGetJSON(testDialClient, regionURL, nil,
tu.StatusNotOK(re), tu.StringContain(re, "operator not found"))
suite.NoError(err)
recordURL := fmt.Sprintf("%s/operators/records?from=%s", urlPrefix, strconv.FormatInt(time.Now().Unix(), 10))
Expand Down Expand Up @@ -168,14 +183,26 @@ func (suite *operatorTestSuite) checkAddRemovePeer(cluster *tests.TestCluster) {

// Getting operator records fails when `from` is the current time.
time.Sleep(time.Second)
url := fmt.Sprintf("%s/operators/records?from=%s", urlPrefix, strconv.FormatInt(time.Now().Unix(), 10))
url = fmt.Sprintf("%s/operators/records?from=%s", urlPrefix, strconv.FormatInt(time.Now().Unix(), 10))
err = tu.CheckGetJSON(testDialClient, url, nil,
tu.StatusNotOK(re), tu.StringContain(re, "operator not found"))
suite.NoError(err)
}

func (suite *operatorTestSuite) checkMergeRegionOperator(cluster *tests.TestCluster) {
re := suite.Require()

// pause rule checker to avoid unexpected operator
checkerName := "rule"
addr := cluster.GetLeaderServer().GetAddr()
resp := make(map[string]interface{})
url := fmt.Sprintf("%s/pd/api/v1/checker/%s", addr, checkerName)
err := tu.CheckPostJSON(testDialClient, url, []byte(`{"delay":1000}`), tu.StatusOK(re))
re.NoError(err)
err = tu.ReadGetJSON(re, testDialClient, url, &resp)
re.NoError(err)
re.True(resp["paused"].(bool))

r1 := core.NewTestRegionInfo(10, 1, []byte(""), []byte("b"), core.SetWrittenBytes(1000), core.SetReadBytes(1000), core.SetRegionConfVer(1), core.SetRegionVersion(1))
tests.MustPutRegionInfo(re, cluster, r1)
r2 := core.NewTestRegionInfo(20, 1, []byte("b"), []byte("c"), core.SetWrittenBytes(2000), core.SetReadBytes(0), core.SetRegionConfVer(2), core.SetRegionVersion(3))
Expand All @@ -184,7 +211,7 @@ func (suite *operatorTestSuite) checkMergeRegionOperator(cluster *tests.TestClus
tests.MustPutRegionInfo(re, cluster, r3)

urlPrefix := fmt.Sprintf("%s/pd/api/v1", cluster.GetLeaderServer().GetAddr())
err := tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/operators", urlPrefix), []byte(`{"name":"merge-region", "source_region_id": 10, "target_region_id": 20}`), tu.StatusOK(re))
err = tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/operators", urlPrefix), []byte(`{"name":"merge-region", "source_region_id": 10, "target_region_id": 20}`), tu.StatusOK(re))
suite.NoError(err)

tu.CheckDelete(testDialClient, fmt.Sprintf("%s/operators/%d", urlPrefix, 10), tu.StatusOK(re))
Expand All @@ -201,6 +228,18 @@ func (suite *operatorTestSuite) checkMergeRegionOperator(cluster *tests.TestClus

func (suite *operatorTestSuite) checkTransferRegionWithPlacementRule(cluster *tests.TestCluster) {
re := suite.Require()

// pause rule checker to avoid unexpected operator
checkerName := "rule"
addr := cluster.GetLeaderServer().GetAddr()
resp := make(map[string]interface{})
url := fmt.Sprintf("%s/pd/api/v1/checker/%s", addr, checkerName)
err := tu.CheckPostJSON(testDialClient, url, []byte(`{"delay":1000}`), tu.StatusOK(re))
re.NoError(err)
err = tu.ReadGetJSON(re, testDialClient, url, &resp)
re.NoError(err)
re.True(resp["paused"].(bool))

stores := []*metapb.Store{
{
Id: 1,
Expand Down Expand Up @@ -239,12 +278,14 @@ func (suite *operatorTestSuite) checkTransferRegionWithPlacementRule(cluster *te
ConfVer: 1,
Version: 1,
},
StartKey: []byte("a"),
EndKey: []byte("b"),
}
tests.MustPutRegionInfo(re, cluster, core.NewRegionInfo(region, peer1))

urlPrefix := fmt.Sprintf("%s/pd/api/v1", cluster.GetLeaderServer().GetAddr())
regionURL := fmt.Sprintf("%s/operators/%d", urlPrefix, region.GetId())
err := tu.CheckGetJSON(testDialClient, regionURL, nil,
err = tu.CheckGetJSON(testDialClient, regionURL, nil,
tu.StatusNotOK(re), tu.StringContain(re, "operator not found"))
re.NoError(err)
convertStepsToStr := func(steps []string) string {
Expand Down Expand Up @@ -408,13 +449,24 @@ func (suite *operatorTestSuite) checkTransferRegionWithPlacementRule(cluster *te
},
}
svr := cluster.GetLeaderServer()
url = fmt.Sprintf("%s/pd/api/v1/config", svr.GetAddr())
for _, testCase := range testCases {
suite.T().Log(testCase.name)
// TODO: remove this after we can sync this config to all servers.
if sche := cluster.GetSchedulingPrimaryServer(); sche != nil {
sche.GetCluster().GetSchedulerConfig().SetPlacementRuleEnabled(testCase.placementRuleEnable)
data := make(map[string]interface{})
if testCase.placementRuleEnable {
data["enable-placement-rules"] = "true"
} else {
svr.GetRaftCluster().GetOpts().SetPlacementRuleEnabled(testCase.placementRuleEnable)
data["enable-placement-rules"] = "false"
}
reqData, e := json.Marshal(data)
re.NoError(e)
err := tu.CheckPostJSON(testDialClient, url, reqData, tu.StatusOK(re))
re.NoError(err)
if sche := cluster.GetSchedulingPrimaryServer(); sche != nil {
// wait for the scheduler server to update the config
tu.Eventually(re, func() bool {
return sche.GetCluster().GetCheckerConfig().IsPlacementRulesEnabled() == testCase.placementRuleEnable
})
}
manager := svr.GetRaftCluster().GetRuleManager()
if sche := cluster.GetSchedulingPrimaryServer(); sche != nil {
Expand All @@ -436,7 +488,6 @@ func (suite *operatorTestSuite) checkTransferRegionWithPlacementRule(cluster *te
err = manager.DeleteRule("pd", "default")
suite.NoError(err)
}
var err error
if testCase.expectedError == nil {
err = tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/operators", urlPrefix), testCase.input, tu.StatusOK(re))
} else {
Expand Down

0 comments on commit 084c739

Please sign in to comment.