diff --git a/pkg/ratelimit/runner.go b/pkg/ratelimit/runner.go index d3f3dd329fb0..84c471206f62 100644 --- a/pkg/ratelimit/runner.go +++ b/pkg/ratelimit/runner.go @@ -144,15 +144,13 @@ func (cr *ConcurrentRunner) Start() { func (cr *ConcurrentRunner) run(task *Task, token *TaskToken) { start := time.Now() - defer func() { - RunnerTaskExecutionDuration.WithLabelValues(cr.name, task.name).Observe(time.Since(start).Seconds()) - RunnerSucceededTasks.WithLabelValues(cr.name, task.name).Inc() - }() task.f(task.ctx) if token != nil { token.Release() cr.processPendingTasks() } + RunnerTaskExecutionDuration.WithLabelValues(cr.name, task.name).Observe(time.Since(start).Seconds()) + RunnerSucceededTasks.WithLabelValues(cr.name, task.name).Inc() } func (cr *ConcurrentRunner) processPendingTasks() { @@ -198,11 +196,13 @@ func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(con opt(task) } - cr.processPendingTasks() cr.pendingMu.Lock() - defer cr.pendingMu.Unlock() + defer func() { + cr.pendingMu.Unlock() + cr.processPendingTasks() + }() if task.priority >= constant.High { - // We use the max task number to prevent the OOM issue. + // We use the max task number to limit the memory usage. // It occupies around 1.5GB memory when there is 20000000 pending task. if len(cr.pendingHighPriorityTasks) > maxHighPriorityTaskNum { RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc()