Skip to content

Commit

Permalink
Merge branch 'master' into alloc-id
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Aug 31, 2023
2 parents 00a22f4 + 931bf7d commit 1e22465
Show file tree
Hide file tree
Showing 5 changed files with 293 additions and 129 deletions.
111 changes: 63 additions & 48 deletions pkg/schedule/schedulers/hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,34 @@ import (
"go.uber.org/zap"
)

const (
// HotRegionName is balance hot region scheduler name.
HotRegionName = "balance-hot-region-scheduler"
// HotRegionType is balance hot region scheduler type.
HotRegionType = "hot-region"
splitHotReadBuckets = "split-hot-read-region"
splitHotWriteBuckets = "split-hot-write-region"
splitProgressiveRank = int64(-5)
minHotScheduleInterval = time.Second
maxHotScheduleInterval = 20 * time.Second
)

var (
topnPosition = 10
// schedulePeerPr the probability of schedule the hot peer.
schedulePeerPr = 0.66
// pendingAmpFactor will amplify the impact of pending influence, making scheduling slower or even serial when two stores are close together
pendingAmpFactor = 2.0
// If the distribution of a dimension is below the corresponding stddev threshold, then scheduling will no longer be based on this dimension,
// as it implies that this dimension is sufficiently uniform.
stddevThreshold = 0.1
// topnPosition is the position of the topn peer in the hot peer list.
// We use it to judge whether to schedule the hot peer in some cases.
topnPosition = 10
// statisticsInterval is the interval to update statistics information.
statisticsInterval = time.Second
)

