diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 8bd2616f41f..07636bed074 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -35,12 +35,7 @@ type Cluster interface { func HandleStatsAsync(c Cluster, region *core.RegionInfo) { c.GetHotStat().CheckWriteAsync(statistics.NewCheckExpiredItemTask(region)) c.GetHotStat().CheckReadAsync(statistics.NewCheckExpiredItemTask(region)) - reportInterval := region.GetInterval() - interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() - for _, peer := range region.GetPeers() { - peerInfo := core.NewPeerInfo(peer, region.GetWriteLoads(), interval) - c.GetHotStat().CheckWriteAsync(statistics.NewCheckPeerTask(peerInfo, region)) - } + c.GetHotStat().CheckWriteAsync(statistics.NewCheckPeerTask(region, region.GetWriteLoads())) c.GetCoordinator().GetSchedulersController().CheckTransferWitnessLeader(region) } diff --git a/pkg/core/peer.go b/pkg/core/peer.go index 659886e6d39..1f888ba58eb 100644 --- a/pkg/core/peer.go +++ b/pkg/core/peer.go @@ -77,34 +77,3 @@ func CountInJointState(peers ...*metapb.Peer) int { } return count } - -// PeerInfo provides peer information -type PeerInfo struct { - *metapb.Peer - loads []float64 - interval uint64 -} - -// NewPeerInfo creates PeerInfo -func NewPeerInfo(meta *metapb.Peer, loads []float64, interval uint64) *PeerInfo { - return &PeerInfo{ - Peer: meta, - loads: loads, - interval: interval, - } -} - -// GetLoads provides loads -func (p *PeerInfo) GetLoads() []float64 { - return p.loads -} - -// GetPeerID provides peer id -func (p *PeerInfo) GetPeerID() uint64 { - return p.GetId() -} - -// GetInterval returns reporting interval -func (p *PeerInfo) GetInterval() uint64 { - return p.interval -} diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index c6c365b03ad..96309d20270 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -442,8 +442,7 @@ func (c *Cluster) HandleStoreHeartbeat(heartbeat *schedulingpb.StoreHeartbeatReq utils.RegionWriteKeys: 0, utils.RegionWriteQueryNum: 0, } - peerInfo := core.NewPeerInfo(peer, loads, interval) - c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(peerInfo, region)) + c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, loads)) } // Here we will compare the reported regions with the previous hot peers to decide if it is still hot. diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index e5b3e39a502..95f42ebc5e5 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -894,16 +894,7 @@ func (mc *Cluster) CheckRegionRead(region *core.RegionInfo) []*statistics.HotPee items := make([]*statistics.HotPeerStat, 0) expiredItems := mc.HotCache.ExpiredReadItems(region) items = append(items, expiredItems...) - reportInterval := region.GetInterval() - interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() - for _, peer := range region.GetPeers() { - peerInfo := core.NewPeerInfo(peer, region.GetLoads(), interval) - item := mc.HotCache.CheckReadPeerSync(peerInfo, region) - if item != nil { - items = append(items, item) - } - } - return items + return append(items, mc.HotCache.CheckReadPeerSync(region, nil)...) } // CheckRegionWrite checks region write info with all peers @@ -911,16 +902,7 @@ func (mc *Cluster) CheckRegionWrite(region *core.RegionInfo) []*statistics.HotPe items := make([]*statistics.HotPeerStat, 0) expiredItems := mc.HotCache.ExpiredWriteItems(region) items = append(items, expiredItems...) - reportInterval := region.GetInterval() - interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() - for _, peer := range region.GetPeers() { - peerInfo := core.NewPeerInfo(peer, region.GetLoads(), interval) - item := mc.HotCache.CheckWritePeerSync(peerInfo, region) - if item != nil { - items = append(items, item) - } - } - return items + return append(items, mc.HotCache.CheckWritePeerSync(region, nil)...) } // CheckRegionLeaderRead checks region read info with leader peer @@ -928,15 +910,7 @@ func (mc *Cluster) CheckRegionLeaderRead(region *core.RegionInfo) []*statistics. items := make([]*statistics.HotPeerStat, 0) expiredItems := mc.HotCache.ExpiredReadItems(region) items = append(items, expiredItems...) - reportInterval := region.GetInterval() - interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() - peer := region.GetLeader() - peerInfo := core.NewPeerInfo(peer, region.GetLoads(), interval) - item := mc.HotCache.CheckReadPeerSync(peerInfo, region) - if item != nil { - items = append(items, item) - } - return items + return append(items, mc.HotCache.CheckReadPeerSync(region, nil)...) } // ObserveRegionsStats records the current stores stats from region stats. diff --git a/pkg/statistics/hot_cache.go b/pkg/statistics/hot_cache.go index 799fb240d10..fd7b67ae9a3 100644 --- a/pkg/statistics/hot_cache.go +++ b/pkg/statistics/hot_cache.go @@ -172,14 +172,14 @@ func (w *HotCache) Update(item *HotPeerStat, kind utils.RWType) { // CheckWritePeerSync checks the write status, returns update items. // This is used for mockcluster, for test purpose. -func (w *HotCache) CheckWritePeerSync(peer *core.PeerInfo, region *core.RegionInfo) *HotPeerStat { - return w.writeCache.checkPeerFlow(peer, region) +func (w *HotCache) CheckWritePeerSync(region *core.RegionInfo, loads []float64) []*HotPeerStat { + return w.writeCache.checkPeerFlow(region, region.GetPeers(), loads) } // CheckReadPeerSync checks the read status, returns update items. // This is used for mockcluster, for test purpose. -func (w *HotCache) CheckReadPeerSync(peer *core.PeerInfo, region *core.RegionInfo) *HotPeerStat { - return w.readCache.checkPeerFlow(peer, region) +func (w *HotCache) CheckReadPeerSync(region *core.RegionInfo, loads []float64) []*HotPeerStat { + return w.readCache.checkPeerFlow(region, region.GetPeers(), loads) } // ExpiredReadItems returns the read items which are already expired. diff --git a/pkg/statistics/hot_cache_task.go b/pkg/statistics/hot_cache_task.go index fa224b522ff..962835d9089 100644 --- a/pkg/statistics/hot_cache_task.go +++ b/pkg/statistics/hot_cache_task.go @@ -26,21 +26,21 @@ type FlowItemTask interface { } type checkPeerTask struct { - peerInfo *core.PeerInfo regionInfo *core.RegionInfo + loads []float64 } // NewCheckPeerTask creates task to update peerInfo -func NewCheckPeerTask(peerInfo *core.PeerInfo, regionInfo *core.RegionInfo) FlowItemTask { +func NewCheckPeerTask(regionInfo *core.RegionInfo, loads []float64) FlowItemTask { return &checkPeerTask{ - peerInfo: peerInfo, regionInfo: regionInfo, + loads: loads, } } func (t *checkPeerTask) runTask(cache *hotPeerCache) { - stat := cache.checkPeerFlow(t.peerInfo, t.regionInfo) - if stat != nil { + stats := cache.checkPeerFlow(t.regionInfo, t.regionInfo.GetPeers(), t.loads) + for _, stat := range stats { cache.updateStat(stat) } } diff --git a/pkg/statistics/hot_peer_cache.go b/pkg/statistics/hot_peer_cache.go index cd27dcad4c8..f1a9ce2cc1d 100644 --- a/pkg/statistics/hot_peer_cache.go +++ b/pkg/statistics/hot_peer_cache.go @@ -174,58 +174,69 @@ func (f *hotPeerCache) collectExpiredItems(region *core.RegionInfo) []*HotPeerSt // checkPeerFlow checks the flow information of a peer. // Notice: checkPeerFlow couldn't be used concurrently. // checkPeerFlow will update oldItem's rollingLoads into newItem, thus we should use write lock here. -func (f *hotPeerCache) checkPeerFlow(peer *core.PeerInfo, region *core.RegionInfo) *HotPeerStat { - interval := peer.GetInterval() +func (f *hotPeerCache) checkPeerFlow(region *core.RegionInfo, peers []*metapb.Peer, deltaLoads []float64) []*HotPeerStat { + reportInterval := region.GetInterval() + interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() if Denoising && interval < HotRegionReportMinInterval { // for test or simulator purpose return nil } - storeID := peer.GetStoreId() - deltaLoads := peer.GetLoads() + + if deltaLoads == nil { + deltaLoads = region.GetLoads() + } f.collectPeerMetrics(deltaLoads, interval) // update metrics regionID := region.GetID() - oldItem := f.getOldHotPeerStat(regionID, storeID) - - // check whether the peer is allowed to be inherited - source := utils.Direct - if oldItem == nil { - for _, storeID := range f.getAllStoreIDs(region) { - oldItem = f.getOldHotPeerStat(regionID, storeID) - if oldItem != nil && oldItem.allowInherited { - source = utils.Inherit - break + + var stats []*HotPeerStat + for _, peer := range peers { + storeID := peer.GetStoreId() + oldItem := f.getOldHotPeerStat(regionID, storeID) + + // check whether the peer is allowed to be inherited + source := utils.Direct + if oldItem == nil { + for _, storeID := range f.getAllStoreIDs(region) { + oldItem = f.getOldHotPeerStat(regionID, storeID) + if oldItem != nil && oldItem.allowInherited { + source = utils.Inherit + break + } } } - } - - // check new item whether is hot - if oldItem == nil { - regionStats := f.kind.RegionStats() - thresholds := f.calcHotThresholds(storeID) - isHot := slice.AnyOf(regionStats, func(i int) bool { - return deltaLoads[regionStats[i]]/float64(interval) >= thresholds[i] - }) - if !isHot { - return nil + // check new item whether is hot + if oldItem == nil { + regionStats := f.kind.RegionStats() + thresholds := f.calcHotThresholds(storeID) + isHot := slice.AnyOf(regionStats, func(i int) bool { + return deltaLoads[regionStats[i]]/float64(interval) >= thresholds[i] + }) + if !isHot { + continue + } } - } - peers := region.GetPeers() - newItem := &HotPeerStat{ - StoreID: storeID, - RegionID: regionID, - Loads: f.kind.GetLoadRatesFromPeer(peer), - isLeader: region.GetLeader().GetStoreId() == storeID, - actionType: utils.Update, - stores: make([]uint64, len(peers)), - } - for i, peer := range peers { - newItem.stores[i] = peer.GetStoreId() - } - - if oldItem == nil { - return f.updateNewHotPeerStat(newItem, deltaLoads, time.Duration(interval)*time.Second) + newItem := &HotPeerStat{ + StoreID: storeID, + RegionID: regionID, + Loads: f.kind.GetLoadRatesFromPeer(peer, deltaLoads, interval), + isLeader: region.GetLeader().GetStoreId() == storeID, + actionType: utils.Update, + stores: make([]uint64, len(peers)), + } + for i, peer := range peers { + newItem.stores[i] = peer.GetStoreId() + } + if oldItem == nil { + if stat := f.updateNewHotPeerStat(newItem, deltaLoads, time.Duration(interval)*time.Second); stat != nil { + stats = append(stats, stat) + } + continue + } + if stat := f.updateHotPeerStat(region, newItem, oldItem, deltaLoads, time.Duration(interval)*time.Second, source); stat != nil { + stats = append(stats, stat) + } } - return f.updateHotPeerStat(region, newItem, oldItem, deltaLoads, time.Duration(interval)*time.Second, source) + return stats } // checkColdPeer checks the collect the un-heartbeat peer and maintain it. diff --git a/pkg/statistics/hot_peer_cache_test.go b/pkg/statistics/hot_peer_cache_test.go index 36f922d3830..50c6b3c961e 100644 --- a/pkg/statistics/hot_peer_cache_test.go +++ b/pkg/statistics/hot_peer_cache_test.go @@ -106,17 +106,8 @@ func orderingPeers(cache *hotPeerCache, region *core.RegionInfo) []*metapb.Peer } func checkFlow(cache *hotPeerCache, region *core.RegionInfo, peers []*metapb.Peer) (res []*HotPeerStat) { - reportInterval := region.GetInterval() - interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() res = append(res, cache.collectExpiredItems(region)...) - for _, peer := range peers { - peerInfo := core.NewPeerInfo(peer, region.GetLoads(), interval) - item := cache.checkPeerFlow(peerInfo, region) - if item != nil { - res = append(res, item) - } - } - return res + return append(res, cache.checkPeerFlow(region, peers, region.GetLoads())...) } func updateFlow(cache *hotPeerCache, res []*HotPeerStat) []*HotPeerStat { @@ -318,74 +309,74 @@ func TestUpdateHotPeerStat(t *testing.T) { }() // skip interval=0 - interval := 0 + region = region.Clone(core.SetReportInterval(0, 0)) deltaLoads := []float64{0.0, 0.0, 0.0} utils.MinHotThresholds[utils.RegionReadBytes] = 0.0 utils.MinHotThresholds[utils.RegionReadKeys] = 0.0 utils.MinHotThresholds[utils.RegionReadQueryNum] = 0.0 - newItem := cache.checkPeerFlow(core.NewPeerInfo(peer, deltaLoads, uint64(interval)), region) + newItem := cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads) re.Nil(newItem) // new peer, interval is larger than report interval, but no hot - interval = 10 + region = region.Clone(core.SetReportInterval(10, 0)) deltaLoads = []float64{0.0, 0.0, 0.0} utils.MinHotThresholds[utils.RegionReadBytes] = 1.0 utils.MinHotThresholds[utils.RegionReadKeys] = 1.0 utils.MinHotThresholds[utils.RegionReadQueryNum] = 1.0 - newItem = cache.checkPeerFlow(core.NewPeerInfo(peer, deltaLoads, uint64(interval)), region) + newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads) re.Nil(newItem) // new peer, interval is less than report interval - interval = 4 + region = region.Clone(core.SetReportInterval(4, 0)) deltaLoads = []float64{60.0, 60.0, 60.0} utils.MinHotThresholds[utils.RegionReadBytes] = 0.0 utils.MinHotThresholds[utils.RegionReadKeys] = 0.0 utils.MinHotThresholds[utils.RegionReadQueryNum] = 0.0 - newItem = cache.checkPeerFlow(core.NewPeerInfo(peer, deltaLoads, uint64(interval)), region) + newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads) re.NotNil(newItem) - re.Equal(0, newItem.HotDegree) - re.Equal(0, newItem.AntiCount) + re.Equal(0, newItem[0].HotDegree) + re.Equal(0, newItem[0].AntiCount) // sum of interval is less than report interval - interval = 4 + region = region.Clone(core.SetReportInterval(4, 0)) deltaLoads = []float64{60.0, 60.0, 60.0} - cache.updateStat(newItem) - newItem = cache.checkPeerFlow(core.NewPeerInfo(peer, deltaLoads, uint64(interval)), region) - re.Equal(0, newItem.HotDegree) - re.Equal(0, newItem.AntiCount) + cache.updateStat(newItem[0]) + newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads) + re.Equal(0, newItem[0].HotDegree) + re.Equal(0, newItem[0].AntiCount) // sum of interval is larger than report interval, and hot - newItem.AntiCount = utils.Read.DefaultAntiCount() - cache.updateStat(newItem) - newItem = cache.checkPeerFlow(core.NewPeerInfo(peer, deltaLoads, uint64(interval)), region) - re.Equal(1, newItem.HotDegree) - re.Equal(2*m, newItem.AntiCount) + newItem[0].AntiCount = utils.Read.DefaultAntiCount() + cache.updateStat(newItem[0]) + newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads) + re.Equal(1, newItem[0].HotDegree) + re.Equal(2*m, newItem[0].AntiCount) // sum of interval is less than report interval - cache.updateStat(newItem) - newItem = cache.checkPeerFlow(core.NewPeerInfo(peer, deltaLoads, uint64(interval)), region) - re.Equal(1, newItem.HotDegree) - re.Equal(2*m, newItem.AntiCount) + cache.updateStat(newItem[0]) + newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads) + re.Equal(1, newItem[0].HotDegree) + re.Equal(2*m, newItem[0].AntiCount) // sum of interval is larger than report interval, and hot - interval = 10 - cache.updateStat(newItem) - newItem = cache.checkPeerFlow(core.NewPeerInfo(peer, deltaLoads, uint64(interval)), region) - re.Equal(2, newItem.HotDegree) - re.Equal(2*m, newItem.AntiCount) + region = region.Clone(core.SetReportInterval(10, 0)) + cache.updateStat(newItem[0]) + newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads) + re.Equal(2, newItem[0].HotDegree) + re.Equal(2*m, newItem[0].AntiCount) // sum of interval is larger than report interval, and cold utils.MinHotThresholds[utils.RegionReadBytes] = 10.0 utils.MinHotThresholds[utils.RegionReadKeys] = 10.0 utils.MinHotThresholds[utils.RegionReadQueryNum] = 10.0 - cache.updateStat(newItem) - newItem = cache.checkPeerFlow(core.NewPeerInfo(peer, deltaLoads, uint64(interval)), region) - re.Equal(1, newItem.HotDegree) - re.Equal(2*m-1, newItem.AntiCount) + cache.updateStat(newItem[0]) + newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads) + re.Equal(1, newItem[0].HotDegree) + re.Equal(2*m-1, newItem[0].AntiCount) // sum of interval is larger than report interval, and cold for i := 0; i < 2*m-1; i++ { - cache.updateStat(newItem) - newItem = cache.checkPeerFlow(core.NewPeerInfo(peer, deltaLoads, uint64(interval)), region) + cache.updateStat(newItem[0]) + newItem = cache.checkPeerFlow(region, []*metapb.Peer{peer}, deltaLoads) } - re.Less(newItem.HotDegree, 0) - re.Equal(0, newItem.AntiCount) - re.Equal(utils.Remove, newItem.actionType) + re.Less(newItem[0].HotDegree, 0) + re.Equal(0, newItem[0].AntiCount) + re.Equal(utils.Remove, newItem[0].actionType) } func TestThresholdWithUpdateHotPeerStat(t *testing.T) { @@ -688,9 +679,8 @@ func TestHotPeerCacheTopNThreshold(t *testing.T) { StartTimestamp: start, EndTimestamp: end, })) - newPeer := core.NewPeerInfo(meta.Peers[0], region.GetLoads(), end-start) - stat := cache.checkPeerFlow(newPeer, newRegion) - if stat != nil { + stats := cache.checkPeerFlow(newRegion, newRegion.GetPeers(), nil) + for _, stat := range stats { cache.updateStat(stat) } } @@ -717,22 +707,8 @@ func TestHotPeerCacheTopNThreshold(t *testing.T) { func BenchmarkCheckRegionFlow(b *testing.B) { cache := NewHotPeerCache(context.Background(), utils.Read) region := buildRegion(utils.Read, 3, 10) - peerInfos := make([]*core.PeerInfo, 0) - for _, peer := range region.GetPeers() { - peerInfo := core.NewPeerInfo(peer, region.GetLoads(), 10) - peerInfos = append(peerInfos, peerInfo) - } b.ResetTimer() for i := 0; i < b.N; i++ { - items := make([]*HotPeerStat, 0) - for _, peerInfo := range peerInfos { - item := cache.checkPeerFlow(peerInfo, region) - if item != nil { - items = append(items, item) - } - } - for _, ret := range items { - cache.updateStat(ret) - } + cache.checkPeerFlow(region, region.GetPeers(), nil) } } diff --git a/pkg/statistics/utils/kind.go b/pkg/statistics/utils/kind.go index 4d44b8d57e1..463b35c450a 100644 --- a/pkg/statistics/utils/kind.go +++ b/pkg/statistics/utils/kind.go @@ -15,7 +15,7 @@ package utils import ( - "github.com/tikv/pd/pkg/core" + "github.com/pingcap/kvproto/pkg/metapb" ) const ( @@ -231,9 +231,7 @@ func (rw RWType) DefaultAntiCount() int { } // GetLoadRatesFromPeer gets the load rates of the read or write type from PeerInfo. -func (rw RWType) GetLoadRatesFromPeer(peer *core.PeerInfo) []float64 { - deltaLoads := peer.GetLoads() - interval := peer.GetInterval() +func (rw RWType) GetLoadRatesFromPeer(peer *metapb.Peer, deltaLoads []float64, interval uint64) []float64 { loads := make([]float64, DimLen) for dim, k := range rw.RegionStats() { loads[dim] = deltaLoads[k] / float64(interval) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 148b43541a2..870e2495dc3 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -959,8 +959,7 @@ func (c *RaftCluster) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest utils.RegionWriteKeys: 0, utils.RegionWriteQueryNum: 0, } - peerInfo := core.NewPeerInfo(peer, loads, interval) - c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(peerInfo, region)) + c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, loads)) } } for _, stat := range stats.GetSnapshotStats() { diff --git a/tools/pd-ctl/tests/hot/hot_test.go b/tools/pd-ctl/tests/hot/hot_test.go index 7661704aa41..c06e42edf60 100644 --- a/tools/pd-ctl/tests/hot/hot_test.go +++ b/tools/pd-ctl/tests/hot/hot_test.go @@ -188,11 +188,10 @@ func (suite *hotTestSuite) checkHot(cluster *pdTests.TestCluster) { Id: 100 + regionIDCounter, StoreId: hotStoreID, } - peerInfo := core.NewPeerInfo(leader, loads, reportInterval) region := core.NewRegionInfo(&metapb.Region{ Id: hotRegionID, }, leader) - hotStat.CheckReadAsync(statistics.NewCheckPeerTask(peerInfo, region)) + hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, loads)) testutil.Eventually(re, func() bool { hotPeerStat := getHotPeerStat(utils.Read, hotRegionID, hotStoreID) return hotPeerStat != nil