Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test operator test #22

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/joho/godotenv"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
scheserver "github.com/tikv/pd/pkg/mcs/scheduling/server"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
sche "github.com/tikv/pd/pkg/schedule/core"
Expand Down Expand Up @@ -128,6 +129,7 @@ func (s *Service) RegisterSchedulersRouter() {
router := s.root.Group("schedulers")
router.GET("", getSchedulers)
router.GET("/diagnostic/:name", getDiagnosticResult)
router.GET("/config/:name/:suffix", getSchedulerConfigByName)
// TODO: in the future, we should split pauseOrResumeScheduler to two different APIs.
// And we need to do one-to-two forwarding in the API middleware.
router.POST("/:name", pauseOrResumeScheduler)
Expand Down Expand Up @@ -385,6 +387,19 @@ func getSchedulers(c *gin.Context) {
c.IndentedJSON(http.StatusOK, output)
}

func getSchedulerConfigByName(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
handlers := svr.GetCoordinator().GetSchedulersController().GetSchedulerHandlers()
name := c.Param("name")
if _, ok := handlers[name]; !ok {
c.String(http.StatusNotFound, errs.ErrSchedulerNotFound.Error())
return
}
suffix := c.Param("suffix")
c.Request.URL.Path = "/" + suffix
handlers[name].ServeHTTP(c.Writer, c.Request)
}

// @Tags schedulers
// @Summary List schedulers diagnostic result.
// @Produce json
Expand Down
14 changes: 13 additions & 1 deletion pkg/schedule/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -837,7 +837,19 @@ func (h *Handler) GetSchedulerByStatus(status string, needTS bool) (interface{},
}
return disabledSchedulers, nil
default:
return schedulers, nil
// The default scheduler could not be deleted, it could only be disabled.
// TODO: Should we distinguish between disabled and removed schedulers?
var enabledSchedulers []string
for _, scheduler := range schedulers {
disabled, err := sc.IsSchedulerDisabled(scheduler)
if err != nil {
return nil, err
}
if !disabled {
enabledSchedulers = append(enabledSchedulers, scheduler)
}
}
return enabledSchedulers, nil
}
}

