This is an automated cherry-pick of tikv#8702
close tikv#8698

Signed-off-by: ti-chi-bot <[email protected]>
lhy1024 authored and ti-chi-bot committed Dec 11, 2024
1 parent 57d5bdc commit 70bb1e1
Showing 12 changed files with 274 additions and 49 deletions.
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/cluster.go
@@ -75,7 +75,7 @@ func NewCluster(parentCtx context.Context, persistConfig *config.PersistConfig,
ruleManager: ruleManager,
labelerManager: labelerManager,
persistConfig: persistConfig,
hotStat: statistics.NewHotStat(ctx),
hotStat: statistics.NewHotStat(ctx, basicCluster),
labelStats: statistics.NewLabelStatistics(),
regionStats: statistics.NewRegionStatistics(basicCluster, persistConfig, ruleManager),
storage: storage,
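The only functional change in this file is the constructor call: NewHotStat now receives the cluster's BasicCluster, which the hot peer caches later use to check which stores still exist when they garbage-collect. A minimal sketch of the new call site (illustrative only, not part of the commit; the surrounding wiring is assumed and the return value is simply discarded):

// Illustrative sketch: the hot statistics are now built from the shared
// BasicCluster instead of the context alone.
package main

import (
	"context"

	"github.com/tikv/pd/pkg/core"
	"github.com/tikv/pd/pkg/statistics"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	basicCluster := core.NewBasicCluster()
	hotStat := statistics.NewHotStat(ctx, basicCluster) // previously statistics.NewHotStat(ctx)
	_ = hotStat
}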
12 changes: 12 additions & 0 deletions pkg/mock/mockcluster/mockcluster.go
@@ -63,7 +63,9 @@ type Cluster struct {

// NewCluster creates a new Cluster
func NewCluster(ctx context.Context, opts *config.PersistOptions) *Cluster {
bc := core.NewBasicCluster()
c := &Cluster{
<<<<<<< HEAD
ctx: ctx,
BasicCluster: core.NewBasicCluster(),
IDAllocator: mockid.NewIDAllocator(),
@@ -72,6 +74,16 @@ func NewCluster(ctx context.Context, opts *config.PersistOptions) *Cluster {
PersistOptions: opts,
suspectRegions: map[uint64]struct{}{},
Storage: storage.NewStorageWithMemoryBackend(),
=======
ctx: ctx,
BasicCluster: bc,
IDAllocator: mockid.NewIDAllocator(),
HotStat: statistics.NewHotStat(ctx, bc),
HotBucketCache: buckets.NewBucketsCache(ctx),
PersistOptions: opts,
pendingProcessedRegions: map[uint64]struct{}{},
Storage: storage.NewStorageWithMemoryBackend(),
>>>>>>> 20087e290 (statistics: add gc in hot peer cache (#8702))
}
if c.PersistOptions.GetReplicationConfig().EnablePlacementRules {
c.initRuleManager()
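The cherry-pick left this conflict unresolved: the release branch still names the field suspectRegions, while the source commit of tikv#8702 uses pendingProcessedRegions. One plausible resolution for this branch, shown as it would sit inside NewCluster in pkg/mock/mockcluster, keeps the branch's field names and takes only the shared BasicCluster wiring from the incoming side. This is an assumption for illustration, not the resolution the maintainers actually committed, and it presumes the HotStat and HotBucketCache fields hidden in the collapsed part of the HEAD hunk.

// Hypothetical resolution of the conflict above (illustration only).
bc := core.NewBasicCluster()
c := &Cluster{
	ctx:            ctx,
	BasicCluster:   bc,
	IDAllocator:    mockid.NewIDAllocator(),
	HotStat:        statistics.NewHotStat(ctx, bc), // take the new signature from tikv#8702
	HotBucketCache: buckets.NewBucketsCache(ctx),
	PersistOptions: opts,
	suspectRegions: map[uint64]struct{}{}, // keep the release branch's field name
	Storage:        storage.NewStorageWithMemoryBackend(),
}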
27 changes: 27 additions & 0 deletions pkg/schedule/schedulers/hot_region_test.go
@@ -1657,7 +1657,13 @@ func TestHotCacheUpdateCache(t *testing.T) {
re := require.New(t)
cancel, _, tc, _ := prepareSchedulersTest()
defer cancel()
<<<<<<< HEAD
tc.SetHotRegionCacheHitsThreshold(0)
=======
for i := range 3 {
tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)}))
}
>>>>>>> 20087e290 (statistics: add gc in hot peer cache (#8702))

// For read flow
addRegionInfo(tc, utils.Read, []testRegionInfo{
@@ -1724,7 +1730,13 @@ func TestHotCacheKeyThresholds(t *testing.T) {
{ // only a few regions
cancel, _, tc, _ := prepareSchedulersTest()
defer cancel()
<<<<<<< HEAD
tc.SetHotRegionCacheHitsThreshold(0)
=======
for i := range 6 {
tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)}))
}
>>>>>>> 20087e290 (statistics: add gc in hot peer cache (#8702))
addRegionInfo(tc, utils.Read, []testRegionInfo{
{1, []uint64{1, 2, 3}, 0, 1, 0},
{2, []uint64{1, 2, 3}, 0, 1 * units.KiB, 0},
@@ -1743,6 +1755,9 @@
{ // many regions
cancel, _, tc, _ := prepareSchedulersTest()
defer cancel()
for i := range 3 {
tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)}))
}
regions := []testRegionInfo{}
for i := 1; i <= 1000; i += 2 {
regions = append(regions,
@@ -1796,7 +1811,13 @@ func TestHotCacheByteAndKey(t *testing.T) {
re := require.New(t)
cancel, _, tc, _ := prepareSchedulersTest()
defer cancel()
<<<<<<< HEAD
tc.SetHotRegionCacheHitsThreshold(0)
=======
for i := range 3 {
tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)}))
}
>>>>>>> 20087e290 (statistics: add gc in hot peer cache (#8702))
statistics.ThresholdsUpdateInterval = 0
defer func() {
statistics.ThresholdsUpdateInterval = 8 * time.Second
@@ -1923,6 +1944,9 @@ func TestHotCacheCheckRegionFlow(t *testing.T) {
func checkHotCacheCheckRegionFlow(re *require.Assertions, testCase testHotCacheCheckRegionFlowCase, enablePlacementRules bool) {
cancel, _, tc, oc := prepareSchedulersTest()
defer cancel()
for i := range 3 {
tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)}))
}
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
tc.SetEnablePlacementRules(enablePlacementRules)
labels := []string{"zone", "host"}
@@ -1998,6 +2022,9 @@ func TestHotCacheCheckRegionFlowWithDifferentThreshold(t *testing.T) {
func checkHotCacheCheckRegionFlowWithDifferentThreshold(re *require.Assertions, enablePlacementRules bool) {
cancel, _, tc, _ := prepareSchedulersTest()
defer cancel()
for i := range 3 {
tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)}))
}
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
tc.SetEnablePlacementRules(enablePlacementRules)
labels := []string{"zone", "host"}
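The recurring change in these tests is the same setup step: each test now registers its stores before feeding hot-peer statistics, because the cache's new GC pass drops statistics for stores it cannot find in the BasicCluster. A hypothetical helper capturing that pattern is sketched below; it is not part of the commit, and the package name and the *mockcluster.Cluster receiver type are assumptions.

// Hypothetical test helper (not in the commit): register n stores up front so
// the hot peer cache GC does not treat their statistics as belonging to
// removed stores.
package testutil // hypothetical package, for illustration only

import (
	"github.com/pingcap/kvproto/pkg/metapb"
	"github.com/tikv/pd/pkg/core"
	"github.com/tikv/pd/pkg/mock/mockcluster"
)

func addStores(tc *mockcluster.Cluster, n uint64) {
	for id := uint64(1); id <= n; id++ {
		tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: id}))
	}
}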
6 changes: 3 additions & 3 deletions pkg/statistics/hot_cache.go
@@ -38,11 +38,11 @@ type HotCache struct {
}

// NewHotCache creates a new hot spot cache.
func NewHotCache(ctx context.Context) *HotCache {
func NewHotCache(ctx context.Context, cluster *core.BasicCluster) *HotCache {
w := &HotCache{
ctx: ctx,
writeCache: NewHotPeerCache(ctx, utils.Write),
readCache: NewHotPeerCache(ctx, utils.Read),
writeCache: NewHotPeerCache(ctx, cluster, utils.Write),
readCache: NewHotPeerCache(ctx, cluster, utils.Read),
}
go w.updateItems(w.readCache.taskQueue, w.runReadTask)
go w.updateItems(w.writeCache.taskQueue, w.runWriteTask)
40 changes: 40 additions & 0 deletions pkg/statistics/hot_cache_test.go
@@ -0,0 +1,40 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package statistics

import (
"context"
"testing"

"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/statistics/utils"
)

func TestIsHot(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for i := utils.RWType(0); i < utils.RWTypeLen; i++ {
cluster := core.NewBasicCluster()
cache := NewHotCache(ctx, cluster)
region := buildRegion(cluster, i, 3, 60)
stats := cache.CheckReadPeerSync(region, region.GetPeers(), []float64{100000000, 1000, 1000}, 60)
cache.Update(stats[0], i)
for range 100 {
re.True(cache.IsRegionHot(region, 1))
}
}
}
41 changes: 40 additions & 1 deletion pkg/statistics/hot_peer_cache.go
@@ -60,20 +60,28 @@ type thresholds struct {
// hotPeerCache saves the hot peer's statistics.
type hotPeerCache struct {
kind utils.RWType
cluster *core.BasicCluster
peersOfStore map[uint64]*utils.TopN // storeID -> hot peers
storesOfRegion map[uint64]map[uint64]struct{} // regionID -> storeIDs
regionsOfStore map[uint64]map[uint64]struct{} // storeID -> regionIDs
topNTTL time.Duration
taskQueue *chanx.UnboundedChan[FlowItemTask]
thresholdsOfStore map[uint64]*thresholds // storeID -> thresholds
metrics map[uint64][utils.ActionTypeLen]prometheus.Gauge // storeID -> metrics
// TODO: consider to remove store info when store is offline.
lastGCTime time.Time
}

<<<<<<< HEAD
// NewHotPeerCache creates a hotPeerCache
func NewHotPeerCache(ctx context.Context, kind utils.RWType) *hotPeerCache {
return &hotPeerCache{
=======
// NewHotPeerCache creates a HotPeerCache
func NewHotPeerCache(ctx context.Context, cluster *core.BasicCluster, kind utils.RWType) *HotPeerCache {
return &HotPeerCache{
>>>>>>> 20087e290 (statistics: add gc in hot peer cache (#8702))
kind: kind,
cluster: cluster,
peersOfStore: make(map[uint64]*utils.TopN),
storesOfRegion: make(map[uint64]map[uint64]struct{}),
regionsOfStore: make(map[uint64]map[uint64]struct{}),
@@ -114,6 +122,7 @@ func (f *hotPeerCache) updateStat(item *HotPeerStat) {
return
}
f.incMetrics(item.actionType, item.StoreID)
f.gc()
}

func (f *hotPeerCache) incMetrics(action utils.ActionType, storeID uint64) {
@@ -544,6 +553,36 @@ func (f *hotPeerCache) removeItem(item *HotPeerStat) {
}
}

func (f *HotPeerCache) gc() {
if time.Since(f.lastGCTime) < f.topNTTL {
return
}
f.lastGCTime = time.Now()
// remove tombstone stores
stores := make(map[uint64]struct{})
for _, storeID := range f.cluster.GetStores() {
stores[storeID.GetID()] = struct{}{}
}
for storeID := range f.peersOfStore {
if _, ok := stores[storeID]; !ok {
delete(f.peersOfStore, storeID)
delete(f.regionsOfStore, storeID)
delete(f.thresholdsOfStore, storeID)
delete(f.metrics, storeID)
}
}
// remove expired items
for _, peers := range f.peersOfStore {
regions := peers.RemoveExpired()
for _, regionID := range regions {
delete(f.storesOfRegion, regionID)
for storeID := range f.regionsOfStore {
delete(f.regionsOfStore[storeID], regionID)
}
}
}
}

// removeAllItem removes all items of the cache.
// It is used for test.
func (f *hotPeerCache) removeAllItem() {
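The core of the change is the gc method above: it runs at most once per topNTTL, first drops all per-store state for stores no longer returned by the BasicCluster, then lets the TopN structures expel expired items via RemoveExpired and clears the region-to-store indexes accordingly. The following is a self-contained sketch of the same two-phase pattern using plain maps instead of pd's types; it is illustrative only and uses no pd API.

// Self-contained illustration of the two-phase GC the diff adds to the hot
// peer cache: a time-gated sweep that first drops state for stores no longer
// in the cluster, then evicts per-region entries whose TTL has expired.
package main

import (
	"fmt"
	"time"
)

type peerCache struct {
	lastGCTime   time.Time
	ttl          time.Duration
	peersOfStore map[uint64]map[uint64]time.Time // storeID -> regionID -> last update
	liveStores   func() []uint64                 // stands in for cluster.GetStores()
}

func (c *peerCache) gc() {
	// Rate-limit the sweep: at most one full pass per TTL.
	if time.Since(c.lastGCTime) < c.ttl {
		return
	}
	c.lastGCTime = time.Now()

	// Phase 1: remove state belonging to stores that have left the cluster.
	live := make(map[uint64]struct{})
	for _, id := range c.liveStores() {
		live[id] = struct{}{}
	}
	for storeID := range c.peersOfStore {
		if _, ok := live[storeID]; !ok {
			delete(c.peersOfStore, storeID)
		}
	}

	// Phase 2: evict entries that have not been updated within the TTL.
	for _, peers := range c.peersOfStore {
		for regionID, updated := range peers {
			if time.Since(updated) > c.ttl {
				delete(peers, regionID)
			}
		}
	}
}

func main() {
	c := &peerCache{
		ttl: time.Minute,
		peersOfStore: map[uint64]map[uint64]time.Time{
			1: {10: time.Now()},                       // live store, fresh region
			2: {20: time.Now().Add(-2 * time.Minute)}, // store 2 has left the cluster
		},
		liveStores: func() []uint64 { return []uint64{1} },
	}
	c.gc()
	fmt.Println(len(c.peersOfStore)) // 1: store 2 was swept away
}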