Skip to content

Commit

Permalink
limit the count of high priority queue
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed May 13, 2024
1 parent 6f3ef65 commit d9fe256
Showing 1 changed file with 9 additions and 7 deletions.
16 changes: 9 additions & 7 deletions pkg/ratelimit/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ const (
SaveRegionToKV = "SaveRegionToKV"
)

const initialCapacity = 10000
const (
initialCapacity = 10000
maxHighPriorityTaskNum = 20000000
)

// Runner is the interface for running tasks.
type Runner interface {
Expand Down Expand Up @@ -199,12 +202,11 @@ func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(con
cr.pendingMu.Lock()
defer cr.pendingMu.Unlock()
if task.priority >= constant.High {
if len(cr.pendingHighPriorityTasks) > 0 {
maxWait := time.Since(cr.pendingHighPriorityTasks[0].submittedAt)
if maxWait > cr.maxPendingDuration {
RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc()
return ErrMaxWaitingTasksExceeded
}
// We use the max task number to prevent the OOM issue.
// It occupies around 1.5GB memory when there is 20000000 pending task.
if len(cr.pendingHighPriorityTasks) > maxHighPriorityTaskNum {
RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc()
return ErrMaxWaitingTasksExceeded

Check warning on line 209 in pkg/ratelimit/runner.go

View check run for this annotation

Codecov / codecov/patch

pkg/ratelimit/runner.go#L208-L209

Added lines #L208 - L209 were not covered by tests
}
task.submittedAt = time.Now()
cr.pendingHighPriorityTasks = append(cr.pendingHighPriorityTasks, task)
Expand Down

0 comments on commit d9fe256

Please sign in to comment.