Commit
Merge branch 'master' into refactor-independent
ti-chi-bot[bot] authored Oct 25, 2024
2 parents c600920 + 24343e4 commit 4d6164a
Showing 17 changed files with 399 additions and 364 deletions.
14 changes: 14 additions & 0 deletions codecov.yml
@@ -24,3 +24,17 @@ flag_management:
target: 74% # increase it if you want to enforce higher coverage for project, current setting as 74% is for do not let the error be reported and lose the meaning of warning.
- type: patch
target: 74% # increase it if you want to enforce higher coverage for project, current setting as 74% is for do not let the error be reported and lose the meaning of warning.

ignore:
- "tools/pd-analysis"
- "tools/pd-api-bench"
- "tools/pd-backup"
- "tools/pd-heartbeat-bench"
- "tools/pd-recover"
- "tools/pd-simulator"
- "tools/pd-tso-bench"
- "tools/pd-ut"
- "tools/regions-dump"
- "tools/stores-dump"
- "tests"

25 changes: 11 additions & 14 deletions pkg/mcs/scheduling/server/cluster.go
@@ -184,21 +184,18 @@ func (c *Cluster) GetHotPeerStat(rw utils.RWType, regionID, storeID uint64) *sta
return c.hotStat.GetHotPeerStat(rw, regionID, storeID)
}

// RegionReadStats returns hot region's read stats.
// GetHotPeerStats returns the read or write statistics for hot regions.
// It returns a map where the keys are store IDs and the values are slices of HotPeerStat.
// The result only includes peers that are hot enough.
// RegionStats is a thread-safe method
func (c *Cluster) RegionReadStats() map[uint64][]*statistics.HotPeerStat {
// As read stats are reported by store heartbeat, the threshold needs to be adjusted.
threshold := c.persistConfig.GetHotRegionCacheHitsThreshold() *
(utils.RegionHeartBeatReportInterval / utils.StoreHeartBeatReportInterval)
return c.hotStat.RegionStats(utils.Read, threshold)
}

// RegionWriteStats returns hot region's write stats.
// The result only includes peers that are hot enough.
func (c *Cluster) RegionWriteStats() map[uint64][]*statistics.HotPeerStat {
// RegionStats is a thread-safe method
return c.hotStat.RegionStats(utils.Write, c.persistConfig.GetHotRegionCacheHitsThreshold())
// GetHotPeerStats is a thread-safe method.
func (c *Cluster) GetHotPeerStats(rw utils.RWType) map[uint64][]*statistics.HotPeerStat {
threshold := c.persistConfig.GetHotRegionCacheHitsThreshold()
if rw == utils.Read {
// As read stats are reported by store heartbeat, the threshold needs to be adjusted.
threshold = c.persistConfig.GetHotRegionCacheHitsThreshold() *
(utils.RegionHeartBeatReportInterval / utils.StoreHeartBeatReportInterval)
}
return c.hotStat.GetHotPeerStats(rw, threshold)
}

// BucketsStats returns hot region's buckets stats.
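
A note on the threshold arithmetic in the new GetHotPeerStats: write stats arrive with region heartbeats, while read stats are reported by the more frequent store heartbeats, so the read threshold is scaled by the ratio of the two report intervals. The standalone Go sketch below is not code from this commit; the interval and threshold constants are assumed values for illustration (the real ones live in pkg/statistics and the persisted config), but the arithmetic matches the diff above.

// threshold_sketch.go (illustrative only, assumed constants)
package main

import "fmt"

const (
	regionHeartBeatReportInterval = 60 // seconds, assumed for illustration
	storeHeartBeatReportInterval  = 10 // seconds, assumed for illustration
	hotRegionCacheHitsThreshold   = 3  // assumed default
)

func main() {
	// Write stats arrive with region heartbeats, so the configured
	// threshold is used as-is.
	writeThreshold := hotRegionCacheHitsThreshold
	// Read stats arrive with the more frequent store heartbeats, so the
	// threshold is scaled by the ratio of the two report intervals.
	readThreshold := hotRegionCacheHitsThreshold *
		(regionHeartBeatReportInterval / storeHeartBeatReportInterval)
	fmt.Println("write threshold:", writeThreshold) // 3
	fmt.Println("read threshold:", readThreshold)   // 18
}
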
16 changes: 5 additions & 11 deletions pkg/mock/mockcluster/mockcluster.go
@@ -148,13 +148,6 @@ func (mc *Cluster) GetHotPeerStat(rw utils.RWType, regionID, storeID uint64) *st
return mc.HotCache.GetHotPeerStat(rw, regionID, storeID)
}

// RegionReadStats returns hot region's read stats.
// The result only includes peers that are hot enough.
func (mc *Cluster) RegionReadStats() map[uint64][]*statistics.HotPeerStat {
// We directly use threshold for read stats for mockCluster
return mc.HotCache.RegionStats(utils.Read, mc.GetHotRegionCacheHitsThreshold())
}

// BucketsStats returns hot region's buckets stats.
func (mc *Cluster) BucketsStats(degree int, regions ...uint64) map[uint64][]*buckets.BucketStat {
task := buckets.NewCollectBucketStatsTask(degree, regions...)
@@ -164,10 +157,11 @@ func (mc *Cluster) BucketsStats(degree int, regions ...uint64) map[uint64][]*buc
return task.WaitRet(mc.ctx)
}

// RegionWriteStats returns hot region's write stats.
// GetHotPeerStats returns the read or write statistics for hot regions.
// It returns a map where the keys are store IDs and the values are slices of HotPeerStat.
// The result only includes peers that are hot enough.
func (mc *Cluster) RegionWriteStats() map[uint64][]*statistics.HotPeerStat {
return mc.HotCache.RegionStats(utils.Write, mc.GetHotRegionCacheHitsThreshold())
func (mc *Cluster) GetHotPeerStats(rw utils.RWType) map[uint64][]*statistics.HotPeerStat {
return mc.HotCache.GetHotPeerStats(rw, mc.GetHotRegionCacheHitsThreshold())
}

// HotRegionsFromStore picks hot regions in specify store.
@@ -185,7 +179,7 @@ func (mc *Cluster) HotRegionsFromStore(store uint64, kind utils.RWType) []*core.

// hotRegionsFromStore picks hot region in specify store.
func hotRegionsFromStore(w *statistics.HotCache, storeID uint64, kind utils.RWType, minHotDegree int) []*statistics.HotPeerStat {
if stats, ok := w.RegionStats(kind, minHotDegree)[storeID]; ok && len(stats) > 0 {
if stats, ok := w.GetHotPeerStats(kind, minHotDegree)[storeID]; ok && len(stats) > 0 {
return stats
}
return nil
29 changes: 5 additions & 24 deletions pkg/schedule/coordinator.go
@@ -432,16 +432,8 @@ func (c *Coordinator) GetHotRegionsByType(typ utils.RWType) *statistics.StoreHot
isTraceFlow := c.cluster.GetSchedulerConfig().IsTraceRegionFlow()
storeLoads := c.cluster.GetStoresLoads()
stores := c.cluster.GetStores()
var infos *statistics.StoreHotPeersInfos
switch typ {
case utils.Write:
regionStats := c.cluster.RegionWriteStats()
infos = statistics.GetHotStatus(stores, storeLoads, regionStats, utils.Write, isTraceFlow)
case utils.Read:
regionStats := c.cluster.RegionReadStats()
infos = statistics.GetHotStatus(stores, storeLoads, regionStats, utils.Read, isTraceFlow)
default:
}
hotPeerStats := c.cluster.GetHotPeerStats(typ)
infos := statistics.GetHotStatus(stores, storeLoads, hotPeerStats, typ, isTraceFlow)
// update params `IsLearner` and `LastUpdateTime`
s := []statistics.StoreHotPeersStat{infos.AsLeader, infos.AsPeer}
for i, stores := range s {
@@ -505,20 +497,9 @@ func (c *Coordinator) CollectHotSpotMetrics() {
}

func collectHotMetrics(cluster sche.ClusterInformer, stores []*core.StoreInfo, typ utils.RWType) {
var (
kind string
regionStats map[uint64][]*statistics.HotPeerStat
)

switch typ {
case utils.Read:
regionStats = cluster.RegionReadStats()
kind = utils.Read.String()
case utils.Write:
regionStats = cluster.RegionWriteStats()
kind = utils.Write.String()
}
status := statistics.CollectHotPeerInfos(stores, regionStats) // only returns TotalBytesRate,TotalKeysRate,TotalQueryRate,Count
kind := typ.String()
hotPeerStats := cluster.GetHotPeerStats(typ)
status := statistics.CollectHotPeerInfos(stores, hotPeerStats) // only returns TotalBytesRate,TotalKeysRate,TotalQueryRate,Count

for _, s := range stores {
// TODO: pre-allocate gauge metrics
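
The collectHotMetrics change follows the same pattern: instead of a switch that picks a per-kind method and sets the metric label by hand, the RWType is passed straight through and supplies its own String(). A small self-contained sketch of that shape, where RWType and fetchHotPeerStats are stand-ins for utils.RWType and cluster.GetHotPeerStats, not the actual PD code:

// collect_metrics_sketch.go (illustrative only)
package main

import "fmt"

// RWType mimics utils.RWType for this sketch only.
type RWType int

const (
	Write RWType = iota
	Read
)

func (t RWType) String() string {
	if t == Read {
		return "read"
	}
	return "write"
}

// fetchHotPeerStats stands in for cluster.GetHotPeerStats(typ).
func fetchHotPeerStats(typ RWType) map[uint64]int {
	if typ == Read {
		return map[uint64]int{1: 3, 2: 1}
	}
	return map[uint64]int{1: 2}
}

// collectHotMetrics shows the post-refactor shape: one call keyed by typ
// and one label from typ.String(), instead of a switch per kind.
func collectHotMetrics(typ RWType) {
	kind := typ.String()
	for storeID, hotPeers := range fetchHotPeerStats(typ) {
		fmt.Printf("store=%d kind=%s hot_peers=%d\n", storeID, kind, hotPeers)
	}
}

func main() {
	collectHotMetrics(Read)
	collectHotMetrics(Write)
}
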
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/hot_region.go
@@ -130,7 +130,7 @@ func (s *baseHotScheduler) prepareForBalance(typ resourceType, cluster sche.Sche
// update read statistics
// avoid to update read statistics frequently
if time.Since(s.updateReadTime) >= statisticsInterval {
regionRead := cluster.RegionReadStats()
regionRead := cluster.GetHotPeerStats(utils.Read)
prepare(regionRead, utils.Read, constant.LeaderKind)
prepare(regionRead, utils.Read, constant.RegionKind)
s.updateReadTime = time.Now()
@@ -139,7 +139,7 @@ func (s *baseHotScheduler) prepareForBalance(typ resourceType, cluster sche.Sche
// update write statistics
// avoid to update write statistics frequently
if time.Since(s.updateWriteTime) >= statisticsInterval {
regionWrite := cluster.RegionWriteStats()
regionWrite := cluster.GetHotPeerStats(utils.Write)
prepare(regionWrite, utils.Write, constant.LeaderKind)
prepare(regionWrite, utils.Write, constant.RegionKind)
s.updateWriteTime = time.Now()
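
As the hunks above show, prepareForBalance only refreshes each kind of statistics once statisticsInterval has elapsed since the last update. A minimal sketch of that throttling pattern; the names throttledStats and refresh, and the one-second interval, are assumptions for illustration, not the scheduler's actual fields:

// throttle_sketch.go (illustrative only)
package main

import (
	"fmt"
	"time"
)

const statisticsInterval = time.Second // assumed; the scheduler defines its own constant

type throttledStats struct {
	lastUpdate time.Time
	updates    int
}

// refresh recomputes only when the interval has elapsed, mirroring how
// prepareForBalance guards the read and write statistics updates.
func (s *throttledStats) refresh() {
	if time.Since(s.lastUpdate) < statisticsInterval {
		return // too soon, keep the cached statistics
	}
	s.updates++ // stand-in for calling GetHotPeerStats and rebuilding loads
	s.lastUpdate = time.Now()
}

func main() {
	s := &throttledStats{}
	s.refresh()
	s.refresh() // skipped: still within statisticsInterval
	fmt.Println("updates applied:", s.updates) // 1
}
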
32 changes: 16 additions & 16 deletions pkg/schedule/schedulers/hot_region_test.go
@@ -1297,7 +1297,7 @@ func TestHotReadRegionScheduleByteRateOnly(t *testing.T) {
r := tc.HotRegionsFromStore(2, utils.Read)
re.Len(r, 3)
// check hot items
stats := tc.HotCache.RegionStats(utils.Read, 0)
stats := tc.HotCache.GetHotPeerStats(utils.Read, 0)
re.Len(stats, 3)
for _, ss := range stats {
for _, s := range ss {
@@ -1623,7 +1623,7 @@ func TestHotCacheUpdateCache(t *testing.T) {
// lower than hot read flow rate, but higher than write flow rate
{11, []uint64{1, 2, 3}, 7 * units.KiB, 0, 0},
})
stats := tc.RegionStats(utils.Read, 0)
stats := tc.HotCache.GetHotPeerStats(utils.Read, 0)
re.Len(stats[1], 3)
re.Len(stats[2], 3)
re.Len(stats[3], 3)
@@ -1632,7 +1632,7 @@
{3, []uint64{2, 1, 3}, 20 * units.KiB, 0, 0},
{11, []uint64{1, 2, 3}, 7 * units.KiB, 0, 0},
})
stats = tc.RegionStats(utils.Read, 0)
stats = tc.HotCache.GetHotPeerStats(utils.Read, 0)
re.Len(stats[1], 3)
re.Len(stats[2], 3)
re.Len(stats[3], 3)
@@ -1642,15 +1642,15 @@
{5, []uint64{1, 2, 3}, 20 * units.KiB, 0, 0},
{6, []uint64{1, 2, 3}, 0.8 * units.KiB, 0, 0},
})
stats = tc.RegionStats(utils.Write, 0)
stats = tc.HotCache.GetHotPeerStats(utils.Write, 0)
re.Len(stats[1], 2)
re.Len(stats[2], 2)
re.Len(stats[3], 2)

addRegionInfo(tc, utils.Write, []testRegionInfo{
{5, []uint64{1, 2, 5}, 20 * units.KiB, 0, 0},
})
stats = tc.RegionStats(utils.Write, 0)
stats = tc.HotCache.GetHotPeerStats(utils.Write, 0)

re.Len(stats[1], 2)
re.Len(stats[2], 2)
@@ -1665,7 +1665,7 @@
// lower than hot read flow rate, but higher than write flow rate
{31, []uint64{4, 5, 6}, 7 * units.KiB, 0, 0},
})
stats = tc.RegionStats(utils.Read, 0)
stats = tc.HotCache.GetHotPeerStats(utils.Read, 0)
re.Len(stats[4], 2)
re.Len(stats[5], 1)
re.Empty(stats[6])
@@ -1684,13 +1684,13 @@ func TestHotCacheKeyThresholds(t *testing.T) {
{1, []uint64{1, 2, 3}, 0, 1, 0},
{2, []uint64{1, 2, 3}, 0, 1 * units.KiB, 0},
})
stats := tc.RegionStats(utils.Read, 0)
stats := tc.HotCache.GetHotPeerStats(utils.Read, 0)
re.Len(stats[1], 1)
addRegionInfo(tc, utils.Write, []testRegionInfo{
{3, []uint64{4, 5, 6}, 0, 1, 0},
{4, []uint64{4, 5, 6}, 0, 1 * units.KiB, 0},
})
stats = tc.RegionStats(utils.Write, 0)
stats = tc.HotCache.GetHotPeerStats(utils.Write, 0)
re.Len(stats[4], 1)
re.Len(stats[5], 1)
re.Len(stats[6], 1)
@@ -1716,20 +1716,20 @@ func TestHotCacheKeyThresholds(t *testing.T) {

{ // read
addRegionInfo(tc, utils.Read, regions)
stats := tc.RegionStats(utils.Read, 0)
stats := tc.HotCache.GetHotPeerStats(utils.Read, 0)
re.Greater(len(stats[1]), 500)

// for AntiCount
addRegionInfo(tc, utils.Read, regions)
addRegionInfo(tc, utils.Read, regions)
addRegionInfo(tc, utils.Read, regions)
addRegionInfo(tc, utils.Read, regions)
stats = tc.RegionStats(utils.Read, 0)
stats = tc.HotCache.GetHotPeerStats(utils.Read, 0)
re.Len(stats[1], 500)
}
{ // write
addRegionInfo(tc, utils.Write, regions)
stats := tc.RegionStats(utils.Write, 0)
stats := tc.HotCache.GetHotPeerStats(utils.Write, 0)
re.Greater(len(stats[1]), 500)
re.Greater(len(stats[2]), 500)
re.Greater(len(stats[3]), 500)
@@ -1739,7 +1739,7 @@ func TestHotCacheKeyThresholds(t *testing.T) {
addRegionInfo(tc, utils.Write, regions)
addRegionInfo(tc, utils.Write, regions)
addRegionInfo(tc, utils.Write, regions)
stats = tc.RegionStats(utils.Write, 0)
stats = tc.HotCache.GetHotPeerStats(utils.Write, 0)
re.Len(stats[1], 500)
re.Len(stats[2], 500)
re.Len(stats[3], 500)
@@ -1766,7 +1766,7 @@ func TestHotCacheByteAndKey(t *testing.T) {
}
{ // read
addRegionInfo(tc, utils.Read, regions)
stats := tc.RegionStats(utils.Read, 0)
stats := tc.HotCache.GetHotPeerStats(utils.Read, 0)
re.Len(stats[1], 500)

addRegionInfo(tc, utils.Read, []testRegionInfo{
@@ -1775,12 +1775,12 @@
{10003, []uint64{1, 2, 3}, 10 * units.KiB, 500 * units.KiB, 0},
{10004, []uint64{1, 2, 3}, 500 * units.KiB, 500 * units.KiB, 0},
})
stats = tc.RegionStats(utils.Read, 0)
stats = tc.HotCache.GetHotPeerStats(utils.Read, 0)
re.Len(stats[1], 503)
}
{ // write
addRegionInfo(tc, utils.Write, regions)
stats := tc.RegionStats(utils.Write, 0)
stats := tc.HotCache.GetHotPeerStats(utils.Write, 0)
re.Len(stats[1], 500)
re.Len(stats[2], 500)
re.Len(stats[3], 500)
@@ -1790,7 +1790,7 @@
{10003, []uint64{1, 2, 3}, 10 * units.KiB, 500 * units.KiB, 0},
{10004, []uint64{1, 2, 3}, 500 * units.KiB, 500 * units.KiB, 0},
})
stats = tc.RegionStats(utils.Write, 0)
stats = tc.HotCache.GetHotPeerStats(utils.Write, 0)
re.Len(stats[1], 503)
re.Len(stats[2], 503)
re.Len(stats[3], 503)
7 changes: 4 additions & 3 deletions pkg/statistics/hot_cache.go
@@ -76,11 +76,12 @@ func (w *HotCache) CheckReadAsync(task func(cache *HotPeerCache)) bool {
}
}

// RegionStats returns hot items according to kind
func (w *HotCache) RegionStats(kind utils.RWType, minHotDegree int) map[uint64][]*HotPeerStat {
// RegionStats returns the read or write statistics for hot regions.
// It returns a map where the keys are store IDs and the values are slices of HotPeerStat.
func (w *HotCache) GetHotPeerStats(kind utils.RWType, minHotDegree int) map[uint64][]*HotPeerStat {
ret := make(chan map[uint64][]*HotPeerStat, 1)
collectRegionStatsTask := func(cache *HotPeerCache) {
ret <- cache.RegionStats(minHotDegree)
ret <- cache.GetHotPeerStats(minHotDegree)
}
var succ bool
switch kind {
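
GetHotPeerStats on HotCache does not read the per-kind caches directly: it enqueues a task onto the cache's worker goroutine (via CheckReadAsync or CheckWriteAsync, depending on kind) and waits for the result on a buffered channel of size one. A self-contained sketch of that hand-off pattern; the peerCache type and the task channel here are simplified stand-ins, not the actual HotCache internals:

// task_sketch.go (illustrative only)
package main

import "fmt"

// peerCache stands in for HotPeerCache; a single goroutine owns it.
type peerCache struct {
	stats map[uint64][]string // map[storeID] -> hot peer descriptions
}

func main() {
	cache := &peerCache{stats: map[uint64][]string{1: {"region-2", "region-3"}}}
	tasks := make(chan func(*peerCache), 16)

	// Worker goroutine: the only code that touches the cache, so no locks.
	go func() {
		for task := range tasks {
			task(cache)
		}
	}()

	// Caller side: post a task and wait on a one-slot result channel,
	// the same hand-off GetHotPeerStats uses to collect stats.
	ret := make(chan map[uint64][]string, 1)
	tasks <- func(c *peerCache) { ret <- c.stats }
	fmt.Println(<-ret)
}
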
6 changes: 3 additions & 3 deletions pkg/statistics/hot_peer_cache.go
@@ -84,9 +84,9 @@ func NewHotPeerCache(ctx context.Context, kind utils.RWType) *HotPeerCache {
}
}

// TODO: rename RegionStats as PeerStats
// RegionStats returns hot items
func (f *HotPeerCache) RegionStats(minHotDegree int) map[uint64][]*HotPeerStat {
// GetHotPeerStats returns the read or write statistics for hot regions.
// It returns a map where the keys are store IDs and the values are slices of HotPeerStat.
func (f *HotPeerCache) GetHotPeerStats(minHotDegree int) map[uint64][]*HotPeerStat {
res := make(map[uint64][]*HotPeerStat)
defaultAntiCount := f.kind.DefaultAntiCount()
for storeID, peers := range f.peersOfStore {
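
At the cache level, GetHotPeerStats walks peersOfStore and keeps, per store, only the peers that are hot enough for the given minHotDegree. A simplified, self-contained sketch of that filtering; the hotPeerStat struct here carries only the fields needed for the example, and the real filter also consults the kind's default anti count:

// filter_sketch.go (illustrative only)
package main

import "fmt"

// hotPeerStat carries only the fields needed for this sketch.
type hotPeerStat struct {
	RegionID  uint64
	HotDegree int
}

// hotPeersByStore keeps, per store, only peers whose hot degree reaches minHotDegree.
func hotPeersByStore(peersOfStore map[uint64][]hotPeerStat, minHotDegree int) map[uint64][]hotPeerStat {
	res := make(map[uint64][]hotPeerStat)
	for storeID, peers := range peersOfStore {
		for _, p := range peers {
			if p.HotDegree >= minHotDegree {
				res[storeID] = append(res[storeID], p)
			}
		}
	}
	return res
}

func main() {
	all := map[uint64][]hotPeerStat{
		1: {{RegionID: 10, HotDegree: 5}, {RegionID: 11, HotDegree: 1}},
		2: {{RegionID: 12, HotDegree: 3}},
	}
	fmt.Println(hotPeersByStore(all, 3)) // store 1 keeps region 10, store 2 keeps region 12
}
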
2 changes: 1 addition & 1 deletion pkg/statistics/hot_peer_cache_test.go
@@ -39,7 +39,7 @@ func TestStoreTimeUnsync(t *testing.T) {
region := buildRegion(utils.Write, 3, interval)
checkAndUpdate(re, cache, region, 3)
{
stats := cache.RegionStats(0)
stats := cache.GetHotPeerStats(0)
re.Len(stats, 3)
for _, s := range stats {
re.Len(s, 1)
8 changes: 3 additions & 5 deletions pkg/statistics/region_stat_informer.go
@@ -23,10 +23,8 @@ import (
type RegionStatInformer interface {
GetHotPeerStat(rw utils.RWType, regionID, storeID uint64) *HotPeerStat
IsRegionHot(region *core.RegionInfo) bool
// RegionWriteStats return the storeID -> write stat of peers on this store.
// GetHotPeerStats return the read or write statistics for hot regions.
// It returns a map where the keys are store IDs and the values are slices of HotPeerStat.
// The result only includes peers that are hot enough.
RegionWriteStats() map[uint64][]*HotPeerStat
// RegionReadStats return the storeID -> read stat of peers on this store.
// The result only includes peers that are hot enough.
RegionReadStats() map[uint64][]*HotPeerStat
GetHotPeerStats(rw utils.RWType) map[uint64][]*HotPeerStat
}
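
The RegionStatInformer interface thus shrinks from two per-kind methods to a single method keyed by RWType. A minimal implementation-and-caller sketch of the new shape; mockInformer, the local RWType, and the int slices standing in for []*HotPeerStat are all hypothetical and exist only for this example:

// informer_sketch.go (illustrative only)
package main

import "fmt"

// RWType mimics utils.RWType for this sketch only.
type RWType int

const (
	Write RWType = iota
	Read
)

// regionStatInformer mirrors the narrowed interface: one method keyed by
// RWType instead of RegionReadStats plus RegionWriteStats.
type regionStatInformer interface {
	GetHotPeerStats(rw RWType) map[uint64][]int // int slices stand in for []*HotPeerStat
}

// mockInformer is a hypothetical implementation used only for this sketch.
type mockInformer struct{}

func (mockInformer) GetHotPeerStats(rw RWType) map[uint64][]int {
	if rw == Read {
		return map[uint64][]int{1: {7}}
	}
	return map[uint64][]int{2: {9}}
}

func main() {
	var informer regionStatInformer = mockInformer{}
	fmt.Println(informer.GetHotPeerStats(Read))  // map[1:[7]]
	fmt.Println(informer.GetHotPeerStats(Write)) // map[2:[9]]
}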