Skip to content

Commit

Permalink
[Chore](Job)Add the configuration of the maximum number of persistenc…
Browse files Browse the repository at this point in the history
…e tasks for the job
  • Loading branch information
CalvinKirs committed Dec 14, 2023
1 parent 9e62e3e commit adf2043
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1585,7 +1585,10 @@ public class Config extends ConfigBase {
*/
@ConfField(description = {"任务堆积时用于存放定时任务的队列大小", "The number of timer jobs that can be queued."})
public static int job_dispatch_timer_job_queue_size = 1024;

@ConfField(description = {"一个 Job 的 task 最大的持久化数量,超过这个限制将会丢弃旧的 task 记录, 如果值 < 1, 将不会持久化。",
"Maximum number of persistence allowed per task in a job,exceeding which old tasks will be discarded,"
+ "If the value is less than 1, it will not be persisted." })
public static int max_persistence_task_count = 100;
@ConfField(description = {"finished 状态的 job 最长保存时间,超过这个时间将会被删除, 单位:小时",
"The longest time to save the job in finished status, it will be deleted after this time. Unit: hour"})
public static int finished_job_cleanup_threshold_time_hour = 24;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.common.Config;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.io.Text;
import org.apache.doris.job.base.AbstractJob;
Expand Down Expand Up @@ -85,10 +86,7 @@ public class InsertJob extends AbstractJob<InsertTask, Map> {
ConnectContext ctx;

@SerializedName("tis")
ConcurrentLinkedQueue<Long> taskIdList;

// max save task num, do we need to config it?
private static final int MAX_SAVE_TASK_NUM = 100;
ConcurrentLinkedQueue<Long> historyTaskIdList;

@Override
public List<InsertTask> createTasks(TaskType taskType, Map taskContext) {
Expand All @@ -100,21 +98,23 @@ public List<InsertTask> createTasks(TaskType taskType, Map taskContext) {
ArrayList<InsertTask> tasks = new ArrayList<>();
tasks.add(task);
super.initTasks(tasks);
addNewTask(task.getTaskId());
recordTask(task.getTaskId());
return tasks;
}

public void addNewTask(long id) {

if (CollectionUtils.isEmpty(taskIdList)) {
taskIdList = new ConcurrentLinkedQueue<>();
public void recordTask(long id) {
if (Config.max_persistence_task_count < 1) {
return;
}
if (CollectionUtils.isEmpty(historyTaskIdList)) {
historyTaskIdList = new ConcurrentLinkedQueue<>();
Env.getCurrentEnv().getEditLog().logUpdateJob(this);
taskIdList.add(id);
historyTaskIdList.add(id);
return;
}
taskIdList.add(id);
if (taskIdList.size() >= MAX_SAVE_TASK_NUM) {
taskIdList.poll();
historyTaskIdList.add(id);
if (historyTaskIdList.size() >= Config.max_persistence_task_count) {
historyTaskIdList.poll();
}
Env.getCurrentEnv().getEditLog().logUpdateJob(this);
}
Expand Down Expand Up @@ -148,11 +148,11 @@ protected void checkJobParamsInternal() {

@Override
public List<InsertTask> queryTasks() {
if (CollectionUtils.isEmpty(taskIdList)) {
if (CollectionUtils.isEmpty(historyTaskIdList)) {
return new ArrayList<>();
}
//TODO it's will be refactor, we will storage task info in job inner and query from it
List<Long> taskIdList = new ArrayList<>(this.taskIdList);
List<Long> taskIdList = new ArrayList<>(this.historyTaskIdList);
Collections.reverse(taskIdList);
List<LoadJob> loadJobs = Env.getCurrentEnv().getLoadManager().queryLoadJobsByJobIds(taskIdList);
if (CollectionUtils.isEmpty(loadJobs)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.mtmv;

import org.apache.doris.common.Config;
import org.apache.doris.job.extensions.mtmv.MTMVTask;

import com.google.common.collect.Lists;
Expand All @@ -43,8 +44,11 @@ public String getJobName() {
}

public void addHistoryTask(MTMVTask task) {
if (Config.max_persistence_task_count < 1) {
return;
}
historyTasks.add(task);
if (historyTasks.size() > MTMVTask.MAX_HISTORY_TASKS_NUM) {
if (historyTasks.size() > Config.max_persistence_task_count) {
historyTasks.removeFirst();
}
}
Expand Down

0 comments on commit adf2043

Please sign in to comment.