From e5524877ab669ff6cc4f2fa84e7707a8da34fdc4 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Thu, 8 Aug 2024 10:39:03 +0800 Subject: [PATCH] use ttl cache Signed-off-by: Ryan Leung --- pkg/schedule/checker/checker_controller.go | 13 ++++++------- pkg/schedule/checker/replica_checker.go | 4 ++-- pkg/schedule/checker/replica_checker_test.go | 20 ++++++++++---------- pkg/schedule/checker/rule_checker.go | 4 ++-- pkg/schedule/checker/rule_checker_test.go | 4 ++-- 5 files changed, 22 insertions(+), 23 deletions(-) diff --git a/pkg/schedule/checker/checker_controller.go b/pkg/schedule/checker/checker_controller.go index 4704996f7d9..da36c8db74f 100644 --- a/pkg/schedule/checker/checker_controller.go +++ b/pkg/schedule/checker/checker_controller.go @@ -68,7 +68,7 @@ type Controller struct { mergeChecker *MergeChecker jointStateChecker *JointStateChecker priorityInspector *PriorityInspector - pendingProcessedRegions cache.Cache + pendingProcessedRegions *cache.TTLUint64 suspectKeyRanges *cache.TTLString // suspect key-range regions that may need fix // duration is the duration of the last patrol round. @@ -88,7 +88,7 @@ type Controller struct { // NewController create a new Controller. func NewController(ctx context.Context, cluster sche.CheckerCluster, conf config.CheckerConfigProvider, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler, opController *operator.Controller) *Controller { - pendingProcessedRegions := cache.NewDefaultCache(DefaultPendingRegionCacheSize) + pendingProcessedRegions := cache.NewIDTTL(ctx, time.Minute, 3*time.Minute) return &Controller{ ctx: ctx, cluster: cluster, @@ -327,16 +327,15 @@ func (c *Controller) GetRuleChecker() *RuleChecker { // GetPendingProcessedRegions returns the pending processed regions in the cache. func (c *Controller) GetPendingProcessedRegions() []uint64 { - pendingRegions := make([]uint64, 0) - for _, item := range c.pendingProcessedRegions.Elems() { - pendingRegions = append(pendingRegions, item.Key) - } - return pendingRegions + return c.pendingProcessedRegions.GetAllID() } // AddPendingProcessedRegions adds the pending processed region into the cache. func (c *Controller) AddPendingProcessedRegions(ids ...uint64) { for _, id := range ids { + if c.pendingProcessedRegions.Len() > DefaultPendingRegionCacheSize { + return + } c.pendingProcessedRegions.Put(id, nil) } } diff --git a/pkg/schedule/checker/replica_checker.go b/pkg/schedule/checker/replica_checker.go index 6be5432125b..b0c42e88258 100644 --- a/pkg/schedule/checker/replica_checker.go +++ b/pkg/schedule/checker/replica_checker.go @@ -44,11 +44,11 @@ type ReplicaChecker struct { PauseController cluster sche.CheckerCluster conf config.CheckerConfigProvider - pendingProcessedRegions cache.Cache + pendingProcessedRegions *cache.TTLUint64 } // NewReplicaChecker creates a replica checker. -func NewReplicaChecker(cluster sche.CheckerCluster, conf config.CheckerConfigProvider, pendingProcessedRegions cache.Cache) *ReplicaChecker { +func NewReplicaChecker(cluster sche.CheckerCluster, conf config.CheckerConfigProvider, pendingProcessedRegions *cache.TTLUint64) *ReplicaChecker { return &ReplicaChecker{ cluster: cluster, conf: conf, diff --git a/pkg/schedule/checker/replica_checker_test.go b/pkg/schedule/checker/replica_checker_test.go index a9139ee9804..da04fb6d768 100644 --- a/pkg/schedule/checker/replica_checker_test.go +++ b/pkg/schedule/checker/replica_checker_test.go @@ -51,7 +51,7 @@ func (suite *replicaCheckerTestSuite) SetupTest() { suite.ctx, suite.cancel = context.WithCancel(context.Background()) suite.cluster = mockcluster.NewCluster(suite.ctx, cfg) suite.cluster.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) - suite.rc = NewReplicaChecker(suite.cluster, suite.cluster.GetCheckerConfig(), cache.NewDefaultCache(10)) + suite.rc = NewReplicaChecker(suite.cluster, suite.cluster.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute)) stats := &pdpb.StoreStats{ Capacity: 100, Available: 100, @@ -213,7 +213,7 @@ func (suite *replicaCheckerTestSuite) TestBasic() { tc := mockcluster.NewCluster(suite.ctx, opt) tc.SetMaxSnapshotCount(2) tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) - rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10)) + rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute)) // Add stores 1,2,3,4. tc.AddRegionStore(1, 4) @@ -290,7 +290,7 @@ func (suite *replicaCheckerTestSuite) TestLostStore() { tc.AddRegionStore(1, 1) tc.AddRegionStore(2, 1) - rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10)) + rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute)) // now region peer in store 1,2,3.but we just have store 1,2 // This happens only in recovering the PD tc @@ -309,7 +309,7 @@ func (suite *replicaCheckerTestSuite) TestOffline() { tc.SetMaxReplicas(3) tc.SetLocationLabels([]string{"zone", "rack", "host"}) - rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10)) + rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute)) tc.AddLabelsStore(1, 1, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"}) tc.AddLabelsStore(2, 2, map[string]string{"zone": "z2", "rack": "r1", "host": "h1"}) tc.AddLabelsStore(3, 3, map[string]string{"zone": "z3", "rack": "r1", "host": "h1"}) @@ -361,7 +361,7 @@ func (suite *replicaCheckerTestSuite) TestDistinctScore() { tc.SetMaxReplicas(3) tc.SetLocationLabels([]string{"zone", "rack", "host"}) - rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10)) + rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute)) tc.AddLabelsStore(1, 9, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"}) tc.AddLabelsStore(2, 8, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"}) @@ -441,7 +441,7 @@ func (suite *replicaCheckerTestSuite) TestDistinctScore2() { tc.SetMaxReplicas(5) tc.SetLocationLabels([]string{"zone", "host"}) - rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10)) + rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute)) tc.AddLabelsStore(1, 1, map[string]string{"zone": "z1", "host": "h1"}) tc.AddLabelsStore(2, 1, map[string]string{"zone": "z1", "host": "h2"}) @@ -470,7 +470,7 @@ func (suite *replicaCheckerTestSuite) TestStorageThreshold() { tc := mockcluster.NewCluster(suite.ctx, opt) tc.SetLocationLabels([]string{"zone"}) tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) - rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10)) + rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute)) tc.AddLabelsStore(1, 1, map[string]string{"zone": "z1"}) tc.UpdateStorageRatio(1, 0.5, 0.5) @@ -506,7 +506,7 @@ func (suite *replicaCheckerTestSuite) TestOpts() { opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(suite.ctx, opt) tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) - rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10)) + rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute)) tc.AddRegionStore(1, 100) tc.AddRegionStore(2, 100) @@ -539,7 +539,7 @@ func (suite *replicaCheckerTestSuite) TestFixDownPeer() { tc := mockcluster.NewCluster(suite.ctx, opt) tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) tc.SetLocationLabels([]string{"zone"}) - rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10)) + rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute)) tc.AddLabelsStore(1, 1, map[string]string{"zone": "z1"}) tc.AddLabelsStore(2, 1, map[string]string{"zone": "z1"}) @@ -571,7 +571,7 @@ func (suite *replicaCheckerTestSuite) TestFixOfflinePeer() { tc := mockcluster.NewCluster(suite.ctx, opt) tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) tc.SetLocationLabels([]string{"zone"}) - rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10)) + rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute)) tc.AddLabelsStore(1, 1, map[string]string{"zone": "z1"}) tc.AddLabelsStore(2, 1, map[string]string{"zone": "z1"}) diff --git a/pkg/schedule/checker/rule_checker.go b/pkg/schedule/checker/rule_checker.go index a8acb002951..e29cd2bc05b 100644 --- a/pkg/schedule/checker/rule_checker.go +++ b/pkg/schedule/checker/rule_checker.go @@ -52,14 +52,14 @@ type RuleChecker struct { PauseController cluster sche.CheckerCluster ruleManager *placement.RuleManager - pendingProcessedRegions cache.Cache + pendingProcessedRegions *cache.TTLUint64 pendingList cache.Cache switchWitnessCache *cache.TTLUint64 record *recorder } // NewRuleChecker creates a checker instance. -func NewRuleChecker(ctx context.Context, cluster sche.CheckerCluster, ruleManager *placement.RuleManager, pendingProcessedRegions cache.Cache) *RuleChecker { +func NewRuleChecker(ctx context.Context, cluster sche.CheckerCluster, ruleManager *placement.RuleManager, pendingProcessedRegions *cache.TTLUint64) *RuleChecker { return &RuleChecker{ cluster: cluster, ruleManager: ruleManager, diff --git a/pkg/schedule/checker/rule_checker_test.go b/pkg/schedule/checker/rule_checker_test.go index f99208a988b..b24a95e2ade 100644 --- a/pkg/schedule/checker/rule_checker_test.go +++ b/pkg/schedule/checker/rule_checker_test.go @@ -62,7 +62,7 @@ func (suite *ruleCheckerTestSuite) SetupTest() { suite.cluster.SetEnableWitness(true) suite.cluster.SetEnableUseJointConsensus(false) suite.ruleManager = suite.cluster.RuleManager - suite.rc = NewRuleChecker(suite.ctx, suite.cluster, suite.ruleManager, cache.NewDefaultCache(10)) + suite.rc = NewRuleChecker(suite.ctx, suite.cluster, suite.ruleManager, cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute)) } func (suite *ruleCheckerTestSuite) TearDownTest() { @@ -1955,7 +1955,7 @@ func (suite *ruleCheckerTestAdvancedSuite) SetupTest() { suite.cluster.SetEnableWitness(true) suite.cluster.SetEnableUseJointConsensus(true) suite.ruleManager = suite.cluster.RuleManager - suite.rc = NewRuleChecker(suite.ctx, suite.cluster, suite.ruleManager, cache.NewDefaultCache(10)) + suite.rc = NewRuleChecker(suite.ctx, suite.cluster, suite.ruleManager, cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute)) } func (suite *ruleCheckerTestAdvancedSuite) TearDownTest() {