diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 486e2bfdfb015b..516f89ec37ce03 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1585,7 +1585,10 @@ public class Config extends ConfigBase { */ @ConfField(description = {"任务堆积时用于存放定时任务的队列大小", "The number of timer jobs that can be queued."}) public static int job_dispatch_timer_job_queue_size = 1024; - + @ConfField(description = {"一个 Job 的 task 最大的持久化数量,超过这个限制将会丢弃旧的 task 记录, 如果值 < 1, 将不会持久化。", + "Maximum number of persistence allowed per task in a job,exceeding which old tasks will be discarded," + + "If the value is less than 1, it will not be persisted." }) + public static int max_persistence_task_count = 100; @ConfField(description = {"finished 状态的 job 最长保存时间,超过这个时间将会被删除, 单位:小时", "The longest time to save the job in finished status, it will be deleted after this time. Unit: hour"}) public static int finished_job_cleanup_threshold_time_hour = 24; diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java index 968f1413528324..74581b8f1b0b49 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java @@ -20,6 +20,7 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.ScalarType; +import org.apache.doris.common.Config; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.io.Text; import org.apache.doris.job.base.AbstractJob; @@ -85,10 +86,7 @@ public class InsertJob extends AbstractJob { ConnectContext ctx; @SerializedName("tis") - ConcurrentLinkedQueue taskIdList; - - // max save task num, do we need to config it? - private static final int MAX_SAVE_TASK_NUM = 100; + ConcurrentLinkedQueue historyTaskIdList; @Override public List createTasks(TaskType taskType, Map taskContext) { @@ -100,21 +98,23 @@ public List createTasks(TaskType taskType, Map taskContext) { ArrayList tasks = new ArrayList<>(); tasks.add(task); super.initTasks(tasks); - addNewTask(task.getTaskId()); + recordTask(task.getTaskId()); return tasks; } - public void addNewTask(long id) { - - if (CollectionUtils.isEmpty(taskIdList)) { - taskIdList = new ConcurrentLinkedQueue<>(); + public void recordTask(long id) { + if (Config.max_persistence_task_count < 1) { + return; + } + if (CollectionUtils.isEmpty(historyTaskIdList)) { + historyTaskIdList = new ConcurrentLinkedQueue<>(); Env.getCurrentEnv().getEditLog().logUpdateJob(this); - taskIdList.add(id); + historyTaskIdList.add(id); return; } - taskIdList.add(id); - if (taskIdList.size() >= MAX_SAVE_TASK_NUM) { - taskIdList.poll(); + historyTaskIdList.add(id); + if (historyTaskIdList.size() >= Config.max_persistence_task_count) { + historyTaskIdList.poll(); } Env.getCurrentEnv().getEditLog().logUpdateJob(this); } @@ -148,11 +148,11 @@ protected void checkJobParamsInternal() { @Override public List queryTasks() { - if (CollectionUtils.isEmpty(taskIdList)) { + if (CollectionUtils.isEmpty(historyTaskIdList)) { return new ArrayList<>(); } //TODO it's will be refactor, we will storage task info in job inner and query from it - List taskIdList = new ArrayList<>(this.taskIdList); + List taskIdList = new ArrayList<>(this.historyTaskIdList); Collections.reverse(taskIdList); List loadJobs = Env.getCurrentEnv().getLoadManager().queryLoadJobsByJobIds(taskIdList); if (CollectionUtils.isEmpty(loadJobs)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobInfo.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobInfo.java index f253c708e909f4..aca04ee706f69a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobInfo.java @@ -17,6 +17,7 @@ package org.apache.doris.mtmv; +import org.apache.doris.common.Config; import org.apache.doris.job.extensions.mtmv.MTMVTask; import com.google.common.collect.Lists; @@ -43,8 +44,11 @@ public String getJobName() { } public void addHistoryTask(MTMVTask task) { + if (Config.max_persistence_task_count < 1) { + return; + } historyTasks.add(task); - if (historyTasks.size() > MTMVTask.MAX_HISTORY_TASKS_NUM) { + if (historyTasks.size() > Config.max_persistence_task_count) { historyTasks.removeFirst(); } }