Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed May 24, 2024
1 parent 43e9492 commit 52e1681
Show file tree
Hide file tree
Showing 11 changed files with 111 additions and 191 deletions.
7 changes: 1 addition & 6 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,7 @@ type Cluster interface {
func HandleStatsAsync(c Cluster, region *core.RegionInfo) {
c.GetHotStat().CheckWriteAsync(statistics.NewCheckExpiredItemTask(region))
c.GetHotStat().CheckReadAsync(statistics.NewCheckExpiredItemTask(region))
reportInterval := region.GetInterval()
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
for _, peer := range region.GetPeers() {
peerInfo := core.NewPeerInfo(peer, region.GetWriteLoads(), interval)
c.GetHotStat().CheckWriteAsync(statistics.NewCheckPeerTask(peerInfo, region))
}
c.GetHotStat().CheckWriteAsync(statistics.NewCheckPeerTask(region, region.GetWriteLoads()))
c.GetCoordinator().GetSchedulersController().CheckTransferWitnessLeader(region)
}

Expand Down
31 changes: 0 additions & 31 deletions pkg/core/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,34 +77,3 @@ func CountInJointState(peers ...*metapb.Peer) int {
}
return count
}

// PeerInfo provides peer information
type PeerInfo struct {
*metapb.Peer
loads []float64
interval uint64
}

// NewPeerInfo creates PeerInfo
func NewPeerInfo(meta *metapb.Peer, loads []float64, interval uint64) *PeerInfo {
return &PeerInfo{
Peer: meta,
loads: loads,
interval: interval,
}
}

// GetLoads provides loads
func (p *PeerInfo) GetLoads() []float64 {
return p.loads
}

// GetPeerID provides peer id
func (p *PeerInfo) GetPeerID() uint64 {
return p.GetId()
}

// GetInterval returns reporting interval
func (p *PeerInfo) GetInterval() uint64 {
return p.interval
}
3 changes: 1 addition & 2 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,8 +442,7 @@ func (c *Cluster) HandleStoreHeartbeat(heartbeat *schedulingpb.StoreHeartbeatReq
utils.RegionWriteKeys: 0,
utils.RegionWriteQueryNum: 0,
}
peerInfo := core.NewPeerInfo(peer, loads, interval)
c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(peerInfo, region))
c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(region, loads))
}

// Here we will compare the reported regions with the previous hot peers to decide if it is still hot.
Expand Down
32 changes: 3 additions & 29 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -894,49 +894,23 @@ func (mc *Cluster) CheckRegionRead(region *core.RegionInfo) []*statistics.HotPee
items := make([]*statistics.HotPeerStat, 0)
expiredItems := mc.HotCache.ExpiredReadItems(region)
items = append(items, expiredItems...)
reportInterval := region.GetInterval()
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
for _, peer := range region.GetPeers() {
peerInfo := core.NewPeerInfo(peer, region.GetLoads(), interval)
item := mc.HotCache.CheckReadPeerSync(peerInfo, region)
if item != nil {
items = append(items, item)
}
}
return items
return append(items, mc.HotCache.CheckReadPeerSync(region, nil)...)
}

// CheckRegionWrite checks region write info with all peers
func (mc *Cluster) CheckRegionWrite(region *core.RegionInfo) []*statistics.HotPeerStat {
items := make([]*statistics.HotPeerStat, 0)
expiredItems := mc.HotCache.ExpiredWriteItems(region)
items = append(items, expiredItems...)
reportInterval := region.GetInterval()
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
for _, peer := range region.GetPeers() {
peerInfo := core.NewPeerInfo(peer, region.GetLoads(), interval)
item := mc.HotCache.CheckWritePeerSync(peerInfo, region)
if item != nil {
items = append(items, item)
}
}
return items
return append(items, mc.HotCache.CheckWritePeerSync(region, nil)...)
}

// CheckRegionLeaderRead checks region read info with leader peer
func (mc *Cluster) CheckRegionLeaderRead(region *core.RegionInfo) []*statistics.HotPeerStat {
items := make([]*statistics.HotPeerStat, 0)
expiredItems := mc.HotCache.ExpiredReadItems(region)
items = append(items, expiredItems...)
reportInterval := region.GetInterval()
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
peer := region.GetLeader()
peerInfo := core.NewPeerInfo(peer, region.GetLoads(), interval)
item := mc.HotCache.CheckReadPeerSync(peerInfo, region)
if item != nil {
items = append(items, item)
}
return items
return append(items, mc.HotCache.CheckReadPeerSync(region, nil)...)
}

