Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed May 14, 2024
1 parent d9fe256 commit a9eef12
Showing 1 changed file with 7 additions and 7 deletions.
14 changes: 7 additions & 7 deletions pkg/ratelimit/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,15 +144,13 @@ func (cr *ConcurrentRunner) Start() {

func (cr *ConcurrentRunner) run(task *Task, token *TaskToken) {
start := time.Now()
defer func() {
RunnerTaskExecutionDuration.WithLabelValues(cr.name, task.name).Observe(time.Since(start).Seconds())
RunnerSucceededTasks.WithLabelValues(cr.name, task.name).Inc()
}()
task.f(task.ctx)
if token != nil {
token.Release()
cr.processPendingTasks()
}
RunnerTaskExecutionDuration.WithLabelValues(cr.name, task.name).Observe(time.Since(start).Seconds())
RunnerSucceededTasks.WithLabelValues(cr.name, task.name).Inc()
}

func (cr *ConcurrentRunner) processPendingTasks() {
Expand Down Expand Up @@ -198,11 +196,13 @@ func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(con
opt(task)
}

cr.processPendingTasks()
cr.pendingMu.Lock()
defer cr.pendingMu.Unlock()
defer func() {
cr.pendingMu.Unlock()
cr.processPendingTasks()
}()
if task.priority >= constant.High {
// We use the max task number to prevent the OOM issue.
// We use the max task number to limit the memory usage.
// It occupies around 1.5GB memory when there is 20000000 pending task.
if len(cr.pendingHighPriorityTasks) > maxHighPriorityTaskNum {
RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc()
Expand Down

0 comments on commit a9eef12

Please sign in to comment.