diff --git a/pkg/mcs/scheduling/server/apis/v1/api.go b/pkg/mcs/scheduling/server/apis/v1/api.go index 26eab6d94243..0858773ca96d 100644 --- a/pkg/mcs/scheduling/server/apis/v1/api.go +++ b/pkg/mcs/scheduling/server/apis/v1/api.go @@ -16,7 +16,9 @@ package apis import ( "net/http" + "strconv" "sync" + "time" "github.com/gin-contrib/cors" "github.com/gin-contrib/gzip" @@ -25,6 +27,7 @@ import ( "github.com/joho/godotenv" scheserver "github.com/tikv/pd/pkg/mcs/scheduling/server" "github.com/tikv/pd/pkg/mcs/utils" + "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi" "github.com/unrolled/render" @@ -77,7 +80,7 @@ func NewService(srv *scheserver.Service) *Service { apiHandlerEngine.Use(cors.Default()) apiHandlerEngine.Use(gzip.Gzip(gzip.DefaultCompression)) apiHandlerEngine.Use(func(c *gin.Context) { - c.Set(multiservicesapi.ServiceContextKey, srv) + c.Set(multiservicesapi.ServiceContextKey, srv.Server) c.Next() }) apiHandlerEngine.Use(multiservicesapi.ServiceRedirector()) @@ -90,5 +93,199 @@ func NewService(srv *scheserver.Service) *Service { root: root, rd: createIndentRender(), } + s.RegisterOperatorsRouter() + s.RegisterSchedulersRouter() + s.RegisterCheckersRouter() return s } + +// RegisterSchedulersRouter registers the router of the schedulers handler. +func (s *Service) RegisterSchedulersRouter() { + router := s.root.Group("schedulers") + router.GET("", getSchedulers) +} + +// RegisterCheckersRouter registers the router of the checkers handler. +func (s *Service) RegisterCheckersRouter() { + router := s.root.Group("checkers") + router.GET("/:name", getCheckerByName) +} + +// RegisterOperatorsRouter registers the router of the operators handler. +func (s *Service) RegisterOperatorsRouter() { + router := s.root.Group("operators") + router.GET("", getOperators) + router.GET("/:id", getOperatorByID) +} + +// @Tags operators +// @Summary Get a Region's pending operator. +// @Param region_id path int true "A Region's Id" +// @Produce json +// @Success 200 {object} operator.OpWithStatus +// @Failure 400 {string} string "The input is invalid." +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /operators/{id} [GET] +func getOperatorByID(c *gin.Context) { + svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server) + id := c.Param("id") + + regionID, err := strconv.ParseUint(id, 10, 64) + if err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + + opController := svr.GetCoordinator().GetOperatorController() + if opController == nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + + c.JSON(http.StatusOK, opController.GetOperatorStatus(regionID)) +} + +// @Tags operators +// @Summary List pending operators. +// @Param kind query string false "Specify the operator kind." Enums(admin, leader, region) +// @Produce json +// @Success 200 {array} operator.Operator +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /operators [GET] +func getOperators(c *gin.Context) { + svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server) + var ( + results []*operator.Operator + ops []*operator.Operator + err error + ) + + opController := svr.GetCoordinator().GetOperatorController() + if opController == nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + kinds := c.QueryArray("kind") + if len(kinds) == 0 { + results = opController.GetOperators() + } else { + for _, kind := range kinds { + switch kind { + case "admin": + ops = opController.GetOperatorsOfKind(operator.OpAdmin) + case "leader": + ops = opController.GetOperatorsOfKind(operator.OpLeader) + case "region": + ops = opController.GetOperatorsOfKind(operator.OpRegion) + case "waiting": + ops = opController.GetWaitingOperators() + } + results = append(results, ops...) + } + } + + c.JSON(http.StatusOK, results) +} + +// @Tags checkers +// @Summary Get if checker is paused +// @Param name path string true "The name of the scheduler." +// @Produce json +// @Success 200 {string} string "Pause or resume the scheduler successfully." +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /checkers/{name} [get] +func getCheckerByName(c *gin.Context) { + svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server) + name := c.Param("name") + co := svr.GetCoordinator() + isPaused, err := co.IsCheckerPaused(name) + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + output := map[string]bool{ + "paused": isPaused, + } + c.JSON(http.StatusOK, output) +} + +type schedulerPausedPeriod struct { + Name string `json:"name"` + PausedAt time.Time `json:"paused_at"` + ResumeAt time.Time `json:"resume_at"` +} + +// @Tags schedulers +// @Summary List all created schedulers by status. +// @Produce json +// @Success 200 {array} string +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /schedulers [get] +func getSchedulers(c *gin.Context) { + svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server) + co := svr.GetCoordinator() + sc := co.GetSchedulersController() + schedulers := sc.GetSchedulerNames() + + status := c.Query("status") + _, needTS := c.GetQuery("timestamp") + switch status { + case "paused": + var pausedSchedulers []string + pausedPeriods := []schedulerPausedPeriod{} + for _, scheduler := range schedulers { + paused, err := sc.IsSchedulerPaused(scheduler) + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + + if paused { + if needTS { + s := schedulerPausedPeriod{ + Name: scheduler, + PausedAt: time.Time{}, + ResumeAt: time.Time{}, + } + pausedAt, err := sc.GetPausedSchedulerDelayAt(scheduler) + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + s.PausedAt = time.Unix(pausedAt, 0) + resumeAt, err := sc.GetPausedSchedulerDelayUntil(scheduler) + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + s.ResumeAt = time.Unix(resumeAt, 0) + pausedPeriods = append(pausedPeriods, s) + } else { + pausedSchedulers = append(pausedSchedulers, scheduler) + } + } + } + if needTS { + c.JSON(http.StatusOK, pausedPeriods) + } else { + c.JSON(http.StatusOK, pausedSchedulers) + } + return + case "disabled": + var disabledSchedulers []string + for _, scheduler := range schedulers { + disabled, err := sc.IsSchedulerDisabled(scheduler) + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + + if disabled { + disabledSchedulers = append(disabledSchedulers, scheduler) + } + } + c.JSON(http.StatusOK, disabledSchedulers) + default: + c.JSON(http.StatusOK, schedulers) + } +} diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index f58fba2ed0b3..403b676e6916 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -119,8 +119,9 @@ func (c *Cluster) GetStoreConfig() sc.StoreConfigProvider { return c.persistConf // TODO: implement the following methods -// UpdateRegionsLabelLevelStats updates the region label level stats. -func (c *Cluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) {} +// UpdateRegionsLabelLevelStats updates the status of the region label level by types. +func (c *Cluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) { +} // AllocID allocates a new ID. func (c *Cluster) AllocID() (uint64, error) { return 0, nil } diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index 845dbe38aa51..5ad011e2b4c3 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -328,6 +328,11 @@ func (s *Server) GetTLSConfig() *grpcutil.TLSConfig { return &s.cfg.Security.TLSConfig } +// GetCoordinator returns the coordinator. +func (s *Server) GetCoordinator() *schedule.Coordinator { + return s.coordinator +} + func (s *Server) initClient() error { tlsConfig, err := s.cfg.Security.ToTLSConfig() if err != nil { @@ -501,6 +506,7 @@ func (s *Server) startServer() (err error) { if err != nil { return err } + s.service = &Service{Server: s} tlsConfig, err := s.cfg.Security.ToTLSConfig() if err != nil { return err @@ -543,7 +549,6 @@ func (s *Server) startServer() (err error) { log.Error("failed to register the service", zap.String("service-name", utils.SchedulingServiceName), errs.ZapError(err)) return err } - atomic.StoreInt64(&s.isRunning, 1) return nil } diff --git a/pkg/mcs/scheduling/server/testutil.go b/pkg/mcs/scheduling/server/testutil.go new file mode 100644 index 000000000000..74baac448082 --- /dev/null +++ b/pkg/mcs/scheduling/server/testutil.go @@ -0,0 +1,78 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "context" + "os" + + "github.com/pingcap/log" + "github.com/spf13/pflag" + "github.com/stretchr/testify/require" + "github.com/tikv/pd/pkg/mcs/scheduling/server/config" + "github.com/tikv/pd/pkg/utils/logutil" + "github.com/tikv/pd/pkg/utils/testutil" +) + +// NewTestServer creates a resource manager server for testing. +func NewTestServer(ctx context.Context, re *require.Assertions, cfg *config.Config) (*Server, testutil.CleanupFunc, error) { + // New zap logger + err := logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog) + re.NoError(err) + log.ReplaceGlobals(cfg.Logger, cfg.LogProps) + // Flushing any buffered log entries + defer log.Sync() + + s := CreateServer(ctx, cfg) + if err = s.Run(); err != nil { + return nil, nil, err + } + + cleanup := func() { + s.Close() + os.RemoveAll(cfg.DataDir) + } + return s, cleanup, nil +} + +// GenerateConfig generates a new config with the given options. +func GenerateConfig(c *config.Config) (*config.Config, error) { + arguments := []string{ + "--listen-addr=" + c.ListenAddr, + "--advertise-listen-addr=" + c.AdvertiseListenAddr, + "--backend-endpoints=" + c.BackendEndpoints, + } + + flagSet := pflag.NewFlagSet("test", pflag.ContinueOnError) + flagSet.BoolP("version", "V", false, "print version information and exit") + flagSet.StringP("config", "", "", "config file") + flagSet.StringP("backend-endpoints", "", "", "url for etcd client") + flagSet.StringP("listen-addr", "", "", "listen address for tso service") + flagSet.StringP("advertise-listen-addr", "", "", "advertise urls for listen address (default '${listen-addr}')") + flagSet.StringP("cacert", "", "", "path of file that contains list of trusted TLS CAs") + flagSet.StringP("cert", "", "", "path of file that contains X509 certificate in PEM format") + flagSet.StringP("key", "", "", "path of file that contains X509 key in PEM format") + err := flagSet.Parse(arguments) + if err != nil { + return nil, err + } + cfg := config.NewConfig() + err = cfg.Parse(flagSet) + if err != nil { + return nil, err + } + + return cfg, nil +} diff --git a/pkg/schedule/operator/operator_controller.go b/pkg/schedule/operator/operator_controller.go index 1385a1203374..6ee995178a9b 100644 --- a/pkg/schedule/operator/operator_controller.go +++ b/pkg/schedule/operator/operator_controller.go @@ -689,6 +689,21 @@ func (oc *Controller) GetWaitingOperators() []*Operator { return oc.wop.ListOperator() } +// GetOperatorsOfKind returns the running operators of the kind. +func (oc *Controller) GetOperatorsOfKind(mask OpKind) []*Operator { + oc.RLock() + defer oc.RUnlock() + + operators := make([]*Operator, 0, len(oc.operators)) + for _, op := range oc.operators { + if op.Kind()&mask != 0 { + operators = append(operators, op) + } + } + + return operators +} + // SendScheduleCommand sends a command to the region. func (oc *Controller) SendScheduleCommand(region *core.RegionInfo, step OpStep, source string) { log.Info("send schedule command", diff --git a/server/handler.go b/server/handler.go index 708a94c1bdc3..d86b889b63d8 100644 --- a/server/handler.go +++ b/server/handler.go @@ -455,32 +455,29 @@ func (h *Handler) GetWaitingOperators() ([]*operator.Operator, error) { // GetAdminOperators returns the running admin operators. func (h *Handler) GetAdminOperators() ([]*operator.Operator, error) { - return h.GetOperatorsOfKind(operator.OpAdmin) + c, err := h.GetOperatorController() + if err != nil { + return nil, err + } + return c.GetOperatorsOfKind(operator.OpAdmin), nil } // GetLeaderOperators returns the running leader operators. func (h *Handler) GetLeaderOperators() ([]*operator.Operator, error) { - return h.GetOperatorsOfKind(operator.OpLeader) + c, err := h.GetOperatorController() + if err != nil { + return nil, err + } + return c.GetOperatorsOfKind(operator.OpLeader), nil } // GetRegionOperators returns the running region operators. func (h *Handler) GetRegionOperators() ([]*operator.Operator, error) { - return h.GetOperatorsOfKind(operator.OpRegion) -} - -// GetOperatorsOfKind returns the running operators of the kind. -func (h *Handler) GetOperatorsOfKind(mask operator.OpKind) ([]*operator.Operator, error) { - ops, err := h.GetOperators() + c, err := h.GetOperatorController() if err != nil { return nil, err } - var results []*operator.Operator - for _, op := range ops { - if op.Kind()&mask != 0 { - results = append(results, op) - } - } - return results, nil + return c.GetOperatorsOfKind(operator.OpRegion), nil } // GetHistory returns finished operators' history since start. diff --git a/tests/integrations/mcs/scheduling/api_test.go b/tests/integrations/mcs/scheduling/api_test.go new file mode 100644 index 000000000000..645204e1dde7 --- /dev/null +++ b/tests/integrations/mcs/scheduling/api_test.go @@ -0,0 +1,104 @@ +package scheduling_test + +import ( + "context" + "fmt" + "net/http" + "testing" + "time" + + "github.com/stretchr/testify/suite" + _ "github.com/tikv/pd/pkg/mcs/scheduling/server/apis/v1" + "github.com/tikv/pd/pkg/utils/tempurl" + "github.com/tikv/pd/pkg/utils/testutil" + tu "github.com/tikv/pd/pkg/utils/testutil" + "github.com/tikv/pd/tests" +) + +var testDialClient = &http.Client{ + Transport: &http.Transport{ + DisableKeepAlives: true, + }, +} + +type apiTestSuite struct { + suite.Suite + ctx context.Context + cleanupFunc testutil.CleanupFunc + cluster *tests.TestCluster + server *tests.TestServer + backendEndpoints string + dialClient *http.Client +} + +func TestAPI(t *testing.T) { + suite.Run(t, &apiTestSuite{}) +} + +func (suite *apiTestSuite) SetupSuite() { + ctx, cancel := context.WithCancel(context.Background()) + suite.ctx = ctx + cluster, err := tests.NewTestAPICluster(suite.ctx, 1) + suite.cluster = cluster + suite.NoError(err) + suite.NoError(cluster.RunInitialServers()) + suite.NotEmpty(cluster.WaitLeader()) + suite.server = cluster.GetServer(cluster.GetLeader()) + suite.NoError(suite.server.BootstrapCluster()) + suite.backendEndpoints = suite.server.GetAddr() + suite.dialClient = &http.Client{ + Transport: &http.Transport{ + DisableKeepAlives: true, + }, + } + suite.cleanupFunc = func() { + cancel() + } +} + +func (suite *apiTestSuite) TestGetCheckerByName() { + testCases := []struct { + name string + }{ + {name: "learner"}, + {name: "replica"}, + {name: "rule"}, + {name: "split"}, + {name: "merge"}, + {name: "joint-state"}, + } + + re := suite.Require() + s, cleanup := tests.StartSingleSchedulingTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc()) + defer cleanup() + testutil.Eventually(re, func() bool { + return s.IsServing() + }, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond)) + addr := s.GetAddr() + urlPrefix := fmt.Sprintf("%s/scheduling/api/v1/checkers", addr) + co := s.GetCoordinator() + + for _, testCase := range testCases { + name := testCase.name + // normal run + resp := make(map[string]interface{}) + err := tu.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, name), &resp) + suite.NoError(err) + suite.False(resp["paused"].(bool)) + // paused + err = co.PauseOrResumeChecker(name, 30) + suite.NoError(err) + resp = make(map[string]interface{}) + err = tu.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, name), &resp) + suite.NoError(err) + suite.True(resp["paused"].(bool)) + // resumed + err = co.PauseOrResumeChecker(name, 1) + suite.NoError(err) + time.Sleep(time.Second) + resp = make(map[string]interface{}) + err = tu.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, name), &resp) + suite.NoError(err) + suite.False(resp["paused"].(bool)) + } +} diff --git a/tests/testutil.go b/tests/testutil.go index 25ff86c274fb..bd914ef9e55b 100644 --- a/tests/testutil.go +++ b/tests/testutil.go @@ -24,6 +24,8 @@ import ( "github.com/stretchr/testify/require" bs "github.com/tikv/pd/pkg/basicserver" rm "github.com/tikv/pd/pkg/mcs/resourcemanager/server" + scheduling "github.com/tikv/pd/pkg/mcs/scheduling/server" + sc "github.com/tikv/pd/pkg/mcs/scheduling/server/config" tso "github.com/tikv/pd/pkg/mcs/tso/server" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/testutil" @@ -100,6 +102,23 @@ func NewTSOTestServer(ctx context.Context, cfg *tso.Config) (*tso.Server, testut return s, cleanup, nil } +// StartSingleSchedulingTestServer creates and starts a scheduling server with default config for testing. +func StartSingleSchedulingTestServer(ctx context.Context, re *require.Assertions, backendEndpoints, listenAddrs string) (*scheduling.Server, func()) { + cfg := sc.NewConfig() + cfg.BackendEndpoints = backendEndpoints + cfg.ListenAddr = listenAddrs + cfg, err := scheduling.GenerateConfig(cfg) + re.NoError(err) + + s, cleanup, err := scheduling.NewTestServer(ctx, re, cfg) + re.NoError(err) + testutil.Eventually(re, func() bool { + return !s.IsClosed() + }, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond)) + + return s, cleanup +} + // WaitForPrimaryServing waits for one of servers being elected to be the primary/leader func WaitForPrimaryServing(re *require.Assertions, serverMap map[string]bs.Server) string { var primary string