From 2baee837932694d194e16d4618664c0059a8e073 Mon Sep 17 00:00:00 2001 From: Hu# Date: Fri, 26 Jul 2024 14:22:35 +0800 Subject: [PATCH 1/8] tools/simulator: Make simulator work with large scale cluster (#8269) close tikv/pd#5683, ref tikv/pd#8135 Signed-off-by: husharp Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/core/region.go | 10 -- tools/pd-simulator/main.go | 6 + tools/pd-simulator/simulator/cases/cases.go | 1 + .../simulator/cases/stable_env.go | 66 ++++++++ tools/pd-simulator/simulator/config/config.go | 3 + tools/pd-simulator/simulator/drive.go | 156 ++++++++++++++++-- tools/pd-simulator/simulator/node.go | 48 ++---- tools/pd-simulator/simulator/raft.go | 5 - 8 files changed, 232 insertions(+), 63 deletions(-) create mode 100644 tools/pd-simulator/simulator/cases/stable_env.go diff --git a/pkg/core/region.go b/pkg/core/region.go index 4f7af8cc333..244fef836f8 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -2264,13 +2264,3 @@ func NewTestRegionInfo(regionID, storeID uint64, start, end []byte, opts ...Regi } return NewRegionInfo(metaRegion, leader, opts...) } - -// TraverseRegions executes a function on all regions. -// ONLY for simulator now and only for READ. -func (r *RegionsInfo) TraverseRegions(lockedFunc func(*RegionInfo)) { - r.t.RLock() - defer r.t.RUnlock() - for _, item := range r.regions { - lockedFunc(item.RegionInfo) - } -} diff --git a/tools/pd-simulator/main.go b/tools/pd-simulator/main.go index 63ba7f9134d..7fe1d3ecedd 100644 --- a/tools/pd-simulator/main.go +++ b/tools/pd-simulator/main.go @@ -148,6 +148,7 @@ func simStart(pdAddr, statusAddress string, simCase string, simConfig *sc.SimCon } tickInterval := simConfig.SimTickInterval.Duration + ctx, cancel := context.WithCancel(context.Background()) tick := time.NewTicker(tickInterval) defer tick.Stop() sc := make(chan os.Signal, 1) @@ -161,6 +162,10 @@ func simStart(pdAddr, statusAddress string, simCase string, simConfig *sc.SimCon simResult := "FAIL" + go driver.StoresHeartbeat(ctx) + go driver.RegionsHeartbeat(ctx) + go driver.StepRegions(ctx) + EXIT: for { select { @@ -175,6 +180,7 @@ EXIT: } } + cancel() driver.Stop() if len(clean) != 0 && clean[0] != nil { clean[0]() diff --git a/tools/pd-simulator/simulator/cases/cases.go b/tools/pd-simulator/simulator/cases/cases.go index 0ddd66608b1..026e095342b 100644 --- a/tools/pd-simulator/simulator/cases/cases.go +++ b/tools/pd-simulator/simulator/cases/cases.go @@ -101,6 +101,7 @@ var CaseMap = map[string]func(*config.SimConfig) *Case{ "diagnose-label-not-match1": newLabelNotMatch1, "diagnose-label-isolation1": newLabelIsolation1, "diagnose-label-isolation2": newLabelIsolation2, + "stable": newStableEnv, } // NewCase creates a new case. diff --git a/tools/pd-simulator/simulator/cases/stable_env.go b/tools/pd-simulator/simulator/cases/stable_env.go new file mode 100644 index 00000000000..8a015c90ca8 --- /dev/null +++ b/tools/pd-simulator/simulator/cases/stable_env.go @@ -0,0 +1,66 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package cases + +import ( + "github.com/docker/go-units" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/tikv/pd/pkg/core" + sc "github.com/tikv/pd/tools/pd-simulator/simulator/config" + "github.com/tikv/pd/tools/pd-simulator/simulator/info" + "github.com/tikv/pd/tools/pd-simulator/simulator/simutil" +) + +// newStableEnv provides a stable environment for test. +func newStableEnv(config *sc.SimConfig) *Case { + var simCase Case + + totalStore := config.TotalStore + totalRegion := config.TotalRegion + allStores := make(map[uint64]struct{}, totalStore) + arrStoresID := make([]uint64, 0, totalStore) + replica := int(config.ServerConfig.Replication.MaxReplicas) + for i := 0; i < totalStore; i++ { + id := simutil.IDAllocator.NextID() + simCase.Stores = append(simCase.Stores, &Store{ + ID: id, + Status: metapb.StoreState_Up, + }) + allStores[id] = struct{}{} + arrStoresID = append(arrStoresID, id) + } + + for i := 0; i < totalRegion; i++ { + peers := make([]*metapb.Peer, 0, replica) + for j := 0; j < replica; j++ { + peers = append(peers, &metapb.Peer{ + Id: simutil.IDAllocator.NextID(), + StoreId: arrStoresID[(i+j)%totalStore], + }) + } + simCase.Regions = append(simCase.Regions, Region{ + ID: simutil.IDAllocator.NextID(), + Peers: peers, + Leader: peers[0], + Size: 96 * units.MiB, + Keys: 960000, + }) + } + + simCase.Checker = func(_ []*metapb.Store, _ *core.RegionsInfo, _ []info.StoreStats) bool { + return false + } + return &simCase +} diff --git a/tools/pd-simulator/simulator/config/config.go b/tools/pd-simulator/simulator/config/config.go index 6598cf35c0f..030655bd3dc 100644 --- a/tools/pd-simulator/simulator/config/config.go +++ b/tools/pd-simulator/simulator/config/config.go @@ -36,6 +36,7 @@ const ( defaultTotalStore = 3 defaultTotalRegion = 1000 defaultEnableTransferRegionCounter = false + defaultHibernatePercent = 0 // store defaultStoreIOMBPerSecond = 40 defaultStoreHeartbeat = 10 * time.Second @@ -62,6 +63,7 @@ type SimConfig struct { TotalRegion int `toml:"total-region"` EnableTransferRegionCounter bool `toml:"enable-transfer-region-counter"` SimTickInterval typeutil.Duration `toml:"sim-tick-interval"` + HibernatePercent int `toml:"hibernate-percent"` // store StoreIOMBPerSecond int64 `toml:"store-io-per-second"` StoreVersion string `toml:"store-version"` @@ -107,6 +109,7 @@ func (sc *SimConfig) Adjust(meta *toml.MetaData) error { configutil.AdjustDuration(&sc.SimTickInterval, defaultSimTickInterval) configutil.AdjustInt(&sc.TotalStore, defaultTotalStore) configutil.AdjustInt(&sc.TotalRegion, defaultTotalRegion) + configutil.AdjustInt(&sc.HibernatePercent, defaultHibernatePercent) configutil.AdjustBool(&sc.EnableTransferRegionCounter, defaultEnableTransferRegionCounter) configutil.AdjustInt64(&sc.StoreIOMBPerSecond, defaultStoreIOMBPerSecond) configutil.AdjustString(&sc.StoreVersion, versioninfo.PDReleaseVersion) diff --git a/tools/pd-simulator/simulator/drive.go b/tools/pd-simulator/simulator/drive.go index 8c511b5ac5c..87b779559ca 100644 --- a/tools/pd-simulator/simulator/drive.go +++ b/tools/pd-simulator/simulator/drive.go @@ -16,6 +16,7 @@ package simulator import ( "context" + "math/rand" "net/http" "net/http/pprof" "path" @@ -47,12 +48,18 @@ type Driver struct { pdAddr string statusAddress string simCase *cases.Case - tickCount int64 eventRunner *EventRunner raftEngine *RaftEngine conn *Connection simConfig *config.SimConfig pdConfig *config.PDConfig + + tick 
struct { + count int64 + region chan int64 + store chan int64 + stepRegion chan int64 + } } // NewDriver returns a driver. @@ -64,17 +71,22 @@ func NewDriver(pdAddr, statusAddress, caseName string, simConfig *config.SimConf pdConfig := &config.PDConfig{} pdConfig.PlacementRules = simCase.Rules pdConfig.LocationLabels = simCase.Labels - return &Driver{ + driver := Driver{ pdAddr: pdAddr, statusAddress: statusAddress, simCase: simCase, simConfig: simConfig, pdConfig: pdConfig, - }, nil + } + driver.tick.stepRegion = make(chan int64, 1) + driver.tick.region = make(chan int64, 1) + driver.tick.store = make(chan int64, 1) + return &driver, nil } // Prepare initializes cluster information, bootstraps cluster and starts nodes. func (d *Driver) Prepare() error { + simutil.Logger.Info("prepare cluster") conn, err := NewConnection(d.simCase, d.simConfig) if err != nil { return err @@ -166,15 +178,136 @@ func (d *Driver) updateNodesClient() error { // Tick invokes nodes' Tick. func (d *Driver) Tick() { - d.tickCount++ - d.raftEngine.stepRegions() - d.eventRunner.Tick(d.tickCount) + d.tick.count++ + curTick := d.tick.count + go func() { + d.tick.stepRegion <- curTick + }() + go func() { + d.tick.region <- curTick + }() + go func() { + d.tick.store <- curTick + }() +} + +func (d *Driver) StepRegions(ctx context.Context) { + for { + select { + case tick := <-d.tick.stepRegion: + d.raftEngine.stepRegions() + d.eventRunner.Tick(tick) + for _, n := range d.conn.Nodes { + n.reportRegionChange() + d.wg.Add(1) + go n.Tick(&d.wg) + } + d.wg.Wait() + case <-ctx.Done(): + return + } + } +} + +func (d *Driver) StoresHeartbeat(ctx context.Context) { + config := d.raftEngine.storeConfig + storeInterval := uint64(config.RaftStore.StoreHeartBeatInterval.Duration / config.SimTickInterval.Duration) + var wg sync.WaitGroup + for { + select { + case tick := <-d.tick.store: + if uint64(tick)%storeInterval == 0 { + for _, n := range d.conn.Nodes { + wg.Add(1) + go n.storeHeartBeat(&wg) + } + wg.Wait() + } + case <-ctx.Done(): + return + } + } +} + +func (d *Driver) RegionsHeartbeat(ctx context.Context) { + // ensure only wait for the first time heartbeat done + firstReport := true + config := d.raftEngine.storeConfig + regionInterval := uint64(config.RaftStore.RegionHeartBeatInterval.Duration / config.SimTickInterval.Duration) + nodesChannel := make(map[uint64]chan *core.RegionInfo, len(d.conn.Nodes)) for _, n := range d.conn.Nodes { - n.reportRegionChange() - d.wg.Add(1) - go n.Tick(&d.wg) + nodesChannel[n.Store.GetId()] = make(chan *core.RegionInfo, d.simConfig.TotalRegion) + go func(storeID uint64, ch chan *core.RegionInfo) { + for { + select { + case region := <-ch: + d.conn.Nodes[storeID].regionHeartBeat(region) + case <-ctx.Done(): + close(ch) + return + } + } + }(n.Store.GetId(), nodesChannel[n.Store.GetId()]) + } + + for { + select { + case tick := <-d.tick.region: + if uint64(tick)%regionInterval == 0 { + regions := d.raftEngine.GetRegions() + healthyNodes := make(map[uint64]bool) + for _, n := range d.conn.Nodes { + if n.GetNodeState() != metapb.NodeState_Preparing && n.GetNodeState() != metapb.NodeState_Serving { + healthyNodes[n.Store.GetId()] = false + } else { + healthyNodes[n.Store.GetId()] = true + } + } + report := 0 + for _, region := range regions { + hibernatePercent := d.simConfig.HibernatePercent + // using rand(0,100) to meet hibernatePercent + if !firstReport && rand.Intn(100) < hibernatePercent { + continue + } + + if region.GetLeader() != nil { + storeID := region.GetLeader().GetStoreId() + if 
healthy, ok := healthyNodes[storeID]; !ok || !healthy { + continue + } + nodesChannel[storeID] <- region.Clone() + report++ + } + } + + // Only set HaltSchedule to false when the leader count is 80% of the total region count. + // using firstReport to avoid the haltSchedule set to true manually. + if HaltSchedule.Load() && firstReport { + storeInterval := uint64(config.RaftStore.StoreHeartBeatInterval.Duration / config.SimTickInterval.Duration) + ticker := time.NewTicker(time.Duration(storeInterval)) + for range ticker.C { + // need to wait for first time heartbeat done + stores, _ := PDHTTPClient.GetStores(ctx) + var leaderCount int64 + for _, store := range stores.Stores { + leaderCount += store.Status.LeaderCount + } + // Add halt schedule check to avoid the situation that the leader count is always less than 80%. + if leaderCount > int64(float64(d.simConfig.TotalRegion)*0.8) || !HaltSchedule.Load() { + ChooseToHaltPDSchedule(false) + firstReport = false + ticker.Stop() + simutil.Logger.Info("first region heartbeat done", zap.Int64("leaderCount", leaderCount), zap.Int("checkRegions", len(regions))) + break + } + } + } + } + case <-ctx.Done(): + return + } } - d.wg.Wait() } var HaltSchedule atomic.Bool @@ -197,6 +330,7 @@ func (d *Driver) Check() bool { // Start starts all nodes. func (d *Driver) Start() error { + simutil.Logger.Info("init nodes") if err := d.updateNodesClient(); err != nil { return err } @@ -221,7 +355,7 @@ func (d *Driver) Stop() { // TickCount returns the simulation's tick count. func (d *Driver) TickCount() int64 { - return d.tickCount + return d.tick.count } // GetBootstrapInfo returns a valid bootstrap store and region. diff --git a/tools/pd-simulator/simulator/node.go b/tools/pd-simulator/simulator/node.go index 2059107227e..59b0d393c47 100644 --- a/tools/pd-simulator/simulator/node.go +++ b/tools/pd-simulator/simulator/node.go @@ -129,7 +129,6 @@ func (n *Node) Tick(wg *sync.WaitGroup) { if n.GetNodeState() != metapb.NodeState_Preparing && n.GetNodeState() != metapb.NodeState_Serving { return } - n.stepHeartBeat() n.stepCompaction() n.stepTask() n.tick++ @@ -154,29 +153,14 @@ func (n *Node) stepTask() { } } -var schedulerCheck sync.Once - -func (n *Node) stepHeartBeat() { - config := n.raftEngine.storeConfig - - period := uint64(config.RaftStore.StoreHeartBeatInterval.Duration / config.SimTickInterval.Duration) - if n.tick%period == 0 { - n.storeHeartBeat() - } - period = uint64(config.RaftStore.RegionHeartBeatInterval.Duration / config.SimTickInterval.Duration) - if n.tick%period == 0 { - n.regionHeartBeat() - schedulerCheck.Do(func() { ChooseToHaltPDSchedule(false) }) - } -} - func (n *Node) stepCompaction() { if n.tick%compactionDelayPeriod == 0 { n.compaction() } } -func (n *Node) storeHeartBeat() { +func (n *Node) storeHeartBeat(wg *sync.WaitGroup) { + defer wg.Done() if n.GetNodeState() != metapb.NodeState_Preparing && n.GetNodeState() != metapb.NodeState_Serving { return } @@ -205,26 +189,16 @@ func (n *Node) compaction() { n.stats.ToCompactionSize = 0 } -func (n *Node) regionHeartBeat() { - if n.GetNodeState() != metapb.NodeState_Preparing && n.GetNodeState() != metapb.NodeState_Serving { - return +func (n *Node) regionHeartBeat(region *core.RegionInfo) { + ctx, cancel := context.WithTimeout(n.ctx, pdTimeout) + err := n.client.RegionHeartbeat(ctx, region) + if err != nil { + simutil.Logger.Info("report region heartbeat error", + zap.Uint64("node-id", n.Id), + zap.Uint64("region-id", region.GetID()), + zap.Error(err)) } - 
n.raftEngine.TraverseRegions(func(region *core.RegionInfo) { - if region.GetLeader() != nil && region.GetLeader().GetStoreId() == n.Id { - ctx, cancel := context.WithTimeout(n.ctx, pdTimeout) - if region == nil { - simutil.Logger.Fatal("region not found") - } - err := n.client.RegionHeartbeat(ctx, region.Clone()) - if err != nil { - simutil.Logger.Info("report region heartbeat error", - zap.Uint64("node-id", n.Id), - zap.Uint64("region-id", region.GetID()), - zap.Error(err)) - } - cancel() - } - }) + cancel() } func (n *Node) reportRegionChange() { diff --git a/tools/pd-simulator/simulator/raft.go b/tools/pd-simulator/simulator/raft.go index 727cc6ab805..9a219d32f9f 100644 --- a/tools/pd-simulator/simulator/raft.go +++ b/tools/pd-simulator/simulator/raft.go @@ -255,11 +255,6 @@ func (r *RaftEngine) ResetRegionChange(storeID uint64, regionID uint64) { } } -// TraverseRegions executes a function on all regions, and function need to be self-locked. -func (r *RaftEngine) TraverseRegions(lockedFunc func(*core.RegionInfo)) { - r.regionsInfo.TraverseRegions(lockedFunc) -} - // GetRegions gets all RegionInfo from regionMap func (r *RaftEngine) GetRegions() []*core.RegionInfo { r.RLock() From 024656bc74c22afe9fe7ab8fed8bee849e47f013 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Fri, 26 Jul 2024 14:42:06 +0800 Subject: [PATCH 2/8] scheduler: add more hot scheduler comments and replace negative rank (#8345) ref tikv/pd#5691 Signed-off-by: lhy1024 Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/schedule/schedulers/hot_region.go | 11 +-- pkg/schedule/schedulers/hot_region_config.go | 1 - pkg/schedule/schedulers/hot_region_rank_v1.go | 68 ++++++++++-------- pkg/schedule/schedulers/hot_region_rank_v2.go | 72 +++++++++---------- .../schedulers/hot_region_rank_v2_test.go | 10 +-- pkg/statistics/store_hot_peers_infos.go | 52 ++++++-------- 6 files changed, 109 insertions(+), 105 deletions(-) diff --git a/pkg/schedule/schedulers/hot_region.go b/pkg/schedule/schedulers/hot_region.go index 6f671eac72e..f79d8fac760 100644 --- a/pkg/schedule/schedulers/hot_region.go +++ b/pkg/schedule/schedulers/hot_region.go @@ -50,7 +50,7 @@ const ( HotRegionType = "hot-region" splitHotReadBuckets = "split-hot-read-region" splitHotWriteBuckets = "split-hot-write-region" - splitProgressiveRank = int64(-5) + splitProgressiveRank = 5 minHotScheduleInterval = time.Second maxHotScheduleInterval = 20 * time.Second defaultPendingAmpFactor = 2.0 @@ -127,6 +127,7 @@ func (h *baseHotScheduler) prepareForBalance(typ resourceType, cluster sche.Sche switch typ { case readLeader, readPeer: // update read statistics + // avoid to update read statistics frequently if time.Since(h.updateReadTime) >= statisticsInterval { regionRead := cluster.RegionReadStats() prepare(regionRead, utils.Read, constant.LeaderKind) @@ -135,6 +136,7 @@ func (h *baseHotScheduler) prepareForBalance(typ resourceType, cluster sche.Sche } case writeLeader, writePeer: // update write statistics + // avoid to update write statistics frequently if time.Since(h.updateWriteTime) >= statisticsInterval { regionWrite := cluster.RegionWriteStats() prepare(regionWrite, utils.Write, constant.LeaderKind) @@ -408,10 +410,10 @@ type solution struct { cachedPeersRate []float64 // progressiveRank measures the contribution for balance. - // The smaller the rank, the better this solution is. - // If progressiveRank <= 0, this solution makes thing better. + // The bigger the rank, the better this solution is. 
+	// If progressiveRank >= 0, this solution makes things better.
 	// 0 indicates that this is a solution that cannot be used directly, but can be optimized.
-	// 1 indicates that this is a non-optimizable solution.
+	// -1 indicates that this is a non-optimizable solution.
 	// See `calcProgressiveRank` for more about progressive rank.
 	progressiveRank int64
 	// only for rank v2
@@ -483,6 +485,7 @@ type balanceSolver struct {
 	best *solution
 	ops  []*operator.Operator
 
+	// maxSrc and minDst are used to calculate the rank.
 	maxSrc   *statistics.StoreLoad
 	minDst   *statistics.StoreLoad
 	rankStep *statistics.StoreLoad
diff --git a/pkg/schedule/schedulers/hot_region_config.go b/pkg/schedule/schedulers/hot_region_config.go
index 517edb1d637..5f08d755f76 100644
--- a/pkg/schedule/schedulers/hot_region_config.go
+++ b/pkg/schedule/schedulers/hot_region_config.go
@@ -38,7 +38,6 @@ import (
 )
 
 const (
-
 	// Scheduling has a bigger impact on TiFlash, so it needs to be corrected in configuration items
 	// In the default config, the TiKV difference is 1.05*1.05-1 = 0.1025, and the TiFlash difference is 1.15*1.15-1 = 0.3225
 	tiflashToleranceRatioCorrection = 0.1
diff --git a/pkg/schedule/schedulers/hot_region_rank_v1.go b/pkg/schedule/schedulers/hot_region_rank_v1.go
index 35c9bd00427..ebf6e9bf744 100644
--- a/pkg/schedule/schedulers/hot_region_rank_v1.go
+++ b/pkg/schedule/schedulers/hot_region_rank_v1.go
@@ -29,9 +29,10 @@ func initRankV1(r *balanceSolver) *rankV1 {
 }
 
 // isAvailable returns the solution is available.
-// The solution should have no revertRegion and progressiveRank < 0.
+// The solution's progressiveRank should be > 0.
+// v1 does not support revert regions, so no need to check revertRegions.
 func (*rankV1) isAvailable(s *solution) bool {
-	return s.progressiveRank < 0
+	return s.progressiveRank > 0
 }
 
 func (r *rankV1) checkByPriorityAndTolerance(loads []float64, f func(int) bool) bool {
@@ -66,12 +67,12 @@ func (r *rankV1) filterUniformStore() (string, bool) {
 		// If both dims are enough uniform, any schedule is unnecessary.
 		return "all-dim", true
 	}
-	if isUniformFirstPriority && (r.cur.progressiveRank == -1 || r.cur.progressiveRank == -3) {
-		// If first priority dim is enough uniform, -1 is unnecessary and maybe lead to worse balance for second priority dim
+	if isUniformFirstPriority && (r.cur.progressiveRank == 1 || r.cur.progressiveRank == 3) {
+		// If first priority dim is enough uniform, rank 1 is unnecessary and may lead to worse balance for the second priority dim
 		return utils.DimToString(r.firstPriority), true
 	}
-	if isUniformSecondPriority && r.cur.progressiveRank == -2 {
-		// If second priority dim is enough uniform, -2 is unnecessary and maybe lead to worse balance for first priority dim
+	if isUniformSecondPriority && r.cur.progressiveRank == 2 {
+		// If second priority dim is enough uniform, rank 2 is unnecessary and may lead to worse balance for the first priority dim
 		return utils.DimToString(r.secondPriority), true
 	}
 	return "", false
@@ -79,12 +80,12 @@ func (r *rankV1) filterUniformStore() (string, bool) {
 
 // calcProgressiveRank calculates `r.cur.progressiveRank`.
 // See the comments of `solution.progressiveRank` for more about progressive rank.
-// | ↓ firstPriority \ secondPriority → | isBetter | isNotWorsened | Worsened | -// | isBetter | -4 | -3 | -1 / 0 | -// | isNotWorsened | -2 | 1 | 1 | -// | Worsened | 0 | 1 | 1 | +// | ↓ firstPriority \ secondPriority → | isBetter | isNotWorsened | Worsened | +// | isBetter | 4 | 3 | 1 | +// | isNotWorsened | 2 | -1 | -1 | +// | Worsened | 0 | -1 | -1 | func (r *rankV1) calcProgressiveRank() { - r.cur.progressiveRank = 1 + r.cur.progressiveRank = -1 r.cur.calcPeersRate(r.firstPriority, r.secondPriority) if r.cur.getPeersRateFromCache(r.firstPriority) < r.getMinRate(r.firstPriority) && r.cur.getPeersRateFromCache(r.secondPriority) < r.getMinRate(r.secondPriority) { @@ -93,10 +94,10 @@ func (r *rankV1) calcProgressiveRank() { if r.resourceTy == writeLeader { // For write leader, only compare the first priority. - // If the first priority is better, the progressiveRank is -3. + // If the first priority is better, the progressiveRank is 3. // Because it is not a solution that needs to be optimized. if r.isBetterForWriteLeader() { - r.cur.progressiveRank = -3 + r.cur.progressiveRank = 3 } return } @@ -107,16 +108,16 @@ func (r *rankV1) calcProgressiveRank() { switch { case isFirstBetter && isSecondBetter: // If belonging to the case, all two dim will be more balanced, the best choice. - r.cur.progressiveRank = -4 + r.cur.progressiveRank = 4 case isFirstBetter && isSecondNotWorsened: // If belonging to the case, the first priority dim will be more balanced, the second priority dim will be not worsened. - r.cur.progressiveRank = -3 + r.cur.progressiveRank = 3 case isFirstNotWorsened && isSecondBetter: // If belonging to the case, the first priority dim will be not worsened, the second priority dim will be more balanced. - r.cur.progressiveRank = -2 + r.cur.progressiveRank = 2 case isFirstBetter: // If belonging to the case, the first priority dim will be more balanced, ignore the second priority dim. - r.cur.progressiveRank = -1 + r.cur.progressiveRank = 1 case isSecondBetter: // If belonging to the case, the second priority dim will be more balanced, ignore the first priority dim. // It's a solution that cannot be used directly, but can be optimized. @@ -126,12 +127,12 @@ func (r *rankV1) calcProgressiveRank() { // betterThan checks if `r.cur` is a better solution than `old`. func (r *rankV1) betterThan(old *solution) bool { - if old == nil || r.cur.progressiveRank <= splitProgressiveRank { + if old == nil || r.cur.progressiveRank >= splitProgressiveRank { return true } if r.cur.progressiveRank != old.progressiveRank { - // Smaller rank is better. - return r.cur.progressiveRank < old.progressiveRank + // Bigger rank is better. + return r.cur.progressiveRank > old.progressiveRank } if (r.cur.revertRegion == nil) != (old.revertRegion == nil) { // Fewer revertRegions are better. @@ -159,24 +160,28 @@ func (r *rankV1) betterThan(old *solution) bool { // We will firstly consider ensuring converge faster, secondly reduce oscillation firstCmp, secondCmp := r.getRkCmpPriorities(old) switch r.cur.progressiveRank { - case -4: // isBetter(firstPriority) && isBetter(secondPriority) + case 4: // isBetter(firstPriority) && isBetter(secondPriority) + // Both are better, prefer the one with higher first priority rate. + // If the first priority rate is the similar, prefer the one with higher second priority rate. 
if firstCmp != 0 { return firstCmp > 0 } return secondCmp > 0 - case -3: // isBetter(firstPriority) && isNotWorsened(secondPriority) + case 3: // isBetter(firstPriority) && isNotWorsened(secondPriority) + // The first priority is better, prefer the one with higher first priority rate. if firstCmp != 0 { return firstCmp > 0 } // prefer smaller second priority rate, to reduce oscillation return secondCmp < 0 - case -2: // isNotWorsened(firstPriority) && isBetter(secondPriority) + case 2: // isNotWorsened(firstPriority) && isBetter(secondPriority) + // The second priority is better, prefer the one with higher second priority rate. if secondCmp != 0 { return secondCmp > 0 } // prefer smaller first priority rate, to reduce oscillation return firstCmp < 0 - case -1: // isBetter(firstPriority) + case 1: // isBetter(firstPriority) return firstCmp > 0 // TODO: The smaller the difference between the value and the expectation, the better. } @@ -193,21 +198,24 @@ func (r *rankV1) getRkCmpPriorities(old *solution) (firstCmp int, secondCmp int) func (r *rankV1) rankToDimString() string { switch r.cur.progressiveRank { - case -4: + case 4: return "all" - case -3: + case 3: return utils.DimToString(r.firstPriority) - case -2: + case 2: return utils.DimToString(r.secondPriority) - case -1: + case 1: return utils.DimToString(r.firstPriority) + "-only" default: return "none" } } -func (*rankV1) needSearchRevertRegions() bool { return false } -func (*rankV1) setSearchRevertRegions() {} +func (*rankV1) needSearchRevertRegions() bool { + return false +} + +func (*rankV1) setSearchRevertRegions() {} func (r *rankV1) isBetterForWriteLeader() bool { srcRate, dstRate := r.cur.getExtremeLoad(r.firstPriority) diff --git a/pkg/schedule/schedulers/hot_region_rank_v2.go b/pkg/schedule/schedulers/hot_region_rank_v2.go index d90da3ca8a6..fd00c3b8345 100644 --- a/pkg/schedule/schedulers/hot_region_rank_v2.go +++ b/pkg/schedule/schedulers/hot_region_rank_v2.go @@ -113,11 +113,11 @@ func initRankV2(bs *balanceSolver) *rankV2 { } // isAvailable returns the solution is available. -// If the solution has no revertRegion, progressiveRank should < 0. -// If the solution has some revertRegion, progressiveRank should equal to -4 or -3. +// If the solution has no revertRegion, progressiveRank should > 0. +// If the solution has some revertRegion, progressiveRank should equal to 4 or 3. func (*rankV2) isAvailable(s *solution) bool { - // TODO: Test if revert region can be enabled for -1. - return s.progressiveRank <= -3 || (s.progressiveRank < 0 && s.revertRegion == nil) + // TODO: Test if revert region can be enabled for 1. + return s.progressiveRank >= 3 || (s.progressiveRank > 0 && s.revertRegion == nil) } func (r *rankV2) checkByPriorityAndTolerance(loads []float64, f func(int) bool) bool { @@ -151,12 +151,12 @@ func (r *rankV2) filterUniformStore() (string, bool) { // If both dims are enough uniform, any schedule is unnecessary. 
 		return "all-dim", true
 	}
-	if isUniformFirstPriority && (r.cur.progressiveRank == -2 || r.cur.progressiveRank == -3) {
-		// If first priority dim is enough uniform, -2 is unnecessary and maybe lead to worse balance for second priority dim
+	if isUniformFirstPriority && (r.cur.progressiveRank == 2 || r.cur.progressiveRank == 3) {
+		// If first priority dim is enough uniform, rank 2 is unnecessary and may lead to worse balance for the second priority dim
 		return utils.DimToString(r.firstPriority), true
 	}
-	if isUniformSecondPriority && r.cur.progressiveRank == -1 {
-		// If second priority dim is enough uniform, -1 is unnecessary and maybe lead to worse balance for first priority dim
+	if isUniformSecondPriority && r.cur.progressiveRank == 1 {
+		// If second priority dim is enough uniform, rank 1 is unnecessary and may lead to worse balance for the first priority dim
 		return utils.DimToString(r.secondPriority), true
 	}
 	return "", false
@@ -164,24 +164,24 @@ func (r *rankV2) filterUniformStore() (string, bool) {
 
 // The search-revert-regions is performed only when the following conditions are met to improve performance.
 // * `searchRevertRegions` is true. It depends on the result of the last `solve`.
-// * The current solution is not good enough. progressiveRank == -2/0
+// * The current solution is not good enough. progressiveRank == 2/0
 // * The current best solution is not good enough.
-//   - The current best solution has progressiveRank < -2 , but contain revert regions.
-//   - The current best solution has progressiveRank >= -2.
+//   - The current best solution has progressiveRank > 2, but contains revert regions.
+//   - The current best solution has progressiveRank <= 2.
 func (r *rankV2) needSearchRevertRegions() bool {
 	if !r.sche.searchRevertRegions[r.resourceTy] {
 		return false
 	}
-	return (r.cur.progressiveRank == -2 || r.cur.progressiveRank == 0) &&
-		(r.best == nil || r.best.progressiveRank >= -2 || r.best.revertRegion != nil)
+	return (r.cur.progressiveRank == 2 || r.cur.progressiveRank == 0) &&
+		(r.best == nil || r.best.progressiveRank <= 2 || r.best.revertRegion != nil)
 }
 
 func (r *rankV2) setSearchRevertRegions() {
 	// The next solve is allowed to search-revert-regions only when the following conditions are met.
 	// * No best solution was found this time.
-	// * The progressiveRank of the best solution == -2. (first is better, second is worsened)
+	// * The progressiveRank of the best solution == 2. (first is better, second is worsened)
 	// * The best solution contain revert regions.
-	searchRevertRegions := r.best == nil || r.best.progressiveRank == -2 || r.best.revertRegion != nil
+	searchRevertRegions := r.best == nil || r.best.progressiveRank == 2 || r.best.revertRegion != nil
 	r.sche.searchRevertRegions[r.resourceTy] = searchRevertRegions
 	if searchRevertRegions {
 		event := fmt.Sprintf("%s-%s-allow-search-revert-regions", r.rwTy.String(), r.opTy.String())
@@ -191,15 +191,15 @@ func (r *rankV2) setSearchRevertRegions() {
 
 // calcProgressiveRank calculates `r.cur.progressiveRank`.
 // See the comments of `solution.progressiveRank` for more about progressive rank.
-// isBetter: score > 0 +// isBetter: score < 0 // isNotWorsened: score == 0 -// isWorsened: score < 0 +// isWorsened: score > 0 // | ↓ firstPriority \ secondPriority → | isBetter | isNotWorsened | isWorsened | -// | isBetter | -4 | -3 | -2 | -// | isNotWorsened | -1 | 1 | 1 | -// | isWorsened | 0 | 1 | 1 | +// | isBetter | 4 | 3 | 2 | +// | isNotWorsened | 1 | -1 | -1 | +// | isWorsened | 0 | -1 | -1 | func (r *rankV2) calcProgressiveRank() { - r.cur.progressiveRank = 1 + r.cur.progressiveRank = -1 r.cur.calcPeersRate(r.firstPriority, r.secondPriority) if r.cur.getPeersRateFromCache(r.firstPriority) < r.getMinRate(r.firstPriority) && r.cur.getPeersRateFromCache(r.secondPriority) < r.getMinRate(r.secondPriority) { @@ -208,10 +208,10 @@ func (r *rankV2) calcProgressiveRank() { if r.resourceTy == writeLeader { // For write leader, only compare the first priority. - // If the first priority is better, the progressiveRank is -3. + // If the first priority is better, the progressiveRank is 3. // Because it is not a solution that needs to be optimized. if r.getScoreByPriorities(r.firstPriority, r.firstPriorityRatios) > 0 { - r.cur.progressiveRank = -3 + r.cur.progressiveRank = 3 } return } @@ -222,16 +222,16 @@ func (r *rankV2) calcProgressiveRank() { switch { case firstScore > 0 && secondScore > 0: // If belonging to the case, all two dim will be more balanced, the best choice. - r.cur.progressiveRank = -4 + r.cur.progressiveRank = 4 case firstScore > 0 && secondScore == 0: // If belonging to the case, the first priority dim will be more balanced, the second priority dim will be not worsened. - r.cur.progressiveRank = -3 + r.cur.progressiveRank = 3 case firstScore > 0: // If belonging to the case, the first priority dim will be more balanced, ignore the second priority dim. - r.cur.progressiveRank = -2 + r.cur.progressiveRank = 2 case firstScore == 0 && secondScore > 0: // If belonging to the case, the first priority dim will be not worsened, the second priority dim will be more balanced. - r.cur.progressiveRank = -1 + r.cur.progressiveRank = 1 case secondScore > 0: // If belonging to the case, the second priority dim will be more balanced, ignore the first priority dim. // It's a solution that cannot be used directly, but can be optimized. @@ -437,12 +437,12 @@ func (r *rankV2) getScoreByPriorities(dim int, rs *rankRatios) int { // betterThan checks if `r.cur` is a better solution than `old`. func (r *rankV2) betterThan(old *solution) bool { - if old == nil || r.cur.progressiveRank <= splitProgressiveRank { + if old == nil || r.cur.progressiveRank >= splitProgressiveRank { return true } if r.cur.progressiveRank != old.progressiveRank { - // Smaller rank is better. - return r.cur.progressiveRank < old.progressiveRank + // Bigger rank is better. + return r.cur.progressiveRank > old.progressiveRank } if (r.cur.revertRegion == nil) != (old.revertRegion == nil) { // Fewer revertRegions are better. 
@@ -473,12 +473,12 @@ func (r *rankV2) betterThan(old *solution) bool { secondCmp := getRkCmpByPriority(r.secondPriority, r.cur.secondScore, old.secondScore, r.cur.getPeersRateFromCache(r.secondPriority), old.getPeersRateFromCache(r.secondPriority)) switch r.cur.progressiveRank { - case -4, -3, -2: // firstPriority + case 4, 3, 2: // firstPriority if firstCmp != 0 { return firstCmp > 0 } return secondCmp > 0 - case -1: // secondPriority + case 1: // secondPriority if secondCmp != 0 { return secondCmp > 0 } @@ -509,13 +509,13 @@ func getRkCmpByPriority(dim int, curScore, oldScore int, curPeersRate, oldPeersR func (r *rankV2) rankToDimString() string { switch r.cur.progressiveRank { - case -4: + case 4: return "all" - case -3: + case 3: return utils.DimToString(r.firstPriority) - case -2: + case 2: return utils.DimToString(r.firstPriority) + "-only" - case -1: + case 1: return utils.DimToString(r.secondPriority) default: return "none" diff --git a/pkg/schedule/schedulers/hot_region_rank_v2_test.go b/pkg/schedule/schedulers/hot_region_rank_v2_test.go index dd1d99fc01d..0237c2156ec 100644 --- a/pkg/schedule/schedulers/hot_region_rank_v2_test.go +++ b/pkg/schedule/schedulers/hot_region_rank_v2_test.go @@ -28,7 +28,7 @@ import ( ) func TestHotWriteRegionScheduleWithRevertRegionsDimSecond(t *testing.T) { - // This is a test that searchRevertRegions finds a solution of rank -1. + // This is a test that searchRevertRegions finds a solution of rank 1. re := require.New(t) cancel, _, tc, oc := prepareSchedulersTest() defer cancel() @@ -69,7 +69,7 @@ func TestHotWriteRegionScheduleWithRevertRegionsDimSecond(t *testing.T) { re.True(hb.searchRevertRegions[writePeer]) // Two operators can be generated when RankFormulaVersion == "v2". ops, _ = hb.Schedule(tc, false) - /* The revert region is currently disabled for the -1 case. + /* The revert region is currently disabled for the rank 1 case. re.Len(ops, 2) operatorutil.CheckTransferPeer(re, ops[0], operator.OpHotRegion, 2, 5) operatorutil.CheckTransferPeer(re, ops[1], operator.OpHotRegion, 5, 2) @@ -89,7 +89,7 @@ func TestHotWriteRegionScheduleWithRevertRegionsDimSecond(t *testing.T) { } func TestHotWriteRegionScheduleWithRevertRegionsDimFirst(t *testing.T) { - // This is a test that searchRevertRegions finds a solution of rank -3. + // This is a test that searchRevertRegions finds a solution of rank 3. re := require.New(t) cancel, _, tc, oc := prepareSchedulersTest() defer cancel() @@ -141,7 +141,7 @@ func TestHotWriteRegionScheduleWithRevertRegionsDimFirst(t *testing.T) { } func TestHotWriteRegionScheduleWithRevertRegionsDimFirstOnly(t *testing.T) { - // This is a test that searchRevertRegions finds a solution of rank -2. + // This is a test that searchRevertRegions finds a solution of rank 2. re := require.New(t) cancel, _, tc, oc := prepareSchedulersTest() defer cancel() @@ -242,7 +242,7 @@ func TestHotReadRegionScheduleWithRevertRegionsDimSecond(t *testing.T) { re.True(hb.searchRevertRegions[readLeader]) // Two operators can be generated when RankFormulaVersion == "v2". ops, _ = hb.Schedule(tc, false) - /* The revert region is currently disabled for the -1 case. + /* The revert region is currently disabled for the rank 1 case. 
re.Len(ops, 2) operatorutil.CheckTransferLeader(re, ops[0], operator.OpHotRegion, 2, 5) operatorutil.CheckTransferLeader(re, ops[1], operator.OpHotRegion, 5, 2) diff --git a/pkg/statistics/store_hot_peers_infos.go b/pkg/statistics/store_hot_peers_infos.go index a65366c41bf..f7873bdd744 100644 --- a/pkg/statistics/store_hot_peers_infos.go +++ b/pkg/statistics/store_hot_peers_infos.go @@ -15,7 +15,6 @@ package statistics import ( - "fmt" "math" "github.com/tikv/pd/pkg/core" @@ -151,7 +150,7 @@ func summaryStoresLoadByEngine( ) []*StoreLoadDetail { loadDetail := make([]*StoreLoadDetail, 0, len(storeInfos)) allStoreLoadSum := make([]float64, utils.DimLen) - allStoreHistoryLoadSum := make([][]float64, utils.DimLen) + allStoreHistoryLoadSum := make([][]float64, utils.DimLen) // row: dim, column: time allStoreCount := 0 allHotPeersCount := 0 @@ -166,49 +165,37 @@ func summaryStoresLoadByEngine( // Find all hot peers first var hotPeers []*HotPeerStat peerLoadSum := make([]float64, utils.DimLen) - // TODO: To remove `filterHotPeers`, we need to: - // HotLeaders consider `Write{Bytes,Keys}`, so when we schedule `writeLeader`, all peers are leader. + // For hot leaders, we need to calculate the sum of the leader's write and read flow rather than the all peers. for _, peer := range filterHotPeers(kind, storeHotPeers[id]) { for i := range peerLoadSum { peerLoadSum[i] += peer.GetLoad(i) } hotPeers = append(hotPeers, peer.Clone()) } - { - // Metric for debug. - // TODO: pre-allocate gauge metrics - ty := "byte-rate-" + rwTy.String() + "-" + kind.String() - hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(peerLoadSum[utils.ByteDim]) - ty = "key-rate-" + rwTy.String() + "-" + kind.String() - hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(peerLoadSum[utils.KeyDim]) - ty = "query-rate-" + rwTy.String() + "-" + kind.String() - hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(peerLoadSum[utils.QueryDim]) - } - loads := collector.GetLoads(storeLoads, peerLoadSum, rwTy, kind) + currentLoads := collector.GetLoads(storeLoads, peerLoadSum, rwTy, kind) var historyLoads [][]float64 if storesHistoryLoads != nil { - historyLoads = storesHistoryLoads.Get(id, rwTy, kind) - for i, loads := range historyLoads { - if allStoreHistoryLoadSum[i] == nil || len(allStoreHistoryLoadSum[i]) < len(loads) { - allStoreHistoryLoadSum[i] = make([]float64, len(loads)) + for i, historyLoads := range storesHistoryLoads.Get(id, rwTy, kind) { + if allStoreHistoryLoadSum[i] == nil || len(allStoreHistoryLoadSum[i]) < len(historyLoads) { + allStoreHistoryLoadSum[i] = make([]float64, len(historyLoads)) } - for j, load := range loads { - allStoreHistoryLoadSum[i][j] += load + for j, historyLoad := range historyLoads { + allStoreHistoryLoadSum[i][j] += historyLoad } } - storesHistoryLoads.Add(id, rwTy, kind, loads) + storesHistoryLoads.Add(id, rwTy, kind, currentLoads) } for i := range allStoreLoadSum { - allStoreLoadSum[i] += loads[i] + allStoreLoadSum[i] += currentLoads[i] } allStoreCount += 1 allHotPeersCount += len(hotPeers) // Build store load prediction from current load and pending influence. stLoadPred := (&StoreLoad{ - Loads: loads, + Loads: currentLoads, Count: float64(len(hotPeers)), HistoryLoads: historyLoads, }).ToLoadPred(rwTy, info.PendingSum) @@ -231,8 +218,8 @@ func summaryStoresLoadByEngine( expectLoads[i] = allStoreLoadSum[i] / float64(allStoreCount) } - // todo: remove some the max value or min value to avoid the effect of extreme value. 
- expectHistoryLoads := make([][]float64, utils.DimLen) + // TODO: remove some the max value or min value to avoid the effect of extreme value. + expectHistoryLoads := make([][]float64, utils.DimLen) // row: dim, column: time for i := range allStoreHistoryLoadSum { expectHistoryLoads[i] = make([]float64, len(allStoreHistoryLoadSum[i])) for j := range allStoreHistoryLoadSum[i] { @@ -285,13 +272,20 @@ func summaryStoresLoadByEngine( return loadDetail } +// filterHotPeers filters hot peers according to kind. +// If kind is RegionKind, all hot peers will be returned. +// If kind is LeaderKind, only leader hot peers will be returned. func filterHotPeers(kind constant.ResourceKind, peers []*HotPeerStat) []*HotPeerStat { ret := make([]*HotPeerStat, 0, len(peers)) for _, peer := range peers { - if kind == constant.LeaderKind && !peer.IsLeader() { - continue + switch kind { + case constant.RegionKind: + ret = append(ret, peer) + case constant.LeaderKind: + if peer.IsLeader() { + ret = append(ret, peer) + } } - ret = append(ret, peer) } return ret } From 11da62247bcd0c26dfedec25c11e56cab7c48626 Mon Sep 17 00:00:00 2001 From: ShuNing Date: Fri, 26 Jul 2024 17:17:18 +0800 Subject: [PATCH 3/8] tools: fix panic issue in tso bench (#8449) close tikv/pd#8448 tools: fix panic issue in tso bench Signed-off-by: nolouch Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- tools/pd-tso-bench/main.go | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/tools/pd-tso-bench/main.go b/tools/pd-tso-bench/main.go index 3726373779e..1e59dfd2a2a 100644 --- a/tools/pd-tso-bench/main.go +++ b/tools/pd-tso-bench/main.go @@ -386,20 +386,27 @@ func reqWorker(ctx context.Context, pdClients []pd.Client, clientIdx int, durCh var ticker *time.Ticker if *maxTSOSendIntervalMilliseconds > 0 { sleepBeforeGetTS := time.Duration(rand.Intn(*maxTSOSendIntervalMilliseconds)) * time.Millisecond - ticker = time.NewTicker(sleepBeforeGetTS) - select { - case <-reqCtx.Done(): - case <-ticker.C: - totalSleepBeforeGetTS += sleepBeforeGetTS + if sleepBeforeGetTS > 0 { + ticker = time.NewTicker(sleepBeforeGetTS) + select { + case <-reqCtx.Done(): + case <-ticker.C: + totalSleepBeforeGetTS += sleepBeforeGetTS + } } } _, _, err = pdCli.GetLocalTS(reqCtx, *dcLocation) if errors.Cause(err) == context.Canceled { - ticker.Stop() + if ticker != nil { + ticker.Stop() + } + return } if err == nil { - ticker.Stop() + if ticker != nil { + ticker.Stop() + } break } log.Error(fmt.Sprintf("%v", err)) From c53f1d5c79ccb776c9090acdaae36d01e99b8ae6 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 29 Jul 2024 13:17:46 +0800 Subject: [PATCH 4/8] api: enhance ttl config api output (#8451) close tikv/pd#8450 Signed-off-by: lhy1024 Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- server/api/config.go | 12 ++++++++---- tests/server/config/config_test.go | 6 ++++-- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/server/api/config.go b/server/api/config.go index c8233f8d5eb..d280439a988 100644 --- a/server/api/config.go +++ b/server/api/config.go @@ -119,19 +119,23 @@ func (h *confHandler) SetConfig(w http.ResponseWriter, r *http.Request) { return } - if ttlSec := r.URL.Query().Get("ttlSecond"); ttlSec != "" { - ttls, err := strconv.Atoi(ttlSec) + if ttlString := r.URL.Query().Get("ttlSecond"); ttlString != "" { + ttlSec, err := strconv.Atoi(ttlString) if err != nil { h.rd.JSON(w, http.StatusBadRequest, err.Error()) return } // if ttlSecond 
defined, we will apply if to temp configuration. - err = h.svr.SaveTTLConfig(conf, time.Duration(ttls)*time.Second) + err = h.svr.SaveTTLConfig(conf, time.Duration(ttlSec)*time.Second) if err != nil { h.rd.JSON(w, http.StatusBadRequest, err.Error()) return } - h.rd.JSON(w, http.StatusOK, "The config is updated.") + if ttlSec == 0 { + h.rd.JSON(w, http.StatusOK, "The ttl config is deleted.") + } else { + h.rd.JSON(w, http.StatusOK, "The ttl config is updated.") + } return } diff --git a/tests/server/config/config_test.go b/tests/server/config/config_test.go index b5d5b044f12..d225614fa96 100644 --- a/tests/server/config/config_test.go +++ b/tests/server/config/config_test.go @@ -530,10 +530,12 @@ func (suite *configTestSuite) checkConfigTTL(cluster *tests.TestCluster) { assertTTLConfig(re, cluster, false) // test cleaning up - err = tu.CheckPostJSON(tests.TestDialClient, createTTLUrl(urlPrefix, 5), postData, tu.StatusOK(re)) + err = tu.CheckPostJSON(tests.TestDialClient, createTTLUrl(urlPrefix, 5), postData, + tu.StatusOK(re), tu.StringEqual(re, "\"The ttl config is updated.\"\n")) re.NoError(err) assertTTLConfig(re, cluster, true) - err = tu.CheckPostJSON(tests.TestDialClient, createTTLUrl(urlPrefix, 0), postData, tu.StatusOK(re)) + err = tu.CheckPostJSON(tests.TestDialClient, createTTLUrl(urlPrefix, 0), postData, + tu.StatusOK(re), tu.StatusOK(re), tu.StringEqual(re, "\"The ttl config is deleted.\"\n")) re.NoError(err) assertTTLConfig(re, cluster, false) From 5d774473752446e2e5a981369b7e00351fda8848 Mon Sep 17 00:00:00 2001 From: Hu# Date: Mon, 29 Jul 2024 13:29:46 +0800 Subject: [PATCH 5/8] tools/heartbeat: support to collect metrics (#8235) ref tikv/pd#8135 tools/heartbeat: support to collect metrics Signed-off-by: husharp Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- tools/go.mod | 2 +- tools/pd-heartbeat-bench/config/config.go | 2 + tools/pd-heartbeat-bench/main.go | 33 +++- tools/pd-heartbeat-bench/metrics/util.go | 231 ++++++++++++++++++++++ 4 files changed, 259 insertions(+), 9 deletions(-) create mode 100644 tools/pd-heartbeat-bench/metrics/util.go diff --git a/tools/go.mod b/tools/go.mod index caafef12b87..aef95dfa660 100644 --- a/tools/go.mod +++ b/tools/go.mod @@ -26,6 +26,7 @@ require ( github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.19.0 + github.com/prometheus/common v0.51.1 github.com/spf13/cobra v1.8.0 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.8.4 @@ -132,7 +133,6 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b // indirect github.com/prometheus/client_model v0.6.0 // indirect - github.com/prometheus/common v0.51.1 // indirect github.com/prometheus/procfs v0.13.0 // indirect github.com/rs/cors v1.7.0 // indirect github.com/samber/lo v1.37.0 // indirect diff --git a/tools/pd-heartbeat-bench/config/config.go b/tools/pd-heartbeat-bench/config/config.go index 12455d78658..dc5a2a6a047 100644 --- a/tools/pd-heartbeat-bench/config/config.go +++ b/tools/pd-heartbeat-bench/config/config.go @@ -53,6 +53,7 @@ type Config struct { ReportRatio float64 `toml:"report-ratio" json:"report-ratio"` Sample bool `toml:"sample" json:"sample"` Round int `toml:"round" json:"round"` + MetricsAddr string `toml:"metrics-addr" json:"metrics-addr"` } // NewConfig return a set of settings. 
@@ -69,6 +70,7 @@ func NewConfig() *Config { fs.StringVar(&cfg.Security.CertPath, "cert", "", "path of file that contains X509 certificate in PEM format") fs.StringVar(&cfg.Security.KeyPath, "key", "", "path of file that contains X509 key in PEM format") fs.Uint64Var(&cfg.InitEpochVer, "epoch-ver", 1, "the initial epoch version value") + fs.StringVar(&cfg.MetricsAddr, "metrics-addr", "127.0.0.1:9090", "the address to pull metrics") return cfg } diff --git a/tools/pd-heartbeat-bench/main.go b/tools/pd-heartbeat-bench/main.go index 77ae6354bff..9d71be1129b 100644 --- a/tools/pd-heartbeat-bench/main.go +++ b/tools/pd-heartbeat-bench/main.go @@ -23,6 +23,7 @@ import ( "net/http" "os" "os/signal" + "strconv" "sync" "sync/atomic" "syscall" @@ -46,6 +47,7 @@ import ( "github.com/tikv/pd/pkg/statistics" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/tools/pd-heartbeat-bench/config" + "github.com/tikv/pd/tools/pd-heartbeat-bench/metrics" "go.etcd.io/etcd/pkg/report" "go.uber.org/zap" ) @@ -528,6 +530,7 @@ func main() { defer heartbeatTicker.Stop() var resolvedTSTicker = time.NewTicker(time.Second) defer resolvedTSTicker.Stop() + withMetric := metrics.InitMetric2Collect(cfg.MetricsAddr) for { select { case <-heartbeatTicker.C: @@ -544,21 +547,19 @@ func main() { wg.Add(1) go regions.handleRegionHeartbeat(wg, streams[id], id, rep) } + if withMetric { + metrics.CollectMetrics(regions.updateRound, time.Second) + } wg.Wait() since := time.Since(startTime).Seconds() close(rep.Results()) regions.result(cfg.RegionCount, since) stats := <-r - log.Info("region heartbeat stats", zap.String("total", fmt.Sprintf("%.4fs", stats.Total.Seconds())), - zap.String("slowest", fmt.Sprintf("%.4fs", stats.Slowest)), - zap.String("fastest", fmt.Sprintf("%.4fs", stats.Fastest)), - zap.String("average", fmt.Sprintf("%.4fs", stats.Average)), - zap.String("stddev", fmt.Sprintf("%.4fs", stats.Stddev)), - zap.String("rps", fmt.Sprintf("%.4f", stats.RPS)), - zap.Uint64("max-epoch-version", maxVersion), - ) + log.Info("region heartbeat stats", + metrics.RegionFields(stats, zap.Uint64("max-epoch-version", maxVersion))...) log.Info("store heartbeat stats", zap.String("max", fmt.Sprintf("%.4fs", since))) + metrics.CollectRegionAndStoreStats(&stats, &since) regions.update(cfg, options) go stores.update(regions) // update stores in background, unusually region heartbeat is slower than store update. case <-resolvedTSTicker.C: @@ -594,6 +595,7 @@ func main() { } func exit(code int) { + metrics.OutputConclusion() os.Exit(code) } @@ -689,6 +691,21 @@ func runHTTPServer(cfg *config.Config, options *config.Options) { c.IndentedJSON(http.StatusOK, output) }) + engine.GET("metrics-collect", func(c *gin.Context) { + second := c.Query("second") + if second == "" { + c.String(http.StatusBadRequest, "missing second") + return + } + secondInt, err := strconv.Atoi(second) + if err != nil { + c.String(http.StatusBadRequest, "invalid second") + return + } + metrics.CollectMetrics(metrics.WarmUpRound, time.Duration(secondInt)*time.Second) + c.IndentedJSON(http.StatusOK, "Successfully collect metrics") + }) + engine.Run(cfg.StatusAddr) } diff --git a/tools/pd-heartbeat-bench/metrics/util.go b/tools/pd-heartbeat-bench/metrics/util.go new file mode 100644 index 00000000000..bf5010e73da --- /dev/null +++ b/tools/pd-heartbeat-bench/metrics/util.go @@ -0,0 +1,231 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics + +import ( + "context" + "fmt" + "math" + "net/url" + "strings" + "time" + + "github.com/pingcap/log" + "github.com/prometheus/client_golang/api" + v1 "github.com/prometheus/client_golang/api/prometheus/v1" + "github.com/prometheus/common/model" + "go.etcd.io/etcd/pkg/report" + "go.uber.org/zap" +) + +var ( + prometheusCli api.Client + finalMetrics2Collect []Metric + avgRegionStats report.Stats + avgStoreTime float64 + collectRound = 1.0 + + metrics2Collect = []Metric{ + {promSQL: cpuMetric, name: "max cpu usage(%)", max: true}, + {promSQL: memoryMetric, name: "max memory usage(G)", max: true}, + {promSQL: goRoutineMetric, name: "max go routines", max: true}, + {promSQL: hbLatency99Metric, name: "99% Heartbeat Latency(ms)"}, + {promSQL: hbLatencyAvgMetric, name: "Avg Heartbeat Latency(ms)"}, + } + + // Prometheus SQL + cpuMetric = `max_over_time(irate(process_cpu_seconds_total{job=~".*pd.*"}[30s])[1h:30s]) * 100` + memoryMetric = `max_over_time(go_memstats_heap_inuse_bytes{job=~".*pd.*"}[1h])/1024/1024/1024` + goRoutineMetric = `max_over_time(go_goroutines{job=~".*pd.*"}[1h])` + hbLatency99Metric = `histogram_quantile(0.99, sum(rate(pd_scheduler_handle_region_heartbeat_duration_seconds_bucket{}[1m])) by (le))` + hbLatencyAvgMetric = `sum(rate(pd_scheduler_handle_region_heartbeat_duration_seconds_sum{}[1m])) / sum(rate(pd_scheduler_handle_region_heartbeat_duration_seconds_count{}[1m]))` + + // Heartbeat Performance Duration BreakDown + breakdownNames = []string{ + "AsyncHotStatsDuration", + "CollectRegionStats", + "Other", + "PreCheck", + "RegionGuide", + "SaveCache_CheckOverlaps", + "SaveCache_InvalidRegion", + "SaveCache_SetRegion", + "SaveCache_UpdateSubTree", + } + hbBreakdownMetricByName = func(name string) string { + return fmt.Sprintf(`sum(rate(pd_core_region_heartbeat_breakdown_handle_duration_seconds_sum{name="%s"}[1m]))`, name) + } +) + +type Metric struct { + promSQL string + name string + value float64 + // max indicates whether the metric is a max value + max bool +} + +func InitMetric2Collect(endpoint string) (withMetric bool) { + for _, name := range breakdownNames { + metrics2Collect = append(metrics2Collect, Metric{ + promSQL: hbBreakdownMetricByName(name), + name: name, + }) + } + finalMetrics2Collect = metrics2Collect + + if j := strings.Index(endpoint, "//"); j == -1 { + endpoint = "http://" + endpoint + } + cu, err := url.Parse(endpoint) + if err != nil { + log.Error("parse prometheus url error", zap.Error(err)) + return false + } + prometheusCli, err = NewPrometheusClient(*cu) + if err != nil { + log.Error("create prometheus client error", zap.Error(err)) + return false + } + // check whether the prometheus is available + _, err = getMetric(prometheusCli, goRoutineMetric, time.Now()) + if err != nil { + log.Error("check prometheus availability error, please check the prometheus address", zap.Error(err)) + return false + } + return true +} + +func NewPrometheusClient(prometheusURL url.URL) (api.Client, error) { + client, err := api.NewClient(api.Config{ + Address: prometheusURL.String(), + }) + if err != nil { + return nil, err + } + 
+	return client, nil
+}
+
+// WarmUpRound is the first round that collects metrics; earlier rounds are skipped so the cluster can warm up.
+const WarmUpRound = 1
+
+func CollectMetrics(curRound int, wait time.Duration) {
+	if curRound < WarmUpRound {
+		return
+	}
+	// sample 5 times and take the average value
+	res := make([]struct {
+		sum   float64
+		count int
+	}, len(metrics2Collect))
+	for i := 0; i < 5; i++ {
+		for j, m := range metrics2Collect {
+			r, err := getMetric(prometheusCli, m.promSQL, time.Now())
+			if err != nil {
+				log.Error("get metric error", zap.String("name", m.name), zap.String("prom sql", m.promSQL), zap.Error(err))
+			} else if len(r) > 0 {
+				res[j].sum += r[0]
+				res[j].count += 1
+			}
+		}
+		time.Sleep(wait)
+	}
+	getRes := func(index int) float64 {
+		if res[index].count == 0 {
+			return 0
+		}
+		return res[index].sum / float64(res[index].count)
+	}
+	for i := 0; i < len(metrics2Collect); i++ {
+		metrics2Collect[i].value = getRes(i)
+		if metrics2Collect[i].max {
+			finalMetrics2Collect[i].value = max(finalMetrics2Collect[i].value, metrics2Collect[i].value)
+		} else {
+			finalMetrics2Collect[i].value = (finalMetrics2Collect[i].value*collectRound + metrics2Collect[i].value) / (collectRound + 1)
+		}
+	}
+
+	collectRound += 1
+	log.Info("metrics collected", zap.Float64("round", collectRound), zap.String("metrics", formatMetrics(metrics2Collect)))
+}
+
+func getMetric(cli api.Client, query string, ts time.Time) ([]float64, error) {
+	httpAPI := v1.NewAPI(cli)
+	val, _, err := httpAPI.Query(context.Background(), query, ts)
+	if err != nil {
+		return nil, err
+	}
+	valMatrix := val.(model.Vector)
+	if len(valMatrix) == 0 {
+		return nil, nil
+	}
+	var value []float64
+	for i := range valMatrix {
+		value = append(value, float64(valMatrix[i].Value))
+		// check whether the result exceeded the float maximum value
+		if math.IsNaN(value[i]) {
+			return nil, fmt.Errorf("prometheus query result exceeded float maximum value, result=%s", valMatrix[i].String())
+		}
+	}
+	return value, nil
+}
+
+func formatMetrics(ms []Metric) string {
+	res := ""
+	for _, m := range ms {
+		res += "[" + m.name + "]" + " " + fmt.Sprintf("%.10f", m.value) + " "
+	}
+	return res
+}
+
+func CollectRegionAndStoreStats(regionStats *report.Stats, storeTime *float64) {
+	if regionStats != nil && storeTime != nil {
+		collect(*regionStats, *storeTime)
+	}
+}
+
+func collect(regionStats report.Stats, storeTime float64) {
+	average := func(avg, new float64) float64 {
+		return (avg*collectRound + new) / (collectRound + 1)
+	}
+
+	avgRegionStats.Total = time.Duration(average(float64(avgRegionStats.Total), float64(regionStats.Total)))
+	avgRegionStats.Average = average(avgRegionStats.Average, regionStats.Average)
+	avgRegionStats.Stddev = average(avgRegionStats.Stddev, regionStats.Stddev)
+	avgRegionStats.Fastest = average(avgRegionStats.Fastest, regionStats.Fastest)
+	avgRegionStats.Slowest = average(avgRegionStats.Slowest, regionStats.Slowest)
+	avgRegionStats.RPS = average(avgRegionStats.RPS, regionStats.RPS)
+	avgStoreTime = average(avgStoreTime, storeTime)
+}
+
+func OutputConclusion() {
+	logFields := RegionFields(avgRegionStats,
+		zap.Float64("avg store time", avgStoreTime),
+		zap.Float64("current round", collectRound),
+		zap.String("metrics", formatMetrics(finalMetrics2Collect)))
+	log.Info("final metrics collected", logFields...)
+} + +func RegionFields(stats report.Stats, fields ...zap.Field) []zap.Field { + return append([]zap.Field{ + zap.String("total", fmt.Sprintf("%.4fs", stats.Total.Seconds())), + zap.String("slowest", fmt.Sprintf("%.4fs", stats.Slowest)), + zap.String("fastest", fmt.Sprintf("%.4fs", stats.Fastest)), + zap.String("average", fmt.Sprintf("%.4fs", stats.Average)), + zap.String("stddev", fmt.Sprintf("%.4fs", stats.Stddev)), + zap.String("rps", fmt.Sprintf("%.4f", stats.RPS)), + }, fields...) +} From 9af28fc1d426697f90bba2f46b3957ec4f03bdcb Mon Sep 17 00:00:00 2001 From: MyonKeminta <9948422+MyonKeminta@users.noreply.github.com> Date: Mon, 29 Jul 2024 16:13:17 +0800 Subject: [PATCH 6/8] client: Merge the two tsoStream types to reuse the same error handling and metrics reporting code (#8433) ref tikv/pd#8432 client: Merge the two tsoStream types to reuse the same error handling and metrics reporting code This commit merges the two `xxxTSOStream` types so that the error handling and metrics reporting logic for PD server deployment and TSO service deployment can be reused. Signed-off-by: MyonKeminta Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- client/tso_client.go | 6 +- client/tso_dispatcher.go | 4 +- client/tso_stream.go | 138 +++++++++++++++++++-------------------- 3 files changed, 72 insertions(+), 76 deletions(-) diff --git a/client/tso_client.go b/client/tso_client.go index 5e221eae478..2f3b949f017 100644 --- a/client/tso_client.go +++ b/client/tso_client.go @@ -350,9 +350,7 @@ type tsoConnectionContext struct { // Current URL of the stream connection. streamURL string // Current stream to send gRPC requests. - // - `pdpb.PD_TsoClient` for a leader/follower in the PD cluster. - // - `tsopb.TSO_TsoClient` for a primary/secondary in the TSO cluster. - stream tsoStream + stream *tsoStream } // updateConnectionCtxs will choose the proper way to update the connections for the given dc-location. @@ -382,7 +380,7 @@ func (c *tsoClient) tryConnectToTSO( var ( networkErrNum uint64 err error - stream tsoStream + stream *tsoStream url string cc *grpc.ClientConn updateAndClear = func(newURL string, connectionCtx *tsoConnectionContext) { diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index 0919fd84744..a7c99057275 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -186,7 +186,7 @@ func (td *tsoDispatcher) handleDispatcher(wg *sync.WaitGroup) { streamCtx context.Context cancel context.CancelFunc streamURL string - stream tsoStream + stream *tsoStream ) // Loop through each batch of TSO requests and send them for processing. 
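
To make the reuse described in the commit message above concrete: the shape of the refactor is an adapter, where both gRPC stream types sit behind one small interface and a single wrapper owns the error handling and metrics. Below is a minimal, self-contained Go sketch of that shape; the names (streamAdapter, sharedStream, result, fakeAdapter) are illustrative stand-ins, not the client's real identifiers.

package main

import (
	"errors"
	"fmt"
	"io"
)

// result is a transport-agnostic response, playing the role that
// tsoRequestResult plays in the patch.
type result struct {
	physical, logical int64
}

// streamAdapter hides which concrete gRPC stream sits underneath,
// playing the role of grpcTSOStreamAdapter.
type streamAdapter interface {
	Send(count int64) error
	Recv() (result, error)
}

var errStreamClosed = errors.New("stream closed")

// sharedStream implements the error handling exactly once, for every adapter.
type sharedStream struct {
	inner streamAdapter
}

func (s *sharedStream) process(count int64) (result, error) {
	if err := s.inner.Send(count); err != nil {
		if err == io.EOF {
			return result{}, errStreamClosed
		}
		return result{}, err
	}
	res, err := s.inner.Recv()
	if err != nil {
		if err == io.EOF {
			return result{}, errStreamClosed
		}
		return result{}, err
	}
	return res, nil
}

// fakeAdapter stands in for either concrete gRPC stream in this sketch.
type fakeAdapter struct{}

func (fakeAdapter) Send(int64) error      { return nil }
func (fakeAdapter) Recv() (result, error) { return result{physical: 42, logical: 1}, nil }

func main() {
	s := &sharedStream{inner: fakeAdapter{}}
	res, err := s.process(1)
	fmt.Println(res.physical, res.logical, err) // 42 1 <nil>
}
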
streamLoopTimer := time.NewTimer(option.timeout) @@ -393,7 +393,7 @@ func chooseStream(connectionCtxs *sync.Map) (connectionCtx *tsoConnectionContext } func (td *tsoDispatcher) processRequests( - stream tsoStream, dcLocation string, tbc *tsoBatchController, + stream *tsoStream, dcLocation string, tbc *tsoBatchController, ) error { var ( requests = tbc.getCollectedRequests() diff --git a/client/tso_stream.go b/client/tso_stream.go index 9c4d78dfe18..da9cab95ba0 100644 --- a/client/tso_stream.go +++ b/client/tso_stream.go @@ -47,7 +47,7 @@ func (*tsoTSOStreamBuilderFactory) makeBuilder(cc *grpc.ClientConn) tsoStreamBui // TSO Stream Builder type tsoStreamBuilder interface { - build(context.Context, context.CancelFunc, time.Duration) (tsoStream, error) + build(context.Context, context.CancelFunc, time.Duration) (*tsoStream, error) } type pdTSOStreamBuilder struct { @@ -55,14 +55,14 @@ type pdTSOStreamBuilder struct { client pdpb.PDClient } -func (b *pdTSOStreamBuilder) build(ctx context.Context, cancel context.CancelFunc, timeout time.Duration) (tsoStream, error) { +func (b *pdTSOStreamBuilder) build(ctx context.Context, cancel context.CancelFunc, timeout time.Duration) (*tsoStream, error) { done := make(chan struct{}) // TODO: we need to handle a conner case that this goroutine is timeout while the stream is successfully created. go checkStreamTimeout(ctx, cancel, done, timeout) stream, err := b.client.Tso(ctx) done <- struct{}{} if err == nil { - return &pdTSOStream{stream: stream, serverURL: b.serverURL}, nil + return &tsoStream{stream: pdTSOStreamAdapter{stream}, serverURL: b.serverURL}, nil } return nil, err } @@ -74,14 +74,14 @@ type tsoTSOStreamBuilder struct { func (b *tsoTSOStreamBuilder) build( ctx context.Context, cancel context.CancelFunc, timeout time.Duration, -) (tsoStream, error) { +) (*tsoStream, error) { done := make(chan struct{}) // TODO: we need to handle a conner case that this goroutine is timeout while the stream is successfully created. 
go checkStreamTimeout(ctx, cancel, done, timeout) stream, err := b.client.Tso(ctx) done <- struct{}{} if err == nil { - return &tsoTSOStream{stream: stream, serverURL: b.serverURL}, nil + return &tsoStream{stream: tsoTSOStreamAdapter{stream}, serverURL: b.serverURL}, nil } return nil, err } @@ -99,30 +99,24 @@ func checkStreamTimeout(ctx context.Context, cancel context.CancelFunc, done cha <-done } -// TSO Stream - -type tsoStream interface { - getServerURL() string - // processRequests processes TSO requests in streaming mode to get timestamps - processRequests( - clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string, - count int64, batchStartTime time.Time, - ) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) +type tsoRequestResult struct { + physical, logical int64 + count uint32 + suffixBits uint32 + respKeyspaceGroupID uint32 } -type pdTSOStream struct { - serverURL string - stream pdpb.PD_TsoClient +type grpcTSOStreamAdapter interface { + Send(clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string, + count int64) error + Recv() (tsoRequestResult, error) } -func (s *pdTSOStream) getServerURL() string { - return s.serverURL +type pdTSOStreamAdapter struct { + stream pdpb.PD_TsoClient } -func (s *pdTSOStream) processRequests( - clusterID uint64, _, _ uint32, dcLocation string, count int64, batchStartTime time.Time, -) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) { - start := time.Now() +func (s pdTSOStreamAdapter) Send(clusterID uint64, _, _ uint32, dcLocation string, count int64) error { req := &pdpb.TsoRequest{ Header: &pdpb.RequestHeader{ ClusterId: clusterID, @@ -130,55 +124,28 @@ func (s *pdTSOStream) processRequests( Count: uint32(count), DcLocation: dcLocation, } + return s.stream.Send(req) +} - if err = s.stream.Send(req); err != nil { - if err == io.EOF { - err = errs.ErrClientTSOStreamClosed - } else { - err = errors.WithStack(err) - } - return - } - tsoBatchSendLatency.Observe(time.Since(batchStartTime).Seconds()) +func (s pdTSOStreamAdapter) Recv() (tsoRequestResult, error) { resp, err := s.stream.Recv() - duration := time.Since(start).Seconds() if err != nil { - requestFailedDurationTSO.Observe(duration) - if err == io.EOF { - err = errs.ErrClientTSOStreamClosed - } else { - err = errors.WithStack(err) - } - return + return tsoRequestResult{}, err } - requestDurationTSO.Observe(duration) - tsoBatchSize.Observe(float64(count)) - - if resp.GetCount() != uint32(count) { - err = errors.WithStack(errTSOLength) - return - } - - ts := resp.GetTimestamp() - respKeyspaceGroupID = defaultKeySpaceGroupID - physical, logical, suffixBits = ts.GetPhysical(), ts.GetLogical(), ts.GetSuffixBits() - return + return tsoRequestResult{ + physical: resp.GetTimestamp().GetPhysical(), + logical: resp.GetTimestamp().GetLogical(), + count: resp.GetCount(), + suffixBits: resp.GetTimestamp().GetSuffixBits(), + respKeyspaceGroupID: defaultKeySpaceGroupID, + }, nil } -type tsoTSOStream struct { - serverURL string - stream tsopb.TSO_TsoClient +type tsoTSOStreamAdapter struct { + stream tsopb.TSO_TsoClient } -func (s *tsoTSOStream) getServerURL() string { - return s.serverURL -} - -func (s *tsoTSOStream) processRequests( - clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string, - count int64, batchStartTime time.Time, -) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) { - start := time.Now() +func (s tsoTSOStreamAdapter) Send(clusterID uint64, 
keyspaceID, keyspaceGroupID uint32, dcLocation string, count int64) error { req := &tsopb.TsoRequest{ Header: &tsopb.RequestHeader{ ClusterId: clusterID, @@ -188,8 +155,40 @@ func (s *tsoTSOStream) processRequests( Count: uint32(count), DcLocation: dcLocation, } + return s.stream.Send(req) +} - if err = s.stream.Send(req); err != nil { +func (s tsoTSOStreamAdapter) Recv() (tsoRequestResult, error) { + resp, err := s.stream.Recv() + if err != nil { + return tsoRequestResult{}, err + } + return tsoRequestResult{ + physical: resp.GetTimestamp().GetPhysical(), + logical: resp.GetTimestamp().GetLogical(), + count: resp.GetCount(), + suffixBits: resp.GetTimestamp().GetSuffixBits(), + respKeyspaceGroupID: resp.GetHeader().GetKeyspaceGroupId(), + }, nil +} + +type tsoStream struct { + serverURL string + // The internal gRPC stream. + // - `pdpb.PD_TsoClient` for a leader/follower in the PD cluster. + // - `tsopb.TSO_TsoClient` for a primary/secondary in the TSO cluster. + stream grpcTSOStreamAdapter +} + +func (s *tsoStream) getServerURL() string { + return s.serverURL +} + +func (s *tsoStream) processRequests( + clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string, count int64, batchStartTime time.Time, +) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) { + start := time.Now() + if err = s.stream.Send(clusterID, keyspaceID, keyspaceGroupID, dcLocation, count); err != nil { if err == io.EOF { err = errs.ErrClientTSOStreamClosed } else { @@ -198,7 +197,7 @@ func (s *tsoTSOStream) processRequests( return } tsoBatchSendLatency.Observe(time.Since(batchStartTime).Seconds()) - resp, err := s.stream.Recv() + res, err := s.stream.Recv() duration := time.Since(start).Seconds() if err != nil { requestFailedDurationTSO.Observe(duration) @@ -212,13 +211,12 @@ func (s *tsoTSOStream) processRequests( requestDurationTSO.Observe(duration) tsoBatchSize.Observe(float64(count)) - if resp.GetCount() != uint32(count) { + if res.count != uint32(count) { err = errors.WithStack(errTSOLength) return } - ts := resp.GetTimestamp() - respKeyspaceGroupID = resp.GetHeader().GetKeyspaceGroupId() - physical, logical, suffixBits = ts.GetPhysical(), ts.GetLogical(), ts.GetSuffixBits() + respKeyspaceGroupID = res.respKeyspaceGroupID + physical, logical, suffixBits = res.physical, res.logical, res.suffixBits return } From 84f90f464ae58c26b7eb97a4518e7de97242730f Mon Sep 17 00:00:00 2001 From: okJiang <819421878@qq.com> Date: Wed, 31 Jul 2024 14:43:18 +0800 Subject: [PATCH 7/8] *: add some comments to exported function, part of enable revive.exported (#8459) ref tikv/pd#8458 Signed-off-by: okJiang <819421878@qq.com> Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- .golangci.yml | 3 + pkg/core/metrics.go | 77 ++++++++++++++++++++---- pkg/core/region_tree.go | 3 + pkg/core/storelimit/store_limit.go | 1 + pkg/id/id.go | 1 + pkg/ratelimit/metrics.go | 20 +++--- pkg/ratelimit/runner.go | 12 ++-- pkg/schedule/filter/counter.go | 1 + pkg/schedule/filter/filters.go | 35 +++++++++++ pkg/schedule/filter/region_filters.go | 2 + pkg/schedule/operator/builder.go | 1 + pkg/schedule/operator/operator_queue.go | 5 ++ pkg/schedule/schedulers/split_bucket.go | 8 +-- pkg/schedule/splitter/region_splitter.go | 2 + pkg/schedule/type/type.go | 7 ++- pkg/statistics/collector.go | 24 ++++---- pkg/statistics/hot_peer.go | 10 +-- pkg/statistics/hot_peer_cache.go | 7 ++- pkg/statistics/store_collection.go | 11 ++-- pkg/statistics/store_hot_peers_infos.go | 6 +- 
pkg/statistics/utils/topn.go | 46 +++++++------- pkg/storage/kv/etcd_kv.go | 4 ++ pkg/storage/kv/mem_kv.go | 5 ++ pkg/syncer/client.go | 12 ++-- pkg/syncer/history_buffer.go | 8 +-- pkg/syncer/history_buffer_test.go | 12 ++-- pkg/syncer/server.go | 14 ++--- pkg/window/counter.go | 10 +++ plugin/scheduler_example/evict_leader.go | 14 +++++ 29 files changed, 253 insertions(+), 108 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index d5b2e4e7f5a..bc1ba393f39 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -214,3 +214,6 @@ issues: - path: (pd-analysis|pd-api-bench|pd-backup|pd-ctl|pd-heartbeat-bench|pd-recover|pd-simulator|pd-tso-bench|pd-ut|regions-dump|stores-dump) linters: - errcheck + include: + # remove the comment after the path is ready + # - EXC0012 diff --git a/pkg/core/metrics.go b/pkg/core/metrics.go index 7d2c904f319..65cc8be861e 100644 --- a/pkg/core/metrics.go +++ b/pkg/core/metrics.go @@ -108,19 +108,33 @@ type saveCacheStats struct { // RegionHeartbeatProcessTracer is used to trace the process of handling region heartbeat. type RegionHeartbeatProcessTracer interface { + // Begin starts the tracing. Begin() + // OnPreCheckFinished will be called when the pre-check is finished. OnPreCheckFinished() + // OnAsyncHotStatsFinished will be called when the async hot stats is finished. OnAsyncHotStatsFinished() + // OnRegionGuideFinished will be called when the region guide is finished. OnRegionGuideFinished() + // OnSaveCacheBegin will be called when the save cache begins. OnSaveCacheBegin() + // OnSaveCacheFinished will be called when the save cache is finished. OnSaveCacheFinished() + // OnCheckOverlapsFinished will be called when the check overlaps is finished. OnCheckOverlapsFinished() + // OnValidateRegionFinished will be called when the validate region is finished. OnValidateRegionFinished() + // OnSetRegionFinished will be called when the set region is finished. OnSetRegionFinished() + // OnUpdateSubTreeFinished will be called when the update sub tree is finished. OnUpdateSubTreeFinished() + // OnCollectRegionStatsFinished will be called when the collect region stats is finished. OnCollectRegionStatsFinished() + // OnAllStageFinished will be called when all stages are finished. OnAllStageFinished() + // LogFields returns the log fields. LogFields() []zap.Field + // Release releases the tracer. Release() } @@ -131,21 +145,48 @@ func NewNoopHeartbeatProcessTracer() RegionHeartbeatProcessTracer { return &noopHeartbeatProcessTracer{} } -func (*noopHeartbeatProcessTracer) Begin() {} -func (*noopHeartbeatProcessTracer) OnPreCheckFinished() {} -func (*noopHeartbeatProcessTracer) OnAsyncHotStatsFinished() {} -func (*noopHeartbeatProcessTracer) OnRegionGuideFinished() {} -func (*noopHeartbeatProcessTracer) OnSaveCacheBegin() {} -func (*noopHeartbeatProcessTracer) OnSaveCacheFinished() {} -func (*noopHeartbeatProcessTracer) OnCheckOverlapsFinished() {} -func (*noopHeartbeatProcessTracer) OnValidateRegionFinished() {} -func (*noopHeartbeatProcessTracer) OnSetRegionFinished() {} -func (*noopHeartbeatProcessTracer) OnUpdateSubTreeFinished() {} +// Begin implements the RegionHeartbeatProcessTracer interface. +func (*noopHeartbeatProcessTracer) Begin() {} + +// OnPreCheckFinished implements the RegionHeartbeatProcessTracer interface. +func (*noopHeartbeatProcessTracer) OnPreCheckFinished() {} + +// OnAsyncHotStatsFinished implements the RegionHeartbeatProcessTracer interface. 
+func (*noopHeartbeatProcessTracer) OnAsyncHotStatsFinished() {} + +// OnRegionGuideFinished implements the RegionHeartbeatProcessTracer interface. +func (*noopHeartbeatProcessTracer) OnRegionGuideFinished() {} + +// OnSaveCacheBegin implements the RegionHeartbeatProcessTracer interface. +func (*noopHeartbeatProcessTracer) OnSaveCacheBegin() {} + +// OnSaveCacheFinished implements the RegionHeartbeatProcessTracer interface. +func (*noopHeartbeatProcessTracer) OnSaveCacheFinished() {} + +// OnCheckOverlapsFinished implements the RegionHeartbeatProcessTracer interface. +func (*noopHeartbeatProcessTracer) OnCheckOverlapsFinished() {} + +// OnValidateRegionFinished implements the RegionHeartbeatProcessTracer interface. +func (*noopHeartbeatProcessTracer) OnValidateRegionFinished() {} + +// OnSetRegionFinished implements the RegionHeartbeatProcessTracer interface. +func (*noopHeartbeatProcessTracer) OnSetRegionFinished() {} + +// OnUpdateSubTreeFinished implements the RegionHeartbeatProcessTracer interface. +func (*noopHeartbeatProcessTracer) OnUpdateSubTreeFinished() {} + +// OnCollectRegionStatsFinished implements the RegionHeartbeatProcessTracer interface. func (*noopHeartbeatProcessTracer) OnCollectRegionStatsFinished() {} -func (*noopHeartbeatProcessTracer) OnAllStageFinished() {} + +// OnAllStageFinished implements the RegionHeartbeatProcessTracer interface. +func (*noopHeartbeatProcessTracer) OnAllStageFinished() {} + +// LogFields implements the RegionHeartbeatProcessTracer interface. func (*noopHeartbeatProcessTracer) LogFields() []zap.Field { return nil } + +// Release implements the RegionHeartbeatProcessTracer interface. func (*noopHeartbeatProcessTracer) Release() {} type regionHeartbeatProcessTracer struct { @@ -163,12 +204,14 @@ func NewHeartbeatProcessTracer() RegionHeartbeatProcessTracer { return tracerPool.Get().(*regionHeartbeatProcessTracer) } +// Begin implements the RegionHeartbeatProcessTracer interface. func (h *regionHeartbeatProcessTracer) Begin() { now := time.Now() h.startTime = now h.lastCheckTime = now } +// OnPreCheckFinished implements the RegionHeartbeatProcessTracer interface. func (h *regionHeartbeatProcessTracer) OnPreCheckFinished() { now := time.Now() h.preCheckDuration = now.Sub(h.lastCheckTime) @@ -177,6 +220,7 @@ func (h *regionHeartbeatProcessTracer) OnPreCheckFinished() { preCheckCount.Inc() } +// OnAsyncHotStatsFinished implements the RegionHeartbeatProcessTracer interface. func (h *regionHeartbeatProcessTracer) OnAsyncHotStatsFinished() { now := time.Now() h.asyncHotStatsDuration = now.Sub(h.lastCheckTime) @@ -185,6 +229,7 @@ func (h *regionHeartbeatProcessTracer) OnAsyncHotStatsFinished() { asyncHotStatsCount.Inc() } +// OnRegionGuideFinished implements the RegionHeartbeatProcessTracer interface. func (h *regionHeartbeatProcessTracer) OnRegionGuideFinished() { now := time.Now() h.regionGuideDuration = now.Sub(h.lastCheckTime) @@ -193,6 +238,7 @@ func (h *regionHeartbeatProcessTracer) OnRegionGuideFinished() { regionGuideCount.Inc() } +// OnSaveCacheBegin implements the RegionHeartbeatProcessTracer interface. func (h *regionHeartbeatProcessTracer) OnSaveCacheBegin() { now := time.Now() h.saveCacheStats.startTime = now @@ -200,11 +246,13 @@ func (h *regionHeartbeatProcessTracer) OnSaveCacheBegin() { h.lastCheckTime = now } +// OnSaveCacheFinished implements the RegionHeartbeatProcessTracer interface. 
func (h *regionHeartbeatProcessTracer) OnSaveCacheFinished() { // update the outer checkpoint time h.lastCheckTime = time.Now() } +// OnCollectRegionStatsFinished implements the RegionHeartbeatProcessTracer interface. func (h *regionHeartbeatProcessTracer) OnCollectRegionStatsFinished() { now := time.Now() regionCollectDurationSum.Add(now.Sub(h.lastCheckTime).Seconds()) @@ -212,6 +260,7 @@ func (h *regionHeartbeatProcessTracer) OnCollectRegionStatsFinished() { h.lastCheckTime = now } +// OnCheckOverlapsFinished implements the RegionHeartbeatProcessTracer interface. func (h *regionHeartbeatProcessTracer) OnCheckOverlapsFinished() { now := time.Now() h.saveCacheStats.checkOverlapsDuration = now.Sub(h.lastCheckTime) @@ -220,6 +269,7 @@ func (h *regionHeartbeatProcessTracer) OnCheckOverlapsFinished() { checkOverlapsCount.Inc() } +// OnValidateRegionFinished implements the RegionHeartbeatProcessTracer interface. func (h *regionHeartbeatProcessTracer) OnValidateRegionFinished() { now := time.Now() h.saveCacheStats.validateRegionDuration = now.Sub(h.saveCacheStats.lastCheckTime) @@ -228,6 +278,7 @@ func (h *regionHeartbeatProcessTracer) OnValidateRegionFinished() { validateRegionCount.Inc() } +// OnSetRegionFinished implements the RegionHeartbeatProcessTracer interface. func (h *regionHeartbeatProcessTracer) OnSetRegionFinished() { now := time.Now() h.saveCacheStats.setRegionDuration = now.Sub(h.saveCacheStats.lastCheckTime) @@ -236,6 +287,7 @@ func (h *regionHeartbeatProcessTracer) OnSetRegionFinished() { setRegionCount.Inc() } +// OnUpdateSubTreeFinished implements the RegionHeartbeatProcessTracer interface. func (h *regionHeartbeatProcessTracer) OnUpdateSubTreeFinished() { now := time.Now() h.saveCacheStats.updateSubTreeDuration = now.Sub(h.saveCacheStats.lastCheckTime) @@ -244,6 +296,7 @@ func (h *regionHeartbeatProcessTracer) OnUpdateSubTreeFinished() { updateSubTreeCount.Inc() } +// OnAllStageFinished implements the RegionHeartbeatProcessTracer interface. func (h *regionHeartbeatProcessTracer) OnAllStageFinished() { now := time.Now() h.OtherDuration = now.Sub(h.lastCheckTime) @@ -251,6 +304,7 @@ func (h *regionHeartbeatProcessTracer) OnAllStageFinished() { otherCount.Inc() } +// LogFields implements the RegionHeartbeatProcessTracer interface. func (h *regionHeartbeatProcessTracer) LogFields() []zap.Field { return []zap.Field{ zap.Duration("pre-check-duration", h.preCheckDuration), @@ -264,6 +318,7 @@ func (h *regionHeartbeatProcessTracer) LogFields() []zap.Field { } } +// Release implements the RegionHeartbeatProcessTracer interface. // Release puts the tracer back into the pool. func (h *regionHeartbeatProcessTracer) Release() { // Reset the fields of h to their zero values. diff --git a/pkg/core/region_tree.go b/pkg/core/region_tree.go index 9a148eeed18..0be207d515d 100644 --- a/pkg/core/region_tree.go +++ b/pkg/core/region_tree.go @@ -437,6 +437,7 @@ func (t *regionTree) RandomRegions(n int, ranges []KeyRange) []*RegionInfo { return regions } +// TotalSize returns the total size of all regions. func (t *regionTree) TotalSize() int64 { if t.length() == 0 { return 0 @@ -444,6 +445,8 @@ func (t *regionTree) TotalSize() int64 { return t.totalSize } +// TotalWriteRate returns the total write bytes rate and the total write keys +// rate of all regions. 
func (t *regionTree) TotalWriteRate() (bytesRate, keysRate float64) { if t.length() == 0 { return 0, 0 diff --git a/pkg/core/storelimit/store_limit.go b/pkg/core/storelimit/store_limit.go index e35ec773d80..fb7cad442bb 100644 --- a/pkg/core/storelimit/store_limit.go +++ b/pkg/core/storelimit/store_limit.go @@ -178,6 +178,7 @@ func (l *limit) Take(count int64) bool { return l.limiter.AllowN(int(count)) } +// GetRatePerSec returns the rate per second. func (l *limit) GetRatePerSec() float64 { l.ratePerSecMutex.RLock() defer l.ratePerSecMutex.RUnlock() diff --git a/pkg/id/id.go b/pkg/id/id.go index d0889717242..ea4a2a54341 100644 --- a/pkg/id/id.go +++ b/pkg/id/id.go @@ -105,6 +105,7 @@ func (alloc *allocatorImpl) Alloc() (uint64, error) { return alloc.base, nil } +// SetBase sets the base. func (alloc *allocatorImpl) SetBase(newBase uint64) error { alloc.mu.Lock() defer alloc.mu.Unlock() diff --git a/pkg/ratelimit/metrics.go b/pkg/ratelimit/metrics.go index c5510e66b26..0096a76de4c 100644 --- a/pkg/ratelimit/metrics.go +++ b/pkg/ratelimit/metrics.go @@ -24,35 +24,35 @@ const ( ) var ( - RunnerTaskMaxWaitingDuration = prometheus.NewGaugeVec( + runnerTaskMaxWaitingDuration = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "pd", Subsystem: "ratelimit", Name: "runner_task_max_waiting_duration_seconds", Help: "The duration of tasks waiting in the runner.", }, []string{nameStr}) - RunnerPendingTasks = prometheus.NewGaugeVec( + runnerPendingTasks = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "pd", Subsystem: "ratelimit", Name: "runner_pending_tasks", Help: "The number of pending tasks in the runner.", }, []string{nameStr, taskStr}) - RunnerFailedTasks = prometheus.NewCounterVec( + runnerFailedTasks = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "pd", Subsystem: "ratelimit", Name: "runner_failed_tasks_total", Help: "The number of failed tasks in the runner.", }, []string{nameStr, taskStr}) - RunnerSucceededTasks = prometheus.NewCounterVec( + runnerSucceededTasks = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "pd", Subsystem: "ratelimit", Name: "runner_success_tasks_total", Help: "The number of tasks in the runner.", }, []string{nameStr, taskStr}) - RunnerTaskExecutionDuration = prometheus.NewHistogramVec( + runnerTaskExecutionDuration = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "pd", Subsystem: "ratelimit", @@ -63,9 +63,9 @@ var ( ) func init() { - prometheus.MustRegister(RunnerTaskMaxWaitingDuration) - prometheus.MustRegister(RunnerPendingTasks) - prometheus.MustRegister(RunnerFailedTasks) - prometheus.MustRegister(RunnerTaskExecutionDuration) - prometheus.MustRegister(RunnerSucceededTasks) + prometheus.MustRegister(runnerTaskMaxWaitingDuration) + prometheus.MustRegister(runnerPendingTasks) + prometheus.MustRegister(runnerFailedTasks) + prometheus.MustRegister(runnerTaskExecutionDuration) + prometheus.MustRegister(runnerSucceededTasks) } diff --git a/pkg/ratelimit/runner.go b/pkg/ratelimit/runner.go index a230177ac73..4b1b51f1768 100644 --- a/pkg/ratelimit/runner.go +++ b/pkg/ratelimit/runner.go @@ -90,7 +90,7 @@ func NewConcurrentRunner(name string, limiter *ConcurrencyLimiter, maxPendingDur pendingTasks: make([]*Task, 0, initialCapacity), pendingTaskCount: make(map[string]int), existTasks: make(map[taskID]*Task), - maxWaitingDuration: RunnerTaskMaxWaitingDuration.WithLabelValues(name), + maxWaitingDuration: runnerTaskMaxWaitingDuration.WithLabelValues(name), } return s } @@ -136,7 +136,7 @@ func (cr *ConcurrentRunner) 
Start(ctx context.Context) { maxDuration = time.Since(cr.pendingTasks[0].submittedAt) } for taskName, cnt := range cr.pendingTaskCount { - RunnerPendingTasks.WithLabelValues(cr.name, taskName).Set(float64(cnt)) + runnerPendingTasks.WithLabelValues(cr.name, taskName).Set(float64(cnt)) } cr.pendingMu.Unlock() cr.maxWaitingDuration.Set(maxDuration.Seconds()) @@ -157,8 +157,8 @@ func (cr *ConcurrentRunner) run(ctx context.Context, task *Task, token *TaskToke cr.limiter.ReleaseToken(token) cr.processPendingTasks() } - RunnerTaskExecutionDuration.WithLabelValues(cr.name, task.name).Observe(time.Since(start).Seconds()) - RunnerSucceededTasks.WithLabelValues(cr.name, task.name).Inc() + runnerTaskExecutionDuration.WithLabelValues(cr.name, task.name).Observe(time.Since(start).Seconds()) + runnerSucceededTasks.WithLabelValues(cr.name, task.name).Inc() } func (cr *ConcurrentRunner) processPendingTasks() { @@ -214,12 +214,12 @@ func (cr *ConcurrentRunner) RunTask(id uint64, name string, f func(context.Conte if !task.retained { maxWait := time.Since(cr.pendingTasks[0].submittedAt) if maxWait > cr.maxPendingDuration { - RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc() + runnerFailedTasks.WithLabelValues(cr.name, task.name).Inc() return ErrMaxWaitingTasksExceeded } } if pendingTaskNum > maxPendingTaskNum { - RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc() + runnerFailedTasks.WithLabelValues(cr.name, task.name).Inc() return ErrMaxWaitingTasksExceeded } } diff --git a/pkg/schedule/filter/counter.go b/pkg/schedule/filter/counter.go index 9742d2d0c9d..41211c1acce 100644 --- a/pkg/schedule/filter/counter.go +++ b/pkg/schedule/filter/counter.go @@ -128,6 +128,7 @@ func NewCounter(scope string) *Counter { return &Counter{counter: counter, scope: scope} } +// SetScope sets the scope for the counter. func (c *Counter) SetScope(scope string) { c.scope = scope } diff --git a/pkg/schedule/filter/filters.go b/pkg/schedule/filter/filters.go index 1838f0104f4..6c5dd748d17 100644 --- a/pkg/schedule/filter/filters.go +++ b/pkg/schedule/filter/filters.go @@ -181,14 +181,17 @@ func NewExcludedFilter(scope string, sources, targets map[uint64]struct{}) Filte } } +// Scope returns the scheduler or the checker which the filter acts on. func (f *excludedFilter) Scope() string { return f.scope } +// Type returns the type of the filter. func (*excludedFilter) Type() filterType { return excluded } +// Source filters stores when select them as schedule source. func (f *excludedFilter) Source(_ config.SharedConfigProvider, store *core.StoreInfo) *plan.Status { if _, ok := f.sources[store.GetID()]; ok { return statusStoreAlreadyHasPeer @@ -196,6 +199,7 @@ func (f *excludedFilter) Source(_ config.SharedConfigProvider, store *core.Store return statusOK } +// Target filters stores when select them as schedule target. func (f *excludedFilter) Target(_ config.SharedConfigProvider, store *core.StoreInfo) *plan.Status { if _, ok := f.targets[store.GetID()]; ok { return statusStoreAlreadyHasPeer @@ -211,18 +215,22 @@ func NewStorageThresholdFilter(scope string) Filter { return &storageThresholdFilter{scope: scope} } +// Scope returns the scheduler or the checker which the filter acts on. func (f *storageThresholdFilter) Scope() string { return f.scope } +// Type returns the name of the filter. func (*storageThresholdFilter) Type() filterType { return storageThreshold } +// Source filters stores when select them as schedule source. 
func (*storageThresholdFilter) Source(config.SharedConfigProvider, *core.StoreInfo) *plan.Status { return statusOK } +// Target filters stores when select them as schedule target. func (*storageThresholdFilter) Target(conf config.SharedConfigProvider, store *core.StoreInfo) *plan.Status { if !store.IsLowSpace(conf.GetLowSpaceRatio()) { return statusOK @@ -279,18 +287,22 @@ func newDistinctScoreFilter(scope string, labels []string, stores []*core.StoreI } } +// Scope returns the scheduler or the checker which the filter acts on. func (f *distinctScoreFilter) Scope() string { return f.scope } +// Type returns the type of the filter. func (*distinctScoreFilter) Type() filterType { return distinctScore } +// Source filters stores when select them as schedule source. func (*distinctScoreFilter) Source(config.SharedConfigProvider, *core.StoreInfo) *plan.Status { return statusOK } +// Target filters stores when select them as schedule target. func (f *distinctScoreFilter) Target(_ config.SharedConfigProvider, store *core.StoreInfo) *plan.Status { score := core.DistinctScore(f.labels, f.stores, store) switch f.policy { @@ -630,14 +642,17 @@ func newRuleFitFilter(scope string, cluster *core.BasicCluster, ruleManager *pla } } +// Scope returns the scheduler or the checker which the filter acts on. func (f *ruleFitFilter) Scope() string { return f.scope } +// Type returns the name of the filter. func (*ruleFitFilter) Type() filterType { return ruleFit } +// Source filters stores when select them as schedule source. func (*ruleFitFilter) Source(config.SharedConfigProvider, *core.StoreInfo) *plan.Status { return statusOK } @@ -683,18 +698,22 @@ func newRuleLeaderFitFilter(scope string, cluster *core.BasicCluster, ruleManage } } +// Scope returns the scheduler or the checker which the filter acts on. func (f *ruleLeaderFitFilter) Scope() string { return f.scope } +// Type returns the name of the filter. func (*ruleLeaderFitFilter) Type() filterType { return ruleLeader } +// Source filters stores when select them as schedule source. func (*ruleLeaderFitFilter) Source(config.SharedConfigProvider, *core.StoreInfo) *plan.Status { return statusOK } +// Target filters stores when select them as schedule target. func (f *ruleLeaderFitFilter) Target(_ config.SharedConfigProvider, store *core.StoreInfo) *plan.Status { targetStoreID := store.GetID() targetPeer := f.region.GetStorePeer(targetStoreID) @@ -739,18 +758,22 @@ func newRuleWitnessFitFilter(scope string, cluster *core.BasicCluster, ruleManag } } +// Scope returns the scheduler or the checker which the filter acts on. func (f *ruleWitnessFitFilter) Scope() string { return f.scope } +// Type returns the name of the filter. func (*ruleWitnessFitFilter) Type() filterType { return ruleFit } +// Source filters stores when select them as schedule source. func (*ruleWitnessFitFilter) Source(config.SharedConfigProvider, *core.StoreInfo) *plan.Status { return statusOK } +// Target filters stores when select them as schedule target. func (f *ruleWitnessFitFilter) Target(_ config.SharedConfigProvider, store *core.StoreInfo) *plan.Status { targetStoreID := store.GetID() targetPeer := f.region.GetStorePeer(targetStoreID) @@ -811,14 +834,17 @@ func NewEngineFilter(scope string, constraint placement.LabelConstraint) Filter } } +// Scope returns the scheduler or the checker which the filter acts on. func (f *engineFilter) Scope() string { return f.scope } +// Type returns the name of the filter. 
func (*engineFilter) Type() filterType { return engine } +// Source filters stores when select them as schedule source. func (f *engineFilter) Source(_ config.SharedConfigProvider, store *core.StoreInfo) *plan.Status { if f.constraint.MatchStore(store) { return statusOK @@ -826,6 +852,7 @@ func (f *engineFilter) Source(_ config.SharedConfigProvider, store *core.StoreIn return statusStoreNotMatchRule } +// Target filters stores when select them as schedule target. func (f *engineFilter) Target(_ config.SharedConfigProvider, store *core.StoreInfo) *plan.Status { if f.constraint.MatchStore(store) { return statusOK @@ -854,14 +881,17 @@ func NewSpecialUseFilter(scope string, allowUses ...string) Filter { } } +// Scope returns the scheduler or the checker which the filter acts on. func (f *specialUseFilter) Scope() string { return f.scope } +// Type returns the name of the filter. func (*specialUseFilter) Type() filterType { return specialUse } +// Source filters stores when select them as schedule source. func (f *specialUseFilter) Source(conf config.SharedConfigProvider, store *core.StoreInfo) *plan.Status { if store.IsLowSpace(conf.GetLowSpaceRatio()) || !f.constraint.MatchStore(store) { return statusOK @@ -869,6 +899,7 @@ func (f *specialUseFilter) Source(conf config.SharedConfigProvider, store *core. return statusStoreNotMatchRule } +// Target filters stores when select them as schedule target. func (f *specialUseFilter) Target(_ config.SharedConfigProvider, store *core.StoreInfo) *plan.Status { if !f.constraint.MatchStore(store) { return statusOK @@ -928,18 +959,22 @@ func NewIsolationFilter(scope, isolationLevel string, locationLabels []string, r return isolationFilter } +// Scope returns the scheduler or the checker which the filter acts on. func (f *isolationFilter) Scope() string { return f.scope } +// Type returns the name of the filter. func (*isolationFilter) Type() filterType { return isolation } +// Source filters stores when select them as schedule source. func (*isolationFilter) Source(config.SharedConfigProvider, *core.StoreInfo) *plan.Status { return statusOK } +// Target filters stores when select them as schedule target. func (f *isolationFilter) Target(_ config.SharedConfigProvider, store *core.StoreInfo) *plan.Status { // No isolation constraint to fit if len(f.constraintSet) == 0 { diff --git a/pkg/schedule/filter/region_filters.go b/pkg/schedule/filter/region_filters.go index 7cd015412c2..e233ec75973 100644 --- a/pkg/schedule/filter/region_filters.go +++ b/pkg/schedule/filter/region_filters.go @@ -76,6 +76,7 @@ func NewRegionPendingFilter() RegionFilter { return ®ionPendingFilter{} } +// Select implements the RegionFilter interface. func (*regionPendingFilter) Select(region *core.RegionInfo) *plan.Status { if hasPendingPeers(region) { return statusRegionPendingPeer @@ -91,6 +92,7 @@ func NewRegionDownFilter() RegionFilter { return ®ionDownFilter{} } +// Select implements the RegionFilter interface. func (*regionDownFilter) Select(region *core.RegionInfo) *plan.Status { if hasDownPeers(region) { return statusRegionDownPeer diff --git a/pkg/schedule/operator/builder.go b/pkg/schedule/operator/builder.go index 1852f292db0..e28e7de973a 100644 --- a/pkg/schedule/operator/builder.go +++ b/pkg/schedule/operator/builder.go @@ -959,6 +959,7 @@ func (p stepPlan) String() string { p.leaderBeforeAdd, p.add, p.promote, p.leaderBeforeRemove, p.demote, p.remove, p.nonWitness, p.promoteNonWitness, p.witness) } +// IsEmpty checks if the plan is empty. 
func (p stepPlan) IsEmpty() bool { return p.promote == nil && p.demote == nil && p.add == nil && p.remove == nil && p.nonWitness == nil && p.promoteNonWitness == nil && p.witness == nil } diff --git a/pkg/schedule/operator/operator_queue.go b/pkg/schedule/operator/operator_queue.go index 2233845724e..8643717d5ad 100644 --- a/pkg/schedule/operator/operator_queue.go +++ b/pkg/schedule/operator/operator_queue.go @@ -27,21 +27,26 @@ type operatorWithTime struct { type operatorQueue []*operatorWithTime +// Len implements heap.Interface. func (opn operatorQueue) Len() int { return len(opn) } +// Less implements heap.Interface. func (opn operatorQueue) Less(i, j int) bool { return opn[i].time.Before(opn[j].time) } +// Swap implements heap.Interface. func (opn operatorQueue) Swap(i, j int) { opn[i], opn[j] = opn[j], opn[i] } +// Push implements heap.Interface. func (opn *operatorQueue) Push(x any) { item := x.(*operatorWithTime) *opn = append(*opn, item) } +// Pop implements heap.Interface. func (opn *operatorQueue) Pop() any { old := *opn n := len(old) diff --git a/pkg/schedule/schedulers/split_bucket.go b/pkg/schedule/schedulers/split_bucket.go index 4516dfe4433..7df3ee8f552 100644 --- a/pkg/schedule/schedulers/split_bucket.go +++ b/pkg/schedule/schedulers/split_bucket.go @@ -98,12 +98,12 @@ type splitBucketHandler struct { rd *render.Render } -func (h *splitBucketHandler) ListConfig(w http.ResponseWriter, _ *http.Request) { +func (h *splitBucketHandler) listConfig(w http.ResponseWriter, _ *http.Request) { conf := h.conf.Clone() h.rd.JSON(w, http.StatusOK, conf) } -func (h *splitBucketHandler) UpdateConfig(w http.ResponseWriter, r *http.Request) { +func (h *splitBucketHandler) updateConfig(w http.ResponseWriter, r *http.Request) { h.conf.Lock() defer h.conf.Unlock() rd := render.New(render.Options{IndentJSON: true}) @@ -148,8 +148,8 @@ func newSplitBucketHandler(conf *splitBucketSchedulerConfig) http.Handler { rd: render.New(render.Options{IndentJSON: true}), } router := mux.NewRouter() - router.HandleFunc("/list", h.ListConfig).Methods(http.MethodGet) - router.HandleFunc("/config", h.UpdateConfig).Methods(http.MethodPost) + router.HandleFunc("/list", h.listConfig).Methods(http.MethodGet) + router.HandleFunc("/config", h.updateConfig).Methods(http.MethodPost) return router } diff --git a/pkg/schedule/splitter/region_splitter.go b/pkg/schedule/splitter/region_splitter.go index aeab4b70cf0..124ad935655 100644 --- a/pkg/schedule/splitter/region_splitter.go +++ b/pkg/schedule/splitter/region_splitter.go @@ -187,6 +187,7 @@ type splitRegionsHandler struct { oc *operator.Controller } +// SplitRegionByKeys split region by keys. func (h *splitRegionsHandler) SplitRegionByKeys(region *core.RegionInfo, splitKeys [][]byte) error { op, err := operator.CreateSplitRegionOperator("region-splitter", region, 0, pdpb.CheckPolicy_USEKEY, splitKeys) if err != nil { @@ -200,6 +201,7 @@ func (h *splitRegionsHandler) SplitRegionByKeys(region *core.RegionInfo, splitKe return nil } +// ScanRegionsByKeyRange scans regions by key range. func (h *splitRegionsHandler) ScanRegionsByKeyRange(groupKeys *regionGroupKeys, results *splitKeyResults) { splitKeys := groupKeys.keys startKey, endKey := groupKeys.region.GetStartKey(), groupKeys.region.GetEndKey() diff --git a/pkg/schedule/type/type.go b/pkg/schedule/type/type.go index 26e1b6a737a..16910c631fd 100644 --- a/pkg/schedule/type/type.go +++ b/pkg/schedule/type/type.go @@ -14,10 +14,12 @@ package types +// CheckerSchedulerType is the type of checker/scheduler. 
type CheckerSchedulerType string -func (n CheckerSchedulerType) String() string { - return string(n) +// String implements fmt.Stringer. +func (t CheckerSchedulerType) String() string { + return string(t) } const ( @@ -93,6 +95,7 @@ var SchedulerTypeCompatibleMap = map[CheckerSchedulerType]string{ LabelScheduler: "label", } +// SchedulerStr2Type is a map to convert the scheduler string to the CheckerSchedulerType. var SchedulerStr2Type = map[string]CheckerSchedulerType{ "balance-leader-scheduler": BalanceLeaderScheduler, "balance-region-scheduler": BalanceRegionScheduler, diff --git a/pkg/statistics/collector.go b/pkg/statistics/collector.go index 88986b93d4b..4e3e2fa2c7a 100644 --- a/pkg/statistics/collector.go +++ b/pkg/statistics/collector.go @@ -22,12 +22,12 @@ import ( // storeCollector define the behavior of different engines of stores. type storeCollector interface { - // Engine returns the type of Store. - Engine() string - // Filter determines whether the Store needs to be handled by itself. - Filter(info *StoreSummaryInfo, kind constant.ResourceKind) bool - // GetLoads obtains available loads from storeLoads and peerLoadSum according to rwTy and kind. - GetLoads(storeLoads, peerLoadSum []float64, rwTy utils.RWType, kind constant.ResourceKind) (loads []float64) + // engine returns the type of Store. + engine() string + // filter determines whether the Store needs to be handled by itself. + filter(info *StoreSummaryInfo, kind constant.ResourceKind) bool + // getLoads obtains available loads from storeLoads and peerLoadSum according to rwTy and kind. + getLoads(storeLoads, peerLoadSum []float64, rwTy utils.RWType, kind constant.ResourceKind) (loads []float64) } type tikvCollector struct{} @@ -36,11 +36,11 @@ func newTikvCollector() storeCollector { return tikvCollector{} } -func (tikvCollector) Engine() string { +func (tikvCollector) engine() string { return core.EngineTiKV } -func (tikvCollector) Filter(info *StoreSummaryInfo, kind constant.ResourceKind) bool { +func (tikvCollector) filter(info *StoreSummaryInfo, kind constant.ResourceKind) bool { if info.IsTiFlash() { return false } @@ -53,7 +53,7 @@ func (tikvCollector) Filter(info *StoreSummaryInfo, kind constant.ResourceKind) return false } -func (tikvCollector) GetLoads(storeLoads, peerLoadSum []float64, rwTy utils.RWType, kind constant.ResourceKind) (loads []float64) { +func (tikvCollector) getLoads(storeLoads, peerLoadSum []float64, rwTy utils.RWType, kind constant.ResourceKind) (loads []float64) { loads = make([]float64, utils.DimLen) switch rwTy { case utils.Read: @@ -87,11 +87,11 @@ func newTiFlashCollector(isTraceRegionFlow bool) storeCollector { return tiflashCollector{isTraceRegionFlow: isTraceRegionFlow} } -func (tiflashCollector) Engine() string { +func (tiflashCollector) engine() string { return core.EngineTiFlash } -func (tiflashCollector) Filter(info *StoreSummaryInfo, kind constant.ResourceKind) bool { +func (tiflashCollector) filter(info *StoreSummaryInfo, kind constant.ResourceKind) bool { switch kind { case constant.LeaderKind: return false @@ -101,7 +101,7 @@ func (tiflashCollector) Filter(info *StoreSummaryInfo, kind constant.ResourceKin return false } -func (c tiflashCollector) GetLoads(storeLoads, peerLoadSum []float64, rwTy utils.RWType, kind constant.ResourceKind) (loads []float64) { +func (c tiflashCollector) getLoads(storeLoads, peerLoadSum []float64, rwTy utils.RWType, kind constant.ResourceKind) (loads []float64) { loads = make([]float64, utils.DimLen) switch rwTy { case utils.Read: diff --git 
a/pkg/statistics/hot_peer.go b/pkg/statistics/hot_peer.go index 79757d6e27f..8f92fbff542 100644 --- a/pkg/statistics/hot_peer.go +++ b/pkg/statistics/hot_peer.go @@ -41,7 +41,7 @@ func newDimStat(reportInterval time.Duration) *dimStat { } } -func (d *dimStat) Add(delta float64, interval time.Duration) { +func (d *dimStat) add(delta float64, interval time.Duration) { d.Lock() defer d.Unlock() d.lastIntervalSum += int(interval.Seconds()) @@ -74,13 +74,13 @@ func (d *dimStat) clearLastAverage() { d.lastDelta = 0 } -func (d *dimStat) Get() float64 { +func (d *dimStat) get() float64 { d.RLock() defer d.RUnlock() return d.rolling.Get() } -func (d *dimStat) Clone() *dimStat { +func (d *dimStat) clone() *dimStat { d.RLock() defer d.RUnlock() return &dimStat{ @@ -162,7 +162,7 @@ func (stat *HotPeerStat) GetActionType() utils.ActionType { // GetLoad returns denoising load if possible. func (stat *HotPeerStat) GetLoad(dim int) float64 { if stat.rollingLoads != nil { - return math.Round(stat.rollingLoads[dim].Get()) + return math.Round(stat.rollingLoads[dim].get()) } return math.Round(stat.Loads[dim]) } @@ -172,7 +172,7 @@ func (stat *HotPeerStat) GetLoads() []float64 { if stat.rollingLoads != nil { ret := make([]float64, len(stat.rollingLoads)) for dim := range ret { - ret[dim] = math.Round(stat.rollingLoads[dim].Get()) + ret[dim] = math.Round(stat.rollingLoads[dim].get()) } return ret } diff --git a/pkg/statistics/hot_peer_cache.go b/pkg/statistics/hot_peer_cache.go index 4db0c304bb9..8d1f64ca540 100644 --- a/pkg/statistics/hot_peer_cache.go +++ b/pkg/statistics/hot_peer_cache.go @@ -102,6 +102,7 @@ func (f *HotPeerCache) RegionStats(minHotDegree int) map[uint64][]*HotPeerStat { return res } +// UpdateStat updates the stat cache. func (f *HotPeerCache) UpdateStat(item *HotPeerStat) { switch item.actionType { case utils.Remove: @@ -439,7 +440,7 @@ func (f *HotPeerCache) updateHotPeerStat(region *core.RegionInfo, newItem, oldIt if source == utils.Inherit { for _, dim := range oldItem.rollingLoads { - newItem.rollingLoads = append(newItem.rollingLoads, dim.Clone()) + newItem.rollingLoads = append(newItem.rollingLoads, dim.clone()) } newItem.allowInherited = false } else { @@ -462,7 +463,7 @@ func (f *HotPeerCache) updateHotPeerStat(region *core.RegionInfo, newItem, oldIt } for i, k := range regionStats { - newItem.rollingLoads[i].Add(deltaLoads[k], interval) + newItem.rollingLoads[i].add(deltaLoads[k], interval) } isFull := newItem.rollingLoads[0].isFull(f.interval()) // The intervals of dims are the same, so it is only necessary to determine whether any of them @@ -505,7 +506,7 @@ func (f *HotPeerCache) updateNewHotPeerStat(newItem *HotPeerStat, deltaLoads []f newItem.rollingLoads = make([]*dimStat, len(regionStats)) for i, k := range regionStats { ds := newDimStat(f.interval()) - ds.Add(deltaLoads[k], interval) + ds.add(deltaLoads[k], interval) if ds.isFull(f.interval()) { ds.clearLastAverage() } diff --git a/pkg/statistics/store_collection.go b/pkg/statistics/store_collection.go index 6d5df0bda62..f55c23b27b7 100644 --- a/pkg/statistics/store_collection.go +++ b/pkg/statistics/store_collection.go @@ -61,7 +61,7 @@ func newStoreStatistics(opt config.ConfProvider) *storeStatistics { } } -func (s *storeStatistics) Observe(store *core.StoreInfo) { +func (s *storeStatistics) observe(store *core.StoreInfo) { for _, k := range s.opt.GetLocationLabels() { v := store.GetLabelValue(k) if v == "" { @@ -147,6 +147,7 @@ func (s *storeStatistics) Observe(store *core.StoreInfo) { } } +// ObserveHotStat records the 
hot region metrics for the store. func ObserveHotStat(store *core.StoreInfo, stats *StoresStats) { // Store flows. storeAddress := store.GetAddress() @@ -178,7 +179,7 @@ func ObserveHotStat(store *core.StoreInfo, stats *StoresStats) { storeStatusGauge.WithLabelValues(storeAddress, id, "store_regions_write_rate_keys_instant").Set(storeFlowStats.GetInstantLoad(utils.StoreRegionsWriteKeys)) } -func (s *storeStatistics) Collect() { +func (s *storeStatistics) collect() { placementStatusGauge.Reset() metrics := make(map[string]float64) @@ -307,12 +308,14 @@ func NewStoreStatisticsMap(opt config.ConfProvider) *storeStatisticsMap { } } +// Observe observes the store. func (m *storeStatisticsMap) Observe(store *core.StoreInfo) { - m.stats.Observe(store) + m.stats.observe(store) } +// Collect collects the metrics. func (m *storeStatisticsMap) Collect() { - m.stats.Collect() + m.stats.collect() } // Reset resets the metrics. diff --git a/pkg/statistics/store_hot_peers_infos.go b/pkg/statistics/store_hot_peers_infos.go index f7873bdd744..f64c7743d16 100644 --- a/pkg/statistics/store_hot_peers_infos.go +++ b/pkg/statistics/store_hot_peers_infos.go @@ -158,7 +158,7 @@ func summaryStoresLoadByEngine( store := info.StoreInfo id := store.GetID() storeLoads, ok := storesLoads[id] - if !ok || !collector.Filter(info, kind) { + if !ok || !collector.filter(info, kind) { continue } @@ -172,7 +172,7 @@ func summaryStoresLoadByEngine( } hotPeers = append(hotPeers, peer.Clone()) } - currentLoads := collector.GetLoads(storeLoads, peerLoadSum, rwTy, kind) + currentLoads := collector.getLoads(storeLoads, peerLoadSum, rwTy, kind) var historyLoads [][]float64 if storesHistoryLoads != nil { @@ -240,7 +240,7 @@ func summaryStoresLoadByEngine( { // Metric for debug. - engine := collector.Engine() + engine := collector.engine() ty := "exp-byte-rate-" + rwTy.String() + "-" + kind.String() hotPeerSummary.WithLabelValues(ty, engine).Set(expectLoads[utils.ByteDim]) ty = "exp-key-rate-" + rwTy.String() + "-" + kind.String() diff --git a/pkg/statistics/utils/topn.go b/pkg/statistics/utils/topn.go index 7ab6c6eaf3e..cb97251edd9 100644 --- a/pkg/statistics/utils/topn.go +++ b/pkg/statistics/utils/topn.go @@ -58,35 +58,35 @@ func NewTopN(k, n int, ttl time.Duration) *TopN { func (tn *TopN) Len() int { tn.rw.RLock() defer tn.rw.RUnlock() - return tn.ttlLst.Len() + return tn.ttlLst.len() } // GetTopNMin returns the min item in top N of the `k`th dimension. func (tn *TopN) GetTopNMin(k int) TopNItem { tn.rw.RLock() defer tn.rw.RUnlock() - return tn.topns[k].GetTopNMin() + return tn.topns[k].getTopNMin() } // GetAllTopN returns the top N items of the `k`th dimension. func (tn *TopN) GetAllTopN(k int) []TopNItem { tn.rw.RLock() defer tn.rw.RUnlock() - return tn.topns[k].GetAllTopN() + return tn.topns[k].getAllTopN() } // GetAll returns all items. func (tn *TopN) GetAll() []TopNItem { tn.rw.RLock() defer tn.rw.RUnlock() - return tn.topns[0].GetAll() + return tn.topns[0].getAll() } // Get returns the item with given id, nil if there is no such item. func (tn *TopN) Get(id uint64) TopNItem { tn.rw.RLock() defer tn.rw.RUnlock() - return tn.topns[0].Get(id) + return tn.topns[0].get(id) } // Put inserts item or updates the old item if it exists. 
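
The renames in this patch all follow one pattern: the exported wrapper (here TopN) keeps its documented public API, while the helpers it delegates to become unexported so the exported lint rule only applies where outside callers can actually reach. A reduced, hypothetical Go example of that shape (not the actual TopN code):

package main

import (
	"fmt"
	"sync"
)

// Counter is the exported facade; only its methods need doc comments.
type Counter struct {
	mu    sync.RWMutex
	inner counterCore
}

// Add increments the counter by delta.
func (c *Counter) Add(delta int64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.inner.add(delta)
}

// Value returns the current total.
func (c *Counter) Value() int64 {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.inner.value()
}

// counterCore is unexported, so its methods can stay lowercase and
// undocumented without tripping the exported lint rule.
type counterCore struct {
	total int64
}

func (c *counterCore) add(delta int64) { c.total += delta }

func (c *counterCore) value() int64 { return c.total }

func main() {
	var c Counter
	c.Add(3)
	fmt.Println(c.Value()) // 3
}
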
@@ -94,9 +94,9 @@ func (tn *TopN) Put(item TopNItem) (isUpdate bool) { tn.rw.Lock() defer tn.rw.Unlock() for _, stn := range tn.topns { - isUpdate = stn.Put(item) + isUpdate = stn.put(item) } - tn.ttlLst.Put(item.ID()) + tn.ttlLst.put(item.ID()) tn.maintain() return } @@ -113,17 +113,17 @@ func (tn *TopN) Remove(id uint64) (item TopNItem) { tn.rw.Lock() defer tn.rw.Unlock() for _, stn := range tn.topns { - item = stn.Remove(id) + item = stn.remove(id) } - _ = tn.ttlLst.Remove(id) + _ = tn.ttlLst.remove(id) tn.maintain() return } func (tn *TopN) maintain() { - for _, id := range tn.ttlLst.TakeExpired() { + for _, id := range tn.ttlLst.takeExpired() { for _, stn := range tn.topns { - stn.Remove(id) + stn.remove(id) } } } @@ -144,31 +144,27 @@ func newSingleTopN(k, n int) *singleTopN { } } -func (stn *singleTopN) Len() int { - return stn.topn.Len() + stn.rest.Len() -} - -func (stn *singleTopN) GetTopNMin() TopNItem { +func (stn *singleTopN) getTopNMin() TopNItem { return stn.topn.Top() } -func (stn *singleTopN) GetAllTopN() []TopNItem { +func (stn *singleTopN) getAllTopN() []TopNItem { return stn.topn.GetAll() } -func (stn *singleTopN) GetAll() []TopNItem { +func (stn *singleTopN) getAll() []TopNItem { topn := stn.topn.GetAll() return append(topn, stn.rest.GetAll()...) } -func (stn *singleTopN) Get(id uint64) TopNItem { +func (stn *singleTopN) get(id uint64) TopNItem { if item := stn.topn.Get(id); item != nil { return item } return stn.rest.Get(id) } -func (stn *singleTopN) Put(item TopNItem) (isUpdate bool) { +func (stn *singleTopN) put(item TopNItem) (isUpdate bool) { if stn.topn.Get(item.ID()) != nil { isUpdate = true stn.topn.Put(item) @@ -179,7 +175,7 @@ func (stn *singleTopN) Put(item TopNItem) (isUpdate bool) { return } -func (stn *singleTopN) Remove(id uint64) TopNItem { +func (stn *singleTopN) remove(id uint64) TopNItem { item := stn.topn.Remove(id) if item == nil { item = stn.rest.Remove(id) @@ -340,11 +336,11 @@ func newTTLList(ttl time.Duration) *ttlList { } } -func (tl *ttlList) Len() int { +func (tl *ttlList) len() int { return tl.lst.Len() } -func (tl *ttlList) TakeExpired() []uint64 { +func (tl *ttlList) takeExpired() []uint64 { expired := []uint64{} now := time.Now() for ele := tl.lst.Front(); ele != nil; ele = tl.lst.Front() { @@ -359,7 +355,7 @@ func (tl *ttlList) TakeExpired() []uint64 { return expired } -func (tl *ttlList) Put(id uint64) (isUpdate bool) { +func (tl *ttlList) put(id uint64) (isUpdate bool) { item := ttlItem{id: id} if ele, ok := tl.index[id]; ok { isUpdate = true @@ -370,7 +366,7 @@ func (tl *ttlList) Put(id uint64) (isUpdate bool) { return } -func (tl *ttlList) Remove(id uint64) (removed bool) { +func (tl *ttlList) remove(id uint64) (removed bool) { if ele, ok := tl.index[id]; ok { _ = tl.lst.Remove(ele) delete(tl.index, id) diff --git a/pkg/storage/kv/etcd_kv.go b/pkg/storage/kv/etcd_kv.go index 767aeff77a6..e2eb8c979eb 100644 --- a/pkg/storage/kv/etcd_kv.go +++ b/pkg/storage/kv/etcd_kv.go @@ -55,6 +55,7 @@ func NewEtcdKVBase(client *clientv3.Client, rootPath string) *etcdKVBase { } } +// NewEtcdKV creates a new etcd kv. func (kv *etcdKVBase) Load(key string) (string, error) { key = path.Join(kv.rootPath, key) @@ -70,6 +71,7 @@ func (kv *etcdKVBase) Load(key string) (string, error) { return string(resp.Kvs[0].Value), nil } +// LoadRange loads a range of keys [key, endKey) from etcd. 
func (kv *etcdKVBase) LoadRange(key, endKey string, limit int) ([]string, []string, error) { // Note: reason to use `strings.Join` instead of `path.Join` is that the latter will // removes suffix '/' of the joined string. @@ -99,6 +101,7 @@ func (kv *etcdKVBase) LoadRange(key, endKey string, limit int) ([]string, []stri return keys, values, nil } +// Save puts a key-value pair to etcd. func (kv *etcdKVBase) Save(key, value string) error { failpoint.Inject("etcdSaveFailed", func() { failpoint.Return(errors.New("save failed")) @@ -117,6 +120,7 @@ func (kv *etcdKVBase) Save(key, value string) error { return nil } +// Remove removes the key from etcd. func (kv *etcdKVBase) Remove(key string) error { key = path.Join(kv.rootPath, key) diff --git a/pkg/storage/kv/mem_kv.go b/pkg/storage/kv/mem_kv.go index 91d13c04e61..b97a3d6cfa1 100644 --- a/pkg/storage/kv/mem_kv.go +++ b/pkg/storage/kv/mem_kv.go @@ -41,10 +41,12 @@ type memoryKVItem struct { key, value string } +// Less compares two memoryKVItem. func (s *memoryKVItem) Less(than *memoryKVItem) bool { return s.key < than.key } +// Load loads the value for the key. func (kv *memoryKV) Load(key string) (string, error) { kv.RLock() defer kv.RUnlock() @@ -55,6 +57,7 @@ func (kv *memoryKV) Load(key string) (string, error) { return item.value, nil } +// LoadRange loads the keys in the range of [key, endKey). func (kv *memoryKV) LoadRange(key, endKey string, limit int) ([]string, []string, error) { failpoint.Inject("withRangeLimit", func(val failpoint.Value) { rangeLimit, ok := val.(int) @@ -77,6 +80,7 @@ func (kv *memoryKV) LoadRange(key, endKey string, limit int) ([]string, []string return keys, values, nil } +// Save saves the key-value pair. func (kv *memoryKV) Save(key, value string) error { kv.Lock() defer kv.Unlock() @@ -84,6 +88,7 @@ func (kv *memoryKV) Save(key, value string) error { return nil } +// Remove removes the key. 
func (kv *memoryKV) Remove(key string) error { kv.Lock() defer kv.Unlock() diff --git a/pkg/syncer/client.go b/pkg/syncer/client.go index 00fa8dc389b..a94f5c41f3f 100644 --- a/pkg/syncer/client.go +++ b/pkg/syncer/client.go @@ -69,7 +69,7 @@ func (s *RegionSyncer) syncRegion(ctx context.Context, conn *grpc.ClientConn) (C err = syncStream.Send(&pdpb.SyncRegionRequest{ Header: &pdpb.RequestHeader{ClusterId: s.server.ClusterID()}, Member: s.server.GetMemberInfo(), - StartIndex: s.history.GetNextIndex(), + StartIndex: s.history.getNextIndex(), }) if err != nil { return nil, err @@ -154,7 +154,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { time.Sleep(time.Second) continue } - log.Info("server starts to synchronize with leader", zap.String("server", s.server.Name()), zap.String("leader", s.server.GetLeader().GetName()), zap.Uint64("request-index", s.history.GetNextIndex())) + log.Info("server starts to synchronize with leader", zap.String("server", s.server.Name()), zap.String("leader", s.server.GetLeader().GetName()), zap.Uint64("request-index", s.history.getNextIndex())) for { resp, err := stream.Recv() if err != nil { @@ -166,14 +166,14 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { time.Sleep(time.Second) break } - if s.history.GetNextIndex() != resp.GetStartIndex() { + if s.history.getNextIndex() != resp.GetStartIndex() { log.Warn("server sync index not match the leader", zap.String("server", s.server.Name()), - zap.Uint64("own", s.history.GetNextIndex()), + zap.Uint64("own", s.history.getNextIndex()), zap.Uint64("leader", resp.GetStartIndex()), zap.Int("records-length", len(resp.GetRegions()))) // reset index - s.history.ResetWithIndex(resp.GetStartIndex()) + s.history.resetWithIndex(resp.GetStartIndex()) } stats := resp.GetRegionStats() regions := resp.GetRegions() @@ -224,7 +224,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { err = regionStorage.SaveRegion(r) } if err == nil { - s.history.Record(region) + s.history.record(region) } for _, old := range overlaps { _ = regionStorage.DeleteRegion(old.GetMeta()) diff --git a/pkg/syncer/history_buffer.go b/pkg/syncer/history_buffer.go index 08fe85cc8c5..7ff6f202ad3 100644 --- a/pkg/syncer/history_buffer.go +++ b/pkg/syncer/history_buffer.go @@ -84,7 +84,7 @@ func (h *historyBuffer) firstIndex() uint64 { return h.index - uint64(h.len()) } -func (h *historyBuffer) Record(r *core.RegionInfo) { +func (h *historyBuffer) record(r *core.RegionInfo) { h.Lock() defer h.Unlock() syncIndexGauge.Set(float64(h.index)) @@ -101,7 +101,7 @@ func (h *historyBuffer) Record(r *core.RegionInfo) { } } -func (h *historyBuffer) RecordsFrom(index uint64) []*core.RegionInfo { +func (h *historyBuffer) recordsFrom(index uint64) []*core.RegionInfo { h.RLock() defer h.RUnlock() var pos int @@ -117,7 +117,7 @@ func (h *historyBuffer) RecordsFrom(index uint64) []*core.RegionInfo { return records } -func (h *historyBuffer) ResetWithIndex(index uint64) { +func (h *historyBuffer) resetWithIndex(index uint64) { h.Lock() defer h.Unlock() h.index = index @@ -126,7 +126,7 @@ func (h *historyBuffer) ResetWithIndex(index uint64) { h.flushCount = defaultFlushCount } -func (h *historyBuffer) GetNextIndex() uint64 { +func (h *historyBuffer) getNextIndex() uint64 { h.RLock() defer h.RUnlock() return h.index diff --git a/pkg/syncer/history_buffer_test.go b/pkg/syncer/history_buffer_test.go index 4bca5b7f603..70a1caf13dc 100644 --- a/pkg/syncer/history_buffer_test.go +++ b/pkg/syncer/history_buffer_test.go @@ -34,7 +34,7 @@ func 
diff --git a/pkg/syncer/history_buffer_test.go b/pkg/syncer/history_buffer_test.go
index 4bca5b7f603..70a1caf13dc 100644
--- a/pkg/syncer/history_buffer_test.go
+++ b/pkg/syncer/history_buffer_test.go
@@ -34,7 +34,7 @@ func TestBufferSize(t *testing.T) {
 	h := newHistoryBuffer(1, kv.NewMemoryKV())
 	re.Equal(0, h.len())
 	for _, r := range regions {
-		h.Record(r)
+		h.record(r)
 	}
 	re.Equal(1, h.len())
 	re.Equal(regions[h.nextIndex()-1], h.get(100))
@@ -43,7 +43,7 @@ func TestBufferSize(t *testing.T) {
 	// size equals 2
 	h = newHistoryBuffer(2, kv.NewMemoryKV())
 	for _, r := range regions {
-		h.Record(r)
+		h.record(r)
 	}
 	re.Equal(2, h.len())
 	re.Equal(regions[h.nextIndex()-1], h.get(100))
@@ -54,7 +54,7 @@ func TestBufferSize(t *testing.T) {
 	kvMem := kv.NewMemoryKV()
 	h1 := newHistoryBuffer(100, kvMem)
 	for i := 0; i < 6; i++ {
-		h1.Record(regions[i])
+		h1.record(regions[i])
 	}
 	re.Equal(6, h1.len())
 	re.Equal(uint64(6), h1.nextIndex())
@@ -68,7 +68,7 @@ func TestBufferSize(t *testing.T) {
 	re.Equal(0, h2.len())
 	for _, r := range regions {
 		index := h2.nextIndex()
-		h2.Record(r)
+		h2.record(r)
 		re.Equal(r, h2.get(index))
 	}
 
@@ -79,9 +79,9 @@ func TestBufferSize(t *testing.T) {
 	// flush in index 106
 	re.Equal("106", s)
 
-	histories := h2.RecordsFrom(uint64(1))
+	histories := h2.recordsFrom(uint64(1))
 	re.Empty(histories)
-	histories = h2.RecordsFrom(h2.firstIndex())
+	histories = h2.recordsFrom(h2.firstIndex())
 	re.Len(histories, 100)
 	re.Equal(uint64(7), h2.firstIndex())
 	re.Equal(regions[1:], histories)
diff --git a/pkg/syncer/server.go b/pkg/syncer/server.go
index 132b06aec69..2cdc01053f6 100644
--- a/pkg/syncer/server.go
+++ b/pkg/syncer/server.go
@@ -136,8 +136,8 @@ func (s *RegionSyncer) RunServer(ctx context.Context, regionNotifier <-chan *cor
 			}
 			buckets = append(buckets, bucket)
 			leaders = append(leaders, first.GetLeader())
-			startIndex := s.history.GetNextIndex()
-			s.history.Record(first)
+			startIndex := s.history.getNextIndex()
+			s.history.record(first)
 			pending := len(regionNotifier)
 			for i := 0; i < pending && i < maxSyncRegionBatchSize; i++ {
 				region := <-regionNotifier
@@ -150,7 +150,7 @@ func (s *RegionSyncer) RunServer(ctx context.Context, regionNotifier <-chan *cor
 				}
 				buckets = append(buckets, bucket)
 				leaders = append(leaders, region.GetLeader())
-				s.history.Record(region)
+				s.history.record(region)
 			}
 			regions := &pdpb.SyncRegionResponse{
 				Header: &pdpb.ResponseHeader{ClusterId: s.server.ClusterID()},
@@ -164,7 +164,7 @@ func (s *RegionSyncer) RunServer(ctx context.Context, regionNotifier <-chan *cor
 		case <-ticker.C:
 			alive := &pdpb.SyncRegionResponse{
 				Header:     &pdpb.ResponseHeader{ClusterId: s.server.ClusterID()},
-				StartIndex: s.history.GetNextIndex(),
+				StartIndex: s.history.getNextIndex(),
 			}
 			s.broadcast(alive)
 		}
@@ -223,9 +223,9 @@ func (s *RegionSyncer) Sync(ctx context.Context, stream pdpb.PD_SyncRegionsServe
 func (s *RegionSyncer) syncHistoryRegion(ctx context.Context, request *pdpb.SyncRegionRequest, stream pdpb.PD_SyncRegionsServer) error {
 	startIndex := request.GetStartIndex()
 	name := request.GetMember().GetName()
-	records := s.history.RecordsFrom(startIndex)
+	records := s.history.recordsFrom(startIndex)
 	if len(records) == 0 {
-		if s.history.GetNextIndex() == startIndex {
+		if s.history.getNextIndex() == startIndex {
 			log.Info("requested server has already in sync with server",
 				zap.String("requested-server", name), zap.String("server", s.server.Name()), zap.Uint64("last-index", startIndex))
 			// still send a response to follower to show the history region sync.
@@ -306,7 +306,7 @@ func (s *RegionSyncer) syncHistoryRegion(ctx context.Context, request *pdpb.Sync
 	log.Info("sync the history regions with server",
 		zap.String("server", name),
 		zap.Uint64("from-index", startIndex),
-		zap.Uint64("last-index", s.history.GetNextIndex()),
+		zap.Uint64("last-index", s.history.getNextIndex()),
 		zap.Int("records-length", len(records)))
 	regions := make([]*metapb.Region, len(records))
 	stats := make([]*pdpb.RegionStat, len(records))
diff --git a/pkg/window/counter.go b/pkg/window/counter.go
index 8eaf164b7c0..84325cdc14b 100644
--- a/pkg/window/counter.go
+++ b/pkg/window/counter.go
@@ -76,34 +76,44 @@ func NewRollingCounter(opts RollingCounterOpts) RollingCounter {
 	}
 }
 
+// Add adds the given value to the counter.
 func (r *rollingCounter) Add(val int64) {
 	r.policy.Add(float64(val))
 }
 
+// Reduce applies the reduction function to all buckets within the window.
 func (r *rollingCounter) Reduce(f func(Iterator) float64) float64 {
 	return r.policy.Reduce(f)
 }
 
+// Avg computes average value within the window.
 func (r *rollingCounter) Avg() float64 {
 	return r.policy.Reduce(Avg)
 }
 
+// Min finds the min value within the window.
 func (r *rollingCounter) Min() float64 {
 	return r.policy.Reduce(Min)
 }
 
+// Max finds the max value within the window.
 func (r *rollingCounter) Max() float64 {
 	return r.policy.Reduce(Max)
 }
 
+// Sum computes sum value within the window.
 func (r *rollingCounter) Sum() float64 {
 	return r.policy.Reduce(Sum)
 }
 
+// Value gets the current value.
 func (r *rollingCounter) Value() int64 {
 	return int64(r.Sum())
 }
 
+// Timespan returns passed bucket number since lastAppendTime,
+// if it is one bucket duration earlier than the last recorded
+// time, it will return the size.
 func (r *rollingCounter) Timespan() int {
 	r.policy.mu.RLock()
 	defer r.policy.mu.RUnlock()
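The rollingCounter comments above describe a sliding-window counter. A small usage sketch follows; the RollingCounterOpts fields are an assumption based on the surrounding window package and are not shown in this hunk:

```go
package main

import (
	"fmt"
	"time"

	"github.com/tikv/pd/pkg/window"
)

func main() {
	// Assumed options: ten buckets of 100ms each, i.e. roughly one second
	// of rolling history.
	counter := window.NewRollingCounter(window.RollingCounterOpts{
		Size:           10,
		BucketDuration: 100 * time.Millisecond,
	})

	for i := int64(1); i <= 5; i++ {
		counter.Add(i)
	}

	fmt.Println(counter.Sum())   // 15: all values land in the current bucket
	fmt.Println(counter.Value()) // int64(Sum()), as documented above
	fmt.Println(counter.Avg())   // average over the buckets in the window
}
```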
diff --git a/plugin/scheduler_example/evict_leader.go b/plugin/scheduler_example/evict_leader.go
index a37874a8461..9ad797e0ae4 100644
--- a/plugin/scheduler_example/evict_leader.go
+++ b/plugin/scheduler_example/evict_leader.go
@@ -96,6 +96,7 @@ type evictLeaderSchedulerConfig struct {
 	cluster *core.BasicCluster
 }
 
+// BuildWithArgs builds the config with the args.
 func (conf *evictLeaderSchedulerConfig) BuildWithArgs(args []string) error {
 	if len(args) != 1 {
 		return errors.New("should specify the store-id")
@@ -115,6 +116,7 @@ func (conf *evictLeaderSchedulerConfig) BuildWithArgs(args []string) error {
 	return nil
 }
 
+// Clone clones the config.
 func (conf *evictLeaderSchedulerConfig) Clone() *evictLeaderSchedulerConfig {
 	conf.mu.RLock()
 	defer conf.mu.RUnlock()
@@ -123,6 +125,7 @@ func (conf *evictLeaderSchedulerConfig) Clone() *evictLeaderSchedulerConfig {
 	}
 }
 
+// Persist saves the config.
 func (conf *evictLeaderSchedulerConfig) Persist() error {
 	name := conf.getScheduleName()
 	conf.mu.RLock()
@@ -166,24 +169,29 @@ func newEvictLeaderScheduler(opController *operator.Controller, conf *evictLeade
 	}
 }
 
+// ServeHTTP implements the http.Handler interface.
 func (s *evictLeaderScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	s.handler.ServeHTTP(w, r)
 }
 
+// GetName returns the scheduler name.
 func (*evictLeaderScheduler) GetName() string {
 	return EvictLeaderName
 }
 
+// GetType returns the scheduler type.
 func (*evictLeaderScheduler) GetType() string {
 	return EvictLeaderType
 }
 
+// EncodeConfig serializes the config.
 func (s *evictLeaderScheduler) EncodeConfig() ([]byte, error) {
 	s.conf.mu.RLock()
 	defer s.conf.mu.RUnlock()
 	return schedulers.EncodeConfig(s.conf)
 }
 
+// PrepareConfig ensures the scheduler config is valid.
 func (s *evictLeaderScheduler) PrepareConfig(cluster sche.SchedulerCluster) error {
 	s.conf.mu.RLock()
 	defer s.conf.mu.RUnlock()
@@ -196,6 +204,7 @@ func (s *evictLeaderScheduler) PrepareConfig(cluster sche.SchedulerCluster) erro
 	return res
 }
 
+// CleanConfig is used to clean the scheduler config.
 func (s *evictLeaderScheduler) CleanConfig(cluster sche.SchedulerCluster) {
 	s.conf.mu.RLock()
 	defer s.conf.mu.RUnlock()
@@ -204,6 +213,7 @@ func (s *evictLeaderScheduler) CleanConfig(cluster sche.SchedulerCluster) {
 	}
 }
 
+// IsScheduleAllowed checks if the scheduler is allowed to schedule.
 func (s *evictLeaderScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool {
 	allowed := s.OpController.OperatorCount(operator.OpLeader) < cluster.GetSchedulerConfig().GetLeaderScheduleLimit()
 	if !allowed {
@@ -212,6 +222,7 @@ func (s *evictLeaderScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster)
 	return allowed
 }
 
+// Schedule schedules the evict leader operator.
 func (s *evictLeaderScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) {
 	ops := make([]*operator.Operator, 0, len(s.conf.StoreIDWitRanges))
 	s.conf.mu.RLock()
@@ -246,6 +257,7 @@ type evictLeaderHandler struct {
 	config *evictLeaderSchedulerConfig
 }
 
+// UpdateConfig updates the config.
 func (handler *evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.Request) {
 	var input map[string]any
 	if err := apiutil.ReadJSONRespondError(handler.rd, w, r.Body, &input); err != nil {
@@ -286,11 +298,13 @@ func (handler *evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R
 	handler.rd.JSON(w, http.StatusOK, nil)
 }
 
+// ListConfig lists the config.
 func (handler *evictLeaderHandler) ListConfig(w http.ResponseWriter, _ *http.Request) {
 	conf := handler.config.Clone()
 	handler.rd.JSON(w, http.StatusOK, conf)
}
 
+// DeleteConfig deletes the config.
 func (handler *evictLeaderHandler) DeleteConfig(w http.ResponseWriter, r *http.Request) {
 	idStr := mux.Vars(r)["store_id"]
 	id, err := strconv.ParseUint(idStr, 10, 64)
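For the plugin config documented above, the intended call order is: parse the store ID, clone for read-only consumers such as ListConfig, then persist. A hedged fragment that would live in the same package; the wrapper function itself is hypothetical, only the three methods come from the hunks above:

```go
// configureEvictLeader sketches how the documented config methods fit together.
func configureEvictLeader(conf *evictLeaderSchedulerConfig) error {
	// BuildWithArgs expects exactly one argument: the store ID whose
	// leaders should be evicted.
	if err := conf.BuildWithArgs([]string{"1"}); err != nil {
		return err
	}
	// Clone returns a copy, which is what ListConfig hands back over HTTP.
	_ = conf.Clone()
	// Persist saves the config.
	return conf.Persist()
}
```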
From 45dac53d65c3903618e6a2a741fbd0bec20e3b44 Mon Sep 17 00:00:00 2001
From: Jack Lyu <63168620+JackL9u@users.noreply.github.com>
Date: Wed, 31 Jul 2024 16:22:21 +0800
Subject: [PATCH 8/8] server: fix inappropriate log level (#8462)

ref tikv/pd#8453

Changed log.Error to log.Warn, since a failed unmarshal only triggers a
fallback to the default config and should be reported as a warning.

Signed-off-by: Boyang Lyu
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
---
 pkg/mcs/resourcemanager/server/manager.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/mcs/resourcemanager/server/manager.go b/pkg/mcs/resourcemanager/server/manager.go
index 13e46ea0bba..8588c22b9ae 100644
--- a/pkg/mcs/resourcemanager/server/manager.go
+++ b/pkg/mcs/resourcemanager/server/manager.go
@@ -121,7 +121,7 @@ func (m *Manager) Init(ctx context.Context) error {
 		return err
 	}
 	if err = json.Unmarshal([]byte(v), &m.controllerConfig); err != nil {
-		log.Error("un-marshall controller config failed, fallback to default", zap.Error(err), zap.String("v", v))
+		log.Warn("un-marshall controller config failed, fallback to default", zap.Error(err), zap.String("v", v))
 	}
 	// re-save the config to make sure the config has been persisted.
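The demotion to log.Warn matches the surrounding behavior: a stored config value that fails to unmarshal is not fatal, because the manager keeps its defaults and re-saves them right after. A standalone sketch of the same pattern, with placeholder types rather than the manager's actual code:

```go
package main

import (
	"encoding/json"

	"github.com/pingcap/log"
	"go.uber.org/zap"
)

// controllerConfig stands in for the real controller config; fields omitted.
type controllerConfig struct{}

func loadControllerConfig(v string) controllerConfig {
	// Start from defaults so a bad persisted value never aborts startup.
	var cfg controllerConfig
	if err := json.Unmarshal([]byte(v), &cfg); err != nil {
		// Recoverable: fall back to the defaults, so Warn rather than Error.
		log.Warn("un-marshall controller config failed, fallback to default",
			zap.Error(err), zap.String("v", v))
	}
	return cfg
}

func main() {
	_ = loadControllerConfig(`{"bad json"`)
}
```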