From a3880339aa79d04431c207684c18f474edb10709 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Wed, 27 Dec 2023 17:03:25 +0800 Subject: [PATCH] [Improve](Job)Create task adds concurrency control --- .../apache/doris/job/base/AbstractJob.java | 47 ++++++++++++------- .../job/extensions/insert/InsertJob.java | 2 +- 2 files changed, 32 insertions(+), 17 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java index 6e9cb48da1ca445..9ff539c75cfc55f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java @@ -50,6 +50,8 @@ import java.util.Comparator; import java.util.List; import java.util.Set; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; @Data @@ -89,7 +91,8 @@ public abstract class AbstractJob implements Job runningTasks) { + String executeSql) { this.jobId = jobId; this.jobName = jobName; this.jobStatus = jobStatus; @@ -124,10 +126,12 @@ public AbstractJob(Long jobId, String jobName, JobStatus jobStatus, this.jobConfig = jobConfig; this.createTimeMs = createTimeMs; this.executeSql = executeSql; - this.runningTasks = runningTasks; + this.runningTasks = new ArrayList<>(); } - private List runningTasks = new ArrayList<>(); + private List runningTasks; + + private Lock createTaskLock = new ReentrantLock(); @Override public void cancelAllTasks() throws JobException { @@ -198,13 +202,22 @@ public List commonCreateTasks(TaskType taskType, C taskContext) { log.info("job is not ready for scheduling, job id is {}", jobId); return new ArrayList<>(); } - return createTasks(taskType, taskContext); + try { + //it's better to use tryLock and add timeout limit + createTaskLock.lock(); + if (!isReadyForScheduling(taskContext)) { + log.info("job is not ready for scheduling, job id is {}", jobId); + return new ArrayList<>(); + } + List tasks = createTasks(taskType, taskContext); + tasks.forEach(task -> log.info("common create task, job id is {}, task id is {}", jobId, task.getTaskId())); + return tasks; + } finally { + createTaskLock.unlock(); + } } public void initTasks(Collection tasks, TaskType taskType) { - if (CollectionUtils.isEmpty(getRunningTasks())) { - runningTasks = new ArrayList<>(); - } tasks.forEach(task -> { task.setTaskType(taskType); task.setJobId(getJobId()); @@ -364,10 +377,12 @@ public ShowResultSetMetaData getJobMetaData() { } @Override - public void onRegister() throws JobException {} + public void onRegister() throws JobException { + } @Override - public void onUnRegister() throws JobException {} + public void onUnRegister() throws JobException { + } @Override public void onReplayCreate() throws JobException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java index 9256864efcacc19..a594c6c2dd4ec7e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java @@ -192,7 +192,7 @@ public InsertJob(String jobName, Long createTimeMs, String executeSql) { super(getNextJobId(), jobName, jobStatus, dbName, comment, createUser, - jobConfig, createTimeMs, executeSql, null); + jobConfig, createTimeMs, executeSql); this.dbId = ConnectContext.get().getCurrentDbId(); }