Expand Down
16 changes: 8 additions & 8 deletions pkg/utils/testutil/api_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,29 +37,29 @@ func StatusOK(re *require.Assertions) func([]byte, int, http.Header) {

// StatusNotOK is used to check whether http response code is not equal http.StatusOK.
func StatusNotOK(re *require.Assertions) func([]byte, int, http.Header) {
return func(_ []byte, i int, _ http.Header) {
re.NotEqual(http.StatusOK, i)
return func(resp []byte, i int, _ http.Header) {
re.NotEqual(http.StatusOK, i, "resp: "+string(resp))
}
}

// ExtractJSON is used to check whether given data can be extracted successfully.
func ExtractJSON(re *require.Assertions, data interface{}) func([]byte, int, http.Header) {
return func(res []byte, _ int, _ http.Header) {
re.NoError(json.Unmarshal(res, data))
return func(resp []byte, _ int, _ http.Header) {
re.NoError(json.Unmarshal(resp, data), "resp: "+string(resp))
}
}

// StringContain is used to check whether response context contains given string.
func StringContain(re *require.Assertions, sub string) func([]byte, int, http.Header) {
return func(res []byte, _ int, _ http.Header) {
re.Contains(string(res), sub)
return func(resp []byte, _ int, _ http.Header) {
re.Contains(string(resp), sub, "resp: "+string(resp))
}
}

// StringEqual is used to check whether response context equal given string.
func StringEqual(re *require.Assertions, str string) func([]byte, int, http.Header) {
return func(res []byte, _ int, _ http.Header) {
re.Contains(string(res), str)
return func(resp []byte, _ int, _ http.Header) {
re.Contains(string(resp), str, "resp: "+string(resp))
}
}

Expand Down
6 changes: 6 additions & 0 deletions server/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP
// "/schedulers", http.MethodGet
// "/schedulers/{name}", http.MethodPost
// "/schedulers/diagnostic/{name}", http.MethodGet
// "/scheduler-config", http.MethodGet
// "/hotspot/regions/read", http.MethodGet
// "/hotspot/regions/write", http.MethodGet
// "/hotspot/regions/history", http.MethodGet
Expand Down Expand Up @@ -90,6 +91,11 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP
scheapi.APIPathPrefix+"/schedulers",
mcs.SchedulingServiceName,
[]string{http.MethodGet}),
serverapi.MicroserviceRedirectRule(
prefix+"/scheduler-config",
scheapi.APIPathPrefix+"/schedulers/config",
mcs.SchedulingServiceName,
[]string{http.MethodGet}),
serverapi.MicroserviceRedirectRule(
prefix+"/schedulers/", // Note: this means "/schedulers/{name}"
scheapi.APIPathPrefix+"/schedulers",
Expand Down
5 changes: 2 additions & 3 deletions server/config/persist_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -789,11 +789,10 @@ func (o *PersistOptions) Persist(storage endpoint.ConfigStorage) error {
},
StoreConfig: *o.GetStoreConfig(),
}
err := storage.SaveConfig(cfg)
failpoint.Inject("persistFail", func() {
err = errors.New("fail to persist")
failpoint.Return(errors.New("fail to persist"))
})
return err
return storage.SaveConfig(cfg)
}

// Reload reloads the configuration from the storage.
Expand Down
12 changes: 12 additions & 0 deletions tests/integrations/mcs/scheduling/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ func (suite *apiTestSuite) TestAPIForward() {
// "/schedulers", http.MethodGet
// "/schedulers/{name}", http.MethodPost
// "/schedulers/diagnostic/{name}", http.MethodGet
// "/scheduler-config/", http.MethodGet
// Should not redirect:
// "/schedulers", http.MethodPost
// "/schedulers/{name}", http.MethodDelete
Expand All @@ -189,6 +190,17 @@ func (suite *apiTestSuite) TestAPIForward() {
testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true"))
suite.NoError(err)

schedulers := []string{
"balance-leader-scheduler",
"balance-witness-scheduler",
"balance-hot-region-scheduler",
}
for _, schedulerName := range schedulers {
err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s/%s/%s", urlPrefix, "scheduler-config", schedulerName, "list"), &resp,
testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true"))
suite.NoError(err)
}

err = testutil.CheckPostJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "schedulers"), pauseArgs,
testutil.WithoutHeader(re, apiutil.ForwardToMicroServiceHeader))
re.NoError(err)
Expand Down
9 changes: 8 additions & 1 deletion tests/pdctl/operator/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/stretchr/testify/suite"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/tests"
"github.com/tikv/pd/tests/pdctl"
Expand Down Expand Up @@ -221,9 +222,15 @@ func (suite *operatorTestSuite) checkOperator(cluster *tests.TestCluster) {

_, err = pdctl.ExecuteCommand(cmd, "config", "set", "enable-placement-rules", "true")
re.NoError(err)
// wait for the config to take effect in scheduling server when cluster is in ap mode.
if sche := cluster.GetSchedulingPrimaryServer(); sche != nil {
testutil.Eventually(re, func() bool {
return sche.GetCluster().GetSchedulerConfig().IsPlacementRulesEnabled()
})
}
output, err = pdctl.ExecuteCommand(cmd, "operator", "add", "transfer-region", "1", "2", "3")
re.NoError(err)
re.Contains(string(output), "not supported")
re.Contains(string(output), "not supported", "output: "+string(output))
output, err = pdctl.ExecuteCommand(cmd, "operator", "add", "transfer-region", "1", "2", "follower", "3")
re.NoError(err)
re.Contains(string(output), "not match")
Expand Down
93 changes: 56 additions & 37 deletions tests/pdctl/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ package scheduler_test
import (
"context"
"encoding/json"
"fmt"
"reflect"
"strings"
"testing"
"time"

Expand All @@ -43,8 +46,7 @@ func TestSchedulerTestSuite(t *testing.T) {

func (suite *schedulerTestSuite) TestScheduler() {
env := tests.NewSchedulingTestEnvironment(suite.T())
// Fixme: use RunTestInTwoModes when sync deleted scheduler is supported.
env.RunTestInPDMode(suite.checkScheduler)
env.RunTestInTwoModes(suite.checkScheduler)
env.RunTestInTwoModes(suite.checkSchedulerDiagnostic)
}

Expand Down Expand Up @@ -83,20 +85,30 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) {
}

checkSchedulerCommand := func(args []string, expected map[string]bool) {
if args != nil {
mustExec(re, cmd, args, nil)
}
var schedulers []string
mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, &schedulers)
for _, scheduler := range schedulers {
re.True(expected[scheduler])
}
testutil.Eventually(re, func() bool {
if args != nil {
mustExec(re, cmd, args, nil)
}
var schedulers []string
mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, &schedulers)
if len(schedulers) != len(expected) {
return false
}
for _, scheduler := range schedulers {
if _, ok := expected[scheduler]; !ok {
return false
}
}
return true
})
}

checkSchedulerConfigCommand := func(expectedConfig map[string]interface{}, schedulerName string) {
configInfo := make(map[string]interface{})
mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", schedulerName}, &configInfo)
re.Equal(expectedConfig, configInfo)
testutil.Eventually(re, func() bool {
configInfo := make(map[string]interface{})
mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", schedulerName}, &configInfo)
return reflect.DeepEqual(expectedConfig, configInfo)
})
}

leaderServer := cluster.GetLeaderServer()
Expand All @@ -106,7 +118,6 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) {

// note: because pdqsort is a unstable sort algorithm, set ApproximateSize for this region.
tests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"), core.SetApproximateSize(10))
time.Sleep(3 * time.Second)

// scheduler show command
expected := map[string]bool{
Expand All @@ -120,7 +131,6 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) {

// scheduler delete command
args := []string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"}
time.Sleep(10 * time.Second)
expected = map[string]bool{
"balance-leader-scheduler": true,
"balance-hot-region-scheduler": true,
Expand Down Expand Up @@ -160,8 +170,11 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) {
checkSchedulerCommand(args, expected)

// check update success
expectedConfig["store-id-ranges"] = map[string]interface{}{"2": []interface{}{map[string]interface{}{"end-key": "", "start-key": ""}}, "3": []interface{}{map[string]interface{}{"end-key": "", "start-key": ""}}}
checkSchedulerConfigCommand(expectedConfig, schedulers[idx])
// FIXME: remove this check after scheduler config is updated
if cluster.GetSchedulingPrimaryServer() == nil {
expectedConfig["store-id-ranges"] = map[string]interface{}{"2": []interface{}{map[string]interface{}{"end-key": "", "start-key": ""}}, "3": []interface{}{map[string]interface{}{"end-key": "", "start-key": ""}}}
checkSchedulerConfigCommand(expectedConfig, schedulers[idx])
}

// scheduler delete command
args = []string{"-u", pdAddr, "scheduler", "remove", schedulers[idx]}
Expand Down Expand Up @@ -271,6 +284,8 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) {
re.Contains(echo, "Success!")
echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"}, nil)
re.NotContains(echo, "Success!")
echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-region-scheduler"}, nil)
re.Contains(echo, "Success!")
echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "1"}, nil)
re.Contains(echo, "Success!")
echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "evict-leader-scheduler-1"}, nil)
Expand Down Expand Up @@ -412,24 +427,31 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) {
for _, schedulerName := range evictSlownessSchedulers {
echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", schedulerName}, nil)
re.Contains(echo, "Success!")
echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil)
re.Contains(echo, schedulerName)
testutil.Eventually(re, func() bool {
echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil)
return strings.Contains(echo, schedulerName)
})
echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", schedulerName, "set", "recovery-duration", "100"}, nil)
re.Contains(echo, "Success!")
conf = make(map[string]interface{})
mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", schedulerName, "show"}, &conf)
re.Equal(100., conf["recovery-duration"])
echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", schedulerName}, nil)
re.Contains(echo, "Success!")
echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil)
re.NotContains(echo, schedulerName)
testutil.Eventually(re, func() bool {
echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil)
return !strings.Contains(echo, schedulerName)
})
}

