diff --git a/pkg/schedule/checker/checker_controller.go b/pkg/schedule/checker/checker_controller.go
index 627408e6c43..4704996f7d9 100644
--- a/pkg/schedule/checker/checker_controller.go
+++ b/pkg/schedule/checker/checker_controller.go
@@ -17,6 +17,7 @@ package checker
 import (
 	"bytes"
 	"context"
+	"strconv"
 	"time"
 
 	"github.com/pingcap/failpoint"
@@ -35,12 +36,16 @@ import (
 )
 
 const (
+	suspectRegionLimit         = 1024
 	checkSuspectRangesInterval = 100 * time.Millisecond
 	// DefaultPendingRegionCacheSize is the default length of waiting list.
 	DefaultPendingRegionCacheSize = 100000
-	// It takes about 1.3 minutes(1000000/128*10/60/1000) to iterate 1 million regions(with DefaultPatrolRegionInterval=10ms).
-	patrolScanRegionLimit = 128
-	suspectRegionLimit    = 1024
+	// For 1,024,000 regions, patrolRegionScanLimit is 1000, which is max(MinPatrolRegionScanLimit, 1,024,000/patrolRegionPartition).
+	// To avoid patrolRegionScanLimit being too big or too small, it is limited to [128, 8192].
+	// It takes about 10s to iterate over 1,024,000 regions (with DefaultPatrolRegionInterval=10ms), not counting the time spent on other steps.
+	MinPatrolRegionScanLimit = 128
+	MaxPatrolScanRegionLimit = 8192
+	patrolRegionPartition    = 1024
 )
 
 var (
@@ -76,6 +81,9 @@ type Controller struct {
 	// It's used to update the ticker, so we need to
 	// record it to avoid updating the ticker frequently.
 	interval time.Duration
+	// patrolRegionScanLimit is the limit of regions to scan.
+	// It is calculated from the total number of regions.
+	patrolRegionScanLimit int
 }
 
 // NewController create a new Controller.
@@ -96,6 +104,7 @@ func NewController(ctx context.Context, cluster sche.CheckerCluster, conf config
 		pendingProcessedRegions: pendingProcessedRegions,
 		suspectKeyRanges:        cache.NewStringTTL(ctx, time.Minute, 3*time.Minute),
 		interval:                cluster.GetCheckerConfig().GetPatrolRegionInterval(),
+		patrolRegionScanLimit:   calculateScanLimit(cluster),
 	}
 }
 
@@ -113,36 +122,40 @@ func (c *Controller) PatrolRegions() {
 		select {
 		case <-ticker.C:
 			c.updateTickerIfNeeded(ticker)
+			if c.cluster.IsSchedulingHalted() {
+				log.Debug("skip patrol regions due to scheduling is halted")
+				continue
+			}
+
+			// Check priority regions first.
+			c.checkPriorityRegions()
+			// Check pending processed regions first.
+			c.checkPendingProcessedRegions()
+
+			key, regions = c.checkRegions(key)
+			if len(regions) == 0 {
+				continue
+			}
+			// Updates the label level isolation statistics.
+			c.cluster.UpdateRegionsLabelLevelStats(regions)
+			// When the key is nil, it means that the scan is finished.
+			if len(key) == 0 {
+				// update the scan limit.
+				c.patrolRegionScanLimit = calculateScanLimit(c.cluster)
+				// update the metrics.
+				dur := time.Since(start)
+				patrolCheckRegionsGauge.Set(dur.Seconds())
+				c.setPatrolRegionsDuration(dur)
+				start = time.Now()
+			}
+			failpoint.Inject("breakPatrol", func() {
+				failpoint.Return()
+			})
 		case <-c.ctx.Done():
 			patrolCheckRegionsGauge.Set(0)
 			c.setPatrolRegionsDuration(0)
 			return
 		}
-		if c.cluster.IsSchedulingHalted() {
-			continue
-		}
-
-		// Check priority regions first.
-		c.checkPriorityRegions()
-		// Check pending processed regions first.
-		c.checkPendingProcessedRegions()
-
-		key, regions = c.checkRegions(key)
-		if len(regions) == 0 {
-			continue
-		}
-		// Updates the label level isolation statistics.
-		c.cluster.UpdateRegionsLabelLevelStats(regions)
-		// When the key is nil, it means that the scan is finished.
-		if len(key) == 0 {
-			dur := time.Since(start)
-			patrolCheckRegionsGauge.Set(dur.Seconds())
-			c.setPatrolRegionsDuration(dur)
-			start = time.Now()
-		}
-		failpoint.Inject("breakPatrol", func() {
-			failpoint.Break()
-		})
 	}
 }
 
@@ -160,7 +173,7 @@ func (c *Controller) setPatrolRegionsDuration(dur time.Duration) {
 }
 
 func (c *Controller) checkRegions(startKey []byte) (key []byte, regions []*core.RegionInfo) {
-	regions = c.cluster.ScanRegions(startKey, nil, patrolScanRegionLimit)
+	regions = c.cluster.ScanRegions(startKey, nil, c.patrolRegionScanLimit)
 	if len(regions) == 0 {
 		// Resets the scan key.
 		key = nil
@@ -439,3 +452,19 @@ func (c *Controller) updateTickerIfNeeded(ticker *time.Ticker) {
 		log.Info("checkers starts patrol regions with new interval", zap.Duration("interval", newInterval))
 	}
 }
+
+// GetPatrolRegionScanLimit returns the limit of regions to scan.
+// It is only used for tests.
+func (c *Controller) GetPatrolRegionScanLimit() int {
+	return c.patrolRegionScanLimit
+}
+
+func calculateScanLimit(cluster sche.CheckerCluster) int {
+	regionCount := cluster.GetTotalRegionCount()
+	failpoint.Inject("regionCount", func(val failpoint.Value) {
+		c, _ := strconv.ParseInt(val.(string), 10, 64)
+		regionCount = int(c)
+	})
+	scanlimit := max(MinPatrolRegionScanLimit, regionCount/patrolRegionPartition)
+	return min(scanlimit, MaxPatrolScanRegionLimit)
+}
diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go
index cd7f94e001a..f33354b9668 100644
--- a/server/cluster/cluster_test.go
+++ b/server/cluster/cluster_test.go
@@ -44,6 +44,7 @@ import (
 	"github.com/tikv/pd/pkg/mock/mockid"
 	"github.com/tikv/pd/pkg/progress"
 	"github.com/tikv/pd/pkg/schedule"
+	"github.com/tikv/pd/pkg/schedule/checker"
 	sc "github.com/tikv/pd/pkg/schedule/config"
 	sche "github.com/tikv/pd/pkg/schedule/core"
 	"github.com/tikv/pd/pkg/schedule/filter"
@@ -1849,7 +1850,7 @@ func Test(t *testing.T) {
 	for i := uint64(0); i < n; i++ {
 		region := regions[i]
-		regionKey := []byte{byte(i)}
+		regionKey := []byte(fmt.Sprintf("a%20d", i+1))
 
 		re.Nil(cache.GetRegion(i))
 		re.Nil(cache.GetRegionByKey(regionKey))
@@ -2183,7 +2184,7 @@ func newTestRegions(n, m, np uint64) []*core.RegionInfo {
 		peers := make([]*metapb.Peer, 0, np)
 		for j := uint64(0); j < np; j++ {
 			peer := &metapb.Peer{
-				Id: i*np + j,
+				Id: 100000000 + i*np + j,
 			}
 			peer.StoreId = (i + j) % m
 			peers = append(peers, peer)
@@ -2191,8 +2192,8 @@ func newTestRegions(n, m, np uint64) []*core.RegionInfo {
 		region := &metapb.Region{
 			Id:          i,
 			Peers:       peers,
-			StartKey:    []byte{byte(i)},
-			EndKey:      []byte{byte(i + 1)},
+			StartKey:    []byte(fmt.Sprintf("a%20d", i+1)),
+			EndKey:      []byte(fmt.Sprintf("a%20d", i+2)),
 			RegionEpoch: &metapb.RegionEpoch{ConfVer: 2, Version: 2},
 		}
 		regions = append(regions, core.NewRegionInfo(region, peers[0], core.SetApproximateSize(100), core.SetApproximateKeys(1000)))
@@ -2880,6 +2881,55 @@ func TestCheckCache(t *testing.T) {
 	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/checker/breakPatrol"))
 }
 
+func TestScanLimit(t *testing.T) {
+	re := require.New(t)
+
+	checkScanLimit(re, 1000, checker.MinPatrolRegionScanLimit)
+	checkScanLimit(re, 10000)
+	checkScanLimit(re, 100000)
+	checkScanLimit(re, 1000000)
+	checkScanLimit(re, 10000000, checker.MaxPatrolScanRegionLimit)
+}
+
+func checkScanLimit(re *require.Assertions, regionCount int, expectScanLimit ...int) {
+	tc, co, cleanup := prepare(nil, nil, nil, re)
+	defer cleanup()
+	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/checker/breakPatrol", `return`))
+	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/checker/regionCount", fmt.Sprintf("return(\"%d\")", regionCount)))
+	defer func() {
+		re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/checker/breakPatrol"))
+		re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/checker/regionCount"))
+	}()
+
+	re.NoError(tc.addRegionStore(1, 0))
+	re.NoError(tc.addRegionStore(2, 0))
+	re.NoError(tc.addRegionStore(3, 0))
+	regions := newTestRegions(10, 3, 3)
+	for i, region := range regions {
+		if i == 0 {
+			region.GetMeta().StartKey = []byte("")
+		}
+		if i == len(regions)-1 {
+			region.GetMeta().EndKey = []byte("")
+		}
+		re.NoError(tc.putRegion(region))
+	}
+
+	co.GetWaitGroup().Add(1)
+	co.PatrolRegions()
+	defer func() {
+		co.GetSchedulersController().Wait()
+		co.GetWaitGroup().Wait()
+	}()
+
+	limit := co.GetCheckerController().GetPatrolRegionScanLimit()
+	re.LessOrEqual(checker.MinPatrolRegionScanLimit, limit)
+	re.GreaterOrEqual(checker.MaxPatrolScanRegionLimit, limit)
+	if len(expectScanLimit) > 0 {
+		re.Equal(expectScanLimit[0], limit)
+	}
+}
+
 func TestPeerState(t *testing.T) {
 	re := require.New(t)
diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go
index 5af750a3c2c..c95aa50cb3d 100644
--- a/tests/server/cluster/cluster_test.go
+++ b/tests/server/cluster/cluster_test.go
@@ -18,7 +18,9 @@ import (
 	"context"
 	"fmt"
 	"math"
+	"os"
 	"strconv"
+	"strings"
 	"sync"
 	"testing"
 	"time"
@@ -1819,3 +1821,55 @@ func TestExternalTimestamp(t *testing.T) {
 		re.Equal(ts, resp4.GetTimestamp())
 	}
 }
+
+func TestPatrolRegionConfigChange(t *testing.T) {
+	re := require.New(t)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	tc, err := tests.NewTestCluster(ctx, 1)
+	defer tc.Destroy()
+	re.NoError(err)
+	err = tc.RunInitialServers()
+	re.NoError(err)
+	tc.WaitLeader()
+	leaderServer := tc.GetLeaderServer()
+	re.NoError(leaderServer.BootstrapCluster())
+	for i := 1; i <= 3; i++ {
+		store := &metapb.Store{
+			Id:            uint64(i),
+			State:         metapb.StoreState_Up,
+			NodeState:     metapb.NodeState_Serving,
+			LastHeartbeat: time.Now().UnixNano(),
+		}
+		tests.MustPutStore(re, tc, store)
+	}
+	for i := 1; i <= 200; i++ {
+		startKey := []byte(fmt.Sprintf("%d", i*2-1))
+		endKey := []byte(fmt.Sprintf("%d", i*2))
+		tests.MustPutRegion(re, tc, uint64(i), uint64(i%3+1), startKey, endKey)
+	}
+	fname := testutil.InitTempFileLogger("debug")
+	defer os.RemoveAll(fname)
+	checkLog(re, fname, "coordinator starts patrol regions")
+
+	// test change patrol region interval
+	schedule := leaderServer.GetConfig().Schedule
+	schedule.PatrolRegionInterval = typeutil.NewDuration(99 * time.Millisecond)
+	leaderServer.GetServer().SetScheduleConfig(schedule)
+	checkLog(re, fname, "starts patrol regions with new interval")
+
+	// test change schedule halt
+	schedule = leaderServer.GetConfig().Schedule
+	schedule.HaltScheduling = true
+	leaderServer.GetServer().SetScheduleConfig(schedule)
+	checkLog(re, fname, "skip patrol regions due to scheduling is halted")
+}
+
+func checkLog(re *require.Assertions, fname, expect string) {
+	testutil.Eventually(re, func() bool {
+		b, _ := os.ReadFile(fname)
+		l := string(b)
+		return strings.Contains(l, expect)
+	})
+	os.Truncate(fname, 0)
+}
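
For reviewers, a minimal standalone sketch of the clamping performed by calculateScanLimit above. The package layout, lowercased constant names, helper name scanLimit, and the sample region counts are illustrative only and are not part of the patch; it assumes Go 1.21+ for the built-in min/max used by the patch itself.

```go
// Standalone sketch (not part of the patch): mirrors the scan-limit clamping.
package main

import "fmt"

const (
	minPatrolRegionScanLimit = 128  // lower bound, as in the patch
	maxPatrolScanRegionLimit = 8192 // upper bound, as in the patch
	patrolRegionPartition    = 1024 // one scan batch covers ~1/1024 of all regions
)

// scanLimit returns regionCount/1024 clamped to [128, 8192].
func scanLimit(regionCount int) int {
	limit := max(minPatrolRegionScanLimit, regionCount/patrolRegionPartition)
	return min(limit, maxPatrolScanRegionLimit)
}

func main() {
	// 1,000 regions hit the lower bound and 10,000,000 the upper bound,
	// matching TestScanLimit; 1,024,000 regions give a limit of 1000.
	for _, n := range []int{1000, 1024000, 10000000} {
		fmt.Printf("%9d regions -> scan limit %d\n", n, scanLimit(n))
	}
}
```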