Skip to content

Commit

Permalink
[Chore](job) Provides configuration of job execution queue size (apac…
Browse files Browse the repository at this point in the history
…he#42253)

The default execution queue size is 1024. When a large number of tasks is
submitted, the queue can fill up and further tasks are dropped with an
error such as:

`dispatch instant task failed, job id is xxx`

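To address this, you can add the parameters `insert_task_queue_size` and
`mtmv_task_queue_size` in the `fe.conf` configuration file. These
parameters should be powers of 2; a value that is not a power of 2 is
rounded up to the next power of 2, and a value less than 1 falls back to
1024.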

**Keep in mind, increasing this value is recommended only when thread
resources are limited; otherwise, you should consider increasing the
number of task execution threads.**

(cherry picked from commit f9ea8f8)
  • Loading branch information
CalvinKirs committed Oct 28, 2024
1 parent d529627 commit 6138080
Show file tree
Hide file tree
Showing 10 changed files with 125 additions and 15 deletions.
26 changes: 18 additions & 8 deletions fe/fe-common/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -1746,7 +1746,7 @@ public class Config extends ConfigBase {
* corresponding type of job
* The value should be greater than 0, if it is 0 or <=0, set it to 5
*/
@ConfField(description = {"用于分发定时任务的线程数",
@ConfField(masterOnly = true, description = {"用于分发定时任务的线程数",
"The number of threads used to dispatch timer job."})
public static int job_dispatch_timer_job_thread_num = 2;

Expand All @@ -1757,24 +1757,34 @@ public class Config extends ConfigBase {
* {@code @dispatch_timer_job_thread_num}
* The value should be greater than 0, if it is 0 or <=0, set it to 1024
*/
@ConfField(description = {"任务堆积时用于存放定时任务的队列大小", "The number of timer jobs that can be queued."})
@ConfField(masterOnly = true, description = {"任务堆积时用于存放定时任务的队列大小", "The number of timer jobs that can be queued."})
public static int job_dispatch_timer_job_queue_size = 1024;
@ConfField(description = {"一个 Job 的 task 最大的持久化数量,超过这个限制将会丢弃旧的 task 记录, 如果值 < 1, 将不会持久化。",
@ConfField(masterOnly = true, description = {"一个 Job 的 task 最大的持久化数量,超过这个限制将会丢弃旧的 task 记录, 如果值 < 1, 将不会持久化。",
"Maximum number of persistence allowed per task in a job,exceeding which old tasks will be discarded,"
+ "If the value is less than 1, it will not be persisted." })
public static int max_persistence_task_count = 100;
@ConfField(description = {"finished 状态的 job 最长保存时间,超过这个时间将会被删除, 单位:小时",
/**
 * Capacity of the waiting queue for MTMV tasks.
 *
 * <p>The value is passed through {@code TaskDisruptorGroupManager.normalizeRingbufferSize}: a value
 * less than 1 falls back to the default ring buffer size, and a value that is not a power of two is
 * rounded up to the next power of two. The description below states that behaviour.
 * NOTE(review): the consumer reads this into a static final constant, so a change presumably only
 * takes effect after an FE restart - confirm.
 */
@ConfField(masterOnly = true, description = {
        "MV task 的等待队列大小,如果小于 1,则会使用 1024,如果不是 2 的幂,"
                + "则会自动向上取整为不小于该值的最小的 2 的幂次方数",
        "The size of the MV task's waiting queue. If the size is less than 1, 1024 will be used. "
                + "If the size is not a power of two, it is rounded up to the next power of two."})
public static int mtmv_task_queue_size = 1024;
/**
 * Capacity of the waiting queue for Insert tasks.
 *
 * <p>The value is passed through {@code TaskDisruptorGroupManager.normalizeRingbufferSize}: a value
 * less than 1 falls back to the default ring buffer size, and a value that is not a power of two is
 * rounded up to the next power of two. The description below states that behaviour.
 * NOTE(review): the consumer reads this into a static final constant, so a change presumably only
 * takes effect after an FE restart - confirm.
 */
@ConfField(masterOnly = true, description = {
        "Insert task 的等待队列大小,如果小于 1,则会使用 1024,如果不是 2 的幂,"
                + "则会自动向上取整为不小于该值的最小的 2 的幂次方数",
        "The size of the Insert task's waiting queue. If the size is less than 1, 1024 will be used. "
                + "If the size is not a power of two, it is rounded up to the next power of two."})
public static int insert_task_queue_size = 1024;
@ConfField(masterOnly = true, description = {"finished 状态的 job 最长保存时间,超过这个时间将会被删除, 单位:小时",
"The longest time to save the job in finished status, it will be deleted after this time. Unit: hour"})
public static int finished_job_cleanup_threshold_time_hour = 24;

@ConfField(description = {"用于执行 Insert 任务的线程数,值应该大于0,否则默认为5",
@ConfField(masterOnly = true, description = {"用于执行 Insert 任务的线程数,值应该大于0,否则默认为10",
"The number of threads used to consume Insert tasks, "
+ "the value should be greater than 0, if it is <=0, default is 5."})
+ "the value should be greater than 0, if it is <=0, default is 10."})
public static int job_insert_task_consumer_thread_num = 10;

@ConfField(description = {"用于执行 MTMV 任务的线程数,值应该大于0,否则默认为5",
@ConfField(masterOnly = true, description = {"用于执行 MTMV 任务的线程数,值应该大于0,否则默认为10",
"The number of threads used to consume mtmv tasks, "
+ "the value should be greater than 0, if it is <=0, default is 5."})
+ "the value should be greater than 0, if it is <=0, default is 10."})
public static int job_mtmv_task_consumer_thread_num = 10;

/* job test config */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
new Column("SucceedTaskCount", ScalarType.createStringType()),
new Column("FailedTaskCount", ScalarType.createStringType()),
new Column("CanceledTaskCount", ScalarType.createStringType())
);
);
@SerializedName(value = "jid")
private Long jobId;

Expand Down Expand Up @@ -415,6 +415,23 @@ public TRow getTvfInfo() {
return getCommonTvfInfo();
}

/**
 * Builds the shared "execution queue is full" error message for a task of this job.
 *
 * <p>The message carries this job's id and name, the id of the task that could not be dispatched,
 * and the names of the two fe.conf properties the operator can raise to relieve the pressure.
 *
 * @param taskId                  id of the task that could not be dispatched
 * @param queueConfigName         name of the fe.conf property controlling the queue size
 * @param executeThreadConfigName name of the fe.conf property controlling the number of execution threads
 * @return the formatted error message
 */
protected String commonFormatMsgWhenExecuteQueueFull(Long taskId, String queueConfigName,
        String executeThreadConfigName) {
    final String template = "Dispatch task failed, jobId: %d, jobName: %s, taskId: %d, the queue size is full, "
            + "you can increase the queue size by setting the property %s in the fe.conf file "
            + "or increase the value of the property %s in the fe.conf file";
    return String.format(template, getJobId(), getJobName(), taskId, queueConfigName, executeThreadConfigName);
}

@Override
public ShowResultSetMetaData getJobMetaData() {
ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder();
Expand Down
7 changes: 7 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -153,4 +153,11 @@ public interface Job<T extends AbstractTask, C> {
* @return TRow
*/
TRow getTvfInfo();

/**
 * Builds the user-facing error message for a task of this job that could not be dispatched
 * because the execution queue is full.
 *
 * <p>Each job type supplies its own message so that it can point at the configuration
 * properties relevant to that job type.
 *
 * @param taskId id of the task that could not be dispatched
 * @return error message for the user
 */
String formatMsgWhenExecuteQueueFull(Long taskId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,17 @@ public void run(Timeout timeout) {
return;
}
if (!dispatchDisruptor.publishEvent(this.job)) {
log.warn("dispatch timer job failed, job id is {}, job name is {}",
this.job.getJobId(), this.job.getJobName());
log.warn("dispatch timer job failed, queue maybe full. job id is {}, job name is {}",
this.job.getJobId(), this.job.getJobName() + getMsgWhenExecuteQueueFull());
}
} catch (Exception e) {
log.warn("dispatch timer job error, task id is {}", this.job.getJobId(), e);
}
}

/**
 * Returns the remediation hint that is appended to the warning logged when a timer job
 * cannot be published to the dispatch queue.
 *
 * <p>The hint starts with a separator because the caller concatenates it directly after the
 * job name; without it the job name and the hint would run together in the log line.
 *
 * @return the hint naming the fe.conf properties that can be increased
 */
private String getMsgWhenExecuteQueueFull() {
    return ", you can increase the queue size by setting the property "
            + "job_dispatch_timer_job_queue_size in the fe.conf file or increase the value of "
            + "the property job_dispatch_timer_job_thread_num in the fe.conf file";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,12 @@ public List<String> getShowInfo() {
}
}

/**
 * Builds the "execution queue is full" message for an insert task, pointing the user at the
 * insert-specific queue size and consumer thread properties.
 *
 * @param taskId id of the task that could not be dispatched
 * @return the formatted error message
 */
@Override
public String formatMsgWhenExecuteQueueFull(Long taskId) {
    final String queueSizeProperty = "insert_task_queue_size";
    final String consumerThreadProperty = "job_insert_task_consumer_thread_num";
    return commonFormatMsgWhenExecuteQueueFull(taskId, queueSizeProperty, consumerThreadProperty);
}

private String getPriority() {
return properties.getOrDefault(LoadStmt.PRIORITY, Priority.NORMAL.name());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,13 @@ public List<String> getShowInfo() {
return data;
}

/**
 * Builds the "execution queue is full" message for an MTMV task, pointing the user at the
 * MTMV-specific queue size and consumer thread properties.
 *
 * @param taskId id of the task that could not be dispatched
 * @return the formatted error message
 */
@Override
public String formatMsgWhenExecuteQueueFull(Long taskId) {
    final String queueSizeProperty = "mtmv_task_queue_size";
    final String consumerThreadProperty = "job_mtmv_task_consumer_thread_num";
    return commonFormatMsgWhenExecuteQueueFull(taskId, queueSizeProperty, consumerThreadProperty);
}

@Override
public TRow getTvfInfo() {
TRow trow = new TRow();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ public class TaskDisruptorGroupManager<T extends AbstractTask> {
private static final int DISPATCH_MTMV_THREAD_NUM = Config.job_mtmv_task_consumer_thread_num > 0
? Config.job_mtmv_task_consumer_thread_num : DEFAULT_CONSUMER_THREAD_NUM;

private static final int DISPATCH_INSERT_TASK_QUEUE_SIZE = DEFAULT_RING_BUFFER_SIZE;
private static final int DISPATCH_MTMV_TASK_QUEUE_SIZE = DEFAULT_RING_BUFFER_SIZE;
private static final int DISPATCH_INSERT_TASK_QUEUE_SIZE = normalizeRingbufferSize(Config.insert_task_queue_size);
private static final int DISPATCH_MTMV_TASK_QUEUE_SIZE = normalizeRingbufferSize(Config.mtmv_task_queue_size);


public void init() {
Expand Down Expand Up @@ -133,4 +133,24 @@ public boolean dispatchInstantTask(AbstractTask task, JobType jobType,
}


/**
 * Normalizes a configured queue size to a power of two usable as a ring buffer size.
 *
 * <ul>
 *   <li>A size less than 1 falls back to {@code DEFAULT_RING_BUFFER_SIZE}.</li>
 *   <li>A size that is already a power of two is returned unchanged.</li>
 *   <li>Any other size is rounded up to the next power of two.</li>
 *   <li>A size above 2^30 is clamped to 2^30, the largest power of two an {@code int} can hold;
 *       rounding up would otherwise overflow to a negative value.</li>
 * </ul>
 *
 * @param size The input size to be normalized.
 * @return The smallest power of two greater than or equal to {@code size}, capped at 2^30, or
 *         {@code DEFAULT_RING_BUFFER_SIZE} when {@code size} is less than 1.
 */
public static int normalizeRingbufferSize(int size) {
    // Guard first so that no arithmetic is performed on non-positive input.
    if (size < 1) {
        return DEFAULT_RING_BUFFER_SIZE;
    }
    final int maxPowerOfTwo = 1 << 30;
    if (size >= maxPowerOfTwo) {
        // Rounding up past 2^30 would overflow int and yield a negative size.
        return maxPowerOfTwo;
    }
    final int highestBit = Integer.highestOneBit(size);
    // highestBit == size means size is already a power of two; otherwise go one power up.
    return highestBit == size ? size : highestBit << 1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ public void schedulerInstantJob(T job, TaskType taskType, C context) throws JobE
for (AbstractTask task : tasks) {
if (!taskDisruptorGroupManager.dispatchInstantTask(task, job.getJobType(),
job.getJobConfig())) {
throw new JobException("dispatch instant task failed, job id is "
+ job.getJobId() + ", task id is " + task.getTaskId());
throw new JobException(job.formatMsgWhenExecuteQueueFull(task.getTaskId()));

}
log.info("dispatch instant job, job id is {}, job name is {}, task id is {}", job.getJobId(),
job.getJobName(), task.getTaskId());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.job.manager;


import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Unit test for {@code TaskDisruptorGroupManager.normalizeRingbufferSize}.
 */
public class TaskDisruptorGroupManagerTest {

    /**
     * Checks that sizes are normalized to a power of two and that negative sizes fall back to 1024.
     */
    @Test
    public void testInit() {
        // Each row is {configured size, expected normalized size}.
        final int[][] cases = {
            {1024, 1024},
            {-1, 1024},
            {15, 16},
            {1023, 1024},
            {-8, 1024},
            {1025, 2048},
            {2049, 4096},
        };
        for (int[] sizeAndExpected : cases) {
            Assertions.assertEquals(sizeAndExpected[1],
                    TaskDisruptorGroupManager.normalizeRingbufferSize(sizeAndExpected[0]));
        }
    }
}
1 change: 1 addition & 0 deletions regression-test/pipeline/p0/conf/fe.conf
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ label_keep_max_second = 300
# job test configurations
#allows the creation of jobs with an interval of second
enable_job_schedule_second_for_test = true
mtmv_task_queue_size = 4096

enable_workload_group = true
publish_topic_info_interval_ms = 1000
Expand Down

0 comments on commit 6138080

Please sign in to comment.