Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mcs: make scheduling server support operator http interface #7090

Merged
merged 14 commits into from
Sep 27, 2023
138 changes: 111 additions & 27 deletions pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ import (
"github.com/joho/godotenv"
scheserver "github.com/tikv/pd/pkg/mcs/scheduling/server"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/schedule"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/handler"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi"
Expand All @@ -35,6 +38,7 @@ import (

// APIPathPrefix is the prefix of the API path.
const APIPathPrefix = "/scheduling/api/v1"
const handlerKey = "handler"

var (
once sync.Once
Expand Down Expand Up @@ -62,6 +66,18 @@ type Service struct {
rd *render.Render
}

type server struct {
server *scheserver.Server
}

func (s *server) GetCoordinator() *schedule.Coordinator {
return s.server.GetCoordinator()
}

func (s *server) GetCluster() sche.SharedCluster {
return s.server.GetCluster()
}

func createIndentRender() *render.Render {
return render.New(render.Options{
IndentJSON: true,
Expand All @@ -81,6 +97,7 @@ func NewService(srv *scheserver.Service) *Service {
apiHandlerEngine.Use(gzip.Gzip(gzip.DefaultCompression))
apiHandlerEngine.Use(func(c *gin.Context) {
c.Set(multiservicesapi.ServiceContextKey, srv.Server)
c.Set(handlerKey, handler.NewHandler(&server{server: srv.Server}))
c.Next()
})
apiHandlerEngine.Use(multiservicesapi.ServiceRedirector())
Expand Down Expand Up @@ -115,7 +132,10 @@ func (s *Service) RegisterCheckersRouter() {
func (s *Service) RegisterOperatorsRouter() {
router := s.root.Group("operators")
router.GET("", getOperators)
router.GET("/:id", getOperatorByID)
router.POST("", createOperator)
router.GET("/:id", getOperatorByRegion)
router.DELETE("/:id", deleteOperatorByRegion)
router.GET("/records", getOperatorRecords)
}

// @Tags operators
Expand All @@ -126,8 +146,8 @@ func (s *Service) RegisterOperatorsRouter() {
// @Failure 400 {string} string "The input is invalid."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /operators/{id} [GET]
func getOperatorByID(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
func getOperatorByRegion(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)
id := c.Param("id")

regionID, err := strconv.ParseUint(id, 10, 64)
Expand All @@ -136,13 +156,13 @@ func getOperatorByID(c *gin.Context) {
return
}

opController := svr.GetCoordinator().GetOperatorController()
if opController == nil {
op, err := handler.GetOperatorStatus(regionID)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}

c.IndentedJSON(http.StatusOK, opController.GetOperatorStatus(regionID))
c.IndentedJSON(http.StatusOK, op)
}

// @Tags operators
Expand All @@ -153,40 +173,104 @@ func getOperatorByID(c *gin.Context) {
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /operators [GET]
func getOperators(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
handler := c.MustGet(handlerKey).(*handler.Handler)
var (
results []*operator.Operator
ops []*operator.Operator
err error
)

opController := svr.GetCoordinator().GetOperatorController()
if opController == nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
kinds := c.QueryArray("kind")
if len(kinds) == 0 {
results = opController.GetOperators()
results, err = handler.GetOperators()
} else {
for _, kind := range kinds {
switch kind {
case "admin":
ops = opController.GetOperatorsOfKind(operator.OpAdmin)
case "leader":
ops = opController.GetOperatorsOfKind(operator.OpLeader)
case "region":
ops = opController.GetOperatorsOfKind(operator.OpRegion)
case "waiting":
ops = opController.GetWaitingOperators()
}
results = append(results, ops...)
}
results, err = handler.GetOperatorsByKinds(kinds)
}

if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
c.IndentedJSON(http.StatusOK, results)
}

// @Tags operator
// @Summary Cancel a Region's pending operator.
// @Param region_id path int true "A Region's Id"
// @Produce json
// @Success 200 {string} string "The pending operator is canceled."
// @Failure 400 {string} string "The input is invalid."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /operators/{region_id} [delete]
func deleteOperatorByRegion(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)
id := c.Param("id")

regionID, err := strconv.ParseUint(id, 10, 64)
if err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}

if err = handler.RemoveOperator(regionID); err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}

c.String(http.StatusOK, "The pending operator is canceled.")
}

// @Tags operator
// @Summary lists the finished operators since the given timestamp in second.
// @Param from query integer false "From Unix timestamp"
// @Produce json
// @Success 200 {object} []operator.OpRecord
// @Failure 400 {string} string "The request is invalid."
// @Failure 500 {string} string "PD server failed to proceed the request."
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have it here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a copy from server/api, maybe we need to fix all other comments too.

// @Router /operators/records [get]
func getOperatorRecords(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)
from, err := apiutil.ParseTime(c.Query("from"))
if err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}
records, err := handler.GetRecords(from)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
c.IndentedJSON(http.StatusOK, records)
}

// FIXME: details of input json body params
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this mean? 👀

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only copy from previous code

// @Tags operator
// @Summary Create an operator.
// @Accept json
// @Param body body object true "json params"
// @Produce json
// @Success 200 {string} string "The operator is created."
// @Failure 400 {string} string "The input is invalid."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /operators [post]
func createOperator(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)
var input map[string]interface{}
if err := c.BindJSON(&input); err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}
statusCode, result, err := handler.HandleOperatorCreation(input)
if err != nil {
c.String(statusCode, err.Error())
return
}
if statusCode == http.StatusOK && result == nil {
c.String(http.StatusOK, "The operator is created.")
return
}
c.IndentedJSON(statusCode, result)
}

// @Tags checkers
// @Summary Get checker by name
// @Param name path string true "The name of the checker."
Expand Down
6 changes: 6 additions & 0 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,12 @@ func (s *Server) startWatcher() (err error) {
return err
}

// GetPersistConfig returns the persist config.
// It's used to test.
func (s *Server) GetPersistConfig() *config.PersistConfig {
return s.persistConfig
}

// CreateServer creates the Server
func CreateServer(ctx context.Context, cfg *config.Config) *Server {
svr := &Server{
Expand Down
Loading
Loading