Skip to content

Commit

Permalink
optimize memory usage
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed May 11, 2024
1 parent 78cb081 commit 91fbb40
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 67 deletions.
18 changes: 17 additions & 1 deletion pkg/core/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package core

import (
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -114,6 +115,7 @@ type RegionHeartbeatProcessTracer interface {
OnCollectRegionStatsFinished()
OnAllStageFinished()
LogFields() []zap.Field
Release()
}

type noopHeartbeatProcessTracer struct{}
Expand All @@ -138,6 +140,7 @@ func (*noopHeartbeatProcessTracer) OnAllStageFinished() {}
func (*noopHeartbeatProcessTracer) LogFields() []zap.Field {
return nil
}
func (*noopHeartbeatProcessTracer) Release() {}

Check warning on line 143 in pkg/core/metrics.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/metrics.go#L143

Added line #L143 was not covered by tests

type regionHeartbeatProcessTracer struct {
startTime time.Time
Expand All @@ -149,9 +152,22 @@ type regionHeartbeatProcessTracer struct {
OtherDuration time.Duration
}

var tracerPool = sync.Pool{
New: func() any {
return &regionHeartbeatProcessTracer{}
},
}

// NewHeartbeatProcessTracer returns a heartbeat process tracer.
func NewHeartbeatProcessTracer() RegionHeartbeatProcessTracer {
return &regionHeartbeatProcessTracer{}
return tracerPool.Get().(*regionHeartbeatProcessTracer)
}

// Release puts the tracer back into the pool.
func (h *regionHeartbeatProcessTracer) Release() {
// Reset the fields of h to their zero values.
*h = regionHeartbeatProcessTracer{}
tracerPool.Put(h)
}

func (h *regionHeartbeatProcessTracer) Begin() {
Expand Down
23 changes: 10 additions & 13 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,26 +138,23 @@ func NewRegionInfo(region *metapb.Region, leader *metapb.Peer, opts ...RegionCre

// classifyVoterAndLearner sorts out voter and learner from peers into different slice.
func classifyVoterAndLearner(region *RegionInfo) {
learners := make([]*metapb.Peer, 0, 1)
voters := make([]*metapb.Peer, 0, len(region.meta.Peers))
witnesses := make([]*metapb.Peer, 0, 1)
// Reset slices
region.learners = region.learners[:0]
region.voters = region.voters[:0]
region.witnesses = region.witnesses[:0]
for _, p := range region.meta.Peers {
if IsLearner(p) {
learners = append(learners, p)
region.learners = append(region.learners, p)
} else {
voters = append(voters, p)
region.voters = append(region.voters, p)
}
// Whichever peer role can be a witness
if IsWitness(p) {
witnesses = append(witnesses, p)
region.witnesses = append(region.witnesses, p)
}
}
sort.Sort(peerSlice(learners))
sort.Sort(peerSlice(voters))
sort.Sort(peerSlice(witnesses))
region.learners = learners
region.voters = voters
region.witnesses = witnesses
sort.Sort(peerSlice(region.learners))
sort.Sort(peerSlice(region.voters))
sort.Sort(peerSlice(region.witnesses))
}

// peersEqualTo returns true when the peers are not changed, which may caused by: the region leader not changed,
Expand Down
81 changes: 51 additions & 30 deletions pkg/ratelimit/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package ratelimit

import (
"container/list"
"context"
"errors"
"sync"
Expand All @@ -35,23 +36,48 @@ const (
SaveRegionToKV = "SaveRegionToKV"
)

const initialCapacity = 100

// Runner is the interface for running tasks.
type Runner interface {
RunTask(ctx context.Context, f func(context.Context), opts ...TaskOption) error
Start()
Stop()
}

// Task is a task to be run.
// TaskPool is a pool for tasks.
var TaskPool = &sync.Pool{
New: func() any {
return &Task{}
},
}

// Task is a task to run.
type Task struct {
Ctx context.Context
Opts *TaskOpts
f func(context.Context)
submittedAt time.Time
}

func NewTask(ctx context.Context, f func(context.Context), opts ...TaskOption) *Task {
task := TaskPool.Get().(*Task)
task.Ctx = ctx
task.f = f
task.Opts = &TaskOpts{}
task.submittedAt = time.Now()
for _, opt := range opts {
opt(task.Opts)
}
return task
}

// ReleaseTask releases the task.
func ReleaseTask(task *Task) {
task.Ctx = nil
task.Opts = nil
task.f = nil
TaskPool.Put(task)
}

// ErrMaxWaitingTasksExceeded is returned when the number of waiting tasks exceeds the maximum.
var ErrMaxWaitingTasksExceeded = errors.New("max waiting tasks exceeded")

Expand All @@ -61,7 +87,7 @@ type ConcurrentRunner struct {
limiter *ConcurrencyLimiter
maxPendingDuration time.Duration
taskChan chan *Task
pendingTasks []*Task
pendingTasks *list.List
pendingMu sync.Mutex
stopChan chan struct{}
wg sync.WaitGroup
Expand All @@ -77,7 +103,7 @@ func NewConcurrentRunner(name string, limiter *ConcurrencyLimiter, maxPendingDur
limiter: limiter,
maxPendingDuration: maxPendingDuration,
taskChan: make(chan *Task),
pendingTasks: make([]*Task, 0, initialCapacity),
pendingTasks: list.New(),
failedTaskCount: RunnerTaskFailedTasks.WithLabelValues(name),
pendingTaskCount: make(map[string]int64),
maxWaitingDuration: RunnerTaskMaxWaitingDuration.WithLabelValues(name),
Expand Down Expand Up @@ -114,21 +140,22 @@ func (cr *ConcurrentRunner) Start() {
if err != nil {
continue
}
go cr.run(task.Ctx, task.f, token)
go cr.run(task, token)
} else {
go cr.run(task.Ctx, task.f, nil)
go cr.run(task, nil)

Check warning on line 145 in pkg/ratelimit/runner.go

View check run for this annotation

Codecov / codecov/patch

pkg/ratelimit/runner.go#L145

Added line #L145 was not covered by tests
}
case <-cr.stopChan:
cr.pendingMu.Lock()
cr.pendingTasks = make([]*Task, 0, initialCapacity)
cr.pendingTasks = list.New()
cr.pendingMu.Unlock()
log.Info("stopping async task runner", zap.String("name", cr.name))
return
case <-ticker.C:
maxDuration := time.Duration(0)
cr.pendingMu.Lock()
if len(cr.pendingTasks) > 0 {
maxDuration = time.Since(cr.pendingTasks[0].submittedAt)
if cr.pendingTasks.Len() > 0 {
first := cr.pendingTasks.Front()
maxDuration = time.Since(first.Value.(*Task).submittedAt)
}
for name, cnt := range cr.pendingTaskCount {
RunnerTaskPendingTasks.WithLabelValues(cr.name, name).Set(float64(cnt))
Expand All @@ -140,27 +167,28 @@ func (cr *ConcurrentRunner) Start() {
}()
}

func (cr *ConcurrentRunner) run(ctx context.Context, task func(context.Context), token *TaskToken) {
task(ctx)
func (cr *ConcurrentRunner) run(task *Task, token *TaskToken) {
task.f(task.Ctx)
if token != nil {
token.Release()
cr.processPendingTasks()
}
ReleaseTask(task)
}

func (cr *ConcurrentRunner) processPendingTasks() {
cr.pendingMu.Lock()
defer cr.pendingMu.Unlock()
for len(cr.pendingTasks) > 0 {
task := cr.pendingTasks[0]
if cr.pendingTasks.Len() > 0 {
first := cr.pendingTasks.Front()
task := first.Value.(*Task)
select {
case cr.taskChan <- task:
cr.pendingTasks = cr.pendingTasks[1:]
cr.pendingTasks.Remove(first)
cr.pendingTaskCount[task.Opts.TaskName]--
return
default:
return
}
return
}
}

Expand All @@ -172,32 +200,25 @@ func (cr *ConcurrentRunner) Stop() {

// RunTask runs the task asynchronously.
func (cr *ConcurrentRunner) RunTask(ctx context.Context, f func(context.Context), opts ...TaskOption) error {
taskOpts := &TaskOpts{}
for _, opt := range opts {
opt(taskOpts)
}
task := &Task{
Ctx: ctx,
f: f,
Opts: taskOpts,
}
task := NewTask(ctx, f, opts...)

cr.processPendingTasks()
select {
case cr.taskChan <- task:
default:
cr.pendingMu.Lock()
defer cr.pendingMu.Unlock()
if len(cr.pendingTasks) > 0 {
maxWait := time.Since(cr.pendingTasks[0].submittedAt)
if cr.pendingTasks.Len() > 0 {
first := cr.pendingTasks.Front()
maxWait := time.Since(first.Value.(*Task).submittedAt)
if maxWait > cr.maxPendingDuration {
cr.failedTaskCount.Inc()
return ErrMaxWaitingTasksExceeded
}
}
task.submittedAt = time.Now()
cr.pendingTasks = append(cr.pendingTasks, task)
cr.pendingTaskCount[taskOpts.TaskName]++
cr.pendingTasks.PushBack(task)
cr.pendingTaskCount[task.Opts.TaskName]++
}
return nil
}
Expand Down
39 changes: 23 additions & 16 deletions pkg/statistics/region_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,14 +207,27 @@ func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.Store
}
}
}

peers := region.GetPeers()
downPeers := region.GetDownPeers()
pendingPeers := region.GetPendingPeers()
learners := region.GetLearners()
voters := region.GetVoters()
regionSize := region.GetApproximateSize()
regionMaxSize := int64(r.conf.GetRegionMaxSize())
regionMaxKeys := int64(r.conf.GetRegionMaxKeys())
maxMergeRegionSize := int64(r.conf.GetMaxMergeRegionSize())
maxMergeRegionKeys := int64(r.conf.GetMaxMergeRegionKeys())
leaderIsWitness := region.GetLeader().GetIsWitness()

// Better to make sure once any of these conditions changes, it will trigger the heartbeat `save_cache`.
// Otherwise, the state may be out-of-date for a long time, which needs another way to apply the change ASAP.
// For example, see `RegionStatsNeedUpdate` above to know how `OversizedRegion` and `UndersizedRegion` are updated.
conditions := map[RegionStatisticType]bool{
MissPeer: len(region.GetPeers()) < desiredReplicas,
ExtraPeer: len(region.GetPeers()) > desiredReplicas,
DownPeer: len(region.GetDownPeers()) > 0,
PendingPeer: len(region.GetPendingPeers()) > 0,
MissPeer: len(peers) < desiredReplicas,
ExtraPeer: len(peers) > desiredReplicas,
DownPeer: len(downPeers) > 0,
PendingPeer: len(pendingPeers) > 0,
OfflinePeer: func() bool {
for _, store := range stores {
if store.IsRemoving() {
Expand All @@ -226,17 +239,11 @@ func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.Store
}
return false
}(),
LearnerPeer: len(region.GetLearners()) > 0,
EmptyRegion: region.GetApproximateSize() <= core.EmptyRegionApproximateSize,
OversizedRegion: region.IsOversized(
int64(r.conf.GetRegionMaxSize()),
int64(r.conf.GetRegionMaxKeys()),
),
UndersizedRegion: region.NeedMerge(
int64(r.conf.GetMaxMergeRegionSize()),
int64(r.conf.GetMaxMergeRegionKeys()),
),
WitnessLeader: region.GetLeader().GetIsWitness(),
LearnerPeer: len(learners) > 0,
EmptyRegion: regionSize <= core.EmptyRegionApproximateSize,
OversizedRegion: region.IsOversized(regionMaxSize, regionMaxKeys),
UndersizedRegion: region.NeedMerge(maxMergeRegionSize, maxMergeRegionKeys),
WitnessLeader: leaderIsWitness,
}
// Check if the region meets any of the conditions and update the corresponding info.
regionID := region.GetID()
Expand All @@ -253,7 +260,7 @@ func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.Store
info.startDownPeerTS = time.Now().Unix()
logDownPeerWithNoDisconnectedStore(region, stores)
}
} else if typ == MissPeer && len(region.GetVoters()) < desiredVoters {
} else if typ == MissPeer && len(voters) < desiredVoters {
if info.startMissVoterPeerTS != 0 {
regionMissVoterPeerDuration.Observe(float64(time.Now().Unix() - info.startMissVoterPeerTS))
} else {
Expand Down
10 changes: 3 additions & 7 deletions pkg/utils/grpcutil/grpcutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,13 +186,9 @@ func ResetForwardContext(ctx context.Context) context.Context {

// GetForwardedHost returns the forwarded host in metadata.
func GetForwardedHost(ctx context.Context) string {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
log.Debug("failed to get gRPC incoming metadata when getting forwarded host")
return ""
}
if t, ok := md[ForwardMetadataKey]; ok {
return t[0]
s := metadata.ValueFromIncomingContext(ctx, ForwardMetadataKey)
if len(s) > 0 {
return s[0]
}
return ""
}
Expand Down
1 change: 1 addition & 0 deletions server/cluster/cluster_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error {
return err
}
tracer.OnAllStageFinished()
tracer.Release()

if c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
return nil
Expand Down

0 comments on commit 91fbb40

Please sign in to comment.