From 3c3518fd76184f552da4cf5c0376f8fc8c4c509e Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 27 Sep 2023 22:15:38 +0800 Subject: [PATCH] temp Signed-off-by: lhy1024 --- pkg/mcs/scheduling/server/apis/v1/api.go | 137 ++++++++-------- pkg/schedule/handler/handler.go | 150 ++++++++++++++++++ pkg/utils/apiutil/serverapi/middleware.go | 5 + server/api/checker.go | 5 +- server/api/diagnostic.go | 19 +-- server/api/scheduler.go | 74 +-------- server/api/scheduler_test.go | 24 +-- server/api/server.go | 27 +++- server/handler.go | 96 ----------- tests/integrations/mcs/scheduling/api_test.go | 39 +++-- tests/integrations/mcs/tso/api_test.go | 10 +- 11 files changed, 307 insertions(+), 279 deletions(-) diff --git a/pkg/mcs/scheduling/server/apis/v1/api.go b/pkg/mcs/scheduling/server/apis/v1/api.go index e66bf00ef945..7bfe6a8b2bbb 100644 --- a/pkg/mcs/scheduling/server/apis/v1/api.go +++ b/pkg/mcs/scheduling/server/apis/v1/api.go @@ -18,7 +18,6 @@ import ( "net/http" "strconv" "sync" - "time" "github.com/gin-contrib/cors" "github.com/gin-contrib/gzip" @@ -120,12 +119,14 @@ func NewService(srv *scheserver.Service) *Service { func (s *Service) RegisterSchedulersRouter() { router := s.root.Group("schedulers") router.GET("", getSchedulers) + router.GET("/diagnostic/:name", getDiagnosticResult) } // RegisterCheckersRouter registers the router of the checkers handler. func (s *Service) RegisterCheckersRouter() { router := s.root.Group("checkers") router.GET("/:name", getCheckerByName) + router.POST("/:name", pauseOrResumeChecker) } // RegisterOperatorsRouter registers the router of the operators handler. @@ -279,24 +280,54 @@ func createOperator(c *gin.Context) { // @Failure 500 {string} string "PD server failed to proceed the request." // @Router /checkers/{name} [get] func getCheckerByName(c *gin.Context) { - svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server) + handler := c.MustGet(handlerKey).(*handler.Handler) name := c.Param("name") - co := svr.GetCoordinator() - isPaused, err := co.IsCheckerPaused(name) + output, err := handler.GetCheckerStatus(name) if err != nil { c.String(http.StatusInternalServerError, err.Error()) return } - output := map[string]bool{ - "paused": isPaused, - } c.IndentedJSON(http.StatusOK, output) } -type schedulerPausedPeriod struct { - Name string `json:"name"` - PausedAt time.Time `json:"paused_at"` - ResumeAt time.Time `json:"resume_at"` +// FIXME: details of input json body params +// @Tags checker +// @Summary Pause or resume region merge. +// @Accept json +// @Param name path string true "The name of the checker." +// @Param body body object true "json params" +// @Produce json +// @Success 200 {string} string "Pause or resume the scheduler successfully." +// @Failure 400 {string} string "Bad format request." +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /checker/{name} [post] +func pauseOrResumeChecker(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + var input map[string]int + if err := c.BindJSON(&input); err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + + name := c.Param("name") + t, ok := input["delay"] + if !ok { + c.String(http.StatusBadRequest, "missing pause time") + return + } + if t < 0 { + c.String(http.StatusBadRequest, "delay cannot be negative") + return + } + if err := handler.PauseOrResumeChecker(name, int64(t)); err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + if t == 0 { + c.String(http.StatusOK, "Resume the checker successfully.") + } else { + c.String(http.StatusOK, "Pause the checker successfully.") + } } // @Tags schedulers @@ -306,70 +337,30 @@ type schedulerPausedPeriod struct { // @Failure 500 {string} string "PD server failed to proceed the request." // @Router /schedulers [get] func getSchedulers(c *gin.Context) { - svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server) - co := svr.GetCoordinator() - sc := co.GetSchedulersController() - schedulers := sc.GetSchedulerNames() - + handler := c.MustGet(handlerKey).(*handler.Handler) status := c.Query("status") _, needTS := c.GetQuery("timestamp") - switch status { - case "paused": - var pausedSchedulers []string - pausedPeriods := []schedulerPausedPeriod{} - for _, scheduler := range schedulers { - paused, err := sc.IsSchedulerPaused(scheduler) - if err != nil { - c.String(http.StatusInternalServerError, err.Error()) - return - } - - if paused { - if needTS { - s := schedulerPausedPeriod{ - Name: scheduler, - PausedAt: time.Time{}, - ResumeAt: time.Time{}, - } - pausedAt, err := sc.GetPausedSchedulerDelayAt(scheduler) - if err != nil { - c.String(http.StatusInternalServerError, err.Error()) - return - } - s.PausedAt = time.Unix(pausedAt, 0) - resumeAt, err := sc.GetPausedSchedulerDelayUntil(scheduler) - if err != nil { - c.String(http.StatusInternalServerError, err.Error()) - return - } - s.ResumeAt = time.Unix(resumeAt, 0) - pausedPeriods = append(pausedPeriods, s) - } else { - pausedSchedulers = append(pausedSchedulers, scheduler) - } - } - } - if needTS { - c.IndentedJSON(http.StatusOK, pausedPeriods) - } else { - c.IndentedJSON(http.StatusOK, pausedSchedulers) - } + output, err := handler.GetSchedulerByStatus(status, needTS) + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + c.IndentedJSON(http.StatusOK, output) +} + +// @Tags schedulers +// @Summary List schedulers diagnostic result. +// @Produce json +// @Success 200 {array} string +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /schedulers/diagnostic/{name} [get] +func getDiagnosticResult(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + name := c.Param("name") + result, err := handler.GetDiagnosticResult(name) + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) return - case "disabled": - var disabledSchedulers []string - for _, scheduler := range schedulers { - disabled, err := sc.IsSchedulerDisabled(scheduler) - if err != nil { - c.String(http.StatusInternalServerError, err.Error()) - return - } - - if disabled { - disabledSchedulers = append(disabledSchedulers, scheduler) - } - } - c.IndentedJSON(http.StatusOK, disabledSchedulers) - default: - c.IndentedJSON(http.StatusOK, schedulers) } + c.IndentedJSON(http.StatusOK, result) } diff --git a/pkg/schedule/handler/handler.go b/pkg/schedule/handler/handler.go index d9c162ac1cc6..d34b4c49d9b9 100644 --- a/pkg/schedule/handler/handler.go +++ b/pkg/schedule/handler/handler.go @@ -33,7 +33,9 @@ import ( "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/schedule/scatter" + "github.com/tikv/pd/pkg/schedule/schedulers" "github.com/tikv/pd/pkg/utils/typeutil" + "go.uber.org/zap" ) // Server is the interface for handler about schedule. @@ -720,3 +722,151 @@ func parseStoreIDsAndPeerRole(ids interface{}, roles interface{}) (map[uint64]pl } return storeIDToPeerRole, true } + +// GetCheckerStatus returns the status of the checker. +func (h *Handler) GetCheckerStatus(name string) (map[string]bool, error) { + co := h.GetCoordinator() + if co == nil { + return nil, errs.ErrNotBootstrapped.GenWithStackByArgs() + } + isPaused, err := co.IsCheckerPaused(name) + if err != nil { + return nil, err + } + return map[string]bool{ + "paused": isPaused, + }, nil +} + +// GetSchedulerNames returns all names of schedulers. +func (h *Handler) GetSchedulerNames() ([]string, error) { + co := h.GetCoordinator() + if co == nil { + return nil, errs.ErrNotBootstrapped.GenWithStackByArgs() + } + return co.GetSchedulersController().GetSchedulerNames(), nil +} + +type schedulerPausedPeriod struct { + Name string `json:"name"` + PausedAt time.Time `json:"paused_at"` + ResumeAt time.Time `json:"resume_at"` +} + +func (h *Handler) GetSchedulerByStatus(status string, needTS bool) (interface{}, error) { + co := h.GetCoordinator() + if co == nil { + return nil, errs.ErrNotBootstrapped.GenWithStackByArgs() + } + sc := co.GetSchedulersController() + schedulers := sc.GetSchedulerNames() + switch status { + case "paused": + var pausedSchedulers []string + pausedPeriods := []schedulerPausedPeriod{} + for _, scheduler := range schedulers { + paused, err := sc.IsSchedulerPaused(scheduler) + if err != nil { + return nil, err + } + if paused { + if needTS { + s := schedulerPausedPeriod{ + Name: scheduler, + PausedAt: time.Time{}, + ResumeAt: time.Time{}, + } + pausedAt, err := sc.GetPausedSchedulerDelayAt(scheduler) + if err != nil { + return nil, err + } + s.PausedAt = time.Unix(pausedAt, 0) + resumeAt, err := sc.GetPausedSchedulerDelayUntil(scheduler) + if err != nil { + return nil, err + } + s.ResumeAt = time.Unix(resumeAt, 0) + pausedPeriods = append(pausedPeriods, s) + } else { + pausedSchedulers = append(pausedSchedulers, scheduler) + } + } + } + if needTS { + return pausedPeriods, nil + } else { + return pausedSchedulers, nil + } + case "disabled": + var disabledSchedulers []string + for _, scheduler := range schedulers { + disabled, err := sc.IsSchedulerDisabled(scheduler) + if err != nil { + return nil, err + } + if disabled { + disabledSchedulers = append(disabledSchedulers, scheduler) + } + } + return disabledSchedulers, nil + default: + return schedulers, nil + } +} + +func (h *Handler) GetDiagnosticResult(name string) (*schedulers.DiagnosticResult, error) { + if _, ok := schedulers.DiagnosableSummaryFunc[name]; !ok { + return nil, errs.ErrSchedulerUndiagnosable.FastGenByArgs(name) + } + co := h.GetCoordinator() + if co == nil { + return nil, errs.ErrNotBootstrapped.GenWithStackByArgs() + } + result, err := co.GetDiagnosticResult(name) + if err != nil { + return nil, err + } + return result, nil +} + +// PauseOrResumeScheduler pauses a scheduler for delay seconds or resume a paused scheduler. +// t == 0 : resume scheduler. +// t > 0 : scheduler delays t seconds. +func (h *Handler) PauseOrResumeScheduler(name string, t int64) (err error) { + co := h.GetCoordinator() + if co == nil { + return errs.ErrNotBootstrapped.GenWithStackByArgs() + } + if err = co.GetSchedulersController().PauseOrResumeScheduler(name, t); err != nil { + if t == 0 { + log.Error("can not resume scheduler", zap.String("scheduler-name", name), errs.ZapError(err)) + } else { + log.Error("can not pause scheduler", zap.String("scheduler-name", name), errs.ZapError(err)) + } + } else { + if t == 0 { + log.Info("resume scheduler successfully", zap.String("scheduler-name", name)) + } else { + log.Info("pause scheduler successfully", zap.String("scheduler-name", name), zap.Int64("pause-seconds", t)) + } + } + return err +} + +// PauseOrResumeChecker pauses checker for delay seconds or resume checker +// t == 0 : resume checker. +// t > 0 : checker delays t seconds. +func (h *Handler) PauseOrResumeChecker(name string, t int64) (err error) { + co := h.GetCoordinator() + if co == nil { + return errs.ErrNotBootstrapped.GenWithStackByArgs() + } + if err = co.PauseOrResumeChecker(name, t); err != nil { + if t == 0 { + log.Error("can not resume checker", zap.String("checker-name", name), errs.ZapError(err)) + } else { + log.Error("can not pause checker", zap.String("checker-name", name), errs.ZapError(err)) + } + } + return err +} diff --git a/pkg/utils/apiutil/serverapi/middleware.go b/pkg/utils/apiutil/serverapi/middleware.go index 063ad042dbba..566d52972390 100644 --- a/pkg/utils/apiutil/serverapi/middleware.go +++ b/pkg/utils/apiutil/serverapi/middleware.go @@ -112,6 +112,9 @@ func (h *redirector) matchMicroServiceRedirectRules(r *http.Request) (bool, stri if len(h.microserviceRedirectRules) == 0 { return false, "" } + // Remove trailing '/' from the URL path + // It will be helpful when matching the redirect rules "schedulers" or "schedulers/{name}" + r.URL.Path = strings.TrimRight(r.URL.Path, "/") for _, rule := range h.microserviceRedirectRules { if strings.HasPrefix(r.URL.Path, rule.matchPath) && slice.Contains(rule.matchMethods, r.Method) { addr, ok := h.s.GetServicePrimaryAddr(r.Context(), rule.targetServiceName) @@ -131,6 +134,8 @@ func (h *redirector) matchMicroServiceRedirectRules(r *http.Request) (bool, stri } else { r.URL.Path = rule.targetPath } + log.Info("redirect to micro service", zap.String("path", r.URL.Path), zap.String("target", addr), + zap.String("method", r.Method)) return true, addr } } diff --git a/server/api/checker.go b/server/api/checker.go index 09dc81366b90..709e641c37b4 100644 --- a/server/api/checker.go +++ b/server/api/checker.go @@ -83,13 +83,10 @@ func (c *checkerHandler) PauseOrResumeChecker(w http.ResponseWriter, r *http.Req // @Router /checker/{name} [get] func (c *checkerHandler) GetCheckerStatus(w http.ResponseWriter, r *http.Request) { name := mux.Vars(r)["name"] - isPaused, err := c.IsCheckerPaused(name) + output, err := c.Handler.GetCheckerStatus(name) if err != nil { c.r.JSON(w, http.StatusInternalServerError, err.Error()) return } - output := map[string]bool{ - "paused": isPaused, - } c.r.JSON(w, http.StatusOK, output) } diff --git a/server/api/diagnostic.go b/server/api/diagnostic.go index f83f9c83efb3..1a05b0d83b8d 100644 --- a/server/api/diagnostic.go +++ b/server/api/diagnostic.go @@ -18,32 +18,27 @@ import ( "net/http" "github.com/gorilla/mux" - "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/schedule/schedulers" "github.com/tikv/pd/server" "github.com/unrolled/render" ) type diagnosticHandler struct { - svr *server.Server - rd *render.Render + handler *server.Handler + svr *server.Server + rd *render.Render } func newDiagnosticHandler(svr *server.Server, rd *render.Render) *diagnosticHandler { return &diagnosticHandler{ - svr: svr, - rd: rd, + handler: svr.GetHandler(), + svr: svr, + rd: rd, } } func (h *diagnosticHandler) GetDiagnosticResult(w http.ResponseWriter, r *http.Request) { name := mux.Vars(r)["name"] - if _, ok := schedulers.DiagnosableSummaryFunc[name]; !ok { - h.rd.JSON(w, http.StatusBadRequest, errs.ErrSchedulerUndiagnosable.FastGenByArgs(name).Error()) - return - } - rc := getCluster(r) - result, err := rc.GetCoordinator().GetDiagnosticResult(name) + result, err := h.handler.GetDiagnosticResult(name) if err != nil { h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return diff --git a/server/api/scheduler.go b/server/api/scheduler.go index c2691ea98269..dea798edb390 100644 --- a/server/api/scheduler.go +++ b/server/api/scheduler.go @@ -18,7 +18,6 @@ import ( "net/http" "net/url" "strings" - "time" "github.com/gorilla/mux" "github.com/pingcap/errors" @@ -43,12 +42,6 @@ func newSchedulerHandler(svr *server.Server, r *render.Render) *schedulerHandler } } -type schedulerPausedPeriod struct { - Name string `json:"name"` - PausedAt time.Time `json:"paused_at"` - ResumeAt time.Time `json:"resume_at"` -} - // @Tags scheduler // @Summary List all created schedulers by status. // @Produce json @@ -56,73 +49,14 @@ type schedulerPausedPeriod struct { // @Failure 500 {string} string "PD server failed to proceed the request." // @Router /schedulers [get] func (h *schedulerHandler) GetSchedulers(w http.ResponseWriter, r *http.Request) { - schedulers, err := h.Handler.GetSchedulers() + status := r.URL.Query().Get("status") + _, needTS := r.URL.Query()["timestamp"] + output, err := h.Handler.GetSchedulerByStatus(status, needTS) if err != nil { h.r.JSON(w, http.StatusInternalServerError, err.Error()) return } - - status := r.URL.Query().Get("status") - _, tsFlag := r.URL.Query()["timestamp"] - switch status { - case "paused": - var pausedSchedulers []string - pausedPeriods := []schedulerPausedPeriod{} - for _, scheduler := range schedulers { - paused, err := h.Handler.IsSchedulerPaused(scheduler) - if err != nil { - h.r.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - - if paused { - if tsFlag { - s := schedulerPausedPeriod{ - Name: scheduler, - PausedAt: time.Time{}, - ResumeAt: time.Time{}, - } - pausedAt, err := h.Handler.GetPausedSchedulerDelayAt(scheduler) - if err != nil { - h.r.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - s.PausedAt = time.Unix(pausedAt, 0) - resumeAt, err := h.Handler.GetPausedSchedulerDelayUntil(scheduler) - if err != nil { - h.r.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - s.ResumeAt = time.Unix(resumeAt, 0) - pausedPeriods = append(pausedPeriods, s) - } else { - pausedSchedulers = append(pausedSchedulers, scheduler) - } - } - } - if tsFlag { - h.r.JSON(w, http.StatusOK, pausedPeriods) - } else { - h.r.JSON(w, http.StatusOK, pausedSchedulers) - } - return - case "disabled": - var disabledSchedulers []string - for _, scheduler := range schedulers { - disabled, err := h.Handler.IsSchedulerDisabled(scheduler) - if err != nil { - h.r.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - - if disabled { - disabledSchedulers = append(disabledSchedulers, scheduler) - } - } - h.r.JSON(w, http.StatusOK, disabledSchedulers) - default: - h.r.JSON(w, http.StatusOK, schedulers) - } + h.r.JSON(w, http.StatusOK, output) } // FIXME: details of input json body params diff --git a/server/api/scheduler_test.go b/server/api/scheduler_test.go index b015bbe8f524..978bc0423987 100644 --- a/server/api/scheduler_test.go +++ b/server/api/scheduler_test.go @@ -481,12 +481,15 @@ func (suite *scheduleTestSuite) TestAPI() { err = tu.CheckPostJSON(testDialClient, suite.urlPrefix+"/all", pauseArgs, tu.StatusOK(re)) suite.NoError(err) handler := suite.svr.GetHandler() + rc, err := handler.GetRaftCluster() + suite.NoError(err) + sc := rc.GetCoordinator().GetSchedulersController() for _, testCase := range testCases { createdName := testCase.createdName if createdName == "" { createdName = testCase.name } - isPaused, err := handler.IsSchedulerPaused(createdName) + isPaused, err := sc.IsSchedulerPaused(createdName) suite.NoError(err) suite.True(isPaused) } @@ -501,7 +504,7 @@ func (suite *scheduleTestSuite) TestAPI() { if createdName == "" { createdName = testCase.name } - isPaused, err := handler.IsSchedulerPaused(createdName) + isPaused, err := sc.IsSchedulerPaused(createdName) suite.NoError(err) suite.False(isPaused) } @@ -522,7 +525,7 @@ func (suite *scheduleTestSuite) TestAPI() { if createdName == "" { createdName = testCase.name } - isPaused, err := handler.IsSchedulerPaused(createdName) + isPaused, err := sc.IsSchedulerPaused(createdName) suite.NoError(err) suite.False(isPaused) } @@ -598,9 +601,12 @@ func (suite *scheduleTestSuite) testPauseOrResume(name, createdName string, body err := tu.CheckPostJSON(testDialClient, suite.urlPrefix, body, tu.StatusOK(re)) suite.NoError(err) handler := suite.svr.GetHandler() - sches, err := handler.GetSchedulers() + sches, err := handler.GetSchedulerNames() suite.NoError(err) suite.Equal(createdName, sches[0]) + rc, err := handler.GetRaftCluster() + suite.NoError(err) + sc := rc.GetCoordinator().GetSchedulersController() // test pause. input := make(map[string]interface{}) @@ -609,7 +615,7 @@ func (suite *scheduleTestSuite) testPauseOrResume(name, createdName string, body suite.NoError(err) err = tu.CheckPostJSON(testDialClient, suite.urlPrefix+"/"+createdName, pauseArgs, tu.StatusOK(re)) suite.NoError(err) - isPaused, err := handler.IsSchedulerPaused(createdName) + isPaused, err := sc.IsSchedulerPaused(createdName) suite.NoError(err) suite.True(isPaused) input["delay"] = 1 @@ -617,13 +623,13 @@ func (suite *scheduleTestSuite) testPauseOrResume(name, createdName string, body suite.NoError(err) err = tu.CheckPostJSON(testDialClient, suite.urlPrefix+"/"+createdName, pauseArgs, tu.StatusOK(re)) suite.NoError(err) - pausedAt, err := handler.GetPausedSchedulerDelayAt(createdName) + pausedAt, err := rc.GetPausedSchedulerDelayAt(createdName) suite.NoError(err) - resumeAt, err := handler.GetPausedSchedulerDelayUntil(createdName) + resumeAt, err := rc.GetPausedSchedulerDelayUntil(createdName) suite.NoError(err) suite.Equal(int64(1), resumeAt-pausedAt) time.Sleep(time.Second) - isPaused, err = handler.IsSchedulerPaused(createdName) + isPaused, err = sc.IsSchedulerPaused(createdName) suite.NoError(err) suite.False(isPaused) @@ -639,7 +645,7 @@ func (suite *scheduleTestSuite) testPauseOrResume(name, createdName string, body suite.NoError(err) err = tu.CheckPostJSON(testDialClient, suite.urlPrefix+"/"+createdName, pauseArgs, tu.StatusOK(re)) suite.NoError(err) - isPaused, err = handler.IsSchedulerPaused(createdName) + isPaused, err = sc.IsSchedulerPaused(createdName) suite.NoError(err) suite.False(isPaused) } diff --git a/server/api/server.go b/server/api/server.go index 0094d8eb5dd7..23fc35aa8d94 100644 --- a/server/api/server.go +++ b/server/api/server.go @@ -39,6 +39,22 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP prefix := apiPrefix + "/api/v1" r := createRouter(apiPrefix, svr) router := mux.NewRouter() + + // Need to redirect the following requests: + // "/admin/reset-ts", http.MethodPost + // "/operators", http.MethodGet + // "/operators", http.MethodPost + // "/operators/records",http.MethodGet + // "/operators/{region_id}", http.MethodGet + // "/operators/{region_id}", http.MethodDelete + // "/checker/{name}", http.MethodPost + // "/checker/{name}", http.MethodGet + // "/schedulers", http.MethodGet + // "/schedulers/{name}", http.MethodPost + // "/schedulers/diagnostic/{name}", http.MethodGet + // Note: following requests are not redirected: + // "/schedulers", http.MethodPost + // "/schedulers/{name}", http.MethodDelete router.PathPrefix(apiPrefix).Handler(negroni.New( serverapi.NewRuntimeServiceValidator(svr, group), serverapi.NewRedirector(svr, @@ -52,18 +68,23 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP scheapi.APIPathPrefix+"/operators", mcs.SchedulingServiceName, []string{http.MethodPost, http.MethodGet, http.MethodDelete}), - // because the writing of all the meta information of the scheduling service is in the API server, - // we only forward read-only requests about checkers and schedulers to the scheduling service. serverapi.MicroserviceRedirectRule( prefix+"/checker", // Note: this is a typo in the original code scheapi.APIPathPrefix+"/checkers", mcs.SchedulingServiceName, - []string{http.MethodGet}), + []string{http.MethodPost, http.MethodGet}), + // because the writing of all the meta information of the scheduling service is in the API server, + // we should not post and delete the scheduler directly in the scheduling service. serverapi.MicroserviceRedirectRule( prefix+"/schedulers", scheapi.APIPathPrefix+"/schedulers", mcs.SchedulingServiceName, []string{http.MethodGet}), + serverapi.MicroserviceRedirectRule( + prefix+"/schedulers/", // Note: this means "/schedulers/{name}" + scheapi.APIPathPrefix+"/schedulers", + mcs.SchedulingServiceName, + []string{http.MethodPost}), // TODO: we need to consider the case that v1 api not support restful api. // we might change the previous path parameters to query parameters. ), diff --git a/server/handler.go b/server/handler.go index ace7592cd7c5..7fb74d5c7153 100644 --- a/server/handler.go +++ b/server/handler.go @@ -92,24 +92,6 @@ func (h *Handler) GetRaftCluster() (*cluster.RaftCluster, error) { return rc, nil } -// IsSchedulerPaused returns whether scheduler is paused. -func (h *Handler) IsSchedulerPaused(name string) (bool, error) { - rc, err := h.GetRaftCluster() - if err != nil { - return false, err - } - return rc.GetCoordinator().GetSchedulersController().IsSchedulerPaused(name) -} - -// IsSchedulerDisabled returns whether scheduler is disabled. -func (h *Handler) IsSchedulerDisabled(name string) (bool, error) { - rc, err := h.GetRaftCluster() - if err != nil { - return false, err - } - return rc.GetCoordinator().GetSchedulersController().IsSchedulerDisabled(name) -} - // IsSchedulerExisted returns whether scheduler is existed. func (h *Handler) IsSchedulerExisted(name string) (bool, error) { rc, err := h.GetRaftCluster() @@ -124,24 +106,6 @@ func (h *Handler) GetScheduleConfig() *sc.ScheduleConfig { return h.s.GetScheduleConfig() } -// GetSchedulers returns all names of schedulers. -func (h *Handler) GetSchedulers() ([]string, error) { - c, err := h.GetRaftCluster() - if err != nil { - return nil, err - } - return c.GetSchedulers(), nil -} - -// IsCheckerPaused returns if checker is paused -func (h *Handler) IsCheckerPaused(name string) (bool, error) { - rc, err := h.GetRaftCluster() - if err != nil { - return false, err - } - return rc.GetCoordinator().IsCheckerPaused(name) -} - // GetStores returns all stores in the cluster. func (h *Handler) GetStores() ([]*core.StoreInfo, error) { rc := h.s.GetRaftCluster() @@ -269,48 +233,6 @@ func (h *Handler) RemoveScheduler(name string) error { return err } -// PauseOrResumeScheduler pauses a scheduler for delay seconds or resume a paused scheduler. -// t == 0 : resume scheduler. -// t > 0 : scheduler delays t seconds. -func (h *Handler) PauseOrResumeScheduler(name string, t int64) error { - c, err := h.GetRaftCluster() - if err != nil { - return err - } - if err = c.PauseOrResumeScheduler(name, t); err != nil { - if t == 0 { - log.Error("can not resume scheduler", zap.String("scheduler-name", name), errs.ZapError(err)) - } else { - log.Error("can not pause scheduler", zap.String("scheduler-name", name), errs.ZapError(err)) - } - } else { - if t == 0 { - log.Info("resume scheduler successfully", zap.String("scheduler-name", name)) - } else { - log.Info("pause scheduler successfully", zap.String("scheduler-name", name), zap.Int64("pause-seconds", t)) - } - } - return err -} - -// PauseOrResumeChecker pauses checker for delay seconds or resume checker -// t == 0 : resume checker. -// t > 0 : checker delays t seconds. -func (h *Handler) PauseOrResumeChecker(name string, t int64) error { - c, err := h.GetRaftCluster() - if err != nil { - return err - } - if err = c.PauseOrResumeChecker(name, t); err != nil { - if t == 0 { - log.Error("can not resume checker", zap.String("checker-name", name), errs.ZapError(err)) - } else { - log.Error("can not pause checker", zap.String("checker-name", name), errs.ZapError(err)) - } - } - return err -} - // AddBalanceLeaderScheduler adds a balance-leader-scheduler. func (h *Handler) AddBalanceLeaderScheduler() error { return h.AddScheduler(schedulers.BalanceLeaderType) @@ -680,21 +602,3 @@ func (h *Handler) AddEvictOrGrant(storeID float64, name string) error { } return nil } - -// GetPausedSchedulerDelayAt returns paused unix timestamp when a scheduler is paused -func (h *Handler) GetPausedSchedulerDelayAt(name string) (int64, error) { - rc, err := h.GetRaftCluster() - if err != nil { - return -1, err - } - return rc.GetPausedSchedulerDelayAt(name) -} - -// GetPausedSchedulerDelayUntil returns resume unix timestamp when a scheduler is paused -func (h *Handler) GetPausedSchedulerDelayUntil(name string) (int64, error) { - rc, err := h.GetRaftCluster() - if err != nil { - return -1, err - } - return rc.GetPausedSchedulerDelayUntil(name) -} diff --git a/tests/integrations/mcs/scheduling/api_test.go b/tests/integrations/mcs/scheduling/api_test.go index e91d3cd633e0..1e5f9baf1bd5 100644 --- a/tests/integrations/mcs/scheduling/api_test.go +++ b/tests/integrations/mcs/scheduling/api_test.go @@ -112,16 +112,16 @@ func (suite *apiTestSuite) TestGetCheckerByName() { func (suite *apiTestSuite) TestAPIForward() { re := suite.Require() + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/apiutil/serverapi/checkHeader", "return(true)")) + defer func() { + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/apiutil/serverapi/checkHeader")) + }() + tc, err := tests.NewTestSchedulingCluster(suite.ctx, 2, suite.backendEndpoints) re.NoError(err) defer tc.Destroy() tc.WaitForPrimaryServing(re) - failpoint.Enable("github.com/tikv/pd/pkg/utils/apiutil/serverapi/checkHeader", "return(true)") - defer func() { - failpoint.Disable("github.com/tikv/pd/pkg/utils/apiutil/serverapi/checkHeader") - }() - urlPrefix := fmt.Sprintf("%s/pd/api/v1", suite.backendEndpoints) var slice []string var resp map[string]interface{} @@ -148,7 +148,7 @@ func (suite *apiTestSuite) TestAPIForward() { testutil.StatusNotOK(re), testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) re.NoError(err) - // Test checker: only read-only requests are forwarded + // Test checker: err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "checker/merge"), &resp, testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) re.NoError(err) @@ -159,10 +159,17 @@ func (suite *apiTestSuite) TestAPIForward() { pauseArgs, err := json.Marshal(input) suite.NoError(err) err = testutil.CheckPostJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "checker/merge"), pauseArgs, - testutil.StatusOK(re), testutil.WithoutHeader(re, apiutil.PDRedirectorHeader)) + testutil.StatusOK(re), testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) suite.NoError(err) - // Test scheduler: only read-only requests are forwarded + // Test scheduler: + // Need to redirect: + // "/schedulers", http.MethodGet + // "/schedulers/{name}", http.MethodPost + // "/schedulers/diagnostic/{name}", http.MethodGet + // Should not redirect: + // "/schedulers", http.MethodPost + // "/schedulers/{name}", http.MethodDelete err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "schedulers"), &slice, testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) re.NoError(err) @@ -171,7 +178,19 @@ func (suite *apiTestSuite) TestAPIForward() { input["delay"] = 30 pauseArgs, err = json.Marshal(input) suite.NoError(err) - err = testutil.CheckPostJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "schedulers/all"), pauseArgs, - testutil.StatusOK(re), testutil.WithoutHeader(re, apiutil.ForwardToMicroServiceHeader)) + err = testutil.CheckPostJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "schedulers/balance-leader-scheduler"), pauseArgs, + testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + suite.NoError(err) + + err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "schedulers/diagnostic/balance-leader-scheduler"), &resp, + testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) suite.NoError(err) + + err = testutil.CheckPostJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "schedulers"), pauseArgs, + testutil.WithoutHeader(re, apiutil.ForwardToMicroServiceHeader)) + re.NoError(err) + + err = testutil.CheckDelete(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "schedulers/balance-leader-scheduler"), + testutil.WithoutHeader(re, apiutil.ForwardToMicroServiceHeader)) + re.NoError(err) } diff --git a/tests/integrations/mcs/tso/api_test.go b/tests/integrations/mcs/tso/api_test.go index 7e870fbc1989..81cc798851fc 100644 --- a/tests/integrations/mcs/tso/api_test.go +++ b/tests/integrations/mcs/tso/api_test.go @@ -30,6 +30,7 @@ import ( apis "github.com/tikv/pd/pkg/mcs/tso/server/apis/v1" mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/server/config" "github.com/tikv/pd/tests" @@ -100,6 +101,11 @@ func (suite *tsoAPITestSuite) TestGetKeyspaceGroupMembers() { func (suite *tsoAPITestSuite) TestForwardResetTS() { re := suite.Require() + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/apiutil/serverapi/checkHeader", "return(true)")) + defer func() { + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/apiutil/serverapi/checkHeader")) + }() + primary := suite.tsoCluster.WaitForDefaultPrimaryServing(re) re.NotNil(primary) url := suite.backendEndpoints + "/pd/api/v1/admin/reset-ts" @@ -107,13 +113,13 @@ func (suite *tsoAPITestSuite) TestForwardResetTS() { // Test reset ts input := []byte(`{"tso":"121312", "force-use-larger":true}`) err := testutil.CheckPostJSON(dialClient, url, input, - testutil.StatusOK(re), testutil.StringContain(re, "Reset ts successfully")) + testutil.StatusOK(re), testutil.StringContain(re, "Reset ts successfully"), testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) suite.NoError(err) // Test reset ts with invalid tso input = []byte(`{}`) err = testutil.CheckPostJSON(dialClient, url, input, - testutil.StatusNotOK(re), testutil.StringContain(re, "invalid tso value")) + testutil.StatusNotOK(re), testutil.StringContain(re, "invalid tso value"), testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) re.NoError(err) }