Skip to content

Commit

Permalink
schedule: adjust patrol region scan limit according to the num of re…
Browse files Browse the repository at this point in the history
…gions (tikv#8479)

ref tikv#7963

Signed-off-by: lhy1024 <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
lhy1024 and ti-chi-bot[bot] authored Aug 6, 2024
1 parent c8ad186 commit 5e0fdb8
Show file tree
Hide file tree
Showing 3 changed files with 166 additions and 33 deletions.
87 changes: 58 additions & 29 deletions pkg/schedule/checker/checker_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package checker
import (
"bytes"
"context"
"strconv"
"time"

"github.com/pingcap/failpoint"
Expand All @@ -35,12 +36,16 @@ import (
)

const (
suspectRegionLimit = 1024
checkSuspectRangesInterval = 100 * time.Millisecond
// DefaultPendingRegionCacheSize is the default length of waiting list.
DefaultPendingRegionCacheSize = 100000
// It takes about 1.3 minutes(1000000/128*10/60/1000) to iterate 1 million regions(with DefaultPatrolRegionInterval=10ms).
patrolScanRegionLimit = 128
suspectRegionLimit = 1024
// For 1,024,000 regions, patrolRegionScanLimit is 1000, which is max(MinPatrolRegionScanLimit, 1,024,000/patrolRegionPartition)
// In order to avoid the patrolRegionScanLimit to be too big or too small, it will be limited to [128,8192].
// It takes about 10s to iterate 1,024,000 regions(with DefaultPatrolRegionInterval=10ms) where other steps are not considered.
MinPatrolRegionScanLimit = 128
MaxPatrolScanRegionLimit = 8192
patrolRegionPartition = 1024
)

var (
Expand Down Expand Up @@ -76,6 +81,9 @@ type Controller struct {
// It's used to update the ticker, so we need to
// record it to avoid updating the ticker frequently.
interval time.Duration
// patrolRegionScanLimit is the limit of regions to scan.
// It is calculated by the number of regions.
patrolRegionScanLimit int
}

// NewController create a new Controller.
Expand All @@ -96,6 +104,7 @@ func NewController(ctx context.Context, cluster sche.CheckerCluster, conf config
pendingProcessedRegions: pendingProcessedRegions,
suspectKeyRanges: cache.NewStringTTL(ctx, time.Minute, 3*time.Minute),
interval: cluster.GetCheckerConfig().GetPatrolRegionInterval(),
patrolRegionScanLimit: calculateScanLimit(cluster),
}
}

Expand All @@ -113,36 +122,40 @@ func (c *Controller) PatrolRegions() {
select {
case <-ticker.C:
c.updateTickerIfNeeded(ticker)
if c.cluster.IsSchedulingHalted() {
log.Debug("skip patrol regions due to scheduling is halted")
continue
}

// Check priority regions first.
c.checkPriorityRegions()
// Check pending processed regions first.
c.checkPendingProcessedRegions()

key, regions = c.checkRegions(key)
if len(regions) == 0 {
continue
}
// Updates the label level isolation statistics.
c.cluster.UpdateRegionsLabelLevelStats(regions)
// When the key is nil, it means that the scan is finished.
if len(key) == 0 {
// update the scan limit.
c.patrolRegionScanLimit = calculateScanLimit(c.cluster)
// update the metrics.
dur := time.Since(start)
patrolCheckRegionsGauge.Set(dur.Seconds())
c.setPatrolRegionsDuration(dur)
start = time.Now()
}
failpoint.Inject("breakPatrol", func() {
failpoint.Return()
})
case <-c.ctx.Done():
patrolCheckRegionsGauge.Set(0)
c.setPatrolRegionsDuration(0)
return
}
if c.cluster.IsSchedulingHalted() {
continue
}

// Check priority regions first.
c.checkPriorityRegions()
// Check pending processed regions first.
c.checkPendingProcessedRegions()

key, regions = c.checkRegions(key)
if len(regions) == 0 {
continue
}
// Updates the label level isolation statistics.
c.cluster.UpdateRegionsLabelLevelStats(regions)
// When the key is nil, it means that the scan is finished.
if len(key) == 0 {
dur := time.Since(start)
patrolCheckRegionsGauge.Set(dur.Seconds())
c.setPatrolRegionsDuration(dur)
start = time.Now()
}
failpoint.Inject("breakPatrol", func() {
failpoint.Break()
})
}
}

Expand All @@ -160,7 +173,7 @@ func (c *Controller) setPatrolRegionsDuration(dur time.Duration) {
}

func (c *Controller) checkRegions(startKey []byte) (key []byte, regions []*core.RegionInfo) {
regions = c.cluster.ScanRegions(startKey, nil, patrolScanRegionLimit)
regions = c.cluster.ScanRegions(startKey, nil, c.patrolRegionScanLimit)
if len(regions) == 0 {
// Resets the scan key.
key = nil
Expand Down Expand Up @@ -439,3 +452,19 @@ func (c *Controller) updateTickerIfNeeded(ticker *time.Ticker) {
log.Info("checkers starts patrol regions with new interval", zap.Duration("interval", newInterval))
}
}

// GetPatrolRegionScanLimit returns the limit of regions to scan.
// It only used for test.
func (c *Controller) GetPatrolRegionScanLimit() int {
return c.patrolRegionScanLimit
}

func calculateScanLimit(cluster sche.CheckerCluster) int {
regionCount := cluster.GetTotalRegionCount()
failpoint.Inject("regionCount", func(val failpoint.Value) {
c, _ := strconv.ParseInt(val.(string), 10, 64)
regionCount = int(c)
})
scanlimit := max(MinPatrolRegionScanLimit, regionCount/patrolRegionPartition)
return min(scanlimit, MaxPatrolScanRegionLimit)
}
58 changes: 54 additions & 4 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/tikv/pd/pkg/mock/mockid"
"github.com/tikv/pd/pkg/progress"
"github.com/tikv/pd/pkg/schedule"
"github.com/tikv/pd/pkg/schedule/checker"
sc "github.com/tikv/pd/pkg/schedule/config"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/filter"
Expand Down Expand Up @@ -1849,7 +1850,7 @@ func Test(t *testing.T) {

for i := uint64(0); i < n; i++ {
region := regions[i]
regionKey := []byte{byte(i)}
regionKey := []byte(fmt.Sprintf("a%20d", i+1))

re.Nil(cache.GetRegion(i))
re.Nil(cache.GetRegionByKey(regionKey))
Expand Down Expand Up @@ -2183,16 +2184,16 @@ func newTestRegions(n, m, np uint64) []*core.RegionInfo {
peers := make([]*metapb.Peer, 0, np)
for j := uint64(0); j < np; j++ {
peer := &metapb.Peer{
Id: i*np + j,
Id: 100000000 + i*np + j,
}
peer.StoreId = (i + j) % m
peers = append(peers, peer)
}
region := &metapb.Region{
Id: i,
Peers: peers,
StartKey: []byte{byte(i)},
EndKey: []byte{byte(i + 1)},
StartKey: []byte(fmt.Sprintf("a%20d", i+1)),
EndKey: []byte(fmt.Sprintf("a%20d", i+2)),
RegionEpoch: &metapb.RegionEpoch{ConfVer: 2, Version: 2},
}
regions = append(regions, core.NewRegionInfo(region, peers[0], core.SetApproximateSize(100), core.SetApproximateKeys(1000)))
Expand Down Expand Up @@ -2880,6 +2881,55 @@ func TestCheckCache(t *testing.T) {
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/checker/breakPatrol"))
}

func TestScanLimit(t *testing.T) {
re := require.New(t)

checkScanLimit(re, 1000, checker.MinPatrolRegionScanLimit)
checkScanLimit(re, 10000)
checkScanLimit(re, 100000)
checkScanLimit(re, 1000000)
checkScanLimit(re, 10000000, checker.MaxPatrolScanRegionLimit)
}

func checkScanLimit(re *require.Assertions, regionCount int, expectScanLimit ...int) {
tc, co, cleanup := prepare(nil, nil, nil, re)
defer cleanup()
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/checker/breakPatrol", `return`))
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/checker/regionCount", fmt.Sprintf("return(\"%d\")", regionCount)))
defer func() {
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/checker/breakPatrol"))
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/checker/regionCount"))
}()

re.NoError(tc.addRegionStore(1, 0))
re.NoError(tc.addRegionStore(2, 0))
re.NoError(tc.addRegionStore(3, 0))
regions := newTestRegions(10, 3, 3)
for i, region := range regions {
if i == 0 {
region.GetMeta().StartKey = []byte("")
}
if i == len(regions)-1 {
region.GetMeta().EndKey = []byte("")
}
re.NoError(tc.putRegion(region))
}

co.GetWaitGroup().Add(1)
co.PatrolRegions()
defer func() {
co.GetSchedulersController().Wait()
co.GetWaitGroup().Wait()
}()

limit := co.GetCheckerController().GetPatrolRegionScanLimit()
re.LessOrEqual(checker.MinPatrolRegionScanLimit, limit)
re.GreaterOrEqual(checker.MaxPatrolScanRegionLimit, limit)
if len(expectScanLimit) > 0 {
re.Equal(expectScanLimit[0], limit)
}
}

func TestPeerState(t *testing.T) {
re := require.New(t)

Expand Down
54 changes: 54 additions & 0 deletions tests/server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import (
"context"
"fmt"
"math"
"os"
"strconv"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -1819,3 +1821,55 @@ func TestExternalTimestamp(t *testing.T) {
re.Equal(ts, resp4.GetTimestamp())
}
}

func TestPatrolRegionConfigChange(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
tc, err := tests.NewTestCluster(ctx, 1)
defer tc.Destroy()
re.NoError(err)
err = tc.RunInitialServers()
re.NoError(err)
tc.WaitLeader()
leaderServer := tc.GetLeaderServer()
re.NoError(leaderServer.BootstrapCluster())
for i := 1; i <= 3; i++ {
store := &metapb.Store{
Id: uint64(i),
State: metapb.StoreState_Up,
NodeState: metapb.NodeState_Serving,
LastHeartbeat: time.Now().UnixNano(),
}
tests.MustPutStore(re, tc, store)
}
for i := 1; i <= 200; i++ {
startKey := []byte(fmt.Sprintf("%d", i*2-1))
endKey := []byte(fmt.Sprintf("%d", i*2))
tests.MustPutRegion(re, tc, uint64(i), uint64(i%3+1), startKey, endKey)
}
fname := testutil.InitTempFileLogger("debug")
defer os.RemoveAll(fname)
checkLog(re, fname, "coordinator starts patrol regions")

// test change patrol region interval
schedule := leaderServer.GetConfig().Schedule
schedule.PatrolRegionInterval = typeutil.NewDuration(99 * time.Millisecond)
leaderServer.GetServer().SetScheduleConfig(schedule)
checkLog(re, fname, "starts patrol regions with new interval")

// test change schedule halt
schedule = leaderServer.GetConfig().Schedule
schedule.HaltScheduling = true
leaderServer.GetServer().SetScheduleConfig(schedule)
checkLog(re, fname, "skip patrol regions due to scheduling is halted")
}

func checkLog(re *require.Assertions, fname, expect string) {
testutil.Eventually(re, func() bool {
b, _ := os.ReadFile(fname)
l := string(b)
return strings.Contains(l, expect)
})
os.Truncate(fname, 0)
}

0 comments on commit 5e0fdb8

Please sign in to comment.