Skip to content

Commit

Permalink
[Chore](job)Provides configuration of job execution queue size
Browse files Browse the repository at this point in the history
When dealing with a large number of tasks, the default execution queue size is 1024. This can lead to tasks being dropped if the queue becomes full. To address this, you can add the parameters `insert_task_queue_size` and `mtmv_task_queue_size` in the `fe.conf` configuration file. These parameters must be set to a power of 2. Keep in mind, increasing this value is recommended only when thread resources are limited; otherwise, you should consider increasing the number of task execution threads.
  • Loading branch information
CalvinKirs committed Oct 22, 2024
1 parent bd2d846 commit 11e6560
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1776,6 +1776,12 @@ public class Config extends ConfigBase {
"Maximum number of persistence allowed per task in a job,exceeding which old tasks will be discarded,"
+ "If the value is less than 1, it will not be persisted." })
public static int max_persistence_task_count = 100;
@ConfField(description = {"MV task 的等待队列大小,值必须是 2 的幂,否则将使用默认值 1024", "The size of the MV task's"
+ "waiting queue must be a power of 2; otherwise, the default value of 1024 will be used."})
public static int mtmv_task_queue_size = 1024;
@ConfField(description = {"Insert task 的等待队列大小,值必须是 2 的幂,否则将使用默认值 1024", "The size of the Insert "
+ "task's waiting queue must be a power of 2; otherwise, the default value of 1024 will be used."})
public static int insert_task_queue_size = 1024;
@ConfField(description = {"finished 状态的 job 最长保存时间,超过这个时间将会被删除, 单位:小时",
"The longest time to save the job in finished status, it will be deleted after this time. Unit: hour"})
public static int finished_job_cleanup_threshold_time_hour = 24;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,10 @@ public class TaskDisruptorGroupManager<T extends AbstractTask> {
private static final int DISPATCH_MTMV_THREAD_NUM = Config.job_mtmv_task_consumer_thread_num > 0
? Config.job_mtmv_task_consumer_thread_num : DEFAULT_CONSUMER_THREAD_NUM;

private static final int DISPATCH_INSERT_TASK_QUEUE_SIZE = DEFAULT_RING_BUFFER_SIZE;
private static final int DISPATCH_MTMV_TASK_QUEUE_SIZE = DEFAULT_RING_BUFFER_SIZE;
private static final int DISPATCH_INSERT_TASK_QUEUE_SIZE = isPowerOfTwo(Config.insert_task_queue_size)
? Config.insert_task_queue_size : DEFAULT_RING_BUFFER_SIZE;
private static final int DISPATCH_MTMV_TASK_QUEUE_SIZE = isPowerOfTwo(Config.mtmv_task_queue_size)
? Config.mtmv_task_queue_size : DEFAULT_RING_BUFFER_SIZE;


public void init() {
Expand Down Expand Up @@ -132,5 +134,8 @@ public boolean dispatchInstantTask(AbstractTask task, JobType jobType,
return disruptorMap.get(jobType).publishEvent(task, jobExecutionConfiguration);
}


private static boolean isPowerOfTwo(int n) {
// Check if n is greater than 0 and if n & (n - 1) equals 0
return n > 0 && (n & (n - 1)) == 0;
}
}
1 change: 1 addition & 0 deletions regression-test/pipeline/p0/conf/fe.conf
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ label_keep_max_second = 300
# job test configurations
#allows the creation of jobs with an interval of second
enable_job_schedule_second_for_test = true
mtmv_task_queue_size = 4096

enable_workload_group = true
publish_topic_info_interval_ms = 1000
Expand Down

0 comments on commit 11e6560

Please sign in to comment.