diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index b5c2bace7ae2e1..a7c78d966a26e2 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1776,6 +1776,12 @@ public class Config extends ConfigBase { "Maximum number of persistence allowed per task in a job,exceeding which old tasks will be discarded," + "If the value is less than 1, it will not be persisted." }) public static int max_persistence_task_count = 100; + @ConfField(description = {"MV task 的等待队列大小,值必须是 2 的幂,否则将使用默认值 1024", "The size of the MV task's" + + "waiting queue must be a power of 2; otherwise, the default value of 1024 will be used."}) + public static int mtmv_task_queue_size = 1024; + @ConfField(description = {"Insert task 的等待队列大小,值必须是 2 的幂,否则将使用默认值 1024", "The size of the Insert " + + "task's waiting queue must be a power of 2; otherwise, the default value of 1024 will be used."}) + public static int insert_task_queue_size = 1024; @ConfField(description = {"finished 状态的 job 最长保存时间,超过这个时间将会被删除, 单位:小时", "The longest time to save the job in finished status, it will be deleted after this time. Unit: hour"}) public static int finished_job_cleanup_threshold_time_hour = 24; diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java index b1ccb9764438c8..ed45b97e1a2ba9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java @@ -65,8 +65,10 @@ public class TaskDisruptorGroupManager { private static final int DISPATCH_MTMV_THREAD_NUM = Config.job_mtmv_task_consumer_thread_num > 0 ? Config.job_mtmv_task_consumer_thread_num : DEFAULT_CONSUMER_THREAD_NUM; - private static final int DISPATCH_INSERT_TASK_QUEUE_SIZE = DEFAULT_RING_BUFFER_SIZE; - private static final int DISPATCH_MTMV_TASK_QUEUE_SIZE = DEFAULT_RING_BUFFER_SIZE; + private static final int DISPATCH_INSERT_TASK_QUEUE_SIZE = isPowerOfTwo(Config.insert_task_queue_size) + ? Config.insert_task_queue_size : DEFAULT_RING_BUFFER_SIZE; + private static final int DISPATCH_MTMV_TASK_QUEUE_SIZE = isPowerOfTwo(Config.mtmv_task_queue_size) + ? Config.mtmv_task_queue_size : DEFAULT_RING_BUFFER_SIZE; public void init() { @@ -132,5 +134,8 @@ public boolean dispatchInstantTask(AbstractTask task, JobType jobType, return disruptorMap.get(jobType).publishEvent(task, jobExecutionConfiguration); } - + private static boolean isPowerOfTwo(int n) { + // Check if n is greater than 0 and if n & (n - 1) equals 0 + return n > 0 && (n & (n - 1)) == 0; + } } diff --git a/regression-test/pipeline/p0/conf/fe.conf b/regression-test/pipeline/p0/conf/fe.conf index e9448b340144ac..625012f9a65478 100644 --- a/regression-test/pipeline/p0/conf/fe.conf +++ b/regression-test/pipeline/p0/conf/fe.conf @@ -67,6 +67,7 @@ label_keep_max_second = 300 # job test configurations #allows the creation of jobs with an interval of second enable_job_schedule_second_for_test = true +mtmv_task_queue_size = 4096 enable_workload_group = true publish_topic_info_interval_ms = 1000