Skip to content

Commit

Permalink
use ttl cache
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Aug 8, 2024
1 parent 7ef2be0 commit e552487
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 23 deletions.
13 changes: 6 additions & 7 deletions pkg/schedule/checker/checker_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type Controller struct {
mergeChecker *MergeChecker
jointStateChecker *JointStateChecker
priorityInspector *PriorityInspector
pendingProcessedRegions cache.Cache
pendingProcessedRegions *cache.TTLUint64
suspectKeyRanges *cache.TTLString // suspect key-range regions that may need fix

// duration is the duration of the last patrol round.
Expand All @@ -88,7 +88,7 @@ type Controller struct {

// NewController create a new Controller.
func NewController(ctx context.Context, cluster sche.CheckerCluster, conf config.CheckerConfigProvider, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler, opController *operator.Controller) *Controller {
pendingProcessedRegions := cache.NewDefaultCache(DefaultPendingRegionCacheSize)
pendingProcessedRegions := cache.NewIDTTL(ctx, time.Minute, 3*time.Minute)
return &Controller{
ctx: ctx,
cluster: cluster,
Expand Down Expand Up @@ -327,16 +327,15 @@ func (c *Controller) GetRuleChecker() *RuleChecker {

// GetPendingProcessedRegions returns the pending processed regions in the cache.
func (c *Controller) GetPendingProcessedRegions() []uint64 {
pendingRegions := make([]uint64, 0)
for _, item := range c.pendingProcessedRegions.Elems() {
pendingRegions = append(pendingRegions, item.Key)
}
return pendingRegions
return c.pendingProcessedRegions.GetAllID()
}

// AddPendingProcessedRegions adds the pending processed region into the cache.
func (c *Controller) AddPendingProcessedRegions(ids ...uint64) {
for _, id := range ids {
if c.pendingProcessedRegions.Len() > DefaultPendingRegionCacheSize {
return

Check warning on line 337 in pkg/schedule/checker/checker_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/checker/checker_controller.go#L337

Added line #L337 was not covered by tests
}
c.pendingProcessedRegions.Put(id, nil)
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/checker/replica_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ type ReplicaChecker struct {
PauseController
cluster sche.CheckerCluster
conf config.CheckerConfigProvider
pendingProcessedRegions cache.Cache
pendingProcessedRegions *cache.TTLUint64
}

// NewReplicaChecker creates a replica checker.
func NewReplicaChecker(cluster sche.CheckerCluster, conf config.CheckerConfigProvider, pendingProcessedRegions cache.Cache) *ReplicaChecker {
func NewReplicaChecker(cluster sche.CheckerCluster, conf config.CheckerConfigProvider, pendingProcessedRegions *cache.TTLUint64) *ReplicaChecker {
return &ReplicaChecker{
cluster: cluster,
conf: conf,
Expand Down
20 changes: 10 additions & 10 deletions pkg/schedule/checker/replica_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (suite *replicaCheckerTestSuite) SetupTest() {
suite.ctx, suite.cancel = context.WithCancel(context.Background())
suite.cluster = mockcluster.NewCluster(suite.ctx, cfg)
suite.cluster.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
suite.rc = NewReplicaChecker(suite.cluster, suite.cluster.GetCheckerConfig(), cache.NewDefaultCache(10))
suite.rc = NewReplicaChecker(suite.cluster, suite.cluster.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute))
stats := &pdpb.StoreStats{
Capacity: 100,
Available: 100,
Expand Down Expand Up @@ -213,7 +213,7 @@ func (suite *replicaCheckerTestSuite) TestBasic() {
tc := mockcluster.NewCluster(suite.ctx, opt)
tc.SetMaxSnapshotCount(2)
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10))
rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute))

// Add stores 1,2,3,4.
tc.AddRegionStore(1, 4)
Expand Down Expand Up @@ -290,7 +290,7 @@ func (suite *replicaCheckerTestSuite) TestLostStore() {
tc.AddRegionStore(1, 1)
tc.AddRegionStore(2, 1)

rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10))
rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute))

// now region peer in store 1,2,3.but we just have store 1,2
// This happens only in recovering the PD tc
Expand All @@ -309,7 +309,7 @@ func (suite *replicaCheckerTestSuite) TestOffline() {
tc.SetMaxReplicas(3)
tc.SetLocationLabels([]string{"zone", "rack", "host"})

rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10))
rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute))
tc.AddLabelsStore(1, 1, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"})
tc.AddLabelsStore(2, 2, map[string]string{"zone": "z2", "rack": "r1", "host": "h1"})
tc.AddLabelsStore(3, 3, map[string]string{"zone": "z3", "rack": "r1", "host": "h1"})
Expand Down Expand Up @@ -361,7 +361,7 @@ func (suite *replicaCheckerTestSuite) TestDistinctScore() {
tc.SetMaxReplicas(3)
tc.SetLocationLabels([]string{"zone", "rack", "host"})

rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10))
rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute))