// ObserveRegionsStats records the current stores stats from region stats.
Expand Down
8 changes: 4 additions & 4 deletions pkg/statistics/hot_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,14 +172,14 @@ func (w *HotCache) Update(item *HotPeerStat, kind utils.RWType) {

// CheckWritePeerSync checks the write status, returns update items.
// This is used for mockcluster, for test purpose.
func (w *HotCache) CheckWritePeerSync(peer *core.PeerInfo, region *core.RegionInfo) *HotPeerStat {
return w.writeCache.checkPeerFlow(peer, region)
func (w *HotCache) CheckWritePeerSync(region *core.RegionInfo, loads []float64) []*HotPeerStat {
return w.writeCache.checkPeerFlow(region, region.GetPeers(), loads)
}

// CheckReadPeerSync checks the read status, returns update items.
// This is used for mockcluster, for test purpose.
func (w *HotCache) CheckReadPeerSync(peer *core.PeerInfo, region *core.RegionInfo) *HotPeerStat {
return w.readCache.checkPeerFlow(peer, region)
func (w *HotCache) CheckReadPeerSync(region *core.RegionInfo, loads []float64) []*HotPeerStat {
return w.readCache.checkPeerFlow(region, region.GetPeers(), loads)
}

// ExpiredReadItems returns the read items which are already expired.
Expand Down
10 changes: 5 additions & 5 deletions pkg/statistics/hot_cache_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,21 @@ type FlowItemTask interface {
}

type checkPeerTask struct {
peerInfo *core.PeerInfo
regionInfo *core.RegionInfo
loads []float64
}

// NewCheckPeerTask creates task to update peerInfo
func NewCheckPeerTask(peerInfo *core.PeerInfo, regionInfo *core.RegionInfo) FlowItemTask {
func NewCheckPeerTask(regionInfo *core.RegionInfo, loads []float64) FlowItemTask {
return &checkPeerTask{
peerInfo: peerInfo,
regionInfo: regionInfo,
loads: loads,
}
}

func (t *checkPeerTask) runTask(cache *hotPeerCache) {
stat := cache.checkPeerFlow(t.peerInfo, t.regionInfo)
if stat != nil {
stats := cache.checkPeerFlow(t.regionInfo, t.regionInfo.GetPeers(), t.loads)
for _, stat := range stats {
cache.updateStat(stat)
}
}
Expand Down
95 changes: 53 additions & 42 deletions pkg/statistics/hot_peer_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,58 +174,69 @@ func (f *hotPeerCache) collectExpiredItems(region *core.RegionInfo) []*HotPeerSt
// checkPeerFlow checks the flow information of a peer.
// Notice: checkPeerFlow couldn't be used concurrently.
// checkPeerFlow will update oldItem's rollingLoads into newItem, thus we should use write lock here.
func (f *hotPeerCache) checkPeerFlow(peer *core.PeerInfo, region *core.RegionInfo) *HotPeerStat {
interval := peer.GetInterval()
func (f *hotPeerCache) checkPeerFlow(region *core.RegionInfo, peers []*metapb.Peer, deltaLoads []float64) []*HotPeerStat {
reportInterval := region.GetInterval()
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
if Denoising && interval < HotRegionReportMinInterval { // for test or simulator purpose
return nil
}
storeID := peer.GetStoreId()
deltaLoads := peer.GetLoads()

if deltaLoads == nil {
deltaLoads = region.GetLoads()
}
f.collectPeerMetrics(deltaLoads, interval) // update metrics
regionID := region.GetID()
oldItem := f.getOldHotPeerStat(regionID, storeID)

// check whether the peer is allowed to be inherited
source := utils.Direct
if oldItem == nil {
for _, storeID := range f.getAllStoreIDs(region) {
oldItem = f.getOldHotPeerStat(regionID, storeID)
if oldItem != nil && oldItem.allowInherited {
source = utils.Inherit
break

var stats []*HotPeerStat
for _, peer := range peers {
storeID := peer.GetStoreId()
oldItem := f.getOldHotPeerStat(regionID, storeID)

// check whether the peer is allowed to be inherited
source := utils.Direct
if oldItem == nil {
for _, storeID := range f.getAllStoreIDs(region) {
oldItem = f.getOldHotPeerStat(regionID, storeID)
if oldItem != nil && oldItem.allowInherited {
source = utils.Inherit
break
}
}
}
}

// check new item whether is hot
if oldItem == nil {
regionStats := f.kind.RegionStats()
thresholds := f.calcHotThresholds(storeID)
isHot := slice.AnyOf(regionStats, func(i int) bool {
return deltaLoads[regionStats[i]]/float64(interval) >= thresholds[i]
})
if !isHot {
return nil
// check new item whether is hot
if oldItem == nil {
regionStats := f.kind.RegionStats()
thresholds := f.calcHotThresholds(storeID)
isHot := slice.AnyOf(regionStats, func(i int) bool {
return deltaLoads[regionStats[i]]/float64(interval) >= thresholds[i]
})
if !isHot {
continue
}
}
}

peers := region.GetPeers()
newItem := &HotPeerStat{
StoreID: storeID,
RegionID: regionID,
Loads: f.kind.GetLoadRatesFromPeer(peer),
isLeader: region.GetLeader().GetStoreId() == storeID,
actionType: utils.Update,
stores: make([]uint64, len(peers)),
}
for i, peer := range peers {
newItem.stores[i] = peer.GetStoreId()
}

if oldItem == nil {
return f.updateNewHotPeerStat(newItem, deltaLoads, time.Duration(interval)*time.Second)
newItem := &HotPeerStat{
StoreID: storeID,
RegionID: regionID,
Loads: f.kind.GetLoadRatesFromPeer(peer, deltaLoads, interval),
isLeader: region.GetLeader().GetStoreId() == storeID,
actionType: utils.Update,
stores: make([]uint64, len(peers)),
}
for i, peer := range peers {
newItem.stores[i] = peer.GetStoreId()
}
if oldItem == nil {
if stat := f.updateNewHotPeerStat(newItem, deltaLoads, time.Duration(interval)*time.Second); stat != nil {
stats = append(stats, stat)
}
continue
}
if stat := f.updateHotPeerStat(region, newItem, oldItem, deltaLoads, time.Duration(interval)*time.Second, source); stat != nil {
stats = append(stats, stat)
}
}
return f.updateHotPeerStat(region, newItem, oldItem, deltaLoads, time.Duration(interval)*time.Second, source)
return stats
}

// checkColdPeer checks the collect the un-heartbeat peer and maintain it.
Expand Down
Loading

0 comments on commit 52e1681

Please sign in to comment.