From 476f1625c02f4c6371b8fcaa1fb306379875a534 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Tue, 22 Oct 2024 17:00:31 +0800 Subject: [PATCH] [Chore](job)Provides configuration of job execution queue size When dealing with a large number of tasks, the default execution queue size is 1024. This can lead to tasks being dropped if the queue becomes full. To address this, you can add the parameters `insert_task_queue_size` and `mtmv_task_queue_size` in the `fe.conf` configuration file. These parameters must be set to a power of 2. Keep in mind, increasing this value is recommended only when thread resources are limited; otherwise, you should consider increasing the number of task execution threads. --- .../org/apache/doris/job/executor/TimerJobSchedulerTask.java | 2 +- .../main/java/org/apache/doris/job/scheduler/JobScheduler.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java index 25bbccf3fa2fa6..50793d80296915 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java @@ -45,7 +45,7 @@ public void run(Timeout timeout) { return; } if (!dispatchDisruptor.publishEvent(this.job)) { - log.warn("dispatch timer job failed, job id is {}, job name is {}", + log.warn("dispatch timer job failed, queue maybe full. job id is {}, job name is {}", this.job.getJobId(), this.job.getJobName()); } } catch (Exception e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java index 862b85597cdc3a..c9bc77ff9b8166 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java @@ -168,7 +168,7 @@ public void schedulerInstantJob(T job, TaskType taskType, C context) throws JobE for (AbstractTask task : tasks) { if (!taskDisruptorGroupManager.dispatchInstantTask(task, job.getJobType(), job.getJobConfig())) { - throw new JobException("dispatch instant task failed, job id is " + throw new JobException("dispatch instant task failed, queue maybe full, job id is " + job.getJobId() + ", task id is " + task.getTaskId()); } log.info("dispatch instant job, job id is {}, job name is {}, task id is {}", job.getJobId(),