Merge branch 'master' into min-ssl-version
ti-chi-bot[bot] authored Nov 8, 2024
2 parents 0d98544 + 49307e2 commit dddd2f4
Showing 36 changed files with 600 additions and 110 deletions.
6 changes: 4 additions & 2 deletions client/metrics.go
@@ -139,7 +139,7 @@ func initMetrics(constLabels prometheus.Labels) {
}

var (
cmdDurationWait prometheus.Observer
cmdDurationTSOWait prometheus.Observer
cmdDurationTSO prometheus.Observer
cmdDurationTSOAsyncWait prometheus.Observer
cmdDurationGetRegion prometheus.Observer
@@ -166,6 +166,7 @@ var (
cmdDurationUpdateServiceSafePointV2 prometheus.Observer

cmdFailDurationGetRegion prometheus.Observer
cmdFailDurationTSOWait prometheus.Observer
cmdFailDurationTSO prometheus.Observer
cmdFailDurationGetAllMembers prometheus.Observer
cmdFailDurationGetPrevRegion prometheus.Observer
@@ -189,7 +190,7 @@ var (

func initCmdDurations() {
// WithLabelValues is a heavy operation, define variable to avoid call it every time.
cmdDurationWait = cmdDuration.WithLabelValues("wait")
cmdDurationTSOWait = cmdDuration.WithLabelValues("wait")
cmdDurationTSO = cmdDuration.WithLabelValues("tso")
cmdDurationTSOAsyncWait = cmdDuration.WithLabelValues("tso_async_wait")
cmdDurationGetRegion = cmdDuration.WithLabelValues("get_region")
@@ -216,6 +217,7 @@ func initCmdDurations() {
cmdDurationUpdateServiceSafePointV2 = cmdDuration.WithLabelValues("update_service_safe_point_v2")

cmdFailDurationGetRegion = cmdFailedDuration.WithLabelValues("get_region")
cmdFailDurationTSOWait = cmdFailedDuration.WithLabelValues("wait")
cmdFailDurationTSO = cmdFailedDuration.WithLabelValues("tso")
cmdFailDurationGetAllMembers = cmdFailedDuration.WithLabelValues("get_member_info")
cmdFailDurationGetPrevRegion = cmdFailedDuration.WithLabelValues("get_prev_region")
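The comment in initCmdDurations explains why these package-level observers exist: WithLabelValues is a comparatively heavy call, so each labelled observer is resolved once and reused, and the rename from cmdDurationWait to cmdDurationTSOWait makes explicit that the "wait" label measures TSO wait time. A minimal, self-contained sketch of the same pattern follows; the metric name and helper function are illustrative, not the actual client code.

package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// Hypothetical histogram vec standing in for the client's cmdDuration metric.
var cmdDuration = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Name: "cmd_handle_cmds_duration_seconds", // illustrative name
		Help: "Bucketed histogram of command handling duration.",
	}, []string{"type"})

// Resolve the labelled observer once instead of calling WithLabelValues per observation.
var cmdDurationTSOWait = cmdDuration.WithLabelValues("wait")

func observeTSOWait(start time.Time) {
	cmdDurationTSOWait.Observe(time.Since(start).Seconds())
}

func main() {
	observeTSOWait(time.Now())
}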
3 changes: 3 additions & 0 deletions client/pd_service_discovery.go
@@ -575,6 +575,9 @@ func (c *pdServiceDiscovery) updateServiceModeLoop() {
ctx, cancel := context.WithCancel(c.ctx)
defer cancel()
ticker := time.NewTicker(serviceModeUpdateInterval)
failpoint.Inject("fastUpdateServiceMode", func() {
ticker.Reset(10 * time.Millisecond)
})
defer ticker.Stop()

for {
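The failpoint injected above shortens the service-mode update interval to 10ms, but only when the failpoint is active: the binary has to be built with failpoints enabled (failpoint-ctl) and the failpoint enabled at run time, typically from a test. A minimal sketch of enabling it is below; the exact failpoint path is an assumption derived from the client package path.

package main

import (
	"log"

	"github.com/pingcap/failpoint"
)

func main() {
	// Hypothetical failpoint path; the real path follows the package containing failpoint.Inject.
	const fp = "github.com/tikv/pd/client/fastUpdateServiceMode"

	// Make the injected callback fire, so the ticker is reset to 10 milliseconds.
	if err := failpoint.Enable(fp, `return(true)`); err != nil {
		log.Fatal(err)
	}
	defer func() {
		if err := failpoint.Disable(fp); err != nil {
			log.Print(err)
		}
	}()

	// ... exercise the code that runs updateServiceModeLoop here ...
}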
7 changes: 4 additions & 3 deletions client/tso_request.go
@@ -74,13 +74,14 @@ func (req *tsoRequest) waitCtx(ctx context.Context) (physical int64, logical int
defer req.pool.Put(req)
defer trace.StartRegion(req.requestCtx, "pdclient.tsoReqDone").End()
err = errors.WithStack(err)
now := time.Now()
if err != nil {
cmdFailDurationTSO.Observe(time.Since(req.start).Seconds())
cmdFailDurationTSOWait.Observe(now.Sub(start).Seconds())
cmdFailDurationTSO.Observe(now.Sub(req.start).Seconds())
return 0, 0, err
}
physical, logical = req.physical, req.logical
now := time.Now()
cmdDurationWait.Observe(now.Sub(start).Seconds())
cmdDurationTSOWait.Observe(now.Sub(start).Seconds())
cmdDurationTSO.Observe(now.Sub(req.start).Seconds())
return
case <-ctx.Done():
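The reworked timing records two different spans: cmdDurationTSOWait ("wait") covers only the time the caller spends blocked in waitCtx (from start), while cmdDurationTSO ("tso") covers the whole request lifetime (from req.start); capturing now := time.Now() once keeps both observations consistent on the failure path as well. A small illustrative sketch of the distinction, using stand-in names rather than the client's actual fields:

package main

import (
	"fmt"
	"time"
)

func main() {
	// reqStart mirrors req.start (request creation); waitStart mirrors the
	// start captured when the caller begins waiting for the TSO response.
	reqStart := time.Now()
	time.Sleep(2 * time.Millisecond) // stand-in for async batching and sending
	waitStart := time.Now()
	time.Sleep(3 * time.Millisecond) // stand-in for blocking on the response
	now := time.Now()

	fmt.Printf("wait  duration (cmdDurationTSOWait): %.4fs\n", now.Sub(waitStart).Seconds())
	fmt.Printf("total duration (cmdDurationTSO):     %.4fs\n", now.Sub(reqStart).Seconds())
}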
2 changes: 1 addition & 1 deletion go.mod
@@ -37,7 +37,7 @@ require (
github.com/pingcap/kvproto v0.0.0-20240910154453-b242104f8d31
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21
github.com/pingcap/tidb-dashboard v0.0.0-20240924035706-618b5cded5bf
github.com/pingcap/tidb-dashboard v0.0.0-20241104061623-bce95733dad7
github.com/prometheus/client_golang v1.19.0
github.com/prometheus/common v0.51.1
github.com/sasha-s/go-deadlock v0.3.5
4 changes: 2 additions & 2 deletions go.sum
@@ -397,8 +397,8 @@ github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I=
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM=
github.com/pingcap/tidb-dashboard v0.0.0-20240924035706-618b5cded5bf h1:Eh4U5EAP0Hf/h8ApbwXiDwaKzl0wurQrhwS7eaNjyiM=
github.com/pingcap/tidb-dashboard v0.0.0-20240924035706-618b5cded5bf/go.mod h1:AT9vfeojwr/GGCHTURXtA8yZBE9AW8LdIo02/eYdfHU=
github.com/pingcap/tidb-dashboard v0.0.0-20241104061623-bce95733dad7 h1:qQnXeY2585WkQaPVc7p4i1MWUECKCbkbw5/QcrK5Ahg=
github.com/pingcap/tidb-dashboard v0.0.0-20241104061623-bce95733dad7/go.mod h1:AT9vfeojwr/GGCHTURXtA8yZBE9AW8LdIo02/eYdfHU=
github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e h1:FBaTXU8C3xgt/drM58VHxojHo/QoG1oPsgWTGvaSpO4=
github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
5 changes: 5 additions & 0 deletions pkg/mcs/scheduling/server/config/config.go
@@ -400,6 +400,11 @@ func (o *PersistConfig) GetHotRegionCacheHitsThreshold() int {
return int(o.GetScheduleConfig().HotRegionCacheHitsThreshold)
}

// GetPatrolRegionWorkerCount returns the worker count of the patrol.
func (o *PersistConfig) GetPatrolRegionWorkerCount() int {
return o.GetScheduleConfig().PatrolRegionWorkerCount
}

// GetMaxMovableHotPeerSize returns the max movable hot peer size.
func (o *PersistConfig) GetMaxMovableHotPeerSize() int64 {
return o.GetScheduleConfig().MaxMovableHotPeerSize
106 changes: 98 additions & 8 deletions pkg/schedule/checker/checker_controller.go
@@ -18,6 +18,7 @@ import (
"bytes"
"context"
"strconv"
"sync"
"time"

"github.com/pingcap/failpoint"
@@ -31,6 +32,7 @@ import (
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/utils/keyutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"go.uber.org/zap"
)
@@ -47,6 +49,7 @@ const (
// MaxPatrolScanRegionLimit is the max limit of regions to scan for a batch.
MaxPatrolScanRegionLimit = 8192
patrolRegionPartition = 1024
patrolRegionChanLen = MaxPatrolScanRegionLimit
)

var (
@@ -71,6 +74,7 @@ type Controller struct {
priorityInspector *PriorityInspector
pendingProcessedRegions *cache.TTLUint64
suspectKeyRanges *cache.TTLString // suspect key-range regions that may need fix
patrolRegionContext *PatrolRegionContext

// duration is the duration of the last patrol round.
// It's exported, so it should be protected by a mutex.
@@ -82,6 +86,8 @@ type Controller struct {
// It's used to update the ticker, so we need to
// record it to avoid updating the ticker frequently.
interval time.Duration
// workerCount is the count of workers to patrol regions.
workerCount int
// patrolRegionScanLimit is the limit of regions to scan.
// It is calculated by the number of regions.
patrolRegionScanLimit int
@@ -104,6 +110,7 @@ func NewController(ctx context.Context, cluster sche.CheckerCluster, conf config
priorityInspector: NewPriorityInspector(cluster, conf),
pendingProcessedRegions: pendingProcessedRegions,
suspectKeyRanges: cache.NewStringTTL(ctx, time.Minute, 3*time.Minute),
patrolRegionContext: &PatrolRegionContext{},
interval: cluster.GetCheckerConfig().GetPatrolRegionInterval(),
patrolRegionScanLimit: calculateScanLimit(cluster),
}
@@ -112,6 +119,9 @@ func NewController(ctx context.Context, cluster sche.CheckerCluster, conf config
// PatrolRegions is used to scan regions.
// The checkers will check these regions to decide if they need to do some operations.
func (c *Controller) PatrolRegions() {
c.patrolRegionContext.init(c.ctx)
c.patrolRegionContext.startPatrolRegionWorkers(c)
defer c.patrolRegionContext.stop()
ticker := time.NewTicker(c.interval)
defer ticker.Stop()
start := time.Now()
@@ -123,11 +133,20 @@ func (c *Controller) PatrolRegions() {
select {
case <-ticker.C:
c.updateTickerIfNeeded(ticker)
c.updatePatrolWorkersIfNeeded()
if c.cluster.IsSchedulingHalted() {
for len(c.patrolRegionContext.regionChan) > 0 {
<-c.patrolRegionContext.regionChan
}
log.Debug("skip patrol regions due to scheduling is halted")
continue
}

// wait for the regionChan to be drained
if len(c.patrolRegionContext.regionChan) > 0 {
continue
}

// Check priority regions first.
c.checkPriorityRegions()
// Check pending processed regions first.
@@ -150,6 +169,9 @@ func (c *Controller) PatrolRegions() {
start = time.Now()
}
failpoint.Inject("breakPatrol", func() {
for !c.IsPatrolRegionChanEmpty() {
time.Sleep(time.Millisecond * 10)
}
failpoint.Return()
})
case <-c.ctx.Done():
@@ -160,6 +182,32 @@ func (c *Controller) PatrolRegions() {
}
}

func (c *Controller) updateTickerIfNeeded(ticker *time.Ticker) {
// Note: we reset the ticker here to support updating configuration dynamically.
newInterval := c.cluster.GetCheckerConfig().GetPatrolRegionInterval()
if c.interval != newInterval {
c.interval = newInterval
ticker.Reset(newInterval)
log.Info("checkers starts patrol regions with new interval", zap.Duration("interval", newInterval))
}
}

func (c *Controller) updatePatrolWorkersIfNeeded() {
newWorkersCount := c.cluster.GetCheckerConfig().GetPatrolRegionWorkerCount()
if c.workerCount != newWorkersCount {
oldWorkersCount := c.workerCount
c.workerCount = newWorkersCount
// Stop the old workers and start the new workers.
c.patrolRegionContext.workersCancel()
c.patrolRegionContext.wg.Wait()
c.patrolRegionContext.workersCtx, c.patrolRegionContext.workersCancel = context.WithCancel(c.ctx)
c.patrolRegionContext.startPatrolRegionWorkers(c)
log.Info("checkers starts patrol regions with new workers count",
zap.Int("old-workers-count", oldWorkersCount),
zap.Int("new-workers-count", newWorkersCount))
}
}

// GetPatrolRegionsDuration returns the duration of the last patrol region round.
func (c *Controller) GetPatrolRegionsDuration() time.Duration {
c.mu.RLock()
@@ -182,7 +230,7 @@ func (c *Controller) checkRegions(startKey []byte) (key []byte, regions []*core.
}

for _, region := range regions {
c.tryAddOperators(region)
c.patrolRegionContext.regionChan <- region
key = region.GetEndKey()
}
return
@@ -446,13 +494,55 @@ func (c *Controller) GetPauseController(name string) (*PauseController, error) {
}
}

func (c *Controller) updateTickerIfNeeded(ticker *time.Ticker) {
// Note: we reset the ticker here to support updating configuration dynamically.
newInterval := c.cluster.GetCheckerConfig().GetPatrolRegionInterval()
if c.interval != newInterval {
c.interval = newInterval
ticker.Reset(newInterval)
log.Info("checkers starts patrol regions with new interval", zap.Duration("interval", newInterval))
// IsPatrolRegionChanEmpty returns whether the patrol region channel is empty.
func (c *Controller) IsPatrolRegionChanEmpty() bool {
if c.patrolRegionContext == nil {
return true
}
return len(c.patrolRegionContext.regionChan) == 0
}

// PatrolRegionContext is used to store the context of patrol regions.
type PatrolRegionContext struct {
workersCtx context.Context
workersCancel context.CancelFunc
regionChan chan *core.RegionInfo
wg sync.WaitGroup
}

func (p *PatrolRegionContext) init(ctx context.Context) {
p.regionChan = make(chan *core.RegionInfo, patrolRegionChanLen)
p.workersCtx, p.workersCancel = context.WithCancel(ctx)
}

func (p *PatrolRegionContext) stop() {
log.Debug("closing patrol region workers")
close(p.regionChan)
p.workersCancel()
p.wg.Wait()
log.Debug("patrol region workers are closed")
}

func (p *PatrolRegionContext) startPatrolRegionWorkers(c *Controller) {
for i := range c.workerCount {
p.wg.Add(1)
go func(i int) {
defer logutil.LogPanic()
defer p.wg.Done()
for {
select {
case region, ok := <-p.regionChan:
if !ok {
log.Debug("region channel is closed", zap.Int("worker-id", i))
return
}
c.tryAddOperators(region)
case <-p.workersCtx.Done():
log.Debug("region worker is closed", zap.Int("worker-id", i))
return
}
}
}(i)
}
}

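The heart of this change is a bounded producer/consumer pipeline: checkRegions now pushes scanned regions into regionChan instead of calling tryAddOperators inline, and PatrolRegionContext runs a configurable pool of workers (GetPatrolRegionWorkerCount) that drain the channel concurrently; the breakPatrol failpoint waits for the channel to empty so tests observe a fully processed batch. Note that for i := range c.workerCount ranges over an integer, which requires Go 1.22 or later. A minimal, self-contained sketch of the same pattern, with illustrative names rather than the PD implementation:

package main

import (
	"context"
	"fmt"
	"sync"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	regionCh := make(chan int, 8) // stands in for the buffered regionChan
	var wg sync.WaitGroup

	const workerCount = 3 // stands in for GetPatrolRegionWorkerCount()
	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for {
				select {
				case r, ok := <-regionCh:
					if !ok {
						return // producer closed the channel, like PatrolRegionContext.stop
					}
					fmt.Printf("worker %d checked region %d\n", id, r) // stands in for tryAddOperators
				case <-ctx.Done():
					return
				}
			}
		}(i)
	}

	// Producer: the patrol loop scanning regions in batches.
	for r := 0; r < 10; r++ {
		regionCh <- r
	}
	close(regionCh)
	wg.Wait()
}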
33 changes: 23 additions & 10 deletions pkg/schedule/checker/priority_inspector.go
@@ -22,6 +22,7 @@ import (
"github.com/tikv/pd/pkg/schedule/config"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/utils/syncutil"
)

// defaultPriorityQueueSize is the default value of priority queue size.
@@ -31,16 +32,20 @@ const defaultPriorityQueueSize = 1280
type PriorityInspector struct {
cluster sche.CheckerCluster
conf config.CheckerConfigProvider
queue *cache.PriorityQueue
mu struct {
syncutil.RWMutex
queue *cache.PriorityQueue
}
}

// NewPriorityInspector creates a priority inspector.
func NewPriorityInspector(cluster sche.CheckerCluster, conf config.CheckerConfigProvider) *PriorityInspector {
return &PriorityInspector{
res := &PriorityInspector{
cluster: cluster,
conf: conf,
queue: cache.NewPriorityQueue(defaultPriorityQueueSize),
}
res.mu.queue = cache.NewPriorityQueue(defaultPriorityQueueSize)
return res
}

// RegionPriorityEntry records region priority info.
@@ -99,24 +104,28 @@ func (p *PriorityInspector) inspectRegionInReplica(region *core.RegionInfo) (mak
// It will remove if region's priority equal 0.
// It's Attempt will increase if region's priority equal last.
func (p *PriorityInspector) addOrRemoveRegion(priority int, regionID uint64) {
p.mu.Lock()
defer p.mu.Unlock()
if priority < 0 {
if entry := p.queue.Get(regionID); entry != nil && entry.Priority == priority {
if entry := p.mu.queue.Get(regionID); entry != nil && entry.Priority == priority {
e := entry.Value.(*RegionPriorityEntry)
e.Attempt++
e.Last = time.Now()
p.queue.Put(priority, e)
p.mu.queue.Put(priority, e)
} else {
entry := NewRegionEntry(regionID)
p.queue.Put(priority, entry)
p.mu.queue.Put(priority, entry)
}
} else {
p.queue.Remove(regionID)
p.mu.queue.Remove(regionID)
}
}

// GetPriorityRegions returns all regions in priority queue that needs rerun.
func (p *PriorityInspector) GetPriorityRegions() (ids []uint64) {
entries := p.queue.Elems()
p.mu.RLock()
defer p.mu.RUnlock()
entries := p.mu.queue.Elems()
for _, e := range entries {
re := e.Value.(*RegionPriorityEntry)
// avoid to some priority region occupy checker, region don't need check on next check interval
@@ -130,11 +139,15 @@ func (p *PriorityInspector) GetPriorityRegions() (ids []uint64) {

// RemovePriorityRegion removes priority region from priority queue.
func (p *PriorityInspector) RemovePriorityRegion(regionID uint64) {
p.queue.Remove(regionID)
p.mu.Lock()
defer p.mu.Unlock()
p.mu.queue.Remove(regionID)
}

// getQueueLen returns the length of priority queue.
// it's only used for test.
func (p *PriorityInspector) getQueueLen() int {
return p.queue.Len()
p.mu.RLock()
defer p.mu.RUnlock()
return p.mu.queue.Len()
}
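Wrapping the priority queue in an embedded-mutex struct makes the lock's scope explicit: every access to the queue now has to go through p.mu, so addOrRemoveRegion, GetPriorityRegions, RemovePriorityRegion, and getQueueLen no longer race on the queue when called from concurrent checkers. A minimal sketch of the same pattern using the standard library's sync.RWMutex (PD uses its own syncutil.RWMutex wrapper); the counter type here is illustrative, not part of PD:

package main

import (
	"fmt"
	"sync"
)

// counter embeds the mutex next to the field it protects, so callers cannot
// reach the guarded field without going through the lock's namespace.
type counter struct {
	mu struct {
		sync.RWMutex
		n int
	}
}

func (c *counter) inc() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.mu.n++
}

func (c *counter) get() int {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.mu.n
}

func main() {
	var c counter
	c.inc()
	fmt.Println(c.get())
}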