// test show scheduler with paused and disabled status.
checkSchedulerWithStatusCommand := func(status string, expected []string) {
var schedulers []string
mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show", "--status", status}, &schedulers)
re.Equal(expected, schedulers)
testutil.Eventually(re, func() bool {
var schedulers []string
mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show", "--status", status}, &schedulers)
fmt.Println(schedulers, expected)
return reflect.DeepEqual(expected, schedulers)
})
}

mustUsage([]string{"-u", pdAddr, "scheduler", "pause", "balance-leader-scheduler"})
Expand Down Expand Up @@ -469,13 +491,14 @@ func (suite *schedulerTestSuite) checkSchedulerDiagnostic(cluster *tests.TestClu
cmd := pdctlCmd.GetRootCmd()

checkSchedulerDescribeCommand := func(schedulerName, expectedStatus, expectedSummary string) {
result := make(map[string]interface{})
testutil.Eventually(re, func() bool {
mightExec(re, cmd, []string{"-u", pdAddr, "scheduler", "describe", schedulerName}, &result)
return len(result) != 0
}, testutil.WithTickInterval(50*time.Millisecond))
re.Equal(expectedStatus, result["status"])
re.Equal(expectedSummary, result["summary"])
result := make(map[string]interface{})
testutil.Eventually(re, func() bool {
mightExec(re, cmd, []string{"-u", pdAddr, "scheduler", "describe", schedulerName}, &result)
return len(result) != 0
}, testutil.WithTickInterval(50*time.Millisecond))
return result["status"] == expectedStatus && result["summary"] == expectedSummary
})
}

