diff --git a/.github/workflows/check.yaml b/.github/workflows/check.yaml index c0bdb7ea492..e2bf99c026f 100644 --- a/.github/workflows/check.yaml +++ b/.github/workflows/check.yaml @@ -6,7 +6,7 @@ concurrency: jobs: statics: runs-on: ubuntu-latest - timeout-minutes: 10 + timeout-minutes: 20 steps: - uses: actions/setup-go@v3 with: diff --git a/errors.toml b/errors.toml index 1b96de8a209..1d10d40d294 100644 --- a/errors.toml +++ b/errors.toml @@ -496,6 +496,16 @@ error = ''' init file log error, %s ''' +["PD:mcs:ErrNotFoundSchedulingAddr"] +error = ''' +cannot find scheduling address +''' + +["PD:mcs:ErrSchedulingServer"] +error = ''' +scheduling server meets %v +''' + ["PD:member:ErrCheckCampaign"] error = ''' check campaign failed diff --git a/pkg/core/region.go b/pkg/core/region.go index 6875844b7a6..2ac323a1272 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -1339,8 +1339,8 @@ func (r *RegionsInfo) GetStoreWriteRate(storeID uint64) (bytesRate, keysRate flo // GetClusterNotFromStorageRegionsCnt gets the total count of regions that not loaded from storage anymore func (r *RegionsInfo) GetClusterNotFromStorageRegionsCnt() int { - r.st.RLock() - defer r.st.RUnlock() + r.t.RLock() + defer r.t.RUnlock() return r.tree.notFromStorageRegionsCnt } diff --git a/pkg/errs/errno.go b/pkg/errs/errno.go index 181dfc9b393..e5bac8519be 100644 --- a/pkg/errs/errno.go +++ b/pkg/errs/errno.go @@ -403,3 +403,9 @@ var ( ErrDeleteReservedGroup = errors.Normalize("cannot delete reserved group", errors.RFCCodeText("PD:resourcemanager:ErrDeleteReservedGroup")) ErrInvalidGroup = errors.Normalize("invalid group settings, please check the group name, priority and the number of resources", errors.RFCCodeText("PD:resourcemanager:ErrInvalidGroup")) ) + +// Micro service errors +var ( + ErrNotFoundSchedulingAddr = errors.Normalize("cannot find scheduling address", errors.RFCCodeText("PD:mcs:ErrNotFoundSchedulingAddr")) + ErrSchedulingServer = errors.Normalize("scheduling server meets %v", errors.RFCCodeText("PD:mcs:ErrSchedulingServer")) +) diff --git a/pkg/mcs/resourcemanager/server/resource_group.go b/pkg/mcs/resourcemanager/server/resource_group.go index 863cfd19026..fc3a58cab51 100644 --- a/pkg/mcs/resourcemanager/server/resource_group.go +++ b/pkg/mcs/resourcemanager/server/resource_group.go @@ -138,7 +138,7 @@ func FromProtoResourceGroup(group *rmpb.ResourceGroup) *ResourceGroup { // RequestRU requests the RU of the resource group. func (rg *ResourceGroup) RequestRU( now time.Time, - neededTokens float64, + requiredToken float64, targetPeriodMs, clientUniqueID uint64, ) *rmpb.GrantedRUTokenBucket { rg.Lock() @@ -147,7 +147,7 @@ func (rg *ResourceGroup) RequestRU( if rg.RUSettings == nil || rg.RUSettings.RU.Settings == nil { return nil } - tb, trickleTimeMs := rg.RUSettings.RU.request(now, neededTokens, targetPeriodMs, clientUniqueID) + tb, trickleTimeMs := rg.RUSettings.RU.request(now, requiredToken, targetPeriodMs, clientUniqueID) return &rmpb.GrantedRUTokenBucket{GrantedTokens: tb, TrickleTimeMs: trickleTimeMs} } diff --git a/pkg/mcs/resourcemanager/server/token_buckets.go b/pkg/mcs/resourcemanager/server/token_buckets.go index 5efab52fe68..a0acba3b54d 100644 --- a/pkg/mcs/resourcemanager/server/token_buckets.go +++ b/pkg/mcs/resourcemanager/server/token_buckets.go @@ -268,7 +268,7 @@ func (gtb *GroupTokenBucket) init(now time.Time, clientID uint64) { } // updateTokens updates the tokens and settings. 
-func (gtb *GroupTokenBucket) updateTokens(now time.Time, burstLimit int64, clientUniqueID uint64, consumptionToken float64) { +func (gtb *GroupTokenBucket) updateTokens(now time.Time, burstLimit int64, clientUniqueID uint64, requiredToken float64) { var elapseTokens float64 if !gtb.Initialized { gtb.init(now, clientUniqueID) @@ -288,21 +288,21 @@ func (gtb *GroupTokenBucket) updateTokens(now time.Time, burstLimit int64, clien gtb.Tokens = burst } // Balance each slots. - gtb.balanceSlotTokens(clientUniqueID, gtb.Settings, consumptionToken, elapseTokens) + gtb.balanceSlotTokens(clientUniqueID, gtb.Settings, requiredToken, elapseTokens) } // request requests tokens from the corresponding slot. func (gtb *GroupTokenBucket) request(now time.Time, - neededTokens float64, + requiredToken float64, targetPeriodMs, clientUniqueID uint64, ) (*rmpb.TokenBucket, int64) { burstLimit := gtb.Settings.GetBurstLimit() - gtb.updateTokens(now, burstLimit, clientUniqueID, neededTokens) + gtb.updateTokens(now, burstLimit, clientUniqueID, requiredToken) slot, ok := gtb.tokenSlots[clientUniqueID] if !ok { return &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{BurstLimit: burstLimit}}, 0 } - res, trickleDuration := slot.assignSlotTokens(neededTokens, targetPeriodMs) + res, trickleDuration := slot.assignSlotTokens(requiredToken, targetPeriodMs) // Update bucket to record all tokens. gtb.Tokens -= slot.lastTokenCapacity - slot.tokenCapacity slot.lastTokenCapacity = slot.tokenCapacity @@ -310,24 +310,24 @@ func (gtb *GroupTokenBucket) request(now time.Time, return res, trickleDuration } -func (ts *TokenSlot) assignSlotTokens(neededTokens float64, targetPeriodMs uint64) (*rmpb.TokenBucket, int64) { +func (ts *TokenSlot) assignSlotTokens(requiredToken float64, targetPeriodMs uint64) (*rmpb.TokenBucket, int64) { var res rmpb.TokenBucket burstLimit := ts.settings.GetBurstLimit() res.Settings = &rmpb.TokenLimitSettings{BurstLimit: burstLimit} // If BurstLimit < 0, just return. if burstLimit < 0 { - res.Tokens = neededTokens + res.Tokens = requiredToken return &res, 0 } // FillRate is used for the token server unavailable in abnormal situation. - if neededTokens <= 0 { + if requiredToken <= 0 { return &res, 0 } // If the current tokens can directly meet the requirement, returns the need token. 
- if ts.tokenCapacity >= neededTokens { - ts.tokenCapacity -= neededTokens + if ts.tokenCapacity >= requiredToken { + ts.tokenCapacity -= requiredToken // granted the total request tokens - res.Tokens = neededTokens + res.Tokens = requiredToken return &res, 0 } @@ -336,7 +336,7 @@ func (ts *TokenSlot) assignSlotTokens(neededTokens float64, targetPeriodMs uint6 hasRemaining := false if ts.tokenCapacity > 0 { grantedTokens = ts.tokenCapacity - neededTokens -= grantedTokens + requiredToken -= grantedTokens ts.tokenCapacity = 0 hasRemaining = true } @@ -373,36 +373,36 @@ func (ts *TokenSlot) assignSlotTokens(neededTokens float64, targetPeriodMs uint6 for i := 1; i < loanCoefficient; i++ { p[i] = float64(loanCoefficient-i)*float64(fillRate)*targetPeriodTimeSec + p[i-1] } - for i := 0; i < loanCoefficient && neededTokens > 0 && trickleTime < targetPeriodTimeSec; i++ { + for i := 0; i < loanCoefficient && requiredToken > 0 && trickleTime < targetPeriodTimeSec; i++ { loan := -ts.tokenCapacity if loan >= p[i] { continue } roundReserveTokens := p[i] - loan fillRate := float64(loanCoefficient-i) * float64(fillRate) - if roundReserveTokens > neededTokens { - ts.tokenCapacity -= neededTokens - grantedTokens += neededTokens + if roundReserveTokens > requiredToken { + ts.tokenCapacity -= requiredToken + grantedTokens += requiredToken trickleTime += grantedTokens / fillRate - neededTokens = 0 + requiredToken = 0 } else { roundReserveTime := roundReserveTokens / fillRate if roundReserveTime+trickleTime >= targetPeriodTimeSec { roundTokens := (targetPeriodTimeSec - trickleTime) * fillRate - neededTokens -= roundTokens + requiredToken -= roundTokens ts.tokenCapacity -= roundTokens grantedTokens += roundTokens trickleTime = targetPeriodTimeSec } else { grantedTokens += roundReserveTokens - neededTokens -= roundReserveTokens + requiredToken -= roundReserveTokens ts.tokenCapacity -= roundReserveTokens trickleTime += roundReserveTime } } } - if neededTokens > 0 && grantedTokens < defaultReserveRatio*float64(fillRate)*targetPeriodTimeSec { - reservedTokens := math.Min(neededTokens+grantedTokens, defaultReserveRatio*float64(fillRate)*targetPeriodTimeSec) + if requiredToken > 0 && grantedTokens < defaultReserveRatio*float64(fillRate)*targetPeriodTimeSec { + reservedTokens := math.Min(requiredToken+grantedTokens, defaultReserveRatio*float64(fillRate)*targetPeriodTimeSec) ts.tokenCapacity -= reservedTokens - grantedTokens grantedTokens = reservedTokens } diff --git a/pkg/mcs/scheduling/server/apis/v1/api.go b/pkg/mcs/scheduling/server/apis/v1/api.go index 39be00ef9a0..9581c227741 100644 --- a/pkg/mcs/scheduling/server/apis/v1/api.go +++ b/pkg/mcs/scheduling/server/apis/v1/api.go @@ -15,7 +15,6 @@ package apis import ( - "fmt" "net/http" "strconv" "sync" @@ -26,11 +25,13 @@ import ( "github.com/gin-gonic/gin" "github.com/joho/godotenv" "github.com/pingcap/log" + "github.com/tikv/pd/pkg/errs" scheserver "github.com/tikv/pd/pkg/mcs/scheduling/server" mcsutils "github.com/tikv/pd/pkg/mcs/utils" sche "github.com/tikv/pd/pkg/schedule/core" "github.com/tikv/pd/pkg/schedule/handler" "github.com/tikv/pd/pkg/schedule/operator" + "github.com/tikv/pd/pkg/schedule/schedulers" "github.com/tikv/pd/pkg/statistics/utils" "github.com/tikv/pd/pkg/storage" "github.com/tikv/pd/pkg/utils/apiutil" @@ -110,6 +111,7 @@ func NewService(srv *scheserver.Service) *Service { rd: createIndentRender(), } s.RegisterAdminRouter() + s.RegisterConfigRouter() s.RegisterOperatorsRouter() s.RegisterSchedulersRouter() s.RegisterCheckersRouter() @@ -121,6 
+123,14 @@ func NewService(srv *scheserver.Service) *Service { func (s *Service) RegisterAdminRouter() { router := s.root.Group("admin") router.PUT("/log", changeLogLevel) + router.DELETE("cache/regions", deleteAllRegionCache) + router.DELETE("cache/regions/:id", deleteRegionCacheByID) +} + +// RegisterConfigRouter registers the router of the config handler. +func (s *Service) RegisterConfigRouter() { + router := s.root.Group("config") + router.GET("", getConfig) } // RegisterSchedulersRouter registers the router of the schedulers handler. @@ -128,6 +138,8 @@ func (s *Service) RegisterSchedulersRouter() { router := s.root.Group("schedulers") router.GET("", getSchedulers) router.GET("/diagnostic/:name", getDiagnosticResult) + router.GET("/config", getSchedulerConfig) + router.GET("/config/:name/list", getSchedulerConfigByName) // TODO: in the future, we should split pauseOrResumeScheduler to two different APIs. // And we need to do one-to-two forwarding in the API middleware. router.POST("/:name", pauseOrResumeScheduler) @@ -160,6 +172,11 @@ func (s *Service) RegisterOperatorsRouter() { router.GET("/records", getOperatorRecords) } +// @Tags admin +// @Summary Change the log level. +// @Produce json +// @Success 200 {string} string "The log level is updated." +// @Router /admin/log [put] func changeLogLevel(c *gin.Context) { svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server) var level string @@ -176,6 +193,58 @@ func changeLogLevel(c *gin.Context) { c.String(http.StatusOK, "The log level is updated.") } +// @Tags config +// @Summary Get full config. +// @Produce json +// @Success 200 {object} config.Config +// @Router /config [get] +func getConfig(c *gin.Context) { + svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server) + cfg := svr.GetConfig() + cfg.Schedule.MaxMergeRegionKeys = cfg.Schedule.GetMaxMergeRegionKeys() + c.IndentedJSON(http.StatusOK, cfg) +} + +// @Tags admin +// @Summary Drop all regions from cache. +// @Produce json +// @Success 200 {string} string "All regions are removed from server cache." +// @Router /admin/cache/regions [delete] +func deleteAllRegionCache(c *gin.Context) { + svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server) + cluster := svr.GetCluster() + if cluster == nil { + c.String(http.StatusInternalServerError, errs.ErrNotBootstrapped.GenWithStackByArgs().Error()) + return + } + cluster.DropCacheAllRegion() + c.String(http.StatusOK, "All regions are removed from server cache.") +} + +// @Tags admin +// @Summary Drop a specific region from cache. +// @Param id path integer true "Region Id" +// @Produce json +// @Success 200 {string} string "The region is removed from server cache." +// @Failure 400 {string} string "The input is invalid." +// @Router /admin/cache/regions/{id} [delete] +func deleteRegionCacheByID(c *gin.Context) { + svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server) + cluster := svr.GetCluster() + if cluster == nil { + c.String(http.StatusInternalServerError, errs.ErrNotBootstrapped.GenWithStackByArgs().Error()) + return + } + regionIDStr := c.Param("id") + regionID, err := strconv.ParseUint(regionIDStr, 10, 64) + if err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + cluster.DropCacheRegion(regionID) + c.String(http.StatusOK, "The region is removed from server cache.") +} + // @Tags operators // @Summary Get an operator by ID. 
// @Param region_id path int true "A Region's Id" @@ -385,6 +454,60 @@ func getSchedulers(c *gin.Context) { c.IndentedJSON(http.StatusOK, output) } +// @Tags schedulers +// @Summary List all scheduler configs. +// @Produce json +// @Success 200 {object} map[string]interface{} +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /schedulers/config/ [get] +func getSchedulerConfig(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + sc, err := handler.GetSchedulersController() + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + sches, configs, err := sc.GetAllSchedulerConfigs() + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + c.IndentedJSON(http.StatusOK, schedulers.ToPayload(sches, configs)) +} + +// @Tags schedulers +// @Summary List scheduler config by name. +// @Produce json +// @Success 200 {object} map[string]interface{} +// @Failure 404 {string} string scheduler not found +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /schedulers/config/{name}/list [get] +func getSchedulerConfigByName(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + sc, err := handler.GetSchedulersController() + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + handlers := sc.GetSchedulerHandlers() + name := c.Param("name") + if _, ok := handlers[name]; !ok { + c.String(http.StatusNotFound, errs.ErrSchedulerNotFound.GenWithStackByArgs().Error()) + return + } + isDisabled, err := sc.IsSchedulerDisabled(name) + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + if isDisabled { + c.String(http.StatusNotFound, errs.ErrSchedulerNotFound.GenWithStackByArgs().Error()) + return + } + c.Request.URL.Path = "/list" + handlers[name].ServeHTTP(c.Writer, c.Request) +} + // @Tags schedulers // @Summary List schedulers diagnostic result. // @Produce json @@ -475,7 +598,7 @@ func getHotRegions(typ utils.RWType, c *gin.Context) { for _, storeID := range storeIDs { id, err := strconv.ParseUint(storeID, 10, 64) if err != nil { - c.String(http.StatusBadRequest, fmt.Sprintf("invalid store id: %s", storeID)) + c.String(http.StatusBadRequest, errs.ErrInvalidStoreID.FastGenByArgs(storeID).Error()) return } _, err = handler.GetStore(id) diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index e4983eca7ea..028c2a12b37 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -52,7 +52,10 @@ type Cluster struct { running atomic.Bool } -const regionLabelGCInterval = time.Hour +const ( + regionLabelGCInterval = time.Hour + requestTimeout = 3 * time.Second +) // NewCluster creates a new cluster. 
func NewCluster(parentCtx context.Context, persistConfig *config.PersistConfig, storage storage.Storage, basicCluster *core.BasicCluster, hbStreams *hbstream.HeartbeatStreams, clusterID uint64, checkMembershipCh chan struct{}) (*Cluster, error) { @@ -199,9 +202,11 @@ func (c *Cluster) AllocID() (uint64, error) { if err != nil { return 0, err } - resp, err := client.AllocID(c.ctx, &pdpb.AllocIDRequest{Header: &pdpb.RequestHeader{ClusterId: c.clusterID}}) + ctx, cancel := context.WithTimeout(c.ctx, requestTimeout) + defer cancel() + resp, err := client.AllocID(ctx, &pdpb.AllocIDRequest{Header: &pdpb.RequestHeader{ClusterId: c.clusterID}}) if err != nil { - c.checkMembershipCh <- struct{}{} + c.triggerMembershipCheck() return 0, err } return resp.GetId(), nil @@ -210,12 +215,19 @@ func (c *Cluster) AllocID() (uint64, error) { func (c *Cluster) getAPIServerLeaderClient() (pdpb.PDClient, error) { cli := c.apiServerLeader.Load() if cli == nil { - c.checkMembershipCh <- struct{}{} + c.triggerMembershipCheck() return nil, errors.New("API server leader is not found") } return cli.(pdpb.PDClient), nil } +func (c *Cluster) triggerMembershipCheck() { + select { + case c.checkMembershipCh <- struct{}{}: + default: // avoid blocking + } +} + // SwitchAPIServerLeader switches the API server leader. func (c *Cluster) SwitchAPIServerLeader(new pdpb.PDClient) bool { old := c.apiServerLeader.Load() @@ -581,3 +593,13 @@ func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error { func (c *Cluster) IsPrepared() bool { return c.coordinator.GetPrepareChecker().IsPrepared() } + +// DropCacheAllRegion removes all cached regions. +func (c *Cluster) DropCacheAllRegion() { + c.ResetRegionCache() +} + +// DropCacheRegion removes a region from the cache. +func (c *Cluster) DropCacheRegion(id uint64) { + c.RemoveRegionIfExist(id) +} diff --git a/pkg/mcs/scheduling/server/config/config.go b/pkg/mcs/scheduling/server/config/config.go index 4f9caca41e6..772eab835f1 100644 --- a/pkg/mcs/scheduling/server/config/config.go +++ b/pkg/mcs/scheduling/server/config/config.go @@ -61,9 +61,9 @@ type Config struct { Metric metricutil.MetricConfig `toml:"metric" json:"metric"` // Log related config. - Log log.Config `toml:"log" json:"log"` - Logger *zap.Logger - LogProps *log.ZapProperties + Log log.Config `toml:"log" json:"log"` + Logger *zap.Logger `json:"-"` + LogProps *log.ZapProperties `json:"-"` Security configutil.SecurityConfig `toml:"security" json:"security"` @@ -195,6 +195,13 @@ func (c *Config) validate() error { return nil } +// Clone creates a copy of current config. +func (c *Config) Clone() *Config { + cfg := &Config{} + *cfg = *c + return cfg +} + // PersistConfig wraps all configurations that need to persist to storage and // allows to access them safely. type PersistConfig struct { diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index 9caae932037..1790cb2b4be 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -510,6 +510,23 @@ func (s *Server) GetPersistConfig() *config.PersistConfig { return s.persistConfig } +// GetConfig gets the config. 
+func (s *Server) GetConfig() *config.Config { + cfg := s.cfg.Clone() + cfg.Schedule = *s.persistConfig.GetScheduleConfig().Clone() + cfg.Replication = *s.persistConfig.GetReplicationConfig().Clone() + cfg.ClusterVersion = *s.persistConfig.GetClusterVersion() + if s.storage == nil { + return cfg + } + sches, configs, err := s.storage.LoadAllSchedulerConfigs() + if err != nil { + return cfg + } + cfg.Schedule.SchedulersPayload = schedulers.ToPayload(sches, configs) + return cfg +} + // CreateServer creates the Server func CreateServer(ctx context.Context, cfg *config.Config) *Server { svr := &Server{ diff --git a/pkg/schedule/checker/rule_checker.go b/pkg/schedule/checker/rule_checker.go index b0537bf9ce4..7012359ca36 100644 --- a/pkg/schedule/checker/rule_checker.go +++ b/pkg/schedule/checker/rule_checker.go @@ -479,6 +479,13 @@ loopFits: hasUnhealthyFit = true break loopFits } + // avoid meeting a down store when fixing orphan peers, + // IsDisconnected is stricter than IsUnhealthy. + if c.cluster.GetStore(p.GetStoreId()).IsDisconnected() { + hasUnhealthyFit = true + pinDownPeer = p + break loopFits + } } } @@ -491,6 +498,9 @@ loopFits: // try to use orphan peers to replace unhealthy down peers. for _, orphanPeer := range fit.OrphanPeers { if pinDownPeer != nil { + if pinDownPeer.GetId() == orphanPeer.GetId() { + continue + } // make sure the orphan peer is healthy. if isUnhealthyPeer(orphanPeer.GetId()) { continue @@ -514,6 +524,9 @@ loopFits: return operator.CreatePromoteLearnerOperatorAndRemovePeer("replace-down-peer-with-orphan-peer", c.cluster, region, orphanPeer, pinDownPeer) case orphanPeerRole == metapb.PeerRole_Voter && destRole == metapb.PeerRole_Learner: return operator.CreateDemoteLearnerOperatorAndRemovePeer("replace-down-peer-with-orphan-peer", c.cluster, region, orphanPeer, pinDownPeer) + case orphanPeerRole == metapb.PeerRole_Voter && destRole == metapb.PeerRole_Voter && + c.cluster.GetStore(pinDownPeer.GetStoreId()).IsDisconnected() && !dstStore.IsDisconnected(): + return operator.CreateRemovePeerOperator("remove-replaced-orphan-peer", c.cluster, 0, region, pinDownPeer.GetStoreId()) default: // destRole should not same with orphanPeerRole. if role is same, it fit with orphanPeer should be better than now. // destRole never be leader, so we not consider it. diff --git a/pkg/schedule/handler/handler.go b/pkg/schedule/handler/handler.go index fca43f3eeeb..45b0eaf502f 100644 --- a/pkg/schedule/handler/handler.go +++ b/pkg/schedule/handler/handler.go @@ -765,13 +765,22 @@ func (h *Handler) GetCheckerStatus(name string) (map[string]bool, error) { }, nil } -// GetSchedulerNames returns all names of schedulers. -func (h *Handler) GetSchedulerNames() ([]string, error) { +// GetSchedulersController returns controller of schedulers. +func (h *Handler) GetSchedulersController() (*schedulers.Controller, error) { co := h.GetCoordinator() if co == nil { return nil, errs.ErrNotBootstrapped.GenWithStackByArgs() } - return co.GetSchedulersController().GetSchedulerNames(), nil + return co.GetSchedulersController(), nil +} + +// GetSchedulerNames returns all names of schedulers. +func (h *Handler) GetSchedulerNames() ([]string, error) { + sc, err := h.GetSchedulersController() + if err != nil { + return nil, err + } + return sc.GetSchedulerNames(), nil } type schedulerPausedPeriod struct { @@ -782,11 +791,10 @@ type schedulerPausedPeriod struct { // GetSchedulerByStatus returns all names of schedulers by status.
func (h *Handler) GetSchedulerByStatus(status string, needTS bool) (interface{}, error) { - co := h.GetCoordinator() - if co == nil { - return nil, errs.ErrNotBootstrapped.GenWithStackByArgs() + sc, err := h.GetSchedulersController() + if err != nil { + return nil, err } - sc := co.GetSchedulersController() schedulers := sc.GetSchedulerNames() switch status { case "paused": @@ -837,7 +845,20 @@ func (h *Handler) GetSchedulerByStatus(status string, needTS bool) (interface{}, } return disabledSchedulers, nil default: - return schedulers, nil + // The default schedulers cannot be deleted from the scheduling server, + // they can only be disabled. + // We should not return the disabled schedulers here. + var enabledSchedulers []string + for _, scheduler := range schedulers { + disabled, err := sc.IsSchedulerDisabled(scheduler) + if err != nil { + return nil, err + } + if !disabled { + enabledSchedulers = append(enabledSchedulers, scheduler) + } + } + return enabledSchedulers, nil } } @@ -861,11 +882,11 @@ func (h *Handler) GetDiagnosticResult(name string) (*schedulers.DiagnosticResult // t == 0 : resume scheduler. // t > 0 : scheduler delays t seconds. func (h *Handler) PauseOrResumeScheduler(name string, t int64) (err error) { - co := h.GetCoordinator() - if co == nil { - return errs.ErrNotBootstrapped.GenWithStackByArgs() + sc, err := h.GetSchedulersController() + if err != nil { + return err } - if err = co.GetSchedulersController().PauseOrResumeScheduler(name, t); err != nil { + if err = sc.PauseOrResumeScheduler(name, t); err != nil { if t == 0 { log.Error("can not resume scheduler", zap.String("scheduler-name", name), errs.ZapError(err)) } else { diff --git a/pkg/schedule/schedulers/hot_region.go b/pkg/schedule/schedulers/hot_region.go index c353621bb7f..4806180e450 100644 --- a/pkg/schedule/schedulers/hot_region.go +++ b/pkg/schedule/schedulers/hot_region.go @@ -758,6 +758,9 @@ func (bs *balanceSolver) tryAddPendingInfluence() bool { dstStoreID := uint64(0) if isSplit { region := bs.GetRegion(bs.ops[0].RegionID()) + if region == nil { + return false + } for id := range region.GetStoreIDs() { srcStoreIDs = append(srcStoreIDs, id) } diff --git a/pkg/schedule/schedulers/scheduler.go b/pkg/schedule/schedulers/scheduler.go index ba02c280d40..9262f7d0a65 100644 --- a/pkg/schedule/schedulers/scheduler.go +++ b/pkg/schedule/schedulers/scheduler.go @@ -66,6 +66,24 @@ func DecodeConfig(data []byte, v interface{}) error { return nil } +// ToPayload returns the payload of config. +func ToPayload(sches, configs []string) map[string]interface{} { + payload := make(map[string]interface{}) + for i, sche := range sches { + var config interface{} + err := DecodeConfig([]byte(configs[i]), &config) + if err != nil { + log.Error("failed to decode scheduler config", + zap.String("config", configs[i]), + zap.String("scheduler", sche), + errs.ZapError(err)) + continue + } + payload[sche] = config + } + return payload +} + // ConfigDecoder used to decode the config. type ConfigDecoder func(v interface{}) error diff --git a/pkg/schedule/schedulers/scheduler_controller.go b/pkg/schedule/schedulers/scheduler_controller.go index 0f2264392aa..79c8cbfbc92 100644 --- a/pkg/schedule/schedulers/scheduler_controller.go +++ b/pkg/schedule/schedulers/scheduler_controller.go @@ -418,6 +418,11 @@ func (c *Controller) CheckTransferWitnessLeader(region *core.RegionInfo) { } } +// GetAllSchedulerConfigs returns all scheduler configs.
+func (c *Controller) GetAllSchedulerConfigs() ([]string, []string, error) { + return c.storage.LoadAllSchedulerConfigs() +} + // ScheduleController is used to manage a scheduler. type ScheduleController struct { Scheduler diff --git a/pkg/schedule/schedulers/shuffle_region_config.go b/pkg/schedule/schedulers/shuffle_region_config.go index f503a6f67c7..7d04879c992 100644 --- a/pkg/schedule/schedulers/shuffle_region_config.go +++ b/pkg/schedule/schedulers/shuffle_region_config.go @@ -69,6 +69,7 @@ func (conf *shuffleRegionSchedulerConfig) IsRoleAllow(role string) bool { func (conf *shuffleRegionSchedulerConfig) ServeHTTP(w http.ResponseWriter, r *http.Request) { router := mux.NewRouter() + router.HandleFunc("/list", conf.handleGetRoles).Methods(http.MethodGet) router.HandleFunc("/roles", conf.handleGetRoles).Methods(http.MethodGet) router.HandleFunc("/roles", conf.handleSetRoles).Methods(http.MethodPost) router.ServeHTTP(w, r) diff --git a/pkg/schedule/schedulers/split_bucket.go b/pkg/schedule/schedulers/split_bucket.go index 5e75bded9b4..a08c84372b5 100644 --- a/pkg/schedule/schedulers/split_bucket.go +++ b/pkg/schedule/schedulers/split_bucket.go @@ -262,6 +262,9 @@ func (s *splitBucketScheduler) splitBucket(plan *splitBucketPlan) []*operator.Op } if splitBucket != nil { region := plan.cluster.GetRegion(splitBucket.RegionID) + if region == nil { + return nil + } splitKey := make([][]byte, 0) if bytes.Compare(region.GetStartKey(), splitBucket.StartKey) < 0 { splitKey = append(splitKey, splitBucket.StartKey) @@ -269,7 +272,7 @@ func (s *splitBucketScheduler) splitBucket(plan *splitBucketPlan) []*operator.Op if bytes.Compare(region.GetEndKey(), splitBucket.EndKey) > 0 { splitKey = append(splitKey, splitBucket.EndKey) } - op, err := operator.CreateSplitRegionOperator(SplitBucketType, plan.cluster.GetRegion(splitBucket.RegionID), operator.OpSplit, + op, err := operator.CreateSplitRegionOperator(SplitBucketType, region, operator.OpSplit, pdpb.CheckPolicy_USEKEY, splitKey) if err != nil { splitBucketCreateOpeartorFailCounter.Inc() diff --git a/pkg/statistics/region_collection.go b/pkg/statistics/region_collection.go index c79eb0a3132..26cbea9ef92 100644 --- a/pkg/statistics/region_collection.go +++ b/pkg/statistics/region_collection.go @@ -119,7 +119,11 @@ func (r *RegionStatistics) GetRegionStatsByType(typ RegionStatisticType) []*core defer r.RUnlock() res := make([]*core.RegionInfo, 0, len(r.stats[typ])) for regionID := range r.stats[typ] { - res = append(res, r.rip.GetRegion(regionID).Clone()) + region := r.rip.GetRegion(regionID) + if region == nil { + continue + } + res = append(res, region.Clone()) } return res } diff --git a/pkg/utils/testutil/api_check.go b/pkg/utils/testutil/api_check.go index 84af97f828d..4ce5e859f3f 100644 --- a/pkg/utils/testutil/api_check.go +++ b/pkg/utils/testutil/api_check.go @@ -37,29 +37,29 @@ func StatusOK(re *require.Assertions) func([]byte, int, http.Header) { // StatusNotOK is used to check whether http response code is not equal http.StatusOK. func StatusNotOK(re *require.Assertions) func([]byte, int, http.Header) { - return func(_ []byte, i int, _ http.Header) { - re.NotEqual(http.StatusOK, i) + return func(resp []byte, i int, _ http.Header) { + re.NotEqual(http.StatusOK, i, "resp: "+string(resp)) } } // ExtractJSON is used to check whether given data can be extracted successfully. 
func ExtractJSON(re *require.Assertions, data interface{}) func([]byte, int, http.Header) { - return func(res []byte, _ int, _ http.Header) { - re.NoError(json.Unmarshal(res, data)) + return func(resp []byte, _ int, _ http.Header) { + re.NoError(json.Unmarshal(resp, data), "resp: "+string(resp)) } } // StringContain is used to check whether response context contains given string. func StringContain(re *require.Assertions, sub string) func([]byte, int, http.Header) { - return func(res []byte, _ int, _ http.Header) { - re.Contains(string(res), sub) + return func(resp []byte, _ int, _ http.Header) { + re.Contains(string(resp), sub, "resp: "+string(resp)) } } // StringEqual is used to check whether response context equal given string. func StringEqual(re *require.Assertions, str string) func([]byte, int, http.Header) { - return func(res []byte, _ int, _ http.Header) { - re.Contains(string(res), str) + return func(resp []byte, _ int, _ http.Header) { + re.Contains(string(resp), str, "resp: "+string(resp)) } } diff --git a/server/api/admin.go b/server/api/admin.go index 7a1dfb0f1e8..246c9239f59 100644 --- a/server/api/admin.go +++ b/server/api/admin.go @@ -16,6 +16,7 @@ package api import ( "encoding/json" + "fmt" "io" "net/http" "strconv" @@ -24,6 +25,7 @@ import ( "github.com/pingcap/log" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/server" "github.com/unrolled/render" @@ -59,7 +61,11 @@ func (h *adminHandler) DeleteRegionCache(w http.ResponseWriter, r *http.Request) return } rc.DropCacheRegion(regionID) - h.rd.JSON(w, http.StatusOK, "The region is removed from server cache.") + if h.svr.IsAPIServiceMode() { + err = h.DeleteRegionCacheInSchedulingServer(regionID) + } + msg := "The region is removed from server cache." + h.rd.JSON(w, http.StatusOK, h.buildMsg(msg, err)) } // @Tags admin @@ -95,8 +101,11 @@ func (h *adminHandler) DeleteRegionStorage(w http.ResponseWriter, r *http.Reques } // Remove region from cache. rc.DropCacheRegion(regionID) - - h.rd.JSON(w, http.StatusOK, "The region is removed from server cache and region meta storage.") + if h.svr.IsAPIServiceMode() { + err = h.DeleteRegionCacheInSchedulingServer(regionID) + } + msg := "The region is removed from server cache and region meta storage." + h.rd.JSON(w, http.StatusOK, h.buildMsg(msg, err)) } // @Tags admin @@ -105,9 +114,14 @@ func (h *adminHandler) DeleteRegionStorage(w http.ResponseWriter, r *http.Reques // @Success 200 {string} string "All regions are removed from server cache." // @Router /admin/cache/regions [delete] func (h *adminHandler) DeleteAllRegionCache(w http.ResponseWriter, r *http.Request) { + var err error rc := getCluster(r) rc.DropCacheAllRegion() - h.rd.JSON(w, http.StatusOK, "All regions are removed from server cache.") + if h.svr.IsAPIServiceMode() { + err = h.DeleteRegionCacheInSchedulingServer() + } + msg := "All regions are removed from server cache." 
+ h.rd.JSON(w, http.StatusOK, h.buildMsg(msg, err)) } // Intentionally no swagger mark as it is supposed to be only used in @@ -200,3 +214,35 @@ func (h *adminHandler) RecoverAllocID(w http.ResponseWriter, r *http.Request) { _ = h.rd.Text(w, http.StatusOK, "") } + +func (h *adminHandler) DeleteRegionCacheInSchedulingServer(id ...uint64) error { + addr, ok := h.svr.GetServicePrimaryAddr(h.svr.Context(), utils.SchedulingServiceName) + if !ok { + return errs.ErrNotFoundSchedulingAddr.FastGenByArgs() + } + var idStr string + if len(id) > 0 { + idStr = strconv.FormatUint(id[0], 10) + } + url := fmt.Sprintf("%s/scheduling/api/v1/admin/cache/regions/%s", addr, idStr) + req, err := http.NewRequest(http.MethodDelete, url, nil) + if err != nil { + return err + } + resp, err := h.svr.GetHTTPClient().Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return errs.ErrSchedulingServer.FastGenByArgs(resp.StatusCode) + } + return nil +} + +func (h *adminHandler) buildMsg(msg string, err error) string { + if h.svr.IsAPIServiceMode() && err != nil { + return fmt.Sprintf("This operation was executed in API server but needs to be re-executed on scheduling server due to the following error: %s", err.Error()) + } + return msg +} diff --git a/server/api/config.go b/server/api/config.go index c63bd953c37..746b1119a73 100644 --- a/server/api/config.go +++ b/server/api/config.go @@ -27,6 +27,8 @@ import ( "github.com/pingcap/errcode" "github.com/pingcap/errors" "github.com/pingcap/log" + "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/mcs/utils" sc "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/jsonutil" @@ -60,7 +62,17 @@ func newConfHandler(svr *server.Server, rd *render.Render) *confHandler { // @Router /config [get] func (h *confHandler) GetConfig(w http.ResponseWriter, r *http.Request) { cfg := h.svr.GetConfig() - cfg.Schedule.MaxMergeRegionKeys = cfg.Schedule.GetMaxMergeRegionKeys() + if h.svr.IsAPIServiceMode() { + schedulingServerConfig, err := h.GetSchedulingServerConfig() + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + cfg.Schedule = schedulingServerConfig.Schedule + cfg.Replication = schedulingServerConfig.Replication + } else { + cfg.Schedule.MaxMergeRegionKeys = cfg.Schedule.GetMaxMergeRegionKeys() + } h.rd.JSON(w, http.StatusOK, cfg) } @@ -301,6 +313,16 @@ func getConfigMap(cfg map[string]interface{}, key []string, value interface{}) m // @Success 200 {object} sc.ScheduleConfig // @Router /config/schedule [get] func (h *confHandler) GetScheduleConfig(w http.ResponseWriter, r *http.Request) { + if h.svr.IsAPIServiceMode() { + cfg, err := h.GetSchedulingServerConfig() + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + cfg.Schedule.SchedulersPayload = nil + h.rd.JSON(w, http.StatusOK, cfg.Schedule) + return + } cfg := h.svr.GetScheduleConfig() cfg.MaxMergeRegionKeys = cfg.GetMaxMergeRegionKeys() h.rd.JSON(w, http.StatusOK, cfg) @@ -364,6 +386,15 @@ func (h *confHandler) SetScheduleConfig(w http.ResponseWriter, r *http.Request) // @Success 200 {object} sc.ReplicationConfig // @Router /config/replicate [get] func (h *confHandler) GetReplicationConfig(w http.ResponseWriter, r *http.Request) { + if h.svr.IsAPIServiceMode() { + cfg, err := h.GetSchedulingServerConfig() + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + h.rd.JSON(w, http.StatusOK, 
cfg.Replication) + return + } h.rd.JSON(w, http.StatusOK, h.svr.GetReplicationConfig()) } @@ -505,3 +536,33 @@ func (h *confHandler) SetReplicationModeConfig(w http.ResponseWriter, r *http.Re func (h *confHandler) GetPDServerConfig(w http.ResponseWriter, r *http.Request) { h.rd.JSON(w, http.StatusOK, h.svr.GetPDServerConfig()) } + +func (h *confHandler) GetSchedulingServerConfig() (*config.Config, error) { + addr, ok := h.svr.GetServicePrimaryAddr(h.svr.Context(), utils.SchedulingServiceName) + if !ok { + return nil, errs.ErrNotFoundSchedulingAddr.FastGenByArgs() + } + url := fmt.Sprintf("%s/scheduling/api/v1/config", addr) + req, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + return nil, err + } + resp, err := h.svr.GetHTTPClient().Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return nil, errs.ErrSchedulingServer.FastGenByArgs(resp.StatusCode) + } + b, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + var schedulingServerConfig config.Config + err = json.Unmarshal(b, &schedulingServerConfig) + if err != nil { + return nil, err + } + return &schedulingServerConfig, nil +} diff --git a/server/api/config_test.go b/server/api/config_test.go deleted file mode 100644 index fbfb3f94518..00000000000 --- a/server/api/config_test.go +++ /dev/null @@ -1,440 +0,0 @@ -// Copyright 2016 TiKV Project Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package api - -import ( - "encoding/json" - "fmt" - "testing" - "time" - - "github.com/stretchr/testify/suite" - sc "github.com/tikv/pd/pkg/schedule/config" - tu "github.com/tikv/pd/pkg/utils/testutil" - "github.com/tikv/pd/pkg/utils/typeutil" - "github.com/tikv/pd/pkg/versioninfo" - "github.com/tikv/pd/server" - "github.com/tikv/pd/server/config" -) - -type configTestSuite struct { - suite.Suite - svr *server.Server - cleanup tu.CleanupFunc - urlPrefix string -} - -func TestConfigTestSuite(t *testing.T) { - suite.Run(t, new(configTestSuite)) -} - -func (suite *configTestSuite) SetupSuite() { - re := suite.Require() - suite.svr, suite.cleanup = mustNewServer(re, func(cfg *config.Config) { - cfg.Replication.EnablePlacementRules = false - }) - server.MustWaitLeader(re, []*server.Server{suite.svr}) - - addr := suite.svr.GetAddr() - suite.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix) -} - -func (suite *configTestSuite) TearDownSuite() { - suite.cleanup() -} - -func (suite *configTestSuite) TestConfigAll() { - re := suite.Require() - addr := fmt.Sprintf("%s/config", suite.urlPrefix) - cfg := &config.Config{} - err := tu.ReadGetJSON(re, testDialClient, addr, cfg) - suite.NoError(err) - - // the original way - r := map[string]int{"max-replicas": 5} - postData, err := json.Marshal(r) - suite.NoError(err) - err = tu.CheckPostJSON(testDialClient, addr, postData, tu.StatusOK(re)) - suite.NoError(err) - l := map[string]interface{}{ - "location-labels": "zone,rack", - "region-schedule-limit": 10, - } - postData, err = json.Marshal(l) - suite.NoError(err) - err = tu.CheckPostJSON(testDialClient, addr, postData, tu.StatusOK(re)) - suite.NoError(err) - - l = map[string]interface{}{ - "metric-storage": "http://127.0.0.1:9090", - } - postData, err = json.Marshal(l) - suite.NoError(err) - err = tu.CheckPostJSON(testDialClient, addr, postData, tu.StatusOK(re)) - suite.NoError(err) - - newCfg := &config.Config{} - err = tu.ReadGetJSON(re, testDialClient, addr, newCfg) - suite.NoError(err) - cfg.Replication.MaxReplicas = 5 - cfg.Replication.LocationLabels = []string{"zone", "rack"} - cfg.Schedule.RegionScheduleLimit = 10 - cfg.PDServerCfg.MetricStorage = "http://127.0.0.1:9090" - suite.Equal(newCfg, cfg) - - // the new way - l = map[string]interface{}{ - "schedule.tolerant-size-ratio": 2.5, - "schedule.enable-tikv-split-region": "false", - "replication.location-labels": "idc,host", - "pd-server.metric-storage": "http://127.0.0.1:1234", - "log.level": "warn", - "cluster-version": "v4.0.0-beta", - "replication-mode.replication-mode": "dr-auto-sync", - "replication-mode.dr-auto-sync.label-key": "foobar", - } - postData, err = json.Marshal(l) - suite.NoError(err) - err = tu.CheckPostJSON(testDialClient, addr, postData, tu.StatusOK(re)) - suite.NoError(err) - newCfg1 := &config.Config{} - err = tu.ReadGetJSON(re, testDialClient, addr, newCfg1) - suite.NoError(err) - cfg.Schedule.EnableTiKVSplitRegion = false - cfg.Schedule.TolerantSizeRatio = 2.5 - cfg.Replication.LocationLabels = []string{"idc", "host"} - cfg.PDServerCfg.MetricStorage = "http://127.0.0.1:1234" - cfg.Log.Level = "warn" - cfg.ReplicationMode.DRAutoSync.LabelKey = "foobar" - cfg.ReplicationMode.ReplicationMode = "dr-auto-sync" - v, err := versioninfo.ParseVersion("v4.0.0-beta") - suite.NoError(err) - cfg.ClusterVersion = *v - suite.Equal(cfg, newCfg1) - - // revert this to avoid it affects TestConfigTTL - l["schedule.enable-tikv-split-region"] = "true" - postData, err = json.Marshal(l) - suite.NoError(err) - err = 
tu.CheckPostJSON(testDialClient, addr, postData, tu.StatusOK(re)) - suite.NoError(err) - - // illegal prefix - l = map[string]interface{}{ - "replicate.max-replicas": 1, - } - postData, err = json.Marshal(l) - suite.NoError(err) - err = tu.CheckPostJSON(testDialClient, addr, postData, - tu.StatusNotOK(re), - tu.StringContain(re, "not found")) - suite.NoError(err) - - // update prefix directly - l = map[string]interface{}{ - "replication-mode": nil, - } - postData, err = json.Marshal(l) - suite.NoError(err) - err = tu.CheckPostJSON(testDialClient, addr, postData, - tu.StatusNotOK(re), - tu.StringContain(re, "cannot update config prefix")) - suite.NoError(err) - - // config item not found - l = map[string]interface{}{ - "schedule.region-limit": 10, - } - postData, err = json.Marshal(l) - suite.NoError(err) - err = tu.CheckPostJSON(testDialClient, addr, postData, tu.StatusNotOK(re), tu.StringContain(re, "not found")) - suite.NoError(err) -} - -func (suite *configTestSuite) TestConfigSchedule() { - re := suite.Require() - addr := fmt.Sprintf("%s/config/schedule", suite.urlPrefix) - scheduleConfig := &sc.ScheduleConfig{} - suite.NoError(tu.ReadGetJSON(re, testDialClient, addr, scheduleConfig)) - scheduleConfig.MaxStoreDownTime.Duration = time.Second - postData, err := json.Marshal(scheduleConfig) - suite.NoError(err) - err = tu.CheckPostJSON(testDialClient, addr, postData, tu.StatusOK(re)) - suite.NoError(err) - - scheduleConfig1 := &sc.ScheduleConfig{} - suite.NoError(tu.ReadGetJSON(re, testDialClient, addr, scheduleConfig1)) - suite.Equal(*scheduleConfig1, *scheduleConfig) -} - -func (suite *configTestSuite) TestConfigReplication() { - re := suite.Require() - addr := fmt.Sprintf("%s/config/replicate", suite.urlPrefix) - rc := &sc.ReplicationConfig{} - err := tu.ReadGetJSON(re, testDialClient, addr, rc) - suite.NoError(err) - - rc.MaxReplicas = 5 - rc1 := map[string]int{"max-replicas": 5} - postData, err := json.Marshal(rc1) - suite.NoError(err) - err = tu.CheckPostJSON(testDialClient, addr, postData, tu.StatusOK(re)) - suite.NoError(err) - - rc.LocationLabels = []string{"zone", "rack"} - rc2 := map[string]string{"location-labels": "zone,rack"} - postData, err = json.Marshal(rc2) - suite.NoError(err) - err = tu.CheckPostJSON(testDialClient, addr, postData, tu.StatusOK(re)) - suite.NoError(err) - - rc.IsolationLevel = "zone" - rc3 := map[string]string{"isolation-level": "zone"} - postData, err = json.Marshal(rc3) - suite.NoError(err) - err = tu.CheckPostJSON(testDialClient, addr, postData, tu.StatusOK(re)) - suite.NoError(err) - - rc4 := &sc.ReplicationConfig{} - err = tu.ReadGetJSON(re, testDialClient, addr, rc4) - suite.NoError(err) - - suite.Equal(*rc4, *rc) -} - -func (suite *configTestSuite) TestConfigLabelProperty() { - re := suite.Require() - addr := suite.svr.GetAddr() + apiPrefix + "/api/v1/config/label-property" - loadProperties := func() config.LabelPropertyConfig { - var cfg config.LabelPropertyConfig - err := tu.ReadGetJSON(re, testDialClient, addr, &cfg) - suite.NoError(err) - return cfg - } - - cfg := loadProperties() - suite.Empty(cfg) - - cmds := []string{ - `{"type": "foo", "action": "set", "label-key": "zone", "label-value": "cn1"}`, - `{"type": "foo", "action": "set", "label-key": "zone", "label-value": "cn2"}`, - `{"type": "bar", "action": "set", "label-key": "host", "label-value": "h1"}`, - } - for _, cmd := range cmds { - err := tu.CheckPostJSON(testDialClient, addr, []byte(cmd), tu.StatusOK(re)) - suite.NoError(err) - } - - cfg = loadProperties() - suite.Len(cfg, 2) - 
suite.Equal([]config.StoreLabel{ - {Key: "zone", Value: "cn1"}, - {Key: "zone", Value: "cn2"}, - }, cfg["foo"]) - suite.Equal([]config.StoreLabel{{Key: "host", Value: "h1"}}, cfg["bar"]) - - cmds = []string{ - `{"type": "foo", "action": "delete", "label-key": "zone", "label-value": "cn1"}`, - `{"type": "bar", "action": "delete", "label-key": "host", "label-value": "h1"}`, - } - for _, cmd := range cmds { - err := tu.CheckPostJSON(testDialClient, addr, []byte(cmd), tu.StatusOK(re)) - suite.NoError(err) - } - - cfg = loadProperties() - suite.Len(cfg, 1) - suite.Equal([]config.StoreLabel{{Key: "zone", Value: "cn2"}}, cfg["foo"]) -} - -func (suite *configTestSuite) TestConfigDefault() { - addr := fmt.Sprintf("%s/config", suite.urlPrefix) - - r := map[string]int{"max-replicas": 5} - postData, err := json.Marshal(r) - suite.NoError(err) - re := suite.Require() - err = tu.CheckPostJSON(testDialClient, addr, postData, tu.StatusOK(re)) - suite.NoError(err) - l := map[string]interface{}{ - "location-labels": "zone,rack", - "region-schedule-limit": 10, - } - postData, err = json.Marshal(l) - suite.NoError(err) - err = tu.CheckPostJSON(testDialClient, addr, postData, tu.StatusOK(re)) - suite.NoError(err) - - l = map[string]interface{}{ - "metric-storage": "http://127.0.0.1:9090", - } - postData, err = json.Marshal(l) - suite.NoError(err) - err = tu.CheckPostJSON(testDialClient, addr, postData, tu.StatusOK(re)) - suite.NoError(err) - - addr = fmt.Sprintf("%s/config/default", suite.urlPrefix) - defaultCfg := &config.Config{} - err = tu.ReadGetJSON(re, testDialClient, addr, defaultCfg) - suite.NoError(err) - - suite.Equal(uint64(3), defaultCfg.Replication.MaxReplicas) - suite.Equal(typeutil.StringSlice([]string{}), defaultCfg.Replication.LocationLabels) - suite.Equal(uint64(2048), defaultCfg.Schedule.RegionScheduleLimit) - suite.Equal("", defaultCfg.PDServerCfg.MetricStorage) -} - -func (suite *configTestSuite) TestConfigPDServer() { - re := suite.Require() - addrPost := fmt.Sprintf("%s/config", suite.urlPrefix) - ms := map[string]interface{}{ - "metric-storage": "", - } - postData, err := json.Marshal(ms) - suite.NoError(err) - suite.NoError(tu.CheckPostJSON(testDialClient, addrPost, postData, tu.StatusOK(re))) - addrGet := fmt.Sprintf("%s/config/pd-server", suite.urlPrefix) - sc := &config.PDServerConfig{} - suite.NoError(tu.ReadGetJSON(re, testDialClient, addrGet, sc)) - suite.Equal(bool(true), sc.UseRegionStorage) - suite.Equal("table", sc.KeyType) - suite.Equal(typeutil.StringSlice([]string{}), sc.RuntimeServices) - suite.Equal("", sc.MetricStorage) - suite.Equal("auto", sc.DashboardAddress) - suite.Equal(int(3), sc.FlowRoundByDigit) - suite.Equal(typeutil.NewDuration(time.Second), sc.MinResolvedTSPersistenceInterval) - suite.Equal(24*time.Hour, sc.MaxResetTSGap.Duration) -} - -var ttlConfig = map[string]interface{}{ - "schedule.max-snapshot-count": 999, - "schedule.enable-location-replacement": false, - "schedule.max-merge-region-size": 999, - "schedule.max-merge-region-keys": 999, - "schedule.scheduler-max-waiting-operator": 999, - "schedule.leader-schedule-limit": 999, - "schedule.region-schedule-limit": 999, - "schedule.hot-region-schedule-limit": 999, - "schedule.replica-schedule-limit": 999, - "schedule.merge-schedule-limit": 999, - "schedule.enable-tikv-split-region": false, -} - -var invalidTTLConfig = map[string]interface{}{ - "schedule.invalid-ttl-config": 0, -} - -func assertTTLConfig( - options *config.PersistOptions, - equality func(interface{}, interface{}, ...interface{}) bool, -) { - 
equality(uint64(999), options.GetMaxSnapshotCount()) - equality(false, options.IsLocationReplacementEnabled()) - equality(uint64(999), options.GetMaxMergeRegionSize()) - equality(uint64(999), options.GetMaxMergeRegionKeys()) - equality(uint64(999), options.GetSchedulerMaxWaitingOperator()) - equality(uint64(999), options.GetLeaderScheduleLimit()) - equality(uint64(999), options.GetRegionScheduleLimit()) - equality(uint64(999), options.GetHotRegionScheduleLimit()) - equality(uint64(999), options.GetReplicaScheduleLimit()) - equality(uint64(999), options.GetMergeScheduleLimit()) - equality(false, options.IsTikvRegionSplitEnabled()) -} - -func createTTLUrl(url string, ttl int) string { - return fmt.Sprintf("%s/config?ttlSecond=%d", url, ttl) -} - -func (suite *configTestSuite) TestConfigTTL() { - postData, err := json.Marshal(ttlConfig) - suite.NoError(err) - - // test no config and cleaning up - re := suite.Require() - err = tu.CheckPostJSON(testDialClient, createTTLUrl(suite.urlPrefix, 0), postData, tu.StatusOK(re)) - suite.NoError(err) - assertTTLConfig(suite.svr.GetPersistOptions(), suite.NotEqual) - - // test time goes by - err = tu.CheckPostJSON(testDialClient, createTTLUrl(suite.urlPrefix, 1), postData, tu.StatusOK(re)) - suite.NoError(err) - assertTTLConfig(suite.svr.GetPersistOptions(), suite.Equal) - time.Sleep(2 * time.Second) - assertTTLConfig(suite.svr.GetPersistOptions(), suite.NotEqual) - - // test cleaning up - err = tu.CheckPostJSON(testDialClient, createTTLUrl(suite.urlPrefix, 1), postData, tu.StatusOK(re)) - suite.NoError(err) - assertTTLConfig(suite.svr.GetPersistOptions(), suite.Equal) - err = tu.CheckPostJSON(testDialClient, createTTLUrl(suite.urlPrefix, 0), postData, tu.StatusOK(re)) - suite.NoError(err) - assertTTLConfig(suite.svr.GetPersistOptions(), suite.NotEqual) - - postData, err = json.Marshal(invalidTTLConfig) - suite.NoError(err) - err = tu.CheckPostJSON(testDialClient, createTTLUrl(suite.urlPrefix, 1), postData, - tu.StatusNotOK(re), tu.StringEqual(re, "\"unsupported ttl config schedule.invalid-ttl-config\"\n")) - suite.NoError(err) - - // only set max-merge-region-size - mergeConfig := map[string]interface{}{ - "schedule.max-merge-region-size": 999, - } - postData, err = json.Marshal(mergeConfig) - suite.NoError(err) - - err = tu.CheckPostJSON(testDialClient, createTTLUrl(suite.urlPrefix, 1), postData, tu.StatusOK(re)) - suite.NoError(err) - suite.Equal(uint64(999), suite.svr.GetPersistOptions().GetMaxMergeRegionSize()) - // max-merge-region-keys should keep consistence with max-merge-region-size. 
- suite.Equal(uint64(999*10000), suite.svr.GetPersistOptions().GetMaxMergeRegionKeys()) - - // on invalid value, we use default config - mergeConfig = map[string]interface{}{ - "schedule.enable-tikv-split-region": "invalid", - } - postData, err = json.Marshal(mergeConfig) - suite.NoError(err) - err = tu.CheckPostJSON(testDialClient, createTTLUrl(suite.urlPrefix, 1), postData, tu.StatusOK(re)) - suite.NoError(err) - suite.True(suite.svr.GetPersistOptions().IsTikvRegionSplitEnabled()) -} - -func (suite *configTestSuite) TestTTLConflict() { - addr := createTTLUrl(suite.urlPrefix, 1) - postData, err := json.Marshal(ttlConfig) - suite.NoError(err) - re := suite.Require() - err = tu.CheckPostJSON(testDialClient, addr, postData, tu.StatusOK(re)) - suite.NoError(err) - assertTTLConfig(suite.svr.GetPersistOptions(), suite.Equal) - - cfg := map[string]interface{}{"max-snapshot-count": 30} - postData, err = json.Marshal(cfg) - suite.NoError(err) - addr = fmt.Sprintf("%s/config", suite.urlPrefix) - err = tu.CheckPostJSON(testDialClient, addr, postData, tu.StatusNotOK(re), tu.StringEqual(re, "\"need to clean up TTL first for schedule.max-snapshot-count\"\n")) - suite.NoError(err) - addr = fmt.Sprintf("%s/config/schedule", suite.urlPrefix) - err = tu.CheckPostJSON(testDialClient, addr, postData, tu.StatusNotOK(re), tu.StringEqual(re, "\"need to clean up TTL first for schedule.max-snapshot-count\"\n")) - suite.NoError(err) - cfg = map[string]interface{}{"schedule.max-snapshot-count": 30} - postData, err = json.Marshal(cfg) - suite.NoError(err) - err = tu.CheckPostJSON(testDialClient, createTTLUrl(suite.urlPrefix, 0), postData, tu.StatusOK(re)) - suite.NoError(err) - err = tu.CheckPostJSON(testDialClient, addr, postData, tu.StatusOK(re)) - suite.NoError(err) -} diff --git a/server/api/server.go b/server/api/server.go index ee301ea54c8..ae877b8407c 100644 --- a/server/api/server.go +++ b/server/api/server.go @@ -52,6 +52,7 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP // "/schedulers", http.MethodGet // "/schedulers/{name}", http.MethodPost // "/schedulers/diagnostic/{name}", http.MethodGet + // "/scheduler-config", http.MethodGet // "/hotspot/regions/read", http.MethodGet // "/hotspot/regions/write", http.MethodGet // "/hotspot/regions/history", http.MethodGet @@ -90,6 +91,11 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP scheapi.APIPathPrefix+"/schedulers", mcs.SchedulingServiceName, []string{http.MethodGet}), + serverapi.MicroserviceRedirectRule( + prefix+"/scheduler-config", + scheapi.APIPathPrefix+"/schedulers/config", + mcs.SchedulingServiceName, + []string{http.MethodGet}), serverapi.MicroserviceRedirectRule( prefix+"/schedulers/", // Note: this means "/schedulers/{name}" scheapi.APIPathPrefix+"/schedulers", diff --git a/server/api/service_gc_safepoint.go b/server/api/service_gc_safepoint.go index b26edaba07d..270edca58bf 100644 --- a/server/api/service_gc_safepoint.go +++ b/server/api/service_gc_safepoint.go @@ -35,8 +35,9 @@ func newServiceGCSafepointHandler(svr *server.Server, rd *render.Render) *servic } } +// ListServiceGCSafepoint is the response for list service GC safepoint. // NOTE: This type is exported by HTTP API. Please pay more attention when modifying it. 
-type listServiceGCSafepoint struct { +type ListServiceGCSafepoint struct { ServiceGCSafepoints []*endpoint.ServiceSafePoint `json:"service_gc_safe_points"` GCSafePoint uint64 `json:"gc_safe_point"` } @@ -44,7 +45,7 @@ type listServiceGCSafepoint struct { // @Tags service_gc_safepoint // @Summary Get all service GC safepoint. // @Produce json -// @Success 200 {array} listServiceGCSafepoint +// @Success 200 {array} ListServiceGCSafepoint // @Failure 500 {string} string "PD server failed to proceed the request." // @Router /gc/safepoint [get] func (h *serviceGCSafepointHandler) GetGCSafePoint(w http.ResponseWriter, r *http.Request) { @@ -59,7 +60,7 @@ func (h *serviceGCSafepointHandler) GetGCSafePoint(w http.ResponseWriter, r *htt h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } - list := listServiceGCSafepoint{ + list := ListServiceGCSafepoint{ GCSafePoint: gcSafepoint, ServiceGCSafepoints: ssps, } diff --git a/server/api/service_gc_safepoint_test.go b/server/api/service_gc_safepoint_test.go index 517a94c2e23..3df9102d116 100644 --- a/server/api/service_gc_safepoint_test.go +++ b/server/api/service_gc_safepoint_test.go @@ -58,7 +58,7 @@ func (suite *serviceGCSafepointTestSuite) TestServiceGCSafepoint() { sspURL := suite.urlPrefix + "/gc/safepoint" storage := suite.svr.GetStorage() - list := &listServiceGCSafepoint{ + list := &ListServiceGCSafepoint{ ServiceGCSafepoints: []*endpoint.ServiceSafePoint{ { ServiceID: "a", @@ -87,7 +87,7 @@ func (suite *serviceGCSafepointTestSuite) TestServiceGCSafepoint() { res, err := testDialClient.Get(sspURL) suite.NoError(err) defer res.Body.Close() - listResp := &listServiceGCSafepoint{} + listResp := &ListServiceGCSafepoint{} err = apiutil.ReadJSON(res.Body, listResp) suite.NoError(err) suite.Equal(list, listResp) diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index b7b9dcfb736..89c9ea32f19 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -2796,7 +2796,7 @@ func TestReplica(t *testing.T) { re.NoError(tc.addLeaderRegion(2, 1, 2, 3, 4)) region = tc.GetRegion(2) re.NoError(dispatchHeartbeat(co, region, stream)) - region = waitRemovePeer(re, stream, region, 4) + region = waitRemovePeer(re, stream, region, 3) // store3 is down, we should remove it firstly. 
re.NoError(dispatchHeartbeat(co, region, stream)) waitNoResponse(re, stream) diff --git a/server/config/config_test.go b/server/config/config_test.go index 75e69c26d5c..07cdc966409 100644 --- a/server/config/config_test.go +++ b/server/config/config_test.go @@ -26,6 +26,7 @@ import ( "github.com/BurntSushi/toml" "github.com/spf13/pflag" "github.com/stretchr/testify/require" + "github.com/tikv/pd/pkg/ratelimit" sc "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/storage" "github.com/tikv/pd/pkg/utils/configutil" @@ -479,3 +480,28 @@ func newTestScheduleOption() (*PersistOptions, error) { opt := NewPersistOptions(cfg) return opt, nil } + +func TestRateLimitClone(t *testing.T) { + re := require.New(t) + cfg := &RateLimitConfig{ + EnableRateLimit: defaultEnableRateLimitMiddleware, + LimiterConfig: make(map[string]ratelimit.DimensionConfig), + } + clone := cfg.Clone() + clone.LimiterConfig["test"] = ratelimit.DimensionConfig{ + ConcurrencyLimit: 200, + } + dc := cfg.LimiterConfig["test"] + re.Equal(dc.ConcurrencyLimit, uint64(0)) + + gCfg := &GRPCRateLimitConfig{ + EnableRateLimit: defaultEnableGRPCRateLimitMiddleware, + LimiterConfig: make(map[string]ratelimit.DimensionConfig), + } + gClone := gCfg.Clone() + gClone.LimiterConfig["test"] = ratelimit.DimensionConfig{ + ConcurrencyLimit: 300, + } + gdc := gCfg.LimiterConfig["test"] + re.Equal(gdc.ConcurrencyLimit, uint64(0)) +} diff --git a/server/config/persist_options.go b/server/config/persist_options.go index c0a0ebf5c47..49a44449a22 100644 --- a/server/config/persist_options.go +++ b/server/config/persist_options.go @@ -789,11 +789,10 @@ func (o *PersistOptions) Persist(storage endpoint.ConfigStorage) error { }, StoreConfig: *o.GetStoreConfig(), } - err := storage.SaveConfig(cfg) failpoint.Inject("persistFail", func() { - err = errors.New("fail to persist") + failpoint.Return(errors.New("fail to persist")) }) - return err + return storage.SaveConfig(cfg) } // Reload reloads the configuration from the storage. diff --git a/server/config/service_middleware_config.go b/server/config/service_middleware_config.go index ef0b04b2abd..b13e3398ac5 100644 --- a/server/config/service_middleware_config.go +++ b/server/config/service_middleware_config.go @@ -78,7 +78,12 @@ type RateLimitConfig struct { // Clone returns a cloned rate limit config. func (c *RateLimitConfig) Clone() *RateLimitConfig { + m := make(map[string]ratelimit.DimensionConfig, len(c.LimiterConfig)) + for k, v := range c.LimiterConfig { + m[k] = v + } cfg := *c + cfg.LimiterConfig = m return &cfg } @@ -92,6 +97,11 @@ type GRPCRateLimitConfig struct { // Clone returns a cloned rate limit config. 
func (c *GRPCRateLimitConfig) Clone() *GRPCRateLimitConfig { + m := make(map[string]ratelimit.DimensionConfig, len(c.LimiterConfig)) + for k, v := range c.LimiterConfig { + m[k] = v + } cfg := *c + cfg.LimiterConfig = m return &cfg } diff --git a/server/server.go b/server/server.go index 160609e37a7..9cd7f18578e 100644 --- a/server/server.go +++ b/server/server.go @@ -948,20 +948,7 @@ func (s *Server) GetConfig() *config.Config { if err != nil { return cfg } - payload := make(map[string]interface{}) - for i, sche := range sches { - var config interface{} - err := schedulers.DecodeConfig([]byte(configs[i]), &config) - if err != nil { - log.Error("failed to decode scheduler config", - zap.String("config", configs[i]), - zap.String("scheduler", sche), - errs.ZapError(err)) - continue - } - payload[sche] = config - } - cfg.Schedule.SchedulersPayload = payload + cfg.Schedule.SchedulersPayload = schedulers.ToPayload(sches, configs) return cfg } diff --git a/server/util.go b/server/util.go index 654b424465e..f88d0146a7f 100644 --- a/server/util.go +++ b/server/util.go @@ -17,6 +17,7 @@ package server import ( "context" "net/http" + "net/http/pprof" "path/filepath" "strings" @@ -121,8 +122,13 @@ func combineBuilderServerHTTPService(ctx context.Context, svr *Server, serviceBu userHandlers[pathPrefix] = handler } } + apiService.UseHandler(router) userHandlers[pdAPIPrefix] = apiService + + // fix issue https://github.com/tikv/pd/issues/7253 + // FIXME: remove me after upgrade + userHandlers["/debug/pprof/trace"] = http.HandlerFunc(pprof.Trace) return userHandlers, nil } diff --git a/tests/integrations/mcs/scheduling/api_test.go b/tests/integrations/mcs/scheduling/api_test.go index 5284913813c..fe6ae325056 100644 --- a/tests/integrations/mcs/scheduling/api_test.go +++ b/tests/integrations/mcs/scheduling/api_test.go @@ -9,8 +9,11 @@ import ( "time" "github.com/pingcap/failpoint" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/tikv/pd/pkg/core" _ "github.com/tikv/pd/pkg/mcs/scheduling/server/apis/v1" + "github.com/tikv/pd/pkg/mcs/scheduling/server/config" "github.com/tikv/pd/pkg/schedule/handler" "github.com/tikv/pd/pkg/statistics" "github.com/tikv/pd/pkg/storage" @@ -170,6 +173,9 @@ func (suite *apiTestSuite) TestAPIForward() { // "/schedulers", http.MethodGet // "/schedulers/{name}", http.MethodPost // "/schedulers/diagnostic/{name}", http.MethodGet + // "/scheduler-config/", http.MethodGet + // "/scheduler-config/{name}/list", http.MethodGet + // "/scheduler-config/{name}/roles", http.MethodGet // Should not redirect: // "/schedulers", http.MethodPost // "/schedulers/{name}", http.MethodDelete @@ -189,6 +195,24 @@ func (suite *apiTestSuite) TestAPIForward() { testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) suite.NoError(err) + err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "scheduler-config"), &resp, + testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + suite.NoError(err) + re.Contains(resp, "balance-leader-scheduler") + re.Contains(resp, "balance-witness-scheduler") + re.Contains(resp, "balance-hot-region-scheduler") + + schedulers := []string{ + "balance-leader-scheduler", + "balance-witness-scheduler", + "balance-hot-region-scheduler", + } + for _, schedulerName := range schedulers { + err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s/%s/%s", urlPrefix, "scheduler-config", schedulerName, "list"), &resp, + testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, 
"true")) + suite.NoError(err) + } + err = testutil.CheckPostJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "schedulers"), pauseArgs, testutil.WithoutHeader(re, apiutil.ForwardToMicroServiceHeader)) re.NoError(err) @@ -218,3 +242,131 @@ func (suite *apiTestSuite) TestAPIForward() { testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) re.NoError(err) } + +func (suite *apiTestSuite) TestConfig() { + checkConfig := func(cluster *tests.TestCluster) { + re := suite.Require() + s := cluster.GetSchedulingPrimaryServer() + testutil.Eventually(re, func() bool { + return s.IsServing() + }, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond)) + addr := s.GetAddr() + urlPrefix := fmt.Sprintf("%s/scheduling/api/v1/config", addr) + + var cfg config.Config + testutil.ReadGetJSON(re, testDialClient, urlPrefix, &cfg) + suite.Equal(cfg.GetListenAddr(), s.GetConfig().GetListenAddr()) + suite.Equal(cfg.Schedule.LeaderScheduleLimit, s.GetConfig().Schedule.LeaderScheduleLimit) + suite.Equal(cfg.Schedule.EnableCrossTableMerge, s.GetConfig().Schedule.EnableCrossTableMerge) + suite.Equal(cfg.Replication.MaxReplicas, s.GetConfig().Replication.MaxReplicas) + suite.Equal(cfg.Replication.LocationLabels, s.GetConfig().Replication.LocationLabels) + suite.Equal(cfg.DataDir, s.GetConfig().DataDir) + testutil.Eventually(re, func() bool { + // wait for all schedulers to be loaded in scheduling server. + return len(cfg.Schedule.SchedulersPayload) == 5 + }) + suite.Contains(cfg.Schedule.SchedulersPayload, "balance-leader-scheduler") + suite.Contains(cfg.Schedule.SchedulersPayload, "balance-region-scheduler") + suite.Contains(cfg.Schedule.SchedulersPayload, "balance-hot-region-scheduler") + suite.Contains(cfg.Schedule.SchedulersPayload, "balance-witness-scheduler") + suite.Contains(cfg.Schedule.SchedulersPayload, "transfer-witness-leader-scheduler") + } + env := tests.NewSchedulingTestEnvironment(suite.T()) + env.RunTestInAPIMode(checkConfig) +} + +func TestConfigForward(t *testing.T) { + re := require.New(t) + checkConfigForward := func(cluster *tests.TestCluster) { + sche := cluster.GetSchedulingPrimaryServer() + opts := sche.GetPersistConfig() + var cfg map[string]interface{} + addr := cluster.GetLeaderServer().GetAddr() + urlPrefix := fmt.Sprintf("%s/pd/api/v1/config", addr) + + // Test config forward + // Expect to get same config in scheduling server and api server + testutil.Eventually(re, func() bool { + testutil.ReadGetJSON(re, testDialClient, urlPrefix, &cfg) + re.Equal(cfg["schedule"].(map[string]interface{})["leader-schedule-limit"], + float64(opts.GetLeaderScheduleLimit())) + re.Equal(cfg["replication"].(map[string]interface{})["max-replicas"], + float64(opts.GetReplicationConfig().MaxReplicas)) + schedulers := cfg["schedule"].(map[string]interface{})["schedulers-payload"].(map[string]interface{}) + return len(schedulers) == 5 + }) + + // Test to change config only in scheduling server + // Expect to get new config in scheduling server but not old config in api server + + opts.GetScheduleConfig().LeaderScheduleLimit = 100 + re.Equal(100, int(opts.GetLeaderScheduleLimit())) + testutil.ReadGetJSON(re, testDialClient, urlPrefix, &cfg) + re.Equal(100., cfg["schedule"].(map[string]interface{})["leader-schedule-limit"]) + + opts.GetReplicationConfig().MaxReplicas = 5 + re.Equal(5, int(opts.GetReplicationConfig().MaxReplicas)) + testutil.ReadGetJSON(re, testDialClient, urlPrefix, &cfg) + re.Equal(5., cfg["replication"].(map[string]interface{})["max-replicas"]) + } + 
env := tests.NewSchedulingTestEnvironment(t) + env.RunTestInAPIMode(checkConfigForward) +} + +func TestAdminRegionCache(t *testing.T) { + re := require.New(t) + checkAdminRegionCache := func(cluster *tests.TestCluster) { + r1 := core.NewTestRegionInfo(10, 1, []byte(""), []byte("b"), core.SetRegionConfVer(100), core.SetRegionVersion(100)) + tests.MustPutRegionInfo(re, cluster, r1) + r2 := core.NewTestRegionInfo(20, 1, []byte("b"), []byte("c"), core.SetRegionConfVer(100), core.SetRegionVersion(100)) + tests.MustPutRegionInfo(re, cluster, r2) + r3 := core.NewTestRegionInfo(30, 1, []byte("c"), []byte(""), core.SetRegionConfVer(100), core.SetRegionVersion(100)) + tests.MustPutRegionInfo(re, cluster, r3) + + schedulingServer := cluster.GetSchedulingPrimaryServer() + re.Equal(3, schedulingServer.GetCluster().GetRegionCount([]byte{}, []byte{})) + + addr := schedulingServer.GetAddr() + urlPrefix := fmt.Sprintf("%s/scheduling/api/v1/admin/cache/regions", addr) + err := testutil.CheckDelete(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "30"), testutil.StatusOK(re)) + re.NoError(err) + re.Equal(2, schedulingServer.GetCluster().GetRegionCount([]byte{}, []byte{})) + + err = testutil.CheckDelete(testDialClient, urlPrefix, testutil.StatusOK(re)) + re.NoError(err) + re.Equal(0, schedulingServer.GetCluster().GetRegionCount([]byte{}, []byte{})) + } + env := tests.NewSchedulingTestEnvironment(t) + env.RunTestInAPIMode(checkAdminRegionCache) +} + +func TestAdminRegionCacheForward(t *testing.T) { + re := require.New(t) + checkAdminRegionCache := func(cluster *tests.TestCluster) { + r1 := core.NewTestRegionInfo(10, 1, []byte(""), []byte("b"), core.SetRegionConfVer(100), core.SetRegionVersion(100)) + tests.MustPutRegionInfo(re, cluster, r1) + r2 := core.NewTestRegionInfo(20, 1, []byte("b"), []byte("c"), core.SetRegionConfVer(100), core.SetRegionVersion(100)) + tests.MustPutRegionInfo(re, cluster, r2) + r3 := core.NewTestRegionInfo(30, 1, []byte("c"), []byte(""), core.SetRegionConfVer(100), core.SetRegionVersion(100)) + tests.MustPutRegionInfo(re, cluster, r3) + + apiServer := cluster.GetLeaderServer().GetServer() + schedulingServer := cluster.GetSchedulingPrimaryServer() + re.Equal(3, schedulingServer.GetCluster().GetRegionCount([]byte{}, []byte{})) + re.Equal(3, apiServer.GetRaftCluster().GetRegionCount([]byte{}, []byte{}).Count) + + addr := cluster.GetLeaderServer().GetAddr() + urlPrefix := fmt.Sprintf("%s/pd/api/v1/admin/cache/region", addr) + err := testutil.CheckDelete(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "30"), testutil.StatusOK(re)) + re.NoError(err) + re.Equal(2, schedulingServer.GetCluster().GetRegionCount([]byte{}, []byte{})) + re.Equal(2, apiServer.GetRaftCluster().GetRegionCount([]byte{}, []byte{}).Count) + + err = testutil.CheckDelete(testDialClient, urlPrefix+"s", testutil.StatusOK(re)) + re.NoError(err) + re.Equal(0, schedulingServer.GetCluster().GetRegionCount([]byte{}, []byte{})) + re.Equal(0, apiServer.GetRaftCluster().GetRegionCount([]byte{}, []byte{}).Count) + } + env := tests.NewSchedulingTestEnvironment(t) + env.RunTestInAPIMode(checkAdminRegionCache) +} diff --git a/tests/integrations/mcs/scheduling/server_test.go b/tests/integrations/mcs/scheduling/server_test.go index 85cf84361b4..a359e1d023a 100644 --- a/tests/integrations/mcs/scheduling/server_test.go +++ b/tests/integrations/mcs/scheduling/server_test.go @@ -509,6 +509,6 @@ func checkOperatorFail(re *require.Assertions, oc *operator.Controller, op *oper func waitSyncFinish(re *require.Assertions, tc 
*tests.TestSchedulingCluster, typ storelimit.Type, expectedLimit float64) { testutil.Eventually(re, func() bool { - return tc.GetPrimaryServer().GetPersistConfig().GetStoreLimitByType(2, typ) == expectedLimit + return tc.GetPrimaryServer().GetCluster().GetSharedConfig().GetStoreLimitByType(2, typ) == expectedLimit }) } diff --git a/tests/pdctl/config/config_test.go b/tests/pdctl/config/config_test.go index 6ed0841bf74..26d70bb955f 100644 --- a/tests/pdctl/config/config_test.go +++ b/tests/pdctl/config/config_test.go @@ -25,8 +25,10 @@ import ( "github.com/coreos/go-semver/semver" "github.com/pingcap/kvproto/pkg/metapb" "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" sc "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/schedule/placement" + "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/pkg/utils/typeutil" "github.com/tikv/pd/server/config" "github.com/tikv/pd/tests" @@ -48,24 +50,29 @@ func (t *testCase) judge(re *require.Assertions, scheduleConfigs ...*sc.Schedule } } -func TestConfig(t *testing.T) { - re := require.New(t) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - cluster, err := tests.NewTestCluster(ctx, 1) - re.NoError(err) - err = cluster.RunInitialServers() - re.NoError(err) - cluster.WaitLeader() - pdAddr := cluster.GetConfig().GetClientURL() +type configTestSuite struct { + suite.Suite +} + +func TestConfigTestSuite(t *testing.T) { + suite.Run(t, new(configTestSuite)) +} + +func (suite *configTestSuite) TestConfig() { + env := tests.NewSchedulingTestEnvironment(suite.T()) + env.RunTestInTwoModes(suite.checkConfig) +} + +func (suite *configTestSuite) checkConfig(cluster *tests.TestCluster) { + re := suite.Require() + leaderServer := cluster.GetLeaderServer() + pdAddr := leaderServer.GetAddr() cmd := pdctlCmd.GetRootCmd() store := &metapb.Store{ Id: 1, State: metapb.StoreState_Up, } - leaderServer := cluster.GetLeaderServer() - re.NoError(leaderServer.BootstrapCluster()) svr := leaderServer.GetServer() tests.MustPutStore(re, cluster, store) defer cluster.Destroy() @@ -283,16 +290,15 @@ func TestConfig(t *testing.T) { re.Contains(string(output), "is invalid") } -func TestPlacementRules(t *testing.T) { - re := require.New(t) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - cluster, err := tests.NewTestCluster(ctx, 1) - re.NoError(err) - err = cluster.RunInitialServers() - re.NoError(err) - cluster.WaitLeader() - pdAddr := cluster.GetConfig().GetClientURL() +func (suite *configTestSuite) TestPlacementRules() { + env := tests.NewSchedulingTestEnvironment(suite.T()) + env.RunTestInTwoModes(suite.checkPlacementRules) +} + +func (suite *configTestSuite) checkPlacementRules(cluster *tests.TestCluster) { + re := suite.Require() + leaderServer := cluster.GetLeaderServer() + pdAddr := leaderServer.GetAddr() cmd := pdctlCmd.GetRootCmd() store := &metapb.Store{ @@ -300,8 +306,6 @@ func TestPlacementRules(t *testing.T) { State: metapb.StoreState_Up, LastHeartbeat: time.Now().UnixNano(), } - leaderServer := cluster.GetLeaderServer() - re.NoError(leaderServer.BootstrapCluster()) tests.MustPutStore(re, cluster, store) defer cluster.Destroy() @@ -380,16 +384,15 @@ func TestPlacementRules(t *testing.T) { re.Equal([2]string{"pd", "test1"}, rules[0].Key()) } -func TestPlacementRuleGroups(t *testing.T) { - re := require.New(t) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - cluster, err := tests.NewTestCluster(ctx, 1) - re.NoError(err) - err = 
cluster.RunInitialServers() - re.NoError(err) - cluster.WaitLeader() - pdAddr := cluster.GetConfig().GetClientURL() +func (suite *configTestSuite) TestPlacementRuleGroups() { + env := tests.NewSchedulingTestEnvironment(suite.T()) + env.RunTestInTwoModes(suite.checkPlacementRuleGroups) +} + +func (suite *configTestSuite) checkPlacementRuleGroups(cluster *tests.TestCluster) { + re := suite.Require() + leaderServer := cluster.GetLeaderServer() + pdAddr := leaderServer.GetAddr() cmd := pdctlCmd.GetRootCmd() store := &metapb.Store{ @@ -397,8 +400,6 @@ func TestPlacementRuleGroups(t *testing.T) { State: metapb.StoreState_Up, LastHeartbeat: time.Now().UnixNano(), } - leaderServer := cluster.GetLeaderServer() - re.NoError(leaderServer.BootstrapCluster()) tests.MustPutStore(re, cluster, store) defer cluster.Destroy() @@ -454,16 +455,15 @@ func TestPlacementRuleGroups(t *testing.T) { re.Contains(string(output), "404") } -func TestPlacementRuleBundle(t *testing.T) { - re := require.New(t) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - cluster, err := tests.NewTestCluster(ctx, 1) - re.NoError(err) - err = cluster.RunInitialServers() - re.NoError(err) - cluster.WaitLeader() - pdAddr := cluster.GetConfig().GetClientURL() +func (suite *configTestSuite) TestPlacementRuleBundle() { + env := tests.NewSchedulingTestEnvironment(suite.T()) + env.RunTestInTwoModes(suite.checkPlacementRuleBundle) +} + +func (suite *configTestSuite) checkPlacementRuleBundle(cluster *tests.TestCluster) { + re := suite.Require() + leaderServer := cluster.GetLeaderServer() + pdAddr := leaderServer.GetAddr() cmd := pdctlCmd.GetRootCmd() store := &metapb.Store{ @@ -471,8 +471,6 @@ func TestPlacementRuleBundle(t *testing.T) { State: metapb.StoreState_Up, LastHeartbeat: time.Now().UnixNano(), } - leaderServer := cluster.GetLeaderServer() - re.NoError(leaderServer.BootstrapCluster()) tests.MustPutStore(re, cluster, store) defer cluster.Destroy() @@ -648,24 +646,21 @@ func TestReplicationMode(t *testing.T) { check() } -func TestUpdateDefaultReplicaConfig(t *testing.T) { - re := require.New(t) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - cluster, err := tests.NewTestCluster(ctx, 1) - re.NoError(err) - err = cluster.RunInitialServers() - re.NoError(err) - cluster.WaitLeader() - pdAddr := cluster.GetConfig().GetClientURL() +func (suite *configTestSuite) TestUpdateDefaultReplicaConfig() { + env := tests.NewSchedulingTestEnvironment(suite.T()) + env.RunTestInTwoModes(suite.checkUpdateDefaultReplicaConfig) +} + +func (suite *configTestSuite) checkUpdateDefaultReplicaConfig(cluster *tests.TestCluster) { + re := suite.Require() + leaderServer := cluster.GetLeaderServer() + pdAddr := leaderServer.GetAddr() cmd := pdctlCmd.GetRootCmd() store := &metapb.Store{ Id: 1, State: metapb.StoreState_Up, } - leaderServer := cluster.GetLeaderServer() - re.NoError(leaderServer.BootstrapCluster()) tests.MustPutStore(re, cluster, store) defer cluster.Destroy() @@ -675,7 +670,9 @@ func TestUpdateDefaultReplicaConfig(t *testing.T) { re.NoError(err) replicationCfg := sc.ReplicationConfig{} re.NoError(json.Unmarshal(output, &replicationCfg)) - re.Equal(expect, replicationCfg.MaxReplicas) + testutil.Eventually(re, func() bool { // wait for the config to be synced to the scheduling server + return replicationCfg.MaxReplicas == expect + }) } checkLocationLabels := func(expect int) { @@ -684,7 +681,9 @@ func TestUpdateDefaultReplicaConfig(t *testing.T) { re.NoError(err) replicationCfg := 
sc.ReplicationConfig{} re.NoError(json.Unmarshal(output, &replicationCfg)) - re.Len(replicationCfg.LocationLabels, expect) + testutil.Eventually(re, func() bool { // wait for the config to be synced to the scheduling server + return len(replicationCfg.LocationLabels) == expect + }) } checkIsolationLevel := func(expect string) { @@ -693,7 +692,9 @@ func TestUpdateDefaultReplicaConfig(t *testing.T) { re.NoError(err) replicationCfg := sc.ReplicationConfig{} re.NoError(json.Unmarshal(output, &replicationCfg)) - re.Equal(replicationCfg.IsolationLevel, expect) + testutil.Eventually(re, func() bool { // wait for the config to be synced to the scheduling server + return replicationCfg.IsolationLevel == expect + }) } checkRuleCount := func(expect int) { @@ -702,7 +703,9 @@ func TestUpdateDefaultReplicaConfig(t *testing.T) { re.NoError(err) rule := placement.Rule{} re.NoError(json.Unmarshal(output, &rule)) - re.Equal(expect, rule.Count) + testutil.Eventually(re, func() bool { // wait for the config to be synced to the scheduling server + return rule.Count == expect + }) } checkRuleLocationLabels := func(expect int) { @@ -711,7 +714,9 @@ func TestUpdateDefaultReplicaConfig(t *testing.T) { re.NoError(err) rule := placement.Rule{} re.NoError(json.Unmarshal(output, &rule)) - re.Len(rule.LocationLabels, expect) + testutil.Eventually(re, func() bool { // wait for the config to be synced to the scheduling server + return len(rule.LocationLabels) == expect + }) } checkRuleIsolationLevel := func(expect string) { @@ -720,7 +725,9 @@ func TestUpdateDefaultReplicaConfig(t *testing.T) { re.NoError(err) rule := placement.Rule{} re.NoError(json.Unmarshal(output, &rule)) - re.Equal(rule.IsolationLevel, expect) + testutil.Eventually(re, func() bool { // wait for the config to be synced to the scheduling server + return rule.IsolationLevel == expect + }) } // update successfully when placement rules is not enabled. @@ -764,7 +771,7 @@ func TestUpdateDefaultReplicaConfig(t *testing.T) { checkRuleIsolationLevel("host") // update unsuccessfully when many rule exists. 
- fname := t.TempDir() + fname := suite.T().TempDir() rules := []placement.Rule{ { GroupID: "pd", @@ -791,16 +798,15 @@ func TestUpdateDefaultReplicaConfig(t *testing.T) { checkRuleIsolationLevel("host") } -func TestPDServerConfig(t *testing.T) { - re := require.New(t) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - cluster, err := tests.NewTestCluster(ctx, 1) - re.NoError(err) - err = cluster.RunInitialServers() - re.NoError(err) - cluster.WaitLeader() - pdAddr := cluster.GetConfig().GetClientURL() +func (suite *configTestSuite) TestPDServerConfig() { + env := tests.NewSchedulingTestEnvironment(suite.T()) + env.RunTestInTwoModes(suite.checkPDServerConfig) +} + +func (suite *configTestSuite) checkPDServerConfig(cluster *tests.TestCluster) { + re := suite.Require() + leaderServer := cluster.GetLeaderServer() + pdAddr := leaderServer.GetAddr() cmd := pdctlCmd.GetRootCmd() store := &metapb.Store{ @@ -808,8 +814,6 @@ func TestPDServerConfig(t *testing.T) { State: metapb.StoreState_Up, LastHeartbeat: time.Now().UnixNano(), } - leaderServer := cluster.GetLeaderServer() - re.NoError(leaderServer.BootstrapCluster()) tests.MustPutStore(re, cluster, store) defer cluster.Destroy() diff --git a/tests/pdctl/operator/operator_test.go b/tests/pdctl/operator/operator_test.go index 1752c28a3c0..8bb034993fa 100644 --- a/tests/pdctl/operator/operator_test.go +++ b/tests/pdctl/operator/operator_test.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/server/config" "github.com/tikv/pd/tests" "github.com/tikv/pd/tests/pdctl" @@ -221,6 +222,13 @@ func (suite *operatorTestSuite) checkOperator(cluster *tests.TestCluster) { _, err = pdctl.ExecuteCommand(cmd, "config", "set", "enable-placement-rules", "true") re.NoError(err) + if sche := cluster.GetSchedulingPrimaryServer(); sche != nil { + // wait for the scheduler server to update the config + testutil.Eventually(re, func() bool { + return sche.GetCluster().GetCheckerConfig().IsPlacementRulesEnabled() + }) + } + output, err = pdctl.ExecuteCommand(cmd, "operator", "add", "transfer-region", "1", "2", "3") re.NoError(err) re.Contains(string(output), "not supported") diff --git a/tests/pdctl/scheduler/scheduler_test.go b/tests/pdctl/scheduler/scheduler_test.go index b3d9f356ad1..cd599405124 100644 --- a/tests/pdctl/scheduler/scheduler_test.go +++ b/tests/pdctl/scheduler/scheduler_test.go @@ -17,6 +17,8 @@ package scheduler_test import ( "context" "encoding/json" + "reflect" + "strings" "testing" "time" @@ -43,9 +45,7 @@ func TestSchedulerTestSuite(t *testing.T) { func (suite *schedulerTestSuite) TestScheduler() { env := tests.NewSchedulingTestEnvironment(suite.T()) - // Fixme: use RunTestInTwoModes when sync deleted scheduler is supported. 
- env.RunTestInPDMode(suite.checkScheduler) - env.RunTestInTwoModes(suite.checkSchedulerDiagnostic) + env.RunTestInTwoModes(suite.checkScheduler) } func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { @@ -86,17 +86,27 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { if args != nil { mustExec(re, cmd, args, nil) } - var schedulers []string - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, &schedulers) - for _, scheduler := range schedulers { - re.True(expected[scheduler]) - } + testutil.Eventually(re, func() bool { + var schedulers []string + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, &schedulers) + if len(schedulers) != len(expected) { + return false + } + for _, scheduler := range schedulers { + if _, ok := expected[scheduler]; !ok { + return false + } + } + return true + }) } checkSchedulerConfigCommand := func(expectedConfig map[string]interface{}, schedulerName string) { - configInfo := make(map[string]interface{}) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", schedulerName}, &configInfo) - re.Equal(expectedConfig, configInfo) + testutil.Eventually(re, func() bool { + configInfo := make(map[string]interface{}) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", schedulerName}, &configInfo) + return reflect.DeepEqual(expectedConfig, configInfo) + }) } leaderServer := cluster.GetLeaderServer() @@ -106,7 +116,6 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { // note: because pdqsort is a unstable sort algorithm, set ApproximateSize for this region. tests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"), core.SetApproximateSize(10)) - time.Sleep(3 * time.Second) // scheduler show command expected := map[string]bool{ @@ -120,7 +129,6 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { // scheduler delete command args := []string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"} - time.Sleep(10 * time.Second) expected = map[string]bool{ "balance-leader-scheduler": true, "balance-hot-region-scheduler": true, @@ -160,8 +168,11 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { checkSchedulerCommand(args, expected) // check update success - expectedConfig["store-id-ranges"] = map[string]interface{}{"2": []interface{}{map[string]interface{}{"end-key": "", "start-key": ""}}, "3": []interface{}{map[string]interface{}{"end-key": "", "start-key": ""}}} - checkSchedulerConfigCommand(expectedConfig, schedulers[idx]) + // FIXME: remove this check after scheduler config is updated + if cluster.GetSchedulingPrimaryServer() == nil && schedulers[idx] == "grant-leader-scheduler" { + expectedConfig["store-id-ranges"] = map[string]interface{}{"2": []interface{}{map[string]interface{}{"end-key": "", "start-key": ""}}, "3": []interface{}{map[string]interface{}{"end-key": "", "start-key": ""}}} + checkSchedulerConfigCommand(expectedConfig, schedulers[idx]) + } // scheduler delete command args = []string{"-u", pdAddr, "scheduler", "remove", schedulers[idx]} @@ -261,26 +272,33 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "grant-hot-region-scheduler", "set", "2", "1,2,3"}, nil) expected3["store-leader-id"] = float64(2) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "grant-hot-region-scheduler"}, &conf3) - re.Equal(expected3, conf3) + // FIXME: remove this check 
after scheduler config is updated + if cluster.GetSchedulingPrimaryServer() == nil { // "grant-hot-region-scheduler" + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "grant-hot-region-scheduler"}, &conf3) + re.Equal(expected3, conf3) + } - // test balance region config + // test remove and add scheduler echo := mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-region-scheduler"}, nil) re.Contains(echo, "Success!") echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"}, nil) re.Contains(echo, "Success!") echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"}, nil) re.NotContains(echo, "Success!") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-region-scheduler"}, nil) + re.Contains(echo, "Success!") echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "1"}, nil) re.Contains(echo, "Success!") echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "evict-leader-scheduler-1"}, nil) re.Contains(echo, "Success!") echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "evict-leader-scheduler-1"}, nil) re.Contains(echo, "404") + testutil.Eventually(re, func() bool { // wait for removed scheduler to be synced to scheduling server. + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "evict-leader-scheduler"}, nil) + return strings.Contains(echo, "[404] scheduler not found") + }) // test hot region config - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "evict-leader-scheduler"}, nil) - re.Contains(echo, "[404] scheduler not found") expected1 := map[string]interface{}{ "min-hot-byte-rate": float64(100), "min-hot-key-rate": float64(10), @@ -311,74 +329,77 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "src-tolerance-ratio", "1.02"}, nil) expected1["src-tolerance-ratio"] = 1.02 var conf1 map[string]interface{} - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) - re.Equal(expected1, conf1) - - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "byte,key"}, nil) - expected1["read-priorities"] = []interface{}{"byte", "key"} - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) - re.Equal(expected1, conf1) - - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key"}, nil) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) - re.Equal(expected1, conf1) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key,byte"}, nil) - expected1["read-priorities"] = []interface{}{"key", "byte"} - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) - re.Equal(expected1, conf1) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "foo,bar"}, nil) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) - re.Equal(expected1, conf1) - mustExec(re, cmd, []string{"-u", pdAddr, 
"scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", ""}, nil) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) - re.Equal(expected1, conf1) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key,key"}, nil) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) - re.Equal(expected1, conf1) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "byte,byte"}, nil) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) - re.Equal(expected1, conf1) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key,key,byte"}, nil) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) - re.Equal(expected1, conf1) - - // write-priorities is divided into write-leader-priorities and write-peer-priorities - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "write-priorities", "key,byte"}, nil) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) - re.Equal(expected1, conf1) - - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v0"}, nil) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) - expected1["rank-formula-version"] = "v2" - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v2"}, nil) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) - re.Equal(expected1, conf1) - expected1["rank-formula-version"] = "v1" - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v1"}, nil) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) - re.Equal(expected1, conf1) - - expected1["forbid-rw-type"] = "read" - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "forbid-rw-type", "read"}, nil) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) - re.Equal(expected1, conf1) - - // test compatibility - re.Equal("2.0.0", leaderServer.GetClusterVersion().String()) - for _, store := range stores { - version := versioninfo.HotScheduleWithQuery - store.Version = versioninfo.MinSupportedVersion(version).String() - tests.MustPutStore(re, cluster, store) + // FIXME: remove this check after scheduler config is updated + if cluster.GetSchedulingPrimaryServer() == nil { // "balance-hot-region-scheduler" + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) + re.Equal(expected1, conf1) + + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "byte,key"}, nil) + expected1["read-priorities"] = []interface{}{"byte", "key"} + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) + re.Equal(expected1, conf1) + + 
mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key"}, nil) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) + re.Equal(expected1, conf1) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key,byte"}, nil) + expected1["read-priorities"] = []interface{}{"key", "byte"} + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) + re.Equal(expected1, conf1) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "foo,bar"}, nil) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) + re.Equal(expected1, conf1) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", ""}, nil) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) + re.Equal(expected1, conf1) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key,key"}, nil) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) + re.Equal(expected1, conf1) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "byte,byte"}, nil) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) + re.Equal(expected1, conf1) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key,key,byte"}, nil) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) + re.Equal(expected1, conf1) + + // write-priorities is divided into write-leader-priorities and write-peer-priorities + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "write-priorities", "key,byte"}, nil) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) + re.Equal(expected1, conf1) + + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v0"}, nil) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) + expected1["rank-formula-version"] = "v2" + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v2"}, nil) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) + re.Equal(expected1, conf1) + expected1["rank-formula-version"] = "v1" + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v1"}, nil) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) + re.Equal(expected1, conf1) + + expected1["forbid-rw-type"] = "read" + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "forbid-rw-type", "read"}, nil) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) + 
re.Equal(expected1, conf1) + + // test compatibility + re.Equal("2.0.0", leaderServer.GetClusterVersion().String()) + for _, store := range stores { + version := versioninfo.HotScheduleWithQuery + store.Version = versioninfo.MinSupportedVersion(version).String() + tests.MustPutStore(re, cluster, store) + } + re.Equal("5.2.0", leaderServer.GetClusterVersion().String()) + // After upgrading, we should not use query. + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) + re.Equal(conf1["read-priorities"], []interface{}{"key", "byte"}) + // cannot set qps as write-peer-priorities + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "write-peer-priorities", "query,byte"}, nil) + re.Contains(echo, "query is not allowed to be set in priorities for write-peer-priorities") + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) + re.Equal(conf1["write-peer-priorities"], []interface{}{"byte", "key"}) } - re.Equal("5.2.0", leaderServer.GetClusterVersion().String()) - // After upgrading, we should not use query. - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) - re.Equal(conf1["read-priorities"], []interface{}{"key", "byte"}) - // cannot set qps as write-peer-priorities - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "write-peer-priorities", "query,byte"}, nil) - re.Contains(echo, "query is not allowed to be set in priorities for write-peer-priorities") - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) - re.Equal(conf1["write-peer-priorities"], []interface{}{"byte", "key"}) // test remove and add echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-hot-region-scheduler"}, nil) @@ -392,8 +413,10 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-leader-scheduler", "show"}, &conf) re.Equal(4., conf["batch"]) mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-leader-scheduler", "set", "batch", "3"}, nil) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-leader-scheduler"}, &conf1) - re.Equal(3., conf1["batch"]) + testutil.Eventually(re, func() bool { + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-leader-scheduler"}, &conf1) + return conf1["batch"] == 3. 
+ }) echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-leader-scheduler"}, nil) re.NotContains(echo, "Success!") echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-leader-scheduler"}, nil) @@ -412,24 +435,33 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { for _, schedulerName := range evictSlownessSchedulers { echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", schedulerName}, nil) re.Contains(echo, "Success!") - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil) - re.Contains(echo, schedulerName) + testutil.Eventually(re, func() bool { + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil) + return strings.Contains(echo, schedulerName) + }) echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", schedulerName, "set", "recovery-duration", "100"}, nil) re.Contains(echo, "Success!") conf = make(map[string]interface{}) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", schedulerName, "show"}, &conf) - re.Equal(100., conf["recovery-duration"]) + // FIXME: remove this check after scheduler config is updated + if cluster.GetSchedulingPrimaryServer() == nil && schedulerName == "evict-slow-store-scheduler" { + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", schedulerName, "show"}, &conf) + re.Equal(100., conf["recovery-duration"]) + } echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", schedulerName}, nil) re.Contains(echo, "Success!") - echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil) - re.NotContains(echo, schedulerName) + testutil.Eventually(re, func() bool { + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil) + return !strings.Contains(echo, schedulerName) + }) } // test show scheduler with paused and disabled status. 
checkSchedulerWithStatusCommand := func(status string, expected []string) { - var schedulers []string - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show", "--status", status}, &schedulers) - re.Equal(expected, schedulers) + testutil.Eventually(re, func() bool { + var schedulers []string + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show", "--status", status}, &schedulers) + return reflect.DeepEqual(expected, schedulers) + }) } mustUsage([]string{"-u", pdAddr, "scheduler", "pause", "balance-leader-scheduler"}) @@ -463,6 +495,11 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { checkSchedulerWithStatusCommand("disabled", nil) } +func (suite *schedulerTestSuite) TestSchedulerDiagnostic() { + env := tests.NewSchedulingTestEnvironment(suite.T()) + env.RunTestInTwoModes(suite.checkSchedulerDiagnostic) +} + func (suite *schedulerTestSuite) checkSchedulerDiagnostic(cluster *tests.TestCluster) { re := suite.Require() pdAddr := cluster.GetConfig().GetClientURL() @@ -472,10 +509,8 @@ func (suite *schedulerTestSuite) checkSchedulerDiagnostic(cluster *tests.TestClu result := make(map[string]interface{}) testutil.Eventually(re, func() bool { mightExec(re, cmd, []string{"-u", pdAddr, "scheduler", "describe", schedulerName}, &result) - return len(result) != 0 + return len(result) != 0 && expectedStatus == result["status"] && expectedSummary == result["summary"] }, testutil.WithTickInterval(50*time.Millisecond)) - re.Equal(expectedStatus, result["status"]) - re.Equal(expectedSummary, result["summary"]) } stores := []*metapb.Store{ @@ -506,18 +541,14 @@ func (suite *schedulerTestSuite) checkSchedulerDiagnostic(cluster *tests.TestClu // note: because pdqsort is a unstable sort algorithm, set ApproximateSize for this region. tests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"), core.SetApproximateSize(10)) - time.Sleep(3 * time.Second) echo := mustExec(re, cmd, []string{"-u", pdAddr, "config", "set", "enable-diagnostic", "true"}, nil) re.Contains(echo, "Success!") checkSchedulerDescribeCommand("balance-region-scheduler", "pending", "1 store(s) RegionNotMatchRule; ") // scheduler delete command - // Fixme: use RunTestInTwoModes when sync deleted scheduler is supported. 
- if sche := cluster.GetSchedulingPrimaryServer(); sche == nil { - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"}, nil) - checkSchedulerDescribeCommand("balance-region-scheduler", "disabled", "") - } + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"}, nil) + checkSchedulerDescribeCommand("balance-region-scheduler", "disabled", "") mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "pause", "balance-leader-scheduler", "60"}, nil) mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "resume", "balance-leader-scheduler"}, nil) @@ -530,7 +561,7 @@ func mustExec(re *require.Assertions, cmd *cobra.Command, args []string, v inter if v == nil { return string(output) } - re.NoError(json.Unmarshal(output, v)) + re.NoError(json.Unmarshal(output, v), string(output)) return "" } diff --git a/tests/server/api/operator_test.go b/tests/server/api/operator_test.go index 64ed5114646..908daf21aac 100644 --- a/tests/server/api/operator_test.go +++ b/tests/server/api/operator_test.go @@ -15,6 +15,7 @@ package api import ( + "encoding/json" "errors" "fmt" "net/http" @@ -50,7 +51,7 @@ func TestOperatorTestSuite(t *testing.T) { suite.Run(t, new(operatorTestSuite)) } -func (suite *operatorTestSuite) TestOperator() { +func (suite *operatorTestSuite) TestAddRemovePeer() { opts := []tests.ConfigOption{ func(conf *config.Config, serverName string) { conf.Replication.MaxReplicas = 1 @@ -58,21 +59,11 @@ func (suite *operatorTestSuite) TestOperator() { } env := tests.NewSchedulingTestEnvironment(suite.T(), opts...) env.RunTestInTwoModes(suite.checkAddRemovePeer) - - env = tests.NewSchedulingTestEnvironment(suite.T(), opts...) - env.RunTestInTwoModes(suite.checkMergeRegionOperator) - - opts = []tests.ConfigOption{ - func(conf *config.Config, serverName string) { - conf.Replication.MaxReplicas = 3 - }, - } - env = tests.NewSchedulingTestEnvironment(suite.T(), opts...) - env.RunTestInTwoModes(suite.checkTransferRegionWithPlacementRule) } func (suite *operatorTestSuite) checkAddRemovePeer(cluster *tests.TestCluster) { re := suite.Require() + suite.pauseRuleChecker(cluster) stores := []*metapb.Store{ { Id: 1, @@ -106,6 +97,8 @@ func (suite *operatorTestSuite) checkAddRemovePeer(cluster *tests.TestCluster) { ConfVer: 1, Version: 1, }, + StartKey: []byte("a"), + EndKey: []byte("b"), } regionInfo := core.NewRegionInfo(region, peer1) tests.MustPutRegionInfo(re, cluster, regionInfo) @@ -174,8 +167,19 @@ func (suite *operatorTestSuite) checkAddRemovePeer(cluster *tests.TestCluster) { suite.NoError(err) } +func (suite *operatorTestSuite) TestMergeRegionOperator() { + opts := []tests.ConfigOption{ + func(conf *config.Config, serverName string) { + conf.Replication.MaxReplicas = 1 + }, + } + env := tests.NewSchedulingTestEnvironment(suite.T(), opts...) 
+ env.RunTestInTwoModes(suite.checkMergeRegionOperator) +} + func (suite *operatorTestSuite) checkMergeRegionOperator(cluster *tests.TestCluster) { re := suite.Require() + suite.pauseRuleChecker(cluster) r1 := core.NewTestRegionInfo(10, 1, []byte(""), []byte("b"), core.SetWrittenBytes(1000), core.SetReadBytes(1000), core.SetRegionConfVer(1), core.SetRegionVersion(1)) tests.MustPutRegionInfo(re, cluster, r1) r2 := core.NewTestRegionInfo(20, 1, []byte("b"), []byte("c"), core.SetWrittenBytes(2000), core.SetReadBytes(0), core.SetRegionConfVer(2), core.SetRegionVersion(3)) @@ -199,8 +203,19 @@ func (suite *operatorTestSuite) checkMergeRegionOperator(cluster *tests.TestClus suite.NoError(err) } +func (suite *operatorTestSuite) TestTransferRegionWithPlacementRule() { + opts := []tests.ConfigOption{ + func(conf *config.Config, serverName string) { + conf.Replication.MaxReplicas = 3 + }, + } + env := tests.NewSchedulingTestEnvironment(suite.T(), opts...) + env.RunTestInTwoModes(suite.checkTransferRegionWithPlacementRule) +} + func (suite *operatorTestSuite) checkTransferRegionWithPlacementRule(cluster *tests.TestCluster) { re := suite.Require() + suite.pauseRuleChecker(cluster) stores := []*metapb.Store{ { Id: 1, @@ -239,6 +254,8 @@ func (suite *operatorTestSuite) checkTransferRegionWithPlacementRule(cluster *te ConfVer: 1, Version: 1, }, + StartKey: []byte("a"), + EndKey: []byte("b"), } tests.MustPutRegionInfo(re, cluster, core.NewRegionInfo(region, peer1)) @@ -408,13 +425,24 @@ func (suite *operatorTestSuite) checkTransferRegionWithPlacementRule(cluster *te }, } svr := cluster.GetLeaderServer() + url := fmt.Sprintf("%s/pd/api/v1/config", svr.GetAddr()) for _, testCase := range testCases { suite.T().Log(testCase.name) - // TODO: remove this after we can sync this config to all servers. - if sche := cluster.GetSchedulingPrimaryServer(); sche != nil { - sche.GetCluster().GetSchedulerConfig().SetPlacementRuleEnabled(testCase.placementRuleEnable) + data := make(map[string]interface{}) + if testCase.placementRuleEnable { + data["enable-placement-rules"] = "true" } else { - svr.GetRaftCluster().GetOpts().SetPlacementRuleEnabled(testCase.placementRuleEnable) + data["enable-placement-rules"] = "false" + } + reqData, e := json.Marshal(data) + re.NoError(e) + err := tu.CheckPostJSON(testDialClient, url, reqData, tu.StatusOK(re)) + re.NoError(err) + if sche := cluster.GetSchedulingPrimaryServer(); sche != nil { + // wait for the scheduler server to update the config + tu.Eventually(re, func() bool { + return sche.GetCluster().GetCheckerConfig().IsPlacementRulesEnabled() == testCase.placementRuleEnable + }) } manager := svr.GetRaftCluster().GetRuleManager() if sche := cluster.GetSchedulingPrimaryServer(); sche != nil { @@ -436,7 +464,6 @@ func (suite *operatorTestSuite) checkTransferRegionWithPlacementRule(cluster *te err = manager.DeleteRule("pd", "default") suite.NoError(err) } - var err error if testCase.expectedError == nil { err = tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/operators", urlPrefix), testCase.input, tu.StatusOK(re)) } else { @@ -457,3 +484,17 @@ func (suite *operatorTestSuite) checkTransferRegionWithPlacementRule(cluster *te suite.NoError(err) } } + +// pauseRuleChecker will pause the rule checker to avoid unexpected operators. 
+func (suite *operatorTestSuite) pauseRuleChecker(cluster *tests.TestCluster) { + re := suite.Require() + checkerName := "rule" + addr := cluster.GetLeaderServer().GetAddr() + resp := make(map[string]interface{}) + url := fmt.Sprintf("%s/pd/api/v1/checker/%s", addr, checkerName) + err := tu.CheckPostJSON(testDialClient, url, []byte(`{"delay":1000}`), tu.StatusOK(re)) + re.NoError(err) + err = tu.ReadGetJSON(re, testDialClient, url, &resp) + re.NoError(err) + re.True(resp["paused"].(bool)) +} diff --git a/tests/server/api/scheduler_test.go b/tests/server/api/scheduler_test.go index 95c4d936a8c..38f691a4eda 100644 --- a/tests/server/api/scheduler_test.go +++ b/tests/server/api/scheduler_test.go @@ -23,8 +23,10 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" sc "github.com/tikv/pd/pkg/schedule/config" + "github.com/tikv/pd/pkg/slice" tu "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/server" "github.com/tikv/pd/tests" @@ -40,14 +42,9 @@ func TestScheduleTestSuite(t *testing.T) { suite.Run(t, new(scheduleTestSuite)) } -func (suite *scheduleTestSuite) TestScheduler() { - // Fixme: use RunTestInTwoModes when sync deleted scheduler is supported. +func (suite *scheduleTestSuite) TestOriginAPI() { env := tests.NewSchedulingTestEnvironment(suite.T()) - env.RunTestInPDMode(suite.checkOriginAPI) - env = tests.NewSchedulingTestEnvironment(suite.T()) - env.RunTestInPDMode(suite.checkAPI) - env = tests.NewSchedulingTestEnvironment(suite.T()) - env.RunTestInPDMode(suite.checkDisable) + env.RunTestInTwoModes(suite.checkOriginAPI) } func (suite *scheduleTestSuite) checkOriginAPI(cluster *tests.TestCluster) { @@ -71,7 +68,7 @@ func (suite *scheduleTestSuite) checkOriginAPI(cluster *tests.TestCluster) { re := suite.Require() suite.NoError(tu.CheckPostJSON(testDialClient, urlPrefix, body, tu.StatusOK(re))) - suite.Len(suite.getSchedulers(urlPrefix), 1) + suite.assertSchedulerExists(re, urlPrefix, "evict-leader-scheduler") resp := make(map[string]interface{}) listURL := fmt.Sprintf("%s%s%s/%s/list", leaderAddr, apiPrefix, server.SchedulerConfigHandlerPath, "evict-leader-scheduler") suite.NoError(tu.ReadGetJSON(re, testDialClient, listURL, &resp)) @@ -83,20 +80,20 @@ func (suite *scheduleTestSuite) checkOriginAPI(cluster *tests.TestCluster) { suite.NoError(err) suite.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/schedulers/persistFail", "return(true)")) suite.NoError(tu.CheckPostJSON(testDialClient, urlPrefix, body, tu.StatusNotOK(re))) - suite.Len(suite.getSchedulers(urlPrefix), 1) + suite.assertSchedulerExists(re, urlPrefix, "evict-leader-scheduler") resp = make(map[string]interface{}) suite.NoError(tu.ReadGetJSON(re, testDialClient, listURL, &resp)) suite.Len(resp["store-id-ranges"], 1) suite.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/schedulers/persistFail")) suite.NoError(tu.CheckPostJSON(testDialClient, urlPrefix, body, tu.StatusOK(re))) - suite.Len(suite.getSchedulers(urlPrefix), 1) + suite.assertSchedulerExists(re, urlPrefix, "evict-leader-scheduler") resp = make(map[string]interface{}) suite.NoError(tu.ReadGetJSON(re, testDialClient, listURL, &resp)) suite.Len(resp["store-id-ranges"], 2) deleteURL := fmt.Sprintf("%s/%s", urlPrefix, "evict-leader-scheduler-1") err = tu.CheckDelete(testDialClient, deleteURL, tu.StatusOK(re)) suite.NoError(err) - suite.Len(suite.getSchedulers(urlPrefix), 1) + suite.assertSchedulerExists(re, urlPrefix, 
"evict-leader-scheduler") resp1 := make(map[string]interface{}) suite.NoError(tu.ReadGetJSON(re, testDialClient, listURL, &resp1)) suite.Len(resp1["store-id-ranges"], 1) @@ -104,16 +101,21 @@ func (suite *scheduleTestSuite) checkOriginAPI(cluster *tests.TestCluster) { suite.NoError(failpoint.Enable("github.com/tikv/pd/server/config/persistFail", "return(true)")) err = tu.CheckDelete(testDialClient, deleteURL, tu.Status(re, http.StatusInternalServerError)) suite.NoError(err) - suite.Len(suite.getSchedulers(urlPrefix), 1) + suite.assertSchedulerExists(re, urlPrefix, "evict-leader-scheduler") suite.NoError(failpoint.Disable("github.com/tikv/pd/server/config/persistFail")) err = tu.CheckDelete(testDialClient, deleteURL, tu.StatusOK(re)) suite.NoError(err) - suite.Empty(suite.getSchedulers(urlPrefix)) + suite.assertNoScheduler(re, urlPrefix, "evict-leader-scheduler") suite.NoError(tu.CheckGetJSON(testDialClient, listURL, nil, tu.Status(re, http.StatusNotFound))) err = tu.CheckDelete(testDialClient, deleteURL, tu.Status(re, http.StatusNotFound)) suite.NoError(err) } +func (suite *scheduleTestSuite) TestAPI() { + env := tests.NewSchedulingTestEnvironment(suite.T()) + env.RunTestInTwoModes(suite.checkAPI) +} + func (suite *scheduleTestSuite) checkAPI(cluster *tests.TestCluster) { re := suite.Require() leaderAddr := cluster.GetLeaderServer().GetAddr() @@ -152,9 +154,12 @@ func (suite *scheduleTestSuite) checkAPI(cluster *tests.TestCluster) { body, err := json.Marshal(dataMap) suite.NoError(err) suite.NoError(tu.CheckPostJSON(testDialClient, updateURL, body, tu.StatusOK(re))) - resp = make(map[string]interface{}) - suite.NoError(tu.ReadGetJSON(re, testDialClient, listURL, &resp)) - suite.Equal(3.0, resp["batch"]) + tu.Eventually(re, func() bool { // wait for scheduling server to be synced. 
+ resp = make(map[string]interface{}) + suite.NoError(tu.ReadGetJSON(re, testDialClient, listURL, &resp)) + return resp["batch"] == 3.0 + }) + // update again err = tu.CheckPostJSON(testDialClient, updateURL, body, tu.StatusOK(re), @@ -230,23 +235,27 @@ func (suite *scheduleTestSuite) checkAPI(cluster *tests.TestCluster) { suite.NoError(tu.CheckPostJSON(testDialClient, updateURL, body, tu.StatusOK(re))) resp = make(map[string]interface{}) suite.NoError(tu.ReadGetJSON(re, testDialClient, listURL, &resp)) - for key := range expectMap { - suite.Equal(expectMap[key], resp[key], "key %s", key) + // FIXME: remove this check after scheduler config is updated + if cluster.GetSchedulingPrimaryServer() == nil { // "balance-hot-region-scheduler" + for key := range expectMap { + suite.Equal(expectMap[key], resp[key], "key %s", key) + } + + // update again + err = tu.CheckPostJSON(testDialClient, updateURL, body, + tu.StatusOK(re), + tu.StringEqual(re, "Config is the same with origin, so do nothing.")) + suite.NoError(err) + // config item not found + dataMap = map[string]interface{}{} + dataMap["error"] = 3 + body, err = json.Marshal(dataMap) + suite.NoError(err) + err = tu.CheckPostJSON(testDialClient, updateURL, body, + tu.Status(re, http.StatusBadRequest), + tu.StringEqual(re, "Config item is not found.")) + suite.NoError(err) } - // update again - err = tu.CheckPostJSON(testDialClient, updateURL, body, - tu.StatusOK(re), - tu.StringEqual(re, "Config is the same with origin, so do nothing.")) - suite.NoError(err) - // config item not found - dataMap = map[string]interface{}{} - dataMap["error"] = 3 - body, err = json.Marshal(dataMap) - suite.NoError(err) - err = tu.CheckPostJSON(testDialClient, updateURL, body, - tu.Status(re, http.StatusBadRequest), - tu.StringEqual(re, "Config item is not found.")) - suite.NoError(err) }, }, { @@ -468,6 +477,7 @@ func (suite *scheduleTestSuite) checkAPI(cluster *tests.TestCluster) { testCase.extraTestFunc(testCase.createdName) } suite.deleteScheduler(urlPrefix, testCase.createdName) + suite.assertNoScheduler(re, urlPrefix, testCase.createdName) } // test pause and resume all schedulers. @@ -482,6 +492,7 @@ func (suite *scheduleTestSuite) checkAPI(cluster *tests.TestCluster) { body, err := json.Marshal(input) suite.NoError(err) suite.addScheduler(urlPrefix, body) + suite.assertSchedulerExists(re, urlPrefix, testCase.createdName) // wait for scheduler to be synced. 
if testCase.extraTestFunc != nil { testCase.extraTestFunc(testCase.createdName) } @@ -545,9 +556,15 @@ func (suite *scheduleTestSuite) checkAPI(cluster *tests.TestCluster) { createdName = testCase.name } suite.deleteScheduler(urlPrefix, createdName) + suite.assertNoScheduler(re, urlPrefix, createdName) } } +func (suite *scheduleTestSuite) TestDisable() { + env := tests.NewSchedulingTestEnvironment(suite.T()) + env.RunTestInTwoModes(suite.checkDisable) +} + func (suite *scheduleTestSuite) checkDisable(cluster *tests.TestCluster) { re := suite.Require() leaderAddr := cluster.GetLeaderServer().GetAddr() @@ -581,16 +598,8 @@ func (suite *scheduleTestSuite) checkDisable(cluster *tests.TestCluster) { err = tu.CheckPostJSON(testDialClient, u, body, tu.StatusOK(re)) suite.NoError(err) - var schedulers []string - err = tu.ReadGetJSON(re, testDialClient, urlPrefix, &schedulers) - suite.NoError(err) - suite.Len(schedulers, 1) - suite.Equal(name, schedulers[0]) - - err = tu.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s?status=disabled", urlPrefix), &schedulers) - suite.NoError(err) - suite.Len(schedulers, 1) - suite.Equal(name, schedulers[0]) + suite.assertNoScheduler(re, urlPrefix, name) + suite.assertSchedulerExists(re, fmt.Sprintf("%s?status=disabled", urlPrefix), name) // reset schedule config scheduleConfig.Schedulers = originSchedulers @@ -600,6 +609,7 @@ func (suite *scheduleTestSuite) checkDisable(cluster *tests.TestCluster) { suite.NoError(err) suite.deleteScheduler(urlPrefix, name) + suite.assertNoScheduler(re, urlPrefix, name) } func (suite *scheduleTestSuite) addScheduler(urlPrefix string, body []byte) { @@ -614,12 +624,17 @@ func (suite *scheduleTestSuite) deleteScheduler(urlPrefix string, createdName st } func (suite *scheduleTestSuite) testPauseOrResume(urlPrefix string, name, createdName string, body []byte) { + re := suite.Require() if createdName == "" { createdName = name } - re := suite.Require() - err := tu.CheckPostJSON(testDialClient, urlPrefix, body, tu.StatusOK(re)) - suite.NoError(err) + var schedulers []string + tu.ReadGetJSON(suite.Require(), testDialClient, urlPrefix, &schedulers) + if !slice.Contains(schedulers, createdName) { + err := tu.CheckPostJSON(testDialClient, urlPrefix, body, tu.StatusOK(re)) + re.NoError(err) + } + suite.assertSchedulerExists(re, urlPrefix, createdName) // wait for scheduler to be synced. // test pause. 
input := make(map[string]interface{}) @@ -655,9 +670,20 @@ func (suite *scheduleTestSuite) testPauseOrResume(urlPrefix string, name, create suite.False(isPaused) } -func (suite *scheduleTestSuite) getSchedulers(urlPrefix string) (resp []string) { - tu.ReadGetJSON(suite.Require(), testDialClient, urlPrefix, &resp) - return +func (suite *scheduleTestSuite) assertSchedulerExists(re *require.Assertions, urlPrefix string, scheduler string) { + var schedulers []string + tu.Eventually(re, func() bool { + tu.ReadGetJSON(suite.Require(), testDialClient, urlPrefix, &schedulers) + return slice.Contains(schedulers, scheduler) + }) +} + +func (suite *scheduleTestSuite) assertNoScheduler(re *require.Assertions, urlPrefix string, scheduler string) { + var schedulers []string + tu.Eventually(re, func() bool { + tu.ReadGetJSON(suite.Require(), testDialClient, urlPrefix, &schedulers) + return !slice.Contains(schedulers, scheduler) + }) } func (suite *scheduleTestSuite) isSchedulerPaused(urlPrefix, name string) bool { diff --git a/tests/server/config/config_test.go b/tests/server/config/config_test.go index 1b2178bde33..8d8cf40e692 100644 --- a/tests/server/config/config_test.go +++ b/tests/server/config/config_test.go @@ -18,17 +18,25 @@ import ( "bytes" "context" "encoding/json" + "fmt" "net/http" "testing" + "time" "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/ratelimit" + sc "github.com/tikv/pd/pkg/schedule/config" + tu "github.com/tikv/pd/pkg/utils/testutil" + "github.com/tikv/pd/pkg/utils/typeutil" + "github.com/tikv/pd/pkg/versioninfo" "github.com/tikv/pd/server" + "github.com/tikv/pd/server/config" "github.com/tikv/pd/tests" ) -// dialClient used to dial http request. -var dialClient = &http.Client{ +// testDialClient used to dial http request. 
+var testDialClient = &http.Client{ Transport: &http.Transport{ DisableKeepAlives: true, }, @@ -56,7 +64,7 @@ func TestRateLimitConfigReload(t *testing.T) { data, err := json.Marshal(input) re.NoError(err) req, _ := http.NewRequest(http.MethodPost, leader.GetAddr()+"/pd/api/v1/service-middleware/config", bytes.NewBuffer(data)) - resp, err := dialClient.Do(req) + resp, err := testDialClient.Do(req) re.NoError(err) resp.Body.Close() re.True(leader.GetServer().GetServiceMiddlewarePersistOptions().IsRateLimitEnabled()) @@ -74,3 +82,520 @@ func TestRateLimitConfigReload(t *testing.T) { re.True(leader.GetServer().GetServiceMiddlewarePersistOptions().IsRateLimitEnabled()) re.Len(leader.GetServer().GetServiceMiddlewarePersistOptions().GetRateLimitConfig().LimiterConfig, 1) } + +type configTestSuite struct { + suite.Suite +} + +func TestConfigTestSuite(t *testing.T) { + suite.Run(t, new(configTestSuite)) +} + +func (suite *configTestSuite) TestConfigAll() { + env := tests.NewSchedulingTestEnvironment(suite.T()) + env.RunTestInTwoModes(suite.checkConfigAll) +} + +func (suite *configTestSuite) checkConfigAll(cluster *tests.TestCluster) { + re := suite.Require() + leaderServer := cluster.GetLeaderServer() + urlPrefix := leaderServer.GetAddr() + + addr := fmt.Sprintf("%s/pd/api/v1/config", urlPrefix) + cfg := &config.Config{} + tu.Eventually(re, func() bool { + err := tu.ReadGetJSON(re, testDialClient, addr, cfg) + suite.NoError(err) + return cfg.PDServerCfg.DashboardAddress != "auto" + }) + + // the original way + r := map[string]int{"max-replicas": 5} + postData, err := json.Marshal(r) + suite.NoError(err) + err = tu.CheckPostJSON(testDialClient, addr, postData, tu.StatusOK(re)) + suite.NoError(err) + l := map[string]interface{}{ + "location-labels": "zone,rack", + "region-schedule-limit": 10, + } + postData, err = json.Marshal(l) + suite.NoError(err) + err = tu.CheckPostJSON(testDialClient, addr, postData, tu.StatusOK(re)) + suite.NoError(err) + + l = map[string]interface{}{ + "metric-storage": "http://127.0.0.1:9090", + } + postData, err = json.Marshal(l) + suite.NoError(err) + err = tu.CheckPostJSON(testDialClient, addr, postData, tu.StatusOK(re)) + suite.NoError(err) + + newCfg := &config.Config{} + err = tu.ReadGetJSON(re, testDialClient, addr, newCfg) + suite.NoError(err) + cfg.Replication.MaxReplicas = 5 + cfg.Replication.LocationLabels = []string{"zone", "rack"} + cfg.Schedule.RegionScheduleLimit = 10 + cfg.PDServerCfg.MetricStorage = "http://127.0.0.1:9090" + suite.Equal(newCfg, cfg) + + // the new way + l = map[string]interface{}{ + "schedule.tolerant-size-ratio": 2.5, + "schedule.enable-tikv-split-region": "false", + "replication.location-labels": "idc,host", + "pd-server.metric-storage": "http://127.0.0.1:1234", + "log.level": "warn", + "cluster-version": "v4.0.0-beta", + "replication-mode.replication-mode": "dr-auto-sync", + "replication-mode.dr-auto-sync.label-key": "foobar", + } + postData, err = json.Marshal(l) + suite.NoError(err) + err = tu.CheckPostJSON(testDialClient, addr, postData, tu.StatusOK(re)) + suite.NoError(err) + newCfg1 := &config.Config{} + err = tu.ReadGetJSON(re, testDialClient, addr, newCfg1) + suite.NoError(err) + cfg.Schedule.EnableTiKVSplitRegion = false + cfg.Schedule.TolerantSizeRatio = 2.5 + cfg.Replication.LocationLabels = []string{"idc", "host"} + cfg.PDServerCfg.MetricStorage = "http://127.0.0.1:1234" + cfg.Log.Level = "warn" + cfg.ReplicationMode.DRAutoSync.LabelKey = "foobar" + cfg.ReplicationMode.ReplicationMode = "dr-auto-sync" + v, err := 
versioninfo.ParseVersion("v4.0.0-beta") + suite.NoError(err) + cfg.ClusterVersion = *v + suite.Equal(cfg, newCfg1) + + // revert this to avoid it affects TestConfigTTL + l["schedule.enable-tikv-split-region"] = "true" + postData, err = json.Marshal(l) + suite.NoError(err) + err = tu.CheckPostJSON(testDialClient, addr, postData, tu.StatusOK(re)) + suite.NoError(err) + + // illegal prefix + l = map[string]interface{}{ + "replicate.max-replicas": 1, + } + postData, err = json.Marshal(l) + suite.NoError(err) + err = tu.CheckPostJSON(testDialClient, addr, postData, + tu.StatusNotOK(re), + tu.StringContain(re, "not found")) + suite.NoError(err) + + // update prefix directly + l = map[string]interface{}{ + "replication-mode": nil, + } + postData, err = json.Marshal(l) + suite.NoError(err) + err = tu.CheckPostJSON(testDialClient, addr, postData, + tu.StatusNotOK(re), + tu.StringContain(re, "cannot update config prefix")) + suite.NoError(err) + + // config item not found + l = map[string]interface{}{ + "schedule.region-limit": 10, + } + postData, err = json.Marshal(l) + suite.NoError(err) + err = tu.CheckPostJSON(testDialClient, addr, postData, tu.StatusNotOK(re), tu.StringContain(re, "not found")) + suite.NoError(err) +} + +func (suite *configTestSuite) TestConfigSchedule() { + env := tests.NewSchedulingTestEnvironment(suite.T()) + env.RunTestInTwoModes(suite.checkConfigSchedule) +} + +func (suite *configTestSuite) checkConfigSchedule(cluster *tests.TestCluster) { + re := suite.Require() + leaderServer := cluster.GetLeaderServer() + urlPrefix := leaderServer.GetAddr() + + addr := fmt.Sprintf("%s/pd/api/v1/config/schedule", urlPrefix) + + scheduleConfig := &sc.ScheduleConfig{} + suite.NoError(tu.ReadGetJSON(re, testDialClient, addr, scheduleConfig)) + scheduleConfig.MaxStoreDownTime.Duration = time.Second + postData, err := json.Marshal(scheduleConfig) + suite.NoError(err) + err = tu.CheckPostJSON(testDialClient, addr, postData, tu.StatusOK(re)) + suite.NoError(err) + + scheduleConfig1 := &sc.ScheduleConfig{} + suite.NoError(tu.ReadGetJSON(re, testDialClient, addr, scheduleConfig1)) + suite.Equal(*scheduleConfig1, *scheduleConfig) +} + +func (suite *configTestSuite) TestConfigReplication() { + env := tests.NewSchedulingTestEnvironment(suite.T()) + env.RunTestInTwoModes(suite.checkConfigReplication) +} + +func (suite *configTestSuite) checkConfigReplication(cluster *tests.TestCluster) { + re := suite.Require() + leaderServer := cluster.GetLeaderServer() + urlPrefix := leaderServer.GetAddr() + + addr := fmt.Sprintf("%s/pd/api/v1/config/replicate", urlPrefix) + rc := &sc.ReplicationConfig{} + err := tu.ReadGetJSON(re, testDialClient, addr, rc) + suite.NoError(err) + + rc.MaxReplicas = 5 + rc1 := map[string]int{"max-replicas": 5} + postData, err := json.Marshal(rc1) + suite.NoError(err) + err = tu.CheckPostJSON(testDialClient, addr, postData, tu.StatusOK(re)) + suite.NoError(err) + + rc.LocationLabels = []string{"zone", "rack"} + rc2 := map[string]string{"location-labels": "zone,rack"} + postData, err = json.Marshal(rc2) + suite.NoError(err) + err = tu.CheckPostJSON(testDialClient, addr, postData, tu.StatusOK(re)) + suite.NoError(err) + + rc.IsolationLevel = "zone" + rc3 := map[string]string{"isolation-level": "zone"} + postData, err = json.Marshal(rc3) + suite.NoError(err) + err = tu.CheckPostJSON(testDialClient, addr, postData, tu.StatusOK(re)) + suite.NoError(err) + + rc4 := &sc.ReplicationConfig{} + err = tu.ReadGetJSON(re, testDialClient, addr, rc4) + suite.NoError(err) + + suite.Equal(*rc4, *rc) 
+} + +func (suite *configTestSuite) TestConfigLabelProperty() { + env := tests.NewSchedulingTestEnvironment(suite.T()) + env.RunTestInTwoModes(suite.checkConfigLabelProperty) +} + +func (suite *configTestSuite) checkConfigLabelProperty(cluster *tests.TestCluster) { + re := suite.Require() + leaderServer := cluster.GetLeaderServer() + urlPrefix := leaderServer.GetAddr() + + addr := urlPrefix + "/pd/api/v1/config/label-property" + loadProperties := func() config.LabelPropertyConfig { + var cfg config.LabelPropertyConfig + err := tu.ReadGetJSON(re, testDialClient, addr, &cfg) + suite.NoError(err) + return cfg + } + + cfg := loadProperties() + suite.Empty(cfg) + + cmds := []string{ + `{"type": "foo", "action": "set", "label-key": "zone", "label-value": "cn1"}`, + `{"type": "foo", "action": "set", "label-key": "zone", "label-value": "cn2"}`, + `{"type": "bar", "action": "set", "label-key": "host", "label-value": "h1"}`, + } + for _, cmd := range cmds { + err := tu.CheckPostJSON(testDialClient, addr, []byte(cmd), tu.StatusOK(re)) + suite.NoError(err) + } + + cfg = loadProperties() + suite.Len(cfg, 2) + suite.Equal([]config.StoreLabel{ + {Key: "zone", Value: "cn1"}, + {Key: "zone", Value: "cn2"}, + }, cfg["foo"]) + suite.Equal([]config.StoreLabel{{Key: "host", Value: "h1"}}, cfg["bar"]) + + cmds = []string{ + `{"type": "foo", "action": "delete", "label-key": "zone", "label-value": "cn1"}`, + `{"type": "bar", "action": "delete", "label-key": "host", "label-value": "h1"}`, + } + for _, cmd := range cmds { + err := tu.CheckPostJSON(testDialClient, addr, []byte(cmd), tu.StatusOK(re)) + suite.NoError(err) + } + + cfg = loadProperties() + suite.Len(cfg, 1) + suite.Equal([]config.StoreLabel{{Key: "zone", Value: "cn2"}}, cfg["foo"]) +} + +func (suite *configTestSuite) TestConfigDefault() { + env := tests.NewSchedulingTestEnvironment(suite.T()) + env.RunTestInTwoModes(suite.checkConfigDefault) +} + +func (suite *configTestSuite) checkConfigDefault(cluster *tests.TestCluster) { + re := suite.Require() + leaderServer := cluster.GetLeaderServer() + urlPrefix := leaderServer.GetAddr() + + addr := urlPrefix + "/pd/api/v1/config" + + r := map[string]int{"max-replicas": 5} + postData, err := json.Marshal(r) + suite.NoError(err) + err = tu.CheckPostJSON(testDialClient, addr, postData, tu.StatusOK(re)) + suite.NoError(err) + l := map[string]interface{}{ + "location-labels": "zone,rack", + "region-schedule-limit": 10, + } + postData, err = json.Marshal(l) + suite.NoError(err) + err = tu.CheckPostJSON(testDialClient, addr, postData, tu.StatusOK(re)) + suite.NoError(err) + + l = map[string]interface{}{ + "metric-storage": "http://127.0.0.1:9090", + } + postData, err = json.Marshal(l) + suite.NoError(err) + err = tu.CheckPostJSON(testDialClient, addr, postData, tu.StatusOK(re)) + suite.NoError(err) + + addr = fmt.Sprintf("%s/pd/api/v1/config/default", urlPrefix) + defaultCfg := &config.Config{} + err = tu.ReadGetJSON(re, testDialClient, addr, defaultCfg) + suite.NoError(err) + + suite.Equal(uint64(3), defaultCfg.Replication.MaxReplicas) + suite.Equal(typeutil.StringSlice([]string{}), defaultCfg.Replication.LocationLabels) + suite.Equal(uint64(2048), defaultCfg.Schedule.RegionScheduleLimit) + suite.Equal("", defaultCfg.PDServerCfg.MetricStorage) +} + +func (suite *configTestSuite) TestConfigPDServer() { + env := tests.NewSchedulingTestEnvironment(suite.T()) + env.RunTestInTwoModes(suite.checkConfigPDServer) +} + +func (suite *configTestSuite) checkConfigPDServer(cluster *tests.TestCluster) { + re := suite.Require() + 
leaderServer := cluster.GetLeaderServer() + urlPrefix := leaderServer.GetAddr() + + addrPost := urlPrefix + "/pd/api/v1/config" + ms := map[string]interface{}{ + "metric-storage": "", + } + postData, err := json.Marshal(ms) + suite.NoError(err) + suite.NoError(tu.CheckPostJSON(testDialClient, addrPost, postData, tu.StatusOK(re))) + addrGet := fmt.Sprintf("%s/pd/api/v1/config/pd-server", urlPrefix) + sc := &config.PDServerConfig{} + suite.NoError(tu.ReadGetJSON(re, testDialClient, addrGet, sc)) + suite.Equal(bool(true), sc.UseRegionStorage) + suite.Equal("table", sc.KeyType) + suite.Equal(typeutil.StringSlice([]string{}), sc.RuntimeServices) + suite.Equal("", sc.MetricStorage) + suite.Equal("auto", sc.DashboardAddress) + suite.Equal(int(3), sc.FlowRoundByDigit) + suite.Equal(typeutil.NewDuration(time.Second), sc.MinResolvedTSPersistenceInterval) + suite.Equal(24*time.Hour, sc.MaxResetTSGap.Duration) +} + +var ttlConfig = map[string]interface{}{ + "schedule.max-snapshot-count": 999, + "schedule.enable-location-replacement": false, + "schedule.max-merge-region-size": 999, + "schedule.max-merge-region-keys": 999, + "schedule.scheduler-max-waiting-operator": 999, + "schedule.leader-schedule-limit": 999, + "schedule.region-schedule-limit": 999, + "schedule.hot-region-schedule-limit": 999, + "schedule.replica-schedule-limit": 999, + "schedule.merge-schedule-limit": 999, + "schedule.enable-tikv-split-region": false, +} + +var invalidTTLConfig = map[string]interface{}{ + "schedule.invalid-ttl-config": 0, +} + +type ttlConfigInterface interface { + GetMaxSnapshotCount() uint64 + IsLocationReplacementEnabled() bool + GetMaxMergeRegionSize() uint64 + GetMaxMergeRegionKeys() uint64 + GetSchedulerMaxWaitingOperator() uint64 + GetLeaderScheduleLimit() uint64 + GetRegionScheduleLimit() uint64 + GetHotRegionScheduleLimit() uint64 + GetReplicaScheduleLimit() uint64 + GetMergeScheduleLimit() uint64 + IsTikvRegionSplitEnabled() bool +} + +func (suite *configTestSuite) assertTTLConfig( + cluster *tests.TestCluster, + expectedEqual bool, +) { + equality := suite.Equal + if !expectedEqual { + equality = suite.NotEqual + } + checkfunc := func(options ttlConfigInterface) { + equality(uint64(999), options.GetMaxSnapshotCount()) + equality(false, options.IsLocationReplacementEnabled()) + equality(uint64(999), options.GetMaxMergeRegionSize()) + equality(uint64(999), options.GetMaxMergeRegionKeys()) + equality(uint64(999), options.GetSchedulerMaxWaitingOperator()) + equality(uint64(999), options.GetLeaderScheduleLimit()) + equality(uint64(999), options.GetRegionScheduleLimit()) + equality(uint64(999), options.GetHotRegionScheduleLimit()) + equality(uint64(999), options.GetReplicaScheduleLimit()) + equality(uint64(999), options.GetMergeScheduleLimit()) + equality(false, options.IsTikvRegionSplitEnabled()) + } + checkfunc(cluster.GetLeaderServer().GetServer().GetPersistOptions()) + if cluster.GetSchedulingPrimaryServer() != nil { + // wait for the scheduling primary server to be synced + options := cluster.GetSchedulingPrimaryServer().GetPersistConfig() + tu.Eventually(suite.Require(), func() bool { + if expectedEqual { + return uint64(999) == options.GetMaxSnapshotCount() + } + return uint64(999) != options.GetMaxSnapshotCount() + }) + checkfunc(options) + } +} + +func (suite *configTestSuite) assertTTLConfigItemEqaul( + cluster *tests.TestCluster, + item string, + expectedValue interface{}, +) { + checkfunc := func(options ttlConfigInterface) bool { + switch item { + case "max-merge-region-size": + return 
expectedValue.(uint64) == options.GetMaxMergeRegionSize() + case "max-merge-region-keys": + return expectedValue.(uint64) == options.GetMaxMergeRegionKeys() + case "enable-tikv-split-region": + return expectedValue.(bool) == options.IsTikvRegionSplitEnabled() + } + return false + } + suite.True(checkfunc(cluster.GetLeaderServer().GetServer().GetPersistOptions())) + if cluster.GetSchedulingPrimaryServer() != nil { + // wait for the scheduling primary server to be synced + tu.Eventually(suite.Require(), func() bool { + return checkfunc(cluster.GetSchedulingPrimaryServer().GetPersistConfig()) + }) + } +} + +func createTTLUrl(url string, ttl int) string { + return fmt.Sprintf("%s/pd/api/v1/config?ttlSecond=%d", url, ttl) +} + +func (suite *configTestSuite) TestConfigTTL() { + env := tests.NewSchedulingTestEnvironment(suite.T()) + // FIXME: enable this test in two modes after ttl config is supported. + env.RunTestInPDMode(suite.checkConfigTTL) +} + +func (suite *configTestSuite) checkConfigTTL(cluster *tests.TestCluster) { + re := suite.Require() + leaderServer := cluster.GetLeaderServer() + urlPrefix := leaderServer.GetAddr() + postData, err := json.Marshal(ttlConfig) + suite.NoError(err) + + // test no config and cleaning up + err = tu.CheckPostJSON(testDialClient, createTTLUrl(urlPrefix, 0), postData, tu.StatusOK(re)) + suite.NoError(err) + suite.assertTTLConfig(cluster, false) + + // test time goes by + err = tu.CheckPostJSON(testDialClient, createTTLUrl(urlPrefix, 1), postData, tu.StatusOK(re)) + suite.NoError(err) + suite.assertTTLConfig(cluster, true) + time.Sleep(2 * time.Second) + suite.assertTTLConfig(cluster, false) + + // test cleaning up + err = tu.CheckPostJSON(testDialClient, createTTLUrl(urlPrefix, 1), postData, tu.StatusOK(re)) + suite.NoError(err) + suite.assertTTLConfig(cluster, true) + err = tu.CheckPostJSON(testDialClient, createTTLUrl(urlPrefix, 0), postData, tu.StatusOK(re)) + suite.NoError(err) + suite.assertTTLConfig(cluster, false) + + postData, err = json.Marshal(invalidTTLConfig) + suite.NoError(err) + err = tu.CheckPostJSON(testDialClient, createTTLUrl(urlPrefix, 1), postData, + tu.StatusNotOK(re), tu.StringEqual(re, "\"unsupported ttl config schedule.invalid-ttl-config\"\n")) + suite.NoError(err) + + // only set max-merge-region-size + mergeConfig := map[string]interface{}{ + "schedule.max-merge-region-size": 999, + } + postData, err = json.Marshal(mergeConfig) + suite.NoError(err) + + err = tu.CheckPostJSON(testDialClient, createTTLUrl(urlPrefix, 1), postData, tu.StatusOK(re)) + suite.NoError(err) + suite.assertTTLConfigItemEqaul(cluster, "max-merge-region-size", uint64(999)) + // max-merge-region-keys should keep consistence with max-merge-region-size. + suite.assertTTLConfigItemEqaul(cluster, "max-merge-region-keys", uint64(999*10000)) + + // on invalid value, we use default config + mergeConfig = map[string]interface{}{ + "schedule.enable-tikv-split-region": "invalid", + } + postData, err = json.Marshal(mergeConfig) + suite.NoError(err) + err = tu.CheckPostJSON(testDialClient, createTTLUrl(urlPrefix, 10), postData, tu.StatusOK(re)) + suite.NoError(err) + suite.assertTTLConfigItemEqaul(cluster, "enable-tikv-split-region", true) +} + +func (suite *configTestSuite) TestTTLConflict() { + env := tests.NewSchedulingTestEnvironment(suite.T()) + // FIXME: enable this test in two modes after ttl config is supported. 
+ env.RunTestInPDMode(suite.checkTTLConflict) +} + +func (suite *configTestSuite) checkTTLConflict(cluster *tests.TestCluster) { + re := suite.Require() + leaderServer := cluster.GetLeaderServer() + urlPrefix := leaderServer.GetAddr() + addr := createTTLUrl(urlPrefix, 1) + postData, err := json.Marshal(ttlConfig) + suite.NoError(err) + err = tu.CheckPostJSON(testDialClient, addr, postData, tu.StatusOK(re)) + suite.NoError(err) + suite.assertTTLConfig(cluster, true) + + cfg := map[string]interface{}{"max-snapshot-count": 30} + postData, err = json.Marshal(cfg) + suite.NoError(err) + addr = fmt.Sprintf("%s/pd/api/v1/config", urlPrefix) + err = tu.CheckPostJSON(testDialClient, addr, postData, tu.StatusNotOK(re), tu.StringEqual(re, "\"need to clean up TTL first for schedule.max-snapshot-count\"\n")) + suite.NoError(err) + addr = fmt.Sprintf("%s/pd/api/v1/config/schedule", urlPrefix) + err = tu.CheckPostJSON(testDialClient, addr, postData, tu.StatusNotOK(re), tu.StringEqual(re, "\"need to clean up TTL first for schedule.max-snapshot-count\"\n")) + suite.NoError(err) + cfg = map[string]interface{}{"schedule.max-snapshot-count": 30} + postData, err = json.Marshal(cfg) + suite.NoError(err) + err = tu.CheckPostJSON(testDialClient, createTTLUrl(urlPrefix, 0), postData, tu.StatusOK(re)) + suite.NoError(err) + err = tu.CheckPostJSON(testDialClient, addr, postData, tu.StatusOK(re)) + suite.NoError(err) +} diff --git a/tools/pd-ctl/pdctl/command/gc_safepoint_command.go b/tools/pd-ctl/pdctl/command/gc_safepoint_command.go index 619cf5a928b..80c6328e955 100644 --- a/tools/pd-ctl/pdctl/command/gc_safepoint_command.go +++ b/tools/pd-ctl/pdctl/command/gc_safepoint_command.go @@ -15,9 +15,12 @@ package command import ( + "encoding/json" "net/http" + "sort" "github.com/spf13/cobra" + "github.com/tikv/pd/server/api" ) var ( @@ -52,7 +55,20 @@ func showSSPs(cmd *cobra.Command, args []string) { cmd.Printf("Failed to get service GC safepoint: %s\n", err) return } - cmd.Println(r) + var safepoint api.ListServiceGCSafepoint + if err := json.Unmarshal([]byte(r), &safepoint); err != nil { + cmd.Printf("Failed to unmarshal service GC safepoint: %s\n", err) + return + } + sort.Slice(safepoint.ServiceGCSafepoints, func(i, j int) bool { + return safepoint.ServiceGCSafepoints[i].SafePoint < safepoint.ServiceGCSafepoints[j].SafePoint + }) + data, err := json.MarshalIndent(safepoint, "", " ") + if err != nil { + cmd.Printf("Failed to marshal service GC safepoint: %s\n", err) + return + } + cmd.Println(string(data)) } func deleteSSP(cmd *cobra.Command, args []string) { diff --git a/tools/pd-ctl/pdctl/command/scheduler.go b/tools/pd-ctl/pdctl/command/scheduler.go index 4349735f06d..526ff2646dc 100644 --- a/tools/pd-ctl/pdctl/command/scheduler.go +++ b/tools/pd-ctl/pdctl/command/scheduler.go @@ -745,11 +745,17 @@ func showShuffleRegionSchedulerRolesCommandFunc(cmd *cobra.Command, args []strin if p == "show-roles" { p = cmd.Parent().Name() } - path := path.Join(schedulerConfigPrefix, p, "roles") - r, err := doRequest(cmd, path, http.MethodGet, http.Header{}) + url := path.Join(schedulerConfigPrefix, p, "list") + r, err := doRequest(cmd, url, http.MethodGet, http.Header{}) if err != nil { - cmd.Println(err) - return + // try to use old api + var err2 error + url := path.Join(schedulerConfigPrefix, p, "roles") + r, err2 = doRequest(cmd, url, http.MethodGet, http.Header{}) + if err2 != nil { + cmd.Println(err, err2) + return + } } cmd.Println(r) }
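// Illustrative sketch, not part of the change set above: the new
// assertSchedulerExists/assertNoScheduler helpers and the TTL-config checks
// all poll a PD HTTP endpoint via tu.Eventually until the scheduling service
// has caught up with the API server. Below is a minimal, standalone version
// of that poll-until-synced pattern using only the standard library. The
// eventually/listSchedulers helpers, the address, and the timeout values are
// assumptions for illustration only and are not pd's testutil API.
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

// eventually polls cond until it returns true or the timeout expires,
// sleeping for interval between attempts.
func eventually(cond func() bool, timeout, interval time.Duration) bool {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if cond() {
			return true
		}
		time.Sleep(interval)
	}
	return cond()
}

// listSchedulers reads a JSON array of scheduler names from url.
func listSchedulers(client *http.Client, url string) ([]string, error) {
	resp, err := client.Get(url)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	var schedulers []string
	if err := json.NewDecoder(resp.Body).Decode(&schedulers); err != nil {
		return nil, err
	}
	return schedulers, nil
}

func contains(list []string, target string) bool {
	for _, s := range list {
		if s == target {
			return true
		}
	}
	return false
}

func main() {
	client := &http.Client{Transport: &http.Transport{DisableKeepAlives: true}}
	// Placeholder address; the tests build this URL from the leader server's address.
	url := "http://127.0.0.1:2379/pd/api/v1/schedulers"
	synced := eventually(func() bool {
		schedulers, err := listSchedulers(client, url)
		return err == nil && contains(schedulers, "evict-leader-scheduler")
	}, 10*time.Second, 100*time.Millisecond)
	fmt.Println("scheduler synced:", synced)
}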