Skip to content

Commit

Permalink
add forward and test
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed Oct 12, 2023
1 parent 600de27 commit dfb7cae
Show file tree
Hide file tree
Showing 9 changed files with 395 additions and 30 deletions.
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,11 @@ error = '''
invalid rule content, %s
'''

["PD:placement:ErrRuleNotFound"]
error = '''
rule not found
'''

["PD:plugin:ErrLoadPlugin"]
error = '''
failed to load plugin
Expand Down
1 change: 1 addition & 0 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ var (
ErrBuildRuleList = errors.Normalize("build rule list failed, %s", errors.RFCCodeText("PD:placement:ErrBuildRuleList"))
ErrPlacementDisabled = errors.Normalize("placement rules feature is disabled", errors.RFCCodeText("PD:placement:ErrPlacementDisabled"))
ErrKeyFormat = errors.Normalize("key should be in hex format", errors.RFCCodeText("PD:placement:ErrKeyFormat"))
ErrRuleNotFound = errors.Normalize("rule not found", errors.RFCCodeText("PD:placement:ErrRuleNotFound"))
)

// region label errors
Expand Down
302 changes: 292 additions & 10 deletions pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package apis

import (
"encoding/hex"
"fmt"
"net/http"
"strconv"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/joho/godotenv"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
scheserver "github.com/tikv/pd/pkg/mcs/scheduling/server"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
sche "github.com/tikv/pd/pkg/schedule/core"
Expand Down Expand Up @@ -114,6 +116,7 @@ func NewService(srv *scheserver.Service) *Service {
s.RegisterSchedulersRouter()
s.RegisterCheckersRouter()
s.RegisterHotspotRouter()
s.RegisterConfigRouter()
return s
}

Expand Down Expand Up @@ -150,16 +153,6 @@ func (s *Service) RegisterHotspotRouter() {
router.GET("/buckets", getHotBuckets)
}

// RegisterConfigRouter registers the router of the config handler.
func (s *Service) RegisterConfigRouter() {
// router := s.root.Group("config")
// router.GET("/rule", getHotBuckets)
// router.GET("/rules", getHotBuckets)
// router.GET("/rule_group", getHotBuckets)
// router.GET("/rule_groups", getHotBuckets)
// router.GET("/placement-rule", getHotBuckets)
}

// RegisterOperatorsRouter registers the router of the operators handler.
func (s *Service) RegisterOperatorsRouter() {
router := s.root.Group("operators")
Expand All @@ -170,6 +163,31 @@ func (s *Service) RegisterOperatorsRouter() {
router.GET("/records", getOperatorRecords)
}

// RegisterConfigRouter registers the router of the config handler.
func (s *Service) RegisterConfigRouter() {
router := s.root.Group("config")

rules := router.Group("rules")
rules.GET("", getAllRules)
rules.GET("/group/:group", getRuleByGroup)
rules.GET("/region/:region", getRulesByRegion)
rules.GET("/region/:region/detail", checkRegionPlacementRule)
rules.GET("/key/:key", getRulesByKey)

// We cannot merge `/rule` and `/rules`, because we allow `group_id` to be "group",
// which is the same as the prefix of `/rules/group/:group`.
rule := router.Group("rule")
rule.GET("/:group/:id", getRuleByGroupAndID)

groups := router.Group("rule_groups")
groups.GET("", getAllGroupConfigs)
groups.GET("/:id", getGroupConfig)

placementRule := router.Group("placement-rule")
placementRule.GET("", getPlacementRules)
placementRule.GET("/:group", getPlacementRuleByGroup)
}

func changeLogLevel(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
var level string
Expand Down Expand Up @@ -558,3 +576,267 @@ func getHistoryHotRegions(c *gin.Context) {
var res storage.HistoryHotRegions
c.IndentedJSON(http.StatusOK, res)
}

// @Tags rule
// @Summary List all rules of cluster.
// @Produce json
// @Success 200 {array} placement.Rule
// @Failure 412 {string} string "Placement rules feature is disabled."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /config/rules [get]
func getAllRules(c *gin.Context) {
fmt.Println("============")
handler := c.MustGet(handlerKey).(*handler.Handler)
manager, err := handler.GetRuleManager()
if err == errs.ErrPlacementDisabled {
c.String(http.StatusPreconditionFailed, err.Error())
return
}
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
rules := manager.GetAllRules()
c.IndentedJSON(http.StatusOK, rules)
}

// @Tags rule
// @Summary List all rules of cluster by group.
// @Param group path string true "The name of group"
// @Produce json
// @Success 200 {array} placement.Rule
// @Failure 412 {string} string "Placement rules feature is disabled."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /config/rules/group/{group} [get]
func getRuleByGroup(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)
manager, err := handler.GetRuleManager()
if err == errs.ErrPlacementDisabled {
c.String(http.StatusPreconditionFailed, err.Error())
return
}
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
group := c.Param("group")
rules := manager.GetRulesByGroup(group)
c.IndentedJSON(http.StatusOK, rules)
}

// @Tags rule
// @Summary List all rules of cluster by region.
// @Param id path integer true "Region Id"
// @Produce json
// @Success 200 {array} placement.Rule
// @Failure 400 {string} string "The input is invalid."
// @Failure 404 {string} string "The region does not exist."
// @Failure 412 {string} string "Placement rules feature is disabled."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /config/rules/region/{region} [get]
func getRulesByRegion(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)
manager, err := handler.GetRuleManager()
if err == errs.ErrPlacementDisabled {
c.String(http.StatusPreconditionFailed, err.Error())
return
}
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
regionStr := c.Param("region")
region, code, err := handler.PreCheckForRegion(regionStr)
if err != nil {
c.String(code, err.Error())
return
}
rules := manager.GetRulesForApplyRegion(region)
c.IndentedJSON(http.StatusOK, rules)
}

// @Tags rule
// @Summary List rules and matched peers related to the given region.
// @Param id path integer true "Region Id"
// @Produce json
// @Success 200 {object} placement.RegionFit
// @Failure 400 {string} string "The input is invalid."
// @Failure 404 {string} string "The region does not exist."
// @Failure 412 {string} string "Placement rules feature is disabled."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /config/rules/region/{region}/detail [get]
func checkRegionPlacementRule(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)
regionStr := c.Param("region")
region, code, err := handler.PreCheckForRegion(regionStr)
if err != nil {
c.String(code, err.Error())
return
}
regionFit, err := handler.CheckRegionPlacementRule(region)
if err == errs.ErrPlacementDisabled {
c.String(http.StatusPreconditionFailed, err.Error())
return
}
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
c.IndentedJSON(http.StatusOK, regionFit)
}

// @Tags rule
// @Summary List all rules of cluster by key.
// @Param key path string true "The name of key"
// @Produce json
// @Success 200 {array} placement.Rule
// @Failure 400 {string} string "The input is invalid."
// @Failure 412 {string} string "Placement rules feature is disabled."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /config/rules/key/{key} [get]
func getRulesByKey(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)
manager, err := handler.GetRuleManager()
if err == errs.ErrPlacementDisabled {
c.String(http.StatusPreconditionFailed, err.Error())
return
}
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
keyHex := c.Param("key")
key, err := hex.DecodeString(keyHex)
if err != nil {
c.String(http.StatusBadRequest, errs.ErrKeyFormat.Error())
return
}
rules := manager.GetRulesByKey(key)
c.IndentedJSON(http.StatusOK, rules)
}

// @Tags rule
// @Summary Get rule of cluster by group and id.
// @Param group path string true "The name of group"
// @Param id path string true "Rule Id"
// @Produce json
// @Success 200 {object} placement.Rule
// @Failure 404 {string} string "The rule does not exist."
// @Failure 412 {string} string "Placement rules feature is disabled."
// @Router /config/rule/{group}/{id} [get]
func getRuleByGroupAndID(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)
manager, err := handler.GetRuleManager()
if err == errs.ErrPlacementDisabled {
c.String(http.StatusPreconditionFailed, err.Error())
return
}
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
group, id := c.Param("group"), c.Param("id")
rule := manager.GetRule(group, id)
if rule == nil {
c.String(http.StatusNotFound, errs.ErrRuleNotFound.Error())
return
}
c.IndentedJSON(http.StatusOK, rule)
}

// @Tags rule
// @Summary List all rule group configs.
// @Produce json
// @Success 200 {array} placement.RuleGroup
// @Failure 412 {string} string "Placement rules feature is disabled."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /config/rule_groups [get]
func getAllGroupConfigs(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)
manager, err := handler.GetRuleManager()
if err == errs.ErrPlacementDisabled {
c.String(http.StatusPreconditionFailed, err.Error())
return
}
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
ruleGroups := manager.GetRuleGroups()
c.IndentedJSON(http.StatusOK, ruleGroups)
}

// @Tags rule
// @Summary Get rule group config by group id.
// @Param id path string true "Group Id"
// @Produce json
// @Success 200 {object} placement.RuleGroup
// @Failure 404 {string} string "The RuleGroup does not exist."
// @Failure 412 {string} string "Placement rules feature is disabled."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /config/rule_groups/{id} [get]
func getGroupConfig(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)
manager, err := handler.GetRuleManager()
if err == errs.ErrPlacementDisabled {
c.String(http.StatusPreconditionFailed, err.Error())
return
}
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
id := c.Param("id")
group := manager.GetRuleGroup(id)
if group == nil {
c.String(http.StatusNotFound, errs.ErrRuleNotFound.Error())
return
}
c.IndentedJSON(http.StatusOK, group)
}

// @Tags rule
// @Summary List all rules and groups configuration.
// @Produce json
// @Success 200 {array} placement.GroupBundle
// @Failure 412 {string} string "Placement rules feature is disabled."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /config/placement-rules [get]
func getPlacementRules(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)
manager, err := handler.GetRuleManager()
if err == errs.ErrPlacementDisabled {
c.String(http.StatusPreconditionFailed, err.Error())
return
}
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
bundles := manager.GetAllGroupBundles()
c.IndentedJSON(http.StatusOK, bundles)
}

// @Tags rule
// @Summary Get group config and all rules belong to the group.
// @Param group path string true "The name of group"
// @Produce json
// @Success 200 {object} placement.GroupBundle
// @Failure 412 {string} string "Placement rules feature is disabled."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /config/placement-rules/{group} [get]
func getPlacementRuleByGroup(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)
manager, err := handler.GetRuleManager()
if err == errs.ErrPlacementDisabled {
c.String(http.StatusPreconditionFailed, err.Error())
return
}
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
g := c.Param("group")
group := manager.GetGroupBundle(g)
c.IndentedJSON(http.StatusOK, group)
}
2 changes: 2 additions & 0 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package server

import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -516,6 +517,7 @@ func (c *Cluster) StopBackgroundJobs() {

// HandleRegionHeartbeat processes RegionInfo reports from client.
func (c *Cluster) HandleRegionHeartbeat(region *core.RegionInfo) error {
fmt.Println("handle", region.GetID())
if err := c.processRegionHeartbeat(region); err != nil {
return err
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/utils/apiutil/apiutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,10 @@ func NewAccessPath(path, method string) AccessPath {
func PostJSON(client *http.Client, url string, data []byte) (*http.Response, error) {
req, err := http.NewRequest(http.MethodPost, url, bytes.NewBuffer(data))
if err != nil {
fmt.Println("PostJSON199", req.Method, err.Error())
return nil, err
}
fmt.Println("PostJSON202", req.Method)
req.Header.Set("Content-Type", "application/json")
return client.Do(req)
}
Expand Down
Loading

0 comments on commit dfb7cae

Please sign in to comment.