Skip to content

Commit

Permalink
[Improve](Job)Create task adds concurrency control
Browse files Browse the repository at this point in the history
  • Loading branch information
CalvinKirs committed Dec 27, 2023
1 parent 05f185f commit 0e57221
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 11 deletions.
33 changes: 23 additions & 10 deletions fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

@Data
Expand Down Expand Up @@ -89,7 +91,8 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
@SerializedName(value = "sql")
String executeSql;

public AbstractJob() {}
public AbstractJob() {
}

public AbstractJob(Long id) {
jobId = id;
Expand All @@ -104,7 +107,7 @@ public AbstractJob(Long jobId, String jobName, JobStatus jobStatus,
UserIdentity createUser,
JobExecutionConfiguration jobConfig) {
this(jobId, jobName, jobStatus, currentDbName, comment,
createUser, jobConfig, System.currentTimeMillis(), null, null);
createUser, jobConfig, System.currentTimeMillis(), null);
}

public AbstractJob(Long jobId, String jobName, JobStatus jobStatus,
Expand All @@ -113,8 +116,7 @@ public AbstractJob(Long jobId, String jobName, JobStatus jobStatus,
UserIdentity createUser,
JobExecutionConfiguration jobConfig,
Long createTimeMs,
String executeSql,
List<T> runningTasks) {
String executeSql) {
this.jobId = jobId;
this.jobName = jobName;
this.jobStatus = jobStatus;
Expand All @@ -124,10 +126,12 @@ public AbstractJob(Long jobId, String jobName, JobStatus jobStatus,
this.jobConfig = jobConfig;
this.createTimeMs = createTimeMs;
this.executeSql = executeSql;
this.runningTasks = runningTasks;
this.runningTasks = new ArrayList<>();
}

private List<T> runningTasks = new ArrayList<>();
private List<T> runningTasks;

private Lock createTaskLock = new ReentrantLock();

@Override
public void cancelAllTasks() throws JobException {
Expand Down Expand Up @@ -198,13 +202,22 @@ public List<T> commonCreateTasks(TaskType taskType, C taskContext) {
log.info("job is not ready for scheduling, job id is {}", jobId);
return new ArrayList<>();
}
return createTasks(taskType, taskContext);
try {
//it's better to use tryLock and add timeout limit
createTaskLock.lock();
if (!isReadyForScheduling(taskContext)) {
log.info("job is not ready for scheduling, job id is {}", jobId);
return new ArrayList<>();
}
List<T> tasks = createTasks(taskType, taskContext);
tasks.forEach(task -> log.info("common create task, job id is {}, task id is {}", jobId, task.getTaskId()));
return tasks;
} finally {
createTaskLock.unlock();
}
}

public void initTasks(Collection<? extends T> tasks, TaskType taskType) {
if (CollectionUtils.isEmpty(getRunningTasks())) {
runningTasks = new ArrayList<>();
}
tasks.forEach(task -> {
task.setTaskType(taskType);
task.setJobId(getJobId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public InsertJob(String jobName,
Long createTimeMs,
String executeSql) {
super(getNextJobId(), jobName, jobStatus, dbName, comment, createUser,
jobConfig, createTimeMs, executeSql, null);
jobConfig, createTimeMs, executeSql);
this.dbId = ConnectContext.get().getCurrentDbId();
}

Expand Down

0 comments on commit 0e57221

Please sign in to comment.