Skip to content

Commit

Permalink
mcs: support region label http interface in scheduling server
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed Oct 27, 2023
1 parent 744e51d commit 0452c63
Show file tree
Hide file tree
Showing 7 changed files with 236 additions and 9 deletions.
160 changes: 160 additions & 0 deletions pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package apis
import (
"fmt"
"net/http"
"net/url"
"strconv"
"sync"

Expand All @@ -26,6 +27,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/joho/godotenv"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
scheserver "github.com/tikv/pd/pkg/mcs/scheduling/server"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
sche "github.com/tikv/pd/pkg/schedule/core"
Expand Down Expand Up @@ -114,6 +116,8 @@ func NewService(srv *scheserver.Service) *Service {
s.RegisterSchedulersRouter()
s.RegisterCheckersRouter()
s.RegisterHotspotRouter()
s.RegisterRegionsRouter()
s.RegisterRegionLabelRouter()
return s
}

Expand Down Expand Up @@ -160,6 +164,21 @@ func (s *Service) RegisterOperatorsRouter() {
router.GET("/records", getOperatorRecords)
}

// RegisterRegionsRouter registers the router of the regions handler.
func (s *Service) RegisterRegionsRouter() {
router := s.root.Group("regions")
router.GET("/:id/label/:key", getRegionLabelByKey)
router.GET("/:id/labels", getRegionLabels)
}

// RegisterRegionLabelRouter registers the router of the region label handler.
func (s *Service) RegisterRegionLabelRouter() {
router := s.root.Group("config/region-label")
router.GET("rules", getAllRegionLabelRules)
router.GET("rules/ids", getRegionLabelRulesByIDs)
router.GET("rule/:id", getRegionLabelRuleByID)
}

func changeLogLevel(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
var level string
Expand Down Expand Up @@ -548,3 +567,144 @@ func getHistoryHotRegions(c *gin.Context) {
var res storage.HistoryHotRegions
c.IndentedJSON(http.StatusOK, res)
}

// @Tags region_label
// @Summary Get label of a region.
// @Param id path integer true "Region Id"
// @Param key path string true "Label key"
// @Produce json
// @Success 200 {string} string
// @Failure 400 {string} string "The input is invalid."
// @Failure 404 {string} string "The region does not exist."
// @Router /regions/{id}/label/{key} [get]
func getRegionLabelByKey(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)

idStr := c.Param("id")
labelKey := c.Param("key") // TODO: test https://github.com/tikv/pd/pull/4004

id, err := strconv.ParseUint(idStr, 10, 64)
if err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}

region := handler.GetRegion(id)
if region == nil {
c.String(http.StatusNotFound, errs.ErrRegionNotFound.FastGenByArgs().Error())
return
}

l, err := handler.GetRegionLabeler()
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
labelValue := l.GetRegionLabel(region, labelKey)
c.IndentedJSON(http.StatusOK, labelValue)
}

// @Tags region_label
// @Summary Get labels of a region.
// @Param id path integer true "Region Id"
// @Produce json
// @Success 200 {string} string
// @Failure 400 {string} string "The input is invalid."
// @Failure 404 {string} string "The region does not exist."
// @Router /regions/{id}/labels [get]
func getRegionLabels(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)

idStr := c.Param("id")
id, err := strconv.ParseUint(idStr, 10, 64)
if err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}

region := handler.GetRegion(id)
if region == nil {
c.String(http.StatusNotFound, errs.ErrRegionNotFound.FastGenByArgs().Error())
return
}
l, err := handler.GetRegionLabeler()
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
labels := l.GetRegionLabels(region)
c.IndentedJSON(http.StatusOK, labels)
}

// @Tags region_label
// @Summary List all label rules of cluster.
// @Produce json
// @Success 200 {array} labeler.LabelRule
// @Router /config/region-label/rules [get]
func getAllRegionLabelRules(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)
l, err := handler.GetRegionLabeler()
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
rules := l.GetAllLabelRules()
c.IndentedJSON(http.StatusOK, rules)
}