var (
// WithLabelValues is a heavy operation, define variable to avoid call it every time.
hotSchedulerCounter = schedulerCounter.WithLabelValues(HotRegionName, "schedule")
hotSchedulerSkipCounter = schedulerCounter.WithLabelValues(HotRegionName, "skip")
Expand Down Expand Up @@ -85,17 +110,14 @@ var (

type baseHotScheduler struct {
*BaseScheduler
// store information, including pending Influence by resource type
// stLoadInfos contain store statistics information by resource type.
// stLoadInfos is temporary states but exported to API or metrics.
// Every time `Schedule()` will recalculate it.
stInfos map[uint64]*statistics.StoreSummaryInfo
// temporary states but exported to API or metrics
// Every time `Schedule()` will recalculate it.
stLoadInfos [resourceTypeLen]map[uint64]*statistics.StoreLoadDetail
stLoadInfos [resourceTypeLen]map[uint64]*statistics.StoreLoadDetail
// stHistoryLoads stores the history `stLoadInfos`
// Every time `Schedule()` will rolling update it.
stHistoryLoads *statistics.StoreHistoryLoads
// temporary states
// Every time `Schedule()` will recalculate it.
storesLoads map[uint64][]float64
// regionPendings stores regionID -> pendingInfluence
// regionPendings stores regionID -> pendingInfluence,
// this records regionID which have pending Operator by operation type. During filterHotPeers, the hot peers won't
// be selected if its owner region is tracked in this attribute.
regionPendings map[uint64]*pendingInfluence
Expand Down Expand Up @@ -123,16 +145,16 @@ func newBaseHotScheduler(opController *operator.Controller) *baseHotScheduler {
// prepareForBalance calculate the summary of pending Influence for each store and prepare the load detail for
// each store, only update read or write load detail
func (h *baseHotScheduler) prepareForBalance(rw utils.RWType, cluster sche.SchedulerCluster) {
h.stInfos = statistics.SummaryStoreInfos(cluster.GetStores())
h.summaryPendingInfluence()
h.storesLoads = cluster.GetStoresLoads()
storeInfos := statistics.SummaryStoreInfos(cluster.GetStores())
h.summaryPendingInfluence(storeInfos)
storesLoads := cluster.GetStoresLoads()
isTraceRegionFlow := cluster.GetSchedulerConfig().IsTraceRegionFlow()

prepare := func(regionStats map[uint64][]*statistics.HotPeerStat, resource constant.ResourceKind) {
ty := buildResourceType(rw, resource)
h.stLoadInfos[ty] = statistics.SummaryStoresLoad(
h.stInfos,
h.storesLoads,
storeInfos,
storesLoads,
h.stHistoryLoads,
regionStats,
isTraceRegionFlow,
Expand Down Expand Up @@ -161,11 +183,11 @@ func (h *baseHotScheduler) prepareForBalance(rw utils.RWType, cluster sche.Sched
// summaryPendingInfluence calculate the summary of pending Influence for each store
// and clean the region from regionInfluence if they have ended operator.
// It makes each dim rate or count become `weight` times to the origin value.
func (h *baseHotScheduler) summaryPendingInfluence() {
func (h *baseHotScheduler) summaryPendingInfluence(storeInfos map[uint64]*statistics.StoreSummaryInfo) {
for id, p := range h.regionPendings {
for _, from := range p.froms {
from := h.stInfos[from]
to := h.stInfos[p.to]
from := storeInfos[from]
to := storeInfos[p.to]
maxZombieDur := p.maxZombieDuration
weight, needGC := calcPendingInfluence(p.op, maxZombieDur)

Expand All @@ -182,9 +204,9 @@ func (h *baseHotScheduler) summaryPendingInfluence() {
}
}
}
for storeID, info := range h.stInfos {
for storeID, info := range storeInfos {
storeLabel := strconv.FormatUint(storeID, 10)
if infl := info.PendingSum; infl != nil {
if infl := info.PendingSum; infl != nil && len(infl.Loads) != 0 {
utils.ForeachRegionStats(func(rwTy utils.RWType, dim int, kind utils.RegionStatKind) {
setHotPendingInfluenceMetrics(storeLabel, rwTy.String(), utils.DimToString(dim), infl.Loads[kind])
})
Expand All @@ -201,30 +223,6 @@ func (h *baseHotScheduler) randomRWType() utils.RWType {
return h.types[h.r.Int()%len(h.types)]
}

const (
// HotRegionName is balance hot region scheduler name.
HotRegionName = "balance-hot-region-scheduler"
// HotRegionType is balance hot region scheduler type.
HotRegionType = "hot-region"

minHotScheduleInterval = time.Second
maxHotScheduleInterval = 20 * time.Second
)

var (
// schedulePeerPr the probability of schedule the hot peer.
schedulePeerPr = 0.66
// pendingAmpFactor will amplify the impact of pending influence, making scheduling slower or even serial when two stores are close together
pendingAmpFactor = 2.0
// If the distribution of a dimension is below the corresponding stddev threshold, then scheduling will no longer be based on this dimension,
// as it implies that this dimension is sufficiently uniform.
stddevThreshold = 0.1

splitHotReadBuckets = "split-hot-read-region"
splitHotWriteBuckets = "split-hot-write-region"
splitProgressiveRank = int64(-5)
)

type hotScheduler struct {
name string
*baseHotScheduler
Expand Down Expand Up @@ -408,7 +406,8 @@ type solution struct {
secondScore int
}

// getExtremeLoad returns the min load of the src store and the max load of the dst store.
// getExtremeLoad returns the closest load in the selected src and dst statistics.
// in other word, the min load of the src store and the max load of the dst store.
// If peersRate is negative, the direction is reversed.
func (s *solution) getExtremeLoad(dim int) (src float64, dst float64) {
if s.getPeersRateFromCache(dim) >= 0 {
Expand Down Expand Up @@ -1352,10 +1351,16 @@ func (bs *balanceSolver) getRkCmpPrioritiesV1(old *solution) (firstCmp int, seco
return
}

// smaller is better
// compareSrcStore compares the source store of detail1, detail2, the result is:
// 1. if detail1 is better than detail2, return -1
// 2. if detail1 is worse than detail2, return 1
// 3. if detail1 is equal to detail2, return 0
// The comparison is based on the following principles:
// 1. select the min load of store in current and future, because we want to select the store as source store;
// 2. compare detail1 and detail2 by first priority and second priority, we pick the larger one to speed up the convergence;
// 3. if the first priority and second priority are equal, we pick the store with the smaller difference between current and future to minimize oscillations.
func (bs *balanceSolver) compareSrcStore(detail1, detail2 *statistics.StoreLoadDetail) int {
if detail1 != detail2 {
// compare source store
var lpCmp storeLPCmp
if bs.resourceTy == writeLeader {
lpCmp = sliceLPCmp(
Expand Down Expand Up @@ -1385,7 +1390,14 @@ func (bs *balanceSolver) compareSrcStore(detail1, detail2 *statistics.StoreLoadD
return 0
}

// smaller is better
// compareDstStore compares the destination store of detail1, detail2, the result is:
// 1. if detail1 is better than detail2, return -1
// 2. if detail1 is worse than detail2, return 1
// 3. if detail1 is equal to detail2, return 0
// The comparison is based on the following principles:
// 1. select the max load of store in current and future, because we want to select the store as destination store;
// 2. compare detail1 and detail2 by first priority and second priority, we pick the smaller one to speed up the convergence;
// 3. if the first priority and second priority are equal, we pick the store with the smaller difference between current and future to minimize oscillations.
func (bs *balanceSolver) compareDstStore(detail1, detail2 *statistics.StoreLoadDetail) int {
if detail1 != detail2 {
// compare destination store
Expand Down Expand Up @@ -1417,6 +1429,9 @@ func (bs *balanceSolver) compareDstStore(detail1, detail2 *statistics.StoreLoadD
return 0
}

// stepRank returns a function can calculate the discretized data,
// where `rate` will be discretized by `step`.
// `rate` is the speed of the dim, `step` is the step size of the discretized data.
func stepRank(rk0 float64, step float64) func(float64) int64 {
return func(rate float64) int64 {
return int64((rate - rk0) / step)
Expand Down
43 changes: 22 additions & 21 deletions pkg/schedule/schedulers/hot_region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ func checkGCPendingOpInfos(re *require.Assertions, enablePlacementRules bool) {
}
}

hb.summaryPendingInfluence() // Calling this function will GC.
storeInfos := statistics.SummaryStoreInfos(tc.GetStores())
hb.summaryPendingInfluence(storeInfos) // Calling this function will GC.

for i := range opInfluenceCreators {
for j, typ := range typs {
Expand Down Expand Up @@ -2047,16 +2048,16 @@ func TestInfluenceByRWType(t *testing.T) {
op := ops[0]
re.NotNil(op)

hb.(*hotScheduler).summaryPendingInfluence()
stInfos := hb.(*hotScheduler).stInfos
re.True(nearlyAbout(stInfos[1].PendingSum.Loads[utils.RegionWriteKeys], -0.5*units.MiB))
re.True(nearlyAbout(stInfos[1].PendingSum.Loads[utils.RegionWriteBytes], -0.5*units.MiB))
re.True(nearlyAbout(stInfos[4].PendingSum.Loads[utils.RegionWriteKeys], 0.5*units.MiB))
re.True(nearlyAbout(stInfos[4].PendingSum.Loads[utils.RegionWriteBytes], 0.5*units.MiB))
re.True(nearlyAbout(stInfos[1].PendingSum.Loads[utils.RegionReadKeys], -0.5*units.MiB))
re.True(nearlyAbout(stInfos[1].PendingSum.Loads[utils.RegionReadBytes], -0.5*units.MiB))
re.True(nearlyAbout(stInfos[4].PendingSum.Loads[utils.RegionReadKeys], 0.5*units.MiB))
re.True(nearlyAbout(stInfos[4].PendingSum.Loads[utils.RegionReadBytes], 0.5*units.MiB))
storeInfos := statistics.SummaryStoreInfos(tc.GetStores())
hb.(*hotScheduler).summaryPendingInfluence(storeInfos)
re.True(nearlyAbout(storeInfos[1].PendingSum.Loads[utils.RegionWriteKeys], -0.5*units.MiB))
re.True(nearlyAbout(storeInfos[1].PendingSum.Loads[utils.RegionWriteBytes], -0.5*units.MiB))
re.True(nearlyAbout(storeInfos[4].PendingSum.Loads[utils.RegionWriteKeys], 0.5*units.MiB))
re.True(nearlyAbout(storeInfos[4].PendingSum.Loads[utils.RegionWriteBytes], 0.5*units.MiB))
re.True(nearlyAbout(storeInfos[1].PendingSum.Loads[utils.RegionReadKeys], -0.5*units.MiB))
re.True(nearlyAbout(storeInfos[1].PendingSum.Loads[utils.RegionReadBytes], -0.5*units.MiB))
re.True(nearlyAbout(storeInfos[4].PendingSum.Loads[utils.RegionReadKeys], 0.5*units.MiB))
re.True(nearlyAbout(storeInfos[4].PendingSum.Loads[utils.RegionReadBytes], 0.5*units.MiB))

// consider pending amp, there are nine regions or more.
for i := 2; i < 13; i++ {
Expand All @@ -2072,17 +2073,17 @@ func TestInfluenceByRWType(t *testing.T) {
op = ops[0]
re.NotNil(op)

hb.(*hotScheduler).summaryPendingInfluence()
stInfos = hb.(*hotScheduler).stInfos
storeInfos = statistics.SummaryStoreInfos(tc.GetStores())
hb.(*hotScheduler).summaryPendingInfluence(storeInfos)
// assert read/write influence is the sum of write peer and write leader
re.True(nearlyAbout(stInfos[1].PendingSum.Loads[utils.RegionWriteKeys], -1.2*units.MiB))
re.True(nearlyAbout(stInfos[1].PendingSum.Loads[utils.RegionWriteBytes], -1.2*units.MiB))
re.True(nearlyAbout(stInfos[3].PendingSum.Loads[utils.RegionWriteKeys], 0.7*units.MiB))
re.True(nearlyAbout(stInfos[3].PendingSum.Loads[utils.RegionWriteBytes], 0.7*units.MiB))
re.True(nearlyAbout(stInfos[1].PendingSum.Loads[utils.RegionReadKeys], -1.2*units.MiB))
re.True(nearlyAbout(stInfos[1].PendingSum.Loads[utils.RegionReadBytes], -1.2*units.MiB))
re.True(nearlyAbout(stInfos[3].PendingSum.Loads[utils.RegionReadKeys], 0.7*units.MiB))
re.True(nearlyAbout(stInfos[3].PendingSum.Loads[utils.RegionReadBytes], 0.7*units.MiB))
re.True(nearlyAbout(storeInfos[1].PendingSum.Loads[utils.RegionWriteKeys], -1.2*units.MiB))
re.True(nearlyAbout(storeInfos[1].PendingSum.Loads[utils.RegionWriteBytes], -1.2*units.MiB))
re.True(nearlyAbout(storeInfos[3].PendingSum.Loads[utils.RegionWriteKeys], 0.7*units.MiB))
re.True(nearlyAbout(storeInfos[3].PendingSum.Loads[utils.RegionWriteBytes], 0.7*units.MiB))
re.True(nearlyAbout(storeInfos[1].PendingSum.Loads[utils.RegionReadKeys], -1.2*units.MiB))
re.True(nearlyAbout(storeInfos[1].PendingSum.Loads[utils.RegionReadBytes], -1.2*units.MiB))
re.True(nearlyAbout(storeInfos[3].PendingSum.Loads[utils.RegionReadKeys], 0.7*units.MiB))
re.True(nearlyAbout(storeInfos[3].PendingSum.Loads[utils.RegionReadBytes], 0.7*units.MiB))
}

func nearlyAbout(f1, f2 float64) bool {
Expand Down
Loading

0 comments on commit 1e22465

Please sign in to comment.