diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 9811eef28b6337..ba2075f866d5cf 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1746,7 +1746,7 @@ public class Config extends ConfigBase { * corresponding type of job * The value should be greater than 0, if it is 0 or <=0, set it to 5 */ - @ConfField(description = {"用于分发定时任务的线程数", + @ConfField(masterOnly = true, description = {"用于分发定时任务的线程数", "The number of threads used to dispatch timer job."}) public static int job_dispatch_timer_job_thread_num = 2; @@ -1757,24 +1757,34 @@ public class Config extends ConfigBase { * {@code @dispatch_timer_job_thread_num} * The value should be greater than 0, if it is 0 or <=0, set it to 1024 */ - @ConfField(description = {"任务堆积时用于存放定时任务的队列大小", "The number of timer jobs that can be queued."}) + @ConfField(masterOnly = true, description = {"任务堆积时用于存放定时任务的队列大小", "The number of timer jobs that can be queued."}) public static int job_dispatch_timer_job_queue_size = 1024; - @ConfField(description = {"一个 Job 的 task 最大的持久化数量,超过这个限制将会丢弃旧的 task 记录, 如果值 < 1, 将不会持久化。", + @ConfField(masterOnly = true, description = {"一个 Job 的 task 最大的持久化数量,超过这个限制将会丢弃旧的 task 记录, 如果值 < 1, 将不会持久化。", "Maximum number of persistence allowed per task in a job,exceeding which old tasks will be discarded," + "If the value is less than 1, it will not be persisted." }) public static int max_persistence_task_count = 100; - @ConfField(description = {"finished 状态的 job 最长保存时间,超过这个时间将会被删除, 单位:小时", + @ConfField(masterOnly = true, description = {"MV task 的等待队列大小,如果是负数,则会使用 1024,如果不是 2 的幂,则会自动选择一个最接近的" + + " 2 的幂次方数", "The size of the MV task's waiting queue If the size is negative, 1024 will be used. If " + + "the size is not a power of two, the nearest power of the size will be" + + " automatically selected."}) + public static int mtmv_task_queue_size = 1024; + @ConfField(masterOnly = true, description = {"Insert task 的等待队列大小,如果是负数,则会使用 1024,如果不是 2 的幂,则会自动选择一个最接近" + + " 的 2 的幂次方数", "The size of the Insert task's waiting queue If the size is negative, 1024 will be used." + + " If the size is not a power of two, the nearest power of the size will " + + "be automatically selected."}) + public static int insert_task_queue_size = 1024; + @ConfField(masterOnly = true, description = {"finished 状态的 job 最长保存时间,超过这个时间将会被删除, 单位:小时", "The longest time to save the job in finished status, it will be deleted after this time. Unit: hour"}) public static int finished_job_cleanup_threshold_time_hour = 24; - @ConfField(description = {"用于执行 Insert 任务的线程数,值应该大于0,否则默认为5", + @ConfField(masterOnly = true, description = {"用于执行 Insert 任务的线程数,值应该大于0,否则默认为10", "The number of threads used to consume Insert tasks, " - + "the value should be greater than 0, if it is <=0, default is 5."}) + + "the value should be greater than 0, if it is <=0, default is 10."}) public static int job_insert_task_consumer_thread_num = 10; - @ConfField(description = {"用于执行 MTMV 任务的线程数,值应该大于0,否则默认为5", + @ConfField(masterOnly = true, description = {"用于执行 MTMV 任务的线程数,值应该大于0,否则默认为10", "The number of threads used to consume mtmv tasks, " - + "the value should be greater than 0, if it is <=0, default is 5."}) + + "the value should be greater than 0, if it is <=0, default is 10."}) public static int job_mtmv_task_consumer_thread_num = 10; /* job test config */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java index 94a0b0146cd514..62ac0c4d59d743 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java @@ -63,7 +63,7 @@ public abstract class AbstractJob implements Job { * @return TRow */ TRow getTvfInfo(); + + /** + * Generates a common error message when the execution queue is full. + * @param taskId task id + * @return error msg for user + */ + String formatMsgWhenExecuteQueueFull(Long taskId); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java index 25bbccf3fa2fa6..65a9cf2e091164 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java @@ -45,11 +45,17 @@ public void run(Timeout timeout) { return; } if (!dispatchDisruptor.publishEvent(this.job)) { - log.warn("dispatch timer job failed, job id is {}, job name is {}", - this.job.getJobId(), this.job.getJobName()); + log.warn("dispatch timer job failed, queue maybe full. job id is {}, job name is {}", + this.job.getJobId(), this.job.getJobName() + getMsgWhenExecuteQueueFull()); } } catch (Exception e) { log.warn("dispatch timer job error, task id is {}", this.job.getJobId(), e); } } + + private String getMsgWhenExecuteQueueFull() { + return "you can increase the queue size by setting the property " + + "job_dispatch_timer_job_queue_size in the fe.conf file or increase the value of " + + "the property job_dispatch_timer_job_thread_num in the fe.conf file"; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java index 487591efc04745..ce35227feb1e15 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java @@ -522,6 +522,12 @@ public List getShowInfo() { } } + @Override + public String formatMsgWhenExecuteQueueFull(Long taskId) { + return commonFormatMsgWhenExecuteQueueFull(taskId, "insert_task_queue_size", + "job_insert_task_consumer_thread_num"); + } + private String getPriority() { return properties.getOrDefault(LoadStmt.PRIORITY, Priority.NORMAL.name()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java index add191001f9125..62c005ca2807ad 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java @@ -218,6 +218,13 @@ public List getShowInfo() { return data; } + @Override + public String formatMsgWhenExecuteQueueFull(Long taskId) { + return commonFormatMsgWhenExecuteQueueFull(taskId, "mtmv_task_queue_size", + "job_mtmv_task_consumer_thread_num"); + + } + @Override public TRow getTvfInfo() { TRow trow = new TRow(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java index b1ccb9764438c8..cc82b59a36a36f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java @@ -65,8 +65,8 @@ public class TaskDisruptorGroupManager { private static final int DISPATCH_MTMV_THREAD_NUM = Config.job_mtmv_task_consumer_thread_num > 0 ? Config.job_mtmv_task_consumer_thread_num : DEFAULT_CONSUMER_THREAD_NUM; - private static final int DISPATCH_INSERT_TASK_QUEUE_SIZE = DEFAULT_RING_BUFFER_SIZE; - private static final int DISPATCH_MTMV_TASK_QUEUE_SIZE = DEFAULT_RING_BUFFER_SIZE; + private static final int DISPATCH_INSERT_TASK_QUEUE_SIZE = normalizeRingbufferSize(Config.insert_task_queue_size); + private static final int DISPATCH_MTMV_TASK_QUEUE_SIZE = normalizeRingbufferSize(Config.mtmv_task_queue_size); public void init() { @@ -133,4 +133,24 @@ public boolean dispatchInstantTask(AbstractTask task, JobType jobType, } + /** + * Normalizes the given size to the nearest power of two. + * This method ensures that the size is a power of two, which is often required for optimal + * performance in certain data structures like ring buffers. + * + * @param size The input size to be normalized. + * @return The nearest power of two greater than or equal to the input size. + */ + public static int normalizeRingbufferSize(int size) { + int ringBufferSize = size - 1; + if (size < 1) { + return DEFAULT_RING_BUFFER_SIZE; + } + ringBufferSize |= ringBufferSize >>> 1; + ringBufferSize |= ringBufferSize >>> 2; + ringBufferSize |= ringBufferSize >>> 4; + ringBufferSize |= ringBufferSize >>> 8; + ringBufferSize |= ringBufferSize >>> 16; + return ringBufferSize + 1; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java index 862b85597cdc3a..ea0c263a5ee1c3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java @@ -168,8 +168,8 @@ public void schedulerInstantJob(T job, TaskType taskType, C context) throws JobE for (AbstractTask task : tasks) { if (!taskDisruptorGroupManager.dispatchInstantTask(task, job.getJobType(), job.getJobConfig())) { - throw new JobException("dispatch instant task failed, job id is " - + job.getJobId() + ", task id is " + task.getTaskId()); + throw new JobException(job.formatMsgWhenExecuteQueueFull(task.getTaskId())); + } log.info("dispatch instant job, job id is {}, job name is {}, task id is {}", job.getJobId(), job.getJobName(), task.getTaskId()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/job/manager/TaskDisruptorGroupManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/job/manager/TaskDisruptorGroupManagerTest.java new file mode 100644 index 00000000000000..661bdcdcca2b1b --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/job/manager/TaskDisruptorGroupManagerTest.java @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.job.manager; + + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TaskDisruptorGroupManagerTest { + + @Test + public void testInit() { + Assertions.assertEquals(1024, TaskDisruptorGroupManager.normalizeRingbufferSize(1024)); + Assertions.assertEquals(1024, TaskDisruptorGroupManager.normalizeRingbufferSize(-1)); + Assertions.assertEquals(16, TaskDisruptorGroupManager.normalizeRingbufferSize(15)); + Assertions.assertEquals(1024, TaskDisruptorGroupManager.normalizeRingbufferSize(1023)); + Assertions.assertEquals(1024, TaskDisruptorGroupManager.normalizeRingbufferSize(-8)); + Assertions.assertEquals(2048, TaskDisruptorGroupManager.normalizeRingbufferSize(1025)); + Assertions.assertEquals(4096, TaskDisruptorGroupManager.normalizeRingbufferSize(2049)); + } +} diff --git a/regression-test/pipeline/p0/conf/fe.conf b/regression-test/pipeline/p0/conf/fe.conf index ce965f7f99608e..48cc9598ef4b96 100644 --- a/regression-test/pipeline/p0/conf/fe.conf +++ b/regression-test/pipeline/p0/conf/fe.conf @@ -67,6 +67,7 @@ label_keep_max_second = 300 # job test configurations #allows the creation of jobs with an interval of second enable_job_schedule_second_for_test = true +mtmv_task_queue_size = 4096 enable_workload_group = true publish_topic_info_interval_ms = 1000