scheduler: add more comments
Signed-off-by: lhy1024 <[email protected]>
lhy1024 committed Jun 27, 2024
1 parent 1578f29 commit 29ee921
Showing 4 changed files with 32 additions and 31 deletions.
9 changes: 8 additions & 1 deletion pkg/schedule/schedulers/hot_region.go
@@ -163,6 +163,7 @@ func (h *baseHotScheduler) prepareForBalance(rw utils.RWType, cluster sche.Sched
switch rw {
case utils.Read:
// update read statistics
// avoid updating read statistics too frequently
if time.Since(h.updateReadTime) >= statisticsInterval {
regionRead := cluster.RegionReadStats()
prepare(regionRead, constant.LeaderKind)
@@ -171,6 +172,7 @@ func (h *baseHotScheduler) prepareForBalance(rw utils.RWType, cluster sche.Sched
}
case utils.Write:
// update write statistics
// avoid updating write statistics too frequently
if time.Since(h.updateWriteTime) >= statisticsInterval {
regionWrite := cluster.RegionWriteStats()
prepare(regionWrite, constant.LeaderKind)
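
The two comments added above describe a simple time-based throttle: the read/write region statistics are only recomputed once at least statisticsInterval has elapsed since the previous update. Below is a minimal, self-contained sketch of that pattern; the cache type, field names, and the 10-second interval are illustrative assumptions, not the scheduler's actual definitions.

package main

import (
	"fmt"
	"time"
)

// statisticsInterval is an assumed value, for illustration only.
const statisticsInterval = 10 * time.Second

type statsCache struct {
	lastUpdate time.Time
	cached     map[uint64]float64
}

// maybeRefresh reloads the cached statistics only if the previous refresh
// is older than statisticsInterval, mirroring the throttle described above.
func (c *statsCache) maybeRefresh(load func() map[uint64]float64) {
	if time.Since(c.lastUpdate) < statisticsInterval {
		return // still fresh, avoid recomputing the statistics too frequently
	}
	c.cached = load()
	c.lastUpdate = time.Now()
}

func main() {
	c := &statsCache{}
	c.maybeRefresh(func() map[uint64]float64 { return map[uint64]float64{1: 42} })
	fmt.Println(c.cached) // refreshed on the first call; later calls within the interval are no-ops
}
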
@@ -510,6 +512,7 @@ type balanceSolver struct
best *solution
ops []*operator.Operator

// maxSrc and minDst are used to calculate the rank.
maxSrc *statistics.StoreLoad
minDst *statistics.StoreLoad
rankStep *statistics.StoreLoad
@@ -1217,7 +1220,7 @@ func (bs *balanceSolver) isUniformSecondPriority(store *statistics.StoreLoadDeta
// calcProgressiveRank calculates `bs.cur.progressiveRank`.
// See the comments of `solution.progressiveRank` for more about progressive rank.
// | ↓ firstPriority \ secondPriority → | isBetter | isNotWorsened | Worsened |
// | isBetter | -4 | -3 | -1 / 0 |
// | isBetter | -4 | -3 | -1 |
// | isNotWorsened | -2 | 1 | 1 |
// | Worsened | 0 | 1 | 1 |
func (bs *balanceSolver) calcProgressiveRankV1() {
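
The table above maps the two per-dimension outcomes to a single progressive rank, where a more negative rank is a better candidate and a positive rank is rejected. The standalone sketch below reproduces only that mapping as the table reads after this change (with -1 for a better first priority and a worsened second priority); the enum and function names are illustrative, and this is not the actual calcProgressiveRankV1 body.

package main

import "fmt"

type cmp int

const (
	isBetter cmp = iota
	isNotWorsened
	isWorsened
)

// progressiveRank reproduces the table documented above: a lower (more
// negative) rank is a better candidate, a positive rank is rejected.
func progressiveRank(first, second cmp) int {
	switch {
	case first == isBetter && second == isBetter:
		return -4
	case first == isBetter && second == isNotWorsened:
		return -3
	case first == isNotWorsened && second == isBetter:
		return -2
	case first == isBetter && second == isWorsened:
		return -1
	case first == isWorsened && second == isBetter:
		return 0
	default:
		return 1 // remaining combinations: not a usable solution
	}
}

func main() {
	fmt.Println(progressiveRank(isBetter, isBetter))      // -4
	fmt.Println(progressiveRank(isNotWorsened, isBetter)) // -2
	fmt.Println(progressiveRank(isBetter, isWorsened))    // -1
}
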
@@ -1363,17 +1366,21 @@ func (bs *balanceSolver) betterThanV1(old *solution) bool {
firstCmp, secondCmp := bs.getRkCmpPrioritiesV1(old)
switch bs.cur.progressiveRank {
case -4: // isBetter(firstPriority) && isBetter(secondPriority)
// Both are better, prefer the one with the higher first priority rate.
// If the first priority rates are similar, prefer the one with the higher second priority rate.
if firstCmp != 0 {
return firstCmp > 0
}
return secondCmp > 0
case -3: // isBetter(firstPriority) && isNotWorsened(secondPriority)
// The first priority is better, prefer the one with higher first priority rate.
if firstCmp != 0 {
return firstCmp > 0
}
// prefer a smaller second priority rate to reduce oscillation
return secondCmp < 0
case -2: // isNotWorsened(firstPriority) && isBetter(secondPriority)
// The second priority is better, prefer the one with higher second priority rate.
if secondCmp != 0 {
return secondCmp > 0
}
1 change: 0 additions & 1 deletion pkg/schedule/schedulers/hot_region_config.go
@@ -38,7 +38,6 @@ import (
)

const (

// Scheduling has a bigger impact on TiFlash, so its tolerance ratio needs to be corrected in the configuration items
// In the default config, the TiKV difference is 1.05*1.05-1 = 0.1025, and the TiFlash difference is 1.15*1.15-1 = 0.3225
tiflashToleranceRatioCorrection = 0.1
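
As a quick check of the arithmetic in the comment above, the sketch below recomputes both differences, assuming the TiFlash tolerance ratio is the default TiKV ratio plus tiflashToleranceRatioCorrection (1.05 + 0.1 = 1.15), which is consistent with the numbers quoted there.

package main

import "fmt"

func main() {
	const (
		tikvToleranceRatio              = 1.05 // default value quoted in the comment above
		tiflashToleranceRatioCorrection = 0.1
	)
	tiflashToleranceRatio := tikvToleranceRatio + tiflashToleranceRatioCorrection

	// The "difference" quoted in the comment is ratio*ratio - 1.
	fmt.Printf("TiKV:    %.4f\n", tikvToleranceRatio*tikvToleranceRatio-1)       // 0.1025
	fmt.Printf("TiFlash: %.4f\n", tiflashToleranceRatio*tiflashToleranceRatio-1) // 0.3225
}
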
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/hot_region_v2.go
@@ -184,9 +184,9 @@ func (bs *balanceSolver) setSearchRevertRegionsV2() {

// calcProgressiveRankV2 calculates `bs.cur.progressiveRank`.
// See the comments of `solution.progressiveRank` for more about progressive rank.
// isBetter: score > 0
// isBetter: score < 0
// isNotWorsened: score == 0
// isWorsened: score < 0
// isWorsened: score > 0
// | ↓ firstPriority \ secondPriority → | isBetter | isNotWorsened | isWorsened |
// | isBetter | -4 | -3 | -2 |
// | isNotWorsened | -1 | 1 | 1 |
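
For comparison with the v1 table earlier in this commit, the sketch below encodes the v2 rows shown above: (isBetter, isWorsened) now ranks -2 and (isNotWorsened, isBetter) ranks -1, and there is no rank 0. The row for a worsened first priority is cut off in this hunk, so the sketch assumes those combinations are simply rejected (rank 1); all names are illustrative, not the real calcProgressiveRankV2.

package main

import "fmt"

type cmp int

const (
	isBetter cmp = iota
	isNotWorsened
	isWorsened
)

// progressiveRankV2 encodes the visible rows of the v2 table above. Treating a
// worsened first priority as rejected (rank 1) is an assumption, since that
// row is not shown in this hunk.
func progressiveRankV2(first, second cmp) int {
	switch {
	case first == isBetter && second == isBetter:
		return -4
	case first == isBetter && second == isNotWorsened:
		return -3
	case first == isBetter && second == isWorsened:
		return -2
	case first == isNotWorsened && second == isBetter:
		return -1
	default:
		return 1 // not a usable solution
	}
}

func main() {
	fmt.Println(progressiveRankV2(isBetter, isWorsened))    // -2 (was -1 in v1)
	fmt.Println(progressiveRankV2(isNotWorsened, isBetter)) // -1 (was -2 in v1)
}
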
49 changes: 22 additions & 27 deletions pkg/statistics/store_hot_peers_infos.go
@@ -15,7 +15,6 @@
package statistics

import (
"fmt"
"math"

"github.com/tikv/pd/pkg/core"
@@ -151,7 +150,7 @@ func summaryStoresLoadByEngine(
) []*StoreLoadDetail {
loadDetail := make([]*StoreLoadDetail, 0, len(storeInfos))
allStoreLoadSum := make([]float64, utils.DimLen)
allStoreHistoryLoadSum := make([][]float64, utils.DimLen)
allStoreHistoryLoadSum := make([][]float64, utils.DimLen) // row: dim, column: time
allStoreCount := 0
allHotPeersCount := 0

@@ -166,49 +165,37 @@ func summaryStoresLoadByEngine(
// Find all hot peers first
var hotPeers []*HotPeerStat
peerLoadSum := make([]float64, utils.DimLen)
// TODO: To remove `filterHotPeers`, we need to:
// HotLeaders consider `Write{Bytes,Keys}`, so when we schedule `writeLeader`, all peers are leaders.
// For hot leaders, we need to calculate the sum of the leader's write and read flow rather than that of all peers.
for _, peer := range filterHotPeers(kind, storeHotPeers[id]) {
for i := range peerLoadSum {
peerLoadSum[i] += peer.GetLoad(i)
}
hotPeers = append(hotPeers, peer.Clone())
}
{
// Metric for debug.
// TODO: pre-allocate gauge metrics
ty := "byte-rate-" + rwTy.String() + "-" + kind.String()
hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(peerLoadSum[utils.ByteDim])
ty = "key-rate-" + rwTy.String() + "-" + kind.String()
hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(peerLoadSum[utils.KeyDim])
ty = "query-rate-" + rwTy.String() + "-" + kind.String()
hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(peerLoadSum[utils.QueryDim])
}
loads := collector.GetLoads(storeLoads, peerLoadSum, rwTy, kind)
currentLoads := collector.GetLoads(storeLoads, peerLoadSum, rwTy, kind)

var historyLoads [][]float64
if storesHistoryLoads != nil {
historyLoads = storesHistoryLoads.Get(id, rwTy, kind)
for i, loads := range historyLoads {
if allStoreHistoryLoadSum[i] == nil || len(allStoreHistoryLoadSum[i]) < len(loads) {
allStoreHistoryLoadSum[i] = make([]float64, len(loads))
for i, historyLoads := range storesHistoryLoads.Get(id, rwTy, kind) {
if allStoreHistoryLoadSum[i] == nil || len(allStoreHistoryLoadSum[i]) < len(historyLoads) {
allStoreHistoryLoadSum[i] = make([]float64, len(historyLoads))
}
for j, load := range loads {
allStoreHistoryLoadSum[i][j] += load
for j, historyLoad := range historyLoads {
allStoreHistoryLoadSum[i][j] += historyLoad
}
}
storesHistoryLoads.Add(id, rwTy, kind, loads)
storesHistoryLoads.Add(id, rwTy, kind, currentLoads)
}

for i := range allStoreLoadSum {
allStoreLoadSum[i] += loads[i]
allStoreLoadSum[i] += currentLoads[i]
}
allStoreCount += 1
allHotPeersCount += len(hotPeers)

// Build store load prediction from current load and pending influence.
stLoadPred := (&StoreLoad{
Loads: loads,
Loads: currentLoads,
Count: float64(len(hotPeers)),
HistoryLoads: historyLoads,
}).ToLoadPred(rwTy, info.PendingSum)
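
The per-store history loads handled above form a [dim][time] matrix (row: dim, column: time); the loop sums them element-wise across stores, and the next hunk divides the sums by the store count to obtain the expected history loads. A simplified standalone sketch of that bookkeeping follows; it assumes every store reports rows of equal length and uses toy helper names rather than the statistics package's API.

package main

import "fmt"

// sumHistoryLoads sums per-store [dim][time] matrices element-wise.
// For simplicity, every store is assumed to report rows of the same length.
func sumHistoryLoads(perStore [][][]float64, dimLen int) [][]float64 {
	sum := make([][]float64, dimLen) // row: dim, column: time
	for _, store := range perStore {
		for i, loads := range store {
			if sum[i] == nil {
				sum[i] = make([]float64, len(loads))
			}
			for j, v := range loads {
				sum[i][j] += v
			}
		}
	}
	return sum
}

// expectHistoryLoads averages the summed matrix over the number of stores.
func expectHistoryLoads(sum [][]float64, storeCount int) [][]float64 {
	expect := make([][]float64, len(sum))
	for i := range sum {
		expect[i] = make([]float64, len(sum[i]))
		for j := range sum[i] {
			expect[i][j] = sum[i][j] / float64(storeCount)
		}
	}
	return expect
}

func main() {
	// Two stores, two dims (e.g. byte rate and key rate), three time slots each.
	stores := [][][]float64{
		{{10, 20, 30}, {1, 2, 3}},
		{{30, 40, 50}, {3, 4, 5}},
	}
	sum := sumHistoryLoads(stores, 2)
	fmt.Println(expectHistoryLoads(sum, len(stores))) // [[20 30 40] [2 3 4]]
}
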
@@ -231,8 +218,8 @@ func summaryStoresLoadByEngine(
expectLoads[i] = allStoreLoadSum[i] / float64(allStoreCount)
}

// todo: remove some the max value or min value to avoid the effect of extreme value.
expectHistoryLoads := make([][]float64, utils.DimLen)
// TODO: remove some of the max or min values to avoid the effect of extreme values.
expectHistoryLoads := make([][]float64, utils.DimLen) // row: dim, column: time
for i := range allStoreHistoryLoadSum {
expectHistoryLoads[i] = make([]float64, len(allStoreHistoryLoadSum[i]))
for j := range allStoreHistoryLoadSum[i] {
@@ -285,11 +272,19 @@ func summaryStoresLoadByEngine(
return loadDetail
}

// filterHotPeers filters hot peers according to kind.
// If kind is RegionKind, all hot peers will be returned.
// If kind is LeaderKind, only leader hot peers will be returned.
func filterHotPeers(kind constant.ResourceKind, peers []*HotPeerStat) []*HotPeerStat {
ret := make([]*HotPeerStat, 0, len(peers))
for _, peer := range peers {
if kind != constant.LeaderKind || peer.IsLeader() {
switch kind {
case constant.RegionKind:
ret = append(ret, peer)
case constant.LeaderKind:
if peer.IsLeader() {
ret = append(ret, peer)
}
}
}
return ret
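
To make the rewritten filterHotPeers behaviour concrete: RegionKind keeps every hot peer, while LeaderKind keeps only the peers whose replica on the store is the leader. The toy program below mirrors that switch over a stand-in peer type; it is not the statistics package's HotPeerStat.

package main

import "fmt"

type resourceKind int

const (
	leaderKind resourceKind = iota
	regionKind
)

type hotPeer struct {
	regionID uint64
	isLeader bool
}

// filterHotPeers keeps all peers for regionKind and only leaders for leaderKind,
// matching the behaviour documented above.
func filterHotPeers(kind resourceKind, peers []hotPeer) []hotPeer {
	ret := make([]hotPeer, 0, len(peers))
	for _, peer := range peers {
		switch kind {
		case regionKind:
			ret = append(ret, peer)
		case leaderKind:
			if peer.isLeader {
				ret = append(ret, peer)
			}
		}
	}
	return ret
}

func main() {
	peers := []hotPeer{{1, true}, {2, false}, {3, true}}
	fmt.Println(len(filterHotPeers(regionKind, peers))) // 3
	fmt.Println(len(filterHotPeers(leaderKind, peers))) // 2
}
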
