diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go
index 051856abf81..ef9c9a97db2 100644
--- a/pkg/mock/mockcluster/mockcluster.go
+++ b/pkg/mock/mockcluster/mockcluster.go
@@ -808,8 +808,8 @@ func (mc *Cluster) MockRegionInfo(regionID uint64, leaderStoreID uint64,
 	followerStoreIDs, learnerStoreIDs []uint64, epoch *metapb.RegionEpoch) *core.RegionInfo {
 	region := &metapb.Region{
 		Id:          regionID,
-		StartKey:    []byte(fmt.Sprintf("%20d", regionID)),
-		EndKey:      []byte(fmt.Sprintf("%20d", regionID+1)),
+		StartKey:    []byte(fmt.Sprintf("%20d0", regionID)),
+		EndKey:      []byte(fmt.Sprintf("%20d0", regionID+1)),
 		RegionEpoch: epoch,
 	}
 	var leader *metapb.Peer
diff --git a/pkg/schedule/config/config.go b/pkg/schedule/config/config.go
index a1fa70875c9..0f75d1e5cad 100644
--- a/pkg/schedule/config/config.go
+++ b/pkg/schedule/config/config.go
@@ -8,6 +8,7 @@ import (
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/tikv/pd/pkg/core/constant"
 	"github.com/tikv/pd/pkg/core/storelimit"
+	"github.com/tikv/pd/pkg/storage/endpoint"
 )

 // RejectLeader is the label property type that suggests a store should not
@@ -30,6 +31,10 @@ func IsSchedulerRegistered(name string) bool {
 // Config is the interface that wraps the Config related methods.
 type Config interface {
 	IsSchedulingHalted() bool
+	IsSchedulerDisabled(string) bool
+	AddSchedulerCfg(string, []string)
+	RemoveSchedulerCfg(string)
+	Persist(endpoint.ConfigStorage) error

 	GetReplicaScheduleLimit() uint64
 	GetRegionScheduleLimit() uint64
diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go
index 6467fcc29f3..cb36a367165 100644
--- a/pkg/schedule/coordinator.go
+++ b/pkg/schedule/coordinator.go
@@ -86,8 +86,8 @@ type Coordinator struct {
 // NewCoordinator creates a new Coordinator.
 func NewCoordinator(ctx context.Context, cluster sche.ClusterInformer, hbStreams *hbstream.HeartbeatStreams) *Coordinator {
 	ctx, cancel := context.WithCancel(ctx)
-	opController := operator.NewController(ctx, cluster.GetBasicCluster(), cluster.GetPersistOptions(), hbStreams)
-	schedulers := schedulers.NewController(ctx, cluster, opController)
+	opController := operator.NewController(ctx, cluster.GetBasicCluster(), cluster.GetOpts(), hbStreams)
+	schedulers := schedulers.NewController(ctx, cluster, cluster.GetStorage(), opController)
 	c := &Coordinator{
 		ctx:    ctx,
 		cancel: cancel,
@@ -101,7 +101,7 @@ func NewCoordinator(ctx context.Context, cluster sche.ClusterInformer, hbStreams
 		hbStreams:       hbStreams,
 		pluginInterface: NewPluginInterface(),
 	}
-	c.diagnosticManager = diagnostic.NewManager(schedulers, cluster.GetPersistOptions())
+	c.diagnosticManager = diagnostic.NewManager(schedulers, cluster.GetOpts())
 	return c
 }

@@ -316,7 +316,7 @@ func (c *Coordinator) RunUntilStop() {
 	c.Run()
 	<-c.ctx.Done()
 	log.Info("Coordinator is stopping")
-	c.GetSchedulersController().GetWaitGroup().Wait()
+	c.GetSchedulersController().Wait()
 	c.wg.Wait()
 	log.Info("Coordinator has been stopped")
 }
diff --git a/pkg/schedule/diagnostic/diagnostic_manager.go b/pkg/schedule/diagnostic/diagnostic_manager.go
index 70b96d1bd15..8e9546aa290 100644
--- a/pkg/schedule/diagnostic/diagnostic_manager.go
+++ b/pkg/schedule/diagnostic/diagnostic_manager.go
@@ -18,18 +18,18 @@ import (
 	"time"

 	"github.com/tikv/pd/pkg/errs"
+	"github.com/tikv/pd/pkg/schedule/config"
 	"github.com/tikv/pd/pkg/schedule/schedulers"
-	"github.com/tikv/pd/server/config"
 )

 // Manager is used to manage the diagnostic result of schedulers for now.
 type Manager struct {
-	config              *config.PersistOptions
+	config              config.Config
 	schedulerController *schedulers.Controller
 }

 // NewManager creates a new Manager.
-func NewManager(schedulerController *schedulers.Controller, config *config.PersistOptions) *Manager {
+func NewManager(schedulerController *schedulers.Controller, config config.Config) *Manager {
 	return &Manager{
 		config:              config,
 		schedulerController: schedulerController,
@@ -48,15 +48,7 @@ func (d *Manager) GetDiagnosticResult(name string) (*schedulers.DiagnosticResult
 		res := &schedulers.DiagnosticResult{Name: name, Timestamp: ts, Status: schedulers.Disabled}
 		return res, nil
 	}
-	var isDisabled bool
-	t := scheduler.Scheduler.GetType()
-	scheduleConfig := d.config.GetScheduleConfig()
-	for _, s := range scheduleConfig.Schedulers {
-		if t == s.Type {
-			isDisabled = s.Disable
-			break
-		}
-	}
+	isDisabled := d.config.IsSchedulerDisabled(scheduler.Scheduler.GetType())
 	if isDisabled {
 		ts := uint64(time.Now().Unix())
 		res := &schedulers.DiagnosticResult{Name: name, Timestamp: ts, Status: schedulers.Disabled}
diff --git a/pkg/schedule/schedulers/hot_region.go b/pkg/schedule/schedulers/hot_region.go
index 4d701e7e7ca..7984aa15698 100644
--- a/pkg/schedule/schedulers/hot_region.go
+++ b/pkg/schedule/schedulers/hot_region.go
@@ -58,10 +58,12 @@ var (
 	hotSchedulerSnapshotSenderLimitCounter = schedulerCounter.WithLabelValues(HotRegionName, "snapshot_sender_limit")

 	// counter related with the split region
-	hotSchedulerNotFoundSplitKeysCounter       = schedulerCounter.WithLabelValues(HotRegionName, "not_found_split_keys")
-	hotSchedulerRegionBucketsNotHotCounter     = schedulerCounter.WithLabelValues(HotRegionName, "region_buckets_not_hot")
-	hotSchedulerSplitSuccessCounter            = schedulerCounter.WithLabelValues(HotRegionName, "split_success")
-	hotSchedulerNeedSplitBeforeScheduleCounter = schedulerCounter.WithLabelValues(HotRegionName, "need_split_before_move_peer")
+	hotSchedulerNotFoundSplitKeysCounter          = schedulerCounter.WithLabelValues(HotRegionName, "not_found_split_keys")
+	hotSchedulerRegionBucketsNotHotCounter        = schedulerCounter.WithLabelValues(HotRegionName, "region_buckets_not_hot")
+	hotSchedulerRegionBucketsSingleHotSpotCounter = schedulerCounter.WithLabelValues(HotRegionName, "region_buckets_single_hot_spot")
+	hotSchedulerSplitSuccessCounter               = schedulerCounter.WithLabelValues(HotRegionName, "split_success")
+	hotSchedulerNeedSplitBeforeScheduleCounter    = schedulerCounter.WithLabelValues(HotRegionName, "need_split_before_move_peer")
+	hotSchedulerRegionTooHotNeedSplitCounter      = schedulerCounter.WithLabelValues(HotRegionName, "region_is_too_hot_need_split")

 	hotSchedulerMoveLeaderCounter = schedulerCounter.WithLabelValues(HotRegionName, moveLeader.String())
 	hotSchedulerMovePeerCounter   = schedulerCounter.WithLabelValues(HotRegionName, movePeer.String())
@@ -159,21 +161,23 @@ func (h *baseHotScheduler) prepareForBalance(rw statistics.RWType, cluster sche.
 // It makes each dim rate or count become `weight` times to the origin value.
 func (h *baseHotScheduler) summaryPendingInfluence() {
 	for id, p := range h.regionPendings {
-		from := h.stInfos[p.from]
-		to := h.stInfos[p.to]
-		maxZombieDur := p.maxZombieDuration
-		weight, needGC := calcPendingInfluence(p.op, maxZombieDur)
-
-		if needGC {
-			delete(h.regionPendings, id)
-			continue
-		}
+		for _, from := range p.froms {
+			from := h.stInfos[from]
+			to := h.stInfos[p.to]
+			maxZombieDur := p.maxZombieDuration
+			weight, needGC := calcPendingInfluence(p.op, maxZombieDur)
+
+			if needGC {
+				delete(h.regionPendings, id)
+				continue
+			}

-		if from != nil && weight > 0 {
-			from.AddInfluence(&p.origin, -weight)
-		}
-		if to != nil && weight > 0 {
-			to.AddInfluence(&p.origin, weight)
+			if from != nil && weight > 0 {
+				from.AddInfluence(&p.origin, -weight)
+			}
+			if to != nil && weight > 0 {
+				to.AddInfluence(&p.origin, weight)
+			}
 		}
 	}
 	for storeID, info := range h.stInfos {
@@ -214,7 +218,8 @@ var (
 	// as it implies that this dimension is sufficiently uniform.
 	stddevThreshold = 0.1

-	splitBucket = "split-hot-region"
+	splitBucket          = "split-hot-region"
+	splitProgressiveRank = int64(-5)
 )

 type hotScheduler struct {
@@ -294,7 +299,7 @@ func (h *hotScheduler) dispatch(typ statistics.RWType, cluster sche.ScheduleClus
 	return nil
 }

-func (h *hotScheduler) tryAddPendingInfluence(op *operator.Operator, srcStore, dstStore uint64, infl statistics.Influence, maxZombieDur time.Duration) bool {
+func (h *hotScheduler) tryAddPendingInfluence(op *operator.Operator, srcStore []uint64, dstStore uint64, infl statistics.Influence, maxZombieDur time.Duration) bool {
 	regionID := op.RegionID()
 	_, ok := h.regionPendings[regionID]
 	if ok {
@@ -651,6 +656,7 @@ func (bs *balanceSolver) solve() []*operator.Operator {
 		}
 	}
 	snapshotFilter := filter.NewSnapshotSendFilter(bs.GetStores(), constant.Medium)
+	splitThresholds := bs.sche.conf.getSplitThresholds()
 	for _, srcStore := range bs.filterSrcStores() {
 		bs.cur.srcStore = srcStore
 		srcStoreID := srcStore.GetID()
@@ -664,6 +670,16 @@ func (bs *balanceSolver) solve() []*operator.Operator {
 			}
 		}
 		bs.cur.mainPeerStat = mainPeerStat
+		if tooHotNeedSplit(srcStore, mainPeerStat, splitThresholds) && bs.GetStoreConfig().IsEnableRegionBucket() {
+			hotSchedulerRegionTooHotNeedSplitCounter.Inc()
+			ops := bs.createSplitOperator([]*core.RegionInfo{bs.cur.region}, true /*too hot need to split*/)
+			if len(ops) > 0 {
+				bs.ops = ops
+				bs.cur.calcPeersRate(bs.firstPriority, bs.secondPriority)
+				bs.best = bs.cur
+				return ops
+			}
+		}

 		for _, dstStore := range bs.filterDstStores() {
 			bs.cur.dstStore = dstStore
@@ -723,7 +739,8 @@ func (bs *balanceSolver) tryAddPendingInfluence() bool {
 	if bs.best == nil || len(bs.ops) == 0 {
 		return false
 	}
-	if bs.best.srcStore.IsTiFlash() != bs.best.dstStore.IsTiFlash() {
+	isSplit := bs.ops[0].Kind() == operator.OpSplit
+	if !isSplit && bs.best.srcStore.IsTiFlash() != bs.best.dstStore.IsTiFlash() {
 		hotSchedulerNotSameEngineCounter.Inc()
 		return false
 	}
@@ -731,16 +748,29 @@ func (bs *balanceSolver) tryAddPendingInfluence() bool {

 	// TODO: Process operators atomically.
 	// main peer
-	srcStoreID := bs.best.srcStore.GetID()
-	dstStoreID := bs.best.dstStore.GetID()
+
+	srcStoreIDs := make([]uint64, 0)
+	dstStoreID := uint64(0)
+	if isSplit {
+		region := bs.GetRegion(bs.ops[0].RegionID())
+		for id := range region.GetStoreIDs() {
+			srcStoreIDs = append(srcStoreIDs, id)
+		}
+	} else {
+		srcStoreIDs = append(srcStoreIDs, bs.best.srcStore.GetID())
+		dstStoreID = bs.best.dstStore.GetID()
+	}
 	infl := bs.collectPendingInfluence(bs.best.mainPeerStat)
-	if !bs.sche.tryAddPendingInfluence(bs.ops[0], srcStoreID, dstStoreID, infl, maxZombieDur) {
+	if !bs.sche.tryAddPendingInfluence(bs.ops[0], srcStoreIDs, dstStoreID, infl, maxZombieDur) {
 		return false
 	}
+	if isSplit {
+		return true
+	}
 	// revert peers
 	if bs.best.revertPeerStat != nil && len(bs.ops) > 1 {
 		infl := bs.collectPendingInfluence(bs.best.revertPeerStat)
-		if !bs.sche.tryAddPendingInfluence(bs.ops[1], dstStoreID, srcStoreID, infl, maxZombieDur) {
+		if !bs.sche.tryAddPendingInfluence(bs.ops[1], srcStoreIDs, dstStoreID, infl, maxZombieDur) {
 			return false
 		}
 	}
@@ -1243,7 +1273,7 @@ func (bs *balanceSolver) getMinRate(dim int) float64 {

 // betterThan checks if `bs.cur` is a better solution than `old`.
 func (bs *balanceSolver) betterThanV1(old *solution) bool {
-	if old == nil {
+	if old == nil || bs.cur.progressiveRank <= splitProgressiveRank {
 		return true
 	}
 	if bs.cur.progressiveRank != old.progressiveRank {
@@ -1435,7 +1465,7 @@ func (bs *balanceSolver) buildOperators() (ops []*operator.Operator) {
 		}
 	}
 	if len(splitRegions) > 0 {
-		return bs.createSplitOperator(splitRegions)
+		return bs.createSplitOperator(splitRegions, false /* region is too big need split before move */)
 	}

 	srcStoreID := bs.cur.srcStore.GetID()
@@ -1475,7 +1505,8 @@ func (bs *balanceSolver) buildOperators() (ops []*operator.Operator) {
 }

 // createSplitOperator creates split operators for the given regions.
-func (bs *balanceSolver) createSplitOperator(regions []*core.RegionInfo) []*operator.Operator {
+// isTooHot being true indicates that the region is too hot and needs to be split.
+func (bs *balanceSolver) createSplitOperator(regions []*core.RegionInfo, isTooHot bool) []*operator.Operator {
 	if len(regions) == 0 {
 		return nil
 	}
@@ -1492,6 +1523,11 @@ func (bs *balanceSolver) createSplitOperator(regions []*core.RegionInfo) []*oper
 			hotSchedulerRegionBucketsNotHotCounter.Inc()
 			return
 		}
+		// skip if only one hot bucket exists in this region.
+		if len(stats) <= 1 && isTooHot {
+			hotSchedulerRegionBucketsSingleHotSpotCounter.Inc()
+			return
+		}
 		startKey, endKey := region.GetStartKey(), region.GetEndKey()
 		splitKey := make([][]byte, 0)
 		for _, stat := range stats {
@@ -1505,7 +1541,13 @@ func (bs *balanceSolver) createSplitOperator(regions []*core.RegionInfo) []*oper
 			// Otherwise, we should append the current start key and end key.
 			// E.g. [a, b), [c, d) -> [a, b), [c, d) split keys is [a,b,c,d]
 			if bytes.Equal(stat.StartKey, splitKey[len(splitKey)-1]) {
-				splitKey[len(splitKey)-1] = stat.EndKey
+				// If the region is too hot, we should split all the buckets to balance the load.
+				// Otherwise, we should only split the buckets that are too hot.
+				if isTooHot {
+					splitKey = append(splitKey, stat.EndKey)
+				} else {
+					splitKey[len(splitKey)-1] = stat.EndKey
+				}
 			} else {
 				splitKey = append(splitKey, stat.StartKey, stat.EndKey)
 			}
@@ -1529,6 +1571,10 @@ func (bs *balanceSolver) createSplitOperator(regions []*core.RegionInfo) []*oper
 	for _, region := range regions {
 		createFunc(region)
 	}
+	// The split bucket's priority is the highest.
+	if len(operators) > 0 {
+		bs.cur.progressiveRank = splitProgressiveRank
+	}
 	return operators
 }

@@ -1789,3 +1835,10 @@ func dimToString(dim int) string {
 func prioritiesToDim(priorities []string) (firstPriority int, secondPriority int) {
 	return stringToDim(priorities[0]), stringToDim(priorities[1])
 }
+
+// tooHotNeedSplit returns true if any dim of the hot region is greater than the store threshold.
+func tooHotNeedSplit(store *statistics.StoreLoadDetail, region *statistics.HotPeerStat, splitThresholds float64) bool {
+	return slice.AnyOf(store.LoadPred.Current.Loads, func(i int) bool {
+		return region.Loads[i] > store.LoadPred.Current.Loads[i]*splitThresholds
+	})
+}
diff --git a/pkg/schedule/schedulers/hot_region_config.go b/pkg/schedule/schedulers/hot_region_config.go
index c50f18d7c68..ffbe805e0bf 100644
--- a/pkg/schedule/schedulers/hot_region_config.go
+++ b/pkg/schedule/schedulers/hot_region_config.go
@@ -75,6 +75,7 @@ func initHotRegionScheduleConfig() *hotRegionSchedulerConfig {
 		EnableForTiFlash:   true,
 		RankFormulaVersion: "v2",
 		ForbidRWType:       "none",
+		SplitThresholds:    0.2,
 	}
 	cfg.applyPrioritiesConfig(defaultPrioritiesConfig)
 	return cfg
@@ -102,6 +103,7 @@ func (conf *hotRegionSchedulerConfig) getValidConf() *hotRegionSchedulerConfig {
 		EnableForTiFlash:   conf.EnableForTiFlash,
 		RankFormulaVersion: conf.getRankFormulaVersionLocked(),
 		ForbidRWType:       conf.getForbidRWTypeLocked(),
+		SplitThresholds:    conf.SplitThresholds,
 	}
 }

@@ -143,6 +145,8 @@ type hotRegionSchedulerConfig struct {
 	RankFormulaVersion string `json:"rank-formula-version"`
 	// forbid read or write scheduler, only for test
 	ForbidRWType string `json:"forbid-rw-type,omitempty"`
+	// SplitThresholds is the threshold to split a hot region if the flow of one hot region exceeds it.
+	SplitThresholds float64 `json:"split-thresholds"`
 }

 func (conf *hotRegionSchedulerConfig) EncodeConfig() ([]byte, error) {
@@ -316,6 +320,12 @@ func (conf *hotRegionSchedulerConfig) IsForbidRWType(rw statistics.RWType) bool
 	return rw.String() == conf.ForbidRWType
 }

+func (conf *hotRegionSchedulerConfig) getSplitThresholds() float64 {
+	conf.RLock()
+	defer conf.RUnlock()
+	return conf.SplitThresholds
+}
+
 func (conf *hotRegionSchedulerConfig) getForbidRWTypeLocked() string {
 	switch conf.ForbidRWType {
 	case statistics.Read.String(), statistics.Write.String():
@@ -377,6 +387,9 @@ func (conf *hotRegionSchedulerConfig) valid() error {
 		conf.ForbidRWType != "none" && conf.ForbidRWType != "" {
 		return errs.ErrSchedulerConfig.FastGenByArgs("invalid forbid-rw-type")
 	}
+	if conf.SplitThresholds < 0.01 || conf.SplitThresholds > 1.0 {
+		return errs.ErrSchedulerConfig.FastGenByArgs("invalid split-thresholds, should be in range [0.01, 1.0]")
+	}
 	return nil
 }

diff --git a/pkg/schedule/schedulers/hot_region_test.go b/pkg/schedule/schedulers/hot_region_test.go
index e4cf6b121f8..b61471ca79f 100644
--- a/pkg/schedule/schedulers/hot_region_test.go
+++ b/pkg/schedule/schedulers/hot_region_test.go
@@ -16,6 +16,7 @@ package schedulers

 import (
 	"encoding/hex"
+	"fmt"
 	"math"
 	"testing"
 	"time"
@@ -141,7 +142,7 @@ func checkGCPendingOpInfos(re *require.Assertions, enablePlacementRules bool) {
 			op.Start()
 			op.SetStatusReachTime(operator.CREATED, time.Now().Add(-5*statistics.StoreHeartBeatReportInterval*time.Second))
 			op.SetStatusReachTime(operator.STARTED, time.Now().Add((-5*statistics.StoreHeartBeatReportInterval+1)*time.Second))
-			return newPendingInfluence(op, 2, 4, statistics.Influence{}, hb.conf.GetStoreStatZombieDuration())
+			return newPendingInfluence(op, []uint64{2}, 4, statistics.Influence{}, hb.conf.GetStoreStatZombieDuration())
 		}
 		justDoneOpInfluence := func(region *core.RegionInfo, ty opType) *pendingInfluence {
 			infl := notDoneOpInfluence(region, ty)
@@ -202,6 +203,69 @@ func TestHotWriteRegionScheduleByteRateOnly(t *testing.T) {
 	checkHotWriteRegionScheduleByteRateOnly(re, true /* enable placement rules */)
 }

+func TestSplitIfRegionTooHot(t *testing.T) {
+	re := require.New(t)
+	statistics.Denoising = false
+	cancel, _, tc, oc := prepareSchedulersTest()
+	defer cancel()
+	tc.SetHotRegionCacheHitsThreshold(1)
+	hb, err := CreateScheduler(statistics.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
+	re.NoError(err)
+	b := &metapb.Buckets{
+		RegionId:   1,
+		PeriodInMs: 1000,
+		Keys: [][]byte{
+			[]byte(fmt.Sprintf("%21d", 11)),
+			[]byte(fmt.Sprintf("%21d", 12)),
+			[]byte(fmt.Sprintf("%21d", 13)),
+		},
+		Stats: &metapb.BucketStats{
+			ReadBytes:  []uint64{10 * units.KiB, 11 * units.KiB},
+			ReadKeys:   []uint64{256, 256},
+			ReadQps:    []uint64{0, 0},
+			WriteBytes: []uint64{0, 0},
+			WriteQps:   []uint64{0, 0},
+			WriteKeys:  []uint64{0, 0},
+		},
+	}
+
+	task := buckets.NewCheckPeerTask(b)
+	re.True(tc.HotBucketCache.CheckAsync(task))
+	time.Sleep(time.Millisecond * 10)
+
+	tc.AddRegionStore(1, 3)
+	tc.AddRegionStore(2, 2)
+	tc.AddRegionStore(3, 2)
+
+	tc.UpdateStorageReadBytes(1, 6*units.MiB*statistics.StoreHeartBeatReportInterval)
+	tc.UpdateStorageReadBytes(2, 1*units.MiB*statistics.StoreHeartBeatReportInterval)
+	tc.UpdateStorageReadBytes(3, 1*units.MiB*statistics.StoreHeartBeatReportInterval)
+	// Region 1 is a hot region.
+	addRegionInfo(tc, statistics.Read, []testRegionInfo{
+		{1, []uint64{1, 2, 3}, 4 * units.MiB, 0, 0},
+	})
+	tc.GetStoreConfig().SetRegionBucketEnabled(true)
+	ops, _ := hb.Schedule(tc, false)
+	re.Len(ops, 1)
+	re.Equal(operator.OpSplit, ops[0].Kind())
+	ops, _ = hb.Schedule(tc, false)
+	re.Len(ops, 0)
+
+	tc.UpdateStorageWrittenBytes(1, 6*units.MiB*statistics.StoreHeartBeatReportInterval)
+	tc.UpdateStorageWrittenBytes(2, 1*units.MiB*statistics.StoreHeartBeatReportInterval)
+	tc.UpdateStorageWrittenBytes(3, 1*units.MiB*statistics.StoreHeartBeatReportInterval)
+	// Region 1 is a hot region.
+	addRegionInfo(tc, statistics.Write, []testRegionInfo{
+		{1, []uint64{1, 2, 3}, 4 * units.MiB, 0, 0},
+	})
+	hb, _ = CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
+	ops, _ = hb.Schedule(tc, false)
+	re.Len(ops, 1)
+	re.Equal(operator.OpSplit, ops[0].Kind())
+	ops, _ = hb.Schedule(tc, false)
+	re.Len(ops, 0)
+}
+
 func TestSplitBuckets(t *testing.T) {
 	re := require.New(t)
 	statistics.Denoising = false
@@ -211,6 +275,7 @@ func TestSplitBuckets(t *testing.T) {
 	hb, err := CreateScheduler(statistics.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
 	re.NoError(err)
 	solve := newBalanceSolver(hb.(*hotScheduler), tc, statistics.Read, transferLeader)
+	solve.cur = &solution{}
 	region := core.NewTestRegionInfo(1, 1, []byte(""), []byte(""))

 	// the hot range is [a,c],[e,f]
@@ -231,14 +296,23 @@ func TestSplitBuckets(t *testing.T) {
 	task := buckets.NewCheckPeerTask(b)
 	re.True(tc.HotBucketCache.CheckAsync(task))
 	time.Sleep(time.Millisecond * 10)
-	ops := solve.createSplitOperator([]*core.RegionInfo{region})
+	ops := solve.createSplitOperator([]*core.RegionInfo{region}, false)
 	re.Equal(1, len(ops))
 	op := ops[0]
 	re.Equal(splitBucket, op.Desc())
 	expectKeys := [][]byte{[]byte("a"), []byte("c"), []byte("d"), []byte("f")}
 	expectOp, err := operator.CreateSplitRegionOperator(splitBucket, region, operator.OpSplit, pdpb.CheckPolicy_USEKEY, expectKeys)
 	re.NoError(err)
-	expectOp.GetCreateTime()
+	re.Equal(expectOp.Brief(), op.Brief())
+	re.Equal(expectOp.GetAdditionalInfo(), op.GetAdditionalInfo())
+
+	ops = solve.createSplitOperator([]*core.RegionInfo{region}, true)
+	re.Equal(1, len(ops))
+	op = ops[0]
+	re.Equal(splitBucket, op.Desc())
+	expectKeys = [][]byte{[]byte("a"), []byte("b"), []byte("c"), []byte("d"), []byte("e"), []byte("f")}
+	expectOp, err = operator.CreateSplitRegionOperator(splitBucket, region, operator.OpSplit, pdpb.CheckPolicy_USEKEY, expectKeys)
+	re.NoError(err)
 	re.Equal(expectOp.Brief(), op.Brief())
 	re.Equal(expectOp.GetAdditionalInfo(), op.GetAdditionalInfo())
 }
@@ -2414,6 +2488,14 @@ func TestConfigValidation(t *testing.T) {
 	hc.ForbidRWType = "test"
 	err = hc.valid()
 	re.Error(err)
+
+	hc.SplitThresholds = 0
+	err = hc.valid()
+	re.Error(err)
+
+	hc.SplitThresholds = 1.1
+	err = hc.valid()
+	re.Error(err)
 }

 type maxZombieDurTestCase struct {
diff --git a/pkg/schedule/schedulers/hot_region_v2.go b/pkg/schedule/schedulers/hot_region_v2.go
index 2b7e96af0fb..7b296446de2 100644
--- a/pkg/schedule/schedulers/hot_region_v2.go
+++ b/pkg/schedule/schedulers/hot_region_v2.go
@@ -292,7 +292,7 @@ func (bs *balanceSolver) getScoreByPriorities(dim int, rs *rankV2Ratios) int {

 // betterThan checks if `bs.cur` is a better solution than `old`.
 func (bs *balanceSolver) betterThanV2(old *solution) bool {
-	if old == nil {
+	if old == nil || bs.cur.progressiveRank <= splitProgressiveRank {
 		return true
 	}
 	if bs.cur.progressiveRank != old.progressiveRank {
diff --git a/pkg/schedule/schedulers/scheduler_controller.go b/pkg/schedule/schedulers/scheduler_controller.go
index acb760b50fa..ec6b07196b5 100644
--- a/pkg/schedule/schedulers/scheduler_controller.go
+++ b/pkg/schedule/schedulers/scheduler_controller.go
@@ -28,9 +28,8 @@ import (
 	"github.com/tikv/pd/pkg/schedule/labeler"
 	"github.com/tikv/pd/pkg/schedule/operator"
 	"github.com/tikv/pd/pkg/schedule/plan"
-	"github.com/tikv/pd/pkg/storage"
+	"github.com/tikv/pd/pkg/storage/endpoint"
 	"github.com/tikv/pd/pkg/utils/logutil"
-	"github.com/tikv/pd/server/config"
 	"go.uber.org/zap"
 )

@@ -43,24 +42,26 @@ type Controller struct {
 	sync.RWMutex
 	wg           sync.WaitGroup
 	ctx          context.Context
-	cluster      sche.ClusterInformer
+	cluster      sche.ScheduleCluster
+	storage      endpoint.ConfigStorage
 	schedulers   map[string]*ScheduleController
 	opController *operator.Controller
 }

 // NewController creates a scheduler controller.
-func NewController(ctx context.Context, cluster sche.ClusterInformer, opController *operator.Controller) *Controller {
+func NewController(ctx context.Context, cluster sche.ScheduleCluster, storage endpoint.ConfigStorage, opController *operator.Controller) *Controller {
 	return &Controller{
 		ctx:          ctx,
 		cluster:      cluster,
+		storage:      storage,
 		schedulers:   make(map[string]*ScheduleController),
 		opController: opController,
 	}
 }

-// GetWaitGroup returns the waitGroup of the controller.
-func (c *Controller) GetWaitGroup() *sync.WaitGroup {
-	return &c.wg
+// Wait waits on all schedulers to exit.
+func (c *Controller) Wait() {
+	c.wg.Wait()
 }

 // GetScheduler returns a schedule controller by name.
@@ -108,7 +109,7 @@ func (c *Controller) CollectSchedulerMetrics() {
 }

 func (c *Controller) isSchedulingHalted() bool {
-	return c.cluster.GetPersistOptions().IsSchedulingHalted()
+	return c.cluster.GetOpts().IsSchedulingHalted()
 }

 // ResetSchedulerMetrics resets metrics of all schedulers.
@@ -133,7 +134,7 @@ func (c *Controller) AddScheduler(scheduler Scheduler, args ...string) error {
 	c.wg.Add(1)
 	go c.runScheduler(s)
 	c.schedulers[s.Scheduler.GetName()] = s
-	c.cluster.GetPersistOptions().AddSchedulerCfg(s.Scheduler.GetType(), args)
+	c.cluster.GetOpts().AddSchedulerCfg(s.Scheduler.GetType(), args)
 	return nil
 }

@@ -149,18 +150,14 @@ func (c *Controller) RemoveScheduler(name string) error {
 		return errs.ErrSchedulerNotFound.FastGenByArgs()
 	}

-	opt := c.cluster.GetPersistOptions()
-	if err := c.removeOptScheduler(opt, name); err != nil {
-		log.Error("can not remove scheduler", zap.String("scheduler-name", name), errs.ZapError(err))
-		return err
-	}
-
-	if err := opt.Persist(c.cluster.GetStorage()); err != nil {
+	opt := c.cluster.GetOpts()
+	opt.RemoveSchedulerCfg(s.Scheduler.GetType())
+	if err := opt.Persist(c.storage); err != nil {
 		log.Error("the option can not persist scheduler config", errs.ZapError(err))
 		return err
 	}

-	if err := c.cluster.GetStorage().RemoveScheduleConfig(name); err != nil {
+	if err := c.storage.RemoveScheduleConfig(name); err != nil {
 		log.Error("can not remove the scheduler config", errs.ZapError(err))
 		return err
 	}
@@ -172,29 +169,6 @@ func (c *Controller) RemoveScheduler(name string) error {
 	return nil
 }

-func (c *Controller) removeOptScheduler(o *config.PersistOptions, name string) error {
-	v := o.GetScheduleConfig().Clone()
-	for i, schedulerCfg := range v.Schedulers {
-		// To create a temporary scheduler is just used to get scheduler's name
-		decoder := ConfigSliceDecoder(schedulerCfg.Type, schedulerCfg.Args)
-		tmp, err := CreateScheduler(schedulerCfg.Type, c.opController, storage.NewStorageWithMemoryBackend(), decoder, c.RemoveScheduler)
-		if err != nil {
-			return err
-		}
-		if tmp.GetName() == name {
-			if config.IsDefaultScheduler(tmp.GetType()) {
-				schedulerCfg.Disable = true
-				v.Schedulers[i] = schedulerCfg
-			} else {
-				v.Schedulers = append(v.Schedulers[:i], v.Schedulers[i+1:]...)
-			}
-			o.SetScheduleConfig(v)
-			return nil
-		}
-	}
-	return nil
-}
-
 // PauseOrResumeScheduler pauses or resumes a scheduler by name.
 func (c *Controller) PauseOrResumeScheduler(name string, t int64) error {
 	c.Lock()
@@ -265,14 +239,7 @@ func (c *Controller) IsSchedulerDisabled(name string) (bool, error) {
 	if !ok {
 		return false, errs.ErrSchedulerNotFound.FastGenByArgs()
 	}
-	t := s.Scheduler.GetType()
-	scheduleConfig := c.cluster.GetPersistOptions().GetScheduleConfig()
-	for _, s := range scheduleConfig.Schedulers {
-		if t == s.Type {
-			return s.Disable, nil
-		}
-	}
-	return false, nil
+	return c.cluster.GetOpts().IsSchedulerDisabled(s.Scheduler.GetType()), nil
 }

 // IsSchedulerExisted returns whether a scheduler is existed.
diff --git a/pkg/schedule/schedulers/utils.go b/pkg/schedule/schedulers/utils.go
index afeb28044c7..dabef045279 100644
--- a/pkg/schedule/schedulers/utils.go
+++ b/pkg/schedule/schedulers/utils.go
@@ -237,15 +237,16 @@ func getKeyRanges(args []string) ([]core.KeyRange, error) {

 type pendingInfluence struct {
 	op                *operator.Operator
-	from, to          uint64
+	froms             []uint64
+	to                uint64
 	origin            statistics.Influence
 	maxZombieDuration time.Duration
 }

-func newPendingInfluence(op *operator.Operator, from, to uint64, infl statistics.Influence, maxZombieDur time.Duration) *pendingInfluence {
+func newPendingInfluence(op *operator.Operator, froms []uint64, to uint64, infl statistics.Influence, maxZombieDur time.Duration) *pendingInfluence {
 	return &pendingInfluence{
 		op:                op,
-		from:              from,
+		froms:             froms,
 		to:                to,
 		origin:            infl,
 		maxZombieDuration: maxZombieDur,
diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go
index e6f2cf9e2c2..35e2ba9fa9a 100644
--- a/server/cluster/cluster_test.go
+++ b/server/cluster/cluster_test.go
@@ -2440,7 +2440,7 @@ func prepare(setCfg func(*config.ScheduleConfig), setTc func(*testCluster), run
 	}
 	return tc, co, func() {
 		co.Stop()
-		co.GetSchedulersController().GetWaitGroup().Wait()
+		co.GetSchedulersController().Wait()
 		co.GetWaitGroup().Wait()
 		hbStreams.Close()
 		cancel()
@@ -2723,7 +2723,7 @@ func TestCheckCache(t *testing.T) {
 	re.Len(oc.GetOperators(), 1)
 	re.Empty(co.GetCheckerController().GetWaitingRegions())

-	co.GetSchedulersController().GetWaitGroup().Wait()
+	co.GetSchedulersController().Wait()
 	co.GetWaitGroup().Wait()
 	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/break-patrol"))
 }
@@ -2976,7 +2976,7 @@ func TestPersistScheduler(t *testing.T) {
 	re.Len(sc.GetSchedulerNames(), defaultCount-3)
 	re.NoError(co.GetCluster().GetPersistOptions().Persist(storage))
 	co.Stop()
-	co.GetSchedulersController().GetWaitGroup().Wait()
+	co.GetSchedulersController().Wait()
 	co.GetWaitGroup().Wait()
 	// make a new coordinator for testing
 	// whether the schedulers added or removed in dynamic way are recorded in opt
@@ -3006,7 +3006,7 @@ func TestPersistScheduler(t *testing.T) {
 	sc = co.GetSchedulersController()
 	re.Len(sc.GetSchedulerNames(), 3)
 	co.Stop()
-	co.GetSchedulersController().GetWaitGroup().Wait()
+	co.GetSchedulersController().Wait()
 	co.GetWaitGroup().Wait()
 	// suppose restart PD again
 	_, newOpt, err = newTestScheduleConfig()
@@ -3034,7 +3034,7 @@ func TestPersistScheduler(t *testing.T) {
 	re.Len(sc.GetSchedulerNames(), 4)
 	re.NoError(co.GetCluster().GetPersistOptions().Persist(co.GetCluster().GetStorage()))
 	co.Stop()
-	co.GetSchedulersController().GetWaitGroup().Wait()
+	co.GetSchedulersController().Wait()
 	co.GetWaitGroup().Wait()
 	_, newOpt, err = newTestScheduleConfig()
 	re.NoError(err)
@@ -3091,7 +3091,7 @@ func TestRemoveScheduler(t *testing.T) {
 	re.Empty(sc.GetSchedulerNames())
 	re.NoError(co.GetCluster().GetPersistOptions().Persist(co.GetCluster().GetStorage()))
 	co.Stop()
-	co.GetSchedulersController().GetWaitGroup().Wait()
+	co.GetSchedulersController().Wait()
 	co.GetWaitGroup().Wait()

 	// suppose restart PD again
@@ -3105,7 +3105,7 @@ func TestRemoveScheduler(t *testing.T) {
 	// the option remains default scheduler
 	re.Len(co.GetCluster().GetPersistOptions().GetSchedulers(), defaultCount)
 	co.Stop()
-	co.GetSchedulersController().GetWaitGroup().Wait()
+	co.GetSchedulersController().Wait()
 	co.GetWaitGroup().Wait()
 }

@@ -3137,7 +3137,7 @@ func TestRestart(t *testing.T) {
 	re.NoError(dispatchHeartbeat(co, region, stream))
 	region = waitPromoteLearner(re, stream, region, 2)
 	co.Stop()
-	co.GetSchedulersController().GetWaitGroup().Wait()
+	co.GetSchedulersController().Wait()
 	co.GetWaitGroup().Wait()

 	// Recreate coordinator then add another replica on store 3.
diff --git a/server/config/config.go b/server/config/config.go
index dc16f941059..31777cefa21 100644
--- a/server/config/config.go
+++ b/server/config/config.go
@@ -865,6 +865,10 @@ func (c *ScheduleConfig) adjust(meta *configutil.ConfigMetaData, reloading bool)
 		configutil.AdjustUint64(&c.HotRegionsReservedDays, defaultHotRegionsReservedDays)
 	}

+	if !meta.IsDefined("max-movable-hot-peer-size") {
+		configutil.AdjustInt64(&c.MaxMovableHotPeerSize, defaultMaxMovableHotPeerSize)
+	}
+
 	if !meta.IsDefined("slow-store-evicting-affected-store-ratio-threshold") {
 		configutil.AdjustFloat64(&c.SlowStoreEvictingAffectedStoreRatioThreshold, defaultSlowStoreEvictingAffectedStoreRatioThreshold)
 	}
diff --git a/server/config/persist_options.go b/server/config/persist_options.go
index 37cd8a8fcc8..81484e12607 100644
--- a/server/config/persist_options.go
+++ b/server/config/persist_options.go
@@ -624,11 +624,7 @@ func (o *PersistOptions) IsLocationReplacementEnabled() bool {

 // GetMaxMovableHotPeerSize returns the max movable hot peer size.
 func (o *PersistOptions) GetMaxMovableHotPeerSize() int64 {
-	size := o.GetScheduleConfig().MaxMovableHotPeerSize
-	if size <= 0 {
-		size = defaultMaxMovableHotPeerSize
-	}
-	return size
+	return o.GetScheduleConfig().MaxMovableHotPeerSize
 }

 // IsDebugMetricsEnabled returns if debug metrics is enabled.
@@ -669,6 +665,17 @@ func (o *PersistOptions) GetSchedulers() SchedulerConfigs {
 	return o.GetScheduleConfig().Schedulers
 }

+// IsSchedulerDisabled returns if the scheduler is disabled.
+func (o *PersistOptions) IsSchedulerDisabled(t string) bool {
+	schedulers := o.GetScheduleConfig().Schedulers
+	for _, s := range schedulers {
+		if t == s.Type {
+			return s.Disable
+		}
+	}
+	return false
+}
+
 // GetHotRegionsWriteInterval gets interval for PD to store Hot Region information.
 func (o *PersistOptions) GetHotRegionsWriteInterval() time.Duration {
 	return o.GetScheduleConfig().HotRegionsWriteInterval.Duration
@@ -701,6 +708,23 @@ func (o *PersistOptions) AddSchedulerCfg(tp string, args []string) {
 	o.SetScheduleConfig(v)
 }

+// RemoveSchedulerCfg removes the scheduler configurations.
+func (o *PersistOptions) RemoveSchedulerCfg(tp string) {
+	v := o.GetScheduleConfig().Clone()
+	for i, schedulerCfg := range v.Schedulers {
+		if tp == schedulerCfg.Type {
+			if IsDefaultScheduler(tp) {
+				schedulerCfg.Disable = true
+				v.Schedulers[i] = schedulerCfg
+			} else {
+				v.Schedulers = append(v.Schedulers[:i], v.Schedulers[i+1:]...)
+			}
+			o.SetScheduleConfig(v)
+			return
+		}
+	}
+}
+
 // SetLabelProperty sets the label property.
 func (o *PersistOptions) SetLabelProperty(typ, labelKey, labelValue string) {
 	cfg := o.GetLabelPropertyConfig().Clone()
diff --git a/tests/cluster.go b/tests/cluster.go
index bfdf7acf80d..ffdc26de8c1 100644
--- a/tests/cluster.go
+++ b/tests/cluster.go
@@ -628,9 +628,11 @@ func (c *TestCluster) WaitLeader(ops ...WaitOption) string {
 	counter := make(map[string]int)
 	running := 0
 	for _, s := range c.servers {
+		s.RLock()
 		if s.state == Running {
 			running++
 		}
+		s.RUnlock()
 		n := s.GetLeader().GetName()
 		if n != "" {
 			counter[n]++
diff --git a/tests/pdctl/scheduler/scheduler_test.go b/tests/pdctl/scheduler/scheduler_test.go
index 651eb11dbb2..5be5cf02e77 100644
--- a/tests/pdctl/scheduler/scheduler_test.go
+++ b/tests/pdctl/scheduler/scheduler_test.go
@@ -339,6 +339,7 @@ func TestScheduler(t *testing.T) {
 		"strict-picking-store": "true",
 		"enable-for-tiflash":   "true",
 		"rank-formula-version": "v2",
+		"split-thresholds":     0.2,
 	}
 	var conf map[string]interface{}
 	mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "list"}, &conf)