Skip to content

Commit

Permalink
prevent oom
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed May 13, 2024
1 parent a342352 commit 41d17db
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 55 deletions.
14 changes: 8 additions & 6 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/replication_modepb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/syncutil"
Expand Down Expand Up @@ -729,7 +730,7 @@ func (r *RegionInfo) isRegionRecreated() bool {

// RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin
// and new region information.
type RegionGuideFunc func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync, metaUpdated bool)
type RegionGuideFunc func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync bool, priority constant.PriorityLevel)

// GenerateRegionGuideFunc is used to generate a RegionGuideFunc. Control the log output by specifying the log function.
// nil means do not print the log.
Expand All @@ -742,7 +743,8 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
}
// Save to storage if meta is updated.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
return func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync, metaUpdated bool) {
return func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync bool, priority constant.PriorityLevel) {
priority = constant.Medium
logRunner := ctx.LogRunner
// print log asynchronously
debug, info := d, i
Expand Down Expand Up @@ -772,7 +774,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
zap.Uint64("region-id", region.GetID()),
logutil.ZapRedactStringer("meta-region", RegionToHexMeta(region.GetMeta())))
}
saveKV, saveCache, metaUpdated = true, true, true
saveKV, saveCache, priority = true, true, constant.High
} else {
r := region.GetRegionEpoch()
o := origin.GetRegionEpoch()
Expand All @@ -785,7 +787,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
zap.Uint64("new-version", r.GetVersion()),
)
}
saveKV, saveCache, metaUpdated = true, true, true
saveKV, saveCache, priority = true, true, constant.High
}
if r.GetConfVer() > o.GetConfVer() {
if log.GetLevel() <= zap.InfoLevel {
Expand All @@ -796,7 +798,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
zap.Uint64("new-confver", r.GetConfVer()),
)
}
saveKV, saveCache, metaUpdated = true, true, true
saveKV, saveCache, priority = true, true, constant.High
}
if region.GetLeader().GetId() != origin.GetLeader().GetId() {
if origin.GetLeader().GetId() != 0 && log.GetLevel() <= zap.InfoLevel {
Expand All @@ -807,7 +809,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
)
}
// We check it first and do not return because the log is important for us to investigate,
saveCache, needSync, metaUpdated = true, true, true
saveCache, needSync, priority = true, true, constant.High
}
if len(region.GetPeers()) != len(origin.GetPeers()) {
saveKV, saveCache = true, true
Expand Down
42 changes: 8 additions & 34 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/cluster"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/scheduling/server/config"
"github.com/tikv/pd/pkg/ratelimit"
Expand Down Expand Up @@ -591,19 +592,12 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
return err
}
region.Inherit(origin, c.GetStoreConfig().IsEnableRegionBucket())

ctx.TaskRunner.RunTask(
ctx,
ratelimit.HandleStatsAsync,
func(_ context.Context) {
cluster.HandleStatsAsync(c, region)
},
)
cluster.HandleStatsAsync(c, region)
tracer.OnAsyncHotStatsFinished()
hasRegionStats := c.regionStats != nil
// Save to storage if meta is updated, except for flashback.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
_, saveCache, _, metaUpdated := core.GenerateRegionGuideFunc(true)(ctx, region, origin)
_, saveCache, _, priority := core.GenerateRegionGuideFunc(true)(ctx, region, origin)

if !saveCache {
// Due to some config changes need to update the region stats as well,
Expand All @@ -617,11 +611,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
cluster.Collect(c, region, hasRegionStats)
}
},
<<<<<<< HEAD
=======
ratelimit.WithTaskName(ratelimit.ObserveRegionStatsAsync),
ratelimit.WithMetaUpdated(metaUpdated),
>>>>>>> distinguish the task priority
ratelimit.WithPriority(priority),
)
}
// region is not updated to the subtree.
Expand All @@ -632,11 +622,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
func(_ context.Context) {
c.CheckAndPutSubTree(region)
},
<<<<<<< HEAD
=======
ratelimit.WithTaskName(ratelimit.UpdateSubTree),
ratelimit.WithMetaUpdated(metaUpdated),
>>>>>>> distinguish the task priority
ratelimit.WithPriority(constant.High),
)
}
return nil
Expand All @@ -660,11 +646,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
func(_ context.Context) {
c.CheckAndPutSubTree(region)
},
<<<<<<< HEAD
=======
ratelimit.WithTaskName(ratelimit.UpdateSubTree),
ratelimit.WithMetaUpdated(metaUpdated),
>>>>>>> distinguish the task priority
ratelimit.WithPriority(priority),
)
tracer.OnUpdateSubTreeFinished()
ctx.TaskRunner.RunTask(
Expand All @@ -673,11 +655,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
func(_ context.Context) {
cluster.HandleOverlaps(c, overlaps)
},
<<<<<<< HEAD
=======
ratelimit.WithTaskName(ratelimit.HandleOverlaps),
ratelimit.WithMetaUpdated(metaUpdated),
>>>>>>> distinguish the task priority
ratelimit.WithPriority(priority),
)
}
tracer.OnSaveCacheFinished()
Expand All @@ -688,11 +666,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
func(_ context.Context) {
cluster.Collect(c, region, hasRegionStats)
},
<<<<<<< HEAD
=======
ratelimit.WithTaskName(ratelimit.CollectRegionStatsAsync),
ratelimit.WithMetaUpdated(metaUpdated),
>>>>>>> distinguish the task priority
ratelimit.WithPriority(priority),
)
tracer.OnCollectRegionStatsFinished()
return nil
Expand Down
23 changes: 15 additions & 8 deletions pkg/ratelimit/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/pkg/core/constant"
"go.uber.org/zap"
)