// @Tags region_label
// @Summary Get label rules of cluster by ids.
// @Param body body []string true "IDs of query rules"
// @Produce json
// @Success 200 {array} labeler.LabelRule
// @Failure 400 {string} string "The input is invalid."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /config/region-label/rules/ids [get]
func getRegionLabelRulesByIDs(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)
l, err := handler.GetRegionLabeler()
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
var ids []string
if err := c.BindJSON(&ids); err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}
rules, err := l.GetLabelRules(ids)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
c.IndentedJSON(http.StatusOK, rules)
}

// @Tags region_label
// @Summary Get label rule of cluster by id.
// @Param id path string true "Rule Id"
// @Produce json
// @Success 200 {object} labeler.LabelRule
// @Failure 404 {string} string "The rule does not exist."
// @Router /config/region-label/rule/{id} [get]
func getRegionLabelRuleByID(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)

id, err := url.PathUnescape(c.Param("id"))
if err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}

l, err := handler.GetRegionLabeler()
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
rule := l.GetLabelRule(id)
if rule == nil {
c.String(http.StatusNotFound, errs.ErrRegionRuleNotFound.FastGenByArgs().Error())
return
}
c.IndentedJSON(http.StatusOK, rule)
}
19 changes: 19 additions & 0 deletions pkg/schedule/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/tikv/pd/pkg/schedule"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/filter"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/schedule/scatter"
Expand Down Expand Up @@ -1040,3 +1041,21 @@ func (h *Handler) GetHotBuckets(regionIDs ...uint64) (HotBucketsResponse, error)
}
return ret, nil
}

// GetRegion returns the region labeler.
func (h *Handler) GetRegion(id uint64) *core.RegionInfo {
c := h.GetCluster()
if c == nil {
return nil
}
return c.GetRegion(id)
}

// GetRegionLabeler returns the region labeler.
func (h *Handler) GetRegionLabeler() (*labeler.RegionLabeler, error) {
c := h.GetCluster()
if c == nil || c.GetRegionLabeler() == nil {
return nil, errs.ErrNotBootstrapped
}
return c.GetRegionLabeler(), nil
}
23 changes: 16 additions & 7 deletions pkg/utils/apiutil/serverapi/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type microserviceRedirectRule struct {
targetPath string
targetServiceName string
matchMethods []string
filter func(*http.Request) bool
}

