This is an automated cherry-pick of tikv#6827
close tikv#6645

Signed-off-by: ti-chi-bot <[email protected]>
lhy1024 authored and ti-chi-bot committed Jul 31, 2023
1 parent 2892b46 commit 9b7778b
Showing 4 changed files with 275 additions and 29 deletions.
131 changes: 106 additions & 25 deletions server/schedulers/hot_region.go
@@ -38,6 +38,48 @@ import (
"go.uber.org/zap"
)

<<<<<<< HEAD:server/schedulers/hot_region.go
=======
var (
topnPosition = 10
statisticsInterval = time.Second
// WithLabelValues is a heavy operation, so define variables here to avoid calling it every time.
hotSchedulerCounter = schedulerCounter.WithLabelValues(HotRegionName, "schedule")
hotSchedulerSkipCounter = schedulerCounter.WithLabelValues(HotRegionName, "skip")
hotSchedulerSearchRevertRegionsCounter = schedulerCounter.WithLabelValues(HotRegionName, "search_revert_regions")
hotSchedulerNotSameEngineCounter = schedulerCounter.WithLabelValues(HotRegionName, "not_same_engine")
hotSchedulerNoRegionCounter = schedulerCounter.WithLabelValues(HotRegionName, "no_region")
hotSchedulerUnhealthyReplicaCounter = schedulerCounter.WithLabelValues(HotRegionName, "unhealthy_replica")
hotSchedulerAbnormalReplicaCounter = schedulerCounter.WithLabelValues(HotRegionName, "abnormal_replica")
hotSchedulerCreateOperatorFailedCounter = schedulerCounter.WithLabelValues(HotRegionName, "create_operator_failed")
hotSchedulerNewOperatorCounter = schedulerCounter.WithLabelValues(HotRegionName, "new_operator")
hotSchedulerSnapshotSenderLimitCounter = schedulerCounter.WithLabelValues(HotRegionName, "snapshot_sender_limit")

// counters related to region splits
hotSchedulerNotFoundSplitKeysCounter = schedulerCounter.WithLabelValues(HotRegionName, "not_found_split_keys")
hotSchedulerRegionBucketsNotHotCounter = schedulerCounter.WithLabelValues(HotRegionName, "region_buckets_not_hot")
hotSchedulerRegionBucketsSingleHotSpotCounter = schedulerCounter.WithLabelValues(HotRegionName, "region_buckets_single_hot_spot")
hotSchedulerSplitSuccessCounter = schedulerCounter.WithLabelValues(HotRegionName, "split_success")
hotSchedulerNeedSplitBeforeScheduleCounter = schedulerCounter.WithLabelValues(HotRegionName, "need_split_before_move_peer")
hotSchedulerRegionTooHotNeedSplitCounter = schedulerCounter.WithLabelValues(HotRegionName, "region_is_too_hot_need_split")

hotSchedulerMoveLeaderCounter = schedulerCounter.WithLabelValues(HotRegionName, moveLeader.String())
hotSchedulerMovePeerCounter = schedulerCounter.WithLabelValues(HotRegionName, movePeer.String())
hotSchedulerTransferLeaderCounter = schedulerCounter.WithLabelValues(HotRegionName, transferLeader.String())

readSkipAllDimUniformStoreCounter = schedulerCounter.WithLabelValues(HotRegionName, "read-skip-all-dim-uniform-store")
writeSkipAllDimUniformStoreCounter = schedulerCounter.WithLabelValues(HotRegionName, "write-skip-all-dim-uniform-store")
readSkipByteDimUniformStoreCounter = schedulerCounter.WithLabelValues(HotRegionName, "read-skip-byte-uniform-store")
writeSkipByteDimUniformStoreCounter = schedulerCounter.WithLabelValues(HotRegionName, "write-skip-byte-uniform-store")
readSkipKeyDimUniformStoreCounter = schedulerCounter.WithLabelValues(HotRegionName, "read-skip-key-uniform-store")
writeSkipKeyDimUniformStoreCounter = schedulerCounter.WithLabelValues(HotRegionName, "write-skip-key-uniform-store")
readSkipQueryDimUniformStoreCounter = schedulerCounter.WithLabelValues(HotRegionName, "read-skip-query-uniform-store")
writeSkipQueryDimUniformStoreCounter = schedulerCounter.WithLabelValues(HotRegionName, "write-skip-query-uniform-store")

pendingOpFails = schedulerStatus.WithLabelValues(HotRegionName, "pending_op_fails")
)

>>>>>>> 16926ad89 (scheduler: make hot v2 more suitable small hot region (#6827)):pkg/schedule/schedulers/hot_region.go
type baseHotScheduler struct {
*BaseScheduler
// store information, including pending Influence by resource type
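
The var block added above pre-resolves every Prometheus counter the hot scheduler uses because, as its comment notes, `WithLabelValues` is comparatively expensive. A minimal, self-contained sketch of the same pattern follows; the metric name, labels, and function names are placeholders and not code from this repository.

package main

import "github.com/prometheus/client_golang/prometheus"

// A stand-in for the scheduler's CounterVec; the metric name and labels are placeholders.
var schedulerCounter = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "scheduler_event_count",
		Help: "Counter of scheduler events.",
	}, []string{"name", "type"})

// Resolve the label combination once at package init; WithLabelValues hashes the
// label values on every call, which is wasteful on the scheduling hot path.
var hotSchedulerScheduleCounter = schedulerCounter.WithLabelValues("balance-hot-region-scheduler", "schedule")

func main() {
	prometheus.MustRegister(schedulerCounter)
	hotSchedulerScheduleCounter.Inc() // hot path: a plain atomic increment, no label lookup
}
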
@@ -416,12 +458,23 @@ func isAvailableV1(s *solution) bool {
}

type balanceSolver struct {
<<<<<<< HEAD:server/schedulers/hot_region.go
schedule.Cluster
sche *hotScheduler
stLoadDetail map[uint64]*statistics.StoreLoadDetail
rwTy statistics.RWType
opTy opType
resourceTy resourceType
=======
sche.SchedulerCluster
sche *hotScheduler
stLoadDetail map[uint64]*statistics.StoreLoadDetail
filteredHotPeers map[uint64][]*statistics.HotPeerStat // storeID -> hotPeers(filtered)
nthHotPeer map[uint64][]*statistics.HotPeerStat // storeID -> [dimLen]hotPeers
rwTy statistics.RWType
opTy opType
resourceTy resourceType
>>>>>>> 16926ad89 (scheduler: make hot v2 more suitable small hot region (#6827)):pkg/schedule/schedulers/hot_region.go

cur *solution

@@ -457,8 +510,21 @@ type balanceSolver struct {
}

func (bs *balanceSolver) init() {
// Init store load detail according to the type.
// Load the configuration items of the scheduler.
bs.resourceTy = toResourceType(bs.rwTy, bs.opTy)
bs.maxPeerNum = bs.sche.conf.GetMaxPeerNumber()
bs.minHotDegree = bs.GetSchedulerConfig().GetHotRegionCacheHitsThreshold()
bs.firstPriority, bs.secondPriority = prioritiesToDim(bs.getPriorities())
bs.greatDecRatio, bs.minorDecRatio = bs.sche.conf.GetGreatDecRatio(), bs.sche.conf.GetMinorDecRatio()
bs.isRaftKV2 = bs.GetStoreConfig().IsRaftKV2()
switch bs.sche.conf.GetRankFormulaVersion() {
case "v1":
bs.initRankV1()
default:
bs.initRankV2()
}

// Init store load detail according to the type.
bs.stLoadDetail = bs.sche.stLoadInfos[bs.resourceTy]

bs.maxSrc = &statistics.StoreLoad{Loads: make([]float64, statistics.DimLen)}
@@ -471,10 +537,14 @@ func (bs *balanceSolver) init() {
}
maxCur := &statistics.StoreLoad{Loads: make([]float64, statistics.DimLen)}

bs.filteredHotPeers = make(map[uint64][]*statistics.HotPeerStat)
bs.nthHotPeer = make(map[uint64][]*statistics.HotPeerStat)
for _, detail := range bs.stLoadDetail {
bs.maxSrc = statistics.MaxLoad(bs.maxSrc, detail.LoadPred.Min())
bs.minDst = statistics.MinLoad(bs.minDst, detail.LoadPred.Max())
maxCur = statistics.MaxLoad(maxCur, &detail.LoadPred.Current)
bs.nthHotPeer[detail.GetID()] = make([]*statistics.HotPeerStat, statistics.DimLen)
bs.filteredHotPeers[detail.GetID()] = bs.filterHotPeers(detail)
}

rankStepRatios := []float64{
@@ -489,6 +559,7 @@ func (bs *balanceSolver) init() {
Loads: stepLoads,
Count: maxCur.Count * bs.sche.conf.GetCountRankStepRatio(),
}
<<<<<<< HEAD:server/schedulers/hot_region.go

bs.firstPriority, bs.secondPriority = prioritiesToDim(bs.getPriorities())
bs.greatDecRatio, bs.minorDecRatio = bs.sche.conf.GetGreatDecRatio(), bs.sche.conf.GetMinorDecRatio()
@@ -501,6 +572,8 @@ func (bs *balanceSolver) init() {
default:
bs.initRankV2()
}
=======
>>>>>>> 16926ad89 (scheduler: make hot v2 more suitable small hot region (#6827)):pkg/schedule/schedulers/hot_region.go
}

func (bs *balanceSolver) initRankV1() {
@@ -621,7 +694,7 @@ func (bs *balanceSolver) solve() []*operator.Operator {
for _, srcStore := range bs.filterSrcStores() {
bs.cur.srcStore = srcStore
srcStoreID := srcStore.GetID()
for _, mainPeerStat := range bs.filterHotPeers(srcStore) {
for _, mainPeerStat := range bs.filteredHotPeers[srcStoreID] {
if bs.cur.region = bs.getRegion(mainPeerStat, srcStoreID); bs.cur.region == nil {
continue
} else if bs.opTy == movePeer && bs.cur.region.GetApproximateSize() > bs.GetOpts().GetMaxMovableHotPeerSize() {
@@ -637,7 +710,7 @@ func (bs *balanceSolver) solve() []*operator.Operator {
if bs.needSearchRevertRegions() {
schedulerCounter.WithLabelValues(bs.sche.GetName(), "search-revert-regions").Inc()
dstStoreID := dstStore.GetID()
for _, revertPeerStat := range bs.filterHotPeers(bs.cur.dstStore) {
for _, revertPeerStat := range bs.filteredHotPeers[dstStoreID] {
revertRegion := bs.getRegion(revertPeerStat, dstStoreID)
if revertRegion == nil || revertRegion.GetID() == bs.cur.region.GetID() ||
!allowRevertRegion(revertRegion, srcStoreID) {
@@ -760,44 +833,52 @@ func (bs *balanceSolver) checkSrcByPriorityAndTolerance(minLoad, expectLoad *sta

// filterHotPeers filters hot peers from statistics.HotPeerStat and drops a peer if its region is in pending status.
// The number of returned hot peers is controlled by `max-peer-number`.
func (bs *balanceSolver) filterHotPeers(storeLoad *statistics.StoreLoadDetail) (ret []*statistics.HotPeerStat) {
func (bs *balanceSolver) filterHotPeers(storeLoad *statistics.StoreLoadDetail) []*statistics.HotPeerStat {
hotPeers := storeLoad.HotPeers
ret := make([]*statistics.HotPeerStat, 0, len(hotPeers))
appendItem := func(item *statistics.HotPeerStat) {
if _, ok := bs.sche.regionPendings[item.ID()]; !ok && !item.IsNeedCoolDownTransferLeader(bs.minHotDegree, bs.rwTy) {
// not in a pending operator and no need to cool down after a leader transfer
ret = append(ret, item)
}
}

src := storeLoad.HotPeers
// At most MaxPeerNum peers, to keep balanceSolver.solve() from being too slow.
if len(src) <= bs.maxPeerNum {
ret = make([]*statistics.HotPeerStat, 0, len(src))
for _, peer := range src {
appendItem(peer)
}
} else {
union := bs.sortHotPeers(src)
var firstSort, secondSort []*statistics.HotPeerStat
if len(hotPeers) >= topnPosition || len(hotPeers) > bs.maxPeerNum {
firstSort = make([]*statistics.HotPeerStat, len(hotPeers))
copy(firstSort, hotPeers)
sort.Slice(firstSort, func(i, j int) bool {
return firstSort[i].GetLoad(bs.firstPriority) > firstSort[j].GetLoad(bs.firstPriority)
})
secondSort = make([]*statistics.HotPeerStat, len(hotPeers))
copy(secondSort, hotPeers)
sort.Slice(secondSort, func(i, j int) bool {
return secondSort[i].GetLoad(bs.secondPriority) > secondSort[j].GetLoad(bs.secondPriority)
})
}
if len(hotPeers) >= topnPosition {
storeID := storeLoad.GetID()
bs.nthHotPeer[storeID][bs.firstPriority] = firstSort[topnPosition-1]
bs.nthHotPeer[storeID][bs.secondPriority] = secondSort[topnPosition-1]
}
if len(hotPeers) > bs.maxPeerNum {
union := bs.sortHotPeers(firstSort, secondSort)
ret = make([]*statistics.HotPeerStat, 0, len(union))
for peer := range union {
appendItem(peer)
}
return ret
}

return
for _, peer := range hotPeers {
appendItem(peer)
}
return ret
}

func (bs *balanceSolver) sortHotPeers(ret []*statistics.HotPeerStat) map[*statistics.HotPeerStat]struct{} {
firstSort := make([]*statistics.HotPeerStat, len(ret))
copy(firstSort, ret)
sort.Slice(firstSort, func(i, j int) bool {
return firstSort[i].GetLoad(bs.firstPriority) > firstSort[j].GetLoad(bs.firstPriority)
})
secondSort := make([]*statistics.HotPeerStat, len(ret))
copy(secondSort, ret)
sort.Slice(secondSort, func(i, j int) bool {
return secondSort[i].GetLoad(bs.secondPriority) > secondSort[j].GetLoad(bs.secondPriority)
})
func (bs *balanceSolver) sortHotPeers(firstSort, secondSort []*statistics.HotPeerStat) map[*statistics.HotPeerStat]struct{} {
union := make(map[*statistics.HotPeerStat]struct{}, bs.maxPeerNum)
// At most MaxPeerNum peers, to keep balanceSolver.solve() from being too slow.
for len(union) < bs.maxPeerNum {
for len(firstSort) > 0 {
peer := firstSort[0]
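
To make the reworked `filterHotPeers`/`sortHotPeers` flow above easier to follow, here is a self-contained sketch of the same idea with a simplified peer type; the type, peer data, and thresholds are illustrative and not taken from the repository. Peers are sorted once per priority dimension, the peer at position `topnPosition-1` is remembered as the store's N-th hottest peer, and when a store holds more than `max-peer-number` peers only the union of the hottest entries from both sort orders is kept.

package main

import (
	"fmt"
	"sort"
)

// peer is a simplified stand-in for statistics.HotPeerStat.
type peer struct {
	regionID uint64
	byteRate float64 // first-priority dimension
	keyRate  float64 // second-priority dimension
}

const (
	topnPosition = 3 // the real scheduler uses 10
	maxPeerNum   = 2 // corresponds to the "max-peer-number" config item
)

// filterTopPeers mirrors the shape of the reworked filterHotPeers: sort copies of
// the peer list by each priority, remember the peer at position topnPosition-1 per
// dimension, and when there are more than maxPeerNum peers keep only the union of
// the hottest entries from both sort orders.
func filterTopPeers(peers []peer) (kept []peer, nthByByte, nthByKey *peer) {
	firstSort := append([]peer(nil), peers...)
	sort.Slice(firstSort, func(i, j int) bool { return firstSort[i].byteRate > firstSort[j].byteRate })
	secondSort := append([]peer(nil), peers...)
	sort.Slice(secondSort, func(i, j int) bool { return secondSort[i].keyRate > secondSort[j].keyRate })

	if len(peers) >= topnPosition {
		nthByByte, nthByKey = &firstSort[topnPosition-1], &secondSort[topnPosition-1]
	}
	if len(peers) <= maxPeerNum {
		return peers, nthByByte, nthByKey
	}

	// Take peers from both sort orders until maxPeerNum distinct regions are collected.
	union := make(map[uint64]peer, maxPeerNum)
	for i := 0; len(union) < maxPeerNum && i < len(peers); i++ {
		union[firstSort[i].regionID] = firstSort[i]
		if len(union) < maxPeerNum {
			union[secondSort[i].regionID] = secondSort[i]
		}
	}
	for _, p := range union {
		kept = append(kept, p)
	}
	return kept, nthByByte, nthByKey
}

func main() {
	peers := []peer{{1, 100, 5}, {2, 80, 50}, {3, 10, 90}, {4, 60, 60}}
	kept, nthByByte, _ := filterTopPeers(peers)
	fmt.Println(len(kept), nthByByte.byteRate) // 2 60: two peers kept, third-hottest byte rate recorded
}

The nthHotPeer values recorded this way are what getScoreByPriorities in hot_region_v2.go reads further down.
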
11 changes: 7 additions & 4 deletions server/schedulers/hot_region_test.go
@@ -1788,20 +1788,23 @@ func TestHotCacheSortHotPeer(t *testing.T) {
},
}}

st := &statistics.StoreLoadDetail{
HotPeers: hotPeers,
}
leaderSolver.maxPeerNum = 1
u := leaderSolver.sortHotPeers(hotPeers)
u := leaderSolver.filterHotPeers(st)
checkSortResult(re, []uint64{1}, u)

leaderSolver.maxPeerNum = 2
u = leaderSolver.sortHotPeers(hotPeers)
u = leaderSolver.filterHotPeers(st)
checkSortResult(re, []uint64{1, 2}, u)
}

func checkSortResult(re *require.Assertions, regions []uint64, hotPeers map[*statistics.HotPeerStat]struct{}) {
func checkSortResult(re *require.Assertions, regions []uint64, hotPeers []*statistics.HotPeerStat) {
re.Equal(len(hotPeers), len(regions))
for _, region := range regions {
in := false
for hotPeer := range hotPeers {
for _, hotPeer := range hotPeers {
if hotPeer.RegionID == region {
in = true
break
7 changes: 7 additions & 0 deletions server/schedulers/hot_region_v2.go
@@ -204,11 +204,17 @@ func (bs *balanceSolver) getScoreByPriorities(dim int, rs *rankV2Ratios) int {
srcPendingRate, dstPendingRate := bs.cur.getPendingLoad(dim)
peersRate := bs.cur.getPeersRateFromCache(dim)
highRate, lowRate := srcRate, dstRate
topnHotPeer := bs.nthHotPeer[bs.cur.srcStore.GetID()][dim]
reverse := false
if srcRate < dstRate {
highRate, lowRate = dstRate, srcRate
peersRate = -peersRate
reverse = true
topnHotPeer = bs.nthHotPeer[bs.cur.dstStore.GetID()][dim]
}
topnRate := math.MaxFloat64
if topnHotPeer != nil {
topnRate = topnHotPeer.GetLoad(dim)
}

if highRate*rs.balancedCheckRatio <= lowRate {
@@ -260,6 +266,7 @@ func (bs *balanceSolver) getScoreByPriorities(dim int, rs *rankV2Ratios) int {
// maxBetterRate may be less than minBetterRate, in which case a positive fraction cannot be produced.
minNotWorsenedRate = -bs.getMinRate(dim)
minBetterRate = math.Min(minBalancedRate*rs.perceivedRatio, lowRate*rs.minHotRatio)
minBetterRate = math.Min(minBetterRate, topnRate)
maxBetterRate = maxBalancedRate + (highRate-lowRate-minBetterRate-maxBalancedRate)*rs.perceivedRatio
maxNotWorsenedRate = maxBalancedRate + (highRate-lowRate-minNotWorsenedRate-maxBalancedRate)*rs.perceivedRatio
}
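
The effect of the new `topnRate` bound on `minBetterRate` can be seen with a toy calculation; the numbers below are made up for illustration and are not the scheduler's defaults. When the configured ratios yield a threshold that no peer on a store full of small hot regions can reach, capping the threshold at the load of that store's N-th hottest peer lets such peers still qualify as a "better" move, which is the point of this change (scheduler: make hot v2 more suitable for small hot regions).

package main

import (
	"fmt"
	"math"
)

func main() {
	// Illustrative numbers only; none of these values come from the repository.
	const (
		minBalancedRate = 300.0 // stands in for the balanced-range term in the real formula
		lowRate         = 900.0 // load of the colder store
		perceivedRatio  = 0.5   // stands in for rs.perceivedRatio
		minHotRatio     = 0.1   // stands in for rs.minHotRatio
		topnRate        = 40.0  // load of the N-th hottest peer on the hotter store
	)

	withoutClamp := math.Min(minBalancedRate*perceivedRatio, lowRate*minHotRatio) // 90
	withClamp := math.Min(withoutClamp, topnRate)                                 // 40

	// A peer carrying 60 units of load falls short of the old threshold but clears
	// the clamped one, so small hot regions can still be scheduled.
	fmt.Println(withoutClamp, withClamp) // 90 40
}
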