tc.AddLabelsStore(1, 9, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"})
tc.AddLabelsStore(2, 8, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"})
Expand Down Expand Up @@ -441,7 +441,7 @@ func (suite *replicaCheckerTestSuite) TestDistinctScore2() {
tc.SetMaxReplicas(5)
tc.SetLocationLabels([]string{"zone", "host"})

rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10))
rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute))

tc.AddLabelsStore(1, 1, map[string]string{"zone": "z1", "host": "h1"})
tc.AddLabelsStore(2, 1, map[string]string{"zone": "z1", "host": "h2"})
Expand Down Expand Up @@ -470,7 +470,7 @@ func (suite *replicaCheckerTestSuite) TestStorageThreshold() {
tc := mockcluster.NewCluster(suite.ctx, opt)
tc.SetLocationLabels([]string{"zone"})
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10))
rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute))

tc.AddLabelsStore(1, 1, map[string]string{"zone": "z1"})
tc.UpdateStorageRatio(1, 0.5, 0.5)
Expand Down Expand Up @@ -506,7 +506,7 @@ func (suite *replicaCheckerTestSuite) TestOpts() {
opt := mockconfig.NewTestOptions()
tc := mockcluster.NewCluster(suite.ctx, opt)
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10))
rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute))

tc.AddRegionStore(1, 100)
tc.AddRegionStore(2, 100)
Expand Down Expand Up @@ -539,7 +539,7 @@ func (suite *replicaCheckerTestSuite) TestFixDownPeer() {
tc := mockcluster.NewCluster(suite.ctx, opt)
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
tc.SetLocationLabels([]string{"zone"})
rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10))
rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute))

tc.AddLabelsStore(1, 1, map[string]string{"zone": "z1"})
tc.AddLabelsStore(2, 1, map[string]string{"zone": "z1"})
Expand Down Expand Up @@ -571,7 +571,7 @@ func (suite *replicaCheckerTestSuite) TestFixOfflinePeer() {
tc := mockcluster.NewCluster(suite.ctx, opt)
tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
tc.SetLocationLabels([]string{"zone"})
rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10))
rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute))

tc.AddLabelsStore(1, 1, map[string]string{"zone": "z1"})
tc.AddLabelsStore(2, 1, map[string]string{"zone": "z1"})
Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/checker/rule_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@ type RuleChecker struct {
PauseController
cluster sche.CheckerCluster
ruleManager *placement.RuleManager
pendingProcessedRegions cache.Cache
pendingProcessedRegions *cache.TTLUint64
pendingList cache.Cache
switchWitnessCache *cache.TTLUint64
record *recorder
}

// NewRuleChecker creates a checker instance.
func NewRuleChecker(ctx context.Context, cluster sche.CheckerCluster, ruleManager *placement.RuleManager, pendingProcessedRegions cache.Cache) *RuleChecker {
func NewRuleChecker(ctx context.Context, cluster sche.CheckerCluster, ruleManager *placement.RuleManager, pendingProcessedRegions *cache.TTLUint64) *RuleChecker {
return &RuleChecker{
cluster: cluster,
ruleManager: ruleManager,
Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/checker/rule_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (suite *ruleCheckerTestSuite) SetupTest() {
suite.cluster.SetEnableWitness(true)
suite.cluster.SetEnableUseJointConsensus(false)
suite.ruleManager = suite.cluster.RuleManager
suite.rc = NewRuleChecker(suite.ctx, suite.cluster, suite.ruleManager, cache.NewDefaultCache(10))
suite.rc = NewRuleChecker(suite.ctx, suite.cluster, suite.ruleManager, cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute))
}

func (suite *ruleCheckerTestSuite) TearDownTest() {
Expand Down Expand Up @@ -1955,7 +1955,7 @@ func (suite *ruleCheckerTestAdvancedSuite) SetupTest() {
suite.cluster.SetEnableWitness(true)
suite.cluster.SetEnableUseJointConsensus(true)
suite.ruleManager = suite.cluster.RuleManager
suite.rc = NewRuleChecker(suite.ctx, suite.cluster, suite.ruleManager, cache.NewDefaultCache(10))
suite.rc = NewRuleChecker(suite.ctx, suite.cluster, suite.ruleManager, cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute))
}

func (suite *ruleCheckerTestAdvancedSuite) TearDownTest() {
Expand Down

0 comments on commit e552487

Please sign in to comment.