// NewRedirector redirects request to the leader if needs to be handled in the leader.
Expand All @@ -94,14 +95,19 @@ func NewRedirector(s *server.Server, opts ...RedirectorOption) negroni.Handler {
type RedirectorOption func(*redirector)

// MicroserviceRedirectRule new a microservice redirect rule option
func MicroserviceRedirectRule(matchPath, targetPath, targetServiceName string, methods []string) RedirectorOption {
func MicroserviceRedirectRule(matchPath, targetPath, targetServiceName string,
methods []string, filters ...func(*http.Request) bool) RedirectorOption {
return func(s *redirector) {
s.microserviceRedirectRules = append(s.microserviceRedirectRules, &microserviceRedirectRule{
matchPath,
targetPath,
targetServiceName,
methods,
})
rule := &microserviceRedirectRule{
matchPath: matchPath,
targetPath: targetPath,
targetServiceName: targetServiceName,
matchMethods: methods,
}
if len(filters) > 0 {
rule.filter = filters[0]
}
s.microserviceRedirectRules = append(s.microserviceRedirectRules, rule)
}
}

Expand All @@ -117,6 +123,9 @@ func (h *redirector) matchMicroServiceRedirectRules(r *http.Request) (bool, stri
r.URL.Path = strings.TrimRight(r.URL.Path, "/")
for _, rule := range h.microserviceRedirectRules {
if strings.HasPrefix(r.URL.Path, rule.matchPath) && slice.Contains(rule.matchMethods, r.Method) {
if rule.filter != nil && !rule.filter(r) {
continue
}
addr, ok := h.s.GetServicePrimaryAddr(r.Context(), rule.targetServiceName)
if !ok || addr == "" {
log.Warn("failed to get the service primary addr when trying to match redirect rules",
Expand Down
2 changes: 1 addition & 1 deletion pkg/utils/testutil/api_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func ReadGetJSON(re *require.Assertions, client *http.Client, url string, data i
}

// ReadGetJSONWithBody is used to do get request with input and check whether given data can be extracted successfully.
func ReadGetJSONWithBody(re *require.Assertions, client *http.Client, url string, input []byte, data interface{}) error {
func ReadGetJSONWithBody(re *require.Assertions, client *http.Client, url string, input []byte, data interface{}, checkOpts ...func([]byte, int, http.Header)) error {
resp, err := apiutil.GetJSON(client, url, input)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion server/api/region_label.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (h *regionLabelHandler) PatchRegionLabelRules(w http.ResponseWriter, r *htt
// @Success 200 {array} labeler.LabelRule
// @Failure 400 {string} string "The input is invalid."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /config/region-label/rule/ids [get]
// @Router /config/region-label/rules/ids [get]
func (h *regionLabelHandler) GetRegionLabelRulesByIDs(w http.ResponseWriter, r *http.Request) {
cluster := getCluster(r)
var ids []string
Expand Down
16 changes: 16 additions & 0 deletions server/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package api
import (
"context"
"net/http"
"strings"

"github.com/gorilla/mux"
scheapi "github.com/tikv/pd/pkg/mcs/scheduling/server/apis/v1"
Expand Down Expand Up @@ -78,6 +79,21 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP
scheapi.APIPathPrefix+"/checkers",
mcs.SchedulingServiceName,
[]string{http.MethodPost, http.MethodGet}),
serverapi.MicroserviceRedirectRule(
prefix+"/region/id",
scheapi.APIPathPrefix+"/regions",
mcs.SchedulingServiceName,
[]string{http.MethodGet},
func(r *http.Request) bool {
// The original code uses the path "/region/id" to get the region id.
// However, the path "/region/id" is used to get the region by id, which is not what we want.
return strings.Contains(r.URL.Path, "label")
}),
serverapi.MicroserviceRedirectRule(
prefix+"/config/region-label",
scheapi.APIPathPrefix+"/config/region-label",
mcs.SchedulingServiceName,
[]string{http.MethodGet}),
serverapi.MicroserviceRedirectRule(
prefix+"/hotspot",
scheapi.APIPathPrefix+"/hotspot",
Expand Down
23 changes: 23 additions & 0 deletions tests/integrations/mcs/scheduling/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/suite"
_ "github.com/tikv/pd/pkg/mcs/scheduling/server/apis/v1"
"github.com/tikv/pd/pkg/schedule/handler"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/statistics"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/utils/apiutil"
Expand Down Expand Up @@ -217,4 +218,26 @@ func (suite *apiTestSuite) TestAPIForward() {
err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "hotspot/regions/history"), &history,
testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true"))
re.NoError(err)

// Test region label
var rules []*labeler.LabelRule
err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/region-label/rules"), &rules,
testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true"))
re.NoError(err)
err = testutil.ReadGetJSONWithBody(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/region-label/rules/ids"), []byte(`["rule1", "rule3"]`),
&rules, testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true"))
re.NoError(err)
err = testutil.CheckGetJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/region-label/rule/rule1"), nil,
testutil.StatusNotOK(re), testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true"))
re.NoError(err)

err = testutil.CheckGetJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "region/id/1"), nil,
testutil.WithoutHeader(re, apiutil.ForwardToMicroServiceHeader))
re.NoError(err)
err = testutil.CheckGetJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "region/id/1/label/key"), nil,
testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader,"true"))
re.NoError(err)
err = testutil.CheckGetJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "region/id/1/labels"), nil,
testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader,"true"))
re.NoError(err)
}

0 comments on commit 0452c63

Please sign in to comment.