Commit

*: batch process peer task (#8213)
ref #7897

Signed-off-by: Ryan Leung <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
rleungx and ti-chi-bot[bot] authored May 28, 2024
1 parent beb91c1 commit 9d580d0
Showing 12 changed files with 165 additions and 188 deletions.
7 changes: 1 addition & 6 deletions pkg/cluster/cluster.go
@@ -35,12 +35,7 @@ type Cluster interface {
func HandleStatsAsync(c Cluster, region *core.RegionInfo) {
c.GetHotStat().CheckWriteAsync(statistics.NewCheckExpiredItemTask(region))
c.GetHotStat().CheckReadAsync(statistics.NewCheckExpiredItemTask(region))
reportInterval := region.GetInterval()
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
for _, peer := range region.GetPeers() {
peerInfo := core.NewPeerInfo(peer, region.GetWriteLoads(), interval)
c.GetHotStat().CheckWriteAsync(statistics.NewCheckPeerTask(peerInfo, region))
}
c.GetHotStat().CheckWriteAsync(statistics.NewCheckWritePeerTask(region))
c.GetCoordinator().GetSchedulersController().CheckTransferWitnessLeader(region)
}
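The hunk above is the core of the change: rather than building a core.PeerInfo and enqueuing one check task per peer, HandleStatsAsync now enqueues a single NewCheckWritePeerTask for the whole region, and the per-peer work moves into the hot-cache worker (see pkg/statistics/hot_cache_task.go below). A rough, self-contained sketch of the queueing difference, using toy types and a plain channel rather than the real FlowItemTask machinery:

```go
package main

import "fmt"

// Toy stand-ins for the real core.RegionInfo and metapb.Peer types.
type peer struct{ storeID uint64 }

type region struct {
	id    uint64
	peers []peer
}

// task is a queued unit of work, analogous in spirit to a FlowItemTask.
type task func()

func main() {
	queue := make(chan task, 16)
	r := region{id: 1, peers: []peer{{storeID: 1}, {storeID: 2}, {storeID: 3}}}

	// Before: one task per peer, so a 3-replica region costs three sends.
	for _, p := range r.peers {
		p := p // capture the loop variable
		queue <- func() { fmt.Printf("check peer on store %d of region %d\n", p.storeID, r.id) }
	}

	// After: one task per region; the worker iterates the peers itself.
	queue <- func() {
		for _, p := range r.peers {
			fmt.Printf("batched check of peer on store %d of region %d\n", p.storeID, r.id)
		}
	}

	close(queue)
	for t := range queue {
		t() // the statistics worker drains its queue like this
	}
}
```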

31 changes: 0 additions & 31 deletions pkg/core/peer.go
@@ -77,34 +77,3 @@ func CountInJointState(peers ...*metapb.Peer) int {
}
return count
}

// PeerInfo provides peer information
type PeerInfo struct {
*metapb.Peer
loads []float64
interval uint64
}

// NewPeerInfo creates PeerInfo
func NewPeerInfo(meta *metapb.Peer, loads []float64, interval uint64) *PeerInfo {
return &PeerInfo{
Peer: meta,
loads: loads,
interval: interval,
}
}

// GetLoads provides loads
func (p *PeerInfo) GetLoads() []float64 {
return p.loads
}

// GetPeerID provides peer id
func (p *PeerInfo) GetPeerID() uint64 {
return p.GetId()
}

// GetInterval returns reporting interval
func (p *PeerInfo) GetInterval() uint64 {
return p.interval
}
4 changes: 2 additions & 2 deletions pkg/mcs/scheduling/server/cluster.go
@@ -9,6 +9,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/schedulingpb"
"github.com/pingcap/log"
@@ -442,8 +443,7 @@ func (c *Cluster) HandleStoreHeartbeat(heartbeat *schedulingpb.StoreHeartbeatReq
utils.RegionWriteKeys: 0,
utils.RegionWriteQueryNum: 0,
}
peerInfo := core.NewPeerInfo(peer, loads, interval)
c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(peerInfo, region))
c.hotStat.CheckReadAsync(statistics.NewCheckReadPeerTask(region, []*metapb.Peer{peer}, loads, interval))
}

// Here we will compare the reported regions with the previous hot peers to decide if it is still hot.
26 changes: 3 additions & 23 deletions pkg/mock/mockcluster/mockcluster.go
@@ -896,14 +896,7 @@ func (mc *Cluster) CheckRegionRead(region *core.RegionInfo) []*statistics.HotPee
items = append(items, expiredItems...)
reportInterval := region.GetInterval()
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
for _, peer := range region.GetPeers() {
peerInfo := core.NewPeerInfo(peer, region.GetLoads(), interval)
item := mc.HotCache.CheckReadPeerSync(peerInfo, region)
if item != nil {
items = append(items, item)
}
}
return items
return append(items, mc.HotCache.CheckReadPeerSync(region, region.GetPeers(), region.GetLoads(), interval)...)
}

// CheckRegionWrite checks region write info with all peers
@@ -913,14 +906,7 @@ func (mc *Cluster) CheckRegionWrite(region *core.RegionInfo) []*statistics.HotPe
items = append(items, expiredItems...)
reportInterval := region.GetInterval()
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
for _, peer := range region.GetPeers() {
peerInfo := core.NewPeerInfo(peer, region.GetLoads(), interval)
item := mc.HotCache.CheckWritePeerSync(peerInfo, region)
if item != nil {
items = append(items, item)
}
}
return items
return append(items, mc.HotCache.CheckWritePeerSync(region, region.GetPeers(), region.GetLoads(), interval)...)
}

// CheckRegionLeaderRead checks region read info with leader peer
@@ -930,13 +916,7 @@ func (mc *Cluster) CheckRegionLeaderRead(region *core.RegionInfo) []*statistics.
items = append(items, expiredItems...)
reportInterval := region.GetInterval()
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
peer := region.GetLeader()
peerInfo := core.NewPeerInfo(peer, region.GetLoads(), interval)
item := mc.HotCache.CheckReadPeerSync(peerInfo, region)
if item != nil {
items = append(items, item)
}
return items
return append(items, mc.HotCache.CheckReadPeerSync(region, []*metapb.Peer{region.GetLeader()}, region.GetLoads(), interval)...)
}

// ObserveRegionsStats records the current stores stats from region stats.
9 changes: 5 additions & 4 deletions pkg/statistics/hot_cache.go
@@ -17,6 +17,7 @@ package statistics
import (
"context"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/smallnest/chanx"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/statistics/utils"
@@ -172,14 +173,14 @@ func (w *HotCache) Update(item *HotPeerStat, kind utils.RWType) {

// CheckWritePeerSync checks the write status, returns update items.
// This is used for mockcluster, for test purpose.
func (w *HotCache) CheckWritePeerSync(peer *core.PeerInfo, region *core.RegionInfo) *HotPeerStat {
return w.writeCache.checkPeerFlow(peer, region)
func (w *HotCache) CheckWritePeerSync(region *core.RegionInfo, peers []*metapb.Peer, loads []float64, interval uint64) []*HotPeerStat {
return w.writeCache.checkPeerFlow(region, peers, loads, interval)
}

// CheckReadPeerSync checks the read status, returns update items.
// This is used for mockcluster, for test purpose.
func (w *HotCache) CheckReadPeerSync(peer *core.PeerInfo, region *core.RegionInfo) *HotPeerStat {
return w.readCache.checkPeerFlow(peer, region)
func (w *HotCache) CheckReadPeerSync(region *core.RegionInfo, peers []*metapb.Peer, loads []float64, interval uint64) []*HotPeerStat {
return w.readCache.checkPeerFlow(region, peers, loads, interval)
}

// ExpiredReadItems returns the read items which are already expired.
43 changes: 34 additions & 9 deletions pkg/statistics/hot_cache_task.go
@@ -17,6 +17,7 @@ package statistics
import (
"context"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/pkg/core"
)

@@ -25,22 +26,46 @@ type FlowItemTask interface {
runTask(cache *hotPeerCache)
}

type checkPeerTask struct {
peerInfo *core.PeerInfo
type checkReadPeerTask struct {
regionInfo *core.RegionInfo
peers []*metapb.Peer
loads []float64
interval uint64
}

// NewCheckPeerTask creates task to update peerInfo
func NewCheckPeerTask(peerInfo *core.PeerInfo, regionInfo *core.RegionInfo) FlowItemTask {
return &checkPeerTask{
peerInfo: peerInfo,
// NewCheckReadPeerTask creates task to update peerInfo
func NewCheckReadPeerTask(regionInfo *core.RegionInfo, peers []*metapb.Peer, loads []float64, interval uint64) FlowItemTask {
return &checkReadPeerTask{
regionInfo: regionInfo,
peers: peers,
loads: loads,
interval: interval,
}
}

func (t *checkPeerTask) runTask(cache *hotPeerCache) {
stat := cache.checkPeerFlow(t.peerInfo, t.regionInfo)
if stat != nil {
func (t *checkReadPeerTask) runTask(cache *hotPeerCache) {
stats := cache.checkPeerFlow(t.regionInfo, t.peers, t.loads, t.interval)
for _, stat := range stats {
cache.updateStat(stat)
}
}

type checkWritePeerTask struct {
region *core.RegionInfo
}

// NewCheckWritePeerTask creates task to update peerInfo
func NewCheckWritePeerTask(region *core.RegionInfo) FlowItemTask {
return &checkWritePeerTask{
region: region,
}
}

func (t *checkWritePeerTask) runTask(cache *hotPeerCache) {
reportInterval := t.region.GetInterval()
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
stats := cache.checkPeerFlow(t.region, t.region.GetPeers(), t.region.GetWriteLoads(), interval)
for _, stat := range stats {
cache.updateStat(stat)
}
}
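Taken together, the two task types above split the batching by direction: a read task carries an explicit peer list, their loads, and the report interval (store heartbeats report read statistics peer by peer), while a write task carries only the region and derives the peers, write loads, and interval inside runTask. A minimal sketch of that shape, using invented stand-in types (cache, region, flowTask) rather than the real PD ones:

```go
package main

import "fmt"

// region stands in for core.RegionInfo.
type region struct {
	id         uint64
	peers      []uint64  // store IDs of the replicas
	writeLoads []float64 // accumulated write deltas since the last report
	start, end uint64    // report interval timestamps (seconds)
}

// cache stands in for hotPeerCache; checkPeerFlow mirrors the batched
// signature: one call per region, covering a slice of peers.
type cache struct{}

func (c *cache) checkPeerFlow(r *region, peers []uint64, loads []float64, interval uint64) {
	for _, storeID := range peers {
		fmt.Printf("region %d, store %d: loads %v over %ds\n", r.id, storeID, loads, interval)
	}
}

// flowTask plays the role of FlowItemTask.
type flowTask interface{ runTask(c *cache) }

// readTask carries explicit peers, loads, and interval, as checkReadPeerTask does.
type readTask struct {
	r        *region
	peers    []uint64
	loads    []float64
	interval uint64
}

func (t *readTask) runTask(c *cache) { c.checkPeerFlow(t.r, t.peers, t.loads, t.interval) }

// writeTask needs only the region: write loads and the report interval are
// derived when the task runs, as checkWritePeerTask does.
type writeTask struct{ r *region }

func (t *writeTask) runTask(c *cache) {
	interval := t.r.end - t.r.start
	c.checkPeerFlow(t.r, t.r.peers, t.r.writeLoads, interval)
}

func main() {
	c := &cache{}
	r := &region{id: 42, peers: []uint64{1, 2, 3}, writeLoads: []float64{512, 4}, start: 0, end: 10}
	tasks := []flowTask{
		&readTask{r: r, peers: r.peers[:1], loads: []float64{2048, 16}, interval: 10},
		&writeTask{r: r},
	}
	for _, t := range tasks {
		t.runTask(c) // the statistics worker runs queued tasks one by one
	}
}
```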
87 changes: 45 additions & 42 deletions pkg/statistics/hot_peer_cache.go
@@ -174,58 +174,61 @@ func (f *hotPeerCache) collectExpiredItems(region *core.RegionInfo) []*HotPeerSt
// checkPeerFlow checks the flow information of a peer.
// Notice: checkPeerFlow couldn't be used concurrently.
// checkPeerFlow will update oldItem's rollingLoads into newItem, thus we should use write lock here.
func (f *hotPeerCache) checkPeerFlow(peer *core.PeerInfo, region *core.RegionInfo) *HotPeerStat {
interval := peer.GetInterval()
func (f *hotPeerCache) checkPeerFlow(region *core.RegionInfo, peers []*metapb.Peer, deltaLoads []float64, interval uint64) []*HotPeerStat {
if Denoising && interval < HotRegionReportMinInterval { // for test or simulator purpose
return nil
}
storeID := peer.GetStoreId()
deltaLoads := peer.GetLoads()

f.collectPeerMetrics(deltaLoads, interval) // update metrics
regionID := region.GetID()
oldItem := f.getOldHotPeerStat(regionID, storeID)

// check whether the peer is allowed to be inherited
source := utils.Direct
if oldItem == nil {
for _, storeID := range f.getAllStoreIDs(region) {
oldItem = f.getOldHotPeerStat(regionID, storeID)
if oldItem != nil && oldItem.allowInherited {
source = utils.Inherit
break

regionPeers := region.GetPeers()
stats := make([]*HotPeerStat, 0, len(peers))
for _, peer := range peers {
storeID := peer.GetStoreId()
oldItem := f.getOldHotPeerStat(regionID, storeID)

// check whether the peer is allowed to be inherited
source := utils.Direct
if oldItem == nil {
for _, storeID := range f.getAllStoreIDs(region) {
oldItem = f.getOldHotPeerStat(regionID, storeID)
if oldItem != nil && oldItem.allowInherited {
source = utils.Inherit
break
}
}
}
}

// check new item whether is hot
if oldItem == nil {
regionStats := f.kind.RegionStats()
thresholds := f.calcHotThresholds(storeID)
isHot := slice.AnyOf(regionStats, func(i int) bool {
return deltaLoads[regionStats[i]]/float64(interval) >= thresholds[i]
})
if !isHot {
return nil
// check new item whether is hot
if oldItem == nil {
regionStats := f.kind.RegionStats()
thresholds := f.calcHotThresholds(storeID)
isHot := slice.AnyOf(regionStats, func(i int) bool {
return deltaLoads[regionStats[i]]/float64(interval) >= thresholds[i]
})
if !isHot {
continue
}
}
}

peers := region.GetPeers()
newItem := &HotPeerStat{
StoreID: storeID,
RegionID: regionID,
Loads: f.kind.GetLoadRatesFromPeer(peer),
isLeader: region.GetLeader().GetStoreId() == storeID,
actionType: utils.Update,
stores: make([]uint64, len(peers)),
}
for i, peer := range peers {
newItem.stores[i] = peer.GetStoreId()
}

if oldItem == nil {
return f.updateNewHotPeerStat(newItem, deltaLoads, time.Duration(interval)*time.Second)
newItem := &HotPeerStat{
StoreID: storeID,
RegionID: regionID,
Loads: f.kind.GetLoadRates(deltaLoads, interval),
isLeader: region.GetLeader().GetStoreId() == storeID,
actionType: utils.Update,
stores: make([]uint64, len(regionPeers)),
}
for i, peer := range regionPeers {
newItem.stores[i] = peer.GetStoreId()
}
if oldItem == nil {
stats = append(stats, f.updateNewHotPeerStat(newItem, deltaLoads, time.Duration(interval)*time.Second))
continue
}
stats = append(stats, f.updateHotPeerStat(region, newItem, oldItem, deltaLoads, time.Duration(interval)*time.Second, source))
}
return f.updateHotPeerStat(region, newItem, oldItem, deltaLoads, time.Duration(interval)*time.Second, source)
return stats
}

// checkColdPeer checks the collect the un-heartbeat peer and maintain it.
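For a peer with no existing hot record, the loop above only creates a new item when the reported deltas are hot enough: each delta is converted to a per-second rate (deltaLoads[i]/interval) and compared against the store's thresholds from calcHotThresholds, keeping the peer if any dimension reaches its threshold. A standalone rendering of that check, simplified to fixed thresholds and to testing every dimension rather than the kind-specific subset:

```go
package main

import "fmt"

// isHotCandidate reproduces the threshold test from checkPeerFlow: turn the
// accumulated deltas into per-second rates and flag the peer if any
// dimension reaches its threshold. Assumes len(thresholds) == len(deltaLoads);
// the threshold values below are made up, while PD derives them per store.
func isHotCandidate(deltaLoads []float64, interval uint64, thresholds []float64) bool {
	if interval == 0 {
		return false
	}
	for i, d := range deltaLoads {
		if d/float64(interval) >= thresholds[i] {
			return true
		}
	}
	return false
}

func main() {
	// 10 MiB written and 5000 keys over a 10 s report interval.
	deltaLoads := []float64{10 * 1024 * 1024, 5000}
	thresholds := []float64{512 * 1024, 128} // 512 KiB/s and 128 keys/s
	fmt.Println(isHotCandidate(deltaLoads, 10, thresholds)) // true: 1 MiB/s >= 512 KiB/s
}
```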
(The remaining 5 changed files are not shown.)
