Skip to content

Commit

Permalink
mcs: make scheduling server support operator http interface (tikv#7090)
Browse files Browse the repository at this point in the history
ref tikv#5839

Signed-off-by: lhy1024 <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
lhy1024 and ti-chi-bot[bot] committed Sep 27, 2023
1 parent 6752974 commit 7a9e566
Show file tree
Hide file tree
Showing 13 changed files with 730 additions and 509 deletions.
138 changes: 111 additions & 27 deletions pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ import (
"github.com/joho/godotenv"
scheserver "github.com/tikv/pd/pkg/mcs/scheduling/server"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/schedule"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/handler"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi"
Expand All @@ -35,6 +38,7 @@ import (

// APIPathPrefix is the prefix of the API path.
const APIPathPrefix = "/scheduling/api/v1"
const handlerKey = "handler"

var (
once sync.Once
Expand Down Expand Up @@ -62,6 +66,18 @@ type Service struct {
rd *render.Render
}

type server struct {
server *scheserver.Server
}

func (s *server) GetCoordinator() *schedule.Coordinator {
return s.server.GetCoordinator()
}

func (s *server) GetCluster() sche.SharedCluster {
return s.server.GetCluster()
}

func createIndentRender() *render.Render {
return render.New(render.Options{
IndentJSON: true,
Expand All @@ -81,6 +97,7 @@ func NewService(srv *scheserver.Service) *Service {
apiHandlerEngine.Use(gzip.Gzip(gzip.DefaultCompression))
apiHandlerEngine.Use(func(c *gin.Context) {
c.Set(multiservicesapi.ServiceContextKey, srv.Server)
c.Set(handlerKey, handler.NewHandler(&server{server: srv.Server}))
c.Next()
})
apiHandlerEngine.Use(multiservicesapi.ServiceRedirector())
Expand Down Expand Up @@ -115,7 +132,10 @@ func (s *Service) RegisterCheckersRouter() {
func (s *Service) RegisterOperatorsRouter() {
router := s.root.Group("operators")
router.GET("", getOperators)
router.GET("/:id", getOperatorByID)
router.POST("", createOperator)
router.GET("/:id", getOperatorByRegion)
router.DELETE("/:id", deleteOperatorByRegion)
router.GET("/records", getOperatorRecords)
}

// @Tags operators
Expand All @@ -126,8 +146,8 @@ func (s *Service) RegisterOperatorsRouter() {
// @Failure 400 {string} string "The input is invalid."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /operators/{id} [GET]
func getOperatorByID(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
func getOperatorByRegion(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)
id := c.Param("id")

regionID, err := strconv.ParseUint(id, 10, 64)
Expand All @@ -136,13 +156,13 @@ func getOperatorByID(c *gin.Context) {
return
}

opController := svr.GetCoordinator().GetOperatorController()
if opController == nil {
op, err := handler.GetOperatorStatus(regionID)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}

c.IndentedJSON(http.StatusOK, opController.GetOperatorStatus(regionID))
c.IndentedJSON(http.StatusOK, op)
}

// @Tags operators
Expand All @@ -153,40 +173,104 @@ func getOperatorByID(c *gin.Context) {
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /operators [GET]
func getOperators(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
handler := c.MustGet(handlerKey).(*handler.Handler)
var (
results []*operator.Operator
ops []*operator.Operator
err error
)

opController := svr.GetCoordinator().GetOperatorController()
if opController == nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
kinds := c.QueryArray("kind")
if len(kinds) == 0 {
results = opController.GetOperators()
results, err = handler.GetOperators()
} else {
for _, kind := range kinds {
switch kind {
case "admin":
ops = opController.GetOperatorsOfKind(operator.OpAdmin)
case "leader":
ops = opController.GetOperatorsOfKind(operator.OpLeader)
case "region":
ops = opController.GetOperatorsOfKind(operator.OpRegion)
case "waiting":
ops = opController.GetWaitingOperators()
}
results = append(results, ops...)
}
results, err = handler.GetOperatorsByKinds(kinds)
}

if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
c.IndentedJSON(http.StatusOK, results)
}

// @Tags operator
// @Summary Cancel a Region's pending operator.
// @Param region_id path int true "A Region's Id"
// @Produce json
// @Success 200 {string} string "The pending operator is canceled."
// @Failure 400 {string} string "The input is invalid."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /operators/{region_id} [delete]
func deleteOperatorByRegion(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)
id := c.Param("id")

regionID, err := strconv.ParseUint(id, 10, 64)
if err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}

if err = handler.RemoveOperator(regionID); err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}

c.String(http.StatusOK, "The pending operator is canceled.")
}

// @Tags operator
// @Summary lists the finished operators since the given timestamp in second.
// @Param from query integer false "From Unix timestamp"
// @Produce json
// @Success 200 {object} []operator.OpRecord
// @Failure 400 {string} string "The request is invalid."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /operators/records [get]
func getOperatorRecords(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)
from, err := apiutil.ParseTime(c.Query("from"))
if err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}
records, err := handler.GetRecords(from)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
c.IndentedJSON(http.StatusOK, records)
}

// FIXME: details of input json body params
// @Tags operator
// @Summary Create an operator.
// @Accept json
// @Param body body object true "json params"
// @Produce json
// @Success 200 {string} string "The operator is created."
// @Failure 400 {string} string "The input is invalid."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /operators [post]
func createOperator(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)
var input map[string]interface{}
if err := c.BindJSON(&input); err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}
statusCode, result, err := handler.HandleOperatorCreation(input)
if err != nil {
c.String(statusCode, err.Error())
return
}
if statusCode == http.StatusOK && result == nil {
c.String(http.StatusOK, "The operator is created.")
return
}
c.IndentedJSON(statusCode, result)
}

// @Tags checkers
// @Summary Get checker by name
// @Param name path string true "The name of the checker."
Expand Down
6 changes: 6 additions & 0 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,12 @@ func (s *Server) startWatcher() (err error) {
return err
}

// GetPersistConfig returns the persist config.
// It's used to test.
func (s *Server) GetPersistConfig() *config.PersistConfig {
return s.persistConfig
}

// CreateServer creates the Server
func CreateServer(ctx context.Context, cfg *config.Config) *Server {
svr := &Server{
Expand Down
Loading

0 comments on commit 7a9e566

Please sign in to comment.