Skip to content

Commit

Permalink
add http support for scheduling service
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Aug 15, 2023
1 parent 67a51b7 commit cacdb34
Show file tree
Hide file tree
Showing 8 changed files with 435 additions and 19 deletions.
199 changes: 198 additions & 1 deletion pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ package apis

import (
"net/http"
"strconv"
"sync"
"time"

"github.com/gin-contrib/cors"
"github.com/gin-contrib/gzip"
Expand All @@ -25,6 +27,7 @@ import (
"github.com/joho/godotenv"
scheserver "github.com/tikv/pd/pkg/mcs/scheduling/server"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi"
"github.com/unrolled/render"
Expand Down Expand Up @@ -77,7 +80,7 @@ func NewService(srv *scheserver.Service) *Service {
apiHandlerEngine.Use(cors.Default())
apiHandlerEngine.Use(gzip.Gzip(gzip.DefaultCompression))
apiHandlerEngine.Use(func(c *gin.Context) {
c.Set(multiservicesapi.ServiceContextKey, srv)
c.Set(multiservicesapi.ServiceContextKey, srv.Server)
c.Next()
})
apiHandlerEngine.Use(multiservicesapi.ServiceRedirector())
Expand All @@ -90,5 +93,199 @@ func NewService(srv *scheserver.Service) *Service {
root: root,
rd: createIndentRender(),
}
s.RegisterOperatorsRouter()
s.RegisterSchedulersRouter()
s.RegisterCheckersRouter()
return s
}

// RegisterSchedulersRouter registers the router of the schedulers handler.
func (s *Service) RegisterSchedulersRouter() {
router := s.root.Group("schedulers")
router.GET("", getSchedulers)
}

// RegisterCheckersRouter registers the router of the checkers handler.
func (s *Service) RegisterCheckersRouter() {
router := s.root.Group("checkers")
router.GET("/:name", getCheckerByName)
}

// RegisterOperatorsRouter registers the router of the operators handler.
func (s *Service) RegisterOperatorsRouter() {
router := s.root.Group("operators")
router.GET("", getOperators)
router.GET("/:id", getOperatorByID)
}

// @Tags operators
// @Summary Get a Region's pending operator.
// @Param region_id path int true "A Region's Id"
// @Produce json
// @Success 200 {object} operator.OpWithStatus
// @Failure 400 {string} string "The input is invalid."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /operators/{id} [GET]
func getOperatorByID(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
id := c.Param("id")

regionID, err := strconv.ParseUint(id, 10, 64)
if err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}

opController := svr.GetCoordinator().GetOperatorController()
if opController == nil {
c.String(http.StatusInternalServerError, err.Error())
return
}

c.JSON(http.StatusOK, opController.GetOperatorStatus(regionID))
}

// @Tags operators
// @Summary List pending operators.
// @Param kind query string false "Specify the operator kind." Enums(admin, leader, region)
// @Produce json
// @Success 200 {array} operator.Operator
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /operators [GET]
func getOperators(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
var (
results []*operator.Operator
ops []*operator.Operator
err error
)

opController := svr.GetCoordinator().GetOperatorController()
if opController == nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
kinds := c.QueryArray("kind")
if len(kinds) == 0 {
results = opController.GetOperators()
} else {
for _, kind := range kinds {
switch kind {
case "admin":
ops = opController.GetOperatorsOfKind(operator.OpAdmin)
case "leader":
ops = opController.GetOperatorsOfKind(operator.OpLeader)
case "region":
ops = opController.GetOperatorsOfKind(operator.OpRegion)
case "waiting":
ops = opController.GetWaitingOperators()
}
results = append(results, ops...)
}
}

c.JSON(http.StatusOK, results)
}

// @Tags checkers
// @Summary Get if checker is paused
// @Param name path string true "The name of the scheduler."
// @Produce json
// @Success 200 {string} string "Pause or resume the scheduler successfully."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /checkers/{name} [get]
func getCheckerByName(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
name := c.Param("name")
co := svr.GetCoordinator()
isPaused, err := co.IsCheckerPaused(name)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
output := map[string]bool{
"paused": isPaused,
}
c.JSON(http.StatusOK, output)
}

type schedulerPausedPeriod struct {
Name string `json:"name"`
PausedAt time.Time `json:"paused_at"`
ResumeAt time.Time `json:"resume_at"`
}

// @Tags schedulers
// @Summary List all created schedulers by status.
// @Produce json
// @Success 200 {array} string
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /schedulers [get]
func getSchedulers(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
co := svr.GetCoordinator()
sc := co.GetSchedulersController()
schedulers := sc.GetSchedulerNames()

status := c.Query("status")
_, needTS := c.GetQuery("timestamp")
switch status {
case "paused":
var pausedSchedulers []string
pausedPeriods := []schedulerPausedPeriod{}
for _, scheduler := range schedulers {
paused, err := sc.IsSchedulerPaused(scheduler)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}

if paused {
if needTS {
s := schedulerPausedPeriod{
Name: scheduler,
PausedAt: time.Time{},
ResumeAt: time.Time{},
}
pausedAt, err := sc.GetPausedSchedulerDelayAt(scheduler)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
s.PausedAt = time.Unix(pausedAt, 0)
resumeAt, err := sc.GetPausedSchedulerDelayUntil(scheduler)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
s.ResumeAt = time.Unix(resumeAt, 0)
pausedPeriods = append(pausedPeriods, s)
} else {
pausedSchedulers = append(pausedSchedulers, scheduler)
}
}
}
if needTS {
c.JSON(http.StatusOK, pausedPeriods)
} else {
c.JSON(http.StatusOK, pausedSchedulers)
}
return
case "disabled":
var disabledSchedulers []string
for _, scheduler := range schedulers {
disabled, err := sc.IsSchedulerDisabled(scheduler)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}

if disabled {
disabledSchedulers = append(disabledSchedulers, scheduler)
}
}
c.JSON(http.StatusOK, disabledSchedulers)
default:
c.JSON(http.StatusOK, schedulers)
}
}
5 changes: 3 additions & 2 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,9 @@ func (c *Cluster) GetStoreConfig() sc.StoreConfigProvider { return c.persistConf

// TODO: implement the following methods

// UpdateRegionsLabelLevelStats updates the region label level stats.
func (c *Cluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) {}
// UpdateRegionsLabelLevelStats updates the status of the region label level by types.
func (c *Cluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) {
}

// AllocID allocates a new ID.
func (c *Cluster) AllocID() (uint64, error) { return 0, nil }
7 changes: 6 additions & 1 deletion pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,11 @@ func (s *Server) GetTLSConfig() *grpcutil.TLSConfig {
return &s.cfg.Security.TLSConfig
}

// GetCoordinator returns the coordinator.
func (s *Server) GetCoordinator() *schedule.Coordinator {
return s.coordinator
}

func (s *Server) initClient() error {
tlsConfig, err := s.cfg.Security.ToTLSConfig()
if err != nil {
Expand Down Expand Up @@ -501,6 +506,7 @@ func (s *Server) startServer() (err error) {
if err != nil {
return err
}
s.service = &Service{Server: s}
tlsConfig, err := s.cfg.Security.ToTLSConfig()
if err != nil {
return err
Expand Down Expand Up @@ -543,7 +549,6 @@ func (s *Server) startServer() (err error) {
log.Error("failed to register the service", zap.String("service-name", utils.SchedulingServiceName), errs.ZapError(err))
return err
}

atomic.StoreInt64(&s.isRunning, 1)
return nil
}
Expand Down
78 changes: 78 additions & 0 deletions pkg/mcs/scheduling/server/testutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
"context"
"os"

"github.com/pingcap/log"
"github.com/spf13/pflag"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/mcs/scheduling/server/config"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/testutil"
)

// NewTestServer creates a resource manager server for testing.
func NewTestServer(ctx context.Context, re *require.Assertions, cfg *config.Config) (*Server, testutil.CleanupFunc, error) {
// New zap logger
err := logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog)
re.NoError(err)
log.ReplaceGlobals(cfg.Logger, cfg.LogProps)
// Flushing any buffered log entries
defer log.Sync()

s := CreateServer(ctx, cfg)
if err = s.Run(); err != nil {
return nil, nil, err
}

cleanup := func() {
s.Close()
os.RemoveAll(cfg.DataDir)
}
return s, cleanup, nil
}

// GenerateConfig generates a new config with the given options.
func GenerateConfig(c *config.Config) (*config.Config, error) {
arguments := []string{
"--listen-addr=" + c.ListenAddr,
"--advertise-listen-addr=" + c.AdvertiseListenAddr,
"--backend-endpoints=" + c.BackendEndpoints,
}

flagSet := pflag.NewFlagSet("test", pflag.ContinueOnError)
flagSet.BoolP("version", "V", false, "print version information and exit")
flagSet.StringP("config", "", "", "config file")
flagSet.StringP("backend-endpoints", "", "", "url for etcd client")
flagSet.StringP("listen-addr", "", "", "listen address for tso service")
flagSet.StringP("advertise-listen-addr", "", "", "advertise urls for listen address (default '${listen-addr}')")
flagSet.StringP("cacert", "", "", "path of file that contains list of trusted TLS CAs")
flagSet.StringP("cert", "", "", "path of file that contains X509 certificate in PEM format")
flagSet.StringP("key", "", "", "path of file that contains X509 key in PEM format")
err := flagSet.Parse(arguments)
if err != nil {
return nil, err
}
cfg := config.NewConfig()
err = cfg.Parse(flagSet)
if err != nil {
return nil, err
}

return cfg, nil
}
15 changes: 15 additions & 0 deletions pkg/schedule/operator/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,21 @@ func (oc *Controller) GetWaitingOperators() []*Operator {
return oc.wop.ListOperator()
}

// GetOperatorsOfKind returns the running operators of the kind.
func (oc *Controller) GetOperatorsOfKind(mask OpKind) []*Operator {
oc.RLock()
defer oc.RUnlock()

operators := make([]*Operator, 0, len(oc.operators))
for _, op := range oc.operators {
if op.Kind()&mask != 0 {
operators = append(operators, op)
}
}

return operators
}

// SendScheduleCommand sends a command to the region.
func (oc *Controller) SendScheduleCommand(region *core.RegionInfo, step OpStep, source string) {
log.Info("send schedule command",
Expand Down
Loading

0 comments on commit cacdb34

Please sign in to comment.