Expand All @@ -35,7 +36,7 @@ const (
SaveRegionToKV = "SaveRegionToKV"
)

const initialCapacity = 100
const initialCapacity = 10000

// Runner is the interface for running tasks.
type Runner interface {
Expand Down Expand Up @@ -88,16 +89,15 @@ func NewConcurrentRunner(name string, limiter *ConcurrencyLimiter, maxPendingDur

// TaskOpts is the options for RunTask.
type TaskOpts struct {
// IsMetaUpdated indicates whether the meta is updated.
MetaUpdated bool
priority constant.PriorityLevel
}

// TaskOption configures TaskOp
type TaskOption func(opts *TaskOpts)

// WithMetaUpdated specify whether the meta is updated.
func WithMetaUpdated(metaUpdated bool) TaskOption {
return func(opts *TaskOpts) { opts.MetaUpdated = metaUpdated }
// WithPriority sets the priority of the task.
func WithPriority(priority constant.PriorityLevel) TaskOption {
return func(opts *TaskOpts) { opts.priority = priority }
}

// Start starts the runner.
Expand Down Expand Up @@ -202,9 +202,16 @@ func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(con
cr.processPendingTasks()
cr.pendingMu.Lock()
defer cr.pendingMu.Unlock()
if task.opts.MetaUpdated {
if task.opts.priority >= constant.High {
if len(cr.pendingHighPriorityTasks) > 0 {
maxWait := time.Since(cr.pendingHighPriorityTasks[0].submittedAt)
if maxWait > cr.maxPendingDuration {
RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc()

Check warning on line 209 in pkg/ratelimit/runner.go

View check run for this annotation

Codecov / codecov/patch

pkg/ratelimit/runner.go#L209

Added line #L209 was not covered by tests
return ErrMaxWaitingTasksExceeded
}
}
task.submittedAt = time.Now()
cr.pendingHighPriorityTasks = append(cr.pendingNormalPriorityTasks, task)
cr.pendingHighPriorityTasks = append(cr.pendingHighPriorityTasks, task)
cr.pendingTaskCount[task.name]++
return nil
}
Expand Down
15 changes: 8 additions & 7 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/cluster"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/core/storelimit"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/gc"
Expand Down Expand Up @@ -1030,7 +1031,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
hasRegionStats := c.regionStats != nil
// Save to storage if meta is updated, except for flashback.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
saveKV, saveCache, needSync, metaUpdated := regionGuide(ctx, region, origin)
saveKV, saveCache, needSync, priority := regionGuide(ctx, region, origin)
tracer.OnRegionGuideFinished()
if !saveKV && !saveCache {
// Due to some config changes need to update the region stats as well,
Expand All @@ -1047,7 +1048,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
cluster.Collect(c, region, hasRegionStats)
}
},
ratelimit.WithMetaUpdated(metaUpdated),
ratelimit.WithPriority(priority),
)
}
// region is not updated to the subtree.
Expand All @@ -1058,7 +1059,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
func(_ context.Context) {
c.CheckAndPutSubTree(region)
},
ratelimit.WithMetaUpdated(true),
ratelimit.WithPriority(constant.High),
)
}
return nil
Expand Down Expand Up @@ -1086,7 +1087,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
func(_ context.Context) {
c.CheckAndPutSubTree(region)
},
ratelimit.WithMetaUpdated(metaUpdated),
ratelimit.WithPriority(priority),
)
tracer.OnUpdateSubTreeFinished()

Expand All @@ -1097,7 +1098,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
func(_ context.Context) {
cluster.HandleOverlaps(c, overlaps)
},
ratelimit.WithMetaUpdated(metaUpdated),
ratelimit.WithPriority(priority),
)
}
regionUpdateCacheEventCounter.Inc()
Expand All @@ -1114,7 +1115,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
// We need to think of a better way to reduce this part of the cost in the future.
cluster.Collect(c, region, hasRegionStats)
},
ratelimit.WithMetaUpdated(metaUpdated),
ratelimit.WithPriority(priority),
)

tracer.OnCollectRegionStatsFinished()
Expand Down Expand Up @@ -1144,7 +1145,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
}
regionUpdateKVEventCounter.Inc()
},
ratelimit.WithMetaUpdated(metaUpdated),
ratelimit.WithPriority(priority),
)
}
}
Expand Down

0 comments on commit 41d17db

Please sign in to comment.