diff --git a/errors.toml b/errors.toml index 1b96de8a209..ef61a01dd64 100644 --- a/errors.toml +++ b/errors.toml @@ -541,6 +541,11 @@ error = ''' build rule list failed, %s ''' +["PD:placement:ErrKeyFormat"] +error = ''' +key should be in hex format +''' + ["PD:placement:ErrLoadRule"] error = ''' load rule failed @@ -551,11 +556,21 @@ error = ''' load rule group failed ''' +["PD:placement:ErrPlacementDisabled"] +error = ''' +placement rules feature is disabled +''' + ["PD:placement:ErrRuleContent"] error = ''' invalid rule content, %s ''' +["PD:placement:ErrRuleNotFound"] +error = ''' +rule not found +''' + ["PD:plugin:ErrLoadPlugin"] error = ''' failed to load plugin @@ -606,6 +621,11 @@ error = ''' region %v has abnormal peer ''' +["PD:region:ErrRegionInvalidID"] +error = ''' +invalid region id +''' + ["PD:region:ErrRegionNotAdjacent"] error = ''' two regions are not adjacent diff --git a/pkg/errs/errno.go b/pkg/errs/errno.go index 181dfc9b393..0edc97daf8c 100644 --- a/pkg/errs/errno.go +++ b/pkg/errs/errno.go @@ -102,6 +102,8 @@ var ( // region errors var ( + // ErrRegionInvalidID is error info for region id invalid. + ErrRegionInvalidID = errors.Normalize("invalid region id", errors.RFCCodeText("PD:region:ErrRegionInvalidID")) // ErrRegionNotAdjacent is error info for region not adjacent. ErrRegionNotAdjacent = errors.Normalize("two regions are not adjacent", errors.RFCCodeText("PD:region:ErrRegionNotAdjacent")) // ErrRegionNotFound is error info for region not found. @@ -153,10 +155,13 @@ var ( // placement errors var ( - ErrRuleContent = errors.Normalize("invalid rule content, %s", errors.RFCCodeText("PD:placement:ErrRuleContent")) - ErrLoadRule = errors.Normalize("load rule failed", errors.RFCCodeText("PD:placement:ErrLoadRule")) - ErrLoadRuleGroup = errors.Normalize("load rule group failed", errors.RFCCodeText("PD:placement:ErrLoadRuleGroup")) - ErrBuildRuleList = errors.Normalize("build rule list failed, %s", errors.RFCCodeText("PD:placement:ErrBuildRuleList")) + ErrRuleContent = errors.Normalize("invalid rule content, %s", errors.RFCCodeText("PD:placement:ErrRuleContent")) + ErrLoadRule = errors.Normalize("load rule failed", errors.RFCCodeText("PD:placement:ErrLoadRule")) + ErrLoadRuleGroup = errors.Normalize("load rule group failed", errors.RFCCodeText("PD:placement:ErrLoadRuleGroup")) + ErrBuildRuleList = errors.Normalize("build rule list failed, %s", errors.RFCCodeText("PD:placement:ErrBuildRuleList")) + ErrPlacementDisabled = errors.Normalize("placement rules feature is disabled", errors.RFCCodeText("PD:placement:ErrPlacementDisabled")) + ErrKeyFormat = errors.Normalize("key should be in hex format", errors.RFCCodeText("PD:placement:ErrKeyFormat")) + ErrRuleNotFound = errors.Normalize("rule not found", errors.RFCCodeText("PD:placement:ErrRuleNotFound")) ) // region label errors diff --git a/pkg/mcs/scheduling/server/apis/v1/api.go b/pkg/mcs/scheduling/server/apis/v1/api.go index 2b2a454d9b5..90124d49034 100644 --- a/pkg/mcs/scheduling/server/apis/v1/api.go +++ b/pkg/mcs/scheduling/server/apis/v1/api.go @@ -15,6 +15,8 @@ package apis import ( + "encoding/hex" + "fmt" "net/http" "strconv" "sync" @@ -25,12 +27,14 @@ import ( "github.com/gin-gonic/gin" "github.com/joho/godotenv" "github.com/pingcap/log" + "github.com/tikv/pd/pkg/errs" scheserver "github.com/tikv/pd/pkg/mcs/scheduling/server" - "github.com/tikv/pd/pkg/mcs/utils" - "github.com/tikv/pd/pkg/schedule" + mcsutils "github.com/tikv/pd/pkg/mcs/utils" sche "github.com/tikv/pd/pkg/schedule/core" 
"github.com/tikv/pd/pkg/schedule/handler" "github.com/tikv/pd/pkg/schedule/operator" + "github.com/tikv/pd/pkg/statistics/utils" + "github.com/tikv/pd/pkg/storage" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi" "github.com/tikv/pd/pkg/utils/logutil" @@ -68,15 +72,11 @@ type Service struct { } type server struct { - server *scheserver.Server + *scheserver.Server } -func (s *server) GetCoordinator() *schedule.Coordinator { - return s.server.GetCoordinator() -} - -func (s *server) GetCluster() sche.SharedCluster { - return s.server.GetCluster() +func (s *server) GetCluster() sche.SchedulerCluster { + return s.Server.GetCluster() } func createIndentRender() *render.Render { @@ -98,11 +98,11 @@ func NewService(srv *scheserver.Service) *Service { apiHandlerEngine.Use(gzip.Gzip(gzip.DefaultCompression)) apiHandlerEngine.Use(func(c *gin.Context) { c.Set(multiservicesapi.ServiceContextKey, srv.Server) - c.Set(handlerKey, handler.NewHandler(&server{server: srv.Server})) + c.Set(handlerKey, handler.NewHandler(&server{srv.Server})) c.Next() }) apiHandlerEngine.Use(multiservicesapi.ServiceRedirector()) - apiHandlerEngine.GET("metrics", utils.PromHandler()) + apiHandlerEngine.GET("metrics", mcsutils.PromHandler()) pprof.Register(apiHandlerEngine) root := apiHandlerEngine.Group(APIPathPrefix) s := &Service{ @@ -115,6 +115,8 @@ func NewService(srv *scheserver.Service) *Service { s.RegisterOperatorsRouter() s.RegisterSchedulersRouter() s.RegisterCheckersRouter() + s.RegisterHotspotRouter() + s.RegisterConfigRouter() return s } @@ -141,6 +143,16 @@ func (s *Service) RegisterCheckersRouter() { router.POST("/:name", pauseOrResumeChecker) } +// RegisterHotspotRouter registers the router of the hotspot handler. +func (s *Service) RegisterHotspotRouter() { + router := s.root.Group("hotspot") + router.GET("/regions/write", getHotWriteRegions) + router.GET("/regions/read", getHotReadRegions) + router.GET("/regions/history", getHistoryHotRegions) + router.GET("/stores", getHotStores) + router.GET("/buckets", getHotBuckets) +} + // RegisterOperatorsRouter registers the router of the operators handler. func (s *Service) RegisterOperatorsRouter() { router := s.root.Group("operators") @@ -151,6 +163,31 @@ func (s *Service) RegisterOperatorsRouter() { router.GET("/records", getOperatorRecords) } +// RegisterConfigRouter registers the router of the config handler. +func (s *Service) RegisterConfigRouter() { + router := s.root.Group("config") + + rules := router.Group("rules") + rules.GET("", getAllRules) + rules.GET("/group/:group", getRuleByGroup) + rules.GET("/region/:region", getRulesByRegion) + rules.GET("/region/:region/detail", checkRegionPlacementRule) + rules.GET("/key/:key", getRulesByKey) + + // We cannot merge `/rule` and `/rules`, because we allow `group_id` to be "group", + // which is the same as the prefix of `/rules/group/:group`. 
+ rule := router.Group("rule") + rule.GET("/:group/:id", getRuleByGroupAndID) + + groups := router.Group("rule_groups") + groups.GET("", getAllGroupConfigs) + groups.GET("/:id", getGroupConfig) + + placementRule := router.Group("placement-rule") + placementRule.GET("", getPlacementRules) + placementRule.GET("/:group", getPlacementRuleByGroup) +} + func changeLogLevel(c *gin.Context) { svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server) var level string @@ -425,3 +462,380 @@ func pauseOrResumeScheduler(c *gin.Context) { } c.String(http.StatusOK, "Pause or resume the scheduler successfully.") } + +// @Tags hotspot +// @Summary List the hot write regions. +// @Produce json +// @Success 200 {object} statistics.StoreHotPeersInfos +// @Failure 400 {string} string "The request is invalid." +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /hotspot/regions/write [get] +func getHotWriteRegions(c *gin.Context) { + getHotRegions(utils.Write, c) +} + +// @Tags hotspot +// @Summary List the hot read regions. +// @Produce json +// @Success 200 {object} statistics.StoreHotPeersInfos +// @Failure 400 {string} string "The request is invalid." +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /hotspot/regions/read [get] +func getHotReadRegions(c *gin.Context) { + getHotRegions(utils.Read, c) +} + +func getHotRegions(typ utils.RWType, c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + + storeIDs := c.QueryArray("store_id") + if len(storeIDs) < 1 { + hotRegions, err := handler.GetHotRegions(typ) + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + c.IndentedJSON(http.StatusOK, hotRegions) + return + } + + var ids []uint64 + for _, storeID := range storeIDs { + id, err := strconv.ParseUint(storeID, 10, 64) + if err != nil { + c.String(http.StatusBadRequest, fmt.Sprintf("invalid store id: %s", storeID)) + return + } + _, err = handler.GetStore(id) + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + ids = append(ids, id) + } + + hotRegions, err := handler.GetHotRegions(typ, ids...) + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + c.IndentedJSON(http.StatusOK, hotRegions) +} + +// @Tags hotspot +// @Summary List the hot stores. +// @Produce json +// @Success 200 {object} handler.HotStoreStats +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /hotspot/stores [get] +func getHotStores(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + stores, err := handler.GetHotStores() + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + c.IndentedJSON(http.StatusOK, stores) +} + +// @Tags hotspot +// @Summary List the hot buckets. +// @Produce json +// @Success 200 {object} handler.HotBucketsResponse +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /hotspot/buckets [get] +func getHotBuckets(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + + regionIDs := c.QueryArray("region_id") + ids := make([]uint64, len(regionIDs)) + for i, regionID := range regionIDs { + if id, err := strconv.ParseUint(regionID, 10, 64); err == nil { + ids[i] = id + } + } + ret, err := handler.GetHotBuckets(ids...) 
+ if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + c.IndentedJSON(http.StatusOK, ret) +} + +// @Tags hotspot +// @Summary List the history hot regions. +// @Accept json +// @Produce json +// @Success 200 {object} storage.HistoryHotRegions +// @Failure 400 {string} string "The input is invalid." +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /hotspot/regions/history [get] +func getHistoryHotRegions(c *gin.Context) { + // TODO: support history hotspot in scheduling server with stateless in the future. + // Ref: https://github.com/tikv/pd/pull/7183 + var res storage.HistoryHotRegions + c.IndentedJSON(http.StatusOK, res) +} + +// @Tags rule +// @Summary List all rules of cluster. +// @Produce json +// @Success 200 {array} placement.Rule +// @Failure 412 {string} string "Placement rules feature is disabled." +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /config/rules [get] +func getAllRules(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + manager, err := handler.GetRuleManager() + if err == errs.ErrPlacementDisabled { + c.String(http.StatusPreconditionFailed, err.Error()) + return + } + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + rules := manager.GetAllRules() + c.IndentedJSON(http.StatusOK, rules) +} + +// @Tags rule +// @Summary List all rules of cluster by group. +// @Param group path string true "The name of group" +// @Produce json +// @Success 200 {array} placement.Rule +// @Failure 412 {string} string "Placement rules feature is disabled." +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /config/rules/group/{group} [get] +func getRuleByGroup(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + manager, err := handler.GetRuleManager() + if err == errs.ErrPlacementDisabled { + c.String(http.StatusPreconditionFailed, err.Error()) + return + } + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + group := c.Param("group") + rules := manager.GetRulesByGroup(group) + c.IndentedJSON(http.StatusOK, rules) +} + +// @Tags rule +// @Summary List all rules of cluster by region. +// @Param id path integer true "Region Id" +// @Produce json +// @Success 200 {array} placement.Rule +// @Failure 400 {string} string "The input is invalid." +// @Failure 404 {string} string "The region does not exist." +// @Failure 412 {string} string "Placement rules feature is disabled." +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /config/rules/region/{region} [get] +func getRulesByRegion(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + manager, err := handler.GetRuleManager() + if err == errs.ErrPlacementDisabled { + c.String(http.StatusPreconditionFailed, err.Error()) + return + } + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + regionStr := c.Param("region") + region, code, err := handler.PreCheckForRegion(regionStr) + if err != nil { + c.String(code, err.Error()) + return + } + rules := manager.GetRulesForApplyRegion(region) + c.IndentedJSON(http.StatusOK, rules) +} + +// @Tags rule +// @Summary List rules and matched peers related to the given region. +// @Param id path integer true "Region Id" +// @Produce json +// @Success 200 {object} placement.RegionFit +// @Failure 400 {string} string "The input is invalid." 
+// @Failure 404 {string} string "The region does not exist." +// @Failure 412 {string} string "Placement rules feature is disabled." +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /config/rules/region/{region}/detail [get] +func checkRegionPlacementRule(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + regionStr := c.Param("region") + region, code, err := handler.PreCheckForRegion(regionStr) + if err != nil { + c.String(code, err.Error()) + return + } + regionFit, err := handler.CheckRegionPlacementRule(region) + if err == errs.ErrPlacementDisabled { + c.String(http.StatusPreconditionFailed, err.Error()) + return + } + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + c.IndentedJSON(http.StatusOK, regionFit) +} + +// @Tags rule +// @Summary List all rules of cluster by key. +// @Param key path string true "The name of key" +// @Produce json +// @Success 200 {array} placement.Rule +// @Failure 400 {string} string "The input is invalid." +// @Failure 412 {string} string "Placement rules feature is disabled." +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /config/rules/key/{key} [get] +func getRulesByKey(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + manager, err := handler.GetRuleManager() + if err == errs.ErrPlacementDisabled { + c.String(http.StatusPreconditionFailed, err.Error()) + return + } + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + keyHex := c.Param("key") + key, err := hex.DecodeString(keyHex) + if err != nil { + c.String(http.StatusBadRequest, errs.ErrKeyFormat.Error()) + return + } + rules := manager.GetRulesByKey(key) + c.IndentedJSON(http.StatusOK, rules) +} + +// @Tags rule +// @Summary Get rule of cluster by group and id. +// @Param group path string true "The name of group" +// @Param id path string true "Rule Id" +// @Produce json +// @Success 200 {object} placement.Rule +// @Failure 404 {string} string "The rule does not exist." +// @Failure 412 {string} string "Placement rules feature is disabled." +// @Router /config/rule/{group}/{id} [get] +func getRuleByGroupAndID(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + manager, err := handler.GetRuleManager() + if err == errs.ErrPlacementDisabled { + c.String(http.StatusPreconditionFailed, err.Error()) + return + } + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + group, id := c.Param("group"), c.Param("id") + rule := manager.GetRule(group, id) + if rule == nil { + c.String(http.StatusNotFound, errs.ErrRuleNotFound.Error()) + return + } + c.IndentedJSON(http.StatusOK, rule) +} + +// @Tags rule +// @Summary List all rule group configs. +// @Produce json +// @Success 200 {array} placement.RuleGroup +// @Failure 412 {string} string "Placement rules feature is disabled." +// @Failure 500 {string} string "PD server failed to proceed the request." 
+// @Router /config/rule_groups [get] +func getAllGroupConfigs(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + manager, err := handler.GetRuleManager() + if err == errs.ErrPlacementDisabled { + c.String(http.StatusPreconditionFailed, err.Error()) + return + } + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + ruleGroups := manager.GetRuleGroups() + c.IndentedJSON(http.StatusOK, ruleGroups) +} + +// @Tags rule +// @Summary Get rule group config by group id. +// @Param id path string true "Group Id" +// @Produce json +// @Success 200 {object} placement.RuleGroup +// @Failure 404 {string} string "The RuleGroup does not exist." +// @Failure 412 {string} string "Placement rules feature is disabled." +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /config/rule_groups/{id} [get] +func getGroupConfig(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + manager, err := handler.GetRuleManager() + if err == errs.ErrPlacementDisabled { + c.String(http.StatusPreconditionFailed, err.Error()) + return + } + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + id := c.Param("id") + group := manager.GetRuleGroup(id) + if group == nil { + c.String(http.StatusNotFound, errs.ErrRuleNotFound.Error()) + return + } + c.IndentedJSON(http.StatusOK, group) +} + +// @Tags rule +// @Summary List all rules and groups configuration. +// @Produce json +// @Success 200 {array} placement.GroupBundle +// @Failure 412 {string} string "Placement rules feature is disabled." +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /config/placement-rules [get] +func getPlacementRules(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + manager, err := handler.GetRuleManager() + if err == errs.ErrPlacementDisabled { + c.String(http.StatusPreconditionFailed, err.Error()) + return + } + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + bundles := manager.GetAllGroupBundles() + c.IndentedJSON(http.StatusOK, bundles) +} + +// @Tags rule +// @Summary Get group config and all rules belong to the group. +// @Param group path string true "The name of group" +// @Produce json +// @Success 200 {object} placement.GroupBundle +// @Failure 412 {string} string "Placement rules feature is disabled." +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /config/placement-rules/{group} [get] +func getPlacementRuleByGroup(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + manager, err := handler.GetRuleManager() + if err == errs.ErrPlacementDisabled { + c.String(http.StatusPreconditionFailed, err.Error()) + return + } + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + g := c.Param("group") + group := manager.GetGroupBundle(g) + c.IndentedJSON(http.StatusOK, group) +} diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 891a645053f..5031f07bd49 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -94,6 +94,12 @@ func (c *Cluster) GetHotStat() *statistics.HotStat { return c.hotStat } +// GetStoresStats returns stores' statistics from cluster. 
+// And it will be unnecessary to filter unhealthy store, because it has been solved in process heartbeat +func (c *Cluster) GetStoresStats() *statistics.StoresStats { + return c.hotStat.StoresStats +} + // GetRegionStats gets region statistics. func (c *Cluster) GetRegionStats() *statistics.RegionStatistics { return c.regionStats diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index 32788284b70..a647d252f02 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -364,7 +364,11 @@ func (s *Server) GetBasicCluster() *core.BasicCluster { // GetCoordinator returns the coordinator. func (s *Server) GetCoordinator() *schedule.Coordinator { - return s.GetCluster().GetCoordinator() + c := s.GetCluster() + if c == nil { + return nil + } + return c.GetCoordinator() } // ServerLoopWgDone decreases the server loop wait group. diff --git a/pkg/schedule/config/config_provider.go b/pkg/schedule/config/config_provider.go index f05b6ed3c96..20c7f0dc2cf 100644 --- a/pkg/schedule/config/config_provider.go +++ b/pkg/schedule/config/config_provider.go @@ -116,8 +116,10 @@ type SharedConfigProvider interface { IsWitnessAllowed() bool IsPlacementRulesCacheEnabled() bool SetHaltScheduling(bool, string) + GetHotRegionCacheHitsThreshold() int // for test purpose + SetPlacementRuleEnabled(bool) SetPlacementRulesCacheEnabled(bool) SetEnableWitness(bool) } diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index 5b4ab8cdb4f..f35bd6d4de3 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -616,6 +616,25 @@ func (c *Coordinator) GetHotRegionsByType(typ utils.RWType) *statistics.StoreHot return infos } +// GetHotRegions gets hot regions' statistics by RWType and storeIDs. +// If storeIDs is empty, it returns all hot regions' statistics by RWType. +func (c *Coordinator) GetHotRegions(typ utils.RWType, storeIDs ...uint64) *statistics.StoreHotPeersInfos { + hotRegions := c.GetHotRegionsByType(typ) + if len(storeIDs) > 0 && hotRegions != nil { + asLeader := statistics.StoreHotPeersStat{} + asPeer := statistics.StoreHotPeersStat{} + for _, storeID := range storeIDs { + asLeader[storeID] = hotRegions.AsLeader[storeID] + asPeer[storeID] = hotRegions.AsPeer[storeID] + } + return &statistics.StoreHotPeersInfos{ + AsLeader: asLeader, + AsPeer: asPeer, + } + } + return hotRegions +} + // GetWaitGroup returns the wait group. Only for test purpose. func (c *Coordinator) GetWaitGroup() *sync.WaitGroup { return &c.wg diff --git a/pkg/schedule/handler/handler.go b/pkg/schedule/handler/handler.go index d45fd685fa2..c326c7fe716 100644 --- a/pkg/schedule/handler/handler.go +++ b/pkg/schedule/handler/handler.go @@ -18,6 +18,7 @@ import ( "bytes" "encoding/hex" "net/http" + "strconv" "strings" "time" @@ -34,6 +35,9 @@ import ( "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/schedule/scatter" "github.com/tikv/pd/pkg/schedule/schedulers" + "github.com/tikv/pd/pkg/statistics" + "github.com/tikv/pd/pkg/statistics/buckets" + "github.com/tikv/pd/pkg/statistics/utils" "github.com/tikv/pd/pkg/utils/typeutil" "go.uber.org/zap" ) @@ -42,7 +46,7 @@ import ( // TODO: remove it after GetCluster is unified between PD server and Scheduling server. type Server interface { GetCoordinator() *schedule.Coordinator - GetCluster() sche.SharedCluster + GetCluster() sche.SchedulerCluster } // Handler is a handler to handle http request about schedule. 
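// A minimal, self-contained sketch of the store-ID filtering that the new
// Coordinator.GetHotRegions (see the coordinator.go hunk above) applies when
// callers pass explicit store IDs: AsLeader/AsPeer are narrowed to those
// stores, otherwise the full result is returned unchanged. The types below
// (storeHotPeersStat, storeHotPeersInfos) are simplified stand-ins for the
// statistics package types, not the real definitions.
package main

import "fmt"

type storeHotPeersStat map[uint64]int

type storeHotPeersInfos struct {
	AsLeader storeHotPeersStat
	AsPeer   storeHotPeersStat
}

// filterByStoreIDs mirrors the branch added in Coordinator.GetHotRegions.
func filterByStoreIDs(all *storeHotPeersInfos, storeIDs ...uint64) *storeHotPeersInfos {
	if all == nil || len(storeIDs) == 0 {
		return all
	}
	asLeader, asPeer := storeHotPeersStat{}, storeHotPeersStat{}
	for _, id := range storeIDs {
		asLeader[id] = all.AsLeader[id]
		asPeer[id] = all.AsPeer[id]
	}
	return &storeHotPeersInfos{AsLeader: asLeader, AsPeer: asPeer}
}

func main() {
	all := &storeHotPeersInfos{
		AsLeader: storeHotPeersStat{1: 3, 2: 5},
		AsPeer:   storeHotPeersStat{1: 7, 2: 9},
	}
	// Only store 1 remains after filtering.
	fmt.Println(filterByStoreIDs(all, 1))
}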
@@ -381,6 +385,9 @@ func (h *Handler) HandleOperatorCreation(input map[string]interface{}) (int, int // AddTransferLeaderOperator adds an operator to transfer leader to the store. func (h *Handler) AddTransferLeaderOperator(regionID uint64, storeID uint64) error { c := h.GetCluster() + if c == nil { + return errs.ErrNotBootstrapped.GenWithStackByArgs() + } region := c.GetRegion(regionID) if region == nil { return errs.ErrRegionNotFound.FastGenByArgs(regionID) @@ -402,6 +409,9 @@ func (h *Handler) AddTransferLeaderOperator(regionID uint64, storeID uint64) err // AddTransferRegionOperator adds an operator to transfer region to the stores. func (h *Handler) AddTransferRegionOperator(regionID uint64, storeIDs map[uint64]placement.PeerRoleType) error { c := h.GetCluster() + if c == nil { + return errs.ErrNotBootstrapped.GenWithStackByArgs() + } region := c.GetRegion(regionID) if region == nil { return errs.ErrRegionNotFound.FastGenByArgs(regionID) @@ -439,6 +449,9 @@ func (h *Handler) AddTransferRegionOperator(regionID uint64, storeIDs map[uint64 // AddTransferPeerOperator adds an operator to transfer peer. func (h *Handler) AddTransferPeerOperator(regionID uint64, fromStoreID, toStoreID uint64) error { c := h.GetCluster() + if c == nil { + return errs.ErrNotBootstrapped.GenWithStackByArgs() + } region := c.GetRegion(regionID) if region == nil { return errs.ErrRegionNotFound.FastGenByArgs(regionID) @@ -465,6 +478,9 @@ func (h *Handler) AddTransferPeerOperator(regionID uint64, fromStoreID, toStoreI // checkAdminAddPeerOperator checks adminAddPeer operator with given region ID and store ID. func (h *Handler) checkAdminAddPeerOperator(regionID uint64, toStoreID uint64) (sche.SharedCluster, *core.RegionInfo, error) { c := h.GetCluster() + if c == nil { + return nil, nil, errs.ErrNotBootstrapped.GenWithStackByArgs() + } region := c.GetRegion(regionID) if region == nil { return nil, nil, errs.ErrRegionNotFound.FastGenByArgs(regionID) @@ -520,6 +536,9 @@ func (h *Handler) AddAddLearnerOperator(regionID uint64, toStoreID uint64) error // AddRemovePeerOperator adds an operator to remove peer. func (h *Handler) AddRemovePeerOperator(regionID uint64, fromStoreID uint64) error { c := h.GetCluster() + if c == nil { + return errs.ErrNotBootstrapped.GenWithStackByArgs() + } region := c.GetRegion(regionID) if region == nil { return errs.ErrRegionNotFound.FastGenByArgs(regionID) @@ -540,6 +559,9 @@ func (h *Handler) AddRemovePeerOperator(regionID uint64, fromStoreID uint64) err // AddMergeRegionOperator adds an operator to merge region. func (h *Handler) AddMergeRegionOperator(regionID uint64, targetID uint64) error { c := h.GetCluster() + if c == nil { + return errs.ErrNotBootstrapped.GenWithStackByArgs() + } region := c.GetRegion(regionID) if region == nil { return errs.ErrRegionNotFound.FastGenByArgs(regionID) @@ -575,6 +597,9 @@ func (h *Handler) AddMergeRegionOperator(regionID uint64, targetID uint64) error // AddSplitRegionOperator adds an operator to split a region. func (h *Handler) AddSplitRegionOperator(regionID uint64, policyStr string, keys []string) error { c := h.GetCluster() + if c == nil { + return errs.ErrNotBootstrapped.GenWithStackByArgs() + } region := c.GetRegion(regionID) if region == nil { return errs.ErrRegionNotFound.FastGenByArgs(regionID) @@ -607,6 +632,9 @@ func (h *Handler) AddSplitRegionOperator(regionID uint64, policyStr string, keys // AddScatterRegionOperator adds an operator to scatter a region. 
func (h *Handler) AddScatterRegionOperator(regionID uint64, group string) error { c := h.GetCluster() + if c == nil { + return errs.ErrNotBootstrapped.GenWithStackByArgs() + } region := c.GetRegion(regionID) if region == nil { return errs.ErrRegionNotFound.FastGenByArgs(regionID) @@ -871,3 +899,187 @@ func (h *Handler) PauseOrResumeChecker(name string, t int64) (err error) { } return err } + +// GetStore returns a store. +// If store does not exist, return error. +func (h *Handler) GetStore(storeID uint64) (*core.StoreInfo, error) { + c := h.GetCluster() + if c == nil { + return nil, errs.ErrNotBootstrapped.GenWithStackByArgs() + } + store := c.GetStore(storeID) + if store == nil { + return nil, errs.ErrStoreNotFound.FastGenByArgs(storeID) + } + return store, nil +} + +// GetStores returns all stores in the cluster. +func (h *Handler) GetStores() ([]*core.StoreInfo, error) { + c := h.GetCluster() + if c == nil { + return nil, errs.ErrNotBootstrapped.GenWithStackByArgs() + } + storeMetas := c.GetStores() + stores := make([]*core.StoreInfo, 0, len(storeMetas)) + for _, store := range storeMetas { + store, err := h.GetStore(store.GetID()) + if err != nil { + return nil, err + } + stores = append(stores, store) + } + return stores, nil +} + +// GetHotRegions gets hot regions' statistics by RWType and storeIDs. +// If storeIDs is empty, it returns all hot regions' statistics by RWType. +func (h *Handler) GetHotRegions(typ utils.RWType, storeIDs ...uint64) (*statistics.StoreHotPeersInfos, error) { + co := h.GetCoordinator() + if co == nil { + return nil, errs.ErrNotBootstrapped.GenWithStackByArgs() + } + return co.GetHotRegions(typ, storeIDs...), nil +} + +// HotStoreStats is used to record the status of hot stores. +type HotStoreStats struct { + BytesWriteStats map[uint64]float64 `json:"bytes-write-rate,omitempty"` + BytesReadStats map[uint64]float64 `json:"bytes-read-rate,omitempty"` + KeysWriteStats map[uint64]float64 `json:"keys-write-rate,omitempty"` + KeysReadStats map[uint64]float64 `json:"keys-read-rate,omitempty"` + QueryWriteStats map[uint64]float64 `json:"query-write-rate,omitempty"` + QueryReadStats map[uint64]float64 `json:"query-read-rate,omitempty"` +} + +// GetHotStores gets all hot stores stats. +func (h *Handler) GetHotStores() (*HotStoreStats, error) { + stats := &HotStoreStats{ + BytesWriteStats: make(map[uint64]float64), + BytesReadStats: make(map[uint64]float64), + KeysWriteStats: make(map[uint64]float64), + KeysReadStats: make(map[uint64]float64), + QueryWriteStats: make(map[uint64]float64), + QueryReadStats: make(map[uint64]float64), + } + stores, error := h.GetStores() + if error != nil { + return nil, error + } + storesLoads, error := h.GetStoresLoads() + if error != nil { + return nil, error + } + for _, store := range stores { + id := store.GetID() + if loads, ok := storesLoads[id]; ok { + if store.IsTiFlash() { + stats.BytesWriteStats[id] = loads[utils.StoreRegionsWriteBytes] + stats.KeysWriteStats[id] = loads[utils.StoreRegionsWriteKeys] + } else { + stats.BytesWriteStats[id] = loads[utils.StoreWriteBytes] + stats.KeysWriteStats[id] = loads[utils.StoreWriteKeys] + } + stats.BytesReadStats[id] = loads[utils.StoreReadBytes] + stats.KeysReadStats[id] = loads[utils.StoreReadKeys] + stats.QueryWriteStats[id] = loads[utils.StoreWriteQuery] + stats.QueryReadStats[id] = loads[utils.StoreReadQuery] + } + } + return stats, nil +} + +// GetStoresLoads gets all hot write stores stats. 
+func (h *Handler) GetStoresLoads() (map[uint64][]float64, error) { + c := h.GetCluster() + if c == nil { + return nil, errs.ErrNotBootstrapped.GenWithStackByArgs() + } + return c.GetStoresLoads(), nil +} + +// HotBucketsResponse is the response for hot buckets. +type HotBucketsResponse map[uint64][]*HotBucketsItem + +// HotBucketsItem is the item of hot buckets. +type HotBucketsItem struct { + StartKey string `json:"start_key"` + EndKey string `json:"end_key"` + HotDegree int `json:"hot_degree"` + ReadBytes uint64 `json:"read_bytes"` + ReadKeys uint64 `json:"read_keys"` + WriteBytes uint64 `json:"write_bytes"` + WriteKeys uint64 `json:"write_keys"` +} + +func convert(buckets *buckets.BucketStat) *HotBucketsItem { + return &HotBucketsItem{ + StartKey: core.HexRegionKeyStr(buckets.StartKey), + EndKey: core.HexRegionKeyStr(buckets.EndKey), + HotDegree: buckets.HotDegree, + ReadBytes: buckets.Loads[utils.RegionReadBytes], + ReadKeys: buckets.Loads[utils.RegionReadKeys], + WriteBytes: buckets.Loads[utils.RegionWriteBytes], + WriteKeys: buckets.Loads[utils.RegionWriteKeys], + } +} + +// GetHotBuckets returns all hot buckets stats. +func (h *Handler) GetHotBuckets(regionIDs ...uint64) (HotBucketsResponse, error) { + c := h.GetCluster() + if c == nil { + return nil, errs.ErrNotBootstrapped.GenWithStackByArgs() + } + degree := c.GetSharedConfig().GetHotRegionCacheHitsThreshold() + stats := c.BucketsStats(degree, regionIDs...) + ret := HotBucketsResponse{} + for regionID, stats := range stats { + ret[regionID] = make([]*HotBucketsItem, len(stats)) + for i, stat := range stats { + ret[regionID][i] = convert(stat) + } + } + return ret, nil +} + +// GetRuleManager returns the rule manager. +func (h *Handler) GetRuleManager() (*placement.RuleManager, error) { + c := h.GetCluster() + if c == nil { + return nil, errs.ErrNotBootstrapped + } + if !c.GetSharedConfig().IsPlacementRulesEnabled() { + return nil, errs.ErrPlacementDisabled + } + return c.GetRuleManager(), nil +} + +// PreCheckForRegion checks if the region is valid. +func (h *Handler) PreCheckForRegion(regionStr string) (*core.RegionInfo, int, error) { + c := h.GetCluster() + if c == nil { + return nil, http.StatusInternalServerError, errs.ErrNotBootstrapped.GenWithStackByArgs() + } + regionID, err := strconv.ParseUint(regionStr, 10, 64) + if err != nil { + return nil, http.StatusBadRequest, errs.ErrRegionInvalidID.FastGenByArgs() + } + region := c.GetRegion(regionID) + if region == nil { + return nil, http.StatusNotFound, errs.ErrRegionNotFound.FastGenByArgs(regionID) + } + return region, http.StatusOK, nil +} + +// CheckRegionPlacementRule checks if the region matches the placement rules. 
+func (h *Handler) CheckRegionPlacementRule(region *core.RegionInfo) (*placement.RegionFit, error) { + c := h.GetCluster() + if c == nil { + return nil, errs.ErrNotBootstrapped.GenWithStackByArgs() + } + manager, err := h.GetRuleManager() + if err != nil { + return nil, err + } + return manager.FitRegion(c, region), nil +} diff --git a/pkg/storage/hot_region_storage.go b/pkg/storage/hot_region_storage.go index 690422b78d0..0393035c85b 100644 --- a/pkg/storage/hot_region_storage.go +++ b/pkg/storage/hot_region_storage.go @@ -33,6 +33,7 @@ import ( "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/encryption" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/statistics/utils" "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/syncutil" @@ -45,12 +46,12 @@ import ( // Close() must be called after the use. type HotRegionStorage struct { *kv.LevelDBKV - ekm *encryption.Manager - hotRegionLoopWg sync.WaitGroup - batchHotInfo map[string]*HistoryHotRegion - hotRegionInfoCtx context.Context - hotRegionInfoCancel context.CancelFunc - hotRegionStorageHandler HotRegionStorageHandler + ekm *encryption.Manager + hotRegionLoopWg sync.WaitGroup + batchHotInfo map[string]*HistoryHotRegion + hotRegionInfoCtx context.Context + hotRegionInfoCancel context.CancelFunc + hotRegionStorageHelper HotRegionStorageHelper curReservedDays uint64 curInterval time.Duration @@ -88,12 +89,10 @@ type HistoryHotRegion struct { EncryptionMeta *encryptionpb.EncryptionMeta `json:"encryption_meta,omitempty"` } -// HotRegionStorageHandler help hot region storage get hot region info. -type HotRegionStorageHandler interface { - // PackHistoryHotReadRegions get read hot region info in HistoryHotRegion form. - PackHistoryHotReadRegions() ([]HistoryHotRegion, error) - // PackHistoryHotWriteRegions get write hot region info in HistoryHotRegion form. - PackHistoryHotWriteRegions() ([]HistoryHotRegion, error) +// HotRegionStorageHelper help hot region storage get hot region info. +type HotRegionStorageHelper interface { + // GetHistoryHotRegions get hot region info in HistoryHotRegion form. + GetHistoryHotRegions(typ utils.RWType) ([]HistoryHotRegion, error) // IsLeader return true means this server is leader. IsLeader() bool // GetHotRegionsWriteInterval gets interval for PD to store Hot Region information. @@ -107,30 +106,10 @@ const ( defaultDeleteTime = 4 ) -// HotRegionType stands for hot type. -type HotRegionType uint32 - -// Flags for flow. -const ( - WriteType HotRegionType = iota - ReadType -) - // HotRegionTypes stands for hot type. var HotRegionTypes = []string{ - WriteType.String(), - ReadType.String(), -} - -// String return HotRegionType in string format. -func (h HotRegionType) String() string { - switch h { - case WriteType: - return "write" - case ReadType: - return "read" - } - return "unimplemented" + utils.Read.String(), + utils.Write.String(), } // NewHotRegionsStorage create storage to store hot regions info. 
@@ -138,7 +117,7 @@ func NewHotRegionsStorage( ctx context.Context, filePath string, ekm *encryption.Manager, - hotRegionStorageHandler HotRegionStorageHandler, + hotRegionStorageHelper HotRegionStorageHelper, ) (*HotRegionStorage, error) { levelDB, err := kv.NewLevelDBKV(filePath) if err != nil { @@ -146,14 +125,14 @@ func NewHotRegionsStorage( } hotRegionInfoCtx, hotRegionInfoCancel := context.WithCancel(ctx) h := HotRegionStorage{ - LevelDBKV: levelDB, - ekm: ekm, - batchHotInfo: make(map[string]*HistoryHotRegion), - hotRegionInfoCtx: hotRegionInfoCtx, - hotRegionInfoCancel: hotRegionInfoCancel, - hotRegionStorageHandler: hotRegionStorageHandler, - curReservedDays: hotRegionStorageHandler.GetHotRegionsReservedDays(), - curInterval: hotRegionStorageHandler.GetHotRegionsWriteInterval(), + LevelDBKV: levelDB, + ekm: ekm, + batchHotInfo: make(map[string]*HistoryHotRegion), + hotRegionInfoCtx: hotRegionInfoCtx, + hotRegionInfoCancel: hotRegionInfoCancel, + hotRegionStorageHelper: hotRegionStorageHelper, + curReservedDays: hotRegionStorageHelper.GetHotRegionsReservedDays(), + curInterval: hotRegionStorageHelper.GetHotRegionsWriteInterval(), } h.hotRegionLoopWg.Add(2) go h.backgroundFlush() @@ -218,7 +197,7 @@ func (h *HotRegionStorage) backgroundFlush() { if h.getCurReservedDays() == 0 { continue } - if h.hotRegionStorageHandler.IsLeader() { + if h.hotRegionStorageHelper.IsLeader() { if err := h.pullHotRegionInfo(); err != nil { log.Error("get hot_region stat meet error", errs.ZapError(err)) } @@ -259,19 +238,18 @@ func (h *HotRegionStorage) Close() error { } func (h *HotRegionStorage) pullHotRegionInfo() error { - historyHotReadRegions, err := h.hotRegionStorageHandler.PackHistoryHotReadRegions() + historyHotReadRegions, err := h.hotRegionStorageHelper.GetHistoryHotRegions(utils.Read) if err != nil { return err } - if err := h.packHistoryHotRegions(historyHotReadRegions, ReadType.String()); err != nil { + if err := h.packHistoryHotRegions(historyHotReadRegions, utils.Read.String()); err != nil { return err } - historyHotWriteRegions, err := h.hotRegionStorageHandler.PackHistoryHotWriteRegions() + historyHotWriteRegions, err := h.hotRegionStorageHelper.GetHistoryHotRegions(utils.Write) if err != nil { return err } - err = h.packHistoryHotRegions(historyHotWriteRegions, WriteType.String()) - return err + return h.packHistoryHotRegions(historyHotWriteRegions, utils.Write.String()) } func (h *HotRegionStorage) packHistoryHotRegions(historyHotRegions []HistoryHotRegion, hotRegionType string) error { @@ -297,7 +275,7 @@ func (h *HotRegionStorage) packHistoryHotRegions(historyHotRegions []HistoryHotR func (h *HotRegionStorage) updateInterval() { h.mu.Lock() defer h.mu.Unlock() - interval := h.hotRegionStorageHandler.GetHotRegionsWriteInterval() + interval := h.hotRegionStorageHelper.GetHotRegionsWriteInterval() if interval != h.curInterval { log.Info("hot region write interval changed", zap.Duration("previous-interval", h.curInterval), @@ -315,7 +293,7 @@ func (h *HotRegionStorage) getCurInterval() time.Duration { func (h *HotRegionStorage) updateReservedDays() { h.mu.Lock() defer h.mu.Unlock() - reservedDays := h.hotRegionStorageHandler.GetHotRegionsReservedDays() + reservedDays := h.hotRegionStorageHelper.GetHotRegionsReservedDays() if reservedDays != h.curReservedDays { log.Info("hot region reserved days changed", zap.Uint64("previous-reserved-days", h.curReservedDays), diff --git a/pkg/storage/hot_region_storage_test.go b/pkg/storage/hot_region_storage_test.go index 2de7d66fe1b..629c638c1ff 
100644 --- a/pkg/storage/hot_region_storage_test.go +++ b/pkg/storage/hot_region_storage_test.go @@ -26,6 +26,7 @@ import ( "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/statistics/utils" ) type MockPackHotRegionInfo struct { @@ -36,18 +37,18 @@ type MockPackHotRegionInfo struct { pullInterval time.Duration } -// PackHistoryHotWriteRegions get read hot region info in HistoryHotRegion from. -func (m *MockPackHotRegionInfo) PackHistoryHotReadRegions() ([]HistoryHotRegion, error) { - result := make([]HistoryHotRegion, len(m.historyHotReads)) - copy(result, m.historyHotReads) - return result, nil -} - -// PackHistoryHotWriteRegions get write hot region info in HistoryHotRegion form. -func (m *MockPackHotRegionInfo) PackHistoryHotWriteRegions() ([]HistoryHotRegion, error) { - result := make([]HistoryHotRegion, len(m.historyHotWrites)) - copy(result, m.historyHotWrites) - return result, nil +// GetHistoryHotRegions get hot region info in HistoryHotRegion form. +func (m *MockPackHotRegionInfo) GetHistoryHotRegions(typ utils.RWType) ([]HistoryHotRegion, error) { + switch typ { + case utils.Write: + result := make([]HistoryHotRegion, len(m.historyHotWrites)) + copy(result, m.historyHotWrites) + return result, nil + default: // case utils.Read: + result := make([]HistoryHotRegion, len(m.historyHotReads)) + copy(result, m.historyHotReads) + return result, nil + } } // IsLeader return isLeader. @@ -115,7 +116,7 @@ func TestHotRegionWrite(t *testing.T) { UpdateTime: now.UnixNano() / int64(time.Millisecond), RegionID: 1, StoreID: 1, - HotRegionType: ReadType.String(), + HotRegionType: utils.Read.String(), StartKey: string([]byte{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xff, 0x15, 0x5f, 0x69, 0x80, 0x0, 0x0, 0x0, 0x0, 0xff, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0xfa}), EndKey: string([]byte{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xff, 0x15, 0x5f, 0x69, 0x80, 0x0, 0x0, 0x0, 0x0, 0xff, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0xfa}), }, @@ -123,7 +124,7 @@ func TestHotRegionWrite(t *testing.T) { UpdateTime: now.Add(10*time.Second).UnixNano() / int64(time.Millisecond), RegionID: 2, StoreID: 1, - HotRegionType: ReadType.String(), + HotRegionType: utils.Read.String(), StartKey: string([]byte{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xff, 0x15, 0x5f, 0x69, 0x80, 0x0, 0x0, 0x0, 0x0, 0xff, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0xfa}), EndKey: string([]byte{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xff, 0x15, 0x5f, 0x69, 0x80, 0x0, 0x0, 0x0, 0x0, 0xff, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0xfa}), }, @@ -131,7 +132,7 @@ func TestHotRegionWrite(t *testing.T) { UpdateTime: now.Add(20*time.Second).UnixNano() / int64(time.Millisecond), RegionID: 3, StoreID: 1, - HotRegionType: ReadType.String(), + HotRegionType: utils.Read.String(), StartKey: string([]byte{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xff, 0x83, 0x5f, 0x69, 0x80, 0x0, 0x0, 0x0, 0x0, 0xff, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0xfa}), EndKey: string([]byte{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xff, 0x83, 0x5f, 0x69, 0x80, 0x0, 0x0, 0x0, 0x0, 0xff, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0xfa}), }, @@ -149,12 +150,12 @@ func TestHotRegionWrite(t *testing.T) { UpdateTime: now.Add(30*time.Second).UnixNano() / int64(time.Millisecond), RegionID: 4, StoreID: 1, - HotRegionType: WriteType.String(), + HotRegionType: utils.Write.String(), }, } store.pullHotRegionInfo() store.flush() - iter := store.NewIterator([]string{ReadType.String()}, + iter := store.NewIterator([]string{utils.Read.String()}, 
now.UnixNano()/int64(time.Millisecond), now.Add(40*time.Second).UnixNano()/int64(time.Millisecond)) index := 0 @@ -182,7 +183,7 @@ func TestHotRegionDelete(t *testing.T) { historyHotRegion := HistoryHotRegion{ UpdateTime: deleteDate.UnixNano() / int64(time.Millisecond), RegionID: 1, - HotRegionType: ReadType.String(), + HotRegionType: utils.Read.String(), } historyHotRegions = append(historyHotRegions, historyHotRegion) deleteDate = deleteDate.AddDate(0, 0, -1) diff --git a/server/api/hot_status.go b/server/api/hot_status.go index 7779591de1f..f352e21254b 100644 --- a/server/api/hot_status.go +++ b/server/api/hot_status.go @@ -21,11 +21,7 @@ import ( "net/http" "strconv" - "github.com/tikv/pd/pkg/core" - "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/statistics/buckets" "github.com/tikv/pd/pkg/statistics/utils" - "github.com/tikv/pd/pkg/storage" "github.com/tikv/pd/server" "github.com/unrolled/render" ) @@ -35,55 +31,6 @@ type hotStatusHandler struct { rd *render.Render } -// HotBucketsResponse is the response for hot buckets. -type HotBucketsResponse map[uint64][]*HotBucketsItem - -// HotBucketsItem is the item of hot buckets. -type HotBucketsItem struct { - StartKey string `json:"start_key"` - EndKey string `json:"end_key"` - HotDegree int `json:"hot_degree"` - ReadBytes uint64 `json:"read_bytes"` - ReadKeys uint64 `json:"read_keys"` - WriteBytes uint64 `json:"write_bytes"` - WriteKeys uint64 `json:"write_keys"` -} - -func convert(buckets *buckets.BucketStat) *HotBucketsItem { - return &HotBucketsItem{ - StartKey: core.HexRegionKeyStr(buckets.StartKey), - EndKey: core.HexRegionKeyStr(buckets.EndKey), - HotDegree: buckets.HotDegree, - ReadBytes: buckets.Loads[utils.RegionReadBytes], - ReadKeys: buckets.Loads[utils.RegionReadKeys], - WriteBytes: buckets.Loads[utils.RegionWriteBytes], - WriteKeys: buckets.Loads[utils.RegionWriteKeys], - } -} - -// HotStoreStats is used to record the status of hot stores. -type HotStoreStats struct { - BytesWriteStats map[uint64]float64 `json:"bytes-write-rate,omitempty"` - BytesReadStats map[uint64]float64 `json:"bytes-read-rate,omitempty"` - KeysWriteStats map[uint64]float64 `json:"keys-write-rate,omitempty"` - KeysReadStats map[uint64]float64 `json:"keys-read-rate,omitempty"` - QueryWriteStats map[uint64]float64 `json:"query-write-rate,omitempty"` - QueryReadStats map[uint64]float64 `json:"query-read-rate,omitempty"` -} - -// HistoryHotRegionsRequest wrap request condition from tidb. -// it is request from tidb -type HistoryHotRegionsRequest struct { - StartTime int64 `json:"start_time,omitempty"` - EndTime int64 `json:"end_time,omitempty"` - RegionIDs []uint64 `json:"region_ids,omitempty"` - StoreIDs []uint64 `json:"store_ids,omitempty"` - PeerIDs []uint64 `json:"peer_ids,omitempty"` - IsLearners []bool `json:"is_learners,omitempty"` - IsLeaders []bool `json:"is_leaders,omitempty"` - HotRegionTypes []string `json:"hot_region_type,omitempty"` -} - func newHotStatusHandler(handler *server.Handler, rd *render.Render) *hotStatusHandler { return &hotStatusHandler{ Handler: handler, @@ -95,53 +42,33 @@ func newHotStatusHandler(handler *server.Handler, rd *render.Render) *hotStatusH // @Summary List the hot write regions. // @Produce json // @Success 200 {object} statistics.StoreHotPeersInfos +// @Failure 400 {string} string "The request is invalid." +// @Failure 500 {string} string "PD server failed to proceed the request." 
// @Router /hotspot/regions/write [get] func (h *hotStatusHandler) GetHotWriteRegions(w http.ResponseWriter, r *http.Request) { - storeIDs := r.URL.Query()["store_id"] - if len(storeIDs) < 1 { - h.rd.JSON(w, http.StatusOK, h.Handler.GetHotWriteRegions()) - return - } - - rc, err := h.GetRaftCluster() - if rc == nil { - h.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - - var ids []uint64 - for _, storeID := range storeIDs { - id, err := strconv.ParseUint(storeID, 10, 64) - if err != nil { - h.rd.JSON(w, http.StatusBadRequest, fmt.Sprintf("invalid store id: %s", storeID)) - return - } - store := rc.GetStore(id) - if store == nil { - h.rd.JSON(w, http.StatusNotFound, errs.ErrStoreNotFound.FastGenByArgs(id).Error()) - return - } - ids = append(ids, id) - } - - h.rd.JSON(w, http.StatusOK, rc.GetHotWriteRegions(ids...)) + h.getHotRegions(utils.Write, w, r) } // @Tags hotspot // @Summary List the hot read regions. // @Produce json // @Success 200 {object} statistics.StoreHotPeersInfos +// @Failure 400 {string} string "The request is invalid." +// @Failure 500 {string} string "PD server failed to proceed the request." // @Router /hotspot/regions/read [get] func (h *hotStatusHandler) GetHotReadRegions(w http.ResponseWriter, r *http.Request) { + h.getHotRegions(utils.Read, w, r) +} + +func (h *hotStatusHandler) getHotRegions(typ utils.RWType, w http.ResponseWriter, r *http.Request) { storeIDs := r.URL.Query()["store_id"] if len(storeIDs) < 1 { - h.rd.JSON(w, http.StatusOK, h.Handler.GetHotReadRegions()) - return - } - - rc, err := h.GetRaftCluster() - if rc == nil { - h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + hotRegions, err := h.GetHotRegions(typ) + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + h.rd.JSON(w, http.StatusOK, hotRegions) return } @@ -152,48 +79,33 @@ func (h *hotStatusHandler) GetHotReadRegions(w http.ResponseWriter, r *http.Requ h.rd.JSON(w, http.StatusBadRequest, fmt.Sprintf("invalid store id: %s", storeID)) return } - store := rc.GetStore(id) - if store == nil { - h.rd.JSON(w, http.StatusNotFound, errs.ErrStoreNotFound.FastGenByArgs(id).Error()) + _, err = h.GetStore(id) + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } ids = append(ids, id) } - h.rd.JSON(w, http.StatusOK, rc.GetHotReadRegions(ids...)) + hotRegions, err := h.GetHotRegions(typ, ids...) + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + h.rd.JSON(w, http.StatusOK, hotRegions) } // @Tags hotspot // @Summary List the hot stores. // @Produce json -// @Success 200 {object} HotStoreStats +// @Success 200 {object} handler.HotStoreStats +// @Failure 500 {string} string "PD server failed to proceed the request." 
// @Router /hotspot/stores [get] func (h *hotStatusHandler) GetHotStores(w http.ResponseWriter, r *http.Request) { - stats := HotStoreStats{ - BytesWriteStats: make(map[uint64]float64), - BytesReadStats: make(map[uint64]float64), - KeysWriteStats: make(map[uint64]float64), - KeysReadStats: make(map[uint64]float64), - QueryWriteStats: make(map[uint64]float64), - QueryReadStats: make(map[uint64]float64), - } - stores, _ := h.GetStores() - storesLoads := h.GetStoresLoads() - for _, store := range stores { - id := store.GetID() - if loads, ok := storesLoads[id]; ok { - if store.IsTiFlash() { - stats.BytesWriteStats[id] = loads[utils.StoreRegionsWriteBytes] - stats.KeysWriteStats[id] = loads[utils.StoreRegionsWriteKeys] - } else { - stats.BytesWriteStats[id] = loads[utils.StoreWriteBytes] - stats.KeysWriteStats[id] = loads[utils.StoreWriteKeys] - } - stats.BytesReadStats[id] = loads[utils.StoreReadBytes] - stats.KeysReadStats[id] = loads[utils.StoreReadKeys] - stats.QueryWriteStats[id] = loads[utils.StoreWriteQuery] - stats.QueryReadStats[id] = loads[utils.StoreReadQuery] - } + stats, err := h.Handler.GetHotStores() + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return } h.rd.JSON(w, http.StatusOK, stats) } @@ -201,7 +113,8 @@ func (h *hotStatusHandler) GetHotStores(w http.ResponseWriter, r *http.Request) // @Tags hotspot // @Summary List the hot buckets. // @Produce json -// @Success 200 {object} HotBucketsResponse +// @Success 200 {object} handler.HotBucketsResponse +// @Failure 500 {string} string "PD server failed to proceed the request." // @Router /hotspot/buckets [get] func (h *hotStatusHandler) GetHotBuckets(w http.ResponseWriter, r *http.Request) { regionIDs := r.URL.Query()["region_id"] @@ -211,13 +124,10 @@ func (h *hotStatusHandler) GetHotBuckets(w http.ResponseWriter, r *http.Request) ids[i] = id } } - stats := h.Handler.GetHotBuckets(ids...) - ret := HotBucketsResponse{} - for regionID, stats := range stats { - ret[regionID] = make([]*HotBucketsItem, len(stats)) - for i, stat := range stats { - ret[regionID][i] = convert(stat) - } + ret, err := h.Handler.GetHotBuckets(ids...) 
+ if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return } h.rd.JSON(w, http.StatusOK, ret) } @@ -237,66 +147,16 @@ func (h *hotStatusHandler) GetHistoryHotRegions(w http.ResponseWriter, r *http.R h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } - historyHotRegionsRequest := &HistoryHotRegionsRequest{} + historyHotRegionsRequest := &server.HistoryHotRegionsRequest{} err = json.Unmarshal(data, historyHotRegionsRequest) if err != nil { h.rd.JSON(w, http.StatusBadRequest, err.Error()) return } - results, err := getAllRequestHistoryHotRegion(h.Handler, historyHotRegionsRequest) + results, err := h.GetAllRequestHistoryHotRegion(historyHotRegionsRequest) if err != nil { h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } h.rd.JSON(w, http.StatusOK, results) } - -func getAllRequestHistoryHotRegion(handler *server.Handler, request *HistoryHotRegionsRequest) (*storage.HistoryHotRegions, error) { - var hotRegionTypes = storage.HotRegionTypes - if len(request.HotRegionTypes) != 0 { - hotRegionTypes = request.HotRegionTypes - } - iter := handler.GetHistoryHotRegionIter(hotRegionTypes, request.StartTime, request.EndTime) - var results []*storage.HistoryHotRegion - regionSet, storeSet, peerSet, learnerSet, leaderSet := - make(map[uint64]bool), make(map[uint64]bool), - make(map[uint64]bool), make(map[bool]bool), make(map[bool]bool) - for _, id := range request.RegionIDs { - regionSet[id] = true - } - for _, id := range request.StoreIDs { - storeSet[id] = true - } - for _, id := range request.PeerIDs { - peerSet[id] = true - } - for _, isLearner := range request.IsLearners { - learnerSet[isLearner] = true - } - for _, isLeader := range request.IsLeaders { - leaderSet[isLeader] = true - } - var next *storage.HistoryHotRegion - var err error - for next, err = iter.Next(); next != nil && err == nil; next, err = iter.Next() { - if len(regionSet) != 0 && !regionSet[next.RegionID] { - continue - } - if len(storeSet) != 0 && !storeSet[next.StoreID] { - continue - } - if len(peerSet) != 0 && !peerSet[next.PeerID] { - continue - } - if !learnerSet[next.IsLearner] { - continue - } - if !leaderSet[next.IsLeader] { - continue - } - results = append(results, next) - } - return &storage.HistoryHotRegions{ - HistoryHotRegion: results, - }, err -} diff --git a/server/api/hot_status_test.go b/server/api/hot_status_test.go index d3d495f86fa..8dace1975da 100644 --- a/server/api/hot_status_test.go +++ b/server/api/hot_status_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/suite" "github.com/syndtr/goleveldb/leveldb" + "github.com/tikv/pd/pkg/schedule/handler" "github.com/tikv/pd/pkg/storage" "github.com/tikv/pd/pkg/storage/kv" tu "github.com/tikv/pd/pkg/utils/testutil" @@ -56,13 +57,13 @@ func (suite *hotStatusTestSuite) TearDownSuite() { } func (suite *hotStatusTestSuite) TestGetHotStore() { - stat := HotStoreStats{} + stat := handler.HotStoreStats{} err := tu.ReadGetJSON(suite.Require(), testDialClient, suite.urlPrefix+"/stores", &stat) suite.NoError(err) } func (suite *hotStatusTestSuite) TestGetHistoryHotRegionsBasic() { - request := HistoryHotRegionsRequest{ + request := server.HistoryHotRegionsRequest{ StartTime: 0, EndTime: time.Now().AddDate(0, 2, 0).UnixNano() / int64(time.Millisecond), } @@ -89,7 +90,7 @@ func (suite *hotStatusTestSuite) TestGetHistoryHotRegionsTimeRange() { UpdateTime: now.Add(10*time.Minute).UnixNano() / int64(time.Millisecond), }, } - request := HistoryHotRegionsRequest{ + request := server.HistoryHotRegionsRequest{ 
StartTime: now.UnixNano() / int64(time.Millisecond), EndTime: now.Add(10*time.Second).UnixNano() / int64(time.Millisecond), } @@ -169,7 +170,7 @@ func (suite *hotStatusTestSuite) TestGetHistoryHotRegionsIDAndTypes() { UpdateTime: now.Add(50*time.Second).UnixNano() / int64(time.Millisecond), }, } - request := HistoryHotRegionsRequest{ + request := server.HistoryHotRegionsRequest{ RegionIDs: []uint64{1}, StoreIDs: []uint64{1}, PeerIDs: []uint64{1}, diff --git a/server/api/rule.go b/server/api/rule.go index b3a720ece41..116a50f6259 100644 --- a/server/api/rule.go +++ b/server/api/rule.go @@ -19,30 +19,26 @@ import ( "fmt" "net/http" "net/url" - "strconv" "github.com/gorilla/mux" - "github.com/pingcap/errors" - "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/server" - "github.com/tikv/pd/server/cluster" "github.com/unrolled/render" ) -var errPlacementDisabled = errors.New("placement rules feature is disabled") - type ruleHandler struct { + *server.Handler svr *server.Server rd *render.Render } func newRulesHandler(svr *server.Server, rd *render.Render) *ruleHandler { return &ruleHandler{ - svr: svr, - rd: rd, + Handler: svr.GetHandler(), + svr: svr, + rd: rd, } } @@ -51,14 +47,19 @@ func newRulesHandler(svr *server.Server, rd *render.Render) *ruleHandler { // @Produce json // @Success 200 {array} placement.Rule // @Failure 412 {string} string "Placement rules feature is disabled." +// @Failure 500 {string} string "PD server failed to proceed the request." // @Router /config/rules [get] func (h *ruleHandler) GetAllRules(w http.ResponseWriter, r *http.Request) { - cluster := getCluster(r) - if !cluster.GetOpts().IsPlacementRulesEnabled() { - h.rd.JSON(w, http.StatusPreconditionFailed, errPlacementDisabled.Error()) + manager, err := h.Handler.GetRuleManager() + if err == errs.ErrPlacementDisabled { + h.rd.JSON(w, http.StatusPreconditionFailed, err.Error()) + return + } + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } - rules := cluster.GetRuleManager().GetAllRules() + rules := manager.GetAllRules() h.rd.JSON(w, http.StatusOK, rules) } @@ -72,9 +73,13 @@ func (h *ruleHandler) GetAllRules(w http.ResponseWriter, r *http.Request) { // @Failure 500 {string} string "PD server failed to proceed the request." // @Router /config/rules [post] func (h *ruleHandler) SetAllRules(w http.ResponseWriter, r *http.Request) { - cluster := getCluster(r) - if !cluster.GetOpts().IsPlacementRulesEnabled() { - h.rd.JSON(w, http.StatusPreconditionFailed, errPlacementDisabled.Error()) + manager, err := h.Handler.GetRuleManager() + if err == errs.ErrPlacementDisabled { + h.rd.JSON(w, http.StatusPreconditionFailed, err.Error()) + return + } + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } var rules []*placement.Rule @@ -87,7 +92,7 @@ func (h *ruleHandler) SetAllRules(w http.ResponseWriter, r *http.Request) { return } } - if err := cluster.GetRuleManager().SetKeyType(h.svr.GetConfig().PDServerCfg.KeyType). + if err := manager.SetKeyType(h.svr.GetConfig().PDServerCfg.KeyType). 
SetRules(rules); err != nil { if errs.ErrRuleContent.Equal(err) || errs.ErrHexDecodingString.Equal(err) { h.rd.JSON(w, http.StatusBadRequest, err.Error()) @@ -105,15 +110,20 @@ func (h *ruleHandler) SetAllRules(w http.ResponseWriter, r *http.Request) { // @Produce json // @Success 200 {array} placement.Rule // @Failure 412 {string} string "Placement rules feature is disabled." +// @Failure 500 {string} string "PD server failed to proceed the request." // @Router /config/rules/group/{group} [get] func (h *ruleHandler) GetRuleByGroup(w http.ResponseWriter, r *http.Request) { - cluster := getCluster(r) - if !cluster.GetOpts().IsPlacementRulesEnabled() { - h.rd.JSON(w, http.StatusPreconditionFailed, errPlacementDisabled.Error()) + manager, err := h.Handler.GetRuleManager() + if err == errs.ErrPlacementDisabled { + h.rd.JSON(w, http.StatusPreconditionFailed, err.Error()) + return + } + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } group := mux.Vars(r)["group"] - rules := cluster.GetRuleManager().GetRulesByGroup(group) + rules := manager.GetRulesByGroup(group) h.rd.JSON(w, http.StatusOK, rules) } @@ -125,13 +135,25 @@ func (h *ruleHandler) GetRuleByGroup(w http.ResponseWriter, r *http.Request) { // @Failure 400 {string} string "The input is invalid." // @Failure 404 {string} string "The region does not exist." // @Failure 412 {string} string "Placement rules feature is disabled." +// @Failure 500 {string} string "PD server failed to proceed the request." // @Router /config/rules/region/{region} [get] func (h *ruleHandler) GetRulesByRegion(w http.ResponseWriter, r *http.Request) { - cluster, region := h.preCheckForRegionAndRule(w, r) - if cluster == nil || region == nil { + manager, err := h.Handler.GetRuleManager() + if err == errs.ErrPlacementDisabled { + h.rd.JSON(w, http.StatusPreconditionFailed, err.Error()) + return + } + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } - rules := cluster.GetRuleManager().GetRulesForApplyRegion(region) + regionStr := mux.Vars(r)["region"] + region, code, err := h.PreCheckForRegion(regionStr) + if err != nil { + h.rd.JSON(w, code, err.Error()) + return + } + rules := manager.GetRulesForApplyRegion(region) h.rd.JSON(w, http.StatusOK, rules) } @@ -143,34 +165,25 @@ func (h *ruleHandler) GetRulesByRegion(w http.ResponseWriter, r *http.Request) { // @Failure 400 {string} string "The input is invalid." // @Failure 404 {string} string "The region does not exist." // @Failure 412 {string} string "Placement rules feature is disabled." +// @Failure 500 {string} string "PD server failed to proceed the request." 
// @Router /config/rules/region/{region}/detail [get] func (h *ruleHandler) CheckRegionPlacementRule(w http.ResponseWriter, r *http.Request) { - cluster, region := h.preCheckForRegionAndRule(w, r) - if cluster == nil || region == nil { + regionStr := mux.Vars(r)["region"] + region, code, err := h.PreCheckForRegion(regionStr) + if err != nil { + h.rd.JSON(w, code, err.Error()) return } - regionFit := cluster.GetRuleManager().FitRegion(cluster, region) - h.rd.JSON(w, http.StatusOK, regionFit) -} - -func (h *ruleHandler) preCheckForRegionAndRule(w http.ResponseWriter, r *http.Request) (*cluster.RaftCluster, *core.RegionInfo) { - cluster := getCluster(r) - if !cluster.GetOpts().IsPlacementRulesEnabled() { - h.rd.JSON(w, http.StatusPreconditionFailed, errPlacementDisabled.Error()) - return cluster, nil + regionFit, err := h.Handler.CheckRegionPlacementRule(region) + if err == errs.ErrPlacementDisabled { + h.rd.JSON(w, http.StatusPreconditionFailed, err.Error()) + return } - regionStr := mux.Vars(r)["region"] - regionID, err := strconv.ParseUint(regionStr, 10, 64) if err != nil { - h.rd.JSON(w, http.StatusBadRequest, "invalid region id") - return cluster, nil - } - region := cluster.GetRegion(regionID) - if region == nil { - h.rd.JSON(w, http.StatusNotFound, errs.ErrRegionNotFound.FastGenByArgs(regionID).Error()) - return cluster, nil + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return } - return cluster, region + h.rd.JSON(w, http.StatusOK, regionFit) } // @Tags rule @@ -180,20 +193,25 @@ func (h *ruleHandler) preCheckForRegionAndRule(w http.ResponseWriter, r *http.Re // @Success 200 {array} placement.Rule // @Failure 400 {string} string "The input is invalid." // @Failure 412 {string} string "Placement rules feature is disabled." +// @Failure 500 {string} string "PD server failed to proceed the request." // @Router /config/rules/key/{key} [get] func (h *ruleHandler) GetRulesByKey(w http.ResponseWriter, r *http.Request) { - cluster := getCluster(r) - if !cluster.GetOpts().IsPlacementRulesEnabled() { - h.rd.JSON(w, http.StatusPreconditionFailed, errPlacementDisabled.Error()) + manager, err := h.Handler.GetRuleManager() + if err == errs.ErrPlacementDisabled { + h.rd.JSON(w, http.StatusPreconditionFailed, err.Error()) + return + } + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } keyHex := mux.Vars(r)["key"] key, err := hex.DecodeString(keyHex) if err != nil { - h.rd.JSON(w, http.StatusBadRequest, "key should be in hex format") + h.rd.JSON(w, http.StatusBadRequest, errs.ErrKeyFormat.Error()) return } - rules := cluster.GetRuleManager().GetRulesByKey(key) + rules := manager.GetRulesByKey(key) h.rd.JSON(w, http.StatusOK, rules) } @@ -207,15 +225,19 @@ func (h *ruleHandler) GetRulesByKey(w http.ResponseWriter, r *http.Request) { // @Failure 412 {string} string "Placement rules feature is disabled." 
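Editor's note: every rule handler in this file now repeats the same three-way mapping — errs.ErrPlacementDisabled becomes 412, any other GetRuleManager error becomes 500, otherwise the manager is used. A minimal sketch of how that boilerplate could be factored out, assuming it lives next to the handlers in server/api/rule.go and that GetRuleManager returns (*placement.RuleManager, error) as its callers above suggest; the helper name is hypothetical and not part of this change:

func (h *ruleHandler) getRuleManagerOrRespond(w http.ResponseWriter) *placement.RuleManager {
	manager, err := h.Handler.GetRuleManager()
	switch {
	case err == errs.ErrPlacementDisabled:
		// Placement rules are switched off: 412, matching the swagger annotations above.
		h.rd.JSON(w, http.StatusPreconditionFailed, err.Error())
		return nil
	case err != nil:
		// Any other failure (for example, a cluster that is not bootstrapped yet): 500.
		h.rd.JSON(w, http.StatusInternalServerError, err.Error())
		return nil
	default:
		return manager
	}
}

Each GET handler above would then shrink to an early return on a nil manager followed by the corresponding manager call.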
// @Router /config/rule/{group}/{id} [get] func (h *ruleHandler) GetRuleByGroupAndID(w http.ResponseWriter, r *http.Request) { - cluster := getCluster(r) - if !cluster.GetOpts().IsPlacementRulesEnabled() { - h.rd.JSON(w, http.StatusPreconditionFailed, errPlacementDisabled.Error()) + manager, err := h.Handler.GetRuleManager() + if err == errs.ErrPlacementDisabled { + h.rd.JSON(w, http.StatusPreconditionFailed, err.Error()) + return + } + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } group, id := mux.Vars(r)["group"], mux.Vars(r)["id"] - rule := cluster.GetRuleManager().GetRule(group, id) + rule := manager.GetRule(group, id) if rule == nil { - h.rd.JSON(w, http.StatusNotFound, nil) + h.rd.JSON(w, http.StatusNotFound, errs.ErrRuleNotFound.Error()) return } h.rd.JSON(w, http.StatusOK, rule) @@ -232,21 +254,25 @@ func (h *ruleHandler) GetRuleByGroupAndID(w http.ResponseWriter, r *http.Request // @Failure 500 {string} string "PD server failed to proceed the request." // @Router /config/rule [post] func (h *ruleHandler) SetRule(w http.ResponseWriter, r *http.Request) { - cluster := getCluster(r) - if !cluster.GetOpts().IsPlacementRulesEnabled() { - h.rd.JSON(w, http.StatusPreconditionFailed, errPlacementDisabled.Error()) + manager, err := h.Handler.GetRuleManager() + if err == errs.ErrPlacementDisabled { + h.rd.JSON(w, http.StatusPreconditionFailed, err.Error()) + return + } + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } var rule placement.Rule if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &rule); err != nil { return } - oldRule := cluster.GetRuleManager().GetRule(rule.GroupID, rule.ID) + oldRule := manager.GetRule(rule.GroupID, rule.ID) if err := h.syncReplicateConfigWithDefaultRule(&rule); err != nil { h.rd.JSON(w, http.StatusBadRequest, err.Error()) return } - if err := cluster.GetRuleManager().SetKeyType(h.svr.GetConfig().PDServerCfg.KeyType). + if err := manager.SetKeyType(h.svr.GetConfig().PDServerCfg.KeyType). SetRule(&rule); err != nil { if errs.ErrRuleContent.Equal(err) || errs.ErrHexDecodingString.Equal(err) { h.rd.JSON(w, http.StatusBadRequest, err.Error()) @@ -255,6 +281,7 @@ func (h *ruleHandler) SetRule(w http.ResponseWriter, r *http.Request) { } return } + cluster := getCluster(r) cluster.AddSuspectKeyRange(rule.StartKey, rule.EndKey) if oldRule != nil { cluster.AddSuspectKeyRange(oldRule.StartKey, oldRule.EndKey) @@ -285,18 +312,23 @@ func (h *ruleHandler) syncReplicateConfigWithDefaultRule(rule *placement.Rule) e // @Failure 500 {string} string "PD server failed to proceed the request." 
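Editor's note: the JSON body that SetRule above decodes is just a serialized placement.Rule; the sketch below mirrors the literal used by the forwarding test later in this diff (values are illustrative and only a subset of the struct's fields is shown):

package main

import (
	"encoding/json"
	"fmt"

	"github.com/tikv/pd/pkg/schedule/placement"
)

func main() {
	rule := placement.Rule{
		GroupID:        "pd",
		ID:             "default",
		Role:           "voter",
		Count:          3,
		LocationLabels: []string{},
	}
	body, _ := json.Marshal(&rule)
	// POST the payload to /pd/api/v1/config/rule; note that rule writes stay on
	// the PD API server, only GET requests are forwarded to the scheduling service.
	fmt.Println(string(body))
}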
// @Router /config/rule/{group}/{id} [delete] func (h *ruleHandler) DeleteRuleByGroup(w http.ResponseWriter, r *http.Request) { - cluster := getCluster(r) - if !cluster.GetOpts().IsPlacementRulesEnabled() { - h.rd.JSON(w, http.StatusPreconditionFailed, errPlacementDisabled.Error()) + manager, err := h.Handler.GetRuleManager() + if err == errs.ErrPlacementDisabled { + h.rd.JSON(w, http.StatusPreconditionFailed, err.Error()) + return + } + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } group, id := mux.Vars(r)["group"], mux.Vars(r)["id"] - rule := cluster.GetRuleManager().GetRule(group, id) - if err := cluster.GetRuleManager().DeleteRule(group, id); err != nil { + rule := manager.GetRule(group, id) + if err := manager.DeleteRule(group, id); err != nil { h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } if rule != nil { + cluster := getCluster(r) cluster.AddSuspectKeyRange(rule.StartKey, rule.EndKey) } @@ -313,16 +345,20 @@ func (h *ruleHandler) DeleteRuleByGroup(w http.ResponseWriter, r *http.Request) // @Failure 500 {string} string "PD server failed to proceed the request." // @Router /config/rules/batch [post] func (h *ruleHandler) BatchRules(w http.ResponseWriter, r *http.Request) { - cluster := getCluster(r) - if !cluster.GetOpts().IsPlacementRulesEnabled() { - h.rd.JSON(w, http.StatusPreconditionFailed, errPlacementDisabled.Error()) + manager, err := h.Handler.GetRuleManager() + if err == errs.ErrPlacementDisabled { + h.rd.JSON(w, http.StatusPreconditionFailed, err.Error()) + return + } + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } var opts []placement.RuleOp if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &opts); err != nil { return } - if err := cluster.GetRuleManager().SetKeyType(h.svr.GetConfig().PDServerCfg.KeyType). + if err := manager.SetKeyType(h.svr.GetConfig().PDServerCfg.KeyType). Batch(opts); err != nil { if errs.ErrRuleContent.Equal(err) || errs.ErrHexDecodingString.Equal(err) { h.rd.JSON(w, http.StatusBadRequest, err.Error()) @@ -341,15 +377,20 @@ func (h *ruleHandler) BatchRules(w http.ResponseWriter, r *http.Request) { // @Success 200 {object} placement.RuleGroup // @Failure 404 {string} string "The RuleGroup does not exist." // @Failure 412 {string} string "Placement rules feature is disabled." +// @Failure 500 {string} string "PD server failed to proceed the request." // @Router /config/rule_group/{id} [get] func (h *ruleHandler) GetGroupConfig(w http.ResponseWriter, r *http.Request) { - cluster := getCluster(r) - if !cluster.GetOpts().IsPlacementRulesEnabled() { - h.rd.JSON(w, http.StatusPreconditionFailed, errPlacementDisabled.Error()) + manager, err := h.Handler.GetRuleManager() + if err == errs.ErrPlacementDisabled { + h.rd.JSON(w, http.StatusPreconditionFailed, err.Error()) + return + } + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } id := mux.Vars(r)["id"] - group := cluster.GetRuleManager().GetRuleGroup(id) + group := manager.GetRuleGroup(id) if group == nil { h.rd.JSON(w, http.StatusNotFound, nil) return @@ -368,21 +409,26 @@ func (h *ruleHandler) GetGroupConfig(w http.ResponseWriter, r *http.Request) { // @Failure 500 {string} string "PD server failed to proceed the request." 
// @Router /config/rule_group [post] func (h *ruleHandler) SetGroupConfig(w http.ResponseWriter, r *http.Request) { - cluster := getCluster(r) - if !cluster.GetOpts().IsPlacementRulesEnabled() { - h.rd.JSON(w, http.StatusPreconditionFailed, errPlacementDisabled.Error()) + manager, err := h.Handler.GetRuleManager() + if err == errs.ErrPlacementDisabled { + h.rd.JSON(w, http.StatusPreconditionFailed, err.Error()) + return + } + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } var ruleGroup placement.RuleGroup if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &ruleGroup); err != nil { return } - if err := cluster.GetRuleManager().SetRuleGroup(&ruleGroup); err != nil { + if err := manager.SetRuleGroup(&ruleGroup); err != nil { h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } - for _, r := range cluster.GetRuleManager().GetRulesByGroup(ruleGroup.ID) { - cluster.AddSuspectKeyRange(r.StartKey, r.EndKey) + cluster := getCluster(r) + for _, rule := range manager.GetRulesByGroup(ruleGroup.ID) { + cluster.AddSuspectKeyRange(rule.StartKey, rule.EndKey) } h.rd.JSON(w, http.StatusOK, "Update rule group successfully.") } @@ -396,18 +442,23 @@ func (h *ruleHandler) SetGroupConfig(w http.ResponseWriter, r *http.Request) { // @Failure 500 {string} string "PD server failed to proceed the request." // @Router /config/rule_group/{id} [delete] func (h *ruleHandler) DeleteGroupConfig(w http.ResponseWriter, r *http.Request) { - cluster := getCluster(r) - if !cluster.GetOpts().IsPlacementRulesEnabled() { - h.rd.JSON(w, http.StatusPreconditionFailed, errPlacementDisabled.Error()) + manager, err := h.Handler.GetRuleManager() + if err == errs.ErrPlacementDisabled { + h.rd.JSON(w, http.StatusPreconditionFailed, err.Error()) + return + } + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } id := mux.Vars(r)["id"] - err := cluster.GetRuleManager().DeleteRuleGroup(id) + err = manager.DeleteRuleGroup(id) if err != nil { h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } - for _, r := range cluster.GetRuleManager().GetRulesByGroup(id) { + cluster := getCluster(r) + for _, r := range manager.GetRulesByGroup(id) { cluster.AddSuspectKeyRange(r.StartKey, r.EndKey) } h.rd.JSON(w, http.StatusOK, "Delete rule group successfully.") @@ -418,14 +469,19 @@ func (h *ruleHandler) DeleteGroupConfig(w http.ResponseWriter, r *http.Request) // @Produce json // @Success 200 {array} placement.RuleGroup // @Failure 412 {string} string "Placement rules feature is disabled." +// @Failure 500 {string} string "PD server failed to proceed the request." // @Router /config/rule_groups [get] func (h *ruleHandler) GetAllGroupConfigs(w http.ResponseWriter, r *http.Request) { - cluster := getCluster(r) - if !cluster.GetOpts().IsPlacementRulesEnabled() { - h.rd.JSON(w, http.StatusPreconditionFailed, errPlacementDisabled.Error()) + manager, err := h.Handler.GetRuleManager() + if err == errs.ErrPlacementDisabled { + h.rd.JSON(w, http.StatusPreconditionFailed, err.Error()) return } - ruleGroups := cluster.GetRuleManager().GetRuleGroups() + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + ruleGroups := manager.GetRuleGroups() h.rd.JSON(w, http.StatusOK, ruleGroups) } @@ -434,14 +490,19 @@ func (h *ruleHandler) GetAllGroupConfigs(w http.ResponseWriter, r *http.Request) // @Produce json // @Success 200 {array} placement.GroupBundle // @Failure 412 {string} string "Placement rules feature is disabled." 
+// @Failure 500 {string} string "PD server failed to proceed the request." // @Router /config/placement-rule [get] func (h *ruleHandler) GetPlacementRules(w http.ResponseWriter, r *http.Request) { - cluster := getCluster(r) - if !cluster.GetOpts().IsPlacementRulesEnabled() { - h.rd.JSON(w, http.StatusPreconditionFailed, errPlacementDisabled.Error()) + manager, err := h.Handler.GetRuleManager() + if err == errs.ErrPlacementDisabled { + h.rd.JSON(w, http.StatusPreconditionFailed, err.Error()) + return + } + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } - bundles := cluster.GetRuleManager().GetAllGroupBundles() + bundles := manager.GetAllGroupBundles() h.rd.JSON(w, http.StatusOK, bundles) } @@ -455,9 +516,13 @@ func (h *ruleHandler) GetPlacementRules(w http.ResponseWriter, r *http.Request) // @Failure 500 {string} string "PD server failed to proceed the request." // @Router /config/placement-rule [post] func (h *ruleHandler) SetPlacementRules(w http.ResponseWriter, r *http.Request) { - cluster := getCluster(r) - if !cluster.GetOpts().IsPlacementRulesEnabled() { - h.rd.JSON(w, http.StatusPreconditionFailed, errPlacementDisabled.Error()) + manager, err := h.Handler.GetRuleManager() + if err == errs.ErrPlacementDisabled { + h.rd.JSON(w, http.StatusPreconditionFailed, err.Error()) + return + } + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } var groups []placement.GroupBundle @@ -465,7 +530,7 @@ func (h *ruleHandler) SetPlacementRules(w http.ResponseWriter, r *http.Request) return } _, partial := r.URL.Query()["partial"] - if err := cluster.GetRuleManager().SetKeyType(h.svr.GetConfig().PDServerCfg.KeyType). + if err := manager.SetKeyType(h.svr.GetConfig().PDServerCfg.KeyType). SetAllGroupBundles(groups, !partial); err != nil { if errs.ErrRuleContent.Equal(err) || errs.ErrHexDecodingString.Equal(err) { h.rd.JSON(w, http.StatusBadRequest, err.Error()) @@ -483,14 +548,20 @@ func (h *ruleHandler) SetPlacementRules(w http.ResponseWriter, r *http.Request) // @Produce json // @Success 200 {object} placement.GroupBundle // @Failure 412 {string} string "Placement rules feature is disabled." +// @Failure 500 {string} string "PD server failed to proceed the request." // @Router /config/placement-rule/{group} [get] func (h *ruleHandler) GetPlacementRuleByGroup(w http.ResponseWriter, r *http.Request) { - cluster := getCluster(r) - if !cluster.GetOpts().IsPlacementRulesEnabled() { - h.rd.JSON(w, http.StatusPreconditionFailed, errPlacementDisabled.Error()) + manager, err := h.Handler.GetRuleManager() + if err == errs.ErrPlacementDisabled { + h.rd.JSON(w, http.StatusPreconditionFailed, err.Error()) return } - group := cluster.GetRuleManager().GetGroupBundle(mux.Vars(r)["group"]) + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + g := mux.Vars(r)["group"] + group := manager.GetGroupBundle(g) h.rd.JSON(w, http.StatusOK, group) } @@ -502,21 +573,26 @@ func (h *ruleHandler) GetPlacementRuleByGroup(w http.ResponseWriter, r *http.Req // @Success 200 {string} string "Delete group and rules successfully." // @Failure 400 {string} string "Bad request." // @Failure 412 {string} string "Placement rules feature is disabled." +// @Failure 500 {string} string "PD server failed to proceed the request." 
// @Router /config/placement-rule [delete] func (h *ruleHandler) DeletePlacementRuleByGroup(w http.ResponseWriter, r *http.Request) { - cluster := getCluster(r) - if !cluster.GetOpts().IsPlacementRulesEnabled() { - h.rd.JSON(w, http.StatusPreconditionFailed, errPlacementDisabled.Error()) + manager, err := h.Handler.GetRuleManager() + if err == errs.ErrPlacementDisabled { + h.rd.JSON(w, http.StatusPreconditionFailed, err.Error()) + return + } + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } group := mux.Vars(r)["group"] - group, err := url.PathUnescape(group) + group, err = url.PathUnescape(group) if err != nil { h.rd.JSON(w, http.StatusBadRequest, err.Error()) return } _, regex := r.URL.Query()["regexp"] - if err := cluster.GetRuleManager().DeleteGroupBundle(group, regex); err != nil { + if err := manager.DeleteGroupBundle(group, regex); err != nil { h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } @@ -532,9 +608,13 @@ func (h *ruleHandler) DeletePlacementRuleByGroup(w http.ResponseWriter, r *http. // @Failure 500 {string} string "PD server failed to proceed the request." // @Router /config/placement-rule/{group} [post] func (h *ruleHandler) SetPlacementRuleByGroup(w http.ResponseWriter, r *http.Request) { - cluster := getCluster(r) - if !cluster.GetOpts().IsPlacementRulesEnabled() { - h.rd.JSON(w, http.StatusPreconditionFailed, errPlacementDisabled.Error()) + manager, err := h.Handler.GetRuleManager() + if err == errs.ErrPlacementDisabled { + h.rd.JSON(w, http.StatusPreconditionFailed, err.Error()) + return + } + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } groupID := mux.Vars(r)["group"] @@ -549,7 +629,7 @@ func (h *ruleHandler) SetPlacementRuleByGroup(w http.ResponseWriter, r *http.Req h.rd.JSON(w, http.StatusBadRequest, fmt.Sprintf("group id %s does not match request URI %s", group.ID, groupID)) return } - if err := cluster.GetRuleManager().SetKeyType(h.svr.GetConfig().PDServerCfg.KeyType). + if err := manager.SetKeyType(h.svr.GetConfig().PDServerCfg.KeyType). 
SetGroupBundle(group); err != nil { if errs.ErrRuleContent.Equal(err) || errs.ErrHexDecodingString.Equal(err) { h.rd.JSON(w, http.StatusBadRequest, err.Error()) diff --git a/server/api/rule_test.go b/server/api/rule_test.go index d2dc50f1119..434152cd74a 100644 --- a/server/api/rule_test.go +++ b/server/api/rule_test.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/schedule/placement" tu "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/server" @@ -961,7 +962,7 @@ func (suite *regionRuleTestSuite) TestRegionPlacementRule() { url = fmt.Sprintf("%s/config/rules/region/%s/detail", suite.urlPrefix, "id") err = tu.CheckGetJSON(testDialClient, url, nil, tu.Status(re, http.StatusBadRequest), tu.StringContain( - re, "invalid region id")) + re, errs.ErrRegionInvalidID.Error())) suite.NoError(err) suite.svr.GetRaftCluster().GetReplicationConfig().EnablePlacementRules = false diff --git a/server/api/server.go b/server/api/server.go index 19583bb9a12..d2b22e0d4a2 100644 --- a/server/api/server.go +++ b/server/api/server.go @@ -52,6 +52,11 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP // "/schedulers", http.MethodGet // "/schedulers/{name}", http.MethodPost // "/schedulers/diagnostic/{name}", http.MethodGet + // "/hotspot/regions/read", http.MethodGet + // "/hotspot/regions/write", http.MethodGet + // "/hotspot/regions/history", http.MethodGet + // "/hotspot/stores", http.MethodGet + // "/hotspot/buckets", http.MethodGet // Following requests are **not** redirected: // "/schedulers", http.MethodPost // "/schedulers/{name}", http.MethodDelete @@ -73,6 +78,36 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP scheapi.APIPathPrefix+"/checkers", mcs.SchedulingServiceName, []string{http.MethodPost, http.MethodGet}), + serverapi.MicroserviceRedirectRule( + prefix+"/hotspot", + scheapi.APIPathPrefix+"/hotspot", + mcs.SchedulingServiceName, + []string{http.MethodGet}), + serverapi.MicroserviceRedirectRule( + prefix+"/config/rules", + scheapi.APIPathPrefix+"/config/rules", + mcs.SchedulingServiceName, + []string{http.MethodGet}), + serverapi.MicroserviceRedirectRule( + prefix+"/config/rule", + scheapi.APIPathPrefix+"/config/rule", + mcs.SchedulingServiceName, + []string{http.MethodGet}), + serverapi.MicroserviceRedirectRule( + prefix+"/config/rule_group", + scheapi.APIPathPrefix+"/config/rule_groups", // Note: this is a typo in the original code + mcs.SchedulingServiceName, + []string{http.MethodGet}), + serverapi.MicroserviceRedirectRule( + prefix+"/config/rule_groups", + scheapi.APIPathPrefix+"/config/rule_groups", + mcs.SchedulingServiceName, + []string{http.MethodGet}), + serverapi.MicroserviceRedirectRule( + prefix+"/config/placement-rule", + scheapi.APIPathPrefix+"/config/placement-rule", + mcs.SchedulingServiceName, + []string{http.MethodGet}), // because the writing of all the meta information of the scheduling service is in the API server, // we should not post and delete the scheduler directly in the scheduling service. 
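Editor's note: the redirect rules added above forward read-only requests (hotspot and placement-rule GETs) from the PD API server to the scheduling microservice while keeping writes local. For intuition only, a toy sketch of prefix-based forwarding in the spirit of serverapi.MicroserviceRedirectRule; redirectRule and matchRule are hypothetical names, not the serverapi implementation:

package main

import (
	"fmt"
	"net/http"
	"strings"
)

type redirectRule struct {
	matchPath  string   // e.g. "/pd/api/v1/config/rules"
	targetPath string   // e.g. "/scheduling/api/v1/config/rules"
	methods    []string // e.g. only http.MethodGet is forwarded
}

// matchRule returns the rewritten target path for a request that hits a rule,
// keeping the suffix after the matched prefix (such as "/group/pd") intact.
func matchRule(rules []redirectRule, r *http.Request) (string, bool) {
	for _, rule := range rules {
		if !strings.HasPrefix(r.URL.Path, rule.matchPath) {
			continue
		}
		for _, m := range rule.methods {
			if r.Method == m {
				return rule.targetPath + strings.TrimPrefix(r.URL.Path, rule.matchPath), true
			}
		}
	}
	return "", false
}

func main() {
	rules := []redirectRule{{
		matchPath:  "/pd/api/v1/config/rules",
		targetPath: "/scheduling/api/v1/config/rules",
		methods:    []string{http.MethodGet},
	}}
	req, _ := http.NewRequest(http.MethodGet, "http://pd/pd/api/v1/config/rules/group/pd", nil)
	if target, ok := matchRule(rules, req); ok {
		fmt.Println(target) // /scheduling/api/v1/config/rules/group/pd
	}
}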
serverapi.MicroserviceRedirectRule( diff --git a/server/api/trend.go b/server/api/trend.go index 5dd82e79ec7..d75086d267d 100644 --- a/server/api/trend.go +++ b/server/api/trend.go @@ -19,6 +19,7 @@ import ( "time" "github.com/tikv/pd/pkg/statistics" + "github.com/tikv/pd/pkg/statistics/utils" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/typeutil" "github.com/tikv/pd/server" @@ -121,12 +122,13 @@ func (h *trendHandler) GetTrend(w http.ResponseWriter, r *http.Request) { } func (h *trendHandler) getTrendStores() ([]trendStore, error) { - var readStats, writeStats statistics.StoreHotPeersStat - if hotRead := h.GetHotReadRegions(); hotRead != nil { - readStats = hotRead.AsLeader + hotRead, err := h.GetHotRegions(utils.Read) + if err != nil { + return nil, err } - if hotWrite := h.GetHotWriteRegions(); hotWrite != nil { - writeStats = hotWrite.AsPeer + hotWrite, err := h.GetHotRegions(utils.Write) + if err != nil { + return nil, err } stores, err := h.GetStores() if err != nil { @@ -147,8 +149,8 @@ func (h *trendHandler) getTrendStores() ([]trendStore, error) { LastHeartbeatTS: info.Status.LastHeartbeatTS, Uptime: info.Status.Uptime, } - s.HotReadFlow, s.HotReadRegionFlows = h.getStoreFlow(readStats, store.GetID()) - s.HotWriteFlow, s.HotWriteRegionFlows = h.getStoreFlow(writeStats, store.GetID()) + s.HotReadFlow, s.HotReadRegionFlows = h.getStoreFlow(hotRead.AsLeader, store.GetID()) + s.HotWriteFlow, s.HotWriteRegionFlows = h.getStoreFlow(hotWrite.AsPeer, store.GetID()) trendStores = append(trendStores, s) } return trendStores, nil diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 094dd482107..7a0e99efa5f 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -2394,37 +2394,6 @@ func (c *RaftCluster) putRegion(region *core.RegionInfo) error { return nil } -// GetHotWriteRegions gets hot write regions' info. -func (c *RaftCluster) GetHotWriteRegions(storeIDs ...uint64) *statistics.StoreHotPeersInfos { - hotWriteRegions := c.coordinator.GetHotRegionsByType(utils.Write) - if len(storeIDs) > 0 && hotWriteRegions != nil { - hotWriteRegions = getHotRegionsByStoreIDs(hotWriteRegions, storeIDs...) - } - return hotWriteRegions -} - -// GetHotReadRegions gets hot read regions' info. -func (c *RaftCluster) GetHotReadRegions(storeIDs ...uint64) *statistics.StoreHotPeersInfos { - hotReadRegions := c.coordinator.GetHotRegionsByType(utils.Read) - if len(storeIDs) > 0 && hotReadRegions != nil { - hotReadRegions = getHotRegionsByStoreIDs(hotReadRegions, storeIDs...) 
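Editor's note: GetHotWriteRegions/GetHotReadRegions and their store filter are dropped from RaftCluster in this hunk, and trend.go now calls h.GetHotRegions(utils.Read) / h.GetHotRegions(utils.Write) instead. For reference, the store filtering that the removed getHotRegionsByStoreIDs helper performed, written as a standalone generic; filterStores is a hypothetical name, not part of this change:

// filterStores keeps only the requested store IDs; stores that are absent from
// the input map come back as the zero value, matching the removed helper.
func filterStores[V any](in map[uint64]V, storeIDs ...uint64) map[uint64]V {
	out := make(map[uint64]V, len(storeIDs))
	for _, id := range storeIDs {
		out[id] = in[id]
	}
	return out
}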
- } - return hotReadRegions -} - -func getHotRegionsByStoreIDs(hotPeerInfos *statistics.StoreHotPeersInfos, storeIDs ...uint64) *statistics.StoreHotPeersInfos { - asLeader := statistics.StoreHotPeersStat{} - asPeer := statistics.StoreHotPeersStat{} - for _, storeID := range storeIDs { - asLeader[storeID] = hotPeerInfos.AsLeader[storeID] - asPeer[storeID] = hotPeerInfos.AsPeer[storeID] - } - return &statistics.StoreHotPeersInfos{ - AsLeader: asLeader, - AsPeer: asPeer, - } -} - // GetStoreLimiter returns the dynamic adjusting limiter func (c *RaftCluster) GetStoreLimiter() *StoreLimiter { return c.limiter diff --git a/server/handler.go b/server/handler.go index 7fb74d5c715..dc4b43238d0 100644 --- a/server/handler.go +++ b/server/handler.go @@ -36,7 +36,7 @@ import ( "github.com/tikv/pd/pkg/schedule/handler" "github.com/tikv/pd/pkg/schedule/schedulers" "github.com/tikv/pd/pkg/statistics" - "github.com/tikv/pd/pkg/statistics/buckets" + "github.com/tikv/pd/pkg/statistics/utils" "github.com/tikv/pd/pkg/storage" "github.com/tikv/pd/pkg/tso" "github.com/tikv/pd/pkg/utils/apiutil" @@ -54,10 +54,14 @@ type server struct { } func (s *server) GetCoordinator() *schedule.Coordinator { - return s.GetRaftCluster().GetCoordinator() + c := s.GetRaftCluster() + if c == nil { + return nil + } + return c.GetCoordinator() } -func (s *server) GetCluster() sche.SharedCluster { +func (s *server) GetCluster() sche.SchedulerCluster { return s.GetRaftCluster() } @@ -106,53 +110,6 @@ func (h *Handler) GetScheduleConfig() *sc.ScheduleConfig { return h.s.GetScheduleConfig() } -// GetStores returns all stores in the cluster. -func (h *Handler) GetStores() ([]*core.StoreInfo, error) { - rc := h.s.GetRaftCluster() - if rc == nil { - return nil, errs.ErrNotBootstrapped.GenWithStackByArgs() - } - storeMetas := rc.GetMetaStores() - stores := make([]*core.StoreInfo, 0, len(storeMetas)) - for _, s := range storeMetas { - storeID := s.GetId() - store := rc.GetStore(storeID) - if store == nil { - return nil, errs.ErrStoreNotFound.FastGenByArgs(storeID) - } - stores = append(stores, store) - } - return stores, nil -} - -// GetHotWriteRegions gets all hot write regions stats. -func (h *Handler) GetHotWriteRegions() *statistics.StoreHotPeersInfos { - c, err := h.GetRaftCluster() - if err != nil { - return nil - } - return c.GetHotWriteRegions() -} - -// GetHotBuckets returns all hot buckets stats. -func (h *Handler) GetHotBuckets(regionIDs ...uint64) map[uint64][]*buckets.BucketStat { - c, err := h.GetRaftCluster() - if err != nil { - return nil - } - degree := c.GetOpts().GetHotRegionCacheHitsThreshold() - return c.BucketsStats(degree, regionIDs...) -} - -// GetHotReadRegions gets all hot read regions stats. -func (h *Handler) GetHotReadRegions() *statistics.StoreHotPeersInfos { - c, err := h.GetRaftCluster() - if err != nil { - return nil - } - return c.GetHotReadRegions() -} - // GetHotRegionsWriteInterval gets interval for PD to store Hot Region information.. func (h *Handler) GetHotRegionsWriteInterval() time.Duration { return h.opt.GetHotRegionsWriteInterval() @@ -163,13 +120,68 @@ func (h *Handler) GetHotRegionsReservedDays() uint64 { return h.opt.GetHotRegionsReservedDays() } -// GetStoresLoads gets all hot write stores stats. -func (h *Handler) GetStoresLoads() map[uint64][]float64 { - rc := h.s.GetRaftCluster() - if rc == nil { - return nil +// HistoryHotRegionsRequest wrap request condition from tidb. 
+// it is request from tidb +type HistoryHotRegionsRequest struct { + StartTime int64 `json:"start_time,omitempty"` + EndTime int64 `json:"end_time,omitempty"` + RegionIDs []uint64 `json:"region_ids,omitempty"` + StoreIDs []uint64 `json:"store_ids,omitempty"` + PeerIDs []uint64 `json:"peer_ids,omitempty"` + IsLearners []bool `json:"is_learners,omitempty"` + IsLeaders []bool `json:"is_leaders,omitempty"` + HotRegionTypes []string `json:"hot_region_type,omitempty"` +} + +// GetAllRequestHistoryHotRegion gets all hot region info in HistoryHotRegion form. +func (h *Handler) GetAllRequestHistoryHotRegion(request *HistoryHotRegionsRequest) (*storage.HistoryHotRegions, error) { + var hotRegionTypes = storage.HotRegionTypes + if len(request.HotRegionTypes) != 0 { + hotRegionTypes = request.HotRegionTypes + } + iter := h.GetHistoryHotRegionIter(hotRegionTypes, request.StartTime, request.EndTime) + var results []*storage.HistoryHotRegion + regionSet, storeSet, peerSet, learnerSet, leaderSet := + make(map[uint64]bool), make(map[uint64]bool), + make(map[uint64]bool), make(map[bool]bool), make(map[bool]bool) + for _, id := range request.RegionIDs { + regionSet[id] = true + } + for _, id := range request.StoreIDs { + storeSet[id] = true + } + for _, id := range request.PeerIDs { + peerSet[id] = true + } + for _, isLearner := range request.IsLearners { + learnerSet[isLearner] = true + } + for _, isLeader := range request.IsLeaders { + leaderSet[isLeader] = true + } + var next *storage.HistoryHotRegion + var err error + for next, err = iter.Next(); next != nil && err == nil; next, err = iter.Next() { + if len(regionSet) != 0 && !regionSet[next.RegionID] { + continue + } + if len(storeSet) != 0 && !storeSet[next.StoreID] { + continue + } + if len(peerSet) != 0 && !peerSet[next.PeerID] { + continue + } + if !learnerSet[next.IsLearner] { + continue + } + if !leaderSet[next.IsLeader] { + continue + } + results = append(results, next) } - return rc.GetStoresLoads() + return &storage.HistoryHotRegions{ + HistoryHotRegion: results, + }, err } // AddScheduler adds a scheduler. @@ -492,24 +504,14 @@ func (h *Handler) IsLeader() bool { return h.s.member.IsLeader() } -// PackHistoryHotReadRegions get read hot region info in HistoryHotRegion form. -func (h *Handler) PackHistoryHotReadRegions() ([]storage.HistoryHotRegion, error) { - hotReadRegions := h.GetHotReadRegions() - if hotReadRegions == nil { - return nil, nil - } - hotReadPeerRegions := hotReadRegions.AsPeer - return h.packHotRegions(hotReadPeerRegions, storage.ReadType.String()) -} - -// PackHistoryHotWriteRegions get write hot region info in HistoryHotRegion from -func (h *Handler) PackHistoryHotWriteRegions() ([]storage.HistoryHotRegion, error) { - hotWriteRegions := h.GetHotWriteRegions() - if hotWriteRegions == nil { - return nil, nil +// GetHistoryHotRegions get hot region info in HistoryHotRegion form. 
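Editor's note: since HistoryHotRegionsRequest now lives in package server, its wire format is worth spelling out — the omitempty tags mean unset filters are simply left out of the JSON. A small sketch with illustrative values (times are Unix milliseconds, matching the tests in this diff):

package main

import (
	"encoding/json"
	"fmt"

	"github.com/tikv/pd/server"
)

func main() {
	req := server.HistoryHotRegionsRequest{
		StartTime: 1694500000000, // Unix milliseconds, illustrative
		EndTime:   1694503600000,
		RegionIDs: []uint64{1},
		StoreIDs:  []uint64{1},
		// HotRegionTypes left empty: GetAllRequestHistoryHotRegion falls back
		// to storage.HotRegionTypes.
	}
	body, _ := json.Marshal(&req)
	fmt.Println(string(body))
	// {"start_time":1694500000000,"end_time":1694503600000,"region_ids":[1],"store_ids":[1]}
}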
+func (h *Handler) GetHistoryHotRegions(typ utils.RWType) ([]storage.HistoryHotRegion, error) { + hotRegions, err := h.GetHotRegions(typ) + if hotRegions == nil || err != nil { + return nil, err } - hotWritePeerRegions := hotWriteRegions.AsPeer - return h.packHotRegions(hotWritePeerRegions, storage.WriteType.String()) + hotPeers := hotRegions.AsPeer + return h.packHotRegions(hotPeers, typ.String()) } func (h *Handler) packHotRegions(hotPeersStat statistics.StoreHotPeersStat, hotRegionType string) (historyHotRegions []storage.HistoryHotRegion, err error) { diff --git a/server/server.go b/server/server.go index c15e0156db0..160609e37a7 100644 --- a/server/server.go +++ b/server/server.go @@ -489,10 +489,12 @@ func (s *Server) startServer(ctx context.Context) error { s.safePointV2Manager = gc.NewSafePointManagerV2(s.ctx, s.storage, s.storage, s.storage) s.hbStreams = hbstream.NewHeartbeatStreams(ctx, s.clusterID, "", s.cluster) // initial hot_region_storage in here. - s.hotRegionStorage, err = storage.NewHotRegionsStorage( - ctx, filepath.Join(s.cfg.DataDir, "hot-region"), s.encryptionKeyManager, s.handler) - if err != nil { - return err + if !s.IsAPIServiceMode() { + s.hotRegionStorage, err = storage.NewHotRegionsStorage( + ctx, filepath.Join(s.cfg.DataDir, "hot-region"), s.encryptionKeyManager, s.handler) + if err != nil { + return err + } } // Run callbacks log.Info("triggering the start callback functions") @@ -551,8 +553,10 @@ func (s *Server) Close() { log.Error("close storage meet error", errs.ZapError(err)) } - if err := s.hotRegionStorage.Close(); err != nil { - log.Error("close hot region storage meet error", errs.ZapError(err)) + if s.hotRegionStorage != nil { + if err := s.hotRegionStorage.Close(); err != nil { + log.Error("close hot region storage meet error", errs.ZapError(err)) + } } // Run callbacks diff --git a/tests/integrations/mcs/scheduling/api_test.go b/tests/integrations/mcs/scheduling/api_test.go index 1e5f9baf1bd..140fc9d3ebe 100644 --- a/tests/integrations/mcs/scheduling/api_test.go +++ b/tests/integrations/mcs/scheduling/api_test.go @@ -10,9 +10,13 @@ import ( "github.com/pingcap/failpoint" "github.com/stretchr/testify/suite" + "github.com/tikv/pd/pkg/core" _ "github.com/tikv/pd/pkg/mcs/scheduling/server/apis/v1" + "github.com/tikv/pd/pkg/schedule/handler" + "github.com/tikv/pd/pkg/schedule/placement" + "github.com/tikv/pd/pkg/statistics" + "github.com/tikv/pd/pkg/storage" "github.com/tikv/pd/pkg/utils/apiutil" - "github.com/tikv/pd/pkg/utils/tempurl" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/tests" ) @@ -37,7 +41,7 @@ func TestAPI(t *testing.T) { suite.Run(t, &apiTestSuite{}) } -func (suite *apiTestSuite) SetupSuite() { +func (suite *apiTestSuite) SetupTest() { ctx, cancel := context.WithCancel(context.Background()) suite.ctx = ctx cluster, err := tests.NewTestAPICluster(suite.ctx, 1) @@ -56,14 +60,19 @@ func (suite *apiTestSuite) SetupSuite() { suite.cleanupFunc = func() { cancel() } + tc, err := tests.NewTestSchedulingCluster(suite.ctx, 2, suite.backendEndpoints) + suite.NoError(err) + suite.cluster.SetSchedulingCluster(tc) + tc.WaitForPrimaryServing(suite.Require()) } -func (suite *apiTestSuite) TearDownSuite() { +func (suite *apiTestSuite) TearDownTest() { suite.cluster.Destroy() suite.cleanupFunc() } func (suite *apiTestSuite) TestGetCheckerByName() { + re := suite.Require() testCases := []struct { name string }{ @@ -75,14 +84,8 @@ func (suite *apiTestSuite) TestGetCheckerByName() { {name: "joint-state"}, } - re := suite.Require() - s, 
cleanup := tests.StartSingleSchedulingTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc()) - defer cleanup() - testutil.Eventually(re, func() bool { - return s.IsServing() - }, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond)) - addr := s.GetAddr() - urlPrefix := fmt.Sprintf("%s/scheduling/api/v1/checkers", addr) + s := suite.cluster.GetSchedulingPrimaryServer() + urlPrefix := fmt.Sprintf("%s/scheduling/api/v1/checkers", s.GetAddr()) co := s.GetCoordinator() for _, testCase := range testCases { @@ -117,17 +120,12 @@ func (suite *apiTestSuite) TestAPIForward() { re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/apiutil/serverapi/checkHeader")) }() - tc, err := tests.NewTestSchedulingCluster(suite.ctx, 2, suite.backendEndpoints) - re.NoError(err) - defer tc.Destroy() - tc.WaitForPrimaryServing(re) - urlPrefix := fmt.Sprintf("%s/pd/api/v1", suite.backendEndpoints) var slice []string var resp map[string]interface{} // Test opeartor - err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "operators"), &slice, + err := testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "operators"), &slice, testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) re.NoError(err) re.Len(slice, 0) @@ -148,7 +146,7 @@ func (suite *apiTestSuite) TestAPIForward() { testutil.StatusNotOK(re), testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) re.NoError(err) - // Test checker: + // Test checker err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "checker/merge"), &resp, testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) re.NoError(err) @@ -193,4 +191,99 @@ func (suite *apiTestSuite) TestAPIForward() { err = testutil.CheckDelete(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "schedulers/balance-leader-scheduler"), testutil.WithoutHeader(re, apiutil.ForwardToMicroServiceHeader)) re.NoError(err) + + // Test hotspot + var hotRegions statistics.StoreHotPeersInfos + err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "hotspot/regions/write"), &hotRegions, + testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + re.NoError(err) + err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "hotspot/regions/read"), &hotRegions, + testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + re.NoError(err) + var stores handler.HotStoreStats + err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "hotspot/stores"), &stores, + testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + re.NoError(err) + var buckets handler.HotBucketsResponse + err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "hotspot/buckets"), &buckets, + testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + re.NoError(err) + var history storage.HistoryHotRegions + err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "hotspot/regions/history"), &history, + testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + re.NoError(err) + + // Test rules: only forward `GET` request + var rules []*placement.Rule + tests.MustPutRegion(re, suite.cluster, 2, 1, []byte("a"), []byte("b"), core.SetApproximateSize(60)) + rules = []*placement.Rule{ + { + GroupID: "pd", + ID: "default", + Role: "voter", + Count: 3, + LocationLabels: []string{}, + }, + } + rulesArgs, err := json.Marshal(rules) + 
suite.NoError(err) + + err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "/config/rules"), &rules, + testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + re.NoError(err) + err = testutil.CheckPostJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/rules"), rulesArgs, + testutil.WithoutHeader(re, apiutil.ForwardToMicroServiceHeader)) + re.NoError(err) + err = testutil.CheckPostJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/rules/batch"), rulesArgs, + testutil.WithoutHeader(re, apiutil.ForwardToMicroServiceHeader)) + re.NoError(err) + err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/rules/group/pd"), &rules, + testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + re.NoError(err) + err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/rules/region/2"), &rules, + testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + re.NoError(err) + var fit placement.RegionFit + err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/rules/region/2/detail"), &fit, + testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + re.NoError(err) + err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/rules/key/0000000000000001"), &rules, + testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + re.NoError(err) + err = testutil.CheckGetJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/rule/pd/2"), nil, + testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + re.NoError(err) + err = testutil.CheckDelete(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/rule/pd/2"), + testutil.WithoutHeader(re, apiutil.ForwardToMicroServiceHeader)) + re.NoError(err) + err = testutil.CheckPostJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/rule"), rulesArgs, + testutil.WithoutHeader(re, apiutil.ForwardToMicroServiceHeader)) + re.NoError(err) + err = testutil.CheckGetJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/rule_group/pd"), nil, + testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + re.NoError(err) + err = testutil.CheckDelete(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/rule_group/pd"), + testutil.WithoutHeader(re, apiutil.ForwardToMicroServiceHeader)) + re.NoError(err) + err = testutil.CheckPostJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/rule_group"), rulesArgs, + testutil.WithoutHeader(re, apiutil.ForwardToMicroServiceHeader)) + re.NoError(err) + err = testutil.CheckGetJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/rule_groups"), nil, + testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + re.NoError(err) + err = testutil.CheckGetJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/placement-rule"), nil, + testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + re.NoError(err) + err = testutil.CheckPostJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/placement-rule"), rulesArgs, + testutil.WithoutHeader(re, apiutil.ForwardToMicroServiceHeader)) + re.NoError(err) + err = testutil.CheckGetJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/placement-rule/pd"), nil, + testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + re.NoError(err) + err = testutil.CheckDelete(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/placement-rule/pd"), + 
testutil.WithoutHeader(re, apiutil.ForwardToMicroServiceHeader)) + re.NoError(err) + err = testutil.CheckPostJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/placement-rule/pd"), rulesArgs, + testutil.WithoutHeader(re, apiutil.ForwardToMicroServiceHeader)) + re.NoError(err) } diff --git a/tests/pdctl/hot/hot_test.go b/tests/pdctl/hot/hot_test.go index 359d89199c9..ac9bb3d83bf 100644 --- a/tests/pdctl/hot/hot_test.go +++ b/tests/pdctl/hot/hot_test.go @@ -25,29 +25,52 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/schedule/handler" "github.com/tikv/pd/pkg/statistics" "github.com/tikv/pd/pkg/statistics/utils" "github.com/tikv/pd/pkg/storage" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/pkg/utils/typeutil" - "github.com/tikv/pd/server/api" + "github.com/tikv/pd/server" "github.com/tikv/pd/server/config" "github.com/tikv/pd/tests" "github.com/tikv/pd/tests/pdctl" pdctlCmd "github.com/tikv/pd/tools/pd-ctl/pdctl" ) -func TestHot(t *testing.T) { - re := require.New(t) +type hotTestSuite struct { + suite.Suite +} + +func TestHotTestSuite(t *testing.T) { + suite.Run(t, new(hotTestSuite)) +} + +func (suite *hotTestSuite) TestHot() { + var start time.Time + start = start.Add(time.Hour) + opts := []tests.ConfigOption{ + func(conf *config.Config, serverName string) { + conf.Schedule.MaxStoreDownTime.Duration = time.Since(start) + }, + } + env := tests.NewSchedulingTestEnvironment(suite.T(), opts...) + env.RunTestInTwoModes(suite.checkHot) + + opts = append(opts, func(conf *config.Config, serverName string) { + conf.Schedule.HotRegionCacheHitsThreshold = 0 + }) + env = tests.NewSchedulingTestEnvironment(suite.T(), opts...) + env.RunTestInTwoModes(suite.checkHotWithoutHotPeer) + env = tests.NewSchedulingTestEnvironment(suite.T(), opts...) 
+ env.RunTestInTwoModes(suite.checkHotWithStoreID) +} + +func (suite *hotTestSuite) checkHot(cluster *tests.TestCluster) { + re := suite.Require() statistics.Denoising = false - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - cluster, err := tests.NewTestCluster(ctx, 1) - re.NoError(err) - err = cluster.RunInitialServers() - re.NoError(err) - cluster.WaitLeader() pdAddr := cluster.GetConfig().GetClientURL() cmd := pdctlCmd.GetRootCmd() @@ -63,13 +86,11 @@ func TestHot(t *testing.T) { Labels: []*metapb.StoreLabel{{Key: "engine", Value: "tiflash"}}, } - leaderServer := cluster.GetLeaderServer() - re.NoError(leaderServer.BootstrapCluster()) tests.MustPutStore(re, cluster, store1) tests.MustPutStore(re, cluster, store2) - defer cluster.Destroy() // test hot store + leaderServer := cluster.GetLeaderServer() ss := leaderServer.GetStore(1) now := time.Now().Unix() @@ -82,22 +103,34 @@ func TestHot(t *testing.T) { newStats.BytesRead = bytesRead newStats.KeysWritten = keysWritten newStats.KeysRead = keysRead + rc := leaderServer.GetRaftCluster() + stats := rc.GetStoresStats() + hotStat := rc.GetHotStat() + getHotPeerStat := rc.GetHotPeerStat + if sche := cluster.GetSchedulingPrimaryServer(); sche != nil { + stats = sche.GetCluster().GetStoresStats() + hotStat = sche.GetCluster().GetHotStat() + getHotPeerStat = sche.GetCluster().GetHotPeerStat + } + for i := utils.DefaultWriteMfSize; i > 0; i-- { start := uint64(now - utils.StoreHeartBeatReportInterval*int64(i)) end := start + utils.StoreHeartBeatReportInterval newStats.Interval = &pdpb.TimeInterval{StartTimestamp: start, EndTimestamp: end} - rc.GetStoresStats().Observe(ss.GetID(), newStats) + stats.Observe(ss.GetID(), newStats) } for i := statistics.RegionsStatsRollingWindowsSize; i > 0; i-- { - rc.GetStoresStats().ObserveRegionsStats([]uint64{2}, []float64{float64(bytesWritten)}, []float64{float64(keysWritten)}) + stats.ObserveRegionsStats([]uint64{2}, + []float64{float64(bytesWritten)}, + []float64{float64(keysWritten)}) } args := []string{"-u", pdAddr, "hot", "store"} output, err := pdctl.ExecuteCommand(cmd, args...) 
re.NoError(err) - hotStores := api.HotStoreStats{} + hotStores := handler.HotStoreStats{} re.NoError(json.Unmarshal(output, &hotStores)) re.Equal(float64(bytesWritten)/utils.StoreHeartBeatReportInterval, hotStores.BytesWriteStats[1]) re.Equal(float64(bytesRead)/utils.StoreHeartBeatReportInterval, hotStores.BytesReadStats[1]) @@ -149,9 +182,9 @@ func TestHot(t *testing.T) { region := core.NewRegionInfo(&metapb.Region{ Id: hotRegionID, }, leader) - rc.GetHotStat().CheckReadAsync(statistics.NewCheckPeerTask(peerInfo, region)) + hotStat.CheckReadAsync(statistics.NewCheckPeerTask(peerInfo, region)) testutil.Eventually(re, func() bool { - hotPeerStat := rc.GetHotPeerStat(utils.Read, hotRegionID, hotStoreID) + hotPeerStat := getHotPeerStat(utils.Read, hotRegionID, hotStoreID) return hotPeerStat != nil }) if reportInterval >= utils.StoreHeartBeatReportInterval { @@ -165,7 +198,7 @@ func TestHot(t *testing.T) { []byte("c"), []byte("d"), core.SetWrittenBytes(1000000000*reportInterval), core.SetReportInterval(0, reportInterval)) testutil.Eventually(re, func() bool { - hotPeerStat := rc.GetHotPeerStat(utils.Write, hotRegionID, hotStoreID) + hotPeerStat := getHotPeerStat(utils.Write, hotRegionID, hotStoreID) return hotPeerStat != nil }) if reportInterval >= utils.RegionHeartBeatReportInterval { @@ -196,18 +229,12 @@ func TestHot(t *testing.T) { testCommand(reportIntervals, "read") } -func TestHotWithStoreID(t *testing.T) { - re := require.New(t) +func (suite *hotTestSuite) checkHotWithStoreID(cluster *tests.TestCluster) { + re := suite.Require() statistics.Denoising = false - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - cluster, err := tests.NewTestCluster(ctx, 1, func(cfg *config.Config, serverName string) { cfg.Schedule.HotRegionCacheHitsThreshold = 0 }) - re.NoError(err) - err = cluster.RunInitialServers() - re.NoError(err) - cluster.WaitLeader() pdAddr := cluster.GetConfig().GetClientURL() cmd := pdctlCmd.GetRootCmd() + leaderServer := cluster.GetLeaderServer() stores := []*metapb.Store{ { @@ -222,22 +249,38 @@ func TestHotWithStoreID(t *testing.T) { }, } - leaderServer := cluster.GetLeaderServer() - re.NoError(leaderServer.BootstrapCluster()) for _, store := range stores { tests.MustPutStore(re, cluster, store) } - defer cluster.Destroy() + s := &server.GrpcServer{Server: leaderServer.GetServer()} + for _, store := range stores { + resp1, err := s.StoreHeartbeat( + context.Background(), &pdpb.StoreHeartbeatRequest{ + Header: &pdpb.RequestHeader{ClusterId: leaderServer.GetClusterID()}, + Stats: &pdpb.StoreStats{ + StoreId: store.Id, + Capacity: 1000 * units.MiB, + Available: 1000 * units.MiB, + }, + }, + ) + re.NoError(err) + re.Empty(resp1.GetHeader().GetError()) + } tests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"), core.SetWrittenBytes(3000000000), core.SetReportInterval(0, utils.RegionHeartBeatReportInterval)) tests.MustPutRegion(re, cluster, 2, 2, []byte("c"), []byte("d"), core.SetWrittenBytes(6000000000), core.SetReportInterval(0, utils.RegionHeartBeatReportInterval)) tests.MustPutRegion(re, cluster, 3, 1, []byte("e"), []byte("f"), core.SetWrittenBytes(9000000000), core.SetReportInterval(0, utils.RegionHeartBeatReportInterval)) - // wait hot scheduler starts - rc := leaderServer.GetRaftCluster() + + getHotPeerStat := leaderServer.GetRaftCluster().GetHotPeerStat + if sche := cluster.GetSchedulingPrimaryServer(); sche != nil { + getHotPeerStat = sche.GetCluster().GetHotPeerStat + } + testutil.Eventually(re, func() bool { - return 
rc.GetHotPeerStat(utils.Write, 1, 1) != nil && - rc.GetHotPeerStat(utils.Write, 2, 2) != nil && - rc.GetHotPeerStat(utils.Write, 3, 1) != nil + return getHotPeerStat(utils.Write, 1, 1) != nil && + getHotPeerStat(utils.Write, 2, 2) != nil && + getHotPeerStat(utils.Write, 3, 1) != nil }) args := []string{"-u", pdAddr, "hot", "write", "1"} output, err := pdctl.ExecuteCommand(cmd, args...) @@ -247,53 +290,90 @@ func TestHotWithStoreID(t *testing.T) { re.Len(hotRegion.AsLeader, 1) re.Equal(2, hotRegion.AsLeader[1].Count) re.Equal(float64(200000000), hotRegion.AsLeader[1].TotalBytesRate) +} - args = []string{"-u", pdAddr, "hot", "write", "1", "2"} - output, err = pdctl.ExecuteCommand(cmd, args...) - re.NoError(err) - hotRegion = statistics.StoreHotPeersInfos{} - re.NoError(json.Unmarshal(output, &hotRegion)) - re.Len(hotRegion.AsLeader, 2) - re.Equal(2, hotRegion.AsLeader[1].Count) - re.Equal(1, hotRegion.AsLeader[2].Count) - re.Equal(float64(200000000), hotRegion.AsLeader[1].TotalBytesRate) - re.Equal(float64(100000000), hotRegion.AsLeader[2].TotalBytesRate) +func (suite *hotTestSuite) checkHotWithoutHotPeer(cluster *tests.TestCluster) { + re := suite.Require() + statistics.Denoising = false - stats := &metapb.BucketStats{ - ReadBytes: []uint64{10 * units.MiB}, - ReadKeys: []uint64{11 * units.MiB}, - ReadQps: []uint64{0}, - WriteKeys: []uint64{12 * units.MiB}, - WriteBytes: []uint64{13 * units.MiB}, - WriteQps: []uint64{0}, + pdAddr := cluster.GetConfig().GetClientURL() + cmd := pdctlCmd.GetRootCmd() + + stores := []*metapb.Store{ + { + Id: 1, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 2, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, } - buckets := tests.MustReportBuckets(re, cluster, 1, []byte("a"), []byte("b"), stats) - args = []string{"-u", pdAddr, "hot", "buckets", "1"} - output, err = pdctl.ExecuteCommand(cmd, args...) - re.NoError(err) - hotBuckets := api.HotBucketsResponse{} - re.NoError(json.Unmarshal(output, &hotBuckets)) - re.Len(hotBuckets, 1) - re.Len(hotBuckets[1], 1) - item := hotBuckets[1][0] - re.Equal(core.HexRegionKeyStr(buckets.GetKeys()[0]), item.StartKey) - re.Equal(core.HexRegionKeyStr(buckets.GetKeys()[1]), item.EndKey) - re.Equal(1, item.HotDegree) - interval := buckets.GetPeriodInMs() / 1000 - re.Equal(buckets.GetStats().ReadBytes[0]/interval, item.ReadBytes) - re.Equal(buckets.GetStats().ReadKeys[0]/interval, item.ReadKeys) - re.Equal(buckets.GetStats().WriteBytes[0]/interval, item.WriteBytes) - re.Equal(buckets.GetStats().WriteKeys[0]/interval, item.WriteKeys) - args = []string{"-u", pdAddr, "hot", "buckets", "2"} - output, err = pdctl.ExecuteCommand(cmd, args...) 
- re.NoError(err) - hotBuckets = api.HotBucketsResponse{} - re.NoError(json.Unmarshal(output, &hotBuckets)) - re.Nil(hotBuckets[2]) + leaderServer := cluster.GetLeaderServer() + for _, store := range stores { + tests.MustPutStore(re, cluster, store) + } + timestamp := uint64(time.Now().UnixNano()) + load := 1024.0 + s := &server.GrpcServer{Server: leaderServer.GetServer()} + for _, store := range stores { + for i := 0; i < 5; i++ { + resp1, err := s.StoreHeartbeat( + context.Background(), &pdpb.StoreHeartbeatRequest{ + Header: &pdpb.RequestHeader{ClusterId: leaderServer.GetClusterID()}, + Stats: &pdpb.StoreStats{ + StoreId: store.Id, + BytesRead: uint64(load * utils.StoreHeartBeatReportInterval), + KeysRead: uint64(load * utils.StoreHeartBeatReportInterval), + BytesWritten: uint64(load * utils.StoreHeartBeatReportInterval), + KeysWritten: uint64(load * utils.StoreHeartBeatReportInterval), + Capacity: 1000 * units.MiB, + Available: 1000 * units.MiB, + Interval: &pdpb.TimeInterval{ + StartTimestamp: timestamp + uint64(i*utils.StoreHeartBeatReportInterval), + EndTimestamp: timestamp + uint64((i+1)*utils.StoreHeartBeatReportInterval)}, + }, + }, + ) + re.NoError(err) + re.Empty(resp1.GetHeader().GetError()) + } + } + + { + args := []string{"-u", pdAddr, "hot", "read"} + output, err := pdctl.ExecuteCommand(cmd, args...) + hotRegion := statistics.StoreHotPeersInfos{} + re.NoError(err) + re.NoError(json.Unmarshal(output, &hotRegion)) + re.Equal(hotRegion.AsPeer[1].Count, 0) + re.Equal(0.0, hotRegion.AsPeer[1].TotalBytesRate) + re.Equal(load, hotRegion.AsPeer[1].StoreByteRate) + re.Equal(hotRegion.AsLeader[1].Count, 0) + re.Equal(0.0, hotRegion.AsLeader[1].TotalBytesRate) + re.Equal(load, hotRegion.AsLeader[1].StoreByteRate) + } + { + args := []string{"-u", pdAddr, "hot", "write"} + output, err := pdctl.ExecuteCommand(cmd, args...) + hotRegion := statistics.StoreHotPeersInfos{} + re.NoError(err) + re.NoError(json.Unmarshal(output, &hotRegion)) + re.Equal(hotRegion.AsPeer[1].Count, 0) + re.Equal(0.0, hotRegion.AsPeer[1].TotalBytesRate) + re.Equal(load, hotRegion.AsPeer[1].StoreByteRate) + re.Equal(hotRegion.AsLeader[1].Count, 0) + re.Equal(0.0, hotRegion.AsLeader[1].TotalBytesRate) + re.Equal(0.0, hotRegion.AsLeader[1].StoreByteRate) // write leader sum + } } func TestHistoryHotRegions(t *testing.T) { + // TODO: support history hotspot in scheduling server with stateless in the future. + // Ref: https://github.com/tikv/pd/pull/7183 re := require.New(t) statistics.Denoising = false ctx, cancel := context.WithCancel(context.Background()) @@ -348,7 +428,7 @@ func TestHistoryHotRegions(t *testing.T) { // wait hot scheduler starts testutil.Eventually(re, func() bool { hotRegionStorage := leaderServer.GetServer().GetHistoryHotRegionStorage() - iter := hotRegionStorage.NewIterator([]string{storage.WriteType.String()}, startTime*1000, time.Now().UnixNano()/int64(time.Millisecond)) + iter := hotRegionStorage.NewIterator([]string{utils.Write.String()}, startTime*1000, time.Now().UnixNano()/int64(time.Millisecond)) next, err := iter.Next() return err == nil && next != nil }) @@ -414,7 +494,8 @@ func TestHistoryHotRegions(t *testing.T) { re.Error(json.Unmarshal(output, &hotRegions)) } -func TestHotWithoutHotPeer(t *testing.T) { +func TestBuckets(t *testing.T) { + // TODO: support forward bucket request in scheduling server in the future. 
re := require.New(t) statistics.Denoising = false ctx, cancel := context.WithCancel(context.Background()) @@ -445,53 +526,42 @@ func TestHotWithoutHotPeer(t *testing.T) { for _, store := range stores { tests.MustPutStore(re, cluster, store) } - timestamp := uint64(time.Now().UnixNano()) - load := 1024.0 - for _, store := range stores { - for i := 0; i < 5; i++ { - err := leaderServer.GetServer().GetRaftCluster().HandleStoreHeartbeat(&pdpb.StoreHeartbeatRequest{ - Stats: &pdpb.StoreStats{ - StoreId: store.Id, - BytesRead: uint64(load * utils.StoreHeartBeatReportInterval), - KeysRead: uint64(load * utils.StoreHeartBeatReportInterval), - BytesWritten: uint64(load * utils.StoreHeartBeatReportInterval), - KeysWritten: uint64(load * utils.StoreHeartBeatReportInterval), - Capacity: 1000 * units.MiB, - Available: 1000 * units.MiB, - Interval: &pdpb.TimeInterval{ - StartTimestamp: timestamp + uint64(i*utils.StoreHeartBeatReportInterval), - EndTimestamp: timestamp + uint64((i+1)*utils.StoreHeartBeatReportInterval)}, - }, - }, &pdpb.StoreHeartbeatResponse{}) - re.NoError(err) - } - } defer cluster.Destroy() - { - args := []string{"-u", pdAddr, "hot", "read"} - output, err := pdctl.ExecuteCommand(cmd, args...) - hotRegion := statistics.StoreHotPeersInfos{} - re.NoError(err) - re.NoError(json.Unmarshal(output, &hotRegion)) - re.Equal(hotRegion.AsPeer[1].Count, 0) - re.Equal(0.0, hotRegion.AsPeer[1].TotalBytesRate) - re.Equal(load, hotRegion.AsPeer[1].StoreByteRate) - re.Equal(hotRegion.AsLeader[1].Count, 0) - re.Equal(0.0, hotRegion.AsLeader[1].TotalBytesRate) - re.Equal(load, hotRegion.AsLeader[1].StoreByteRate) - } - { - args := []string{"-u", pdAddr, "hot", "write"} - output, err := pdctl.ExecuteCommand(cmd, args...) - hotRegion := statistics.StoreHotPeersInfos{} - re.NoError(err) - re.NoError(json.Unmarshal(output, &hotRegion)) - re.Equal(hotRegion.AsPeer[1].Count, 0) - re.Equal(0.0, hotRegion.AsPeer[1].TotalBytesRate) - re.Equal(load, hotRegion.AsPeer[1].StoreByteRate) - re.Equal(hotRegion.AsLeader[1].Count, 0) - re.Equal(0.0, hotRegion.AsLeader[1].TotalBytesRate) - re.Equal(0.0, hotRegion.AsLeader[1].StoreByteRate) // write leader sum + tests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"), core.SetWrittenBytes(3000000000), core.SetReportInterval(0, utils.RegionHeartBeatReportInterval)) + tests.MustPutRegion(re, cluster, 2, 2, []byte("c"), []byte("d"), core.SetWrittenBytes(6000000000), core.SetReportInterval(0, utils.RegionHeartBeatReportInterval)) + tests.MustPutRegion(re, cluster, 3, 1, []byte("e"), []byte("f"), core.SetWrittenBytes(9000000000), core.SetReportInterval(0, utils.RegionHeartBeatReportInterval)) + + stats := &metapb.BucketStats{ + ReadBytes: []uint64{10 * units.MiB}, + ReadKeys: []uint64{11 * units.MiB}, + ReadQps: []uint64{0}, + WriteKeys: []uint64{12 * units.MiB}, + WriteBytes: []uint64{13 * units.MiB}, + WriteQps: []uint64{0}, } + buckets := tests.MustReportBuckets(re, cluster, 1, []byte("a"), []byte("b"), stats) + args := []string{"-u", pdAddr, "hot", "buckets", "1"} + output, err := pdctl.ExecuteCommand(cmd, args...) 
+	re.NoError(err)
+	hotBuckets := handler.HotBucketsResponse{}
+	re.NoError(json.Unmarshal(output, &hotBuckets))
+	re.Len(hotBuckets, 1)
+	re.Len(hotBuckets[1], 1)
+	item := hotBuckets[1][0]
+	re.Equal(core.HexRegionKeyStr(buckets.GetKeys()[0]), item.StartKey)
+	re.Equal(core.HexRegionKeyStr(buckets.GetKeys()[1]), item.EndKey)
+	re.Equal(1, item.HotDegree)
+	interval := buckets.GetPeriodInMs() / 1000
+	re.Equal(buckets.GetStats().ReadBytes[0]/interval, item.ReadBytes)
+	re.Equal(buckets.GetStats().ReadKeys[0]/interval, item.ReadKeys)
+	re.Equal(buckets.GetStats().WriteBytes[0]/interval, item.WriteBytes)
+	re.Equal(buckets.GetStats().WriteKeys[0]/interval, item.WriteKeys)
+
+	args = []string{"-u", pdAddr, "hot", "buckets", "2"}
+	output, err = pdctl.ExecuteCommand(cmd, args...)
+	re.NoError(err)
+	hotBuckets = handler.HotBucketsResponse{}
+	re.NoError(json.Unmarshal(output, &hotBuckets))
+	re.Nil(hotBuckets[2])
 }
diff --git a/tests/server/api/operator_test.go b/tests/server/api/operator_test.go
index a6f11a49889..64ed5114646 100644
--- a/tests/server/api/operator_test.go
+++ b/tests/server/api/operator_test.go
@@ -412,7 +412,7 @@ func (suite *operatorTestSuite) checkTransferRegionWithPlacementRule(cluster *te
 		suite.T().Log(testCase.name)
 		// TODO: remove this after we can sync this config to all servers.
 		if sche := cluster.GetSchedulingPrimaryServer(); sche != nil {
-			sche.GetPersistConfig().SetPlacementRuleEnabled(testCase.placementRuleEnable)
+			sche.GetCluster().GetSchedulerConfig().SetPlacementRuleEnabled(testCase.placementRuleEnable)
 		} else {
 			svr.GetRaftCluster().GetOpts().SetPlacementRuleEnabled(testCase.placementRuleEnable)
 		}
diff --git a/tests/server/storage/hot_region_storage_test.go b/tests/server/storage/hot_region_storage_test.go
index 00d0244a790..12110be0249 100644
--- a/tests/server/storage/hot_region_storage_test.go
+++ b/tests/server/storage/hot_region_storage_test.go
@@ -104,35 +104,35 @@ func TestHotRegionStorage(t *testing.T) {
 	var next *storage.HistoryHotRegion
 	hotRegionStorage := leaderServer.GetServer().GetHistoryHotRegionStorage()
 	testutil.Eventually(re, func() bool { // wait for the history hot region to be written to the storage
-		iter = hotRegionStorage.NewIterator([]string{storage.WriteType.String()}, startTime*1000, time.Now().UnixMilli())
+		iter = hotRegionStorage.NewIterator([]string{utils.Write.String()}, startTime*1000, time.Now().UnixMilli())
 		next, err = iter.Next()
 		return err == nil && next != nil
 	})
 	re.Equal(uint64(1), next.RegionID)
 	re.Equal(uint64(1), next.StoreID)
-	re.Equal(storage.WriteType.String(), next.HotRegionType)
+	re.Equal(utils.Write.String(), next.HotRegionType)
 	next, err = iter.Next()
 	re.NoError(err)
 	re.NotNil(next)
 	re.Equal(uint64(2), next.RegionID)
 	re.Equal(uint64(2), next.StoreID)
-	re.Equal(storage.WriteType.String(), next.HotRegionType)
+	re.Equal(utils.Write.String(), next.HotRegionType)
 	next, err = iter.Next()
 	re.NoError(err)
 	re.Nil(next)
-	iter = hotRegionStorage.NewIterator([]string{storage.ReadType.String()}, startTime*1000, time.Now().UnixMilli())
+	iter = hotRegionStorage.NewIterator([]string{utils.Read.String()}, startTime*1000, time.Now().UnixMilli())
 	next, err = iter.Next()
 	re.NoError(err)
 	re.NotNil(next)
 	re.Equal(uint64(3), next.RegionID)
 	re.Equal(uint64(1), next.StoreID)
-	re.Equal(storage.ReadType.String(), next.HotRegionType)
+	re.Equal(utils.Read.String(), next.HotRegionType)
 	next, err = iter.Next()
 	re.NoError(err)
 	re.NotNil(next)
 	re.Equal(uint64(4), next.RegionID)
 	re.Equal(uint64(2), next.StoreID)
-	re.Equal(storage.ReadType.String(), next.HotRegionType)
+	re.Equal(utils.Read.String(), next.HotRegionType)
 	next, err = iter.Next()
 	re.NoError(err)
 	re.Nil(next)
@@ -181,13 +181,13 @@ func TestHotRegionStorageReservedDayConfigChange(t *testing.T) {
 	var next *storage.HistoryHotRegion
 	testutil.Eventually(re, func() bool { // wait for the history hot region to be written to the storage
 		hotRegionStorage := leaderServer.GetServer().GetHistoryHotRegionStorage()
-		iter = hotRegionStorage.NewIterator([]string{storage.WriteType.String()}, startTime*1000, time.Now().UnixMilli())
+		iter = hotRegionStorage.NewIterator([]string{utils.Write.String()}, startTime*1000, time.Now().UnixMilli())
 		next, err = iter.Next()
 		return err == nil && next != nil
 	})
 	re.Equal(uint64(1), next.RegionID)
 	re.Equal(uint64(1), next.StoreID)
-	re.Equal(storage.WriteType.String(), next.HotRegionType)
+	re.Equal(utils.Write.String(), next.HotRegionType)
 	next, err = iter.Next()
 	re.NoError(err)
 	re.Nil(next)
@@ -200,13 +200,13 @@ func TestHotRegionStorageReservedDayConfigChange(t *testing.T) {
 		core.SetReportInterval(uint64(time.Now().Unix()-utils.RegionHeartBeatReportInterval), uint64(time.Now().Unix())))
 	time.Sleep(10 * interval)
 	hotRegionStorage := leaderServer.GetServer().GetHistoryHotRegionStorage()
-	iter = hotRegionStorage.NewIterator([]string{storage.WriteType.String()}, startTime*1000, time.Now().UnixMilli())
+	iter = hotRegionStorage.NewIterator([]string{utils.Write.String()}, startTime*1000, time.Now().UnixMilli())
 	next, err = iter.Next()
 	re.NoError(err)
 	re.NotNil(next)
 	re.Equal(uint64(1), next.RegionID)
 	re.Equal(uint64(1), next.StoreID)
-	re.Equal(storage.WriteType.String(), next.HotRegionType)
+	re.Equal(utils.Write.String(), next.HotRegionType)
 	next, err = iter.Next()
 	re.NoError(err)
 	re.Nil(next)
@@ -215,19 +215,19 @@ func TestHotRegionStorageReservedDayConfigChange(t *testing.T) {
 	leaderServer.GetServer().SetScheduleConfig(schedule)
 	time.Sleep(3 * interval)
 	hotRegionStorage = leaderServer.GetServer().GetHistoryHotRegionStorage()
-	iter = hotRegionStorage.NewIterator([]string{storage.WriteType.String()}, startTime*1000, time.Now().UnixMilli())
+	iter = hotRegionStorage.NewIterator([]string{utils.Write.String()}, startTime*1000, time.Now().UnixMilli())
 	next, err = iter.Next()
 	re.NoError(err)
 	re.NotNil(next)
 	re.Equal(uint64(1), next.RegionID)
 	re.Equal(uint64(1), next.StoreID)
-	re.Equal(storage.WriteType.String(), next.HotRegionType)
+	re.Equal(utils.Write.String(), next.HotRegionType)
 	next, err = iter.Next()
 	re.NoError(err)
 	re.NotNil(next)
 	re.Equal(uint64(2), next.RegionID)
 	re.Equal(uint64(2), next.StoreID)
-	re.Equal(storage.WriteType.String(), next.HotRegionType)
+	re.Equal(utils.Write.String(), next.HotRegionType)
 }
 
 func TestHotRegionStorageWriteIntervalConfigChange(t *testing.T) {
@@ -274,13 +274,13 @@ func TestHotRegionStorageWriteIntervalConfigChange(t *testing.T) {
 	var next *storage.HistoryHotRegion
 	testutil.Eventually(re, func() bool { // wait for the history hot region to be written to the storage
 		hotRegionStorage := leaderServer.GetServer().GetHistoryHotRegionStorage()
-		iter = hotRegionStorage.NewIterator([]string{storage.WriteType.String()}, startTime*1000, time.Now().UnixMilli())
+		iter = hotRegionStorage.NewIterator([]string{utils.Write.String()}, startTime*1000, time.Now().UnixMilli())
 		next, err = iter.Next()
 		return err == nil && next != nil
 	})
 	re.Equal(uint64(1), next.RegionID)
 	re.Equal(uint64(1), next.StoreID)
-	re.Equal(storage.WriteType.String(), next.HotRegionType)
+	re.Equal(utils.Write.String(), next.HotRegionType)
 	next, err = iter.Next()
 	re.NoError(err)
 	re.Nil(next)
@@ -294,13 +294,13 @@ func TestHotRegionStorageWriteIntervalConfigChange(t *testing.T) {
 	time.Sleep(10 * interval)
 	// it cant get new hot region because wait time smaller than hot region write interval
 	hotRegionStorage := leaderServer.GetServer().GetHistoryHotRegionStorage()
-	iter = hotRegionStorage.NewIterator([]string{storage.WriteType.String()}, startTime*1000, time.Now().UnixMilli())
+	iter = hotRegionStorage.NewIterator([]string{utils.Write.String()}, startTime*1000, time.Now().UnixMilli())
 	next, err = iter.Next()
 	re.NoError(err)
 	re.NotNil(next)
 	re.Equal(uint64(1), next.RegionID)
 	re.Equal(uint64(1), next.StoreID)
-	re.Equal(storage.WriteType.String(), next.HotRegionType)
+	re.Equal(utils.Write.String(), next.HotRegionType)
 	next, err = iter.Next()
 	re.NoError(err)
 	re.Nil(next)