Merge branch 'master' into fix_test
ti-chi-bot[bot] committed Jul 11, 2023
2 parents 6abbcbb + f816c51 commit 66804da
Showing 15 changed files with 259 additions and 115 deletions.
4 changes: 2 additions & 2 deletions pkg/mock/mockcluster/mockcluster.go
@@ -808,8 +808,8 @@ func (mc *Cluster) MockRegionInfo(regionID uint64, leaderStoreID uint64,
 	followerStoreIDs, learnerStoreIDs []uint64, epoch *metapb.RegionEpoch) *core.RegionInfo {
 	region := &metapb.Region{
 		Id:          regionID,
-		StartKey:    []byte(fmt.Sprintf("%20d", regionID)),
-		EndKey:      []byte(fmt.Sprintf("%20d", regionID+1)),
+		StartKey:    []byte(fmt.Sprintf("%20d0", regionID)),
+		EndKey:      []byte(fmt.Sprintf("%20d0", regionID+1)),
 		RegionEpoch: epoch,
 	}
 	var leader *metapb.Peer
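A side note on the mock key format (a standalone sketch, not part of the commit): %20d pads the region ID to a width of 20, so consecutive region boundaries used to differ only in the final digit. The appended literal '0' gives every boundary key a trailing byte, which the split tests can exploit to construct keys that sort strictly inside a region — that motivation is presumed; the formatting itself is easy to verify:

package main

import "fmt"

func main() {
	regionID := uint64(5)
	// Old scheme: EndKey of region N is exactly StartKey of region N+1.
	fmt.Printf("%q\n", fmt.Sprintf("%20d", regionID))   // 19 spaces, then "5"
	fmt.Printf("%q\n", fmt.Sprintf("%20d", regionID+1)) // 19 spaces, then "6"
	// New scheme: a literal '0' trails the padded ID, so a key like the
	// padded "5" followed by "1" sorts strictly between the two boundaries.
	fmt.Printf("%q\n", fmt.Sprintf("%20d0", regionID))   // 19 spaces, then "50"
	fmt.Printf("%q\n", fmt.Sprintf("%20d0", regionID+1)) // 19 spaces, then "60"
}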
5 changes: 5 additions & 0 deletions pkg/schedule/config/config.go
@@ -8,6 +8,7 @@ import (
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/tikv/pd/pkg/core/constant"
 	"github.com/tikv/pd/pkg/core/storelimit"
+	"github.com/tikv/pd/pkg/storage/endpoint"
 )

 // RejectLeader is the label property type that suggests a store should not
@@ -30,6 +31,10 @@ func IsSchedulerRegistered(name string) bool {
 // Config is the interface that wraps the Config related methods.
 type Config interface {
 	IsSchedulingHalted() bool
+	IsSchedulerDisabled(string) bool
+	AddSchedulerCfg(string, []string)
+	RemoveSchedulerCfg(string)
+	Persist(endpoint.ConfigStorage) error

 	GetReplicaScheduleLimit() uint64
 	GetRegionScheduleLimit() uint64
8 changes: 4 additions & 4 deletions pkg/schedule/coordinator.go
@@ -86,8 +86,8 @@ type Coordinator struct {
 // NewCoordinator creates a new Coordinator.
 func NewCoordinator(ctx context.Context, cluster sche.ClusterInformer, hbStreams *hbstream.HeartbeatStreams) *Coordinator {
 	ctx, cancel := context.WithCancel(ctx)
-	opController := operator.NewController(ctx, cluster.GetBasicCluster(), cluster.GetPersistOptions(), hbStreams)
-	schedulers := schedulers.NewController(ctx, cluster, opController)
+	opController := operator.NewController(ctx, cluster.GetBasicCluster(), cluster.GetOpts(), hbStreams)
+	schedulers := schedulers.NewController(ctx, cluster, cluster.GetStorage(), opController)
 	c := &Coordinator{
 		ctx:    ctx,
 		cancel: cancel,
@@ -101,7 +101,7 @@ func NewCoordinator(ctx context.Context, cluster sche.ClusterInformer, hbStreams
 		hbStreams:       hbStreams,
 		pluginInterface: NewPluginInterface(),
 	}
-	c.diagnosticManager = diagnostic.NewManager(schedulers, cluster.GetPersistOptions())
+	c.diagnosticManager = diagnostic.NewManager(schedulers, cluster.GetOpts())
 	return c
 }

@@ -316,7 +316,7 @@ func (c *Coordinator) RunUntilStop() {
 	c.Run()
 	<-c.ctx.Done()
 	log.Info("Coordinator is stopping")
-	c.GetSchedulersController().GetWaitGroup().Wait()
+	c.GetSchedulersController().Wait()
 	c.wg.Wait()
 	log.Info("Coordinator has been stopped")
 }
16 changes: 4 additions & 12 deletions pkg/schedule/diagnostic/diagnostic_manager.go
@@ -18,18 +18,18 @@ import (
 	"time"

 	"github.com/tikv/pd/pkg/errs"
+	"github.com/tikv/pd/pkg/schedule/config"
 	"github.com/tikv/pd/pkg/schedule/schedulers"
-	"github.com/tikv/pd/server/config"
 )

 // Manager is used to manage the diagnostic result of schedulers for now.
 type Manager struct {
-	config              *config.PersistOptions
+	config              config.Config
 	schedulerController *schedulers.Controller
 }

 // NewManager creates a new Manager.
-func NewManager(schedulerController *schedulers.Controller, config *config.PersistOptions) *Manager {
+func NewManager(schedulerController *schedulers.Controller, config config.Config) *Manager {
 	return &Manager{
 		config:              config,
 		schedulerController: schedulerController,
@@ -48,15 +48,7 @@ func (d *Manager) GetDiagnosticResult(name string) (*schedulers.DiagnosticResult
 		res := &schedulers.DiagnosticResult{Name: name, Timestamp: ts, Status: schedulers.Disabled}
 		return res, nil
 	}
-	var isDisabled bool
-	t := scheduler.Scheduler.GetType()
-	scheduleConfig := d.config.GetScheduleConfig()
-	for _, s := range scheduleConfig.Schedulers {
-		if t == s.Type {
-			isDisabled = s.Disable
-			break
-		}
-	}
+	isDisabled := d.config.IsSchedulerDisabled(scheduler.Scheduler.GetType())
 	if isDisabled {
 		ts := uint64(time.Now().Unix())
 		res := &schedulers.DiagnosticResult{Name: name, Timestamp: ts, Status: schedulers.Disabled}
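The nine-line loop deleted above now sits behind the schedule/config Config interface added in this commit. A minimal sketch of what an IsSchedulerDisabled implementation can look like, using hypothetical stand-in types (the real method is provided by the server-side config that implements config.Config):

package main

import "fmt"

// schedulerConfig and scheduleConfig are illustrative stand-ins, not PD types.
type schedulerConfig struct {
	Type    string
	Disable bool
}

type scheduleConfig struct {
	Schedulers []schedulerConfig
}

// IsSchedulerDisabled mirrors the loop this commit removes from
// GetDiagnosticResult: a scheduler counts as disabled when its type
// appears in the schedulers config with Disable set.
func (c *scheduleConfig) IsSchedulerDisabled(typ string) bool {
	for _, s := range c.Schedulers {
		if s.Type == typ {
			return s.Disable
		}
	}
	return false
}

func main() {
	cfg := &scheduleConfig{Schedulers: []schedulerConfig{{Type: "balance-region", Disable: true}}}
	fmt.Println(cfg.IsSchedulerDisabled("balance-region")) // true
	fmt.Println(cfg.IsSchedulerDisabled("hot-region"))     // false
}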
111 changes: 82 additions & 29 deletions pkg/schedule/schedulers/hot_region.go
@@ -58,10 +58,12 @@
 	hotSchedulerSnapshotSenderLimitCounter = schedulerCounter.WithLabelValues(HotRegionName, "snapshot_sender_limit")

 	// counter related with the split region
-	hotSchedulerNotFoundSplitKeysCounter       = schedulerCounter.WithLabelValues(HotRegionName, "not_found_split_keys")
-	hotSchedulerRegionBucketsNotHotCounter     = schedulerCounter.WithLabelValues(HotRegionName, "region_buckets_not_hot")
-	hotSchedulerSplitSuccessCounter            = schedulerCounter.WithLabelValues(HotRegionName, "split_success")
-	hotSchedulerNeedSplitBeforeScheduleCounter = schedulerCounter.WithLabelValues(HotRegionName, "need_split_before_move_peer")
+	hotSchedulerNotFoundSplitKeysCounter          = schedulerCounter.WithLabelValues(HotRegionName, "not_found_split_keys")
+	hotSchedulerRegionBucketsNotHotCounter        = schedulerCounter.WithLabelValues(HotRegionName, "region_buckets_not_hot")
+	hotSchedulerRegionBucketsSingleHotSpotCounter = schedulerCounter.WithLabelValues(HotRegionName, "region_buckets_single_hot_spot")
+	hotSchedulerSplitSuccessCounter               = schedulerCounter.WithLabelValues(HotRegionName, "split_success")
+	hotSchedulerNeedSplitBeforeScheduleCounter    = schedulerCounter.WithLabelValues(HotRegionName, "need_split_before_move_peer")
+	hotSchedulerRegionTooHotNeedSplitCounter      = schedulerCounter.WithLabelValues(HotRegionName, "region_is_too_hot_need_split")

 	hotSchedulerMoveLeaderCounter = schedulerCounter.WithLabelValues(HotRegionName, moveLeader.String())
 	hotSchedulerMovePeerCounter   = schedulerCounter.WithLabelValues(HotRegionName, movePeer.String())
@@ -159,21 +161,23 @@ func (h *baseHotScheduler) prepareForBalance(rw statistics.RWType, cluster sche.
 // It makes each dim rate or count become `weight` times to the origin value.
 func (h *baseHotScheduler) summaryPendingInfluence() {
 	for id, p := range h.regionPendings {
-		from := h.stInfos[p.from]
-		to := h.stInfos[p.to]
-		maxZombieDur := p.maxZombieDuration
-		weight, needGC := calcPendingInfluence(p.op, maxZombieDur)
-
-		if needGC {
-			delete(h.regionPendings, id)
-			continue
-		}
+		for _, from := range p.froms {
+			from := h.stInfos[from]
+			to := h.stInfos[p.to]
+			maxZombieDur := p.maxZombieDuration
+			weight, needGC := calcPendingInfluence(p.op, maxZombieDur)
+
+			if needGC {
+				delete(h.regionPendings, id)
+				continue
+			}

-		if from != nil && weight > 0 {
-			from.AddInfluence(&p.origin, -weight)
-		}
-		if to != nil && weight > 0 {
-			to.AddInfluence(&p.origin, weight)
+			if from != nil && weight > 0 {
+				from.AddInfluence(&p.origin, -weight)
+			}
+			if to != nil && weight > 0 {
+				to.AddInfluence(&p.origin, weight)
+			}
 		}
 	}
 	for storeID, info := range h.stInfos {
@@ -214,7 +218,8 @@ var (
 	// as it implies that this dimension is sufficiently uniform.
 	stddevThreshold = 0.1

-	splitBucket = "split-hot-region"
+	splitBucket          = "split-hot-region"
+	splitProgressiveRank = int64(-5)
 )

 type hotScheduler struct {
@@ -294,7 +299,7 @@ func (h *hotScheduler) dispatch(typ statistics.RWType, cluster sche.ScheduleClus
 	return nil
 }

-func (h *hotScheduler) tryAddPendingInfluence(op *operator.Operator, srcStore, dstStore uint64, infl statistics.Influence, maxZombieDur time.Duration) bool {
+func (h *hotScheduler) tryAddPendingInfluence(op *operator.Operator, srcStore []uint64, dstStore uint64, infl statistics.Influence, maxZombieDur time.Duration) bool {
 	regionID := op.RegionID()
 	_, ok := h.regionPendings[regionID]
 	if ok {
@@ -651,6 +656,7 @@ func (bs *balanceSolver) solve() []*operator.Operator {
 		}
 	}
 	snapshotFilter := filter.NewSnapshotSendFilter(bs.GetStores(), constant.Medium)
+	splitThresholds := bs.sche.conf.getSplitThresholds()
 	for _, srcStore := range bs.filterSrcStores() {
 		bs.cur.srcStore = srcStore
 		srcStoreID := srcStore.GetID()
@@ -664,6 +670,16 @@
 			}
 		}
 		bs.cur.mainPeerStat = mainPeerStat
+		if tooHotNeedSplit(srcStore, mainPeerStat, splitThresholds) && bs.GetStoreConfig().IsEnableRegionBucket() {
+			hotSchedulerRegionTooHotNeedSplitCounter.Inc()
+			ops := bs.createSplitOperator([]*core.RegionInfo{bs.cur.region}, true /* too hot, needs split */)
+			if len(ops) > 0 {
+				bs.ops = ops
+				bs.cur.calcPeersRate(bs.firstPriority, bs.secondPriority)
+				bs.best = bs.cur
+				return ops
+			}
+		}

 		for _, dstStore := range bs.filterDstStores() {
 			bs.cur.dstStore = dstStore
@@ -723,24 +739,38 @@ func (bs *balanceSolver) tryAddPendingInfluence() bool {
 	if bs.best == nil || len(bs.ops) == 0 {
 		return false
 	}
-	if bs.best.srcStore.IsTiFlash() != bs.best.dstStore.IsTiFlash() {
+	isSplit := bs.ops[0].Kind() == operator.OpSplit
+	if !isSplit && bs.best.srcStore.IsTiFlash() != bs.best.dstStore.IsTiFlash() {
 		hotSchedulerNotSameEngineCounter.Inc()
 		return false
 	}
 	maxZombieDur := bs.calcMaxZombieDur()

 	// TODO: Process operators atomically.
 	// main peer
-	srcStoreID := bs.best.srcStore.GetID()
-	dstStoreID := bs.best.dstStore.GetID()
+
+	srcStoreIDs := make([]uint64, 0)
+	dstStoreID := uint64(0)
+	if isSplit {
+		region := bs.GetRegion(bs.ops[0].RegionID())
+		for id := range region.GetStoreIDs() {
+			srcStoreIDs = append(srcStoreIDs, id)
+		}
+	} else {
+		srcStoreIDs = append(srcStoreIDs, bs.best.srcStore.GetID())
+		dstStoreID = bs.best.dstStore.GetID()
+	}
 	infl := bs.collectPendingInfluence(bs.best.mainPeerStat)
-	if !bs.sche.tryAddPendingInfluence(bs.ops[0], srcStoreID, dstStoreID, infl, maxZombieDur) {
+	if !bs.sche.tryAddPendingInfluence(bs.ops[0], srcStoreIDs, dstStoreID, infl, maxZombieDur) {
 		return false
 	}
+	if isSplit {
+		return true
+	}
 	// revert peers
 	if bs.best.revertPeerStat != nil && len(bs.ops) > 1 {
 		infl := bs.collectPendingInfluence(bs.best.revertPeerStat)
-		if !bs.sche.tryAddPendingInfluence(bs.ops[1], dstStoreID, srcStoreID, infl, maxZombieDur) {
+		if !bs.sche.tryAddPendingInfluence(bs.ops[1], srcStoreIDs, dstStoreID, infl, maxZombieDur) {
 			return false
 		}
 	}
@@ -1243,7 +1273,7 @@ func (bs *balanceSolver) getMinRate(dim int) float64 {

 // betterThan checks if `bs.cur` is a better solution than `old`.
 func (bs *balanceSolver) betterThanV1(old *solution) bool {
-	if old == nil {
+	if old == nil || bs.cur.progressiveRank <= splitProgressiveRank {
 		return true
 	}
 	if bs.cur.progressiveRank != old.progressiveRank {
@@ -1435,7 +1465,7 @@ func (bs *balanceSolver) buildOperators() (ops []*operator.Operator) {
 		}
 	}
 	if len(splitRegions) > 0 {
-		return bs.createSplitOperator(splitRegions)
+		return bs.createSplitOperator(splitRegions, false /* region is too big, needs split before moving */)
 	}

 	srcStoreID := bs.cur.srcStore.GetID()
@@ -1475,7 +1505,8 @@ func (bs *balanceSolver) buildOperators() (ops []*operator.Operator) {
 }

 // createSplitOperator creates split operators for the given regions.
-func (bs *balanceSolver) createSplitOperator(regions []*core.RegionInfo) []*operator.Operator {
+// isTooHot true indicates that the region is too hot and needs to be split.
+func (bs *balanceSolver) createSplitOperator(regions []*core.RegionInfo, isTooHot bool) []*operator.Operator {
 	if len(regions) == 0 {
 		return nil
 	}
@@ -1492,6 +1523,11 @@ func (bs *balanceSolver) createSplitOperator(regions []*core.RegionInfo) []*oper
 			hotSchedulerRegionBucketsNotHotCounter.Inc()
 			return
 		}
+		// Skip if only one hot bucket exists on this region.
+		if len(stats) <= 1 && isTooHot {
+			hotSchedulerRegionBucketsSingleHotSpotCounter.Inc()
+			return
+		}
 		startKey, endKey := region.GetStartKey(), region.GetEndKey()
 		splitKey := make([][]byte, 0)
 		for _, stat := range stats {
@@ -1505,7 +1541,13 @@ func (bs *balanceSolver) createSplitOperator(regions []*core.RegionInfo) []*oper
 			// Otherwise, we should append the current start key and end key.
 			// E.g. [a, b), [c, d) -> [a, b), [c, d) split keys is [a,b,c,d]
 			if bytes.Equal(stat.StartKey, splitKey[len(splitKey)-1]) {
-				splitKey[len(splitKey)-1] = stat.EndKey
+				// If the region is too hot, we should split all the buckets to balance the load.
+				// Otherwise, we should only split the buckets that are too hot.
+				if isTooHot {
+					splitKey = append(splitKey, stat.EndKey)
+				} else {
+					splitKey[len(splitKey)-1] = stat.EndKey
+				}
 			} else {
 				splitKey = append(splitKey, stat.StartKey, stat.EndKey)
 			}
@@ -1529,6 +1571,10 @@ func (bs *balanceSolver) createSplitOperator(regions []*core.RegionInfo) []*oper
 	for _, region := range regions {
 		createFunc(region)
 	}
+	// The split-bucket operator has the highest priority.
+	if len(operators) > 0 {
+		bs.cur.progressiveRank = splitProgressiveRank
+	}
 	return operators
 }

@@ -1789,3 +1835,10 @@ func dimToString(dim int) string {
 func prioritiesToDim(priorities []string) (firstPriority int, secondPriority int) {
 	return stringToDim(priorities[0]), stringToDim(priorities[1])
 }
+
+// tooHotNeedSplit returns true if any dim of the hot region is greater than the store threshold.
+func tooHotNeedSplit(store *statistics.StoreLoadDetail, region *statistics.HotPeerStat, splitThresholds float64) bool {
+	return slice.AnyOf(store.LoadPred.Current.Loads, func(i int) bool {
+		return region.Loads[i] > store.LoadPred.Current.Loads[i]*splitThresholds
+	})
+}
13 changes: 13 additions & 0 deletions pkg/schedule/schedulers/hot_region_config.go
@@ -75,6 +75,7 @@ func initHotRegionScheduleConfig() *hotRegionSchedulerConfig {
 		EnableForTiFlash:   true,
 		RankFormulaVersion: "v2",
 		ForbidRWType:       "none",
+		SplitThresholds:    0.2,
 	}
 	cfg.applyPrioritiesConfig(defaultPrioritiesConfig)
 	return cfg
@@ -102,6 +103,7 @@ func (conf *hotRegionSchedulerConfig) getValidConf() *hotRegionSchedulerConfig {
 		EnableForTiFlash:   conf.EnableForTiFlash,
 		RankFormulaVersion: conf.getRankFormulaVersionLocked(),
 		ForbidRWType:       conf.getForbidRWTypeLocked(),
+		SplitThresholds:    conf.SplitThresholds,
 	}
 }

@@ -143,6 +145,8 @@ type hotRegionSchedulerConfig struct {
 	RankFormulaVersion string `json:"rank-formula-version"`
 	// forbid read or write scheduler, only for test
 	ForbidRWType string `json:"forbid-rw-type,omitempty"`
+	// SplitThresholds is the threshold to split a hot region if the flow of one hot region exceeds it.
+	SplitThresholds float64 `json:"split-thresholds"`
 }

 func (conf *hotRegionSchedulerConfig) EncodeConfig() ([]byte, error) {
@@ -316,6 +320,12 @@ func (conf *hotRegionSchedulerConfig) IsForbidRWType(rw statistics.RWType) bool
 	return rw.String() == conf.ForbidRWType
 }

+func (conf *hotRegionSchedulerConfig) getSplitThresholds() float64 {
+	conf.RLock()
+	defer conf.RUnlock()
+	return conf.SplitThresholds
+}
+
 func (conf *hotRegionSchedulerConfig) getForbidRWTypeLocked() string {
 	switch conf.ForbidRWType {
 	case statistics.Read.String(), statistics.Write.String():
@@ -377,6 +387,9 @@ func (conf *hotRegionSchedulerConfig) valid() error {
 		conf.ForbidRWType != "none" && conf.ForbidRWType != "" {
 		return errs.ErrSchedulerConfig.FastGenByArgs("invalid forbid-rw-type")
 	}
+	if conf.SplitThresholds < 0.01 || conf.SplitThresholds > 1.0 {
+		return errs.ErrSchedulerConfig.FastGenByArgs("invalid split-thresholds, should be in range [0.01, 1.0]")
+	}
 	return nil
 }
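For completeness, the new bound check in isolation (a hypothetical standalone restatement; the real method returns errs.ErrSchedulerConfig): values outside [0.01, 1.0] are rejected, and the default of 0.2 passes.

package main

import (
	"errors"
	"fmt"
)

// validateSplitThresholds restates the rule added to
// (*hotRegionSchedulerConfig).valid() as a free function.
func validateSplitThresholds(v float64) error {
	if v < 0.01 || v > 1.0 {
		return errors.New("invalid split-thresholds, should be in range [0.01, 1.0]")
	}
	return nil
}

func main() {
	fmt.Println(validateSplitThresholds(0.2)) // <nil>: the default is valid
	fmt.Println(validateSplitThresholds(0))   // rejected: below the 0.01 floor
}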
(The remaining 9 changed files are not shown in this view.)
