Skip to content

Commit

Permalink
[Improve](Job)Refactor JOB
Browse files Browse the repository at this point in the history
  • Loading branch information
CalvinKirs committed Nov 13, 2023
1 parent 4fb7457 commit d10cce6
Show file tree
Hide file tree
Showing 17 changed files with 193 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class ShowJobStmt extends ShowStmt {
.add("ExecuteType")
.add("RecurringStrategy")
.add("Status")
.add("lastExecuteTaskStatus")
.add("ExecuteSql")
.add("CreateTime")
.add("Comment")
.build();
Expand Down
57 changes: 45 additions & 12 deletions fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import org.apache.doris.catalog.Env;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.common.JobType;
import org.apache.doris.job.common.TaskStatus;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.extensions.insert.InsertJob;
import org.apache.doris.job.task.AbstractTask;
Expand Down Expand Up @@ -63,14 +65,27 @@ public abstract class AbstractJob<T extends AbstractTask> implements Job<T>, Wri
@SerializedName(value = "createTimeMs")
private Long createTimeMs;

private List<? extends AbstractTask> runningTasks = new ArrayList<>();
@SerializedName(value = "executeSql")
String executeSql;

private List<T> runningTasks = new ArrayList<>();

@Override
public void cancel() throws JobException {
if (CollectionUtils.isEmpty(runningTasks)) {
return;
}
runningTasks.forEach(Task::cancel);

}

public void initTasks(List<T> tasks) {
tasks.forEach(task -> {
task.setJobId(jobId);
task.setTaskId(Env.getCurrentEnv().getNextId());
task.setCreateTimeMs(System.currentTimeMillis());
task.setStatus(TaskStatus.PENDING);
});
}

public void checkJobParams() {
Expand All @@ -89,17 +104,18 @@ public void updateJobStatus(JobStatus newJobStatus) {
throw new IllegalArgumentException(String.format("Can't update job %s status to the %s status",
jobStatus.name(), this.jobStatus.name()));
}
// check other status
}

public void resumeJob() {
if (jobStatus != JobStatus.PAUSED) {
throw new IllegalArgumentException(String.format("Can't resume job %s status to the %s status",
if (newJobStatus.equals(JobStatus.RUNNING) && !jobStatus.equals(JobStatus.PAUSED)) {
throw new IllegalArgumentException(String.format("Can't update job %s status to the %s status",
jobStatus.name(), this.jobStatus.name()));
}
jobStatus = JobStatus.RUNNING;
if (newJobStatus.equals(JobStatus.STOPPED) && !jobStatus.equals(JobStatus.RUNNING)) {
throw new IllegalArgumentException(String.format("Can't update job %s status to the %s status",
jobStatus.name(), this.jobStatus.name()));
}
jobStatus = newJobStatus;
}


protected abstract void checkJobParamsInternal();

public static AbstractJob readFields(DataInput in) throws IOException {
Expand All @@ -117,13 +133,14 @@ public static AbstractJob readFields(DataInput in) throws IOException {
}

@Override
public void onTaskFail(long taskId) {
public void onTaskFail(T task) {
updateJobStatusIfEnd();
}

@Override
public void onTaskSuccess(long taskId) {
public void onTaskSuccess(T task) {
updateJobStatusIfEnd();
runningTasks.remove(task);

}

Expand All @@ -139,8 +156,10 @@ private void updateJobStatusIfEnd() {
Env.getCurrentEnv().getJobManager().getJob(jobId).updateJobStatus(jobStatus);
break;
case RECURRING:
if (null != getJobConfig().getTimerDefinition().getEndTimeMs()
&& getJobConfig().getTimerDefinition().getEndTimeMs() < System.currentTimeMillis()) {
TimerDefinition timerDefinition = getJobConfig().getTimerDefinition();
if (null != timerDefinition.getEndTimeMs()
&& timerDefinition.getEndTimeMs() < System.currentTimeMillis()
+ timerDefinition.getIntervalUnit().getIntervalMs(timerDefinition.getInterval())) {
jobStatus = JobStatus.FINISHED;
Env.getCurrentEnv().getJobManager().getJob(jobId).updateJobStatus(jobStatus);
}
Expand All @@ -149,4 +168,18 @@ && getJobConfig().getTimerDefinition().getEndTimeMs() < System.currentTimeMillis
break;
}
}

public List<String> getCommonShowInfo() {
List<String> commonShowInfo = new ArrayList<>();
commonShowInfo.add(String.valueOf(jobId));
commonShowInfo.add(jobName);
commonShowInfo.add(createUser);
commonShowInfo.add(jobConfig.getExecuteType().name());
commonShowInfo.add(jobConfig.convertRecurringStrategyToString());
commonShowInfo.add(jobStatus.name());
commonShowInfo.add(executeSql);
commonShowInfo.add(TimeUtils.longToTimeString(createTimeMs));
commonShowInfo.add(comment);
return commonShowInfo;
}
}
13 changes: 7 additions & 6 deletions fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.job.base;

import org.apache.doris.job.common.JobType;
import org.apache.doris.job.common.TaskType;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.task.AbstractTask;
import org.apache.doris.qe.ShowResultSetMetaData;
Expand All @@ -33,9 +34,9 @@
*/
public interface Job<T extends AbstractTask> {

List<T> createTasks();
List<T> createTasks(TaskType taskType);

void cancel() throws JobException;
void cancel(T task) throws JobException;

boolean isReadyForScheduling();

Expand All @@ -48,12 +49,12 @@ public interface Job<T extends AbstractTask> {

List<T> queryTasks();

void onTaskFail(long taskId);
void cancel() throws JobException;

void onTaskSuccess(long taskId);
void onTaskFail(T task);

void onTaskCancel(long taskId);
void onTaskSuccess(T task);

void afterTaskRun(long taskId);
void onTaskCancel(T task);

}
Original file line number Diff line number Diff line change
Expand Up @@ -166,15 +166,16 @@ private List<Long> getExecutionDelaySeconds(long windowStartTimeMs, long windowE

// Calculate the trigger time list
for (long triggerTime = firstTriggerTime; triggerTime <= windowEndTimeMs; triggerTime += intervalMs) {
if (triggerTime >= currentTimeMs) {
if (triggerTime >= currentTimeMs && (null == timerDefinition.getEndTimeMs()
|| triggerTime < timerDefinition.getEndTimeMs())) {
timestamps.add(queryDelayTimeSecond(currentTimeMs, triggerTime));
}
}

return timestamps;
}

private String convertRecurringStrategyToString() {
public String convertRecurringStrategyToString() {
switch (executeType) {
case ONE_TIME:
return "AT " + TimeUtils.longToTimeString(timerDefinition.getStartTimeMs());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.job.common;

public enum TaskStatus {
PENDING,
CANCEL,
RUNNING,
SUCCESS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

public enum TaskType {

SCHDULER,
MANUAL,
INSTANT,
SCHEDULER,
MANUAL;
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
import lombok.Data;

@Data
public class TimerJobEvent<T extends AbstractJob> {
public class TimerJobEvent<T extends AbstractJob<?>> {


private T job;

public static <T extends AbstractJob> EventFactory<TimerJobEvent<T>> factory() {
public static <T extends AbstractJob<?>> EventFactory<TimerJobEvent<T>> factory() {
return TimerJobEvent::new;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,23 @@ public void onEvent(ExecuteTaskEvent<T> executeTaskEvent) {
log.warn("task is null, ignore,maybe task has been canceled");
return;
}
if (task.isCancelled()) {
log.info("task is canceled, ignore");
return;
}
if (null == executeTaskEvent.getJobConfig().getMaxConcurrentTaskNum()
|| executeTaskEvent.getJobConfig().getMaxConcurrentTaskNum() <= 0) {
try {
task.runTask();
return;
} catch (Exception e) {
log.warn("execute task error, task id is {}", task.getTaskId(), e);

}
}
int maxConcurrentTaskNum = executeTaskEvent.getJobConfig().getMaxConcurrentTaskNum();
Semaphore semaphore = null;
// get token
try {
int maxConcurrentTaskNum = executeTaskEvent.getJobConfig().getMaxConcurrentTaskNum();
semaphore = TaskTokenManager.tryAcquire(task.getJobId(), maxConcurrentTaskNum);
task.runTask();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
package org.apache.doris.job.executor;

import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.common.JobType;
import org.apache.doris.job.common.TaskType;
import org.apache.doris.job.disruptor.TaskDisruptor;
import org.apache.doris.job.disruptor.TimerJobEvent;
import org.apache.doris.job.task.AbstractTask;
Expand All @@ -41,14 +43,14 @@ public DispatchTaskHandler(Map<JobType, TaskDisruptor<T>> disruptorMap) {


@Override
public void onEvent(TimerJobEvent<T> event) throws Exception {
public void onEvent(TimerJobEvent<T> event) {
try {
if (null == event.getJob()) {
log.info("job is null,may be job is deleted, ignore");
return;
}
if (event.getJob().isReadyForScheduling()) {
List<? extends AbstractTask> tasks = event.getJob().createTasks();
if (event.getJob().isReadyForScheduling() && event.getJob().getJobStatus() == JobStatus.RUNNING) {
List<? extends AbstractTask> tasks = event.getJob().createTasks(TaskType.SCHEDULER);
JobType jobType = event.getJob().getJobType();
for (AbstractTask task : tasks) {
disruptorMap.get(jobType).publishEvent(task, event.getJob().getJobConfig());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.doris.common.io.Text;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.common.JobType;
import org.apache.doris.job.common.TaskType;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ShowResultSetMetaData;
Expand All @@ -31,29 +32,38 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

@Data
public class InsertJob extends AbstractJob {
public class InsertJob extends AbstractJob<InsertTask> {

@SerializedName(value = "labelPrefix")
String labelPrefix;
@SerializedName(value = "executeSql")
String executeSql;


@Override
public List<InsertTask> createTasks() {
public List<InsertTask> createTasks(TaskType taskType) {
InsertTask task = new InsertTask(null, null, null, null, null);
task.setJobId(getJobId());
task.setTaskType(taskType);
task.setTaskId(Env.getCurrentEnv().getNextId());
getRunningTasks().add(task);
return getRunningTasks();
ArrayList<InsertTask> tasks = new ArrayList<>();
tasks.add(task);
super.initTasks(tasks);
getRunningTasks().addAll(tasks);
return tasks;
}

@Override
public void cancel(InsertTask task) throws JobException {
super.cancel();
}


@Override
public void cancel() throws JobException {

super.cancel();
}

@Override
Expand All @@ -72,7 +82,7 @@ public static InsertJob readFields(DataInput in) throws IOException {
}

@Override
public List queryTasks() {
public List<InsertTask> queryTasks() {
return null;
}

Expand All @@ -92,24 +102,21 @@ public ShowResultSetMetaData getTaskMetaData() {
}

@Override
public void onTaskFail(long taskId) {

public void onTaskFail(InsertTask task) {
getRunningTasks().remove(task);
}

@Override
public void onTaskSuccess(long taskId) {

public void onTaskSuccess(InsertTask task) {
getRunningTasks().remove(task);
}

@Override
public void onTaskCancel(long taskId) {
public void onTaskCancel(InsertTask task) {
getRunningTasks().remove(task);

}

@Override
public void afterTaskRun(long taskId) {

}

@Override
public void write(DataOutput out) throws IOException {
Expand Down
Loading

0 comments on commit d10cce6

Please sign in to comment.