diff --git a/pkg/schedule/schedulers/hot_region.go b/pkg/schedule/schedulers/hot_region.go index 05cbba30edc0..c353621bb7fe 100644 --- a/pkg/schedule/schedulers/hot_region.go +++ b/pkg/schedule/schedulers/hot_region.go @@ -43,9 +43,34 @@ import ( "go.uber.org/zap" ) +const ( + // HotRegionName is the balance hot region scheduler name. + HotRegionName = "balance-hot-region-scheduler" + // HotRegionType is the balance hot region scheduler type. + HotRegionType = "hot-region" + splitHotReadBuckets = "split-hot-read-region" + splitHotWriteBuckets = "split-hot-write-region" + splitProgressiveRank = int64(-5) + minHotScheduleInterval = time.Second + maxHotScheduleInterval = 20 * time.Second +) + var ( - topnPosition = 10 + // schedulePeerPr is the probability of scheduling the hot peer. + schedulePeerPr = 0.66 + // pendingAmpFactor will amplify the impact of pending influence, making scheduling slower or even serial when two stores are close together. + pendingAmpFactor = 2.0 + // If the distribution of a dimension is below the corresponding stddev threshold, then scheduling will no longer be based on this dimension, + // as it implies that this dimension is sufficiently uniform. + stddevThreshold = 0.1 + // topnPosition is the position of the topn peer in the hot peer list. + // We use it to judge whether to schedule the hot peer in some cases. + topnPosition = 10 + // statisticsInterval is the interval to update statistics information. statisticsInterval = time.Second +) + +var ( // WithLabelValues is a heavy operation, define variable to avoid call it every time. hotSchedulerCounter = schedulerCounter.WithLabelValues(HotRegionName, "schedule") hotSchedulerSkipCounter = schedulerCounter.WithLabelValues(HotRegionName, "skip") @@ -85,17 +110,14 @@ var ( type baseHotScheduler struct { *BaseScheduler - // store information, including pending Influence by resource type + // stLoadInfos contains store statistics information by resource type. + // stLoadInfos is a temporary state, but it is exported to the API and metrics. // Every time `Schedule()` will recalculate it. - stInfos map[uint64]*statistics.StoreSummaryInfo - // temporary states but exported to API or metrics - // Every time `Schedule()` will recalculate it. - stLoadInfos [resourceTypeLen]map[uint64]*statistics.StoreLoadDetail + stLoadInfos [resourceTypeLen]map[uint64]*statistics.StoreLoadDetail + // stHistoryLoads stores the history of `stLoadInfos`. + // Every time `Schedule()` will update it in a rolling manner. stHistoryLoads *statistics.StoreHistoryLoads - // temporary states - // Every time `Schedule()` will recalculate it. - storesLoads map[uint64][]float64 - // regionPendings stores regionID -> pendingInfluence + // regionPendings stores regionID -> pendingInfluence, // this records regionID which have pending Operator by operation type. During filterHotPeers, the hot peers won't // be selected if its owner region is tracked in this attribute.
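+ // For example, while a region still has a pending operator recorded here, its hot peers are skipped by filterHotPeers until the operator ends and is GCed.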
regionPendings map[uint64]*pendingInfluence @@ -123,16 +145,16 @@ func newBaseHotScheduler(opController *operator.Controller) *baseHotScheduler { // prepareForBalance calculate the summary of pending Influence for each store and prepare the load detail for // each store, only update read or write load detail func (h *baseHotScheduler) prepareForBalance(rw utils.RWType, cluster sche.SchedulerCluster) { - h.stInfos = statistics.SummaryStoreInfos(cluster.GetStores()) - h.summaryPendingInfluence() - h.storesLoads = cluster.GetStoresLoads() + storeInfos := statistics.SummaryStoreInfos(cluster.GetStores()) + h.summaryPendingInfluence(storeInfos) + storesLoads := cluster.GetStoresLoads() isTraceRegionFlow := cluster.GetSchedulerConfig().IsTraceRegionFlow() prepare := func(regionStats map[uint64][]*statistics.HotPeerStat, resource constant.ResourceKind) { ty := buildResourceType(rw, resource) h.stLoadInfos[ty] = statistics.SummaryStoresLoad( - h.stInfos, - h.storesLoads, + storeInfos, + storesLoads, h.stHistoryLoads, regionStats, isTraceRegionFlow, @@ -161,11 +183,11 @@ func (h *baseHotScheduler) prepareForBalance(rw utils.RWType, cluster sche.Sched // summaryPendingInfluence calculate the summary of pending Influence for each store // and clean the region from regionInfluence if they have ended operator. // It makes each dim rate or count become `weight` times to the origin value. -func (h *baseHotScheduler) summaryPendingInfluence() { +func (h *baseHotScheduler) summaryPendingInfluence(storeInfos map[uint64]*statistics.StoreSummaryInfo) { for id, p := range h.regionPendings { for _, from := range p.froms { - from := h.stInfos[from] - to := h.stInfos[p.to] + from := storeInfos[from] + to := storeInfos[p.to] maxZombieDur := p.maxZombieDuration weight, needGC := calcPendingInfluence(p.op, maxZombieDur) @@ -182,9 +204,9 @@ func (h *baseHotScheduler) summaryPendingInfluence() { } } } - for storeID, info := range h.stInfos { + for storeID, info := range storeInfos { storeLabel := strconv.FormatUint(storeID, 10) - if infl := info.PendingSum; infl != nil { + if infl := info.PendingSum; infl != nil && len(infl.Loads) != 0 { utils.ForeachRegionStats(func(rwTy utils.RWType, dim int, kind utils.RegionStatKind) { setHotPendingInfluenceMetrics(storeLabel, rwTy.String(), utils.DimToString(dim), infl.Loads[kind]) }) @@ -201,30 +223,6 @@ func (h *baseHotScheduler) randomRWType() utils.RWType { return h.types[h.r.Int()%len(h.types)] } -const ( - // HotRegionName is balance hot region scheduler name. - HotRegionName = "balance-hot-region-scheduler" - // HotRegionType is balance hot region scheduler type. - HotRegionType = "hot-region" - - minHotScheduleInterval = time.Second - maxHotScheduleInterval = 20 * time.Second -) - -var ( - // schedulePeerPr the probability of schedule the hot peer. - schedulePeerPr = 0.66 - // pendingAmpFactor will amplify the impact of pending influence, making scheduling slower or even serial when two stores are close together - pendingAmpFactor = 2.0 - // If the distribution of a dimension is below the corresponding stddev threshold, then scheduling will no longer be based on this dimension, - // as it implies that this dimension is sufficiently uniform. 
- stddevThreshold = 0.1 - - splitHotReadBuckets = "split-hot-read-region" - splitHotWriteBuckets = "split-hot-write-region" - splitProgressiveRank = int64(-5) -) - type hotScheduler struct { name string *baseHotScheduler @@ -408,7 +406,8 @@ type solution struct { secondScore int } -// getExtremeLoad returns the min load of the src store and the max load of the dst store. +// getExtremeLoad returns the closest loads in the selected src and dst statistics; +// in other words, the min load of the src store and the max load of the dst store. // If peersRate is negative, the direction is reversed. func (s *solution) getExtremeLoad(dim int) (src float64, dst float64) { if s.getPeersRateFromCache(dim) >= 0 { @@ -1352,10 +1351,16 @@ func (bs *balanceSolver) getRkCmpPrioritiesV1(old *solution) (firstCmp int, seco return } -// smaller is better +// compareSrcStore compares the source stores of detail1 and detail2; the result is: +// 1. if detail1 is better than detail2, return -1 +// 2. if detail1 is worse than detail2, return 1 +// 3. if detail1 is equal to detail2, return 0 +// The comparison is based on the following principles: +// 1. select the min load of the store between current and future, because we want to select the store as the source store; +// 2. compare detail1 and detail2 by first priority and second priority; we pick the larger one to speed up the convergence; +// 3. if the first priority and second priority are equal, we pick the store with the smaller difference between current and future to minimize oscillations. func (bs *balanceSolver) compareSrcStore(detail1, detail2 *statistics.StoreLoadDetail) int { if detail1 != detail2 { - // compare source store var lpCmp storeLPCmp if bs.resourceTy == writeLeader { lpCmp = sliceLPCmp( @@ -1385,7 +1390,14 @@ func (bs *balanceSolver) compareSrcStore(detail1, detail2 *statistics.StoreLoadD return 0 } -// smaller is better +// compareDstStore compares the destination stores of detail1 and detail2; the result is: +// 1. if detail1 is better than detail2, return -1 +// 2. if detail1 is worse than detail2, return 1 +// 3. if detail1 is equal to detail2, return 0 +// The comparison is based on the following principles: +// 1. select the max load of the store between current and future, because we want to select the store as the destination store; +// 2. compare detail1 and detail2 by first priority and second priority; we pick the smaller one to speed up the convergence; +// 3. if the first priority and second priority are equal, we pick the store with the smaller difference between current and future to minimize oscillations. func (bs *balanceSolver) compareDstStore(detail1, detail2 *statistics.StoreLoadDetail) int { if detail1 != detail2 { // compare destination store @@ -1417,6 +1429,9 @@ func (bs *balanceSolver) compareDstStore(detail1, detail2 *statistics.StoreLoadD return 0 } +// stepRank returns a function that can calculate the discretized data, +// where `rate` will be discretized by `step`. +// `rate` is the speed of the dim, `step` is the step size of the discretized data.
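+// For example, stepRank(0, 100) maps rates in [0,100) to rank 0 and rates in [100,200) to rank 1, so two rates within the same step compare as equal.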
func stepRank(rk0 float64, step float64) func(float64) int64 { return func(rate float64) int64 { return int64((rate - rk0) / step) diff --git a/pkg/schedule/schedulers/hot_region_test.go b/pkg/schedule/schedulers/hot_region_test.go index ee569f4b70e8..b29898cd9b91 100644 --- a/pkg/schedule/schedulers/hot_region_test.go +++ b/pkg/schedule/schedulers/hot_region_test.go @@ -167,7 +167,8 @@ func checkGCPendingOpInfos(re *require.Assertions, enablePlacementRules bool) { } } - hb.summaryPendingInfluence() // Calling this function will GC. + storeInfos := statistics.SummaryStoreInfos(tc.GetStores()) + hb.summaryPendingInfluence(storeInfos) // Calling this function will GC. for i := range opInfluenceCreators { for j, typ := range typs { @@ -2047,16 +2048,16 @@ func TestInfluenceByRWType(t *testing.T) { op := ops[0] re.NotNil(op) - hb.(*hotScheduler).summaryPendingInfluence() - stInfos := hb.(*hotScheduler).stInfos - re.True(nearlyAbout(stInfos[1].PendingSum.Loads[utils.RegionWriteKeys], -0.5*units.MiB)) - re.True(nearlyAbout(stInfos[1].PendingSum.Loads[utils.RegionWriteBytes], -0.5*units.MiB)) - re.True(nearlyAbout(stInfos[4].PendingSum.Loads[utils.RegionWriteKeys], 0.5*units.MiB)) - re.True(nearlyAbout(stInfos[4].PendingSum.Loads[utils.RegionWriteBytes], 0.5*units.MiB)) - re.True(nearlyAbout(stInfos[1].PendingSum.Loads[utils.RegionReadKeys], -0.5*units.MiB)) - re.True(nearlyAbout(stInfos[1].PendingSum.Loads[utils.RegionReadBytes], -0.5*units.MiB)) - re.True(nearlyAbout(stInfos[4].PendingSum.Loads[utils.RegionReadKeys], 0.5*units.MiB)) - re.True(nearlyAbout(stInfos[4].PendingSum.Loads[utils.RegionReadBytes], 0.5*units.MiB)) + storeInfos := statistics.SummaryStoreInfos(tc.GetStores()) + hb.(*hotScheduler).summaryPendingInfluence(storeInfos) + re.True(nearlyAbout(storeInfos[1].PendingSum.Loads[utils.RegionWriteKeys], -0.5*units.MiB)) + re.True(nearlyAbout(storeInfos[1].PendingSum.Loads[utils.RegionWriteBytes], -0.5*units.MiB)) + re.True(nearlyAbout(storeInfos[4].PendingSum.Loads[utils.RegionWriteKeys], 0.5*units.MiB)) + re.True(nearlyAbout(storeInfos[4].PendingSum.Loads[utils.RegionWriteBytes], 0.5*units.MiB)) + re.True(nearlyAbout(storeInfos[1].PendingSum.Loads[utils.RegionReadKeys], -0.5*units.MiB)) + re.True(nearlyAbout(storeInfos[1].PendingSum.Loads[utils.RegionReadBytes], -0.5*units.MiB)) + re.True(nearlyAbout(storeInfos[4].PendingSum.Loads[utils.RegionReadKeys], 0.5*units.MiB)) + re.True(nearlyAbout(storeInfos[4].PendingSum.Loads[utils.RegionReadBytes], 0.5*units.MiB)) // consider pending amp, there are nine regions or more. 
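+ // (the loop below adds enough extra hot regions for pendingAmpFactor to take effect)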
for i := 2; i < 13; i++ { @@ -2072,17 +2073,17 @@ func TestInfluenceByRWType(t *testing.T) { op = ops[0] re.NotNil(op) - hb.(*hotScheduler).summaryPendingInfluence() - stInfos = hb.(*hotScheduler).stInfos + storeInfos = statistics.SummaryStoreInfos(tc.GetStores()) + hb.(*hotScheduler).summaryPendingInfluence(storeInfos) // assert read/write influence is the sum of write peer and write leader - re.True(nearlyAbout(stInfos[1].PendingSum.Loads[utils.RegionWriteKeys], -1.2*units.MiB)) - re.True(nearlyAbout(stInfos[1].PendingSum.Loads[utils.RegionWriteBytes], -1.2*units.MiB)) - re.True(nearlyAbout(stInfos[3].PendingSum.Loads[utils.RegionWriteKeys], 0.7*units.MiB)) - re.True(nearlyAbout(stInfos[3].PendingSum.Loads[utils.RegionWriteBytes], 0.7*units.MiB)) - re.True(nearlyAbout(stInfos[1].PendingSum.Loads[utils.RegionReadKeys], -1.2*units.MiB)) - re.True(nearlyAbout(stInfos[1].PendingSum.Loads[utils.RegionReadBytes], -1.2*units.MiB)) - re.True(nearlyAbout(stInfos[3].PendingSum.Loads[utils.RegionReadKeys], 0.7*units.MiB)) - re.True(nearlyAbout(stInfos[3].PendingSum.Loads[utils.RegionReadBytes], 0.7*units.MiB)) + re.True(nearlyAbout(storeInfos[1].PendingSum.Loads[utils.RegionWriteKeys], -1.2*units.MiB)) + re.True(nearlyAbout(storeInfos[1].PendingSum.Loads[utils.RegionWriteBytes], -1.2*units.MiB)) + re.True(nearlyAbout(storeInfos[3].PendingSum.Loads[utils.RegionWriteKeys], 0.7*units.MiB)) + re.True(nearlyAbout(storeInfos[3].PendingSum.Loads[utils.RegionWriteBytes], 0.7*units.MiB)) + re.True(nearlyAbout(storeInfos[1].PendingSum.Loads[utils.RegionReadKeys], -1.2*units.MiB)) + re.True(nearlyAbout(storeInfos[1].PendingSum.Loads[utils.RegionReadBytes], -1.2*units.MiB)) + re.True(nearlyAbout(storeInfos[3].PendingSum.Loads[utils.RegionReadKeys], 0.7*units.MiB)) + re.True(nearlyAbout(storeInfos[3].PendingSum.Loads[utils.RegionReadBytes], 0.7*units.MiB)) } func nearlyAbout(f1, f2 float64) bool { diff --git a/pkg/schedule/schedulers/hot_region_v2.go b/pkg/schedule/schedulers/hot_region_v2.go index c0c4a1c1b9a4..04ba0fc978f9 100644 --- a/pkg/schedule/schedulers/hot_region_v2.go +++ b/pkg/schedule/schedulers/hot_region_v2.go @@ -25,10 +25,10 @@ import ( const ( firstPriorityPerceivedRatio = 0.2 // PeerRate needs to be 20% above what needs to be balanced. - firstPriorityMinHotRatio = 0.02 // PeerRate needs to be greater than 1% lowRate + firstPriorityMinHotRatio = 0.02 // PeerRate needs to be greater than 2% lowRate - secondPriorityPerceivedRatio = 0.3 // PeerRate needs to be 20% above what needs to be balanced. - secondPriorityMinHotRatio = 0.03 // PeerRate needs to be greater than 1.5% lowRate + secondPriorityPerceivedRatio = 0.3 // PeerRate needs to be 30% above what needs to be balanced. + secondPriorityMinHotRatio = 0.03 // PeerRate needs to be greater than 3% lowRate ) // isAvailable returns the solution is available. @@ -39,14 +39,40 @@ func isAvailableV2(s *solution) bool { return s.progressiveRank <= -3 || (s.progressiveRank < 0 && s.revertRegion == nil) } +type balanceChecker struct { + // We use the following example to illustrate the calculation process. + // Suppose preBalancedRatio is 0.9, balancedRatio is 0.95. + // If 0.95<=low/high, the two stores are considered balanced after the operator is completed. + // If low/high<0.9, the two stores are considered unbalanced after the operator is completed. + // If 0.9<=low/high<0.95, the two stores are considered pre-balanced after the operator is completed. 
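+ // For example, low=94 and high=100 give low/high=0.94, which falls in [0.9,0.95) and is therefore pre-balanced.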
+ preBalancedRatio float64 + balancedRatio float64 +} + +// rankV2Ratios is used to calculate the balanced state. +// every rankV2Ratios only affects one dim. + +// There are three states of balance: balanced, pre-balanced, and unbalanced. +// It is determined by the ratio of the high and low values of the two stores. +// If the ratio is greater than or equal to the balancedRatio(0.95), it is considered to be in the balanced state. +// If the ratio is less than the preBalancedRatio(0.9), it is considered to be in the unbalanced state. +// If the ratio is between the two, it is considered to be in the pre-balanced state. + // TODO: Unified with stddevThreshold. type rankV2Ratios struct { - preBalancedRatio float64 - balancedRatio float64 - preBalancedCheckRatio float64 - balancedCheckRatio float64 - perceivedRatio float64 - minHotRatio float64 + // futureChecker is used to calculate the balanced state after the operator is completed. + // It is stricter than the currentChecker. + futureChecker *balanceChecker + // currentChecker is used to calculate the balanced state in the current state, which means that the operator is not triggered yet. + currentChecker *balanceChecker + + // perceivedRatio avoids scheduling a large region that leaves the state not worse but not better. + // For example, if the region is 20MB, the high store is 100MB, the low store is 80MB, the low/high is 0.8. + // If it is scheduled to the low store, the high store will be 80MB, the low store will be 100MB, and the low/high is still 0.8, which is not worse. + // We need to avoid such scheduling, so perceivedRatio is introduced. + perceivedRatio float64 + // minHotRatio is the minimum ratio for the hot region to be scheduled. + minHotRatio float64 } func newRankV2Ratios(balancedRatio, perceivedRatio, minHotRatio float64) *rankV2Ratios { @@ -58,19 +84,29 @@ func newRankV2Ratios(balancedRatio, perceivedRatio, minHotRatio float64) *rankV2 balancedRatio = 0.95 } - rs := &rankV2Ratios{balancedRatio: balancedRatio, perceivedRatio: perceivedRatio, minHotRatio: minHotRatio} - // preBalancedRatio = 1.0 - 2*(1.0-balancedRatio) - // The maximum value with `balancedRatio-0.1` is to prevent the preBalance range becoming too large. - rs.preBalancedRatio = math.Max(2.0*balancedRatio-1.0, balancedRatio-0.1) - rs.balancedCheckRatio = balancedRatio - 0.02 - rs.preBalancedCheckRatio = rs.preBalancedRatio - 0.03 + futureStateChecker := &balanceChecker{ + balancedRatio: balancedRatio, + // preBalancedRatio = 1.0 - 2*(1.0-balancedRatio) + // The maximum value with `balancedRatio-0.1` is to prevent the preBalance range becoming too large. + preBalancedRatio: math.Max(2.0*balancedRatio-1.0, balancedRatio-0.1), + } + currentStateChecker := &balanceChecker{ + balancedRatio: balancedRatio - 0.02, + preBalancedRatio: futureStateChecker.preBalancedRatio - 0.03, + } + + rs := &rankV2Ratios{ + futureChecker: futureStateChecker, + currentChecker: currentStateChecker, + perceivedRatio: perceivedRatio, minHotRatio: minHotRatio} + return rs } func (bs *balanceSolver) initRankV2() { bs.firstPriorityV2Ratios = newRankV2Ratios(bs.sche.conf.GetGreatDecRatio(), firstPriorityPerceivedRatio, firstPriorityMinHotRatio) // The second priority is less demanding. Set the preBalancedRatio of the first priority to the balancedRatio of the second dimension.
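+// For example, with GreatDecRatio 0.95 the first priority's preBalancedRatio is max(2*0.95-1.0, 0.95-0.1) = 0.9, so the second priority's balancedRatio becomes 0.9.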
- bs.secondPriorityV2Ratios = newRankV2Ratios(bs.firstPriorityV2Ratios.preBalancedRatio, secondPriorityPerceivedRatio, secondPriorityMinHotRatio) + bs.secondPriorityV2Ratios = newRankV2Ratios(bs.firstPriorityV2Ratios.futureChecker.preBalancedRatio, secondPriorityPerceivedRatio, secondPriorityMinHotRatio) bs.isAvailable = isAvailableV2 bs.filterUniformStore = bs.filterUniformStoreV2 @@ -82,6 +118,9 @@ func (bs *balanceSolver) initRankV2() { bs.pickCheckPolicyV2() } +// pickCheckPolicyV2 will set checkByPriorityAndTolerance to the corresponding function. +// Note: PolicyV2 will search more possible solutions than PolicyV1, +// so it allows scheduling when either of the two dimensions is unbalanced. func (bs *balanceSolver) pickCheckPolicyV2() { switch { case bs.resourceTy == writeLeader: @@ -93,6 +132,8 @@ } } +// filterUniformStoreV2 filters stores by stddev, +// where stddev is the standard deviation of the load across all stores. func (bs *balanceSolver) filterUniformStoreV2() (string, bool) { if !bs.enableExpectation() { return "", false @@ -141,7 +182,7 @@ func (bs *balanceSolver) setSearchRevertRegionsV2() { } } -// calcProgressiveRank calculates `bs.cur.progressiveRank`. +// calcProgressiveRankV2 calculates `bs.cur.progressiveRank`. // See the comments of `solution.progressiveRank` for more about progressive rank. // isBetter: score > 0 // isNotWorsened: score == 0 @@ -192,15 +233,32 @@ func (bs *balanceSolver) calcProgressiveRankV2() { } func (bs *balanceSolver) getScoreByPriorities(dim int, rs *rankV2Ratios) int { - // minNotWorsenedRate, minBetterRate, minBalancedRate, maxBalancedRate, maxBetterRate, maxNotWorsenedRate can be determined from src and dst. - // * peersRate < minNotWorsenedRate ====> score == -2 - // * minNotWorsenedRate <= peersRate < minBetterRate ====> score == 0 - // * minBetterRate <= peersRate < minBalancedRate ====> score == 2 - // * minBalancedRate <= peersRate <= maxBalancedRate ====> score == 3 - // * maxBalancedRate < peersRate <= maxBetterRate ====> score == 1 - // * maxBetterRate < peersRate <= maxNotWorsenedRate ====> score == -1 - // * peersRate > maxNotWorsenedRate ====> score == -2 - // The higher the score, the better. + // For the unbalanced state, + // roughly speaking, as long as the diff is reduced, it is either better or not worse. + // To distinguish small regions, the one where the diff is reduced too little is defined as not worse, + // and the one where the diff is reversed is regarded as worse. + // For the pre-balanced state, + // it is better only if it reaches the balanced state, + // it is not worse if it stays in the pre-balanced state, + // and it is worse if it falls back to the unbalanced state. + // For the balanced state, + // there is no better state to move to, + // it is not worse if it stays in the balanced state, + // and it is worse if it becomes the pre-balanced state or the unbalanced state. + + // minNotWorsenedRate, minBetterRate, minBalancedRate, maxBalancedRate, maxBetterRate, maxNotWorsenedRate + // can be determined from src, dst and peer. The higher the score, the better. + // The closer peersRate is to the center, the higher the score; of two symmetrical zones, the one without revert scores higher than the one with revert. + // So d is higher than c and e, and c is higher than e; only when the state becomes better is the score positive. + // c and e are higher than b and f, and b is higher than f; when the state is not worsened and there is no revert, the score is zero.
+ // and b and f are higher than a and g; the worse states all have the same score. + // * a: peersRate < minNotWorsenedRate ====> score == -2 + // * b: minNotWorsenedRate <= peersRate < minBetterRate ====> score == 0 + // * c: minBetterRate <= peersRate < minBalancedRate ====> score == 2 + // * d: minBalancedRate <= peersRate <= maxBalancedRate ====> score == 3 + // * e: maxBalancedRate < peersRate <= maxBetterRate ====> score == 1 + // * f: maxBetterRate < peersRate <= maxNotWorsenedRate ====> score == -1 + // * g: peersRate > maxNotWorsenedRate ====> score == -2 srcRate, dstRate := bs.cur.getExtremeLoad(dim) srcPendingRate, dstPendingRate := bs.cur.getPendingLoad(dim) @@ -219,19 +277,32 @@ topnRate = topnHotPeer.GetLoad(dim) } - if highRate*rs.balancedCheckRatio <= lowRate { - // At this time, it is considered to be in the balanced state, and score > 1 will not be judged. - // If the balanced state is not broken, score == 0. + if highRate*rs.currentChecker.balancedRatio <= lowRate { + // At this time, it is considered to be in the balanced state. + // Because rs.currentChecker.balancedRatio <= lowRate/highRate. + + // We use the following example to illustrate the calculation process. + // Suppose futureChecker.balancedRatio is 0.95, and currentChecker.balancedRatio is 0.93. + // Suppose the low and high are 94 and 101. + // So their ratio is 94/101≈0.9306, 0.93<0.9306, it is considered to be in the balanced state. + // If we hope the future stores are not worse than the current stores, + // we need to ensure that the ratio of the future stores is at least 0.95. + // So the future stores need to be 94+1, 101-1, that is 95/100=0.95. + // Or the future stores need to be 94+6, 101-6, that is 95/100=0.95. + // So the not-worse peer range is [1,6]. + + // Because it is already in the balanced state, there is no better state to move to, + // so the scores 1, 2 and 3 are unreachable here. + // If the balanced state is not broken, but the loads are closer, score == 0, that is, the peer range is [1,6]. // If the balanced state is broken, score = -2.
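+ // For example, a peer of 3 gives (94+3)/(101-3)=97/98≈0.99>=0.95, so score == 0; + // a peer of 8 gives (101-8)/(94+8)=93/102≈0.91<0.95, breaking the balanced state, so score == -2.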
- // minNotWorsenedRate == minBetterRate == minBalancedRate <= maxBalancedRate == maxBetterRate == maxNotWorsenedRate - // highRate - (highRate+lowRate)/(1.0+balancedRatio) - minNotWorsenedRate := (highRate*rs.balancedRatio - lowRate) / (1.0 + rs.balancedRatio) - // highRate - (highRate+lowRate)/(1.0+balancedRatio)*balancedRatio - maxNotWorsenedRate := (highRate - lowRate*rs.balancedRatio) / (1.0 + rs.balancedRatio) + // (lowRate+minNotWorsenedRate) / (highRate-minNotWorsenedRate) = futureChecker.balancedRatio + minNotWorsenedRate := (highRate*rs.futureChecker.balancedRatio - lowRate) / (1.0 + rs.futureChecker.balancedRatio) + // (highRate-maxNotWorsenedRate) / (lowRate+maxNotWorsenedRate) = futureChecker.balancedRatio + maxNotWorsenedRate := (highRate - lowRate*rs.futureChecker.balancedRatio) / (1.0 + rs.futureChecker.balancedRatio) - if minNotWorsenedRate > 0 { - minNotWorsenedRate = 0 + if minNotWorsenedRate > -bs.getMinRate(dim) { // use min rate as 0 value + minNotWorsenedRate = -bs.getMinRate(dim) } if peersRate >= minNotWorsenedRate && peersRate <= maxNotWorsenedRate { @@ -240,44 +311,104 @@ func (bs *balanceSolver) getScoreByPriorities(dim int, rs *rankV2Ratios) int { return -2 } - var minNotWorsenedRate, minBetterRate, maxBetterRate, maxNotWorsenedRate float64 - minBalancedRate := (highRate*rs.balancedRatio - lowRate) / (1.0 + rs.balancedRatio) - maxBalancedRate := (highRate - lowRate*rs.balancedRatio) / (1.0 + rs.balancedRatio) - pendingRateLimit := false + // When it is not in the balanced state, it is considered to be in the unbalanced state or pre-balanced state. + // We use the following example to illustrate the calculation process. + // Suppose futureChecker.balancedRatio is 0.95 + // Suppose the low and high are 75 and 120. + // If we hope it can reach the balanced state, we need to ensure that the ratio of the future stores is greater than 0.95. + // So the future stores need to be 75+20, 120-20, that is 95/100=0.95. + // Or the future stores need to be 75+25, 120-25, that is 95/100=0.95. + // So balanced peer range is [20,25] - if highRate*rs.preBalancedCheckRatio <= lowRate { + // (lowRate+minBalancedRate) / (highRate-minBalancedRate) = futureChecker.balancedRatio + minBalancedRate := (highRate*rs.futureChecker.balancedRatio - lowRate) / (1.0 + rs.futureChecker.balancedRatio) + // (highRate-maxBalancedRate) / (lowRate+maxBalancedRate) = futureChecker.balancedRatio + maxBalancedRate := (highRate - lowRate*rs.futureChecker.balancedRatio) / (1.0 + rs.futureChecker.balancedRatio) + + pendingRateLimit := false + var minNotWorsenedRate, minBetterRate, maxBetterRate, maxNotWorsenedRate float64 + if highRate*rs.currentChecker.preBalancedRatio <= lowRate { // At this time, it is considered to be in pre-balanced state. - // Only the schedules that reach the balanced state will be judged as 2, - // and the schedules that do not destroy the pre-balanced state will be judged as 0. - // minNotWorsenedRate <= minBetterRate <= maxBalancedRate == maxBetterRate <= maxNotWorsenedRate - // To generate a score of -2 at this state, the pendingRate needs to be 0. 
- minNotWorsenedRate = (highRate*rs.preBalancedRatio - lowRate) / (1.0 + rs.preBalancedRatio) - minBetterRate = (highRate*rs.balancedRatio - lowRate) / (1.0 + rs.balancedRatio) - maxBetterRate = maxBalancedRate - maxNotWorsenedRate = (highRate - lowRate*rs.preBalancedRatio) / (1.0 + rs.preBalancedRatio) - if minNotWorsenedRate > 0 { - minNotWorsenedRate = 0 + // Because rs.currentChecker.preBalancedRatio <= lowRate/highRate < rs.currentChecker.balancedRatio. + + // We use the following example to illustrate the calculation process. + // Suppose futureChecker.balancedRatio is 0.95, and currentChecker.balancedRatio is 0.93. + // Suppose futureChecker.preBalancedRatio is 0.9, and currentChecker.preBalancedRatio is 0.87. + // Suppose the low and high are 93 and 102. + // So their ratio is 93/102≈0.91, 0.87<0.91<0.93, it is considered to be in the pre-balanced state. + // For the pre-balanced state, only the schedules that reach the balanced state are considered to be better. + // So minBetterRate is minBalancedRate, maxBetterRate is maxBalancedRate. + // If we hope the future stores are better than the current stores, + // we need to ensure that the ratio of the future stores is greater than 0.95. + // So the future stores need to be 93+2, 102-2, that is 95/100=0.95. + // Or the future stores need to be 93+7, 102-7, that is 95/100=0.95. + // So better range is [2,7] + // If we hope the future stores are not worse than the current stores, + // we need to ensure that the ratio of the future stores is at least 0.9. + // So the future stores need to be 93+(-1), 102-(-1), that is 92/103≈0.9. + // Or the future stores need to be 93+10, 102-10, that is 92/103≈0.9. + // So not-worse peer range is [-1,2) and (7,10], + // [-1,2) means there is no revert region, (7,10] means there is revert region. + // And we need to avoid scheduling with a negative peersRate, + // so the final not-worse peer range is [max(0,-1),2), which is [0,2). + + minBetterRate, maxBetterRate = minBalancedRate, maxBalancedRate + // (lowRate+minNotWorsenedRate) / (highRate-minNotWorsenedRate) = futureChecker.preBalancedRatio + minNotWorsenedRate = (highRate*rs.futureChecker.preBalancedRatio - lowRate) / (1.0 + rs.futureChecker.preBalancedRatio) + // (highRate-maxNotWorsenedRate) / (lowRate+maxNotWorsenedRate) = futureChecker.preBalancedRatio + maxNotWorsenedRate = (highRate - lowRate*rs.futureChecker.preBalancedRatio) / (1.0 + rs.futureChecker.preBalancedRatio) + if minNotWorsenedRate > -bs.getMinRate(dim) { // use min rate as 0 value + minNotWorsenedRate = -bs.getMinRate(dim) } // When approaching the balanced state, wait for pending influence to zero before scheduling to reduce jitter. + // From the pre-balanced state to the balanced state, no further schedule is needed. pendingRateLimit = true } else { // At this time, it is considered to be in the unbalanced state. - // As long as the balance is significantly improved, it is judged as 1. - // If the balance is not reduced, it is judged as 0. - // If the rate relationship between src and dst is reversed, there will be a certain penalty. - // maxBetterRate may be less than minBetterRate, in which case a positive fraction cannot be produced. - minNotWorsenedRate = -bs.getMinRate(dim) + // Because lowRate/highRate < rs.currentChecker.preBalancedRatio. + + // We use the following example to illustrate the calculation process. + // Suppose futureChecker.balancedRatio is 0.95, and currentChecker.balancedRatio is 0.93. + // Suppose futureChecker.preBalancedRatio is 0.9, and currentChecker.preBalancedRatio is 0.87.
+ // Suppose the low and high are 75 and 120. + // So their ratio is 75/120=0.625, 0.625<0.87, it is considered to be in the unbalanced state. + // If we hope the future stores are balanced, + // we need to ensure that the ratio of the future stores is 0.95. + // So the future stores need to be 75+20, 120-20, that is 95/100=0.95. + // Or the future stores need to be 75+25, 120-25, that is 95/100=0.95. + // So the balanced peer range is [20,25]. + + // For the unbalanced state, as long as the diff is reduced, it is better. + // And we need to ensure that the ratio of the two stores is not reversed, + // so the future stores need to be at most 75+45, 120-45. + // So the candidate better peer range is (0,45). + // If that were all, the minimum of the better range would be too small, + // and we don't want to give too high a score to a region that is too small. + // To avoid scheduling small regions, we take the minimum of three values, + // namely perceivedRatio of minBalancedRate, minHotRatio of the low store's rate, and the topn peer's rate. + // Suppose perceivedRatio is 0.2, minHotRatio is 0.02, and the top10 peer rate is 5. + // So minBetterRate is min(0.2*20, 0.02*75, 5)=1.5. + // So minNotWorsenedRate is 0. + // Similarly, + // we don't want to dispatch a particularly large region that reverses the high store and the low store, which is worse for us. + // From the above, we know maxBalancedRate is 25, and the max rate that does not reverse the high and low stores is 45; + // we reduce the margin between them by a factor, namely perceivedRatio. + // So maxBetterRate is 25+(45-25-1.5)*0.2=28.7. + // So maxNotWorsenedRate is 25+(45-25-0)*0.2=29. minBetterRate = math.Min(minBalancedRate*rs.perceivedRatio, lowRate*rs.minHotRatio) minBetterRate = math.Min(minBetterRate, topnRate) - maxBetterRate = maxBalancedRate + (highRate-lowRate-minBetterRate-maxBalancedRate)*rs.perceivedRatio - maxNotWorsenedRate = maxBalancedRate + (highRate-lowRate-minNotWorsenedRate-maxBalancedRate)*rs.perceivedRatio + maxBetterRate = maxBalancedRate + rs.perceivedRatio*(highRate-lowRate-maxBalancedRate-minBetterRate) + + maxNotWorsenedRate = maxBalancedRate + rs.perceivedRatio*(highRate-lowRate-maxBalancedRate-minNotWorsenedRate) + minNotWorsenedRate = -bs.getMinRate(dim) // use min rate as 0 value } switch { case minBetterRate <= peersRate && peersRate <= maxBetterRate: // Positive score requires some restrictions.
if peersRate >= bs.getMinRate(dim) && bs.isTolerance(dim, reverse) && - (!pendingRateLimit || math.Abs(srcPendingRate)+math.Abs(dstPendingRate) < 1) { + (!pendingRateLimit || math.Abs(srcPendingRate)+math.Abs(dstPendingRate) < 1 /*byte*/) { // avoid scheduling with pending influence when approaching the balanced state switch { case peersRate < minBalancedRate: return 2 diff --git a/pkg/schedule/schedulers/hot_region_v2_test.go b/pkg/schedule/schedulers/hot_region_v2_test.go index 24ef78ba3d97..d11ac44dde9d 100644 --- a/pkg/schedule/schedulers/hot_region_v2_test.go +++ b/pkg/schedule/schedulers/hot_region_v2_test.go @@ -35,7 +35,7 @@ func TestHotWriteRegionScheduleWithRevertRegionsDimSecond(t *testing.T) { defer cancel() statistics.Denoising = false statisticsInterval = 0 - + statistics.HistorySampleDuration = 0 sche, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil, nil) re.NoError(err) hb := sche.(*hotScheduler) @@ -149,6 +149,7 @@ func TestHotWriteRegionScheduleWithRevertRegionsDimFirstOnly(t *testing.T) { re := require.New(t) statistics.Denoising = false statisticsInterval = 0 + statistics.HistorySampleDuration = 0 cancel, _, tc, oc := prepareSchedulersTest() defer cancel() @@ -212,6 +213,7 @@ func TestHotReadRegionScheduleWithRevertRegionsDimSecond(t *testing.T) { re := require.New(t) statistics.Denoising = false statisticsInterval = 0 + statistics.HistorySampleDuration = 0 cancel, _, tc, oc := prepareSchedulersTest() defer cancel() @@ -274,6 +276,7 @@ func TestSkipUniformStore(t *testing.T) { re := require.New(t) statistics.Denoising = false statisticsInterval = 0 + statistics.HistorySampleDuration = 0 cancel, _, tc, oc := prepareSchedulersTest() defer cancel() diff --git a/pkg/schedule/schedulers/utils.go b/pkg/schedule/schedulers/utils.go index 998b03075309..c7cdf9191ca3 100644 --- a/pkg/schedule/schedulers/utils.go +++ b/pkg/schedule/schedulers/utils.go @@ -251,6 +251,7 @@ func newPendingInfluence(op *operator.Operator, froms []uint64, to uint64, infl } } +// stLdRate returns a function to get the load rate of the store with the specified dimension. func stLdRate(dim int) func(ld *statistics.StoreLoad) float64 { return func(ld *statistics.StoreLoad) float64 { return ld.Loads[dim] } @@ -263,12 +264,16 @@ func stLdCount(ld *statistics.StoreLoad) float64 { type storeLoadCmp func(ld1, ld2 *statistics.StoreLoad) int +// negLoadCmp returns a cmp that returns the negation of the given cmp. func negLoadCmp(cmp storeLoadCmp) storeLoadCmp { return func(ld1, ld2 *statistics.StoreLoad) int { return -cmp(ld1, ld2) } } +// sliceLoadCmp returns a cmp that runs the given cmps in order. +// If the cmp returns 0, which means equal, the next cmp will be used. +// If all cmps return 0, the two loads are considered equal. func sliceLoadCmp(cmps ...storeLoadCmp) storeLoadCmp { return func(ld1, ld2 *statistics.StoreLoad) int { for _, cmp := range cmps { @@ -280,12 +285,15 @@ func sliceLoadCmp(cmps ...storeLoadCmp) storeLoadCmp { } } +// stLdRankCmp returns a cmp that compares the two loads with discretized data. +// For example, if the rank function discretizes data by a step of 10, the loads 11 and 19 will be considered equal. func stLdRankCmp(dim func(ld *statistics.StoreLoad) float64, rank func(value float64) int64) storeLoadCmp { return func(ld1, ld2 *statistics.StoreLoad) int { return rankCmp(dim(ld1), dim(ld2), rank) } } +// rankCmp compares the two values with discretized data.
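+// For example, with rank = stepRank(0, 10), rankCmp(11, 19, rank) == 0 because both values fall into the same step.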
func rankCmp(a, b float64, rank func(value float64) int64) int { aRk, bRk := rank(a), rank(b) if aRk < bRk { @@ -298,6 +306,9 @@ type storeLPCmp func(lp1, lp2 *statistics.StoreLoadPred) int +// sliceLPCmp returns a cmp that runs the given cmps in order. +// If the cmp returns 0, which means equal, the next cmp will be used. +// If all cmps return 0, the two loads are considered equal. func sliceLPCmp(cmps ...storeLPCmp) storeLPCmp { return func(lp1, lp2 *statistics.StoreLoadPred) int { for _, cmp := range cmps { @@ -309,18 +320,21 @@ } } +// minLPCmp returns a cmp that compares with the min load of the store between current and future. func minLPCmp(ldCmp storeLoadCmp) storeLPCmp { return func(lp1, lp2 *statistics.StoreLoadPred) int { return ldCmp(lp1.Min(), lp2.Min()) } } +// maxLPCmp returns a cmp that compares with the max load of the store between current and future. func maxLPCmp(ldCmp storeLoadCmp) storeLPCmp { return func(lp1, lp2 *statistics.StoreLoadPred) int { return ldCmp(lp1.Max(), lp2.Max()) } } +// diffCmp returns a cmp that compares with the diff of the store's load between current and future. func diffCmp(ldCmp storeLoadCmp) storeLPCmp { return func(lp1, lp2 *statistics.StoreLoadPred) int { return ldCmp(lp1.Diff(), lp2.Diff())
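To make the combinator plumbing above concrete, here is a minimal standalone sketch of how these cmp helpers compose. It uses simplified stand-in types and illustrative names, not pd's actual statistics package:

package main

import "fmt"

// storeLoad is a simplified stand-in for statistics.StoreLoad.
type storeLoad struct{ loads []float64 }

type loadCmp func(ld1, ld2 *storeLoad) int

// stepRank mirrors the stepRank above: it discretizes a rate by `step`.
func stepRank(rk0, step float64) func(float64) int64 {
	return func(rate float64) int64 { return int64((rate - rk0) / step) }
}

// rankCmp compares two values after discretization.
func rankCmp(a, b float64, rank func(float64) int64) int {
	aRk, bRk := rank(a), rank(b)
	switch {
	case aRk < bRk:
		return -1
	case aRk > bRk:
		return 1
	}
	return 0
}

// ldRate mirrors stLdRate: it selects one dimension of the load.
func ldRate(dim int) func(*storeLoad) float64 {
	return func(ld *storeLoad) float64 { return ld.loads[dim] }
}

// ldRankCmp mirrors stLdRankCmp: it compares one dimension with discretized data.
func ldRankCmp(dim func(*storeLoad) float64, rank func(float64) int64) loadCmp {
	return func(ld1, ld2 *storeLoad) int { return rankCmp(dim(ld1), dim(ld2), rank) }
}

// sliceCmp mirrors sliceLoadCmp: it runs the cmps in order until one is decisive.
func sliceCmp(cmps ...loadCmp) loadCmp {
	return func(ld1, ld2 *storeLoad) int {
		for _, cmp := range cmps {
			if r := cmp(ld1, ld2); r != 0 {
				return r
			}
		}
		return 0
	}
}

func main() {
	// First priority: dim 0 in steps of 10; tie-break: dim 1 in steps of 100.
	cmp := sliceCmp(
		ldRankCmp(ldRate(0), stepRank(0, 10)),
		ldRankCmp(ldRate(1), stepRank(0, 100)),
	)
	ld1 := &storeLoad{loads: []float64{11, 250}}
	ld2 := &storeLoad{loads: []float64{19, 120}}
	// 11 and 19 share rank 1 in steps of 10, so dim 0 ties and dim 1 decides.
	fmt.Println(cmp(ld1, ld2)) // Output: 1
}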