diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index 1ca5b5cc665..00759d9b902 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -65,25 +65,14 @@ type Cluster struct { func NewCluster(ctx context.Context, opts *config.PersistOptions) *Cluster { bc := core.NewBasicCluster() c := &Cluster{ -<<<<<<< HEAD ctx: ctx, - BasicCluster: core.NewBasicCluster(), + BasicCluster: bc, IDAllocator: mockid.NewIDAllocator(), - HotStat: statistics.NewHotStat(ctx), + HotStat: statistics.NewHotStat(ctx, bc), HotBucketCache: buckets.NewBucketsCache(ctx), PersistOptions: opts, suspectRegions: map[uint64]struct{}{}, Storage: storage.NewStorageWithMemoryBackend(), -======= - ctx: ctx, - BasicCluster: bc, - IDAllocator: mockid.NewIDAllocator(), - HotStat: statistics.NewHotStat(ctx, bc), - HotBucketCache: buckets.NewBucketsCache(ctx), - PersistOptions: opts, - pendingProcessedRegions: map[uint64]struct{}{}, - Storage: storage.NewStorageWithMemoryBackend(), ->>>>>>> 20087e290 (statistics: add gc in hot peer cache (#8702)) } if c.PersistOptions.GetReplicationConfig().EnablePlacementRules { c.initRuleManager() diff --git a/pkg/schedule/schedulers/hot_region_test.go b/pkg/schedule/schedulers/hot_region_test.go index 090f430b2ab..29a4417344f 100644 --- a/pkg/schedule/schedulers/hot_region_test.go +++ b/pkg/schedule/schedulers/hot_region_test.go @@ -1657,13 +1657,10 @@ func TestHotCacheUpdateCache(t *testing.T) { re := require.New(t) cancel, _, tc, _ := prepareSchedulersTest() defer cancel() -<<<<<<< HEAD tc.SetHotRegionCacheHitsThreshold(0) -======= - for i := range 3 { + for i := 0; i < 3; i++ { tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)})) } ->>>>>>> 20087e290 (statistics: add gc in hot peer cache (#8702)) // For read flow addRegionInfo(tc, utils.Read, []testRegionInfo{ @@ -1730,13 +1727,10 @@ func TestHotCacheKeyThresholds(t *testing.T) { { // only a few regions cancel, _, tc, _ := prepareSchedulersTest() defer cancel() -<<<<<<< HEAD tc.SetHotRegionCacheHitsThreshold(0) -======= - for i := range 6 { + for i := 0; i < 6; i++ { tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)})) } ->>>>>>> 20087e290 (statistics: add gc in hot peer cache (#8702)) addRegionInfo(tc, utils.Read, []testRegionInfo{ {1, []uint64{1, 2, 3}, 0, 1, 0}, {2, []uint64{1, 2, 3}, 0, 1 * units.KiB, 0}, @@ -1755,7 +1749,7 @@ func TestHotCacheKeyThresholds(t *testing.T) { { // many regions cancel, _, tc, _ := prepareSchedulersTest() defer cancel() - for i := range 3 { + for i := 0; i < 3; i++ { tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)})) } regions := []testRegionInfo{} @@ -1811,13 +1805,10 @@ func TestHotCacheByteAndKey(t *testing.T) { re := require.New(t) cancel, _, tc, _ := prepareSchedulersTest() defer cancel() -<<<<<<< HEAD tc.SetHotRegionCacheHitsThreshold(0) -======= - for i := range 3 { + for i := 0; i < 3; i++ { tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)})) } ->>>>>>> 20087e290 (statistics: add gc in hot peer cache (#8702)) statistics.ThresholdsUpdateInterval = 0 defer func() { statistics.ThresholdsUpdateInterval = 8 * time.Second @@ -1944,7 +1935,7 @@ func TestHotCacheCheckRegionFlow(t *testing.T) { func checkHotCacheCheckRegionFlow(re *require.Assertions, testCase testHotCacheCheckRegionFlowCase, enablePlacementRules bool) { cancel, _, tc, oc := prepareSchedulersTest() defer cancel() - for i := range 3 { + for i := 0; i < 3; i++ { tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)})) } tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) @@ -2022,7 +2013,7 @@ func TestHotCacheCheckRegionFlowWithDifferentThreshold(t *testing.T) { func checkHotCacheCheckRegionFlowWithDifferentThreshold(re *require.Assertions, enablePlacementRules bool) { cancel, _, tc, _ := prepareSchedulersTest() defer cancel() - for i := range 3 { + for i := 0; i < 3; i++ { tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)})) } tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) diff --git a/pkg/statistics/hot_cache_test.go b/pkg/statistics/hot_cache_test.go deleted file mode 100644 index b0232b658d4..00000000000 --- a/pkg/statistics/hot_cache_test.go +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright 2024 TiKV Project Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package statistics - -import ( - "context" - "testing" - - "github.com/stretchr/testify/require" - "github.com/tikv/pd/pkg/core" - "github.com/tikv/pd/pkg/statistics/utils" -) - -func TestIsHot(t *testing.T) { - re := require.New(t) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - for i := utils.RWType(0); i < utils.RWTypeLen; i++ { - cluster := core.NewBasicCluster() - cache := NewHotCache(ctx, cluster) - region := buildRegion(cluster, i, 3, 60) - stats := cache.CheckReadPeerSync(region, region.GetPeers(), []float64{100000000, 1000, 1000}, 60) - cache.Update(stats[0], i) - for range 100 { - re.True(cache.IsRegionHot(region, 1)) - } - } -} diff --git a/pkg/statistics/hot_peer_cache.go b/pkg/statistics/hot_peer_cache.go index d64c0d58319..f5700f3fa83 100644 --- a/pkg/statistics/hot_peer_cache.go +++ b/pkg/statistics/hot_peer_cache.go @@ -71,15 +71,9 @@ type hotPeerCache struct { lastGCTime time.Time } -<<<<<<< HEAD // NewHotPeerCache creates a hotPeerCache -func NewHotPeerCache(ctx context.Context, kind utils.RWType) *hotPeerCache { +func NewHotPeerCache(ctx context.Context, cluster *core.BasicCluster, kind utils.RWType) *hotPeerCache { return &hotPeerCache{ -======= -// NewHotPeerCache creates a HotPeerCache -func NewHotPeerCache(ctx context.Context, cluster *core.BasicCluster, kind utils.RWType) *HotPeerCache { - return &HotPeerCache{ ->>>>>>> 20087e290 (statistics: add gc in hot peer cache (#8702)) kind: kind, cluster: cluster, peersOfStore: make(map[uint64]*utils.TopN), @@ -553,7 +547,7 @@ func (f *hotPeerCache) removeItem(item *HotPeerStat) { } } -func (f *HotPeerCache) gc() { +func (f *hotPeerCache) gc() { if time.Since(f.lastGCTime) < f.topNTTL { return } diff --git a/pkg/statistics/hot_peer_cache_test.go b/pkg/statistics/hot_peer_cache_test.go index 93484089dc6..8e69e4e4141 100644 --- a/pkg/statistics/hot_peer_cache_test.go +++ b/pkg/statistics/hot_peer_cache_test.go @@ -33,26 +33,6 @@ import ( "github.com/tikv/pd/pkg/utils/typeutil" ) -<<<<<<< HEAD -func TestStoreTimeUnsync(t *testing.T) { - re := require.New(t) - cache := NewHotPeerCache(context.Background(), utils.Write) - intervals := []uint64{120, 60} - for _, interval := range intervals { - region := buildRegion(utils.Write, 3, interval) - checkAndUpdate(re, cache, region, 3) - { - stats := cache.RegionStats(0) - re.Len(stats, 3) - for _, s := range stats { - re.Len(s, 1) - } - } - } -} - -======= ->>>>>>> 20087e290 (statistics: add gc in hot peer cache (#8702)) type operator int const ( @@ -272,7 +252,7 @@ func getIDAllocator() *mockid.IDAllocator { func buildRegion(cluster *core.BasicCluster, kind utils.RWType, peerCount int, interval uint64) (region *core.RegionInfo) { peers := make([]*metapb.Peer, 0, peerCount) - for range peerCount { + for i := 0; i < peerCount; i++ { id, _ := getIDAllocator().Alloc() storeID, _ := getIDAllocator().Alloc() peers = append(peers, &metapb.Peer{ @@ -698,12 +678,8 @@ func TestHotPeerCacheTopNThreshold(t *testing.T) { cluster := core.NewBasicCluster() cache := NewHotPeerCache(context.Background(), cluster, utils.Write) now := time.Now() -<<<<<<< HEAD - for id := uint64(0); id < 100; id++ { -======= storeID := uint64(1) - for id := range uint64(100) { ->>>>>>> 20087e290 (statistics: add gc in hot peer cache (#8702)) + for id := uint64(0); id < 100; id++ { meta := &metapb.Region{ Id: id, Peers: []*metapb.Peer{{Id: id, StoreId: storeID}}, @@ -778,7 +754,7 @@ func TestDifferentReportInterval(t *testing.T) { for _, interval := range []uint64{120, 60, 30} { region = region.Clone(core.SetReportInterval(0, interval)) checkAndUpdate(re, cache, region, 3) - stats := cache.GetHotPeerStats(0) + stats := cache.RegionStats(0) re.Len(stats, 3) for _, s := range stats { re.Len(s, 1) @@ -787,19 +763,14 @@ func TestDifferentReportInterval(t *testing.T) { } func BenchmarkCheckRegionFlow(b *testing.B) { -<<<<<<< HEAD - cache := NewHotPeerCache(context.Background(), utils.Read) - region := buildRegion(utils.Read, 3, 10) + cluster := core.NewBasicCluster() + cache := NewHotPeerCache(context.Background(), cluster, utils.Read) + region := buildRegion(cluster, utils.Read, 3, 10) peerInfos := make([]*core.PeerInfo, 0) for _, peer := range region.GetPeers() { peerInfo := core.NewPeerInfo(peer, region.GetLoads(), 10) peerInfos = append(peerInfos, peerInfo) } -======= - cluster := core.NewBasicCluster() - cache := NewHotPeerCache(context.Background(), cluster, utils.Read) - region := buildRegion(cluster, utils.Read, 3, 10) ->>>>>>> 20087e290 (statistics: add gc in hot peer cache (#8702)) b.ResetTimer() for i := 0; i < b.N; i++ { items := make([]*HotPeerStat, 0) diff --git a/pkg/statistics/utils/topn.go b/pkg/statistics/utils/topn.go index bb3029f1ace..c010f1f8481 100644 --- a/pkg/statistics/utils/topn.go +++ b/pkg/statistics/utils/topn.go @@ -96,12 +96,7 @@ func (tn *TopN) Put(item TopNItem) (isUpdate bool) { for _, stn := range tn.topns { isUpdate = stn.Put(item) } -<<<<<<< HEAD tn.ttlLst.Put(item.ID()) - tn.maintain() -======= - tn.ttlLst.put(item.ID()) ->>>>>>> 20087e290 (statistics: add gc in hot peer cache (#8702)) return } @@ -119,28 +114,16 @@ func (tn *TopN) Remove(id uint64) (item TopNItem) { for _, stn := range tn.topns { item = stn.Remove(id) } -<<<<<<< HEAD _ = tn.ttlLst.Remove(id) - tn.maintain() - return -} - -func (tn *TopN) maintain() { - for _, id := range tn.ttlLst.TakeExpired() { - for _, stn := range tn.topns { - stn.Remove(id) -======= - _ = tn.ttlLst.remove(id) return } func (tn *TopN) maintain() []uint64 { ids := make([]uint64, 0) - for _, id := range tn.ttlLst.takeExpired() { + for _, id := range tn.ttlLst.TakeExpired() { for _, stn := range tn.topns { - stn.remove(id) + stn.Remove(id) ids = append(ids, id) ->>>>>>> 20087e290 (statistics: add gc in hot peer cache (#8702)) } } return ids diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index fb381a4f63f..c65369fa63a 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -604,7 +604,7 @@ func TestRegionHeartbeatHotStat(t *testing.T) { cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil) stores := newTestStores(4, "2.0.0") for _, store := range stores { - cluster.PutStore(store) + cluster.PutStore(store.GetMeta()) } peers := []*metapb.Peer{ { diff --git a/server/cluster/scheduling_controller.go b/server/cluster/scheduling_controller.go index 6f73fbf2f6b..6a63060022f 100644 --- a/server/cluster/scheduling_controller.go +++ b/server/cluster/scheduling_controller.go @@ -67,13 +67,8 @@ func newSchedulingController(parentCtx context.Context, basicCluster *core.Basic BasicCluster: basicCluster, opt: opt, labelStats: statistics.NewLabelStatistics(), -<<<<<<< HEAD - hotStat: statistics.NewHotStat(parentCtx), - slowStat: statistics.NewSlowStat(parentCtx), -======= hotStat: statistics.NewHotStat(parentCtx, basicCluster), - slowStat: statistics.NewSlowStat(), ->>>>>>> 20087e290 (statistics: add gc in hot peer cache (#8702)) + slowStat: statistics.NewSlowStat(parentCtx), regionStats: statistics.NewRegionStatistics(basicCluster, opt, ruleManager), } }