stores := []*metapb.Store{
Expand Down Expand Up @@ -506,18 +529,14 @@ func (suite *schedulerTestSuite) checkSchedulerDiagnostic(cluster *tests.TestClu

// note: because pdqsort is a unstable sort algorithm, set ApproximateSize for this region.
tests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"), core.SetApproximateSize(10))
time.Sleep(3 * time.Second)

echo := mustExec(re, cmd, []string{"-u", pdAddr, "config", "set", "enable-diagnostic", "true"}, nil)
re.Contains(echo, "Success!")
checkSchedulerDescribeCommand("balance-region-scheduler", "pending", "1 store(s) RegionNotMatchRule; ")

// scheduler delete command
// Fixme: use RunTestInTwoModes when sync deleted scheduler is supported.
if sche := cluster.GetSchedulingPrimaryServer(); sche == nil {
mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"}, nil)
checkSchedulerDescribeCommand("balance-region-scheduler", "disabled", "")
}
mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"}, nil)
checkSchedulerDescribeCommand("balance-region-scheduler", "disabled", "")

mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "pause", "balance-leader-scheduler", "60"}, nil)
mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "resume", "balance-leader-scheduler"}, nil)
Expand All @@ -530,7 +549,7 @@ func mustExec(re *require.Assertions, cmd *cobra.Command, args []string, v inter
if v == nil {
return string(output)
}
re.NoError(json.Unmarshal(output, v))
re.NoError(json.Unmarshal(output, v), string(output))
return ""
}

Expand Down
10 changes: 6 additions & 4 deletions tests/server/api/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
"github.com/tikv/pd/pkg/core"
pdoperator "github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/utils/testutil"

Check failure on line 31 in tests/server/api/operator_test.go

View workflow job for this annotation

GitHub Actions / statics

ST1019: package "github.com/tikv/pd/pkg/utils/testutil" is being imported more than once (stylecheck)

Check failure on line 31 in tests/server/api/operator_test.go

View workflow job for this annotation

GitHub Actions / statics

ST1019: package "github.com/tikv/pd/pkg/utils/testutil" is being imported more than once (stylecheck)
tu "github.com/tikv/pd/pkg/utils/testutil"

Check failure on line 32 in tests/server/api/operator_test.go

View workflow job for this annotation

GitHub Actions / statics

ST1019(related information): other import of "github.com/tikv/pd/pkg/utils/testutil" (stylecheck)

Check failure on line 32 in tests/server/api/operator_test.go

View workflow job for this annotation

GitHub Actions / statics

ST1019(related information): other import of "github.com/tikv/pd/pkg/utils/testutil" (stylecheck)
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/tests"
)
Expand Down Expand Up @@ -410,11 +411,12 @@
svr := cluster.GetLeaderServer()
for _, testCase := range testCases {
suite.T().Log(testCase.name)
// TODO: remove this after we can sync this config to all servers.
svr.GetPersistOptions().SetPlacementRuleEnabled(testCase.placementRuleEnable)
// wait for the config to take effect in scheduling server when cluster is in api mode.
if sche := cluster.GetSchedulingPrimaryServer(); sche != nil {
sche.GetCluster().GetSchedulerConfig().SetPlacementRuleEnabled(testCase.placementRuleEnable)
} else {
svr.GetRaftCluster().GetOpts().SetPlacementRuleEnabled(testCase.placementRuleEnable)
testutil.Eventually(re, func() bool {
return sche.GetCluster().GetSchedulerConfig().IsPlacementRulesEnabled() == testCase.placementRuleEnable
})
}
manager := svr.GetRaftCluster().GetRuleManager()
if sche := cluster.GetSchedulingPrimaryServer(); sche != nil {
Expand Down
Loading
Loading